1use std::sync::Arc;
17
18use anyhow::Context;
19use chrono::{DateTime, Utc};
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 Bar, BarType, BookOrder, DEPTH10_LEN, Data, FundingRateUpdate, IndexPriceUpdate,
24 MarkPriceUpdate, NULL_ORDER, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
25 OrderBookDepth10, QuoteTick, TradeTick,
26 },
27 enums::{AggregationSource, BookAction, OrderSide, RecordFlag},
28 identifiers::{InstrumentId, TradeId},
29 types::{Price, Quantity},
30};
31
32use super::{
33 message::{
34 BarMsg, BookChangeMsg, BookLevel, BookSnapshotMsg, DerivativeTickerMsg, TradeMsg, WsMessage,
35 },
36 types::TardisInstrumentMiniInfo,
37};
38use crate::{
39 common::parse::{
40 derive_trade_id, normalize_amount, parse_aggressor_side, parse_bar_spec, parse_book_action,
41 },
42 config::BookSnapshotOutput,
43};
44
45#[must_use]
46pub fn parse_tardis_ws_message(
47 msg: WsMessage,
48 info: &Arc<TardisInstrumentMiniInfo>,
49 book_snapshot_output: &BookSnapshotOutput,
50) -> Option<Data> {
51 match msg {
52 WsMessage::BookChange(msg) => {
53 if msg.bids.is_empty() && msg.asks.is_empty() {
54 let exchange = msg.exchange;
55 let symbol = &msg.symbol;
56 log::error!("Invalid book change for {exchange} {symbol} (empty bids and asks)");
57 return None;
58 }
59
60 match parse_book_change_msg_as_deltas(
61 &msg,
62 info.price_precision,
63 info.size_precision,
64 info.instrument_id,
65 ) {
66 Ok(deltas) => Some(Data::Deltas(deltas)),
67 Err(e) => {
68 log::error!("Failed to parse book change message: {e}");
69 None
70 }
71 }
72 }
73 WsMessage::BookSnapshot(msg) => match msg.depth {
74 1 => {
75 match parse_book_snapshot_msg_as_quote(
76 &msg,
77 info.price_precision,
78 info.size_precision,
79 info.instrument_id,
80 ) {
81 Ok(quote) => Some(Data::Quote(quote)),
82 Err(e) => {
83 log::error!("Failed to parse book snapshot quote message: {e}");
84 None
85 }
86 }
87 }
88 _ => match book_snapshot_output {
89 BookSnapshotOutput::Depth10 => {
90 match parse_book_snapshot_msg_as_depth10(
91 &msg,
92 info.price_precision,
93 info.size_precision,
94 info.instrument_id,
95 ) {
96 Ok(depth10) => Some(Data::Depth10(Box::new(depth10))),
97 Err(e) => {
98 log::error!("Failed to parse book snapshot as depth10: {e}");
99 None
100 }
101 }
102 }
103 BookSnapshotOutput::Deltas => {
104 match parse_book_snapshot_msg_as_deltas(
105 &msg,
106 info.price_precision,
107 info.size_precision,
108 info.instrument_id,
109 ) {
110 Ok(deltas) => Some(Data::Deltas(deltas)),
111 Err(e) => {
112 log::error!("Failed to parse book snapshot as deltas: {e}");
113 None
114 }
115 }
116 }
117 },
118 },
119 WsMessage::Trade(msg) => {
120 match parse_trade_msg(
121 &msg,
122 info.price_precision,
123 info.size_precision,
124 info.instrument_id,
125 ) {
126 Ok(trade) => Some(Data::Trade(trade)),
127 Err(e) => {
128 log::error!("Failed to parse trade message: {e}");
129 None
130 }
131 }
132 }
133 WsMessage::TradeBar(msg) => {
134 match parse_bar_msg(
135 &msg,
136 info.price_precision,
137 info.size_precision,
138 info.instrument_id,
139 ) {
140 Ok(bar) => Some(Data::Bar(bar)),
141 Err(e) => {
142 log::error!("Failed to parse bar message: {e}");
143 None
144 }
145 }
146 }
147 WsMessage::DerivativeTicker(_) => None,
150 WsMessage::Disconnect(_) => None,
151 }
152}
153
154#[must_use]
157pub fn parse_tardis_ws_message_funding_rate(
158 msg: WsMessage,
159 info: &Arc<TardisInstrumentMiniInfo>,
160) -> Option<FundingRateUpdate> {
161 match msg {
162 WsMessage::DerivativeTicker(msg) => {
163 match parse_derivative_ticker_msg(&msg, info.instrument_id) {
164 Ok(funding_rate) => funding_rate,
165 Err(e) => {
166 log::error!("Failed to parse derivative ticker message for funding rate: {e}");
167 None
168 }
169 }
170 }
171 _ => None, }
173}
174
175pub fn parse_book_change_msg_as_deltas(
182 msg: &BookChangeMsg,
183 price_precision: u8,
184 size_precision: u8,
185 instrument_id: InstrumentId,
186) -> anyhow::Result<OrderBookDeltas_API> {
187 parse_book_msg_as_deltas(
188 &msg.bids,
189 &msg.asks,
190 msg.is_snapshot,
191 price_precision,
192 size_precision,
193 instrument_id,
194 msg.timestamp,
195 msg.local_timestamp,
196 )
197}
198
199pub fn parse_book_snapshot_msg_as_deltas(
206 msg: &BookSnapshotMsg,
207 price_precision: u8,
208 size_precision: u8,
209 instrument_id: InstrumentId,
210) -> anyhow::Result<OrderBookDeltas_API> {
211 parse_book_msg_as_deltas(
212 &msg.bids,
213 &msg.asks,
214 true,
215 price_precision,
216 size_precision,
217 instrument_id,
218 msg.timestamp,
219 msg.local_timestamp,
220 )
221}
222
223pub fn parse_book_snapshot_msg_as_depth10(
229 msg: &BookSnapshotMsg,
230 price_precision: u8,
231 size_precision: u8,
232 instrument_id: InstrumentId,
233) -> anyhow::Result<OrderBookDepth10> {
234 let ts_event_nanos = msg
235 .timestamp
236 .timestamp_nanos_opt()
237 .context("invalid timestamp: cannot extract event nanoseconds")?;
238 anyhow::ensure!(
239 ts_event_nanos >= 0,
240 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
241 );
242 let ts_event = UnixNanos::from(ts_event_nanos as u64);
243
244 let ts_init_nanos = msg
245 .local_timestamp
246 .timestamp_nanos_opt()
247 .context("invalid timestamp: cannot extract init nanoseconds")?;
248 anyhow::ensure!(
249 ts_init_nanos >= 0,
250 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
251 );
252 let ts_init = UnixNanos::from(ts_init_nanos as u64);
253
254 let mut bids = [NULL_ORDER; DEPTH10_LEN];
255 let mut asks = [NULL_ORDER; DEPTH10_LEN];
256 let mut bid_counts = [0u32; DEPTH10_LEN];
257 let mut ask_counts = [0u32; DEPTH10_LEN];
258
259 for (i, level) in msg.bids.iter().take(DEPTH10_LEN).enumerate() {
260 bids[i] = BookOrder::new(
261 OrderSide::Buy,
262 Price::new(level.price, price_precision),
263 Quantity::new(level.amount, size_precision),
264 0,
265 );
266 bid_counts[i] = 1;
267 }
268
269 for (i, level) in msg.asks.iter().take(DEPTH10_LEN).enumerate() {
270 asks[i] = BookOrder::new(
271 OrderSide::Sell,
272 Price::new(level.price, price_precision),
273 Quantity::new(level.amount, size_precision),
274 0,
275 );
276 ask_counts[i] = 1;
277 }
278
279 Ok(OrderBookDepth10::new(
280 instrument_id,
281 bids,
282 asks,
283 bid_counts,
284 ask_counts,
285 RecordFlag::F_SNAPSHOT.value(),
286 0, ts_event,
288 ts_init,
289 ))
290}
291
292#[expect(clippy::too_many_arguments)]
294pub fn parse_book_msg_as_deltas(
300 bids: &[BookLevel],
301 asks: &[BookLevel],
302 is_snapshot: bool,
303 price_precision: u8,
304 size_precision: u8,
305 instrument_id: InstrumentId,
306 timestamp: DateTime<Utc>,
307 local_timestamp: DateTime<Utc>,
308) -> anyhow::Result<OrderBookDeltas_API> {
309 let event_nanos = timestamp
310 .timestamp_nanos_opt()
311 .context("invalid timestamp: cannot extract event nanoseconds")?;
312 anyhow::ensure!(
313 event_nanos >= 0,
314 "invalid timestamp: event nanoseconds {event_nanos} is before UNIX epoch"
315 );
316 let ts_event = UnixNanos::from(event_nanos as u64);
317 let init_nanos = local_timestamp
318 .timestamp_nanos_opt()
319 .context("invalid timestamp: cannot extract init nanoseconds")?;
320 anyhow::ensure!(
321 init_nanos >= 0,
322 "invalid timestamp: init nanoseconds {init_nanos} is before UNIX epoch"
323 );
324 let ts_init = UnixNanos::from(init_nanos as u64);
325
326 let capacity = if is_snapshot {
327 bids.len() + asks.len() + 1
328 } else {
329 bids.len() + asks.len()
330 };
331 let mut deltas: Vec<OrderBookDelta> = Vec::with_capacity(capacity);
332
333 if is_snapshot {
334 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_init));
335 }
336
337 for level in bids {
338 match parse_book_level(
339 instrument_id,
340 price_precision,
341 size_precision,
342 OrderSide::Buy,
343 level,
344 is_snapshot,
345 ts_event,
346 ts_init,
347 ) {
348 Ok(delta) => deltas.push(delta),
349 Err(e) => log::warn!("Skipping invalid bid level for {instrument_id}: {e}"),
350 }
351 }
352
353 for level in asks {
354 match parse_book_level(
355 instrument_id,
356 price_precision,
357 size_precision,
358 OrderSide::Sell,
359 level,
360 is_snapshot,
361 ts_event,
362 ts_init,
363 ) {
364 Ok(delta) => deltas.push(delta),
365 Err(e) => log::warn!("Skipping invalid ask level for {instrument_id}: {e}"),
366 }
367 }
368
369 if let Some(last_delta) = deltas.last_mut() {
370 last_delta.flags |= RecordFlag::F_LAST.value();
371 }
372
373 Ok(OrderBookDeltas_API::new(OrderBookDeltas::new(
375 instrument_id,
376 deltas,
377 )))
378}
379
380#[expect(clippy::too_many_arguments)]
386pub fn parse_book_level(
387 instrument_id: InstrumentId,
388 price_precision: u8,
389 size_precision: u8,
390 side: OrderSide,
391 level: &BookLevel,
392 is_snapshot: bool,
393 ts_event: UnixNanos,
394 ts_init: UnixNanos,
395) -> anyhow::Result<OrderBookDelta> {
396 let amount = normalize_amount(level.amount, size_precision);
397 let action = parse_book_action(is_snapshot, amount);
398 let price = Price::new(level.price, price_precision);
399 let size = Quantity::new(amount, size_precision);
400 let order_id = 0; let order = BookOrder::new(side, price, size, order_id);
402 let flags = if is_snapshot {
403 RecordFlag::F_SNAPSHOT.value()
404 } else {
405 0
406 };
407 let sequence = 0; anyhow::ensure!(
410 !(action != BookAction::Delete && size.is_zero()),
411 "Invalid zero size for {action}"
412 );
413
414 Ok(OrderBookDelta::new(
415 instrument_id,
416 action,
417 order,
418 flags,
419 sequence,
420 ts_event,
421 ts_init,
422 ))
423}
424
425pub fn parse_book_snapshot_msg_as_quote(
432 msg: &BookSnapshotMsg,
433 price_precision: u8,
434 size_precision: u8,
435 instrument_id: InstrumentId,
436) -> anyhow::Result<QuoteTick> {
437 let ts_event = UnixNanos::from(msg.timestamp);
438 let ts_init = UnixNanos::from(msg.local_timestamp);
439
440 let best_bid = msg
441 .bids
442 .first()
443 .context("missing best bid level for quote message")?;
444 let bid_price = Price::new(best_bid.price, price_precision);
445 let bid_size = Quantity::non_zero_checked(best_bid.amount, size_precision)
446 .with_context(|| format!("Invalid bid size for message: {msg:?}"))?;
447
448 let best_ask = msg
449 .asks
450 .first()
451 .context("missing best ask level for quote message")?;
452 let ask_price = Price::new(best_ask.price, price_precision);
453 let ask_size = Quantity::non_zero_checked(best_ask.amount, size_precision)
454 .with_context(|| format!("Invalid ask size for message: {msg:?}"))?;
455
456 Ok(QuoteTick::new(
457 instrument_id,
458 bid_price,
459 ask_price,
460 bid_size,
461 ask_size,
462 ts_event,
463 ts_init,
464 ))
465}
466
467pub fn parse_trade_msg(
474 msg: &TradeMsg,
475 price_precision: u8,
476 size_precision: u8,
477 instrument_id: InstrumentId,
478) -> anyhow::Result<TradeTick> {
479 let price = Price::new(msg.price, price_precision);
480 let size = Quantity::non_zero_checked(msg.amount, size_precision)
481 .with_context(|| format!("Invalid trade size in message: {msg:?}"))?;
482 let aggressor_side = parse_aggressor_side(&msg.side);
483 let ts_event = UnixNanos::from(msg.timestamp);
484 let ts_init = UnixNanos::from(msg.local_timestamp);
485 let trade_id = match msg.id.as_deref() {
486 Some(id) if !id.is_empty() => TradeId::new(id),
487 _ => derive_trade_id(
488 msg.symbol,
489 ts_event.as_u64(),
490 msg.price,
491 msg.amount,
492 &msg.side,
493 ),
494 };
495
496 Ok(TradeTick::new(
497 instrument_id,
498 price,
499 size,
500 aggressor_side,
501 trade_id,
502 ts_event,
503 ts_init,
504 ))
505}
506
507pub fn parse_bar_msg(
513 msg: &BarMsg,
514 price_precision: u8,
515 size_precision: u8,
516 instrument_id: InstrumentId,
517) -> anyhow::Result<Bar> {
518 let spec = parse_bar_spec(&msg.name)?;
519 let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
520
521 let open = Price::new(msg.open, price_precision);
522 let high = Price::new(msg.high, price_precision);
523 let low = Price::new(msg.low, price_precision);
524 let close = Price::new(msg.close, price_precision);
525 let volume = Quantity::non_zero(msg.volume, size_precision);
526 let ts_event = UnixNanos::from(msg.timestamp);
527 let ts_init = UnixNanos::from(msg.local_timestamp);
528
529 Ok(Bar::new(
530 bar_type, open, high, low, close, volume, ts_event, ts_init,
531 ))
532}
533
534fn parse_derivative_ticker_timestamps(
536 msg: &DerivativeTickerMsg,
537) -> anyhow::Result<(UnixNanos, UnixNanos)> {
538 let ts_event_nanos = msg
539 .timestamp
540 .timestamp_nanos_opt()
541 .context("invalid timestamp: cannot extract event nanoseconds")?;
542 anyhow::ensure!(
543 ts_event_nanos >= 0,
544 "invalid timestamp: event nanoseconds {ts_event_nanos} is before UNIX epoch"
545 );
546
547 let ts_init_nanos = msg
548 .local_timestamp
549 .timestamp_nanos_opt()
550 .context("invalid timestamp: cannot extract init nanoseconds")?;
551 anyhow::ensure!(
552 ts_init_nanos >= 0,
553 "invalid timestamp: init nanoseconds {ts_init_nanos} is before UNIX epoch"
554 );
555
556 Ok((
557 UnixNanos::from(ts_event_nanos as u64),
558 UnixNanos::from(ts_init_nanos as u64),
559 ))
560}
561
562pub fn parse_derivative_ticker_msg(
568 msg: &DerivativeTickerMsg,
569 instrument_id: InstrumentId,
570) -> anyhow::Result<Option<FundingRateUpdate>> {
571 let funding_rate = match msg.funding_rate {
572 Some(rate) => rate,
573 None => return Ok(None),
574 };
575
576 let (ts_event, ts_init) = parse_derivative_ticker_timestamps(msg)?;
577 let rate = rust_decimal::Decimal::try_from(funding_rate)
578 .with_context(|| format!("failed to convert funding rate {funding_rate} to Decimal"))?;
579
580 Ok(Some(FundingRateUpdate::new(
581 instrument_id,
582 rate,
583 None,
584 None,
585 ts_event,
586 ts_init,
587 )))
588}
589
590pub fn parse_derivative_ticker_mark_price(
596 msg: &DerivativeTickerMsg,
597 instrument_id: InstrumentId,
598 price_precision: u8,
599) -> anyhow::Result<Option<MarkPriceUpdate>> {
600 let mark_price = match msg.mark_price {
601 Some(p) => p,
602 None => return Ok(None),
603 };
604
605 let (ts_event, ts_init) = parse_derivative_ticker_timestamps(msg)?;
606
607 Ok(Some(MarkPriceUpdate::new(
608 instrument_id,
609 Price::new(mark_price, price_precision),
610 ts_event,
611 ts_init,
612 )))
613}
614
615pub fn parse_derivative_ticker_index_price(
621 msg: &DerivativeTickerMsg,
622 instrument_id: InstrumentId,
623 price_precision: u8,
624) -> anyhow::Result<Option<IndexPriceUpdate>> {
625 let index_price = match msg.index_price {
626 Some(p) => p,
627 None => return Ok(None),
628 };
629
630 let (ts_event, ts_init) = parse_derivative_ticker_timestamps(msg)?;
631
632 Ok(Some(IndexPriceUpdate::new(
633 instrument_id,
634 Price::new(index_price, price_precision),
635 ts_event,
636 ts_init,
637 )))
638}
639
#[cfg(test)]
mod tests {
    use nautilus_model::enums::AggressorSide;
    use rstest::rstest;

