A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time. Tracking the history of dimension records is one of the most critical tasks in dimensional modeling, and it supports both "As Is" and "As Was" reporting. There are two ways to handle this:

Use case 1: The source table tracks changes via an effective date field and always inserts new rows on any change. For example, the PERSON table has (PERSON_ID, START_DATE) as its key columns.

This use case is easily supported in Incorta by creating a special join from the child transaction table to the parent source table above, as sketched below. The join has one equi join on the ID field (e.g. PERSON_ID) and one >= join from a date field of the transaction table to the effective date field of the source table (e.g. transaction_date >= START_DATE).
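For illustration, the logic this join expresses can be written in PySpark roughly as follows. This is a minimal sketch, assuming a child table Custom.TRANSACTIONS with a TRANSACTION_DATE column (placeholder names); in Incorta the join itself is defined in the schema, not written as code.

# Minimal sketch of the two join conditions (Custom.TRANSACTIONS and TRANSACTION_DATE are assumed names)
txn_df = read("Custom.TRANSACTIONS")   # child transaction table (assumed name)
person_df = read("Custom.PERSON")      # effective-dated parent table

joined_df = txn_df.join(
    person_df,
    (txn_df["PERSON_ID"] == person_df["PERSON_ID"]) &           # equi join on the ID
    (txn_df["TRANSACTION_DATE"] >= person_df["START_DATE"]),    # range join on the effective date
    "left"
)
# Note: in plain Spark this condition matches every earlier version of the person;
# the effective version is the matched row with the greatest START_DATE.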


Use case 2: The source table does not track changes and always updates existing rows in place.

We can create a Materialized View (MV) to support this use case as an SCD Type 2 dimension. The MV code below contains both the full and incremental load logic. It tracks changes to attributes such as BILLING_FREQUENCY, PRIMARY_SALES_REP, and BILLING_ANALYST for a sample CUSTOMER table that does not store any history. If a change is detected in any of those three fields for the same CUST_ID, it inserts a new row; otherwise it simply updates the existing row.

We also create a formula column for CURRENT_FLAG, which is set to Y if EFFECTIVE_TO_DT is in the future and N otherwise, e.g. expression: if(Custom.CUSTOMER_MV.EFFECTIVE_TO_DT > current date, 'Y', 'N')
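As a minimal sketch, the same flag could instead be materialized inside the MV script itself (assuming df is the MV DataFrame produced by the script below):

import pyspark.sql.functions as func

# Optional alternative to the formula column: compute CURRENT_FLAG in the MV itself
df = df.withColumn(
    "CURRENT_FLAG",
    func.when(func.col("EFFECTIVE_TO_DT") > func.current_date(), "Y").otherwise("N")
)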

  • As Was reporting (historical): Once CUSTOMER_MV is loaded, create a range join from the transaction table to this MV as a parent using the (CUST_ID, EFFECTIVE_FROM_DT, EFFECTIVE_TO_DT) columns (see the sketch after this list).
  • As Is reporting (current): Join the transaction table to the CUSTOMER table as a parent using the CUST_ID column.
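As a rough SQL illustration of these two join patterns (the actual joins are defined in the Incorta schema; TRANSACTIONS and TRANSACTION_DATE are placeholder names, and the tables are assumed to be registered as temp views):

# As Was (historical): range join on the effective-date window
as_was_df = spark.sql("""
   SELECT t.*, d.BILLING_FREQUENCY, d.PRIMARY_SALES_REP, d.BILLING_ANALYST
   FROM TRANSACTIONS t
   JOIN CUSTOMER_MV d
     ON d.CUST_ID = t.CUST_ID
    AND t.TRANSACTION_DATE >= d.EFFECTIVE_FROM_DT
    AND t.TRANSACTION_DATE <= d.EFFECTIVE_TO_DT
          """)

# As Is (current): plain equi join to the CUSTOMER source table
as_is_df = spark.sql("""
   SELECT t.*, c.BILLING_FREQUENCY, c.PRIMARY_SALES_REP, c.BILLING_ANALYST
   FROM TRANSACTIONS t
   JOIN CUSTOMER c
     ON c.CUST_ID = t.CUST_ID
          """)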

Script:

# Name of the SCD Type 2 MV: CUSTOMER_MV
# Tracks changes for attributes like BILLING_FREQUENCY, PRIMARY_SALES_REP, BILLING_ANALYST
# Keys of CUSTOMER_MV: (CUST_ID, EFFECTIVE_FROM_DT)

-------------Full Load Logic-------------
from pyspark.sql.window import Window
import pyspark.sql.functions as func

# Read the source CUSTOMER table and register it for Spark SQL
df_CUSTOMER = read("Custom.CUSTOMER")
df_CUSTOMER.createOrReplaceTempView("CUSTOMER")

# Initial load: every customer row becomes the current version,
# effective from its CREATE_DATE with an open-ended effective-to date
df = spark.sql("""
   SELECT
   CUST_ID,
   CUSTOMER_NUMBER,
   CUSTOMER_NAME,
   BILLING_FREQUENCY,
   PRIMARY_SALES_REP,
   BILLING_ANALYST,
   CREATE_DATE AS EFFECTIVE_FROM_DT,
   to_date('9999-12-31') AS EFFECTIVE_TO_DT
   FROM CUSTOMER
          """)
save(df)

-------------Incremental Load Logic-------------
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from datetime import date

# Read the existing MV rows (the SCD history built so far)
df_CUSTOMER_SCD = read("Custom.CUSTOMER_MV")
df_CUSTOMER_SCD.createOrReplaceTempView("CUSTOMER_MV")
df = spark.sql("""
   SELECT
   CUST_ID,
   CUSTOMER_NUMBER,
   CUSTOMER_NAME,
   BILLING_FREQUENCY,
   PRIMARY_SALES_REP,
   BILLING_ANALYST,
   EFFECTIVE_FROM_DT ,
   EFFECTIVE_TO_DT
   FROM CUSTOMER_MV
          """)

# Keep only the current (open-ended) versions from the MV and drop EFFECTIVE_TO_DT
# so the schema matches the incoming changed rows below
last_df = df.filter("EFFECTIVE_TO_DT = '9999-12-31'").drop('EFFECTIVE_TO_DT')

# Read the source rows modified since the last load
# (? in the query below is replaced by Incorta with the last successful load time)
df_CUSTOMER = read("Custom.CUSTOMER")
df_CUSTOMER.createOrReplaceTempView("CUSTOMER")
idf = spark.sql("""
   SELECT
   CUST_ID,
   CUSTOMER_NUMBER,
   CUSTOMER_NAME,
   BILLING_FREQUENCY,
   PRIMARY_SALES_REP,
   BILLING_ANALYST,
   CREATE_DATE EFFECTIVE_FROM_DT
   FROM CUSTOMER
   WHERE UPDATE_DATE > ?
          """)

# Combine the current MV versions with the newly changed rows, and order each
# customer's versions by effective-from date
union_df = last_df.union(idf)
windowSpec = Window.partitionBy(union_df['CUST_ID']).orderBy(union_df['EFFECTIVE_FROM_DT'].asc())

# For each version, pull in the previous version's tracked attribute values
compare_df = union_df.withColumn("prev_BILLING_FREQUENCY", func.lag(union_df['BILLING_FREQUENCY']).over(windowSpec)) \
                     .withColumn("prev_PRIMARY_SALES_REP", func.lag(union_df['PRIMARY_SALES_REP']).over(windowSpec)) \
                     .withColumn("prev_BILLING_ANALYST", func.lag(union_df['BILLING_ANALYST']).over(windowSpec))

# Keep rows where any tracked attribute changed (or the customer is new),
# and close each version on the day before the next version starts
updates_df = compare_df.filter("BILLING_FREQUENCY != prev_BILLING_FREQUENCY OR PRIMARY_SALES_REP != prev_PRIMARY_SALES_REP OR BILLING_ANALYST != prev_BILLING_ANALYST OR prev_BILLING_FREQUENCY IS NULL") \
                       .withColumn("EFFECTIVE_TO_DT", func.coalesce(func.date_sub(func.lead(compare_df['EFFECTIVE_FROM_DT']).over(windowSpec), 1), func.lit(date(9999, 12, 31)))) \
                       .drop(*[c for c in compare_df.columns if c.startswith("prev")])

# All versions (changed or not) with recomputed EFFECTIVE_TO_DT values
compare_df1 = compare_df.withColumn("EFFECTIVE_TO_DT", func.coalesce(func.date_sub(func.lead(compare_df['EFFECTIVE_FROM_DT']).over(windowSpec), 1), func.lit(date(9999, 12, 31)))) \
                        .drop(*[c for c in compare_df.columns if c.startswith("prev")])

compare_df1.createOrReplaceTempView("compare_rows")
updates_df.createOrReplaceTempView("updated_rows")

# Keep every version for the customers that have at least one change
df1 = spark.sql("""
select *
from compare_rows
where CUST_ID in (select CUST_ID from updated_rows)
          """)

# Untouched historical (closed) rows plus the recomputed versions for customers with changes
result = df.filter("EFFECTIVE_TO_DT != '9999-12-31'").union(df1)
save(result)