1use std::{ffi::c_char, num::NonZeroUsize};
48
49use databento::dbn;
50use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
51use nautilus_model::{
52 data::{
53 Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
54 OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
55 },
56 enums::{
57 AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
58 InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
59 },
60 identifiers::{InstrumentId, Symbol, TradeId},
61 instruments::{
62 Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
63 },
64 types::{
65 Currency, Price, Quantity,
66 price::{PRICE_UNDEF, decode_raw_price_i64},
67 },
68};
69use ustr::Ustr;
70
71use super::{
72 enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
73 types::{DatabentoImbalance, DatabentoStatistics},
74};
75
/// Step of one, shared by every bar specification defined below.
const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();

/// Bar specification for one-second bars on the last price.
const BAR_SPEC_1S: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Second,
    price_type: PriceType::Last,
};
/// Bar specification for one-minute bars on the last price.
const BAR_SPEC_1M: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Minute,
    price_type: PriceType::Last,
};
/// Bar specification for one-hour bars on the last price.
const BAR_SPEC_1H: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Hour,
    price_type: PriceType::Last,
};
/// Bar specification for one-day bars on the last price.
const BAR_SPEC_1D: BarSpecification = BarSpecification {
    step: STEP_ONE,
    aggregation: BarAggregation::Day,
    price_type: PriceType::Last,
};

// Bar lengths in nanoseconds; `decode_ohlcv_msg` adds the matching value to
// the record's `ts_event` to obtain the bar close time.
/// Nanoseconds in one second.
const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
/// Nanoseconds in one minute.
const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
/// Nanoseconds in one hour.
const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
/// Nanoseconds in one day (24 hours).
const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;

