// nautilus_databento/python/arrow.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::io::Cursor;

use arrow::ipc::reader::StreamReader;
use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
use nautilus_serialization::{
    arrow::ArrowSchemaProvider, python::arrow::arrow_record_batch_to_pybytes,
};
use pyo3::{
    conversion::IntoPyObjectExt,
    prelude::*,
    types::{PyBytes, PyType},
};

use crate::{
    arrow::{
        imbalance::{decode_imbalance_batch, imbalance_to_arrow_record_batch},
        statistics::{decode_statistics_batch, statistics_to_arrow_record_batch},
    },
    types::{DatabentoImbalance, DatabentoStatistics},
};

37/// Returns a mapping from field names to Arrow data types for the given Databento data class.
38///
39/// # Errors
40///
41/// Returns a `PyErr` if the class name is not recognized.
42#[pyfunction]
43#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.databento")]
44pub fn get_databento_arrow_schema_map(
45    py: Python<'_>,
46    cls: &Bound<'_, PyType>,
47) -> PyResult<Py<PyAny>> {
48    let cls_str: String = cls.getattr("__name__")?.extract()?;
49    let result_map = match cls_str.as_str() {
50        stringify!(DatabentoImbalance) => DatabentoImbalance::get_schema_map(),
51        stringify!(DatabentoStatistics) => DatabentoStatistics::get_schema_map(),
52        _ => {
53            return Err(to_pyvalue_err(format!(
54                "Arrow schema for `{cls_str}` is not currently implemented"
55            )));
56        }
57    };
58
59    result_map.into_py_any(py)
60}
61
62/// Encodes a list of `DatabentoImbalance` into Arrow IPC bytes.
63///
64/// # Errors
65///
66/// Returns a `PyErr` if encoding fails.
67#[pyfunction(name = "databento_imbalance_to_arrow_record_batch_bytes")]
68#[expect(clippy::needless_pass_by_value)]
69pub fn py_databento_imbalance_to_arrow_record_batch_bytes(
70    py: Python,
71    data: Vec<DatabentoImbalance>,
72) -> PyResult<Py<PyBytes>> {
73    match imbalance_to_arrow_record_batch(&data) {
74        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
75        Err(e) => Err(to_pyvalue_err(e)),
76    }
77}
78
79/// Decodes Arrow IPC bytes into a list of `DatabentoImbalance`.
80///
81/// # Errors
82///
83/// Returns a `PyErr` if decoding fails.
84#[pyfunction(name = "databento_imbalance_from_arrow_record_batch_bytes")]
85pub fn py_databento_imbalance_from_arrow_record_batch_bytes(
86    _py: Python,
87    data: Vec<u8>,
88) -> PyResult<Vec<DatabentoImbalance>> {
89    let cursor = Cursor::new(data);
90    let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
91
92    let mut results = Vec::new();
93
94    for batch_result in reader {
95        let batch = batch_result.map_err(to_pyruntime_err)?;
96        let metadata = batch.schema().metadata().clone();
97        let decoded = decode_imbalance_batch(&metadata, &batch).map_err(to_pyvalue_err)?;
98        results.extend(decoded);
99    }
100
101    Ok(results)
102}
103
104/// Encodes a list of `DatabentoStatistics` into Arrow IPC bytes.
105///
106/// # Errors
107///
108/// Returns a `PyErr` if encoding fails.
109#[pyfunction(name = "databento_statistics_to_arrow_record_batch_bytes")]
110#[expect(clippy::needless_pass_by_value)]
111pub fn py_databento_statistics_to_arrow_record_batch_bytes(
112    py: Python,
113    data: Vec<DatabentoStatistics>,
114) -> PyResult<Py<PyBytes>> {
115    match statistics_to_arrow_record_batch(&data) {
116        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
117        Err(e) => Err(to_pyvalue_err(e)),
118    }
119}
120
121/// Decodes Arrow IPC bytes into a list of `DatabentoStatistics`.
122///
123/// # Errors
124///
125/// Returns a `PyErr` if decoding fails.
126#[pyfunction(name = "databento_statistics_from_arrow_record_batch_bytes")]
127pub fn py_databento_statistics_from_arrow_record_batch_bytes(
128    _py: Python,
129    data: Vec<u8>,
130) -> PyResult<Vec<DatabentoStatistics>> {
131    let cursor = Cursor::new(data);
132    let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
133
134    let mut results = Vec::new();
135
136    for batch_result in reader {
137        let batch = batch_result.map_err(to_pyruntime_err)?;
138        let metadata = batch.schema().metadata().clone();
139        let decoded = decode_statistics_batch(&metadata, &batch).map_err(to_pyvalue_err)?;
140        results.extend(decoded);
141    }
142
143    Ok(results)
144}