Spark is an analytics engine for large-scale data engineering. Despite its long history, it still has a well-deserved place in the big data landscape. QuestDB, on the other hand, is a time-series database with a very high data ingestion rate. Spark desperately needs data, a lot of it, and QuestDB has it: a match made in heaven.
Of course, there is pandas for data analytics! The key here is the expression large-scale. Unlike pandas, Spark is a distributed system and can scale really well.
What does this mean exactly?
Let's take a look at how data is processed in Spark.
For the purposes of this article, we only need to know that a Spark job consists of multiple tasks, and each task works on a single data partition. Tasks are executed in parallel within stages, distributed across the cluster. Stages depend on the previous ones; tasks from different stages cannot run in parallel.
The schematic diagram below depicts an example job:
In this tutorial, we will load data from a QuestDB table into a Spark application and explore the inner workings of Spark to refine data loading. Finally, we will modify and save the data back to QuestDB.
First things first, we need to load time-series data from QuestDB. I will use an existing table, trades, with just over 1.3 million rows.
It contains Bitcoin trades spanning 3 days: not exactly a big data scenario, but good enough to experiment with.
The table contains the following columns:
| Column Name | Column Type |
The table is partitioned by day, and the timestamp column serves as the designated timestamp.
QuestDB accepts connections via the Postgres wire protocol, so we can use JDBC to integrate. You can choose from various languages to create Spark applications; here we will go for Python.
Create the script, sparktest.py:
Believe it or not, this tiny application already reads data from the database when submitted as a Spark job.
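For orientation, a script along these lines might look like the sketch below. The connection settings are assumptions based on QuestDB's default Postgres wire configuration (`localhost:8812`, user `admin`, password `quest`, database `qdb`), and the filter values `BTC-USD` and `buy` are purely illustrative; adjust all of them to your own setup.

```python
# sparktest.py - a minimal sketch, not a definitive implementation.
# Connection settings are assumptions (QuestDB defaults); change as needed.
JDBC_OPTIONS = {
    "url": "jdbc:postgresql://localhost:8812/qdb",
    "driver": "org.postgresql.Driver",
    "user": "admin",
    "password": "quest",
    "dbtable": "trades",
}


def load_trades(spark, options=JDBC_OPTIONS):
    """Return a lazy DataFrame backed by the QuestDB table."""
    return spark.read.format("jdbc").options(**options).load()


def main():
    # imported here so the module can be inspected without PySpark installed
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("questdb_test").getOrCreate()
    df = load_trades(spark)

    # print the number of rows
    print(df.count())

    # filter, then print the first 3 rows (filter values are illustrative)
    df = df.filter(df.symbol == "BTC-USD").filter(df.side == "buy")
    df.show(3, truncate=False)

    spark.stop()
```

To run it, call `main()` at the bottom of the script and submit it with something like `spark-submit --jars postgresql-42.5.1.jar sparktest.py`, so that the JDBC driver travels with the application.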
The job prints the following row count:
And these are the first 3 rows of the filtered table:
Although sparktest.py speaks for itself, it is still worth mentioning that this application has a dependency on the JDBC driver located in postgresql-42.5.1.jar. It cannot run without this dependency; hence the driver has to be submitted to Spark together with the application.
We have loaded data into Spark. Now we will look at how this was completed and some aspects to consider.
The easiest way to peek under the hood is to check QuestDB's log, which should tell us how Spark interacted with the database. We will also make use of the Spark UI, which displays useful insights into the execution, including stages and tasks.
The QuestDB log shows that Spark connected to the database three times. For simplicity, I only show the relevant lines of the log:
Spark first queried the database when we created the DataFrame, but as it turns
out, it was not too interested in the data itself. The query looked like this:
SELECT * FROM trades WHERE 1=0
The only thing Spark wanted to know was the schema of the table in order to create an empty DataFrame. Spark evaluates expressions lazily, and only does the bare minimum required at each step. After all, it is meant to analyze big data, so resources are incredibly precious for Spark. Especially memory: data is not cached by default.
The second connection happened when Spark counted the rows of the DataFrame. It did not query the data this time, either. Interestingly, instead of pushing the aggregation down to the database by running SELECT count(*) FROM trades, it just queried a 1 for each record: SELECT 1 FROM trades. Spark adds the 1s together to get the actual count.
Working with the data itself eventually forced Spark to get a taste of the table's content too. Filters are pushed down to the database by default.
We can see that Spark's interaction with the database is rather sophisticated, and optimized to achieve good performance without wasting resources. The Spark DataFrame is the key component which takes care of the optimization, and it deserves some further analysis.
DataFrame sounds like a container to hold data, but we have seen earlier that this is not really true. So what is a Spark DataFrame then?
One way to look at Spark SQL, with the risk of oversimplifying it, is that it is
a query engine.
df.filter(predicate) is really just another way of saying
WHERE predicate. With this in mind, the DataFrame is pretty much a query, or
actually more like a query plan.
Most databases come with functionality to display query plans, and Spark has it too! Let's check the plan for the DataFrame we just created:
If the DataFrame knows how to reproduce the data by remembering the execution plan, it does not need to store the actual data. This is precisely what we have seen earlier. Spark desperately tried not to load our data, but this can have disadvantages too.
Not caching the data radically reduces Spark's memory footprint, but there is a bit of jugglery here. Data does not have to be cached because the plan printed above can be executed again and again and again...
Now imagine how even a decently sized Spark cluster could make our lonely QuestDB instance suffer martyrdom.
With a massive table containing many partitions, Spark would generate a large number of tasks to be executed in parallel across different nodes of the cluster. These tasks would query the table almost simultaneously, putting a heavy load on the database. So, if you find your colleagues cooking breakfast on your database servers, consider forcing Spark to cache some data to reduce the number of trips to the database.
This can be done by calling
df.cache(). In a large application, it might
require a bit of thinking about what is worth caching and how to ensure that
Spark executors have enough memory to store the data.
In practice, you should consider caching smallish datasets used frequently throughout the application's life.
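The trade-off can be illustrated with a toy model in plain Python. This is not how Spark is implemented; `LazyFrame` and `query_database` are hypothetical names, and the rows are made up. The sketch only shows why a plan that is re-executed for every action costs one database trip per action, while a cached one costs a single trip.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: it stores a plan, not the rows."""

    def __init__(self, fetch):
        self._fetch = fetch        # callable that "queries the database"
        self._use_cache = False
        self._cached = None

    def cache(self):
        # only marks the frame for caching; nothing is fetched yet
        self._use_cache = True
        return self

    def _rows(self):
        if self._cached is not None:
            return self._cached
        rows = self._fetch()       # one trip to the database
        if self._use_cache:
            self._cached = rows
        return rows

    def count(self):
        return len(self._rows())

    def first(self, n):
        return self._rows()[:n]


trips = {"n": 0}


def query_database():
    # pretend database call; the rows are made up for illustration
    trips["n"] += 1
    return [("BTC-USD", "buy", 100.0), ("BTC-USD", "sell", 101.0)]


def run(frame):
    """Run two actions against the frame and report the database trips."""
    trips["n"] = 0
    frame.count()
    frame.first(1)
    return trips["n"]


uncached_trips = run(LazyFrame(query_database))          # plan runs per action
cached_trips = run(LazyFrame(query_database).cache())    # plan runs once
print(uncached_trips, cached_trips)
```

In a real PySpark application the equivalent step is simply calling `df.cache()` on the DataFrame before the first action.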
Let's rerun our code with a tiny modification, adding .cache():
This time Spark hit the database only twice. First, it came for the schema, the
second time for the data:
SELECT "symbol","side","price","amount","timestamp" FROM trades.
Clearly, even a few carefully placed
.cache() calls can improve the overall
performance of an application, sometimes significantly. What else should we take
into consideration when thinking about performance?
Earlier, we mentioned that our Spark application consists of tasks, which work on the different partitions of the data in parallel. So, partitioned data means parallelism, which results in better performance.
Now we turn to the Spark UI.
It tells us that the job was done in a single task:
The truth is that we already suspected this. The execution plan told us (numPartitions=1), and we did not see any parallelism in the QuestDB logs either. We can display more details about this partition with a bit of additional code:
The UI helps us confirm that the data is loaded as a single partition. QuestDB stores this data in 3 partitions. We should try to fix this.
Although it is not recommended, we can try to use DataFrame.repartition(). This call reshuffles data across the cluster while partitioning the data, so it should be our last resort. After running df.repartition(3, df.timestamp), we see 3 partitions, but not exactly the way we expected. The partitions overlap with one another:
It seems that
DataFrame.repartition() used hashes to distribute the rows
across the 3 partitions. This would mean that all 3 tasks would require data
from all 3 QuestDB partitions.
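A small plain-Python experiment makes the effect easier to picture. It uses synthetic timestamps with a made-up start date, and `zlib.crc32` only as a stand-in for Spark's own hash function, so the exact distribution will differ from what Spark produces. It compares hash-based assignment with a day-aligned one.

```python
import zlib
from datetime import datetime, timedelta

START = datetime(2023, 1, 1)  # made-up start date, for illustration only

# one synthetic row every 10 minutes over 3 days
timestamps = [START + timedelta(minutes=10 * i) for i in range(3 * 24 * 6)]


def hash_partition(ts, num_partitions):
    # crc32 is only a stand-in; Spark uses its own hash function
    return zlib.crc32(ts.isoformat().encode()) % num_partitions


def day_partition(ts, num_partitions):
    # one partition per day, mirroring how the QuestDB table is partitioned
    return min((ts - START).days, num_partitions - 1)


def days_per_partition(assign, num_partitions=3):
    """Count how many distinct days end up in each partition."""
    days = {p: set() for p in range(num_partitions)}
    for ts in timestamps:
        days[assign(ts, num_partitions)].add(ts.date())
    return {p: len(d) for p, d in days.items()}


hashed = days_per_partition(hash_partition)
aligned = days_per_partition(day_partition)
print("hash-based :", hashed)
print("day-aligned:", aligned)
```

With hashing, a partition collects rows from several days, so every task would have to touch several QuestDB partitions; with day-aligned assignment each partition covers exactly one day.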
Let's try this instead:
This looks better but is still not ideal. That is because DataFrame.repartitionByRange() samples the dataset and then estimates the borders of the partitions.
What we really want is for the DataFrame partitions to match exactly the partitions we see in QuestDB. This way, the tasks running in parallel in Spark do not cross paths in QuestDB, which is likely to result in better performance.
Data source options to the rescue! Let's try the following:
After specifying partitionColumn, numPartitions, lowerBound, and upperBound, the situation is much better: the row counts in the partitions match what we have seen in the QuestDB logs earlier: rowCount=377783. Looks like we did it!
We can check Spark UI again; it also confirms that the job was completed in 3 separate tasks, each of them working on a single partition.
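To build some intuition for what these options do, here is a simplified model, in plain Python, of how a JDBC source can turn `partitionColumn`, `lowerBound`, `upperBound`, and `numPartitions` into one WHERE clause per partition. The dates are made up, `partition_predicates` is a hypothetical helper, and the SQL Spark actually generates may be formatted differently. Note that the bounds only decide the stride; they do not filter rows, which is why the first and last predicates are open-ended.

```python
from datetime import datetime

# Options as they could be passed to spark.read.format("jdbc");
# the bound values are illustrative, not taken from the real dataset.
PARTITION_OPTIONS = {
    "partitionColumn": "timestamp",
    "numPartitions": "3",
    "lowerBound": "2023-01-01 00:00:00",
    "upperBound": "2023-01-04 00:00:00",
}


def partition_predicates(column, lower, upper, num_partitions):
    """Simplified model of stride-based JDBC partitioning."""
    if num_partitions < 2:
        raise ValueError("this sketch only covers 2 or more partitions")
    stride = (upper - lower) / num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= '{current}'" if i > 0 else None
        current = current + stride
        is_last = i == num_partitions - 1
        upper_clause = None if is_last else f"{column} < '{current}'"
        if lower_clause is None:
            # first partition also picks up rows below the lower bound
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        elif upper_clause is None:
            # last partition also picks up rows above the upper bound
            predicates.append(lower_clause)
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


predicates = partition_predicates(
    PARTITION_OPTIONS["partitionColumn"],
    datetime(2023, 1, 1),
    datetime(2023, 1, 4),
    int(PARTITION_OPTIONS["numPartitions"]),
)
for p in predicates:
    print(p)
```

With bounds spanning exactly three days and three partitions, each predicate covers one day, which is what lets the Spark partitions line up with the daily partitions in QuestDB.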
Sometimes it might be tricky to know the minimum and maximum timestamps when creating the DataFrame. In the worst case, you could query the database for those values via an ordinary connection.
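One possible way to derive the bounds is sketched below. It assumes the minimum and maximum timestamps have already been fetched with a query such as the one in `BOUNDS_QUERY`, and rounds them outwards to whole days so that the stride lines up with daily partitions. `daily_bounds` is a hypothetical helper and the sample values are made up.

```python
from datetime import datetime, timedelta

# Query that could be run over an ordinary database connection (assumption)
BOUNDS_QUERY = "SELECT min(timestamp), max(timestamp) FROM trades"


def daily_bounds(min_ts, max_ts):
    """Round the observed range outwards to whole days.

    Returns (lower_bound, upper_bound, number_of_daily_partitions).
    """
    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    lower = min_ts.replace(**midnight)
    upper = max_ts.replace(**midnight) + timedelta(days=1)
    return lower, upper, (upper - lower).days


# made-up values standing in for the result of BOUNDS_QUERY
lower, upper, num_partitions = daily_bounds(
    datetime(2023, 1, 1, 0, 0, 3),
    datetime(2023, 1, 3, 23, 59, 58),
)
print(lower, upper, num_partitions)
```

The three returned values map directly onto the `lowerBound`, `upperBound`, and `numPartitions` options.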
We have managed to replicate our QuestDB partitions in Spark, but data does not always come from a single table. What if the data required is the result of a query? Can we load that, and is it possible to partition it?
We can use the query option to load data from QuestDB with the help of a SQL query:
Depending on the amount of data and the actual query, you might find that pushing the aggregations to QuestDB is faster than completing them in Spark. Spark definitely has an edge when the dataset is really large.
Now let's try partitioning this DataFrame with the options we used before with the dbtable option. Unfortunately, we will get an error message:
However, we can trick Spark by just giving the query an alias name. This means
we can go back to using the
dbtable option again, which lets us specify
partitioning. See the example below:
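A sketch of the aliasing trick, with a hypothetical helper `as_dbtable`, a hypothetical alias `hourly_trades`, and an illustrative SAMPLE BY query. The resulting string is what would be passed as the `dbtable` option, alongside the partitioning options.

```python
def as_dbtable(query, alias="subquery"):
    """Wrap a SQL query so it can be passed as the 'dbtable' option."""
    query = query.strip().rstrip(";")
    return f"({query}) AS {alias}"


# illustrative query; column choices and sampling interval are assumptions
HOURLY_QUERY = (
    "SELECT timestamp, symbol, avg(price) AS avg_price "
    "FROM trades SAMPLE BY 1h"
)

dbtable = as_dbtable(HOURLY_QUERY, alias="hourly_trades")
print(dbtable)

# It could then be used like any table name, for example:
#   spark.read.format("jdbc")
#       .option("dbtable", dbtable)
#       .option("partitionColumn", "timestamp")
#       ...
```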
Looking good. Now it seems that we can load any data from QuestDB into Spark by passing a SQL query to the DataFrame. Can we, really?
Our trades table is limited to three data types only. What about all the other types you can find in QuestDB?
We expect that Spark will successfully map a double or a timestamp queried from the database, but what about a geohash? It is not that obvious what is going to happen.
As always, when unsure, we should test.
I have another table in the database with a different schema. This table has a column for each type currently available in QuestDB.
long128 is not fully supported by QuestDB yet, so it is commented out.
Let's try to load and print the data; we can also take a look at the schema of the DataFrame:
Much to my surprise, Spark managed to create the DataFrame and mapped all types.
Here is the schema:
It looks pretty good, but you might wonder if it is a good idea to map long256 and geohash types to String. QuestDB does not provide arithmetic for these types, so it is not a big deal.
Geohashes are basically base-32 numbers, represented and stored in their string format. The 256-bit long values are also treated as string literals. long256 is used to store cryptocurrency private keys.
Now let's see the data:
It also looks good, but we could omit the 00:00:00 from the end of the date field. We can see that it is mapped to Timestamp and not Date.
We could also try to map one of the numeric fields to Decimal. This can be useful if later we want to do computations that require high precision.
We can use the customSchema option to customize the column types. Our modified code:
The new schema:
And the data is displayed as:
It seems that Spark can handle almost all database types. The only issue is long128, but this type is currently a work in progress in QuestDB. When completed, it will be mapped as String, just like long256.
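For reference, the value of the customSchema option is a comma-separated list of column names and Spark SQL types. The sketch below assembles such a string; the column names `a_date` and `a_double` are hypothetical placeholders for the date and numeric columns discussed above, and `custom_schema` is just a convenience helper.

```python
def custom_schema(overrides):
    """Build a customSchema string from {column_name: spark_sql_type}."""
    return ", ".join(f"{column} {sql_type}" for column, sql_type in overrides.items())


# hypothetical column names; only the listed columns are overridden,
# all other columns keep their default mapping
schema = custom_schema({
    "a_date": "DATE",
    "a_double": "DECIMAL(38, 10)",
})
print(schema)

# It could then be passed when reading, for example:
#   spark.read.format("jdbc").option("customSchema", schema) ...
```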
There is only one thing left: writing data back into QuestDB.
In this example, first, we will load some data from the database and add two new features:
- 10-minute moving average
- standard deviation, also calculated over the last 10-minute window
Then we will try to save the modified DataFrame back into QuestDB as a new table. We need to take care of some type mappings, as DOUBLE columns are sent as FLOAT8 to QuestDB by default, so we end up with this code:
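A sketch of what such code might look like is shown below. It is an approximation under stated assumptions: the new column names `ma_10` and `std_10` are hypothetical, the window is expressed as a 600-second range over the timestamp cast to epoch seconds, and `jdbc_options` is expected to carry the connection settings (url, driver, user, password).

```python
WINDOW_SECONDS = 10 * 60  # 10-minute look-back window

# Force DOUBLE for numeric columns when Spark creates the table;
# ma_10 and std_10 are hypothetical names for the two new features.
COLUMN_TYPES = "price DOUBLE, amount DOUBLE, ma_10 DOUBLE, std_10 DOUBLE"


def enrich(df):
    """Add a 10-minute moving average and standard deviation of price."""
    # imported lazily so the module loads without PySpark installed
    from pyspark.sql import Window, functions as F

    window_10 = (
        Window.partitionBy("symbol")
        # casting a timestamp to long yields epoch seconds
        .orderBy(F.col("timestamp").cast("long"))
        .rangeBetween(-WINDOW_SECONDS, Window.currentRow)
    )
    return (
        df.withColumn("ma_10", F.avg("price").over(window_10))
          .withColumn("std_10", F.stddev("price").over(window_10))
    )


def save(df, jdbc_options, table="trades_enriched"):
    """Write the DataFrame to QuestDB, overriding the default type mapping."""
    (
        df.write.format("jdbc")
        .options(**jdbc_options)
        .option("dbtable", table)
        .option("createTableColumnTypes", COLUMN_TYPES)
        .save()
    )
```

Usage would be along the lines of `save(enrich(df), jdbc_options)`, where `df` is the DataFrame loaded from the trades table.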
All works but… we soon realize that our new table, trades_enriched, is not partitioned and does not have a designated timestamp, which is not ideal. Obviously, Spark has no idea of QuestDB specifics.
It would work better if we created the table upfront and Spark only saved the data into it. We drop the table and re-create it; this time, it is partitioned and has a designated timestamp:
The table is empty and waiting for the data.
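For illustration, the DDL for such a table could be assembled as below. The column list is an assumption (in particular the SYMBOL types and the hypothetical `ma_10` and `std_10` columns), and `create_table_ddl` is a hypothetical helper; the statement itself would be run against QuestDB with any SQL client.

```python
def create_table_ddl(table, columns, designated_timestamp, partition_by="DAY"):
    """Build a QuestDB CREATE TABLE statement with a designated timestamp."""
    column_list = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE {table} ({column_list}) "
        f"TIMESTAMP({designated_timestamp}) PARTITION BY {partition_by};"
    )


# assumed column layout for the enriched table
ENRICHED_COLUMNS = [
    ("symbol", "SYMBOL"),
    ("side", "SYMBOL"),
    ("price", "DOUBLE"),
    ("amount", "DOUBLE"),
    ("timestamp", "TIMESTAMP"),
    ("ma_10", "DOUBLE"),
    ("std_10", "DOUBLE"),
]

ddl = create_table_ddl("trades_enriched", ENRICHED_COLUMNS, "timestamp")
print(ddl)
```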
We rerun the code; all works, no complaints. The data is in the table, and it is partitioned.
One aspect of writing data into the database is whether we are allowed to create duplicates. What if I try to rerun the code without dropping the table? Will Spark let me save the data this time? No, we get an error:
The last part of the error message looks interesting:
SaveMode? It turns out we can configure what should happen if the
table already exists. Our options are:
- errorifexists: the default behavior is to return an error if the table already exists; Spark is playing it safe here
- append: data will be appended to the existing rows already present in the table
- overwrite: the content of the table will be replaced entirely by the newly saved data
- ignore: if the table already exists, our save operation gets ignored without any error
We have already seen how errorifexists behaves; append and ignore seem to be simple enough just to work.
However, overwrite is not straightforward. The content of the table must be
cleared before the new data can be saved. Spark will delete and re-create the
table by default, which means losing partitioning and the designated timestamp.
In general, we do not want Spark to create tables for us. Luckily, with the
truncate option we can tell Spark to use
TRUNCATE to clear the table instead
of deleting it:
The above works as expected.
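A sketch of how the save mode and the truncate option could be wired together is shown below. `writer_settings` and `save` are hypothetical helpers, and `jdbc_options` is assumed to hold the connection settings; the idea is simply that truncate is switched on whenever overwrite is selected.

```python
SAVE_MODES = ("errorifexists", "append", "overwrite", "ignore")


def writer_settings(mode):
    """Return the save mode plus any extra JDBC options it should carry."""
    if mode not in SAVE_MODES:
        raise ValueError(f"unknown save mode: {mode}")
    options = {}
    if mode == "overwrite":
        # clear the table with TRUNCATE instead of dropping and re-creating it
        options["truncate"] = "true"
    return mode, options


def save(df, jdbc_options, table, mode="errorifexists"):
    """Write the DataFrame into an existing QuestDB table."""
    mode, extra = writer_settings(mode)
    (
        df.write.format("jdbc")
        .options(**jdbc_options)
        .options(**extra)
        .option("dbtable", table)
        .mode(mode)
        .save()
    )


print(writer_settings("overwrite"))
print(writer_settings("append"))
```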
Our ride might seem bumpy, but we finally have everything working. Our new motto should be "There is a config option for everything!".
To summarize what we have found:
- We can use Spark's JDBC data source to integrate with QuestDB.
- It is recommended to use the dbtable option, even if we use a SQL query to load data.
- Always try to specify partitioning options (partitionColumn, numPartitions, lowerBound, and upperBound) when loading data; the partitions should ideally match the QuestDB partitions.
- Sometimes it makes sense to cache some data in Spark to reduce the number of trips to the database.
- It can be beneficial to push work down into the database, depending on the
task and how much data is involved. It makes sense to make use of QuestDB's
time-series-specific features, such as
SAMPLE BY, instead of trying to rewrite it in Spark.
- Type mappings can be customized via the customSchema option when loading data.
- When writing data into QuestDB, always specify the desired saving mode.
- It generally works better if you create the table upfront and do not let Spark create it, because this way you can add partitioning and a designated timestamp.
- If you select the overwrite saving mode, you should enable the truncate option too, to make sure Spark does not delete the table; this way, partitioning and the designated timestamp will not get lost.
- Type mappings can be customized via the createTableColumnTypes option when saving data.
I mentioned only the config options which are most likely to be tweaked when working with QuestDB; the complete set of options can be found here: Spark data source options.
Overall, everything works, but it would be nice to have a more seamless integration, where partitioning would be taken care of automagically. Some type mappings could use better defaults, too, when saving data into QuestDB. The overwrite saving mode could default to use TRUNCATE.
More seamless integration is not impossible to achieve. If QuestDB provided its own JDBCDialect implementation for Spark, the above nuances could be handled. We should probably consider adding this.
Finally, there is one more thing we have not mentioned yet: data locality. That is because QuestDB currently cannot run as a cluster. However, we are actively working on a solution; check out The Inner Workings of Distributed Databases for more information.
When the time comes, we should ensure that data locality is also considered. Ideally, each Spark node would work on tasks that require partitions loaded from the local (or closest) QuestDB instance.
However, this is not something we should be concerned about at this moment... for now, just enjoy data crunching!