nautilus_dydx/websocket/
dispatch.rs1use std::sync::atomic::{AtomicBool, Ordering};
24
25use dashmap::{DashMap, DashSet};
26use nautilus_core::UUID4;
27use nautilus_model::{
28 enums::{OrderSide, OrderType},
29 events::OrderFilled,
30 identifiers::{ClientOrderId, InstrumentId, StrategyId, TraderId},
31 reports::FillReport,
32 types::Currency,
33};
34
35const DEDUP_CAPACITY: usize = 10_000;
36
37#[derive(Debug, Clone)]
40pub struct OrderIdentity {
41 pub instrument_id: InstrumentId,
42 pub strategy_id: StrategyId,
43 pub order_side: OrderSide,
44 pub order_type: OrderType,
45}
46
47#[derive(Debug)]
50pub struct DydxWsDispatchState {
51 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
52 pub emitted_accepted: DashSet<ClientOrderId>,
53 pub filled_orders: DashSet<ClientOrderId>,
54 clearing_accepted: AtomicBool,
55 clearing_filled: AtomicBool,
56}
57
58impl Default for DydxWsDispatchState {
59 fn default() -> Self {
60 Self {
61 order_identities: DashMap::new(),
62 emitted_accepted: DashSet::default(),
63 filled_orders: DashSet::default(),
64 clearing_accepted: AtomicBool::new(false),
65 clearing_filled: AtomicBool::new(false),
66 }
67 }
68}
69
70impl DydxWsDispatchState {
71 fn evict_if_full(set: &DashSet<ClientOrderId>, flag: &AtomicBool) {
72 if set.len() >= DEDUP_CAPACITY
73 && flag
74 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
75 .is_ok()
76 {
77 set.clear();
78 flag.store(false, Ordering::Release);
79 }
80 }
81
82 pub fn insert_accepted(&self, cid: ClientOrderId) {
83 Self::evict_if_full(&self.emitted_accepted, &self.clearing_accepted);
84 self.emitted_accepted.insert(cid);
85 }
86
87 pub fn insert_filled(&self, cid: ClientOrderId) {
88 Self::evict_if_full(&self.filled_orders, &self.clearing_filled);
89 self.filled_orders.insert(cid);
90 }
91
92 pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
94 self.order_identities.remove(client_order_id);
95 self.emitted_accepted.remove(client_order_id);
96 self.filled_orders.remove(client_order_id);
97 }
98}
99
100pub fn fill_report_to_order_filled(
109 report: &FillReport,
110 trader_id: TraderId,
111 identity: &OrderIdentity,
112 quote_currency: Currency,
113) -> OrderFilled {
114 OrderFilled::new(
115 trader_id,
116 identity.strategy_id,
117 report.instrument_id,
118 report
119 .client_order_id
120 .expect("tracked order has client_order_id"),
121 report.venue_order_id,
122 report.account_id,
123 report.trade_id,
124 identity.order_side,
125 identity.order_type,
126 report.last_qty,
127 report.last_px,
128 quote_currency,
129 report.liquidity_side,
130 UUID4::new(),
131 report.ts_event,
132 report.ts_init,
133 false,
134 report.venue_position_id,
135 Some(report.commission),
136 )
137}