nautilus_persistence/python/catalog.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::collections::HashMap;

use nautilus_core::{UnixNanos, python::to_pytype_err};
use nautilus_model::{
    data::{
        Bar, Data, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta,
        OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
    },
    python::instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
};
use pyo3::{exceptions::PyIOError, prelude::*, types::PyList};

use crate::backend::catalog::ParquetDataCatalog;

/// Converts a single `Data` variant into a Python object for returning from catalog methods.
fn data_to_pyobject(py: Python<'_>, item: Data) -> PyResult<Py<PyAny>> {
    match item {
        Data::Quote(quote) => Py::new(py, quote).map(|x| x.into_any()),
        Data::Trade(trade) => Py::new(py, trade).map(|x| x.into_any()),
        Data::Bar(bar) => Py::new(py, bar).map(|x| x.into_any()),
        Data::Delta(delta) => Py::new(py, delta).map(|x| x.into_any()),
        Data::Deltas(deltas) => Py::new(py, (*deltas).clone()).map(|x| x.into_any()),
        Data::Depth10(depth) => Py::new(py, *depth).map(|x| x.into_any()),
        Data::IndexPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
        Data::MarkPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
        Data::InstrumentStatus(status) => Py::new(py, status).map(|x| x.into_any()),
        Data::InstrumentClose(close) => Py::new(py, close).map(|x| x.into_any()),
        Data::Custom(custom) => Py::new(py, custom).map(|x| x.into_any()),
    }
}

/// A catalog for writing data to Parquet files.
#[pyclass(
    name = "ParquetDataCatalog",
    module = "nautilus_trader.core.nautilus_pyo3.persistence"
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.persistence")]
pub struct PyParquetDataCatalog {
    inner: ParquetDataCatalog,
}

#[pymethods]
#[pyo3_stub_gen::derive::gen_stub_pymethods]
impl PyParquetDataCatalog {
    /// Create a new `ParquetDataCatalog` with the given base path and optional parameters.
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base path for the catalog
    /// - `storage_options`: Optional storage configuration for cloud backends
    /// - `batch_size`: Optional batch size for processing (default: 5000)
    /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD); any other value falls back to SNAPPY
    /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
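    ///
    /// # Examples
    ///
    /// A minimal usage sketch from Python (the base path below is a placeholder), assuming the
    /// class is imported from the module declared above:
    ///
    /// ```python
    /// from nautilus_trader.core.nautilus_pyo3.persistence import ParquetDataCatalog
    ///
    /// # Local catalog with default batch size and SNAPPY compression (code 1)
    /// catalog = ParquetDataCatalog("/tmp/catalog", compression=1)
    /// ```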
    #[new]
    #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
    #[must_use]
    pub fn new(
        base_path: &str,
        storage_options: Option<HashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<u8>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let compression = compression.map(|c| match c {
            0 => parquet::basic::Compression::UNCOMPRESSED,
            1 => parquet::basic::Compression::SNAPPY,
            // For GZIP, LZO, BROTLI, LZ4, ZSTD we need to use the default level
            // since we can't pass the level parameter through PyO3
            2 => {
                let level = parquet::basic::GzipLevel::default();
                parquet::basic::Compression::GZIP(level)
            }
            3 => parquet::basic::Compression::LZO,
            4 => {
                let level = parquet::basic::BrotliLevel::default();
                parquet::basic::Compression::BROTLI(level)
            }
            5 => parquet::basic::Compression::LZ4,
            6 => {
                let level = parquet::basic::ZstdLevel::default();
                parquet::basic::Compression::ZSTD(level)
            }
            _ => parquet::basic::Compression::SNAPPY,
        });

        // Convert HashMap to AHashMap for internal use
        let storage_options = storage_options.map(|m| m.into_iter().collect());

        Self {
            inner: ParquetDataCatalog::from_uri(
                base_path,
                storage_options,
                batch_size,
                compression,
                max_row_group_size,
            )
            .expect("Failed to create ParquetDataCatalog"),
        }
    }

    // TODO: Cannot pass mixed data across pyo3 as a single type
    // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}

    /// Write quote tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of quote ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
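    ///
    /// # Examples
    ///
    /// A sketch of a typical call from Python; `quotes` is assumed to be a pre-built list of
    /// `QuoteTick` objects and the timestamps are illustrative:
    ///
    /// ```python
    /// path = catalog.write_quote_ticks(quotes)
    ///
    /// # Override the recorded timestamp range and skip the disjoint-intervals check,
    /// # e.g. when intentionally writing an overlapping batch
    /// path = catalog.write_quote_ticks(
    ///     quotes,
    ///     start=1_700_000_000_000_000_000,
    ///     end=1_700_003_600_000_000_000,
    ///     skip_disjoint_check=True,
    /// )
    /// ```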
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_quote_ticks(
        &self,
        data: Vec<QuoteTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
    }

    /// Write trade tick data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of trade ticks to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_trade_ticks(
        &self,
        data: Vec<TradeTick>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
    }

    /// Write order book delta data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book deltas to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_deltas(
        &self,
        data: Vec<OrderBookDelta>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
    }

    /// Write bar data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of bars to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_bars(
        &self,
        data: Vec<Bar>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
    }

    /// Write order book depth data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of order book depths to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_order_book_depths(
        &self,
        data: Vec<OrderBookDepth10>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
    }

    /// Write mark price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of mark price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_mark_price_updates(
        &self,
        data: Vec<MarkPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
    }

    /// Write index price update data to Parquet files.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of index price updates to write
    /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
    /// - `skip_disjoint_check`: If true, skip the check that file timestamp intervals are disjoint
    ///
    /// # Returns
    ///
    /// Returns the path of the created file as a string.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_index_price_updates(
        &self,
        data: Vec<IndexPriceUpdate>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
    }

    /// Write instruments to Parquet files in the catalog.
    ///
    /// Instruments are stored under `data/instruments/{instrument_id}/` using timestamp-ranged
    /// parquet file names, allowing multiple historical versions of the same instrument to be
    /// written across separate calls.
    ///
    /// # Parameters
    ///
    /// - `data`: A Python list of instrument objects (e.g. CurrencyPair, Equity).
    ///
    /// # Returns
    ///
    /// Returns a list of written file paths.
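    ///
    /// # Examples
    ///
    /// A sketch assuming `eurusd` and `audusd` are instrument objects (e.g. CurrencyPair):
    ///
    /// ```python
    /// paths = catalog.write_instruments([eurusd, audusd])
    /// ```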
    #[pyo3(signature = (data))]
    pub fn write_instruments(&self, data: &Bound<'_, PyAny>) -> PyResult<Vec<String>> {
        let py = data.py();
        let list = data.cast::<PyList>()?;
        let mut instruments = Vec::with_capacity(list.len());
        for item in list.iter() {
            let py_item: Py<PyAny> = item.unbind();
            let instrument = pyobject_to_instrument_any(py, py_item)?;
            instruments.push(instrument);
        }
        self.inner
            .write_instruments(instruments)
            .map(|paths| {
                paths
                    .into_iter()
                    .map(|p| p.to_string_lossy().to_string())
                    .collect()
            })
            .map_err(|e| PyIOError::new_err(format!("Failed to write instruments: {e}")))
    }

    /// Query instruments from the catalog.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, returns all instruments.
    /// - `start`: Optional inclusive lower bound for `ts_init` filtering.
    /// - `end`: Optional inclusive upper bound for `ts_init` filtering.
    ///
    /// # Returns
    ///
    /// Returns a list of instrument objects (e.g. CurrencyPair, Equity).
    #[pyo3(signature = (instrument_ids=None, start=None, end=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn instruments(
        &self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<Py<PyAny>>> {
        let rust_instruments = self
            .inner
            .query_instruments_filtered(
                instrument_ids.as_deref(),
                start.map(UnixNanos::from),
                end.map(UnixNanos::from),
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query instruments: {e}")))?;
        Python::attach(|py| {
            rust_instruments
                .into_iter()
                .map(|inst| instrument_any_to_pyobject(py, inst))
                .collect()
        })
    }

    /// Extend file names in the catalog with additional timestamp information.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
        start: u64,
        end: u64,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = UnixNanos::from(start);
        let end_nanos = UnixNanos::from(end);

        self.inner
            .extend_file_name(data_cls, instrument_id.as_deref(), start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
    }

    /// Consolidate all data files in the catalog within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    /// - `deduplicate`: Optional flag to deduplicate rows when combining files
    #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None, deduplicate=None))]
    pub fn consolidate_catalog(
        &self,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files, deduplicate)
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
    }

    /// Consolidate data files for a specific data type within the specified time range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type name to consolidate
    /// - `instrument_id`: Optional instrument ID filter
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
    /// - `deduplicate`: Optional flag to deduplicate rows when combining files
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None, deduplicate=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn consolidate_data(
        &self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data(
                type_name,
                instrument_id.as_deref(),
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
                deduplicate,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
    }

    /// Consolidate all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
    #[pyo3(signature = (period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_catalog_by_period(
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| {
                PyIOError::new_err(format!("Failed to consolidate catalog by period: {e}"))
            })
    }

    /// Consolidate data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments
    /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for consolidation range (nanoseconds since Unix epoch)
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
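    ///
    /// # Examples
    ///
    /// A sketch consolidating quote files into one file per hour for a single instrument
    /// (the instrument ID is illustrative):
    ///
    /// ```python
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     identifier="EUR/USD.SIM",
    ///     period_nanos=3_600_000_000_000,  # 1 hour
    /// )
    /// ```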
    #[pyo3(signature = (type_name, identifier=None, period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<String>,
        period_nanos: Option<u64>,
        start: Option<u64>,
        end: Option<u64>,
        ensure_contiguous_files: Option<bool>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .consolidate_data_by_period(
                type_name,
                identifier.as_deref(),
                period_nanos,
                start_nanos,
                end_nanos,
                ensure_contiguous_files,
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data by period: {e}")))
    }

    /// Reset all catalog file names to their canonical form.
    pub fn reset_all_file_names(&self) -> PyResult<()> {
        self.inner
            .reset_all_file_names()
            .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
    }

    /// Reset data file names for a specific data class to their canonical form.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    #[pyo3(signature = (data_cls, instrument_id=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn reset_data_file_names(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<()> {
        self.inner
            .reset_data_file_names(data_cls, instrument_id.as_deref())
            .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
    }

    /// Delete data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - The deletion process handles file intersections intelligently by splitting files
    ///   when they partially overlap with the deletion range
    /// - Files completely within the deletion range are removed entirely
    /// - Files partially overlapping the deletion range are split to preserve data outside the range
    /// - This method is useful for bulk data cleanup operations across the entire catalog
    /// - Empty directories are not automatically removed after deletion
    #[pyo3(signature = (start=None, end=None))]
    pub fn delete_catalog_range(&mut self, start: Option<u64>, end: Option<u64>) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_catalog_range(start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete catalog range: {e}")))
    }

    /// Delete data within a specified time range for a specific data type and instrument.
    ///
    /// This method identifies all parquet files that intersect with the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted
    /// - Files partially overlapping the range are split to preserve data outside the range
    /// - The original intersecting files are removed after processing
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
    /// - `instrument_id`: Optional instrument ID to delete data for. If None, deletes data across all instruments
    /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone
    /// - Files that partially overlap the deletion range are split to preserve data outside the range
    /// - The method ensures data integrity by using atomic operations where possible
    /// - Empty directories are not automatically removed after deletion
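    ///
    /// # Examples
    ///
    /// A sketch deleting one instrument's trade data inside a time window (timestamps are
    /// illustrative nanosecond values):
    ///
    /// ```python
    /// catalog.delete_data_range(
    ///     "trades",
    ///     instrument_id="EUR/USD.SIM",
    ///     start=1_700_000_000_000_000_000,
    ///     end=1_700_086_400_000_000_000,
    /// )
    /// ```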
    #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        instrument_id: Option<String>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<()> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .delete_data_range(type_name, instrument_id.as_deref(), start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to delete data range: {e}")))
    }

    /// Write custom data to Parquet files.
    ///
    /// Requires `CustomData` wrappers. Callers must wrap raw custom objects in
    /// `CustomData(data_type=DataType(cls, metadata=...), data=...)` before writing.
    #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
    pub fn write_custom_data(
        &self,
        _py: Python<'_>,
        data: Vec<Bound<'_, PyAny>>,
        start: Option<u64>,
        end: Option<u64>,
        skip_disjoint_check: bool,
    ) -> PyResult<String> {
        use nautilus_model::data::CustomData;

        let mut custom_items: Vec<CustomData> = Vec::with_capacity(data.len());
        for obj in data {
            let custom = obj.extract::<CustomData>().map_err(|_| {
                to_pytype_err(
                    "write_custom_data requires CustomData wrappers; wrap with CustomData(data_type=DataType(cls, metadata=...), data=...)",
                )
            })?;
            custom_items.push(custom);
        }

        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .write_custom_data_batch(
                custom_items,
                start_nanos,
                end_nanos,
                Some(skip_disjoint_check),
            )
            .map(|path| path.to_string_lossy().to_string())
            .map_err(|e| PyIOError::new_err(format!("Failed to write custom data: {e}")))
    }

    /// List all instrument IDs available in the catalog for a given data type.
    pub fn list_instruments(&self, data_type: &str) -> PyResult<Vec<String>> {
        self.inner
            .list_instruments(data_type)
            .map_err(|e| PyIOError::new_err(format!("Failed to list instruments: {e}")))
    }

    /// List all Parquet files in the catalog for a given data type and instrument.
    pub fn list_parquet_files(
        &self,
        data_type: &str,
        instrument_id: &str,
    ) -> PyResult<Vec<String>> {
        let directory = format!("data/{data_type}/{instrument_id}");
        self.inner
            .list_parquet_files(&directory)
            .map_err(|e| PyIOError::new_err(format!("Failed to list parquet files: {e}")))
    }

    /// Query files in the catalog matching the specified criteria.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name to query
    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
    ///   (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    ///   For bars, partial matching is supported.
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    ///
    /// # Returns
    ///
    /// Returns a list of file paths matching the criteria.
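    ///
    /// # Examples
    ///
    /// A sketch listing the files that overlap a time range; the data class name is assumed
    /// here to follow the same naming used elsewhere in this module (e.g. "bars"):
    ///
    /// ```python
    /// files = catalog.query_files(
    ///     "bars",
    ///     identifiers=["EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"],
    ///     start=1_700_000_000_000_000_000,
    /// )
    /// ```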
    #[pyo3(signature = (data_cls, identifiers=None, start=None, end=None))]
    pub fn query_files(
        &self,
        data_cls: &str,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
    ) -> PyResult<Vec<String>> {
        // Convert u64 timestamps to UnixNanos
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_files(data_cls, identifiers, start_nanos, end_nanos)
            .map_err(|e| PyIOError::new_err(format!("Failed to query files list: {e}")))
    }

    /// Get missing time intervals for a data request.
    ///
    /// # Parameters
    ///
    /// - `start`: Start timestamp (nanoseconds since Unix epoch)
    /// - `end`: End timestamp (nanoseconds since Unix epoch)
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing missing intervals.
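    ///
    /// # Examples
    ///
    /// A sketch checking which sub-ranges of a request are not yet covered by catalog data
    /// (timestamps and names are illustrative):
    ///
    /// ```python
    /// gaps = catalog.get_missing_intervals_for_request(
    ///     start=1_700_000_000_000_000_000,
    ///     end=1_700_086_400_000_000_000,
    ///     data_cls="quotes",
    ///     instrument_id="EUR/USD.SIM",
    /// )
    /// for gap_start, gap_end in gaps:
    ///     ...  # fetch or backfill the missing range
    /// ```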
    #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn get_missing_intervals_for_request(
        &self,
        start: u64,
        end: u64,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_missing_intervals_for_request(start, end, data_cls, instrument_id.as_deref())
            .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
    }

    /// Query the first timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the first timestamp as nanoseconds since Unix epoch, or None if no data exists.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn query_first_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_first_timestamp(data_cls, instrument_id.as_deref())
            .map_err(|e| PyIOError::new_err(format!("Failed to query first timestamp: {e}")))
    }

    /// Query the last timestamp for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns the last timestamp as nanoseconds since Unix epoch, or None if no data exists.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn query_last_timestamp(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Option<u64>> {
        self.inner
            .query_last_timestamp(data_cls, instrument_id.as_deref())
            .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
    }

    /// Get time intervals covered by data for a specific data class and instrument.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data class name
    /// - `instrument_id`: Optional instrument ID filter
    ///
    /// # Returns
    ///
    /// Returns a list of (start, end) timestamp tuples representing covered intervals.
    #[pyo3(signature = (data_cls, instrument_id=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn get_intervals(
        &self,
        data_cls: &str,
        instrument_id: Option<String>,
    ) -> PyResult<Vec<(u64, u64)>> {
        self.inner
            .get_intervals(data_cls, instrument_id.as_deref())
            .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
    }

    /// Query Parquet files for data matching the given criteria.
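    ///
    /// # Examples
    ///
    /// A sketch of the generic query entry point; any `data_type` outside the built-in names
    /// falls through to the custom-data path:
    ///
    /// ```python
    /// objs = catalog.query(
    ///     "quotes",
    ///     identifiers=["EUR/USD.SIM"],
    ///     start=1_700_000_000_000_000_000,
    /// )
    /// ```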
    #[pyo3(signature = (data_type, identifiers=None, start=None, end=None, where_clause=None, files=None, optimize_file_loading=true))]
    #[expect(clippy::too_many_arguments)]
    pub fn query(
        &mut self,
        py: Python<'_>,
        data_type: &str,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
        files: Option<Vec<String>>,
        optimize_file_loading: bool,
    ) -> PyResult<Vec<Py<PyAny>>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        let data = match data_type {
            "quotes" => {
                let ticks = self
                    .inner
                    .query_typed_data::<QuoteTick>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                ticks.into_iter().map(Data::from).collect()
            }
            "trades" => {
                let ticks = self
                    .inner
                    .query_typed_data::<TradeTick>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                ticks.into_iter().map(Data::from).collect()
            }
            "bars" => {
                let bars = self
                    .inner
                    .query_typed_data::<Bar>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                bars.into_iter().map(Data::from).collect()
            }
            "order_book_deltas" => {
                let deltas = self
                    .inner
                    .query_typed_data::<OrderBookDelta>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                deltas.into_iter().map(Data::from).collect()
            }
            "order_book_depths" => {
                let depths = self
                    .inner
                    .query_typed_data::<OrderBookDepth10>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                depths.into_iter().map(Data::from).collect()
            }
            "index_prices" => {
                let prices = self
                    .inner
                    .query_typed_data::<IndexPriceUpdate>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                prices.into_iter().map(Data::from).collect()
            }
            "mark_prices" => {
                let prices = self
                    .inner
                    .query_typed_data::<MarkPriceUpdate>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                prices.into_iter().map(Data::from).collect()
            }
            "instrument_status" => {
                let statuses = self
                    .inner
                    .query_typed_data::<InstrumentStatus>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                statuses.into_iter().map(Data::from).collect()
            }
            "instrument_closes" => {
                let closes = self
                    .inner
                    .query_typed_data::<InstrumentClose>(
                        identifiers,
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files,
                        optimize_file_loading,
                    )
                    .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
                closes.into_iter().map(Data::from).collect()
            }
            _ => py
                .detach(|| {
                    self.inner.query_custom_data_dynamic(
                        data_type,
                        identifiers.as_deref(),
                        start_nanos,
                        end_nanos,
                        where_clause,
                        files.clone(),
                        optimize_file_loading,
                    )
                })
                .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?,
        };

        let mut python_objects = Vec::new();
        for item in data {
            python_objects.push(data_to_pyobject(py, item)?);
        }
        Ok(python_objects)
    }

    /// Query quote tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
    ///   (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    ///   For bars, partial matching is supported.
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `QuoteTick` objects matching the query criteria.
    #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
    pub fn query_quote_ticks(
        &mut self,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<QuoteTick>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<QuoteTick>(
                identifiers,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query trade tick data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
    ///   (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    ///   For bars, partial matching is supported.
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `TradeTick` objects matching the query criteria.
    #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
    pub fn query_trade_ticks(
        &mut self,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<TradeTick>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<TradeTick>(
                identifiers,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query order book delta data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
    ///   (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    ///   For bars, partial matching is supported.
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDelta` objects matching the query criteria.
    #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_deltas(
        &mut self,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<OrderBookDelta>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<OrderBookDelta>(
                identifiers,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query bar data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
    ///   (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    ///   For bars, partial matching is supported (e.g., "EUR/USD.SIM" will match all bar types for that instrument).
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `Bar` objects matching the query criteria.
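    ///
    /// # Examples
    ///
    /// A sketch of partial bar-type matching (an instrument ID alone matches all of its bar types):
    ///
    /// ```python
    /// all_bars = catalog.query_bars(identifiers=["EUR/USD.SIM"])
    /// one_type = catalog.query_bars(identifiers=["EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"])
    /// ```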
    #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
    pub fn query_bars(
        &mut self,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<Bar>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<Bar>(
                identifiers,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query order book depth data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `OrderBookDepth10` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_order_book_depths(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<OrderBookDepth10>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<OrderBookDepth10>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query mark price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `MarkPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_mark_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<MarkPriceUpdate>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<MarkPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// Query index price update data from Parquet files.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by
    /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
    /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering
    ///
    /// # Returns
    ///
    /// Returns a vector of `IndexPriceUpdate` objects matching the query criteria.
    #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
    pub fn query_index_price_updates(
        &mut self,
        instrument_ids: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<IndexPriceUpdate>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        self.inner
            .query_typed_data::<IndexPriceUpdate>(
                instrument_ids,
                start_nanos,
                end_nanos,
                where_clause,
                None,
                true, // optimize_file_loading=true for directory-based registration (default)
            )
            .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
    }

    /// List all data types available in the catalog.
    ///
    /// # Returns
    ///
    /// Returns a list of data type names (as directory stems) in the catalog.
    pub fn list_data_types(&self) -> PyResult<Vec<String>> {
        self.inner
            .list_data_types()
            .map_err(|e| PyIOError::new_err(format!("Failed to list data types: {e}")))
    }

    /// List all live run IDs available in the catalog.
    ///
    /// # Returns
    ///
    /// Returns a list of live run IDs (as directory stems) in the catalog.
    pub fn list_live_runs(&self) -> PyResult<Vec<String>> {
        self.inner
            .list_live_runs()
            .map_err(|e| PyIOError::new_err(format!("Failed to list live runs: {e}")))
    }

    /// List all backtest run IDs available in the catalog.
    ///
    /// # Returns
    ///
    /// Returns a list of backtest run IDs (as directory stems) in the catalog.
    pub fn list_backtest_runs(&self) -> PyResult<Vec<String>> {
        self.inner
            .list_backtest_runs()
            .map_err(|e| PyIOError::new_err(format!("Failed to list backtest runs: {e}")))
    }

    /// List all backtest run instances available in the catalog.
    pub fn list_backtests(&self) -> PyResult<Vec<String>> {
        self.inner
            .list_backtest_runs()
            .map_err(|e| PyIOError::new_err(format!("Failed to list backtests: {e}")))
    }

    /// Read data from a live run instance.
    ///
    /// # Parameters
    ///
    /// - `instance_id`: The ID of the live run instance
    ///
    /// # Returns
    ///
    /// Returns a list of data objects from the live run, sorted by timestamp.
    #[pyo3(signature = (instance_id))]
    pub fn read_live_run(&self, py: Python<'_>, instance_id: &str) -> PyResult<Vec<Py<PyAny>>> {
        let data = self
            .inner
            .read_live_run(instance_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to read live run: {e}")))?;

        let mut python_objects = Vec::new();
        for item in data {
            python_objects.push(data_to_pyobject(py, item)?);
        }
        Ok(python_objects)
    }

    /// Read data from a backtest run instance.
    ///
    /// # Parameters
    ///
    /// - `instance_id`: The ID of the backtest run instance
    ///
    /// # Returns
    ///
    /// Returns a list of data objects from the backtest run, sorted by timestamp.
    #[pyo3(signature = (instance_id))]
    pub fn read_backtest(&self, py: Python<'_>, instance_id: &str) -> PyResult<Vec<Py<PyAny>>> {
        let data = self
            .inner
            .read_backtest(instance_id)
            .map_err(|e| PyIOError::new_err(format!("Failed to read backtest: {e}")))?;

        let mut python_objects = Vec::new();
        for item in data {
            python_objects.push(data_to_pyobject(py, item)?);
        }
        Ok(python_objects)
    }

    /// Convert stream data from feather files to parquet files.
    ///
    /// This method reads data from feather files generated during a backtest or live run
    /// and writes it to the catalog in parquet format. It's useful for converting temporary
    /// stream data into a more permanent and queryable format.
    ///
    /// # Parameters
    ///
    /// - `instance_id`: The ID of the backtest or live run instance
    /// - `data_cls`: The data class name (e.g., "quotes", "trades", "bars")
    /// - `subdirectory`: Optional subdirectory containing the feather files. Either "backtest" or "live" (default: "backtest")
    /// - `identifiers`: Optional list of identifiers to filter by (instrument IDs or bar types)
    /// - `use_ts_event_for_ts_init`: If true, replaces the `ts_init` column with `ts_event` column values before deserializing
    ///
    /// # Returns
    ///
    /// Returns nothing on success.
    ///
    /// # Examples
    ///
    /// ```python
    /// # Convert backtest stream data to parquet
    /// catalog.convert_stream_to_data(
    ///     "instance-123",
    ///     "quotes",
    ///     subdirectory="backtest"
    /// )
    ///
    /// # Convert live run data with identifier filtering
    /// catalog.convert_stream_to_data(
    ///     "instance-456",
    ///     "trades",
    ///     subdirectory="live",
    ///     identifiers=["EUR/USD.SIM"]
    /// )
    /// ```
    #[pyo3(signature = (instance_id, data_cls, subdirectory=None, identifiers=None, use_ts_event_for_ts_init=false))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn convert_stream_to_data(
        &mut self,
        instance_id: &str,
        data_cls: &str,
        subdirectory: Option<&str>,
        identifiers: Option<Vec<String>>,
        use_ts_event_for_ts_init: bool,
    ) -> PyResult<()> {
        let subdir = subdirectory.unwrap_or("backtest");

        match self.inner.convert_stream_to_data(
            instance_id,
            data_cls,
            Some(subdir),
            identifiers.as_deref(),
            use_ts_event_for_ts_init,
        ) {
            Ok(()) => Ok(()),
            Err(e) => Err(PyIOError::new_err(format!(
                "Failed to convert stream to data: {e}"
            ))),
        }
    }

    /// Query custom data from Parquet files.
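    ///
    /// # Examples
    ///
    /// A sketch assuming custom data was previously written under the (illustrative) type name
    /// `"my_signal"` via `write_custom_data`; the returned objects are `CustomData` wrappers:
    ///
    /// ```python
    /// items = catalog.query_custom_data("my_signal", start=1_700_000_000_000_000_000)
    /// ```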
    #[pyo3(signature = (type_name, identifiers=None, start=None, end=None, where_clause=None))]
    #[expect(clippy::needless_pass_by_value)]
    pub fn query_custom_data(
        &mut self,
        py: Python<'_>,
        type_name: &str,
        identifiers: Option<Vec<String>>,
        start: Option<u64>,
        end: Option<u64>,
        where_clause: Option<&str>,
    ) -> PyResult<Vec<Py<PyAny>>> {
        let start_nanos = start.map(UnixNanos::from);
        let end_nanos = end.map(UnixNanos::from);

        let data = py
            .detach(|| {
                self.inner.query_custom_data_dynamic(
                    type_name,
                    identifiers.as_deref(),
                    start_nanos,
                    end_nanos,
                    where_clause,
                    None,
                    true,
                )
            })
            .map_err(|e| PyIOError::new_err(format!("Failed to query custom data: {e}")))?;

        let mut python_objects = Vec::new();

        for item in data {
            let py_obj: Py<PyAny> = match item {
                Data::Custom(custom) => Py::new(py, custom.clone())?.into_any(),
                _ => return Err(PyIOError::new_err("Expected custom data")),
            };
            python_objects.push(py_obj);
        }
        Ok(python_objects)
    }
}