1use std::convert::TryFrom;
19
20use anyhow::Context;
21use nautilus_core::{datetime::NANOSECONDS_IN_MILLISECOND, nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{
24 Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, greeks::OptionGreekValues,
26 option_chain::OptionGreeks,
27 },
28 enums::{
29 AccountType, AggressorSide, BookAction, GreeksConvention, LiquiditySide, OrderSide,
30 OrderStatus, PositionSideSpecified, RecordFlag, TimeInForce, TriggerType,
31 },
32 events::account::state::AccountState,
33 identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
34 instruments::{Instrument, any::InstrumentAny},
35 reports::{FillReport, OrderStatusReport, PositionStatusReport},
36 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
37};
38use rust_decimal::Decimal;
39
40use super::{
41 enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
42 messages::{
43 BybitWsAccountExecution, BybitWsAccountOrder, BybitWsAccountPosition, BybitWsAccountWallet,
44 BybitWsAuthResponse, BybitWsFrame, BybitWsKline, BybitWsOrderResponse,
45 BybitWsOrderbookDepthMsg, BybitWsResponse, BybitWsSubscriptionMsg, BybitWsTickerLinear,
46 BybitWsTickerLinearMsg, BybitWsTickerOptionMsg, BybitWsTrade,
47 },
48};
49use crate::common::{
50 enums::{BybitOrderStatus, BybitPositionSide, BybitTimeInForce},
51 parse::{
52 get_currency, parse_book_level, parse_bybit_order_type, parse_millis_timestamp,
53 parse_price_with_precision, parse_quantity_with_precision,
54 },
55};
56
57pub fn parse_bybit_ws_frame(value: serde_json::Value) -> BybitWsFrame {
61 if let Some(op_val) = value.get("op") {
62 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
63 && op == BybitWsOperation::Auth
64 && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
65 {
66 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
67 if is_success {
68 return BybitWsFrame::Auth(auth);
69 }
70 let resp = BybitWsResponse {
71 op: Some(auth.op.clone()),
72 topic: None,
73 success: auth.success,
74 conn_id: auth.conn_id.clone(),
75 req_id: None,
76 ret_code: auth.ret_code,
77 ret_msg: auth.ret_msg,
78 };
79 return BybitWsFrame::ErrorResponse(resp);
80 }
81
82 if let Some(op_str) = op_val.as_str()
83 && op_str.starts_with("order.")
84 {
85 return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
86 |_| BybitWsFrame::Unknown(value),
87 BybitWsFrame::OrderResponse,
88 );
89 }
90 }
91
92 if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
93 if success {
94 return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
95 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::Subscription);
96 }
97 return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
98 |_| BybitWsFrame::Unknown(value),
99 BybitWsFrame::ErrorResponse,
100 );
101 }
102
103 if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
104 if topic.starts_with(BybitWsPublicChannel::OrderBook.as_ref()) {
105 return serde_json::from_value(value.clone())
106 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::Orderbook);
107 }
108
109 if topic.contains(BybitWsPublicChannel::PublicTrade.as_ref())
110 || topic.starts_with(BybitWsPublicChannel::Trade.as_ref())
111 {
112 return serde_json::from_value(value.clone())
113 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::Trade);
114 }
115
116 if topic.starts_with(BybitWsPublicChannel::Kline.as_ref()) {
117 return serde_json::from_value(value.clone())
118 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::Kline);
119 }
120
121 if topic.starts_with(BybitWsPublicChannel::Tickers.as_ref()) {
122 let is_option = value
124 .get("data")
125 .and_then(|d| d.get("symbol"))
126 .and_then(|s| s.as_str())
127 .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
128
129 if is_option {
130 return serde_json::from_value(value.clone())
131 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::TickerOption);
132 }
133 return serde_json::from_value(value.clone())
134 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::TickerLinear);
135 }
136
137 if topic.starts_with(BybitWsPrivateChannel::Order.as_ref()) {
138 return serde_json::from_value(value.clone())
139 .map_or_else(|_| BybitWsFrame::Unknown(value), BybitWsFrame::AccountOrder);
140 }
141
142 if topic.starts_with(BybitWsPrivateChannel::Execution.as_ref()) {
143 return serde_json::from_value(value.clone()).map_or_else(
144 |_| BybitWsFrame::Unknown(value),
145 BybitWsFrame::AccountExecution,
146 );
147 }
148
149 if topic.starts_with(BybitWsPrivateChannel::Wallet.as_ref()) {
150 return serde_json::from_value(value.clone()).map_or_else(
151 |_| BybitWsFrame::Unknown(value),
152 BybitWsFrame::AccountWallet,
153 );
154 }
155
156 if topic.starts_with(BybitWsPrivateChannel::Position.as_ref()) {
157 return serde_json::from_value(value.clone()).map_or_else(
158 |_| BybitWsFrame::Unknown(value),
159 BybitWsFrame::AccountPosition,
160 );
161 }
162 }
163
164 BybitWsFrame::Unknown(value)
165}
166
167pub fn parse_topic(topic: &str) -> anyhow::Result<Vec<&str>> {
173 let parts: Vec<&str> = topic.split('.').collect();
174 if parts.is_empty() {
175 anyhow::bail!("Invalid topic format: empty topic");
176 }
177 Ok(parts)
178}
179
180pub fn parse_kline_topic(topic: &str) -> anyhow::Result<(&str, &str)> {
188 let kline = BybitWsPublicChannel::Kline.as_ref();
189 let parts = parse_topic(topic)?;
190 if parts.len() != 3 || parts[0] != kline {
191 anyhow::bail!(
192 "Invalid kline topic format: expected '{kline}.{{interval}}.{{symbol}}', was '{topic}'"
193 );
194 }
195 Ok((parts[1], parts[2]))
196}
197
198pub fn parse_ws_trade_tick(
200 trade: &BybitWsTrade,
201 instrument: &InstrumentAny,
202 ts_init: UnixNanos,
203) -> anyhow::Result<TradeTick> {
204 let price = parse_price_with_precision(&trade.p, instrument.price_precision(), "trade.p")?;
205 let size = parse_quantity_with_precision(&trade.v, instrument.size_precision(), "trade.v")?;
206 let aggressor: AggressorSide = trade.taker_side.into();
207 let trade_id = TradeId::new_checked(trade.i.as_str())
208 .context("invalid trade identifier in Bybit trade message")?;
209 let ts_event = parse_millis_i64(trade.t, "trade.T")?;
210
211 TradeTick::new_checked(
212 instrument.id(),
213 price,
214 size,
215 aggressor,
216 trade_id,
217 ts_event,
218 ts_init,
219 )
220 .context("failed to construct TradeTick from Bybit trade message")
221}
222
223pub fn parse_orderbook_deltas(
225 msg: &BybitWsOrderbookDepthMsg,
226 instrument: &InstrumentAny,
227 ts_init: UnixNanos,
228) -> anyhow::Result<OrderBookDeltas> {
229 let is_snapshot = msg.msg_type.eq_ignore_ascii_case("snapshot");
230 let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
231 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
232
233 let depth = &msg.data;
234 let instrument_id = instrument.id();
235 let price_precision = instrument.price_precision();
236 let size_precision = instrument.size_precision();
237 let update_id = u64::try_from(depth.u)
238 .context("received negative update id in Bybit order book message")?;
239 let sequence = u64::try_from(depth.seq)
240 .context("received negative sequence in Bybit order book message")?;
241
242 let total_levels = depth.b.len() + depth.a.len();
243 let capacity = if is_snapshot {
244 total_levels + 1
245 } else {
246 total_levels
247 };
248 let mut deltas = Vec::with_capacity(capacity);
249
250 if is_snapshot {
251 deltas.push(OrderBookDelta::clear(
252 instrument_id,
253 sequence,
254 ts_event,
255 ts_init,
256 ));
257 }
258 let mut processed = 0_usize;
259
260 let mut push_level = |values: &[String], side: OrderSide| -> anyhow::Result<()> {
261 let (price, size) = parse_book_level(values, price_precision, size_precision, "orderbook")?;
262 let action = if size.is_zero() {
263 BookAction::Delete
264 } else if is_snapshot {
265 BookAction::Add
266 } else {
267 BookAction::Update
268 };
269
270 processed += 1;
271 let mut flags = RecordFlag::F_MBP as u8;
272
273 if processed == total_levels {
274 flags |= RecordFlag::F_LAST as u8;
275 }
276
277 let order = BookOrder::new(side, price, size, update_id);
278 let delta = OrderBookDelta::new_checked(
279 instrument_id,
280 action,
281 order,
282 flags,
283 sequence,
284 ts_event,
285 ts_init,
286 )
287 .context("failed to construct OrderBookDelta from Bybit book level")?;
288 deltas.push(delta);
289 Ok(())
290 };
291
292 for level in &depth.b {
293 push_level(level, OrderSide::Buy)?;
294 }
295
296 for level in &depth.a {
297 push_level(level, OrderSide::Sell)?;
298 }
299
300 if total_levels == 0
301 && let Some(last) = deltas.last_mut()
302 {
303 last.flags |= RecordFlag::F_LAST as u8;
304 }
305
306 OrderBookDeltas::new_checked(instrument_id, deltas)
307 .context("failed to assemble OrderBookDeltas from Bybit message")
308}
309
310pub fn parse_orderbook_quote(
312 msg: &BybitWsOrderbookDepthMsg,
313 instrument: &InstrumentAny,
314 last_quote: Option<&QuoteTick>,
315 ts_init: UnixNanos,
316) -> anyhow::Result<QuoteTick> {
317 let ts_event = parse_millis_i64(msg.ts, "orderbook.ts")?;
318 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
319 let price_precision = instrument.price_precision();
320 let size_precision = instrument.size_precision();
321
322 let get_best =
323 |levels: &[Vec<String>], label: &str| -> anyhow::Result<Option<(Price, Quantity)>> {
324 if let Some(values) = levels.first() {
325 parse_book_level(values, price_precision, size_precision, label).map(Some)
326 } else {
327 Ok(None)
328 }
329 };
330
331 let bids = get_best(&msg.data.b, "bid")?;
332 let asks = get_best(&msg.data.a, "ask")?;
333
334 let (bid_price, bid_size) = match (bids, last_quote) {
335 (Some(level), _) => level,
336 (None, Some(prev)) => (prev.bid_price, prev.bid_size),
337 (None, None) => {
338 anyhow::bail!(
339 "Bybit order book update missing bid levels and no previous quote provided"
340 );
341 }
342 };
343
344 let (ask_price, ask_size) = match (asks, last_quote) {
345 (Some(level), _) => level,
346 (None, Some(prev)) => (prev.ask_price, prev.ask_size),
347 (None, None) => {
348 anyhow::bail!(
349 "Bybit order book update missing ask levels and no previous quote provided"
350 );
351 }
352 };
353
354 QuoteTick::new_checked(
355 instrument.id(),
356 bid_price,
357 ask_price,
358 bid_size,
359 ask_size,
360 ts_event,
361 ts_init,
362 )
363 .context("failed to construct QuoteTick from Bybit order book message")
364}
365
366pub fn parse_ticker_linear_quote(
368 msg: &BybitWsTickerLinearMsg,
369 instrument: &InstrumentAny,
370 ts_init: UnixNanos,
371) -> anyhow::Result<QuoteTick> {
372 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
373 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
374 let price_precision = instrument.price_precision();
375 let size_precision = instrument.size_precision();
376
377 let data = &msg.data;
378 let bid_price = data
379 .bid1_price
380 .as_ref()
381 .context("Bybit ticker message missing bid1Price")?
382 .as_str();
383 let ask_price = data
384 .ask1_price
385 .as_ref()
386 .context("Bybit ticker message missing ask1Price")?
387 .as_str();
388
389 let bid_price = parse_price_with_precision(bid_price, price_precision, "ticker.bid1Price")?;
390 let ask_price = parse_price_with_precision(ask_price, price_precision, "ticker.ask1Price")?;
391
392 let bid_size_str = data.bid1_size.as_deref().unwrap_or("0");
393 let ask_size_str = data.ask1_size.as_deref().unwrap_or("0");
394
395 let bid_size = parse_quantity_with_precision(bid_size_str, size_precision, "ticker.bid1Size")?;
396 let ask_size = parse_quantity_with_precision(ask_size_str, size_precision, "ticker.ask1Size")?;
397
398 QuoteTick::new_checked(
399 instrument.id(),
400 bid_price,
401 ask_price,
402 bid_size,
403 ask_size,
404 ts_event,
405 ts_init,
406 )
407 .context("failed to construct QuoteTick from Bybit linear ticker message")
408}
409
410pub fn parse_ticker_option_quote(
412 msg: &BybitWsTickerOptionMsg,
413 instrument: &InstrumentAny,
414 ts_init: UnixNanos,
415) -> anyhow::Result<QuoteTick> {
416 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
417 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
418 let price_precision = instrument.price_precision();
419 let size_precision = instrument.size_precision();
420
421 let data = &msg.data;
422 let bid_price =
423 parse_price_with_precision(&data.bid_price, price_precision, "ticker.bidPrice")?;
424 let ask_price =
425 parse_price_with_precision(&data.ask_price, price_precision, "ticker.askPrice")?;
426 let bid_size = parse_quantity_with_precision(&data.bid_size, size_precision, "ticker.bidSize")?;
427 let ask_size = parse_quantity_with_precision(&data.ask_size, size_precision, "ticker.askSize")?;
428
429 QuoteTick::new_checked(
430 instrument.id(),
431 bid_price,
432 ask_price,
433 bid_size,
434 ask_size,
435 ts_event,
436 ts_init,
437 )
438 .context("failed to construct QuoteTick from Bybit option ticker message")
439}
440
441pub fn parse_ticker_linear_funding(
447 data: &BybitWsTickerLinear,
448 instrument_id: InstrumentId,
449 ts_event: UnixNanos,
450 ts_init: UnixNanos,
451) -> anyhow::Result<FundingRateUpdate> {
452 let funding_rate_str = data
453 .funding_rate
454 .as_ref()
455 .context("Bybit ticker missing funding_rate")?;
456
457 let funding_rate = funding_rate_str
458 .as_str()
459 .parse::<Decimal>()
460 .context("invalid funding_rate value")?;
461
462 let funding_interval = if let Some(funding_interval_hour) = &data.funding_interval_hour {
463 let funding_interval_hour = funding_interval_hour
464 .as_str()
465 .parse::<u16>()
466 .context("invalid funding_interval_hour value")?;
467 Some(
468 funding_interval_hour
469 .checked_mul(60)
470 .ok_or_else(|| anyhow::anyhow!("funding_interval_hour out of bounds"))?,
471 )
472 } else {
473 None
474 };
475
476 let next_funding_ns = if let Some(next_funding_time) = &data.next_funding_time {
477 let next_funding_millis = next_funding_time
478 .as_str()
479 .parse::<i64>()
480 .context("invalid next_funding_time value")?;
481 Some(parse_millis_i64(next_funding_millis, "next_funding_time")?)
482 } else {
483 None
484 };
485
486 Ok(FundingRateUpdate::new(
487 instrument_id,
488 funding_rate,
489 funding_interval,
490 next_funding_ns,
491 ts_event,
492 ts_init,
493 ))
494}
495
496pub fn parse_ticker_linear_mark_price(
502 data: &BybitWsTickerLinear,
503 instrument: &InstrumentAny,
504 ts_event: UnixNanos,
505 ts_init: UnixNanos,
506) -> anyhow::Result<MarkPriceUpdate> {
507 let mark_price_str = data
508 .mark_price
509 .as_ref()
510 .context("Bybit ticker missing mark_price")?;
511
512 let price =
513 parse_price_with_precision(mark_price_str, instrument.price_precision(), "mark_price")?;
514
515 Ok(MarkPriceUpdate::new(
516 instrument.id(),
517 price,
518 ts_event,
519 ts_init,
520 ))
521}
522
523pub fn parse_ticker_linear_index_price(
529 data: &BybitWsTickerLinear,
530 instrument: &InstrumentAny,
531 ts_event: UnixNanos,
532 ts_init: UnixNanos,
533) -> anyhow::Result<IndexPriceUpdate> {
534 let index_price_str = data
535 .index_price
536 .as_ref()
537 .context("Bybit ticker missing index_price")?;
538
539 let price =
540 parse_price_with_precision(index_price_str, instrument.price_precision(), "index_price")?;
541
542 Ok(IndexPriceUpdate::new(
543 instrument.id(),
544 price,
545 ts_event,
546 ts_init,
547 ))
548}
549
550pub fn parse_ticker_option_mark_price(
556 msg: &BybitWsTickerOptionMsg,
557 instrument: &InstrumentAny,
558 ts_init: UnixNanos,
559) -> anyhow::Result<MarkPriceUpdate> {
560 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
561
562 let price = parse_price_with_precision(
563 &msg.data.mark_price,
564 instrument.price_precision(),
565 "mark_price",
566 )?;
567
568 Ok(MarkPriceUpdate::new(
569 instrument.id(),
570 price,
571 ts_event,
572 ts_init,
573 ))
574}
575
576pub fn parse_ticker_option_index_price(
582 msg: &BybitWsTickerOptionMsg,
583 instrument: &InstrumentAny,
584 ts_init: UnixNanos,
585) -> anyhow::Result<IndexPriceUpdate> {
586 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
587
588 let price = parse_price_with_precision(
589 &msg.data.index_price,
590 instrument.price_precision(),
591 "index_price",
592 )?;
593
594 Ok(IndexPriceUpdate::new(
595 instrument.id(),
596 price,
597 ts_event,
598 ts_init,
599 ))
600}
601
602pub fn parse_ticker_option_greeks(
608 msg: &BybitWsTickerOptionMsg,
609 instrument: &InstrumentAny,
610 ts_init: UnixNanos,
611) -> anyhow::Result<OptionGreeks> {
612 let ts_event = parse_millis_i64(msg.ts, "ticker.ts")?;
613
614 let delta: f64 = msg.data.delta.parse().context("invalid delta")?;
615 let gamma: f64 = msg.data.gamma.parse().context("invalid gamma")?;
616 let vega: f64 = msg.data.vega.parse().context("invalid vega")?;
617 let theta: f64 = msg.data.theta.parse().context("invalid theta")?;
618
619 let bid_iv: f64 = msg.data.bid_iv.parse().context("invalid bid_iv")?;
620 let ask_iv: f64 = msg.data.ask_iv.parse().context("invalid ask_iv")?;
621 let mark_iv: f64 = msg
622 .data
623 .mark_price_iv
624 .parse()
625 .context("invalid mark_price_iv")?;
626 let underlying_price: f64 = msg
627 .data
628 .underlying_price
629 .parse()
630 .context("invalid underlying_price")?;
631 let open_interest: f64 = msg
632 .data
633 .open_interest
634 .parse()
635 .context("invalid open_interest")?;
636
637 Ok(OptionGreeks {
638 instrument_id: instrument.id(),
639 convention: GreeksConvention::BlackScholes,
640 greeks: OptionGreekValues {
641 delta,
642 gamma,
643 vega,
644 theta,
645 rho: 0.0, },
647 mark_iv: Some(mark_iv),
648 bid_iv: Some(bid_iv),
649 ask_iv: Some(ask_iv),
650 underlying_price: Some(underlying_price),
651 open_interest: Some(open_interest),
652 ts_event,
653 ts_init,
654 })
655}
656
657pub(crate) fn parse_millis_i64(value: i64, field: &str) -> anyhow::Result<UnixNanos> {
658 if value < 0 {
659 Err(anyhow::anyhow!("{field} must be non-negative, was {value}"))
660 } else {
661 let nanos = (value as u64)
662 .checked_mul(NANOSECONDS_IN_MILLISECOND)
663 .ok_or_else(|| anyhow::anyhow!("millisecond timestamp overflowed"))?;
664 Ok(UnixNanos::from(nanos))
665 }
666}
667
668pub fn parse_ws_kline_bar(
674 kline: &BybitWsKline,
675 instrument: &InstrumentAny,
676 bar_type: BarType,
677 timestamp_on_close: bool,
678 ts_init: UnixNanos,
679) -> anyhow::Result<Bar> {
680 let price_precision = instrument.price_precision();
681 let size_precision = instrument.size_precision();
682
683 let open = parse_price_with_precision(&kline.open, price_precision, "kline.open")?;
684 let high = parse_price_with_precision(&kline.high, price_precision, "kline.high")?;
685 let low = parse_price_with_precision(&kline.low, price_precision, "kline.low")?;
686 let close = parse_price_with_precision(&kline.close, price_precision, "kline.close")?;
687 let volume = parse_quantity_with_precision(&kline.volume, size_precision, "kline.volume")?;
688
689 let mut ts_event = parse_millis_i64(kline.start, "kline.start")?;
690
691 if timestamp_on_close {
692 let interval_ns = bar_type
693 .spec()
694 .timedelta()
695 .num_nanoseconds()
696 .context("bar specification produced non-integer interval")?;
697 let interval_ns = u64::try_from(interval_ns)
698 .context("bar interval overflowed the u64 range for nanoseconds")?;
699 let updated = ts_event
700 .as_u64()
701 .checked_add(interval_ns)
702 .context("bar timestamp overflowed when adjusting to close time")?;
703 ts_event = UnixNanos::from(updated);
704 }
705 let ts_init = if ts_init.is_zero() { ts_event } else { ts_init };
706
707 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
708 .context("failed to construct Bar from Bybit WebSocket kline")
709}
710
711pub fn parse_ws_order_status_report(
717 order: &BybitWsAccountOrder,
718 instrument: &InstrumentAny,
719 account_id: AccountId,
720 ts_init: UnixNanos,
721) -> anyhow::Result<OrderStatusReport> {
722 let instrument_id = instrument.id();
723 let venue_order_id = VenueOrderId::new(order.order_id.as_str());
724 let order_side: OrderSide = order.side.into();
725
726 let order_type = parse_bybit_order_type(
727 order.order_type,
728 order.stop_order_type,
729 order.trigger_direction,
730 order.side,
731 );
732
733 let time_in_force: TimeInForce = match order.time_in_force {
734 BybitTimeInForce::Gtc => TimeInForce::Gtc,
735 BybitTimeInForce::Ioc => TimeInForce::Ioc,
736 BybitTimeInForce::Fok => TimeInForce::Fok,
737 BybitTimeInForce::PostOnly => TimeInForce::Gtc,
738 };
739
740 let quantity =
741 parse_quantity_with_precision(&order.qty, instrument.size_precision(), "order.qty")?;
742
743 let filled_qty = parse_quantity_with_precision(
744 &order.cum_exec_qty,
745 instrument.size_precision(),
746 "order.cumExecQty",
747 )?;
748
749 let order_status: OrderStatus = match order.order_status {
755 BybitOrderStatus::Created | BybitOrderStatus::New | BybitOrderStatus::Untriggered => {
756 OrderStatus::Accepted
757 }
758 BybitOrderStatus::Rejected => {
759 if filled_qty.is_positive() {
760 OrderStatus::Canceled
761 } else {
762 OrderStatus::Rejected
763 }
764 }
765 BybitOrderStatus::PartiallyFilled => OrderStatus::PartiallyFilled,
766 BybitOrderStatus::Filled => OrderStatus::Filled,
767 BybitOrderStatus::Canceled | BybitOrderStatus::PartiallyFilledCanceled => {
768 OrderStatus::Canceled
769 }
770 BybitOrderStatus::Triggered => OrderStatus::Triggered,
771 BybitOrderStatus::Deactivated => OrderStatus::Canceled,
772 };
773
774 let ts_accepted = parse_millis_timestamp(&order.created_time, "order.createdTime")?;
775 let ts_last = parse_millis_timestamp(&order.updated_time, "order.updatedTime")?;
776
777 let mut report = OrderStatusReport::new(
778 account_id,
779 instrument_id,
780 None,
781 venue_order_id,
782 order_side,
783 order_type,
784 time_in_force,
785 order_status,
786 quantity,
787 filled_qty,
788 ts_accepted,
789 ts_last,
790 ts_init,
791 Some(UUID4::new()),
792 );
793
794 if !order.order_link_id.is_empty() {
795 report = report.with_client_order_id(ClientOrderId::new(order.order_link_id.as_str()));
796 }
797
798 if !order.price.is_empty() && order.price != "0" {
799 let price =
800 parse_price_with_precision(&order.price, instrument.price_precision(), "order.price")?;
801 report = report.with_price(price);
802 }
803
804 if !order.avg_price.is_empty() && order.avg_price != "0" {
805 let avg_px = order
806 .avg_price
807 .parse::<f64>()
808 .with_context(|| format!("Failed to parse avg_price='{}' as f64", order.avg_price))?;
809 report = report.with_avg_px(avg_px)?;
810 }
811
812 if !order.trigger_price.is_empty() && order.trigger_price != "0" {
813 let trigger_price = parse_price_with_precision(
814 &order.trigger_price,
815 instrument.price_precision(),
816 "order.triggerPrice",
817 )?;
818 report = report.with_trigger_price(trigger_price);
819
820 let trigger_type: TriggerType = order.trigger_by.into();
822 report = report.with_trigger_type(trigger_type);
823 }
824
825 if order.reduce_only {
829 report = report.with_reduce_only(true);
830 }
831
832 if order.time_in_force == BybitTimeInForce::PostOnly {
833 report = report.with_post_only(true);
834 }
835
836 if !order.reject_reason.is_empty() {
837 report = report.with_cancel_reason(order.reject_reason.to_string());
838 }
839
840 Ok(report)
841}
842
843pub fn parse_ws_fill_report(
849 execution: &BybitWsAccountExecution,
850 account_id: AccountId,
851 instrument: &InstrumentAny,
852 ts_init: UnixNanos,
853) -> anyhow::Result<FillReport> {
854 let instrument_id = instrument.id();
855 let venue_order_id = VenueOrderId::new(execution.order_id.as_str());
856 let trade_id = TradeId::new_checked(execution.exec_id.as_str())
857 .context("invalid execId in Bybit WebSocket execution payload")?;
858
859 let order_side: OrderSide = execution.side.into();
860 let last_qty = parse_quantity_with_precision(
861 &execution.exec_qty,
862 instrument.size_precision(),
863 "execution.execQty",
864 )?;
865 let last_px = parse_price_with_precision(
866 &execution.exec_price,
867 instrument.price_precision(),
868 "execution.execPrice",
869 )?;
870
871 let liquidity_side = if execution.is_maker {
872 LiquiditySide::Maker
873 } else {
874 LiquiditySide::Taker
875 };
876
877 let fee_decimal: Decimal = execution
878 .exec_fee
879 .parse()
880 .with_context(|| format!("Failed to parse execFee='{}'", execution.exec_fee))?;
881
882 let commission_currency = instrument.quote_currency();
883 let commission = Money::from_decimal(fee_decimal, commission_currency).with_context(|| {
884 format!(
885 "Failed to create commission from execFee='{}'",
886 execution.exec_fee
887 )
888 })?;
889 let ts_event = parse_millis_timestamp(&execution.exec_time, "execution.execTime")?;
890
891 let client_order_id = if execution.order_link_id.is_empty() {
892 None
893 } else {
894 Some(ClientOrderId::new(execution.order_link_id.as_str()))
895 };
896
897 Ok(FillReport::new(
898 account_id,
899 instrument_id,
900 venue_order_id,
901 trade_id,
902 order_side,
903 last_qty,
904 last_px,
905 commission,
906 liquidity_side,
907 client_order_id,
908 None, ts_event,
910 ts_init,
911 None, ))
913}
914
915pub fn parse_ws_position_status_report(
921 position: &BybitWsAccountPosition,
922 account_id: AccountId,
923 instrument: &InstrumentAny,
924 ts_init: UnixNanos,
925) -> anyhow::Result<PositionStatusReport> {
926 let instrument_id = instrument.id();
927
928 let quantity = parse_quantity_with_precision(
930 &position.size,
931 instrument.size_precision(),
932 "position.size",
933 )?;
934
935 let position_side = match position.side {
936 BybitPositionSide::Buy => PositionSideSpecified::Long,
937 BybitPositionSide::Sell => PositionSideSpecified::Short,
938 BybitPositionSide::Flat => PositionSideSpecified::Flat,
939 };
940
941 if position.adl_rank_indicator >= 4 {
945 log::warn!(
946 "Elevated ADL risk: {} position size={} adl_rank={}",
947 instrument_id,
948 position.size,
949 position.adl_rank_indicator,
950 );
951 }
952
953 let ts_last = parse_millis_timestamp(&position.updated_time, "position.updatedTime")?;
954
955 Ok(PositionStatusReport::new(
956 account_id,
957 instrument_id,
958 position_side,
959 quantity,
960 ts_last,
961 ts_init,
962 None, None, position.entry_price, ))
966}
967
968pub fn parse_ws_account_state(
974 wallet: &BybitWsAccountWallet,
975 account_id: AccountId,
976 ts_event: UnixNanos,
977 ts_init: UnixNanos,
978) -> anyhow::Result<AccountState> {
979 let mut balances = Vec::new();
980 let mut margins = Vec::new();
981
982 for coin_data in &wallet.coin {
983 let currency = get_currency(coin_data.coin.as_str());
984 let total_dec = coin_data.wallet_balance - coin_data.spot_borrow;
985 let locked_dec = coin_data.total_order_im + coin_data.total_position_im;
986
987 balances.push(AccountBalance::from_total_and_locked(
988 total_dec, locked_dec, currency,
989 )?);
990
991 let initial_margin_dec = coin_data.total_position_im + coin_data.total_order_im;
994 let maintenance_margin_dec = match &coin_data.total_position_mm {
995 Some(mm) if !mm.is_empty() => mm.parse::<Decimal>()?,
996 _ => Decimal::ZERO,
997 };
998
999 if !initial_margin_dec.is_zero() || !maintenance_margin_dec.is_zero() {
1000 margins.push(MarginBalance::new(
1001 Money::from_decimal(initial_margin_dec, currency)?,
1002 Money::from_decimal(maintenance_margin_dec, currency)?,
1003 None,
1004 ));
1005 }
1006 }
1007
1008 Ok(AccountState::new(
1009 account_id,
1010 AccountType::Margin, balances,
1012 margins,
1013 true, UUID4::new(),
1015 ts_event,
1016 ts_init,
1017 None, ))
1019}
1020
1021#[cfg(test)]
1022mod tests {
1023 use nautilus_model::{
1024 data::BarSpecification,
1025 enums::{
1026 AggregationSource, BarAggregation, OrderType, PositionSide, PriceType, TriggerType,
1027 },
1028 };
1029 use rstest::rstest;
1030 use rust_decimal_macros::dec;
1031
1032 use super::*;
1033 use crate::{
1034 common::{
1035 enums::BybitExecType,
1036 parse::{parse_linear_instrument, parse_option_instrument},
1037 testing::load_test_json,
1038 },
1039 http::models::{BybitInstrumentLinearResponse, BybitInstrumentOptionResponse},
1040 websocket::messages::{
1041 BybitWsOrderbookDepthMsg, BybitWsTickerLinearMsg, BybitWsTickerOptionMsg,
1042 BybitWsTradeMsg,
1043 },
1044 };
1045
1046 const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
1047
1048 use ustr::Ustr;
1049
1050 use crate::http::models::BybitFeeRate;
1051
1052 fn sample_fee_rate(
1053 symbol: &str,
1054 taker: &str,
1055 maker: &str,
1056 base_coin: Option<&str>,
1057 ) -> BybitFeeRate {
1058 BybitFeeRate {
1059 symbol: Ustr::from(symbol),
1060 taker_fee_rate: taker.to_string(),
1061 maker_fee_rate: maker.to_string(),
1062 base_coin: base_coin.map(Ustr::from),
1063 }
1064 }
1065
1066 fn linear_instrument() -> InstrumentAny {
1067 let json = load_test_json("http_get_instruments_linear.json");
1068 let response: BybitInstrumentLinearResponse = serde_json::from_str(&json).unwrap();
1069 let instrument = &response.result.list[0];
1070 let fee_rate = sample_fee_rate("BTCUSDT", "0.00055", "0.0001", Some("BTC"));
1071 parse_linear_instrument(instrument, &fee_rate, TS, TS).unwrap()
1072 }
1073
1074 fn option_instrument() -> InstrumentAny {
1075 let json = load_test_json("http_get_instruments_option.json");
1076 let response: BybitInstrumentOptionResponse = serde_json::from_str(&json).unwrap();
1077 let instrument = &response.result.list[0];
1078 parse_option_instrument(instrument, None, TS, TS).unwrap()
1079 }
1080
1081 #[rstest]
1082 fn parse_ws_trade_into_trade_tick() {
1083 let instrument = linear_instrument();
1084 let json = load_test_json("ws_public_trade.json");
1085 let msg: BybitWsTradeMsg = serde_json::from_str(&json).unwrap();
1086 let trade = &msg.data[0];
1087
1088 let tick = parse_ws_trade_tick(trade, &instrument, TS).unwrap();
1089
1090 assert_eq!(tick.instrument_id, instrument.id());
1091 assert_eq!(tick.price, instrument.make_price(27451.00));
1092 assert_eq!(tick.size, instrument.make_qty(0.010, None));
1093 assert_eq!(tick.aggressor_side, AggressorSide::Buyer);
1094 assert_eq!(
1095 tick.trade_id.to_string(),
1096 "9dc75fca-4bdd-4773-9f78-6f5d7ab2a110"
1097 );
1098 assert_eq!(tick.ts_event, UnixNanos::new(1_709_891_679_000_000_000));
1099 }
1100
1101 #[rstest]
1102 fn parse_orderbook_snapshot_into_deltas() {
1103 let instrument = linear_instrument();
1104 let json = load_test_json("ws_orderbook_snapshot.json");
1105 let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
1106
1107 let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();
1108
1109 assert_eq!(deltas.instrument_id, instrument.id());
1110 assert_eq!(deltas.deltas.len(), 5);
1111 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
1112 assert_eq!(
1113 deltas.deltas[1].order.price,
1114 instrument.make_price(27450.00)
1115 );
1116 assert_eq!(
1117 deltas.deltas[1].order.size,
1118 instrument.make_qty(0.500, None)
1119 );
1120 let last = deltas.deltas.last().unwrap();
1121 assert_eq!(last.order.side, OrderSide::Sell);
1122 assert_eq!(last.order.price, instrument.make_price(27451.50));
1123 assert_eq!(
1124 last.flags & RecordFlag::F_LAST as u8,
1125 RecordFlag::F_LAST as u8
1126 );
1127 }
1128
1129 #[rstest]
1130 fn parse_orderbook_delta_marks_actions() {
1131 let instrument = linear_instrument();
1132 let json = load_test_json("ws_orderbook_delta.json");
1133 let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
1134
1135 let deltas = parse_orderbook_deltas(&msg, &instrument, TS).unwrap();
1136
1137 assert_eq!(deltas.deltas.len(), 2);
1138 let bid = &deltas.deltas[0];
1139 assert_eq!(bid.action, BookAction::Update);
1140 assert_eq!(bid.order.side, OrderSide::Buy);
1141 assert_eq!(bid.order.size, instrument.make_qty(0.400, None));
1142
1143 let ask = &deltas.deltas[1];
1144 assert_eq!(ask.action, BookAction::Delete);
1145 assert_eq!(ask.order.side, OrderSide::Sell);
1146 assert_eq!(ask.order.size, instrument.make_qty(0.0, None));
1147 assert_eq!(
1148 ask.flags & RecordFlag::F_LAST as u8,
1149 RecordFlag::F_LAST as u8
1150 );
1151 }
1152
1153 #[rstest]
1154 fn parse_orderbook_quote_produces_top_of_book() {
1155 let instrument = linear_instrument();
1156 let json = load_test_json("ws_orderbook_snapshot.json");
1157 let msg: BybitWsOrderbookDepthMsg = serde_json::from_str(&json).unwrap();
1158
1159 let quote = parse_orderbook_quote(&msg, &instrument, None, TS).unwrap();
1160
1161 assert_eq!(quote.instrument_id, instrument.id());
1162 assert_eq!(quote.bid_price, instrument.make_price(27450.00));
1163 assert_eq!(quote.bid_size, instrument.make_qty(0.500, None));
1164 assert_eq!(quote.ask_price, instrument.make_price(27451.00));
1165 assert_eq!(quote.ask_size, instrument.make_qty(0.750, None));
1166 }
1167
1168 #[rstest]
1169 fn parse_orderbook_quote_with_delta_updates_sizes() {
1170 let instrument = linear_instrument();
1171 let snapshot: BybitWsOrderbookDepthMsg =
1172 serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json")).unwrap();
1173 let base_quote = parse_orderbook_quote(&snapshot, &instrument, None, TS).unwrap();
1174
1175 let delta: BybitWsOrderbookDepthMsg =
1176 serde_json::from_str(&load_test_json("ws_orderbook_delta.json")).unwrap();
1177 let updated = parse_orderbook_quote(&delta, &instrument, Some(&base_quote), TS).unwrap();
1178
1179 assert_eq!(updated.bid_price, instrument.make_price(27450.00));
1180 assert_eq!(updated.bid_size, instrument.make_qty(0.400, None));
1181 assert_eq!(updated.ask_price, instrument.make_price(27451.00));
1182 assert_eq!(updated.ask_size, instrument.make_qty(0.0, None));
1183 }
1184
1185 #[rstest]
1186 fn parse_linear_ticker_quote_to_quote_tick() {
1187 let instrument = linear_instrument();
1188 let json = load_test_json("ws_ticker_linear.json");
1189 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
1190
1191 let quote = parse_ticker_linear_quote(&msg, &instrument, TS).unwrap();
1192
1193 assert_eq!(quote.instrument_id, instrument.id());
1194 assert_eq!(quote.bid_price, instrument.make_price(17215.50));
1195 assert_eq!(quote.ask_price, instrument.make_price(17216.00));
1196 assert_eq!(quote.bid_size, instrument.make_qty(84.489, None));
1197 assert_eq!(quote.ask_size, instrument.make_qty(83.020, None));
1198 assert_eq!(quote.ts_event, UnixNanos::new(1_673_272_861_686_000_000));
1199 assert_eq!(quote.ts_init, TS);
1200 }
1201
1202 #[rstest]
1203 fn parse_option_ticker_quote_to_quote_tick() {
1204 let instrument = option_instrument();
1205 let json = load_test_json("ws_ticker_option.json");
1206 let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();
1207
1208 let quote = parse_ticker_option_quote(&msg, &instrument, TS).unwrap();
1209
1210 assert_eq!(quote.instrument_id, instrument.id());
1211 assert_eq!(quote.bid_price, instrument.make_price(0.0));
1212 assert_eq!(quote.ask_price, instrument.make_price(10.0));
1213 assert_eq!(quote.bid_size, instrument.make_qty(0.0, None));
1214 assert_eq!(quote.ask_size, instrument.make_qty(5.1, None));
1215 assert_eq!(quote.ts_event, UnixNanos::new(1_672_917_511_074_000_000));
1216 assert_eq!(quote.ts_init, TS);
1217 }
1218
1219 #[rstest]
1220 #[case::timestamp_on_open(false, 1_672_324_800_000_000_000)]
1221 #[case::timestamp_on_close(true, 1_672_325_100_000_000_000)]
1222 fn parse_ws_kline_into_bar(#[case] timestamp_on_close: bool, #[case] expected_ts_event: u64) {
1223 use std::num::NonZero;
1224
1225 let instrument = linear_instrument();
1226 let json = load_test_json("ws_kline.json");
1227 let msg: crate::websocket::messages::BybitWsKlineMsg = serde_json::from_str(&json).unwrap();
1228 let kline = &msg.data[0];
1229
1230 let bar_spec = BarSpecification {
1231 step: NonZero::new(5).unwrap(),
1232 aggregation: BarAggregation::Minute,
1233 price_type: PriceType::Last,
1234 };
1235 let bar_type = BarType::new(instrument.id(), bar_spec, AggregationSource::External);
1236
1237 let bar = parse_ws_kline_bar(kline, &instrument, bar_type, timestamp_on_close, TS).unwrap();
1238
1239 assert_eq!(bar.bar_type, bar_type);
1240 assert_eq!(bar.open, instrument.make_price(16649.5));
1241 assert_eq!(bar.high, instrument.make_price(16677.0));
1242 assert_eq!(bar.low, instrument.make_price(16608.0));
1243 assert_eq!(bar.close, instrument.make_price(16677.0));
1244 assert_eq!(bar.volume, instrument.make_qty(2.081, None));
1245 assert_eq!(bar.ts_event, UnixNanos::new(expected_ts_event));
1246 assert_eq!(bar.ts_init, TS);
1247 }
1248
1249 #[rstest]
1250 fn parse_ws_order_into_order_status_report() {
1251 let instrument = linear_instrument();
1252 let json = load_test_json("ws_account_order_filled.json");
1253 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1254 serde_json::from_str(&json).unwrap();
1255 let order = &msg.data[0];
1256 let account_id = AccountId::new("BYBIT-001");
1257
1258 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1259
1260 assert_eq!(report.account_id, account_id);
1261 assert_eq!(report.instrument_id, instrument.id());
1262 assert_eq!(report.order_side, OrderSide::Buy);
1263 assert_eq!(report.order_type, OrderType::Limit);
1264 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1265 assert_eq!(report.order_status, OrderStatus::Filled);
1266 assert_eq!(report.quantity, instrument.make_qty(0.100, None));
1267 assert_eq!(report.filled_qty, instrument.make_qty(0.100, None));
1268 assert_eq!(report.price, Some(instrument.make_price(30000.50)));
1269 assert_eq!(report.avg_px, Some(dec!(30000.50)));
1270 assert_eq!(
1271 report.client_order_id.as_ref().unwrap().to_string(),
1272 "test-client-order-001"
1273 );
1274 assert_eq!(
1275 report.ts_accepted,
1276 UnixNanos::new(1_672_364_262_444_000_000)
1277 );
1278 assert_eq!(report.ts_last, UnixNanos::new(1_672_364_262_457_000_000));
1279 }
1280
1281 #[rstest]
1282 fn parse_ws_order_partially_filled_rejected_maps_to_canceled() {
1283 let instrument = linear_instrument();
1284 let json = load_test_json("ws_account_order_partially_filled_rejected.json");
1285 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1286 serde_json::from_str(&json).unwrap();
1287 let order = &msg.data[0];
1288 let account_id = AccountId::new("BYBIT-001");
1289
1290 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1291
1292 assert_eq!(report.order_status, OrderStatus::Canceled);
1294 assert_eq!(report.filled_qty, instrument.make_qty(50.0, None));
1295 assert_eq!(
1296 report.client_order_id.as_ref().unwrap().to_string(),
1297 "O-20251001-164609-APEX-000-49"
1298 );
1299 assert_eq!(report.cancel_reason, Some("UNKNOWN".to_string()));
1300 }
1301
1302 #[rstest]
1303 fn parse_ws_execution_into_fill_report() {
1304 let instrument = linear_instrument();
1305 let json = load_test_json("ws_account_execution.json");
1306 let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1307 serde_json::from_str(&json).unwrap();
1308 let execution = &msg.data[0];
1309 let account_id = AccountId::new("BYBIT-001");
1310
1311 let report = parse_ws_fill_report(execution, account_id, &instrument, TS).unwrap();
1312
1313 assert_eq!(report.account_id, account_id);
1314 assert_eq!(report.instrument_id, instrument.id());
1315 assert_eq!(
1316 report.venue_order_id.to_string(),
1317 "9aac161b-8ed6-450d-9cab-c5cc67c21784"
1318 );
1319 assert_eq!(
1320 report.trade_id.to_string(),
1321 "0ab1bdf7-4219-438b-b30a-32ec863018f7"
1322 );
1323 assert_eq!(report.order_side, OrderSide::Sell);
1324 assert_eq!(report.last_qty, instrument.make_qty(0.5, None));
1325 assert_eq!(report.last_px, instrument.make_price(95900.1));
1326 assert_eq!(report.commission.as_f64(), 26.3725275);
1327 assert_eq!(report.liquidity_side, LiquiditySide::Taker);
1328 assert_eq!(
1329 report.client_order_id.as_ref().unwrap().to_string(),
1330 "test-order-link-001"
1331 );
1332 assert_eq!(report.ts_event, UnixNanos::new(1_746_270_400_353_000_000));
1333 }
1334
1335 #[rstest]
1336 fn parse_ws_adl_execution_into_fill_report() {
1337 let instrument = linear_instrument();
1338 let json = load_test_json("ws_account_execution_adl.json");
1339 let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1340 serde_json::from_str(&json).unwrap();
1341 let execution = &msg.data[0];
1342 let account_id = AccountId::new("BYBIT-001");
1343
1344 assert_eq!(execution.exec_type, BybitExecType::AdlTrade);
1345 assert!(execution.exec_type.is_exchange_generated());
1346 assert!(execution.order_link_id.is_empty());
1347
1348 let report = parse_ws_fill_report(execution, account_id, &instrument, TS).unwrap();
1349
1350 assert_eq!(report.client_order_id, None);
1353 assert_eq!(
1354 report.venue_order_id.to_string(),
1355 "9aac161b-8ed6-450d-9cab-c5cc67c21785"
1356 );
1357 assert_eq!(report.order_side, OrderSide::Sell);
1358 assert_eq!(report.last_qty, instrument.make_qty(0.5, None));
1359 assert_eq!(report.last_px, instrument.make_price(95850.0));
1360 assert_eq!(report.commission.as_f64(), 0.0);
1361 }
1362
1363 #[rstest]
1364 fn parse_ws_fill_report_venue_position_id_is_none() {
1365 let instrument = linear_instrument();
1366 let json = load_test_json("ws_account_execution.json");
1367 let msg: crate::websocket::messages::BybitWsAccountExecutionMsg =
1368 serde_json::from_str(&json).unwrap();
1369 let execution = &msg.data[0];
1370 let account_id = AccountId::new("BYBIT-001");
1371
1372 let report = parse_ws_fill_report(execution, account_id, &instrument, TS).unwrap();
1373
1374 assert_eq!(report.venue_position_id, None);
1375 }
1376
1377 #[rstest]
1378 fn parse_ws_order_status_report_venue_position_id_is_none_for_tp() {
1379 let instrument = linear_instrument();
1380 let json = load_test_json("ws_account_order_take_profit.json");
1381 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1382 serde_json::from_str(&json).unwrap();
1383 let order = &msg.data[0]; let account_id = AccountId::new("BYBIT-001");
1385
1386 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1387
1388 assert_eq!(report.venue_position_id, None);
1389 }
1390
1391 #[rstest]
1392 fn parse_ws_position_into_position_status_report() {
1393 let instrument = linear_instrument();
1394 let json = load_test_json("ws_account_position.json");
1395 let msg: crate::websocket::messages::BybitWsAccountPositionMsg =
1396 serde_json::from_str(&json).unwrap();
1397 let position = &msg.data[0];
1398 let account_id = AccountId::new("BYBIT-001");
1399
1400 let report =
1401 parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();
1402
1403 assert_eq!(report.account_id, account_id);
1404 assert_eq!(report.instrument_id, instrument.id());
1405 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1406 assert_eq!(report.quantity, instrument.make_qty(0.01, None));
1407 assert_eq!(
1408 report.avg_px_open,
1409 Some(Decimal::try_from(3641.075).unwrap())
1410 );
1411 assert_eq!(report.ts_last, UnixNanos::new(1_762_199_125_472_000_000));
1412 assert_eq!(report.ts_init, TS);
1413 }
1414
1415 #[rstest]
1416 fn parse_ws_position_short_into_position_status_report() {
1417 let instruments_json = load_test_json("http_get_instruments_linear.json");
1419 let instruments_response: crate::http::models::BybitInstrumentLinearResponse =
1420 serde_json::from_str(&instruments_json).unwrap();
1421 let eth_def = &instruments_response.result.list[1]; let fee_rate = crate::http::models::BybitFeeRate {
1423 symbol: Ustr::from("ETHUSDT"),
1424 taker_fee_rate: "0.00055".to_string(),
1425 maker_fee_rate: "0.0001".to_string(),
1426 base_coin: Some(Ustr::from("ETH")),
1427 };
1428 let instrument =
1429 crate::common::parse::parse_linear_instrument(eth_def, &fee_rate, TS, TS).unwrap();
1430
1431 let json = load_test_json("ws_account_position_short.json");
1432 let msg: crate::websocket::messages::BybitWsAccountPositionMsg =
1433 serde_json::from_str(&json).unwrap();
1434 let position = &msg.data[0];
1435 let account_id = AccountId::new("BYBIT-001");
1436
1437 let report =
1438 parse_ws_position_status_report(position, account_id, &instrument, TS).unwrap();
1439
1440 assert_eq!(report.account_id, account_id);
1441 assert_eq!(report.instrument_id.symbol.as_str(), "ETHUSDT-LINEAR");
1442 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1443 assert_eq!(report.quantity, instrument.make_qty(0.01, None));
1444 assert_eq!(
1445 report.avg_px_open,
1446 Some(Decimal::try_from(3641.075).unwrap())
1447 );
1448 assert_eq!(report.ts_last, UnixNanos::new(1_762_199_125_472_000_000));
1449 assert_eq!(report.ts_init, TS);
1450 }
1451
1452 #[rstest]
1453 fn parse_ws_wallet_into_account_state() {
1454 let json = load_test_json("ws_account_wallet.json");
1455 let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
1456 serde_json::from_str(&json).unwrap();
1457 let wallet = &msg.data[0];
1458 let account_id = AccountId::new("BYBIT-001");
1459 let ts_event = UnixNanos::new(1_700_034_722_104_000_000);
1460
1461 let state = parse_ws_account_state(wallet, account_id, ts_event, TS).unwrap();
1462
1463 assert_eq!(state.account_id, account_id);
1464 assert_eq!(state.account_type, AccountType::Margin);
1465 assert_eq!(state.balances.len(), 2);
1466 assert!(state.is_reported);
1467
1468 let btc_balance = &state.balances[0];
1470 assert_eq!(btc_balance.currency.code.as_str(), "BTC");
1471 assert!((btc_balance.total.as_f64() - 0.00102964).abs() < 1e-8);
1472 assert!((btc_balance.free.as_f64() - 0.00092964).abs() < 1e-8);
1473 assert!((btc_balance.locked.as_f64() - 0.0001).abs() < 1e-8);
1474
1475 let usdt_balance = &state.balances[1];
1477 assert_eq!(usdt_balance.currency.code.as_str(), "USDT");
1478 assert!((usdt_balance.total.as_f64() - 9647.75537647).abs() < 1e-6);
1479 assert!((usdt_balance.free.as_f64() - 9519.89806037).abs() < 1e-6);
1480 assert!((usdt_balance.locked.as_f64() - 127.8573161).abs() < 1e-6);
1481
1482 assert_eq!(state.margins.len(), 2);
1484 assert!(state.margins.iter().all(|m| m.instrument_id.is_none()));
1485
1486 let btc_margin = state
1487 .margins
1488 .iter()
1489 .find(|m| m.currency.code.as_str() == "BTC")
1490 .expect("BTC margin missing");
1491 assert!((btc_margin.initial.as_f64() - 0.0001).abs() < 1e-8);
1492 assert!(btc_margin.maintenance.as_f64().abs() < 1e-9);
1493
1494 let usdt_margin = state
1495 .margins
1496 .iter()
1497 .find(|m| m.currency.code.as_str() == "USDT")
1498 .expect("USDT margin missing");
1499 assert!((usdt_margin.initial.as_f64() - 127.8573161).abs() < 1e-6);
1500 assert!((usdt_margin.maintenance.as_f64() - 12.78573161).abs() < 1e-6);
1501
1502 assert_eq!(state.ts_event, ts_event);
1503 assert_eq!(state.ts_init, TS);
1504 }
1505
1506 #[rstest]
1507 fn parse_ws_wallet_with_small_order_calculates_free_correctly() {
1508 let json = load_test_json("ws_account_wallet_small_order.json");
1512 let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
1513 serde_json::from_str(&json).unwrap();
1514 let wallet = &msg.data[0];
1515 let account_id = AccountId::new("BYBIT-UNIFIED");
1516 let ts_event = UnixNanos::new(1_762_960_669_000_000_000);
1517
1518 let state = parse_ws_account_state(wallet, account_id, ts_event, TS).unwrap();
1519
1520 assert_eq!(state.account_id, account_id);
1521 assert_eq!(state.balances.len(), 1);
1522
1523 let usdt_balance = &state.balances[0];
1525 assert_eq!(usdt_balance.currency.code.as_str(), "USDT");
1526
1527 assert!((usdt_balance.total.as_f64() - 51333.82543837).abs() < 1e-6);
1529
1530 assert!((usdt_balance.locked.as_f64() - 50.028).abs() < 1e-6);
1532
1533 assert!((usdt_balance.free.as_f64() - 51283.79743837).abs() < 1e-6);
1535
1536 assert_eq!(state.margins.len(), 1);
1542 let usdt_margin = &state.margins[0];
1543 assert!(usdt_margin.instrument_id.is_none());
1544 assert_eq!(usdt_margin.currency.code.as_str(), "USDT");
1545 assert!((usdt_margin.initial.as_f64() - 50.028).abs() < 1e-6);
1546 assert!(usdt_margin.maintenance.as_f64().abs() < 1e-9);
1547 }
1548
1549 #[rstest]
1550 fn parse_ticker_linear_into_funding_rate() {
1551 let instrument = linear_instrument();
1552 let json = load_test_json("ws_ticker_linear.json");
1553 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
1554
1555 let ts_event = UnixNanos::new(1_673_272_861_686_000_000);
1556
1557 let funding =
1558 parse_ticker_linear_funding(&msg.data, instrument.id(), ts_event, TS).unwrap();
1559
1560 assert_eq!(funding.instrument_id, instrument.id());
1561 assert_eq!(funding.rate, dec!(-0.000212)); assert_eq!(funding.interval, Some(8 * 60));
1563 assert_eq!(
1564 funding.next_funding_ns,
1565 Some(UnixNanos::new(1_673_280_000_000_000_000))
1566 );
1567 assert_eq!(funding.ts_event, ts_event);
1568 assert_eq!(funding.ts_init, TS);
1569 }
1570
1571 #[rstest]
1572 fn parse_ticker_linear_into_mark_price() {
1573 let instrument = linear_instrument();
1574 let json = load_test_json("ws_ticker_linear.json");
1575 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
1576
1577 let ts_event = UnixNanos::new(1_673_272_861_686_000_000);
1578
1579 let mark_price =
1580 parse_ticker_linear_mark_price(&msg.data, &instrument, ts_event, TS).unwrap();
1581
1582 assert_eq!(mark_price.instrument_id, instrument.id());
1583 assert_eq!(mark_price.value, instrument.make_price(17217.33));
1584 assert_eq!(mark_price.ts_event, ts_event);
1585 assert_eq!(mark_price.ts_init, TS);
1586 }
1587
1588 #[rstest]
1589 fn parse_ticker_linear_into_index_price() {
1590 let instrument = linear_instrument();
1591 let json = load_test_json("ws_ticker_linear.json");
1592 let msg: BybitWsTickerLinearMsg = serde_json::from_str(&json).unwrap();
1593
1594 let ts_event = UnixNanos::new(1_673_272_861_686_000_000);
1595
1596 let index_price =
1597 parse_ticker_linear_index_price(&msg.data, &instrument, ts_event, TS).unwrap();
1598
1599 assert_eq!(index_price.instrument_id, instrument.id());
1600 assert_eq!(index_price.value, instrument.make_price(17227.36));
1601 assert_eq!(index_price.ts_event, ts_event);
1602 assert_eq!(index_price.ts_init, TS);
1603 }
1604
1605 #[rstest]
1606 fn parse_ticker_option_into_mark_price() {
1607 let instrument = option_instrument();
1608 let json = load_test_json("ws_ticker_option.json");
1609 let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();
1610
1611 let mark_price = parse_ticker_option_mark_price(&msg, &instrument, TS).unwrap();
1612
1613 assert_eq!(mark_price.instrument_id, instrument.id());
1614 assert_eq!(mark_price.value, instrument.make_price(7.86976724));
1615 assert_eq!(mark_price.ts_init, TS);
1616 }
1617
1618 #[rstest]
1619 fn parse_ticker_option_into_index_price() {
1620 let instrument = option_instrument();
1621 let json = load_test_json("ws_ticker_option.json");
1622 let msg: BybitWsTickerOptionMsg = serde_json::from_str(&json).unwrap();
1623
1624 let index_price = parse_ticker_option_index_price(&msg, &instrument, TS).unwrap();
1625
1626 assert_eq!(index_price.instrument_id, instrument.id());
1627 assert_eq!(index_price.value, instrument.make_price(16823.73));
1628 assert_eq!(index_price.ts_init, TS);
1629 }
1630
1631 #[rstest]
1632 fn parse_ws_order_stop_market_sell_preserves_type() {
1633 let instrument = linear_instrument();
1634 let json = load_test_json("ws_account_order_stop_market.json");
1635 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1636 serde_json::from_str(&json).unwrap();
1637 let order = &msg.data[0];
1638 let account_id = AccountId::new("BYBIT-001");
1639
1640 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1641
1642 assert_eq!(report.order_type, OrderType::StopMarket);
1644 assert_eq!(report.order_side, OrderSide::Sell);
1645 assert_eq!(report.order_status, OrderStatus::Accepted); assert_eq!(report.trigger_price, Some(instrument.make_price(45000.00)));
1647 assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
1648 assert_eq!(
1649 report.client_order_id.as_ref().unwrap().to_string(),
1650 "test-client-stop-market-001"
1651 );
1652 }
1653
1654 #[rstest]
1655 fn parse_ws_order_stop_market_buy_preserves_type() {
1656 let instrument = linear_instrument();
1657 let json = load_test_json("ws_account_order_buy_stop_market.json");
1658 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1659 serde_json::from_str(&json).unwrap();
1660 let order = &msg.data[0];
1661 let account_id = AccountId::new("BYBIT-001");
1662
1663 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1664
1665 assert_eq!(report.order_type, OrderType::StopMarket);
1667 assert_eq!(report.order_side, OrderSide::Buy);
1668 assert_eq!(report.order_status, OrderStatus::Accepted);
1669 assert_eq!(report.trigger_price, Some(instrument.make_price(55000.00)));
1670 assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
1671 assert_eq!(
1672 report.client_order_id.as_ref().unwrap().to_string(),
1673 "test-client-buy-stop-market-001"
1674 );
1675 }
1676
1677 #[rstest]
1678 fn parse_ws_order_market_if_touched_buy_preserves_type() {
1679 let instrument = linear_instrument();
1680 let json = load_test_json("ws_account_order_market_if_touched.json");
1681 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1682 serde_json::from_str(&json).unwrap();
1683 let order = &msg.data[0];
1684 let account_id = AccountId::new("BYBIT-001");
1685
1686 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1687
1688 assert_eq!(report.order_type, OrderType::MarketIfTouched);
1690 assert_eq!(report.order_side, OrderSide::Buy);
1691 assert_eq!(report.order_status, OrderStatus::Accepted); assert_eq!(report.trigger_price, Some(instrument.make_price(55000.00)));
1693 assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
1694 assert_eq!(
1695 report.client_order_id.as_ref().unwrap().to_string(),
1696 "test-client-mit-001"
1697 );
1698 }
1699
1700 #[rstest]
1701 fn parse_ws_order_market_if_touched_sell_preserves_type() {
1702 let instrument = linear_instrument();
1703 let json = load_test_json("ws_account_order_sell_market_if_touched.json");
1704 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1705 serde_json::from_str(&json).unwrap();
1706 let order = &msg.data[0];
1707 let account_id = AccountId::new("BYBIT-001");
1708
1709 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1710
1711 assert_eq!(report.order_type, OrderType::MarketIfTouched);
1713 assert_eq!(report.order_side, OrderSide::Sell);
1714 assert_eq!(report.order_status, OrderStatus::Accepted);
1715 assert_eq!(report.trigger_price, Some(instrument.make_price(55000.00)));
1716 assert_eq!(
1717 report.client_order_id.as_ref().unwrap().to_string(),
1718 "test-client-sell-mit-001"
1719 );
1720 }
1721
1722 #[rstest]
1723 fn parse_ws_order_stop_limit_preserves_type() {
1724 let instrument = linear_instrument();
1725 let json = load_test_json("ws_account_order_stop_limit.json");
1726 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1727 serde_json::from_str(&json).unwrap();
1728 let order = &msg.data[0];
1729 let account_id = AccountId::new("BYBIT-001");
1730
1731 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1732
1733 assert_eq!(report.order_type, OrderType::StopLimit);
1736 assert_eq!(report.order_side, OrderSide::Sell);
1737 assert_eq!(report.order_status, OrderStatus::Accepted); assert_eq!(report.price, Some(instrument.make_price(44500.00)));
1739 assert_eq!(report.trigger_price, Some(instrument.make_price(45000.00)));
1740 assert_eq!(
1741 report.client_order_id.as_ref().unwrap().to_string(),
1742 "test-client-stop-limit-001"
1743 );
1744 }
1745
1746 #[rstest]
1747 fn parse_ws_order_limit_if_touched_preserves_type() {
1748 let instrument = linear_instrument();
1749 let json = load_test_json("ws_account_order_limit_if_touched.json");
1750 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1751 serde_json::from_str(&json).unwrap();
1752 let order = &msg.data[0];
1753 let account_id = AccountId::new("BYBIT-001");
1754
1755 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1756
1757 assert_eq!(report.order_type, OrderType::LimitIfTouched);
1760 assert_eq!(report.order_side, OrderSide::Buy);
1761 assert_eq!(report.order_status, OrderStatus::Accepted); assert_eq!(report.price, Some(instrument.make_price(55500.00)));
1763 assert_eq!(report.trigger_price, Some(instrument.make_price(55000.00)));
1764 assert_eq!(
1765 report.client_order_id.as_ref().unwrap().to_string(),
1766 "test-client-lit-001"
1767 );
1768 }
1769
1770 #[rstest]
1771 fn parse_ws_wallet_clamps_free_to_zero_when_locked_exceeds_total() {
1772 let json = load_test_json("ws_account_wallet_locked_exceeds_total.json");
1775 let msg: crate::websocket::messages::BybitWsAccountWalletMsg =
1776 serde_json::from_str(&json).unwrap();
1777 let wallet = &msg.data[0];
1778 let account_id = AccountId::new("BYBIT-UNIFIED");
1779 let ts_event = UnixNanos::new(1_762_960_669_000_000_000);
1780
1781 let state = parse_ws_account_state(wallet, account_id, ts_event, TS).unwrap();
1782
1783 let usdt_balance = &state.balances[0];
1784 assert_eq!(usdt_balance.currency.code.as_str(), "USDT");
1785 assert!((usdt_balance.total.as_f64() - 100.0).abs() < 1e-6);
1786 assert!((usdt_balance.locked.as_f64() - 100.0).abs() < 1e-6);
1788 assert_eq!(usdt_balance.free.as_f64(), 0.0);
1789 }
1790
1791 #[rstest]
1792 fn parse_ws_order_take_profit_maps_to_market_if_touched() {
1793 let instrument = linear_instrument();
1794 let json = load_test_json("ws_account_order_take_profit.json");
1795 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1796 serde_json::from_str(&json).unwrap();
1797 let order = &msg.data[0];
1798 let account_id = AccountId::new("BYBIT-001");
1799
1800 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1801
1802 assert_eq!(report.order_type, OrderType::MarketIfTouched);
1803 assert_eq!(report.order_side, OrderSide::Sell);
1804 assert_eq!(report.trigger_price, Some(instrument.make_price(55000.00)));
1805 assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
1806 assert!(report.reduce_only);
1807 }
1808
1809 #[rstest]
1810 fn parse_ws_order_stop_loss_maps_to_stop_market() {
1811 let instrument = linear_instrument();
1812 let json = load_test_json("ws_account_order_stop_loss.json");
1813 let msg: crate::websocket::messages::BybitWsAccountOrderMsg =
1814 serde_json::from_str(&json).unwrap();
1815 let order = &msg.data[0];
1816 let account_id = AccountId::new("BYBIT-001");
1817
1818 let report = parse_ws_order_status_report(order, &instrument, account_id, TS).unwrap();
1819
1820 assert_eq!(report.order_type, OrderType::StopMarket);
1821 assert_eq!(report.order_side, OrderSide::Sell);
1822 assert_eq!(report.trigger_price, Some(instrument.make_price(48000.00)));
1823 assert_eq!(report.trigger_type, Some(TriggerType::LastPrice));
1824 assert!(report.reduce_only);
1825 }
1826}