A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time. Tracking the history of dimension records is one of the most critical tasks in dimensional modeling, and it supports both "As Is" and "As Was" reporting. There are two use cases to consider:
Use case 1: The source table tracks changes via an effective date field and always inserts new rows when anything changes. For example, a person table whose key columns are (PERSON_ID, START_DATE).
Incorta supports this use case directly: create a special join from the child transaction table to the parent source table above. The join has one equi join on the ID field (for example, PERSON_ID) and one >= join from a date field of the transaction table to the effective date field of the source table (for example, transaction_date >= START_DATE), so each transaction resolves to the dimension version that was in effect on its date.
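For illustration only, the lookup this join performs is conceptually equivalent to the Spark SQL below. The table name transactions and the column TRANSACTION_ID are hypothetical, person follows the example above, and this sketch is not Incorta's internal implementation:

# pick, for each transaction, the person version in effect on its transaction_date
aswas_df = spark.sql("""
SELECT t.TRANSACTION_ID, t.transaction_date, p.PERSON_ID, p.START_DATE
FROM transactions t
JOIN person p ON p.PERSON_ID = t.PERSON_ID
WHERE p.START_DATE = (SELECT MAX(p2.START_DATE)
                      FROM person p2
                      WHERE p2.PERSON_ID = t.PERSON_ID
                        AND p2.START_DATE <= t.transaction_date)
""")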
Use case 2: The source table does not track changes and always updates existing rows in place.
We can create a Materialized View (MV) to support this use case as an SCD Type 2 dimension. The MV code below contains both the full and incremental load logic. It tracks changes to the attributes BILLING_FREQUENCY, PRIMARY_SALES_REP, and BILLING_ANALYST for a sample Customer table that stores no history. Whenever any of those three fields changes for a given CUST_ID, it inserts a new version row and closes the previous version's effective date range; otherwise it just updates the existing row.
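For illustration (with hypothetical dates), suppose customer 101 is created on 2021-01-10 with BILLING_FREQUENCY = MONTHLY, and the source row is later updated to QUARTERLY on 2022-03-15. After the incremental load the MV would hold two versions:

CUST_ID | BILLING_FREQUENCY | EFFECTIVE_FROM_DT | EFFECTIVE_TO_DT
101     | MONTHLY           | 2021-01-10        | 2022-03-14
101     | QUARTERLY         | 2022-03-15        | 9999-12-31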
We also create a formula column CURRENT_FLAG, set to Y if EFFECTIVE_TO_DT is in the future and to N otherwise, e.g. expression: if(Custom.CUSTOMER_MV.EFFECTIVE_TO_DT > current date, 'Y', 'N')
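Alternatively, the flag can be materialized inside the MV itself. A minimal PySpark sketch, assuming df is the final DataFrame just before save():

import pyspark.sql.functions as func
# Y while the version is still open (EFFECTIVE_TO_DT in the future), N once it is closed
df = df.withColumn("CURRENT_FLAG",
    func.when(func.col("EFFECTIVE_TO_DT") > func.current_date(), func.lit("Y")).otherwise(func.lit("N")))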
Script:
# Name of the SCD Type 2 MV: CUSTOMER_MV
# Tracks changes for the attributes BILLING_FREQUENCY, PRIMARY_SALES_REP, BILLING_ANALYST
# Keys of CUSTOMER_MV: (CUST_ID, EFFECTIVE_FROM_DT)
-------------Full Load Logic-------------
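# Full load: snapshot every customer as its first version, with an open-ended EFFECTIVE_TO_DT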
from pyspark.sql.window import Window
import pyspark.sql.functions as func
df_CUSTOMER = read("Custom.CUSTOMER")
df_CUSTOMER.createOrReplaceTempView("CUSTOMER")
df = spark.sql("""
SELECT
CUST_ID,
CUSTOMER_NUMBER,
CUSTOMER_NAME,
BILLING_FREQUENCY,
PRIMARY_SALES_REP,
BILLING_ANALYST,
CREATE_DATE AS EFFECTIVE_FROM_DT,
to_date('9999-12-31') AS EFFECTIVE_TO_DT
FROM CUSTOMER
""")
save(df)
-------------Incremental Load Logic-------------
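# Incremental load: detect changed source rows, insert new versions, and close the superseded ones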
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from datetime import date
# read the MV as saved so far (all current and historical versions)
df_CUSTOMER_SCD = read("Custom.CUSTOMER_MV")
df_CUSTOMER_SCD.createOrReplaceTempView("CUSTOMER_MV")
df = spark.sql("""
SELECT
CUST_ID,
CUSTOMER_NUMBER,
CUSTOMER_NAME,
BILLING_FREQUENCY,
PRIMARY_SALES_REP,
BILLING_ANALYST,
EFFECTIVE_FROM_DT,
EFFECTIVE_TO_DT
FROM CUSTOMER_MV
""")
last_df = df.filter("EFFECTIVE_TO_DT = '9999-12-31'").drop('EFFECTIVE_TO_DT')
# read the source rows modified since the last load (? is Incorta's placeholder for the last successful load time)
df_CUSTOMER = read("Custom.CUSTOMER")
df_CUSTOMER.createOrReplaceTempView("CUSTOMER")
idf = spark.sql("""
SELECT
CUST_ID,
CUSTOMER_NUMBER,
CUSTOMER_NAME,
BILLING_FREQUENCY,
PRIMARY_SALES_REP,
BILLING_ANALYST,
UPDATE_DATE AS EFFECTIVE_FROM_DT
FROM CUSTOMER
WHERE UPDATE_DATE > ?
""")
union_df = last_df.union(idf)
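# order each customer's versions chronologically so lag/lead can compare neighboring versions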
windowSpec = Window.partitionBy(union_df['CUST_ID']).orderBy(union_df['EFFECTIVE_FROM_DT'].asc())
# pull each tracked attribute's previous value into the current row
compare_df = (union_df
    .withColumn("prev_BILLING_FREQUENCY", func.lag(union_df['BILLING_FREQUENCY']).over(windowSpec))
    .withColumn("prev_PRIMARY_SALES_REP", func.lag(union_df['PRIMARY_SALES_REP']).over(windowSpec))
    .withColumn("prev_BILLING_ANALYST", func.lag(union_df['BILLING_ANALYST']).over(windowSpec)))
# keep the rows where a tracked attribute changed, plus each customer's first version (prev is NULL)
updates_df = (compare_df
    .filter("BILLING_FREQUENCY != prev_BILLING_FREQUENCY OR PRIMARY_SALES_REP != prev_PRIMARY_SALES_REP OR BILLING_ANALYST != prev_BILLING_ANALYST OR prev_BILLING_FREQUENCY IS NULL")
    .withColumn("EFFECTIVE_TO_DT", func.coalesce(func.date_sub(func.lead(compare_df['EFFECTIVE_FROM_DT']).over(windowSpec), 1), func.lit(date(9999, 12, 31))))
    .drop(*[c for c in compare_df.columns if c.startswith("prev")]))
# recompute EFFECTIVE_TO_DT for every version: one day before the next version starts, or 9999-12-31 for the open version
compare_df1 = (compare_df
    .withColumn("EFFECTIVE_TO_DT", func.coalesce(func.date_sub(func.lead(compare_df['EFFECTIVE_FROM_DT']).over(windowSpec), 1), func.lit(date(9999, 12, 31))))
    .drop(*[c for c in compare_df.columns if c.startswith("prev")]))
compare_df1.createOrReplaceTempView("compare_rows")
updates_df.createOrReplaceTempView("updated_rows")
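# keep every recomputed version for the customers that appear in updated_rows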
df1 = spark.sql("""
select *
from compare_rows
where CUST_ID in (select CUST_ID from updated_rows)
""")
result = df.filter("EFFECTIVE_TO_DT != '9999-12-31'").union(df1)
save(result)
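After a load, a quick sanity check is to confirm that each customer has exactly one open version. A minimal sketch, reusing the CUSTOMER_MV temp view registered above (the query should return zero rows):

check_df = spark.sql("""
SELECT CUST_ID, COUNT(*) AS open_versions
FROM CUSTOMER_MV
WHERE EFFECTIVE_TO_DT = '9999-12-31'
GROUP BY CUST_ID
HAVING COUNT(*) != 1
""")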