Skip to main content

nautilus_databento/
decode.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Databento message decoding functions.
17//!
18//! # Sentinel Values
19//!
20//! Databento uses sentinel values to represent undefined/null fields:
21//!
22//! | Sentinel          | Value      | Usage                       |
23//! |-------------------|------------|-----------------------------|
24//! | `UNDEF_PRICE`     | `i64::MAX` | Undefined price fields.     |
25//! | `UNDEF_TIMESTAMP` | `u64::MAX` | Undefined timestamp fields. |
26//!
27//! # Fields Potentially Undefined
28//!
29//! According to Databento documentation, the following fields can contain sentinel values:
30//!
31//! | Message Type       | Field                          | Handling                           |
32//! |--------------------|--------------------------------|------------------------------------|
33//! | `MboMsg`           | `price`                        | Passed through as `PRICE_UNDEF`.   |
34//! | `TradeMsg`         | `price`                        | Passed through as `PRICE_UNDEF`.   |
35//! | `OhlcvMsg`         | `open`, `high`, `low`, `close` | Passed through as `PRICE_UNDEF`.   |
36//! | `Mbp1Msg`          | `bid_px`, `ask_px`             | Quote skipped if either undefined. |
37//! | `InstrumentDefMsg` | `activation`                   | Defaults to 0 (epoch).             |
38//! | `InstrumentDefMsg` | `expiration`                   | Returns error if undefined.        |
39//! | `InstrumentDefMsg` | `strike_price`                 | Returns error if undefined.        |
40//!
41//! # References
42//!
43//! - [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)
44//! - [`UNDEF_TIMESTAMP`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_TIMESTAMP.html)
45//! - [Databento DBN Schema](https://databento.com/docs/schemas)
46
47use std::{ffi::c_char, num::NonZeroUsize};
48
49use databento::dbn;
50use nautilus_core::{UnixNanos, datetime::NANOSECONDS_IN_SECOND};
51use nautilus_model::{
52    data::{
53        Bar, BarSpecification, BarType, BookOrder, DEPTH10_LEN, Data, InstrumentStatus,
54        OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
55    },
56    enums::{
57        AggregationSource, AggressorSide, AssetClass, BarAggregation, BookAction, FromU8, FromU16,
58        InstrumentClass, MarketStatusAction, OptionKind, OrderSide, PriceType,
59    },
60    identifiers::{InstrumentId, Symbol, TradeId},
61    instruments::{
62        Equity, FuturesContract, FuturesSpread, InstrumentAny, OptionContract, OptionSpread,
63    },
64    types::{
65        Currency, Price, Quantity,
66        price::{PRICE_UNDEF, decode_raw_price_i64},
67    },
68};
69use ustr::Ustr;
70
71use super::{
72    enums::{DatabentoStatisticType, DatabentoStatisticUpdateAction},
73    types::{DatabentoImbalance, DatabentoStatistics},
74};
75
76const STEP_ONE: NonZeroUsize = NonZeroUsize::new(1).unwrap();
77
78const BAR_SPEC_1S: BarSpecification = BarSpecification {
79    step: STEP_ONE,
80    aggregation: BarAggregation::Second,
81    price_type: PriceType::Last,
82};
83const BAR_SPEC_1M: BarSpecification = BarSpecification {
84    step: STEP_ONE,
85    aggregation: BarAggregation::Minute,
86    price_type: PriceType::Last,
87};
88const BAR_SPEC_1H: BarSpecification = BarSpecification {
89    step: STEP_ONE,
90    aggregation: BarAggregation::Hour,
91    price_type: PriceType::Last,
92};
93const BAR_SPEC_1D: BarSpecification = BarSpecification {
94    step: STEP_ONE,
95    aggregation: BarAggregation::Day,
96    price_type: PriceType::Last,
97};
98
99const BAR_CLOSE_ADJUSTMENT_1S: u64 = NANOSECONDS_IN_SECOND;
100const BAR_CLOSE_ADJUSTMENT_1M: u64 = NANOSECONDS_IN_SECOND * 60;
101const BAR_CLOSE_ADJUSTMENT_1H: u64 = NANOSECONDS_IN_SECOND * 60 * 60;
102const BAR_CLOSE_ADJUSTMENT_1D: u64 = NANOSECONDS_IN_SECOND * 60 * 60 * 24;
103
// FNV-1a 64-bit constants (see http://www.isthe.com/chongo/tech/comp/fnv/).

