Skip to main content

nautilus_execution/order_emulator/
emulator.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    fmt::Debug,
19    ops::{Deref, DerefMut},
20    rc::Rc,
21};
22
23use ahash::{AHashMap, AHashSet};
24use nautilus_common::{
25    actor::{DataActorConfig, DataActorCore},
26    cache::Cache,
27    clock::Clock,
28    logging::{CMD, EVT, RECV},
29    messages::execution::{
30        CancelAllOrders, CancelOrder, ModifyOrder, SubmitOrder, SubmitOrderList, TradingCommand,
31    },
32    msgbus::{
33        self, TypedHandler,
34        switchboard::{get_quotes_topic, get_trades_topic},
35    },
36};
37use nautilus_core::{UUID4, WeakCell};
38use nautilus_model::{
39    data::{OrderBookDeltas, QuoteTick, TradeTick},
40    enums::{ContingencyType, OrderSide, OrderSideSpecified, OrderStatus, OrderType, TriggerType},
41    events::{OrderCanceled, OrderEmulated, OrderEventAny, OrderReleased, OrderUpdated},
42    identifiers::{ActorId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId},
43    instruments::Instrument,
44    orders::{LimitOrder, MarketOrder, Order, OrderAny},
45    types::{Price, Quantity},
46};
47
48use crate::{
49    matching_core::{MatchAction, OrderMatchInfo, OrderMatchingCore},
50    order_manager::{
51        handlers::{CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny},
52        manager::OrderManager,
53    },
54    trailing::trailing_stop_calculate,
55};
56
57pub struct OrderEmulator {
58    actor: DataActorCore,
59    clock: Rc<RefCell<dyn Clock>>,
60    cache: Rc<RefCell<Cache>>,
61    manager: OrderManager,
62    matching_cores: AHashMap<InstrumentId, OrderMatchingCore>,
63    subscribed_quotes: AHashSet<InstrumentId>,
64    subscribed_trades: AHashSet<InstrumentId>,
65    subscribed_strategies: AHashSet<StrategyId>,
66    monitored_positions: AHashSet<PositionId>,
67    on_event_handler: Option<TypedHandler<OrderEventAny>>,
68    self_ref: Option<WeakCell<Self>>,
69}
70
71impl Debug for OrderEmulator {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct(stringify!(OrderEmulator))
74            .field("actor", &self.actor)
75            .field("cores", &self.matching_cores.len())
76            .field("subscribed_quotes", &self.subscribed_quotes.len())
77            .finish()
78    }
79}
80
81impl Deref for OrderEmulator {
82    type Target = DataActorCore;
83
84    fn deref(&self) -> &Self::Target {
85        &self.actor
86    }
87}
88
89impl DerefMut for OrderEmulator {
90    fn deref_mut(&mut self) -> &mut Self::Target {
91        &mut self.actor
92    }
93}
94
95impl OrderEmulator {
96    pub fn new(clock: Rc<RefCell<dyn Clock>>, cache: Rc<RefCell<Cache>>) -> Self {
97        let config = DataActorConfig {
98            actor_id: Some(ActorId::from("OrderEmulator")),
99            ..Default::default()
100        };
101
102        let active_local = true;
103        let manager =
104            OrderManager::new(clock.clone(), cache.clone(), active_local, None, None, None);
105
106        Self {
107            actor: DataActorCore::new(config),
108            clock,
109            cache,
110            manager,
111            matching_cores: AHashMap::new(),
112            subscribed_quotes: AHashSet::new(),
113            subscribed_trades: AHashSet::new(),
114            subscribed_strategies: AHashSet::new(),
115            monitored_positions: AHashSet::new(),
116            on_event_handler: None,
117            self_ref: None,
118        }
119    }
120
121    /// Sets the weak self-reference for creating subscription handlers.
122    pub fn set_self_ref(&mut self, self_ref: WeakCell<Self>) {
123        self.self_ref = Some(self_ref);
124    }
125
126    /// Registers the emulator with the trading system.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if registration fails.
131    pub fn register(
132        &mut self,
133        trader_id: TraderId,
134        clock: Rc<RefCell<dyn Clock>>,
135        cache: Rc<RefCell<Cache>>,
136    ) -> anyhow::Result<()> {
137        self.actor.register(trader_id, clock, cache)
138    }
139
140    pub fn set_on_event_handler(&mut self, handler: TypedHandler<OrderEventAny>) {
141        self.on_event_handler = Some(handler);
142    }
143
144    /// Sets the handler for submit order commands.
145    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
146        self.manager.set_submit_order_handler(handler);
147    }
148
149    /// Sets the handler for cancel order commands.
150    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
151        self.manager.set_cancel_order_handler(handler);
152    }
153
154    /// Sets the handler for modify order commands.
155    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
156        self.manager.set_modify_order_handler(handler);
157    }
158
159    /// Caches a submit order command for emulation tracking.
160    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
161        self.manager.cache_submit_order_command(command);
162    }
163
164    /// Subscribes to quote data for the given instrument.
165    fn subscribe_quotes_for_instrument(&mut self, instrument_id: InstrumentId) {
166        let Some(self_ref) = self.self_ref.clone() else {
167            log::warn!("Cannot subscribe to quotes: self_ref not set");
168            return;
169        };
170
171        let topic = get_quotes_topic(instrument_id);
172        let handler = TypedHandler::from(move |quote: &QuoteTick| {
173            if let Some(emulator) = self_ref.upgrade() {
174                emulator.borrow_mut().on_quote_tick(*quote);
175            }
176        });
177
178        self.actor
179            .subscribe_quotes(topic, handler, instrument_id, None, None);
180    }
181
182    /// Subscribes to trade data for the given instrument.
183    fn subscribe_trades_for_instrument(&mut self, instrument_id: InstrumentId) {
184        let Some(self_ref) = self.self_ref.clone() else {
185            log::warn!("Cannot subscribe to trades: self_ref not set");
186            return;
187        };
188
189        let topic = get_trades_topic(instrument_id);
190        let handler = TypedHandler::from(move |trade: &TradeTick| {
191            if let Some(emulator) = self_ref.upgrade() {
192                emulator.borrow_mut().on_trade_tick(*trade);
193            }
194        });
195
196        self.actor
197            .subscribe_trades(topic, handler, instrument_id, None, None);
198    }
199
200    #[must_use]
201    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
202        let mut quotes: Vec<InstrumentId> = self.subscribed_quotes.iter().copied().collect();
203        quotes.sort();
204        quotes
205    }
206
207    #[must_use]
208    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
209        let mut trades: Vec<_> = self.subscribed_trades.iter().copied().collect();
210        trades.sort();
211        trades
212    }
213
214    #[must_use]
215    pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
216        self.manager.get_submit_order_commands()
217    }
218
219    #[must_use]
220    pub fn get_matching_core(&self, instrument_id: &InstrumentId) -> Option<OrderMatchingCore> {
221        self.matching_cores.get(instrument_id).cloned()
222    }
223
224    /// Reactivates emulated orders from cache on start.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if no emulated orders are found or processing fails.
229    ///
230    pub fn on_start(&mut self) -> anyhow::Result<()> {
231        let emulated_orders: Vec<OrderAny> = self
232            .cache
233            .borrow()
234            .orders_emulated(None, None, None, None, None)
235            .into_iter()
236            .cloned()
237            .collect();
238
239        if emulated_orders.is_empty() {
240            log::error!("No emulated orders to reactivate");
241            return Ok(());
242        }
243
244        for order in emulated_orders {
245            if !matches!(
246                order.status(),
247                OrderStatus::Initialized | OrderStatus::Emulated
248            ) {
249                continue; // No longer emulated
250            }
251
252            if let Some(parent_order_id) = &order.parent_order_id() {
253                let parent_order = if let Some(order) = self.cache.borrow().order(parent_order_id) {
254                    order.clone()
255                } else {
256                    log::error!("Cannot handle order: parent {parent_order_id} not found");
257                    continue;
258                };
259
260                let is_position_closed = parent_order
261                    .position_id()
262                    .is_some_and(|id| self.cache.borrow().is_position_closed(&id));
263                if parent_order.is_closed() && is_position_closed {
264                    self.manager.cancel_order(&order);
265                    continue; // Parent already closed
266                }
267
268                if parent_order.contingency_type() == Some(ContingencyType::Oto)
269                    && (parent_order.is_active_local()
270                        || parent_order.filled_qty() == Quantity::zero(0))
271                {
272                    continue; // Process contingency order later once parent triggered
273                }
274            }
275
276            let position_id = self
277                .cache
278                .borrow()
279                .position_id(&order.client_order_id())
280                .copied();
281            let client_id = self
282                .cache
283                .borrow()
284                .client_id(&order.client_order_id())
285                .copied();
286
287            let command = SubmitOrder::new(
288                order.trader_id(),
289                client_id,
290                order.strategy_id(),
291                order.instrument_id(),
292                order.client_order_id(),
293                order.init_event().clone(),
294                order.exec_algorithm_id(),
295                position_id,
296                None, // params
297                UUID4::new(),
298                self.clock.borrow().timestamp_ns(),
299            );
300
301            self.handle_submit_order(command);
302        }
303
304        Ok(())
305    }
306
307    pub fn on_event(&mut self, event: &OrderEventAny) {
308        log::info!("{RECV}{EVT} {event}");
309
310        self.manager.handle_event(event);
311
312        if let Some(order) = self.cache.borrow().order(&event.client_order_id())
313            && order.is_closed()
314            && let Some(matching_core) = self.matching_cores.get_mut(&order.instrument_id())
315            && let Err(e) = matching_core.delete_order(event.client_order_id())
316        {
317            log::error!("Error deleting order: {e}");
318        }
319        // else: Order not in cache yet
320    }
321
322    pub const fn on_stop(&self) {}
323
324    pub fn on_reset(&mut self) {
325        self.manager.reset();
326        self.matching_cores.clear();
327    }
328
329    pub const fn on_dispose(&self) {}
330
331    pub fn execute(&mut self, command: TradingCommand) {
332        log::info!("{RECV}{CMD} {command}");
333
334        match command {
335            TradingCommand::SubmitOrder(command) => self.handle_submit_order(command),
336            TradingCommand::SubmitOrderList(ref command) => self.handle_submit_order_list(command),
337            TradingCommand::ModifyOrder(ref command) => self.handle_modify_order(command),
338            TradingCommand::CancelOrder(command) => self.handle_cancel_order(command),
339            TradingCommand::CancelAllOrders(ref command) => self.handle_cancel_all_orders(command),
340            _ => log::error!("Cannot handle command: unrecognized {command:?}"),
341        }
342    }
343
344    fn create_matching_core(
345        &mut self,
346        instrument_id: InstrumentId,
347        price_increment: Price,
348    ) -> OrderMatchingCore {
349        let matching_core = OrderMatchingCore::new(instrument_id, price_increment);
350        self.matching_cores
351            .insert(instrument_id, matching_core.clone());
352        log::info!("Creating matching core for {instrument_id:?}");
353        matching_core
354    }
355
356    /// # Panics
357    ///
358    /// Panics if the emulation trigger type is `NoTrigger` or if order not in cache.
359    pub fn handle_submit_order(&mut self, command: SubmitOrder) {
360        let client_order_id = command.client_order_id;
361
362        let mut order = self
363            .cache
364            .borrow()
365            .order(&client_order_id)
366            .cloned()
367            .expect("order must exist in cache");
368
369        let emulation_trigger = order.emulation_trigger();
370
371        assert_ne!(
372            emulation_trigger,
373            Some(TriggerType::NoTrigger),
374            "order.emulation_trigger must not be TriggerType::NoTrigger"
375        );
376        assert!(
377            self.manager
378                .get_submit_order_commands()
379                .contains_key(&client_order_id),
380            "client_order_id must be in submit_order_commands"
381        );
382
383        if !matches!(
384            emulation_trigger,
385            Some(TriggerType::Default | TriggerType::BidAsk | TriggerType::LastPrice)
386        ) {
387            log::error!("Cannot emulate order: `TriggerType` {emulation_trigger:?} not supported");
388            self.manager.cancel_order(&order);
389            return;
390        }
391
392        self.check_monitoring(command.strategy_id, command.position_id);
393
394        // Get or create matching core
395        let trigger_instrument_id = order
396            .trigger_instrument_id()
397            .unwrap_or_else(|| order.instrument_id());
398
399        let matching_core = self.matching_cores.get(&trigger_instrument_id).cloned();
400
401        let mut matching_core = if let Some(core) = matching_core {
402            core
403        } else {
404            // Handle synthetic instruments
405            let (instrument_id, price_increment) = if trigger_instrument_id.is_synthetic() {
406                let synthetic = self
407                    .cache
408                    .borrow()
409                    .synthetic(&trigger_instrument_id)
410                    .cloned();
411
412                if let Some(synthetic) = synthetic {
413                    (synthetic.id, synthetic.price_increment)
414                } else {
415                    log::error!(
416                        "Cannot emulate order: no synthetic instrument {trigger_instrument_id} for trigger"
417                    );
418                    self.manager.cancel_order(&order);
419                    return;
420                }
421            } else {
422                let instrument = self
423                    .cache
424                    .borrow()
425                    .instrument(&trigger_instrument_id)
426                    .cloned();
427
428                if let Some(instrument) = instrument {
429                    (instrument.id(), instrument.price_increment())
430                } else {
431                    log::error!(
432                        "Cannot emulate order: no instrument {trigger_instrument_id} for trigger"
433                    );
434                    self.manager.cancel_order(&order);
435                    return;
436                }
437            };
438
439            self.create_matching_core(instrument_id, price_increment)
440        };
441
442        // Update trailing stop
443        if matches!(
444            order.order_type(),
445            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
446        ) {
447            self.update_trailing_stop_order(&mut order);
448            if order.trigger_price().is_none() {
449                log::error!(
450                    "Cannot handle trailing stop order with no trigger_price and no market updates"
451                );
452
453                self.manager.cancel_order(&order);
454                return;
455            }
456        }
457
458        // Cache command
459        self.manager.cache_submit_order_command(command);
460
461        // Check if immediately marketable
462        let match_info = OrderMatchInfo::new(
463            order.client_order_id(),
464            order.order_side().as_specified(),
465            order.order_type(),
466            order.trigger_price(),
467            order.price(),
468            true, // is_activated
469        );
470        matching_core.match_order(&match_info);
471
472        // Handle data subscriptions
473        match emulation_trigger.unwrap() {
474            TriggerType::Default | TriggerType::BidAsk => {
475                if !self.subscribed_quotes.contains(&trigger_instrument_id) {
476                    self.subscribe_quotes_for_instrument(trigger_instrument_id);
477                    self.subscribed_quotes.insert(trigger_instrument_id);
478                }
479            }
480            TriggerType::LastPrice => {
481                if !self.subscribed_trades.contains(&trigger_instrument_id) {
482                    self.subscribe_trades_for_instrument(trigger_instrument_id);
483                    self.subscribed_trades.insert(trigger_instrument_id);
484                }
485            }
486            _ => {
487                log::error!("Invalid TriggerType: {emulation_trigger:?}");
488                return;
489            }
490        }
491
492        // Check if order was already released
493        if !self
494            .manager
495            .get_submit_order_commands()
496            .contains_key(&order.client_order_id())
497        {
498            return; // Already released
499        }
500
501        // Hold in matching core
502        matching_core.add_order(match_info);
503
504        // Generate emulated event if needed
505        if order.status() == OrderStatus::Initialized {
506            let event = OrderEmulated::new(
507                order.trader_id(),
508                order.strategy_id(),
509                order.instrument_id(),
510                order.client_order_id(),
511                UUID4::new(),
512                self.clock.borrow().timestamp_ns(),
513                self.clock.borrow().timestamp_ns(),
514            );
515
516            if let Err(e) = order.apply(OrderEventAny::Emulated(event)) {
517                log::error!("Cannot apply order event: {e:?}");
518                return;
519            }
520
521            if let Err(e) = self.cache.borrow_mut().update_order(&order) {
522                log::error!("Cannot update order: {e:?}");
523                return;
524            }
525
526            self.manager.send_risk_event(OrderEventAny::Emulated(event));
527
528            msgbus::publish_order_event(
529                format!("events.order.{}", order.strategy_id()).into(),
530                &OrderEventAny::Emulated(event),
531            );
532        }
533
534        // Since we are cloning the matching core, we need to insert it back into the original hashmap
535        self.matching_cores
536            .insert(trigger_instrument_id, matching_core);
537
538        log::info!("Emulating {order}");
539    }
540
541    fn handle_submit_order_list(&mut self, command: &SubmitOrderList) {
542        self.check_monitoring(command.strategy_id, command.position_id);
543
544        let orders: Vec<OrderAny> = self
545            .cache
546            .borrow()
547            .orders_for_ids(&command.order_list.client_order_ids, &command);
548
549        for order in &orders {
550            if let Some(parent_order_id) = order.parent_order_id() {
551                let cache = self.cache.borrow();
552                let parent_order = if let Some(parent_order) = cache.order(&parent_order_id) {
553                    parent_order
554                } else {
555                    log::error!("Parent order for {} not found", order.client_order_id());
556                    continue;
557                };
558
559                if parent_order.contingency_type() == Some(ContingencyType::Oto) {
560                    continue; // Process contingency order later once parent triggered
561                }
562            }
563
564            if let Err(e) =
565                self.manager
566                    .create_new_submit_order(order, command.position_id, command.client_id)
567            {
568                log::error!("Error creating new submit order: {e}");
569            }
570        }
571    }
572
573    fn handle_modify_order(&mut self, command: &ModifyOrder) {
574        if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
575            let price = match command.price {
576                Some(price) => Some(price),
577                None => order.price(),
578            };
579
580            let trigger_price = match command.trigger_price {
581                Some(trigger_price) => Some(trigger_price),
582                None => order.trigger_price(),
583            };
584
585            let ts_now = self.clock.borrow().timestamp_ns();
586            let event = OrderUpdated::new(
587                order.trader_id(),
588                order.strategy_id(),
589                order.instrument_id(),
590                order.client_order_id(),
591                command.quantity.unwrap_or(order.quantity()),
592                UUID4::new(),
593                ts_now,
594                ts_now,
595                false,
596                order.venue_order_id(),
597                order.account_id(),
598                price,
599                trigger_price,
600                None,
601                order.is_quote_quantity(),
602            );
603
604            self.manager.send_exec_event(OrderEventAny::Updated(event));
605
606            let trigger_instrument_id = order
607                .trigger_instrument_id()
608                .unwrap_or_else(|| order.instrument_id());
609
610            if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
611                let match_info = OrderMatchInfo::new(
612                    order.client_order_id(),
613                    order.order_side().as_specified(),
614                    order.order_type(),
615                    trigger_price,
616                    price,
617                    true, // is_activated
618                );
619                matching_core.match_order(&match_info);
620            } else {
621                log::error!(
622                    "Cannot handle `ModifyOrder`: no matching core for trigger instrument {trigger_instrument_id}"
623                );
624            }
625        } else {
626            log::error!("Cannot modify order: {} not found", command.client_order_id);
627        }
628    }
629
630    pub fn handle_cancel_order(&mut self, command: CancelOrder) {
631        let order = if let Some(order) = self.cache.borrow().order(&command.client_order_id) {
632            order.clone()
633        } else {
634            log::error!("Cannot cancel order: {} not found", command.client_order_id);
635            return;
636        };
637
638        let trigger_instrument_id = order
639            .trigger_instrument_id()
640            .unwrap_or_else(|| order.instrument_id());
641
642        let matching_core = if let Some(core) = self.matching_cores.get(&trigger_instrument_id) {
643            core
644        } else {
645            self.manager.cancel_order(&order);
646            return;
647        };
648
649        if !matching_core.order_exists(order.client_order_id())
650            && order.is_open()
651            && !order.is_pending_cancel()
652        {
653            // Order not held in the emulator
654            self.manager
655                .send_exec_command(TradingCommand::CancelOrder(command));
656        } else {
657            self.manager.cancel_order(&order);
658        }
659    }
660
661    fn handle_cancel_all_orders(&mut self, command: &CancelAllOrders) {
662        let instrument_id = command.instrument_id;
663        let matching_core = match self.matching_cores.get(&instrument_id) {
664            Some(core) => core,
665            None => return, // No orders to cancel
666        };
667
668        let orders_to_cancel = match command.order_side {
669            OrderSide::NoOrderSide => {
670                // Get both bid and ask orders
671                let mut all_orders = Vec::new();
672                all_orders.extend(matching_core.get_orders_bid().iter().cloned());
673                all_orders.extend(matching_core.get_orders_ask().iter().cloned());
674                all_orders
675            }
676            OrderSide::Buy => matching_core.get_orders_bid().to_vec(),
677            OrderSide::Sell => matching_core.get_orders_ask().to_vec(),
678        };
679
680        // Process all orders in a single iteration
681        for match_info in orders_to_cancel {
682            if let Some(order) = self
683                .cache
684                .borrow()
685                .order(&match_info.client_order_id)
686                .cloned()
687            {
688                self.manager.cancel_order(&order);
689            }
690        }
691    }
692
693    pub fn update_order(&mut self, order: &mut OrderAny, new_quantity: Quantity) {
694        log::info!(
695            "Updating order {} quantity to {new_quantity}",
696            order.client_order_id(),
697        );
698
699        let ts_now = self.clock.borrow().timestamp_ns();
700        let event = OrderUpdated::new(
701            order.trader_id(),
702            order.strategy_id(),
703            order.instrument_id(),
704            order.client_order_id(),
705            new_quantity,
706            UUID4::new(),
707            ts_now,
708            ts_now,
709            false,
710            None,
711            order.account_id(),
712            None,
713            None,
714            None,
715            order.is_quote_quantity(),
716        );
717
718        if let Err(e) = order.apply(OrderEventAny::Updated(event)) {
719            log::error!("Cannot apply order event: {e:?}");
720            return;
721        }
722
723        if let Err(e) = self.cache.borrow_mut().update_order(order) {
724            log::error!("Cannot update order: {e:?}");
725            return;
726        }
727
728        self.manager.send_risk_event(OrderEventAny::Updated(event));
729    }
730
731    pub fn on_order_book_deltas(&mut self, deltas: &OrderBookDeltas) {
732        log::debug!("Processing {deltas:?}");
733
734        let instrument_id = &deltas.instrument_id;
735        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
736            if let Some(book) = self.cache.borrow().order_book(instrument_id) {
737                let best_bid = book.best_bid_price();
738                let best_ask = book.best_ask_price();
739
740                if let Some(best_bid) = best_bid {
741                    matching_core.set_bid_raw(best_bid);
742                }
743
744                if let Some(best_ask) = best_ask {
745                    matching_core.set_ask_raw(best_ask);
746                }
747            } else {
748                log::error!(
749                    "Cannot handle `OrderBookDeltas`: no book being maintained for {}",
750                    deltas.instrument_id
751                );
752            }
753
754            self.iterate_orders(instrument_id);
755        } else {
756            log::error!(
757                "Cannot handle `OrderBookDeltas`: no matching core for instrument {}",
758                deltas.instrument_id
759            );
760        }
761    }
762
763    pub fn on_quote_tick(&mut self, quote: QuoteTick) {
764        log::debug!("Processing {quote}:?");
765
766        let instrument_id = &quote.instrument_id;
767        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
768            matching_core.set_bid_raw(quote.bid_price);
769            matching_core.set_ask_raw(quote.ask_price);
770
771            self.iterate_orders(instrument_id);
772        } else {
773            log::error!(
774                "Cannot handle `QuoteTick`: no matching core for instrument {}",
775                quote.instrument_id
776            );
777        }
778    }
779
780    pub fn on_trade_tick(&mut self, trade: TradeTick) {
781        log::debug!("Processing {trade:?}");
782
783        let instrument_id = &trade.instrument_id;
784        if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
785            matching_core.set_last_raw(trade.price);
786
787            if !self.subscribed_quotes.contains(instrument_id) {
788                matching_core.set_bid_raw(trade.price);
789                matching_core.set_ask_raw(trade.price);
790            }
791
792            self.iterate_orders(instrument_id);
793        } else {
794            log::error!(
795                "Cannot handle `TradeTick`: no matching core for instrument {}",
796                trade.instrument_id
797            );
798        }
799    }
800
801    fn iterate_orders(&mut self, instrument_id: &InstrumentId) {
802        // Process bid actions before ask actions so cross-side
803        // contingencies (OCO/OUO) mutate state between sides
804        let bid_actions = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
805            matching_core.iterate_bids()
806        } else {
807            log::error!("Cannot iterate orders: no matching core for instrument {instrument_id}");
808            return;
809        };
810
811        for action in bid_actions {
812            match action {
813                MatchAction::FillLimit(id) => self.fill_limit_order(id),
814                MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
815            }
816        }
817
818        let ask_actions = if let Some(matching_core) = self.matching_cores.get_mut(instrument_id) {
819            matching_core.iterate_asks()
820        } else {
821            return;
822        };
823
824        for action in ask_actions {
825            match action {
826                MatchAction::FillLimit(id) => self.fill_limit_order(id),
827                MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
828            }
829        }
830
831        // Re-snapshot orders after actions to avoid stale trailing stop updates
832        let orders = if let Some(matching_core) = self.matching_cores.get(instrument_id) {
833            matching_core.get_orders()
834        } else {
835            return;
836        };
837
838        for match_info in orders {
839            if !matches!(
840                match_info.order_type,
841                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
842            ) {
843                continue;
844            }
845
846            let mut order = match self
847                .cache
848                .borrow()
849                .order(&match_info.client_order_id)
850                .cloned()
851            {
852                Some(order) => order,
853                None => continue,
854            };
855
856            if order.is_closed() {
857                continue;
858            }
859
860            self.update_trailing_stop_order(&mut order);
861        }
862    }
863
864    pub fn cancel_order(&mut self, order: &OrderAny) {
865        log::info!("Canceling order {}", order.client_order_id());
866
867        let mut order = order.clone();
868        order.set_emulation_trigger(Some(TriggerType::NoTrigger));
869
870        let trigger_instrument_id = order
871            .trigger_instrument_id()
872            .unwrap_or(order.instrument_id());
873
874        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id)
875            && let Err(e) = matching_core.delete_order(order.client_order_id())
876        {
877            log::error!("Cannot delete order: {e:?}");
878        }
879
880        self.manager
881            .pop_submit_order_command(order.client_order_id());
882
883        self.cache
884            .borrow_mut()
885            .update_order_pending_cancel_local(&order);
886
887        let ts_now = self.clock.borrow().timestamp_ns();
888        let event = OrderCanceled::new(
889            order.trader_id(),
890            order.strategy_id(),
891            order.instrument_id(),
892            order.client_order_id(),
893            UUID4::new(),
894            ts_now,
895            ts_now,
896            false,
897            order.venue_order_id(),
898            order.account_id(),
899        );
900
901        self.manager.send_exec_event(OrderEventAny::Canceled(event));
902    }
903
904    fn check_monitoring(&mut self, strategy_id: StrategyId, position_id: Option<PositionId>) {
905        if !self.subscribed_strategies.contains(&strategy_id) {
906            // Subscribe to strategy order events
907            if let Some(handler) = &self.on_event_handler {
908                msgbus::subscribe_order_events(
909                    format!("events.order.{strategy_id}").into(),
910                    handler.clone(),
911                    None,
912                );
913                self.subscribed_strategies.insert(strategy_id);
914                log::info!("Subscribed to strategy {strategy_id} order events");
915            }
916        }
917
918        if let Some(position_id) = position_id
919            && !self.monitored_positions.contains(&position_id)
920        {
921            self.monitored_positions.insert(position_id);
922        }
923    }
924
925    /// Validates market data availability for order release.
926    ///
927    /// Returns `Some(released_price)` if market data is available, `None` otherwise.
928    /// Logs appropriate warnings when market data is not yet available.
929    ///
930    /// Does NOT pop the submit order command - caller must do that and handle missing command
931    /// according to their contract (panic for market orders, return for limit orders).
932    fn validate_release(
933        &self,
934        order: &OrderAny,
935        matching_core: &OrderMatchingCore,
936        trigger_instrument_id: InstrumentId,
937    ) -> Option<Price> {
938        let released_price = match order.order_side_specified() {
939            OrderSideSpecified::Buy => matching_core.ask,
940            OrderSideSpecified::Sell => matching_core.bid,
941        };
942
943        if released_price.is_none() {
944            log::warn!(
945                "Cannot release order {} yet: no market data available for {trigger_instrument_id}, will retry on next update",
946                order.client_order_id(),
947            );
948            return None;
949        }
950
951        Some(released_price.unwrap())
952    }
953
954    /// # Panics
955    ///
956    /// Panics if the order type is invalid for a stop order.
957    pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
958        let order = match self.cache.borrow().order(&client_order_id).cloned() {
959            Some(order) => order,
960            None => {
961                log::error!(
962                    "Cannot trigger stop order: order {client_order_id} not found in cache"
963                );
964                return;
965            }
966        };
967
968        match order.order_type() {
969            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
970                self.fill_limit_order(client_order_id);
971            }
972            OrderType::Market | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
973                self.fill_market_order(client_order_id);
974            }
975            _ => panic!("invalid `OrderType`, was {}", order.order_type()),
976        }
977    }
978
979    /// # Panics
980    ///
981    /// Panics if a limit order has no price.
982    pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
983        let order = match self.cache.borrow().order(&client_order_id).cloned() {
984            Some(order) => order,
985            None => {
986                log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
987                return;
988            }
989        };
990
991        if matches!(order.order_type(), OrderType::Limit) {
992            self.fill_market_order(client_order_id);
993            return;
994        }
995
996        let trigger_instrument_id = order
997            .trigger_instrument_id()
998            .unwrap_or(order.instrument_id());
999
1000        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
1001            Some(core) => core,
1002            None => {
1003                log::error!(
1004                    "Cannot fill limit order: no matching core for instrument {trigger_instrument_id}"
1005                );
1006                return; // Order stays queued for retry
1007            }
1008        };
1009
1010        let released_price =
1011            match self.validate_release(&order, matching_core, trigger_instrument_id) {
1012                Some(price) => price,
1013                None => return, // Order stays queued for retry
1014            };
1015
1016        let command = match self
1017            .manager
1018            .pop_submit_order_command(order.client_order_id())
1019        {
1020            Some(command) => command,
1021            None => return, // Order already released
1022        };
1023
1024        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1025            if let Err(e) = matching_core.delete_order(client_order_id) {
1026                log::error!("Error deleting order: {e:?}");
1027            }
1028
1029            let emulation_trigger = TriggerType::NoTrigger;
1030
1031            // Transform order
1032            let mut transformed = if let Ok(transformed) = LimitOrder::new_checked(
1033                order.trader_id(),
1034                order.strategy_id(),
1035                order.instrument_id(),
1036                order.client_order_id(),
1037                order.order_side(),
1038                order.quantity(),
1039                order.price().unwrap(),
1040                order.time_in_force(),
1041                order.expire_time(),
1042                order.is_post_only(),
1043                order.is_reduce_only(),
1044                order.is_quote_quantity(),
1045                order.display_qty(),
1046                Some(emulation_trigger),
1047                Some(trigger_instrument_id),
1048                order.contingency_type(),
1049                order.order_list_id(),
1050                order.linked_order_ids().map(Vec::from),
1051                order.parent_order_id(),
1052                order.exec_algorithm_id(),
1053                order.exec_algorithm_params().cloned(),
1054                order.exec_spawn_id(),
1055                order.tags().map(Vec::from),
1056                UUID4::new(),
1057                self.clock.borrow().timestamp_ns(),
1058            ) {
1059                transformed
1060            } else {
1061                log::error!("Cannot create limit order");
1062                return;
1063            };
1064            transformed.liquidity_side = order.liquidity_side();
1065
1066            // TODO: fix
1067            // let triggered_price = order.trigger_price();
1068            // if triggered_price.is_some() {
1069            //     transformed.trigger_price() = (triggered_price.unwrap());
1070            // }
1071
1072            let original_events = order.events();
1073
1074            for event in original_events {
1075                transformed.events.insert(0, event.clone());
1076            }
1077
1078            if let Err(e) = self.cache.borrow_mut().add_order(
1079                OrderAny::Limit(transformed.clone()),
1080                command.position_id,
1081                command.client_id,
1082                true,
1083            ) {
1084                log::error!("Failed to add order: {e}");
1085            }
1086
1087            msgbus::publish_order_event(
1088                format!("events.order.{}", order.strategy_id()).into(),
1089                transformed.last_event(),
1090            );
1091
1092            let event = OrderReleased::new(
1093                order.trader_id(),
1094                order.strategy_id(),
1095                order.instrument_id(),
1096                order.client_order_id(),
1097                released_price,
1098                UUID4::new(),
1099                self.clock.borrow().timestamp_ns(),
1100                self.clock.borrow().timestamp_ns(),
1101            );
1102
1103            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1104                log::error!("Failed to apply order event: {e}");
1105            }
1106
1107            if let Err(e) = self
1108                .cache
1109                .borrow_mut()
1110                .update_order(&OrderAny::Limit(transformed.clone()))
1111            {
1112                log::error!("Failed to update order: {e}");
1113            }
1114
1115            self.manager.send_risk_event(OrderEventAny::Released(event));
1116
1117            log::info!("Releasing order {}", order.client_order_id());
1118
1119            // Publish event
1120            msgbus::publish_order_event(
1121                format!("events.order.{}", transformed.strategy_id()).into(),
1122                &OrderEventAny::Released(event),
1123            );
1124
1125            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1126                self.manager.send_algo_command(command, exec_algorithm_id);
1127            } else {
1128                self.manager
1129                    .send_exec_command(TradingCommand::SubmitOrder(command));
1130            }
1131        }
1132    }
1133
1134    /// # Panics
1135    ///
1136    /// Panics if a market order command is missing.
1137    pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
1138        let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
1139            Some(order) => order,
1140            None => {
1141                log::error!("Cannot fill market order: order {client_order_id} not found in cache");
1142                return;
1143            }
1144        };
1145
1146        let trigger_instrument_id = order
1147            .trigger_instrument_id()
1148            .unwrap_or(order.instrument_id());
1149
1150        let matching_core = match self.matching_cores.get(&trigger_instrument_id) {
1151            Some(core) => core,
1152            None => {
1153                log::error!(
1154                    "Cannot fill market order: no matching core for instrument {trigger_instrument_id}"
1155                );
1156                return; // Order stays queued for retry
1157            }
1158        };
1159
1160        let released_price =
1161            match self.validate_release(&order, matching_core, trigger_instrument_id) {
1162                Some(price) => price,
1163                None => return, // Order stays queued for retry
1164            };
1165
1166        let command = self
1167            .manager
1168            .pop_submit_order_command(order.client_order_id())
1169            .expect("invalid operation `fill_market_order` with no command");
1170
1171        if let Some(matching_core) = self.matching_cores.get_mut(&trigger_instrument_id) {
1172            if let Err(e) = matching_core.delete_order(client_order_id) {
1173                log::error!("Cannot delete order: {e:?}");
1174            }
1175
1176            order.set_emulation_trigger(Some(TriggerType::NoTrigger));
1177
1178            // Transform order
1179            let mut transformed = MarketOrder::new(
1180                order.trader_id(),
1181                order.strategy_id(),
1182                order.instrument_id(),
1183                order.client_order_id(),
1184                order.order_side(),
1185                order.quantity(),
1186                order.time_in_force(),
1187                UUID4::new(),
1188                self.clock.borrow().timestamp_ns(),
1189                order.is_reduce_only(),
1190                order.is_quote_quantity(),
1191                order.contingency_type(),
1192                order.order_list_id(),
1193                order.linked_order_ids().map(Vec::from),
1194                order.parent_order_id(),
1195                order.exec_algorithm_id(),
1196                order.exec_algorithm_params().cloned(),
1197                order.exec_spawn_id(),
1198                order.tags().map(Vec::from),
1199            );
1200
1201            let original_events = order.events();
1202
1203            for event in original_events {
1204                transformed.events.insert(0, event.clone());
1205            }
1206
1207            if let Err(e) = self.cache.borrow_mut().add_order(
1208                OrderAny::Market(transformed.clone()),
1209                command.position_id,
1210                command.client_id,
1211                true,
1212            ) {
1213                log::error!("Failed to add order: {e}");
1214            }
1215
1216            msgbus::publish_order_event(
1217                format!("events.order.{}", order.strategy_id()).into(),
1218                transformed.last_event(),
1219            );
1220
1221            let ts_now = self.clock.borrow().timestamp_ns();
1222            let event = OrderReleased::new(
1223                order.trader_id(),
1224                order.strategy_id(),
1225                order.instrument_id(),
1226                order.client_order_id(),
1227                released_price,
1228                UUID4::new(),
1229                ts_now,
1230                ts_now,
1231            );
1232
1233            if let Err(e) = transformed.apply(OrderEventAny::Released(event)) {
1234                log::error!("Failed to apply order event: {e}");
1235            }
1236
1237            if let Err(e) = self
1238                .cache
1239                .borrow_mut()
1240                .update_order(&OrderAny::Market(transformed))
1241            {
1242                log::error!("Failed to update order: {e}");
1243            }
1244            self.manager.send_risk_event(OrderEventAny::Released(event));
1245
1246            log::info!("Releasing order {}", order.client_order_id());
1247
1248            // Publish event
1249            msgbus::publish_order_event(
1250                format!("events.order.{}", order.strategy_id()).into(),
1251                &OrderEventAny::Released(event),
1252            );
1253
1254            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
1255                self.manager.send_algo_command(command, exec_algorithm_id);
1256            } else {
1257                self.manager
1258                    .send_exec_command(TradingCommand::SubmitOrder(command));
1259            }
1260        }
1261    }
1262
1263    fn update_trailing_stop_order(&self, order: &mut OrderAny) {
1264        let Some(matching_core) = self.matching_cores.get(&order.instrument_id()) else {
1265            log::error!(
1266                "Cannot update trailing-stop order: no matching core for instrument {}",
1267                order.instrument_id()
1268            );
1269            return;
1270        };
1271
1272        let mut bid = matching_core.bid;
1273        let mut ask = matching_core.ask;
1274        let mut last = matching_core.last;
1275
1276        if bid.is_none() || ask.is_none() || last.is_none() {
1277            if let Some(q) = self.cache.borrow().quote(&matching_core.instrument_id) {
1278                bid.get_or_insert(q.bid_price);
1279                ask.get_or_insert(q.ask_price);
1280            }
1281
1282            if let Some(t) = self.cache.borrow().trade(&matching_core.instrument_id) {
1283                last.get_or_insert(t.price);
1284            }
1285        }
1286
1287        let (new_trigger_px, new_limit_px) = match trailing_stop_calculate(
1288            matching_core.price_increment,
1289            order.trigger_price(),
1290            order.activation_price(),
1291            order,
1292            bid,
1293            ask,
1294            last,
1295        ) {
1296            Ok(pair) => pair,
1297            Err(e) => {
1298                log::warn!("Cannot calculate trailing-stop update: {e}");
1299                return;
1300            }
1301        };
1302
1303        if new_trigger_px.is_none() && new_limit_px.is_none() {
1304            return;
1305        }
1306
1307        let ts_now = self.clock.borrow().timestamp_ns();
1308        let update = OrderUpdated::new(
1309            order.trader_id(),
1310            order.strategy_id(),
1311            order.instrument_id(),
1312            order.client_order_id(),
1313            order.quantity(),
1314            UUID4::new(),
1315            ts_now,
1316            ts_now,
1317            false,
1318            order.venue_order_id(),
1319            order.account_id(),
1320            new_limit_px,
1321            new_trigger_px,
1322            None,
1323            order.is_quote_quantity(),
1324        );
1325        let wrapped = OrderEventAny::Updated(update);
1326        if let Err(e) = order.apply(wrapped.clone()) {
1327            log::error!("Failed to apply order event: {e}");
1328            return;
1329        }
1330
1331        if let Err(e) = self.cache.borrow_mut().update_order(order) {
1332            log::error!("Failed to update order in cache: {e}");
1333            return;
1334        }
1335        self.manager.send_risk_event(wrapped);
1336    }
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341    use std::{cell::RefCell, rc::Rc};
1342
1343    use nautilus_common::{cache::Cache, clock::TestClock};
1344    use nautilus_core::{UUID4, WeakCell};
1345    use nautilus_model::{
1346        data::{QuoteTick, TradeTick},
1347        enums::{AggressorSide, OrderSide, OrderType, TriggerType},
1348        identifiers::{StrategyId, TradeId, TraderId},
1349        instruments::{
1350            CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1351        },
1352        orders::OrderTestBuilder,
1353        types::{Price, Quantity},
1354    };
1355    use rstest::{fixture, rstest};
1356
1357    use super::*;
1358
1359    #[fixture]
1360    fn instrument() -> CryptoPerpetual {
1361        crypto_perpetual_ethusdt()
1362    }
1363
1364    #[expect(clippy::type_complexity)]
1365    fn create_emulator() -> (
1366        Rc<RefCell<dyn Clock>>,
1367        Rc<RefCell<Cache>>,
1368        Rc<RefCell<OrderEmulator>>,
1369    ) {
1370        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1371        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
1372        let emulator = Rc::new(RefCell::new(OrderEmulator::new(
1373            clock.clone(),
1374            cache.clone(),
1375        )));
1376
1377        // Register with trader for subscription support
1378        emulator
1379            .borrow_mut()
1380            .register(TraderId::from("TRADER-001"), clock.clone(), cache.clone())
1381            .unwrap();
1382
1383        // Set self-ref for subscription handlers
1384        let self_ref = WeakCell::from(Rc::downgrade(&emulator));
1385        emulator.borrow_mut().set_self_ref(self_ref);
1386
1387        (clock, cache, emulator)
1388    }
1389
1390    fn create_stop_market_order(instrument: &CryptoPerpetual, trigger: TriggerType) -> OrderAny {
1391        OrderTestBuilder::new(OrderType::StopMarket)
1392            .instrument_id(instrument.id())
1393            .side(OrderSide::Buy)
1394            .trigger_price(Price::from("5100.00"))
1395            .quantity(Quantity::from(1))
1396            .emulation_trigger(trigger)
1397            .build()
1398    }
1399
1400    fn create_submit_order(instrument: &CryptoPerpetual, order: &OrderAny) -> SubmitOrder {
1401        SubmitOrder::new(
1402            TraderId::from("TRADER-001"),
1403            None,
1404            StrategyId::from("STRATEGY-001"),
1405            instrument.id(),
1406            order.client_order_id(),
1407            order.init_event().clone(),
1408            None,
1409            None,
1410            None,
1411            UUID4::new(),
1412            0.into(),
1413        )
1414    }
1415
1416    fn create_quote_tick(instrument: &CryptoPerpetual, bid: &str, ask: &str) -> QuoteTick {
1417        QuoteTick::new(
1418            instrument.id(),
1419            Price::from(bid),
1420            Price::from(ask),
1421            Quantity::from(10),
1422            Quantity::from(10),
1423            0.into(),
1424            0.into(),
1425        )
1426    }
1427
1428    fn create_trade_tick(instrument: &CryptoPerpetual, price: &str) -> TradeTick {
1429        TradeTick::new(
1430            instrument.id(),
1431            Price::from(price),
1432            Quantity::from(1),
1433            AggressorSide::Buyer,
1434            TradeId::from("T-001"),
1435            0.into(),
1436            0.into(),
1437        )
1438    }
1439
1440    fn add_instrument_to_cache(cache: &Rc<RefCell<Cache>>, instrument: &CryptoPerpetual) {
1441        cache
1442            .borrow_mut()
1443            .add_instrument(InstrumentAny::CryptoPerpetual(instrument.clone()))
1444            .unwrap();
1445    }
1446
1447    #[rstest]
1448    fn test_subscribed_quotes_initially_empty() {
1449        let (_clock, _cache, emulator) = create_emulator();
1450
1451        assert!(emulator.borrow().subscribed_quotes().is_empty());
1452    }
1453
1454    #[rstest]
1455    fn test_subscribed_trades_initially_empty() {
1456        let (_clock, _cache, emulator) = create_emulator();
1457
1458        assert!(emulator.borrow().subscribed_trades().is_empty());
1459    }
1460
1461    #[rstest]
1462    fn test_get_submit_order_commands_initially_empty() {
1463        let (_clock, _cache, emulator) = create_emulator();
1464
1465        assert!(emulator.borrow().get_submit_order_commands().is_empty());
1466    }
1467
1468    #[rstest]
1469    fn test_get_matching_core_returns_none_when_not_created(instrument: CryptoPerpetual) {
1470        let (_clock, _cache, emulator) = create_emulator();
1471
1472        assert!(
1473            emulator
1474                .borrow()
1475                .get_matching_core(&instrument.id())
1476                .is_none()
1477        );
1478    }
1479
1480    #[rstest]
1481    fn test_create_matching_core(instrument: CryptoPerpetual) {
1482        let (_clock, _cache, emulator) = create_emulator();
1483
1484        emulator
1485            .borrow_mut()
1486            .create_matching_core(instrument.id(), instrument.price_increment);
1487
1488        assert!(
1489            emulator
1490                .borrow()
1491                .get_matching_core(&instrument.id())
1492                .is_some()
1493        );
1494    }
1495
1496    #[rstest]
1497    fn test_on_quote_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1498        let (_clock, _cache, emulator) = create_emulator();
1499        let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1500
1501        emulator.borrow_mut().on_quote_tick(quote);
1502    }
1503
1504    #[rstest]
1505    fn test_on_trade_tick_no_matching_core_does_not_panic(instrument: CryptoPerpetual) {
1506        let (_clock, _cache, emulator) = create_emulator();
1507        let trade = create_trade_tick(&instrument, "5065.00");
1508
1509        emulator.borrow_mut().on_trade_tick(trade);
1510    }
1511
1512    #[rstest]
1513    fn test_submit_order_bid_ask_trigger_creates_matching_core(instrument: CryptoPerpetual) {
1514        let (_clock, cache, emulator) = create_emulator();
1515        add_instrument_to_cache(&cache, &instrument);
1516        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1517        let command = create_submit_order(&instrument, &order);
1518        cache
1519            .borrow_mut()
1520            .add_order(order, None, None, false)
1521            .unwrap();
1522
1523        emulator
1524            .borrow_mut()
1525            .cache_submit_order_command(command.clone());
1526        emulator.borrow_mut().handle_submit_order(command);
1527
1528        assert!(
1529            emulator
1530                .borrow()
1531                .get_matching_core(&instrument.id())
1532                .is_some()
1533        );
1534    }
1535
1536    #[rstest]
1537    fn test_submit_order_bid_ask_trigger_tracks_quote_subscription(instrument: CryptoPerpetual) {
1538        let (_clock, cache, emulator) = create_emulator();
1539        add_instrument_to_cache(&cache, &instrument);
1540        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1541        let command = create_submit_order(&instrument, &order);
1542        cache
1543            .borrow_mut()
1544            .add_order(order, None, None, false)
1545            .unwrap();
1546
1547        emulator
1548            .borrow_mut()
1549            .cache_submit_order_command(command.clone());
1550        emulator.borrow_mut().handle_submit_order(command);
1551
1552        assert_eq!(emulator.borrow().subscribed_quotes(), vec![instrument.id()]);
1553        assert!(emulator.borrow().subscribed_trades().is_empty());
1554    }
1555
1556    #[rstest]
1557    fn test_submit_order_last_price_trigger_tracks_trade_subscription(instrument: CryptoPerpetual) {
1558        let (_clock, cache, emulator) = create_emulator();
1559        add_instrument_to_cache(&cache, &instrument);
1560        let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1561        let command = create_submit_order(&instrument, &order);
1562        cache
1563            .borrow_mut()
1564            .add_order(order, None, None, false)
1565            .unwrap();
1566
1567        emulator
1568            .borrow_mut()
1569            .cache_submit_order_command(command.clone());
1570        emulator.borrow_mut().handle_submit_order(command);
1571
1572        assert!(emulator.borrow().subscribed_quotes().is_empty());
1573        assert_eq!(emulator.borrow().subscribed_trades(), vec![instrument.id()]);
1574    }
1575
1576    #[rstest]
1577    fn test_submit_order_caches_command(instrument: CryptoPerpetual) {
1578        let (_clock, cache, emulator) = create_emulator();
1579        add_instrument_to_cache(&cache, &instrument);
1580        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1581        let client_order_id = order.client_order_id();
1582        let command = create_submit_order(&instrument, &order);
1583        cache
1584            .borrow_mut()
1585            .add_order(order, None, None, false)
1586            .unwrap();
1587
1588        emulator
1589            .borrow_mut()
1590            .cache_submit_order_command(command.clone());
1591        emulator.borrow_mut().handle_submit_order(command);
1592
1593        let commands = emulator.borrow().get_submit_order_commands();
1594        assert!(commands.contains_key(&client_order_id));
1595    }
1596
1597    #[rstest]
1598    fn test_quote_tick_updates_matching_core_prices(instrument: CryptoPerpetual) {
1599        let (_clock, cache, emulator) = create_emulator();
1600        add_instrument_to_cache(&cache, &instrument);
1601        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1602        let command = create_submit_order(&instrument, &order);
1603        cache
1604            .borrow_mut()
1605            .add_order(order, None, None, false)
1606            .unwrap();
1607        emulator
1608            .borrow_mut()
1609            .cache_submit_order_command(command.clone());
1610        emulator.borrow_mut().handle_submit_order(command);
1611
1612        let quote = create_quote_tick(&instrument, "5060.00", "5070.00");
1613        emulator.borrow_mut().on_quote_tick(quote);
1614
1615        let core = emulator
1616            .borrow()
1617            .get_matching_core(&instrument.id())
1618            .unwrap();
1619        assert_eq!(core.bid, Some(Price::from("5060.00")));
1620        assert_eq!(core.ask, Some(Price::from("5070.00")));
1621    }
1622
1623    #[rstest]
1624    fn test_trade_tick_updates_matching_core_last_price(instrument: CryptoPerpetual) {
1625        let (_clock, cache, emulator) = create_emulator();
1626        add_instrument_to_cache(&cache, &instrument);
1627        let order = create_stop_market_order(&instrument, TriggerType::LastPrice);
1628        let command = create_submit_order(&instrument, &order);
1629        cache
1630            .borrow_mut()
1631            .add_order(order, None, None, false)
1632            .unwrap();
1633        emulator
1634            .borrow_mut()
1635            .cache_submit_order_command(command.clone());
1636        emulator.borrow_mut().handle_submit_order(command);
1637
1638        let trade = create_trade_tick(&instrument, "5065.00");
1639        emulator.borrow_mut().on_trade_tick(trade);
1640
1641        let core = emulator
1642            .borrow()
1643            .get_matching_core(&instrument.id())
1644            .unwrap();
1645        assert_eq!(core.last, Some(Price::from("5065.00")));
1646    }
1647
1648    #[rstest]
1649    fn test_cancel_order_removes_from_matching_core(instrument: CryptoPerpetual) {
1650        let (_clock, cache, emulator) = create_emulator();
1651        add_instrument_to_cache(&cache, &instrument);
1652        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1653        let command = create_submit_order(&instrument, &order);
1654        cache
1655            .borrow_mut()
1656            .add_order(order.clone(), None, None, false)
1657            .unwrap();
1658        emulator
1659            .borrow_mut()
1660            .cache_submit_order_command(command.clone());
1661        emulator.borrow_mut().handle_submit_order(command);
1662
1663        emulator.borrow_mut().cancel_order(&order);
1664
1665        let core = emulator
1666            .borrow()
1667            .get_matching_core(&instrument.id())
1668            .unwrap();
1669        assert!(core.get_orders().is_empty());
1670    }
1671
1672    #[rstest]
1673    fn test_cancel_order_removes_cached_command(instrument: CryptoPerpetual) {
1674        let (_clock, cache, emulator) = create_emulator();
1675        add_instrument_to_cache(&cache, &instrument);
1676        let order = create_stop_market_order(&instrument, TriggerType::BidAsk);
1677        let client_order_id = order.client_order_id();
1678        let command = create_submit_order(&instrument, &order);
1679        cache
1680            .borrow_mut()
1681            .add_order(order.clone(), None, None, false)
1682            .unwrap();
1683        emulator
1684            .borrow_mut()
1685            .cache_submit_order_command(command.clone());
1686        emulator.borrow_mut().handle_submit_order(command);
1687
1688        emulator.borrow_mut().cancel_order(&order);
1689
1690        let commands = emulator.borrow().get_submit_order_commands();
1691        assert!(!commands.contains_key(&client_order_id));
1692    }
1693}