1use std::sync::{
24 Arc,
25 atomic::{AtomicBool, Ordering},
26};
27
28use ahash::AHashMap;
29use dashmap::{DashMap, DashSet};
30use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
31use nautilus_live::ExecutionEventEmitter;
32use nautilus_model::{
33 enums::{OrderSide, OrderStatus, OrderType},
34 events::{OrderAccepted, OrderEventAny, OrderFilled, OrderRejected},
35 identifiers::{
36 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
37 },
38 instruments::{Instrument, InstrumentAny},
39 orders::TRIGGERABLE_ORDER_TYPES,
40 reports::FillReport,
41 types::{Currency, Money, Quantity},
42};
43use ustr::Ustr;
44
45use crate::{
46 common::{
47 consts::{OKX_FIELD_CLORDID, OKX_FIELD_SCODE, OKX_FIELD_SMSG, OKX_SUCCESS_CODE},
48 enums::OKXOrderStatus,
49 parse::{
50 is_market_price, parse_client_order_id, parse_millisecond_timestamp, parse_price,
51 parse_quantity,
52 },
53 },
54 http::models::{OKXAccount, OKXCancelAlgoOrderResponse, OKXPosition},
55 websocket::{
56 client::PendingOrderInfo,
57 enums::OKXWsOperation,
58 handler::is_post_only_auto_cancel,
59 messages::{ExecutionReport, OKXOrderMsg, OKXWsMessage},
60 parse::{
61 OrderStateSnapshot, ParsedOrderEvent, parse_algo_order_msg, parse_order_event,
62 parse_order_msg, update_fee_fill_caches,
63 },
64 },
65};
66
/// Entry count at which a dedup set is cleared before the next insert.
const DEDUP_CAPACITY: usize = 10_000;
69
/// Static attributes of a tracked order, stored per client order ID.
///
/// The dispatch functions in this file read these fields when building order
/// events from venue messages.
#[derive(Debug, Clone)]
pub struct OrderIdentity {
    /// Instrument the order was placed on.
    pub instrument_id: InstrumentId,
    /// Strategy that owns the order.
    pub strategy_id: StrategyId,
    /// Side of the order.
    pub order_side: OrderSide,
    /// Type of the order.
    pub order_type: OrderType,
}
83
/// Shared state consulted while dispatching WebSocket execution messages.
#[derive(Debug)]
pub struct WsDispatchState {
    /// Identity of each tracked order, keyed by client order ID.
    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
    /// Client order IDs for which an `Accepted` event has been emitted.
    pub emitted_accepted: DashSet<ClientOrderId>,
    /// Client order IDs recorded as triggered.
    pub triggered_orders: DashSet<ClientOrderId>,
    /// Client order IDs recorded as having at least one fill.
    pub filled_orders: DashSet<ClientOrderId>,
    /// Trade IDs already seen, used to drop duplicate fills.
    pub emitted_trades: DashSet<TradeId>,
    /// Pending order submissions, keyed by client order ID string.
    pub(crate) pending_orders: Arc<DashMap<String, PendingOrderInfo>>,
    /// Pending cancel requests, keyed by client order ID string.
    pub(crate) pending_cancels: Arc<DashMap<String, PendingOrderInfo>>,
    /// Pending amend requests, keyed by client order ID string.
    pub(crate) pending_amends: Arc<DashMap<String, PendingOrderInfo>>,
    /// Set while a dedup set is being cleared so only one caller clears at a time.
    clearing: AtomicBool,
}
101
102impl Default for WsDispatchState {
103 fn default() -> Self {
104 Self {
105 order_identities: DashMap::new(),
106 emitted_accepted: DashSet::default(),
107 triggered_orders: DashSet::default(),
108 filled_orders: DashSet::default(),
109 emitted_trades: DashSet::default(),
110 pending_orders: Arc::new(DashMap::new()),
111 pending_cancels: Arc::new(DashMap::new()),
112 pending_amends: Arc::new(DashMap::new()),
113 clearing: AtomicBool::new(false),
114 }
115 }
116}
117
118impl WsDispatchState {
119 pub(crate) fn with_pending_maps(
122 pending_orders: Arc<DashMap<String, PendingOrderInfo>>,
123 pending_cancels: Arc<DashMap<String, PendingOrderInfo>>,
124 pending_amends: Arc<DashMap<String, PendingOrderInfo>>,
125 ) -> Self {
126 Self {
127 pending_orders,
128 pending_cancels,
129 pending_amends,
130 ..Default::default()
131 }
132 }
133}
134
135impl WsDispatchState {
136 fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
137 if set.len() >= DEDUP_CAPACITY
138 && self
139 .clearing
140 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
141 .is_ok()
142 {
143 set.clear();
144 self.clearing.store(false, Ordering::Release);
145 }
146 }
147
148 pub(crate) fn insert_accepted(&self, cid: ClientOrderId) {
149 self.evict_if_full(&self.emitted_accepted);
150 self.emitted_accepted.insert(cid);
151 }
152
153 pub(crate) fn insert_filled(&self, cid: ClientOrderId) {
154 self.evict_if_full(&self.filled_orders);
155 self.filled_orders.insert(cid);
156 }
157
158 pub(crate) fn insert_triggered(&self, cid: ClientOrderId) {
159 self.evict_if_full(&self.triggered_orders);
160 self.triggered_orders.insert(cid);
161 }
162
163 pub fn check_and_insert_trade(&self, trade_id: TradeId) -> bool {
166 self.evict_if_full_trades();
167 !self.emitted_trades.insert(trade_id)
168 }
169
170 fn evict_if_full_trades(&self) {
171 if self.emitted_trades.len() >= DEDUP_CAPACITY
172 && self
173 .clearing
174 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
175 .is_ok()
176 {
177 self.emitted_trades.clear();
178 self.clearing.store(false, Ordering::Release);
179 }
180 }
181}
182
183#[expect(clippy::too_many_arguments)]
190pub fn dispatch_ws_message(
191 message: OKXWsMessage,
192 emitter: &ExecutionEventEmitter,
193 state: &WsDispatchState,
194 account_id: AccountId,
195 instruments: &AHashMap<Ustr, InstrumentAny>,
196 fee_cache: &mut AHashMap<Ustr, Money>,
197 filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
198 order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
199 clock: &AtomicTime,
200) {
201 match message {
202 OKXWsMessage::Orders(order_msgs) => {
203 let ts_init = clock.get_time_ns();
204 dispatch_order_messages(
205 &order_msgs,
206 emitter,
207 state,
208 account_id,
209 instruments,
210 fee_cache,
211 filled_qty_cache,
212 order_state_cache,
213 ts_init,
214 );
215 }
216 OKXWsMessage::AlgoOrders(algo_msgs) => {
217 let ts_init = clock.get_time_ns();
218 let mut reports = Vec::new();
219
220 for msg in algo_msgs {
221 match parse_algo_order_msg(&msg, account_id, instruments, ts_init) {
222 Ok(Some(report)) => reports.push(report),
223 Ok(None) => {}
224 Err(e) => log::error!("Failed to parse algo order message: {e}"),
225 }
226 }
227 dispatch_execution_reports(reports, emitter, state);
228 }
229 OKXWsMessage::Account(data) => {
230 let ts_init = clock.get_time_ns();
231
232 match serde_json::from_value::<Vec<OKXAccount>>(data) {
233 Ok(accounts) => {
234 for account in &accounts {
235 match crate::common::parse::parse_account_state(
236 account, account_id, ts_init,
237 ) {
238 Ok(account_state) => emitter.send_account_state(account_state),
239 Err(e) => log::error!("Failed to parse account state: {e}"),
240 }
241 }
242 }
243 Err(e) => log::error!("Failed to deserialize account data: {e}"),
244 }
245 }
246 OKXWsMessage::Positions(data) => {
247 let ts_init = clock.get_time_ns();
248
249 match serde_json::from_value::<Vec<OKXPosition>>(data) {
250 Ok(positions) => {
251 for position in positions {
252 let Some(instrument) = instruments.get(&position.inst_id) else {
253 log::warn!("No cached instrument for position: {}", position.inst_id);
254 continue;
255 };
256 let instrument_id = instrument.id();
257 let size_precision = instrument.size_precision();
258
259 match crate::common::parse::parse_position_status_report(
260 &position,
261 account_id,
262 instrument_id,
263 size_precision,
264 ts_init,
265 ) {
266 Ok(report) => emitter.send_position_report(report),
267 Err(e) => log::error!("Failed to parse position report: {e}"),
268 }
269 }
270 }
271 Err(e) => log::error!("Failed to deserialize positions data: {e}"),
272 }
273 }
274 OKXWsMessage::OrderResponse {
275 id,
276 op,
277 code,
278 msg,
279 data,
280 } => {
281 let ts_init = clock.get_time_ns();
282
283 for item in &data {
284 let s_code = item
285 .get(OKX_FIELD_SCODE)
286 .and_then(|v| v.as_str())
287 .unwrap_or("");
288 let s_msg = item
289 .get(OKX_FIELD_SMSG)
290 .and_then(|v| v.as_str())
291 .unwrap_or("");
292 let cl_ord_id = item
293 .get(OKX_FIELD_CLORDID)
294 .and_then(|v| v.as_str())
295 .unwrap_or("");
296
297 if s_code == OKX_SUCCESS_CODE {
298 log::debug!("Order response ok: op={op:?} cl_ord_id={cl_ord_id}");
299 match op {
300 OKXWsOperation::Order
301 | OKXWsOperation::BatchOrders
302 | OKXWsOperation::OrderAlgo => {
303 state.pending_orders.remove(cl_ord_id);
304 }
305 OKXWsOperation::CancelOrder
306 | OKXWsOperation::BatchCancelOrders
307 | OKXWsOperation::MassCancel
308 | OKXWsOperation::CancelAlgos => {
309 state.pending_cancels.remove(cl_ord_id);
310 }
311 OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
312 state.pending_amends.remove(cl_ord_id);
313 }
314 _ => {}
315 }
316 continue;
317 }
318
319 let Some(client_order_id) = parse_client_order_id(cl_ord_id) else {
320 log::warn!(
321 "Order response error without client_order_id: \
322 op={op:?} s_code={s_code} s_msg={s_msg}"
323 );
324 continue;
325 };
326
327 let Some(ident) = state.order_identities.get(&client_order_id) else {
328 log::warn!(
329 "Order response error for untracked order: \
330 op={op:?} cl_ord_id={cl_ord_id} s_code={s_code} s_msg={s_msg}"
331 );
332 continue;
333 };
334
335 let venue_order_id = item
336 .get("ordId")
337 .and_then(|v| v.as_str())
338 .filter(|s| !s.is_empty())
339 .map(VenueOrderId::new);
340
341 match op {
342 OKXWsOperation::Order | OKXWsOperation::BatchOrders => {
343 state.order_identities.remove(&client_order_id);
344 state.pending_orders.remove(cl_ord_id);
345 emitter.emit_order_rejected_event(
346 ident.strategy_id,
347 ident.instrument_id,
348 client_order_id,
349 s_msg,
350 ts_init,
351 false,
352 );
353 }
354 OKXWsOperation::CancelOrder
355 | OKXWsOperation::BatchCancelOrders
356 | OKXWsOperation::MassCancel => {
357 state.pending_cancels.remove(cl_ord_id);
358 emitter.emit_order_cancel_rejected_event(
359 ident.strategy_id,
360 ident.instrument_id,
361 client_order_id,
362 venue_order_id,
363 s_msg,
364 ts_init,
365 );
366 }
367 OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders => {
368 state.pending_amends.remove(cl_ord_id);
369 emitter.emit_order_modify_rejected_event(
370 ident.strategy_id,
371 ident.instrument_id,
372 client_order_id,
373 venue_order_id,
374 s_msg,
375 ts_init,
376 );
377 }
378 _ => {
379 log::warn!(
380 "Order response error for unhandled op: \
381 op={op:?} cl_ord_id={cl_ord_id} s_code={s_code} s_msg={s_msg}"
382 );
383 }
384 }
385 }
386
387 if code != "0" && data.is_empty() {
388 log::warn!(
389 "Order response error (no data): id={id:?} op={op:?} code={code} msg={msg}"
390 );
391 }
392 }
393 OKXWsMessage::SendFailed {
394 request_id,
395 client_order_id,
396 op,
397 error,
398 } => {
399 log::error!("WebSocket send failed: request_id={request_id} error={error}");
400
401 if let Some(client_order_id) = client_order_id {
402 let ts_init = clock.get_time_ns();
403
404 match op {
405 Some(
406 OKXWsOperation::Order
407 | OKXWsOperation::BatchOrders
408 | OKXWsOperation::OrderAlgo,
409 ) => {
410 let key = client_order_id.as_str();
411 state.pending_orders.remove(key);
412 if let Some((_, ident)) = state.order_identities.remove(&client_order_id) {
413 emitter.emit_order_rejected_event(
414 ident.strategy_id,
415 ident.instrument_id,
416 client_order_id,
417 &error,
418 ts_init,
419 false,
420 );
421 }
422 }
423 Some(
424 OKXWsOperation::CancelOrder
425 | OKXWsOperation::BatchCancelOrders
426 | OKXWsOperation::MassCancel
427 | OKXWsOperation::CancelAlgos,
428 ) => {
429 let key = client_order_id.as_str();
430 state.pending_cancels.remove(key);
431 if let Some(ident) = state.order_identities.get(&client_order_id) {
432 emitter.emit_order_cancel_rejected_event(
433 ident.strategy_id,
434 ident.instrument_id,
435 client_order_id,
436 None,
437 &error,
438 ts_init,
439 );
440 }
441 }
442 Some(OKXWsOperation::AmendOrder | OKXWsOperation::BatchAmendOrders) => {
443 let key = client_order_id.as_str();
444 state.pending_amends.remove(key);
445 if let Some(ident) = state.order_identities.get(&client_order_id) {
446 emitter.emit_order_modify_rejected_event(
447 ident.strategy_id,
448 ident.instrument_id,
449 client_order_id,
450 None,
451 &error,
452 ts_init,
453 );
454 }
455 }
456 _ => {
457 log::warn!(
458 "SendFailed for {client_order_id} with unknown op, cannot emit rejection"
459 );
460 }
461 }
462 }
463 }
464 OKXWsMessage::ChannelData { channel, .. } => {
465 log::debug!("Ignoring data channel message on execution client: {channel:?}");
466 }
467 OKXWsMessage::BookData { .. } | OKXWsMessage::Instruments(_) => {
468 log::debug!("Ignoring data message on execution client");
469 }
470 OKXWsMessage::Error(e) => {
471 log::warn!(
472 "Websocket error: code={} message={} conn_id={:?}",
473 e.code,
474 e.message,
475 e.conn_id
476 );
477 }
478 OKXWsMessage::Reconnected => {
479 log::info!("Websocket reconnected");
480 }
481 OKXWsMessage::Authenticated => {
482 log::debug!("Websocket authenticated");
483 }
484 }
485}
486
487#[expect(clippy::too_many_arguments)]
490fn dispatch_order_messages(
491 order_msgs: &[OKXOrderMsg],
492 emitter: &ExecutionEventEmitter,
493 state: &WsDispatchState,
494 account_id: AccountId,
495 instruments: &AHashMap<Ustr, InstrumentAny>,
496 fee_cache: &mut AHashMap<Ustr, Money>,
497 filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
498 order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
499 ts_init: UnixNanos,
500) {
501 for msg in order_msgs {
502 let Some(instrument) = instruments.get(&msg.inst_id) else {
503 log::warn!("No instrument for {}, skipping order message", msg.inst_id);
504 continue;
505 };
506
507 let Some(client_order_id) = parse_client_order_id(&msg.cl_ord_id) else {
508 log::debug!(
509 "Order without client_order_id (ord_id={}), sending as report",
510 msg.ord_id
511 );
512 dispatch_order_msg_as_report(
513 msg,
514 account_id,
515 instruments,
516 fee_cache,
517 filled_qty_cache,
518 emitter,
519 state,
520 ts_init,
521 );
522 continue;
523 };
524
525 let (client_order_id, identity) = match state
530 .order_identities
531 .get(&client_order_id)
532 .map(|r| r.clone())
533 {
534 Some(ident) => (client_order_id, Some(ident)),
535 None => {
536 if let Some(parent_id) = msg
537 .algo_cl_ord_id
538 .as_deref()
539 .and_then(parse_client_order_id)
540 {
541 let parent_ident = state.order_identities.get(&parent_id).map(|r| r.clone());
542
543 if parent_ident.is_some() {
544 (parent_id, parent_ident)
545 } else {
546 (client_order_id, None)
547 }
548 } else {
549 (client_order_id, None)
550 }
551 }
552 };
553
554 if let Some(ident) = identity {
555 if is_post_only_auto_cancel(msg) {
556 let ts_event = parse_millisecond_timestamp(msg.u_time);
557 let rejected = OrderRejected::new(
558 emitter.trader_id(),
559 ident.strategy_id,
560 instrument.id(),
561 client_order_id,
562 account_id,
563 Ustr::from("Post-only order would have taken liquidity"),
564 UUID4::new(),
565 ts_event,
566 ts_init,
567 false,
568 true, );
570 state.order_identities.remove(&client_order_id);
571 order_state_cache.remove(&client_order_id);
572 fee_cache.remove(&msg.ord_id);
573 filled_qty_cache.remove(&msg.ord_id);
574 emitter.send_order_event(OrderEventAny::Rejected(rejected));
575 continue;
576 }
577
578 let previous_fee = fee_cache.get(&msg.ord_id).copied();
579 let previous_filled_qty = filled_qty_cache.get(&msg.ord_id).copied();
580 let previous_state = order_state_cache.get(&client_order_id);
581
582 match parse_order_event(
583 msg,
584 client_order_id,
585 account_id,
586 emitter.trader_id(),
587 ident.strategy_id,
588 instrument,
589 previous_fee,
590 previous_filled_qty,
591 previous_state,
592 ts_init,
593 ) {
594 Ok(event) => {
595 update_order_caches(
596 msg,
597 instrument,
598 client_order_id,
599 fee_cache,
600 filled_qty_cache,
601 order_state_cache,
602 );
603 dispatch_parsed_order_event(
604 event,
605 client_order_id,
606 account_id,
607 VenueOrderId::new(msg.ord_id),
608 &ident,
609 instrument,
610 msg.state,
611 emitter,
612 state,
613 order_state_cache,
614 ts_init,
615 );
616 }
617 Err(e) => log::error!("Failed to parse order event for {client_order_id}: {e}"),
618 }
619 } else {
620 log::debug!(
621 "Untracked order {client_order_id} (ord_id={}), sending as report for reconciliation",
622 msg.ord_id
623 );
624 dispatch_order_msg_as_report(
625 msg,
626 account_id,
627 instruments,
628 fee_cache,
629 filled_qty_cache,
630 emitter,
631 state,
632 ts_init,
633 );
634 }
635 }
636}
637
638#[expect(clippy::too_many_arguments)]
644fn dispatch_parsed_order_event(
645 event: ParsedOrderEvent,
646 client_order_id: ClientOrderId,
647 account_id: AccountId,
648 venue_order_id: VenueOrderId,
649 identity: &OrderIdentity,
650 instrument: &InstrumentAny,
651 venue_status: OKXOrderStatus,
652 emitter: &ExecutionEventEmitter,
653 state: &WsDispatchState,
654 order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
655 ts_init: UnixNanos,
656) {
657 let is_terminal;
658
659 match event {
660 ParsedOrderEvent::Accepted(e) => {
661 if state.emitted_accepted.contains(&client_order_id)
662 || state.filled_orders.contains(&client_order_id)
663 || state.triggered_orders.contains(&client_order_id)
664 {
665 log::debug!("Skipping duplicate Accepted for {client_order_id}");
666 return;
667 }
668 state.insert_accepted(client_order_id);
669 is_terminal = false;
670 emitter.send_order_event(OrderEventAny::Accepted(e));
671 }
672 ParsedOrderEvent::Triggered(e) => {
673 if state.filled_orders.contains(&client_order_id) {
674 log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
675 return;
676 }
677
678 if !TRIGGERABLE_ORDER_TYPES.contains(&identity.order_type) {
679 log::debug!(
680 "Skipping OrderTriggered for {} order {client_order_id}: market-style stops have no TRIGGERED state",
681 identity.order_type,
682 );
683 state.insert_triggered(client_order_id);
684 return;
685 }
686
687 ensure_accepted_emitted(
688 client_order_id,
689 account_id,
690 venue_order_id,
691 identity,
692 emitter,
693 state,
694 ts_init,
695 );
696 state.insert_triggered(client_order_id);
697 is_terminal = false;
698 emitter.send_order_event(OrderEventAny::Triggered(e));
699 }
700 ParsedOrderEvent::Canceled(e) => {
701 ensure_accepted_emitted(
702 client_order_id,
703 account_id,
704 venue_order_id,
705 identity,
706 emitter,
707 state,
708 ts_init,
709 );
710 state.triggered_orders.remove(&client_order_id);
711 state.filled_orders.remove(&client_order_id);
712 is_terminal = true;
713 emitter.send_order_event(OrderEventAny::Canceled(e));
714 }
715 ParsedOrderEvent::Expired(e) => {
716 ensure_accepted_emitted(
717 client_order_id,
718 account_id,
719 venue_order_id,
720 identity,
721 emitter,
722 state,
723 ts_init,
724 );
725 state.triggered_orders.remove(&client_order_id);
726 state.filled_orders.remove(&client_order_id);
727 is_terminal = true;
728 emitter.send_order_event(OrderEventAny::Expired(e));
729 }
730 ParsedOrderEvent::Updated(e) => {
731 ensure_accepted_emitted(
732 client_order_id,
733 account_id,
734 venue_order_id,
735 identity,
736 emitter,
737 state,
738 ts_init,
739 );
740 is_terminal = false;
741 emitter.send_order_event(OrderEventAny::Updated(e));
742 }
743 ParsedOrderEvent::Fill(fill_report) => {
744 let is_duplicate = state.check_and_insert_trade(fill_report.trade_id);
745 is_terminal = venue_status == OKXOrderStatus::Filled;
746
747 if is_duplicate {
748 log::debug!(
749 "Skipping duplicate fill for {client_order_id}: trade_id={}",
750 fill_report.trade_id
751 );
752 } else {
753 ensure_accepted_emitted(
754 client_order_id,
755 account_id,
756 venue_order_id,
757 identity,
758 emitter,
759 state,
760 ts_init,
761 );
762 state.insert_filled(client_order_id);
763 state.triggered_orders.remove(&client_order_id);
764 let filled = fill_report_to_order_filled(
765 &fill_report,
766 emitter.trader_id(),
767 identity,
768 instrument.quote_currency(),
769 );
770 emitter.send_order_event(OrderEventAny::Filled(filled));
771 }
772 }
773 ParsedOrderEvent::StatusOnly(report) => {
774 is_terminal = matches!(
775 report.order_status,
776 OrderStatus::Filled | OrderStatus::Canceled | OrderStatus::Expired
777 );
778 emitter.send_order_status_report(*report);
779 }
780 ParsedOrderEvent::Skipped => return,
781 }
782
783 if is_terminal {
784 state.order_identities.remove(&client_order_id);
785 state.emitted_accepted.remove(&client_order_id);
786 order_state_cache.remove(&client_order_id);
787 }
791}
792
793fn ensure_accepted_emitted(
796 client_order_id: ClientOrderId,
797 account_id: AccountId,
798 venue_order_id: VenueOrderId,
799 identity: &OrderIdentity,
800 emitter: &ExecutionEventEmitter,
801 state: &WsDispatchState,
802 ts_init: UnixNanos,
803) {
804 if state.emitted_accepted.contains(&client_order_id) {
805 return;
806 }
807 state.insert_accepted(client_order_id);
808 let accepted = OrderAccepted::new(
809 emitter.trader_id(),
810 identity.strategy_id,
811 identity.instrument_id,
812 client_order_id,
813 venue_order_id,
814 account_id,
815 UUID4::new(),
816 ts_init,
817 ts_init,
818 false,
819 );
820 emitter.send_order_event(OrderEventAny::Accepted(accepted));
821}
822
823fn fill_report_to_order_filled(
825 report: &FillReport,
826 trader_id: TraderId,
827 identity: &OrderIdentity,
828 quote_currency: Currency,
829) -> OrderFilled {
830 OrderFilled::new(
831 trader_id,
832 identity.strategy_id,
833 report.instrument_id,
834 report
835 .client_order_id
836 .expect("tracked order has client_order_id"),
837 report.venue_order_id,
838 report.account_id,
839 report.trade_id,
840 identity.order_side,
841 identity.order_type,
842 report.last_qty,
843 report.last_px,
844 quote_currency,
845 report.liquidity_side,
846 UUID4::new(),
847 report.ts_event,
848 report.ts_init,
849 false,
850 report.venue_position_id,
851 Some(report.commission),
852 )
853}
854
855#[expect(clippy::too_many_arguments)]
857fn dispatch_order_msg_as_report(
858 msg: &OKXOrderMsg,
859 account_id: AccountId,
860 instruments: &AHashMap<Ustr, InstrumentAny>,
861 fee_cache: &mut AHashMap<Ustr, Money>,
862 filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
863 emitter: &ExecutionEventEmitter,
864 state: &WsDispatchState,
865 ts_init: UnixNanos,
866) {
867 match parse_order_msg(
868 msg,
869 account_id,
870 instruments,
871 fee_cache,
872 filled_qty_cache,
873 ts_init,
874 ) {
875 Ok(report) => {
876 if let Some(instrument) = instruments.get(&msg.inst_id) {
877 update_fee_fill_caches(msg, instrument, fee_cache, filled_qty_cache);
878 }
879 dispatch_execution_reports(vec![report], emitter, state);
880 }
881 Err(e) => log::error!("Failed to parse order message as report: {e}"),
882 }
883}
884
885fn update_order_caches(
887 msg: &OKXOrderMsg,
888 instrument: &InstrumentAny,
889 client_order_id: ClientOrderId,
890 fee_cache: &mut AHashMap<Ustr, Money>,
891 filled_qty_cache: &mut AHashMap<Ustr, Quantity>,
892 order_state_cache: &mut AHashMap<ClientOrderId, OrderStateSnapshot>,
893) {
894 update_fee_fill_caches(msg, instrument, fee_cache, filled_qty_cache);
895
896 let venue_order_id = VenueOrderId::new(msg.ord_id);
897 let quantity = parse_quantity(&msg.sz, instrument.size_precision()).unwrap_or_default();
898 let price = if is_market_price(&msg.px) {
899 None
900 } else {
901 parse_price(&msg.px, instrument.price_precision()).ok()
902 };
903
904 order_state_cache.insert(
905 client_order_id,
906 OrderStateSnapshot {
907 venue_order_id,
908 quantity,
909 price,
910 },
911 );
912}
913
914pub fn dispatch_execution_reports(
916 reports: Vec<ExecutionReport>,
917 emitter: &ExecutionEventEmitter,
918 state: &WsDispatchState,
919) {
920 log::debug!("Processing {} execution report(s)", reports.len());
921
922 for report in reports {
923 match report {
924 ExecutionReport::Order(order_report) => {
925 if let Some(cid) = order_report.client_order_id {
926 match order_report.order_status {
927 #[expect(clippy::collapsible_match)]
929 OrderStatus::Accepted => {
930 if state.filled_orders.contains(&cid)
931 || state.triggered_orders.contains(&cid)
932 {
933 log::debug!(
934 "Skipping stale OrderStatusReport(Accepted) \
935 for {cid} (already triggered/filled)"
936 );
937 continue;
938 }
939 }
940 OrderStatus::Triggered => {
941 if state.filled_orders.contains(&cid) {
942 log::debug!(
943 "Skipping stale OrderStatusReport(Triggered) \
944 for {cid} (already filled)"
945 );
946 continue;
947 }
948 state.insert_triggered(cid);
949 }
950 OrderStatus::Filled => {
951 state.insert_filled(cid);
952 state.triggered_orders.remove(&cid);
953 }
954 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected => {
955 state.triggered_orders.remove(&cid);
956 state.filled_orders.remove(&cid);
957 }
958 _ => {}
959 }
960 }
961 emitter.send_order_status_report(order_report);
962 }
963 ExecutionReport::Fill(fill_report) => {
964 if state.check_and_insert_trade(fill_report.trade_id) {
965 log::debug!(
966 "Skipping duplicate fill report: trade_id={}",
967 fill_report.trade_id
968 );
969 continue;
970 }
971
972 if let Some(cid) = fill_report.client_order_id {
973 state.insert_filled(cid);
974 state.triggered_orders.remove(&cid);
975 }
976 emitter.send_fill_report(fill_report);
977 }
978 }
979 }
980}
981
/// Order details needed to emit a cancel-rejected event for an algo order
/// cancel request.
#[derive(Debug, Clone)]
pub struct AlgoCancelContext {
    /// Client order ID of the order being canceled.
    pub client_order_id: ClientOrderId,
    /// Instrument of the order being canceled.
    pub instrument_id: InstrumentId,
    /// Strategy that owns the order.
    pub strategy_id: StrategyId,
    /// Venue order ID, if known.
    pub venue_order_id: Option<VenueOrderId>,
}
989
990pub fn emit_algo_cancel_rejections(
993 responses: &[OKXCancelAlgoOrderResponse],
994 contexts: &[AlgoCancelContext],
995 emitter: &ExecutionEventEmitter,
996 clock: &'static AtomicTime,
997) {
998 for (i, item) in responses.iter().enumerate() {
999 let code = item.s_code.as_deref().unwrap_or(OKX_SUCCESS_CODE);
1000 if code == OKX_SUCCESS_CODE {
1001 continue;
1002 }
1003
1004 let msg = item.s_msg.as_deref().unwrap_or("");
1005
1006 if let Some(ctx) = contexts.get(i) {
1007 let ts = clock.get_time_ns();
1008 emitter.emit_order_cancel_rejected_event(
1009 ctx.strategy_id,
1010 ctx.instrument_id,
1011 ctx.client_order_id,
1012 ctx.venue_order_id,
1013 msg,
1014 ts,
1015 );
1016 } else {
1017 log::warn!(
1018 "Algo cancel rejected but no context at index {i}: \
1019 algo_id={} sCode={code} sMsg={msg}",
1020 item.algo_id
1021 );
1022 }
1023 }
1024}
1025
1026pub fn emit_batch_cancel_failure(
1027 contexts: &[AlgoCancelContext],
1028 error: &str,
1029 emitter: &ExecutionEventEmitter,
1030 clock: &'static AtomicTime,
1031) {
1032 for ctx in contexts {
1033 let ts = clock.get_time_ns();
1034 emitter.emit_order_cancel_rejected_event(
1035 ctx.strategy_id,
1036 ctx.instrument_id,
1037 ctx.client_order_id,
1038 ctx.venue_order_id,
1039 error,
1040 ts,
1041 );
1042 }
1043}