Skip to main content

nautilus_persistence/python/backend/
session.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_core::{
19    ffi::cvec::CVec,
20    python::{IntoPyObjectNautilusExt, to_pyruntime_err},
21};
22use nautilus_model::data::{
23    Bar, Data, DataFFI, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDepth10,
24    QuoteTick, TradeTick,
25};
26use nautilus_serialization::arrow::custom::CustomDataDecoder;
27use pyo3::{prelude::*, types::PyCapsule};
28
29use crate::backend::session::{DataBackendSession, DataQueryResult};
30
/// Thin wrapper that lets a raw pointer be moved into a closure that must be `Send`,
/// such as the closure executed while the GIL is released.
///
/// The wrapper adds no synchronization of its own; soundness rests entirely on how
/// the call sites obtain and use the pointer.
struct SendPtr<T>(*mut T);

// SAFETY: The pointer is only ever created from a live `PyRefMut` and dereferenced
// while that `PyRefMut` is still held by the calling method, so access to the pointee
// is serialized by the exclusive borrow for as long as the pointer is in use.
unsafe impl<T> Send for SendPtr<T> {}
36
37/// Converts a `Data` variant into a Python object via PyO3.
38fn data_to_pyobject(py: Python<'_>, item: Data) -> PyResult<Py<PyAny>> {
39    match item {
40        Data::Quote(quote) => Py::new(py, quote).map(|x| x.into_any()),
41        Data::Trade(trade) => Py::new(py, trade).map(|x| x.into_any()),
42        Data::Bar(bar) => Py::new(py, bar).map(|x| x.into_any()),
43        Data::Delta(delta) => Py::new(py, delta).map(|x| x.into_any()),
44        Data::Deltas(deltas) => Py::new(py, (*deltas).clone()).map(|x| x.into_any()),
45        Data::Depth10(depth) => Py::new(py, *depth).map(|x| x.into_any()),
46        Data::IndexPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
47        Data::MarkPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
48        Data::InstrumentStatus(status) => Py::new(py, status).map(|x| x.into_any()),
49        Data::InstrumentClose(close) => Py::new(py, close).map(|x| x.into_any()),
50        Data::Custom(custom) => Py::new(py, custom).map(|x| x.into_any()),
51    }
52}
53
54#[repr(C)]
55#[pyclass(frozen, eq, eq_int, from_py_object)]
56#[pyo3_stub_gen::derive::gen_stub_pyclass_enum(module = "nautilus_trader.persistence")]
57#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
58pub enum NautilusDataType {
59    // Custom = 0,  # First slot reserved for custom data
60    OrderBookDelta = 1,
61    OrderBookDepth10 = 2,
62    QuoteTick = 3,
63    TradeTick = 4,
64    Bar = 5,
65    MarkPriceUpdate = 6,
66    InstrumentStatus = 7,
67}
68
69#[pymethods]
70#[pyo3_stub_gen::derive::gen_stub_pymethods]
71impl NautilusDataType {
72    const fn __hash__(&self) -> isize {
73        *self as isize
74    }
75}
76
77#[pymethods]
78#[pyo3_stub_gen::derive::gen_stub_pymethods]
79impl DataBackendSession {
80    #[new]
81    #[pyo3(signature=(chunk_size=10_000))]
82    fn new_session(chunk_size: usize) -> Self {
83        Self::new(chunk_size)
84    }
85
86    /// Registers a Parquet file and adds a batch stream for decoding.
87    ///
88    /// The caller must specify `T` to indicate the kind of data expected. `table_name` is
89    /// the logical name for queries; `file_path` is the Parquet path; `sql_query` defaults
90    /// to `SELECT * FROM {table_name} ORDER BY ts_init` if `None`.
91    ///
92    /// When `custom_type_name` is `Some`, it is merged into each batch's schema metadata
93    /// before decoding (as `type_name`). Use this for custom data when Parquet/DataFusion
94    /// does not preserve schema metadata so the decoder can look up the type in the registry.
95    ///
96    /// The file data must be ordered by the `ts_init` in ascending order for this
97    /// to work correctly.
98    #[pyo3(name = "add_file")]
99    #[pyo3(signature = (data_type, table_name, file_path, sql_query=None))]
100    fn py_add_file(
101        mut slf: PyRefMut<'_, Self>,
102        data_type: NautilusDataType,
103        table_name: &str,
104        file_path: &str,
105        sql_query: Option<&str>,
106    ) -> PyResult<()> {
107        let _guard = slf.runtime.enter();
108
109        match data_type {
110            NautilusDataType::OrderBookDelta => slf
111                .add_file::<OrderBookDelta>(table_name, file_path, sql_query, None)
112                .map_err(to_pyruntime_err),
113            NautilusDataType::OrderBookDepth10 => slf
114                .add_file::<OrderBookDepth10>(table_name, file_path, sql_query, None)
115                .map_err(to_pyruntime_err),
116            NautilusDataType::QuoteTick => slf
117                .add_file::<QuoteTick>(table_name, file_path, sql_query, None)
118                .map_err(to_pyruntime_err),
119            NautilusDataType::TradeTick => slf
120                .add_file::<TradeTick>(table_name, file_path, sql_query, None)
121                .map_err(to_pyruntime_err),
122            NautilusDataType::Bar => slf
123                .add_file::<Bar>(table_name, file_path, sql_query, None)
124                .map_err(to_pyruntime_err),
125            NautilusDataType::MarkPriceUpdate => slf
126                .add_file::<MarkPriceUpdate>(table_name, file_path, sql_query, None)
127                .map_err(to_pyruntime_err),
128            NautilusDataType::InstrumentStatus => slf
129                .add_file::<InstrumentStatus>(table_name, file_path, sql_query, None)
130                .map_err(to_pyruntime_err),
131        }
132    }
133
134    /// Registers a Parquet file for a custom data type identified by `type_name`.
135    ///
136    /// The custom data type must have been registered via
137    /// `ensure_custom_data_registered::<T>()` before calling this method.
138    #[pyo3(name = "add_custom_file")]
139    #[pyo3(signature = (type_name, table_name, file_path, sql_query=None))]
140    fn py_add_custom_file(
141        mut slf: PyRefMut<'_, Self>,
142        type_name: &str,
143        table_name: &str,
144        file_path: &str,
145        sql_query: Option<&str>,
146    ) -> PyResult<()> {
147        let _guard = slf.runtime.enter();
148        slf.add_file::<CustomDataDecoder>(table_name, file_path, sql_query, Some(type_name))
149            .map_err(to_pyruntime_err)
150    }
151
152    fn to_query_result(mut slf: PyRefMut<'_, Self>) -> DataQueryResult {
153        let py = slf.py();
154        let chunk_size = slf.chunk_size;
155        let ptr = SendPtr(&raw mut *slf);
156
157        // SAFETY: see comment on `__next__` for the safety argument.
158        // The GIL release is needed here because `get_query_result` eagerly
159        // pulls the first element from each stream (via `KMerge::push_iter`),
160        // which blocks on the tokio channel while workers may need the GIL.
161        let query_result = unsafe {
162            py.detach(move || {
163                let p = ptr;
164                (*p.0).get_query_result()
165            })
166        };
167
168        DataQueryResult::new(query_result, chunk_size)
169    }
170
171    /// Register an object store with the session context from a URI with optional storage options
172    #[pyo3(name = "register_object_store_from_uri")]
173    #[pyo3(signature = (uri, storage_options=None))]
174    fn py_register_object_store_from_uri(
175        mut slf: PyRefMut<'_, Self>,
176        uri: &str,
177        storage_options: Option<HashMap<String, String>>,
178    ) -> PyResult<()> {
179        // Convert HashMap to AHashMap for internal use
180        let storage_options = storage_options.map(|m| m.into_iter().collect());
181        slf.register_object_store_from_uri(uri, storage_options)
182            .map_err(to_pyruntime_err)
183    }
184}
185
186#[pymethods]
187#[pyo3_stub_gen::derive::gen_stub_pymethods]
188impl DataQueryResult {
189    /// The reader implements an iterator.
190    const fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
191        slf
192    }
193
194    /// Each iteration returns a chunk of values read from the parquet file.
195    ///
196    /// For built-in types, returns a PyCapsule containing a CVec of DataFFI (C layout)
197    /// consumed by Cython `capsule_to_list`. For custom data types (which are not
198    /// FFI-safe), returns a Python list of PyO3 objects directly.
199    fn __next__(mut slf: PyRefMut<'_, Self>) -> PyResult<Option<Py<PyAny>>> {
200        let py = slf.py();
201        let ptr = SendPtr(&raw mut *slf);
202
203        // SAFETY: `PyRefMut` guarantees exclusive access to the underlying
204        // object for the duration of this method call. The runtime borrow
205        // flag prevents any other Python thread from accessing it.
206        //
207        // The GIL must be released here so that tokio worker threads can
208        // acquire it when decoding custom data types via `Python::attach`.
209        // Without this, custom-type streaming deadlocks: the main thread
210        // holds the GIL while blocking on `recv`, and workers block on
211        // `Python::attach` waiting for the GIL.
212        let acc = unsafe {
213            py.detach(move || {
214                let p = ptr;
215                (*p.0).next()
216            })
217        };
218
219        match acc {
220            Some(acc) if !acc.is_empty() => {
221                let has_non_ffi = acc
222                    .iter()
223                    .any(|d| matches!(d, Data::Custom(_) | Data::InstrumentStatus(_)));
224
225                if has_non_ffi {
226                    // Custom and instrument-status data: convert directly to Python objects (bypasses FFI)
227                    let objects: Vec<Py<PyAny>> = acc
228                        .into_iter()
229                        .map(|item| data_to_pyobject(py, item))
230                        .collect::<PyResult<_>>()?;
231                    Ok(Some(objects.into_py_any_unwrap(py)))
232                } else {
233                    // Built-in types: FFI capsule path
234                    let ffi_data: Vec<DataFFI> = acc
235                        .into_iter()
236                        .map(DataFFI::try_from)
237                        .collect::<Result<Vec<_>, _>>()
238                        .map_err(to_pyruntime_err)?;
239                    let cvec: CVec = ffi_data.into();
240                    match PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {}) {
241                        Ok(capsule) => Ok(Some(capsule.into_py_any_unwrap(py))),
242                        Err(e) => Err(to_pyruntime_err(e)),
243                    }
244                }
245            }
246            _ => Ok(None),
247        }
248    }
249}