Skip to main content

nautilus_binance/common/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket dispatch state for tracked/external order routing.
17//!
18//! Orders submitted through this client have their identity registered in
19//! [`WsDispatchState`]. When user data stream messages arrive, the dispatch
20//! function checks for a registered identity:
21//! - Tracked orders produce proper order events (OrderAccepted, OrderFilled, etc.).
22//! - Untracked orders fall back to execution reports for reconciliation.
23
24use std::sync::Mutex;
25
26use dashmap::DashMap;
27use nautilus_common::cache::fifo::FifoCache;
28use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
29use nautilus_live::ExecutionEventEmitter;
30use nautilus_model::{
31    enums::{OrderSide, OrderType},
32    events::{OrderAccepted, OrderEventAny},
33    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, VenueOrderId},
34    types::Price,
35};
36
37/// The type of operation a pending WS API request represents.
38#[derive(Debug, Clone, Copy)]
39pub enum PendingOperation {
40    Place,
41    Cancel,
42    Modify,
43}
44
45/// A pending WS API request awaiting a response.
46///
47/// Stored in [`WsDispatchState::pending_requests`] after the WS client
48/// returns a request ID. When the venue responds (accepted or rejected),
49/// the pending request is removed and used to emit the correct order event.
50#[derive(Debug, Clone)]
51pub struct PendingRequest {
52    pub client_order_id: ClientOrderId,
53    pub venue_order_id: Option<VenueOrderId>,
54    pub operation: PendingOperation,
55}
56
57/// Order identity context stored at submission time.
58///
59/// Provides the strategy and instrument metadata needed to construct proper
60/// order events without accessing the cache from the async dispatch task.
61#[derive(Debug, Clone)]
62pub struct OrderIdentity {
63    pub instrument_id: InstrumentId,
64    pub strategy_id: StrategyId,
65    pub order_side: OrderSide,
66    pub order_type: OrderType,
67    pub price: Option<Price>,
68}
69
70/// Tracks order lifecycle state for dispatch routing.
71///
72/// Orders with a registered identity (submitted through this client) produce
73/// proper order events. Orders without identity (external or pre-existing)
74/// fall back to execution reports for reconciliation.
75#[derive(Debug)]
76pub struct WsDispatchState {
77    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
78    pub pending_requests: DashMap<String, PendingRequest>,
79    emitted_accepted: Mutex<FifoCache<ClientOrderId, 10_000>>,
80    filled_orders: Mutex<FifoCache<ClientOrderId, 10_000>>,
81}
82
83impl Default for WsDispatchState {
84    fn default() -> Self {
85        Self {
86            order_identities: DashMap::new(),
87            pending_requests: DashMap::new(),
88            emitted_accepted: Mutex::new(FifoCache::new()),
89            filled_orders: Mutex::new(FifoCache::new()),
90        }
91    }
92}
93
94#[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
95impl WsDispatchState {
96    pub fn has_emitted_accepted(&self, cid: &ClientOrderId) -> bool {
97        self.emitted_accepted
98            .lock()
99            .expect(MUTEX_POISONED)
100            .contains(cid)
101    }
102
103    /// Marks an order as having emitted an OrderAccepted event.
104    pub fn insert_accepted(&self, cid: ClientOrderId) {
105        self.emitted_accepted.lock().expect(MUTEX_POISONED).add(cid);
106    }
107
108    pub fn has_filled(&self, cid: &ClientOrderId) -> bool {
109        self.filled_orders
110            .lock()
111            .expect(MUTEX_POISONED)
112            .contains(cid)
113    }
114
115    /// Marks an order as having received a fill.
116    pub fn insert_filled(&self, cid: ClientOrderId) {
117        self.filled_orders.lock().expect(MUTEX_POISONED).add(cid);
118    }
119
120    /// Removes all tracking state for a terminal order.
121    pub fn cleanup_terminal(&self, cid: ClientOrderId) {
122        self.order_identities.remove(&cid);
123        self.emitted_accepted
124            .lock()
125            .expect(MUTEX_POISONED)
126            .remove(&cid);
127        self.filled_orders
128            .lock()
129            .expect(MUTEX_POISONED)
130            .remove(&cid);
131    }
132}
133
134/// Synthesizes and emits OrderAccepted if one has not yet been emitted.
135///
136/// Handles fast-filling orders that skip the New state on Binance.
137pub fn ensure_accepted_emitted(
138    client_order_id: ClientOrderId,
139    account_id: AccountId,
140    venue_order_id: VenueOrderId,
141    identity: &OrderIdentity,
142    emitter: &ExecutionEventEmitter,
143    state: &WsDispatchState,
144    ts_init: UnixNanos,
145) {
146    if state.has_emitted_accepted(&client_order_id) {
147        return;
148    }
149    state.insert_accepted(client_order_id);
150    let accepted = OrderAccepted::new(
151        emitter.trader_id(),
152        identity.strategy_id,
153        identity.instrument_id,
154        client_order_id,
155        venue_order_id,
156        account_id,
157        UUID4::new(),
158        ts_init,
159        ts_init,
160        false,
161    );
162    emitter.send_order_event(OrderEventAny::Accepted(accepted));
163}