nautilus_persistence/python/backend/
session.rs1use std::collections::HashMap;
17
18use nautilus_core::{
19 ffi::cvec::CVec,
20 python::{IntoPyObjectNautilusExt, to_pyruntime_err},
21};
22use nautilus_model::data::{
23 Bar, Data, DataFFI, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
24 QuoteTick, TradeTick,
25};
26use nautilus_serialization::arrow::custom::CustomDataDecoder;
27use pyo3::{prelude::*, types::PyCapsule};
28
29use crate::backend::session::{DataBackendSession, DataQueryResult};
30
/// Newtype around a raw mutable pointer that is declared `Send`.
///
/// Raw pointers are not `Send`, so a closure capturing one cannot be handed to
/// an API that demands a `Send` closure. Wrapping the pointer in this type (and
/// moving the *whole* wrapper into the closure) lifts that restriction.
struct SendPtr<P>(*mut P);

// SAFETY: this impl only asserts that the pointer value itself may cross a
// thread boundary. It does not make dereferencing safe: every use site must
// separately guarantee that the pointee is alive and not aliased for as long
// as the pointer is dereferenced.
unsafe impl<P> Send for SendPtr<P> {}
36
37fn data_to_pyobject(py: Python<'_>, item: Data) -> PyResult<Py<PyAny>> {
39 match item {
40 Data::Quote(quote) => Py::new(py, quote).map(|x| x.into_any()),
41 Data::Trade(trade) => Py::new(py, trade).map(|x| x.into_any()),
42 Data::Bar(bar) => Py::new(py, bar).map(|x| x.into_any()),
43 Data::Delta(delta) => Py::new(py, delta).map(|x| x.into_any()),
44 Data::Deltas(deltas) => Py::new(py, (*deltas).clone()).map(|x| x.into_any()),
45 Data::Depth10(depth) => Py::new(py, *depth).map(|x| x.into_any()),
46 Data::IndexPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
47 Data::MarkPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
48 Data::InstrumentStatus(status) => Py::new(py, status).map(|x| x.into_any()),
49 Data::InstrumentClose(close) => Py::new(py, close).map(|x| x.into_any()),
50 Data::Custom(custom) => Py::new(py, custom).map(|x| x.into_any()),
51 }
52}
53
54#[repr(C)]
55#[pyclass(frozen, eq, eq_int, from_py_object)]
56#[pyo3_stub_gen::derive::gen_stub_pyclass_enum(module = "nautilus_trader.persistence")]
57#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
58pub enum NautilusDataType {
59 OrderBookDelta = 1,
61 OrderBookDepth10 = 2,
62 QuoteTick = 3,
63 TradeTick = 4,
64 Bar = 5,
65 MarkPriceUpdate = 6,
66 InstrumentStatus = 7,
67}
68
69#[pymethods]
70#[pyo3_stub_gen::derive::gen_stub_pymethods]
71impl NautilusDataType {
72 const fn __hash__(&self) -> isize {
73 *self as isize
74 }
75}
76
77#[pymethods]
78#[pyo3_stub_gen::derive::gen_stub_pymethods]
79impl DataBackendSession {
80 #[new]
81 #[pyo3(signature=(chunk_size=10_000))]
82 fn new_session(chunk_size: usize) -> Self {
83 Self::new(chunk_size)
84 }
85
86 #[pyo3(name = "add_file")]
99 #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
100 fn py_add_file(
101 mut slf: PyRefMut<'_, Self>,
102 data_type: NautilusDataType,
103 table_name: &str,
104 file_path: &str,
105 sql_query: Option<&str>,
106 ) -> PyResult<()> {
107 let _guard = slf.runtime.enter();
108
109 match data_type {
110 NautilusDataType::OrderBookDelta => slf
111 .add_file::<OrderBookDelta>(table_name, file_path, sql_query, None)
112 .map_err(to_pyruntime_err),
113 NautilusDataType::OrderBookDepth10 => slf
114 .add_file::<OrderBookDepth10>(table_name, file_path, sql_query, None)
115 .map_err(to_pyruntime_err),
116 NautilusDataType::QuoteTick => slf
117 .add_file::<QuoteTick>(table_name, file_path, sql_query, None)
118 .map_err(to_pyruntime_err),
119 NautilusDataType::TradeTick => slf
120 .add_file::<TradeTick>(table_name, file_path, sql_query, None)
121 .map_err(to_pyruntime_err),
122 NautilusDataType::Bar => slf
123 .add_file::<Bar>(table_name, file_path, sql_query, None)
124 .map_err(to_pyruntime_err),
125 NautilusDataType::MarkPriceUpdate => slf
126 .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query, None)
127 .map_err(to_pyruntime_err),
128 NautilusDataType::InstrumentStatus => slf
129 .add_file::<InstrumentStatus>(table_name, file_path, sql_query, None)
130 .map_err(to_pyruntime_err),
131 }
132 }
133
134 #[pyo3(name = "add_custom_file")]
139 #[pyo3(signature = (type_name, table_name, file_path, sql_query=None))]
140 fn py_add_custom_file(
141 mut slf: PyRefMut<'_, Self>,
142 type_name: &str,
143 table_name: &str,
144 file_path: &str,
145 sql_query: Option<&str>,
146 ) -> PyResult<()> {
147 let _guard = slf.runtime.enter();
148 slf.add_file::<CustomDataDecoder>(table_name, file_path, sql_query, Some(type_name))
149 .map_err(to_pyruntime_err)
150 }
151
152 fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
153 let py = slf.py();
154 let chunk_size = slf.chunk_size;
155 let ptr = SendPtr(&raw mut *slf);
156
157 let query_result = unsafe {
162 py.detach(move || {
163 let p = ptr;
164 (*p.0).get_query_result()
165 })
166 };
167
168 DataQueryResult::new(query_result, chunk_size)
169 }
170
171 #[pyo3(name = "register_object_store_from_uri")]
173 #[pyo3(signature = (uri, storage_options=None))]
174 fn py_register_object_store_from_uri(
175 mut slf: PyRefMut<'_, Self>,
176 uri: &str,
177 storage_options: Option<HashMap<String, String>>,
178 ) -> PyResult<()> {
179 let storage_options = storage_options.map(|m| m.into_iter().collect());
181 slf.register_object_store_from_uri(uri, storage_options)
182 .map_err(to_pyruntime_err)
183 }
184}
185
186#[pymethods]
187#[pyo3_stub_gen::derive::gen_stub_pymethods]
188impl DataQueryResult {
189 const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
191 slf
192 }
193
194 fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Py<PyAny>>> {
200 let py = slf.py();
201 let ptr = SendPtr(&raw mut *slf);
202
203 let acc = unsafe {
213 py.detach(move || {
214 let p = ptr;
215 (*p.0).next()
216 })
217 };
218
219 match acc {
220 Some(acc) if !acc.is_empty() => {
221 let has_non_ffi = acc
222 .iter()
223 .any(|d| matches!(d, Data::Custom(_) | Data::InstrumentStatus(_)));
224
225 if has_non_ffi {
226 let objects: Vec<Py<PyAny>> = acc
228 .into_iter()
229 .map(|item| data_to_pyobject(py, item))
230 .collect::<PyResult<_>>()?;
231 Ok(Some(objects.into_py_any_unwrap(py)))
232 } else {
233 let ffi_data: Vec<DataFFI> = acc
235 .into_iter()
236 .map(DataFFI::try_from)
237 .collect::<Result<Vec<_>, _>>()
238 .map_err(to_pyruntime_err)?;
239 let cvec: CVec = ffi_data.into();
240 match PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {}) {
241 Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
242 Err(e) => Err(to_pyruntime_err(e)),
243 }
244 }
245 }
246 _ => Ok(None),
247 }
248 }
249}