// nautilus_backtest/node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a [`BacktestNode`] that orchestrates catalog-driven backtests.
17
18use std::iter::Peekable;
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23    data::{
24        Bar, Data, HasTsInit, IndexPriceUpdate, InstrumentClose, InstrumentStatus, MarkPriceUpdate,
25        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
26    },
27    enums::{BookType, OtoTriggerMode},
28    identifiers::{InstrumentId, Venue},
29    instruments::Instrument,
30    types::Money,
31};
32use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
33
34use crate::{
35    config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType, SimulatedVenueConfig},
36    engine::BacktestEngine,
37    result::BacktestResult,
38};
39
40/// Orchestrates catalog-driven backtests from run configurations.
41///
42/// `BacktestNode` connects the [`ParquetDataCatalog`] with [`BacktestEngine`] to load
43/// historical data and run backtests. Supports both oneshot and streaming modes.
44#[derive(Debug)]
45#[cfg_attr(
46    feature = "python",
47    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
48)]
49#[cfg_attr(
50    feature = "python",
51    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.backtest")
52)]
53pub struct BacktestNode {
54    configs: Vec<BacktestRunConfig>,
55    engines: AHashMap<String, BacktestEngine>,
56}
57
58impl BacktestNode {
59    /// Creates a new [`BacktestNode`] instance.
60    ///
61    /// Validates that configs are non-empty and internally consistent:
62    /// - All data config instrument venues must have a matching venue config.
63    /// - L2/L3 book types require order book data in the data configs.
64    /// - Data config time ranges must be valid (start <= end).
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if `configs` is empty or validation fails.
69    pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
70        anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
71        validate_configs(&configs)?;
72        Ok(Self {
73            configs,
74            engines: AHashMap::new(),
75        })
76    }
77
78    /// Returns the run configurations.
79    #[must_use]
80    pub fn configs(&self) -> &[BacktestRunConfig] {
81        &self.configs
82    }
83
84    /// Builds backtest engines from the run configurations.
85    ///
86    /// For each config, creates a [`BacktestEngine`], adds venues, and loads
87    /// instruments from the catalog.
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if engine creation, venue setup, or instrument loading fails.
92    pub fn build(&mut self) -> anyhow::Result<()> {
93        for config in &self.configs {
94            if self.engines.contains_key(config.id()) {
95                continue;
96            }
97
98            let engine_config = config.engine().clone();
99            let mut engine = BacktestEngine::new(engine_config)?;
100
101            for venue_config in config.venues() {
102                let starting_balances: Vec<Money> = venue_config
103                    .starting_balances()
104                    .iter()
105                    .map(|s| s.parse::<Money>())
106                    .collect::<Result<Vec<_>, _>>()
107                    .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;
108
109                let default_leverage = venue_config.default_leverage();
110                let leverages = venue_config.leverages().cloned().unwrap_or_default();
111                let margin_model = venue_config.margin_model().cloned();
112                let modules = venue_config
113                    .modules()
114                    .iter()
115                    .cloned()
116                    .map(Into::into)
117                    .collect();
118                let fill_model = venue_config.fill_model().cloned().unwrap_or_default();
119                let fee_model = venue_config.fee_model().cloned().unwrap_or_default();
120                let latency_model = venue_config.latency_model().cloned().map(Into::into);
121                let sim_config = SimulatedVenueConfig::builder()
122                    .venue(Venue::from(venue_config.name().as_str()))
123                    .oms_type(venue_config.oms_type())
124                    .account_type(venue_config.account_type())
125                    .book_type(venue_config.book_type())
126                    .starting_balances(starting_balances)
127                    .maybe_base_currency(venue_config.base_currency())
128                    .default_leverage(default_leverage)
129                    .leverages(leverages)
130                    .maybe_margin_model(margin_model)
131                    .modules(modules)
132                    .fill_model(fill_model)
133                    .fee_model(fee_model)
134                    .maybe_latency_model(latency_model)
135                    .routing(venue_config.routing())
136                    .reject_stop_orders(venue_config.reject_stop_orders())
137                    .support_gtd_orders(venue_config.support_gtd_orders())
138                    .support_contingent_orders(venue_config.support_contingent_orders())
139                    .use_position_ids(venue_config.use_position_ids())
140                    .use_random_ids(venue_config.use_random_ids())
141                    .use_reduce_only(venue_config.use_reduce_only())
142                    .use_market_order_acks(venue_config.use_market_order_acks())
143                    .bar_execution(venue_config.bar_execution())
144                    .bar_adaptive_high_low_ordering(venue_config.bar_adaptive_high_low_ordering())
145                    .trade_execution(venue_config.trade_execution())
146                    .liquidity_consumption(venue_config.liquidity_consumption())
147                    .allow_cash_borrowing(venue_config.allow_cash_borrowing())
148                    .frozen_account(venue_config.frozen_account())
149                    .queue_position(venue_config.queue_position())
150                    .oto_full_trigger(venue_config.oto_trigger_mode() == OtoTriggerMode::Full)
151                    .price_protection_points(venue_config.price_protection_points())
152                    .build();
153                engine.add_venue(sim_config)?;
154            }
155
156            for data_config in config.data() {
157                let catalog = create_catalog(data_config)?;
158                let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
159                let filter: Option<Vec<String>> = if instr_ids.is_empty() {
160                    None
161                } else {
162                    Some(instr_ids.iter().map(ToString::to_string).collect())
163                };
164
165                let instruments = catalog.query_instruments(filter.as_deref())?;
166
167                if !instr_ids.is_empty() && instruments.is_empty() {
168                    let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
169                    anyhow::bail!(
170                        "No instruments found in catalog for requested IDs: [{}]",
171                        ids.join(", ")
172                    );
173                }
174
175                for instrument in instruments {
176                    engine.add_instrument(&instrument)?;
177                }
178            }
179
180            for venue_config in config.venues() {
181                let Some(settlement_prices) = venue_config.settlement_prices() else {
182                    continue;
183                };
184                let venue = Venue::from(venue_config.name().as_str());
185
186                for (instrument_id, raw_price) in settlement_prices {
187                    let price = {
188                        let cache = engine.kernel().cache.borrow();
189                        let instrument = cache.instrument(instrument_id).ok_or_else(|| {
190                            anyhow::anyhow!(
191                                "No instrument found for settlement price configuration: {instrument_id}"
192                            )
193                        })?;
194                        instrument.make_price(*raw_price)
195                    };
196                    engine.set_settlement_price(venue, *instrument_id, price)?;
197                }
198            }
199
200            self.engines.insert(config.id().to_string(), engine);
201        }
202
203        Ok(())
204    }
205
206    /// Returns a mutable reference to the engine for the given run config ID.
207    #[must_use]
208    pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
209        self.engines.get_mut(id)
210    }
211
212    /// Returns a reference to the engine for the given run config ID.
213    #[must_use]
214    pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
215        self.engines.get(id)
216    }
217
218    /// Returns all created backtest engines.
219    #[must_use]
220    pub fn get_engines(&self) -> Vec<&BacktestEngine> {
221        self.engines.values().collect()
222    }
223
224    /// Runs all configured backtests and returns results.
225    ///
226    /// Automatically calls [`build()`](Self::build) if engines have not been created yet.
227    /// For each run config, loads data from the catalog and runs the engine.
228    /// Supports both oneshot (`chunk_size = None`) and streaming modes.
229    ///
230    /// # Errors
231    ///
232    /// Returns an error if building, data loading, or engine execution fails.
233    pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
234        // Auto-build if not already done
235        if self.engines.is_empty() {
236            self.build()?;
237        }
238
239        let mut results = Vec::new();
240
241        for config in &self.configs {
242            let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
243                anyhow::anyhow!(
244                    "Engine not found for config '{}'. Call build() first.",
245                    config.id()
246                )
247            })?;
248
249            match config.chunk_size() {
250                None => run_oneshot(engine, config)?,
251                Some(chunk_size) => {
252                    anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
253                    run_streaming(engine, config, chunk_size)?;
254                }
255            }
256
257            results.push(engine.get_result());
258
259            if config.dispose_on_completion() {
260                engine.dispose();
261            } else {
262                engine.clear_data();
263            }
264        }
265
266        Ok(results)
267    }
268
269    /// Creates a [`ParquetDataCatalog`] from a data config.
270    ///
271    /// # Errors
272    ///
273    /// Returns an error if the catalog cannot be created from the URI.
274    pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
275        create_catalog(config)
276    }
277
278    /// Loads data from the catalog for a specific data config.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if catalog creation or data querying fails.
283    pub fn load_data_config(
284        config: &BacktestDataConfig,
285        start: Option<UnixNanos>,
286        end: Option<UnixNanos>,
287    ) -> anyhow::Result<Vec<Data>> {
288        load_data(config, start, end)
289    }
290
291    /// Disposes all engines and releases resources.
292    pub fn dispose(&mut self) {
293        for engine in self.engines.values_mut() {
294            engine.dispose();
295        }
296        self.engines.clear();
297    }
298}
299
300fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
301    // Kernel initialization sets a thread-local MessageBus that can only be
302    // initialized once per thread, so multiple engines cannot coexist
303    anyhow::ensure!(
304        configs.len() <= 1,
305        "Only one run config per BacktestNode is supported \
306         (kernel MessageBus is a thread-local singleton)"
307    );
308
309    let mut seen_ids = AHashSet::new();
310
311    for config in configs {
312        anyhow::ensure!(
313            seen_ids.insert(config.id()),
314            "Duplicate run config ID '{}'",
315            config.id()
316        );
317
318        let venue_names: Vec<String> = config
319            .venues()
320            .iter()
321            .map(|v| v.name().to_string())
322            .collect();
323
324        for data_config in config.data() {
325            if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
326                anyhow::ensure!(
327                    start <= end,
328                    "Data config start_time ({start}) must be <= end_time ({end})"
329                );
330            }
331
332            for instrument_id in data_config.get_instrument_ids()? {
333                let venue = instrument_id.venue.to_string();
334                anyhow::ensure!(
335                    venue_names.contains(&venue),
336                    "No venue config found for venue '{venue}' (required by instrument {instrument_id})"
337                );
338            }
339        }
340
341        for venue_config in config.venues() {
342            let needs_book_data = matches!(
343                venue_config.book_type(),
344                BookType::L2_MBP | BookType::L3_MBO
345            );
346
347            if needs_book_data {
348                let venue_name = venue_config.name().to_string();
349                let has_book_data = config.data().iter().any(|dc| {
350                    let is_book_type = matches!(
351                        dc.data_type(),
352                        NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
353                    );
354
355                    if !is_book_type {
356                        return false;
357                    }
358
359                    // Unfiltered config (no instrument filter) covers all venues
360                    let ids = dc.get_instrument_ids().unwrap_or_default();
361                    ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
362                });
363                anyhow::ensure!(
364                    has_book_data,
365                    "Venue '{venue_name}' has book_type {:?} but no order book data configured",
366                    venue_config.book_type()
367                );
368            }
369        }
370    }
371    Ok(())
372}
373
374fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
375    for data_config in config.data() {
376        let data = load_data(data_config, config.start(), config.end())?;
377        if data.is_empty() {
378            log::warn!("No data found for config: {:?}", data_config.data_type());
379            continue;
380        }
381        engine.add_data(data, data_config.client_id(), false, false)?;
382    }
383
384    engine.sort_data();
385    engine.run(
386        config.start(),
387        config.end(),
388        Some(config.id().to_string()),
389        false,
390    )
391}
392
393fn run_streaming(
394    engine: &mut BacktestEngine,
395    config: &BacktestRunConfig,
396    chunk_size: usize,
397) -> anyhow::Result<()> {
398    let data_configs = config.data();
399
400    if data_configs.len() == 1 {
401        // Single config: stream directly from catalog iterator without
402        // materializing the full dataset, bounded by chunk_size
403        let data_config = &data_configs[0];
404        let mut catalog = create_catalog(data_config)?;
405        let result = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
406        stream_chunks(engine, config, result.peekable(), chunk_size)?;
407    } else {
408        // Multiple configs require loading all data to merge-sort across types
409        let all_data = load_and_merge_data(config)?;
410        stream_chunks(engine, config, all_data.into_iter().peekable(), chunk_size)?;
411    }
412
413    Ok(())
414}
415
416// Feeds data from an iterator to the engine in timestamp-aligned chunks.
417// Each chunk contains up to `chunk_size` events, extended to include all
418// events sharing the boundary timestamp so timers flush correctly.
419fn stream_chunks<I: Iterator<Item = Data>>(
420    engine: &mut BacktestEngine,
421    config: &BacktestRunConfig,
422    mut iter: Peekable<I>,
423    chunk_size: usize,
424) -> anyhow::Result<()> {
425    if iter.peek().is_none() {
426        engine.end();
427        return Ok(());
428    }
429
430    let mut next_start = config.start();
431
432    loop {
433        let chunk = take_aligned_chunk(&mut iter, chunk_size);
434        if chunk.is_empty() {
435            break;
436        }
437
438        let is_last = iter.peek().is_none();
439        let end = if is_last {
440            config.end()
441        } else {
442            chunk.last().map(HasTsInit::ts_init)
443        };
444
445        engine.add_data(chunk, None, false, true)?;
446        engine.run(next_start, end, Some(config.id().to_string()), true)?;
447        engine.clear_data();
448
449        // A shutdown request during the chunk already triggered end() inside
450        // engine.run(); stop loading further chunks so later data is not processed
451        if engine.kernel().is_shutdown_requested() {
452            return Ok(());
453        }
454
455        // Carry forward the end timestamp so the next chunk's run_impl
456        // sets clocks contiguously and processes gap timers correctly
457        next_start = end;
458    }
459
460    engine.end();
461    Ok(())
462}
463
464// Takes up to `chunk_size` items, then extends to include all remaining
465// items sharing the boundary timestamp to avoid splitting same-ts events.
466fn take_aligned_chunk<I: Iterator<Item = Data>>(
467    iter: &mut Peekable<I>,
468    chunk_size: usize,
469) -> Vec<Data> {
470    let mut chunk = Vec::with_capacity(chunk_size);
471
472    for _ in 0..chunk_size {
473        match iter.next() {
474            Some(item) => chunk.push(item),
475            None => return chunk,
476        }
477    }
478
479    if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
480        while iter.peek().is_some_and(|d| d.ts_init() == boundary_ts) {
481            chunk.push(iter.next().unwrap());
482        }
483    }
484
485    chunk
486}
487
488fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
489    let mut all_data = Vec::new();
490
491    for data_config in config.data() {
492        let data = load_data(data_config, config.start(), config.end())?;
493        if data.is_empty() {
494            log::warn!("No data found for config: {:?}", data_config.data_type());
495            continue;
496        }
497        all_data.extend(data);
498    }
499    all_data.sort_by_key(HasTsInit::ts_init);
500    Ok(all_data)
501}
502
503fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
504    let uri = match config.catalog_fs_protocol() {
505        Some(protocol) => format!("{protocol}://{}", config.catalog_path()),
506        None => config.catalog_path().to_string(),
507    };
508    let storage_options = config
509        .catalog_fs_rust_storage_options()
510        .cloned()
511        .or_else(|| config.catalog_fs_storage_options().cloned());
512    ParquetDataCatalog::from_uri(&uri, storage_options, None, None, None)
513}
514
515fn load_data(
516    config: &BacktestDataConfig,
517    run_start: Option<UnixNanos>,
518    run_end: Option<UnixNanos>,
519) -> anyhow::Result<Vec<Data>> {
520    let mut catalog = create_catalog(config)?;
521    let result = dispatch_query(&mut catalog, config, run_start, run_end)?;
522    Ok(result.collect())
523}
524
525fn dispatch_query(
526    catalog: &mut ParquetDataCatalog,
527    config: &BacktestDataConfig,
528    run_start: Option<UnixNanos>,
529    run_end: Option<UnixNanos>,
530) -> anyhow::Result<QueryResult> {
531    catalog.reset_session();
532
533    let identifiers = config.query_identifiers();
534    let start = max_opt(config.start_time(), run_start);
535    let end = min_opt(config.end_time(), run_end);
536    let filter = config.filter_expr();
537    let optimize = config.optimize_file_loading();
538
539    match config.data_type() {
540        NautilusDataType::QuoteTick => {
541            catalog.query::<QuoteTick>(identifiers, start, end, filter, None, optimize)
542        }
543        NautilusDataType::TradeTick => {
544            catalog.query::<TradeTick>(identifiers, start, end, filter, None, optimize)
545        }
546        NautilusDataType::Bar => {
547            catalog.query::<Bar>(identifiers, start, end, filter, None, optimize)
548        }
549        NautilusDataType::OrderBookDelta => {
550            catalog.query::<OrderBookDelta>(identifiers, start, end, filter, None, optimize)
551        }
552        NautilusDataType::OrderBookDepth10 => {
553            catalog.query::<OrderBookDepth10>(identifiers, start, end, filter, None, optimize)
554        }
555        NautilusDataType::MarkPriceUpdate => {
556            catalog.query::<MarkPriceUpdate>(identifiers, start, end, filter, None, optimize)
557        }
558        NautilusDataType::IndexPriceUpdate => {
559            catalog.query::<IndexPriceUpdate>(identifiers, start, end, filter, None, optimize)
560        }
561        NautilusDataType::InstrumentStatus => {
562            catalog.query::<InstrumentStatus>(identifiers, start, end, filter, None, optimize)
563        }
564        NautilusDataType::InstrumentClose => {
565            catalog.query::<InstrumentClose>(identifiers, start, end, filter, None, optimize)
566        }
567    }
568}
569
570fn max_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
571    match (a, b) {
572        (Some(a), Some(b)) => Some(a.max(b)),
573        (Some(a), None) => Some(a),
574        (None, Some(b)) => Some(b),
575        (None, None) => None,
576    }
577}
578
579fn min_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
580    match (a, b) {
581        (Some(a), Some(b)) => Some(a.min(b)),
582        (Some(a), None) => Some(a),
583        (None, Some(b)) => Some(b),
584        (None, None) => None,
585    }
586}