Skip to main content

nautilus_tardis/machine/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::{
23        Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, IndexPriceUpdate,
24        MarkPriceUpdate, NULL_ORDER, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
25        OrderBookDepth10, QuoteTick, TradeTick,
26    },
27    enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
28    identifiers::{InstrumentId, TradeId},
29    types::{Price, Quantity},
30};
31
32use super::{
33    message::{
34        BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35    },
36    types::TardisInstrumentMiniInfo,
37};
38use crate::{
39    common::parse::{
40        derive_trade_id, normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action,
41    },
42    config::BookSnapshotOutput,
43};
44
45#[must_use]
46pub fn parse_tardis_ws_message(
47    msg: WsMessage,
48    info: &Arc<TardisInstrumentMiniInfo>,
49    book_snapshot_output: &BookSnapshotOutput,
50) -> Option<Data> {
51    match msg {
52        WsMessage::BookChange(msg) => {
53            if msg.bids.is_empty() && msg.asks.is_empty() {
54                let exchange = msg.exchange;
55                let symbol = &msg.symbol;
56                log::error!("Invalid book change for {exchange} {symbol} (empty bids and asks)");
57                return None;
58            }
59
60            match parse_book_change_msg_as_deltas(
61                &msg,
62                info.price_precision,
63                info.size_precision,
64                info.instrument_id,
65            ) {
66                Ok(deltas) => Some(Data::Deltas(deltas)),
67                Err(e) => {
68                    log::error!("Failed to parse book change message: {e}");
69                    None
70                }
71            }
72        }
73        WsMessage::BookSnapshot(msg) => match msg.depth {
74            1 => {
75                match parse_book_snapshot_msg_as_quote(
76                    &msg,
77                    info.price_precision,
78                    info.size_precision,
79                    info.instrument_id,
80                ) {
81                    Ok(quote) => Some(Data::Quote(quote)),
82                    Err(e) => {
83                        log::error!("Failed to parse book snapshot quote message: {e}");
84                        None
85                    }
86                }
87            }
88            _ => match book_snapshot_output {
89                BookSnapshotOutput::Depth10 => {
90                    match parse_book_snapshot_msg_as_depth10(
91                        &msg,
92                        info.price_precision,
93                        info.size_precision,
94                        info.instrument_id,
95                    ) {
96                        Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
97                        Err(e) => {
98                            log::error!("Failed to parse book snapshot as depth10: {e}");
99                            None
100                        }
101                    }
102                }
103                BookSnapshotOutput::Deltas => {
104                    match parse_book_snapshot_msg_as_deltas(
105                        &msg,
106                        info.price_precision,
107                        info.size_precision,
108                        info.instrument_id,
109                    ) {
110                        Ok(deltas) => Some(Data::Deltas(deltas)),
111                        Err(e) => {
112                            log::error!("Failed to parse book snapshot as deltas: {e}");
113                            None
114                        }
115                    }
116                }
117            },
118        },
119        WsMessage::Trade(msg) => {
120            match parse_trade_msg(
121                &msg,
122                info.price_precision,
123                info.size_precision,
124                info.instrument_id,
125            ) {
126                Ok(trade) => Some(Data::Trade(trade)),
127                Err(e) => {
128                    log::error!("Failed to parse trade message: {e}");
129                    None
130                }
131            }
132        }
133        WsMessage::TradeBar(msg) => {
134            match parse_bar_msg(
135                &msg,
136                info.price_precision,
137                info.size_precision,
138                info.instrument_id,
139            ) {
140                Ok(bar) => Some(Data::Bar(bar)),
141                Err(e) => {
142                    log::error!("Failed to parse bar message: {e}");
143                    None
144                }
145            }
146        }
147        // Derivative ticker messages are handled through a separate callback path
148        // for FundingRateUpdate since they're not part of the Data enum.
149        WsMessage::DerivativeTicker(_) => None,
150        WsMessage::Disconnect(_) => None,
151    }
152}
153
154/// Parse a Tardis WebSocket message specifically for funding rate updates.
155/// Returns `Some(FundingRateUpdate)` if the message contains funding rate data, `None` otherwise.
156#[must_use]
157pub fn parse_tardis_ws_message_funding_rate(
158    msg: WsMessage,
159    info: &Arc<TardisInstrumentMiniInfo>,
160) -> Option<FundingRateUpdate> {
161    match msg {
162        WsMessage::DerivativeTicker(msg) => {
163            match parse_derivative_ticker_msg(&msg, info.instrument_id) {
164                Ok(funding_rate) => funding_rate,
165                Err(e) => {
166                    log::error!("Failed to parse derivative ticker message for funding rate: {e}");
167                    None
168                }
169            }
170        }
171        _ => None, // Only derivative ticker messages can contain funding rates
172    }
173}
174
175/// Parse a book change message into order book deltas, returning an error if timestamps invalid.
176/// Parse a book change message into order book deltas.
177///
178/// # Errors
179///
180/// Returns an error if timestamp fields cannot be converted to nanoseconds.
181pub fn parse_book_change_msg_as_deltas(
182    msg: &BookChangeMsg,
183    price_precision: u8,
184    size_precision: u8,
185    instrument_id: InstrumentId,
186) -> anyhow::Result<OrderBookDeltas_API> {
187    parse_book_msg_as_deltas(
188        &msg.bids,
189        &msg.asks,
190        msg.is_snapshot,
191        price_precision,
192        size_precision,
193        instrument_id,
194        msg.timestamp,
195        msg.local_timestamp,
196    )
197}
198
199/// Parse a book snapshot message into order book deltas, returning an error if timestamps invalid.
200/// Parse a book snapshot message into order book deltas.
201///
202/// # Errors
203///
204/// Returns an error if timestamp fields cannot be converted to nanoseconds.
205pub fn parse_book_snapshot_msg_as_deltas(
206    msg: &BookSnapshotMsg,
207    price_precision: u8,
208    size_precision: u8,
209    instrument_id: InstrumentId,
210) -> anyhow::Result<OrderBookDeltas_API> {
211    parse_book_msg_as_deltas(
212        &msg.bids,
213        &msg.asks,
214        true,
215        price_precision,
216        size_precision,
217        instrument_id,
218        msg.timestamp,
219        msg.local_timestamp,
220    )
221}
222
223/// Parse a book snapshot message into an [`OrderBookDepth10`].
224///
225/// # Errors
226///
227/// Returns an error if timestamp fields cannot be converted to nanoseconds.
228pub fn parse_book_snapshot_msg_as_depth10(
229    msg: &BookSnapshotMsg,
230    price_precision: u8,
231    size_precision: u8,
232    instrument_id: InstrumentId,
233) -> anyhow::Result<OrderBookDepth10> {
234    let ts_event_nanos = msg
235        .timestamp
236        .timestamp_nanos_opt()
237        .context("invalid timestamp: cannot extract event nanoseconds")?;
238    anyhow::ensure!(
239        ts_event_nanos >= 0,
240        "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
241    );
242    let ts_event = UnixNanos::from(ts_event_nanos as u64);
243
244    let ts_init_nanos = msg
245        .local_timestamp
246        .timestamp_nanos_opt()
247        .context("invalid timestamp: cannot extract init nanoseconds")?;
248    anyhow::ensure!(
249        ts_init_nanos >= 0,
250        "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
251    );
252    let ts_init = UnixNanos::from(ts_init_nanos as u64);
253
254    let mut bids = [NULL_ORDER; DEPTH10_LEN];
255    let mut asks = [NULL_ORDER; DEPTH10_LEN];
256    let mut bid_counts = [0u32; DEPTH10_LEN];
257    let mut ask_counts = [0u32; DEPTH10_LEN];
258
259    for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
260        bids[i] = BookOrder::new(
261            OrderSide::Buy,
262            Price::new(level.price, price_precision),
263            Quantity::new(level.amount, size_precision),
264            0,
265        );
266        bid_counts[i] = 1;
267    }
268
269    for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
270        asks[i] = BookOrder::new(
271            OrderSide::Sell,
272            Price::new(level.price, price_precision),
273            Quantity::new(level.amount, size_precision),
274            0,
275        );
276        ask_counts[i] = 1;
277    }
278
279    Ok(OrderBookDepth10::new(
280        instrument_id,
281        bids,
282        asks,
283        bid_counts,
284        ask_counts,
285        RecordFlag::F_SNAPSHOT.value(),
286        0, // Sequence not available from Tardis
287        ts_event,
288        ts_init,
289    ))
290}
291
292/// Parse raw book levels into order book deltas, returning error for invalid timestamps.
293#[expect(clippy::too_many_arguments)]
294/// Parse raw book levels into order book deltas.
295///
296/// # Errors
297///
298/// Returns an error if timestamp fields cannot be converted to nanoseconds.
299pub fn parse_book_msg_as_deltas(
300    bids: &[BookLevel],
301    asks: &[BookLevel],
302    is_snapshot: bool,
303    price_precision: u8,
304    size_precision: u8,
305    instrument_id: InstrumentId,
306    timestamp: DateTime<Utc>,
307    local_timestamp: DateTime<Utc>,
308) -> anyhow::Result<OrderBookDeltas_API> {
309    let event_nanos = timestamp
310        .timestamp_nanos_opt()
311        .context("invalid timestamp: cannot extract event nanoseconds")?;
312    anyhow::ensure!(
313        event_nanos >= 0,
314        "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
315    );
316    let ts_event = UnixNanos::from(event_nanos as u64);
317    let init_nanos = local_timestamp
318        .timestamp_nanos_opt()
319        .context("invalid timestamp: cannot extract init nanoseconds")?;
320    anyhow::ensure!(
321        init_nanos >= 0,
322        "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
323    );
324    let ts_init = UnixNanos::from(init_nanos as u64);
325
326    let capacity = if is_snapshot {
327        bids.len() + asks.len() + 1
328    } else {
329        bids.len() + asks.len()
330    };
331    let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
332
333    if is_snapshot {
334        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
335    }
336
337    for level in bids {
338        match parse_book_level(
339            instrument_id,
340            price_precision,
341            size_precision,
342            OrderSide::Buy,
343            level,
344            is_snapshot,
345            ts_event,
346            ts_init,
347        ) {
348            Ok(delta) => deltas.push(delta),
349            Err(e) => log::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
350        }
351    }
352
353    for level in asks {
354        match parse_book_level(
355            instrument_id,
356            price_precision,
357            size_precision,
358            OrderSide::Sell,
359            level,
360            is_snapshot,
361            ts_event,
362            ts_init,
363        ) {
364            Ok(delta) => deltas.push(delta),
365            Err(e) => log::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
366        }
367    }
368
369    if let Some(last_delta) = deltas.last_mut() {
370        last_delta.flags |= RecordFlag::F_LAST.value();
371    }
372
373    // TODO: Opaque pointer wrapper necessary for Cython (remove once Cython gone)
374    Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
375        instrument_id,
376        deltas,
377    )))
378}
379
380/// Parse a single book level into an order book delta.
381///
382/// # Errors
383///
384/// Returns an error if a non-delete action has a zero size after normalization.
385#[expect(clippy::too_many_arguments)]
386pub fn parse_book_level(
387    instrument_id: InstrumentId,
388    price_precision: u8,
389    size_precision: u8,
390    side: OrderSide,
391    level: &BookLevel,
392    is_snapshot: bool,
393    ts_event: UnixNanos,
394    ts_init: UnixNanos,
395) -> anyhow::Result<OrderBookDelta> {
396    let amount = normalize_amount(level.amount, size_precision);
397    let action = parse_book_action(is_snapshot, amount);
398    let price = Price::new(level.price, price_precision);
399    let size = Quantity::new(amount, size_precision);
400    let order_id = 0; // Not applicable for L2 data
401    let order = BookOrder::new(side, price, size, order_id);
402    let flags = if is_snapshot {
403        RecordFlag::F_SNAPSHOT.value()
404    } else {
405        0
406    };
407    let sequence = 0; // Not available
408
409    anyhow::ensure!(
410        !(action != BookAction::Delete && size.is_zero()),
411        "Invalid zero size for {action}"
412    );
413
414    Ok(OrderBookDelta::new(
415        instrument_id,
416        action,
417        order,
418        flags,
419        sequence,
420        ts_event,
421        ts_init,
422    ))
423}
424
425/// Parse a book snapshot message into a quote tick, returning an error on invalid data.
426/// Parse a book snapshot message into a quote tick.
427///
428/// # Errors
429///
430/// Returns an error if missing bid/ask levels or invalid sizes.
431pub fn parse_book_snapshot_msg_as_quote(
432    msg: &BookSnapshotMsg,
433    price_precision: u8,
434    size_precision: u8,
435    instrument_id: InstrumentId,
436) -> anyhow::Result<QuoteTick> {
437    let ts_event = UnixNanos::from(msg.timestamp);
438    let ts_init = UnixNanos::from(msg.local_timestamp);
439
440    let best_bid = msg
441        .bids
442        .first()
443        .context("missing best bid level for quote message")?;
444    let bid_price = Price::new(best_bid.price, price_precision);
445    let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
446        .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
447
448    let best_ask = msg
449        .asks
450        .first()
451        .context("missing best ask level for quote message")?;
452    let ask_price = Price::new(best_ask.price, price_precision);
453    let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
454        .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
455
456    Ok(QuoteTick::new(
457        instrument_id,
458        bid_price,
459        ask_price,
460        bid_size,
461        ask_size,
462        ts_event,
463        ts_init,
464    ))
465}
466
467/// Parse a trade message into a trade tick, returning an error on invalid data.
468/// Parse a trade message into a trade tick.
469///
470/// # Errors
471///
472/// Returns an error if invalid trade size is encountered.
473pub fn parse_trade_msg(
474    msg: &TradeMsg,
475    price_precision: u8,
476    size_precision: u8,
477    instrument_id: InstrumentId,
478) -> anyhow::Result<TradeTick> {
479    let price = Price::new(msg.price, price_precision);
480    let size = Quantity::non_zero_checked(msg.amount, size_precision)
481        .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
482    let aggressor_side = parse_aggressor_side(&msg.side);
483    let ts_event = UnixNanos::from(msg.timestamp);
484    let ts_init = UnixNanos::from(msg.local_timestamp);
485    let trade_id = match msg.id.as_deref() {
486        Some(id) if !id.is_empty() => TradeId::new(id),
487        _ => derive_trade_id(
488            msg.symbol,
489            ts_event.as_u64(),
490            msg.price,
491            msg.amount,
492            &msg.side,
493        ),
494    };
495
496    Ok(TradeTick::new(
497        instrument_id,
498        price,
499        size,
500        aggressor_side,
501        trade_id,
502        ts_event,
503        ts_init,
504    ))
505}
506
507/// Parse a bar message into a Bar.
508///
509/// # Errors
510///
511/// Returns an error if the bar specification cannot be parsed.
512pub fn parse_bar_msg(
513    msg: &BarMsg,
514    price_precision: u8,
515    size_precision: u8,
516    instrument_id: InstrumentId,
517) -> anyhow::Result<Bar> {
518    let spec = parse_bar_spec(&msg.name)?;
519    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
520
521    let open = Price::new(msg.open, price_precision);
522    let high = Price::new(msg.high, price_precision);
523    let low = Price::new(msg.low, price_precision);
524    let close = Price::new(msg.close, price_precision);
525    let volume = Quantity::non_zero(msg.volume, size_precision);
526    let ts_event = UnixNanos::from(msg.timestamp);
527    let ts_init = UnixNanos::from(msg.local_timestamp);
528
529    Ok(Bar::new(
530        bar_type, open, high, low, close, volume, ts_event, ts_init,
531    ))
532}
533
534/// Extracts event and init timestamps from a derivative ticker message.
535fn parse_derivative_ticker_timestamps(
536    msg: &DerivativeTickerMsg,
537) -> anyhow::Result<(UnixNanos, UnixNanos)> {
538    let ts_event_nanos = msg
539        .timestamp
540        .timestamp_nanos_opt()
541        .context("invalid timestamp: cannot extract event nanoseconds")?;
542    anyhow::ensure!(
543        ts_event_nanos >= 0,
544        "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
545    );
546
547    let ts_init_nanos = msg
548        .local_timestamp
549        .timestamp_nanos_opt()
550        .context("invalid timestamp: cannot extract init nanoseconds")?;
551    anyhow::ensure!(
552        ts_init_nanos >= 0,
553        "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
554    );
555
556    Ok((
557        UnixNanos::from(ts_event_nanos as u64),
558        UnixNanos::from(ts_init_nanos as u64),
559    ))
560}
561
562/// Parses a derivative ticker message into a funding rate update.
563///
564/// # Errors
565///
566/// Returns an error if timestamp conversion or decimal conversion fails.
567pub fn parse_derivative_ticker_msg(
568    msg: &DerivativeTickerMsg,
569    instrument_id: InstrumentId,
570) -> anyhow::Result<Option<FundingRateUpdate>> {
571    let funding_rate = match msg.funding_rate {
572        Some(rate) => rate,
573        None => return Ok(None),
574    };
575
576    let (ts_event, ts_init) = parse_derivative_ticker_timestamps(msg)?;
577    let rate = rust_decimal::Decimal::try_from(funding_rate)
578        .with_context(|| format!("failed to convert funding rate {funding_rate} to Decimal"))?;
579
580    Ok(Some(FundingRateUpdate::new(
581        instrument_id,
582        rate,
583        None,
584        None,
585        ts_event,
586        ts_init,
587    )))
588}
589
590/// Parses a derivative ticker message into a mark price update.
591///
592/// # Errors
593///
594/// Returns an error if timestamp conversion fails.
595pub fn parse_derivative_ticker_mark_price(
596    msg: &DerivativeTickerMsg,
597    instrument_id: InstrumentId,
598    price_precision: u8,
599) -> anyhow::Result<Option<MarkPriceUpdate>> {
600    let mark_price = match msg.mark_price {
601        Some(p) => p,
602        None => return Ok(None),
603    };
604
605    let (ts_event, ts_init) = parse_derivative_ticker_timestamps(msg)?;
606
607    Ok(Some(MarkPriceUpdate::new(
608        instrument_id,
609        Price::new(mark_price, price_precision),
610        ts_event,
611        ts_init,
612    )))
613}
614
615/// Parses a derivative ticker message into an index price update.
616///
617/// # Errors
618///
619/// Returns an error if timestamp conversion fails.
620pub fn parse_derivative_ticker_index_price(
621    msg: &DerivativeTickerMsg,
622    instrument_id: InstrumentId,
623    price_precision: u8,
624) -> anyhow::Result<Option<IndexPriceUpdate>> {
625    let index_price = match msg.index_price {
626        Some(p) => p,
627        None => return Ok(None),
628    };
629
630    let (ts_event, ts_init) = parse_derivative_ticker_timestamps(msg)?;
631
632    Ok(Some(IndexPriceUpdate::new(
633        instrument_id,
634        Price::new(index_price, price_precision),
635        ts_event,
636        ts_init,
637    )))
638}
639
640#[cfg(test)]
641mod tests {
642    use nautilus_model::enums::AggressorSide;
643    use rstest::rstest;
644
645    use super::*;
646    use crate::common::{enums::TardisExchange, testing::load_test_json};
647
648    #[rstest]
649    fn test_parse_book_change_message() {
650        let json_data = load_test_json("book_change.json");
651        let msg: BookChangeMsg = serde_json::from_str(&json_data).unwrap();
652
653        let price_precision = 0;
654        let size_precision = 0;
655        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
656        let deltas =
657            parse_book_change_msg_as_deltas(&msg, price_precision, size_precision, instrument_id)
658                .unwrap();
659
660        assert_eq!(deltas.deltas.len(), 1);
661        assert_eq!(deltas.instrument_id, instrument_id);
662        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
663        assert_eq!(deltas.sequence, 0);
664        assert_eq!(deltas.ts_event, UnixNanos::from(1571830193469000000));
665        assert_eq!(deltas.ts_init, UnixNanos::from(1571830193469000000));
666        assert_eq!(
667            deltas.deltas[0].instrument_id,
668            InstrumentId::from("XBTUSD.BITMEX")
669        );
670        assert_eq!(deltas.deltas[0].action, BookAction::Update);
671        assert_eq!(deltas.deltas[0].order.price, Price::from("7985"));
672        assert_eq!(deltas.deltas[0].order.size, Quantity::from(283318));
673        assert_eq!(deltas.deltas[0].order.order_id, 0);
674        assert_eq!(deltas.deltas[0].flags, RecordFlag::F_LAST.value());
675        assert_eq!(deltas.deltas[0].sequence, 0);
676        assert_eq!(
677            deltas.deltas[0].ts_event,
678            UnixNanos::from(1571830193469000000)
679        );
680        assert_eq!(
681            deltas.deltas[0].ts_init,
682            UnixNanos::from(1571830193469000000)
683        );
684    }
685
686    #[rstest]
687    fn test_parse_book_snapshot_message_as_deltas() {
688        let json_data = load_test_json("book_snapshot.json");
689        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
690
691        let price_precision = 1;
692        let size_precision = 0;
693        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
694        let deltas =
695            parse_book_snapshot_msg_as_deltas(&msg, price_precision, size_precision, instrument_id)
696                .unwrap();
697
698        let clear_delta = deltas.deltas[0];
699        let bid_delta = deltas.deltas[1];
700        let ask_delta = deltas.deltas[3];
701
702        assert_eq!(deltas.deltas.len(), 5);
703        assert_eq!(deltas.instrument_id, instrument_id);
704        assert_eq!(
705            deltas.flags,
706            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
707        );
708        assert_eq!(deltas.sequence, 0);
709        assert_eq!(deltas.ts_event, UnixNanos::from(1572010786950000000));
710        assert_eq!(deltas.ts_init, UnixNanos::from(1572010786961000000));
711
712        // CLEAR delta
713        assert_eq!(clear_delta.instrument_id, instrument_id);
714        assert_eq!(clear_delta.action, BookAction::Clear);
715        assert_eq!(clear_delta.flags, RecordFlag::F_SNAPSHOT.value());
716        assert_eq!(clear_delta.sequence, 0);
717        assert_eq!(clear_delta.ts_event, UnixNanos::from(1572010786950000000));
718        assert_eq!(clear_delta.ts_init, UnixNanos::from(1572010786961000000));
719
720        // First bid delta
721        assert_eq!(bid_delta.instrument_id, instrument_id);
722        assert_eq!(bid_delta.action, BookAction::Add);
723        assert_eq!(bid_delta.order.side, OrderSide::Buy);
724        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
725        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
726        assert_eq!(bid_delta.order.order_id, 0);
727        assert_eq!(bid_delta.flags, RecordFlag::F_SNAPSHOT.value());
728        assert_eq!(bid_delta.sequence, 0);
729        assert_eq!(bid_delta.ts_event, UnixNanos::from(1572010786950000000));
730        assert_eq!(bid_delta.ts_init, UnixNanos::from(1572010786961000000));
731
732        // First ask delta
733        assert_eq!(ask_delta.instrument_id, instrument_id);
734        assert_eq!(ask_delta.action, BookAction::Add);
735        assert_eq!(ask_delta.order.side, OrderSide::Sell);
736        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
737        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
738        assert_eq!(ask_delta.order.order_id, 0);
739        assert_eq!(ask_delta.flags, RecordFlag::F_SNAPSHOT.value());
740        assert_eq!(ask_delta.sequence, 0);
741        assert_eq!(ask_delta.ts_event, UnixNanos::from(1572010786950000000));
742        assert_eq!(ask_delta.ts_init, UnixNanos::from(1572010786961000000));
743    }
744
745    #[rstest]
746    fn test_parse_book_snapshot_message_as_depth10() {
747        let json_data = load_test_json("book_snapshot.json");
748        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
749
750        let price_precision = 1;
751        let size_precision = 0;
752        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
753
754        let depth10 = parse_book_snapshot_msg_as_depth10(
755            &msg,
756            price_precision,
757            size_precision,
758            instrument_id,
759        )
760        .unwrap();
761
762        assert_eq!(depth10.instrument_id, instrument_id);
763        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
764        assert_eq!(depth10.sequence, 0);
765        assert_eq!(depth10.ts_event, UnixNanos::from(1572010786950000000));
766        assert_eq!(depth10.ts_init, UnixNanos::from(1572010786961000000));
767
768        // Check first bid level
769        assert_eq!(depth10.bids[0].side, OrderSide::Buy);
770        assert_eq!(depth10.bids[0].price, Price::from("7633.5"));
771        assert_eq!(depth10.bids[0].size, Quantity::from(1906067));
772        assert_eq!(depth10.bids[0].order_id, 0);
773        assert_eq!(depth10.bid_counts[0], 1);
774
775        // Check second bid level
776        assert_eq!(depth10.bids[1].side, OrderSide::Buy);
777        assert_eq!(depth10.bids[1].price, Price::from("7633.0"));
778        assert_eq!(depth10.bids[1].size, Quantity::from(65319));
779        assert_eq!(depth10.bid_counts[1], 1);
780
781        // Check first ask level
782        assert_eq!(depth10.asks[0].side, OrderSide::Sell);
783        assert_eq!(depth10.asks[0].price, Price::from("7634.0"));
784        assert_eq!(depth10.asks[0].size, Quantity::from(1467849));
785        assert_eq!(depth10.asks[0].order_id, 0);
786        assert_eq!(depth10.ask_counts[0], 1);
787
788        // Check second ask level
789        assert_eq!(depth10.asks[1].side, OrderSide::Sell);
790        assert_eq!(depth10.asks[1].price, Price::from("7634.5"));
791        assert_eq!(depth10.asks[1].size, Quantity::from(67939));
792        assert_eq!(depth10.ask_counts[1], 1);
793
794        // Check empty levels are NULL_ORDER
795        assert_eq!(depth10.bids[2], NULL_ORDER);
796        assert_eq!(depth10.bid_counts[2], 0);
797        assert_eq!(depth10.asks[2], NULL_ORDER);
798        assert_eq!(depth10.ask_counts[2], 0);
799    }
800
801    #[rstest]
802    fn test_parse_book_snapshot_message_as_quote() {
803        let json_data = load_test_json("book_snapshot.json");
804        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
805
806        let price_precision = 1;
807        let size_precision = 0;
808        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
809        let quote =
810            parse_book_snapshot_msg_as_quote(&msg, price_precision, size_precision, instrument_id)
811                .expect("Failed to parse book snapshot quote message");
812
813        assert_eq!(quote.instrument_id, instrument_id);
814        assert_eq!(quote.bid_price, Price::from("7633.5"));
815        assert_eq!(quote.bid_size, Quantity::from(1906067));
816        assert_eq!(quote.ask_price, Price::from("7634.0"));
817        assert_eq!(quote.ask_size, Quantity::from(1467849));
818        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
819        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
820    }
821
822    #[rstest]
823    fn test_parse_trade_message() {
824        let json_data = load_test_json("trade.json");
825        let msg: TradeMsg = serde_json::from_str(&json_data).unwrap();
826
827        let price_precision = 0;
828        let size_precision = 0;
829        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
830        let trade = parse_trade_msg(&msg, price_precision, size_precision, instrument_id)
831            .expect("Failed to parse trade message");
832
833        assert_eq!(trade.instrument_id, instrument_id);
834        assert_eq!(trade.price, Price::from("7996"));
835        assert_eq!(trade.size, Quantity::from(50));
836        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
837        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
838        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
839    }
840
841    fn build_trade_msg_without_id() -> TradeMsg {
842        let json_data = load_test_json("trade.json");
843        let mut msg: TradeMsg = serde_json::from_str(&json_data).unwrap();
844        msg.id = None;
845        msg
846    }
847
848    #[rstest]
849    fn test_parse_trade_message_derives_trade_id_when_missing() {
850        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
851
852        let first = parse_trade_msg(&build_trade_msg_without_id(), 0, 0, instrument_id).unwrap();
853        let second = parse_trade_msg(&build_trade_msg_without_id(), 0, 0, instrument_id).unwrap();
854
855        assert_eq!(first.trade_id, second.trade_id, "derivation must be stable");
856        assert_eq!(first.trade_id.as_str().len(), 16);
857
858        let mut altered = build_trade_msg_without_id();
859        altered.price = 7997.0;
860        let altered_trade = parse_trade_msg(&altered, 0, 0, instrument_id).unwrap();
861        assert_ne!(first.trade_id, altered_trade.trade_id);
862    }
863
864    #[rstest]
865    fn test_parse_trade_message_derives_trade_id_when_empty() {
866        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
867
868        let mut msg = build_trade_msg_without_id();
869        msg.id = Some(String::new());
870
871        let trade = parse_trade_msg(&msg, 0, 0, instrument_id).unwrap();
872        let fallback = parse_trade_msg(&build_trade_msg_without_id(), 0, 0, instrument_id).unwrap();
873        assert_eq!(trade.trade_id, fallback.trade_id);
874    }
875
876    #[rstest]
877    fn test_parse_bar_message() {
878        let json_data = load_test_json("bar.json");
879        let msg: BarMsg = serde_json::from_str(&json_data).unwrap();
880
881        let price_precision = 1;
882        let size_precision = 0;
883        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
884        let bar = parse_bar_msg(&msg, price_precision, size_precision, instrument_id).unwrap();
885
886        assert_eq!(
887            bar.bar_type,
888            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
889        );
890        assert_eq!(bar.open, Price::from("7623.5"));
891        assert_eq!(bar.high, Price::from("7623.5"));
892        assert_eq!(bar.low, Price::from("7623"));
893        assert_eq!(bar.close, Price::from("7623.5"));
894        assert_eq!(bar.volume, Quantity::from(37034));
895        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
896        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
897    }
898
899    #[rstest]
900    fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
901        let json_data = load_test_json("book_snapshot.json");
902        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
903        let ws_msg = WsMessage::BookSnapshot(msg);
904
905        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
906        let info = Arc::new(TardisInstrumentMiniInfo::new(
907            instrument_id,
908            None,
909            TardisExchange::Bitmex,
910            1,
911            0,
912        ));
913
914        let result = parse_tardis_ws_message(ws_msg, &info, &BookSnapshotOutput::Depth10);
915
916        assert!(result.is_some());
917        assert!(matches!(result.unwrap(), Data::Depth10(_)));
918    }
919
920    #[rstest]
921    fn test_parse_tardis_ws_message_sparse_book_snapshot_routes_to_depth10() {
922        let json_data = r#"{
923            "type": "book_snapshot",
924            "symbol": "ETC",
925            "exchange": "hyperliquid",
926            "name": "book_snapshot_20_10s",
927            "depth": 20,
928            "interval": 10000,
929            "bids": [{"price": 20.002, "amount": 5.81}],
930            "asks": [{"price": 20.003, "amount": 162.45}, {}],
931            "timestamp": "2025-03-03T10:48:10.000Z",
932            "localTimestamp": "2025-03-03T10:48:10.596818Z"
933        }"#;
934        let msg: BookSnapshotMsg = serde_json::from_str(json_data).unwrap();
935        let ws_msg = WsMessage::BookSnapshot(msg);
936
937        let instrument_id = InstrumentId::from("ETC.HYPERLIQUID");
938        let info = Arc::new(TardisInstrumentMiniInfo::new(
939            instrument_id,
940            None,
941            TardisExchange::Hyperliquid,
942            3,
943            2,
944        ));
945
946        let result = parse_tardis_ws_message(ws_msg, &info, &BookSnapshotOutput::Depth10);
947
948        assert!(result.is_some());
949        assert!(matches!(result.unwrap(), Data::Depth10(_)));
950    }
951
952    #[rstest]
953    fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
954        let json_data = load_test_json("book_snapshot.json");
955        let msg: BookSnapshotMsg = serde_json::from_str(&json_data).unwrap();
956        let ws_msg = WsMessage::BookSnapshot(msg);
957
958        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
959        let info = Arc::new(TardisInstrumentMiniInfo::new(
960            instrument_id,
961            None,
962            TardisExchange::Bitmex,
963            1,
964            0,
965        ));
966
967        let result = parse_tardis_ws_message(ws_msg, &info, &BookSnapshotOutput::Deltas);
968
969        assert!(result.is_some());
970        assert!(matches!(result.unwrap(), Data::Deltas(_)));
971    }
972
973    #[rstest]
974    fn test_parse_derivative_ticker_funding_rate() {
975        let json_data = load_test_json("derivative_ticker.json");
976        let msg: DerivativeTickerMsg = serde_json::from_str(&json_data).unwrap();
977
978        let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
979
980        let result = parse_derivative_ticker_msg(&msg, instrument_id).unwrap();
981        assert!(result.is_some());
982
983        let funding = result.unwrap();
984        assert_eq!(funding.instrument_id, instrument_id);
985        assert_eq!(funding.rate.to_string(), "-0.00001568");
986        assert!(funding.ts_event.as_u64() > 0);
987        assert!(funding.ts_init.as_u64() > 0);
988    }
989
990    #[rstest]
991    fn test_parse_derivative_ticker_mark_price() {
992        let json_data = load_test_json("derivative_ticker.json");
993        let msg: DerivativeTickerMsg = serde_json::from_str(&json_data).unwrap();
994
995        let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
996        let price_precision = 2;
997
998        let result =
999            parse_derivative_ticker_mark_price(&msg, instrument_id, price_precision).unwrap();
1000        assert!(result.is_some());
1001
1002        let mark = result.unwrap();
1003        assert_eq!(mark.instrument_id, instrument_id);
1004        assert_eq!(mark.value, Price::new(7987.56, price_precision));
1005        assert!(mark.ts_event.as_u64() > 0);
1006        assert!(mark.ts_init.as_u64() > 0);
1007    }
1008
1009    #[rstest]
1010    fn test_parse_derivative_ticker_index_price() {
1011        let json_data = load_test_json("derivative_ticker.json");
1012        let msg: DerivativeTickerMsg = serde_json::from_str(&json_data).unwrap();
1013
1014        let instrument_id = InstrumentId::from("BTC-PERPETUAL.DERIBIT");
1015        let price_precision = 2;
1016
1017        let result =
1018            parse_derivative_ticker_index_price(&msg, instrument_id, price_precision).unwrap();
1019        assert!(result.is_some());
1020
1021        let index = result.unwrap();
1022        assert_eq!(index.instrument_id, instrument_id);
1023        assert_eq!(index.value, Price::new(7989.28, price_precision));
1024        assert!(index.ts_event.as_u64() > 0);
1025        assert!(index.ts_init.as_u64() > 0);
1026    }
1027
1028    #[rstest]
1029    fn test_parse_derivative_ticker_missing_fields() {
1030        // Test with minimal data (only funding_rate, no mark/index)
1031        let json = r#"{
1032            "type": "derivative_ticker",
1033            "symbol": "BTCUSD",
1034            "exchange": "bitmex",
1035            "lastPrice": null,
1036            "openInterest": null,
1037            "fundingRate": 0.0001,
1038            "indexPrice": null,
1039            "markPrice": null,
1040            "timestamp": "2024-01-01T00:00:00.000Z",
1041            "localTimestamp": "2024-01-01T00:00:00.100Z"
1042        }"#;
1043        let msg: DerivativeTickerMsg = serde_json::from_str(json).unwrap();
1044
1045        let instrument_id = InstrumentId::from("BTCUSD.BITMEX");
1046
1047        let funding = parse_derivative_ticker_msg(&msg, instrument_id).unwrap();
1048        assert!(funding.is_some());
1049
1050        let mark = parse_derivative_ticker_mark_price(&msg, instrument_id, 1).unwrap();
1051        assert!(mark.is_none());
1052
1053        let index = parse_derivative_ticker_index_price(&msg, instrument_id, 1).unwrap();
1054        assert!(index.is_none());
1055    }
1056}