use std::io::Cursor;
18use arrow::ipc::reader::StreamReader;
19use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
20use nautilus_serialization::{
21 arrow::ArrowSchemaProvider, python::arrow::arrow_record_batch_to_pybytes,
22};
23use pyo3::{
24 conversion::IntoPyObjectExt,
25 prelude::*,
26 types::{PyBytes, PyType},
27};
28
29use crate::{
30 arrow::{
31 imbalance::{decode_imbalance_batch, imbalance_to_arrow_record_batch},
32 statistics::{decode_statistics_batch, statistics_to_arrow_record_batch},
33 },
34 types::{DatabentoImbalance, DatabentoStatistics},
35};
36
37#[pyfunction]
43#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.databento")]
44pub fn get_databento_arrow_schema_map(
45 py: Python<'_>,
46 cls: &Bound<'_, PyType>,
47) -> PyResult<Py<PyAny>> {
48 let cls_str: String = cls.getattr("__name__")?.extract()?;
49 let result_map = match cls_str.as_str() {
50 stringify!(DatabentoImbalance) => DatabentoImbalance::get_schema_map(),
51 stringify!(DatabentoStatistics) => DatabentoStatistics::get_schema_map(),
52 _ => {
53 return Err(to_pyvalue_err(format!(
54 "Arrow schema for `{cls_str}` is not currently implemented"
55 )));
56 }
57 };
58
59 result_map.into_py_any(py)
60}
61
62#[pyfunction(name = "databento_imbalance_to_arrow_record_batch_bytes")]
68#[expect(clippy::needless_pass_by_value)]
69pub fn py_databento_imbalance_to_arrow_record_batch_bytes(
70 py: Python,
71 data: Vec<DatabentoImbalance>,
72) -> PyResult<Py<PyBytes>> {
73 match imbalance_to_arrow_record_batch(&data) {
74 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
75 Err(e) => Err(to_pyvalue_err(e)),
76 }
77}
78
79#[pyfunction(name = "databento_imbalance_from_arrow_record_batch_bytes")]
85pub fn py_databento_imbalance_from_arrow_record_batch_bytes(
86 _py: Python,
87 data: Vec<u8>,
88) -> PyResult<Vec<DatabentoImbalance>> {
89 let cursor = Cursor::new(data);
90 let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
91
92 let mut results = Vec::new();
93
94 for batch_result in reader {
95 let batch = batch_result.map_err(to_pyruntime_err)?;
96 let metadata = batch.schema().metadata().clone();
97 let decoded = decode_imbalance_batch(&metadata, &batch).map_err(to_pyvalue_err)?;
98 results.extend(decoded);
99 }
100
101 Ok(results)
102}
103
104#[pyfunction(name = "databento_statistics_to_arrow_record_batch_bytes")]
110#[expect(clippy::needless_pass_by_value)]
111pub fn py_databento_statistics_to_arrow_record_batch_bytes(
112 py: Python,
113 data: Vec<DatabentoStatistics>,
114) -> PyResult<Py<PyBytes>> {
115 match statistics_to_arrow_record_batch(&data) {
116 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
117 Err(e) => Err(to_pyvalue_err(e)),
118 }
119}
120
121#[pyfunction(name = "databento_statistics_from_arrow_record_batch_bytes")]
127pub fn py_databento_statistics_from_arrow_record_batch_bytes(
128 _py: Python,
129 data: Vec<u8>,
130) -> PyResult<Vec<DatabentoStatistics>> {
131 let cursor = Cursor::new(data);
132 let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
133
134 let mut results = Vec::new();
135
136 for batch_result in reader {
137 let batch = batch_result.map_err(to_pyruntime_err)?;
138 let metadata = batch.schema().metadata().clone();
139 let decoded = decode_statistics_batch(&metadata, &batch).map_err(to_pyvalue_err)?;
140 results.extend(decoded);
141 }
142
143 Ok(results)
144}