To explain the usage of collect_set(), let's create a DataFrame with 3 columns. First, launch the Spark shell:
spark-shell --queue=<queue_name>
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Spark context available as sc.
SQL context available as sqlContext.
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
sqlcontext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@4f9a8d71
scala> import org.apache.spark.sql.Column
scala> import org.apache.spark.sql.functions._
The second import brings collect_set() and concat_ws() into scope; they are used below.
scala> val BazarDF = Seq(
("Veg", "tomato", 1.99),
("Veg", "potato", 0.45),
("Fruit", "apple", 0.99),
("Fruit", "pineapple", 2.59)
).toDF("Type", "Item", "Price")
BazarDF: org.apache.spark.sql.DataFrame = [Type: string, Item: string, Price: double]
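As a quick sanity check (this step is not part of the original session), printSchema() confirms the column types:

scala> BazarDF.printSchema()
root
 |-- Type: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Price: double (nullable = false)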
Now let's group by the Type column and collect the distinct values of the Item
column using collect_set().
scala> val aggBazarDF = BazarDF.groupBy($"Type")
.agg(collect_set($"Item").as("All_Items"))
aggBazarDF: org.apache.spark.sql.DataFrame = [Type: string, All_Items: array<string>]
collect_set() aggregates the values of a column into a set for each group, i.e. it returns the distinct values per key.
Let's look at the resulting DataFrame.
scala> aggBazarDF.show()
+-----+------------------+
| Type| All_Items|
+-----+------------------+
| Veg| [tomato, potato]|
|Fruit|[apple, pineapple]|
+-----+------------------+
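To see the "distinct" part of collect_set() in action, here is a small sketch (the duplicated tomato row is hypothetical, added only for illustration). Its sibling collect_list() keeps repeated values, while collect_set() drops them; note that the order of elements inside the set is not guaranteed.

scala> val dupDF = Seq(
("Veg", "tomato"),
("Veg", "tomato"),
("Veg", "potato")
).toDF("Type", "Item")

scala> dupDF.groupBy($"Type")
.agg(collect_set($"Item").as("as_set"), collect_list($"Item").as("as_list"))
.show()

Here as_set would hold [tomato, potato] (in some order), while as_list keeps [tomato, tomato, potato].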
What if we need the items as a plain comma-separated string rather than an array (rendered with square brackets)?
We can make use of concat_ws(), which joins the elements of an array into one string with the given separator.
scala> val aggBazarDFNew = BazarDF.groupBy($"Type")
.agg(concat_ws(",", collect_set($"Item"))
.as("All_Items"))
aggBazarDFNew: org.apache.spark.sql.DataFrame = [Type: string, All_Items: string]
scala> aggBazarDFNew.show()
+-----+---------------+
| Type| All_Items|
+-----+---------------+
| Veg| tomato,potato|
|Fruit|apple,pineapple|
+-----+---------------+
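One caveat the session above does not show: collect_set() makes no ordering guarantee, so the joined string can differ between runs. If stable output is needed, one option is to sort the set with sort_array() before joining it; a minimal sketch:

scala> val aggBazarDFSorted = BazarDF.groupBy($"Type")
.agg(concat_ws(",", sort_array(collect_set($"Item")))
.as("All_Items"))

scala> aggBazarDFSorted.show()

For the data above, this always yields potato,tomato for Veg and apple,pineapple for Fruit.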