nautilus_persistence/python/catalog.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::collections::HashMap;
17
18use nautilus_core::{UnixNanos, python::to_pytype_err};
19use nautilus_model::{
20 data::{
21 Bar, Data, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta,
22 OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
23 },
24 python::instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
25};
26use pyo3::{exceptions::PyIOError, prelude::*, types::PyList};
27
28use crate::backend::catalog::ParquetDataCatalog;
29
30/// Converts a single `Data` variant into a Python object for returning from catalog methods.
31fn data_to_pyobject(py: Python<'_>, item: Data) -> PyResult<Py<PyAny>> {
32 match item {
33 Data::Quote(quote) => Py::new(py, quote).map(|x| x.into_any()),
34 Data::Trade(trade) => Py::new(py, trade).map(|x| x.into_any()),
35 Data::Bar(bar) => Py::new(py, bar).map(|x| x.into_any()),
36 Data::Delta(delta) => Py::new(py, delta).map(|x| x.into_any()),
37 Data::Deltas(deltas) => Py::new(py, (*deltas).clone()).map(|x| x.into_any()),
38 Data::Depth10(depth) => Py::new(py, *depth).map(|x| x.into_any()),
39 Data::IndexPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
40 Data::MarkPriceUpdate(price) => Py::new(py, price).map(|x| x.into_any()),
41 Data::InstrumentStatus(status) => Py::new(py, status).map(|x| x.into_any()),
42 Data::InstrumentClose(close) => Py::new(py, close).map(|x| x.into_any()),
43 Data::Custom(custom) => Py::new(py, custom).map(|x| x.into_any()),
44 }
45}
46
/// A catalog for writing data to Parquet files.
#[pyclass(
    name = "ParquetDataCatalog",
    module = "nautilus_trader.core.nautilus_pyo3.persistence"
)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.persistence")]
pub struct PyParquetDataCatalog {
    // The wrapped Rust-side catalog; all read/write/consolidate operations delegate to it.
    inner: ParquetDataCatalog,
}
56
57#[pymethods]
58#[pyo3_stub_gen::derive::gen_stub_pymethods]
59impl PyParquetDataCatalog {
60 /// Create a new `ParquetCatalog` with the given base path and optional parameters.
61 ///
62 /// # Parameters
63 ///
64 /// - `base_path`: The base path for the catalog
65 /// - `storage_options`: Optional storage configuration for cloud backends
66 /// - `batch_size`: Optional batch size for processing (default: 5000)
67 /// - `compression`: Optional compression type (0=UNCOMPRESSED, 1=SNAPPY, 2=GZIP, 3=LZO, 4=BROTLI, 5=LZ4, 6=ZSTD)
68 /// - `max_row_group_size`: Optional maximum row group size (default: 5000)
69 #[new]
70 #[pyo3(signature = (base_path, storage_options=None, batch_size=None, compression=None, max_row_group_size=None))]
71 #[must_use]
72 pub fn new(
73 base_path: &str,
74 storage_options: Option<HashMap<String, String>>,
75 batch_size: Option<usize>,
76 compression: Option<u8>,
77 max_row_group_size: Option<usize>,
78 ) -> Self {
79 let compression = compression.map(|c| match c {
80 0 => parquet::basic::Compression::UNCOMPRESSED,
81 1 => parquet::basic::Compression::SNAPPY,
82 // For GZIP, LZO, BROTLI, LZ4, ZSTD we need to use the default level
83 // since we can't pass the level parameter through PyO3
84 2 => {
85 let level = parquet::basic::GzipLevel::default();
86 parquet::basic::Compression::GZIP(level)
87 }
88 3 => parquet::basic::Compression::LZO,
89 4 => {
90 let level = parquet::basic::BrotliLevel::default();
91 parquet::basic::Compression::BROTLI(level)
92 }
93 5 => parquet::basic::Compression::LZ4,
94 6 => {
95 let level = parquet::basic::ZstdLevel::default();
96 parquet::basic::Compression::ZSTD(level)
97 }
98 _ => parquet::basic::Compression::SNAPPY,
99 });
100
101 // Convert HashMap to AHashMap for internal use
102 let storage_options = storage_options.map(|m| m.into_iter().collect());
103
104 Self {
105 inner: ParquetDataCatalog::from_uri(
106 base_path,
107 storage_options,
108 batch_size,
109 compression,
110 max_row_group_size,
111 )
112 .expect("Failed to create ParquetDataCatalog"),
113 }
114 }
115
116 // TODO: Cannot pass mixed data across pyo3 as a single type
117 // pub fn write_data(mut slf: PyRefMut<'_, Self>, data_type: NautilusDataType, data: Vec<Data>) {}
118
119 /// Write quote tick data to Parquet files.
120 ///
121 /// # Parameters
122 ///
123 /// - `data`: Vector of quote ticks to write
124 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
125 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
126 ///
127 /// # Returns
128 ///
129 /// Returns the path of the created file as a string.
130 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
131 pub fn write_quote_ticks(
132 &self,
133 data: Vec<QuoteTick>,
134 start: Option<u64>,
135 end: Option<u64>,
136 skip_disjoint_check: bool,
137 ) -> PyResult<String> {
138 // Convert u64 timestamps to UnixNanos
139 let start_nanos = start.map(UnixNanos::from);
140 let end_nanos = end.map(UnixNanos::from);
141
142 self.inner
143 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
144 .map(|path| path.to_string_lossy().to_string())
145 .map_err(|e| PyIOError::new_err(format!("Failed to write quote ticks: {e}")))
146 }
147
148 /// Write trade tick data to Parquet files.
149 ///
150 /// # Parameters
151 ///
152 /// - `data`: Vector of trade ticks to write
153 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
154 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
155 ///
156 /// # Returns
157 ///
158 /// Returns the path of the created file as a string.
159 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
160 pub fn write_trade_ticks(
161 &self,
162 data: Vec<TradeTick>,
163 start: Option<u64>,
164 end: Option<u64>,
165 skip_disjoint_check: bool,
166 ) -> PyResult<String> {
167 // Convert u64 timestamps to UnixNanos
168 let start_nanos = start.map(UnixNanos::from);
169 let end_nanos = end.map(UnixNanos::from);
170
171 self.inner
172 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
173 .map(|path| path.to_string_lossy().to_string())
174 .map_err(|e| PyIOError::new_err(format!("Failed to write trade ticks: {e}")))
175 }
176
177 /// Write order book delta data to Parquet files.
178 ///
179 /// # Parameters
180 ///
181 /// - `data`: Vector of order book deltas to write
182 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
183 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
184 ///
185 /// # Returns
186 ///
187 /// Returns the path of the created file as a string.
188 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
189 pub fn write_order_book_deltas(
190 &self,
191 data: Vec<OrderBookDelta>,
192 start: Option<u64>,
193 end: Option<u64>,
194 skip_disjoint_check: bool,
195 ) -> PyResult<String> {
196 // Convert u64 timestamps to UnixNanos
197 let start_nanos = start.map(UnixNanos::from);
198 let end_nanos = end.map(UnixNanos::from);
199
200 self.inner
201 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
202 .map(|path| path.to_string_lossy().to_string())
203 .map_err(|e| PyIOError::new_err(format!("Failed to write order book deltas: {e}")))
204 }
205
206 /// Write bar data to Parquet files.
207 ///
208 /// # Parameters
209 ///
210 /// - `data`: Vector of bars to write
211 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
212 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
213 ///
214 /// # Returns
215 ///
216 /// Returns the path of the created file as a string.
217 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
218 pub fn write_bars(
219 &self,
220 data: Vec<Bar>,
221 start: Option<u64>,
222 end: Option<u64>,
223 skip_disjoint_check: bool,
224 ) -> PyResult<String> {
225 // Convert u64 timestamps to UnixNanos
226 let start_nanos = start.map(UnixNanos::from);
227 let end_nanos = end.map(UnixNanos::from);
228
229 self.inner
230 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
231 .map(|path| path.to_string_lossy().to_string())
232 .map_err(|e| PyIOError::new_err(format!("Failed to write bars: {e}")))
233 }
234
235 /// Write order book depth data to Parquet files.
236 ///
237 /// # Parameters
238 ///
239 /// - `data`: Vector of order book depths to write
240 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
241 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
242 ///
243 /// # Returns
244 ///
245 /// Returns the path of the created file as a string.
246 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
247 pub fn write_order_book_depths(
248 &self,
249 data: Vec<OrderBookDepth10>,
250 start: Option<u64>,
251 end: Option<u64>,
252 skip_disjoint_check: bool,
253 ) -> PyResult<String> {
254 // Convert u64 timestamps to UnixNanos
255 let start_nanos = start.map(UnixNanos::from);
256 let end_nanos = end.map(UnixNanos::from);
257
258 self.inner
259 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
260 .map(|path| path.to_string_lossy().to_string())
261 .map_err(|e| PyIOError::new_err(format!("Failed to write order book depths: {e}")))
262 }
263
264 /// Write mark price update data to Parquet files.
265 ///
266 /// # Parameters
267 ///
268 /// - `data`: Vector of mark price updates to write
269 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
270 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
271 ///
272 /// # Returns
273 ///
274 /// Returns the path of the created file as a string.
275 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
276 pub fn write_mark_price_updates(
277 &self,
278 data: Vec<MarkPriceUpdate>,
279 start: Option<u64>,
280 end: Option<u64>,
281 skip_disjoint_check: bool,
282 ) -> PyResult<String> {
283 // Convert u64 timestamps to UnixNanos
284 let start_nanos = start.map(UnixNanos::from);
285 let end_nanos = end.map(UnixNanos::from);
286
287 self.inner
288 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
289 .map(|path| path.to_string_lossy().to_string())
290 .map_err(|e| PyIOError::new_err(format!("Failed to write mark price updates: {e}")))
291 }
292
293 /// Write index price update data to Parquet files.
294 ///
295 /// # Parameters
296 ///
297 /// - `data`: Vector of index price updates to write
298 /// - `start`: Optional start timestamp override (nanoseconds since Unix epoch)
299 /// - `end`: Optional end timestamp override (nanoseconds since Unix epoch)
300 ///
301 /// # Returns
302 ///
303 /// Returns the path of the created file as a string.
304 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
305 pub fn write_index_price_updates(
306 &self,
307 data: Vec<IndexPriceUpdate>,
308 start: Option<u64>,
309 end: Option<u64>,
310 skip_disjoint_check: bool,
311 ) -> PyResult<String> {
312 // Convert u64 timestamps to UnixNanos
313 let start_nanos = start.map(UnixNanos::from);
314 let end_nanos = end.map(UnixNanos::from);
315
316 self.inner
317 .write_to_parquet(data, start_nanos, end_nanos, Some(skip_disjoint_check))
318 .map(|path| path.to_string_lossy().to_string())
319 .map_err(|e| PyIOError::new_err(format!("Failed to write index price updates: {e}")))
320 }
321
322 /// Write instruments to Parquet files in the catalog.
323 ///
324 /// Instruments are stored under `data/instruments/{instrument_id}/` using timestamp-ranged
325 /// parquet file names, allowing multiple historical versions of the same instrument to be
326 /// written across separate calls.
327 ///
328 /// # Parameters
329 ///
330 /// - `data`: A Python list of instrument objects (e.g. CurrencyPair, Equity).
331 ///
332 /// # Returns
333 ///
334 /// Returns a list of written file paths.
335 #[pyo3(signature = (data))]
336 pub fn write_instruments(&self, data: &Bound<'_, PyAny>) -> PyResult<Vec<String>> {
337 let py = data.py();
338 let list = data.cast::<PyList>()?;
339 let mut instruments = Vec::with_capacity(list.len());
340 for item in list.iter() {
341 let py_item: Py<PyAny> = item.unbind();
342 let instrument = pyobject_to_instrument_any(py, py_item)?;
343 instruments.push(instrument);
344 }
345 self.inner
346 .write_instruments(instruments)
347 .map(|paths| {
348 paths
349 .into_iter()
350 .map(|p| p.to_string_lossy().to_string())
351 .collect()
352 })
353 .map_err(|e| PyIOError::new_err(format!("Failed to write instruments: {e}")))
354 }
355
356 /// Query instruments from the catalog.
357 ///
358 /// # Parameters
359 ///
360 /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, returns all instruments.
361 /// - `start`: Optional inclusive lower bound for `ts_init` filtering.
362 /// - `end`: Optional inclusive upper bound for `ts_init` filtering.
363 ///
364 /// # Returns
365 ///
366 /// Returns a list of instrument objects (e.g. CurrencyPair, Equity).
367 #[pyo3(signature = (instrument_ids=None, start=None, end=None))]
368 #[expect(clippy::needless_pass_by_value)]
369 pub fn instruments(
370 &self,
371 instrument_ids: Option<Vec<String>>,
372 start: Option<u64>,
373 end: Option<u64>,
374 ) -> PyResult<Vec<Py<PyAny>>> {
375 let rust_instruments = self
376 .inner
377 .query_instruments_filtered(
378 instrument_ids.as_deref(),
379 start.map(UnixNanos::from),
380 end.map(UnixNanos::from),
381 )
382 .map_err(|e| PyIOError::new_err(format!("Failed to query instruments: {e}")))?;
383 Python::attach(|py| {
384 rust_instruments
385 .into_iter()
386 .map(|inst| instrument_any_to_pyobject(py, inst))
387 .collect()
388 })
389 }
390
391 /// Extend file names in the catalog with additional timestamp information.
392 ///
393 /// # Parameters
394 ///
395 /// - `data_cls`: The data class name
396 /// - `instrument_id`: Optional instrument ID filter
397 /// - `start`: Start timestamp (nanoseconds since Unix epoch)
398 /// - `end`: End timestamp (nanoseconds since Unix epoch)
399 #[pyo3(signature = (data_cls, instrument_id=None, *, start, end))]
400 #[expect(clippy::needless_pass_by_value)]
401 pub fn extend_file_name(
402 &self,
403 data_cls: &str,
404 instrument_id: Option<String>,
405 start: u64,
406 end: u64,
407 ) -> PyResult<()> {
408 // Convert u64 timestamps to UnixNanos
409 let start_nanos = UnixNanos::from(start);
410 let end_nanos = UnixNanos::from(end);
411
412 self.inner
413 .extend_file_name(data_cls, instrument_id.as_deref(), start_nanos, end_nanos)
414 .map_err(|e| PyIOError::new_err(format!("Failed to extend file name: {e}")))
415 }
416
417 /// Consolidate all data files in the catalog within the specified time range.
418 ///
419 /// # Parameters
420 ///
421 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
422 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
423 /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
424 /// - `deduplicate`: Optional flag to deduplicate rows when combining files
425 #[pyo3(signature = (start=None, end=None, ensure_contiguous_files=None, deduplicate=None))]
426 pub fn consolidate_catalog(
427 &self,
428 start: Option<u64>,
429 end: Option<u64>,
430 ensure_contiguous_files: Option<bool>,
431 deduplicate: Option<bool>,
432 ) -> PyResult<()> {
433 // Convert u64 timestamps to UnixNanos
434 let start_nanos = start.map(UnixNanos::from);
435 let end_nanos = end.map(UnixNanos::from);
436
437 self.inner
438 .consolidate_catalog(start_nanos, end_nanos, ensure_contiguous_files, deduplicate)
439 .map_err(|e| PyIOError::new_err(format!("Failed to consolidate catalog: {e}")))
440 }
441
442 /// Consolidate data files for a specific data type within the specified time range.
443 ///
444 /// # Parameters
445 ///
446 /// - `type_name`: The data type name to consolidate
447 /// - `instrument_id`: Optional instrument ID filter
448 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
449 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
450 /// - `ensure_contiguous_files`: Optional flag to ensure files are contiguous
451 /// - `deduplicate`: Optional flag to deduplicate rows when combining files
452 #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None, ensure_contiguous_files=None, deduplicate=None))]
453 #[expect(clippy::needless_pass_by_value)]
454 pub fn consolidate_data(
455 &self,
456 type_name: &str,
457 instrument_id: Option<String>,
458 start: Option<u64>,
459 end: Option<u64>,
460 ensure_contiguous_files: Option<bool>,
461 deduplicate: Option<bool>,
462 ) -> PyResult<()> {
463 // Convert u64 timestamps to UnixNanos
464 let start_nanos = start.map(UnixNanos::from);
465 let end_nanos = end.map(UnixNanos::from);
466
467 self.inner
468 .consolidate_data(
469 type_name,
470 instrument_id.as_deref(),
471 start_nanos,
472 end_nanos,
473 ensure_contiguous_files,
474 deduplicate,
475 )
476 .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data: {e}")))
477 }
478
479 /// Consolidate all data files in the catalog by splitting them into fixed time periods.
480 ///
481 /// This method identifies all leaf directories in the catalog that contain parquet files
482 /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
483 /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
484 /// and instrument IDs in the catalog.
485 ///
486 /// # Parameters
487 ///
488 /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
489 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
490 /// - `start`: Optional start timestamp for the consolidation range (nanoseconds since Unix epoch)
491 /// - `end`: Optional end timestamp for the consolidation range (nanoseconds since Unix epoch)
492 /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
493 #[pyo3(signature = (period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
494 pub fn consolidate_catalog_by_period(
495 &mut self,
496 period_nanos: Option<u64>,
497 start: Option<u64>,
498 end: Option<u64>,
499 ensure_contiguous_files: Option<bool>,
500 ) -> PyResult<()> {
501 // Convert u64 timestamps to UnixNanos
502 let start_nanos = start.map(UnixNanos::from);
503 let end_nanos = end.map(UnixNanos::from);
504
505 self.inner
506 .consolidate_catalog_by_period(
507 period_nanos,
508 start_nanos,
509 end_nanos,
510 ensure_contiguous_files,
511 )
512 .map_err(|e| {
513 PyIOError::new_err(format!("Failed to consolidate catalog by period: {e}"))
514 })
515 }
516
517 /// Consolidate data files by splitting them into fixed time periods.
518 ///
519 /// This method queries data by period and writes consolidated files immediately,
520 /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
521 /// the function automatically splits those files to preserve all data.
522 ///
523 /// # Parameters
524 ///
525 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
526 /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments
527 /// - `period_nanos`: Optional period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
528 /// Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
529 /// - `start`: Optional start timestamp for consolidation range (nanoseconds since Unix epoch)
530 /// - `end`: Optional end timestamp for consolidation range (nanoseconds since Unix epoch)
531 /// - `ensure_contiguous_files`: Optional flag to control file naming strategy
532 #[pyo3(signature = (type_name, identifier=None, period_nanos=None, start=None, end=None, ensure_contiguous_files=None))]
533 #[expect(clippy::needless_pass_by_value)]
534 pub fn consolidate_data_by_period(
535 &mut self,
536 type_name: &str,
537 identifier: Option<String>,
538 period_nanos: Option<u64>,
539 start: Option<u64>,
540 end: Option<u64>,
541 ensure_contiguous_files: Option<bool>,
542 ) -> PyResult<()> {
543 // Convert u64 timestamps to UnixNanos
544 let start_nanos = start.map(UnixNanos::from);
545 let end_nanos = end.map(UnixNanos::from);
546
547 self.inner
548 .consolidate_data_by_period(
549 type_name,
550 identifier.as_deref(),
551 period_nanos,
552 start_nanos,
553 end_nanos,
554 ensure_contiguous_files,
555 )
556 .map_err(|e| PyIOError::new_err(format!("Failed to consolidate data by period: {e}")))
557 }
558
559 /// Reset all catalog file names to their canonical form.
560 pub fn reset_all_file_names(&self) -> PyResult<()> {
561 self.inner
562 .reset_all_file_names()
563 .map_err(|e| PyIOError::new_err(format!("Failed to reset catalog file names: {e}")))
564 }
565
566 /// Reset data file names for a specific data class to their canonical form.
567 ///
568 /// # Parameters
569 ///
570 /// - `data_cls`: The data class name
571 /// - `instrument_id`: Optional instrument ID filter
572 #[pyo3(signature = (data_cls, instrument_id=None))]
573 #[expect(clippy::needless_pass_by_value)]
574 pub fn reset_data_file_names(
575 &self,
576 data_cls: &str,
577 instrument_id: Option<String>,
578 ) -> PyResult<()> {
579 self.inner
580 .reset_data_file_names(data_cls, instrument_id.as_deref())
581 .map_err(|e| PyIOError::new_err(format!("Failed to reset data file names: {e}")))
582 }
583
584 /// Delete data within a specified time range across the entire catalog.
585 ///
586 /// This method identifies all leaf directories in the catalog that contain parquet files
587 /// and deletes data within the specified time range from each directory. A leaf directory
588 /// is one that contains files but no subdirectories. This is a convenience method that
589 /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
590 ///
591 /// # Parameters
592 ///
593 /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
594 /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
595 ///
596 /// # Notes
597 ///
598 /// - This operation permanently removes data and cannot be undone
599 /// - The deletion process handles file intersections intelligently by splitting files
600 /// when they partially overlap with the deletion range
601 /// - Files completely within the deletion range are removed entirely
602 /// - Files partially overlapping the deletion range are split to preserve data outside the range
603 /// - This method is useful for bulk data cleanup operations across the entire catalog
604 /// - Empty directories are not automatically removed after deletion
605 #[pyo3(signature = (start=None, end=None))]
606 pub fn delete_catalog_range(&mut self, start: Option<u64>, end: Option<u64>) -> PyResult<()> {
607 // Convert u64 timestamps to UnixNanos
608 let start_nanos = start.map(UnixNanos::from);
609 let end_nanos = end.map(UnixNanos::from);
610
611 self.inner
612 .delete_catalog_range(start_nanos, end_nanos)
613 .map_err(|e| PyIOError::new_err(format!("Failed to delete catalog range: {e}")))
614 }
615
616 /// Delete data within a specified time range for a specific data type and instrument.
617 ///
618 /// This method identifies all parquet files that intersect with the specified time range
619 /// and handles them appropriately:
620 /// - Files completely within the range are deleted
621 /// - Files partially overlapping the range are split to preserve data outside the range
622 /// - The original intersecting files are removed after processing
623 ///
624 /// # Parameters
625 ///
626 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars")
627 /// - `instrument_id`: Optional instrument ID to delete data for. If None, deletes data across all instruments
628 /// - `start`: Optional start timestamp for the deletion range (nanoseconds since Unix epoch)
629 /// - `end`: Optional end timestamp for the deletion range (nanoseconds since Unix epoch)
630 ///
631 /// # Notes
632 ///
633 /// - This operation permanently removes data and cannot be undone
634 /// - Files that partially overlap the deletion range are split to preserve data outside the range
635 /// - The method ensures data integrity by using atomic operations where possible
636 /// - Empty directories are not automatically removed after deletion
637 #[pyo3(signature = (type_name, instrument_id=None, start=None, end=None))]
638 #[expect(clippy::needless_pass_by_value)]
639 pub fn delete_data_range(
640 &mut self,
641 type_name: &str,
642 instrument_id: Option<String>,
643 start: Option<u64>,
644 end: Option<u64>,
645 ) -> PyResult<()> {
646 // Convert u64 timestamps to UnixNanos
647 let start_nanos = start.map(UnixNanos::from);
648 let end_nanos = end.map(UnixNanos::from);
649
650 self.inner
651 .delete_data_range(type_name, instrument_id.as_deref(), start_nanos, end_nanos)
652 .map_err(|e| PyIOError::new_err(format!("Failed to delete data range: {e}")))
653 }
654
655 /// Write custom data to Parquet files.
656 ///
657 /// Requires `CustomData` wrappers. Callers must wrap raw custom objects in
658 /// `CustomData(data_type=DataType(cls, metadata=...), data=...)` before writing.
659 #[pyo3(signature = (data, start=None, end=None, skip_disjoint_check=false))]
660 pub fn write_custom_data(
661 &self,
662 _py: Python<'_>,
663 data: Vec<Bound<'_, PyAny>>,
664 start: Option<u64>,
665 end: Option<u64>,
666 skip_disjoint_check: bool,
667 ) -> PyResult<String> {
668 use nautilus_model::data::CustomData;
669
670 let mut custom_items: Vec<CustomData> = Vec::with_capacity(data.len());
671 for obj in data {
672 let custom = obj.extract::<CustomData>().map_err(|_| {
673 to_pytype_err(
674 "write_custom_data requires CustomData wrappers; wrap with CustomData(data_type=DataType(cls, metadata=...), data=...)",
675 )
676 })?;
677 custom_items.push(custom);
678 }
679
680 let start_nanos = start.map(UnixNanos::from);
681 let end_nanos = end.map(UnixNanos::from);
682
683 self.inner
684 .write_custom_data_batch(
685 custom_items,
686 start_nanos,
687 end_nanos,
688 Some(skip_disjoint_check),
689 )
690 .map(|path| path.to_string_lossy().to_string())
691 .map_err(|e| PyIOError::new_err(format!("Failed to write custom data: {e}")))
692 }
693
694 /// List all instrument IDs available in the catalog for a given data type.
695 pub fn list_instruments(&self, data_type: &str) -> PyResult<Vec<String>> {
696 self.inner
697 .list_instruments(data_type)
698 .map_err(|e| PyIOError::new_err(format!("Failed to list instruments: {e}")))
699 }
700
701 /// List all Parquet files in the catalog for a given data type and instrument.
702 pub fn list_parquet_files(
703 &self,
704 data_type: &str,
705 instrument_id: &str,
706 ) -> PyResult<Vec<String>> {
707 let directory = format!("data/{data_type}/{instrument_id}");
708 self.inner
709 .list_parquet_files(&directory)
710 .map_err(|e| PyIOError::new_err(format!("Failed to list parquet files: {e}")))
711 }
712
713 /// Query files in the catalog matching the specified criteria.
714 ///
715 /// # Parameters
716 ///
717 /// - `data_cls`: The data class name to query
718 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
719 /// (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
720 /// For bars, partial matching is supported.
721 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
722 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
723 ///
724 /// # Returns
725 ///
726 /// Returns a list of file paths matching the criteria.
727 #[pyo3(signature = (data_cls, identifiers=None, start=None, end=None))]
728 pub fn query_files(
729 &self,
730 data_cls: &str,
731 identifiers: Option<Vec<String>>,
732 start: Option<u64>,
733 end: Option<u64>,
734 ) -> PyResult<Vec<String>> {
735 // Convert u64 timestamps to UnixNanos
736 let start_nanos = start.map(UnixNanos::from);
737 let end_nanos = end.map(UnixNanos::from);
738
739 self.inner
740 .query_files(data_cls, identifiers, start_nanos, end_nanos)
741 .map_err(|e| PyIOError::new_err(format!("Failed to query files list: {e}")))
742 }
743
744 /// Get missing time intervals for a data request.
745 ///
746 /// # Parameters
747 ///
748 /// - `start`: Start timestamp (nanoseconds since Unix epoch)
749 /// - `end`: End timestamp (nanoseconds since Unix epoch)
750 /// - `data_cls`: The data class name
751 /// - `instrument_id`: Optional instrument ID filter
752 ///
753 /// # Returns
754 ///
755 /// Returns a list of (start, end) timestamp tuples representing missing intervals.
756 #[pyo3(signature = (start, end, data_cls, instrument_id=None))]
757 #[expect(clippy::needless_pass_by_value)]
758 pub fn get_missing_intervals_for_request(
759 &self,
760 start: u64,
761 end: u64,
762 data_cls: &str,
763 instrument_id: Option<String>,
764 ) -> PyResult<Vec<(u64, u64)>> {
765 self.inner
766 .get_missing_intervals_for_request(start, end, data_cls, instrument_id.as_deref())
767 .map_err(|e| PyIOError::new_err(format!("Failed to get missing intervals: {e}")))
768 }
769
770 /// Query the first timestamp for a specific data class and instrument.
771 ///
772 /// # Parameters
773 ///
774 /// - `data_cls`: The data class name
775 /// - `instrument_id`: Optional instrument ID filter
776 ///
777 /// # Returns
778 ///
779 /// Returns the first timestamp as nanoseconds since Unix epoch, or None if no data exists.
780 #[pyo3(signature = (data_cls, instrument_id=None))]
781 #[expect(clippy::needless_pass_by_value)]
782 pub fn query_first_timestamp(
783 &self,
784 data_cls: &str,
785 instrument_id: Option<String>,
786 ) -> PyResult<Option<u64>> {
787 self.inner
788 .query_first_timestamp(data_cls, instrument_id.as_deref())
789 .map_err(|e| PyIOError::new_err(format!("Failed to query first timestamp: {e}")))
790 }
791
792 /// Query the last timestamp for a specific data class and instrument.
793 ///
794 /// # Parameters
795 ///
796 /// - `data_cls`: The data class name
797 /// - `instrument_id`: Optional instrument ID filter
798 ///
799 /// # Returns
800 ///
801 /// Returns the last timestamp as nanoseconds since Unix epoch, or None if no data exists.
802 #[pyo3(signature = (data_cls, instrument_id=None))]
803 #[expect(clippy::needless_pass_by_value)]
804 pub fn query_last_timestamp(
805 &self,
806 data_cls: &str,
807 instrument_id: Option<String>,
808 ) -> PyResult<Option<u64>> {
809 self.inner
810 .query_last_timestamp(data_cls, instrument_id.as_deref())
811 .map_err(|e| PyIOError::new_err(format!("Failed to query last timestamp: {e}")))
812 }
813
814 /// Get time intervals covered by data for a specific data class and instrument.
815 ///
816 /// # Parameters
817 ///
818 /// - `data_cls`: The data class name
819 /// - `instrument_id`: Optional instrument ID filter
820 ///
821 /// # Returns
822 ///
823 /// Returns a list of (start, end) timestamp tuples representing covered intervals.
824 #[pyo3(signature = (data_cls, instrument_id=None))]
825 #[expect(clippy::needless_pass_by_value)]
826 pub fn get_intervals(
827 &self,
828 data_cls: &str,
829 instrument_id: Option<String>,
830 ) -> PyResult<Vec<(u64, u64)>> {
831 self.inner
832 .get_intervals(data_cls, instrument_id.as_deref())
833 .map_err(|e| PyIOError::new_err(format!("Failed to get intervals: {e}")))
834 }
835
836 /// Query Parquet files for data matching the given criteria.
837 #[pyo3(signature = (data_type, identifiers=None, start=None, end=None, where_clause=None, files=None, optimize_file_loading=true))]
838 #[expect(clippy::too_many_arguments)]
839 pub fn query(
840 &mut self,
841 py: Python<'_>,
842 data_type: &str,
843 identifiers: Option<Vec<String>>,
844 start: Option<u64>,
845 end: Option<u64>,
846 where_clause: Option<&str>,
847 files: Option<Vec<String>>,
848 optimize_file_loading: bool,
849 ) -> PyResult<Vec<Py<PyAny>>> {
850 let start_nanos = start.map(UnixNanos::from);
851 let end_nanos = end.map(UnixNanos::from);
852
853 let data = match data_type {
854 "quotes" => {
855 let ticks = self
856 .inner
857 .query_typed_data::<QuoteTick>(
858 identifiers,
859 start_nanos,
860 end_nanos,
861 where_clause,
862 files,
863 optimize_file_loading,
864 )
865 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
866 ticks.into_iter().map(Data::from).collect()
867 }
868 "trades" => {
869 let ticks = self
870 .inner
871 .query_typed_data::<TradeTick>(
872 identifiers,
873 start_nanos,
874 end_nanos,
875 where_clause,
876 files,
877 optimize_file_loading,
878 )
879 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
880 ticks.into_iter().map(Data::from).collect()
881 }
882 "bars" => {
883 let bars = self
884 .inner
885 .query_typed_data::<Bar>(
886 identifiers,
887 start_nanos,
888 end_nanos,
889 where_clause,
890 files,
891 optimize_file_loading,
892 )
893 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
894 bars.into_iter().map(Data::from).collect()
895 }
896 "order_book_deltas" => {
897 let deltas = self
898 .inner
899 .query_typed_data::<OrderBookDelta>(
900 identifiers,
901 start_nanos,
902 end_nanos,
903 where_clause,
904 files,
905 optimize_file_loading,
906 )
907 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
908 deltas.into_iter().map(Data::from).collect()
909 }
910 "order_book_depths" => {
911 let depths = self
912 .inner
913 .query_typed_data::<OrderBookDepth10>(
914 identifiers,
915 start_nanos,
916 end_nanos,
917 where_clause,
918 files,
919 optimize_file_loading,
920 )
921 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
922 depths.into_iter().map(Data::from).collect()
923 }
924 "index_prices" => {
925 let prices = self
926 .inner
927 .query_typed_data::<IndexPriceUpdate>(
928 identifiers,
929 start_nanos,
930 end_nanos,
931 where_clause,
932 files,
933 optimize_file_loading,
934 )
935 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
936 prices.into_iter().map(Data::from).collect()
937 }
938 "mark_prices" => {
939 let prices = self
940 .inner
941 .query_typed_data::<MarkPriceUpdate>(
942 identifiers,
943 start_nanos,
944 end_nanos,
945 where_clause,
946 files,
947 optimize_file_loading,
948 )
949 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
950 prices.into_iter().map(Data::from).collect()
951 }
952 "instrument_status" => {
953 let statuses = self
954 .inner
955 .query_typed_data::<InstrumentStatus>(
956 identifiers,
957 start_nanos,
958 end_nanos,
959 where_clause,
960 files,
961 optimize_file_loading,
962 )
963 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
964 statuses.into_iter().map(Data::from).collect()
965 }
966 "instrument_closes" => {
967 let closes = self
968 .inner
969 .query_typed_data::<InstrumentClose>(
970 identifiers,
971 start_nanos,
972 end_nanos,
973 where_clause,
974 files,
975 optimize_file_loading,
976 )
977 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?;
978 closes.into_iter().map(Data::from).collect()
979 }
980 _ => py
981 .detach(|| {
982 self.inner.query_custom_data_dynamic(
983 data_type,
984 identifiers.as_deref(),
985 start_nanos,
986 end_nanos,
987 where_clause,
988 files.clone(),
989 optimize_file_loading,
990 )
991 })
992 .map_err(|e| PyIOError::new_err(format!("Query failed: {e}")))?,
993 };
994
995 let mut python_objects = Vec::new();
996 for item in data {
997 python_objects.push(data_to_pyobject(py, item)?);
998 }
999 Ok(python_objects)
1000 }
1001
1002 /// Query quote tick data from Parquet files.
1003 ///
1004 /// # Parameters
1005 ///
1006 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
1007 /// (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1008 /// For bars, partial matching is supported.
1009 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1010 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1011 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1012 ///
1013 /// # Returns
1014 ///
1015 /// Returns a vector of `QuoteTick` objects matching the query criteria.
1016 #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
1017 pub fn query_quote_ticks(
1018 &mut self,
1019 identifiers: Option<Vec<String>>,
1020 start: Option<u64>,
1021 end: Option<u64>,
1022 where_clause: Option<&str>,
1023 ) -> PyResult<Vec<QuoteTick>> {
1024 let start_nanos = start.map(UnixNanos::from);
1025 let end_nanos = end.map(UnixNanos::from);
1026
1027 self.inner
1028 .query_typed_data::<QuoteTick>(
1029 identifiers,
1030 start_nanos,
1031 end_nanos,
1032 where_clause,
1033 None,
1034 true, // optimize_file_loading=true for directory-based registration (default)
1035 )
1036 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1037 }
1038
1039 /// Query trade tick data from Parquet files.
1040 ///
1041 /// # Parameters
1042 ///
1043 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
1044 /// (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1045 /// For bars, partial matching is supported.
1046 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1047 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1048 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1049 ///
1050 /// # Returns
1051 ///
1052 /// Returns a vector of `TradeTick` objects matching the query criteria.
1053 #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
1054 pub fn query_trade_ticks(
1055 &mut self,
1056 identifiers: Option<Vec<String>>,
1057 start: Option<u64>,
1058 end: Option<u64>,
1059 where_clause: Option<&str>,
1060 ) -> PyResult<Vec<TradeTick>> {
1061 let start_nanos = start.map(UnixNanos::from);
1062 let end_nanos = end.map(UnixNanos::from);
1063
1064 self.inner
1065 .query_typed_data::<TradeTick>(
1066 identifiers,
1067 start_nanos,
1068 end_nanos,
1069 where_clause,
1070 None,
1071 true, // optimize_file_loading=true for directory-based registration (default)
1072 )
1073 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1074 }
1075
1076 /// Query order book delta data from Parquet files.
1077 ///
1078 /// # Parameters
1079 ///
1080 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
1081 /// (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1082 /// For bars, partial matching is supported.
1083 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1084 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1085 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1086 ///
1087 /// # Returns
1088 ///
1089 /// Returns a vector of `OrderBookDelta` objects matching the query criteria.
1090 #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
1091 pub fn query_order_book_deltas(
1092 &mut self,
1093 identifiers: Option<Vec<String>>,
1094 start: Option<u64>,
1095 end: Option<u64>,
1096 where_clause: Option<&str>,
1097 ) -> PyResult<Vec<OrderBookDelta>> {
1098 let start_nanos = start.map(UnixNanos::from);
1099 let end_nanos = end.map(UnixNanos::from);
1100
1101 self.inner
1102 .query_typed_data::<OrderBookDelta>(
1103 identifiers,
1104 start_nanos,
1105 end_nanos,
1106 where_clause,
1107 None,
1108 true, // optimize_file_loading=true for directory-based registration (default)
1109 )
1110 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1111 }
1112
1113 /// Query bar data from Parquet files.
1114 ///
1115 /// # Parameters
1116 ///
1117 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
1118 /// (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1119 /// For bars, partial matching is supported (e.g., "EUR/USD.SIM" will match all bar types for that instrument).
1120 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1121 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1122 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1123 ///
1124 /// # Returns
1125 ///
1126 /// Returns a vector of Bar objects matching the query criteria.
1127 #[pyo3(signature = (identifiers=None, start=None, end=None, where_clause=None))]
1128 pub fn query_bars(
1129 &mut self,
1130 identifiers: Option<Vec<String>>,
1131 start: Option<u64>,
1132 end: Option<u64>,
1133 where_clause: Option<&str>,
1134 ) -> PyResult<Vec<Bar>> {
1135 let start_nanos = start.map(UnixNanos::from);
1136 let end_nanos = end.map(UnixNanos::from);
1137
1138 self.inner
1139 .query_typed_data::<Bar>(
1140 identifiers,
1141 start_nanos,
1142 end_nanos,
1143 where_clause,
1144 None,
1145 true, // optimize_file_loading=true for directory-based registration (default)
1146 )
1147 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1148 }
1149
1150 /// Query order book depth data from Parquet files.
1151 ///
1152 /// # Parameters
1153 ///
1154 /// - `instrument_ids`: Optional list of instrument IDs to filter by
1155 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1156 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1157 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1158 ///
1159 /// # Returns
1160 ///
1161 /// Returns a vector of `OrderBookDepth10` objects matching the query criteria.
1162 #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
1163 pub fn query_order_book_depths(
1164 &mut self,
1165 instrument_ids: Option<Vec<String>>,
1166 start: Option<u64>,
1167 end: Option<u64>,
1168 where_clause: Option<&str>,
1169 ) -> PyResult<Vec<OrderBookDepth10>> {
1170 let start_nanos = start.map(UnixNanos::from);
1171 let end_nanos = end.map(UnixNanos::from);
1172
1173 self.inner
1174 .query_typed_data::<OrderBookDepth10>(
1175 instrument_ids,
1176 start_nanos,
1177 end_nanos,
1178 where_clause,
1179 None,
1180 true, // optimize_file_loading=true for directory-based registration (default)
1181 )
1182 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1183 }
1184
1185 /// Query mark price update data from Parquet files.
1186 ///
1187 /// # Parameters
1188 ///
1189 /// - `instrument_ids`: Optional list of instrument IDs to filter by
1190 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1191 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1192 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1193 ///
1194 /// # Returns
1195 ///
1196 /// Returns a vector of `MarkPriceUpdate` objects matching the query criteria.
1197 #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
1198 pub fn query_mark_price_updates(
1199 &mut self,
1200 instrument_ids: Option<Vec<String>>,
1201 start: Option<u64>,
1202 end: Option<u64>,
1203 where_clause: Option<&str>,
1204 ) -> PyResult<Vec<MarkPriceUpdate>> {
1205 let start_nanos = start.map(UnixNanos::from);
1206 let end_nanos = end.map(UnixNanos::from);
1207
1208 self.inner
1209 .query_typed_data::<MarkPriceUpdate>(
1210 instrument_ids,
1211 start_nanos,
1212 end_nanos,
1213 where_clause,
1214 None,
1215 true, // optimize_file_loading=true for directory-based registration (default)
1216 )
1217 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1218 }
1219
1220 /// Query index price update data from Parquet files.
1221 ///
1222 /// # Parameters
1223 ///
1224 /// - `instrument_ids`: Optional list of instrument IDs to filter by
1225 /// - `start`: Optional start timestamp (nanoseconds since Unix epoch)
1226 /// - `end`: Optional end timestamp (nanoseconds since Unix epoch)
1227 /// - `where_clause`: Optional SQL WHERE clause for additional filtering
1228 ///
1229 /// # Returns
1230 ///
1231 /// Returns a vector of `IndexPriceUpdate` objects matching the query criteria.
1232 #[pyo3(signature = (instrument_ids=None, start=None, end=None, where_clause=None))]
1233 pub fn query_index_price_updates(
1234 &mut self,
1235 instrument_ids: Option<Vec<String>>,
1236 start: Option<u64>,
1237 end: Option<u64>,
1238 where_clause: Option<&str>,
1239 ) -> PyResult<Vec<IndexPriceUpdate>> {
1240 let start_nanos = start.map(UnixNanos::from);
1241 let end_nanos = end.map(UnixNanos::from);
1242
1243 self.inner
1244 .query_typed_data::<IndexPriceUpdate>(
1245 instrument_ids,
1246 start_nanos,
1247 end_nanos,
1248 where_clause,
1249 None,
1250 true, // optimize_file_loading=true for directory-based registration (default)
1251 )
1252 .map_err(|e| PyIOError::new_err(format!("Failed to query data: {e}")))
1253 }
1254
1255 /// List all data types available in the catalog.
1256 ///
1257 /// # Returns
1258 ///
1259 /// Returns a list of data type names (as directory stems) in the catalog.
1260 pub fn list_data_types(&self) -> PyResult<Vec<String>> {
1261 self.inner
1262 .list_data_types()
1263 .map_err(|e| PyIOError::new_err(format!("Failed to list data types: {e}")))
1264 }
1265
1266 /// List all live run IDs available in the catalog.
1267 ///
1268 /// # Returns
1269 ///
1270 /// Returns a list of live run IDs (as directory stems) in the catalog.
1271 pub fn list_live_runs(&self) -> PyResult<Vec<String>> {
1272 self.inner
1273 .list_live_runs()
1274 .map_err(|e| PyIOError::new_err(format!("Failed to list live runs: {e}")))
1275 }
1276
1277 /// List all backtest run IDs available in the catalog.
1278 ///
1279 /// # Returns
1280 ///
1281 /// Returns a list of backtest run IDs (as directory stems) in the catalog.
1282 pub fn list_backtest_runs(&self) -> PyResult<Vec<String>> {
1283 self.inner
1284 .list_backtest_runs()
1285 .map_err(|e| PyIOError::new_err(format!("Failed to list backtest runs: {e}")))
1286 }
1287
1288 /// List all backtest run instances available in the catalog.
1289 pub fn list_backtests(&self) -> PyResult<Vec<String>> {
1290 self.inner
1291 .list_backtest_runs()
1292 .map_err(|e| PyIOError::new_err(format!("Failed to list backtests: {e}")))
1293 }
1294
1295 /// Read data from a live run instance.
1296 ///
1297 /// # Parameters
1298 ///
1299 /// - `instance_id`: The ID of the live run instance
1300 ///
1301 /// # Returns
1302 ///
1303 /// Returns a list of data objects from the live run, sorted by timestamp.
1304 #[pyo3(signature = (instance_id))]
1305 pub fn read_live_run(&self, py: Python<'_>, instance_id: &str) -> PyResult<Vec<Py<PyAny>>> {
1306 let data = self
1307 .inner
1308 .read_live_run(instance_id)
1309 .map_err(|e| PyIOError::new_err(format!("Failed to read live run: {e}")))?;
1310
1311 let mut python_objects = Vec::new();
1312 for item in data {
1313 python_objects.push(data_to_pyobject(py, item)?);
1314 }
1315 Ok(python_objects)
1316 }
1317
1318 /// Read data from a backtest run instance.
1319 ///
1320 /// # Parameters
1321 ///
1322 /// - `instance_id`: The ID of the backtest run instance
1323 ///
1324 /// # Returns
1325 ///
1326 /// Returns a list of data objects from the backtest run, sorted by timestamp.
1327 #[pyo3(signature = (instance_id))]
1328 pub fn read_backtest(&self, py: Python<'_>, instance_id: &str) -> PyResult<Vec<Py<PyAny>>> {
1329 let data = self
1330 .inner
1331 .read_backtest(instance_id)
1332 .map_err(|e| PyIOError::new_err(format!("Failed to read backtest: {e}")))?;
1333
1334 let mut python_objects = Vec::new();
1335 for item in data {
1336 python_objects.push(data_to_pyobject(py, item)?);
1337 }
1338 Ok(python_objects)
1339 }
1340
1341 /// Convert stream data from feather files to parquet files.
1342 ///
1343 /// This method reads data from feather files generated during a backtest or live run
1344 /// and writes it to the catalog in parquet format. It's useful for converting temporary
1345 /// stream data into a more permanent and queryable format.
1346 ///
1347 /// # Parameters
1348 ///
1349 /// - `instance_id`: The ID of the backtest or live run instance
1350 /// - `data_cls`: The data class name (e.g., "quotes", "trades", "bars")
1351 /// - `subdirectory`: Optional subdirectory containing the feather files. Either "backtest" or "live" (default: "backtest")
1352 /// - `identifiers`: Optional list of identifiers to filter by (instrument IDs or bar types)
1353 /// - `use_ts_event_for_ts_init`: If true, replaces the `ts_init` column with `ts_event` column values before deserializing
1354 ///
1355 /// # Returns
1356 ///
1357 /// Returns nothing on success.
1358 ///
1359 /// # Examples
1360 ///
1361 /// ```python
1362 /// # Convert backtest stream data to parquet
1363 /// catalog.convert_stream_to_data(
1364 /// "instance-123",
1365 /// "quotes",
1366 /// subdirectory="backtest"
1367 /// )
1368 ///
1369 /// # Convert live run data with identifier filtering
1370 /// catalog.convert_stream_to_data(
1371 /// "instance-456",
1372 /// "trades",
1373 /// subdirectory="live",
1374 /// identifiers=["EUR/USD.SIM"]
1375 /// )
1376 /// ```
1377 #[pyo3(signature = (instance_id, data_cls, subdirectory=None, identifiers=None, use_ts_event_for_ts_init=false))]
1378 #[expect(clippy::needless_pass_by_value)]
1379 pub fn convert_stream_to_data(
1380 &mut self,
1381 instance_id: &str,
1382 data_cls: &str,
1383 subdirectory: Option<&str>,
1384 identifiers: Option<Vec<String>>,
1385 use_ts_event_for_ts_init: bool,
1386 ) -> PyResult<()> {
1387 let subdir = subdirectory.unwrap_or("backtest");
1388
1389 match self.inner.convert_stream_to_data(
1390 instance_id,
1391 data_cls,
1392 Some(subdir),
1393 identifiers.as_deref(),
1394 use_ts_event_for_ts_init,
1395 ) {
1396 Ok(()) => Ok(()),
1397 Err(e) => Err(PyIOError::new_err(format!(
1398 "Failed to convert stream to data: {e}"
1399 ))),
1400 }
1401 }
1402
1403 /// Query custom data from Parquet files.
1404 #[pyo3(signature = (type_name, identifiers=None, start=None, end=None, where_clause=None))]
1405 #[expect(clippy::needless_pass_by_value)]
1406 pub fn query_custom_data(
1407 &mut self,
1408 py: Python<'_>,
1409 type_name: &str,
1410 identifiers: Option<Vec<String>>,
1411 start: Option<u64>,
1412 end: Option<u64>,
1413 where_clause: Option<&str>,
1414 ) -> PyResult<Vec<Py<PyAny>>> {
1415 let start_nanos = start.map(UnixNanos::from);
1416 let end_nanos = end.map(UnixNanos::from);
1417
1418 let data = py
1419 .detach(|| {
1420 self.inner.query_custom_data_dynamic(
1421 type_name,
1422 identifiers.as_deref(),
1423 start_nanos,
1424 end_nanos,
1425 where_clause,
1426 None,
1427 true,
1428 )
1429 })
1430 .map_err(|e| PyIOError::new_err(format!("Failed to query custom data: {e}")))?;
1431
1432 let mut python_objects = Vec::new();
1433
1434 for item in data {
1435 let py_obj: Py<PyAny> = match item {
1436 Data::Custom(custom) => Py::new(py, custom.clone())?.into_any(),
1437 _ => return Err(PyIOError::new_err("Expected custom data")),
1438 };
1439 python_objects.push(py_obj);
1440 }
1441 Ok(python_objects)
1442 }
1443}