1use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use chrono::Timelike;
22use nautilus_core::{UnixNanos, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26 data::{
27 Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28 MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29 depth::DEPTH10_LEN,
30 },
31 enums::{
32 AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33 PriceType, RecordFlag, TimeInForce, TrailingOffsetType,
34 },
35 events::{
36 OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderTriggered, OrderUpdated,
37 account::state::AccountState,
38 },
39 identifiers::{
40 AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
41 VenueOrderId,
42 },
43 instruments::{Instrument, InstrumentAny},
44 reports::{FillReport, OrderStatusReport, PositionStatusReport},
45 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
46};
47use rust_decimal::Decimal;
48use ustr::Ustr;
49
50use super::{
51 enums::{BitmexAction, BitmexWsTopic},
52 messages::{
53 BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
54 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
55 BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
56 },
57};
58use crate::{
59 common::{
60 consts::BITMEX_VENUE,
61 enums::{
62 BitmexExecInstruction, BitmexExecType, BitmexOrderStatus, BitmexOrderType,
63 BitmexPegPriceType, BitmexSide,
64 },
65 parse::{
66 bitmex_currency_divisor, clean_reason, derive_trade_id, extract_trigger_type,
67 map_bitmex_currency, normalize_trade_bin_prices, normalize_trade_bin_volume,
68 parse_account_balance, parse_contracts_quantity, parse_fractional_quantity,
69 parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
70 parse_position_side, parse_signed_contracts_quantity,
71 },
72 },
73 http::parse::get_currency,
74 websocket::messages::BitmexOrderUpdateMsg,
75};
76
// Bar specifications shared by `bar_spec_from_topic` and `topic_from_bar_spec`.
// Every specification uses `PriceType::Last`.

