1use std::{any::Any, cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 actor::DataActor,
24 cache::Cache,
25 clock::{Clock, TestClock},
26 component::Component,
27 enums::LogColor,
28 log_info,
29 logging::{
30 logging_clock_set_realtime_mode, logging_clock_set_static_mode,
31 logging_clock_set_static_time,
32 },
33 runner::{
34 SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
35 drain_data_cmd_queue, drain_trading_cmd_queue, replace_data_cmd_sender,
36 replace_exec_cmd_sender, trading_cmd_queue_is_empty,
37 },
38};
39use nautilus_core::{
40 UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, string::formatting::Separable,
41};
42use nautilus_data::client::DataClientAdapter;
43use nautilus_execution::models::fill::FillModelAny;
44use nautilus_model::{
45 accounts::{Account, AccountAny},
46 data::{Data, HasTsInit},
47 enums::{AccountType, AggregationSource, BookType},
48 identifiers::{AccountId, ClientId, InstrumentId, TraderId, Venue},
49 instruments::{Instrument, InstrumentAny},
50 orders::Order,
51 position::Position,
52 types::Price,
53};
54use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
55use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
56
57use crate::{
58 accumulator::TimeEventAccumulator,
59 config::{BacktestEngineConfig, SimulatedVenueConfig},
60 data_client::BacktestDataClient,
61 data_iterator::BacktestDataIterator,
62 exchange::SimulatedExchange,
63 execution_client::BacktestExecutionClient,
64 result::BacktestResult,
65};
66
/// Backtest engine state: a [`NautilusKernel`] together with the simulated
/// venues, the queued historical data, and the bookkeeping for the current run.
pub struct BacktestEngine {
    /// Copied from `kernel.instance_id` when the engine is constructed.
    instance_id: UUID4,
    /// Configuration passed to `new` (after `new` adjusted its cache settings).
    config: BacktestEngineConfig,
    /// The kernel owning cache, clock, engines, portfolio and trader.
    kernel: NautilusKernel,
    /// Time events gathered from the component clocks; popped and run in
    /// `advance_time_impl` / `flush_accumulator_events`.
    accumulator: TimeEventAccumulator,
    /// Identifier supplied by the caller of `run`; stored on the first iteration.
    run_config_id: Option<String>,
    /// Fresh identifier generated on the first iteration of a run.
    run_id: Option<UUID4>,
    /// Simulated exchanges keyed by venue; populated by `add_venue`.
    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
    /// One execution client per added venue; drained in `drain_exec_client_events`.
    exec_clients: Vec<BacktestExecutionClient>,
    /// Instruments for which any data has been added via `add_data`.
    has_data: AHashSet<InstrumentId>,
    /// Instruments for which order book data has been added via `add_data`.
    has_book_data: AHashSet<InstrumentId>,
    /// Holds the data streams registered by `add_data` and yields them during a run.
    data_iterator: BacktestDataIterator,
    /// Total number of data elements added so far.
    data_len: usize,
    /// Counter used to build unique `backtest_data_{n}` stream names.
    data_stream_counter: usize,
    /// Smallest `ts_init` seen across all added data.
    ts_first: Option<UnixNanos>,
    /// Largest `ts_init` seen across all added data.
    ts_last_data: Option<UnixNanos>,
    /// Set from the `sort` flag of the latest `add_data` call and forced to
    /// `true` by `sort_data` / `clear_data`; `run_impl` refuses to run when `false`.
    sorted: bool,
    /// Number of data elements processed by the run loop; `0` marks a run that
    /// has not been initialized yet.
    iteration: usize,
    /// Set when a shutdown was requested; makes the run loop exit.
    force_stop: bool,
    /// Latest timestamp the run loop / timer processing has advanced to.
    last_ns: UnixNanos,
    /// Timestamp at which venue modules last ran (guards against running twice
    /// for the same timestamp).
    last_module_ns: Option<UnixNanos>,
    /// End timestamp of the current run; left at `UnixNanos::default()` until a
    /// run starts.
    end_ns: UnixNanos,
    /// Wall-clock time at which the run started.
    run_started: Option<UnixNanos>,
    /// Wall-clock time at which `end` completed the run.
    run_finished: Option<UnixNanos>,
    /// Simulated time at which the backtest started.
    backtest_start: Option<UnixNanos>,
    /// Kernel clock time recorded by `end`.
    backtest_end: Option<UnixNanos>,
}
106
107impl Debug for BacktestEngine {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct(stringify!(BacktestEngine))
110 .field("instance_id", &self.instance_id)
111 .field("run_config_id", &self.run_config_id)
112 .field("run_id", &self.run_id)
113 .finish()
114 }
115}
116
117impl BacktestEngine {
118 pub fn new(mut config: BacktestEngineConfig) -> anyhow::Result<Self> {
124 let mut cache_config = config.cache.unwrap_or_default();
127 cache_config.drop_instruments_on_reset = false;
128 config.cache = Some(cache_config);
129 let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
130 Ok(Self {
131 instance_id: kernel.instance_id,
132 config,
133 accumulator: TimeEventAccumulator::new(),
134 kernel,
135 run_config_id: None,
136 run_id: None,
137 venues: AHashMap::new(),
138 exec_clients: Vec::new(),
139 has_data: AHashSet::new(),
140 has_book_data: AHashSet::new(),
141 data_iterator: BacktestDataIterator::new(),
142 data_len: 0,
143 data_stream_counter: 0,
144 ts_first: None,
145 ts_last_data: None,
146 sorted: true,
147 iteration: 0,
148 force_stop: false,
149 last_ns: UnixNanos::default(),
150 last_module_ns: None,
151 end_ns: UnixNanos::default(),
152 run_started: None,
153 run_finished: None,
154 backtest_start: None,
155 backtest_end: None,
156 })
157 }
158
/// Returns a shared reference to the underlying kernel.
#[must_use]
pub const fn kernel(&self) -> &NautilusKernel {
    &self.kernel
}
164
/// Returns a mutable reference to the underlying kernel.
pub fn kernel_mut(&mut self) -> &mut NautilusKernel {
    &mut self.kernel
}
169
/// Returns the trader ID reported by the kernel.
#[must_use]
pub fn trader_id(&self) -> TraderId {
    self.kernel.trader_id()
}
175
/// Returns the machine ID reported by the kernel.
#[must_use]
pub fn machine_id(&self) -> &str {
    self.kernel.machine_id()
}
181
182 #[must_use]
184 pub fn instance_id(&self) -> UUID4 {
185 self.instance_id
186 }
187
188 #[must_use]
190 pub fn iteration(&self) -> usize {
191 self.iteration
192 }
193
/// Returns the run config ID stored for the current run, if any.
#[must_use]
pub fn run_config_id(&self) -> Option<&str> {
    self.run_config_id.as_deref()
}
199
/// Returns the ID generated for the current run, if a run has started.
#[must_use]
pub const fn run_id(&self) -> Option<UUID4> {
    self.run_id
}
205
/// Returns the wall-clock time at which the current run started, if any.
#[must_use]
pub const fn run_started(&self) -> Option<UnixNanos> {
    self.run_started
}
211
/// Returns the wall-clock time at which `end` finished the run, if any.
#[must_use]
pub const fn run_finished(&self) -> Option<UnixNanos> {
    self.run_finished
}
217
/// Returns the simulated start time of the backtest, if a run has started.
#[must_use]
pub const fn backtest_start(&self) -> Option<UnixNanos> {
    self.backtest_start
}
223
/// Returns the kernel clock time recorded when the run was ended, if any.
#[must_use]
pub const fn backtest_end(&self) -> Option<UnixNanos> {
    self.backtest_end
}
229
230 #[must_use]
232 pub fn list_venues(&self) -> Vec<Venue> {
233 self.venues.keys().copied().collect()
234 }
235
236 pub fn add_venue(&mut self, config: SimulatedVenueConfig) -> anyhow::Result<()> {
240 let venue = config.venue;
243 let routing = Some(config.routing);
244 let frozen_account = Some(config.frozen_account);
245
246 let exchange =
247 SimulatedExchange::new(config, self.kernel.cache.clone(), self.kernel.clock.clone())?;
248 let exchange = Rc::new(RefCell::new(exchange));
249 self.venues.insert(venue, exchange.clone());
250
251 let account_id = AccountId::from(format!("{venue}-001").as_str());
252
253 let exec_client = BacktestExecutionClient::new(
254 self.config.trader_id(),
255 account_id,
256 &exchange,
257 self.kernel.cache.clone(),
258 self.kernel.clock.clone(),
259 routing,
260 frozen_account,
261 );
262
263 exchange
264 .borrow_mut()
265 .register_client(Rc::new(exec_client.clone()));
266
267 self.exec_clients.push(exec_client.clone());
268
269 self.kernel
270 .exec_engine
271 .borrow_mut()
272 .register_client(Box::new(exec_client))?;
273
274 log::info!("Adding exchange {venue} to engine");
275
276 Ok(())
277 }
278
279 pub fn set_settlement_price(
285 &mut self,
286 venue: Venue,
287 instrument_id: InstrumentId,
288 price: Price,
289 ) -> anyhow::Result<()> {
290 let exchange = self
291 .venues
292 .get_mut(&venue)
293 .ok_or_else(|| anyhow::anyhow!("Unknown venue {venue}"))?;
294 exchange
295 .borrow_mut()
296 .set_settlement_price(instrument_id, price);
297 Ok(())
298 }
299
300 pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
302 if let Some(exchange) = self.venues.get_mut(&venue) {
303 exchange.borrow_mut().set_fill_model(fill_model);
304 } else {
305 log::warn!(
306 "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
307 );
308 }
309 }
310
311 pub fn add_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
320 let instrument_id = instrument.id();
321 if let Some(exchange) = self.venues.get_mut(&instrument.id().venue) {
322 if matches!(
323 instrument,
324 InstrumentAny::CurrencyPair(_) | InstrumentAny::TokenizedAsset(_)
325 ) && exchange.borrow().account_type != AccountType::Margin
326 && exchange.borrow().base_currency.is_some()
327 {
328 anyhow::bail!(
329 "Cannot add a multi-currency spot instrument {instrument_id} for a venue with a single-currency CASH account"
330 )
331 }
332 exchange.borrow_mut().add_instrument(instrument.clone())?;
333 } else {
334 anyhow::bail!(
335 "Cannot add an `Instrument` object without first adding its associated venue {}",
336 instrument.id().venue
337 )
338 }
339
340 self.add_market_data_client_if_not_exists(instrument.id().venue);
341
342 self.kernel
343 .data_engine
344 .borrow_mut()
345 .process(instrument as &dyn Any);
346 log::info!(
347 "Added instrument {} to exchange {}",
348 instrument_id,
349 instrument_id.venue
350 );
351 Ok(())
352 }
353
354 pub fn add_data(
365 &mut self,
366 data: Vec<Data>,
367 _client_id: Option<ClientId>,
368 validate: bool,
369 sort: bool,
370 ) -> anyhow::Result<()> {
371 anyhow::ensure!(!data.is_empty(), "data was empty");
372
373 let count = data.len();
374 let mut to_add = data;
375
376 if sort {
377 to_add.sort_by_key(HasTsInit::ts_init);
378 }
379
380 if validate {
381 let first = &to_add[0];
384 let first_instrument_id = first.instrument_id();
385 anyhow::ensure!(
386 self.kernel
387 .cache
388 .borrow()
389 .instrument(&first_instrument_id)
390 .is_some(),
391 "Instrument {first_instrument_id} for the given data not found in the cache. \
392 Add the instrument through `add_instrument()` prior to adding related data."
393 );
394
395 if let Data::Bar(bar) = first {
396 anyhow::ensure!(
397 bar.bar_type.aggregation_source() == AggregationSource::External,
398 "bar_type.aggregation_source must be External, was {:?}",
399 bar.bar_type.aggregation_source(),
400 );
401 }
402 }
403
404 let mut batch_min_ts: Option<UnixNanos> = None;
410 let mut batch_max_ts: Option<UnixNanos> = None;
411
412 for item in &to_add {
413 let instr_id = item.instrument_id();
414 self.has_data.insert(instr_id);
415
416 if item.is_order_book_data() {
417 self.has_book_data.insert(instr_id);
418 }
419
420 self.add_market_data_client_if_not_exists(instr_id.venue);
421
422 let ts = item.ts_init();
423 batch_min_ts = Some(batch_min_ts.map_or(ts, |cur| cur.min(ts)));
424 batch_max_ts = Some(batch_max_ts.map_or(ts, |cur| cur.max(ts)));
425 }
426
427 if let Some(ts) = batch_min_ts
428 && self.ts_first.is_none_or(|t| ts < t)
429 {
430 self.ts_first = Some(ts);
431 }
432
433 if let Some(ts) = batch_max_ts
434 && self.ts_last_data.is_none_or(|t| ts > t)
435 {
436 self.ts_last_data = Some(ts);
437 }
438
439 self.data_len += count;
440 let stream_name = format!("backtest_data_{}", self.data_stream_counter);
441 self.data_stream_counter += 1;
442 self.data_iterator.add_data(&stream_name, to_add, true);
443
444 self.sorted = sort;
445
446 log::info!(
447 "Added {count} data element{} to BacktestEngine ({} total)",
448 if count == 1 { "" } else { "s" },
449 self.data_len,
450 );
451
452 Ok(())
453 }
454
/// Adds a strategy to the kernel's trader.
///
/// # Errors
///
/// Propagates any error returned by the trader's `add_strategy`.
pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
where
    T: Strategy + Component + Debug + 'static,
{
    self.kernel.trader.borrow_mut().add_strategy(strategy)
}
467
468 pub fn add_strategies<T>(&mut self, strategies: Vec<T>) -> anyhow::Result<()>
474 where
475 T: Strategy + Component + Debug + 'static,
476 {
477 for strategy in strategies {
478 self.add_strategy(strategy)?;
479 }
480 Ok(())
481 }
482
/// Adds an actor to the kernel's trader.
///
/// # Errors
///
/// Propagates any error returned by the trader's `add_actor`.
pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
where
    T: DataActor + Component + Debug + 'static,
{
    self.kernel.trader.borrow_mut().add_actor(actor)
}
495
496 pub fn add_actors<T>(&mut self, actors: Vec<T>) -> anyhow::Result<()>
502 where
503 T: DataActor + Component + Debug + 'static,
504 {
505 for actor in actors {
506 self.add_actor(actor)?;
507 }
508 Ok(())
509 }
510
/// Adds an execution algorithm to the kernel's trader.
///
/// # Errors
///
/// Propagates any error returned by the trader's `add_exec_algorithm`.
pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
where
    T: ExecutionAlgorithm + Component + Debug + 'static,
{
    self.kernel
        .trader
        .borrow_mut()
        .add_exec_algorithm(exec_algorithm)
}
525
526 pub fn add_exec_algorithms<T>(&mut self, exec_algorithms: Vec<T>) -> anyhow::Result<()>
533 where
534 T: ExecutionAlgorithm + Component + Debug + 'static,
535 {
536 for exec_algorithm in exec_algorithms {
537 self.add_exec_algorithm(exec_algorithm)?;
538 }
539 Ok(())
540 }
541
542 pub fn run(
559 &mut self,
560 start: Option<UnixNanos>,
561 end: Option<UnixNanos>,
562 run_config_id: Option<String>,
563 streaming: bool,
564 ) -> anyhow::Result<()> {
565 self.run_impl(start, end, run_config_id, streaming)?;
566
567 if !streaming || self.force_stop || self.kernel.is_shutdown_requested() {
572 self.end();
573 }
574
575 Ok(())
576 }
577
/// Core of `run`: validates state, initializes the run on the first call, then
/// feeds data to the venues and the data engine in iterator order while
/// interleaving timer events.
///
/// `start` defaults to the earliest added `ts_init` (or `UnixNanos::default()`),
/// `end` to the latest added `ts_init`. With `streaming` set, the loop returns
/// as soon as the iterator is exhausted instead of running remaining timers.
///
/// # Errors
///
/// Returns an error if the data is flagged as unsorted, if a venue with a book
/// type deeper than `L1_MBP` has an instrument with data but no order book
/// data, or if the resolved start is after the resolved end.
fn run_impl(
    &mut self,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    run_config_id: Option<String>,
    streaming: bool,
) -> anyhow::Result<()> {
    anyhow::ensure!(
        self.sorted,
        "Data has been added but not sorted, call `engine.sort_data()` or use \
         `engine.add_data(..., sort=true)` before running"
    );

    // Venues whose book type is deeper than L1 need order book data for every
    // instrument that has any data at all.
    for exchange in self.venues.values() {
        let exchange = exchange.borrow();
        let book_type_has_depth = exchange.book_type() as u8 > BookType::L1_MBP as u8;
        if !book_type_has_depth {
            continue;
        }

        for instrument_id in exchange.instrument_ids() {
            let has_data = self.has_data.contains(instrument_id);
            let missing_book_data = !self.has_book_data.contains(instrument_id);
            if has_data && missing_book_data {
                anyhow::bail!(
                    "No order book data found for instrument '{instrument_id}' when `book_type` \
                     is '{:?}'. Set the venue `book_type` to 'L1_MBP' (for top-of-book data \
                     like quotes, trades, and bars) or provide order book data for this \
                     instrument.",
                    exchange.book_type()
                );
            }
        }
    }

    let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
    // Fallback when no data was added: 4_102_444_800 s after the Unix epoch,
    // i.e. 2100-01-01T00:00:00Z expressed in nanoseconds.
    let end_ns = end.unwrap_or_else(|| {
        self.ts_last_data
            .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
    });
    anyhow::ensure!(start_ns <= end_ns, "start was > end");
    self.end_ns = end_ns;
    self.last_ns = start_ns;
    self.last_module_ns = None;

    let clocks = self.collect_all_clocks();
    Self::set_all_clocks_time(&clocks, start_ns);

    // One-time initialization: only performed while no data has been processed yet.
    if self.iteration == 0 {
        self.run_config_id = run_config_id;
        self.run_id = Some(UUID4::new());
        self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
        self.backtest_start = Some(start_ns);

        for exchange in self.venues.values() {
            let mut ex = exchange.borrow_mut();
            ex.initialize_account();
            ex.load_open_orders();
        }

        // NOTE(review): clocks are set to `start_ns` a second time here, after
        // the venues were initialized — presumably to undo any clock changes made
        // during initialization; confirm.
        Self::set_all_clocks_time(&clocks, start_ns);

        self.force_stop = false;
        self.kernel.reset_shutdown_flag();

        Self::init_command_senders();

        // Log timestamps follow simulated time for the duration of the run.
        logging_clock_set_static_mode();
        logging_clock_set_static_time(start_ns.as_u64());

        self.kernel.start();
        self.kernel.start_trader();

        self.log_pre_run();
    }

    self.log_run();

    // Skip any data that lies before the requested start.
    let mut data = self.data_iterator.next();
    while let Some(ref d) = data {
        if d.ts_init() >= start_ns {
            break;
        }
        data = self.data_iterator.next();
    }

    // Position `last_ns` one nanosecond before the first element so that the
    // `ts_init > self.last_ns` check in the loop advances time for it
    // (clamped at zero).
    if let Some(ref d) = data {
        let ts = d.ts_init();
        self.last_ns = if ts.as_u64() > 0 {
            UnixNanos::from(ts.as_u64() - 1)
        } else {
            UnixNanos::default()
        };
    } else {
        self.last_ns = start_ns;
    }

    loop {
        if self.kernel.is_shutdown_requested() {
            log::info!("Shutdown requested via ShutdownSystem, ending backtest");
            self.force_stop = true;
        }

        if self.force_stop {
            log::error!("Force stop triggered, ending backtest");
            break;
        }

        if data.is_none() {
            // In streaming mode, stop here and leave pending timers untouched.
            if streaming {
                break;
            }
            // Otherwise keep running timers; a timer handler may have produced
            // new data, so poll the iterator again before deciding to stop.
            let done = self.process_next_timer(&clocks);
            data = self.data_iterator.next();
            if data.is_none() && done {
                break;
            }
            continue;
        }

        let d = data.as_ref().unwrap();
        let ts_init = d.ts_init();

        if ts_init > end_ns {
            break;
        }

        // Advance time (running timers strictly before `ts_init`) only when the
        // timestamp actually moves forward.
        if ts_init > self.last_ns {
            self.last_ns = ts_init;
            self.advance_time_impl(ts_init, &clocks);
        }

        self.route_data_to_exchange(d);

        self.kernel.data_engine.borrow_mut().process_data(d.clone());

        self.drain_command_queues();
        self.settle_venues(ts_init);

        let prev_last_ns = self.last_ns;
        data = self.data_iterator.next();

        // Once the last element for this timestamp has been handled, run the
        // timer events scheduled at exactly this timestamp, then the venue modules.
        if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
            self.flush_accumulator_events(&clocks, prev_last_ns);
            self.run_venue_modules(prev_last_ns);
        }

        self.iteration += 1;
    }

    let ts_now = self.kernel.clock.borrow().timestamp_ns();
    self.settle_venues(ts_now);
    self.run_venue_modules(ts_now);

    // Streaming batches only flush up to the last processed timestamp; a full
    // run flushes every remaining event up to the end of the run.
    if streaming {
        self.flush_accumulator_events(&clocks, self.last_ns);
    } else {
        self.flush_accumulator_events(&clocks, end_ns);
    }

    Ok(())
}
762
763 pub fn end(&mut self) {
765 if self.end_ns.as_u64() > 0 {
772 let clocks = self.collect_all_clocks();
773 let flush_ts = if self.force_stop || self.kernel.is_shutdown_requested() {
774 self.last_ns
775 } else {
776 self.end_ns
777 };
778
779 self.flush_accumulator_events(&clocks, flush_ts);
780 }
781
782 self.kernel.stop_trader();
784
785 self.kernel.data_engine.borrow_mut().stop();
787 self.kernel.risk_engine.borrow_mut().stop();
788 self.kernel.exec_engine.borrow_mut().stop();
789
790 let ts_now = self.kernel.clock.borrow().timestamp_ns();
792 self.settle_venues(ts_now);
793 self.run_venue_modules(ts_now);
794
795 self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
796 self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
797
798 logging_clock_set_realtime_mode();
800
801 self.log_post_run();
802 }
803
804 pub fn reset(&mut self) {
809 log::debug!("Resetting");
810
811 if self.kernel.trader.borrow().is_running() {
812 self.end();
813 }
814
815 self.kernel.data_engine.borrow_mut().stop();
817 self.kernel.data_engine.borrow_mut().reset();
818
819 self.kernel.exec_engine.borrow_mut().stop();
820 self.kernel.exec_engine.borrow_mut().reset();
821
822 self.kernel.risk_engine.borrow_mut().stop();
823 self.kernel.risk_engine.borrow_mut().reset();
824
825 if let Err(e) = self.kernel.trader.borrow_mut().reset() {
827 log::error!("Error resetting trader: {e:?}");
828 }
829
830 for exchange in self.venues.values() {
834 exchange.borrow_mut().reset();
835 }
836 self.kernel.cache.borrow_mut().reset();
837 self.kernel.portfolio.borrow_mut().reset();
838
839 self.run_config_id = None;
841 self.run_id = None;
842 self.run_started = None;
843 self.run_finished = None;
844 self.backtest_start = None;
845 self.backtest_end = None;
846 self.iteration = 0;
847 self.force_stop = false;
848 self.last_ns = UnixNanos::default();
849 self.last_module_ns = None;
850 self.end_ns = UnixNanos::default();
851
852 self.accumulator.clear();
853
854 self.data_iterator.reset_all_cursors();
856
857 log::info!("Reset");
858 }
859
/// Marks the added data as sorted so that `run` accepts it.
///
/// No data is reordered here: only the `sorted` flag is set and a message is
/// logged (per that message, ordering across streams is left to the iterator).
pub fn sort_data(&mut self) {
    self.sorted = true;
    log::info!("Data sort requested (iterator merges streams by ts_init)");
}
871
872 pub fn clear_data(&mut self) {
874 self.has_data.clear();
875 self.has_book_data.clear();
876 self.data_iterator = BacktestDataIterator::new();
877 self.data_len = 0;
878 self.data_stream_counter = 0;
879 self.ts_first = None;
880 self.ts_last_data = None;
881 self.sorted = true;
882 }
883
/// Clears all actors from the kernel's trader.
///
/// # Errors
///
/// Propagates any error returned by the trader's `clear_actors`.
pub fn clear_actors(&mut self) -> anyhow::Result<()> {
    self.kernel.trader.borrow_mut().clear_actors()
}
892
/// Clears all strategies from the kernel's trader.
///
/// # Errors
///
/// Propagates any error returned by the trader's `clear_strategies`.
pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
    self.kernel.trader.borrow_mut().clear_strategies()
}
901
/// Clears all execution algorithms from the kernel's trader.
///
/// # Errors
///
/// Propagates any error returned by the trader's `clear_exec_algorithms`.
pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
    self.kernel.trader.borrow_mut().clear_exec_algorithms()
}
910
/// Disposes of the engine: clears all data, clears the time event accumulator,
/// and disposes of the kernel.
pub fn dispose(&mut self) {
    self.clear_data();
    self.accumulator.clear();
    self.kernel.dispose();
}
917
918 #[must_use]
920 pub fn get_result(&self) -> BacktestResult {
921 let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
922 (Some(start), Some(end)) => {
923 (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
924 }
925 _ => 0.0,
926 };
927
928 let cache = self.kernel.cache.borrow();
929 let orders = cache.orders(None, None, None, None, None);
930 let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
931 let total_orders = orders.len();
932 let positions = cache.positions(None, None, None, None, None);
933 let total_positions = positions.len();
934
935 let analyzer = self.build_analyzer(&cache, &positions);
936 let mut stats_pnls = AHashMap::new();
937
938 for currency in analyzer.currencies() {
939 if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
940 stats_pnls.insert(currency.code.to_string(), pnls);
941 }
942 }
943
944 let stats_returns = analyzer.get_performance_stats_returns();
945 let stats_general = analyzer.get_performance_stats_general();
946
947 BacktestResult {
948 trader_id: self.config.trader_id().to_string(),
949 machine_id: self.kernel.machine_id.clone(),
950 instance_id: self.instance_id,
951 run_config_id: self.run_config_id.clone(),
952 run_id: self.run_id,
953 run_started: self.run_started,
954 run_finished: self.run_finished,
955 backtest_start: self.backtest_start,
956 backtest_end: self.backtest_end,
957 elapsed_time_secs,
958 iterations: self.iteration,
959 total_events,
960 total_orders,
961 total_positions,
962 stats_pnls,
963 stats_returns,
964 stats_general,
965 }
966 }
967
968 fn build_analyzer(&self, cache: &Cache, positions: &[&Position]) -> PortfolioAnalyzer {
969 let mut analyzer = PortfolioAnalyzer::default();
970 let positions_owned: Vec<_> = positions.iter().map(|p| (*p).clone()).collect();
971 let mut snapshot_positions = Vec::new();
972
973 for position in positions {
974 snapshot_positions.extend(cache.position_snapshots(Some(&position.id), None));
975 }
976
977 for venue in self.venues.keys() {
979 if let Some(account) = cache.account_for_venue(venue) {
980 let account_ref: &dyn Account = match account {
981 AccountAny::Margin(margin) => margin,
982 AccountAny::Cash(cash) => cash,
983 AccountAny::Betting(betting) => betting,
984 };
985
986 for (currency, money) in account_ref.starting_balances() {
987 analyzer
988 .account_balances_starting
989 .entry(currency)
990 .and_modify(|existing| *existing = *existing + money)
991 .or_insert(money);
992 }
993
994 for (currency, money) in account_ref.balances_total() {
995 analyzer
996 .account_balances
997 .entry(currency)
998 .and_modify(|existing| *existing = *existing + money)
999 .or_insert(money);
1000 }
1001 }
1002 }
1003
1004 analyzer.add_positions(&positions_owned);
1005 analyzer.add_positions(&snapshot_positions);
1006 analyzer
1007 }
1008
1009 fn route_data_to_exchange(&self, data: &Data) {
1010 if matches!(
1011 data,
1012 Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) | Data::Custom(_)
1013 ) {
1014 return;
1015 }
1016
1017 let venue = data.instrument_id().venue;
1018 if let Some(exchange) = self.venues.get(&venue) {
1019 let mut exchange = exchange.borrow_mut();
1020
1021 match data {
1022 Data::Delta(delta) => exchange.process_order_book_delta(*delta),
1023 Data::Deltas(deltas) => exchange.process_order_book_deltas(deltas),
1024 Data::Quote(quote) => exchange.process_quote_tick(quote),
1025 Data::Trade(trade) => exchange.process_trade_tick(trade),
1026 Data::Bar(bar) => exchange.process_bar(*bar),
1027 Data::InstrumentStatus(status) => exchange.process_instrument_status(*status),
1028 Data::InstrumentClose(close) => exchange.process_instrument_close(*close),
1029 Data::Depth10(depth) => exchange.process_order_book_depth10(depth),
1030 Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) | Data::Custom(_) => {
1031 unreachable!("filtered by early return above")
1032 }
1033 }
1034 } else {
1035 log::warn!("No exchange found for venue {venue}, data not routed");
1036 }
1037 }
1038
/// Advances simulated time to `ts_now`, running the timer events that fall
/// strictly before it.
///
/// Events at exactly `ts_now` stay in the accumulator; `run_impl` flushes them
/// separately once the data for that timestamp has been processed.
fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
    // Collect time events up to `ts_now` from every clock into the accumulator.
    for clock in clocks {
        Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
    }

    // Cutoff one nanosecond before `ts_now` (clamped at zero).
    let ts_before = if ts_now.as_u64() > 0 {
        UnixNanos::from(ts_now.as_u64() - 1)
    } else {
        UnixNanos::default()
    };

    let mut ts_last: Option<UnixNanos> = None;

    while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
        let ts_event = handler.event.ts_event;

        // Moving on to a new event timestamp: settle the previous one first.
        if let Some(ts) = ts_last
            && ts != ts_event
        {
            self.settle_venues(ts);
            self.run_venue_modules(ts);
        }

        ts_last = Some(ts_event);
        Self::set_all_clocks_time(clocks, ts_event);
        logging_clock_set_static_time(ts_event.as_u64());

        handler.run();
        self.drain_command_queues();

        // The handler may have scheduled new timers; collect them as well.
        for clock in clocks {
            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
        }
    }

    // Settle the timestamp of the last event that ran, if any.
    if let Some(ts) = ts_last {
        self.settle_venues(ts);
        self.run_venue_modules(ts);
    }

    Self::set_all_clocks_time(clocks, ts_now);
    logging_clock_set_static_time(ts_now.as_u64());
}
1087
/// Runs every accumulated timer event at or before `ts_now`.
///
/// Unlike `advance_time_impl`, the cutoff includes `ts_now` itself, and the
/// clocks are left at the timestamp of the last event that ran rather than
/// being set to `ts_now` afterwards.
fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
    // Collect time events up to `ts_now` from every clock into the accumulator.
    for clock in clocks {
        Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
    }

    let mut ts_last: Option<UnixNanos> = None;

    while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
        let ts_event = handler.event.ts_event;

        // Moving on to a new event timestamp: settle the previous one first.
        if let Some(ts) = ts_last
            && ts != ts_event
        {
            self.settle_venues(ts);
            self.run_venue_modules(ts);
        }

        ts_last = Some(ts_event);
        Self::set_all_clocks_time(clocks, ts_event);
        logging_clock_set_static_time(ts_event.as_u64());

        handler.run();
        self.drain_command_queues();

        // The handler may have scheduled new timers; collect them as well.
        for clock in clocks {
            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
        }
    }

    // Settle the timestamp of the last event that ran, if any.
    if let Some(ts) = ts_last {
        self.settle_venues(ts);
        self.run_venue_modules(ts);
    }
}
1125
1126 fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
1127 self.flush_accumulator_events(clocks, self.last_ns);
1128
1129 let mut min_next_time: Option<UnixNanos> = None;
1131
1132 for clock in clocks {
1133 let clock_ref = clock.borrow();
1134 for name in clock_ref.timer_names() {
1135 if let Some(next_time) = clock_ref.next_time_ns(name)
1136 && next_time > self.last_ns
1137 {
1138 min_next_time = Some(match min_next_time {
1139 Some(current_min) => next_time.min(current_min),
1140 None => next_time,
1141 });
1142 }
1143 }
1144 }
1145
1146 match min_next_time {
1147 None => true,
1148 Some(t) if t > self.end_ns => true,
1149 Some(t) => {
1150 self.last_ns = t;
1151 self.flush_accumulator_events(clocks, t);
1152 false
1153 }
1154 }
1155 }
1156
/// Returns the kernel clock followed by the clocks of all trader components.
fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
    let mut clocks = vec![self.kernel.clock.clone()];
    clocks.extend(self.kernel.trader.borrow().get_component_clocks());
    clocks
}
1162
/// Processes venue commands at `ts_now` until no venue has any left.
///
/// Every exchange clock is first set to `ts_now`. Then, while any venue reports
/// pending commands for `ts_now`, those venues are processed and their matching
/// engines iterated, with the command queues drained after each step. The loop
/// repeats because draining can enqueue further commands at the venues.
fn settle_venues(&self, ts_now: UnixNanos) {
    for exchange in self.venues.values() {
        exchange.borrow().set_clock_time(ts_now);
    }

    loop {
        // Snapshot the venue IDs first so no `RefCell` borrow is held while
        // the venues are processed below.
        let active_venues: Vec<Venue> = self
            .venues
            .iter()
            .filter(|(_, ex)| ex.borrow().has_pending_commands(ts_now))
            .map(|(id, _)| *id)
            .collect();

        if active_venues.is_empty() {
            break;
        }

        for venue_id in &active_venues {
            self.venues[venue_id].borrow_mut().process(ts_now);
        }
        self.drain_command_queues();

        for venue_id in &active_venues {
            self.venues[venue_id]
                .borrow_mut()
                .iterate_matching_engines(ts_now);
        }

        self.drain_command_queues();
    }
}
1203
/// Runs the modules of every venue for `ts_now`, at most once per timestamp.
///
/// A repeated call with the timestamp of the previous call returns immediately.
/// Command queues are drained and venues settled both before and after the
/// modules run.
fn run_venue_modules(&mut self, ts_now: UnixNanos) {
    // Already ran for this timestamp.
    if self.last_module_ns == Some(ts_now) {
        return;
    }
    self.last_module_ns = Some(ts_now);

    self.drain_command_queues();
    self.settle_venues(ts_now);

    for exchange in self.venues.values() {
        exchange.borrow_mut().process_modules(ts_now);
    }

    self.drain_command_queues();
    self.settle_venues(ts_now);
}
1222
1223 fn drain_exec_client_events(&self) {
1224 for client in &self.exec_clients {
1225 client.drain_queued_events();
1226 }
1227 }
1228
1229 fn drain_command_queues(&self) {
1230 loop {
1234 drain_trading_cmd_queue();
1235 drain_data_cmd_queue();
1236 self.drain_exec_client_events();
1237
1238 if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
1239 break;
1240 }
1241 }
1242 }
1243
/// Installs the synchronous data and trading command senders used during a
/// backtest, replacing whatever senders were set before.
fn init_command_senders() {
    replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
    replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
}
1248
/// Advances `clock` to `to_time_ns` through the accumulator.
///
/// `set_time` is passed straight through to `TimeEventAccumulator::advance_clock`.
///
/// # Panics
///
/// Panics if `clock` is not a `TestClock`, or if it is already borrowed.
fn advance_clock_on_accumulator(
    accumulator: &mut TimeEventAccumulator,
    clock: &Rc<RefCell<dyn Clock>>,
    to_time_ns: UnixNanos,
    set_time: bool,
) {
    let mut clock_ref = clock.borrow_mut();
    // The accumulator works on the concrete `TestClock`, so downcast the trait object.
    let test_clock = clock_ref
        .as_any_mut()
        .downcast_mut::<TestClock>()
        .expect("BacktestEngine requires TestClock");
    accumulator.advance_clock(test_clock, to_time_ns, set_time);
}
1262
/// Sets the time of every clock in `clocks` to `ts`.
///
/// # Panics
///
/// Panics if any clock is not a `TestClock`, or if it is already borrowed.
fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
    for clock in clocks {
        let mut clock_ref = clock.borrow_mut();
        // `set_time` is called on the concrete `TestClock`, so downcast the trait object.
        let test_clock = clock_ref
            .as_any_mut()
            .downcast_mut::<TestClock>()
            .expect("BacktestEngine requires TestClock");
        test_clock.set_time(ts);
    }
}
1273
/// Logs the pre-run banner followed by, for every venue, its ID, account type
/// and the starting balances of its account (when the cache has one).
#[rustfmt::skip]
fn log_pre_run(&self) {
    log_info!("=================================================================", color = LogColor::Cyan);
    log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
    log_info!("=================================================================", color = LogColor::Cyan);

    let cache = self.kernel.cache.borrow();
    for exchange in self.venues.values() {
        let ex = exchange.borrow();
        log_info!("=================================================================", color = LogColor::Cyan);
        log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);

        if let Some(account) = cache.account_for_venue(&ex.id) {
            log::info!("Balances starting:");
            // View the concrete account through the shared `Account` trait.
            let account_ref: &dyn Account = match account {
                AccountAny::Margin(margin) => margin,
                AccountAny::Cash(cash) => cash,
                AccountAny::Betting(betting) => betting,
            };

            for balance in account_ref.starting_balances().values() {
                log::info!(" {balance}");
            }
        }
    }

    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
}
1303
/// Logs the run banner with the run config ID, run ID, backtest start and the
/// number of added data elements.
#[rustfmt::skip]
fn log_run(&self) {
    let config_id = self.run_config_id.as_deref().unwrap_or("None");
    let id = format_optional_uuid(self.run_id.as_ref());
    let start = format_optional_nanos(self.backtest_start);

    log_info!("=================================================================", color = LogColor::Cyan);
    log_info!(" BACKTEST RUN", color = LogColor::Cyan);
    log_info!("=================================================================", color = LogColor::Cyan);
    log::info!("Run config ID: {config_id}");
    log::info!("Run ID: {id}");
    log::info!("Backtest start: {start}");
    log::info!("Data elements: {}", self.data_len);
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
}
1319
/// Logs the post-run report: run identity, wall-clock and simulated time
/// ranges, and iteration, event, order and position counts.
///
/// When `config.run_analysis` is set, the portfolio performance is logged as
/// well, from an analyzer built by `build_analyzer`.
#[rustfmt::skip]
fn log_post_run(&self) {
    let cache = self.kernel.cache.borrow();
    let orders = cache.orders(None, None, None, None, None);
    let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
    let total_orders = orders.len();
    let positions = cache.positions(None, None, None, None, None);
    let total_positions = positions.len();

    let config_id = self.run_config_id.as_deref().unwrap_or("None");
    let id = format_optional_uuid(self.run_id.as_ref());
    let started = format_optional_nanos(self.run_started);
    let finished = format_optional_nanos(self.run_finished);
    let elapsed = format_optional_duration(self.run_started, self.run_finished);
    let bt_start = format_optional_nanos(self.backtest_start);
    let bt_end = format_optional_nanos(self.backtest_end);
    let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
    let iterations = self.iteration.separate_with_underscores();
    let events = total_events.separate_with_underscores();
    let num_orders = total_orders.separate_with_underscores();
    let num_positions = total_positions.separate_with_underscores();

    log_info!("=================================================================", color = LogColor::Cyan);
    log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
    log_info!("=================================================================", color = LogColor::Cyan);
    log::info!("Run config ID: {config_id}");
    log::info!("Run ID: {id}");
    log::info!("Run started: {started}");
    log::info!("Run finished: {finished}");
    log::info!("Elapsed time: {elapsed}");
    log::info!("Backtest start: {bt_start}");
    log::info!("Backtest end: {bt_end}");
    log::info!("Backtest range: {bt_range}");
    log::info!("Iterations: {iterations}");
    log::info!("Total events: {events}");
    log::info!("Total orders: {num_orders}");
    log::info!("Total positions: {num_positions}");

    // The performance report is optional.
    if !self.config.run_analysis {
        return;
    }

    let analyzer = self.build_analyzer(&cache, &positions);
    log_portfolio_performance(&analyzer);
}
1365
1366 pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1368 if self
1369 .kernel
1370 .data_engine
1371 .borrow()
1372 .registered_clients()
1373 .contains(&client_id)
1374 {
1375 return;
1376 }
1377
1378 let venue = Venue::from(client_id.as_str());
1379 let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1380 let data_client_adapter = DataClientAdapter::new(
1381 backtest_client.client_id,
1382 None,
1383 false,
1384 false,
1385 Box::new(backtest_client),
1386 );
1387
1388 self.kernel
1389 .data_engine
1390 .borrow_mut()
1391 .register_client(data_client_adapter, None);
1392 }
1393
1394 pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1396 let client_id = ClientId::from(venue.as_str());
1397
1398 if !self
1399 .kernel
1400 .data_engine
1401 .borrow()
1402 .registered_clients()
1403 .contains(&client_id)
1404 {
1405 let backtest_client =
1406 BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1407 let data_client_adapter = DataClientAdapter::new(
1408 client_id,
1409 Some(venue),
1410 false,
1411 false,
1412 Box::new(backtest_client),
1413 );
1414 self.kernel
1415 .data_engine
1416 .borrow_mut()
1417 .register_client(data_client_adapter, Some(venue));
1418 }
1419 }
1420}
1421
1422fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1423 nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1424}
1425
1426fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1427 uuid.map_or("None".to_string(), |id| id.to_string())
1428}
1429
1430fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1431 match (start, end) {
1432 (Some(s), Some(e)) => {
1433 let delta = e.to_datetime_utc() - s.to_datetime_utc();
1434 let days = delta.num_days().abs();
1435 let hours = delta.num_hours().abs() % 24;
1436 let minutes = delta.num_minutes().abs() % 60;
1437 let seconds = delta.num_seconds().abs() % 60;
1438 let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1439 format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1440 }
1441 _ => "None".to_string(),
1442 }
1443}
1444
1445#[rustfmt::skip]
1446fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
1447 log_info!("=================================================================", color = LogColor::Cyan);
1448 log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
1449 log_info!("=================================================================", color = LogColor::Cyan);
1450
1451 for currency in analyzer.currencies() {
1452 log::info!(" PnL Statistics ({})", currency.code);
1453 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1454
1455 if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
1456 for line in &pnl_lines {
1457 log::info!("{line}");
1458 }
1459 }
1460
1461 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1462 }
1463
1464 log::info!(" Returns Statistics");
1465 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1466
1467 for line in &analyzer.get_stats_returns_formatted() {
1468 log::info!("{line}");
1469 }
1470 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1471
1472 log::info!(" General Statistics");
1473 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1474
1475 for line in &analyzer.get_stats_general_formatted() {
1476 log::info!("{line}");
1477 }
1478 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1479}
1480
#[cfg(test)]
mod tests {
    use nautilus_model::{
        data::{Data, InstrumentStatus},
        enums::{AccountType, BookType, MarketStatus, MarketStatusAction, OmsType},
        identifiers::Venue,
        instruments::{
            CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
        },
        types::Money,
    };
    use rstest::*;

    use super::*;

    /// Adds the simulated BINANCE venue used by these tests (netting OMS, margin account,
    /// L1 book, 1_000_000 USDT starting balance) to `engine`.
    fn add_binance_venue(engine: &mut BacktestEngine) {
        let venue_config = SimulatedVenueConfig::builder()
            .venue(Venue::from("BINANCE"))
            .oms_type(OmsType::Netting)
            .account_type(AccountType::Margin)
            .book_type(BookType::L1_MBP)
            .starting_balances(vec![Money::from("1_000_000 USDT")])
            .build();
        engine.add_venue(venue_config).unwrap();
    }

    /// Builds an engine with the default config and the BINANCE venue attached.
    fn create_engine() -> BacktestEngine {
        let mut engine = BacktestEngine::new(BacktestEngineConfig::default()).unwrap();
        add_binance_venue(&mut engine);
        engine
    }

    #[rstest]
    #[case(None)]
    #[case(Some(true))]
    #[case(Some(false))]
    fn test_new_forces_drop_instruments_on_reset_false(
        crypto_perpetual_ethusdt: CryptoPerpetual,
        #[case] user_value: Option<bool>,
    ) {
        use nautilus_common::cache::CacheConfig;

        // Only supply a cache config when the case provides an explicit user value
        let config = if let Some(value) = user_value {
            let cache_config = CacheConfig::builder()
                .drop_instruments_on_reset(value)
                .build();
            BacktestEngineConfig::builder().cache(cache_config).build()
        } else {
            BacktestEngineConfig::builder().build()
        };

        let mut engine = BacktestEngine::new(config).unwrap();
        add_binance_venue(&mut engine);

        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
        let instrument_id = instrument.id();
        engine.add_instrument(&instrument).unwrap();

        engine.reset();

        let instrument_survived = engine
            .kernel()
            .cache
            .borrow()
            .instrument(&instrument_id)
            .is_some();
        assert!(
            instrument_survived,
            "instrument must survive engine.reset(); user-supplied \
             drop_instruments_on_reset={user_value:?} must not leak through",
        );
    }

    #[rstest]
    fn test_route_data_to_exchange_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
        let mut engine = create_engine();
        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
        let instrument_id = instrument.id();
        engine.add_instrument(&instrument).unwrap();

        let close_status = InstrumentStatus::new(
            instrument_id,
            MarketStatusAction::Close,
            UnixNanos::from(1),
            UnixNanos::from(1),
            None,
            None,
            None,
            None,
            None,
        );
        let data = Data::InstrumentStatus(close_status);

        engine.route_data_to_exchange(&data);

        // Read the status back from the matching engine of the instrument's venue
        let exchange = engine.venues.get(&instrument_id.venue).unwrap().borrow();
        let matching_engine = exchange.get_matching_engine(&instrument_id).unwrap();
        assert_eq!(matching_engine.market_status, MarketStatus::Closed);
    }
}