JSON data is provided by the source system and shared with Incorta. The JSON has a nested structure, and we would like to transform it into a tabular format and load the data as a flattened structure.
Here is a sample CSV file that includes the JSON data:
id,json_data
1,"[{""name"":""Alice"",""age"":30},{""name"":""Bob"",""age"":25}]"
2,"[{""name"":""Charlie"",""age"":35}]"
3,"[{""name"":""David"",""age"":28},{""name"":""Eva"",""age"":22}]"
The expected result is the following flattened table:
id, name, age
1, Alice, 30
1, Bob, 25
2, Charlie, 35
3, David, 28
3, Eva, 22
The assumption is that the JSON data will be part of the data ingested from the source, and each record may have one or more attributes containing JSON data.
The JSON data may have a nested structure, and multiple (child) records may exist in JSON data.
An Incorta Materialized View (MV) will be used to load the JSON data into Incorta.
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType, ArrayType
import pyspark.sql.functions as F
# Read the Incorta source table into a Spark DataFrame (read() is provided in the Incorta MV notebook environment)
df = read("JSON_Process.SampleJSON")
Using read("<SchemaName>.<TableName>"), the data becomes available as a Spark DataFrame for processing.
Next, define a Spark schema that describes the JSON data structure. If you have multiple JSON data columns, create a separate Spark schema for each of them (see the sketch after the schema definition below).
json_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]))
In the sample, multiple person records are included in the JSON data column, so ArrayType() is used.
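If the source carried more than one JSON column, or if the objects themselves were nested, you would declare an additional schema in the same way. The sketch below is illustrative only: it assumes a hypothetical second column named "address_data" whose array elements contain a nested "geo" structure; neither name appears in the sample data.
# Hypothetical schema for a second JSON column, "address_data" (not in the sample),
# whose array elements contain a nested "geo" struct.
address_schema = ArrayType(
    StructType([
        StructField("city", StringType(), True),
        StructField("geo", StructType([
            StructField("lat", StringType(), True),
            StructField("lon", StringType(), True)
        ]), True)
    ]))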
Based on the defined schema, we use the from_json() function from Spark SQL functions to parse the "json_data" column.
df.select("id", F.from_json("json_data",json_schema )\
.alias("json_str"))\
.show()
The parsed data is assigned a temporary alias, "json_str", creating a new column with the extracted JSON structure.
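As an optional check that is not part of the original walkthrough, you could inspect the parsed structure with printSchema() to confirm that "json_str" is an array of structs:
# Optional verification: "json_str" should show up as array<struct<name:string,age:int>>
parsed_df = df.select("id", F.from_json("json_data", json_schema).alias("json_str"))
parsed_df.printSchema()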
Since the "json_data" column contains an array of JSON objects, we can use explode() to transform each array element into a separate row.
df.select("id", F.from_json("json_data",json_schema ).alias("json_str"))\
.select("id", F.explode("json_str").alias("item")).show()
The explode() function increases the number of rows, producing one output row per array element; for example, id 1 yields two rows, one for Alice and one for Bob.
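One caveat, added here beyond the original steps: explode() silently drops rows whose array is null or empty. If such source rows must be kept (with null name and age), Spark's explode_outer() can be used in the same position:
# Alternative to explode(): preserve rows with a null or empty JSON array
df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
    .select("id", F.explode_outer("json_str").alias("item"))\
    .show()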
Using dot notation, we can now access specific fields within the exploded JSON objects.
output_df = df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
    .select("id", F.explode("json_str").alias("item"))\
    .select("id", "item.name", "item.age")
save(output_df)
We create a final DataFrame (output_df) containing the "id" and the extracted "name" and "age" from the nested JSON data.
Finally, use the save() function to persist the output_df as the output of the materialized view.
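For reference, the complete MV script assembled from the steps above is sketched below; read() and save() are the Incorta MV notebook helpers used throughout this article.
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, ArrayType
import pyspark.sql.functions as F

# Read the source table as a Spark DataFrame
df = read("JSON_Process.SampleJSON")

# Schema describing the JSON array stored in the "json_data" column
json_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]))

# Parse, explode, and flatten the JSON into separate columns
output_df = df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
    .select("id", F.explode("json_str").alias("item"))\
    .select("id", "item.name", "item.age")

# Persist the result as the materialized view output
save(output_df)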
The MV notebook is available as an attachment and can be imported after unzipping the file to a .zpln file.