1use std::iter::Peekable;
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23 data::{
24 Bar, Data, HasTsInit, IndexPriceUpdate, InstrumentClose, InstrumentStatus, MarkPriceUpdate,
25 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
26 },
27 enums::{BookType, OtoTriggerMode},
28 identifiers::{InstrumentId, Venue},
29 instruments::Instrument,
30 types::Money,
31};
32use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
33
34use crate::{
35 config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType, SimulatedVenueConfig},
36 engine::BacktestEngine,
37 result::BacktestResult,
38};
39
40#[derive(Debug)]
45#[cfg_attr(
46 feature = "python",
47 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
48)]
49#[cfg_attr(
50 feature = "python",
51 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.backtest")
52)]
53pub struct BacktestNode {
54 configs: Vec<BacktestRunConfig>,
55 engines: AHashMap<String, BacktestEngine>,
56}
57
58impl BacktestNode {
59 pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
70 anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
71 validate_configs(&configs)?;
72 Ok(Self {
73 configs,
74 engines: AHashMap::new(),
75 })
76 }
77
78 #[must_use]
80 pub fn configs(&self) -> &[BacktestRunConfig] {
81 &self.configs
82 }
83
84 pub fn build(&mut self) -> anyhow::Result<()> {
93 for config in &self.configs {
94 if self.engines.contains_key(config.id()) {
95 continue;
96 }
97
98 let engine_config = config.engine().clone();
99 let mut engine = BacktestEngine::new(engine_config)?;
100
101 for venue_config in config.venues() {
102 let starting_balances: Vec<Money> = venue_config
103 .starting_balances()
104 .iter()
105 .map(|s| s.parse::<Money>())
106 .collect::<Result<Vec<_>, _>>()
107 .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;
108
109 let default_leverage = venue_config.default_leverage();
110 let leverages = venue_config.leverages().cloned().unwrap_or_default();
111 let margin_model = venue_config.margin_model().cloned();
112 let modules = venue_config
113 .modules()
114 .iter()
115 .cloned()
116 .map(Into::into)
117 .collect();
118 let fill_model = venue_config.fill_model().cloned().unwrap_or_default();
119 let fee_model = venue_config.fee_model().cloned().unwrap_or_default();
120 let latency_model = venue_config.latency_model().cloned().map(Into::into);
121 let sim_config = SimulatedVenueConfig::builder()
122 .venue(Venue::from(venue_config.name().as_str()))
123 .oms_type(venue_config.oms_type())
124 .account_type(venue_config.account_type())
125 .book_type(venue_config.book_type())
126 .starting_balances(starting_balances)
127 .maybe_base_currency(venue_config.base_currency())
128 .default_leverage(default_leverage)
129 .leverages(leverages)
130 .maybe_margin_model(margin_model)
131 .modules(modules)
132 .fill_model(fill_model)
133 .fee_model(fee_model)
134 .maybe_latency_model(latency_model)
135 .routing(venue_config.routing())
136 .reject_stop_orders(venue_config.reject_stop_orders())
137 .support_gtd_orders(venue_config.support_gtd_orders())
138 .support_contingent_orders(venue_config.support_contingent_orders())
139 .use_position_ids(venue_config.use_position_ids())
140 .use_random_ids(venue_config.use_random_ids())
141 .use_reduce_only(venue_config.use_reduce_only())
142 .use_market_order_acks(venue_config.use_market_order_acks())
143 .bar_execution(venue_config.bar_execution())
144 .bar_adaptive_high_low_ordering(venue_config.bar_adaptive_high_low_ordering())
145 .trade_execution(venue_config.trade_execution())
146 .liquidity_consumption(venue_config.liquidity_consumption())
147 .allow_cash_borrowing(venue_config.allow_cash_borrowing())
148 .frozen_account(venue_config.frozen_account())
149 .queue_position(venue_config.queue_position())
150 .oto_full_trigger(venue_config.oto_trigger_mode() == OtoTriggerMode::Full)
151 .price_protection_points(venue_config.price_protection_points())
152 .build();
153 engine.add_venue(sim_config)?;
154 }
155
156 for data_config in config.data() {
157 let catalog = create_catalog(data_config)?;
158 let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
159 let filter: Option<Vec<String>> = if instr_ids.is_empty() {
160 None
161 } else {
162 Some(instr_ids.iter().map(ToString::to_string).collect())
163 };
164
165 let instruments = catalog.query_instruments(filter.as_deref())?;
166
167 if !instr_ids.is_empty() && instruments.is_empty() {
168 let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
169 anyhow::bail!(
170 "No instruments found in catalog for requested IDs: [{}]",
171 ids.join(", ")
172 );
173 }
174
175 for instrument in instruments {
176 engine.add_instrument(&instrument)?;
177 }
178 }
179
180 for venue_config in config.venues() {
181 let Some(settlement_prices) = venue_config.settlement_prices() else {
182 continue;
183 };
184 let venue = Venue::from(venue_config.name().as_str());
185
186 for (instrument_id, raw_price) in settlement_prices {
187 let price = {
188 let cache = engine.kernel().cache.borrow();
189 let instrument = cache.instrument(instrument_id).ok_or_else(|| {
190 anyhow::anyhow!(
191 "No instrument found for settlement price configuration: {instrument_id}"
192 )
193 })?;
194 instrument.make_price(*raw_price)
195 };
196 engine.set_settlement_price(venue, *instrument_id, price)?;
197 }
198 }
199
200 self.engines.insert(config.id().to_string(), engine);
201 }
202
203 Ok(())
204 }
205
206 #[must_use]
208 pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
209 self.engines.get_mut(id)
210 }
211
212 #[must_use]
214 pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
215 self.engines.get(id)
216 }
217
218 #[must_use]
220 pub fn get_engines(&self) -> Vec<&BacktestEngine> {
221 self.engines.values().collect()
222 }
223
224 pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
234 if self.engines.is_empty() {
236 self.build()?;
237 }
238
239 let mut results = Vec::new();
240
241 for config in &self.configs {
242 let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
243 anyhow::anyhow!(
244 "Engine not found for config '{}'. Call build() first.",
245 config.id()
246 )
247 })?;
248
249 match config.chunk_size() {
250 None => run_oneshot(engine, config)?,
251 Some(chunk_size) => {
252 anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
253 run_streaming(engine, config, chunk_size)?;
254 }
255 }
256
257 results.push(engine.get_result());
258
259 if config.dispose_on_completion() {
260 engine.dispose();
261 } else {
262 engine.clear_data();
263 }
264 }
265
266 Ok(results)
267 }
268
269 pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
275 create_catalog(config)
276 }
277
278 pub fn load_data_config(
284 config: &BacktestDataConfig,
285 start: Option<UnixNanos>,
286 end: Option<UnixNanos>,
287 ) -> anyhow::Result<Vec<Data>> {
288 load_data(config, start, end)
289 }
290
291 pub fn dispose(&mut self) {
293 for engine in self.engines.values_mut() {
294 engine.dispose();
295 }
296 self.engines.clear();
297 }
298}
299
300fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
301 anyhow::ensure!(
304 configs.len() <= 1,
305 "Only one run config per BacktestNode is supported \
306 (kernel MessageBus is a thread-local singleton)"
307 );
308
309 let mut seen_ids = AHashSet::new();
310
311 for config in configs {
312 anyhow::ensure!(
313 seen_ids.insert(config.id()),
314 "Duplicate run config ID '{}'",
315 config.id()
316 );
317
318 let venue_names: Vec<String> = config
319 .venues()
320 .iter()
321 .map(|v| v.name().to_string())
322 .collect();
323
324 for data_config in config.data() {
325 if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
326 anyhow::ensure!(
327 start <= end,
328 "Data config start_time ({start}) must be <= end_time ({end})"
329 );
330 }
331
332 for instrument_id in data_config.get_instrument_ids()? {
333 let venue = instrument_id.venue.to_string();
334 anyhow::ensure!(
335 venue_names.contains(&venue),
336 "No venue config found for venue '{venue}' (required by instrument {instrument_id})"
337 );
338 }
339 }
340
341 for venue_config in config.venues() {
342 let needs_book_data = matches!(
343 venue_config.book_type(),
344 BookType::L2_MBP | BookType::L3_MBO
345 );
346
347 if needs_book_data {
348 let venue_name = venue_config.name().to_string();
349 let has_book_data = config.data().iter().any(|dc| {
350 let is_book_type = matches!(
351 dc.data_type(),
352 NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
353 );
354
355 if !is_book_type {
356 return false;
357 }
358
359 let ids = dc.get_instrument_ids().unwrap_or_default();
361 ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
362 });
363 anyhow::ensure!(
364 has_book_data,
365 "Venue '{venue_name}' has book_type {:?} but no order book data configured",
366 venue_config.book_type()
367 );
368 }
369 }
370 }
371 Ok(())
372}
373
374fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
375 for data_config in config.data() {
376 let data = load_data(data_config, config.start(), config.end())?;
377 if data.is_empty() {
378 log::warn!("No data found for config: {:?}", data_config.data_type());
379 continue;
380 }
381 engine.add_data(data, data_config.client_id(), false, false)?;
382 }
383
384 engine.sort_data();
385 engine.run(
386 config.start(),
387 config.end(),
388 Some(config.id().to_string()),
389 false,
390 )
391}
392
393fn run_streaming(
394 engine: &mut BacktestEngine,
395 config: &BacktestRunConfig,
396 chunk_size: usize,
397) -> anyhow::Result<()> {
398 let data_configs = config.data();
399
400 if data_configs.len() == 1 {
401 let data_config = &data_configs[0];
404 let mut catalog = create_catalog(data_config)?;
405 let result = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
406 stream_chunks(engine, config, result.peekable(), chunk_size)?;
407 } else {
408 let all_data = load_and_merge_data(config)?;
410 stream_chunks(engine, config, all_data.into_iter().peekable(), chunk_size)?;
411 }
412
413 Ok(())
414}
415
416fn stream_chunks<I: Iterator<Item = Data>>(
420 engine: &mut BacktestEngine,
421 config: &BacktestRunConfig,
422 mut iter: Peekable<I>,
423 chunk_size: usize,
424) -> anyhow::Result<()> {
425 if iter.peek().is_none() {
426 engine.end();
427 return Ok(());
428 }
429
430 let mut next_start = config.start();
431
432 loop {
433 let chunk = take_aligned_chunk(&mut iter, chunk_size);
434 if chunk.is_empty() {
435 break;
436 }
437
438 let is_last = iter.peek().is_none();
439 let end = if is_last {
440 config.end()
441 } else {
442 chunk.last().map(HasTsInit::ts_init)
443 };
444
445 engine.add_data(chunk, None, false, true)?;
446 engine.run(next_start, end, Some(config.id().to_string()), true)?;
447 engine.clear_data();
448
449 if engine.kernel().is_shutdown_requested() {
452 return Ok(());
453 }
454
455 next_start = end;
458 }
459
460 engine.end();
461 Ok(())
462}
463
464fn take_aligned_chunk<I: Iterator<Item = Data>>(
467 iter: &mut Peekable<I>,
468 chunk_size: usize,
469) -> Vec<Data> {
470 let mut chunk = Vec::with_capacity(chunk_size);
471
472 for _ in 0..chunk_size {
473 match iter.next() {
474 Some(item) => chunk.push(item),
475 None => return chunk,
476 }
477 }
478
479 if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
480 while iter.peek().is_some_and(|d| d.ts_init() == boundary_ts) {
481 chunk.push(iter.next().unwrap());
482 }
483 }
484
485 chunk
486}
487
488fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
489 let mut all_data = Vec::new();
490
491 for data_config in config.data() {
492 let data = load_data(data_config, config.start(), config.end())?;
493 if data.is_empty() {
494 log::warn!("No data found for config: {:?}", data_config.data_type());
495 continue;
496 }
497 all_data.extend(data);
498 }
499 all_data.sort_by_key(HasTsInit::ts_init);
500 Ok(all_data)
501}
502
503fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
504 let uri = match config.catalog_fs_protocol() {
505 Some(protocol) => format!("{protocol}://{}", config.catalog_path()),
506 None => config.catalog_path().to_string(),
507 };
508 let storage_options = config
509 .catalog_fs_rust_storage_options()
510 .cloned()
511 .or_else(|| config.catalog_fs_storage_options().cloned());
512 ParquetDataCatalog::from_uri(&uri, storage_options, None, None, None)
513}
514
515fn load_data(
516 config: &BacktestDataConfig,
517 run_start: Option<UnixNanos>,
518 run_end: Option<UnixNanos>,
519) -> anyhow::Result<Vec<Data>> {
520 let mut catalog = create_catalog(config)?;
521 let result = dispatch_query(&mut catalog, config, run_start, run_end)?;
522 Ok(result.collect())
523}
524
525fn dispatch_query(
526 catalog: &mut ParquetDataCatalog,
527 config: &BacktestDataConfig,
528 run_start: Option<UnixNanos>,
529 run_end: Option<UnixNanos>,
530) -> anyhow::Result<QueryResult> {
531 catalog.reset_session();
532
533 let identifiers = config.query_identifiers();
534 let start = max_opt(config.start_time(), run_start);
535 let end = min_opt(config.end_time(), run_end);
536 let filter = config.filter_expr();
537 let optimize = config.optimize_file_loading();
538
539 match config.data_type() {
540 NautilusDataType::QuoteTick => {
541 catalog.query::<QuoteTick>(identifiers, start, end, filter, None, optimize)
542 }
543 NautilusDataType::TradeTick => {
544 catalog.query::<TradeTick>(identifiers, start, end, filter, None, optimize)
545 }
546 NautilusDataType::Bar => {
547 catalog.query::<Bar>(identifiers, start, end, filter, None, optimize)
548 }
549 NautilusDataType::OrderBookDelta => {
550 catalog.query::<OrderBookDelta>(identifiers, start, end, filter, None, optimize)
551 }
552 NautilusDataType::OrderBookDepth10 => {
553 catalog.query::<OrderBookDepth10>(identifiers, start, end, filter, None, optimize)
554 }
555 NautilusDataType::MarkPriceUpdate => {
556 catalog.query::<MarkPriceUpdate>(identifiers, start, end, filter, None, optimize)
557 }
558 NautilusDataType::IndexPriceUpdate => {
559 catalog.query::<IndexPriceUpdate>(identifiers, start, end, filter, None, optimize)
560 }
561 NautilusDataType::InstrumentStatus => {
562 catalog.query::<InstrumentStatus>(identifiers, start, end, filter, None, optimize)
563 }
564 NautilusDataType::InstrumentClose => {
565 catalog.query::<InstrumentClose>(identifiers, start, end, filter, None, optimize)
566 }
567 }
568}
569
570fn max_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
571 match (a, b) {
572 (Some(a), Some(b)) => Some(a.max(b)),
573 (Some(a), None) => Some(a),
574 (None, Some(b)) => Some(b),
575 (None, None) => None,
576 }
577}
578
579fn min_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
580 match (a, b) {
581 (Some(a), Some(b)) => Some(a.min(b)),
582 (Some(a), None) => Some(a),
583 (None, Some(b)) => Some(b),
584 (None, None) => None,
585 }
586}