cancel
Showing results for 
Search instead for 
Did you mean: 
dylanwan
Employee
Employee

Symptoms

IoT devices generate a high volume of real-time data. The deploying company has published this data to an AWS Kinesis Data Stream. The challenge is to efficiently and reliably ingest this high-velocity data into Incorta for analysis, while maintaining data integrity and minimizing latency.

Diagnosis

The data residing in AWS Kinesis necessitates a scalable and robust ingestion mechanism. While AWS provides the Kinesis Data Streams API for direct data access, handling the sheer volume and velocity of IoT data requires a distributed processing framework. Incorta's integration with Spark offers parallel processing capabilities, making Spark Streaming an ideal solution for real-time Kinesis data ingestion.

Boto3 Kinesis Data Stream API vs. Spark Streaming

Boto3 is the AWS SDK for Python, which provides an interface to interact with AWS services, including Kinesis Data Streams. You can use Boto3 to perform various operations on Kinesis streams, such as putting records, getting records, and managing streams.

Pros Cons
  • Ease of Use: Boto3 provides a Python interface to the Kinesis API, making it relatively easy for Python developers to work with Kinesis.

  • Integration with other AWS services: Boto3 seamlessly integrates with other AWS services, allowing you to build complex data processing pipelines.

  • Full API Access: Boto3 provides access to the entire Kinesis Data Streams API, giving you fine-grained control over your Kinesis streams.

  • Resource Management: Boto3 offers both high-level resource abstractions and low-level clients, providing flexibility in how you interact with Kinesis.

 
  • Complexity for large-scale consumption: For high-throughput data consumption, directly using Boto3 to get records from Kinesis can be complex. You need to handle shard management, sequence numbers, and error handling manually, which can be challenging to scale.

  • Lower-level operations: Compared to Spark Streaming's Kinesis integration, Boto3 operates at a lower level, requiring you to manage more of the data consumption process.

  • No built-in fault tolerance for consumption: Boto3 by itself does not provide built-in fault tolerance for consuming data from Kinesis in a distributed manner. You would need to implement your own checkpointing and recovery mechanisms.

In the context of this solution, while Boto3 is essential for setting up and managing Kinesis streams, Spark Streaming is better suited for the large-scale, fault-tolerant consumption of data for ingestion into Incorta.

Solution

Authentication

AWS Credentials

You first need to get the credentials for reading data from Kinesis.  Here is the basic information required before starting.

stream_name = "<your stream name>"
aws_region = "us-east-1"  # change to your region
aws_access_key_id = "<your key id>"
aws_secret_access_key = "<your secret>"

Private vs Public Kinesis URL

While the public API for kinesis may be accessible from Incorta Cloud, please talk with Incorta support to configure the connection between your AWS account and Incorta Cloud.  This will enable you to avoid the extra cost of using the net gateway from Google Cloud, which is used by Incorta Cloud.

Read Streaming Data

Spark Streaming or Structured Streaming?

Apache Spark provides two primary approaches to stream processing: Spark Streaming and Spark Structured Streaming.

Spark Streaming Spark Structured Streaming
Spark Streaming is the original streaming engine, which processes data in micro-batches using Resilient Distributed Datasets (RDDs). While it offers fault tolerance and scalability, it can present challenges in handling complex transformations and ensuring exactly-once semantics. Spark Structured Streaming is a newer, more advanced streaming engine built on top of the Spark SQL engine. It treats a data stream as a continuously growing table and enables users to apply relational queries and transformations using DataFrames and Datasets, providing a higher level of abstraction and more consistent semantics. 

Spark Streaming Connectors for Kinesis

Spark Structured Streaming is a framework which is in a way similar to JDBC.  You still need the JDBC driver to talk with a specific database using JDBC.  AWS Kinesis is a specific data provider and multiple connectors are available for enabling Spark Structured Streaming access to AWS connector.

The connector we will use in this example comes from AWSLab and it is available to download.

Spoiler
Note: As I write this article, this connector is not published in PyPi. You can download the connector from AWS S3: s3://awslabs-code-us-east-1/spark-sql-kinesis-connector/ 

Please download the connector and make the jar available to your Spark MV.  

Reading Data from Kinesis 

In this example, you will read data using Spark Streaming.

The format "aws-kinesis" is a keyword that represents the connector.

df_stream = (spark.readStream
    .format("aws-kinesis")
    .option("kinesis.streamName", streamName)
    .option("kinesis.consumerType", "GetRecords")
    .option("kinesis.consumerName", "<Put a name for Incorta>")
    .option("kinesis.region", aws_region)
    .option("kinesis.endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("kinesis.awsAccessKeyId", aws_access_key_id)
    .option("kinesis.awsSecretKey", aws_secret_access_key)
    .option("kinesis.startingposition", "earliest")
    .load()
    )
## Added for debug
df_stream.printSchema()

The last command of printSchema() is to verify whether you can get the payload from the source. 

Up to this point, the data has not yet been ingested because Spark performs lazy execution.

Define the Schema and Parse the JSON data

Here is a sample code:

from pyspark.sql.types import *
json_schema = StructType([
    StructField("id", StringType(), True),
    StructField("ts", LongType(), True),
    StructField("v", StringType(), True)
])
# data is a binary format so we need to convert to string first
json_df = df_stream.selectExpr("CAST(data AS STRING) as json_string")

# from_json built-in Spark SQL function will be used to parse the json string
from pyspark.sql.functions import from_json, col
parsed_df = json_df.withColumn("json_data", from_json(col("json_string"), json_schema))

# The structured_df has the data in a tabular format
structured_df = parsed_df.select("json_data.*")

You can add other transformation logic here before writing the data.

Write data to Incorta

Writing to a Staging Area

Incorta Materialized Views (MV) run as part of scheduled batch processes.  The streaming data frame cannot be written using a regular Spark data frame write.  Instead, you will need to write the data into a staging area in order to send it to Incorta as data in a materialized view.

output_data_folder = "kinesis_staging/" + streamName

# this is used is saving the output data in staging
tenant_path = spark.conf.get("ml.incorta.tenant_path")
output_path = tenant_path + "/data/" +  output_data_folder

stream_obj = (
    structured_df.writeStream
    .queryName("newQuery")
    .outputMode('append')
    .format('parquet') 
    .option("path", output_path)
    .option("checkpointLocation", output_path +"/checkpoint")
    .trigger(processingTime="15 seconds")
    .start()
    )
stream_obj.awaitTermination(timeout=2*60)

# We cannot use the following. it may run forever
# stream_obj.awaitTermination()

The timeout is set to terminate the streaming read and write, so you can read and write to an MV's parquet files in Incorta. 

Spoiler
If you are using an on-premises Incorta instance, you can go to the disk to see the files that are created.

Write to Incorta MV

This is the regular MV save(), which will create parquet files in the Incorta tenant folder as part of the schema.

df_output = spark.read.format("parquet").schema(schema).load(output_path)
## check the data in the MV notebook, comment it out when you save the MV
df_output.count()
save(df_output)

References

Best Practices Index
Best Practices

Just here to browse knowledge? This might help!

Contributors
Version history
Last update:
‎04-07-2025 08:22 PM
Updated by: