Skip to main content

nautilus_bitmex/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message dispatch for the BitMEX execution client.
17//!
18//! Routes incoming [`BitmexWsMessage`] variants to the appropriate parsing and
19//! event emission paths. Tracked orders (submitted through this client) produce
20//! proper order events; untracked orders fall back to execution reports for
21//! downstream reconciliation.
22
23use std::sync::{
24    Mutex,
25    atomic::{AtomicBool, Ordering},
26};
27
28use ahash::AHashMap;
29use dashmap::DashMap;
30use nautilus_core::{UUID4, UnixNanos};
31use nautilus_live::ExecutionEventEmitter;
32use nautilus_model::{
33    enums::{OrderSide, OrderType},
34    events::{OrderAccepted, OrderEventAny, OrderFilled, OrderUpdated},
35    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36    instruments::{Instrument, InstrumentAny},
37    reports::FillReport,
38    types::Currency,
39};
40use ustr::Ustr;
41
42use crate::{
43    common::enums::{BitmexExecType, BitmexOrderType, BitmexPegPriceType},
44    http::parse::{InstrumentParseResult, parse_instrument_any},
45    websocket::{
46        enums::BitmexAction,
47        messages::{BitmexExecutionMsg, BitmexTableMessage, BitmexWsMessage, OrderData},
48        parse::{
49            ParsedOrderEvent, parse_execution_msg, parse_margin_account_state, parse_order_event,
50            parse_order_msg, parse_order_update_msg, parse_position_msg, parse_wallet_msg,
51        },
52    },
53};
54
/// Maximum number of entries the current generation of a dedup set may hold
/// before it is rotated into the previous generation.
///
/// Because one full previous generation is retained alongside the current one,
/// a set remembers at most roughly twice this many keys at any time.
const DEDUP_GENERATION_CAPACITY: usize = 10_000;
57
/// Order identity context stored at submission time, used by the WS dispatch
/// task to produce proper order events without Cache access.
///
/// These fields are immutable for the lifetime of an order and are used to
/// construct proper order events (`OrderAccepted`, `OrderFilled`, etc.) instead
/// of execution reports.
#[derive(Debug, Clone)]
pub struct OrderIdentity {
    /// Instrument the order belongs to; used when synthesizing `OrderAccepted`.
    pub instrument_id: InstrumentId,
    /// Strategy that owns the order; passed into the order events built by
    /// this module.
    pub strategy_id: StrategyId,
    /// Side of the order; copied into `OrderFilled` events.
    pub order_side: OrderSide,
    /// Type of the order; copied into `OrderFilled` events.
    pub order_type: OrderType,
}
71
/// Two-generation dedup set that avoids the duplicate-emission window caused
/// by wholesale clearing.
///
/// Holds a current and a previous `AHashSet` behind a single `Mutex`. When the
/// current set fills up, `std::mem::swap` promotes it to previous and starts a
/// fresh current, all under one lock acquisition. Membership checks and
/// removals consult both generations and also take the lock briefly.
///
/// The lock is held only for the duration of a hash-set operation (and
/// sometimes a swap + clear), so contention is negligible.
#[derive(Debug)]
struct GenerationalDedupSet {
    /// Both generations, guarded together so rotation is atomic with respect
    /// to lookups, inserts and removals.
    inner: Mutex<DedupInner>,
}
84
/// The two generations of keys held by a [`GenerationalDedupSet`].
#[derive(Debug)]
struct DedupInner {
    /// Generation that receives new inserts.
    current: ahash::AHashSet<ClientOrderId>,
    /// The generation that was current before the most recent rotation; still
    /// consulted for membership checks and removals.
    previous: ahash::AHashSet<ClientOrderId>,
}
90
91impl Default for GenerationalDedupSet {
92    fn default() -> Self {
93        Self {
94            inner: Mutex::new(DedupInner {
95                current: ahash::AHashSet::new(),
96                previous: ahash::AHashSet::new(),
97            }),
98        }
99    }
100}
101
102impl GenerationalDedupSet {
103    fn contains(&self, key: &ClientOrderId) -> bool {
104        let guard = self.inner.lock().expect("dedup lock poisoned");
105        guard.current.contains(key) || guard.previous.contains(key)
106    }
107
108    fn insert(&self, key: ClientOrderId) {
109        let mut guard = self.inner.lock().expect("dedup lock poisoned");
110        let inner = &mut *guard;
111        inner.current.insert(key);
112        if inner.current.len() >= DEDUP_GENERATION_CAPACITY {
113            inner.previous.clear();
114            std::mem::swap(&mut inner.current, &mut inner.previous);
115        }
116    }
117
118    fn remove(&self, key: &ClientOrderId) {
119        let mut guard = self.inner.lock().expect("dedup lock poisoned");
120        guard.current.remove(key);
121        guard.previous.remove(key);
122    }
123}
124
/// Shared state for WS dispatch event deduplication and order tracking.
///
/// Uses a `DashMap` plus small mutex-guarded generational sets so the state can
/// be accessed concurrently from the stream task and the main thread; each set
/// lock is held only for the duration of a single set operation.
#[derive(Debug)]
pub struct WsDispatchState {
    /// Identity context for orders tracked by the dispatch, keyed by client
    /// order ID. Entries are removed on terminal order events and when an
    /// order is tombstoned.
    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
    /// Orders for which an `OrderAccepted` event has already been emitted.
    emitted_accepted: GenerationalDedupSet,
    /// Orders for which a triggered event has been emitted and not since
    /// cleared by a fill or terminal event.
    triggered_orders: GenerationalDedupSet,
    /// Orders that have produced at least one fill event and have not since
    /// reached a terminal event.
    filled_orders: GenerationalDedupSet,
    /// Orders whose remaining WS order messages must be dropped
    /// (see `tombstone_order`).
    tombstoned: GenerationalDedupSet,
    /// Set once a margin table message has been dispatched; while set, wallet
    /// messages are no longer forwarded as account state. The field is public,
    /// so it may also be set from outside this module.
    pub margin_subscribed: AtomicBool,
}
138
139impl Default for WsDispatchState {
140    fn default() -> Self {
141        Self {
142            order_identities: DashMap::new(),
143            emitted_accepted: GenerationalDedupSet::default(),
144            triggered_orders: GenerationalDedupSet::default(),
145            filled_orders: GenerationalDedupSet::default(),
146            tombstoned: GenerationalDedupSet::default(),
147            margin_subscribed: AtomicBool::new(false),
148        }
149    }
150}
151
152impl WsDispatchState {
153    pub(crate) fn accepted_contains(&self, cid: &ClientOrderId) -> bool {
154        self.emitted_accepted.contains(cid)
155    }
156
157    pub(crate) fn filled_contains(&self, cid: &ClientOrderId) -> bool {
158        self.filled_orders.contains(cid)
159    }
160
161    pub(crate) fn triggered_contains(&self, cid: &ClientOrderId) -> bool {
162        self.triggered_orders.contains(cid)
163    }
164
165    pub(crate) fn insert_accepted(&self, cid: ClientOrderId) {
166        self.emitted_accepted.insert(cid);
167    }
168
169    pub(crate) fn insert_filled(&self, cid: ClientOrderId) {
170        self.filled_orders.insert(cid);
171    }
172
173    pub(crate) fn insert_triggered(&self, cid: ClientOrderId) {
174        self.triggered_orders.insert(cid);
175    }
176
177    pub(crate) fn remove_triggered(&self, cid: &ClientOrderId) {
178        self.triggered_orders.remove(cid);
179    }
180
181    pub(crate) fn remove_filled(&self, cid: &ClientOrderId) {
182        self.filled_orders.remove(cid);
183    }
184
185    pub(crate) fn remove_accepted(&self, cid: &ClientOrderId) {
186        self.emitted_accepted.remove(cid);
187    }
188
189    /// Returns `true` if the order has been tombstoned by the HTTP cancel path.
190    pub(crate) fn is_tombstoned(&self, cid: &ClientOrderId) -> bool {
191        self.tombstoned.contains(cid)
192    }
193
194    /// Tombstones an order so the WS dispatch silently drops all subsequent
195    /// messages for it. Call after the HTTP path has already sent a terminal
196    /// report (cancel, expire, reject). The tombstone prevents stale WS
197    /// messages (Accepted, Triggered) that are still queued from being
198    /// processed as untracked orders and re-activating a closed order.
199    pub(crate) fn tombstone_order(&self, cid: &ClientOrderId) {
200        self.tombstoned.insert(*cid);
201        self.order_identities.remove(cid);
202        self.remove_accepted(cid);
203        self.remove_triggered(cid);
204        self.remove_filled(cid);
205    }
206}
207
208/// Top-level dispatch for all BitMEX WebSocket messages on the execution stream.
209#[expect(clippy::too_many_arguments)]
210pub fn dispatch_ws_message(
211    ts_init: UnixNanos,
212    message: BitmexWsMessage,
213    emitter: &ExecutionEventEmitter,
214    state: &WsDispatchState,
215    instruments_by_symbol: &mut AHashMap<Ustr, InstrumentAny>,
216    order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
217    order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
218    account_id: AccountId,
219) {
220    match message {
221        BitmexWsMessage::Table(table_msg) => match table_msg {
222            BitmexTableMessage::Order { data, .. } => {
223                dispatch_order_messages(
224                    data,
225                    emitter,
226                    state,
227                    instruments_by_symbol,
228                    order_type_cache,
229                    order_symbol_cache,
230                    account_id,
231                    ts_init,
232                );
233            }
234            BitmexTableMessage::Execution { data, .. } => {
235                dispatch_execution_messages(
236                    data,
237                    emitter,
238                    state,
239                    instruments_by_symbol,
240                    order_symbol_cache,
241                    ts_init,
242                );
243            }
244            BitmexTableMessage::Position { data, .. } => {
245                for pos_msg in data {
246                    let Some(instrument) = instruments_by_symbol.get(&pos_msg.symbol) else {
247                        log::error!(
248                            "Instrument cache miss: position dropped for symbol={}, account={}",
249                            pos_msg.symbol,
250                            pos_msg.account,
251                        );
252                        continue;
253                    };
254                    let report = parse_position_msg(&pos_msg, instrument, ts_init);
255                    emitter.send_position_report(report);
256                }
257            }
258            BitmexTableMessage::Wallet { data, .. } => {
259                if !state.margin_subscribed.load(Ordering::Relaxed) {
260                    for wallet_msg in data {
261                        let acct_state = parse_wallet_msg(&wallet_msg, ts_init);
262                        emitter.send_account_state(acct_state);
263                    }
264                }
265            }
266            BitmexTableMessage::Margin { data, .. } => {
267                state.margin_subscribed.store(true, Ordering::Relaxed);
268
269                for margin_msg in data {
270                    let acct_state = parse_margin_account_state(&margin_msg, ts_init);
271                    emitter.send_account_state(acct_state);
272                }
273            }
274            BitmexTableMessage::Instrument { action, data } => {
275                if matches!(action, BitmexAction::Partial | BitmexAction::Insert) {
276                    for msg in data {
277                        match msg.try_into() {
278                            Ok(http_inst) => match parse_instrument_any(&http_inst, ts_init) {
279                                InstrumentParseResult::Ok(boxed) => {
280                                    let inst = *boxed;
281                                    let symbol = inst.symbol().inner();
282                                    instruments_by_symbol.insert(symbol, inst);
283                                }
284                                InstrumentParseResult::Unsupported { .. }
285                                | InstrumentParseResult::Inactive { .. } => {}
286                                InstrumentParseResult::Failed { symbol, error, .. } => {
287                                    log::warn!("Failed to parse instrument {symbol}: {error}");
288                                }
289                            },
290                            Err(e) => {
291                                log::debug!("Skipping instrument (missing required fields): {e}");
292                            }
293                        }
294                    }
295                }
296            }
297            BitmexTableMessage::OrderBookL2 { .. }
298            | BitmexTableMessage::OrderBookL2_25 { .. }
299            | BitmexTableMessage::OrderBook10 { .. }
300            | BitmexTableMessage::Quote { .. }
301            | BitmexTableMessage::Trade { .. }
302            | BitmexTableMessage::TradeBin1m { .. }
303            | BitmexTableMessage::TradeBin5m { .. }
304            | BitmexTableMessage::TradeBin1h { .. }
305            | BitmexTableMessage::TradeBin1d { .. }
306            | BitmexTableMessage::Funding { .. } => {
307                log::debug!("Ignoring BitMEX data message on execution stream");
308            }
309            _ => {
310                log::warn!("Unhandled table message type on execution stream");
311            }
312        },
313        BitmexWsMessage::Reconnected => {
314            order_type_cache.clear();
315            order_symbol_cache.clear();
316            log::info!("BitMEX execution websocket reconnected");
317        }
318        BitmexWsMessage::Authenticated => {
319            log::debug!("BitMEX execution websocket authenticated");
320        }
321    }
322}
323
324/// Dispatches order messages, routing tracked orders to events and untracked
325/// orders to reports.
326#[expect(clippy::too_many_arguments)]
327fn dispatch_order_messages(
328    data: Vec<OrderData>,
329    emitter: &ExecutionEventEmitter,
330    state: &WsDispatchState,
331    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
332    order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
333    order_symbol_cache: &mut AHashMap<ClientOrderId, Ustr>,
334    account_id: AccountId,
335    ts_init: UnixNanos,
336) {
337    for order_data in data {
338        match order_data {
339            OrderData::Full(order_msg) => {
340                let Some(instrument) = instruments_by_symbol.get(&order_msg.symbol) else {
341                    log::error!(
342                        "Instrument cache miss: order dropped for symbol={}, order_id={}",
343                        order_msg.symbol,
344                        order_msg.order_id,
345                    );
346                    continue;
347                };
348
349                let client_order_id = order_msg.cl_ord_id.map(ClientOrderId::new);
350
351                // Update caches before tombstone check so execution messages
352                // that arrive later can still resolve the symbol
353                if let Some(ref cid) = client_order_id {
354                    if let Some(ord_type) = &order_msg.ord_type {
355                        let order_type: OrderType = if *ord_type == BitmexOrderType::Pegged
356                            && order_msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
357                        {
358                            if order_msg.price.is_some() {
359                                OrderType::TrailingStopLimit
360                            } else {
361                                OrderType::TrailingStopMarket
362                            }
363                        } else {
364                            (*ord_type).into()
365                        };
366                        order_type_cache.insert(*cid, order_type);
367                    }
368                    order_symbol_cache.insert(*cid, order_msg.symbol);
369                }
370
371                // Skip tombstoned orders (already handled by HTTP cancel path)
372                if let Some(ref cid) = client_order_id
373                    && state.is_tombstoned(cid)
374                {
375                    log::debug!("Skipping tombstoned order {cid}");
376                    continue;
377                }
378
379                let identity = client_order_id
380                    .and_then(|cid| state.order_identities.get(&cid).map(|r| (cid, r.clone())));
381
382                if let Some((cid, ident)) = identity {
383                    // Tracked order: produce order events
384                    if let Some(event) = parse_order_event(
385                        &order_msg,
386                        cid,
387                        account_id,
388                        emitter.trader_id(),
389                        ident.strategy_id,
390                        ts_init,
391                    ) {
392                        let venue_order_id = VenueOrderId::new(order_msg.order_id.to_string());
393                        dispatch_parsed_order_event(
394                            event,
395                            cid,
396                            account_id,
397                            venue_order_id,
398                            &ident,
399                            emitter,
400                            state,
401                            ts_init,
402                        );
403                    }
404
405                    // Clean up caches on terminal status
406                    if order_msg.ord_status.is_terminal() {
407                        order_type_cache.remove(&cid);
408                        order_symbol_cache.remove(&cid);
409                    }
410                } else {
411                    // Untracked order: fall back to report
412                    match parse_order_msg(&order_msg, instrument, order_type_cache, ts_init) {
413                        Ok(report) => {
414                            if report.order_status.is_closed()
415                                && let Some(cid) = report.client_order_id
416                            {
417                                order_type_cache.remove(&cid);
418                                order_symbol_cache.remove(&cid);
419                            }
420                            emitter.send_order_status_report(report);
421                        }
422                        Err(e) => {
423                            log::error!(
424                                "Failed to parse order report: error={e}, symbol={}, order_id={}",
425                                order_msg.symbol,
426                                order_msg.order_id,
427                            );
428                        }
429                    }
430                }
431            }
432            OrderData::Update(msg) => {
433                let Some(instrument) = instruments_by_symbol.get(&msg.symbol) else {
434                    log::error!(
435                        "Instrument cache miss: order update dropped for symbol={}, order_id={}",
436                        msg.symbol,
437                        msg.order_id,
438                    );
439                    continue;
440                };
441
442                // Populate cache for execution message routing
443                if let Some(cl_ord_id) = &msg.cl_ord_id {
444                    let client_order_id = ClientOrderId::new(cl_ord_id);
445                    order_symbol_cache.insert(client_order_id, msg.symbol);
446                }
447
448                let identity = msg.cl_ord_id.as_ref().and_then(|cl| {
449                    let cid = ClientOrderId::new(cl);
450                    state.order_identities.get(&cid).map(|r| (cid, r.clone()))
451                });
452
453                if let Some((cid, ident)) = identity {
454                    // Tracked: enrich with identity context
455                    if let Some(event) =
456                        parse_order_update_msg(&msg, instrument, account_id, ts_init)
457                    {
458                        let enriched = OrderUpdated::new(
459                            emitter.trader_id(),
460                            ident.strategy_id,
461                            event.instrument_id,
462                            cid,
463                            event.quantity,
464                            event.event_id,
465                            event.ts_event,
466                            event.ts_init,
467                            false,
468                            event.venue_order_id,
469                            Some(account_id),
470                            event.price,
471                            event.trigger_price,
472                            event.protection_price,
473                            false, // is_quote_quantity
474                        );
475                        ensure_accepted_emitted(
476                            cid,
477                            account_id,
478                            enriched
479                                .venue_order_id
480                                .unwrap_or_else(|| VenueOrderId::new(msg.order_id.to_string())),
481                            &ident,
482                            emitter,
483                            state,
484                            ts_init,
485                        );
486                        emitter.send_order_event(OrderEventAny::Updated(enriched));
487                    } else {
488                        log::warn!(
489                            "Skipped order update (insufficient data): order_id={}, price={:?}",
490                            msg.order_id,
491                            msg.price,
492                        );
493                    }
494                } else {
495                    log::debug!(
496                        "Skipping order update for untracked order: order_id={}",
497                        msg.order_id,
498                    );
499                }
500            }
501        }
502    }
503}
504
505/// Dispatches execution (fill) messages, routing tracked orders to
506/// `OrderFilled` events and untracked orders to `FillReport`.
507fn dispatch_execution_messages(
508    data: Vec<BitmexExecutionMsg>,
509    emitter: &ExecutionEventEmitter,
510    state: &WsDispatchState,
511    instruments_by_symbol: &AHashMap<Ustr, InstrumentAny>,
512    order_symbol_cache: &AHashMap<ClientOrderId, Ustr>,
513    ts_init: UnixNanos,
514) {
515    for exec_msg in data {
516        let symbol_opt = if let Some(sym) = &exec_msg.symbol {
517            Some(*sym)
518        } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
519            let client_order_id = ClientOrderId::new(cl_ord_id);
520            order_symbol_cache.get(&client_order_id).copied()
521        } else {
522            None
523        };
524
525        let Some(symbol) = symbol_opt else {
526            if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
527                if exec_msg.exec_type == Some(BitmexExecType::Trade) {
528                    log::warn!(
529                        "Execution missing symbol and not in cache: \
530                        cl_ord_id={cl_ord_id}, exec_id={:?}",
531                        exec_msg.exec_id,
532                    );
533                } else {
534                    log::debug!(
535                        "Execution missing symbol and not in cache: \
536                        cl_ord_id={cl_ord_id}, exec_type={:?}",
537                        exec_msg.exec_type,
538                    );
539                }
540            } else if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
541                log::debug!(
542                    "CancelReject missing symbol/clOrdID (expected with redundant cancels): \
543                    exec_id={:?}, order_id={:?}",
544                    exec_msg.exec_id,
545                    exec_msg.order_id,
546                );
547            } else {
548                log::warn!(
549                    "Execution missing both symbol and clOrdID: \
550                    exec_id={:?}, order_id={:?}, exec_type={:?}",
551                    exec_msg.exec_id,
552                    exec_msg.order_id,
553                    exec_msg.exec_type,
554                );
555            }
556            continue;
557        };
558
559        let Some(instrument) = instruments_by_symbol.get(&symbol) else {
560            log::error!(
561                "Instrument cache miss: execution dropped for symbol={}, exec_id={:?}, exec_type={:?}",
562                symbol,
563                exec_msg.exec_id,
564                exec_msg.exec_type,
565            );
566            continue;
567        };
568
569        let Some(fill) = parse_execution_msg(exec_msg, instrument, ts_init) else {
570            continue;
571        };
572
573        let identity = fill
574            .client_order_id
575            .and_then(|cid| state.order_identities.get(&cid).map(|r| (cid, r.clone())));
576
577        if let Some((cid, ident)) = identity {
578            // Tracked: produce OrderFilled event
579            let venue_order_id = fill.venue_order_id;
580            ensure_accepted_emitted(
581                cid,
582                fill.account_id,
583                venue_order_id,
584                &ident,
585                emitter,
586                state,
587                ts_init,
588            );
589            state.insert_filled(cid);
590            state.remove_triggered(&cid);
591            let filled = fill_report_to_order_filled(
592                &fill,
593                emitter.trader_id(),
594                &ident,
595                instrument.quote_currency(),
596            );
597            emitter.send_order_event(OrderEventAny::Filled(filled));
598        } else {
599            // Untracked: forward as FillReport
600            emitter.send_fill_report(fill);
601        }
602    }
603}
604
605/// Dispatches a parsed order event with lifecycle synthesis and deduplication.
606///
607/// Guarantees the `Submitted -> Accepted -> ...` lifecycle by synthesizing
608/// `OrderAccepted` before any other event when one has not yet been emitted.
609#[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
610fn dispatch_parsed_order_event(
611    event: ParsedOrderEvent,
612    client_order_id: ClientOrderId,
613    account_id: AccountId,
614    venue_order_id: VenueOrderId,
615    identity: &OrderIdentity,
616    emitter: &ExecutionEventEmitter,
617    state: &WsDispatchState,
618    ts_init: UnixNanos,
619) {
620    let is_terminal;
621
622    match event {
623        ParsedOrderEvent::Accepted(e) => {
624            if state.accepted_contains(&client_order_id)
625                || state.filled_contains(&client_order_id)
626                || state.triggered_contains(&client_order_id)
627            {
628                log::debug!("Skipping duplicate Accepted for {client_order_id}");
629                return;
630            }
631            state.insert_accepted(client_order_id);
632            is_terminal = false;
633            emitter.send_order_event(OrderEventAny::Accepted(e));
634        }
635        ParsedOrderEvent::Triggered(e) => {
636            if state.filled_contains(&client_order_id) {
637                log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
638                return;
639            }
640            ensure_accepted_emitted(
641                client_order_id,
642                account_id,
643                venue_order_id,
644                identity,
645                emitter,
646                state,
647                ts_init,
648            );
649            state.insert_triggered(client_order_id);
650            is_terminal = false;
651            emitter.send_order_event(OrderEventAny::Triggered(e));
652        }
653        ParsedOrderEvent::Canceled(e) => {
654            ensure_accepted_emitted(
655                client_order_id,
656                account_id,
657                venue_order_id,
658                identity,
659                emitter,
660                state,
661                ts_init,
662            );
663            state.remove_triggered(&client_order_id);
664            state.remove_filled(&client_order_id);
665            is_terminal = true;
666            emitter.send_order_event(OrderEventAny::Canceled(e));
667        }
668        ParsedOrderEvent::Expired(e) => {
669            ensure_accepted_emitted(
670                client_order_id,
671                account_id,
672                venue_order_id,
673                identity,
674                emitter,
675                state,
676                ts_init,
677            );
678            state.remove_triggered(&client_order_id);
679            state.remove_filled(&client_order_id);
680            is_terminal = true;
681            emitter.send_order_event(OrderEventAny::Expired(e));
682        }
683        ParsedOrderEvent::Rejected(e) => {
684            state.remove_triggered(&client_order_id);
685            state.remove_filled(&client_order_id);
686            is_terminal = true;
687            emitter.send_order_event(OrderEventAny::Rejected(e));
688        }
689    }
690
691    if is_terminal {
692        state.order_identities.remove(&client_order_id);
693        state.remove_accepted(&client_order_id);
694    }
695}
696
697/// Synthesizes and emits `OrderAccepted` if one has not yet been emitted for
698/// this order. Handles fast-filling orders that skip the `New` state.
699fn ensure_accepted_emitted(
700    client_order_id: ClientOrderId,
701    account_id: AccountId,
702    venue_order_id: VenueOrderId,
703    identity: &OrderIdentity,
704    emitter: &ExecutionEventEmitter,
705    state: &WsDispatchState,
706    ts_init: UnixNanos,
707) {
708    if state.accepted_contains(&client_order_id) {
709        return;
710    }
711    state.insert_accepted(client_order_id);
712    let accepted = OrderAccepted::new(
713        emitter.trader_id(),
714        identity.strategy_id,
715        identity.instrument_id,
716        client_order_id,
717        venue_order_id,
718        account_id,
719        UUID4::new(),
720        ts_init,
721        ts_init,
722        false,
723    );
724    emitter.send_order_event(OrderEventAny::Accepted(accepted));
725}
726
727/// Converts a [`FillReport`] into an [`OrderFilled`] event using tracked identity.
728pub(crate) fn fill_report_to_order_filled(
729    report: &FillReport,
730    trader_id: TraderId,
731    identity: &OrderIdentity,
732    quote_currency: Currency,
733) -> OrderFilled {
734    OrderFilled::new(
735        trader_id,
736        identity.strategy_id,
737        report.instrument_id,
738        report
739            .client_order_id
740            .expect("tracked order has client_order_id"),
741        report.venue_order_id,
742        report.account_id,
743        report.trade_id,
744        identity.order_side,
745        identity.order_type,
746        report.last_qty,
747        report.last_px,
748        quote_currency,
749        report.liquidity_side,
750        UUID4::new(),
751        report.ts_event,
752        report.ts_init,
753        false,
754        report.venue_position_id,
755        Some(report.commission),
756    )
757}