QuestDB Kafka Connector

QuestDB ships a Kafka connector for fast ingestion from Kafka into QuestDB. It is also useful for processing change data capture (CDC) events in a dataflow. The connector is based on the Kafka Connect framework and acts as a sink for Kafka topics.

This page has the following main sections:

  • Integration guide
  • Configuration manual
  • FAQ

Integration guide#

This guide shows the steps to use the QuestDB Kafka connector to read JSON data from Kafka topics and write it as rows into a QuestDB table. For Confluent users, please check the instructions for the Confluent Docker images.

Prerequisites#

You will need the following:

  • A local Kafka installation
  • A running QuestDB instance, either self-hosted or in QuestDB Cloud

Configure Kafka#

info

Before you begin manual configuration, note that the connector is also available from the Confluent Hub.

Download the latest QuestDB Kafka connector package:

curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest |
jq -r '.assets[]|select(.content_type == "application/zip")|.browser_download_url'|
wget -qi -

Next, unzip the contents of the archive and copy the required .jar files to your Kafka libs directory:

unzip kafka-questdb-connector-*-bin.zip
cd kafka-questdb-connector
cp ./*.jar /path/to/kafka_*.*-*.*.*/libs

Set Kafka configuration file#

Create a Kafka Connect configuration file at /path/to/kafka/config/questdb-connector.properties. In it, define the QuestDB host and port, as well as the topic to read from under the topics={mytopic} key. For more information on the available properties, see the configuration manual.

Next, follow whichever pattern you are using: self-hosted or QuestDB Cloud:


Self-hosted#

This example file:

  1. Assumes QuestDB is running locally with the default InfluxDB Line Protocol port of 9009
  2. Reads from a Kafka topic: example-topic
  3. Creates & writes into a QuestDB table: example_table
Create a configuration file
name=questdb-sink
host=localhost:9009
topics=example-topic
table=example_table
connector.class=io.questdb.kafka.QuestDBSinkConnector
# message format configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
include.key=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

QuestDB Cloud#

Retrieve the appropriate values from the QuestDB Cloud UI. You will need credentials both to configure Kafka and then to run a test query. To do so, we will use two endpoints: the InfluxDB Line Protocol API and the REST HTTP API.

Altogether, to configure Kafka we will set:

  • [QUESTDB_CLOUD_USERNAME]
  • [QUESTDB_CLOUD_TOKEN]
  • [QUESTDB_CLOUD_ILP_ENDPOINT]

And to query our instance we will use:

  • [QUESTDB_CLOUD_USERNAME] (same as above)
  • [QUESTDB_CLOUD_HTTP_PASSWORD]
  • [QUESTDB_CLOUD_REST_ENDPOINT]

First, log in.

Next, click into your instance.

Within the Security pane, copy the d value and note the admin username:

[Screenshot: the QuestDB Cloud UI after clicking into an instance, with the security values on the right revealed for example purposes.]

Those are your [QUESTDB_CLOUD_TOKEN] and [QUESTDB_CLOUD_USERNAME], respectively. The token is part of your InfluxDB Line Protocol private key and the username is the same as your HTTP Basic Auth / PostgreSQL Wire Protocol User, which is used in both query and configuration.

Next, retrieve your [QUESTDB_CLOUD_ILP_ENDPOINT] within the Connect tab:

[Screenshot: the Connect tab within a QuestDB Cloud instance, showing the ILP endpoint and the basic HTTP auth credentials at the top.]

After configuration, we will query our instance to test. To do so, retrieve the HTTP Basic Auth / PostgreSQL Wire Protocol Password as your [QUESTDB_CLOUD_HTTP_PASSWORD] and the REST API URL as your [QUESTDB_CLOUD_REST_ENDPOINT], which is at the bottom of the Connect page:

[Screenshot: the Connect tab within a QuestDB Cloud instance; the bottom box shows the REST API endpoint.]

Finally, add all of these values except your [QUESTDB_CLOUD_HTTP_PASSWORD] & [QUESTDB_CLOUD_REST_ENDPOINT] to your configuration file.

This example:

  1. Reads from a Kafka topic: example-topic
  2. Creates & writes into a QuestDB table: example_table
name=questdb-cloud
tls=true
topics=example-topic
table=example_table
host=[QUESTDB_CLOUD_ILP_ENDPOINT]
username=[QUESTDB_CLOUD_USERNAME]
token=[QUESTDB_CLOUD_TOKEN]
connector.class=io.questdb.kafka.QuestDBSinkConnector
# message format configuration
value.converter=org.apache.kafka.connect.json.JsonConverter
include.key=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false

With your questdb-connector.properties file created, start Kafka.

Start Kafka#

The commands listed in this section must be run from the Kafka home directory and in the order shown below.

  1. Start the Kafka Zookeeper used to coordinate the server:
bin/zookeeper-server-start.sh config/zookeeper.properties
  2. Start a Kafka server:
bin/kafka-server-start.sh config/server.properties
  3. Start the QuestDB Kafka connector:
bin/connect-standalone.sh config/connect-standalone.properties config/questdb-connector.properties

Publish messages#

Messages can be published via the console producer script:

bin/kafka-console-producer.sh --topic example-topic --bootstrap-server localhost:9092

A greater-than symbol, >, indicates that a message can be published to the example topic from the interactive session. Paste the following minified JSON as a single line to publish the message and create the table example_table in the QuestDB instance:

{"firstname": "Arthur", "lastname": "Dent", "age": 42}

Verify the integration#

To verify that the data has been ingested into the example_table table, make the following request to QuestDB's /exp REST API endpoint to export the table contents via curl.


Self-hosted#

curl -G \
--data-urlencode "query=select * from 'example_table'" \
http://localhost:9000/exp

QuestDB Cloud#

Retrieve the appropriate credentials for your QuestDB REST endpoint.

Use the same [QUESTDB_CLOUD_USERNAME], but now add your [QUESTDB_CLOUD_REST_ENDPOINT] and HTTP Basic Auth / PostgreSQL Wire Protocol Password [QUESTDB_CLOUD_HTTP_PASSWORD]:

curl --get \
--user "[QUESTDB_CLOUD_USERNAME]:[QUESTDB_CLOUD_HTTP_PASSWORD]" \
--data-urlencode "query=select * from 'example_table'" \
https://[QUESTDB_CLOUD_REST_ENDPOINT]/exp

The expected response based on the example JSON message published above will be similar to the following:

"firstname","age","lastname","timestamp"
"Arthur",42,"Dent","2022-11-01T13:11:55.558108Z"

If you can see the expected result, then congratulations!

You have successfully created and executed your first Kafka to QuestDB pipeline. 🎉

Additional sample projects#

You can find additional sample projects on the QuestDB Kafka connector GitHub project page.

It includes a sample integration with Debezium for CDC from PostgreSQL.

Configuration manual#

This section lists configuration options as well as further information about the Kafka Connect connector.

Configuration Options#

The connector supports the following configuration options:

Name | Type | Example | Default | Meaning
topics | string | orders,audit | N/A | Topics to read from
key.converter | string | org.apache.kafka.connect.storage.StringConverter | N/A | Converter for keys stored in Kafka
value.converter | string | org.apache.kafka.connect.json.JsonConverter | N/A | Converter for values stored in Kafka
host | string | localhost:9009 | N/A | Host and port where QuestDB server is running
table | string | my_table | Same as topic name | Target table in QuestDB
key.prefix | string | from_key | key | Prefix for key fields
value.prefix | string | from_value | N/A | Prefix for value fields
skip.unsupported.types | boolean | false | false | Skip unsupported types
timestamp.field.name | string | pickup_time | N/A | Designated timestamp field name
timestamp.units | string | micros | auto | Designated timestamp field units
timestamp.kafka.native | boolean | true | false | Use Kafka timestamps as designated timestamps
timestamp.string.fields | string | creation_time,pickup_time | N/A | String fields with textual timestamps
timestamp.string.format | string | yyyy-MM-dd HH:mm:ss.SSSUUU z | N/A | Timestamp format, used when parsing timestamp string fields
include.key | boolean | false | true | Include message key in target table
symbols | string | instrument,stock | N/A | Comma separated list of columns that should be symbol type
doubles | string | volume,price | N/A | Comma separated list of columns that should be double type
username | string | user1 | admin | User name for QuestDB. Used only when token is non-empty
token | string | QgHCOyq35D5HocCMrUGJinEsjEscJlC | N/A | Token for QuestDB authentication
tls | boolean | true | false | Use TLS for QuestDB connection
retry.backoff.ms | long | 1000 | 3000 | Connection retry interval in milliseconds
max.retries | long | 1 | 10 | Maximum number of connection retry attempts

How does the connector work?#

The connector reads data from Kafka topics and writes it to QuestDB tables via InfluxDB Line Protocol. The connector converts each field in the Kafka message to a column in the QuestDB table. Structures and maps are flattened into columns.

Example: Consider the following Kafka message:

{
  "firstname": "John",
  "lastname": "Doe",
  "age": 30,
  "address": {
    "street": "Main Street",
    "city": "New York"
  }
}

The connector will create a table with the following columns:

firstname string | lastname string | age long | address_street string | address_city string
John | Doe | 30 | Main Street | New York

Supported serialization formats#

The connector does not deserialize data independently. It relies on Kafka Connect converters. The connector has been tested predominantly with JSON, but it should work with any converter, including Avro. Converters can be configured using the key.converter and value.converter options; both are listed in the Configuration Options table above.
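
For example, when using Avro with Confluent Schema Registry, the value converter settings might look like the following sketch. The converter class io.confluent.connect.avro.AvroConverter and the schema.registry.url property come from Confluent's Avro converter package, and the registry URL is a placeholder to replace with your own:

value.converter=io.confluent.connect.avro.AvroConverter
# placeholder: point this at your Schema Registry instance
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.storage.StringConverter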

Designated timestamps#

The connector supports designated timestamps.

There are three distinct strategies for designated timestamp handling:

  1. QuestDB server assigns a timestamp when it receives data from the connector. (Default)
  2. The connector extracts the timestamp from the Kafka message payload.
  3. The connector extracts timestamps from Kafka message metadata.

Kafka messages carry various metadata, one of which is a timestamp. To use the Kafka message metadata timestamp as a QuestDB designated timestamp, set timestamp.kafka.native to true.

If a message payload contains a timestamp field, the connector can utilize it as a designated timestamp. The field's name should be configured using the timestamp.field.name option. This field should either be an integer or a timestamp.

When the field is defined as an integer, the connector will automatically detect its units. This is applicable for timestamps after 04/26/1970, 5:46:40 PM.

The units can also be configured explicitly using the timestamp.units option, which supports the following values:

  • nanos
  • micros
  • millis
  • auto (default)

Note: These three strategies are mutually exclusive. You cannot set both timestamp.kafka.native=true and timestamp.field.name.
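
For illustration, a minimal sketch of strategy 2, assuming messages carry an epoch-based integer field named pickup_time (a hypothetical field name taken from the examples in the configuration table above):

# use the integer payload field pickup_time as the designated timestamp
timestamp.field.name=pickup_time
# optional: state the units explicitly instead of relying on auto-detection
timestamp.units=millis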

Textual timestamps parsing#

Kafka messages often contain timestamps in a textual format. The connector can parse these and use them as timestamps. Configure field names as a string with the timestamp.string.fields option. Set the timestamp format with the timestamp.string.format option, which adheres to the QuestDB timestamp format.

See the QuestDB timestamp documentation for more details.

Example#

Consider the following Kafka message:

{
  "firstname": "John",
  "lastname": "Doe",
  "born": "1982-01-07 05:42:11.123456 UTC",
  "died": "2031-05-01 09:11:42.456123 UTC"
}

To use the born field as a designated timestamp and died as a regular timestamp, set the following properties in your QuestDB connector configuration:

  1. timestamp.field.name=born - the field born is a designated timestamp.
  2. timestamp.string.fields=died - set the field name died as a textual timestamp. Notice this option does not contain the field born. This field is already set as a designated timestamp so the connector will attempt to parse it as a timestamp automatically.
  3. timestamp.string.format=yyyy-MM-dd HH:mm:ss.SSSUUU z - set the timestamp format. Please note the correct format for microseconds is SSSUUU (3 digits for milliseconds and 3 digits for microseconds).

Symbol type#

QuestDB supports a special type called symbol. Use the symbols configuration option to specify which columns should be created as the symbol type.
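
For instance, a minimal sketch using the hypothetical instrument and stock columns from the configuration table above:

# create these columns as QuestDB symbol columns instead of strings
symbols=instrument,stock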

Numeric type inference for floating point type#

When a configured Kafka Connect deserializer provides a schema, the connector uses it to determine column types. If a schema is unavailable, the connector infers the type from the value. This might produce unexpected results for floating point numbers, which may initially be interpreted as long and later generate an error.

Consider this example:

{
  "instrument": "BTC-USD",
  "volume": 42
}

Kafka Connect JSON converter deserializes the volume field as a long value. The connector sends it to the QuestDB server as a long value. If the target table does not have a column volume, the database creates a long column. If the next message contains a floating point value for the volume field, the connector sends it to QuestDB as a double value. This causes an error because the existing column volume is of type long.

To avoid this problem, the connector can be configured to send selected numeric columns as double regardless of the initial input value. Use the doubles configuration option to specify which columns the connector should always send as the double type.
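
Continuing the example above, a minimal sketch (price is a hypothetical second column):

# always send these columns as double, even when the first observed value looks like an integer
doubles=volume,price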

Target table considerations#

When a target table does not exist in QuestDB, it will be created automatically. This is the recommended approach for development and testing.

In production, it's recommended to use the SQL CREATE TABLE keyword, because it gives you more control over the table schema, allowing per-table partitioning, creating indexes, etc.
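
For example, a minimal sketch that creates the example_table from the integration guide up front, assuming daily partitioning suits the ingestion rate:

CREATE TABLE example_table (
  firstname STRING,
  lastname STRING,
  age LONG,
  timestamp TIMESTAMP
) TIMESTAMP(timestamp) PARTITION BY DAY;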

FAQ#

Does this connector work with Schema Registry?

The connector works independently of the serialization strategy used. It relies on Kafka Connect converters to deserialize data. Converters can be configured using the key.converter and value.converter options; see the configuration section above.

I'm getting this error: "org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires 'schema' and 'payload' fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration."

This error means that the connector is trying to deserialize data using a converter that expects a schema. The connector does not require schemas, so you need to configure the converter to not expect a schema. For example, if you are using a JSON converter, you need to set value.converter.schemas.enable=false or key.converter.schemas.enable=false in the connector configuration.
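
For plain JSON payloads, the relevant lines in the connector configuration look like this (matching the example configuration earlier on this page):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false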

Does this connector work with Debezium?

Yes, it's been tested with Debezium as a source and a sample project is available. Bear in mind that QuestDB is meant to be used as an append-only database; hence, updates should be translated as new inserts. The connector supports Debezium's ExtractNewRecordState transformation to extract the new state of the record. The transformation by default drops DELETE events, so there is no need to handle them explicitly.
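
A sketch of enabling that transformation in the QuestDB sink configuration; the transform alias unwrap is arbitrary:

# unwrap Debezium's change-event envelope so only the new row state reaches QuestDB
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState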

QuestDB is a time-series database, how does it fit into Change Data Capture via Debezium?

QuestDB works with Debezium just great! This is the recommended pattern: Transactional applications use a relational database to store the current state of the data. QuestDB is used to store the history of changes. Example: Imagine you have a PostgreSQL table with the most recent stock prices. Whenever a stock price changes, an application updates the PostgreSQL table. Debezium captures each UPDATE/INSERT and pushes it as an event to Kafka. Kafka Connect QuestDB connector reads the events and inserts them into QuestDB. In this way, PostgreSQL will have the most recent stock prices and QuestDB will have the history of changes. You can use QuestDB to build a dashboard with the most recent stock prices and a chart with the history of changes.
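
As a rough sketch of the source side of such a pipeline (property names follow Debezium 2.x for PostgreSQL; hostnames, credentials, and the table list are placeholders):

name=stock-prices-source
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=stocks
# capture changes only from the table holding current stock prices
table.include.list=public.stock_prices
topic.prefix=dbz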

How can I select which fields to include in the target table?

Use the ReplaceField transformation to remove unwanted fields. For example, if you want to remove the address field, you can use the following configuration:

{
  "name": "questdb-sink",
  "config": {
    "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
    "host": "localhost:9009",
    "topics": "Orders",
    "table": "orders_table",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "removeAddress",
    "transforms.removeAddress.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.removeAddress.blacklist": "address"
  }
}

See ReplaceField documentation for more details.

I need to run Kafka Connect on Java 8, but the connector says it requires Java 17. What should I do?

The Kafka Connect-specific part of the connector works with Java 8. The Java 17 requirement comes from the QuestDB client itself. The zip archive contains 2 JARs: questdb-kafka-connector-VERSION.jar and questdb-VERSION.jar. You can replace the latter with questdb-VERSION-jdk8.jar from Maven Central. Please note that this setup is not officially supported, and you may encounter issues. If you do, please report them to us.
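
A rough sketch of the swap, assuming the JARs were copied into your Kafka libs directory; the Maven Central path and the <version> placeholder are assumptions to adjust for your setup:

cd /path/to/kafka/libs
# remove the Java 17 build and fetch the jdk8 build of the same version
rm questdb-<version>.jar
wget https://repo1.maven.org/maven2/org/questdb/questdb/<version>/questdb-<version>-jdk8.jar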

See also#


โญ Something missing? Page not helpful? Please suggest an edit on GitHub.