Skip to main content

nautilus_kraken/websocket/dispatch/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket execution dispatch for the Kraken Spot and Futures clients.
17//!
18//! Implements the two-tier execution dispatch contract from
19//! `docs/developer_guide/adapters.md` (lines 1232-1296):
20//!
21//! 1. The execution client registers an [`OrderIdentity`] in [`WsDispatchState`]
22//!    when it submits an order.
23//! 2. WebSocket execution messages are routed through the per-product dispatch
24//!    functions in [`futures`] and [`spot`]. For tracked orders the dispatch
25//!    builds typed [`OrderEventAny`] events and emits them directly via
26//!    [`ExecutionEventEmitter::send_order_event`]. For untracked / external
27//!    orders the dispatch falls back to `OrderStatusReport` / `FillReport`
28//!    so the engine can reconcile.
29//!
30//! The dispatch state lives in an `Arc<WsDispatchState>` shared between the
31//! main client thread (which registers identities at submission time) and the
32//! spawned WebSocket consumer task. `DashMap`/`DashSet` provide lock-free
33//! concurrent access.
34
35pub mod futures;
36pub mod spot;
37
38use std::sync::{
39    Arc, Mutex,
40    atomic::{AtomicBool, Ordering},
41};
42
43use dashmap::{DashMap, DashSet};
44use indexmap::IndexSet;
45use nautilus_core::{AtomicMap, UUID4, UnixNanos};
46use nautilus_live::ExecutionEventEmitter;
47use nautilus_model::{
48    enums::{OrderSide, OrderType},
49    events::{OrderAccepted, OrderEventAny, OrderFilled},
50    identifiers::{
51        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TradeId, TraderId, VenueOrderId,
52    },
53    instruments::InstrumentAny,
54    reports::FillReport,
55    types::{Currency, Quantity},
56};
57
58use crate::common::consts::KRAKEN_VENUE;
59
60const DEDUP_CAPACITY: usize = 10_000;
61
62/// Snapshot of the mutable fields seen on a tracked `OpenOrdersDelta`.
63///
64/// Used by the futures delta path to discriminate partial fills (filled
65/// increased), modify acknowledgements (qty / price / trigger_price changed),
66/// and no-op deltas (nothing changed) when a follow-up delta arrives.
67#[derive(Debug, Clone, Copy, PartialEq)]
68pub struct DeltaSnapshot {
69    pub qty: Quantity,
70    pub filled: Quantity,
71    pub limit_price_bits: Option<u64>,
72    pub stop_price_bits: Option<u64>,
73}
74
75impl DeltaSnapshot {
76    pub(crate) fn new(
77        qty: Quantity,
78        filled: Quantity,
79        limit_price: Option<f64>,
80        stop_price: Option<f64>,
81    ) -> Self {
82        Self {
83            qty,
84            filled,
85            limit_price_bits: limit_price.map(f64::to_bits),
86            stop_price_bits: stop_price.map(f64::to_bits),
87        }
88    }
89
90    pub(crate) fn non_fill_fields_match(&self, other: &Self) -> bool {
91        self.qty == other.qty
92            && self.limit_price_bits == other.limit_price_bits
93            && self.stop_price_bits == other.stop_price_bits
94    }
95}
96
97/// Identity metadata captured when an order is submitted through this client.
98///
99/// Stored in [`WsDispatchState::order_identities`] keyed by the full Nautilus
100/// [`ClientOrderId`]. The dispatch functions use the identity to build typed
101/// order events for tracked orders without needing access to the engine cache
102/// (which is `!Send` and unreachable from the spawned WebSocket task).
103#[derive(Debug, Clone)]
104pub struct OrderIdentity {
105    /// Strategy that owns the order.
106    pub strategy_id: StrategyId,
107    /// Instrument the order targets.
108    pub instrument_id: InstrumentId,
109    /// Order side captured at submission.
110    pub order_side: OrderSide,
111    /// Order type captured at submission.
112    pub order_type: OrderType,
113    /// Order quantity captured at submission. Used to detect terminal fills.
114    pub quantity: Quantity,
115}
116
117/// Per-client dispatch state shared between order submission and the
118/// WebSocket consumer task.
119///
120/// Tracks which orders were submitted through this client (so we can route
121/// venue events to typed [`OrderEventAny`] emissions for tracked orders, and
122/// fall back to reports for external orders), and provides cross-stream
123/// dedup for `OrderAccepted` and `OrderFilled` emissions.
124#[derive(Debug)]
125pub struct WsDispatchState {
126    /// Tracked orders keyed by full Nautilus [`ClientOrderId`].
127    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
128    /// Client order IDs for which an `OrderAccepted` event has been emitted.
129    pub emitted_accepted: DashSet<ClientOrderId>,
130    /// Client order IDs that have reached the filled terminal state.
131    pub filled_orders: DashSet<ClientOrderId>,
132    /// Last snapshot of qty / filled / price / trigger_price seen on a
133    /// tracked `OpenOrdersDelta`.
134    ///
135    /// The futures delta path uses this map to discriminate partial-fill
136    /// notifications (the new delta carries `filled` greater than the
137    /// previously seen value), modify acknowledgements (a non-fill field
138    /// changed), and pure no-op deltas (nothing changed). It is updated only
139    /// by the delta path so that the fill path's own cumulative is not
140    /// double-counted.
141    pub delta_snapshots: DashMap<ClientOrderId, DeltaSnapshot>,
142    /// Cumulative filled quantity per tracked client order id, populated by
143    /// the fill side of dispatch.
144    ///
145    /// Compared against `OrderIdentity::quantity` to decide when to clean up
146    /// tracked state on a terminal fill.
147    pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
148    /// Trade IDs for which an `OrderFilled` event has been emitted.
149    ///
150    /// Bounded FIFO dedup: when capacity is reached, the oldest entry is
151    /// evicted on the next insert. A simple `clear()` at the threshold would
152    /// drop all recent trade IDs at once, opening a window where a reconnect
153    /// or replay immediately after the rollover could re-emit duplicate
154    /// `OrderFilled` events.
155    pub emitted_trades: Mutex<IndexSet<TradeId>>,
156    clearing: AtomicBool,
157}
158
159impl Default for WsDispatchState {
160    fn default() -> Self {
161        Self {
162            order_identities: DashMap::new(),
163            emitted_accepted: DashSet::default(),
164            filled_orders: DashSet::default(),
165            delta_snapshots: DashMap::new(),
166            order_filled_qty: DashMap::new(),
167            emitted_trades: Mutex::new(IndexSet::with_capacity(DEDUP_CAPACITY)),
168            clearing: AtomicBool::new(false),
169        }
170    }
171}
172
173impl WsDispatchState {
174    /// Creates a new empty dispatch state.
175    #[must_use]
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    /// Registers an order identity. Called by the execution client at order
181    /// submission time, before any WebSocket events for the order can arrive.
182    pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
183        self.order_identities.insert(client_order_id, identity);
184    }
185
186    /// Returns a clone of the identity for the given client order id, if any.
187    #[must_use]
188    pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
189        self.order_identities
190            .get(client_order_id)
191            .map(|r| r.clone())
192    }
193
194    /// Marks an `OrderAccepted` event as emitted for this order.
195    pub fn insert_accepted(&self, cid: ClientOrderId) {
196        self.evict_if_full(&self.emitted_accepted);
197        self.emitted_accepted.insert(cid);
198    }
199
200    /// Marks an order as having reached the filled terminal state.
201    pub fn insert_filled(&self, cid: ClientOrderId) {
202        self.evict_if_full(&self.filled_orders);
203        self.filled_orders.insert(cid);
204    }
205
206    /// Atomically inserts a trade id into the dedup set.
207    ///
208    /// Returns `true` when the trade was already present (i.e. it is a
209    /// duplicate), `false` otherwise. When the dedup set is at capacity the
210    /// oldest entry is evicted to make room, preserving the `DEDUP_CAPACITY`
211    /// most recently seen trade IDs.
212    #[expect(
213        clippy::missing_panics_doc,
214        reason = "dedup mutex poisoning is not expected"
215    )]
216    pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
217        let mut set = self.emitted_trades.lock().expect("dedup mutex poisoned");
218
219        if set.contains(&trade_id) {
220            return true;
221        }
222
223        if set.len() >= DEDUP_CAPACITY {
224            set.shift_remove_index(0);
225        }
226
227        set.insert(trade_id);
228        false
229    }
230
231    /// Removes all dispatch state for an order that has reached a terminal state.
232    pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
233        self.order_identities.remove(client_order_id);
234        self.emitted_accepted.remove(client_order_id);
235        self.order_filled_qty.remove(client_order_id);
236        self.delta_snapshots.remove(client_order_id);
237    }
238
239    /// Records cumulative filled quantity for a tracked order. Used by the
240    /// fill side of dispatch only.
241    pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
242        self.order_filled_qty.insert(client_order_id, qty);
243    }
244
245    /// Returns the previously recorded cumulative filled quantity, if any.
246    #[must_use]
247    pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
248        self.order_filled_qty.get(client_order_id).map(|r| *r)
249    }
250
251    /// Records the latest delta snapshot for a tracked order. Used by the
252    /// delta side of dispatch only.
253    pub fn record_delta_snapshot(&self, client_order_id: ClientOrderId, snapshot: DeltaSnapshot) {
254        self.delta_snapshots.insert(client_order_id, snapshot);
255    }
256
257    /// Returns the previously recorded delta snapshot, if any.
258    #[must_use]
259    pub fn previous_delta_snapshot(
260        &self,
261        client_order_id: &ClientOrderId,
262    ) -> Option<DeltaSnapshot> {
263        self.delta_snapshots.get(client_order_id).map(|r| *r)
264    }
265
266    /// Updates the tracked `quantity` for an order following a successful
267    /// modify acknowledgement, leaving all other identity fields untouched.
268    pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
269        if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
270            entry.quantity = quantity;
271        }
272    }
273
274    fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
275        if set.len() >= DEDUP_CAPACITY
276            && self
277                .clearing
278                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
279                .is_ok()
280        {
281            set.clear();
282            self.clearing.store(false, Ordering::Release);
283        }
284    }
285}
286
287/// Resolves a Kraken-truncated client order id to its full Nautilus form.
288///
289/// Kraken truncates non-UUID client order ids to 18 characters; the truncation
290/// map is populated at submission time so the WebSocket consumer can recover
291/// the original id. Falls back to constructing a fresh `ClientOrderId` from
292/// the truncated string when no mapping exists (the order is then treated as
293/// external by downstream lookup).
294pub(crate) fn resolve_client_order_id(
295    truncated: &str,
296    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
297) -> ClientOrderId {
298    truncated_id_map
299        .load()
300        .get(truncated)
301        .copied()
302        .unwrap_or_else(|| ClientOrderId::new(truncated))
303}
304
305/// Synthesizes and emits an `OrderAccepted` event when one has not yet been
306/// emitted for the given order.
307///
308/// Used before emitting non-Accepted events (Filled, Canceled, Expired,
309/// Updated) so that strategies always observe the canonical
310/// `Submitted -> Accepted -> ...` lifecycle even when the venue compresses
311/// the acceptance and follow-up event into a single message (fast fills).
312#[expect(clippy::too_many_arguments)]
313pub(crate) fn ensure_accepted_emitted(
314    client_order_id: ClientOrderId,
315    venue_order_id: VenueOrderId,
316    account_id: AccountId,
317    identity: &OrderIdentity,
318    state: &WsDispatchState,
319    emitter: &ExecutionEventEmitter,
320    ts_event: UnixNanos,
321    ts_init: UnixNanos,
322) {
323    if state.emitted_accepted.contains(&client_order_id) {
324        return;
325    }
326    state.insert_accepted(client_order_id);
327    let accepted = OrderAccepted::new(
328        emitter.trader_id(),
329        identity.strategy_id,
330        identity.instrument_id,
331        client_order_id,
332        venue_order_id,
333        account_id,
334        UUID4::new(),
335        ts_event,
336        ts_init,
337        false,
338    );
339    emitter.send_order_event(OrderEventAny::Accepted(accepted));
340}
341
342/// Builds an [`OrderFilled`] event from a [`FillReport`] and tracked
343/// [`OrderIdentity`].
344pub(crate) fn fill_report_to_order_filled(
345    report: &FillReport,
346    trader_id: TraderId,
347    identity: &OrderIdentity,
348    quote_currency: Currency,
349    client_order_id: ClientOrderId,
350) -> OrderFilled {
351    OrderFilled::new(
352        trader_id,
353        identity.strategy_id,
354        identity.instrument_id,
355        client_order_id,
356        report.venue_order_id,
357        report.account_id,
358        report.trade_id,
359        identity.order_side,
360        identity.order_type,
361        report.last_qty,
362        report.last_px,
363        quote_currency,
364        report.liquidity_side,
365        UUID4::new(),
366        report.ts_event,
367        report.ts_init,
368        false,
369        report.venue_position_id,
370        Some(report.commission),
371    )
372}
373
374/// Looks up an instrument from the shared instruments cache by raw symbol.
375pub(crate) fn lookup_instrument(
376    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
377    raw_symbol: &str,
378) -> Option<InstrumentAny> {
379    let instrument_id = InstrumentId::new(Symbol::new(raw_symbol), *KRAKEN_VENUE);
380    instruments.load().get(&instrument_id).cloned()
381}
382
383#[cfg(test)]
384mod tests {
385    use nautilus_model::{
386        enums::{OrderSide, OrderType},
387        identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId},
388    };
389    use rstest::rstest;
390
391    use super::*;
392
393    fn make_identity() -> OrderIdentity {
394        OrderIdentity {
395            strategy_id: StrategyId::new("EXEC_TESTER-001"),
396            instrument_id: InstrumentId::from("PF_XBTUSD.KRAKEN"),
397            order_side: OrderSide::Buy,
398            order_type: OrderType::Limit,
399            quantity: Quantity::from("0.0001"),
400        }
401    }
402
403    #[rstest]
404    fn test_register_and_lookup_identity() {
405        let state = WsDispatchState::new();
406        let cid = ClientOrderId::new("uuid-1");
407        state.register_identity(cid, make_identity());
408
409        let found = state.lookup_identity(&cid);
410        assert!(found.is_some());
411        let identity = found.unwrap();
412        assert_eq!(identity.strategy_id.as_str(), "EXEC_TESTER-001");
413        assert_eq!(identity.order_side, OrderSide::Buy);
414    }
415
416    #[rstest]
417    fn test_lookup_identity_missing_returns_none() {
418        let state = WsDispatchState::new();
419        let cid = ClientOrderId::new("not-tracked");
420        assert!(state.lookup_identity(&cid).is_none());
421    }
422
423    #[rstest]
424    fn test_insert_accepted_dedup() {
425        let state = WsDispatchState::new();
426        let cid = ClientOrderId::new("uuid-2");
427        assert!(!state.emitted_accepted.contains(&cid));
428        state.insert_accepted(cid);
429        assert!(state.emitted_accepted.contains(&cid));
430        // Second insert is a no-op.
431        state.insert_accepted(cid);
432        assert!(state.emitted_accepted.contains(&cid));
433    }
434
435    #[rstest]
436    fn test_check_and_insert_trade_detects_duplicates() {
437        let state = WsDispatchState::new();
438        let trade = TradeId::new("trade-1");
439        // First insert: not a duplicate.
440        assert!(!state.check_and_insert_trade(trade));
441        // Second insert: duplicate.
442        assert!(state.check_and_insert_trade(trade));
443    }
444
445    #[rstest]
446    fn test_check_and_insert_trade_fifo_eviction_preserves_recent_ids() {
447        // Verifies the dedup window slides rather than collapsing to zero at
448        // the capacity threshold. Overshooting by one entry must evict only
449        // the oldest (`trade-0`), leaving every other ID still deduped.
450        let state = WsDispatchState::new();
451        for i in 0..DEDUP_CAPACITY {
452            let trade = TradeId::new(format!("trade-{i}").as_str());
453            assert!(!state.check_and_insert_trade(trade));
454        }
455        // At capacity; the next insert evicts `trade-0`.
456        let overflow = TradeId::new(format!("trade-{DEDUP_CAPACITY}").as_str());
457        assert!(!state.check_and_insert_trade(overflow));
458
459        // Inspect the dedup set directly to confirm FIFO behaviour without
460        // perturbing state via another `check_and_insert_trade` call.
461        let set = state.emitted_trades.lock().expect("dedup mutex poisoned");
462        assert_eq!(set.len(), DEDUP_CAPACITY);
463        assert!(
464            !set.contains(&TradeId::new("trade-0")),
465            "oldest entry should have been evicted",
466        );
467        assert!(
468            set.contains(&TradeId::new("trade-1")),
469            "second-oldest remains"
470        );
471        assert!(
472            set.contains(&TradeId::new(
473                format!("trade-{}", DEDUP_CAPACITY - 1).as_str(),
474            )),
475            "most-recent pre-overflow entry remains",
476        );
477        assert!(
478            set.contains(&overflow),
479            "the overflow entry was inserted after eviction",
480        );
481    }
482
483    #[rstest]
484    fn test_cleanup_terminal_removes_state() {
485        let state = WsDispatchState::new();
486        let cid = ClientOrderId::new("uuid-3");
487        state.register_identity(cid, make_identity());
488        state.insert_accepted(cid);
489
490        assert!(state.lookup_identity(&cid).is_some());
491        assert!(state.emitted_accepted.contains(&cid));
492
493        state.cleanup_terminal(&cid);
494
495        assert!(state.lookup_identity(&cid).is_none());
496        assert!(!state.emitted_accepted.contains(&cid));
497    }
498
499    #[rstest]
500    fn test_resolve_client_order_id_via_truncated_map() {
501        let map: Arc<AtomicMap<String, ClientOrderId>> = Arc::new(AtomicMap::new());
502        let full = ClientOrderId::new("full-uuid-12345");
503        map.insert("trunc-id".to_string(), full);
504
505        let resolved = resolve_client_order_id("trunc-id", &map);
506        assert_eq!(resolved, full);
507    }
508
509    #[rstest]
510    fn test_resolve_client_order_id_falls_back_to_input() {
511        let map: Arc<AtomicMap<String, ClientOrderId>> = Arc::new(AtomicMap::new());
512        let resolved = resolve_client_order_id("unknown", &map);
513        assert_eq!(resolved.as_str(), "unknown");
514    }
515}