Skip to main content

nautilus_bybit/websocket/
dispatch.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message dispatch for the Bybit execution client.
17//!
18//! Routes incoming [`BybitWsMessage`] variants to the appropriate parsing and
19//! event emission paths. Tracked orders (submitted through this client) produce
20//! proper order events; untracked orders fall back to execution reports for
21//! downstream reconciliation.
22
23use std::sync::atomic::{AtomicBool, Ordering};
24
25use ahash::AHashMap;
26use anyhow::Context;
27use dashmap::{DashMap, DashSet};
28use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
29use nautilus_live::ExecutionEventEmitter;
30use nautilus_model::{
31    enums::{LiquiditySide, OrderSide, OrderType},
32    events::{
33        OrderAccepted, OrderCanceled, OrderEventAny, OrderFilled, OrderTriggered, OrderUpdated,
34    },
35    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
36    instruments::{Instrument, InstrumentAny},
37    orders::TRIGGERABLE_ORDER_TYPES,
38    types::{Money, Price, Quantity},
39};
40use rust_decimal::Decimal;
41use ustr::Ustr;
42
43use super::{
44    messages::{BybitWsAccountExecution, BybitWsAccountOrder, BybitWsMessage},
45    parse::{parse_millis_i64, parse_ws_account_state, parse_ws_position_status_report},
46};
47use crate::common::{
48    enums::BybitOrderStatus,
49    parse::{
50        make_bybit_symbol, parse_millis_timestamp, parse_price_with_precision,
51        parse_quantity_with_precision,
52    },
53};
54
/// Number of entries a dedup set may hold before it is cleared ahead of the
/// next insert.
const DEDUP_CAPACITY: usize = 10_000;

