QuestDB Flink connector

Apache Flink is a popular framework and stream processing engine. QuestDB ships a QuestDB Flink Sink connector for fast ingestion from Apache Flink into QuestDB. The connector implements Flink's Table API and SQL interface.

Quick start

This section shows how to use the QuestDB Flink connector to ingest data from Flink into QuestDB. You interact with the connector through Flink's SQL interface. The overall steps are as follows:

  1. Create a table in Flink backed by QuestDB.
  2. Insert data into the table.
  3. Query the data in QuestDB.

Prerequisites

  • A local JDK version 11 installation
  • Docker for running QuestDB

Connector installation

  • Start the QuestDB container image:

    docker run -p 9000:9000 -p 9009:9009 questdb/questdb:7.4.0

  • Download the Apache Flink distribution and unpack it.

  • Download the QuestDB Flink connector from Maven Central and place it in the lib directory of your Flink installation.

  • Go to the bin directory of your Flink installation and run the following to start a Flink cluster:

    ./start-cluster.sh

  • While still in the bin directory, start a Flink SQL console by running:

    ./sql-client.sh

    Then, run the following SQL command in the Flink SQL console:

    CREATE TABLE Orders (
    order_number BIGINT,
    price BIGINT,
    buyer STRING
    ) WITH (
    'connector'='questdb',
    'host'='localhost'
    );

    Expected output: [INFO] Execute statement succeed.

    This command creates a Flink table backed by QuestDB. The table is called Orders and has three columns: order_number, price, and buyer. The connector option selects the QuestDB Flink connector. The host option specifies the host and port where QuestDB is running. When the port is omitted, as it is here, the default port 9009 is used.

  • While still in the Flink SQL console, execute:

    INSERT INTO Orders values (0, 42, 'IBM');

    Expected output:

    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: <random hexadecimal id>

    This command uses Flink SQL to insert a row into the Orders table in Flink. The table is backed by QuestDB, so the row is also written to QuestDB.

  • Go to the QuestDB web console at http://localhost:9000 and execute this query:

    SELECT * FROM Orders;

    You should see a table with one row.

Congratulations! You have successfully used the QuestDB Flink connector to ingest data from Flink into QuestDB. You can now build Flink data pipelines that use QuestDB as a sink.

See the QuestDB Flink connector GitHub repository for more examples.
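
As one possible next step, the sketch below feeds the Orders table from Flink's built-in datagen connector instead of a single INSERT statement. The GeneratedOrders table name and all datagen option values are illustrative assumptions and not part of the QuestDB connector. The sketch assumes the Orders table from the quick start already exists in the same Flink SQL session.

```sql
-- Hypothetical source table: Flink's built-in datagen connector produces test rows.
-- order_number is a bounded sequence (1..1000); price and buyer are random.
CREATE TABLE GeneratedOrders (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.order_number.kind' = 'sequence',
  'fields.order_number.start' = '1',
  'fields.order_number.end' = '1000',
  'fields.price.min' = '1',
  'fields.price.max' = '100',
  'fields.buyer.length' = '3'
);

-- Copy generated rows into the QuestDB-backed Orders table
-- until the sequence is exhausted.
INSERT INTO Orders
SELECT order_number, price, buyer FROM GeneratedOrders;
```

Because the sequence is bounded, the Flink job should finish on its own once all rows are emitted. Running SELECT count() FROM Orders; in the QuestDB web console is one way to watch the rows arrive.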

Configuration

The QuestDB Flink connector supports the following configuration options:

Name             | Type    | Example               | Default                     | Meaning
-----------------|---------|-----------------------|-----------------------------|--------------------------------------------------------
host             | string  | localhost:9009        | N/A                         | Host and port where the QuestDB server is running
username         | string  | testUser1             | admin                       | Username for authentication. The default is used when a token is also set.
token            | string  | GwBXoGG5c6NoUTLXnzMxw | admin                       | Token for authentication
table            | string  | my_table              | Same as Flink table name    | Target table in QuestDB
tls              | boolean | true                  | false                       | Whether to use TLS/SSL for connecting to the QuestDB server
buffer.size.kb   | integer | 32                    | 64                          | Size of the QuestDB client send buffer
sink.parallelism | integer | 2                     | Same as upstream processors | QuestDB Sink parallelism

Example configuration for connecting to QuestDB running on localhost:

CREATE TABLE Orders (
order_number BIGINT,
price BIGINT,
buyer STRING
) WITH (
'connector'='questdb',
'host'='localhost',
'table' = 'orders'
);

Example configuration for connecting to QuestDB running in QuestDB Cloud:

CREATE TABLE Orders (
order_number BIGINT,
price BIGINT,
buyer STRING
) WITH (
'connector'='questdb',
'host'='agreeable-brown-297-bee317da.ilp.b04c.questdb.net:31277',
'username' = 'admin',
'token' = 'KBeYuNwOHzEuxQ72YnToBCpQN7WVOHDm-oTp5dVNB1o',
'tls' = 'true',
'table' = 'orders'
);
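
The tuning options from the table above go in the same WITH clause as the connection options. The following is a minimal sketch, assuming a local QuestDB instance. The values simply reuse the examples from the table and are not recommendations. As with the other options, integer and boolean values are written as quoted strings.

```sql
-- Sketch only: the explicit port, buffer size, and parallelism values are
-- taken from the Example column of the options table, not tuned settings.
CREATE TABLE Orders (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'questdb',
  'host' = 'localhost:9009',
  'table' = 'orders',
  'buffer.size.kb' = '32',
  'sink.parallelism' = '2'
);
```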

Connector distribution

The connector is distributed as a single jar file. The jar file is available in the Maven Central repository under the following coordinates:

<dependency>
<groupId>org.questdb</groupId>
<artifactId>flink-questdb-connector</artifactId>
<version>LATEST</version>
</dependency>

The latest connector version is listed on Maven Central.

FAQ

Q: Why is the QuestDB client not repackaged into a different Java package?
A: The QuestDB client uses native code, which makes repackaging hard.

Q: I need to use QuestDB as a Flink source. What should I do?
A: This connector is sink-only. To use QuestDB as a source, your best option is the Flink JDBC source, relying on QuestDB's PostgreSQL compatibility.
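
A rough sketch of the JDBC approach is shown below. It is not part of this connector and rests on several assumptions: the Flink JDBC connector and the PostgreSQL JDBC driver jars are in Flink's lib directory, QuestDB's PostgreSQL wire port (8812 by default) is reachable (the docker command in the quick start does not publish it, so -p 8812:8812 would need to be added), and QuestDB uses its default credentials and database name. OrdersSource is a hypothetical table name. How well Flink's PostgreSQL dialect works against QuestDB is not verified here.

```sql
-- Hypothetical Flink table that reads the QuestDB Orders table over the
-- PostgreSQL wire protocol. The URL, username, and password are QuestDB
-- defaults and should be replaced for any non-local deployment.
CREATE TABLE OrdersSource (
  order_number BIGINT,
  price BIGINT,
  buyer STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:8812/qdb',
  'table-name' = 'Orders',
  'username' = 'admin',
  'password' = 'quest'
);

-- Bounded scan of the rows currently stored in QuestDB.
SELECT * FROM OrdersSource;
```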