dylanwan
Employee

 

Introduction

This article outlines best practices for incrementally loading large datasets from AWS S3 into Incorta, with a focus on minimizing egress costs and keeping ingestion fast with low latency. The approach is built on materialized views and handles files whose names carry a timestamp.

Symptoms

A large volume of files in AWS S3 needs to be loaded into Incorta. The file names include timestamps, which makes incremental loading possible. Minimizing AWS S3 egress costs is critical, and data ingestion must be fast with low latency.

Another Community article explains how to ingest AWS S3 data into Incorta using materialized views. This article covers the best practice of using incremental logic to load a large volume of files.

Diagnosis

The Spark DataFrameReader can read data from AWS S3 through the Spark S3 connector. If you specify an S3 folder path, every file in that folder (Parquet files, for example) is extracted.

In this use case, the files are generated by IoT devices and the volume grows very fast. New files are added almost every minute, so re-extracting every file on each incremental refresh of a scheduled Incorta job is not practical.

Solution

With the AWS boto3 API, you can list the objects under an S3 prefix, identify the new files, and load only those files in each incremental refresh. Listing returns object metadata only, so no file contents are transferred at this step.

Filter the files using the timestamp from the file name.

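import boto3
from datetime import datetime, timedelta, timezone

# Assumes AWS credentials and region are already configured for this environment
s3_client = boto3.client('s3')

def list_s3_keys(bucket_name, prefix='', last_n_days_to_read=3):
    keys = []
    paginator = s3_client.get_paginator('list_objects_v2')
    # Use the same YYYYMMDD_HHMMSS layout as the file names so the string comparison below is chronological
    threshold_date = (datetime.now(timezone.utc) - timedelta(days=last_n_days_to_read)).strftime('%Y%m%d_%H%M%S')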

    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        if 'Contents' in page:
            for obj in page['Contents']:
                file_key = obj['Key']
                file_name = file_key.split('/')[-1]  # Extract the filename only
                file_base = file_name.split('.')[0]  # Remove the extension

                # Extract the timestamp portion (YYYYMMDD_HHMMSS)
                parts = file_base.split('_')
                if len(parts) >= 3:  
                    file_datetime_str = f"{parts[-3]}_{parts[-2]}"  # Extract timestamp
                else:
                    continue  # Skip if filename format is unexpected
                
                # Compare extracted timestamp
                if file_datetime_str > threshold_date:
                    keys.append(f"s3a://{bucket_name}/{file_key}")
    
    return keys

# bucket_name and prefix are assumed to be set earlier for your S3 location
files_to_read = list_s3_keys(bucket_name, prefix, 3)
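
The filter above compares timestamps as strings, which is chronological only when both sides use exactly the same layout. As a minimal sketch of an alternative, the snippet below parses the timestamp into a `datetime` before comparing, so malformed names are skipped rather than compared incorrectly. The helper names `extract_timestamp` and `select_new_keys`, the sample keys, and the assumed file name pattern `<name>_<YYYYMMDD>_<HHMMSS>_<suffix>.<ext>` are illustrative assumptions, not part of the original solution.

```python
from datetime import datetime

def extract_timestamp(file_key):
    """Return the datetime embedded in an S3 key, or None if the name does not match.

    Assumes (hypothetically) names like <name>_<YYYYMMDD>_<HHMMSS>_<suffix>.<ext>.
    """
    file_name = file_key.split('/')[-1]   # drop the folder part of the key
    file_base = file_name.split('.')[0]   # drop the extension
    parts = file_base.split('_')
    if len(parts) < 3:
        return None
    try:
        return datetime.strptime(f"{parts[-3]}_{parts[-2]}", "%Y%m%d_%H%M%S")
    except ValueError:
        return None  # the two fields are not a valid date and time

def select_new_keys(file_keys, threshold):
    """Keep only the keys whose embedded timestamp is later than the threshold."""
    selected = []
    for key in file_keys:
        ts = extract_timestamp(key)
        if ts is not None and ts > threshold:
            selected.append(key)
    return selected

# Hypothetical keys used only to illustrate the behavior
sample_keys = [
    "iot/sensor_20250501_101500_part0.csv",
    "iot/sensor_20250504_235959_part0.csv",
    "iot/readme.txt",
    "iot/sensor_notadate_000000_part0.csv",
]
threshold = datetime(2025, 5, 2, 0, 0, 0)
new_keys = select_new_keys(sample_keys, threshold)
print(new_keys)
```

With the sample keys, only the file stamped after the threshold is kept; the two names that do not carry a valid timestamp are ignored.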

Read only the files whose timestamp is later than the threshold date.

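combined_df = None

# schema is assumed to be defined earlier and to match the layout of the files
for fp in files_to_read:
    # Read one tab-delimited file and keep its source file name as an extra column
    df = spark.read.format("csv").option("delimiter", "\t").schema(schema).load(fp).select("*", "_metadata.file_name")

    # Append this file's rows to the running result
    if combined_df is None:
        combined_df = df
    else:
        combined_df = combined_df.union(df)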
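
The loop above creates one DataFrame per file and unions them one by one. When a refresh picks up a very large number of files, the resulting chain of unions can become long. Since PySpark's `DataFrameReader.load` also accepts a list of paths, one option is to group the paths into batches and read each batch with a single call. The sketch below shows only the batching step in plain Python; the helper name `chunk_paths`, the batch size, and the example bucket name are assumptions for illustration.

```python
def chunk_paths(paths, batch_size):
    """Split a list of paths into consecutive batches of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]

# Hypothetical paths used only to illustrate the batching
example_paths = [f"s3a://example-bucket/iot/file_{n}.csv" for n in range(5)]
batches = chunk_paths(example_paths, 2)
print([len(batch) for batch in batches])

# Each batch could then be passed to one reader call instead of one call per file, e.g.
# spark.read.format("csv").option("delimiter", "\t").schema(schema).load(batch)
```

Whether batching helps depends on the number and size of the files, so treat the batch size as a tuning parameter rather than a fixed recommendation.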

Finally, you can apply additional transformations and save the DataFrame as the materialized view (MV).

save(combined_df)

Last update:
‎05-05-2025 12:38 PM