Connectors → Apache Kafka
About the Apache Kafka connector
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. To learn more, please review Concepts → Apache Kafka.
With the Kafka connector, you can create an external data source for a Kafka topic available on a list of one or more Kafka brokers. The Kafka topic must contain messages in valid JavaScript Object Notation (JSON) format.
In order to ingest JSON using a defined schema, the Kafka connector requires an Apache Avro file. An Avro file is a map for how to serialize and flatten semi-structured, schema-less JSON into structured data for related physical schema tables. You can use the Avro Extractor Tool to generate an .avro
file for the JSON messages related to a specific Kafka topic.
When enabled, the Kafka connector creates a Kafka Consumer in a Consumer Group that immediately begins ingesting an initial stream of messages from the Kafka topic from the available list of Kafka Brokers.
In this release, the Kafka Consumer in Incorta uses a Java client for Apache Kafka 2.1.
The Kafka connector applies the Avro mapping file, transforms the schema-less, unordered JSON into structured data, encrypts the structured data, and stores the encrypted data as Comma Separated Value (.csv
) files in Shared Storage for the given tenant.
Because Kafka is a platform for streaming data, the Kafka connector inherently loads data incrementally. The Kafka connector supports specifying a rewind offset.
The Kafka connector supports the following Incorta specific functionality:
Feature | Supported |
---|---|
Chunking | |
Data Agent | |
Encryption at Ingest | ✔ |
Incremental Load | ✔ |
Multi-Source | ✔ |
OAuth | |
Performance Optimized | ✔ |
Remote | |
Single-Source | ✔ |
Spark Extraction | |
Webhook Callbacks | ✔ |
Deployment of the Kafka connector may require the following:
- Specify the Server Configurations for the Kafka Consumer Service Name
- Create an Apache Avro file
- Custom properties file
About the Kafka topic JSON message
As semi-structured data, JSON is schema-less and unordered. For this reason, the content of the JSON message for a given Kafka topic may vary by design. One message may contain only one JSON object in a nested array whereas another message may contain numerous nested objects within a nested array. In addition, some key-value pairs may be present, absent, and in differing order between messages.
The Kafka connector requires the JSON message to be valid JSON and in one line. In addition, the JSON message itself must contain a key labeled data
that contains the actual data you want to store in Incorta.
Although optional, it is strongly recommended to include an additional key-pair that names the root entity in the physical schema table. For example, it is recommended that the JSON message contain a key arbitrarily labeled entity
that contains the value of the name of root entity in the physical schema table
Here is an valid example:
{ "entity": "my_table","data": { "name1": "value1","name2": { "name3": "value4","name5": "value5"}}}
Because of the key-value requirement for a 'data' key and for each message to be one line, you may need to make changes to the applications that produce and write JSON messages to a given Kafka topic.
About Apache Avro and the Avro Extractor Tool
Apache Avro is a JSON file itself that defines how to serialize JSON as a schema for a given JSON object. The Avro Extractor Tool allows you to generate an Avro file for the JSON messages related to a specific Kafka topic.
To learn more about Apache Avro, please review Concepts → Apache Avro.
To learn more about the Avro Extractor Tool, please review Tools → Avro Extractor Tool.
Deployment Steps
The Kafka connector ships with Incorta. In certain cases, you may need to make additional configurations to deploy the connector successfully as follows:
Specify a Kafka Consumer Service Name
For an Incorta Cluster with two or Incorta Nodes that each run a Loader Service, you must specify a Kafka Consumer Service Name in the Cluster Management Console.
A Cluster Management Console (CMC) Administrator for your Incorta Cluster must configure the Kafka Consumer Service Name. Changes to this property require that the CMC Administrator restart each Loader Service.
If your Incorta Cluster contains more than two Incorta Nodes each with a Loader Service, then you must specify the Incorta Node and Loader Service to use. If you do not assign a loader service to consume the Kafka messages, Incorta assigns a Loader Service randomly. This can result in unexpected behavior and row duplication.
Here are the steps to specify the required properties for the Server Configurations:
- As the CMC Administrator, sign in to the CMC.
- In the Navigation bar, select Clusters.
- In the cluster list, select a Cluster name.
- In the canvas tabs, select Cluster Configurations.
- In the panel tabs, select Server Configurations.
- In the left pane, select Clustering.
- In the right pane, tor Kafka Consumer Service Name, enter the
<NODE_NAME>.<SERVICE_NAME>
. - Select Save.
When changing the Kafka loader service consumer from one loader service (A) to another loader service (B), you must restart the current loader service (A) first, then restart the other loader service (B).
A CMC Administrator must restart the Loader Services in the Incorta cluster.
Steps to connect Apache Kafka and Incorta
To connect Apache Kafka and Incorta, here are the high level steps, tools, and procedures:
- Create an Apache Avro file
- Create an external data source
- Create a physical schema with the Schema Wizard
- or, Create a physical schema with the Schema Designer
- Load the physical schema
- Explore the physical schema
Create an Apache Avro file
You must create an Avro file for an external data source that uses the Apache Kafka connector. To learn how to create an Avro file with the Avro Extractor Tool, please review Tools → Avro Extractor Tool.
Create an external data source
Here are the steps to create a external data source with the Kafka connector:
- Sign in to the Incorta Direct Data Platform™.
- In the Navigation bar, select Data.
- In the Action bar, select + New → Add Data Source.
- In the Choose a Data Source dialog, in Streaming, select Kafka.
- In the New Data Source dialog, specify the applicable connector properties.
- To test, select Test Connection.
- Select Ok to save your changes.
Kafka connector properties
You can now specify the Security Protocol as a property of the external data source in the New Data Source dialog. This means that you no longer need to define this protocol in the kafka-consumer.properties
file or in a custom properties file.
Here are the properties for the Kafka connector:
Property | Control | Description |
---|---|---|
Data Source Name | text box | Enter the name of the data source |
Topic | text box | Enter the name of the Kafka topic |
Broker List | text box | Enter a comma-separated list of Kafka Brokers as follows:<IPv4\_ADDRESS>:<PORT> The default port for a Kafka Broker is 9092 . |
Message Type Field | text box | Optional. If you created an .avro file using the same input parameter for the avroExtractor.jar command, then specify the same value for this property. |
Trim messageType after dash | toggle | Optional. If you created an .avro file using the same input parameter for the avroExtractor.jar command, then enable this property. |
Kafka Version | drop down list | Select a supported Apache Kafka version. Blank defaults to the most current Kafka version. |
Enable Kafka Consumer | toggle | When enabled, the Loader Service defined as the Kafka Consumer will start ingesting messages and write encrypted .csv files to to the KAFKA directory in the Tenant’s shared directory. |
Date Expiration | input number | This property is not functional. |
Offset rewind to | text box | Optional. Using the yyyy-mm-dd hh:mm:ss.SSS zzz timestamp format, specify the time to rewind the consumer to. This may cause the consumer to retrieve old messages that might have been previously retrieved. |
Mapping File | upload | Select Choose File and the select a local .avro file to upload to Incorta. |
Security Protocol | drop down list | Select from the available options: ● SASL PLAINTEXT ● SSL ● (none) SSL (Secure Socket Layer) uses private-key certificate pairs. SASL (Simple Authentication Security Layer) is a framework that provides developers of applications and shared libraries with mechanisms for authentication, data integrity-checking, and encryption. |
SASL → SASL Mechanism For Client Connection | drop down list | Available with SASL PLAINTEXT. Select PLAIN or GSSAPI. SASL and PLAIN uses a simple username and password for authentication. SASL and GSSAPI uses Kerberos with Active Directory. Kerberos requires that all hosts resolve with a fully-qualified domain names (FQDNs). |
SASL → JAAS Config | text box | Specify the JASS (Java Authentication and Authorization Service) configuration, for example:org.apache.kafka.common.security.plain.PlainLoginModule required |
SASL → JAAS Username | text box | Specify the JASS username for the configuration |
SASL → JAAS Password | text box | Specify the JASS password for the configuration |
SSL → SSL Protocols | drop down list | Select from the available options: ● TLS v1 ● TLS v1.1 ● TLS v1.2 |
SSL → Endpoint Identification Algorithm | textbox | Specify the algorithm for host name validation. An empty string disables host name verification, which is the default. Specify https to enable host name verification. |
SSL → Keystore File | button | To upload a Keystore file, select "Choose File". In the Finder or File Explorer, select your Keystore file such as kafka.client.keystore.jks |
SSL → Keystore password | text box | Specify the keystore password |
SSL → Truststore File | button | To upload a truststore file, select "Choose File". In the Finder or File Explorer, select your truststore file such as kafka.client.truststore.jks |
SSL → Trustore password | text box | Specify the truststore password. Without a password, the truststore remains available, but there are no integrity checks. |
Custom Properties File | button (+) | To upload a .csv file, select +. Then select Upload a csv file. In the Finder or File Explorer, select your custom properties file in .csv format. |
Custom properties file
In previous releases, it was necessary to create a kafka-consumer.properties
file that affected the Kafka consumer group for the given Incorta Cluster. Starting in Release 5.1, you can create and upload a custom properties file for each external data source that uses the Kafka connector.
The properties file is in CSV
format and must contain the following header:
Property,Type,Value
Here is an example:
Property,Type,Valuekafka.logger.class,string,MyClasskafka.logger.logWarning,boolean,truekafka.logger.logInfo,boolean,falsekafka.auto.commit.interval.ms,numeric,10000kafka.request.timeout.ms,numeric,605000kafka.max.poll.interval.ms,numeric,600000
To learn more about the available Consumer Configurations properties, please review Apache Kafka 2.1 Consumer Configs documentation.
Create a physical schema with the Schema Wizard
Here are the steps to create a Kafka physical schema with the Schema Wizard:
- Sign in to the Incorta Direct Data Platform™.
- In the Navigation bar, select Schema.
- In the Action bar, select + New → Schema Wizard
- In (1) Choose a Source, specify the following:
- For Enter a name, enter the physical schema name.
- For Select a Datasource, select the Kafka external data source.
- Optionally create a description.
- In the Schema Wizard footer, select Next.
- In (2) Manage Tables, in the Data Panel, first select the name of the Data Source, and then check the Select All checkbox.
- In the Schema Wizard footer, select Next.
- In (3) Finalize, in the Schema Wizard footer, select Create Schema.
Create a physical schema with the Schema Designer
Here are the steps to create a Kafka physical schema using the Schema Designer:
- Sign in to the Incorta Direct Data Platform™.
- In the Navigation bar, select Schema.
- In the Action bar, select + New → Create Schema.
- In Name, specify the physical schema name, and select Save.
- In the Schema Designer, in Start adding tables to your schema, select Kafka.
- In the Data Source dialog, specify the Kafka table data source properties.
- Select Add.
- In the Table Editor, in the Table Summary section, enter the table name.
- To save your changes, select Done in the Action Bar.
Kafka table data source properties
For a physical schema table, you can define the following Kafka specific data source properties as follows:
Property | Control | Description |
---|---|---|
Type | drop down list | Default is Kafka |
Data Source | drop down list | Select the Kafka external data source |
Query | Text box | Select the text box. In the Edit Query dialog, specify a SELECT statement for the table as defined in the .avro file. |
Callback | toggle | Enables the Callback URL field |
Callback URL | text box | This property appears when the Callback toggle is enabled. Specify the URL. |
Example of Query:
SELECT id, dt, item, quantity, price FROM tblFruitSales.tblFruitSales
The Kafka connector natively supports incremental loads for streaming messages from Kafka. There are no configurations to enable an Update query other than specifying one or more key columns in the physical schema table. You can schedule an incremental load for a Kafka physical schema table in a physical schema as frequently as every 5 minutes.
View the physical schema diagram with the Schema Diagram Viewer
Here are the steps to view the physical schema diagram using the Schema Diagram Viewer:
- Sign in to the Incorta Direct Data Platform™.
- In the Navigation bar, select Schema.
- In the list of schemas, select the Kafka physical schema.
- In the Schema Designer, in the Action bar, select Diagram.
Load the physical schema
Here are the steps to perform a Full Load of the Kafka physical schema using the Schema Designer:
- Sign in to the Incorta Direct Data Platform™.
- In the Navigation bar, select Schema.
- In the list of physical schemas, select the Kafka physical schema.
- In the Schema Designer, in the Action bar, select Load → Load Now → Full.
- To review the load status, in Last Load Status, select the date.
Here are the steps to perform an Incremental Load of the Kafka physical schema with the Schema Designer:
- Sign in to the Incorta Direct Data Platform™.
- In the Navigation bar, select Schema.
- In the list of physical schemas, select the Kafka physical schema.
- In the Schema Designer, in the Action bar, select Load → Load Now → Incremental.
- To review the load status, in Last Load Status, select the date.
Explore the physical schema
With the full or incremental load of the Kafka physical schema complete, you can use the Analyzer to explore the physical schema, create your first insight, and save the insight to a new dashboard tab or an existing dashboard tab.
To open the Analyzer from the physical schema, follow these steps:
- In the Navigation bar, select Schema.
- In the Schema Manager, in the List view, select the Kafka physical schema.
- In the Schema Designer, in the Action bar, select Explore Data.
Known issues
By default, the Kafka connector converts Boolean and Date data types to String. You can use a formula column in a physical schema table to convert a String to a specific data type using one or more built-in functions.