/// Initial hash state used by `derive_cmbp_trade_id` (64-bit FNV offset basis).
const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
/// Multiplier applied by `fnv1a_mix` after each byte is XOR-ed into the hash.
const FNV_PRIME: u64 = 0x0100_0000_01b3;
107
/// Interprets a single-character flag as an optional boolean.
///
/// `'Y'` maps to `Some(true)`, `'N'` maps to `Some(false)`, and any other
/// character maps to `None`.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
116
117#[must_use]
118pub const fn parse_order_side(c: c_char) -> OrderSide {
119 match c as u8 as char {
120 'A' => OrderSide::Sell,
121 'B' => OrderSide::Buy,
122 _ => OrderSide::NoOrderSide,
123 }
124}
125
126#[must_use]
127pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
128 match c as u8 as char {
129 'A' => AggressorSide::Seller,
130 'B' => AggressorSide::Buyer,
131 _ => AggressorSide::NoAggressor,
132 }
133}
134
135fn fnv1a_mix(hash: &mut u64, bytes: &[u8]) {
136 for &byte in bytes {
137 *hash ^= u64::from(byte);
138 *hash = hash.wrapping_mul(FNV_PRIME);
139 }
140 *hash ^= 0xff;
141 *hash = hash.wrapping_mul(FNV_PRIME);
142}
143
144fn derive_cmbp_trade_id(
150 instrument_id: InstrumentId,
151 ts_event: u64,
152 ts_recv: u64,
153 price: i64,
154 size: u32,
155 side: c_char,
156) -> TradeId {
157 let mut hash: u64 = FNV_OFFSET_BASIS;
158 fnv1a_mix(&mut hash, instrument_id.to_string().as_bytes());
159 fnv1a_mix(&mut hash, &ts_event.to_le_bytes());
160 fnv1a_mix(&mut hash, &ts_recv.to_le_bytes());
161 fnv1a_mix(&mut hash, &price.to_le_bytes());
162 fnv1a_mix(&mut hash, &size.to_le_bytes());
163 fnv1a_mix(&mut hash, &[side as u8]);
164 TradeId::new(format!("{hash:016x}"))
165}
166
167pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
173 match c as u8 as char {
174 'A' => Ok(BookAction::Add),
175 'C' => Ok(BookAction::Delete),
176 'F' => Ok(BookAction::Update),
177 'M' => Ok(BookAction::Update),
178 'R' => Ok(BookAction::Clear),
179 invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
180 }
181}
182
183pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
189 match c as u8 as char {
190 'C' => Ok(OptionKind::Call),
191 'P' => Ok(OptionKind::Put),
192 invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
193 }
194}
195
196fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
197 match value {
198 Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
199 log::warn!("Unknown currency code '{value}', defaulting to USD");
200 Currency::USD()
201 }),
202 Ok(_) => Currency::USD(),
203 Err(e) => {
204 log::error!("Error parsing currency: {e}");
205 Currency::USD()
206 }
207 }
208}
209
210#[must_use]
214pub fn parse_cfi_iso10926(value: &str) -> (Option<AssetClass>, Option<InstrumentClass>) {
215 let chars: Vec<char> = value.chars().collect();
216 if chars.len() < 3 {
217 return (None, None);
218 }
219
220 let cfi_category = chars[0];
222 let cfi_group = chars[1];
223 let cfi_attribute1 = chars[2];
224 let mut asset_class = match cfi_category {
229 'D' => Some(AssetClass::Debt),
230 'E' => Some(AssetClass::Equity),
231 'S' => None,
232 _ => None,
233 };
234
235 let instrument_class = match cfi_group {
236 'I' => Some(InstrumentClass::Future),
237 _ => None,
238 };
239
240 if cfi_attribute1 == 'I' {
241 asset_class = Some(AssetClass::Index);
242 }
243
244 (asset_class, instrument_class)
245}
246
247fn decode_underlying(underlying_str: &str, symbol: &Symbol) -> Ustr {
248 if underlying_str.is_empty() {
249 symbol
251 .as_str()
252 .split_whitespace()
253 .next()
254 .map_or_else(|| symbol.inner(), Ustr::from)
255 } else {
256 Ustr::from(underlying_str)
257 }
258}
259
260pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
268 let value_str = match value {
269 0 => return Ok(None),
270 1 => "Scheduled",
271 2 => "Surveillance intervention",
272 3 => "Market event",
273 4 => "Instrument activation",
274 5 => "Instrument expiration",
275 6 => "Recovery in process",
276 10 => "Regulatory",
277 11 => "Administrative",
278 12 => "Non-compliance",
279 13 => "Filings not current",
280 14 => "SEC trading suspension",
281 15 => "New issue",
282 16 => "Issue available",
283 17 => "Issues reviewed",
284 18 => "Filing requirements satisfied",
285 30 => "News pending",
286 31 => "News released",
287 32 => "News and resumption times",
288 33 => "News not forthcoming",
289 40 => "Order imbalance",
290 50 => "LULD pause",
291 60 => "Operational",
292 70 => "Additional information requested",
293 80 => "Merger effective",
294 90 => "ETF",
295 100 => "Corporate action",
296 110 => "New Security offering",
297 120 => "Market wide halt level 1",
298 121 => "Market wide halt level 2",
299 122 => "Market wide halt level 3",
300 123 => "Market wide halt carryover",
301 124 => "Market wide halt resumption",
302 130 => "Quotation not available",
303 invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
304 };
305
306 Ok(Some(Ustr::from(value_str)))
307}
308
309pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
315 let value_str = match value {
316 0 => return Ok(None),
317 1 => "No cancel",
318 2 => "Change trading session",
319 3 => "Implied matching on",
320 4 => "Implied matching off",
321 _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
322 };
323
324 Ok(Some(Ustr::from(value_str)))
325}
326
327#[inline(always)]
336pub fn decode_price(value: i64, precision: u8, field_name: &str) -> anyhow::Result<Price> {
337 if value == i64::MAX {
338 anyhow::bail!("Missing required price for `{field_name}`")
339 } else {
340 Ok(Price::from_raw(decode_raw_price_i64(value), precision))
341 }
342}
343
344#[inline(always)]
349#[must_use]
350pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
351 if value == i64::MAX {
352 None
353 } else {
354 Some(Price::from_raw(decode_raw_price_i64(value), precision))
355 }
356}
357
358#[inline(always)]
363#[must_use]
364pub fn decode_price_or_undef(value: i64, precision: u8) -> Price {
365 if value == i64::MAX {
366 Price::from_raw(PRICE_UNDEF, 0)
367 } else {
368 Price::from_raw(decode_raw_price_i64(value), precision)
369 }
370}
371
/// Infers how many of nine decimal places a raw fixed-point value uses.
///
/// The result is `9` minus the number of trailing decimal zeros in the
/// magnitude of `value`, with at most nine zeros counted. A `value` of zero
/// yields `0`.
#[inline(always)]
#[must_use]
pub fn precision_from_raw(value: i64) -> u8 {
    let mut magnitude = value.unsigned_abs();
    if magnitude == 0 {
        return 0;
    }

    // Each trailing zero stripped means one fewer decimal place is needed
    let mut precision = 9u8;
    while precision > 0 && magnitude.is_multiple_of(10) {
        magnitude /= 10;
        precision -= 1;
    }

    precision
}
391
392#[inline(always)]
398#[must_use]
399pub fn decode_price_increment(value: i64, precision: u8) -> Price {
400 match value {
401 0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
402 _ => {
403 let derived = precision_from_raw(value).max(precision);
404 Price::from_raw(decode_raw_price_i64(value), derived)
405 }
406 }
407}
408
409#[inline(always)]
411#[must_use]
412pub fn decode_quantity(value: u64) -> Quantity {
413 Quantity::from(value)
414}
415
416#[inline(always)]
418#[must_use]
419pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
420 match value {
421 i64::MAX => None,
422 _ => Some(Quantity::from(value)),
423 }
424}
425
426#[inline(always)]
434pub fn decode_timestamp(value: u64, field_name: &str) -> anyhow::Result<UnixNanos> {
435 if value == dbn::UNDEF_TIMESTAMP {
436 anyhow::bail!("Missing required timestamp for `{field_name}`")
437 } else {
438 Ok(UnixNanos::from(value))
439 }
440}
441
442#[inline(always)]
446#[must_use]
447pub fn decode_optional_timestamp(value: u64) -> Option<UnixNanos> {
448 if value == dbn::UNDEF_TIMESTAMP {
449 None
450 } else {
451 Some(UnixNanos::from(value))
452 }
453}
454
455pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
462 const SCALE: u128 = 1_000_000_000;
463
464 match value {
465 0 | i64::MAX => Ok(Quantity::from(1)),
466 v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
467 v => {
468 let abs = v as u128;
471 let int_part = abs / SCALE;
472 let frac_part = abs % SCALE;
473
474 if frac_part == 0 {
477 Ok(Quantity::from(int_part as u64))
479 } else {
480 let mut frac_str = format!("{frac_part:09}");
481 while frac_str.ends_with('0') {
482 frac_str.pop();
483 }
484 let s = format!("{int_part}.{frac_str}");
485 Ok(Quantity::from(s))
486 }
487 }
488 }
489}
490
491#[inline(always)]
493#[must_use]
494pub fn decode_lot_size(value: i32) -> Quantity {
495 match value {
496 0 | i32::MAX => Quantity::from(1),
497 value => Quantity::from(value),
498 }
499}
500
/// Returns `true` when the action character is `'T'`.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    action as u8 == b'T'
}
506
/// Returns `true` when neither the bid nor the ask price is `i64::MAX`,
/// which is treated here as "no price".
#[inline(always)]
#[must_use]
fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
    ![bid_px, ask_px].contains(&i64::MAX)
}
516
517pub fn decode_mbo_msg(
526 msg: &dbn::MboMsg,
527 instrument_id: InstrumentId,
528 price_precision: u8,
529 ts_init: Option<UnixNanos>,
530 include_trades: bool,
531) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
532 let side = parse_order_side(msg.side);
533 if is_trade_msg(msg.action) {
534 if include_trades && msg.size > 0 {
535 let price = decode_price_or_undef(msg.price, price_precision);
536 let size = decode_quantity(msg.size as u64);
537 let aggressor_side = parse_aggressor_side(msg.side);
538 let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
539 let ts_event = msg.ts_recv.into();
540 let ts_init = ts_init.unwrap_or(ts_event);
541
542 let trade = TradeTick::new(
543 instrument_id,
544 price,
545 size,
546 aggressor_side,
547 trade_id,
548 ts_event,
549 ts_init,
550 );
551 return Ok((None, Some(trade)));
552 }
553
554 return Ok((None, None));
555 }
556
557 let action = parse_book_action(msg.action)?;
558 let price = decode_price_or_undef(msg.price, price_precision);
559 let size = decode_quantity(msg.size as u64);
560 let order = BookOrder::new(side, price, size, msg.order_id);
561
562 let ts_event = msg.ts_recv.into();
563 let ts_init = ts_init.unwrap_or(ts_event);
564
565 let delta = OrderBookDelta::new(
566 instrument_id,
567 action,
568 order,
569 msg.flags.raw(),
570 msg.sequence.into(),
571 ts_event,
572 ts_init,
573 );
574
575 Ok((Some(delta), None))
576}
577
578pub fn decode_trade_msg(
584 msg: &dbn::TradeMsg,
585 instrument_id: InstrumentId,
586 price_precision: u8,
587 ts_init: Option<UnixNanos>,
588) -> anyhow::Result<TradeTick> {
589 let ts_event = msg.ts_recv.into();
590 let ts_init = ts_init.unwrap_or(ts_event);
591
592 let trade = TradeTick::new(
593 instrument_id,
594 decode_price_or_undef(msg.price, price_precision),
595 decode_quantity(msg.size as u64),
596 parse_aggressor_side(msg.side),
597 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
598 ts_event,
599 ts_init,
600 );
601
602 Ok(trade)
603}
604
605pub fn decode_tbbo_msg(
614 msg: &dbn::TbboMsg,
615 instrument_id: InstrumentId,
616 price_precision: u8,
617 ts_init: Option<UnixNanos>,
618) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
619 let top_level = &msg.levels[0];
620 let ts_event = msg.ts_recv.into();
621 let ts_init = ts_init.unwrap_or(ts_event);
622
623 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
624 Some(QuoteTick::new(
625 instrument_id,
626 decode_price_or_undef(top_level.bid_px, price_precision),
627 decode_price_or_undef(top_level.ask_px, price_precision),
628 decode_quantity(top_level.bid_sz as u64),
629 decode_quantity(top_level.ask_sz as u64),
630 ts_event,
631 ts_init,
632 ))
633 } else {
634 None
635 };
636
637 let trade = TradeTick::new(
638 instrument_id,
639 decode_price_or_undef(msg.price, price_precision),
640 decode_quantity(msg.size as u64),
641 parse_aggressor_side(msg.side),
642 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
643 ts_event,
644 ts_init,
645 );
646
647 Ok((maybe_quote, trade))
648}
649
650pub fn decode_mbp1_msg(
658 msg: &dbn::Mbp1Msg,
659 instrument_id: InstrumentId,
660 price_precision: u8,
661 ts_init: Option<UnixNanos>,
662 include_trades: bool,
663) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
664 let top_level = &msg.levels[0];
665 let ts_event = msg.ts_recv.into();
666 let ts_init = ts_init.unwrap_or(ts_event);
667
668 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
669 Some(QuoteTick::new(
670 instrument_id,
671 decode_price_or_undef(top_level.bid_px, price_precision),
672 decode_price_or_undef(top_level.ask_px, price_precision),
673 decode_quantity(top_level.bid_sz as u64),
674 decode_quantity(top_level.ask_sz as u64),
675 ts_event,
676 ts_init,
677 ))
678 } else {
679 None
680 };
681
682 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
683 Some(TradeTick::new(
684 instrument_id,
685 decode_price_or_undef(msg.price, price_precision),
686 decode_quantity(msg.size as u64),
687 parse_aggressor_side(msg.side),
688 TradeId::new(itoa::Buffer::new().format(msg.sequence)),
689 ts_event,
690 ts_init,
691 ))
692 } else {
693 None
694 };
695
696 Ok((maybe_quote, maybe_trade))
697}
698
699pub fn decode_bbo_msg(
707 msg: &dbn::BboMsg,
708 instrument_id: InstrumentId,
709 price_precision: u8,
710 ts_init: Option<UnixNanos>,
711) -> anyhow::Result<Option<QuoteTick>> {
712 let top_level = &msg.levels[0];
713 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
714 return Ok(None);
715 }
716
717 let ts_event = msg.ts_recv.into();
718 let ts_init = ts_init.unwrap_or(ts_event);
719
720 let quote = QuoteTick::new(
721 instrument_id,
722 decode_price_or_undef(top_level.bid_px, price_precision),
723 decode_price_or_undef(top_level.ask_px, price_precision),
724 decode_quantity(top_level.bid_sz as u64),
725 decode_quantity(top_level.ask_sz as u64),
726 ts_event,
727 ts_init,
728 );
729
730 Ok(Some(quote))
731}
732
733pub fn decode_mbp10_msg(
739 msg: &dbn::Mbp10Msg,
740 instrument_id: InstrumentId,
741 price_precision: u8,
742 ts_init: Option<UnixNanos>,
743) -> anyhow::Result<OrderBookDepth10> {
744 let mut bids = Vec::with_capacity(DEPTH10_LEN);
745 let mut asks = Vec::with_capacity(DEPTH10_LEN);
746 let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
747 let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
748
749 for level in &msg.levels {
750 let bid_order = BookOrder::new(
751 OrderSide::Buy,
752 decode_price_or_undef(level.bid_px, price_precision),
753 decode_quantity(level.bid_sz as u64),
754 0,
755 );
756
757 let ask_order = BookOrder::new(
758 OrderSide::Sell,
759 decode_price_or_undef(level.ask_px, price_precision),
760 decode_quantity(level.ask_sz as u64),
761 0,
762 );
763
764 bids.push(bid_order);
765 asks.push(ask_order);
766 bid_counts.push(level.bid_ct);
767 ask_counts.push(level.ask_ct);
768 }
769
770 let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
771 anyhow::anyhow!(
772 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
773 v.len()
774 )
775 })?;
776
777 let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
778 anyhow::anyhow!(
779 "Expected exactly {DEPTH10_LEN} ask levels, received {}",
780 v.len()
781 )
782 })?;
783
784 let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
785 anyhow::anyhow!(
786 "Expected exactly {DEPTH10_LEN} bid counts, received {}",
787 v.len()
788 )
789 })?;
790
791 let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
792 anyhow::anyhow!(
793 "Expected exactly {DEPTH10_LEN} ask counts, received {}",
794 v.len()
795 )
796 })?;
797
798 let ts_event = msg.ts_recv.into();
799 let ts_init = ts_init.unwrap_or(ts_event);
800
801 let depth = OrderBookDepth10::new(
802 instrument_id,
803 bids,
804 asks,
805 bid_counts,
806 ask_counts,
807 msg.flags.raw(),
808 msg.sequence.into(),
809 ts_event,
810 ts_init,
811 );
812
813 Ok(depth)
814}
815
816pub fn decode_cmbp1_msg(
825 msg: &dbn::Cmbp1Msg,
826 instrument_id: InstrumentId,
827 price_precision: u8,
828 ts_init: Option<UnixNanos>,
829 include_trades: bool,
830) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
831 let top_level = &msg.levels[0];
832 let ts_event = msg.ts_recv.into();
833 let ts_init = ts_init.unwrap_or(ts_event);
834
835 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
836 Some(QuoteTick::new(
837 instrument_id,
838 decode_price_or_undef(top_level.bid_px, price_precision),
839 decode_price_or_undef(top_level.ask_px, price_precision),
840 decode_quantity(top_level.bid_sz as u64),
841 decode_quantity(top_level.ask_sz as u64),
842 ts_event,
843 ts_init,
844 ))
845 } else {
846 None
847 };
848
849 let maybe_trade = if include_trades && is_trade_msg(msg.action) {
850 let trade_id = derive_cmbp_trade_id(
852 instrument_id,
853 msg.hd.ts_event,
854 msg.ts_recv,
855 msg.price,
856 msg.size,
857 msg.side,
858 );
859 Some(TradeTick::new(
860 instrument_id,
861 decode_price_or_undef(msg.price, price_precision),
862 decode_quantity(msg.size as u64),
863 parse_aggressor_side(msg.side),
864 trade_id,
865 ts_event,
866 ts_init,
867 ))
868 } else {
869 None
870 };
871
872 Ok((maybe_quote, maybe_trade))
873}
874
875pub fn decode_cbbo_msg(
883 msg: &dbn::CbboMsg,
884 instrument_id: InstrumentId,
885 price_precision: u8,
886 ts_init: Option<UnixNanos>,
887) -> anyhow::Result<Option<QuoteTick>> {
888 let top_level = &msg.levels[0];
889 if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
890 return Ok(None);
891 }
892
893 let ts_event = msg.ts_recv.into();
894 let ts_init = ts_init.unwrap_or(ts_event);
895
896 let quote = QuoteTick::new(
897 instrument_id,
898 decode_price_or_undef(top_level.bid_px, price_precision),
899 decode_price_or_undef(top_level.ask_px, price_precision),
900 decode_quantity(top_level.bid_sz as u64),
901 decode_quantity(top_level.ask_sz as u64),
902 ts_event,
903 ts_init,
904 );
905
906 Ok(Some(quote))
907}
908
909pub fn decode_tcbbo_msg(
918 msg: &dbn::CbboMsg,
919 instrument_id: InstrumentId,
920 price_precision: u8,
921 ts_init: Option<UnixNanos>,
922) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
923 let top_level = &msg.levels[0];
924 let ts_event = msg.ts_recv.into();
925 let ts_init = ts_init.unwrap_or(ts_event);
926
927 let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
928 Some(QuoteTick::new(
929 instrument_id,
930 decode_price_or_undef(top_level.bid_px, price_precision),
931 decode_price_or_undef(top_level.ask_px, price_precision),
932 decode_quantity(top_level.bid_sz as u64),
933 decode_quantity(top_level.ask_sz as u64),
934 ts_event,
935 ts_init,
936 ))
937 } else {
938 None
939 };
940
941 let trade_id = derive_cmbp_trade_id(
943 instrument_id,
944 msg.hd.ts_event,
945 msg.ts_recv,
946 msg.price,
947 msg.size,
948 msg.side,
949 );
950 let trade = TradeTick::new(
951 instrument_id,
952 decode_price_or_undef(msg.price, price_precision),
953 decode_quantity(msg.size as u64),
954 parse_aggressor_side(msg.side),
955 trade_id,
956 ts_event,
957 ts_init,
958 );
959
960 Ok((maybe_quote, trade))
961}
962
963pub fn decode_bar_type(
967 msg: &dbn::OhlcvMsg,
968 instrument_id: InstrumentId,
969) -> anyhow::Result<BarType> {
970 let bar_type = match msg.hd.rtype {
971 32 => {
972 BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
974 }
975 33 => {
976 BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
978 }
979 34 => {
980 BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
982 }
983 35 => {
984 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
986 }
987 36 => {
988 BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
990 }
991 _ => anyhow::bail!(
992 "`rtype` is not a supported bar aggregation, was {}",
993 msg.hd.rtype
994 ),
995 };
996
997 Ok(bar_type)
998}
999
1000pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
1004 let adjustment = match msg.hd.rtype {
1005 32 => {
1006 BAR_CLOSE_ADJUSTMENT_1S
1008 }
1009 33 => {
1010 BAR_CLOSE_ADJUSTMENT_1M
1012 }
1013 34 => {
1014 BAR_CLOSE_ADJUSTMENT_1H
1016 }
1017 35 | 36 => {
1018 BAR_CLOSE_ADJUSTMENT_1D
1020 }
1021 _ => anyhow::bail!(
1022 "`rtype` is not a supported bar aggregation, was {}",
1023 msg.hd.rtype
1024 ),
1025 };
1026
1027 Ok(adjustment.into())
1028}
1029
1030pub fn decode_ohlcv_msg(
1034 msg: &dbn::OhlcvMsg,
1035 instrument_id: InstrumentId,
1036 price_precision: u8,
1037 ts_init: Option<UnixNanos>,
1038 timestamp_on_close: bool,
1039) -> anyhow::Result<Bar> {
1040 let bar_type = decode_bar_type(msg, instrument_id)?;
1041 let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
1042
1043 let ts_event_raw = msg.hd.ts_event.into();
1044 let ts_close = ts_event_raw + ts_event_adjustment;
1045 let ts_init = ts_init.unwrap_or(ts_close); let ts_event = if timestamp_on_close {
1048 ts_close
1049 } else {
1050 ts_event_raw
1051 };
1052
1053 let bar = Bar::new(
1054 bar_type,
1055 decode_price_or_undef(msg.open, price_precision),
1056 decode_price_or_undef(msg.high, price_precision),
1057 decode_price_or_undef(msg.low, price_precision),
1058 decode_price_or_undef(msg.close, price_precision),
1059 decode_quantity(msg.volume),
1060 ts_event,
1061 ts_init,
1062 );
1063
1064 Ok(bar)
1065}
1066
1067pub fn decode_status_msg(
1073 msg: &dbn::StatusMsg,
1074 instrument_id: InstrumentId,
1075 ts_init: Option<UnixNanos>,
1076) -> anyhow::Result<InstrumentStatus> {
1077 let ts_event = msg.hd.ts_event.into();
1078 let ts_init = ts_init.unwrap_or(ts_event);
1079
1080 let action = MarketStatusAction::from_u16(msg.action)
1081 .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
1082
1083 let status = InstrumentStatus::new(
1084 instrument_id,
1085 action,
1086 ts_event,
1087 ts_init,
1088 parse_status_reason(msg.reason)?,
1089 parse_status_trading_event(msg.trading_event)?,
1090 parse_optional_bool(msg.is_trading),
1091 parse_optional_bool(msg.is_quoting),
1092 parse_optional_bool(msg.is_short_sell_restricted),
1093 );
1094
1095 Ok(status)
1096}
1097
1098pub fn decode_record(
1102 record: &dbn::RecordRef,
1103 instrument_id: InstrumentId,
1104 price_precision: u8,
1105 ts_init: Option<UnixNanos>,
1106 include_trades: bool,
1107 bars_timestamp_on_close: bool,
1108) -> anyhow::Result<(Option<Data>, Option<Data>)> {
1109 let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
1113 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1114 let result = decode_mbo_msg(
1115 msg,
1116 instrument_id,
1117 price_precision,
1118 Some(ts_init),
1119 include_trades,
1120 )?;
1121
1122 match result {
1123 (Some(delta), None) => (Some(Data::Delta(delta)), None),
1124 (None, Some(trade)) => (Some(Data::Trade(trade)), None),
1125 (None, None) => (None, None),
1126 _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
1127 }
1128 } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
1129 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1130 let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1131 (Some(Data::Trade(trade)), None)
1132 } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
1133 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1134 let (maybe_quote, maybe_trade) = decode_mbp1_msg(
1135 msg,
1136 instrument_id,
1137 price_precision,
1138 Some(ts_init),
1139 include_trades,
1140 )?;
1141 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1142 } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
1143 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1144 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1145 (maybe_quote.map(Data::Quote), None)
1146 } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
1147 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1148 let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1149 (maybe_quote.map(Data::Quote), None)
1150 } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
1151 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1152 let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1153 (Some(Data::from(depth)), None)
1154 } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1155 let bar = decode_ohlcv_msg(
1158 msg,
1159 instrument_id,
1160 price_precision,
1161 ts_init,
1162 bars_timestamp_on_close,
1163 )?;
1164 (Some(Data::Bar(bar)), None)
1165 } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1166 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1167 let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1168 msg,
1169 instrument_id,
1170 price_precision,
1171 Some(ts_init),
1172 include_trades,
1173 )?;
1174 (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1175 } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1176 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1178 let (maybe_quote, trade) =
1179 decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1180 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1181 } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1182 if msg.price != i64::MAX && msg.size > 0 {
1184 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1186 let (maybe_quote, trade) =
1187 decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1188 (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1189 } else {
1190 let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1192 let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1193 (maybe_quote.map(Data::Quote), None)
1194 }
1195 } else {
1196 anyhow::bail!("DBN message type is not currently supported")
1197 };
1198
1199 Ok(result)
1200}
1201
1202const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1203 match ts_init {
1204 Some(ts_init) => ts_init,
1205 None => msg_timestamp,
1206 }
1207}
1208
1209pub fn decode_instrument_def_msg(
1213 msg: &dbn::InstrumentDefMsg,
1214 instrument_id: InstrumentId,
1215 ts_init: Option<UnixNanos>,
1216) -> anyhow::Result<InstrumentAny> {
1217 match msg.instrument_class as u8 as char {
1218 'K' => Ok(InstrumentAny::Equity(decode_equity(
1219 msg,
1220 instrument_id,
1221 ts_init,
1222 )?)),
1223 'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1224 msg,
1225 instrument_id,
1226 ts_init,
1227 )?)),
1228 'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1229 msg,
1230 instrument_id,
1231 ts_init,
1232 )?)),
1233 'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1234 msg,
1235 instrument_id,
1236 ts_init,
1237 )?)),
1238 'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1239 msg,
1240 instrument_id,
1241 ts_init,
1242 )?)),
1243 'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1244 'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1245 _ => anyhow::bail!(
1246 "Unsupported `instrument_class` '{}'",
1247 msg.instrument_class as u8 as char
1248 ),
1249 }
1250}
1251
1252pub fn decode_equity(
1258 msg: &dbn::InstrumentDefMsg,
1259 instrument_id: InstrumentId,
1260 ts_init: Option<UnixNanos>,
1261) -> anyhow::Result<Equity> {
1262 let currency = parse_currency_or_usd_default(msg.currency());
1263 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1264 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1265 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1267
1268 Ok(Equity::new(
1269 instrument_id,
1270 instrument_id.symbol,
1271 None, currency,
1273 price_increment.precision,
1274 price_increment,
1275 Some(lot_size),
1276 None, None, None, None, None, None, None, None, None, ts_event,
1286 ts_init,
1287 ))
1288}
1289
1290pub fn decode_futures_contract(
1296 msg: &dbn::InstrumentDefMsg,
1297 instrument_id: InstrumentId,
1298 ts_init: Option<UnixNanos>,
1299) -> anyhow::Result<FuturesContract> {
1300 let currency = parse_currency_or_usd_default(msg.currency());
1301 let exchange = Ustr::from(msg.exchange()?);
1302 let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1303 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1304 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1305 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1306 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1307 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1309
1310 Ok(FuturesContract::new_checked(
1311 instrument_id,
1312 instrument_id.symbol,
1313 asset_class.unwrap_or(AssetClass::Commodity),
1314 Some(exchange),
1315 underlying,
1316 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1317 decode_timestamp(msg.expiration, "expiration")?,
1318 currency,
1319 price_increment.precision,
1320 price_increment,
1321 multiplier,
1322 lot_size,
1323 None, None, None, None, None, None, None, None, None, ts_event,
1333 ts_init,
1334 )?)
1335}
1336
1337pub fn decode_futures_spread(
1343 msg: &dbn::InstrumentDefMsg,
1344 instrument_id: InstrumentId,
1345 ts_init: Option<UnixNanos>,
1346) -> anyhow::Result<FuturesSpread> {
1347 let exchange = Ustr::from(msg.exchange()?);
1348 let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1349 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1350 let strategy_type = Ustr::from(msg.secsubtype()?);
1351 let currency = parse_currency_or_usd_default(msg.currency());
1352 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1353 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1354 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1355 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1357
1358 Ok(FuturesSpread::new_checked(
1359 instrument_id,
1360 instrument_id.symbol,
1361 asset_class.unwrap_or(AssetClass::Commodity),
1362 Some(exchange),
1363 underlying,
1364 strategy_type,
1365 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1366 decode_timestamp(msg.expiration, "expiration")?,
1367 currency,
1368 price_increment.precision,
1369 price_increment,
1370 multiplier,
1371 lot_size,
1372 None, None, None, None, None, None, None, None, None, ts_event,
1382 ts_init,
1383 )?)
1384}
1385
1386pub fn decode_option_contract(
1392 msg: &dbn::InstrumentDefMsg,
1393 instrument_id: InstrumentId,
1394 ts_init: Option<UnixNanos>,
1395) -> anyhow::Result<OptionContract> {
1396 let currency = parse_currency_or_usd_default(msg.currency());
1397 let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1398 let exchange = Ustr::from(msg.exchange()?);
1399 let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1400 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1401 Some(AssetClass::Equity)
1402 } else {
1403 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1404 asset_class
1405 };
1406 let option_kind = parse_option_kind(msg.instrument_class)?;
1407 let strike_price = decode_price(
1408 msg.strike_price,
1409 strike_price_currency.precision,
1410 "strike_price",
1411 )?;
1412 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1413 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1414 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1415 let ts_event = UnixNanos::from(msg.ts_recv); let ts_init = ts_init.unwrap_or(ts_event);
1417
1418 Ok(OptionContract::new_checked(
1419 instrument_id,
1420 instrument_id.symbol,
1421 asset_class_opt.unwrap_or(AssetClass::Commodity),
1422 Some(exchange),
1423 underlying,
1424 option_kind,
1425 strike_price,
1426 currency,
1427 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1428 decode_timestamp(msg.expiration, "expiration")?,
1429 price_increment.precision,
1430 price_increment,
1431 multiplier,
1432 lot_size,
1433 None, None, None, None, None, None, None, None, None, ts_event,
1443 ts_init,
1444 )?)
1445}
1446
1447pub fn decode_option_spread(
1453 msg: &dbn::InstrumentDefMsg,
1454 instrument_id: InstrumentId,
1455 ts_init: Option<UnixNanos>,
1456) -> anyhow::Result<OptionSpread> {
1457 let exchange = Ustr::from(msg.exchange()?);
1458 let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1459 let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1460 Some(AssetClass::Equity)
1461 } else {
1462 let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1463 asset_class
1464 };
1465 let strategy_type = Ustr::from(msg.secsubtype()?);
1466 let currency = parse_currency_or_usd_default(msg.currency());
1467 let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1468 let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1469 let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1470 let ts_event = msg.ts_recv.into(); let ts_init = ts_init.unwrap_or(ts_event);
1472
1473 Ok(OptionSpread::new_checked(
1474 instrument_id,
1475 instrument_id.symbol,
1476 asset_class_opt.unwrap_or(AssetClass::Commodity),
1477 Some(exchange),
1478 underlying,
1479 strategy_type,
1480 decode_optional_timestamp(msg.activation).unwrap_or_default(),
1481 decode_timestamp(msg.expiration, "expiration")?,
1482 currency,
1483 price_increment.precision,
1484 price_increment,
1485 multiplier,
1486 lot_size,
1487 None, None, None, None, None, None, None, None, None, ts_event,
1497 ts_init,
1498 )?)
1499}
1500
1501pub fn decode_imbalance_msg(
1507 msg: &dbn::ImbalanceMsg,
1508 instrument_id: InstrumentId,
1509 price_precision: u8,
1510 ts_init: Option<UnixNanos>,
1511) -> anyhow::Result<DatabentoImbalance> {
1512 let ts_event = msg.ts_recv.into();
1513 let ts_init = ts_init.unwrap_or(ts_event);
1514
1515 Ok(DatabentoImbalance::new(
1516 instrument_id,
1517 decode_price_or_undef(msg.ref_price, price_precision),
1518 decode_price_or_undef(msg.cont_book_clr_price, price_precision),
1519 decode_price_or_undef(msg.auct_interest_clr_price, price_precision),
1520 Quantity::new(f64::from(msg.paired_qty), 0),
1521 Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1522 parse_order_side(msg.side),
1523 msg.significant_imbalance as c_char,
1524 msg.hd.ts_event.into(),
1525 ts_event,
1526 ts_init,
1527 ))
1528}
1529
1530pub fn decode_statistics_msg(
1537 msg: &dbn::StatMsg,
1538 instrument_id: InstrumentId,
1539 price_precision: u8,
1540 ts_init: Option<UnixNanos>,
1541) -> anyhow::Result<DatabentoStatistics> {
1542 let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1543 .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1544 let update_action =
1545 DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1546 anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1547 })?;
1548 let ts_event = msg.ts_recv.into();
1549 let ts_init = ts_init.unwrap_or(ts_event);
1550
1551 Ok(DatabentoStatistics::new(
1552 instrument_id,
1553 stat_type,
1554 update_action,
1555 decode_optional_price(msg.price, price_precision),
1556 decode_optional_quantity(msg.quantity),
1557 msg.channel_id,
1558 msg.stat_flags,
1559 msg.sequence,
1560 msg.ts_ref.into(),
1561 msg.ts_in_delta,
1562 msg.hd.ts_event.into(),
1563 ts_event,
1564 ts_init,
1565 ))
1566}
1567
1568#[cfg(test)]
1569mod tests {
1570 use std::path::{Path, PathBuf};
1571
1572 use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1573 use fallible_streaming_iterator::FallibleStreamingIterator;
1574 use nautilus_model::instruments::Instrument;
1575 use rstest::*;
1576
1577 use super::*;
1578
1579 fn test_data_path() -> PathBuf {
1580 Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1581 }
1582
1583 #[rstest]
1584 #[case('Y' as c_char, Some(true))]
1585 #[case('N' as c_char, Some(false))]
1586 #[case('X' as c_char, None)]
1587 fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1588 assert_eq!(parse_optional_bool(input), expected);
1589 }
1590
1591 #[rstest]
1592 #[case('A' as c_char, OrderSide::Sell)]
1593 #[case('B' as c_char, OrderSide::Buy)]
1594 #[case('X' as c_char, OrderSide::NoOrderSide)]
1595 fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1596 assert_eq!(parse_order_side(input), expected);
1597 }
1598
1599 #[rstest]
1600 #[case('A' as c_char, AggressorSide::Seller)]
1601 #[case('B' as c_char, AggressorSide::Buyer)]
1602 #[case('X' as c_char, AggressorSide::NoAggressor)]
1603 fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1604 assert_eq!(parse_aggressor_side(input), expected);
1605 }
1606
1607 #[rstest]
1608 #[case('T' as c_char, true)]
1609 #[case('A' as c_char, false)]
1610 #[case('C' as c_char, false)]
1611 #[case('F' as c_char, false)]
1612 #[case('M' as c_char, false)]
1613 #[case('R' as c_char, false)]
1614 fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1615 assert_eq!(is_trade_msg(action), expected);
1616 }
1617
1618 #[rstest]
1619 fn test_derive_cmbp_trade_id_is_deterministic() {
1620 let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1621 let first = derive_cmbp_trade_id(instrument_id, 1, 2, 100, 5, 'B' as c_char);
1622 let second = derive_cmbp_trade_id(instrument_id, 1, 2, 100, 5, 'B' as c_char);
1623 assert_eq!(first, second);
1624 }
1625
1626 #[rstest]
1627 fn test_derive_cmbp_trade_id_format_is_16_hex_chars() {
1628 let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1629 let trade_id = derive_cmbp_trade_id(instrument_id, 0, 0, 0, 0, 'B' as c_char);
1630 let value = trade_id.as_str();
1631 assert_eq!(value.len(), 16);
1632 assert!(
1633 value
1634 .chars()
1635 .all(|c| c.is_ascii_hexdigit() && !c.is_uppercase())
1636 );
1637 }
1638
1639 #[rstest]
1640 #[case::ts_event_changed(
1641 derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 2, 2, 100, 5, 'B' as c_char),
1642 )]
1643 #[case::ts_recv_changed(
1644 derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 3, 100, 5, 'B' as c_char),
1645 )]
1646 #[case::price_changed(
1647 derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 2, 101, 5, 'B' as c_char),
1648 )]
1649 #[case::size_changed(
1650 derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 2, 100, 6, 'B' as c_char),
1651 )]
1652 #[case::side_changed(
1653 derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 2, 100, 5, 'A' as c_char),
1654 )]
1655 #[case::instrument_changed(
1656 derive_cmbp_trade_id(InstrumentId::from("NQ.c.0.GLBX"), 1, 2, 100, 5, 'B' as c_char),
1657 )]
1658 fn test_derive_cmbp_trade_id_each_field_affects_output(#[case] altered: TradeId) {
1659 let baseline = derive_cmbp_trade_id(
1660 InstrumentId::from("ES.c.0.GLBX"),
1661 1,
1662 2,
1663 100,
1664 5,
1665 'B' as c_char,
1666 );
1667 assert_ne!(baseline, altered);
1668 }
1669
1670 #[rstest]
1671 fn test_derive_cmbp_trade_id_field_delimiter_prevents_collision() {
1672 let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1673 let a = derive_cmbp_trade_id(instrument_id, 0x100, 0, 0, 0, 'B' as c_char);
1676 let b = derive_cmbp_trade_id(instrument_id, 0, 0x100, 0, 0, 'B' as c_char);
1677 assert_ne!(a, b);
1678 }
1679
    // Property-based checks for `derive_cmbp_trade_id` over randomly generated inputs.
    mod cmbp_trade_id_property_tests {
        use proptest::prelude::*;
        use rstest::rstest;

        use super::*;

        proptest! {
            // Property: calling the function twice with the same generated inputs yields
            // equal trade IDs.
            #[rstest]
            fn prop_derive_cmbp_trade_id_is_stable_for_same_inputs(
                ts_event in any::<u64>(),
                ts_recv in any::<u64>(),
                price in any::<i64>(),
                size in any::<u32>(),
                // Limited to 0..128; presumably so the value fits a signed `c_char` - confirm.
                side_byte in 0u8..128,
            ) {
                let instrument_id = InstrumentId::from("ES.c.0.GLBX");
                let side = side_byte as c_char;

                let first = derive_cmbp_trade_id(
                    instrument_id, ts_event, ts_recv, price, size, side,
                );
                let second = derive_cmbp_trade_id(
                    instrument_id, ts_event, ts_recv, price, size, side,
                );
                prop_assert_eq!(first, second);
            }

            // Property: for any generated inputs the trade ID is 16 characters long and
            // consists only of hexadecimal digits with no uppercase letters.
            #[rstest]
            fn prop_derive_cmbp_trade_id_output_is_16_hex_chars(
                ts_event in any::<u64>(),
                ts_recv in any::<u64>(),
                price in any::<i64>(),
                size in any::<u32>(),
                // Limited to 0..128; presumably so the value fits a signed `c_char` - confirm.
                side_byte in 0u8..128,
            ) {
                let instrument_id = InstrumentId::from("ES.c.0.GLBX");
                let side = side_byte as c_char;
                let id = derive_cmbp_trade_id(
                    instrument_id, ts_event, ts_recv, price, size, side,
                );
                let value = id.as_str();
                prop_assert_eq!(value.len(), 16);
                prop_assert!(value.chars().all(|c| c.is_ascii_hexdigit() && !c.is_uppercase()));
            }
        }
    }
