nautilus_persistence/python/wranglers/
bar.rs1use std::{collections::HashMap, io::Cursor, str::FromStr};
17
18use datafusion::arrow::ipc::reader::StreamReader;
19use nautilus_core::python::to_pyvalue_err;
20use nautilus_model::data::bar::{Bar, BarType};
21use nautilus_serialization::arrow::DecodeFromRecordBatch;
22use pyo3::prelude::*;
23
24#[pyclass]
25#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.persistence")]
26pub struct BarDataWrangler {
27 bar_type: BarType,
28 price_precision: u8,
29 size_precision: u8,
30 metadata: HashMap<String, String>,
31}
32
33#[pymethods]
34#[pyo3_stub_gen::derive::gen_stub_pymethods]
35impl BarDataWrangler {
36 #[new]
37 fn py_new(bar_type: &str, price_precision: u8, size_precision: u8) -> PyResult<Self> {
38 let bar_type = BarType::from_str(bar_type).map_err(to_pyvalue_err)?;
39 let metadata = Bar::get_metadata(&bar_type, price_precision, size_precision);
40
41 Ok(Self {
42 bar_type,
43 price_precision,
44 size_precision,
45 metadata,
46 })
47 }
48
49 #[getter]
50 fn bar_type(&self) -> String {
51 self.bar_type.to_string()
52 }
53
54 #[getter]
55 const fn price_precision(&self) -> u8 {
56 self.price_precision
57 }
58
59 #[getter]
60 const fn size_precision(&self) -> u8 {
61 self.size_precision
62 }
63
64 fn process_record_batch_bytes(
65 &self,
66 #[gen_stub(override_type(type_repr = "bytes"))] data: &[u8],
67 ) -> PyResult<Vec<Bar>> {
68 let cursor = Cursor::new(data);
70 let reader = match StreamReader::try_new(cursor, None) {
71 Ok(reader) => reader,
72 Err(e) => return Err(to_pyvalue_err(e)),
73 };
74
75 let mut bars = Vec::new();
76
77 for maybe_batch in reader {
79 let record_batch = match maybe_batch {
80 Ok(record_batch) => record_batch,
81 Err(e) => return Err(to_pyvalue_err(e)),
82 };
83
84 let batch_bars =
85 Bar::decode_batch(&self.metadata, record_batch).map_err(to_pyvalue_err)?;
86 bars.extend(batch_bars);
87 }
88
89 Ok(bars)
90 }
91}