nautilus_binance/common/
dispatch.rs1use std::sync::Mutex;
25
26use dashmap::DashMap;
27use nautilus_common::cache::fifo::FifoCache;
28use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
29use nautilus_live::ExecutionEventEmitter;
30use nautilus_model::{
31 enums::{OrderSide, OrderType},
32 events::{OrderAccepted, OrderEventAny},
33 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, VenueOrderId},
34 types::Price,
35};
36
37#[derive(Debug, Clone, Copy)]
39pub enum PendingOperation {
40 Place,
41 Cancel,
42 Modify,
43}
44
45#[derive(Debug, Clone)]
51pub struct PendingRequest {
52 pub client_order_id: ClientOrderId,
53 pub venue_order_id: Option<VenueOrderId>,
54 pub operation: PendingOperation,
55}
56
57#[derive(Debug, Clone)]
62pub struct OrderIdentity {
63 pub instrument_id: InstrumentId,
64 pub strategy_id: StrategyId,
65 pub order_side: OrderSide,
66 pub order_type: OrderType,
67 pub price: Option<Price>,
68}
69
70#[derive(Debug)]
76pub struct WsDispatchState {
77 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
78 pub pending_requests: DashMap<String, PendingRequest>,
79 emitted_accepted: Mutex<FifoCache<ClientOrderId, 10_000>>,
80 filled_orders: Mutex<FifoCache<ClientOrderId, 10_000>>,
81}
82
83impl Default for WsDispatchState {
84 fn default() -> Self {
85 Self {
86 order_identities: DashMap::new(),
87 pending_requests: DashMap::new(),
88 emitted_accepted: Mutex::new(FifoCache::new()),
89 filled_orders: Mutex::new(FifoCache::new()),
90 }
91 }
92}
93
94#[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
95impl WsDispatchState {
96 pub fn has_emitted_accepted(&self, cid: &ClientOrderId) -> bool {
97 self.emitted_accepted
98 .lock()
99 .expect(MUTEX_POISONED)
100 .contains(cid)
101 }
102
103 pub fn insert_accepted(&self, cid: ClientOrderId) {
105 self.emitted_accepted.lock().expect(MUTEX_POISONED).add(cid);
106 }
107
108 pub fn has_filled(&self, cid: &ClientOrderId) -> bool {
109 self.filled_orders
110 .lock()
111 .expect(MUTEX_POISONED)
112 .contains(cid)
113 }
114
115 pub fn insert_filled(&self, cid: ClientOrderId) {
117 self.filled_orders.lock().expect(MUTEX_POISONED).add(cid);
118 }
119
120 pub fn cleanup_terminal(&self, cid: ClientOrderId) {
122 self.order_identities.remove(&cid);
123 self.emitted_accepted
124 .lock()
125 .expect(MUTEX_POISONED)
126 .remove(&cid);
127 self.filled_orders
128 .lock()
129 .expect(MUTEX_POISONED)
130 .remove(&cid);
131 }
132}
133
134pub fn ensure_accepted_emitted(
138 client_order_id: ClientOrderId,
139 account_id: AccountId,
140 venue_order_id: VenueOrderId,
141 identity: &OrderIdentity,
142 emitter: &ExecutionEventEmitter,
143 state: &WsDispatchState,
144 ts_init: UnixNanos,
145) {
146 if state.has_emitted_accepted(&client_order_id) {
147 return;
148 }
149 state.insert_accepted(client_order_id);
150 let accepted = OrderAccepted::new(
151 emitter.trader_id(),
152 identity.strategy_id,
153 identity.instrument_id,
154 client_order_id,
155 venue_order_id,
156 account_id,
157 UUID4::new(),
158 ts_init,
159 ts_init,
160 false,
161 );
162 emitter.send_order_event(OrderEventAny::Accepted(accepted));
163}