Yitaek Hwang introduces a more performant setup to stream data from Kafka to QuestDB leveraging the InfluxDB line protocol.
Back in 2021, I wrote about a reference architecture for streaming real-time cryptocurrency prices with Kafka and QuestDB. That project polled the public Coinbase API for the price of various cryptocurrencies, published that information to individual Kafka topics, and used the Kafka Connect JDBC driver to ingest data into QuestDB.
Although this architecture worked as intended, it had some major drawbacks:
Degraded performance due to JDBC and parallelism set to 1
Required explicit schema information for messages
To address these challenges, the QuestDB team recently released the official QuestDB Kafka connector. Under the hood, the new connector uses the InfluxDB line protocol (ILP), which offers better ingestion performance than the JDBC driver over the PostgreSQL wire protocol and also simplifies schema management. So I decided to give it a spin and apply it to my original project, kafka-crypto-questdb.
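To give a feel for what ILP looks like on the wire, here is a minimal sketch of a single-line formatter written against the public line-protocol format: a table name, optional comma-separated tags, a space, comma-separated fields, and an optional nanosecond timestamp. It is only an illustration and not the connector's code, which handles all of this internally. The table and column names in the usage example below (topic_BTC, currency, amount) are hypothetical. In QuestDB, tag values end up in SYMBOL columns.

```python
from typing import Optional


def _escape(text: str, chars: str) -> str:
    # Put a backslash in front of each special character
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value) -> str:
    # bool must be checked before int because bool is a subclass of int
    if isinstance(value, bool):
        return "t" if value else "f"
    if isinstance(value, int):
        return f"{value}i"  # ILP integers carry an "i" suffix
    if isinstance(value, float):
        return repr(value)
    if isinstance(value, str):
        # String fields are double-quoted, with quotes and backslashes escaped
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
    raise TypeError(f"unsupported field type: {type(value).__name__}")


def to_ilp_line(table: str, tags: dict, fields: dict, ts_ns: Optional[int] = None) -> str:
    """Format one ILP record: table[,tag=v...] field=v[,field=v...] [timestamp]."""
    if not fields:
        raise ValueError("ILP requires at least one field")
    head = _escape(table, ", ")  # table names escape commas and spaces
    for key, val in tags.items():
        head += f",{_escape(key, ',= ')}={_escape(str(val), ',= ')}"
    body = ",".join(f"{_escape(k, ',= ')}={_format_field(v)}" for k, v in fields.items())
    line = f"{head} {body}"
    if ts_ns is not None:
        line += f" {ts_ns}"  # timestamp in nanoseconds since the epoch
    return line + "\n"
```

For example, to_ilp_line("topic_BTC", {"currency": "BTC"}, {"amount": 16800.5}, 1670000000000000000) produces topic_BTC,currency=BTC amount=16800.5 1670000000000000000. Because each record is plain text like this, the server does not need a separate schema definition before it can create the table. That property is what lets the connector drop the schema envelope.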
As a quick recap, the original project's goal was to poll the public Coinbase API for the prices of Bitcoin, Ethereum, and Chainlink. This information was then published to individual Kafka topics (e.g. topic_BTC) and sent to QuestDB via Kafka Connect.
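The per-coin routing can be sketched in a few lines. The code below builds a topic name for each symbol and turns a spot-price response into a flat message. Several parts are assumptions rather than details taken from the repo: the Coinbase v2 spot-price URL, the response shape {"data": {"base": ..., "currency": ..., "amount": ...}}, and the message field names (currency, amount, timestamp).

```python
import json


def spot_price_url(symbol: str, quote: str = "USD") -> str:
    # Assumed Coinbase v2 public endpoint for spot prices
    return f"https://api.coinbase.com/v2/prices/{symbol}-{quote}/spot"


def topic_for(symbol: str) -> str:
    # One Kafka topic per coin, following the topic_BTC naming
    return f"topic_{symbol}"


def parse_spot(body: str, ts_ms: int) -> dict:
    """Turn an (assumed) Coinbase spot response into a flat, hypothetical message."""
    data = json.loads(body)["data"]
    return {
        "currency": data["base"],
        "amount": float(data["amount"]),  # Coinbase returns the amount as a string
        "timestamp": ts_ms,  # epoch milliseconds, chosen by the producer
    }
```

In use, you would fetch spot_price_url("BTC") with urllib.request.urlopen, decode the body, and pass it to parse_spot along with int(time.time() * 1000).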
The codebase is organized into three parts:
docker-compose: holds the docker-compose file that starts Kafka (ZooKeeper, broker, Kafka Connect) and QuestDB, plus a JSON file to initialize Kafka Connect
docker: the Dockerfile to build the Kafka Connect image (a pre-built image is available via docker-compose)
Python files: grab the latest pricing information from Coinbase and publish it to Kafka
At a high level, the architecture does not change much with the new connector. The ILP functionality is abstracted away in the connector binary, so the required code changes were minimal.
First, I needed to update the Dockerfile to install the QuestDB Kafka Connector files:
The above Dockerfile simply downloads the binaries for the connector and copies the JAR files to the location Kafka Connect expects. As part of the upgrade, the base Kafka image was also bumped from 6.1.0 to 7.3.0.
Next, the docker compose file was updated to expose port 9009 (ILP) on QuestDB instead of 8812 (PostgreSQL):
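The connector now talks to QuestDB over ILP on port 9009. A quick way to confirm that the port is reachable, without involving Kafka, is to write a line by hand. ILP over TCP is newline-delimited text, so a minimal sketch needs only Python's socket module. The table name sanity_check used in the example below is hypothetical.

```python
import socket
from typing import Iterable


def send_ilp(sock: socket.socket, lines: Iterable[str]) -> None:
    """Write ILP records to an open TCP socket, one record per line."""
    # Every ILP record must be terminated by a newline
    payload = "".join(line if line.endswith("\n") else line + "\n" for line in lines)
    # sendall keeps writing until the whole buffer has been sent
    sock.sendall(payload.encode("utf-8"))
```

Assuming port 9009 is published on localhost, you could run with socket.create_connection(("localhost", 9009)) as sock: send_ilp(sock, ["sanity_check,source=manual value=1.0"]). A sanity_check table should then appear in the console.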
At this point, we have all the necessary components for the demo, so I started the stack via docker compose:
The previous version of the repo used the now-deprecated docker-compose command, which has been replaced by the new docker compose command.
Since ILP can infer the schema, we no longer need to explicitly include the schema information in the message published onto Kafka. This simplifies the payload significantly. Previously we had this ugly JSON blob:
With the new connector, we can just send the payload directly:
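The difference between the two payloads can be sketched in Python. With value.converter.schemas.enable=true, Kafka Connect's JsonConverter expects each message wrapped in a {"schema": ..., "payload": ...} envelope. With the flag set to false, the bare object is enough. The field names and the type mapping below are illustrative assumptions, not the repo's exact messages.

```python
import json

# Kafka Connect schema type names for the Python types used in this sketch
_SCHEMA_TYPES = {str: "string", float: "double", int: "int64", bool: "boolean"}


def with_schema(payload: dict) -> dict:
    """Wrap a flat payload in the JsonConverter envelope (schemas.enable=true)."""
    fields = [
        {"field": key, "type": _SCHEMA_TYPES[type(value)], "optional": False}
        for key, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "fields": fields, "optional": False},
        "payload": payload,
    }


def encode(payload: dict, include_schema: bool) -> bytes:
    """Serialize a message for Kafka, with or without the schema envelope."""
    body = with_schema(payload) if include_schema else payload
    return json.dumps(body).encode("utf-8")
```

For a message like {"currency": "BTC", "amount": 16800.5, "timestamp": 1670000000000}, encode(message, False) is simply that object as JSON. The envelope version repeats the type of every field in every message.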
Then for the Kafka Connect configuration, we can specify the new fields.
The important things to notice here are:
connector.class: it now uses io.questdb.kafka.QuestDBSinkConnector instead of the Confluent JDBC connector
value.converter.schemas.enable: since the schema is no longer included in messages, we set this to false
timestamp.field.name: the connector supports designated timestamps in either integer or timestamp format. For integers, the units are autodetected as long as the field name is configured.
For a full list of configuration options, refer to the configuration manual.
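Here is a rough illustration of how these fields fit together: a hypothetical connector definition built as a Python dict, in the shape the Kafka Connect REST API accepts (a name plus a config map). Several values are my own assumptions and do not come from the repo's JSON file: the connector name, the host value questdb:9009 (which assumes the Compose service is called questdb), the converters, and the timestamp field name. Check the configuration manual before relying on any option.

```python
SYMBOLS = ["BTC", "ETH", "LINK"]


def questdb_sink(symbols=SYMBOLS, questdb_host: str = "questdb:9009") -> dict:
    """Build a hypothetical QuestDB sink definition for Kafka Connect."""
    return {
        "name": "questdb-crypto-sink",  # hypothetical connector name
        "config": {
            "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
            "tasks.max": "1",
            # One topic per coin, given as a comma-separated list
            "topics": ",".join(f"topic_{s}" for s in symbols),
            # ILP endpoint on port 9009 (assumes a Compose service named "questdb")
            "host": questdb_host,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            # Messages no longer carry a schema envelope
            "value.converter.schemas.enable": "false",
            # Integer timestamp field; the connector autodetects its units
            "timestamp.field.name": "timestamp",
        },
    }
```

Kafka Connect treats connector config values as strings, so tasks.max and the boolean flag are written as strings here.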
To finish the setup, I first created the Kafka Connect sink:
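Creating the sink amounts to POSTing the connector JSON to Kafka Connect's REST API. Below is a minimal standard-library sketch. It assumes Connect is listening on its default REST port, 8083, on localhost.

```python
import json
import urllib.request


def create_connector_request(
    connector: dict, connect_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Build a POST /connectors request for a {"name": ..., "config": ...} dict."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

To send it, call urllib.request.urlopen(create_connector_request(connector)). Kafka Connect answers with 201 Created once the connector is registered. Building the request separately from sending it keeps the payload easy to inspect before it hits the cluster.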
Then I started polling for the latest prices:
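The polling side boils down to a loop: fetch each coin's price, publish it to that coin's topic, then sleep. In the sketch below, fetch and publish are passed in as functions, so the loop stays independent of any particular HTTP or Kafka client. The function name, the topic_ prefix, and the one-second default interval are hypothetical choices.

```python
import time
from typing import Callable, Iterable, Optional


def poll_prices(
    symbols: Iterable[str],
    fetch: Callable[[str], dict],
    publish: Callable[[str, dict], None],
    interval_s: float = 1.0,
    iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Fetch each symbol's price and publish it to its topic; return messages sent."""
    symbols = list(symbols)
    sent = 0
    rounds = 0
    # iterations=None polls forever; a number stops after that many rounds
    while iterations is None or rounds < iterations:
        for symbol in symbols:
            publish(f"topic_{symbol}", fetch(symbol))  # e.g. topic_BTC
            sent += 1
        rounds += 1
        sleep(interval_s)  # wait before the next polling round
    return sent
```

With kafka-python, for example, publish could be lambda topic, msg: producer.send(topic, json.dumps(msg).encode("utf-8")), where producer is a KafkaProducer. The fetch function would call the Coinbase API and return the flat message.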
Navigating to the console UI (localhost:9000), we can see the prices of Bitcoin being ingested as before:
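The same check can be scripted against QuestDB's REST /exec endpoint on port 9000, which returns query results as JSON with "columns" and "dataset" arrays. In the sketch below, the table name topic_BTC assumes that the connector names tables after their topics.

```python
import json
import urllib.parse


def exec_url(query: str, base: str = "http://localhost:9000") -> str:
    """Build a QuestDB REST /exec URL for a SQL query."""
    return f"{base}/exec?" + urllib.parse.urlencode({"query": query})


def to_rows(response_body: str) -> list:
    """Turn an /exec JSON response (columns + dataset) into a list of dicts."""
    doc = json.loads(response_body)
    names = [col["name"] for col in doc["columns"]]
    return [dict(zip(names, row)) for row in doc["dataset"]]
```

For example, you could fetch exec_url("SELECT * FROM topic_BTC LIMIT 5") with urllib.request.urlopen and pass the decoded body to to_rows.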
Compared to the original setup using the JDBC driver, the QuestDB Kafka connector was simple to use and made schema management much easier. The code repo has been updated to use the new connector, so feel free to submit a PR or an issue.
The overview page for the QuestDB Kafka connector on the QuestDB website lists more information on the configuration details and FAQs. The GitHub repo for the connector also has sample projects including a Node.js and a Java example for those looking to leverage a reference architecture.