.png)
- Article History
- Subscribe to RSS Feed
- Mark as New
- Mark as Read
- Bookmark
- Subscribe
- Printer Friendly Page
- Report Inappropriate Content
on
02-22-2023
04:29 PM
- edited on
02-22-2023
04:39 PM
by
Tristan
Introduction
Materialized Views in Incorta give lots of flexibility for end users to work with data in a language of their choice. Users working with PySpark often leverage Pandas DataFrames and want to save resulting DataFrames as tables in Incorta. In order to achieve this, the pandas DataFrame must be converted to a Spark DataFrame then saved. This article will walk through the steps to quickly convert and save a Pandas DataFrame in your materialized view.
What you need to know before reading this article
Let's Go
The next steps assume that you are already working with a Pandas DataFrame using PySpark.
Import Libraries
The following libraries should already be installed on your Incorta Cluster, but they still need to be imported. To learn about installing libraries, check out this article.
#import necessary functions
import pandas as pd
import numpy as np
from pyspark.sql.types import *
Create Functions
Three functions are defined below: equivalent_type(), define_structure(), and pandas_to_spark(). The only one we will use is pandas_to_spark(). Here is the purpose for each:
- equivalent_type(): evaluates the datatype of a column in a Pandas DataFrame and converts it to a Spark data type
- define_structure(): creates a StructField to use when the schema is defined for the Spark DataFrame
- pandas_to_spark(): converts a Pandas DataFrame to a Spark DataFrame using the previous two functions
#create funcitons to convert datatypes
def equivalent_type(f):
if f == 'datetime64[ns]': return TimestampType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return DoubleType()
elif f == 'float32': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
#given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return spark.createDataFrame(pandas_df, p_schema)
The code above can be copied and pasted into the end of your materialized view.
Convert and Save Your DataFrame
The first line of code uses the pandas_to_spark() function with the pandasDF as input. The output is an equivalent Spark DataFrame named spark_df. The second line simply saves the Spark DataFrame as output for the materialized view.
#use function to convert pandas dataframe to spark dataframe, save it
spark_df = pandas_to_spark(pandasDF)
save(spark_df)
Saving and running the materialized view should result in a successfully loaded table with data types and functions configured for you.