Skip to main content

nautilus_dydx/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message dispatch for the dYdX execution client.
17//!
18//! Routes incoming subaccount channel data to the appropriate parsing and
19//! event emission paths. Tracked orders (submitted through this client) produce
20//! proper order events; untracked orders fall back to execution reports for
21//! downstream reconciliation.
22
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use dashmap::{DashMap, DashSet};
26use nautilus_core::UUID4;
27use nautilus_model::{
28    enums::{OrderSide, OrderType},
29    events::OrderFilled,
30    identifiers::{ClientOrderId, InstrumentId, StrategyId, TraderId},
31    reports::FillReport,
32    types::Currency,
33};
34
35const DEDUP_CAPACITY: usize = 10_000;
36
37/// Order identity context stored at submission time, used by the WS dispatch
38/// task to produce proper order events without Cache access.
39#[derive(Debug, Clone)]
40pub struct OrderIdentity {
41    pub instrument_id: InstrumentId,
42    pub strategy_id: StrategyId,
43    pub order_side: OrderSide,
44    pub order_type: OrderType,
45}
46
47/// Shared state for cross-stream event deduplication in the execution
48/// dispatch loop.
49#[derive(Debug)]
50pub struct DydxWsDispatchState {
51    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
52    pub emitted_accepted: DashSet<ClientOrderId>,
53    pub filled_orders: DashSet<ClientOrderId>,
54    clearing_accepted: AtomicBool,
55    clearing_filled: AtomicBool,
56}
57
58impl Default for DydxWsDispatchState {
59    fn default() -> Self {
60        Self {
61            order_identities: DashMap::new(),
62            emitted_accepted: DashSet::default(),
63            filled_orders: DashSet::default(),
64            clearing_accepted: AtomicBool::new(false),
65            clearing_filled: AtomicBool::new(false),
66        }
67    }
68}
69
70impl DydxWsDispatchState {
71    fn evict_if_full(set: &DashSet<ClientOrderId>, flag: &AtomicBool) {
72        if set.len() >= DEDUP_CAPACITY
73            && flag
74                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
75                .is_ok()
76        {
77            set.clear();
78            flag.store(false, Ordering::Release);
79        }
80    }
81
82    pub fn insert_accepted(&self, cid: ClientOrderId) {
83        Self::evict_if_full(&self.emitted_accepted, &self.clearing_accepted);
84        self.emitted_accepted.insert(cid);
85    }
86
87    pub fn insert_filled(&self, cid: ClientOrderId) {
88        Self::evict_if_full(&self.filled_orders, &self.clearing_filled);
89        self.filled_orders.insert(cid);
90    }
91
92    /// Removes an order from all tracking sets after it reaches terminal state.
93    pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
94        self.order_identities.remove(client_order_id);
95        self.emitted_accepted.remove(client_order_id);
96        self.filled_orders.remove(client_order_id);
97    }
98}
99
100/// Converts a [`FillReport`] to an [`OrderFilled`] event for tracked orders.
101///
102/// Uses the stored [`OrderIdentity`] to supply `order_side` and `order_type`
103/// fields that are not available in the report.
104///
105/// # Panics
106///
107/// Panics if `report.client_order_id` is `None`.
108pub fn fill_report_to_order_filled(
109    report: &FillReport,
110    trader_id: TraderId,
111    identity: &OrderIdentity,
112    quote_currency: Currency,
113) -> OrderFilled {
114    OrderFilled::new(
115        trader_id,
116        identity.strategy_id,
117        report.instrument_id,
118        report
119            .client_order_id
120            .expect("tracked order has client_order_id"),
121        report.venue_order_id,
122        report.account_id,
123        report.trade_id,
124        identity.order_side,
125        identity.order_type,
126        report.last_qty,
127        report.last_px,
128        quote_currency,
129        report.liquidity_side,
130        UUID4::new(),
131        report.ts_event,
132        report.ts_init,
133        false,
134        report.venue_position_id,
135        Some(report.commission),
136    )
137}