This tutorial is a guest post contributed by Sooter Saalu. It shows you how to build a data pipeline using Confluent Kafka, QuestDB, and Python. If you like this content or have any feedback, feel free to reach out to the author or to us on GitHub or on Slack.
A data pipeline, at its core, is a series of data processing steps that automate the transport and transformation of data between systems or data stores. Data pipelines serve a wide range of business use cases, including aggregating customer data for recommendations or customer relationship management, combining and transforming data from multiple sources, and collating or streaming real-time data from sensors or transactions.
For example, a company like Airbnb could have data pipelines that go back and forth between their application and their platform of choice to improve customer service. Netflix utilizes a recommendation data pipeline that automates the data science steps for generating movie and series recommendations. Also, depending on the rate at which it updates, a batch or streaming data pipeline can be used to generate and update the data used in an analytics dashboard for stakeholders.
In this article, you will learn how to implement a data pipeline that utilizes Kafka to aggregate data from multiple sources into a QuestDB database. Specifically, you will see the implementation of a streaming data pipeline that collates cryptocurrency market price data from CoinCap into a QuestDB instance where metrics, analytics, and further dashboards can be made.
Data pipelines are made up of a data source (e.g., applications, databases, or web services), a processing or transformation procedure (actions such as moving or modifying the data, occurring in parallel or in sequence with each other), and a destination (e.g., another application, repository, or web service).
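As a rough illustration of these three parts, the following Python sketch chains a source, a transformation, and a destination using generators. The function names and the sample records are hypothetical and exist only for this example; a real pipeline would read from and write to external systems.

```python
from typing import Iterable, Iterator


def source() -> Iterator[dict]:
    """Data source: yields raw records (hard-coded here for illustration)."""
    yield {"currency": "ETH", "price": "1800.50"}
    yield {"currency": "ETH", "price": "1801.25"}


def transform(records: Iterable[dict]) -> Iterator[dict]:
    """Transformation: rename the field and convert the price to a float."""
    for record in records:
        yield {"currency": record["currency"], "amount": float(record["price"])}


def load(records: Iterable[dict], destination: list) -> int:
    """Destination: append each record to a store and return the count."""
    count = 0
    for record in records:
        destination.append(record)
        count += 1
    return count


# Wire the three stages together; records flow through one at a time.
store: list = []
loaded = load(transform(source()), store)
```

Because each stage is a generator, records move through the chain one by one, which is the same shape a streaming pipeline has at a much larger scale.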
The type or format of data being moved or transformed, the size of the data, and the rate at which it will be moved or transformed (batch or stream processing) are some other considerations you need to be aware of when building data pipelines. A data pipeline that only needs to be triggered once a month will be different from one made to handle real-time notifications from your application.
Apache Kafka is an open source distributed event streaming platform optimized for processing and modifying streaming data in real time. It is a fast and scalable option for creating high-performing, low-latency data pipelines and for integrating high-volume streaming data from multiple sources. Kafka is a popular tool used by thousands of companies.
QuestDB is a high-performance, open source SQL database designed to process time series data with ease and speed. It is a relational column-oriented database with applications in areas such as IoT, sensor data and observability, financial services, and machine learning. The database’s functionality is written in Java with a supported REST API and support for the PostgreSQL wire protocol and InfluxDB line protocol, allowing for multiple ways to ingest and query data in QuestDB.
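To give a feel for one of these ingestion paths, here is a minimal sketch of how a single row could be rendered as an InfluxDB line protocol message. The helper name `to_ilp_line` is hypothetical, the table and column names are assumptions, and the sketch deliberately skips the escaping rules for special characters in names, so treat it as an illustration of the message layout rather than a complete encoder.

```python
def to_ilp_line(table: str, symbols: dict, fields: dict, ts_ns: int) -> str:
    """Render one row as: table,sym=val field=val timestamp_in_nanoseconds."""

    def fmt(value):
        # bool is a subclass of int, so reject it before the int check.
        if isinstance(value, bool):
            raise TypeError("booleans are not handled in this sketch")
        if isinstance(value, int):
            return f"{value}i"  # integers carry an 'i' suffix
        if isinstance(value, float):
            return repr(value)  # floats are written as plain numbers
        if isinstance(value, str):
            return '"' + value.replace('"', '\\"') + '"'  # strings are quoted
        raise TypeError(f"unsupported field type: {type(value).__name__}")

    symbol_part = "".join(f",{key}={val}" for key, val in symbols.items())
    field_part = ",".join(f"{key}={fmt(val)}" for key, val in fields.items())
    return f"{table}{symbol_part} {field_part} {ts_ns}"


line = to_ilp_line(
    "topic_ETH", {"currency": "ETH"}, {"amount": 1800.5}, 1650000000000000000
)
```

The pipeline in this article does not use this path; it writes through the PostgreSQL wire protocol instead.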
QuestDB can be installed using Docker, TAR files, or a package manager such as Homebrew. Confluent offers a Kafka distribution, Confluent Platform, with add-ons that ease building data pipelines. It can be installed using Docker images or a downloaded TAR file.
Note: Confluent Platform is licensed separately from Apache Kafka. If you wish to use this setup in a production environment, make sure to read through the Confluent Platform Licenses.
All files used in the article are available in this GitHub repository. You can clone the repository to work through the steps directly:
For the purpose of this article, you can install both using a Docker Compose file that creates the required Docker containers for the Kafka and QuestDB pipeline:
In sequential order, this Docker Compose file installs the Confluent-managed Kafka tools, Zookeeper and the Kafka broker, which manage the connections and processes in the Kafka ecosystem. Then, it installs a JDBC connector that enables the connection between Kafka and any relational database, such as QuestDB. This particular JDBC connector image is a custom connector created to simplify the connection between Confluent’s Kafka service and your Postgres database. Finally, it installs the latest version of QuestDB.
You can set up Kafka and QuestDB by moving to the Docker directory and then running the Docker Compose file:
The installation process should take a few minutes. You can check if the services are up and working with docker-compose ps. Once you see the container status as healthy, your cluster will be ready to go.
At this point, your Kafka cluster and QuestDB instance are still unconnected, with no avenue to pass data between them. Using your installed connector, you can create this connection by setting the configuration settings for the connector:
Here, you are setting the topic or topics that the connection monitors, the format for the message entries, and the authentication details for the connection. By default, QuestDB accepts admin and quest as the user and password.
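A configuration along these lines can be sketched in Python as a dictionary that is serialized to JSON. The property keys are standard Kafka Connect and JDBC sink settings, but the connector name, the connector class, the `questdb` hostname, port 8812 (QuestDB's default PostgreSQL wire port), and the `qdb` database name are assumptions made for this example. Check them against the configuration file in the repository before relying on them.

```python
import json


def build_sink_config(
    topic: str = "topic_ETH",
    user: str = "admin",
    password: str = "quest",
    # Assumed JDBC URL: container hostname "questdb", PG wire port 8812.
    jdbc_url: str = "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    name: str = "postgres-sink-eth",  # hypothetical connector name
) -> dict:
    """Build a sink connector definition for the Kafka Connect REST API."""
    return {
        "name": name,
        "config": {
            # Assumed class; the custom image may register a different one.
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            # Topic or topics the connector monitors.
            "topics": topic,
            # Format of the message entries: string keys, JSON values
            # that carry their own schema.
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "true",
            # Authentication details for the database connection.
            "connection.url": jdbc_url,
            "connection.user": user,
            "connection.password": password,
            "insert.mode": "insert",
            "pk.mode": "none",
        },
    }


config_json = json.dumps(build_sink_config(), indent=2)
```

Keeping the settings in a function makes it easy to reuse the same definition for other topics by changing only the `topic` and `name` arguments.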
You can send this configuration to your installed connector using the following command:
When successfully executed, you should be able to see a response in JSON that includes the configuration above.
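If you would rather register the connector from Python than from the command line, a minimal sketch could look like the following. It assumes the Kafka Connect worker exposes its REST API at `http://localhost:8083` (the Kafka Connect default port); the function names and the tiny example definition are hypothetical.

```python
import json
import urllib.request


def build_register_request(
    connector: dict, connect_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Prepare a POST to the /connectors endpoint without sending it."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register(connector: dict, connect_url: str = "http://localhost:8083") -> dict:
    """Send the request and return the JSON reply; needs a running worker."""
    req = build_register_request(connector, connect_url)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Placeholder definition, only to show the shape of the request.
example = {"name": "example-sink", "config": {"topics": "topic_ETH"}}
request = build_register_request(example)
```

Calling `register(...)` with your real connector definition sends the request; the JSON it returns should echo the configuration you submitted.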
At this point, you have a connected QuestDB instance that will monitor the topic_ETH topic and pull any records sent to it for storage on the database. You can then create a table for the records on your database. QuestDB has an interactive web console for that, which can be accessed at http://localhost:9000/. Here, you can query the data and generate some simple visualizations directly. Use the following command to create a table for your records:
This creates a formatted table for your records. The next step is to generate records that will be sent to this table.
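The table can also be created without the web console by sending the statement to QuestDB's REST API. The sketch below assumes the `/exec` endpoint on `http://localhost:9000` and a three-column layout (`timestamp`, `currency`, `amount`). The column names and types are assumptions for illustration and must match whatever your producer actually sends.

```python
import json
import urllib.parse
import urllib.request

# Assumed schema; adjust the columns to match your records.
CREATE_TABLE_SQL = (
    "CREATE TABLE IF NOT EXISTS topic_ETH "
    "(timestamp TIMESTAMP, currency SYMBOL, amount DOUBLE)"
)


def exec_url(sql: str, base_url: str = "http://localhost:9000") -> str:
    """Build the REST URL that asks QuestDB to run one SQL statement."""
    return f"{base_url}/exec?" + urllib.parse.urlencode({"query": sql})


def run_sql(sql: str, base_url: str = "http://localhost:9000") -> dict:
    """Execute the statement; needs a running QuestDB instance."""
    with urllib.request.urlopen(exec_url(sql, base_url)) as resp:
        return json.loads(resp.read().decode("utf-8"))


url = exec_url(CREATE_TABLE_SQL)
```

Calling `run_sql(CREATE_TABLE_SQL)` against a running instance creates the table. Building the URL is kept separate so the request can be inspected first.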
Using CoinCap’s API and some Python code, you can create a Kafka Producer that will generate data in real time:
This Python code queries the CoinCap API in a continuous loop that generates the market price of ETH every 15 seconds. It then processes this data and sends it to the Kafka topic topic_ETH, where it can be consumed by QuestDB. The data schema and payload used here are just an example, as they don’t use some QuestDB optimizations such as partitions.
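The general shape of such a producer can be sketched as follows. This is not the script from the repository: the field names, the `build_record` and `run` helpers, and the broker address `localhost:9092` are assumptions. The price lookup is left as a caller-supplied `fetch_price` function so the sketch makes no claims about the shape of CoinCap's responses. Each message wraps the payload in the schema envelope that Kafka Connect's JSON converter reads when schemas are enabled.

```python
import json
import time

TOPIC = "topic_ETH"

# Schema half of the JsonConverter envelope. Field names are assumptions
# and need to match the columns of the destination table.
SCHEMA = {
    "type": "struct",
    "optional": False,
    "fields": [
        {"field": "currency", "type": "string", "optional": False},
        {"field": "amount", "type": "double", "optional": False},
        {
            "field": "timestamp",
            "type": "int64",
            "optional": False,
            # Logical type: milliseconds since the Unix epoch.
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
        },
    ],
}


def build_record(currency: str, price_usd, ts_ms) -> dict:
    """Wrap one price observation in a schema + payload envelope."""
    return {
        "schema": SCHEMA,
        "payload": {
            "currency": currency,
            "amount": float(price_usd),
            "timestamp": int(ts_ms),
        },
    }


def run(fetch_price, interval_s: int = 15, bootstrap: str = "localhost:9092"):
    """Send one record per interval; needs confluent-kafka and a broker.

    fetch_price is a caller-supplied function returning the latest price.
    """
    from confluent_kafka import Producer  # imported lazily on purpose

    producer = Producer({"bootstrap.servers": bootstrap})
    while True:
        record = build_record("ETH", fetch_price(), time.time() * 1000)
        producer.produce(TOPIC, value=json.dumps(record).encode("utf-8"))
        producer.flush()  # block until the message is delivered
        time.sleep(interval_s)


# Made-up values, only to show what a message looks like.
sample = build_record("ETH", "1800.50", 1_650_000_000_000)
```

Separating `build_record` from the loop keeps the message format easy to inspect without a broker, while `run` only starts sending once you call it with your own price function.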
You can run this code with the following commands:
Note: If you are having issues installing the dependencies using the requirements.txt file, particularly if you are getting a Microsoft Visual C++ error, please check your Python version first. The Confluent-Kafka-Python package supports a few Python versions on Windows at the time of writing, specifically Python 3.7, 3.8, and 3.9. If you get an error that librdkafka/rdkafka.h is not found, you can try following the steps at this GitHub Issue. In our particular case with an Apple M1, we solved this problem by executing:
brew install librdkafka
export LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/lib
export C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/include
pip3 install -r requirements.txt
With the producer script up and running, you will be collecting ETH market prices every fifteen seconds and sending this data to your Kafka topic. Your QuestDB instance then automatically updates its database with the data from the monitored Kafka topic. With this data pipeline and connection in place, your QuestDB instance will be populated with data at fifteen-second intervals.
Try running the following command a few times to observe the database updating itself from the Kafka topic:
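One way to run such a check from Python is sketched below. It assumes QuestDB's REST `/exec` endpoint on `http://localhost:9000`, which returns a JSON document with `columns` and `dataset` entries. The helper names are hypothetical and the sample response uses made-up values purely to show the parsing step.

```python
import json
import urllib.parse
import urllib.request


def rows_from_exec(response: dict) -> list:
    """Turn an /exec reply (columns + dataset) into a list of dicts."""
    names = [column["name"] for column in response["columns"]]
    return [dict(zip(names, row)) for row in response["dataset"]]


def fetch_rows(
    sql: str = "SELECT * FROM topic_ETH", base_url: str = "http://localhost:9000"
) -> list:
    """Run the query against a live QuestDB instance and parse the reply."""
    url = f"{base_url}/exec?" + urllib.parse.urlencode({"query": sql})
    with urllib.request.urlopen(url) as resp:
        return rows_from_exec(json.loads(resp.read().decode("utf-8")))


# Illustrative reply with invented values, in the shape /exec returns.
sample_response = {
    "query": "SELECT * FROM topic_ETH",
    "columns": [
        {"name": "timestamp", "type": "TIMESTAMP"},
        {"name": "currency", "type": "SYMBOL"},
        {"name": "amount", "type": "DOUBLE"},
    ],
    "dataset": [["2022-04-15T05:20:00.000000Z", "ETH", 1800.5]],
    "count": 1,
}
rows = rows_from_exec(sample_response)
```

Calling `fetch_rows()` every few seconds against the running instance should show the row count growing as new prices arrive.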
With the data on your QuestDB instance, you can query or modify it and even generate more records to be sent to other Kafka topics, creating materialized views and key performance indicators from your data.
Note: To take down the installed containers used in this article, move to the coincap_kafka_questdb/docker directory and run the following command:
Data pipelines are a central consideration in the effective movement and transformation of data for your use. To efficiently collate raw data from where it is generated and transform it into valuable insights, you need data pipelines.
In this article, you learned how to collate data with Kafka and implement a data pipeline that collects real-time ETH market data and stores it in QuestDB through Kafka connections.
QuestDB is an open source SQL database with a focus on fast performance and ease of use. It provides optimized storage for high-volume time series data, whether from your financial services or sensor applications, where time series data is constantly being generated. It satisfies the need for high-performance ingestion and query times. Used in conjunction with Kafka, it lets you aggregate data from multiple sources, modify it, and store it for use at a constantly updating rate that fits your end user or application.