on 02-22-2023 04:29 PM - edited on 02-22-2023 04:39 PM by Tristan
Materialized Views in Incorta give end users the flexibility to work with data in the language of their choice. Users working with PySpark often use Pandas DataFrames and want to save the resulting DataFrames as tables in Incorta. To do this, the Pandas DataFrame must first be converted to a Spark DataFrame and then saved. This article walks through the steps to quickly convert and save a Pandas DataFrame in your materialized view.
The following steps assume that you already have a Pandas DataFrame in a PySpark materialized view.
The following libraries should already be installed on your Incorta Cluster, but they still need to be imported. To learn about installing libraries, check out this article.
```python
# Import the necessary libraries
import pandas as pd
import numpy as np
from pyspark.sql.types import *
```
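Three functions are defined below: equivalent_type(), define_structure(), and pandas_to_spark(). You only call pandas_to_spark() directly; the other two are helpers that it uses. Here is the purpose of each:
- equivalent_type() maps a Pandas column dtype (for example int64 or datetime64[ns]) to the matching Spark data type, and falls back to StringType for any dtype it does not list.
- define_structure() builds a StructField for one column using that Spark type.
- pandas_to_spark() builds a StructType schema from all of the columns and passes it, together with the Pandas DataFrame, to spark.createDataFrame().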
```python
# Create functions to convert Pandas dtypes to Spark data types

def equivalent_type(f):
    # Map a Pandas/NumPy dtype to the matching Spark SQL type;
    # any dtype not listed here (object, bool, category, ...) becomes a string
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    # Build a (nullable) StructField for one column; fall back to
    # StringType if the dtype cannot be compared
    try:
        typo = equivalent_type(format_type)
    except Exception:
        typo = StringType()
    return StructField(string, typo)

# Given a Pandas DataFrame, return an equivalent Spark DataFrame
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    # `spark` is the SparkSession already available in the materialized view
    return spark.createDataFrame(pandas_df, p_schema)
```
The code above can be copied and pasted at the end of your materialized view.
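The mapping in equivalent_type() can be checked without a running Spark session. The sketch below is a hypothetical, plain-Python restatement of it: instead of Spark type objects it returns Spark SQL type names (bigint, double, timestamp, and so on) and joins them into a DDL-style schema string. The names SPARK_TYPE_NAMES, spark_type_name() and ddl_schema() are illustrative only; they are not part of the original article or of any Incorta API.

```python
# Hypothetical helper: mirrors equivalent_type() but returns Spark SQL type
# names, so the mapping can be inspected without a SparkSession.
SPARK_TYPE_NAMES = {
    'datetime64[ns]': 'timestamp',  # TimestampType
    'int64': 'bigint',              # LongType
    'int32': 'int',                 # IntegerType
    'float64': 'double',            # DoubleType
    'float32': 'float',             # FloatType
}


def spark_type_name(dtype_name):
    # Any dtype not in the table falls back to string, like the else branch
    return SPARK_TYPE_NAMES.get(dtype_name, 'string')


def ddl_schema(column_dtypes):
    # column_dtypes: list of (column name, dtype name) pairs, which could be
    # built with [(c, str(t)) for c, t in pandas_df.dtypes.items()]
    # Backticks keep names containing spaces or dots valid in the DDL string.
    return ', '.join(
        f'`{name}` {spark_type_name(dtype)}' for name, dtype in column_dtypes
    )


print(ddl_schema([('order_id', 'int64'), ('amount', 'float64'),
                  ('order date', 'datetime64[ns]'), ('is_paid', 'bool')]))
# prints: `order_id` bigint, `amount` double, `order date` timestamp, `is_paid` string
```

PySpark documents that createDataFrame() also accepts a DDL-formatted type string as its schema, so a string like this could in principle stand in for the StructType built by pandas_to_spark(); treat that as an untested alternative rather than part of the method described here.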
The first line of code below passes your Pandas DataFrame (named pandasDF here; substitute your own variable name) to pandas_to_spark(), which returns an equivalent Spark DataFrame named spark_df. The second line saves spark_df as the output of the materialized view.
```python
# Use the function to convert the Pandas DataFrame to a Spark DataFrame, then save it
spark_df = pandas_to_spark(pandasDF)
save(spark_df)
```
Saving and running the materialized view should produce a successfully loaded table, with the column data types configured for you.
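Any dtype that equivalent_type() does not list (for example bool, category, timezone-aware timestamps such as datetime64[ns, UTC], or nullable extension types such as Int64) falls into the else branch and is saved as a string column. Text columns with object dtype are expected to land there, but the others may be a surprise. The sketch below is a hypothetical pre-save check, not part of the original article: string_fallback_columns() and MAPPED_DTYPES are illustrative names, and the input is assumed to be a plain dict of column name to dtype name.

```python
# Hypothetical pre-save check: list the columns whose dtype has no explicit
# entry in equivalent_type() and will therefore be stored as strings.
MAPPED_DTYPES = {'datetime64[ns]', 'int64', 'int32', 'float64', 'float32'}


def string_fallback_columns(column_dtypes):
    # column_dtypes: mapping of column name -> dtype name, which could be
    # built in the materialized view with
    # {c: str(t) for c, t in pandasDF.dtypes.items()}
    return [name for name, dtype in column_dtypes.items()
            if dtype not in MAPPED_DTYPES]


example = {'order_id': 'int64', 'is_paid': 'bool',
           'region': 'object', 'created': 'datetime64[ns, UTC]'}
print(string_fallback_columns(example))
# prints: ['is_paid', 'region', 'created']
```

If an unexpected column shows up in this list, one option is to cast it in Pandas before calling pandas_to_spark(), or to add a matching branch to equivalent_type().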