1pub mod futures;
36pub mod spot;
37
38use std::sync::{
39 Arc, Mutex,
40 atomic::{AtomicBool, Ordering},
41};
42
43use dashmap::{DashMap, DashSet};
44use indexmap::IndexSet;
45use nautilus_core::{AtomicMap, UUID4, UnixNanos};
46use nautilus_live::ExecutionEventEmitter;
47use nautilus_model::{
48 enums::{OrderSide, OrderType},
49 events::{OrderAccepted, OrderEventAny, OrderFilled},
50 identifiers::{
51 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
52 },
53 instruments::InstrumentAny,
54 reports::FillReport,
55 types::{Currency, Quantity},
56};
57
58use crate::common::consts::KRAKEN_VENUE;
59
60const DEDUP_CAPACITY: usize = 10_000;
61
62#[derive(Debug, Clone, Copy, PartialEq)]
68pub struct DeltaSnapshot {
69 pub qty: Quantity,
70 pub filled: Quantity,
71 pub limit_price_bits: Option<u64>,
72 pub stop_price_bits: Option<u64>,
73}
74
75impl DeltaSnapshot {
76 pub(crate) fn new(
77 qty: Quantity,
78 filled: Quantity,
79 limit_price: Option<f64>,
80 stop_price: Option<f64>,
81 ) -> Self {
82 Self {
83 qty,
84 filled,
85 limit_price_bits: limit_price.map(f64::to_bits),
86 stop_price_bits: stop_price.map(f64::to_bits),
87 }
88 }
89
90 pub(crate) fn non_fill_fields_match(&self, other: &Self) -> bool {
91 self.qty == other.qty
92 && self.limit_price_bits == other.limit_price_bits
93 && self.stop_price_bits == other.stop_price_bits
94 }
95}
96
97#[derive(Debug, Clone)]
104pub struct OrderIdentity {
105 pub strategy_id: StrategyId,
107 pub instrument_id: InstrumentId,
109 pub order_side: OrderSide,
111 pub order_type: OrderType,
113 pub quantity: Quantity,
115}
116
117#[derive(Debug)]
125pub struct WsDispatchState {
126 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
128 pub emitted_accepted: DashSet<ClientOrderId>,
130 pub filled_orders: DashSet<ClientOrderId>,
132 pub delta_snapshots: DashMap<ClientOrderId, DeltaSnapshot>,
142 pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
148 pub emitted_trades: Mutex<IndexSet<TradeId>>,
156 clearing: AtomicBool,
157}
158
159impl Default for WsDispatchState {
160 fn default() -> Self {
161 Self {
162 order_identities: DashMap::new(),
163 emitted_accepted: DashSet::default(),
164 filled_orders: DashSet::default(),
165 delta_snapshots: DashMap::new(),
166 order_filled_qty: DashMap::new(),
167 emitted_trades: Mutex::new(IndexSet::with_capacity(DEDUP_CAPACITY)),
168 clearing: AtomicBool::new(false),
169 }
170 }
171}
172
173impl WsDispatchState {
174 #[must_use]
176 pub fn new() -> Self {
177 Self::default()
178 }
179
180 pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
183 self.order_identities.insert(client_order_id, identity);
184 }
185
186 #[must_use]
188 pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
189 self.order_identities
190 .get(client_order_id)
191 .map(|r| r.clone())
192 }
193
194 pub fn insert_accepted(&self, cid: ClientOrderId) {
196 self.evict_if_full(&self.emitted_accepted);
197 self.emitted_accepted.insert(cid);
198 }
199
200 pub fn insert_filled(&self, cid: ClientOrderId) {
202 self.evict_if_full(&self.filled_orders);
203 self.filled_orders.insert(cid);
204 }
205
206 #[expect(
213 clippy::missing_panics_doc,
214 reason = "dedup mutex poisoning is not expected"
215 )]
216 pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
217 let mut set = self.emitted_trades.lock().expect("dedup mutex poisoned");
218
219 if set.contains(&trade_id) {
220 return true;
221 }
222
223 if set.len() >= DEDUP_CAPACITY {
224 set.shift_remove_index(0);
225 }
226
227 set.insert(trade_id);
228 false
229 }
230
231 pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
233 self.order_identities.remove(client_order_id);
234 self.emitted_accepted.remove(client_order_id);
235 self.order_filled_qty.remove(client_order_id);
236 self.delta_snapshots.remove(client_order_id);
237 }
238
239 pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
242 self.order_filled_qty.insert(client_order_id, qty);
243 }
244
245 #[must_use]
247 pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
248 self.order_filled_qty.get(client_order_id).map(|r| *r)
249 }
250
251 pub fn record_delta_snapshot(&self, client_order_id: ClientOrderId, snapshot: DeltaSnapshot) {
254 self.delta_snapshots.insert(client_order_id, snapshot);
255 }
256
257 #[must_use]
259 pub fn previous_delta_snapshot(
260 &self,
261 client_order_id: &ClientOrderId,
262 ) -> Option<DeltaSnapshot> {
263 self.delta_snapshots.get(client_order_id).map(|r| *r)
264 }
265
266 pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
269 if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
270 entry.quantity = quantity;
271 }
272 }
273
274 fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
275 if set.len() >= DEDUP_CAPACITY
276 && self
277 .clearing
278 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
279 .is_ok()
280 {
281 set.clear();
282 self.clearing.store(false, Ordering::Release);
283 }
284 }
285}
286
287pub(crate) fn resolve_client_order_id(
295 truncated: &str,
296 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
297) -> ClientOrderId {
298 truncated_id_map
299 .load()
300 .get(truncated)
301 .copied()
302 .unwrap_or_else(|| ClientOrderId::new(truncated))
303}
304
305#[expect(clippy::too_many_arguments)]
313pub(crate) fn ensure_accepted_emitted(
314 client_order_id: ClientOrderId,
315 venue_order_id: VenueOrderId,
316 account_id: AccountId,
317 identity: &OrderIdentity,
318 state: &WsDispatchState,
319 emitter: &ExecutionEventEmitter,
320 ts_event: UnixNanos,
321 ts_init: UnixNanos,
322) {
323 if state.emitted_accepted.contains(&client_order_id) {
324 return;
325 }
326 state.insert_accepted(client_order_id);
327 let accepted = OrderAccepted::new(
328 emitter.trader_id(),
329 identity.strategy_id,
330 identity.instrument_id,
331 client_order_id,
332 venue_order_id,
333 account_id,
334 UUID4::new(),
335 ts_event,
336 ts_init,
337 false,
338 );
339 emitter.send_order_event(OrderEventAny::Accepted(accepted));
340}
341
342pub(crate) fn fill_report_to_order_filled(
345 report: &FillReport,
346 trader_id: TraderId,
347 identity: &OrderIdentity,
348 quote_currency: Currency,
349 client_order_id: ClientOrderId,
350) -> OrderFilled {
351 OrderFilled::new(
352 trader_id,
353 identity.strategy_id,
354 identity.instrument_id,
355 client_order_id,
356 report.venue_order_id,
357 report.account_id,
358 report.trade_id,
359 identity.order_side,
360 identity.order_type,
361 report.last_qty,
362 report.last_px,
363 quote_currency,
364 report.liquidity_side,
365 UUID4::new(),
366 report.ts_event,
367 report.ts_init,
368 false,
369 report.venue_position_id,
370 Some(report.commission),
371 )
372}
373
374pub(crate) fn lookup_instrument(
376 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
377 raw_symbol: &str,
378) -> Option<InstrumentAny> {
379 let instrument_id = InstrumentId::new(Symbol::new(raw_symbol), *KRAKEN_VENUE);
380 instruments.load().get(&instrument_id).cloned()
381}
382
383#[cfg(test)]
384mod tests {
385 use nautilus_model::{
386 enums::{OrderSide, OrderType},
387 identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId},
388 };
389 use rstest::rstest;
390
391 use super::*;
392
393 fn make_identity() -> OrderIdentity {
394 OrderIdentity {
395 strategy_id: StrategyId::new("EXEC_TESTER-001"),
396 instrument_id: InstrumentId::from("PF_XBTUSD.KRAKEN"),
397 order_side: OrderSide::Buy,
398 order_type: OrderType::Limit,
399 quantity: Quantity::from("0.0001"),
400 }
401 }
402
403 #[rstest]
404 fn test_register_and_lookup_identity() {
405 let state = WsDispatchState::new();
406 let cid = ClientOrderId::new("uuid-1");
407 state.register_identity(cid, make_identity());
408
409 let found = state.lookup_identity(&cid);
410 assert!(found.is_some());
411 let identity = found.unwrap();
412 assert_eq!(identity.strategy_id.as_str(), "EXEC_TESTER-001");
413 assert_eq!(identity.order_side, OrderSide::Buy);
414 }
415
416 #[rstest]
417 fn test_lookup_identity_missing_returns_none() {
418 let state = WsDispatchState::new();
419 let cid = ClientOrderId::new("not-tracked");
420 assert!(state.lookup_identity(&cid).is_none());
421 }
422
423 #[rstest]
424 fn test_insert_accepted_dedup() {
425 let state = WsDispatchState::new();
426 let cid = ClientOrderId::new("uuid-2");
427 assert!(!state.emitted_accepted.contains(&cid));
428 state.insert_accepted(cid);
429 assert!(state.emitted_accepted.contains(&cid));
430 state.insert_accepted(cid);
432 assert!(state.emitted_accepted.contains(&cid));
433 }
434
435 #[rstest]
436 fn test_check_and_insert_trade_detects_duplicates() {
437 let state = WsDispatchState::new();
438 let trade = TradeId::new("trade-1");
439 assert!(!state.check_and_insert_trade(trade));
441 assert!(state.check_and_insert_trade(trade));
443 }
444
445 #[rstest]
446 fn test_check_and_insert_trade_fifo_eviction_preserves_recent_ids() {
447 let state = WsDispatchState::new();
451 for i in 0..DEDUP_CAPACITY {
452 let trade = TradeId::new(format!("trade-{i}").as_str());
453 assert!(!state.check_and_insert_trade(trade));
454 }
455 let overflow = TradeId::new(format!("trade-{DEDUP_CAPACITY}").as_str());
457 assert!(!state.check_and_insert_trade(overflow));
458
459 let set = state.emitted_trades.lock().expect("dedup mutex poisoned");
462 assert_eq!(set.len(), DEDUP_CAPACITY);
463 assert!(
464 !set.contains(&TradeId::new("trade-0")),
465 "oldest entry should have been evicted",
466 );
467 assert!(
468 set.contains(&TradeId::new("trade-1")),
469 "second-oldest remains"
470 );
471 assert!(
472 set.contains(&TradeId::new(
473 format!("trade-{}", DEDUP_CAPACITY - 1).as_str(),
474 )),
475 "most-recent pre-overflow entry remains",
476 );
477 assert!(
478 set.contains(&overflow),
479 "the overflow entry was inserted after eviction",
480 );
481 }
482
483 #[rstest]
484 fn test_cleanup_terminal_removes_state() {
485 let state = WsDispatchState::new();
486 let cid = ClientOrderId::new("uuid-3");
487 state.register_identity(cid, make_identity());
488 state.insert_accepted(cid);
489
490 assert!(state.lookup_identity(&cid).is_some());
491 assert!(state.emitted_accepted.contains(&cid));
492
493 state.cleanup_terminal(&cid);
494
495 assert!(state.lookup_identity(&cid).is_none());
496 assert!(!state.emitted_accepted.contains(&cid));
497 }
498
499 #[rstest]
500 fn test_resolve_client_order_id_via_truncated_map() {
501 let map: Arc<AtomicMap<String, ClientOrderId>> = Arc::new(AtomicMap::new());
502 let full = ClientOrderId::new("full-uuid-12345");
503 map.insert("trunc-id".to_string(), full);
504
505 let resolved = resolve_client_order_id("trunc-id", &map);
506 assert_eq!(resolved, full);
507 }
508
509 #[rstest]
510 fn test_resolve_client_order_id_falls_back_to_input() {
511 let map: Arc<AtomicMap<String, ClientOrderId>> = Arc::new(AtomicMap::new());
512 let resolved = resolve_client_order_id("unknown", &map);
513 assert_eq!(resolved.as_str(), "unknown");
514 }
515}