
How do we create a Slowly Changing Dimension (SCD) Type 2 dimension in Incorta?

A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time. Tracking the history of dimension records is one of the most critical tasks in dimensional modeling, and an SCD Type 2 dimension supports both "As Is" and "As Was" reporting.

We can create a Materialized View (MV) to support the SCD Type 2 use case. The MV code below contains both the full and incremental load logic. It tracks changes to the BILLING_FREQUENCY, PRIMARY_SALES_REP, and BILLING_ANALYST attributes of a sample Customer table that does not itself store any history: if any of those three fields changes for a given CUST_ID, a new row is inserted; otherwise the existing row is simply updated.
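As a minimal illustration of that rule in plain Python (hypothetical helper names, separate from the Spark script below): a new version row is appended only when one of the three tracked attributes changes, and the previous version is closed the day before the new one starts.

```python
from datetime import date, timedelta

TRACKED = ("BILLING_FREQUENCY", "PRIMARY_SALES_REP", "BILLING_ANALYST")
OPEN_END = date(9999, 12, 31)

def apply_change(history, incoming, effective_from):
    """Append a new SCD Type 2 version for one CUST_ID only if a
    tracked attribute changed; otherwise leave the open row as-is."""
    current = history[-1] if history else None
    if current is not None and all(current[c] == incoming[c] for c in TRACKED):
        return history  # no tracked change: keep the current open version
    if current is not None:
        # close the previous version the day before the new one starts
        current["EFFECTIVE_TO_DT"] = effective_from - timedelta(days=1)
    history.append(dict(incoming,
                        EFFECTIVE_FROM_DT=effective_from,
                        EFFECTIVE_TO_DT=OPEN_END))
    return history
```

Running this for one customer with a BILLING_FREQUENCY change produces two version rows whose date ranges tile without overlap, which is exactly the shape the MV below maintains at scale.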

We also create a formula column for CURRENT_FLAG, set to Y if EFFECTIVE_TO_DT is in the future and N otherwise, e.g. the expression: if(Custom.CUSTOMER_MV.EFFECTIVE_TO_DT > current date, 'Y', 'N')
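In plain Python terms (a sketch of the logic only, not the Incorta formula engine), the flag is just a date comparison against the open-ended 9999-12-31 sentinel:

```python
from datetime import date

def current_flag(effective_to_dt, today=None):
    """Mirror of the CURRENT_FLAG formula: 'Y' while the version is open,
    i.e. while its end date is still in the future."""
    today = today or date.today()
    return 'Y' if effective_to_dt > today else 'N'
```

The open row always carries EFFECTIVE_TO_DT = 9999-12-31, so exactly one version per CUST_ID evaluates to Y.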

  • As Was Reporting (historical): once CUSTOMER_MV is loaded, create a range join from the transaction table to this MV as a parent using the (CUST_ID, EFFECTIVE_FROM_DT, EFFECTIVE_TO_DT) columns.
  • As Is Reporting (current): join the transaction table to the CUSTOMER table as a parent using the CUST_ID column.
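To see how the range join resolves each transaction to the dimension version that was effective on the transaction date, here is a small SQL sketch (hypothetical ORDERS table and sample values; SQLite is used purely for illustration, whereas in Incorta this is configured as a parent range join):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE CUSTOMER_MV (CUST_ID INT, PRIMARY_SALES_REP TEXT,
                          EFFECTIVE_FROM_DT TEXT, EFFECTIVE_TO_DT TEXT);
CREATE TABLE ORDERS (ORDER_ID INT, CUST_ID INT, ORDER_DATE TEXT);
-- two versions of the same customer: rep changed on 2023-06-01
INSERT INTO CUSTOMER_MV VALUES
  (1, 'SMITH', '2023-01-01', '2023-05-31'),
  (1, 'JONES', '2023-06-01', '9999-12-31');
INSERT INTO ORDERS VALUES (100, 1, '2023-03-15'), (101, 1, '2023-07-01');
""")
# "As Was": the order date must fall inside the version's effective range
rows = con.execute("""
SELECT o.ORDER_ID, d.PRIMARY_SALES_REP
FROM ORDERS o
JOIN CUSTOMER_MV d
  ON o.CUST_ID = d.CUST_ID
 AND o.ORDER_DATE BETWEEN d.EFFECTIVE_FROM_DT AND d.EFFECTIVE_TO_DT
ORDER BY o.ORDER_ID
""").fetchall()
```

Order 100 resolves to the rep who owned the account in March, order 101 to the rep after the change, which is the "As Was" view; an "As Is" join on CUST_ID alone against the current CUSTOMER row would report both orders under the latest rep.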

Script:

# Name of the SCD Type 2 MV: CUSTOMER_MV
# Tracks changes to BILLING_FREQUENCY, PRIMARY_SALES_REP, BILLING_ANALYST
# Keys of CUSTOMER_MV: (CUST_ID, EFFECTIVE_FROM_DT)

# ------------- Full Load Logic -------------
from pyspark.sql.window import Window
import pyspark.sql.functions as func

df_CUSTOMER = read("Custom.CUSTOMER")
df_CUSTOMER.createOrReplaceTempView("CUSTOMER")
df = spark.sql("""
   SELECT
   CUST_ID,
   CUSTOMER_NUMBER,
   CUSTOMER_NAME,
   BILLING_FREQUENCY,
   PRIMARY_SALES_REP,
   BILLING_ANALYST,
   CREATE_DATE AS EFFECTIVE_FROM_DT,
   to_date('9999-12-31') AS EFFECTIVE_TO_DT
   FROM CUSTOMER
          """)
save(df)

# ------------- Incremental Load Logic -------------
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from datetime import date

#read the MV
df_CUSTOMER_SCD = read("Custom.CUSTOMER_MV")
df_CUSTOMER_SCD.createOrReplaceTempView("CUSTOMER_MV")
df = spark.sql("""
   SELECT
   CUST_ID,
   CUSTOMER_NUMBER,
   CUSTOMER_NAME,
   BILLING_FREQUENCY,
   PRIMARY_SALES_REP,
   BILLING_ANALYST,
   EFFECTIVE_FROM_DT,
   EFFECTIVE_TO_DT
   FROM CUSTOMER_MV
          """)

last_df = df.filter("EFFECTIVE_TO_DT = '9999-12-31'").drop('EFFECTIVE_TO_DT')

# read the rows modified since the last load (Incorta substitutes ? with the last successful transformation timestamp)
df_CUSTOMER = read("Custom.CUSTOMER")
df_CUSTOMER.createOrReplaceTempView("CUSTOMER")
idf = spark.sql("""
   SELECT
   CUST_ID,
   CUSTOMER_NUMBER,
   CUSTOMER_NAME,
   BILLING_FREQUENCY,
   PRIMARY_SALES_REP,
   BILLING_ANALYST,
   CREATE_DATE EFFECTIVE_FROM_DT
   FROM CUSTOMER
   WHERE UPDATE_DATE > ?
          """)

union_df = last_df.union(idf)
windowSpec = Window.partitionBy(union_df['CUST_ID']).orderBy(union_df['EFFECTIVE_FROM_DT'].asc())

compare_df = (union_df
    .withColumn("prev_BILLING_FREQUENCY", func.lag(union_df['BILLING_FREQUENCY']).over(windowSpec))
    .withColumn("prev_PRIMARY_SALES_REP", func.lag(union_df['PRIMARY_SALES_REP']).over(windowSpec))
    .withColumn("prev_BILLING_ANALYST", func.lag(union_df['BILLING_ANALYST']).over(windowSpec)))

# keep only the rows where a tracked attribute changed (or the first row per CUST_ID)
updates_df = (compare_df
    .filter("BILLING_FREQUENCY != prev_BILLING_FREQUENCY OR PRIMARY_SALES_REP != prev_PRIMARY_SALES_REP OR BILLING_ANALYST != prev_BILLING_ANALYST OR prev_BILLING_FREQUENCY IS NULL")
    .withColumn("EFFECTIVE_TO_DT", func.coalesce(func.date_sub(func.lead(compare_df['EFFECTIVE_FROM_DT']).over(windowSpec), 1), func.lit(date(9999, 12, 31))))
    .drop(*[c for c in compare_df.columns if c.startswith("prev")]))

# each version ends the day before the next one starts; the latest stays open-ended
compare_df1 = (compare_df
    .withColumn("EFFECTIVE_TO_DT", func.coalesce(func.date_sub(func.lead(compare_df['EFFECTIVE_FROM_DT']).over(windowSpec), 1), func.lit(date(9999, 12, 31))))
    .drop(*[c for c in compare_df.columns if c.startswith("prev")]))

compare_df1.createOrReplaceTempView("compare_rows")
updates_df.createOrReplaceTempView("updated_rows")

df1 = spark.sql("""
select *
from compare_rows
where CUST_ID in (select CUST_ID from updated_rows)
          """)

# keep the already-closed history rows and append the recomputed versions
result = df.filter("EFFECTIVE_TO_DT != '9999-12-31'").union(df1)
save(result)

 

Posted by Amit Kothari
