Scaling a trading bot with a time-series database
We recently came across an interesting article by Marc van Duyn on “How to build a trading bot in 5 steps”, which walks through building a trading bot using an open-source Python library called the investing-algorithm-framework.
This framework comes with a suite of nice features such as order execution and tracking, broker and exchange connections, convenience methods for backtesting, and fast deployment options for various cloud providers. Out of the box, the Investing Algorithm Framework persists data to a SQLite database or an in-memory database.
This inspired us to explore how trading bots can scale effectively with a modern time-series database like QuestDB, especially for handling growing volumes of tick data. We got to thinking about how this proof-of-concept bot could be scaled up for an actual, production use case.
So we dove into this specific implementation, tried a few other popular solutions, and came up with a hypothetical exercise in scaling a trading bot with our next-generation time-series database. Why trading bots? As the volume of market data grows and trading strategies become more complex, scaling trading bots to handle high-frequency, multi-market scenarios has become essential.
Testing out Awesome Quant Examples
Before diving into our solution, we wanted to first check out some of the other popular libraries and frameworks linked in the original article. The Awesome Quant GitHub page aggregates a bunch of libraries and packages used for various parts of building a trading bot, from data storage and model building to trading and backtesting.
We didn’t go through each example in detail, but after reviewing some of the more popular packages, we bucketed them into these loose categories:
- Fully managed solutions (Octo Cloud, Blankly Finance): these products were geared towards abstracting away the underlying framework and getting you up and running with a trading bot quickly. A lot of the functionality is wrapped in convenience functions.
- Flexible Python frameworks (QuantConnect): on the other end were more low-level libraries and frameworks that let the user build the model in a modular manner, but required more user input to get to a working product.
- Specialized frameworks dealing with a specific task (backtrader): there were also quite a few libraries and packages solely focused on a specific task like backtesting or portfolio analytics that work with other popular frameworks like pandas or scikit-learn, and leave the implementation of a fully-working trading bot to the user.
Leaving aside the fully-managed solutions, where the focus is on convenience and time to market, a lot of the demos and documentation for the other frameworks are great as quickstart tutorials but do not mention how to productionize and scale beyond personal or paper trading use cases.
In a lot of these examples, data sources are loaded on demand to train the algorithm and then run a backtest. While this is sufficient for one-off computation or for algorithms that run at a slower interval, a more high-frequency use case, or scaling up to multiple markets, requires separating out market data loading so that datasets can be shared by multiple consumers without repeatedly incurring the cost of initialization from an external data source (either an API call or a CSV load).
It might look something like this:
Looks good, but there are limits. To address them and support scalable, production-ready use cases, we turn to QuestDB as a centralized, high-performance data backbone.
Scaling up with QuestDB
With QuestDB, we can have a separate data loading pipeline to ingest market data and then, on the trading bot side, load that data into memory for buy/sell activities:
The advantages here are:
- Ability to handle proprietary datasets beyond what’s readily available via an API (e.g., maybe you’re dealing with energy markets where you need to grab CSV data via SFTP)
- Ability to preprocess the data into a format that is easily consumable by the trading algorithm (e.g., OHLCV format)
- Decoupled data storage that enables fault-tolerant architectures, such as seamlessly switching to Exchange B if Exchange A becomes unavailable
- Being able to visualize data for other use cases (e.g., accounting or compliance)
- Scaling data to multiple strategies with a single source of truth
- Easier historical analysis
- Modular backtesting and deployment of new trading algorithms
The best part is that with an optimized database storing our data, we don’t take a performance hit, and we still enjoy compatibility with popular formats and libraries like pandas, Parquet, and Apache Arrow that work out of the box with these Python frameworks.
That's the benefit of a specialized database. This modular approach decouples data ingestion, storage, and consumption, making it easier to debug, extend, and optimize each component.
With QuestDB, we gain a unified source of truth, compatibility with popular dataframe libraries like pandas and Polars, and the ability to scale seamlessly to handle complex, high-frequency workloads.
Integrating with QuestDB
Getting back to it, how do we separate the market data ingestion from running our algorithm? How do we define and construct the data loading pipeline that feeds into QuestDB?
To help us, we'll consider the original Investing Algorithm Framework. We could choose any of the libraries from the Awesome Quant list above, but we'll use this one as a reference.
The Investing Algorithm Framework is a comprehensive Python library designed for the rapid development of trading bots. It has the goods built-in, like data provisioning, portfolio management, order execution, and backtesting. It's a solid starting point for building a trading bot.
By default, the framework passes OHLCV data as a Polars dataframe in the form of <identifier>:<dataframe>. So as long as we are able to pass in our market data in that format, we can get this working.
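As a quick illustration of that shape (the identifier name and the frame contents below are just placeholders, not framework defaults):

import polars as pl

# Hypothetical OHLCV frame keyed by an identifier, mirroring <identifier>:<dataframe>
df_polars = pl.DataFrame({
    "Datetime": ["2024-01-01T00:00:00", "2024-01-01T00:01:00"],
    "Open": [42000.0, 42010.0],
    "High": [42050.0, 42030.0],
    "Low": [41990.0, 42000.0],
    "Close": [42010.0, 42025.0],
    "Volume": [12.5, 9.8],
})

# What a strategy ultimately receives: identifier -> dataframe
data = {"binance-ohlcv": df_polars}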
Building our data loading pipeline
Let's walk through a few approaches for building this pipeline, starting with data extraction.
Option 1: Use ccxt and convert to Polars
Under the hood, the Investing Algorithm Framework leverages the ccxt library to grab data from various crypto exchanges. We can pick any of the exchanges that support OHLCV data and turn the returned candles into a Polars dataframe. The Investing Algorithm Framework uses Polars out of the box for its speed and efficiency, especially in high-frequency trading scenarios.
For simplicity, we chose BitMEX as it doesn’t require an API key:
import time
import polars as pl
import ccxt  # noqa: E402

bitmex = ccxt.bitmex()

# Parameters
symbol = 'BTC/USD:BTC'
timeframe = '1m'
limit = 100
params = {'partial': False}  # Exclude the partial candle so we only get complete data

while True:
    # Calculate the start time for fetching OHLCV data
    since = bitmex.milliseconds() - limit * 60 * 1000

    # Fetch OHLCV candles from BitMEX
    candles = bitmex.fetch_ohlcv(symbol, timeframe, since, limit, params)

    # Convert to Polars DataFrame
    df_polars = pl.DataFrame(candles, schema=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

    # Convert the epoch-millisecond timestamp to a datetime column
    df_polars = df_polars.with_columns(pl.col("timestamp").cast(pl.Datetime).dt.with_time_unit("ms"))

    print(df_polars)

    # Delay to respect rate limits and prevent API overload
    time.sleep(bitmex.rateLimit / 1000)
We can then use the QuestDB Python Client to send that dataframe to be ingested.
We assume that QuestDB is running on the default localhost:9000. Follow the quickstart guide if you need a refresher!
from questdb.ingress import Sender, TimestampNanos

conf = 'http::addr=localhost:9000;'

with Sender.from_conf(conf) as sender:
    # The QuestDB client ingests pandas DataFrames, so convert from Polars first
    df_pandas = df_polars.to_pandas()
    sender.dataframe(df_pandas, table_name='bitmex', at=TimestampNanos.now())
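If you would rather have QuestDB use the candle's own timestamp as the designated timestamp instead of the ingestion time, the client's dataframe() method can also take a column name for at. A minimal sketch, assuming the converted pandas frame keeps the timestamp column from above:

from questdb.ingress import Sender

conf = 'http::addr=localhost:9000;'

with Sender.from_conf(conf) as sender:
    # 'timestamp' names the DataFrame column to use as the designated timestamp
    sender.dataframe(df_pandas, table_name='bitmex', at='timestamp')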
Option 2: Ingest CSV data
Another popular way you might get OHLCV data is via CSV.
This might be through some market data vendor or even ccxt.
Let’s take the example from the ccxt website for Binance:
# You'll require an API key!
import csv
import ccxt  # noqa: E402


def retry_fetch_ohlcv(exchange, max_retries, symbol, timeframe, since, limit):
    num_retries = 0
    try:
        num_retries += 1
        ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since, limit)
        # print('Fetched', len(ohlcv), symbol, 'candles from', exchange.iso8601(ohlcv[0][0]), 'to', exchange.iso8601(ohlcv[-1][0]))
        return ohlcv
    except Exception:
        if num_retries > max_retries:
            raise  # Exception('Failed to fetch', timeframe, symbol, 'OHLCV in', max_retries, 'attempts')


def scrape_ohlcv(exchange, max_retries, symbol, timeframe, since, limit):
    timeframe_duration_in_seconds = exchange.parse_timeframe(timeframe)
    timeframe_duration_in_ms = timeframe_duration_in_seconds * 1000
    timedelta = limit * timeframe_duration_in_ms
    now = exchange.milliseconds()
    all_ohlcv = []
    fetch_since = since
    while fetch_since < now:
        ohlcv = retry_fetch_ohlcv(exchange, max_retries, symbol, timeframe, fetch_since, limit)
        fetch_since = (ohlcv[-1][0] + 1) if len(ohlcv) else (fetch_since + timedelta)
        all_ohlcv = all_ohlcv + ohlcv
    if len(all_ohlcv):
        print(len(all_ohlcv), 'candles in total from', exchange.iso8601(all_ohlcv[0][0]), 'to', exchange.iso8601(all_ohlcv[-1][0]))
    else:
        print(len(all_ohlcv), 'candles in total from', exchange.iso8601(fetch_since))
    return exchange.filter_by_since_limit(all_ohlcv, since, None, key=0)


def write_to_csv(filename, data):
    with open(filename, mode='w') as output_file:
        csv_writer = csv.writer(output_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        csv_writer.writerows(data)


def scrape_candles_to_csv(filename, exchange_id, max_retries, symbol, timeframe, since, limit):
    # instantiate the exchange by id
    exchange = getattr(ccxt, exchange_id)()
    # authenticate
    exchange = ccxt.binance({
        'apiKey': 'api_key',
        'secret': 'api_secret',
    })
    # convert since from string to milliseconds integer if needed
    if isinstance(since, str):
        since = exchange.parse8601(since)
    # preload all markets from the exchange
    exchange.load_markets()
    # fetch all candles
    ohlcv = scrape_ohlcv(exchange, max_retries, symbol, timeframe, since, limit)
    # save them to csv file
    write_to_csv(filename, ohlcv)
    print('Saved', len(ohlcv), 'candles from', exchange.iso8601(ohlcv[0][0]), 'to', exchange.iso8601(ohlcv[-1][0]), 'to', filename)


# Binance's BTC/USDT candles start on 2017-08-17
scrape_candles_to_csv('binance.csv', 'binance', 3, 'BTC/USDT', '1m', '2017-08-17T00:00:00Z', 100)
Now that we have a CSV file called binance.csv, we can use the REST API to ingest it into QuestDB:
import sys
import requests

csv = {'data': ('binance', open('./binance.csv', 'r'))}
host = 'http://localhost:9000'

try:
    response = requests.post(host + '/imp', files=csv)
    print(response.text)
except requests.exceptions.RequestException as e:
    print(f'Error: {e}', file=sys.stderr)
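The /imp endpoint also accepts query parameters to control the import. As a hedged variation on the request above, assuming we want the table named binance and any previous import replaced (parameter values here are illustrative):

import sys
import requests

csv = {'data': ('binance', open('./binance.csv', 'r'))}
host = 'http://localhost:9000'

# Name the target table explicitly and overwrite any earlier import
params = {'name': 'binance', 'overwrite': 'true'}

try:
    response = requests.post(host + '/imp', params=params, files=csv)
    print(response.text)
except requests.exceptions.RequestException as e:
    print(f'Error: {e}', file=sys.stderr)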
Option 3: CDC via Kafka
Alternatively, you can set up a dedicated ingestion pipeline using a library like Cryptofeed. Using Kafka streams for real-time data updates allows low-latency trading strategies to leverage fresh market insights. Cryptofeed doesn’t have an OHLCV endpoint directly, but you can stream trade data from an exchange and aggregate it into candle data:
from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES
from cryptofeed.exchanges import Binance
from collections import deque
from datetime import datetime, timedelta

# Buffer for storing trade data within a rolling time window
trades_window = deque()


def aggregate_ohlcv(trades):
    prices = [trade['price'] for trade in trades]
    volumes = [trade['amount'] for trade in trades]
    ohlcv = {
        'open': prices[0],
        'high': max(prices),
        'low': min(prices),
        'close': prices[-1],
        'volume': sum(volumes),
    }
    return ohlcv


def trade_callback(feed, symbol, order_id, timestamp, side, amount, price, receipt_timestamp):
    trade = {
        'timestamp': timestamp,
        'price': price,
        'amount': amount,
    }
    trades_window.append(trade)

    # Aggregate OHLCV every minute (you can adjust the timeframe as needed)
    now = datetime.utcnow()
    one_min_ago = now - timedelta(minutes=1)

    # Keep only trades in the last minute
    while trades_window and trades_window[0]['timestamp'] < one_min_ago.timestamp():
        trades_window.popleft()

    if len(trades_window) > 0:
        ohlcv = aggregate_ohlcv(trades_window)
        print(f"OHLCV for the last minute: {ohlcv}")


# Feed handler to subscribe to trade streams
fh = FeedHandler()
fh.add_feed(Binance(channels=[TRADES], symbols=['BTC-USDT'], callbacks={TRADES: trade_callback}))

# Start feed handler to collect data
fh.run()
We can then use the CDC method explained in this blog post to stream that into QuestDB.
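For instance, instead of just printing the aggregated candle, the callback could publish it to a Kafka topic that the downstream pipeline then writes into QuestDB. A minimal sketch, assuming a local broker on localhost:9092, a topic named ohlcv-btc-usdt, and the confluent-kafka client (none of which appear in the original setup):

import json

from confluent_kafka import Producer

# Hypothetical broker and topic names
producer = Producer({'bootstrap.servers': 'localhost:9092'})
topic = 'ohlcv-btc-usdt'


def publish_ohlcv(ohlcv: dict, timestamp: float) -> None:
    # Attach the window timestamp so the downstream consumer can use it
    # as the designated timestamp column in QuestDB
    payload = {'timestamp': timestamp, **ohlcv}
    producer.produce(topic, value=json.dumps(payload).encode('utf-8'))
    producer.poll(0)  # serve delivery callbacks without blocking

Inside trade_callback, you would call publish_ohlcv(ohlcv, now.timestamp()) in place of (or alongside) the print statement, and flush the producer on shutdown.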
Loading data into the trading bot
Now that we have our OHLCV data in QuestDB, loading it into memory for our trading bot to use is just as simple.
Assuming we have the same data structure, we can connect to QuestDB using psycopg2 and turn the SQL result into a Polars DataFrame:
import psycopg2
import polars as pl

# Connect to QuestDB using the PostgreSQL wire protocol
conn = psycopg2.connect(
    dbname='qdb',
    user='admin',
    password='quest',
    host='localhost',
    port='8812'
)

# Create a cursor object to execute SQL queries
cur = conn.cursor()

# Query data from QuestDB
ohlcv_query = """
SELECT timestamp, open, high, low, close, volume FROM binance
"""
cur.execute(ohlcv_query)
rows = cur.fetchall()

# Convert rows (a list of tuples) into a Polars DataFrame
df_polars = pl.DataFrame(
    rows,
    schema=["Datetime", "Open", "High", "Low", "Close", "Volume"],
    orient="row"
)

# Close the cursor and connection
cur.close()
conn.close()
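Since the data now lives in QuestDB, we can also push time-series work into the database itself. Here is a hedged sketch of downsampling 1-minute candles into 5-minute candles with SAMPLE BY before they ever reach Python, assuming the binance table was created with timestamp as its designated timestamp:

import psycopg2

conn = psycopg2.connect(dbname='qdb', user='admin', password='quest', host='localhost', port='8812')
cur = conn.cursor()

# Aggregate 1-minute candles into 5-minute candles inside QuestDB
cur.execute("""
    SELECT
        timestamp,
        first(open) AS open,
        max(high) AS high,
        min(low) AS low,
        last(close) AS close,
        sum(volume) AS volume
    FROM binance
    SAMPLE BY 5m;
""")
five_min_candles = cur.fetchall()

cur.close()
conn.close()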
Given that CCXTOHLCVMarketDataSource simply returns a Polars dataframe, we can take the result of our sample query above (df_polars) and plug it directly into the app.add_market_data_source() function. After a couple more steps, we can run the trading bot as is and scale it up by adding more data sources. In other words, this process can be extended to multiple data sources, enabling a unified pipeline for various markets.
Using CCXTOHLCVMarketDataSource
The Investing Algorithm Framework uses app.add_market_data_source() to register market data sources. By creating a CCXTOHLCVMarketDataSource instance, we can wrap our Polars DataFrame and register it:
from investing_algorithm_framework import CCXTOHLCVMarketDataSource

# Create a market data source for the fetched OHLCV data
ohlcv_market_data = CCXTOHLCVMarketDataSource(
    identifier="binance-ohlcv",
    market="BINANCE",
    symbol="BTC/USDT",
    data=df_polars
)

# Register the market data source with the app
app.add_market_data_source(ohlcv_market_data)
This step ensures that the OHLCV data from QuestDB is now registered and accessible to the trading bot using the identifier "binance-ohlcv".
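To scale this out to several markets, the same registration can be repeated per symbol. A hypothetical sketch, assuming we have already built one Polars DataFrame per symbol from QuestDB as shown above (the dataframes dict below is a placeholder, not part of the framework):

from investing_algorithm_framework import CCXTOHLCVMarketDataSource

# Hypothetical: one DataFrame per symbol, each loaded from QuestDB
dataframes = {
    "BTC/USDT": btc_df,
    "ETH/USDT": eth_df,
}

for symbol, df in dataframes.items():
    identifier = f"binance-ohlcv-{symbol.replace('/', '-').lower()}"
    app.add_market_data_source(
        CCXTOHLCVMarketDataSource(
            identifier=identifier,
            market="BINANCE",
            symbol=symbol,
            data=df,
        )
    )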
Prepare a trading strategy
Once the data is registered, it becomes available to trading strategies. Based on the Investing Algorithm Framework documentation, here is how a trading strategy might access and use the data:
from investing_algorithm_framework import TradingStrategy, Algorithm, TimeUnit


class MyTradingStrategy(TradingStrategy):
    time_unit = TimeUnit.SECOND  # The time unit for strategy execution
    interval = 10  # Run every 10 seconds
    market_data_sources = ["binance-ohlcv"]  # Use the registered market data source

    def apply_strategy(self, algorithm: Algorithm, data: dict):
        # Access the OHLCV data from the data object
        ohlcv_data = data["binance-ohlcv"]
        print("Received OHLCV data:", ohlcv_data)
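To make apply_strategy do something more useful than printing, here is a hedged sketch of a simple moving-average crossover check on the received Polars DataFrame. It only computes a signal; wiring it to the framework's order-management calls is left out, and the Close column name simply matches the schema we used when loading from QuestDB:

import polars as pl


def crossover_signal(ohlcv_data: pl.DataFrame, fast: int = 10, slow: int = 30) -> str:
    # Compute fast and slow simple moving averages over the close price
    closes = ohlcv_data.get_column("Close")
    fast_ma = closes.tail(fast).mean()
    slow_ma = closes.tail(slow).mean()

    if fast_ma is None or slow_ma is None:
        return "HOLD"  # not enough data yet
    if fast_ma > slow_ma:
        return "BUY"
    if fast_ma < slow_ma:
        return "SELL"
    return "HOLD"

Inside apply_strategy, you could then call crossover_signal(ohlcv_data) and route the resulting signal into your order logic.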
After that, we register the strategy with the app:
app.add_trading_strategy(MyTradingStrategy())
And run it!
app.run()
Wrapping up
In this tutorial, we went through a hypothetical exercise of scaling up a demo trading bot by separating it into two services: one that loads market data into a database, and the trading bot itself, which uses that data to make decisions. While separating responsibilities may initially seem complex, once we start dealing with more markets, or with market data that spans more than what can be held in memory, this architecture can handle the load and scale to new use cases accordingly.
QuestDB serves as the backbone for modern trading systems, offering high-speed ingestion, time-series optimization, and compatibility with Python libraries for seamless integration.
Whether you’re looking for an open-source solution for production workloads or need enterprise-grade capabilities, QuestDB is designed to deliver both speed and flexibility.
To learn more, check out our GitHub repository, try the demo, or join the conversation on Slack.