on 02-27-2025 09:55 AM
You need to ingest data from S3 to Incorta. The data is in CSV format and requires transformations. While Incorta's Data Lake connector can read data from S3, it loads the data in raw format before transformations can be applied. Transforming data before it's stored in Incorta can be more efficient.
An alternative to the Data Lake connector is to write a PySpark MV program that reads the data from S3, performs the transformations, and then saves the transformed data as an Incorta materialized view.
Here is how:
You'll need an AWS access key and secret. For enhanced security, avoid hardcoding these credentials directly in your Python code. Consider using Incorta's global variables or configuring an AWS STS credential service. This example demonstrates setting them directly, but this should be avoided in production.
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "your_access_key")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "your_secret_key")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "yours3endpoint.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.signing-algorithm", "S3SignerType")
Important: Replace "your_access_key" and "your_secret_key" with your actual AWS credentials, and "yours3endpoint.amazonaws.com" with the S3 endpoint for your bucket.
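As one way to keep the keys out of the source, the sketch below builds the same fs.s3a.* settings from environment variables. It assumes those variables are visible to the process that runs the MV, which may not hold in every Incorta deployment. The helper names and the S3_ENDPOINT variable are hypothetical; AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the standard AWS variable names.

```python
import os


def s3a_settings_from_env(env=os.environ):
    """Build the fs.s3a.* settings from environment variables.

    S3_ENDPOINT is a hypothetical variable name chosen for this sketch.
    """
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {
        "fs.s3a.access.key": env["AWS_ACCESS_KEY_ID"],
        "fs.s3a.secret.key": env["AWS_SECRET_ACCESS_KEY"],
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        # Falls back to the global S3 endpoint when S3_ENDPOINT is not set.
        "fs.s3a.endpoint": env.get("S3_ENDPOINT", "s3.amazonaws.com"),
        "fs.s3a.signing-algorithm": "S3SignerType",
    }


def apply_s3a_settings(spark, settings):
    """Copy each setting onto the Spark session's Hadoop configuration."""
    hadoop_conf = spark._jsc.hadoopConfiguration()
    for key, value in settings.items():
        hadoop_conf.set(key, value)
```

Inside the MV this would be called as apply_s3a_settings(spark, s3a_settings_from_env()), so a missing variable fails early with a clear message instead of a later S3 authentication error.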
You need to know the S3 bucket name and the path to your CSV files.
my_bucket = 'your_bucket_name'
s3_path = f's3a://{my_bucket}/parent_folder/myfilename*.csv'
Replace "your_bucket_name" with the actual name of your S3 bucket. The s3_path variable uses an f-string to construct the full S3 path, including the s3a:// prefix required by the S3A connector. Use wildcards (*) in the filename if you need to load multiple files.
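To get a feel for which files the wildcard would pick up, the pattern can be tried against a few sample object keys in plain Python. This is only an approximation: the keys below are made up for illustration, and Python's fnmatchcase lets * match a "/" while Hadoop's glob does not, so treat it as a quick sanity check and not as the S3A connector's exact behavior.

```python
from fnmatch import fnmatchcase

my_bucket = "your_bucket_name"
pattern = "parent_folder/myfilename*.csv"
s3_path = f"s3a://{my_bucket}/{pattern}"

# Hypothetical object keys, used only to illustrate the wildcard.
sample_keys = [
    "parent_folder/myfilename_2025_01.csv",
    "parent_folder/myfilename_2025_02.csv",
    "parent_folder/otherfile.csv",
    "parent_folder/myfilename_2025_01.json",
]

# Keep only the keys that the wildcard pattern would select.
matched = [key for key in sample_keys if fnmatchcase(key, pattern)]

print(s3_path)
for key in matched:
    print("would load:", key)
```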
Define the schema of your CSV files. This ensures that Spark correctly interprets the data types.
from pyspark.sql.types import StructType, LongType, IntegerType, StringType
my_schema = (StructType()
.add("id",LongType(),True)
.add("event_type_id",IntegerType(),True)
.add("event_date",LongType(),True)
.add("message_key",IntegerType(),True)
.add("message",StringType(),True)
.add("updated_dt",StringType(),True)
)
Make sure the schema definition matches the actual structure and data types of your CSV files. The True argument indicates that null values are allowed for each column.
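When a schema is supplied, Spark by default applies it to CSV columns by position and ignores the header names, so a reordered or missing column can go unnoticed. A minimal sketch of a pre-check on a sample file is shown below. The header_mismatches helper is hypothetical, and the expected column list simply repeats the names from the schema above.

```python
import csv
import io

# Column names in the same order as the schema definition.
EXPECTED_COLUMNS = [
    "id", "event_type_id", "event_date",
    "message_key", "message", "updated_dt",
]


def header_mismatches(csv_text, expected=EXPECTED_COLUMNS):
    """Return (position, expected_name, actual_name) where the header differs."""
    header = [name.strip() for name in next(csv.reader(io.StringIO(csv_text)))]
    width = max(len(header), len(expected))
    # Pad the shorter list with None so missing or extra columns are reported.
    actual = header + [None] * (width - len(header))
    wanted = list(expected) + [None] * (width - len(expected))
    return [(i, w, a) for i, (w, a) in enumerate(zip(wanted, actual)) if w != a]


good = "id,event_type_id,event_date,message_key,message,updated_dt\n1,2,1700000000,7,hello,1700000000\n"
bad = "id,event_type,event_date,message_key,message\n"

print(header_mismatches(good))
print(header_mismatches(bad))
```

Running this on the first few lines of one source file before loading the full set is usually enough to catch a renamed or dropped column.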
Read the data from S3 into a Spark DataFrame.
df_csv = (spark.read.format("csv")
.option("delimiter", ",")
.option("header", "true") # If your CSV has a header row, include this option
.schema(my_schema)
.load(s3_path)
.select("*", "_metadata.file_name") # Optional: also keep the source file name from the file metadata
)
Perform the necessary data transformations using the Spark DataFrame API. Here's an example of converting the updated_dt column to a timestamp:
from pyspark.sql.functions import col, from_unixtime, to_timestamp
df_csv = df_csv.withColumn("updated_timestamp", to_timestamp(from_unixtime(col("updated_dt"), "yyyy-MM-dd HH:mm:ss")))
This code assumes the updated_dt column is a string holding a Unix timestamp (seconds since the epoch). from_unixtime formats that value as a "yyyy-MM-dd HH:mm:ss" string in the Spark session time zone, and to_timestamp then parses that string into a Spark timestamp data type. Adjust the transformation logic as needed for your specific requirements.
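The same two steps (epoch seconds to a formatted string, then the string to a timestamp) can be mirrored in plain Python to check a sample value by hand. This is an illustration only: the unix_string_to_timestamp helper is hypothetical and uses UTC, whereas Spark uses the session time zone, so results can differ by the zone offset.

```python
from datetime import datetime, timezone

FORMAT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of "yyyy-MM-dd HH:mm:ss"


def unix_string_to_timestamp(value, tz=timezone.utc):
    """Mirror from_unixtime followed by to_timestamp for one value."""
    seconds = int(value)  # the column holds the epoch seconds as a string
    formatted = datetime.fromtimestamp(seconds, tz=tz).strftime(FORMAT)
    parsed = datetime.strptime(formatted, FORMAT)  # naive datetime
    return formatted, parsed


formatted, parsed = unix_string_to_timestamp("1700000000")
print(formatted)  # 2023-11-14 22:13:20
```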
Save the transformed DataFrame as an Incorta materialized view. The save() function is an Incorta-specific wrapper that writes the data to Incorta storage.
save(df_csv)