Realtime crypto tracker with Kafka and QuestDB

A photograph of a laptop displaying candle charts of stock market data
Photo by M. B. M. via Unsplash

This submission comes from one of our community contributors, Yitaek Hwang, who has put together an excellent tutorial showing how to use Python to send real-time cryptocurrency metrics into Kafka topics, store these records in QuestDB, and perform moving average calculations on this time series data with Pandas.

Thanks for your contribution, Yitaek!

Background#

Bitcoin soars past 50,000 USD for the first time — CNN

Tesla invests 1.5 billion USD in bitcoin, will start accepting it as payment — Washington Post

Not a day goes by without some crypto news stealing the headlines these days. From institutional support of Bitcoin to central banks around the world exploring some form of digital currency, interest in cryptocurrency has never been higher. This is also reflected in the daily exchange volume:

A chart showing cryptocurrency exchange volumes from 2017 to present

As someone interested in the future of DeFi (decentralized finance), I wanted to better track the price of different cryptocurrencies and store them into a timeseries database for further analysis. I found an interesting talk by Ludvig Sandman and Bruce Zulu at Kafka Summit London 2019, Using Kafka Streams to Analyze Live Trading Activity for Crypto Exchanges, so I decided to leverage Kafka and modify it for my own use.

Prerequisites#

To follow this tutorial, you will need:

  • Docker and Docker Compose to run Kafka and QuestDB
  • Git to clone the example repository
  • Python 3 with pip to run the producer and consumer scripts

Note: Memory for Docker Desktop can be increased under Settings -> Resources -> Memory by raising the default limit from 2GB to 4GB.

Project setup#

At a high level, this project polls the public Coinbase API for the price of Bitcoin, Ethereum, and Chainlink. This information is then published onto individual Kafka topics (e.g. topic_BTC). The raw price information is sent to QuestDB via Kafka Connect to populate the timeseries database. At the same time, a separate consumer pulls the same data and calculates a moving average for a quick trend analysis.

A diagram showing the flow of information in this tutorial from Coinbase to Kafka and finally to QuestDB
To follow along, clone the repository:

git clone git@github.com:Yitaek/kafka-crypto-questdb.git

The codebase is organized into three parts:

  • ./docker-compose: holds the docker-compose file to start QuestDB, Kafka (Zookeeper, broker & Kafka Connect), and a JSON file to initialize Kafka Connect
  • ./docker: Dockerfile to build the Kafka Connect image (a pre-built image is available via docker-compose)
  • *.py: Python files to grab the latest pricing information from Coinbase, publish it to Kafka, and calculate a moving average

If you would like to analyze different cryptocurrencies or extend the simple moving average example with a more complicated algorithm like relative strength index analysis, feel free to fork the repo on GitHub.

Setting up Kafka & QuestDB#

Before pulling data from Coinbase, we need a running instance of a Kafka cluster and QuestDB. In the repo, I have a working docker-compose file with Confluent Kafka components (i.e. Zookeeper, Kafka Connect, Kafka Broker) and QuestDB. If you would prefer to run the Kafka components in the cloud or install them locally yourself, follow the instructions on the Confluent website. Otherwise, simply run the docker-compose file:

cd docker-compose
docker-compose up

The docker-compose file runs the following services:

  • QuestDB
  • Zookeeper for Kafka config information, distributed sync, and group services
  • Kafka Broker for receiving and storing messages and for consumers to fetch messages by topic
  • Kafka Connect with JDBC driver for streaming data between Apache Kafka and other systems

The Kafka Connect image is based on confluentinc/cp-kafka-connect-base:6.1.0. If you wish to modify this image (e.g. add a new connector or modify the bootup process), you can override the Dockerfile and build it locally.
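If you go that route, a minimal override could pull in the JDBC connector from Confluent Hub. This is only a sketch; the connector version and image tag below are illustrative, not the exact contents of the repo's Dockerfile:

FROM confluentinc/cp-kafka-connect-base:6.1.0
# Install the Kafka Connect JDBC connector (version shown is an example)
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.0.1

Build it from the ./docker directory and point docker-compose at your local tag:

docker build -t local/kafka-connect-questdb ./docker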

Wait for the Kafka cluster to come up and check the logs in the connect container until you see the following messages:

[2021-02-...] INFO [Worker clientId=connect-1 ... ] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-...] INFO [Worker clientId=connect-1 ... ] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-...] INFO [Worker clientId=connect-1 ... ] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
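You can also confirm that the Kafka Connect REST API is reachable on port 8083. Before any sinks are created, listing the connectors should return an empty array:

curl http://localhost:8083/connectors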

Postgres sink#

At this point, we have a healthy Kafka cluster and a running instance of QuestDB, but they are not connected. Since QuestDB supports the Kafka Connect JDBC driver, we can leverage the Postgres sink to populate our database automatically. Post this connector definition to our Kafka Connect container:

# From the ./docker-compose directory
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
--data @postgres-sink-btc.json http://localhost:8083/connectors

The postgres-sink-btc.json file we are sending holds the following configuration:

{
  "name": "postgres-sink-btc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic_BTC",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}

Some important fields to note:

  • topics: Kafka topic to consume and convert into Postgres format
  • connection: Using default credentials for QuestDB (admin/quest) on port 8812
  • value.converter: This example uses JSON with a schema, but you can also use Avro or raw JSON. If you would like to override the default configuration, you can refer to the Kafka sink connector guide from MongoDB.
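Once the connector definition has been posted, you can check that the sink and its task are running via the Kafka Connect REST API:

curl http://localhost:8083/connectors/postgres-sink-btc/status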

Polling Coinbase#

Now that our Kafka-QuestDB connection is made, we can start pulling data from Coinbase. The Python code requires numpy, kafka-python, and pandas to run. Using pip, install those packages and run the getData.py script:

pip install -r requirements.txt
python getData.py

It will now print out debug messages with pricing information as well as the schema we’re using to populate QuestDB:

Initializing Kafka producer at 2021-02-17 14:38:18.655069
Initialized Kafka producer at 2021-02-17 14:38:18.812354
API request at time 2021-02-17 14:38:19.170623
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 170617), 'currency': 'BTC', 'amount': 50884.75}}
API request at time 2021-02-17 14:38:19.313046
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 313041), 'currency': 'ETH', 'amount': 1809.76}}
API request at time 2021-02-17 14:38:19.471573
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 471566), 'currency': 'LINK', 'amount': 31.68216}}
API request at time 2021-02-17 14:38:23.978928
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 23, 978918), 'currency': 'BTC', 'amount': 50884.75}}
...
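For reference, the core of the producer is a simple poll-and-publish loop. The sketch below illustrates the idea only; it is not the exact contents of getData.py, and it assumes Coinbase's public spot price endpoint, a broker listening on localhost:9092, and the record schema shown in the log output above:

import json
import urllib.request
from datetime import datetime

from kafka import KafkaProducer

# Kafka Connect's JsonConverter expects each record to carry its schema alongside the payload
SCHEMA = {
    "type": "struct",
    "name": "coinbase",
    "optional": False,
    "fields": [
        {"type": "string", "optional": False, "field": "currency"},
        {"type": "float", "optional": False, "field": "amount"},
        {"type": "string", "optional": False, "field": "timestamp"},
    ],
}

# default=str converts the datetime in the payload to a string during serialization
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
)

def publish_price(currency):
    # Coinbase public spot price endpoint (no authentication required)
    url = f"https://api.coinbase.com/v2/prices/{currency}-USD/spot"
    with urllib.request.urlopen(url) as resp:
        amount = float(json.loads(resp.read())["data"]["amount"])
    record = {
        "schema": SCHEMA,
        "payload": {"timestamp": datetime.now(), "currency": currency, "amount": amount},
    }
    producer.send(f"topic_{currency}", record)

for symbol in ("BTC", "ETH", "LINK"):
    publish_price(symbol)
producer.flush()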

Querying QuestDB#

QuestDB is a fast, open-source, timeseries database with SQL support. This makes it a great candidate to store financial market data for further historical trend analysis and generating trade signals. By default, QuestDB ships with a console UI exposed on port 9000. Navigate to localhost:9000 and query the Bitcoin tracking table topic_BTC to see price data stream in:

The Web Console in QuestDB showing results from a table of Bitcoin records
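For example, to see the most recent records (the column names follow the schema defined by the producer):

select * from topic_BTC order by timestamp desc limit 10;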

You can repeat this process for the other topics as well. If you prefer to run without a UI, you can also use the REST API to check:

curl -G \
--data-urlencode "query=select * from topic_BTC" \
http://localhost:9000/exp
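Since pandas is already installed for the producer scripts, you can also pull query results straight into a DataFrame: the /exp endpoint returns CSV, which pandas can read directly. A small sketch, assuming the column names from the producer schema:

import urllib.parse
import pandas as pd

# QuestDB exports query results as CSV on the /exp endpoint
query = "select * from topic_BTC"
url = "http://localhost:9000/exp?query=" + urllib.parse.quote(query)
df = pd.read_csv(url, parse_dates=["timestamp"])
print(df.tail())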

The QuestDB console UI also provides the ability to generate basic graphs:

  1. Click on the Chart tab underneath the Tables
  2. Select line as the chart type and timestamp as the label
  3. Click Draw

The Web Console in QuestDB showing how to visualize results from a table of Bitcoin records on a graph

The QuestDB native charting capabilities are limited to a few graph types, so for more advanced visualization, check out my previous guide on streaming heart rate data to QuestDB under the Visualizing data with Grafana section.

Calculate moving average#

While we store the raw data on QuestDB for more sophisticated analysis, we can also consume from the same topics to calculate a quick moving average. This may be useful if you want to also post these records to another Kafka topic that you may use on a dashboard or to set alerts on pricing trends.

On a separate terminal, run the moving average script:

python movingAverage.py

It will print out the moving average of 25 data points and post it to topic_<crypto>_ma_25:

Starting Apache Kafka consumers and producer
Initializing Kafka producer at 2021-02-17 16:28:33.584649
Initialized Kafka producer at 2021-02-17 16:28:33.699208
Consume record from topic 'topic_BTC' at time 2021-02-17 16:28:34.933318
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.072581
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.075352
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.077106
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.088821
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.091865
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.094458
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.096814
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.098512
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.100150
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.103512
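Under the hood, movingAverage.py is essentially a consume-compute-produce loop. The sketch below is illustrative only, not the exact script; it assumes the record structure produced by getData.py and a broker on localhost:9092:

import json
from collections import deque

import pandas as pd
from kafka import KafkaConsumer, KafkaProducer

WINDOW = 25  # number of data points in the moving average

consumer = KafkaConsumer(
    "topic_BTC",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

prices = deque(maxlen=WINDOW)  # keep only the most recent 25 prices
for message in consumer:
    prices.append(message.value["payload"]["amount"])
    sma = float(pd.Series(prices).mean())  # simple moving average over up to 25 points
    producer.send("topic_BTC_ma_25", {"currency": "BTC", "moving_average": sma})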

If you wish to also populate these data points into QuestDB, supplement the JSON data in movingAverage.py with schema information, similar to the way it is defined in the new data JSON block in getData.py. Then create another Postgres sink via curl with topics set to topic_<crypto>_ma_25.
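For example, a second connector definition for the BTC moving average only needs a different name and topics value; everything else can match the earlier sink (the file name postgres-sink-btc-ma.json is just an example):

{
  "name": "postgres-sink-btc-ma",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic_BTC_ma_25",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
--data @postgres-sink-btc-ma.json http://localhost:8083/connectors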

Wrapping up#

To stop streaming data, simply stop the Python scripts. To stop and remove the Kafka cluster / QuestDB services, run:

docker-compose down

While this is a simple example, you can extend this to optimize the data format with Avro, connect it with your Coinbase account to execute trades based on trading signals, or test out different statistical methods on the raw data. Feel free to submit a PR if you have a suggestion or improvements to make!

If you like this content, we'd love to know your thoughts! Feel free to share your feedback or just come and say hello in the QuestDB Community Slack.