Thursday, 13 December 2018

error: not found: type Column in Spark Scala


How to get rid of the error below:
spark-shell --queue= *;

To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
Spark context available as sc 
SQL context available as sqlContext.

scala>  val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71  

scala> var selectExpr : List[Column] = List("Type","Item","Price")
<console>:25: error: not found: type Column
         var selectExpr : List[Column] = List("Type","Item","Price")
                               ^

You just need to import org.apache.spark.sql.Column (and org.apache.spark.sql.functions.col, which is what builds a Column from a column name):
scala> import org.apache.spark.sql.Column

import org.apache.spark.sql.Column

scala> import org.apache.spark.sql.functions.col

import org.apache.spark.sql.functions.col

scala> var selectExpr : List[Column] = List(col("Type"),col("Item"),col("Price"))
selectExpr: List[org.apache.spark.sql.Column] = List(Type, Item, Price)
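
With the imports in place, a List[Column] like this can be passed straight to select. Below is a minimal sketch, assuming a DataFrame named df that has Type, Item and Price columns (df is a placeholder, not part of the original session):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// build the projection as a list of Column objects
val selectExpr: List[Column] = List(col("Type"), col("Item"), col("Price"))

// expand the list into varargs for select
df.select(selectExpr: _*).show()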


Thursday, 8 November 2018

How to do an aggregate function on a Spark Dataframe using collect_set


To explain the usage of collect_set, let's create a DataFrame with 3 columns.
spark-shell --queue= *;

To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
Spark context available as sc 
SQL context available as sqlContext.

scala>  val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71  
 
scala> import org.apache.spark.sql.Column
scala> import org.apache.spark.sql.functions._
scala> val BazarDF = Seq(
        ("Veg", "tomato", 1.99),
        ("Veg", "potato", 0.45),
        ("Fruit", "apple", 0.99),
        ("Fruit", "pineapple", 2.59)
         ).toDF("Type", "Item", "Price")
BazarDF: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]

Now let's do a group by on the Type column and get the distinct values in the Item column using collect_set().
scala> var aggBazarDF = BazarDF.groupBy($"Type")
         .agg(collect_set($"Item").as("All_Items"))
aggBazarDF: org.apache.spark.sql.DataFrame = [Type: string, All_Items: array<string>]
collect_set() returns the set of distinct values in the column for each group key.
Let's see the resulting DataFrame:
scala>  aggBazarDF.show()
+-----+------------------+
| Type|         All_Items|
+-----+------------------+
|  Veg|  [tomato, potato]|
|Fruit|[apple, pineapple]|
+-----+------------------+
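
collect_set drops duplicate values inside each group. If duplicates need to be kept instead, collect_list can be used; a minimal sketch, reusing the BazarDF defined above:

import org.apache.spark.sql.functions.collect_list

// collect_list keeps every occurrence, collect_set keeps only distinct values
val aggBazarListDF = BazarDF.groupBy($"Type")
  .agg(collect_list($"Item").as("All_Items"))
aggBazarListDF.show()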

What if we need to remove the square brackets?
We can make use of concat_ws(), which joins the elements of the array into a single string with the given separator.

scala> var aggBazarDFNew = BazarDF.groupBy($"Type")
     .agg(concat_ws(",",collect_set($"Item"))
                                 .as("All_Items"))
aggBazarDFNew: org.apache.spark.sql.DataFrame = [Type: string, All_Items: string]

scala> aggBazarDFNew.show()
+-----+---------------+
| Type|      All_Items|
+-----+---------------+
|  Veg|  tomato,potato|
|Fruit|apple,pineapple|
+-----+---------------+
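
collect_set can also be combined with other aggregates inside the same agg call. A minimal sketch, again using the BazarDF from above (the output column names Total_Price and Item_Count are just illustrative):

import org.apache.spark.sql.functions._

val summaryDF = BazarDF.groupBy($"Type")
  .agg(
    concat_ws(",", collect_set($"Item")).as("All_Items"),
    sum($"Price").as("Total_Price"),   // total price per Type
    count($"Item").as("Item_Count")    // number of items per Type
  )
summaryDF.show()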


Friday, 26 October 2018

How to add multiple withColumn to Spark Dataframe




To explain, let's create a DataFrame with 3 columns.

spark-shell --queue= *;

To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
Spark context available as sc 
SQL context available as sqlContext.

scala>  val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71  
 
scala> import org.apache.spark.sql.Column
scala> import org.apache.spark.sql.functions._
scala> val BazarDF = Seq(
        ("Veg", "tomato", 1.99),
        ("Veg", "potato", 0.45),
        ("Fruit", "apple", 0.99),
        ("Fruit", "pineapple", 2.59)
         ).toDF("Type", "Item", "Price")
BazarDF: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]

Let's see how to add 2 new columns to this DataFrame with dummy values.

One way of doing this is to chain withColumn calls:

scala> var BazarWithColumnDF = BazarDF.withColumn("Retailer",lit("null").as("StringType"))
          .withColumn("Quantity",lit(0.0).as("DoubleType"))
BazarWithColumnDF: org.apache.spark.sql.DataFrame = 
 [Type: string, Item: string, Price: double, Retailer: string, Quantity: double]
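
As an aside, when every new column is known up front, the same columns can also be added in a single select instead of chained withColumn calls; a minimal sketch of that alternative (not the approach this post is about):

import org.apache.spark.sql.functions.{col, lit}

// project all existing columns plus the new literal columns in one pass
val newCols = Seq(lit("null").as("Retailer"), lit(0.0).as("Quantity"))
val BazarSelectDF = BazarDF.select(BazarDF.columns.map(col) ++ newCols: _*)
BazarSelectDF.show()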

We can achieve the same result using foldLeft as well.

To do this, we first need to define a list of the new column names paired with their column expressions.

scala> var ColNameWithDatatype: List[(String, Column)] = List()
ColNameWithDatatype: List[(String, org.apache.spark.sql.Column)] = List()

scala> ColNameWithDatatype = List(("Retailer", lit("null").as("StringType")),
     ("Quantity", lit(0.0).as("DoubleType")))
ColNameWithDatatype: List[(String, org.apache.spark.sql.Column)] = 
 List((Retailer,null AS StringType#91), 
      (Quantity,0.0 AS DoubleType#92))

scala> var BazarWithColumnDF1 = ColNameWithDatatype.foldLeft(BazarDF) 
  { (tempDF, colName) =>
     |       tempDF.withColumn(colName._1, colName._2)
     |     }
BazarWithColumnDF1: org.apache.spark.sql.DataFrame = 
       [Type: string, Item: string, Price: double, 
              Retailer: string, Quantity: double]
scala> BazarWithColumnDF1.show()
+-----+---------+-----+--------+--------+
| Type|     Item|Price|Retailer|Quantity|
+-----+---------+-----+--------+--------+
|  Veg|   tomato| 1.99|    null|     0.0|
|  Veg|   potato| 0.45|    null|     0.0|
|Fruit|    apple| 0.99|    null|     0.0|
|Fruit|pineapple| 2.59|    null|     0.0|
+-----+---------+-----+--------+--------+
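
The foldLeft pattern above can be wrapped in a small helper so the same list of (name, Column) pairs can be applied to any DataFrame. A minimal sketch (addColumns is a hypothetical helper name, not something used in this post):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.lit

// apply every (name, column) pair to the DataFrame with withColumn
def addColumns(df: DataFrame, cols: List[(String, Column)]): DataFrame =
  cols.foldLeft(df) { case (tempDF, (name, column)) =>
    tempDF.withColumn(name, column)
  }

// usage with the same dummy columns as above
val BazarWithHelperDF = addColumns(BazarDF,
  List(("Retailer", lit("null")), ("Quantity", lit(0.0))))
BazarWithHelperDF.show()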

Saturday, 3 March 2018

Conditional Join in Spark using Dataframe

Let's see how we can add conditions to a DataFrame join in Spark.
Say we have 2 DataFrames: dataFrame1 and dataFrame2.
val dataFrame1 = hc.sql("select * from tbl1") //id,name,code
val dataFrame2 = hc.sql("select * from tbl2") //id,name,code

We need to join these 2 DataFrames on different columns depending on a condition.

We have a decision flag coming in with a true/false value.
If the decision flag is true, the join condition should use both the id and code columns; otherwise only the id column.

So how can we achieve this in Scala?
val decision: Boolean = false

Let's set up the join expression:
val exprs = if (decision != true)
              dataFrame1.col("id").equalTo(dataFrame2.col("id"))
            else
              dataFrame1.col("id").equalTo(dataFrame2.col("id"))
                .and(dataFrame1.col("code").equalTo(dataFrame2.col("code")))

and then join:
dataFrame1.join(dataFrame2, exprs).show

This is how you join two DataFrames with a conditional join expression.
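
Putting it together end to end with small in-memory DataFrames instead of the Hive tables (the sample data and schemas below are assumptions made purely for illustration):

// in spark-shell the implicits needed for toDF are already imported
val dataFrame1 = Seq((1, "a", "X"), (2, "b", "Y")).toDF("id", "name", "code")
val dataFrame2 = Seq((1, "a", "X"), (2, "b", "Z")).toDF("id", "name", "code")

val decision: Boolean = true

// true  -> join on both id and code
// false -> join on id only
val exprs =
  if (decision != true)
    dataFrame1.col("id").equalTo(dataFrame2.col("id"))
  else
    dataFrame1.col("id").equalTo(dataFrame2.col("id"))
      .and(dataFrame1.col("code").equalTo(dataFrame2.col("code")))

dataFrame1.join(dataFrame2, exprs).show()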