
Configure Kafka as a data source

Overview

Incorta Analytics supports Apache Kafka as a data source, enabling Incorta to consume messages from Kafka topics. This article explains how to connect to a Kafka data source and consume JSON messages to load them into Incorta.

Pre-Requisites

This section lists the pre-requisites for creating a Kafka data source; it assumes familiarity with the Kafka setup and terminology.

In order to create a Kafka data source, you will need to have:

  • an up-and-running Incorta instance.
  • an up-and-running ZooKeeper instance.
  • an up-and-running Kafka instance.
  • the Avro Extractor tool (shipped with Incorta Analytics).
  • a sample JSON file to be consumed from Kafka.
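For illustration, a minimal sample JSON message might look like the following. The field names here are hypothetical; use a message that is representative of the data on your actual topic:

```json
{
  "entity": "Employee",
  "id": 1001,
  "name": "Jane Doe",
  "organization": {
    "id": 7,
    "name": "Engineering"
  }
}
```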

Steps

Before continuing, it is important to ensure that you have read and understood the "Pre-Requisites" section. This section describes the steps required to create and update a Kafka data source.

Create a Kafka data source

In this section, you will find information about creating a fresh Avro file from sample JSON messages. If you have already created an Avro file that you would like to update, please refer to the "Update a Kafka data source" section below.

Create an Avro file using the Avro Extractor tool:

The Avro Extractor is an external tool that generates an Avro file from sample JSON messages, producing a schema readable by the Incorta application. Incorta Analytics is designed to consume data with a predefined structure, which Kafka-produced data does not have, so you can generate that structure with the Avro Extractor using the following steps:

  1. Go to the "<INSTALLATION_PATH>/IncortaNode/bin" directory (for releases prior to 4.0, go to "<INSTALLATION_PATH>/bin") to locate the "avroExtractor.jar" file (i.e. the Avro Extractor tool).
  2. Run the Avro Extractor tool to generate a JSON file containing the Avro schema describing the source message, e.g. employee.avro, by running the following command:
    java -jar avroExtractor.jar [-input <JSON_FILE_PATH.json>][-output
    <FILE_PATH>] [-messageType <KEYNAME>] [-trimAfterDash <true/false>]
    where,
    -input: This parameter takes a sample JSON message file name (and path
            if not in the current directory).
    -output: This optional parameter takes the generated file output name (and path
             if not in the current directory).
             If this parameter is not set, the Avro file will be created in
             the same path as the input file.
    -messageType: This parameter takes the key carrying the root table
                  name for messages. For example, if the key in the JSON
                  message is {"entity" : "Employee"}, type "-messageType
                  entity" for the schema to be named after the key value,
                  i.e. "Employee". You can also specify the root table name
                  dynamically from any level within the JSON message as
                  dot-separated keys wrapped in curly braces, e.g.
                  {data.employee.id}. The curly braces are required when the
                  key is not at the message root level; for a root-level key,
                  simply type the key name, e.g. "entity".
    -trimAfterDash: This optional parameter takes a boolean value (true or
                    false) and defaults to "false". Any value other than
                    "true" is treated as "false". This parameter is
                    especially helpful for JSON messages whose messageType
                    names contain a "-". For example, the JSON message
                    {"messageType" : "employee-USA"} would name the root
                    table "employee-USA", which would load zero rows into
                    the Incorta schema due to the illegal character "-".
                    Setting this flag to "true" removes the dash and all
                    subsequent characters, leaving only "employee".

The output is a JSON file containing the Avro schema describing the source message, e.g. employee.avro. Make note of this file's location so you can import it later into the Incorta Analytics application.
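To make the tool's behavior concrete, the following is a minimal Python sketch of the kind of work the Avro Extractor performs. It is not the actual tool, and the function names are illustrative only: it derives the root table name from a messageType key (optionally trimming after a dash) and infers a simplified Avro-style record schema from one sample message.

```python
import json

# Illustrative sketch only -- NOT the actual Avro Extractor shipped with Incorta.

def root_table_name(message, message_type_key, trim_after_dash=False):
    """Derive the root table name from the key named by -messageType."""
    name = message[message_type_key]
    if trim_after_dash:
        # "employee-USA" -> "employee": drop the dash and everything after it
        name = name.split("-", 1)[0]
    return name

def infer_avro_schema(message, name):
    """Infer a simplified Avro-style record schema from one sample message."""
    fields = []
    for key, value in message.items():
        # bool must be checked before int: in Python, bool is a subclass of int
        if isinstance(value, bool):
            ftype = "boolean"
        elif isinstance(value, int):
            ftype = "long"
        elif isinstance(value, float):
            ftype = "double"
        elif isinstance(value, dict):
            ftype = infer_avro_schema(value, key)  # nested record
        else:
            ftype = "string"
        fields.append({"name": key, "type": ftype})
    return {"type": "record", "name": name, "fields": fields}

sample = json.loads('{"messageType": "employee-USA", "id": 7, "name": "Ada"}')
table = root_table_name(sample, "messageType", trim_after_dash=True)
schema = infer_avro_schema(sample, table)
```

The real tool additionally emits the annotations described later in this article and handles arrays, maps, and type unions; this sketch only shows the core idea of deriving a schema from one sample message.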

Create a Kafka data source in the Incorta Analytics application

  1. Log in to the Incorta Analytics web application.
  2. Navigate to the "Data Sources & Data Files" tab in the menu on the left-hand side.
  3. Click "+" in the upper right-hand corner to add a data source. The "Add New Data 
    Source" window appears.
  4. In the "Add New Data Source" window:
    • Select "Kafka" from the Data Source drop-down list.
    • Provide the data source name in the "Data Source Name" field.
    • Provide the topic in the "Topic" field.
    • Type the brokers list in the "Brokers List" field.
    • In the "Message Type" field, optionally provide the name of the key
      carrying the root table name; this value is also used as the schema name.
    • Enable the "Trim messageType after dash" feature if needed.
    • Select a Kafka version from the "Select Kafka Version" drop-down list.
    • Click Choose File to locate the generated Avro file. Note that the "Mapping File" is the Avro file generated using the "avroExtractor.jar" tool shipped in the "bin" directory under the <INSTALLATION_PATH>.
    • Click Add Data Source.

Now that you have created the Kafka data source, you are ready to navigate to the "Schemas" page and create a schema that consumes data from the Kafka data source.

Annotations

In the generated Avro file, you may add annotations to ask Incorta to modify column or table definitions, e.g. ignore a table, set certain columns as keys, identify maps, etc. The following is a full list of the supported annotations:

  1. Primary Key (isKey): Sets the given field/column to be part of the
     primary key of its parent table. This flag is set on the field level.
     Example: {"path": "employee.dsid", "isKey": "true"}
  2. Set as table (isTable): By default, Incorta flattens the fields of
     nested child JSON objects into their parent. To make a nested child a
     separate table, set the "isTable" flag to "true".
     Example: {"path": "employee.qualifications", "isTable": "true"}
  3. Parent table (isParent): A nested child object in JSON is a child table
     in Incorta by default. If the table should instead be a parent table,
     set the "isParent" flag to "true". Note that "isParent" must be used in
     conjunction with "isTable" set to "true".
     Example: {"path": "employee.organization", "isTable": "true",
     "isParent": "true"}
  4. Persistent (isPersistent): Set it to "false" if the data for a nested
     JSON object will be sent via another Kafka message and does not have to
     be in the current message. Note that "isPersistent" must be used in
     conjunction with "isTable".
     Example: {"path": "employee.organization", "isTable": "true",
     "isParent": "true", "isPersistent": "false"}
  5. Map (isMap): If a nested JSON object is a map between the keys in the
     JSON and a set of child records, it should be a child table with
     "isMap" set to "true". Note that "isMap" cannot be used in conjunction
     with "isTable".
     Example: {"path": "employee.addresses", "isMap": "true"}
  6. Map Key (<KEY>): When the "path" references fields or records that are
     children of a map, the path includes a variable element referenced via
     the name <KEY>.
     Example: {"path": "employee.addresses.<KEY>.typeName", "isKey": "true"}
  7. Array Map (tableName): If a map is a set of fields inside a record, the
     annotation must specify the names of these fields, along with the
     corresponding table name, as the map would not have a name in the JSON
     sample. The field names are listed, comma-separated, inside the "path".
     Note that "tableName" must be used in conjunction with "isMap" set to
     "true".
     Example: {"path": "employee.addresses.<KEY>.local1, local2",
     "isMap": "employee.addresses.LocalAddresses"}
  8. Ignored (isIgnored): Any table record marked as "isIgnored" will not be
     shown in discovery.
     Example: {"path": "employee.correction", "isIgnored": "true"}
  9. Table Name (tableName): An alias table name can be added to any table,
     map, or array.
     Example: {"path": "country_lastModification", "isTable": "true",
     "tableName": "coun_last_mod"}
  10. One-To-One (isOneToOne): When a table is annotated with "isTable", you
      can also annotate it as a one-to-one table; Incorta will then not mark
      any columns as PRIMARY_KEY other than those inherited from the parent
      table.
      Example: {"path": "employee.demographic", "isTable": "true",
      "isOneToOne": "true"}
  11. Source encrypted (isEncrypted): Annotating a field as "isEncrypted"
      means that it is encrypted at the source and needs to be decrypted by
      Incorta using a custom crypto class.
      Example: {"path": "employee.back.firstName", "isEncrypted": "true"}
  12. Encryption name (encryptionName): This annotation must accompany
      "isEncrypted": "true", as it is meaningless alone. Its value is the
      crypto name configured in the "kafka-crypto.properties" file.
      Example: {"path": "employee.basic.firstName", "isEncrypted": "true",
      "encryptionName": "person"}
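Putting several annotations together, a set of annotation entries might look like the following sketch. The paths simply combine the per-annotation examples above; the exact placement of these entries inside your generated Avro file depends on the schema the extractor produced:

```json
[
  {"path": "employee.dsid", "isKey": "true"},
  {"path": "employee.qualifications", "isTable": "true"},
  {"path": "employee.organization", "isTable": "true", "isParent": "true"},
  {"path": "employee.correction", "isIgnored": "true"}
]
```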