Building a new vector-based storage model

QuestDB is the world's fastest growing time-series database. It offers premium ingestion throughput, enhanced SQL analytics, and cost-saving hardware efficiency. It's open source and integrates with many tools and languages.

In the early stages of QuestDB, we were inspired by vector-based append-only systems like kdb+. The approach offered better speed and simplified code paths. Our needs also required that row timestamps be stored in ascending order, resulting in fast time series queries without an expensive index, which made the model even more compelling.

But we soon found out that this model does not fit all data acquisition use cases, such as out-of-order data. Although several workarounds were available, we wanted to provide this functionality without losing any performance. This change shipped two years ago, in QuestDB 6.0. But we feel it's a fun time capsule, solving a critical problem in a way that still benefits us greatly today.

The solution that we present in this article is built from scratch. It's a neat technical story detailing the sort of problem you'll run into when building your own database, and there's sample code and helpful images.

The problem with out-of-order data

First, let's look at the challenge with out-of-order (O3) data. Our initial data model had one fatal flaw: records were discarded if they appeared out-of-order by timestamp compared to existing data. That's not ideal. In real-world applications, payload data doesn't behave this way. Network jitter, latency, and clock synchronization issues make out-of-order data more common.

We knew that the lack of out-of-order (O3) support was a showstopper for some users and we needed a solid solution. There were possible workarounds, such as using a single table per data source or re-ordering tables periodically, but for most users this is neither convenient nor sustainable.

Options to deal with O3 data

As we reviewed our data model, one possibility was to use something radically different from what we already had, such as including LSM trees or B-trees, commonly used in time series databases. Adding trees would bring the benefit of ordered data, on the fly, without inventing a replacement storage model from scratch.

But what bothered us most with this approach is that every subsequent read operation would face a performance penalty versus having data stored in arrays. This also introduced complexity, as we would have duplicate storage models. One for ordered data and another for O3 data.

A more promising option was to introduce a sort-and-merge phase as data arrives. This way, our storage model remained unchanged. But now, we'd merge data on the fly, with ordered vectors landing on disk as the output.

Early thoughts on a solution

Our idea of how we could handle O3 ingestion was a three-stage approach:

  1. Keep the append model until records arrive out-of-order
  2. Sort uncommitted records into a staging area in-memory
  3. Reconcile and merge the sorted O3 data and persisted data at commit time

The first two steps are straightforward and easy to implement. Handling append-only data remains unchanged. The heavy O3 commit kicks in only when there is data in the staging area. The bonus of this design is that it outputs to vectors, meaning our vector-based readers are still compatible.
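The three stages above can be pictured with a small single-column sketch. It is only an illustration under stated assumptions: `SketchTable`, `write` and `commit` are hypothetical names, the table holds timestamps only, and a plain `std::sort` plus `std::merge` stand in for the optimized sort and merge described later.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <vector>

// Hypothetical single-column table: timestamps only, kept in ascending order.
struct SketchTable {
    std::vector<int64_t> persisted;  // stands in for the ordered on-disk vector
    std::vector<int64_t> staging;    // uncommitted out-of-order rows, in memory

    // Stage 1: keep the plain append path while data arrives in order.
    // Stage 2: divert rows to the staging area once an O3 row shows up.
    void write(int64_t ts) {
        const bool in_order = persisted.empty() || ts >= persisted.back();
        if (staging.empty() && in_order) {
            persisted.push_back(ts);
        } else {
            staging.push_back(ts);
        }
    }

    // Stage 3: sort the staged rows, then merge them with persisted data.
    void commit() {
        if (staging.empty()) {
            return;  // append-only path: nothing extra to do
        }
        assert(std::is_sorted(persisted.begin(), persisted.end()));
        std::sort(staging.begin(), staging.end());
        std::vector<int64_t> merged;
        merged.reserve(persisted.size() + staging.size());
        std::merge(persisted.begin(), persisted.end(),
                   staging.begin(), staging.end(),
                   std::back_inserter(merged));
        persisted.swap(merged);  // the output is again one ordered vector
        staging.clear();
    }
};
```

Writing 10, 20, 15, 30 and then committing leaves `persisted` as 10, 15, 20, 30. The output stays a single ordered vector, which is why readers built for the append-only model keep working.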

The potential downside is that this pre-commit sort-and-merge adds an extra processing phase to ingestion with an accompanying performance penalty. We nevertheless decided to explore this approach and see how far we could reduce the penalty by optimizing the O3 commit.

Quick definitions

Before we get too far, let's add some quick definitions:

  • Vector-based append-only systems: Data storage models where data is written sequentially (appended) to vectors (arrays), optimized for high-speed reads and writes.

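  • SIMD (Single Instruction, Multiple Data): Parallel processing method where a single instruction operates on multiple data points at once. For data-parallel work such as copying or filling arrays, this can process several values in the time a scalar instruction handles one.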

  • Non-temporal data access: A memory access method that bypasses the CPU cache, directly writing or reading data from memory, reducing cache pollution for certain operations.

Now that we've got that squared away, let's dive into the implementation.

What happens on O3 commit?

Processing a staging area in bulk gives us a unique opportunity to analyze the data holistically. Such analysis aims to avoid physical merges altogether where possible and perhaps get away with fast and straightforward memcpy or similar data movement methods. Such methods can be parallelized thanks to our column-based storage. We can employ SIMD and non-temporal data access where it makes a difference.

We sort the timestamp column from the staging area via an optimized version of radix sort, and the resulting index is used to reshuffle the remaining columns in the staging area in parallel.
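As a rough sketch of the sort-then-reshuffle step, the code below builds a sorted index with a plain byte-wise LSD radix sort and then gathers another column through that index. This is not the optimized variant mentioned above; `IndexEntry`, `sort_index` and `reshuffle` are hypothetical names, and timestamps are assumed to be non-negative.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical index entry: a timestamp plus the row it came from.
struct IndexEntry {
    uint64_t ts;   // assumed non-negative timestamp
    uint64_t row;  // row position in the unsorted staging area
};

// Plain LSD radix sort, one byte per pass. Each pass is stable, so rows
// with equal timestamps keep their arrival order.
inline std::vector<IndexEntry> sort_index(const std::vector<uint64_t> &ts) {
    std::vector<IndexEntry> a(ts.size()), b(ts.size());
    for (std::size_t i = 0; i < ts.size(); i++) {
        a[i] = {ts[i], i};
    }
    for (int shift = 0; shift < 64; shift += 8) {
        std::array<std::size_t, 257> count{};
        for (const auto &e : a) {
            count[((e.ts >> shift) & 0xFF) + 1]++;
        }
        for (std::size_t d = 0; d < 256; d++) {
            count[d + 1] += count[d];  // prefix sums give each bucket's start
        }
        for (const auto &e : a) {
            b[count[(e.ts >> shift) & 0xFF]++] = e;
        }
        a.swap(b);
    }
    return a;
}

// Reshuffle one column into timestamp order using the sorted index.
// Columns are independent, so each one could be handed to its own worker.
template<typename T>
std::vector<T> reshuffle(const std::vector<IndexEntry> &index, const std::vector<T> &column) {
    assert(index.size() == column.size());
    std::vector<T> out(column.size());
    for (std::size_t i = 0; i < index.size(); i++) {
        out[i] = column[index[i].row];
    }
    return out;
}
```

Sorting once and reusing the index means the comparison work is paid only for the timestamp column. Every other column is a straight gather.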

The now-sorted staging area is mapped relative to the existing partition data. It may not be obvious from the start, but we are trying to establish the type of operation needed and the dimensions of each of three groups: prefix, merge, and suffix.

When merging datasets in this way, the prefix and suffix groups can be persisted data, O3 data, or none. The merge group is where more cases occur as it can be occupied by persisted data, O3 data, both O3 and persisted data, or none.

When it's clear how to group and treat data in the staging area, a pool of workers performs the required operations, calling memcpy in trivial cases and shifting to SIMD-optimized code for everything else. With a prefix, merge, and suffix split, the maximum liveliness of the commit (how much it can benefit from added CPU capacity) is partitions_affected x number_of_columns x 3.
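One simplified way to picture the prefix, merge, and suffix split is to compare the timestamp ranges of the persisted partition and the sorted staging area. The sketch below is an assumption-laden illustration, not QuestDB's actual logic: `Group`, `Split`, `split_commit` and `max_commit_tasks` are hypothetical names, both inputs are assumed sorted and non-empty, and the groups are described only by row counts.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Row counts that one group takes from each source.
struct Group {
    std::size_t data_rows;  // rows from the persisted partition
    std::size_t o3_rows;    // rows from the sorted staging area
};

struct Split {
    Group prefix, merge, suffix;
};

// Count the elements of a sorted vector that are strictly below `v`.
inline std::size_t count_below(const std::vector<int64_t> &ts, int64_t v) {
    return static_cast<std::size_t>(std::lower_bound(ts.begin(), ts.end(), v) - ts.begin());
}

// Count the elements of a sorted vector that are at or below `v`.
inline std::size_t count_up_to(const std::vector<int64_t> &ts, int64_t v) {
    return static_cast<std::size_t>(std::upper_bound(ts.begin(), ts.end(), v) - ts.begin());
}

// Both inputs must be sorted ascending and non-empty.
inline Split split_commit(const std::vector<int64_t> &data, const std::vector<int64_t> &o3) {
    assert(!data.empty() && !o3.empty());
    const int64_t lo = std::max(data.front(), o3.front());  // overlap starts here
    const int64_t hi = std::min(data.back(), o3.back());    // overlap ends here

    const std::size_t data_lo = count_below(data, lo);
    const std::size_t o3_lo = count_below(o3, lo);
    const std::size_t data_hi = std::max(data_lo, count_up_to(data, hi));
    const std::size_t o3_hi = std::max(o3_lo, count_up_to(o3, hi));

    Split s;
    s.prefix = {data_lo, o3_lo};                            // at most one source is non-zero
    s.merge = {data_hi - data_lo, o3_hi - o3_lo};           // empty when the ranges do not overlap
    s.suffix = {data.size() - data_hi, o3.size() - o3_hi};  // at most one source is non-zero
    return s;
}

// Upper bound on independent tasks: one per partition, column and group.
inline std::size_t max_commit_tasks(std::size_t partitions_affected, std::size_t number_of_columns) {
    return partitions_affected * number_of_columns * 3;
}
```

In this model the prefix and suffix always come from a single source, so they can be plain copies. Only the merge group may need rows from both sides interleaved. For 2 affected partitions and 10 columns, the task bound is 2 x 10 x 3 = 60.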

Optimizing O3 commits with SIMD

Because we aim to rely on memcpy the most, we benchmarked code that merges variable-length columns:

template<typename T>
inline void merge_copy_var_column(
        index_t *merge_index,
        int64_t merge_index_size,
        int64_t *src_data_fix,
        char *src_data_var,
        int64_t *src_ooo_fix,
        char *src_ooo_var,
        int64_t *dst_fix,
        char *dst_var,
        int64_t dst_var_offset,
        T mult
) {
    // Bit 63 of each index entry selects the source: 0 = O3 data, 1 = persisted data.
    int64_t *src_fix[] = {src_ooo_fix, src_data_fix};
    char *src_var[] = {src_ooo_var, src_data_var};
    for (int64_t l = 0; l < merge_index_size; l++) {
        MM_PREFETCH_T0(merge_index + l + 64);
        // Record where this row's variable-length value starts in the destination.
        dst_fix[l] = dst_var_offset;
        const uint64_t row = merge_index[l].i;
        const uint32_t bit = (row >> 63);
        const uint64_t rr = row & ~(1ull << 63);
        const int64_t offset = src_fix[bit][rr];
        char *src_var_ptr = src_var[bit] + offset;
        // Each value is a length header of type T followed by the payload.
        auto len = *reinterpret_cast<T *>(src_var_ptr);
        // A non-positive length carries no payload bytes.
        auto char_count = len > 0 ? len * mult : 0;
        reinterpret_cast<T *>(dst_var + dst_var_offset)[0] = len;
        __MEMCPY(dst_var + dst_var_offset + sizeof(T), src_var_ptr + sizeof(T), char_count);
        dst_var_offset += char_count + sizeof(T);
    }
}

with __MEMCPY as Agner Fog's Asmlib A_memcpy in one instance and glibc's memcpy in the other.
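The merge function above reads a merge index in which bit 63 of each entry selects the source and the remaining bits hold the row number. A minimal sketch of how such an index could be produced from two sorted timestamp vectors follows. `MergeEntry` is a hypothetical stand-in for `index_t`, whose real layout is not shown here, and placing persisted rows first on equal timestamps is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for index_t: a timestamp plus an encoded row reference.
struct MergeEntry {
    int64_t ts;
    uint64_t i;  // bit 63: 1 = persisted data, 0 = O3 data; lower bits: row number
};

constexpr uint64_t kPersistedFlag = 1ull << 63;

inline bool is_persisted(uint64_t i) { return (i >> 63) != 0; }
inline uint64_t row_of(uint64_t i) { return i & ~kPersistedFlag; }

// Two-pointer merge of two ascending timestamp vectors into one index.
inline std::vector<MergeEntry> build_merge_index(const std::vector<int64_t> &data_ts,
                                                 const std::vector<int64_t> &o3_ts) {
    std::vector<MergeEntry> out;
    out.reserve(data_ts.size() + o3_ts.size());
    std::size_t d = 0, o = 0;
    while (d < data_ts.size() && o < o3_ts.size()) {
        if (data_ts[d] <= o3_ts[o]) {  // assumption: persisted rows win ties
            out.push_back({data_ts[d], d | kPersistedFlag});
            d++;
        } else {
            out.push_back({o3_ts[o], o});
            o++;
        }
    }
    for (; d < data_ts.size(); d++) {
        out.push_back({data_ts[d], d | kPersistedFlag});
    }
    for (; o < o3_ts.size(); o++) {
        out.push_back({o3_ts[o], o});
    }
    return out;
}
```

Packing the source flag into the row number keeps each entry compact and lets the copy loop pick its source with a shift and a mask instead of a branch.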

Xeon 8275CL CPU @ 3.00GHz, AVX 512, 36608K cache, Amzn2 Linux. Units are microseconds/Mb, lower score is better.

And by comparison:

i7-3770, 3.40GHz, 8Mb cache, AVX, Ubuntu 20. Units are microseconds/Mb, lower score is better.

The key results from this comparison are:

  • glibc could be slow and inconsistent on AVX512 for our use case. We speculate that A_memcpy does better because it uses non-temporal copy instructions.
  • Windows memcpy is pretty bad.
  • A_memcpy and memcpy perform well on CPUs below AVX512.

A_memcpy uses non-temporal streaming instructions, which appear to work well with the following simple loop:

template<typename T>
void set_memory_vanilla(T *addr, const T value, const int64_t count) {
    for (int64_t i = 0; i < count; i++) {
        addr[i] = value;
    }
}

The loop above fills a memory buffer with the same 64-bit pattern. It can be implemented as memset if all bytes are the same. It can also be written as vectorized code that uses the platform-specific _mm??_stream_ps(p,?mm) intrinsics through the store_nt vector method, as seen below:

template<typename T, typename TVec>
inline void set_memory_vanilla(T *addr, const T value, const int64_t count) {
    const auto l_iteration = [addr, value](int64_t i) {
        addr[i] = value;
    };
    const TVec vec(value);
    const auto l_bulk = [&vec, addr](const int64_t i) {
        vec.store_nt(addr + i);
    };
    run_vec_bulk<T, TVec>(addr, count, l_iteration, l_bulk);
}
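For readers without a vector class library at hand, the same idea can be sketched with raw SSE2 intrinsics. This is an illustrative variant, not the code that was benchmarked: `fill_nt` is a hypothetical name, it uses `_mm_stream_si128` (the integer counterpart of the `_mm_stream_ps` family), and it falls back to a plain loop on targets without SSE2.

```cpp
#include <cstdint>
#if defined(__SSE2__) || defined(_M_X64)
#include <emmintrin.h>
#define FILL_NT_HAS_SSE2 1
#endif

// Fill `count` 64-bit slots with `value`, bypassing the cache where possible.
inline void fill_nt(int64_t *addr, const int64_t value, const int64_t count) {
    int64_t i = 0;
#ifdef FILL_NT_HAS_SSE2
    // Streaming stores need 16-byte alignment: write scalars until aligned.
    while (i < count && (reinterpret_cast<uintptr_t>(addr + i) & 15) != 0) {
        addr[i++] = value;
    }
    const __m128i vec = _mm_set1_epi64x(value);  // two copies of the pattern
    for (; i + 2 <= count; i += 2) {
        _mm_stream_si128(reinterpret_cast<__m128i *>(addr + i), vec);
    }
    _mm_sfence();  // make the streaming stores visible before returning
#endif
    // Scalar tail, or the whole buffer when SSE2 is not available.
    for (; i < count; i++) {
        addr[i] = value;
    }
}
```

The alignment prologue and scalar tail play the role that `l_iteration` plays in the version above, while the streaming loop corresponds to `l_bulk`.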

The results were quite surprising. Non-temporal SIMD instructions showed the most stable results with similar performance to memset.

Initialize buffer with same 64bit value

Unfortunately, benchmark results with other functions were less conclusive. Some perform better with hand-written SIMD, and some are just as fast with GCC's SSE4-generated code, even when run on AVX512 systems.

Hand-writing SIMD instructions is both time consuming and verbose. We ended up optimizing parts of the code base with SIMD only when the performance benefits outweighed code maintenance.

Commit Hysteresis to further improve performance

While being able to copy data fast is a strong option, we think that heavy data copying can be avoided in most time series ingestion scenarios. Assuming that most real-time out-of-order situations are caused by the delivery mechanism and hardware jitter, we can deduce that the timestamp distribution will be locally contained by some boundary.

For example, if any new timestamp value has a high probability of falling within 10 seconds of the previously received value, the boundary is then 10 seconds. We call this boundary O3 hysteresis.

When timestamp values follow this pattern, deferring the commit can effectively render O3 an append operation. We did not design QuestDB's O3 to deal with the hysteresis pattern only, but should your data conform to this pattern, it will be recognized and prioritized for the hot path.
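To see why deferring the commit can turn O3 into an append, consider a minimal sketch that holds back any row newer than the largest timestamp seen minus the hysteresis boundary. `LagBuffer`, `lag` and `commit` are hypothetical names and this is not QuestDB's implementation. It assumes every future row arrives no more than `lag` behind the maximum timestamp seen so far.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Hypothetical staging buffer that defers rows still inside the lag window.
struct LagBuffer {
    int64_t lag;  // hysteresis boundary, in the same unit as the timestamps
    int64_t max_seen = std::numeric_limits<int64_t>::min();
    std::vector<int64_t> pending;

    void write(int64_t ts) {
        pending.push_back(ts);
        max_seen = std::max(max_seen, ts);
    }

    // Returns the rows that are safe to commit, sorted ascending, and keeps
    // the rest pending. Under the lag assumption no later row can be older
    // than the watermark, so each returned batch can simply be appended.
    std::vector<int64_t> commit() {
        if (pending.empty()) {
            return {};
        }
        const int64_t watermark = max_seen - lag;
        auto mid = std::partition(pending.begin(), pending.end(),
                                  [watermark](int64_t ts) { return ts <= watermark; });
        std::vector<int64_t> ready(pending.begin(), mid);
        pending.erase(pending.begin(), mid);
        std::sort(ready.begin(), ready.end());
        return ready;
    }
};
```

With a lag of 10, writing 100, 95, 105, 98, 112 and committing yields 95, 98, 100, while 105 and 112 stay pending. A later batch starts at or above the previous watermark, so no merge with persisted data is needed.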

Summary

To add O3 support, we went for a novel solution that yielded surprisingly good performance versus well-trodden approaches such as B-trees or LSM-based ingestion frameworks. We're happy to have shared the journey.

Like this kind of thing? Come talk to us on Slack or within our Discourse community!
