Skip to main content

nautilus_okx/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message dispatch for the OKX execution client.
17//!
18//! Routes incoming [`OKXWsMessage`] variants to the appropriate parsing and
19//! event emission paths. Tracked orders (submitted through this client) produce
20//! proper order events; untracked orders fall back to execution reports for
21//! downstream reconciliation.
22
23use std::sync::{
24    Arc,
25    atomic::{AtomicBool, Ordering},
26};
27
28use ahash::AHashMap;
29use dashmap::{DashMap, DashSet};
30use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
31use nautilus_live::ExecutionEventEmitter;
32use nautilus_model::{
33    enums::{OrderSide, OrderStatus, OrderType},
34    events::{OrderAccepted, OrderEventAny, OrderFilled, OrderRejected},
35    identifiers::{
36        AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
37    },
38    instruments::{Instrument, InstrumentAny},
39    orders::TRIGGERABLE_ORDER_TYPES,
40    reports::FillReport,
41    types::{Currency, Money, Quantity},
42};
43use ustr::Ustr;
44
45use crate::{
46    common::{
47        consts::{OKX_FIELD_CLORDID, OKX_FIELD_SCODE, OKX_FIELD_SMSG, OKX_SUCCESS_CODE},
48        enums::OKXOrderStatus,
49        parse::{
50            is_market_price, parse_client_order_id, parse_millisecond_timestamp, parse_price,
51            parse_quantity,
52        },
53    },
54    http::models::{OKXAccount, OKXCancelAlgoOrderResponse, OKXPosition},
55    websocket::{
56        client::PendingOrderInfo,
57        enums::OKXWsOperation,
58        handler::is_post_only_auto_cancel,
59        messages::{ExecutionReport, OKXOrderMsg, OKXWsMessage},
60        parse::{
61            OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_order_event,
62            parse_order_msg, update_fee_fill_caches,
63        },
64    },
65};
66
67/// Maximum entries in the dedup sets before they are cleared.
68const DEDUP_CAPACITY: usize = 10_000;
69
70/// Order identity context stored at submission time, used by the WS dispatch
71/// task to produce proper order events without Cache access.
72///
73/// These fields are immutable for the lifetime of an order and are used to
74/// construct proper order events (OrderAccepted, OrderFilled, etc.) instead
75/// of execution reports.
76#[derive(Debug, Clone)]
77pub struct OrderIdentity {
78    pub instrument_id: InstrumentId,
79    pub strategy_id: StrategyId,
80    pub order_side: OrderSide,
81    pub order_type: OrderType,
82}
83
84/// Shared state for cross-stream event deduplication between the private
85/// and business WebSocket dispatch loops.
86///
87/// Uses `DashMap`/`DashSet` for concurrent access from both stream tasks
88/// and the main thread without mutex contention.
89#[derive(Debug)]
90pub struct WsDispatchState {
91    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
92    pub emitted_accepted: DashSet<ClientOrderId>,
93    pub triggered_orders: DashSet<ClientOrderId>,
94    pub filled_orders: DashSet<ClientOrderId>,
95    pub emitted_trades: DashSet<TradeId>,
96    pub(crate) pending_orders: Arc<DashMap<String, PendingOrderInfo>>,
97    pub(crate) pending_cancels: Arc<DashMap<String, PendingOrderInfo>>,
98    pub(crate) pending_amends: Arc<DashMap<String, PendingOrderInfo>>,
99    clearing: AtomicBool,
100}
101
102impl Default for WsDispatchState {
103    fn default() -> Self {
104        Self {
105            order_identities: DashMap::new(),
106            emitted_accepted: DashSet::default(),
107            triggered_orders: DashSet::default(),
108            filled_orders: DashSet::default(),
109            emitted_trades: DashSet::default(),
110            pending_orders: Arc::new(DashMap::new()),
111            pending_cancels: Arc::new(DashMap::new()),
112            pending_amends: Arc::new(DashMap::new()),
113            clearing: AtomicBool::new(false),
114        }
115    }
116}
117
118impl WsDispatchState {
119    // Creates a dispatch state sharing the pending operation maps
120    // with the WebSocket client that populates them
121    pub(crate) fn with_pending_maps(
122        pending_orders: Arc<DashMap<String, PendingOrderInfo>>,
123        pending_cancels: Arc<DashMap<String, PendingOrderInfo>>,
124        pending_amends: Arc<DashMap<String, PendingOrderInfo>>,
125    ) -> Self {
126        Self {
127            pending_orders,
128            pending_cancels,
129            pending_amends,
130            ..Default::default()
131        }
132    }
133}
134
135impl WsDispatchState {
136    fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
137        if set.len() >= DEDUP_CAPACITY
138            && self
139                .clearing
140                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
141                .is_ok()
142        {
143            set.clear();
144            self.clearing.store(false, Ordering::Release);
145        }
146    }
147
148    pub(crate) fn insert_accepted(&self, cid: ClientOrderId) {
149        self.evict_if_full(&self.emitted_accepted);
150        self.emitted_accepted.insert(cid);
151    }
152
153    pub(crate) fn insert_filled(&self, cid: ClientOrderId) {
154        self.evict_if_full(&self.filled_orders);
155        self.filled_orders.insert(cid);
156    }
157
158    pub(crate) fn insert_triggered(&self, cid: ClientOrderId) {
159        self.evict_if_full(&self.triggered_orders);
160        self.triggered_orders.insert(cid);
161    }
162
163    /// Returns `true` if this trade was already emitted (duplicate).
164    /// Uses atomic insert to avoid TOCTOU races between concurrent streams.
165    pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
166        self.evict_if_full_trades();
167        !self.emitted_trades.insert(trade_id)
168    }
169
170    fn evict_if_full_trades(&self) {
171        if self.emitted_trades.len() >= DEDUP_CAPACITY
172            && self
173                .clearing
174                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
175                .is_ok()
176        {
177            self.emitted_trades.clear();
178            self.clearing.store(false, Ordering::Release);
179        }
180    }
181}
182
183/// Dispatches a WebSocket message with cross-stream deduplication.
184///
185/// For orders with a tracked identity (submitted through this client), produces
186/// proper order events (OrderAccepted, OrderCanceled, OrderFilled, etc.).
187/// For untracked orders (external or pre-existing), falls back to execution
188/// reports for downstream reconciliation.
189#[expect(clippy::too_many_arguments)]
190pub fn dispatch_ws_message(
191    message: OKXWsMessage,
192    emitter: &ExecutionEventEmitter,
193    state: &WsDispatchState,
194    account_id: AccountId,
195    instruments: &AHashMap<Ustr, InstrumentAny>,
196    fee_cache: &mut AHashMap<Ustr, Money>,
197    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
198    order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
199    clock: &AtomicTime,
200) {
201    match message {
202        OKXWsMessage::Orders(order_msgs) => {
203            let ts_init = clock.get_time_ns();
204            dispatch_order_messages(
205                &order_msgs,
206                emitter,
207                state,
208                account_id,
209                instruments,
210                fee_cache,
211                filled_qty_cache,
212                order_state_cache,
213                ts_init,
214            );
215        }
216        OKXWsMessage::AlgoOrders(algo_msgs) => {
217            let ts_init = clock.get_time_ns();
218            let mut reports = Vec::new();
219
220            for msg in algo_msgs {
221                match parse_algo_order_msg(&msg, account_id, instruments, ts_init) {
222                    Ok(Some(report)) => reports.push(report),
223                    Ok(None) => {}
224                    Err(e) => log::error!("Failed to parse algo order message: {e}"),
225                }
226            }
227            dispatch_execution_reports(reports, emitter, state);
228        }
229        OKXWsMessage::Account(data) => {
230            let ts_init = clock.get_time_ns();
231
232            match serde_json::from_value::<Vec<OKXAccount>>(data) {
233                Ok(accounts) => {
234                    for account in &accounts {
235                        match crate::common::parse::parse_account_state(
236                            account, account_id, ts_init,
237                        ) {
238                            Ok(account_state) => emitter.send_account_state(account_state),
239                            Err(e) => log::error!("Failed to parse account state: {e}"),
240                        }
241                    }
242                }
243                Err(e) => log::error!("Failed to deserialize account data: {e}"),
244            }
245        }
246        OKXWsMessage::Positions(data) => {
247            let ts_init = clock.get_time_ns();
248
249            match serde_json::from_value::<Vec<OKXPosition>>(data) {
250                Ok(positions) => {
251                    for position in positions {
252                        let Some(instrument) = instruments.get(&position.inst_id) else {
253                            log::warn!("No cached instrument for position: {}", position.inst_id);
254                            continue;
255                        };
256                        let instrument_id = instrument.id();
257                        let size_precision = instrument.size_precision();
258
259                        match crate::common::parse::parse_position_status_report(
260                            &position,
261                            account_id,
262                            instrument_id,
263                            size_precision,
264                            ts_init,
265                        ) {
266                            Ok(report) => emitter.send_position_report(report),
267                            Err(e) => log::error!("Failed to parse position report: {e}"),
268                        }
269                    }
270                }
271                Err(e) => log::error!("Failed to deserialize positions data: {e}"),
272            }
273        }
274        OKXWsMessage::OrderResponse {
275            id,
276            op,
277            code,
278            msg,
279            data,
280        } => {
281            let ts_init = clock.get_time_ns();
282
283            for item in &data {
284                let s_code = item
285                    .get(OKX_FIELD_SCODE)
286                    .and_then(|v| v.as_str())
287                    .unwrap_or("");
288                let s_msg = item
289                    .get(OKX_FIELD_SMSG)
290                    .and_then(|v| v.as_str())
291                    .unwrap_or("");
292                let cl_ord_id = item
293                    .get(OKX_FIELD_CLORDID)
294                    .and_then(|v| v.as_str())
295                    .unwrap_or("");
296
297                if s_code == OKX_SUCCESS_CODE {
298                    log::debug!("Order response ok: op={op:?} cl_ord_id={cl_ord_id}");
299                    match op {
300                        OKXWsOperation::Order
301                        | OKXWsOperation::BatchOrders
302                        | OKXWsOperation::OrderAlgo => {
303                            state.pending_orders.remove(cl_ord_id);
304                        }
305                        OKXWsOperation::CancelOrder
306                        | OKXWsOperation::BatchCancelOrders
307                        | OKXWsOperation::MassCancel
308                        | OKXWsOperation::CancelAlgos => {
309                            state.pending_cancels.remove(cl_ord_id);
310                        }
311                        OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
312                            state.pending_amends.remove(cl_ord_id);
313                        }
314                        _ => {}
315                    }
316                    continue;
317                }
318
319                let Some(client_order_id) = parse_client_order_id(cl_ord_id) else {
320                    log::warn!(
321                        "Order response error without client_order_id: \
322                         op={op:?} s_code={s_code} s_msg={s_msg}"
323                    );
324                    continue;
325                };
326
327                let Some(ident) = state.order_identities.get(&client_order_id) else {
328                    log::warn!(
329                        "Order response error for untracked order: \
330                         op={op:?} cl_ord_id={cl_ord_id} s_code={s_code} s_msg={s_msg}"
331                    );
332                    continue;
333                };
334
335                let venue_order_id = item
336                    .get("ordId")
337                    .and_then(|v| v.as_str())
338                    .filter(|s| !s.is_empty())
339                    .map(VenueOrderId::new);
340
341                match op {
342                    OKXWsOperation::Order | OKXWsOperation::BatchOrders => {
343                        state.order_identities.remove(&client_order_id);
344                        state.pending_orders.remove(cl_ord_id);
345                        emitter.emit_order_rejected_event(
346                            ident.strategy_id,
347                            ident.instrument_id,
348                            client_order_id,
349                            s_msg,
350                            ts_init,
351                            false,
352                        );
353                    }
354                    OKXWsOperation::CancelOrder
355                    | OKXWsOperation::BatchCancelOrders
356                    | OKXWsOperation::MassCancel => {
357                        state.pending_cancels.remove(cl_ord_id);
358                        emitter.emit_order_cancel_rejected_event(
359                            ident.strategy_id,
360                            ident.instrument_id,
361                            client_order_id,
362                            venue_order_id,
363                            s_msg,
364                            ts_init,
365                        );
366                    }
367                    OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
368                        state.pending_amends.remove(cl_ord_id);
369                        emitter.emit_order_modify_rejected_event(
370                            ident.strategy_id,
371                            ident.instrument_id,
372                            client_order_id,
373                            venue_order_id,
374                            s_msg,
375                            ts_init,
376                        );
377                    }
378                    _ => {
379                        log::warn!(
380                            "Order response error for unhandled op: \
381                             op={op:?} cl_ord_id={cl_ord_id} s_code={s_code} s_msg={s_msg}"
382                        );
383                    }
384                }
385            }
386
387            if code != "0" && data.is_empty() {
388                log::warn!(
389                    "Order response error (no data): id={id:?} op={op:?} code={code} msg={msg}"
390                );
391            }
392        }
393        OKXWsMessage::SendFailed {
394            request_id,
395            client_order_id,
396            op,
397            error,
398        } => {
399            log::error!("WebSocket send failed: request_id={request_id} error={error}");
400
401            if let Some(client_order_id) = client_order_id {
402                let ts_init = clock.get_time_ns();
403
404                match op {
405                    Some(
406                        OKXWsOperation::Order
407                        | OKXWsOperation::BatchOrders
408                        | OKXWsOperation::OrderAlgo,
409                    ) => {
410                        let key = client_order_id.as_str();
411                        state.pending_orders.remove(key);
412                        if let Some((_, ident)) = state.order_identities.remove(&client_order_id) {
413                            emitter.emit_order_rejected_event(
414                                ident.strategy_id,
415                                ident.instrument_id,
416                                client_order_id,
417                                &error,
418                                ts_init,
419                                false,
420                            );
421                        }
422                    }
423                    Some(
424                        OKXWsOperation::CancelOrder
425                        | OKXWsOperation::BatchCancelOrders
426                        | OKXWsOperation::MassCancel
427                        | OKXWsOperation::CancelAlgos,
428                    ) => {
429                        let key = client_order_id.as_str();
430                        state.pending_cancels.remove(key);
431                        if let Some(ident) = state.order_identities.get(&client_order_id) {
432                            emitter.emit_order_cancel_rejected_event(
433                                ident.strategy_id,
434                                ident.instrument_id,
435                                client_order_id,
436                                None,
437                                &error,
438                                ts_init,
439                            );
440                        }
441                    }
442                    Some(OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders) => {
443                        let key = client_order_id.as_str();
444                        state.pending_amends.remove(key);
445                        if let Some(ident) = state.order_identities.get(&client_order_id) {
446                            emitter.emit_order_modify_rejected_event(
447                                ident.strategy_id,
448                                ident.instrument_id,
449                                client_order_id,
450                                None,
451                                &error,
452                                ts_init,
453                            );
454                        }
455                    }
456                    _ => {
457                        log::warn!(
458                            "SendFailed for {client_order_id} with unknown op, cannot emit rejection"
459                        );
460                    }
461                }
462            }
463        }
464        OKXWsMessage::ChannelData { channel, .. } => {
465            log::debug!("Ignoring data channel message on execution client: {channel:?}");
466        }
467        OKXWsMessage::BookData { .. } | OKXWsMessage::Instruments(_) => {
468            log::debug!("Ignoring data message on execution client");
469        }
470        OKXWsMessage::Error(e) => {
471            log::warn!(
472                "Websocket error: code={} message={} conn_id={:?}",
473                e.code,
474                e.message,
475                e.conn_id
476            );
477        }
478        OKXWsMessage::Reconnected => {
479            log::info!("Websocket reconnected");
480        }
481        OKXWsMessage::Authenticated => {
482            log::debug!("Websocket authenticated");
483        }
484    }
485}
486
487/// Dispatches order messages, producing proper order events for tracked orders
488/// and falling back to execution reports for untracked/external orders.
489#[expect(clippy::too_many_arguments)]
490fn dispatch_order_messages(
491    order_msgs: &[OKXOrderMsg],
492    emitter: &ExecutionEventEmitter,
493    state: &WsDispatchState,
494    account_id: AccountId,
495    instruments: &AHashMap<Ustr, InstrumentAny>,
496    fee_cache: &mut AHashMap<Ustr, Money>,
497    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
498    order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
499    ts_init: UnixNanos,
500) {
501    for msg in order_msgs {
502        let Some(instrument) = instruments.get(&msg.inst_id) else {
503            log::warn!("No instrument for {}, skipping order message", msg.inst_id);
504            continue;
505        };
506
507        let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
508            log::debug!(
509                "Order without client_order_id (ord_id={}), sending as report",
510                msg.ord_id
511            );
512            dispatch_order_msg_as_report(
513                msg,
514                account_id,
515                instruments,
516                fee_cache,
517                filled_qty_cache,
518                emitter,
519                state,
520                ts_init,
521            );
522            continue;
523        };
524
525        // Resolve identity: check direct match first, then fall back to the
526        // parent algo order ID for triggered child orders. OKX assigns a new
527        // cl_ord_id to child orders when an algo/stop triggers, preserving the
528        // parent's client order ID in algo_cl_ord_id.
529        let (client_order_id, identity) = match state
530            .order_identities
531            .get(&client_order_id)
532            .map(|r| r.clone())
533        {
534            Some(ident) => (client_order_id, Some(ident)),
535            None => {
536                if let Some(parent_id) = msg
537                    .algo_cl_ord_id
538                    .as_deref()
539                    .and_then(parse_client_order_id)
540                {
541                    let parent_ident = state.order_identities.get(&parent_id).map(|r| r.clone());
542
543                    if parent_ident.is_some() {
544                        (parent_id, parent_ident)
545                    } else {
546                        (client_order_id, None)
547                    }
548                } else {
549                    (client_order_id, None)
550                }
551            }
552        };
553
554        if let Some(ident) = identity {
555            if is_post_only_auto_cancel(msg) {
556                let ts_event = parse_millisecond_timestamp(msg.u_time);
557                let rejected = OrderRejected::new(
558                    emitter.trader_id(),
559                    ident.strategy_id,
560                    instrument.id(),
561                    client_order_id,
562                    account_id,
563                    Ustr::from("Post-only order would have taken liquidity"),
564                    UUID4::new(),
565                    ts_event,
566                    ts_init,
567                    false,
568                    true, // due_post_only
569                );
570                state.order_identities.remove(&client_order_id);
571                order_state_cache.remove(&client_order_id);
572                fee_cache.remove(&msg.ord_id);
573                filled_qty_cache.remove(&msg.ord_id);
574                emitter.send_order_event(OrderEventAny::Rejected(rejected));
575                continue;
576            }
577
578            let previous_fee = fee_cache.get(&msg.ord_id).copied();
579            let previous_filled_qty = filled_qty_cache.get(&msg.ord_id).copied();
580            let previous_state = order_state_cache.get(&client_order_id);
581
582            match parse_order_event(
583                msg,
584                client_order_id,
585                account_id,
586                emitter.trader_id(),
587                ident.strategy_id,
588                instrument,
589                previous_fee,
590                previous_filled_qty,
591                previous_state,
592                ts_init,
593            ) {
594                Ok(event) => {
595                    update_order_caches(
596                        msg,
597                        instrument,
598                        client_order_id,
599                        fee_cache,
600                        filled_qty_cache,
601                        order_state_cache,
602                    );
603                    dispatch_parsed_order_event(
604                        event,
605                        client_order_id,
606                        account_id,
607                        VenueOrderId::new(msg.ord_id),
608                        &ident,
609                        instrument,
610                        msg.state,
611                        emitter,
612                        state,
613                        order_state_cache,
614                        ts_init,
615                    );
616                }
617                Err(e) => log::error!("Failed to parse order event for {client_order_id}: {e}"),
618            }
619        } else {
620            log::debug!(
621                "Untracked order {client_order_id} (ord_id={}), sending as report for reconciliation",
622                msg.ord_id
623            );
624            dispatch_order_msg_as_report(
625                msg,
626                account_id,
627                instruments,
628                fee_cache,
629                filled_qty_cache,
630                emitter,
631                state,
632                ts_init,
633            );
634        }
635    }
636}
637
638/// Dispatches a parsed order event as a proper `OrderEventAny`.
639///
640/// Guarantees the `Submitted -> Accepted -> ...` lifecycle by synthesizing
641/// `OrderAccepted` before any other event when one has not yet been emitted.
642/// Duplicate `Accepted` events (e.g. from reconnect replays) are suppressed.
643#[expect(clippy::too_many_arguments)]
644fn dispatch_parsed_order_event(
645    event: ParsedOrderEvent,
646    client_order_id: ClientOrderId,
647    account_id: AccountId,
648    venue_order_id: VenueOrderId,
649    identity: &OrderIdentity,
650    instrument: &InstrumentAny,
651    venue_status: OKXOrderStatus,
652    emitter: &ExecutionEventEmitter,
653    state: &WsDispatchState,
654    order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
655    ts_init: UnixNanos,
656) {
657    let is_terminal;
658
659    match event {
660        ParsedOrderEvent::Accepted(e) => {
661            if state.emitted_accepted.contains(&client_order_id)
662                || state.filled_orders.contains(&client_order_id)
663                || state.triggered_orders.contains(&client_order_id)
664            {
665                log::debug!("Skipping duplicate Accepted for {client_order_id}");
666                return;
667            }
668            state.insert_accepted(client_order_id);
669            is_terminal = false;
670            emitter.send_order_event(OrderEventAny::Accepted(e));
671        }
672        ParsedOrderEvent::Triggered(e) => {
673            if state.filled_orders.contains(&client_order_id) {
674                log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
675                return;
676            }
677
678            if !TRIGGERABLE_ORDER_TYPES.contains(&identity.order_type) {
679                log::debug!(
680                    "Skipping OrderTriggered for {} order {client_order_id}: market-style stops have no TRIGGERED state",
681                    identity.order_type,
682                );
683                state.insert_triggered(client_order_id);
684                return;
685            }
686
687            ensure_accepted_emitted(
688                client_order_id,
689                account_id,
690                venue_order_id,
691                identity,
692                emitter,
693                state,
694                ts_init,
695            );
696            state.insert_triggered(client_order_id);
697            is_terminal = false;
698            emitter.send_order_event(OrderEventAny::Triggered(e));
699        }
700        ParsedOrderEvent::Canceled(e) => {
701            ensure_accepted_emitted(
702                client_order_id,
703                account_id,
704                venue_order_id,
705                identity,
706                emitter,
707                state,
708                ts_init,
709            );
710            state.triggered_orders.remove(&client_order_id);
711            state.filled_orders.remove(&client_order_id);
712            is_terminal = true;
713            emitter.send_order_event(OrderEventAny::Canceled(e));
714        }
715        ParsedOrderEvent::Expired(e) => {
716            ensure_accepted_emitted(
717                client_order_id,
718                account_id,
719                venue_order_id,
720                identity,
721                emitter,
722                state,
723                ts_init,
724            );
725            state.triggered_orders.remove(&client_order_id);
726            state.filled_orders.remove(&client_order_id);
727            is_terminal = true;
728            emitter.send_order_event(OrderEventAny::Expired(e));
729        }
730        ParsedOrderEvent::Updated(e) => {
731            ensure_accepted_emitted(
732                client_order_id,
733                account_id,
734                venue_order_id,
735                identity,
736                emitter,
737                state,
738                ts_init,
739            );
740            is_terminal = false;
741            emitter.send_order_event(OrderEventAny::Updated(e));
742        }
743        ParsedOrderEvent::Fill(fill_report) => {
744            let is_duplicate = state.check_and_insert_trade(fill_report.trade_id);
745            is_terminal = venue_status == OKXOrderStatus::Filled;
746
747            if is_duplicate {
748                log::debug!(
749                    "Skipping duplicate fill for {client_order_id}: trade_id={}",
750                    fill_report.trade_id
751                );
752            } else {
753                ensure_accepted_emitted(
754                    client_order_id,
755                    account_id,
756                    venue_order_id,
757                    identity,
758                    emitter,
759                    state,
760                    ts_init,
761                );
762                state.insert_filled(client_order_id);
763                state.triggered_orders.remove(&client_order_id);
764                let filled = fill_report_to_order_filled(
765                    &fill_report,
766                    emitter.trader_id(),
767                    identity,
768                    instrument.quote_currency(),
769                );
770                emitter.send_order_event(OrderEventAny::Filled(filled));
771            }
772        }
773        ParsedOrderEvent::StatusOnly(report) => {
774            is_terminal = matches!(
775                report.order_status,
776                OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Expired
777            );
778            emitter.send_order_status_report(*report);
779        }
780        ParsedOrderEvent::Skipped => return,
781    }
782
783    if is_terminal {
784        state.order_identities.remove(&client_order_id);
785        state.emitted_accepted.remove(&client_order_id);
786        order_state_cache.remove(&client_order_id);
787        // Keep fee_cache and filled_qty_cache entries: replayed terminal
788        // messages go through the untracked report path and need prior
789        // cumulative state to avoid re-emitting the full fill quantity
790    }
791}
792
793/// Synthesizes and emits `OrderAccepted` if one has not yet been emitted for
794/// this order. Handles fast-filling orders that skip the `Live` state on OKX.
795fn ensure_accepted_emitted(
796    client_order_id: ClientOrderId,
797    account_id: AccountId,
798    venue_order_id: VenueOrderId,
799    identity: &OrderIdentity,
800    emitter: &ExecutionEventEmitter,
801    state: &WsDispatchState,
802    ts_init: UnixNanos,
803) {
804    if state.emitted_accepted.contains(&client_order_id) {
805        return;
806    }
807    state.insert_accepted(client_order_id);
808    let accepted = OrderAccepted::new(
809        emitter.trader_id(),
810        identity.strategy_id,
811        identity.instrument_id,
812        client_order_id,
813        venue_order_id,
814        account_id,
815        UUID4::new(),
816        ts_init,
817        ts_init,
818        false,
819    );
820    emitter.send_order_event(OrderEventAny::Accepted(accepted));
821}
822
823/// Converts a [`FillReport`] into an [`OrderFilled`] event using tracked identity.
824fn fill_report_to_order_filled(
825    report: &FillReport,
826    trader_id: TraderId,
827    identity: &OrderIdentity,
828    quote_currency: Currency,
829) -> OrderFilled {
830    OrderFilled::new(
831        trader_id,
832        identity.strategy_id,
833        report.instrument_id,
834        report
835            .client_order_id
836            .expect("tracked order has client_order_id"),
837        report.venue_order_id,
838        report.account_id,
839        report.trade_id,
840        identity.order_side,
841        identity.order_type,
842        report.last_qty,
843        report.last_px,
844        quote_currency,
845        report.liquidity_side,
846        UUID4::new(),
847        report.ts_event,
848        report.ts_init,
849        false,
850        report.venue_position_id,
851        Some(report.commission),
852    )
853}
854
855/// Falls back to the report path for a single order message.
856#[expect(clippy::too_many_arguments)]
857fn dispatch_order_msg_as_report(
858    msg: &OKXOrderMsg,
859    account_id: AccountId,
860    instruments: &AHashMap<Ustr, InstrumentAny>,
861    fee_cache: &mut AHashMap<Ustr, Money>,
862    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
863    emitter: &ExecutionEventEmitter,
864    state: &WsDispatchState,
865    ts_init: UnixNanos,
866) {
867    match parse_order_msg(
868        msg,
869        account_id,
870        instruments,
871        fee_cache,
872        filled_qty_cache,
873        ts_init,
874    ) {
875        Ok(report) => {
876            if let Some(instrument) = instruments.get(&msg.inst_id) {
877                update_fee_fill_caches(msg, instrument, fee_cache, filled_qty_cache);
878            }
879            dispatch_execution_reports(vec![report], emitter, state);
880        }
881        Err(e) => log::error!("Failed to parse order message as report: {e}"),
882    }
883}
884
885/// Updates fee, fill, and order state caches from a raw OKX order message.
886fn update_order_caches(
887    msg: &OKXOrderMsg,
888    instrument: &InstrumentAny,
889    client_order_id: ClientOrderId,
890    fee_cache: &mut AHashMap<Ustr, Money>,
891    filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
892    order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
893) {
894    update_fee_fill_caches(msg, instrument, fee_cache, filled_qty_cache);
895
896    let venue_order_id = VenueOrderId::new(msg.ord_id);
897    let quantity = parse_quantity(&msg.sz, instrument.size_precision()).unwrap_or_default();
898    let price = if is_market_price(&msg.px) {
899        None
900    } else {
901        parse_price(&msg.px, instrument.price_precision()).ok()
902    };
903
904    order_state_cache.insert(
905        client_order_id,
906        OrderStateSnapshot {
907            venue_order_id,
908            quantity,
909            price,
910        },
911    );
912}
913
914/// Dispatches execution reports with cross-stream deduplication.
915pub fn dispatch_execution_reports(
916    reports: Vec<ExecutionReport>,
917    emitter: &ExecutionEventEmitter,
918    state: &WsDispatchState,
919) {
920    log::debug!("Processing {} execution report(s)", reports.len());
921
922    for report in reports {
923        match report {
924            ExecutionReport::Order(order_report) => {
925                if let Some(cid) = order_report.client_order_id {
926                    match order_report.order_status {
927                        // Guard form reformats awkwardly across multiple lines
928                        #[expect(clippy::collapsible_match)]
929                        OrderStatus::Accepted => {
930                            if state.filled_orders.contains(&cid)
931                                || state.triggered_orders.contains(&cid)
932                            {
933                                log::debug!(
934                                    "Skipping stale OrderStatusReport(Accepted) \
935                                     for {cid} (already triggered/filled)"
936                                );
937                                continue;
938                            }
939                        }
940                        OrderStatus::Triggered => {
941                            if state.filled_orders.contains(&cid) {
942                                log::debug!(
943                                    "Skipping stale OrderStatusReport(Triggered) \
944                                     for {cid} (already filled)"
945                                );
946                                continue;
947                            }
948                            state.insert_triggered(cid);
949                        }
950                        OrderStatus::Filled => {
951                            state.insert_filled(cid);
952                            state.triggered_orders.remove(&cid);
953                        }
954                        OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => {
955                            state.triggered_orders.remove(&cid);
956                            state.filled_orders.remove(&cid);
957                        }
958                        _ => {}
959                    }
960                }
961                emitter.send_order_status_report(order_report);
962            }
963            ExecutionReport::Fill(fill_report) => {
964                if state.check_and_insert_trade(fill_report.trade_id) {
965                    log::debug!(
966                        "Skipping duplicate fill report: trade_id={}",
967                        fill_report.trade_id
968                    );
969                    continue;
970                }
971
972                if let Some(cid) = fill_report.client_order_id {
973                    state.insert_filled(cid);
974                    state.triggered_orders.remove(&cid);
975                }
976                emitter.send_fill_report(fill_report);
977            }
978        }
979    }
980}
981
982#[derive(Debug, Clone)]
983pub struct AlgoCancelContext {
984    pub client_order_id: ClientOrderId,
985    pub instrument_id: InstrumentId,
986    pub strategy_id: StrategyId,
987    pub venue_order_id: Option<VenueOrderId>,
988}
989
990// Contexts must correspond 1:1 with the requests that produced
991// the responses (OKX preserves request order in batch responses).
992pub fn emit_algo_cancel_rejections(
993    responses: &[OKXCancelAlgoOrderResponse],
994    contexts: &[AlgoCancelContext],
995    emitter: &ExecutionEventEmitter,
996    clock: &'static AtomicTime,
997) {
998    for (i, item) in responses.iter().enumerate() {
999        let code = item.s_code.as_deref().unwrap_or(OKX_SUCCESS_CODE);
1000        if code == OKX_SUCCESS_CODE {
1001            continue;
1002        }
1003
1004        let msg = item.s_msg.as_deref().unwrap_or("");
1005
1006        if let Some(ctx) = contexts.get(i) {
1007            let ts = clock.get_time_ns();
1008            emitter.emit_order_cancel_rejected_event(
1009                ctx.strategy_id,
1010                ctx.instrument_id,
1011                ctx.client_order_id,
1012                ctx.venue_order_id,
1013                msg,
1014                ts,
1015            );
1016        } else {
1017            log::warn!(
1018                "Algo cancel rejected but no context at index {i}: \
1019                 algo_id={} sCode={code} sMsg={msg}",
1020                item.algo_id
1021            );
1022        }
1023    }
1024}
1025
1026pub fn emit_batch_cancel_failure(
1027    contexts: &[AlgoCancelContext],
1028    error: &str,
1029    emitter: &ExecutionEventEmitter,
1030    clock: &'static AtomicTime,
1031) {
1032    for ctx in contexts {
1033        let ts = clock.get_time_ns();
1034        emitter.emit_order_cancel_rejected_event(
1035            ctx.strategy_id,
1036            ctx.instrument_id,
1037            ctx.client_order_id,
1038            ctx.venue_order_id,
1039            error,
1040            ts,
1041        );
1042    }
1043}