Realtime crypto tracker with QuestDB Kafka Connector

Yitaek Hwang introduces a more performant setup to stream data from Kafka to QuestDB leveraging the InfluxDB line protocol.


Back in 2021, I wrote about a reference architecture for streaming realtime cryptocurrency prices with Kafka and QuestDB. That project polled the public Coinbase API for the price of various cryptocurrencies, published that information to individual Kafka topics, and used the Kafka Connect JDBC driver to ingest data into QuestDB.

Although this architecture worked as intended, it had some major drawbacks:

  1. Degraded performance due to JDBC and parallelism set to 1

  2. Required explicit schema information for messages

To address these challenges, the QuestDB team recently released the official QuestDB Kafka connector. Under the hood, the new connector uses the InfluxDB line protocol (ILP), which has superior ingestion performance over the JDBC driver (which uses the PostgreSQL wire protocol) and simplifies schema management. So I decided to give this a spin and apply it to my original project, kafka-crypto-questdb.
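To give a rough feel for why ILP is so lightweight, each row is just a single text line: table name, symbol (tag) columns, field columns, and a timestamp, with no schema handshake up front. The connector builds these lines internally; the helper below is only a sketch to illustrate the format, and the table and column names are illustrative, not taken from the connector's code.

```python
# A minimal sketch of the InfluxDB line protocol (ILP) text format:
#   table,sym1=val1 field1=val1 timestamp_ns
# The QuestDB Kafka connector produces these lines for you; this helper
# only illustrates the wire format for string symbols and numeric fields.
def to_ilp(table, symbols, fields, ts_ns):
    """Build one ILP line from symbol and field dicts plus a nanosecond timestamp."""
    syms = ",".join(f"{k}={v}" for k, v in symbols.items())
    flds = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{table},{syms} {flds} {ts_ns}"

line = to_ilp("topic_BTC", {"currency": "BTC"}, {"amount": 23150.5}, 1669210000000000000)
# -> "topic_BTC,currency=BTC amount=23150.5 1669210000000000000"
```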


As a quick recap, the original project's goal was to poll the public Coinbase API for the price of Bitcoin, Ethereum, and Chainlink. This information was then published onto individual topics on Kafka (e.g. topic_BTC) and sent to QuestDB via Kafka Connect.

Overview from coinbase to QuestDB via Kafka

The codebase is organized into three parts:

  • docker-compose: holds docker-compose file to start Kafka (zookeeper, broker, kafka connect), QuestDB, and JSON file to initialize Kafka Connect

  • docker: Dockerfile to build Kafka Connect image (pre-built image is available via docker-compose)

  • Python files: grabs latest pricing information from Coinbase and publishes information to Kafka

Project Setup#

At a high level, the architecture does not change much with the new connector. The ILP functionality is abstracted away by the connector binary, so the required code changes were minimal.

First, I needed to update the Dockerfile to install the QuestDB Kafka Connector files:

FROM alpine:latest AS builder
RUN wget <kafka-questdb-connector release zip URL> && \
    unzip kafka-questdb-connector-*-bin.zip

FROM confluentinc/cp-kafka-connect:7.3.0-1
COPY --from=builder /kafka-questdb-connector/*.jar /usr/share/java/kafka/

The above Dockerfile simply downloads the binaries for the connector and copies the JAR files to the location Kafka Connect expects. As part of the upgrade, the base Kafka image was also bumped from 6.1.0 to 7.3.0.

Next, the docker compose file was updated to expose port 9009 (ILP) on QuestDB instead of 8812 (PostgreSQL):

questdb:
  image: questdb/questdb:6.6.1
  hostname: questdb
  container_name: questdb
  ports:
    - "9000:9000"
    - "9009:9009"

At this point, we have all the necessary components for the demo, so I started the stack via docker compose:

$ cd docker-compose
$ docker compose up

The previous repo used the now-deprecated docker-compose command, which has been replaced by the new docker compose command.

Code Changes#

Since ILP can infer the schema, we no longer need to explicitly include the schema information in the message published onto Kafka. This simplifies the payload significantly. Previously we had this ugly JSON blob:

"schema": {
"type": "struct",
"fields": [
"type": "string",
"optional": False,
"field": "currency"
"type": "float",
"optional": False,
"field": "amount"
"type": "string",
"optional": False,
"field": "timestamp"
"optional": False,
"name": "coinbase"
"payload": {
"timestamp": dt.datetime.utcnow(),
"currency": raw_data["data"]["base"],
"amount": float(raw_data["data"]["amount"])

With the new connector, we can just send the payload directly:

"timestamp": int(time.time() * 1000),
"currency": raw_data['data']['base'],
"amount": float(raw_data['data']['amount'])

Then, for the Kafka Connect configuration, we can specify the new fields:

"name": "questdb-sink-btc",
"config": {
"topics": "topic_BTC",
"key.converter": "",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"host": "questdb",
"": "timestamp"

The important things to notice here are:

  • connector.class : it's now using io.questdb.kafka.QuestDBSinkConnector instead of the Confluent JDBC connector

  • value.converter.schemas.enable : since the schema is no longer included in the message, we set this to false

  • timestamp.field.name : the connector supports designated timestamps in either integer or timestamp format. For integers, the units are autodetected as long as the field name is configured.

For a full list of configuration options, refer to the configuration manual.
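The unit autodetection works off the magnitude of the integer value, which is worth seeing concretely; a quick illustration (my own, not connector code) using the epoch-millisecond timestamp produced by the Python snippet earlier:

```python
import time

# int(time.time() * 1000) yields a 13-digit epoch value => milliseconds.
# The same instant in microseconds has 16 digits, in nanoseconds 19, which
# is what lets a consumer autodetect the unit of an integer timestamp.
ts_ms = int(time.time() * 1000)
assert 10 ** 12 < ts_ms < 10 ** 13   # current epoch millis are 13 digits

ts_us = ts_ms * 1000                  # same instant, microseconds (16 digits)
assert 10 ** 15 < ts_us < 10 ** 16
```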

Final Result#

To finish the setup, I first created the Kafka Connect sink:

$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @questdb-sink-btc.json http://localhost:8083/connectors
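The same registration call can be made from Python using only the standard library; a sketch equivalent to the curl command above (the helper names are mine, the endpoint and headers come from the command):

```python
import urllib.request

# Kafka Connect REST endpoint from the curl command above
CONNECT_URL = "http://localhost:8083/connectors"

def build_request(body: bytes, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request equivalent to the curl call above."""
    return urllib.request.Request(
        url,
        data=body,
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )

def register_sink(config_path: str = "questdb-sink-btc.json"):
    """Read the connector config file and POST it to Kafka Connect."""
    with open(config_path, "rb") as f:
        req = build_request(f.read())
    return urllib.request.urlopen(req)  # raises HTTPError on non-2xx responses
```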

Then I started polling for the latest prices:

$ pip install -r requirements.txt
$ python

Navigating to the console UI (localhost:9000), we can see the prices of Bitcoin being ingested as before:
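The same port 9000 also serves QuestDB's REST API, so the check can be scripted instead of using the console; a minimal sketch, assuming the default behavior of the connector naming the table after the topic:

```python
import json
import urllib.parse
import urllib.request

def exec_url(query, host="http://localhost:9000"):
    """Build a QuestDB REST /exec URL for a SQL query."""
    return f"{host}/exec?{urllib.parse.urlencode({'query': query})}"

def latest_prices(host="http://localhost:9000"):
    """Fetch the ten most recent BTC rows ingested by the connector."""
    url = exec_url("SELECT * FROM topic_BTC ORDER BY timestamp DESC LIMIT 10", host)
    with urllib.request.urlopen(url) as resp:
        # QuestDB returns JSON with roughly {"columns": [...], "dataset": [...]}
        return json.load(resp)
```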

Screenshot of the result

Wrapping Up#

Compared to the original setup utilizing the JDBC driver, the QuestDB Kafka connector was simple to use and made the schema management much easier. The code repo is now updated to leverage the new connector, so feel free to submit a PR or an issue.

The overview page for the QuestDB Kafka connector on the QuestDB website lists more information on the configuration details and FAQs. The GitHub repo for the connector also has sample projects including a Node.js and a Java example for those looking to leverage a reference architecture.
