
Symptoms

JSON data is provided by the source system for sharing with Incorta. The JSON has a nested structure, and we would like to transform it into a tabular format and load the data as a flattened structure.

Here is a sample CSV file that includes the JSON data:

id,json_data
1,"[{""name"":""Alice"",""age"":30},{""name"":""Bob"",""age"":25}]"
2,"[{""name"":""Charlie"",""age"":35}]"
3,"[{""name"":""David"",""age"":28},{""name"":""Eva"",""age"":22}]"

The expected result is the following flattened table:

id   name      age
1    Alice     30
1    Bob       25
2    Charlie   35
3    David     28
3    Eva       22

Diagnosis

The assumption is that the JSON data will be part of the data ingested from the source. Each record may have one or more attributes containing JSON data.

The JSON data may have a nested structure, and multiple (child) records may exist within the JSON data.

An Incorta Materialized View (MV) will be used to load the JSON data into Incorta.

Solution

1. Read Data into Spark

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, ArrayType
import pyspark.sql.functions as F

# Read the Incorta table (physical schema JSON_Process, table SampleJSON) into a Spark DataFrame
df = read("JSON_Process.SampleJSON")

Using read("<SchemaName>.<TableName>"), the data becomes available as a Spark DataFrame for processing.
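Before defining the schema, it can help to confirm what was loaded. This quick check is not part of the original steps; it simply inspects the same DataFrame:

df.printSchema()          # json_data is still a plain string column at this point
df.show(truncate=False)   # display the raw rows, including the embedded JSON strings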

2. Create JSON Schema

Define a Spark schema that describes the JSON data structure. If you have multiple JSON data columns, create a separate Spark schema for each one.

json_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]))

In the sample, multiple person records are included in the JSON data column, so ArrayType() is used to wrap the StructType.
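As an alternative to writing the schema by hand, Spark can infer it from a representative JSON value with schema_of_json(). This is only a sketch: the sample literal below is copied from the first row of the CSV, and the inferred numeric type may come back as BIGINT rather than INT, so verify the result before relying on it.

# Infer the schema from a representative JSON value instead of writing it by hand
sample = '[{"name":"Alice","age":30},{"name":"Bob","age":25}]'
df.select(F.schema_of_json(F.lit(sample)).alias("inferred_schema")).show(truncate=False)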

3. Parse JSON and Create a Temporary Column

Based on the defined schema, we use the from_json() function from Spark SQL functions to parse the "json_data" column.

df.select("id", F.from_json("json_data",json_schema )\
  .alias("json_str"))\
  .show()

The parsed data is assigned a temporary alias, "json_str", creating a new column with the extracted JSON structure.
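As an optional sanity check (not part of the original article), printSchema() confirms that the parsed column is an array of structs rather than a plain string. Note that from_json() returns null for values it cannot parse against the schema instead of raising an error.

# json_str should now appear as array<struct<name:string, age:int>>
df.select("id", F.from_json("json_data", json_schema).alias("json_str")).printSchema()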

 


4. Explode Nested JSON Array

Since the "json_data" contains an array of JSON objects, we can use explode() to transform each element into a separate row.

df.select("id", F.from_json("json_data",json_schema ).alias("json_str"))\
  .select("id", F.explode("json_str").alias("item")).show()

The explode() function increases the number of rows: each element of the array becomes its own row, so the five person records spread across the three source rows become five output rows.

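One caveat, noted here as an aside rather than part of the original solution: explode() drops rows whose array is null or empty, so an id whose json_data failed to parse would disappear from the output. If such rows should be kept with null name and age values, explode_outer() can be used instead:

# Keep ids even when json_data is null, empty, or fails to parse
df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
  .select("id", F.explode_outer("json_str").alias("item")).show()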

5. Save the Final DataFrame 

Using dot notation, we can now access specific fields within the exploded JSON objects.

output_df = df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
  .select("id", F.explode("json_str").alias("item"))\
  .select("id", "item.name", "item.age")
save(output_df)

We create a final DataFrame (output_df) containing the "id" and the extracted "name" and "age" from the nested JSON data.
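As an aside, if the JSON objects carry many fields, listing each one with dot notation can get verbose. Spark also allows expanding every field of the struct at once with "item.*"; the following sketch produces the same result as the statement above:

# Expand all struct fields instead of naming them one by one
output_df = df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
  .select("id", F.explode("json_str").alias("item"))\
  .select("id", "item.*")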


Finally, use the save() function to persist the output_df as the output of the materialized view.
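For reference, the steps above can be combined into a single MV cell. The following is a consolidated sketch using the same schema and table names from this article; read() and save() are the Incorta MV functions described above:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, ArrayType
import pyspark.sql.functions as F

# Schema describing the nested JSON column
json_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
]))

# Read the source table, parse and explode the JSON, and flatten the fields
output_df = read("JSON_Process.SampleJSON")\
    .select("id", F.from_json("json_data", json_schema).alias("json_str"))\
    .select("id", F.explode("json_str").alias("item"))\
    .select("id", "item.name", "item.age")

# Persist the result as the output of the materialized view
save(output_df)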

Download

The MV notebook is available as an attachment; unzip the attachment to get a .zpln file, which can then be imported.
