QuestDB ships a Kafka connector for fast ingestion from Kafka into QuestDB. It is also useful for processing change data capture (CDC) data flows. The connector is based on the Kafka Connect framework and acts as a sink for Kafka topics.
This guide shows how to use the QuestDB Kafka connector to read JSON data from Kafka topics and write it as rows into a QuestDB table.
Confluent users should also check the instructions for the Confluent Docker images.
You will need the following:
- A running QuestDB instance
- A local JDK installation
Before starting Kafka, the following steps must be completed:
- Download the connector file.
The Apache Kafka distribution contains the Kafka Connect framework, but the QuestDB-specific components need to be downloaded as a zip archive from the QuestDB Kafka connector GitHub page.
The connector is also available from the Confluent Hub.
- Copy the files to the Kafka installation.
Once downloaded, unzip the contents of the archive and copy the required files into the Kafka installation.
You can automate downloading the latest connector package by running this command:
- Set the configuration file.
A configuration file must be created for Kafka Connect in standalone mode. The host and port of the running QuestDB server must be defined. A topic can be specified under the `topics` key.
The example below creates a configuration file. It assumes a running QuestDB server on the InfluxDB Line Protocol default port, 9009, creates a reader from a Kafka topic, `example-topic`, and writes into a QuestDB table.
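The download and configuration steps above can also be scripted. The following Python sketch makes three assumptions that should be checked against the connector release you use: the connector is published as a `.zip` asset in the GitHub releases of the `questdb/kafka-questdb-connector` repository, the connector class is `io.questdb.kafka.QuestDBSinkConnector`, and the configuration goes to a hypothetical `config/questdb-connector.properties` path. The `table` option is deliberately left out, so the target table defaults to the topic name.

```python
import json
import os
import urllib.request
from pathlib import Path

# Assumption: GitHub repository that publishes the connector releases.
RELEASES_API = "https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest"

# Quick-start settings. Kafka Connect requires "name" and "connector.class";
# the class name below is an assumption to verify against the release README.
QUICKSTART_SETTINGS = {
    "name": "questdb-sink",
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "host": "localhost:9009",  # QuestDB InfluxDB Line Protocol host:port
    "topics": "example-topic",  # Kafka topic to read from
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",  # plain JSON, no schema envelope
    "include.key": "false",
}


def pick_zip_asset(release: dict) -> str:
    """Return the download URL of the first .zip asset in a GitHub release payload."""
    for asset in release.get("assets", []):
        if asset.get("name", "").endswith(".zip"):
            return asset["browser_download_url"]
    raise LookupError("no .zip asset in the release")


def download_latest(dest_dir: str = ".") -> str:
    """Fetch the latest release metadata and save its zip archive into dest_dir."""
    with urllib.request.urlopen(RELEASES_API) as resp:
        release = json.load(resp)
    url = pick_zip_asset(release)
    target = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
    urllib.request.urlretrieve(url, target)
    return target


def render_properties(settings: dict) -> str:
    """Render settings as a Java .properties body, one key=value per line."""
    return "".join(f"{key}={value}\n" for key, value in settings.items())


def write_config(path: str = "config/questdb-connector.properties") -> Path:
    """Write the quick-start settings to a (hypothetical) connector config path."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(render_properties(QUICKSTART_SETTINGS))
    return target
```

Typical use: call `download_latest()`, unzip the archive into the Kafka installation, then call `write_config()` from the Kafka home directory.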
The commands listed in this section must be run from the Kafka home directory and in the order shown below.
- Start the ZooKeeper instance that Kafka uses to coordinate the server:
- Start a Kafka server:
- Start the QuestDB Kafka connector:
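The same start-up order can be sketched in Python. The scripts and default configuration files are the ones shipped with the Apache Kafka distribution; the connector configuration path is hypothetical. The fixed delay between launches is a crude stand-in for a real readiness check, and in practice each process is often started in its own terminal instead.

```python
import subprocess
import time


def startup_commands(connector_config: str = "config/questdb-connector.properties") -> list:
    """Commands to run from the Kafka home directory, in this order."""
    return [
        # 1. ZooKeeper, used to coordinate the Kafka server
        ["bin/zookeeper-server-start.sh", "config/zookeeper.properties"],
        # 2. Kafka broker
        ["bin/kafka-server-start.sh", "config/server.properties"],
        # 3. Kafka Connect in standalone mode, loading the QuestDB connector config
        ["bin/connect-standalone.sh", "config/connect-standalone.properties", connector_config],
    ]


def launch_all(kafka_home: str, delay_s: float = 10.0) -> list:
    """Start each process in the background, pausing between them (no readiness check)."""
    procs = []
    for cmd in startup_commands():
        procs.append(subprocess.Popen(cmd, cwd=kafka_home))
        time.sleep(delay_s)
    return procs
```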
Messages can be published via the console producer script:
A greater-than symbol, `>`, indicates that a message can be published to the example topic from the interactive session. Paste the following minified JSON as a single line to publish the message and create the table `example-topic` in the QuestDB instance:
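The console producer treats each input line as one message, which is why the JSON must be minified onto a single line. A minimal Python sketch of that step, using a hypothetical message, together with the standard console-producer invocation (assuming the broker listens on the default `localhost:9092`):

```python
import json
import subprocess

# Hypothetical message; nested objects are allowed.
message = {
    "firstname": "John",
    "lastname": "Doe",
    "age": 30,
    "address": {"street": "Main Street", "city": "New York"},
}

# separators=(",", ":") drops the spaces json.dumps adds by default,
# giving a compact, single-line document.
line = json.dumps(message, separators=(",", ":"))

PRODUCER_CMD = [
    "bin/kafka-console-producer.sh",
    "--topic", "example-topic",
    "--bootstrap-server", "localhost:9092",
]


def publish(kafka_home: str, lines: list) -> None:
    """Pipe messages to the console producer instead of pasting them interactively."""
    subprocess.run(PRODUCER_CMD, input="\n".join(lines) + "\n", text=True,
                   cwd=kafka_home, check=True)


print(line)
```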
To verify that the data has been ingested into the `example-topic` table, export the table contents with `curl` by sending the following request to QuestDB's `/exp` REST API endpoint:
The expected response based on the example JSON message published above will be similar to the following:
If you see the expected result, congratulations: you have successfully created and executed your first Kafka to QuestDB pipeline! 🎉
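The same export request can be sent from Python using only the standard library. This sketch assumes QuestDB's HTTP server runs on its default port, 9000; the table name is quoted in the SQL because it contains a hyphen.

```python
import urllib.parse
import urllib.request

QUESTDB_HTTP = "http://localhost:9000"  # assumption: default QuestDB HTTP port


def export_url(table: str, base: str = QUESTDB_HTTP) -> str:
    """Build a /exp URL that exports every row of `table` as CSV."""
    query = f"SELECT * FROM '{table}'"  # quoted: the name contains '-'
    # quote (not quote_plus) encodes spaces as %20 rather than '+'
    params = urllib.parse.urlencode({"query": query}, quote_via=urllib.parse.quote)
    return f"{base}/exp?{params}"


def export_csv(table: str) -> str:
    """Fetch the CSV export from a running QuestDB instance."""
    with urllib.request.urlopen(export_url(table)) as resp:
        return resp.read().decode("utf-8")
```

For example, `print(export_csv("example-topic"))` prints the exported rows once the pipeline is running.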
This section lists configuration options as well as further information about the Kafka Connect connector.
The connector supports the following configuration options:
| Name | Example | Default | Description |
|---|---|---|---|
| topics | orders,audit | N/A | Topics to read from |
| key.converter | org.apache.kafka.connect.storage.StringConverter | N/A | Converter for keys stored in Kafka |
| value.converter | org.apache.kafka.connect.json.JsonConverter | N/A | Converter for values stored in Kafka |
| host | localhost:9009 | N/A | Host and port where QuestDB server is running |
| table | my_table | Same as topic name | Target table in QuestDB |
| key.prefix | from_key | key | Prefix for key fields |
| value.prefix | from_value | N/A | Prefix for value fields |
| skip.unsupported.types | false | false | Skip unsupported types |
| timestamp.field.name | pickup_time | N/A | Designated timestamp field name |
| timestamp.units | micros | auto | Designated timestamp field units |
| timestamp.kafka.native | true | false | Use Kafka timestamps as designated timestamps |
| timestamp.string.fields | creation_time,pickup_time | N/A | String fields with textual timestamps |
| timestamp.string.format | yyyy-MM-dd HH:mm:ss.SSSUUU z | N/A | Timestamp format, used when parsing timestamp string fields |
| include.key | false | true | Include message key in target table |
| symbols | instrument,stock | N/A | Comma-separated list of columns that should be symbol type |
| doubles | volume,price | N/A | Comma-separated list of columns that should be double type |
| username | user1 | admin | User name for QuestDB. Used only when token is non-empty |
| token | QgHCOyq35D5HocCMrUGJinEsjEscJlC | N/A | Token for QuestDB authentication |
| tls | true | false | Use TLS for QuestDB connection |
| retry.backoff.ms | 1000 | 3000 | Connection retry interval in milliseconds |
| max.retries | 1 | 10 | Maximum number of connection retry attempts |
The connector reads data from Kafka topics and writes it to QuestDB tables via InfluxDB Line Protocol. The connector converts each field in the Kafka message to a column in the QuestDB table. Structures and maps are flattened into columns, with nested field names joined to their parent's name by an underscore.
Example: Consider the following Kafka message:
The connector will create a table with the following columns:
| firstname (string) | lastname (string) | age (long) | address_street (string) | address_city (string) |
|---|---|---|---|---|
| John | Doe | 30 | Main Street | New York |
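The column naming in this example can be illustrated with a short Python sketch. It is not the connector's code; it only assumes, as the column names suggest, that nested fields are joined to their parent's name with an underscore. The input message is a reconstruction consistent with the table.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Flatten nested dicts into a single level, joining names with '_'."""
    columns = {}
    for key, value in record.items():
        name = f"{prefix}_{key}" if prefix else key
        if isinstance(value, dict):
            columns.update(flatten(value, name))  # recurse into structures/maps
        else:
            columns[name] = value
    return columns


message = {
    "firstname": "John",
    "lastname": "Doe",
    "age": 30,
    "address": {"street": "Main Street", "city": "New York"},
}
print(flatten(message))
```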
The connector does not deserialize data independently. It relies on Kafka Connect converters. The connector has been tested predominantly with JSON, but it should work with any converter, including Avro. Converters can be configured using the `key.converter` and `value.converter` options; both are included in the configuration options table above.
The connector supports designated timestamps.
There are three distinct strategies for designated timestamp handling:
- QuestDB server assigns a timestamp when it receives data from the connector. (Default)
- The connector extracts the timestamp from the Kafka message payload.
- The connector extracts timestamps from Kafka message metadata.
Kafka messages carry various metadata, one of which is a timestamp. To use the Kafka message metadata timestamp as a QuestDB designated timestamp, set the `timestamp.kafka.native` option to `true`.
If a message payload contains a timestamp field, the connector can use it as a designated timestamp. Configure the field's name with the `timestamp.field.name` option. This field should be either an integer or a timestamp.
When the field is an integer, the connector automatically detects its units. This works for timestamps after 04/26/1970, 5:46:40 PM UTC.
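The cut-off date is not arbitrary: 1970-04-26 17:46:40 UTC is exactly 10^10 milliseconds, 10^13 microseconds, or 10^16 nanoseconds after the Unix epoch, as the Python sketch below computes. The `guess_units` function is a hypothetical magnitude-based rule that is consistent with that cut-off, not the connector's actual logic: from the cut-off until roughly the year 2286 the three units occupy non-overlapping numeric ranges, while earlier instants can be misclassified.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# The cut-off from the text, read as UTC.
CUTOFF = datetime(1970, 4, 26, 17, 46, 40, tzinfo=timezone.utc)


def epoch_units(dt: datetime) -> dict:
    """Express one instant as integer epoch milliseconds, microseconds and nanoseconds."""
    delta = dt - EPOCH
    micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    return {"millis": micros // 1_000, "micros": micros, "nanos": micros * 1_000}


def guess_units(value: int) -> str:
    """Hypothetical magnitude rule; correct for instants from CUTOFF until ~2286."""
    if value < 10**13:
        return "millis"
    if value < 10**16:
        return "micros"
    return "nanos"


print(epoch_units(CUTOFF) == {"millis": 10**10, "micros": 10**13, "nanos": 10**16})  # True
```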
The units can also be configured explicitly using the `timestamp.units` option (for example `micros`); its default, `auto`, enables the automatic detection described above.
Note: These three strategies are mutually exclusive. You cannot set both `timestamp.kafka.native=true` and `timestamp.field.name`.
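The three strategies map to connector properties roughly as in the following Python sketch. `timestamp_settings` is a hypothetical helper, not part of the connector; it simply refuses the combination that the strategies' mutual exclusivity forbids.

```python
def timestamp_settings(field_name=None, kafka_native=False, units=None) -> dict:
    """Return designated-timestamp properties for one of the three strategies.

    An empty result means the default strategy, where the QuestDB server
    assigns the timestamp when the data arrives.
    """
    if field_name and kafka_native:
        raise ValueError(
            "timestamp.field.name and timestamp.kafka.native=true are mutually exclusive"
        )
    settings = {}
    if kafka_native:
        settings["timestamp.kafka.native"] = "true"
    if field_name:
        settings["timestamp.field.name"] = field_name
        if units:  # optional: override automatic unit detection
            settings["timestamp.units"] = units
    return settings


print(timestamp_settings("pickup_time", units="micros"))
```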
Kafka messages often contain timestamps in a textual format. The connector can parse these and use them as timestamps. List such fields, comma-separated, in the `timestamp.string.fields` option, and set the timestamp format with the `timestamp.string.format` option, which follows the QuestDB timestamp format.
See the QuestDB timestamp documentation for more details.
Consider the following Kafka message:
To use the `born` field as a designated timestamp and `died` as a regular timestamp, set the following properties in your QuestDB connector configuration:
- `timestamp.field.name=born`: the field `born` is a designated timestamp.
- `timestamp.string.fields=died`: set the field name `died` as a textual timestamp. Notice this option does not contain the field `born`. This field is already set as a designated timestamp, so the connector will attempt to parse it as a timestamp automatically.
- `timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z`: set the timestamp format. Note that the correct format for microseconds is `SSSUUU` (3 digits for milliseconds and 3 digits for microseconds).
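For intuition about the `SSSUUU` part of the pattern, the Python sketch below collects the three properties and prints an instant in a roughly equivalent `strftime` layout, where `%f` emits six fractional digits (milliseconds followed by microseconds). The mapping to Python directives is an approximation for illustration only; QuestDB's own format rules are described in its timestamp documentation.

```python
from datetime import datetime, timezone

# Connector properties for the born/died example.
born_died_settings = {
    "timestamp.field.name": "born",
    "timestamp.string.fields": "died",
    "timestamp.string.format": "yyyy-MM-dd HH:mm:ss.SSSUUU z",
}

# Approximate Python counterpart of the QuestDB pattern:
# yyyy-MM-dd -> %Y-%m-%d, HH:mm:ss -> %H:%M:%S, SSSUUU -> %f, z -> %Z
PY_EQUIVALENT = "%Y-%m-%d %H:%M:%S.%f %Z"

sample = datetime(2023, 1, 2, 3, 4, 5, 123456, tzinfo=timezone.utc)
print(sample.strftime(PY_EQUIVALENT))  # 2023-01-02 03:04:05.123456 UTC
```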
QuestDB supports a special type called `symbol`. Use the `symbols` configuration option to specify which columns should be created as the `symbol` type.
When a configured Kafka Connect deserializer provides a schema, the connector uses it to determine column types. If a schema is unavailable, the connector infers the type from the value. This can produce unexpected results for floating-point numbers, which may initially be interpreted as `long` and later cause an error.
Consider this example:
The Kafka Connect JSON converter deserializes the `volume` field as an integer. The connector sends it to the QuestDB server as a `long` value. If the target table does not have a `volume` column, the database creates a `long` column. If the next message contains a floating-point value for the `volume` field, the connector sends it to QuestDB as a `double` value. This causes an error because the existing `volume` column is of type `long`.
To avoid this problem, the connector can be configured to send selected numeric columns as `double`, regardless of the actual initial input value. Use the `doubles` configuration option to specify which columns the connector should always send as the `double` type.
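Python's `json` module draws the same integer-versus-float distinction, which makes the problem easy to reproduce. In the sketch below the two messages are hypothetical, and `coerce_doubles` only mimics the effect of the `doubles` option; it is not the connector's implementation.

```python
import json

# Two hypothetical messages for the same topic.
first = json.loads('{"volume": 100}')
second = json.loads('{"volume": 100.5}')

# Without a schema, the type comes from each value: int first, then float.
print(type(first["volume"]).__name__, type(second["volume"]).__name__)


def coerce_doubles(record: dict, doubles: set) -> dict:
    """Send listed integer fields as floats, as `doubles=...` would."""
    return {
        key: float(value)
        if key in doubles and isinstance(value, int) and not isinstance(value, bool)
        else value
        for key, value in record.items()
    }


# Roughly what `doubles=volume` achieves: both messages now carry a float.
print(coerce_doubles(first, {"volume"}), coerce_doubles(second, {"volume"}))
```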
When a target table does not exist in QuestDB, it is created automatically. This is the recommended approach for development and testing.
In production, it's recommended to create tables with the SQL `CREATE TABLE` statement, because it gives you more control over the table schema, allowing per-table partitioning, creating indexes, etc.
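A minimal Python sketch of creating the quick-start table up front through QuestDB's `/exec` REST endpoint. The schema (column types, the `ts` designated timestamp, daily partitioning) is a hypothetical choice for the `example-topic` data, and the default HTTP port 9000 is assumed.

```python
import urllib.parse
import urllib.request

# Hypothetical schema for the quick-start data; adapt types and partitioning.
CREATE_SQL = (
    "CREATE TABLE IF NOT EXISTS 'example-topic' ("
    "firstname SYMBOL, lastname STRING, age LONG, "
    "address_street STRING, address_city STRING, ts TIMESTAMP"
    ") TIMESTAMP(ts) PARTITION BY DAY"
)


def exec_url(sql: str, base: str = "http://localhost:9000") -> str:
    """Build a URL for QuestDB's /exec endpoint, which runs a SQL statement."""
    params = urllib.parse.urlencode({"query": sql}, quote_via=urllib.parse.quote)
    return f"{base}/exec?{params}"


def create_table(sql: str = CREATE_SQL) -> str:
    """Send the statement to a running QuestDB instance and return the JSON reply."""
    with urllib.request.urlopen(exec_url(sql)) as resp:
        return resp.read().decode("utf-8")
```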
Does this connector work with Schema Registry?
The connector works independently of the serialization strategy used. It relies on Kafka Connect converters to deserialize data. Converters can be configured using the `key.converter` and `value.converter` options; see the configuration options table above.
I'm getting this error: "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires 'schema' and 'payload' fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."
This error means that the connector is trying to deserialize data using a converter that expects a schema. The connector does not require schemas, so you need to configure the converter not to expect one. For example, if you are using the JSON converter, set `value.converter.schemas.enable=false` (for message values) and/or `key.converter.schemas.enable=false` (for message keys) in the connector configuration.
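The difference between the two JSON shapes is easy to check. The Python sketch below flags messages that have exactly the top-level `schema` and `payload` fields that JsonConverter expects when `schemas.enable=true`; the helper name and the sample messages are hypothetical.

```python
import json


def has_schema_envelope(raw: str) -> bool:
    """True if the message has exactly the top-level 'schema' and 'payload' fields."""
    doc = json.loads(raw)
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}


plain = '{"firstname":"John","age":30}'
enveloped = (
    '{"schema":{"type":"struct","fields":[{"field":"firstname","type":"string"}]},'
    '"payload":{"firstname":"John"}}'
)

# Plain JSON such as `plain` needs schemas.enable=false on the converter reading it.
fix = {"value.converter.schemas.enable": "false"}
print(has_schema_envelope(plain), has_schema_envelope(enveloped))
```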
Does this connector work with Debezium?
Yes, it's been tested with Debezium as a source, and an example is available. Bear in mind that QuestDB is meant to be used as an append-only database; hence, updates should be translated as new inserts. Use the connector together with Debezium's ExtractNewRecordState transformation to extract the new state of the record. By default, the transformation drops DELETE events, so there is no need to handle them explicitly.
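In Kafka Connect, a transformation is enabled with an alias listed in `transforms` plus a `transforms.<alias>.type` property. The Python sketch below renders those lines for ExtractNewRecordState; the alias `unwrap` is an arbitrary choice, and the Debezium transformation JAR must be available to Kafka Connect.

```python
def unwrap_settings(alias: str = "unwrap") -> dict:
    """Connector properties applying Debezium's ExtractNewRecordState transformation."""
    return {
        "transforms": alias,
        f"transforms.{alias}.type": "io.debezium.transforms.ExtractNewRecordState",
    }


def render_properties(settings: dict) -> str:
    """Render settings as .properties lines to append to the connector config."""
    return "".join(f"{key}={value}\n" for key, value in settings.items())


print(render_properties(unwrap_settings()), end="")
```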
QuestDB is a time-series database, how does it fit into Change Data Capture via Debezium?
QuestDB works with Debezium just great! This is the recommended pattern: transactional applications use a relational database to store the current state of the data, and QuestDB stores the history of changes.
Example: imagine you have a PostgreSQL table with the most recent stock prices. Whenever a stock price changes, an application updates the PostgreSQL table. Debezium captures each UPDATE/INSERT and pushes it as an event to Kafka. The QuestDB Kafka Connect connector reads the events and inserts them into QuestDB. This way, PostgreSQL holds the most recent stock prices and QuestDB holds the history of changes. You can use QuestDB to build a dashboard with the most recent stock prices and a chart with the history of changes.
How can I select which fields to include in the target table?
Use the ReplaceField transformation to remove unwanted fields. For example, if
you want to remove the
address field, you can use the following configuration:
See ReplaceField documentation for more details.
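The ReplaceField configuration for dropping `address` can be sketched in Python as below. It assumes Kafka 2.6 or later, where the option is named `exclude` (older releases call it `blacklist`); the alias `removeAddress` is arbitrary. The `$Value` variant applies the transformation to the record value rather than the key.

```python
def drop_fields_settings(fields: list, alias: str = "removeAddress") -> dict:
    """ReplaceField settings that remove the given top-level fields from record values."""
    return {
        "transforms": alias,
        f"transforms.{alias}.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        f"transforms.{alias}.exclude": ",".join(fields),
    }


def render_properties(settings: dict) -> str:
    """Render settings as .properties lines to append to the connector config."""
    return "".join(f"{key}={value}\n" for key, value in settings.items())


print(render_properties(drop_fields_settings(["address"])), end="")
```

When several transformations are used, list all of their aliases in `transforms`, comma-separated, in the order they should run.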
I need to run Kafka Connect on Java 8, but the connector says it requires Java 11. What should I do?
The Kafka Connect-specific part of the connector works with Java 8. The Java 11 requirement comes from the QuestDB client itself. The zip archive contains two JARs, one of which is the QuestDB client, `questdb-VERSION.jar`. You can replace it with the JDK 8 build of the client, `questdb-VERSION-jdk8.jar`.
Please note that this setup is not officially supported, and you may encounter
issues. If you do, please report them to us.