dylanwan
Employee

Problem

You need to ingest data from S3 to Incorta. The data is in CSV format and requires transformations. While Incorta's Data Lake connector can read data from S3, it loads the data in raw format before transformations can be applied. Transforming data before it's stored in Incorta can be more efficient.

Solution

An alternative to the Data Lake connector is to write a PySpark materialized view (MV) program that reads the data from S3, applies the transformations, and saves the transformed data as an Incorta materialized view.

Here is how:

AWS Authentication

You'll need an AWS access key and secret. For enhanced security, avoid hardcoding these credentials directly in your Python code. Consider using Incorta's global variables or configuring an AWS STS credential service. This example demonstrates setting them directly, but this should be avoided in production.

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "your_access_key")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "your_secret_key")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "yours3endpoint.amazonaws.com") 
spark._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "S3SignerType")

Important: Replace "your_access_key" and "your_secret_key" with your actual AWS credentials, and "yours3endpoint.amazonaws.com" with the S3 endpoint for your region.
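One way to keep the credentials out of the MV code is to read them from the environment. The following is a minimal sketch, not an Incorta-provided API: the helper names s3a_settings_from_env and apply_s3a_settings are hypothetical, and it assumes the standard AWS environment variable names (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN) are visible to the Spark driver, which may not be the case in your deployment.

```python
import os


def s3a_settings_from_env(env=os.environ):
    """Build S3A Hadoop settings from environment variables (hypothetical helper)."""
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")

    settings = {
        "fs.s3a.access.key": env["AWS_ACCESS_KEY_ID"],
        "fs.s3a.secret.key": env["AWS_SECRET_ACCESS_KEY"],
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }

    # Temporary (STS) credentials also carry a session token.
    token = env.get("AWS_SESSION_TOKEN")
    if token:
        settings["fs.s3a.session.token"] = token
        settings["fs.s3a.aws.credentials.provider"] = (
            "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
        )
    return settings


def apply_s3a_settings(spark, settings):
    """Copy the settings into the Hadoop configuration of an existing Spark session."""
    hadoop_conf = spark._jsc.hadoopConfiguration()
    for key, value in settings.items():
        hadoop_conf.set(key, value)
```

Inside the MV, this would be called as apply_s3a_settings(spark, s3a_settings_from_env()). The endpoint and signing settings shown above can be added to the same dictionary.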

S3 Bucket and Path

You need to know the S3 bucket name and the path to your CSV files. 

my_bucket = 'your_bucket_name'
s3_path = f's3a://{my_bucket}/parent_folder/myfilename*.csv'

Replace "your_bucket_name" with the actual name of your S3 bucket. The s3_path variable uses an f-string to construct the full S3 path, including the s3a:// prefix required by the S3A connector. Use wildcards (*) in the filename if you need to load multiple files.
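To preview which files a wildcard is likely to pick up before running the MV, the pattern can be tested locally against a list of file names. This is a rough sketch: build_s3a_path is a hypothetical helper, the candidate names are made up for illustration, and Python's fnmatch rules are only an approximation of the glob matching that Hadoop applies to S3A paths.

```python
from fnmatch import fnmatchcase
import posixpath


def build_s3a_path(bucket, *parts):
    """Join a bucket name and key segments into an s3a:// URI (hypothetical helper)."""
    key = posixpath.join(*parts)
    return f"s3a://{bucket}/{key}"


s3_path = build_s3a_path("your_bucket_name", "parent_folder", "myfilename*.csv")

# Made-up file names, used only to preview what the wildcard would select.
candidates = [
    "myfilename_2025_01.csv",
    "myfilename_2025_02.csv",
    "otherfile.csv",
    "myfilename.json",
]

# Compare each name against the file-name portion of the path.
pattern = posixpath.basename(s3_path)
matched = [name for name in candidates if fnmatchcase(name, pattern)]
print(s3_path)
print(matched)
```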

Spark DataFrame Schema

Define the schema of your CSV files. This ensures that Spark correctly interprets the data types.

from pyspark.sql.types import IntegerType, LongType, StringType, StructType

my_schema = (StructType()
    .add("id", LongType(), True)
    .add("event_type_id", IntegerType(), True)
    .add("event_date", LongType(), True)
    .add("message_key", IntegerType(), True)
    .add("message", StringType(), True)
    .add("updated_dt", StringType(), True)
)

Make sure the schema definition matches the actual structure and data types of your CSV files. The True argument indicates that null values are allowed for each column.
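The same schema can also be written as a DDL-formatted string, which the schema() method of Spark's DataFrameReader accepts in place of a StructType. The sketch below builds that string from a list of column names and SQL types. The variable names columns and schema_ddl are illustrative only, and columns declared this way are nullable, like the True flags in the StructType version.

```python
# Column names and Spark SQL types, in the same order as the CSV columns.
columns = [
    ("id", "BIGINT"),
    ("event_type_id", "INT"),
    ("event_date", "BIGINT"),
    ("message_key", "INT"),
    ("message", "STRING"),
    ("updated_dt", "STRING"),
]

# Join the pairs into a single DDL string such as "id BIGINT, event_type_id INT, ...".
schema_ddl = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
print(schema_ddl)

# In the MV, this string would be passed to the reader: spark.read.schema(schema_ddl)
```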

Read the data from S3

Read the data from S3 into a Spark DataFrame.

df_csv = (spark.read.format("csv")
    .option("delimiter", ",")
    .option("header", "true")  # Include this option if your CSV files have a header row
    .schema(my_schema)
    .load(s3_path)
    .select("*", "_metadata.file_name")  # Optional: add the source file name from the hidden _metadata column
)
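When a schema is supplied, Spark's CSV reader by default (enforceSchema set to true) applies it by column position and ignores the names in the header row, so a reordered file can load without any error. A quick check of a sample header line against the expected column names can catch this early. This is a sketch in plain Python: header_mismatches and EXPECTED_COLUMNS are hypothetical names, and the header line would have to be obtained separately, for example from a sample file.

```python
import csv
import io

# Column names in the order the schema declares them.
EXPECTED_COLUMNS = [
    "id", "event_type_id", "event_date", "message_key", "message", "updated_dt",
]


def header_mismatches(header_line, expected=EXPECTED_COLUMNS, delimiter=","):
    """Return (position, expected_name, actual_name) tuples where the header differs."""
    actual = next(csv.reader(io.StringIO(header_line), delimiter=delimiter))
    actual = [name.strip() for name in actual]

    problems = []
    for position, (want, got) in enumerate(zip(expected, actual)):
        if want != got:
            problems.append((position, want, got))
    # A different column count is reported as its own entry.
    if len(actual) != len(expected):
        problems.append(("column_count", len(expected), len(actual)))
    return problems
```

An empty list means the header matches the schema order. Alternatively, setting the reader option enforceSchema to false asks Spark itself to validate the schema against the header.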

Data Transformation (Optional)

Perform the necessary data transformations using the Spark DataFrame API. Here's an example of converting the updated_dt column to a timestamp:

from pyspark.sql.functions import col, from_unixtime, to_timestamp

df_csv = df_csv.withColumn("updated_timestamp", to_timestamp(from_unixtime(col("updated_dt"), "yyyy-MM-dd HH:mm:ss")))

This code assumes the updated_dt column is a string holding a Unix timestamp (seconds since the epoch). from_unixtime() formats that value as a date-time string in the yyyy-MM-dd HH:mm:ss pattern, and to_timestamp() then parses the string into a Spark timestamp data type. Adjust the transformation logic as needed for your specific requirements.
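To check what the conversion should produce for a given value, the same logic can be reproduced in plain Python. This is an illustrative sketch only: unix_string_to_datetime is a hypothetical helper, it uses UTC, whereas Spark renders from_unixtime() output in the session time zone, and returning None for unparseable values is a choice made for this sketch.

```python
from datetime import datetime, timezone


def unix_string_to_datetime(value, tz=timezone.utc):
    """Convert a string of epoch seconds to a timezone-aware datetime, or None."""
    if value is None or not value.strip():
        return None
    try:
        seconds = int(value)
    except ValueError:
        # Non-numeric input cannot be interpreted as epoch seconds.
        return None
    return datetime.fromtimestamp(seconds, tz=tz)


sample = unix_string_to_datetime("1700000000")
# Same layout as the Spark pattern yyyy-MM-dd HH:mm:ss.
formatted = sample.strftime("%Y-%m-%d %H:%M:%S")
print(formatted)  # 2023-11-14 22:13:20 (UTC)
```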

Save the DataFrame as an Incorta materialized view

Save the transformed DataFrame as an Incorta materialized view. The save() function is an Incorta-specific wrapper that writes the data to Incorta storage.

save(df_csv)
Last update:
‎02-27-2025 09:55 AM