1use std::{
56 collections::VecDeque,
57 hash::Hash,
58 sync::{
59 Mutex,
60 atomic::{AtomicBool, Ordering},
61 },
62};
63
64use ahash::AHashSet;
65use dashmap::{DashMap, DashSet};
66use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
67use nautilus_live::ExecutionEventEmitter;
68use nautilus_model::{
69 enums::{OrderSide, OrderStatus, OrderType},
70 events::{
71 OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
72 OrderTriggered, OrderUpdated,
73 },
74 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
75 reports::{FillReport, OrderStatusReport},
76 types::{Price, Quantity},
77};
78use ustr::Ustr;
79
80pub const DEDUP_CAPACITY: usize = 10_000;
81
82#[derive(Debug, Clone)]
89pub struct OrderIdentity {
90 pub strategy_id: StrategyId,
92 pub instrument_id: InstrumentId,
94 pub order_side: OrderSide,
96 pub order_type: OrderType,
98 pub quantity: Quantity,
100 pub price: Option<Price>,
104}
105
106#[derive(Debug)]
114pub struct BoundedDedup<T>
115where
116 T: Eq + Hash + Clone,
117{
118 order: VecDeque<T>,
119 set: AHashSet<T>,
120 capacity: usize,
121}
122
123impl<T> BoundedDedup<T>
124where
125 T: Eq + Hash + Clone,
126{
127 #[must_use]
129 pub fn new(capacity: usize) -> Self {
130 Self {
131 order: VecDeque::with_capacity(capacity),
132 set: AHashSet::with_capacity(capacity),
133 capacity,
134 }
135 }
136
137 pub fn insert(&mut self, value: T) -> bool {
139 if self.set.contains(&value) {
140 return true;
141 }
142
143 if self.order.len() >= self.capacity
144 && let Some(evicted) = self.order.pop_front()
145 {
146 self.set.remove(&evicted);
147 }
148
149 self.order.push_back(value.clone());
150 self.set.insert(value);
151 false
152 }
153
154 #[must_use]
156 pub fn len(&self) -> usize {
157 self.set.len()
158 }
159
160 #[must_use]
162 pub fn is_empty(&self) -> bool {
163 self.set.is_empty()
164 }
165
166 #[must_use]
168 pub fn contains(&self, value: &T) -> bool {
169 self.set.contains(value)
170 }
171}
172
173#[derive(Debug)]
183pub struct WsDispatchState {
184 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
186 pub emitted_accepted: DashSet<ClientOrderId>,
188 pub filled_orders: DashSet<ClientOrderId>,
193 pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
198 pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
205 pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
212 pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
215 clearing: AtomicBool,
216}
217
218impl Default for WsDispatchState {
219 fn default() -> Self {
220 Self {
221 order_identities: DashMap::new(),
222 emitted_accepted: DashSet::default(),
223 filled_orders: DashSet::default(),
224 emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
225 cached_venue_order_ids: DashMap::new(),
226 pending_modify_keys: DashMap::new(),
227 order_filled_qty: DashMap::new(),
228 clearing: AtomicBool::new(false),
229 }
230 }
231}
232
233impl WsDispatchState {
234 #[must_use]
236 pub fn new() -> Self {
237 Self::default()
238 }
239
240 pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
243 self.order_identities.insert(client_order_id, identity);
244 }
245
246 #[must_use]
248 pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
249 self.order_identities
250 .get(client_order_id)
251 .map(|r| r.clone())
252 }
253
254 pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
257 if let Some(price) = price
258 && let Some(mut entry) = self.order_identities.get_mut(client_order_id)
259 {
260 entry.price = Some(price);
261 }
262 }
263
264 pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
266 if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
267 entry.quantity = quantity;
268 }
269 }
270
271 pub fn insert_accepted(&self, cid: ClientOrderId) {
273 self.evict_if_full(&self.emitted_accepted);
274 self.emitted_accepted.insert(cid);
275 }
276
277 pub fn insert_filled(&self, cid: ClientOrderId) {
279 self.evict_if_full(&self.filled_orders);
280 self.filled_orders.insert(cid);
281 }
282
283 #[allow(
288 clippy::missing_panics_doc,
289 reason = "dedup mutex poisoning is not expected"
290 )]
291 pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
292 let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
293 set.insert(trade_id)
294 }
295
296 pub fn record_venue_order_id(
298 &self,
299 client_order_id: ClientOrderId,
300 venue_order_id: VenueOrderId,
301 ) {
302 self.cached_venue_order_ids
303 .insert(client_order_id, venue_order_id);
304 }
305
306 #[must_use]
308 pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
309 self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
310 }
311
312 pub fn mark_pending_modify(
314 &self,
315 client_order_id: ClientOrderId,
316 old_venue_order_id: VenueOrderId,
317 ) {
318 self.pending_modify_keys
319 .insert(client_order_id, old_venue_order_id);
320 }
321
322 pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
324 self.pending_modify_keys.remove(client_order_id);
325 }
326
327 #[must_use]
329 pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
330 self.pending_modify_keys.get(client_order_id).map(|r| *r)
331 }
332
333 pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
335 self.order_filled_qty.insert(client_order_id, qty);
336 }
337
338 #[must_use]
340 pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
341 self.order_filled_qty.get(client_order_id).map(|r| *r)
342 }
343
344 pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
349 self.order_identities.remove(client_order_id);
350 self.emitted_accepted.remove(client_order_id);
351 self.cached_venue_order_ids.remove(client_order_id);
352 self.pending_modify_keys.remove(client_order_id);
353 self.order_filled_qty.remove(client_order_id);
354 }
355
356 fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
357 if set.len() >= DEDUP_CAPACITY
358 && self
359 .clearing
360 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
361 .is_ok()
362 {
363 set.clear();
364 self.clearing.store(false, Ordering::Release);
365 }
366 }
367}
368
369#[derive(Debug, Clone, Copy, PartialEq, Eq)]
371pub enum DispatchOutcome {
372 Tracked,
376 External,
381 Skip,
385}
386
387pub fn dispatch_order_status_report(
397 report: &OrderStatusReport,
398 state: &WsDispatchState,
399 emitter: &ExecutionEventEmitter,
400 ts_init: UnixNanos,
401) -> DispatchOutcome {
402 let Some(client_order_id) = report.client_order_id else {
403 return DispatchOutcome::External;
404 };
405
406 if state.filled_orders.contains(&client_order_id) {
407 log::debug!(
408 "Skipping stale report for filled order: cid={client_order_id}, status={:?}",
409 report.order_status,
410 );
411 return DispatchOutcome::Skip;
412 }
413
414 let Some(identity) = state.lookup_identity(&client_order_id) else {
415 return DispatchOutcome::External;
416 };
417
418 match report.order_status {
419 OrderStatus::Accepted => {
420 handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
421 }
422 OrderStatus::Triggered => {
423 handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
424 }
425 OrderStatus::Canceled => {
426 handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
427 }
428 OrderStatus::Expired => {
429 handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
430 }
431 OrderStatus::Rejected => {
432 handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
433 }
434 OrderStatus::Filled => handle_filled_marker(client_order_id, state),
435 OrderStatus::PartiallyFilled => {
436 DispatchOutcome::Tracked
438 }
439 OrderStatus::PendingUpdate
440 | OrderStatus::PendingCancel
441 | OrderStatus::Submitted
442 | OrderStatus::Initialized
443 | OrderStatus::Denied
444 | OrderStatus::Released
445 | OrderStatus::Emulated => DispatchOutcome::Tracked,
446 }
447}
448
449pub fn dispatch_fill_report(
460 report: &FillReport,
461 state: &WsDispatchState,
462 emitter: &ExecutionEventEmitter,
463 ts_init: UnixNanos,
464) -> DispatchOutcome {
465 let Some(client_order_id) = report.client_order_id else {
466 return DispatchOutcome::External;
467 };
468
469 if state.filled_orders.contains(&client_order_id) {
470 log::debug!(
471 "Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
472 report.trade_id,
473 );
474 return DispatchOutcome::Skip;
475 }
476
477 let Some(identity) = state.lookup_identity(&client_order_id) else {
478 return DispatchOutcome::External;
479 };
480
481 if state.check_and_insert_trade(report.trade_id) {
482 log::debug!(
483 "Skipping duplicate fill for {client_order_id}: trade_id={}",
484 report.trade_id
485 );
486 return DispatchOutcome::Tracked;
487 }
488
489 ensure_accepted_emitted(
490 client_order_id,
491 report.venue_order_id,
492 report.account_id,
493 &identity,
494 state,
495 emitter,
496 report.ts_event,
497 ts_init,
498 );
499
500 let filled = OrderFilled::new(
501 emitter.trader_id(),
502 identity.strategy_id,
503 identity.instrument_id,
504 client_order_id,
505 report.venue_order_id,
506 report.account_id,
507 report.trade_id,
508 identity.order_side,
509 identity.order_type,
510 report.last_qty,
511 report.last_px,
512 report.commission.currency,
513 report.liquidity_side,
514 UUID4::new(),
515 report.ts_event,
516 ts_init,
517 false,
518 report.venue_position_id,
519 Some(report.commission),
520 );
521 emitter.send_order_event(OrderEventAny::Filled(filled));
522
523 let previous = state
524 .previous_filled_qty(&client_order_id)
525 .unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
526 let cumulative = previous + report.last_qty;
527 state.record_filled_qty(client_order_id, cumulative);
528
529 if cumulative >= identity.quantity {
530 state.insert_filled(client_order_id);
531 state.cleanup_terminal(&client_order_id);
532 }
533
534 DispatchOutcome::Tracked
535}
536
537fn handle_accepted(
538 report: &OrderStatusReport,
539 client_order_id: ClientOrderId,
540 identity: &OrderIdentity,
541 state: &WsDispatchState,
542 emitter: &ExecutionEventEmitter,
543 ts_init: UnixNanos,
544) -> DispatchOutcome {
545 let venue_order_id = report.venue_order_id;
546 let ts_event = report.ts_last;
547 let account_id = report.account_id;
548
549 if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
554 && cached_voi != venue_order_id
555 {
556 let price = report.price.or(identity.price);
557 let Some(price) = price else {
558 log::warn!(
559 "Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
560 no price on report and no cached price on identity",
561 );
562 return DispatchOutcome::Skip;
563 };
564
565 state.record_venue_order_id(client_order_id, venue_order_id);
566 state.update_identity_quantity(&client_order_id, report.quantity);
567 state.update_identity_price(&client_order_id, Some(price));
568 state.clear_pending_modify(&client_order_id);
569
570 let updated = OrderUpdated::new(
571 emitter.trader_id(),
572 identity.strategy_id,
573 identity.instrument_id,
574 client_order_id,
575 report.quantity,
576 UUID4::new(),
577 ts_event,
578 ts_init,
579 false,
580 Some(venue_order_id),
581 Some(account_id),
582 Some(price),
583 report.trigger_price,
584 None,
585 false,
586 );
587 emitter.send_order_event(OrderEventAny::Updated(updated));
588 return DispatchOutcome::Tracked;
589 }
590
591 if state.emitted_accepted.contains(&client_order_id) {
592 state.update_identity_price(&client_order_id, report.price);
596 return DispatchOutcome::Tracked;
597 }
598
599 state.insert_accepted(client_order_id);
600 state.record_venue_order_id(client_order_id, venue_order_id);
601 state.update_identity_price(&client_order_id, report.price);
602
603 let accepted = OrderAccepted::new(
604 emitter.trader_id(),
605 identity.strategy_id,
606 identity.instrument_id,
607 client_order_id,
608 venue_order_id,
609 account_id,
610 UUID4::new(),
611 ts_event,
612 ts_init,
613 false,
614 );
615 emitter.send_order_event(OrderEventAny::Accepted(accepted));
616 DispatchOutcome::Tracked
617}
618
619fn handle_triggered(
620 report: &OrderStatusReport,
621 client_order_id: ClientOrderId,
622 identity: &OrderIdentity,
623 state: &WsDispatchState,
624 emitter: &ExecutionEventEmitter,
625 ts_init: UnixNanos,
626) -> DispatchOutcome {
627 if !matches!(
628 identity.order_type,
629 OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
630 ) {
631 log::debug!(
632 "Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
633 identity.order_type,
634 );
635 return DispatchOutcome::Tracked;
636 }
637
638 ensure_accepted_emitted(
639 client_order_id,
640 report.venue_order_id,
641 report.account_id,
642 identity,
643 state,
644 emitter,
645 report.ts_last,
646 ts_init,
647 );
648
649 let triggered = OrderTriggered::new(
650 emitter.trader_id(),
651 identity.strategy_id,
652 identity.instrument_id,
653 client_order_id,
654 UUID4::new(),
655 report.ts_last,
656 ts_init,
657 false,
658 Some(report.venue_order_id),
659 Some(report.account_id),
660 );
661 emitter.send_order_event(OrderEventAny::Triggered(triggered));
662 DispatchOutcome::Tracked
663}
664
665fn handle_canceled(
666 report: &OrderStatusReport,
667 client_order_id: ClientOrderId,
668 identity: &OrderIdentity,
669 state: &WsDispatchState,
670 emitter: &ExecutionEventEmitter,
671 ts_init: UnixNanos,
672) -> DispatchOutcome {
673 let venue_order_id = report.venue_order_id;
674
675 if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
679 && cached_voi != venue_order_id
680 {
681 log::debug!(
682 "Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
683 );
684 return DispatchOutcome::Skip;
685 }
686
687 if let Some(pending_old) = state.pending_modify(&client_order_id)
693 && pending_old == venue_order_id
694 {
695 log::debug!(
696 "Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
697 );
698 return DispatchOutcome::Skip;
699 }
700
701 ensure_accepted_emitted(
702 client_order_id,
703 venue_order_id,
704 report.account_id,
705 identity,
706 state,
707 emitter,
708 report.ts_last,
709 ts_init,
710 );
711
712 let canceled = OrderCanceled::new(
713 emitter.trader_id(),
714 identity.strategy_id,
715 identity.instrument_id,
716 client_order_id,
717 UUID4::new(),
718 report.ts_last,
719 ts_init,
720 false,
721 Some(venue_order_id),
722 Some(report.account_id),
723 );
724 emitter.send_order_event(OrderEventAny::Canceled(canceled));
725
726 state.insert_filled(client_order_id);
729 state.cleanup_terminal(&client_order_id);
730 DispatchOutcome::Tracked
731}
732
733fn handle_expired(
734 report: &OrderStatusReport,
735 client_order_id: ClientOrderId,
736 identity: &OrderIdentity,
737 state: &WsDispatchState,
738 emitter: &ExecutionEventEmitter,
739 ts_init: UnixNanos,
740) -> DispatchOutcome {
741 ensure_accepted_emitted(
742 client_order_id,
743 report.venue_order_id,
744 report.account_id,
745 identity,
746 state,
747 emitter,
748 report.ts_last,
749 ts_init,
750 );
751
752 let expired = OrderExpired::new(
753 emitter.trader_id(),
754 identity.strategy_id,
755 identity.instrument_id,
756 client_order_id,
757 UUID4::new(),
758 report.ts_last,
759 ts_init,
760 false,
761 Some(report.venue_order_id),
762 Some(report.account_id),
763 );
764 emitter.send_order_event(OrderEventAny::Expired(expired));
765 state.insert_filled(client_order_id);
766 state.cleanup_terminal(&client_order_id);
767 DispatchOutcome::Tracked
768}
769
770fn handle_rejected(
771 report: &OrderStatusReport,
772 client_order_id: ClientOrderId,
773 identity: &OrderIdentity,
774 state: &WsDispatchState,
775 emitter: &ExecutionEventEmitter,
776 ts_init: UnixNanos,
777) -> DispatchOutcome {
778 let reason = report
779 .cancel_reason
780 .clone()
781 .unwrap_or_else(|| "Order rejected by exchange".to_string());
782 let rejected = OrderRejected::new(
783 emitter.trader_id(),
784 identity.strategy_id,
785 identity.instrument_id,
786 client_order_id,
787 report.account_id,
788 Ustr::from(&reason),
789 UUID4::new(),
790 report.ts_last,
791 ts_init,
792 false,
793 false,
794 );
795 emitter.send_order_event(OrderEventAny::Rejected(rejected));
796 state.insert_filled(client_order_id);
797 state.cleanup_terminal(&client_order_id);
798 DispatchOutcome::Tracked
799}
800
801fn handle_filled_marker(
802 _client_order_id: ClientOrderId,
803 _state: &WsDispatchState,
804) -> DispatchOutcome {
805 DispatchOutcome::Tracked
813}
814
815#[allow(clippy::too_many_arguments)]
823fn ensure_accepted_emitted(
824 client_order_id: ClientOrderId,
825 venue_order_id: VenueOrderId,
826 account_id: AccountId,
827 identity: &OrderIdentity,
828 state: &WsDispatchState,
829 emitter: &ExecutionEventEmitter,
830 ts_event: UnixNanos,
831 ts_init: UnixNanos,
832) {
833 if state.emitted_accepted.contains(&client_order_id) {
834 return;
835 }
836 state.insert_accepted(client_order_id);
837 state.record_venue_order_id(client_order_id, venue_order_id);
838
839 let accepted = OrderAccepted::new(
840 emitter.trader_id(),
841 identity.strategy_id,
842 identity.instrument_id,
843 client_order_id,
844 venue_order_id,
845 account_id,
846 UUID4::new(),
847 ts_event,
848 ts_init,
849 false,
850 );
851 emitter.send_order_event(OrderEventAny::Accepted(accepted));
852}
853
854#[cfg(test)]
855mod tests {
856 use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
857 use rstest::rstest;
858
859 use super::*;
860
861 fn make_identity() -> OrderIdentity {
862 OrderIdentity {
863 strategy_id: StrategyId::from("S-001"),
864 instrument_id: InstrumentId::from("BTC-USD-PERP.HYPERLIQUID"),
865 order_side: OrderSide::Buy,
866 order_type: OrderType::Limit,
867 quantity: Quantity::from("0.0001"),
868 price: None,
869 }
870 }
871
872 #[rstest]
873 fn test_register_and_lookup_identity() {
874 let state = WsDispatchState::new();
875 let cid = ClientOrderId::new("O-001");
876 state.register_identity(cid, make_identity());
877
878 let found = state.lookup_identity(&cid);
879 assert!(found.is_some());
880 let identity = found.unwrap();
881 assert_eq!(identity.strategy_id.as_str(), "S-001");
882 assert_eq!(identity.order_side, OrderSide::Buy);
883 }
884
885 #[rstest]
886 fn test_lookup_identity_missing_returns_none() {
887 let state = WsDispatchState::new();
888 let cid = ClientOrderId::new("not-tracked");
889 assert!(state.lookup_identity(&cid).is_none());
890 }
891
892 #[rstest]
893 fn test_insert_accepted_dedup() {
894 let state = WsDispatchState::new();
895 let cid = ClientOrderId::new("O-002");
896 assert!(!state.emitted_accepted.contains(&cid));
897 state.insert_accepted(cid);
898 assert!(state.emitted_accepted.contains(&cid));
899 state.insert_accepted(cid);
900 assert!(state.emitted_accepted.contains(&cid));
901 }
902
903 #[rstest]
904 fn test_check_and_insert_trade_detects_duplicates() {
905 let state = WsDispatchState::new();
906 let trade = TradeId::new("trade-1");
907 assert!(!state.check_and_insert_trade(trade));
908 assert!(state.check_and_insert_trade(trade));
909 }
910
911 #[rstest]
912 fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
913 let mut dedup: BoundedDedup<TradeId> = BoundedDedup::new(3);
914 assert!(!dedup.insert(TradeId::new("t-0")));
915 assert!(!dedup.insert(TradeId::new("t-1")));
916 assert!(!dedup.insert(TradeId::new("t-2")));
917 assert_eq!(dedup.len(), 3);
918
919 assert!(!dedup.insert(TradeId::new("t-3")));
921 assert_eq!(dedup.len(), 3);
922 assert!(!dedup.contains(&TradeId::new("t-0")));
923 assert!(dedup.contains(&TradeId::new("t-1")));
924 assert!(dedup.contains(&TradeId::new("t-3")));
925 }
926
927 #[rstest]
928 fn test_pending_modify_roundtrip() {
929 let state = WsDispatchState::new();
930 let cid = ClientOrderId::new("O-010");
931 let voi = VenueOrderId::new("v-1");
932
933 assert!(state.pending_modify(&cid).is_none());
934 state.mark_pending_modify(cid, voi);
935 assert_eq!(state.pending_modify(&cid), Some(voi));
936 state.clear_pending_modify(&cid);
937 assert!(state.pending_modify(&cid).is_none());
938 }
939
940 #[rstest]
941 fn test_cleanup_terminal_preserves_filled_marker() {
942 let state = WsDispatchState::new();
943 let cid = ClientOrderId::new("O-020");
944 state.register_identity(cid, make_identity());
945 state.insert_accepted(cid);
946 state.insert_filled(cid);
947 state.cleanup_terminal(&cid);
948
949 assert!(state.lookup_identity(&cid).is_none());
950 assert!(!state.emitted_accepted.contains(&cid));
951 assert!(state.filled_orders.contains(&cid));
953 }
954}