/// One-minute bar specification (paired with `BitmexWsTopic::TradeBin1m`).
const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
    // The `expect` runs in a const initializer, so a zero step would be rejected at compile time.
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Five-minute bar specification (paired with `BitmexWsTopic::TradeBin5m`).
const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// One-hour bar specification (paired with `BitmexWsTopic::TradeBin1h`).
const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
/// One-day bar specification (paired with `BitmexWsTopic::TradeBin1d`).
const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};
97
98#[inline]
106#[must_use]
107pub fn is_index_symbol(symbol: &Ustr) -> bool {
108 symbol.starts_with('.')
109}
110
111#[must_use]
113pub fn parse_book_msg_vec(
114 data: Vec<BitmexOrderBookMsg>,
115 action: BitmexAction,
116 instruments: &AHashMap<Ustr, InstrumentAny>,
117 ts_init: UnixNanos,
118) -> Vec<Data> {
119 let mut deltas = Vec::with_capacity(data.len());
120
121 for msg in data {
122 if let Some(instrument) = instruments.get(&msg.symbol) {
123 let instrument_id = instrument.id();
124 let price_precision = instrument.price_precision();
125 deltas.push(Data::Delta(parse_book_msg(
126 &msg,
127 &action,
128 instrument,
129 instrument_id,
130 price_precision,
131 ts_init,
132 )));
133 } else {
134 log::error!(
135 "Instrument cache miss: book delta dropped for symbol={}",
136 msg.symbol
137 );
138 }
139 }
140
141 if let Some(Data::Delta(last_delta)) = deltas.last_mut() {
143 *last_delta = OrderBookDelta::new(
144 last_delta.instrument_id,
145 last_delta.action,
146 last_delta.order,
147 last_delta.flags | RecordFlag::F_LAST as u8,
148 last_delta.sequence,
149 last_delta.ts_event,
150 last_delta.ts_init,
151 );
152 }
153
154 deltas
155}
156
157#[must_use]
159pub fn parse_book10_msg_vec(
160 data: Vec<BitmexOrderBook10Msg>,
161 instruments: &AHashMap<Ustr, InstrumentAny>,
162 ts_init: UnixNanos,
163) -> Vec<Data> {
164 let mut depths = Vec::with_capacity(data.len());
165
166 for msg in data {
167 if let Some(instrument) = instruments.get(&msg.symbol) {
168 let instrument_id = instrument.id();
169 let price_precision = instrument.price_precision();
170 match parse_book10_msg(&msg, instrument, instrument_id, price_precision, ts_init) {
171 Ok(depth) => depths.push(Data::Depth10(Box::new(depth))),
172 Err(e) => {
173 log::error!("Failed to parse orderBook10 for symbol={}: {e}", msg.symbol);
174 }
175 }
176 } else {
177 log::error!(
178 "Instrument cache miss: depth10 message dropped for symbol={}",
179 msg.symbol
180 );
181 }
182 }
183 depths
184}
185
186#[must_use]
188pub fn parse_trade_msg_vec(
189 data: Vec<BitmexTradeMsg>,
190 instruments: &AHashMap<Ustr, InstrumentAny>,
191 ts_init: UnixNanos,
192) -> Vec<Data> {
193 let mut trades = Vec::with_capacity(data.len());
194
195 for msg in data {
196 if let Some(instrument) = instruments.get(&msg.symbol) {
197 let instrument_id = instrument.id();
198 let price_precision = instrument.price_precision();
199 trades.push(Data::Trade(parse_trade_msg(
200 &msg,
201 instrument,
202 instrument_id,
203 price_precision,
204 ts_init,
205 )));
206 } else {
207 log::error!(
208 "Instrument cache miss: trade message dropped for symbol={}",
209 msg.symbol
210 );
211 }
212 }
213 trades
214}
215
216#[must_use]
218pub fn parse_trade_bin_msg_vec(
219 data: Vec<BitmexTradeBinMsg>,
220 topic: &BitmexWsTopic,
221 instruments: &AHashMap<Ustr, InstrumentAny>,
222 ts_init: UnixNanos,
223) -> Vec<Data> {
224 let mut trades = Vec::with_capacity(data.len());
225
226 for msg in data {
227 if let Some(instrument) = instruments.get(&msg.symbol) {
228 let instrument_id = instrument.id();
229 let price_precision = instrument.price_precision();
230 trades.push(Data::Bar(parse_trade_bin_msg(
231 &msg,
232 topic,
233 instrument,
234 instrument_id,
235 price_precision,
236 ts_init,
237 )));
238 } else {
239 log::error!(
240 "Instrument cache miss: trade bin (bar) dropped for symbol={}",
241 msg.symbol
242 );
243 }
244 }
245 trades
246}
247
248#[must_use]
250pub fn parse_book_msg(
251 msg: &BitmexOrderBookMsg,
252 action: &BitmexAction,
253 instrument: &InstrumentAny,
254 instrument_id: InstrumentId,
255 price_precision: u8,
256 ts_init: UnixNanos,
257) -> OrderBookDelta {
258 let flags = if action == &BitmexAction::Partial {
259 RecordFlag::F_SNAPSHOT as u8
260 } else {
261 0
262 };
263
264 let action = action.as_book_action();
265 let price = Price::new(msg.price, price_precision);
266 let side = msg.side.as_order_side();
267 let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
268 let order_id = msg.id;
269 let order = BookOrder::new(side, price, size, order_id);
270 let sequence = 0; let ts_event = UnixNanos::from(msg.timestamp);
272
273 OrderBookDelta::new(
274 instrument_id,
275 action,
276 order,
277 flags,
278 sequence,
279 ts_event,
280 ts_init,
281 )
282}
283
284pub fn parse_book10_msg(
290 msg: &BitmexOrderBook10Msg,
291 instrument: &InstrumentAny,
292 instrument_id: InstrumentId,
293 price_precision: u8,
294 ts_init: UnixNanos,
295) -> anyhow::Result<OrderBookDepth10> {
296 let mut bids = Vec::with_capacity(DEPTH10_LEN);
297 let mut asks = Vec::with_capacity(DEPTH10_LEN);
298
299 let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
301 let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
302
303 for (i, level) in msg.bids.iter().enumerate() {
304 let bid_order = BookOrder::new(
305 OrderSide::Buy,
306 Price::new(level[0], price_precision),
307 parse_fractional_quantity(level[1], instrument),
308 0,
309 );
310
311 bids.push(bid_order);
312 bid_counts[i] = 1;
313 }
314
315 for (i, level) in msg.asks.iter().enumerate() {
316 let ask_order = BookOrder::new(
317 OrderSide::Sell,
318 Price::new(level[0], price_precision),
319 parse_fractional_quantity(level[1], instrument),
320 0,
321 );
322
323 asks.push(ask_order);
324 ask_counts[i] = 1;
325 }
326
327 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
328 anyhow::anyhow!(
329 "Bids length mismatch: expected {DEPTH10_LEN}, was {}",
330 v.len()
331 )
332 })?;
333 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
334 anyhow::anyhow!(
335 "Asks length mismatch: expected {DEPTH10_LEN}, was {}",
336 v.len()
337 )
338 })?;
339
340 let ts_event = UnixNanos::from(msg.timestamp);
341
342 Ok(OrderBookDepth10::new(
343 instrument_id,
344 bids,
345 asks,
346 bid_counts,
347 ask_counts,
348 RecordFlag::F_SNAPSHOT as u8,
349 0, ts_event,
351 ts_init,
352 ))
353}
354
355#[must_use]
357pub fn parse_quote_msg(
358 msg: &BitmexQuoteMsg,
359 last_quote: &QuoteTick,
360 instrument: &InstrumentAny,
361 instrument_id: InstrumentId,
362 price_precision: u8,
363 ts_init: UnixNanos,
364) -> QuoteTick {
365 let bid_price = match msg.bid_price {
366 Some(price) => Price::new(price, price_precision),
367 None => last_quote.bid_price,
368 };
369
370 let ask_price = match msg.ask_price {
371 Some(price) => Price::new(price, price_precision),
372 None => last_quote.ask_price,
373 };
374
375 let bid_size = match msg.bid_size {
376 Some(size) => parse_contracts_quantity(size, instrument),
377 None => last_quote.bid_size,
378 };
379
380 let ask_size = match msg.ask_size {
381 Some(size) => parse_contracts_quantity(size, instrument),
382 None => last_quote.ask_size,
383 };
384
385 let ts_event = UnixNanos::from(msg.timestamp);
386
387 QuoteTick::new(
388 instrument_id,
389 bid_price,
390 ask_price,
391 bid_size,
392 ask_size,
393 ts_event,
394 ts_init,
395 )
396}
397
398#[must_use]
400pub fn parse_trade_msg(
401 msg: &BitmexTradeMsg,
402 instrument: &InstrumentAny,
403 instrument_id: InstrumentId,
404 price_precision: u8,
405 ts_init: UnixNanos,
406) -> TradeTick {
407 let price = Price::new(msg.price, price_precision);
408 let size = parse_contracts_quantity(msg.size, instrument);
409 let aggressor_side = msg.side.as_aggressor_side();
410 let ts_event = UnixNanos::from(msg.timestamp);
411 let trade_id = match msg.trd_match_id {
412 Some(uuid) => TradeId::new(uuid.to_string()),
413 None => derive_trade_id(
414 msg.symbol,
415 ts_event.as_u64(),
416 msg.price,
417 msg.size as i64,
418 Some(msg.side.into()),
419 ),
420 };
421
422 TradeTick::new(
423 instrument_id,
424 price,
425 size,
426 aggressor_side,
427 trade_id,
428 ts_event,
429 ts_init,
430 )
431}
432
433#[must_use]
435pub fn parse_trade_bin_msg(
436 msg: &BitmexTradeBinMsg,
437 topic: &BitmexWsTopic,
438 instrument: &InstrumentAny,
439 instrument_id: InstrumentId,
440 price_precision: u8,
441 ts_init: UnixNanos,
442) -> Bar {
443 let spec = bar_spec_from_topic(topic);
444 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
445
446 let open = Price::new(msg.open, price_precision);
447 let high = Price::new(msg.high, price_precision);
448 let low = Price::new(msg.low, price_precision);
449 let close = Price::new(msg.close, price_precision);
450
451 let (open, high, low, close) =
452 normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
453
454 let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
455 let volume = parse_contracts_quantity(volume_contracts, instrument);
456 let ts_event = UnixNanos::from(msg.timestamp);
457
458 Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
459}
460
461#[must_use]
465pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
466 match topic {
467 BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
468 BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
469 BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
470 BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
471 _ => {
472 log::error!("Bar specification not supported: topic={topic:?}");
473 BAR_SPEC_1_MINUTE
474 }
475 }
476}
477
478#[must_use]
482pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
483 match spec {
484 BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
485 BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
486 BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
487 BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
488 _ => {
489 log::error!("Bar specification not supported: spec={spec:?}");
490 BitmexWsTopic::TradeBin1m
491 }
492 }
493}
494
495fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> OrderType {
496 if msg.stop_px.is_some() {
497 if msg.price.is_some() {
498 OrderType::StopLimit
499 } else {
500 OrderType::StopMarket
501 }
502 } else if msg.price.is_some() {
503 OrderType::Limit
504 } else {
505 OrderType::Market
506 }
507}
508
509pub fn parse_order_msg(
519 msg: &BitmexOrderMsg,
520 instrument: &InstrumentAny,
521 order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
522 ts_init: UnixNanos,
523) -> anyhow::Result<OrderStatusReport> {
524 let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); let instrument_id = parse_instrument_id(msg.symbol);
526 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
527 let common_side: BitmexSide = msg.side.into();
528 let order_side: OrderSide = common_side.into();
529
530 let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
531 if ord_type == BitmexOrderType::Pegged
533 && msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
534 {
535 if msg.price.is_some() {
536 OrderType::TrailingStopLimit
537 } else {
538 OrderType::TrailingStopMarket
539 }
540 } else {
541 ord_type.into()
542 }
543 } else if let Some(client_order_id) = msg.cl_ord_id {
544 let client_order_id = ClientOrderId::new(client_order_id);
545 if let Some(&cached) = order_type_cache.get(&client_order_id) {
546 cached
547 } else {
548 let inferred = infer_order_type_from_msg(msg);
549 order_type_cache.insert(client_order_id, inferred);
550 inferred
551 }
552 } else {
553 infer_order_type_from_msg(msg)
554 };
555
556 let time_in_force: TimeInForce = match msg.time_in_force {
557 Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
558 None => TimeInForce::Gtc,
559 };
560 let order_status: OrderStatus = msg.ord_status.into();
561 let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
562 let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
563 let report_id = UUID4::new();
564 let ts_accepted =
565 parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
566 let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
567
568 let mut report = OrderStatusReport::new(
569 account_id,
570 instrument_id,
571 None, venue_order_id,
573 order_side,
574 order_type,
575 time_in_force,
576 order_status,
577 quantity,
578 filled_qty,
579 ts_accepted,
580 ts_last,
581 ts_init,
582 Some(report_id),
583 );
584
585 if let Some(cl_ord_id) = &msg.cl_ord_id {
586 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
587 }
588
589 if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
590 report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
591 }
592
593 if let Some(price) = msg.price {
594 report = report.with_price(Price::new(price, instrument.price_precision()));
595 }
596
597 if let Some(avg_px) = msg.avg_px {
598 report = report.with_avg_px(avg_px)?;
599 }
600
601 if let Some(trigger_price) = msg.stop_px {
602 report = report
603 .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
604 .with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
605 }
606
607 if matches!(
609 order_type,
610 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
611 ) && let Some(peg_offset) = msg.peg_offset_value
612 {
613 let trailing_offset = Decimal::try_from(peg_offset.abs())
614 .unwrap_or_else(|_| Decimal::new(peg_offset.abs() as i64, 0));
615 report = report
616 .with_trailing_offset(trailing_offset)
617 .with_trailing_offset_type(TrailingOffsetType::Price);
618
619 if msg.stop_px.is_none() {
620 report = report.with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
621 }
622 }
623
624 if let Some(exec_insts) = &msg.exec_inst {
625 for exec_inst in exec_insts {
626 match exec_inst {
627 BitmexExecInstruction::ParticipateDoNotInitiate => {
628 report = report.with_post_only(true);
629 }
630 BitmexExecInstruction::ReduceOnly => {
631 report = report.with_reduce_only(true);
632 }
633 _ => {}
634 }
635 }
636 }
637
638 if order_status == OrderStatus::Rejected {
640 if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
641 log::debug!(
642 "Order rejected with reason: order_id={:?}, client_order_id={:?}, reason={:?}",
643 venue_order_id,
644 msg.cl_ord_id,
645 reason_str,
646 );
647 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
648 } else {
649 log::debug!(
650 "Order rejected without reason from BitMEX: order_id={:?}, client_order_id={:?}, ord_status={:?}, ord_rej_reason={:?}, text={:?}",
651 venue_order_id,
652 msg.cl_ord_id,
653 msg.ord_status,
654 msg.ord_rej_reason,
655 msg.text,
656 );
657 }
658 }
659
660 if order_status == OrderStatus::Canceled
663 && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
664 {
665 report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
666 }
667
668 Ok(report)
669}
670
/// Order lifecycle event parsed from a BitMEX order message.
///
/// This is the return payload of `parse_order_event`.
#[derive(Debug, Clone)]
pub enum ParsedOrderEvent {
    /// Produced for an order whose status is `BitmexOrderStatus::New`.
    Accepted(OrderAccepted),
    /// Produced for a canceled order that is not a post-only rejection.
    Canceled(OrderCanceled),
    /// Produced for an order whose status is `BitmexOrderStatus::Expired`.
    Expired(OrderExpired),
    /// Order triggered event.
    // NOTE(review): `parse_order_event` as visible here never constructs this variant.
    Triggered(OrderTriggered),
    /// Produced for a canceled order whose reason mentions `ParticipateDoNotInitiate`.
    Rejected(OrderRejected),
}
680
681pub fn parse_order_event(
687 msg: &BitmexOrderMsg,
688 client_order_id: ClientOrderId,
689 account_id: AccountId,
690 trader_id: TraderId,
691 strategy_id: StrategyId,
692 ts_init: UnixNanos,
693) -> Option<ParsedOrderEvent> {
694 let instrument_id = parse_instrument_id(msg.symbol);
695 let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
696 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
697
698 match msg.ord_status {
699 BitmexOrderStatus::New => {
700 let accepted = OrderAccepted::new(
701 trader_id,
702 strategy_id,
703 instrument_id,
704 client_order_id,
705 venue_order_id,
706 account_id,
707 UUID4::new(),
708 ts_event,
709 ts_init,
710 false,
711 );
712 Some(ParsedOrderEvent::Accepted(accepted))
713 }
714 BitmexOrderStatus::Canceled => {
715 let cancel_reason = msg
718 .ord_rej_reason
719 .or(msg.text)
720 .map(|r| clean_reason(r.as_ref()));
721
722 let is_post_only_rejection = cancel_reason
723 .as_deref()
724 .is_some_and(|r| r.contains("ParticipateDoNotInitiate"));
725
726 if is_post_only_rejection {
727 let rejected = OrderRejected::new(
728 trader_id,
729 strategy_id,
730 instrument_id,
731 client_order_id,
732 account_id,
733 Ustr::from(
734 cancel_reason
735 .as_deref()
736 .unwrap_or("Post-only order rejected"),
737 ),
738 UUID4::new(),
739 ts_event,
740 ts_init,
741 false,
742 true, );
744 Some(ParsedOrderEvent::Rejected(rejected))
745 } else {
746 let canceled = OrderCanceled::new(
747 trader_id,
748 strategy_id,
749 instrument_id,
750 client_order_id,
751 UUID4::new(),
752 ts_event,
753 ts_init,
754 false,
755 Some(venue_order_id),
756 Some(account_id),
757 );
758 Some(ParsedOrderEvent::Canceled(canceled))
759 }
760 }
761 BitmexOrderStatus::Expired => {
762 let expired = OrderExpired::new(
763 trader_id,
764 strategy_id,
765 instrument_id,
766 client_order_id,
767 UUID4::new(),
768 ts_event,
769 ts_init,
770 false,
771 Some(venue_order_id),
772 Some(account_id),
773 );
774 Some(ParsedOrderEvent::Expired(expired))
775 }
776 _ => None,
780 }
781}
782
783pub fn parse_order_update_msg(
787 msg: &BitmexOrderUpdateMsg,
788 instrument: &InstrumentAny,
789 account_id: AccountId,
790 ts_init: UnixNanos,
791) -> Option<OrderUpdated> {
792 let trader_id = TraderId::external();
794 let strategy_id = StrategyId::external();
795 let instrument_id = parse_instrument_id(msg.symbol);
796 let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
797 let client_order_id = msg
798 .cl_ord_id
799 .as_ref()
800 .map_or_else(ClientOrderId::external, ClientOrderId::new);
801
802 let quantity = match (msg.leaves_qty, msg.cum_qty) {
805 (Some(leaves), Some(cum)) => parse_contracts_quantity((leaves + cum) as u64, instrument),
806 _ => Quantity::zero(instrument.size_precision()),
807 };
808 let price = msg
809 .price
810 .map(|p| Price::new(p, instrument.price_precision()));
811
812 let trigger_price = None;
814 let protection_price = None;
816
817 let event_id = UUID4::new();
818 let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
819
820 Some(OrderUpdated::new(
821 trader_id,
822 strategy_id,
823 instrument_id,
824 client_order_id,
825 quantity,
826 event_id,
827 ts_event,
828 ts_init,
829 false, venue_order_id,
831 Some(account_id),
832 price,
833 trigger_price,
834 protection_price,
835 false, ))
837}
838
839pub fn parse_execution_msg(
853 msg: BitmexExecutionMsg,
854 instrument: &InstrumentAny,
855 ts_init: UnixNanos,
856) -> Option<FillReport> {
857 let exec_type = msg.exec_type?;
858
859 match exec_type {
860 BitmexExecType::Trade | BitmexExecType::Liquidation => {}
862 BitmexExecType::Bankruptcy => {
863 log::warn!(
864 "Processing bankruptcy execution as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
865 msg.order_id,
866 msg.symbol,
867 );
868 }
869
870 BitmexExecType::Settlement => {
872 log::debug!(
873 "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
874 msg.order_id,
875 msg.symbol,
876 );
877 return None;
878 }
879 BitmexExecType::TrialFill => {
880 log::warn!(
881 "Trial fill execution received (testnet only), not processed as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
882 msg.order_id,
883 msg.symbol,
884 );
885 return None;
886 }
887
888 BitmexExecType::Funding => {
890 log::debug!(
891 "Funding execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
892 msg.order_id,
893 msg.symbol,
894 );
895 return None;
896 }
897 BitmexExecType::Insurance => {
898 log::debug!(
899 "Insurance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
900 msg.order_id,
901 msg.symbol,
902 );
903 return None;
904 }
905 BitmexExecType::Rebalance => {
906 log::debug!(
907 "Rebalance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
908 msg.order_id,
909 msg.symbol,
910 );
911 return None;
912 }
913
914 BitmexExecType::New
916 | BitmexExecType::Canceled
917 | BitmexExecType::CancelReject
918 | BitmexExecType::Replaced
919 | BitmexExecType::Rejected
920 | BitmexExecType::AmendReject
921 | BitmexExecType::Suspended
922 | BitmexExecType::Released
923 | BitmexExecType::TriggeredOrActivatedBySystem => {
924 log::debug!(
925 "Execution message skipped (order state change, not a fill): exec_type={exec_type:?}, order_id={:?}",
926 msg.order_id,
927 );
928 return None;
929 }
930
931 BitmexExecType::Unknown(ref type_str) => {
932 log::warn!(
933 "Unknown execution type received, skipping: exec_type={type_str}, order_id={:?}, symbol={:?}",
934 msg.order_id,
935 msg.symbol,
936 );
937 return None;
938 }
939 }
940
941 let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
942 let instrument_id = parse_instrument_id(msg.symbol?);
943 let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
944 let trade_id = TradeId::new(msg.trd_match_id?.to_string());
945 let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
946 let side: BitmexSide = s.into();
947 side.into()
948 });
949 let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
950 let last_px = Price::new(msg.last_px?, instrument.price_precision());
951 let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
952 let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
953 let currency = get_currency(&mapped_currency);
954 let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
955 let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
956 let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
957 let venue_position_id = None; let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
959
960 Some(FillReport::new(
961 account_id,
962 instrument_id,
963 venue_order_id,
964 trade_id,
965 order_side,
966 last_qty,
967 last_px,
968 commission,
969 liquidity_side,
970 client_order_id,
971 venue_position_id,
972 ts_event,
973 ts_init,
974 None,
975 ))
976}
977
978#[must_use]
984pub fn parse_position_msg(
985 msg: &BitmexPositionMsg,
986 instrument: &InstrumentAny,
987 ts_init: UnixNanos,
988) -> PositionStatusReport {
989 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
990 let instrument_id = parse_instrument_id(msg.symbol);
991 let position_side = parse_position_side(msg.current_qty).as_specified();
992 let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
993 let venue_position_id = None; let avg_px_open = msg
995 .avg_entry_price
996 .and_then(|p| Decimal::from_str(&p.to_string()).ok());
997 let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
998
999 PositionStatusReport::new(
1000 account_id,
1001 instrument_id,
1002 position_side,
1003 quantity,
1004 ts_last,
1005 ts_init,
1006 None, venue_position_id, avg_px_open, )
1010}
1011
1012#[must_use]
1025pub fn parse_instrument_msg(
1026 msg: &BitmexInstrumentMsg,
1027 instruments_cache: &AHashMap<Ustr, InstrumentAny>,
1028 ts_init: UnixNanos,
1029) -> Vec<Data> {
1030 let mut updates = Vec::new();
1031 let is_index = is_index_symbol(&msg.symbol);
1032
1033 let effective_index_price = if is_index {
1036 msg.last_price
1037 } else {
1038 msg.index_price
1039 };
1040
1041 if msg.mark_price.is_none() && effective_index_price.is_none() {
1045 return updates;
1046 }
1047
1048 let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
1049 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
1050
1051 let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
1053 Some(instrument) => instrument.price_precision(),
1054 None => {
1055 if is_index {
1059 log::trace!(
1060 "Index instrument {} not in cache, skipping update",
1061 msg.symbol
1062 );
1063 } else {
1064 log::debug!("Instrument {} not in cache, skipping update", msg.symbol);
1065 }
1066 return updates;
1067 }
1068 };
1069
1070 if let Some(mark_price) = msg.mark_price {
1073 let price = Price::new(mark_price, price_precision);
1074 updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
1075 instrument_id,
1076 price,
1077 ts_event,
1078 ts_init,
1079 )));
1080 }
1081
1082 if let Some(index_price) = effective_index_price {
1084 let price = Price::new(index_price, price_precision);
1085 updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
1086 instrument_id,
1087 price,
1088 ts_event,
1089 ts_init,
1090 )));
1091 }
1092
1093 updates
1094}
1095
1096#[must_use]
1102pub fn parse_funding_msg(msg: &BitmexFundingMsg, ts_init: UnixNanos) -> FundingRateUpdate {
1103 let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol));
1104 let interval_hours = msg.funding_interval.hour();
1105 let interval_minutes = msg.funding_interval.minute();
1106 let interval = Some((interval_hours * 60 + interval_minutes) as u16);
1107 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
1108
1109 FundingRateUpdate::new(
1110 instrument_id,
1111 msg.funding_rate,
1112 interval,
1113 None, ts_event,
1115 ts_init,
1116 )
1117}
1118
1119#[must_use]
1128pub fn parse_wallet_msg(msg: &BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
1129 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
1130
1131 let currency_str = map_bitmex_currency(msg.currency.as_str());
1133 let currency = get_currency(¤cy_str);
1134
1135 let divisor = bitmex_currency_divisor(msg.currency.as_str());
1138 let amount_dec = Decimal::from(msg.amount.unwrap_or(0)) / divisor;
1139
1140 let balance = AccountBalance::from_total_and_locked(amount_dec, Decimal::ZERO, currency)
1141 .expect("Balance calculation should be valid");
1142
1143 AccountState::new(
1144 account_id,
1145 AccountType::Margin,
1146 vec![balance],
1147 vec![], true, UUID4::new(),
1150 ts_init,
1151 ts_init,
1152 None,
1153 )
1154}
1155
1156#[must_use]
1158pub fn parse_margin_msg(msg: &BitmexMarginMsg) -> MarginBalance {
1159 let currency_str = map_bitmex_currency(msg.currency.as_str());
1160 let currency = get_currency(¤cy_str);
1161
1162 let divisor = bitmex_currency_divisor(msg.currency.as_str());
1163 let initial_dec = Decimal::from(msg.init_margin.unwrap_or(0).max(0)) / divisor;
1164 let maintenance_dec = Decimal::from(msg.maint_margin.unwrap_or(0).max(0)) / divisor;
1165
1166 MarginBalance::new(
1167 Money::from_decimal(initial_dec, currency).unwrap_or_else(|_| Money::zero(currency)),
1168 Money::from_decimal(maintenance_dec, currency).unwrap_or_else(|_| Money::zero(currency)),
1169 None,
1170 )
1171}
1172
1173#[must_use]
1175pub fn parse_margin_account_state(msg: &BitmexMarginMsg, ts_init: UnixNanos) -> AccountState {
1176 let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
1177 let balance = parse_account_balance(msg);
1178
1179 let margin = parse_margin_msg(msg);
1180
1181 let margins = if !margin.initial.is_zero() || !margin.maintenance.is_zero() {
1182 vec![margin]
1183 } else {
1184 vec![]
1185 };
1186
1187 let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "margin.timestamp");
1188
1189 AccountState::new(
1190 account_id,
1191 AccountType::Margin,
1192 vec![balance],
1193 margins,
1194 true,
1195 UUID4::new(),
1196 ts_event,
1197 ts_init,
1198 None,
1199 )
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204 use chrono::{DateTime, Utc};
1205 use nautilus_model::{
1206 enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1207 identifiers::Symbol,
1208 instruments::crypto_perpetual::CryptoPerpetual,
1209 };
1210 use rstest::rstest;
1211 use ustr::Ustr;
1212
1213 use super::*;
1214 use crate::common::{
1215 enums::{BitmexExecType, BitmexOrderStatus},
1216 testing::load_test_json,
1217 };
1218
1219 fn create_test_perpetual_instrument_with_precisions(
1221 price_precision: u8,
1222 size_precision: u8,
1223 ) -> InstrumentAny {
1224 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1225 InstrumentId::from("XBTUSD.BITMEX"),
1226 Symbol::new("XBTUSD"),
1227 Currency::BTC(),
1228 Currency::USD(),
1229 Currency::BTC(),
1230 true, price_precision,
1232 size_precision,
1233 Price::new(0.5, price_precision),
1234 Quantity::new(1.0, size_precision),
1235 None, None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(),
1249 UnixNanos::default(),
1250 ))
1251 }
1252
1253 fn create_test_perpetual_instrument() -> InstrumentAny {
1254 create_test_perpetual_instrument_with_precisions(1, 0)
1255 }
1256
1257 #[rstest]
1258 fn test_orderbook_l2_message() {
1259 let json_data = load_test_json("ws_orderbook_l2.json");
1260
1261 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1262 let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1263
1264 let instrument = create_test_perpetual_instrument();
1266
1267 let delta = parse_book_msg(
1269 &msg,
1270 &BitmexAction::Insert,
1271 &instrument,
1272 instrument.id(),
1273 instrument.price_precision(),
1274 UnixNanos::from(3),
1275 );
1276 assert_eq!(delta.instrument_id, instrument_id);
1277 assert_eq!(delta.order.price, Price::from("98459.9"));
1278 assert_eq!(delta.order.size, Quantity::from(33000));
1279 assert_eq!(delta.order.side, OrderSide::Sell);
1280 assert_eq!(delta.order.order_id, 62400580205);
1281 assert_eq!(delta.action, BookAction::Add);
1282 assert_eq!(delta.flags, 0);
1283 assert_eq!(delta.sequence, 0);
1284 assert_eq!(delta.ts_event, 1732436782356000000); assert_eq!(delta.ts_init, 3);
1286
1287 let delta = parse_book_msg(
1289 &msg,
1290 &BitmexAction::Partial,
1291 &instrument,
1292 instrument.id(),
1293 instrument.price_precision(),
1294 UnixNanos::from(3),
1295 );
1296 assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1297 assert_eq!(delta.action, BookAction::Add);
1298
1299 let delta = parse_book_msg(
1301 &msg,
1302 &BitmexAction::Update,
1303 &instrument,
1304 instrument.id(),
1305 instrument.price_precision(),
1306 UnixNanos::from(3),
1307 );
1308 assert_eq!(delta.flags, 0);
1309 assert_eq!(delta.action, BookAction::Update);
1310 }
1311
1312 #[rstest]
1313 fn test_orderbook10_message() {
1314 let json_data = load_test_json("ws_orderbook_10.json");
1315 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1316 let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1317 let instrument = create_test_perpetual_instrument();
1318 let depth10 = parse_book10_msg(
1319 &msg,
1320 &instrument,
1321 instrument.id(),
1322 instrument.price_precision(),
1323 UnixNanos::from(3),
1324 )
1325 .unwrap();
1326
1327 assert_eq!(depth10.instrument_id, instrument_id);
1328
1329 assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1331 assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1332 assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1333
1334 assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1336 assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1337 assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1338
1339 assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1341 assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1342
1343 assert_eq!(depth10.sequence, 0);
1345 assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1346 assert_eq!(depth10.ts_event, 1732436353513000000); assert_eq!(depth10.ts_init, 3);
1348 }
1349
1350 #[rstest]
1351 fn test_quote_message() {
1352 let json_data = load_test_json("ws_quote.json");
1353
1354 let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1355 let last_quote = QuoteTick::new(
1356 instrument_id,
1357 Price::new(487.50, 2),
1358 Price::new(488.20, 2),
1359 Quantity::from(100_000),
1360 Quantity::from(100_000),
1361 UnixNanos::from(1),
1362 UnixNanos::from(2),
1363 );
1364 let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1365 let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1366 let quote = parse_quote_msg(
1367 &msg,
1368 &last_quote,
1369 &instrument,
1370 instrument_id,
1371 instrument.price_precision(),
1372 UnixNanos::from(3),
1373 );
1374
1375 assert_eq!(quote.instrument_id, instrument_id);
1376 assert_eq!(quote.bid_price, Price::from("487.55"));
1377 assert_eq!(quote.ask_price, Price::from("488.25"));
1378 assert_eq!(quote.bid_size, Quantity::from(103_000));
1379 assert_eq!(quote.ask_size, Quantity::from(50_000));
1380 assert_eq!(quote.ts_event, 1732315465085000000);
1381 assert_eq!(quote.ts_init, 3);
1382 }
1383
1384 #[rstest]
1385 fn test_trade_message() {
1386 let json_data = load_test_json("ws_trade.json");
1387
1388 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1389 let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1390 let instrument = create_test_perpetual_instrument();
1391 let trade = parse_trade_msg(
1392 &msg,
1393 &instrument,
1394 instrument.id(),
1395 instrument.price_precision(),
1396 UnixNanos::from(3),
1397 );
1398
1399 assert_eq!(trade.instrument_id, instrument_id);
1400 assert_eq!(trade.price, Price::from("98570.9"));
1401 assert_eq!(trade.size, Quantity::from(100));
1402 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1403 assert_eq!(
1404 trade.trade_id.to_string(),
1405 "00000000-006d-1000-0000-000e8737d536"
1406 );
1407 assert_eq!(trade.ts_event, 1732436138704000000); assert_eq!(trade.ts_init, 3);
1409 }
1410
1411 #[rstest]
1412 fn test_trade_message_derives_trade_id_when_trd_match_id_missing() {
1413 let json_data = load_test_json("ws_trade.json");
1414 let mut msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1415 msg.trd_match_id = None;
1416 let instrument = create_test_perpetual_instrument();
1417
1418 let trade = parse_trade_msg(
1419 &msg,
1420 &instrument,
1421 instrument.id(),
1422 instrument.price_precision(),
1423 UnixNanos::from(3),
1424 );
1425
1426 let mut again_msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1427 again_msg.trd_match_id = None;
1428 let again = parse_trade_msg(
1429 &again_msg,
1430 &instrument,
1431 instrument.id(),
1432 instrument.price_precision(),
1433 UnixNanos::from(3),
1434 );
1435
1436 assert_eq!(trade.trade_id, again.trade_id, "derivation must be stable");
1437 assert_eq!(trade.trade_id.as_str().len(), 16);
1438
1439 let mut altered: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1440 altered.trd_match_id = None;
1441 altered.price += 1.0;
1442 let altered_trade = parse_trade_msg(
1443 &altered,
1444 &instrument,
1445 instrument.id(),
1446 instrument.price_precision(),
1447 UnixNanos::from(3),
1448 );
1449 assert_ne!(trade.trade_id, altered_trade.trade_id);
1450 }
1451
1452 #[rstest]
1453 fn test_trade_bin_message() {
1454 let json_data = load_test_json("ws_trade_bin_1m.json");
1455
1456 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1457 let topic = BitmexWsTopic::TradeBin1m;
1458
1459 let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1460 let instrument = create_test_perpetual_instrument();
1461 let bar = parse_trade_bin_msg(
1462 &msg,
1463 &topic,
1464 &instrument,
1465 instrument.id(),
1466 instrument.price_precision(),
1467 UnixNanos::from(3),
1468 );
1469
1470 assert_eq!(bar.instrument_id(), instrument_id);
1471 assert_eq!(
1472 bar.bar_type.spec(),
1473 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1474 );
1475 assert_eq!(bar.open, Price::from("97550.0"));
1476 assert_eq!(bar.high, Price::from("97584.4"));
1477 assert_eq!(bar.low, Price::from("97550.0"));
1478 assert_eq!(bar.close, Price::from("97570.1"));
1479 assert_eq!(bar.volume, Quantity::from(84_000));
1480 assert_eq!(bar.ts_event, 1732392420000000000); assert_eq!(bar.ts_init, 3);
1482 }
1483
1484 #[rstest]
1485 fn test_trade_bin_message_extreme_adjustment() {
1486 let topic = BitmexWsTopic::TradeBin1m;
1487 let instrument = create_test_perpetual_instrument();
1488
1489 let msg = BitmexTradeBinMsg {
1490 timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1491 .unwrap()
1492 .with_timezone(&Utc),
1493 symbol: Ustr::from("XBTUSD"),
1494 open: 50_000.0,
1495 high: 49_990.0,
1496 low: 50_010.0,
1497 close: 50_005.0,
1498 trades: 10,
1499 volume: 1_000,
1500 vwap: Some(0.0),
1501 last_size: Some(0),
1502 turnover: 0,
1503 home_notional: 0.0,
1504 foreign_notional: 0.0,
1505 pool: None,
1506 };
1507
1508 let bar = parse_trade_bin_msg(
1509 &msg,
1510 &topic,
1511 &instrument,
1512 instrument.id(),
1513 instrument.price_precision(),
1514 UnixNanos::from(3),
1515 );
1516
1517 assert_eq!(bar.high, Price::from("50010.0"));
1518 assert_eq!(bar.low, Price::from("49990.0"));
1519 assert_eq!(bar.open, Price::from("50000.0"));
1520 assert_eq!(bar.close, Price::from("50005.0"));
1521 assert_eq!(bar.volume, Quantity::from(1_000));
1522 }
1523
1524 #[rstest]
1525 fn test_parse_order_msg() {
1526 let json_data = load_test_json("ws_order.json");
1527 let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1528 let mut cache = AHashMap::new();
1529 let instrument = create_test_perpetual_instrument();
1530 let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1531
1532 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1533 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1534 assert_eq!(
1535 report.venue_order_id.to_string(),
1536 "550e8400-e29b-41d4-a716-446655440001"
1537 );
1538 assert_eq!(
1539 report.client_order_id.unwrap().to_string(),
1540 "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1541 );
1542 assert_eq!(report.order_side, OrderSide::Buy);
1543 assert_eq!(report.order_type, OrderType::Limit);
1544 assert_eq!(report.time_in_force, TimeInForce::Gtc);
1545 assert_eq!(report.order_status, OrderStatus::Accepted);
1546 assert_eq!(report.quantity, Quantity::from(100));
1547 assert_eq!(report.filled_qty, Quantity::from(0));
1548 assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1549 assert_eq!(report.ts_accepted, 1732530600000000000); }
1551
1552 #[rstest]
1553 fn test_parse_order_msg_infers_type_when_missing() {
1554 let json_data = load_test_json("ws_order.json");
1555 let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1556 msg.ord_type = None;
1557 msg.cl_ord_id = None;
1558 msg.price = Some(98_000.0);
1559 msg.stop_px = None;
1560
1561 let mut cache = AHashMap::new();
1562 let instrument = create_test_perpetual_instrument();
1563
1564 let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1565
1566 assert_eq!(report.order_type, OrderType::Limit);
1567 }
1568
1569 #[rstest]
1570 fn test_parse_order_msg_rejected_with_reason() {
1571 let mut msg: BitmexOrderMsg =
1572 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1573 msg.ord_status = BitmexOrderStatus::Rejected;
1574 msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1575 msg.text = None;
1576 msg.cum_qty = 0;
1577
1578 let mut cache = AHashMap::new();
1579 let instrument = create_test_perpetual_instrument();
1580 let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1581
1582 assert_eq!(report.order_status, OrderStatus::Rejected);
1583 assert_eq!(
1584 report.cancel_reason,
1585 Some("Insufficient available balance".to_string())
1586 );
1587 }
1588
1589 #[rstest]
1590 fn test_parse_order_msg_rejected_with_text_fallback() {
1591 let mut msg: BitmexOrderMsg =
1592 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1593 msg.ord_status = BitmexOrderStatus::Rejected;
1594 msg.ord_rej_reason = None;
1595 msg.text = Some(Ustr::from("Order would execute immediately"));
1596 msg.cum_qty = 0;
1597
1598 let mut cache = AHashMap::new();
1599 let instrument = create_test_perpetual_instrument();
1600 let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1601
1602 assert_eq!(report.order_status, OrderStatus::Rejected);
1603 assert_eq!(
1604 report.cancel_reason,
1605 Some("Order would execute immediately".to_string())
1606 );
1607 }
1608
1609 #[rstest]
1610 fn test_parse_order_msg_rejected_without_reason() {
1611 let mut msg: BitmexOrderMsg =
1612 serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1613 msg.ord_status = BitmexOrderStatus::Rejected;
1614 msg.ord_rej_reason = None;
1615 msg.text = None;
1616 msg.cum_qty = 0;
1617
1618 let mut cache = AHashMap::new();
1619 let instrument = create_test_perpetual_instrument();
1620 let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1621
1622 assert_eq!(report.order_status, OrderStatus::Rejected);
1623 assert_eq!(report.cancel_reason, None);
1624 }
1625
1626 #[rstest]
1627 fn test_parse_execution_msg() {
1628 let json_data = load_test_json("ws_execution.json");
1629 let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1630 let instrument = create_test_perpetual_instrument();
1631 let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
1632
1633 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1634 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1635 assert_eq!(
1636 fill.venue_order_id.to_string(),
1637 "550e8400-e29b-41d4-a716-446655440002"
1638 );
1639 assert_eq!(
1640 fill.trade_id.to_string(),
1641 "00000000-006d-1000-0000-000e8737d540"
1642 );
1643 assert_eq!(
1644 fill.client_order_id.unwrap().to_string(),
1645 "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1646 );
1647 assert_eq!(fill.order_side, OrderSide::Sell);
1648 assert_eq!(fill.last_qty, Quantity::from(100));
1649 assert_eq!(fill.last_px, Price::from("98950.0"));
1650 assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1651 assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1652 assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1653 assert_eq!(fill.ts_event, 1732530900789000000); }
1655
1656 #[rstest]
1657 fn test_parse_execution_msg_non_trade() {
1658 let mut msg: BitmexExecutionMsg =
1660 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1661 msg.exec_type = Some(BitmexExecType::Settlement);
1662
1663 let instrument = create_test_perpetual_instrument();
1664 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1665 assert!(result.is_none());
1666 }
1667
1668 #[rstest]
1669 fn test_parse_cancel_reject_execution() {
1670 let json = load_test_json("ws_execution_cancel_reject.json");
1672
1673 let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1674 assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1675 assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1676 assert_eq!(msg.symbol, None);
1677
1678 let instrument = create_test_perpetual_instrument();
1680 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1681 assert!(result.is_none());
1682 }
1683
1684 #[rstest]
1685 fn test_parse_execution_msg_liquidation() {
1686 let mut msg: BitmexExecutionMsg =
1688 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1689 msg.exec_type = Some(BitmexExecType::Liquidation);
1690
1691 let instrument = create_test_perpetual_instrument();
1692 let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
1693
1694 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1695 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1696 assert_eq!(fill.order_side, OrderSide::Sell);
1697 assert_eq!(fill.last_qty, Quantity::from(100));
1698 assert_eq!(fill.last_px, Price::from("98950.0"));
1699 }
1700
1701 #[rstest]
1702 fn test_parse_execution_msg_bankruptcy() {
1703 let mut msg: BitmexExecutionMsg =
1704 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1705 msg.exec_type = Some(BitmexExecType::Bankruptcy);
1706
1707 let instrument = create_test_perpetual_instrument();
1708 let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
1709
1710 assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1711 assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1712 assert_eq!(fill.order_side, OrderSide::Sell);
1713 assert_eq!(fill.last_qty, Quantity::from(100));
1714 }
1715
1716 #[rstest]
1717 fn test_parse_execution_msg_settlement() {
1718 let mut msg: BitmexExecutionMsg =
1719 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1720 msg.exec_type = Some(BitmexExecType::Settlement);
1721
1722 let instrument = create_test_perpetual_instrument();
1723 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1724 assert!(result.is_none());
1725 }
1726
1727 #[rstest]
1728 fn test_parse_execution_msg_trial_fill() {
1729 let mut msg: BitmexExecutionMsg =
1730 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1731 msg.exec_type = Some(BitmexExecType::TrialFill);
1732
1733 let instrument = create_test_perpetual_instrument();
1734 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1735 assert!(result.is_none());
1736 }
1737
1738 #[rstest]
1739 fn test_parse_execution_msg_funding() {
1740 let mut msg: BitmexExecutionMsg =
1741 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1742 msg.exec_type = Some(BitmexExecType::Funding);
1743
1744 let instrument = create_test_perpetual_instrument();
1745 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1746 assert!(result.is_none());
1747 }
1748
1749 #[rstest]
1750 fn test_parse_execution_msg_insurance() {
1751 let mut msg: BitmexExecutionMsg =
1752 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1753 msg.exec_type = Some(BitmexExecType::Insurance);
1754
1755 let instrument = create_test_perpetual_instrument();
1756 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1757 assert!(result.is_none());
1758 }
1759
1760 #[rstest]
1761 fn test_parse_execution_msg_rebalance() {
1762 let mut msg: BitmexExecutionMsg =
1763 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1764 msg.exec_type = Some(BitmexExecType::Rebalance);
1765
1766 let instrument = create_test_perpetual_instrument();
1767 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1768 assert!(result.is_none());
1769 }
1770
1771 #[rstest]
1772 fn test_parse_execution_msg_order_state_changes() {
1773 let instrument = create_test_perpetual_instrument();
1774
1775 let order_state_types = vec![
1776 BitmexExecType::New,
1777 BitmexExecType::Canceled,
1778 BitmexExecType::CancelReject,
1779 BitmexExecType::Replaced,
1780 BitmexExecType::Rejected,
1781 BitmexExecType::AmendReject,
1782 BitmexExecType::Suspended,
1783 BitmexExecType::Released,
1784 BitmexExecType::TriggeredOrActivatedBySystem,
1785 ];
1786
1787 for exec_type in order_state_types {
1788 let mut msg: BitmexExecutionMsg =
1789 serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1790 msg.exec_type = Some(exec_type.clone());
1791
1792 let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1793 assert!(
1794 result.is_none(),
1795 "Expected None for exec_type {exec_type:?}"
1796 );
1797 }
1798 }
1799
1800 #[rstest]
1801 fn test_parse_position_msg() {
1802 let json_data = load_test_json("ws_position.json");
1803 let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1804 let instrument = create_test_perpetual_instrument();
1805 let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
1806
1807 assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1808 assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1809 assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1810 assert_eq!(report.quantity, Quantity::from(1000));
1811 assert!(report.venue_position_id.is_none());
1812 assert_eq!(report.ts_last, 1732530900789000000); }
1814
1815 #[rstest]
1816 fn test_parse_position_msg_short() {
1817 let mut msg: BitmexPositionMsg =
1818 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1819 msg.current_qty = Some(-500);
1820
1821 let instrument = create_test_perpetual_instrument();
1822 let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
1823 assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1824 assert_eq!(report.quantity, Quantity::from(500));
1825 }
1826
1827 #[rstest]
1828 fn test_parse_position_msg_flat() {
1829 let mut msg: BitmexPositionMsg =
1830 serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1831 msg.current_qty = Some(0);
1832
1833 let instrument = create_test_perpetual_instrument();
1834 let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
1835 assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1836 assert_eq!(report.quantity, Quantity::from(0));
1837 }
1838
1839 #[rstest]
1840 fn test_parse_wallet_msg() {
1841 let json_data = load_test_json("ws_wallet.json");
1842 let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1843 let ts_init = UnixNanos::from(1);
1844 let account_state = parse_wallet_msg(&msg, ts_init);
1845
1846 assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1847 assert!(!account_state.balances.is_empty());
1848 let balance = &account_state.balances[0];
1849 assert_eq!(balance.currency.code.to_string(), "XBT");
1850 assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1852 assert_eq!(balance.locked.as_f64(), 0.0);
1854 assert_eq!(balance.free.as_decimal(), balance.total.as_decimal());
1855 }
1856
1857 #[rstest]
1858 fn test_parse_wallet_msg_no_amount() {
1859 let mut msg: BitmexWalletMsg =
1860 serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1861 msg.amount = None;
1862
1863 let ts_init = UnixNanos::from(1);
1864 let account_state = parse_wallet_msg(&msg, ts_init);
1865 let balance = &account_state.balances[0];
1866 assert_eq!(balance.total.as_f64(), 0.0);
1867 }
1868
1869 #[rstest]
1870 fn test_parse_margin_msg() {
1871 let json_data = load_test_json("ws_margin.json");
1872 let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1873 let margin_balance = parse_margin_msg(&msg);
1874
1875 assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1876 assert!(margin_balance.instrument_id.is_none());
1877 assert_eq!(margin_balance.initial.as_f64(), 0.0);
1880 assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1882 }
1883
1884 #[rstest]
1885 fn test_parse_margin_msg_no_available() {
1886 let mut msg: BitmexMarginMsg =
1887 serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1888 msg.available_margin = None;
1889
1890 let margin_balance = parse_margin_msg(&msg);
1891 assert!(margin_balance.initial.as_f64() >= 0.0);
1893 assert!(margin_balance.maintenance.as_f64() >= 0.0);
1894 }
1895
1896 #[rstest]
1897 fn test_parse_margin_account_state_includes_margins() {
1898 let msg = BitmexMarginMsg {
1899 account: 123456,
1900 currency: Ustr::from("USDt"),
1901 risk_limit: None,
1902 amount: Some(5_000_000_000),
1903 prev_realised_pnl: None,
1904 gross_comm: None,
1905 gross_open_cost: None,
1906 gross_open_premium: None,
1907 gross_exec_cost: None,
1908 gross_mark_value: None,
1909 risk_value: None,
1910 init_margin: Some(200_000_000), maint_margin: Some(100_000_000), target_excess_margin: None,
1913 realised_pnl: None,
1914 unrealised_pnl: None,
1915 wallet_balance: Some(5_000_000_000), margin_balance: None,
1917 margin_leverage: None,
1918 margin_used_pcnt: None,
1919 excess_margin: None,
1920 available_margin: Some(4_800_000_000), withdrawable_margin: None,
1922 maker_fee_discount: None,
1923 taker_fee_discount: None,
1924 timestamp: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
1925 foreign_margin_balance: None,
1926 foreign_requirement: None,
1927 };
1928
1929 let ts_init = UnixNanos::from(1_000_000_000u64);
1930 let state = parse_margin_account_state(&msg, ts_init);
1931
1932 assert_eq!(state.account_id.to_string(), "BITMEX-123456");
1933 assert_eq!(state.account_type, AccountType::Margin);
1934 assert_eq!(state.balances.len(), 1);
1935 assert_eq!(state.margins.len(), 1);
1936
1937 let balance = &state.balances[0];
1938 assert_eq!(balance.total.as_f64(), 5000.0);
1939
1940 let margin = &state.margins[0];
1941 assert!(margin.instrument_id.is_none());
1942 assert_eq!(margin.currency.code.as_str(), "USDT");
1943 assert_eq!(margin.initial.as_f64(), 200.0);
1944 assert_eq!(margin.maintenance.as_f64(), 100.0);
1945 }
1946
1947 #[rstest]
1948 fn test_parse_margin_account_state_zero_margins_excluded() {
1949 let msg = BitmexMarginMsg {
1950 account: 123456,
1951 currency: Ustr::from("XBt"),
1952 risk_limit: None,
1953 amount: Some(100_000_000),
1954 prev_realised_pnl: None,
1955 gross_comm: None,
1956 gross_open_cost: None,
1957 gross_open_premium: None,
1958 gross_exec_cost: None,
1959 gross_mark_value: None,
1960 risk_value: None,
1961 init_margin: Some(0),
1962 maint_margin: Some(0),
1963 target_excess_margin: None,
1964 realised_pnl: None,
1965 unrealised_pnl: None,
1966 wallet_balance: Some(100_000_000),
1967 margin_balance: None,
1968 margin_leverage: None,
1969 margin_used_pcnt: None,
1970 excess_margin: None,
1971 available_margin: Some(100_000_000),
1972 withdrawable_margin: None,
1973 maker_fee_discount: None,
1974 taker_fee_discount: None,
1975 timestamp: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
1976 foreign_margin_balance: None,
1977 foreign_requirement: None,
1978 };
1979
1980 let state = parse_margin_account_state(&msg, UnixNanos::from(1_000_000_000u64));
1981
1982 assert_eq!(state.balances.len(), 1);
1983 assert_eq!(state.margins.len(), 0);
1984 }
1985
1986 #[rstest]
1987 fn test_parse_instrument_msg_both_prices() {
1988 let json_data = load_test_json("ws_instrument.json");
1989 let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1990
1991 let mut instruments_cache = AHashMap::new();
1993 let test_instrument = create_test_perpetual_instrument();
1994 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1995
1996 let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
1997
1998 assert_eq!(updates.len(), 2);
2000
2001 match &updates[0] {
2003 Data::MarkPriceUpdate(update) => {
2004 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2005 assert_eq!(update.value.as_f64(), 95125.7);
2006 }
2007 _ => panic!("Expected MarkPriceUpdate at index 0"),
2008 }
2009
2010 match &updates[1] {
2012 Data::IndexPriceUpdate(update) => {
2013 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2014 assert_eq!(update.value.as_f64(), 95124.3);
2015 }
2016 _ => panic!("Expected IndexPriceUpdate at index 1"),
2017 }
2018 }
2019
2020 #[rstest]
2021 fn test_parse_instrument_msg_mark_price_only() {
2022 let mut msg: BitmexInstrumentMsg =
2023 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2024 msg.index_price = None;
2025
2026 let mut instruments_cache = AHashMap::new();
2028 let test_instrument = create_test_perpetual_instrument();
2029 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
2030
2031 let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2032
2033 assert_eq!(updates.len(), 1);
2034 match &updates[0] {
2035 Data::MarkPriceUpdate(update) => {
2036 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2037 assert_eq!(update.value.as_f64(), 95125.7);
2038 }
2039 _ => panic!("Expected MarkPriceUpdate"),
2040 }
2041 }
2042
2043 #[rstest]
2044 fn test_parse_instrument_msg_index_price_only() {
2045 let mut msg: BitmexInstrumentMsg =
2046 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2047 msg.mark_price = None;
2048
2049 let mut instruments_cache = AHashMap::new();
2051 let test_instrument = create_test_perpetual_instrument();
2052 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
2053
2054 let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2055
2056 assert_eq!(updates.len(), 1);
2057 match &updates[0] {
2058 Data::IndexPriceUpdate(update) => {
2059 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2060 assert_eq!(update.value.as_f64(), 95124.3);
2061 }
2062 _ => panic!("Expected IndexPriceUpdate"),
2063 }
2064 }
2065
2066 #[rstest]
2067 fn test_parse_instrument_msg_no_prices() {
2068 let mut msg: BitmexInstrumentMsg =
2069 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2070 msg.mark_price = None;
2071 msg.index_price = None;
2072 msg.last_price = None;
2073
2074 let mut instruments_cache = AHashMap::new();
2076 let test_instrument = create_test_perpetual_instrument();
2077 instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
2078
2079 let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2080 assert_eq!(updates.len(), 0);
2081 }
2082
2083 #[rstest]
2084 fn test_parse_instrument_msg_index_symbol() {
2085 let mut msg: BitmexInstrumentMsg =
2088 serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2089 msg.symbol = Ustr::from(".BXBT");
2090 msg.last_price = Some(119163.05);
2091 msg.mark_price = Some(119163.05); msg.index_price = None;
2093
2094 let instrument_id = InstrumentId::from(".BXBT.BITMEX");
2096 let instrument = CryptoPerpetual::new(
2097 instrument_id,
2098 Symbol::from(".BXBT"),
2099 Currency::BTC(),
2100 Currency::USD(),
2101 Currency::USD(),
2102 false, 2, 8, Price::from("0.01"),
2106 Quantity::from("0.00000001"),
2107 None, None, None, None, None, None, None, None, None, None, None, None, None, UnixNanos::default(), UnixNanos::default(), );
2123 let mut instruments_cache = AHashMap::new();
2124 instruments_cache.insert(
2125 Ustr::from(".BXBT"),
2126 InstrumentAny::CryptoPerpetual(instrument),
2127 );
2128
2129 let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2130
2131 assert_eq!(updates.len(), 2);
2132
2133 match &updates[0] {
2135 Data::MarkPriceUpdate(update) => {
2136 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
2137 assert_eq!(update.value, Price::from("119163.05"));
2138 }
2139 _ => panic!("Expected MarkPriceUpdate for index symbol"),
2140 }
2141
2142 match &updates[1] {
2144 Data::IndexPriceUpdate(update) => {
2145 assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
2146 assert_eq!(update.value, Price::from("119163.05"));
2147 assert_eq!(update.ts_init, UnixNanos::from(1));
2148 }
2149 _ => panic!("Expected IndexPriceUpdate for index symbol"),
2150 }
2151 }
2152
2153 #[rstest]
2154 fn test_parse_funding_msg() {
2155 let json_data = load_test_json("ws_funding_rate.json");
2156 let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
2157 let update = parse_funding_msg(&msg, UnixNanos::from(1));
2158
2159 assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2160 assert_eq!(update.rate.to_string(), "0.0001");
2161 assert_eq!(update.interval, Some(60 * 8));
2162 assert!(update.next_funding_ns.is_none());
2163 assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
2164 assert_eq!(update.ts_init, UnixNanos::from(1));
2165 }
2166}