Skip to main content

nautilus_hyperliquid/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket execution dispatch for the Hyperliquid execution client.
17//!
18//! Implements the two-tier execution dispatch contract from
19//! `docs/developer_guide/adapters.md` (lines 1232-1296):
20//!
21//! 1. The execution client registers an [`OrderIdentity`] in [`WsDispatchState`]
22//!    when it submits an order, and refreshes the cached venue order id when a
23//!    modify is sent so the WebSocket consumer can detect cancel-replace.
24//! 2. Incoming [`OrderStatusReport`] and [`FillReport`] messages are routed
25//!    through [`dispatch_order_status_report`] and [`dispatch_fill_report`].
26//!    For tracked orders these build typed [`OrderEventAny`] events and emit
27//!    them via [`ExecutionEventEmitter::send_order_event`]. For untracked /
28//!    external orders the dispatch falls back to forwarding the raw report.
29//!
30//! The dispatch state lives in an `Arc<WsDispatchState>` shared between the
31//! main client task (which registers identities at submission time) and the
32//! spawned WebSocket consumer task.
33//!
34//! # GH-3827 cancel-replace handling
35//!
36//! Hyperliquid implements `modify` as a cancel-and-replace: the venue emits an
37//! `ACCEPTED(new_voi)` together with a `CANCELED(old_voi)` under the same
38//! `client_order_id`. The dispatch detects the replacement leg by comparing
39//! `report.venue_order_id` to the last cached value, promotes it to an
40//! `OrderUpdated` event, and suppresses the stale cancel so strategies never
41//! observe a spurious termination.
42//!
43//! The pending-modify marker (keyed on `client_order_id`) is set by
44//! `modify_order` only after a successful HTTP round-trip and cleared on the
45//! matching `ACCEPTED`. It lets dispatch skip an early
46//! `CANCELED(old_voi)` that arrives before the replacement `ACCEPTED(new_voi)`
47//! on the WebSocket. The documented transport-timeout + WS-race window still
48//! applies: if the modify HTTP call fails but the venue actually accepted it,
49//! no marker is set, so a cancel-before-accept race in that window would
50//! surface as `OrderCanceled`. This matches the Python behaviour we are
51//! porting and is the simplest correct answer; verifying the venue-side
//! outcome before emitting would require speculative waiting that adds
//! latency without improving correctness.
54
55use std::{
56    collections::VecDeque,
57    hash::Hash,
58    sync::{
59        Mutex,
60        atomic::{AtomicBool, Ordering},
61    },
62};
63
64use ahash::AHashSet;
65use dashmap::{DashMap, DashSet};
66use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
67use nautilus_live::ExecutionEventEmitter;
68use nautilus_model::{
69    enums::{OrderSide, OrderStatus, OrderType},
70    events::{
71        OrderAccepted, OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected,
72        OrderTriggered, OrderUpdated,
73    },
74    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
75    reports::{FillReport, OrderStatusReport},
76    types::{Price, Quantity},
77};
78use ustr::Ustr;
79
/// Size limit shared by the dedup collections in this module.
///
/// It is the FIFO window of the trade-id [`BoundedDedup`] created in
/// `WsDispatchState::default`, and the threshold at which
/// `WsDispatchState::evict_if_full` clears the `DashSet` marker sets.
pub const DEDUP_CAPACITY: usize = 10_000;
81
/// Identity metadata captured when an order is submitted through this client.
///
/// Stored in [`WsDispatchState::order_identities`] keyed by the full Nautilus
/// [`ClientOrderId`]. The dispatch functions use the identity to build typed
/// order events for tracked orders without needing access to the engine cache
/// (which is `!Send` and unreachable from the spawned WebSocket task).
///
/// `Clone` is derived because [`WsDispatchState::lookup_identity`] hands out
/// an owned copy rather than a reference into the map.
#[derive(Debug, Clone)]
pub struct OrderIdentity {
    /// Strategy that owns the order.
    pub strategy_id: StrategyId,
    /// Instrument the order targets.
    pub instrument_id: InstrumentId,
    /// Order side captured at submission.
    pub order_side: OrderSide,
    /// Order type captured at submission.
    pub order_type: OrderType,
    /// Order quantity captured at submission and overwritten with the
    /// reported quantity when a cancel-replace `ACCEPTED` is promoted to
    /// `OrderUpdated`. The fill path compares cumulative fills against this
    /// value to decide when the order is fully filled.
    pub quantity: Quantity,
    /// Last known order price. Populated on submission and refreshed from
    /// subsequent status reports so a cancel-replace `ACCEPTED` that omits
    /// `price` can still produce an `OrderUpdated` carrying an accurate value.
    pub price: Option<Price>,
}
105
/// Bounded FIFO deduplication set.
///
/// When the capacity is reached, the oldest entry is evicted on the next
/// insert. A simple `clear()` at the threshold would drop every recent trade
/// id at once, opening a window where a reconnect or replay right after the
/// rollover could re-emit duplicate `OrderFilled` events; the FIFO window
/// slides instead.
///
/// A capacity of zero yields a set that never retains anything: every
/// `insert` reports "not seen before" and the set stays empty.
#[derive(Debug)]
pub struct BoundedDedup<T>
where
    T: Eq + Hash + Clone,
{
    /// Insertion order of the tracked values, oldest at the front.
    order: VecDeque<T>,
    /// Membership index over the same values held in `order`.
    set: AHashSet<T>,
    /// Maximum number of values retained at any time.
    capacity: usize,
}

