Working on a project migrating an enterprise system to use Apache Kafka, a recent requirement was to ingest data coming in as XML documents with no pre-defined schema and convert them into the JSON format before they are published to Kafka topics.
This article focuses on how to create a custom source/sink connector using Apache Kafka and not on the actual XML-to-JSON conversion. We go through the steps we took to be able to write, build and run custom code with its dependent libraries
Quoting Apache Kafka documentation,
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define connectors that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.
The Source connector moves data into Kafka and the Sink connector moves data out of Kafka.
Since this fits the bill exactly of what the client wanted to accomplish, we knew we wanted to use a Kafka Source Connector for ingesting the XML documents, convert them into JSON, and then add them to the Kafka topics.
Next, we started exploring available custom Kafka connectors providing out-of-the-box XML-to-JSON conversion. We found solutions, where this was done, provided the XML schema was known. However, our client wanted a schema-agnostic solution.
While these solutions worked well, they offered a lot more functionality than the simple XML-to-JSON conversion our client was looking for. This led us to the path of developing a bare-bones custom Kafka Source Connector.
Download and install Kafka: https://kafka.apache.org/downloads
The installation has the following subfolders:
1. bin: The bash files and batch files for kafka-server, zookeeper, consumer, producer, connect-standalone, connect-distributed, etc.
2. config: The config files for connect-file-sink, connect-file-source, zookeeper, server, producer, consumer, etc.
3. data: Logs and offset data
4. libs: The jar files for all dependencies
Inspecting the connect-file-source.properties file in the config directory, we observed that it specifies the class that will be run as the source connector, FileStreamSource.
Creating a custom source connector class and providing it here and adding its compiled jar to the bin directory would make Kafka run our source connector.
Next, we needed a starting point for our version of FileStreamSource connector source code. We referred to the Apache Kafka Connect source code. The source code is available under the Apache License, Version 2.0
The files we needed were FileStreamSourceConnector.java and FileStreamSourceTask.java
FileStreamSourceConnector calls FileStreamSourceTask. We can add custom logic to FileStreamSourceTask.
New to Maven? How to set up a Maven Project using Visual Studio CodeXML-to-JSON conversion
We made use of the poll method from FileStreamSourceTask.java to invoke our XML-to-JSON conversion code. We used org.json for the conversion.
Code sample on how to use this dependency.
We created a JAVA Maven project and added only FileStreamSourceConnector.java and FileStreamSourceTask.java to a new folder under the src directory.
We added the following maven plugin to the pom.xml for generating all the jar files for the dependencies under target/libs when we build the project.
Running the source connector:
1. Build the maven project copy over the jar files for the project SNAPSHOT and the dependent libs generated under target and target/libs directory to the libs directory in the Kafka installation.
2. Edit connect-file-source.properties file from config directory in the Kafka installation. Change the connector.class config here to the custom connector we made.
3. Using terminal or Windows PowerShell navigate to the Kafka installation directory.
4. (Windows commands, use the bash (.sh) files for Mac/Linux)
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.propertiesStarting kafka-server
./bin/windows/kafka-server-start.bat ./config/server.propertiesStarting kafka source connector in standalone mode
./bin/windows/connect-standalone.bat ./config/connect-standalone.properties ./config/connect-file-source.propertiesSimilarly, FileStreamSinkConnector and FileStreamSinkTask can be edited for a custom sink connector and run with:
./bin/windows/connect-standalone.bat ./config/connect-standalone.properties ./config/connect-file-source.properties ./config/connect-file-sink.propertiesFor running Kafka file source connector in distributed mode which allows for multiple tasks to be spawned in case of errors:
./bin/windows/connect-distributed.bat ./config/ connect-distributed.properties ./config/connect-file-source.properties