Skip to main content

nautilus_execution/matching_engine/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    cmp::min,
19    fmt::Debug,
20    ops::{Add, Sub},
21    rc::Rc,
22};
23
24use chrono::TimeDelta;
25use indexmap::IndexMap;
26use nautilus_common::{
27    cache::Cache,
28    clock::Clock,
29    messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
30    msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{UUID4, UnixNanos};
33use nautilus_model::{
34    data::{
35        Bar, BarType, InstrumentClose, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
36        QuoteTick, TradeTick, order::BookOrder,
37    },
38    enums::{
39        AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
40        InstrumentCloseType, LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide,
41        OrderSideSpecified, OrderStatus, OrderType, PositionSide, PriceType, TimeInForce,
42        TriggerType,
43    },
44    events::{
45        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
46        OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
47    },
48    identifiers::{
49        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
50        VenueOrderId,
51    },
52    instruments::{Instrument, InstrumentAny},
53    orderbook::OrderBook,
54    orders::{MarketOrder, Order, OrderAny, OrderCore},
55    position::Position,
56    types::{
57        Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
58        quantity::QuantityRaw,
59    },
60};
61use ustr::Ustr;
62
63use crate::{
64    matching_core::{MatchAction, OrderMatchInfo, OrderMatchingCore},
65    matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
66    models::{
67        fee::{FeeModel, FeeModelAny},
68        fill::{FillModel, FillModelAny},
69    },
70    protection::protection_price_calculate,
71    trailing::trailing_stop_calculate,
72};
73
/// An order matching engine for a single market.
///
/// Besides the public venue/instrument configuration, the engine keeps private
/// bookkeeping for liquidity consumption and queue-position simulation. Raw
/// price and quantity values are used as keys and counters in those maps.
pub struct OrderMatchingEngine {
    /// The venue for the matching engine.
    pub venue: Venue,
    /// The instrument for the matching engine.
    pub instrument: InstrumentAny,
    /// The instrument's raw integer ID for the venue.
    pub raw_id: u32,
    /// The order book type for the matching engine.
    pub book_type: BookType,
    /// The order management system (OMS) type for the matching engine.
    pub oms_type: OmsType,
    /// The account type for the matching engine.
    pub account_type: AccountType,
    /// The market status for the matching engine.
    pub market_status: MarketStatus,
    /// The config for the matching engine.
    pub config: OrderMatchingEngineConfig,
    /// The matching core, built from the instrument ID and price increment.
    core: OrderMatchingCore,
    /// Shared clock handle.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache, used to look up orders by client order ID.
    cache: Rc<RefCell<Cache>>,
    /// The order book for the instrument, created with `book_type`.
    book: OrderBook,
    /// The fill model; its `fill_limit_inside_spread` setting is mirrored into `core`.
    fill_model: FillModelAny,
    /// The fee model.
    fee_model: FeeModelAny,
    /// Optional order event handler; when `None`, events are sent over the message bus.
    event_handler: Option<Rc<dyn Fn(OrderEventAny)>>,
    // NOTE(review): the target prices, cached bars and `fill_at_market` flag are only
    // initialized/reset in the visible part of this file; confirm their usage further down.
    target_bid: Option<Price>,
    target_ask: Option<Price>,
    target_last: Option<Price>,
    last_bar_bid: Option<Bar>,
    last_bar_ask: Option<Bar>,
    fill_at_market: bool,
    execution_bar_types: IndexMap<InstrumentId, BarType>,
    execution_bar_deltas: IndexMap<BarType, TimeDelta>,
    account_ids: IndexMap<TraderId, AccountId>,
    cached_filled_qty: IndexMap<ClientOrderId, Quantity>,
    /// Generator for venue-side identifiers.
    ids_generator: IdsGenerator,
    /// Size of the trade currently being processed, if any; caps queue-position fills.
    last_trade_size: Option<Quantity>,
    /// Bid-side consumption per raw price level: `(tracked level size, consumed)`.
    /// Used for SELL orders and seller-initiated trades.
    bid_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
    /// Ask-side consumption per raw price level: `(tracked level size, consumed)`.
    /// Used for BUY orders and buyer-initiated trades.
    ask_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
    /// Raw quantity subtracted from `last_trade_size` when capping a fill.
    trade_consumption: QuantityRaw,
    /// Queue position per order: `(raw order price, raw quantity still ahead)`.
    queue_ahead: IndexMap<ClientOrderId, (PriceRaw, QuantityRaw)>,
    /// Raw trade volume allotted to an order once its queue has cleared;
    /// rebuilt on every trade-driven queue decrement.
    queue_excess: IndexMap<ClientOrderId, QuantityRaw>,
    /// L1 orders resting behind the BBO whose queue snapshot is deferred,
    /// keyed to their raw order price.
    queue_pending: IndexMap<ClientOrderId, PriceRaw>,
    /// Top-of-book baseline (raw best bid price) captured by `seed_tob_baseline`.
    prev_bid_price_raw: PriceRaw,
    /// Top-of-book baseline (raw best bid size) captured by `seed_tob_baseline`.
    prev_bid_size_raw: QuantityRaw,
    /// Top-of-book baseline (raw best ask price) captured by `seed_tob_baseline`.
    prev_ask_price_raw: PriceRaw,
    /// Top-of-book baseline (raw best ask size) captured by `seed_tob_baseline`.
    prev_ask_size_raw: QuantityRaw,
    // NOTE(review): last quote prices are only initialized/reset in the visible part
    // of this file; confirm their usage further down.
    last_quote_bid: Option<Price>,
    last_quote_ask: Option<Price>,
    /// Whether the top-of-book baseline was seeded with at least one side present.
    tob_initialized: bool,
    instrument_close: Option<InstrumentClose>,
    /// Settlement price, set through `set_settlement_price`.
    settlement_price: Option<Price>,
    expiration_processed: bool,
}
128
129impl Debug for OrderMatchingEngine {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        f.debug_struct(stringify!(OrderMatchingEngine))
132            .field("venue", &self.venue)
133            .field("instrument", &self.instrument.id())
134            .finish()
135    }
136}
137
138impl OrderMatchingEngine {
139    /// Creates a new [`OrderMatchingEngine`] instance.
140    #[expect(clippy::too_many_arguments)]
141    pub fn new(
142        instrument: InstrumentAny,
143        raw_id: u32,
144        fill_model: FillModelAny,
145        fee_model: FeeModelAny,
146        book_type: BookType,
147        oms_type: OmsType,
148        account_type: AccountType,
149        clock: Rc<RefCell<dyn Clock>>,
150        cache: Rc<RefCell<Cache>>,
151        config: OrderMatchingEngineConfig,
152    ) -> Self {
153        let book = OrderBook::new(instrument.id(), book_type);
154        let mut core = OrderMatchingCore::new(instrument.id(), instrument.price_increment());
155        core.set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
156        let ids_generator = IdsGenerator::new(
157            instrument.id().venue,
158            oms_type,
159            raw_id,
160            config.use_random_ids,
161            config.use_position_ids,
162            cache.clone(),
163        );
164
165        Self {
166            venue: instrument.id().venue,
167            instrument,
168            raw_id,
169            fill_model,
170            fee_model,
171            event_handler: None,
172            book_type,
173            oms_type,
174            account_type,
175            clock,
176            cache,
177            book,
178            market_status: MarketStatus::Open,
179            config,
180            core,
181            target_bid: None,
182            target_ask: None,
183            target_last: None,
184            last_bar_bid: None,
185            last_bar_ask: None,
186            fill_at_market: true,
187            execution_bar_types: IndexMap::new(),
188            execution_bar_deltas: IndexMap::new(),
189            account_ids: IndexMap::new(),
190            cached_filled_qty: IndexMap::new(),
191            ids_generator,
192            last_trade_size: None,
193            bid_consumption: IndexMap::new(),
194            ask_consumption: IndexMap::new(),
195            trade_consumption: 0,
196            queue_ahead: IndexMap::new(),
197            queue_excess: IndexMap::new(),
198            queue_pending: IndexMap::new(),
199            prev_bid_price_raw: 0,
200            prev_bid_size_raw: 0,
201            prev_ask_price_raw: 0,
202            prev_ask_size_raw: 0,
203            last_quote_bid: None,
204            last_quote_ask: None,
205            tob_initialized: false,
206            instrument_close: None,
207            settlement_price: None,
208            expiration_processed: false,
209        }
210    }
211
212    /// Sets the event handler for dispatching order events.
213    ///
214    /// When set, events are routed through the handler instead of directly
215    /// through the message bus. This allows sandbox execution clients to
216    /// dispatch events through the async runner channel, avoiding `RefCell`
217    /// re-entrancy panics.
218    pub fn set_event_handler(&mut self, handler: Rc<dyn Fn(OrderEventAny)>) {
219        self.event_handler = Some(handler);
220    }
221
222    fn dispatch_order_event(&self, event: OrderEventAny) {
223        if let Some(handler) = &self.event_handler {
224            handler(event);
225        } else {
226            let endpoint = MessagingSwitchboard::exec_engine_process();
227            msgbus::send_order_event(endpoint, event);
228        }
229    }
230
231    /// Resets the matching engine to its initial state.
232    ///
233    /// Clears the order book, execution state, cached data, and resets all
234    /// internal components. This is typically used for backtesting scenarios
235    /// where the engine needs to be reset between test runs.
236    pub fn reset(&mut self) {
237        self.book.reset();
238        self.execution_bar_types.clear();
239        self.execution_bar_deltas.clear();
240        self.account_ids.clear();
241        self.cached_filled_qty.clear();
242        self.core.reset();
243        self.target_bid = None;
244        self.target_ask = None;
245        self.target_last = None;
246        self.last_trade_size = None;
247        self.bid_consumption.clear();
248        self.ask_consumption.clear();
249        self.trade_consumption = 0;
250        self.queue_ahead.clear();
251        self.queue_excess.clear();
252        self.queue_pending.clear();
253        self.prev_bid_price_raw = 0;
254        self.prev_bid_size_raw = 0;
255        self.prev_ask_price_raw = 0;
256        self.prev_ask_size_raw = 0;
257        self.last_quote_bid = None;
258        self.last_quote_ask = None;
259        self.tob_initialized = false;
260        self.instrument_close = None;
261        self.settlement_price = None;
262        self.expiration_processed = false;
263        self.fill_at_market = true;
264        self.ids_generator.reset();
265
266        log::info!("Reset {}", self.instrument.id());
267    }
268
269    fn apply_liquidity_consumption(
270        &mut self,
271        fills: Vec<(Price, Quantity)>,
272        order_side: OrderSide,
273        leaves_qty: Quantity,
274        book_prices: Option<&[Price]>,
275    ) -> Vec<(Price, Quantity)> {
276        if !self.config.liquidity_consumption {
277            return fills;
278        }
279
280        let consumption = match order_side {
281            OrderSide::Buy => &mut self.ask_consumption,
282            OrderSide::Sell => &mut self.bid_consumption,
283            _ => return fills,
284        };
285
286        let mut adjusted_fills = Vec::with_capacity(fills.len());
287        let mut remaining_qty = leaves_qty.raw;
288
289        for (fill_idx, (price, qty)) in fills.into_iter().enumerate() {
290            if remaining_qty == 0 {
291                break;
292            }
293
294            // Use book_price for consumption tracking (original price before MAKER adjustment),
295            // but use price (potentially adjusted) for the output fill.
296            let book_price = book_prices
297                .and_then(|bp| bp.get(fill_idx).copied())
298                .unwrap_or(price);
299
300            let book_price_raw = book_price.raw;
301            let level_size = self
302                .book
303                .get_quantity_at_level(book_price, order_side, qty.precision);
304
305            let (original_size, consumed) = consumption
306                .entry(book_price_raw)
307                .or_insert((level_size.raw, 0));
308
309            // Reset consumption when book size changes (fresh data)
310            if *original_size != level_size.raw {
311                *original_size = level_size.raw;
312                *consumed = 0;
313            }
314
315            let available = original_size.saturating_sub(*consumed);
316            if available == 0 {
317                continue;
318            }
319
320            let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
321            if adjusted_qty_raw == 0 {
322                continue;
323            }
324
325            *consumed += adjusted_qty_raw;
326            remaining_qty -= adjusted_qty_raw;
327
328            let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
329            adjusted_fills.push((price, adjusted_qty));
330        }
331
332        adjusted_fills
333    }
334
335    fn seed_trade_consumption(
336        &mut self,
337        trade_price_raw: PriceRaw,
338        trade_size_raw: QuantityRaw,
339        trade_ts_event: UnixNanos,
340        aggressor_side: AggressorSide,
341    ) {
342        if trade_size_raw == 0 {
343            return;
344        }
345
346        // If the book was updated after the trade's event time, depth deltas
347        // already reflect this trade's consumed volume, skip to avoid double-counting
348        if self.book.ts_last > trade_ts_event {
349            return;
350        }
351
352        let consumption = match aggressor_side {
353            AggressorSide::Buyer => &mut self.ask_consumption,
354            AggressorSide::Seller => &mut self.bid_consumption,
355            AggressorSide::NoAggressor => return,
356        };
357
358        let levels: Vec<_> = match aggressor_side {
359            AggressorSide::Buyer => self
360                .book
361                .asks(None)
362                .take_while(|l| l.price.value.raw <= trade_price_raw)
363                .collect(),
364            AggressorSide::Seller => self
365                .book
366                .bids(None)
367                .take_while(|l| l.price.value.raw >= trade_price_raw)
368                .collect(),
369            _ => unreachable!(),
370        };
371
372        let mut remaining = trade_size_raw;
373        for level in &levels {
374            if remaining == 0 {
375                break;
376            }
377            let level_size = level.size_raw();
378            let entry = consumption
379                .entry(level.price.value.raw)
380                .or_insert((level_size, 0));
381
382            // Reconcile stale level size to prevent reset in apply_liquidity_consumption
383            if entry.0 != level_size {
384                entry.0 = level_size;
385                entry.1 = 0;
386            }
387
388            let available = level_size.saturating_sub(entry.1);
389            let consume = min(remaining, available);
390            entry.1 += consume;
391            remaining -= consume;
392        }
393    }
394
395    /// Sets the fill model for the matching engine.
396    pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
397        self.core
398            .set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
399        self.fill_model = fill_model;
400    }
401
402    pub fn set_settlement_price(&mut self, price: Price) {
403        self.settlement_price = Some(price);
404    }
405
406    fn snapshot_queue_position(&mut self, order: &OrderAny, price: Price) {
407        if !self.config.queue_position {
408            return;
409        }
410        let size_prec = self.instrument.size_precision();
411
412        // Pass opposite side because get_quantity_at_level flips internally
413        // (BUY reads asks, SELL reads bids). We want the resting side depth.
414        let qty_ahead = self.book.get_quantity_at_level(
415            price,
416            OrderCore::opposite_side(order.order_side()),
417            size_prec,
418        );
419
420        let client_order_id = order.client_order_id();
421
422        // Clear stale entries from both maps (e.g. order modified to new price)
423        self.queue_pending.shift_remove(&client_order_id);
424        self.queue_ahead.shift_remove(&client_order_id);
425
426        // For L1 books, levels behind the BBO have no visible depth. Track
427        // these orders separately so fills are blocked until the BBO reaches
428        // this price. Only truly behind-BBO prices are pending (BUY below
429        // best bid / SELL above best ask); inside-spread and no-book keep 0.
430        if self.book_type == BookType::L1_MBP && qty_ahead.raw == 0 {
431            let behind_bbo = match order.order_side() {
432                OrderSide::Buy => self.book.best_bid_price().is_some_and(|bid| price < bid),
433                OrderSide::Sell => self.book.best_ask_price().is_some_and(|ask| price > ask),
434                _ => false,
435            };
436
437            if behind_bbo {
438                self.queue_pending.insert(client_order_id, price.raw);
439                return;
440            }
441        }
442
443        self.queue_ahead
444            .insert(client_order_id, (price.raw, qty_ahead.raw));
445    }
446
447    fn decrement_queue_on_trade(
448        &mut self,
449        price_raw: PriceRaw,
450        trade_size_raw: QuantityRaw,
451        aggressor_side: AggressorSide,
452    ) {
453        if !self.config.queue_position {
454            return;
455        }
456
457        self.queue_excess.clear();
458
459        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
460        let mut entries: Vec<(ClientOrderId, QuantityRaw, QuantityRaw)> = Vec::new();
461        let mut stale: Vec<ClientOrderId> = Vec::new();
462
463        for client_order_id in keys {
464            let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
465            {
466                Some(v) => v,
467                None => continue,
468            };
469
470            let cache = self.cache.borrow();
471            let order_info = cache.order(&client_order_id).and_then(|order| {
472                if order.is_closed() {
473                    None
474                } else {
475                    Some((order.order_side(), order.leaves_qty().raw))
476                }
477            });
478            drop(cache);
479
480            let Some((order_side, leaves_raw)) = order_info else {
481                stale.push(client_order_id);
482                continue;
483            };
484
485            if order_price_raw != price_raw || ahead_raw == 0 {
486                continue;
487            }
488
489            let should_decrement = matches!(aggressor_side, AggressorSide::NoAggressor)
490                || (aggressor_side == AggressorSide::Buyer && order_side == OrderSide::Sell)
491                || (aggressor_side == AggressorSide::Seller && order_side == OrderSide::Buy);
492
493            if should_decrement {
494                entries.push((client_order_id, ahead_raw, leaves_raw));
495            }
496        }
497
498        for id in stale {
499            self.queue_ahead.shift_remove(&id);
500        }
501
502        // Sort by queue position (earliest first) for shared budget allocation
503        entries.sort_by_key(|&(_, ahead, _)| ahead);
504
505        let mut remaining = trade_size_raw;
506        let mut prev_position: QuantityRaw = 0;
507
508        for (client_order_id, ahead_raw, leaves_raw) in &entries {
509            if remaining == 0 {
510                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
511                self.queue_ahead
512                    .insert(*client_order_id, (price_raw, new_ahead));
513                if new_ahead == 0 {
514                    // Queue cleared but no trade volume left for this order
515                    self.queue_excess.insert(*client_order_id, 0);
516                }
517                continue;
518            }
519
520            // Consume the gap between previous position and this order's depth
521            let gap = ahead_raw.saturating_sub(prev_position);
522            let queue_consumed = remaining.min(gap);
523            remaining -= queue_consumed;
524
525            if remaining == 0 && queue_consumed < gap {
526                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
527                self.queue_ahead
528                    .insert(*client_order_id, (price_raw, new_ahead));
529                continue;
530            }
531
532            self.queue_ahead.insert(*client_order_id, (price_raw, 0));
533            let excess = remaining.min(*leaves_raw);
534            self.queue_excess.insert(*client_order_id, excess);
535            remaining -= excess;
536            prev_position = ahead_raw + excess;
537        }
538    }
539
540    fn determine_trade_fill_qty(&self, order: &OrderAny) -> Option<QuantityRaw> {
541        if !self.config.queue_position {
542            return Some(order.leaves_qty().raw);
543        }
544
545        let client_order_id = order.client_order_id();
546
547        // Block fills for L1 orders pending a deferred snapshot
548        if self.queue_pending.contains_key(&client_order_id) {
549            return None;
550        }
551
552        if let Some(&(tracked_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
553            && let Some(order_price) = order.price()
554            && order_price.raw == tracked_price_raw
555            && ahead_raw > 0
556        {
557            // Allow fill when a current trade has crossed through the order's price
558            let crossed = self.last_trade_size.is_some()
559                && self
560                    .core
561                    .last
562                    .is_some_and(|trade_price| match order.order_side() {
563                        OrderSide::Buy => trade_price.raw < order_price.raw,
564                        OrderSide::Sell => trade_price.raw > order_price.raw,
565                        _ => false,
566                    });
567
568            if !crossed {
569                return None;
570            }
571        }
572
573        let leaves_raw = order.leaves_qty().raw;
574        if leaves_raw == 0 {
575            return None;
576        }
577
578        let mut available_raw = leaves_raw;
579
580        // Cap by remaining trade volume and queue excess (only during trade processing)
581        if let Some(trade_size) = self.last_trade_size {
582            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
583            available_raw = available_raw.min(remaining);
584
585            if let Some(&excess_raw) = self.queue_excess.get(&client_order_id) {
586                if excess_raw == 0 {
587                    return None;
588                }
589                available_raw = available_raw.min(excess_raw);
590            }
591        }
592
593        if available_raw == 0 {
594            return None;
595        }
596
597        Some(available_raw)
598    }
599
600    fn clear_all_queue_positions(&mut self) {
601        for (_, (_, ahead_raw)) in &mut self.queue_ahead {
602            *ahead_raw = 0;
603        }
604    }
605
606    fn clear_queue_on_delete(&mut self, deleted_price_raw: PriceRaw, deleted_side: OrderSide) {
607        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
608        for client_order_id in keys {
609            if let Some(&(order_price_raw, _)) = self.queue_ahead.get(&client_order_id)
610                && order_price_raw == deleted_price_raw
611            {
612                let cache = self.cache.borrow();
613                if let Some(order) = cache.order(&client_order_id)
614                    && order.order_side() == deleted_side
615                {
616                    drop(cache);
617                    self.queue_ahead
618                        .insert(client_order_id, (order_price_raw, 0));
619                }
620            }
621        }
622    }
623
624    fn cap_queue_ahead(
625        &mut self,
626        price_raw: PriceRaw,
627        size_raw: QuantityRaw,
628        order_side: OrderSide,
629    ) {
630        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
631        let mut stale: Vec<ClientOrderId> = Vec::new();
632
633        for client_order_id in keys {
634            let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
635            {
636                Some(v) => v,
637                None => continue,
638            };
639
640            if order_price_raw != price_raw || ahead_raw <= size_raw {
641                continue;
642            }
643
644            let cache = self.cache.borrow();
645            let order_info = cache.order(&client_order_id).and_then(|order| {
646                if order.is_closed() {
647                    None
648                } else {
649                    Some(order.order_side())
650                }
651            });
652            drop(cache);
653
654            let Some(side) = order_info else {
655                stale.push(client_order_id);
656                continue;
657            };
658
659            if side != order_side {
660                continue;
661            }
662
663            self.queue_ahead
664                .insert(client_order_id, (order_price_raw, size_raw));
665        }
666
667        for id in stale {
668            self.queue_ahead.shift_remove(&id);
669        }
670    }
671
672    fn seed_tob_baseline(&mut self) {
673        let bid = self.book.best_bid_price();
674        let ask = self.book.best_ask_price();
675        self.prev_bid_price_raw = bid.map_or(0, |p| p.raw);
676        self.prev_bid_size_raw = self.book.best_bid_size().map_or(0, |q| q.raw);
677        self.prev_ask_price_raw = ask.map_or(0, |p| p.raw);
678        self.prev_ask_size_raw = self.book.best_ask_size().map_or(0, |q| q.raw);
679        self.tob_initialized = bid.is_some() || ask.is_some();
680    }
681
682    fn decrement_l1_queue_on_quote(
683        &mut self,
684        bid_price_raw: PriceRaw,
685        bid_size_raw: QuantityRaw,
686        ask_price_raw: PriceRaw,
687        ask_size_raw: QuantityRaw,
688    ) {
689        if !self.config.queue_position {
690            return;
691        }
692
693        // Price-move detection requires a valid prior TOB snapshot
694        if self.tob_initialized {
695            // BID side (BUY limit orders): handle price drops (crossed/snapshot)
696            if bid_price_raw < self.prev_bid_price_raw {
697                self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
698            }
699
700            // ASK side (SELL limit orders): handle price rises (crossed/snapshot)
701            if ask_price_raw > self.prev_ask_price_raw {
702                self.adjust_l1_queue_on_price_move(ask_price_raw, ask_size_raw, OrderSide::Sell);
703            }
704        }
705
706        // Resolve pending snapshots when BBO reaches a tracked order's price
707        self.resolve_pending_l1_snapshots(bid_price_raw, bid_size_raw, ask_price_raw, ask_size_raw);
708    }
709
710    fn adjust_l1_queue_on_price_move(
711        &mut self,
712        new_price_raw: PriceRaw,
713        new_size_raw: QuantityRaw,
714        order_side: OrderSide,
715    ) {
716        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
717        let mut stale: Vec<ClientOrderId> = Vec::new();
718
719        for client_order_id in keys {
720            let Some(&(order_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id) else {
721                continue;
722            };
723
724            let cache = self.cache.borrow();
725            let order_info = cache.order(&client_order_id).and_then(|order| {
726                if order.is_closed() {
727                    None
728                } else {
729                    Some(order.order_side())
730                }
731            });
732            drop(cache);
733
734            let Some(side) = order_info else {
735                stale.push(client_order_id);
736                continue;
737            };
738
739            if side != order_side {
740                continue;
741            }
742
743            // BUY orders crossed when bid drops below order price
744            // SELL orders crossed when ask rises above order price
745            let crossed = match order_side {
746                OrderSide::Buy => order_price_raw > new_price_raw,
747                _ => order_price_raw < new_price_raw,
748            };
749
750            if crossed {
751                self.queue_ahead
752                    .insert(client_order_id, (order_price_raw, 0));
753            } else if order_price_raw == new_price_raw && ahead_raw > new_size_raw {
754                self.queue_ahead
755                    .insert(client_order_id, (order_price_raw, new_size_raw));
756            }
757        }
758
759        for id in stale {
760            self.queue_ahead.shift_remove(&id);
761        }
762
763        // Also resolve pending L1 orders affected by this price move
764        let pending_keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
765        let mut pending_stale: Vec<ClientOrderId> = Vec::new();
766
767        for client_order_id in pending_keys {
768            let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
769                continue;
770            };
771
772            let cache = self.cache.borrow();
773            let order_info = cache.order(&client_order_id).and_then(|order| {
774                if order.is_closed() {
775                    None
776                } else {
777                    Some(order.order_side())
778                }
779            });
780            drop(cache);
781
782            let Some(side) = order_info else {
783                pending_stale.push(client_order_id);
784                continue;
785            };
786
787            if side != order_side {
788                continue;
789            }
790
791            let crossed = match order_side {
792                OrderSide::Buy => order_price_raw > new_price_raw,
793                _ => order_price_raw < new_price_raw,
794            };
795
796            if crossed {
797                self.queue_pending.shift_remove(&client_order_id);
798                self.queue_ahead
799                    .insert(client_order_id, (order_price_raw, 0));
800            } else if order_price_raw == new_price_raw {
801                self.queue_pending.shift_remove(&client_order_id);
802                self.queue_ahead
803                    .insert(client_order_id, (order_price_raw, new_size_raw));
804            }
805        }
806
807        for id in pending_stale {
808            self.queue_pending.shift_remove(&id);
809        }
810    }
811
812    fn resolve_pending_l1_snapshots(
813        &mut self,
814        bid_price_raw: PriceRaw,
815        bid_size_raw: QuantityRaw,
816        ask_price_raw: PriceRaw,
817        ask_size_raw: QuantityRaw,
818    ) {
819        let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
820        let mut stale: Vec<ClientOrderId> = Vec::new();
821
822        for client_order_id in keys {
823            let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
824                continue;
825            };
826
827            let cache = self.cache.borrow();
828            let order_info = cache.order(&client_order_id).and_then(|order| {
829                if order.is_closed() {
830                    None
831                } else {
832                    Some(order.order_side())
833                }
834            });
835            drop(cache);
836
837            let Some(side) = order_info else {
838                stale.push(client_order_id);
839                continue;
840            };
841
842            // Initialize snapshot when BBO reaches the order's price level
843            let matched_size = match side {
844                OrderSide::Buy if order_price_raw == bid_price_raw => Some(bid_size_raw),
845                OrderSide::Sell if order_price_raw == ask_price_raw => Some(ask_size_raw),
846                _ => None,
847            };
848
849            if let Some(size) = matched_size {
850                self.queue_pending.shift_remove(&client_order_id);
851                self.queue_ahead
852                    .insert(client_order_id, (order_price_raw, size));
853            }
854        }
855
856        for id in stale {
857            self.queue_pending.shift_remove(&id);
858        }
859    }
860
861    fn resolve_pending_on_trade(&mut self, trade_price_raw: PriceRaw) {
862        let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
863        let mut stale: Vec<ClientOrderId> = Vec::new();
864
865        for client_order_id in keys {
866            let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
867                continue;
868            };
869
870            let cache = self.cache.borrow();
871            let order_side = cache.order(&client_order_id).and_then(|order| {
872                if order.is_closed() {
873                    None
874                } else {
875                    Some(order.order_side())
876                }
877            });
878            drop(cache);
879
880            let Some(side) = order_side else {
881                stale.push(client_order_id);
882                continue;
883            };
884
885            // Trade through a pending level proves the queue was crossed
886            let crossed = match side {
887                OrderSide::Buy => trade_price_raw < order_price_raw,
888                OrderSide::Sell => trade_price_raw > order_price_raw,
889                _ => false,
890            };
891
892            if crossed {
893                self.queue_pending.shift_remove(&client_order_id);
894                self.queue_ahead
895                    .insert(client_order_id, (order_price_raw, 0));
896            }
897        }
898
899        for id in stale {
900            self.queue_pending.shift_remove(&id);
901        }
902    }
903
904    #[must_use]
905    /// Returns the best bid price from the order book.
906    pub fn best_bid_price(&self) -> Option<Price> {
907        self.book.best_bid_price()
908    }
909
910    #[must_use]
911    /// Returns the best ask price from the order book.
912    pub fn best_ask_price(&self) -> Option<Price> {
913        self.book.best_ask_price()
914    }
915
916    #[must_use]
917    /// Returns a reference to the internal order book.
918    pub const fn get_book(&self) -> &OrderBook {
919        &self.book
920    }
921
922    #[must_use]
923    /// Returns all open bid orders managed by the matching core.
924    pub const fn get_open_bid_orders(&self) -> &[OrderMatchInfo] {
925        self.core.get_orders_bid()
926    }
927
928    #[must_use]
929    /// Returns all open ask orders managed by the matching core.
930    pub const fn get_open_ask_orders(&self) -> &[OrderMatchInfo] {
931        self.core.get_orders_ask()
932    }
933
934    #[must_use]
935    /// Returns all open orders from both bid and ask sides.
936    pub fn get_open_orders(&self) -> Vec<OrderMatchInfo> {
937        let mut orders = Vec::new();
938        orders.extend_from_slice(self.core.get_orders_bid());
939        orders.extend_from_slice(self.core.get_orders_ask());
940        orders
941    }
942
943    #[must_use]
944    /// Returns true if an order with the given client order ID exists in the matching engine.
945    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
946        self.core.order_exists(client_order_id)
947    }
948
949    #[must_use]
950    pub const fn get_core(&self) -> &OrderMatchingCore {
951        &self.core
952    }
953
954    pub fn set_fill_at_market(&mut self, value: bool) {
955        self.fill_at_market = value;
956    }
957
958    fn check_price_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
959        let expected = self.instrument.price_precision();
960        if actual != expected {
961            anyhow::bail!(
962                "Invalid {field} precision {actual}, expected {expected} for {}",
963                self.instrument.id()
964            );
965        }
966        Ok(())
967    }
968
969    fn check_size_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
970        let expected = self.instrument.size_precision();
971        if actual != expected {
972            anyhow::bail!(
973                "Invalid {field} precision {actual}, expected {expected} for {}",
974                self.instrument.id()
975            );
976        }
977        Ok(())
978    }
979
980    /// Process the venues market for the given order book delta.
981    ///
982    /// # Errors
983    ///
984    /// - If delta order price precision does not match the instrument (for Add/Update actions).
985    /// - If delta order size precision does not match the instrument (for Add/Update actions).
986    /// - If applying the delta to the book fails.
987    pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
988        log::debug!("Processing {delta}");
989
990        // Validate precision for Add and Update actions (Delete/Clear may have NULL_ORDER)
991        if matches!(delta.action, BookAction::Add | BookAction::Update) {
992            self.check_price_precision(delta.order.price.precision, "delta order price")?;
993            self.check_size_precision(delta.order.size.precision, "delta order size")?;
994        }
995
996        // L1 books are driven by top-of-book data only, ignore deltas
997        if self.book_type == BookType::L1_MBP {
998            self.iterate(delta.ts_init, AggressorSide::NoAggressor);
999            return Ok(());
1000        }
1001
1002        self.book.apply_delta(delta)?;
1003
1004        let delta_snapshot_or_clear = (delta.flags & 32) != 0 || delta.action == BookAction::Clear;
1005
1006        if self.config.queue_position {
1007            if delta_snapshot_or_clear {
1008                self.clear_all_queue_positions();
1009            } else if delta.action == BookAction::Delete {
1010                self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1011            } else if delta.action == BookAction::Update {
1012                self.cap_queue_ahead(
1013                    delta.order.price.raw,
1014                    delta.order.size.raw,
1015                    delta.order.side,
1016                );
1017            }
1018        }
1019
1020        if self.config.queue_position && delta_snapshot_or_clear {
1021            self.seed_tob_baseline();
1022        }
1023
1024        self.iterate(delta.ts_init, AggressorSide::NoAggressor);
1025        Ok(())
1026    }
1027
1028    /// Process the venues market for the given order book deltas.
1029    ///
1030    /// # Errors
1031    ///
1032    /// - If any delta order price precision does not match the instrument (for Add/Update actions).
1033    /// - If any delta order size precision does not match the instrument (for Add/Update actions).
1034    /// - If applying the deltas to the book fails.
1035    pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1036        log::debug!("Processing {deltas}");
1037
1038        // Validate precision for Add and Update actions (Delete/Clear may have NULL_ORDER)
1039        for delta in &deltas.deltas {
1040            if matches!(delta.action, BookAction::Add | BookAction::Update) {
1041                self.check_price_precision(delta.order.price.precision, "delta order price")?;
1042                self.check_size_precision(delta.order.size.precision, "delta order size")?;
1043            }
1044        }
1045
1046        // L1 books are driven by top-of-book data only, ignore deltas
1047        if self.book_type == BookType::L1_MBP {
1048            self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1049            return Ok(());
1050        }
1051
1052        self.book.apply_deltas(deltas)?;
1053
1054        let mut has_snapshot_or_clear = false;
1055
1056        if self.config.queue_position {
1057            for delta in &deltas.deltas {
1058                if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
1059                    self.clear_all_queue_positions();
1060                    has_snapshot_or_clear = true;
1061                    break;
1062                } else if delta.action == BookAction::Delete {
1063                    self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1064                } else if delta.action == BookAction::Update {
1065                    self.cap_queue_ahead(
1066                        delta.order.price.raw,
1067                        delta.order.size.raw,
1068                        delta.order.side,
1069                    );
1070                }
1071            }
1072        }
1073
1074        if self.config.queue_position && has_snapshot_or_clear {
1075            self.seed_tob_baseline();
1076        }
1077
1078        self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1079        Ok(())
1080    }
1081
1082    /// Process the venues market for the given order book depth10.
1083    ///
1084    /// # Errors
1085    ///
1086    /// - If any bid/ask price precision does not match the instrument.
1087    /// - If any bid/ask size precision does not match the instrument.
1088    /// - If applying the depth to the book fails.
1089    /// - If updating the L1 order book with the top-of-book quote fails.
1090    pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) -> anyhow::Result<()> {
1091        log::debug!("Processing OrderBookDepth10 for {}", depth.instrument_id);
1092
1093        // Validate precision for non-padding entries
1094        for order in &depth.bids {
1095            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1096                continue;
1097            }
1098            self.check_price_precision(order.price.precision, "bid price")?;
1099            self.check_size_precision(order.size.precision, "bid size")?;
1100        }
1101
1102        for order in &depth.asks {
1103            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1104                continue;
1105            }
1106            self.check_price_precision(order.price.precision, "ask price")?;
1107            self.check_size_precision(order.size.precision, "ask size")?;
1108        }
1109
1110        // For L1 books, only apply top-of-book to avoid mispricing
1111        // against worst-level entries when full depth is applied
1112        if self.book_type == BookType::L1_MBP {
1113            let quote = QuoteTick::new(
1114                depth.instrument_id,
1115                depth.bids[0].price,
1116                depth.asks[0].price,
1117                depth.bids[0].size,
1118                depth.asks[0].size,
1119                depth.ts_event,
1120                depth.ts_init,
1121            );
1122            self.book.update_quote_tick(&quote)?;
1123            self.last_quote_bid = Some(depth.bids[0].price);
1124            self.last_quote_ask = Some(depth.asks[0].price);
1125        } else {
1126            self.book.apply_depth(depth)?;
1127        }
1128
1129        // Depth10 always replaces the full book via apply_depth regardless of flags
1130        if self.config.queue_position {
1131            self.clear_all_queue_positions();
1132            let bid_price_raw = depth.bids[0].price.raw;
1133            let bid_size_raw = depth.bids[0].size.raw;
1134            let ask_price_raw = depth.asks[0].price.raw;
1135            let ask_size_raw = depth.asks[0].size.raw;
1136
1137            // Handle crossed/matched pending orders (same as quote path)
1138            if self.tob_initialized {
1139                if bid_price_raw < self.prev_bid_price_raw {
1140                    self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
1141                }
1142
1143                if ask_price_raw > self.prev_ask_price_raw {
1144                    self.adjust_l1_queue_on_price_move(
1145                        ask_price_raw,
1146                        ask_size_raw,
1147                        OrderSide::Sell,
1148                    );
1149                }
1150            }
1151
1152            self.resolve_pending_l1_snapshots(
1153                bid_price_raw,
1154                bid_size_raw,
1155                ask_price_raw,
1156                ask_size_raw,
1157            );
1158
1159            self.prev_bid_price_raw = bid_price_raw;
1160            self.prev_bid_size_raw = bid_size_raw;
1161            self.prev_ask_price_raw = ask_price_raw;
1162            self.prev_ask_size_raw = ask_size_raw;
1163            self.tob_initialized = true;
1164        }
1165
1166        self.iterate(depth.ts_init, AggressorSide::NoAggressor);
1167        Ok(())
1168    }
1169
1170    /// Processes a quote tick to update the market state.
1171    ///
1172    /// # Panics
1173    ///
1174    /// - If updating the order book with the quote tick fails.
1175    /// - If bid/ask price precision does not match the instrument.
1176    /// - If bid/ask size precision does not match the instrument.
1177    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
1178        log::debug!("Processing {quote}");
1179
1180        self.check_price_precision(quote.bid_price.precision, "bid_price")
1181            .unwrap();
1182        self.check_price_precision(quote.ask_price.precision, "ask_price")
1183            .unwrap();
1184        self.check_size_precision(quote.bid_size.precision, "bid_size")
1185            .unwrap();
1186        self.check_size_precision(quote.ask_size.precision, "ask_size")
1187            .unwrap();
1188
1189        if self.book_type == BookType::L1_MBP {
1190            // Stale update: skip book mutation and cache updates
1191            if quote.ts_event < self.book.ts_last {
1192                log::warn!(
1193                    "Skipping stale quote: ts_event {} < book.ts_last {} for {}",
1194                    quote.ts_event,
1195                    self.book.ts_last,
1196                    self.book.instrument_id,
1197                );
1198                self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1199                return;
1200            }
1201
1202            if self.config.queue_position {
1203                self.decrement_l1_queue_on_quote(
1204                    quote.bid_price.raw,
1205                    quote.bid_size.raw,
1206                    quote.ask_price.raw,
1207                    quote.ask_size.raw,
1208                );
1209                self.prev_bid_price_raw = quote.bid_price.raw;
1210                self.prev_bid_size_raw = quote.bid_size.raw;
1211                self.prev_ask_price_raw = quote.ask_price.raw;
1212                self.prev_ask_size_raw = quote.ask_size.raw;
1213                self.tob_initialized = true;
1214            }
1215            self.book.update_quote_tick(quote).unwrap();
1216            self.last_quote_bid = Some(quote.bid_price);
1217            self.last_quote_ask = Some(quote.ask_price);
1218        }
1219
1220        self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1221    }
1222
1223    /// Processes a bar and simulates market dynamics by creating synthetic ticks.
1224    ///
1225    /// For L1 books with bar execution enabled, generates synthetic trade or quote
1226    /// ticks from bar OHLC data to drive order matching.
1227    ///
1228    /// # Panics
1229    ///
1230    /// - If the bar type configuration is missing a time delta.
1231    /// - If bar OHLC price precision does not match the instrument.
1232    /// - If bar volume precision does not match the instrument.
1233    pub fn process_bar(&mut self, bar: &Bar) {
1234        log::debug!("Processing {bar}");
1235
1236        // Check if configured for bar execution can only process an L1 book with bars
1237        if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
1238            return;
1239        }
1240
1241        let bar_type = bar.bar_type;
1242        // Do not process internally aggregated bars
1243        if bar_type.aggregation_source() == AggregationSource::Internal {
1244            return;
1245        }
1246
1247        self.check_price_precision(bar.open.precision, "bar open")
1248            .unwrap();
1249        self.check_price_precision(bar.high.precision, "bar high")
1250            .unwrap();
1251        self.check_price_precision(bar.low.precision, "bar low")
1252            .unwrap();
1253        self.check_price_precision(bar.close.precision, "bar close")
1254            .unwrap();
1255        self.check_size_precision(bar.volume.precision, "bar volume")
1256            .unwrap();
1257
1258        let execution_bar_type =
1259            if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
1260                execution_bar_type.to_owned()
1261            } else {
1262                self.execution_bar_types
1263                    .insert(bar.instrument_id(), bar_type);
1264                self.execution_bar_deltas
1265                    .insert(bar_type, bar_type.spec().timedelta());
1266                bar_type
1267            };
1268
1269        if execution_bar_type != bar_type {
1270            let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
1271            if bar_type_timedelta.is_none() {
1272                bar_type_timedelta = Some(bar_type.spec().timedelta());
1273                self.execution_bar_deltas
1274                    .insert(bar_type, bar_type_timedelta.unwrap());
1275            }
1276
1277            if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
1278                >= &bar_type_timedelta.unwrap()
1279            {
1280                self.execution_bar_types
1281                    .insert(bar_type.instrument_id(), bar_type);
1282            } else {
1283                return;
1284            }
1285        }
1286
1287        match bar_type.spec().price_type {
1288            PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
1289            PriceType::Bid => {
1290                self.last_bar_bid = Some(bar.to_owned());
1291                self.process_quote_ticks_from_bar(bar);
1292            }
1293            PriceType::Ask => {
1294                self.last_bar_ask = Some(bar.to_owned());
1295                self.process_quote_ticks_from_bar(bar);
1296            }
1297            PriceType::Mark => panic!("Not implemented"),
1298        }
1299    }
1300
1301    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
1302        // Split the bar into 4 trades, adding remainder to close trade
1303        let quarter_raw = bar.volume.raw / 4;
1304        let remainder_raw = bar.volume.raw % 4;
1305        let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
1306        let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);
1307
1308        let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
1309        {
1310            AggressorSide::Buyer
1311        } else {
1312            AggressorSide::Seller
1313        };
1314
1315        // Create reusable trade tick
1316        let mut trade_tick = TradeTick::new(
1317            bar.instrument_id(),
1318            bar.open,
1319            size,
1320            aggressor_side,
1321            self.ids_generator.generate_trade_id(bar.ts_init),
1322            bar.ts_init,
1323            bar.ts_init,
1324        );
1325
1326        // Open: fill at market price (gap from previous bar)
1327        if !self.core.is_last_initialized {
1328            self.fill_at_market = true;
1329            self.book.update_trade_tick(&trade_tick).unwrap();
1330            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1331            self.core.set_last_raw(trade_tick.price);
1332        } else if self.core.last.is_some_and(|last| bar.open != last) {
1333            // Gap between previous close and this bar's open
1334            self.fill_at_market = true;
1335            self.book.update_trade_tick(&trade_tick).unwrap();
1336            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1337            self.core.set_last_raw(trade_tick.price);
1338        }
1339
1340        // Determine high/low processing order.
1341        // Default: O→H→L→C. With adaptive ordering, swap if low is closer to open.
1342        let high_first = !self.config.bar_adaptive_high_low_ordering
1343            || (bar.high.raw - bar.open.raw).abs() < (bar.low.raw - bar.open.raw).abs();
1344
1345        if high_first {
1346            self.process_bar_high(&mut trade_tick, bar);
1347            self.process_bar_low(&mut trade_tick, bar);
1348        } else {
1349            self.process_bar_low(&mut trade_tick, bar);
1350            self.process_bar_high(&mut trade_tick, bar);
1351        }
1352
1353        // Close: fill at trigger price (market moving through prices)
1354        if self.core.last.is_some_and(|last| bar.close != last) {
1355            self.fill_at_market = false;
1356            trade_tick.price = bar.close;
1357            trade_tick.size = close_size;
1358
1359            if bar.close > self.core.last.unwrap() {
1360                trade_tick.aggressor_side = AggressorSide::Buyer;
1361            } else {
1362                trade_tick.aggressor_side = AggressorSide::Seller;
1363            }
1364            trade_tick.trade_id = self.ids_generator.generate_trade_id(trade_tick.ts_init);
1365
1366            self.book.update_trade_tick(&trade_tick).unwrap();
1367            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1368
1369            self.core.set_last_raw(trade_tick.price);
1370        }
1371
1372        self.fill_at_market = true;
1373    }
1374
1375    fn process_bar_high(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
1376        if self.core.last.is_some_and(|last| bar.high > last) {
1377            self.fill_at_market = false;
1378            trade_tick.price = bar.high;
1379            trade_tick.aggressor_side = AggressorSide::Buyer;
1380            trade_tick.trade_id = self.ids_generator.generate_trade_id(trade_tick.ts_init);
1381
1382            self.book.update_trade_tick(trade_tick).unwrap();
1383            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1384
1385            self.core.set_last_raw(trade_tick.price);
1386        }
1387    }
1388
1389    fn process_bar_low(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
1390        if self.core.last.is_some_and(|last| bar.low < last) {
1391            self.fill_at_market = false;
1392            trade_tick.price = bar.low;
1393            trade_tick.aggressor_side = AggressorSide::Seller;
1394            trade_tick.trade_id = self.ids_generator.generate_trade_id(trade_tick.ts_init);
1395
1396            self.book.update_trade_tick(trade_tick).unwrap();
1397            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1398
1399            self.core.set_last_raw(trade_tick.price);
1400        }
1401    }
1402
1403    fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
1404        // Wait for next bar
1405        if self.last_bar_bid.is_none()
1406            || self.last_bar_ask.is_none()
1407            || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
1408        {
1409            return;
1410        }
1411        let bid_bar = self.last_bar_bid.unwrap();
1412        let ask_bar = self.last_bar_ask.unwrap();
1413
1414        // Split bar volume into 4, adding remainder to close quote
1415        let bid_quarter = bid_bar.volume.raw / 4;
1416        let bid_remainder = bid_bar.volume.raw % 4;
1417        let ask_quarter = ask_bar.volume.raw / 4;
1418        let ask_remainder = ask_bar.volume.raw % 4;
1419
1420        let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
1421        let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
1422        let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
1423        let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
1424
1425        // Create reusable quote tick
1426        let mut quote_tick = QuoteTick::new(
1427            self.book.instrument_id,
1428            bid_bar.open,
1429            ask_bar.open,
1430            bid_size,
1431            ask_size,
1432            bid_bar.ts_init,
1433            bid_bar.ts_init,
1434        );
1435
1436        // Open: fill at market price (gap from previous bar)
1437        self.fill_at_market = true;
1438        self.book.update_quote_tick(&quote_tick).unwrap();
1439        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1440
1441        // High: fill at trigger price (market moving through prices)
1442        self.fill_at_market = false;
1443        quote_tick.bid_price = bid_bar.high;
1444        quote_tick.ask_price = ask_bar.high;
1445        self.book.update_quote_tick(&quote_tick).unwrap();
1446        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1447
1448        // Low: fill at trigger price (market moving through prices)
1449        self.fill_at_market = false;
1450        quote_tick.bid_price = bid_bar.low;
1451        quote_tick.ask_price = ask_bar.low;
1452        self.book.update_quote_tick(&quote_tick).unwrap();
1453        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1454
1455        // Close: fill at trigger price (market moving through prices)
1456        self.fill_at_market = false;
1457        quote_tick.bid_price = bid_bar.close;
1458        quote_tick.ask_price = ask_bar.close;
1459        quote_tick.bid_size = bid_close_size;
1460        quote_tick.ask_size = ask_close_size;
1461        self.book.update_quote_tick(&quote_tick).unwrap();
1462        self.last_quote_bid = Some(bid_bar.close);
1463        self.last_quote_ask = Some(ask_bar.close);
1464        self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1465
1466        self.last_bar_bid = None;
1467        self.last_bar_ask = None;
1468        self.fill_at_market = true;
1469    }
1470
1471    /// Processes a trade tick to update the market state.
1472    ///
1473    /// For L1 books, always updates the order book with the trade tick to maintain
1474    /// market state. When `trade_execution` is disabled, order matching and maintenance
1475    /// operations (GTD order expiry, trailing stop activation, instrument expiration)
1476    /// are skipped. These maintenance operations will run on the next quote tick or bar.
1477    ///
1478    /// # Panics
1479    ///
1480    /// - If updating the order book with the trade tick fails.
1481    /// - If trade price precision does not match the instrument.
1482    /// - If trade size precision does not match the instrument.
1483    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
1484        log::debug!("Processing {trade}");
1485
1486        self.check_price_precision(trade.price.precision, "trade price")
1487            .unwrap();
1488        self.check_size_precision(trade.size.precision, "trade size")
1489            .unwrap();
1490
1491        let price_raw = trade.price.raw;
1492
1493        if self.book_type == BookType::L1_MBP {
1494            // Stale update: skip book mutation and trade execution
1495            if trade.ts_event < self.book.ts_last {
1496                log::warn!(
1497                    "Skipping stale trade: ts_event {} < book.ts_last {} for {}",
1498                    trade.ts_event,
1499                    self.book.ts_last,
1500                    self.book.instrument_id,
1501                );
1502                self.iterate(trade.ts_init, AggressorSide::NoAggressor);
1503                return;
1504            }
1505
1506            self.book.update_trade_tick(trade).unwrap();
1507        }
1508
1509        self.core.set_last_raw(trade.price);
1510
1511        if !self.config.trade_execution {
1512            // Sync core to L1 book, skip order matching
1513            if self.book_type == BookType::L1_MBP {
1514                if let Some(bid) = self.book.best_bid_price() {
1515                    self.core.set_bid_raw(bid);
1516                }
1517
1518                if let Some(ask) = self.book.best_ask_price() {
1519                    self.core.set_ask_raw(ask);
1520                }
1521            }
1522            return;
1523        }
1524
1525        let aggressor_side = trade.aggressor_side;
1526
1527        match aggressor_side {
1528            AggressorSide::Buyer => {
1529                // Buyer lifted the ask: ask was at trade.price, post-trade
1530                // ask is at least this level (only widen)
1531                if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
1532                    self.core.set_ask_raw(trade.price);
1533                }
1534
1535                // Initialize bid from first trade if needed
1536                if self.core.bid.is_none() {
1537                    self.core.set_bid_raw(trade.price);
1538                }
1539            }
1540            AggressorSide::Seller => {
1541                // Seller hit the bid: bid was at trade.price, post-trade
1542                // bid is at most this level (only narrow)
1543                if self.core.bid.is_none()
1544                    || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1545                {
1546                    self.core.set_bid_raw(trade.price);
1547                }
1548
1549                // Initialize ask from first trade if needed
1550                if self.core.ask.is_none() {
1551                    self.core.set_ask_raw(trade.price);
1552                }
1553            }
1554            AggressorSide::NoAggressor => {
1555                if self.core.bid.is_none()
1556                    || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1557                {
1558                    self.core.set_bid_raw(trade.price);
1559                }
1560
1561                if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
1562                    self.core.set_ask_raw(trade.price);
1563                }
1564            }
1565        }
1566
1567        let original_bid = self.core.bid;
1568        let original_ask = self.core.ask;
1569
1570        match aggressor_side {
1571            AggressorSide::Seller => {
1572                if original_ask.is_some_and(|ask| price_raw < ask.raw) {
1573                    self.core.set_ask_raw(trade.price);
1574                }
1575            }
1576            AggressorSide::Buyer => {
1577                if original_bid.is_some_and(|bid| price_raw > bid.raw) {
1578                    self.core.set_bid_raw(trade.price);
1579                }
1580            }
1581            AggressorSide::NoAggressor => {
1582                // Force both sides to trade price (parity with Cython)
1583                self.core.set_bid_raw(trade.price);
1584                self.core.set_ask_raw(trade.price);
1585            }
1586        }
1587
1588        self.last_trade_size = Some(trade.size);
1589        self.trade_consumption = 0;
1590
1591        if self.config.liquidity_consumption && self.book_type != BookType::L1_MBP {
1592            self.seed_trade_consumption(price_raw, trade.size.raw, trade.ts_event, aggressor_side);
1593        }
1594
1595        self.resolve_pending_on_trade(price_raw);
1596        self.decrement_queue_on_trade(price_raw, trade.size.raw, aggressor_side);
1597
1598        self.iterate(trade.ts_init, aggressor_side);
1599
1600        self.last_trade_size = None;
1601        self.trade_consumption = 0;
1602
1603        // Restore the non-aggressor side after temporary trade price override.
1604        // For L2/L3 books the book has independent depth so restore from originals.
1605        // For L1_MBP restore from the last quote values (not originals, which are
1606        // polluted by iterate's L1 book sync). Without quotes, skip the restore
1607        // so the core tracks the latest trade price.
1608        if self.book_type == BookType::L1_MBP {
1609            match aggressor_side {
1610                AggressorSide::Seller => {
1611                    if let Some(ask) = self.last_quote_ask {
1612                        self.core.ask = Some(ask);
1613                    }
1614                }
1615                AggressorSide::Buyer => {
1616                    if let Some(bid) = self.last_quote_bid {
1617                        self.core.bid = Some(bid);
1618                    }
1619                }
1620                AggressorSide::NoAggressor => {}
1621            }
1622        } else {
1623            match aggressor_side {
1624                AggressorSide::Seller => {
1625                    if let Some(ask) = original_ask
1626                        && price_raw < ask.raw
1627                    {
1628                        self.core.ask = Some(ask);
1629                    }
1630                }
1631                AggressorSide::Buyer => {
1632                    if let Some(bid) = original_bid
1633                        && price_raw > bid.raw
1634                    {
1635                        self.core.bid = Some(bid);
1636                    }
1637                }
1638                AggressorSide::NoAggressor => {}
1639            }
1640        }
1641    }
1642
1643    /// Processes a market status action to update the market state.
1644    pub fn process_status(&mut self, action: MarketStatusAction) {
1645        log::debug!("Processing {action}");
1646
1647        // Check if market is closed and market opens with trading or pre-open status
1648        if self.market_status == MarketStatus::Closed
1649            && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
1650        {
1651            self.market_status = MarketStatus::Open;
1652        }
1653        // Check if market is open and market pauses
1654        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
1655            self.market_status = MarketStatus::Paused;
1656        }
1657        // Check if market is open and market suspends
1658        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
1659            self.market_status = MarketStatus::Suspended;
1660        }
1661        // Check if market is open and we halt or close
1662        if self.market_status == MarketStatus::Open
1663            && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
1664        {
1665            self.market_status = MarketStatus::Closed;
1666        }
1667    }
1668
1669    /// Processes an instrument close event.
1670    ///
1671    /// For `ContractExpired` close types, stores the close and triggers expiration
1672    /// processing which cancels all open orders and closes all open positions.
1673    pub fn process_instrument_close(&mut self, close: InstrumentClose) {
1674        if close.instrument_id != self.instrument.id() {
1675            log::warn!(
1676                "Received instrument close for unknown instrument_id: {}",
1677                close.instrument_id
1678            );
1679            return;
1680        }
1681
1682        if close.close_type == InstrumentCloseType::ContractExpired {
1683            self.instrument_close = Some(close);
1684            self.iterate(close.ts_init, AggressorSide::NoAggressor);
1685        }
1686    }
1687
1688    fn check_instrument_expiration(&mut self) {
1689        if self.expiration_processed || self.instrument_close.is_none() {
1690            return;
1691        }
1692
1693        self.expiration_processed = true;
1694        let close = self.instrument_close.take().unwrap();
1695        log::info!("{} reached expiration", self.instrument.id());
1696
1697        let open_orders: Vec<OrderMatchInfo> = self.get_open_orders();
1698        for order_info in &open_orders {
1699            let order = {
1700                let cache = self.cache.borrow();
1701                cache.order(&order_info.client_order_id).cloned()
1702            };
1703
1704            if let Some(order) = order {
1705                self.cancel_order(&order, None);
1706            }
1707        }
1708
1709        let instrument_id = self.instrument.id();
1710        let positions: Vec<(TraderId, StrategyId, PositionId, OrderSide, Quantity)> = {
1711            let cache = self.cache.borrow();
1712            cache
1713                .positions_open(None, Some(&instrument_id), None, None, None)
1714                .into_iter()
1715                .map(|pos| {
1716                    let closing_side = match pos.side {
1717                        PositionSide::Long => OrderSide::Sell,
1718                        PositionSide::Short => OrderSide::Buy,
1719                        _ => OrderSide::NoOrderSide,
1720                    };
1721                    (
1722                        pos.trader_id,
1723                        pos.strategy_id,
1724                        pos.id,
1725                        closing_side,
1726                        pos.quantity,
1727                    )
1728                })
1729                .collect()
1730        };
1731
1732        let ts_now = self.clock.borrow().timestamp_ns();
1733
1734        for (trader_id, strategy_id, position_id, closing_side, quantity) in positions {
1735            let client_order_id =
1736                ClientOrderId::from(format!("EXPIRATION-{}-{}", self.venue, UUID4::new()).as_str());
1737            let order = OrderAny::Market(MarketOrder::new(
1738                trader_id,
1739                strategy_id,
1740                instrument_id,
1741                client_order_id,
1742                closing_side,
1743                quantity,
1744                TimeInForce::Gtc,
1745                UUID4::new(),
1746                ts_now,
1747                true, // reduce_only
1748                false,
1749                None,
1750                None,
1751                None,
1752                None,
1753                None,
1754                None,
1755                None,
1756                Some(vec![Ustr::from(&format!(
1757                    "EXPIRATION_{}_CLOSE",
1758                    self.venue
1759                ))]),
1760            ));
1761
1762            if self
1763                .cache
1764                .borrow_mut()
1765                .add_order(order.clone(), Some(position_id), None, false)
1766                .is_err()
1767            {
1768                log::debug!("Expiration order already in cache: {client_order_id}");
1769            }
1770
1771            let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
1772            self.generate_order_accepted(&order, venue_order_id);
1773            let fill_price = self.settlement_price.unwrap_or(close.close_price);
1774            self.apply_fills(
1775                &order,
1776                &[(fill_price, quantity)],
1777                LiquiditySide::Taker,
1778                Some(position_id),
1779                None,
1780            );
1781        }
1782    }
1783
1784    /// Processes a new order submission.
1785    ///
1786    /// Validates the order against instrument precision, expiration, and contingency
1787    /// rules before accepting or rejecting it.
1788    ///
1789    /// # Panics
1790    ///
1791    /// Panics if an OTO child order references a missing or non-OTO parent.
1792    pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
1793        // Validate inside a cache borrow scope, collecting any rejection
1794        // reason rather than emitting events while the borrow is held.
1795        // This avoids RefCell re-entrancy panics from synchronous event
1796        // dispatch that calls back into the execution engine.
1797        let reject_reason: Option<Ustr> = 'validate: {
1798            let cache_borrow = self.cache.as_ref().borrow();
1799
1800            if self.core.order_exists(order.client_order_id()) {
1801                break 'validate Some("Order already exists".into());
1802            }
1803
1804            // Index identifiers
1805            self.account_ids.insert(order.trader_id(), account_id);
1806
1807            // Check for instrument expiration or activation
1808            if self.instrument.has_expiration() {
1809                if let Some(activation_ns) = self.instrument.activation_ns()
1810                    && self.clock.borrow().timestamp_ns() < activation_ns
1811                {
1812                    break 'validate Some(
1813                        format!(
1814                            "Contract {} is not yet active, activation {activation_ns}",
1815                            self.instrument.id(),
1816                        )
1817                        .into(),
1818                    );
1819                }
1820
1821                if let Some(expiration_ns) = self.instrument.expiration_ns()
1822                    && self.clock.borrow().timestamp_ns() >= expiration_ns
1823                {
1824                    break 'validate Some(
1825                        format!(
1826                            "Contract {} has expired, expiration {expiration_ns}",
1827                            self.instrument.id(),
1828                        )
1829                        .into(),
1830                    );
1831                }
1832            }
1833
1834            // Contingent orders checks
1835            if self.config.support_contingent_orders {
1836                if let Some(parent_order_id) = order.parent_order_id() {
1837                    let parent_order = cache_borrow.order(&parent_order_id);
1838                    if parent_order.is_none()
1839                        || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
1840                    {
1841                        panic!("OTO parent not found");
1842                    }
1843
1844                    if let Some(parent_order) = parent_order {
1845                        if parent_order.status() == OrderStatus::Rejected && order.is_open() {
1846                            break 'validate Some(
1847                                format!("Rejected OTO order from {parent_order_id}").into(),
1848                            );
1849                        } else if parent_order.status() == OrderStatus::Accepted
1850                            || parent_order.status() == OrderStatus::Triggered
1851                            || (self.config.oto_full_trigger
1852                                && parent_order.status() == OrderStatus::PartiallyFilled)
1853                        {
1854                            log::info!(
1855                                "Pending OTO order {} triggers from {parent_order_id}",
1856                                order.client_order_id(),
1857                            );
1858                            return;
1859                        }
1860                    }
1861                }
1862
1863                if let Some(linked_order_ids) = order.linked_order_ids() {
1864                    for client_order_id in linked_order_ids {
1865                        match cache_borrow.order(client_order_id) {
1866                            Some(contingent_order)
1867                                if (order.contingency_type().unwrap() == ContingencyType::Oco
1868                                    || order.contingency_type().unwrap()
1869                                        == ContingencyType::Ouo)
1870                                    && !order.is_closed()
1871                                    && contingent_order.is_closed() =>
1872                            {
1873                                break 'validate Some(
1874                                    format!("Contingent order {client_order_id} already closed")
1875                                        .into(),
1876                                );
1877                            }
1878                            None => panic!("Cannot find contingent order for {client_order_id}"),
1879                            _ => {}
1880                        }
1881                    }
1882                }
1883            }
1884
1885            // Check for valid order quantity precision
1886            if order.quantity().precision != self.instrument.size_precision() {
1887                break 'validate Some(
1888                    format!(
1889                        "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
1890                        order.client_order_id(),
1891                        order.quantity().precision,
1892                        self.instrument.id(),
1893                        self.instrument.size_precision()
1894                    )
1895                    .into(),
1896                );
1897            }
1898
1899            // Check for valid order price precision
1900            if let Some(price) = order.price()
1901                && price.precision != self.instrument.price_precision()
1902            {
1903                break 'validate Some(
1904                    format!(
1905                        "Invalid order price precision for order {}, was {} when {} price precision is {}",
1906                        order.client_order_id(),
1907                        price.precision,
1908                        self.instrument.id(),
1909                        self.instrument.price_precision()
1910                    )
1911                    .into(),
1912                );
1913            }
1914
1915            // Check for valid order trigger price precision
1916            if let Some(trigger_price) = order.trigger_price()
1917                && trigger_price.precision != self.instrument.price_precision()
1918            {
1919                break 'validate Some(
1920                    format!(
1921                        "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
1922                        order.client_order_id(),
1923                        trigger_price.precision,
1924                        self.instrument.id(),
1925                        self.instrument.price_precision()
1926                    )
1927                    .into(),
1928                );
1929            }
1930
1931            // Get position if exists
1932            let position: Option<&Position> = cache_borrow
1933                .position_for_order(&order.client_order_id())
1934                .or_else(|| {
1935                    if self.oms_type == OmsType::Netting {
1936                        let position_id = PositionId::new(
1937                            format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
1938                        );
1939                        cache_borrow.position(&position_id)
1940                    } else {
1941                        None
1942                    }
1943                });
1944
1945            // Check not shorting an equity without a MARGIN account
1946            if order.order_side() == OrderSide::Sell
1947                && self.account_type != AccountType::Margin
1948                && matches!(self.instrument, InstrumentAny::Equity(_))
1949                && (position.is_none()
1950                    || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
1951            {
1952                let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
1953                break 'validate Some(
1954                    format!(
1955                        "Short selling not permitted on a CASH account with position {position_string} and order {order}",
1956                    )
1957                    .into(),
1958                );
1959            }
1960
1961            // Check reduce-only instruction
1962            if self.config.use_reduce_only
1963                && order.is_reduce_only()
1964                && !order.is_closed()
1965                && position.is_none_or(|pos| {
1966                    pos.is_closed()
1967                        || (order.is_buy() && pos.is_long())
1968                        || (order.is_sell() && pos.is_short())
1969                })
1970            {
1971                break 'validate Some(
1972                    format!(
1973                        "Reduce-only order {} ({}-{}) would have increased position",
1974                        order.client_order_id(),
1975                        order.order_type().to_string().to_uppercase(),
1976                        order.order_side().to_string().to_uppercase()
1977                    )
1978                    .into(),
1979                );
1980            }
1981
1982            None
1983        };
1984
1985        if let Some(reason) = reject_reason {
1986            self.generate_order_rejected(order, reason);
1987            return;
1988        }
1989
1990        // Convert quote-denominated quantity to base quantity for non-inverse instruments.
1991        // Mirrors live venue semantics where the quote notional is settled into a base
1992        // quantity before the order enters normal fill and state handling. Without this
1993        // conversion the book simulation would treat the quote notional as base size.
1994        // Only applies to order types with a reliable reference price at submission;
1995        // trigger-style market orders and trailing orders are left untouched so they
1996        // convert at fill time from the actual (possibly-trailed) price.
1997        if order.is_quote_quantity()
1998            && !self.instrument.is_inverse()
1999            && !matches!(
2000                order.order_type(),
2001                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket,
2002            )
2003            && (order.price().is_some()
2004                || matches!(
2005                    order.order_type(),
2006                    OrderType::Market | OrderType::MarketToLimit,
2007                ))
2008            && !self.convert_quote_to_base_quantity(order)
2009        {
2010            return;
2011        }
2012
2013        match order.order_type() {
2014            OrderType::Market => self.process_market_order(order),
2015            OrderType::Limit => self.process_limit_order(order),
2016            OrderType::MarketToLimit => self.process_market_to_limit_order(order),
2017            OrderType::StopMarket => self.process_stop_market_order(order),
2018            OrderType::StopLimit => self.process_stop_limit_order(order),
2019            OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
2020            OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
2021            OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
2022            OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
2023        }
2024    }
2025
2026    fn convert_quote_to_base_quantity(&self, order: &mut OrderAny) -> bool {
2027        // Pick a reference price to convert the quote notional into a base quantity.
2028        // Priced orders use their own price (worst-case execution); marketable orders
2029        // use the best opposing book level.
2030        let reference_price = if let Some(price) = order.price() {
2031            Some(price)
2032        } else {
2033            match order.order_side() {
2034                OrderSide::Buy => self.core.ask,
2035                OrderSide::Sell => self.core.bid,
2036                OrderSide::NoOrderSide => None,
2037            }
2038        };
2039
2040        let Some(reference_price) = reference_price else {
2041            self.generate_order_rejected(
2042                order,
2043                format!(
2044                    "No market for {} to convert quote quantity to base",
2045                    order.instrument_id(),
2046                )
2047                .into(),
2048            );
2049            return false;
2050        };
2051
2052        let base_quantity = self
2053            .instrument
2054            .calculate_base_quantity(order.quantity(), reference_price);
2055
2056        let ts_now = self.clock.borrow().timestamp_ns();
2057        let event = OrderEventAny::Updated(OrderUpdated::new(
2058            order.trader_id(),
2059            order.strategy_id(),
2060            order.instrument_id(),
2061            order.client_order_id(),
2062            base_quantity,
2063            UUID4::new(),
2064            ts_now,
2065            ts_now,
2066            false,
2067            order.venue_order_id(),
2068            order.account_id(),
2069            None,
2070            None,
2071            None,
2072            false,
2073        ));
2074
2075        // Apply the update to the local order so subsequent dispatch uses the base
2076        // quantity immediately (the event is also dispatched to the execution engine
2077        // for cache reconciliation).
2078        if let Err(e) = order.apply(event.clone()) {
2079            log::error!(
2080                "Failed to apply quote-to-base update for {}: {e}",
2081                order.client_order_id(),
2082            );
2083            return false;
2084        }
2085        self.dispatch_order_event(event);
2086        true
2087    }
2088
2089    /// Processes an order modify command to update quantity, price, or trigger price.
2090    pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
2091        if !self.core.order_exists(command.client_order_id) {
2092            self.generate_order_modify_rejected(
2093                command.trader_id,
2094                command.strategy_id,
2095                command.instrument_id,
2096                command.client_order_id,
2097                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
2098                command.venue_order_id,
2099                Some(account_id),
2100            );
2101            return;
2102        }
2103
2104        let mut order = match self.cache.borrow().order(&command.client_order_id).cloned() {
2105            Some(order) => order,
2106            None => {
2107                log::error!(
2108                    "Cannot modify order: order {} not found in cache",
2109                    command.client_order_id
2110                );
2111                return;
2112            }
2113        };
2114
2115        let update_success = self.update_order(
2116            &mut order,
2117            command.quantity,
2118            command.price,
2119            command.trigger_price,
2120            None,
2121        );
2122
2123        // Only persist changes if update succeeded and order is still open
2124        if update_success && order.is_open() {
2125            let _ = self.core.delete_order(command.client_order_id);
2126            let match_info = OrderMatchInfo::new(
2127                order.client_order_id(),
2128                order.order_side().as_specified(),
2129                order.order_type(),
2130                order.trigger_price(),
2131                order.price(),
2132                true,
2133            );
2134            self.core.add_order(match_info);
2135
2136            if self.config.queue_position
2137                && let Some(new_price) = order.price()
2138            {
2139                self.snapshot_queue_position(&order, new_price);
2140                self.queue_excess.swap_remove(&order.client_order_id());
2141            }
2142        }
2143    }
2144
2145    /// Processes an order cancel command.
2146    pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
2147        if !self.core.order_exists(command.client_order_id) {
2148            self.generate_order_cancel_rejected(
2149                command.trader_id,
2150                command.strategy_id,
2151                account_id,
2152                command.instrument_id,
2153                command.client_order_id,
2154                command.venue_order_id,
2155                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
2156            );
2157            return;
2158        }
2159
2160        let order = match self.cache.borrow().order(&command.client_order_id).cloned() {
2161            Some(order) => order,
2162            None => {
2163                log::error!(
2164                    "Cannot cancel order: order {} not found in cache",
2165                    command.client_order_id
2166                );
2167                return;
2168            }
2169        };
2170
2171        if !order.is_inflight() && !order.is_open() {
2172            self.purge_stale_core_entry(command.client_order_id);
2173            return;
2174        }
2175
2176        self.cancel_order(&order, None);
2177    }
2178
2179    /// Processes a cancel all orders command for an instrument.
2180    pub fn process_cancel_all(&mut self, command: &CancelAllOrders, _account_id: AccountId) {
2181        let instrument_id = command.instrument_id;
2182        let order_side = if command.order_side == OrderSide::NoOrderSide {
2183            None
2184        } else {
2185            Some(command.order_side)
2186        };
2187
2188        let client_order_ids: Vec<ClientOrderId> = self
2189            .cache
2190            .borrow()
2191            .orders_open(None, Some(&instrument_id), None, None, order_side)
2192            .iter()
2193            .map(|o| o.client_order_id())
2194            .collect();
2195
2196        for client_order_id in client_order_ids {
2197            let order = match self.cache.borrow().order(&client_order_id).cloned() {
2198                Some(order) => order,
2199                None => continue,
2200            };
2201
2202            if !order.is_inflight() && !order.is_open() {
2203                self.purge_stale_core_entry(client_order_id);
2204                continue;
2205            }
2206
2207            self.cancel_order(&order, None);
2208        }
2209    }
2210
2211    // Removes a closed order's stale entry from the matching core so the next
2212    // `iterate_bids/asks` does not produce a spurious fill action.
2213    fn purge_stale_core_entry(&mut self, client_order_id: ClientOrderId) {
2214        if self.core.order_exists(client_order_id) {
2215            let _ = self.core.delete_order(client_order_id);
2216        }
2217        self.cached_filled_qty.swap_remove(&client_order_id);
2218    }
2219
2220    /// Processes a batch cancel orders command.
2221    pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
2222        for order in &command.cancels {
2223            self.process_cancel(order, account_id);
2224        }
2225    }
2226
2227    fn process_market_order(&mut self, order: &OrderAny) {
2228        if order.time_in_force() == TimeInForce::AtTheOpen
2229            || order.time_in_force() == TimeInForce::AtTheClose
2230        {
2231            log::error!(
2232                "Market auction for the time in force {} is currently not supported",
2233                order.time_in_force()
2234            );
2235            return;
2236        }
2237
2238        // Check if market exists
2239        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
2240            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
2241        {
2242            self.generate_order_rejected(
2243                order,
2244                format!("No market for {}", order.instrument_id()).into(),
2245            );
2246            return;
2247        }
2248
2249        if self.config.use_market_order_acks {
2250            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2251            self.generate_order_accepted(order, venue_order_id);
2252        }
2253
2254        // Add order to cache for fill_market_order to fetch
2255        if let Err(e) = self
2256            .cache
2257            .borrow_mut()
2258            .add_order(order.clone(), None, None, false)
2259        {
2260            log::debug!("Order already in cache: {e}");
2261        }
2262
2263        self.fill_market_order(order.client_order_id());
2264    }
2265
2266    fn process_limit_order(&mut self, order: &mut OrderAny) {
2267        let limit_px = order.price().expect("Limit order must have a price");
2268        if order.is_post_only()
2269            && self
2270                .core
2271                .is_limit_matched(order.order_side_specified(), limit_px)
2272        {
2273            self.generate_order_rejected(
2274                order,
2275                format!(
2276                    "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
2277                    order.order_type(),
2278                    order.order_side(),
2279                    order.price().unwrap(),
2280                    self.core
2281                        .bid
2282                        .map_or_else(|| "None".to_string(), |p| p.to_string()),
2283                    self.core
2284                        .ask
2285                        .map_or_else(|| "None".to_string(), |p| p.to_string())
2286                )
2287                .into(),
2288            );
2289            return;
2290        }
2291
2292        // Order is valid and accepted
2293        self.accept_order(order);
2294
2295        // Check for immediate fill
2296        if self
2297            .core
2298            .is_limit_matched(order.order_side_specified(), limit_px)
2299        {
2300            // Filling as liquidity taker
2301            order.set_liquidity_side(LiquiditySide::Taker);
2302
2303            if self
2304                .cache
2305                .borrow_mut()
2306                .add_order(order.clone(), None, None, false)
2307                .is_err()
2308                && let Err(e) = self.cache.borrow_mut().update_order(order)
2309            {
2310                log::debug!("Failed to update order in cache: {e}");
2311            }
2312            self.fill_limit_order(order.client_order_id());
2313
2314            // If fill didn't execute (e.g. all liquidity consumed), revert to
2315            // maker so the fill model check applies on subsequent iterations
2316            if self.core.order_exists(order.client_order_id())
2317                && let Some(cached) = self.cache.borrow_mut().mut_order(&order.client_order_id())
2318            {
2319                cached.set_liquidity_side(LiquiditySide::Maker);
2320            }
2321        } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
2322            self.cancel_order(order, None);
2323        } else {
2324            // Add passive order to cache for later modify/cancel operations
2325            order.set_liquidity_side(LiquiditySide::Maker);
2326
2327            if let Some(price) = order.price() {
2328                self.snapshot_queue_position(order, price);
2329            }
2330
2331            let add_result = self
2332                .cache
2333                .borrow_mut()
2334                .add_order(order.clone(), None, None, false);
2335
2336            if let Err(e) = add_result {
2337                log::debug!("Failed to add order to cache: {e}");
2338
2339                // Persist Maker side on the cached copy when exec engine
2340                // already cached the order (only if not already Maker/Taker)
2341                if let Some(cached) = self.cache.borrow_mut().mut_order(&order.client_order_id())
2342                    && !matches!(
2343                        cached.liquidity_side(),
2344                        Some(LiquiditySide::Maker | LiquiditySide::Taker)
2345                    )
2346                {
2347                    cached.set_liquidity_side(LiquiditySide::Maker);
2348                }
2349            }
2350        }
2351    }
2352
2353    fn process_market_to_limit_order(&mut self, order: &OrderAny) {
2354        // Check that market exists
2355        if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
2356            || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
2357        {
2358            self.generate_order_rejected(
2359                order,
2360                format!("No market for {}", order.instrument_id()).into(),
2361            );
2362            return;
2363        }
2364
2365        if self.config.use_market_order_acks {
2366            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2367            self.generate_order_accepted(order, venue_order_id);
2368        }
2369
2370        // Immediately fill marketable order
2371        if let Err(e) = self
2372            .cache
2373            .borrow_mut()
2374            .add_order(order.clone(), None, None, false)
2375        {
2376            log::debug!("Order already in cache: {e}");
2377        }
2378        let client_order_id = order.client_order_id();
2379        self.fill_market_order(client_order_id);
2380
2381        // Check for remaining quantity to rest as limit order
2382        let filled_qty = self
2383            .cached_filled_qty
2384            .get(&client_order_id)
2385            .copied()
2386            .unwrap_or_default();
2387        let leaves_qty = order.quantity().saturating_sub(filled_qty);
2388        if !leaves_qty.is_zero() {
2389            // Re-fetch from cache to get updated price from partial fill
2390            let updated_order = self.cache.borrow().order(&client_order_id).cloned();
2391            if let Some(mut updated_order) = updated_order {
2392                self.accept_order(&mut updated_order);
2393            }
2394        }
2395    }
2396
2397    fn process_stop_market_order(&mut self, order: &mut OrderAny) {
2398        let stop_px = order
2399            .trigger_price()
2400            .expect("Stop order must have a trigger price");
2401
2402        if self
2403            .core
2404            .is_stop_matched(order.order_side_specified(), stop_px)
2405        {
2406            if self.config.reject_stop_orders {
2407                self.generate_order_rejected(
2408                    order,
2409                    format!(
2410                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2411                        order.order_type(),
2412                        order.order_side(),
2413                        order.trigger_price().unwrap(),
2414                        self.core
2415                            .bid
2416                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
2417                        self.core
2418                            .ask
2419                            .map_or_else(|| "None".to_string(), |p| p.to_string())
2420                    ).into(),
2421                );
2422                return;
2423            }
2424
2425            if let Err(e) = self
2426                .cache
2427                .borrow_mut()
2428                .add_order(order.clone(), None, None, false)
2429            {
2430                log::debug!("Order already in cache: {e}");
2431            }
2432            self.fill_market_order(order.client_order_id());
2433            return;
2434        }
2435
2436        // order is not matched but is valid and we accept it
2437        self.accept_order(order);
2438
2439        // Add passive order to cache for later modify/cancel operations
2440        order.set_liquidity_side(LiquiditySide::Maker);
2441
2442        if let Err(e) = self
2443            .cache
2444            .borrow_mut()
2445            .add_order(order.clone(), None, None, false)
2446        {
2447            log::debug!("Order already in cache: {e}");
2448        }
2449    }
2450
2451    fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
2452        let stop_px = order
2453            .trigger_price()
2454            .expect("Stop order must have a trigger price");
2455
2456        if self
2457            .core
2458            .is_stop_matched(order.order_side_specified(), stop_px)
2459        {
2460            if self.config.reject_stop_orders {
2461                self.generate_order_rejected(
2462                    order,
2463                    format!(
2464                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2465                        order.order_type(),
2466                        order.order_side(),
2467                        order.trigger_price().unwrap(),
2468                        self.core
2469                            .bid
2470                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
2471                        self.core
2472                            .ask
2473                            .map_or_else(|| "None".to_string(), |p| p.to_string())
2474                    ).into(),
2475                );
2476                return;
2477            }
2478
2479            self.accept_order(order);
2480            self.generate_order_triggered(order);
2481
2482            // Check for immediate fill
2483            let limit_px = order.price().expect("Stop limit order must have a price");
2484
2485            if self
2486                .core
2487                .is_limit_matched(order.order_side_specified(), limit_px)
2488            {
2489                order.set_liquidity_side(LiquiditySide::Taker);
2490
2491                if let Err(e) = self
2492                    .cache
2493                    .borrow_mut()
2494                    .add_order(order.clone(), None, None, false)
2495                {
2496                    log::debug!("Order already in cache: {e}");
2497                }
2498                self.fill_limit_order(order.client_order_id());
2499            }
2500
2501            // Order was triggered (and possibly filled), don't accept again
2502            return;
2503        }
2504
2505        self.accept_order(order);
2506
2507        // Add passive order to cache for later modify/cancel operations
2508        order.set_liquidity_side(LiquiditySide::Maker);
2509
2510        if let Err(e) = self
2511            .cache
2512            .borrow_mut()
2513            .add_order(order.clone(), None, None, false)
2514        {
2515            log::debug!("Order already in cache: {e}");
2516        }
2517    }
2518
2519    fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
2520        if self
2521            .core
2522            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
2523        {
2524            if self.config.reject_stop_orders {
2525                self.generate_order_rejected(
2526                    order,
2527                    format!(
2528                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2529                        order.order_type(),
2530                        order.order_side(),
2531                        order.trigger_price().unwrap(),
2532                        self.core
2533                            .bid
2534                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
2535                        self.core
2536                            .ask
2537                            .map_or_else(|| "None".to_string(), |p| p.to_string())
2538                    ).into(),
2539                );
2540                return;
2541            }
2542
2543            if let Err(e) = self
2544                .cache
2545                .borrow_mut()
2546                .add_order(order.clone(), None, None, false)
2547            {
2548                log::debug!("Order already in cache: {e}");
2549            }
2550            self.fill_market_order(order.client_order_id());
2551            return;
2552        }
2553
2554        // Order is valid and accepted
2555        self.accept_order(order);
2556
2557        // Add passive order to cache for later modify/cancel operations
2558        order.set_liquidity_side(LiquiditySide::Maker);
2559
2560        if let Err(e) = self
2561            .cache
2562            .borrow_mut()
2563            .add_order(order.clone(), None, None, false)
2564        {
2565            log::debug!("Order already in cache: {e}");
2566        }
2567    }
2568
2569    fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
2570        if self
2571            .core
2572            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
2573        {
2574            if self.config.reject_stop_orders {
2575                self.generate_order_rejected(
2576                    order,
2577                    format!(
2578                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2579                        order.order_type(),
2580                        order.order_side(),
2581                        order.trigger_price().unwrap(),
2582                        self.core
2583                            .bid
2584                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
2585                        self.core
2586                            .ask
2587                            .map_or_else(|| "None".to_string(), |p| p.to_string())
2588                    ).into(),
2589                );
2590                return;
2591            }
2592            self.accept_order(order);
2593            self.generate_order_triggered(order);
2594
2595            // Check if immediate marketable
2596            if self
2597                .core
2598                .is_limit_matched(order.order_side_specified(), order.price().unwrap())
2599            {
2600                order.set_liquidity_side(LiquiditySide::Taker);
2601
2602                if let Err(e) = self
2603                    .cache
2604                    .borrow_mut()
2605                    .add_order(order.clone(), None, None, false)
2606                {
2607                    log::debug!("Order already in cache: {e}");
2608                }
2609                self.fill_limit_order(order.client_order_id());
2610            }
2611            return;
2612        }
2613
2614        // Order is valid and accepted
2615        self.accept_order(order);
2616
2617        // Add passive order to cache for later modify/cancel operations
2618        order.set_liquidity_side(LiquiditySide::Maker);
2619
2620        if let Err(e) = self
2621            .cache
2622            .borrow_mut()
2623            .add_order(order.clone(), None, None, false)
2624        {
2625            log::debug!("Order already in cache: {e}");
2626        }
2627    }
2628
2629    fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
2630        if let Some(trigger_price) = order.trigger_price()
2631            && self
2632                .core
2633                .is_stop_matched(order.order_side_specified(), trigger_price)
2634        {
2635            self.generate_order_rejected(
2636                    order,
2637                    format!(
2638                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2639                        order.order_type(),
2640                        order.order_side(),
2641                        trigger_price,
2642                        self.core
2643                            .bid
2644                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
2645                        self.core
2646                            .ask
2647                            .map_or_else(|| "None".to_string(), |p| p.to_string())
2648                    ).into(),
2649                );
2650            return;
2651        }
2652
2653        // Order is valid and accepted
2654        self.accept_order(order);
2655
2656        // Add passive order to cache for later modify/cancel operations
2657        order.set_liquidity_side(LiquiditySide::Maker);
2658
2659        if let Err(e) = self
2660            .cache
2661            .borrow_mut()
2662            .add_order(order.clone(), None, None, false)
2663        {
2664            log::debug!("Order already in cache: {e}");
2665        }
2666    }
2667
2668    /// Iterate the matching engine by processing the bid and ask order sides
2669    /// and advancing time up to the given UNIX `timestamp_ns`.
2670    ///
2671    /// The `aggressor_side` parameter is used for trade execution processing.
2672    /// When not `NoAggressor`, the book-based bid/ask reset is skipped to preserve
2673    /// transient trade price overrides.
2674    pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
2675        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
2676
2677        // Only reset bid/ask from book when not processing trade execution
2678        // (preserves transient trade price override for L2/L3 books)
2679        if aggressor_side == AggressorSide::NoAggressor {
2680            if let Some(bid) = self.book.best_bid_price() {
2681                self.core.set_bid_raw(bid);
2682            }
2683
2684            if let Some(ask) = self.book.best_ask_price() {
2685                self.core.set_ask_raw(ask);
2686            }
2687        }
2688
2689        // Process expiration before matching to prevent fills on expired instruments
2690        self.check_instrument_expiration();
2691
2692        // Expire GTD orders before matching to prevent fills on expired orders
2693        if self.config.support_gtd_orders {
2694            self.expire_gtd_orders(timestamp_ns);
2695        }
2696
2697        // Process bid actions before snapshotting asks so cross-side
2698        // contingencies (OCO/OUO) mutate state between sides
2699        for action in self.core.iterate_bids() {
2700            match action {
2701                MatchAction::FillLimit(id) => self.fill_limit_order(id),
2702                MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
2703            }
2704        }
2705
2706        for action in self.core.iterate_asks() {
2707            match action {
2708                MatchAction::FillLimit(id) => self.fill_limit_order(id),
2709                MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
2710            }
2711        }
2712
2713        let orders_bid = self.core.get_orders_bid().to_vec();
2714        let orders_ask = self.core.get_orders_ask().to_vec();
2715
2716        self.iterate_orders(timestamp_ns, &orders_bid);
2717        self.iterate_orders(timestamp_ns, &orders_ask);
2718
2719        // Restore core bid/ask to book values after order iteration
2720        // (during trade execution, transient override was used for matching)
2721        self.core.bid = self.book.best_bid_price();
2722        self.core.ask = self.book.best_ask_price();
2723    }
2724
2725    fn get_trailing_activation_price(
2726        &self,
2727        trigger_type: TriggerType,
2728        order_side: OrderSide,
2729        bid: Option<Price>,
2730        ask: Option<Price>,
2731        last: Option<Price>,
2732    ) -> Option<Price> {
2733        match trigger_type {
2734            TriggerType::LastPrice => last,
2735            TriggerType::LastOrBidAsk => last.or(match order_side {
2736                OrderSide::Buy => ask,
2737                OrderSide::Sell => bid,
2738                _ => None,
2739            }),
2740            // Default, BidAsk, DoubleBidAsk, DoubleLastPrice, IndexPrice, MarkPrice
2741            _ => match order_side {
2742                OrderSide::Buy => ask,
2743                OrderSide::Sell => bid,
2744                _ => None,
2745            },
2746        }
2747    }
2748
2749    fn maybe_activate_trailing_stop(
2750        &self,
2751        order: &mut OrderAny,
2752        bid: Option<Price>,
2753        ask: Option<Price>,
2754        last: Option<Price>,
2755    ) -> bool {
2756        match order {
2757            OrderAny::TrailingStopMarket(inner) => {
2758                if inner.is_activated {
2759                    return true;
2760                }
2761
2762                if inner.activation_price.is_none() {
2763                    let px = self.get_trailing_activation_price(
2764                        inner.trigger_type,
2765                        inner.order_side(),
2766                        bid,
2767                        ask,
2768                        last,
2769                    );
2770
2771                    if let Some(p) = px {
2772                        inner.activation_price = Some(p);
2773                        inner.set_activated();
2774
2775                        if let Err(e) = self.cache.borrow_mut().update_order(order) {
2776                            log::error!("Failed to update order: {e}");
2777                        }
2778                        return true;
2779                    }
2780                    return false;
2781                }
2782
2783                let activation_price = inner.activation_price.unwrap();
2784                let hit = match inner.order_side() {
2785                    OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
2786                    OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
2787                    _ => false,
2788                };
2789
2790                if hit {
2791                    inner.set_activated();
2792
2793                    if let Err(e) = self.cache.borrow_mut().update_order(order) {
2794                        log::error!("Failed to update order: {e}");
2795                    }
2796                }
2797                hit
2798            }
2799            OrderAny::TrailingStopLimit(inner) => {
2800                if inner.is_activated {
2801                    return true;
2802                }
2803
2804                if inner.activation_price.is_none() {
2805                    let px = self.get_trailing_activation_price(
2806                        inner.trigger_type,
2807                        inner.order_side(),
2808                        bid,
2809                        ask,
2810                        last,
2811                    );
2812
2813                    if let Some(p) = px {
2814                        inner.activation_price = Some(p);
2815                        inner.set_activated();
2816
2817                        if let Err(e) = self.cache.borrow_mut().update_order(order) {
2818                            log::error!("Failed to update order: {e}");
2819                        }
2820                        return true;
2821                    }
2822                    return false;
2823                }
2824
2825                let activation_price = inner.activation_price.unwrap();
2826                let hit = match inner.order_side() {
2827                    OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
2828                    OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
2829                    _ => false,
2830                };
2831
2832                if hit {
2833                    inner.set_activated();
2834
2835                    if let Err(e) = self.cache.borrow_mut().update_order(order) {
2836                        log::error!("Failed to update order: {e}");
2837                    }
2838                }
2839                hit
2840            }
2841            _ => true,
2842        }
2843    }
2844
2845    fn expire_gtd_orders(&mut self, timestamp_ns: UnixNanos) {
2846        for match_info in self.core.get_orders() {
2847            let order = match self
2848                .cache
2849                .borrow()
2850                .order(&match_info.client_order_id)
2851                .cloned()
2852            {
2853                Some(order) => order,
2854                None => continue,
2855            };
2856
2857            if order.is_closed() {
2858                continue;
2859            }
2860
2861            if order
2862                .expire_time()
2863                .is_some_and(|expire_ns| timestamp_ns >= expire_ns)
2864            {
2865                let _ = self.core.delete_order(match_info.client_order_id);
2866                self.cached_filled_qty
2867                    .swap_remove(&match_info.client_order_id);
2868                self.expire_order(&order);
2869            }
2870        }
2871    }
2872
2873    fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[OrderMatchInfo]) {
2874        for match_info in orders {
2875            let order = match self
2876                .cache
2877                .borrow()
2878                .order(&match_info.client_order_id)
2879                .cloned()
2880            {
2881                Some(order) => order,
2882                None => {
2883                    log::warn!(
2884                        "Order {} not found in cache during iteration, skipping",
2885                        match_info.client_order_id
2886                    );
2887                    continue;
2888                }
2889            };
2890
2891            if order.is_closed() {
2892                continue;
2893            }
2894
2895            if self.config.support_gtd_orders
2896                && order
2897                    .expire_time()
2898                    .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
2899            {
2900                let _ = self.core.delete_order(match_info.client_order_id);
2901                self.cached_filled_qty
2902                    .swap_remove(&match_info.client_order_id);
2903                self.expire_order(&order);
2904                continue;
2905            }
2906
2907            if matches!(
2908                match_info.order_type,
2909                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
2910            ) {
2911                let mut any = order;
2912
2913                if !self.maybe_activate_trailing_stop(
2914                    &mut any,
2915                    self.core.bid,
2916                    self.core.ask,
2917                    self.core.last,
2918                ) {
2919                    continue;
2920                }
2921
2922                self.update_trailing_stop_order(&any);
2923
2924                // Persist the activated/updated trailing stop back to the core
2925                let _ = self.core.delete_order(match_info.client_order_id);
2926                let updated_match_info = OrderMatchInfo::new(
2927                    any.client_order_id(),
2928                    any.order_side().as_specified(),
2929                    any.order_type(),
2930                    any.trigger_price(),
2931                    any.price(),
2932                    match &any {
2933                        OrderAny::TrailingStopMarket(o) => o.is_activated,
2934                        OrderAny::TrailingStopLimit(o) => o.is_activated,
2935                        _ => true,
2936                    },
2937                );
2938                self.core.add_order(updated_match_info);
2939            }
2940
2941            // Move market back to targets
2942            if let Some(target_bid) = self.target_bid {
2943                self.core.bid = Some(target_bid);
2944                self.target_bid = None;
2945            }
2946
2947            if let Some(target_bid) = self.target_bid.take() {
2948                self.core.bid = Some(target_bid);
2949                self.target_bid = None;
2950            }
2951
2952            if let Some(target_ask) = self.target_ask.take() {
2953                self.core.ask = Some(target_ask);
2954                self.target_ask = None;
2955            }
2956
2957            if let Some(target_last) = self.target_last.take() {
2958                self.core.last = Some(target_last);
2959                self.target_last = None;
2960            }
2961        }
2962
2963        // Reset any targets after iteration
2964        self.target_bid = None;
2965        self.target_ask = None;
2966        self.target_last = None;
2967    }
2968
2969    fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
2970        match order.price() {
2971            Some(order_price) => {
2972                // When liquidity consumption is enabled, get ALL crossed levels so that
2973                // consumed levels can be filtered out while still finding valid ones.
2974                // Otherwise simulate_fills only returns enough levels to satisfy leaves_qty,
2975                // which may all be consumed, missing other valid crossed levels.
2976                let mut fills = if self.config.liquidity_consumption {
2977                    let size_prec = self.instrument.size_precision();
2978                    self.book
2979                        .get_all_crossed_levels(order.order_side(), order_price, size_prec)
2980                } else {
2981                    let book_order =
2982                        BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
2983                    self.book.simulate_fills(&book_order)
2984                };
2985
2986                // Trade execution: use trade-driven fill when book doesn't reflect trade price
2987                if let Some(trade_size) = self.last_trade_size
2988                    && let Some(trade_price) = self.core.last
2989                {
2990                    let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);
2991
2992                    if !fills_at_trade_price
2993                        && self
2994                            .core
2995                            .is_limit_matched(order.order_side_specified(), order_price)
2996                    {
2997                        // Fill model check for MAKER at limit is already handled in fill_limit_order,
2998                        // don't re-check here to avoid calling is_limit_filled() twice (p² probability).
2999                        let leaves_qty = order.leaves_qty();
3000                        let available_qty = if self.config.liquidity_consumption {
3001                            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
3002                            Quantity::from_raw(remaining, trade_size.precision)
3003                        } else {
3004                            trade_size
3005                        };
3006
3007                        let fill_qty = min(leaves_qty, available_qty);
3008
3009                        if !fill_qty.is_zero() {
3010                            log::debug!(
3011                                "Trade execution fill: {} @ {} (trade_price={}, available: {}, book had {} fills)",
3012                                fill_qty,
3013                                order_price,
3014                                trade_price,
3015                                available_qty,
3016                                fills.len()
3017                            );
3018
3019                            if self.config.liquidity_consumption {
3020                                self.trade_consumption += fill_qty.raw;
3021                            }
3022
3023                            // Fill at the limit price (conservative) rather than the trade price.
3024                            // Trade execution fills already account for consumption via trade_consumption,
3025                            // return early to bypass apply_liquidity_consumption which would incorrectly
3026                            // discard these fills when the trade price isn't in the order book.
3027                            return vec![(order_price, fill_qty)];
3028                        }
3029                    }
3030                }
3031
3032                // Return immediately if no fills
3033                if fills.is_empty() {
3034                    return fills;
3035                }
3036
3037                // Save original book prices BEFORE any fill price modifications for consumption tracking,
3038                // since the TAKER and MAKER loops below may adjust fill prices. Consumption should be
3039                // tracked against the original book price levels where liquidity was sourced from.
3040                let book_prices: Vec<Price> = if self.config.liquidity_consumption {
3041                    fills.iter().map(|(px, _)| *px).collect()
3042                } else {
3043                    Vec::new()
3044                };
3045                let book_prices_ref: Option<&[Price]> = if book_prices.is_empty() {
3046                    None
3047                } else {
3048                    Some(&book_prices)
3049                };
3050
3051                // check if trigger price exists
3052                if let Some(triggered_price) = order.trigger_price() {
3053                    // Filling as TAKER from trigger
3054                    if order
3055                        .liquidity_side()
3056                        .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
3057                    {
3058                        if order.order_side() == OrderSide::Sell && order_price > triggered_price {
3059                            // manually change the fills index 0
3060                            let first_fill = fills.first().unwrap();
3061                            let triggered_qty = first_fill.1;
3062                            fills[0] = (triggered_price, triggered_qty);
3063                            self.target_bid = self.core.bid;
3064                            self.target_ask = self.core.ask;
3065                            self.target_last = self.core.last;
3066                            self.core.set_ask_raw(order_price);
3067                            self.core.set_last_raw(order_price);
3068                        } else if order.order_side() == OrderSide::Buy
3069                            && order_price < triggered_price
3070                        {
3071                            // manually change the fills index 0
3072                            let first_fill = fills.first().unwrap();
3073                            let triggered_qty = first_fill.1;
3074                            fills[0] = (triggered_price, triggered_qty);
3075                            self.target_bid = self.core.bid;
3076                            self.target_ask = self.core.ask;
3077                            self.target_last = self.core.last;
3078                            self.core.set_bid_raw(order_price);
3079                            self.core.set_last_raw(order_price);
3080                        }
3081                    }
3082                }
3083
3084                // Filling as MAKER from trigger
3085                if order
3086                    .liquidity_side()
3087                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
3088                {
3089                    match order.order_side().as_specified() {
3090                        OrderSideSpecified::Buy => {
3091                            let target_price = if order
3092                                .trigger_price()
3093                                .is_some_and(|trigger_price| order_price > trigger_price)
3094                            {
3095                                order.trigger_price().unwrap()
3096                            } else {
3097                                order_price
3098                            };
3099
3100                            for fill in &mut fills {
3101                                let last_px = fill.0;
3102                                if last_px < order_price {
3103                                    // Marketable BUY would have filled at limit
3104                                    self.target_bid = self.core.bid;
3105                                    self.target_ask = self.core.ask;
3106                                    self.target_last = self.core.last;
3107                                    self.core.set_ask_raw(target_price);
3108                                    self.core.set_last_raw(target_price);
3109                                    fill.0 = target_price;
3110                                }
3111                            }
3112                        }
3113                        OrderSideSpecified::Sell => {
3114                            let target_price = if order
3115                                .trigger_price()
3116                                .is_some_and(|trigger_price| order_price < trigger_price)
3117                            {
3118                                order.trigger_price().unwrap()
3119                            } else {
3120                                order_price
3121                            };
3122
3123                            for fill in &mut fills {
3124                                let last_px = fill.0;
3125                                if last_px > order_price {
3126                                    // Marketable SELL would have filled at limit
3127                                    self.target_bid = self.core.bid;
3128                                    self.target_ask = self.core.ask;
3129                                    self.target_last = self.core.last;
3130                                    self.core.set_bid_raw(target_price);
3131                                    self.core.set_last_raw(target_price);
3132                                    fill.0 = target_price;
3133                                }
3134                            }
3135                        }
3136                    }
3137                }
3138
3139                self.apply_liquidity_consumption(
3140                    fills,
3141                    order.order_side(),
3142                    order.leaves_qty(),
3143                    book_prices_ref,
3144                )
3145            }
3146            None => panic!("Limit order must have a price"),
3147        }
3148    }
3149
3150    fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
3151        let price = match order.order_side().as_specified() {
3152            OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
3153            OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
3154        };
3155
3156        // When liquidity consumption is enabled, get ALL crossed levels so that
3157        // consumed levels can be filtered out while still finding valid ones.
3158        let mut fills = if self.config.liquidity_consumption {
3159            let size_prec = self.instrument.size_precision();
3160            self.book
3161                .get_all_crossed_levels(order.order_side(), price, size_prec)
3162        } else {
3163            let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
3164            self.book.simulate_fills(&book_order)
3165        };
3166
3167        // For stop market and market-if-touched orders during bar H/L/C processing, fill at trigger price
3168        // (market moved through the trigger). For gaps/immediate triggers, fill at market.
3169        if !self.fill_at_market
3170            && self.book_type == BookType::L1_MBP
3171            && !fills.is_empty()
3172            && matches!(
3173                order.order_type(),
3174                OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
3175            )
3176            && let Some(trigger_price) = order.trigger_price()
3177        {
3178            fills[0] = (trigger_price, fills[0].1);
3179
3180            // Skip liquidity consumption for trigger price fills (gap price may not exist in book).
3181            let mut remaining_qty = order.leaves_qty().raw;
3182            let mut capped_fills = Vec::with_capacity(fills.len());
3183
3184            for (price, qty) in fills {
3185                if remaining_qty == 0 {
3186                    break;
3187                }
3188
3189                let capped_qty_raw = min(qty.raw, remaining_qty);
3190                if capped_qty_raw == 0 {
3191                    continue;
3192                }
3193
3194                remaining_qty -= capped_qty_raw;
3195                capped_fills.push((price, Quantity::from_raw(capped_qty_raw, qty.precision)));
3196            }
3197
3198            return capped_fills;
3199        }
3200
3201        fills
3202    }
3203
3204    fn determine_market_fill_model_price_and_volume(
3205        &mut self,
3206        order: &OrderAny,
3207    ) -> (Vec<(Price, Quantity)>, bool) {
3208        if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
3209            && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
3210                &self.instrument,
3211                order,
3212                best_bid,
3213                best_ask,
3214            )
3215        {
3216            let price = match order.order_side().as_specified() {
3217                OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
3218                OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
3219            };
3220            let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
3221            let fills = book.simulate_fills(&book_order);
3222            if !fills.is_empty() {
3223                return (fills, true);
3224            }
3225        }
3226        (self.determine_market_price_and_volume(order), false)
3227    }
3228
3229    fn determine_limit_fill_model_price_and_volume(
3230        &mut self,
3231        order: &OrderAny,
3232    ) -> Vec<(Price, Quantity)> {
3233        if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
3234            && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
3235                &self.instrument,
3236                order,
3237                best_bid,
3238                best_ask,
3239            )
3240            && let Some(limit_price) = order.price()
3241        {
3242            let book_order = BookOrder::new(order.order_side(), limit_price, order.quantity(), 0);
3243            let fills = book.simulate_fills(&book_order);
3244            if !fills.is_empty() {
3245                return fills;
3246            }
3247        }
3248        self.determine_limit_price_and_volume(order)
3249    }
3250
3251    /// Fills a market order against the current order book.
3252    ///
3253    /// The order is filled as a taker against available liquidity.
3254    /// Reduce-only orders are canceled if no position exists.
3255    pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
3256        let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
3257            Some(order) => order,
3258            None => {
3259                log::error!("Cannot fill market order: order {client_order_id} not found in cache");
3260                return;
3261            }
3262        };
3263
3264        // Convert quote-denominated quantity at fill time for trigger-style market
3265        // orders that skipped conversion at submission. Idempotent: orders already
3266        // converted have `is_quote_quantity == false`.
3267        if order.is_quote_quantity()
3268            && !self.instrument.is_inverse()
3269            && !self.convert_quote_to_base_quantity(&mut order)
3270        {
3271            return;
3272        }
3273
3274        if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
3275            && filled_qty >= &order.quantity()
3276        {
3277            log::debug!(
3278                "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
3279                filled_qty,
3280                order.quantity(),
3281                order.filled_qty(),
3282                order.quantity()
3283            );
3284            return;
3285        }
3286
3287        let venue_position_id = self.ids_generator.get_position_id(&order, Some(true));
3288        let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
3289            let cache = self.cache.as_ref().borrow();
3290            cache.position(&venue_position_id).cloned()
3291        } else {
3292            None
3293        };
3294
3295        if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
3296            log::warn!(
3297                "Canceling REDUCE_ONLY {} as would increase position",
3298                order.order_type()
3299            );
3300            self.cancel_order(&order, None);
3301            return;
3302        }
3303
3304        order.set_liquidity_side(LiquiditySide::Taker);
3305        let (mut fills, from_synthetic) = self.determine_market_fill_model_price_and_volume(&order);
3306
3307        // Apply protection price filtering at fill time (trigger-time semantics for stops)
3308        if let Some(protection_points) = self.config.price_protection_points
3309            && matches!(
3310                order.order_type(),
3311                OrderType::Market | OrderType::StopMarket
3312            )
3313            && let Ok(protection_price) = protection_price_calculate(
3314                self.instrument.price_increment(),
3315                &order,
3316                protection_points,
3317                self.core.bid,
3318                self.core.ask,
3319            )
3320        {
3321            fills = self.filter_fills_by_protection(fills, &order, protection_price);
3322        }
3323
3324        // Skip consumption for synthetic fill-model books (prices may not exist
3325        // in the real book) and trigger price fills (gap price may not exist)
3326        let is_trigger_price_fill = !self.fill_at_market
3327            && self.book_type == BookType::L1_MBP
3328            && matches!(
3329                order.order_type(),
3330                OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
3331            )
3332            && order.trigger_price().is_some();
3333
3334        if !from_synthetic && !is_trigger_price_fill {
3335            fills = self.apply_liquidity_consumption(
3336                fills,
3337                order.order_side(),
3338                order.leaves_qty(),
3339                None,
3340            );
3341        }
3342
3343        self.apply_fills(
3344            &order,
3345            &fills,
3346            LiquiditySide::Taker,
3347            None,
3348            position.as_ref(),
3349        );
3350    }
3351
3352    fn filter_fills_by_protection(
3353        &self,
3354        fills: Vec<(Price, Quantity)>,
3355        order: &OrderAny,
3356        protection_price: Price,
3357    ) -> Vec<(Price, Quantity)> {
3358        let protection_raw = protection_price.raw;
3359        fills
3360            .into_iter()
3361            .filter(|(fill_price, _)| {
3362                match order.order_side() {
3363                    // BUY: only fill at prices <= protection_price
3364                    OrderSide::Buy => fill_price.raw <= protection_raw,
3365                    // SELL: only fill at prices >= protection_price
3366                    OrderSide::Sell => fill_price.raw >= protection_raw,
3367                    OrderSide::NoOrderSide => false,
3368                }
3369            })
3370            .collect()
3371    }
3372
3373    /// Attempts to fill a limit order against the current order book.
3374    ///
3375    /// Determines fill prices and quantities based on available liquidity,
3376    /// then applies the fills to the order.
3377    ///
3378    /// # Panics
3379    ///
3380    /// Panics if the order has no price (design error).
3381    pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
3382        let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
3383            Some(order) => order,
3384            None => {
3385                log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
3386                return;
3387            }
3388        };
3389
3390        // Convert quote-denominated quantity at fill time for orders that entered
3391        // this path still carrying a quote notional (e.g. trailing-stop-limit with
3392        // a late-assigned price). Idempotent for already-converted orders.
3393        if order.is_quote_quantity()
3394            && !self.instrument.is_inverse()
3395            && !self.convert_quote_to_base_quantity(&mut order)
3396        {
3397            return;
3398        }
3399
3400        match order.price() {
3401            Some(order_price) => {
3402                let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
3403                if let Some(&qty) = cached_filled_qty
3404                    && qty >= order.quantity()
3405                {
3406                    log::debug!(
3407                        "Ignoring fill as already filled pending application of events: {}, {}, {}, {}",
3408                        qty,
3409                        order.quantity(),
3410                        order.filled_qty(),
3411                        order.leaves_qty(),
3412                    );
3413                    return;
3414                }
3415
3416                // Check fill model for MAKER orders at the limit price
3417                if order
3418                    .liquidity_side()
3419                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
3420                {
3421                    // For trade execution: check if trade price equals order price
3422                    // For quote updates: check if bid/ask equals order price
3423                    let at_limit = if self.last_trade_size.is_some() && self.core.last.is_some() {
3424                        self.core.last.is_some_and(|last| last == order_price)
3425                    } else if order.order_side() == OrderSide::Buy {
3426                        self.core.bid.is_some_and(|bid| bid == order_price)
3427                    } else {
3428                        self.core.ask.is_some_and(|ask| ask == order_price)
3429                    };
3430
3431                    if at_limit && !self.fill_model.is_limit_filled() {
3432                        return; // Not filled (simulates queue position)
3433                    }
3434                }
3435
3436                let queue_allowed_raw = if self.config.queue_position {
3437                    match self.determine_trade_fill_qty(&order) {
3438                        None | Some(0) => {
3439                            if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc)
3440                            {
3441                                self.cancel_order(&order, None);
3442                            }
3443                            return;
3444                        }
3445                        Some(allowed) => Some(allowed),
3446                    }
3447                } else {
3448                    None
3449                };
3450
3451                let venue_position_id = self.ids_generator.get_position_id(&order, None);
3452                let position = if let Some(venue_position_id) = venue_position_id {
3453                    let cache = self.cache.as_ref().borrow();
3454                    cache.position(&venue_position_id).cloned()
3455                } else {
3456                    None
3457                };
3458
3459                if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
3460                    log::warn!(
3461                        "Canceling REDUCE_ONLY {} as would increase position",
3462                        order.order_type()
3463                    );
3464                    self.cancel_order(&order, None);
3465                    return;
3466                }
3467
3468                let tc_before = self.trade_consumption;
3469                let mut fills = self.determine_limit_fill_model_price_and_volume(&order);
3470
3471                if let Some(allowed_raw) = queue_allowed_raw {
3472                    let size_prec = self.instrument.size_precision();
3473                    let mut remaining = allowed_raw;
3474                    fills = fills
3475                        .into_iter()
3476                        .filter_map(|(price, qty)| {
3477                            if remaining == 0 {
3478                                return None;
3479                            }
3480                            let capped = qty.raw.min(remaining);
3481                            remaining -= capped;
3482                            Some((price, Quantity::from_raw(capped, size_prec)))
3483                        })
3484                        .collect();
3485
3486                    // Consume excess and reconcile trade budget after capping
3487                    let consumed: QuantityRaw = fills.iter().map(|(_, qty)| qty.raw).sum();
3488
3489                    if let Some(excess) = self.queue_excess.get_mut(&order.client_order_id()) {
3490                        *excess = excess.saturating_sub(consumed);
3491                    }
3492                    self.trade_consumption = tc_before + consumed;
3493                }
3494
3495                // Skip apply_fills when consumed-liquidity adjustment produces no fills.
3496                // This occurs for partially filled orders when an unrelated delta arrives
3497                // and no new liquidity is available at the order's price level.
3498                if fills.is_empty() && self.config.liquidity_consumption {
3499                    log::debug!(
3500                        "Skipping fill for {}: no liquidity available after consumption",
3501                        order.client_order_id()
3502                    );
3503
3504                    if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
3505                        self.cancel_order(&order, None);
3506                    }
3507
3508                    return;
3509                }
3510
3511                let liquidity_side = order.liquidity_side().unwrap();
3512                self.apply_fills(
3513                    &order,
3514                    &fills,
3515                    liquidity_side,
3516                    venue_position_id,
3517                    position.as_ref(),
3518                );
3519            }
3520            None => panic!("Limit order must have a price"),
3521        }
3522    }
3523
3524    fn apply_fills(
3525        &mut self,
3526        order: &OrderAny,
3527        fills: &[(Price, Quantity)],
3528        liquidity_side: LiquiditySide,
3529        venue_position_id: Option<PositionId>,
3530        position: Option<&Position>,
3531    ) {
3532        if order.time_in_force() == TimeInForce::Fok {
3533            let mut total_size = Quantity::zero(order.quantity().precision);
3534            for (_fill_px, fill_qty) in fills {
3535                total_size = total_size.add(*fill_qty);
3536            }
3537
3538            if order.leaves_qty() > total_size {
3539                self.cancel_order(order, None);
3540                return;
3541            }
3542        }
3543
3544        if fills.is_empty() {
3545            if order.status() == OrderStatus::Submitted {
3546                self.generate_order_rejected(
3547                    order,
3548                    format!("No market for {}", order.instrument_id()).into(),
3549                );
3550            } else {
3551                log::error!(
3552                    "Cannot fill order: no fills from book when fills were expected (check size in data)"
3553                );
3554                return;
3555            }
3556        }
3557
3558        // For netting mode, don't use venue position ID (use None instead)
3559        let venue_position_id = if self.oms_type == OmsType::Netting {
3560            None
3561        } else {
3562            venue_position_id
3563        };
3564
3565        let mut initial_market_to_limit_fill = false;
3566
3567        for &(mut fill_px, ref fill_qty) in fills {
3568            assert!(
3569                (fill_px.precision == self.instrument.price_precision()),
3570                "Invalid price precision for fill price {} when instrument price precision is {}.\
3571                     Check that the data price precision matches the {} instrument",
3572                fill_px.precision,
3573                self.instrument.price_precision(),
3574                self.instrument.id()
3575            );
3576
3577            assert!(
3578                (fill_qty.precision == self.instrument.size_precision()),
3579                "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
3580                     Check that the data quantity precision matches the {} instrument",
3581                fill_qty.precision,
3582                self.instrument.size_precision(),
3583                self.instrument.id()
3584            );
3585
3586            if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
3587                && order.order_type() == OrderType::MarketToLimit
3588            {
3589                self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
3590                initial_market_to_limit_fill = true;
3591            }
3592
3593            if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
3594                fill_px = match order.order_side().as_specified() {
3595                    OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
3596                    OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
3597                }
3598            }
3599
3600            // Check reduce only order
3601            // If the incoming simulated fill would exceed the position when reduce-only is honored,
3602            // clamp the effective fill size to the adjusted (remaining position) quantity.
3603            let mut effective_fill_qty = *fill_qty;
3604
3605            if self.config.use_reduce_only
3606                && order.is_reduce_only()
3607                && let Some(position) = &position
3608                && *fill_qty > position.quantity
3609            {
3610                if position.quantity == Quantity::zero(position.quantity.precision) {
3611                    // Done
3612                    return;
3613                }
3614
3615                // Adjusted target quantity equals the remaining position size
3616                let adjusted_fill_qty =
3617                    Quantity::from_raw(position.quantity.raw, fill_qty.precision);
3618
3619                // Determine the effective fill size for this iteration first
3620                effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
3621
3622                // Only emit an update if the order quantity actually changes
3623                if order.quantity() != adjusted_fill_qty {
3624                    self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
3625                }
3626            }
3627
3628            if fill_qty.is_zero() {
3629                if fills.len() == 1 && order.status() == OrderStatus::Submitted {
3630                    self.generate_order_rejected(
3631                        order,
3632                        format!("No market for {}", order.instrument_id()).into(),
3633                    );
3634                }
3635                return;
3636            }
3637
3638            self.fill_order(
3639                order,
3640                fill_px,
3641                effective_fill_qty,
3642                liquidity_side,
3643                venue_position_id,
3644                position,
3645            );
3646
3647            if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
3648                // Filled initial level
3649                return;
3650            }
3651        }
3652
3653        if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
3654            // IOC order has filled all available size
3655            self.cancel_order(order, None);
3656            return;
3657        }
3658
3659        if order.is_open()
3660            && self.book_type == BookType::L1_MBP
3661            && matches!(
3662                order.order_type(),
3663                OrderType::Market
3664                    | OrderType::MarketIfTouched
3665                    | OrderType::StopMarket
3666                    | OrderType::TrailingStopMarket
3667            )
3668        {
3669            // Exhausted simulated book volume (continue aggressive filling into next level)
3670            // This is a very basic implementation of slipping by a single tick, in the future
3671            // we will implement more detailed fill modeling.
3672            todo!("Exhausted simulated book volume")
3673        }
3674    }
3675
3676    fn fill_order(
3677        &mut self,
3678        order: &OrderAny,
3679        last_px: Price,
3680        last_qty: Quantity,
3681        liquidity_side: LiquiditySide,
3682        venue_position_id: Option<PositionId>,
3683        _position: Option<&Position>,
3684    ) {
3685        self.check_size_precision(last_qty.precision, "fill quantity")
3686            .unwrap();
3687
3688        match self.cached_filled_qty.get(&order.client_order_id()) {
3689            Some(filled_qty) => {
3690                // Use saturating_sub to prevent panic if filled_qty > quantity
3691                let leaves_qty = order.quantity().saturating_sub(*filled_qty);
3692                let last_qty = min(last_qty, leaves_qty);
3693                let new_filled_qty = *filled_qty + last_qty;
3694                self.cached_filled_qty
3695                    .insert(order.client_order_id(), new_filled_qty);
3696            }
3697            None => {
3698                self.cached_filled_qty
3699                    .insert(order.client_order_id(), last_qty);
3700            }
3701        }
3702
3703        let commission = self
3704            .fee_model
3705            .get_commission(order, last_qty, last_px, &self.instrument)
3706            .unwrap();
3707
3708        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3709        self.generate_order_filled(
3710            order,
3711            venue_order_id,
3712            venue_position_id,
3713            last_qty,
3714            last_px,
3715            self.instrument.quote_currency(),
3716            commission,
3717            liquidity_side,
3718        );
3719
3720        let fully_filled = self
3721            .cached_filled_qty
3722            .get(&order.client_order_id())
3723            .is_some_and(|qty| qty >= &order.quantity());
3724
3725        if order.is_passive() && (order.is_closed() || fully_filled) {
3726            if self.core.order_exists(order.client_order_id()) {
3727                let _ = self.core.delete_order(order.client_order_id());
3728            }
3729
3730            // Only clear cached fills when the order status reflects closure,
3731            // callers like process_market_to_limit_order still need the entry
3732            if order.is_closed() {
3733                self.cached_filled_qty.swap_remove(&order.client_order_id());
3734            }
3735        }
3736
3737        if !self.config.support_contingent_orders {
3738            return;
3739        }
3740
3741        if let Some(contingency_type) = order.contingency_type() {
3742            match contingency_type {
3743                ContingencyType::Oto => {
3744                    if let Some(linked_orders_ids) = order.linked_order_ids() {
3745                        for client_order_id in linked_orders_ids {
3746                            let mut child_order = match self.cache.borrow().order(client_order_id) {
3747                                Some(child_order) => child_order.clone(),
3748                                None => panic!("Order {client_order_id} not found in cache"),
3749                            };
3750
3751                            if child_order.is_closed() || child_order.is_active_local() {
3752                                continue;
3753                            }
3754
3755                            // Check if we need to index position id
3756                            if let (None, Some(position_id)) =
3757                                (child_order.position_id(), order.position_id())
3758                            {
3759                                self.cache
3760                                    .borrow_mut()
3761                                    .add_position_id(
3762                                        &position_id,
3763                                        &self.venue,
3764                                        client_order_id,
3765                                        &child_order.strategy_id(),
3766                                    )
3767                                    .unwrap();
3768                                log::debug!(
3769                                    "Added position id {position_id} to cache for order {client_order_id}"
3770                                );
3771                            }
3772
3773                            if (!child_order.is_open())
3774                                || (matches!(child_order.status(), OrderStatus::PendingUpdate)
3775                                    && child_order
3776                                        .previous_status()
3777                                        .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
3778                            {
3779                                let account_id = order.account_id().unwrap_or_else(|| {
3780                                    *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
3781                                        panic!(
3782                                            "Account ID not found for trader {}",
3783                                            order.trader_id()
3784                                        )
3785                                    })
3786                                });
3787                                self.process_order(&mut child_order, account_id);
3788                            }
3789                        }
3790                    } else {
3791                        log::error!(
3792                            "OTO order {} does not have linked orders",
3793                            order.client_order_id()
3794                        );
3795                    }
3796                }
3797                ContingencyType::Oco => {
3798                    if let Some(linked_orders_ids) = order.linked_order_ids() {
3799                        for client_order_id in linked_orders_ids {
3800                            let child_order = match self.cache.borrow().order(client_order_id) {
3801                                Some(child_order) => child_order.clone(),
3802                                None => panic!("Order {client_order_id} not found in cache"),
3803                            };
3804
3805                            if child_order.is_closed() || child_order.is_active_local() {
3806                                continue;
3807                            }
3808
3809                            self.cancel_order(&child_order, None);
3810                        }
3811                    } else {
3812                        log::error!(
3813                            "OCO order {} does not have linked orders",
3814                            order.client_order_id()
3815                        );
3816                    }
3817                }
3818                ContingencyType::Ouo => {
3819                    if let Some(linked_orders_ids) = order.linked_order_ids() {
3820                        for client_order_id in linked_orders_ids {
3821                            let mut child_order = match self.cache.borrow().order(client_order_id) {
3822                                Some(child_order) => child_order.clone(),
3823                                None => panic!("Order {client_order_id} not found in cache"),
3824                            };
3825
3826                            if child_order.is_active_local() {
3827                                continue;
3828                            }
3829
3830                            if order.is_closed() && child_order.is_open() {
3831                                self.cancel_order(&child_order, None);
3832                            } else if !order.leaves_qty().is_zero()
3833                                && order.leaves_qty() != child_order.leaves_qty()
3834                            {
3835                                let price = child_order.price();
3836                                let trigger_price = child_order.trigger_price();
3837                                self.update_order(
3838                                    &mut child_order,
3839                                    Some(order.leaves_qty()),
3840                                    price,
3841                                    trigger_price,
3842                                    Some(false),
3843                                );
3844                            }
3845                        }
3846                    } else {
3847                        log::error!(
3848                            "OUO order {} does not have linked orders",
3849                            order.client_order_id()
3850                        );
3851                    }
3852                }
3853                _ => {}
3854            }
3855        }
3856    }
3857
3858    fn update_limit_order(&mut self, order: &OrderAny, quantity: Quantity, price: Price) {
3859        if self
3860            .core
3861            .is_limit_matched(order.order_side_specified(), price)
3862        {
3863            if order.is_post_only() {
3864                self.generate_order_modify_rejected(
3865                    order.trader_id(),
3866                    order.strategy_id(),
3867                    order.instrument_id(),
3868                    order.client_order_id(),
3869                    Ustr::from(format!(
3870                        "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3871                        order.order_type(),
3872                        order.order_side(),
3873                        price,
3874                        self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3875                        self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3876                    ).as_str()),
3877                    order.venue_order_id(),
3878                    order.account_id(),
3879                );
3880                return;
3881            }
3882
3883            self.generate_order_updated(order, quantity, Some(price), None, None);
3884
3885            // Re-read from cache to get the order with events applied
3886            let client_order_id = order.client_order_id();
3887            if let Some(cached) = self.cache.borrow_mut().mut_order(&client_order_id) {
3888                cached.set_liquidity_side(LiquiditySide::Taker);
3889            }
3890            self.fill_limit_order(client_order_id);
3891            return;
3892        }
3893        self.generate_order_updated(order, quantity, Some(price), None, None);
3894    }
3895
3896    fn update_stop_market_order(&self, order: &OrderAny, quantity: Quantity, trigger_price: Price) {
3897        if self
3898            .core
3899            .is_stop_matched(order.order_side_specified(), trigger_price)
3900        {
3901            self.generate_order_modify_rejected(
3902                order.trader_id(),
3903                order.strategy_id(),
3904                order.instrument_id(),
3905                order.client_order_id(),
3906                Ustr::from(
3907                    format!(
3908                        "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3909                        order.order_type(),
3910                        order.order_side(),
3911                        trigger_price,
3912                        self.core
3913                            .bid
3914                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3915                        self.core
3916                            .ask
3917                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3918                    )
3919                    .as_str(),
3920                ),
3921                order.venue_order_id(),
3922                order.account_id(),
3923            );
3924            return;
3925        }
3926
3927        self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
3928    }
3929
3930    fn update_stop_limit_order(
3931        &mut self,
3932        order: &mut OrderAny,
3933        quantity: Quantity,
3934        price: Price,
3935        trigger_price: Price,
3936    ) {
3937        if order.is_triggered().is_some_and(|t| t) {
3938            // Update limit price
3939            if self
3940                .core
3941                .is_limit_matched(order.order_side_specified(), price)
3942            {
3943                if order.is_post_only() {
3944                    self.generate_order_modify_rejected(
3945                        order.trader_id(),
3946                        order.strategy_id(),
3947                        order.instrument_id(),
3948                        order.client_order_id(),
3949                        Ustr::from(format!(
3950                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3951                            order.order_type(),
3952                            order.order_side(),
3953                            price,
3954                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3955                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3956                        ).as_str()),
3957                        order.venue_order_id(),
3958                        order.account_id(),
3959                    );
3960                    return;
3961                }
3962                self.generate_order_updated(order, quantity, Some(price), None, None);
3963                order.set_liquidity_side(LiquiditySide::Taker);
3964
3965                if let Err(e) = self
3966                    .cache
3967                    .borrow_mut()
3968                    .add_order(order.clone(), None, None, false)
3969                {
3970                    log::debug!("Order already in cache: {e}");
3971                }
3972                self.fill_limit_order(order.client_order_id());
3973                return; // Filled
3974            }
3975        } else {
3976            // Update stop price
3977            if self
3978                .core
3979                .is_stop_matched(order.order_side_specified(), trigger_price)
3980            {
3981                self.generate_order_modify_rejected(
3982                    order.trader_id(),
3983                    order.strategy_id(),
3984                    order.instrument_id(),
3985                    order.client_order_id(),
3986                    Ustr::from(
3987                        format!(
3988                            "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3989                            order.order_type(),
3990                            order.order_side(),
3991                            trigger_price,
3992                            self.core
3993                                .bid
3994                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
3995                            self.core
3996                                .ask
3997                                .map_or_else(|| "None".to_string(), |p| p.to_string())
3998                        )
3999                        .as_str(),
4000                    ),
4001                    order.venue_order_id(),
4002                    order.account_id(),
4003                );
4004                return;
4005            }
4006        }
4007
4008        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
4009    }
4010
4011    fn update_market_if_touched_order(
4012        &self,
4013        order: &OrderAny,
4014        quantity: Quantity,
4015        trigger_price: Price,
4016    ) {
4017        if self
4018            .core
4019            .is_touch_triggered(order.order_side_specified(), trigger_price)
4020        {
4021            self.generate_order_modify_rejected(
4022                order.trader_id(),
4023                order.strategy_id(),
4024                order.instrument_id(),
4025                order.client_order_id(),
4026                Ustr::from(
4027                    format!(
4028                        "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
4029                        order.order_type(),
4030                        order.order_side(),
4031                        trigger_price,
4032                        self.core
4033                            .bid
4034                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
4035                        self.core
4036                            .ask
4037                            .map_or_else(|| "None".to_string(), |p| p.to_string())
4038                    )
4039                    .as_str(),
4040                ),
4041                order.venue_order_id(),
4042                order.account_id(),
4043            );
4044            // Cannot update order
4045            return;
4046        }
4047
4048        self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
4049    }
4050
4051    fn update_limit_if_touched_order(
4052        &mut self,
4053        order: &mut OrderAny,
4054        quantity: Quantity,
4055        price: Price,
4056        trigger_price: Price,
4057    ) {
4058        if order.is_triggered().is_some_and(|t| t) {
4059            // Update limit price
4060            if self
4061                .core
4062                .is_limit_matched(order.order_side_specified(), price)
4063            {
4064                if order.is_post_only() {
4065                    self.generate_order_modify_rejected(
4066                        order.trader_id(),
4067                        order.strategy_id(),
4068                        order.instrument_id(),
4069                        order.client_order_id(),
4070                        Ustr::from(format!(
4071                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
4072                            order.order_type(),
4073                            order.order_side(),
4074                            price,
4075                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
4076                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
4077                        ).as_str()),
4078                        order.venue_order_id(),
4079                        order.account_id(),
4080                    );
4081                    // Cannot update order
4082                    return;
4083                }
4084                self.generate_order_updated(order, quantity, Some(price), None, None);
4085                order.set_liquidity_side(LiquiditySide::Taker);
4086                self.fill_limit_order(order.client_order_id());
4087                return;
4088            }
4089        } else {
4090            // Update trigger price
4091            if self
4092                .core
4093                .is_touch_triggered(order.order_side_specified(), trigger_price)
4094            {
4095                self.generate_order_modify_rejected(
4096                    order.trader_id(),
4097                    order.strategy_id(),
4098                    order.instrument_id(),
4099                    order.client_order_id(),
4100                    Ustr::from(
4101                        format!(
4102                            "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
4103                            order.order_type(),
4104                            order.order_side(),
4105                            trigger_price,
4106                            self.core
4107                                .bid
4108                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
4109                            self.core
4110                                .ask
4111                                .map_or_else(|| "None".to_string(), |p| p.to_string())
4112                        )
4113                        .as_str(),
4114                    ),
4115                    order.venue_order_id(),
4116                    order.account_id(),
4117                );
4118                return;
4119            }
4120        }
4121
4122        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
4123    }
4124
4125    fn update_trailing_stop_order(&self, order: &OrderAny) {
4126        let (new_trigger_price, new_price) = trailing_stop_calculate(
4127            self.instrument.price_increment(),
4128            order.trigger_price(),
4129            order.activation_price(),
4130            order,
4131            self.core.bid,
4132            self.core.ask,
4133            self.core.last,
4134        )
4135        .unwrap();
4136
4137        if new_trigger_price.is_none() && new_price.is_none() {
4138            return;
4139        }
4140
4141        self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
4142    }
4143
4144    fn accept_order(&mut self, order: &mut OrderAny) {
4145        if order.is_closed() {
4146            // Temporary guard to prevent invalid processing
4147            return;
4148        }
4149
4150        if order.status() != OrderStatus::Accepted {
4151            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
4152            self.generate_order_accepted(order, venue_order_id);
4153
4154            if matches!(
4155                order.order_type(),
4156                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
4157            ) && order.trigger_price().is_none()
4158            {
4159                self.update_trailing_stop_order(order);
4160            }
4161        }
4162
4163        let match_info = OrderMatchInfo::new(
4164            order.client_order_id(),
4165            order.order_side().as_specified(),
4166            order.order_type(),
4167            order.trigger_price(),
4168            order.price(),
4169            match order {
4170                OrderAny::TrailingStopMarket(o) => o.is_activated,
4171                OrderAny::TrailingStopLimit(o) => o.is_activated,
4172                _ => true,
4173            },
4174        );
4175        self.core.add_order(match_info);
4176    }
4177
4178    fn expire_order(&mut self, order: &OrderAny) {
4179        if self.config.support_contingent_orders
4180            && order
4181                .contingency_type()
4182                .is_some_and(|c| c != ContingencyType::NoContingency)
4183        {
4184            self.cancel_contingent_orders(order);
4185        }
4186
4187        self.generate_order_expired(order);
4188    }
4189
4190    fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
4191        let cancel_contingencies = cancel_contingencies.unwrap_or(true);
4192
4193        if order.is_active_local() {
4194            log::error!(
4195                "Cannot cancel an order with {} from the matching engine",
4196                order.status()
4197            );
4198            return;
4199        }
4200
4201        // Check if order exists in OrderMatching core, and delete it if it does
4202        if self.core.order_exists(order.client_order_id()) {
4203            let _ = self.core.delete_order(order.client_order_id());
4204        }
4205        self.cached_filled_qty.swap_remove(&order.client_order_id());
4206
4207        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
4208        self.generate_order_canceled(order, venue_order_id);
4209
4210        if self.config.support_contingent_orders
4211            && order.contingency_type().is_some()
4212            && order.contingency_type().unwrap() != ContingencyType::NoContingency
4213            && cancel_contingencies
4214        {
4215            self.cancel_contingent_orders(order);
4216        }
4217    }
4218
4219    fn update_order(
4220        &mut self,
4221        order: &mut OrderAny,
4222        quantity: Option<Quantity>,
4223        price: Option<Price>,
4224        trigger_price: Option<Price>,
4225        update_contingencies: Option<bool>,
4226    ) -> bool {
4227        let update_contingencies = update_contingencies.unwrap_or(true);
4228        let quantity = quantity.unwrap_or(order.quantity());
4229
4230        let price_prec = self.instrument.price_precision();
4231        let size_prec = self.instrument.size_precision();
4232        let instrument_id = self.instrument.id();
4233
4234        if quantity.precision != size_prec {
4235            self.generate_order_modify_rejected(
4236                order.trader_id(),
4237                order.strategy_id(),
4238                order.instrument_id(),
4239                order.client_order_id(),
4240                Ustr::from(&format!(
4241                    "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
4242                    quantity.precision
4243                )),
4244                order.venue_order_id(),
4245                order.account_id(),
4246            );
4247            return false;
4248        }
4249
4250        if let Some(px) = price
4251            && px.precision != price_prec
4252        {
4253            self.generate_order_modify_rejected(
4254                order.trader_id(),
4255                order.strategy_id(),
4256                order.instrument_id(),
4257                order.client_order_id(),
4258                Ustr::from(&format!(
4259                    "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
4260                    px.precision
4261                )),
4262                order.venue_order_id(),
4263                order.account_id(),
4264            );
4265            return false;
4266        }
4267
4268        if let Some(tp) = trigger_price
4269            && tp.precision != price_prec
4270        {
4271            self.generate_order_modify_rejected(
4272                order.trader_id(),
4273                order.strategy_id(),
4274                order.instrument_id(),
4275                order.client_order_id(),
4276                Ustr::from(&format!(
4277                    "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
4278                    tp.precision
4279                )),
4280                order.venue_order_id(),
4281                order.account_id(),
4282            );
4283            return false;
4284        }
4285
4286        // Use cached_filled_qty since PassiveOrderAny in core is not updated with fills
4287        let filled_qty = self
4288            .cached_filled_qty
4289            .get(&order.client_order_id())
4290            .copied()
4291            .unwrap_or(order.filled_qty());
4292        if quantity < filled_qty {
4293            self.generate_order_modify_rejected(
4294                order.trader_id(),
4295                order.strategy_id(),
4296                order.instrument_id(),
4297                order.client_order_id(),
4298                Ustr::from(&format!(
4299                    "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
4300                )),
4301                order.venue_order_id(),
4302                order.account_id(),
4303            );
4304            return false;
4305        }
4306
4307        match order {
4308            OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
4309                let price = price.unwrap_or(order.price().unwrap());
4310                self.update_limit_order(order, quantity, price);
4311            }
4312            OrderAny::StopMarket(_) => {
4313                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4314                self.update_stop_market_order(order, quantity, trigger_price);
4315            }
4316            OrderAny::StopLimit(_) => {
4317                let price = price.unwrap_or(order.price().unwrap());
4318                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4319                self.update_stop_limit_order(order, quantity, price, trigger_price);
4320            }
4321            OrderAny::MarketIfTouched(_) => {
4322                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4323                self.update_market_if_touched_order(order, quantity, trigger_price);
4324            }
4325            OrderAny::LimitIfTouched(_) => {
4326                let price = price.unwrap_or(order.price().unwrap());
4327                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4328                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
4329            }
4330            OrderAny::TrailingStopMarket(_) => {
4331                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4332                self.update_market_if_touched_order(order, quantity, trigger_price);
4333            }
4334            OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
4335                let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
4336                let trigger_price =
4337                    trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
4338                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
4339            }
4340            _ => {
4341                panic!(
4342                    "Unsupported order type {} for update_order",
4343                    order.order_type()
4344                );
4345            }
4346        }
4347
4348        // If order now has zero leaves after update, cancel it
4349        let new_leaves_qty = quantity.saturating_sub(filled_qty);
4350        if new_leaves_qty.is_zero() {
4351            if self.config.support_contingent_orders
4352                && order
4353                    .contingency_type()
4354                    .is_some_and(|c| c != ContingencyType::NoContingency)
4355                && update_contingencies
4356            {
4357                self.update_contingent_order(order);
4358            }
4359            // Pass false since we already handled contingents above
4360            self.cancel_order(order, Some(false));
4361            return true;
4362        }
4363
4364        if self.config.support_contingent_orders
4365            && order
4366                .contingency_type()
4367                .is_some_and(|c| c != ContingencyType::NoContingency)
4368            && update_contingencies
4369        {
4370            self.update_contingent_order(order);
4371        }
4372
4373        true
4374    }
4375
4376    /// Triggers a stop order, converting it to an active market or limit order.
4377    pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
4378        let order = match self.cache.borrow().order(&client_order_id).cloned() {
4379            Some(order) => order,
4380            None => {
4381                log::error!(
4382                    "Cannot trigger stop order: order {client_order_id} not found in cache"
4383                );
4384                return;
4385            }
4386        };
4387
4388        match order.order_type() {
4389            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
4390                self.fill_limit_order(client_order_id);
4391            }
4392            OrderType::StopMarket | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
4393                self.fill_market_order(client_order_id);
4394            }
4395            _ => {
4396                log::error!(
4397                    "Cannot trigger stop order: invalid order type {}",
4398                    order.order_type()
4399                );
4400            }
4401        }
4402    }
4403
4404    fn update_contingent_order(&mut self, order: &OrderAny) {
4405        log::debug!("Updating OUO orders from {}", order.client_order_id());
4406        if let Some(linked_order_ids) = order.linked_order_ids() {
4407            let parent_filled_qty = self
4408                .cached_filled_qty
4409                .get(&order.client_order_id())
4410                .copied()
4411                .unwrap_or(order.filled_qty());
4412            let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
4413
4414            for client_order_id in linked_order_ids {
4415                let mut child_order = match self.cache.borrow().order(client_order_id) {
4416                    Some(order) => order.clone(),
4417                    None => panic!("Order {client_order_id} not found in cache."),
4418                };
4419
4420                if child_order.is_active_local() {
4421                    continue;
4422                }
4423
4424                let child_filled_qty = self
4425                    .cached_filled_qty
4426                    .get(&child_order.client_order_id())
4427                    .copied()
4428                    .unwrap_or(child_order.filled_qty());
4429
4430                if parent_leaves_qty.is_zero() {
4431                    self.cancel_order(&child_order, Some(false));
4432                } else if child_filled_qty >= parent_leaves_qty {
4433                    // Child already filled beyond parent's remaining qty, cancel it
4434                    self.cancel_order(&child_order, Some(false));
4435                } else {
4436                    let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
4437                    if child_leaves_qty != parent_leaves_qty {
4438                        let price = child_order.price();
4439                        let trigger_price = child_order.trigger_price();
4440                        self.update_order(
4441                            &mut child_order,
4442                            Some(parent_leaves_qty),
4443                            price,
4444                            trigger_price,
4445                            Some(false),
4446                        );
4447                    }
4448                }
4449            }
4450        }
4451    }
4452
4453    fn cancel_contingent_orders(&mut self, order: &OrderAny) {
4454        if let Some(linked_order_ids) = order.linked_order_ids() {
4455            for client_order_id in linked_order_ids {
4456                let contingent_order = match self.cache.borrow().order(client_order_id) {
4457                    Some(order) => order.clone(),
4458                    None => panic!("Cannot find contingent order for {client_order_id}"),
4459                };
4460
4461                if contingent_order.is_active_local() {
4462                    // order is not on the exchange yet
4463                    continue;
4464                }
4465
4466                if !contingent_order.is_closed() {
4467                    self.cancel_order(&contingent_order, Some(false));
4468                }
4469            }
4470        }
4471    }
4472
4473    fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
4474        let ts_now = self.clock.borrow().timestamp_ns();
4475        let account_id = order
4476            .account_id()
4477            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
4478
4479        // Check if rejection is due to post-only
4480        let due_post_only = reason.as_str().starts_with("POST_ONLY");
4481
4482        let event = OrderEventAny::Rejected(OrderRejected::new(
4483            order.trader_id(),
4484            order.strategy_id(),
4485            order.instrument_id(),
4486            order.client_order_id(),
4487            account_id,
4488            reason,
4489            UUID4::new(),
4490            ts_now,
4491            ts_now,
4492            false,
4493            due_post_only,
4494        ));
4495        self.dispatch_order_event(event);
4496    }
4497
4498    fn generate_order_accepted(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
4499        let ts_now = self.clock.borrow().timestamp_ns();
4500        let account_id = order
4501            .account_id()
4502            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
4503        let event = OrderEventAny::Accepted(OrderAccepted::new(
4504            order.trader_id(),
4505            order.strategy_id(),
4506            order.instrument_id(),
4507            order.client_order_id(),
4508            venue_order_id,
4509            account_id,
4510            UUID4::new(),
4511            ts_now,
4512            ts_now,
4513            false,
4514        ));
4515
4516        self.dispatch_order_event(event);
4517    }
4518
4519    #[expect(clippy::too_many_arguments)]
4520    fn generate_order_modify_rejected(
4521        &self,
4522        trader_id: TraderId,
4523        strategy_id: StrategyId,
4524        instrument_id: InstrumentId,
4525        client_order_id: ClientOrderId,
4526        reason: Ustr,
4527        venue_order_id: Option<VenueOrderId>,
4528        account_id: Option<AccountId>,
4529    ) {
4530        let ts_now = self.clock.borrow().timestamp_ns();
4531        let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
4532            trader_id,
4533            strategy_id,
4534            instrument_id,
4535            client_order_id,
4536            reason,
4537            UUID4::new(),
4538            ts_now,
4539            ts_now,
4540            false,
4541            venue_order_id,
4542            account_id,
4543        ));
4544        self.dispatch_order_event(event);
4545    }
4546
4547    #[expect(clippy::too_many_arguments)]
4548    fn generate_order_cancel_rejected(
4549        &self,
4550        trader_id: TraderId,
4551        strategy_id: StrategyId,
4552        account_id: AccountId,
4553        instrument_id: InstrumentId,
4554        client_order_id: ClientOrderId,
4555        venue_order_id: Option<VenueOrderId>,
4556        reason: Ustr,
4557    ) {
4558        let ts_now = self.clock.borrow().timestamp_ns();
4559        let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
4560            trader_id,
4561            strategy_id,
4562            instrument_id,
4563            client_order_id,
4564            reason,
4565            UUID4::new(),
4566            ts_now,
4567            ts_now,
4568            false,
4569            venue_order_id,
4570            Some(account_id),
4571        ));
4572        self.dispatch_order_event(event);
4573    }
4574
4575    fn generate_order_updated(
4576        &self,
4577        order: &OrderAny,
4578        quantity: Quantity,
4579        price: Option<Price>,
4580        trigger_price: Option<Price>,
4581        protection_price: Option<Price>,
4582    ) {
4583        let ts_now = self.clock.borrow().timestamp_ns();
4584        let event = OrderEventAny::Updated(OrderUpdated::new(
4585            order.trader_id(),
4586            order.strategy_id(),
4587            order.instrument_id(),
4588            order.client_order_id(),
4589            quantity,
4590            UUID4::new(),
4591            ts_now,
4592            ts_now,
4593            false,
4594            order.venue_order_id(),
4595            order.account_id(),
4596            price,
4597            trigger_price,
4598            protection_price,
4599            order.is_quote_quantity(),
4600        ));
4601
4602        self.dispatch_order_event(event);
4603    }
4604
4605    fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
4606        let ts_now = self.clock.borrow().timestamp_ns();
4607        let event = OrderEventAny::Canceled(OrderCanceled::new(
4608            order.trader_id(),
4609            order.strategy_id(),
4610            order.instrument_id(),
4611            order.client_order_id(),
4612            UUID4::new(),
4613            ts_now,
4614            ts_now,
4615            false,
4616            Some(venue_order_id),
4617            order.account_id(),
4618        ));
4619        self.dispatch_order_event(event);
4620    }
4621
4622    fn generate_order_triggered(&self, order: &OrderAny) {
4623        let ts_now = self.clock.borrow().timestamp_ns();
4624        let event = OrderEventAny::Triggered(OrderTriggered::new(
4625            order.trader_id(),
4626            order.strategy_id(),
4627            order.instrument_id(),
4628            order.client_order_id(),
4629            UUID4::new(),
4630            ts_now,
4631            ts_now,
4632            false,
4633            order.venue_order_id(),
4634            order.account_id(),
4635        ));
4636        self.dispatch_order_event(event);
4637    }
4638
4639    fn generate_order_expired(&self, order: &OrderAny) {
4640        let ts_now = self.clock.borrow().timestamp_ns();
4641        let event = OrderEventAny::Expired(OrderExpired::new(
4642            order.trader_id(),
4643            order.strategy_id(),
4644            order.instrument_id(),
4645            order.client_order_id(),
4646            UUID4::new(),
4647            ts_now,
4648            ts_now,
4649            false,
4650            order.venue_order_id(),
4651            order.account_id(),
4652        ));
4653        self.dispatch_order_event(event);
4654    }
4655
4656    #[expect(clippy::too_many_arguments)]
4657    fn generate_order_filled(
4658        &mut self,
4659        order: &OrderAny,
4660        venue_order_id: VenueOrderId,
4661        venue_position_id: Option<PositionId>,
4662        last_qty: Quantity,
4663        last_px: Price,
4664        quote_currency: Currency,
4665        commission: Money,
4666        liquidity_side: LiquiditySide,
4667    ) {
4668        debug_assert!(
4669            last_qty <= order.quantity(),
4670            "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
4671            order_qty = order.quantity(),
4672            client_order_id = order.client_order_id()
4673        );
4674
4675        let ts_now = self.clock.borrow().timestamp_ns();
4676        let account_id = order
4677            .account_id()
4678            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
4679        let event = OrderEventAny::Filled(OrderFilled::new(
4680            order.trader_id(),
4681            order.strategy_id(),
4682            order.instrument_id(),
4683            order.client_order_id(),
4684            venue_order_id,
4685            account_id,
4686            self.ids_generator.generate_trade_id(ts_now),
4687            order.order_side(),
4688            order.order_type(),
4689            last_qty,
4690            last_px,
4691            quote_currency,
4692            liquidity_side,
4693            UUID4::new(),
4694            ts_now,
4695            ts_now,
4696            false,
4697            venue_position_id,
4698            Some(commission),
4699        ));
4700
4701        self.dispatch_order_event(event);
4702    }
4703}