    use super::*;
    use crate::common::{enums::TardisExchange, testing::load_test_json};

    /// Instrument used with the BitMEX fixtures.
    fn xbtusd_bitmex() -> InstrumentId {
        InstrumentId::from("XBTUSD.BITMEX")
    }

    /// Instrument used with the derivative ticker fixture.
    fn btc_perpetual_deribit() -> InstrumentId {
        InstrumentId::from("BTC-PERPETUAL.DERIBIT")
    }

    /// Loads and deserializes the `book_snapshot.json` fixture.
    fn load_book_snapshot() -> BookSnapshotMsg {
        let raw = load_test_json("book_snapshot.json");
        serde_json::from_str(&raw).unwrap()
    }

    /// Loads and deserializes the `derivative_ticker.json` fixture.
    fn load_derivative_ticker() -> DerivativeTickerMsg {
        let raw = load_test_json("derivative_ticker.json");
        serde_json::from_str(&raw).unwrap()
    }

    /// Instrument info for BitMEX with price precision 1 and size precision 0.
    fn bitmex_info(instrument_id: InstrumentId) -> Arc<TardisInstrumentMiniInfo> {
        Arc::new(TardisInstrumentMiniInfo::new(
            instrument_id,
            None,
            TardisExchange::Bitmex,
            1,
            0,
        ))
    }

