cancel
Showing results for 
Search instead for 
Did you mean: 
LukeG
Employee
Employee

Introduction

Materialized Views in Incorta give lots of flexibility for end users to work with data in a language of their choice. Users working with PySpark often leverage Pandas DataFrames and want to save resulting DataFrames as tables in Incorta. In order to achieve this, the pandas DataFrame must be converted to a Spark DataFrame then saved. This article will walk through the steps to quickly convert and save a Pandas DataFrame in your materialized view.

What you need to know before reading this article

Let's Go

The next steps assume that you are already working with a Pandas DataFrame using PySpark.

Import Libraries

The following libraries should already be installed on your Incorta Cluster, but they still need to be imported. To learn about installing libraries, check out this article.

 

#import necessary functions
import pandas as pd
import numpy as np
from pyspark.sql.types import *

 

Create Functions

Three functions are defined below: equivalent_type(), define_structure(), and pandas_to_spark(). The only one we will use is pandas_to_spark(). Here is the purpose for each:

  • equivalent_type(): evaluates the datatype of a column in a Pandas DataFrame and converts it to a Spark data type
  • define_structure(): creates a StructField to use when the schema is defined for the Spark DataFrame
  • pandas_to_spark(): converts a Pandas DataFrame to a Spark DataFrame using the previous two functions

 

#create funcitons to convert datatypes
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()
    elif f == 'float32': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

#given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return spark.createDataFrame(pandas_df, p_schema)

 

The code above can be copied and pasted into the end of your materialized view.

Convert and Save Your DataFrame

The first line of code uses the pandas_to_spark() function with the pandasDF as input. The output is an equivalent Spark DataFrame named spark_df. The second line simply saves the Spark DataFrame as output for the materialized view.

 

#use function to convert pandas dataframe to spark dataframe, save it
spark_df = pandas_to_spark(pandasDF)
save(spark_df)

 

Saving and running the materialized view should result in a successfully loaded table with data types and functions configured for you.

Screen Shot 2023-01-04 at 11.24.06 AM.png

Related Material

Best Practices Index
Best Practices

Just here to browse knowledge? This might help!

Contributors
Version history
Last update:
‎02-22-2023 04:39 PM
Updated by: