Skip to main content

nautilus_backtest/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `BacktestEngine` for backtesting on historical data.
17
18use std::{any::Any, cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    actor::DataActor,
24    cache::Cache,
25    clock::{Clock, TestClock},
26    component::Component,
27    enums::LogColor,
28    log_info,
29    logging::{
30        logging_clock_set_realtime_mode, logging_clock_set_static_mode,
31        logging_clock_set_static_time,
32    },
33    runner::{
34        SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
35        drain_data_cmd_queue, drain_trading_cmd_queue, replace_data_cmd_sender,
36        replace_exec_cmd_sender, trading_cmd_queue_is_empty,
37    },
38};
39use nautilus_core::{
40    UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, string::formatting::Separable,
41};
42use nautilus_data::client::DataClientAdapter;
43use nautilus_execution::models::fill::FillModelAny;
44use nautilus_model::{
45    accounts::{Account, AccountAny},
46    data::{Data, HasTsInit},
47    enums::{AccountType, AggregationSource, BookType},
48    identifiers::{AccountId, ClientId, InstrumentId, TraderId, Venue},
49    instruments::{Instrument, InstrumentAny},
50    orders::Order,
51    position::Position,
52    types::Price,
53};
54use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
55use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
56
57use crate::{
58    accumulator::TimeEventAccumulator,
59    config::{BacktestEngineConfig, SimulatedVenueConfig},
60    data_client::BacktestDataClient,
61    data_iterator::BacktestDataIterator,
62    exchange::SimulatedExchange,
63    execution_client::BacktestExecutionClient,
64    result::BacktestResult,
65};
66
67/// Core backtesting engine for running event-driven strategy backtests on historical data.
68///
69/// The `BacktestEngine` provides a high-fidelity simulation environment that processes
70/// historical market data chronologically through an event-driven architecture. It maintains
71/// simulated exchanges with realistic order matching and execution, allowing strategies
72/// to be tested exactly as they would run in live trading:
73///
74/// - Event-driven data replay with configurable latency models.
75/// - Multi-venue and multi-asset support.
76/// - Realistic order matching and execution simulation.
77/// - Strategy and portfolio performance analysis.
78/// - Transition from backtesting to live trading.
79pub struct BacktestEngine {
80    instance_id: UUID4,
81    config: BacktestEngineConfig,
82    kernel: NautilusKernel,
83    accumulator: TimeEventAccumulator,
84    run_config_id: Option<String>,
85    run_id: Option<UUID4>,
86    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
87    exec_clients: Vec<BacktestExecutionClient>,
88    has_data: AHashSet<InstrumentId>,
89    has_book_data: AHashSet<InstrumentId>,
90    data_iterator: BacktestDataIterator,
91    data_len: usize,
92    data_stream_counter: usize,
93    ts_first: Option<UnixNanos>,
94    ts_last_data: Option<UnixNanos>,
95    sorted: bool,
96    iteration: usize,
97    force_stop: bool,
98    last_ns: UnixNanos,
99    last_module_ns: Option<UnixNanos>,
100    end_ns: UnixNanos,
101    run_started: Option<UnixNanos>,
102    run_finished: Option<UnixNanos>,
103    backtest_start: Option<UnixNanos>,
104    backtest_end: Option<UnixNanos>,
105}
106
107impl Debug for BacktestEngine {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct(stringify!(BacktestEngine))
110            .field("instance_id", &self.instance_id)
111            .field("run_config_id", &self.run_config_id)
112            .field("run_id", &self.run_id)
113            .finish()
114    }
115}
116
117impl BacktestEngine {
118    /// Create a new [`BacktestEngine`] instance.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the core `NautilusKernel` fails to initialize.
123    pub fn new(mut config: BacktestEngineConfig) -> anyhow::Result<Self> {
124        // The engine does not replay `add_instrument` on reset, so reruns rely
125        // on the cache retaining instruments regardless of the caller's config.
126        let mut cache_config = config.cache.unwrap_or_default();
127        cache_config.drop_instruments_on_reset = false;
128        config.cache = Some(cache_config);
129        let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
130        Ok(Self {
131            instance_id: kernel.instance_id,
132            config,
133            accumulator: TimeEventAccumulator::new(),
134            kernel,
135            run_config_id: None,
136            run_id: None,
137            venues: AHashMap::new(),
138            exec_clients: Vec::new(),
139            has_data: AHashSet::new(),
140            has_book_data: AHashSet::new(),
141            data_iterator: BacktestDataIterator::new(),
142            data_len: 0,
143            data_stream_counter: 0,
144            ts_first: None,
145            ts_last_data: None,
146            sorted: true,
147            iteration: 0,
148            force_stop: false,
149            last_ns: UnixNanos::default(),
150            last_module_ns: None,
151            end_ns: UnixNanos::default(),
152            run_started: None,
153            run_finished: None,
154            backtest_start: None,
155            backtest_end: None,
156        })
157    }
158
159    /// Returns a reference to the underlying kernel.
160    #[must_use]
161    pub const fn kernel(&self) -> &NautilusKernel {
162        &self.kernel
163    }
164
165    /// Returns a mutable reference to the underlying kernel.
166    pub fn kernel_mut(&mut self) -> &mut NautilusKernel {
167        &mut self.kernel
168    }
169
170    /// Returns the trader ID for this engine.
171    #[must_use]
172    pub fn trader_id(&self) -> TraderId {
173        self.kernel.trader_id()
174    }
175
176    /// Returns the machine ID for this engine.
177    #[must_use]
178    pub fn machine_id(&self) -> &str {
179        self.kernel.machine_id()
180    }
181
182    /// Returns the unique instance ID for this engine.
183    #[must_use]
184    pub fn instance_id(&self) -> UUID4 {
185        self.instance_id
186    }
187
188    /// Returns the current iteration count.
189    #[must_use]
190    pub fn iteration(&self) -> usize {
191        self.iteration
192    }
193
194    /// Returns the last run config ID, if any.
195    #[must_use]
196    pub fn run_config_id(&self) -> Option<&str> {
197        self.run_config_id.as_deref()
198    }
199
200    /// Returns the last run ID, if any.
201    #[must_use]
202    pub const fn run_id(&self) -> Option<UUID4> {
203        self.run_id
204    }
205
206    /// Returns when the last run started, if any.
207    #[must_use]
208    pub const fn run_started(&self) -> Option<UnixNanos> {
209        self.run_started
210    }
211
212    /// Returns when the last run finished, if any.
213    #[must_use]
214    pub const fn run_finished(&self) -> Option<UnixNanos> {
215        self.run_finished
216    }
217
218    /// Returns the last backtest range start, if any.
219    #[must_use]
220    pub const fn backtest_start(&self) -> Option<UnixNanos> {
221        self.backtest_start
222    }
223
224    /// Returns the last backtest range end, if any.
225    #[must_use]
226    pub const fn backtest_end(&self) -> Option<UnixNanos> {
227        self.backtest_end
228    }
229
230    /// Returns the list of registered venue identifiers.
231    #[must_use]
232    pub fn list_venues(&self) -> Vec<Venue> {
233        self.venues.keys().copied().collect()
234    }
235
236    /// # Errors
237    ///
238    /// Returns an error if initializing the simulated exchange for the venue fails.
239    pub fn add_venue(&mut self, config: SimulatedVenueConfig) -> anyhow::Result<()> {
240        // `routing` and `frozen_account` flow to the exec client, so capture
241        // them before the config is consumed by the exchange constructor.
242        let venue = config.venue;
243        let routing = Some(config.routing);
244        let frozen_account = Some(config.frozen_account);
245
246        let exchange =
247            SimulatedExchange::new(config, self.kernel.cache.clone(), self.kernel.clock.clone())?;
248        let exchange = Rc::new(RefCell::new(exchange));
249        self.venues.insert(venue, exchange.clone());
250
251        let account_id = AccountId::from(format!("{venue}-001").as_str());
252
253        let exec_client = BacktestExecutionClient::new(
254            self.config.trader_id(),
255            account_id,
256            &exchange,
257            self.kernel.cache.clone(),
258            self.kernel.clock.clone(),
259            routing,
260            frozen_account,
261        );
262
263        exchange
264            .borrow_mut()
265            .register_client(Rc::new(exec_client.clone()));
266
267        self.exec_clients.push(exec_client.clone());
268
269        self.kernel
270            .exec_engine
271            .borrow_mut()
272            .register_client(Box::new(exec_client))?;
273
274        log::info!("Adding exchange {venue} to engine");
275
276        Ok(())
277    }
278
279    /// Sets the settlement price for the specified venue instrument.
280    ///
281    /// # Errors
282    ///
283    /// Returns an error if the venue has not been added to the engine.
284    pub fn set_settlement_price(
285        &mut self,
286        venue: Venue,
287        instrument_id: InstrumentId,
288        price: Price,
289    ) -> anyhow::Result<()> {
290        let exchange = self
291            .venues
292            .get_mut(&venue)
293            .ok_or_else(|| anyhow::anyhow!("Unknown venue {venue}"))?;
294        exchange
295            .borrow_mut()
296            .set_settlement_price(instrument_id, price);
297        Ok(())
298    }
299
300    /// Changes the fill model for the specified venue.
301    pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
302        if let Some(exchange) = self.venues.get_mut(&venue) {
303            exchange.borrow_mut().set_fill_model(fill_model);
304        } else {
305            log::warn!(
306                "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
307            );
308        }
309    }
310
311    /// Adds an instrument to the backtest engine for the specified venue.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if:
316    /// - The instrument's associated venue has not been added via `add_venue`.
317    /// - Attempting to add a `CurrencyPair` instrument for a single-currency CASH account.
318    ///
319    pub fn add_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
320        let instrument_id = instrument.id();
321        if let Some(exchange) = self.venues.get_mut(&instrument.id().venue) {
322            if matches!(
323                instrument,
324                InstrumentAny::CurrencyPair(_) | InstrumentAny::TokenizedAsset(_)
325            ) && exchange.borrow().account_type != AccountType::Margin
326                && exchange.borrow().base_currency.is_some()
327            {
328                anyhow::bail!(
329                    "Cannot add a multi-currency spot instrument {instrument_id} for a venue with a single-currency CASH account"
330                )
331            }
332            exchange.borrow_mut().add_instrument(instrument.clone())?;
333        } else {
334            anyhow::bail!(
335                "Cannot add an `Instrument` object without first adding its associated venue {}",
336                instrument.id().venue
337            )
338        }
339
340        self.add_market_data_client_if_not_exists(instrument.id().venue);
341
342        self.kernel
343            .data_engine
344            .borrow_mut()
345            .process(instrument as &dyn Any);
346        log::info!(
347            "Added instrument {} to exchange {}",
348            instrument_id,
349            instrument_id.venue
350        );
351        Ok(())
352    }
353
354    /// Adds market data to the engine for replay during the backtest run.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if:
359    /// - `data` is empty.
360    /// - `validate` is `true` and the instrument for the first element has not been
361    ///   added to the cache via [`add_instrument`](Self::add_instrument).
362    /// - `validate` is `true` and the first element is a [`Data::Bar`] whose
363    ///   `aggregation_source` is not [`AggregationSource::External`].
364    pub fn add_data(
365        &mut self,
366        data: Vec<Data>,
367        _client_id: Option<ClientId>,
368        validate: bool,
369        sort: bool,
370    ) -> anyhow::Result<()> {
371        anyhow::ensure!(!data.is_empty(), "data was empty");
372
373        let count = data.len();
374        let mut to_add = data;
375
376        if sort {
377            to_add.sort_by_key(HasTsInit::ts_init);
378        }
379
380        if validate {
381            // Mirror Cython: validate against the first element only and assume the
382            // batch is homogeneous (documented contract on add_data).
383            let first = &to_add[0];
384            let first_instrument_id = first.instrument_id();
385            anyhow::ensure!(
386                self.kernel
387                    .cache
388                    .borrow()
389                    .instrument(&first_instrument_id)
390                    .is_some(),
391                "Instrument {first_instrument_id} for the given data not found in the cache. \
392                 Add the instrument through `add_instrument()` prior to adding related data."
393            );
394
395            if let Data::Bar(bar) = first {
396                anyhow::ensure!(
397                    bar.bar_type.aggregation_source() == AggregationSource::External,
398                    "bar_type.aggregation_source must be External, was {:?}",
399                    bar.bar_type.aggregation_source(),
400                );
401            }
402        }
403
404        // Track has_data / has_book_data unconditionally so the depth-vs-data
405        // run-time check still fires for callers that pass validate=false
406        // (e.g. node.rs run_oneshot loading from a catalog). Time bounds are
407        // also tracked here so start/end defaults are correct even when the
408        // batch was added with sort=false.
409        let mut batch_min_ts: Option<UnixNanos> = None;
410        let mut batch_max_ts: Option<UnixNanos> = None;
411
412        for item in &to_add {
413            let instr_id = item.instrument_id();
414            self.has_data.insert(instr_id);
415
416            if item.is_order_book_data() {
417                self.has_book_data.insert(instr_id);
418            }
419
420            self.add_market_data_client_if_not_exists(instr_id.venue);
421
422            let ts = item.ts_init();
423            batch_min_ts = Some(batch_min_ts.map_or(ts, |cur| cur.min(ts)));
424            batch_max_ts = Some(batch_max_ts.map_or(ts, |cur| cur.max(ts)));
425        }
426
427        if let Some(ts) = batch_min_ts
428            && self.ts_first.is_none_or(|t| ts < t)
429        {
430            self.ts_first = Some(ts);
431        }
432
433        if let Some(ts) = batch_max_ts
434            && self.ts_last_data.is_none_or(|t| ts > t)
435        {
436            self.ts_last_data = Some(ts);
437        }
438
439        self.data_len += count;
440        let stream_name = format!("backtest_data_{}", self.data_stream_counter);
441        self.data_stream_counter += 1;
442        self.data_iterator.add_data(&stream_name, to_add, true);
443
444        self.sorted = sort;
445
446        log::info!(
447            "Added {count} data element{} to BacktestEngine ({} total)",
448            if count == 1 { "" } else { "s" },
449            self.data_len,
450        );
451
452        Ok(())
453    }
454
455    /// Adds a strategy to the backtest engine.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the strategy is already registered or the trader is in an invalid
460    /// state for strategy registration.
461    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
462    where
463        T: Strategy + Component + Debug + 'static,
464    {
465        self.kernel.trader.borrow_mut().add_strategy(strategy)
466    }
467
468    /// Adds the given strategies to the backtest engine. Stops at the first error.
469    ///
470    /// # Errors
471    ///
472    /// Returns an error if any strategy fails to register; preceding strategies remain registered.
473    pub fn add_strategies<T>(&mut self, strategies: Vec<T>) -> anyhow::Result<()>
474    where
475        T: Strategy + Component + Debug + 'static,
476    {
477        for strategy in strategies {
478            self.add_strategy(strategy)?;
479        }
480        Ok(())
481    }
482
483    /// Adds an actor to the backtest engine.
484    ///
485    /// # Errors
486    ///
487    /// Returns an error if the actor is already registered or the trader is in an invalid
488    /// state for actor registration.
489    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
490    where
491        T: DataActor + Component + Debug + 'static,
492    {
493        self.kernel.trader.borrow_mut().add_actor(actor)
494    }
495
496    /// Adds the given actors to the backtest engine. Stops at the first error.
497    ///
498    /// # Errors
499    ///
500    /// Returns an error if any actor fails to register; preceding actors remain registered.
501    pub fn add_actors<T>(&mut self, actors: Vec<T>) -> anyhow::Result<()>
502    where
503        T: DataActor + Component + Debug + 'static,
504    {
505        for actor in actors {
506            self.add_actor(actor)?;
507        }
508        Ok(())
509    }
510
511    /// Adds an execution algorithm to the backtest engine.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the algorithm is already registered or the trader is running.
516    pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
517    where
518        T: ExecutionAlgorithm + Component + Debug + 'static,
519    {
520        self.kernel
521            .trader
522            .borrow_mut()
523            .add_exec_algorithm(exec_algorithm)
524    }
525
526    /// Adds the given execution algorithms to the backtest engine. Stops at the first error.
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if any execution algorithm fails to register; preceding algorithms remain
531    /// registered.
532    pub fn add_exec_algorithms<T>(&mut self, exec_algorithms: Vec<T>) -> anyhow::Result<()>
533    where
534        T: ExecutionAlgorithm + Component + Debug + 'static,
535    {
536        for exec_algorithm in exec_algorithms {
537            self.add_exec_algorithm(exec_algorithm)?;
538        }
539        Ok(())
540    }
541
542    /// Run a backtest.
543    ///
544    /// Processes all data chronologically. When `streaming` is false (default),
545    /// finalizes the run via [`end`](Self::end). When `streaming` is true, the
546    /// run pauses without finalizing so additional data batches can be loaded.
547    /// Timer advancement stops at data exhaustion to avoid producing synthetic
548    /// events (e.g. zero-volume bars) past the current batch.
549    ///
550    /// Streaming workflow:
551    /// 1. Add initial data and strategies
552    /// 2. Loop: call `run(streaming=true)`, `clear_data()`, `add_data(next_batch)`
553    /// 3. After all batches: call `end()` to finalize
554    ///
555    /// # Errors
556    ///
557    /// Returns an error if the backtest encounters an unrecoverable state.
558    pub fn run(
559        &mut self,
560        start: Option<UnixNanos>,
561        end: Option<UnixNanos>,
562        run_config_id: Option<String>,
563        streaming: bool,
564    ) -> anyhow::Result<()> {
565        self.run_impl(start, end, run_config_id, streaming)?;
566
567        // Finalize on non-streaming runs, or when a shutdown was triggered
568        // at any point during the run (including the trailing settle, module,
569        // and flush callbacks that execute after the main data loop) so the
570        // trader and engines actually stop.
571        if !streaming || self.force_stop || self.kernel.is_shutdown_requested() {
572            self.end();
573        }
574
575        Ok(())
576    }
577
578    fn run_impl(
579        &mut self,
580        start: Option<UnixNanos>,
581        end: Option<UnixNanos>,
582        run_config_id: Option<String>,
583        streaming: bool,
584    ) -> anyhow::Result<()> {
585        anyhow::ensure!(
586            self.sorted,
587            "Data has been added but not sorted, call `engine.sort_data()` or use \
588             `engine.add_data(..., sort=true)` before running"
589        );
590
591        for exchange in self.venues.values() {
592            let exchange = exchange.borrow();
593            let book_type_has_depth = exchange.book_type() as u8 > BookType::L1_MBP as u8;
594            if !book_type_has_depth {
595                continue;
596            }
597
598            for instrument_id in exchange.instrument_ids() {
599                let has_data = self.has_data.contains(instrument_id);
600                let missing_book_data = !self.has_book_data.contains(instrument_id);
601                if has_data && missing_book_data {
602                    anyhow::bail!(
603                        "No order book data found for instrument '{instrument_id}' when `book_type` \
604                         is '{:?}'. Set the venue `book_type` to 'L1_MBP' (for top-of-book data \
605                         like quotes, trades, and bars) or provide order book data for this \
606                         instrument.",
607                        exchange.book_type()
608                    );
609                }
610            }
611        }
612
613        // Determine time boundaries
614        let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
615        let end_ns = end.unwrap_or_else(|| {
616            self.ts_last_data
617                .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
618        });
619        anyhow::ensure!(start_ns <= end_ns, "start was > end");
620        self.end_ns = end_ns;
621        self.last_ns = start_ns;
622        self.last_module_ns = None;
623
624        // Set all component clocks to start
625        let clocks = self.collect_all_clocks();
626        Self::set_all_clocks_time(&clocks, start_ns);
627
628        // First-iteration initialization
629        if self.iteration == 0 {
630            self.run_config_id = run_config_id;
631            self.run_id = Some(UUID4::new());
632            self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
633            self.backtest_start = Some(start_ns);
634
635            for exchange in self.venues.values() {
636                let mut ex = exchange.borrow_mut();
637                ex.initialize_account();
638                ex.load_open_orders();
639            }
640
641            // Re-set clocks after account init
642            Self::set_all_clocks_time(&clocks, start_ns);
643
644            // Reset force stop flag
645            self.force_stop = false;
646            self.kernel.reset_shutdown_flag();
647
648            // Initialize sync command senders (once per thread)
649            Self::init_command_senders();
650
651            // Set logging to static clock mode for deterministic timestamps
652            logging_clock_set_static_mode();
653            logging_clock_set_static_time(start_ns.as_u64());
654
655            // Start kernel (engines + trader init + clients)
656            self.kernel.start();
657            self.kernel.start_trader();
658
659            self.log_pre_run();
660        }
661
662        self.log_run();
663
664        // Skip data before start_ns
665        let mut data = self.data_iterator.next();
666        while let Some(ref d) = data {
667            if d.ts_init() >= start_ns {
668                break;
669            }
670            data = self.data_iterator.next();
671        }
672
673        // Initialize last_ns before first data point
674        if let Some(ref d) = data {
675            let ts = d.ts_init();
676            self.last_ns = if ts.as_u64() > 0 {
677                UnixNanos::from(ts.as_u64() - 1)
678            } else {
679                UnixNanos::default()
680            };
681        } else {
682            self.last_ns = start_ns;
683        }
684
685        loop {
686            if self.kernel.is_shutdown_requested() {
687                log::info!("Shutdown requested via ShutdownSystem, ending backtest");
688                self.force_stop = true;
689            }
690
691            if self.force_stop {
692                log::error!("Force stop triggered, ending backtest");
693                break;
694            }
695
696            if data.is_none() {
697                if streaming {
698                    // In streaming mode, don't advance timers past the
699                    // current batch. The next batch will provide more data
700                    // and timers will fire naturally as time advances.
701                    break;
702                }
703                let done = self.process_next_timer(&clocks);
704                data = self.data_iterator.next();
705                if data.is_none() && done {
706                    break;
707                }
708                continue;
709            }
710
711            let d = data.as_ref().unwrap();
712            let ts_init = d.ts_init();
713
714            if ts_init > end_ns {
715                break;
716            }
717
718            if ts_init > self.last_ns {
719                self.last_ns = ts_init;
720                self.advance_time_impl(ts_init, &clocks);
721            }
722
723            // Route data to exchange
724            self.route_data_to_exchange(d);
725
726            // Process through data engine (may trigger strategy callbacks
727            // which queue trading commands via the sync senders)
728            self.kernel.data_engine.borrow_mut().process_data(d.clone());
729
730            // Drain deferred commands, then process exchange queues
731            self.drain_command_queues();
732            self.settle_venues(ts_init);
733
734            let prev_last_ns = self.last_ns;
735            data = self.data_iterator.next();
736
737            // If timestamp changed (or exhausted), flush timers then run modules
738            if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
739                self.flush_accumulator_events(&clocks, prev_last_ns);
740                self.run_venue_modules(prev_last_ns);
741            }
742
743            self.iteration += 1;
744        }
745
746        // Process remaining exchange messages
747        let ts_now = self.kernel.clock.borrow().timestamp_ns();
748        self.settle_venues(ts_now);
749        self.run_venue_modules(ts_now);
750
751        // Flush remaining timer events. In streaming mode only flush to the
752        // last data timestamp to avoid advancing timers past the current batch.
753        // The final flush to end_ns happens in end() or a non-streaming run.
754        if streaming {
755            self.flush_accumulator_events(&clocks, self.last_ns);
756        } else {
757            self.flush_accumulator_events(&clocks, end_ns);
758        }
759
760        Ok(())
761    }
762
763    /// Manually end the backtest.
764    pub fn end(&mut self) {
765        // Flush remaining timer events to the backtest end boundary so that
766        // tail alerts/expiries scheduled after the last data point still fire.
767        // Must run before stopping engines since DataEngine::stop() cancels
768        // bar aggregator timers. When a shutdown was requested, cap the flush
769        // at the last processed timestamp so timers scheduled past the stop
770        // point do not fire extra callbacks after the graceful stop request.
771        if self.end_ns.as_u64() > 0 {
772            let clocks = self.collect_all_clocks();
773            let flush_ts = if self.force_stop || self.kernel.is_shutdown_requested() {
774                self.last_ns
775            } else {
776                self.end_ns
777            };
778
779            self.flush_accumulator_events(&clocks, flush_ts);
780        }
781
782        // Stop trader
783        self.kernel.stop_trader();
784
785        // Stop engines
786        self.kernel.data_engine.borrow_mut().stop();
787        self.kernel.risk_engine.borrow_mut().stop();
788        self.kernel.exec_engine.borrow_mut().stop();
789
790        // Process remaining exchange messages
791        let ts_now = self.kernel.clock.borrow().timestamp_ns();
792        self.settle_venues(ts_now);
793        self.run_venue_modules(ts_now);
794
795        self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
796        self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
797
798        // Switch logging back to realtime mode
799        logging_clock_set_realtime_mode();
800
801        self.log_post_run();
802    }
803
804    /// Reset the backtest engine.
805    ///
806    /// All stateful fields are reset to their initial value. Data and instruments
807    /// persist across resets to enable repeated runs with different strategies.
808    pub fn reset(&mut self) {
809        log::debug!("Resetting");
810
811        if self.kernel.trader.borrow().is_running() {
812            self.end();
813        }
814
815        // Stop and reset engines
816        self.kernel.data_engine.borrow_mut().stop();
817        self.kernel.data_engine.borrow_mut().reset();
818
819        self.kernel.exec_engine.borrow_mut().stop();
820        self.kernel.exec_engine.borrow_mut().reset();
821
822        self.kernel.risk_engine.borrow_mut().stop();
823        self.kernel.risk_engine.borrow_mut().reset();
824
825        // Reset trader
826        if let Err(e) = self.kernel.trader.borrow_mut().reset() {
827            log::error!("Error resetting trader: {e:?}");
828        }
829
830        // `exchange.reset()` re-emits a fresh account state event; the cache
831        // reset that follows drops it so the next run starts with the same
832        // event count as the first.
833        for exchange in self.venues.values() {
834            exchange.borrow_mut().reset();
835        }
836        self.kernel.cache.borrow_mut().reset();
837        self.kernel.portfolio.borrow_mut().reset();
838
839        // Clear run state
840        self.run_config_id = None;
841        self.run_id = None;
842        self.run_started = None;
843        self.run_finished = None;
844        self.backtest_start = None;
845        self.backtest_end = None;
846        self.iteration = 0;
847        self.force_stop = false;
848        self.last_ns = UnixNanos::default();
849        self.last_module_ns = None;
850        self.end_ns = UnixNanos::default();
851
852        self.accumulator.clear();
853
854        // Reset all iterator cursors to beginning (data persists)
855        self.data_iterator.reset_all_cursors();
856
857        log::info!("Reset");
858    }
859
860    /// Sort the engine's internal data stream by timestamp.
861    ///
862    /// Useful when data has been added with `sort=false` for batch performance,
863    /// then sorted once before running.
864    pub fn sort_data(&mut self) {
865        // Each `add_data` call creates its own stream; the iterator merges streams
866        // by `ts_init` across streams but does not re-sort within a stream. Mark
867        // the engine as sorted so `run` no longer rejects it.
868        self.sorted = true;
869        log::info!("Data sort requested (iterator merges streams by ts_init)");
870    }
871
872    /// Clear the engine's internal data stream. Does not clear instruments.
873    pub fn clear_data(&mut self) {
874        self.has_data.clear();
875        self.has_book_data.clear();
876        self.data_iterator = BacktestDataIterator::new();
877        self.data_len = 0;
878        self.data_stream_counter = 0;
879        self.ts_first = None;
880        self.ts_last_data = None;
881        self.sorted = true;
882    }
883
884    /// Clear all actors from the engine's internal trader.
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if any actor fails to dispose.
889    pub fn clear_actors(&mut self) -> anyhow::Result<()> {
890        self.kernel.trader.borrow_mut().clear_actors()
891    }
892
893    /// Clear all trading strategies from the engine's internal trader.
894    ///
895    /// # Errors
896    ///
897    /// Returns an error if any strategy fails to dispose.
898    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
899        self.kernel.trader.borrow_mut().clear_strategies()
900    }
901
902    /// Clear all execution algorithms from the engine's internal trader.
903    ///
904    /// # Errors
905    ///
906    /// Returns an error if any execution algorithm fails to dispose.
907    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
908        self.kernel.trader.borrow_mut().clear_exec_algorithms()
909    }
910
911    /// Dispose of the backtest engine, releasing all resources.
912    pub fn dispose(&mut self) {
913        self.clear_data();
914        self.accumulator.clear();
915        self.kernel.dispose();
916    }
917
918    /// Return the backtest result from the last run.
919    #[must_use]
920    pub fn get_result(&self) -> BacktestResult {
921        let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
922            (Some(start), Some(end)) => {
923                (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
924            }
925            _ => 0.0,
926        };
927
928        let cache = self.kernel.cache.borrow();
929        let orders = cache.orders(None, None, None, None, None);
930        let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
931        let total_orders = orders.len();
932        let positions = cache.positions(None, None, None, None, None);
933        let total_positions = positions.len();
934
935        let analyzer = self.build_analyzer(&cache, &positions);
936        let mut stats_pnls = AHashMap::new();
937
938        for currency in analyzer.currencies() {
939            if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
940                stats_pnls.insert(currency.code.to_string(), pnls);
941            }
942        }
943
944        let stats_returns = analyzer.get_performance_stats_returns();
945        let stats_general = analyzer.get_performance_stats_general();
946
947        BacktestResult {
948            trader_id: self.config.trader_id().to_string(),
949            machine_id: self.kernel.machine_id.clone(),
950            instance_id: self.instance_id,
951            run_config_id: self.run_config_id.clone(),
952            run_id: self.run_id,
953            run_started: self.run_started,
954            run_finished: self.run_finished,
955            backtest_start: self.backtest_start,
956            backtest_end: self.backtest_end,
957            elapsed_time_secs,
958            iterations: self.iteration,
959            total_events,
960            total_orders,
961            total_positions,
962            stats_pnls,
963            stats_returns,
964            stats_general,
965        }
966    }
967
968    fn build_analyzer(&self, cache: &Cache, positions: &[&Position]) -> PortfolioAnalyzer {
969        let mut analyzer = PortfolioAnalyzer::default();
970        let positions_owned: Vec<_> = positions.iter().map(|p| (*p).clone()).collect();
971        let mut snapshot_positions = Vec::new();
972
973        for position in positions {
974            snapshot_positions.extend(cache.position_snapshots(Some(&position.id), None));
975        }
976
977        // Aggregate starting and current balances across all venue accounts
978        for venue in self.venues.keys() {
979            if let Some(account) = cache.account_for_venue(venue) {
980                let account_ref: &dyn Account = match account {
981                    AccountAny::Margin(margin) => margin,
982                    AccountAny::Cash(cash) => cash,
983                    AccountAny::Betting(betting) => betting,
984                };
985
986                for (currency, money) in account_ref.starting_balances() {
987                    analyzer
988                        .account_balances_starting
989                        .entry(currency)
990                        .and_modify(|existing| *existing = *existing + money)
991                        .or_insert(money);
992                }
993
994                for (currency, money) in account_ref.balances_total() {
995                    analyzer
996                        .account_balances
997                        .entry(currency)
998                        .and_modify(|existing| *existing = *existing + money)
999                        .or_insert(money);
1000                }
1001            }
1002        }
1003
1004        analyzer.add_positions(&positions_owned);
1005        analyzer.add_positions(&snapshot_positions);
1006        analyzer
1007    }
1008
1009    fn route_data_to_exchange(&self, data: &Data) {
1010        if matches!(
1011            data,
1012            Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) | Data::Custom(_)
1013        ) {
1014            return;
1015        }
1016
1017        let venue = data.instrument_id().venue;
1018        if let Some(exchange) = self.venues.get(&venue) {
1019            let mut exchange = exchange.borrow_mut();
1020
1021            match data {
1022                Data::Delta(delta) => exchange.process_order_book_delta(*delta),
1023                Data::Deltas(deltas) => exchange.process_order_book_deltas(deltas),
1024                Data::Quote(quote) => exchange.process_quote_tick(quote),
1025                Data::Trade(trade) => exchange.process_trade_tick(trade),
1026                Data::Bar(bar) => exchange.process_bar(*bar),
1027                Data::InstrumentStatus(status) => exchange.process_instrument_status(*status),
1028                Data::InstrumentClose(close) => exchange.process_instrument_close(*close),
1029                Data::Depth10(depth) => exchange.process_order_book_depth10(depth),
1030                Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) | Data::Custom(_) => {
1031                    unreachable!("filtered by early return above")
1032                }
1033            }
1034        } else {
1035            log::warn!("No exchange found for venue {venue}, data not routed");
1036        }
1037    }
1038
1039    fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
1040        // Advance all clocks to ts_now via accumulator
1041        for clock in clocks {
1042            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1043        }
1044
1045        // Process events with ts_event < ts_now
1046        let ts_before = if ts_now.as_u64() > 0 {
1047            UnixNanos::from(ts_now.as_u64() - 1)
1048        } else {
1049            UnixNanos::default()
1050        };
1051
1052        let mut ts_last: Option<UnixNanos> = None;
1053
1054        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
1055            let ts_event = handler.event.ts_event;
1056
1057            // Settle previous timestamp batch before advancing
1058            if let Some(ts) = ts_last
1059                && ts != ts_event
1060            {
1061                self.settle_venues(ts);
1062                self.run_venue_modules(ts);
1063            }
1064
1065            ts_last = Some(ts_event);
1066            Self::set_all_clocks_time(clocks, ts_event);
1067            logging_clock_set_static_time(ts_event.as_u64());
1068
1069            handler.run();
1070            self.drain_command_queues();
1071
1072            // Re-advance clocks to capture chained timers
1073            for clock in clocks {
1074                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1075            }
1076        }
1077
1078        // Settle the last timestamp batch
1079        if let Some(ts) = ts_last {
1080            self.settle_venues(ts);
1081            self.run_venue_modules(ts);
1082        }
1083
1084        Self::set_all_clocks_time(clocks, ts_now);
1085        logging_clock_set_static_time(ts_now.as_u64());
1086    }
1087
1088    fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
1089        for clock in clocks {
1090            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1091        }
1092
1093        let mut ts_last: Option<UnixNanos> = None;
1094
1095        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
1096            let ts_event = handler.event.ts_event;
1097
1098            // Settle previous timestamp batch before advancing
1099            if let Some(ts) = ts_last
1100                && ts != ts_event
1101            {
1102                self.settle_venues(ts);
1103                self.run_venue_modules(ts);
1104            }
1105
1106            ts_last = Some(ts_event);
1107            Self::set_all_clocks_time(clocks, ts_event);
1108            logging_clock_set_static_time(ts_event.as_u64());
1109
1110            handler.run();
1111            self.drain_command_queues();
1112
1113            // Re-advance clocks to capture chained timers
1114            for clock in clocks {
1115                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1116            }
1117        }
1118
1119        // Settle the last timestamp batch
1120        if let Some(ts) = ts_last {
1121            self.settle_venues(ts);
1122            self.run_venue_modules(ts);
1123        }
1124    }
1125
1126    fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
1127        self.flush_accumulator_events(clocks, self.last_ns);
1128
1129        // Find minimum next timer time across all component clocks
1130        let mut min_next_time: Option<UnixNanos> = None;
1131
1132        for clock in clocks {
1133            let clock_ref = clock.borrow();
1134            for name in clock_ref.timer_names() {
1135                if let Some(next_time) = clock_ref.next_time_ns(name)
1136                    && next_time > self.last_ns
1137                {
1138                    min_next_time = Some(match min_next_time {
1139                        Some(current_min) => next_time.min(current_min),
1140                        None => next_time,
1141                    });
1142                }
1143            }
1144        }
1145
1146        match min_next_time {
1147            None => true,
1148            Some(t) if t > self.end_ns => true,
1149            Some(t) => {
1150                self.last_ns = t;
1151                self.flush_accumulator_events(clocks, t);
1152                false
1153            }
1154        }
1155    }
1156
1157    fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
1158        let mut clocks = vec![self.kernel.clock.clone()];
1159        clocks.extend(self.kernel.trader.borrow().get_component_clocks());
1160        clocks
1161    }
1162
1163    fn settle_venues(&self, ts_now: UnixNanos) {
1164        // Advance venue clocks so modules and event generators see the
1165        // correct timestamp even when no commands are pending
1166        for exchange in self.venues.values() {
1167            exchange.borrow().set_clock_time(ts_now);
1168        }
1169
1170        // Drain commands then iterate matching engines to fill newly added
1171        // orders. Fills may enqueue further commands (e.g. hedge orders
1172        // submitted from on_order_filled), so loop until quiescent.
1173        // Only process and iterate venues that had pending commands each
1174        // pass, to avoid extra fill-model rolls on untouched venues.
1175        loop {
1176            let active_venues: Vec<Venue> = self
1177                .venues
1178                .iter()
1179                .filter(|(_, ex)| ex.borrow().has_pending_commands(ts_now))
1180                .map(|(id, _)| *id)
1181                .collect();
1182
1183            if active_venues.is_empty() {
1184                break;
1185            }
1186
1187            for venue_id in &active_venues {
1188                self.venues[venue_id].borrow_mut().process(ts_now);
1189            }
1190            self.drain_command_queues();
1191
1192            for venue_id in &active_venues {
1193                self.venues[venue_id]
1194                    .borrow_mut()
1195                    .iterate_matching_engines(ts_now);
1196            }
1197
1198            // Drain again so fill-triggered commands (e.g. hedge orders
1199            // from on_order_filled) are visible to has_pending_commands
1200            self.drain_command_queues();
1201        }
1202    }
1203
1204    fn run_venue_modules(&mut self, ts_now: UnixNanos) {
1205        if self.last_module_ns == Some(ts_now) {
1206            return;
1207        }
1208        self.last_module_ns = Some(ts_now);
1209
1210        // Pre-settle handler-generated work so modules see final state
1211        self.drain_command_queues();
1212        self.settle_venues(ts_now);
1213
1214        for exchange in self.venues.values() {
1215            exchange.borrow_mut().process_modules(ts_now);
1216        }
1217
1218        // Post-settle any commands emitted by modules
1219        self.drain_command_queues();
1220        self.settle_venues(ts_now);
1221    }
1222
1223    fn drain_exec_client_events(&self) {
1224        for client in &self.exec_clients {
1225            client.drain_queued_events();
1226        }
1227    }
1228
1229    fn drain_command_queues(&self) {
1230        // Drain trading commands, exec client events, and data commands
1231        // in a loop until all queues settle. Handles cascading re-entrancy
1232        // (e.g. strategy submits order from on_order_filled).
1233        loop {
1234            drain_trading_cmd_queue();
1235            drain_data_cmd_queue();
1236            self.drain_exec_client_events();
1237
1238            if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
1239                break;
1240            }
1241        }
1242    }
1243
1244    fn init_command_senders() {
1245        replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
1246        replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
1247    }
1248
1249    fn advance_clock_on_accumulator(
1250        accumulator: &mut TimeEventAccumulator,
1251        clock: &Rc<RefCell<dyn Clock>>,
1252        to_time_ns: UnixNanos,
1253        set_time: bool,
1254    ) {
1255        let mut clock_ref = clock.borrow_mut();
1256        let test_clock = clock_ref
1257            .as_any_mut()
1258            .downcast_mut::<TestClock>()
1259            .expect("BacktestEngine requires TestClock");
1260        accumulator.advance_clock(test_clock, to_time_ns, set_time);
1261    }
1262
1263    fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
1264        for clock in clocks {
1265            let mut clock_ref = clock.borrow_mut();
1266            let test_clock = clock_ref
1267                .as_any_mut()
1268                .downcast_mut::<TestClock>()
1269                .expect("BacktestEngine requires TestClock");
1270            test_clock.set_time(ts);
1271        }
1272    }
1273
1274    #[rustfmt::skip]
1275    fn log_pre_run(&self) {
1276        log_info!("=================================================================", color = LogColor::Cyan);
1277        log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
1278        log_info!("=================================================================", color = LogColor::Cyan);
1279
1280        let cache = self.kernel.cache.borrow();
1281        for exchange in self.venues.values() {
1282            let ex = exchange.borrow();
1283            log_info!("=================================================================", color = LogColor::Cyan);
1284            log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
1285            log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1286
1287            if let Some(account) = cache.account_for_venue(&ex.id) {
1288                log::info!("Balances starting:");
1289                let account_ref: &dyn Account = match account {
1290                    AccountAny::Margin(margin) => margin,
1291                    AccountAny::Cash(cash) => cash,
1292                    AccountAny::Betting(betting) => betting,
1293                };
1294
1295                for balance in account_ref.starting_balances().values() {
1296                    log::info!("  {balance}");
1297                }
1298            }
1299        }
1300
1301        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1302    }
1303
1304    #[rustfmt::skip]
1305    fn log_run(&self) {
1306        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1307        let id = format_optional_uuid(self.run_id.as_ref());
1308        let start = format_optional_nanos(self.backtest_start);
1309
1310        log_info!("=================================================================", color = LogColor::Cyan);
1311        log_info!(" BACKTEST RUN", color = LogColor::Cyan);
1312        log_info!("=================================================================", color = LogColor::Cyan);
1313        log::info!("Run config ID:  {config_id}");
1314        log::info!("Run ID:         {id}");
1315        log::info!("Backtest start: {start}");
1316        log::info!("Data elements:  {}", self.data_len);
1317        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1318    }
1319
1320    #[rustfmt::skip]
1321    fn log_post_run(&self) {
1322        let cache = self.kernel.cache.borrow();
1323        let orders = cache.orders(None, None, None, None, None);
1324        let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
1325        let total_orders = orders.len();
1326        let positions = cache.positions(None, None, None, None, None);
1327        let total_positions = positions.len();
1328
1329        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1330        let id = format_optional_uuid(self.run_id.as_ref());
1331        let started = format_optional_nanos(self.run_started);
1332        let finished = format_optional_nanos(self.run_finished);
1333        let elapsed = format_optional_duration(self.run_started, self.run_finished);
1334        let bt_start = format_optional_nanos(self.backtest_start);
1335        let bt_end = format_optional_nanos(self.backtest_end);
1336        let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
1337        let iterations = self.iteration.separate_with_underscores();
1338        let events = total_events.separate_with_underscores();
1339        let num_orders = total_orders.separate_with_underscores();
1340        let num_positions = total_positions.separate_with_underscores();
1341
1342        log_info!("=================================================================", color = LogColor::Cyan);
1343        log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
1344        log_info!("=================================================================", color = LogColor::Cyan);
1345        log::info!("Run config ID:  {config_id}");
1346        log::info!("Run ID:         {id}");
1347        log::info!("Run started:    {started}");
1348        log::info!("Run finished:   {finished}");
1349        log::info!("Elapsed time:   {elapsed}");
1350        log::info!("Backtest start: {bt_start}");
1351        log::info!("Backtest end:   {bt_end}");
1352        log::info!("Backtest range: {bt_range}");
1353        log::info!("Iterations: {iterations}");
1354        log::info!("Total events: {events}");
1355        log::info!("Total orders: {num_orders}");
1356        log::info!("Total positions: {num_positions}");
1357
1358        if !self.config.run_analysis {
1359            return;
1360        }
1361
1362        let analyzer = self.build_analyzer(&cache, &positions);
1363        log_portfolio_performance(&analyzer);
1364    }
1365
1366    /// Registers a data client for the given `client_id` if one does not already exist.
1367    pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1368        if self
1369            .kernel
1370            .data_engine
1371            .borrow()
1372            .registered_clients()
1373            .contains(&client_id)
1374        {
1375            return;
1376        }
1377
1378        let venue = Venue::from(client_id.as_str());
1379        let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1380        let data_client_adapter = DataClientAdapter::new(
1381            backtest_client.client_id,
1382            None,
1383            false,
1384            false,
1385            Box::new(backtest_client),
1386        );
1387
1388        self.kernel
1389            .data_engine
1390            .borrow_mut()
1391            .register_client(data_client_adapter, None);
1392    }
1393
1394    /// Registers a market data client for the given `venue` if one does not already exist.
1395    pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1396        let client_id = ClientId::from(venue.as_str());
1397
1398        if !self
1399            .kernel
1400            .data_engine
1401            .borrow()
1402            .registered_clients()
1403            .contains(&client_id)
1404        {
1405            let backtest_client =
1406                BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1407            let data_client_adapter = DataClientAdapter::new(
1408                client_id,
1409                Some(venue),
1410                false,
1411                false,
1412                Box::new(backtest_client),
1413            );
1414            self.kernel
1415                .data_engine
1416                .borrow_mut()
1417                .register_client(data_client_adapter, Some(venue));
1418        }
1419    }
1420}
1421
1422fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1423    nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1424}
1425
1426fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1427    uuid.map_or("None".to_string(), |id| id.to_string())
1428}
1429
1430fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1431    match (start, end) {
1432        (Some(s), Some(e)) => {
1433            let delta = e.to_datetime_utc() - s.to_datetime_utc();
1434            let days = delta.num_days().abs();
1435            let hours = delta.num_hours().abs() % 24;
1436            let minutes = delta.num_minutes().abs() % 60;
1437            let seconds = delta.num_seconds().abs() % 60;
1438            let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1439            format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1440        }
1441        _ => "None".to_string(),
1442    }
1443}
1444
1445#[rustfmt::skip]
1446fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
1447    log_info!("=================================================================", color = LogColor::Cyan);
1448    log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
1449    log_info!("=================================================================", color = LogColor::Cyan);
1450
1451    for currency in analyzer.currencies() {
1452        log::info!(" PnL Statistics ({})", currency.code);
1453        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1454
1455        if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
1456            for line in &pnl_lines {
1457                log::info!("{line}");
1458            }
1459        }
1460
1461        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1462    }
1463
1464    log::info!(" Returns Statistics");
1465    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1466
1467    for line in &analyzer.get_stats_returns_formatted() {
1468        log::info!("{line}");
1469    }
1470    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1471
1472    log::info!(" General Statistics");
1473    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1474
1475    for line in &analyzer.get_stats_general_formatted() {
1476        log::info!("{line}");
1477    }
1478    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1479}
1480
1481#[cfg(test)]
1482mod tests {
1483    use nautilus_model::{
1484        data::{Data, InstrumentStatus},
1485        enums::{AccountType, BookType, MarketStatus, MarketStatusAction, OmsType},
1486        identifiers::Venue,
1487        instruments::{
1488            CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1489        },
1490        types::Money,
1491    };
1492    use rstest::*;
1493
1494    use super::*;
1495
1496    fn create_engine() -> BacktestEngine {
1497        let mut engine = BacktestEngine::new(BacktestEngineConfig::default()).unwrap();
1498        let venue_config = SimulatedVenueConfig::builder()
1499            .venue(Venue::from("BINANCE"))
1500            .oms_type(OmsType::Netting)
1501            .account_type(AccountType::Margin)
1502            .book_type(BookType::L1_MBP)
1503            .starting_balances(vec![Money::from("1_000_000 USDT")])
1504            .build();
1505        engine.add_venue(venue_config).unwrap();
1506        engine
1507    }
1508
1509    #[rstest]
1510    #[case(None)]
1511    #[case(Some(true))]
1512    #[case(Some(false))]
1513    fn test_new_forces_drop_instruments_on_reset_false(
1514        crypto_perpetual_ethusdt: CryptoPerpetual,
1515        #[case] user_value: Option<bool>,
1516    ) {
1517        use nautilus_common::cache::CacheConfig;
1518
1519        let config = match user_value {
1520            None => BacktestEngineConfig::builder().build(),
1521            Some(value) => BacktestEngineConfig::builder()
1522                .cache(
1523                    CacheConfig::builder()
1524                        .drop_instruments_on_reset(value)
1525                        .build(),
1526                )
1527                .build(),
1528        };
1529        let mut engine = BacktestEngine::new(config).unwrap();
1530
1531        let venue_config = SimulatedVenueConfig::builder()
1532            .venue(Venue::from("BINANCE"))
1533            .oms_type(OmsType::Netting)
1534            .account_type(AccountType::Margin)
1535            .book_type(BookType::L1_MBP)
1536            .starting_balances(vec![Money::from("1_000_000 USDT")])
1537            .build();
1538        engine.add_venue(venue_config).unwrap();
1539
1540        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1541        let instrument_id = instrument.id();
1542        engine.add_instrument(&instrument).unwrap();
1543
1544        engine.reset();
1545
1546        assert!(
1547            engine
1548                .kernel()
1549                .cache
1550                .borrow()
1551                .instrument(&instrument_id)
1552                .is_some(),
1553            "instrument must survive engine.reset(); user-supplied \
1554             drop_instruments_on_reset={user_value:?} must not leak through",
1555        );
1556    }
1557
1558    #[rstest]
1559    fn test_route_data_to_exchange_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1560        let mut engine = create_engine();
1561        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1562        let instrument_id = instrument.id();
1563        engine.add_instrument(&instrument).unwrap();
1564
1565        let status = InstrumentStatus::new(
1566            instrument_id,
1567            MarketStatusAction::Close,
1568            UnixNanos::from(1),
1569            UnixNanos::from(1),
1570            None,
1571            None,
1572            None,
1573            None,
1574            None,
1575        );
1576
1577        engine.route_data_to_exchange(&Data::InstrumentStatus(status));
1578
1579        let exchange = engine.venues.get(&instrument_id.venue).unwrap().borrow();
1580        let market_status = exchange
1581            .get_matching_engine(&instrument_id)
1582            .unwrap()
1583            .market_status;
1584        assert_eq!(market_status, MarketStatus::Closed);
1585    }
1586}