Skip to main content

nautilus_serialization/python/
arrow.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::io::Cursor;
17
18use arrow::{
19    ipc::{reader::StreamReader, writer::StreamWriter},
20    record_batch::RecordBatch,
21};
22use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
23use nautilus_model::{
24    data::{
25        Bar, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
26        QuoteTick, TradeTick, close::InstrumentClose,
27    },
28    python::data::{
29        pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
30        pyobjects_to_instrument_closes, pyobjects_to_instrument_statuses, pyobjects_to_mark_prices,
31        pyobjects_to_quotes, pyobjects_to_trades,
32    },
33};
34use pyo3::{
35    conversion::IntoPyObjectExt,
36    prelude::*,
37    types::{PyBytes, PyType},
38};
39
40use crate::arrow::{
41    ArrowSchemaProvider, DecodeTypedFromRecordBatch, bars_to_arrow_record_batch_bytes,
42    book_deltas_to_arrow_record_batch_bytes, book_depth10_to_arrow_record_batch_bytes,
43    index_prices_to_arrow_record_batch_bytes, instrument_closes_to_arrow_record_batch_bytes,
44    instrument_status_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
45    quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
46};
47
48/// Transforms the given record `batch` into Python `bytes`.
49///
50/// # Errors
51///
52/// Returns a `PyErr` if writing the Arrow IPC stream fails.
53pub fn arrow_record_batch_to_pybytes(py: Python, batch: &RecordBatch) -> PyResult<Py<PyBytes>> {
54    // Create a cursor to write to a byte array in memory
55    let mut cursor = Cursor::new(Vec::new());
56    {
57        let mut writer =
58            StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
59
60        writer.write(batch).map_err(to_pyruntime_err)?;
61
62        writer.finish().map_err(to_pyruntime_err)?;
63    }
64
65    let buffer = cursor.into_inner();
66    let pybytes = PyBytes::new(py, &buffer);
67
68    Ok(pybytes.into())
69}
70
71/// Returns a mapping from field names to Arrow data types for the given Rust data class.
72///
73/// # Errors
74///
75/// Returns a `PyErr` if the class name is not recognized or schema extraction fails.
76#[pyfunction]
77#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
78pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
79    let cls_str: String = cls.getattr("__name__")?.extract()?;
80    let result_map = match cls_str.as_str() {
81        stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
82        stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
83        stringify!(QuoteTick) => QuoteTick::get_schema_map(),
84        stringify!(TradeTick) => TradeTick::get_schema_map(),
85        stringify!(Bar) => Bar::get_schema_map(),
86        stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
87        stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
88        stringify!(InstrumentStatus) => InstrumentStatus::get_schema_map(),
89        stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
90        _ => {
91            return Err(to_pytype_err(format!(
92                "Arrow schema for `{cls_str}` is not currently implemented in Rust."
93            )));
94        }
95    };
96
97    result_map.into_py_any(py)
98}
99
100/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
101#[pyfunction]
102#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
103#[expect(clippy::missing_panics_doc)] // Guarded by empty check
104pub fn pyobjects_to_arrow_record_batch_bytes(
105    py: Python,
106    data: Vec<Bound<'_, PyAny>>,
107) -> PyResult<Py<PyBytes>> {
108    if data.is_empty() {
109        return Err(to_pyvalue_err("Empty data"));
110    }
111
112    let data_type: String = data
113        .first()
114        .unwrap() // SAFETY: Unwrap safe as already checked that `data` not empty
115        .getattr("__class__")?
116        .getattr("__name__")?
117        .extract()?;
118
119    match data_type.as_str() {
120        stringify!(OrderBookDelta) => {
121            let deltas = pyobjects_to_book_deltas(data)?;
122            py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
123        }
124        stringify!(OrderBookDepth10) => {
125            let depth_snapshots: Vec<OrderBookDepth10> = data
126                .into_iter()
127                .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
128                .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
129            py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
130        }
131        stringify!(QuoteTick) => {
132            let quotes = pyobjects_to_quotes(data)?;
133            py_quotes_to_arrow_record_batch_bytes(py, quotes)
134        }
135        stringify!(TradeTick) => {
136            let trades = pyobjects_to_trades(data)?;
137            py_trades_to_arrow_record_batch_bytes(py, trades)
138        }
139        stringify!(Bar) => {
140            let bars = pyobjects_to_bars(data)?;
141            py_bars_to_arrow_record_batch_bytes(py, bars)
142        }
143        stringify!(MarkPriceUpdate) => {
144            let updates = pyobjects_to_mark_prices(data)?;
145            py_mark_prices_to_arrow_record_batch_bytes(py, updates)
146        }
147        stringify!(IndexPriceUpdate) => {
148            let index_prices = pyobjects_to_index_prices(data)?;
149            py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
150        }
151        stringify!(InstrumentStatus) => {
152            let statuses = pyobjects_to_instrument_statuses(data)?;
153            py_instrument_status_to_arrow_record_batch_bytes(py, statuses)
154        }
155        stringify!(InstrumentClose) => {
156            let closes = pyobjects_to_instrument_closes(data)?;
157            py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
158        }
159        _ => Err(to_pyvalue_err(format!(
160            "unsupported data type: {data_type}"
161        ))),
162    }
163}
164
165/// Converts a list of `OrderBookDelta` into Arrow IPC bytes for Python.
166///
167/// # Errors
168///
169/// Returns a `PyErr` if encoding fails.
170#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
171#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
172#[expect(clippy::needless_pass_by_value)]
173pub fn py_book_deltas_to_arrow_record_batch_bytes(
174    py: Python,
175    data: Vec<OrderBookDelta>,
176) -> PyResult<Py<PyBytes>> {
177    match book_deltas_to_arrow_record_batch_bytes(&data) {
178        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
179        Err(e) => Err(to_pyvalue_err(e)),
180    }
181}
182
183/// Converts a list of `OrderBookDepth10` into Arrow IPC bytes for Python.
184///
185/// # Errors
186///
187/// Returns a `PyErr` if encoding fails.
188#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
189#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
190#[expect(clippy::needless_pass_by_value)]
191pub fn py_book_depth10_to_arrow_record_batch_bytes(
192    py: Python,
193    data: Vec<OrderBookDepth10>,
194) -> PyResult<Py<PyBytes>> {
195    match book_depth10_to_arrow_record_batch_bytes(&data) {
196        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
197        Err(e) => Err(to_pyvalue_err(e)),
198    }
199}
200
201/// Converts a list of `QuoteTick` into Arrow IPC bytes for Python.
202///
203/// # Errors
204///
205/// Returns a `PyErr` if encoding fails.
206#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
207#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
208#[expect(clippy::needless_pass_by_value)]
209pub fn py_quotes_to_arrow_record_batch_bytes(
210    py: Python,
211    data: Vec<QuoteTick>,
212) -> PyResult<Py<PyBytes>> {
213    match quotes_to_arrow_record_batch_bytes(&data) {
214        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
215        Err(e) => Err(to_pyvalue_err(e)),
216    }
217}
218
219/// Converts a list of `TradeTick` into Arrow IPC bytes for Python.
220///
221/// # Errors
222///
223/// Returns a `PyErr` if encoding fails.
224#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
225#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
226#[expect(clippy::needless_pass_by_value)]
227pub fn py_trades_to_arrow_record_batch_bytes(
228    py: Python,
229    data: Vec<TradeTick>,
230) -> PyResult<Py<PyBytes>> {
231    match trades_to_arrow_record_batch_bytes(&data) {
232        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
233        Err(e) => Err(to_pyvalue_err(e)),
234    }
235}
236
237/// Converts a list of `Bar` into Arrow IPC bytes for Python.
238///
239/// # Errors
240///
241/// Returns a `PyErr` if encoding fails.
242#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
243#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
244#[expect(clippy::needless_pass_by_value)]
245pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
246    match bars_to_arrow_record_batch_bytes(&data) {
247        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
248        Err(e) => Err(to_pyvalue_err(e)),
249    }
250}
251
252/// Converts a list of `MarkPriceUpdate` into Arrow IPC bytes for Python.
253///
254/// # Errors
255///
256/// Returns a `PyErr` if encoding fails.
257#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
258#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
259#[expect(clippy::needless_pass_by_value)]
260pub fn py_mark_prices_to_arrow_record_batch_bytes(
261    py: Python,
262    data: Vec<MarkPriceUpdate>,
263) -> PyResult<Py<PyBytes>> {
264    match mark_prices_to_arrow_record_batch_bytes(&data) {
265        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
266        Err(e) => Err(to_pyvalue_err(e)),
267    }
268}
269
270/// Converts a list of `IndexPriceUpdate` into Arrow IPC bytes for Python.
271///
272/// # Errors
273///
274/// Returns a `PyErr` if encoding fails.
275#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
276#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
277#[expect(clippy::needless_pass_by_value)]
278pub fn py_index_prices_to_arrow_record_batch_bytes(
279    py: Python,
280    data: Vec<IndexPriceUpdate>,
281) -> PyResult<Py<PyBytes>> {
282    match index_prices_to_arrow_record_batch_bytes(&data) {
283        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
284        Err(e) => Err(to_pyvalue_err(e)),
285    }
286}
287
288/// Converts a list of `InstrumentStatus` into Arrow IPC bytes for Python.
289///
290/// # Errors
291///
292/// Returns a `PyErr` if encoding fails.
293#[pyfunction(name = "instrument_status_to_arrow_record_batch_bytes")]
294#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
295#[expect(clippy::needless_pass_by_value)]
296pub fn py_instrument_status_to_arrow_record_batch_bytes(
297    py: Python,
298    data: Vec<InstrumentStatus>,
299) -> PyResult<Py<PyBytes>> {
300    match instrument_status_to_arrow_record_batch_bytes(&data) {
301        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
302        Err(e) => Err(to_pyvalue_err(e)),
303    }
304}
305
306/// Decodes Arrow IPC bytes into a list of `InstrumentStatus`.
307///
308/// # Errors
309///
310/// Returns a `PyErr` if decoding fails.
311#[pyfunction(name = "instrument_status_from_arrow_record_batch_bytes")]
312#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
313pub fn py_instrument_status_from_arrow_record_batch_bytes(
314    _py: Python,
315    data: Vec<u8>,
316) -> PyResult<Vec<InstrumentStatus>> {
317    let cursor = Cursor::new(data);
318    let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
319
320    let mut results = Vec::new();
321
322    for batch_result in reader {
323        let batch = batch_result.map_err(to_pyruntime_err)?;
324        let metadata = batch.schema().metadata().clone();
325        let decoded =
326            InstrumentStatus::decode_typed_batch(&metadata, batch).map_err(to_pyvalue_err)?;
327        results.extend(decoded);
328    }
329
330    Ok(results)
331}
332
333/// Converts a list of `InstrumentClose` into Arrow IPC bytes for Python.
334///
335/// # Errors
336///
337/// Returns a `PyErr` if encoding fails.
338#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
339#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
340#[expect(clippy::needless_pass_by_value)]
341pub fn py_instrument_closes_to_arrow_record_batch_bytes(
342    py: Python,
343    data: Vec<InstrumentClose>,
344) -> PyResult<Py<PyBytes>> {
345    match instrument_closes_to_arrow_record_batch_bytes(&data) {
346        Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
347        Err(e) => Err(to_pyvalue_err(e)),
348    }
349}