This article outlines best practices for incrementally loading large datasets from AWS S3 into Incorta, focusing on minimizing egress costs and maintaining high ingestion performance with low latency. The solution is built with materialized views and provides a detailed approach for handling timestamped files.
A large volume of files in AWS S3 needs to be loaded into Incorta. File names include timestamps, which allows for incremental loading. Minimizing AWS S3 egress costs is critical, and low latency with high ingestion performance is required.
Another Community article provides instructions for ingesting AWS S3 data into Incorta using materialized views. This article focuses on the best practice of using incremental logic to load a large volume of files.
The Spark DataFrame reader can read data from AWS S3 through the Spark AWS S3 (s3a) connector. You specify an S3 path, and the Parquet files in that folder are extracted.
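For example, a full-folder read looks roughly like the sketch below; the bucket and folder names are hypothetical, and the cluster is assumed to be configured with S3 credentials:
# Hypothetical bucket and folder; reads every Parquet file under the prefix
df = spark.read.parquet("s3a://my-bucket/iot-data/")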
In this use case, the files are generated by IoT devices and their number grows very fast; new files are added almost every minute. You cannot afford to extract all files during each incremental refresh of the Incorta scheduled job.
By using the AWS boto3 API, you can scan the S3 prefix, identify only the new files, and load just those files in each incremental refresh.
Filter the files using the timestamp from the file name.
import boto3
from datetime import datetime, timedelta

s3_client = boto3.client('s3')

def list_s3_keys(bucket_name, prefix='', last_n_days_to_read=3):
    keys = []
    paginator = s3_client.get_paginator('list_objects_v2')
    # Build the threshold in the same YYYYMMDD_HHMMSS format used in the file names
    threshold_date = (datetime.utcnow() - timedelta(days=last_n_days_to_read)).strftime('%Y%m%d_%H%M%S')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        if 'Contents' in page:
            for obj in page['Contents']:
                file_key = obj['Key']
                file_name = file_key.split('/')[-1]  # Extract the file name only
                file_base = file_name.split('.')[0]  # Remove the extension
                # Extract the timestamp portion (YYYYMMDD_HHMMSS) of the file name
                parts = file_base.split('_')
                if len(parts) >= 3:
                    file_datetime_str = f"{parts[-3]}_{parts[-2]}"  # Extract timestamp
                else:
                    continue  # Skip files whose names do not match the expected format
                # Compare the extracted timestamp against the threshold; string comparison
                # works because both values share the YYYYMMDD_HHMMSS format
                if file_datetime_str > threshold_date:
                    keys.append(f"s3a://{bucket_name}/{file_key}")
    return keys

# List files from the last 3 days; set bucket_name and prefix to your S3 bucket and folder
files_to_read = list_s3_keys(bucket_name, prefix, 3)
Read only the files whose timestamps are newer than the specified threshold date.
# "schema" is the schema you define for the source files
combined_df = None
for fp in files_to_read:
    # Read each new file and capture the source file name from Spark's file metadata column
    df = spark.read.format("csv").option("delimiter", "\t").schema(schema).load(fp).select("*", "_metadata.file_name")
    if combined_df is None:
        combined_df = df
    else:
        combined_df = combined_df.union(df)
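Alternatively, since the Spark DataFrame reader also accepts a list of paths, the union loop can be replaced with a single read. This is a minimal sketch under the same assumptions (the schema variable and the files_to_read list from above):
# Sketch: pass the whole list of new file paths to a single read
combined_df = (spark.read.format("csv")
    .option("delimiter", "\t")
    .schema(schema)
    .load(files_to_read)                 # load() accepts a list of s3a:// paths
    .select("*", "_metadata.file_name"))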
Finally, you can apply any additional transformations and save the DataFrame as the MV output.
save(combined_df)
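For instance, a common final step is to add a load timestamp or drop duplicate records before saving; the column names in this sketch are hypothetical:
from pyspark.sql import functions as F

# Hypothetical transformation: tag each row with a load timestamp and
# deduplicate by device and event time before saving the MV
final_df = (combined_df
    .withColumn("load_ts", F.current_timestamp())
    .dropDuplicates(["device_id", "event_ts"]))
save(final_df)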