1use std::sync::{
24 Mutex,
25 atomic::{AtomicBool, Ordering},
26};
27
28use ahash::AHashMap;
29use dashmap::DashMap;
30use nautilus_core::{UUID4, UnixNanos};
31use nautilus_live::ExecutionEventEmitter;
32use nautilus_model::{
33 enums::{OrderSide, OrderType},
34 events::{OrderAccepted, OrderEventAny, OrderFilled, OrderUpdated},
35 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36 instruments::{Instrument, InstrumentAny},
37 reports::FillReport,
38 types::Currency,
39};
40use ustr::Ustr;
41
42use crate::{
43 common::enums::{BitmexExecType, BitmexOrderType, BitmexPegPriceType},
44 http::parse::{InstrumentParseResult, parse_instrument_any},
45 websocket::{
46 enums::BitmexAction,
47 messages::{BitmexExecutionMsg, BitmexTableMessage, BitmexWsMessage, OrderData},
48 parse::{
49 ParsedOrderEvent, parse_execution_msg, parse_margin_account_state, parse_order_event,
50 parse_order_msg, parse_order_update_msg, parse_position_msg, parse_wallet_msg,
51 },
52 },
53};
54
/// Number of keys the `current` generation of a `GenerationalDedupSet` may
/// reach before `insert` rotates it into the `previous` generation.
const DEDUP_GENERATION_CAPACITY: usize = 10_000;
57
/// Identity details recorded for a tracked order.
///
/// Entries are stored in `WsDispatchState::order_identities`, keyed by client
/// order ID. The dispatch functions in this file read `strategy_id` and
/// `instrument_id` when building order events, and `order_side` / `order_type`
/// when converting a fill report into an `OrderFilled` event.
#[derive(Debug, Clone)]
pub struct OrderIdentity {
    /// Instrument the order is for.
    pub instrument_id: InstrumentId,
    /// Strategy the order belongs to.
    pub strategy_id: StrategyId,
    /// Side of the order.
    pub order_side: OrderSide,
    /// Type of the order.
    pub order_type: OrderType,
}
71
72#[derive(Debug)]
81struct GenerationalDedupSet {
82 inner: Mutex<DedupInner>,
83}
84
85#[derive(Debug)]
86struct DedupInner {
87 current: ahash::AHashSet<ClientOrderId>,
88 previous: ahash::AHashSet<ClientOrderId>,
89}
90
91impl Default for GenerationalDedupSet {
92 fn default() -> Self {
93 Self {
94 inner: Mutex::new(DedupInner {
95 current: ahash::AHashSet::new(),
96 previous: ahash::AHashSet::new(),
97 }),
98 }
99 }
100}
101
102impl GenerationalDedupSet {
103 fn contains(&self, key: &ClientOrderId) -> bool {
104 let guard = self.inner.lock().expect("dedup lock poisoned");
105 guard.current.contains(key) || guard.previous.contains(key)
106 }
107
108 fn insert(&self, key: ClientOrderId) {
109 let mut guard = self.inner.lock().expect("dedup lock poisoned");
110 let inner = &mut *guard;
111 inner.current.insert(key);
112 if inner.current.len() >= DEDUP_GENERATION_CAPACITY {
113 inner.previous.clear();
114 std::mem::swap(&mut inner.current, &mut inner.previous);
115 }
116 }
117
118 fn remove(&self, key: &ClientOrderId) {
119 let mut guard = self.inner.lock().expect("dedup lock poisoned");
120 guard.current.remove(key);
121 guard.previous.remove(key);
122 }
123}
124
125#[derive(Debug)]
130pub struct WsDispatchState {
131 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
132 emitted_accepted: GenerationalDedupSet,
133 triggered_orders: GenerationalDedupSet,
134 filled_orders: GenerationalDedupSet,
135 tombstoned: GenerationalDedupSet,
136 pub margin_subscribed: AtomicBool,
137}
138
139impl Default for WsDispatchState {
140 fn default() -> Self {
141 Self {
142 order_identities: DashMap::new(),
143 emitted_accepted: GenerationalDedupSet::default(),
144 triggered_orders: GenerationalDedupSet::default(),
145 filled_orders: GenerationalDedupSet::default(),
146 tombstoned: GenerationalDedupSet::default(),
147 margin_subscribed: AtomicBool::new(false),
148 }
149 }
150}
151
152impl WsDispatchState {
153 pub(crate) fn accepted_contains(&self, cid: &ClientOrderId) -> bool {
154 self.emitted_accepted.contains(cid)
155 }
156
157 pub(crate) fn filled_contains(&self, cid: &ClientOrderId) -> bool {
158 self.filled_orders.contains(cid)
159 }
160
161 pub(crate) fn triggered_contains(&self, cid: &ClientOrderId) -> bool {
162 self.triggered_orders.contains(cid)
163 }
164
165 pub(crate) fn insert_accepted(&self, cid: ClientOrderId) {
166 self.emitted_accepted.insert(cid);
167 }
168
169 pub(crate) fn insert_filled(&self, cid: ClientOrderId) {
170 self.filled_orders.insert(cid);
171 }
172
173 pub(crate) fn insert_triggered(&self, cid: ClientOrderId) {
174 self.triggered_orders.insert(cid);
175 }
176
177 pub(crate) fn remove_triggered(&self, cid: &ClientOrderId) {
178 self.triggered_orders.remove(cid);
179 }
180
181 pub(crate) fn remove_filled(&self, cid: &ClientOrderId) {
182 self.filled_orders.remove(cid);
183 }
184
185 pub(crate) fn remove_accepted(&self, cid: &ClientOrderId) {
186 self.emitted_accepted.remove(cid);
187 }
188
189 pub(crate) fn is_tombstoned(&self, cid: &ClientOrderId) -> bool {
191 self.tombstoned.contains(cid)
192 }
193
194 pub(crate) fn tombstone_order(&self, cid: &ClientOrderId) {
200 self.tombstoned.insert(*cid);
201 self.order_identities.remove(cid);
202 self.remove_accepted(cid);
203 self.remove_triggered(cid);
204 self.remove_filled(cid);
205 }
206}
207
208#[expect(clippy::too_many_arguments)]
210pub fn dispatch_ws_message(
211 ts_init: UnixNanos,
212 message: BitmexWsMessage,
213 emitter: &ExecutionEventEmitter,
214 state: &WsDispatchState,
215 instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
216 order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
217 order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
218 account_id: AccountId,
219) {
220 match message {
221 BitmexWsMessage::Table(table_msg) => match table_msg {
222 BitmexTableMessage::Order { data, .. } => {
223 dispatch_order_messages(
224 data,
225 emitter,
226 state,
227 instruments_by_symbol,
228 order_type_cache,
229 order_symbol_cache,
230 account_id,
231 ts_init,
232 );
233 }
234 BitmexTableMessage::Execution { data, .. } => {
235 dispatch_execution_messages(
236 data,
237 emitter,
238 state,
239 instruments_by_symbol,
240 order_symbol_cache,
241 ts_init,
242 );
243 }
244 BitmexTableMessage::Position { data, .. } => {
245 for pos_msg in data {
246 let Some(instrument) = instruments_by_symbol.get(&pos_msg.symbol) else {
247 log::error!(
248 "Instrument cache miss: position dropped for symbol={}, account={}",
249 pos_msg.symbol,
250 pos_msg.account,
251 );
252 continue;
253 };
254 let report = parse_position_msg(&pos_msg, instrument, ts_init);
255 emitter.send_position_report(report);
256 }
257 }
258 BitmexTableMessage::Wallet { data, .. } => {
259 if !state.margin_subscribed.load(Ordering::Relaxed) {
260 for wallet_msg in data {
261 let acct_state = parse_wallet_msg(&wallet_msg, ts_init);
262 emitter.send_account_state(acct_state);
263 }
264 }
265 }
266 BitmexTableMessage::Margin { data, .. } => {
267 state.margin_subscribed.store(true, Ordering::Relaxed);
268
269 for margin_msg in data {
270 let acct_state = parse_margin_account_state(&margin_msg, ts_init);
271 emitter.send_account_state(acct_state);
272 }
273 }
274 BitmexTableMessage::Instrument { action, data } => {
275 if matches!(action, BitmexAction::Partial | BitmexAction::Insert) {
276 for msg in data {
277 match msg.try_into() {
278 Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
279 InstrumentParseResult::Ok(boxed) => {
280 let inst = *boxed;
281 let symbol = inst.symbol().inner();
282 instruments_by_symbol.insert(symbol, inst);
283 }
284 InstrumentParseResult::Unsupported { .. }
285 | InstrumentParseResult::Inactive { .. } => {}
286 InstrumentParseResult::Failed { symbol, error, .. } => {
287 log::warn!("Failed to parse instrument {symbol}: {error}");
288 }
289 },
290 Err(e) => {
291 log::debug!("Skipping instrument (missing required fields): {e}");
292 }
293 }
294 }
295 }
296 }
297 BitmexTableMessage::OrderBookL2 { .. }
298 | BitmexTableMessage::OrderBookL2_25 { .. }
299 | BitmexTableMessage::OrderBook10 { .. }
300 | BitmexTableMessage::Quote { .. }
301 | BitmexTableMessage::Trade { .. }
302 | BitmexTableMessage::TradeBin1m { .. }
303 | BitmexTableMessage::TradeBin5m { .. }
304 | BitmexTableMessage::TradeBin1h { .. }
305 | BitmexTableMessage::TradeBin1d { .. }
306 | BitmexTableMessage::Funding { .. } => {
307 log::debug!("Ignoring BitMEX data message on execution stream");
308 }
309 _ => {
310 log::warn!("Unhandled table message type on execution stream");
311 }
312 },
313 BitmexWsMessage::Reconnected => {
314 order_type_cache.clear();
315 order_symbol_cache.clear();
316 log::info!("BitMEX execution websocket reconnected");
317 }
318 BitmexWsMessage::Authenticated => {
319 log::debug!("BitMEX execution websocket authenticated");
320 }
321 }
322}
323
324#[expect(clippy::too_many_arguments)]
327fn dispatch_order_messages(
328 data: Vec<OrderData>,
329 emitter: &ExecutionEventEmitter,
330 state: &WsDispatchState,
331 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
332 order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
333 order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
334 account_id: AccountId,
335 ts_init: UnixNanos,
336) {
337 for order_data in data {
338 match order_data {
339 OrderData::Full(order_msg) => {
340 let Some(instrument) = instruments_by_symbol.get(&order_msg.symbol) else {
341 log::error!(
342 "Instrument cache miss: order dropped for symbol={}, order_id={}",
343 order_msg.symbol,
344 order_msg.order_id,
345 );
346 continue;
347 };
348
349 let client_order_id = order_msg.cl_ord_id.map(ClientOrderId::new);
350
351 if let Some(ref cid) = client_order_id {
354 if let Some(ord_type) = &order_msg.ord_type {
355 let order_type: OrderType = if *ord_type == BitmexOrderType::Pegged
356 && order_msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
357 {
358 if order_msg.price.is_some() {
359 OrderType::TrailingStopLimit
360 } else {
361 OrderType::TrailingStopMarket
362 }
363 } else {
364 (*ord_type).into()
365 };
366 order_type_cache.insert(*cid, order_type);
367 }
368 order_symbol_cache.insert(*cid, order_msg.symbol);
369 }
370
371 if let Some(ref cid) = client_order_id
373 && state.is_tombstoned(cid)
374 {
375 log::debug!("Skipping tombstoned order {cid}");
376 continue;
377 }
378
379 let identity = client_order_id
380 .and_then(|cid| state.order_identities.get(&cid).map(|r| (cid, r.clone())));
381
382 if let Some((cid, ident)) = identity {
383 if let Some(event) = parse_order_event(
385 &order_msg,
386 cid,
387 account_id,
388 emitter.trader_id(),
389 ident.strategy_id,
390 ts_init,
391 ) {
392 let venue_order_id = VenueOrderId::new(order_msg.order_id.to_string());
393 dispatch_parsed_order_event(
394 event,
395 cid,
396 account_id,
397 venue_order_id,
398 &ident,
399 emitter,
400 state,
401 ts_init,
402 );
403 }
404
405 if order_msg.ord_status.is_terminal() {
407 order_type_cache.remove(&cid);
408 order_symbol_cache.remove(&cid);
409 }
410 } else {
411 match parse_order_msg(&order_msg, instrument, order_type_cache, ts_init) {
413 Ok(report) => {
414 if report.order_status.is_closed()
415 && let Some(cid) = report.client_order_id
416 {
417 order_type_cache.remove(&cid);
418 order_symbol_cache.remove(&cid);
419 }
420 emitter.send_order_status_report(report);
421 }
422 Err(e) => {
423 log::error!(
424 "Failed to parse order report: error={e}, symbol={}, order_id={}",
425 order_msg.symbol,
426 order_msg.order_id,
427 );
428 }
429 }
430 }
431 }
432 OrderData::Update(msg) => {
433 let Some(instrument) = instruments_by_symbol.get(&msg.symbol) else {
434 log::error!(
435 "Instrument cache miss: order update dropped for symbol={}, order_id={}",
436 msg.symbol,
437 msg.order_id,
438 );
439 continue;
440 };
441
442 if let Some(cl_ord_id) = &msg.cl_ord_id {
444 let client_order_id = ClientOrderId::new(cl_ord_id);
445 order_symbol_cache.insert(client_order_id, msg.symbol);
446 }
447
448 let identity = msg.cl_ord_id.as_ref().and_then(|cl| {
449 let cid = ClientOrderId::new(cl);
450 state.order_identities.get(&cid).map(|r| (cid, r.clone()))
451 });
452
453 if let Some((cid, ident)) = identity {
454 if let Some(event) =
456 parse_order_update_msg(&msg, instrument, account_id, ts_init)
457 {
458 let enriched = OrderUpdated::new(
459 emitter.trader_id(),
460 ident.strategy_id,
461 event.instrument_id,
462 cid,
463 event.quantity,
464 event.event_id,
465 event.ts_event,
466 event.ts_init,
467 false,
468 event.venue_order_id,
469 Some(account_id),
470 event.price,
471 event.trigger_price,
472 event.protection_price,
473 false, );
475 ensure_accepted_emitted(
476 cid,
477 account_id,
478 enriched
479 .venue_order_id
480 .unwrap_or_else(|| VenueOrderId::new(msg.order_id.to_string())),
481 &ident,
482 emitter,
483 state,
484 ts_init,
485 );
486 emitter.send_order_event(OrderEventAny::Updated(enriched));
487 } else {
488 log::warn!(
489 "Skipped order update (insufficient data): order_id={}, price={:?}",
490 msg.order_id,
491 msg.price,
492 );
493 }
494 } else {
495 log::debug!(
496 "Skipping order update for untracked order: order_id={}",
497 msg.order_id,
498 );
499 }
500 }
501 }
502 }
503}
504
505fn dispatch_execution_messages(
508 data: Vec<BitmexExecutionMsg>,
509 emitter: &ExecutionEventEmitter,
510 state: &WsDispatchState,
511 instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
512 order_symbol_cache: &AHashMap<ClientOrderId, Ustr>,
513 ts_init: UnixNanos,
514) {
515 for exec_msg in data {
516 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
517 Some(*sym)
518 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
519 let client_order_id = ClientOrderId::new(cl_ord_id);
520 order_symbol_cache.get(&client_order_id).copied()
521 } else {
522 None
523 };
524
525 let Some(symbol) = symbol_opt else {
526 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
527 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
528 log::warn!(
529 "Execution missing symbol and not in cache: \
530 cl_ord_id={cl_ord_id}, exec_id={:?}",
531 exec_msg.exec_id,
532 );
533 } else {
534 log::debug!(
535 "Execution missing symbol and not in cache: \
536 cl_ord_id={cl_ord_id}, exec_type={:?}",
537 exec_msg.exec_type,
538 );
539 }
540 } else if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
541 log::debug!(
542 "CancelReject missing symbol/clOrdID (expected with redundant cancels): \
543 exec_id={:?}, order_id={:?}",
544 exec_msg.exec_id,
545 exec_msg.order_id,
546 );
547 } else {
548 log::warn!(
549 "Execution missing both symbol and clOrdID: \
550 exec_id={:?}, order_id={:?}, exec_type={:?}",
551 exec_msg.exec_id,
552 exec_msg.order_id,
553 exec_msg.exec_type,
554 );
555 }
556 continue;
557 };
558
559 let Some(instrument) = instruments_by_symbol.get(&symbol) else {
560 log::error!(
561 "Instrument cache miss: execution dropped for symbol={}, exec_id={:?}, exec_type={:?}",
562 symbol,
563 exec_msg.exec_id,
564 exec_msg.exec_type,
565 );
566 continue;
567 };
568
569 let Some(fill) = parse_execution_msg(exec_msg, instrument, ts_init) else {
570 continue;
571 };
572
573 let identity = fill
574 .client_order_id
575 .and_then(|cid| state.order_identities.get(&cid).map(|r| (cid, r.clone())));
576
577 if let Some((cid, ident)) = identity {
578 let venue_order_id = fill.venue_order_id;
580 ensure_accepted_emitted(
581 cid,
582 fill.account_id,
583 venue_order_id,
584 &ident,
585 emitter,
586 state,
587 ts_init,
588 );
589 state.insert_filled(cid);
590 state.remove_triggered(&cid);
591 let filled = fill_report_to_order_filled(
592 &fill,
593 emitter.trader_id(),
594 &ident,
595 instrument.quote_currency(),
596 );
597 emitter.send_order_event(OrderEventAny::Filled(filled));
598 } else {
599 emitter.send_fill_report(fill);
601 }
602 }
603}
604
605#[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
610fn dispatch_parsed_order_event(
611 event: ParsedOrderEvent,
612 client_order_id: ClientOrderId,
613 account_id: AccountId,
614 venue_order_id: VenueOrderId,
615 identity: &OrderIdentity,
616 emitter: &ExecutionEventEmitter,
617 state: &WsDispatchState,
618 ts_init: UnixNanos,
619) {
620 let is_terminal;
621
622 match event {
623 ParsedOrderEvent::Accepted(e) => {
624 if state.accepted_contains(&client_order_id)
625 || state.filled_contains(&client_order_id)
626 || state.triggered_contains(&client_order_id)
627 {
628 log::debug!("Skipping duplicate Accepted for {client_order_id}");
629 return;
630 }
631 state.insert_accepted(client_order_id);
632 is_terminal = false;
633 emitter.send_order_event(OrderEventAny::Accepted(e));
634 }
635 ParsedOrderEvent::Triggered(e) => {
636 if state.filled_contains(&client_order_id) {
637 log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
638 return;
639 }
640 ensure_accepted_emitted(
641 client_order_id,
642 account_id,
643 venue_order_id,
644 identity,
645 emitter,
646 state,
647 ts_init,
648 );
649 state.insert_triggered(client_order_id);
650 is_terminal = false;
651 emitter.send_order_event(OrderEventAny::Triggered(e));
652 }
653 ParsedOrderEvent::Canceled(e) => {
654 ensure_accepted_emitted(
655 client_order_id,
656 account_id,
657 venue_order_id,
658 identity,
659 emitter,
660 state,
661 ts_init,
662 );
663 state.remove_triggered(&client_order_id);
664 state.remove_filled(&client_order_id);
665 is_terminal = true;
666 emitter.send_order_event(OrderEventAny::Canceled(e));
667 }
668 ParsedOrderEvent::Expired(e) => {
669 ensure_accepted_emitted(
670 client_order_id,
671 account_id,
672 venue_order_id,
673 identity,
674 emitter,
675 state,
676 ts_init,
677 );
678 state.remove_triggered(&client_order_id);
679 state.remove_filled(&client_order_id);
680 is_terminal = true;
681 emitter.send_order_event(OrderEventAny::Expired(e));
682 }
683 ParsedOrderEvent::Rejected(e) => {
684 state.remove_triggered(&client_order_id);
685 state.remove_filled(&client_order_id);
686 is_terminal = true;
687 emitter.send_order_event(OrderEventAny::Rejected(e));
688 }
689 }
690
691 if is_terminal {
692 state.order_identities.remove(&client_order_id);
693 state.remove_accepted(&client_order_id);
694 }
695}
696
697fn ensure_accepted_emitted(
700 client_order_id: ClientOrderId,
701 account_id: AccountId,
702 venue_order_id: VenueOrderId,
703 identity: &OrderIdentity,
704 emitter: &ExecutionEventEmitter,
705 state: &WsDispatchState,
706 ts_init: UnixNanos,
707) {
708 if state.accepted_contains(&client_order_id) {
709 return;
710 }
711 state.insert_accepted(client_order_id);
712 let accepted = OrderAccepted::new(
713 emitter.trader_id(),
714 identity.strategy_id,
715 identity.instrument_id,
716 client_order_id,
717 venue_order_id,
718 account_id,
719 UUID4::new(),
720 ts_init,
721 ts_init,
722 false,
723 );
724 emitter.send_order_event(OrderEventAny::Accepted(accepted));
725}
726
727pub(crate) fn fill_report_to_order_filled(
729 report: &FillReport,
730 trader_id: TraderId,
731 identity: &OrderIdentity,
732 quote_currency: Currency,
733) -> OrderFilled {
734 OrderFilled::new(
735 trader_id,
736 identity.strategy_id,
737 report.instrument_id,
738 report
739 .client_order_id
740 .expect("tracked order has client_order_id"),
741 report.venue_order_id,
742 report.account_id,
743 report.trade_id,
744 identity.order_side,
745 identity.order_type,
746 report.last_qty,
747 report.last_px,
748 quote_currency,
749 report.liquidity_side,
750 UUID4::new(),
751 report.ts_event,
752 report.ts_init,
753 false,
754 report.venue_position_id,
755 Some(report.commission),
756 )
757}