impl<T> BoundedDedup<T>
where
    T: Eq + Hash + Clone,
{
    /// Creates a new bounded dedup set with the given `capacity`.
    ///
    /// Storage for `capacity` entries is reserved up front. With a
    /// `capacity` of zero the set never stores a value.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            order: VecDeque::with_capacity(capacity),
            set: AHashSet::with_capacity(capacity),
            capacity,
        }
    }

    /// Inserts a value. Returns `true` when the value was already present.
    ///
    /// When the set is full, the oldest tracked value is evicted to make
    /// room. With a zero capacity nothing is stored and the result is always
    /// `false`.
    pub fn insert(&mut self, value: T) -> bool {
        if self.set.contains(&value) {
            return true;
        }

        // A zero-capacity window cannot remember anything. Without this guard
        // the eviction below would find an empty queue, evict nothing, and the
        // set would grow without bound.
        if self.capacity == 0 {
            return false;
        }

        // `capacity >= 1` here, so a full queue is guaranteed to be non-empty.
        let evicted = if self.order.len() >= self.capacity {
            self.order.pop_front()
        } else {
            None
        };

        if let Some(evicted) = evicted {
            self.set.remove(&evicted);
        }

        self.order.push_back(value.clone());
        self.set.insert(value);
        false
    }

    /// Returns the number of entries currently tracked.
    #[must_use]
    pub fn len(&self) -> usize {
        self.set.len()
    }

    /// Returns whether the dedup set is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.set.is_empty()
    }

    /// Returns whether the value is currently tracked.
    #[must_use]
    pub fn contains(&self, value: &T) -> bool {
        self.set.contains(value)
    }
}
172
/// Per-client dispatch state shared between order submission and the
/// WebSocket consumer task.
///
/// Tracks which orders were submitted through this client (so we can route
/// venue events to typed [`OrderEventAny`] emissions for tracked orders and
/// fall back to reports for external orders), provides cross-stream dedup
/// for `OrderAccepted` and `OrderFilled` emissions, and carries the
/// GH-3827 cancel-replace state (`cached_venue_order_ids` and
/// `pending_modify_keys`).
#[derive(Debug)]
pub struct WsDispatchState {
    /// Tracked orders keyed by full Nautilus [`ClientOrderId`].
    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
    /// Client order IDs for which an `OrderAccepted` event has been emitted.
    ///
    /// Cleared wholesale by `insert_accepted` once it holds
    /// [`DEDUP_CAPACITY`] entries.
    pub emitted_accepted: DashSet<ClientOrderId>,
    /// Client order IDs that have reached a terminal state. Despite the
    /// name, the cancel, expire and reject handlers insert here as well as
    /// the fill path.
    ///
    /// Retained past `cleanup_terminal` so that late replay of the same
    /// status or fill does not re-emit events. Cleared wholesale by
    /// `insert_filled` once it holds [`DEDUP_CAPACITY`] entries.
    pub filled_orders: DashSet<ClientOrderId>,
    /// Trade IDs for which an `OrderFilled` event has been emitted.
    ///
    /// Bounded FIFO dedup to bound memory while keeping recent trade ids
    /// deduped across reconnects.
    pub emitted_trades: Mutex<BoundedDedup<TradeId>>,
    /// Last venue order id observed for a tracked client order id.
    ///
    /// Populated on the first `OrderAccepted` and refreshed on every
    /// cancel-replace promotion. A later `ACCEPTED` with a different venue
    /// order id under the same client order id is treated as the
    /// replacement leg of a Hyperliquid modify and emitted as `OrderUpdated`.
    pub cached_venue_order_ids: DashMap<ClientOrderId, VenueOrderId>,
    /// Maps `client_order_id` to the old venue order id of an in-flight
    /// modify. Populated by `modify_order` only after a successful HTTP
    /// round-trip and cleared on the matching `ACCEPTED(new_voi)`. A
    /// `CANCELED(old_voi)` arriving while the marker is set is treated as
    /// the cancel leg of a cancel-before-accept race and suppressed so the
    /// later `ACCEPTED(new_voi)` can flow through the `OrderUpdated` path.
    pub pending_modify_keys: DashMap<ClientOrderId, VenueOrderId>,
    /// Cumulative filled quantity per tracked order. Compared against
    /// `OrderIdentity::quantity` to decide when to clean up tracked state.
    pub order_filled_qty: DashMap<ClientOrderId, Quantity>,
    /// Flag claimed by `evict_if_full` around `DashSet::clear` so that only
    /// one caller at a time clears an over-capacity marker set. The same
    /// flag serves both `emitted_accepted` and `filled_orders`.
    clearing: AtomicBool,
}
217
218impl Default for WsDispatchState {
219    fn default() -> Self {
220        Self {
221            order_identities: DashMap::new(),
222            emitted_accepted: DashSet::default(),
223            filled_orders: DashSet::default(),
224            emitted_trades: Mutex::new(BoundedDedup::new(DEDUP_CAPACITY)),
225            cached_venue_order_ids: DashMap::new(),
226            pending_modify_keys: DashMap::new(),
227            order_filled_qty: DashMap::new(),
228            clearing: AtomicBool::new(false),
229        }
230    }
231}
232
233impl WsDispatchState {
234    /// Creates a new empty dispatch state.
235    #[must_use]
236    pub fn new() -> Self {
237        Self::default()
238    }
239
240    /// Registers an order identity. Called by the execution client at order
241    /// submission time, before any WebSocket events for the order can arrive.
242    pub fn register_identity(&self, client_order_id: ClientOrderId, identity: OrderIdentity) {
243        self.order_identities.insert(client_order_id, identity);
244    }
245
246    /// Returns a clone of the identity for the given client order id, if any.
247    #[must_use]
248    pub fn lookup_identity(&self, client_order_id: &ClientOrderId) -> Option<OrderIdentity> {
249        self.order_identities
250            .get(client_order_id)
251            .map(|r| r.clone())
252    }
253
254    /// Refreshes the tracked price for a modify ack when the new report
255    /// carries an updated price.
256    pub fn update_identity_price(&self, client_order_id: &ClientOrderId, price: Option<Price>) {
257        if let Some(price) = price
258            && let Some(mut entry) = self.order_identities.get_mut(client_order_id)
259        {
260            entry.price = Some(price);
261        }
262    }
263
264    /// Refreshes the tracked quantity for a modify ack.
265    pub fn update_identity_quantity(&self, client_order_id: &ClientOrderId, quantity: Quantity) {
266        if let Some(mut entry) = self.order_identities.get_mut(client_order_id) {
267            entry.quantity = quantity;
268        }
269    }
270
271    /// Marks an `OrderAccepted` event as emitted for this order.
272    pub fn insert_accepted(&self, cid: ClientOrderId) {
273        self.evict_if_full(&self.emitted_accepted);
274        self.emitted_accepted.insert(cid);
275    }
276
277    /// Marks an order as having reached the filled terminal state.
278    pub fn insert_filled(&self, cid: ClientOrderId) {
279        self.evict_if_full(&self.filled_orders);
280        self.filled_orders.insert(cid);
281    }
282
283    /// Atomically inserts a trade id into the dedup set.
284    ///
285    /// Returns `true` when the trade was already present (i.e. it is a
286    /// duplicate), `false` otherwise.
287    #[allow(
288        clippy::missing_panics_doc,
289        reason = "dedup mutex poisoning is not expected"
290    )]
291    pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
292        let mut set = self.emitted_trades.lock().expect(MUTEX_POISONED);
293        set.insert(trade_id)
294    }
295
296    /// Caches the venue order id observed for a tracked client order id.
297    pub fn record_venue_order_id(
298        &self,
299        client_order_id: ClientOrderId,
300        venue_order_id: VenueOrderId,
301    ) {
302        self.cached_venue_order_ids
303            .insert(client_order_id, venue_order_id);
304    }
305
306    /// Returns the previously cached venue order id, if any.
307    #[must_use]
308    pub fn cached_venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
309        self.cached_venue_order_ids.get(client_order_id).map(|r| *r)
310    }
311
312    /// Marks an in-flight modify for cancel-before-accept suppression.
313    pub fn mark_pending_modify(
314        &self,
315        client_order_id: ClientOrderId,
316        old_venue_order_id: VenueOrderId,
317    ) {
318        self.pending_modify_keys
319            .insert(client_order_id, old_venue_order_id);
320    }
321
322    /// Clears the pending modify marker for a client order id.
323    pub fn clear_pending_modify(&self, client_order_id: &ClientOrderId) {
324        self.pending_modify_keys.remove(client_order_id);
325    }
326
327    /// Returns the pending modify marker for a client order id, if any.
328    #[must_use]
329    pub fn pending_modify(&self, client_order_id: &ClientOrderId) -> Option<VenueOrderId> {
330        self.pending_modify_keys.get(client_order_id).map(|r| *r)
331    }
332
333    /// Records cumulative filled quantity for a tracked order.
334    pub fn record_filled_qty(&self, client_order_id: ClientOrderId, qty: Quantity) {
335        self.order_filled_qty.insert(client_order_id, qty);
336    }
337
338    /// Returns the previously recorded cumulative filled quantity, if any.
339    #[must_use]
340    pub fn previous_filled_qty(&self, client_order_id: &ClientOrderId) -> Option<Quantity> {
341        self.order_filled_qty.get(client_order_id).map(|r| *r)
342    }
343
344    /// Removes all dispatch state for an order that has reached a terminal state.
345    ///
346    /// `filled_orders` is intentionally *not* cleared here: the marker is
347    /// used to suppress stale replays and must outlive the identity cleanup.
348    pub fn cleanup_terminal(&self, client_order_id: &ClientOrderId) {
349        self.order_identities.remove(client_order_id);
350        self.emitted_accepted.remove(client_order_id);
351        self.cached_venue_order_ids.remove(client_order_id);
352        self.pending_modify_keys.remove(client_order_id);
353        self.order_filled_qty.remove(client_order_id);
354    }
355
356    fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
357        if set.len() >= DEDUP_CAPACITY
358            && self
359                .clearing
360                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
361                .is_ok()
362        {
363            set.clear();
364            self.clearing.store(false, Ordering::Release);
365        }
366    }
367}
368
/// Outcome of a single dispatch call.
///
/// Returned by [`dispatch_order_status_report`] and [`dispatch_fill_report`]
/// to tell the caller what to do with the report it passed in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchOutcome {
    /// The report was for a tracked order. Typed events have been emitted
    /// (or intentionally skipped, e.g. dedup hit). The caller must not
    /// forward the report as a fallback.
    Tracked,
    /// The report is for an external / untracked order. The caller should
    /// forward the report via [`ExecutionEventEmitter::send_order_status_report`]
    /// or [`ExecutionEventEmitter::send_fill_report`] so the engine can
    /// reconcile.
    External,
    /// The report was recognised as stale (e.g. cancel leg of a
    /// cancel-replace modify, or replay after terminal state). The caller
    /// must drop it without forwarding. Also returned when a cancel-replace
    /// `ACCEPTED` cannot be promoted to `OrderUpdated` because no price is
    /// available on the report or the tracked identity.
    Skip,
}
386
387/// Dispatches an [`OrderStatusReport`] using the two-tier routing contract.
388///
389/// Returns [`DispatchOutcome::Tracked`] when the report maps to a tracked
390/// order (typed events have been emitted or dedup hit), [`External`] when
391/// the caller should forward the report as an untracked fallback, or
392/// [`Skip`] when the report is a stale / race leg that must be dropped.
393///
394/// [`External`]: DispatchOutcome::External
395/// [`Skip`]: DispatchOutcome::Skip
396pub fn dispatch_order_status_report(
397    report: &OrderStatusReport,
398    state: &WsDispatchState,
399    emitter: &ExecutionEventEmitter,
400    ts_init: UnixNanos,
401) -> DispatchOutcome {
402    let Some(client_order_id) = report.client_order_id else {
403        return DispatchOutcome::External;
404    };
405
406    if state.filled_orders.contains(&client_order_id) {
407        log::debug!(
408            "Skipping stale report for filled order: cid={client_order_id}, status={:?}",
409            report.order_status,
410        );
411        return DispatchOutcome::Skip;
412    }
413
414    let Some(identity) = state.lookup_identity(&client_order_id) else {
415        return DispatchOutcome::External;
416    };
417
418    match report.order_status {
419        OrderStatus::Accepted => {
420            handle_accepted(report, client_order_id, &identity, state, emitter, ts_init)
421        }
422        OrderStatus::Triggered => {
423            handle_triggered(report, client_order_id, &identity, state, emitter, ts_init)
424        }
425        OrderStatus::Canceled => {
426            handle_canceled(report, client_order_id, &identity, state, emitter, ts_init)
427        }
428        OrderStatus::Expired => {
429            handle_expired(report, client_order_id, &identity, state, emitter, ts_init)
430        }
431        OrderStatus::Rejected => {
432            handle_rejected(report, client_order_id, &identity, state, emitter, ts_init)
433        }
434        OrderStatus::Filled => handle_filled_marker(client_order_id, state),
435        OrderStatus::PartiallyFilled => {
436            // Fills come via `FillReport`; nothing to emit from the status path.
437            DispatchOutcome::Tracked
438        }
439        OrderStatus::PendingUpdate
440        | OrderStatus::PendingCancel
441        | OrderStatus::Submitted
442        | OrderStatus::Initialized
443        | OrderStatus::Denied
444        | OrderStatus::Released
445        | OrderStatus::Emulated => DispatchOutcome::Tracked,
446    }
447}
448
449/// Dispatches a [`FillReport`] using the two-tier routing contract.
450///
451/// Returns [`DispatchOutcome::Tracked`] when the fill has been emitted as
452/// an `OrderFilled` event (or skipped via trade dedup), [`External`] when
453/// the caller should forward the fill via
454/// [`ExecutionEventEmitter::send_fill_report`], or [`Skip`] when the fill
455/// is a replay for an already-terminal order and must be dropped.
456///
457/// [`External`]: DispatchOutcome::External
458/// [`Skip`]: DispatchOutcome::Skip
459pub fn dispatch_fill_report(
460    report: &FillReport,
461    state: &WsDispatchState,
462    emitter: &ExecutionEventEmitter,
463    ts_init: UnixNanos,
464) -> DispatchOutcome {
465    let Some(client_order_id) = report.client_order_id else {
466        return DispatchOutcome::External;
467    };
468
469    if state.filled_orders.contains(&client_order_id) {
470        log::debug!(
471            "Skipping stale fill for filled order: cid={client_order_id}, trade_id={}",
472            report.trade_id,
473        );
474        return DispatchOutcome::Skip;
475    }
476
477    let Some(identity) = state.lookup_identity(&client_order_id) else {
478        return DispatchOutcome::External;
479    };
480
481    if state.check_and_insert_trade(report.trade_id) {
482        log::debug!(
483            "Skipping duplicate fill for {client_order_id}: trade_id={}",
484            report.trade_id
485        );
486        return DispatchOutcome::Tracked;
487    }
488
489    ensure_accepted_emitted(
490        client_order_id,
491        report.venue_order_id,
492        report.account_id,
493        &identity,
494        state,
495        emitter,
496        report.ts_event,
497        ts_init,
498    );
499
500    let filled = OrderFilled::new(
501        emitter.trader_id(),
502        identity.strategy_id,
503        identity.instrument_id,
504        client_order_id,
505        report.venue_order_id,
506        report.account_id,
507        report.trade_id,
508        identity.order_side,
509        identity.order_type,
510        report.last_qty,
511        report.last_px,
512        report.commission.currency,
513        report.liquidity_side,
514        UUID4::new(),
515        report.ts_event,
516        ts_init,
517        false,
518        report.venue_position_id,
519        Some(report.commission),
520    );
521    emitter.send_order_event(OrderEventAny::Filled(filled));
522
523    let previous = state
524        .previous_filled_qty(&client_order_id)
525        .unwrap_or_else(|| Quantity::zero(report.last_qty.precision));
526    let cumulative = previous + report.last_qty;
527    state.record_filled_qty(client_order_id, cumulative);
528
529    if cumulative >= identity.quantity {
530        state.insert_filled(client_order_id);
531        state.cleanup_terminal(&client_order_id);
532    }
533
534    DispatchOutcome::Tracked
535}
536
537fn handle_accepted(
538    report: &OrderStatusReport,
539    client_order_id: ClientOrderId,
540    identity: &OrderIdentity,
541    state: &WsDispatchState,
542    emitter: &ExecutionEventEmitter,
543    ts_init: UnixNanos,
544) -> DispatchOutcome {
545    let venue_order_id = report.venue_order_id;
546    let ts_event = report.ts_last;
547    let account_id = report.account_id;
548
549    // Cancel-replace detection: if an earlier ACCEPTED cached a different
550    // venue_order_id under the same client_order_id, this ACCEPTED is the
551    // replacement leg of a Hyperliquid modify and must be promoted to
552    // OrderUpdated. See GH-3827.
553    if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
554        && cached_voi != venue_order_id
555    {
556        let price = report.price.or(identity.price);
557        let Some(price) = price else {
558            log::warn!(
559                "Cannot emit OrderUpdated for cancel-replace {client_order_id}: \
560                 no price on report and no cached price on identity",
561            );
562            return DispatchOutcome::Skip;
563        };
564
565        state.record_venue_order_id(client_order_id, venue_order_id);
566        state.update_identity_quantity(&client_order_id, report.quantity);
567        state.update_identity_price(&client_order_id, Some(price));
568        state.clear_pending_modify(&client_order_id);
569
570        let updated = OrderUpdated::new(
571            emitter.trader_id(),
572            identity.strategy_id,
573            identity.instrument_id,
574            client_order_id,
575            report.quantity,
576            UUID4::new(),
577            ts_event,
578            ts_init,
579            false,
580            Some(venue_order_id),
581            Some(account_id),
582            Some(price),
583            report.trigger_price,
584            None,
585            false,
586        );
587        emitter.send_order_event(OrderEventAny::Updated(updated));
588        return DispatchOutcome::Tracked;
589    }
590
591    if state.emitted_accepted.contains(&client_order_id) {
592        // Repeat ACCEPTED for an already-accepted order. Nothing to emit;
593        // refresh the cached price so a subsequent cancel-replace without a
594        // report price can still recover an accurate value.
595        state.update_identity_price(&client_order_id, report.price);
596        return DispatchOutcome::Tracked;
597    }
598
599    state.insert_accepted(client_order_id);
600    state.record_venue_order_id(client_order_id, venue_order_id);
601    state.update_identity_price(&client_order_id, report.price);
602
603    let accepted = OrderAccepted::new(
604        emitter.trader_id(),
605        identity.strategy_id,
606        identity.instrument_id,
607        client_order_id,
608        venue_order_id,
609        account_id,
610        UUID4::new(),
611        ts_event,
612        ts_init,
613        false,
614    );
615    emitter.send_order_event(OrderEventAny::Accepted(accepted));
616    DispatchOutcome::Tracked
617}
618
619fn handle_triggered(
620    report: &OrderStatusReport,
621    client_order_id: ClientOrderId,
622    identity: &OrderIdentity,
623    state: &WsDispatchState,
624    emitter: &ExecutionEventEmitter,
625    ts_init: UnixNanos,
626) -> DispatchOutcome {
627    if !matches!(
628        identity.order_type,
629        OrderType::StopLimit | OrderType::TrailingStopLimit | OrderType::LimitIfTouched
630    ) {
631        log::debug!(
632            "Ignoring TRIGGERED status for non-triggerable order type {:?}: {client_order_id}",
633            identity.order_type,
634        );
635        return DispatchOutcome::Tracked;
636    }
637
638    ensure_accepted_emitted(
639        client_order_id,
640        report.venue_order_id,
641        report.account_id,
642        identity,
643        state,
644        emitter,
645        report.ts_last,
646        ts_init,
647    );
648
649    let triggered = OrderTriggered::new(
650        emitter.trader_id(),
651        identity.strategy_id,
652        identity.instrument_id,
653        client_order_id,
654        UUID4::new(),
655        report.ts_last,
656        ts_init,
657        false,
658        Some(report.venue_order_id),
659        Some(report.account_id),
660    );
661    emitter.send_order_event(OrderEventAny::Triggered(triggered));
662    DispatchOutcome::Tracked
663}
664
665fn handle_canceled(
666    report: &OrderStatusReport,
667    client_order_id: ClientOrderId,
668    identity: &OrderIdentity,
669    state: &WsDispatchState,
670    emitter: &ExecutionEventEmitter,
671    ts_init: UnixNanos,
672) -> DispatchOutcome {
673    let venue_order_id = report.venue_order_id;
674
675    // Stale cancel suppression: if the cached venue_order_id has already
676    // been advanced by a cancel-replace promotion, this CANCELED refers to
677    // the old leg and has already been handled as OrderUpdated. See GH-3827.
678    if let Some(cached_voi) = state.cached_venue_order_id(&client_order_id)
679        && cached_voi != venue_order_id
680    {
681        log::debug!(
682            "Skipping stale CANCELED for {venue_order_id} (cached {cached_voi}) on {client_order_id}",
683        );
684        return DispatchOutcome::Skip;
685    }
686
687    // Cancel-before-accept race: an in-flight modify may deliver
688    // CANCELED(old_voi) before the replacement ACCEPTED(new_voi). The
689    // pending marker (set only after a confirmed modify HTTP success) lets
690    // us suppress the old leg so the later ACCEPTED can route through
691    // OrderUpdated. See GH-3827.
692    if let Some(pending_old) = state.pending_modify(&client_order_id)
693        && pending_old == venue_order_id
694    {
695        log::debug!(
696            "Skipping cancel-before-accept leg for {client_order_id}: venue_order_id={venue_order_id}",
697        );
698        return DispatchOutcome::Skip;
699    }
700
701    ensure_accepted_emitted(
702        client_order_id,
703        venue_order_id,
704        report.account_id,
705        identity,
706        state,
707        emitter,
708        report.ts_last,
709        ts_init,
710    );
711
712    let canceled = OrderCanceled::new(
713        emitter.trader_id(),
714        identity.strategy_id,
715        identity.instrument_id,
716        client_order_id,
717        UUID4::new(),
718        report.ts_last,
719        ts_init,
720        false,
721        Some(venue_order_id),
722        Some(report.account_id),
723    );
724    emitter.send_order_event(OrderEventAny::Canceled(canceled));
725
726    // Retain the filled marker so any late replay of the cancel is
727    // suppressed even after the identity state has been cleaned up.
728    state.insert_filled(client_order_id);
729    state.cleanup_terminal(&client_order_id);
730    DispatchOutcome::Tracked
731}
732
733fn handle_expired(
734    report: &OrderStatusReport,
735    client_order_id: ClientOrderId,
736    identity: &OrderIdentity,
737    state: &WsDispatchState,
738    emitter: &ExecutionEventEmitter,
739    ts_init: UnixNanos,
740) -> DispatchOutcome {
741    ensure_accepted_emitted(
742        client_order_id,
743        report.venue_order_id,
744        report.account_id,
745        identity,
746        state,
747        emitter,
748        report.ts_last,
749        ts_init,
750    );
751
752    let expired = OrderExpired::new(
753        emitter.trader_id(),
754        identity.strategy_id,
755        identity.instrument_id,
756        client_order_id,
757        UUID4::new(),
758        report.ts_last,
759        ts_init,
760        false,
761        Some(report.venue_order_id),
762        Some(report.account_id),
763    );
764    emitter.send_order_event(OrderEventAny::Expired(expired));
765    state.insert_filled(client_order_id);
766    state.cleanup_terminal(&client_order_id);
767    DispatchOutcome::Tracked
768}
769
770fn handle_rejected(
771    report: &OrderStatusReport,
772    client_order_id: ClientOrderId,
773    identity: &OrderIdentity,
774    state: &WsDispatchState,
775    emitter: &ExecutionEventEmitter,
776    ts_init: UnixNanos,
777) -> DispatchOutcome {
778    let reason = report
779        .cancel_reason
780        .clone()
781        .unwrap_or_else(|| "Order rejected by exchange".to_string());
782    let rejected = OrderRejected::new(
783        emitter.trader_id(),
784        identity.strategy_id,
785        identity.instrument_id,
786        client_order_id,
787        report.account_id,
788        Ustr::from(&reason),
789        UUID4::new(),
790        report.ts_last,
791        ts_init,
792        false,
793        false,
794    );
795    emitter.send_order_event(OrderEventAny::Rejected(rejected));
796    state.insert_filled(client_order_id);
797    state.cleanup_terminal(&client_order_id);
798    DispatchOutcome::Tracked
799}
800
/// Handles a status-only `FILLED` report for a tracked order.
///
/// Emits nothing and changes no state; both parameters are unused. Always
/// returns `Tracked` so the caller does not forward the report.
fn handle_filled_marker(
    _client_order_id: ClientOrderId,
    _state: &WsDispatchState,
) -> DispatchOutcome {
    // A status-only `FILLED` marker does not carry fill data; the actual
    // `OrderFilled` is emitted from `dispatch_fill_report` when the matching
    // trade arrives. Do *not* set `filled_orders` here, otherwise the
    // follow-up fill would be classified as a stale replay and dropped
    // before the terminal `OrderFilled` event can be emitted. The fill
    // path installs the marker itself once the cumulative fill quantity
    // matches the tracked order quantity.
    DispatchOutcome::Tracked
}
814
815/// Synthesizes and emits an `OrderAccepted` event when one has not yet been
816/// emitted for the given order.
817///
818/// Used before emitting non-Accepted events so strategies always observe the
819/// canonical `Submitted -> Accepted -> ...` lifecycle even when the venue
820/// compresses the placement and follow-up event into a single message (fast
821/// fills).
822#[allow(clippy::too_many_arguments)]
823fn ensure_accepted_emitted(
824    client_order_id: ClientOrderId,
825    venue_order_id: VenueOrderId,
826    account_id: AccountId,
827    identity: &OrderIdentity,
828    state: &WsDispatchState,
829    emitter: &ExecutionEventEmitter,
830    ts_event: UnixNanos,
831    ts_init: UnixNanos,
832) {
833    if state.emitted_accepted.contains(&client_order_id) {
834        return;
835    }
836    state.insert_accepted(client_order_id);
837    state.record_venue_order_id(client_order_id, venue_order_id);
838
839    let accepted = OrderAccepted::new(
840        emitter.trader_id(),
841        identity.strategy_id,
842        identity.instrument_id,
843        client_order_id,
844        venue_order_id,
845        account_id,
846        UUID4::new(),
847        ts_event,
848        ts_init,
849        false,
850    );
851    emitter.send_order_event(OrderEventAny::Accepted(accepted));
852}
853
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::{ClientOrderId, InstrumentId, StrategyId, TradeId};
    use rstest::rstest;

    use super::*;

    /// Builds a small limit-buy identity shared by the tests below.
    fn make_identity() -> OrderIdentity {
        OrderIdentity {
            strategy_id: StrategyId::from("S-001"),
            instrument_id: InstrumentId::from("BTC-USD-PERP.HYPERLIQUID"),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            quantity: Quantity::from("0.0001"),
            price: None,
        }
    }

    /// A registered identity can be looked up again with its fields intact.
    #[rstest]
    fn test_register_and_lookup_identity() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-001");
        dispatch_state.register_identity(order_id, make_identity());

        let identity = dispatch_state
            .lookup_identity(&order_id)
            .expect("registered identity should be found");
        assert_eq!(identity.strategy_id.as_str(), "S-001");
        assert_eq!(identity.order_side, OrderSide::Buy);
    }

    /// Looking up an id that was never registered yields `None`.
    #[rstest]
    fn test_lookup_identity_missing_returns_none() {
        let dispatch_state = WsDispatchState::new();
        let unknown_id = ClientOrderId::new("not-tracked");
        assert!(dispatch_state.lookup_identity(&unknown_id).is_none());
    }

    /// Inserting the accepted marker twice leaves the id present.
    #[rstest]
    fn test_insert_accepted_dedup() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-002");
        assert!(!dispatch_state.emitted_accepted.contains(&order_id));

        dispatch_state.insert_accepted(order_id);
        assert!(dispatch_state.emitted_accepted.contains(&order_id));

        dispatch_state.insert_accepted(order_id);
        assert!(dispatch_state.emitted_accepted.contains(&order_id));
    }

    /// The first sighting of a trade id reports "not seen"; the second reports a duplicate.
    #[rstest]
    fn test_check_and_insert_trade_detects_duplicates() {
        let dispatch_state = WsDispatchState::new();
        let trade_id = TradeId::new("trade-1");
        assert!(!dispatch_state.check_and_insert_trade(trade_id));
        assert!(dispatch_state.check_and_insert_trade(trade_id));
    }

    /// Once capacity is reached, the oldest id is evicted and recent ids remain.
    #[rstest]
    fn test_bounded_dedup_fifo_eviction_preserves_recent_ids() {
        let mut seen: BoundedDedup<TradeId> = BoundedDedup::new(3);
        assert!(!seen.insert(TradeId::new("t-0")));
        assert!(!seen.insert(TradeId::new("t-1")));
        assert!(!seen.insert(TradeId::new("t-2")));
        assert_eq!(seen.len(), 3);

        // A fourth insert overflows the capacity and pushes out the oldest entry.
        assert!(!seen.insert(TradeId::new("t-3")));
        assert_eq!(seen.len(), 3);
        assert!(!seen.contains(&TradeId::new("t-0")));
        assert!(seen.contains(&TradeId::new("t-1")));
        assert!(seen.contains(&TradeId::new("t-3")));
    }

    /// A pending modify can be marked, read back, and cleared.
    #[rstest]
    fn test_pending_modify_roundtrip() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-010");
        let venue_id = VenueOrderId::new("v-1");

        assert!(dispatch_state.pending_modify(&order_id).is_none());

        dispatch_state.mark_pending_modify(order_id, venue_id);
        assert_eq!(dispatch_state.pending_modify(&order_id), Some(venue_id));

        dispatch_state.clear_pending_modify(&order_id);
        assert!(dispatch_state.pending_modify(&order_id).is_none());
    }

    /// Terminal cleanup drops the identity and accepted marker but keeps the filled marker.
    #[rstest]
    fn test_cleanup_terminal_preserves_filled_marker() {
        let dispatch_state = WsDispatchState::new();
        let order_id = ClientOrderId::new("O-020");
        dispatch_state.register_identity(order_id, make_identity());
        dispatch_state.insert_accepted(order_id);
        dispatch_state.insert_filled(order_id);

        dispatch_state.cleanup_terminal(&order_id);

        assert!(dispatch_state.lookup_identity(&order_id).is_none());
        assert!(!dispatch_state.emitted_accepted.contains(&order_id));
        // `filled_orders` outlives `cleanup_terminal` so replays stay suppressed.
        assert!(dispatch_state.filled_orders.contains(&order_id));
    }
}