Friday 25 November 2022

Extracting a JSON object within a Spark DataFrame

Let's see how we can extract a JSON object from a Spark DataFrame column.

This is an example DataFrame:

import pandas as pd
from pyspark.sql.functions import explode

# Disable Arrow so the nested Python dicts go through the classic conversion path
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', False)

d = { 'result': [{"A": {"B": [{"key":{"dateAtString": "1990-10-12T10:45:11.691274Z", "dateLastString": "1990-10-12T10:46:45.372113Z", "valueRes": {"C": "AZZ", "test": "2"}, "result": "True"}},{"key":{"dateAtString": "1990-10-12T10:45:11.691274Z", "dateLastString": "1990-10-12T10:46:45.372113Z", "valueRes": {"C": "AW", "test": "2"}, "result": "true"}}]}}]}

df = pd.DataFrame(d)
sparkDF = spark.createDataFrame(df)
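
Before drilling in, it helps to confirm what Spark inferred for the nested column. A quick check (a minimal sketch against the DataFrame built above):

# Show the inferred type of the nested 'result' column
sparkDF.printSchema()

# Peek at the raw row without truncation
sparkDF.show(truncate=False)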



1. Let's extract the value of 'A'

sparkDF = sparkDF.select(explode(sparkDF.result.A).alias('col1', 'col2'))

Since result.A is a key/value structure, exploding it brings the key (B) and the value (the array) into two separate columns, col1 and col2.

2. Now let's drill down further and explode the array element in col2

Note that this snippet starts again from the original sparkDF rather than from step 1's reassigned result:

sparkDF = sparkDF.select(explode(sparkDF.result.A.B).alias('result'))
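
Equivalently, if you kept step 1's output, you can explode col2 directly (a sketch assuming the col1/col2 aliases from step 1):

# col2 holds the array from step 1, so exploding it yields one row per element
sparkDF = sparkDF.select(explode('col2').alias('result'))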



3. Expanding the result column

sparkDF = sparkDF.select(explode(sparkDF.result.A.B).alias('result')).select('result.key')

Here we select only the key field from the result column; expanding key is what exposes all the needed values.


4. Extract the value of result

Starting from step 3's output, result has been expanded into key, so the result field can be selected directly:

sparkDF = sparkDF.select('key.result')

5. Extract valueRes.test

Again from step 3's output, drill one level deeper through valueRes to pull out test:

sparkDF = sparkDF.select('key.valueRes.test')

You can also select columnname.* to expand all of a struct's fields into separate columns.
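
Putting it all together, the whole extraction can be written as one chain (a sketch assuming the example DataFrame above, and that key was inferred as a struct so that key.* expands):

# One row per array element, keep only the key struct, then expand every field
flatDF = (sparkDF
          .select(explode(sparkDF.result.A.B).alias('result'))
          .select('result.key')
          .select('key.*'))
flatDF.show(truncate=False)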

Tuesday 14 June 2022

Join through an expression variable as the ON condition in Databricks using PySpark

Let's see how to join two tables with a parameterized ON condition in PySpark.

E.g. I have two DataFrames, A and B, and I want to join them on id, invc_no, item and subItem.


onExpr = [(A.id == B.id) &
          (A.invc_no == B.invc_no) &
          (A.item == B.item) &
          (A.subItem == B.subItem)]

# Disambiguate with A[c] since both sides share the join column names
dailySaleDF = A.join(B, onExpr, 'left').select([A[c] for c in A.columns])
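
When the join keys share the same names on both sides, as they do here, the condition can also be passed as a plain list of column names; PySpark then keeps only one copy of each key column in the output (a minimal sketch):

# Equivalent join on column-name equality; the key columns appear only once
dailySaleDF = A.join(B, ['id', 'invc_no', 'item', 'subItem'], 'left')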



Save a DataFrame to a table and an ADLS path in one go

Let's see how to save a DataFrame into a table whose underlying data is stored at an ADLS path.


# Wrapping the chain in parentheses lets it span multiple lines
(df.write.format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    # Passing an explicit path makes this an external (unmanaged) table at that location
    .saveAsTable('{database_name}.{tbl}'.format(database_name=database, tbl=table_name),
                 path='{base_dir}/{tbl}/'.format(base_dir=location, tbl=table_name)))
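
To verify where the data landed, you can read the table back through the metastore and, since it is a Delta table, inspect its location (a sketch assuming the database, table_name and location variables above):

# Read back through the metastore name
spark.table('{0}.{1}'.format(database, table_name)).show(5)

# DESCRIBE DETAIL on a Delta table reports the external location
spark.sql('DESCRIBE DETAIL {0}.{1}'.format(database, table_name)).select('location').show(truncate=False)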

How to get Azure Key Vault values into an Azure Databricks notebook

It is always a best practice to store secrets in Azure Key Vault. To access them in Databricks, a secret scope first needs to be defined; using the scope and key, you can then read the secrets.


In the example below the scope is "myScopeDEV" and the key is "mySecretKey", with env_name passed as "DEV".
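
A minimal sketch of the lookup, assuming the scope and key names above and building the scope name from env_name:

env_name = 'DEV'

# Secret scopes are suffixed with the environment, e.g. myScopeDEV
scope_name = 'myScope{0}'.format(env_name)

# Read the secret from Azure Key Vault via the Databricks secrets utility
my_secret = dbutils.secrets.get(scope=scope_name, key='mySecretKey')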