Friday, 25 November 2022

Extracting JSON object within a spark dataframe

 Let's see how we can extract a Json object from a spark dataframe column

This is an example data frame

import numpy as np
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled',False)

d = { 'result': [{"A": {"B": [{"key":{"dateAtString": "1990-10-12T10:45:11.691274Z", "dateLastString": "1990-10-12T10:46:45.372113Z", "valueRes": {"C": "AZZ", "test": "2"}, "result": "True"}},{"key":{"dateAtString": "1990-10-12T10:45:11.691274Z", "dateLastString": "1990-10-12T10:46:45.372113Z", "valueRes": {"C": "AW", "test": "2"}, "result": "true"}}]}}]}

df = pd.DataFrame(d)
sparkDF = spark.createDataFrame(df)



1. Let's extract value of  'A'
sparkDF = sparkDF.select(explode(sparkDF.result.A).alias('col1','col2'))
As we are exploding till A Json object, this will help to bring key (B) and value (array) into 2 different columns.





2. Now let's drill down further
    explode array element in col2

sparkDF = sparkDF.select(explode(sparkDF.result.A.B).alias('result'))



3. Expanding result column

sparkDF = sparkDF.select(explode(sparkDF.result.A.B).alias('result')).select('result.key')


I am just selecting data from result column and that too I want to expend the key to get all the needed values.


4. Extract value of result
     sparkDF = sparkDF.select('key.result')
  













5. Extract valueRes.test
    sparkDF = sparkDF.select('key.valueRes')














 You can also put columnname.* to bring all the values into column format.

Tuesday, 14 June 2022

Join through expression variable as on condition in databricks using PySpark

 Lets see how to join 2 table with a parameterized on condition in PySpark

Eg: I have 2 dataframes A and B and I want to join them with id,inv_no,item and subitem


onExpr = [(A.id == B.id) &
                    (A.invc_no == B.invc_no) & 
                    (A.item == B.item) & 
                    (A.subItem == B.subItem)] 

 dailySaleDF = A.join(B, onExpr, 'left').select([c for c in df.columns])



Save dataframe to table and ADLS path in one go

 Lets see how to save dataframe into a table and create view on adls.


df.write.format('delta')
            .mode('overwrite')
            .option('overwriteSchema', 'true')
            .saveAsTable('{database_name}.{tbl}'.format(database_name = database,tbl = table_name)
                , path = '{base_dir}/{tbl}/'.format(base_dir  =  location,tbl  =  table_name))

How to get Azure Key Vault values into Azure Databricks Notebook

It is always a best practice to store the secrets in Azure Key vault. In order to access them in databricks , first a scope needs to be defined and using the scope and key you will be able to access the secrets.


In below example scope is "myScopeDEV" and key is "myScecretKey". Passing env_name as "DEV"






Saturday, 17 April 2021

How to calculate rolling dates using pySpark

Lets calculate current year minus 3 years data using pySpark 
  
You can achieve this using F.add_months(current_date(),-36) option
 












You also have the option to change the dates/year to start . In order to achieve this use F.trunc("end_date", "month") and F.trunc("end_date", "year") 

 




Monday, 5 October 2020

How to write dataframe output to a single file with a specific name using Spark


Spark is designed to write out multiple files in parallel. So there may be cases where we need to merge all the part files, remove the success/commit files and write the content to a single file.

This blog helps you to write spark output to a single file.

Using  df.coalesce(1) we can write data to a single file, 

 result_location = "dbfs:///mnt/datalake/unmesha/output/"   df.coalesce(1).write.format("csv").options(header='true').mode("overwrite").save(result_location)

but still you will see _success files.



 


This solution - adding coalesce isn’t sufficient when you want to write data to a file with a specific name.

We are going to achieve this using dbutils
  result_location = "dbfs:///mnt/datalake/unmesha/output/"
     df.coalesce(1).write.format("csv").options(header='true').mode("overwrite").save(result_location)
  files = dbutils.fs.ls(result_location)
  csv_file = [x.path for x in files if x.path.endswith(".csv")][0] 
  dbutils.fs.mv(csv_file, result_location.rstrip('/') + ".csv") 
  dbutils.fs.rm(result_location, recurse = True)
 Above snippet helps you  to write dataframe output to a single file with a specific name.



  

How to append content to a DBFS file using python spark


You can read and write to DBFS files using 'dbutils'

Lets see one example

dbutils.fs.put("dbfs:///mnt/sample.txt", "sample content")

Above command helps to write "sample content" to 'dbfs:///mnt/sample.txt'

Now you have the file in your DBFS location. Lets see how to append content to this file.


In order to do that, you can open the file in append mode.



with  open("/dbfs/mnt/sample.txt""a") as f:
  f.write("append values")

Now your appended file is ready!!!