1use std::{
17 cell::RefCell,
18 cmp::min,
19 fmt::Debug,
20 ops::{Add, Sub},
21 rc::Rc,
22};
23
24use chrono::TimeDelta;
25use indexmap::IndexMap;
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
30 msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{UUID4, UnixNanos};
33use nautilus_model::{
34 data::{
35 Bar, BarType, InstrumentClose, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
36 QuoteTick, TradeTick, order::BookOrder,
37 },
38 enums::{
39 AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
40 InstrumentCloseType, LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide,
41 OrderSideSpecified, OrderStatus, OrderType, PositionSide, PriceType, TimeInForce,
42 TriggerType,
43 },
44 events::{
45 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
46 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
47 },
48 identifiers::{
49 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
50 VenueOrderId,
51 },
52 instruments::{Instrument, InstrumentAny},
53 orderbook::OrderBook,
54 orders::{MarketOrder, Order, OrderAny, OrderCore},
55 position::Position,
56 types::{
57 Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
58 quantity::QuantityRaw,
59 },
60};
61use ustr::Ustr;
62
63use crate::{
64 matching_core::{MatchAction, OrderMatchInfo, OrderMatchingCore},
65 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
66 models::{
67 fee::{FeeModel, FeeModelAny},
68 fill::{FillModel, FillModelAny},
69 },
70 protection::protection_price_calculate,
71 trailing::trailing_stop_calculate,
72};
73
74pub struct OrderMatchingEngine {
76 pub venue: Venue,
78 pub instrument: InstrumentAny,
80 pub raw_id: u32,
82 pub book_type: BookType,
84 pub oms_type: OmsType,
86 pub account_type: AccountType,
88 pub market_status: MarketStatus,
90 pub config: OrderMatchingEngineConfig,
92 core: OrderMatchingCore,
93 clock: Rc<RefCell<dyn Clock>>,
94 cache: Rc<RefCell<Cache>>,
95 book: OrderBook,
96 fill_model: FillModelAny,
97 fee_model: FeeModelAny,
98 event_handler: Option<Rc<dyn Fn(OrderEventAny)>>,
99 target_bid: Option<Price>,
100 target_ask: Option<Price>,
101 target_last: Option<Price>,
102 last_bar_bid: Option<Bar>,
103 last_bar_ask: Option<Bar>,
104 fill_at_market: bool,
105 execution_bar_types: IndexMap<InstrumentId, BarType>,
106 execution_bar_deltas: IndexMap<BarType, TimeDelta>,
107 account_ids: IndexMap<TraderId, AccountId>,
108 cached_filled_qty: IndexMap<ClientOrderId, Quantity>,
109 ids_generator: IdsGenerator,
110 last_trade_size: Option<Quantity>,
111 bid_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
112 ask_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
113 trade_consumption: QuantityRaw,
114 queue_ahead: IndexMap<ClientOrderId, (PriceRaw, QuantityRaw)>,
115 queue_excess: IndexMap<ClientOrderId, QuantityRaw>,
116 queue_pending: IndexMap<ClientOrderId, PriceRaw>,
117 prev_bid_price_raw: PriceRaw,
118 prev_bid_size_raw: QuantityRaw,
119 prev_ask_price_raw: PriceRaw,
120 prev_ask_size_raw: QuantityRaw,
121 last_quote_bid: Option<Price>,
122 last_quote_ask: Option<Price>,
123 tob_initialized: bool,
124 instrument_close: Option<InstrumentClose>,
125 settlement_price: Option<Price>,
126 expiration_processed: bool,
127}
128
129impl Debug for OrderMatchingEngine {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 f.debug_struct(stringify!(OrderMatchingEngine))
132 .field("venue", &self.venue)
133 .field("instrument", &self.instrument.id())
134 .finish()
135 }
136}
137
138impl OrderMatchingEngine {
139 #[expect(clippy::too_many_arguments)]
141 pub fn new(
142 instrument: InstrumentAny,
143 raw_id: u32,
144 fill_model: FillModelAny,
145 fee_model: FeeModelAny,
146 book_type: BookType,
147 oms_type: OmsType,
148 account_type: AccountType,
149 clock: Rc<RefCell<dyn Clock>>,
150 cache: Rc<RefCell<Cache>>,
151 config: OrderMatchingEngineConfig,
152 ) -> Self {
153 let book = OrderBook::new(instrument.id(), book_type);
154 let mut core = OrderMatchingCore::new(instrument.id(), instrument.price_increment());
155 core.set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
156 let ids_generator = IdsGenerator::new(
157 instrument.id().venue,
158 oms_type,
159 raw_id,
160 config.use_random_ids,
161 config.use_position_ids,
162 cache.clone(),
163 );
164
165 Self {
166 venue: instrument.id().venue,
167 instrument,
168 raw_id,
169 fill_model,
170 fee_model,
171 event_handler: None,
172 book_type,
173 oms_type,
174 account_type,
175 clock,
176 cache,
177 book,
178 market_status: MarketStatus::Open,
179 config,
180 core,
181 target_bid: None,
182 target_ask: None,
183 target_last: None,
184 last_bar_bid: None,
185 last_bar_ask: None,
186 fill_at_market: true,
187 execution_bar_types: IndexMap::new(),
188 execution_bar_deltas: IndexMap::new(),
189 account_ids: IndexMap::new(),
190 cached_filled_qty: IndexMap::new(),
191 ids_generator,
192 last_trade_size: None,
193 bid_consumption: IndexMap::new(),
194 ask_consumption: IndexMap::new(),
195 trade_consumption: 0,
196 queue_ahead: IndexMap::new(),
197 queue_excess: IndexMap::new(),
198 queue_pending: IndexMap::new(),
199 prev_bid_price_raw: 0,
200 prev_bid_size_raw: 0,
201 prev_ask_price_raw: 0,
202 prev_ask_size_raw: 0,
203 last_quote_bid: None,
204 last_quote_ask: None,
205 tob_initialized: false,
206 instrument_close: None,
207 settlement_price: None,
208 expiration_processed: false,
209 }
210 }
211
212 pub fn set_event_handler(&mut self, handler: Rc<dyn Fn(OrderEventAny)>) {
219 self.event_handler = Some(handler);
220 }
221
222 fn dispatch_order_event(&self, event: OrderEventAny) {
223 if let Some(handler) = &self.event_handler {
224 handler(event);
225 } else {
226 let endpoint = MessagingSwitchboard::exec_engine_process();
227 msgbus::send_order_event(endpoint, event);
228 }
229 }
230
231 pub fn reset(&mut self) {
237 self.book.reset();
238 self.execution_bar_types.clear();
239 self.execution_bar_deltas.clear();
240 self.account_ids.clear();
241 self.cached_filled_qty.clear();
242 self.core.reset();
243 self.target_bid = None;
244 self.target_ask = None;
245 self.target_last = None;
246 self.last_trade_size = None;
247 self.bid_consumption.clear();
248 self.ask_consumption.clear();
249 self.trade_consumption = 0;
250 self.queue_ahead.clear();
251 self.queue_excess.clear();
252 self.queue_pending.clear();
253 self.prev_bid_price_raw = 0;
254 self.prev_bid_size_raw = 0;
255 self.prev_ask_price_raw = 0;
256 self.prev_ask_size_raw = 0;
257 self.last_quote_bid = None;
258 self.last_quote_ask = None;
259 self.tob_initialized = false;
260 self.instrument_close = None;
261 self.settlement_price = None;
262 self.expiration_processed = false;
263 self.fill_at_market = true;
264 self.ids_generator.reset();
265
266 log::info!("Reset {}", self.instrument.id());
267 }
268
269 fn apply_liquidity_consumption(
270 &mut self,
271 fills: Vec<(Price, Quantity)>,
272 order_side: OrderSide,
273 leaves_qty: Quantity,
274 book_prices: Option<&[Price]>,
275 ) -> Vec<(Price, Quantity)> {
276 if !self.config.liquidity_consumption {
277 return fills;
278 }
279
280 let consumption = match order_side {
281 OrderSide::Buy => &mut self.ask_consumption,
282 OrderSide::Sell => &mut self.bid_consumption,
283 _ => return fills,
284 };
285
286 let mut adjusted_fills = Vec::with_capacity(fills.len());
287 let mut remaining_qty = leaves_qty.raw;
288
289 for (fill_idx, (price, qty)) in fills.into_iter().enumerate() {
290 if remaining_qty == 0 {
291 break;
292 }
293
294 let book_price = book_prices
297 .and_then(|bp| bp.get(fill_idx).copied())
298 .unwrap_or(price);
299
300 let book_price_raw = book_price.raw;
301 let level_size = self
302 .book
303 .get_quantity_at_level(book_price, order_side, qty.precision);
304
305 let (original_size, consumed) = consumption
306 .entry(book_price_raw)
307 .or_insert((level_size.raw, 0));
308
309 if *original_size != level_size.raw {
311 *original_size = level_size.raw;
312 *consumed = 0;
313 }
314
315 let available = original_size.saturating_sub(*consumed);
316 if available == 0 {
317 continue;
318 }
319
320 let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
321 if adjusted_qty_raw == 0 {
322 continue;
323 }
324
325 *consumed += adjusted_qty_raw;
326 remaining_qty -= adjusted_qty_raw;
327
328 let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
329 adjusted_fills.push((price, adjusted_qty));
330 }
331
332 adjusted_fills
333 }
334
335 fn seed_trade_consumption(
336 &mut self,
337 trade_price_raw: PriceRaw,
338 trade_size_raw: QuantityRaw,
339 trade_ts_event: UnixNanos,
340 aggressor_side: AggressorSide,
341 ) {
342 if trade_size_raw == 0 {
343 return;
344 }
345
346 if self.book.ts_last > trade_ts_event {
349 return;
350 }
351
352 let consumption = match aggressor_side {
353 AggressorSide::Buyer => &mut self.ask_consumption,
354 AggressorSide::Seller => &mut self.bid_consumption,
355 AggressorSide::NoAggressor => return,
356 };
357
358 let levels: Vec<_> = match aggressor_side {
359 AggressorSide::Buyer => self
360 .book
361 .asks(None)
362 .take_while(|l| l.price.value.raw <= trade_price_raw)
363 .collect(),
364 AggressorSide::Seller => self
365 .book
366 .bids(None)
367 .take_while(|l| l.price.value.raw >= trade_price_raw)
368 .collect(),
369 _ => unreachable!(),
370 };
371
372 let mut remaining = trade_size_raw;
373 for level in &levels {
374 if remaining == 0 {
375 break;
376 }
377 let level_size = level.size_raw();
378 let entry = consumption
379 .entry(level.price.value.raw)
380 .or_insert((level_size, 0));
381
382 if entry.0 != level_size {
384 entry.0 = level_size;
385 entry.1 = 0;
386 }
387
388 let available = level_size.saturating_sub(entry.1);
389 let consume = min(remaining, available);
390 entry.1 += consume;
391 remaining -= consume;
392 }
393 }
394
395 pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
397 self.core
398 .set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
399 self.fill_model = fill_model;
400 }
401
402 pub fn set_settlement_price(&mut self, price: Price) {
403 self.settlement_price = Some(price);
404 }
405
406 fn snapshot_queue_position(&mut self, order: &OrderAny, price: Price) {
407 if !self.config.queue_position {
408 return;
409 }
410 let size_prec = self.instrument.size_precision();
411
412 let qty_ahead = self.book.get_quantity_at_level(
415 price,
416 OrderCore::opposite_side(order.order_side()),
417 size_prec,
418 );
419
420 let client_order_id = order.client_order_id();
421
422 self.queue_pending.shift_remove(&client_order_id);
424 self.queue_ahead.shift_remove(&client_order_id);
425
426 if self.book_type == BookType::L1_MBP && qty_ahead.raw == 0 {
431 let behind_bbo = match order.order_side() {
432 OrderSide::Buy => self.book.best_bid_price().is_some_and(|bid| price < bid),
433 OrderSide::Sell => self.book.best_ask_price().is_some_and(|ask| price > ask),
434 _ => false,
435 };
436
437 if behind_bbo {
438 self.queue_pending.insert(client_order_id, price.raw);
439 return;
440 }
441 }
442
443 self.queue_ahead
444 .insert(client_order_id, (price.raw, qty_ahead.raw));
445 }
446
447 fn decrement_queue_on_trade(
448 &mut self,
449 price_raw: PriceRaw,
450 trade_size_raw: QuantityRaw,
451 aggressor_side: AggressorSide,
452 ) {
453 if !self.config.queue_position {
454 return;
455 }
456
457 self.queue_excess.clear();
458
459 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
460 let mut entries: Vec<(ClientOrderId, QuantityRaw, QuantityRaw)> = Vec::new();
461 let mut stale: Vec<ClientOrderId> = Vec::new();
462
463 for client_order_id in keys {
464 let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
465 {
466 Some(v) => v,
467 None => continue,
468 };
469
470 let cache = self.cache.borrow();
471 let order_info = cache.order(&client_order_id).and_then(|order| {
472 if order.is_closed() {
473 None
474 } else {
475 Some((order.order_side(), order.leaves_qty().raw))
476 }
477 });
478 drop(cache);
479
480 let Some((order_side, leaves_raw)) = order_info else {
481 stale.push(client_order_id);
482 continue;
483 };
484
485 if order_price_raw != price_raw || ahead_raw == 0 {
486 continue;
487 }
488
489 let should_decrement = matches!(aggressor_side, AggressorSide::NoAggressor)
490 || (aggressor_side == AggressorSide::Buyer && order_side == OrderSide::Sell)
491 || (aggressor_side == AggressorSide::Seller && order_side == OrderSide::Buy);
492
493 if should_decrement {
494 entries.push((client_order_id, ahead_raw, leaves_raw));
495 }
496 }
497
498 for id in stale {
499 self.queue_ahead.shift_remove(&id);
500 }
501
502 entries.sort_by_key(|&(_, ahead, _)| ahead);
504
505 let mut remaining = trade_size_raw;
506 let mut prev_position: QuantityRaw = 0;
507
508 for (client_order_id, ahead_raw, leaves_raw) in &entries {
509 if remaining == 0 {
510 let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
511 self.queue_ahead
512 .insert(*client_order_id, (price_raw, new_ahead));
513 if new_ahead == 0 {
514 self.queue_excess.insert(*client_order_id, 0);
516 }
517 continue;
518 }
519
520 let gap = ahead_raw.saturating_sub(prev_position);
522 let queue_consumed = remaining.min(gap);
523 remaining -= queue_consumed;
524
525 if remaining == 0 && queue_consumed < gap {
526 let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
527 self.queue_ahead
528 .insert(*client_order_id, (price_raw, new_ahead));
529 continue;
530 }
531
532 self.queue_ahead.insert(*client_order_id, (price_raw, 0));
533 let excess = remaining.min(*leaves_raw);
534 self.queue_excess.insert(*client_order_id, excess);
535 remaining -= excess;
536 prev_position = ahead_raw + excess;
537 }
538 }
539
540 fn determine_trade_fill_qty(&self, order: &OrderAny) -> Option<QuantityRaw> {
541 if !self.config.queue_position {
542 return Some(order.leaves_qty().raw);
543 }
544
545 let client_order_id = order.client_order_id();
546
547 if self.queue_pending.contains_key(&client_order_id) {
549 return None;
550 }
551
552 if let Some(&(tracked_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
553 && let Some(order_price) = order.price()
554 && order_price.raw == tracked_price_raw
555 && ahead_raw > 0
556 {
557 let crossed = self.last_trade_size.is_some()
559 && self
560 .core
561 .last
562 .is_some_and(|trade_price| match order.order_side() {
563 OrderSide::Buy => trade_price.raw < order_price.raw,
564 OrderSide::Sell => trade_price.raw > order_price.raw,
565 _ => false,
566 });
567
568 if !crossed {
569 return None;
570 }
571 }
572
573 let leaves_raw = order.leaves_qty().raw;
574 if leaves_raw == 0 {
575 return None;
576 }
577
578 let mut available_raw = leaves_raw;
579
580 if let Some(trade_size) = self.last_trade_size {
582 let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
583 available_raw = available_raw.min(remaining);
584
585 if let Some(&excess_raw) = self.queue_excess.get(&client_order_id) {
586 if excess_raw == 0 {
587 return None;
588 }
589 available_raw = available_raw.min(excess_raw);
590 }
591 }
592
593 if available_raw == 0 {
594 return None;
595 }
596
597 Some(available_raw)
598 }
599
600 fn clear_all_queue_positions(&mut self) {
601 for (_, (_, ahead_raw)) in &mut self.queue_ahead {
602 *ahead_raw = 0;
603 }
604 }
605
606 fn clear_queue_on_delete(&mut self, deleted_price_raw: PriceRaw, deleted_side: OrderSide) {
607 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
608 for client_order_id in keys {
609 if let Some(&(order_price_raw, _)) = self.queue_ahead.get(&client_order_id)
610 && order_price_raw == deleted_price_raw
611 {
612 let cache = self.cache.borrow();
613 if let Some(order) = cache.order(&client_order_id)
614 && order.order_side() == deleted_side
615 {
616 drop(cache);
617 self.queue_ahead
618 .insert(client_order_id, (order_price_raw, 0));
619 }
620 }
621 }
622 }
623
624 fn cap_queue_ahead(
625 &mut self,
626 price_raw: PriceRaw,
627 size_raw: QuantityRaw,
628 order_side: OrderSide,
629 ) {
630 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
631 let mut stale: Vec<ClientOrderId> = Vec::new();
632
633 for client_order_id in keys {
634 let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
635 {
636 Some(v) => v,
637 None => continue,
638 };
639
640 if order_price_raw != price_raw || ahead_raw <= size_raw {
641 continue;
642 }
643
644 let cache = self.cache.borrow();
645 let order_info = cache.order(&client_order_id).and_then(|order| {
646 if order.is_closed() {
647 None
648 } else {
649 Some(order.order_side())
650 }
651 });
652 drop(cache);
653
654 let Some(side) = order_info else {
655 stale.push(client_order_id);
656 continue;
657 };
658
659 if side != order_side {
660 continue;
661 }
662
663 self.queue_ahead
664 .insert(client_order_id, (order_price_raw, size_raw));
665 }
666
667 for id in stale {
668 self.queue_ahead.shift_remove(&id);
669 }
670 }
671
672 fn seed_tob_baseline(&mut self) {
673 let bid = self.book.best_bid_price();
674 let ask = self.book.best_ask_price();
675 self.prev_bid_price_raw = bid.map_or(0, |p| p.raw);
676 self.prev_bid_size_raw = self.book.best_bid_size().map_or(0, |q| q.raw);
677 self.prev_ask_price_raw = ask.map_or(0, |p| p.raw);
678 self.prev_ask_size_raw = self.book.best_ask_size().map_or(0, |q| q.raw);
679 self.tob_initialized = bid.is_some() || ask.is_some();
680 }
681
682 fn decrement_l1_queue_on_quote(
683 &mut self,
684 bid_price_raw: PriceRaw,
685 bid_size_raw: QuantityRaw,
686 ask_price_raw: PriceRaw,
687 ask_size_raw: QuantityRaw,
688 ) {
689 if !self.config.queue_position {
690 return;
691 }
692
693 if self.tob_initialized {
695 if bid_price_raw < self.prev_bid_price_raw {
697 self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
698 }
699
700 if ask_price_raw > self.prev_ask_price_raw {
702 self.adjust_l1_queue_on_price_move(ask_price_raw, ask_size_raw, OrderSide::Sell);
703 }
704 }
705
706 self.resolve_pending_l1_snapshots(bid_price_raw, bid_size_raw, ask_price_raw, ask_size_raw);
708 }
709
710 fn adjust_l1_queue_on_price_move(
711 &mut self,
712 new_price_raw: PriceRaw,
713 new_size_raw: QuantityRaw,
714 order_side: OrderSide,
715 ) {
716 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
717 let mut stale: Vec<ClientOrderId> = Vec::new();
718
719 for client_order_id in keys {
720 let Some(&(order_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id) else {
721 continue;
722 };
723
724 let cache = self.cache.borrow();
725 let order_info = cache.order(&client_order_id).and_then(|order| {
726 if order.is_closed() {
727 None
728 } else {
729 Some(order.order_side())
730 }
731 });
732 drop(cache);
733
734 let Some(side) = order_info else {
735 stale.push(client_order_id);
736 continue;
737 };
738
739 if side != order_side {
740 continue;
741 }
742
743 let crossed = match order_side {
746 OrderSide::Buy => order_price_raw > new_price_raw,
747 _ => order_price_raw < new_price_raw,
748 };
749
750 if crossed {
751 self.queue_ahead
752 .insert(client_order_id, (order_price_raw, 0));
753 } else if order_price_raw == new_price_raw && ahead_raw > new_size_raw {
754 self.queue_ahead
755 .insert(client_order_id, (order_price_raw, new_size_raw));
756 }
757 }
758
759 for id in stale {
760 self.queue_ahead.shift_remove(&id);
761 }
762
763 let pending_keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
765 let mut pending_stale: Vec<ClientOrderId> = Vec::new();
766
767 for client_order_id in pending_keys {
768 let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
769 continue;
770 };
771
772 let cache = self.cache.borrow();
773 let order_info = cache.order(&client_order_id).and_then(|order| {
774 if order.is_closed() {
775 None
776 } else {
777 Some(order.order_side())
778 }
779 });
780 drop(cache);
781
782 let Some(side) = order_info else {
783 pending_stale.push(client_order_id);
784 continue;
785 };
786
787 if side != order_side {
788 continue;
789 }
790
791 let crossed = match order_side {
792 OrderSide::Buy => order_price_raw > new_price_raw,
793 _ => order_price_raw < new_price_raw,
794 };
795
796 if crossed {
797 self.queue_pending.shift_remove(&client_order_id);
798 self.queue_ahead
799 .insert(client_order_id, (order_price_raw, 0));
800 } else if order_price_raw == new_price_raw {
801 self.queue_pending.shift_remove(&client_order_id);
802 self.queue_ahead
803 .insert(client_order_id, (order_price_raw, new_size_raw));
804 }
805 }
806
807 for id in pending_stale {
808 self.queue_pending.shift_remove(&id);
809 }
810 }
811
812 fn resolve_pending_l1_snapshots(
813 &mut self,
814 bid_price_raw: PriceRaw,
815 bid_size_raw: QuantityRaw,
816 ask_price_raw: PriceRaw,
817 ask_size_raw: QuantityRaw,
818 ) {
819 let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
820 let mut stale: Vec<ClientOrderId> = Vec::new();
821
822 for client_order_id in keys {
823 let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
824 continue;
825 };
826
827 let cache = self.cache.borrow();
828 let order_info = cache.order(&client_order_id).and_then(|order| {
829 if order.is_closed() {
830 None
831 } else {
832 Some(order.order_side())
833 }
834 });
835 drop(cache);
836
837 let Some(side) = order_info else {
838 stale.push(client_order_id);
839 continue;
840 };
841
842 let matched_size = match side {
844 OrderSide::Buy if order_price_raw == bid_price_raw => Some(bid_size_raw),
845 OrderSide::Sell if order_price_raw == ask_price_raw => Some(ask_size_raw),
846 _ => None,
847 };
848
849 if let Some(size) = matched_size {
850 self.queue_pending.shift_remove(&client_order_id);
851 self.queue_ahead
852 .insert(client_order_id, (order_price_raw, size));
853 }
854 }
855
856 for id in stale {
857 self.queue_pending.shift_remove(&id);
858 }
859 }
860
861 fn resolve_pending_on_trade(&mut self, trade_price_raw: PriceRaw) {
862 let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
863 let mut stale: Vec<ClientOrderId> = Vec::new();
864
865 for client_order_id in keys {
866 let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
867 continue;
868 };
869
870 let cache = self.cache.borrow();
871 let order_side = cache.order(&client_order_id).and_then(|order| {
872 if order.is_closed() {
873 None
874 } else {
875 Some(order.order_side())
876 }
877 });
878 drop(cache);
879
880 let Some(side) = order_side else {
881 stale.push(client_order_id);
882 continue;
883 };
884
885 let crossed = match side {
887 OrderSide::Buy => trade_price_raw < order_price_raw,
888 OrderSide::Sell => trade_price_raw > order_price_raw,
889 _ => false,
890 };
891
892 if crossed {
893 self.queue_pending.shift_remove(&client_order_id);
894 self.queue_ahead
895 .insert(client_order_id, (order_price_raw, 0));
896 }
897 }
898
899 for id in stale {
900 self.queue_pending.shift_remove(&id);
901 }
902 }
903
904 #[must_use]
905 pub fn best_bid_price(&self) -> Option<Price> {
907 self.book.best_bid_price()
908 }
909
910 #[must_use]
911 pub fn best_ask_price(&self) -> Option<Price> {
913 self.book.best_ask_price()
914 }
915
916 #[must_use]
917 pub const fn get_book(&self) -> &OrderBook {
919 &self.book
920 }
921
922 #[must_use]
923 pub const fn get_open_bid_orders(&self) -> &[OrderMatchInfo] {
925 self.core.get_orders_bid()
926 }
927
928 #[must_use]
929 pub const fn get_open_ask_orders(&self) -> &[OrderMatchInfo] {
931 self.core.get_orders_ask()
932 }
933
934 #[must_use]
935 pub fn get_open_orders(&self) -> Vec<OrderMatchInfo> {
937 let mut orders = Vec::new();
938 orders.extend_from_slice(self.core.get_orders_bid());
939 orders.extend_from_slice(self.core.get_orders_ask());
940 orders
941 }
942
943 #[must_use]
944 pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
946 self.core.order_exists(client_order_id)
947 }
948
949 #[must_use]
950 pub const fn get_core(&self) -> &OrderMatchingCore {
951 &self.core
952 }
953
954 pub fn set_fill_at_market(&mut self, value: bool) {
955 self.fill_at_market = value;
956 }
957
958 fn check_price_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
959 let expected = self.instrument.price_precision();
960 if actual != expected {
961 anyhow::bail!(
962 "Invalid {field} precision {actual}, expected {expected} for {}",
963 self.instrument.id()
964 );
965 }
966 Ok(())
967 }
968
969 fn check_size_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
970 let expected = self.instrument.size_precision();
971 if actual != expected {
972 anyhow::bail!(
973 "Invalid {field} precision {actual}, expected {expected} for {}",
974 self.instrument.id()
975 );
976 }
977 Ok(())
978 }
979
980 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
988 log::debug!("Processing {delta}");
989
990 if matches!(delta.action, BookAction::Add | BookAction::Update) {
992 self.check_price_precision(delta.order.price.precision, "delta order price")?;
993 self.check_size_precision(delta.order.size.precision, "delta order size")?;
994 }
995
996 if self.book_type == BookType::L1_MBP {
998 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
999 return Ok(());
1000 }
1001
1002 self.book.apply_delta(delta)?;
1003
1004 let delta_snapshot_or_clear = (delta.flags & 32) != 0 || delta.action == BookAction::Clear;
1005
1006 if self.config.queue_position {
1007 if delta_snapshot_or_clear {
1008 self.clear_all_queue_positions();
1009 } else if delta.action == BookAction::Delete {
1010 self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1011 } else if delta.action == BookAction::Update {
1012 self.cap_queue_ahead(
1013 delta.order.price.raw,
1014 delta.order.size.raw,
1015 delta.order.side,
1016 );
1017 }
1018 }
1019
1020 if self.config.queue_position && delta_snapshot_or_clear {
1021 self.seed_tob_baseline();
1022 }
1023
1024 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
1025 Ok(())
1026 }
1027
1028 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1036 log::debug!("Processing {deltas}");
1037
1038 for delta in &deltas.deltas {
1040 if matches!(delta.action, BookAction::Add | BookAction::Update) {
1041 self.check_price_precision(delta.order.price.precision, "delta order price")?;
1042 self.check_size_precision(delta.order.size.precision, "delta order size")?;
1043 }
1044 }
1045
1046 if self.book_type == BookType::L1_MBP {
1048 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1049 return Ok(());
1050 }
1051
1052 self.book.apply_deltas(deltas)?;
1053
1054 let mut has_snapshot_or_clear = false;
1055
1056 if self.config.queue_position {
1057 for delta in &deltas.deltas {
1058 if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
1059 self.clear_all_queue_positions();
1060 has_snapshot_or_clear = true;
1061 break;
1062 } else if delta.action == BookAction::Delete {
1063 self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1064 } else if delta.action == BookAction::Update {
1065 self.cap_queue_ahead(
1066 delta.order.price.raw,
1067 delta.order.size.raw,
1068 delta.order.side,
1069 );
1070 }
1071 }
1072 }
1073
1074 if self.config.queue_position && has_snapshot_or_clear {
1075 self.seed_tob_baseline();
1076 }
1077
1078 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1079 Ok(())
1080 }
1081
1082 pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) -> anyhow::Result<()> {
1091 log::debug!("Processing OrderBookDepth10 for {}", depth.instrument_id);
1092
1093 for order in &depth.bids {
1095 if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1096 continue;
1097 }
1098 self.check_price_precision(order.price.precision, "bid price")?;
1099 self.check_size_precision(order.size.precision, "bid size")?;
1100 }
1101
1102 for order in &depth.asks {
1103 if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1104 continue;
1105 }
1106 self.check_price_precision(order.price.precision, "ask price")?;
1107 self.check_size_precision(order.size.precision, "ask size")?;
1108 }
1109
1110 if self.book_type == BookType::L1_MBP {
1113 let quote = QuoteTick::new(
1114 depth.instrument_id,
1115 depth.bids[0].price,
1116 depth.asks[0].price,
1117 depth.bids[0].size,
1118 depth.asks[0].size,
1119 depth.ts_event,
1120 depth.ts_init,
1121 );
1122 self.book.update_quote_tick("e)?;
1123 self.last_quote_bid = Some(depth.bids[0].price);
1124 self.last_quote_ask = Some(depth.asks[0].price);
1125 } else {
1126 self.book.apply_depth(depth)?;
1127 }
1128
1129 if self.config.queue_position {
1131 self.clear_all_queue_positions();
1132 let bid_price_raw = depth.bids[0].price.raw;
1133 let bid_size_raw = depth.bids[0].size.raw;
1134 let ask_price_raw = depth.asks[0].price.raw;
1135 let ask_size_raw = depth.asks[0].size.raw;
1136
1137 if self.tob_initialized {
1139 if bid_price_raw < self.prev_bid_price_raw {
1140 self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
1141 }
1142
1143 if ask_price_raw > self.prev_ask_price_raw {
1144 self.adjust_l1_queue_on_price_move(
1145 ask_price_raw,
1146 ask_size_raw,
1147 OrderSide::Sell,
1148 );
1149 }
1150 }
1151
1152 self.resolve_pending_l1_snapshots(
1153 bid_price_raw,
1154 bid_size_raw,
1155 ask_price_raw,
1156 ask_size_raw,
1157 );
1158
1159 self.prev_bid_price_raw = bid_price_raw;
1160 self.prev_bid_size_raw = bid_size_raw;
1161 self.prev_ask_price_raw = ask_price_raw;
1162 self.prev_ask_size_raw = ask_size_raw;
1163 self.tob_initialized = true;
1164 }
1165
1166 self.iterate(depth.ts_init, AggressorSide::NoAggressor);
1167 Ok(())
1168 }
1169
1170 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
1178 log::debug!("Processing {quote}");
1179
1180 self.check_price_precision(quote.bid_price.precision, "bid_price")
1181 .unwrap();
1182 self.check_price_precision(quote.ask_price.precision, "ask_price")
1183 .unwrap();
1184 self.check_size_precision(quote.bid_size.precision, "bid_size")
1185 .unwrap();
1186 self.check_size_precision(quote.ask_size.precision, "ask_size")
1187 .unwrap();
1188
1189 if self.book_type == BookType::L1_MBP {
1190 if quote.ts_event < self.book.ts_last {
1192 log::warn!(
1193 "Skipping stale quote: ts_event {} < book.ts_last {} for {}",
1194 quote.ts_event,
1195 self.book.ts_last,
1196 self.book.instrument_id,
1197 );
1198 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1199 return;
1200 }
1201
1202 if self.config.queue_position {
1203 self.decrement_l1_queue_on_quote(
1204 quote.bid_price.raw,
1205 quote.bid_size.raw,
1206 quote.ask_price.raw,
1207 quote.ask_size.raw,
1208 );
1209 self.prev_bid_price_raw = quote.bid_price.raw;
1210 self.prev_bid_size_raw = quote.bid_size.raw;
1211 self.prev_ask_price_raw = quote.ask_price.raw;
1212 self.prev_ask_size_raw = quote.ask_size.raw;
1213 self.tob_initialized = true;
1214 }
1215 self.book.update_quote_tick(quote).unwrap();
1216 self.last_quote_bid = Some(quote.bid_price);
1217 self.last_quote_ask = Some(quote.ask_price);
1218 }
1219
1220 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1221 }
1222
1223 pub fn process_bar(&mut self, bar: &Bar) {
1234 log::debug!("Processing {bar}");
1235
1236 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
1238 return;
1239 }
1240
1241 let bar_type = bar.bar_type;
1242 if bar_type.aggregation_source() == AggregationSource::Internal {
1244 return;
1245 }
1246
1247 self.check_price_precision(bar.open.precision, "bar open")
1248 .unwrap();
1249 self.check_price_precision(bar.high.precision, "bar high")
1250 .unwrap();
1251 self.check_price_precision(bar.low.precision, "bar low")
1252 .unwrap();
1253 self.check_price_precision(bar.close.precision, "bar close")
1254 .unwrap();
1255 self.check_size_precision(bar.volume.precision, "bar volume")
1256 .unwrap();
1257
1258 let execution_bar_type =
1259 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
1260 execution_bar_type.to_owned()
1261 } else {
1262 self.execution_bar_types
1263 .insert(bar.instrument_id(), bar_type);
1264 self.execution_bar_deltas
1265 .insert(bar_type, bar_type.spec().timedelta());
1266 bar_type
1267 };
1268
1269 if execution_bar_type != bar_type {
1270 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
1271 if bar_type_timedelta.is_none() {
1272 bar_type_timedelta = Some(bar_type.spec().timedelta());
1273 self.execution_bar_deltas
1274 .insert(bar_type, bar_type_timedelta.unwrap());
1275 }
1276
1277 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
1278 >= &bar_type_timedelta.unwrap()
1279 {
1280 self.execution_bar_types
1281 .insert(bar_type.instrument_id(), bar_type);
1282 } else {
1283 return;
1284 }
1285 }
1286
1287 match bar_type.spec().price_type {
1288 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
1289 PriceType::Bid => {
1290 self.last_bar_bid = Some(bar.to_owned());
1291 self.process_quote_ticks_from_bar(bar);
1292 }
1293 PriceType::Ask => {
1294 self.last_bar_ask = Some(bar.to_owned());
1295 self.process_quote_ticks_from_bar(bar);
1296 }
1297 PriceType::Mark => panic!("Not implemented"),
1298 }
1299 }
1300
1301 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
1302 let quarter_raw = bar.volume.raw / 4;
1304 let remainder_raw = bar.volume.raw % 4;
1305 let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
1306 let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);
1307
1308 let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
1309 {
1310 AggressorSide::Buyer
1311 } else {
1312 AggressorSide::Seller
1313 };
1314
1315 let mut trade_tick = TradeTick::new(
1317 bar.instrument_id(),
1318 bar.open,
1319 size,
1320 aggressor_side,
1321 self.ids_generator.generate_trade_id(bar.ts_init),
1322 bar.ts_init,
1323 bar.ts_init,
1324 );
1325
1326 if !self.core.is_last_initialized {
1328 self.fill_at_market = true;
1329 self.book.update_trade_tick(&trade_tick).unwrap();
1330 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1331 self.core.set_last_raw(trade_tick.price);
1332 } else if self.core.last.is_some_and(|last| bar.open != last) {
1333 self.fill_at_market = true;
1335 self.book.update_trade_tick(&trade_tick).unwrap();
1336 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1337 self.core.set_last_raw(trade_tick.price);
1338 }
1339
1340 let high_first = !self.config.bar_adaptive_high_low_ordering
1343 || (bar.high.raw - bar.open.raw).abs() < (bar.low.raw - bar.open.raw).abs();
1344
1345 if high_first {
1346 self.process_bar_high(&mut trade_tick, bar);
1347 self.process_bar_low(&mut trade_tick, bar);
1348 } else {
1349 self.process_bar_low(&mut trade_tick, bar);
1350 self.process_bar_high(&mut trade_tick, bar);
1351 }
1352
1353 if self.core.last.is_some_and(|last| bar.close != last) {
1355 self.fill_at_market = false;
1356 trade_tick.price = bar.close;
1357 trade_tick.size = close_size;
1358
1359 if bar.close > self.core.last.unwrap() {
1360 trade_tick.aggressor_side = AggressorSide::Buyer;
1361 } else {
1362 trade_tick.aggressor_side = AggressorSide::Seller;
1363 }
1364 trade_tick.trade_id = self.ids_generator.generate_trade_id(trade_tick.ts_init);
1365
1366 self.book.update_trade_tick(&trade_tick).unwrap();
1367 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1368
1369 self.core.set_last_raw(trade_tick.price);
1370 }
1371
1372 self.fill_at_market = true;
1373 }
1374
1375 fn process_bar_high(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
1376 if self.core.last.is_some_and(|last| bar.high > last) {
1377 self.fill_at_market = false;
1378 trade_tick.price = bar.high;
1379 trade_tick.aggressor_side = AggressorSide::Buyer;
1380 trade_tick.trade_id = self.ids_generator.generate_trade_id(trade_tick.ts_init);
1381
1382 self.book.update_trade_tick(trade_tick).unwrap();
1383 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1384
1385 self.core.set_last_raw(trade_tick.price);
1386 }
1387 }
1388
1389 fn process_bar_low(&mut self, trade_tick: &mut TradeTick, bar: &Bar) {
1390 if self.core.last.is_some_and(|last| bar.low < last) {
1391 self.fill_at_market = false;
1392 trade_tick.price = bar.low;
1393 trade_tick.aggressor_side = AggressorSide::Seller;
1394 trade_tick.trade_id = self.ids_generator.generate_trade_id(trade_tick.ts_init);
1395
1396 self.book.update_trade_tick(trade_tick).unwrap();
1397 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1398
1399 self.core.set_last_raw(trade_tick.price);
1400 }
1401 }
1402
1403 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
1404 if self.last_bar_bid.is_none()
1406 || self.last_bar_ask.is_none()
1407 || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
1408 {
1409 return;
1410 }
1411 let bid_bar = self.last_bar_bid.unwrap();
1412 let ask_bar = self.last_bar_ask.unwrap();
1413
1414 let bid_quarter = bid_bar.volume.raw / 4;
1416 let bid_remainder = bid_bar.volume.raw % 4;
1417 let ask_quarter = ask_bar.volume.raw / 4;
1418 let ask_remainder = ask_bar.volume.raw % 4;
1419
1420 let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
1421 let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
1422 let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
1423 let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
1424
1425 let mut quote_tick = QuoteTick::new(
1427 self.book.instrument_id,
1428 bid_bar.open,
1429 ask_bar.open,
1430 bid_size,
1431 ask_size,
1432 bid_bar.ts_init,
1433 bid_bar.ts_init,
1434 );
1435
1436 self.fill_at_market = true;
1438 self.book.update_quote_tick("e_tick).unwrap();
1439 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1440
1441 self.fill_at_market = false;
1443 quote_tick.bid_price = bid_bar.high;
1444 quote_tick.ask_price = ask_bar.high;
1445 self.book.update_quote_tick("e_tick).unwrap();
1446 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1447
1448 self.fill_at_market = false;
1450 quote_tick.bid_price = bid_bar.low;
1451 quote_tick.ask_price = ask_bar.low;
1452 self.book.update_quote_tick("e_tick).unwrap();
1453 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1454
1455 self.fill_at_market = false;
1457 quote_tick.bid_price = bid_bar.close;
1458 quote_tick.ask_price = ask_bar.close;
1459 quote_tick.bid_size = bid_close_size;
1460 quote_tick.ask_size = ask_close_size;
1461 self.book.update_quote_tick("e_tick).unwrap();
1462 self.last_quote_bid = Some(bid_bar.close);
1463 self.last_quote_ask = Some(ask_bar.close);
1464 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
1465
1466 self.last_bar_bid = None;
1467 self.last_bar_ask = None;
1468 self.fill_at_market = true;
1469 }
1470
1471 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
1484 log::debug!("Processing {trade}");
1485
1486 self.check_price_precision(trade.price.precision, "trade price")
1487 .unwrap();
1488 self.check_size_precision(trade.size.precision, "trade size")
1489 .unwrap();
1490
1491 let price_raw = trade.price.raw;
1492
1493 if self.book_type == BookType::L1_MBP {
1494 if trade.ts_event < self.book.ts_last {
1496 log::warn!(
1497 "Skipping stale trade: ts_event {} < book.ts_last {} for {}",
1498 trade.ts_event,
1499 self.book.ts_last,
1500 self.book.instrument_id,
1501 );
1502 self.iterate(trade.ts_init, AggressorSide::NoAggressor);
1503 return;
1504 }
1505
1506 self.book.update_trade_tick(trade).unwrap();
1507 }
1508
1509 self.core.set_last_raw(trade.price);
1510
1511 if !self.config.trade_execution {
1512 if self.book_type == BookType::L1_MBP {
1514 if let Some(bid) = self.book.best_bid_price() {
1515 self.core.set_bid_raw(bid);
1516 }
1517
1518 if let Some(ask) = self.book.best_ask_price() {
1519 self.core.set_ask_raw(ask);
1520 }
1521 }
1522 return;
1523 }
1524
1525 let aggressor_side = trade.aggressor_side;
1526
1527 match aggressor_side {
1528 AggressorSide::Buyer => {
1529 if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
1532 self.core.set_ask_raw(trade.price);
1533 }
1534
1535 if self.core.bid.is_none() {
1537 self.core.set_bid_raw(trade.price);
1538 }
1539 }
1540 AggressorSide::Seller => {
1541 if self.core.bid.is_none()
1544 || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1545 {
1546 self.core.set_bid_raw(trade.price);
1547 }
1548
1549 if self.core.ask.is_none() {
1551 self.core.set_ask_raw(trade.price);
1552 }
1553 }
1554 AggressorSide::NoAggressor => {
1555 if self.core.bid.is_none()
1556 || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1557 {
1558 self.core.set_bid_raw(trade.price);
1559 }
1560
1561 if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
1562 self.core.set_ask_raw(trade.price);
1563 }
1564 }
1565 }
1566
1567 let original_bid = self.core.bid;
1568 let original_ask = self.core.ask;
1569
1570 match aggressor_side {
1571 AggressorSide::Seller => {
1572 if original_ask.is_some_and(|ask| price_raw < ask.raw) {
1573 self.core.set_ask_raw(trade.price);
1574 }
1575 }
1576 AggressorSide::Buyer => {
1577 if original_bid.is_some_and(|bid| price_raw > bid.raw) {
1578 self.core.set_bid_raw(trade.price);
1579 }
1580 }
1581 AggressorSide::NoAggressor => {
1582 self.core.set_bid_raw(trade.price);
1584 self.core.set_ask_raw(trade.price);
1585 }
1586 }
1587
1588 self.last_trade_size = Some(trade.size);
1589 self.trade_consumption = 0;
1590
1591 if self.config.liquidity_consumption && self.book_type != BookType::L1_MBP {
1592 self.seed_trade_consumption(price_raw, trade.size.raw, trade.ts_event, aggressor_side);
1593 }
1594
1595 self.resolve_pending_on_trade(price_raw);
1596 self.decrement_queue_on_trade(price_raw, trade.size.raw, aggressor_side);
1597
1598 self.iterate(trade.ts_init, aggressor_side);
1599
1600 self.last_trade_size = None;
1601 self.trade_consumption = 0;
1602
1603 if self.book_type == BookType::L1_MBP {
1609 match aggressor_side {
1610 AggressorSide::Seller => {
1611 if let Some(ask) = self.last_quote_ask {
1612 self.core.ask = Some(ask);
1613 }
1614 }
1615 AggressorSide::Buyer => {
1616 if let Some(bid) = self.last_quote_bid {
1617 self.core.bid = Some(bid);
1618 }
1619 }
1620 AggressorSide::NoAggressor => {}
1621 }
1622 } else {
1623 match aggressor_side {
1624 AggressorSide::Seller => {
1625 if let Some(ask) = original_ask
1626 && price_raw < ask.raw
1627 {
1628 self.core.ask = Some(ask);
1629 }
1630 }
1631 AggressorSide::Buyer => {
1632 if let Some(bid) = original_bid
1633 && price_raw > bid.raw
1634 {
1635 self.core.bid = Some(bid);
1636 }
1637 }
1638 AggressorSide::NoAggressor => {}
1639 }
1640 }
1641 }
1642
1643 pub fn process_status(&mut self, action: MarketStatusAction) {
1645 log::debug!("Processing {action}");
1646
1647 if self.market_status == MarketStatus::Closed
1649 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
1650 {
1651 self.market_status = MarketStatus::Open;
1652 }
1653 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
1655 self.market_status = MarketStatus::Paused;
1656 }
1657 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
1659 self.market_status = MarketStatus::Suspended;
1660 }
1661 if self.market_status == MarketStatus::Open
1663 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
1664 {
1665 self.market_status = MarketStatus::Closed;
1666 }
1667 }
1668
1669 pub fn process_instrument_close(&mut self, close: InstrumentClose) {
1674 if close.instrument_id != self.instrument.id() {
1675 log::warn!(
1676 "Received instrument close for unknown instrument_id: {}",
1677 close.instrument_id
1678 );
1679 return;
1680 }
1681
1682 if close.close_type == InstrumentCloseType::ContractExpired {
1683 self.instrument_close = Some(close);
1684 self.iterate(close.ts_init, AggressorSide::NoAggressor);
1685 }
1686 }
1687
1688 fn check_instrument_expiration(&mut self) {
1689 if self.expiration_processed || self.instrument_close.is_none() {
1690 return;
1691 }
1692
1693 self.expiration_processed = true;
1694 let close = self.instrument_close.take().unwrap();
1695 log::info!("{} reached expiration", self.instrument.id());
1696
1697 let open_orders: Vec<OrderMatchInfo> = self.get_open_orders();
1698 for order_info in &open_orders {
1699 let order = {
1700 let cache = self.cache.borrow();
1701 cache.order(&order_info.client_order_id).cloned()
1702 };
1703
1704 if let Some(order) = order {
1705 self.cancel_order(&order, None);
1706 }
1707 }
1708
1709 let instrument_id = self.instrument.id();
1710 let positions: Vec<(TraderId, StrategyId, PositionId, OrderSide, Quantity)> = {
1711 let cache = self.cache.borrow();
1712 cache
1713 .positions_open(None, Some(&instrument_id), None, None, None)
1714 .into_iter()
1715 .map(|pos| {
1716 let closing_side = match pos.side {
1717 PositionSide::Long => OrderSide::Sell,
1718 PositionSide::Short => OrderSide::Buy,
1719 _ => OrderSide::NoOrderSide,
1720 };
1721 (
1722 pos.trader_id,
1723 pos.strategy_id,
1724 pos.id,
1725 closing_side,
1726 pos.quantity,
1727 )
1728 })
1729 .collect()
1730 };
1731
1732 let ts_now = self.clock.borrow().timestamp_ns();
1733
1734 for (trader_id, strategy_id, position_id, closing_side, quantity) in positions {
1735 let client_order_id =
1736 ClientOrderId::from(format!("EXPIRATION-{}-{}", self.venue, UUID4::new()).as_str());
1737 let order = OrderAny::Market(MarketOrder::new(
1738 trader_id,
1739 strategy_id,
1740 instrument_id,
1741 client_order_id,
1742 closing_side,
1743 quantity,
1744 TimeInForce::Gtc,
1745 UUID4::new(),
1746 ts_now,
1747 true, false,
1749 None,
1750 None,
1751 None,
1752 None,
1753 None,
1754 None,
1755 None,
1756 Some(vec![Ustr::from(&format!(
1757 "EXPIRATION_{}_CLOSE",
1758 self.venue
1759 ))]),
1760 ));
1761
1762 if self
1763 .cache
1764 .borrow_mut()
1765 .add_order(order.clone(), Some(position_id), None, false)
1766 .is_err()
1767 {
1768 log::debug!("Expiration order already in cache: {client_order_id}");
1769 }
1770
1771 let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
1772 self.generate_order_accepted(&order, venue_order_id);
1773 let fill_price = self.settlement_price.unwrap_or(close.close_price);
1774 self.apply_fills(
1775 &order,
1776 &[(fill_price, quantity)],
1777 LiquiditySide::Taker,
1778 Some(position_id),
1779 None,
1780 );
1781 }
1782 }
1783
1784 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
1793 let reject_reason: Option<Ustr> = 'validate: {
1798 let cache_borrow = self.cache.as_ref().borrow();
1799
1800 if self.core.order_exists(order.client_order_id()) {
1801 break 'validate Some("Order already exists".into());
1802 }
1803
1804 self.account_ids.insert(order.trader_id(), account_id);
1806
1807 if self.instrument.has_expiration() {
1809 if let Some(activation_ns) = self.instrument.activation_ns()
1810 && self.clock.borrow().timestamp_ns() < activation_ns
1811 {
1812 break 'validate Some(
1813 format!(
1814 "Contract {} is not yet active, activation {activation_ns}",
1815 self.instrument.id(),
1816 )
1817 .into(),
1818 );
1819 }
1820
1821 if let Some(expiration_ns) = self.instrument.expiration_ns()
1822 && self.clock.borrow().timestamp_ns() >= expiration_ns
1823 {
1824 break 'validate Some(
1825 format!(
1826 "Contract {} has expired, expiration {expiration_ns}",
1827 self.instrument.id(),
1828 )
1829 .into(),
1830 );
1831 }
1832 }
1833
1834 if self.config.support_contingent_orders {
1836 if let Some(parent_order_id) = order.parent_order_id() {
1837 let parent_order = cache_borrow.order(&parent_order_id);
1838 if parent_order.is_none()
1839 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
1840 {
1841 panic!("OTO parent not found");
1842 }
1843
1844 if let Some(parent_order) = parent_order {
1845 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
1846 break 'validate Some(
1847 format!("Rejected OTO order from {parent_order_id}").into(),
1848 );
1849 } else if parent_order.status() == OrderStatus::Accepted
1850 || parent_order.status() == OrderStatus::Triggered
1851 || (self.config.oto_full_trigger
1852 && parent_order.status() == OrderStatus::PartiallyFilled)
1853 {
1854 log::info!(
1855 "Pending OTO order {} triggers from {parent_order_id}",
1856 order.client_order_id(),
1857 );
1858 return;
1859 }
1860 }
1861 }
1862
1863 if let Some(linked_order_ids) = order.linked_order_ids() {
1864 for client_order_id in linked_order_ids {
1865 match cache_borrow.order(client_order_id) {
1866 Some(contingent_order)
1867 if (order.contingency_type().unwrap() == ContingencyType::Oco
1868 || order.contingency_type().unwrap()
1869 == ContingencyType::Ouo)
1870 && !order.is_closed()
1871 && contingent_order.is_closed() =>
1872 {
1873 break 'validate Some(
1874 format!("Contingent order {client_order_id} already closed")
1875 .into(),
1876 );
1877 }
1878 None => panic!("Cannot find contingent order for {client_order_id}"),
1879 _ => {}
1880 }
1881 }
1882 }
1883 }
1884
1885 if order.quantity().precision != self.instrument.size_precision() {
1887 break 'validate Some(
1888 format!(
1889 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
1890 order.client_order_id(),
1891 order.quantity().precision,
1892 self.instrument.id(),
1893 self.instrument.size_precision()
1894 )
1895 .into(),
1896 );
1897 }
1898
1899 if let Some(price) = order.price()
1901 && price.precision != self.instrument.price_precision()
1902 {
1903 break 'validate Some(
1904 format!(
1905 "Invalid order price precision for order {}, was {} when {} price precision is {}",
1906 order.client_order_id(),
1907 price.precision,
1908 self.instrument.id(),
1909 self.instrument.price_precision()
1910 )
1911 .into(),
1912 );
1913 }
1914
1915 if let Some(trigger_price) = order.trigger_price()
1917 && trigger_price.precision != self.instrument.price_precision()
1918 {
1919 break 'validate Some(
1920 format!(
1921 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
1922 order.client_order_id(),
1923 trigger_price.precision,
1924 self.instrument.id(),
1925 self.instrument.price_precision()
1926 )
1927 .into(),
1928 );
1929 }
1930
1931 let position: Option<&Position> = cache_borrow
1933 .position_for_order(&order.client_order_id())
1934 .or_else(|| {
1935 if self.oms_type == OmsType::Netting {
1936 let position_id = PositionId::new(
1937 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
1938 );
1939 cache_borrow.position(&position_id)
1940 } else {
1941 None
1942 }
1943 });
1944
1945 if order.order_side() == OrderSide::Sell
1947 && self.account_type != AccountType::Margin
1948 && matches!(self.instrument, InstrumentAny::Equity(_))
1949 && (position.is_none()
1950 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
1951 {
1952 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
1953 break 'validate Some(
1954 format!(
1955 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
1956 )
1957 .into(),
1958 );
1959 }
1960
1961 if self.config.use_reduce_only
1963 && order.is_reduce_only()
1964 && !order.is_closed()
1965 && position.is_none_or(|pos| {
1966 pos.is_closed()
1967 || (order.is_buy() && pos.is_long())
1968 || (order.is_sell() && pos.is_short())
1969 })
1970 {
1971 break 'validate Some(
1972 format!(
1973 "Reduce-only order {} ({}-{}) would have increased position",
1974 order.client_order_id(),
1975 order.order_type().to_string().to_uppercase(),
1976 order.order_side().to_string().to_uppercase()
1977 )
1978 .into(),
1979 );
1980 }
1981
1982 None
1983 };
1984
1985 if let Some(reason) = reject_reason {
1986 self.generate_order_rejected(order, reason);
1987 return;
1988 }
1989
1990 if order.is_quote_quantity()
1998 && !self.instrument.is_inverse()
1999 && !matches!(
2000 order.order_type(),
2001 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket,
2002 )
2003 && (order.price().is_some()
2004 || matches!(
2005 order.order_type(),
2006 OrderType::Market | OrderType::MarketToLimit,
2007 ))
2008 && !self.convert_quote_to_base_quantity(order)
2009 {
2010 return;
2011 }
2012
2013 match order.order_type() {
2014 OrderType::Market => self.process_market_order(order),
2015 OrderType::Limit => self.process_limit_order(order),
2016 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
2017 OrderType::StopMarket => self.process_stop_market_order(order),
2018 OrderType::StopLimit => self.process_stop_limit_order(order),
2019 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
2020 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
2021 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
2022 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
2023 }
2024 }
2025
2026 fn convert_quote_to_base_quantity(&self, order: &mut OrderAny) -> bool {
2027 let reference_price = if let Some(price) = order.price() {
2031 Some(price)
2032 } else {
2033 match order.order_side() {
2034 OrderSide::Buy => self.core.ask,
2035 OrderSide::Sell => self.core.bid,
2036 OrderSide::NoOrderSide => None,
2037 }
2038 };
2039
2040 let Some(reference_price) = reference_price else {
2041 self.generate_order_rejected(
2042 order,
2043 format!(
2044 "No market for {} to convert quote quantity to base",
2045 order.instrument_id(),
2046 )
2047 .into(),
2048 );
2049 return false;
2050 };
2051
2052 let base_quantity = self
2053 .instrument
2054 .calculate_base_quantity(order.quantity(), reference_price);
2055
2056 let ts_now = self.clock.borrow().timestamp_ns();
2057 let event = OrderEventAny::Updated(OrderUpdated::new(
2058 order.trader_id(),
2059 order.strategy_id(),
2060 order.instrument_id(),
2061 order.client_order_id(),
2062 base_quantity,
2063 UUID4::new(),
2064 ts_now,
2065 ts_now,
2066 false,
2067 order.venue_order_id(),
2068 order.account_id(),
2069 None,
2070 None,
2071 None,
2072 false,
2073 ));
2074
2075 if let Err(e) = order.apply(event.clone()) {
2079 log::error!(
2080 "Failed to apply quote-to-base update for {}: {e}",
2081 order.client_order_id(),
2082 );
2083 return false;
2084 }
2085 self.dispatch_order_event(event);
2086 true
2087 }
2088
2089 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
2091 if !self.core.order_exists(command.client_order_id) {
2092 self.generate_order_modify_rejected(
2093 command.trader_id,
2094 command.strategy_id,
2095 command.instrument_id,
2096 command.client_order_id,
2097 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
2098 command.venue_order_id,
2099 Some(account_id),
2100 );
2101 return;
2102 }
2103
2104 let mut order = match self.cache.borrow().order(&command.client_order_id).cloned() {
2105 Some(order) => order,
2106 None => {
2107 log::error!(
2108 "Cannot modify order: order {} not found in cache",
2109 command.client_order_id
2110 );
2111 return;
2112 }
2113 };
2114
2115 let update_success = self.update_order(
2116 &mut order,
2117 command.quantity,
2118 command.price,
2119 command.trigger_price,
2120 None,
2121 );
2122
2123 if update_success && order.is_open() {
2125 let _ = self.core.delete_order(command.client_order_id);
2126 let match_info = OrderMatchInfo::new(
2127 order.client_order_id(),
2128 order.order_side().as_specified(),
2129 order.order_type(),
2130 order.trigger_price(),
2131 order.price(),
2132 true,
2133 );
2134 self.core.add_order(match_info);
2135
2136 if self.config.queue_position
2137 && let Some(new_price) = order.price()
2138 {
2139 self.snapshot_queue_position(&order, new_price);
2140 self.queue_excess.swap_remove(&order.client_order_id());
2141 }
2142 }
2143 }
2144
2145 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
2147 if !self.core.order_exists(command.client_order_id) {
2148 self.generate_order_cancel_rejected(
2149 command.trader_id,
2150 command.strategy_id,
2151 account_id,
2152 command.instrument_id,
2153 command.client_order_id,
2154 command.venue_order_id,
2155 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
2156 );
2157 return;
2158 }
2159
2160 let order = match self.cache.borrow().order(&command.client_order_id).cloned() {
2161 Some(order) => order,
2162 None => {
2163 log::error!(
2164 "Cannot cancel order: order {} not found in cache",
2165 command.client_order_id
2166 );
2167 return;
2168 }
2169 };
2170
2171 if !order.is_inflight() && !order.is_open() {
2172 self.purge_stale_core_entry(command.client_order_id);
2173 return;
2174 }
2175
2176 self.cancel_order(&order, None);
2177 }
2178
2179 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, _account_id: AccountId) {
2181 let instrument_id = command.instrument_id;
2182 let order_side = if command.order_side == OrderSide::NoOrderSide {
2183 None
2184 } else {
2185 Some(command.order_side)
2186 };
2187
2188 let client_order_ids: Vec<ClientOrderId> = self
2189 .cache
2190 .borrow()
2191 .orders_open(None, Some(&instrument_id), None, None, order_side)
2192 .iter()
2193 .map(|o| o.client_order_id())
2194 .collect();
2195
2196 for client_order_id in client_order_ids {
2197 let order = match self.cache.borrow().order(&client_order_id).cloned() {
2198 Some(order) => order,
2199 None => continue,
2200 };
2201
2202 if !order.is_inflight() && !order.is_open() {
2203 self.purge_stale_core_entry(client_order_id);
2204 continue;
2205 }
2206
2207 self.cancel_order(&order, None);
2208 }
2209 }
2210
2211 fn purge_stale_core_entry(&mut self, client_order_id: ClientOrderId) {
2214 if self.core.order_exists(client_order_id) {
2215 let _ = self.core.delete_order(client_order_id);
2216 }
2217 self.cached_filled_qty.swap_remove(&client_order_id);
2218 }
2219
2220 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
2222 for order in &command.cancels {
2223 self.process_cancel(order, account_id);
2224 }
2225 }
2226
2227 fn process_market_order(&mut self, order: &OrderAny) {
2228 if order.time_in_force() == TimeInForce::AtTheOpen
2229 || order.time_in_force() == TimeInForce::AtTheClose
2230 {
2231 log::error!(
2232 "Market auction for the time in force {} is currently not supported",
2233 order.time_in_force()
2234 );
2235 return;
2236 }
2237
2238 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
2240 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
2241 {
2242 self.generate_order_rejected(
2243 order,
2244 format!("No market for {}", order.instrument_id()).into(),
2245 );
2246 return;
2247 }
2248
2249 if self.config.use_market_order_acks {
2250 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2251 self.generate_order_accepted(order, venue_order_id);
2252 }
2253
2254 if let Err(e) = self
2256 .cache
2257 .borrow_mut()
2258 .add_order(order.clone(), None, None, false)
2259 {
2260 log::debug!("Order already in cache: {e}");
2261 }
2262
2263 self.fill_market_order(order.client_order_id());
2264 }
2265
2266 fn process_limit_order(&mut self, order: &mut OrderAny) {
2267 let limit_px = order.price().expect("Limit order must have a price");
2268 if order.is_post_only()
2269 && self
2270 .core
2271 .is_limit_matched(order.order_side_specified(), limit_px)
2272 {
2273 self.generate_order_rejected(
2274 order,
2275 format!(
2276 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
2277 order.order_type(),
2278 order.order_side(),
2279 order.price().unwrap(),
2280 self.core
2281 .bid
2282 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2283 self.core
2284 .ask
2285 .map_or_else(|| "None".to_string(), |p| p.to_string())
2286 )
2287 .into(),
2288 );
2289 return;
2290 }
2291
2292 self.accept_order(order);
2294
2295 if self
2297 .core
2298 .is_limit_matched(order.order_side_specified(), limit_px)
2299 {
2300 order.set_liquidity_side(LiquiditySide::Taker);
2302
2303 if self
2304 .cache
2305 .borrow_mut()
2306 .add_order(order.clone(), None, None, false)
2307 .is_err()
2308 && let Err(e) = self.cache.borrow_mut().update_order(order)
2309 {
2310 log::debug!("Failed to update order in cache: {e}");
2311 }
2312 self.fill_limit_order(order.client_order_id());
2313
2314 if self.core.order_exists(order.client_order_id())
2317 && let Some(cached) = self.cache.borrow_mut().mut_order(&order.client_order_id())
2318 {
2319 cached.set_liquidity_side(LiquiditySide::Maker);
2320 }
2321 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
2322 self.cancel_order(order, None);
2323 } else {
2324 order.set_liquidity_side(LiquiditySide::Maker);
2326
2327 if let Some(price) = order.price() {
2328 self.snapshot_queue_position(order, price);
2329 }
2330
2331 let add_result = self
2332 .cache
2333 .borrow_mut()
2334 .add_order(order.clone(), None, None, false);
2335
2336 if let Err(e) = add_result {
2337 log::debug!("Failed to add order to cache: {e}");
2338
2339 if let Some(cached) = self.cache.borrow_mut().mut_order(&order.client_order_id())
2342 && !matches!(
2343 cached.liquidity_side(),
2344 Some(LiquiditySide::Maker | LiquiditySide::Taker)
2345 )
2346 {
2347 cached.set_liquidity_side(LiquiditySide::Maker);
2348 }
2349 }
2350 }
2351 }
2352
2353 fn process_market_to_limit_order(&mut self, order: &OrderAny) {
2354 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
2356 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
2357 {
2358 self.generate_order_rejected(
2359 order,
2360 format!("No market for {}", order.instrument_id()).into(),
2361 );
2362 return;
2363 }
2364
2365 if self.config.use_market_order_acks {
2366 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2367 self.generate_order_accepted(order, venue_order_id);
2368 }
2369
2370 if let Err(e) = self
2372 .cache
2373 .borrow_mut()
2374 .add_order(order.clone(), None, None, false)
2375 {
2376 log::debug!("Order already in cache: {e}");
2377 }
2378 let client_order_id = order.client_order_id();
2379 self.fill_market_order(client_order_id);
2380
2381 let filled_qty = self
2383 .cached_filled_qty
2384 .get(&client_order_id)
2385 .copied()
2386 .unwrap_or_default();
2387 let leaves_qty = order.quantity().saturating_sub(filled_qty);
2388 if !leaves_qty.is_zero() {
2389 let updated_order = self.cache.borrow().order(&client_order_id).cloned();
2391 if let Some(mut updated_order) = updated_order {
2392 self.accept_order(&mut updated_order);
2393 }
2394 }
2395 }
2396
2397 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
2398 let stop_px = order
2399 .trigger_price()
2400 .expect("Stop order must have a trigger price");
2401
2402 if self
2403 .core
2404 .is_stop_matched(order.order_side_specified(), stop_px)
2405 {
2406 if self.config.reject_stop_orders {
2407 self.generate_order_rejected(
2408 order,
2409 format!(
2410 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2411 order.order_type(),
2412 order.order_side(),
2413 order.trigger_price().unwrap(),
2414 self.core
2415 .bid
2416 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2417 self.core
2418 .ask
2419 .map_or_else(|| "None".to_string(), |p| p.to_string())
2420 ).into(),
2421 );
2422 return;
2423 }
2424
2425 if let Err(e) = self
2426 .cache
2427 .borrow_mut()
2428 .add_order(order.clone(), None, None, false)
2429 {
2430 log::debug!("Order already in cache: {e}");
2431 }
2432 self.fill_market_order(order.client_order_id());
2433 return;
2434 }
2435
2436 self.accept_order(order);
2438
2439 order.set_liquidity_side(LiquiditySide::Maker);
2441
2442 if let Err(e) = self
2443 .cache
2444 .borrow_mut()
2445 .add_order(order.clone(), None, None, false)
2446 {
2447 log::debug!("Order already in cache: {e}");
2448 }
2449 }
2450
2451 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
2452 let stop_px = order
2453 .trigger_price()
2454 .expect("Stop order must have a trigger price");
2455
2456 if self
2457 .core
2458 .is_stop_matched(order.order_side_specified(), stop_px)
2459 {
2460 if self.config.reject_stop_orders {
2461 self.generate_order_rejected(
2462 order,
2463 format!(
2464 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2465 order.order_type(),
2466 order.order_side(),
2467 order.trigger_price().unwrap(),
2468 self.core
2469 .bid
2470 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2471 self.core
2472 .ask
2473 .map_or_else(|| "None".to_string(), |p| p.to_string())
2474 ).into(),
2475 );
2476 return;
2477 }
2478
2479 self.accept_order(order);
2480 self.generate_order_triggered(order);
2481
2482 let limit_px = order.price().expect("Stop limit order must have a price");
2484
2485 if self
2486 .core
2487 .is_limit_matched(order.order_side_specified(), limit_px)
2488 {
2489 order.set_liquidity_side(LiquiditySide::Taker);
2490
2491 if let Err(e) = self
2492 .cache
2493 .borrow_mut()
2494 .add_order(order.clone(), None, None, false)
2495 {
2496 log::debug!("Order already in cache: {e}");
2497 }
2498 self.fill_limit_order(order.client_order_id());
2499 }
2500
2501 return;
2503 }
2504
2505 self.accept_order(order);
2506
2507 order.set_liquidity_side(LiquiditySide::Maker);
2509
2510 if let Err(e) = self
2511 .cache
2512 .borrow_mut()
2513 .add_order(order.clone(), None, None, false)
2514 {
2515 log::debug!("Order already in cache: {e}");
2516 }
2517 }
2518
2519 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
2520 if self
2521 .core
2522 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
2523 {
2524 if self.config.reject_stop_orders {
2525 self.generate_order_rejected(
2526 order,
2527 format!(
2528 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2529 order.order_type(),
2530 order.order_side(),
2531 order.trigger_price().unwrap(),
2532 self.core
2533 .bid
2534 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2535 self.core
2536 .ask
2537 .map_or_else(|| "None".to_string(), |p| p.to_string())
2538 ).into(),
2539 );
2540 return;
2541 }
2542
2543 if let Err(e) = self
2544 .cache
2545 .borrow_mut()
2546 .add_order(order.clone(), None, None, false)
2547 {
2548 log::debug!("Order already in cache: {e}");
2549 }
2550 self.fill_market_order(order.client_order_id());
2551 return;
2552 }
2553
2554 self.accept_order(order);
2556
2557 order.set_liquidity_side(LiquiditySide::Maker);
2559
2560 if let Err(e) = self
2561 .cache
2562 .borrow_mut()
2563 .add_order(order.clone(), None, None, false)
2564 {
2565 log::debug!("Order already in cache: {e}");
2566 }
2567 }
2568
2569 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
2570 if self
2571 .core
2572 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
2573 {
2574 if self.config.reject_stop_orders {
2575 self.generate_order_rejected(
2576 order,
2577 format!(
2578 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2579 order.order_type(),
2580 order.order_side(),
2581 order.trigger_price().unwrap(),
2582 self.core
2583 .bid
2584 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2585 self.core
2586 .ask
2587 .map_or_else(|| "None".to_string(), |p| p.to_string())
2588 ).into(),
2589 );
2590 return;
2591 }
2592 self.accept_order(order);
2593 self.generate_order_triggered(order);
2594
2595 if self
2597 .core
2598 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
2599 {
2600 order.set_liquidity_side(LiquiditySide::Taker);
2601
2602 if let Err(e) = self
2603 .cache
2604 .borrow_mut()
2605 .add_order(order.clone(), None, None, false)
2606 {
2607 log::debug!("Order already in cache: {e}");
2608 }
2609 self.fill_limit_order(order.client_order_id());
2610 }
2611 return;
2612 }
2613
2614 self.accept_order(order);
2616
2617 order.set_liquidity_side(LiquiditySide::Maker);
2619
2620 if let Err(e) = self
2621 .cache
2622 .borrow_mut()
2623 .add_order(order.clone(), None, None, false)
2624 {
2625 log::debug!("Order already in cache: {e}");
2626 }
2627 }
2628
2629 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
2630 if let Some(trigger_price) = order.trigger_price()
2631 && self
2632 .core
2633 .is_stop_matched(order.order_side_specified(), trigger_price)
2634 {
2635 self.generate_order_rejected(
2636 order,
2637 format!(
2638 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
2639 order.order_type(),
2640 order.order_side(),
2641 trigger_price,
2642 self.core
2643 .bid
2644 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2645 self.core
2646 .ask
2647 .map_or_else(|| "None".to_string(), |p| p.to_string())
2648 ).into(),
2649 );
2650 return;
2651 }
2652
2653 self.accept_order(order);
2655
2656 order.set_liquidity_side(LiquiditySide::Maker);
2658
2659 if let Err(e) = self
2660 .cache
2661 .borrow_mut()
2662 .add_order(order.clone(), None, None, false)
2663 {
2664 log::debug!("Order already in cache: {e}");
2665 }
2666 }
2667
2668 pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
2675 if aggressor_side == AggressorSide::NoAggressor {
2680 if let Some(bid) = self.book.best_bid_price() {
2681 self.core.set_bid_raw(bid);
2682 }
2683
2684 if let Some(ask) = self.book.best_ask_price() {
2685 self.core.set_ask_raw(ask);
2686 }
2687 }
2688
2689 self.check_instrument_expiration();
2691
2692 if self.config.support_gtd_orders {
2694 self.expire_gtd_orders(timestamp_ns);
2695 }
2696
2697 for action in self.core.iterate_bids() {
2700 match action {
2701 MatchAction::FillLimit(id) => self.fill_limit_order(id),
2702 MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
2703 }
2704 }
2705
2706 for action in self.core.iterate_asks() {
2707 match action {
2708 MatchAction::FillLimit(id) => self.fill_limit_order(id),
2709 MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
2710 }
2711 }
2712
2713 let orders_bid = self.core.get_orders_bid().to_vec();
2714 let orders_ask = self.core.get_orders_ask().to_vec();
2715
2716 self.iterate_orders(timestamp_ns, &orders_bid);
2717 self.iterate_orders(timestamp_ns, &orders_ask);
2718
2719 self.core.bid = self.book.best_bid_price();
2722 self.core.ask = self.book.best_ask_price();
2723 }
2724
2725 fn get_trailing_activation_price(
2726 &self,
2727 trigger_type: TriggerType,
2728 order_side: OrderSide,
2729 bid: Option<Price>,
2730 ask: Option<Price>,
2731 last: Option<Price>,
2732 ) -> Option<Price> {
2733 match trigger_type {
2734 TriggerType::LastPrice => last,
2735 TriggerType::LastOrBidAsk => last.or(match order_side {
2736 OrderSide::Buy => ask,
2737 OrderSide::Sell => bid,
2738 _ => None,
2739 }),
2740 _ => match order_side {
2742 OrderSide::Buy => ask,
2743 OrderSide::Sell => bid,
2744 _ => None,
2745 },
2746 }
2747 }
2748
2749 fn maybe_activate_trailing_stop(
2750 &self,
2751 order: &mut OrderAny,
2752 bid: Option<Price>,
2753 ask: Option<Price>,
2754 last: Option<Price>,
2755 ) -> bool {
2756 match order {
2757 OrderAny::TrailingStopMarket(inner) => {
2758 if inner.is_activated {
2759 return true;
2760 }
2761
2762 if inner.activation_price.is_none() {
2763 let px = self.get_trailing_activation_price(
2764 inner.trigger_type,
2765 inner.order_side(),
2766 bid,
2767 ask,
2768 last,
2769 );
2770
2771 if let Some(p) = px {
2772 inner.activation_price = Some(p);
2773 inner.set_activated();
2774
2775 if let Err(e) = self.cache.borrow_mut().update_order(order) {
2776 log::error!("Failed to update order: {e}");
2777 }
2778 return true;
2779 }
2780 return false;
2781 }
2782
2783 let activation_price = inner.activation_price.unwrap();
2784 let hit = match inner.order_side() {
2785 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
2786 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
2787 _ => false,
2788 };
2789
2790 if hit {
2791 inner.set_activated();
2792
2793 if let Err(e) = self.cache.borrow_mut().update_order(order) {
2794 log::error!("Failed to update order: {e}");
2795 }
2796 }
2797 hit
2798 }
2799 OrderAny::TrailingStopLimit(inner) => {
2800 if inner.is_activated {
2801 return true;
2802 }
2803
2804 if inner.activation_price.is_none() {
2805 let px = self.get_trailing_activation_price(
2806 inner.trigger_type,
2807 inner.order_side(),
2808 bid,
2809 ask,
2810 last,
2811 );
2812
2813 if let Some(p) = px {
2814 inner.activation_price = Some(p);
2815 inner.set_activated();
2816
2817 if let Err(e) = self.cache.borrow_mut().update_order(order) {
2818 log::error!("Failed to update order: {e}");
2819 }
2820 return true;
2821 }
2822 return false;
2823 }
2824
2825 let activation_price = inner.activation_price.unwrap();
2826 let hit = match inner.order_side() {
2827 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
2828 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
2829 _ => false,
2830 };
2831
2832 if hit {
2833 inner.set_activated();
2834
2835 if let Err(e) = self.cache.borrow_mut().update_order(order) {
2836 log::error!("Failed to update order: {e}");
2837 }
2838 }
2839 hit
2840 }
2841 _ => true,
2842 }
2843 }
2844
2845 fn expire_gtd_orders(&mut self, timestamp_ns: UnixNanos) {
2846 for match_info in self.core.get_orders() {
2847 let order = match self
2848 .cache
2849 .borrow()
2850 .order(&match_info.client_order_id)
2851 .cloned()
2852 {
2853 Some(order) => order,
2854 None => continue,
2855 };
2856
2857 if order.is_closed() {
2858 continue;
2859 }
2860
2861 if order
2862 .expire_time()
2863 .is_some_and(|expire_ns| timestamp_ns >= expire_ns)
2864 {
2865 let _ = self.core.delete_order(match_info.client_order_id);
2866 self.cached_filled_qty
2867 .swap_remove(&match_info.client_order_id);
2868 self.expire_order(&order);
2869 }
2870 }
2871 }
2872
2873 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[OrderMatchInfo]) {
2874 for match_info in orders {
2875 let order = match self
2876 .cache
2877 .borrow()
2878 .order(&match_info.client_order_id)
2879 .cloned()
2880 {
2881 Some(order) => order,
2882 None => {
2883 log::warn!(
2884 "Order {} not found in cache during iteration, skipping",
2885 match_info.client_order_id
2886 );
2887 continue;
2888 }
2889 };
2890
2891 if order.is_closed() {
2892 continue;
2893 }
2894
2895 if self.config.support_gtd_orders
2896 && order
2897 .expire_time()
2898 .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
2899 {
2900 let _ = self.core.delete_order(match_info.client_order_id);
2901 self.cached_filled_qty
2902 .swap_remove(&match_info.client_order_id);
2903 self.expire_order(&order);
2904 continue;
2905 }
2906
2907 if matches!(
2908 match_info.order_type,
2909 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
2910 ) {
2911 let mut any = order;
2912
2913 if !self.maybe_activate_trailing_stop(
2914 &mut any,
2915 self.core.bid,
2916 self.core.ask,
2917 self.core.last,
2918 ) {
2919 continue;
2920 }
2921
2922 self.update_trailing_stop_order(&any);
2923
2924 let _ = self.core.delete_order(match_info.client_order_id);
2926 let updated_match_info = OrderMatchInfo::new(
2927 any.client_order_id(),
2928 any.order_side().as_specified(),
2929 any.order_type(),
2930 any.trigger_price(),
2931 any.price(),
2932 match &any {
2933 OrderAny::TrailingStopMarket(o) => o.is_activated,
2934 OrderAny::TrailingStopLimit(o) => o.is_activated,
2935 _ => true,
2936 },
2937 );
2938 self.core.add_order(updated_match_info);
2939 }
2940
2941 if let Some(target_bid) = self.target_bid {
2943 self.core.bid = Some(target_bid);
2944 self.target_bid = None;
2945 }
2946
2947 if let Some(target_bid) = self.target_bid.take() {
2948 self.core.bid = Some(target_bid);
2949 self.target_bid = None;
2950 }
2951
2952 if let Some(target_ask) = self.target_ask.take() {
2953 self.core.ask = Some(target_ask);
2954 self.target_ask = None;
2955 }
2956
2957 if let Some(target_last) = self.target_last.take() {
2958 self.core.last = Some(target_last);
2959 self.target_last = None;
2960 }
2961 }
2962
2963 self.target_bid = None;
2965 self.target_ask = None;
2966 self.target_last = None;
2967 }
2968
2969 fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
2970 match order.price() {
2971 Some(order_price) => {
2972 let mut fills = if self.config.liquidity_consumption {
2977 let size_prec = self.instrument.size_precision();
2978 self.book
2979 .get_all_crossed_levels(order.order_side(), order_price, size_prec)
2980 } else {
2981 let book_order =
2982 BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
2983 self.book.simulate_fills(&book_order)
2984 };
2985
2986 if let Some(trade_size) = self.last_trade_size
2988 && let Some(trade_price) = self.core.last
2989 {
2990 let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);
2991
2992 if !fills_at_trade_price
2993 && self
2994 .core
2995 .is_limit_matched(order.order_side_specified(), order_price)
2996 {
2997 let leaves_qty = order.leaves_qty();
3000 let available_qty = if self.config.liquidity_consumption {
3001 let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
3002 Quantity::from_raw(remaining, trade_size.precision)
3003 } else {
3004 trade_size
3005 };
3006
3007 let fill_qty = min(leaves_qty, available_qty);
3008
3009 if !fill_qty.is_zero() {
3010 log::debug!(
3011 "Trade execution fill: {} @ {} (trade_price={}, available: {}, book had {} fills)",
3012 fill_qty,
3013 order_price,
3014 trade_price,
3015 available_qty,
3016 fills.len()
3017 );
3018
3019 if self.config.liquidity_consumption {
3020 self.trade_consumption += fill_qty.raw;
3021 }
3022
3023 return vec![(order_price, fill_qty)];
3028 }
3029 }
3030 }
3031
3032 if fills.is_empty() {
3034 return fills;
3035 }
3036
3037 let book_prices: Vec<Price> = if self.config.liquidity_consumption {
3041 fills.iter().map(|(px, _)| *px).collect()
3042 } else {
3043 Vec::new()
3044 };
3045 let book_prices_ref: Option<&[Price]> = if book_prices.is_empty() {
3046 None
3047 } else {
3048 Some(&book_prices)
3049 };
3050
3051 if let Some(triggered_price) = order.trigger_price() {
3053 if order
3055 .liquidity_side()
3056 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
3057 {
3058 if order.order_side() == OrderSide::Sell && order_price > triggered_price {
3059 let first_fill = fills.first().unwrap();
3061 let triggered_qty = first_fill.1;
3062 fills[0] = (triggered_price, triggered_qty);
3063 self.target_bid = self.core.bid;
3064 self.target_ask = self.core.ask;
3065 self.target_last = self.core.last;
3066 self.core.set_ask_raw(order_price);
3067 self.core.set_last_raw(order_price);
3068 } else if order.order_side() == OrderSide::Buy
3069 && order_price < triggered_price
3070 {
3071 let first_fill = fills.first().unwrap();
3073 let triggered_qty = first_fill.1;
3074 fills[0] = (triggered_price, triggered_qty);
3075 self.target_bid = self.core.bid;
3076 self.target_ask = self.core.ask;
3077 self.target_last = self.core.last;
3078 self.core.set_bid_raw(order_price);
3079 self.core.set_last_raw(order_price);
3080 }
3081 }
3082 }
3083
3084 if order
3086 .liquidity_side()
3087 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
3088 {
3089 match order.order_side().as_specified() {
3090 OrderSideSpecified::Buy => {
3091 let target_price = if order
3092 .trigger_price()
3093 .is_some_and(|trigger_price| order_price > trigger_price)
3094 {
3095 order.trigger_price().unwrap()
3096 } else {
3097 order_price
3098 };
3099
3100 for fill in &mut fills {
3101 let last_px = fill.0;
3102 if last_px < order_price {
3103 self.target_bid = self.core.bid;
3105 self.target_ask = self.core.ask;
3106 self.target_last = self.core.last;
3107 self.core.set_ask_raw(target_price);
3108 self.core.set_last_raw(target_price);
3109 fill.0 = target_price;
3110 }
3111 }
3112 }
3113 OrderSideSpecified::Sell => {
3114 let target_price = if order
3115 .trigger_price()
3116 .is_some_and(|trigger_price| order_price < trigger_price)
3117 {
3118 order.trigger_price().unwrap()
3119 } else {
3120 order_price
3121 };
3122
3123 for fill in &mut fills {
3124 let last_px = fill.0;
3125 if last_px > order_price {
3126 self.target_bid = self.core.bid;
3128 self.target_ask = self.core.ask;
3129 self.target_last = self.core.last;
3130 self.core.set_bid_raw(target_price);
3131 self.core.set_last_raw(target_price);
3132 fill.0 = target_price;
3133 }
3134 }
3135 }
3136 }
3137 }
3138
3139 self.apply_liquidity_consumption(
3140 fills,
3141 order.order_side(),
3142 order.leaves_qty(),
3143 book_prices_ref,
3144 )
3145 }
3146 None => panic!("Limit order must have a price"),
3147 }
3148 }
3149
3150 fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
3151 let price = match order.order_side().as_specified() {
3152 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
3153 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
3154 };
3155
3156 let mut fills = if self.config.liquidity_consumption {
3159 let size_prec = self.instrument.size_precision();
3160 self.book
3161 .get_all_crossed_levels(order.order_side(), price, size_prec)
3162 } else {
3163 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
3164 self.book.simulate_fills(&book_order)
3165 };
3166
3167 if !self.fill_at_market
3170 && self.book_type == BookType::L1_MBP
3171 && !fills.is_empty()
3172 && matches!(
3173 order.order_type(),
3174 OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
3175 )
3176 && let Some(trigger_price) = order.trigger_price()
3177 {
3178 fills[0] = (trigger_price, fills[0].1);
3179
3180 let mut remaining_qty = order.leaves_qty().raw;
3182 let mut capped_fills = Vec::with_capacity(fills.len());
3183
3184 for (price, qty) in fills {
3185 if remaining_qty == 0 {
3186 break;
3187 }
3188
3189 let capped_qty_raw = min(qty.raw, remaining_qty);
3190 if capped_qty_raw == 0 {
3191 continue;
3192 }
3193
3194 remaining_qty -= capped_qty_raw;
3195 capped_fills.push((price, Quantity::from_raw(capped_qty_raw, qty.precision)));
3196 }
3197
3198 return capped_fills;
3199 }
3200
3201 fills
3202 }
3203
3204 fn determine_market_fill_model_price_and_volume(
3205 &mut self,
3206 order: &OrderAny,
3207 ) -> (Vec<(Price, Quantity)>, bool) {
3208 if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
3209 && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
3210 &self.instrument,
3211 order,
3212 best_bid,
3213 best_ask,
3214 )
3215 {
3216 let price = match order.order_side().as_specified() {
3217 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
3218 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
3219 };
3220 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
3221 let fills = book.simulate_fills(&book_order);
3222 if !fills.is_empty() {
3223 return (fills, true);
3224 }
3225 }
3226 (self.determine_market_price_and_volume(order), false)
3227 }
3228
3229 fn determine_limit_fill_model_price_and_volume(
3230 &mut self,
3231 order: &OrderAny,
3232 ) -> Vec<(Price, Quantity)> {
3233 if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
3234 && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
3235 &self.instrument,
3236 order,
3237 best_bid,
3238 best_ask,
3239 )
3240 && let Some(limit_price) = order.price()
3241 {
3242 let book_order = BookOrder::new(order.order_side(), limit_price, order.quantity(), 0);
3243 let fills = book.simulate_fills(&book_order);
3244 if !fills.is_empty() {
3245 return fills;
3246 }
3247 }
3248 self.determine_limit_price_and_volume(order)
3249 }
3250
3251 pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
3256 let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
3257 Some(order) => order,
3258 None => {
3259 log::error!("Cannot fill market order: order {client_order_id} not found in cache");
3260 return;
3261 }
3262 };
3263
3264 if order.is_quote_quantity()
3268 && !self.instrument.is_inverse()
3269 && !self.convert_quote_to_base_quantity(&mut order)
3270 {
3271 return;
3272 }
3273
3274 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
3275 && filled_qty >= &order.quantity()
3276 {
3277 log::debug!(
3278 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
3279 filled_qty,
3280 order.quantity(),
3281 order.filled_qty(),
3282 order.quantity()
3283 );
3284 return;
3285 }
3286
3287 let venue_position_id = self.ids_generator.get_position_id(&order, Some(true));
3288 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
3289 let cache = self.cache.as_ref().borrow();
3290 cache.position(&venue_position_id).cloned()
3291 } else {
3292 None
3293 };
3294
3295 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
3296 log::warn!(
3297 "Canceling REDUCE_ONLY {} as would increase position",
3298 order.order_type()
3299 );
3300 self.cancel_order(&order, None);
3301 return;
3302 }
3303
3304 order.set_liquidity_side(LiquiditySide::Taker);
3305 let (mut fills, from_synthetic) = self.determine_market_fill_model_price_and_volume(&order);
3306
3307 if let Some(protection_points) = self.config.price_protection_points
3309 && matches!(
3310 order.order_type(),
3311 OrderType::Market | OrderType::StopMarket
3312 )
3313 && let Ok(protection_price) = protection_price_calculate(
3314 self.instrument.price_increment(),
3315 &order,
3316 protection_points,
3317 self.core.bid,
3318 self.core.ask,
3319 )
3320 {
3321 fills = self.filter_fills_by_protection(fills, &order, protection_price);
3322 }
3323
3324 let is_trigger_price_fill = !self.fill_at_market
3327 && self.book_type == BookType::L1_MBP
3328 && matches!(
3329 order.order_type(),
3330 OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
3331 )
3332 && order.trigger_price().is_some();
3333
3334 if !from_synthetic && !is_trigger_price_fill {
3335 fills = self.apply_liquidity_consumption(
3336 fills,
3337 order.order_side(),
3338 order.leaves_qty(),
3339 None,
3340 );
3341 }
3342
3343 self.apply_fills(
3344 &order,
3345 &fills,
3346 LiquiditySide::Taker,
3347 None,
3348 position.as_ref(),
3349 );
3350 }
3351
3352 fn filter_fills_by_protection(
3353 &self,
3354 fills: Vec<(Price, Quantity)>,
3355 order: &OrderAny,
3356 protection_price: Price,
3357 ) -> Vec<(Price, Quantity)> {
3358 let protection_raw = protection_price.raw;
3359 fills
3360 .into_iter()
3361 .filter(|(fill_price, _)| {
3362 match order.order_side() {
3363 OrderSide::Buy => fill_price.raw <= protection_raw,
3365 OrderSide::Sell => fill_price.raw >= protection_raw,
3367 OrderSide::NoOrderSide => false,
3368 }
3369 })
3370 .collect()
3371 }
3372
3373 pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
3382 let mut order = match self.cache.borrow().order(&client_order_id).cloned() {
3383 Some(order) => order,
3384 None => {
3385 log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
3386 return;
3387 }
3388 };
3389
3390 if order.is_quote_quantity()
3394 && !self.instrument.is_inverse()
3395 && !self.convert_quote_to_base_quantity(&mut order)
3396 {
3397 return;
3398 }
3399
3400 match order.price() {
3401 Some(order_price) => {
3402 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
3403 if let Some(&qty) = cached_filled_qty
3404 && qty >= order.quantity()
3405 {
3406 log::debug!(
3407 "Ignoring fill as already filled pending application of events: {}, {}, {}, {}",
3408 qty,
3409 order.quantity(),
3410 order.filled_qty(),
3411 order.leaves_qty(),
3412 );
3413 return;
3414 }
3415
3416 if order
3418 .liquidity_side()
3419 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
3420 {
3421 let at_limit = if self.last_trade_size.is_some() && self.core.last.is_some() {
3424 self.core.last.is_some_and(|last| last == order_price)
3425 } else if order.order_side() == OrderSide::Buy {
3426 self.core.bid.is_some_and(|bid| bid == order_price)
3427 } else {
3428 self.core.ask.is_some_and(|ask| ask == order_price)
3429 };
3430
3431 if at_limit && !self.fill_model.is_limit_filled() {
3432 return; }
3434 }
3435
3436 let queue_allowed_raw = if self.config.queue_position {
3437 match self.determine_trade_fill_qty(&order) {
3438 None | Some(0) => {
3439 if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc)
3440 {
3441 self.cancel_order(&order, None);
3442 }
3443 return;
3444 }
3445 Some(allowed) => Some(allowed),
3446 }
3447 } else {
3448 None
3449 };
3450
3451 let venue_position_id = self.ids_generator.get_position_id(&order, None);
3452 let position = if let Some(venue_position_id) = venue_position_id {
3453 let cache = self.cache.as_ref().borrow();
3454 cache.position(&venue_position_id).cloned()
3455 } else {
3456 None
3457 };
3458
3459 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
3460 log::warn!(
3461 "Canceling REDUCE_ONLY {} as would increase position",
3462 order.order_type()
3463 );
3464 self.cancel_order(&order, None);
3465 return;
3466 }
3467
3468 let tc_before = self.trade_consumption;
3469 let mut fills = self.determine_limit_fill_model_price_and_volume(&order);
3470
3471 if let Some(allowed_raw) = queue_allowed_raw {
3472 let size_prec = self.instrument.size_precision();
3473 let mut remaining = allowed_raw;
3474 fills = fills
3475 .into_iter()
3476 .filter_map(|(price, qty)| {
3477 if remaining == 0 {
3478 return None;
3479 }
3480 let capped = qty.raw.min(remaining);
3481 remaining -= capped;
3482 Some((price, Quantity::from_raw(capped, size_prec)))
3483 })
3484 .collect();
3485
3486 let consumed: QuantityRaw = fills.iter().map(|(_, qty)| qty.raw).sum();
3488
3489 if let Some(excess) = self.queue_excess.get_mut(&order.client_order_id()) {
3490 *excess = excess.saturating_sub(consumed);
3491 }
3492 self.trade_consumption = tc_before + consumed;
3493 }
3494
3495 if fills.is_empty() && self.config.liquidity_consumption {
3499 log::debug!(
3500 "Skipping fill for {}: no liquidity available after consumption",
3501 order.client_order_id()
3502 );
3503
3504 if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
3505 self.cancel_order(&order, None);
3506 }
3507
3508 return;
3509 }
3510
3511 let liquidity_side = order.liquidity_side().unwrap();
3512 self.apply_fills(
3513 &order,
3514 &fills,
3515 liquidity_side,
3516 venue_position_id,
3517 position.as_ref(),
3518 );
3519 }
3520 None => panic!("Limit order must have a price"),
3521 }
3522 }
3523
3524 fn apply_fills(
3525 &mut self,
3526 order: &OrderAny,
3527 fills: &[(Price, Quantity)],
3528 liquidity_side: LiquiditySide,
3529 venue_position_id: Option<PositionId>,
3530 position: Option<&Position>,
3531 ) {
3532 if order.time_in_force() == TimeInForce::Fok {
3533 let mut total_size = Quantity::zero(order.quantity().precision);
3534 for (_fill_px, fill_qty) in fills {
3535 total_size = total_size.add(*fill_qty);
3536 }
3537
3538 if order.leaves_qty() > total_size {
3539 self.cancel_order(order, None);
3540 return;
3541 }
3542 }
3543
3544 if fills.is_empty() {
3545 if order.status() == OrderStatus::Submitted {
3546 self.generate_order_rejected(
3547 order,
3548 format!("No market for {}", order.instrument_id()).into(),
3549 );
3550 } else {
3551 log::error!(
3552 "Cannot fill order: no fills from book when fills were expected (check size in data)"
3553 );
3554 return;
3555 }
3556 }
3557
3558 let venue_position_id = if self.oms_type == OmsType::Netting {
3560 None
3561 } else {
3562 venue_position_id
3563 };
3564
3565 let mut initial_market_to_limit_fill = false;
3566
3567 for &(mut fill_px, ref fill_qty) in fills {
3568 assert!(
3569 (fill_px.precision == self.instrument.price_precision()),
3570 "Invalid price precision for fill price {} when instrument price precision is {}.\
3571 Check that the data price precision matches the {} instrument",
3572 fill_px.precision,
3573 self.instrument.price_precision(),
3574 self.instrument.id()
3575 );
3576
3577 assert!(
3578 (fill_qty.precision == self.instrument.size_precision()),
3579 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
3580 Check that the data quantity precision matches the {} instrument",
3581 fill_qty.precision,
3582 self.instrument.size_precision(),
3583 self.instrument.id()
3584 );
3585
3586 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
3587 && order.order_type() == OrderType::MarketToLimit
3588 {
3589 self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
3590 initial_market_to_limit_fill = true;
3591 }
3592
3593 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
3594 fill_px = match order.order_side().as_specified() {
3595 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
3596 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
3597 }
3598 }
3599
3600 let mut effective_fill_qty = *fill_qty;
3604
3605 if self.config.use_reduce_only
3606 && order.is_reduce_only()
3607 && let Some(position) = &position
3608 && *fill_qty > position.quantity
3609 {
3610 if position.quantity == Quantity::zero(position.quantity.precision) {
3611 return;
3613 }
3614
3615 let adjusted_fill_qty =
3617 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
3618
3619 effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
3621
3622 if order.quantity() != adjusted_fill_qty {
3624 self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
3625 }
3626 }
3627
3628 if fill_qty.is_zero() {
3629 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
3630 self.generate_order_rejected(
3631 order,
3632 format!("No market for {}", order.instrument_id()).into(),
3633 );
3634 }
3635 return;
3636 }
3637
3638 self.fill_order(
3639 order,
3640 fill_px,
3641 effective_fill_qty,
3642 liquidity_side,
3643 venue_position_id,
3644 position,
3645 );
3646
3647 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
3648 return;
3650 }
3651 }
3652
3653 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
3654 self.cancel_order(order, None);
3656 return;
3657 }
3658
3659 if order.is_open()
3660 && self.book_type == BookType::L1_MBP
3661 && matches!(
3662 order.order_type(),
3663 OrderType::Market
3664 | OrderType::MarketIfTouched
3665 | OrderType::StopMarket
3666 | OrderType::TrailingStopMarket
3667 )
3668 {
3669 todo!("Exhausted simulated book volume")
3673 }
3674 }
3675
3676 fn fill_order(
3677 &mut self,
3678 order: &OrderAny,
3679 last_px: Price,
3680 last_qty: Quantity,
3681 liquidity_side: LiquiditySide,
3682 venue_position_id: Option<PositionId>,
3683 _position: Option<&Position>,
3684 ) {
3685 self.check_size_precision(last_qty.precision, "fill quantity")
3686 .unwrap();
3687
3688 match self.cached_filled_qty.get(&order.client_order_id()) {
3689 Some(filled_qty) => {
3690 let leaves_qty = order.quantity().saturating_sub(*filled_qty);
3692 let last_qty = min(last_qty, leaves_qty);
3693 let new_filled_qty = *filled_qty + last_qty;
3694 self.cached_filled_qty
3695 .insert(order.client_order_id(), new_filled_qty);
3696 }
3697 None => {
3698 self.cached_filled_qty
3699 .insert(order.client_order_id(), last_qty);
3700 }
3701 }
3702
3703 let commission = self
3704 .fee_model
3705 .get_commission(order, last_qty, last_px, &self.instrument)
3706 .unwrap();
3707
3708 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3709 self.generate_order_filled(
3710 order,
3711 venue_order_id,
3712 venue_position_id,
3713 last_qty,
3714 last_px,
3715 self.instrument.quote_currency(),
3716 commission,
3717 liquidity_side,
3718 );
3719
3720 let fully_filled = self
3721 .cached_filled_qty
3722 .get(&order.client_order_id())
3723 .is_some_and(|qty| qty >= &order.quantity());
3724
3725 if order.is_passive() && (order.is_closed() || fully_filled) {
3726 if self.core.order_exists(order.client_order_id()) {
3727 let _ = self.core.delete_order(order.client_order_id());
3728 }
3729
3730 if order.is_closed() {
3733 self.cached_filled_qty.swap_remove(&order.client_order_id());
3734 }
3735 }
3736
3737 if !self.config.support_contingent_orders {
3738 return;
3739 }
3740
3741 if let Some(contingency_type) = order.contingency_type() {
3742 match contingency_type {
3743 ContingencyType::Oto => {
3744 if let Some(linked_orders_ids) = order.linked_order_ids() {
3745 for client_order_id in linked_orders_ids {
3746 let mut child_order = match self.cache.borrow().order(client_order_id) {
3747 Some(child_order) => child_order.clone(),
3748 None => panic!("Order {client_order_id} not found in cache"),
3749 };
3750
3751 if child_order.is_closed() || child_order.is_active_local() {
3752 continue;
3753 }
3754
3755 if let (None, Some(position_id)) =
3757 (child_order.position_id(), order.position_id())
3758 {
3759 self.cache
3760 .borrow_mut()
3761 .add_position_id(
3762 &position_id,
3763 &self.venue,
3764 client_order_id,
3765 &child_order.strategy_id(),
3766 )
3767 .unwrap();
3768 log::debug!(
3769 "Added position id {position_id} to cache for order {client_order_id}"
3770 );
3771 }
3772
3773 if (!child_order.is_open())
3774 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
3775 && child_order
3776 .previous_status()
3777 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
3778 {
3779 let account_id = order.account_id().unwrap_or_else(|| {
3780 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
3781 panic!(
3782 "Account ID not found for trader {}",
3783 order.trader_id()
3784 )
3785 })
3786 });
3787 self.process_order(&mut child_order, account_id);
3788 }
3789 }
3790 } else {
3791 log::error!(
3792 "OTO order {} does not have linked orders",
3793 order.client_order_id()
3794 );
3795 }
3796 }
3797 ContingencyType::Oco => {
3798 if let Some(linked_orders_ids) = order.linked_order_ids() {
3799 for client_order_id in linked_orders_ids {
3800 let child_order = match self.cache.borrow().order(client_order_id) {
3801 Some(child_order) => child_order.clone(),
3802 None => panic!("Order {client_order_id} not found in cache"),
3803 };
3804
3805 if child_order.is_closed() || child_order.is_active_local() {
3806 continue;
3807 }
3808
3809 self.cancel_order(&child_order, None);
3810 }
3811 } else {
3812 log::error!(
3813 "OCO order {} does not have linked orders",
3814 order.client_order_id()
3815 );
3816 }
3817 }
3818 ContingencyType::Ouo => {
3819 if let Some(linked_orders_ids) = order.linked_order_ids() {
3820 for client_order_id in linked_orders_ids {
3821 let mut child_order = match self.cache.borrow().order(client_order_id) {
3822 Some(child_order) => child_order.clone(),
3823 None => panic!("Order {client_order_id} not found in cache"),
3824 };
3825
3826 if child_order.is_active_local() {
3827 continue;
3828 }
3829
3830 if order.is_closed() && child_order.is_open() {
3831 self.cancel_order(&child_order, None);
3832 } else if !order.leaves_qty().is_zero()
3833 && order.leaves_qty() != child_order.leaves_qty()
3834 {
3835 let price = child_order.price();
3836 let trigger_price = child_order.trigger_price();
3837 self.update_order(
3838 &mut child_order,
3839 Some(order.leaves_qty()),
3840 price,
3841 trigger_price,
3842 Some(false),
3843 );
3844 }
3845 }
3846 } else {
3847 log::error!(
3848 "OUO order {} does not have linked orders",
3849 order.client_order_id()
3850 );
3851 }
3852 }
3853 _ => {}
3854 }
3855 }
3856 }
3857
3858 fn update_limit_order(&mut self, order: &OrderAny, quantity: Quantity, price: Price) {
3859 if self
3860 .core
3861 .is_limit_matched(order.order_side_specified(), price)
3862 {
3863 if order.is_post_only() {
3864 self.generate_order_modify_rejected(
3865 order.trader_id(),
3866 order.strategy_id(),
3867 order.instrument_id(),
3868 order.client_order_id(),
3869 Ustr::from(format!(
3870 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3871 order.order_type(),
3872 order.order_side(),
3873 price,
3874 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3875 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3876 ).as_str()),
3877 order.venue_order_id(),
3878 order.account_id(),
3879 );
3880 return;
3881 }
3882
3883 self.generate_order_updated(order, quantity, Some(price), None, None);
3884
3885 let client_order_id = order.client_order_id();
3887 if let Some(cached) = self.cache.borrow_mut().mut_order(&client_order_id) {
3888 cached.set_liquidity_side(LiquiditySide::Taker);
3889 }
3890 self.fill_limit_order(client_order_id);
3891 return;
3892 }
3893 self.generate_order_updated(order, quantity, Some(price), None, None);
3894 }
3895
3896 fn update_stop_market_order(&self, order: &OrderAny, quantity: Quantity, trigger_price: Price) {
3897 if self
3898 .core
3899 .is_stop_matched(order.order_side_specified(), trigger_price)
3900 {
3901 self.generate_order_modify_rejected(
3902 order.trader_id(),
3903 order.strategy_id(),
3904 order.instrument_id(),
3905 order.client_order_id(),
3906 Ustr::from(
3907 format!(
3908 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3909 order.order_type(),
3910 order.order_side(),
3911 trigger_price,
3912 self.core
3913 .bid
3914 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3915 self.core
3916 .ask
3917 .map_or_else(|| "None".to_string(), |p| p.to_string())
3918 )
3919 .as_str(),
3920 ),
3921 order.venue_order_id(),
3922 order.account_id(),
3923 );
3924 return;
3925 }
3926
3927 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
3928 }
3929
3930 fn update_stop_limit_order(
3931 &mut self,
3932 order: &mut OrderAny,
3933 quantity: Quantity,
3934 price: Price,
3935 trigger_price: Price,
3936 ) {
3937 if order.is_triggered().is_some_and(|t| t) {
3938 if self
3940 .core
3941 .is_limit_matched(order.order_side_specified(), price)
3942 {
3943 if order.is_post_only() {
3944 self.generate_order_modify_rejected(
3945 order.trader_id(),
3946 order.strategy_id(),
3947 order.instrument_id(),
3948 order.client_order_id(),
3949 Ustr::from(format!(
3950 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
3951 order.order_type(),
3952 order.order_side(),
3953 price,
3954 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
3955 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
3956 ).as_str()),
3957 order.venue_order_id(),
3958 order.account_id(),
3959 );
3960 return;
3961 }
3962 self.generate_order_updated(order, quantity, Some(price), None, None);
3963 order.set_liquidity_side(LiquiditySide::Taker);
3964
3965 if let Err(e) = self
3966 .cache
3967 .borrow_mut()
3968 .add_order(order.clone(), None, None, false)
3969 {
3970 log::debug!("Order already in cache: {e}");
3971 }
3972 self.fill_limit_order(order.client_order_id());
3973 return; }
3975 } else {
3976 if self
3978 .core
3979 .is_stop_matched(order.order_side_specified(), trigger_price)
3980 {
3981 self.generate_order_modify_rejected(
3982 order.trader_id(),
3983 order.strategy_id(),
3984 order.instrument_id(),
3985 order.client_order_id(),
3986 Ustr::from(
3987 format!(
3988 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
3989 order.order_type(),
3990 order.order_side(),
3991 trigger_price,
3992 self.core
3993 .bid
3994 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3995 self.core
3996 .ask
3997 .map_or_else(|| "None".to_string(), |p| p.to_string())
3998 )
3999 .as_str(),
4000 ),
4001 order.venue_order_id(),
4002 order.account_id(),
4003 );
4004 return;
4005 }
4006 }
4007
4008 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
4009 }
4010
4011 fn update_market_if_touched_order(
4012 &self,
4013 order: &OrderAny,
4014 quantity: Quantity,
4015 trigger_price: Price,
4016 ) {
4017 if self
4018 .core
4019 .is_touch_triggered(order.order_side_specified(), trigger_price)
4020 {
4021 self.generate_order_modify_rejected(
4022 order.trader_id(),
4023 order.strategy_id(),
4024 order.instrument_id(),
4025 order.client_order_id(),
4026 Ustr::from(
4027 format!(
4028 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
4029 order.order_type(),
4030 order.order_side(),
4031 trigger_price,
4032 self.core
4033 .bid
4034 .map_or_else(|| "None".to_string(), |p| p.to_string()),
4035 self.core
4036 .ask
4037 .map_or_else(|| "None".to_string(), |p| p.to_string())
4038 )
4039 .as_str(),
4040 ),
4041 order.venue_order_id(),
4042 order.account_id(),
4043 );
4044 return;
4046 }
4047
4048 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
4049 }
4050
4051 fn update_limit_if_touched_order(
4052 &mut self,
4053 order: &mut OrderAny,
4054 quantity: Quantity,
4055 price: Price,
4056 trigger_price: Price,
4057 ) {
4058 if order.is_triggered().is_some_and(|t| t) {
4059 if self
4061 .core
4062 .is_limit_matched(order.order_side_specified(), price)
4063 {
4064 if order.is_post_only() {
4065 self.generate_order_modify_rejected(
4066 order.trader_id(),
4067 order.strategy_id(),
4068 order.instrument_id(),
4069 order.client_order_id(),
4070 Ustr::from(format!(
4071 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
4072 order.order_type(),
4073 order.order_side(),
4074 price,
4075 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
4076 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
4077 ).as_str()),
4078 order.venue_order_id(),
4079 order.account_id(),
4080 );
4081 return;
4083 }
4084 self.generate_order_updated(order, quantity, Some(price), None, None);
4085 order.set_liquidity_side(LiquiditySide::Taker);
4086 self.fill_limit_order(order.client_order_id());
4087 return;
4088 }
4089 } else {
4090 if self
4092 .core
4093 .is_touch_triggered(order.order_side_specified(), trigger_price)
4094 {
4095 self.generate_order_modify_rejected(
4096 order.trader_id(),
4097 order.strategy_id(),
4098 order.instrument_id(),
4099 order.client_order_id(),
4100 Ustr::from(
4101 format!(
4102 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
4103 order.order_type(),
4104 order.order_side(),
4105 trigger_price,
4106 self.core
4107 .bid
4108 .map_or_else(|| "None".to_string(), |p| p.to_string()),
4109 self.core
4110 .ask
4111 .map_or_else(|| "None".to_string(), |p| p.to_string())
4112 )
4113 .as_str(),
4114 ),
4115 order.venue_order_id(),
4116 order.account_id(),
4117 );
4118 return;
4119 }
4120 }
4121
4122 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
4123 }
4124
4125 fn update_trailing_stop_order(&self, order: &OrderAny) {
4126 let (new_trigger_price, new_price) = trailing_stop_calculate(
4127 self.instrument.price_increment(),
4128 order.trigger_price(),
4129 order.activation_price(),
4130 order,
4131 self.core.bid,
4132 self.core.ask,
4133 self.core.last,
4134 )
4135 .unwrap();
4136
4137 if new_trigger_price.is_none() && new_price.is_none() {
4138 return;
4139 }
4140
4141 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
4142 }
4143
4144 fn accept_order(&mut self, order: &mut OrderAny) {
4145 if order.is_closed() {
4146 return;
4148 }
4149
4150 if order.status() != OrderStatus::Accepted {
4151 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
4152 self.generate_order_accepted(order, venue_order_id);
4153
4154 if matches!(
4155 order.order_type(),
4156 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
4157 ) && order.trigger_price().is_none()
4158 {
4159 self.update_trailing_stop_order(order);
4160 }
4161 }
4162
4163 let match_info = OrderMatchInfo::new(
4164 order.client_order_id(),
4165 order.order_side().as_specified(),
4166 order.order_type(),
4167 order.trigger_price(),
4168 order.price(),
4169 match order {
4170 OrderAny::TrailingStopMarket(o) => o.is_activated,
4171 OrderAny::TrailingStopLimit(o) => o.is_activated,
4172 _ => true,
4173 },
4174 );
4175 self.core.add_order(match_info);
4176 }
4177
4178 fn expire_order(&mut self, order: &OrderAny) {
4179 if self.config.support_contingent_orders
4180 && order
4181 .contingency_type()
4182 .is_some_and(|c| c != ContingencyType::NoContingency)
4183 {
4184 self.cancel_contingent_orders(order);
4185 }
4186
4187 self.generate_order_expired(order);
4188 }
4189
4190 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
4191 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
4192
4193 if order.is_active_local() {
4194 log::error!(
4195 "Cannot cancel an order with {} from the matching engine",
4196 order.status()
4197 );
4198 return;
4199 }
4200
4201 if self.core.order_exists(order.client_order_id()) {
4203 let _ = self.core.delete_order(order.client_order_id());
4204 }
4205 self.cached_filled_qty.swap_remove(&order.client_order_id());
4206
4207 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
4208 self.generate_order_canceled(order, venue_order_id);
4209
4210 if self.config.support_contingent_orders
4211 && order.contingency_type().is_some()
4212 && order.contingency_type().unwrap() != ContingencyType::NoContingency
4213 && cancel_contingencies
4214 {
4215 self.cancel_contingent_orders(order);
4216 }
4217 }
4218
4219 fn update_order(
4220 &mut self,
4221 order: &mut OrderAny,
4222 quantity: Option<Quantity>,
4223 price: Option<Price>,
4224 trigger_price: Option<Price>,
4225 update_contingencies: Option<bool>,
4226 ) -> bool {
4227 let update_contingencies = update_contingencies.unwrap_or(true);
4228 let quantity = quantity.unwrap_or(order.quantity());
4229
4230 let price_prec = self.instrument.price_precision();
4231 let size_prec = self.instrument.size_precision();
4232 let instrument_id = self.instrument.id();
4233
4234 if quantity.precision != size_prec {
4235 self.generate_order_modify_rejected(
4236 order.trader_id(),
4237 order.strategy_id(),
4238 order.instrument_id(),
4239 order.client_order_id(),
4240 Ustr::from(&format!(
4241 "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
4242 quantity.precision
4243 )),
4244 order.venue_order_id(),
4245 order.account_id(),
4246 );
4247 return false;
4248 }
4249
4250 if let Some(px) = price
4251 && px.precision != price_prec
4252 {
4253 self.generate_order_modify_rejected(
4254 order.trader_id(),
4255 order.strategy_id(),
4256 order.instrument_id(),
4257 order.client_order_id(),
4258 Ustr::from(&format!(
4259 "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
4260 px.precision
4261 )),
4262 order.venue_order_id(),
4263 order.account_id(),
4264 );
4265 return false;
4266 }
4267
4268 if let Some(tp) = trigger_price
4269 && tp.precision != price_prec
4270 {
4271 self.generate_order_modify_rejected(
4272 order.trader_id(),
4273 order.strategy_id(),
4274 order.instrument_id(),
4275 order.client_order_id(),
4276 Ustr::from(&format!(
4277 "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
4278 tp.precision
4279 )),
4280 order.venue_order_id(),
4281 order.account_id(),
4282 );
4283 return false;
4284 }
4285
4286 let filled_qty = self
4288 .cached_filled_qty
4289 .get(&order.client_order_id())
4290 .copied()
4291 .unwrap_or(order.filled_qty());
4292 if quantity < filled_qty {
4293 self.generate_order_modify_rejected(
4294 order.trader_id(),
4295 order.strategy_id(),
4296 order.instrument_id(),
4297 order.client_order_id(),
4298 Ustr::from(&format!(
4299 "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
4300 )),
4301 order.venue_order_id(),
4302 order.account_id(),
4303 );
4304 return false;
4305 }
4306
4307 match order {
4308 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
4309 let price = price.unwrap_or(order.price().unwrap());
4310 self.update_limit_order(order, quantity, price);
4311 }
4312 OrderAny::StopMarket(_) => {
4313 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4314 self.update_stop_market_order(order, quantity, trigger_price);
4315 }
4316 OrderAny::StopLimit(_) => {
4317 let price = price.unwrap_or(order.price().unwrap());
4318 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4319 self.update_stop_limit_order(order, quantity, price, trigger_price);
4320 }
4321 OrderAny::MarketIfTouched(_) => {
4322 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4323 self.update_market_if_touched_order(order, quantity, trigger_price);
4324 }
4325 OrderAny::LimitIfTouched(_) => {
4326 let price = price.unwrap_or(order.price().unwrap());
4327 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4328 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
4329 }
4330 OrderAny::TrailingStopMarket(_) => {
4331 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
4332 self.update_market_if_touched_order(order, quantity, trigger_price);
4333 }
4334 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
4335 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
4336 let trigger_price =
4337 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
4338 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
4339 }
4340 _ => {
4341 panic!(
4342 "Unsupported order type {} for update_order",
4343 order.order_type()
4344 );
4345 }
4346 }
4347
4348 let new_leaves_qty = quantity.saturating_sub(filled_qty);
4350 if new_leaves_qty.is_zero() {
4351 if self.config.support_contingent_orders
4352 && order
4353 .contingency_type()
4354 .is_some_and(|c| c != ContingencyType::NoContingency)
4355 && update_contingencies
4356 {
4357 self.update_contingent_order(order);
4358 }
4359 self.cancel_order(order, Some(false));
4361 return true;
4362 }
4363
4364 if self.config.support_contingent_orders
4365 && order
4366 .contingency_type()
4367 .is_some_and(|c| c != ContingencyType::NoContingency)
4368 && update_contingencies
4369 {
4370 self.update_contingent_order(order);
4371 }
4372
4373 true
4374 }
4375
4376 pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
4378 let order = match self.cache.borrow().order(&client_order_id).cloned() {
4379 Some(order) => order,
4380 None => {
4381 log::error!(
4382 "Cannot trigger stop order: order {client_order_id} not found in cache"
4383 );
4384 return;
4385 }
4386 };
4387
4388 match order.order_type() {
4389 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
4390 self.fill_limit_order(client_order_id);
4391 }
4392 OrderType::StopMarket | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
4393 self.fill_market_order(client_order_id);
4394 }
4395 _ => {
4396 log::error!(
4397 "Cannot trigger stop order: invalid order type {}",
4398 order.order_type()
4399 );
4400 }
4401 }
4402 }
4403
4404 fn update_contingent_order(&mut self, order: &OrderAny) {
4405 log::debug!("Updating OUO orders from {}", order.client_order_id());
4406 if let Some(linked_order_ids) = order.linked_order_ids() {
4407 let parent_filled_qty = self
4408 .cached_filled_qty
4409 .get(&order.client_order_id())
4410 .copied()
4411 .unwrap_or(order.filled_qty());
4412 let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
4413
4414 for client_order_id in linked_order_ids {
4415 let mut child_order = match self.cache.borrow().order(client_order_id) {
4416 Some(order) => order.clone(),
4417 None => panic!("Order {client_order_id} not found in cache."),
4418 };
4419
4420 if child_order.is_active_local() {
4421 continue;
4422 }
4423
4424 let child_filled_qty = self
4425 .cached_filled_qty
4426 .get(&child_order.client_order_id())
4427 .copied()
4428 .unwrap_or(child_order.filled_qty());
4429
4430 if parent_leaves_qty.is_zero() {
4431 self.cancel_order(&child_order, Some(false));
4432 } else if child_filled_qty >= parent_leaves_qty {
4433 self.cancel_order(&child_order, Some(false));
4435 } else {
4436 let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
4437 if child_leaves_qty != parent_leaves_qty {
4438 let price = child_order.price();
4439 let trigger_price = child_order.trigger_price();
4440 self.update_order(
4441 &mut child_order,
4442 Some(parent_leaves_qty),
4443 price,
4444 trigger_price,
4445 Some(false),
4446 );
4447 }
4448 }
4449 }
4450 }
4451 }
4452
4453 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
4454 if let Some(linked_order_ids) = order.linked_order_ids() {
4455 for client_order_id in linked_order_ids {
4456 let contingent_order = match self.cache.borrow().order(client_order_id) {
4457 Some(order) => order.clone(),
4458 None => panic!("Cannot find contingent order for {client_order_id}"),
4459 };
4460
4461 if contingent_order.is_active_local() {
4462 continue;
4464 }
4465
4466 if !contingent_order.is_closed() {
4467 self.cancel_order(&contingent_order, Some(false));
4468 }
4469 }
4470 }
4471 }
4472
4473 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
4474 let ts_now = self.clock.borrow().timestamp_ns();
4475 let account_id = order
4476 .account_id()
4477 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
4478
4479 let due_post_only = reason.as_str().starts_with("POST_ONLY");
4481
4482 let event = OrderEventAny::Rejected(OrderRejected::new(
4483 order.trader_id(),
4484 order.strategy_id(),
4485 order.instrument_id(),
4486 order.client_order_id(),
4487 account_id,
4488 reason,
4489 UUID4::new(),
4490 ts_now,
4491 ts_now,
4492 false,
4493 due_post_only,
4494 ));
4495 self.dispatch_order_event(event);
4496 }
4497
4498 fn generate_order_accepted(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
4499 let ts_now = self.clock.borrow().timestamp_ns();
4500 let account_id = order
4501 .account_id()
4502 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
4503 let event = OrderEventAny::Accepted(OrderAccepted::new(
4504 order.trader_id(),
4505 order.strategy_id(),
4506 order.instrument_id(),
4507 order.client_order_id(),
4508 venue_order_id,
4509 account_id,
4510 UUID4::new(),
4511 ts_now,
4512 ts_now,
4513 false,
4514 ));
4515
4516 self.dispatch_order_event(event);
4517 }
4518
4519 #[expect(clippy::too_many_arguments)]
4520 fn generate_order_modify_rejected(
4521 &self,
4522 trader_id: TraderId,
4523 strategy_id: StrategyId,
4524 instrument_id: InstrumentId,
4525 client_order_id: ClientOrderId,
4526 reason: Ustr,
4527 venue_order_id: Option<VenueOrderId>,
4528 account_id: Option<AccountId>,
4529 ) {
4530 let ts_now = self.clock.borrow().timestamp_ns();
4531 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
4532 trader_id,
4533 strategy_id,
4534 instrument_id,
4535 client_order_id,
4536 reason,
4537 UUID4::new(),
4538 ts_now,
4539 ts_now,
4540 false,
4541 venue_order_id,
4542 account_id,
4543 ));
4544 self.dispatch_order_event(event);
4545 }
4546
4547 #[expect(clippy::too_many_arguments)]
4548 fn generate_order_cancel_rejected(
4549 &self,
4550 trader_id: TraderId,
4551 strategy_id: StrategyId,
4552 account_id: AccountId,
4553 instrument_id: InstrumentId,
4554 client_order_id: ClientOrderId,
4555 venue_order_id: Option<VenueOrderId>,
4556 reason: Ustr,
4557 ) {
4558 let ts_now = self.clock.borrow().timestamp_ns();
4559 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
4560 trader_id,
4561 strategy_id,
4562 instrument_id,
4563 client_order_id,
4564 reason,
4565 UUID4::new(),
4566 ts_now,
4567 ts_now,
4568 false,
4569 venue_order_id,
4570 Some(account_id),
4571 ));
4572 self.dispatch_order_event(event);
4573 }
4574
4575 fn generate_order_updated(
4576 &self,
4577 order: &OrderAny,
4578 quantity: Quantity,
4579 price: Option<Price>,
4580 trigger_price: Option<Price>,
4581 protection_price: Option<Price>,
4582 ) {
4583 let ts_now = self.clock.borrow().timestamp_ns();
4584 let event = OrderEventAny::Updated(OrderUpdated::new(
4585 order.trader_id(),
4586 order.strategy_id(),
4587 order.instrument_id(),
4588 order.client_order_id(),
4589 quantity,
4590 UUID4::new(),
4591 ts_now,
4592 ts_now,
4593 false,
4594 order.venue_order_id(),
4595 order.account_id(),
4596 price,
4597 trigger_price,
4598 protection_price,
4599 order.is_quote_quantity(),
4600 ));
4601
4602 self.dispatch_order_event(event);
4603 }
4604
4605 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
4606 let ts_now = self.clock.borrow().timestamp_ns();
4607 let event = OrderEventAny::Canceled(OrderCanceled::new(
4608 order.trader_id(),
4609 order.strategy_id(),
4610 order.instrument_id(),
4611 order.client_order_id(),
4612 UUID4::new(),
4613 ts_now,
4614 ts_now,
4615 false,
4616 Some(venue_order_id),
4617 order.account_id(),
4618 ));
4619 self.dispatch_order_event(event);
4620 }
4621
4622 fn generate_order_triggered(&self, order: &OrderAny) {
4623 let ts_now = self.clock.borrow().timestamp_ns();
4624 let event = OrderEventAny::Triggered(OrderTriggered::new(
4625 order.trader_id(),
4626 order.strategy_id(),
4627 order.instrument_id(),
4628 order.client_order_id(),
4629 UUID4::new(),
4630 ts_now,
4631 ts_now,
4632 false,
4633 order.venue_order_id(),
4634 order.account_id(),
4635 ));
4636 self.dispatch_order_event(event);
4637 }
4638
4639 fn generate_order_expired(&self, order: &OrderAny) {
4640 let ts_now = self.clock.borrow().timestamp_ns();
4641 let event = OrderEventAny::Expired(OrderExpired::new(
4642 order.trader_id(),
4643 order.strategy_id(),
4644 order.instrument_id(),
4645 order.client_order_id(),
4646 UUID4::new(),
4647 ts_now,
4648 ts_now,
4649 false,
4650 order.venue_order_id(),
4651 order.account_id(),
4652 ));
4653 self.dispatch_order_event(event);
4654 }
4655
4656 #[expect(clippy::too_many_arguments)]
4657 fn generate_order_filled(
4658 &mut self,
4659 order: &OrderAny,
4660 venue_order_id: VenueOrderId,
4661 venue_position_id: Option<PositionId>,
4662 last_qty: Quantity,
4663 last_px: Price,
4664 quote_currency: Currency,
4665 commission: Money,
4666 liquidity_side: LiquiditySide,
4667 ) {
4668 debug_assert!(
4669 last_qty <= order.quantity(),
4670 "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
4671 order_qty = order.quantity(),
4672 client_order_id = order.client_order_id()
4673 );
4674
4675 let ts_now = self.clock.borrow().timestamp_ns();
4676 let account_id = order
4677 .account_id()
4678 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
4679 let event = OrderEventAny::Filled(OrderFilled::new(
4680 order.trader_id(),
4681 order.strategy_id(),
4682 order.instrument_id(),
4683 order.client_order_id(),
4684 venue_order_id,
4685 account_id,
4686 self.ids_generator.generate_trade_id(ts_now),
4687 order.order_side(),
4688 order.order_type(),
4689 last_qty,
4690 last_px,
4691 quote_currency,
4692 liquidity_side,
4693 UUID4::new(),
4694 ts_now,
4695 ts_now,
4696 false,
4697 venue_position_id,
4698 Some(commission),
4699 ));
4700
4701 self.dispatch_order_event(event);
4702 }
4703}