As someone interested in the future of DeFi (decentralized finance), I wanted to better track the price of different cryptocurrencies and store them into a timeseries database for further analysis. I found an interesting talk by Ludvig Sandman and Bruce Zulu at Kafka Summit London 2019, Using Kafka Streams to Analyze Live Trading Activity for Crypto Exchanges, so I decided to leverage Kafka and modify it for my own use.
QuestDB is a fast, open-source, timeseries database with SQL support. This makes it a great candidate to store financial market data for further historical trend analysis and generating trade signals. The team has released an official QuestDB Kafka connector. Underneath the hood, the new connector uses InfluxDB line protocol (ILP), which has excellent ingestion performance and easy schema management. So I decided to give this a spin and apply it to my project, kafka-crypto-questdb.
- Docker with at least 4GB memory
- Python 3.7+ and pip
- GitHub repository which contains the source for the examples below
Note: Memory can be increased on Docker Desktop in Settings -> Resources
-> Memory and increasing the default limit from
At a high level, this project polls the public Coinbase API for the price of Bitcoin, Ethereum, and Chainlink. This information was then published onto individual topics on Kafka (e.g. topic_BTC) and sent to QuestDb via Kafka Connect:
The codebase is organized into three parts:
docker: Dockerfile to set up the Kafka Connector
docker-compose: holds docker-compose file to start Kafka (zookeeper, broker, kafka connect), QuestDB, and JSON file to initialize Kafka Connect
Python files: grabs latest pricing information from Coinbase and publishes information to Kafka
First, I installed the QuestDB Kafka Connector files:
The above Dockerfile simply downloads the binaries for the connector and copies the JAR files to the location Kafka Connect expects.
Next, the docker compose file exposes port 9009 (ILP) on QuestDB instead of 8812 (PostgreSQL):
At this point, we have all the necessary components for the demo so started the stack via docker compose:
The docker-compose file runs the following services:
- Zookeeper for Kafka config information, distributed sync, and group services
- Kafka Broker for receiving and storing messages and for consumers to fetch messages by topic
- Kafka Connect with the QuestDB Kafka Connector
#Query the data with Python files
To finish the setup, I first created the Kafka Connect sink:
The curl command above submitted a JSON file which contains the configuration for the QuestDB Kafka Connector:
The important things to notice here are:
value.converter.schemas.enable: since there is no schema in messages and we do not use Schema Registry either, we have to set this value to
timestamp.field.name: the connector supports designated timestamps either in integer or timestamp format. For integers, units will be autodetected as long as the field name is configured.
For a full list of configuration options, refer to the configuration manual.
Then I started a Python script polling for the latest prices:
Navigating to the console UI at
localhost:9000, we can see the prices of
Bitcoin being ingested:
You can repeat this process for the other topics as well. If you prefer to run without a UI, you can also use the REST API to check:
#Visualize the data
The QuestDB console UI also provides the ability to generate basic graphs:
- Click on the Chart tab underneath the Tables
lineas the chart type,
timestampas the label
- Click Draw
The QuestDB native charting capabilities are limited to a few graph types, so for more advanced visualization, check out Time-Series Monitoring Dashboard with Grafana and QuestDB.
To stop streaming data, simply stop the Python scripts. To stop and remove the Kafka cluster / QuestDB services, run:
The overview page for the QuestDB Kafka connector on the QuestDB website lists more information on the configuration details and FAQs. The GitHub repo for the connector also has sample projects including a Node.js and a Java example for those looking to leverage a reference architecture.