This submission comes from one of our community contributors, Yitaek Hwang, who has put together an excellent tutorial that shows how to use Python to send real-time cryptocurrency metrics into Kafka topics, store these records in QuestDB, and perform moving average calculations on this time series data with Pandas.
Thanks for your contribution, Yitaek!
Bitcoin soars past 50,000 USD for the first time — CNN
Tesla invests 1.5 billion USD in bitcoin, will start accepting it as payment — Washington Post
Not a day goes by without some crypto news stealing the headlines these days. From institutional support of Bitcoin to central banks around the world exploring some form of digital currency, interest in cryptocurrency has never been higher. This is also reflected in the daily exchange volume:
As someone interested in the future of DeFi (decentralized finance), I wanted to better track the price of different cryptocurrencies and store them in a time series database for further analysis. I found an interesting talk by Ludvig Sandman and Bruce Zulu at Kafka Summit London 2019, Using Kafka Streams to Analyze Live Trading Activity for Crypto Exchanges, so I decided to leverage Kafka and adapt their approach for my own use.
- Docker with at least 4GB memory
- Python 3.7+ and pip
- GitHub repository which contains the source for the examples below
Note: Memory can be increased on Docker Desktop under Settings -> Resources -> Memory by raising the default limit to at least 4GB.
At a high level, this project polls the public Coinbase API for the price of Bitcoin, Ethereum, and Chainlink. This information is then published onto individual topics on Kafka (e.g. topic_BTC). The raw price information is sent to QuestDB via Kafka Connect to populate the time series database. At the same time, a separate consumer also pulls that data and calculates a moving average for a quick trend analysis.
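To make the polling step concrete, here is a minimal Python sketch of how a single price lookup and the per-coin topic naming might look. It assumes Coinbase's public v2 spot-price endpoint (https://api.coinbase.com/v2/prices/BTC-USD/spot) and a response shaped like {"data": {"amount": "..."}}; the helper names are hypothetical, and the repo's own scripts may do this differently.

```python
import json
import urllib.request

# Assumption: Coinbase's public v2 spot-price endpoint; the repo's code may differ.
SPOT_URL = "https://api.coinbase.com/v2/prices/{symbol}-USD/spot"
SYMBOLS = ["BTC", "ETH", "LINK"]  # Bitcoin, Ethereum, Chainlink


def topic_for(symbol: str) -> str:
    """One Kafka topic per coin, following the topic_BTC naming in this tutorial."""
    return f"topic_{symbol}"


def parse_spot_price(body: dict) -> float:
    """Extract the price from a response shaped like {"data": {"amount": "..."}}."""
    return float(body["data"]["amount"])


def fetch_spot_price(symbol: str, timeout: float = 5.0) -> float:
    """Poll the public API once for a single coin (requires network access)."""
    url = SPOT_URL.format(symbol=symbol)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_spot_price(json.load(resp))
```

Calling fetch_spot_price("BTC") needs network access; the parsing and topic-naming helpers can be exercised offline.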
The codebase is organized into three parts:
- ./docker-compose: holds the docker-compose file to start QuestDB and Kafka (Zookeeper, broker & Kafka Connect), and a JSON file to initialize Kafka Connect
- ./docker: Dockerfile to build the Kafka Connect image (a pre-built image is available via docker-compose)
- *.py: Python files to grab the latest pricing information from Coinbase, publish it to Kafka, and calculate a moving average
If you would like to analyze different cryptocurrencies or extend the simple moving average example with a more complicated algorithm like relative strength index analysis, feel free to fork the repo on GitHub.
Before pulling data from Coinbase, we need a running instance of a Kafka cluster and QuestDB. In the repo, I have a working docker-compose file with Confluent Kafka components (i.e. Zookeeper, Kafka Connect, Kafka Broker) and QuestDB. If you would like to run Kafka in the cloud or install it locally without Docker, follow the instructions on the Confluent website. Otherwise, simply run the docker-compose file:
The docker-compose file runs the following services:
- Zookeeper for Kafka config information, distributed sync, and group services
- Kafka Broker for receiving and storing messages and for consumers to fetch messages by topic
- Kafka Connect with JDBC driver for streaming data between Apache Kafka and other systems
The Kafka Connect image is based on
If you wish to modify this image (e.g. add a new connector or modify the bootup
process), you can override the
and build it locally.
Wait for the Kafka cluster to come up and check the logs in the container until you see the following messages:
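If you would rather script the wait than watch the logs, a small poller against the Kafka Connect REST API can serve the same purpose. This sketch assumes the REST API is published on localhost:8083 (Kafka Connect's default listener port); check the port mapping in your docker-compose file. The function name is hypothetical.

```python
import http.client
import time
import urllib.request

# Assumption: Kafka Connect's REST API is mapped to localhost:8083.
CONNECT_URL = "http://localhost:8083/connectors"


def wait_for_http(url: str, timeout: float = 120.0, interval: float = 2.0) -> bool:
    """Return True once `url` answers with HTTP 200, or False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            # Connection refused, timeout, or HTTP error: service not ready yet.
            pass
        time.sleep(interval)
    return False
```

For example, wait_for_http(CONNECT_URL) returns True once GET /connectors answers, at which point the connector can be registered.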
At this point, we have a healthy Kafka cluster and a running instance of QuestDB, but they are not connected. Since QuestDB supports the Kafka Connect JDBC driver, we can leverage the Postgres sink to populate our database automatically. Post this connector definition to our Kafka Connect container:
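The same registration can be done from Python using only the standard library. This is a minimal sketch assuming Kafka Connect's REST API at localhost:8083; POST /connectors with a JSON body of the form {"name": ..., "config": {...}} is the standard Kafka Connect endpoint, but the helper names are hypothetical, and the path to postgres-sink-btc.json should point at wherever the file lives in your checkout.

```python
import json
import urllib.request

# Assumption: Kafka Connect's REST API is mapped to localhost:8083.
CONNECT_URL = "http://localhost:8083/connectors"


def build_connector_request(definition: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a POST request that registers a connector with Kafka Connect."""
    return urllib.request.Request(
        url,
        data=json.dumps(definition).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(path: str, url: str = CONNECT_URL) -> dict:
    """Read a connector definition such as postgres-sink-btc.json and POST it."""
    with open(path, encoding="utf-8") as f:
        definition = json.load(f)
    with urllib.request.urlopen(build_connector_request(definition, url)) as resp:
        return json.load(resp)
```

If the request succeeds, register_connector returns Kafka Connect's JSON description of the newly created connector.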
The postgres-sink-btc.json file we are sending holds the following configuration:
Some important fields to note:
- topics: Kafka topic to consume and convert into Postgres format
- connection: Using default credentials for QuestDB (quest) on port
- value.converter: This example uses JSON with schema, but you can also use Avro or raw JSON. If you would like to override the default configuration, you can refer to the Kafka sink connector guide from MongoDB.
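For reference, here is a hypothetical sketch of what such a sink definition can look like, built as a Python dict so it can be reused for other topics. The connector and converter class names are the standard Confluent JDBC sink and Kafka JSON converter classes; the host name questdb, Postgres wire port 8812, database qdb, and admin/quest credentials are assumptions based on QuestDB's defaults and may not match the file in the repo.

```python
import json


def make_sink_definition(name: str, topic: str, host: str = "questdb") -> dict:
    """Hypothetical JDBC sink definition writing one Kafka topic into QuestDB."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            # Kafka topic to consume; by default the table is named after it.
            "topics": topic,
            # QuestDB speaks the Postgres wire protocol (assumed port 8812, db qdb).
            "connection.url": f"jdbc:postgresql://{host}:8812/qdb",
            "connection.user": "admin",      # assumed QuestDB default user
            "connection.password": "quest",  # assumed QuestDB default password
            # Records are JSON with an embedded schema.
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            # Let the connector create the target table if it does not exist.
            "auto.create": "true",
            "insert.mode": "insert",
        },
    }


print(json.dumps(make_sink_definition("postgres-sink-btc", "topic_BTC"), indent=2))
```

Generating the definition this way makes it easy to create one sink per topic (topic_ETH, topic_LINK, and so on) without copying the JSON file by hand.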
Now that our Kafka-QuestDB connection is made, we can start pulling data
from Coinbase. The Python code requires
run. Using pip, install those packages and run the
It will now print out debug messages with pricing information as well as the schema we’re using to populate QuestDB:
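With value.converter.schemas.enable set to true, Kafka's JSON converter expects every record to carry its own schema next to the payload. Below is a minimal sketch of building such a record; the field names (currency, amount, timestamp) and the schema name are hypothetical stand-ins for whatever getData.py actually defines.

```python
import json
import time
from typing import Optional

# Hypothetical schema; the real one lives in getData.py in the repo.
PRICE_SCHEMA = {
    "type": "struct",
    "name": "coinbase_price",
    "optional": False,
    "fields": [
        {"field": "currency", "type": "string", "optional": False},
        {"field": "amount", "type": "float64", "optional": False},
        # Connect's logical Timestamp type: int64 epoch milliseconds.
        {"field": "timestamp", "type": "int64", "optional": False,
         "name": "org.apache.kafka.connect.data.Timestamp", "version": 1},
    ],
}


def to_connect_json(currency: str, amount: float, ts_ms: Optional[int] = None) -> bytes:
    """Wrap one price record in the {"schema", "payload"} envelope JsonConverter expects."""
    if ts_ms is None:
        ts_ms = int(time.time() * 1000)
    envelope = {
        "schema": PRICE_SCHEMA,
        "payload": {"currency": currency, "amount": float(amount), "timestamp": ts_ms},
    }
    return json.dumps(envelope).encode("utf-8")
```

The resulting bytes could then be sent with any Kafka client, for example kafka-python's KafkaProducer.send("topic_BTC", value=...), assuming that is the client you use.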
QuestDB is a fast, open-source time series database with SQL support. This makes it a great candidate to store financial market data for historical trend analysis and trade signal generation. By default, QuestDB ships with a console UI exposed on port 9000. Navigate to localhost:9000 and query the Bitcoin tracking table topic_BTC to see price data stream in:
You can repeat this process for the other topics as well. If you prefer to run without a UI, you can also use the REST API to check:
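The REST check can also be scripted. This sketch assumes QuestDB's HTTP server on localhost:9000 and its /exec endpoint, which returns JSON containing columns and dataset arrays; the helper names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

# Assumption: QuestDB's HTTP server on localhost:9000 (same port as the console).
QUESTDB_URL = "http://localhost:9000"


def exec_url(query: str, base: str = QUESTDB_URL) -> str:
    """Build an /exec URL with the SQL percent-encoded (spaces as %20)."""
    return f"{base}/exec?" + urllib.parse.urlencode({"query": query}, quote_via=urllib.parse.quote)


def rows_as_dicts(result: dict) -> list:
    """Turn an /exec response ({"columns": [...], "dataset": [...]}) into dicts."""
    names = [col["name"] for col in result["columns"]]
    return [dict(zip(names, row)) for row in result["dataset"]]


def run_query(query: str, base: str = QUESTDB_URL) -> list:
    """Run a SQL query against QuestDB and return rows keyed by column name."""
    with urllib.request.urlopen(exec_url(query, base)) as resp:
        return rows_as_dicts(json.load(resp))
```

For example, run_query("SELECT * FROM topic_BTC LIMIT 10") would return up to ten rows as dictionaries keyed by column name.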
The QuestDB console UI also provides the ability to generate basic graphs:
- Click on the Chart tab underneath the Tables
- Select line as the chart type
- Select timestamp as the label
- Click Draw
The QuestDB native charting capabilities are limited to a few graph types, so for more advanced visualization, check out my previous guide on streaming heart rate data to QuestDB under the Visualizing data with Grafana section.
While we store the raw data on QuestDB for more sophisticated analysis, we can also consume from the same topics to calculate a quick moving average. This may be useful if you want to also post these records to another Kafka topic that you may use on a dashboard or to set alerts on pricing trends.
On a separate terminal, run the moving average script:
It will print out the moving average of 25 data points and post it to
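The core of the moving average calculation is small. The original script uses Pandas, where a rolling mean over 25 points is pd.Series(prices).rolling(window=25).mean(); the sketch below swaps in a standard-library deque so the logic is visible without a DataFrame. The class name is hypothetical, and this is not the repo's movingAverage.py.

```python
from collections import deque
from typing import Deque, Optional

WINDOW = 25  # The tutorial averages over 25 data points.


class MovingAverage:
    """Simple moving average over the last `window` prices (stdlib stand-in for Pandas)."""

    def __init__(self, window: int = WINDOW) -> None:
        self.window = window
        # A bounded deque automatically drops the oldest price when full.
        self.values: Deque[float] = deque(maxlen=window)

    def update(self, price: float) -> Optional[float]:
        """Add a price; return the average once `window` points have been seen."""
        self.values.append(price)
        if len(self.values) < self.window:
            return None  # Not enough data yet, like the NaNs Pandas emits.
        return sum(self.values) / self.window
```

In a consumer loop, each price decoded from the topic would be passed to update(), and any non-None result could be printed or published to another topic.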
If you also wish to populate these data points into QuestDB, supplement the JSON data with schema information in movingAverage.py, similar to the way it is defined in the new data JSON block in getData.py. Then create another Postgres sink via curl with topic set as
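Rather than hand-writing a second schema, a helper could derive one from the payload. This is a hypothetical sketch that handles only flat records with boolean, integer, float, and string values, mapping them to the Kafka Connect types boolean, int64, float64, and string; the field and schema names used with it are illustrative.

```python
import json

# Python type -> Kafka Connect schema type. bool must come first because
# bool is a subclass of int in Python.
_CONNECT_TYPES = [(bool, "boolean"), (int, "int64"), (float, "float64"), (str, "string")]


def connect_type(value) -> str:
    """Return the Connect schema type for a single scalar value."""
    for py_type, connect in _CONNECT_TYPES:
        if isinstance(value, py_type):
            return connect
    raise TypeError(f"unsupported field type: {type(value).__name__}")


def with_schema(payload: dict, name: str) -> dict:
    """Wrap a flat dict in a JsonConverter {"schema", "payload"} envelope."""
    fields = [
        {"field": key, "type": connect_type(value), "optional": False}
        for key, value in payload.items()
    ]
    return {
        "schema": {"type": "struct", "name": name, "optional": False, "fields": fields},
        "payload": payload,
    }


record = with_schema({"currency": "BTC", "moving_average": 50000.5}, "moving_average")
print(json.dumps(record))
```

The wrapped record can be serialized with json.dumps and published to the new topic; a second sink with auto.create enabled should then create a matching table in QuestDB.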
To stop streaming data, simply stop the Python scripts. To stop and remove the Kafka cluster / QuestDB services, run:
While this is a simple example, you can extend it to optimize the data format with Avro, connect it with your Coinbase account to execute trades based on trading signals, or test out different statistical methods on the raw data. Feel free to submit a PR if you have suggestions or improvements to make!
If you like this content, we'd love to know your thoughts! Feel free to share your feedback or just come and say hello in the QuestDB Community Slack.