- Symptoms
- Diagnosis
- Boto3 Kinesis Data Stream API vs. Spark Streaming
- Solution
- Authentication
- Read Streaming Data
- Write data to Incorta
- References
Symptoms
IoT devices generate a high volume of real-time data. The deploying company has published this data to an AWS Kinesis Data Stream. The challenge is to efficiently and reliably ingest this high-velocity data into Incorta for analysis, while maintaining data integrity and minimizing latency.
Diagnosis
The data residing in AWS Kinesis necessitates a scalable and robust ingestion mechanism. While AWS provides the Kinesis Data Streams API for direct data access, handling the sheer volume and velocity of IoT data requires a distributed processing framework. Incorta's integration with Spark offers parallel processing capabilities, making Spark Streaming an ideal solution for real-time Kinesis data ingestion.
Boto3 Kinesis Data Stream API vs. Spark Streaming
Boto3 is the AWS SDK for Python, which provides an interface to interact with AWS services, including Kinesis Data Streams. You can use Boto3 to perform various operations on Kinesis streams, such as putting records, getting records, and managing streams.
| Pros | Cons |
| --- | --- |
| Simple Python API for putting records, getting records, and managing streams; fine-grained control with minimal setup. | Consumption runs in a single process, so shard iteration, checkpointing, retries, scaling, and fault tolerance must all be handled by your own code, which becomes difficult at high volume. |
In the context of this solution, while Boto3 is essential for setting up and managing Kinesis streams, Spark Streaming is better suited for the large-scale, fault-tolerant consumption of data for ingestion into Incorta.
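To illustrate the Boto3 path, here is a minimal, hypothetical sketch that puts one record and reads a batch back from a single shard. It assumes ambient AWS credentials and uses a placeholder stream name, and it shows the per-shard iterator bookkeeping that Spark otherwise handles for you.
import boto3
# placeholder stream name and region; see the Authentication section below
kinesis = boto3.client("kinesis", region_name="us-east-1")
stream = "<your stream name>"
# write a single test record
kinesis.put_record(StreamName=stream, Data=b'{"id": "1", "ts": 0, "v": "0"}', PartitionKey="1")
# read one batch from the first shard only; a production consumer would have to
# iterate over every shard and manage iterators, checkpoints, retries, and scaling itself
shard_id = kinesis.describe_stream(StreamName=stream)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=stream,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
records = kinesis.get_records(ShardIterator=iterator, Limit=100)["Records"]
print(len(records), "records read from the first shard")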
Solution
Authentication
AWS Credentials
You first need to get the credentials for reading data from Kinesis. Here is the basic information required before starting.
stream_name = "<your stream name>"
aws_region = "us-east-1" # change to your region
aws_access_key_id = "<your key id>"
aws_secret_access_key = "<your secret>"
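Before wiring up Spark, you can optionally sanity-check these values with a quick Boto3 call. This step is not part of the main flow; it only confirms that the credentials can reach the stream.
import boto3
# uses the variables defined above
kinesis = boto3.client(
    "kinesis",
    region_name=aws_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)
summary = kinesis.describe_stream_summary(StreamName=stream_name)["StreamDescriptionSummary"]
print(summary["StreamStatus"], summary["OpenShardCount"])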
Private vs Public Kinesis URL
While the public Kinesis API may be accessible from Incorta Cloud, please contact Incorta Support to configure the connection between your AWS account and Incorta Cloud. This lets you avoid the extra cost of routing traffic through the NAT gateway in Google Cloud that Incorta Cloud would otherwise use.
Read Streaming Data
Spark Streaming or Structured Streaming?
Apache Spark provides two primary approaches to stream processing: Spark Streaming and Spark Structured Streaming.
| Spark Streaming | Spark Structured Streaming |
| --- | --- |
| Spark Streaming is the original streaming engine, which processes data in micro-batches using Resilient Distributed Datasets (RDDs). While it offers fault tolerance and scalability, it can present challenges in handling complex transformations and ensuring exactly-once semantics. | Spark Structured Streaming is a newer, more advanced streaming engine built on top of the Spark SQL engine. It treats a data stream as a continuously growing table and lets users apply relational queries and transformations using DataFrames and Datasets, providing a higher level of abstraction and more consistent semantics. |
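To make the "continuously growing table" idea concrete, here is a tiny, self-contained Structured Streaming sketch that uses Spark's built-in rate source instead of Kinesis; it is only an illustration of the programming model.
# the built-in "rate" source emits rows of (timestamp, value) at a fixed rate
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
# relational transformations apply to the stream exactly as they would to a static DataFrame
evens = rate_df.filter("value % 2 = 0")
query = (evens.writeStream
    .format("console")        # print each micro-batch to the console
    .outputMode("append")
    .start()
)
query.awaitTermination(timeout=30)
query.stop()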
Spark Streaming Connectors for Kinesis
Spark Structured Streaming is a framework that is, in this respect, similar to JDBC: just as you need a JDBC driver to talk to a specific database, you need a connector to talk to a specific streaming source. AWS Kinesis is one such source, and multiple connectors are available that enable Spark Structured Streaming to read from Kinesis.
The connector used in this example comes from AWS Labs and is available for download.
Please download the connector and make the JAR available to the Spark environment that runs your MVs.
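One common way to make an external connector JAR visible to Spark is through the standard spark.jars or spark.jars.packages properties, as in the local sketch below. The path is a placeholder, and in an Incorta MV the SparkSession is created for you, so confirm the exact mechanism for adding the JAR with your administrator or Incorta Support.
from pyspark.sql import SparkSession
# placeholder path to the downloaded connector JAR (local sketch only)
connector_jar = "/path/to/kinesis-connector.jar"
spark = (SparkSession.builder
    .appName("kinesis-ingest-sketch")
    .config("spark.jars", connector_jar)   # make the connector classes visible to Spark
    .getOrCreate()
)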
Reading Data from Kinesis
In this example, you will read data using Spark Structured Streaming.
The format "aws-kinesis" is the keyword that identifies this connector.
df_stream = (spark.readStream
.format("aws-kinesis")
.option("kinesis.streamName", streamName)
.option("kinesis.consumerType", "GetRecords")
.option("kinesis.consumerName", "<Put a name for Incorta>")
.option("kinesis.region", aws_region)
.option("kinesis.endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
.option("kinesis.awsAccessKeyId", aws_access_key_id)
.option("kinesis.awsSecretKey", aws_secret_access_key)
.option("kinesis.startingposition", "earliest")
.load()
)
## Added for debug
df_stream.printSchema()
The last command, printSchema(), verifies that you can reach the source and inspect the schema of the incoming payload.
Up to this point, no data has actually been ingested, because Spark uses lazy evaluation.
Define the Schema and Parse the JSON data
Here is a sample code:
from pyspark.sql.types import *
json_schema = StructType([
StructField("id", StringType(), True),
StructField("ts", LongType(), True),
StructField("v", StringType(), True)
])
# the data column arrives in binary format, so cast it to a string first
json_df = df_stream.selectExpr("CAST(data AS STRING) as json_string")
# the built-in Spark SQL function from_json parses the JSON string
from pyspark.sql.functions import from_json, col
parsed_df = json_df.withColumn("json_data", from_json(col("json_string"), json_schema))
# The structured_df has the data in a tabular format
structured_df = parsed_df.select("json_data.*")
You can add other transformation logic here before writing the data.
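For example, assuming ts holds epoch milliseconds and v holds a numeric reading stored as a string (both are assumptions about your payload), a transformation step might look like this; if you add it, write transformed_df instead of structured_df in the next section.
from pyspark.sql.functions import col
transformed_df = (structured_df
    # assumes ts is epoch milliseconds: convert to seconds, then cast to a timestamp
    .withColumn("event_time", (col("ts") / 1000).cast("timestamp"))
    # assumes v is a numeric reading stored as a string
    .withColumn("reading", col("v").cast("double"))
    # drop rows where the JSON payload could not be parsed
    .filter(col("id").isNotNull())
)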
Write data to Incorta
Writing to a Staging Area
Incorta Materialized Views (MVs) run as part of scheduled batch processes. A streaming DataFrame cannot be written with a regular Spark DataFrame write. Instead, you need to write the data to a staging area first and then load it from there into Incorta as a materialized view.
output_data_folder = "kinesis_staging/" + stream_name
# the tenant path is used for saving the output data in the staging area
tenant_path = spark.conf.get("ml.incorta.tenant_path")
output_path = tenant_path + "/data/" + output_data_folder
stream_obj = (
structured_df.writeStream
.queryName("newQuery")
.outputMode('append')
.format('parquet')
.option("path", output_path)
.option("checkpointLocation", output_path +"/checkpoint")
.trigger(processingTime="15 seconds")
.start()
)
stream_obj.awaitTermination(timeout=2*60)
# We cannot use the following; it may block forever
# stream_obj.awaitTermination()
The timeout lets the streaming read and write run for a bounded time before the script continues, so you can then read the staged parquet files and write them to the MV in Incorta.
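If you want to be sure the streaming query is fully stopped before the batch part of the MV runs, you can stop it explicitly after the timeout elapses. This is an optional addition, not part of the original flow.
# stop the query explicitly so no further micro-batches are triggered
if stream_obj.isActive:
    stream_obj.stop()
# lastProgress shows the metrics of the most recent micro-batch, useful for debugging
print(stream_obj.lastProgress)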
Write to Incorta MV
This is the regular MV save(), which will create parquet files in the Incorta tenant folder as part of the schema.
df_output = spark.read.format("parquet").schema(json_schema).load(output_path)
## check the data in the MV notebook, comment it out when you save the MV
df_output.count()
save(df_output)