on 12-06-2024 11:34 AM
- Symptoms
- Diagnosis
- Solution
- 1. Read Data into Spark
- 2. Create JSON Schema
- 3. Parse JSON and Create a Temporary Column
- 4. Explode Nested JSON Array
- 5. Save the Final DataFrame
- Download
Symptoms
JSON data is provided by the source system for sharing with Incorta. The JSON has a nested structure, and we would like to transform it into a tabular format and load the data as a flattened structure.
Here is a sample CSV file that includes the JSON data:
id,json_data
1,"[{""name"":""Alice"",""age"":30},{""name"":""Bob"",""age"":25}]"
2,"[{""name"":""Charlie"",""age"":35}]"
3,"[{""name"":""David"",""age"":28},{""name"":""Eva"",""age"":22}]"
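Before moving to Spark, it can help to see what each json_data cell actually contains. The following is a minimal plain-Python sketch (standard library only, not Incorta or Spark code) that parses the sample CSV above; the doubled quotes in the file are ordinary CSV escaping:

```python
import csv
import io
import json

# Sample CSV content from the article; "" inside a quoted field is CSV escaping for ".
raw = '''id,json_data
1,"[{""name"":""Alice"",""age"":30},{""name"":""Bob"",""age"":25}]"
2,"[{""name"":""Charlie"",""age"":35}]"
3,"[{""name"":""David"",""age"":28},{""name"":""Eva"",""age"":22}]"
'''

rows = list(csv.DictReader(io.StringIO(raw)))

# After CSV parsing, each json_data cell is a valid JSON array string.
first = json.loads(rows[0]["json_data"])
print(first)  # [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 25}]
```

Each cell holds a JSON array of objects, which is why the rest of the article parses it with an array schema and then explodes it.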
The expected result is the following flattened table:

| id | name | age |
|----|---------|-----|
| 1 | Alice | 30 |
| 1 | Bob | 25 |
| 2 | Charlie | 35 |
| 3 | David | 28 |
| 3 | Eva | 22 |
Diagnosis
The assumption is that the JSON data is part of the data ingested from the source. Each record may have one or more attributes containing JSON data.
The JSON may have a nested structure, and a single JSON value may contain multiple (child) records.
An Incorta Materialized View (MV) will be used to load the JSON data into Incorta.
Solution
1. Read Data into Spark
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType, ArrayType
import pyspark.sql.functions as F
df = read("JSON_Process.SampleJSON")
Using read("&lt;SchemaName&gt;.&lt;TableName&gt;"), the data becomes available as a Spark DataFrame for processing.
2. Create JSON Schema
Define a Spark schema that describes the JSON data structure. If the table has multiple JSON data columns, create a separate schema for each one.
json_schema = ArrayType(
    StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
)
In the sample, the JSON data column contains multiple person records, so ArrayType() is used as the outer type.
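What the schema promises can be illustrated outside Spark. The following hedged stdlib sketch (not Spark code) checks that each parsed record has a string "name" and an integer "age", matching the fields declared above:

```python
import json

# Plain-Python analogue of the declared Spark schema:
# an array of structs with a string "name" and an integer "age".
expected_fields = {"name": str, "age": int}

cell = '[{"name":"Alice","age":30},{"name":"Bob","age":25}]'
records = json.loads(cell)

# Check that every record matches the declared field names and types.
ok = all(
    isinstance(rec.get(field), ftype)
    for rec in records
    for field, ftype in expected_fields.items()
)
print(ok)  # True
```

In Spark, values that do not match the declared schema are not flagged this explicitly (how mismatches surface depends on the parse behavior of from_json), so defining the schema to match the source data exactly is important.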
3. Parse JSON and Create a Temporary Column
Based on the defined schema, we use the from_json() function from pyspark.sql.functions to parse the "json_data" column.
df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
  .show()
The parsed data is assigned a temporary alias, "json_str", creating a new column with the extracted JSON structure.
4. Explode Nested JSON Array
Since "json_data" contains an array of JSON objects, we use explode() to transform each array element into a separate row.
df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
  .select("id", F.explode("json_str").alias("item")).show()
The explode() function increases the number of rows: each array element becomes its own row, with the parent "id" repeated for every element of its array.
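The effect of explode() can be sketched in plain Python: each (id, array) pair is expanded into one (id, element) pair per array element. This is only an illustration of the semantics, not Spark code:

```python
import json

# id -> JSON array string, as in the sample data.
source = {
    1: '[{"name":"Alice","age":30},{"name":"Bob","age":25}]',
    2: '[{"name":"Charlie","age":35}]',
    3: '[{"name":"David","age":28},{"name":"Eva","age":22}]',
}

# "Explode": one output row per array element, with the parent id repeated.
exploded = [
    (row_id, item)
    for row_id, cell in source.items()
    for item in json.loads(cell)
]
print(len(exploded))  # 5 rows produced from 3 input rows
```

Three input rows become five output rows, matching the row counts in the expected flattened result.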
5. Save the Final DataFrame
Using dot notation, we can now access specific fields within the exploded JSON objects.
output_df = df.select("id", F.from_json("json_data", json_schema).alias("json_str"))\
    .select("id", F.explode("json_str").alias("item"))\
    .select("id", "item.name", "item.age")
save(output_df)
We create a final DataFrame (output_df) containing the "id" and the extracted "name" and "age" from the nested JSON data.
Finally, use the save() function to persist the output_df as the output of the materialized view.
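For reference, the whole pipeline (parse the JSON column, explode the array, project the struct fields) can be mirrored in plain Python. This is a hedged stdlib sketch of the transformation, not the Incorta MV itself:

```python
import json

# (id, json_data) pairs, as in the sample CSV.
source = [
    (1, '[{"name":"Alice","age":30},{"name":"Bob","age":25}]'),
    (2, '[{"name":"Charlie","age":35}]'),
    (3, '[{"name":"David","age":28},{"name":"Eva","age":22}]'),
]

# Parse each JSON cell, explode the array, then select id, name, and age.
flattened = [
    (row_id, item["name"], item["age"])
    for row_id, cell in source
    for item in json.loads(cell)
]
for row in flattened:
    print(row)  # e.g. (1, 'Alice', 30)
```

The resulting rows correspond one-to-one to the rows of output_df that save() persists as the materialized view's output.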
Download
The MV notebook is available as an attachment; unzip the file to obtain a .zpln file that can be imported.