Skip to main content

nautilus_bitmex/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsers that convert BitMEX WebSocket payloads into Nautilus data structures.
17
18use std::{num::NonZero, str::FromStr};
19
20use ahash::AHashMap;
21use chrono::Timelike;
22use nautilus_core::{UnixNanos, uuid::UUID4};
23#[cfg(test)]
24use nautilus_model::types::Currency;
25use nautilus_model::{
26    data::{
27        Bar, BarSpecification, BarType, BookOrder, Data, FundingRateUpdate, IndexPriceUpdate,
28        MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
29        depth::DEPTH10_LEN,
30    },
31    enums::{
32        AccountType, AggregationSource, BarAggregation, OrderSide, OrderStatus, OrderType,
33        PriceType, RecordFlag, TimeInForce, TrailingOffsetType,
34    },
35    events::{
36        OrderAccepted, OrderCanceled, OrderExpired, OrderRejected, OrderTriggered, OrderUpdated,
37        account::state::AccountState,
38    },
39    identifiers::{
40        AccountId, ClientOrderId, InstrumentId, OrderListId, StrategyId, Symbol, TradeId, TraderId,
41        VenueOrderId,
42    },
43    instruments::{Instrument, InstrumentAny},
44    reports::{FillReport, OrderStatusReport, PositionStatusReport},
45    types::{AccountBalance, MarginBalance, Money, Price, Quantity},
46};
47use rust_decimal::Decimal;
48use ustr::Ustr;
49
50use super::{
51    enums::{BitmexAction, BitmexWsTopic},
52    messages::{
53        BitmexExecutionMsg, BitmexFundingMsg, BitmexInstrumentMsg, BitmexMarginMsg,
54        BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexOrderMsg, BitmexPositionMsg,
55        BitmexQuoteMsg, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg,
56    },
57};
58use crate::{
59    common::{
60        consts::BITMEX_VENUE,
61        enums::{
62            BitmexExecInstruction, BitmexExecType, BitmexOrderStatus, BitmexOrderType,
63            BitmexPegPriceType, BitmexSide,
64        },
65        parse::{
66            bitmex_currency_divisor, clean_reason, derive_trade_id, extract_trigger_type,
67            map_bitmex_currency, normalize_trade_bin_prices, normalize_trade_bin_volume,
68            parse_account_balance, parse_contracts_quantity, parse_fractional_quantity,
69            parse_instrument_id, parse_liquidity_side, parse_optional_datetime_to_unix_nanos,
70            parse_position_side, parse_signed_contracts_quantity,
71        },
72    },
73    http::parse::get_currency,
74    websocket::messages::BitmexOrderUpdateMsg,
75};
76
77const BAR_SPEC_1_MINUTE: BarSpecification = BarSpecification {
78    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
79    aggregation: BarAggregation::Minute,
80    price_type: PriceType::Last,
81};
82const BAR_SPEC_5_MINUTE: BarSpecification = BarSpecification {
83    step: NonZero::new(5).expect("5 is a valid non-zero usize"),
84    aggregation: BarAggregation::Minute,
85    price_type: PriceType::Last,
86};
87const BAR_SPEC_1_HOUR: BarSpecification = BarSpecification {
88    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
89    aggregation: BarAggregation::Hour,
90    price_type: PriceType::Last,
91};
92const BAR_SPEC_1_DAY: BarSpecification = BarSpecification {
93    step: NonZero::new(1).expect("1 is a valid non-zero usize"),
94    aggregation: BarAggregation::Day,
95    price_type: PriceType::Last,
96};
97
98/// Check if a symbol is an index symbol (starts with '.').
99///
100/// Index symbols in BitMEX represent indices like `.BXBT` and have different
101/// behavior from regular instruments:
102/// - They only have a single price value (no bid/ask spread).
103/// - They don't have trades or quotes.
104/// - Their price is delivered via the `lastPrice` field.
105#[inline]
106#[must_use]
107pub fn is_index_symbol(symbol: &Ustr) -> bool {
108    symbol.starts_with('.')
109}
110
111/// Converts a batch of BitMEX order-book rows into Nautilus delta events.
112#[must_use]
113pub fn parse_book_msg_vec(
114    data: Vec<BitmexOrderBookMsg>,
115    action: BitmexAction,
116    instruments: &AHashMap<Ustr, InstrumentAny>,
117    ts_init: UnixNanos,
118) -> Vec<Data> {
119    let mut deltas = Vec::with_capacity(data.len());
120
121    for msg in data {
122        if let Some(instrument) = instruments.get(&msg.symbol) {
123            let instrument_id = instrument.id();
124            let price_precision = instrument.price_precision();
125            deltas.push(Data::Delta(parse_book_msg(
126                &msg,
127                &action,
128                instrument,
129                instrument_id,
130                price_precision,
131                ts_init,
132            )));
133        } else {
134            log::error!(
135                "Instrument cache miss: book delta dropped for symbol={}",
136                msg.symbol
137            );
138        }
139    }
140
141    // Set F_LAST on the last delta so data engine knows the batch is complete
142    if let Some(Data::Delta(last_delta)) = deltas.last_mut() {
143        *last_delta = OrderBookDelta::new(
144            last_delta.instrument_id,
145            last_delta.action,
146            last_delta.order,
147            last_delta.flags | RecordFlag::F_LAST as u8,
148            last_delta.sequence,
149            last_delta.ts_event,
150            last_delta.ts_init,
151        );
152    }
153
154    deltas
155}
156
157/// Converts BitMEX level-10 snapshots into Nautilus depth events.
158#[must_use]
159pub fn parse_book10_msg_vec(
160    data: Vec<BitmexOrderBook10Msg>,
161    instruments: &AHashMap<Ustr, InstrumentAny>,
162    ts_init: UnixNanos,
163) -> Vec<Data> {
164    let mut depths = Vec::with_capacity(data.len());
165
166    for msg in data {
167        if let Some(instrument) = instruments.get(&msg.symbol) {
168            let instrument_id = instrument.id();
169            let price_precision = instrument.price_precision();
170            match parse_book10_msg(&msg, instrument, instrument_id, price_precision, ts_init) {
171                Ok(depth) => depths.push(Data::Depth10(Box::new(depth))),
172                Err(e) => {
173                    log::error!("Failed to parse orderBook10 for symbol={}: {e}", msg.symbol);
174                }
175            }
176        } else {
177            log::error!(
178                "Instrument cache miss: depth10 message dropped for symbol={}",
179                msg.symbol
180            );
181        }
182    }
183    depths
184}
185
186/// Converts BitMEX trade messages into Nautilus trade data events.
187#[must_use]
188pub fn parse_trade_msg_vec(
189    data: Vec<BitmexTradeMsg>,
190    instruments: &AHashMap<Ustr, InstrumentAny>,
191    ts_init: UnixNanos,
192) -> Vec<Data> {
193    let mut trades = Vec::with_capacity(data.len());
194
195    for msg in data {
196        if let Some(instrument) = instruments.get(&msg.symbol) {
197            let instrument_id = instrument.id();
198            let price_precision = instrument.price_precision();
199            trades.push(Data::Trade(parse_trade_msg(
200                &msg,
201                instrument,
202                instrument_id,
203                price_precision,
204                ts_init,
205            )));
206        } else {
207            log::error!(
208                "Instrument cache miss: trade message dropped for symbol={}",
209                msg.symbol
210            );
211        }
212    }
213    trades
214}
215
216/// Converts aggregated trade-bin messages into Nautilus data events.
217#[must_use]
218pub fn parse_trade_bin_msg_vec(
219    data: Vec<BitmexTradeBinMsg>,
220    topic: &BitmexWsTopic,
221    instruments: &AHashMap<Ustr, InstrumentAny>,
222    ts_init: UnixNanos,
223) -> Vec<Data> {
224    let mut trades = Vec::with_capacity(data.len());
225
226    for msg in data {
227        if let Some(instrument) = instruments.get(&msg.symbol) {
228            let instrument_id = instrument.id();
229            let price_precision = instrument.price_precision();
230            trades.push(Data::Bar(parse_trade_bin_msg(
231                &msg,
232                topic,
233                instrument,
234                instrument_id,
235                price_precision,
236                ts_init,
237            )));
238        } else {
239            log::error!(
240                "Instrument cache miss: trade bin (bar) dropped for symbol={}",
241                msg.symbol
242            );
243        }
244    }
245    trades
246}
247
248/// Converts a BitMEX order book row into a Nautilus order-book delta.
249#[must_use]
250pub fn parse_book_msg(
251    msg: &BitmexOrderBookMsg,
252    action: &BitmexAction,
253    instrument: &InstrumentAny,
254    instrument_id: InstrumentId,
255    price_precision: u8,
256    ts_init: UnixNanos,
257) -> OrderBookDelta {
258    let flags = if action == &BitmexAction::Partial {
259        RecordFlag::F_SNAPSHOT as u8
260    } else {
261        0
262    };
263
264    let action = action.as_book_action();
265    let price = Price::new(msg.price, price_precision);
266    let side = msg.side.as_order_side();
267    let size = parse_contracts_quantity(msg.size.unwrap_or(0), instrument);
268    let order_id = msg.id;
269    let order = BookOrder::new(side, price, size, order_id);
270    let sequence = 0; // Not available
271    let ts_event = UnixNanos::from(msg.timestamp);
272
273    OrderBookDelta::new(
274        instrument_id,
275        action,
276        order,
277        flags,
278        sequence,
279        ts_event,
280        ts_init,
281    )
282}
283
284/// Parses an `OrderBook10` message into an `OrderBookDepth10` object.
285///
286/// # Errors
287///
288/// Returns an error if the bid or ask arrays are not exactly 10 elements.
289pub fn parse_book10_msg(
290    msg: &BitmexOrderBook10Msg,
291    instrument: &InstrumentAny,
292    instrument_id: InstrumentId,
293    price_precision: u8,
294    ts_init: UnixNanos,
295) -> anyhow::Result<OrderBookDepth10> {
296    let mut bids = Vec::with_capacity(DEPTH10_LEN);
297    let mut asks = Vec::with_capacity(DEPTH10_LEN);
298
299    // Initialized with zeros
300    let mut bid_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
301    let mut ask_counts: [u32; DEPTH10_LEN] = [0; DEPTH10_LEN];
302
303    for (i, level) in msg.bids.iter().enumerate() {
304        let bid_order = BookOrder::new(
305            OrderSide::Buy,
306            Price::new(level[0], price_precision),
307            parse_fractional_quantity(level[1], instrument),
308            0,
309        );
310
311        bids.push(bid_order);
312        bid_counts[i] = 1;
313    }
314
315    for (i, level) in msg.asks.iter().enumerate() {
316        let ask_order = BookOrder::new(
317            OrderSide::Sell,
318            Price::new(level[0], price_precision),
319            parse_fractional_quantity(level[1], instrument),
320            0,
321        );
322
323        asks.push(ask_order);
324        ask_counts[i] = 1;
325    }
326
327    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
328        anyhow::anyhow!(
329            "Bids length mismatch: expected {DEPTH10_LEN}, was {}",
330            v.len()
331        )
332    })?;
333    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
334        anyhow::anyhow!(
335            "Asks length mismatch: expected {DEPTH10_LEN}, was {}",
336            v.len()
337        )
338    })?;
339
340    let ts_event = UnixNanos::from(msg.timestamp);
341
342    Ok(OrderBookDepth10::new(
343        instrument_id,
344        bids,
345        asks,
346        bid_counts,
347        ask_counts,
348        RecordFlag::F_SNAPSHOT as u8,
349        0, // Not applicable for BitMEX L2 books
350        ts_event,
351        ts_init,
352    ))
353}
354
355/// Converts a BitMEX quote message into a `QuoteTick`, filling missing data from cache.
356#[must_use]
357pub fn parse_quote_msg(
358    msg: &BitmexQuoteMsg,
359    last_quote: &QuoteTick,
360    instrument: &InstrumentAny,
361    instrument_id: InstrumentId,
362    price_precision: u8,
363    ts_init: UnixNanos,
364) -> QuoteTick {
365    let bid_price = match msg.bid_price {
366        Some(price) => Price::new(price, price_precision),
367        None => last_quote.bid_price,
368    };
369
370    let ask_price = match msg.ask_price {
371        Some(price) => Price::new(price, price_precision),
372        None => last_quote.ask_price,
373    };
374
375    let bid_size = match msg.bid_size {
376        Some(size) => parse_contracts_quantity(size, instrument),
377        None => last_quote.bid_size,
378    };
379
380    let ask_size = match msg.ask_size {
381        Some(size) => parse_contracts_quantity(size, instrument),
382        None => last_quote.ask_size,
383    };
384
385    let ts_event = UnixNanos::from(msg.timestamp);
386
387    QuoteTick::new(
388        instrument_id,
389        bid_price,
390        ask_price,
391        bid_size,
392        ask_size,
393        ts_event,
394        ts_init,
395    )
396}
397
398/// Converts a BitMEX trade message into a `TradeTick`.
399#[must_use]
400pub fn parse_trade_msg(
401    msg: &BitmexTradeMsg,
402    instrument: &InstrumentAny,
403    instrument_id: InstrumentId,
404    price_precision: u8,
405    ts_init: UnixNanos,
406) -> TradeTick {
407    let price = Price::new(msg.price, price_precision);
408    let size = parse_contracts_quantity(msg.size, instrument);
409    let aggressor_side = msg.side.as_aggressor_side();
410    let ts_event = UnixNanos::from(msg.timestamp);
411    let trade_id = match msg.trd_match_id {
412        Some(uuid) => TradeId::new(uuid.to_string()),
413        None => derive_trade_id(
414            msg.symbol,
415            ts_event.as_u64(),
416            msg.price,
417            msg.size as i64,
418            Some(msg.side.into()),
419        ),
420    };
421
422    TradeTick::new(
423        instrument_id,
424        price,
425        size,
426        aggressor_side,
427        trade_id,
428        ts_event,
429        ts_init,
430    )
431}
432
433/// Converts a BitMEX trade-bin summary into a `Bar` for the matching topic.
434#[must_use]
435pub fn parse_trade_bin_msg(
436    msg: &BitmexTradeBinMsg,
437    topic: &BitmexWsTopic,
438    instrument: &InstrumentAny,
439    instrument_id: InstrumentId,
440    price_precision: u8,
441    ts_init: UnixNanos,
442) -> Bar {
443    let spec = bar_spec_from_topic(topic);
444    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
445
446    let open = Price::new(msg.open, price_precision);
447    let high = Price::new(msg.high, price_precision);
448    let low = Price::new(msg.low, price_precision);
449    let close = Price::new(msg.close, price_precision);
450
451    let (open, high, low, close) =
452        normalize_trade_bin_prices(open, high, low, close, &msg.symbol, Some(&bar_type));
453
454    let volume_contracts = normalize_trade_bin_volume(Some(msg.volume), &msg.symbol);
455    let volume = parse_contracts_quantity(volume_contracts, instrument);
456    let ts_event = UnixNanos::from(msg.timestamp);
457
458    Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
459}
460
461/// Converts a WebSocket topic to a bar specification.
462///
463/// Returns `BAR_SPEC_1_MINUTE` and logs an error for unsupported topics.
464#[must_use]
465pub fn bar_spec_from_topic(topic: &BitmexWsTopic) -> BarSpecification {
466    match topic {
467        BitmexWsTopic::TradeBin1m => BAR_SPEC_1_MINUTE,
468        BitmexWsTopic::TradeBin5m => BAR_SPEC_5_MINUTE,
469        BitmexWsTopic::TradeBin1h => BAR_SPEC_1_HOUR,
470        BitmexWsTopic::TradeBin1d => BAR_SPEC_1_DAY,
471        _ => {
472            log::error!("Bar specification not supported: topic={topic:?}");
473            BAR_SPEC_1_MINUTE
474        }
475    }
476}
477
478/// Converts a bar specification to a WebSocket topic.
479///
480/// Returns `TradeBin1m` and logs an error for unsupported specifications.
481#[must_use]
482pub fn topic_from_bar_spec(spec: BarSpecification) -> BitmexWsTopic {
483    match spec {
484        BAR_SPEC_1_MINUTE => BitmexWsTopic::TradeBin1m,
485        BAR_SPEC_5_MINUTE => BitmexWsTopic::TradeBin5m,
486        BAR_SPEC_1_HOUR => BitmexWsTopic::TradeBin1h,
487        BAR_SPEC_1_DAY => BitmexWsTopic::TradeBin1d,
488        _ => {
489            log::error!("Bar specification not supported: spec={spec:?}");
490            BitmexWsTopic::TradeBin1m
491        }
492    }
493}
494
495fn infer_order_type_from_msg(msg: &BitmexOrderMsg) -> OrderType {
496    if msg.stop_px.is_some() {
497        if msg.price.is_some() {
498            OrderType::StopLimit
499        } else {
500            OrderType::StopMarket
501        }
502    } else if msg.price.is_some() {
503        OrderType::Limit
504    } else {
505        OrderType::Market
506    }
507}
508
509/// Parse a BitMEX WebSocket order message into a Nautilus `OrderStatusReport`.
510///
511/// # References
512///
513/// <https://www.bitmex.com/app/wsAPI#Order>
514///
515/// # Errors
516///
517/// Returns an error if the time in force conversion fails.
518pub fn parse_order_msg(
519    msg: &BitmexOrderMsg,
520    instrument: &InstrumentAny,
521    order_type_cache: &mut AHashMap<ClientOrderId, OrderType>,
522    ts_init: UnixNanos,
523) -> anyhow::Result<OrderStatusReport> {
524    let account_id = AccountId::new(format!("BITMEX-{}", msg.account)); // TODO: Revisit
525    let instrument_id = parse_instrument_id(msg.symbol);
526    let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
527    let common_side: BitmexSide = msg.side.into();
528    let order_side: OrderSide = common_side.into();
529
530    let order_type: OrderType = if let Some(ord_type) = msg.ord_type {
531        // Pegged orders with TrailingStopPeg are trailing stop orders
532        if ord_type == BitmexOrderType::Pegged
533            && msg.peg_price_type == Some(BitmexPegPriceType::TrailingStopPeg)
534        {
535            if msg.price.is_some() {
536                OrderType::TrailingStopLimit
537            } else {
538                OrderType::TrailingStopMarket
539            }
540        } else {
541            ord_type.into()
542        }
543    } else if let Some(client_order_id) = msg.cl_ord_id {
544        let client_order_id = ClientOrderId::new(client_order_id);
545        if let Some(&cached) = order_type_cache.get(&client_order_id) {
546            cached
547        } else {
548            let inferred = infer_order_type_from_msg(msg);
549            order_type_cache.insert(client_order_id, inferred);
550            inferred
551        }
552    } else {
553        infer_order_type_from_msg(msg)
554    };
555
556    let time_in_force: TimeInForce = match msg.time_in_force {
557        Some(tif) => tif.try_into().map_err(|e| anyhow::anyhow!("{e}"))?,
558        None => TimeInForce::Gtc,
559    };
560    let order_status: OrderStatus = msg.ord_status.into();
561    let quantity = parse_signed_contracts_quantity(msg.order_qty, instrument);
562    let filled_qty = parse_signed_contracts_quantity(msg.cum_qty, instrument);
563    let report_id = UUID4::new();
564    let ts_accepted =
565        parse_optional_datetime_to_unix_nanos(&Some(msg.transact_time), "transact_time");
566    let ts_last = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
567
568    let mut report = OrderStatusReport::new(
569        account_id,
570        instrument_id,
571        None, // client_order_id - will be set later if present
572        venue_order_id,
573        order_side,
574        order_type,
575        time_in_force,
576        order_status,
577        quantity,
578        filled_qty,
579        ts_accepted,
580        ts_last,
581        ts_init,
582        Some(report_id),
583    );
584
585    if let Some(cl_ord_id) = &msg.cl_ord_id {
586        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
587    }
588
589    if let Some(cl_ord_link_id) = &msg.cl_ord_link_id {
590        report = report.with_order_list_id(OrderListId::new(cl_ord_link_id));
591    }
592
593    if let Some(price) = msg.price {
594        report = report.with_price(Price::new(price, instrument.price_precision()));
595    }
596
597    if let Some(avg_px) = msg.avg_px {
598        report = report.with_avg_px(avg_px)?;
599    }
600
601    if let Some(trigger_price) = msg.stop_px {
602        report = report
603            .with_trigger_price(Price::new(trigger_price, instrument.price_precision()))
604            .with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
605    }
606
607    // Populate trailing offset for trailing stop orders
608    if matches!(
609        order_type,
610        OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
611    ) && let Some(peg_offset) = msg.peg_offset_value
612    {
613        let trailing_offset = Decimal::try_from(peg_offset.abs())
614            .unwrap_or_else(|_| Decimal::new(peg_offset.abs() as i64, 0));
615        report = report
616            .with_trailing_offset(trailing_offset)
617            .with_trailing_offset_type(TrailingOffsetType::Price);
618
619        if msg.stop_px.is_none() {
620            report = report.with_trigger_type(extract_trigger_type(msg.exec_inst.as_ref()));
621        }
622    }
623
624    if let Some(exec_insts) = &msg.exec_inst {
625        for exec_inst in exec_insts {
626            match exec_inst {
627                BitmexExecInstruction::ParticipateDoNotInitiate => {
628                    report = report.with_post_only(true);
629                }
630                BitmexExecInstruction::ReduceOnly => {
631                    report = report.with_reduce_only(true);
632                }
633                _ => {}
634            }
635        }
636    }
637
638    // Extract rejection reason for rejected orders
639    if order_status == OrderStatus::Rejected {
640        if let Some(reason_str) = msg.ord_rej_reason.or(msg.text) {
641            log::debug!(
642                "Order rejected with reason: order_id={:?}, client_order_id={:?}, reason={:?}",
643                venue_order_id,
644                msg.cl_ord_id,
645                reason_str,
646            );
647            report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
648        } else {
649            log::debug!(
650                "Order rejected without reason from BitMEX: order_id={:?}, client_order_id={:?}, ord_status={:?}, ord_rej_reason={:?}, text={:?}",
651                venue_order_id,
652                msg.cl_ord_id,
653                msg.ord_status,
654                msg.ord_rej_reason,
655                msg.text,
656            );
657        }
658    }
659
660    // Check if this is a canceled post-only order (BitMEX cancels instead of rejecting)
661    // We need to preserve the rejection reason for the execution client to handle
662    if order_status == OrderStatus::Canceled
663        && let Some(reason_str) = msg.ord_rej_reason.or(msg.text)
664    {
665        report = report.with_cancel_reason(clean_reason(reason_str.as_ref()));
666    }
667
668    Ok(report)
669}
670
/// Parsed order event variants produced by [`parse_order_event`] for tracked orders.
///
/// Each variant wraps the corresponding Nautilus order event.
#[derive(Debug, Clone)]
pub enum ParsedOrderEvent {
    /// The order was accepted (BitMEX status `New`).
    Accepted(OrderAccepted),
    /// The order was canceled.
    Canceled(OrderCanceled),
    /// The order expired.
    Expired(OrderExpired),
    /// The order was triggered.
    // NOTE(review): no visible branch of `parse_order_event` constructs this
    // variant; presumably it is produced elsewhere — confirm.
    Triggered(OrderTriggered),
    /// The order was rejected; `parse_order_event` emits this for post-only
    /// orders that BitMEX reports as canceled.
    Rejected(OrderRejected),
}
680
681/// Converts a full BitMEX order message into a [`ParsedOrderEvent`] for tracked orders.
682///
683/// Returns `None` for transitional statuses (`PendingNew`, `PendingCancel`, `PendingReplace`)
684/// and for fill-related statuses (`PartiallyFilled`, `Filled`, `Rejected`) that are handled
685/// through other channels (Execution table for fills, HTTP response for rejections).
686pub fn parse_order_event(
687    msg: &BitmexOrderMsg,
688    client_order_id: ClientOrderId,
689    account_id: AccountId,
690    trader_id: TraderId,
691    strategy_id: StrategyId,
692    ts_init: UnixNanos,
693) -> Option<ParsedOrderEvent> {
694    let instrument_id = parse_instrument_id(msg.symbol);
695    let venue_order_id = VenueOrderId::new(msg.order_id.to_string());
696    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "timestamp");
697
698    match msg.ord_status {
699        BitmexOrderStatus::New => {
700            let accepted = OrderAccepted::new(
701                trader_id,
702                strategy_id,
703                instrument_id,
704                client_order_id,
705                venue_order_id,
706                account_id,
707                UUID4::new(),
708                ts_event,
709                ts_init,
710                false,
711            );
712            Some(ParsedOrderEvent::Accepted(accepted))
713        }
714        BitmexOrderStatus::Canceled => {
715            // BitMEX cancels post-only orders instead of rejecting them when they
716            // would cross the spread. Detect via "ParticipateDoNotInitiate" reason.
717            let cancel_reason = msg
718                .ord_rej_reason
719                .or(msg.text)
720                .map(|r| clean_reason(r.as_ref()));
721
722            let is_post_only_rejection = cancel_reason
723                .as_deref()
724                .is_some_and(|r| r.contains("ParticipateDoNotInitiate"));
725
726            if is_post_only_rejection {
727                let rejected = OrderRejected::new(
728                    trader_id,
729                    strategy_id,
730                    instrument_id,
731                    client_order_id,
732                    account_id,
733                    Ustr::from(
734                        cancel_reason
735                            .as_deref()
736                            .unwrap_or("Post-only order rejected"),
737                    ),
738                    UUID4::new(),
739                    ts_event,
740                    ts_init,
741                    false,
742                    true, // due_post_only
743                );
744                Some(ParsedOrderEvent::Rejected(rejected))
745            } else {
746                let canceled = OrderCanceled::new(
747                    trader_id,
748                    strategy_id,
749                    instrument_id,
750                    client_order_id,
751                    UUID4::new(),
752                    ts_event,
753                    ts_init,
754                    false,
755                    Some(venue_order_id),
756                    Some(account_id),
757                );
758                Some(ParsedOrderEvent::Canceled(canceled))
759            }
760        }
761        BitmexOrderStatus::Expired => {
762            let expired = OrderExpired::new(
763                trader_id,
764                strategy_id,
765                instrument_id,
766                client_order_id,
767                UUID4::new(),
768                ts_event,
769                ts_init,
770                false,
771                Some(venue_order_id),
772                Some(account_id),
773            );
774            Some(ParsedOrderEvent::Expired(expired))
775        }
776        // Rejections: handled at submit time via HTTP response
777        // Fills: handled via the Execution table, not order status updates
778        // Transitional: PendingNew, PendingCancel, PendingReplace
779        _ => None,
780    }
781}
782
783/// Parse a BitMEX WebSocket order update message into a Nautilus `OrderUpdated` event.
784///
785/// This handles partial updates where only changed fields are present.
786pub fn parse_order_update_msg(
787    msg: &BitmexOrderUpdateMsg,
788    instrument: &InstrumentAny,
789    account_id: AccountId,
790    ts_init: UnixNanos,
791) -> Option<OrderUpdated> {
792    // Uses external IDs; callers enrich with tracked identity when available
793    let trader_id = TraderId::external();
794    let strategy_id = StrategyId::external();
795    let instrument_id = parse_instrument_id(msg.symbol);
796    let venue_order_id = Some(VenueOrderId::new(msg.order_id.to_string()));
797    let client_order_id = msg
798        .cl_ord_id
799        .as_ref()
800        .map_or_else(ClientOrderId::external, ClientOrderId::new);
801
802    // BitMEX partial updates may omit leaves_qty/cum_qty. When missing, we fall back
803    // to zero which signals the execution engine to use the cached order quantity.
804    let quantity = match (msg.leaves_qty, msg.cum_qty) {
805        (Some(leaves), Some(cum)) => parse_contracts_quantity((leaves + cum) as u64, instrument),
806        _ => Quantity::zero(instrument.size_precision()),
807    };
808    let price = msg
809        .price
810        .map(|p| Price::new(p, instrument.price_precision()));
811
812    // BitMEX doesn't send trigger price in regular order updates?
813    let trigger_price = None;
814    // BitMEX doesn't send protection price in regular order updates
815    let protection_price = None;
816
817    let event_id = UUID4::new();
818    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
819
820    Some(OrderUpdated::new(
821        trader_id,
822        strategy_id,
823        instrument_id,
824        client_order_id,
825        quantity,
826        event_id,
827        ts_event,
828        ts_init,
829        false, // reconciliation
830        venue_order_id,
831        Some(account_id),
832        price,
833        trigger_price,
834        protection_price,
835        false, // is_quote_quantity
836    ))
837}
838
839/// Parse a BitMEX WebSocket execution message into a Nautilus `FillReport`.
840///
841/// Handles different execution types appropriately:
842/// - `Trade`: Normal trade execution → FillReport
843/// - `Liquidation`: Auto-deleveraging or liquidation → FillReport
844/// - `Bankruptcy`: Bankruptcy execution → FillReport (with warning)
845/// - `Settlement`, `TrialFill`: Non-obvious cases → None (with warning)
846/// - `Funding`, `Insurance`, `Rebalance`: Expected non-fills → None (debug log)
847/// - Order state changes (`New`, `Canceled`, etc.): → None (debug log)
848///
849/// # References
850///
851/// <https://www.bitmex.com/app/wsAPI#Execution>
852pub fn parse_execution_msg(
853    msg: BitmexExecutionMsg,
854    instrument: &InstrumentAny,
855    ts_init: UnixNanos,
856) -> Option<FillReport> {
857    let exec_type = msg.exec_type?;
858
859    match exec_type {
860        // Position-affecting executions that generate fills
861        BitmexExecType::Trade | BitmexExecType::Liquidation => {}
862        BitmexExecType::Bankruptcy => {
863            log::warn!(
864                "Processing bankruptcy execution as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
865                msg.order_id,
866                msg.symbol,
867            );
868        }
869
870        // Settlement executions are mark-to-market events, not fills
871        BitmexExecType::Settlement => {
872            log::debug!(
873                "Settlement execution skipped (not a fill): applies quanto conversion/PnL transfer on contract settlement: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
874                msg.order_id,
875                msg.symbol,
876            );
877            return None;
878        }
879        BitmexExecType::TrialFill => {
880            log::warn!(
881                "Trial fill execution received (testnet only), not processed as fill: exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
882                msg.order_id,
883                msg.symbol,
884            );
885            return None;
886        }
887
888        // Expected non-fill executions
889        BitmexExecType::Funding => {
890            log::debug!(
891                "Funding execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
892                msg.order_id,
893                msg.symbol,
894            );
895            return None;
896        }
897        BitmexExecType::Insurance => {
898            log::debug!(
899                "Insurance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
900                msg.order_id,
901                msg.symbol,
902            );
903            return None;
904        }
905        BitmexExecType::Rebalance => {
906            log::debug!(
907                "Rebalance execution skipped (not a fill): exec_type={exec_type:?}, order_id={:?}, symbol={:?}",
908                msg.order_id,
909                msg.symbol,
910            );
911            return None;
912        }
913
914        // Order state changes (not fills)
915        BitmexExecType::New
916        | BitmexExecType::Canceled
917        | BitmexExecType::CancelReject
918        | BitmexExecType::Replaced
919        | BitmexExecType::Rejected
920        | BitmexExecType::AmendReject
921        | BitmexExecType::Suspended
922        | BitmexExecType::Released
923        | BitmexExecType::TriggeredOrActivatedBySystem => {
924            log::debug!(
925                "Execution message skipped (order state change, not a fill): exec_type={exec_type:?}, order_id={:?}",
926                msg.order_id,
927            );
928            return None;
929        }
930
931        BitmexExecType::Unknown(ref type_str) => {
932            log::warn!(
933                "Unknown execution type received, skipping: exec_type={type_str}, order_id={:?}, symbol={:?}",
934                msg.order_id,
935                msg.symbol,
936            );
937            return None;
938        }
939    }
940
941    let account_id = AccountId::new(format!("BITMEX-{}", msg.account?));
942    let instrument_id = parse_instrument_id(msg.symbol?);
943    let venue_order_id = VenueOrderId::new(msg.order_id?.to_string());
944    let trade_id = TradeId::new(msg.trd_match_id?.to_string());
945    let order_side: OrderSide = msg.side.map_or(OrderSide::NoOrderSide, |s| {
946        let side: BitmexSide = s.into();
947        side.into()
948    });
949    let last_qty = parse_signed_contracts_quantity(msg.last_qty?, instrument);
950    let last_px = Price::new(msg.last_px?, instrument.price_precision());
951    let settlement_currency_str = msg.settl_currency.unwrap_or(Ustr::from("XBT"));
952    let mapped_currency = map_bitmex_currency(settlement_currency_str.as_str());
953    let currency = get_currency(&mapped_currency);
954    let commission = Money::new(msg.commission.unwrap_or(0.0), currency);
955    let liquidity_side = parse_liquidity_side(&msg.last_liquidity_ind);
956    let client_order_id = msg.cl_ord_id.map(ClientOrderId::new);
957    let venue_position_id = None; // Not applicable on BitMEX
958    let ts_event = parse_optional_datetime_to_unix_nanos(&msg.transact_time, "transact_time");
959
960    Some(FillReport::new(
961        account_id,
962        instrument_id,
963        venue_order_id,
964        trade_id,
965        order_side,
966        last_qty,
967        last_px,
968        commission,
969        liquidity_side,
970        client_order_id,
971        venue_position_id,
972        ts_event,
973        ts_init,
974        None,
975    ))
976}
977
978/// Parse a BitMEX WebSocket position message into a Nautilus `PositionStatusReport`.
979///
980/// # References
981///
982/// <https://www.bitmex.com/app/wsAPI#Position>
983#[must_use]
984pub fn parse_position_msg(
985    msg: &BitmexPositionMsg,
986    instrument: &InstrumentAny,
987    ts_init: UnixNanos,
988) -> PositionStatusReport {
989    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
990    let instrument_id = parse_instrument_id(msg.symbol);
991    let position_side = parse_position_side(msg.current_qty).as_specified();
992    let quantity = parse_signed_contracts_quantity(msg.current_qty.unwrap_or(0), instrument);
993    let venue_position_id = None; // Not applicable on BitMEX
994    let avg_px_open = msg
995        .avg_entry_price
996        .and_then(|p| Decimal::from_str(&p.to_string()).ok());
997    let ts_last = parse_optional_datetime_to_unix_nanos(&msg.timestamp, "timestamp");
998
999    PositionStatusReport::new(
1000        account_id,
1001        instrument_id,
1002        position_side,
1003        quantity,
1004        ts_last,
1005        ts_init,
1006        None,              // report_id
1007        venue_position_id, // venue_position_id
1008        avg_px_open,       // avg_px_open
1009    )
1010}
1011
1012/// Parse a BitMEX WebSocket instrument message for mark and index prices.
1013///
1014/// For index symbols (e.g., `.BXBT`):
1015/// - Uses the `lastPrice` field as the index price.
1016/// - Also emits the `markPrice` field (which equals `lastPrice` for indices).
1017///
1018/// For regular instruments:
1019/// - Uses the `index_price` field for index price updates.
1020/// - Uses the `mark_price` field for mark price updates.
1021///
1022/// Returns a Vec of Data containing mark and/or index price updates
1023/// or an empty Vec if no relevant price is present.
1024#[must_use]
1025pub fn parse_instrument_msg(
1026    msg: &BitmexInstrumentMsg,
1027    instruments_cache: &AHashMap<Ustr, InstrumentAny>,
1028    ts_init: UnixNanos,
1029) -> Vec<Data> {
1030    let mut updates = Vec::new();
1031    let is_index = is_index_symbol(&msg.symbol);
1032
1033    // For index symbols (like .BXBT), the lastPrice field contains the index price
1034    // For regular instruments, use the explicit index_price field if present
1035    let effective_index_price = if is_index {
1036        msg.last_price
1037    } else {
1038        msg.index_price
1039    };
1040
1041    // Return early if no relevant prices present (mark_price or effective_index_price)
1042    // Note: effective_index_price uses lastPrice for index symbols, index_price for others
1043    // (Funding rates come through a separate Funding channel)
1044    if msg.mark_price.is_none() && effective_index_price.is_none() {
1045        return updates;
1046    }
1047
1048    let instrument_id = InstrumentId::new(Symbol::from_ustr_unchecked(msg.symbol), *BITMEX_VENUE);
1049    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
1050
1051    // Look up instrument for proper precision
1052    let price_precision = match instruments_cache.get(&Ustr::from(&msg.symbol)) {
1053        Some(instrument) => instrument.price_precision(),
1054        None => {
1055            // BitMEX sends updates for all instruments on the instrument channel,
1056            // but we only cache instruments that are explicitly requested.
1057            // Index instruments (starting with '.') are not loaded via regular API endpoints.
1058            if is_index {
1059                log::trace!(
1060                    "Index instrument {} not in cache, skipping update",
1061                    msg.symbol
1062                );
1063            } else {
1064                log::debug!("Instrument {} not in cache, skipping update", msg.symbol);
1065            }
1066            return updates;
1067        }
1068    };
1069
1070    // Add mark price update if present
1071    // For index symbols, markPrice equals lastPrice and is valid to emit
1072    if let Some(mark_price) = msg.mark_price {
1073        let price = Price::new(mark_price, price_precision);
1074        updates.push(Data::MarkPriceUpdate(MarkPriceUpdate::new(
1075            instrument_id,
1076            price,
1077            ts_event,
1078            ts_init,
1079        )));
1080    }
1081
1082    // Add index price update if present
1083    if let Some(index_price) = effective_index_price {
1084        let price = Price::new(index_price, price_precision);
1085        updates.push(Data::IndexPriceUpdate(IndexPriceUpdate::new(
1086            instrument_id,
1087            price,
1088            ts_event,
1089            ts_init,
1090        )));
1091    }
1092
1093    updates
1094}
1095
1096/// Parse a BitMEX WebSocket funding message.
1097///
1098/// Returns `FundingRateUpdate` containing funding rate information.
1099/// Note: This returns `FundingRateUpdate` directly, not wrapped in Data enum,
1100/// to keep it separate from the FFI layer.
1101#[must_use]
1102pub fn parse_funding_msg(msg: &BitmexFundingMsg, ts_init: UnixNanos) -> FundingRateUpdate {
1103    let instrument_id = InstrumentId::from(format!("{}.BITMEX", msg.symbol));
1104    let interval_hours = msg.funding_interval.hour();
1105    let interval_minutes = msg.funding_interval.minute();
1106    let interval = Some((interval_hours * 60 + interval_minutes) as u16);
1107    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "");
1108
1109    FundingRateUpdate::new(
1110        instrument_id,
1111        msg.funding_rate,
1112        interval,
1113        None, // Next funding time not provided in this message
1114        ts_event,
1115        ts_init,
1116    )
1117}
1118
1119/// Parse a BitMEX wallet message into an AccountState.
1120///
1121/// BitMEX uses XBT (satoshis) as the base unit for Bitcoin.
1122/// 1 XBT = 0.00000001 BTC (1 satoshi).
1123///
1124/// # Panics
1125///
1126/// Panics if the balance calculation is invalid (total != locked + free).
1127#[must_use]
1128pub fn parse_wallet_msg(msg: &BitmexWalletMsg, ts_init: UnixNanos) -> AccountState {
1129    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
1130
1131    // Map BitMEX currency to standard currency code
1132    let currency_str = map_bitmex_currency(msg.currency.as_str());
1133    let currency = get_currency(&currency_str);
1134
1135    // Wallet messages do not expose locked margin; treat the full balance as free
1136    // and let the centralized helper enforce `total == locked + free` at currency precision.
1137    let divisor = bitmex_currency_divisor(msg.currency.as_str());
1138    let amount_dec = Decimal::from(msg.amount.unwrap_or(0)) / divisor;
1139
1140    let balance = AccountBalance::from_total_and_locked(amount_dec, Decimal::ZERO, currency)
1141        .expect("Balance calculation should be valid");
1142
1143    AccountState::new(
1144        account_id,
1145        AccountType::Margin,
1146        vec![balance],
1147        vec![], // margins will be added separately
1148        true,   // is_reported
1149        UUID4::new(),
1150        ts_init,
1151        ts_init,
1152        None,
1153    )
1154}
1155
1156/// Parse a BitMEX margin message into an account-wide [`MarginBalance`].
1157#[must_use]
1158pub fn parse_margin_msg(msg: &BitmexMarginMsg) -> MarginBalance {
1159    let currency_str = map_bitmex_currency(msg.currency.as_str());
1160    let currency = get_currency(&currency_str);
1161
1162    let divisor = bitmex_currency_divisor(msg.currency.as_str());
1163    let initial_dec = Decimal::from(msg.init_margin.unwrap_or(0).max(0)) / divisor;
1164    let maintenance_dec = Decimal::from(msg.maint_margin.unwrap_or(0).max(0)) / divisor;
1165
1166    MarginBalance::new(
1167        Money::from_decimal(initial_dec, currency).unwrap_or_else(|_| Money::zero(currency)),
1168        Money::from_decimal(maintenance_dec, currency).unwrap_or_else(|_| Money::zero(currency)),
1169        None,
1170    )
1171}
1172
1173/// Parses a BitMEX margin message into an [`AccountState`] with balances and margins.
1174#[must_use]
1175pub fn parse_margin_account_state(msg: &BitmexMarginMsg, ts_init: UnixNanos) -> AccountState {
1176    let account_id = AccountId::new(format!("BITMEX-{}", msg.account));
1177    let balance = parse_account_balance(msg);
1178
1179    let margin = parse_margin_msg(msg);
1180
1181    let margins = if !margin.initial.is_zero() || !margin.maintenance.is_zero() {
1182        vec![margin]
1183    } else {
1184        vec![]
1185    };
1186
1187    let ts_event = parse_optional_datetime_to_unix_nanos(&Some(msg.timestamp), "margin.timestamp");
1188
1189    AccountState::new(
1190        account_id,
1191        AccountType::Margin,
1192        vec![balance],
1193        margins,
1194        true,
1195        UUID4::new(),
1196        ts_event,
1197        ts_init,
1198        None,
1199    )
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use chrono::{DateTime, Utc};
1205    use nautilus_model::{
1206        enums::{AggressorSide, BookAction, LiquiditySide, PositionSide},
1207        identifiers::Symbol,
1208        instruments::crypto_perpetual::CryptoPerpetual,
1209    };
1210    use rstest::rstest;
1211    use ustr::Ustr;
1212
1213    use super::*;
1214    use crate::common::{
1215        enums::{BitmexExecType, BitmexOrderStatus},
1216        testing::load_test_json,
1217    };
1218
1219    // Helper function to create a test perpetual instrument for tests
1220    fn create_test_perpetual_instrument_with_precisions(
1221        price_precision: u8,
1222        size_precision: u8,
1223    ) -> InstrumentAny {
1224        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
1225            InstrumentId::from("XBTUSD.BITMEX"),
1226            Symbol::new("XBTUSD"),
1227            Currency::BTC(),
1228            Currency::USD(),
1229            Currency::BTC(),
1230            true, // is_inverse
1231            price_precision,
1232            size_precision,
1233            Price::new(0.5, price_precision),
1234            Quantity::new(1.0, size_precision),
1235            None, // multiplier
1236            None, // lot_size
1237            None, // max_quantity
1238            None, // min_quantity
1239            None, // max_notional
1240            None, // min_notional
1241            None, // max_price
1242            None, // min_price
1243            None, // margin_init
1244            None, // margin_maint
1245            None, // maker_fee
1246            None, // taker_fee
1247            None, // info
1248            UnixNanos::default(),
1249            UnixNanos::default(),
1250        ))
1251    }
1252
1253    fn create_test_perpetual_instrument() -> InstrumentAny {
1254        create_test_perpetual_instrument_with_precisions(1, 0)
1255    }
1256
1257    #[rstest]
1258    fn test_orderbook_l2_message() {
1259        let json_data = load_test_json("ws_orderbook_l2.json");
1260
1261        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1262        let msg: BitmexOrderBookMsg = serde_json::from_str(&json_data).unwrap();
1263
1264        // Test Insert action
1265        let instrument = create_test_perpetual_instrument();
1266
1267        // Test Insert action (no snapshot flag)
1268        let delta = parse_book_msg(
1269            &msg,
1270            &BitmexAction::Insert,
1271            &instrument,
1272            instrument.id(),
1273            instrument.price_precision(),
1274            UnixNanos::from(3),
1275        );
1276        assert_eq!(delta.instrument_id, instrument_id);
1277        assert_eq!(delta.order.price, Price::from("98459.9"));
1278        assert_eq!(delta.order.size, Quantity::from(33000));
1279        assert_eq!(delta.order.side, OrderSide::Sell);
1280        assert_eq!(delta.order.order_id, 62400580205);
1281        assert_eq!(delta.action, BookAction::Add);
1282        assert_eq!(delta.flags, 0);
1283        assert_eq!(delta.sequence, 0);
1284        assert_eq!(delta.ts_event, 1732436782356000000); // 2024-11-24T08:26:22.356Z in nanos
1285        assert_eq!(delta.ts_init, 3);
1286
1287        // Test Partial action (should have F_SNAPSHOT flag)
1288        let delta = parse_book_msg(
1289            &msg,
1290            &BitmexAction::Partial,
1291            &instrument,
1292            instrument.id(),
1293            instrument.price_precision(),
1294            UnixNanos::from(3),
1295        );
1296        assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT as u8);
1297        assert_eq!(delta.action, BookAction::Add);
1298
1299        // Test Update action (no flags)
1300        let delta = parse_book_msg(
1301            &msg,
1302            &BitmexAction::Update,
1303            &instrument,
1304            instrument.id(),
1305            instrument.price_precision(),
1306            UnixNanos::from(3),
1307        );
1308        assert_eq!(delta.flags, 0);
1309        assert_eq!(delta.action, BookAction::Update);
1310    }
1311
1312    #[rstest]
1313    fn test_orderbook10_message() {
1314        let json_data = load_test_json("ws_orderbook_10.json");
1315        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1316        let msg: BitmexOrderBook10Msg = serde_json::from_str(&json_data).unwrap();
1317        let instrument = create_test_perpetual_instrument();
1318        let depth10 = parse_book10_msg(
1319            &msg,
1320            &instrument,
1321            instrument.id(),
1322            instrument.price_precision(),
1323            UnixNanos::from(3),
1324        )
1325        .unwrap();
1326
1327        assert_eq!(depth10.instrument_id, instrument_id);
1328
1329        // Check first bid level
1330        assert_eq!(depth10.bids[0].price, Price::from("98490.3"));
1331        assert_eq!(depth10.bids[0].size, Quantity::from(22400));
1332        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
1333
1334        // Check first ask level
1335        assert_eq!(depth10.asks[0].price, Price::from("98490.4"));
1336        assert_eq!(depth10.asks[0].size, Quantity::from(17600));
1337        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
1338
1339        // Check counts (should be 1 for each populated level)
1340        assert_eq!(depth10.bid_counts, [1; DEPTH10_LEN]);
1341        assert_eq!(depth10.ask_counts, [1; DEPTH10_LEN]);
1342
1343        // Check flags and timestamps
1344        assert_eq!(depth10.sequence, 0);
1345        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT as u8);
1346        assert_eq!(depth10.ts_event, 1732436353513000000); // 2024-11-24T08:19:13.513Z in nanos
1347        assert_eq!(depth10.ts_init, 3);
1348    }
1349
1350    #[rstest]
1351    fn test_quote_message() {
1352        let json_data = load_test_json("ws_quote.json");
1353
1354        let instrument_id = InstrumentId::from("BCHUSDT.BITMEX");
1355        let last_quote = QuoteTick::new(
1356            instrument_id,
1357            Price::new(487.50, 2),
1358            Price::new(488.20, 2),
1359            Quantity::from(100_000),
1360            Quantity::from(100_000),
1361            UnixNanos::from(1),
1362            UnixNanos::from(2),
1363        );
1364        let msg: BitmexQuoteMsg = serde_json::from_str(&json_data).unwrap();
1365        let instrument = create_test_perpetual_instrument_with_precisions(2, 0);
1366        let quote = parse_quote_msg(
1367            &msg,
1368            &last_quote,
1369            &instrument,
1370            instrument_id,
1371            instrument.price_precision(),
1372            UnixNanos::from(3),
1373        );
1374
1375        assert_eq!(quote.instrument_id, instrument_id);
1376        assert_eq!(quote.bid_price, Price::from("487.55"));
1377        assert_eq!(quote.ask_price, Price::from("488.25"));
1378        assert_eq!(quote.bid_size, Quantity::from(103_000));
1379        assert_eq!(quote.ask_size, Quantity::from(50_000));
1380        assert_eq!(quote.ts_event, 1732315465085000000);
1381        assert_eq!(quote.ts_init, 3);
1382    }
1383
1384    #[rstest]
1385    fn test_trade_message() {
1386        let json_data = load_test_json("ws_trade.json");
1387
1388        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1389        let msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1390        let instrument = create_test_perpetual_instrument();
1391        let trade = parse_trade_msg(
1392            &msg,
1393            &instrument,
1394            instrument.id(),
1395            instrument.price_precision(),
1396            UnixNanos::from(3),
1397        );
1398
1399        assert_eq!(trade.instrument_id, instrument_id);
1400        assert_eq!(trade.price, Price::from("98570.9"));
1401        assert_eq!(trade.size, Quantity::from(100));
1402        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
1403        assert_eq!(
1404            trade.trade_id.to_string(),
1405            "00000000-006d-1000-0000-000e8737d536"
1406        );
1407        assert_eq!(trade.ts_event, 1732436138704000000); // 2024-11-24T08:15:38.704Z in nanos
1408        assert_eq!(trade.ts_init, 3);
1409    }
1410
1411    #[rstest]
1412    fn test_trade_message_derives_trade_id_when_trd_match_id_missing() {
1413        let json_data = load_test_json("ws_trade.json");
1414        let mut msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1415        msg.trd_match_id = None;
1416        let instrument = create_test_perpetual_instrument();
1417
1418        let trade = parse_trade_msg(
1419            &msg,
1420            &instrument,
1421            instrument.id(),
1422            instrument.price_precision(),
1423            UnixNanos::from(3),
1424        );
1425
1426        let mut again_msg: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1427        again_msg.trd_match_id = None;
1428        let again = parse_trade_msg(
1429            &again_msg,
1430            &instrument,
1431            instrument.id(),
1432            instrument.price_precision(),
1433            UnixNanos::from(3),
1434        );
1435
1436        assert_eq!(trade.trade_id, again.trade_id, "derivation must be stable");
1437        assert_eq!(trade.trade_id.as_str().len(), 16);
1438
1439        let mut altered: BitmexTradeMsg = serde_json::from_str(&json_data).unwrap();
1440        altered.trd_match_id = None;
1441        altered.price += 1.0;
1442        let altered_trade = parse_trade_msg(
1443            &altered,
1444            &instrument,
1445            instrument.id(),
1446            instrument.price_precision(),
1447            UnixNanos::from(3),
1448        );
1449        assert_ne!(trade.trade_id, altered_trade.trade_id);
1450    }
1451
1452    #[rstest]
1453    fn test_trade_bin_message() {
1454        let json_data = load_test_json("ws_trade_bin_1m.json");
1455
1456        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
1457        let topic = BitmexWsTopic::TradeBin1m;
1458
1459        let msg: BitmexTradeBinMsg = serde_json::from_str(&json_data).unwrap();
1460        let instrument = create_test_perpetual_instrument();
1461        let bar = parse_trade_bin_msg(
1462            &msg,
1463            &topic,
1464            &instrument,
1465            instrument.id(),
1466            instrument.price_precision(),
1467            UnixNanos::from(3),
1468        );
1469
1470        assert_eq!(bar.instrument_id(), instrument_id);
1471        assert_eq!(
1472            bar.bar_type.spec(),
1473            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
1474        );
1475        assert_eq!(bar.open, Price::from("97550.0"));
1476        assert_eq!(bar.high, Price::from("97584.4"));
1477        assert_eq!(bar.low, Price::from("97550.0"));
1478        assert_eq!(bar.close, Price::from("97570.1"));
1479        assert_eq!(bar.volume, Quantity::from(84_000));
1480        assert_eq!(bar.ts_event, 1732392420000000000); // 2024-11-23T20:07:00.000Z in nanos
1481        assert_eq!(bar.ts_init, 3);
1482    }
1483
1484    #[rstest]
1485    fn test_trade_bin_message_extreme_adjustment() {
1486        let topic = BitmexWsTopic::TradeBin1m;
1487        let instrument = create_test_perpetual_instrument();
1488
1489        let msg = BitmexTradeBinMsg {
1490            timestamp: DateTime::parse_from_rfc3339("2024-01-01T00:00:00Z")
1491                .unwrap()
1492                .with_timezone(&Utc),
1493            symbol: Ustr::from("XBTUSD"),
1494            open: 50_000.0,
1495            high: 49_990.0,
1496            low: 50_010.0,
1497            close: 50_005.0,
1498            trades: 10,
1499            volume: 1_000,
1500            vwap: Some(0.0),
1501            last_size: Some(0),
1502            turnover: 0,
1503            home_notional: 0.0,
1504            foreign_notional: 0.0,
1505            pool: None,
1506        };
1507
1508        let bar = parse_trade_bin_msg(
1509            &msg,
1510            &topic,
1511            &instrument,
1512            instrument.id(),
1513            instrument.price_precision(),
1514            UnixNanos::from(3),
1515        );
1516
1517        assert_eq!(bar.high, Price::from("50010.0"));
1518        assert_eq!(bar.low, Price::from("49990.0"));
1519        assert_eq!(bar.open, Price::from("50000.0"));
1520        assert_eq!(bar.close, Price::from("50005.0"));
1521        assert_eq!(bar.volume, Quantity::from(1_000));
1522    }
1523
1524    #[rstest]
1525    fn test_parse_order_msg() {
1526        let json_data = load_test_json("ws_order.json");
1527        let msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1528        let mut cache = AHashMap::new();
1529        let instrument = create_test_perpetual_instrument();
1530        let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1531
1532        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1533        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1534        assert_eq!(
1535            report.venue_order_id.to_string(),
1536            "550e8400-e29b-41d4-a716-446655440001"
1537        );
1538        assert_eq!(
1539            report.client_order_id.unwrap().to_string(),
1540            "mm_bitmex_1a/oemUeQ4CAJZgP3fjHsA"
1541        );
1542        assert_eq!(report.order_side, OrderSide::Buy);
1543        assert_eq!(report.order_type, OrderType::Limit);
1544        assert_eq!(report.time_in_force, TimeInForce::Gtc);
1545        assert_eq!(report.order_status, OrderStatus::Accepted);
1546        assert_eq!(report.quantity, Quantity::from(100));
1547        assert_eq!(report.filled_qty, Quantity::from(0));
1548        assert_eq!(report.price.unwrap(), Price::from("98000.0"));
1549        assert_eq!(report.ts_accepted, 1732530600000000000); // 2024-11-25T10:30:00.000Z
1550    }
1551
1552    #[rstest]
1553    fn test_parse_order_msg_infers_type_when_missing() {
1554        let json_data = load_test_json("ws_order.json");
1555        let mut msg: BitmexOrderMsg = serde_json::from_str(&json_data).unwrap();
1556        msg.ord_type = None;
1557        msg.cl_ord_id = None;
1558        msg.price = Some(98_000.0);
1559        msg.stop_px = None;
1560
1561        let mut cache = AHashMap::new();
1562        let instrument = create_test_perpetual_instrument();
1563
1564        let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1565
1566        assert_eq!(report.order_type, OrderType::Limit);
1567    }
1568
1569    #[rstest]
1570    fn test_parse_order_msg_rejected_with_reason() {
1571        let mut msg: BitmexOrderMsg =
1572            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1573        msg.ord_status = BitmexOrderStatus::Rejected;
1574        msg.ord_rej_reason = Some(Ustr::from("Insufficient available balance"));
1575        msg.text = None;
1576        msg.cum_qty = 0;
1577
1578        let mut cache = AHashMap::new();
1579        let instrument = create_test_perpetual_instrument();
1580        let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1581
1582        assert_eq!(report.order_status, OrderStatus::Rejected);
1583        assert_eq!(
1584            report.cancel_reason,
1585            Some("Insufficient available balance".to_string())
1586        );
1587    }
1588
1589    #[rstest]
1590    fn test_parse_order_msg_rejected_with_text_fallback() {
1591        let mut msg: BitmexOrderMsg =
1592            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1593        msg.ord_status = BitmexOrderStatus::Rejected;
1594        msg.ord_rej_reason = None;
1595        msg.text = Some(Ustr::from("Order would execute immediately"));
1596        msg.cum_qty = 0;
1597
1598        let mut cache = AHashMap::new();
1599        let instrument = create_test_perpetual_instrument();
1600        let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1601
1602        assert_eq!(report.order_status, OrderStatus::Rejected);
1603        assert_eq!(
1604            report.cancel_reason,
1605            Some("Order would execute immediately".to_string())
1606        );
1607    }
1608
1609    #[rstest]
1610    fn test_parse_order_msg_rejected_without_reason() {
1611        let mut msg: BitmexOrderMsg =
1612            serde_json::from_str(&load_test_json("ws_order.json")).unwrap();
1613        msg.ord_status = BitmexOrderStatus::Rejected;
1614        msg.ord_rej_reason = None;
1615        msg.text = None;
1616        msg.cum_qty = 0;
1617
1618        let mut cache = AHashMap::new();
1619        let instrument = create_test_perpetual_instrument();
1620        let report = parse_order_msg(&msg, &instrument, &mut cache, UnixNanos::default()).unwrap();
1621
1622        assert_eq!(report.order_status, OrderStatus::Rejected);
1623        assert_eq!(report.cancel_reason, None);
1624    }
1625
1626    #[rstest]
1627    fn test_parse_execution_msg() {
1628        let json_data = load_test_json("ws_execution.json");
1629        let msg: BitmexExecutionMsg = serde_json::from_str(&json_data).unwrap();
1630        let instrument = create_test_perpetual_instrument();
1631        let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
1632
1633        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1634        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1635        assert_eq!(
1636            fill.venue_order_id.to_string(),
1637            "550e8400-e29b-41d4-a716-446655440002"
1638        );
1639        assert_eq!(
1640            fill.trade_id.to_string(),
1641            "00000000-006d-1000-0000-000e8737d540"
1642        );
1643        assert_eq!(
1644            fill.client_order_id.unwrap().to_string(),
1645            "mm_bitmex_2b/oemUeQ4CAJZgP3fjHsB"
1646        );
1647        assert_eq!(fill.order_side, OrderSide::Sell);
1648        assert_eq!(fill.last_qty, Quantity::from(100));
1649        assert_eq!(fill.last_px, Price::from("98950.0"));
1650        assert_eq!(fill.liquidity_side, LiquiditySide::Maker);
1651        assert_eq!(fill.commission, Money::new(0.00075, Currency::from("XBT")));
1652        assert_eq!(fill.commission.currency.code.to_string(), "XBT");
1653        assert_eq!(fill.ts_event, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1654    }
1655
1656    #[rstest]
1657    fn test_parse_execution_msg_non_trade() {
1658        // Test that non-trade executions return None
1659        let mut msg: BitmexExecutionMsg =
1660            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1661        msg.exec_type = Some(BitmexExecType::Settlement);
1662
1663        let instrument = create_test_perpetual_instrument();
1664        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1665        assert!(result.is_none());
1666    }
1667
1668    #[rstest]
1669    fn test_parse_cancel_reject_execution() {
1670        // Test that CancelReject messages can be parsed (even without symbol)
1671        let json = load_test_json("ws_execution_cancel_reject.json");
1672
1673        let msg: BitmexExecutionMsg = serde_json::from_str(&json).unwrap();
1674        assert_eq!(msg.exec_type, Some(BitmexExecType::CancelReject));
1675        assert_eq!(msg.ord_status, Some(BitmexOrderStatus::Rejected));
1676        assert_eq!(msg.symbol, None);
1677
1678        // Should return None since it's not a Trade
1679        let instrument = create_test_perpetual_instrument();
1680        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1681        assert!(result.is_none());
1682    }
1683
1684    #[rstest]
1685    fn test_parse_execution_msg_liquidation() {
1686        // Critical for ADL/hedge tracking
1687        let mut msg: BitmexExecutionMsg =
1688            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1689        msg.exec_type = Some(BitmexExecType::Liquidation);
1690
1691        let instrument = create_test_perpetual_instrument();
1692        let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
1693
1694        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1695        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1696        assert_eq!(fill.order_side, OrderSide::Sell);
1697        assert_eq!(fill.last_qty, Quantity::from(100));
1698        assert_eq!(fill.last_px, Price::from("98950.0"));
1699    }
1700
1701    #[rstest]
1702    fn test_parse_execution_msg_bankruptcy() {
1703        let mut msg: BitmexExecutionMsg =
1704            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1705        msg.exec_type = Some(BitmexExecType::Bankruptcy);
1706
1707        let instrument = create_test_perpetual_instrument();
1708        let fill = parse_execution_msg(msg, &instrument, UnixNanos::default()).unwrap();
1709
1710        assert_eq!(fill.account_id.to_string(), "BITMEX-1234567");
1711        assert_eq!(fill.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1712        assert_eq!(fill.order_side, OrderSide::Sell);
1713        assert_eq!(fill.last_qty, Quantity::from(100));
1714    }
1715
1716    #[rstest]
1717    fn test_parse_execution_msg_settlement() {
1718        let mut msg: BitmexExecutionMsg =
1719            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1720        msg.exec_type = Some(BitmexExecType::Settlement);
1721
1722        let instrument = create_test_perpetual_instrument();
1723        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1724        assert!(result.is_none());
1725    }
1726
1727    #[rstest]
1728    fn test_parse_execution_msg_trial_fill() {
1729        let mut msg: BitmexExecutionMsg =
1730            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1731        msg.exec_type = Some(BitmexExecType::TrialFill);
1732
1733        let instrument = create_test_perpetual_instrument();
1734        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1735        assert!(result.is_none());
1736    }
1737
1738    #[rstest]
1739    fn test_parse_execution_msg_funding() {
1740        let mut msg: BitmexExecutionMsg =
1741            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1742        msg.exec_type = Some(BitmexExecType::Funding);
1743
1744        let instrument = create_test_perpetual_instrument();
1745        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1746        assert!(result.is_none());
1747    }
1748
1749    #[rstest]
1750    fn test_parse_execution_msg_insurance() {
1751        let mut msg: BitmexExecutionMsg =
1752            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1753        msg.exec_type = Some(BitmexExecType::Insurance);
1754
1755        let instrument = create_test_perpetual_instrument();
1756        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1757        assert!(result.is_none());
1758    }
1759
1760    #[rstest]
1761    fn test_parse_execution_msg_rebalance() {
1762        let mut msg: BitmexExecutionMsg =
1763            serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1764        msg.exec_type = Some(BitmexExecType::Rebalance);
1765
1766        let instrument = create_test_perpetual_instrument();
1767        let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1768        assert!(result.is_none());
1769    }
1770
1771    #[rstest]
1772    fn test_parse_execution_msg_order_state_changes() {
1773        let instrument = create_test_perpetual_instrument();
1774
1775        let order_state_types = vec![
1776            BitmexExecType::New,
1777            BitmexExecType::Canceled,
1778            BitmexExecType::CancelReject,
1779            BitmexExecType::Replaced,
1780            BitmexExecType::Rejected,
1781            BitmexExecType::AmendReject,
1782            BitmexExecType::Suspended,
1783            BitmexExecType::Released,
1784            BitmexExecType::TriggeredOrActivatedBySystem,
1785        ];
1786
1787        for exec_type in order_state_types {
1788            let mut msg: BitmexExecutionMsg =
1789                serde_json::from_str(&load_test_json("ws_execution.json")).unwrap();
1790            msg.exec_type = Some(exec_type.clone());
1791
1792            let result = parse_execution_msg(msg, &instrument, UnixNanos::default());
1793            assert!(
1794                result.is_none(),
1795                "Expected None for exec_type {exec_type:?}"
1796            );
1797        }
1798    }
1799
1800    #[rstest]
1801    fn test_parse_position_msg() {
1802        let json_data = load_test_json("ws_position.json");
1803        let msg: BitmexPositionMsg = serde_json::from_str(&json_data).unwrap();
1804        let instrument = create_test_perpetual_instrument();
1805        let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
1806
1807        assert_eq!(report.account_id.to_string(), "BITMEX-1234567");
1808        assert_eq!(report.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
1809        assert_eq!(report.position_side.as_position_side(), PositionSide::Long);
1810        assert_eq!(report.quantity, Quantity::from(1000));
1811        assert!(report.venue_position_id.is_none());
1812        assert_eq!(report.ts_last, 1732530900789000000); // 2024-11-25T10:35:00.789Z
1813    }
1814
1815    #[rstest]
1816    fn test_parse_position_msg_short() {
1817        let mut msg: BitmexPositionMsg =
1818            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1819        msg.current_qty = Some(-500);
1820
1821        let instrument = create_test_perpetual_instrument();
1822        let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
1823        assert_eq!(report.position_side.as_position_side(), PositionSide::Short);
1824        assert_eq!(report.quantity, Quantity::from(500));
1825    }
1826
1827    #[rstest]
1828    fn test_parse_position_msg_flat() {
1829        let mut msg: BitmexPositionMsg =
1830            serde_json::from_str(&load_test_json("ws_position.json")).unwrap();
1831        msg.current_qty = Some(0);
1832
1833        let instrument = create_test_perpetual_instrument();
1834        let report = parse_position_msg(&msg, &instrument, UnixNanos::default());
1835        assert_eq!(report.position_side.as_position_side(), PositionSide::Flat);
1836        assert_eq!(report.quantity, Quantity::from(0));
1837    }
1838
1839    #[rstest]
1840    fn test_parse_wallet_msg() {
1841        let json_data = load_test_json("ws_wallet.json");
1842        let msg: BitmexWalletMsg = serde_json::from_str(&json_data).unwrap();
1843        let ts_init = UnixNanos::from(1);
1844        let account_state = parse_wallet_msg(&msg, ts_init);
1845
1846        assert_eq!(account_state.account_id.to_string(), "BITMEX-1234567");
1847        assert!(!account_state.balances.is_empty());
1848        let balance = &account_state.balances[0];
1849        assert_eq!(balance.currency.code.to_string(), "XBT");
1850        // Amount should be converted from satoshis (100005180 / 100_000_000.0 = 1.0000518)
1851        assert!((balance.total.as_f64() - 1.0000518).abs() < 1e-7);
1852        // Wallet messages do not carry locked margin; full amount is free.
1853        assert_eq!(balance.locked.as_f64(), 0.0);
1854        assert_eq!(balance.free.as_decimal(), balance.total.as_decimal());
1855    }
1856
1857    #[rstest]
1858    fn test_parse_wallet_msg_no_amount() {
1859        let mut msg: BitmexWalletMsg =
1860            serde_json::from_str(&load_test_json("ws_wallet.json")).unwrap();
1861        msg.amount = None;
1862
1863        let ts_init = UnixNanos::from(1);
1864        let account_state = parse_wallet_msg(&msg, ts_init);
1865        let balance = &account_state.balances[0];
1866        assert_eq!(balance.total.as_f64(), 0.0);
1867    }
1868
1869    #[rstest]
1870    fn test_parse_margin_msg() {
1871        let json_data = load_test_json("ws_margin.json");
1872        let msg: BitmexMarginMsg = serde_json::from_str(&json_data).unwrap();
1873        let margin_balance = parse_margin_msg(&msg);
1874
1875        assert_eq!(margin_balance.currency.code.to_string(), "XBT");
1876        assert!(margin_balance.instrument_id.is_none());
1877        // Values should be converted from satoshis to BTC
1878        // initMargin is 0 in test data, so should be 0.0
1879        assert_eq!(margin_balance.initial.as_f64(), 0.0);
1880        // maintMargin is 15949 satoshis = 0.00015949 BTC
1881        assert!((margin_balance.maintenance.as_f64() - 0.00015949).abs() < 1e-8);
1882    }
1883
1884    #[rstest]
1885    fn test_parse_margin_msg_no_available() {
1886        let mut msg: BitmexMarginMsg =
1887            serde_json::from_str(&load_test_json("ws_margin.json")).unwrap();
1888        msg.available_margin = None;
1889
1890        let margin_balance = parse_margin_msg(&msg);
1891        // Should still have valid margin values even if available_margin is None
1892        assert!(margin_balance.initial.as_f64() >= 0.0);
1893        assert!(margin_balance.maintenance.as_f64() >= 0.0);
1894    }
1895
1896    #[rstest]
1897    fn test_parse_margin_account_state_includes_margins() {
1898        let msg = BitmexMarginMsg {
1899            account: 123456,
1900            currency: Ustr::from("USDt"),
1901            risk_limit: None,
1902            amount: Some(5_000_000_000),
1903            prev_realised_pnl: None,
1904            gross_comm: None,
1905            gross_open_cost: None,
1906            gross_open_premium: None,
1907            gross_exec_cost: None,
1908            gross_mark_value: None,
1909            risk_value: None,
1910            init_margin: Some(200_000_000),  // 200 USDT
1911            maint_margin: Some(100_000_000), // 100 USDT
1912            target_excess_margin: None,
1913            realised_pnl: None,
1914            unrealised_pnl: None,
1915            wallet_balance: Some(5_000_000_000), // 5000 USDT
1916            margin_balance: None,
1917            margin_leverage: None,
1918            margin_used_pcnt: None,
1919            excess_margin: None,
1920            available_margin: Some(4_800_000_000), // 4800 USDT
1921            withdrawable_margin: None,
1922            maker_fee_discount: None,
1923            taker_fee_discount: None,
1924            timestamp: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
1925            foreign_margin_balance: None,
1926            foreign_requirement: None,
1927        };
1928
1929        let ts_init = UnixNanos::from(1_000_000_000u64);
1930        let state = parse_margin_account_state(&msg, ts_init);
1931
1932        assert_eq!(state.account_id.to_string(), "BITMEX-123456");
1933        assert_eq!(state.account_type, AccountType::Margin);
1934        assert_eq!(state.balances.len(), 1);
1935        assert_eq!(state.margins.len(), 1);
1936
1937        let balance = &state.balances[0];
1938        assert_eq!(balance.total.as_f64(), 5000.0);
1939
1940        let margin = &state.margins[0];
1941        assert!(margin.instrument_id.is_none());
1942        assert_eq!(margin.currency.code.as_str(), "USDT");
1943        assert_eq!(margin.initial.as_f64(), 200.0);
1944        assert_eq!(margin.maintenance.as_f64(), 100.0);
1945    }
1946
1947    #[rstest]
1948    fn test_parse_margin_account_state_zero_margins_excluded() {
1949        let msg = BitmexMarginMsg {
1950            account: 123456,
1951            currency: Ustr::from("XBt"),
1952            risk_limit: None,
1953            amount: Some(100_000_000),
1954            prev_realised_pnl: None,
1955            gross_comm: None,
1956            gross_open_cost: None,
1957            gross_open_premium: None,
1958            gross_exec_cost: None,
1959            gross_mark_value: None,
1960            risk_value: None,
1961            init_margin: Some(0),
1962            maint_margin: Some(0),
1963            target_excess_margin: None,
1964            realised_pnl: None,
1965            unrealised_pnl: None,
1966            wallet_balance: Some(100_000_000),
1967            margin_balance: None,
1968            margin_leverage: None,
1969            margin_used_pcnt: None,
1970            excess_margin: None,
1971            available_margin: Some(100_000_000),
1972            withdrawable_margin: None,
1973            maker_fee_discount: None,
1974            taker_fee_discount: None,
1975            timestamp: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
1976            foreign_margin_balance: None,
1977            foreign_requirement: None,
1978        };
1979
1980        let state = parse_margin_account_state(&msg, UnixNanos::from(1_000_000_000u64));
1981
1982        assert_eq!(state.balances.len(), 1);
1983        assert_eq!(state.margins.len(), 0);
1984    }
1985
1986    #[rstest]
1987    fn test_parse_instrument_msg_both_prices() {
1988        let json_data = load_test_json("ws_instrument.json");
1989        let msg: BitmexInstrumentMsg = serde_json::from_str(&json_data).unwrap();
1990
1991        // Create cache with test instrument
1992        let mut instruments_cache = AHashMap::new();
1993        let test_instrument = create_test_perpetual_instrument();
1994        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
1995
1996        let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
1997
1998        // XBTUSD is not an index symbol, so it should have both mark and index prices
1999        assert_eq!(updates.len(), 2);
2000
2001        // Check mark price update
2002        match &updates[0] {
2003            Data::MarkPriceUpdate(update) => {
2004                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2005                assert_eq!(update.value.as_f64(), 95125.7);
2006            }
2007            _ => panic!("Expected MarkPriceUpdate at index 0"),
2008        }
2009
2010        // Check index price update
2011        match &updates[1] {
2012            Data::IndexPriceUpdate(update) => {
2013                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2014                assert_eq!(update.value.as_f64(), 95124.3);
2015            }
2016            _ => panic!("Expected IndexPriceUpdate at index 1"),
2017        }
2018    }
2019
2020    #[rstest]
2021    fn test_parse_instrument_msg_mark_price_only() {
2022        let mut msg: BitmexInstrumentMsg =
2023            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2024        msg.index_price = None;
2025
2026        // Create cache with test instrument
2027        let mut instruments_cache = AHashMap::new();
2028        let test_instrument = create_test_perpetual_instrument();
2029        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
2030
2031        let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2032
2033        assert_eq!(updates.len(), 1);
2034        match &updates[0] {
2035            Data::MarkPriceUpdate(update) => {
2036                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2037                assert_eq!(update.value.as_f64(), 95125.7);
2038            }
2039            _ => panic!("Expected MarkPriceUpdate"),
2040        }
2041    }
2042
2043    #[rstest]
2044    fn test_parse_instrument_msg_index_price_only() {
2045        let mut msg: BitmexInstrumentMsg =
2046            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2047        msg.mark_price = None;
2048
2049        // Create cache with test instrument
2050        let mut instruments_cache = AHashMap::new();
2051        let test_instrument = create_test_perpetual_instrument();
2052        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
2053
2054        let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2055
2056        assert_eq!(updates.len(), 1);
2057        match &updates[0] {
2058            Data::IndexPriceUpdate(update) => {
2059                assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2060                assert_eq!(update.value.as_f64(), 95124.3);
2061            }
2062            _ => panic!("Expected IndexPriceUpdate"),
2063        }
2064    }
2065
2066    #[rstest]
2067    fn test_parse_instrument_msg_no_prices() {
2068        let mut msg: BitmexInstrumentMsg =
2069            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2070        msg.mark_price = None;
2071        msg.index_price = None;
2072        msg.last_price = None;
2073
2074        // Create cache with test instrument
2075        let mut instruments_cache = AHashMap::new();
2076        let test_instrument = create_test_perpetual_instrument();
2077        instruments_cache.insert(Ustr::from("XBTUSD"), test_instrument);
2078
2079        let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2080        assert_eq!(updates.len(), 0);
2081    }
2082
2083    #[rstest]
2084    fn test_parse_instrument_msg_index_symbol() {
2085        // Test for index symbols like .BXBT where lastPrice is the index price
2086        // and markPrice equals lastPrice
2087        let mut msg: BitmexInstrumentMsg =
2088            serde_json::from_str(&load_test_json("ws_instrument.json")).unwrap();
2089        msg.symbol = Ustr::from(".BXBT");
2090        msg.last_price = Some(119163.05);
2091        msg.mark_price = Some(119163.05); // Index symbols have mark price equal to last price
2092        msg.index_price = None;
2093
2094        // Create instruments cache with proper precision for .BXBT
2095        let instrument_id = InstrumentId::from(".BXBT.BITMEX");
2096        let instrument = CryptoPerpetual::new(
2097            instrument_id,
2098            Symbol::from(".BXBT"),
2099            Currency::BTC(),
2100            Currency::USD(),
2101            Currency::USD(),
2102            false, // is_inverse
2103            2,     // price_precision (for 119163.05)
2104            8,     // size_precision
2105            Price::from("0.01"),
2106            Quantity::from("0.00000001"),
2107            None,                 // multiplier
2108            None,                 // lot_size
2109            None,                 // max_quantity
2110            None,                 // min_quantity
2111            None,                 // max_notional
2112            None,                 // min_notional
2113            None,                 // max_price
2114            None,                 // min_price
2115            None,                 // margin_init
2116            None,                 // margin_maint
2117            None,                 // maker_fee
2118            None,                 // taker_fee
2119            None,                 // info
2120            UnixNanos::default(), // ts_event
2121            UnixNanos::default(), // ts_init
2122        );
2123        let mut instruments_cache = AHashMap::new();
2124        instruments_cache.insert(
2125            Ustr::from(".BXBT"),
2126            InstrumentAny::CryptoPerpetual(instrument),
2127        );
2128
2129        let updates = parse_instrument_msg(&msg, &instruments_cache, UnixNanos::from(1));
2130
2131        assert_eq!(updates.len(), 2);
2132
2133        // Check mark price update
2134        match &updates[0] {
2135            Data::MarkPriceUpdate(update) => {
2136                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
2137                assert_eq!(update.value, Price::from("119163.05"));
2138            }
2139            _ => panic!("Expected MarkPriceUpdate for index symbol"),
2140        }
2141
2142        // Check index price update
2143        match &updates[1] {
2144            Data::IndexPriceUpdate(update) => {
2145                assert_eq!(update.instrument_id.to_string(), ".BXBT.BITMEX");
2146                assert_eq!(update.value, Price::from("119163.05"));
2147                assert_eq!(update.ts_init, UnixNanos::from(1));
2148            }
2149            _ => panic!("Expected IndexPriceUpdate for index symbol"),
2150        }
2151    }
2152
2153    #[rstest]
2154    fn test_parse_funding_msg() {
2155        let json_data = load_test_json("ws_funding_rate.json");
2156        let msg: BitmexFundingMsg = serde_json::from_str(&json_data).unwrap();
2157        let update = parse_funding_msg(&msg, UnixNanos::from(1));
2158
2159        assert_eq!(update.instrument_id.to_string(), "XBTUSD.BITMEX");
2160        assert_eq!(update.rate.to_string(), "0.0001");
2161        assert_eq!(update.interval, Some(60 * 8));
2162        assert!(update.next_funding_ns.is_none());
2163        assert_eq!(update.ts_event, UnixNanos::from(1732507200000000000));
2164        assert_eq!(update.ts_init, UnixNanos::from(1));
2165    }
2166}