/// Initial hash state (the 64-bit FNV offset basis).
const FNV_OFFSET_BASIS: u64 = 0xCBF2_9CE4_8422_2325;
/// Multiplier applied after every mixed byte (the 64-bit FNV prime, 2^40 + 0x1b3).
const FNV_PRIME: u64 = (1 << 40) | 0x01b3;
107
/// Interprets a single-character flag as an optional boolean.
///
/// `'Y'` maps to `Some(true)`, `'N'` maps to `Some(false)`, and any other
/// character (the match is case-sensitive) maps to `None`.
#[must_use]
pub const fn parse_optional_bool(c: c_char) -> Option<bool> {
    match c as u8 {
        b'Y' => Some(true),
        b'N' => Some(false),
        _ => None,
    }
}
116
117#[must_use]
118pub const fn parse_order_side(c: c_char) -> OrderSide {
119    match c as u8 as char {
120        'A' => OrderSide::Sell,
121        'B' => OrderSide::Buy,
122        _ => OrderSide::NoOrderSide,
123    }
124}
125
126#[must_use]
127pub const fn parse_aggressor_side(c: c_char) -> AggressorSide {
128    match c as u8 as char {
129        'A' => AggressorSide::Seller,
130        'B' => AggressorSide::Buyer,
131        _ => AggressorSide::NoAggressor,
132    }
133}
134
135fn fnv1a_mix(hash: &mut u64, bytes: &[u8]) {
136    for &byte in bytes {
137        *hash ^= u64::from(byte);
138        *hash = hash.wrapping_mul(FNV_PRIME);
139    }
140    *hash ^= 0xff;
141    *hash = hash.wrapping_mul(FNV_PRIME);
142}
143
144/// Derives a deterministic [`TradeId`] for Databento schemas that do not
145/// publish a native trade identifier (e.g. CMBP1, TCBBO).
146///
147/// The hash combines the instrument, timestamps, price, size and aggressor
148/// side so that replayed data yields the same identifier across runs.
149fn derive_cmbp_trade_id(
150    instrument_id: InstrumentId,
151    ts_event: u64,
152    ts_recv: u64,
153    price: i64,
154    size: u32,
155    side: c_char,
156) -> TradeId {
157    let mut hash: u64 = FNV_OFFSET_BASIS;
158    fnv1a_mix(&mut hash, instrument_id.to_string().as_bytes());
159    fnv1a_mix(&mut hash, &ts_event.to_le_bytes());
160    fnv1a_mix(&mut hash, &ts_recv.to_le_bytes());
161    fnv1a_mix(&mut hash, &price.to_le_bytes());
162    fnv1a_mix(&mut hash, &size.to_le_bytes());
163    fnv1a_mix(&mut hash, &[side as u8]);
164    TradeId::new(format!("{hash:016x}"))
165}
166
167/// Parses a Databento book action character into a `BookAction` enum.
168///
169/// # Errors
170///
171/// Returns an error if `c` is not a valid `BookAction` character.
172pub fn parse_book_action(c: c_char) -> anyhow::Result<BookAction> {
173    match c as u8 as char {
174        'A' => Ok(BookAction::Add),
175        'C' => Ok(BookAction::Delete),
176        'F' => Ok(BookAction::Update),
177        'M' => Ok(BookAction::Update),
178        'R' => Ok(BookAction::Clear),
179        invalid => anyhow::bail!("Invalid `BookAction`, was '{invalid}'"),
180    }
181}
182
183/// Parses a Databento option kind character into an `OptionKind` enum.
184///
185/// # Errors
186///
187/// Returns an error if `c` is not a valid `OptionKind` character.
188pub fn parse_option_kind(c: c_char) -> anyhow::Result<OptionKind> {
189    match c as u8 as char {
190        'C' => Ok(OptionKind::Call),
191        'P' => Ok(OptionKind::Put),
192        invalid => anyhow::bail!("Invalid `OptionKind`, was '{invalid}'"),
193    }
194}
195
196fn parse_currency_or_usd_default(value: Result<&str, impl std::error::Error>) -> Currency {
197    match value {
198        Ok(value) if !value.is_empty() => Currency::try_from_str(value).unwrap_or_else(|| {
199            log::warn!("Unknown currency code '{value}', defaulting to USD");
200            Currency::USD()
201        }),
202        Ok(_) => Currency::USD(),
203        Err(e) => {
204            log::error!("Error parsing currency: {e}");
205            Currency::USD()
206        }
207    }
208}
209
210/// Parses a CFI (Classification of Financial Instruments) code to extract asset and instrument classes.
211///
212/// Returns `(None, None)` if `value` has fewer than 3 characters.
213#[must_use]
214pub fn parse_cfi_iso10926(value: &str) -> (Option<AssetClass>, Option<InstrumentClass>) {
215    let chars: Vec<char> = value.chars().collect();
216    if chars.len() < 3 {
217        return (None, None);
218    }
219
220    // TODO: A proper CFI parser would be useful: https://en.wikipedia.org/wiki/ISO_10962
221    let cfi_category = chars[0];
222    let cfi_group = chars[1];
223    let cfi_attribute1 = chars[2];
224    // let cfi_attribute2 = value[3];
225    // let cfi_attribute3 = value[4];
226    // let cfi_attribute4 = value[5];
227
228    let mut asset_class = match cfi_category {
229        'D' => Some(AssetClass::Debt),
230        'E' => Some(AssetClass::Equity),
231        'S' => None,
232        _ => None,
233    };
234
235    let instrument_class = match cfi_group {
236        'I' => Some(InstrumentClass::Future),
237        _ => None,
238    };
239
240    if cfi_attribute1 == 'I' {
241        asset_class = Some(AssetClass::Index);
242    }
243
244    (asset_class, instrument_class)
245}
246
247fn decode_underlying(underlying_str: &str, symbol: &Symbol) -> Ustr {
248    if underlying_str.is_empty() {
249        // Fall back to first whitespace-separated token from symbol
250        symbol
251            .as_str()
252            .split_whitespace()
253            .next()
254            .map_or_else(|| symbol.inner(), Ustr::from)
255    } else {
256        Ustr::from(underlying_str)
257    }
258}
259
260/// Parses a Databento status reason code into a human-readable string.
261///
262/// See: <https://databento.com/docs/schemas-and-data-formats/status#types-of-status-reasons>
263///
264/// # Errors
265///
266/// Returns an error if `value` is an invalid status reason code.
267pub fn parse_status_reason(value: u16) -> anyhow::Result<Option<Ustr>> {
268    let value_str = match value {
269        0 => return Ok(None),
270        1 => "Scheduled",
271        2 => "Surveillance intervention",
272        3 => "Market event",
273        4 => "Instrument activation",
274        5 => "Instrument expiration",
275        6 => "Recovery in process",
276        10 => "Regulatory",
277        11 => "Administrative",
278        12 => "Non-compliance",
279        13 => "Filings not current",
280        14 => "SEC trading suspension",
281        15 => "New issue",
282        16 => "Issue available",
283        17 => "Issues reviewed",
284        18 => "Filing requirements satisfied",
285        30 => "News pending",
286        31 => "News released",
287        32 => "News and resumption times",
288        33 => "News not forthcoming",
289        40 => "Order imbalance",
290        50 => "LULD pause",
291        60 => "Operational",
292        70 => "Additional information requested",
293        80 => "Merger effective",
294        90 => "ETF",
295        100 => "Corporate action",
296        110 => "New Security offering",
297        120 => "Market wide halt level 1",
298        121 => "Market wide halt level 2",
299        122 => "Market wide halt level 3",
300        123 => "Market wide halt carryover",
301        124 => "Market wide halt resumption",
302        130 => "Quotation not available",
303        invalid => anyhow::bail!("Invalid `StatusMsg` reason, was '{invalid}'"),
304    };
305
306    Ok(Some(Ustr::from(value_str)))
307}
308
309/// Parses a Databento status trading event code into a human-readable string.
310///
311/// # Errors
312///
313/// Returns an error if `value` is an invalid status trading event code.
314pub fn parse_status_trading_event(value: u16) -> anyhow::Result<Option<Ustr>> {
315    let value_str = match value {
316        0 => return Ok(None),
317        1 => "No cancel",
318        2 => "Change trading session",
319        3 => "Implied matching on",
320        4 => "Implied matching off",
321        _ => anyhow::bail!("Invalid `StatusMsg` trading_event, was '{value}'"),
322    };
323
324    Ok(Some(Ustr::from(value_str)))
325}
326
327/// Decodes a price, returning an error if undefined.
328///
329/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
330/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
331///
332/// # Errors
333///
334/// Returns an error if `value` is `i64::MAX` (undefined).
335#[inline(always)]
336pub fn decode_price(value: i64, precision: u8, field_name: &str) -> anyhow::Result<Price> {
337    if value == i64::MAX {
338        anyhow::bail!("Missing required price for `{field_name}`")
339    } else {
340        Ok(Price::from_raw(decode_raw_price_i64(value), precision))
341    }
342}
343
344/// Decodes a price from the given optional value, expressed in units of 1e-9.
345///
346/// Databento uses `i64::MAX` as a sentinel value for unset/null prices (see
347/// [`UNDEF_PRICE`](https://docs.rs/dbn/latest/dbn/constant.UNDEF_PRICE.html)).
348#[inline(always)]
349#[must_use]
350pub fn decode_optional_price(value: i64, precision: u8) -> Option<Price> {
351    if value == i64::MAX {
352        None
353    } else {
354        Some(Price::from_raw(decode_raw_price_i64(value), precision))
355    }
356}
357
358/// Decodes a price, returning `PRICE_UNDEF` if the value is undefined.
359///
360/// This is used for market data where undefined prices should pass through
361/// as `PRICE_UNDEF` rather than causing an error.
362#[inline(always)]
363#[must_use]
364pub fn decode_price_or_undef(value: i64, precision: u8) -> Price {
365    if value == i64::MAX {
366        Price::from_raw(PRICE_UNDEF, 0)
367    } else {
368        Price::from_raw(decode_raw_price_i64(value), precision)
369    }
370}
371
/// Returns the smallest decimal precision that represents a raw price value
/// (expressed in units of 1e-9) without loss.
///
/// Starting from the full 9 decimal places, one place is dropped for every
/// trailing decimal zero of the absolute value. For example, `3_906_250`
/// (0.00390625) ends in a single zero, giving a precision of `9 - 1 = 8`.
/// A value of zero yields a precision of 0.
#[inline(always)]
#[must_use]
pub fn precision_from_raw(value: i64) -> u8 {
    let mut remaining = value.unsigned_abs();
    if remaining == 0 {
        return 0;
    }

    // Count down from the maximum precision instead of counting zeros up.
    let mut precision = 9u8;
    while precision > 0 && remaining.is_multiple_of(10) {
        remaining /= 10;
        precision -= 1;
    }
    precision
}
391
392/// Decodes a minimum price increment from the given value, expressed in units of 1e-9.
393///
394/// The precision is derived from the actual tick value to avoid truncation of
395/// fractional tick sizes (e.g., treasury futures with 1/256 or 1/32 ticks).
396/// The derived precision is floored at `precision` (typically the currency precision).
397#[inline(always)]
398#[must_use]
399pub fn decode_price_increment(value: i64, precision: u8) -> Price {
400    match value {
401        0 | i64::MAX => Price::new(10f64.powi(-i32::from(precision)), precision),
402        _ => {
403            let derived = precision_from_raw(value).max(precision);
404            Price::from_raw(decode_raw_price_i64(value), derived)
405        }
406    }
407}
408
409/// Decodes a quantity from the given value, expressed in standard whole-number units.
410#[inline(always)]
411#[must_use]
412pub fn decode_quantity(value: u64) -> Quantity {
413    Quantity::from(value)
414}
415
416/// Decodes a quantity from the given optional value, where `i64::MAX` indicates missing data.
417#[inline(always)]
418#[must_use]
419pub fn decode_optional_quantity(value: i64) -> Option<Quantity> {
420    match value {
421        i64::MAX => None,
422        _ => Some(Quantity::from(value)),
423    }
424}
425
426/// Decodes a timestamp, returning an error if undefined.
427///
428/// Databento uses `u64::MAX` as `UNDEF_TIMESTAMP` sentinel for null timestamps.
429///
430/// # Errors
431///
432/// Returns an error if `value` is `u64::MAX` (undefined).
433#[inline(always)]
434pub fn decode_timestamp(value: u64, field_name: &str) -> anyhow::Result<UnixNanos> {
435    if value == dbn::UNDEF_TIMESTAMP {
436        anyhow::bail!("Missing required timestamp for `{field_name}`")
437    } else {
438        Ok(UnixNanos::from(value))
439    }
440}
441
442/// Decodes a timestamp from the given optional value.
443///
444/// Databento uses `u64::MAX` as `UNDEF_TIMESTAMP` sentinel for null timestamps.
445#[inline(always)]
446#[must_use]
447pub fn decode_optional_timestamp(value: u64) -> Option<UnixNanos> {
448    if value == dbn::UNDEF_TIMESTAMP {
449        None
450    } else {
451        Some(UnixNanos::from(value))
452    }
453}
454
455/// Decodes a multiplier from the given value, expressed in units of 1e-9.
456/// Uses exact integer arithmetic to avoid precision loss in financial calculations.
457///
458/// # Errors
459///
460/// Returns an error if value is negative (invalid multiplier).
461pub fn decode_multiplier(value: i64) -> anyhow::Result<Quantity> {
462    const SCALE: u128 = 1_000_000_000;
463
464    match value {
465        0 | i64::MAX => Ok(Quantity::from(1)),
466        v if v < 0 => anyhow::bail!("Invalid negative multiplier: {v}"),
467        v => {
468            // Work in integers: v is fixed-point with 9 fractional digits.
469            // Build a canonical decimal string without floating-point.
470            let abs = v as u128;
471            let int_part = abs / SCALE;
472            let frac_part = abs % SCALE;
473
474            // Format fractional part with exactly 9 digits, then trim trailing zeros
475            // to keep a canonical representation.
476            if frac_part == 0 {
477                // Pure integer
478                Ok(Quantity::from(int_part as u64))
479            } else {
480                let mut frac_str = format!("{frac_part:09}");
481                while frac_str.ends_with('0') {
482                    frac_str.pop();
483                }
484                let s = format!("{int_part}.{frac_str}");
485                Ok(Quantity::from(s))
486            }
487        }
488    }
489}
490
491/// Decodes a lot size from the given value, expressed in standard whole-number units.
492#[inline(always)]
493#[must_use]
494pub fn decode_lot_size(value: i32) -> Quantity {
495    match value {
496        0 | i32::MAX => Quantity::from(1),
497        value => Quantity::from(value),
498    }
499}
500
/// Returns `true` if `action` is the trade action character `'T'`.
#[inline(always)]
#[must_use]
fn is_trade_msg(action: c_char) -> bool {
    matches!(action as u8, b'T')
}
506
/// Returns `true` only when neither the bid nor the ask price is `i64::MAX`.
///
/// `i64::MAX` is the sentinel Databento uses for undefined/null prices, and a
/// usable quote needs both sides to be defined.
#[inline(always)]
#[must_use]
fn has_valid_bid_ask(bid_px: i64, ask_px: i64) -> bool {
    ![bid_px, ask_px].contains(&i64::MAX)
}
516
517/// Decodes a Databento MBO (Market by Order) message into an order book delta or trade.
518///
519/// Returns a tuple containing either an `OrderBookDelta` or a `TradeTick`, depending on
520/// whether the message represents an order book update or a trade execution.
521///
522/// # Errors
523///
524/// Returns an error if decoding the MBO message fails.
525pub fn decode_mbo_msg(
526    msg: &dbn::MboMsg,
527    instrument_id: InstrumentId,
528    price_precision: u8,
529    ts_init: Option<UnixNanos>,
530    include_trades: bool,
531) -> anyhow::Result<(Option<OrderBookDelta>, Option<TradeTick>)> {
532    let side = parse_order_side(msg.side);
533    if is_trade_msg(msg.action) {
534        if include_trades && msg.size > 0 {
535            let price = decode_price_or_undef(msg.price, price_precision);
536            let size = decode_quantity(msg.size as u64);
537            let aggressor_side = parse_aggressor_side(msg.side);
538            let trade_id = TradeId::new(itoa::Buffer::new().format(msg.sequence));
539            let ts_event = msg.ts_recv.into();
540            let ts_init = ts_init.unwrap_or(ts_event);
541
542            let trade = TradeTick::new(
543                instrument_id,
544                price,
545                size,
546                aggressor_side,
547                trade_id,
548                ts_event,
549                ts_init,
550            );
551            return Ok((None, Some(trade)));
552        }
553
554        return Ok((None, None));
555    }
556
557    let action = parse_book_action(msg.action)?;
558    let price = decode_price_or_undef(msg.price, price_precision);
559    let size = decode_quantity(msg.size as u64);
560    let order = BookOrder::new(side, price, size, msg.order_id);
561
562    let ts_event = msg.ts_recv.into();
563    let ts_init = ts_init.unwrap_or(ts_event);
564
565    let delta = OrderBookDelta::new(
566        instrument_id,
567        action,
568        order,
569        msg.flags.raw(),
570        msg.sequence.into(),
571        ts_event,
572        ts_init,
573    );
574
575    Ok((Some(delta), None))
576}
577
578/// Decodes a Databento Trade message into a `TradeTick`.
579///
580/// # Errors
581///
582/// Returns an error if decoding the Trade message fails.
583pub fn decode_trade_msg(
584    msg: &dbn::TradeMsg,
585    instrument_id: InstrumentId,
586    price_precision: u8,
587    ts_init: Option<UnixNanos>,
588) -> anyhow::Result<TradeTick> {
589    let ts_event = msg.ts_recv.into();
590    let ts_init = ts_init.unwrap_or(ts_event);
591
592    let trade = TradeTick::new(
593        instrument_id,
594        decode_price_or_undef(msg.price, price_precision),
595        decode_quantity(msg.size as u64),
596        parse_aggressor_side(msg.side),
597        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
598        ts_event,
599        ts_init,
600    );
601
602    Ok(trade)
603}
604
605/// Decodes a Databento TBBO (Top of Book with Trade) message into quote and trade ticks.
606///
607/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
608/// The trade is always returned.
609///
610/// # Errors
611///
612/// Returns an error if decoding the TBBO message fails.
613pub fn decode_tbbo_msg(
614    msg: &dbn::TbboMsg,
615    instrument_id: InstrumentId,
616    price_precision: u8,
617    ts_init: Option<UnixNanos>,
618) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
619    let top_level = &msg.levels[0];
620    let ts_event = msg.ts_recv.into();
621    let ts_init = ts_init.unwrap_or(ts_event);
622
623    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
624        Some(QuoteTick::new(
625            instrument_id,
626            decode_price_or_undef(top_level.bid_px, price_precision),
627            decode_price_or_undef(top_level.ask_px, price_precision),
628            decode_quantity(top_level.bid_sz as u64),
629            decode_quantity(top_level.ask_sz as u64),
630            ts_event,
631            ts_init,
632        ))
633    } else {
634        None
635    };
636
637    let trade = TradeTick::new(
638        instrument_id,
639        decode_price_or_undef(msg.price, price_precision),
640        decode_quantity(msg.size as u64),
641        parse_aggressor_side(msg.side),
642        TradeId::new(itoa::Buffer::new().format(msg.sequence)),
643        ts_event,
644        ts_init,
645    );
646
647    Ok((maybe_quote, trade))
648}
649
650/// Decodes a Databento MBP1 (Market by Price Level 1) message into quote and optional trade ticks.
651///
652/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
653///
654/// # Errors
655///
656/// Returns an error if decoding the MBP1 message fails.
657pub fn decode_mbp1_msg(
658    msg: &dbn::Mbp1Msg,
659    instrument_id: InstrumentId,
660    price_precision: u8,
661    ts_init: Option<UnixNanos>,
662    include_trades: bool,
663) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
664    let top_level = &msg.levels[0];
665    let ts_event = msg.ts_recv.into();
666    let ts_init = ts_init.unwrap_or(ts_event);
667
668    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
669        Some(QuoteTick::new(
670            instrument_id,
671            decode_price_or_undef(top_level.bid_px, price_precision),
672            decode_price_or_undef(top_level.ask_px, price_precision),
673            decode_quantity(top_level.bid_sz as u64),
674            decode_quantity(top_level.ask_sz as u64),
675            ts_event,
676            ts_init,
677        ))
678    } else {
679        None
680    };
681
682    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
683        Some(TradeTick::new(
684            instrument_id,
685            decode_price_or_undef(msg.price, price_precision),
686            decode_quantity(msg.size as u64),
687            parse_aggressor_side(msg.side),
688            TradeId::new(itoa::Buffer::new().format(msg.sequence)),
689            ts_event,
690            ts_init,
691        ))
692    } else {
693        None
694    };
695
696    Ok((maybe_quote, maybe_trade))
697}
698
699/// Decodes a Databento BBO (Best Bid and Offer) message into a `QuoteTick`.
700///
701/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
702///
703/// # Errors
704///
705/// Returns an error if decoding the BBO message fails.
706pub fn decode_bbo_msg(
707    msg: &dbn::BboMsg,
708    instrument_id: InstrumentId,
709    price_precision: u8,
710    ts_init: Option<UnixNanos>,
711) -> anyhow::Result<Option<QuoteTick>> {
712    let top_level = &msg.levels[0];
713    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
714        return Ok(None);
715    }
716
717    let ts_event = msg.ts_recv.into();
718    let ts_init = ts_init.unwrap_or(ts_event);
719
720    let quote = QuoteTick::new(
721        instrument_id,
722        decode_price_or_undef(top_level.bid_px, price_precision),
723        decode_price_or_undef(top_level.ask_px, price_precision),
724        decode_quantity(top_level.bid_sz as u64),
725        decode_quantity(top_level.ask_sz as u64),
726        ts_event,
727        ts_init,
728    );
729
730    Ok(Some(quote))
731}
732
733/// Decodes a Databento MBP10 (Market by Price 10 levels) message into an `OrderBookDepth10`.
734///
735/// # Errors
736///
737/// Returns an error if the number of levels in `msg.levels` is not exactly `DEPTH10_LEN`.
738pub fn decode_mbp10_msg(
739    msg: &dbn::Mbp10Msg,
740    instrument_id: InstrumentId,
741    price_precision: u8,
742    ts_init: Option<UnixNanos>,
743) -> anyhow::Result<OrderBookDepth10> {
744    let mut bids = Vec::with_capacity(DEPTH10_LEN);
745    let mut asks = Vec::with_capacity(DEPTH10_LEN);
746    let mut bid_counts = Vec::with_capacity(DEPTH10_LEN);
747    let mut ask_counts = Vec::with_capacity(DEPTH10_LEN);
748
749    for level in &msg.levels {
750        let bid_order = BookOrder::new(
751            OrderSide::Buy,
752            decode_price_or_undef(level.bid_px, price_precision),
753            decode_quantity(level.bid_sz as u64),
754            0,
755        );
756
757        let ask_order = BookOrder::new(
758            OrderSide::Sell,
759            decode_price_or_undef(level.ask_px, price_precision),
760            decode_quantity(level.ask_sz as u64),
761            0,
762        );
763
764        bids.push(bid_order);
765        asks.push(ask_order);
766        bid_counts.push(level.bid_ct);
767        ask_counts.push(level.ask_ct);
768    }
769
770    let bids: [BookOrder; DEPTH10_LEN] = bids.try_into().map_err(|v: Vec<BookOrder>| {
771        anyhow::anyhow!(
772            "Expected exactly {DEPTH10_LEN} bid levels, received {}",
773            v.len()
774        )
775    })?;
776
777    let asks: [BookOrder; DEPTH10_LEN] = asks.try_into().map_err(|v: Vec<BookOrder>| {
778        anyhow::anyhow!(
779            "Expected exactly {DEPTH10_LEN} ask levels, received {}",
780            v.len()
781        )
782    })?;
783
784    let bid_counts: [u32; DEPTH10_LEN] = bid_counts.try_into().map_err(|v: Vec<u32>| {
785        anyhow::anyhow!(
786            "Expected exactly {DEPTH10_LEN} bid counts, received {}",
787            v.len()
788        )
789    })?;
790
791    let ask_counts: [u32; DEPTH10_LEN] = ask_counts.try_into().map_err(|v: Vec<u32>| {
792        anyhow::anyhow!(
793            "Expected exactly {DEPTH10_LEN} ask counts, received {}",
794            v.len()
795        )
796    })?;
797
798    let ts_event = msg.ts_recv.into();
799    let ts_init = ts_init.unwrap_or(ts_event);
800
801    let depth = OrderBookDepth10::new(
802        instrument_id,
803        bids,
804        asks,
805        bid_counts,
806        ask_counts,
807        msg.flags.raw(),
808        msg.sequence.into(),
809        ts_event,
810        ts_init,
811    );
812
813    Ok(depth)
814}
815
816/// Decodes a Databento CMBP1 (Consolidated Market by Price Level 1) message.
817///
818/// Returns a tuple containing an optional `QuoteTick` and an optional `TradeTick`.
819/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
820///
821/// # Errors
822///
823/// Returns an error if decoding the CMBP1 message fails.
824pub fn decode_cmbp1_msg(
825    msg: &dbn::Cmbp1Msg,
826    instrument_id: InstrumentId,
827    price_precision: u8,
828    ts_init: Option<UnixNanos>,
829    include_trades: bool,
830) -> anyhow::Result<(Option<QuoteTick>, Option<TradeTick>)> {
831    let top_level = &msg.levels[0];
832    let ts_event = msg.ts_recv.into();
833    let ts_init = ts_init.unwrap_or(ts_event);
834
835    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
836        Some(QuoteTick::new(
837            instrument_id,
838            decode_price_or_undef(top_level.bid_px, price_precision),
839            decode_price_or_undef(top_level.ask_px, price_precision),
840            decode_quantity(top_level.bid_sz as u64),
841            decode_quantity(top_level.ask_sz as u64),
842            ts_event,
843            ts_init,
844        ))
845    } else {
846        None
847    };
848
849    let maybe_trade = if include_trades && is_trade_msg(msg.action) {
850        // CMBP1 does not publish a native trade ID; derive a deterministic one
851        let trade_id = derive_cmbp_trade_id(
852            instrument_id,
853            msg.hd.ts_event,
854            msg.ts_recv,
855            msg.price,
856            msg.size,
857            msg.side,
858        );
859        Some(TradeTick::new(
860            instrument_id,
861            decode_price_or_undef(msg.price, price_precision),
862            decode_quantity(msg.size as u64),
863            parse_aggressor_side(msg.side),
864            trade_id,
865            ts_event,
866            ts_init,
867        ))
868    } else {
869        None
870    };
871
872    Ok((maybe_quote, maybe_trade))
873}
874
875/// Decodes a Databento CBBO (Consolidated Best Bid and Offer) message.
876///
877/// Returns `None` if either bid or ask price is undefined (`i64::MAX`).
878///
879/// # Errors
880///
881/// Returns an error if decoding the CBBO message fails.
882pub fn decode_cbbo_msg(
883    msg: &dbn::CbboMsg,
884    instrument_id: InstrumentId,
885    price_precision: u8,
886    ts_init: Option<UnixNanos>,
887) -> anyhow::Result<Option<QuoteTick>> {
888    let top_level = &msg.levels[0];
889    if !has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
890        return Ok(None);
891    }
892
893    let ts_event = msg.ts_recv.into();
894    let ts_init = ts_init.unwrap_or(ts_event);
895
896    let quote = QuoteTick::new(
897        instrument_id,
898        decode_price_or_undef(top_level.bid_px, price_precision),
899        decode_price_or_undef(top_level.ask_px, price_precision),
900        decode_quantity(top_level.bid_sz as u64),
901        decode_quantity(top_level.ask_sz as u64),
902        ts_event,
903        ts_init,
904    );
905
906    Ok(Some(quote))
907}
908
909/// Decodes a Databento TCBBO (Consolidated Top of Book with Trade) message.
910///
911/// Returns `None` for the quote if either bid or ask price is undefined (`i64::MAX`).
912/// The trade is always returned.
913///
914/// # Errors
915///
916/// Returns an error if decoding the TCBBO message fails.
917pub fn decode_tcbbo_msg(
918    msg: &dbn::CbboMsg,
919    instrument_id: InstrumentId,
920    price_precision: u8,
921    ts_init: Option<UnixNanos>,
922) -> anyhow::Result<(Option<QuoteTick>, TradeTick)> {
923    let top_level = &msg.levels[0];
924    let ts_event = msg.ts_recv.into();
925    let ts_init = ts_init.unwrap_or(ts_event);
926
927    let maybe_quote = if has_valid_bid_ask(top_level.bid_px, top_level.ask_px) {
928        Some(QuoteTick::new(
929            instrument_id,
930            decode_price_or_undef(top_level.bid_px, price_precision),
931            decode_price_or_undef(top_level.ask_px, price_precision),
932            decode_quantity(top_level.bid_sz as u64),
933            decode_quantity(top_level.ask_sz as u64),
934            ts_event,
935            ts_init,
936        ))
937    } else {
938        None
939    };
940
941    // TCBBO does not publish a native trade ID; derive a deterministic one
942    let trade_id = derive_cmbp_trade_id(
943        instrument_id,
944        msg.hd.ts_event,
945        msg.ts_recv,
946        msg.price,
947        msg.size,
948        msg.side,
949    );
950    let trade = TradeTick::new(
951        instrument_id,
952        decode_price_or_undef(msg.price, price_precision),
953        decode_quantity(msg.size as u64),
954        parse_aggressor_side(msg.side),
955        trade_id,
956        ts_event,
957        ts_init,
958    );
959
960    Ok((maybe_quote, trade))
961}
962
963/// # Errors
964///
965/// Returns an error if `rtype` is not a supported bar aggregation.
966pub fn decode_bar_type(
967    msg: &dbn::OhlcvMsg,
968    instrument_id: InstrumentId,
969) -> anyhow::Result<BarType> {
970    let bar_type = match msg.hd.rtype {
971        32 => {
972            // ohlcv-1s
973            BarType::new(instrument_id, BAR_SPEC_1S, AggregationSource::External)
974        }
975        33 => {
976            // ohlcv-1m
977            BarType::new(instrument_id, BAR_SPEC_1M, AggregationSource::External)
978        }
979        34 => {
980            // ohlcv-1h
981            BarType::new(instrument_id, BAR_SPEC_1H, AggregationSource::External)
982        }
983        35 => {
984            // ohlcv-1d
985            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
986        }
987        36 => {
988            // ohlcv-eod
989            BarType::new(instrument_id, BAR_SPEC_1D, AggregationSource::External)
990        }
991        _ => anyhow::bail!(
992            "`rtype` is not a supported bar aggregation, was {}",
993            msg.hd.rtype
994        ),
995    };
996
997    Ok(bar_type)
998}
999
1000/// # Errors
1001///
1002/// Returns an error if `rtype` is not a supported bar aggregation.
1003pub fn decode_ts_event_adjustment(msg: &dbn::OhlcvMsg) -> anyhow::Result<UnixNanos> {
1004    let adjustment = match msg.hd.rtype {
1005        32 => {
1006            // ohlcv-1s
1007            BAR_CLOSE_ADJUSTMENT_1S
1008        }
1009        33 => {
1010            // ohlcv-1m
1011            BAR_CLOSE_ADJUSTMENT_1M
1012        }
1013        34 => {
1014            // ohlcv-1h
1015            BAR_CLOSE_ADJUSTMENT_1H
1016        }
1017        35 | 36 => {
1018            // ohlcv-1d and ohlcv-eod
1019            BAR_CLOSE_ADJUSTMENT_1D
1020        }
1021        _ => anyhow::bail!(
1022            "`rtype` is not a supported bar aggregation, was {}",
1023            msg.hd.rtype
1024        ),
1025    };
1026
1027    Ok(adjustment.into())
1028}
1029
1030/// # Errors
1031///
1032/// Returns an error if decoding the OHLCV message fails.
1033pub fn decode_ohlcv_msg(
1034    msg: &dbn::OhlcvMsg,
1035    instrument_id: InstrumentId,
1036    price_precision: u8,
1037    ts_init: Option<UnixNanos>,
1038    timestamp_on_close: bool,
1039) -> anyhow::Result<Bar> {
1040    let bar_type = decode_bar_type(msg, instrument_id)?;
1041    let ts_event_adjustment = decode_ts_event_adjustment(msg)?;
1042
1043    let ts_event_raw = msg.hd.ts_event.into();
1044    let ts_close = ts_event_raw + ts_event_adjustment;
1045    let ts_init = ts_init.unwrap_or(ts_close); // received time or close time
1046
1047    let ts_event = if timestamp_on_close {
1048        ts_close
1049    } else {
1050        ts_event_raw
1051    };
1052
1053    let bar = Bar::new(
1054        bar_type,
1055        decode_price_or_undef(msg.open, price_precision),
1056        decode_price_or_undef(msg.high, price_precision),
1057        decode_price_or_undef(msg.low, price_precision),
1058        decode_price_or_undef(msg.close, price_precision),
1059        decode_quantity(msg.volume),
1060        ts_event,
1061        ts_init,
1062    );
1063
1064    Ok(bar)
1065}
1066
1067/// Decodes a Databento status message into an `InstrumentStatus` event.
1068///
1069/// # Errors
1070///
1071/// Returns an error if decoding the status message fails or if `msg.action` is not a valid `MarketStatusAction`.
1072pub fn decode_status_msg(
1073    msg: &dbn::StatusMsg,
1074    instrument_id: InstrumentId,
1075    ts_init: Option<UnixNanos>,
1076) -> anyhow::Result<InstrumentStatus> {
1077    let ts_event = msg.hd.ts_event.into();
1078    let ts_init = ts_init.unwrap_or(ts_event);
1079
1080    let action = MarketStatusAction::from_u16(msg.action)
1081        .ok_or_else(|| anyhow::anyhow!("Invalid `MarketStatusAction` value: {}", msg.action))?;
1082
1083    let status = InstrumentStatus::new(
1084        instrument_id,
1085        action,
1086        ts_event,
1087        ts_init,
1088        parse_status_reason(msg.reason)?,
1089        parse_status_trading_event(msg.trading_event)?,
1090        parse_optional_bool(msg.is_trading),
1091        parse_optional_bool(msg.is_quoting),
1092        parse_optional_bool(msg.is_short_sell_restricted),
1093    );
1094
1095    Ok(status)
1096}
1097
1098/// # Errors
1099///
1100/// Returns an error if decoding the record type fails or encounters unsupported message.
1101pub fn decode_record(
1102    record: &dbn::RecordRef,
1103    instrument_id: InstrumentId,
1104    price_precision: u8,
1105    ts_init: Option<UnixNanos>,
1106    include_trades: bool,
1107    bars_timestamp_on_close: bool,
1108) -> anyhow::Result<(Option<Data>, Option<Data>)> {
1109    // Note: TBBO and TCBBO messages provide both quotes and trades.
1110    // TBBO is handled explicitly below, while TCBBO is handled by
1111    // the CbboMsg branch based on whether it has trade data.
1112    let result = if let Some(msg) = record.get::<dbn::MboMsg>() {
1113        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1114        let result = decode_mbo_msg(
1115            msg,
1116            instrument_id,
1117            price_precision,
1118            Some(ts_init),
1119            include_trades,
1120        )?;
1121
1122        match result {
1123            (Some(delta), None) => (Some(Data::Delta(delta)), None),
1124            (None, Some(trade)) => (Some(Data::Trade(trade)), None),
1125            (None, None) => (None, None),
1126            _ => anyhow::bail!("Invalid `MboMsg` parsing combination"),
1127        }
1128    } else if let Some(msg) = record.get::<dbn::TradeMsg>() {
1129        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1130        let trade = decode_trade_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1131        (Some(Data::Trade(trade)), None)
1132    } else if let Some(msg) = record.get::<dbn::Mbp1Msg>() {
1133        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1134        let (maybe_quote, maybe_trade) = decode_mbp1_msg(
1135            msg,
1136            instrument_id,
1137            price_precision,
1138            Some(ts_init),
1139            include_trades,
1140        )?;
1141        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1142    } else if let Some(msg) = record.get::<dbn::Bbo1SMsg>() {
1143        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1144        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1145        (maybe_quote.map(Data::Quote), None)
1146    } else if let Some(msg) = record.get::<dbn::Bbo1MMsg>() {
1147        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1148        let maybe_quote = decode_bbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1149        (maybe_quote.map(Data::Quote), None)
1150    } else if let Some(msg) = record.get::<dbn::Mbp10Msg>() {
1151        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1152        let depth = decode_mbp10_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1153        (Some(Data::from(depth)), None)
1154    } else if let Some(msg) = record.get::<dbn::OhlcvMsg>() {
1155        // if ts_init is None (like with historical data) we don't want it to be equal to ts_event
1156        // it will be set correctly in decode_ohlcv_msg instead
1157        let bar = decode_ohlcv_msg(
1158            msg,
1159            instrument_id,
1160            price_precision,
1161            ts_init,
1162            bars_timestamp_on_close,
1163        )?;
1164        (Some(Data::Bar(bar)), None)
1165    } else if let Some(msg) = record.get::<dbn::Cmbp1Msg>() {
1166        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1167        let (maybe_quote, maybe_trade) = decode_cmbp1_msg(
1168            msg,
1169            instrument_id,
1170            price_precision,
1171            Some(ts_init),
1172            include_trades,
1173        )?;
1174        (maybe_quote.map(Data::Quote), maybe_trade.map(Data::Trade))
1175    } else if let Some(msg) = record.get::<dbn::TbboMsg>() {
1176        // TBBO always has a trade, quote may be skipped if prices undefined
1177        let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1178        let (maybe_quote, trade) =
1179            decode_tbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1180        (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1181    } else if let Some(msg) = record.get::<dbn::CbboMsg>() {
1182        // Check if this is a TCBBO or regular CBBO based on whether it has trade data
1183        if msg.price != i64::MAX && msg.size > 0 {
1184            // TCBBO - has a trade, quote may be skipped if prices undefined
1185            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1186            let (maybe_quote, trade) =
1187                decode_tcbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1188            (maybe_quote.map(Data::Quote), Some(Data::Trade(trade)))
1189        } else {
1190            // Regular CBBO - quote only (may be None if prices undefined)
1191            let ts_init = determine_timestamp(ts_init, msg.ts_recv.into());
1192            let maybe_quote = decode_cbbo_msg(msg, instrument_id, price_precision, Some(ts_init))?;
1193            (maybe_quote.map(Data::Quote), None)
1194        }
1195    } else {
1196        anyhow::bail!("DBN message type is not currently supported")
1197    };
1198
1199    Ok(result)
1200}
1201
1202const fn determine_timestamp(ts_init: Option<UnixNanos>, msg_timestamp: UnixNanos) -> UnixNanos {
1203    match ts_init {
1204        Some(ts_init) => ts_init,
1205        None => msg_timestamp,
1206    }
1207}
1208
1209/// # Errors
1210///
1211/// Returns an error if decoding the `InstrumentDefMsg` fails.
1212pub fn decode_instrument_def_msg(
1213    msg: &dbn::InstrumentDefMsg,
1214    instrument_id: InstrumentId,
1215    ts_init: Option<UnixNanos>,
1216) -> anyhow::Result<InstrumentAny> {
1217    match msg.instrument_class as u8 as char {
1218        'K' => Ok(InstrumentAny::Equity(decode_equity(
1219            msg,
1220            instrument_id,
1221            ts_init,
1222        )?)),
1223        'F' => Ok(InstrumentAny::FuturesContract(decode_futures_contract(
1224            msg,
1225            instrument_id,
1226            ts_init,
1227        )?)),
1228        'S' => Ok(InstrumentAny::FuturesSpread(decode_futures_spread(
1229            msg,
1230            instrument_id,
1231            ts_init,
1232        )?)),
1233        'C' | 'P' => Ok(InstrumentAny::OptionContract(decode_option_contract(
1234            msg,
1235            instrument_id,
1236            ts_init,
1237        )?)),
1238        'T' | 'M' => Ok(InstrumentAny::OptionSpread(decode_option_spread(
1239            msg,
1240            instrument_id,
1241            ts_init,
1242        )?)),
1243        'B' => anyhow::bail!("Unsupported `instrument_class` 'B' (Bond)"),
1244        'X' => anyhow::bail!("Unsupported `instrument_class` 'X' (FX spot)"),
1245        _ => anyhow::bail!(
1246            "Unsupported `instrument_class` '{}'",
1247            msg.instrument_class as u8 as char
1248        ),
1249    }
1250}
1251
1252/// Decodes a Databento instrument definition message into an `Equity` instrument.
1253///
1254/// # Errors
1255///
1256/// Returns an error if parsing or constructing `Equity` fails.
1257pub fn decode_equity(
1258    msg: &dbn::InstrumentDefMsg,
1259    instrument_id: InstrumentId,
1260    ts_init: Option<UnixNanos>,
1261) -> anyhow::Result<Equity> {
1262    let currency = parse_currency_or_usd_default(msg.currency());
1263    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1264    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1265    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1266    let ts_init = ts_init.unwrap_or(ts_event);
1267
1268    Ok(Equity::new(
1269        instrument_id,
1270        instrument_id.symbol,
1271        None, // No ISIN available yet
1272        currency,
1273        price_increment.precision,
1274        price_increment,
1275        Some(lot_size),
1276        None, // max_quantity
1277        None, // min_quantity
1278        None, // max_price
1279        None, // min_price
1280        None, // margin_init
1281        None, // margin_maint
1282        None, // maker_fee
1283        None, // taker_fee
1284        None, // info
1285        ts_event,
1286        ts_init,
1287    ))
1288}
1289
1290/// Decodes a Databento instrument definition message into a `FuturesContract` instrument.
1291///
1292/// # Errors
1293///
1294/// Returns an error if parsing or constructing `FuturesContract` fails.
1295pub fn decode_futures_contract(
1296    msg: &dbn::InstrumentDefMsg,
1297    instrument_id: InstrumentId,
1298    ts_init: Option<UnixNanos>,
1299) -> anyhow::Result<FuturesContract> {
1300    let currency = parse_currency_or_usd_default(msg.currency());
1301    let exchange = Ustr::from(msg.exchange()?);
1302    let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1303    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1304    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1305    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1306    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1307    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1308    let ts_init = ts_init.unwrap_or(ts_event);
1309
1310    Ok(FuturesContract::new_checked(
1311        instrument_id,
1312        instrument_id.symbol,
1313        asset_class.unwrap_or(AssetClass::Commodity),
1314        Some(exchange),
1315        underlying,
1316        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1317        decode_timestamp(msg.expiration, "expiration")?,
1318        currency,
1319        price_increment.precision,
1320        price_increment,
1321        multiplier,
1322        lot_size,
1323        None, // max_quantity
1324        None, // min_quantity
1325        None, // max_price
1326        None, // min_price
1327        None, // margin_init
1328        None, // margin_maint
1329        None, // maker_fee
1330        None, // taker_fee
1331        None, // info
1332        ts_event,
1333        ts_init,
1334    )?)
1335}
1336
1337/// Decodes a Databento instrument definition message into a `FuturesSpread` instrument.
1338///
1339/// # Errors
1340///
1341/// Returns an error if parsing or constructing `FuturesSpread` fails.
1342pub fn decode_futures_spread(
1343    msg: &dbn::InstrumentDefMsg,
1344    instrument_id: InstrumentId,
1345    ts_init: Option<UnixNanos>,
1346) -> anyhow::Result<FuturesSpread> {
1347    let exchange = Ustr::from(msg.exchange()?);
1348    let underlying = decode_underlying(msg.asset()?, &instrument_id.symbol);
1349    let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1350    let strategy_type = Ustr::from(msg.secsubtype()?);
1351    let currency = parse_currency_or_usd_default(msg.currency());
1352    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1353    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1354    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1355    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1356    let ts_init = ts_init.unwrap_or(ts_event);
1357
1358    Ok(FuturesSpread::new_checked(
1359        instrument_id,
1360        instrument_id.symbol,
1361        asset_class.unwrap_or(AssetClass::Commodity),
1362        Some(exchange),
1363        underlying,
1364        strategy_type,
1365        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1366        decode_timestamp(msg.expiration, "expiration")?,
1367        currency,
1368        price_increment.precision,
1369        price_increment,
1370        multiplier,
1371        lot_size,
1372        None, // max_quantity
1373        None, // min_quantity
1374        None, // max_price
1375        None, // min_price
1376        None, // margin_init
1377        None, // margin_maint
1378        None, // maker_fee
1379        None, // taker_fee
1380        None, // info
1381        ts_event,
1382        ts_init,
1383    )?)
1384}
1385
1386/// Decodes a Databento instrument definition message into an `OptionContract` instrument.
1387///
1388/// # Errors
1389///
1390/// Returns an error if parsing or constructing `OptionContract` fails.
1391pub fn decode_option_contract(
1392    msg: &dbn::InstrumentDefMsg,
1393    instrument_id: InstrumentId,
1394    ts_init: Option<UnixNanos>,
1395) -> anyhow::Result<OptionContract> {
1396    let currency = parse_currency_or_usd_default(msg.currency());
1397    let strike_price_currency = parse_currency_or_usd_default(msg.strike_price_currency());
1398    let exchange = Ustr::from(msg.exchange()?);
1399    let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1400    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1401        Some(AssetClass::Equity)
1402    } else {
1403        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1404        asset_class
1405    };
1406    let option_kind = parse_option_kind(msg.instrument_class)?;
1407    let strike_price = decode_price(
1408        msg.strike_price,
1409        strike_price_currency.precision,
1410        "strike_price",
1411    )?;
1412    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1413    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1414    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1415    let ts_event = UnixNanos::from(msg.ts_recv); // More accurate and reliable timestamp
1416    let ts_init = ts_init.unwrap_or(ts_event);
1417
1418    Ok(OptionContract::new_checked(
1419        instrument_id,
1420        instrument_id.symbol,
1421        asset_class_opt.unwrap_or(AssetClass::Commodity),
1422        Some(exchange),
1423        underlying,
1424        option_kind,
1425        strike_price,
1426        currency,
1427        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1428        decode_timestamp(msg.expiration, "expiration")?,
1429        price_increment.precision,
1430        price_increment,
1431        multiplier,
1432        lot_size,
1433        None, // max_quantity
1434        None, // min_quantity
1435        None, // max_price
1436        None, // min_price
1437        None, // margin_init
1438        None, // margin_maint
1439        None, // maker_fee
1440        None, // taker_fee
1441        None, // info
1442        ts_event,
1443        ts_init,
1444    )?)
1445}
1446
1447/// Decodes a Databento instrument definition message into an `OptionSpread` instrument.
1448///
1449/// # Errors
1450///
1451/// Returns an error if parsing or constructing `OptionSpread` fails.
1452pub fn decode_option_spread(
1453    msg: &dbn::InstrumentDefMsg,
1454    instrument_id: InstrumentId,
1455    ts_init: Option<UnixNanos>,
1456) -> anyhow::Result<OptionSpread> {
1457    let exchange = Ustr::from(msg.exchange()?);
1458    let underlying = decode_underlying(msg.underlying()?, &instrument_id.symbol);
1459    let asset_class_opt = if instrument_id.venue.as_str() == "OPRA" {
1460        Some(AssetClass::Equity)
1461    } else {
1462        let (asset_class, _) = parse_cfi_iso10926(msg.cfi()?);
1463        asset_class
1464    };
1465    let strategy_type = Ustr::from(msg.secsubtype()?);
1466    let currency = parse_currency_or_usd_default(msg.currency());
1467    let price_increment = decode_price_increment(msg.min_price_increment, currency.precision);
1468    let multiplier = decode_multiplier(msg.unit_of_measure_qty)?;
1469    let lot_size = decode_lot_size(msg.min_lot_size_round_lot);
1470    let ts_event = msg.ts_recv.into(); // More accurate and reliable timestamp
1471    let ts_init = ts_init.unwrap_or(ts_event);
1472
1473    Ok(OptionSpread::new_checked(
1474        instrument_id,
1475        instrument_id.symbol,
1476        asset_class_opt.unwrap_or(AssetClass::Commodity),
1477        Some(exchange),
1478        underlying,
1479        strategy_type,
1480        decode_optional_timestamp(msg.activation).unwrap_or_default(),
1481        decode_timestamp(msg.expiration, "expiration")?,
1482        currency,
1483        price_increment.precision,
1484        price_increment,
1485        multiplier,
1486        lot_size,
1487        None, // max_quantity
1488        None, // min_quantity
1489        None, // max_price
1490        None, // min_price
1491        None, // margin_init
1492        None, // margin_maint
1493        None, // maker_fee
1494        None, // taker_fee
1495        None, // info
1496        ts_event,
1497        ts_init,
1498    )?)
1499}
1500
1501/// Decodes a Databento imbalance message into a `DatabentoImbalance` event.
1502///
1503/// # Errors
1504///
1505/// Returns an error if constructing `DatabentoImbalance` fails.
1506pub fn decode_imbalance_msg(
1507    msg: &dbn::ImbalanceMsg,
1508    instrument_id: InstrumentId,
1509    price_precision: u8,
1510    ts_init: Option<UnixNanos>,
1511) -> anyhow::Result<DatabentoImbalance> {
1512    let ts_event = msg.ts_recv.into();
1513    let ts_init = ts_init.unwrap_or(ts_event);
1514
1515    Ok(DatabentoImbalance::new(
1516        instrument_id,
1517        decode_price_or_undef(msg.ref_price, price_precision),
1518        decode_price_or_undef(msg.cont_book_clr_price, price_precision),
1519        decode_price_or_undef(msg.auct_interest_clr_price, price_precision),
1520        Quantity::new(f64::from(msg.paired_qty), 0),
1521        Quantity::new(f64::from(msg.total_imbalance_qty), 0),
1522        parse_order_side(msg.side),
1523        msg.significant_imbalance as c_char,
1524        msg.hd.ts_event.into(),
1525        ts_event,
1526        ts_init,
1527    ))
1528}
1529
1530/// Decodes a Databento statistics message into a `DatabentoStatistics` event.
1531///
1532/// # Errors
1533///
1534/// Returns an error if constructing `DatabentoStatistics` fails or if `msg.stat_type` or
1535/// `msg.update_action` is not a valid enum variant.
1536pub fn decode_statistics_msg(
1537    msg: &dbn::StatMsg,
1538    instrument_id: InstrumentId,
1539    price_precision: u8,
1540    ts_init: Option<UnixNanos>,
1541) -> anyhow::Result<DatabentoStatistics> {
1542    let stat_type = DatabentoStatisticType::from_u8(msg.stat_type as u8)
1543        .ok_or_else(|| anyhow::anyhow!("Invalid value for `stat_type`: {}", msg.stat_type))?;
1544    let update_action =
1545        DatabentoStatisticUpdateAction::from_u8(msg.update_action).ok_or_else(|| {
1546            anyhow::anyhow!("Invalid value for `update_action`: {}", msg.update_action)
1547        })?;
1548    let ts_event = msg.ts_recv.into();
1549    let ts_init = ts_init.unwrap_or(ts_event);
1550
1551    Ok(DatabentoStatistics::new(
1552        instrument_id,
1553        stat_type,
1554        update_action,
1555        decode_optional_price(msg.price, price_precision),
1556        decode_optional_quantity(msg.quantity),
1557        msg.channel_id,
1558        msg.stat_flags,
1559        msg.sequence,
1560        msg.ts_ref.into(),
1561        msg.ts_in_delta,
1562        msg.hd.ts_event.into(),
1563        ts_event,
1564        ts_init,
1565    ))
1566}
1567
1568#[cfg(test)]
1569mod tests {
1570    use std::path::{Path, PathBuf};
1571
1572    use databento::dbn::decode::{DecodeStream, dbn::Decoder};
1573    use fallible_streaming_iterator::FallibleStreamingIterator;
1574    use nautilus_model::instruments::Instrument;
1575    use rstest::*;
1576
1577    use super::*;
1578
1579    fn test_data_path() -> PathBuf {
1580        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
1581    }
1582
1583    #[rstest]
1584    #[case('Y' as c_char, Some(true))]
1585    #[case('N' as c_char, Some(false))]
1586    #[case('X' as c_char, None)]
1587    fn test_parse_optional_bool(#[case] input: c_char, #[case] expected: Option<bool>) {
1588        assert_eq!(parse_optional_bool(input), expected);
1589    }
1590
1591    #[rstest]
1592    #[case('A' as c_char, OrderSide::Sell)]
1593    #[case('B' as c_char, OrderSide::Buy)]
1594    #[case('X' as c_char, OrderSide::NoOrderSide)]
1595    fn test_parse_order_side(#[case] input: c_char, #[case] expected: OrderSide) {
1596        assert_eq!(parse_order_side(input), expected);
1597    }
1598
1599    #[rstest]
1600    #[case('A' as c_char, AggressorSide::Seller)]
1601    #[case('B' as c_char, AggressorSide::Buyer)]
1602    #[case('X' as c_char, AggressorSide::NoAggressor)]
1603    fn test_parse_aggressor_side(#[case] input: c_char, #[case] expected: AggressorSide) {
1604        assert_eq!(parse_aggressor_side(input), expected);
1605    }
1606
1607    #[rstest]
1608    #[case('T' as c_char, true)]
1609    #[case('A' as c_char, false)]
1610    #[case('C' as c_char, false)]
1611    #[case('F' as c_char, false)]
1612    #[case('M' as c_char, false)]
1613    #[case('R' as c_char, false)]
1614    fn test_is_trade_msg(#[case] action: c_char, #[case] expected: bool) {
1615        assert_eq!(is_trade_msg(action), expected);
1616    }
1617
1618    #[rstest]
1619    fn test_derive_cmbp_trade_id_is_deterministic() {
1620        let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1621        let first = derive_cmbp_trade_id(instrument_id, 1, 2, 100, 5, 'B' as c_char);
1622        let second = derive_cmbp_trade_id(instrument_id, 1, 2, 100, 5, 'B' as c_char);
1623        assert_eq!(first, second);
1624    }
1625
1626    #[rstest]
1627    fn test_derive_cmbp_trade_id_format_is_16_hex_chars() {
1628        let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1629        let trade_id = derive_cmbp_trade_id(instrument_id, 0, 0, 0, 0, 'B' as c_char);
1630        let value = trade_id.as_str();
1631        assert_eq!(value.len(), 16);
1632        assert!(
1633            value
1634                .chars()
1635                .all(|c| c.is_ascii_hexdigit() && !c.is_uppercase())
1636        );
1637    }
1638
1639    #[rstest]
1640    #[case::ts_event_changed(
1641        derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 2, 2, 100, 5, 'B' as c_char),
1642    )]
1643    #[case::ts_recv_changed(
1644        derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 3, 100, 5, 'B' as c_char),
1645    )]
1646    #[case::price_changed(
1647        derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 2, 101, 5, 'B' as c_char),
1648    )]
1649    #[case::size_changed(
1650        derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 2, 100, 6, 'B' as c_char),
1651    )]
1652    #[case::side_changed(
1653        derive_cmbp_trade_id(InstrumentId::from("ES.c.0.GLBX"), 1, 2, 100, 5, 'A' as c_char),
1654    )]
1655    #[case::instrument_changed(
1656        derive_cmbp_trade_id(InstrumentId::from("NQ.c.0.GLBX"), 1, 2, 100, 5, 'B' as c_char),
1657    )]
1658    fn test_derive_cmbp_trade_id_each_field_affects_output(#[case] altered: TradeId) {
1659        let baseline = derive_cmbp_trade_id(
1660            InstrumentId::from("ES.c.0.GLBX"),
1661            1,
1662            2,
1663            100,
1664            5,
1665            'B' as c_char,
1666        );
1667        assert_ne!(baseline, altered);
1668    }
1669
1670    #[rstest]
1671    fn test_derive_cmbp_trade_id_field_delimiter_prevents_collision() {
1672        let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1673        // If fields were concatenated without delimiters, these two triples
1674        // would produce the same input stream.
1675        let a = derive_cmbp_trade_id(instrument_id, 0x100, 0, 0, 0, 'B' as c_char);
1676        let b = derive_cmbp_trade_id(instrument_id, 0, 0x100, 0, 0, 'B' as c_char);
1677        assert_ne!(a, b);
1678    }
1679
1680    mod cmbp_trade_id_property_tests {
1681        use proptest::prelude::*;
1682        use rstest::rstest;
1683
1684        use super::*;
1685
1686        proptest! {
1687            #[rstest]
1688            fn prop_derive_cmbp_trade_id_is_stable_for_same_inputs(
1689                ts_event in any::<u64>(),
1690                ts_recv in any::<u64>(),
1691                price in any::<i64>(),
1692                size in any::<u32>(),
1693                side_byte in 0u8..128,
1694            ) {
1695                let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1696                let side = side_byte as c_char;
1697
1698                let first = derive_cmbp_trade_id(
1699                    instrument_id, ts_event, ts_recv, price, size, side,
1700                );
1701                let second = derive_cmbp_trade_id(
1702                    instrument_id, ts_event, ts_recv, price, size, side,
1703                );
1704                prop_assert_eq!(first, second);
1705            }
1706
1707            #[rstest]
1708            fn prop_derive_cmbp_trade_id_output_is_16_hex_chars(
1709                ts_event in any::<u64>(),
1710                ts_recv in any::<u64>(),
1711                price in any::<i64>(),
1712                size in any::<u32>(),
1713                side_byte in 0u8..128,
1714            ) {
1715                let instrument_id = InstrumentId::from("ES.c.0.GLBX");
1716                let side = side_byte as c_char;
1717                let id = derive_cmbp_trade_id(
1718                    instrument_id, ts_event, ts_recv, price, size, side,
1719                );
1720                let value = id.as_str();
1721                prop_assert_eq!(value.len(), 16);
1722                prop_assert!(value.chars().all(|c| c.is_ascii_hexdigit() && !c.is_uppercase()));
1723            }
1724        }
1725    }
1726
1727    #[rstest]
1728    #[case('A' as c_char, Ok(BookAction::Add))]
1729    #[case('C' as c_char, Ok(BookAction::Delete))]
1730    #[case('F' as c_char, Ok(BookAction::Update))]
1731    #[case('M' as c_char, Ok(BookAction::Update))]
1732    #[case('R' as c_char, Ok(BookAction::Clear))]
1733    #[case('X' as c_char, Err("Invalid `BookAction`, was 'X'"))]
1734    fn test_parse_book_action(#[case] input: c_char, #[case] expected: Result<BookAction, &str>) {
1735        match parse_book_action(input) {
1736            Ok(action) => assert_eq!(Ok(action), expected),
1737            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1738        }
1739    }
1740
1741    #[rstest]
1742    #[case('C' as c_char, Ok(OptionKind::Call))]
1743    #[case('P' as c_char, Ok(OptionKind::Put))]
1744    #[case('X' as c_char, Err("Invalid `OptionKind`, was 'X'"))]
1745    fn test_parse_option_kind(#[case] input: c_char, #[case] expected: Result<OptionKind, &str>) {
1746        match parse_option_kind(input) {
1747            Ok(kind) => assert_eq!(Ok(kind), expected),
1748            Err(e) => assert_eq!(Err(e.to_string().as_str()), expected),
1749        }
1750    }
1751
1752    #[rstest]
1753    #[case(Ok("USD"), Currency::USD())]
1754    #[case(Ok("EUR"), Currency::try_from_str("EUR").unwrap())]
1755    #[case(Ok(""), Currency::USD())]
1756    #[case(Err("Error"), Currency::USD())]
1757    fn test_parse_currency_or_usd_default(
1758        #[case] input: Result<&str, &'static str>, // Using `&'static str` for errors
1759        #[case] expected: Currency,
1760    ) {
1761        let actual = parse_currency_or_usd_default(input.map_err(std::io::Error::other));
1762        assert_eq!(actual, expected);
1763    }
1764
1765    #[rstest]
1766    #[case("DII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1767    #[case("EII", (Some(AssetClass::Index), Some(InstrumentClass::Future)))]
1768    #[case("EIA", (Some(AssetClass::Equity), Some(InstrumentClass::Future)))]
1769    #[case("XXX", (None, None))]
1770    #[case("D", (None, None))]
1771    #[case("", (None, None))]
1772    fn test_parse_cfi_iso10926(
1773        #[case] input: &str,
1774        #[case] expected: (Option<AssetClass>, Option<InstrumentClass>),
1775    ) {
1776        let result = parse_cfi_iso10926(input);
1777        assert_eq!(result, expected);
1778    }
1779
1780    #[rstest]
1781    #[case(0, 2, Price::from_raw(0, 2))]
1782    #[case(
1783        1_000_000_000,
1784        2,
1785        Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1786    )]
1787    fn test_decode_price(#[case] value: i64, #[case] precision: u8, #[case] expected: Price) {
1788        let actual = decode_price(value, precision, "test_field").unwrap();
1789        assert_eq!(actual, expected);
1790    }
1791
1792    #[rstest]
1793    fn test_decode_price_undefined_errors() {
1794        let result = decode_price(i64::MAX, 2, "strike_price");
1795        assert!(result.is_err());
1796        assert!(result.unwrap_err().to_string().contains("strike_price"));
1797    }
1798
1799    #[rstest]
1800    #[case(0, 0)]
1801    #[case(1, 9)] // 0.000000001 needs 9 decimal places
1802    #[case(10, 8)] // 0.00000001 needs 8
1803    #[case(3_906_250, 8)] // ZT: 1/256 = 0.00390625
1804    #[case(7_812_500, 7)] // ZF: 1/128 = 0.0078125
1805    #[case(15_625_000, 6)] // ZN: 1/64 = 0.015625
1806    #[case(31_250_000, 5)] // ZB: 1/32 = 0.03125
1807    #[case(250_000_000, 2)] // ES: 0.25
1808    #[case(1_000_000_000, 0)] // 1.0
1809    #[case(10_000_000_000, 0)] // 10.0
1810    fn test_precision_from_raw(#[case] value: i64, #[case] expected: u8) {
1811        assert_eq!(precision_from_raw(value), expected);
1812    }
1813
1814    #[rstest]
1815    #[case(0, 2, Price::new(0.01, 2))] // Default for 0
1816    #[case(i64::MAX, 2, Price::new(0.01, 2))] // Default for i64::MAX
1817    #[case(
1818        10_000_000_000,
1819        2,
1820        Price::from_raw(decode_raw_price_i64(10_000_000_000), 2)
1821    )] // 10.0: derived=0, max(0,2)=2
1822    #[case(3_906_250, 2, Price::from_raw(decode_raw_price_i64(3_906_250), 8))] // ZT 1/256: derived=8, max(8,2)=8
1823    #[case(7_812_500, 2, Price::from_raw(decode_raw_price_i64(7_812_500), 7))] // ZF 1/128: derived=7, max(7,2)=7
1824    #[case(15_625_000, 2, Price::from_raw(decode_raw_price_i64(15_625_000), 6))] // ZN 1/64: derived=6, max(6,2)=6
1825    #[case(31_250_000, 2, Price::from_raw(decode_raw_price_i64(31_250_000), 5))] // ZB 1/32: derived=5, max(5,2)=5
1826    #[case(250_000_000, 2, Price::from_raw(decode_raw_price_i64(250_000_000), 2))] // ES 0.25: derived=2, max(2,2)=2
1827    fn test_decode_price_increment(
1828        #[case] value: i64,
1829        #[case] precision: u8,
1830        #[case] expected: Price,
1831    ) {
1832        let actual = decode_price_increment(value, precision);
1833        assert_eq!(actual, expected);
1834    }
1835
1836    #[rstest]
1837    #[case(i64::MAX, 2, None)] // None for i64::MAX
1838    #[case(0, 2, Some(Price::from_raw(0, 2)))] // 0 is valid here
1839    #[case(
1840        10_000_000_000,
1841        2,
1842        Some(Price::from_raw(decode_raw_price_i64(10_000_000_000), 2))
1843    )]
1844    fn test_decode_optional_price(
1845        #[case] value: i64,
1846        #[case] precision: u8,
1847        #[case] expected: Option<Price>,
1848    ) {
1849        let actual = decode_optional_price(value, precision);
1850        assert_eq!(actual, expected);
1851    }
1852
1853    #[rstest]
1854    #[case(0, 2, Price::from_raw(0, 2))]
1855    #[case(
1856        1_000_000_000,
1857        2,
1858        Price::from_raw(decode_raw_price_i64(1_000_000_000), 2)
1859    )]
1860    #[case(i64::MAX, 2, Price::from_raw(PRICE_UNDEF, 0))] // Sentinel becomes PRICE_UNDEF
1861    fn test_decode_price_or_undef(
1862        #[case] value: i64,
1863        #[case] precision: u8,
1864        #[case] expected: Price,
1865    ) {
1866        let actual = decode_price_or_undef(value, precision);
1867        assert_eq!(actual, expected);
1868    }
1869
1870    #[rstest]
1871    #[case(i64::MAX, None)] // None for i32::MAX
1872    #[case(0, Some(Quantity::new(0.0, 0)))] // 0 is valid quantity
1873    #[case(10, Some(Quantity::new(10.0, 0)))] // Arbitrary valid quantity
1874    fn test_decode_optional_quantity(#[case] value: i64, #[case] expected: Option<Quantity>) {
1875        let actual = decode_optional_quantity(value);
1876        assert_eq!(actual, expected);
1877    }
1878
1879    #[rstest]
1880    #[case(0, UnixNanos::from(0))]
1881    #[case(1_000_000_000, UnixNanos::from(1_000_000_000))]
1882    fn test_decode_timestamp(#[case] value: u64, #[case] expected: UnixNanos) {
1883        let actual = decode_timestamp(value, "test_field").unwrap();
1884        assert_eq!(actual, expected);
1885    }
1886
1887    #[rstest]
1888    fn test_decode_timestamp_undefined_errors() {
1889        let result = decode_timestamp(dbn::UNDEF_TIMESTAMP, "expiration");
1890        assert!(result.is_err());
1891        assert!(result.unwrap_err().to_string().contains("expiration"));
1892    }
1893
1894    #[rstest]
1895    #[case(0, Some(UnixNanos::from(0)))]
1896    #[case(1_000_000_000, Some(UnixNanos::from(1_000_000_000)))]
1897    #[case(dbn::UNDEF_TIMESTAMP, None)]
1898    fn test_decode_optional_timestamp(#[case] value: u64, #[case] expected: Option<UnixNanos>) {
1899        let actual = decode_optional_timestamp(value);
1900        assert_eq!(actual, expected);
1901    }
1902
1903    #[rstest]
1904    #[case(0, Quantity::from(1))] // Default fallback for 0
1905    #[case(i64::MAX, Quantity::from(1))] // Default fallback for i64::MAX
1906    #[case(50_000_000_000, Quantity::from("50"))] // 50.0 exactly
1907    #[case(12_500_000_000, Quantity::from("12.5"))] // 12.5 exactly
1908    #[case(1_000_000_000, Quantity::from("1"))] // 1.0 exactly
1909    #[case(1, Quantity::from("0.000000001"))] // Smallest positive value
1910    #[case(1_000_000_001, Quantity::from("1.000000001"))] // Just over 1.0
1911    #[case(999_999_999, Quantity::from("0.999999999"))] // Just under 1.0
1912    #[case(123_456_789_000, Quantity::from("123.456789"))] // Trailing zeros trimmed
1913    fn test_decode_multiplier_precise(#[case] raw: i64, #[case] expected: Quantity) {
1914        assert_eq!(decode_multiplier(raw).unwrap(), expected);
1915    }
1916
1917    #[rstest]
1918    #[case(-1_500_000_000)] // Large negative value
1919    #[case(-1)] // Small negative value
1920    #[case(-999_999_999)] // Another negative value
1921    fn test_decode_multiplier_negative_error(#[case] raw: i64) {
1922        let result = decode_multiplier(raw);
1923        assert!(result.is_err());
1924        assert!(
1925            result
1926                .unwrap_err()
1927                .to_string()
1928                .contains("Invalid negative multiplier")
1929        );
1930    }
1931
1932    #[rstest]
1933    #[case(100, Quantity::from(100))]
1934    #[case(1000, Quantity::from(1000))]
1935    #[case(5, Quantity::from(5))]
1936    fn test_decode_quantity(#[case] value: u64, #[case] expected: Quantity) {
1937        assert_eq!(decode_quantity(value), expected);
1938    }
1939
1940    #[rstest]
1941    #[case(0, Quantity::from(1))] // Default for 0
1942    #[case(i32::MAX, Quantity::from(1))] // Default for MAX
1943    #[case(100, Quantity::from(100))]
1944    #[case(1, Quantity::from(1))]
1945    #[case(1000, Quantity::from(1000))]
1946    fn test_decode_lot_size(#[case] value: i32, #[case] expected: Quantity) {
1947        assert_eq!(decode_lot_size(value), expected);
1948    }
1949
1950    #[rstest]
1951    #[case(0, None)] // None for 0
1952    #[case(1, Some(Ustr::from("Scheduled")))]
1953    #[case(2, Some(Ustr::from("Surveillance intervention")))]
1954    #[case(3, Some(Ustr::from("Market event")))]
1955    #[case(10, Some(Ustr::from("Regulatory")))]
1956    #[case(30, Some(Ustr::from("News pending")))]
1957    #[case(40, Some(Ustr::from("Order imbalance")))]
1958    #[case(50, Some(Ustr::from("LULD pause")))]
1959    #[case(60, Some(Ustr::from("Operational")))]
1960    #[case(100, Some(Ustr::from("Corporate action")))]
1961    #[case(120, Some(Ustr::from("Market wide halt level 1")))]
1962    fn test_parse_status_reason(#[case] value: u16, #[case] expected: Option<Ustr>) {
1963        assert_eq!(parse_status_reason(value).unwrap(), expected);
1964    }
1965
1966    #[rstest]
1967    #[case(999)] // Invalid code
1968    fn test_parse_status_reason_invalid(#[case] value: u16) {
1969        assert!(parse_status_reason(value).is_err());
1970    }
1971
1972    #[rstest]
1973    #[case(0, None)] // None for 0
1974    #[case(1, Some(Ustr::from("No cancel")))]
1975    #[case(2, Some(Ustr::from("Change trading session")))]
1976    #[case(3, Some(Ustr::from("Implied matching on")))]
1977    #[case(4, Some(Ustr::from("Implied matching off")))]
1978    fn test_parse_status_trading_event(#[case] value: u16, #[case] expected: Option<Ustr>) {
1979        assert_eq!(parse_status_trading_event(value).unwrap(), expected);
1980    }
1981
1982    #[rstest]
1983    #[case(5)] // Invalid code
1984    #[case(100)] // Invalid code
1985    fn test_parse_status_trading_event_invalid(#[case] value: u16) {
1986        assert!(parse_status_trading_event(value).is_err());
1987    }
1988
1989    #[rstest]
1990    fn test_decode_mbo_msg() {
1991        let path = test_data_path().join("test_data.mbo.dbn.zst");
1992        let mut dbn_stream = Decoder::from_zstd_file(path)
1993            .unwrap()
1994            .decode_stream::<dbn::MboMsg>();
1995        let msg = dbn_stream.next().unwrap().unwrap();
1996
1997        let instrument_id = InstrumentId::from("ESM4.GLBX");
1998        let (delta, _) = decode_mbo_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
1999        let delta = delta.unwrap();
2000
2001        assert_eq!(delta.instrument_id, instrument_id);
2002        assert_eq!(delta.action, BookAction::Delete);
2003        assert_eq!(delta.order.side, OrderSide::Sell);
2004        assert_eq!(delta.order.price, Price::from("3722.75"));
2005        assert_eq!(delta.order.size, Quantity::from("1"));
2006        assert_eq!(delta.order.order_id, 647_784_973_705);
2007        assert_eq!(delta.flags, 128);
2008        assert_eq!(delta.sequence, 1_170_352);
2009        assert_eq!(delta.ts_event, msg.ts_recv);
2010        assert_eq!(delta.ts_event, 1_609_160_400_000_704_060);
2011        assert_eq!(delta.ts_init, 0);
2012    }
2013
2014    #[rstest]
2015    fn test_decode_mbo_msg_clear_action() {
2016        // Create an MBO message with Clear action (action='R', side='N')
2017        let ts_recv = 1_609_160_400_000_000_000;
2018        let msg = dbn::MboMsg {
2019            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
2020            order_id: 0,
2021            price: i64::MAX,
2022            size: 0,
2023            flags: dbn::FlagSet::empty(),
2024            channel_id: 0,
2025            action: 'R' as c_char,
2026            side: 'N' as c_char, // NoOrderSide for Clear
2027            ts_recv,
2028            ts_in_delta: 0,
2029            sequence: 1_000_000,
2030        };
2031
2032        let instrument_id = InstrumentId::from("ESM4.GLBX");
2033        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2034
2035        // Clear messages should produce OrderBookDelta, not TradeTick
2036        assert!(trade.is_none());
2037        let delta = delta.expect("Clear action should produce OrderBookDelta");
2038
2039        assert_eq!(delta.instrument_id, instrument_id);
2040        assert_eq!(delta.action, BookAction::Clear);
2041        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
2042        assert_eq!(delta.order.size, Quantity::from("0"));
2043        assert_eq!(delta.order.order_id, 0);
2044        assert_eq!(delta.sequence, 1_000_000);
2045        assert_eq!(delta.ts_event, ts_recv);
2046        assert_eq!(delta.ts_init, 0);
2047        assert!(delta.order.price.is_undefined());
2048        assert_eq!(delta.order.price.precision, 0);
2049    }
2050
2051    #[rstest]
2052    fn test_decode_mbo_msg_price_undef_with_precision() {
2053        // Test that PRICE_UNDEF (i64::MAX) forces precision to 0 even when price_precision is non-zero
2054        let ts_recv = 1_609_160_400_000_000_000;
2055        let msg = dbn::MboMsg {
2056            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
2057            order_id: 0,
2058            price: i64::MAX, // PRICE_UNDEF
2059            size: 0,
2060            flags: dbn::FlagSet::empty(),
2061            channel_id: 0,
2062            action: 'R' as c_char, // Clear
2063            side: 'N' as c_char,   // NoOrderSide
2064            ts_recv,
2065            ts_in_delta: 0,
2066            sequence: 0,
2067        };
2068
2069        let instrument_id = InstrumentId::from("ESM4.GLBX");
2070        let (delta, _) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2071        let delta = delta.unwrap();
2072
2073        assert!(delta.order.price.is_undefined());
2074        assert_eq!(delta.order.price.precision, 0);
2075        assert_eq!(delta.order.price.raw, PRICE_UNDEF);
2076    }
2077
2078    #[rstest]
2079    fn test_decode_mbo_msg_no_order_side_update() {
2080        // MBO messages with NoOrderSide are now passed through to the book
2081        // The book will resolve the side from its cache using the order_id
2082        let ts_recv = 1_609_160_400_000_000_000;
2083        let msg = dbn::MboMsg {
2084            hd: dbn::RecordHeader::new::<dbn::MboMsg>(1, 1, ts_recv as u32, 0),
2085            order_id: 123_456_789,
2086            price: 4_800_250_000_000, // $4800.25 with precision 2
2087            size: 1,
2088            flags: dbn::FlagSet::empty(),
2089            channel_id: 1,
2090            action: 'M' as c_char, // Modify/Update action
2091            side: 'N' as c_char,   // NoOrderSide
2092            ts_recv,
2093            ts_in_delta: 0,
2094            sequence: 1_000_000,
2095        };
2096
2097        let instrument_id = InstrumentId::from("ESM4.GLBX");
2098        let (delta, trade) = decode_mbo_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2099
2100        // Delta should be created with NoOrderSide (book will resolve it)
2101        assert!(delta.is_some());
2102        assert!(trade.is_none());
2103        let delta = delta.unwrap();
2104        assert_eq!(delta.order.side, OrderSide::NoOrderSide);
2105        assert_eq!(delta.order.order_id, 123_456_789);
2106        assert_eq!(delta.action, BookAction::Update);
2107    }
2108
2109    #[rstest]
2110    fn test_decode_mbp1_msg() {
2111        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
2112        let mut dbn_stream = Decoder::from_zstd_file(path)
2113            .unwrap()
2114            .decode_stream::<dbn::Mbp1Msg>();
2115        let msg = dbn_stream.next().unwrap().unwrap();
2116
2117        let instrument_id = InstrumentId::from("ESM4.GLBX");
2118        let (maybe_quote, _) =
2119            decode_mbp1_msg(msg, instrument_id, 2, Some(0.into()), false).unwrap();
2120        let quote = maybe_quote.expect("Expected valid quote");
2121
2122        assert_eq!(quote.instrument_id, instrument_id);
2123        assert_eq!(quote.bid_price, Price::from("3720.25"));
2124        assert_eq!(quote.ask_price, Price::from("3720.50"));
2125        assert_eq!(quote.bid_size, Quantity::from("24"));
2126        assert_eq!(quote.ask_size, Quantity::from("11"));
2127        assert_eq!(quote.ts_event, msg.ts_recv);
2128        assert_eq!(quote.ts_event, 1_609_160_400_006_136_329);
2129        assert_eq!(quote.ts_init, 0);
2130    }
2131
2132    #[rstest]
2133    fn test_decode_mbp1_msg_undefined_ask_skips_quote() {
2134        let ts_recv = 1_609_160_400_000_000_000;
2135        let msg = dbn::Mbp1Msg {
2136            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
2137            price: 3_720_250_000_000, // Valid trade price
2138            size: 5,
2139            action: 'A' as c_char,
2140            side: 'B' as c_char,
2141            flags: dbn::FlagSet::empty(),
2142            depth: 0,
2143            ts_recv,
2144            ts_in_delta: 0,
2145            sequence: 1_170_352,
2146            levels: [dbn::BidAskPair {
2147                bid_px: 3_720_250_000_000, // Valid bid price
2148                ask_px: i64::MAX,          // Undefined ask price
2149                bid_sz: 24,
2150                ask_sz: 0,
2151                bid_ct: 1,
2152                ask_ct: 0,
2153            }],
2154        };
2155
2156        let instrument_id = InstrumentId::from("ESM4.GLBX");
2157        let (maybe_quote, _) =
2158            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2159
2160        // Quote should be None because ask price is undefined
2161        assert!(maybe_quote.is_none());
2162    }
2163
2164    #[rstest]
2165    fn test_decode_mbp1_msg_undefined_bid_skips_quote() {
2166        let ts_recv = 1_609_160_400_000_000_000;
2167        let msg = dbn::Mbp1Msg {
2168            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
2169            price: 3_720_500_000_000, // Valid trade price
2170            size: 5,
2171            action: 'A' as c_char,
2172            side: 'A' as c_char,
2173            flags: dbn::FlagSet::empty(),
2174            depth: 0,
2175            ts_recv,
2176            ts_in_delta: 0,
2177            sequence: 1_170_352,
2178            levels: [dbn::BidAskPair {
2179                bid_px: i64::MAX,          // Undefined bid price
2180                ask_px: 3_720_500_000_000, // Valid ask price
2181                bid_sz: 0,
2182                ask_sz: 11,
2183                bid_ct: 0,
2184                ask_ct: 1,
2185            }],
2186        };
2187
2188        let instrument_id = InstrumentId::from("ESM4.GLBX");
2189        let (maybe_quote, _) =
2190            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), false).unwrap();
2191
2192        // Quote should be None because bid price is undefined
2193        assert!(maybe_quote.is_none());
2194    }
2195
2196    #[rstest]
2197    fn test_decode_mbp1_msg_trade_still_returned_with_undefined_prices() {
2198        let ts_recv = 1_609_160_400_000_000_000;
2199        let msg = dbn::Mbp1Msg {
2200            hd: dbn::RecordHeader::new::<dbn::Mbp1Msg>(1, 1, ts_recv as u32, 0),
2201            price: 3_720_250_000_000, // Valid trade price
2202            size: 5,
2203            action: 'T' as c_char, // Trade action
2204            side: 'A' as c_char,
2205            flags: dbn::FlagSet::empty(),
2206            depth: 0,
2207            ts_recv,
2208            ts_in_delta: 0,
2209            sequence: 1_170_352,
2210            levels: [dbn::BidAskPair {
2211                bid_px: i64::MAX, // Undefined bid
2212                ask_px: i64::MAX, // Undefined ask
2213                bid_sz: 0,
2214                ask_sz: 0,
2215                bid_ct: 0,
2216                ask_ct: 0,
2217            }],
2218        };
2219
2220        let instrument_id = InstrumentId::from("ESM4.GLBX");
2221        let (maybe_quote, maybe_trade) =
2222            decode_mbp1_msg(&msg, instrument_id, 2, Some(0.into()), true).unwrap();
2223
2224        // Quote should be None because both prices are undefined
2225        assert!(maybe_quote.is_none());
2226
2227        // Trade should still be present
2228        let trade = maybe_trade.expect("Expected trade");
2229        assert_eq!(trade.instrument_id, instrument_id);
2230        assert_eq!(trade.price, Price::from("3720.25"));
2231        assert_eq!(trade.size, Quantity::from("5"));
2232    }
2233
2234    #[rstest]
2235    fn test_decode_bbo_1s_msg() {
2236        let path = test_data_path().join("test_data.bbo-1s.dbn.zst");
2237        let mut dbn_stream = Decoder::from_zstd_file(path)
2238            .unwrap()
2239            .decode_stream::<dbn::BboMsg>();
2240        let msg = dbn_stream.next().unwrap().unwrap();
2241
2242        let instrument_id = InstrumentId::from("ESM4.GLBX");
2243        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2244        let quote = maybe_quote.expect("Expected valid quote");
2245
2246        assert_eq!(quote.instrument_id, instrument_id);
2247        assert_eq!(quote.bid_price, Price::from("3702.25"));
2248        assert_eq!(quote.ask_price, Price::from("3702.75"));
2249        assert_eq!(quote.bid_size, Quantity::from("18"));
2250        assert_eq!(quote.ask_size, Quantity::from("13"));
2251        assert_eq!(quote.ts_event, msg.ts_recv);
2252        assert_eq!(quote.ts_event, 1609113600000000000);
2253        assert_eq!(quote.ts_init, 0);
2254    }
2255
2256    #[rstest]
2257    fn test_decode_bbo_1m_msg() {
2258        let path = test_data_path().join("test_data.bbo-1m.dbn.zst");
2259        let mut dbn_stream = Decoder::from_zstd_file(path)
2260            .unwrap()
2261            .decode_stream::<dbn::BboMsg>();
2262        let msg = dbn_stream.next().unwrap().unwrap();
2263
2264        let instrument_id = InstrumentId::from("ESM4.GLBX");
2265        let maybe_quote = decode_bbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2266        let quote = maybe_quote.expect("Expected valid quote");
2267
2268        assert_eq!(quote.instrument_id, instrument_id);
2269        assert_eq!(quote.bid_price, Price::from("3702.25"));
2270        assert_eq!(quote.ask_price, Price::from("3702.75"));
2271        assert_eq!(quote.bid_size, Quantity::from("18"));
2272        assert_eq!(quote.ask_size, Quantity::from("13"));
2273        assert_eq!(quote.ts_event, msg.ts_recv);
2274        assert_eq!(quote.ts_event, 1609113600000000000);
2275        assert_eq!(quote.ts_init, 0);
2276    }
2277
2278    #[rstest]
2279    fn test_decode_mbp10_msg() {
2280        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
2281        let mut dbn_stream = Decoder::from_zstd_file(path)
2282            .unwrap()
2283            .decode_stream::<dbn::Mbp10Msg>();
2284        let msg = dbn_stream.next().unwrap().unwrap();
2285
2286        let instrument_id = InstrumentId::from("ESM4.GLBX");
2287        let depth10 = decode_mbp10_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2288
2289        assert_eq!(depth10.instrument_id, instrument_id);
2290        assert_eq!(depth10.bids.len(), 10);
2291        assert_eq!(depth10.asks.len(), 10);
2292        assert_eq!(depth10.bid_counts.len(), 10);
2293        assert_eq!(depth10.ask_counts.len(), 10);
2294        assert_eq!(depth10.flags, 128);
2295        assert_eq!(depth10.sequence, 1_170_352);
2296        assert_eq!(depth10.ts_event, msg.ts_recv);
2297        assert_eq!(depth10.ts_event, 1_609_160_400_000_704_060);
2298        assert_eq!(depth10.ts_init, 0);
2299    }
2300
2301    #[rstest]
2302    fn test_decode_trade_msg() {
2303        let path = test_data_path().join("test_data.trades.dbn.zst");
2304        let mut dbn_stream = Decoder::from_zstd_file(path)
2305            .unwrap()
2306            .decode_stream::<dbn::TradeMsg>();
2307        let msg = dbn_stream.next().unwrap().unwrap();
2308
2309        let instrument_id = InstrumentId::from("ESM4.GLBX");
2310        let trade = decode_trade_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2311
2312        assert_eq!(trade.instrument_id, instrument_id);
2313        assert_eq!(trade.price, Price::from("3720.25"));
2314        assert_eq!(trade.size, Quantity::from("5"));
2315        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2316        assert_eq!(trade.trade_id.to_string(), "1170380");
2317        assert_eq!(trade.ts_event, msg.ts_recv);
2318        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2319        assert_eq!(trade.ts_init, 0);
2320    }
2321
2322    #[rstest]
2323    fn test_decode_tbbo_msg() {
2324        let path = test_data_path().join("test_data.tbbo.dbn.zst");
2325        let mut dbn_stream = Decoder::from_zstd_file(path)
2326            .unwrap()
2327            .decode_stream::<dbn::Mbp1Msg>();
2328        let msg = dbn_stream.next().unwrap().unwrap();
2329
2330        let instrument_id = InstrumentId::from("ESM4.GLBX");
2331        let (maybe_quote, trade) = decode_tbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2332        let quote = maybe_quote.expect("Expected valid quote");
2333
2334        assert_eq!(quote.instrument_id, instrument_id);
2335        assert_eq!(quote.bid_price, Price::from("3720.25"));
2336        assert_eq!(quote.ask_price, Price::from("3720.50"));
2337        assert_eq!(quote.bid_size, Quantity::from("26"));
2338        assert_eq!(quote.ask_size, Quantity::from("7"));
2339        assert_eq!(quote.ts_event, msg.ts_recv);
2340        assert_eq!(quote.ts_event, 1_609_160_400_099_150_057);
2341        assert_eq!(quote.ts_init, 0);
2342
2343        assert_eq!(trade.instrument_id, instrument_id);
2344        assert_eq!(trade.price, Price::from("3720.25"));
2345        assert_eq!(trade.size, Quantity::from("5"));
2346        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
2347        assert_eq!(trade.trade_id.to_string(), "1170380");
2348        assert_eq!(trade.ts_event, msg.ts_recv);
2349        assert_eq!(trade.ts_event, 1_609_160_400_099_150_057);
2350        assert_eq!(trade.ts_init, 0);
2351    }
2352
2353    #[rstest]
2354    fn test_decode_ohlcv_msg() {
2355        let path = test_data_path().join("test_data.ohlcv-1s.dbn.zst");
2356        let mut dbn_stream = Decoder::from_zstd_file(path)
2357            .unwrap()
2358            .decode_stream::<dbn::OhlcvMsg>();
2359        let msg = dbn_stream.next().unwrap().unwrap();
2360
2361        let instrument_id = InstrumentId::from("ESM4.GLBX");
2362        let bar = decode_ohlcv_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2363
2364        assert_eq!(
2365            bar.bar_type,
2366            BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL")
2367        );
2368        assert_eq!(bar.open, Price::from("372025.00"));
2369        assert_eq!(bar.high, Price::from("372050.00"));
2370        assert_eq!(bar.low, Price::from("372025.00"));
2371        assert_eq!(bar.close, Price::from("372050.00"));
2372        assert_eq!(bar.volume, Quantity::from("57"));
2373        assert_eq!(bar.ts_event, msg.hd.ts_event + BAR_CLOSE_ADJUSTMENT_1S); // timestamp_on_close=true
2374        assert_eq!(bar.ts_init, 0); // ts_init was Some(0)
2375    }
2376
2377    #[rstest]
2378    fn test_decode_definition_msg() {
2379        let path = test_data_path().join("test_data.definition.dbn.zst");
2380        let mut dbn_stream = Decoder::from_zstd_file(path)
2381            .unwrap()
2382            .decode_stream::<dbn::InstrumentDefMsg>();
2383        let msg = dbn_stream.next().unwrap().unwrap();
2384
2385        let instrument_id = InstrumentId::from("ESM4.GLBX");
2386        let result = decode_instrument_def_msg(msg, instrument_id, Some(0.into()));
2387
2388        assert!(result.is_ok());
2389        assert_eq!(result.unwrap().multiplier(), Quantity::from(1));
2390    }
2391
2392    #[rstest]
2393    fn test_decode_status_msg() {
2394        let path = test_data_path().join("test_data.status.dbn.zst");
2395        let mut dbn_stream = Decoder::from_zstd_file(path)
2396            .unwrap()
2397            .decode_stream::<dbn::StatusMsg>();
2398        let msg = dbn_stream.next().unwrap().unwrap();
2399
2400        let instrument_id = InstrumentId::from("ESM4.GLBX");
2401        let status = decode_status_msg(msg, instrument_id, Some(0.into())).unwrap();
2402
2403        assert_eq!(status.instrument_id, instrument_id);
2404        assert_eq!(status.action, MarketStatusAction::Trading);
2405        assert_eq!(status.ts_event, msg.hd.ts_event);
2406        assert_eq!(status.ts_init, 0);
2407        assert_eq!(status.reason, Some(Ustr::from("Scheduled")));
2408        assert_eq!(status.trading_event, None);
2409        assert_eq!(status.is_trading, Some(true));
2410        assert_eq!(status.is_quoting, Some(true));
2411        assert_eq!(status.is_short_sell_restricted, None);
2412    }
2413
2414    #[rstest]
2415    fn test_decode_imbalance_msg() {
2416        let path = test_data_path().join("test_data.imbalance.dbn.zst");
2417        let mut dbn_stream = Decoder::from_zstd_file(path)
2418            .unwrap()
2419            .decode_stream::<dbn::ImbalanceMsg>();
2420        let msg = dbn_stream.next().unwrap().unwrap();
2421
2422        let instrument_id = InstrumentId::from("ESM4.GLBX");
2423        let imbalance = decode_imbalance_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2424
2425        assert_eq!(imbalance.instrument_id, instrument_id);
2426        assert_eq!(imbalance.ref_price, Price::from("229.43"));
2427        assert_eq!(imbalance.cont_book_clr_price, Price::from("0.00"));
2428        assert_eq!(imbalance.auct_interest_clr_price, Price::from("0.00"));
2429        assert_eq!(imbalance.paired_qty, Quantity::from("0"));
2430        assert_eq!(imbalance.total_imbalance_qty, Quantity::from("2000"));
2431        assert_eq!(imbalance.side, OrderSide::Buy);
2432        assert_eq!(imbalance.significant_imbalance, 126);
2433        assert_eq!(imbalance.ts_event, msg.hd.ts_event);
2434        assert_eq!(imbalance.ts_recv, msg.ts_recv);
2435        assert_eq!(imbalance.ts_init, 0);
2436    }
2437
2438    #[rstest]
2439    fn test_decode_statistics_msg() {
2440        let path = test_data_path().join("test_data.statistics.dbn.zst");
2441        let mut dbn_stream = Decoder::from_zstd_file(path)
2442            .unwrap()
2443            .decode_stream::<dbn::StatMsg>();
2444        let msg = dbn_stream.next().unwrap().unwrap();
2445
2446        let instrument_id = InstrumentId::from("ESM4.GLBX");
2447        let statistics = decode_statistics_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2448
2449        assert_eq!(statistics.instrument_id, instrument_id);
2450        assert_eq!(statistics.stat_type, DatabentoStatisticType::LowestOffer);
2451        assert_eq!(
2452            statistics.update_action,
2453            DatabentoStatisticUpdateAction::Added
2454        );
2455        assert_eq!(statistics.price, Some(Price::from("100.00")));
2456        assert_eq!(statistics.quantity, None);
2457        assert_eq!(statistics.channel_id, 13);
2458        assert_eq!(statistics.stat_flags, 255);
2459        assert_eq!(statistics.sequence, 2);
2460        assert_eq!(statistics.ts_ref, 18_446_744_073_709_551_615);
2461        assert_eq!(statistics.ts_in_delta, 26961);
2462        assert_eq!(statistics.ts_event, msg.hd.ts_event);
2463        assert_eq!(statistics.ts_recv, msg.ts_recv);
2464        assert_eq!(statistics.ts_init, 0);
2465    }
2466
2467    #[rstest]
2468    fn test_decode_cmbp1_msg() {
2469        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
2470        let mut dbn_stream = Decoder::from_zstd_file(path)
2471            .unwrap()
2472            .decode_stream::<dbn::Cmbp1Msg>();
2473        let msg = dbn_stream.next().unwrap().unwrap();
2474
2475        let instrument_id = InstrumentId::from("ESM4.GLBX");
2476        let (maybe_quote, trade) =
2477            decode_cmbp1_msg(msg, instrument_id, 2, Some(0.into()), true).unwrap();
2478        let quote = maybe_quote.expect("Expected valid quote");
2479
2480        assert_eq!(quote.instrument_id, instrument_id);
2481        assert!(quote.bid_price.raw > 0);
2482        assert!(quote.ask_price.raw > 0);
2483        assert!(quote.bid_size.raw > 0);
2484        assert!(quote.ask_size.raw > 0);
2485        assert_eq!(quote.ts_event, msg.ts_recv);
2486        assert_eq!(quote.ts_init, 0);
2487
2488        // Check if trade is present based on action
2489        if is_trade_msg(msg.action) {
2490            assert!(trade.is_some());
2491            let trade = trade.unwrap();
2492            assert_eq!(trade.instrument_id, instrument_id);
2493        } else {
2494            assert!(trade.is_none());
2495        }
2496    }
2497
2498    #[rstest]
2499    fn test_decode_cbbo_1s_msg() {
2500        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2501        let mut dbn_stream = Decoder::from_zstd_file(path)
2502            .unwrap()
2503            .decode_stream::<dbn::CbboMsg>();
2504        let msg = dbn_stream.next().unwrap().unwrap();
2505
2506        let instrument_id = InstrumentId::from("ESM4.GLBX");
2507        let maybe_quote = decode_cbbo_msg(msg, instrument_id, 2, Some(0.into())).unwrap();
2508        let quote = maybe_quote.expect("Expected valid quote");
2509
2510        assert_eq!(quote.instrument_id, instrument_id);
2511        assert!(quote.bid_price.raw > 0);
2512        assert!(quote.ask_price.raw > 0);
2513        assert!(quote.bid_size.raw > 0);
2514        assert!(quote.ask_size.raw > 0);
2515        assert_eq!(quote.ts_event, msg.ts_recv);
2516        assert_eq!(quote.ts_init, 0);
2517    }
2518
2519    #[rstest]
2520    fn test_decode_mbp10_msg_with_all_levels() {
2521        let mut msg = dbn::Mbp10Msg::default();
2522        for i in 0..10 {
2523            msg.levels[i].bid_px = 100_000_000_000 - i as i64 * 10_000_000;
2524            msg.levels[i].ask_px = 100_010_000_000 + i as i64 * 10_000_000;
2525            msg.levels[i].bid_sz = 10 + i as u32;
2526            msg.levels[i].ask_sz = 10 + i as u32;
2527            msg.levels[i].bid_ct = 1 + i as u32;
2528            msg.levels[i].ask_ct = 1 + i as u32;
2529        }
2530        msg.ts_recv = 1_609_160_400_000_704_060;
2531
2532        let instrument_id = InstrumentId::from("TEST.VENUE");
2533        let result = decode_mbp10_msg(&msg, instrument_id, 2, None);
2534
2535        assert!(result.is_ok());
2536        let depth = result.unwrap();
2537        assert_eq!(depth.bids.len(), 10);
2538        assert_eq!(depth.asks.len(), 10);
2539        assert_eq!(depth.bid_counts.len(), 10);
2540        assert_eq!(depth.ask_counts.len(), 10);
2541    }
2542
2543    #[rstest]
2544    fn test_array_conversion_error_handling() {
2545        let mut bids = Vec::new();
2546        let mut asks = Vec::new();
2547
2548        // Intentionally create fewer than DEPTH10_LEN elements
2549        for i in 0..5 {
2550            bids.push(BookOrder::new(
2551                OrderSide::Buy,
2552                Price::from(format!("{}.00", 100 - i)),
2553                Quantity::from(10),
2554                i as u64,
2555            ));
2556            asks.push(BookOrder::new(
2557                OrderSide::Sell,
2558                Price::from(format!("{}.00", 101 + i)),
2559                Quantity::from(10),
2560                i as u64,
2561            ));
2562        }
2563
2564        let result: Result<[BookOrder; DEPTH10_LEN], _> =
2565            bids.try_into().map_err(|v: Vec<BookOrder>| {
2566                anyhow::anyhow!(
2567                    "Expected exactly {DEPTH10_LEN} bid levels, received {}",
2568                    v.len()
2569                )
2570            });
2571        assert!(result.is_err());
2572        assert!(
2573            result
2574                .unwrap_err()
2575                .to_string()
2576                .contains("Expected exactly 10 bid levels, received 5")
2577        );
2578    }
2579
2580    #[rstest]
2581    fn test_decode_tcbbo_msg() {
2582        // Use cbbo-1s as base since cbbo.dbn.zst was invalid
2583        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
2584        let mut dbn_stream = Decoder::from_zstd_file(path)
2585            .unwrap()
2586            .decode_stream::<dbn::CbboMsg>();
2587        let msg = dbn_stream.next().unwrap().unwrap();
2588
2589        // Simulate TCBBO by adding trade data
2590        let mut tcbbo_msg = msg.clone();
2591        tcbbo_msg.price = 3702500000000;
2592        tcbbo_msg.size = 10;
2593
2594        let instrument_id = InstrumentId::from("ESM4.GLBX");
2595        let (maybe_quote, trade) =
2596            decode_tcbbo_msg(&tcbbo_msg, instrument_id, 2, Some(0.into())).unwrap();
2597        let quote = maybe_quote.expect("Expected valid quote");
2598
2599        assert_eq!(quote.instrument_id, instrument_id);
2600        assert!(quote.bid_price.raw > 0);
2601        assert!(quote.ask_price.raw > 0);
2602        assert!(quote.bid_size.raw > 0);
2603        assert!(quote.ask_size.raw > 0);
2604        assert_eq!(quote.ts_event, tcbbo_msg.ts_recv);
2605        assert_eq!(quote.ts_init, 0);
2606
2607        assert_eq!(trade.instrument_id, instrument_id);
2608        assert_eq!(trade.price, Price::from("3702.50"));
2609        assert_eq!(trade.size, Quantity::from(10));
2610        assert_eq!(trade.ts_event, tcbbo_msg.ts_recv);
2611        assert_eq!(trade.ts_init, 0);
2612    }
2613
2614    #[rstest]
2615    fn test_decode_bar_type() {
2616        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2617        let instrument_id = InstrumentId::from("ESM4.GLBX");
2618
2619        // Test 1-second bar
2620        msg.hd.rtype = 32;
2621        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2622        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-SECOND-LAST-EXTERNAL"));
2623
2624        // Test 1-minute bar
2625        msg.hd.rtype = 33;
2626        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2627        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-MINUTE-LAST-EXTERNAL"));
2628
2629        // Test 1-hour bar
2630        msg.hd.rtype = 34;
2631        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2632        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-HOUR-LAST-EXTERNAL"));
2633
2634        // Test 1-day bar
2635        msg.hd.rtype = 35;
2636        let bar_type = decode_bar_type(&msg, instrument_id).unwrap();
2637        assert_eq!(bar_type, BarType::from("ESM4.GLBX-1-DAY-LAST-EXTERNAL"));
2638
2639        // Test unsupported rtype
2640        msg.hd.rtype = 99;
2641        let result = decode_bar_type(&msg, instrument_id);
2642        assert!(result.is_err());
2643    }
2644
2645    #[rstest]
2646    fn test_decode_ts_event_adjustment() {
2647        let mut msg = dbn::OhlcvMsg::default_for_schema(dbn::Schema::Ohlcv1S);
2648
2649        // Test 1-second bar adjustment
2650        msg.hd.rtype = 32;
2651        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2652        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1S);
2653
2654        // Test 1-minute bar adjustment
2655        msg.hd.rtype = 33;
2656        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2657        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1M);
2658
2659        // Test 1-hour bar adjustment
2660        msg.hd.rtype = 34;
2661        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2662        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1H);
2663
2664        // Test 1-day bar adjustment
2665        msg.hd.rtype = 35;
2666        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2667        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2668
2669        // Test eod bar adjustment (same as 1d)
2670        msg.hd.rtype = 36;
2671        let adjustment = decode_ts_event_adjustment(&msg).unwrap();
2672        assert_eq!(adjustment, BAR_CLOSE_ADJUSTMENT_1D);
2673
2674        // Test unsupported rtype
2675        msg.hd.rtype = 99;
2676        let result = decode_ts_event_adjustment(&msg);
2677        assert!(result.is_err());
2678    }
2679
2680    #[rstest]
2681    fn test_decode_record() {
2682        // Test with MBO message
2683        let path = test_data_path().join("test_data.mbo.dbn.zst");
2684        let decoder = Decoder::from_zstd_file(path).unwrap();
2685        let mut dbn_stream = decoder.decode_stream::<dbn::MboMsg>();
2686        let msg = dbn_stream.next().unwrap().unwrap();
2687
2688        let record_ref = dbn::RecordRef::from(msg);
2689        let instrument_id = InstrumentId::from("ESM4.GLBX");
2690
2691        let (data1, data2) =
2692            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2693
2694        assert!(data1.is_some());
2695        assert!(data2.is_none());
2696
2697        // Test with Trade message
2698        let path = test_data_path().join("test_data.trades.dbn.zst");
2699        let decoder = Decoder::from_zstd_file(path).unwrap();
2700        let mut dbn_stream = decoder.decode_stream::<dbn::TradeMsg>();
2701        let msg = dbn_stream.next().unwrap().unwrap();
2702
2703        let record_ref = dbn::RecordRef::from(msg);
2704
2705        let (data1, data2) =
2706            decode_record(&record_ref, instrument_id, 2, Some(0.into()), true, false).unwrap();
2707
2708        assert!(data1.is_some());
2709        assert!(data2.is_none());
2710        assert!(matches!(data1.unwrap(), Data::Trade(_)));
2711    }
2712}