1726
1727 #[rstest]
1728 #[case('A' as c_char, Ok(BookAction::Add))]
1729 #[case('C' as c_char, Ok(BookAction::Delete))]
1730 #[case('F' as c_char, Ok(BookAction::Update))]
1731 #[case('M' as c_char, Ok(BookAction::Update))]
1732 #[case('R' as c_char, Ok(BookAction::Clear))]
1733 #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1734 fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1735 match parse_book_action(input) {
1736 Ok(action) => assert_eq!(Ok(action), expected),
1737 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1738 }
1739 }
1740
1741 #[rstest]
1742 #[case('C' as c_char, Ok(OptionKind::Call))]
1743 #[case('P' as c_char, Ok(OptionKind::Put))]
1744 #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1745 fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1746 match parse_option_kind(input) {
1747 Ok(kind) => assert_eq!(Ok(kind), expected),
1748 Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1749 }
1750 }
1751
1752 #[rstest]
1753 #[case(Ok("USD"), Currency::USD())]
1754 #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1755 #[case(Ok(""), Currency::USD())]
1756 #[case(Err("Error"), Currency::USD())]
1757 fn test_parse_currency_or_usd_default(
1758 #[case] input: Result<&str, &'static str>, #[case] expected: Currency,
1760 ) {
1761 let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1762 assert_eq!(actual, expected);
1763 }
1764
1765 #[rstest]
1766 #[case("DII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1767 #[case("EII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1768 #[case("EIA", (Some(AssetClass::Equity), Some(InstrumentClass::Future)))]
1769 #[case("XXX", (None, None))]
1770 #[case("D", (None, None))]
1771 #[case("", (None, None))]
1772 fn test_parse_cfi_iso10926(
1773 #[case] input: &str,
1774 #[case] expected: (Option<AssetClass>, Option<InstrumentClass>),
1775 ) {
1776 let result = parse_cfi_iso10926(input);
1777 assert_eq!(result, expected);
1778 }
1779
1780 #[rstest]
1781 #[case(0, 2, Price::from_raw(0, 2))]
1782 #[case(
1783 1_000_000_000,
1784 2,
1785 Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1786 )]
1787 fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1788 let actual = decode_price(value, precision, "test_field").unwrap();
1789 assert_eq!(actual, expected);
1790 }
1791
1792 #[rstest]
1793 fn test_decode_price_undefined_errors() {
1794 let result = decode_price(i64::MAX, 2, "strike_price");
1795 assert!(result.is_err());
1796 assert!(result.unwrap_err().to_string().contains("strike_price"));
1797 }
1798
1799 #[rstest]
1800 #[case(0, 0)]
1801 #[case(1, 9)] #[case(10, 8)] #[case(3_906_250, 8)] #[case(7_812_500, 7)] #[case(15_625_000, 6)] #[case(31_250_000, 5)] #[case(250_000_000, 2)] #[case(1_000_000_000, 0)] #[case(10_000_000_000, 0)] fn test_precision_from_raw(#[case] value: i64, #[case] expected: u8) {
1811 assert_eq!(precision_from_raw(value), expected);
1812 }
1813
1814 #[rstest]
1815 #[case(0, 2, Price::new(0.01, 2))] #[case(i64::MAX, 2, Price::new(0.01, 2))] #[case(
1818 10_000_000_000,
1819 2,
1820 Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1821 )] #[case(3_906_250, 2, Price::from_raw(decode_raw_price_i64(3_906_250), 8))] #[case(7_812_500, 2, Price::from_raw(decode_raw_price_i64(7_812_500), 7))] #[case(15_625_000, 2, Price::from_raw(decode_raw_price_i64(15_625_000), 6))] #[case(31_250_000, 2, Price::from_raw(decode_raw_price_i64(31_250_000), 5))] #[case(250_000_000, 2, Price::from_raw(decode_raw_price_i64(250_000_000), 2))] fn test_decode_price_increment(
1828 #[case] value: i64,
1829 #[case] precision: u8,
1830 #[case] expected: Price,
1831 ) {
1832 let actual = decode_price_increment(value, precision);
1833 assert_eq!(actual, expected);
1834 }
1835
1836 #[rstest]
1837 #[case(i64::MAX, 2, None)] #[case(0, 2, Some(Price::from_raw(0, 2)))] #[case(
1840 10_000_000_000,
1841 2,
1842 Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1843 )]
1844 fn test_decode_optional_price(
1845 #[case] value: i64,
1846 #[case] precision: u8,
1847 #[case] expected: Option<Price>,
1848 ) {
1849 let actual = decode_optional_price(value, precision);
1850 assert_eq!(actual, expected);
1851 }
1852
1853 #[rstest]
1854 #[case(0, 2, Price::from_raw(0, 2))]
1855 #[case(
1856 1_000_000_000,
1857 2,
1858 Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1859 )]
1860 #[case(i64::MAX, 2, Price::from_raw(PRICE_UNDEF, 0))] fn test_decode_price_or_undef(
1862 #[case] value: i64,
1863 #[case] precision: u8,
1864 #[case] expected: Price,
1865 ) {
1866 let actual = decode_price_or_undef(value, precision);
1867 assert_eq!(actual, expected);
1868 }
1869
1870 #[rstest]
1871 #[case(i64::MAX, None)] #[case(0, Some(Quantity::new(0.0, 0)))] #[case(10, Some(Quantity::new(10.0, 0)))] fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1875 let actual = decode_optional_quantity(value);
1876 assert_eq!(actual, expected);
1877 }
1878
1879 #[rstest]
1880 #[case(0, UnixNanos::from(0))]
1881 #[case(1_000_000_000, UnixNanos::from(1_000_000_000))]
1882 fn test_decode_timestamp(#[case] value: u64, #[case] expected: UnixNanos) {
1883 let actual = decode_timestamp(value, "test_field").unwrap();
1884 assert_eq!(actual, expected);
1885 }
1886
1887 #[rstest]
1888 fn test_decode_timestamp_undefined_errors() {
1889 let result = decode_timestamp(dbn::UNDEF_TIMESTAMP, "expiration");
1890 assert!(result.is_err());
1891 assert!(result.unwrap_err().to_string().contains("expiration"));
1892 }
1893
1894 #[rstest]
1895 #[case(0, Some(UnixNanos::from(0)))]
1896 #[case(1_000_000_000, Some(UnixNanos::from(1_000_000_000)))]
1897 #[case(dbn::UNDEF_TIMESTAMP, None)]
1898 fn test_decode_optional_timestamp(#[case] value: u64, #[case] expected: Option<UnixNanos>) {
1899 let actual = decode_optional_timestamp(value);
1900 assert_eq!(actual, expected);
1901 }
1902
1903 #[rstest]
1904 #[case(0, Quantity::from(1))] #[case(i64::MAX, Quantity::from(1))] #[case(50_000_000_000, Quantity::from("50"))] #[case(12_500_000_000, Quantity::from("12.5"))] #[case(1_000_000_000, Quantity::from("1"))] #[case(1, Quantity::from("0.000000001"))] #[case(1_000_000_001, Quantity::from("1.000000001"))] #[case(999_999_999, Quantity::from("0.999999999"))] #[case(123_456_789_000, Quantity::from("123.456789"))] fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1914 assert_eq!(decode_multiplier(raw).unwrap(), expected);
1915 }
1916
1917 #[rstest]
1918 #[case(-1_500_000_000)] #[case(-1)] #[case(-999_999_999)] fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1922 let result = decode_multiplier(raw);
1923 assert!(result.is_err());
1924 assert!(
1925 result
1926 .unwrap_err()
1927 .to_string()
1928 .contains("Invalid negative multiplier")
1929 );
1930 }
1931
1932 #[rstest]
1933 #[case(100, Quantity::from(100))]
1934 #[case(1000, Quantity::from(1000))]
1935 #[case(5, Quantity::from(5))]
1936 fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1937 assert_eq!(decode_quantity(value), expected);
1938 }
1939
1940 #[rstest]
1941 #[case(0, Quantity::from(1))] #[case(i32::MAX, Quantity::from(1))] #[case(100, Quantity::from(100))]
1944 #[case(1, Quantity::from(1))]
1945 #[case(1000, Quantity::from(1000))]
1946 fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1947 assert_eq!(decode_lot_size(value), expected);
1948 }
1949
1950 #[rstest]
1951 #[case(0, None)] #[case(1, Some(Ustr::from("Scheduled")))]
1953 #[case(2, Some(Ustr::from("Surveillance intervention")))]
1954 #[case(3, Some(Ustr::from("Market event")))]
1955 #[case(10, Some(Ustr::from("Regulatory")))]
1956 #[case(30, Some(Ustr::from("News pending")))]
1957 #[case(40, Some(Ustr::from("Order imbalance")))]
1958 #[case(50, Some(Ustr::from("LULD pause")))]
1959 #[case(60, Some(Ustr::from("Operational")))]
1960 #[case(100, Some(Ustr::from("Corporate action")))]
1961 #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1962 fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1963 assert_eq!(parse_status_reason(value).unwrap(), expected);
1964 }
1965
1966 #[rstest]
1967 #[case(999)] fn test_parse_status_reason_invalid(#[case] value: u16) {
1969 assert!(parse_status_reason(value).is_err());
1970 }
1971
1972 #[rstest]
1973 #[case(0, None)] #[case(1, Some(Ustr::from("No cancel")))]
1975 #[case(2, Some(Ustr::from("Change trading session")))]
1976 #[case(3, Some(Ustr::from("Implied matching on")))]
1977 #[case(4, Some(Ustr::from("Implied matching off")))]
1978 fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1979 assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1980 }
1981
1982 #[rstest]
1983 #[case(5)] #[case(100)] fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1986 assert!(parse_status_trading_event(value).is_err());
1987 }
1988
1989 #[rstest]
1990 fn test_decode_mbo_msg() {
1991 let path = test_data_path().join("test_data.mbo.dbn.zst");
1992 let mut dbn_stream = Decoder::from_zstd_file(path)
1993 .unwrap()
1994 .decode_stream::<dbn::MboMsg>();
1995 let msg = dbn_stream.next().unwrap().unwrap();
1996
1997 let instrument_id = InstrumentId::from("ESM4.GLBX");
1998 let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1999 let delta = delta.unwrap();
2000
2001 assert_eq!(delta.instrument_id, instrument_id);
2002 assert_eq!(delta.action, BookAction::Delete);
2003 assert_eq!(delta.order.side, OrderSide::Sell);
2004 assert_eq!(delta.order.price, Price::from("3722.75"));
2005 assert_eq!(delta.order.size, Quantity::from("1"));
2006 assert_eq!(delta.order.order_id, 647_784_973_705);
2007 assert_eq!(delta.flags, 128);
2008 assert_eq!(delta.sequence, 1_170_352);
2009 assert_eq!(delta.ts_event, msg.ts_recv);
2010 assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
2011 assert_eq!(delta.ts_init, 0);
2012 }
2013
2014 #[rstest]
2015 fn test_decode_mbo_msg_clear_action() {
2016 let ts_recv = 1_609_160_400_000_000_000;
2018 let msg = dbn::MboMsg {
2019 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
2020 order_id: 0,
2021 price: i64::MAX,
2022 size: 0,
2023 flags: dbn::FlagSet::empty(),
2024 channel_id: 0,
2025 action: 'R' as c_char,
2026 side: 'N' as c_char, ts_recv,
2028 ts_in_delta: 0,
2029 sequence: 1_000_000,
2030 };
2031
2032 let instrument_id = InstrumentId::from("ESM4.GLBX");
2033 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2034
2035 assert!(trade.is_none());
2037 let delta = delta.expect("Clear action should produce OrderBookDelta");
2038
2039 assert_eq!(delta.instrument_id, instrument_id);
2040 assert_eq!(delta.action, BookAction::Clear);
2041 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
2042 assert_eq!(delta.order.size, Quantity::from("0"));
2043 assert_eq!(delta.order.order_id, 0);
2044 assert_eq!(delta.sequence, 1_000_000);
2045 assert_eq!(delta.ts_event, ts_recv);
2046 assert_eq!(delta.ts_init, 0);
2047 assert!(delta.order.price.is_undefined());
2048 assert_eq!(delta.order.price.precision, 0);
2049 }
2050
2051 #[rstest]
2052 fn test_decode_mbo_msg_price_undef_with_precision() {
2053 let ts_recv = 1_609_160_400_000_000_000;
2055 let msg = dbn::MboMsg {
2056 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
2057 order_id: 0,
2058 price: i64::MAX, size: 0,
2060 flags: dbn::FlagSet::empty(),
2061 channel_id: 0,
2062 action: 'R' as c_char, side: 'N' as c_char, ts_recv,
2065 ts_in_delta: 0,
2066 sequence: 0,
2067 };
2068
2069 let instrument_id = InstrumentId::from("ESM4.GLBX");
2070 let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2071 let delta = delta.unwrap();
2072
2073 assert!(delta.order.price.is_undefined());
2074 assert_eq!(delta.order.price.precision, 0);
2075 assert_eq!(delta.order.price.raw, PRICE_UNDEF);
2076 }
2077
2078 #[rstest]
2079 fn test_decode_mbo_msg_no_order_side_update() {
2080 let ts_recv = 1_609_160_400_000_000_000;
2083 let msg = dbn::MboMsg {
2084 hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
2085 order_id: 123_456_789,
2086 price: 4_800_250_000_000, size: 1,
2088 flags: dbn::FlagSet::empty(),
2089 channel_id: 1,
2090 action: 'M' as c_char, side: 'N' as c_char, ts_recv,
2093 ts_in_delta: 0,
2094 sequence: 1_000_000,
2095 };
2096
2097 let instrument_id = InstrumentId::from("ESM4.GLBX");
2098 let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2099
2100 assert!(delta.is_some());
2102 assert!(trade.is_none());
2103 let delta = delta.unwrap();
2104 assert_eq!(delta.order.side, OrderSide::NoOrderSide);
2105 assert_eq!(delta.order.order_id, 123_456_789);
2106 assert_eq!(delta.action, BookAction::Update);
2107 }
2108
2109 #[rstest]
2110 fn test_decode_mbp1_msg() {
2111 let path = test_data_path().join("test_data.mbp-1.dbn.zst");
2112 let mut dbn_stream = Decoder::from_zstd_file(path)
2113 .unwrap()
2114 .decode_stream::<dbn::Mbp1Msg>();
2115 let msg = dbn_stream.next().unwrap().unwrap();
2116
2117 let instrument_id = InstrumentId::from("ESM4.GLBX");
2118 let (maybe_quote, _) =
2119 decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
2120 let quote = maybe_quote.expect("Expected valid quote");
2121
2122 assert_eq!(quote.instrument_id, instrument_id);
2123 assert_eq!(quote.bid_price, Price::from("3720.25"));
2124 assert_eq!(quote.ask_price, Price::from("3720.50"));
2125 assert_eq!(quote.bid_size, Quantity::from("24"));
2126 assert_eq!(quote.ask_size, Quantity::from("11"));
2127 assert_eq!(quote.ts_event, msg.ts_recv);
2128 assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
2129 assert_eq!(quote.ts_init, 0);
2130 }
2131
2132 #[rstest]
2133 fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
2134 let ts_recv = 1_609_160_400_000_000_000;
2135 let msg = dbn::Mbp1Msg {
2136 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
2137 price: 3_720_250_000_000, size: 5,
2139 action: 'A' as c_char,
2140 side: 'B' as c_char,
2141 flags: dbn::FlagSet::empty(),
2142 depth: 0,
2143 ts_recv,
2144 ts_in_delta: 0,
2145 sequence: 1_170_352,
2146 levels: [dbn::BidAskPair {
2147 bid_px: 3_720_250_000_000, ask_px: i64::MAX, bid_sz: 24,
2150 ask_sz: 0,
2151 bid_ct: 1,
2152 ask_ct: 0,
2153 }],
2154 };
2155
2156 let instrument_id = InstrumentId::from("ESM4.GLBX");
2157 let (maybe_quote, _) =
2158 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2159
2160 assert!(maybe_quote.is_none());
2162 }
2163
2164 #[rstest]
2165 fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
2166 let ts_recv = 1_609_160_400_000_000_000;
2167 let msg = dbn::Mbp1Msg {
2168 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
2169 price: 3_720_500_000_000, size: 5,
2171 action: 'A' as c_char,
2172 side: 'A' as c_char,
2173 flags: dbn::FlagSet::empty(),
2174 depth: 0,
2175 ts_recv,
2176 ts_in_delta: 0,
2177 sequence: 1_170_352,
2178 levels: [dbn::BidAskPair {
2179 bid_px: i64::MAX, ask_px: 3_720_500_000_000, bid_sz: 0,
2182 ask_sz: 11,
2183 bid_ct: 0,
2184 ask_ct: 1,
2185 }],
2186 };
2187
2188 let instrument_id = InstrumentId::from("ESM4.GLBX");
2189 let (maybe_quote, _) =
2190 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2191
2192 assert!(maybe_quote.is_none());
2194 }
2195
2196 #[rstest]
2197 fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
2198 let ts_recv = 1_609_160_400_000_000_000;
2199 let msg = dbn::Mbp1Msg {
2200 hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
2201 price: 3_720_250_000_000, size: 5,
2203 action: 'T' as c_char, side: 'A' as c_char,
2205 flags: dbn::FlagSet::empty(),
2206 depth: 0,
2207 ts_recv,
2208 ts_in_delta: 0,
2209 sequence: 1_170_352,
2210 levels: [dbn::BidAskPair {
2211 bid_px: i64::MAX, ask_px: i64::MAX, bid_sz: 0,
2214 ask_sz: 0,
2215 bid_ct: 0,
2216 ask_ct: 0,
2217 }],
2218 };
2219
2220 let instrument_id = InstrumentId::from("ESM4.GLBX");
2221 let (maybe_quote, maybe_trade) =
2222 decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
2223
2224 assert!(maybe_quote.is_none());
2226
2227 let trade = maybe_trade.expect("Expected trade");
2229 assert_eq!(trade.instrument_id, instrument_id);
2230 assert_eq!(trade.price, Price::from("3720.25"));
2231 assert_eq!(trade.size, Quantity::from("5"));
2232 }
2233
2234 #[rstest]
2235 fn test_decode_bbo_1s_msg() {
2236 let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
2237 let mut dbn_stream = Decoder::from_zstd_file(path)
2238 .unwrap()
2239 .decode_stream::<dbn::BboMsg>();
2240 let msg = dbn_stream.next().unwrap().unwrap();
2241
2242 let instrument_id = InstrumentId::from("ESM4.GLBX");
2243 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2244 let quote = maybe_quote.expect("Expected valid quote");
2245
2246 assert_eq!(quote.instrument_id, instrument_id);
2247 assert_eq!(quote.bid_price, Price::from("3702.25"));
2248 assert_eq!(quote.ask_price, Price::from("3702.75"));
2249 assert_eq!(quote.bid_size, Quantity::from("18"));
2250 assert_eq!(quote.ask_size, Quantity::from("13"));
2251 assert_eq!(quote.ts_event, msg.ts_recv);
2252 assert_eq!(quote.ts_event, 1609113600000000000);
2253 assert_eq!(quote.ts_init, 0);
2254 }
2255
2256 #[rstest]
2257 fn test_decode_bbo_1m_msg() {
2258 let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
2259 let mut dbn_stream = Decoder::from_zstd_file(path)
2260 .unwrap()
2261 .decode_stream::<dbn::BboMsg>();
2262 let msg = dbn_stream.next().unwrap().unwrap();
2263
2264 let instrument_id = InstrumentId::from("ESM4.GLBX");
2265 let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2266 let quote = maybe_quote.expect("Expected valid quote");
2267
2268 assert_eq!(quote.instrument_id, instrument_id);
2269 assert_eq!(quote.bid_price, Price::from("3702.25"));
2270 assert_eq!(quote.ask_price, Price::from("3702.75"));
2271 assert_eq!(quote.bid_size, Quantity::from("18"));
2272 assert_eq!(quote.ask_size, Quantity::from("13"));
2273 assert_eq!(quote.ts_event, msg.ts_recv);
2274 assert_eq!(quote.ts_event, 1609113600000000000);
2275 assert_eq!(quote.ts_init, 0);
2276 }
2277
2278 #[rstest]
2279 fn test_decode_mbp10_msg() {
2280 let path = test_data_path().join("test_data.mbp-10.dbn.zst");
2281 let mut dbn_stream = Decoder::from_zstd_file(path)
2282 .unwrap()
2283 .decode_stream::<dbn::Mbp10Msg>();
2284 let msg = dbn_stream.next().unwrap().unwrap();
2285
2286 let instrument_id = InstrumentId::from("ESM4.GLBX");
2287 let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2288
2289 assert_eq!(depth10.instrument_id, instrument_id);
2290 assert_eq!(depth10.bids.len(), 10);
2291 assert_eq!(depth10.asks.len(), 10);
2292 assert_eq!(depth10.bid_counts.len(), 10);
2293 assert_eq!(depth10.ask_counts.len(), 10);
2294 assert_eq!(depth10.flags, 128);
2295 assert_eq!(depth10.sequence, 1_170_352);
2296 assert_eq!(depth10.ts_event, msg.ts_recv);
2297 assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
2298 assert_eq!(depth10.ts_init, 0);
2299 }
2300
2301 #[rstest]
2302 fn test_decode_trade_msg() {
2303 let path = test_data_path().join("test_data.trades.dbn.zst");
2304 let mut dbn_stream = Decoder::from_zstd_file(path)
2305 .unwrap()
2306 .decode_stream::<dbn::TradeMsg>();
2307 let msg = dbn_stream.next().unwrap().unwrap();
2308
2309 let instrument_id = InstrumentId::from("ESM4.GLBX");
2310 let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2311
2312 assert_eq!(trade.instrument_id, instrument_id);
2313 assert_eq!(trade.price, Price::from("3720.25"));
2314 assert_eq!(trade.size, Quantity::from("5"));
2315 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2316 assert_eq!(trade.trade_id.to_string(), "1170380");
2317 assert_eq!(trade.ts_event, msg.ts_recv);
2318 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2319 assert_eq!(trade.ts_init, 0);
2320 }
2321
2322 #[rstest]
2323 fn test_decode_tbbo_msg() {
2324 let path = test_data_path().join("test_data.tbbo.dbn.zst");
2325 let mut dbn_stream = Decoder::from_zstd_file(path)
2326 .unwrap()
2327 .decode_stream::<dbn::Mbp1Msg>();
2328 let msg = dbn_stream.next().unwrap().unwrap();
2329
2330 let instrument_id = InstrumentId::from("ESM4.GLBX");
2331 let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2332 let quote = maybe_quote.expect("Expected valid quote");
2333
2334 assert_eq!(quote.instrument_id, instrument_id);
2335 assert_eq!(quote.bid_price, Price::from("3720.25"));
2336 assert_eq!(quote.ask_price, Price::from("3720.50"));
2337 assert_eq!(quote.bid_size, Quantity::from("26"));
2338 assert_eq!(quote.ask_size, Quantity::from("7"));
2339 assert_eq!(quote.ts_event, msg.ts_recv);
2340 assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2341 assert_eq!(quote.ts_init, 0);
2342
2343 assert_eq!(trade.instrument_id, instrument_id);
2344 assert_eq!(trade.price, Price::from("3720.25"));
2345 assert_eq!(trade.size, Quantity::from("5"));
2346 assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2347 assert_eq!(trade.trade_id.to_string(), "1170380");
2348 assert_eq!(trade.ts_event, msg.ts_recv);
2349 assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2350 assert_eq!(trade.ts_init, 0);
2351 }
2352
2353 #[rstest]
2354 fn test_decode_ohlcv_msg() {
2355 let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2356 let mut dbn_stream = Decoder::from_zstd_file(path)
2357 .unwrap()
2358 .decode_stream::<dbn::OhlcvMsg>();
2359 let msg = dbn_stream.next().unwrap().unwrap();
2360
2361 let instrument_id = InstrumentId::from("ESM4.GLBX");
2362 let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2363
2364 assert_eq!(
2365 bar.bar_type,
2366 BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2367 );
2368 assert_eq!(bar.open, Price::from("372025.00"));
2369 assert_eq!(bar.high, Price::from("372050.00"));
2370 assert_eq!(bar.low, Price::from("372025.00"));
2371 assert_eq!(bar.close, Price::from("372050.00"));
2372 assert_eq!(bar.volume, Quantity::from("57"));
2373 assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); assert_eq!(bar.ts_init, 0); }
2376
2377 #[rstest]
2378 fn test_decode_definition_msg() {
2379 let path = test_data_path().join("test_data.definition.dbn.zst");
2380 let mut dbn_stream = Decoder::from_zstd_file(path)
2381 .unwrap()
2382 .decode_stream::<dbn::InstrumentDefMsg>();
2383 let msg = dbn_stream.next().unwrap().unwrap();
2384
2385 let instrument_id = InstrumentId::from("ESM4.GLBX");
2386 let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2387
2388 assert!(result.is_ok());
2389 assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2390 }
2391
2392 #[rstest]
2393 fn test_decode_status_msg() {
2394 let path = test_data_path().join("test_data.status.dbn.zst");
2395 let mut dbn_stream = Decoder::from_zstd_file(path)
2396 .unwrap()
2397 .decode_stream::<dbn::StatusMsg>();
2398 let msg = dbn_stream.next().unwrap().unwrap();
2399
2400 let instrument_id = InstrumentId::from("ESM4.GLBX");
2401 let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2402
2403 assert_eq!(status.instrument_id, instrument_id);
2404 assert_eq!(status.action, MarketStatusAction::Trading);
2405 assert_eq!(status.ts_event, msg.hd.ts_event);
2406 assert_eq!(status.ts_init, 0);
2407 assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2408 assert_eq!(status.trading_event, None);
2409 assert_eq!(status.is_trading, Some(true));
2410 assert_eq!(status.is_quoting, Some(true));
2411 assert_eq!(status.is_short_sell_restricted, None);
2412 }
2413
2414 #[rstest]
2415 fn test_decode_imbalance_msg() {
2416 let path = test_data_path().join("test_data.imbalance.dbn.zst");
2417 let mut dbn_stream = Decoder::from_zstd_file(path)
2418 .unwrap()
2419 .decode_stream::<dbn::ImbalanceMsg>();
2420 let msg = dbn_stream.next().unwrap().unwrap();
2421
2422 let instrument_id = InstrumentId::from("ESM4.GLBX");
2423 let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2424
2425 assert_eq!(imbalance.instrument_id, instrument_id);
2426 assert_eq!(imbalance.ref_price, Price::from("229.43"));
2427 assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2428 assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2429 assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2430 assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2431 assert_eq!(imbalance.side, OrderSide::Buy);
2432 assert_eq!(imbalance.significant_imbalance, 126);
2433 assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2434 assert_eq!(imbalance.ts_recv, msg.ts_recv);
2435 assert_eq!(imbalance.ts_init, 0);
2436 }
2437
2438 #[rstest]
2439 fn test_decode_statistics_msg() {
2440 let path = test_data_path().join("test_data.statistics.dbn.zst");
2441 let mut dbn_stream = Decoder::from_zstd_file(path)
2442 .unwrap()
2443 .decode_stream::<dbn::StatMsg>();
2444 let msg = dbn_stream.next().unwrap().unwrap();
2445
2446 let instrument_id = InstrumentId::from("ESM4.GLBX");
2447 let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2448
2449 assert_eq!(statistics.instrument_id, instrument_id);
2450 assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2451 assert_eq!(
2452 statistics.update_action,
2453 DatabentoStatisticUpdateAction::Added
2454 );
2455 assert_eq!(statistics.price, Some(Price::from("100.00")));
2456 assert_eq!(statistics.quantity, None);
2457 assert_eq!(statistics.channel_id, 13);
2458 assert_eq!(statistics.stat_flags, 255);
2459 assert_eq!(statistics.sequence, 2);
2460 assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2461 assert_eq!(statistics.ts_in_delta, 26961);
2462 assert_eq!(statistics.ts_event, msg.hd.ts_event);
2463 assert_eq!(statistics.ts_recv, msg.ts_recv);
2464 assert_eq!(statistics.ts_init, 0);
2465 }
2466
2467 #[rstest]
2468 fn test_decode_cmbp1_msg() {
2469 let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2470 let mut dbn_stream = Decoder::from_zstd_file(path)
2471 .unwrap()
2472 .decode_stream::<dbn::Cmbp1Msg>();
2473 let msg = dbn_stream.next().unwrap().unwrap();
2474
2475 let instrument_id = InstrumentId::from("ESM4.GLBX");
2476 let (maybe_quote, trade) =
2477 decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2478 let quote = maybe_quote.expect("Expected valid quote");
2479
2480 assert_eq!(quote.instrument_id, instrument_id);
2481 assert!(quote.bid_price.raw > 0);
2482 assert!(quote.ask_price.raw > 0);
2483 assert!(quote.bid_size.raw > 0);
2484 assert!(quote.ask_size.raw > 0);
2485 assert_eq!(quote.ts_event, msg.ts_recv);
2486 assert_eq!(quote.ts_init, 0);
2487
2488 if is_trade_msg(msg.action) {
2490 assert!(trade.is_some());
2491 let trade = trade.unwrap();
2492 assert_eq!(trade.instrument_id, instrument_id);
2493 } else {
2494 assert!(trade.is_none());
2495 }
2496 }
2497
2498 #[rstest]
2499 fn test_decode_cbbo_1s_msg() {
2500 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2501 let mut dbn_stream = Decoder::from_zstd_file(path)
2502 .unwrap()
2503 .decode_stream::<dbn::CbboMsg>();
2504 let msg = dbn_stream.next().unwrap().unwrap();
2505
2506 let instrument_id = InstrumentId::from("ESM4.GLBX");
2507 let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2508 let quote = maybe_quote.expect("Expected valid quote");
2509
2510 assert_eq!(quote.instrument_id, instrument_id);
2511 assert!(quote.bid_price.raw > 0);
2512 assert!(quote.ask_price.raw > 0);
2513 assert!(quote.bid_size.raw > 0);
2514 assert!(quote.ask_size.raw > 0);
2515 assert_eq!(quote.ts_event, msg.ts_recv);
2516 assert_eq!(quote.ts_init, 0);
2517 }
2518
2519 #[rstest]
2520 fn test_decode_mbp10_msg_with_all_levels() {
2521 let mut msg = dbn::Mbp10Msg::default();
2522 for i in 0..10 {
2523 msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2524 msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2525 msg.levels[i].bid_sz = 10 + i as u32;
2526 msg.levels[i].ask_sz = 10 + i as u32;
2527 msg.levels[i].bid_ct = 1 + i as u32;
2528 msg.levels[i].ask_ct = 1 + i as u32;
2529 }
2530 msg.ts_recv = 1_609_160_400_000_704_060;
2531
2532 let instrument_id = InstrumentId::from("TEST.VENUE");
2533 let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2534
2535 assert!(result.is_ok());
2536 let depth = result.unwrap();
2537 assert_eq!(depth.bids.len(), 10);
2538 assert_eq!(depth.asks.len(), 10);
2539 assert_eq!(depth.bid_counts.len(), 10);
2540 assert_eq!(depth.ask_counts.len(), 10);
2541 }
2542
2543 #[rstest]
2544 fn test_array_conversion_error_handling() {
2545 let mut bids = Vec::new();
2546 let mut asks = Vec::new();
2547
2548 for i in 0..5 {
2550 bids.push(BookOrder::new(
2551 OrderSide::Buy,
2552 Price::from(format!("{}.00", 100 - i)),
2553 Quantity::from(10),
2554 i as u64,
2555 ));
2556 asks.push(BookOrder::new(
2557 OrderSide::Sell,
2558 Price::from(format!("{}.00", 101 + i)),
2559 Quantity::from(10),
2560 i as u64,
2561 ));
2562 }
2563
2564 let result: Result<[BookOrder; DEPTH10_LEN], _> =
2565 bids.try_into().map_err(|v: Vec<BookOrder>| {
2566 anyhow::anyhow!(
2567 "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2568 v.len()
2569 )
2570 });
2571 assert!(result.is_err());
2572 assert!(
2573 result
2574 .unwrap_err()
2575 .to_string()
2576 .contains("Expected exactly 10 bid levels, received 5")
2577 );
2578 }
2579
2580 #[rstest]
2581 fn test_decode_tcbbo_msg() {
2582 let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2584 let mut dbn_stream = Decoder::from_zstd_file(path)
2585 .unwrap()
2586 .decode_stream::<dbn::CbboMsg>();
2587 let msg = dbn_stream.next().unwrap().unwrap();
2588
2589 let mut tcbbo_msg = msg.clone();
2591 tcbbo_msg.price = 3702500000000;
2592 tcbbo_msg.size = 10;
2593
2594 let instrument_id = InstrumentId::from("ESM4.GLBX");
2595 let (maybe_quote, trade) =
2596 decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2597 let quote = maybe_quote.expect("Expected valid quote");
2598
2599 assert_eq!(quote.instrument_id, instrument_id);
2600 assert!(quote.bid_price.raw > 0);
2601 assert!(quote.ask_price.raw > 0);
2602 assert!(quote.bid_size.raw > 0);
2603 assert!(quote.ask_size.raw > 0);
2604 assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2605 assert_eq!(quote.ts_init, 0);
2606
2607 assert_eq!(trade.instrument_id, instrument_id);
2608 assert_eq!(trade.price, Price::from("3702.50"));
2609 assert_eq!(trade.size, Quantity::from(10));
2610 assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2611 assert_eq!(trade.ts_init, 0);
2612 }
2613
2614 #[rstest]
2615 fn test_decode_bar_type() {
2616 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2617 let instrument_id = InstrumentId::from("ESM4.GLBX");
2618
2619 msg.hd.rtype = 32;
2621 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2622 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2623
2624 msg.hd.rtype = 33;
2626 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2627 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2628
2629 msg.hd.rtype = 34;
2631 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2632 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2633
2634 msg.hd.rtype = 35;
2636 let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2637 assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2638
2639 msg.hd.rtype = 99;
2641 let result = decode_bar_type(&msg, instrument_id);
2642 assert!(result.is_err());
2643 }
2644
2645 #[rstest]
2646 fn test_decode_ts_event_adjustment() {
2647 let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2648
2649 msg.hd.rtype = 32;
2651 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2652 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2653
2654 msg.hd.rtype = 33;
2656 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2657 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2658
2659 msg.hd.rtype = 34;
2661 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2662 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2663
2664 msg.hd.rtype = 35;
2666 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2667 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2668
2669 msg.hd.rtype = 36;
2671 let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2672 assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2673
2674 msg.hd.rtype = 99;
2676 let result = decode_ts_event_adjustment(&msg);
2677 assert!(result.is_err());
2678 }
2679
2680 #[rstest]
2681 fn test_decode_record() {
2682 let path = test_data_path().join("test_data.mbo.dbn.zst");
2684 let decoder = Decoder::from_zstd_file(path).unwrap();
2685 let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2686 let msg = dbn_stream.next().unwrap().unwrap();
2687
2688 let record_ref = dbn::RecordRef::from(msg);
2689 let instrument_id = InstrumentId::from("ESM4.GLBX");
2690
2691 let (data1, data2) =
2692 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2693
2694 assert!(data1.is_some());
2695 assert!(data2.is_none());
2696
2697 let path = test_data_path().join("test_data.trades.dbn.zst");
2699 let decoder = Decoder::from_zstd_file(path).unwrap();
2700 let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2701 let msg = dbn_stream.next().unwrap().unwrap();
2702
2703 let record_ref = dbn::RecordRef::from(msg);
2704
2705 let (data1, data2) =
2706 decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2707
2708 assert!(data1.is_some());
2709 assert!(data2.is_none());
2710 assert!(matches!(data1.unwrap(), Data::Trade(_)));
2711 }
2712}