nautilus_serialization/python/
arrow.rs1use std::io::Cursor;
17
18use arrow::{
19 ipc::{reader::StreamReader, writer::StreamWriter},
20 record_batch::RecordBatch,
21};
22use nautilus_core::python::{to_pyruntime_err, to_pytype_err, to_pyvalue_err};
23use nautilus_model::{
24 data::{
25 Bar, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
26 QuoteTick, TradeTick, close::InstrumentClose,
27 },
28 python::data::{
29 pyobjects_to_bars, pyobjects_to_book_deltas, pyobjects_to_index_prices,
30 pyobjects_to_instrument_closes, pyobjects_to_instrument_statuses, pyobjects_to_mark_prices,
31 pyobjects_to_quotes, pyobjects_to_trades,
32 },
33};
34use pyo3::{
35 conversion::IntoPyObjectExt,
36 prelude::*,
37 types::{PyBytes, PyType},
38};
39
40use crate::arrow::{
41 ArrowSchemaProvider, DecodeTypedFromRecordBatch, bars_to_arrow_record_batch_bytes,
42 book_deltas_to_arrow_record_batch_bytes, book_depth10_to_arrow_record_batch_bytes,
43 index_prices_to_arrow_record_batch_bytes, instrument_closes_to_arrow_record_batch_bytes,
44 instrument_status_to_arrow_record_batch_bytes, mark_prices_to_arrow_record_batch_bytes,
45 quotes_to_arrow_record_batch_bytes, trades_to_arrow_record_batch_bytes,
46};
47
48pub fn arrow_record_batch_to_pybytes(py: Python, batch: &RecordBatch) -> PyResult<Py<PyBytes>> {
54 let mut cursor = Cursor::new(Vec::new());
56 {
57 let mut writer =
58 StreamWriter::try_new(&mut cursor, &batch.schema()).map_err(to_pyruntime_err)?;
59
60 writer.write(batch).map_err(to_pyruntime_err)?;
61
62 writer.finish().map_err(to_pyruntime_err)?;
63 }
64
65 let buffer = cursor.into_inner();
66 let pybytes = PyBytes::new(py, &buffer);
67
68 Ok(pybytes.into())
69}
70
71#[pyfunction]
77#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
78pub fn get_arrow_schema_map(py: Python<'_>, cls: &Bound<'_, PyType>) -> PyResult<Py<PyAny>> {
79 let cls_str: String = cls.getattr("__name__")?.extract()?;
80 let result_map = match cls_str.as_str() {
81 stringify!(OrderBookDelta) => OrderBookDelta::get_schema_map(),
82 stringify!(OrderBookDepth10) => OrderBookDepth10::get_schema_map(),
83 stringify!(QuoteTick) => QuoteTick::get_schema_map(),
84 stringify!(TradeTick) => TradeTick::get_schema_map(),
85 stringify!(Bar) => Bar::get_schema_map(),
86 stringify!(MarkPriceUpdate) => MarkPriceUpdate::get_schema_map(),
87 stringify!(IndexPriceUpdate) => IndexPriceUpdate::get_schema_map(),
88 stringify!(InstrumentStatus) => InstrumentStatus::get_schema_map(),
89 stringify!(InstrumentClose) => InstrumentClose::get_schema_map(),
90 _ => {
91 return Err(to_pytype_err(format!(
92 "Arrow schema for `{cls_str}` is not currently implemented in Rust."
93 )));
94 }
95 };
96
97 result_map.into_py_any(py)
98}
99
100#[pyfunction]
102#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
103#[expect(clippy::missing_panics_doc)] pub fn pyobjects_to_arrow_record_batch_bytes(
105 py: Python,
106 data: Vec<Bound<'_, PyAny>>,
107) -> PyResult<Py<PyBytes>> {
108 if data.is_empty() {
109 return Err(to_pyvalue_err("Empty data"));
110 }
111
112 let data_type: String = data
113 .first()
114 .unwrap() .getattr("__class__")?
116 .getattr("__name__")?
117 .extract()?;
118
119 match data_type.as_str() {
120 stringify!(OrderBookDelta) => {
121 let deltas = pyobjects_to_book_deltas(data)?;
122 py_book_deltas_to_arrow_record_batch_bytes(py, deltas)
123 }
124 stringify!(OrderBookDepth10) => {
125 let depth_snapshots: Vec<OrderBookDepth10> = data
126 .into_iter()
127 .map(|obj| obj.extract::<OrderBookDepth10>().map_err(Into::into))
128 .collect::<PyResult<Vec<OrderBookDepth10>>>()?;
129 py_book_depth10_to_arrow_record_batch_bytes(py, depth_snapshots)
130 }
131 stringify!(QuoteTick) => {
132 let quotes = pyobjects_to_quotes(data)?;
133 py_quotes_to_arrow_record_batch_bytes(py, quotes)
134 }
135 stringify!(TradeTick) => {
136 let trades = pyobjects_to_trades(data)?;
137 py_trades_to_arrow_record_batch_bytes(py, trades)
138 }
139 stringify!(Bar) => {
140 let bars = pyobjects_to_bars(data)?;
141 py_bars_to_arrow_record_batch_bytes(py, bars)
142 }
143 stringify!(MarkPriceUpdate) => {
144 let updates = pyobjects_to_mark_prices(data)?;
145 py_mark_prices_to_arrow_record_batch_bytes(py, updates)
146 }
147 stringify!(IndexPriceUpdate) => {
148 let index_prices = pyobjects_to_index_prices(data)?;
149 py_index_prices_to_arrow_record_batch_bytes(py, index_prices)
150 }
151 stringify!(InstrumentStatus) => {
152 let statuses = pyobjects_to_instrument_statuses(data)?;
153 py_instrument_status_to_arrow_record_batch_bytes(py, statuses)
154 }
155 stringify!(InstrumentClose) => {
156 let closes = pyobjects_to_instrument_closes(data)?;
157 py_instrument_closes_to_arrow_record_batch_bytes(py, closes)
158 }
159 _ => Err(to_pyvalue_err(format!(
160 "unsupported data type: {data_type}"
161 ))),
162 }
163}
164
165#[pyfunction(name = "book_deltas_to_arrow_record_batch_bytes")]
171#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
172#[expect(clippy::needless_pass_by_value)]
173pub fn py_book_deltas_to_arrow_record_batch_bytes(
174 py: Python,
175 data: Vec<OrderBookDelta>,
176) -> PyResult<Py<PyBytes>> {
177 match book_deltas_to_arrow_record_batch_bytes(&data) {
178 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
179 Err(e) => Err(to_pyvalue_err(e)),
180 }
181}
182
183#[pyfunction(name = "book_depth10_to_arrow_record_batch_bytes")]
189#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
190#[expect(clippy::needless_pass_by_value)]
191pub fn py_book_depth10_to_arrow_record_batch_bytes(
192 py: Python,
193 data: Vec<OrderBookDepth10>,
194) -> PyResult<Py<PyBytes>> {
195 match book_depth10_to_arrow_record_batch_bytes(&data) {
196 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
197 Err(e) => Err(to_pyvalue_err(e)),
198 }
199}
200
201#[pyfunction(name = "quotes_to_arrow_record_batch_bytes")]
207#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
208#[expect(clippy::needless_pass_by_value)]
209pub fn py_quotes_to_arrow_record_batch_bytes(
210 py: Python,
211 data: Vec<QuoteTick>,
212) -> PyResult<Py<PyBytes>> {
213 match quotes_to_arrow_record_batch_bytes(&data) {
214 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
215 Err(e) => Err(to_pyvalue_err(e)),
216 }
217}
218
219#[pyfunction(name = "trades_to_arrow_record_batch_bytes")]
225#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
226#[expect(clippy::needless_pass_by_value)]
227pub fn py_trades_to_arrow_record_batch_bytes(
228 py: Python,
229 data: Vec<TradeTick>,
230) -> PyResult<Py<PyBytes>> {
231 match trades_to_arrow_record_batch_bytes(&data) {
232 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
233 Err(e) => Err(to_pyvalue_err(e)),
234 }
235}
236
237#[pyfunction(name = "bars_to_arrow_record_batch_bytes")]
243#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
244#[expect(clippy::needless_pass_by_value)]
245pub fn py_bars_to_arrow_record_batch_bytes(py: Python, data: Vec<Bar>) -> PyResult<Py<PyBytes>> {
246 match bars_to_arrow_record_batch_bytes(&data) {
247 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
248 Err(e) => Err(to_pyvalue_err(e)),
249 }
250}
251
252#[pyfunction(name = "mark_prices_to_arrow_record_batch_bytes")]
258#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
259#[expect(clippy::needless_pass_by_value)]
260pub fn py_mark_prices_to_arrow_record_batch_bytes(
261 py: Python,
262 data: Vec<MarkPriceUpdate>,
263) -> PyResult<Py<PyBytes>> {
264 match mark_prices_to_arrow_record_batch_bytes(&data) {
265 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
266 Err(e) => Err(to_pyvalue_err(e)),
267 }
268}
269
270#[pyfunction(name = "index_prices_to_arrow_record_batch_bytes")]
276#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
277#[expect(clippy::needless_pass_by_value)]
278pub fn py_index_prices_to_arrow_record_batch_bytes(
279 py: Python,
280 data: Vec<IndexPriceUpdate>,
281) -> PyResult<Py<PyBytes>> {
282 match index_prices_to_arrow_record_batch_bytes(&data) {
283 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
284 Err(e) => Err(to_pyvalue_err(e)),
285 }
286}
287
288#[pyfunction(name = "instrument_status_to_arrow_record_batch_bytes")]
294#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
295#[expect(clippy::needless_pass_by_value)]
296pub fn py_instrument_status_to_arrow_record_batch_bytes(
297 py: Python,
298 data: Vec<InstrumentStatus>,
299) -> PyResult<Py<PyBytes>> {
300 match instrument_status_to_arrow_record_batch_bytes(&data) {
301 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
302 Err(e) => Err(to_pyvalue_err(e)),
303 }
304}
305
306#[pyfunction(name = "instrument_status_from_arrow_record_batch_bytes")]
312#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
313pub fn py_instrument_status_from_arrow_record_batch_bytes(
314 _py: Python,
315 data: Vec<u8>,
316) -> PyResult<Vec<InstrumentStatus>> {
317 let cursor = Cursor::new(data);
318 let reader = StreamReader::try_new(cursor, None).map_err(to_pyruntime_err)?;
319
320 let mut results = Vec::new();
321
322 for batch_result in reader {
323 let batch = batch_result.map_err(to_pyruntime_err)?;
324 let metadata = batch.schema().metadata().clone();
325 let decoded =
326 InstrumentStatus::decode_typed_batch(&metadata, batch).map_err(to_pyvalue_err)?;
327 results.extend(decoded);
328 }
329
330 Ok(results)
331}
332
333#[pyfunction(name = "instrument_closes_to_arrow_record_batch_bytes")]
339#[pyo3_stub_gen::derive::gen_stub_pyfunction(module = "nautilus_trader.serialization")]
340#[expect(clippy::needless_pass_by_value)]
341pub fn py_instrument_closes_to_arrow_record_batch_bytes(
342 py: Python,
343 data: Vec<InstrumentClose>,
344) -> PyResult<Py<PyBytes>> {
345 match instrument_closes_to_arrow_record_batch_bytes(&data) {
346 Ok(batch) => arrow_record_batch_to_pybytes(py, &batch),
347 Err(e) => Err(to_pyvalue_err(e)),
348 }
349}