Unable to save the MV when creating a table from JSON keys

LB123
Rocketeer

I am trying to create a table by reading a JSON object and deriving the table columns dynamically from the JSON keys. I can display the data in the notebook and get the expected result, but I am unable to save the schema when the table columns are derived dynamically. If I hard-code the column names, the save succeeds. I have even set the property spark.dataframe.sampling.enabled to false.

 Failed to generate schema
ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1

 

2 REPLIES

dylanwan
Employee

Although Incorta performs initial schema inference when an MV is saved for the first time, its ongoing incremental refresh relies on the saved schema definition. Getting the table columns dynamically will not work without special handling to match the target schema.

Can you share the code or provide us more details?
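To illustrate the point about matching the target schema, here is a minimal plain-Python sketch (not an Incorta API; `saved_columns` is a hypothetical stand-in for the column list captured at the first MV save). The idea is to project every dynamically flattened record onto that fixed column list so the output shape never changes between refreshes:

```python
import json

# Hypothetical saved schema: the column list captured when the MV was first saved.
saved_columns = ["order_id", "status", "amount"]

def align_to_saved_schema(flat_record, columns):
    """Project a dynamically flattened record onto the fixed saved schema.

    Keys missing from the record become None; keys not in the saved
    schema are dropped, so the output shape stays stable between runs.
    """
    return {c: flat_record.get(c) for c in columns}

# A record whose keys drift from the saved schema: "amount" missing, "currency" new.
record = json.loads('{"order_id": "1001", "status": "shipped", "currency": "USD"}')
aligned = align_to_saved_schema(record, saved_columns)
# aligned == {"order_id": "1001", "status": "shipped", "amount": None}
```

Dropping unknown keys (rather than appending them) is deliberate: an incremental refresh cannot add columns to an already-saved schema.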

Hey Dylanwan,

Let me explain what I am doing. I have a JSON object in one of the schemas. I am trying to extract the JSON keys dynamically, use those keys as column names, and create a table from them. I can extract the data, and I see both the columns and the data when I display them in the notebook, but when I save the MV I get the error below. The JSON object is also encrypted, so I decrypt it as part of this logic. I have already set the property spark.dataframe.sampling.enabled to false.

 Failed to generate schema
ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1


Below is the code snippet. 

%pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, explode, trim, from_json, coalesce, length, map_keys, lit
from pyspark.sql.types import MapType, StringType
import json

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Function to flatten JSON
def smart_flatten(json_str):
    def flatten(y, prefix=''):
        out = {}
        if isinstance(y, dict):
            if not y:
                out[prefix.lower()] = "{}"
            else:
                for k, v in y.items():
                    new_prefix = f"{prefix}_{k}".lower() if prefix else k.lower()
                    out.update(flatten(v, new_prefix))
        elif isinstance(y, list):
            if not y:
                out[prefix.lower()] = "[]"
            elif all(isinstance(i, dict) for i in y):
                for idx, item in enumerate(y):
                    for k, v in item.items():
                        out.update(flatten(v, f"{prefix}_{idx}_{k}".lower()))
            else:
                for idx, item in enumerate(y):
                    out[f"{prefix}_{idx}".lower()] = item
        else:
            out[prefix.lower()] = str(y)
        return out

    try:
        parsed = json.loads(json_str)
        flat = flatten(parsed)
        return json.dumps(flat)
    except Exception as e:
        return json.dumps({"error": str(e)})

# UDF to flatten JSON (applied upstream to produce the "flattened_json" column)
extract_nested_keys_udf = udf(smart_flatten, StringType())

# Assuming `flattened_table` is defined elsewhere in the code
flattened_table = flattened_table.filter(col("flattened_json").isNotNull() & (length(col("flattened_json")) > 2))

# Convert flattened JSON to MapType
flattened_with_map = flattened_table.withColumn("json_map", from_json(col("flattened_json"), MapType(StringType(), StringType()))).filter(col("json_map").isNotNull())
flattened_with_keys = flattened_with_map.withColumn("json_keys", map_keys(col("json_map")))

# Extract distinct JSON keys
json_key_list = (flattened_with_keys
    .select(explode(col("json_keys")).alias("json_key"))
    .distinct()
    .rdd
    .map(lambda row: row["json_key"])
    .collect())
json_key_list = [k for k in json_key_list if k and k.strip()]

# Rename columns to safe names
for col_name in flattened_with_map.columns:
    safe_name = col_name.strip().replace(" ", "_").lower()
    flattened_with_map = flattened_with_map.withColumnRenamed(col_name, safe_name)

# Select columns based on JSON keys
final_df = flattened_with_map.select(
    *[coalesce(col("json_map").getItem(k), lit(None)).alias(k) for k in json_key_list]
)
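One thing worth checking: because the keys are collected across all rows but each individual row only carries its own keys, rows whose key sets differ can confuse schema generation at save time. A plain-Python sketch of the normalization idea (assuming rows of already-flattened JSON, not Incorta-specific code): pad every record to the union of keys before any schema is derived from it.

```python
import json

# Two flattened rows whose key sets differ; a schema derived from a sample
# that happens to miss a key is one way a dynamic column list goes stale.
rows = [
    json.loads('{"id": "1", "status": "new"}'),
    json.loads('{"id": "2", "amount": "9.99"}'),
]

# Union of keys across all rows, sorted for a deterministic column order.
all_keys = sorted({k for r in rows for k in r})

# Pad every row so each one exposes the full key set before any schema
# is derived from it; missing values become None.
normalized = [{k: r.get(k) for k in all_keys} for r in rows]
# normalized[0] == {"amount": None, "id": "1", "status": "new"}
```

Deterministic ordering matters here too: if the column order changes between runs, the saved MV schema and the refreshed DataFrame can disagree even when the key set is identical.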