/// Operation string treated as an order placement.
const BYBIT_OP_ORDER_CREATE: &str = "order.create";
/// Operation string treated as an order amendment.
const BYBIT_OP_ORDER_AMEND: &str = "order.amend";
/// Operation string treated as an order cancellation.
const BYBIT_OP_ORDER_CANCEL: &str = "order.cancel";
60
61/// Order identity context stored at submission time, used by the WS dispatch
62/// task to produce proper order events without Cache access.
63#[derive(Debug, Clone)]
64pub struct OrderIdentity {
65    pub instrument_id: InstrumentId,
66    pub strategy_id: StrategyId,
67    pub order_side: OrderSide,
68    pub order_type: OrderType,
69}
70
/// The kind of WS request that is awaiting a response for a given req_id.
#[derive(Clone, Copy, Debug)]
pub enum PendingOperation {
    /// Order placement; a failure is reported as an order rejection.
    Place,
    /// Order cancellation; a failure is reported as a cancel rejection.
    Cancel,
    /// Order amendment; a failure is reported as a modify rejection.
    Amend,
}
78
79/// Shared state for cross-stream event deduplication between the private
80/// and trade WebSocket dispatch loops.
81pub type PendingRequestData = (
82    Vec<ClientOrderId>,
83    Vec<Option<VenueOrderId>>,
84    PendingOperation,
85);
86
87/// Snapshot of an order's price, quantity, and trigger price at last dispatch.
88/// Used to detect modifications when Bybit sends back an order with the same
89/// status but changed fields.
90#[derive(Debug, Clone)]
91pub struct OrderStateSnapshot {
92    pub quantity: Quantity,
93    pub price: Option<Price>,
94    pub trigger_price: Option<Price>,
95}
96
97#[derive(Debug)]
98pub struct WsDispatchState {
99    pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
100    pub pending_requests: DashMap<String, PendingRequestData>,
101    pub order_snapshots: DashMap<ClientOrderId, OrderStateSnapshot>,
102    pub emitted_accepted: DashSet<ClientOrderId>,
103    pub triggered_orders: DashSet<ClientOrderId>,
104    pub filled_orders: DashSet<ClientOrderId>,
105    clearing: AtomicBool,
106}
107
108impl Default for WsDispatchState {
109    fn default() -> Self {
110        Self {
111            order_identities: DashMap::new(),
112            pending_requests: DashMap::new(),
113            order_snapshots: DashMap::new(),
114            emitted_accepted: DashSet::default(),
115            triggered_orders: DashSet::default(),
116            filled_orders: DashSet::default(),
117            clearing: AtomicBool::new(false),
118        }
119    }
120}
121
122impl WsDispatchState {
123    fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
124        if set.len() >= DEDUP_CAPACITY
125            && self
126                .clearing
127                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
128                .is_ok()
129        {
130            set.clear();
131            self.clearing.store(false, Ordering::Release);
132        }
133    }
134
135    fn insert_accepted(&self, cid: ClientOrderId) {
136        self.evict_if_full(&self.emitted_accepted);
137        self.emitted_accepted.insert(cid);
138    }
139
140    fn insert_filled(&self, cid: ClientOrderId) {
141        self.evict_if_full(&self.filled_orders);
142        self.filled_orders.insert(cid);
143    }
144
145    fn insert_triggered(&self, cid: ClientOrderId) {
146        self.evict_if_full(&self.triggered_orders);
147        self.triggered_orders.insert(cid);
148    }
149}
150
151/// Dispatches a WebSocket message with cross-stream deduplication.
152///
153/// For orders with a tracked identity (submitted through this client), produces
154/// proper order events (OrderAccepted, OrderCanceled, OrderFilled, etc.).
155/// For untracked orders (external or pre-existing), falls back to execution
156/// reports for downstream reconciliation.
157pub fn dispatch_ws_message(
158    message: &BybitWsMessage,
159    emitter: &ExecutionEventEmitter,
160    state: &WsDispatchState,
161    account_id: AccountId,
162    instruments: &AHashMap<Ustr, InstrumentAny>,
163    clock: &AtomicTime,
164) {
165    match message {
166        BybitWsMessage::AccountOrder(msg) => {
167            let ts_init = clock.get_time_ns();
168
169            for order in &msg.data {
170                let symbol = make_bybit_symbol(order.symbol, order.category);
171                let Some(instrument) = instruments.get(&symbol) else {
172                    log::warn!("No instrument for order update: {symbol}");
173                    continue;
174                };
175                dispatch_order_update(order, instrument, emitter, state, account_id, ts_init);
176            }
177        }
178        BybitWsMessage::AccountExecution(msg) => {
179            let ts_init = clock.get_time_ns();
180
181            for exec in &msg.data {
182                let symbol = make_bybit_symbol(exec.symbol, exec.category);
183                let Some(instrument) = instruments.get(&symbol) else {
184                    log::warn!("No instrument for execution update: {symbol}");
185                    continue;
186                };
187                dispatch_execution_fill(exec, instrument, emitter, state, account_id, ts_init);
188            }
189        }
190        BybitWsMessage::AccountWallet(msg) => {
191            let ts_init = clock.get_time_ns();
192            let ts_event = parse_millis_i64(msg.creation_time, "wallet.creation_time")
193                .unwrap_or_else(|e| {
194                    log::warn!("Failed to parse wallet creation_time, using ts_init: {e}");
195                    ts_init
196                });
197
198            for wallet in &msg.data {
199                match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
200                    Ok(state) => emitter.send_account_state(state),
201                    Err(e) => log::error!("Failed to parse account state: {e}"),
202                }
203            }
204        }
205        BybitWsMessage::AccountPosition(msg) => {
206            let ts_init = clock.get_time_ns();
207
208            for position in &msg.data {
209                let symbol = make_bybit_symbol(position.symbol, position.category);
210                let Some(instrument) = instruments.get(&symbol) else {
211                    log::warn!("No instrument for position update: {symbol}");
212                    continue;
213                };
214
215                match parse_ws_position_status_report(position, account_id, instrument, ts_init) {
216                    Ok(report) => emitter.send_position_report(report),
217                    Err(e) => log::error!("Failed to parse position status report: {e}"),
218                }
219            }
220        }
221        BybitWsMessage::OrderResponse(resp) => {
222            let ts_init = clock.get_time_ns();
223            dispatch_order_response(resp, emitter, state, ts_init);
224        }
225        BybitWsMessage::Error(e) => {
226            log::warn!("WebSocket error: code={} message={}", e.code, e.message);
227        }
228        BybitWsMessage::Reconnected => {
229            log::info!("WebSocket reconnected");
230        }
231        BybitWsMessage::Auth(_)
232        | BybitWsMessage::Orderbook(_)
233        | BybitWsMessage::Trade(_)
234        | BybitWsMessage::Kline(_)
235        | BybitWsMessage::TickerLinear(_)
236        | BybitWsMessage::TickerOption(_) => {}
237    }
238}
239
240/// Dispatches a single order status update.
241///
242/// Tracked orders produce lifecycle events (OrderAccepted, OrderTriggered,
243/// OrderCanceled, OrderRejected). Untracked orders fall back to
244/// `OrderStatusReport` for reconciliation.
245fn dispatch_order_update(
246    order: &BybitWsAccountOrder,
247    instrument: &InstrumentAny,
248    emitter: &ExecutionEventEmitter,
249    state: &WsDispatchState,
250    account_id: AccountId,
251    ts_init: UnixNanos,
252) {
253    let client_order_id = if order.order_link_id.is_empty() {
254        None
255    } else {
256        Some(ClientOrderId::new(order.order_link_id.as_str()))
257    };
258
259    let identity = client_order_id
260        .as_ref()
261        .and_then(|cid| state.order_identities.get(cid).map(|r| r.clone()));
262
263    if let (Some(client_order_id), Some(identity)) = (client_order_id, identity) {
264        let venue_order_id = VenueOrderId::new(order.order_id.as_str());
265
266        match order.order_status {
267            BybitOrderStatus::Created | BybitOrderStatus::New | BybitOrderStatus::Untriggered => {
268                let snapshot = parse_order_snapshot(order, instrument);
269
270                if state.emitted_accepted.contains(&client_order_id)
271                    || state.filled_orders.contains(&client_order_id)
272                    || state.triggered_orders.contains(&client_order_id)
273                {
274                    if let Some(snapshot) = snapshot
275                        && is_snapshot_updated(&snapshot, &client_order_id, state)
276                    {
277                        let updated = OrderUpdated::new(
278                            emitter.trader_id(),
279                            identity.strategy_id,
280                            identity.instrument_id,
281                            client_order_id,
282                            snapshot.quantity,
283                            UUID4::new(),
284                            ts_init,
285                            ts_init,
286                            false,
287                            Some(venue_order_id),
288                            Some(account_id),
289                            snapshot.price,
290                            snapshot.trigger_price,
291                            None,
292                            false,
293                        );
294                        state.order_snapshots.insert(client_order_id, snapshot);
295                        emitter.send_order_event(OrderEventAny::Updated(updated));
296                        return;
297                    }
298                    log::debug!("Skipping duplicate Accepted for {client_order_id}");
299                    return;
300                }
301
302                state.insert_accepted(client_order_id);
303
304                if let Some(snapshot) = snapshot {
305                    state.order_snapshots.insert(client_order_id, snapshot);
306                }
307
308                let accepted = OrderAccepted::new(
309                    emitter.trader_id(),
310                    identity.strategy_id,
311                    identity.instrument_id,
312                    client_order_id,
313                    venue_order_id,
314                    account_id,
315                    UUID4::new(),
316                    ts_init,
317                    ts_init,
318                    false,
319                );
320                emitter.send_order_event(OrderEventAny::Accepted(accepted));
321            }
322            BybitOrderStatus::Triggered => {
323                if state.filled_orders.contains(&client_order_id) {
324                    log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
325                    return;
326                }
327
328                if !TRIGGERABLE_ORDER_TYPES.contains(&identity.order_type) {
329                    log::debug!(
330                        "Skipping OrderTriggered for {} order {client_order_id}: market-style stops have no TRIGGERED state",
331                        identity.order_type,
332                    );
333                    return;
334                }
335
336                ensure_accepted_emitted(
337                    client_order_id,
338                    account_id,
339                    venue_order_id,
340                    &identity,
341                    emitter,
342                    state,
343                    ts_init,
344                );
345                state.insert_triggered(client_order_id);
346                let triggered = OrderTriggered::new(
347                    emitter.trader_id(),
348                    identity.strategy_id,
349                    identity.instrument_id,
350                    client_order_id,
351                    UUID4::new(),
352                    ts_init,
353                    ts_init,
354                    false,
355                    Some(venue_order_id),
356                    Some(account_id),
357                );
358                emitter.send_order_event(OrderEventAny::Triggered(triggered));
359            }
360            BybitOrderStatus::Rejected => {
361                let filled_qty = parse_quantity_with_precision(
362                    &order.cum_exec_qty,
363                    instrument.size_precision(),
364                    "order.cumExecQty",
365                )
366                .unwrap_or_default();
367
368                if filled_qty.is_positive() {
369                    // Partially filled then rejected - treat as canceled
370                    ensure_accepted_emitted(
371                        client_order_id,
372                        account_id,
373                        venue_order_id,
374                        &identity,
375                        emitter,
376                        state,
377                        ts_init,
378                    );
379                    let canceled = OrderCanceled::new(
380                        emitter.trader_id(),
381                        identity.strategy_id,
382                        identity.instrument_id,
383                        client_order_id,
384                        UUID4::new(),
385                        ts_init,
386                        ts_init,
387                        false,
388                        Some(venue_order_id),
389                        Some(account_id),
390                    );
391                    cleanup_terminal(client_order_id, state);
392                    emitter.send_order_event(OrderEventAny::Canceled(canceled));
393                } else {
394                    let reason = if order.reject_reason.is_empty() {
395                        Ustr::from("Order rejected by venue")
396                    } else {
397                        order.reject_reason
398                    };
399                    state.order_identities.remove(&client_order_id);
400                    state.order_snapshots.remove(&client_order_id);
401                    emitter.emit_order_rejected_event(
402                        identity.strategy_id,
403                        identity.instrument_id,
404                        client_order_id,
405                        reason.as_str(),
406                        ts_init,
407                        false,
408                    );
409                }
410            }
411            BybitOrderStatus::PartiallyFilled => {
412                // Fills arrive on the execution channel; no event needed here.
413                // Ensure accepted was emitted so the fill has a valid prior state.
414                ensure_accepted_emitted(
415                    client_order_id,
416                    account_id,
417                    venue_order_id,
418                    &identity,
419                    emitter,
420                    state,
421                    ts_init,
422                );
423
424                // A successful amend on a partially filled order keeps the
425                // PartiallyFilled status. Detect price/qty/trigger changes and
426                // emit OrderUpdated so PendingUpdate resolves.
427                if let Some(snapshot) = parse_order_snapshot(order, instrument)
428                    && is_snapshot_updated(&snapshot, &client_order_id, state)
429                {
430                    let updated = OrderUpdated::new(
431                        emitter.trader_id(),
432                        identity.strategy_id,
433                        identity.instrument_id,
434                        client_order_id,
435                        snapshot.quantity,
436                        UUID4::new(),
437                        ts_init,
438                        ts_init,
439                        false,
440                        Some(venue_order_id),
441                        Some(account_id),
442                        snapshot.price,
443                        snapshot.trigger_price,
444                        None,
445                        false,
446                    );
447                    state.order_snapshots.insert(client_order_id, snapshot);
448                    emitter.send_order_event(OrderEventAny::Updated(updated));
449                }
450            }
451            BybitOrderStatus::Filled => {
452                // Fills arrive on the execution channel; no event needed here.
453                // Ensure accepted was emitted so the fill has a valid prior state.
454                ensure_accepted_emitted(
455                    client_order_id,
456                    account_id,
457                    venue_order_id,
458                    &identity,
459                    emitter,
460                    state,
461                    ts_init,
462                );
463                // Identity cleaned up in dispatch_execution_fill when leaves_qty
464                // reaches zero, since there is no guaranteed ordering between
465                // the order and execution topics.
466            }
467            BybitOrderStatus::Canceled
468            | BybitOrderStatus::PartiallyFilledCanceled
469            | BybitOrderStatus::Deactivated => {
470                ensure_accepted_emitted(
471                    client_order_id,
472                    account_id,
473                    venue_order_id,
474                    &identity,
475                    emitter,
476                    state,
477                    ts_init,
478                );
479                let canceled = OrderCanceled::new(
480                    emitter.trader_id(),
481                    identity.strategy_id,
482                    identity.instrument_id,
483                    client_order_id,
484                    UUID4::new(),
485                    ts_init,
486                    ts_init,
487                    false,
488                    Some(venue_order_id),
489                    Some(account_id),
490                );
491                cleanup_terminal(client_order_id, state);
492                emitter.send_order_event(OrderEventAny::Canceled(canceled));
493            }
494        }
495    } else {
496        // Untracked order: fall back to report for reconciliation
497        match super::parse::parse_ws_order_status_report(order, instrument, account_id, ts_init) {
498            Ok(report) => emitter.send_order_status_report(report),
499            Err(e) => log::error!("Failed to parse order status report: {e}"),
500        }
501    }
502}
503
504/// Dispatches a single execution (fill) message.
505///
506/// Tracked orders are parsed directly to [`OrderFilled`]. Untracked orders
507/// fall back to [`FillReport`] for reconciliation.
508fn dispatch_execution_fill(
509    exec: &BybitWsAccountExecution,
510    instrument: &InstrumentAny,
511    emitter: &ExecutionEventEmitter,
512    state: &WsDispatchState,
513    account_id: AccountId,
514    ts_init: UnixNanos,
515) {
516    if exec.exec_type.is_exchange_generated() {
517        log::warn!(
518            "Exchange-generated execution: exec_type={:?}, symbol={}, order_id={}, order_link_id={}, side={:?}, qty={}, price={}",
519            exec.exec_type,
520            exec.symbol,
521            exec.order_id,
522            exec.order_link_id,
523            exec.side,
524            exec.exec_qty,
525            exec.exec_price,
526        );
527    }
528
529    let client_order_id = if exec.order_link_id.is_empty() {
530        None
531    } else {
532        Some(ClientOrderId::new(exec.order_link_id.as_str()))
533    };
534
535    let identity = client_order_id
536        .as_ref()
537        .and_then(|cid| state.order_identities.get(cid).map(|r| r.clone()));
538
539    if let (Some(client_order_id), Some(identity)) = (client_order_id, identity) {
540        let venue_order_id = VenueOrderId::new(exec.order_id.as_str());
541
542        ensure_accepted_emitted(
543            client_order_id,
544            account_id,
545            venue_order_id,
546            &identity,
547            emitter,
548            state,
549            ts_init,
550        );
551
552        match parse_order_filled(exec, instrument, &identity, emitter, account_id, ts_init) {
553            Ok(filled) => {
554                state.insert_filled(client_order_id);
555                state.triggered_orders.remove(&client_order_id);
556                emitter.send_order_event(OrderEventAny::Filled(filled));
557
558                if exec.leaves_qty == "0" {
559                    cleanup_terminal(client_order_id, state);
560                }
561            }
562            Err(e) => log::error!("Failed to parse OrderFilled for {client_order_id}: {e}"),
563        }
564    } else {
565        // Untracked: fall back to FillReport for reconciliation
566        match super::parse::parse_ws_fill_report(exec, account_id, instrument, ts_init) {
567            Ok(report) => emitter.send_fill_report(report),
568            Err(e) => log::error!("Failed to parse fill report: {e}"),
569        }
570    }
571}
572
573/// Parses a Bybit execution message directly into an [`OrderFilled`] event.
574fn parse_order_filled(
575    exec: &BybitWsAccountExecution,
576    instrument: &InstrumentAny,
577    identity: &OrderIdentity,
578    emitter: &ExecutionEventEmitter,
579    account_id: AccountId,
580    ts_init: UnixNanos,
581) -> anyhow::Result<OrderFilled> {
582    let client_order_id = ClientOrderId::new(exec.order_link_id.as_str());
583    let venue_order_id = VenueOrderId::new(exec.order_id.as_str());
584    let trade_id =
585        TradeId::new_checked(exec.exec_id.as_str()).context("invalid execId in Bybit execution")?;
586
587    let last_qty = parse_quantity_with_precision(
588        &exec.exec_qty,
589        instrument.size_precision(),
590        "execution.execQty",
591    )?;
592    let last_px = parse_price_with_precision(
593        &exec.exec_price,
594        instrument.price_precision(),
595        "execution.execPrice",
596    )?;
597
598    let liquidity_side = if exec.is_maker {
599        LiquiditySide::Maker
600    } else {
601        LiquiditySide::Taker
602    };
603
604    let fee_decimal: Decimal = exec
605        .exec_fee
606        .parse()
607        .with_context(|| format!("failed to parse execFee='{}'", exec.exec_fee))?;
608    let commission_currency = instrument.quote_currency();
609    let commission = Money::from_decimal(fee_decimal, commission_currency).with_context(|| {
610        format!(
611            "failed to create commission from execFee='{}'",
612            exec.exec_fee
613        )
614    })?;
615
616    let ts_event = parse_millis_timestamp(&exec.exec_time, "execution.execTime")?;
617
618    Ok(OrderFilled::new(
619        emitter.trader_id(),
620        identity.strategy_id,
621        identity.instrument_id,
622        client_order_id,
623        venue_order_id,
624        account_id,
625        trade_id,
626        identity.order_side,
627        identity.order_type,
628        last_qty,
629        last_px,
630        commission_currency,
631        liquidity_side,
632        UUID4::new(),
633        ts_event,
634        ts_init,
635        false,
636        None, // venue_position_id
637        Some(commission),
638    ))
639}
640
641/// Handles a Bybit WS order response, emitting rejection events for failures.
642fn dispatch_order_response(
643    resp: &super::messages::BybitWsOrderResponse,
644    emitter: &ExecutionEventEmitter,
645    state: &WsDispatchState,
646    ts_init: UnixNanos,
647) {
648    if resp.ret_code == 0 {
649        // Check for per-order failures in batch retExtInfo even on success
650        let pending = resp
651            .req_id
652            .as_ref()
653            .and_then(|rid| state.pending_requests.remove(rid))
654            .map(|(_, v)| v);
655
656        if let Some((cids, voids, pending_op)) = pending {
657            let batch_errors = resp.extract_batch_errors();
658            let data_array = resp.data.as_array();
659
660            for (idx, error) in batch_errors.iter().enumerate() {
661                if error.code == 0 {
662                    continue;
663                }
664
665                // Extract orderLinkId from the corresponding data entry
666                let cid = data_array
667                    .and_then(|arr| arr.get(idx))
668                    .and_then(extract_order_link_id_from_data)
669                    .or_else(|| cids.get(idx).copied());
670
671                let Some(cid) = cid else {
672                    log::warn!(
673                        "Batch error at index {idx} without correlation: code={}, msg={}",
674                        error.code,
675                        error.msg,
676                    );
677                    continue;
678                };
679
680                let Some(identity) = state.order_identities.get(&cid).map(|r| r.clone()) else {
681                    log::warn!(
682                        "Batch error for untracked order: client_order_id={cid}, msg={}",
683                        error.msg,
684                    );
685                    continue;
686                };
687
688                let stored_void = voids.get(idx).and_then(|v| *v);
689
690                emit_rejection_for_op(
691                    &pending_op,
692                    cid,
693                    &identity,
694                    stored_void,
695                    &error.msg,
696                    emitter,
697                    state,
698                    ts_init,
699                );
700            }
701        }
702        return;
703    }
704
705    // Remove the pending request entry (if any) to get client_order_ids and op
706    let pending = resp
707        .req_id
708        .as_ref()
709        .and_then(|rid| state.pending_requests.remove(rid))
710        .map(|(_, v)| v);
711
712    let effective_op = pending
713        .as_ref()
714        .map(|(_, _, op)| *op)
715        .or_else(|| pending_op_from_str(resp.op.as_str()))
716        .unwrap_or_else(|| {
717            log::warn!("Unknown order operation '{}', defaulting to Place", resp.op);
718            PendingOperation::Place
719        });
720
721    // For batch rejections (ret_code != 0), emit rejections for ALL orders
722    if let Some((cids, voids, _)) = &pending
723        && cids.len() > 1
724    {
725        for (idx, cid) in cids.iter().enumerate() {
726            let Some(identity) = state.order_identities.get(cid).map(|r| r.clone()) else {
727                log::warn!(
728                    "Batch reject for untracked order: client_order_id={cid}, ret_msg={}",
729                    resp.ret_msg,
730                );
731                continue;
732            };
733            let void = voids.get(idx).and_then(|v| *v);
734            emit_rejection_for_op(
735                &effective_op,
736                *cid,
737                &identity,
738                void,
739                &resp.ret_msg,
740                emitter,
741                state,
742                ts_init,
743            );
744        }
745        return;
746    }
747
748    // Single-order rejection path
749    let client_order_id = extract_order_link_id_from_data(&resp.data).or_else(|| {
750        pending
751            .as_ref()
752            .and_then(|(cids, _, _)| cids.first().copied())
753    });
754
755    let stored_venue_order_id = pending
756        .as_ref()
757        .and_then(|(_, voids, _)| voids.first().and_then(|v| *v));
758
759    let Some(client_order_id) = client_order_id else {
760        log::warn!(
761            "Order response error without correlation: op={}, ret_code={}, ret_msg={}, req_id={:?}",
762            resp.op,
763            resp.ret_code,
764            resp.ret_msg,
765            resp.req_id,
766        );
767        return;
768    };
769
770    let Some(identity) = state
771        .order_identities
772        .get(&client_order_id)
773        .map(|r| r.clone())
774    else {
775        log::warn!(
776            "Order response error for untracked order: op={}, client_order_id={client_order_id}, ret_msg={}",
777            resp.op,
778            resp.ret_msg,
779        );
780        return;
781    };
782
783    let venue_order_id = extract_venue_order_id_from_data(&resp.data).or(stored_venue_order_id);
784
785    emit_rejection_for_op(
786        &effective_op,
787        client_order_id,
788        &identity,
789        venue_order_id,
790        &resp.ret_msg,
791        emitter,
792        state,
793        ts_init,
794    );
795}
796
797/// Emits the appropriate rejection event based on the pending operation type.
798#[expect(clippy::too_many_arguments)]
799fn emit_rejection_for_op(
800    pending_op: &PendingOperation,
801    client_order_id: ClientOrderId,
802    identity: &OrderIdentity,
803    venue_order_id: Option<VenueOrderId>,
804    reason: &str,
805    emitter: &ExecutionEventEmitter,
806    state: &WsDispatchState,
807    ts_init: UnixNanos,
808) {
809    match pending_op {
810        PendingOperation::Place => {
811            state.order_identities.remove(&client_order_id);
812            emitter.emit_order_rejected_event(
813                identity.strategy_id,
814                identity.instrument_id,
815                client_order_id,
816                reason,
817                ts_init,
818                false,
819            );
820        }
821        PendingOperation::Cancel => {
822            emitter.emit_order_cancel_rejected_event(
823                identity.strategy_id,
824                identity.instrument_id,
825                client_order_id,
826                venue_order_id,
827                reason,
828                ts_init,
829            );
830        }
831        PendingOperation::Amend => {
832            emitter.emit_order_modify_rejected_event(
833                identity.strategy_id,
834                identity.instrument_id,
835                client_order_id,
836                venue_order_id,
837                reason,
838                ts_init,
839            );
840        }
841    }
842}
843
844/// Maps an operation string to a `PendingOperation`.
845fn pending_op_from_str(op: &str) -> Option<PendingOperation> {
846    match op {
847        BYBIT_OP_ORDER_CREATE => Some(PendingOperation::Place),
848        BYBIT_OP_ORDER_CANCEL => Some(PendingOperation::Cancel),
849        BYBIT_OP_ORDER_AMEND => Some(PendingOperation::Amend),
850        _ => None,
851    }
852}
853
854/// Parses an order snapshot from a WS order message for modification detection.
855fn parse_order_snapshot(
856    order: &BybitWsAccountOrder,
857    instrument: &InstrumentAny,
858) -> Option<OrderStateSnapshot> {
859    let quantity =
860        parse_quantity_with_precision(&order.qty, instrument.size_precision(), "order.qty").ok()?;
861
862    let price = if !order.price.is_empty() && order.price != "0" {
863        parse_price_with_precision(&order.price, instrument.price_precision(), "order.price").ok()
864    } else {
865        None
866    };
867
868    let trigger_price = if !order.trigger_price.is_empty() && order.trigger_price != "0" {
869        parse_price_with_precision(
870            &order.trigger_price,
871            instrument.price_precision(),
872            "order.triggerPrice",
873        )
874        .ok()
875    } else {
876        None
877    };
878
879    Some(OrderStateSnapshot {
880        quantity,
881        price,
882        trigger_price,
883    })
884}
885
886/// Returns whether the incoming snapshot differs from the stored snapshot.
887fn is_snapshot_updated(
888    snapshot: &OrderStateSnapshot,
889    client_order_id: &ClientOrderId,
890    state: &WsDispatchState,
891) -> bool {
892    let Some(previous) = state.order_snapshots.get(client_order_id) else {
893        return false;
894    };
895
896    if let (Some(prev_price), Some(new_price)) = (previous.price, snapshot.price)
897        && prev_price != new_price
898    {
899        return true;
900    }
901
902    if let (Some(prev_trigger), Some(new_trigger)) =
903        (previous.trigger_price, snapshot.trigger_price)
904        && prev_trigger != new_trigger
905    {
906        return true;
907    }
908
909    previous.quantity != snapshot.quantity
910}
911
912/// Synthesizes and emits `OrderAccepted` if one has not yet been emitted for
913/// this order. Handles fast-filling orders that skip the `New` state on Bybit.
914fn ensure_accepted_emitted(
915    client_order_id: ClientOrderId,
916    account_id: AccountId,
917    venue_order_id: VenueOrderId,
918    identity: &OrderIdentity,
919    emitter: &ExecutionEventEmitter,
920    state: &WsDispatchState,
921    ts_init: UnixNanos,
922) {
923    if state.emitted_accepted.contains(&client_order_id) {
924        return;
925    }
926    state.insert_accepted(client_order_id);
927    let accepted = OrderAccepted::new(
928        emitter.trader_id(),
929        identity.strategy_id,
930        identity.instrument_id,
931        client_order_id,
932        venue_order_id,
933        account_id,
934        UUID4::new(),
935        ts_init,
936        ts_init,
937        false,
938    );
939    emitter.send_order_event(OrderEventAny::Accepted(accepted));
940}
941
942/// Removes a terminal order from all tracking sets.
943fn cleanup_terminal(client_order_id: ClientOrderId, state: &WsDispatchState) {
944    state.order_identities.remove(&client_order_id);
945    state.order_snapshots.remove(&client_order_id);
946    state.emitted_accepted.remove(&client_order_id);
947    state.triggered_orders.remove(&client_order_id);
948    state.filled_orders.remove(&client_order_id);
949}
950
951/// Tries to extract `orderLinkId` from the response data Value.
952fn extract_order_link_id_from_data(data: &serde_json::Value) -> Option<ClientOrderId> {
953    data.get("orderLinkId")
954        .and_then(|v| v.as_str())
955        .filter(|s| !s.is_empty())
956        .map(ClientOrderId::new)
957}
958
959/// Tries to extract `orderId` from the response data Value.
960fn extract_venue_order_id_from_data(data: &serde_json::Value) -> Option<VenueOrderId> {
961    data.get("orderId")
962        .and_then(|v| v.as_str())
963        .filter(|s| !s.is_empty())
964        .map(VenueOrderId::new)
965}
966
967#[cfg(test)]
968mod tests {
969    use ahash::AHashMap;
970    use nautilus_common::messages::{ExecutionEvent, execution::ExecutionReport};
971    use nautilus_core::{
972        UnixNanos,
973        time::{AtomicTime, get_atomic_clock_realtime},
974    };
975    use nautilus_live::emitter::ExecutionEventEmitter;
976    use nautilus_model::{
977        enums::{AccountType, OrderSide, OrderType},
978        events::OrderEventAny,
979        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
980        instruments::{Instrument, InstrumentAny},
981    };
982    use rstest::rstest;
983    use ustr::Ustr;
984
985    use super::*;
986    use crate::{
987        common::{parse::parse_linear_instrument, testing::load_test_json},
988        http::models::{BybitFeeRate, BybitInstrumentLinearResponse},
989        websocket::messages::BybitWsMessage,
990    };
991
992    fn sample_fee_rate(
993        symbol: &str,
994        taker: &str,
995        maker: &str,
996        base_coin: Option<&str>,
997    ) -> BybitFeeRate {
998        BybitFeeRate {
999            symbol: Ustr::from(symbol),
1000            taker_fee_rate: taker.to_string(),
1001            maker_fee_rate: maker.to_string(),
1002            base_coin: base_coin.map(Ustr::from),
1003        }
1004    }
1005
1006    fn linear_instrument() -> InstrumentAny {
1007        let json = load_test_json("http_get_instruments_linear.json");
1008        let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
1009        let instrument = &response.result.list[0];
1010        let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
1011        let ts = UnixNanos::new(1_700_000_000_000_000_000);
1012        parse_linear_instrument(instrument, &fee_rate, ts, ts).unwrap()
1013    }
1014
1015    fn build_instruments(instruments: &[InstrumentAny]) -> AHashMap<Ustr, InstrumentAny> {
1016        let mut map = AHashMap::new();
1017        for inst in instruments {
1018            map.insert(inst.id().symbol.inner(), inst.clone());
1019        }
1020        map
1021    }
1022
1023    fn test_account_id() -> AccountId {
1024        AccountId::from("BYBIT-001")
1025    }
1026
1027    fn create_emitter() -> (
1028        ExecutionEventEmitter,
1029        tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1030    ) {
1031        let clock = get_atomic_clock_realtime();
1032        let trader_id = TraderId::from("TESTER-001");
1033        let account_id = test_account_id();
1034        let mut emitter =
1035            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
1036        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1037        emitter.set_sender(tx);
1038        (emitter, rx)
1039    }
1040
1041    fn default_identity() -> OrderIdentity {
1042        OrderIdentity {
1043            instrument_id: InstrumentId::from("BTCUSDT-LINEAR.BYBIT"),
1044            strategy_id: StrategyId::from("S-001"),
1045            order_side: OrderSide::Buy,
1046            order_type: OrderType::Limit,
1047        }
1048    }
1049
1050    #[rstest]
1051    fn test_dispatch_tracked_canceled_order_emits_accepted_then_canceled() {
1052        let instrument = linear_instrument();
1053        let instruments = build_instruments(std::slice::from_ref(&instrument));
1054        let (emitter, mut rx) = create_emitter();
1055        let clock = get_atomic_clock_realtime();
1056        let state = WsDispatchState::default();
1057
1058        // Fixture has orderStatus=Cancelled
1059        let json = load_test_json("ws_account_order.json");
1060        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1061            serde_json::from_str(&json).unwrap();
1062
1063        if let Some(order) = msg.data.first()
1064            && !order.order_link_id.is_empty()
1065        {
1066            let cid = ClientOrderId::new(order.order_link_id.as_str());
1067            state.order_identities.insert(cid, default_identity());
1068        }
1069
1070        let ws_msg = BybitWsMessage::AccountOrder(msg);
1071        dispatch_ws_message(
1072            &ws_msg,
1073            &emitter,
1074            &state,
1075            test_account_id(),
1076            &instruments,
1077            clock,
1078        );
1079
1080        // First: synthesized Accepted
1081        let event1 = rx.try_recv().unwrap();
1082        assert!(
1083            matches!(event1, ExecutionEvent::Order(OrderEventAny::Accepted(ref a)) if a.strategy_id == StrategyId::from("S-001")),
1084            "Expected Accepted, found {event1:?}"
1085        );
1086
1087        // Second: Canceled (from Cancelled status)
1088        let event2 = rx.try_recv().unwrap();
1089        assert!(
1090            matches!(event2, ExecutionEvent::Order(OrderEventAny::Canceled(_))),
1091            "Expected Canceled, found {event2:?}"
1092        );
1093    }
1094
1095    #[rstest]
1096    fn test_dispatch_untracked_order_emits_report() {
1097        let instrument = linear_instrument();
1098        let instruments = build_instruments(std::slice::from_ref(&instrument));
1099        let (emitter, mut rx) = create_emitter();
1100        let clock = get_atomic_clock_realtime();
1101        let state = WsDispatchState::default();
1102
1103        let json = load_test_json("ws_account_order.json");
1104        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1105            serde_json::from_str(&json).unwrap();
1106
1107        // No identity registered → untracked
1108        let ws_msg = BybitWsMessage::AccountOrder(msg);
1109        dispatch_ws_message(
1110            &ws_msg,
1111            &emitter,
1112            &state,
1113            test_account_id(),
1114            &instruments,
1115            clock,
1116        );
1117
1118        let event = rx.try_recv().unwrap();
1119        assert!(matches!(
1120            event,
1121            ExecutionEvent::Report(ExecutionReport::Order(_))
1122        ));
1123    }
1124
1125    #[rstest]
1126    fn test_dispatch_tracked_execution_emits_order_filled() {
1127        let instrument = linear_instrument();
1128        let instruments = build_instruments(std::slice::from_ref(&instrument));
1129        let (emitter, mut rx) = create_emitter();
1130        let clock = get_atomic_clock_realtime();
1131        let state = WsDispatchState::default();
1132
1133        let json = load_test_json("ws_account_execution.json");
1134        let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1135            serde_json::from_str(&json).unwrap();
1136
1137        // Register identity for the execution's orderLinkId
1138        if let Some(exec) = msg.data.first()
1139            && !exec.order_link_id.is_empty()
1140        {
1141            let cid = ClientOrderId::new(exec.order_link_id.as_str());
1142            state.order_identities.insert(cid, default_identity());
1143        }
1144
1145        let ws_msg = BybitWsMessage::AccountExecution(msg);
1146        dispatch_ws_message(
1147            &ws_msg,
1148            &emitter,
1149            &state,
1150            test_account_id(),
1151            &instruments,
1152            clock,
1153        );
1154
1155        // First event should be synthesized Accepted
1156        let event1 = rx.try_recv().unwrap();
1157        assert!(
1158            matches!(event1, ExecutionEvent::Order(OrderEventAny::Accepted(_))),
1159            "Expected Accepted, found {event1:?}"
1160        );
1161
1162        // Second event should be OrderFilled
1163        let event2 = rx.try_recv().unwrap();
1164        match event2 {
1165            ExecutionEvent::Order(OrderEventAny::Filled(filled)) => {
1166                assert_eq!(filled.strategy_id, StrategyId::from("S-001"));
1167                assert_eq!(filled.order_side, OrderSide::Buy);
1168                assert_eq!(filled.order_type, OrderType::Limit);
1169            }
1170            other => panic!("Expected Filled event, found {other:?}"),
1171        }
1172    }
1173
1174    #[rstest]
1175    fn test_dispatch_untracked_execution_emits_fill_report() {
1176        let instrument = linear_instrument();
1177        let instruments = build_instruments(std::slice::from_ref(&instrument));
1178        let (emitter, mut rx) = create_emitter();
1179        let clock = get_atomic_clock_realtime();
1180        let state = WsDispatchState::default();
1181
1182        let json = load_test_json("ws_account_execution.json");
1183        let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1184            serde_json::from_str(&json).unwrap();
1185
1186        // No identity registered → untracked
1187        let ws_msg = BybitWsMessage::AccountExecution(msg);
1188        dispatch_ws_message(
1189            &ws_msg,
1190            &emitter,
1191            &state,
1192            test_account_id(),
1193            &instruments,
1194            clock,
1195        );
1196
1197        let event = rx.try_recv().unwrap();
1198        assert!(matches!(
1199            event,
1200            ExecutionEvent::Report(ExecutionReport::Fill(_))
1201        ));
1202    }
1203
1204    #[rstest]
1205    fn test_dispatch_wallet_emits_account_state() {
1206        let instruments = AHashMap::new();
1207        let (emitter, mut rx) = create_emitter();
1208        let clock = get_atomic_clock_realtime();
1209        let state = WsDispatchState::default();
1210
1211        let json = load_test_json("ws_account_wallet.json");
1212        let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
1213            serde_json::from_str(&json).unwrap();
1214        let ws_msg = BybitWsMessage::AccountWallet(msg);
1215
1216        dispatch_ws_message(
1217            &ws_msg,
1218            &emitter,
1219            &state,
1220            test_account_id(),
1221            &instruments,
1222            clock,
1223        );
1224
1225        let event = rx.try_recv().unwrap();
1226        assert!(matches!(event, ExecutionEvent::Account(_)));
1227    }
1228
1229    #[rstest]
1230    fn test_dispatch_data_message_ignored() {
1231        let instruments = AHashMap::new();
1232        let (emitter, mut rx) = create_emitter();
1233        let clock = get_atomic_clock_realtime();
1234        let state = WsDispatchState::default();
1235
1236        let json = load_test_json("ws_public_trade.json");
1237        let msg: crate::websocket::messages::BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
1238        let ws_msg = BybitWsMessage::Trade(msg);
1239
1240        dispatch_ws_message(
1241            &ws_msg,
1242            &emitter,
1243            &state,
1244            test_account_id(),
1245            &instruments,
1246            clock,
1247        );
1248
1249        assert!(rx.try_recv().is_err());
1250    }
1251
1252    #[rstest]
1253    fn test_accepted_dedup_prevents_duplicate() {
1254        let instrument = linear_instrument();
1255        let instruments = build_instruments(std::slice::from_ref(&instrument));
1256        let (emitter, mut rx) = create_emitter();
1257        let clock = get_atomic_clock_realtime();
1258        let state = WsDispatchState::default();
1259
1260        // Fixture has orderStatus=Cancelled. Patch to New for this dedup test.
1261        let json = load_test_json("ws_account_order.json");
1262        let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1263        value["data"][0]["orderStatus"] = serde_json::Value::String("New".to_string());
1264        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1265            serde_json::from_value(value).unwrap();
1266
1267        if let Some(order) = msg.data.first()
1268            && !order.order_link_id.is_empty()
1269        {
1270            let cid = ClientOrderId::new(order.order_link_id.as_str());
1271            state.order_identities.insert(cid, default_identity());
1272        }
1273
1274        let ws_msg = BybitWsMessage::AccountOrder(msg.clone());
1275        dispatch_ws_message(
1276            &ws_msg,
1277            &emitter,
1278            &state,
1279            test_account_id(),
1280            &instruments,
1281            clock,
1282        );
1283
1284        let event = rx.try_recv().unwrap();
1285        assert!(matches!(
1286            event,
1287            ExecutionEvent::Order(OrderEventAny::Accepted(_))
1288        ));
1289
1290        // Dispatch the same message again: dedup should suppress the duplicate
1291        let ws_msg2 = BybitWsMessage::AccountOrder(msg);
1292        dispatch_ws_message(
1293            &ws_msg2,
1294            &emitter,
1295            &state,
1296            test_account_id(),
1297            &instruments,
1298            clock,
1299        );
1300
1301        assert!(rx.try_recv().is_err());
1302    }
1303
1304    fn new_order_value() -> serde_json::Value {
1305        let json = load_test_json("ws_account_order.json");
1306        let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1307        value["data"][0]["orderStatus"] = serde_json::Value::String("New".to_string());
1308        value
1309    }
1310
1311    struct DispatchTestContext {
1312        instruments: AHashMap<Ustr, InstrumentAny>,
1313        emitter: ExecutionEventEmitter,
1314        rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1315        clock: &'static AtomicTime,
1316        state: WsDispatchState,
1317    }
1318
1319    impl DispatchTestContext {
1320        fn new() -> Self {
1321            let instrument = linear_instrument();
1322            let instruments = build_instruments(std::slice::from_ref(&instrument));
1323            let (emitter, rx) = create_emitter();
1324            let clock = get_atomic_clock_realtime();
1325            let state = WsDispatchState::default();
1326            Self {
1327                instruments,
1328                emitter,
1329                rx,
1330                clock,
1331                state,
1332            }
1333        }
1334
1335        fn accept_order(&mut self, value: &serde_json::Value) {
1336            let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1337                serde_json::from_value(value.clone()).unwrap();
1338
1339            if let Some(order) = msg.data.first()
1340                && !order.order_link_id.is_empty()
1341                && !self
1342                    .state
1343                    .order_identities
1344                    .contains_key(&ClientOrderId::new(order.order_link_id.as_str()))
1345            {
1346                let cid = ClientOrderId::new(order.order_link_id.as_str());
1347                self.state.order_identities.insert(cid, default_identity());
1348            }
1349
1350            self.dispatch_value(value);
1351
1352            let event = self.rx.try_recv().unwrap();
1353            assert!(
1354                matches!(event, ExecutionEvent::Order(OrderEventAny::Accepted(_))),
1355                "Expected Accepted, found {event:?}"
1356            );
1357        }
1358
1359        fn dispatch_value(&self, value: &serde_json::Value) {
1360            let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1361                serde_json::from_value(value.clone()).unwrap();
1362            let ws_msg = BybitWsMessage::AccountOrder(msg);
1363            dispatch_ws_message(
1364                &ws_msg,
1365                &self.emitter,
1366                &self.state,
1367                test_account_id(),
1368                &self.instruments,
1369                self.clock,
1370            );
1371        }
1372
1373        fn recv_updated(&mut self) -> OrderUpdated {
1374            let event = self.rx.try_recv().unwrap();
1375            match event {
1376                ExecutionEvent::Order(OrderEventAny::Updated(updated)) => updated,
1377                other => panic!("Expected Updated event, found {other:?}"),
1378            }
1379        }
1380    }
1381
1382    #[rstest]
1383    fn test_dispatch_order_updated_on_price_change() {
1384        let mut ctx = DispatchTestContext::new();
1385        let value = new_order_value();
1386        ctx.accept_order(&value);
1387
1388        let mut amended = value;
1389        amended["data"][0]["price"] = serde_json::Value::String("31000".to_string());
1390        ctx.dispatch_value(&amended);
1391
1392        let updated = ctx.recv_updated();
1393        assert_eq!(updated.client_order_id, ClientOrderId::from("client-1"));
1394        assert_eq!(updated.price, Some(Price::from("31000.00")));
1395        assert_eq!(updated.quantity, Quantity::from("0.010"));
1396        assert_eq!(updated.trigger_price, None);
1397        assert!(updated.venue_order_id.is_some());
1398    }
1399
1400    #[rstest]
1401    fn test_dispatch_order_updated_on_quantity_change() {
1402        let mut ctx = DispatchTestContext::new();
1403        let value = new_order_value();
1404        ctx.accept_order(&value);
1405
1406        let mut amended = value;
1407        amended["data"][0]["qty"] = serde_json::Value::String("0.020".to_string());
1408        ctx.dispatch_value(&amended);
1409
1410        let updated = ctx.recv_updated();
1411        assert_eq!(updated.quantity, Quantity::from("0.020"));
1412        assert_eq!(updated.price, Some(Price::from("30000.00")));
1413    }
1414
1415    #[rstest]
1416    fn test_dispatch_order_updated_on_trigger_price_change() {
1417        let mut ctx = DispatchTestContext::new();
1418        let mut value = new_order_value();
1419        value["data"][0]["triggerPrice"] = serde_json::Value::String("29000".to_string());
1420        ctx.accept_order(&value);
1421
1422        let mut amended = value;
1423        amended["data"][0]["triggerPrice"] = serde_json::Value::String("28000".to_string());
1424        ctx.dispatch_value(&amended);
1425
1426        let updated = ctx.recv_updated();
1427        assert_eq!(updated.trigger_price, Some(Price::from("28000.00")));
1428        assert_eq!(updated.price, Some(Price::from("30000.00")));
1429    }
1430
1431    #[rstest]
1432    fn test_dispatch_dedup_suppresses_identical_after_snapshot() {
1433        let mut ctx = DispatchTestContext::new();
1434        let value = new_order_value();
1435        ctx.accept_order(&value);
1436
1437        ctx.dispatch_value(&value);
1438
1439        assert!(
1440            ctx.rx.try_recv().is_err(),
1441            "Expected no event for identical redelivery"
1442        );
1443    }
1444
1445    #[rstest]
1446    fn test_dispatch_order_updated_stores_snapshot_for_subsequent_change() {
1447        let mut ctx = DispatchTestContext::new();
1448        let value = new_order_value();
1449        ctx.accept_order(&value);
1450
1451        let mut amended1 = value.clone();
1452        amended1["data"][0]["price"] = serde_json::Value::String("31000".to_string());
1453        ctx.dispatch_value(&amended1);
1454        let _ = ctx.recv_updated();
1455
1456        let mut amended2 = value;
1457        amended2["data"][0]["price"] = serde_json::Value::String("32000".to_string());
1458        ctx.dispatch_value(&amended2);
1459
1460        let updated = ctx.recv_updated();
1461        assert_eq!(updated.price, Some(Price::from("32000.00")));
1462    }
1463
1464    #[rstest]
1465    #[case::price_changed(
1466        Some(Price::from("100.00")),
1467        None,
1468        Quantity::from("1.000"),
1469        Some(Price::from("200.00")),
1470        None,
1471        Quantity::from("1.000"),
1472        true
1473    )]
1474    #[case::trigger_changed(
1475        None,
1476        Some(Price::from("100.00")),
1477        Quantity::from("1.000"),
1478        None,
1479        Some(Price::from("90.00")),
1480        Quantity::from("1.000"),
1481        true
1482    )]
1483    #[case::qty_changed(
1484        Some(Price::from("100.00")),
1485        None,
1486        Quantity::from("1.000"),
1487        Some(Price::from("100.00")),
1488        None,
1489        Quantity::from("2.000"),
1490        true
1491    )]
1492    #[case::no_change(
1493        Some(Price::from("100.00")),
1494        None,
1495        Quantity::from("1.000"),
1496        Some(Price::from("100.00")),
1497        None,
1498        Quantity::from("1.000"),
1499        false
1500    )]
1501    fn test_is_snapshot_updated(
1502        #[case] prev_price: Option<Price>,
1503        #[case] prev_trigger: Option<Price>,
1504        #[case] prev_qty: Quantity,
1505        #[case] new_price: Option<Price>,
1506        #[case] new_trigger: Option<Price>,
1507        #[case] new_qty: Quantity,
1508        #[case] expected: bool,
1509    ) {
1510        let state = WsDispatchState::default();
1511        let cid = ClientOrderId::from("test-1");
1512        state.order_snapshots.insert(
1513            cid,
1514            OrderStateSnapshot {
1515                quantity: prev_qty,
1516                price: prev_price,
1517                trigger_price: prev_trigger,
1518            },
1519        );
1520
1521        let new_snapshot = OrderStateSnapshot {
1522            quantity: new_qty,
1523            price: new_price,
1524            trigger_price: new_trigger,
1525        };
1526        assert_eq!(is_snapshot_updated(&new_snapshot, &cid, &state), expected);
1527    }
1528
1529    #[rstest]
1530    fn test_is_snapshot_updated_no_previous() {
1531        let state = WsDispatchState::default();
1532        let cid = ClientOrderId::from("test-1");
1533
1534        let new_snapshot = OrderStateSnapshot {
1535            quantity: Quantity::from("1.000"),
1536            price: Some(Price::from("100.00")),
1537            trigger_price: None,
1538        };
1539        assert!(!is_snapshot_updated(&new_snapshot, &cid, &state));
1540    }
1541
1542    #[rstest]
1543    #[case::limit_order("30000", "0", Some(Price::from("30000.00")), None)]
1544    #[case::conditional("0", "29000", None, Some(Price::from("29000.00")))]
1545    #[case::both(
1546        "30000",
1547        "29000",
1548        Some(Price::from("30000.00")),
1549        Some(Price::from("29000.00"))
1550    )]
1551    fn test_parse_order_snapshot(
1552        #[case] price: &str,
1553        #[case] trigger: &str,
1554        #[case] expected_price: Option<Price>,
1555        #[case] expected_trigger: Option<Price>,
1556    ) {
1557        let instrument = linear_instrument();
1558        let json = load_test_json("ws_account_order.json");
1559        let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1560        value["data"][0]["price"] = serde_json::Value::String(price.to_string());
1561        value["data"][0]["triggerPrice"] = serde_json::Value::String(trigger.to_string());
1562        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1563            serde_json::from_value(value).unwrap();
1564
1565        let snapshot = parse_order_snapshot(&msg.data[0], &instrument).unwrap();
1566        assert_eq!(snapshot.price, expected_price);
1567        assert_eq!(snapshot.trigger_price, expected_trigger);
1568        assert_eq!(snapshot.quantity, Quantity::from("0.010"));
1569    }
1570
1571    #[rstest]
1572    fn test_parse_order_snapshot_invalid_qty_returns_none() {
1573        let instrument = linear_instrument();
1574        let json = load_test_json("ws_account_order.json");
1575        let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1576        value["data"][0]["qty"] = serde_json::Value::String(String::new());
1577        let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1578            serde_json::from_value(value).unwrap();
1579
1580        assert!(parse_order_snapshot(&msg.data[0], &instrument).is_none());
1581    }
1582
1583    #[rstest]
1584    fn test_dispatch_order_updated_on_partially_filled_price_change() {
1585        let mut ctx = DispatchTestContext::new();
1586        let value = new_order_value();
1587        ctx.accept_order(&value);
1588
1589        let mut amended = value;
1590        amended["data"][0]["orderStatus"] =
1591            serde_json::Value::String("PartiallyFilled".to_string());
1592        amended["data"][0]["cumExecQty"] = serde_json::Value::String("0.005".to_string());
1593        amended["data"][0]["price"] = serde_json::Value::String("31000".to_string());
1594        ctx.dispatch_value(&amended);
1595
1596        let updated = ctx.recv_updated();
1597        assert_eq!(updated.client_order_id, ClientOrderId::from("client-1"));
1598        assert_eq!(updated.price, Some(Price::from("31000.00")));
1599    }
1600}