1use std::{
17 cell::RefCell,
18 fmt::Debug,
19 ops::{Deref, DerefMut},
20 rc::Rc,
21};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25 actor::{DataActorConfig, DataActorCore},
26 cache::Cache,
27 clock::Clock,
28 logging::{CMD, EVT, RECV},
29 messages::execution::{
30 CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
31 },
32 msgbus::{
33 self, TypedHandler,
34 switchboard::{get_quotes_topic, get_trades_topic},
35 },
36};
37use nautilus_core::{UUID4, WeakCell};
38use nautilus_model::{
39 data::{OrderBookDeltas, QuoteTick, TradeTick},
40 enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
41 events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
42 identifiers::{ActorId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId},
43 instruments::Instrument,
44 orders::{LimitOrder, MarketOrder, Order, OrderAny},
45 types::{Price, Quantity},
46};
47
48use crate::{
49 matching_core::{MatchAction, OrderMatchInfo, OrderMatchingCore},
50 order_manager::{
51 handlers::{CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny},
52 manager::OrderManager,
53 },
54 trailing::trailing_stop_calculate,
55};
56
57pub struct OrderEmulator {
58 actor: DataActorCore,
59 clock: Rc<RefCell<dyn Clock>>,
60 cache: Rc<RefCell<Cache>>,
61 manager: OrderManager,
62 matching_cores: AHashMap<InstrumentId, OrderMatchingCore>,
63 subscribed_quotes: AHashSet<InstrumentId>,
64 subscribed_trades: AHashSet<InstrumentId>,
65 subscribed_strategies: AHashSet<StrategyId>,
66 monitored_positions: AHashSet<PositionId>,
67 on_event_handler: Option<TypedHandler<OrderEventAny>>,
68 self_ref: Option<WeakCell<Self>>,
69}
70
71impl Debug for OrderEmulator {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct(stringify!(OrderEmulator))
74 .field("actor", &self.actor)
75 .field("cores", &self.matching_cores.len())
76 .field("subscribed_quotes", &self.subscribed_quotes.len())
77 .finish()
78 }
79}
80
81impl Deref for OrderEmulator {
82 type Target = DataActorCore;
83
84 fn deref(&self) -> &Self::Target {
85 &self.actor
86 }
87}
88
89impl DerefMut for OrderEmulator {
90 fn deref_mut(&mut self) -> &mut Self::Target {
91 &mut self.actor
92 }
93}
94
95impl OrderEmulator {
96 pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
97 let config = DataActorConfig {
98 actor_id: Some(ActorId::from("OrderEmulator")),
99 ..Default::default()
100 };
101
102 let active_local = true;
103 let manager =
104 OrderManager::new(clock.clone(), cache.clone(), active_local, None, None, None);
105
106 Self {
107 actor: DataActorCore::new(config),
108 clock,
109 cache,
110 manager,
111 matching_cores: AHashMap::new(),
112 subscribed_quotes: AHashSet::new(),
113 subscribed_trades: AHashSet::new(),
114 subscribed_strategies: AHashSet::new(),
115 monitored_positions: AHashSet::new(),
116 on_event_handler: None,
117 self_ref: None,
118 }
119 }
120
121 pub fn set_self_ref(&mut self, self_ref: WeakCell<Self>) {
123 self.self_ref = Some(self_ref);
124 }
125
126 pub fn register(
132 &mut self,
133 trader_id: TraderId,
134 clock: Rc<RefCell<dyn Clock>>,
135 cache: Rc<RefCell<Cache>>,
136 ) -> anyhow::Result<()> {
137 self.actor.register(trader_id, clock, cache)
138 }
139
140 pub fn set_on_event_handler(&mut self, handler: TypedHandler<OrderEventAny>) {
141 self.on_event_handler = Some(handler);
142 }
143
144 pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
146 self.manager.set_submit_order_handler(handler);
147 }
148
149 pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
151 self.manager.set_cancel_order_handler(handler);
152 }
153
154 pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
156 self.manager.set_modify_order_handler(handler);
157 }
158
159 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
161 self.manager.cache_submit_order_command(command);
162 }
163
164 fn subscribe_quotes_for_instrument(&mut self, instrument_id: InstrumentId) {
166 let Some(self_ref) = self.self_ref.clone() else {
167 log::warn!("Cannot subscribe to quotes: self_ref not set");
168 return;
169 };
170
171 let topic = get_quotes_topic(instrument_id);
172 let handler = TypedHandler::from(move |quote: &QuoteTick| {
173 if let Some(emulator) = self_ref.upgrade() {
174 emulator.borrow_mut().on_quote_tick(*quote);
175 }
176 });
177
178 self.actor
179 .subscribe_quotes(topic, handler, instrument_id, None, None);
180 }
181
182 fn subscribe_trades_for_instrument(&mut self, instrument_id: InstrumentId) {
184 let Some(self_ref) = self.self_ref.clone() else {
185 log::warn!("Cannot subscribe to trades: self_ref not set");
186 return;
187 };
188
189 let topic = get_trades_topic(instrument_id);
190 let handler = TypedHandler::from(move |trade: &TradeTick| {
191 if let Some(emulator) = self_ref.upgrade() {
192 emulator.borrow_mut().on_trade_tick(*trade);
193 }
194 });
195
196 self.actor
197 .subscribe_trades(topic, handler, instrument_id, None, None);
198 }
199
200 #[must_use]
201 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
202 let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
203 quotes.sort();
204 quotes
205 }
206
207 #[must_use]
208 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
209 let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
210 trades.sort();
211 trades
212 }
213
214 #[must_use]
215 pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
216 self.manager.get_submit_order_commands()
217 }
218
219 #[must_use]
220 pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
221 self.matching_cores.get(instrument_id).cloned()
222 }
223
224 pub fn on_start(&mut self) -> anyhow::Result<()> {
231 let emulated_orders: Vec<OrderAny> = self
232 .cache
233 .borrow()
234 .orders_emulated(None, None, None, None, None)
235 .into_iter()
236 .cloned()
237 .collect();
238
239 if emulated_orders.is_empty() {
240 log::error!("No emulated orders to reactivate");
241 return Ok(());
242 }
243
244 for order in emulated_orders {
245 if !matches!(
246 order.status(),
247 OrderStatus::Initialized | OrderStatus::Emulated
248 ) {
249 continue; }
251
252 if let Some(parent_order_id) = &order.parent_order_id() {
253 let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
254 order.clone()
255 } else {
256 log::error!("Cannot handle order: parent {parent_order_id} not found");
257 continue;
258 };
259
260 let is_position_closed = parent_order
261 .position_id()
262 .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
263 if parent_order.is_closed() && is_position_closed {
264 self.manager.cancel_order(&order);
265 continue; }
267
268 if parent_order.contingency_type() == Some(ContingencyType::Oto)
269 && (parent_order.is_active_local()
270 || parent_order.filled_qty() == Quantity::zero(0))
271 {
272 continue; }
274 }
275
276 let position_id = self
277 .cache
278 .borrow()
279 .position_id(&order.client_order_id())
280 .copied();
281 let client_id = self
282 .cache
283 .borrow()
284 .client_id(&order.client_order_id())
285 .copied();
286
287 let command = SubmitOrder::new(
288 order.trader_id(),
289 client_id,
290 order.strategy_id(),
291 order.instrument_id(),
292 order.client_order_id(),
293 order.init_event().clone(),
294 order.exec_algorithm_id(),
295 position_id,
296 None, UUID4::new(),
298 self.clock.borrow().timestamp_ns(),
299 );
300
301 self.handle_submit_order(command);
302 }
303
304 Ok(())
305 }
306
307 pub fn on_event(&mut self, event: &OrderEventAny) {
308 log::info!("{RECV}{EVT} {event}");
309
310 self.manager.handle_event(event);
311
312 if let Some(order) = self.cache.borrow().order(&event.client_order_id())
313 && order.is_closed()
314 && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
315 && let Err(e) = matching_core.delete_order(event.client_order_id())
316 {
317 log::error!("Error deleting order: {e}");
318 }
319 }
321
322 pub const fn on_stop(&self) {}
323
324 pub fn on_reset(&mut self) {
325 self.manager.reset();
326 self.matching_cores.clear();
327 }
328
329 pub const fn on_dispose(&self) {}
330
331 pub fn execute(&mut self, command: TradingCommand) {
332 log::info!("{RECV}{CMD} {command}");
333
334 match command {
335 TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
336 TradingCommand::SubmitOrderList(ref command) => self.handle_submit_order_list(command),
337 TradingCommand::ModifyOrder(ref command) => self.handle_modify_order(command),
338 TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
339 TradingCommand::CancelAllOrders(ref command) => self.handle_cancel_all_orders(command),
340 _ => log::error!("Cannot handle command: unrecognized {command:?}"),
341 }
342 }
343
344 fn create_matching_core(
345 &mut self,
346 instrument_id: InstrumentId,
347 price_increment: Price,
348 ) -> OrderMatchingCore {
349 let matching_core = OrderMatchingCore::new(instrument_id, price_increment);
350 self.matching_cores
351 .insert(instrument_id, matching_core.clone());
352 log::info!("Creating matching core for {instrument_id:?}");
353 matching_core
354 }
355
356 pub fn handle_submit_order(&mut self, command: SubmitOrder) {
360 let client_order_id = command.client_order_id;
361
362 let mut order = self
363 .cache
364 .borrow()
365 .order(&client_order_id)
366 .cloned()
367 .expect("order must exist in cache");
368
369 let emulation_trigger = order.emulation_trigger();
370
371 assert_ne!(
372 emulation_trigger,
373 Some(TriggerType::NoTrigger),
374 "order.emulation_trigger must not be TriggerType::NoTrigger"
375 );
376 assert!(
377 self.manager
378 .get_submit_order_commands()
379 .contains_key(&client_order_id),
380 "client_order_id must be in submit_order_commands"
381 );
382
383 if !matches!(
384 emulation_trigger,
385 Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
386 ) {
387 log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
388 self.manager.cancel_order(&order);
389 return;
390 }
391
392 self.check_monitoring(command.strategy_id, command.position_id);
393
394 let trigger_instrument_id = order
396 .trigger_instrument_id()
397 .unwrap_or_else(|| order.instrument_id());
398
399 let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
400
401 let mut matching_core = if let Some(core) = matching_core {
402 core
403 } else {
404 let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
406 let synthetic = self
407 .cache
408 .borrow()
409 .synthetic(&trigger_instrument_id)
410 .cloned();
411
412 if let Some(synthetic) = synthetic {
413 (synthetic.id, synthetic.price_increment)
414 } else {
415 log::error!(
416 "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
417 );
418 self.manager.cancel_order(&order);
419 return;
420 }
421 } else {
422 let instrument = self
423 .cache
424 .borrow()
425 .instrument(&trigger_instrument_id)
426 .cloned();
427
428 if let Some(instrument) = instrument {
429 (instrument.id(), instrument.price_increment())
430 } else {
431 log::error!(
432 "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
433 );
434 self.manager.cancel_order(&order);
435 return;
436 }
437 };
438
439 self.create_matching_core(instrument_id, price_increment)
440 };
441
442 if matches!(
444 order.order_type(),
445 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
446 ) {
447 self.update_trailing_stop_order(&mut order);
448 if order.trigger_price().is_none() {
449 log::error!(
450 "Cannot handle trailing stop order with no trigger_price and no market updates"
451 );
452
453 self.manager.cancel_order(&order);
454 return;
455 }
456 }
457
458 self.manager.cache_submit_order_command(command);
460
461 let match_info = OrderMatchInfo::new(
463 order.client_order_id(),
464 order.order_side().as_specified(),
465 order.order_type(),
466 order.trigger_price(),
467 order.price(),
468 true, );
470 matching_core.match_order(&match_info);
471
472 match emulation_trigger.unwrap() {
474 TriggerType::Default | TriggerType::BidAsk => {
475 if !self.subscribed_quotes.contains(&trigger_instrument_id) {
476 self.subscribe_quotes_for_instrument(trigger_instrument_id);
477 self.subscribed_quotes.insert(trigger_instrument_id);
478 }
479 }
480 TriggerType::LastPrice => {
481 if !self.subscribed_trades.contains(&trigger_instrument_id) {
482 self.subscribe_trades_for_instrument(trigger_instrument_id);
483 self.subscribed_trades.insert(trigger_instrument_id);
484 }
485 }
486 _ => {
487 log::error!("Invalid TriggerType: {emulation_trigger:?}");
488 return;
489 }
490 }
491
492 if !self
494 .manager
495 .get_submit_order_commands()
496 .contains_key(&order.client_order_id())
497 {
498 return; }
500
501 matching_core.add_order(match_info);
503
504 if order.status() == OrderStatus::Initialized {
506 let event = OrderEmulated::new(
507 order.trader_id(),
508 order.strategy_id(),
509 order.instrument_id(),
510 order.client_order_id(),
511 UUID4::new(),
512 self.clock.borrow().timestamp_ns(),
513 self.clock.borrow().timestamp_ns(),
514 );
515
516 if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
517 log::error!("Cannot apply order event: {e:?}");
518 return;
519 }
520
521 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
522 log::error!("Cannot update order: {e:?}");
523 return;
524 }
525
526 self.manager.send_risk_event(OrderEventAny::Emulated(event));
527
528 msgbus::publish_order_event(
529 format!("events.order.{}", order.strategy_id()).into(),
530 &OrderEventAny::Emulated(event),
531 );
532 }
533
534 self.matching_cores
536 .insert(trigger_instrument_id, matching_core);
537
538 log::info!("Emulating {order}");
539 }
540
541 fn handle_submit_order_list(&mut self, command: &SubmitOrderList) {
542 self.check_monitoring(command.strategy_id, command.position_id);
543
544 let orders: Vec<OrderAny> = self
545 .cache
546 .borrow()
547 .orders_for_ids(&command.order_list.client_order_ids, &command);
548
549 for order in &orders {
550 if let Some(parent_order_id) = order.parent_order_id() {
551 let cache = self.cache.borrow();
552 let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
553 parent_order
554 } else {
555 log::error!("Parent order for {} not found", order.client_order_id());
556 continue;
557 };
558
559 if parent_order.contingency_type() == Some(ContingencyType::Oto) {
560 continue; }
562 }
563
564 if let Err(e) =
565 self.manager
566 .create_new_submit_order(order, command.position_id, command.client_id)
567 {
568 log::error!("Error creating new submit order: {e}");
569 }
570 }
571 }
572
573 fn handle_modify_order(&mut self, command: &ModifyOrder) {
574 if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
575 let price = match command.price {
576 Some(price) => Some(price),
577 None => order.price(),
578 };
579
580 let trigger_price = match command.trigger_price {
581 Some(trigger_price) => Some(trigger_price),
582 None => order.trigger_price(),
583 };
584
585 let ts_now = self.clock.borrow().timestamp_ns();
586 let event = OrderUpdated::new(
587 order.trader_id(),
588 order.strategy_id(),
589 order.instrument_id(),
590 order.client_order_id(),
591 command.quantity.unwrap_or(order.quantity()),
592 UUID4::new(),
593 ts_now,
594 ts_now,
595 false,
596 order.venue_order_id(),
597 order.account_id(),
598 price,
599 trigger_price,
600 None,
601 order.is_quote_quantity(),
602 );
603
604 self.manager.send_exec_event(OrderEventAny::Updated(event));
605
606 let trigger_instrument_id = order
607 .trigger_instrument_id()
608 .unwrap_or_else(|| order.instrument_id());
609
610 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
611 let match_info = OrderMatchInfo::new(
612 order.client_order_id(),
613 order.order_side().as_specified(),
614 order.order_type(),
615 trigger_price,
616 price,
617 true, );
619 matching_core.match_order(&match_info);
620 } else {
621 log::error!(
622 "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
623 );
624 }
625 } else {
626 log::error!("Cannot modify order: {} not found", command.client_order_id);
627 }
628 }
629
630 pub fn handle_cancel_order(&mut self, command: CancelOrder) {
631 let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
632 order.clone()
633 } else {
634 log::error!("Cannot cancel order: {} not found", command.client_order_id);
635 return;
636 };
637
638 let trigger_instrument_id = order
639 .trigger_instrument_id()
640 .unwrap_or_else(|| order.instrument_id());
641
642 let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
643 core
644 } else {
645 self.manager.cancel_order(&order);
646 return;
647 };
648
649 if !matching_core.order_exists(order.client_order_id())
650 && order.is_open()
651 && !order.is_pending_cancel()
652 {
653 self.manager
655 .send_exec_command(TradingCommand::CancelOrder(command));
656 } else {
657 self.manager.cancel_order(&order);
658 }
659 }
660
661 fn handle_cancel_all_orders(&mut self, command: &CancelAllOrders) {
662 let instrument_id = command.instrument_id;
663 let matching_core = match self.matching_cores.get(&instrument_id) {
664 Some(core) => core,
665 None => return, };
667
668 let orders_to_cancel = match command.order_side {
669 OrderSide::NoOrderSide => {
670 let mut all_orders = Vec::new();
672 all_orders.extend(matching_core.get_orders_bid().iter().cloned());
673 all_orders.extend(matching_core.get_orders_ask().iter().cloned());
674 all_orders
675 }
676 OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
677 OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
678 };
679
680 for match_info in orders_to_cancel {
682 if let Some(order) = self
683 .cache
684 .borrow()
685 .order(&match_info.client_order_id)
686 .cloned()
687 {
688 self.manager.cancel_order(&order);
689 }
690 }
691 }
692
693 pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
694 log::info!(
695 "Updating order {} quantity to {new_quantity}",
696 order.client_order_id(),
697 );
698
699 let ts_now = self.clock.borrow().timestamp_ns();
700 let event = OrderUpdated::new(
701 order.trader_id(),
702 order.strategy_id(),
703 order.instrument_id(),
704 order.client_order_id(),
705 new_quantity,
706 UUID4::new(),
707 ts_now,
708 ts_now,
709 false,
710 None,
711 order.account_id(),
712 None,
713 None,
714 None,
715 order.is_quote_quantity(),
716 );
717
718 if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
719 log::error!("Cannot apply order event: {e:?}");
720 return;
721 }
722
723 if let Err(e) = self.cache.borrow_mut().update_order(order) {
724 log::error!("Cannot update order: {e:?}");
725 return;
726 }
727
728 self.manager.send_risk_event(OrderEventAny::Updated(event));
729 }
730
731 pub fn on_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
732 log::debug!("Processing {deltas:?}");
733
734 let instrument_id = &deltas.instrument_id;
735 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
736 if let Some(book) = self.cache.borrow().order_book(instrument_id) {
737 let best_bid = book.best_bid_price();
738 let best_ask = book.best_ask_price();
739
740 if let Some(best_bid) = best_bid {
741 matching_core.set_bid_raw(best_bid);
742 }
743
744 if let Some(best_ask) = best_ask {
745 matching_core.set_ask_raw(best_ask);
746 }
747 } else {
748 log::error!(
749 "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
750 deltas.instrument_id
751 );
752 }
753
754 self.iterate_orders(instrument_id);
755 } else {
756 log::error!(
757 "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
758 deltas.instrument_id
759 );
760 }
761 }
762
763 pub fn on_quote_tick(&mut self, quote: QuoteTick) {
764 log::debug!("Processing {quote}:?");
765
766 let instrument_id = "e.instrument_id;
767 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
768 matching_core.set_bid_raw(quote.bid_price);
769 matching_core.set_ask_raw(quote.ask_price);
770
771 self.iterate_orders(instrument_id);
772 } else {
773 log::error!(
774 "Cannot handle `QuoteTick`: no matching core for instrument {}",
775 quote.instrument_id
776 );
777 }
778 }
779
780 pub fn on_trade_tick(&mut self, trade: TradeTick) {
781 log::debug!("Processing {trade:?}");
782
783 let instrument_id = &trade.instrument_id;
784 if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
785 matching_core.set_last_raw(trade.price);
786
787 if !self.subscribed_quotes.contains(instrument_id) {
788 matching_core.set_bid_raw(trade.price);
789 matching_core.set_ask_raw(trade.price);
790 }
791
792 self.iterate_orders(instrument_id);
793 } else {
794 log::error!(
795 "Cannot handle `TradeTick`: no matching core for instrument {}",
796 trade.instrument_id
797 );
798 }
799 }
800
801 fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
802 let bid_actions = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
805 matching_core.iterate_bids()
806 } else {
807 log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
808 return;
809 };
810
811 for action in bid_actions {
812 match action {
813 MatchAction::FillLimit(id) => self.fill_limit_order(id),
814 MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
815 }
816 }
817
818 let ask_actions = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
819 matching_core.iterate_asks()
820 } else {
821 return;
822 };
823
824 for action in ask_actions {
825 match action {
826 MatchAction::FillLimit(id) => self.fill_limit_order(id),
827 MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
828 }
829 }
830
831 let orders = if let Some(matching_core) = self.matching_cores.get(instrument_id) {
833 matching_core.get_orders()
834 } else {
835 return;
836 };
837
838 for match_info in orders {
839 if !matches!(
840 match_info.order_type,
841 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
842 ) {
843 continue;
844 }
845
846 let mut order = match self
847 .cache
848 .borrow()
849 .order(&match_info.client_order_id)
850 .cloned()
851 {
852 Some(order) => order,
853 None => continue,
854 };
855
856 if order.is_closed() {
857 continue;
858 }
859
860 self.update_trailing_stop_order(&mut order);
861 }
862 }
863
864 pub fn cancel_order(&mut self, order: &OrderAny) {
865 log::info!("Canceling order {}", order.client_order_id());
866
867 let mut order = order.clone();
868 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
869
870 let trigger_instrument_id = order
871 .trigger_instrument_id()
872 .unwrap_or(order.instrument_id());
873
874 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
875 && let Err(e) = matching_core.delete_order(order.client_order_id())
876 {
877 log::error!("Cannot delete order: {e:?}");
878 }
879
880 self.manager
881 .pop_submit_order_command(order.client_order_id());
882
883 self.cache
884 .borrow_mut()
885 .update_order_pending_cancel_local(&order);
886
887 let ts_now = self.clock.borrow().timestamp_ns();
888 let event = OrderCanceled::new(
889 order.trader_id(),
890 order.strategy_id(),
891 order.instrument_id(),
892 order.client_order_id(),
893 UUID4::new(),
894 ts_now,
895 ts_now,
896 false,
897 order.venue_order_id(),
898 order.account_id(),
899 );
900
901 self.manager.send_exec_event(OrderEventAny::Canceled(event));
902 }
903
904 fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
905 if !self.subscribed_strategies.contains(&strategy_id) {
906 if let Some(handler) = &self.on_event_handler {
908 msgbus::subscribe_order_events(
909 format!("events.order.{strategy_id}").into(),
910 handler.clone(),
911 None,
912 );
913 self.subscribed_strategies.insert(strategy_id);
914 log::info!("Subscribed to strategy {strategy_id} order events");
915 }
916 }
917
918 if let Some(position_id) = position_id
919 && !self.monitored_positions.contains(&position_id)
920 {
921 self.monitored_positions.insert(position_id);
922 }
923 }
924
925 fn validate_release(
933 &self,
934 order: &OrderAny,
935 matching_core: &OrderMatchingCore,
936 trigger_instrument_id: InstrumentId,
937 ) -> Option<Price> {
938 let released_price = match order.order_side_specified() {
939 OrderSideSpecified::Buy => matching_core.ask,
940 OrderSideSpecified::Sell => matching_core.bid,
941 };
942
943 if released_price.is_none() {
944 log::warn!(
945 "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
946 order.client_order_id(),
947 );
948 return None;
949 }
950
951 Some(released_price.unwrap())
952 }
953
954 pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
958 let order = match self.cache.borrow().order(&client_order_id).cloned() {
959 Some(order) => order,
960 None => {
961 log::error!(
962 "Cannot trigger stop order: order {client_order_id} not found in cache"
963 );
964 return;
965 }
966 };
967
968 match order.order_type() {
969 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
970 self.fill_limit_order(client_order_id);
971 }
972 OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
973 self.fill_market_order(client_order_id);
974 }
975 _ => panic!("invalid `OrderType`, was {}", order.order_type()),
976 }
977 }
978
979 pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
983 let order = match self.cache.borrow().order(&client_order_id).cloned() {
984 Some(order) => order,
985 None => {
986 log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
987 return;
988 }
989 };
990
991 if matches!(order.order_type(), OrderType::Limit) {
992 self.fill_market_order(client_order_id);
993 return;
994 }
995
996 let trigger_instrument_id = order
997 .trigger_instrument_id()
998 .unwrap_or(order.instrument_id());
999
1000 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
1001 Some(core) => core,
1002 None => {
1003 log::error!(
1004 "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1005 );
1006 return; }
1008 };
1009
1010 let released_price =
1011 match self.validate_release(&order, matching_core, trigger_instrument_id) {
1012 Some(price) => price,
1013 None => return, };
1015
1016 let command = match self
1017 .manager
1018 .pop_submit_order_command(order.client_order_id())
1019 {
1020 Some(command) => command,
1021 None => return, };
1023
1024 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1025 if let Err(e) = matching_core.delete_order(client_order_id) {
1026 log::error!("Error deleting order: {e:?}");
1027 }
1028
1029 let emulation_trigger = TriggerType::NoTrigger;
1030
1031 let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
1033 order.trader_id(),
1034 order.strategy_id(),
1035 order.instrument_id(),
1036 order.client_order_id(),
1037 order.order_side(),
1038 order.quantity(),
1039 order.price().unwrap(),
1040 order.time_in_force(),
1041 order.expire_time(),
1042 order.is_post_only(),
1043 order.is_reduce_only(),
1044 order.is_quote_quantity(),
1045 order.display_qty(),
1046 Some(emulation_trigger),
1047 Some(trigger_instrument_id),
1048 order.contingency_type(),
1049 order.order_list_id(),
1050 order.linked_order_ids().map(Vec::from),
1051 order.parent_order_id(),
1052 order.exec_algorithm_id(),
1053 order.exec_algorithm_params().cloned(),
1054 order.exec_spawn_id(),
1055 order.tags().map(Vec::from),
1056 UUID4::new(),
1057 self.clock.borrow().timestamp_ns(),
1058 ) {
1059 transformed
1060 } else {
1061 log::error!("Cannot create limit order");
1062 return;
1063 };
1064 transformed.liquidity_side = order.liquidity_side();
1065
1066 let original_events = order.events();
1073
1074 for event in original_events {
1075 transformed.events.insert(0, event.clone());
1076 }
1077
1078 if let Err(e) = self.cache.borrow_mut().add_order(
1079 OrderAny::Limit(transformed.clone()),
1080 command.position_id,
1081 command.client_id,
1082 true,
1083 ) {
1084 log::error!("Failed to add order: {e}");
1085 }
1086
1087 msgbus::publish_order_event(
1088 format!("events.order.{}", order.strategy_id()).into(),
1089 transformed.last_event(),
1090 );
1091
1092 let event = OrderReleased::new(
1093 order.trader_id(),
1094 order.strategy_id(),
1095 order.instrument_id(),
1096 order.client_order_id(),
1097 released_price,
1098 UUID4::new(),
1099 self.clock.borrow().timestamp_ns(),
1100 self.clock.borrow().timestamp_ns(),
1101 );
1102
1103 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1104 log::error!("Failed to apply order event: {e}");
1105 }
1106
1107 if let Err(e) = self
1108 .cache
1109 .borrow_mut()
1110 .update_order(&OrderAny::Limit(transformed.clone()))
1111 {
1112 log::error!("Failed to update order: {e}");
1113 }
1114
1115 self.manager.send_risk_event(OrderEventAny::Released(event));
1116
1117 log::info!("Releasing order {}", order.client_order_id());
1118
1119 msgbus::publish_order_event(
1121 format!("events.order.{}", transformed.strategy_id()).into(),
1122 &OrderEventAny::Released(event),
1123 );
1124
1125 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1126 self.manager.send_algo_command(command, exec_algorithm_id);
1127 } else {
1128 self.manager
1129 .send_exec_command(TradingCommand::SubmitOrder(command));
1130 }
1131 }
1132 }
1133
1134 pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
1138 let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
1139 Some(order) => order,
1140 None => {
1141 log::error!("Cannot fill market order: order {client_order_id} not found in cache");
1142 return;
1143 }
1144 };
1145
1146 let trigger_instrument_id = order
1147 .trigger_instrument_id()
1148 .unwrap_or(order.instrument_id());
1149
1150 let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
1151 Some(core) => core,
1152 None => {
1153 log::error!(
1154 "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
1155 );
1156 return; }
1158 };
1159
1160 let released_price =
1161 match self.validate_release(&order, matching_core, trigger_instrument_id) {
1162 Some(price) => price,
1163 None => return, };
1165
1166 let command = self
1167 .manager
1168 .pop_submit_order_command(order.client_order_id())
1169 .expect("invalid operation `fill_market_order` with no command");
1170
1171 if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1172 if let Err(e) = matching_core.delete_order(client_order_id) {
1173 log::error!("Cannot delete order: {e:?}");
1174 }
1175
1176 order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1177
1178 let mut transformed = MarketOrder::new(
1180 order.trader_id(),
1181 order.strategy_id(),
1182 order.instrument_id(),
1183 order.client_order_id(),
1184 order.order_side(),
1185 order.quantity(),
1186 order.time_in_force(),
1187 UUID4::new(),
1188 self.clock.borrow().timestamp_ns(),
1189 order.is_reduce_only(),
1190 order.is_quote_quantity(),
1191 order.contingency_type(),
1192 order.order_list_id(),
1193 order.linked_order_ids().map(Vec::from),
1194 order.parent_order_id(),
1195 order.exec_algorithm_id(),
1196 order.exec_algorithm_params().cloned(),
1197 order.exec_spawn_id(),
1198 order.tags().map(Vec::from),
1199 );
1200
1201 let original_events = order.events();
1202
1203 for event in original_events {
1204 transformed.events.insert(0, event.clone());
1205 }
1206
1207 if let Err(e) = self.cache.borrow_mut().add_order(
1208 OrderAny::Market(transformed.clone()),
1209 command.position_id,
1210 command.client_id,
1211 true,
1212 ) {
1213 log::error!("Failed to add order: {e}");
1214 }
1215
1216 msgbus::publish_order_event(
1217 format!("events.order.{}", order.strategy_id()).into(),
1218 transformed.last_event(),
1219 );
1220
1221 let ts_now = self.clock.borrow().timestamp_ns();
1222 let event = OrderReleased::new(
1223 order.trader_id(),
1224 order.strategy_id(),
1225 order.instrument_id(),
1226 order.client_order_id(),
1227 released_price,
1228 UUID4::new(),
1229 ts_now,
1230 ts_now,
1231 );
1232
1233 if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1234 log::error!("Failed to apply order event: {e}");
1235 }
1236
1237 if let Err(e) = self
1238 .cache
1239 .borrow_mut()
1240 .update_order(&OrderAny::Market(transformed))
1241 {
1242 log::error!("Failed to update order: {e}");
1243 }
1244 self.manager.send_risk_event(OrderEventAny::Released(event));
1245
1246 log::info!("Releasing order {}", order.client_order_id());
1247
1248 msgbus::publish_order_event(
1250 format!("events.order.{}", order.strategy_id()).into(),
1251 &OrderEventAny::Released(event),
1252 );
1253
1254 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1255 self.manager.send_algo_command(command, exec_algorithm_id);
1256 } else {
1257 self.manager
1258 .send_exec_command(TradingCommand::SubmitOrder(command));
1259 }
1260 }
1261 }
1262
1263 fn update_trailing_stop_order(&self, order: &mut OrderAny) {
1264 let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1265 log::error!(
1266 "Cannot update trailing-stop order: no matching core for instrument {}",
1267 order.instrument_id()
1268 );
1269 return;
1270 };
1271
1272 let mut bid = matching_core.bid;
1273 let mut ask = matching_core.ask;
1274 let mut last = matching_core.last;
1275
1276 if bid.is_none() || ask.is_none() || last.is_none() {
1277 if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1278 bid.get_or_insert(q.bid_price);
1279 ask.get_or_insert(q.ask_price);
1280 }
1281
1282 if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1283 last.get_or_insert(t.price);
1284 }
1285 }
1286
1287 let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1288 matching_core.price_increment,
1289 order.trigger_price(),
1290 order.activation_price(),
1291 order,
1292 bid,
1293 ask,
1294 last,
1295 ) {
1296 Ok(pair) => pair,
1297 Err(e) => {
1298 log::warn!("Cannot calculate trailing-stop update: {e}");
1299 return;
1300 }
1301 };
1302
1303 if new_trigger_px.is_none() && new_limit_px.is_none() {
1304 return;
1305 }
1306
1307 let ts_now = self.clock.borrow().timestamp_ns();
1308 let update = OrderUpdated::new(
1309 order.trader_id(),
1310 order.strategy_id(),
1311 order.instrument_id(),
1312 order.client_order_id(),
1313 order.quantity(),
1314 UUID4::new(),
1315 ts_now,
1316 ts_now,
1317 false,
1318 order.venue_order_id(),
1319 order.account_id(),
1320 new_limit_px,
1321 new_trigger_px,
1322 None,
1323 order.is_quote_quantity(),
1324 );
1325 let wrapped = OrderEventAny::Updated(update);
1326 if let Err(e) = order.apply(wrapped.clone()) {
1327 log::error!("Failed to apply order event: {e}");
1328 return;
1329 }
1330
1331 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1332 log::error!("Failed to update order in cache: {e}");
1333 return;
1334 }
1335 self.manager.send_risk_event(wrapped);
1336 }
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341 use std::{cell::RefCell, rc::Rc};
1342
1343 use nautilus_common::{cache::Cache, clock::TestClock};
1344 use nautilus_core::{UUID4, WeakCell};
1345 use nautilus_model::{
1346 data::{QuoteTick, TradeTick},
1347 enums::{AggressorSide, OrderSide, OrderType, TriggerType},
1348 identifiers::{StrategyId, TradeId, TraderId},
1349 instruments::{
1350 CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1351 },
1352 orders::OrderTestBuilder,
1353 types::{Price, Quantity},
1354 };
1355 use rstest::{fixture, rstest};
1356
1357 use super::*;
1358
1359 #[fixture]
1360 fn instrument() -> CryptoPerpetual {
1361 crypto_perpetual_ethusdt()
1362 }
1363
1364 #[expect(clippy::type_complexity)]
1365 fn create_emulator() -> (
1366 Rc<RefCell<dyn Clock>>,
1367 Rc<RefCell<Cache>>,
1368 Rc<RefCell<OrderEmulator>>,
1369 ) {
1370 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1371 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
1372 let emulator = Rc::new(RefCell::new(OrderEmulator::new(
1373 clock.clone(),
1374 cache.clone(),
1375 )));
1376
1377 emulator
1379 .borrow_mut()
1380 .register(TraderId::from("TRADER-001"), clock.clone(), cache.clone())
1381 .unwrap();
1382
1383 let self_ref = WeakCell::from(Rc::downgrade(&emulator));
1385 emulator.borrow_mut().set_self_ref(self_ref);
1386
1387 (clock, cache, emulator)
1388 }
1389
1390 fn create_stop_market_order(instrument: &CryptoPerpetual, trigger: TriggerType) -> OrderAny {
1391 OrderTestBuilder::new(OrderType::StopMarket)
1392 .instrument_id(instrument.id())
1393 .side(OrderSide::Buy)
1394 .trigger_price(Price::from("5100.00"))
1395 .quantity(Quantity::from(1))
1396 .emulation_trigger(trigger)
1397 .build()
1398 }
1399
1400 fn create_submit_order(instrument: &CryptoPerpetual, order: &OrderAny) -> SubmitOrder {
1401 SubmitOrder::new(
1402 TraderId::from("TRADER-001"),
1403 None,
1404 StrategyId::from("STRATEGY-001"),
1405 instrument.id(),
1406 order.client_order_id(),
1407 order.init_event().clone(),
1408 None,
1409 None,
1410 None,
1411 UUID4::new(),
1412 0.into(),
1413 )
1414 }
1415
1416 fn create_quote_tick(instrument: &CryptoPerpetual, bid: &str, ask: &str) -> QuoteTick {
1417 QuoteTick::new(
1418 instrument.id(),
1419 Price::from(bid),
1420 Price::from(ask),
1421 Quantity::from(10),
1422 Quantity::from(10),
1423 0.into(),
1424 0.into(),
1425 )
1426 }
1427
1428 fn create_trade_tick(instrument: &CryptoPerpetual, price: &str) -> TradeTick {
1429 TradeTick::new(
1430 instrument.id(),
1431 Price::from(price),
1432 Quantity::from(1),
1433 AggressorSide::Buyer,
1434 TradeId::from("T-001"),
1435 0.into(),
1436 0.into(),
1437 )
1438 }
1439
1440 fn add_instrument_to_cache(cache: &Rc<RefCell<Cache>>, instrument: &CryptoPerpetual) {
1441 cache
1442 .borrow_mut()
1443 .add_instrument(InstrumentAny::CryptoPerpetual(instrument.clone()))
1444 .unwrap();
1445 }
1446
1447 #[rstest]
1448 fn test_subscribed_quotes_initially_empty() {
1449 let (_clock, _cache, emulator) = create_emulator();
1450
1451 assert!(emulator.borrow().subscribed_quotes().is_empty());
1452 }
1453
1454 #[rstest]
1455 fn test_subscribed_trades_initially_empty() {
1456 let (_clock, _cache, emulator) = create_emulator();
1457
1458 assert!(emulator.borrow().subscribed_trades().is_empty());
1459 }
1460
1461 #[rstest]
1462 fn test_get_submit_order_commands_initially_empty() {
1463 let (_clock, _cache, emulator) = create_emulator();
1464
1465 assert!(emulator.borrow().get_submit_order_commands().is_empty());
1466 }
1467
1468 #[rstest]
1469 fn test_get_matching_core_returns_none_when_not_created(instrument: CryptoPerpetual) {
1470 let (_clock, _cache, emulator) = create_emulator();
1471
1472 assert!(
1473 emulator
1474 .borrow()
1475 .get_matching_core(&instrument.id())
1476 .is_none()
1477 );
1478 }
1479
1480 #[rstest]
1481 fn test_create_matching_core(instrument: CryptoPerpetual) {
1482 let (_clock, _cache, emulator) = create_emulator();
1483
1484 emulator
1485 .borrow_mut()
1486 .create_matching_core(instrument.id(), instrument.price_increment);
1487
1488 assert!(
1489 emulator
1490 .borrow()
1491 .get_matching_core(&instrument.id())
1492 .is_some()
1493 );
1494 }
1495
1496 #[rstest]
1497 fn test_on_quote_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1498 let (_clock, _cache, emulator) = create_emulator();
1499 let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1500
1501 emulator.borrow_mut().on_quote_tick(quote);
1502 }
1503
1504 #[rstest]
1505 fn test_on_trade_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1506 let (_clock, _cache, emulator) = create_emulator();
1507 let trade = create_trade_tick(&instrument, "5065.00");
1508
1509 emulator.borrow_mut().on_trade_tick(trade);
1510 }
1511
1512 #[rstest]
1513 fn test_submit_order_bid_ask_trigger_creates_matching_core(instrument: CryptoPerpetual) {
1514 let (_clock, cache, emulator) = create_emulator();
1515 add_instrument_to_cache(&cache, &instrument);
1516 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1517 let command = create_submit_order(&instrument, &order);
1518 cache
1519 .borrow_mut()
1520 .add_order(order, None, None, false)
1521 .unwrap();
1522
1523 emulator
1524 .borrow_mut()
1525 .cache_submit_order_command(command.clone());
1526 emulator.borrow_mut().handle_submit_order(command);
1527
1528 assert!(
1529 emulator
1530 .borrow()
1531 .get_matching_core(&instrument.id())
1532 .is_some()
1533 );
1534 }
1535
1536 #[rstest]
1537 fn test_submit_order_bid_ask_trigger_tracks_quote_subscription(instrument: CryptoPerpetual) {
1538 let (_clock, cache, emulator) = create_emulator();
1539 add_instrument_to_cache(&cache, &instrument);
1540 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1541 let command = create_submit_order(&instrument, &order);
1542 cache
1543 .borrow_mut()
1544 .add_order(order, None, None, false)
1545 .unwrap();
1546
1547 emulator
1548 .borrow_mut()
1549 .cache_submit_order_command(command.clone());
1550 emulator.borrow_mut().handle_submit_order(command);
1551
1552 assert_eq!(emulator.borrow().subscribed_quotes(), vec![instrument.id()]);
1553 assert!(emulator.borrow().subscribed_trades().is_empty());
1554 }
1555
1556 #[rstest]
1557 fn test_submit_order_last_price_trigger_tracks_trade_subscription(instrument: CryptoPerpetual) {
1558 let (_clock, cache, emulator) = create_emulator();
1559 add_instrument_to_cache(&cache, &instrument);
1560 let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1561 let command = create_submit_order(&instrument, &order);
1562 cache
1563 .borrow_mut()
1564 .add_order(order, None, None, false)
1565 .unwrap();
1566
1567 emulator
1568 .borrow_mut()
1569 .cache_submit_order_command(command.clone());
1570 emulator.borrow_mut().handle_submit_order(command);
1571
1572 assert!(emulator.borrow().subscribed_quotes().is_empty());
1573 assert_eq!(emulator.borrow().subscribed_trades(), vec![instrument.id()]);
1574 }
1575
1576 #[rstest]
1577 fn test_submit_order_caches_command(instrument: CryptoPerpetual) {
1578 let (_clock, cache, emulator) = create_emulator();
1579 add_instrument_to_cache(&cache, &instrument);
1580 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1581 let client_order_id = order.client_order_id();
1582 let command = create_submit_order(&instrument, &order);
1583 cache
1584 .borrow_mut()
1585 .add_order(order, None, None, false)
1586 .unwrap();
1587
1588 emulator
1589 .borrow_mut()
1590 .cache_submit_order_command(command.clone());
1591 emulator.borrow_mut().handle_submit_order(command);
1592
1593 let commands = emulator.borrow().get_submit_order_commands();
1594 assert!(commands.contains_key(&client_order_id));
1595 }
1596
1597 #[rstest]
1598 fn test_quote_tick_updates_matching_core_prices(instrument: CryptoPerpetual) {
1599 let (_clock, cache, emulator) = create_emulator();
1600 add_instrument_to_cache(&cache, &instrument);
1601 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1602 let command = create_submit_order(&instrument, &order);
1603 cache
1604 .borrow_mut()
1605 .add_order(order, None, None, false)
1606 .unwrap();
1607 emulator
1608 .borrow_mut()
1609 .cache_submit_order_command(command.clone());
1610 emulator.borrow_mut().handle_submit_order(command);
1611
1612 let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1613 emulator.borrow_mut().on_quote_tick(quote);
1614
1615 let core = emulator
1616 .borrow()
1617 .get_matching_core(&instrument.id())
1618 .unwrap();
1619 assert_eq!(core.bid, Some(Price::from("5060.00")));
1620 assert_eq!(core.ask, Some(Price::from("5070.00")));
1621 }
1622
1623 #[rstest]
1624 fn test_trade_tick_updates_matching_core_last_price(instrument: CryptoPerpetual) {
1625 let (_clock, cache, emulator) = create_emulator();
1626 add_instrument_to_cache(&cache, &instrument);
1627 let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1628 let command = create_submit_order(&instrument, &order);
1629 cache
1630 .borrow_mut()
1631 .add_order(order, None, None, false)
1632 .unwrap();
1633 emulator
1634 .borrow_mut()
1635 .cache_submit_order_command(command.clone());
1636 emulator.borrow_mut().handle_submit_order(command);
1637
1638 let trade = create_trade_tick(&instrument, "5065.00");
1639 emulator.borrow_mut().on_trade_tick(trade);
1640
1641 let core = emulator
1642 .borrow()
1643 .get_matching_core(&instrument.id())
1644 .unwrap();
1645 assert_eq!(core.last, Some(Price::from("5065.00")));
1646 }
1647
1648 #[rstest]
1649 fn test_cancel_order_removes_from_matching_core(instrument: CryptoPerpetual) {
1650 let (_clock, cache, emulator) = create_emulator();
1651 add_instrument_to_cache(&cache, &instrument);
1652 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1653 let command = create_submit_order(&instrument, &order);
1654 cache
1655 .borrow_mut()
1656 .add_order(order.clone(), None, None, false)
1657 .unwrap();
1658 emulator
1659 .borrow_mut()
1660 .cache_submit_order_command(command.clone());
1661 emulator.borrow_mut().handle_submit_order(command);
1662
1663 emulator.borrow_mut().cancel_order(&order);
1664
1665 let core = emulator
1666 .borrow()
1667 .get_matching_core(&instrument.id())
1668 .unwrap();
1669 assert!(core.get_orders().is_empty());
1670 }
1671
1672 #[rstest]
1673 fn test_cancel_order_removes_cached_command(instrument: CryptoPerpetual) {
1674 let (_clock, cache, emulator) = create_emulator();
1675 add_instrument_to_cache(&cache, &instrument);
1676 let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1677 let client_order_id = order.client_order_id();
1678 let command = create_submit_order(&instrument, &order);
1679 cache
1680 .borrow_mut()
1681 .add_order(order.clone(), None, None, false)
1682 .unwrap();
1683 emulator
1684 .borrow_mut()
1685 .cache_submit_order_command(command.clone());
1686 emulator.borrow_mut().handle_submit_order(command);
1687
1688 emulator.borrow_mut().cancel_order(&order);
1689
1690 let commands = emulator.borrow().get_submit_order_commands();
1691 assert!(!commands.contains_key(&client_order_id));
1692 }
1693}