Incorta Materialized Views (MV) provide a way to run PySpark, Scala, and Spark R and can be used for building machine learning (ML) models. We will discuss PySpark, Scala, and Spark R separately. Here are the best practices of using Incorta MV for model training and testing.
We recommend that you be familiar with these Incorta concepts before exploring this topic further.
The MV concepts discussed in this article apply to all releases of Incorta. Incorta ML Library was introduced in release 4.9 as a lab feature.
Now let's dive into some details on the best practices of creating Machine Learning models using Incorta Materialized Views.
Incorta MV with PySpark lets you use the Spark Python ML Library to create machine learning models. Since you are creating a Python program, you can also use other Python machine learning libraries, such as Scikit Learn, Tensorflow, or Pytorch, etc.
Here are some of the considerations.
Python ML Libraries run operations on a single machine whereas PySpark runs on multiple machines. Due to parallel execution on all cores on multiple machines, PySpark runs operations faster than Pandas, hence we often prefer to use PySpark for better performance. This is one of the major differences between Pandas and PySpark DataFrames.
If you are working on a Machine Learning application where you are dealing with larger datasets, PySpark is the best fit as it can process operations many times(100x) faster than Pandas.
Python Libraries, such as Pandas, don’t support distributed processing hence you will always need to make sure that you have the resources on the local machine when you need to support your growing data. The python process is competing for memory with the Incorta Analytics service and/or Incorta Loader services. These heavy processes run on the local machine and will compete with the other processing from Incorta.
The Spark driver is a Java program running on the local machine as well. Incorta PySpark MV reads the files into Spark as a Spark DataFrame using Spark executors which are running from the cluster. toPandas() is an action that collects the data from your Spark DataFrame object and pulls all partitions onto the client Spark driver memory before deserializing it as a pandas DataFrame in Python. After the toPandas() action, you aren't using spark cluster distributed computing anymore. You have to be very careful while dealing with large datasets. You will get an OutOfMemoryException if the collected data doesn’t fit in the Spark Driver memory.
When dealing with a large amount of data within Python, you have to be careful with how you use memory. Shortage of memory is a common issue when you have a large amount of data at hand. If the entire RAM space is consumed, the program can crash and throw a MemoryError, which can be tricky to handle at times. Limiting memory usage becomes important in this case.
ML job loads may be very different from other regular data refresh jobs. It is important to test a small data set first and assess the impact before you run or deploy the model building MVs.
Please set the property spark.dataframe.sampling.enabled to false for the Incorta MV that is used for building the ML model. Incorta MVs, by default, use data sampling while saving the MV.
If you use a Spark DataFrame, Incorta MV requires you to save a DataFrame as a result. This can be any DataFrame. You can use the result of applying the model to training or testing data.
Use the incorta_ml package, which can simplify your model building process. Currently, incorta_ml is available in PySpark.
When using Pandas DataFrame, the major challenge is controlling memory usage. Here are the best practices.
You can use the memory_usage function to get the memory size used by columns of pandas dataframe:
Quite often the original DataFrame you’re loading includes columns you don’t actually use in machine learning features. You can select the columns needed before converting the Spark DataFrame to Pandas DataFrame.
Some columns are useful for identifying and describing the observations, but not really ML features for predictions, we can drop these columns from the DataFrame. Only the ID columns are needed and we can join other columns back in PySpark or in Incorta.
After selecting a subset of columns, if the grain of the data is changed and duplicate rows are created, remember to use SELECT DISTINCT or GROUP BY logic to eliminate the duplicates.
#Using Spark SQL to Reduce the Number of Columns df_sub = spark.sql(''' select distinct Salesrep_Id, Inventory_Item_Id, COALESCE(Sold_to_City, End_Customer_City) AS Sold_to_City, COALESCE(Sold_to_County, End_Customer_County) AS Sold_to_County, COALESCE(Sold_to_State, End_Customer_State) AS Sold_to_State, COALESCE(Sold_to_Country, End_Customer_Country) AS Sold_to_Country, COALESCE(Sold_to_Customer_Class_Code,End_Customer_Class_Code) AS Sold_to_Customer_Class_Code, COALESCE(Sold_to_Sales_Channel_Code, End_Customer_Sales_Channel_Code) AS Sold_to_Sales_Channel_Code, COALESCE(Sold_to_Postal_Code,End_Customer_Postal_Code) AS Sold_to_Postal_Code from SALES_CREDT ''')
#Using PySpark to Reduce the Number of Columns import pyspark.sql.functions as f df.createOrReplaceTempView("SALES_CREDT") df = df.withColumn("Sold_to_City",f.coalesce(df.Sold_to_City, df.End_Customer_City)) df = df.withColumn("Sold_to_County",f.coalesce(df.Sold_to_County, df.End_Customer_County)) df = df.withColumn("Sold_to_State",f.coalesce(df.Sold_to_State, df.End_Customer_State)) df = df.withColumn("Sold_to_Country",f.coalesce(df.Sold_to_Country, df.End_Customer_Country)) df = df.withColumn("Sold_to_Postal_Code",f.coalesce(df.Sold_to_Postal_Code, df.End_Customer_Postal_Code)) df = df.withColumn("Sold_to_Customer_Class_Code",f.coalesce(df.Sold_to_Customer_Class_Code, df.End_Customer_Class_Code)) df = df.withColumn("Sold_to_Sales_Channel_Code",f.coalesce(df.Sold_to_Sales_Channel_Code, df.End_Customer_Sales_Channel_Code))
Filter out the necessary rows with PySpark DataFrame filter. You don't have to load the data to a Pandas DataFrame and then filter out the unwanted data using the Pandas API. Filtering data can be done on Spark DataFrames directly.
For categorical columns, such as the country or city, you can do set-based encoding to convert to numeric values. You can encode before passing the data to Pandas. PySpark has an API to do encoding, and you can also manually encode by selecting distinct and given numbers.
You need to convert the Pandas DataFrame to Spark DataFrame and save Spark DataFrame.
from pyspark.sql.types import * schema = StructType( [StructField("Salesrep_Id",LongType(),True), StructField("Inventory_Item_Id",StringType(),True), StructField("Sold_to_City",StringType(),True), StructField("Sold_to_County",StringType(),True), StructField("Sold_to_State",StringType(),True), StructField("Sold_to_Country",StringType(),True), StructField("Sold_to_Customer_Class_Code", StringType(),True), StructField("Sold_to_Sales_Channel_Code",StringType(),True), StructField("Sold_to_Postal_Code",StringType(),True), StructField("predicted_salesrep",LongType(),True)] ) result_df = spark.createDataFrame(pdf, schema) result_df.show() save(result_df)
Model building and the actual inference using the model can be in separate MVs and placed in different schemas.
We don’t need to do the modeling and testing in the same code. PySpark allows you to save the model, and use it later. You can use another MV to read the model and do the testing or inferencing. Since Spark is only using the model for inference, it will run much faster.
If you use the Incorta ML library, it automatically saves the model to the disk under the Incorta tenant folder.
All the models should be saved to the same location on the disk in an on-premises environment. A good place to create a folder is under <Incorta Tenant Folder>/data/models. Putting the model in the shared tenant folder will allow you to access it in a multiple-node environment.
Different ML libraries may provide different ways of saving the models. Use the corresponding native ML library API to save the models.