1use std::{collections::HashMap, path::PathBuf};
19
20use databento::dbn;
21use nautilus_core::{
22 ffi::cvec::CVec,
23 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
24};
25use nautilus_model::{
26 data::{
27 Bar, Data, DataFFI, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick,
28 TradeTick,
29 },
30 identifiers::{InstrumentId, Venue},
31 python::instruments::instrument_any_to_pyobject,
32};
33use pyo3::{
34 prelude::*,
35 types::{PyCapsule, PyList},
36};
37use ustr::Ustr;
38
39use crate::{
40 loader::DatabentoDataLoader,
41 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
42};
43
44#[expect(clippy::needless_pass_by_value)]
45#[pymethods]
46#[pyo3_stub_gen::derive::gen_stub_pymethods]
47impl DatabentoDataLoader {
48 #[new]
76 #[pyo3(signature = (publishers_filepath=None))]
77 fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
78 Self::new(publishers_filepath).map_err(to_pyvalue_err)
79 }
80
81 #[pyo3(name = "load_publishers")]
87 fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
88 self.load_publishers(publishers_filepath)
89 .map_err(to_pyvalue_err)
90 }
91
92 #[must_use]
94 #[pyo3(name = "get_publishers")]
95 fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
96 self.get_publishers()
97 .iter()
98 .map(|(&key, value)| (key, value.clone()))
99 .collect::<HashMap<u16, DatabentoPublisher>>()
100 }
101
102 #[pyo3(name = "set_dataset_for_venue")]
104 fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
105 self.set_dataset_for_venue(Ustr::from(&dataset), venue);
106 }
107
108 #[must_use]
110 #[pyo3(name = "get_dataset_for_venue")]
111 fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
112 self.get_dataset_for_venue(venue).map(ToString::to_string)
113 }
114
115 #[must_use]
117 #[pyo3(name = "get_venue_for_publisher")]
118 fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
119 self.get_venue_for_publisher(publisher_id)
120 .map(ToString::to_string)
121 }
122
123 #[pyo3(name = "schema_for_file")]
124 fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
125 self.schema_from_file(&filepath).map_err(to_pyvalue_err)
126 }
127
128 #[pyo3(name = "load_instruments")]
133 #[pyo3(signature = (filepath, use_exchange_as_venue, skip_on_error=false))]
134 fn py_load_instruments(
135 &mut self,
136 py: Python,
137 filepath: PathBuf,
138 use_exchange_as_venue: bool,
139 skip_on_error: bool,
140 ) -> PyResult<Py<PyAny>> {
141 let iter = self
142 .load_instruments(&filepath, use_exchange_as_venue, skip_on_error)
143 .map_err(to_pyvalue_err)?;
144
145 let mut data = Vec::new();
146
147 for instrument in iter {
148 let py_object = instrument_any_to_pyobject(py, instrument)?;
149 data.push(py_object);
150 }
151
152 let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
153
154 Ok(list.into_py_any_unwrap(py))
155 }
156
157 #[pyo3(name = "load_order_book_deltas")]
162 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
163 fn py_load_order_book_deltas(
164 &self,
165 filepath: PathBuf,
166 instrument_id: Option<InstrumentId>,
167 price_precision: Option<u8>,
168 ) -> PyResult<Vec<OrderBookDelta>> {
169 self.load_order_book_deltas(&filepath, instrument_id, price_precision)
170 .map_err(to_pyvalue_err)
171 }
172
173 #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
174 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
175 fn py_load_order_book_deltas_as_pycapsule(
176 &self,
177 py: Python,
178 filepath: PathBuf,
179 instrument_id: Option<InstrumentId>,
180 price_precision: Option<u8>,
181 include_trades: Option<bool>,
182 ) -> PyResult<Py<PyAny>> {
183 let iter = self
184 .read_records::<dbn::MboMsg>(
185 &filepath,
186 instrument_id,
187 price_precision,
188 include_trades.unwrap_or(false),
189 None,
190 )
191 .map_err(to_pyvalue_err)?;
192
193 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
194 }
195
196 #[pyo3(name = "load_order_book_depth10")]
198 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
199 fn py_load_order_book_depth10(
200 &self,
201 filepath: PathBuf,
202 instrument_id: Option<InstrumentId>,
203 price_precision: Option<u8>,
204 ) -> PyResult<Vec<OrderBookDepth10>> {
205 self.load_order_book_depth10(&filepath, instrument_id, price_precision)
206 .map_err(to_pyvalue_err)
207 }
208
209 #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
210 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
211 fn py_load_order_book_depth10_as_pycapsule(
212 &self,
213 py: Python,
214 filepath: PathBuf,
215 instrument_id: Option<InstrumentId>,
216 price_precision: Option<u8>,
217 ) -> PyResult<Py<PyAny>> {
218 let iter = self
219 .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
220 .map_err(to_pyvalue_err)?;
221
222 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
223 }
224
225 #[pyo3(name = "load_quotes")]
227 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
228 fn py_load_quotes(
229 &self,
230 filepath: PathBuf,
231 instrument_id: Option<InstrumentId>,
232 price_precision: Option<u8>,
233 ) -> PyResult<Vec<QuoteTick>> {
234 self.load_quotes(&filepath, instrument_id, price_precision)
235 .map_err(to_pyvalue_err)
236 }
237
238 #[pyo3(name = "load_quotes_as_pycapsule")]
239 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
240 fn py_load_quotes_as_pycapsule(
241 &self,
242 py: Python,
243 filepath: PathBuf,
244 instrument_id: Option<InstrumentId>,
245 price_precision: Option<u8>,
246 include_trades: Option<bool>,
247 ) -> PyResult<Py<PyAny>> {
248 let iter = self
249 .read_records::<dbn::Mbp1Msg>(
250 &filepath,
251 instrument_id,
252 price_precision,
253 include_trades.unwrap_or(false),
254 None,
255 )
256 .map_err(to_pyvalue_err)?;
257
258 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
259 }
260
261 #[pyo3(name = "load_bbo_quotes")]
263 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
264 fn py_load_bbo_quotes(
265 &self,
266 filepath: PathBuf,
267 instrument_id: Option<InstrumentId>,
268 price_precision: Option<u8>,
269 ) -> PyResult<Vec<QuoteTick>> {
270 self.load_bbo_quotes(&filepath, instrument_id, price_precision)
271 .map_err(to_pyvalue_err)
272 }
273
274 #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
275 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
276 fn py_load_bbo_quotes_as_pycapsule(
277 &self,
278 py: Python,
279 filepath: PathBuf,
280 instrument_id: Option<InstrumentId>,
281 price_precision: Option<u8>,
282 ) -> PyResult<Py<PyAny>> {
283 let iter = self
284 .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
285 .map_err(to_pyvalue_err)?;
286
287 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
288 }
289
290 #[pyo3(name = "load_cmbp_quotes")]
292 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
293 fn py_load_cmbp_quotes(
294 &self,
295 filepath: PathBuf,
296 instrument_id: Option<InstrumentId>,
297 price_precision: Option<u8>,
298 ) -> PyResult<Vec<QuoteTick>> {
299 self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
300 .map_err(to_pyvalue_err)
301 }
302
303 #[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
304 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
305 fn py_load_cmbp_quotes_as_pycapsule(
306 &self,
307 py: Python,
308 filepath: PathBuf,
309 instrument_id: Option<InstrumentId>,
310 price_precision: Option<u8>,
311 include_trades: Option<bool>,
312 ) -> PyResult<Py<PyAny>> {
313 let iter = self
314 .read_records::<dbn::Cmbp1Msg>(
315 &filepath,
316 instrument_id,
317 price_precision,
318 include_trades.unwrap_or(false),
319 None,
320 )
321 .map_err(to_pyvalue_err)?;
322
323 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
324 }
325
326 #[pyo3(name = "load_cbbo_quotes")]
328 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
329 fn py_load_cbbo_quotes(
330 &self,
331 filepath: PathBuf,
332 instrument_id: Option<InstrumentId>,
333 price_precision: Option<u8>,
334 ) -> PyResult<Vec<QuoteTick>> {
335 self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
336 .map_err(to_pyvalue_err)
337 }
338
339 #[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
340 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
341 fn py_load_cbbo_quotes_as_pycapsule(
342 &self,
343 py: Python,
344 filepath: PathBuf,
345 instrument_id: Option<InstrumentId>,
346 price_precision: Option<u8>,
347 ) -> PyResult<Py<PyAny>> {
348 let iter = self
349 .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
350 .map_err(to_pyvalue_err)?;
351
352 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
353 }
354
355 #[pyo3(name = "load_tbbo_trades")]
357 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
358 fn py_load_tbbo_trades(
359 &self,
360 filepath: PathBuf,
361 instrument_id: Option<InstrumentId>,
362 price_precision: Option<u8>,
363 ) -> PyResult<Vec<TradeTick>> {
364 self.load_tbbo_trades(&filepath, instrument_id, price_precision)
365 .map_err(to_pyvalue_err)
366 }
367
368 #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
369 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
370 fn py_load_tbbo_trades_as_pycapsule(
371 &self,
372 py: Python,
373 filepath: PathBuf,
374 instrument_id: Option<InstrumentId>,
375 price_precision: Option<u8>,
376 ) -> PyResult<Py<PyAny>> {
377 let iter = self
378 .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
379 .map_err(to_pyvalue_err)?;
380
381 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
382 }
383
384 #[pyo3(name = "load_tcbbo_trades")]
386 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
387 fn py_load_tcbbo_trades(
388 &self,
389 filepath: PathBuf,
390 instrument_id: Option<InstrumentId>,
391 price_precision: Option<u8>,
392 ) -> PyResult<Vec<TradeTick>> {
393 self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
394 .map_err(to_pyvalue_err)
395 }
396
397 #[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
398 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
399 fn py_load_tcbbo_trades_as_pycapsule(
400 &self,
401 py: Python,
402 filepath: PathBuf,
403 instrument_id: Option<InstrumentId>,
404 price_precision: Option<u8>,
405 ) -> PyResult<Py<PyAny>> {
406 let iter = self
407 .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
408 .map_err(to_pyvalue_err)?;
409
410 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
411 }
412
413 #[pyo3(name = "load_trades")]
415 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
416 fn py_load_trades(
417 &self,
418 filepath: PathBuf,
419 instrument_id: Option<InstrumentId>,
420 price_precision: Option<u8>,
421 ) -> PyResult<Vec<TradeTick>> {
422 self.load_trades(&filepath, instrument_id, price_precision)
423 .map_err(to_pyvalue_err)
424 }
425
426 #[pyo3(name = "load_trades_as_pycapsule")]
427 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
428 fn py_load_trades_as_pycapsule(
429 &self,
430 py: Python,
431 filepath: PathBuf,
432 instrument_id: Option<InstrumentId>,
433 price_precision: Option<u8>,
434 ) -> PyResult<Py<PyAny>> {
435 let iter = self
436 .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
437 .map_err(to_pyvalue_err)?;
438
439 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
440 }
441
442 #[pyo3(name = "load_bars")]
444 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
445 fn py_load_bars(
446 &self,
447 filepath: PathBuf,
448 instrument_id: Option<InstrumentId>,
449 price_precision: Option<u8>,
450 timestamp_on_close: bool,
451 ) -> PyResult<Vec<Bar>> {
452 self.load_bars(
453 &filepath,
454 instrument_id,
455 price_precision,
456 Some(timestamp_on_close),
457 )
458 .map_err(to_pyvalue_err)
459 }
460
461 #[pyo3(name = "load_bars_as_pycapsule")]
462 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
463 fn py_load_bars_as_pycapsule(
464 &self,
465 py: Python,
466 filepath: PathBuf,
467 instrument_id: Option<InstrumentId>,
468 price_precision: Option<u8>,
469 timestamp_on_close: bool,
470 ) -> PyResult<Py<PyAny>> {
471 let iter = self
472 .read_records::<dbn::OhlcvMsg>(
473 &filepath,
474 instrument_id,
475 price_precision,
476 false,
477 Some(timestamp_on_close),
478 )
479 .map_err(to_pyvalue_err)?;
480
481 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
482 }
483
484 #[pyo3(name = "load_status")]
485 #[pyo3(signature = (filepath, instrument_id=None))]
486 fn py_load_status(
487 &self,
488 filepath: PathBuf,
489 instrument_id: Option<InstrumentId>,
490 ) -> PyResult<Vec<InstrumentStatus>> {
491 let iter = self
492 .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
493 .map_err(to_pyvalue_err)?;
494
495 let mut data = Vec::new();
496
497 for result in iter {
498 match result {
499 Ok(item) => data.push(item),
500 Err(e) => return Err(to_pyvalue_err(e)),
501 }
502 }
503
504 Ok(data)
505 }
506
507 #[pyo3(name = "load_imbalance")]
508 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
509 fn py_load_imbalance(
510 &self,
511 filepath: PathBuf,
512 instrument_id: Option<InstrumentId>,
513 price_precision: Option<u8>,
514 ) -> PyResult<Vec<DatabentoImbalance>> {
515 let iter = self
516 .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
517 .map_err(to_pyvalue_err)?;
518
519 let mut data = Vec::new();
520
521 for result in iter {
522 match result {
523 Ok(item) => data.push(item),
524 Err(e) => return Err(to_pyvalue_err(e)),
525 }
526 }
527
528 Ok(data)
529 }
530
531 #[pyo3(name = "load_statistics")]
532 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
533 fn py_load_statistics(
534 &self,
535 filepath: PathBuf,
536 instrument_id: Option<InstrumentId>,
537 price_precision: Option<u8>,
538 ) -> PyResult<Vec<DatabentoStatistics>> {
539 let iter = self
540 .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
541 .map_err(to_pyvalue_err)?;
542
543 let mut data = Vec::new();
544
545 for result in iter {
546 match result {
547 Ok(item) => data.push(item),
548 Err(e) => return Err(to_pyvalue_err(e)),
549 }
550 }
551
552 Ok(data)
553 }
554}
555
556fn exhaust_data_iter_to_pycapsule(
557 py: Python,
558 iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
559) -> anyhow::Result<Py<PyAny>> {
560 let mut data = Vec::new();
561
562 for result in iter {
563 match result {
564 Ok((Some(item1), None)) => data.push(item1),
565 Ok((None, Some(item2))) => data.push(item2),
566 Ok((Some(item1), Some(item2))) => {
567 data.push(item1);
568 data.push(item2);
569 }
570 Ok((None, None)) => {}
571 Err(e) => return Err(e),
572 }
573 }
574
575 let ffi_data: Vec<DataFFI> = data
576 .into_iter()
577 .map(DataFFI::try_from)
578 .collect::<Result<Vec<_>, _>>()
579 .map_err(to_pyvalue_err)?;
580 let cvec: CVec = ffi_data.into();
581 let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
583
584 Ok(capsule.into_py_any_unwrap(py))
587}