Creating Materialized View using PySpark

Created by Amit Kothari

Incorta allows you to create Materialized Views (MVs) using Python and Spark SQL to read data from the Parquet files of existing Incorta tables, transform it, and persist the result so that it can be used in dashboards.


We can leverage the power of scripting via Python and Spark SQL to handle many use cases, for example:

  • Creating bridge tables to resolve many-to-many relationships.
  • Creating an aggregate table from a detail table and joining it as a parent table. For example, suppose we have a line table and a distribution table at a lower grain than the line table. We want to keep the line table as the base table of the dashboard query and only need an aggregated metric from the distribution table. An MV can be created on the distribution table with a key of the line table, and then joined to the line table as a parent table.
  • Converting Oracle PL/SQL procedures where multi-pass logic is required.
  • Predictive analytics using Spark ML and NumPy.
  • Data quality analysis.
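As a sketch of the aggregate-table pattern above (the schema, table, and column names are hypothetical; read() and save() are the helper methods provided by the Incorta MV template, so this only runs inside an MV):

```python
from pyspark.sql import functions as F

# Hypothetical distribution-grain table.
df_dist = read("SALES.LINE_DISTRIBUTIONS")

# Aggregate up to the grain of the line table, keyed by the line table's key,
# so the result can be joined back to the line table as a parent table.
df_agg = (
    df_dist.groupBy("LINE_ID")
           .agg(F.sum("AMOUNT").alias("TOTAL_AMOUNT"),
                F.count("*").alias("DISTRIBUTION_COUNT"))
)

save(df_agg)
```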

Best Practices:

  • Refer to this MV performance tuning article
  • Use SQL as the language if there is only one query for which you need to create an MV. For more complex use cases, select Python as the language.
  • Rather than denormalizing the dimensional attributes in the MV itself, try creating an MV that has only id and metric fields, so that the id fields can be joined to the keys of the parent tables to resolve the dimensional attributes.
  • Incremental logic can be supported in an MV if a proper key can be defined on it.
    • We can use ? inside the MV SQL; ? means the last successful transformation timestamp of the MV. This is similar to the ? used inside our regular SQL queries. The value is stored as a long representing a timestamp. For example:

      SELECT * FROM incorta_metadata.DASHBOARD WHERE CREATIONDATE > ?

    • Another way to run incremental MVs is to select the max date from the MV itself to get the incremental data. A newer feature can do this automatically instead of writing the code below.

      Here is a simple example.

      from pyspark.sql import functions as F
      df_products = read("SALES.PRODUCTS")
      df_sales = read("SALES.SALES")  # sales table name illustrative
      # Here we read the MV itself to find the high-water mark.
      df_mv = read("SALES_MV.SALES_PRODUCTS")
      df_products.createOrReplaceTempView("products")
      df_sales.createOrReplaceTempView("sales")
      df_mv.createOrReplaceTempView("mv")
      selectedSales = spark.sql("""SELECT p.name, s.*
      FROM products p INNER JOIN sales s ON p.id = s.product_id
      WHERE s.last_updated > (SELECT MAX(mv.last_updated) FROM mv)""")
    • In an incremental MV, make sure that the field order, field names (they are case sensitive), and field datatypes are the same in the full and incremental parts of the code. If the datatypes differ, use the CAST function, e.g. Timestamp to Date: CAST(the_ts AS DATE) AS the_date
    • One other way to do incremental logic in an MV, where we always want to refresh the last 30 days' worth of data:
      • # Filtering je_lines by creation date
        from pyspark.sql.functions import lit
        from datetime import datetime, timedelta
        df_GL_JE_LINES = read("EBS_GL.GL_JE_LINES")
        d = datetime.today() - timedelta(days=30)
        df_GL_JE_LINES = df_GL_JE_LINES.filter(df_GL_JE_LINES.EFFECTIVE_DATE > lit(d))
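Putting the incremental rules above together, here is a minimal sketch of an incremental branch whose schema matches the full branch (table and column names are hypothetical; read() and last_refresh_time are provided by the Incorta MV template, so this only runs inside an MV):

```python
from pyspark.sql import functions as F

df_src = read("EBS_GL.GL_JE_LINES")  # hypothetical source table

# The full and incremental branches must produce the same field order,
# field names (case sensitive), and datatypes; CAST mismatches explicitly.
full_df = df_src.select(
    F.col("JE_LINE_ID"),
    F.col("EFFECTIVE_DATE").cast("date").alias("EFFECTIVE_DATE"),  # Timestamp -> Date
    F.col("AMOUNT"),
)

# Incremental branch: identical schema, filtered by the last refresh time
# (stored as epoch milliseconds, hence the division by 1000).
incr_df = full_df.filter(
    F.col("EFFECTIVE_DATE") >
    F.from_unixtime(F.lit(last_refresh_time / 1000)).cast("date")
)
```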

Here is a sample script: 

1) Shows how to create an MV using dataframes and joining them via SQL. The base tables are in an Incorta schema called TEST and should already have been loaded.

from pyspark.sql import functions as F

df_customers = read("TEST.customers")
df_submitters = read("TEST.submitters")
df_customers.createOrReplaceTempView("customers_table")
df_submitters.createOrReplaceTempView("submitters_table")
df = spark.sql("""
  SELECT DISTINCT a.submitter_id,
                  coalesce(upper(b.revenuegroupname), upper(a.customer_name)) AS submitter_finance_name,
                  coalesce(b.revenuegroupnumber, a.customer_agn) AS submitter_financials_customer_id
  FROM submitters_table a
  LEFT OUTER JOIN customers_table b ON a.customer_agn = b.customerid
  WHERE lower(a.source_name) = 'test'
""")
save(df)  # save() is the template helper that persists the dataframe as the MV


2.1)  Example of an incremental MV using get_last_refresh_time()

get_last_refresh_time() is a built-in function provided in the PySpark template to support incremental MVs. Here is an example:

from pyspark import *
import pyspark.sql.functions as F

dashboard = read("incorta_metadata2.DASHBOARD")
dashboard.createOrReplaceTempView("dashboard")

df = spark.sql("""
            SELECT *
            FROM dashboard
            WHERE CREATIONDATE > from_unixtime(%d / 1000)
       """ % last_refresh_time)

The last refresh time is the timestamp of the last execution of the MV job. It is stored in epoch time format, so you can pass it to your Spark SQL as described above.

The assumption is that the data populated in the parquet layer has a date column that can be used to indicate how fresh the data is.  Typically, it is the last updated date column.  In the example, since we are only interested in the newly created records, we use the CREATIONDATE as the filter and compare it with last refresh time.  The CREATIONDATE has to be a column with the Timestamp data type defined in Incorta. 
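To see the epoch arithmetic the template's from_unixtime(%d / 1000) expression performs, here is a plain-Python illustration (the refresh value itself is hypothetical):

```python
from datetime import datetime, timezone

# Hypothetical value: Incorta stores the last refresh time as epoch milliseconds.
last_refresh_time = 1700000000000

# Spark's from_unixtime() expects seconds, which is why the template
# divides the millisecond value by 1000 before converting.
refresh_ts = datetime.fromtimestamp(last_refresh_time / 1000, tz=timezone.utc)
print(refresh_ts.isoformat())  # 2023-11-14T22:13:20+00:00
```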

2.2)  Example of an incremental MV using timedelta()

#incremental MV example
import pyspark.sql.functions as F
from datetime import datetime, timedelta
from pyspark.sql.functions import lit

df_ITEMS = read("items.ITEMS")
df_ITEMS.createOrReplaceTempView("items")

#Filtering by 7 days
d = datetime.today() - timedelta(days=7)

# Note: the SQL of this example was truncated in the original post; only the
# join condition "ON ts.ITEM_ID = it.ITEM_ID" survives. The pattern is to
# join a transaction view (ts) to items (it) on ITEM_ID and filter the
# transactions by the date variable d computed above.


3)  Example of an MV which can explode a comma-separated list of values in a column:

#Explode userString sample
from pyspark.sql import functions as F

df = read("Test.UserData")
df.createOrReplaceTempView("UserData")

df1 = spark.sql("""SELECT c1, userString FROM UserData WHERE 1=1""")
df2 = df1.withColumn("User", F.explode(F.split(df1.userString, ",")))
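To see what split + explode produce, here is a plain-Python illustration of the same row multiplication (the sample row is made up for demonstration):

```python
# One input row with a comma-separated userString column.
row = {"c1": 101, "userString": "alice,bob,carol"}

# split(",") mirrors F.split; the comprehension mirrors F.explode,
# emitting one output row per value while repeating the other columns.
exploded = [{"c1": row["c1"], "User": user}
            for user in row["userString"].split(",")]

print(exploded)
# [{'c1': 101, 'User': 'alice'}, {'c1': 101, 'User': 'bob'}, {'c1': 101, 'User': 'carol'}]
```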


MV Configuration

The Spark configuration properties for an MV listed below depend on the machine specs and impact performance significantly. Start with the five mandatory properties described below:



1. spark.driver.memory: Amount of memory to use for the driver process (where the SparkContext is initialized), in GB unless otherwise specified (e.g. 1g, 2g). This is the memory allocated for your Spark logical plan and any Spark actions that transfer data to the driver; the default is 1g.

2. spark.executor.memory: Amount of memory to use per executor process, in GB unless otherwise specified (e.g. 2g, 8g). This is the memory allocated for executing your logic, where the data is loaded and transformed; the default is 1g.

3. spark.cores.max: The maximum number of CPU cores to request for the application from across the cluster. For example, setting cores.max to 44 means the MV will use at most 44 cores from the cluster.

4. spark.executor.cores: The number of cores to use on each executor.

5. spark.sql.shuffle.partitions: Configures the number of partitions to use when shuffling data for joins or aggregations; the default is 200. Initially set the shuffle partitions equal to cores.max, and as the data volume increases raise it in multiples (cores.max * 2, cores.max * 3, cores.max * 4, and so on).
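As a hypothetical starting point for an MV allowed up to 44 cores (the cores.max example above), the five properties might be set like this, with spark.sql.shuffle.partitions starting equal to cores.max:

```
spark.driver.memory           4g
spark.executor.memory         8g
spark.cores.max               44
spark.executor.cores          4
spark.sql.shuffle.partitions  44
```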

Full Spark Configuration Documentation

Note: Always refer to the documentation for the Spark version you are running.

5 replies
  • Good info. One suggestion from performance perspective: In the following sql, the select max(mv.last_updated) from mv will be evaluated as many times as the join between sales and products return rows. I recommend moving the sql to get the max last_update_date one step above and execute it once and store it in a variable. Then use that variable in the incremental sql. 

    selectedSales = sqlContext.sql("""SELECT p.name, s.* FROM products p INNER JOIN sales s ON p.id = s.product_id
    WHERE s.last_updated > (SELECT MAX(mv.last_updated) FROM mv)""")
  • Can anyone tell me what libraries are already imported into this MV? For example:

    df1 = read("HR.EMPLOYEES")

    From where is this read() method already imported so that we are able to use it?

    • peddinti satya sashikanth 

      Methods "read" and "save" are helper methods defined in the MV template, they're not from a library. "read" allows you to use the table name and it internally substitutes it with the corresponding parquet files. "save" allows you to write the result dataframe in a parquet file that will represent the MV.

    • Ahmed Moawad So, when writing Python inside an MV, am I limited to PySpark only, or can I also use Pandas for my tasks?

    • peddinti satya sashikanth you can use Pandas.  Please reference this other post for more details: https://community.incorta.com/t/x1cgmm/pandas-and-other-python-libraries-in-materialized-view

