Skip to main content

nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18use std::{
19    cell::RefCell,
20    collections::{BinaryHeap, VecDeque},
21    fmt::Debug,
22    rc::Rc,
23};
24
25use ahash::AHashMap;
26use indexmap::IndexMap;
27use nautilus_common::{
28    cache::Cache,
29    clients::ExecutionClient,
30    clock::{Clock, TestClock},
31    messages::execution::TradingCommand,
32};
33use nautilus_core::{
34    UnixNanos,
35    correctness::{CorrectnessResultExt, FAILED, check_equal},
36};
37use nautilus_execution::{
38    matching_core::OrderMatchInfo,
39    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
40    models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel},
41};
42use nautilus_model::{
43    accounts::{AccountAny, margin_model::MarginModelAny},
44    data::{
45        Bar, Data, InstrumentClose, InstrumentStatus, OrderBookDelta, OrderBookDeltas,
46        OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
47    },
48    enums::{AccountType, AggressorSide, BookType, OmsType},
49    identifiers::{AccountId, InstrumentId, Venue},
50    instruments::{Instrument, InstrumentAny},
51    orderbook::OrderBook,
52    orders::{Order, OrderAny},
53    types::{AccountBalance, Currency, Money, Price},
54};
55use rust_decimal::Decimal;
56
57use crate::{
58    config::SimulatedVenueConfig,
59    modules::{ExchangeContext, SimulationModule},
60};
61
62/// Represents commands with simulated network latency in a min-heap priority queue.
63/// The commands are ordered by timestamp for FIFO processing, with the
64/// earliest timestamp having the highest priority in the queue.
65#[derive(Debug, Eq, PartialEq)]
66struct InflightCommand {
67    timestamp: UnixNanos,
68    counter: u32,
69    command: TradingCommand,
70}
71
72impl InflightCommand {
73    const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
74        Self {
75            timestamp,
76            counter,
77            command,
78        }
79    }
80}
81
82impl Ord for InflightCommand {
83    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
84        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
85        other
86            .timestamp
87            .cmp(&self.timestamp)
88            .then_with(|| other.counter.cmp(&self.counter))
89    }
90}
91
92impl PartialOrd for InflightCommand {
93    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
94        Some(self.cmp(other))
95    }
96}
97
98/// Simulated exchange venue for realistic trading execution during backtesting.
99///
100/// The `SimulatedExchange` provides a simulation of a trading venue,
101/// including order matching engines, account management, and realistic execution
102/// models. It maintains order books, processes market data, and executes trades
103/// with configurable latency and fill models to accurately simulate real market
104/// conditions during backtesting.
105///
106/// Key features:
107/// - Multi-instrument order matching with realistic execution
108/// - Configurable fee, fill, and latency models
109/// - Support for various order types and execution options
110/// - Account balance and position management
111/// - Market data processing and order book maintenance
112/// - Simulation modules for custom venue behaviors
113pub struct SimulatedExchange {
114    /// The venue identifier.
115    pub id: Venue,
116    /// The order management system type.
117    pub oms_type: OmsType,
118    /// The account type for the venue.
119    pub account_type: AccountType,
120    /// The optional base currency for single-currency accounts.
121    pub base_currency: Option<Currency>,
122    starting_balances: Vec<Money>,
123    book_type: BookType,
124    default_leverage: Decimal,
125    exec_client: Option<Rc<dyn ExecutionClient>>,
126    fee_model: FeeModelAny,
127    fill_model: FillModelAny,
128    latency_model: Option<Box<dyn LatencyModel>>,
129    instruments: AHashMap<InstrumentId, InstrumentAny>,
130    matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
131    settlement_prices: AHashMap<InstrumentId, Price>,
132    leverages: AHashMap<InstrumentId, Decimal>,
133    margin_model: Option<MarginModelAny>,
134    modules: Vec<Box<dyn SimulationModule>>,
135    clock: Rc<RefCell<dyn Clock>>,
136    cache: Rc<RefCell<Cache>>,
137    message_queue: VecDeque<TradingCommand>,
138    inflight_queue: BinaryHeap<InflightCommand>,
139    inflight_counter: AHashMap<UnixNanos, u32>,
140    bar_execution: bool,
141    bar_adaptive_high_low_ordering: bool,
142    trade_execution: bool,
143    liquidity_consumption: bool,
144    reject_stop_orders: bool,
145    support_gtd_orders: bool,
146    support_contingent_orders: bool,
147    use_position_ids: bool,
148    use_random_ids: bool,
149    use_reduce_only: bool,
150    use_message_queue: bool,
151    use_market_order_acks: bool,
152    _allow_cash_borrowing: bool,
153    frozen_account: bool,
154    queue_position: bool,
155    oto_full_trigger: bool,
156    price_protection_points: u32,
157}
158
159impl Debug for SimulatedExchange {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        f.debug_struct(stringify!(SimulatedExchange))
162            .field("id", &self.id)
163            .field("account_type", &self.account_type)
164            .finish()
165    }
166}
167
168impl SimulatedExchange {
169    /// Creates a new [`SimulatedExchange`] instance from a venue configuration.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if:
174    /// - `starting_balances` is empty.
175    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
176    pub fn new(
177        config: SimulatedVenueConfig,
178        cache: Rc<RefCell<Cache>>,
179        clock: Rc<RefCell<dyn Clock>>,
180    ) -> anyhow::Result<Self> {
181        if config.starting_balances.is_empty() {
182            anyhow::bail!("Starting balances must be provided")
183        }
184
185        if config.base_currency.is_some() && config.starting_balances.len() > 1 {
186            anyhow::bail!("single-currency account has multiple starting currencies")
187        }
188
189        let default_leverage = config.default_leverage.unwrap_or_else(|| {
190            if config.account_type == AccountType::Margin {
191                Decimal::from(10)
192            } else {
193                Decimal::from(1)
194            }
195        });
196
197        Ok(Self {
198            id: config.venue,
199            oms_type: config.oms_type,
200            account_type: config.account_type,
201            base_currency: config.base_currency,
202            starting_balances: config.starting_balances,
203            book_type: config.book_type,
204            default_leverage,
205            exec_client: None,
206            fee_model: config.fee_model,
207            fill_model: config.fill_model,
208            latency_model: config.latency_model,
209            instruments: AHashMap::new(),
210            matching_engines: AHashMap::new(),
211            settlement_prices: AHashMap::new(),
212            leverages: config.leverages,
213            margin_model: config.margin_model,
214            modules: config.modules,
215            clock,
216            cache,
217            message_queue: VecDeque::new(),
218            inflight_queue: BinaryHeap::new(),
219            inflight_counter: AHashMap::new(),
220            bar_execution: config.bar_execution,
221            bar_adaptive_high_low_ordering: config.bar_adaptive_high_low_ordering,
222            trade_execution: config.trade_execution,
223            liquidity_consumption: config.liquidity_consumption,
224            reject_stop_orders: config.reject_stop_orders,
225            support_gtd_orders: config.support_gtd_orders,
226            support_contingent_orders: config.support_contingent_orders,
227            use_position_ids: config.use_position_ids,
228            use_random_ids: config.use_random_ids,
229            use_reduce_only: config.use_reduce_only,
230            use_message_queue: config.use_message_queue,
231            use_market_order_acks: config.use_market_order_acks,
232            _allow_cash_borrowing: config.allow_cash_borrowing,
233            frozen_account: config.frozen_account,
234            queue_position: config.queue_position,
235            oto_full_trigger: config.oto_full_trigger,
236            price_protection_points: config.price_protection_points,
237        })
238    }
239
240    /// Registers the execution client for the exchange.
241    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
242        self.exec_client = Some(client);
243    }
244
245    /// Sets the fill model for the exchange.
246    pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
247        for matching_engine in self.matching_engines.values_mut() {
248            matching_engine.set_fill_model(fill_model.clone());
249            log::info!(
250                "Setting fill model for {} to {}",
251                matching_engine.venue,
252                self.fill_model
253            );
254        }
255        self.fill_model = fill_model;
256    }
257
258    /// Sets the latency model for the exchange.
259    pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
260        self.latency_model = Some(latency_model);
261    }
262
263    /// Sets the settlement price for the given instrument.
264    pub fn set_settlement_price(&mut self, instrument_id: InstrumentId, price: Price) {
265        self.settlement_prices.insert(instrument_id, price);
266    }
267
268    /// Returns the configured book type for this venue.
269    #[must_use]
270    pub const fn book_type(&self) -> BookType {
271        self.book_type
272    }
273
274    /// Returns an iterator over the instrument IDs registered with this exchange.
275    pub fn instrument_ids(&self) -> impl Iterator<Item = &InstrumentId> {
276        self.instruments.keys()
277    }
278
279    pub fn initialize_account(&mut self) {
280        self.generate_fresh_account_state();
281    }
282
283    /// Loads non-emulated open orders from the cache into matching engines.
284    pub fn load_open_orders(&mut self) {
285        let mut open_orders: Vec<(OrderAny, AccountId)> = {
286            let cache = self.cache.as_ref().borrow();
287            cache
288                .orders_open(Some(&self.id), None, None, None, None)
289                .into_iter()
290                .filter(|order| !order.is_emulated())
291                .filter_map(|order| {
292                    order
293                        .account_id()
294                        .map(|account_id| (order.clone(), account_id))
295                })
296                .collect()
297        };
298
299        // Sort for deterministic insertion order
300        open_orders.sort_by(|(a, _), (b, _)| {
301            a.ts_init()
302                .cmp(&b.ts_init())
303                .then_with(|| a.client_order_id().cmp(&b.client_order_id()))
304        });
305
306        for (mut order, account_id) in open_orders {
307            let instrument_id = order.instrument_id();
308            if let Some(matching_engine) = self.matching_engines.get_mut(&instrument_id) {
309                matching_engine.process_order(&mut order, account_id);
310            } else {
311                log::error!(
312                    "No matching engine for {instrument_id} to load open order {}",
313                    order.client_order_id()
314                );
315            }
316        }
317    }
318
319    // panics-doc-ok (transitive via expect_display on venue mismatch)
320    /// Adds an instrument to the simulated exchange and initializes its matching engine.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
325    ///
326    /// # Panics
327    ///
328    /// Panics if the instrument cannot be added to the exchange.
329    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
330        check_equal(
331            &instrument.id().venue,
332            &self.id,
333            "Venue of instrument id",
334            "Venue of simulated exchange",
335        )
336        .expect_display(FAILED);
337
338        if self.account_type == AccountType::Cash
339            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
340                || matches!(instrument, InstrumentAny::CryptoFuture(_))
341                || matches!(instrument, InstrumentAny::PerpetualContract(_)))
342        {
343            anyhow::bail!("Cash account cannot trade futures or perpetuals")
344        }
345
346        self.instruments.insert(instrument.id(), instrument.clone());
347
348        let price_protection = if self.price_protection_points == 0 {
349            None
350        } else {
351            Some(self.price_protection_points)
352        };
353
354        let matching_engine_config = OrderMatchingEngineConfig::builder()
355            .bar_execution(self.bar_execution)
356            .bar_adaptive_high_low_ordering(self.bar_adaptive_high_low_ordering)
357            .trade_execution(self.trade_execution)
358            .liquidity_consumption(self.liquidity_consumption)
359            .reject_stop_orders(self.reject_stop_orders)
360            .support_gtd_orders(self.support_gtd_orders)
361            .support_contingent_orders(self.support_contingent_orders)
362            .use_position_ids(self.use_position_ids)
363            .use_random_ids(self.use_random_ids)
364            .use_reduce_only(self.use_reduce_only)
365            .use_market_order_acks(self.use_market_order_acks)
366            .queue_position(self.queue_position)
367            .oto_full_trigger(self.oto_full_trigger)
368            .maybe_price_protection_points(price_protection)
369            .build();
370        let instrument_id = instrument.id();
371        let matching_engine = OrderMatchingEngine::new(
372            instrument,
373            self.instruments.len() as u32,
374            self.fill_model.clone(),
375            self.fee_model.clone(),
376            self.book_type,
377            self.oms_type,
378            self.account_type,
379            self.clock.clone(),
380            Rc::clone(&self.cache),
381            matching_engine_config,
382        );
383        self.matching_engines.insert(instrument_id, matching_engine);
384
385        log::info!("Added instrument {instrument_id} and created matching engine");
386        Ok(())
387    }
388
389    /// Returns the best bid price for the given instrument, if available.
390    #[must_use]
391    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
392        self.matching_engines
393            .get(&instrument_id)
394            .and_then(OrderMatchingEngine::best_bid_price)
395    }
396
397    /// Returns the best ask price for the given instrument, if available.
398    #[must_use]
399    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
400        self.matching_engines
401            .get(&instrument_id)
402            .and_then(OrderMatchingEngine::best_ask_price)
403    }
404
405    /// Returns a reference to the order book for the given instrument, if available.
406    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
407        self.matching_engines
408            .get(&instrument_id)
409            .map(OrderMatchingEngine::get_book)
410    }
411
412    /// Returns a reference to the matching engine for the given instrument, if available.
413    #[must_use]
414    pub fn get_matching_engine(
415        &self,
416        instrument_id: &InstrumentId,
417    ) -> Option<&OrderMatchingEngine> {
418        self.matching_engines.get(instrument_id)
419    }
420
421    /// Returns a reference to all matching engines keyed by instrument ID.
422    #[must_use]
423    pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
424        &self.matching_engines
425    }
426
427    /// Returns all order books keyed by instrument ID.
428    #[must_use]
429    pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
430        let mut books = AHashMap::new();
431        for (instrument_id, matching_engine) in &self.matching_engines {
432            books.insert(*instrument_id, matching_engine.get_book().clone());
433        }
434        books
435    }
436
437    /// Returns all open orders, optionally filtered by instrument ID.
438    #[must_use]
439    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
440        instrument_id
441            .and_then(|id| {
442                self.matching_engines
443                    .get(&id)
444                    .map(OrderMatchingEngine::get_open_orders)
445            })
446            .unwrap_or_else(|| {
447                self.matching_engines
448                    .values()
449                    .flat_map(OrderMatchingEngine::get_open_orders)
450                    .collect()
451            })
452    }
453
454    /// Returns all open bid orders, optionally filtered by instrument ID.
455    #[must_use]
456    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
457        instrument_id
458            .and_then(|id| {
459                self.matching_engines
460                    .get(&id)
461                    .map(|engine| engine.get_open_bid_orders().to_vec())
462            })
463            .unwrap_or_else(|| {
464                self.matching_engines
465                    .values()
466                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
467                    .collect()
468            })
469    }
470
471    /// Returns all open ask orders, optionally filtered by instrument ID.
472    #[must_use]
473    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
474        instrument_id
475            .and_then(|id| {
476                self.matching_engines
477                    .get(&id)
478                    .map(|engine| engine.get_open_ask_orders().to_vec())
479            })
480            .unwrap_or_else(|| {
481                self.matching_engines
482                    .values()
483                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
484                    .collect()
485            })
486    }
487
488    /// Returns the account for this exchange, if an execution client is registered.
489    #[must_use]
490    pub fn get_account(&self) -> Option<AccountAny> {
491        self.exec_client
492            .as_ref()
493            .and_then(|client| client.get_account())
494    }
495
496    /// Returns a reference to the cache.
497    #[must_use]
498    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
499        &self.cache
500    }
501
502    /// Adjusts the account balance by the given amount.
503    ///
504    /// # Panics
505    ///
506    /// Panics if generating account state fails during adjustment.
507    pub fn adjust_account(&mut self, adjustment: Money) {
508        if self.frozen_account {
509            // Nothing to adjust
510            return;
511        }
512
513        if let Some(exec_client) = &self.exec_client {
514            let venue = exec_client.venue();
515            log::debug!("Adjusting account for venue {venue}");
516            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
517                match account.balance(Some(adjustment.currency)) {
518                    Some(balance) => {
519                        let mut current_balance = *balance;
520                        current_balance.total = current_balance.total + adjustment;
521                        current_balance.free = current_balance.free + adjustment;
522
523                        let margins = match account {
524                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
525                            _ => IndexMap::new(),
526                        };
527
528                        if let Some(exec_client) = &self.exec_client {
529                            exec_client
530                                .generate_account_state(
531                                    vec![current_balance],
532                                    margins.values().copied().collect(),
533                                    true,
534                                    self.clock.borrow().timestamp_ns(),
535                                )
536                                .unwrap();
537                        }
538                    }
539                    None => {
540                        log::error!(
541                            "Cannot adjust account: no balance for currency {}",
542                            adjustment.currency
543                        );
544                    }
545                }
546            } else {
547                log::error!("Cannot adjust account: no account for venue {venue}");
548            }
549        }
550    }
551
552    /// Returns whether there are pending commands at or before `ts_now`.
553    #[must_use]
554    pub fn has_pending_commands(&self, ts_now: UnixNanos) -> bool {
555        if !self.message_queue.is_empty() {
556            return true;
557        }
558        self.inflight_queue
559            .peek()
560            .is_some_and(|inflight| inflight.timestamp <= ts_now)
561    }
562
563    /// Iterates all matching engines so newly submitted orders can match
564    /// against the current market state.
565    pub fn iterate_matching_engines(&mut self, ts_now: UnixNanos) {
566        for matching_engine in self.matching_engines.values_mut() {
567            matching_engine.iterate(ts_now, AggressorSide::NoAggressor);
568        }
569    }
570
571    /// Advances the exchange clock to the given timestamp so that any event
572    /// generators (modules, account state) see the correct time even when
573    /// no commands are pending.
574    ///
575    /// # Panics
576    ///
577    /// Panics if the clock is not a [`TestClock`].
578    pub fn set_clock_time(&self, ts_now: UnixNanos) {
579        let mut clock_ref = self.clock.borrow_mut();
580        let test_clock = clock_ref
581            .as_any_mut()
582            .downcast_mut::<TestClock>()
583            .expect("SimulatedExchange requires TestClock");
584        test_clock.set_time(ts_now);
585    }
586
587    /// Sends a trading command to the exchange for processing.
588    pub fn send(&mut self, command: TradingCommand) {
589        if !self.use_message_queue {
590            self.process_trading_command(command);
591        } else if self.latency_model.is_none() {
592            self.message_queue.push_back(command);
593        } else {
594            let (timestamp, counter) = self.generate_inflight_command(&command);
595            self.inflight_queue
596                .push(InflightCommand::new(timestamp, counter, command));
597        }
598    }
599
600    fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
601        if let Some(latency_model) = &self.latency_model {
602            let ts = match command {
603                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
604                    command.ts_init() + latency_model.get_insert_latency()
605                }
606                TradingCommand::ModifyOrder(_) => {
607                    command.ts_init() + latency_model.get_update_latency()
608                }
609                TradingCommand::CancelOrder(_)
610                | TradingCommand::CancelAllOrders(_)
611                | TradingCommand::BatchCancelOrders(_) => {
612                    command.ts_init() + latency_model.get_delete_latency()
613                }
614                _ => panic!("Cannot handle command: {command:?}"),
615            };
616
617            let counter = self
618                .inflight_counter
619                .entry(ts)
620                .and_modify(|e| *e += 1)
621                .or_insert(1);
622
623            (ts, *counter)
624        } else {
625            panic!("Latency model should be initialized");
626        }
627    }
628
629    /// Processes a single order book delta.
630    ///
631    /// # Panics
632    ///
633    /// Panics if adding a missing instrument during delta processing fails.
634    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
635        for module in &self.modules {
636            module.pre_process(&Data::Delta(delta));
637        }
638
639        if !self.matching_engines.contains_key(&delta.instrument_id) {
640            let instrument = {
641                let cache = self.cache.as_ref().borrow();
642                cache.instrument(&delta.instrument_id).cloned()
643            };
644
645            if let Some(instrument) = instrument {
646                self.add_instrument(instrument).unwrap();
647            } else {
648                panic!(
649                    "No matching engine found for instrument {}",
650                    delta.instrument_id
651                );
652            }
653        }
654
655        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
656            matching_engine.process_order_book_delta(&delta).unwrap();
657        } else {
658            panic!("Matching engine should be initialized");
659        }
660    }
661
662    /// Processes a batch of order book deltas.
663    ///
664    /// # Panics
665    ///
666    /// Panics if adding a missing instrument during deltas processing fails.
667    pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
668        for module in &self.modules {
669            module.pre_process(&Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
670        }
671
672        if !self.matching_engines.contains_key(&deltas.instrument_id) {
673            let instrument = {
674                let cache = self.cache.as_ref().borrow();
675                cache.instrument(&deltas.instrument_id).cloned()
676            };
677
678            if let Some(instrument) = instrument {
679                self.add_instrument(instrument).unwrap();
680            } else {
681                panic!(
682                    "No matching engine found for instrument {}",
683                    deltas.instrument_id
684                );
685            }
686        }
687
688        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
689            matching_engine.process_order_book_deltas(deltas).unwrap();
690        } else {
691            panic!("Matching engine should be initialized");
692        }
693    }
694
695    /// Processes an L2 order book depth snapshot.
696    ///
697    /// # Panics
698    ///
699    /// Panics if adding a missing instrument during depth10 processing fails.
700    pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) {
701        for module in &self.modules {
702            module.pre_process(&Data::Depth10(Box::new(*depth)));
703        }
704
705        if !self.matching_engines.contains_key(&depth.instrument_id) {
706            let instrument = {
707                let cache = self.cache.as_ref().borrow();
708                cache.instrument(&depth.instrument_id).cloned()
709            };
710
711            if let Some(instrument) = instrument {
712                self.add_instrument(instrument).unwrap();
713            } else {
714                panic!(
715                    "No matching engine found for instrument {}",
716                    depth.instrument_id
717                );
718            }
719        }
720
721        if let Some(matching_engine) = self.matching_engines.get_mut(&depth.instrument_id) {
722            matching_engine.process_order_book_depth10(depth).unwrap();
723        } else {
724            panic!("Matching engine should be initialized");
725        }
726    }
727
728    /// Processes a quote tick and updates the matching engine.
729    ///
730    /// # Panics
731    ///
732    /// Panics if adding a missing instrument during quote tick processing fails.
733    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
734        for module in &self.modules {
735            module.pre_process(&Data::Quote(*quote));
736        }
737
738        if !self.matching_engines.contains_key(&quote.instrument_id) {
739            let instrument = {
740                let cache = self.cache.as_ref().borrow();
741                cache.instrument(&quote.instrument_id).cloned()
742            };
743
744            if let Some(instrument) = instrument {
745                self.add_instrument(instrument).unwrap();
746            } else {
747                panic!(
748                    "No matching engine found for instrument {}",
749                    quote.instrument_id
750                );
751            }
752        }
753
754        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
755            matching_engine.process_quote_tick(quote);
756        } else {
757            panic!("Matching engine should be initialized");
758        }
759    }
760
761    /// Processes a trade tick and updates the matching engine.
762    ///
763    /// # Panics
764    ///
765    /// Panics if adding a missing instrument during trade tick processing fails.
766    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
767        for module in &self.modules {
768            module.pre_process(&Data::Trade(*trade));
769        }
770
771        if !self.matching_engines.contains_key(&trade.instrument_id) {
772            let instrument = {
773                let cache = self.cache.as_ref().borrow();
774                cache.instrument(&trade.instrument_id).cloned()
775            };
776
777            if let Some(instrument) = instrument {
778                self.add_instrument(instrument).unwrap();
779            } else {
780                panic!(
781                    "No matching engine found for instrument {}",
782                    trade.instrument_id
783                );
784            }
785        }
786
787        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
788            matching_engine.process_trade_tick(trade);
789        } else {
790            panic!("Matching engine should be initialized");
791        }
792    }
793
794    /// Processes a bar and updates the matching engine.
795    ///
796    /// # Panics
797    ///
798    /// Panics if adding a missing instrument during bar processing fails.
799    pub fn process_bar(&mut self, bar: Bar) {
800        for module in &self.modules {
801            module.pre_process(&Data::Bar(bar));
802        }
803
804        if !self.matching_engines.contains_key(&bar.instrument_id()) {
805            let instrument = {
806                let cache = self.cache.as_ref().borrow();
807                cache.instrument(&bar.instrument_id()).cloned()
808            };
809
810            if let Some(instrument) = instrument {
811                self.add_instrument(instrument).unwrap();
812            } else {
813                panic!(
814                    "No matching engine found for instrument {}",
815                    bar.instrument_id()
816                );
817            }
818        }
819
820        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
821            matching_engine.process_bar(&bar);
822        } else {
823            panic!("Matching engine should be initialized");
824        }
825    }
826
827    /// Processes an instrument status update.
828    ///
829    /// # Panics
830    ///
831    /// Panics if adding a missing instrument during instrument status processing fails.
832    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
833        for module in &self.modules {
834            module.pre_process(&Data::InstrumentStatus(status));
835        }
836
837        if !self.matching_engines.contains_key(&status.instrument_id) {
838            let instrument = {
839                let cache = self.cache.as_ref().borrow();
840                cache.instrument(&status.instrument_id).cloned()
841            };
842
843            if let Some(instrument) = instrument {
844                self.add_instrument(instrument).unwrap();
845            } else {
846                panic!(
847                    "No matching engine found for instrument {}",
848                    status.instrument_id
849                );
850            }
851        }
852
853        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
854            matching_engine.process_status(status.action);
855        } else {
856            panic!("Matching engine should be initialized");
857        }
858    }
859
860    /// Processes an instrument close event.
861    ///
862    /// # Panics
863    ///
864    /// Panics if adding a missing instrument during instrument close processing fails.
865    pub fn process_instrument_close(&mut self, close: InstrumentClose) {
866        for module in &self.modules {
867            module.pre_process(&Data::InstrumentClose(close));
868        }
869
870        if !self.matching_engines.contains_key(&close.instrument_id) {
871            let instrument = {
872                let cache = self.cache.as_ref().borrow();
873                cache.instrument(&close.instrument_id).cloned()
874            };
875
876            if let Some(instrument) = instrument {
877                self.add_instrument(instrument).unwrap();
878            } else {
879                panic!(
880                    "No matching engine found for instrument {}",
881                    close.instrument_id
882                );
883            }
884        }
885
886        if let Some(matching_engine) = self.matching_engines.get_mut(&close.instrument_id) {
887            if let Some(price) = self.settlement_prices.get(&close.instrument_id) {
888                matching_engine.set_settlement_price(*price);
889            }
890            matching_engine.process_instrument_close(close);
891        } else {
892            panic!("Matching engine should be initialized");
893        }
894    }
895
896    /// Processes all pending inflight and queued trading commands up to `ts_now`.
897    ///
898    /// # Panics
899    ///
900    /// Panics if popping an inflight command fails during processing.
901    pub fn process(&mut self, ts_now: UnixNanos) {
902        // Clock is advanced by BacktestEngine::settle_venues before entering
903        // the settlement loop, so we don't set it here.
904
905        // Process inflight commands
906        while let Some(inflight) = self.inflight_queue.peek() {
907            if inflight.timestamp > ts_now {
908                // Future commands remain in the queue
909                break;
910            }
911            // We get the inflight command, remove it from the queue and process it
912            let inflight = self.inflight_queue.pop().unwrap();
913            self.process_trading_command(inflight.command);
914        }
915
916        // Process regular message queue
917        while let Some(command) = self.message_queue.pop_front() {
918            self.process_trading_command(command);
919        }
920    }
921
922    /// Runs all simulation modules for the given timestamp.
923    ///
924    /// Must be called once per time step after all command queues have fully
925    /// settled, not inside the settle loop.
926    pub fn process_modules(&mut self, ts_now: UnixNanos) {
927        let adjustments = {
928            let cache = self.cache.borrow();
929            let ctx = ExchangeContext {
930                venue: self.id,
931                base_currency: self.base_currency,
932                instruments: &self.instruments,
933                matching_engines: &self.matching_engines,
934                cache: &cache,
935            };
936            self.modules
937                .iter()
938                .flat_map(|m| m.process(ts_now, &ctx))
939                .collect::<Vec<Money>>()
940        };
941
942        for adjustment in adjustments {
943            self.adjust_account(adjustment);
944        }
945    }
946
947    /// Resets the exchange to its initial state.
948    pub fn reset(&mut self) {
949        for module in &self.modules {
950            module.reset();
951        }
952
953        self.generate_fresh_account_state();
954
955        for matching_engine in self.matching_engines.values_mut() {
956            matching_engine.reset();
957        }
958
959        self.settlement_prices.clear();
960        self.message_queue.clear();
961        self.inflight_queue.clear();
962
963        log::info!("Resetting exchange state");
964    }
965
966    /// Logs diagnostic information from all simulation modules.
967    pub fn log_diagnostics(&self) {
968        for module in &self.modules {
969            module.log_diagnostics();
970        }
971    }
972
973    fn process_trading_command(&mut self, command: TradingCommand) {
974        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
975            let account_id = if let Some(exec_client) = &self.exec_client {
976                exec_client.account_id()
977            } else {
978                panic!("Execution client should be initialized");
979            };
980
981            match command {
982                TradingCommand::SubmitOrder(command) => {
983                    let mut order = self
984                        .cache
985                        .borrow()
986                        .order(&command.client_order_id)
987                        .cloned()
988                        .expect("Order must exist in cache");
989                    matching_engine.process_order(&mut order, account_id);
990                }
991                TradingCommand::ModifyOrder(ref command) => {
992                    matching_engine.process_modify(command, account_id);
993                }
994                TradingCommand::CancelOrder(ref command) => {
995                    matching_engine.process_cancel(command, account_id);
996                }
997                TradingCommand::CancelAllOrders(ref command) => {
998                    matching_engine.process_cancel_all(command, account_id);
999                }
1000                TradingCommand::BatchCancelOrders(ref command) => {
1001                    matching_engine.process_batch_cancel(command, account_id);
1002                }
1003                TradingCommand::SubmitOrderList(ref command) => {
1004                    let mut orders: Vec<OrderAny> = self
1005                        .cache
1006                        .borrow()
1007                        .orders_for_ids(&command.order_list.client_order_ids, command);
1008
1009                    for order in &mut orders {
1010                        matching_engine.process_order(order, account_id);
1011                    }
1012                }
1013                _ => {}
1014            }
1015        } else {
1016            panic!(
1017                "Matching engine not found for instrument {}",
1018                command.instrument_id()
1019            );
1020        }
1021    }
1022
1023    fn generate_fresh_account_state(&self) {
1024        let balances: Vec<AccountBalance> = self
1025            .starting_balances
1026            .iter()
1027            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
1028            .collect();
1029
1030        if let Some(exec_client) = &self.exec_client {
1031            exec_client
1032                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
1033                .unwrap();
1034        }
1035
1036        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
1037            margin_account.set_default_leverage(self.default_leverage);
1038            for (instrument_id, leverage) in &self.leverages {
1039                margin_account.set_leverage(*instrument_id, *leverage);
1040            }
1041
1042            if let Some(model) = &self.margin_model {
1043                margin_account.set_margin_model(model.clone());
1044            }
1045            self.cache
1046                .borrow_mut()
1047                .update_account(&AccountAny::Margin(margin_account))
1048                .unwrap();
1049        }
1050    }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055    use std::{
1056        cell::{Cell, RefCell},
1057        collections::BinaryHeap,
1058        rc::Rc,
1059    };
1060
1061    use nautilus_common::{
1062        cache::Cache,
1063        clock::TestClock,
1064        messages::execution::{SubmitOrder, TradingCommand},
1065        msgbus::{self, stubs::get_typed_message_saving_handler},
1066    };
1067    use nautilus_core::{UUID4, UnixNanos};
1068    use nautilus_execution::models::{
1069        fee::{FeeModelAny, MakerTakerFeeModel},
1070        latency::StaticLatencyModel,
1071    };
1072    use nautilus_model::{
1073        accounts::{AccountAny, MarginAccount},
1074        data::{
1075            Bar, BarType, BookOrder, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas,
1076            QuoteTick, TradeTick,
1077        },
1078        enums::{
1079            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
1080            OmsType, OrderSide, OrderType,
1081        },
1082        events::AccountState,
1083        identifiers::{
1084            AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
1085        },
1086        instruments::{
1087            CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1088        },
1089        orders::{Order, OrderAny, OrderTestBuilder},
1090        stubs::TestDefault,
1091        types::{AccountBalance, Currency, Money, Price, Quantity},
1092    };
1093    use rstest::rstest;
1094    use rust_decimal::Decimal;
1095
1096    use crate::{
1097        config::SimulatedVenueConfig,
1098        exchange::{InflightCommand, SimulatedExchange},
1099        execution_client::BacktestExecutionClient,
1100        modules::{ExchangeContext, SimulationModule},
1101    };
1102
1103    fn get_exchange(
1104        venue: Venue,
1105        account_type: AccountType,
1106        book_type: BookType,
1107        cache: Option<Rc<RefCell<Cache>>>,
1108    ) -> Rc<RefCell<SimulatedExchange>> {
1109        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
1110        let clock = Rc::new(RefCell::new(TestClock::new()));
1111        let config = SimulatedVenueConfig::builder()
1112            .venue(venue)
1113            .oms_type(OmsType::Netting)
1114            .account_type(account_type)
1115            .book_type(book_type)
1116            .starting_balances(vec![Money::new(1000.0, Currency::USD())])
1117            .default_leverage(Decimal::ONE)
1118            .fee_model(FeeModelAny::MakerTaker(MakerTakerFeeModel))
1119            .build();
1120        let exchange = Rc::new(RefCell::new(
1121            SimulatedExchange::new(config, cache.clone(), clock).unwrap(),
1122        ));
1123
1124        let clock = TestClock::new();
1125        let execution_client = BacktestExecutionClient::new(
1126            TraderId::test_default(),
1127            AccountId::test_default(),
1128            &exchange,
1129            cache,
1130            Rc::new(RefCell::new(clock)),
1131            None,
1132            None,
1133        );
1134        exchange
1135            .borrow_mut()
1136            .register_client(Rc::new(execution_client));
1137
1138        exchange
1139    }
1140
1141    fn create_submit_order_command(
1142        ts_init: UnixNanos,
1143        client_order_id: &str,
1144    ) -> (OrderAny, TradingCommand) {
1145        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
1146        let order = OrderTestBuilder::new(OrderType::Market)
1147            .instrument_id(instrument_id)
1148            .client_order_id(ClientOrderId::new(client_order_id))
1149            .quantity(Quantity::from(1))
1150            .build();
1151        let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1152            TraderId::test_default(),
1153            None,
1154            StrategyId::test_default(),
1155            instrument_id,
1156            order.client_order_id(),
1157            order.init_event().clone(),
1158            None,
1159            None,
1160            None, // params
1161            UUID4::default(),
1162            ts_init,
1163        ));
1164        (order, command)
1165    }
1166
1167    #[rstest]
1168    #[should_panic(
1169        expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
1170    )]
1171    fn test_venue_mismatch_between_exchange_and_instrument(
1172        crypto_perpetual_ethusdt: CryptoPerpetual,
1173    ) {
1174        let exchange = get_exchange(
1175            Venue::new("SIM"),
1176            AccountType::Margin,
1177            BookType::L1_MBP,
1178            None,
1179        );
1180        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1181        exchange.borrow_mut().add_instrument(instrument).unwrap();
1182    }
1183
1184    #[rstest]
1185    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
1186    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
1187        let exchange = get_exchange(
1188            Venue::new("BINANCE"),
1189            AccountType::Cash,
1190            BookType::L1_MBP,
1191            None,
1192        );
1193        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1194        exchange.borrow_mut().add_instrument(instrument).unwrap();
1195    }
1196
1197    #[rstest]
1198    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1199        let exchange = get_exchange(
1200            Venue::new("BINANCE"),
1201            AccountType::Margin,
1202            BookType::L1_MBP,
1203            None,
1204        );
1205        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1206
1207        // register instrument
1208        exchange.borrow_mut().add_instrument(instrument).unwrap();
1209
1210        // process tick
1211        let quote_tick = QuoteTick::new(
1212            crypto_perpetual_ethusdt.id,
1213            Price::from("1000.00"),
1214            Price::from("1001.00"),
1215            Quantity::from("1.000"),
1216            Quantity::from("1.000"),
1217            UnixNanos::default(),
1218            UnixNanos::default(),
1219        );
1220        exchange.borrow_mut().process_quote_tick(&quote_tick);
1221
1222        let best_bid_price = exchange
1223            .borrow()
1224            .best_bid_price(crypto_perpetual_ethusdt.id);
1225        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1226        let best_ask_price = exchange
1227            .borrow()
1228            .best_ask_price(crypto_perpetual_ethusdt.id);
1229        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1230    }
1231
1232    #[rstest]
1233    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1234        let exchange = get_exchange(
1235            Venue::new("BINANCE"),
1236            AccountType::Margin,
1237            BookType::L1_MBP,
1238            None,
1239        );
1240        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1241
1242        // register instrument
1243        exchange.borrow_mut().add_instrument(instrument).unwrap();
1244
1245        // process tick
1246        let trade_tick = TradeTick::new(
1247            crypto_perpetual_ethusdt.id,
1248            Price::from("1000.00"),
1249            Quantity::from("1.000"),
1250            AggressorSide::Buyer,
1251            TradeId::from("1"),
1252            UnixNanos::default(),
1253            UnixNanos::default(),
1254        );
1255        exchange.borrow_mut().process_trade_tick(&trade_tick);
1256
1257        let best_bid_price = exchange
1258            .borrow()
1259            .best_bid_price(crypto_perpetual_ethusdt.id);
1260        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1261        let best_ask = exchange
1262            .borrow()
1263            .best_ask_price(crypto_perpetual_ethusdt.id);
1264        assert_eq!(best_ask, Some(Price::from("1000.00")));
1265    }
1266
1267    #[rstest]
1268    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1269        let exchange = get_exchange(
1270            Venue::new("BINANCE"),
1271            AccountType::Margin,
1272            BookType::L1_MBP,
1273            None,
1274        );
1275        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1276
1277        // register instrument
1278        exchange.borrow_mut().add_instrument(instrument).unwrap();
1279
1280        // process bar
1281        let bar = Bar::new(
1282            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1283            Price::from("1500.00"),
1284            Price::from("1505.00"),
1285            Price::from("1490.00"),
1286            Price::from("1502.00"),
1287            Quantity::from("100.000"),
1288            UnixNanos::default(),
1289            UnixNanos::default(),
1290        );
1291        exchange.borrow_mut().process_bar(bar);
1292
1293        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1294        let best_bid_price = exchange
1295            .borrow()
1296            .best_bid_price(crypto_perpetual_ethusdt.id);
1297        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1298        let best_ask_price = exchange
1299            .borrow()
1300            .best_ask_price(crypto_perpetual_ethusdt.id);
1301        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1302    }
1303
1304    #[rstest]
1305    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1306        let exchange = get_exchange(
1307            Venue::new("BINANCE"),
1308            AccountType::Margin,
1309            BookType::L1_MBP,
1310            None,
1311        );
1312        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1313
1314        // register instrument
1315        exchange.borrow_mut().add_instrument(instrument).unwrap();
1316
1317        // create both bid and ask based bars
1318        // add +1 on ask to make sure it is different from bid
1319        let bar_bid = Bar::new(
1320            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1321            Price::from("1500.00"),
1322            Price::from("1505.00"),
1323            Price::from("1490.00"),
1324            Price::from("1502.00"),
1325            Quantity::from("100.000"),
1326            UnixNanos::from(1),
1327            UnixNanos::from(1),
1328        );
1329        let bar_ask = Bar::new(
1330            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1331            Price::from("1501.00"),
1332            Price::from("1506.00"),
1333            Price::from("1491.00"),
1334            Price::from("1503.00"),
1335            Quantity::from("100.000"),
1336            UnixNanos::from(1),
1337            UnixNanos::from(1),
1338        );
1339
1340        // process them
1341        exchange.borrow_mut().process_bar(bar_bid);
1342        exchange.borrow_mut().process_bar(bar_ask);
1343
1344        // current bid and ask prices will be the corresponding close of the ask and bid bar
1345        let best_bid_price = exchange
1346            .borrow()
1347            .best_bid_price(crypto_perpetual_ethusdt.id);
1348        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1349        let best_ask_price = exchange
1350            .borrow()
1351            .best_ask_price(crypto_perpetual_ethusdt.id);
1352        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1353    }
1354
1355    #[rstest]
1356    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1357        let exchange = get_exchange(
1358            Venue::new("BINANCE"),
1359            AccountType::Margin,
1360            BookType::L2_MBP,
1361            None,
1362        );
1363        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1364
1365        // register instrument
1366        exchange.borrow_mut().add_instrument(instrument).unwrap();
1367
1368        // create order book delta at both bid and ask with incremented ts init and sequence
1369        let delta_buy = OrderBookDelta::new(
1370            crypto_perpetual_ethusdt.id,
1371            BookAction::Add,
1372            BookOrder::new(
1373                OrderSide::Buy,
1374                Price::from("1000.00"),
1375                Quantity::from("1.000"),
1376                1,
1377            ),
1378            0,
1379            0,
1380            UnixNanos::from(1),
1381            UnixNanos::from(1),
1382        );
1383        let delta_sell = OrderBookDelta::new(
1384            crypto_perpetual_ethusdt.id,
1385            BookAction::Add,
1386            BookOrder::new(
1387                OrderSide::Sell,
1388                Price::from("1001.00"),
1389                Quantity::from("1.000"),
1390                1,
1391            ),
1392            0,
1393            1,
1394            UnixNanos::from(2),
1395            UnixNanos::from(2),
1396        );
1397
1398        // process both deltas
1399        exchange.borrow_mut().process_order_book_delta(delta_buy);
1400        exchange.borrow_mut().process_order_book_delta(delta_sell);
1401
1402        let book = exchange
1403            .borrow()
1404            .get_book(crypto_perpetual_ethusdt.id)
1405            .unwrap()
1406            .clone();
1407        assert_eq!(book.update_count, 2);
1408        assert_eq!(book.sequence, 1);
1409        assert_eq!(book.ts_last, UnixNanos::from(2));
1410        let best_bid_price = exchange
1411            .borrow()
1412            .best_bid_price(crypto_perpetual_ethusdt.id);
1413        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1414        let best_ask_price = exchange
1415            .borrow()
1416            .best_ask_price(crypto_perpetual_ethusdt.id);
1417        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1418    }
1419
1420    #[rstest]
1421    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1422        let exchange = get_exchange(
1423            Venue::new("BINANCE"),
1424            AccountType::Margin,
1425            BookType::L2_MBP,
1426            None,
1427        );
1428        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1429
1430        // register instrument
1431        exchange.borrow_mut().add_instrument(instrument).unwrap();
1432
1433        // create two sell order book deltas with same timestamps and higher sequence
1434        let delta_sell_1 = OrderBookDelta::new(
1435            crypto_perpetual_ethusdt.id,
1436            BookAction::Add,
1437            BookOrder::new(
1438                OrderSide::Sell,
1439                Price::from("1000.00"),
1440                Quantity::from("3.000"),
1441                1,
1442            ),
1443            0,
1444            0,
1445            UnixNanos::from(1),
1446            UnixNanos::from(1),
1447        );
1448        let delta_sell_2 = OrderBookDelta::new(
1449            crypto_perpetual_ethusdt.id,
1450            BookAction::Add,
1451            BookOrder::new(
1452                OrderSide::Sell,
1453                Price::from("1001.00"),
1454                Quantity::from("1.000"),
1455                1,
1456            ),
1457            0,
1458            1,
1459            UnixNanos::from(1),
1460            UnixNanos::from(1),
1461        );
1462        let orderbook_deltas = OrderBookDeltas::new(
1463            crypto_perpetual_ethusdt.id,
1464            vec![delta_sell_1, delta_sell_2],
1465        );
1466
1467        // process both deltas
1468        exchange
1469            .borrow_mut()
1470            .process_order_book_deltas(&orderbook_deltas);
1471
1472        let book = exchange
1473            .borrow()
1474            .get_book(crypto_perpetual_ethusdt.id)
1475            .unwrap()
1476            .clone();
1477        assert_eq!(book.update_count, 2);
1478        assert_eq!(book.sequence, 1);
1479        assert_eq!(book.ts_last, UnixNanos::from(1));
1480        let best_bid_price = exchange
1481            .borrow()
1482            .best_bid_price(crypto_perpetual_ethusdt.id);
1483        // no bid orders in orderbook deltas
1484        assert_eq!(best_bid_price, None);
1485        let best_ask_price = exchange
1486            .borrow()
1487            .best_ask_price(crypto_perpetual_ethusdt.id);
1488        // best ask price is the first order in orderbook deltas
1489        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1490    }
1491
1492    #[rstest]
1493    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1494        let exchange = get_exchange(
1495            Venue::new("BINANCE"),
1496            AccountType::Margin,
1497            BookType::L2_MBP,
1498            None,
1499        );
1500        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1501
1502        // register instrument
1503        exchange.borrow_mut().add_instrument(instrument).unwrap();
1504
1505        let instrument_status = InstrumentStatus::new(
1506            crypto_perpetual_ethusdt.id,
1507            MarketStatusAction::Close, // close the market
1508            UnixNanos::from(1),
1509            UnixNanos::from(1),
1510            None,
1511            None,
1512            None,
1513            None,
1514            None,
1515        );
1516
1517        exchange
1518            .borrow_mut()
1519            .process_instrument_status(instrument_status);
1520
1521        let market_status = exchange
1522            .borrow()
1523            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1524            .unwrap()
1525            .market_status;
1526        assert_eq!(market_status, MarketStatus::Closed);
1527    }
1528
1529    #[rstest]
1530    fn test_accounting() {
1531        let account_type = AccountType::Margin;
1532        let mut cache = Cache::default();
1533        let (handler, saving_handler) = get_typed_message_saving_handler::<AccountState>(None);
1534        msgbus::register_account_state_endpoint("Portfolio.update_account".into(), handler);
1535        let margin_account = MarginAccount::new(
1536            AccountState::new(
1537                AccountId::from("SIM-001"),
1538                account_type,
1539                vec![AccountBalance::new(
1540                    Money::from("1000 USD"),
1541                    Money::from("0 USD"),
1542                    Money::from("1000 USD"),
1543                )],
1544                vec![],
1545                false,
1546                UUID4::default(),
1547                UnixNanos::default(),
1548                UnixNanos::default(),
1549                None,
1550            ),
1551            false,
1552        );
1553        let () = cache
1554            .add_account(AccountAny::Margin(margin_account))
1555            .unwrap();
1556        // build indexes
1557        cache.build_index();
1558
1559        let exchange = get_exchange(
1560            Venue::new("SIM"),
1561            account_type,
1562            BookType::L2_MBP,
1563            Some(Rc::new(RefCell::new(cache))),
1564        );
1565        exchange.borrow_mut().initialize_account();
1566
1567        // Test adjust account, increase balance by 500 USD
1568        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1569
1570        // Check if we received two messages, one for initial account state and one for adjusted account state
1571        let messages = saving_handler.get_messages();
1572        assert_eq!(messages.len(), 2);
1573        let account_state_first = messages.first().unwrap();
1574        let account_state_second = messages.last().unwrap();
1575
1576        assert_eq!(account_state_first.balances.len(), 1);
1577        let current_balance = account_state_first.balances[0];
1578        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1579        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1580        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1581
1582        assert_eq!(account_state_second.balances.len(), 1);
1583        let current_balance = account_state_second.balances[0];
1584        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1585        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1586        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1587    }
1588
1589    #[rstest]
1590    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1591        // Create 3 inflight commands with different timestamps and counters
1592        let (_, cmd1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1593        let (_, cmd2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1594        let (_, cmd3) = create_submit_order_command(UnixNanos::from(100), "O-3");
1595
1596        let inflight1 = InflightCommand::new(UnixNanos::from(100), 1, cmd1);
1597        let inflight2 = InflightCommand::new(UnixNanos::from(200), 2, cmd2);
1598        let inflight3 = InflightCommand::new(UnixNanos::from(100), 2, cmd3);
1599
1600        // Create a binary heap and push the inflight commands
1601        let mut inflight_heap = BinaryHeap::new();
1602        inflight_heap.push(inflight1);
1603        inflight_heap.push(inflight2);
1604        inflight_heap.push(inflight3);
1605
1606        // Pop the inflight commands and check if they are in the correct order
1607        // by our custom ordering with counter and timestamp
1608        let first = inflight_heap.pop().unwrap();
1609        let second = inflight_heap.pop().unwrap();
1610        let third = inflight_heap.pop().unwrap();
1611
1612        assert_eq!(first.timestamp, UnixNanos::from(100));
1613        assert_eq!(first.counter, 1);
1614        assert_eq!(second.timestamp, UnixNanos::from(100));
1615        assert_eq!(second.counter, 2);
1616        assert_eq!(third.timestamp, UnixNanos::from(200));
1617        assert_eq!(third.counter, 2);
1618    }
1619
1620    #[rstest]
1621    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1622        let exchange = get_exchange(
1623            Venue::new("BINANCE"),
1624            AccountType::Margin,
1625            BookType::L2_MBP,
1626            None,
1627        );
1628
1629        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1630        exchange.borrow_mut().add_instrument(instrument).unwrap();
1631
1632        let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1633        let (order2, command2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1634
1635        exchange
1636            .borrow()
1637            .cache()
1638            .borrow_mut()
1639            .add_order(order1, None, None, false)
1640            .unwrap();
1641        exchange
1642            .borrow()
1643            .cache()
1644            .borrow_mut()
1645            .add_order(order2, None, None, false)
1646            .unwrap();
1647
1648        exchange.borrow_mut().send(command1);
1649        exchange.borrow_mut().send(command2);
1650
1651        // Verify that message queue has 2 commands and inflight queue is empty
1652        // as we are not using latency model
1653        assert_eq!(exchange.borrow().message_queue.len(), 2);
1654        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1655
1656        // Process command and check that queues is empty
1657        exchange.borrow_mut().process(UnixNanos::from(300));
1658        assert_eq!(exchange.borrow().message_queue.len(), 0);
1659        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1660    }
1661
1662    #[rstest]
1663    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1664        // StaticLatencyModel adds base_latency to each operation latency
1665        // base=100, insert=200 -> effective insert latency = 300
1666        let latency_model = StaticLatencyModel::new(
1667            UnixNanos::from(100),
1668            UnixNanos::from(200),
1669            UnixNanos::from(300),
1670            UnixNanos::from(100),
1671        );
1672        let exchange = get_exchange(
1673            Venue::new("BINANCE"),
1674            AccountType::Margin,
1675            BookType::L2_MBP,
1676            None,
1677        );
1678        exchange
1679            .borrow_mut()
1680            .set_latency_model(Box::new(latency_model));
1681
1682        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1683        exchange.borrow_mut().add_instrument(instrument).unwrap();
1684
1685        let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1686        let (order2, command2) = create_submit_order_command(UnixNanos::from(150), "O-2");
1687
1688        exchange
1689            .borrow()
1690            .cache()
1691            .borrow_mut()
1692            .add_order(order1, None, None, false)
1693            .unwrap();
1694        exchange
1695            .borrow()
1696            .cache()
1697            .borrow_mut()
1698            .add_order(order2, None, None, false)
1699            .unwrap();
1700
1701        exchange.borrow_mut().send(command1);
1702        exchange.borrow_mut().send(command2);
1703
1704        // Verify that inflight queue has 2 commands and message queue is empty
1705        assert_eq!(exchange.borrow().message_queue.len(), 0);
1706        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1707        // First inflight command: ts_init=100 + effective_insert_latency=300 = 400
1708        assert_eq!(
1709            exchange
1710                .borrow()
1711                .inflight_queue
1712                .iter()
1713                .next()
1714                .unwrap()
1715                .timestamp,
1716            UnixNanos::from(400)
1717        );
1718        // Second inflight command: ts_init=150 + effective_insert_latency=300 = 450
1719        assert_eq!(
1720            exchange
1721                .borrow()
1722                .inflight_queue
1723                .iter()
1724                .nth(1)
1725                .unwrap()
1726                .timestamp,
1727            UnixNanos::from(450)
1728        );
1729
1730        // Process at timestamp 420, and test that only first command is processed
1731        exchange.borrow_mut().process(UnixNanos::from(420));
1732        assert_eq!(exchange.borrow().message_queue.len(), 0);
1733        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1734        assert_eq!(
1735            exchange
1736                .borrow()
1737                .inflight_queue
1738                .iter()
1739                .next()
1740                .unwrap()
1741                .timestamp,
1742            UnixNanos::from(450)
1743        );
1744    }
1745
1746    #[rstest]
1747    fn test_process_iterates_matching_engines_after_commands(
1748        crypto_perpetual_ethusdt: CryptoPerpetual,
1749    ) {
1750        let cache = Rc::new(RefCell::new(Cache::default()));
1751        let exchange = get_exchange(
1752            Venue::new("BINANCE"),
1753            AccountType::Margin,
1754            BookType::L1_MBP,
1755            Some(cache.clone()),
1756        );
1757        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1758        let instrument_id = instrument.id();
1759        exchange.borrow_mut().add_instrument(instrument).unwrap();
1760
1761        let quote = QuoteTick::new(
1762            instrument_id,
1763            Price::from("1000.00"),
1764            Price::from("1001.00"),
1765            Quantity::from("1.000"),
1766            Quantity::from("1.000"),
1767            UnixNanos::from(1),
1768            UnixNanos::from(1),
1769        );
1770        exchange.borrow_mut().process_quote_tick(&quote);
1771
1772        // Create a passive buy limit below the ask (should NOT fill)
1773        let order = OrderTestBuilder::new(OrderType::Limit)
1774            .instrument_id(instrument_id)
1775            .client_order_id(ClientOrderId::new("O-LIMIT-1"))
1776            .side(OrderSide::Buy)
1777            .quantity(Quantity::from("1.000"))
1778            .price(Price::from("999.00"))
1779            .build();
1780
1781        cache
1782            .borrow_mut()
1783            .add_order(order.clone(), None, None, false)
1784            .unwrap();
1785
1786        let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1787            TraderId::test_default(),
1788            None,
1789            StrategyId::test_default(),
1790            instrument_id,
1791            order.client_order_id(),
1792            order.init_event().clone(),
1793            None,
1794            None,
1795            None,
1796            UUID4::default(),
1797            UnixNanos::from(1),
1798        ));
1799        exchange.borrow_mut().send(command);
1800
1801        exchange.borrow_mut().process(UnixNanos::from(1));
1802
1803        let open_orders = exchange.borrow().get_open_orders(Some(instrument_id));
1804        assert_eq!(open_orders.len(), 1);
1805        assert_eq!(
1806            open_orders[0].client_order_id,
1807            ClientOrderId::new("O-LIMIT-1")
1808        );
1809    }
1810
1811    #[derive(Clone)]
1812    struct MockModuleCounts {
1813        pre_process: Rc<Cell<u32>>,
1814        process: Rc<Cell<u32>>,
1815        reset: Rc<Cell<u32>>,
1816        log_diagnostics: Rc<Cell<u32>>,
1817    }
1818
1819    impl MockModuleCounts {
1820        fn new() -> Self {
1821            Self {
1822                pre_process: Rc::new(Cell::new(0)),
1823                process: Rc::new(Cell::new(0)),
1824                reset: Rc::new(Cell::new(0)),
1825                log_diagnostics: Rc::new(Cell::new(0)),
1826            }
1827        }
1828    }
1829
1830    struct MockSimulationModule {
1831        counts: MockModuleCounts,
1832    }
1833
1834    impl MockSimulationModule {
1835        fn new(counts: MockModuleCounts) -> Self {
1836            Self { counts }
1837        }
1838    }
1839
1840    impl SimulationModule for MockSimulationModule {
1841        fn pre_process(&self, _data: &Data) {
1842            self.counts
1843                .pre_process
1844                .set(self.counts.pre_process.get() + 1);
1845        }
1846
1847        fn process(&self, _ts_now: UnixNanos, _ctx: &ExchangeContext) -> Vec<Money> {
1848            self.counts.process.set(self.counts.process.get() + 1);
1849            Vec::new()
1850        }
1851
1852        fn log_diagnostics(&self) {
1853            self.counts
1854                .log_diagnostics
1855                .set(self.counts.log_diagnostics.get() + 1);
1856        }
1857
1858        fn reset(&self) {
1859            self.counts.reset.set(self.counts.reset.get() + 1);
1860        }
1861    }
1862
1863    fn get_exchange_with_module(
1864        venue: Venue,
1865        counts: MockModuleCounts,
1866    ) -> Rc<RefCell<SimulatedExchange>> {
1867        let cache = Rc::new(RefCell::new(Cache::default()));
1868        let clock = Rc::new(RefCell::new(TestClock::new()));
1869
1870        // Register msgbus handler so generate_account_state works during reset
1871        let (handler, _saving_handler) = get_typed_message_saving_handler::<AccountState>(None);
1872        msgbus::register_account_state_endpoint("Portfolio.update_account".into(), handler);
1873
1874        let modules: Vec<Box<dyn SimulationModule>> =
1875            vec![Box::new(MockSimulationModule::new(counts))];
1876
1877        let config = SimulatedVenueConfig::builder()
1878            .venue(venue)
1879            .oms_type(OmsType::Netting)
1880            .account_type(AccountType::Margin)
1881            .book_type(BookType::L1_MBP)
1882            .starting_balances(vec![Money::new(1000.0, Currency::USD())])
1883            .default_leverage(Decimal::ONE)
1884            .modules(modules)
1885            .fee_model(FeeModelAny::MakerTaker(MakerTakerFeeModel))
1886            .build();
1887        let exchange = Rc::new(RefCell::new(
1888            SimulatedExchange::new(config, cache.clone(), clock).unwrap(),
1889        ));
1890
1891        let exec_clock = TestClock::new();
1892        let execution_client = BacktestExecutionClient::new(
1893            TraderId::test_default(),
1894            AccountId::test_default(),
1895            &exchange,
1896            cache,
1897            Rc::new(RefCell::new(exec_clock)),
1898            None,
1899            None,
1900        );
1901        exchange
1902            .borrow_mut()
1903            .register_client(Rc::new(execution_client));
1904
1905        exchange
1906    }
1907
1908    #[rstest]
1909    fn test_module_pre_process_called_on_quote(crypto_perpetual_ethusdt: CryptoPerpetual) {
1910        let counts = MockModuleCounts::new();
1911        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1912        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1913        exchange.borrow_mut().add_instrument(instrument).unwrap();
1914
1915        let quote = QuoteTick::new(
1916            crypto_perpetual_ethusdt.id,
1917            Price::from("1000.00"),
1918            Price::from("1001.00"),
1919            Quantity::from("1.000"),
1920            Quantity::from("1.000"),
1921            UnixNanos::default(),
1922            UnixNanos::default(),
1923        );
1924        exchange.borrow_mut().process_quote_tick(&quote);
1925
1926        assert_eq!(counts.pre_process.get(), 1);
1927        assert_eq!(counts.process.get(), 0);
1928    }
1929
1930    #[rstest]
1931    fn test_module_pre_process_called_on_instrument_status(
1932        crypto_perpetual_ethusdt: CryptoPerpetual,
1933    ) {
1934        let counts = MockModuleCounts::new();
1935        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1936        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1937        exchange.borrow_mut().add_instrument(instrument).unwrap();
1938
1939        let status = InstrumentStatus::new(
1940            crypto_perpetual_ethusdt.id,
1941            MarketStatusAction::Close,
1942            UnixNanos::from(1),
1943            UnixNanos::from(1),
1944            None,
1945            None,
1946            None,
1947            None,
1948            None,
1949        );
1950        exchange.borrow_mut().process_instrument_status(status);
1951
1952        assert_eq!(counts.pre_process.get(), 1);
1953        assert_eq!(counts.process.get(), 0);
1954    }
1955
1956    #[rstest]
1957    fn test_module_process_not_called_by_process(crypto_perpetual_ethusdt: CryptoPerpetual) {
1958        let counts = MockModuleCounts::new();
1959        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1960        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1961        exchange.borrow_mut().add_instrument(instrument).unwrap();
1962
1963        // process() drains commands but does not run modules
1964        exchange.borrow_mut().process(UnixNanos::from(100));
1965
1966        assert_eq!(counts.process.get(), 0);
1967    }
1968
1969    #[rstest]
1970    fn test_module_process_called_by_process_modules(crypto_perpetual_ethusdt: CryptoPerpetual) {
1971        let counts = MockModuleCounts::new();
1972        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1973        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1974        exchange.borrow_mut().add_instrument(instrument).unwrap();
1975
1976        exchange.borrow_mut().process_modules(UnixNanos::from(100));
1977
1978        assert_eq!(counts.process.get(), 1);
1979    }
1980
1981    #[rstest]
1982    fn test_module_reset_called_on_reset(crypto_perpetual_ethusdt: CryptoPerpetual) {
1983        let counts = MockModuleCounts::new();
1984        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1985        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1986        exchange.borrow_mut().add_instrument(instrument).unwrap();
1987
1988        // Pre-populate account in cache so generate_fresh_account_state succeeds
1989        let margin_account = MarginAccount::new(
1990            AccountState::new(
1991                AccountId::test_default(),
1992                AccountType::Margin,
1993                vec![AccountBalance::new(
1994                    Money::from("1000 USD"),
1995                    Money::from("0 USD"),
1996                    Money::from("1000 USD"),
1997                )],
1998                vec![],
1999                false,
2000                UUID4::default(),
2001                UnixNanos::default(),
2002                UnixNanos::default(),
2003                None,
2004            ),
2005            false,
2006        );
2007        exchange
2008            .borrow()
2009            .cache()
2010            .borrow_mut()
2011            .add_account(AccountAny::Margin(margin_account))
2012            .unwrap();
2013
2014        exchange.borrow_mut().reset();
2015
2016        assert_eq!(counts.reset.get(), 1);
2017    }
2018
2019    #[rstest]
2020    fn test_module_log_diagnostics(crypto_perpetual_ethusdt: CryptoPerpetual) {
2021        let counts = MockModuleCounts::new();
2022        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
2023        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
2024        exchange.borrow_mut().add_instrument(instrument).unwrap();
2025
2026        exchange.borrow().log_diagnostics();
2027
2028        assert_eq!(counts.log_diagnostics.get(), 1);
2029    }
2030
2031    #[rstest]
2032    fn test_module_pre_process_and_process_call_order(crypto_perpetual_ethusdt: CryptoPerpetual) {
2033        let counts = MockModuleCounts::new();
2034        let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
2035        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
2036        exchange.borrow_mut().add_instrument(instrument).unwrap();
2037
2038        // pre_process called per data item, process_modules called separately
2039        let quote = QuoteTick::new(
2040            crypto_perpetual_ethusdt.id,
2041            Price::from("1000.00"),
2042            Price::from("1001.00"),
2043            Quantity::from("1.000"),
2044            Quantity::from("1.000"),
2045            UnixNanos::default(),
2046            UnixNanos::default(),
2047        );
2048        exchange.borrow_mut().process_quote_tick(&quote);
2049        exchange.borrow_mut().process_quote_tick(&quote);
2050        exchange.borrow_mut().process(UnixNanos::from(100));
2051        exchange.borrow_mut().process_modules(UnixNanos::from(100));
2052
2053        assert_eq!(counts.pre_process.get(), 2);
2054        assert_eq!(counts.process.get(), 1);
2055    }
2056}