1use std::sync::atomic::{AtomicBool, Ordering};
24
25use ahash::AHashMap;
26use anyhow::Context;
27use dashmap::{DashMap, DashSet};
28use nautilus_core::{UUID4, UnixNanos, time::AtomicTime};
29use nautilus_live::ExecutionEventEmitter;
30use nautilus_model::{
31 enums::{LiquiditySide, OrderSide, OrderType},
32 events::{
33 OrderAccepted, OrderCanceled, OrderEventAny, OrderFilled, OrderTriggered, OrderUpdated,
34 },
35 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, VenueOrderId},
36 instruments::{Instrument, InstrumentAny},
37 orders::TRIGGERABLE_ORDER_TYPES,
38 types::{Money, Price, Quantity},
39};
40use rust_decimal::Decimal;
41use ustr::Ustr;
42
43use super::{
44 messages::{BybitWsAccountExecution, BybitWsAccountOrder, BybitWsMessage},
45 parse::{parse_millis_i64, parse_ws_account_state, parse_ws_position_status_report},
46};
47use crate::common::{
48 enums::BybitOrderStatus,
49 parse::{
50 make_bybit_symbol, parse_millis_timestamp, parse_price_with_precision,
51 parse_quantity_with_precision,
52 },
53};
54
55const DEDUP_CAPACITY: usize = 10_000;
56
57const BYBIT_OP_ORDER_CREATE: &str = "order.create";
58const BYBIT_OP_ORDER_AMEND: &str = "order.amend";
59const BYBIT_OP_ORDER_CANCEL: &str = "order.cancel";
60
61#[derive(Debug, Clone)]
64pub struct OrderIdentity {
65 pub instrument_id: InstrumentId,
66 pub strategy_id: StrategyId,
67 pub order_side: OrderSide,
68 pub order_type: OrderType,
69}
70
71#[derive(Debug, Clone, Copy)]
73pub enum PendingOperation {
74 Place,
75 Cancel,
76 Amend,
77}
78
79pub type PendingRequestData = (
82 Vec<ClientOrderId>,
83 Vec<Option<VenueOrderId>>,
84 PendingOperation,
85);
86
87#[derive(Debug, Clone)]
91pub struct OrderStateSnapshot {
92 pub quantity: Quantity,
93 pub price: Option<Price>,
94 pub trigger_price: Option<Price>,
95}
96
97#[derive(Debug)]
98pub struct WsDispatchState {
99 pub order_identities: DashMap<ClientOrderId, OrderIdentity>,
100 pub pending_requests: DashMap<String, PendingRequestData>,
101 pub order_snapshots: DashMap<ClientOrderId, OrderStateSnapshot>,
102 pub emitted_accepted: DashSet<ClientOrderId>,
103 pub triggered_orders: DashSet<ClientOrderId>,
104 pub filled_orders: DashSet<ClientOrderId>,
105 clearing: AtomicBool,
106}
107
108impl Default for WsDispatchState {
109 fn default() -> Self {
110 Self {
111 order_identities: DashMap::new(),
112 pending_requests: DashMap::new(),
113 order_snapshots: DashMap::new(),
114 emitted_accepted: DashSet::default(),
115 triggered_orders: DashSet::default(),
116 filled_orders: DashSet::default(),
117 clearing: AtomicBool::new(false),
118 }
119 }
120}
121
122impl WsDispatchState {
123 fn evict_if_full(&self, set: &DashSet<ClientOrderId>) {
124 if set.len() >= DEDUP_CAPACITY
125 && self
126 .clearing
127 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
128 .is_ok()
129 {
130 set.clear();
131 self.clearing.store(false, Ordering::Release);
132 }
133 }
134
135 fn insert_accepted(&self, cid: ClientOrderId) {
136 self.evict_if_full(&self.emitted_accepted);
137 self.emitted_accepted.insert(cid);
138 }
139
140 fn insert_filled(&self, cid: ClientOrderId) {
141 self.evict_if_full(&self.filled_orders);
142 self.filled_orders.insert(cid);
143 }
144
145 fn insert_triggered(&self, cid: ClientOrderId) {
146 self.evict_if_full(&self.triggered_orders);
147 self.triggered_orders.insert(cid);
148 }
149}
150
151pub fn dispatch_ws_message(
158 message: &BybitWsMessage,
159 emitter: &ExecutionEventEmitter,
160 state: &WsDispatchState,
161 account_id: AccountId,
162 instruments: &AHashMap<Ustr, InstrumentAny>,
163 clock: &AtomicTime,
164) {
165 match message {
166 BybitWsMessage::AccountOrder(msg) => {
167 let ts_init = clock.get_time_ns();
168
169 for order in &msg.data {
170 let symbol = make_bybit_symbol(order.symbol, order.category);
171 let Some(instrument) = instruments.get(&symbol) else {
172 log::warn!("No instrument for order update: {symbol}");
173 continue;
174 };
175 dispatch_order_update(order, instrument, emitter, state, account_id, ts_init);
176 }
177 }
178 BybitWsMessage::AccountExecution(msg) => {
179 let ts_init = clock.get_time_ns();
180
181 for exec in &msg.data {
182 let symbol = make_bybit_symbol(exec.symbol, exec.category);
183 let Some(instrument) = instruments.get(&symbol) else {
184 log::warn!("No instrument for execution update: {symbol}");
185 continue;
186 };
187 dispatch_execution_fill(exec, instrument, emitter, state, account_id, ts_init);
188 }
189 }
190 BybitWsMessage::AccountWallet(msg) => {
191 let ts_init = clock.get_time_ns();
192 let ts_event = parse_millis_i64(msg.creation_time, "wallet.creation_time")
193 .unwrap_or_else(|e| {
194 log::warn!("Failed to parse wallet creation_time, using ts_init: {e}");
195 ts_init
196 });
197
198 for wallet in &msg.data {
199 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
200 Ok(state) => emitter.send_account_state(state),
201 Err(e) => log::error!("Failed to parse account state: {e}"),
202 }
203 }
204 }
205 BybitWsMessage::AccountPosition(msg) => {
206 let ts_init = clock.get_time_ns();
207
208 for position in &msg.data {
209 let symbol = make_bybit_symbol(position.symbol, position.category);
210 let Some(instrument) = instruments.get(&symbol) else {
211 log::warn!("No instrument for position update: {symbol}");
212 continue;
213 };
214
215 match parse_ws_position_status_report(position, account_id, instrument, ts_init) {
216 Ok(report) => emitter.send_position_report(report),
217 Err(e) => log::error!("Failed to parse position status report: {e}"),
218 }
219 }
220 }
221 BybitWsMessage::OrderResponse(resp) => {
222 let ts_init = clock.get_time_ns();
223 dispatch_order_response(resp, emitter, state, ts_init);
224 }
225 BybitWsMessage::Error(e) => {
226 log::warn!("WebSocket error: code={} message={}", e.code, e.message);
227 }
228 BybitWsMessage::Reconnected => {
229 log::info!("WebSocket reconnected");
230 }
231 BybitWsMessage::Auth(_)
232 | BybitWsMessage::Orderbook(_)
233 | BybitWsMessage::Trade(_)
234 | BybitWsMessage::Kline(_)
235 | BybitWsMessage::TickerLinear(_)
236 | BybitWsMessage::TickerOption(_) => {}
237 }
238}
239
240fn dispatch_order_update(
246 order: &BybitWsAccountOrder,
247 instrument: &InstrumentAny,
248 emitter: &ExecutionEventEmitter,
249 state: &WsDispatchState,
250 account_id: AccountId,
251 ts_init: UnixNanos,
252) {
253 let client_order_id = if order.order_link_id.is_empty() {
254 None
255 } else {
256 Some(ClientOrderId::new(order.order_link_id.as_str()))
257 };
258
259 let identity = client_order_id
260 .as_ref()
261 .and_then(|cid| state.order_identities.get(cid).map(|r| r.clone()));
262
263 if let (Some(client_order_id), Some(identity)) = (client_order_id, identity) {
264 let venue_order_id = VenueOrderId::new(order.order_id.as_str());
265
266 match order.order_status {
267 BybitOrderStatus::Created | BybitOrderStatus::New | BybitOrderStatus::Untriggered => {
268 let snapshot = parse_order_snapshot(order, instrument);
269
270 if state.emitted_accepted.contains(&client_order_id)
271 || state.filled_orders.contains(&client_order_id)
272 || state.triggered_orders.contains(&client_order_id)
273 {
274 if let Some(snapshot) = snapshot
275 && is_snapshot_updated(&snapshot, &client_order_id, state)
276 {
277 let updated = OrderUpdated::new(
278 emitter.trader_id(),
279 identity.strategy_id,
280 identity.instrument_id,
281 client_order_id,
282 snapshot.quantity,
283 UUID4::new(),
284 ts_init,
285 ts_init,
286 false,
287 Some(venue_order_id),
288 Some(account_id),
289 snapshot.price,
290 snapshot.trigger_price,
291 None,
292 false,
293 );
294 state.order_snapshots.insert(client_order_id, snapshot);
295 emitter.send_order_event(OrderEventAny::Updated(updated));
296 return;
297 }
298 log::debug!("Skipping duplicate Accepted for {client_order_id}");
299 return;
300 }
301
302 state.insert_accepted(client_order_id);
303
304 if let Some(snapshot) = snapshot {
305 state.order_snapshots.insert(client_order_id, snapshot);
306 }
307
308 let accepted = OrderAccepted::new(
309 emitter.trader_id(),
310 identity.strategy_id,
311 identity.instrument_id,
312 client_order_id,
313 venue_order_id,
314 account_id,
315 UUID4::new(),
316 ts_init,
317 ts_init,
318 false,
319 );
320 emitter.send_order_event(OrderEventAny::Accepted(accepted));
321 }
322 BybitOrderStatus::Triggered => {
323 if state.filled_orders.contains(&client_order_id) {
324 log::debug!("Skipping stale Triggered for {client_order_id} (already filled)");
325 return;
326 }
327
328 if !TRIGGERABLE_ORDER_TYPES.contains(&identity.order_type) {
329 log::debug!(
330 "Skipping OrderTriggered for {} order {client_order_id}: market-style stops have no TRIGGERED state",
331 identity.order_type,
332 );
333 return;
334 }
335
336 ensure_accepted_emitted(
337 client_order_id,
338 account_id,
339 venue_order_id,
340 &identity,
341 emitter,
342 state,
343 ts_init,
344 );
345 state.insert_triggered(client_order_id);
346 let triggered = OrderTriggered::new(
347 emitter.trader_id(),
348 identity.strategy_id,
349 identity.instrument_id,
350 client_order_id,
351 UUID4::new(),
352 ts_init,
353 ts_init,
354 false,
355 Some(venue_order_id),
356 Some(account_id),
357 );
358 emitter.send_order_event(OrderEventAny::Triggered(triggered));
359 }
360 BybitOrderStatus::Rejected => {
361 let filled_qty = parse_quantity_with_precision(
362 &order.cum_exec_qty,
363 instrument.size_precision(),
364 "order.cumExecQty",
365 )
366 .unwrap_or_default();
367
368 if filled_qty.is_positive() {
369 ensure_accepted_emitted(
371 client_order_id,
372 account_id,
373 venue_order_id,
374 &identity,
375 emitter,
376 state,
377 ts_init,
378 );
379 let canceled = OrderCanceled::new(
380 emitter.trader_id(),
381 identity.strategy_id,
382 identity.instrument_id,
383 client_order_id,
384 UUID4::new(),
385 ts_init,
386 ts_init,
387 false,
388 Some(venue_order_id),
389 Some(account_id),
390 );
391 cleanup_terminal(client_order_id, state);
392 emitter.send_order_event(OrderEventAny::Canceled(canceled));
393 } else {
394 let reason = if order.reject_reason.is_empty() {
395 Ustr::from("Order rejected by venue")
396 } else {
397 order.reject_reason
398 };
399 state.order_identities.remove(&client_order_id);
400 state.order_snapshots.remove(&client_order_id);
401 emitter.emit_order_rejected_event(
402 identity.strategy_id,
403 identity.instrument_id,
404 client_order_id,
405 reason.as_str(),
406 ts_init,
407 false,
408 );
409 }
410 }
411 BybitOrderStatus::PartiallyFilled => {
412 ensure_accepted_emitted(
415 client_order_id,
416 account_id,
417 venue_order_id,
418 &identity,
419 emitter,
420 state,
421 ts_init,
422 );
423
424 if let Some(snapshot) = parse_order_snapshot(order, instrument)
428 && is_snapshot_updated(&snapshot, &client_order_id, state)
429 {
430 let updated = OrderUpdated::new(
431 emitter.trader_id(),
432 identity.strategy_id,
433 identity.instrument_id,
434 client_order_id,
435 snapshot.quantity,
436 UUID4::new(),
437 ts_init,
438 ts_init,
439 false,
440 Some(venue_order_id),
441 Some(account_id),
442 snapshot.price,
443 snapshot.trigger_price,
444 None,
445 false,
446 );
447 state.order_snapshots.insert(client_order_id, snapshot);
448 emitter.send_order_event(OrderEventAny::Updated(updated));
449 }
450 }
451 BybitOrderStatus::Filled => {
452 ensure_accepted_emitted(
455 client_order_id,
456 account_id,
457 venue_order_id,
458 &identity,
459 emitter,
460 state,
461 ts_init,
462 );
463 }
467 BybitOrderStatus::Canceled
468 | BybitOrderStatus::PartiallyFilledCanceled
469 | BybitOrderStatus::Deactivated => {
470 ensure_accepted_emitted(
471 client_order_id,
472 account_id,
473 venue_order_id,
474 &identity,
475 emitter,
476 state,
477 ts_init,
478 );
479 let canceled = OrderCanceled::new(
480 emitter.trader_id(),
481 identity.strategy_id,
482 identity.instrument_id,
483 client_order_id,
484 UUID4::new(),
485 ts_init,
486 ts_init,
487 false,
488 Some(venue_order_id),
489 Some(account_id),
490 );
491 cleanup_terminal(client_order_id, state);
492 emitter.send_order_event(OrderEventAny::Canceled(canceled));
493 }
494 }
495 } else {
496 match super::parse::parse_ws_order_status_report(order, instrument, account_id, ts_init) {
498 Ok(report) => emitter.send_order_status_report(report),
499 Err(e) => log::error!("Failed to parse order status report: {e}"),
500 }
501 }
502}
503
504fn dispatch_execution_fill(
509 exec: &BybitWsAccountExecution,
510 instrument: &InstrumentAny,
511 emitter: &ExecutionEventEmitter,
512 state: &WsDispatchState,
513 account_id: AccountId,
514 ts_init: UnixNanos,
515) {
516 if exec.exec_type.is_exchange_generated() {
517 log::warn!(
518 "Exchange-generated execution: exec_type={:?}, symbol={}, order_id={}, order_link_id={}, side={:?}, qty={}, price={}",
519 exec.exec_type,
520 exec.symbol,
521 exec.order_id,
522 exec.order_link_id,
523 exec.side,
524 exec.exec_qty,
525 exec.exec_price,
526 );
527 }
528
529 let client_order_id = if exec.order_link_id.is_empty() {
530 None
531 } else {
532 Some(ClientOrderId::new(exec.order_link_id.as_str()))
533 };
534
535 let identity = client_order_id
536 .as_ref()
537 .and_then(|cid| state.order_identities.get(cid).map(|r| r.clone()));
538
539 if let (Some(client_order_id), Some(identity)) = (client_order_id, identity) {
540 let venue_order_id = VenueOrderId::new(exec.order_id.as_str());
541
542 ensure_accepted_emitted(
543 client_order_id,
544 account_id,
545 venue_order_id,
546 &identity,
547 emitter,
548 state,
549 ts_init,
550 );
551
552 match parse_order_filled(exec, instrument, &identity, emitter, account_id, ts_init) {
553 Ok(filled) => {
554 state.insert_filled(client_order_id);
555 state.triggered_orders.remove(&client_order_id);
556 emitter.send_order_event(OrderEventAny::Filled(filled));
557
558 if exec.leaves_qty == "0" {
559 cleanup_terminal(client_order_id, state);
560 }
561 }
562 Err(e) => log::error!("Failed to parse OrderFilled for {client_order_id}: {e}"),
563 }
564 } else {
565 match super::parse::parse_ws_fill_report(exec, account_id, instrument, ts_init) {
567 Ok(report) => emitter.send_fill_report(report),
568 Err(e) => log::error!("Failed to parse fill report: {e}"),
569 }
570 }
571}
572
573fn parse_order_filled(
575 exec: &BybitWsAccountExecution,
576 instrument: &InstrumentAny,
577 identity: &OrderIdentity,
578 emitter: &ExecutionEventEmitter,
579 account_id: AccountId,
580 ts_init: UnixNanos,
581) -> anyhow::Result<OrderFilled> {
582 let client_order_id = ClientOrderId::new(exec.order_link_id.as_str());
583 let venue_order_id = VenueOrderId::new(exec.order_id.as_str());
584 let trade_id =
585 TradeId::new_checked(exec.exec_id.as_str()).context("invalid execId in Bybit execution")?;
586
587 let last_qty = parse_quantity_with_precision(
588 &exec.exec_qty,
589 instrument.size_precision(),
590 "execution.execQty",
591 )?;
592 let last_px = parse_price_with_precision(
593 &exec.exec_price,
594 instrument.price_precision(),
595 "execution.execPrice",
596 )?;
597
598 let liquidity_side = if exec.is_maker {
599 LiquiditySide::Maker
600 } else {
601 LiquiditySide::Taker
602 };
603
604 let fee_decimal: Decimal = exec
605 .exec_fee
606 .parse()
607 .with_context(|| format!("failed to parse execFee='{}'", exec.exec_fee))?;
608 let commission_currency = instrument.quote_currency();
609 let commission = Money::from_decimal(fee_decimal, commission_currency).with_context(|| {
610 format!(
611 "failed to create commission from execFee='{}'",
612 exec.exec_fee
613 )
614 })?;
615
616 let ts_event = parse_millis_timestamp(&exec.exec_time, "execution.execTime")?;
617
618 Ok(OrderFilled::new(
619 emitter.trader_id(),
620 identity.strategy_id,
621 identity.instrument_id,
622 client_order_id,
623 venue_order_id,
624 account_id,
625 trade_id,
626 identity.order_side,
627 identity.order_type,
628 last_qty,
629 last_px,
630 commission_currency,
631 liquidity_side,
632 UUID4::new(),
633 ts_event,
634 ts_init,
635 false,
636 None, Some(commission),
638 ))
639}
640
641fn dispatch_order_response(
643 resp: &super::messages::BybitWsOrderResponse,
644 emitter: &ExecutionEventEmitter,
645 state: &WsDispatchState,
646 ts_init: UnixNanos,
647) {
648 if resp.ret_code == 0 {
649 let pending = resp
651 .req_id
652 .as_ref()
653 .and_then(|rid| state.pending_requests.remove(rid))
654 .map(|(_, v)| v);
655
656 if let Some((cids, voids, pending_op)) = pending {
657 let batch_errors = resp.extract_batch_errors();
658 let data_array = resp.data.as_array();
659
660 for (idx, error) in batch_errors.iter().enumerate() {
661 if error.code == 0 {
662 continue;
663 }
664
665 let cid = data_array
667 .and_then(|arr| arr.get(idx))
668 .and_then(extract_order_link_id_from_data)
669 .or_else(|| cids.get(idx).copied());
670
671 let Some(cid) = cid else {
672 log::warn!(
673 "Batch error at index {idx} without correlation: code={}, msg={}",
674 error.code,
675 error.msg,
676 );
677 continue;
678 };
679
680 let Some(identity) = state.order_identities.get(&cid).map(|r| r.clone()) else {
681 log::warn!(
682 "Batch error for untracked order: client_order_id={cid}, msg={}",
683 error.msg,
684 );
685 continue;
686 };
687
688 let stored_void = voids.get(idx).and_then(|v| *v);
689
690 emit_rejection_for_op(
691 &pending_op,
692 cid,
693 &identity,
694 stored_void,
695 &error.msg,
696 emitter,
697 state,
698 ts_init,
699 );
700 }
701 }
702 return;
703 }
704
705 let pending = resp
707 .req_id
708 .as_ref()
709 .and_then(|rid| state.pending_requests.remove(rid))
710 .map(|(_, v)| v);
711
712 let effective_op = pending
713 .as_ref()
714 .map(|(_, _, op)| *op)
715 .or_else(|| pending_op_from_str(resp.op.as_str()))
716 .unwrap_or_else(|| {
717 log::warn!("Unknown order operation '{}', defaulting to Place", resp.op);
718 PendingOperation::Place
719 });
720
721 if let Some((cids, voids, _)) = &pending
723 && cids.len() > 1
724 {
725 for (idx, cid) in cids.iter().enumerate() {
726 let Some(identity) = state.order_identities.get(cid).map(|r| r.clone()) else {
727 log::warn!(
728 "Batch reject for untracked order: client_order_id={cid}, ret_msg={}",
729 resp.ret_msg,
730 );
731 continue;
732 };
733 let void = voids.get(idx).and_then(|v| *v);
734 emit_rejection_for_op(
735 &effective_op,
736 *cid,
737 &identity,
738 void,
739 &resp.ret_msg,
740 emitter,
741 state,
742 ts_init,
743 );
744 }
745 return;
746 }
747
748 let client_order_id = extract_order_link_id_from_data(&resp.data).or_else(|| {
750 pending
751 .as_ref()
752 .and_then(|(cids, _, _)| cids.first().copied())
753 });
754
755 let stored_venue_order_id = pending
756 .as_ref()
757 .and_then(|(_, voids, _)| voids.first().and_then(|v| *v));
758
759 let Some(client_order_id) = client_order_id else {
760 log::warn!(
761 "Order response error without correlation: op={}, ret_code={}, ret_msg={}, req_id={:?}",
762 resp.op,
763 resp.ret_code,
764 resp.ret_msg,
765 resp.req_id,
766 );
767 return;
768 };
769
770 let Some(identity) = state
771 .order_identities
772 .get(&client_order_id)
773 .map(|r| r.clone())
774 else {
775 log::warn!(
776 "Order response error for untracked order: op={}, client_order_id={client_order_id}, ret_msg={}",
777 resp.op,
778 resp.ret_msg,
779 );
780 return;
781 };
782
783 let venue_order_id = extract_venue_order_id_from_data(&resp.data).or(stored_venue_order_id);
784
785 emit_rejection_for_op(
786 &effective_op,
787 client_order_id,
788 &identity,
789 venue_order_id,
790 &resp.ret_msg,
791 emitter,
792 state,
793 ts_init,
794 );
795}
796
797#[expect(clippy::too_many_arguments)]
799fn emit_rejection_for_op(
800 pending_op: &PendingOperation,
801 client_order_id: ClientOrderId,
802 identity: &OrderIdentity,
803 venue_order_id: Option<VenueOrderId>,
804 reason: &str,
805 emitter: &ExecutionEventEmitter,
806 state: &WsDispatchState,
807 ts_init: UnixNanos,
808) {
809 match pending_op {
810 PendingOperation::Place => {
811 state.order_identities.remove(&client_order_id);
812 emitter.emit_order_rejected_event(
813 identity.strategy_id,
814 identity.instrument_id,
815 client_order_id,
816 reason,
817 ts_init,
818 false,
819 );
820 }
821 PendingOperation::Cancel => {
822 emitter.emit_order_cancel_rejected_event(
823 identity.strategy_id,
824 identity.instrument_id,
825 client_order_id,
826 venue_order_id,
827 reason,
828 ts_init,
829 );
830 }
831 PendingOperation::Amend => {
832 emitter.emit_order_modify_rejected_event(
833 identity.strategy_id,
834 identity.instrument_id,
835 client_order_id,
836 venue_order_id,
837 reason,
838 ts_init,
839 );
840 }
841 }
842}
843
844fn pending_op_from_str(op: &str) -> Option<PendingOperation> {
846 match op {
847 BYBIT_OP_ORDER_CREATE => Some(PendingOperation::Place),
848 BYBIT_OP_ORDER_CANCEL => Some(PendingOperation::Cancel),
849 BYBIT_OP_ORDER_AMEND => Some(PendingOperation::Amend),
850 _ => None,
851 }
852}
853
854fn parse_order_snapshot(
856 order: &BybitWsAccountOrder,
857 instrument: &InstrumentAny,
858) -> Option<OrderStateSnapshot> {
859 let quantity =
860 parse_quantity_with_precision(&order.qty, instrument.size_precision(), "order.qty").ok()?;
861
862 let price = if !order.price.is_empty() && order.price != "0" {
863 parse_price_with_precision(&order.price, instrument.price_precision(), "order.price").ok()
864 } else {
865 None
866 };
867
868 let trigger_price = if !order.trigger_price.is_empty() && order.trigger_price != "0" {
869 parse_price_with_precision(
870 &order.trigger_price,
871 instrument.price_precision(),
872 "order.triggerPrice",
873 )
874 .ok()
875 } else {
876 None
877 };
878
879 Some(OrderStateSnapshot {
880 quantity,
881 price,
882 trigger_price,
883 })
884}
885
886fn is_snapshot_updated(
888 snapshot: &OrderStateSnapshot,
889 client_order_id: &ClientOrderId,
890 state: &WsDispatchState,
891) -> bool {
892 let Some(previous) = state.order_snapshots.get(client_order_id) else {
893 return false;
894 };
895
896 if let (Some(prev_price), Some(new_price)) = (previous.price, snapshot.price)
897 && prev_price != new_price
898 {
899 return true;
900 }
901
902 if let (Some(prev_trigger), Some(new_trigger)) =
903 (previous.trigger_price, snapshot.trigger_price)
904 && prev_trigger != new_trigger
905 {
906 return true;
907 }
908
909 previous.quantity != snapshot.quantity
910}
911
912fn ensure_accepted_emitted(
915 client_order_id: ClientOrderId,
916 account_id: AccountId,
917 venue_order_id: VenueOrderId,
918 identity: &OrderIdentity,
919 emitter: &ExecutionEventEmitter,
920 state: &WsDispatchState,
921 ts_init: UnixNanos,
922) {
923 if state.emitted_accepted.contains(&client_order_id) {
924 return;
925 }
926 state.insert_accepted(client_order_id);
927 let accepted = OrderAccepted::new(
928 emitter.trader_id(),
929 identity.strategy_id,
930 identity.instrument_id,
931 client_order_id,
932 venue_order_id,
933 account_id,
934 UUID4::new(),
935 ts_init,
936 ts_init,
937 false,
938 );
939 emitter.send_order_event(OrderEventAny::Accepted(accepted));
940}
941
942fn cleanup_terminal(client_order_id: ClientOrderId, state: &WsDispatchState) {
944 state.order_identities.remove(&client_order_id);
945 state.order_snapshots.remove(&client_order_id);
946 state.emitted_accepted.remove(&client_order_id);
947 state.triggered_orders.remove(&client_order_id);
948 state.filled_orders.remove(&client_order_id);
949}
950
951fn extract_order_link_id_from_data(data: &serde_json::Value) -> Option<ClientOrderId> {
953 data.get("orderLinkId")
954 .and_then(|v| v.as_str())
955 .filter(|s| !s.is_empty())
956 .map(ClientOrderId::new)
957}
958
959fn extract_venue_order_id_from_data(data: &serde_json::Value) -> Option<VenueOrderId> {
961 data.get("orderId")
962 .and_then(|v| v.as_str())
963 .filter(|s| !s.is_empty())
964 .map(VenueOrderId::new)
965}
966
967#[cfg(test)]
968mod tests {
969 use ahash::AHashMap;
970 use nautilus_common::messages::{ExecutionEvent, execution::ExecutionReport};
971 use nautilus_core::{
972 UnixNanos,
973 time::{AtomicTime, get_atomic_clock_realtime},
974 };
975 use nautilus_live::emitter::ExecutionEventEmitter;
976 use nautilus_model::{
977 enums::{AccountType, OrderSide, OrderType},
978 events::OrderEventAny,
979 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
980 instruments::{Instrument, InstrumentAny},
981 };
982 use rstest::rstest;
983 use ustr::Ustr;
984
985 use super::*;
986 use crate::{
987 common::{parse::parse_linear_instrument, testing::load_test_json},
988 http::models::{BybitFeeRate, BybitInstrumentLinearResponse},
989 websocket::messages::BybitWsMessage,
990 };
991
992 fn sample_fee_rate(
993 symbol: &str,
994 taker: &str,
995 maker: &str,
996 base_coin: Option<&str>,
997 ) -> BybitFeeRate {
998 BybitFeeRate {
999 symbol: Ustr::from(symbol),
1000 taker_fee_rate: taker.to_string(),
1001 maker_fee_rate: maker.to_string(),
1002 base_coin: base_coin.map(Ustr::from),
1003 }
1004 }
1005
1006 fn linear_instrument() -> InstrumentAny {
1007 let json = load_test_json("http_get_instruments_linear.json");
1008 let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
1009 let instrument = &response.result.list[0];
1010 let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
1011 let ts = UnixNanos::new(1_700_000_000_000_000_000);
1012 parse_linear_instrument(instrument, &fee_rate, ts, ts).unwrap()
1013 }
1014
1015 fn build_instruments(instruments: &[InstrumentAny]) -> AHashMap<Ustr, InstrumentAny> {
1016 let mut map = AHashMap::new();
1017 for inst in instruments {
1018 map.insert(inst.id().symbol.inner(), inst.clone());
1019 }
1020 map
1021 }
1022
1023 fn test_account_id() -> AccountId {
1024 AccountId::from("BYBIT-001")
1025 }
1026
1027 fn create_emitter() -> (
1028 ExecutionEventEmitter,
1029 tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1030 ) {
1031 let clock = get_atomic_clock_realtime();
1032 let trader_id = TraderId::from("TESTER-001");
1033 let account_id = test_account_id();
1034 let mut emitter =
1035 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
1036 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1037 emitter.set_sender(tx);
1038 (emitter, rx)
1039 }
1040
1041 fn default_identity() -> OrderIdentity {
1042 OrderIdentity {
1043 instrument_id: InstrumentId::from("BTCUSDT-LINEAR.BYBIT"),
1044 strategy_id: StrategyId::from("S-001"),
1045 order_side: OrderSide::Buy,
1046 order_type: OrderType::Limit,
1047 }
1048 }
1049
1050 #[rstest]
1051 fn test_dispatch_tracked_canceled_order_emits_accepted_then_canceled() {
1052 let instrument = linear_instrument();
1053 let instruments = build_instruments(std::slice::from_ref(&instrument));
1054 let (emitter, mut rx) = create_emitter();
1055 let clock = get_atomic_clock_realtime();
1056 let state = WsDispatchState::default();
1057
1058 let json = load_test_json("ws_account_order.json");
1060 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1061 serde_json::from_str(&json).unwrap();
1062
1063 if let Some(order) = msg.data.first()
1064 && !order.order_link_id.is_empty()
1065 {
1066 let cid = ClientOrderId::new(order.order_link_id.as_str());
1067 state.order_identities.insert(cid, default_identity());
1068 }
1069
1070 let ws_msg = BybitWsMessage::AccountOrder(msg);
1071 dispatch_ws_message(
1072 &ws_msg,
1073 &emitter,
1074 &state,
1075 test_account_id(),
1076 &instruments,
1077 clock,
1078 );
1079
1080 let event1 = rx.try_recv().unwrap();
1082 assert!(
1083 matches!(event1, ExecutionEvent::Order(OrderEventAny::Accepted(ref a)) if a.strategy_id == StrategyId::from("S-001")),
1084 "Expected Accepted, found {event1:?}"
1085 );
1086
1087 let event2 = rx.try_recv().unwrap();
1089 assert!(
1090 matches!(event2, ExecutionEvent::Order(OrderEventAny::Canceled(_))),
1091 "Expected Canceled, found {event2:?}"
1092 );
1093 }
1094
1095 #[rstest]
1096 fn test_dispatch_untracked_order_emits_report() {
1097 let instrument = linear_instrument();
1098 let instruments = build_instruments(std::slice::from_ref(&instrument));
1099 let (emitter, mut rx) = create_emitter();
1100 let clock = get_atomic_clock_realtime();
1101 let state = WsDispatchState::default();
1102
1103 let json = load_test_json("ws_account_order.json");
1104 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1105 serde_json::from_str(&json).unwrap();
1106
1107 let ws_msg = BybitWsMessage::AccountOrder(msg);
1109 dispatch_ws_message(
1110 &ws_msg,
1111 &emitter,
1112 &state,
1113 test_account_id(),
1114 &instruments,
1115 clock,
1116 );
1117
1118 let event = rx.try_recv().unwrap();
1119 assert!(matches!(
1120 event,
1121 ExecutionEvent::Report(ExecutionReport::Order(_))
1122 ));
1123 }
1124
1125 #[rstest]
1126 fn test_dispatch_tracked_execution_emits_order_filled() {
1127 let instrument = linear_instrument();
1128 let instruments = build_instruments(std::slice::from_ref(&instrument));
1129 let (emitter, mut rx) = create_emitter();
1130 let clock = get_atomic_clock_realtime();
1131 let state = WsDispatchState::default();
1132
1133 let json = load_test_json("ws_account_execution.json");
1134 let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1135 serde_json::from_str(&json).unwrap();
1136
1137 if let Some(exec) = msg.data.first()
1139 && !exec.order_link_id.is_empty()
1140 {
1141 let cid = ClientOrderId::new(exec.order_link_id.as_str());
1142 state.order_identities.insert(cid, default_identity());
1143 }
1144
1145 let ws_msg = BybitWsMessage::AccountExecution(msg);
1146 dispatch_ws_message(
1147 &ws_msg,
1148 &emitter,
1149 &state,
1150 test_account_id(),
1151 &instruments,
1152 clock,
1153 );
1154
1155 let event1 = rx.try_recv().unwrap();
1157 assert!(
1158 matches!(event1, ExecutionEvent::Order(OrderEventAny::Accepted(_))),
1159 "Expected Accepted, found {event1:?}"
1160 );
1161
1162 let event2 = rx.try_recv().unwrap();
1164 match event2 {
1165 ExecutionEvent::Order(OrderEventAny::Filled(filled)) => {
1166 assert_eq!(filled.strategy_id, StrategyId::from("S-001"));
1167 assert_eq!(filled.order_side, OrderSide::Buy);
1168 assert_eq!(filled.order_type, OrderType::Limit);
1169 }
1170 other => panic!("Expected Filled event, found {other:?}"),
1171 }
1172 }
1173
1174 #[rstest]
1175 fn test_dispatch_untracked_execution_emits_fill_report() {
1176 let instrument = linear_instrument();
1177 let instruments = build_instruments(std::slice::from_ref(&instrument));
1178 let (emitter, mut rx) = create_emitter();
1179 let clock = get_atomic_clock_realtime();
1180 let state = WsDispatchState::default();
1181
1182 let json = load_test_json("ws_account_execution.json");
1183 let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1184 serde_json::from_str(&json).unwrap();
1185
1186 let ws_msg = BybitWsMessage::AccountExecution(msg);
1188 dispatch_ws_message(
1189 &ws_msg,
1190 &emitter,
1191 &state,
1192 test_account_id(),
1193 &instruments,
1194 clock,
1195 );
1196
1197 let event = rx.try_recv().unwrap();
1198 assert!(matches!(
1199 event,
1200 ExecutionEvent::Report(ExecutionReport::Fill(_))
1201 ));
1202 }
1203
1204 #[rstest]
1205 fn test_dispatch_wallet_emits_account_state() {
1206 let instruments = AHashMap::new();
1207 let (emitter, mut rx) = create_emitter();
1208 let clock = get_atomic_clock_realtime();
1209 let state = WsDispatchState::default();
1210
1211 let json = load_test_json("ws_account_wallet.json");
1212 let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
1213 serde_json::from_str(&json).unwrap();
1214 let ws_msg = BybitWsMessage::AccountWallet(msg);
1215
1216 dispatch_ws_message(
1217 &ws_msg,
1218 &emitter,
1219 &state,
1220 test_account_id(),
1221 &instruments,
1222 clock,
1223 );
1224
1225 let event = rx.try_recv().unwrap();
1226 assert!(matches!(event, ExecutionEvent::Account(_)));
1227 }
1228
1229 #[rstest]
1230 fn test_dispatch_data_message_ignored() {
1231 let instruments = AHashMap::new();
1232 let (emitter, mut rx) = create_emitter();
1233 let clock = get_atomic_clock_realtime();
1234 let state = WsDispatchState::default();
1235
1236 let json = load_test_json("ws_public_trade.json");
1237 let msg: crate::websocket::messages::BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
1238 let ws_msg = BybitWsMessage::Trade(msg);
1239
1240 dispatch_ws_message(
1241 &ws_msg,
1242 &emitter,
1243 &state,
1244 test_account_id(),
1245 &instruments,
1246 clock,
1247 );
1248
1249 assert!(rx.try_recv().is_err());
1250 }
1251
1252 #[rstest]
1253 fn test_accepted_dedup_prevents_duplicate() {
1254 let instrument = linear_instrument();
1255 let instruments = build_instruments(std::slice::from_ref(&instrument));
1256 let (emitter, mut rx) = create_emitter();
1257 let clock = get_atomic_clock_realtime();
1258 let state = WsDispatchState::default();
1259
1260 let json = load_test_json("ws_account_order.json");
1262 let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1263 value["data"][0]["orderStatus"] = serde_json::Value::String("New".to_string());
1264 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1265 serde_json::from_value(value).unwrap();
1266
1267 if let Some(order) = msg.data.first()
1268 && !order.order_link_id.is_empty()
1269 {
1270 let cid = ClientOrderId::new(order.order_link_id.as_str());
1271 state.order_identities.insert(cid, default_identity());
1272 }
1273
1274 let ws_msg = BybitWsMessage::AccountOrder(msg.clone());
1275 dispatch_ws_message(
1276 &ws_msg,
1277 &emitter,
1278 &state,
1279 test_account_id(),
1280 &instruments,
1281 clock,
1282 );
1283
1284 let event = rx.try_recv().unwrap();
1285 assert!(matches!(
1286 event,
1287 ExecutionEvent::Order(OrderEventAny::Accepted(_))
1288 ));
1289
1290 let ws_msg2 = BybitWsMessage::AccountOrder(msg);
1292 dispatch_ws_message(
1293 &ws_msg2,
1294 &emitter,
1295 &state,
1296 test_account_id(),
1297 &instruments,
1298 clock,
1299 );
1300
1301 assert!(rx.try_recv().is_err());
1302 }
1303
1304 fn new_order_value() -> serde_json::Value {
1305 let json = load_test_json("ws_account_order.json");
1306 let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1307 value["data"][0]["orderStatus"] = serde_json::Value::String("New".to_string());
1308 value
1309 }
1310
1311 struct DispatchTestContext {
1312 instruments: AHashMap<Ustr, InstrumentAny>,
1313 emitter: ExecutionEventEmitter,
1314 rx: tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1315 clock: &'static AtomicTime,
1316 state: WsDispatchState,
1317 }
1318
1319 impl DispatchTestContext {
1320 fn new() -> Self {
1321 let instrument = linear_instrument();
1322 let instruments = build_instruments(std::slice::from_ref(&instrument));
1323 let (emitter, rx) = create_emitter();
1324 let clock = get_atomic_clock_realtime();
1325 let state = WsDispatchState::default();
1326 Self {
1327 instruments,
1328 emitter,
1329 rx,
1330 clock,
1331 state,
1332 }
1333 }
1334
1335 fn accept_order(&mut self, value: &serde_json::Value) {
1336 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1337 serde_json::from_value(value.clone()).unwrap();
1338
1339 if let Some(order) = msg.data.first()
1340 && !order.order_link_id.is_empty()
1341 && !self
1342 .state
1343 .order_identities
1344 .contains_key(&ClientOrderId::new(order.order_link_id.as_str()))
1345 {
1346 let cid = ClientOrderId::new(order.order_link_id.as_str());
1347 self.state.order_identities.insert(cid, default_identity());
1348 }
1349
1350 self.dispatch_value(value);
1351
1352 let event = self.rx.try_recv().unwrap();
1353 assert!(
1354 matches!(event, ExecutionEvent::Order(OrderEventAny::Accepted(_))),
1355 "Expected Accepted, found {event:?}"
1356 );
1357 }
1358
1359 fn dispatch_value(&self, value: &serde_json::Value) {
1360 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1361 serde_json::from_value(value.clone()).unwrap();
1362 let ws_msg = BybitWsMessage::AccountOrder(msg);
1363 dispatch_ws_message(
1364 &ws_msg,
1365 &self.emitter,
1366 &self.state,
1367 test_account_id(),
1368 &self.instruments,
1369 self.clock,
1370 );
1371 }
1372
1373 fn recv_updated(&mut self) -> OrderUpdated {
1374 let event = self.rx.try_recv().unwrap();
1375 match event {
1376 ExecutionEvent::Order(OrderEventAny::Updated(updated)) => updated,
1377 other => panic!("Expected Updated event, found {other:?}"),
1378 }
1379 }
1380 }
1381
1382 #[rstest]
1383 fn test_dispatch_order_updated_on_price_change() {
1384 let mut ctx = DispatchTestContext::new();
1385 let value = new_order_value();
1386 ctx.accept_order(&value);
1387
1388 let mut amended = value;
1389 amended["data"][0]["price"] = serde_json::Value::String("31000".to_string());
1390 ctx.dispatch_value(&amended);
1391
1392 let updated = ctx.recv_updated();
1393 assert_eq!(updated.client_order_id, ClientOrderId::from("client-1"));
1394 assert_eq!(updated.price, Some(Price::from("31000.00")));
1395 assert_eq!(updated.quantity, Quantity::from("0.010"));
1396 assert_eq!(updated.trigger_price, None);
1397 assert!(updated.venue_order_id.is_some());
1398 }
1399
1400 #[rstest]
1401 fn test_dispatch_order_updated_on_quantity_change() {
1402 let mut ctx = DispatchTestContext::new();
1403 let value = new_order_value();
1404 ctx.accept_order(&value);
1405
1406 let mut amended = value;
1407 amended["data"][0]["qty"] = serde_json::Value::String("0.020".to_string());
1408 ctx.dispatch_value(&amended);
1409
1410 let updated = ctx.recv_updated();
1411 assert_eq!(updated.quantity, Quantity::from("0.020"));
1412 assert_eq!(updated.price, Some(Price::from("30000.00")));
1413 }
1414
1415 #[rstest]
1416 fn test_dispatch_order_updated_on_trigger_price_change() {
1417 let mut ctx = DispatchTestContext::new();
1418 let mut value = new_order_value();
1419 value["data"][0]["triggerPrice"] = serde_json::Value::String("29000".to_string());
1420 ctx.accept_order(&value);
1421
1422 let mut amended = value;
1423 amended["data"][0]["triggerPrice"] = serde_json::Value::String("28000".to_string());
1424 ctx.dispatch_value(&amended);
1425
1426 let updated = ctx.recv_updated();
1427 assert_eq!(updated.trigger_price, Some(Price::from("28000.00")));
1428 assert_eq!(updated.price, Some(Price::from("30000.00")));
1429 }
1430
1431 #[rstest]
1432 fn test_dispatch_dedup_suppresses_identical_after_snapshot() {
1433 let mut ctx = DispatchTestContext::new();
1434 let value = new_order_value();
1435 ctx.accept_order(&value);
1436
1437 ctx.dispatch_value(&value);
1438
1439 assert!(
1440 ctx.rx.try_recv().is_err(),
1441 "Expected no event for identical redelivery"
1442 );
1443 }
1444
1445 #[rstest]
1446 fn test_dispatch_order_updated_stores_snapshot_for_subsequent_change() {
1447 let mut ctx = DispatchTestContext::new();
1448 let value = new_order_value();
1449 ctx.accept_order(&value);
1450
1451 let mut amended1 = value.clone();
1452 amended1["data"][0]["price"] = serde_json::Value::String("31000".to_string());
1453 ctx.dispatch_value(&amended1);
1454 let _ = ctx.recv_updated();
1455
1456 let mut amended2 = value;
1457 amended2["data"][0]["price"] = serde_json::Value::String("32000".to_string());
1458 ctx.dispatch_value(&amended2);
1459
1460 let updated = ctx.recv_updated();
1461 assert_eq!(updated.price, Some(Price::from("32000.00")));
1462 }
1463
1464 #[rstest]
1465 #[case::price_changed(
1466 Some(Price::from("100.00")),
1467 None,
1468 Quantity::from("1.000"),
1469 Some(Price::from("200.00")),
1470 None,
1471 Quantity::from("1.000"),
1472 true
1473 )]
1474 #[case::trigger_changed(
1475 None,
1476 Some(Price::from("100.00")),
1477 Quantity::from("1.000"),
1478 None,
1479 Some(Price::from("90.00")),
1480 Quantity::from("1.000"),
1481 true
1482 )]
1483 #[case::qty_changed(
1484 Some(Price::from("100.00")),
1485 None,
1486 Quantity::from("1.000"),
1487 Some(Price::from("100.00")),
1488 None,
1489 Quantity::from("2.000"),
1490 true
1491 )]
1492 #[case::no_change(
1493 Some(Price::from("100.00")),
1494 None,
1495 Quantity::from("1.000"),
1496 Some(Price::from("100.00")),
1497 None,
1498 Quantity::from("1.000"),
1499 false
1500 )]
1501 fn test_is_snapshot_updated(
1502 #[case] prev_price: Option<Price>,
1503 #[case] prev_trigger: Option<Price>,
1504 #[case] prev_qty: Quantity,
1505 #[case] new_price: Option<Price>,
1506 #[case] new_trigger: Option<Price>,
1507 #[case] new_qty: Quantity,
1508 #[case] expected: bool,
1509 ) {
1510 let state = WsDispatchState::default();
1511 let cid = ClientOrderId::from("test-1");
1512 state.order_snapshots.insert(
1513 cid,
1514 OrderStateSnapshot {
1515 quantity: prev_qty,
1516 price: prev_price,
1517 trigger_price: prev_trigger,
1518 },
1519 );
1520
1521 let new_snapshot = OrderStateSnapshot {
1522 quantity: new_qty,
1523 price: new_price,
1524 trigger_price: new_trigger,
1525 };
1526 assert_eq!(is_snapshot_updated(&new_snapshot, &cid, &state), expected);
1527 }
1528
1529 #[rstest]
1530 fn test_is_snapshot_updated_no_previous() {
1531 let state = WsDispatchState::default();
1532 let cid = ClientOrderId::from("test-1");
1533
1534 let new_snapshot = OrderStateSnapshot {
1535 quantity: Quantity::from("1.000"),
1536 price: Some(Price::from("100.00")),
1537 trigger_price: None,
1538 };
1539 assert!(!is_snapshot_updated(&new_snapshot, &cid, &state));
1540 }
1541
1542 #[rstest]
1543 #[case::limit_order("30000", "0", Some(Price::from("30000.00")), None)]
1544 #[case::conditional("0", "29000", None, Some(Price::from("29000.00")))]
1545 #[case::both(
1546 "30000",
1547 "29000",
1548 Some(Price::from("30000.00")),
1549 Some(Price::from("29000.00"))
1550 )]
1551 fn test_parse_order_snapshot(
1552 #[case] price: &str,
1553 #[case] trigger: &str,
1554 #[case] expected_price: Option<Price>,
1555 #[case] expected_trigger: Option<Price>,
1556 ) {
1557 let instrument = linear_instrument();
1558 let json = load_test_json("ws_account_order.json");
1559 let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1560 value["data"][0]["price"] = serde_json::Value::String(price.to_string());
1561 value["data"][0]["triggerPrice"] = serde_json::Value::String(trigger.to_string());
1562 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1563 serde_json::from_value(value).unwrap();
1564
1565 let snapshot = parse_order_snapshot(&msg.data[0], &instrument).unwrap();
1566 assert_eq!(snapshot.price, expected_price);
1567 assert_eq!(snapshot.trigger_price, expected_trigger);
1568 assert_eq!(snapshot.quantity, Quantity::from("0.010"));
1569 }
1570
1571 #[rstest]
1572 fn test_parse_order_snapshot_invalid_qty_returns_none() {
1573 let instrument = linear_instrument();
1574 let json = load_test_json("ws_account_order.json");
1575 let mut value: serde_json::Value = serde_json::from_str(&json).unwrap();
1576 value["data"][0]["qty"] = serde_json::Value::String(String::new());
1577 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1578 serde_json::from_value(value).unwrap();
1579
1580 assert!(parse_order_snapshot(&msg.data[0], &instrument).is_none());
1581 }
1582
1583 #[rstest]
1584 fn test_dispatch_order_updated_on_partially_filled_price_change() {
1585 let mut ctx = DispatchTestContext::new();
1586 let value = new_order_value();
1587 ctx.accept_order(&value);
1588
1589 let mut amended = value;
1590 amended["data"][0]["orderStatus"] =
1591 serde_json::Value::String("PartiallyFilled".to_string());
1592 amended["data"][0]["cumExecQty"] = serde_json::Value::String("0.005".to_string());
1593 amended["data"][0]["price"] = serde_json::Value::String("31000".to_string());
1594 ctx.dispatch_value(&amended);
1595
1596 let updated = ctx.recv_updated();
1597 assert_eq!(updated.client_order_id, ClientOrderId::from("client-1"));
1598 assert_eq!(updated.price, Some(Price::from("31000.00")));
1599 }
1600}