How Reliable is Your Distributed Database?

Alex Pelagenko

QuestDB team

A brief analysis of what can go wrong with databases and how different time-series databases can use replication to solve common write failover scenarios.

Failover#

Last year, I started to commute to the office since the pandemic loosened its grip on London life. To beat the traffic and unreliable (as well as expensive!) trains, I bought a quality electric bicycle from a famous Swiss-American brand. It's a well-thought-out device; sometimes I even want to call it a gadget! Despite how enjoyable it is, in the last 6 months, I've had to go to the bike repair shop to fix quite a few things. There were some gearbox glitches (yes, it has a gearbox), then broken electrical wiring to the backlight stopped the bike from running. Soon after that, I had to do some trivial maintenance, replacing a tire and brake pads. Finally, the driving belt snapped (it has a belt instead of a chain). Electric bikes are hot at the moment, but not mainstream yet, and annoyingly it takes a few days or even weeks for the replacement parts to come in.

Commute failover problem

I like the bike when it works, but on its bad days, I cannot help but think that I have a strong case for redundancy. What if I had another bike to ride while my current one is being fixed? That would definitely help.

In a sense, databases are like bicycles. Sometimes you ride them, but other times they can have a bad day. Fortunately, databases are much more reliable than my commuting piece of art, but there are still some rough edges. For example, if database hardware fails, there is an option to recover your data from a backup. But that would be comparable to going to the shop to buy a new bike every time it breaks. Even if a new bike was reasonably cheap, it would still take time to buy and set up; similar to how restoring DB backups also comes along with downtime. Ideally, I'd want to have a second bike ready to go as a hot backup, or in database terms, I'd want to have a replicated database node to switch to in case of a failure.

Time series databases are designed to be writing data constantly, and one of the basic use cases of any time series database is to support nonstop writing. Using my bike analogy, if I had a device on my bicycle to record my trip with very high precision, saving the location multiple times per second, that would be somewhat similar to this requirement. And in an ideal world, if the bike breaks down, I could just switch to a spare right on the spot, in the middle of the road, and continue my journey along with all of my trip data saved without delay.

A similar scenario is when an application writes to a database. When it sends inserts to a table and then suddenly gets a network disconnect, the application should switch to a replicated node and continue inserting.

For example, if I had an application App writing to DB Node 1 with replica DB Node 2, I could draw this scenario as a sequence diagram:

Write failover use case

Seems straightforward, but is it easy to get a database to play its part in this dance? I believe it's not. To achieve this, the database has to be able to write data into the same table A through multiple nodes. In the general case, the database also has to evolve the schema to support adding/removing columns from multiple nodes and applying the transaction in exactly the same order on all the replicas. This is because some transactions like INSERT and UPDATE that are executed on the same row will have different outcomes if applied in the wrong order.

To summarize, a database needs to be able to:

  • Write data to the same table using multiple DB Nodes (multi-master replication) or mirror the data to a read-only replicated node with automatic failover

  • Evolve table schemas such that the same table columns can be added simultaneously from multiple connections

  • Maintain the global order of the writes and schema changes across replica nodes
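To see why the global order matters, here is a minimal Python sketch. It assumes a toy table modeled as a dictionary and two hypothetical transactions on the same row; the function names `apply_txn` and `replay` are made up for this illustration and do not belong to any database.

```python
# Toy model: a table is a dict keyed by row id; a transaction is a tuple.
def apply_txn(table, txn):
    kind, row_id, value = txn
    if kind == "INSERT":
        table.setdefault(row_id, value)   # insert only if the row is absent
    elif kind == "UPDATE":
        if row_id in table:               # update touches existing rows only
            table[row_id] = value
    return table


def replay(txns):
    """Apply transactions in the given order to an empty table."""
    table = {}
    for txn in txns:
        apply_txn(table, txn)
    return table


insert = ("INSERT", 1, "a")
update = ("UPDATE", 1, "b")

node1 = replay([insert, update])   # the update finds the row and changes it
node2 = replay([update, insert])   # the update finds nothing, the insert wins

print(node1, node2)
```

The two replicas receive the same transactions and still end up with different data, which is why every replica has to apply them in the same order.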

Sync and Async replication#

Different things can go wrong with databases when a failure happens. For instance, there can be two reasons why the database Node 1 may not reply to the second insert, insert into A values(2): it can either be that the node fails to receive and process the transaction, or that the data is inserted but the OK reply is not delivered back to the application due to a network disconnect.

To avoid losing the second insert after disconnecting from Node 1, the application has to repeat the transaction insert into A values(2) on Node 2. It also has to repeat the insert in such a way that the database does not create a duplicate row when the same insert has already been processed by Node 1 but the OK reply was not delivered back to the application. To achieve this, all data inserted into Node 1 has to be readable from Node 2 immediately so that the application can perform the deduplication on failures. Alternatively, there must be a built-in mechanism to de-duplicate the inserted data in the database. This leads to the conclusion that at least one of the points below has to hold for no-gap, no-duplicate write failover:

  • Data written to one node is immediately selectable through all other replicas so that the writing application can check for the duplicates

  • There is a de-duplication mechanism built into the insert protocol or table storage

The first bullet point above is also called Synchronous (Sync) Replication where Node 1 replies OK only after an insert is already replicated to Node 2. In contrast, Asynchronous (Async) replication allows Node 1 to reply OK to the App before replicating the inserted data to Node 2. This way Node 2 may not receive the first insert before Node 1 goes down, so the database will have to reattempt to replicate the data when Node 1 is started or connected back.
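The difference between the two modes can be sketched in a few lines of Python. This is a toy, single-process model under stated assumptions: the classes `ReplicaNode` and `PrimaryNode`, the `flush` step and the simulated crash are all invented for the illustration and do not describe any particular database.

```python
class ReplicaNode:
    def __init__(self):
        self.rows = []


class PrimaryNode:
    def __init__(self, replica, sync):
        self.replica = replica
        self.sync = sync
        self.rows = []
        self.pending = []   # acknowledged but not yet replicated (async only)

    def insert(self, row):
        self.rows.append(row)
        if self.sync:
            self.replica.rows.append(row)   # replicate before replying
        else:
            self.pending.append(row)        # replicate later, in the background
        return "OK"

    def flush(self):
        """Background replication step used in async mode."""
        self.replica.rows.extend(self.pending)
        self.pending.clear()


def rows_on_replica_after_crash(sync):
    replica = ReplicaNode()
    primary = PrimaryNode(replica, sync)
    primary.insert(1)
    primary.flush()      # the first insert reaches the replica in either mode
    primary.insert(2)    # acknowledged with "OK" ...
    # ... and the primary crashes here, before any further flush()
    return replica.rows


sync_rows = rows_on_replica_after_crash(sync=True)
async_rows = rows_on_replica_after_crash(sync=False)
print(sync_rows, async_rows)
```

In the sync run the replica holds both rows at the moment of the crash; in the async run the second, already acknowledged insert exists only on the failed node until it comes back.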

Choice of replication flavor#

Back to my bike analogy, a database configured in single primary with a read-only replica in Sync mode is something akin to riding one bike while simultaneously trying to roll another alongside you at all times. You can imagine that it's pretty hard to ride while simultaneously balancing on two bikes, and that it's nearly impossible to actually cycle quickly!

Sync replication bike riding

On the other hand, a database configured in single primary with a read-only Async replica is similar to me buying a spare bike, storing it at work, and synchronizing my trip data when the bikes are at the same place. In case of a breakdown, I'll lose data from that particular journey but I would still be able to continue commuting on the spare bike the same day.

I imagine that multi-master replication is something like riding with a friend on two tandem bikes, where everyone is riding the tandem on the first seat. If one of the tandems breaks, the rider can move to the back seat of the other one. Multi-master Sync replication is riding two tandems next to each other without the freedom of turning or stopping independently. Async replication would be the independent rides with occasional location/trip data catch-ups.

Async replication is the most reasonable choice in the world of bicycles; I do not see people running bikes in sync on the streets. It is also the default or only choice for many in the database world. Sync replication can be simple to reason about, and may look like the best solution overall, but it has a hefty performance price to pay since each step needs to wait until it is completed on every node. Counterintuitively, even though synchronous replication makes data available to multiple nodes at the same time, it also results in lower availability of the cluster since it dramatically reduces the transaction rate.

There is no silver bullet for the write failover problem and every database offers different replication flavors to choose from. To find the best option for QuestDB, we did serious research on how replication in other time-series databases handles the write failover and here are some of the results.

TimescaleDB#

Everyone loves classic relational databases, and nearly every developer is prepared to answer interview questions about ACID properties and sometimes even about Transaction Isolation levels of different RDBMS systems (and what can go wrong with each of them!). Building distributed Read / Write applications using an RDBMS is not easy but it is definitely doable.

PostgreSQL is one of the leading open source RDBMSes, and many say that it's more than just a database. Postgres is also an extensible platform where you can, for example, add a geographical location column type, a geospatial SQL Query syntax, and indexes, effectively turning it into a GIS system. Similarly, TimescaleDB is a PostgreSQL extension built to optimize the storage and query performance of time-series workloads.

Out of the box, TimescaleDB inherits its replication functionality from Postgres. PostgreSQL supports multiple read-only replicas with Sync or Async replication with all the ACID and Transaction Isolation properties, and so does Timescale.

Unfortunately, automatic failover is solved neither by PostgreSQL nor TimescaleDB, but there are 3rd-party solutions like Patroni that add support for that functionality. PostgreSQL describes the process of failover as STONITH (Shoot The Other Node In The Head), meaning that the primary node has to be shot down once it starts to misbehave.

Running Sync replication can solve the data gaps and duplicate problems after the failover. If the application detects the failover, it can re-run the last non-confirmed INSERT as an UPSERT. With Async replication, a few recent transactions may be missing on the replica that is promoted to primary. This is because the old primary node had to be shot down (STONITHed) and there is no trivial way to move the missing data from that "dead" node to the new primary node post-failover.
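Re-running an unconfirmed INSERT as an UPSERT can be sketched as follows. To keep the example runnable without a server, it uses Python's built-in sqlite3 module as a stand-in for the promoted primary; SQLite (3.24 or newer) accepts the same `ON CONFLICT ... DO NOTHING` clause as PostgreSQL. The table layout, the `id` key and the helper `insert_idempotent` are assumptions made for the illustration, and the approach only works when the table has a unique key to conflict on.

```python
import sqlite3

# In-memory SQLite database standing in for the promoted primary node.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE a (id INTEGER PRIMARY KEY, value REAL)")


def insert_idempotent(conn, row_id, value):
    # A repeated insert with the same key becomes a no-op instead of
    # creating a duplicate row or raising a constraint error.
    conn.execute(
        "INSERT INTO a (id, value) VALUES (?, ?) ON CONFLICT (id) DO NOTHING",
        (row_id, value),
    )
    conn.commit()


insert_idempotent(conn, 2, 34.23)   # original attempt, confirmation lost
insert_idempotent(conn, 2, 34.23)   # retry after the failover

row_count = conn.execute("SELECT COUNT(*) FROM a").fetchone()[0]
print(row_count)
```

The retry is safe whether or not the first attempt reached the database, which is exactly the property the application needs after a failover.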

ClickHouse#

ClickHouse had been developed open source for many years by Yandex, a search provider in Russia. ClickHouse's functionality in open source (Apache 2.0) is comprehensive and includes high availability and horizontal scaling. There are also quite a few independent managed cloud offerings that support ClickHouse.

It makes sense to talk about ClickHouse's replication in the context of its Open Source product, since cloud features can vary dramatically from provider to provider.

The great thing about ClickHouse open source is that it supports multi-master replication. So if one creates a cluster with 2 nodes (Node 1 and Node 2) and replicated table A (using the ReplicatedMergeTree engine):

When Bob sends to Node 1

INSERT INTO A VALUES(1);

And Alice sends to Node 2

INSERT INTO A VALUES(2);

Both records will be written to each of the nodes. When the INSERT INTO A VALUES(1) statement is received on Node 1, ClickHouse writes it to part 1_1. Next, Node 1 registers the data part with the Zookeeper (or ClickHouse Keeper). Zookeeper notifies each node about the new part, and the nodes download the data from the source and apply it to the local table replica. The same process happens simultaneously with INSERT 2.

ClickHouse multi-master replication

In this architecture, inserts can be written to each of the nodes in parallel.
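The flow described above (write a part locally, announce it in a shared log, let the other nodes fetch it) can be modeled with a small Python sketch. It is a loose, single-process approximation and not ClickHouse code: the classes `Keeper` and `Replica`, the part naming and the explicit `catch_up` call are all assumptions made for the illustration.

```python
class Keeper:
    """Shared, ordered log of data-part announcements."""

    def __init__(self):
        self.log = []   # entries: (source_node_name, part_name)

    def register(self, source, part_name):
        self.log.append((source, part_name))


class Replica:
    def __init__(self, name, keeper, cluster):
        self.name = name
        self.keeper = keeper
        self.cluster = cluster    # name -> Replica, shared by all nodes
        self.parts = {}           # part_name -> list of rows
        self.log_position = 0
        cluster[name] = self

    def insert(self, rows):
        # Write the data part locally first, then announce it.
        part_name = f"{self.name}_part_{len(self.parts)}"
        self.parts[part_name] = list(rows)
        self.keeper.register(self.name, part_name)

    def catch_up(self):
        # Fetch every announced part that this replica does not hold yet.
        for source, part_name in self.keeper.log[self.log_position:]:
            if part_name not in self.parts:
                self.parts[part_name] = list(self.cluster[source].parts[part_name])
        self.log_position = len(self.keeper.log)

    def rows(self):
        return sorted(row for part in self.parts.values() for row in part)


keeper, cluster = Keeper(), {}
node1 = Replica("node1", keeper, cluster)
node2 = Replica("node2", keeper, cluster)

node1.insert([1])   # Bob writes to Node 1
node2.insert([2])   # Alice writes to Node 2
before = (node1.rows(), node2.rows())

node1.catch_up()
node2.catch_up()
after = (node1.rows(), node2.rows())
print(before, after)
```

Before the catch-up step each node only sees its own insert; afterwards both hold the same rows, even though neither insert went through a single designated primary.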

What about reading the data back? ClickHouse documentation states that the replication process is Asynchronous and it may take some time for Node 2 to catch up with Node 1. There is however an option to specify insert_quorum with every insert. If the insert_quorum is set to 2 then the application gets confirmation back from the database after both Node 1 and Node 2 have inserted the data, effectively turning this into Sync replication. There are a few more settings to consider like insert_quorum_parallel, insert_quorum_timeout, and select_sequential_consistency to define how concurrent parallel inserts work.

It is also possible to modify the table schema by adding new columns on the replicated table. An ALTER TABLE statement can be sent to any of the nodes in the cluster, and it will be replicated across the nodes. ClickHouse does not allow concurrent table schema change execution so if 2 of the nodes receive the same non-conflicting statement:

ALTER TABLE A ADD COLUMN IF NOT EXISTS TIMESTAMP DateTime64(6)

one of the nodes can reply with the failure:

Metadata on replica is not up to date with common metadata in Zookeeper.
It means that this replica still not applied some of previous alters.
Probably too many alters executing concurrently (highly not recommended).
You can retry this error

SQL Update statements are also written in ClickHouse dialect as ALTER TABLE but fortunately, they can be executed in parallel without the above error.

ClickHouse also has a useful method to solve lost write confirmations; in cases where an INSERT (or other) query confirmation is lost because of a network disconnect or timeout, the client can resend the whole block of data in exactly the same way to any other available node. The receiving node then calculates the hash code of the data and does not apply it a second time if it is able to recognize that this data has already been applied via another node.
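The idea of hash-based block de-duplication can be sketched in Python as follows. This is a simplified model under stated assumptions: the shared `applied_hashes` set stands in for the metadata the nodes coordinate on, the class `DedupNode` and its method are hypothetical, SHA-256 over the block's text form is an arbitrary choice for the sketch, and replication of the rows themselves is left out.

```python
import hashlib


class DedupNode:
    def __init__(self, applied_hashes):
        self.applied_hashes = applied_hashes   # shared between all nodes
        self.rows = []

    def insert_block(self, rows):
        """Apply the block unless an identical block was already applied."""
        digest = hashlib.sha256(repr(rows).encode("utf-8")).hexdigest()
        if digest in self.applied_hashes:
            return False                       # duplicate block, skipped
        self.applied_hashes.add(digest)
        self.rows.extend(rows)
        return True


applied_hashes = set()
node1 = DedupNode(applied_hashes)
node2 = DedupNode(applied_hashes)

block = [(1, "a"), (2, "b")]
first = node1.insert_block(block)    # applied, but the reply is lost
retry = node2.insert_block(block)    # client resends the same block elsewhere
other = node2.insert_block([(3, "c")])
print(first, retry, other)
```

The retry is recognized and skipped only because the client resends exactly the same block; a block with different contents or a different row order would hash differently and be applied again.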

There are more options and flavors of how to set up replication in ClickHouse, the most popular approach being ReplicatedMergeTree storage.

InfluxDB#

InfluxDB has the highest DB-Engines time series ranking at the time of writing, and I feel that it has to be included here even though InfluxData removed the clustering product from the open source version in 2016 to sell it as a commercial product, InfluxDB Enterprise. In recent years, their focus has shifted from the enterprise version to the cloud offering, where they have built InfluxDB Cloud v2. While this is a closed-source system, its high-level architecture is deducible from the marketing diagrams InfluxData officially provides.

InfluxDB cloud architecture

InfluxDB Cloud v2 persists incoming writes to the Write Ahead Log (WAL) written over a Kafka cluster. It is a clean solution that solves the durability and distribution of the WAL and ensures that the data is already replicated when the client receives a write confirmation.

Applying the WAL to the "queryable" table storage runs asynchronously in the Ingester component, which consumes messages from Kafka and writes them to 2 independent TSDB copies. There is a delay between a write being confirmed and the data becoming readable; in Influx terms this is called Time to Become Readable.

Influx protocol messages are idempotent in the sense that the same message can be processed many times without creating duplicates. This is because in InfluxDB, the same set of tags can have only one row per timestamp value. So if one sends a line:

measurement1 field=34.23 1562387656300000000

and then sends another line with the same measurement and timestamp:

measurement1 new_fields=0.1234 1562387656300000000

the new field value will be added to the same line as if the fields were sent together, in the same message:

measurement1 field=34.23,new_fields=0.1234 1562387656300000000

And if any of the above messages are sent again, Influx will not add a new row. This approach solves the problem of resending data on timeout or lost replies. So if the client does not receive a write confirmation from Influx cloud, it can re-send the same data again and again. When the data is sent with the same timestamp and tag set from different connections:

measurement1 field1=1 1677851990000000000
measurement1 field1=2,field2=2000 1677851990000000000

The querying storage nodes can become inconsistent for some time, returning any row out of the 3:

| field1 | field2 | time |
|---|---|---|
| 1 | | 2023-03-03T13:59:50.000Z |
| 2 | 2000 | 2023-03-03T13:59:50.000Z |
| 1 | 2000 | 2023-03-03T13:59:50.000Z |

It can even be that the first query returns field1=1, a second query returns field1=2 and then a third flips back to field1=1. Eventually, the query result will become stable and return the same data on each run. This is a very typical outcome for querying nodes in Round Robin with Async replication.
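The merge-by-key behavior, and how the three different rows can show up, can be illustrated with a small Python sketch. It assumes a toy store keyed by measurement, tag set and timestamp; `write_point` and `replay` are hypothetical helpers, and the sketch only shows what a storage copy would return after seeing the writes in a given order, not how InfluxDB settles on the final one.

```python
def write_point(store, measurement, tags, fields, timestamp):
    """Merge fields into the row identified by measurement, tags and timestamp."""
    key = (measurement, tuple(sorted(tags.items())), timestamp)
    store.setdefault(key, {}).update(fields)
    return store


TS = 1677851990000000000
write_a = {"field1": 1}
write_b = {"field1": 2, "field2": 2000}


def replay(writes):
    """Return the single row a storage copy holds after applying the writes."""
    store = {}
    for fields in writes:
        write_point(store, "measurement1", {}, fields, TS)
    return store[("measurement1", (), TS)]


only_a = replay([write_a])                     # copy that has seen one write
a_then_b = replay([write_a, write_b])
b_then_a = replay([write_b, write_a])
resent = replay([write_a, write_b, write_b])   # resending adds nothing

print(only_a, a_then_b, b_then_a, resent)
```

Resending a message leaves the row unchanged, while two different writes to the same key produce a result that depends on which copy has applied what so far, matching the three rows in the table above.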

The Influx data model also solves the problem of a dynamically evolving schema. There are no traditional columns: any unknown fields and tags that are encountered are added automatically by the database engine, so writing a different set of fields for the same measurement is not a problem by design. Influx also checks for schema conflicts and returns errors to the writing application if there are any. For example, if the same field is sent as a number and then as a string:

m1 value=34.23 1562387656300000000
m1 value="hello" 1562387656300000123

The write will fail with the error column value is type f64 but write has type string or column value is type string but write has type f64.
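A schema that grows automatically but rejects conflicting types can be sketched like this in Python. The registry layout, the exception `FieldTypeConflict` and the helper `check_and_register` are assumptions made for the illustration, and the sketch reports Python type names (float, str) rather than Influx's own (f64, string).

```python
class FieldTypeConflict(Exception):
    pass


def check_and_register(schema, measurement, fields):
    """Add unknown fields to the schema; reject a known field with a new type."""
    columns = schema.setdefault(measurement, {})
    # Validate the whole write first so a rejected write changes nothing.
    for name, value in fields.items():
        new_type = type(value).__name__
        known_type = columns.get(name)
        if known_type is not None and known_type != new_type:
            raise FieldTypeConflict(
                f"column {name} is type {known_type} but write has type {new_type}"
            )
    for name, value in fields.items():
        columns.setdefault(name, type(value).__name__)


schema = {}
check_and_register(schema, "m1", {"value": 34.23})
check_and_register(schema, "m1", {"extra": 7})      # new field, added silently

try:
    check_and_register(schema, "m1", {"value": "hello"})
    error = None
except FieldTypeConflict as exc:
    error = str(exc)

print(schema, error)
```

Which of the two writes fails depends on which one the database sees first, which is why the article lists two possible error messages.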

Summary#

Here is a summary of the replication features supported by time-series databases that are relevant to the high-availability write use case:

| | PostgreSQL / TimescaleDB | ClickHouse | InfluxDB Cloud |
|---|---|---|---|
| Multi-master replication | No | Yes | No |
| Supports Sync replication | Yes | Yes | No |
| Supports Async replication | Yes | Yes | Yes |
| Concurrently evolves replicated table schema | Yes | Yes¹ | Yes² |
| Same Insert / Update order on all nodes | Yes | Yes | Yes² |
| No gaps and duplicates after failover | Sync mode only | Yes | Yes |
| Uses WAL for Replication | Yes | Yes³ | Yes |

¹ Concurrent schema updates have to be re-tried

² InfluxDB Cloud is a closed-source system. Certain conclusions assume a reasonable use of Kafka WAL partitioning, so the correctness of this claim depends on the implementation.

³ ClickHouse replication Data Part plays the role of WAL

To draw up some conclusions:

  • All 3 systems replicate by writing to the Write Ahead Log and copying it across the nodes.

  • Asynchronous replication, where data written to Node 1 is eventually visible at Node 2, is the most popular approach: it is used by InfluxDB Cloud and is the default in both ClickHouse and Postgres.

  • Postgres / Timescale replication can be used in both synchronous and asynchronous modes, but it does not have multi-master replication and there is no option for automatic failover. It is not possible to solve write failover without additional software systems or human intervention.

  • Multi-master replication is available in ClickHouse. There are also enough available settings to strike an appropriate balance between experiencing data loss (in extreme scenarios) and writing throughput.

  • InfluxDB does not offer replication support in its open source product. There is a closed-source cloud solution that leverages Kafka to solve automatic write failover. Kafka replication is not multi-master but with the help of automatic failover, it solves high-availability write use cases.

QuestDB Replication Plans#

QuestDB released the Write Ahead Log table storage mode in v7.0 as the first step in our replication journey. It uses a multi-master write architecture internally to make non-locking writes to the same table possible from parallel connections. Transactions are written in parallel to different WAL segments, and a global order of commits is maintained in a Sequencer component. The most complicated bit is automatic schema conflict resolution so that table schema changes can also be performed in parallel.

QuestDB WAL writing
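The division of labor between per-writer WAL segments and a Sequencer can be sketched in Python. This is a conceptual, single-threaded toy and not QuestDB's implementation: the classes `Sequencer` and `WalWriter`, their methods and `apply_to_table` are assumptions made for the illustration, and a real sequencer would have to hand out transaction numbers atomically across threads or nodes.

```python
import itertools


class Sequencer:
    """Assigns a global, gap-free order to commits from all writers."""

    def __init__(self):
        self._next_txn = itertools.count(1)
        self.order = []   # entries: (txn_number, writer_id, segment_index)

    def commit(self, writer_id, segment_index):
        txn = next(self._next_txn)
        self.order.append((txn, writer_id, segment_index))
        return txn


class WalWriter:
    def __init__(self, writer_id, sequencer):
        self.writer_id = writer_id
        self.sequencer = sequencer
        self.segment = []   # this writer's private WAL segment, no locking

    def commit(self, rows):
        self.segment.append(list(rows))
        return self.sequencer.commit(self.writer_id, len(self.segment) - 1)


def apply_to_table(sequencer, writers):
    """Replay all WAL entries into table storage in the sequencer's order."""
    table = []
    for _txn, writer_id, index in sequencer.order:
        table.extend(writers[writer_id].segment[index])
    return table


sequencer = Sequencer()
writers = {
    "w1": WalWriter("w1", sequencer),
    "w2": WalWriter("w2", sequencer),
}

txn_numbers = [
    writers["w1"].commit([1]),
    writers["w2"].commit([2]),
    writers["w1"].commit([3]),
]
table = apply_to_table(sequencer, writers)
print(txn_numbers, table)
```

Writers never touch each other's segments, so the only shared step is asking the sequencer for the next transaction number; any node that replays the segments in that order arrives at the same table.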

We tried to avoid having a Write Ahead Log for a long time, writing directly into the table storage. It was not an easy decision to accept WAL write amplification for the sake of a cleaner path to replication and non-locking parallel writes. In the end, the additional write operations did not impact overall throughput. On the contrary, because of better parallelism, we achieved 3x better write performance in the Time Series Benchmark Suite compared to our own (quite extraordinary) performance for non-WAL tables.

Looking at how other databases solve the replication problem, we chose our goal to be achieving multi-master replication with Async consistency. We believe that this approach strikes the best balance of fault tolerance and transaction throughput. And it is essential to have a built-in write de-duplication mechanism for automatic write failover cases. The next steps for QuestDB will be to move the built-in Sequencer component to a distributed environment and solve WAL sharing between multiple instances.

Riding tandem bicycles with a friend is the best redundancy solution we see for QuestDB. And as for my commute problem, well, I still don't know how to solve it. You are more than welcome to join our Slack Community and share your feedback. You can also play with QuestDB live demo or play.questdb.io to see how fast it rides. And, of course, open-source contributions to our project on GitHub are more than welcome.