    /// Loads the `trade.json` fixture and strips its trade ID.
    fn build_trade_msg_without_id() -> TradeMsg {
        let raw = load_test_json("trade.json");
        let mut trade: TradeMsg = serde_json::from_str(&raw).unwrap();
        trade.id = None;
        trade
    }

    #[rstest]
    fn test_parse_book_change_message() {
        let raw = load_test_json("book_change.json");
        let msg: BookChangeMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();
        let expected_ts = UnixNanos::from(1571830193469000000);

        let deltas = parse_book_change_msg_as_deltas(&msg, 0, 0, instrument_id).unwrap();

        // Batch-level fields
        assert_eq!(deltas.deltas.len(), 1);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(deltas.flags, RecordFlag::F_LAST.value());
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts);
        assert_eq!(deltas.ts_init, expected_ts);

        // The single delta
        let delta = &deltas.deltas[0];
        assert_eq!(delta.instrument_id, InstrumentId::from("XBTUSD.BITMEX"));
        assert_eq!(delta.action, BookAction::Update);
        assert_eq!(delta.order.price, Price::from("7985"));
        assert_eq!(delta.order.size, Quantity::from(283318));
        assert_eq!(delta.order.order_id, 0);
        assert_eq!(delta.flags, RecordFlag::F_LAST.value());
        assert_eq!(delta.sequence, 0);
        assert_eq!(delta.ts_event, expected_ts);
        assert_eq!(delta.ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_deltas() {
        let msg = load_book_snapshot();
        let instrument_id = xbtusd_bitmex();
        let expected_ts_event = UnixNanos::from(1572010786950000000);
        let expected_ts_init = UnixNanos::from(1572010786961000000);

        let deltas = parse_book_snapshot_msg_as_deltas(&msg, 1, 0, instrument_id).unwrap();

        let clear_delta = deltas.deltas[0];
        let bid_delta = deltas.deltas[1];
        let ask_delta = deltas.deltas[3];

        assert_eq!(deltas.deltas.len(), 5);
        assert_eq!(deltas.instrument_id, instrument_id);
        assert_eq!(
            deltas.flags,
            RecordFlag::F_LAST.value() + RecordFlag::F_SNAPSHOT.value()
        );
        assert_eq!(deltas.sequence, 0);
        assert_eq!(deltas.ts_event, expected_ts_event);
        assert_eq!(deltas.ts_init, expected_ts_init);

        // Fields shared by the clear delta and the first bid/ask deltas
        for delta in [clear_delta, bid_delta, ask_delta] {
            assert_eq!(delta.instrument_id, instrument_id);
            assert_eq!(delta.flags, RecordFlag::F_SNAPSHOT.value());
            assert_eq!(delta.sequence, 0);
            assert_eq!(delta.ts_event, expected_ts_event);
            assert_eq!(delta.ts_init, expected_ts_init);
        }

        assert_eq!(clear_delta.action, BookAction::Clear);

        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert_eq!(bid_delta.order.price, Price::from("7633.5"));
        assert_eq!(bid_delta.order.size, Quantity::from(1906067));
        assert_eq!(bid_delta.order.order_id, 0);

        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert_eq!(ask_delta.order.price, Price::from("7634.0"));
        assert_eq!(ask_delta.order.size, Quantity::from(1467849));
        assert_eq!(ask_delta.order.order_id, 0);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_depth10() {
        let msg = load_book_snapshot();
        let instrument_id = xbtusd_bitmex();

        let depth10 = parse_book_snapshot_msg_as_depth10(&msg, 1, 0, instrument_id).unwrap();

        assert_eq!(depth10.instrument_id, instrument_id);
        assert_eq!(depth10.flags, RecordFlag::F_SNAPSHOT.value());
        assert_eq!(depth10.sequence, 0);
        assert_eq!(depth10.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(depth10.ts_init, UnixNanos::from(1572010786961000000));

        // Populated bid levels
        let expected_bids = [
            (Price::from("7633.5"), Quantity::from(1906067)),
            (Price::from("7633.0"), Quantity::from(65319)),
        ];
        for (i, (price, size)) in expected_bids.into_iter().enumerate() {
            assert_eq!(depth10.bids[i].side, OrderSide::Buy);
            assert_eq!(depth10.bids[i].price, price);
            assert_eq!(depth10.bids[i].size, size);
            assert_eq!(depth10.bid_counts[i], 1);
        }
        assert_eq!(depth10.bids[0].order_id, 0);

        // Populated ask levels
        let expected_asks = [
            (Price::from("7634.0"), Quantity::from(1467849)),
            (Price::from("7634.5"), Quantity::from(67939)),
        ];
        for (i, (price, size)) in expected_asks.into_iter().enumerate() {
            assert_eq!(depth10.asks[i].side, OrderSide::Sell);
            assert_eq!(depth10.asks[i].price, price);
            assert_eq!(depth10.asks[i].size, size);
            assert_eq!(depth10.ask_counts[i], 1);
        }
        assert_eq!(depth10.asks[0].order_id, 0);

        // Slots beyond the fixture's two levels stay empty
        assert_eq!(depth10.bids[2], NULL_ORDER);
        assert_eq!(depth10.bid_counts[2], 0);
        assert_eq!(depth10.asks[2], NULL_ORDER);
        assert_eq!(depth10.ask_counts[2], 0);
    }

    #[rstest]
    fn test_parse_book_snapshot_message_as_quote() {
        let msg = load_book_snapshot();
        let instrument_id = xbtusd_bitmex();

        let quote = parse_book_snapshot_msg_as_quote(&msg, 1, 0, instrument_id)
            .expect("Failed to parse book snapshot quote message");

        assert_eq!(quote.instrument_id, instrument_id);
        assert_eq!(quote.bid_price, Price::from("7633.5"));
        assert_eq!(quote.bid_size, Quantity::from(1906067));
        assert_eq!(quote.ask_price, Price::from("7634.0"));
        assert_eq!(quote.ask_size, Quantity::from(1467849));
        assert_eq!(quote.ts_event, UnixNanos::from(1572010786950000000));
        assert_eq!(quote.ts_init, UnixNanos::from(1572010786961000000));
    }

    #[rstest]
    fn test_parse_trade_message() {
        let raw = load_test_json("trade.json");
        let msg: TradeMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let trade =
            parse_trade_msg(&msg, 0, 0, instrument_id).expect("Failed to parse trade message");

        assert_eq!(trade.instrument_id, instrument_id);
        assert_eq!(trade.price, Price::from("7996"));
        assert_eq!(trade.size, Quantity::from(50));
        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
        assert_eq!(trade.ts_event, UnixNanos::from(1571826769669000000));
        assert_eq!(trade.ts_init, UnixNanos::from(1571826769740000000));
    }

    #[rstest]
    fn test_parse_trade_message_derives_trade_id_when_missing() {
        let instrument_id = xbtusd_bitmex();

        let first = parse_trade_msg(&build_trade_msg_without_id(), 0, 0, instrument_id).unwrap();
        let second = parse_trade_msg(&build_trade_msg_without_id(), 0, 0, instrument_id).unwrap();

        assert_eq!(first.trade_id, second.trade_id, "derivation must be stable");
        assert_eq!(first.trade_id.as_str().len(), 16);

        // Changing an input of the derivation must change the derived ID
        let mut altered = build_trade_msg_without_id();
        altered.price = 7997.0;
        let altered_trade = parse_trade_msg(&altered, 0, 0, instrument_id).unwrap();
        assert_ne!(first.trade_id, altered_trade.trade_id);
    }

    #[rstest]
    fn test_parse_trade_message_derives_trade_id_when_empty() {
        let instrument_id = xbtusd_bitmex();

        let mut with_empty_id = build_trade_msg_without_id();
        with_empty_id.id = Some(String::new());

        let trade = parse_trade_msg(&with_empty_id, 0, 0, instrument_id).unwrap();
        let fallback = parse_trade_msg(&build_trade_msg_without_id(), 0, 0, instrument_id).unwrap();
        assert_eq!(trade.trade_id, fallback.trade_id);
    }

    #[rstest]
    fn test_parse_bar_message() {
        let raw = load_test_json("bar.json");
        let msg: BarMsg = serde_json::from_str(&raw).unwrap();
        let instrument_id = xbtusd_bitmex();

        let bar = parse_bar_msg(&msg, 1, 0, instrument_id).unwrap();

        assert_eq!(
            bar.bar_type,
            BarType::from("XBTUSD.BITMEX-10000-MILLISECOND-LAST-EXTERNAL")
        );
        assert_eq!(bar.open, Price::from("7623.5"));
        assert_eq!(bar.high, Price::from("7623.5"));
        assert_eq!(bar.low, Price::from("7623"));
        assert_eq!(bar.close, Price::from("7623.5"));
        assert_eq!(bar.volume, Quantity::from(37034));
        assert_eq!(bar.ts_event, UnixNanos::from(1572009100000000000));
        assert_eq!(bar.ts_init, UnixNanos::from(1572009100369000000));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_depth10() {
        let ws_msg = WsMessage::BookSnapshot(load_book_snapshot());
        let info = bitmex_info(xbtusd_bitmex());

        let result = parse_tardis_ws_message(ws_msg, &info, &BookSnapshotOutput::Depth10);

        assert!(matches!(result, Some(Data::Depth10(_))));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_sparse_book_snapshot_routes_to_depth10() {
        // Declared depth is 20 but only a single bid level is present
        let json_data = r#"{
            "type": "book_snapshot",
            "symbol": "ETC",
            "exchange": "hyperliquid",
            "name": "book_snapshot_20_10s",
            "depth": 20,
            "interval": 10000,
            "bids": [{"price": 20.002, "amount": 5.81}],
            "asks": [{"price": 20.003, "amount": 162.45}, {}],
            "timestamp": "2025-03-03T10:48:10.000Z",
            "localTimestamp": "2025-03-03T10:48:10.596818Z"
        }"#;
        let snapshot: BookSnapshotMsg = serde_json::from_str(json_data).unwrap();
        let ws_msg = WsMessage::BookSnapshot(snapshot);

        let info = Arc::new(TardisInstrumentMiniInfo::new(
            InstrumentId::from("ETC.HYPERLIQUID"),
            None,
            TardisExchange::Hyperliquid,
            3,
            2,
        ));

        let result = parse_tardis_ws_message(ws_msg, &info, &BookSnapshotOutput::Depth10);

        assert!(matches!(result, Some(Data::Depth10(_))));
    }

    #[rstest]
    fn test_parse_tardis_ws_message_book_snapshot_routes_to_deltas() {
        let ws_msg = WsMessage::BookSnapshot(load_book_snapshot());
        let info = bitmex_info(xbtusd_bitmex());

        let result = parse_tardis_ws_message(ws_msg, &info, &BookSnapshotOutput::Deltas);

        assert!(matches!(result, Some(Data::Deltas(_))));
    }

    #[rstest]
    fn test_parse_derivative_ticker_funding_rate() {
        let msg = load_derivative_ticker();
        let instrument_id = btc_perpetual_deribit();

        let funding = parse_derivative_ticker_msg(&msg, instrument_id)
            .unwrap()
            .expect("funding rate should be present");

        assert_eq!(funding.instrument_id, instrument_id);
        assert_eq!(funding.rate.to_string(), "-0.00001568");
        assert!(funding.ts_event.as_u64() > 0);
        assert!(funding.ts_init.as_u64() > 0);
    }

    #[rstest]
    fn test_parse_derivative_ticker_mark_price() {
        let msg = load_derivative_ticker();
        let instrument_id = btc_perpetual_deribit();
        let price_precision = 2;

        let mark = parse_derivative_ticker_mark_price(&msg, instrument_id, price_precision)
            .unwrap()
            .expect("mark price should be present");

        assert_eq!(mark.instrument_id, instrument_id);
        assert_eq!(mark.value, Price::new(7987.56, price_precision));
        assert!(mark.ts_event.as_u64() > 0);
        assert!(mark.ts_init.as_u64() > 0);
    }

    #[rstest]
    fn test_parse_derivative_ticker_index_price() {
        let msg = load_derivative_ticker();
        let instrument_id = btc_perpetual_deribit();
        let price_precision = 2;

        let index = parse_derivative_ticker_index_price(&msg, instrument_id, price_precision)
            .unwrap()
            .expect("index price should be present");

        assert_eq!(index.instrument_id, instrument_id);
        assert_eq!(index.value, Price::new(7989.28, price_precision));
        assert!(index.ts_event.as_u64() > 0);
        assert!(index.ts_init.as_u64() > 0);
    }

    #[rstest]
    fn test_parse_derivative_ticker_missing_fields() {
        // Only the funding rate is populated; mark and index prices are null
        let json = r#"{
            "type": "derivative_ticker",
            "symbol": "BTCUSD",
            "exchange": "bitmex",
            "lastPrice": null,
            "openInterest": null,
            "fundingRate": 0.0001,
            "indexPrice": null,
            "markPrice": null,
            "timestamp": "2024-01-01T00:00:00.000Z",
            "localTimestamp": "2024-01-01T00:00:00.100Z"
        }"#;
        let msg: DerivativeTickerMsg = serde_json::from_str(json).unwrap();
        let instrument_id = InstrumentId::from("BTCUSD.BITMEX");

        let funding = parse_derivative_ticker_msg(&msg, instrument_id).unwrap();
        assert!(funding.is_some());

        let mark = parse_derivative_ticker_mark_price(&msg, instrument_id, 1).unwrap();
        assert!(mark.is_none());

        let index = parse_derivative_ticker_index_price(&msg, instrument_id, 1).unwrap();
        assert!(index.is_none());
    }
}