1use std::{
17 env, fs,
18 path::{Path, PathBuf},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use databento::dbn::{self, InstrumentDefMsg};
24use dbn::{
25 Publisher,
26 decode::{DbnMetadata, DecodeStream, dbn::Decoder},
27};
28use fallible_streaming_iterator::FallibleStreamingIterator;
29use indexmap::IndexMap;
30use nautilus_model::{
31 data::{Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick},
32 identifiers::{InstrumentId, Symbol, Venue},
33 instruments::InstrumentAny,
34 types::Currency,
35};
36
37use super::{
38 decode::{decode_imbalance_msg, decode_record, decode_statistics_msg, decode_status_msg},
39 symbology::decode_nautilus_instrument_id,
40 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, Dataset, PublisherId},
41};
42use crate::{decode::decode_instrument_def_msg, symbology::MetadataCache};
43
44fn apply_default_venue_dataset_mappings(venue_dataset_map: &mut IndexMap<Venue, Dataset>) {
47 let glbx = Dataset::from("GLBX.MDP3");
48
49 for venue in [
50 Venue::CBCM(),
51 Venue::GLBX(),
52 Venue::NYUM(),
53 Venue::XCBT(),
54 Venue::XCEC(),
55 Venue::XCME(),
56 Venue::XFXS(),
57 Venue::XNYM(),
58 ] {
59 _ = venue_dataset_map.insert(venue, glbx);
60 }
61
62 let opra = Dataset::from("OPRA.PILLAR");
63 for venue_code in [
64 "AMXO", "XBOX", "XCBO", "EMLD", "EDGO", "GMNI", "XISX", "MCRY", "XMIO", "ARCO", "OPRA",
65 "MPRL", "XNDQ", "XBXO", "C2OX", "XPHL", "BATO", "MXOP", "SPHR",
66 ] {
67 _ = venue_dataset_map.insert(Venue::from(venue_code), opra);
68 }
69}
70
/// Loads zstd-compressed Databento DBN files and decodes their records into Nautilus
/// domain types (instruments, order book data, quotes, trades, bars, statuses, etc.).
///
/// The loader keeps publisher/venue/dataset lookup tables, populated from a publishers JSON
/// file, which are used to resolve instrument IDs when callers do not supply one explicitly.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
)]
#[derive(Debug)]
pub struct DatabentoDataLoader {
    /// Publisher records keyed by publisher ID, as loaded from the publishers file.
    publishers_map: IndexMap<PublisherId, DatabentoPublisher>,
    /// Dataset for each venue: file-derived entries, built-in defaults and user overrides.
    venue_dataset_map: IndexMap<Venue, Dataset>,
    /// Venue for each publisher ID, as loaded from the publishers file.
    publisher_venue_map: IndexMap<PublisherId, Venue>,
    /// Venue per raw symbol, recorded while reading definitions with `use_exchange_as_venue`
    /// for GLBX records; passed to instrument ID resolution when reading other records.
    symbol_venue_map: AHashMap<Symbol, Venue>,
}
113
114impl DatabentoDataLoader {
115 pub fn new(publishers_filepath: Option<PathBuf>) -> anyhow::Result<Self> {
121 let mut loader = Self {
122 publishers_map: IndexMap::new(),
123 venue_dataset_map: IndexMap::new(),
124 publisher_venue_map: IndexMap::new(),
125 symbol_venue_map: AHashMap::new(),
126 };
127
128 let publishers_filepath = if let Some(p) = publishers_filepath {
130 p
131 } else {
132 let mut exe_path = env::current_exe()?;
134 exe_path.pop();
135 exe_path.push("publishers.json");
136 exe_path
137 };
138
139 loader
140 .load_publishers(publishers_filepath)
141 .context("error loading publishers.json")?;
142
143 Ok(loader)
144 }
145
146 pub fn load_publishers(&mut self, filepath: PathBuf) -> anyhow::Result<()> {
152 let file_content = fs::read_to_string(filepath)?;
153 let publishers: Vec<DatabentoPublisher> = serde_json::from_str(&file_content)?;
154
155 self.publishers_map = publishers
156 .clone()
157 .into_iter()
158 .map(|p| (p.publisher_id, p))
159 .collect();
160
161 let mut venue_dataset_map = IndexMap::new();
162
163 for publisher in &publishers {
165 let venue = Venue::from(publisher.venue.as_str());
166 let dataset = Dataset::from(publisher.dataset.as_str());
167 venue_dataset_map.entry(venue).or_insert(dataset);
168 }
169
170 self.venue_dataset_map = venue_dataset_map;
171 apply_default_venue_dataset_mappings(&mut self.venue_dataset_map);
172
173 self.publisher_venue_map = publishers
174 .into_iter()
175 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
176 .collect();
177
178 Ok(())
179 }
180
181 #[must_use]
183 pub const fn get_publishers(&self) -> &IndexMap<u16, DatabentoPublisher> {
184 &self.publishers_map
185 }
186
187 pub fn set_dataset_for_venue(&mut self, dataset: Dataset, venue: Venue) {
189 _ = self.venue_dataset_map.insert(venue, dataset);
190 }
191
192 #[must_use]
194 pub fn get_dataset_for_venue(&self, venue: &Venue) -> Option<&Dataset> {
195 self.venue_dataset_map.get(venue)
196 }
197
198 #[must_use]
200 pub fn get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<&Venue> {
201 self.publisher_venue_map.get(&publisher_id)
202 }
203
204 pub fn schema_from_file(&self, filepath: &Path) -> anyhow::Result<Option<String>> {
210 let decoder = Decoder::from_zstd_file(filepath)?;
211 let metadata = decoder.metadata();
212 Ok(metadata.schema.map(|schema| schema.to_string()))
213 }
214
215 pub fn read_definition_records(
221 &mut self,
222 filepath: &Path,
223 use_exchange_as_venue: bool,
224 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentAny>> + '_> {
225 let decoder = Decoder::from_zstd_file(filepath)?;
226 let mut dbn_stream = decoder.decode_stream::<InstrumentDefMsg>();
227
228 Ok(std::iter::from_fn(move || {
229 let result: anyhow::Result<Option<InstrumentAny>> = (|| {
230 dbn_stream
231 .advance()
232 .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
233
234 if let Some(rec) = dbn_stream.get() {
235 let record = dbn::RecordRef::from(rec);
236 let msg = record
237 .get::<InstrumentDefMsg>()
238 .ok_or_else(|| anyhow::anyhow!("Failed to decode InstrumentDefMsg"))?;
239
240 let raw_symbol = rec
242 .raw_symbol()
243 .map_err(|e| anyhow::anyhow!("Error decoding `raw_symbol`: {e}"))?;
244 let symbol = Symbol::from(raw_symbol);
245
246 let publisher = rec
247 .hd
248 .publisher()
249 .map_err(|e| anyhow::anyhow!("Invalid `publisher` for record: {e}"))?;
250 let venue = match publisher {
251 Publisher::GlbxMdp3Glbx if use_exchange_as_venue => {
252 let exchange = rec.exchange().map_err(|e| {
253 anyhow::anyhow!("Missing `exchange` for record: {e}")
254 })?;
255 let venue = Venue::from_code(exchange).map_err(|e| {
256 anyhow::anyhow!("Venue not found for exchange {exchange}: {e}")
257 })?;
258 self.symbol_venue_map.insert(symbol, venue);
259 venue
260 }
261 _ => *self
262 .publisher_venue_map
263 .get(&msg.hd.publisher_id)
264 .ok_or_else(|| {
265 anyhow::anyhow!(
266 "Venue not found for publisher_id {}",
267 msg.hd.publisher_id
268 )
269 })?,
270 };
271 let instrument_id = InstrumentId::new(symbol, venue);
272 let ts_init = msg.ts_recv.into();
273
274 let data = decode_instrument_def_msg(rec, instrument_id, Some(ts_init))?;
275 Ok(Some(data))
276 } else {
277 Ok(None)
279 }
280 })();
281
282 match result {
283 Ok(Some(item)) => Some(Ok(item)),
284 Ok(None) => None,
285 Err(e) => Some(Err(e)),
286 }
287 }))
288 }
289
290 pub fn read_records<T>(
296 &self,
297 filepath: &Path,
298 instrument_id: Option<InstrumentId>,
299 price_precision: Option<u8>,
300 include_trades: bool,
301 bars_timestamp_on_close: Option<bool>,
302 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>> + '_>
303 where
304 T: dbn::Record + dbn::HasRType + 'static,
305 {
306 let decoder = Decoder::from_zstd_file(filepath)?;
307 let metadata = decoder.metadata().clone();
308 let mut metadata_cache = MetadataCache::new(metadata);
309 let mut dbn_stream = decoder.decode_stream::<T>();
310
311 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
312
313 Ok(std::iter::from_fn(move || {
314 let result: anyhow::Result<Option<(Option<Data>, Option<Data>)>> = (|| {
315 dbn_stream
316 .advance()
317 .map_err(|e| anyhow::anyhow!("Stream advance error: {e}"))?;
318
319 if let Some(rec) = dbn_stream.get() {
320 let record = dbn::RecordRef::from(rec);
321 let instrument_id = if let Some(id) = &instrument_id {
322 *id
323 } else {
324 decode_nautilus_instrument_id(
325 &record,
326 &mut metadata_cache,
327 &self.publisher_venue_map,
328 &self.symbol_venue_map,
329 )
330 .context("failed to decode instrument id")?
331 };
332 let (item1, item2) = decode_record(
333 &record,
334 instrument_id,
335 price_precision,
336 None,
337 include_trades,
338 bars_timestamp_on_close.unwrap_or(true),
339 )?;
340 Ok(Some((item1, item2)))
341 } else {
342 Ok(None)
343 }
344 })();
345
346 match result {
347 Ok(Some(v)) => Some(Ok(v)),
348 Ok(None) => None,
349 Err(e) => Some(Err(e)),
350 }
351 }))
352 }
353
354 pub fn load_instruments(
363 &mut self,
364 filepath: &Path,
365 use_exchange_as_venue: bool,
366 skip_on_error: bool,
367 ) -> anyhow::Result<Vec<InstrumentAny>> {
368 if skip_on_error {
369 let mut instruments = Vec::new();
370
371 for result in self.read_definition_records(filepath, use_exchange_as_venue)? {
372 match result {
373 Ok(instrument) => instruments.push(instrument),
374 Err(e) => log::warn!("Skipping instrument: {e}"),
375 }
376 }
377 Ok(instruments)
378 } else {
379 self.read_definition_records(filepath, use_exchange_as_venue)?
380 .collect::<Result<Vec<_>, _>>()
381 }
382 }
383
384 pub fn load_order_book_deltas(
392 &self,
393 filepath: &Path,
394 instrument_id: Option<InstrumentId>,
395 price_precision: Option<u8>,
396 ) -> anyhow::Result<Vec<OrderBookDelta>> {
397 self.read_records::<dbn::MboMsg>(filepath, instrument_id, price_precision, false, None)?
398 .filter_map(|result| match result {
399 Ok((Some(item1), _)) => {
400 if let Data::Delta(delta) = item1 {
401 Some(Ok(delta))
402 } else {
403 None
404 }
405 }
406 Ok((None, _)) => None,
407 Err(e) => Some(Err(e)),
408 })
409 .collect()
410 }
411
412 pub fn load_order_book_depth10(
418 &self,
419 filepath: &Path,
420 instrument_id: Option<InstrumentId>,
421 price_precision: Option<u8>,
422 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
423 self.read_records::<dbn::Mbp10Msg>(filepath, instrument_id, price_precision, false, None)?
424 .filter_map(|result| match result {
425 Ok((Some(item1), _)) => {
426 if let Data::Depth10(depth) = item1 {
427 Some(Ok(*depth))
428 } else {
429 None
430 }
431 }
432 Ok((None, _)) => None,
433 Err(e) => Some(Err(e)),
434 })
435 .collect()
436 }
437
438 pub fn load_quotes(
444 &self,
445 filepath: &Path,
446 instrument_id: Option<InstrumentId>,
447 price_precision: Option<u8>,
448 ) -> anyhow::Result<Vec<QuoteTick>> {
449 self.read_records::<dbn::Mbp1Msg>(filepath, instrument_id, price_precision, false, None)?
450 .filter_map(|result| match result {
451 Ok((Some(item1), _)) => {
452 if let Data::Quote(quote) = item1 {
453 Some(Ok(quote))
454 } else {
455 None
456 }
457 }
458 Ok((None, _)) => None,
459 Err(e) => Some(Err(e)),
460 })
461 .collect()
462 }
463
464 pub fn load_bbo_quotes(
470 &self,
471 filepath: &Path,
472 instrument_id: Option<InstrumentId>,
473 price_precision: Option<u8>,
474 ) -> anyhow::Result<Vec<QuoteTick>> {
475 self.read_records::<dbn::BboMsg>(filepath, instrument_id, price_precision, false, None)?
476 .filter_map(|result| match result {
477 Ok((Some(item1), _)) => {
478 if let Data::Quote(quote) = item1 {
479 Some(Ok(quote))
480 } else {
481 None
482 }
483 }
484 Ok((None, _)) => None,
485 Err(e) => Some(Err(e)),
486 })
487 .collect()
488 }
489
490 pub fn load_cmbp_quotes(
496 &self,
497 filepath: &Path,
498 instrument_id: Option<InstrumentId>,
499 price_precision: Option<u8>,
500 ) -> anyhow::Result<Vec<QuoteTick>> {
501 self.read_records::<dbn::Cmbp1Msg>(filepath, instrument_id, price_precision, false, None)?
502 .filter_map(|result| match result {
503 Ok((Some(item1), _)) => {
504 if let Data::Quote(quote) = item1 {
505 Some(Ok(quote))
506 } else {
507 None
508 }
509 }
510 Ok((None, _)) => None,
511 Err(e) => Some(Err(e)),
512 })
513 .collect()
514 }
515
516 pub fn load_cbbo_quotes(
522 &self,
523 filepath: &Path,
524 instrument_id: Option<InstrumentId>,
525 price_precision: Option<u8>,
526 ) -> anyhow::Result<Vec<QuoteTick>> {
527 self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
528 .filter_map(|result| match result {
529 Ok((Some(item1), _)) => {
530 if let Data::Quote(quote) = item1 {
531 Some(Ok(quote))
532 } else {
533 None
534 }
535 }
536 Ok((None, _)) => None,
537 Err(e) => Some(Err(e)),
538 })
539 .collect()
540 }
541
542 pub fn load_tbbo_trades(
548 &self,
549 filepath: &Path,
550 instrument_id: Option<InstrumentId>,
551 price_precision: Option<u8>,
552 ) -> anyhow::Result<Vec<TradeTick>> {
553 self.read_records::<dbn::TbboMsg>(filepath, instrument_id, price_precision, false, None)?
554 .filter_map(|result| match result {
555 Ok((_, maybe_item2)) => {
556 if let Some(Data::Trade(trade)) = maybe_item2 {
557 Some(Ok(trade))
558 } else {
559 None
560 }
561 }
562 Err(e) => Some(Err(e)),
563 })
564 .collect()
565 }
566
567 pub fn load_tcbbo_trades(
573 &self,
574 filepath: &Path,
575 instrument_id: Option<InstrumentId>,
576 price_precision: Option<u8>,
577 ) -> anyhow::Result<Vec<TradeTick>> {
578 self.read_records::<dbn::CbboMsg>(filepath, instrument_id, price_precision, false, None)?
579 .filter_map(|result| match result {
580 Ok((_, maybe_item2)) => {
581 if let Some(Data::Trade(trade)) = maybe_item2 {
582 Some(Ok(trade))
583 } else {
584 None
585 }
586 }
587 Err(e) => Some(Err(e)),
588 })
589 .collect()
590 }
591
592 pub fn load_trades(
598 &self,
599 filepath: &Path,
600 instrument_id: Option<InstrumentId>,
601 price_precision: Option<u8>,
602 ) -> anyhow::Result<Vec<TradeTick>> {
603 self.read_records::<dbn::TradeMsg>(filepath, instrument_id, price_precision, false, None)?
604 .filter_map(|result| match result {
605 Ok((Some(item1), _)) => {
606 if let Data::Trade(trade) = item1 {
607 Some(Ok(trade))
608 } else {
609 None
610 }
611 }
612 Ok((None, _)) => None,
613 Err(e) => Some(Err(e)),
614 })
615 .collect()
616 }
617
618 pub fn load_bars(
624 &self,
625 filepath: &Path,
626 instrument_id: Option<InstrumentId>,
627 price_precision: Option<u8>,
628 timestamp_on_close: Option<bool>,
629 ) -> anyhow::Result<Vec<Bar>> {
630 self.read_records::<dbn::OhlcvMsg>(
631 filepath,
632 instrument_id,
633 price_precision,
634 false,
635 timestamp_on_close,
636 )?
637 .filter_map(|result| match result {
638 Ok((Some(item1), _)) => {
639 if let Data::Bar(bar) = item1 {
640 Some(Ok(bar))
641 } else {
642 None
643 }
644 }
645 Ok((None, _)) => None,
646 Err(e) => Some(Err(e)),
647 })
648 .collect()
649 }
650
651 pub fn load_status_records<T>(
657 &self,
658 filepath: &Path,
659 instrument_id: Option<InstrumentId>,
660 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<InstrumentStatus>> + '_>
661 where
662 T: dbn::Record + dbn::HasRType + 'static,
663 {
664 let decoder = Decoder::from_zstd_file(filepath)?;
665 let metadata = decoder.metadata().clone();
666 let mut metadata_cache = MetadataCache::new(metadata);
667 let mut dbn_stream = decoder.decode_stream::<T>();
668
669 Ok(std::iter::from_fn(move || {
670 if let Err(e) = dbn_stream.advance() {
671 return Some(Err(e.into()));
672 }
673
674 match dbn_stream.get() {
675 Some(rec) => {
676 let record = dbn::RecordRef::from(rec);
677 let instrument_id = match &instrument_id {
678 Some(id) => *id, None => match decode_nautilus_instrument_id(
680 &record,
681 &mut metadata_cache,
682 &self.publisher_venue_map,
683 &self.symbol_venue_map,
684 ) {
685 Ok(id) => id,
686 Err(e) => return Some(Err(e)),
687 },
688 };
689
690 let msg = match record.get::<dbn::StatusMsg>() {
691 Some(m) => m,
692 None => return Some(Err(anyhow::anyhow!("Invalid `StatusMsg`"))),
693 };
694 let ts_init = msg.ts_recv.into();
695
696 match decode_status_msg(msg, instrument_id, Some(ts_init)) {
697 Ok(data) => Some(Ok(data)),
698 Err(e) => Some(Err(e)),
699 }
700 }
701 None => None,
702 }
703 }))
704 }
705
706 pub fn read_imbalance_records<T>(
712 &self,
713 filepath: &Path,
714 instrument_id: Option<InstrumentId>,
715 price_precision: Option<u8>,
716 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoImbalance>> + '_>
717 where
718 T: dbn::Record + dbn::HasRType + 'static,
719 {
720 let decoder = Decoder::from_zstd_file(filepath)?;
721 let metadata = decoder.metadata().clone();
722 let mut metadata_cache = MetadataCache::new(metadata);
723 let mut dbn_stream = decoder.decode_stream::<T>();
724
725 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
726
727 Ok(std::iter::from_fn(move || {
728 if let Err(e) = dbn_stream.advance() {
729 return Some(Err(e.into()));
730 }
731
732 match dbn_stream.get() {
733 Some(rec) => {
734 let record = dbn::RecordRef::from(rec);
735 let instrument_id = match &instrument_id {
736 Some(id) => *id, None => match decode_nautilus_instrument_id(
738 &record,
739 &mut metadata_cache,
740 &self.publisher_venue_map,
741 &self.symbol_venue_map,
742 ) {
743 Ok(id) => id,
744 Err(e) => return Some(Err(e)),
745 },
746 };
747
748 let msg = match record.get::<dbn::ImbalanceMsg>() {
749 Some(m) => m,
750 None => return Some(Err(anyhow::anyhow!("Invalid `ImbalanceMsg`"))),
751 };
752 let ts_init = msg.ts_recv.into();
753
754 match decode_imbalance_msg(msg, instrument_id, price_precision, Some(ts_init)) {
755 Ok(data) => Some(Ok(data)),
756 Err(e) => Some(Err(e)),
757 }
758 }
759 None => None,
760 }
761 }))
762 }
763
764 pub fn read_statistics_records<T>(
770 &self,
771 filepath: &Path,
772 instrument_id: Option<InstrumentId>,
773 price_precision: Option<u8>,
774 ) -> anyhow::Result<impl Iterator<Item = anyhow::Result<DatabentoStatistics>> + '_>
775 where
776 T: dbn::Record + dbn::HasRType + 'static,
777 {
778 let decoder = Decoder::from_zstd_file(filepath)?;
779 let metadata = decoder.metadata().clone();
780 let mut metadata_cache = MetadataCache::new(metadata);
781 let mut dbn_stream = decoder.decode_stream::<T>();
782
783 let price_precision = price_precision.unwrap_or(Currency::USD().precision);
784
785 Ok(std::iter::from_fn(move || {
786 if let Err(e) = dbn_stream.advance() {
787 return Some(Err(e.into()));
788 }
789
790 match dbn_stream.get() {
791 Some(rec) => {
792 let record = dbn::RecordRef::from(rec);
793 let instrument_id = match &instrument_id {
794 Some(id) => *id, None => match decode_nautilus_instrument_id(
796 &record,
797 &mut metadata_cache,
798 &self.publisher_venue_map,
799 &self.symbol_venue_map,
800 ) {
801 Ok(id) => id,
802 Err(e) => return Some(Err(e)),
803 },
804 };
805 let msg = match record.get::<dbn::StatMsg>() {
806 Some(m) => m,
807 None => return Some(Err(anyhow::anyhow!("Invalid `StatMsg`"))),
808 };
809 let ts_init = msg.ts_recv.into();
810
811 match decode_statistics_msg(msg, instrument_id, price_precision, Some(ts_init))
812 {
813 Ok(data) => Some(Ok(data)),
814 Err(e) => Some(Err(e)),
815 }
816 }
817 None => None,
818 }
819 }))
820 }
821}
822
#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use nautilus_model::types::{Price, Quantity};
    use rstest::{fixture, rstest};
    use ustr::Ustr;

    use super::*;

    /// Directory holding the DBN fixtures used by these tests.
    fn test_data_path() -> PathBuf {
        Path::new(env!("CARGO_MANIFEST_DIR")).join("test_data")
    }

    /// Loader configured with the crate's bundled `publishers.json`.
    #[fixture]
    fn loader() -> DatabentoDataLoader {
        let publishers_filepath = Path::new(env!("CARGO_MANIFEST_DIR")).join("publishers.json");
        DatabentoDataLoader::new(Some(publishers_filepath)).unwrap()
    }

    #[rstest]
    fn test_set_dataset_venue_mapping(mut loader: DatabentoDataLoader) {
        let dataset = Ustr::from("EQUS.PLUS");
        let venue = Venue::from("XNAS");
        loader.set_dataset_for_venue(dataset, venue);

        let result = loader.get_dataset_for_venue(&venue).unwrap();
        assert_eq!(*result, dataset);
    }

    #[rstest]
    fn test_default_venue_dataset_mappings(loader: DatabentoDataLoader) {
        let xcme = Venue::XCME();
        let result = loader.get_dataset_for_venue(&xcme).unwrap();
        assert_eq!(*result, Ustr::from("GLBX.MDP3"));

        let xcbo = Venue::from("XCBO");
        let result = loader.get_dataset_for_venue(&xcbo).unwrap();
        assert_eq!(*result, Ustr::from("OPRA.PILLAR"));
    }

    #[rstest]
    #[case(test_data_path().join("test_data.definition.dbn.zst"))]
    fn test_load_instruments(mut loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instruments = loader.load_instruments(&path, false, false).unwrap();

        assert_eq!(instruments.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_deltas(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let deltas = loader
            .load_order_book_deltas(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(deltas.len(), 2);
    }

    #[rstest]
    fn test_load_order_book_depth10(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-10.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let depths = loader
            .load_order_book_depth10(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(depths.len(), 2);
    }

    #[rstest]
    fn test_load_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.mbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.bbo-1s.dbn.zst"))]
    #[case(test_data_path().join("test_data.bbo-1m.dbn.zst"))]
    fn test_load_bbo_quotes(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_bbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 4);
    }

    #[rstest]
    fn test_load_cmbp_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cmbp-1.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_cmbp_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);

        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_cbbo_quotes(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let quotes = loader
            .load_cbbo_quotes(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(quotes.len(), 2);

        let first_quote = &quotes[0];
        assert_eq!(first_quote.instrument_id, instrument_id);
        assert_eq!(first_quote.bid_price, Price::from("3720.25"));
        assert_eq!(first_quote.ask_price, Price::from("3720.50"));
        assert_eq!(first_quote.bid_size, Quantity::from(24));
        assert_eq!(first_quote.ask_size, Quantity::from(11));
        assert_eq!(first_quote.ts_event, 1609160400006136329);
        assert_eq!(first_quote.ts_init, 1609160400006136329);
    }

    #[rstest]
    fn test_load_tbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.tbbo.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let trades = loader
            .load_tbbo_trades(&path, Some(instrument_id), None)
            .unwrap();

        // NOTE(review): pins the current behaviour (`load_tbbo_trades` decodes with
        // `include_trades = false`); confirm whether TBBO trades should be produced.
        assert_eq!(trades.len(), 0);
    }

    #[rstest]
    fn test_load_tcbbo_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.cbbo-1s.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        // `unwrap` (rather than `assert!(is_ok())`) surfaces the actual error on failure.
        let trades = loader
            .load_tcbbo_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    fn test_load_trades(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.trades.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let trades = loader
            .load_trades(&path, Some(instrument_id), None)
            .unwrap();

        assert_eq!(trades.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1h.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1m.dbn.zst"))]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, None)
            .unwrap();

        assert_eq!(bars.len(), 2);
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_true(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();

        assert_eq!(bars.len(), 2);

        for bar in &bars {
            assert_eq!(
                bar.ts_event, bar.ts_init,
                "ts_event and ts_init should both be close time when bars_timestamp_on_close=true"
            );
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"))]
    fn test_load_bars_timestamp_on_close_false(loader: DatabentoDataLoader, #[case] path: PathBuf) {
        let instrument_id = InstrumentId::from("ESM4.GLBX");
        let bars = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();

        assert_eq!(bars.len(), 2);

        for bar in &bars {
            assert_ne!(
                bar.ts_event, bar.ts_init,
                "ts_event should be open time and ts_init should be close time when bars_timestamp_on_close=false"
            );
            // For 1s bars the close is exactly one second after the open.
            assert_eq!(bar.ts_init.as_u64(), bar.ts_event.as_u64() + 1_000_000_000);
        }
    }

    #[rstest]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 0)]
    #[case(test_data_path().join("test_data.ohlcv-1s.dbn.zst"), 1)]
    fn test_load_bars_timestamp_comparison(
        loader: DatabentoDataLoader,
        #[case] path: PathBuf,
        #[case] bar_index: usize,
    ) {
        const ONE_SECOND_NS: u64 = 1_000_000_000;

        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let bars_close = loader
            .load_bars(&path, Some(instrument_id), None, Some(true))
            .unwrap();

        let bars_open = loader
            .load_bars(&path, Some(instrument_id), None, Some(false))
            .unwrap();

        assert_eq!(bars_close.len(), bars_open.len());
        assert_eq!(bars_close.len(), 2);

        let bar_close = &bars_close[bar_index];
        let bar_open = &bars_open[bar_index];

        // Only the timestamps should differ between the two modes.
        assert_eq!(bar_close.open, bar_open.open);
        assert_eq!(bar_close.high, bar_open.high);
        assert_eq!(bar_close.low, bar_open.low);
        assert_eq!(bar_close.close, bar_open.close);
        assert_eq!(bar_close.volume, bar_open.volume);

        assert!(
            bar_close.ts_event > bar_open.ts_event,
            "Close-timestamped bar should have later timestamp than open-timestamped bar"
        );

        assert_eq!(
            bar_close.ts_event.as_u64() - bar_open.ts_event.as_u64(),
            ONE_SECOND_NS,
            "Timestamp difference should be exactly 1 second for 1s bars"
        );
    }

    #[rstest]
    fn test_load_status_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.status.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let statuses = loader
            .load_status_records::<dbn::StatusMsg>(&path, Some(instrument_id))
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(statuses.len(), 4, "Should load exactly 4 status records");

        let first = &statuses[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert_eq!(first.ts_event.as_u64(), 1609110000000000000);
        assert_eq!(first.ts_init.as_u64(), 1609113600000000000);
    }

    #[rstest]
    fn test_read_imbalance_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.imbalance.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let imbalances = loader
            .read_imbalance_records::<dbn::ImbalanceMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(
            imbalances.len(),
            2,
            "Should load exactly 2 imbalance records"
        );

        let first = &imbalances[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(
            first.ref_price.as_f64() > 0.0,
            "ref_price should be positive"
        );
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
    }

    #[rstest]
    fn test_read_statistics_records(loader: DatabentoDataLoader) {
        let path = test_data_path().join("test_data.statistics.dbn.zst");
        let instrument_id = InstrumentId::from("ESM4.GLBX");

        let statistics = loader
            .read_statistics_records::<dbn::StatMsg>(&path, Some(instrument_id), None)
            .unwrap()
            .collect::<anyhow::Result<Vec<_>>>()
            .unwrap();

        assert_eq!(
            statistics.len(),
            2,
            "Should load exactly 2 statistics records"
        );

        let first = &statistics[0];
        assert_eq!(first.instrument_id, instrument_id);
        assert!(first.ts_event.as_u64() > 0, "ts_event should be set");
        assert!(first.ts_recv.as_u64() > 0, "ts_recv should be set");
        assert!(first.ts_init.as_u64() > 0, "ts_init should be set");
        assert!(first.sequence > 0, "sequence should be positive");
    }
}