Skip to main content

nautilus_databento/python/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento data loader.
17
18use std::{collections::HashMap, path::PathBuf};
19
20use databento::dbn;
21use nautilus_core::{
22    ffi::cvec::CVec,
23    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
24};
25use nautilus_model::{
26    data::{
27        Bar, Data, DataFFI, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick,
28        TradeTick,
29    },
30    identifiers::{InstrumentId, Venue},
31    python::instruments::instrument_any_to_pyobject,
32};
33use pyo3::{
34    prelude::*,
35    types::{PyCapsule, PyList},
36};
37use ustr::Ustr;
38
39use crate::{
40    loader::DatabentoDataLoader,
41    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
42};
43
44#[expect(clippy::needless_pass_by_value)]
45#[pymethods]
46#[pyo3_stub_gen::derive::gen_stub_pymethods]
47impl DatabentoDataLoader {
48    /// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
49    ///
50    /// # Supported Schemas
51    ///  - `MBO` -> `OrderBookDelta`
52    ///  - `MBP_1` -> `(QuoteTick, Option<TradeTick>)`
53    ///  - `MBP_10` -> `OrderBookDepth10`
54    ///  - `BBO_1S` -> `QuoteTick`
55    ///  - `BBO_1M` -> `QuoteTick`
56    ///  - `CMBP_1` -> `(QuoteTick, Option<TradeTick>)`
57    ///  - `CBBO_1S` -> `QuoteTick`
58    ///  - `CBBO_1M` -> `QuoteTick`
59    ///  - `TCBBO` -> `(QuoteTick, TradeTick)`
60    ///  - `TBBO` -> `(QuoteTick, TradeTick)`
61    ///  - `TRADES` -> `TradeTick`
62    ///  - `OHLCV_1S` -> `Bar`
63    ///  - `OHLCV_1M` -> `Bar`
64    ///  - `OHLCV_1H` -> `Bar`
65    ///  - `OHLCV_1D` -> `Bar`
66    ///  - `OHLCV_EOD` -> `Bar`
67    ///  - `DEFINITION` -> `Instrument`
68    ///  - `IMBALANCE` -> `DatabentoImbalance`
69    ///  - `STATISTICS` -> `DatabentoStatistics`
70    ///  - `STATUS` -> `InstrumentStatus`
71    ///
72    /// # References
73    ///
74    /// <https://databento.com/docs/schemas-and-data-formats>
75    #[new]
76    #[pyo3(signature = (publishers_filepath=None))]
77    fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
78        Self::new(publishers_filepath).map_err(to_pyvalue_err)
79    }
80
81    /// Load the publishers data from the file at the given `filepath`.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the file cannot be read or parsed as JSON.
86    #[pyo3(name = "load_publishers")]
87    fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
88        self.load_publishers(publishers_filepath)
89            .map_err(to_pyvalue_err)
90    }
91
92    /// Returns the internal Databento publishers currently held by the loader.
93    #[must_use]
94    #[pyo3(name = "get_publishers")]
95    fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
96        self.get_publishers()
97            .iter()
98            .map(|(&key, value)| (key, value.clone()))
99            .collect::<HashMap<u16, DatabentoPublisher>>()
100    }
101
102    /// Sets the `venue` to map to the given `dataset`.
103    #[pyo3(name = "set_dataset_for_venue")]
104    fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
105        self.set_dataset_for_venue(Ustr::from(&dataset), venue);
106    }
107
108    /// Returns the dataset which matches the given `venue` (if found).
109    #[must_use]
110    #[pyo3(name = "get_dataset_for_venue")]
111    fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
112        self.get_dataset_for_venue(venue).map(ToString::to_string)
113    }
114
115    /// Returns the venue which matches the given `publisher_id` (if found).
116    #[must_use]
117    #[pyo3(name = "get_venue_for_publisher")]
118    fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
119        self.get_venue_for_publisher(publisher_id)
120            .map(ToString::to_string)
121    }
122
123    #[pyo3(name = "schema_for_file")]
124    fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
125        self.schema_from_file(&filepath).map_err(to_pyvalue_err)
126    }
127
128    /// Loads all instrument definitions from a DBN file.
129    ///
130    /// When `skip_on_error` is true, instruments that fail to decode are logged
131    /// as warnings and skipped. When false (default), any decode error is propagated.
132    #[pyo3(name = "load_instruments")]
133    #[pyo3(signature = (filepath, use_exchange_as_venue, skip_on_error=false))]
134    fn py_load_instruments(
135        &mut self,
136        py: Python,
137        filepath: PathBuf,
138        use_exchange_as_venue: bool,
139        skip_on_error: bool,
140    ) -> PyResult<Py<PyAny>> {
141        let iter = self
142            .load_instruments(&filepath, use_exchange_as_venue, skip_on_error)
143            .map_err(to_pyvalue_err)?;
144
145        let mut data = Vec::new();
146
147        for instrument in iter {
148            let py_object = instrument_any_to_pyobject(py, instrument)?;
149            data.push(py_object);
150        }
151
152        let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
153
154        Ok(list.into_py_any_unwrap(py))
155    }
156
157    // Cannot include trades
158    /// Loads order book delta messages from a DBN MBO schema file.
159    ///
160    /// Cannot include trades.
161    #[pyo3(name = "load_order_book_deltas")]
162    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
163    fn py_load_order_book_deltas(
164        &self,
165        filepath: PathBuf,
166        instrument_id: Option<InstrumentId>,
167        price_precision: Option<u8>,
168    ) -> PyResult<Vec<OrderBookDelta>> {
169        self.load_order_book_deltas(&filepath, instrument_id, price_precision)
170            .map_err(to_pyvalue_err)
171    }
172
173    #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
174    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
175    fn py_load_order_book_deltas_as_pycapsule(
176        &self,
177        py: Python,
178        filepath: PathBuf,
179        instrument_id: Option<InstrumentId>,
180        price_precision: Option<u8>,
181        include_trades: Option<bool>,
182    ) -> PyResult<Py<PyAny>> {
183        let iter = self
184            .read_records::<dbn::MboMsg>(
185                &filepath,
186                instrument_id,
187                price_precision,
188                include_trades.unwrap_or(false),
189                None,
190            )
191            .map_err(to_pyvalue_err)?;
192
193        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
194    }
195
196    /// Loads order book depth10 snapshots from a DBN MBP-10 schema file.
197    #[pyo3(name = "load_order_book_depth10")]
198    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
199    fn py_load_order_book_depth10(
200        &self,
201        filepath: PathBuf,
202        instrument_id: Option<InstrumentId>,
203        price_precision: Option<u8>,
204    ) -> PyResult<Vec<OrderBookDepth10>> {
205        self.load_order_book_depth10(&filepath, instrument_id, price_precision)
206            .map_err(to_pyvalue_err)
207    }
208
209    #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
210    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
211    fn py_load_order_book_depth10_as_pycapsule(
212        &self,
213        py: Python,
214        filepath: PathBuf,
215        instrument_id: Option<InstrumentId>,
216        price_precision: Option<u8>,
217    ) -> PyResult<Py<PyAny>> {
218        let iter = self
219            .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
220            .map_err(to_pyvalue_err)?;
221
222        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
223    }
224
225    /// Loads quote tick messages from a DBN MBP-1 or TBBO schema file.
226    #[pyo3(name = "load_quotes")]
227    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
228    fn py_load_quotes(
229        &self,
230        filepath: PathBuf,
231        instrument_id: Option<InstrumentId>,
232        price_precision: Option<u8>,
233    ) -> PyResult<Vec<QuoteTick>> {
234        self.load_quotes(&filepath, instrument_id, price_precision)
235            .map_err(to_pyvalue_err)
236    }
237
238    #[pyo3(name = "load_quotes_as_pycapsule")]
239    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
240    fn py_load_quotes_as_pycapsule(
241        &self,
242        py: Python,
243        filepath: PathBuf,
244        instrument_id: Option<InstrumentId>,
245        price_precision: Option<u8>,
246        include_trades: Option<bool>,
247    ) -> PyResult<Py<PyAny>> {
248        let iter = self
249            .read_records::<dbn::Mbp1Msg>(
250                &filepath,
251                instrument_id,
252                price_precision,
253                include_trades.unwrap_or(false),
254                None,
255            )
256            .map_err(to_pyvalue_err)?;
257
258        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
259    }
260
261    /// Loads best bid/offer quote messages from a DBN BBO schema file.
262    #[pyo3(name = "load_bbo_quotes")]
263    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
264    fn py_load_bbo_quotes(
265        &self,
266        filepath: PathBuf,
267        instrument_id: Option<InstrumentId>,
268        price_precision: Option<u8>,
269    ) -> PyResult<Vec<QuoteTick>> {
270        self.load_bbo_quotes(&filepath, instrument_id, price_precision)
271            .map_err(to_pyvalue_err)
272    }
273
274    #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
275    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
276    fn py_load_bbo_quotes_as_pycapsule(
277        &self,
278        py: Python,
279        filepath: PathBuf,
280        instrument_id: Option<InstrumentId>,
281        price_precision: Option<u8>,
282    ) -> PyResult<Py<PyAny>> {
283        let iter = self
284            .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
285            .map_err(to_pyvalue_err)?;
286
287        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
288    }
289
290    /// Loads consolidated MBP-1 quote messages from a DBN CMBP-1 schema file.
291    #[pyo3(name = "load_cmbp_quotes")]
292    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
293    fn py_load_cmbp_quotes(
294        &self,
295        filepath: PathBuf,
296        instrument_id: Option<InstrumentId>,
297        price_precision: Option<u8>,
298    ) -> PyResult<Vec<QuoteTick>> {
299        self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
300            .map_err(to_pyvalue_err)
301    }
302
303    #[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
304    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
305    fn py_load_cmbp_quotes_as_pycapsule(
306        &self,
307        py: Python,
308        filepath: PathBuf,
309        instrument_id: Option<InstrumentId>,
310        price_precision: Option<u8>,
311        include_trades: Option<bool>,
312    ) -> PyResult<Py<PyAny>> {
313        let iter = self
314            .read_records::<dbn::Cmbp1Msg>(
315                &filepath,
316                instrument_id,
317                price_precision,
318                include_trades.unwrap_or(false),
319                None,
320            )
321            .map_err(to_pyvalue_err)?;
322
323        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
324    }
325
326    /// Loads consolidated best bid/offer quote messages from a DBN CBBO schema file.
327    #[pyo3(name = "load_cbbo_quotes")]
328    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
329    fn py_load_cbbo_quotes(
330        &self,
331        filepath: PathBuf,
332        instrument_id: Option<InstrumentId>,
333        price_precision: Option<u8>,
334    ) -> PyResult<Vec<QuoteTick>> {
335        self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
336            .map_err(to_pyvalue_err)
337    }
338
339    #[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
340    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
341    fn py_load_cbbo_quotes_as_pycapsule(
342        &self,
343        py: Python,
344        filepath: PathBuf,
345        instrument_id: Option<InstrumentId>,
346        price_precision: Option<u8>,
347    ) -> PyResult<Py<PyAny>> {
348        let iter = self
349            .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
350            .map_err(to_pyvalue_err)?;
351
352        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
353    }
354
355    /// Loads trade messages from a DBN TBBO schema file.
356    #[pyo3(name = "load_tbbo_trades")]
357    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
358    fn py_load_tbbo_trades(
359        &self,
360        filepath: PathBuf,
361        instrument_id: Option<InstrumentId>,
362        price_precision: Option<u8>,
363    ) -> PyResult<Vec<TradeTick>> {
364        self.load_tbbo_trades(&filepath, instrument_id, price_precision)
365            .map_err(to_pyvalue_err)
366    }
367
368    #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
369    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
370    fn py_load_tbbo_trades_as_pycapsule(
371        &self,
372        py: Python,
373        filepath: PathBuf,
374        instrument_id: Option<InstrumentId>,
375        price_precision: Option<u8>,
376    ) -> PyResult<Py<PyAny>> {
377        let iter = self
378            .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
379            .map_err(to_pyvalue_err)?;
380
381        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
382    }
383
384    /// Loads trade messages from a DBN TCBBO schema file.
385    #[pyo3(name = "load_tcbbo_trades")]
386    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
387    fn py_load_tcbbo_trades(
388        &self,
389        filepath: PathBuf,
390        instrument_id: Option<InstrumentId>,
391        price_precision: Option<u8>,
392    ) -> PyResult<Vec<TradeTick>> {
393        self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
394            .map_err(to_pyvalue_err)
395    }
396
397    #[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
398    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
399    fn py_load_tcbbo_trades_as_pycapsule(
400        &self,
401        py: Python,
402        filepath: PathBuf,
403        instrument_id: Option<InstrumentId>,
404        price_precision: Option<u8>,
405    ) -> PyResult<Py<PyAny>> {
406        let iter = self
407            .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
408            .map_err(to_pyvalue_err)?;
409
410        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
411    }
412
413    /// Loads trade messages from a DBN TRADES schema file.
414    #[pyo3(name = "load_trades")]
415    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
416    fn py_load_trades(
417        &self,
418        filepath: PathBuf,
419        instrument_id: Option<InstrumentId>,
420        price_precision: Option<u8>,
421    ) -> PyResult<Vec<TradeTick>> {
422        self.load_trades(&filepath, instrument_id, price_precision)
423            .map_err(to_pyvalue_err)
424    }
425
426    #[pyo3(name = "load_trades_as_pycapsule")]
427    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
428    fn py_load_trades_as_pycapsule(
429        &self,
430        py: Python,
431        filepath: PathBuf,
432        instrument_id: Option<InstrumentId>,
433        price_precision: Option<u8>,
434    ) -> PyResult<Py<PyAny>> {
435        let iter = self
436            .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
437            .map_err(to_pyvalue_err)?;
438
439        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
440    }
441
442    /// Loads OHLCV bar messages from a DBN OHLCV schema file.
443    #[pyo3(name = "load_bars")]
444    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
445    fn py_load_bars(
446        &self,
447        filepath: PathBuf,
448        instrument_id: Option<InstrumentId>,
449        price_precision: Option<u8>,
450        timestamp_on_close: bool,
451    ) -> PyResult<Vec<Bar>> {
452        self.load_bars(
453            &filepath,
454            instrument_id,
455            price_precision,
456            Some(timestamp_on_close),
457        )
458        .map_err(to_pyvalue_err)
459    }
460
461    #[pyo3(name = "load_bars_as_pycapsule")]
462    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
463    fn py_load_bars_as_pycapsule(
464        &self,
465        py: Python,
466        filepath: PathBuf,
467        instrument_id: Option<InstrumentId>,
468        price_precision: Option<u8>,
469        timestamp_on_close: bool,
470    ) -> PyResult<Py<PyAny>> {
471        let iter = self
472            .read_records::<dbn::OhlcvMsg>(
473                &filepath,
474                instrument_id,
475                price_precision,
476                false,
477                Some(timestamp_on_close),
478            )
479            .map_err(to_pyvalue_err)?;
480
481        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
482    }
483
484    #[pyo3(name = "load_status")]
485    #[pyo3(signature = (filepath, instrument_id=None))]
486    fn py_load_status(
487        &self,
488        filepath: PathBuf,
489        instrument_id: Option<InstrumentId>,
490    ) -> PyResult<Vec<InstrumentStatus>> {
491        let iter = self
492            .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
493            .map_err(to_pyvalue_err)?;
494
495        let mut data = Vec::new();
496
497        for result in iter {
498            match result {
499                Ok(item) => data.push(item),
500                Err(e) => return Err(to_pyvalue_err(e)),
501            }
502        }
503
504        Ok(data)
505    }
506
507    #[pyo3(name = "load_imbalance")]
508    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
509    fn py_load_imbalance(
510        &self,
511        filepath: PathBuf,
512        instrument_id: Option<InstrumentId>,
513        price_precision: Option<u8>,
514    ) -> PyResult<Vec<DatabentoImbalance>> {
515        let iter = self
516            .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
517            .map_err(to_pyvalue_err)?;
518
519        let mut data = Vec::new();
520
521        for result in iter {
522            match result {
523                Ok(item) => data.push(item),
524                Err(e) => return Err(to_pyvalue_err(e)),
525            }
526        }
527
528        Ok(data)
529    }
530
531    #[pyo3(name = "load_statistics")]
532    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
533    fn py_load_statistics(
534        &self,
535        filepath: PathBuf,
536        instrument_id: Option<InstrumentId>,
537        price_precision: Option<u8>,
538    ) -> PyResult<Vec<DatabentoStatistics>> {
539        let iter = self
540            .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
541            .map_err(to_pyvalue_err)?;
542
543        let mut data = Vec::new();
544
545        for result in iter {
546            match result {
547                Ok(item) => data.push(item),
548                Err(e) => return Err(to_pyvalue_err(e)),
549            }
550        }
551
552        Ok(data)
553    }
554}
555
556fn exhaust_data_iter_to_pycapsule(
557    py: Python,
558    iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
559) -> anyhow::Result<Py<PyAny>> {
560    let mut data = Vec::new();
561
562    for result in iter {
563        match result {
564            Ok((Some(item1), None)) => data.push(item1),
565            Ok((None, Some(item2))) => data.push(item2),
566            Ok((Some(item1), Some(item2))) => {
567                data.push(item1);
568                data.push(item2);
569            }
570            Ok((None, None)) => {}
571            Err(e) => return Err(e),
572        }
573    }
574
575    let ffi_data: Vec<DataFFI> = data
576        .into_iter()
577        .map(DataFFI::try_from)
578        .collect::<Result<Vec<_>, _>>()
579        .map_err(to_pyvalue_err)?;
580    let cvec: CVec = ffi_data.into();
581    // No destructor: Python must call drop_cvec_pycapsule to take ownership and free.
582    let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
583
584    // TODO: Improve error domain. Replace anyhow errors with nautilus
585    // errors to unify pyo3 and anyhow errors.
586    Ok(capsule.into_py_any_unwrap(py))
587}