
Creating Materialized Views using PySpark

Incorta allows you to create Materialized Views using Python and Spark to read data from the Parquet files of existing Incorta Tables, transform it, and persist the result so that it can be used in Dashboards.

   

We can leverage the power of scripting via Python and Spark to handle many use cases, for example:

  • Creating bridge tables to resolve many-to-many relations.
  • Creating an aggregate table from a detail table and joining it as a parent table. For example, suppose we have a line table and a distribution table at a lower grain than the line table. We want to keep the dashboard query with the line table as the base table and only need an aggregated metric from the distribution table. An MV can then be created on the distribution table, keyed by the line table's key, and joined as a parent table to the line table (a sketch follows this list).
  • Converting Oracle PL/SQL procedures where multi-pass logic is required.
  • Predictive analytics using Spark ML and NumPy.
  • Data quality analysis.
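
For example, here is a minimal sketch of the aggregate-table pattern from the second use case. The schema and column names (SALES.INVOICE_DISTRIBUTIONS, LINE_ID, DIST_AMOUNT) are hypothetical; the idea is to roll the lower-grain distribution table up to the line grain so the resulting MV can be joined to the line table as a parent table on LINE_ID:

    # Hypothetical example: aggregate the lower-grain distribution table
    # up to the grain of the line table.
    df_dist = read("SALES.INVOICE_DISTRIBUTIONS")
    df_dist.createOrReplaceTempView("distributions")

    df_agg = spark.sql("""
      SELECT LINE_ID,                              -- key of the line table
             SUM(DIST_AMOUNT) AS TOTAL_DIST_AMOUNT -- aggregated metric
      FROM distributions
      GROUP BY LINE_ID
    """)

    save(df_agg)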

Best Practices:

  • Use SQL as the language if the MV can be expressed as a single query. For more complex use cases, select Python as the language.
  • Rather than denormalizing the dimensional attributes into the MV itself, try creating an MV that has only ID and metric fields, so that the ID fields can be joined to the keys of the parent tables to resolve the dimensional attributes.
  • Incremental logic can be supported in an MV if a proper key can be defined on it.
    • We can use ? inside the MV SQL; ? represents the last successful transformation timestamp of the MV, similar to the ? used inside our regular SQL queries. The value is stored in a long format representing a timestamp. For example:

      SELECT * FROM incorta_metadata.DASHBOARD WHERE CREATIONDATE > ?

    • The most accurate way to run incremental MVs is to select the max date from the MV itself, so the incremental data is picked up accurately.

      Here is a simple example.

      s = read("SALES.SALES")
      s.registerTempTable("sales")

      p = read("SALES.PRODUCTS")
      p.registerTempTable("products")

      # Here we read the MV itself to get its latest timestamp.
      mv = read("SALES_MV.SALES_PRODUCTS")
      mv.registerTempTable("mv")

      selectedSales = sqlContext.sql("""
          SELECT p.name, s.*
          FROM products p
          INNER JOIN sales s ON p.id = s.product_id
          WHERE s.last_updated > (SELECT MAX(mv.last_updated) FROM mv)
      """)

      save(selectedSales)
    • In an incremental MV, make sure that the field order, field names (they are case sensitive), and field datatypes are the same in the full and incremental parts of the code. If the datatypes are not the same, use the CAST function, e.g. Timestamp to Date: CAST(the_ts AS DATE) AS the_date (a sketch follows this list).
    • One more way to do incremental logic in an MV, when we always want to refresh the last 30 days' worth of data:
      • # Filtering je_lines by creation date
        from pyspark.sql.functions import lit, when, col, coalesce
        from datetime import datetime, timedelta

        df_GL_JE_LINES = read("EBS_GL.GL_JE_LINES")
        d = datetime.today() - timedelta(days=30)
        df_GL_JE_LINES = df_GL_JE_LINES.filter(df_GL_JE_LINES.EFFECTIVE_DATE > lit(d))
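
As a rough sketch of the field-consistency point above (the table is the same EBS_GL.GL_JE_LINES used in the 30-day example; the selected columns are illustrative), the full and incremental parts of an MV can share one SELECT list so that field order, case-sensitive field names, and datatypes stay identical, with CAST coercing the timestamp to a date:

    df = read("EBS_GL.GL_JE_LINES")
    df.createOrReplaceTempView("je_lines")

    # SELECT list shared by the full and incremental parts of the MV:
    # same field order, same case-sensitive names, CAST where datatypes differ.
    select_list = """
      SELECT JE_HEADER_ID,
             JE_LINE_NUM,
             CAST(EFFECTIVE_DATE AS DATE) AS EFFECTIVE_DATE_D
      FROM je_lines
    """

    # Full part: the SELECT list as-is.
    df_full = spark.sql(select_list)

    # Incremental part: the identical SELECT list with only a time filter appended.
    df_incr = spark.sql(select_list + " WHERE EFFECTIVE_DATE > date_sub(current_date(), 30)")

    save(df_incr)   # the full-load part would call save(df_full) instead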

Here are some sample scripts:

1) Shows how to create an MV using DataFrames and joining them via SQL. The base tables are in an Incorta schema called TEST and should already have been loaded.

------------
from pyspark.sql import functions as F

df_customers = read("TEST.customers")
df_customers.createOrReplaceTempView("customers_table")
df_submitters = read("TEST.submitters")
df_submitters.createOrReplaceTempView("submitters_table")
df = spark.sql("""
  SELECT DISTINCT a.submitter_id,
                   a.submitter_name,
                   a.submitter_type,
                   coalesce(upper(b.revenuegroupname), upper(a.customer_name)) AS submitter_finance_name,
                   coalesce(b.revenuegroupnumber, a.customer_agn) AS submitter_financials_customer_id,
                   a.vendor_id,
                   a.vendor_name
   FROM submitters_table a
   LEFT OUTER JOIN customers_table b ON a.customer_agn=b.customerid
   WHERE lower(a.source_name)='test'
""")
save(df)

------------

2) Example of an incremental MV; the incremental logic is the 7-day filter below.

#incremental MV example
import pyspark.sql.functions as F
from datetime import datetime, timedelta
from pyspark.sql.functions import lit, when, col, coalesce


df_TRANSACTION_LINES_SOURCE   = read("transactions.TRANSACTION_LINES_SOURCE")
df_ITEMS    = read("items.ITEMS")

#Filtering by 7 days
d = datetime.today() - timedelta(days=7)
df_TRANSACTION_LINES_SOURCE = df_TRANSACTION_LINES_SOURCE.filter(df_TRANSACTION_LINES_SOURCE.DATE_MODIFIED > lit(d))


df_TRANSACTION_LINES_SOURCE.createOrReplaceTempView("TRANSACTION_LINES_SOURCE")
df_ITEMS.createOrReplaceTempView("ITEMS")


df1 = spark.sql("""
SELECT ts.ACCOUNT_ID,
  ts.AMOUNT,
  ts.AMOUNT_TAXED,
  ts.COMPLETE_0,
  CAST(ts.DATE_ARRIVE AS TIMESTAMP) + INTERVAL '8' HOUR AS DATE_ARRIVE,
  COALESCE(ts.BRAND_ID, it.BRAND_ID)                AS CALCULATED_BRAND_ID
FROM TRANSACTION_LINES_SOURCE AS ts
LEFT OUTER JOIN ITEMS it
ON ts.ITEM_ID               = it.ITEM_ID
WHERE 1=1
              """)

save(df1)

-------------------

The following are some sample MV properties that we can add for performance. For example, this configuration uses a total of 25G * 2 = 50G of RAM; we can tweak each value based on how Spark has been configured and the amount of free RAM:


spark.driver.memory   - 4G
spark.executor.memory  - 25G  
spark.executor.cores - 2
spark.cores.max - 2
spark.sql.shuffle.partitions - 200

 

 

Reply:

Good info. One suggestion from a performance perspective: in the following SQL, the SELECT MAX(mv.last_updated) FROM mv subquery will be evaluated as many times as the join between sales and products returns rows. I recommend moving the SQL that gets the max last_updated one step up, executing it once, and storing the result in a variable, then using that variable in the incremental SQL.

    selectedSales = sqlContext.sql("""
        SELECT p.name, s.*
        FROM products p
        INNER JOIN sales s ON p.id = s.product_id
        WHERE s.last_updated > (SELECT MAX(mv.last_updated) FROM mv)
    """)
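
A minimal sketch of that suggestion, reusing the sales/products/mv temp views from the example above (formatting the max value back into the SQL as a string literal is an assumption and may need adjusting to the column's datatype):

    # Evaluate the MAX once, store it in a variable, then substitute it
    # into the incremental SQL as a literal.
    max_last_updated = sqlContext.sql(
        "SELECT MAX(last_updated) AS max_lu FROM mv").collect()[0]["max_lu"]

    selectedSales = sqlContext.sql("""
        SELECT p.name, s.*
        FROM products p
        INNER JOIN sales s ON p.id = s.product_id
        WHERE s.last_updated > CAST('{0}' AS TIMESTAMP)
    """.format(max_last_updated))

    save(selectedSales)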
    