1use std::{
19 cell::RefCell,
20 collections::{BinaryHeap, VecDeque},
21 fmt::Debug,
22 rc::Rc,
23};
24
25use ahash::AHashMap;
26use indexmap::IndexMap;
27use nautilus_common::{
28 cache::Cache,
29 clients::ExecutionClient,
30 clock::{Clock, TestClock},
31 messages::execution::TradingCommand,
32};
33use nautilus_core::{
34 UnixNanos,
35 correctness::{CorrectnessResultExt, FAILED, check_equal},
36};
37use nautilus_execution::{
38 matching_core::OrderMatchInfo,
39 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
40 models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel},
41};
42use nautilus_model::{
43 accounts::{AccountAny, margin_model::MarginModelAny},
44 data::{
45 Bar, Data, InstrumentClose, InstrumentStatus, OrderBookDelta, OrderBookDeltas,
46 OrderBookDeltas_API, OrderBookDepth10, QuoteTick, TradeTick,
47 },
48 enums::{AccountType, AggressorSide, BookType, OmsType},
49 identifiers::{AccountId, InstrumentId, Venue},
50 instruments::{Instrument, InstrumentAny},
51 orderbook::OrderBook,
52 orders::{Order, OrderAny},
53 types::{AccountBalance, Currency, Money, Price},
54};
55use rust_decimal::Decimal;
56
57use crate::{
58 config::SimulatedVenueConfig,
59 modules::{ExchangeContext, SimulationModule},
60};
61
62#[derive(Debug, Eq, PartialEq)]
66struct InflightCommand {
67 timestamp: UnixNanos,
68 counter: u32,
69 command: TradingCommand,
70}
71
72impl InflightCommand {
73 const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
74 Self {
75 timestamp,
76 counter,
77 command,
78 }
79 }
80}
81
82impl Ord for InflightCommand {
83 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
84 other
86 .timestamp
87 .cmp(&self.timestamp)
88 .then_with(|| other.counter.cmp(&self.counter))
89 }
90}
91
92impl PartialOrd for InflightCommand {
93 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
94 Some(self.cmp(other))
95 }
96}
97
98pub struct SimulatedExchange {
114 pub id: Venue,
116 pub oms_type: OmsType,
118 pub account_type: AccountType,
120 pub base_currency: Option<Currency>,
122 starting_balances: Vec<Money>,
123 book_type: BookType,
124 default_leverage: Decimal,
125 exec_client: Option<Rc<dyn ExecutionClient>>,
126 fee_model: FeeModelAny,
127 fill_model: FillModelAny,
128 latency_model: Option<Box<dyn LatencyModel>>,
129 instruments: AHashMap<InstrumentId, InstrumentAny>,
130 matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
131 settlement_prices: AHashMap<InstrumentId, Price>,
132 leverages: AHashMap<InstrumentId, Decimal>,
133 margin_model: Option<MarginModelAny>,
134 modules: Vec<Box<dyn SimulationModule>>,
135 clock: Rc<RefCell<dyn Clock>>,
136 cache: Rc<RefCell<Cache>>,
137 message_queue: VecDeque<TradingCommand>,
138 inflight_queue: BinaryHeap<InflightCommand>,
139 inflight_counter: AHashMap<UnixNanos, u32>,
140 bar_execution: bool,
141 bar_adaptive_high_low_ordering: bool,
142 trade_execution: bool,
143 liquidity_consumption: bool,
144 reject_stop_orders: bool,
145 support_gtd_orders: bool,
146 support_contingent_orders: bool,
147 use_position_ids: bool,
148 use_random_ids: bool,
149 use_reduce_only: bool,
150 use_message_queue: bool,
151 use_market_order_acks: bool,
152 _allow_cash_borrowing: bool,
153 frozen_account: bool,
154 queue_position: bool,
155 oto_full_trigger: bool,
156 price_protection_points: u32,
157}
158
159impl Debug for SimulatedExchange {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 f.debug_struct(stringify!(SimulatedExchange))
162 .field("id", &self.id)
163 .field("account_type", &self.account_type)
164 .finish()
165 }
166}
167
168impl SimulatedExchange {
169 pub fn new(
177 config: SimulatedVenueConfig,
178 cache: Rc<RefCell<Cache>>,
179 clock: Rc<RefCell<dyn Clock>>,
180 ) -> anyhow::Result<Self> {
181 if config.starting_balances.is_empty() {
182 anyhow::bail!("Starting balances must be provided")
183 }
184
185 if config.base_currency.is_some() && config.starting_balances.len() > 1 {
186 anyhow::bail!("single-currency account has multiple starting currencies")
187 }
188
189 let default_leverage = config.default_leverage.unwrap_or_else(|| {
190 if config.account_type == AccountType::Margin {
191 Decimal::from(10)
192 } else {
193 Decimal::from(1)
194 }
195 });
196
197 Ok(Self {
198 id: config.venue,
199 oms_type: config.oms_type,
200 account_type: config.account_type,
201 base_currency: config.base_currency,
202 starting_balances: config.starting_balances,
203 book_type: config.book_type,
204 default_leverage,
205 exec_client: None,
206 fee_model: config.fee_model,
207 fill_model: config.fill_model,
208 latency_model: config.latency_model,
209 instruments: AHashMap::new(),
210 matching_engines: AHashMap::new(),
211 settlement_prices: AHashMap::new(),
212 leverages: config.leverages,
213 margin_model: config.margin_model,
214 modules: config.modules,
215 clock,
216 cache,
217 message_queue: VecDeque::new(),
218 inflight_queue: BinaryHeap::new(),
219 inflight_counter: AHashMap::new(),
220 bar_execution: config.bar_execution,
221 bar_adaptive_high_low_ordering: config.bar_adaptive_high_low_ordering,
222 trade_execution: config.trade_execution,
223 liquidity_consumption: config.liquidity_consumption,
224 reject_stop_orders: config.reject_stop_orders,
225 support_gtd_orders: config.support_gtd_orders,
226 support_contingent_orders: config.support_contingent_orders,
227 use_position_ids: config.use_position_ids,
228 use_random_ids: config.use_random_ids,
229 use_reduce_only: config.use_reduce_only,
230 use_message_queue: config.use_message_queue,
231 use_market_order_acks: config.use_market_order_acks,
232 _allow_cash_borrowing: config.allow_cash_borrowing,
233 frozen_account: config.frozen_account,
234 queue_position: config.queue_position,
235 oto_full_trigger: config.oto_full_trigger,
236 price_protection_points: config.price_protection_points,
237 })
238 }
239
240 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
242 self.exec_client = Some(client);
243 }
244
245 pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
247 for matching_engine in self.matching_engines.values_mut() {
248 matching_engine.set_fill_model(fill_model.clone());
249 log::info!(
250 "Setting fill model for {} to {}",
251 matching_engine.venue,
252 self.fill_model
253 );
254 }
255 self.fill_model = fill_model;
256 }
257
258 pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
260 self.latency_model = Some(latency_model);
261 }
262
263 pub fn set_settlement_price(&mut self, instrument_id: InstrumentId, price: Price) {
265 self.settlement_prices.insert(instrument_id, price);
266 }
267
268 #[must_use]
270 pub const fn book_type(&self) -> BookType {
271 self.book_type
272 }
273
274 pub fn instrument_ids(&self) -> impl Iterator<Item = &InstrumentId> {
276 self.instruments.keys()
277 }
278
279 pub fn initialize_account(&mut self) {
280 self.generate_fresh_account_state();
281 }
282
283 pub fn load_open_orders(&mut self) {
285 let mut open_orders: Vec<(OrderAny, AccountId)> = {
286 let cache = self.cache.as_ref().borrow();
287 cache
288 .orders_open(Some(&self.id), None, None, None, None)
289 .into_iter()
290 .filter(|order| !order.is_emulated())
291 .filter_map(|order| {
292 order
293 .account_id()
294 .map(|account_id| (order.clone(), account_id))
295 })
296 .collect()
297 };
298
299 open_orders.sort_by(|(a, _), (b, _)| {
301 a.ts_init()
302 .cmp(&b.ts_init())
303 .then_with(|| a.client_order_id().cmp(&b.client_order_id()))
304 });
305
306 for (mut order, account_id) in open_orders {
307 let instrument_id = order.instrument_id();
308 if let Some(matching_engine) = self.matching_engines.get_mut(&instrument_id) {
309 matching_engine.process_order(&mut order, account_id);
310 } else {
311 log::error!(
312 "No matching engine for {instrument_id} to load open order {}",
313 order.client_order_id()
314 );
315 }
316 }
317 }
318
319 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
330 check_equal(
331 &instrument.id().venue,
332 &self.id,
333 "Venue of instrument id",
334 "Venue of simulated exchange",
335 )
336 .expect_display(FAILED);
337
338 if self.account_type == AccountType::Cash
339 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
340 || matches!(instrument, InstrumentAny::CryptoFuture(_))
341 || matches!(instrument, InstrumentAny::PerpetualContract(_)))
342 {
343 anyhow::bail!("Cash account cannot trade futures or perpetuals")
344 }
345
346 self.instruments.insert(instrument.id(), instrument.clone());
347
348 let price_protection = if self.price_protection_points == 0 {
349 None
350 } else {
351 Some(self.price_protection_points)
352 };
353
354 let matching_engine_config = OrderMatchingEngineConfig::builder()
355 .bar_execution(self.bar_execution)
356 .bar_adaptive_high_low_ordering(self.bar_adaptive_high_low_ordering)
357 .trade_execution(self.trade_execution)
358 .liquidity_consumption(self.liquidity_consumption)
359 .reject_stop_orders(self.reject_stop_orders)
360 .support_gtd_orders(self.support_gtd_orders)
361 .support_contingent_orders(self.support_contingent_orders)
362 .use_position_ids(self.use_position_ids)
363 .use_random_ids(self.use_random_ids)
364 .use_reduce_only(self.use_reduce_only)
365 .use_market_order_acks(self.use_market_order_acks)
366 .queue_position(self.queue_position)
367 .oto_full_trigger(self.oto_full_trigger)
368 .maybe_price_protection_points(price_protection)
369 .build();
370 let instrument_id = instrument.id();
371 let matching_engine = OrderMatchingEngine::new(
372 instrument,
373 self.instruments.len() as u32,
374 self.fill_model.clone(),
375 self.fee_model.clone(),
376 self.book_type,
377 self.oms_type,
378 self.account_type,
379 self.clock.clone(),
380 Rc::clone(&self.cache),
381 matching_engine_config,
382 );
383 self.matching_engines.insert(instrument_id, matching_engine);
384
385 log::info!("Added instrument {instrument_id} and created matching engine");
386 Ok(())
387 }
388
389 #[must_use]
391 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
392 self.matching_engines
393 .get(&instrument_id)
394 .and_then(OrderMatchingEngine::best_bid_price)
395 }
396
397 #[must_use]
399 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
400 self.matching_engines
401 .get(&instrument_id)
402 .and_then(OrderMatchingEngine::best_ask_price)
403 }
404
405 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
407 self.matching_engines
408 .get(&instrument_id)
409 .map(OrderMatchingEngine::get_book)
410 }
411
412 #[must_use]
414 pub fn get_matching_engine(
415 &self,
416 instrument_id: &InstrumentId,
417 ) -> Option<&OrderMatchingEngine> {
418 self.matching_engines.get(instrument_id)
419 }
420
421 #[must_use]
423 pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
424 &self.matching_engines
425 }
426
427 #[must_use]
429 pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
430 let mut books = AHashMap::new();
431 for (instrument_id, matching_engine) in &self.matching_engines {
432 books.insert(*instrument_id, matching_engine.get_book().clone());
433 }
434 books
435 }
436
437 #[must_use]
439 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
440 instrument_id
441 .and_then(|id| {
442 self.matching_engines
443 .get(&id)
444 .map(OrderMatchingEngine::get_open_orders)
445 })
446 .unwrap_or_else(|| {
447 self.matching_engines
448 .values()
449 .flat_map(OrderMatchingEngine::get_open_orders)
450 .collect()
451 })
452 }
453
454 #[must_use]
456 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
457 instrument_id
458 .and_then(|id| {
459 self.matching_engines
460 .get(&id)
461 .map(|engine| engine.get_open_bid_orders().to_vec())
462 })
463 .unwrap_or_else(|| {
464 self.matching_engines
465 .values()
466 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
467 .collect()
468 })
469 }
470
471 #[must_use]
473 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<OrderMatchInfo> {
474 instrument_id
475 .and_then(|id| {
476 self.matching_engines
477 .get(&id)
478 .map(|engine| engine.get_open_ask_orders().to_vec())
479 })
480 .unwrap_or_else(|| {
481 self.matching_engines
482 .values()
483 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
484 .collect()
485 })
486 }
487
488 #[must_use]
490 pub fn get_account(&self) -> Option<AccountAny> {
491 self.exec_client
492 .as_ref()
493 .and_then(|client| client.get_account())
494 }
495
496 #[must_use]
498 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
499 &self.cache
500 }
501
502 pub fn adjust_account(&mut self, adjustment: Money) {
508 if self.frozen_account {
509 return;
511 }
512
513 if let Some(exec_client) = &self.exec_client {
514 let venue = exec_client.venue();
515 log::debug!("Adjusting account for venue {venue}");
516 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
517 match account.balance(Some(adjustment.currency)) {
518 Some(balance) => {
519 let mut current_balance = *balance;
520 current_balance.total = current_balance.total + adjustment;
521 current_balance.free = current_balance.free + adjustment;
522
523 let margins = match account {
524 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
525 _ => IndexMap::new(),
526 };
527
528 if let Some(exec_client) = &self.exec_client {
529 exec_client
530 .generate_account_state(
531 vec![current_balance],
532 margins.values().copied().collect(),
533 true,
534 self.clock.borrow().timestamp_ns(),
535 )
536 .unwrap();
537 }
538 }
539 None => {
540 log::error!(
541 "Cannot adjust account: no balance for currency {}",
542 adjustment.currency
543 );
544 }
545 }
546 } else {
547 log::error!("Cannot adjust account: no account for venue {venue}");
548 }
549 }
550 }
551
552 #[must_use]
554 pub fn has_pending_commands(&self, ts_now: UnixNanos) -> bool {
555 if !self.message_queue.is_empty() {
556 return true;
557 }
558 self.inflight_queue
559 .peek()
560 .is_some_and(|inflight| inflight.timestamp <= ts_now)
561 }
562
563 pub fn iterate_matching_engines(&mut self, ts_now: UnixNanos) {
566 for matching_engine in self.matching_engines.values_mut() {
567 matching_engine.iterate(ts_now, AggressorSide::NoAggressor);
568 }
569 }
570
571 pub fn set_clock_time(&self, ts_now: UnixNanos) {
579 let mut clock_ref = self.clock.borrow_mut();
580 let test_clock = clock_ref
581 .as_any_mut()
582 .downcast_mut::<TestClock>()
583 .expect("SimulatedExchange requires TestClock");
584 test_clock.set_time(ts_now);
585 }
586
587 pub fn send(&mut self, command: TradingCommand) {
589 if !self.use_message_queue {
590 self.process_trading_command(command);
591 } else if self.latency_model.is_none() {
592 self.message_queue.push_back(command);
593 } else {
594 let (timestamp, counter) = self.generate_inflight_command(&command);
595 self.inflight_queue
596 .push(InflightCommand::new(timestamp, counter, command));
597 }
598 }
599
600 fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
601 if let Some(latency_model) = &self.latency_model {
602 let ts = match command {
603 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
604 command.ts_init() + latency_model.get_insert_latency()
605 }
606 TradingCommand::ModifyOrder(_) => {
607 command.ts_init() + latency_model.get_update_latency()
608 }
609 TradingCommand::CancelOrder(_)
610 | TradingCommand::CancelAllOrders(_)
611 | TradingCommand::BatchCancelOrders(_) => {
612 command.ts_init() + latency_model.get_delete_latency()
613 }
614 _ => panic!("Cannot handle command: {command:?}"),
615 };
616
617 let counter = self
618 .inflight_counter
619 .entry(ts)
620 .and_modify(|e| *e += 1)
621 .or_insert(1);
622
623 (ts, *counter)
624 } else {
625 panic!("Latency model should be initialized");
626 }
627 }
628
629 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
635 for module in &self.modules {
636 module.pre_process(&Data::Delta(delta));
637 }
638
639 if !self.matching_engines.contains_key(&delta.instrument_id) {
640 let instrument = {
641 let cache = self.cache.as_ref().borrow();
642 cache.instrument(&delta.instrument_id).cloned()
643 };
644
645 if let Some(instrument) = instrument {
646 self.add_instrument(instrument).unwrap();
647 } else {
648 panic!(
649 "No matching engine found for instrument {}",
650 delta.instrument_id
651 );
652 }
653 }
654
655 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
656 matching_engine.process_order_book_delta(&delta).unwrap();
657 } else {
658 panic!("Matching engine should be initialized");
659 }
660 }
661
662 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
668 for module in &self.modules {
669 module.pre_process(&Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
670 }
671
672 if !self.matching_engines.contains_key(&deltas.instrument_id) {
673 let instrument = {
674 let cache = self.cache.as_ref().borrow();
675 cache.instrument(&deltas.instrument_id).cloned()
676 };
677
678 if let Some(instrument) = instrument {
679 self.add_instrument(instrument).unwrap();
680 } else {
681 panic!(
682 "No matching engine found for instrument {}",
683 deltas.instrument_id
684 );
685 }
686 }
687
688 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
689 matching_engine.process_order_book_deltas(deltas).unwrap();
690 } else {
691 panic!("Matching engine should be initialized");
692 }
693 }
694
695 pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) {
701 for module in &self.modules {
702 module.pre_process(&Data::Depth10(Box::new(*depth)));
703 }
704
705 if !self.matching_engines.contains_key(&depth.instrument_id) {
706 let instrument = {
707 let cache = self.cache.as_ref().borrow();
708 cache.instrument(&depth.instrument_id).cloned()
709 };
710
711 if let Some(instrument) = instrument {
712 self.add_instrument(instrument).unwrap();
713 } else {
714 panic!(
715 "No matching engine found for instrument {}",
716 depth.instrument_id
717 );
718 }
719 }
720
721 if let Some(matching_engine) = self.matching_engines.get_mut(&depth.instrument_id) {
722 matching_engine.process_order_book_depth10(depth).unwrap();
723 } else {
724 panic!("Matching engine should be initialized");
725 }
726 }
727
728 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
734 for module in &self.modules {
735 module.pre_process(&Data::Quote(*quote));
736 }
737
738 if !self.matching_engines.contains_key("e.instrument_id) {
739 let instrument = {
740 let cache = self.cache.as_ref().borrow();
741 cache.instrument("e.instrument_id).cloned()
742 };
743
744 if let Some(instrument) = instrument {
745 self.add_instrument(instrument).unwrap();
746 } else {
747 panic!(
748 "No matching engine found for instrument {}",
749 quote.instrument_id
750 );
751 }
752 }
753
754 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
755 matching_engine.process_quote_tick(quote);
756 } else {
757 panic!("Matching engine should be initialized");
758 }
759 }
760
761 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
767 for module in &self.modules {
768 module.pre_process(&Data::Trade(*trade));
769 }
770
771 if !self.matching_engines.contains_key(&trade.instrument_id) {
772 let instrument = {
773 let cache = self.cache.as_ref().borrow();
774 cache.instrument(&trade.instrument_id).cloned()
775 };
776
777 if let Some(instrument) = instrument {
778 self.add_instrument(instrument).unwrap();
779 } else {
780 panic!(
781 "No matching engine found for instrument {}",
782 trade.instrument_id
783 );
784 }
785 }
786
787 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
788 matching_engine.process_trade_tick(trade);
789 } else {
790 panic!("Matching engine should be initialized");
791 }
792 }
793
794 pub fn process_bar(&mut self, bar: Bar) {
800 for module in &self.modules {
801 module.pre_process(&Data::Bar(bar));
802 }
803
804 if !self.matching_engines.contains_key(&bar.instrument_id()) {
805 let instrument = {
806 let cache = self.cache.as_ref().borrow();
807 cache.instrument(&bar.instrument_id()).cloned()
808 };
809
810 if let Some(instrument) = instrument {
811 self.add_instrument(instrument).unwrap();
812 } else {
813 panic!(
814 "No matching engine found for instrument {}",
815 bar.instrument_id()
816 );
817 }
818 }
819
820 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
821 matching_engine.process_bar(&bar);
822 } else {
823 panic!("Matching engine should be initialized");
824 }
825 }
826
827 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
833 for module in &self.modules {
834 module.pre_process(&Data::InstrumentStatus(status));
835 }
836
837 if !self.matching_engines.contains_key(&status.instrument_id) {
838 let instrument = {
839 let cache = self.cache.as_ref().borrow();
840 cache.instrument(&status.instrument_id).cloned()
841 };
842
843 if let Some(instrument) = instrument {
844 self.add_instrument(instrument).unwrap();
845 } else {
846 panic!(
847 "No matching engine found for instrument {}",
848 status.instrument_id
849 );
850 }
851 }
852
853 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
854 matching_engine.process_status(status.action);
855 } else {
856 panic!("Matching engine should be initialized");
857 }
858 }
859
860 pub fn process_instrument_close(&mut self, close: InstrumentClose) {
866 for module in &self.modules {
867 module.pre_process(&Data::InstrumentClose(close));
868 }
869
870 if !self.matching_engines.contains_key(&close.instrument_id) {
871 let instrument = {
872 let cache = self.cache.as_ref().borrow();
873 cache.instrument(&close.instrument_id).cloned()
874 };
875
876 if let Some(instrument) = instrument {
877 self.add_instrument(instrument).unwrap();
878 } else {
879 panic!(
880 "No matching engine found for instrument {}",
881 close.instrument_id
882 );
883 }
884 }
885
886 if let Some(matching_engine) = self.matching_engines.get_mut(&close.instrument_id) {
887 if let Some(price) = self.settlement_prices.get(&close.instrument_id) {
888 matching_engine.set_settlement_price(*price);
889 }
890 matching_engine.process_instrument_close(close);
891 } else {
892 panic!("Matching engine should be initialized");
893 }
894 }
895
896 pub fn process(&mut self, ts_now: UnixNanos) {
902 while let Some(inflight) = self.inflight_queue.peek() {
907 if inflight.timestamp > ts_now {
908 break;
910 }
911 let inflight = self.inflight_queue.pop().unwrap();
913 self.process_trading_command(inflight.command);
914 }
915
916 while let Some(command) = self.message_queue.pop_front() {
918 self.process_trading_command(command);
919 }
920 }
921
922 pub fn process_modules(&mut self, ts_now: UnixNanos) {
927 let adjustments = {
928 let cache = self.cache.borrow();
929 let ctx = ExchangeContext {
930 venue: self.id,
931 base_currency: self.base_currency,
932 instruments: &self.instruments,
933 matching_engines: &self.matching_engines,
934 cache: &cache,
935 };
936 self.modules
937 .iter()
938 .flat_map(|m| m.process(ts_now, &ctx))
939 .collect::<Vec<Money>>()
940 };
941
942 for adjustment in adjustments {
943 self.adjust_account(adjustment);
944 }
945 }
946
947 pub fn reset(&mut self) {
949 for module in &self.modules {
950 module.reset();
951 }
952
953 self.generate_fresh_account_state();
954
955 for matching_engine in self.matching_engines.values_mut() {
956 matching_engine.reset();
957 }
958
959 self.settlement_prices.clear();
960 self.message_queue.clear();
961 self.inflight_queue.clear();
962
963 log::info!("Resetting exchange state");
964 }
965
966 pub fn log_diagnostics(&self) {
968 for module in &self.modules {
969 module.log_diagnostics();
970 }
971 }
972
973 fn process_trading_command(&mut self, command: TradingCommand) {
974 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
975 let account_id = if let Some(exec_client) = &self.exec_client {
976 exec_client.account_id()
977 } else {
978 panic!("Execution client should be initialized");
979 };
980
981 match command {
982 TradingCommand::SubmitOrder(command) => {
983 let mut order = self
984 .cache
985 .borrow()
986 .order(&command.client_order_id)
987 .cloned()
988 .expect("Order must exist in cache");
989 matching_engine.process_order(&mut order, account_id);
990 }
991 TradingCommand::ModifyOrder(ref command) => {
992 matching_engine.process_modify(command, account_id);
993 }
994 TradingCommand::CancelOrder(ref command) => {
995 matching_engine.process_cancel(command, account_id);
996 }
997 TradingCommand::CancelAllOrders(ref command) => {
998 matching_engine.process_cancel_all(command, account_id);
999 }
1000 TradingCommand::BatchCancelOrders(ref command) => {
1001 matching_engine.process_batch_cancel(command, account_id);
1002 }
1003 TradingCommand::SubmitOrderList(ref command) => {
1004 let mut orders: Vec<OrderAny> = self
1005 .cache
1006 .borrow()
1007 .orders_for_ids(&command.order_list.client_order_ids, command);
1008
1009 for order in &mut orders {
1010 matching_engine.process_order(order, account_id);
1011 }
1012 }
1013 _ => {}
1014 }
1015 } else {
1016 panic!(
1017 "Matching engine not found for instrument {}",
1018 command.instrument_id()
1019 );
1020 }
1021 }
1022
1023 fn generate_fresh_account_state(&self) {
1024 let balances: Vec<AccountBalance> = self
1025 .starting_balances
1026 .iter()
1027 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
1028 .collect();
1029
1030 if let Some(exec_client) = &self.exec_client {
1031 exec_client
1032 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
1033 .unwrap();
1034 }
1035
1036 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
1037 margin_account.set_default_leverage(self.default_leverage);
1038 for (instrument_id, leverage) in &self.leverages {
1039 margin_account.set_leverage(*instrument_id, *leverage);
1040 }
1041
1042 if let Some(model) = &self.margin_model {
1043 margin_account.set_margin_model(model.clone());
1044 }
1045 self.cache
1046 .borrow_mut()
1047 .update_account(&AccountAny::Margin(margin_account))
1048 .unwrap();
1049 }
1050 }
1051}
1052
1053#[cfg(test)]
1054mod tests {
1055 use std::{
1056 cell::{Cell, RefCell},
1057 collections::BinaryHeap,
1058 rc::Rc,
1059 };
1060
1061 use nautilus_common::{
1062 cache::Cache,
1063 clock::TestClock,
1064 messages::execution::{SubmitOrder, TradingCommand},
1065 msgbus::{self, stubs::get_typed_message_saving_handler},
1066 };
1067 use nautilus_core::{UUID4, UnixNanos};
1068 use nautilus_execution::models::{
1069 fee::{FeeModelAny, MakerTakerFeeModel},
1070 latency::StaticLatencyModel,
1071 };
1072 use nautilus_model::{
1073 accounts::{AccountAny, MarginAccount},
1074 data::{
1075 Bar, BarType, BookOrder, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas,
1076 QuoteTick, TradeTick,
1077 },
1078 enums::{
1079 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
1080 OmsType, OrderSide, OrderType,
1081 },
1082 events::AccountState,
1083 identifiers::{
1084 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
1085 },
1086 instruments::{
1087 CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1088 },
1089 orders::{Order, OrderAny, OrderTestBuilder},
1090 stubs::TestDefault,
1091 types::{AccountBalance, Currency, Money, Price, Quantity},
1092 };
1093 use rstest::rstest;
1094 use rust_decimal::Decimal;
1095
1096 use crate::{
1097 config::SimulatedVenueConfig,
1098 exchange::{InflightCommand, SimulatedExchange},
1099 execution_client::BacktestExecutionClient,
1100 modules::{ExchangeContext, SimulationModule},
1101 };
1102
1103 fn get_exchange(
1104 venue: Venue,
1105 account_type: AccountType,
1106 book_type: BookType,
1107 cache: Option<Rc<RefCell<Cache>>>,
1108 ) -> Rc<RefCell<SimulatedExchange>> {
1109 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
1110 let clock = Rc::new(RefCell::new(TestClock::new()));
1111 let config = SimulatedVenueConfig::builder()
1112 .venue(venue)
1113 .oms_type(OmsType::Netting)
1114 .account_type(account_type)
1115 .book_type(book_type)
1116 .starting_balances(vec![Money::new(1000.0, Currency::USD())])
1117 .default_leverage(Decimal::ONE)
1118 .fee_model(FeeModelAny::MakerTaker(MakerTakerFeeModel))
1119 .build();
1120 let exchange = Rc::new(RefCell::new(
1121 SimulatedExchange::new(config, cache.clone(), clock).unwrap(),
1122 ));
1123
1124 let clock = TestClock::new();
1125 let execution_client = BacktestExecutionClient::new(
1126 TraderId::test_default(),
1127 AccountId::test_default(),
1128 &exchange,
1129 cache,
1130 Rc::new(RefCell::new(clock)),
1131 None,
1132 None,
1133 );
1134 exchange
1135 .borrow_mut()
1136 .register_client(Rc::new(execution_client));
1137
1138 exchange
1139 }
1140
1141 fn create_submit_order_command(
1142 ts_init: UnixNanos,
1143 client_order_id: &str,
1144 ) -> (OrderAny, TradingCommand) {
1145 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
1146 let order = OrderTestBuilder::new(OrderType::Market)
1147 .instrument_id(instrument_id)
1148 .client_order_id(ClientOrderId::new(client_order_id))
1149 .quantity(Quantity::from(1))
1150 .build();
1151 let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1152 TraderId::test_default(),
1153 None,
1154 StrategyId::test_default(),
1155 instrument_id,
1156 order.client_order_id(),
1157 order.init_event().clone(),
1158 None,
1159 None,
1160 None, UUID4::default(),
1162 ts_init,
1163 ));
1164 (order, command)
1165 }
1166
1167 #[rstest]
1168 #[should_panic(
1169 expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
1170 )]
1171 fn test_venue_mismatch_between_exchange_and_instrument(
1172 crypto_perpetual_ethusdt: CryptoPerpetual,
1173 ) {
1174 let exchange = get_exchange(
1175 Venue::new("SIM"),
1176 AccountType::Margin,
1177 BookType::L1_MBP,
1178 None,
1179 );
1180 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1181 exchange.borrow_mut().add_instrument(instrument).unwrap();
1182 }
1183
1184 #[rstest]
1185 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
1186 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
1187 let exchange = get_exchange(
1188 Venue::new("BINANCE"),
1189 AccountType::Cash,
1190 BookType::L1_MBP,
1191 None,
1192 );
1193 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1194 exchange.borrow_mut().add_instrument(instrument).unwrap();
1195 }
1196
1197 #[rstest]
1198 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1199 let exchange = get_exchange(
1200 Venue::new("BINANCE"),
1201 AccountType::Margin,
1202 BookType::L1_MBP,
1203 None,
1204 );
1205 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1206
1207 exchange.borrow_mut().add_instrument(instrument).unwrap();
1209
1210 let quote_tick = QuoteTick::new(
1212 crypto_perpetual_ethusdt.id,
1213 Price::from("1000.00"),
1214 Price::from("1001.00"),
1215 Quantity::from("1.000"),
1216 Quantity::from("1.000"),
1217 UnixNanos::default(),
1218 UnixNanos::default(),
1219 );
1220 exchange.borrow_mut().process_quote_tick("e_tick);
1221
1222 let best_bid_price = exchange
1223 .borrow()
1224 .best_bid_price(crypto_perpetual_ethusdt.id);
1225 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1226 let best_ask_price = exchange
1227 .borrow()
1228 .best_ask_price(crypto_perpetual_ethusdt.id);
1229 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1230 }
1231
1232 #[rstest]
1233 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
1234 let exchange = get_exchange(
1235 Venue::new("BINANCE"),
1236 AccountType::Margin,
1237 BookType::L1_MBP,
1238 None,
1239 );
1240 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1241
1242 exchange.borrow_mut().add_instrument(instrument).unwrap();
1244
1245 let trade_tick = TradeTick::new(
1247 crypto_perpetual_ethusdt.id,
1248 Price::from("1000.00"),
1249 Quantity::from("1.000"),
1250 AggressorSide::Buyer,
1251 TradeId::from("1"),
1252 UnixNanos::default(),
1253 UnixNanos::default(),
1254 );
1255 exchange.borrow_mut().process_trade_tick(&trade_tick);
1256
1257 let best_bid_price = exchange
1258 .borrow()
1259 .best_bid_price(crypto_perpetual_ethusdt.id);
1260 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1261 let best_ask = exchange
1262 .borrow()
1263 .best_ask_price(crypto_perpetual_ethusdt.id);
1264 assert_eq!(best_ask, Some(Price::from("1000.00")));
1265 }
1266
1267 #[rstest]
1268 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1269 let exchange = get_exchange(
1270 Venue::new("BINANCE"),
1271 AccountType::Margin,
1272 BookType::L1_MBP,
1273 None,
1274 );
1275 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1276
1277 exchange.borrow_mut().add_instrument(instrument).unwrap();
1279
1280 let bar = Bar::new(
1282 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1283 Price::from("1500.00"),
1284 Price::from("1505.00"),
1285 Price::from("1490.00"),
1286 Price::from("1502.00"),
1287 Quantity::from("100.000"),
1288 UnixNanos::default(),
1289 UnixNanos::default(),
1290 );
1291 exchange.borrow_mut().process_bar(bar);
1292
1293 let best_bid_price = exchange
1295 .borrow()
1296 .best_bid_price(crypto_perpetual_ethusdt.id);
1297 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1298 let best_ask_price = exchange
1299 .borrow()
1300 .best_ask_price(crypto_perpetual_ethusdt.id);
1301 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1302 }
1303
1304 #[rstest]
1305 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1306 let exchange = get_exchange(
1307 Venue::new("BINANCE"),
1308 AccountType::Margin,
1309 BookType::L1_MBP,
1310 None,
1311 );
1312 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1313
1314 exchange.borrow_mut().add_instrument(instrument).unwrap();
1316
1317 let bar_bid = Bar::new(
1320 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1321 Price::from("1500.00"),
1322 Price::from("1505.00"),
1323 Price::from("1490.00"),
1324 Price::from("1502.00"),
1325 Quantity::from("100.000"),
1326 UnixNanos::from(1),
1327 UnixNanos::from(1),
1328 );
1329 let bar_ask = Bar::new(
1330 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1331 Price::from("1501.00"),
1332 Price::from("1506.00"),
1333 Price::from("1491.00"),
1334 Price::from("1503.00"),
1335 Quantity::from("100.000"),
1336 UnixNanos::from(1),
1337 UnixNanos::from(1),
1338 );
1339
1340 exchange.borrow_mut().process_bar(bar_bid);
1342 exchange.borrow_mut().process_bar(bar_ask);
1343
1344 let best_bid_price = exchange
1346 .borrow()
1347 .best_bid_price(crypto_perpetual_ethusdt.id);
1348 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1349 let best_ask_price = exchange
1350 .borrow()
1351 .best_ask_price(crypto_perpetual_ethusdt.id);
1352 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1353 }
1354
1355 #[rstest]
1356 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1357 let exchange = get_exchange(
1358 Venue::new("BINANCE"),
1359 AccountType::Margin,
1360 BookType::L2_MBP,
1361 None,
1362 );
1363 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1364
1365 exchange.borrow_mut().add_instrument(instrument).unwrap();
1367
1368 let delta_buy = OrderBookDelta::new(
1370 crypto_perpetual_ethusdt.id,
1371 BookAction::Add,
1372 BookOrder::new(
1373 OrderSide::Buy,
1374 Price::from("1000.00"),
1375 Quantity::from("1.000"),
1376 1,
1377 ),
1378 0,
1379 0,
1380 UnixNanos::from(1),
1381 UnixNanos::from(1),
1382 );
1383 let delta_sell = OrderBookDelta::new(
1384 crypto_perpetual_ethusdt.id,
1385 BookAction::Add,
1386 BookOrder::new(
1387 OrderSide::Sell,
1388 Price::from("1001.00"),
1389 Quantity::from("1.000"),
1390 1,
1391 ),
1392 0,
1393 1,
1394 UnixNanos::from(2),
1395 UnixNanos::from(2),
1396 );
1397
1398 exchange.borrow_mut().process_order_book_delta(delta_buy);
1400 exchange.borrow_mut().process_order_book_delta(delta_sell);
1401
1402 let book = exchange
1403 .borrow()
1404 .get_book(crypto_perpetual_ethusdt.id)
1405 .unwrap()
1406 .clone();
1407 assert_eq!(book.update_count, 2);
1408 assert_eq!(book.sequence, 1);
1409 assert_eq!(book.ts_last, UnixNanos::from(2));
1410 let best_bid_price = exchange
1411 .borrow()
1412 .best_bid_price(crypto_perpetual_ethusdt.id);
1413 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1414 let best_ask_price = exchange
1415 .borrow()
1416 .best_ask_price(crypto_perpetual_ethusdt.id);
1417 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1418 }
1419
1420 #[rstest]
1421 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1422 let exchange = get_exchange(
1423 Venue::new("BINANCE"),
1424 AccountType::Margin,
1425 BookType::L2_MBP,
1426 None,
1427 );
1428 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1429
1430 exchange.borrow_mut().add_instrument(instrument).unwrap();
1432
1433 let delta_sell_1 = OrderBookDelta::new(
1435 crypto_perpetual_ethusdt.id,
1436 BookAction::Add,
1437 BookOrder::new(
1438 OrderSide::Sell,
1439 Price::from("1000.00"),
1440 Quantity::from("3.000"),
1441 1,
1442 ),
1443 0,
1444 0,
1445 UnixNanos::from(1),
1446 UnixNanos::from(1),
1447 );
1448 let delta_sell_2 = OrderBookDelta::new(
1449 crypto_perpetual_ethusdt.id,
1450 BookAction::Add,
1451 BookOrder::new(
1452 OrderSide::Sell,
1453 Price::from("1001.00"),
1454 Quantity::from("1.000"),
1455 1,
1456 ),
1457 0,
1458 1,
1459 UnixNanos::from(1),
1460 UnixNanos::from(1),
1461 );
1462 let orderbook_deltas = OrderBookDeltas::new(
1463 crypto_perpetual_ethusdt.id,
1464 vec![delta_sell_1, delta_sell_2],
1465 );
1466
1467 exchange
1469 .borrow_mut()
1470 .process_order_book_deltas(&orderbook_deltas);
1471
1472 let book = exchange
1473 .borrow()
1474 .get_book(crypto_perpetual_ethusdt.id)
1475 .unwrap()
1476 .clone();
1477 assert_eq!(book.update_count, 2);
1478 assert_eq!(book.sequence, 1);
1479 assert_eq!(book.ts_last, UnixNanos::from(1));
1480 let best_bid_price = exchange
1481 .borrow()
1482 .best_bid_price(crypto_perpetual_ethusdt.id);
1483 assert_eq!(best_bid_price, None);
1485 let best_ask_price = exchange
1486 .borrow()
1487 .best_ask_price(crypto_perpetual_ethusdt.id);
1488 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1490 }
1491
1492 #[rstest]
1493 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1494 let exchange = get_exchange(
1495 Venue::new("BINANCE"),
1496 AccountType::Margin,
1497 BookType::L2_MBP,
1498 None,
1499 );
1500 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1501
1502 exchange.borrow_mut().add_instrument(instrument).unwrap();
1504
1505 let instrument_status = InstrumentStatus::new(
1506 crypto_perpetual_ethusdt.id,
1507 MarketStatusAction::Close, UnixNanos::from(1),
1509 UnixNanos::from(1),
1510 None,
1511 None,
1512 None,
1513 None,
1514 None,
1515 );
1516
1517 exchange
1518 .borrow_mut()
1519 .process_instrument_status(instrument_status);
1520
1521 let market_status = exchange
1522 .borrow()
1523 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1524 .unwrap()
1525 .market_status;
1526 assert_eq!(market_status, MarketStatus::Closed);
1527 }
1528
1529 #[rstest]
1530 fn test_accounting() {
1531 let account_type = AccountType::Margin;
1532 let mut cache = Cache::default();
1533 let (handler, saving_handler) = get_typed_message_saving_handler::<AccountState>(None);
1534 msgbus::register_account_state_endpoint("Portfolio.update_account".into(), handler);
1535 let margin_account = MarginAccount::new(
1536 AccountState::new(
1537 AccountId::from("SIM-001"),
1538 account_type,
1539 vec![AccountBalance::new(
1540 Money::from("1000 USD"),
1541 Money::from("0 USD"),
1542 Money::from("1000 USD"),
1543 )],
1544 vec![],
1545 false,
1546 UUID4::default(),
1547 UnixNanos::default(),
1548 UnixNanos::default(),
1549 None,
1550 ),
1551 false,
1552 );
1553 let () = cache
1554 .add_account(AccountAny::Margin(margin_account))
1555 .unwrap();
1556 cache.build_index();
1558
1559 let exchange = get_exchange(
1560 Venue::new("SIM"),
1561 account_type,
1562 BookType::L2_MBP,
1563 Some(Rc::new(RefCell::new(cache))),
1564 );
1565 exchange.borrow_mut().initialize_account();
1566
1567 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1569
1570 let messages = saving_handler.get_messages();
1572 assert_eq!(messages.len(), 2);
1573 let account_state_first = messages.first().unwrap();
1574 let account_state_second = messages.last().unwrap();
1575
1576 assert_eq!(account_state_first.balances.len(), 1);
1577 let current_balance = account_state_first.balances[0];
1578 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1579 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1580 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1581
1582 assert_eq!(account_state_second.balances.len(), 1);
1583 let current_balance = account_state_second.balances[0];
1584 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1585 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1586 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1587 }
1588
1589 #[rstest]
1590 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1591 let (_, cmd1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1593 let (_, cmd2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1594 let (_, cmd3) = create_submit_order_command(UnixNanos::from(100), "O-3");
1595
1596 let inflight1 = InflightCommand::new(UnixNanos::from(100), 1, cmd1);
1597 let inflight2 = InflightCommand::new(UnixNanos::from(200), 2, cmd2);
1598 let inflight3 = InflightCommand::new(UnixNanos::from(100), 2, cmd3);
1599
1600 let mut inflight_heap = BinaryHeap::new();
1602 inflight_heap.push(inflight1);
1603 inflight_heap.push(inflight2);
1604 inflight_heap.push(inflight3);
1605
1606 let first = inflight_heap.pop().unwrap();
1609 let second = inflight_heap.pop().unwrap();
1610 let third = inflight_heap.pop().unwrap();
1611
1612 assert_eq!(first.timestamp, UnixNanos::from(100));
1613 assert_eq!(first.counter, 1);
1614 assert_eq!(second.timestamp, UnixNanos::from(100));
1615 assert_eq!(second.counter, 2);
1616 assert_eq!(third.timestamp, UnixNanos::from(200));
1617 assert_eq!(third.counter, 2);
1618 }
1619
1620 #[rstest]
1621 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1622 let exchange = get_exchange(
1623 Venue::new("BINANCE"),
1624 AccountType::Margin,
1625 BookType::L2_MBP,
1626 None,
1627 );
1628
1629 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1630 exchange.borrow_mut().add_instrument(instrument).unwrap();
1631
1632 let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1633 let (order2, command2) = create_submit_order_command(UnixNanos::from(200), "O-2");
1634
1635 exchange
1636 .borrow()
1637 .cache()
1638 .borrow_mut()
1639 .add_order(order1, None, None, false)
1640 .unwrap();
1641 exchange
1642 .borrow()
1643 .cache()
1644 .borrow_mut()
1645 .add_order(order2, None, None, false)
1646 .unwrap();
1647
1648 exchange.borrow_mut().send(command1);
1649 exchange.borrow_mut().send(command2);
1650
1651 assert_eq!(exchange.borrow().message_queue.len(), 2);
1654 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1655
1656 exchange.borrow_mut().process(UnixNanos::from(300));
1658 assert_eq!(exchange.borrow().message_queue.len(), 0);
1659 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1660 }
1661
1662 #[rstest]
1663 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1664 let latency_model = StaticLatencyModel::new(
1667 UnixNanos::from(100),
1668 UnixNanos::from(200),
1669 UnixNanos::from(300),
1670 UnixNanos::from(100),
1671 );
1672 let exchange = get_exchange(
1673 Venue::new("BINANCE"),
1674 AccountType::Margin,
1675 BookType::L2_MBP,
1676 None,
1677 );
1678 exchange
1679 .borrow_mut()
1680 .set_latency_model(Box::new(latency_model));
1681
1682 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1683 exchange.borrow_mut().add_instrument(instrument).unwrap();
1684
1685 let (order1, command1) = create_submit_order_command(UnixNanos::from(100), "O-1");
1686 let (order2, command2) = create_submit_order_command(UnixNanos::from(150), "O-2");
1687
1688 exchange
1689 .borrow()
1690 .cache()
1691 .borrow_mut()
1692 .add_order(order1, None, None, false)
1693 .unwrap();
1694 exchange
1695 .borrow()
1696 .cache()
1697 .borrow_mut()
1698 .add_order(order2, None, None, false)
1699 .unwrap();
1700
1701 exchange.borrow_mut().send(command1);
1702 exchange.borrow_mut().send(command2);
1703
1704 assert_eq!(exchange.borrow().message_queue.len(), 0);
1706 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1707 assert_eq!(
1709 exchange
1710 .borrow()
1711 .inflight_queue
1712 .iter()
1713 .next()
1714 .unwrap()
1715 .timestamp,
1716 UnixNanos::from(400)
1717 );
1718 assert_eq!(
1720 exchange
1721 .borrow()
1722 .inflight_queue
1723 .iter()
1724 .nth(1)
1725 .unwrap()
1726 .timestamp,
1727 UnixNanos::from(450)
1728 );
1729
1730 exchange.borrow_mut().process(UnixNanos::from(420));
1732 assert_eq!(exchange.borrow().message_queue.len(), 0);
1733 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1734 assert_eq!(
1735 exchange
1736 .borrow()
1737 .inflight_queue
1738 .iter()
1739 .next()
1740 .unwrap()
1741 .timestamp,
1742 UnixNanos::from(450)
1743 );
1744 }
1745
1746 #[rstest]
1747 fn test_process_iterates_matching_engines_after_commands(
1748 crypto_perpetual_ethusdt: CryptoPerpetual,
1749 ) {
1750 let cache = Rc::new(RefCell::new(Cache::default()));
1751 let exchange = get_exchange(
1752 Venue::new("BINANCE"),
1753 AccountType::Margin,
1754 BookType::L1_MBP,
1755 Some(cache.clone()),
1756 );
1757 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1758 let instrument_id = instrument.id();
1759 exchange.borrow_mut().add_instrument(instrument).unwrap();
1760
1761 let quote = QuoteTick::new(
1762 instrument_id,
1763 Price::from("1000.00"),
1764 Price::from("1001.00"),
1765 Quantity::from("1.000"),
1766 Quantity::from("1.000"),
1767 UnixNanos::from(1),
1768 UnixNanos::from(1),
1769 );
1770 exchange.borrow_mut().process_quote_tick("e);
1771
1772 let order = OrderTestBuilder::new(OrderType::Limit)
1774 .instrument_id(instrument_id)
1775 .client_order_id(ClientOrderId::new("O-LIMIT-1"))
1776 .side(OrderSide::Buy)
1777 .quantity(Quantity::from("1.000"))
1778 .price(Price::from("999.00"))
1779 .build();
1780
1781 cache
1782 .borrow_mut()
1783 .add_order(order.clone(), None, None, false)
1784 .unwrap();
1785
1786 let command = TradingCommand::SubmitOrder(SubmitOrder::new(
1787 TraderId::test_default(),
1788 None,
1789 StrategyId::test_default(),
1790 instrument_id,
1791 order.client_order_id(),
1792 order.init_event().clone(),
1793 None,
1794 None,
1795 None,
1796 UUID4::default(),
1797 UnixNanos::from(1),
1798 ));
1799 exchange.borrow_mut().send(command);
1800
1801 exchange.borrow_mut().process(UnixNanos::from(1));
1802
1803 let open_orders = exchange.borrow().get_open_orders(Some(instrument_id));
1804 assert_eq!(open_orders.len(), 1);
1805 assert_eq!(
1806 open_orders[0].client_order_id,
1807 ClientOrderId::new("O-LIMIT-1")
1808 );
1809 }
1810
1811 #[derive(Clone)]
1812 struct MockModuleCounts {
1813 pre_process: Rc<Cell<u32>>,
1814 process: Rc<Cell<u32>>,
1815 reset: Rc<Cell<u32>>,
1816 log_diagnostics: Rc<Cell<u32>>,
1817 }
1818
1819 impl MockModuleCounts {
1820 fn new() -> Self {
1821 Self {
1822 pre_process: Rc::new(Cell::new(0)),
1823 process: Rc::new(Cell::new(0)),
1824 reset: Rc::new(Cell::new(0)),
1825 log_diagnostics: Rc::new(Cell::new(0)),
1826 }
1827 }
1828 }
1829
1830 struct MockSimulationModule {
1831 counts: MockModuleCounts,
1832 }
1833
1834 impl MockSimulationModule {
1835 fn new(counts: MockModuleCounts) -> Self {
1836 Self { counts }
1837 }
1838 }
1839
1840 impl SimulationModule for MockSimulationModule {
1841 fn pre_process(&self, _data: &Data) {
1842 self.counts
1843 .pre_process
1844 .set(self.counts.pre_process.get() + 1);
1845 }
1846
1847 fn process(&self, _ts_now: UnixNanos, _ctx: &ExchangeContext) -> Vec<Money> {
1848 self.counts.process.set(self.counts.process.get() + 1);
1849 Vec::new()
1850 }
1851
1852 fn log_diagnostics(&self) {
1853 self.counts
1854 .log_diagnostics
1855 .set(self.counts.log_diagnostics.get() + 1);
1856 }
1857
1858 fn reset(&self) {
1859 self.counts.reset.set(self.counts.reset.get() + 1);
1860 }
1861 }
1862
1863 fn get_exchange_with_module(
1864 venue: Venue,
1865 counts: MockModuleCounts,
1866 ) -> Rc<RefCell<SimulatedExchange>> {
1867 let cache = Rc::new(RefCell::new(Cache::default()));
1868 let clock = Rc::new(RefCell::new(TestClock::new()));
1869
1870 let (handler, _saving_handler) = get_typed_message_saving_handler::<AccountState>(None);
1872 msgbus::register_account_state_endpoint("Portfolio.update_account".into(), handler);
1873
1874 let modules: Vec<Box<dyn SimulationModule>> =
1875 vec![Box::new(MockSimulationModule::new(counts))];
1876
1877 let config = SimulatedVenueConfig::builder()
1878 .venue(venue)
1879 .oms_type(OmsType::Netting)
1880 .account_type(AccountType::Margin)
1881 .book_type(BookType::L1_MBP)
1882 .starting_balances(vec![Money::new(1000.0, Currency::USD())])
1883 .default_leverage(Decimal::ONE)
1884 .modules(modules)
1885 .fee_model(FeeModelAny::MakerTaker(MakerTakerFeeModel))
1886 .build();
1887 let exchange = Rc::new(RefCell::new(
1888 SimulatedExchange::new(config, cache.clone(), clock).unwrap(),
1889 ));
1890
1891 let exec_clock = TestClock::new();
1892 let execution_client = BacktestExecutionClient::new(
1893 TraderId::test_default(),
1894 AccountId::test_default(),
1895 &exchange,
1896 cache,
1897 Rc::new(RefCell::new(exec_clock)),
1898 None,
1899 None,
1900 );
1901 exchange
1902 .borrow_mut()
1903 .register_client(Rc::new(execution_client));
1904
1905 exchange
1906 }
1907
1908 #[rstest]
1909 fn test_module_pre_process_called_on_quote(crypto_perpetual_ethusdt: CryptoPerpetual) {
1910 let counts = MockModuleCounts::new();
1911 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1912 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1913 exchange.borrow_mut().add_instrument(instrument).unwrap();
1914
1915 let quote = QuoteTick::new(
1916 crypto_perpetual_ethusdt.id,
1917 Price::from("1000.00"),
1918 Price::from("1001.00"),
1919 Quantity::from("1.000"),
1920 Quantity::from("1.000"),
1921 UnixNanos::default(),
1922 UnixNanos::default(),
1923 );
1924 exchange.borrow_mut().process_quote_tick("e);
1925
1926 assert_eq!(counts.pre_process.get(), 1);
1927 assert_eq!(counts.process.get(), 0);
1928 }
1929
1930 #[rstest]
1931 fn test_module_pre_process_called_on_instrument_status(
1932 crypto_perpetual_ethusdt: CryptoPerpetual,
1933 ) {
1934 let counts = MockModuleCounts::new();
1935 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1936 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
1937 exchange.borrow_mut().add_instrument(instrument).unwrap();
1938
1939 let status = InstrumentStatus::new(
1940 crypto_perpetual_ethusdt.id,
1941 MarketStatusAction::Close,
1942 UnixNanos::from(1),
1943 UnixNanos::from(1),
1944 None,
1945 None,
1946 None,
1947 None,
1948 None,
1949 );
1950 exchange.borrow_mut().process_instrument_status(status);
1951
1952 assert_eq!(counts.pre_process.get(), 1);
1953 assert_eq!(counts.process.get(), 0);
1954 }
1955
1956 #[rstest]
1957 fn test_module_process_not_called_by_process(crypto_perpetual_ethusdt: CryptoPerpetual) {
1958 let counts = MockModuleCounts::new();
1959 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1960 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1961 exchange.borrow_mut().add_instrument(instrument).unwrap();
1962
1963 exchange.borrow_mut().process(UnixNanos::from(100));
1965
1966 assert_eq!(counts.process.get(), 0);
1967 }
1968
1969 #[rstest]
1970 fn test_module_process_called_by_process_modules(crypto_perpetual_ethusdt: CryptoPerpetual) {
1971 let counts = MockModuleCounts::new();
1972 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1973 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1974 exchange.borrow_mut().add_instrument(instrument).unwrap();
1975
1976 exchange.borrow_mut().process_modules(UnixNanos::from(100));
1977
1978 assert_eq!(counts.process.get(), 1);
1979 }
1980
1981 #[rstest]
1982 fn test_module_reset_called_on_reset(crypto_perpetual_ethusdt: CryptoPerpetual) {
1983 let counts = MockModuleCounts::new();
1984 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
1985 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1986 exchange.borrow_mut().add_instrument(instrument).unwrap();
1987
1988 let margin_account = MarginAccount::new(
1990 AccountState::new(
1991 AccountId::test_default(),
1992 AccountType::Margin,
1993 vec![AccountBalance::new(
1994 Money::from("1000 USD"),
1995 Money::from("0 USD"),
1996 Money::from("1000 USD"),
1997 )],
1998 vec![],
1999 false,
2000 UUID4::default(),
2001 UnixNanos::default(),
2002 UnixNanos::default(),
2003 None,
2004 ),
2005 false,
2006 );
2007 exchange
2008 .borrow()
2009 .cache()
2010 .borrow_mut()
2011 .add_account(AccountAny::Margin(margin_account))
2012 .unwrap();
2013
2014 exchange.borrow_mut().reset();
2015
2016 assert_eq!(counts.reset.get(), 1);
2017 }
2018
2019 #[rstest]
2020 fn test_module_log_diagnostics(crypto_perpetual_ethusdt: CryptoPerpetual) {
2021 let counts = MockModuleCounts::new();
2022 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
2023 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
2024 exchange.borrow_mut().add_instrument(instrument).unwrap();
2025
2026 exchange.borrow().log_diagnostics();
2027
2028 assert_eq!(counts.log_diagnostics.get(), 1);
2029 }
2030
2031 #[rstest]
2032 fn test_module_pre_process_and_process_call_order(crypto_perpetual_ethusdt: CryptoPerpetual) {
2033 let counts = MockModuleCounts::new();
2034 let exchange = get_exchange_with_module(Venue::new("BINANCE"), counts.clone());
2035 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt.clone());
2036 exchange.borrow_mut().add_instrument(instrument).unwrap();
2037
2038 let quote = QuoteTick::new(
2040 crypto_perpetual_ethusdt.id,
2041 Price::from("1000.00"),
2042 Price::from("1001.00"),
2043 Quantity::from("1.000"),
2044 Quantity::from("1.000"),
2045 UnixNanos::default(),
2046 UnixNanos::default(),
2047 );
2048 exchange.borrow_mut().process_quote_tick("e);
2049 exchange.borrow_mut().process_quote_tick("e);
2050 exchange.borrow_mut().process(UnixNanos::from(100));
2051 exchange.borrow_mut().process_modules(UnixNanos::from(100));
2052
2053 assert_eq!(counts.pre_process.get(), 2);
2054 assert_eq!(counts.process.get(), 1);
2055 }
2056}