Skip to main content

nautilus_databento/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    env, fs,
18    path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25    Publisher,
26    decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31    data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32    identifiers::{InstrumentId, Symbol, Venue},
33    instruments::InstrumentAny,
34    types::Currency,
35};
36
37use super::{
38    decode::{decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg},
39    symbology::decode_nautilus_instrument_id,
40    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
41};
42use crate::{decode::decode_instrument_def_msg, symbology::MetadataCache};
43
44/// Applies default venue-to-dataset mappings for consolidated Databento feeds.
45/// GLBX.MDP3 covers CME Globex exchange MICs; OPRA.PILLAR covers OPRA option venues.
46fn apply_default_venue_dataset_mappings(venue_dataset_map: &mut IndexMap<Venue, Dataset>) {
47    let glbx = Dataset::from("GLBX.MDP3");
48
49    for venue in [
50        Venue::CBCM(),
51        Venue::GLBX(),
52        Venue::NYUM(),
53        Venue::XCBT(),
54        Venue::XCEC(),
55        Venue::XCME(),
56        Venue::XFXS(),
57        Venue::XNYM(),
58    ] {
59        _ = venue_dataset_map.insert(venue, glbx);
60    }
61
62    let opra = Dataset::from("OPRA.PILLAR");
63    for venue_code in [
64        "AMXO", "XBOX", "XCBO", "EMLD", "EDGO", "GMNI", "XISX", "MCRY", "XMIO", "ARCO", "OPRA",
65        "MPRL", "XNDQ", "XBXO", "C2OX", "XPHL", "BATO", "MXOP", "SPHR",
66    ] {
67        _ = venue_dataset_map.insert(Venue::from(venue_code), opra);
68    }
69}
70
71/// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
72///
73/// # Supported Schemas
74///  - `MBO` -> `OrderBookDelta`
75///  - `MBP_1` -> `(QuoteTick, Option<TradeTick>)`
76///  - `MBP_10` -> `OrderBookDepth10`
77///  - `BBO_1S` -> `QuoteTick`
78///  - `BBO_1M` -> `QuoteTick`
79///  - `CMBP_1` -> `(QuoteTick, Option<TradeTick>)`
80///  - `CBBO_1S` -> `QuoteTick`
81///  - `CBBO_1M` -> `QuoteTick`
82///  - `TCBBO` -> `(QuoteTick, TradeTick)`
83///  - `TBBO` -> `(QuoteTick, TradeTick)`
84///  - `TRADES` -> `TradeTick`
85///  - `OHLCV_1S` -> `Bar`
86///  - `OHLCV_1M` -> `Bar`
87///  - `OHLCV_1H` -> `Bar`
88///  - `OHLCV_1D` -> `Bar`
89///  - `OHLCV_EOD` -> `Bar`
90///  - `DEFINITION` -> `Instrument`
91///  - `IMBALANCE` -> `DatabentoImbalance`
92///  - `STATISTICS` -> `DatabentoStatistics`
93///  - `STATUS` -> `InstrumentStatus`
94///
95/// # References
96///
97/// <https://databento.com/docs/schemas-and-data-formats>
98#[cfg_attr(
99    feature = "python",
100    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
101)]
102#[cfg_attr(
103    feature = "python",
104    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
105)]
106#[derive(Debug)]
107pub struct DatabentoDataLoader {
108    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
109    venue_dataset_map: IndexMap<Venue, Dataset>,
110    publisher_venue_map: IndexMap<PublisherId, Venue>,
111    symbol_venue_map: AHashMap<Symbol, Venue>,
112}
113
114impl DatabentoDataLoader {
115    /// Creates a new [`DatabentoDataLoader`] instance.
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if locating or loading publishers data fails.
120    pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
121        let mut loader = Self {
122            publishers_map: IndexMap::new(),
123            venue_dataset_map: IndexMap::new(),
124            publisher_venue_map: IndexMap::new(),
125            symbol_venue_map: AHashMap::new(),
126        };
127
128        // Load publishers
129        let publishers_filepath = if let Some(p) = publishers_filepath {
130            p
131        } else {
132            // Use built-in publishers path
133            let mut exe_path = env::current_exe()?;
134            exe_path.pop();
135            exe_path.push("publishers.json");
136            exe_path
137        };
138
139        loader
140            .load_publishers(publishers_filepath)
141            .context("error loading publishers.json")?;
142
143        Ok(loader)
144    }
145
146    /// Load the publishers data from the file at the given `filepath`.
147    ///
148    /// # Errors
149    ///
150    /// Returns an error if the file cannot be read or parsed as JSON.
151    pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
152        let file_content = fs::read_to_string(filepath)?;
153        let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
154
155        self.publishers_map = publishers
156            .clone()
157            .into_iter()
158            .map(|p| (p.publisher_id, p))
159            .collect();
160
161        let mut venue_dataset_map = IndexMap::new();
162
163        // Only insert a dataset if the venue key is not already in the map
164        for publisher in &publishers {
165            let venue = Venue::from(publisher.venue.as_str());
166            let dataset = Dataset::from(publisher.dataset.as_str());
167            venue_dataset_map.entry(venue).or_insert(dataset);
168        }
169
170        self.venue_dataset_map = venue_dataset_map;
171        apply_default_venue_dataset_mappings(&mut self.venue_dataset_map);
172
173        self.publisher_venue_map = publishers
174            .into_iter()
175            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
176            .collect();
177
178        Ok(())
179    }
180
181    /// Returns the internal Databento publishers currently held by the loader.
182    #[must_use]
183    pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
184        &self.publishers_map
185    }
186
187    /// Sets the `venue` to map to the given `dataset`.
188    pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
189        _ = self.venue_dataset_map.insert(venue, dataset);
190    }
191
192    /// Returns the dataset which matches the given `venue` (if found).
193    #[must_use]
194    pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
195        self.venue_dataset_map.get(venue)
196    }
197
198    /// Returns the venue which matches the given `publisher_id` (if found).
199    #[must_use]
200    pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
201        self.publisher_venue_map.get(&publisher_id)
202    }
203
204    /// Returns the schema for the given `filepath`.
205    ///
206    /// # Errors
207    ///
208    /// Returns an error if the file cannot be decoded or metadata retrieval fails.
209    pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
210        let decoder = Decoder::from_zstd_file(filepath)?;
211        let metadata = decoder.metadata();
212        Ok(metadata.schema.map(|schema| schema.to_string()))
213    }
214
215    /// Reads instrument definition records from a DBN file.
216    ///
217    /// # Errors
218    ///
219    /// Returns an error if decoding the definition records fails.
220    pub fn read_definition_records(
221        &mut self,
222        filepath: &Path,
223        use_exchange_as_venue: bool,
224    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
225        let decoder = Decoder::from_zstd_file(filepath)?;
226        let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
227
228        Ok(std::iter::from_fn(move || {
229            let result: anyhow::Result<Option<InstrumentAny>> = (|| {
230                dbn_stream
231                    .advance()
232                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
233
234                if let Some(rec) = dbn_stream.get() {
235                    let record = dbn::RecordRef::from(rec);
236                    let msg = record
237                        .get::<InstrumentDefMsg>()
238                        .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
239
240                    // Symbol and venue resolution
241                    let raw_symbol = rec
242                        .raw_symbol()
243                        .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
244                    let symbol = Symbol::from(raw_symbol);
245
246                    let publisher = rec
247                        .hd
248                        .publisher()
249                        .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
250                    let venue = match publisher {
251                        Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
252                            let exchange = rec.exchange().map_err(|e| {
253                                anyhow::anyhow!("Missing `exchange` for record: {e}")
254                            })?;
255                            let venue = Venue::from_code(exchange).map_err(|e| {
256                                anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
257                            })?;
258                            self.symbol_venue_map.insert(symbol, venue);
259                            venue
260                        }
261                        _ => *self
262                            .publisher_venue_map
263                            .get(&msg.hd.publisher_id)
264                            .ok_or_else(|| {
265                                anyhow::anyhow!(
266                                    "Venue not found for publisher_id {}",
267                                    msg.hd.publisher_id
268                                )
269                            })?,
270                    };
271                    let instrument_id = InstrumentId::new(symbol, venue);
272                    let ts_init = msg.ts_recv.into();
273
274                    let data = decode_instrument_def_msg(rec, instrument_id, Some(ts_init))?;
275                    Ok(Some(data))
276                } else {
277                    // No more records
278                    Ok(None)
279                }
280            })();
281
282            match result {
283                Ok(Some(item)) => Some(Ok(item)),
284                Ok(None) => None,
285                Err(e) => Some(Err(e)),
286            }
287        }))
288    }
289
290    /// Reads and decodes market data records from a DBN file.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if reading records fails.
295    pub fn read_records<T>(
296        &self,
297        filepath: &Path,
298        instrument_id: Option<InstrumentId>,
299        price_precision: Option<u8>,
300        include_trades: bool,
301        bars_timestamp_on_close: Option<bool>,
302    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
303    where
304        T: dbn::Record + dbn::HasRType + 'static,
305    {
306        let decoder = Decoder::from_zstd_file(filepath)?;
307        let metadata = decoder.metadata().clone();
308        let mut metadata_cache = MetadataCache::new(metadata);
309        let mut dbn_stream = decoder.decode_stream::<T>();
310
311        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
312
313        Ok(std::iter::from_fn(move || {
314            let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
315                dbn_stream
316                    .advance()
317                    .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
318
319                if let Some(rec) = dbn_stream.get() {
320                    let record = dbn::RecordRef::from(rec);
321                    let instrument_id = if let Some(id) = &instrument_id {
322                        *id
323                    } else {
324                        decode_nautilus_instrument_id(
325                            &record,
326                            &mut metadata_cache,
327                            &self.publisher_venue_map,
328                            &self.symbol_venue_map,
329                        )
330                        .context("failed to decode instrument id")?
331                    };
332                    let (item1, item2) = decode_record(
333                        &record,
334                        instrument_id,
335                        price_precision,
336                        None,
337                        include_trades,
338                        bars_timestamp_on_close.unwrap_or(true),
339                    )?;
340                    Ok(Some((item1, item2)))
341                } else {
342                    Ok(None)
343                }
344            })();
345
346            match result {
347                Ok(Some(v)) => Some(Ok(v)),
348                Ok(None) => None,
349                Err(e) => Some(Err(e)),
350            }
351        }))
352    }
353
354    /// Loads all instrument definitions from a DBN file.
355    ///
356    /// When `skip_on_error` is true, instruments that fail to decode are logged
357    /// as warnings and skipped. When false (default), any decode error is propagated.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if loading instruments fails.
362    pub fn load_instruments(
363        &mut self,
364        filepath: &Path,
365        use_exchange_as_venue: bool,
366        skip_on_error: bool,
367    ) -> anyhow::Result<Vec<InstrumentAny>> {
368        if skip_on_error {
369            let mut instruments = Vec::new();
370
371            for result in self.read_definition_records(filepath, use_exchange_as_venue)? {
372                match result {
373                    Ok(instrument) => instruments.push(instrument),
374                    Err(e) => log::warn!("Skipping instrument: {e}"),
375                }
376            }
377            Ok(instruments)
378        } else {
379            self.read_definition_records(filepath, use_exchange_as_venue)?
380                .collect::<Result<Vec<_>, _>>()
381        }
382    }
383
384    /// Loads order book delta messages from a DBN MBO schema file.
385    ///
386    /// Cannot include trades.
387    ///
388    /// # Errors
389    ///
390    /// Returns an error if loading order book deltas fails.
391    pub fn load_order_book_deltas(
392        &self,
393        filepath: &Path,
394        instrument_id: Option<InstrumentId>,
395        price_precision: Option<u8>,
396    ) -> anyhow::Result<Vec<OrderBookDelta>> {
397        self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?
398            .filter_map(|result| match result {
399                Ok((Some(item1), _)) => {
400                    if let Data::Delta(delta) = item1 {
401                        Some(Ok(delta))
402                    } else {
403                        None
404                    }
405                }
406                Ok((None, _)) => None,
407                Err(e) => Some(Err(e)),
408            })
409            .collect()
410    }
411
412    /// Loads order book depth10 snapshots from a DBN MBP-10 schema file.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if loading order book depth10 fails.
417    pub fn load_order_book_depth10(
418        &self,
419        filepath: &Path,
420        instrument_id: Option<InstrumentId>,
421        price_precision: Option<u8>,
422    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
423        self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
424            .filter_map(|result| match result {
425                Ok((Some(item1), _)) => {
426                    if let Data::Depth10(depth) = item1 {
427                        Some(Ok(*depth))
428                    } else {
429                        None
430                    }
431                }
432                Ok((None, _)) => None,
433                Err(e) => Some(Err(e)),
434            })
435            .collect()
436    }
437
438    /// Loads quote tick messages from a DBN MBP-1 or TBBO schema file.
439    ///
440    /// # Errors
441    ///
442    /// Returns an error if loading quotes fails.
443    pub fn load_quotes(
444        &self,
445        filepath: &Path,
446        instrument_id: Option<InstrumentId>,
447        price_precision: Option<u8>,
448    ) -> anyhow::Result<Vec<QuoteTick>> {
449        self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
450            .filter_map(|result| match result {
451                Ok((Some(item1), _)) => {
452                    if let Data::Quote(quote) = item1 {
453                        Some(Ok(quote))
454                    } else {
455                        None
456                    }
457                }
458                Ok((None, _)) => None,
459                Err(e) => Some(Err(e)),
460            })
461            .collect()
462    }
463
464    /// Loads best bid/offer quote messages from a DBN BBO schema file.
465    ///
466    /// # Errors
467    ///
468    /// Returns an error if loading BBO quotes fails.
469    pub fn load_bbo_quotes(
470        &self,
471        filepath: &Path,
472        instrument_id: Option<InstrumentId>,
473        price_precision: Option<u8>,
474    ) -> anyhow::Result<Vec<QuoteTick>> {
475        self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
476            .filter_map(|result| match result {
477                Ok((Some(item1), _)) => {
478                    if let Data::Quote(quote) = item1 {
479                        Some(Ok(quote))
480                    } else {
481                        None
482                    }
483                }
484                Ok((None, _)) => None,
485                Err(e) => Some(Err(e)),
486            })
487            .collect()
488    }
489
490    /// Loads consolidated MBP-1 quote messages from a DBN CMBP-1 schema file.
491    ///
492    /// # Errors
493    ///
494    /// Returns an error if loading consolidated MBP-1 quotes fails.
495    pub fn load_cmbp_quotes(
496        &self,
497        filepath: &Path,
498        instrument_id: Option<InstrumentId>,
499        price_precision: Option<u8>,
500    ) -> anyhow::Result<Vec<QuoteTick>> {
501        self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
502            .filter_map(|result| match result {
503                Ok((Some(item1), _)) => {
504                    if let Data::Quote(quote) = item1 {
505                        Some(Ok(quote))
506                    } else {
507                        None
508                    }
509                }
510                Ok((None, _)) => None,
511                Err(e) => Some(Err(e)),
512            })
513            .collect()
514    }
515
516    /// Loads consolidated best bid/offer quote messages from a DBN CBBO schema file.
517    ///
518    /// # Errors
519    ///
520    /// Returns an error if loading consolidated BBO quotes fails.
521    pub fn load_cbbo_quotes(
522        &self,
523        filepath: &Path,
524        instrument_id: Option<InstrumentId>,
525        price_precision: Option<u8>,
526    ) -> anyhow::Result<Vec<QuoteTick>> {
527        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
528            .filter_map(|result| match result {
529                Ok((Some(item1), _)) => {
530                    if let Data::Quote(quote) = item1 {
531                        Some(Ok(quote))
532                    } else {
533                        None
534                    }
535                }
536                Ok((None, _)) => None,
537                Err(e) => Some(Err(e)),
538            })
539            .collect()
540    }
541
542    /// Loads trade messages from a DBN TBBO schema file.
543    ///
544    /// # Errors
545    ///
546    /// Returns an error if loading TBBO trades fails.
547    pub fn load_tbbo_trades(
548        &self,
549        filepath: &Path,
550        instrument_id: Option<InstrumentId>,
551        price_precision: Option<u8>,
552    ) -> anyhow::Result<Vec<TradeTick>> {
553        self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?
554            .filter_map(|result| match result {
555                Ok((_, maybe_item2)) => {
556                    if let Some(Data::Trade(trade)) = maybe_item2 {
557                        Some(Ok(trade))
558                    } else {
559                        None
560                    }
561                }
562                Err(e) => Some(Err(e)),
563            })
564            .collect()
565    }
566
567    /// Loads trade messages from a DBN TCBBO schema file.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if loading TCBBO trades fails.
572    pub fn load_tcbbo_trades(
573        &self,
574        filepath: &Path,
575        instrument_id: Option<InstrumentId>,
576        price_precision: Option<u8>,
577    ) -> anyhow::Result<Vec<TradeTick>> {
578        self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
579            .filter_map(|result| match result {
580                Ok((_, maybe_item2)) => {
581                    if let Some(Data::Trade(trade)) = maybe_item2 {
582                        Some(Ok(trade))
583                    } else {
584                        None
585                    }
586                }
587                Err(e) => Some(Err(e)),
588            })
589            .collect()
590    }
591
592    /// Loads trade messages from a DBN TRADES schema file.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if loading trades fails.
597    pub fn load_trades(
598        &self,
599        filepath: &Path,
600        instrument_id: Option<InstrumentId>,
601        price_precision: Option<u8>,
602    ) -> anyhow::Result<Vec<TradeTick>> {
603        self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
604            .filter_map(|result| match result {
605                Ok((Some(item1), _)) => {
606                    if let Data::Trade(trade) = item1 {
607                        Some(Ok(trade))
608                    } else {
609                        None
610                    }
611                }
612                Ok((None, _)) => None,
613                Err(e) => Some(Err(e)),
614            })
615            .collect()
616    }
617
618    /// Loads OHLCV bar messages from a DBN OHLCV schema file.
619    ///
620    /// # Errors
621    ///
622    /// Returns an error if loading bars fails.
623    pub fn load_bars(
624        &self,
625        filepath: &Path,
626        instrument_id: Option<InstrumentId>,
627        price_precision: Option<u8>,
628        timestamp_on_close: Option<bool>,
629    ) -> anyhow::Result<Vec<Bar>> {
630        self.read_records::<dbn::OhlcvMsg>(
631            filepath,
632            instrument_id,
633            price_precision,
634            false,
635            timestamp_on_close,
636        )?
637        .filter_map(|result| match result {
638            Ok((Some(item1), _)) => {
639                if let Data::Bar(bar) = item1 {
640                    Some(Ok(bar))
641                } else {
642                    None
643                }
644            }
645            Ok((None, _)) => None,
646            Err(e) => Some(Err(e)),
647        })
648        .collect()
649    }
650
651    /// Loads instrument status messages from a DBN STATUS schema file.
652    ///
653    /// # Errors
654    ///
655    /// Returns an error if loading status records fails.
656    pub fn load_status_records<T>(
657        &self,
658        filepath: &Path,
659        instrument_id: Option<InstrumentId>,
660    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
661    where
662        T: dbn::Record + dbn::HasRType + 'static,
663    {
664        let decoder = Decoder::from_zstd_file(filepath)?;
665        let metadata = decoder.metadata().clone();
666        let mut metadata_cache = MetadataCache::new(metadata);
667        let mut dbn_stream = decoder.decode_stream::<T>();
668
669        Ok(std::iter::from_fn(move || {
670            if let Err(e) = dbn_stream.advance() {
671                return Some(Err(e.into()));
672            }
673
674            match dbn_stream.get() {
675                Some(rec) => {
676                    let record = dbn::RecordRef::from(rec);
677                    let instrument_id = match &instrument_id {
678                        Some(id) => *id, // Copy
679                        None => match decode_nautilus_instrument_id(
680                            &record,
681                            &mut metadata_cache,
682                            &self.publisher_venue_map,
683                            &self.symbol_venue_map,
684                        ) {
685                            Ok(id) => id,
686                            Err(e) => return Some(Err(e)),
687                        },
688                    };
689
690                    let msg = match record.get::<dbn::StatusMsg>() {
691                        Some(m) => m,
692                        None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
693                    };
694                    let ts_init = msg.ts_recv.into();
695
696                    match decode_status_msg(msg, instrument_id, Some(ts_init)) {
697                        Ok(data) => Some(Ok(data)),
698                        Err(e) => Some(Err(e)),
699                    }
700                }
701                None => None,
702            }
703        }))
704    }
705
706    /// Reads imbalance messages from a DBN IMBALANCE schema file.
707    ///
708    /// # Errors
709    ///
710    /// Returns an error if reading imbalance records fails.
711    pub fn read_imbalance_records<T>(
712        &self,
713        filepath: &Path,
714        instrument_id: Option<InstrumentId>,
715        price_precision: Option<u8>,
716    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
717    where
718        T: dbn::Record + dbn::HasRType + 'static,
719    {
720        let decoder = Decoder::from_zstd_file(filepath)?;
721        let metadata = decoder.metadata().clone();
722        let mut metadata_cache = MetadataCache::new(metadata);
723        let mut dbn_stream = decoder.decode_stream::<T>();
724
725        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
726
727        Ok(std::iter::from_fn(move || {
728            if let Err(e) = dbn_stream.advance() {
729                return Some(Err(e.into()));
730            }
731
732            match dbn_stream.get() {
733                Some(rec) => {
734                    let record = dbn::RecordRef::from(rec);
735                    let instrument_id = match &instrument_id {
736                        Some(id) => *id, // Copy
737                        None => match decode_nautilus_instrument_id(
738                            &record,
739                            &mut metadata_cache,
740                            &self.publisher_venue_map,
741                            &self.symbol_venue_map,
742                        ) {
743                            Ok(id) => id,
744                            Err(e) => return Some(Err(e)),
745                        },
746                    };
747
748                    let msg = match record.get::<dbn::ImbalanceMsg>() {
749                        Some(m) => m,
750                        None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
751                    };
752                    let ts_init = msg.ts_recv.into();
753
754                    match decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init)) {
755                        Ok(data) => Some(Ok(data)),
756                        Err(e) => Some(Err(e)),
757                    }
758                }
759                None => None,
760            }
761        }))
762    }
763
764    /// Reads statistics messages from a DBN STATISTICS schema file.
765    ///
766    /// # Errors
767    ///
768    /// Returns an error if reading statistics records fails.
769    pub fn read_statistics_records<T>(
770        &self,
771        filepath: &Path,
772        instrument_id: Option<InstrumentId>,
773        price_precision: Option<u8>,
774    ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
775    where
776        T: dbn::Record + dbn::HasRType + 'static,
777    {
778        let decoder = Decoder::from_zstd_file(filepath)?;
779        let metadata = decoder.metadata().clone();
780        let mut metadata_cache = MetadataCache::new(metadata);
781        let mut dbn_stream = decoder.decode_stream::<T>();
782
783        let price_precision = price_precision.unwrap_or(Currency::USD().precision);
784
785        Ok(std::iter::from_fn(move || {
786            if let Err(e) = dbn_stream.advance() {
787                return Some(Err(e.into()));
788            }
789
790            match dbn_stream.get() {
791                Some(rec) => {
792                    let record = dbn::RecordRef::from(rec);
793                    let instrument_id = match &instrument_id {
794                        Some(id) => *id, // Copy
795                        None => match decode_nautilus_instrument_id(
796                            &record,
797                            &mut metadata_cache,
798                            &self.publisher_venue_map,
799                            &self.symbol_venue_map,
800                        ) {
801                            Ok(id) => id,
802                            Err(e) => return Some(Err(e)),
803                        },
804                    };
805                    let msg = match record.get::<dbn::StatMsg>() {
806                        Some(m) => m,
807                        None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
808                    };
809                    let ts_init = msg.ts_recv.into();
810
811                    match decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
812                    {
813                        Ok(data) => Some(Ok(data)),
814                        Err(e) => Some(Err(e)),
815                    }
816                }
817                None => None,
818            }
819        }))
820    }
821}
822
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use nautilus_model::types::{Price, Quantity};
    use rstest::{fixture, rstest};
    use ustr::Ustr;

    use super::*;

    // Nanoseconds in one second: the open-to-close gap asserted for 1-second bars.
    const ONE_SECOND_NS: u64 = 1_000_000_000;

    /// Returns the `test_data` directory located under the crate manifest directory.
    fn test_data_path() -> PathBuf {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        manifest_dir.join("test_data")
    }

    /// Returns the instrument ID that the file-based tests hand to the loader.
    fn esm4_glbx() -> InstrumentId {
        InstrumentId::from("ESM4.GLBX")
    }

    // Loader fixture built from the `publishers.json` file in the crate manifest directory.
    #[fixture]
    fn loader() -> DatabentoDataLoader {
        let manifest_dir = Path::new(env!("CARGO_MANIFEST_DIR"));
        let publishers_filepath = manifest_dir.join("publishers.json");
        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
    }

    // TODO: Improve the below assertions that we've actually read the records we expected

    #[rstest]
    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
        let expected_dataset = Ustr::from("EQUS.PLUS");
        let xnas = Venue::from("XNAS");
        loader.set_dataset_for_venue(expected_dataset, xnas);

        // The mapping just registered must be readable back for the same venue
        let stored = loader.get_dataset_for_venue(&xnas).unwrap();
        assert_eq!(*stored, expected_dataset);
    }

    #[rstest]
    fn test_default_venue_dataset_mappings(loader: DatabentoDataLoader) {
        // Venue -> dataset code pairs expected to be present without any explicit setup
        let expectations = [
            (Venue::XCME(), "GLBX.MDP3"),
            (Venue::from("XCBO"), "OPRA.PILLAR"),
        ];

        for (venue, dataset_code) in expectations {
            let stored = loader.get_dataset_for_venue(&venue).unwrap();
            assert_eq!(*stored, Ustr::from(dataset_code));
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.definition.dbn.zst"))]
    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let loaded = loader.load_instruments(&path, false, false).unwrap();
        let loaded_count = loaded.len();

        assert_eq!(loaded_count, 2);
    }

    #[rstest]
    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.mbo.dbn.zst");

        let outcome = loader.load_order_book_deltas(&dbn_file, Some(esm4_glbx()), None);
        let deltas = outcome.unwrap();

        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.mbp-10.dbn.zst");

        let outcome = loader.load_order_book_depth10(&dbn_file, Some(esm4_glbx()), None);
        let depths = outcome.unwrap();

        assert_eq!(depths.len(), 2);
    }

    #[rstest]
    fn test_load_quotes(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.mbp-1.dbn.zst");

        let outcome = loader.load_quotes(&dbn_file, Some(esm4_glbx()), None);
        let quotes = outcome.unwrap();

        assert_eq!(quotes.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let outcome = loader.load_bbo_quotes(&path, Some(esm4_glbx()), None);
        let quotes = outcome.unwrap();

        assert_eq!(quotes.len(), 4);
    }

    #[rstest]
    fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.cmbp-1.dbn.zst");
        let expected_id = esm4_glbx();

        let outcome = loader.load_cmbp_quotes(&dbn_file, Some(expected_id), None);
        let quotes = outcome.unwrap();

        // Exactly two quotes are expected from this file
        assert_eq!(quotes.len(), 2);

        // Field-by-field check of the leading quote
        let head = &quotes[0];
        assert_eq!(head.instrument_id, expected_id);
        assert_eq!(head.bid_price, Price::from("3720.25"));
        assert_eq!(head.ask_price, Price::from("3720.50"));
        assert_eq!(head.bid_size, Quantity::from(24));
        assert_eq!(head.ask_size, Quantity::from(11));
        assert_eq!(head.ts_event, 1609160400006136329);
        assert_eq!(head.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let expected_id = esm4_glbx();

        let outcome = loader.load_cbbo_quotes(&dbn_file, Some(expected_id), None);
        let quotes = outcome.unwrap();

        // Exactly two quotes are expected from this file
        assert_eq!(quotes.len(), 2);

        // Field-by-field check of the leading quote
        let head = &quotes[0];
        assert_eq!(head.instrument_id, expected_id);
        assert_eq!(head.bid_price, Price::from("3720.25"));
        assert_eq!(head.ask_price, Price::from("3720.50"));
        assert_eq!(head.bid_size, Quantity::from(24));
        assert_eq!(head.ask_size, Quantity::from(11));
        assert_eq!(head.ts_event, 1609160400006136329);
        assert_eq!(head.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.tbbo.dbn.zst");

        let outcome = loader.load_tbbo_trades(&dbn_file, Some(esm4_glbx()), None);
        let trades = outcome.unwrap();

        // The TBBO fixture carries no valid trade data (size/price may be 0),
        // so an empty result is the expected outcome here
        assert_eq!(trades.len(), 0);
    }

    #[rstest]
    fn test_load_tcbbo_trades(loader: DatabentoDataLoader) {
        // No dedicated TCBBO fixture exists, so the CBBO file stands in for it
        // (in practice TCBBO would be CBBO messages carrying trade data)
        let dbn_file = test_data_path().join("test_data.cbbo-1s.dbn.zst");

        let outcome = loader.load_tcbbo_trades(&dbn_file, Some(esm4_glbx()), None);
        assert!(outcome.is_ok());

        let trades = outcome.unwrap();
        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    fn test_load_trades(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.trades.dbn.zst");

        let outcome = loader.load_trades(&dbn_file, Some(esm4_glbx()), None);
        let trades = outcome.unwrap();

        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    // #[case(test_data_path().join("test_data.ohlcv-1d.dbn.zst"))]  // TODO: Empty file (0 records)
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let outcome = loader.load_bars(&path, Some(esm4_glbx()), None, None);
        let bars = outcome.unwrap();

        assert_eq!(bars.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let outcome = loader.load_bars(&path, Some(esm4_glbx()), None, Some(true));
        let bars = outcome.unwrap();

        assert_eq!(bars.len(), 2);

        // With bars_timestamp_on_close=true both timestamps are expected to be the close time
        bars.iter().for_each(|bar| {
            assert_eq!(
                bar.ts_event, bar.ts_init,
                "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
            );
        });
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let outcome = loader.load_bars(&path, Some(esm4_glbx()), None, Some(false));
        let bars = outcome.unwrap();

        assert_eq!(bars.len(), 2);

        // With bars_timestamp_on_close=false, ts_event is the open time and ts_init the close time
        bars.iter().for_each(|bar| {
            assert_ne!(
                bar.ts_event, bar.ts_init,
                "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
            );
            // A 1-second bar closes exactly one second after it opens
            assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + ONE_SECOND_NS);
        });
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
    fn test_load_bars_timestamp_comparison(
        loader: DatabentoDataLoader,
        #[case] path: PathBuf,
        #[case] bar_index: usize,
    ) {
        let instrument_id = esm4_glbx();

        // Load the same file twice: close-timestamped first, then open-timestamped
        let load_with = |timestamp_on_close: bool| {
            loader
                .load_bars(&path, Some(instrument_id), None, Some(timestamp_on_close))
                .unwrap()
        };
        let bars_close = load_with(true);
        let bars_open = load_with(false);

        assert_eq!(bars_close.len(), bars_open.len());
        assert_eq!(bars_close.len(), 2);

        let (closed, opened) = (&bars_close[bar_index], &bars_open[bar_index]);

        // The OHLCV payload must not depend on the timestamping mode
        assert_eq!(closed.open, opened.open);
        assert_eq!(closed.high, opened.high);
        assert_eq!(closed.low, opened.low);
        assert_eq!(closed.close, opened.close);
        assert_eq!(closed.volume, opened.volume);

        // The close-timestamped bar must be stamped strictly later than the open-timestamped one
        assert!(
            closed.ts_event > opened.ts_event,
            "Close-timestamped bar should have later timestamp than open-timestamped bar"
        );

        // For 1s bars the gap is exactly one second (1_000_000_000 nanoseconds)
        let gap_ns = closed.ts_event.as_u64() - opened.ts_event.as_u64();
        assert_eq!(
            gap_ns, ONE_SECOND_NS,
            "Timestamp difference should be exactly 1 second for 1s bars"
        );
    }

    #[rstest]
    fn test_load_status_records(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.status.dbn.zst");
        let expected_id = esm4_glbx();

        let record_iter = loader
            .load_status_records::<dbn::StatusMsg>(&dbn_file, Some(expected_id))
            .unwrap();
        let collected: anyhow::Result<Vec<_>> = record_iter.collect();
        let statuses = collected.unwrap();

        // Total count matches the Python test expectations
        assert_eq!(statuses.len(), 4, "Should load exactly 4 status records");

        // Leading record fields match the Python test expectations
        let head = &statuses[0];
        assert_eq!(head.instrument_id, expected_id);
        assert_eq!(head.ts_event.as_u64(), 1609110000000000000);
        assert_eq!(head.ts_init.as_u64(), 1609113600000000000);
    }

    #[rstest]
    fn test_read_imbalance_records(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.imbalance.dbn.zst");
        let expected_id = esm4_glbx();

        let record_iter = loader
            .read_imbalance_records::<dbn::ImbalanceMsg>(&dbn_file, Some(expected_id), None)
            .unwrap();
        let collected: anyhow::Result<Vec<_>> = record_iter.collect();
        let imbalances = collected.unwrap();

        // Total record count
        assert_eq!(
            imbalances.len(),
            2,
            "Should load exactly 2 imbalance records"
        );

        // The leading record must carry the required fields
        let head = &imbalances[0];
        assert_eq!(head.instrument_id, expected_id);
        assert!(
            head.ref_price.as_f64() > 0.0,
            "ref_price should be positive"
        );
        assert!(head.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(head.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(head.ts_init.as_u64() > 0, "ts_init should be set");
    }

    #[rstest]
    fn test_read_statistics_records(loader: DatabentoDataLoader) {
        let dbn_file = test_data_path().join("test_data.statistics.dbn.zst");
        let expected_id = esm4_glbx();

        let record_iter = loader
            .read_statistics_records::<dbn::StatMsg>(&dbn_file, Some(expected_id), None)
            .unwrap();
        let collected: anyhow::Result<Vec<_>> = record_iter.collect();
        let statistics = collected.unwrap();

        // Total record count
        assert_eq!(
            statistics.len(),
            2,
            "Should load exactly 2 statistics records"
        );

        // The leading record must carry the required fields
        let head = &statistics[0];
        assert_eq!(head.instrument_id, expected_id);
        assert!(head.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(head.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(head.ts_init.as_u64() > 0, "ts_init should be set");
        assert!(head.sequence > 0, "sequence should be positive");
    }
}