
Tuesday, 14 June 2022

Save dataframe to table and ADLS path in one go

Let's see how to save a dataframe as a table and write its data to an ADLS path in one go.


# Wrap the chained call in parentheses so it can span multiple lines.
# This registers the table in the metastore and stores the data at the ADLS path.
(df.write.format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .saveAsTable('{database_name}.{tbl}'.format(database_name=database, tbl=table_name),
                 path='{base_dir}/{tbl}/'.format(base_dir=location, tbl=table_name)))

Thursday, 24 September 2020

Create a Delta table on a CSV file in Python Spark

 

You can read the CSV file into a DataFrame and write it out in Delta format:

Step 1: Read the input CSV

Step 2: Write the CSV to the ADLS location in Delta format

Step 3: Create a table on top of it


# Step 1: read the input CSV
myCSV = spark.read.csv("/path/to/input/data", header=True, sep=",")
# Step 2: write it to the ADLS location in Delta format
myCSV.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/mnt/delta/Employee")
# Step 3: create a table on top of the Delta files
spark.sql("CREATE TABLE employee USING DELTA LOCATION '/mnt/delta/Employee/'")

Tuesday, 12 February 2019

How to select multiple columns from a spark data frame using List[String]


Let's see how to select multiple columns from a Spark DataFrame.
Create an example DataFrame:
spark-shell --queue= *;

To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
Spark context available as sc 
SQL context available as sqlContext.

scala>  val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71  

scala> val BazarDF = Seq(
     | ("Veg", "tomato", 1.99),
     | ("Veg", "potato", 0.45),
     | ("Fruit", "apple", 0.99),
     | ("Fruit", "pineapple", 2.59),
     | ("Fruit", "apple", 1.99)
     | ).toDF("Type", "Item", "Price")
BazarDF: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]

scala> BazarDF.show()
+-----+---------+-----+
| Type|     Item|Price|
+-----+---------+-----+
|  Veg|   tomato| 1.99|
|  Veg|   potato| 0.45|
|Fruit|    apple| 0.99|
|Fruit|pineapple| 2.59|
|Fruit|    apple| 1.99|
+-----+---------+-----+

Now our example dataframe is ready.
Create a List[String] with column names.
scala> var selectExpr : List[String] = List("Type","Item","Price")
selectExpr: List[String] = List(Type, Item, Price)

Now our list of column names is also created.
Let's select these columns from our DataFrame.
Use .head and .tail to pass every column name in the List to select().

scala> var dfNew = BazarDF.select(selectExpr.head,selectExpr.tail: _*)
dfNew: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]

scala> dfNew.show()
+-----+---------+-----+
| Type|     Item|Price|
+-----+---------+-----+
|  Veg|   tomato| 1.99|
|  Veg|   potato| 0.45|
|Fruit|    apple| 0.99|
|Fruit|pineapple| 2.59|
|Fruit|    apple| 1.99|
+-----+---------+-----+
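
The same pattern works when the List holds only a subset of the columns. A quick sketch (not captured shell output), reusing the DataFrame above:

val subsetCols = List("Item", "Price")
BazarDF.select(subsetCols.head, subsetCols.tail: _*).show()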

I will also explain how to select multiple columns from a Spark data frame using List[Column] in the next post.

Thursday, 8 November 2018

How to do an aggregate function on a Spark Dataframe using collect_set


In order to explain the usage of collect_set, let's create a DataFrame with 3 columns.
spark-shell --queue= *;

To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
Spark context available as sc 
SQL context available as sqlContext.

scala>  val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71  
 
scala> import org.apache.spark.sql.Column
scala> import org.apache.spark.sql.functions._
scala> val BazarDF = Seq(
        ("Veg", "tomato", 1.99),
        ("Veg", "potato", 0.45),
        ("Fruit", "apple", 0.99),
        ("Fruit", "pineapple", 2.59)
         ).toDF("Type", "Item", "Price")
BazarDF: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]

Now let's do a group by on the Type column and get the distinct values in the Item column using collect_set().
scala> var aggBazarDF = BazarDF.groupBy($"Type")
         .agg(collect_set($"Item").as("All_Items"))
aggBazarDF: org.apache.spark.sql.DataFrame = [Type: string, All_Items: array<string>]
collect_set() returns the set of distinct values for each group key.
Let's see the resulting DataFrame.
scala>  aggBazarDF.show()
+-----+------------------+
| Type|         All_Items|
+-----+------------------+
|  Veg|  [tomato, potato]|
|Fruit|[apple, pineapple]|
+-----+------------------+

What if we need to remove the square brackets?
We can make use of concat_ws() to join the values into a single delimited string.

scala> var aggBazarDFNew = BazarDF.groupBy($"Type")
     .agg(concat_ws(",",collect_set($"Item"))
                                 .as("All_Items"))
aggBazarDFNew: org.apache.spark.sql.DataFrame = [Type: string, All_Items: string]

scala> aggBazarDFNew.show()
+-----+---------------+
| Type|      All_Items|
+-----+---------------+
|  Veg|  tomato,potato|
|Fruit|apple,pineapple|
+-----+---------------+
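
As an aside, collect_set() drops duplicate items; if you need to keep every occurrence, collect_list() works the same way. A quick sketch (not captured shell output):

var aggBazarListDF = BazarDF.groupBy($"Type")
         .agg(collect_list($"Item").as("All_Items"))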


Friday, 26 October 2018

How to add multiple withColumn calls to a Spark DataFrame




In order to explain, let's create a DataFrame with 3 columns.

spark-shell --queue= *;

To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
Spark context available as sc 
SQL context available as sqlContext.

scala>  val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71  
 
scala> import org.apache.spark.sql.Column
scala> import org.apache.spark.sql.functions._
scala> val BazarDF = Seq(
        ("Veg", "tomato", 1.99),
        ("Veg", "potato", 0.45),
        ("Fruit", "apple", 0.99),
        ("Fruit", "pineapple", 2.59)
         ).toDF("Type", "Item", "Price")
BazarDF: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]

Let's see how to add new columns to this DataFrame with dummy values.

One way of doing this is:

scala> var BazarWithColumnDF = BazarDF.withColumn("Retailer",lit("null").as("StringType"))
          .withColumn("Quantity",lit(0.0).as("DoubleType"))
BazarWithColumnDF: org.apache.spark.sql.DataFrame = 
 [Type: string, Item: string, Price: double, Retailer: string, Quantity: double]

We can achieve the same thing using foldLeft as well.

To do this, we need to define a list of the new column names and the values to fill them with.

scala> var ColNameWithDatatype: List[(String, Column)] = List()
ColNameWithDatatype: List[(String, org.apache.spark.sql.Column)] = List()

scala> ColNameWithDatatype = List(("Retailer", lit("null").as("StringType")),
     ("Quantity", lit(0.0).as("DoubleType")))
ColNameWithDatatype: List[(String, org.apache.spark.sql.Column)] = 
 List((Retailer,null AS StringType#91), 
      (Quantity,0.0 AS DoubleType#92))

scala> var BazarWithColumnDF1 = ColNameWithDatatype.foldLeft(BazarDF) 
  { (tempDF, colName) =>
     |       tempDF.withColumn(colName._1, colName._2)
     |     }
BazarWithColumnDF1: org.apache.spark.sql.DataFrame = 
       [Type: string, Item: string, Price: double, 
              Retailer: string, Quantity: double]
scala> BazarWithColumnDF1.show()
+-----+---------+-----+--------+--------+
| Type|     Item|Price|Retailer|Quantity|
+-----+---------+-----+--------+--------+
|  Veg|   tomato| 1.99|    null|     0.0|
|  Veg|   potato| 0.45|    null|     0.0|
|Fruit|    apple| 0.99|    null|     0.0|
|Fruit|pineapple| 2.59|    null|     0.0|
+-----+---------+-----+--------+--------+
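
If you need this in more than one place, the foldLeft can be wrapped in a small reusable helper. A minimal sketch (the helper name addColumns is just an example):

import org.apache.spark.sql.{Column, DataFrame}

def addColumns(df: DataFrame, cols: List[(String, Column)]): DataFrame =
  cols.foldLeft(df) { case (tempDF, (name, value)) => tempDF.withColumn(name, value) }

// usage: addColumns(BazarDF, ColNameWithDatatype).show()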

Saturday, 3 March 2018

Conditional Join in Spark using Dataframe

Let's see how we can add conditions to a DataFrame join in Spark.
Say we have 2 DataFrames: dataFrame1 and dataFrame2.
// hc is assumed to be a HiveContext (or SparkSession) created earlier
val dataFrame1 = hc.sql("select * from tbl1") //id,name,code
val dataFrame2 = hc.sql("select * from tbl2") //id,name,code

We need to join these 2 DataFrames on different columns depending on a condition.

We have a decision flag coming in with a true/false value.
If the decision flag is true, the join condition should use both the id and code columns; otherwise only the id column.

So how can we achieve this in Scala?
val decision: Boolean = false

Let's set up the join expression
val exprs = (if (!decision)
                  dataFrame1.col("id").equalTo(dataFrame2.col("id"))
             else
                  dataFrame1.col("id").equalTo(dataFrame2.col("id"))
                    .and(dataFrame1.col("code").equalTo(dataFrame2.col("code"))))

and then join
dataFrame1.join(dataFrame2, exprs).show

This is how you join 2 DataFrames with a conditional join expression.
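
As a side note, the same expression is often written with the === and && column operators. A minimal sketch using the dataFrame1, dataFrame2 and decision values from above (exprs2 is just an example name):

val exprs2 =
  if (!decision) dataFrame1("id") === dataFrame2("id")
  else dataFrame1("id") === dataFrame2("id") && dataFrame1("code") === dataFrame2("code")

dataFrame1.join(dataFrame2, exprs2).show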