nautilus_persistence/backend/catalog.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Parquet data catalog for efficient storage and retrieval of financial market data.
//!
//! This module provides a data catalog implementation that uses the Apache Parquet
//! format for storing financial market data with object store backends. The catalog supports
//! various data types including quotes, trades, bars, order book data, and other market events.
//!
//! # Key Features
//!
//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
//!
//! # Architecture
//!
//! The catalog organizes data in a hierarchical structure:
//! ```text
//! data/
//! ├── quotes/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! ├── trades/
//! │   └── INSTRUMENT_ID/
//! │       └── start_ts-end_ts.parquet
//! └── bars/
//!     └── INSTRUMENT_ID/
//!         └── start_ts-end_ts.parquet
//! ```
45//!
46//! # Usage
47//!
48//! ```rust,no_run
49//! use std::path::PathBuf;
50//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
51//!
52//! // Create a new catalog
53//! let catalog = ParquetDataCatalog::new(
54//! PathBuf::from("/path/to/data"),
55//! None, // storage_options
56//! Some(5000), // batch_size
57//! None, // compression (defaults to SNAPPY)
58//! None, // max_row_group_size (defaults to 5000)
59//! );
60//!
61//! // Write data to the catalog
62//! // catalog.write_to_parquet(data, None, None)?;
63//! ```
64
use std::{
    borrow::Cow,
    collections::{BTreeMap, HashSet},
    fmt::Debug,
    io::Cursor,
    ops::Bound as RangeBound,
    path::{Path, PathBuf},
    sync::Arc,
};

use ahash::AHashMap;
use datafusion::arrow::record_batch::RecordBatch;
use futures::StreamExt;
use itertools::Itertools;
use nautilus_common::live::get_runtime;
use nautilus_core::{
    UnixNanos,
    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
    string::{conversions::to_snake_case, urlencoding},
};
use nautilus_model::{
    data::{
        Bar, CustomData, Data, FundingRateUpdate, HasTsInit, IndexPriceUpdate, InstrumentStatus,
        MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
        close::InstrumentClose, is_monotonically_increasing_by_init, to_variant,
    },
    events::{
        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
        OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
        OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSnapshot,
        OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
        PositionClosed, PositionOpened, PositionSnapshot,
    },
    instruments::InstrumentAny,
    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
};
use nautilus_serialization::arrow::{
    DecodeDataFromRecordBatch, DecodeTypedFromRecordBatch, EncodeToRecordBatch,
    custom::CustomDataDecoder,
};
use object_store::{ObjectStore, ObjectStoreExt, path::Path as ObjectPath};
use serde::Serialize;
use unbounded_interval_tree::interval_tree::IntervalTree;

use super::{
    custom::{
        custom_data_path_components, decode_batch_to_data as orchestration_decode_batch_to_data,
        decode_custom_batches_to_data as orchestration_decode_custom_batches_to_data,
        prepare_custom_data_batch,
    },
    session::{self, DataBackendSession, QueryResult, build_query},
};
use crate::parquet::{
    is_remote_uri_scheme, read_parquet_from_object_store, remote_full_uri, remote_store_root_url,
    write_batches_to_object_store,
};

/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
///
/// The `ParquetDataCatalog` provides a solution for managing large volumes of financial
/// market data with efficient storage, querying, and consolidation capabilities. It supports various
/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
///
/// # Features
///
/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
///
/// # Data Organization
///
/// Data is organized hierarchically by data type and instrument:
/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
/// - Files are named with their timestamp ranges for efficient range queries.
/// - Intervals are validated to be disjoint to prevent data overlap.
///
/// # Performance Considerations
///
/// - **Batch Size**: Controls memory usage during data processing.
/// - **Compression**: SNAPPY compression provides a good balance of speed and size.
/// - **Row Group Size**: Affects query performance and memory usage.
/// - **File Consolidation**: Reduces the number of files for better query performance.
pub struct ParquetDataCatalog {
    /// The base path for data storage within the object store.
    pub base_path: String,
    /// The original URI provided when creating the catalog.
    pub original_uri: String,
    /// The object store backend for data persistence.
    pub object_store: Arc<dyn ObjectStore>,
    /// The DataFusion session for query execution.
    pub session: DataBackendSession,
    /// The number of records to process in each batch.
    pub batch_size: usize,
    /// The compression algorithm used for Parquet files.
    pub compression: parquet::basic::Compression,
    /// The maximum number of rows in each Parquet row group.
    pub max_row_group_size: usize,
}

impl Debug for ParquetDataCatalog {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct(stringify!(ParquetDataCatalog))
            .field("base_path", &self.base_path)
            .finish()
    }
}

impl ParquetDataCatalog {
    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
    ///
    /// This is a convenience constructor that converts a local path to a URI format
    /// and delegates to [`Self::from_uri`].
    ///
    /// # Parameters
    ///
    /// - `base_path`: The base directory path for data storage.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Panics
    ///
    /// Panics if the path cannot be converted to a valid URI or if the object store
    /// cannot be created from the path.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::Path;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(
    ///     Path::new("/tmp/nautilus_data"),
    ///     None,       // no storage options
    ///     Some(1000), // smaller batch size
    ///     None,       // default compression
    ///     None,       // default row group size
    /// );
    /// ```
    #[must_use]
    pub fn new(
        base_path: &Path,
        storage_options: Option<AHashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> Self {
        let path_str = base_path.to_string_lossy().to_string();
        Self::from_uri(
            &path_str,
            storage_options,
            batch_size,
            compression,
            max_row_group_size,
        )
        .expect("Failed to create catalog from path")
    }

    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
    ///
    /// Supports various URI schemes including local file paths and multiple cloud storage backends
    /// supported by the `object_store` crate.
    ///
    /// # Supported URI Schemes
    ///
    /// - **AWS S3**: `s3://bucket/path`.
    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
    /// - **Azure Blob Storage**: `az://container/path` or `abfs://container@account.dfs.core.windows.net/path`.
    /// - **HTTP/WebDAV**: `http://` or `https://`.
    /// - **Local files**: `file://path` or plain paths.
    ///
    /// # Parameters
    ///
    /// - `uri`: The URI for the data storage location.
    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
    ///   - For S3: `endpoint_url`, `region`, `access_key_id`, `secret_access_key`, `session_token`, etc.
    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
    /// - `batch_size`: Number of records to process in each batch (default: 5000).
    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The URI format is invalid or unsupported.
    /// - The object store cannot be created or accessed.
    /// - Authentication fails for cloud storage backends.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use ahash::AHashMap;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// // Local filesystem
    /// let local_catalog = ParquetDataCatalog::from_uri(
    ///     "/tmp/nautilus_data",
    ///     None, None, None, None,
    /// )?;
    ///
    /// // S3 bucket
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     None, None, None, None,
    /// )?;
    ///
    /// // Google Cloud Storage
    /// let gcs_catalog = ParquetDataCatalog::from_uri(
    ///     "gs://my-bucket/nautilus-data",
    ///     None, None, None, None,
    /// )?;
    ///
    /// // Azure Blob Storage
    /// let azure_catalog = ParquetDataCatalog::from_uri(
    ///     "az://container/nautilus-data",
    ///     None, None, None, None,
    /// )?;
    ///
    /// // S3 with custom endpoint and credentials
    /// let mut storage_options = AHashMap::new();
    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
    ///
    /// let s3_catalog = ParquetDataCatalog::from_uri(
    ///     "s3://my-bucket/nautilus-data",
    ///     Some(storage_options),
    ///     None, None, None,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn from_uri(
        uri: &str,
        storage_options: Option<AHashMap<String, String>>,
        batch_size: Option<usize>,
        compression: Option<parquet::basic::Compression>,
        max_row_group_size: Option<usize>,
    ) -> anyhow::Result<Self> {
        let batch_size = batch_size.unwrap_or(5000);
        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
        let max_row_group_size = max_row_group_size.unwrap_or(5000);

        let location =
            crate::parquet::create_object_store_location_from_path(uri, storage_options)?;

        Ok(Self {
            base_path: location.base_path,
            original_uri: location.original_uri,
            object_store: location.object_store,
            session: session::DataBackendSession::new(batch_size),
            batch_size,
            compression,
            max_row_group_size,
        })
    }

    /// Returns the base path of the catalog for testing purposes.
    #[must_use]
    pub fn get_base_path(&self) -> String {
        self.base_path.clone()
    }

    /// Resets the backend session to clear any cached table registrations.
    ///
    /// This is useful during catalog operations when files are being modified
    /// and we need to ensure fresh data is loaded.
    pub fn reset_session(&mut self) {
        self.session.clear_registered_tables();
    }

    /// Writes mixed data types to the catalog by separating them into type-specific collections.
    ///
    /// This method takes a heterogeneous collection of market data and separates it by type,
    /// then writes each type to its appropriate location in the catalog. This is useful when
    /// processing mixed data streams or bulk data imports.
    ///
    /// # Parameters
    ///
    /// - `data`: A slice of mixed [`Data`] enum variants.
    /// - `start`: Optional start timestamp to override the data's natural range.
    /// - `end`: Optional end timestamp to override the data's natural range.
    /// - `skip_disjoint_check`: Whether to skip interval disjointness validation.
    ///
    /// # Notes
    ///
    /// - Data is automatically separated by type before writing.
    /// - Each data type is written to its own directory structure.
    /// - Instrument data handling is not yet implemented (TODO).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::Data;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
    ///
    /// catalog.write_data_enum(&mixed_data, None, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_data_enum(
        &self,
        data: &[Data],
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<()> {
        let mut deltas: Vec<OrderBookDelta> = Vec::new();
        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
        let mut quotes: Vec<QuoteTick> = Vec::new();
        let mut trades: Vec<TradeTick> = Vec::new();
        let mut bars: Vec<Bar> = Vec::new();
        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
        let mut statuses: Vec<InstrumentStatus> = Vec::new();
        let mut closes: Vec<InstrumentClose> = Vec::new();
        // Group custom data by full DataType identity (type_name + identifier + metadata)
        // so each batch is written to the correct path with consistent schema/metadata.
        let custom_data_key = |c: &CustomData| {
            (
                c.data_type.type_name().to_string(),
                c.data_type.identifier().map(String::from),
                c.data_type.metadata_str(),
            )
        };
        let mut custom_data: AHashMap<(String, Option<String>, String), Vec<CustomData>> =
            AHashMap::new();

        for d in data.iter().cloned() {
            match d {
                Data::Deltas(_) => {}
                Data::Delta(d) => {
                    deltas.push(d);
                }
                Data::Depth10(d) => {
                    depth10s.push(*d);
                }
                Data::Quote(d) => {
                    quotes.push(d);
                }
                Data::Trade(d) => {
                    trades.push(d);
                }
                Data::Bar(d) => {
                    bars.push(d);
                }
                Data::MarkPriceUpdate(p) => {
                    mark_prices.push(p);
                }
                Data::IndexPriceUpdate(p) => {
                    index_prices.push(p);
                }
                Data::InstrumentStatus(s) => {
                    statuses.push(s);
                }
                Data::InstrumentClose(c) => {
                    closes.push(c);
                }
                Data::Custom(c) => {
                    custom_data.entry(custom_data_key(&c)).or_default().push(c);
                }
            }
        }

        // Instruments are handled separately via the write_instruments method

        self.write_to_parquet(deltas, start, end, skip_disjoint_check)?;
        self.write_to_parquet(depth10s, start, end, skip_disjoint_check)?;
        self.write_to_parquet(quotes, start, end, skip_disjoint_check)?;
        self.write_to_parquet(trades, start, end, skip_disjoint_check)?;
        self.write_to_parquet(bars, start, end, skip_disjoint_check)?;
        self.write_to_parquet(mark_prices, start, end, skip_disjoint_check)?;
        self.write_to_parquet(index_prices, start, end, skip_disjoint_check)?;
        self.write_to_parquet(statuses, start, end, skip_disjoint_check)?;
        self.write_to_parquet(closes, start, end, skip_disjoint_check)?;

        for (_, items) in custom_data {
            self.write_custom_data_batch(items, start, end, skip_disjoint_check)?;
        }

        Ok(())
    }

    /// Writes typed data to a Parquet file in the catalog.
    ///
    /// This is the core method for persisting market data to the catalog. It handles data
    /// validation, batching, compression, and ensures proper file organization with
    /// timestamp-based naming.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: Whether to skip interval disjointness validation.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    /// If the target file already exists, returns the path without writing (skips the write).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Writing would create non-disjoint timestamp intervals.
    ///
    /// # Panics
    ///
    /// Panics if:
    /// - Record batches are empty after conversion.
    /// - Required metadata is missing from the schema.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// let path = catalog.write_to_parquet(quotes, None, None, None)?;
    /// println!("Data written to: {:?}", path);
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_parquet<T>(
        &self,
        data: Vec<T>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = to_snake_case(std::any::type_name::<T>());
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());

        let batches = self.data_to_record_batches(data)?;
        let schema = batches.first().expect("Batches are empty.").schema();

        let identifier = if T::path_prefix() == "bars" {
            schema.metadata.get("bar_type").cloned()
        } else {
            schema.metadata.get("instrument_id").cloned()
        };

        let directory = self.make_path(T::path_prefix(), identifier.as_deref())?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));
        let object_path = self.to_object_path(&path.to_string_lossy())?;

        let file_exists = self.execute_async(async {
            let exists: bool = self.object_store.head(&object_path).await.is_ok();
            Ok(exists)
        })?;

        if file_exists {
            log::info!("File {} already exists, skipping write", path.display());
            return Ok(path);
        }

        if !skip_disjoint_check.unwrap_or(false) {
            let current_intervals = self.get_directory_intervals(&directory)?;
            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
            let mut new_intervals = current_intervals.clone();
            new_intervals.push(new_interval);

            if !are_intervals_disjoint(&new_intervals) {
                anyhow::bail!(
                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
                );
            }
        }

        log::info!(
            "Writing {} batches of {type_name} data to {}",
            batches.len(),
            path.display(),
        );

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
                None,
            )
            .await
        })?;

        Ok(path)
    }

    /// Writes custom data to a Parquet file in the catalog.
    ///
    /// This method handles writing [`CustomData`] values. Custom data is organized by type
    /// name in a `custom/{type_name}/` directory structure.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of custom data items to write (must be in ascending timestamp order).
    /// - `start`: Optional start timestamp to override the natural data range.
    /// - `end`: Optional end timestamp to override the natural data range.
    /// - `skip_disjoint_check`: Whether to skip interval disjointness validation.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data serialization to Arrow record batches fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    /// - Writing would create non-disjoint timestamp intervals (unless skipped).
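    ///
    /// # Examples
    ///
    /// A minimal sketch of writing one batch of `CustomData` values; the construction of
    /// `custom_items` is assumed and elided here:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::CustomData;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let custom_items: Vec<CustomData> = vec![/* custom data sharing one DataType */];
    ///
    /// // Write the batch using the data's natural timestamp range and the
    /// // default disjointness validation.
    /// let path = catalog.write_custom_data_batch(custom_items, None, None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```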
    pub fn write_custom_data_batch(
        &self,
        data: Vec<CustomData>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        skip_disjoint_check: Option<bool>,
    ) -> anyhow::Result<PathBuf> {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let (batch, type_name, identifier, start_ts, end_ts) = prepare_custom_data_batch(data)?;
        let start_ts = start.unwrap_or(start_ts);
        let end_ts = end.unwrap_or(end_ts);
        let batches = vec![batch];

        let directory = self.make_path_custom_data(&type_name, identifier.as_deref())?;
        let filename = timestamps_to_filename(start_ts, end_ts);
        let path = PathBuf::from(format!("{directory}/{filename}"));
        let object_path = self.to_object_path(&path.to_string_lossy())?;

        let file_exists = self.execute_async(async {
            let exists: bool = self.object_store.head(&object_path).await.is_ok();
            Ok(exists)
        })?;

        if file_exists {
            log::info!("File {} already exists, skipping write", path.display());
            return Ok(path);
        }

        if !skip_disjoint_check.unwrap_or(false) {
            let current_intervals = self.get_directory_intervals(&directory)?;
            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
            let mut new_intervals = current_intervals.clone();
            new_intervals.push(new_interval);

            if !are_intervals_disjoint(&new_intervals) {
                anyhow::bail!(
                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
                );
            }
        }

        self.execute_async(async {
            write_batches_to_object_store(
                &batches,
                self.object_store.clone(),
                &object_path,
                Some(self.compression),
                Some(self.max_row_group_size),
                None,
            )
            .await
        })?;

        Ok(path)
    }

    /// Writes instruments to Parquet files in the catalog.
    ///
    /// Instruments are stored under their instrument ID directory using timestamp-ranged
    /// file names, allowing multiple historical versions of the same instrument to be
    /// appended over time:
    /// `data/instruments/{instrument_id}/{start_ts}-{end_ts}.parquet`
    ///
    /// # Parameters
    ///
    /// - `instruments`: Vector of instruments to write.
    ///
    /// # Returns
    ///
    /// Returns a vector of paths to the created files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::instruments::InstrumentAny;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let instruments: Vec<InstrumentAny> = vec![/* instruments */];
    ///
    /// let paths = catalog.write_instruments(instruments)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_instruments(
        &self,
        instruments: Vec<InstrumentAny>,
    ) -> anyhow::Result<Vec<PathBuf>> {
        use nautilus_model::instruments::Instrument;

        if instruments.is_empty() {
            return Ok(Vec::new());
        }

        // Group instruments by concrete type and instrument_id so mixed InstrumentAny
        // inputs are written as separate parquet batches with stable ordering.
        let mut by_type_and_id: BTreeMap<(String, String), Vec<InstrumentAny>> = BTreeMap::new();

        for instrument in instruments {
            let instrument_type = Self::instrument_type_name(&instrument).to_string();
            let instrument_id = Instrument::id(&instrument).to_string();
            by_type_and_id
                .entry((instrument_type, instrument_id))
                .or_default()
                .push(instrument);
        }

        let mut paths = Vec::new();

        for ((_instrument_type, instrument_id), instrument_group) in by_type_and_id {
            Self::check_ascending_timestamps(&instrument_group, "instrument")?;

            let start_ts = HasTsInit::ts_init(instrument_group.first().unwrap());
            let end_ts = HasTsInit::ts_init(instrument_group.last().unwrap());
            let batches = self.data_to_record_batches(instrument_group)?;
            if batches.is_empty() {
                continue;
            }

            let directory = self.make_path("instruments", Some(instrument_id.as_str()))?;
            let filename = timestamps_to_filename(start_ts, end_ts);
            let path = PathBuf::from(format!("{directory}/{filename}"));
            let object_path = self.to_object_path(&path.to_string_lossy())?;

            let file_exists = self
                .execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;

            if file_exists {
                log::info!(
                    "Instrument file {} already exists, skipping write",
                    path.display()
                );
                paths.push(path);
                continue;
            }

            let current_intervals = self.get_directory_intervals(&directory)?;
            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
            let mut new_intervals = current_intervals.clone();
            new_intervals.push(new_interval);

            if !are_intervals_disjoint(&new_intervals) {
                anyhow::bail!(
                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
                );
            }

            log::info!(
                "Writing {} batches of instrument data for {instrument_id} to {}",
                batches.len(),
                path.display(),
            );

            // ArrowWriter stores the full schema (including "class" metadata) in ARROW:schema.
            // When reading, use the builder's schema for metadata (see query_instruments).
            self.execute_async(async {
                write_batches_to_object_store(
                    &batches,
                    self.object_store.clone(),
                    &object_path,
                    Some(self.compression),
                    Some(self.max_row_group_size),
                    None,
                )
                .await
            })?;

            paths.push(path);
        }

        Ok(paths)
    }

    /// Queries instruments from the catalog.
    ///
    /// Instruments are stored by instrument ID in timestamp-ranged parquet files under
    /// `data/instruments/{instrument_id}/`. Legacy `instrument.parquet` files are still
    /// supported for backwards compatibility.
    ///
    /// # Parameters
    ///
    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
    ///
    /// # Returns
    ///
    /// Returns a vector of `InstrumentAny` instances, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - File discovery fails.
    /// - File reading fails.
    /// - Data deserialization fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::instruments::InstrumentAny;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all instruments
    /// let instruments = catalog.query_instruments(None)?;
    ///
    /// // Query specific instruments
    /// let ids = vec!["EUR/USD.SIM".to_string()];
    /// let instruments = catalog.query_instruments(Some(ids.as_slice()))?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn query_instruments(
        &self,
        instrument_ids: Option<&[String]>,
    ) -> anyhow::Result<Vec<InstrumentAny>> {
        self.query_instruments_filtered(instrument_ids, None, None)
    }

    /// Queries instruments from the catalog with optional timestamp filtering.
    ///
    /// This reads all matching parquet files under `data/instruments/{instrument_id}/`,
    /// including legacy `instrument.parquet` files, decodes the records back to
    /// `InstrumentAny`, and filters them by `ts_init` when a range is provided.
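    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the timestamp values are
    /// illustrative only:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all instruments whose ts_init falls within the range
    /// let instruments = catalog.query_instruments_filtered(
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```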
    pub fn query_instruments_filtered(
        &self,
        instrument_ids: Option<&[String]>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<InstrumentAny>> {
        use nautilus_serialization::arrow::instrument::decode_instrument_any_batch;

        let base_dir = self.make_path("instruments", None)?;
        let mut all_instruments = Vec::new();
        let start_u64 = start.map(|ts| ts.as_u64());
        let end_u64 = end.map(|ts| ts.as_u64());

        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        let mut instrument_files = Vec::new();

        for object in list_result {
            let path_str = object.location.to_string();
            if !path_str.ends_with(".parquet") {
                continue;
            }

            let path_parts: Vec<&str> = path_str.split('/').collect();
            if path_parts.len() < 2 {
                continue;
            }

            let instrument_id_dir = path_parts[path_parts.len() - 2];

            if let Some(ids) = instrument_ids
                && !ids
                    .iter()
                    .map(|id| urisafe_instrument_id(id))
                    .any(|x| x.as_str() == urisafe_instrument_id(instrument_id_dir))
            {
                continue;
            }

            let include_file = if path_str.ends_with("/instrument.parquet") {
                true
            } else {
                query_intersects_filename(&path_str, start_u64, end_u64)
            };

            if include_file {
                instrument_files.push(path_str);
            }
        }

        instrument_files.sort();

        for file_path in instrument_files {
            let object_path = self.to_object_path_parsed(&file_path)?;
            let (batches, builder_schema) = self.execute_async(async {
                read_parquet_from_object_store(self.object_store.clone(), &object_path).await
            })?;

            let metadata: std::collections::HashMap<String, String> =
                builder_schema.metadata().clone();

            for batch in batches {
                let mut instruments = decode_instrument_any_batch(&metadata, &batch)?;

                if start.is_some() || end.is_some() {
                    instruments.retain(|instrument| {
                        let ts = HasTsInit::ts_init(instrument).as_u64();
                        start_u64.is_none_or(|value| ts >= value)
                            && end_u64.is_none_or(|value| ts <= value)
                    });
                }
                all_instruments.extend(instruments);
            }
        }

        all_instruments.sort_by_key(HasTsInit::ts_init);

        Ok(all_instruments)
    }

    /// Writes typed data to a JSON file in the catalog.
    ///
    /// This method provides an alternative to the Parquet format for data export and debugging.
    /// JSON files are human-readable but less efficient for large datasets.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to write, must implement serialization and cataloging traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
    /// - `path`: Optional custom directory path (defaults to the catalog's standard structure).
    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
    ///
    /// # Returns
    ///
    /// Returns the [`PathBuf`] of the created JSON file.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Data timestamps are not in ascending order.
    /// - JSON serialization fails.
    /// - Object store write operations fail.
    /// - File path construction fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use std::path::PathBuf;
    /// use nautilus_model::data::TradeTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let trades: Vec<TradeTick> = vec![/* trade data */];
    ///
    /// let path = catalog.write_to_json(
    ///     trades,
    ///     Some(PathBuf::from("/custom/path")),
    ///     true, // write metadata
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn write_to_json<T>(
        &self,
        data: Vec<T>,
        path: Option<PathBuf>,
        write_metadata: bool,
    ) -> anyhow::Result<PathBuf>
    where
        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
    {
        if data.is_empty() {
            return Ok(PathBuf::new());
        }

        let type_name = to_snake_case(std::any::type_name::<T>());
        Self::check_ascending_timestamps(&data, &type_name)?;

        let start_ts = data.first().unwrap().ts_init();
        let end_ts = data.last().unwrap().ts_init();

        let directory =
            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
        let json_path = directory.join(&filename);

        log::info!(
            "Writing {} records of {type_name} data to {}",
            data.len(),
            json_path.display(),
        );

        if write_metadata {
            let metadata = T::chunk_metadata(&data);
            let metadata_path = json_path.with_extension("metadata.json");
            log::info!("Writing metadata to {}", metadata_path.display());

            // Use the object store for the metadata file
            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
            self.execute_async(async {
                let _: object_store::PutResult = self
                    .object_store
                    .put(&metadata_object_path, metadata_json.into())
                    .await?;
                Ok(())
            })?;
        }

        // Use the object store for the main JSON file
        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
        self.execute_async(async {
            let _: object_store::PutResult = self
                .object_store
                .put(&json_object_path, json_data.into())
                .await?;
            Ok(())
        })?;

        Ok(json_path)
    }

    /// Validates that data timestamps are in ascending order.
    ///
    /// # Parameters
    ///
    /// - `data`: Slice of data records to validate.
    /// - `type_name`: Name of the data type for error messages.
    ///
    /// # Errors
    ///
    /// Returns an error if any adjacent pair of records is out of order.
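    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming `quotes` is a `Vec<QuoteTick>` already in
    /// ascending `ts_init` order:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    /// ParquetDataCatalog::check_ascending_timestamps(&quotes, "quotes")?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```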
    pub fn check_ascending_timestamps<T: HasTsInit>(
        data: &[T],
        type_name: &str,
    ) -> anyhow::Result<()> {
        if !data
            .array_windows()
            .all(|[a, b]| a.ts_init() <= b.ts_init())
        {
            anyhow::bail!("{type_name} timestamps must be in ascending order");
        }

        Ok(())
    }

    fn instrument_type_name(instrument: &InstrumentAny) -> &'static str {
        match instrument {
            InstrumentAny::Betting(_) => "BettingInstrument",
            InstrumentAny::BinaryOption(_) => "BinaryOption",
            InstrumentAny::Cfd(_) => "Cfd",
            InstrumentAny::Commodity(_) => "Commodity",
            InstrumentAny::CryptoFuture(_) => "CryptoFuture",
            InstrumentAny::CryptoOption(_) => "CryptoOption",
            InstrumentAny::CryptoPerpetual(_) => "CryptoPerpetual",
            InstrumentAny::CurrencyPair(_) => "CurrencyPair",
            InstrumentAny::Equity(_) => "Equity",
            InstrumentAny::FuturesContract(_) => "FuturesContract",
            InstrumentAny::FuturesSpread(_) => "FuturesSpread",
            InstrumentAny::IndexInstrument(_) => "IndexInstrument",
            InstrumentAny::OptionContract(_) => "OptionContract",
            InstrumentAny::OptionSpread(_) => "OptionSpread",
            InstrumentAny::PerpetualContract(_) => "PerpetualContract",
            InstrumentAny::TokenizedAsset(_) => "TokenizedAsset",
        }
    }

    /// Converts data into Arrow record batches for Parquet serialization.
    ///
    /// This method chunks the data according to the configured batch size and converts
    /// each chunk into an Arrow record batch with appropriate metadata.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to convert, must implement required encoding traits.
    ///
    /// # Parameters
    ///
    /// - `data`: Vector of data records to convert.
    ///
    /// # Returns
    ///
    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
    ///
    /// # Errors
    ///
    /// Returns an error if record batch encoding fails for any chunk.
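    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog and some quote data:
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
    ///
    /// // Each chunk of up to `batch_size` records becomes one record batch.
    /// let batches = catalog.data_to_record_batches(quotes)?;
    /// println!("Encoded {} batches", batches.len());
    /// # Ok::<(), anyhow::Error>(())
    /// ```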
    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
    where
        T: HasTsInit + EncodeToRecordBatch,
    {
        let mut batches = Vec::new();

        for chunk in &data.into_iter().chunks(self.batch_size) {
            let data = chunk.collect_vec();
            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
            let record_batch = T::encode_batch(&metadata, &data)?;
            batches.push(record_batch);
        }

        Ok(batches)
    }

    /// Extends the timestamp range of an existing Parquet file by renaming it.
    ///
    /// This method finds an existing file that is adjacent to the specified time range
    /// and renames it to include the new range. This is useful when appending data
    /// that extends the time coverage of existing files.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an
    ///   instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    /// - `start`: Start timestamp of the new range to extend to.
    /// - `end`: End timestamp of the new range to extend to.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File rename operations fail.
    /// - Interval validation fails after extension.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Extend a file's range backwards or forwards
    /// catalog.extend_file_name(
    ///     "quotes",
    ///     Some("BTC/USD.SIM"),
    ///     UnixNanos::from(1609459200000000000),
    ///     UnixNanos::from(1609545600000000000),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn extend_file_name(
        &self,
        data_cls: &str,
        identifier: Option<&str>,
        start: UnixNanos,
        end: UnixNanos,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(data_cls, identifier)?;
        let intervals = self.get_directory_intervals(&directory)?;

        let start = start.as_u64();
        let end = end.as_u64();

        for interval in intervals {
            if interval.0 == end + 1 {
                // Extend backwards: the renamed file covers [start, interval.1]
                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
                break;
            } else if interval.1 + 1 == start {
                // Extend forwards: the renamed file covers [interval.0, end]
                // (written as `interval.1 + 1 == start` to avoid u64 underflow when start is 0)
                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
                break;
            }
        }

        let intervals = self.get_directory_intervals(&directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after extending a file");
        }

        Ok(())
    }

    /// Lists all Parquet files in a specified directory.
    ///
    /// This method scans a directory and returns the full paths of all files with the `.parquet`
    /// extension. It works with both local filesystems and remote object stores, making it
    /// suitable for various storage backends.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path to scan for Parquet files.
    ///
    /// # Returns
    ///
    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
    /// The paths are relative to the object store root and suitable for use with object store operations.
    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory access is denied.
    /// - Network issues occur (for remote object stores).
    ///
    /// # Notes
    ///
    /// - Only files ending with `.parquet` are included.
    /// - Files in nested subdirectories are also included, since object store listing is prefix-based.
    /// - File paths are returned in the order provided by the object store.
    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
    ///
    /// for file in files {
    ///     println!("Found Parquet file: {}", file);
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("{directory}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut files = Vec::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                if object.location.as_ref().ends_with(".parquet") {
                    files.push(object.location.to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(files)
        })
    }

    /// Lists all instrument identifiers for a specific data type.
    ///
    /// This method scans the data directory for a given data type and extracts
    /// all unique instrument identifiers from the directory structure.
    ///
    /// # Parameters
    ///
    /// - `data_type`: The data type directory name (e.g., "quotes", "trades", "bars").
    ///
    /// # Returns
    ///
    /// Returns a vector of instrument identifier strings.
    ///
    /// # Errors
    ///
    /// Returns an error if directory listing fails.
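    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // List every instrument directory found under data/quotes/
    /// for instrument_id in catalog.list_instruments("quotes")? {
    ///     println!("Found instrument: {instrument_id}");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```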
    pub fn list_instruments(&self, data_type: &str) -> anyhow::Result<Vec<String>> {
        self.execute_async(async {
            let prefix = ObjectPath::from(format!("data/{data_type}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut instruments = HashSet::new();

            while let Some(object) = stream.next().await {
                let object = object?;
                let path = object.location.as_ref();
                let parts: Vec<&str> = path.split('/').collect();
                if parts.len() >= 3 {
                    instruments.insert(parts[2].to_string());
                }
            }
            Ok::<Vec<String>, anyhow::Error>(instruments.into_iter().collect())
        })
    }

    /// Lists Parquet files matching specific criteria (data type, identifiers, time range).
    ///
    /// This method finds all Parquet files that match the specified criteria by filtering
    /// files based on their directory structure and filename timestamps.
    ///
    /// # Parameters
    ///
    /// - `data_type`: The data type directory name (e.g., "quotes", "trades", "custom/MyType").
    /// - `identifiers`: Optional list of identifiers to filter by.
    /// - `start`: Optional start timestamp to filter files by their time range.
    /// - `end`: Optional end timestamp to filter files by their time range.
    ///
    /// # Returns
    ///
    /// Returns a vector of file paths that match the criteria.
    ///
    /// # Errors
    ///
    /// Returns an error if directory listing or file filtering fails.
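    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a constructed catalog; the identifier and
    /// timestamps are illustrative only:
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Find quote files for one instrument that intersect a time range
    /// let ids = vec!["EUR/USD.SIM".to_string()];
    /// let files = catalog.list_parquet_files_with_criteria(
    ///     "quotes",
    ///     Some(ids.as_slice()),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```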
    pub fn list_parquet_files_with_criteria(
        &self,
        data_type: &str,
        identifiers: Option<&[String]>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<String>> {
        let mut all_files = Vec::new();

        let start_u64 = start.map(|s| s.as_u64());
        let end_u64 = end.map(|e| e.as_u64());

        let base_dir = self.make_path(data_type, None)?;

        // Use recursive listing to match Python's glob behavior
        let list_result = self.execute_async(async {
            let prefix = ObjectPath::from(format!("{base_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));
            let mut objects = Vec::new();
            while let Some(object) = stream.next().await {
                objects.push(object?);
            }
            Ok::<Vec<_>, anyhow::Error>(objects)
        })?;

        for object in list_result {
            let path_str = object.location.to_string();

            // Filter by identifiers if provided
            if let Some(ids) = identifiers {
                let path_components = extract_path_components(&path_str);
                let mut matches = false;

                for id in ids {
                    if path_components.iter().any(|c| c.contains(id)) {
                        matches = true;
                        break;
                    }
                }

                if !matches {
                    continue;
                }
            }

            // Filter by timestamp range if the filename can be parsed
            if path_str.ends_with(".parquet")
                && query_intersects_filename(&path_str, start_u64, end_u64)
            {
                all_files.push(path_str);
            }
        }

        Ok(all_files)
    }

    /// Helper method to reconstruct the full URI for remote object store paths.
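    ///
    /// # Examples
    ///
    /// A minimal sketch, assuming a catalog created from an S3 URI; the bucket
    /// name and path are illustrative only:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog =
    ///     ParquetDataCatalog::from_uri("s3://my-bucket/nautilus-data", None, None, None, None)?;
    ///
    /// // A relative object-store path is expanded back to a full URI
    /// let uri = catalog.reconstruct_full_uri("data/quotes/EURUSD.SIM/file.parquet");
    /// println!("{uri}");
    /// # Ok::<(), anyhow::Error>(())
    /// ```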
    #[must_use]
    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
        if path_str.contains("://") {
            return path_str.to_string();
        }

        // Check if this is a remote URI scheme that needs reconstruction
        if self.is_remote_uri() {
            let path = self.path_under_base(path_str);
            if let Ok(uri) = remote_full_uri(&self.original_uri, &path) {
                return uri;
            }
        }

        // For local paths, extract the directory from the original URI
        if self.original_uri.starts_with("file://") {
            // Extract the path from the file:// URI
            if let Ok(url) = url::Url::parse(&self.original_uri)
                && let Ok(base_path) = url.to_file_path()
            {
                // Use a platform-appropriate path separator for display,
                // but object store paths always use forward slashes
                let base_str = base_path.to_string_lossy();
                return self.join_paths(&base_str, path_str);
            }
        }

        // For local paths without a file:// prefix, use the original URI as the base
        if self.base_path.is_empty() {
            // If base_path is empty and not a file URI, try using original_uri as the base
            if self.original_uri.contains("://") {
                // Fallback: return the path as-is
                path_str.to_string()
            } else {
                self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
            }
        } else {
            let base = self.base_path.trim_end_matches('/');
            self.join_paths(base, path_str)
        }
    }

    /// Helper method to join paths using forward slashes (object store convention).
    #[must_use]
    fn join_paths(&self, base: &str, path: &str) -> String {
        make_object_store_path(base, &[path])
    }

    /// Resolves a path for use with DataFusion (avoiding Windows path doubling for `file://`).
    ///
    /// Returns the path as-is if it is already a full URI or absolute; otherwise builds
    /// `file://` base + path for local catalogs, or uses `reconstruct_full_uri` for remote.
    #[must_use]
    fn resolve_path_for_datafusion(&self, path: &str) -> String {
        if path.contains("://") {
            return path.to_string();
        }

        if path.starts_with('/') {
            return path.to_string();
        }

        if self.original_uri.starts_with("file://") {
            let base = self.original_uri.trim_end_matches('/');
            let path_trimmed = path.trim_end_matches('/');
            return format!("{base}/{path_trimmed}");
        }
        self.reconstruct_full_uri(path)
    }

    /// Like `resolve_path_for_datafusion` but ensures the result ends with a trailing slash.
    #[must_use]
    fn resolve_directory_for_datafusion(&self, directory: &str) -> String {
        let mut resolved = self.resolve_path_for_datafusion(directory);
        if !resolved.ends_with('/') {
            resolved.push('/');
        }
        resolved
    }

    /// Returns the path string to push onto the `query_files` result list: relative for `file://`,
    /// full URI for remote (so callers can pass it to `resolve_path_for_datafusion` later).
    #[must_use]
    fn path_for_query_list(&self, path: &str) -> String {
        if self.original_uri.starts_with("file://") {
            path.to_string()
        } else {
            self.reconstruct_full_uri(path)
        }
    }

    /// Returns the native path string for the catalog root (for `std::fs`). Only valid when
    /// `!is_remote_uri()`; uses parquet's `file_uri_to_native_path` for `file://` URIs.
    #[must_use]
    fn native_base_path_string(&self) -> String {
        if self.original_uri.starts_with("file://") {
            crate::parquet::file_uri_to_native_path(&self.original_uri)
        } else {
            self.original_uri.clone()
        }
    }

    /// Helper method to check if the original URI uses a remote object store scheme.
    #[must_use]
    pub fn is_remote_uri(&self) -> bool {
        self.original_uri
            .split_once("://")
            .is_some_and(|(scheme, _)| is_remote_uri_scheme(scheme))
    }

    /// Executes a query against the catalog to retrieve market data of a specific type.
    ///
    /// This is the primary method for querying data from the catalog. It registers the appropriate
    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
    /// custom SQL WHERE clauses.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
    ///
    /// # Parameters
    ///
    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings (e.g., "EUR/USD.SIM")
    ///   or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If `None`, queries all identifiers.
    ///   For bars, partial matching is supported (e.g., "EUR/USD.SIM" will match "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
    /// - `files`: Optional list of specific files to query. If provided, skips file discovery.
    /// - `optimize_file_loading`: If `true` (default), registers entire directories with DataFusion,
    ///   which is more efficient for managing many files. If `false`, registers each file individually
    ///   (needed for operations like consolidation where precise file control is required).
    ///
    /// # Returns
    ///
    /// Returns a [`QueryResult`] containing the query execution context and data.
    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store registration fails for remote URIs.
    /// - File discovery fails.
    /// - DataFusion query execution fails.
    /// - Data deserialization fails.
    ///
    /// # Performance Notes
    ///
    /// - Files are automatically filtered by timestamp ranges before querying.
    /// - DataFusion optimizes queries across multiple Parquet files.
    /// - Use specific instrument IDs and time ranges to improve performance.
    /// - WHERE clauses are pushed down to the Parquet reader when possible.
    /// - Directory-based registration (`optimize_file_loading=true`) is more efficient for queries
    ///   with many files, as it reduces the number of table registrations.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Query all quote data (uses directory-based registration by default)
    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None, true)?;
    /// let quotes = result.collect();
    ///
    /// // Query specific instruments within a time range
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EUR/USD.SIM".to_string(), "GBP/USD.SIM".to_string()]),
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    ///
    /// // Query with a custom WHERE clause and file-based registration
    /// let result = catalog.query::<QuoteTick>(
    ///     Some(vec!["EUR/USD.SIM".to_string()]),
    ///     None,
    ///     None,
    ///     Some("bid_price > 1.2000"),
    ///     None,
    ///     false, // use file-based registration for precise control
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
1543 pub fn query<T>(
1544 &mut self,
1545 identifiers: Option<Vec<String>>,
1546 start: Option<UnixNanos>,
1547 end: Option<UnixNanos>,
1548 where_clause: Option<&str>,
1549 files: Option<Vec<String>>,
1550 optimize_file_loading: bool,
1551 ) -> anyhow::Result<QueryResult>
1552 where
1553 T: DecodeDataFromRecordBatch + CatalogPathPrefix,
1554 {
1555 // Register the object store with the session for remote URIs only.
1556 // For local file:// we do not register: we pass full file URLs to register_parquet
1557 // so DataFusion's default file provider handles them (avoids path doubling on Windows
1558 // where a registered store would receive a path that gets prefixed again).
1559 self.register_remote_object_store()?;
1560
1561 let files_list = if let Some(files) = files {
1562 files
1563 } else {
1564 self.query_files(T::path_prefix(), identifiers, start, end)?
1565 };
1566
1567 if optimize_file_loading {
1568 // Use directory-based registration for efficiency. DataFusion handles
1569 // reading all files in each directory, which is more memory-efficient
1570 // than registering many individual file tables.
1571 let directories: HashSet<String> = files_list
1572 .iter()
1573 .filter_map(|file_uri| {
1574 // Extract directory path (everything except the filename)
1575 let path = Path::new(file_uri);
1576 path.parent().map(|p| p.to_string_lossy().to_string())
1577 })
1578 .collect();
1579
1580 for directory in directories {
1581 // Extract identifier from directory path (last component).
1582 // `split('/')` always yields at least one element, so the
1583 // "unknown" fallback below is purely defensive.
1584 let identifier = directory
1585 .rsplit('/')
1586 .next()
1587 .map_or_else(|| "unknown".to_string(), str::to_string);
1588 let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1589
1590 // Create table name from path_prefix and identifier (no filename component)
1591 let table_name = format!("{}_{}", T::path_prefix(), safe_sql_identifier);
1592 let query = build_query(&table_name, start, end, where_clause);
1593
1594 let resolved_path = self.resolve_directory_for_datafusion(&directory);
1595
1596 self.session
1597 .add_file::<T>(&table_name, &resolved_path, Some(&query), None)?;
1598 }
1599 } else {
1600 // Register files individually (for operations requiring precise file control)
1601 for file_uri in &files_list {
1602 // Extract identifier from file path and filename to create meaningful table names
1603 let identifier = extract_identifier_from_path(file_uri);
1604 let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1605 let safe_filename = extract_sql_safe_filename(file_uri);
1606
1607 // Create table name from path_prefix, identifier, and filename
1608 let table_name = format!(
1609 "{}_{}_{}",
1610 T::path_prefix(),
1611 safe_sql_identifier,
1612 safe_filename
1613 );
1614 let query = build_query(&table_name, start, end, where_clause);
1615
1616 let resolved_path = self.resolve_path_for_datafusion(file_uri);
1617 self.session
1618 .add_file::<T>(&table_name, &resolved_path, Some(&query), None)?;
1619 }
1620 }
1621
1622 Ok(self.session.get_query_result())
1623 }
1624
1625 /// Queries typed data from the catalog and returns results as a strongly-typed vector.
1626 ///
1627 /// This is a convenience method that wraps the generic `query` method and automatically
1628 /// collects and converts the results into a vector of the specific data type. It handles
1629 /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
1630 ///
1631 /// # Type Parameters
1632 ///
1633 /// - `T`: The specific data type to query and return. Must implement required traits for
1634 /// deserialization, cataloging, and conversion from the [`Data`] enum.
1635 ///
1636 /// # Parameters
1637 ///
1638 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings (e.g., "EUR/USD.SIM")
1639 /// or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If `None`, queries all identifiers.
1640 /// For bars, partial matching is supported (e.g., "EUR/USD.SIM" will match "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1641 /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
1642 /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
1643 /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
1644 ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
/// - `files`: Optional list of specific files to query. If `None`, files are discovered
///   automatically from the catalog directory structure.
/// - `optimize_file_loading`: If `true`, uses directory-based registration (more efficient
///   across many files); if `false`, registers each file individually for precise control.
1645 ///
1646 /// # Returns
1647 ///
1648 /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
1649 /// empty if no data matches the query criteria.
1650 ///
1651 /// # Errors
1652 ///
1653 /// Returns an error if:
1654 /// - The underlying query execution fails.
1655 /// - Data type conversion fails.
1656 /// - Object store access fails.
1657 /// - Invalid WHERE clause syntax is provided.
1658 ///
1659 /// # Performance Considerations
1660 ///
1661 /// - Use specific instrument IDs and time ranges to minimize data scanning.
1662 /// - WHERE clauses are pushed down to Parquet readers when possible.
1663 /// - Results are automatically sorted by timestamp during collection.
1664 /// - Memory usage scales with the amount of data returned.
1665 ///
1666 /// # Examples
1667 ///
1668 /// ```rust,no_run
1669 /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
1670 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1671 /// use nautilus_core::UnixNanos;
1672 ///
1673 /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1674 ///
1675 /// // Query all quotes for a specific instrument
1676 /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
1677 /// Some(vec!["EUR/USD.SIM".to_string()]),
1678 /// None,
1679 /// None,
1680 /// None,
1681 /// None,
1682 /// true
1683 /// )?;
1684 ///
1685 /// // Query trades within a specific time range
1686 /// let trades: Vec<TradeTick> = catalog.query_typed_data(
1687 /// Some(vec!["BTC/USD.SIM".to_string()]),
1688 /// Some(UnixNanos::from(1609459200000000000)),
1689 /// Some(UnixNanos::from(1609545600000000000)),
1690 /// None,
1691 /// None,
1692 /// true
1693 /// )?;
1694 ///
1695 /// // Query bars with volume filter (using instrument_id - partial match for bar_type)
1696 /// let bars: Vec<Bar> = catalog.query_typed_data(
1697 /// Some(vec!["AAPL.NASDAQ".to_string()]),
1698 /// None,
1699 /// None,
1700 /// Some("volume > 1000000"),
1701 /// None,
1702 /// true
1703 /// )?;
1704 ///
1705 /// // Query bars with specific bar_type
1706 /// let bars: Vec<Bar> = catalog.query_typed_data(
1707 /// Some(vec!["AAPL.NASDAQ-1-MINUTE-LAST-EXTERNAL".to_string()]),
1708 /// None,
1709 /// None,
1710 /// None,
1711 /// None,
1712 /// true
1713 /// )?;
1714 ///
1715 /// // Query multiple instruments with price filter
1716 /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
1717 /// Some(vec!["EUR/USD.SIM".to_string(), "GBP/USD.SIM".to_string()]),
1718 /// None,
1719 /// None,
1720 /// Some("bid_price > 1.2000 AND ask_price < 1.3000"),
1721 /// None,
1722 /// true
1723 /// )?;
1724 /// # Ok::<(), anyhow::Error>(())
1725 /// ```
1726 pub fn query_typed_data<T>(
1727 &mut self,
1728 identifiers: Option<Vec<String>>,
1729 start: Option<UnixNanos>,
1730 end: Option<UnixNanos>,
1731 where_clause: Option<&str>,
1732 files: Option<Vec<String>>,
1733 optimize_file_loading: bool,
1734 ) -> anyhow::Result<Vec<T>>
1735 where
1736 T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
1737 {
1738 // Reset session to allow repeated queries (streams are consumed on each query)
1739 self.reset_session();
1740
1741 let query_result = self.query::<T>(
1742 identifiers,
1743 start,
1744 end,
1745 where_clause,
1746 files,
1747 optimize_file_loading,
1748 )?;
1749 let all_data = query_result.collect();
1750
1751 // Convert Data enum variants to specific type T using to_variant
1752 Ok(to_variant::<T>(all_data))
1753 }
1754
1755 /// Queries typed records that are not represented by the [`Data`] enum.
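///
/// Unlike [`Self::query_typed_data`], results are decoded directly into `T` from record
/// batches (via [`DecodeTypedFromRecordBatch`]) rather than passing through the [`Data`]
/// enum, and are sorted by `ts_init` if not already monotonically increasing.
///
/// # Errors
///
/// Returns an error if file discovery, query execution, or record batch decoding fails.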
1756 pub fn query_typed<T>(
1757 &mut self,
1758 identifiers: Option<Vec<String>>,
1759 start: Option<UnixNanos>,
1760 end: Option<UnixNanos>,
1761 where_clause: Option<&str>,
1762 files: Option<Vec<String>>,
1763 optimize_file_loading: bool,
1764 ) -> anyhow::Result<Vec<T>>
1765 where
1766 T: DecodeTypedFromRecordBatch + CatalogPathPrefix + HasTsInit,
1767 {
1768 self.reset_session();
1769
1770 self.register_remote_object_store()?;
1771
1772 let files_list = if let Some(files) = files {
1773 files
1774 } else {
1775 self.query_files(T::path_prefix(), identifiers, start, end)?
1776 };
1777
1778 let mut all_records = Vec::new();
1779
1780 if optimize_file_loading {
1781 let directories: HashSet<String> = files_list
1782 .iter()
1783 .filter_map(|file_uri| {
1784 Path::new(file_uri)
1785 .parent()
1786 .map(|path| path.to_string_lossy().to_string())
1787 })
1788 .collect();
1789
1790 for directory in directories {
1791 // As in `query`: take the last path component, with a
1792 // defensive "unknown" fallback for the empty case.
1793 let identifier = directory
1794 .rsplit('/')
1795 .next()
1796 .map_or_else(|| "unknown".to_string(), str::to_string);
1797 let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1798 let table_name = format!("{}_{}", T::path_prefix(), safe_sql_identifier);
1799 let query = build_query(&table_name, start, end, where_clause);
1800 let resolved_path = self.resolve_directory_for_datafusion(&directory);
1801 let batches = self.session.collect_query_batches(
1802 &table_name,
1803 &resolved_path,
1804 Some(&query),
1805 )?;
1806
1807 all_records.extend(self.convert_record_batches_to_typed::<T>(batches)?);
1808 }
1809 } else {
1810 for file_uri in &files_list {
1811 let identifier = extract_identifier_from_path(file_uri);
1812 let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1813 let safe_filename = extract_sql_safe_filename(file_uri);
1814 let table_name = format!(
1815 "{}_{}_{}",
1816 T::path_prefix(),
1817 safe_sql_identifier,
1818 safe_filename
1819 );
1820 let query = build_query(&table_name, start, end, where_clause);
1821 let resolved_path = self.resolve_path_for_datafusion(file_uri);
1822 let batches = self.session.collect_query_batches(
1823 &table_name,
1824 &resolved_path,
1825 Some(&query),
1826 )?;
1827
1828 all_records.extend(self.convert_record_batches_to_typed::<T>(batches)?);
1829 }
1830 }
1831
1832 if !is_monotonically_increasing_by_init(&all_records) {
1833 all_records.sort_by_key(|record| record.ts_init());
1834 }
1835
1836 Ok(all_records)
1837 }
1838
1839 /// Queries custom data dynamically by type name.
1840 ///
1841 /// This method allows querying custom data types without compile-time knowledge of the type.
1842 /// It uses dynamic schema decoding based on the type name stored in metadata.
1843 ///
1844 /// # Parameters
1845 ///
1846 /// - `type_name`: The name of the custom data type to query.
1847 /// - `identifiers`: Optional list of instrument identifiers to filter by.
1848 /// - `start`: Optional start timestamp for filtering.
1849 /// - `end`: Optional end timestamp for filtering.
1850 /// - `where_clause`: Optional SQL WHERE clause for additional filtering.
1851 /// - `files`: Optional list of specific files to query.
1852 /// - `_optimize_file_loading`: Whether to optimize file loading (currently unused).
1853 ///
1854 /// # Returns
1855 ///
1856 /// Returns a vector of `Data` enum variants containing the custom data.
1857 ///
1858 /// # Errors
1859 ///
1860 /// Returns an error if:
1861 /// - File discovery fails.
1862 /// - Data decoding fails.
1863 /// - Query execution fails.
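///
/// # Examples
///
/// A minimal sketch; `"MyCustomData"` stands in for whatever type name was registered
/// via `ensure_custom_data_registered`:
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// # let mut catalog: ParquetDataCatalog = unimplemented!();
/// let data = catalog.query_custom_data_dynamic(
///     "MyCustomData", // hypothetical registered type name
///     None,
///     None,
///     None,
///     None,
///     None,
///     true,
/// )?;
/// # Ok::<(), anyhow::Error>(())
/// ```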
1864 #[expect(clippy::too_many_arguments)]
1865 pub fn query_custom_data_dynamic(
1866 &mut self,
1867 type_name: &str,
1868 identifiers: Option<&[String]>,
1869 start: Option<UnixNanos>,
1870 end: Option<UnixNanos>,
1871 where_clause: Option<&str>,
1872 files: Option<Vec<String>>,
1873 _optimize_file_loading: bool,
1874 ) -> anyhow::Result<Vec<Data>> {
1875 self.reset_session();
1876
1877 self.register_remote_object_store()?;
1878
1879 let path_prefix = format!("custom/{type_name}");
1880
1881 let files = if let Some(f) = files {
1882 f.into_iter()
1883 .map(|p| self.to_object_path(&p).map(|op| op.to_string()))
1884 .collect::<anyhow::Result<Vec<_>>>()?
1885 } else {
1886 self.list_parquet_files_with_criteria(&path_prefix, identifiers, start, end)?
1887 };
1888
1889 if files.is_empty() {
1890 return Ok(Vec::new());
1891 }
1892
1893 let table_name = "custom_data_table";
1894
1895 // Use CustomDataDecoder for all custom data. Pass type_name so decode can look up
1896 // the type when Parquet/DataFusion does not preserve schema metadata. Callers must
1897 // ensure Rust custom types are registered via ensure_custom_data_registered::<T>().
1898 for file in files {
1899 let resolved_path = self.resolve_path_for_datafusion(&file);
1900 let sql_query = build_query(table_name, start, end, where_clause);
1901
1902 self.session
1903 .add_file::<CustomDataDecoder>(
1904 table_name,
1905 &resolved_path,
1906 Some(&sql_query),
1907 Some(type_name),
1908 )
1909 .map_err(|e| anyhow::anyhow!(e.to_string()))?;
1910 }
1911
1912 let query_result = self.session.get_query_result();
1913 Ok(query_result.collect())
1914 }
1915
1916 /// Queries all Parquet files for a specific data type and optional instrument IDs.
1917 ///
1918 /// This method finds all Parquet files that match the specified criteria and returns
1919 /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
1920 /// and timestamp range (if provided).
1921 ///
1922 /// # Parameters
1923 ///
1924 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1925 /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
1926 /// (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1927 /// For bars, partial matching is supported.
1928 /// - `start`: Optional start timestamp to filter files by their time range.
1929 /// - `end`: Optional end timestamp to filter files by their time range.
1930 ///
1931 /// # Returns
1932 ///
1933 /// Returns a vector of file URI strings that match the query criteria,
1934 /// or an error if the query fails.
1935 ///
1936 /// # Errors
1937 ///
1938 /// Returns an error if:
1939 /// - The directory path cannot be constructed.
1940 /// - Object store listing operations fail.
1941 /// - URI reconstruction fails.
1942 ///
1943 /// # Examples
1944 ///
1945 /// ```rust,no_run
1946 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1947 /// use nautilus_core::UnixNanos;
1948 ///
1949 /// let catalog = ParquetDataCatalog::new(/* ... */);
1950 ///
1951 /// // Query all quote files
1952 /// let files = catalog.query_files("quotes", None, None, None)?;
1953 ///
1954 /// // Query trade files for specific instruments within a time range
1955 /// let files = catalog.query_files(
1956 /// "trades",
1957 /// Some(vec!["BTC/USD.SIM".to_string(), "ETH/USD.SIM".to_string()]),
1958 /// Some(UnixNanos::from(1609459200000000000)),
1959 /// Some(UnixNanos::from(1609545600000000000))
1960 /// )?;
1961 /// # Ok::<(), anyhow::Error>(())
1962 /// ```
1963 pub fn query_files(
1964 &self,
1965 data_cls: &str,
1966 identifiers: Option<Vec<String>>,
1967 start: Option<UnixNanos>,
1968 end: Option<UnixNanos>,
1969 ) -> anyhow::Result<Vec<String>> {
1970 let mut files = Vec::new();
1971
1972 let start_u64 = start.map(|s| s.as_u64());
1973 let end_u64 = end.map(|e| e.as_u64());
1974
1975 let base_dir = self.make_path(data_cls, None)?;
1976
1977 // Use recursive listing to match Python's glob behavior
1978 let list_result = self.execute_async(async {
1979 let prefix = ObjectPath::from(format!("{base_dir}/"));
1980 let mut stream = self.object_store.list(Some(&prefix));
1981 let mut objects = Vec::new();
1982 while let Some(object) = stream.next().await {
1983 objects.push(object?);
1984 }
1985 Ok::<Vec<_>, anyhow::Error>(objects)
1986 })?;
1987
1988 let mut file_paths: Vec<String> = list_result
1989 .into_iter()
1990 .filter_map(|object| {
1991 let path_str = object.location.to_string();
1992 if path_str.ends_with(".parquet") {
1993 Some(path_str)
1994 } else {
1995 None
1996 }
1997 })
1998 .collect();
1999
2000 // Apply identifier filtering if provided
2001 if let Some(identifiers) = identifiers {
2002 let safe_identifiers: Vec<String> = identifiers
2003 .iter()
2004 .map(|id| urisafe_instrument_id(id))
2005 .collect();
2006
2007 // Exact match by default for instrument_ids or bar_types
2008 let exact_match_file_paths: Vec<String> = file_paths
2009 .iter()
2010 .filter(|file_path| {
2011 // Extract the directory name (second to last path component)
2012 let path_parts: Vec<&str> = file_path.split('/').collect();
2013 if path_parts.len() >= 2 {
2014 let dir_name = path_parts[path_parts.len() - 2];
2015 safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
2016 } else {
2017 false
2018 }
2019 })
2020 .cloned()
2021 .collect();
2022
2023 if exact_match_file_paths.is_empty() && data_cls == "bars" {
2024 file_paths.retain(|file_path| {
2025 let path_parts: Vec<&str> = file_path.split('/').collect();
2026 if path_parts.len() >= 2 {
2027 let dir_name = path_parts[path_parts.len() - 2];
2028 if let Some(bar_instrument_id) = extract_bar_type_instrument_id(dir_name) {
2029 safe_identifiers.iter().any(|id| id == bar_instrument_id)
2030 } else {
2031 false
2032 }
2033 } else {
2034 false
2035 }
2036 });
2037 } else {
2038 file_paths = exact_match_file_paths;
2039 }
2040 }
2041
2042 // Apply timestamp filtering
2043 file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
2044
2045 for file_path in file_paths {
2046 files.push(self.path_for_query_list(&file_path));
2047 }
2048
2049 Ok(files)
2050 }
2051
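/// Queries quote tick data for the specified instrument(s) and time range.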
2052 pub fn quote_ticks(
2053 &mut self,
2054 instrument_ids: Option<Vec<String>>,
2055 start: Option<UnixNanos>,
2056 end: Option<UnixNanos>,
2057 ) -> anyhow::Result<Vec<QuoteTick>> {
2058 self.query_typed_data::<QuoteTick>(instrument_ids, start, end, None, None, true)
2059 }
2060
2061 /// Queries trade tick data for the specified instrument(s) and time range.
2062 pub fn trade_ticks(
2063 &mut self,
2064 instrument_ids: Option<Vec<String>>,
2065 start: Option<UnixNanos>,
2066 end: Option<UnixNanos>,
2067 ) -> anyhow::Result<Vec<TradeTick>> {
2068 self.query_typed_data::<TradeTick>(instrument_ids, start, end, None, None, true)
2069 }
2070
2071 /// Queries bar data for the specified instrument(s) and time range.
2072 pub fn bars(
2073 &mut self,
2074 instrument_ids: Option<Vec<String>>,
2075 start: Option<UnixNanos>,
2076 end: Option<UnixNanos>,
2077 ) -> anyhow::Result<Vec<Bar>> {
2078 self.query_typed_data::<Bar>(instrument_ids, start, end, None, None, true)
2079 }
2080
2081 /// Queries order book delta data for the specified instrument(s) and time range.
2082 pub fn order_book_deltas(
2083 &mut self,
2084 instrument_ids: Option<Vec<String>>,
2085 start: Option<UnixNanos>,
2086 end: Option<UnixNanos>,
2087 ) -> anyhow::Result<Vec<OrderBookDelta>> {
2088 self.query_typed_data::<OrderBookDelta>(instrument_ids, start, end, None, None, true)
2089 }
2090
2091 /// Queries order book depth (10-level snapshot) data for the specified instrument(s) and time range.
2092 pub fn order_book_depth10(
2093 &mut self,
2094 instrument_ids: Option<Vec<String>>,
2095 start: Option<UnixNanos>,
2096 end: Option<UnixNanos>,
2097 ) -> anyhow::Result<Vec<OrderBookDepth10>> {
2098 self.query_typed_data::<OrderBookDepth10>(instrument_ids, start, end, None, None, true)
2099 }
2100
2101 /// Queries instrument close data for the specified instrument(s) and time range.
2102 pub fn instrument_closes(
2103 &mut self,
2104 instrument_ids: Option<Vec<String>>,
2105 start: Option<UnixNanos>,
2106 end: Option<UnixNanos>,
2107 ) -> anyhow::Result<Vec<InstrumentClose>> {
2108 self.query_typed_data::<InstrumentClose>(instrument_ids, start, end, None, None, true)
2109 }
2110
2111 /// Queries instrument definitions for the specified instrument(s).
///
/// The `start` and `end` parameters are currently unused.
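///
/// # Examples
///
/// A minimal sketch (the instrument ID is illustrative):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// # let catalog: ParquetDataCatalog = unimplemented!();
/// let instruments = catalog.instruments(Some(&["EUR/USD.SIM".to_string()]), None, None)?;
/// # Ok::<(), anyhow::Error>(())
/// ```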
2112 pub fn instruments(
2113 &self,
2114 instrument_ids: Option<&[String]>,
2115 _start: Option<UnixNanos>,
2116 _end: Option<UnixNanos>,
2117 ) -> anyhow::Result<Vec<InstrumentAny>> {
2118 self.query_instruments(instrument_ids)
2119 }
2120
2121 /// Retrieves a list of file paths for a given data type.
2122 ///
2123 /// This method constructs a path pattern to find all Parquet files
2124 /// associated with the specified data type in the catalog's directory structure.
2125 ///
2126 /// # Parameters
2127 ///
2128 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades", "bars").
2129 ///
2130 /// # Returns
2131 ///
2132 /// Returns a vector of file paths matching the data type, or an error if the operation fails.
2133 ///
2134 /// # Errors
2135 ///
2136 /// Returns an error if:
2137 /// - Object store listing operations fail.
2138 /// - Directory access is denied.
2139 ///
2140 /// # Examples
2141 ///
2142 /// ```rust,no_run
2143 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2144 ///
2145 /// let catalog = ParquetDataCatalog::new(/* ... */);
2146 /// let files = catalog.get_file_list_from_data_cls("quotes")?;
2147 ///
2148 /// for file in files {
2149 /// println!("Found file: {}", file);
2150 /// }
2151 /// # Ok::<(), anyhow::Error>(())
2152 /// ```
2153 pub fn get_file_list_from_data_cls(&self, data_cls: &str) -> anyhow::Result<Vec<String>> {
2154 let base_dir = self.make_path(data_cls, None)?;
2155
2156 let list_result = self.execute_async(async {
2157 let prefix = ObjectPath::from(format!("{base_dir}/"));
2158 let mut stream = self.object_store.list(Some(&prefix));
2159 let mut objects = Vec::new();
2160 while let Some(object) = stream.next().await {
2161 objects.push(object?);
2162 }
2163 Ok::<Vec<_>, anyhow::Error>(objects)
2164 })?;
2165
2166 let file_paths: Vec<String> = list_result
2167 .into_iter()
2168 .filter_map(|object| {
2169 let path_str = object.location.to_string();
2170 if path_str.ends_with(".parquet") {
2171 Some(path_str)
2172 } else {
2173 None
2174 }
2175 })
2176 .collect();
2177
2178 Ok(file_paths)
2179 }
2180
2181 /// Filters a list of file paths based on identifiers and time range.
2182 ///
2183 /// This method filters the provided file paths by:
2184 /// 1. Matching identifiers (exact match for instruments, prefix match for bars)
2185 /// 2. Intersecting with the specified time range
2186 ///
2187 /// # Parameters
2188 ///
2189 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades", "bars").
2190 /// - `file_paths`: List of file paths to filter.
2191 /// - `identifiers`: Optional list of identifiers to match against file paths.
2192 /// - `start`: Optional start timestamp for filtering.
2193 /// - `end`: Optional end timestamp for filtering.
2194 ///
2195 /// # Returns
2196 ///
2197 /// Returns a filtered vector of file paths that match the criteria.
2198 ///
2199 /// # Notes
2200 ///
2201 /// For Bar data types, if exact identifier matching fails, the function attempts
2202 /// partial matching by checking if the file's identifier starts with the provided identifier
2203 /// followed by a dash (to match bar type patterns).
2204 ///
2205 /// # Examples
2206 ///
2207 /// ```rust,no_run
2208 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2209 /// use nautilus_core::UnixNanos;
2210 ///
2211 /// let catalog = ParquetDataCatalog::new(/* ... */);
2212 /// let all_files = catalog.get_file_list_from_data_cls("quotes")?;
2213 ///
2214 /// let filtered = catalog.filter_files(
2215 /// "quotes",
2216 /// all_files,
2217 /// Some(vec!["EUR/USD.SIM".to_string()]),
2218 /// Some(UnixNanos::from(1609459200000000000)),
2219 /// Some(UnixNanos::from(1609545600000000000))
2220 /// )?;
2221 /// # Ok::<(), anyhow::Error>(())
2222 /// ```
2223 pub fn filter_files(
2224 &self,
2225 data_cls: &str,
2226 file_paths: Vec<String>,
2227 identifiers: Option<Vec<String>>,
2228 start: Option<UnixNanos>,
2229 end: Option<UnixNanos>,
2230 ) -> anyhow::Result<Vec<String>> {
2231 let mut filtered_paths = file_paths;
2232
2233 // Apply identifier filtering if provided
2234 if let Some(identifiers) = identifiers {
2235 let safe_identifiers: Vec<String> = identifiers
2236 .iter()
2237 .map(|id| urisafe_instrument_id(id))
2238 .collect();
2239
2240 // Extract directory names from file paths
2241 let file_safe_identifiers: Vec<String> = filtered_paths
2242 .iter()
2243 .map(|file_path| {
2244 let path_parts: Vec<&str> = file_path.split('/').collect();
2245 if path_parts.len() >= 2 {
2246 path_parts[path_parts.len() - 2].to_string()
2247 } else {
2248 String::new()
2249 }
2250 })
2251 .collect();
2252
2253 // Exact match by default for instrument_ids or bar_types
2254 let exact_match_file_paths: Vec<String> = filtered_paths
2255 .iter()
2256 .enumerate()
2257 .filter_map(|(i, file_path)| {
2258 let dir_name = &file_safe_identifiers[i];
2259 if safe_identifiers.iter().any(|safe_id| safe_id == dir_name) {
2260 Some(file_path.clone())
2261 } else {
2262 None
2263 }
2264 })
2265 .collect();
2266
2267 if exact_match_file_paths.is_empty() && data_cls == "bars" {
2268 // Partial match of instrument_ids in bar_types for bars
2269 filtered_paths.retain(|file_path| {
2270 let path_parts: Vec<&str> = file_path.split('/').collect();
2271 if path_parts.len() >= 2 {
2272 let dir_name = path_parts[path_parts.len() - 2];
2273 safe_identifiers
2274 .iter()
2275 .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
2276 } else {
2277 false
2278 }
2279 });
2280 } else {
2281 filtered_paths = exact_match_file_paths;
2282 }
2283 }
2284
2285 // Apply timestamp filtering
2286 let start_u64 = start.map(|s| s.as_u64());
2287 let end_u64 = end.map(|e| e.as_u64());
2288 filtered_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
2289
2290 Ok(filtered_paths)
2291 }
2292
2293 /// Finds the missing time intervals for a specific data type and instrument ID.
2294 ///
2295 /// This method compares a requested time range against the existing data coverage
2296 /// and returns the gaps that need to be filled. This is useful for determining
2297 /// what data needs to be fetched or backfilled.
2298 ///
2299 /// # Parameters
2300 ///
2301 /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
2302 /// - `end`: End timestamp of the requested range (Unix nanoseconds).
2303 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2304 /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an
///   instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2305 ///
2306 /// # Returns
2307 ///
2308 /// Returns a vector of (start, end) tuples representing the missing intervals,
2309 /// or an error if the operation fails.
2310 ///
2311 /// # Errors
2312 ///
2313 /// Returns an error if:
2314 /// - The directory path cannot be constructed.
2315 /// - Interval retrieval fails.
2316 /// - Gap calculation fails.
2317 ///
2318 /// # Examples
2319 ///
2320 /// ```rust,no_run
2321 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2322 ///
2323 /// let catalog = ParquetDataCatalog::new(/* ... */);
2324 ///
2325 /// // Find missing intervals for quote data
2326 /// let missing = catalog.get_missing_intervals_for_request(
2327 /// 1609459200000000000, // start
2328 /// 1609545600000000000, // end
2329 /// "quotes",
2330 /// Some("BTCUSD".to_string())
2331 /// )?;
2332 ///
2333 /// for (start, end) in missing {
2334 /// println!("Missing data from {} to {}", start, end);
2335 /// }
2336 /// # Ok::<(), anyhow::Error>(())
2337 /// ```
2338 pub fn get_missing_intervals_for_request(
2339 &self,
2340 start: u64,
2341 end: u64,
2342 data_cls: &str,
2343 identifier: Option<&str>,
2344 ) -> anyhow::Result<Vec<(u64, u64)>> {
2345 let intervals = self.get_intervals(data_cls, identifier)?;
2346
2347 Ok(query_interval_diff(start, end, &intervals))
2348 }
2349
2350 /// Gets the first (earliest) timestamp for a specific data type and identifier.
2351 ///
2352 /// This method finds the earliest timestamp covered by existing data files for
2353 /// the specified data type and identifier. This is useful for determining
2354 /// the oldest data available or for incremental data updates.
2355 ///
2356 /// # Parameters
2357 ///
2358 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2359 /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2360 ///
2361 /// # Returns
2362 ///
2363 /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
2364 /// or an error if the operation fails.
2365 ///
2366 /// # Errors
2367 ///
2368 /// Returns an error if:
2369 /// - The directory path cannot be constructed.
2370 /// - Interval retrieval fails.
2371 ///
2372 /// # Note
2373 ///
2374 /// Unlike the Python implementation, this method does not check subclasses of the
2375 /// data type. The Python version checks `[data_cls, *data_cls.__subclasses__()]` to
2376 /// handle cases where subclasses might use different directory names. Since Rust
2377 /// works with string names rather than types, subclass checking is not possible.
2378 /// In practice, most subclasses map to the same directory name via `class_to_filename`,
2379 /// so this difference is typically not significant.
2380 ///
2381 /// # Examples
2382 ///
2383 /// ```rust,no_run
2384 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2385 ///
2386 /// let catalog = ParquetDataCatalog::new(/* ... */);
2387 ///
2388 /// // Get the first timestamp for quote data
2389 /// if let Some(first_ts) = catalog.query_first_timestamp("quotes", Some("BTCUSD"))? {
2390 /// println!("First quote timestamp: {}", first_ts);
2391 /// } else {
2392 /// println!("No quote data found");
2393 /// }
2394 /// # Ok::<(), anyhow::Error>(())
2395 /// ```
2396 pub fn query_first_timestamp(
2397 &self,
2398 data_cls: &str,
2399 identifier: Option<&str>,
2400 ) -> anyhow::Result<Option<u64>> {
2401 let intervals = self.get_intervals(data_cls, identifier)?;
2402
2403 if intervals.is_empty() {
2404 return Ok(None);
2405 }
2406
2407 Ok(Some(intervals.first().unwrap().0))
2408 }
2409
2410 /// Gets the last (most recent) timestamp for a specific data type and identifier.
2411 ///
2412 /// This method finds the latest timestamp covered by existing data files for
2413 /// the specified data type and identifier. This is useful for determining
2414 /// the most recent data available or for incremental data updates.
2415 ///
2416 /// # Parameters
2417 ///
2418 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2419 /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2420 ///
2421 /// # Returns
2422 ///
2423 /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
2424 /// or an error if the operation fails.
2425 ///
2426 /// # Errors
2427 ///
2428 /// Returns an error if:
2429 /// - The directory path cannot be constructed.
2430 /// - Interval retrieval fails.
2431 ///
2432 /// # Note
2433 ///
2434 /// Unlike the Python implementation, this method does not check subclasses of the
2435 /// data type. The Python version checks `[data_cls, *data_cls.__subclasses__()]` to
2436 /// handle cases where subclasses might use different directory names. Since Rust
2437 /// works with string names rather than types, subclass checking is not possible.
2438 /// In practice, most subclasses map to the same directory name via `class_to_filename`,
2439 /// so this difference is typically not significant.
2440 ///
2441 /// # Examples
2442 ///
2443 /// ```rust,no_run
2444 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2445 ///
2446 /// let catalog = ParquetDataCatalog::new(/* ... */);
2447 ///
2448 /// // Get the last timestamp for quote data
2449 /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD"))? {
2450 /// println!("Last quote timestamp: {}", last_ts);
2451 /// } else {
2452 /// println!("No quote data found");
2453 /// }
2454 /// # Ok::<(), anyhow::Error>(())
2455 /// ```
2456 pub fn query_last_timestamp(
2457 &self,
2458 data_cls: &str,
2459 identifier: Option<&str>,
2460 ) -> anyhow::Result<Option<u64>> {
2461 let intervals = self.get_intervals(data_cls, identifier)?;
2462
2463 if intervals.is_empty() {
2464 return Ok(None);
2465 }
2466
2467 Ok(Some(intervals.last().unwrap().1))
2468 }
2469
2470 /// Gets the time intervals covered by Parquet files for a specific data type and identifier.
2471 ///
2472 /// This method returns all time intervals covered by existing data files for the
2473 /// specified data type and identifier. The intervals are sorted by start time and
2474 /// represent the complete data coverage available.
2475 ///
2476 /// # Parameters
2477 ///
2478 /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2479 /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2480 ///
2481 /// # Returns
2482 ///
2483 /// Returns a vector of (start, end) tuples representing the covered intervals,
2484 /// sorted by start time, or an error if the operation fails.
2485 ///
2486 /// # Errors
2487 ///
2488 /// Returns an error if:
2489 /// - The directory path cannot be constructed.
2490 /// - Directory listing fails.
2491 /// - Filename parsing fails.
2492 ///
2493 /// # Examples
2494 ///
2495 /// ```rust,no_run
2496 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2497 ///
2498 /// let catalog = ParquetDataCatalog::new(/* ... */);
2499 ///
2500 /// // Get all intervals for quote data
2501 /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD"))?;
2502 /// for (start, end) in intervals {
2503 /// println!("Data available from {} to {}", start, end);
2504 /// }
2505 /// # Ok::<(), anyhow::Error>(())
2506 /// ```
2507 pub fn get_intervals(
2508 &self,
2509 data_cls: &str,
2510 identifier: Option<&str>,
2511 ) -> anyhow::Result<Vec<(u64, u64)>> {
2512 let directory = self.make_path(data_cls, identifier)?;
2513 let intervals = self.get_directory_intervals(&directory)?;
2514
2515 if identifier.is_none() {
2516 // `get_directory_intervals` already recursed through every per-identifier
2517 // subdirectory via `object_store.list`, so intervals from different
2518 // identifiers can overlap. Merge overlaps into a disjoint sorted union
2519 // so callers like `query_last_timestamp` see the true max end and
2520 // `consolidate_data_by_period` sees contiguous coverage.
2521 let mut merged: Vec<(u64, u64)> = Vec::new();
2522
2523 for interval in intervals {
2524 if let Some(last) = merged.last_mut()
2525 && interval.0 <= last.1
2526 {
2527 last.1 = last.1.max(interval.1);
2528 continue;
2529 }
2530 merged.push(interval);
2531 }
2532
2533 return Ok(merged);
2534 }
2535
2536 // For bars, fall back to partial matching when the exact directory
2537 // doesn't exist (callers may pass an instrument_id like "EUR/USD.SIM"
2538 // but bars are stored under bar_type dirs like "EURUSD.SIM-1-MINUTE-...")
2539
2540 if !intervals.is_empty() || data_cls != "bars" {
2541 return Ok(intervals);
2542 }
2543
2544 let safe_id = urisafe_instrument_id(identifier.unwrap());
2545
2546 // Use relative path so list_directory_stems doesn't double-prefix
2547 // for remote catalogs (make_path already includes base_path)
2548 let bars_subdir = format!("data/{data_cls}");
2549 let subdirs = self.list_directory_stems(&bars_subdir)?;
2550
2551 let mut all_intervals = Vec::new();
2552
2553 for subdir in &subdirs {
2554 let decoded = urlencoding::decode(subdir).unwrap_or(Cow::Borrowed(subdir));
2555
2556 if extract_bar_type_instrument_id(&decoded) == Some(safe_id.as_str()) {
2557 // Use decoded name to avoid double percent-encoding
2558 // (to_object_path uses Path::from which re-encodes)
2559 let subdir_path = self.make_path(data_cls, Some(&decoded))?;
2560 all_intervals.extend(self.get_directory_intervals(&subdir_path)?);
2561 }
2562 }
2563
2564 all_intervals.sort_by_key(|&(start, _)| start);
2565
2566 // Merge overlapping intervals from different bar types so that
2567 // last().1 reliably gives the maximum end timestamp
2568 let mut merged: Vec<(u64, u64)> = Vec::new();
2569
2570 for interval in all_intervals {
2571 if let Some(last) = merged.last_mut()
2572 && interval.0 <= last.1
2573 {
2574 last.1 = last.1.max(interval.1);
2575 continue;
2576 }
2577 merged.push(interval);
2578 }
2579
2580 Ok(merged)
2581 }
2582
2583 /// Gets the time intervals covered by Parquet files in a specific directory.
2584 ///
2585 /// This method scans a directory for Parquet files and extracts the timestamp ranges
2586 /// from their filenames. It's used internally by other methods to determine data coverage
2587 /// and is essential for interval-based operations like gap detection and consolidation.
2588 ///
2589 /// # Parameters
2590 ///
2591 /// - `directory`: The directory path to scan for Parquet files.
2592 ///
2593 /// # Returns
2594 ///
2595 /// Returns a vector of (start, end) tuples representing the time intervals covered
2596 /// by files in the directory, sorted by start timestamp. Returns an empty vector
2597 /// if the directory doesn't exist or contains no valid Parquet files.
2598 ///
2599 /// # Errors
2600 ///
2601 /// Returns an error if:
2602 /// - Object store listing operations fail.
2603 /// - Directory access is denied.
2604 ///
2605 /// # Notes
2606 ///
2607 /// - Only files with valid timestamp-based filenames are included.
2608 /// - Files with unparsable names are silently ignored.
2609 /// - The method works with both local and remote object stores.
2610 /// - Results are automatically sorted by start timestamp.
2611 ///
2612 /// # Examples
2613 ///
2614 /// ```rust,no_run
2615 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2616 ///
2617 /// let catalog = ParquetDataCatalog::new(/* ... */);
2618 /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
2619 ///
2620 /// for (start, end) in intervals {
2621 /// println!("File covers {} to {}", start, end);
2622 /// }
2623 /// # Ok::<(), anyhow::Error>(())
2624 /// ```
2625 pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
2626 // Use object store for all operations
2627 // Convert directory to object path format (consistent with how files are written)
2628 // For local stores with empty base_path, to_object_path returns path as-is.
2629 // For remote stores, to_object_path preserves or prepends the catalog base path.
2630 let object_dir = self.to_object_path(directory)?;
2631 let list_result = self.execute_async(async {
2632 // Ensure trailing slash for directory listing
2633 let dir_str = format!("{}/", object_dir.as_ref());
2634 let prefix = ObjectPath::from(dir_str);
2635 let mut stream = self.object_store.list(Some(&prefix));
2636 let mut objects = Vec::new();
2637 while let Some(object) = stream.next().await {
2638 objects.push(object?);
2639 }
2640 Ok::<Vec<_>, anyhow::Error>(objects)
2641 })?;
2642
2643 let mut intervals = Vec::new();
2644
2645 for object in list_result {
2646 let path_str = object.location.to_string();
2647 if path_str.ends_with(".parquet")
2648 && let Some(interval) = parse_filename_timestamps(&path_str)
2649 {
2650 intervals.push(interval);
2651 }
2652 }
2653
2654 intervals.sort_by_key(|&(start, _)| start);
2655
2656 Ok(intervals)
2657 }
2658
2659 /// Constructs a directory path for storing data of a specific type and instrument.
2660 ///
2661 /// This method builds the hierarchical directory structure used by the catalog to organize
2662 /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
2663 /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
2664 ///
2665 /// # Parameters
2666 ///
2667 /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
2668 /// - `identifier`: Optional identifier. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If provided, creates a subdirectory for the identifier. If `None`, returns the path to the data type directory.
2669 ///
2670 /// # Returns
2671 ///
2672 /// Returns the constructed directory path as a string, or an error if path construction fails.
2673 ///
2674 /// # Errors
2675 ///
2676 /// Returns an error if:
2677 /// - The instrument ID contains invalid characters that cannot be made URI-safe.
2678 /// - Path construction fails due to system limitations.
2679 ///
2680 /// # Path Structure
2681 ///
2682 /// - Without identifier: `{base_path}/data/{type_name}`.
2683 /// - With identifier: `{base_path}/data/{type_name}/{safe_identifier}`.
2684 /// - If `base_path` is empty: `data/{type_name}[/{safe_identifier}]`.
2685 ///
2686 /// # Examples
2687 ///
2688 /// ```rust,no_run
2689 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2690 ///
2691 /// let catalog = ParquetDataCatalog::new(/* ... */);
2692 ///
2693 /// // Path for all quote data
2694 /// let quotes_path = catalog.make_path("quotes", None)?;
2695 /// // Returns: "/base/path/data/quotes"
2696 ///
2697 /// // Path for specific instrument quotes
2698 /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD"))?;
2699 /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
2700 ///
2701 /// // Path for bar data with complex instrument ID
2702 /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H"))?;
2703 /// // Returns: "/base/path/data/bars/BTCUSD-1H"
2704 /// # Ok::<(), anyhow::Error>(())
2705 /// ```
2706 pub fn make_path(&self, type_name: &str, identifier: Option<&str>) -> anyhow::Result<String> {
2707 let mut components = vec!["data".to_string(), type_name.to_string()];
2708
2709 if let Some(id) = identifier {
2710 let safe_id = urisafe_instrument_id(id);
2711 components.push(safe_id);
2712 }
2713
2714 let path = make_object_store_path_owned(&self.base_path, components);
2715 Ok(path)
2716 }
2717
2718 /// Builds the directory path for custom data: `data/custom/{type_name}[/{identifier segments}]`.
2719 /// Identifier can contain `//` for subdirectories (normalized to `/`); path is safe for writing.
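///
/// # Examples
///
/// A minimal sketch (the type name and identifier are illustrative):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// # let catalog: ParquetDataCatalog = unimplemented!();
/// // With an empty base path this yields "data/custom/signals/momentum"
/// let path = catalog.make_path_custom_data("signals", Some("momentum"))?;
/// # Ok::<(), anyhow::Error>(())
/// ```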
2720 pub fn make_path_custom_data(
2721 &self,
2722 type_name: &str,
2723 identifier: Option<&str>,
2724 ) -> anyhow::Result<String> {
2725 let components = custom_data_path_components(type_name, identifier);
2726 let path = make_object_store_path_owned(&self.base_path, components);
2727 Ok(path)
2728 }
2729
2730 /// Renames a Parquet file by moving it via object store operations.
2731 fn rename_parquet_file(
2732 &self,
2733 directory: &str,
2734 old_start: u64,
2735 old_end: u64,
2736 new_start: u64,
2737 new_end: u64,
2738 ) -> anyhow::Result<()> {
2739 let old_filename =
2740 timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
2741 let old_path = format!("{directory}/{old_filename}");
2742 let old_object_path = self.to_object_path(&old_path)?;
2743
2744 let new_filename =
2745 timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
2746 let new_path = format!("{directory}/{new_filename}");
2747 let new_object_path = self.to_object_path(&new_path)?;
2748
2749 self.move_file(&old_object_path, &new_object_path)
2750 }
2751
2752 /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
2753 ///
2754 /// This method handles the conversion between catalog-relative paths and object store paths,
2755 /// taking into account the catalog's base path configuration. It automatically preserves the
2756 /// base path prefix for remote catalogs and strips it for local catalog paths.
2757 ///
2758 /// # Parameters
2759 ///
2760 /// - `path`: The catalog path string to convert. Can be absolute or relative.
2761 ///
2762 /// # Returns
2763 ///
2764 /// Returns an [`ObjectPath`] suitable for use with object store operations.
2765 ///
2766 /// # Path Handling
2767 ///
2768 /// - If `base_path` is empty, the path is used as-is.
2769 /// - If `base_path` is set for a remote catalog, it's preserved or prepended.
2770 /// - If `base_path` is set for a local catalog, it's stripped from the path if present.
2771 /// - Trailing slashes and backslashes are automatically handled.
2772 /// - The resulting path is relative to the object store root.
2773 /// - All paths are normalized to use forward slashes (object store convention).
2774 ///
2775 /// # Errors
2776 ///
2777 /// Returns an error for remote catalogs when `path` is a full URI whose scheme/host
2778 /// does not match the catalog's own root (cross-bucket misuse). Without this guard
2779 /// the caller could silently write to or read from the wrong bucket.
2780 ///
2781 /// # Examples
2782 ///
2783 /// Local catalog paths (absolute or relative) strip the catalog's base directory:
2784 ///
2785 /// ```rust,no_run
2786 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2787 /// # let catalog: ParquetDataCatalog = unimplemented!();
2788 /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet")?;
2789 /// // ObjectPath("data/quotes/file.parquet")
2790 /// # Ok::<(), anyhow::Error>(())
2791 /// ```
2792 ///
2793 /// Remote catalog paths (relative or full URI) preserve or prepend the base prefix:
2794 ///
2795 /// ```rust,no_run
2796 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2797 /// # let catalog: ParquetDataCatalog = unimplemented!();
2798 /// let object_path = catalog.to_object_path("data/trades/file.parquet")?;
2799 /// // ObjectPath("base/data/trades/file.parquet")
2800 /// # Ok::<(), anyhow::Error>(())
2801 /// ```
2802 pub fn to_object_path(&self, path: &str) -> anyhow::Result<ObjectPath> {
2803 Ok(ObjectPath::from(self.object_store_path(path)?))
2804 }
2805
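/// Registers the catalog's object store with the DataFusion session for remote URIs;
/// local catalogs are intentionally left unregistered (see the comment in `query`).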
2806 fn register_remote_object_store(&mut self) -> anyhow::Result<()> {
2807 if self.is_remote_uri() {
2808 let base_url = remote_store_root_url(&self.original_uri)?;
2809 self.session
2810 .register_object_store(&base_url, self.object_store.clone());
2811 }
2812
2813 Ok(())
2814 }
2815
2816 /// Converts a path string to [`ObjectPath`] using parse (no percent-encoding).
2817 ///
2818 /// Use this for paths that were returned by the object store (e.g. from `list()`),
2819 /// which may already be percent-encoded. Using [`Self::to_object_path`] (which uses
2820 /// `Path::from`) on such paths would double-encode (e.g. `%5E` -> `%255E`).
2821 ///
2822 /// # Errors
2823 ///
2824 /// Returns an error for the same cross-bucket case as [`Self::to_object_path`], or
2825 /// when the resulting string fails [`ObjectPath::parse`].
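///
/// # Examples
///
/// A minimal sketch with an already percent-encoded path as returned by `list()` (the
/// path itself is illustrative):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// # let catalog: ParquetDataCatalog = unimplemented!();
/// let object_path = catalog.to_object_path_parsed("data/quotes/EURUSD%5EFX/file.parquet")?;
/// # Ok::<(), anyhow::Error>(())
/// ```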
2826 pub fn to_object_path_parsed(&self, path: &str) -> anyhow::Result<ObjectPath> {
2827 let to_parse = self.object_store_path(path)?;
2828 ObjectPath::parse(&to_parse).map_err(anyhow::Error::from)
2829 }
2830
2831 fn object_store_path(&self, path: &str) -> anyhow::Result<String> {
2832 let normalized_path = path.replace('\\', "/");
2833
2834 if self.is_remote_uri() {
2835 if normalized_path.contains("://") {
2836 let path_under_root = self.remote_uri_object_path(&normalized_path)?;
2837 return Ok(self.path_under_base(&path_under_root));
2838 }
2839
2840 return Ok(self.path_under_base(&normalized_path));
2841 }
2842
2843 Ok(self.path_without_local_base(&normalized_path))
2844 }
2845
2846 fn remote_uri_object_path(&self, path: &str) -> anyhow::Result<String> {
2847 let path_url = url::Url::parse(path)
2848 .map_err(|e| anyhow::anyhow!("Failed to parse object store URI {path}: {e}"))?;
2849 if !is_remote_uri_scheme(path_url.scheme()) {
2850 anyhow::bail!(
2851 "URI {path} uses non-remote scheme {} for remote catalog at {}",
2852 path_url.scheme(),
2853 self.original_uri,
2854 );
2855 }
2856
2857 let catalog_root = remote_store_root_url(&self.original_uri)?;
2858 let path_root = remote_store_root_url(path)?;
2859 if catalog_root.as_str().trim_end_matches('/') != path_root.as_str().trim_end_matches('/') {
2860 anyhow::bail!(
2861 "Cross-store URI {path} (root {}) does not belong to catalog rooted at {} ({})",
2862 path_root.as_str().trim_end_matches('/'),
2863 self.original_uri,
2864 catalog_root.as_str().trim_end_matches('/'),
2865 );
2866 }
2867
2868 // The URL crate keeps the path component percent-encoded (e.g. `%5E`),
2869 // so preserve that encoding for `ObjectPath::parse` round-trips through
2870 // `object_store::list`/`get`.
2871 Ok(path_url.path().trim_start_matches('/').to_string())
2872 }
2873
2874 fn path_without_local_base(&self, path: &str) -> String {
2875 let base_path = if self.base_path.is_empty() {
2876 self.native_base_path_string()
2877 } else {
2878 self.base_path.clone()
2879 };
2880 let normalized_base = base_path.replace('\\', "/");
2881 let base = normalized_base.trim_end_matches('/');
2882
2883 if base.is_empty() {
2884 path.to_string()
2885 } else if path == base {
2886 String::new()
2887 } else if let Some(without_base) = path.strip_prefix(&format!("{base}/")) {
2888 without_base.to_string()
2889 } else {
2890 path.to_string()
2891 }
2892 }
2893
2894 fn path_under_base(&self, path: &str) -> String {
2895 let normalized_path = path.replace('\\', "/");
2896 let path = normalized_path
2897 .trim_start_matches('/')
2898 .trim_end_matches('/');
2899
2900 if self.base_path.is_empty() {
2901 return path.to_string();
2902 }
2903
2904 let normalized_base = self.base_path.replace('\\', "/");
2905 let base = normalized_base
2906 .trim_start_matches('/')
2907 .trim_end_matches('/');
2908
2909 if base.is_empty() || path == base || path.starts_with(&format!("{base}/")) {
2910 path.to_string()
2911 } else if path.is_empty() {
2912 base.to_string()
2913 } else {
2914 make_object_store_path(base, &[path])
2915 }
2916 }
2917
2918 #[allow(dead_code)]
2919 fn to_file_path(&self, path: &ObjectPath) -> String {
2920 path.to_string()
2921 }
2922
2923 /// Moves a file using the object store rename operation.
///
/// # Errors
///
/// Returns an error if the underlying object store rename fails.
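///
/// # Examples
///
/// A minimal sketch (the paths are illustrative):
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// use object_store::path::Path as ObjectPath;
/// # let catalog: ParquetDataCatalog = unimplemented!();
/// let old_path = ObjectPath::from("data/quotes/old.parquet");
/// let new_path = ObjectPath::from("data/quotes/new.parquet");
/// catalog.move_file(&old_path, &new_path)?;
/// # Ok::<(), anyhow::Error>(())
/// ```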
2924 pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
2925 self.execute_async(async {
2926 self.object_store
2927 .rename(old_path, new_path)
2928 .await
2929 .map_err(anyhow::Error::from)
2930 })
2931 }
2932
2933 /// Executes an async operation by blocking on the shared runtime (via `get_runtime`).
///
/// # Errors
///
/// Propagates any error returned by the provided future.
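///
/// # Examples
///
/// A minimal sketch:
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
/// # let catalog: ParquetDataCatalog = unimplemented!();
/// let answer = catalog.execute_async(async { Ok::<_, anyhow::Error>(42) })?;
/// # Ok::<(), anyhow::Error>(())
/// ```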
2934 pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
2935 where
2936 F: std::future::Future<Output = anyhow::Result<R>>,
2937 {
2938 let rt = get_runtime();
2939 rt.block_on(future)
2940 }
2941
2942 /// Lists directory stems (directory names without path) in a subdirectory.
2943 ///
2944 /// This method scans a subdirectory and returns the names of all immediate
2945 /// subdirectories. It's used to list data types, backtest runs, and live runs.
2946 ///
2947 /// # Parameters
2948 ///
2949 /// - `subdirectory`: The subdirectory path to scan (e.g., "data", "backtest", "live").
2950 ///
2951 /// # Returns
2952 ///
2953 /// Returns a vector of directory names (stems) found in the subdirectory,
2954 /// or an error if the operation fails.
2955 ///
2956 /// # Errors
2957 ///
2958 /// Returns an error if:
2959 /// - Object store listing operations fail.
2960 /// - Directory access is denied.
2961 ///
2962 /// # Examples
2963 ///
2964 /// ```rust,no_run
2965 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2966 ///
2967 /// let catalog = ParquetDataCatalog::new(/* ... */);
2968 ///
2969 /// // List all data types
2970 /// let data_types = catalog.list_directory_stems("data")?;
2971 /// for data_type in data_types {
2972 /// println!("Found data type: {}", data_type);
2973 /// }
2974 /// # Ok::<(), anyhow::Error>(())
2975 /// ```
2976 pub fn list_directory_stems(&self, subdirectory: &str) -> anyhow::Result<Vec<String>> {
2977 // For local filesystem paths, use filesystem operations to detect empty directories
2978 // For remote object stores, we can only list directories that contain files
2979 if !self.is_remote_uri() {
2980 let directory = PathBuf::from(self.native_base_path_string()).join(subdirectory);
2981
2982 // Check if directory exists
2983 if !directory.exists() {
2984 return Ok(Vec::new());
2985 }
2986
2987 // List all entries in the directory
2988 let mut directories = Vec::new();
2989
2990 if let Ok(entries) = std::fs::read_dir(&directory) {
2991 for entry in entries.flatten() {
2992 if let Ok(file_type) = entry.file_type()
2993 && file_type.is_dir()
2994 {
2995 // Use file_name() to get the directory name (not file_stem which removes extension)
2996 if let Some(name) = entry.path().file_name() {
2997 directories.push(name.to_string_lossy().to_string());
2998 }
2999 }
3000 }
3001 }
3002 directories.sort();
3003 return Ok(directories);
3004 }
3005
3006 // For remote URIs, use object store listing (only lists directories with files)
3007 let directory = make_object_store_path(&self.base_path, &[subdirectory]);
3008
3009 let list_result = self.execute_async(async {
3010 let prefix = ObjectPath::from(format!("{directory}/"));
3011 let mut stream = self.object_store.list(Some(&prefix));
3012 let mut directories = Vec::new();
3013 let mut seen_dirs = std::collections::HashSet::new();
3014
3015 while let Some(object) = stream.next().await {
3016 let object = object?;
3017 let path_str = object.location.to_string();
3018
3019 // Extract the immediate subdirectory name
3020 if let Some(relative_path) = path_str.strip_prefix(&format!("{directory}/")) {
3021 let parts: Vec<&str> = relative_path.split('/').collect();
3022 if let Some(first_part) = parts.first()
3023 && !first_part.is_empty()
3024 && !seen_dirs.contains(*first_part)
3025 {
3026 seen_dirs.insert(first_part.to_string());
3027 directories.push(first_part.to_string());
3028 }
3029 }
3030 }
3031
3032 Ok::<Vec<String>, anyhow::Error>(directories)
3033 })?;
3034
3035 Ok(list_result)
3036 }
3037
3038 /// Lists all data types available in the catalog.
3039 ///
3040 /// This method returns the names of all data type directories in the catalog.
3041 /// Data types correspond to different kinds of market data (e.g., "quotes", "trades", "bars").
3042 ///
3043 /// # Returns
3044 ///
3045 /// Returns a vector of data type names, or an error if the operation fails.
3046 ///
3047 /// # Errors
3048 ///
3049 /// Returns an error if:
3050 /// - Object store listing operations fail.
3051 /// - Directory access is denied.
3052 ///
3053 /// # Examples
3054 ///
3055 /// ```rust,no_run
3056 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3057 ///
3058 /// let catalog = ParquetDataCatalog::new(/* ... */);
3059 ///
3060 /// // List all data types
3061 /// let data_types = catalog.list_data_types()?;
3062 /// for data_type in data_types {
3063 /// println!("Available data type: {}", data_type);
3064 /// }
3065 /// # Ok::<(), anyhow::Error>(())
3066 /// ```
3068 pub fn list_data_types(&self) -> anyhow::Result<Vec<String>> {
3069 self.list_directory_stems("data")
3070 }
3071
3072 /// Returns whether a data type is excluded from persistence by the Rust feather writer
/// or catalog (currently no types are excluded).
3073 fn is_excluded_stream_data_type(_name: &str) -> bool {
3074 false
3075 }
3076
3077 /// Lists all backtest run IDs available in the catalog.
3078 ///
3079 /// This method returns the names of all backtest run directories in the catalog.
3080 /// Each backtest run corresponds to a specific backtest execution instance.
3081 ///
3082 /// # Returns
3083 ///
3084 /// Returns a vector of backtest run IDs, or an error if the operation fails.
3085 ///
3086 /// # Errors
3087 ///
3088 /// Returns an error if:
3089 /// - Object store listing operations fail.
3090 /// - Directory access is denied.
3091 ///
3092 /// # Examples
3093 ///
3094 /// ```rust,no_run
3095 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3096 ///
3097 /// let catalog = ParquetDataCatalog::new(/* ... */);
3098 ///
3099 /// // List all backtest runs
3100 /// let runs = catalog.list_backtest_runs()?;
3101 /// for run_id in runs {
3102 /// println!("Backtest run: {}", run_id);
3103 /// }
3104 /// # Ok::<(), anyhow::Error>(())
3105 /// ```
3106 pub fn list_backtest_runs(&self) -> anyhow::Result<Vec<String>> {
3107 self.list_directory_stems("backtest")
3108 }
3109
3110 /// Lists all live run IDs available in the catalog.
3111 ///
3112 /// This method returns the names of all live run directories in the catalog.
3113 /// Each live run corresponds to a specific live trading execution instance.
3114 ///
3115 /// # Returns
3116 ///
3117 /// Returns a vector of live run IDs, or an error if the operation fails.
3118 ///
3119 /// # Errors
3120 ///
3121 /// Returns an error if:
3122 /// - Object store listing operations fail.
3123 /// - Directory access is denied.
3124 ///
3125 /// # Examples
3126 ///
3127 /// ```rust,no_run
3128 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3129 ///
3130 /// let catalog = ParquetDataCatalog::new(/* ... */);
3131 ///
3132 /// // List all live runs
3133 /// let runs = catalog.list_live_runs()?;
3134 /// for run_id in runs {
3135 /// println!("Live run: {}", run_id);
3136 /// }
3137 /// # Ok::<(), anyhow::Error>(())
3138 /// ```
3139 pub fn list_live_runs(&self) -> anyhow::Result<Vec<String>> {
3140 self.list_directory_stems("live")
3141 }
3142
3143 /// Reads data from a live run instance.
3144 ///
3145 /// This method reads all data associated with a specific live run instance
3146 /// from feather files stored in the catalog.
3147 ///
3148 /// # Parameters
3149 ///
3150 /// - `instance_id`: The ID of the live run instance to read.
3151 ///
3152 /// # Returns
3153 ///
3154 /// Returns a vector of `Data` objects from the live run, sorted by timestamp,
3155 /// or an error if the operation fails.
3156 ///
3157 /// # Errors
3158 ///
3159 /// Returns an error if:
3160 /// - The instance ID doesn't exist.
3161 /// - Feather file reading fails.
3162 /// - Data deserialization fails.
3163 ///
3164 /// # Note
3165 ///
3166 /// This method is currently not fully implemented. Feather file reading
3167 /// requires complex deserialization logic that needs to be added.
3168 ///
3169 /// # Examples
3170 ///
3171 /// ```rust,no_run
3172 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3173 ///
3174 /// let catalog = ParquetDataCatalog::new(/* ... */);
3175 ///
3176 /// // Read data from a live run
3177 /// let data = catalog.read_live_run("instance-123")?;
3178 /// for item in data {
3179 /// println!("Data: {:?}", item);
3180 /// }
3181 /// # Ok::<(), anyhow::Error>(())
3182 /// ```
3183 pub fn read_live_run(&self, instance_id: &str) -> anyhow::Result<Vec<Data>> {
3184 self.read_run_data("live", instance_id)
3185 }
3186
3187 /// Reads data from a backtest run instance.
3188 ///
3189 /// This method reads all data associated with a specific backtest run instance
3190 /// from feather files stored in the catalog.
3191 ///
3192 /// # Parameters
3193 ///
3194 /// - `instance_id`: The ID of the backtest run instance to read.
3195 ///
3196 /// # Returns
3197 ///
3198 /// Returns a vector of `Data` objects from the backtest run, sorted by timestamp,
3199 /// or an error if the operation fails.
3200 ///
3201 /// # Errors
3202 ///
3203 /// Returns an error if:
3204 /// - The instance ID doesn't exist.
3205 /// - Feather file reading fails.
3206 /// - Data deserialization fails.
3207 ///
3208 /// # Examples
3209 ///
3210 /// ```rust,no_run
3211 /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3212 ///
3213 /// let catalog = ParquetDataCatalog::new(/* ... */);
3214 ///
3215 /// // Read data from a backtest run
3216 /// let data = catalog.read_backtest("instance-123")?;
3217 /// for item in data {
3218 /// println!("Data: {:?}", item);
3219 /// }
3220 /// # Ok::<(), anyhow::Error>(())
3221 /// ```
3222 pub fn read_backtest(&self, instance_id: &str) -> anyhow::Result<Vec<Data>> {
3223 self.read_run_data("backtest", instance_id)
3224 }
3225
3226 /// Helper function to read data from a run instance (backtest or live).
3227 ///
3228 /// This function reads all data associated with a specific run instance
3229 /// from feather files stored in the catalog.
3230 ///
3231 /// # Parameters
3232 ///
3233 /// - `subdirectory`: The subdirectory name ("backtest" or "live").
3234 /// - `instance_id`: The ID of the run instance to read.
3235 ///
3236 /// # Returns
3237 ///
3238 /// Returns a vector of `Data` objects from the run, sorted by timestamp,
3239 /// or an error if the operation fails.
3240 fn read_run_data(&self, subdirectory: &str, instance_id: &str) -> anyhow::Result<Vec<Data>> {
3241 // List all data types in the instance directory
3242 let instance_dir = make_object_store_path(&self.base_path, &[subdirectory, instance_id]);
3243
3244 // List directories under the instance directory
3245 let data_types = if self.is_remote_uri() {
3246 // For remote URIs, use object store listing
3247
3248 self.execute_async(async {
3249 let prefix = ObjectPath::from(format!("{instance_dir}/"));
3250 let mut stream = self.object_store.list(Some(&prefix));
3251 let mut directories = Vec::new();
3252 let mut seen_dirs = HashSet::new();
3253
3254 while let Some(object) = stream.next().await {
3255 let object = object?;
3256 let path_str = object.location.to_string();
3257
3258 // Extract the immediate subdirectory name
3259 if let Some(relative_path) = path_str.strip_prefix(&format!("{instance_dir}/"))
3260 {
3261 let parts: Vec<&str> = relative_path.split('/').collect();
3262 if let Some(first_part) = parts.first()
3263 && !first_part.is_empty()
3264 && !seen_dirs.contains(*first_part)
3265 {
3266 seen_dirs.insert(first_part.to_string());
3267 directories.push(first_part.to_string());
3268 }
3269 }
3270 }
3271
3272 Ok::<Vec<String>, anyhow::Error>(directories)
3273 })?
3274 } else {
3275 // For local filesystem paths
3276 let directory = PathBuf::from(self.native_base_path_string())
3277 .join(subdirectory)
3278 .join(instance_id);
3279
3280 if !directory.exists() {
3281 return Ok(Vec::new());
3282 }
3283
3284 let mut directories = Vec::new();
3285
3286 if let Ok(entries) = std::fs::read_dir(&directory) {
3287 for entry in entries.flatten() {
3288 if let Ok(file_type) = entry.file_type()
3289 && file_type.is_dir()
3290 && let Some(name) = entry.path().file_name()
3291 {
3292 directories.push(name.to_string_lossy().to_string());
3293 }
3294 }
3295 }
3296 directories.sort();
3297 directories
3298 };
3299
3300 if data_types.is_empty() {
3301 // No data types found - return empty vector
3302 return Ok(Vec::new());
3303 }
3304
3305 let mut all_data: Vec<Data> = Vec::new();
3306
3307 // Process each persisted data type.
3308 for data_cls in data_types
3309 .into_iter()
3310 .filter(|s| !Self::is_excluded_stream_data_type(s))
3311 {
3312 // List all feather files for this data type
3313 let feather_files = self.list_feather_files(
3314 subdirectory,
3315 instance_id,
3316 &data_cls,
3317 None, // No identifier filtering - read all
3318 )?;
3319
3320 if feather_files.is_empty() {
3321 continue; // Skip if no files found
3322 }
3323
3324 // Process each feather file
3325 for file_path in feather_files {
3326 // Read the feather file (may contain multiple batches)
3327 let batches = self.read_feather_file(&file_path)?;
3328
3329 if batches.is_empty() {
3330 continue; // Skip empty or invalid files
3331 }
3332
3333 // Convert RecordBatches to Data objects based on data_cls
3334 let file_data: Vec<Data> = match data_cls.as_str() {
3335 "quotes" => {
3336 let quotes: Vec<QuoteTick> =
3337 self.convert_record_batches_to_data(batches, false)?;
3338 quotes.into_iter().map(Data::from).collect()
3339 }
3340 "trades" => {
3341 let trades: Vec<TradeTick> =
3342 self.convert_record_batches_to_data(batches, false)?;
3343 trades.into_iter().map(Data::from).collect()
3344 }
3345 "order_book_deltas" => {
3346 let deltas: Vec<OrderBookDelta> =
3347 self.convert_record_batches_to_data(batches, false)?;
3348 deltas.into_iter().map(Data::from).collect()
3349 }
3350 "order_book_depths" => {
3351 let depths: Vec<OrderBookDepth10> =
3352 self.convert_record_batches_to_data(batches, false)?;
3353 depths.into_iter().map(Data::from).collect()
3354 }
3355 "bars" => {
3356 let bars: Vec<Bar> = self.convert_record_batches_to_data(batches, false)?;
3357 bars.into_iter().map(Data::from).collect()
3358 }
3359 "index_prices" => {
3360 let prices: Vec<IndexPriceUpdate> =
3361 self.convert_record_batches_to_data(batches, false)?;
3362 prices.into_iter().map(Data::from).collect()
3363 }
3364 "mark_prices" => {
3365 let prices: Vec<MarkPriceUpdate> =
3366 self.convert_record_batches_to_data(batches, false)?;
3367 prices.into_iter().map(Data::from).collect()
3368 }
3369 "instrument_status" => {
3370 let statuses: Vec<InstrumentStatus> =
3371 self.convert_record_batches_to_data(batches, false)?;
3372 statuses.into_iter().map(Data::from).collect()
3373 }
3374 "instrument_closes" => {
3375 let closes: Vec<InstrumentClose> =
3376 self.convert_record_batches_to_data(batches, false)?;
3377 closes.into_iter().map(Data::from).collect()
3378 }
3379 _ => {
3380 if data_cls.starts_with("custom/") {
3381 self.decode_custom_batches_to_data(batches, false)?
3382 } else {
3383 // Unknown data type - skip it
3384 continue;
3385 }
3386 }
3387 };
3388
3389 all_data.extend(file_data);
3390 }
3391 }
3392
3393 // Sort all data by timestamp (ts_init)
3394 all_data.sort_by_key(|data| data.ts_init());
3399
3400 Ok(all_data)
3401 }
3402
3403 /// Decodes multiple record batches of custom data (data_cls starts with "custom/") into a single
3404 /// `Vec<Data>`. Optionally replaces `ts_init` column with `ts_event` before decoding.
3405 ///
3406 /// # Errors
3407 ///
3408 /// Returns an error if any batch fails to decode.
3409 fn decode_custom_batches_to_data(
3410 &self,
3411 batches: Vec<RecordBatch>,
3412 use_ts_event_for_ts_init: bool,
3413 ) -> anyhow::Result<Vec<Data>> {
3414 orchestration_decode_custom_batches_to_data(batches, use_ts_event_for_ts_init)
3415 }
3416
3417 /// Decodes a RecordBatch to Data objects based on metadata.
3418 ///
3419 /// This method determines the data type from metadata and decodes the batch accordingly.
3420 /// It supports both standard data types and custom data types when `allow_custom_fallback`
3421 /// is true (e.g. when called from `decode_custom_batches_to_data` for files under
3422 /// `custom/`). When false, unknown type names produce an error instead of attempting
3423 /// custom decode, so malformed or mistyped built-in metadata fails explicitly.
3424 ///
3425 /// # Parameters
3426 ///
3427 /// - `metadata`: Schema metadata containing type information.
3428 /// - `batch`: The RecordBatch to decode.
3429 /// - `allow_custom_fallback`: If true, unknown type_name is decoded via custom data
3430 /// registry; if false, unknown type_name returns an error.
3431 ///
3432 /// # Returns
3433 ///
3434 /// Returns a vector of Data enum variants.
3435 ///
3436 /// # Errors
3437 ///
3438 /// Returns an error if decoding fails or the type is unknown (and custom fallback not allowed).
3439 #[allow(dead_code)] // used by tests
3440 fn decode_batch_to_data(
3441 &self,
3442 metadata: &std::collections::HashMap<String, String>,
3443 batch: RecordBatch,
3444 allow_custom_fallback: bool,
3445 ) -> anyhow::Result<Vec<Data>> {
3446 orchestration_decode_batch_to_data(metadata, batch, allow_custom_fallback)
3447 }
3448
3501 /// Lists feather files for a specific data class in a subdirectory.
3502 ///
3503 /// This helper function finds all `.feather` files in the specified subdirectory
3504 /// (backtest or live) for the given instance ID and data class.
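    ///
    /// Two layouts are searched, sketched here with illustrative placeholders
    /// (matching the listing logic below):
    ///
    /// ```text
    /// {base}/{subdirectory}/{instance_id}/{data_name}/{data_name}_*.feather    (flat files)
    /// {base}/{subdirectory}/{instance_id}/{data_name}/{identifier}/*.feather   (per-identifier)
    /// ```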
3505 fn list_feather_files(
3506 &self,
3507 subdirectory: &str,
3508 instance_id: &str,
3509 data_name: &str,
3510 identifiers: Option<&[String]>,
3511 ) -> anyhow::Result<Vec<String>> {
3512 // Construct the base directory path: {subdirectory}/{instance_id}/{data_name}
3513 let base_dir = make_object_store_path(&self.base_path, &[subdirectory, instance_id]);
3514 let data_dir = make_object_store_path(&base_dir, &[data_name]);
3515
3516 let mut files = Vec::new();
3517
3518 // Try to list files in the data directory (for per-instrument subdirectories)
3519 let subdir_prefix = ObjectPath::from(format!("{data_dir}/"));
3520 let list_result = self.execute_async(async {
3521 let mut stream = self.object_store.list(Some(&subdir_prefix));
3522 let mut subdirs = Vec::new();
3523 let mut flat_files = Vec::new();
3524
3525 while let Some(object) = stream.next().await {
3526 let object = object?;
3527 let path_str = object.location.to_string();
3528
3529 // Check if this is a subdirectory (per-instrument) or a flat file
3530 if let Some(relative_path) = path_str.strip_prefix(&format!("{data_dir}/")) {
3531 if relative_path.ends_with(".feather") {
3532 // Flat file format: {data_name}_*.feather
3533 if path_str.contains(&format!("{data_name}_")) {
3534 flat_files.push(path_str);
3535 }
3536 } else {
3537 // This might be a subdirectory - check if it contains feather files
3538 let subdir_path = format!("{path_str}/");
3539 let mut subdir_stream = self
3540 .object_store
3541 .list(Some(&ObjectPath::from(subdir_path.as_str())));
3542
3543 while let Some(subdir_object) = subdir_stream.next().await {
3544 let subdir_object = subdir_object?;
3545 let subdir_file_path = subdir_object.location.to_string();
3546
3547 if subdir_file_path.ends_with(".feather") {
3548 // Check identifier filter if provided
3549 if let Some(identifiers) = identifiers {
3550 let subdir_name = relative_path.split('/').next().unwrap_or("");
3551 if !identifiers.iter().any(|id| subdir_name.contains(id)) {
3552 continue;
3553 }
3554 }
3555 subdirs.push(subdir_file_path);
3556 }
3557 }
3558 }
3559 }
3560 }
3561
3562 Ok::<Vec<String>, anyhow::Error>([subdirs, flat_files].concat())
3563 })?;
3564
3565 files.extend(list_result);
3566 files.sort();
3567 Ok(files)
3568 }
3569
3570 /// Reads a feather file and returns all RecordBatches.
3571 ///
3572 /// This function reads an Arrow IPC stream file from the object store
3573 /// and returns all RecordBatches contained within it.
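    ///
    /// A minimal usage sketch (the file path shown is hypothetical):
    ///
    /// ```rust,ignore
    /// let batches = catalog.read_feather_file("backtest/instance-123/quotes/part-0.feather")?;
    /// println!("read {} record batches", batches.len());
    /// ```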
3574 fn read_feather_file(&self, file_path: &str) -> anyhow::Result<Vec<RecordBatch>> {
3575 use datafusion::arrow::ipc::reader::StreamReader;
3576
3577 let bytes = self.execute_async(async {
3578 let path = ObjectPath::from(file_path);
3579 let result = self.object_store.get(&path).await?;
3580 let bytes = result.bytes().await?;
3581 Ok::<_, anyhow::Error>(bytes)
3582 })?;
3583
3584 if bytes.is_empty() {
3585 return Ok(Vec::new());
3586 }
3587
3588 // Read the Arrow IPC stream
3589 let cursor = Cursor::new(bytes.as_ref());
3590 let reader = StreamReader::try_new(cursor, None)
3591 .map_err(|e| anyhow::anyhow!("Failed to create StreamReader: {e}"))?;
3592
3593 // Read all batches
3594 let mut batches = Vec::new();
3595
3596 for batch_result in reader {
3597 let batch = batch_result.map_err(|e| anyhow::anyhow!("Failed to read batch: {e}"))?;
3598 batches.push(batch);
3599 }
3600
3601 Ok(batches)
3602 }
3603
3604 /// Converts RecordBatches to Data objects, optionally replacing ts_init with ts_event.
3605 fn convert_record_batches_to_data<T>(
3606 &self,
3607 batches: Vec<RecordBatch>,
3608 use_ts_event_for_ts_init: bool,
3609 ) -> anyhow::Result<Vec<T>>
3610 where
3611 T: DecodeDataFromRecordBatch + TryFrom<Data>,
3612 {
3613 self.convert_record_batches_to_data_with_bar_type_conversion(
3614 batches,
3615 use_ts_event_for_ts_init,
3616 false,
3617 )
3618 }
3619
3620 /// Converts RecordBatches to Data objects with optional transforms for stream conversion.
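    ///
    /// When `use_ts_event_for_ts_init` is true, the `ts_init` column is overwritten with the
    /// `ts_event` column before decoding; when `convert_bar_type_to_external` is true, a
    /// `bar_type` metadata value ending in `-INTERNAL` is rewritten to end in `-EXTERNAL`.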
3621 fn convert_record_batches_to_data_with_bar_type_conversion<T>(
3622 &self,
3623 batches: Vec<RecordBatch>,
3624 use_ts_event_for_ts_init: bool,
3625 convert_bar_type_to_external: bool,
3626 ) -> anyhow::Result<Vec<T>>
3627 where
3628 T: DecodeDataFromRecordBatch + TryFrom<Data>,
3629 {
3630 if batches.is_empty() {
3631 return Ok(Vec::new());
3632 }
3633
3634 // Get schema and metadata from first batch
3635 let schema = batches[0].schema();
3636 let mut metadata = schema.metadata().clone();
3637
3638 // Convert bar_type from INTERNAL to EXTERNAL if requested
3639 if convert_bar_type_to_external
3640 && let Some(bar_type_str) = metadata.get("bar_type").cloned()
3641 && bar_type_str.ends_with("-INTERNAL")
3642 {
3643 let external = bar_type_str.replace("-INTERNAL", "-EXTERNAL");
3644 metadata.insert("bar_type".to_string(), external);
3645 }
3646
3647 // Process each batch
3648 let mut all_data = Vec::new();
3649
3650 for mut batch in batches {
3651 // Handle ts_event/ts_init replacement if requested
3652 if use_ts_event_for_ts_init {
3653 let column_names: Vec<String> =
3654 schema.fields().iter().map(|f| f.name().clone()).collect();
3655
3656 let ts_event_idx = column_names
3657 .iter()
3658 .position(|n| n == "ts_event")
3659 .ok_or_else(|| anyhow::anyhow!("ts_event column not found"))?;
3660 let ts_init_idx = column_names
3661 .iter()
3662 .position(|n| n == "ts_init")
3663 .ok_or_else(|| anyhow::anyhow!("ts_init column not found"))?;
3664
3665 // Create new arrays with ts_init replaced by ts_event
3666 let mut new_columns = batch.columns().to_vec();
3667 new_columns[ts_init_idx] = new_columns[ts_event_idx].clone();
3668
3669 // Create new batch with updated columns
3670 batch = RecordBatch::try_new(schema.clone(), new_columns)
3671 .map_err(|e| anyhow::anyhow!("Failed to create new batch: {e}"))?;
3672 }
3673
3674 // Decode the batch to Data objects
3675 let data_vec = T::decode_data_batch(&metadata, batch)
3676 .map_err(|e| anyhow::anyhow!("Failed to decode batch: {e}"))?;
3677
3678 all_data.extend(data_vec);
3679 }
3680
3681 // Convert Data enum to specific type T
3682 Ok(to_variant::<T>(all_data))
3683 }
3684
3685 /// Converts RecordBatches directly to strongly typed values.
3686 fn convert_record_batches_to_typed<T>(
3687 &self,
3688 batches: Vec<RecordBatch>,
3689 ) -> anyhow::Result<Vec<T>>
3690 where
3691 T: DecodeTypedFromRecordBatch,
3692 {
3693 if batches.is_empty() {
3694 return Ok(Vec::new());
3695 }
3696
3697 let mut all_data = Vec::new();
3698
3699 for batch in batches {
3700 let metadata = batch.schema().metadata().clone();
3701 let decoded = T::decode_typed_batch(&metadata, batch)
3702 .map_err(|e| anyhow::anyhow!("Failed to decode batch: {e}"))?;
3703 all_data.extend(decoded);
3704 }
3705
3706 Ok(all_data)
3707 }
3708
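    /// Decodes typed record batches into `Vec<T>`, sorts by `ts_init` when the input is not
    /// already monotonically increasing, and writes the result to the catalog as parquet.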
3709 fn write_typed_batches<T>(&self, batches: Vec<RecordBatch>) -> anyhow::Result<()>
3710 where
3711 T: DecodeTypedFromRecordBatch + EncodeToRecordBatch + CatalogPathPrefix + HasTsInit,
3712 {
3713 let mut data: Vec<T> = self.convert_record_batches_to_typed(batches)?;
3714
3715 if !is_monotonically_increasing_by_init(&data) {
3716 data.sort_by_key(|item| item.ts_init());
3717 }
3718
3719 self.write_to_parquet(data, None, None, None)?;
3720 Ok(())
3721 }
3722
    /// Converts stream data from feather files to parquet files.
    ///
    /// This method reads data from feather files generated during a backtest or live run
    /// and writes it to the catalog in parquet format. It's useful for converting temporary
    /// stream data into a more permanent and queryable format.
    ///
    /// # Parameters
    ///
    /// - `instance_id`: The ID of the backtest or live run instance.
    /// - `data_cls`: The data class name (e.g., "quotes", "trades", "bars").
    /// - `subdirectory`: The subdirectory containing the feather files. Either "backtest" or "live" (default: "backtest").
    /// - `identifiers`: Optional list of identifiers to filter by (instrument IDs or bar types).
    /// - `use_ts_event_for_ts_init`: If true, replaces the `ts_init` column with `ts_event` column values before deserializing.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Feather file listing fails.
    /// - Feather file reading fails.
    /// - Data deserialization fails.
    /// - The data class is not a supported built-in or `custom/` type.
    /// - Writing to parquet fails.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Convert backtest stream data to parquet
    /// catalog.convert_stream_to_data(
    ///     "instance-123",
    ///     "quotes",
    ///     Some("backtest"),
    ///     None,
    ///     false
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
3723 pub fn convert_stream_to_data(
3724 &mut self,
3725 instance_id: &str,
3726 data_cls: &str,
3727 subdirectory: Option<&str>,
3728 identifiers: Option<&[String]>,
3729 use_ts_event_for_ts_init: bool,
3730 ) -> anyhow::Result<()> {
3731 let subdirectory = subdirectory.unwrap_or("backtest");
3732
3733 // Skip unsupported stream data types without error.
3734 if Self::is_excluded_stream_data_type(data_cls) {
3735 return Ok(());
3736 }
3737
3738 // Normalize the data class name to snake_case for the directory name
3739 // (typical values such as "quotes" are already snake_case)
3740 let data_name = to_snake_case(data_cls);
3741
3742 // List all feather files for this data class
3743 let feather_files =
3744 self.list_feather_files(subdirectory, instance_id, &data_name, identifiers)?;
3745
3746 if feather_files.is_empty() {
3747 return Ok(());
3748 }
3749
3750 // Process each feather file independently so that each file's identifier
3751 // (instrument_id or bar_type from schema metadata) is preserved when writing
3752 // to parquet. This matches the Python _convert_feather_table_to_parquet approach.
3753 let convert_bar_type = data_cls == "bars";
3754
3755 for file_path in feather_files {
3756 let batches = self.read_feather_file(&file_path)?;
3757
3758 if batches.is_empty() {
3759 continue;
3760 }
3761
3762 match data_cls {
3763 "quotes" => {
3764 let mut data: Vec<QuoteTick> =
3765 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3766
3767 if !is_monotonically_increasing_by_init(&data) {
3768 data.sort_by_key(|d| d.ts_init);
3769 }
3770 self.write_to_parquet(data, None, None, None)?;
3771 }
3772 "trades" => {
3773 let mut data: Vec<TradeTick> =
3774 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3775
3776 if !is_monotonically_increasing_by_init(&data) {
3777 data.sort_by_key(|d| d.ts_init);
3778 }
3779 self.write_to_parquet(data, None, None, None)?;
3780 }
3781 "order_book_deltas" => {
3782 let mut data: Vec<OrderBookDelta> =
3783 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3784
3785 if !is_monotonically_increasing_by_init(&data) {
3786 data.sort_by_key(|d| d.ts_init);
3787 }
3788 self.write_to_parquet(data, None, None, None)?;
3789 }
3790 "order_book_depths" => {
3791 let mut data: Vec<OrderBookDepth10> =
3792 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3793
3794 if !is_monotonically_increasing_by_init(&data) {
3795 data.sort_by_key(|d| d.ts_init);
3796 }
3797 self.write_to_parquet(data, None, None, None)?;
3798 }
3799 "bars" => {
3800 let mut data: Vec<Bar> = self
3801 .convert_record_batches_to_data_with_bar_type_conversion(
3802 batches,
3803 use_ts_event_for_ts_init,
3804 convert_bar_type,
3805 )?;
3806
3807 if !is_monotonically_increasing_by_init(&data) {
3808 data.sort_by_key(|d| d.ts_init);
3809 }
3810 self.write_to_parquet(data, None, None, None)?;
3811 }
3812 "index_prices" => {
3813 let mut data: Vec<IndexPriceUpdate> =
3814 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3815
3816 if !is_monotonically_increasing_by_init(&data) {
3817 data.sort_by_key(|d| d.ts_init);
3818 }
3819 self.write_to_parquet(data, None, None, None)?;
3820 }
3821 "mark_prices" => {
3822 let mut data: Vec<MarkPriceUpdate> =
3823 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3824
3825 if !is_monotonically_increasing_by_init(&data) {
3826 data.sort_by_key(|d| d.ts_init);
3827 }
3828 self.write_to_parquet(data, None, None, None)?;
3829 }
3830 "instrument_status" => {
3831 self.write_typed_batches::<InstrumentStatus>(batches)?;
3832 }
3833 "instrument_closes" => {
3834 let mut data: Vec<InstrumentClose> =
3835 self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3836
3837 if !is_monotonically_increasing_by_init(&data) {
3838 data.sort_by_key(|d| d.ts_init);
3839 }
3840 self.write_to_parquet(data, None, None, None)?;
3841 }
3842 "funding_rate_update" => {
3843 self.write_typed_batches::<FundingRateUpdate>(batches)?;
3844 }
3845 "account_state" => {
3846 self.write_typed_batches::<AccountState>(batches)?;
3847 }
3848 "order_initialized" => {
3849 self.write_typed_batches::<OrderInitialized>(batches)?;
3850 }
3851 "order_denied" => {
3852 self.write_typed_batches::<OrderDenied>(batches)?;
3853 }
3854 "order_emulated" => {
3855 self.write_typed_batches::<OrderEmulated>(batches)?;
3856 }
3857 "order_submitted" => {
3858 self.write_typed_batches::<OrderSubmitted>(batches)?;
3859 }
3860 "order_accepted" => {
3861 self.write_typed_batches::<OrderAccepted>(batches)?;
3862 }
3863 "order_rejected" => {
3864 self.write_typed_batches::<OrderRejected>(batches)?;
3865 }
3866 "order_pending_cancel" => {
3867 self.write_typed_batches::<OrderPendingCancel>(batches)?;
3868 }
3869 "order_canceled" => {
3870 self.write_typed_batches::<OrderCanceled>(batches)?;
3871 }
3872 "order_cancel_rejected" => {
3873 self.write_typed_batches::<OrderCancelRejected>(batches)?;
3874 }
3875 "order_expired" => {
3876 self.write_typed_batches::<OrderExpired>(batches)?;
3877 }
3878 "order_triggered" => {
3879 self.write_typed_batches::<OrderTriggered>(batches)?;
3880 }
3881 "order_pending_update" => {
3882 self.write_typed_batches::<OrderPendingUpdate>(batches)?;
3883 }
3884 "order_released" => {
3885 self.write_typed_batches::<OrderReleased>(batches)?;
3886 }
3887 "order_modify_rejected" => {
3888 self.write_typed_batches::<OrderModifyRejected>(batches)?;
3889 }
3890 "order_updated" => {
3891 self.write_typed_batches::<OrderUpdated>(batches)?;
3892 }
3893 "order_filled" => {
3894 self.write_typed_batches::<OrderFilled>(batches)?;
3895 }
3896 "position_opened" => {
3897 self.write_typed_batches::<PositionOpened>(batches)?;
3898 }
3899 "position_changed" => {
3900 self.write_typed_batches::<PositionChanged>(batches)?;
3901 }
3902 "position_closed" => {
3903 self.write_typed_batches::<PositionClosed>(batches)?;
3904 }
3905 "position_adjusted" => {
3906 self.write_typed_batches::<PositionAdjusted>(batches)?;
3907 }
3908 "order_snapshot" => {
3909 self.write_typed_batches::<OrderSnapshot>(batches)?;
3910 }
3911 "position_snapshot" => {
3912 self.write_typed_batches::<PositionSnapshot>(batches)?;
3913 }
3914 "order_status_report" => {
3915 self.write_typed_batches::<OrderStatusReport>(batches)?;
3916 }
3917 "fill_report" => {
3918 self.write_typed_batches::<FillReport>(batches)?;
3919 }
3920 "position_status_report" => {
3921 self.write_typed_batches::<PositionStatusReport>(batches)?;
3922 }
3923 "execution_mass_status" => {
3924 self.write_typed_batches::<ExecutionMassStatus>(batches)?;
3925 }
3926 _ => {
3927 if data_cls.starts_with("custom/") {
3928 let data =
3929 self.decode_custom_batches_to_data(batches, use_ts_event_for_ts_init)?;
3930 let custom_items: Vec<CustomData> = data
3931 .into_iter()
3932 .filter_map(|d| match d {
3933 Data::Custom(c) => Some(c),
3934 _ => None,
3935 })
3936 .collect();
3937
3938 if !custom_items.is_empty() {
3939 self.write_custom_data_batch(custom_items, None, None, None)?;
3940 }
3941 } else {
3942 anyhow::bail!("Unknown data class: {data_cls}");
3943 }
3944 }
3945 }
3946 }
3947
3948 Ok(())
3949 }
3950}
3951
3952/// Trait for providing catalog path prefixes for different data types.
3953///
3954/// This trait enables type-safe organization of data within the catalog by providing
3955/// a standardized way to determine the directory structure for each data type.
3956/// Each data type maps to a specific subdirectory within the catalog's data folder.
3957///
3958/// # Implementation
3959///
3960/// Types implementing this trait should return a static string that represents
3961/// the directory name where data of that type should be stored.
3962///
3963/// # Examples
3964///
3965/// ```rust
3966/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
3967/// use nautilus_model::data::QuoteTick;
3968///
3969/// assert_eq!(QuoteTick::path_prefix(), "quotes");
3970/// ```
3971pub trait CatalogPathPrefix {
3972 /// Returns the path prefix (directory name) for this data type.
3973 ///
3974 /// # Returns
3975 ///
3976 /// A static string representing the directory name where this data type is stored.
3977 fn path_prefix() -> &'static str;
3978}
3979
3980/// Macro for implementing [`CatalogPathPrefix`] for data types.
3981///
3982/// This macro provides a convenient way to implement the trait for multiple types
3983/// with their corresponding path prefixes.
3984///
3985/// # Parameters
3986///
3987/// - `$type`: The data type to implement the trait for.
3988/// - `$path`: The path prefix string for that type.
3989macro_rules! impl_catalog_path_prefix {
3990 ($type:ty, $path:expr) => {
3991 impl CatalogPathPrefix for $type {
3992 fn path_prefix() -> &'static str {
3993 $path
3994 }
3995 }
3996 };
3997}
3998
3999// Standard implementations for financial data types
4000impl_catalog_path_prefix!(QuoteTick, "quotes");
4001impl_catalog_path_prefix!(TradeTick, "trades");
4002impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
4003impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
4004impl_catalog_path_prefix!(Bar, "bars");
4005impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
4006impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
4007impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
4008impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
4009impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
4010impl_catalog_path_prefix!(InstrumentAny, "instruments");
4011impl_catalog_path_prefix!(AccountState, "account_state");
4012impl_catalog_path_prefix!(OrderInitialized, "order_initialized");
4013impl_catalog_path_prefix!(OrderDenied, "order_denied");
4014impl_catalog_path_prefix!(OrderEmulated, "order_emulated");
4015impl_catalog_path_prefix!(OrderSubmitted, "order_submitted");
4016impl_catalog_path_prefix!(OrderAccepted, "order_accepted");
4017impl_catalog_path_prefix!(OrderRejected, "order_rejected");
4018impl_catalog_path_prefix!(OrderPendingCancel, "order_pending_cancel");
4019impl_catalog_path_prefix!(OrderCanceled, "order_canceled");
4020impl_catalog_path_prefix!(OrderCancelRejected, "order_cancel_rejected");
4021impl_catalog_path_prefix!(OrderExpired, "order_expired");
4022impl_catalog_path_prefix!(OrderTriggered, "order_triggered");
4023impl_catalog_path_prefix!(OrderPendingUpdate, "order_pending_update");
4024impl_catalog_path_prefix!(OrderReleased, "order_released");
4025impl_catalog_path_prefix!(OrderModifyRejected, "order_modify_rejected");
4026impl_catalog_path_prefix!(OrderUpdated, "order_updated");
4027impl_catalog_path_prefix!(OrderFilled, "order_filled");
4028impl_catalog_path_prefix!(PositionOpened, "position_opened");
4029impl_catalog_path_prefix!(PositionChanged, "position_changed");
4030impl_catalog_path_prefix!(PositionClosed, "position_closed");
4031impl_catalog_path_prefix!(PositionAdjusted, "position_adjusted");
4032impl_catalog_path_prefix!(OrderSnapshot, "order_snapshot");
4033impl_catalog_path_prefix!(PositionSnapshot, "position_snapshot");
4034impl_catalog_path_prefix!(OrderStatusReport, "order_status_report");
4035impl_catalog_path_prefix!(FillReport, "fill_report");
4036impl_catalog_path_prefix!(PositionStatusReport, "position_status_report");
4037impl_catalog_path_prefix!(ExecutionMassStatus, "execution_mass_status");
4038
4039/// Converts timestamps to a filename using ISO 8601 format.
4040///
4041/// This function converts two Unix nanosecond timestamps to a filename that uses
4042/// ISO 8601 format with filesystem-safe characters. The format matches the Python
4043/// implementation for consistency.
4044///
4045/// # Parameters
4046///
4047/// - `timestamp_1`: First timestamp in Unix nanoseconds.
4048/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
4049///
4050/// # Returns
4051///
4052/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
4053///
4054/// # Examples
4055///
4056/// ```rust
4057/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
4058/// # use nautilus_core::UnixNanos;
4059/// let filename = timestamps_to_filename(
4060/// UnixNanos::from(1609459200000000000),
4061/// UnixNanos::from(1609545600000000000)
4062/// );
4063/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
4064/// ```
4065#[must_use]
4066pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
4067 let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
4068 let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
4069
4070 format!("{datetime_1}_{datetime_2}.parquet")
4071}
4072
4073/// Converts an ISO 8601 timestamp to a filesystem-safe format.
4074///
4075/// This function replaces colons and dots with hyphens to make the timestamp
4076/// safe for use in filenames across different filesystems.
4077///
4078/// # Parameters
4079///
4080/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
4081///
4082/// # Returns
4083///
4084/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
4085///
4086/// # Examples
4087///
4088/// ```rust
4089/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
4090/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
4091/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
4092/// ```
4093fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
4094 iso_timestamp.replace([':', '.'], "-")
4095}
4096
4097/// Converts a filesystem-safe timestamp back to ISO 8601 format.
4098///
4099/// This function reverses the transformation done by `iso_timestamp_to_file_timestamp`,
4100/// converting filesystem-safe timestamps back to standard ISO 8601 format.
4101///
4102/// # Parameters
4103///
4104/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
4105///
4106/// # Returns
4107///
4108/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
4109///
4110/// # Examples
4111///
4112/// ```rust
4113/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
4114/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
4115/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
4116/// ```
4117fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
4118 let (date_part, time_part) = file_timestamp
4119 .split_once('T')
4120 .unwrap_or((file_timestamp, ""));
4121 let time_part = time_part.strip_suffix('Z').unwrap_or(time_part);
4122
4123 // Find the last hyphen to separate nanoseconds
4124 if let Some(last_hyphen_idx) = time_part.rfind('-') {
4125 let time_with_dot_for_nanos = format!(
4126 "{}.{}",
4127 &time_part[..last_hyphen_idx],
4128 &time_part[last_hyphen_idx + 1..]
4129 );
4130 let final_time_part = time_with_dot_for_nanos.replace('-', ":");
4131 format!("{date_part}T{final_time_part}Z")
4132 } else {
4133 // Fallback if no nanoseconds part found
4134 let final_time_part = time_part.replace('-', ":");
4135 format!("{date_part}T{final_time_part}Z")
4136 }
4137}
4138
4139/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
4140///
4141/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
4142/// It's used to convert parsed timestamps back to the internal representation.
4143///
4144/// # Parameters
4145///
4146/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
4147///
4148/// # Returns
4149///
4150/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
4151///
4152/// # Examples
4153///
4154/// ```rust
4155/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
4156/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
4157/// assert_eq!(nanos, 1609459200000000000);
4158/// ```
4159fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
4160 Ok(iso8601_to_unix_nanos(iso_timestamp)?.into())
4161}
4162
4163/// Converts an instrument ID to a URI-safe format by removing forward slashes
4164/// and replacing carets with underscores.
4165///
4166/// Some instrument IDs contain forward slashes (e.g., "BTC/USD") which are not
4167/// suitable for use in file paths. This function transforms these characters to
4168/// create a safe directory name.
4169///
4170/// # Parameters
4171///
4172/// - `instrument_id`: The original instrument ID string.
4173///
4174/// # Returns
4175///
4176/// A URI-safe version of the instrument ID with forward slashes removed and carets replaced.
4177///
4178/// # Examples
4179///
4180/// ```rust
4181/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
4182/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
4183/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
4184/// assert_eq!(urisafe_instrument_id("^SPX.CBOE"), "_SPX.CBOE");
4185/// ```
4186pub fn urisafe_instrument_id(instrument_id: &str) -> String {
4187 instrument_id.replace('/', "").replace('^', "_")
4188}
4189
/// Extracts the instrument ID portion from a bar type directory name.
///
/// Handles both standard and composite formats:
/// - `{id}-{step}-{agg}-{price}-{source}`
/// - `{id}-{step}-{agg}-{price}-{source}@{step}-{agg}-{source}`
///
/// Strips the composite suffix before parsing with `rsplitn(5, '-')`.
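///
/// For example (illustrative directory names):
///
/// ```text
/// "EURUSD.SIM-1-MINUTE-BID-EXTERNAL"                    -> Some("EURUSD.SIM")
/// "EURUSD.SIM-5-MINUTE-LAST-INTERNAL@1-MINUTE-EXTERNAL" -> Some("EURUSD.SIM")
/// "not-a-bar-type"                                      -> None
/// ```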
4195fn extract_bar_type_instrument_id(bar_type_dir: &str) -> Option<&str> {
4196 let standard = bar_type_dir.split('@').next().unwrap_or(bar_type_dir);
4197 let pieces: Vec<&str> = standard.rsplitn(5, '-').collect();
4198 // pieces (reversed): [source, price_type, agg, step, instrument_id]
4199 if pieces.len() == 5 && pieces[3].chars().all(|c| c.is_ascii_digit()) {
4200 Some(pieces[4])
4201 } else {
4202 None
4203 }
4204}
4205
4206/// Normalizes a custom data identifier for use in directory paths.
4207/// Replaces `//` with `/`, and filters out empty segments and `..` to prevent path traversal.
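///
/// # Examples
///
/// A couple of illustrative inputs:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::safe_directory_identifier;
/// assert_eq!(safe_directory_identifier("signals//my_signal"), "signals/my_signal");
/// assert_eq!(safe_directory_identifier("a/../b"), "a/b");
/// ```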
4208#[must_use]
4209pub fn safe_directory_identifier(identifier: &str) -> String {
4210 let normalized = identifier.replace("//", "/");
4211 let segments: Vec<&str> = normalized
4212 .split('/')
4213 .filter(|s| !s.is_empty() && *s != "..")
4214 .collect();
4215 segments.join("/")
4216}
4217
4218/// Extracts the identifier from a file path.
4219///
4220/// The identifier is typically the second-to-last path component (directory name).
4221/// For example, from "`data/quote_tick/EURUSD/file.parquet`", extracts "EURUSD".
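///
/// # Examples
///
/// A short check of both branches:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_identifier_from_path;
/// assert_eq!(
///     extract_identifier_from_path("data/quote_tick/EURUSD/file.parquet"),
///     "EURUSD"
/// );
/// assert_eq!(extract_identifier_from_path("file.parquet"), "unknown");
/// ```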
4222#[must_use]
4223pub fn extract_identifier_from_path(file_path: &str) -> String {
4224 let path_parts: Vec<&str> = file_path.split('/').collect();
4225 if path_parts.len() >= 2 {
4226 path_parts[path_parts.len() - 2].to_string()
4227 } else {
4228 "unknown".to_string()
4229 }
4230}
4231
4232/// Makes an identifier safe for use in SQL table names.
4233///
4234/// Removes forward slashes, replaces carets, dots, hyphens, spaces, and percent signs with underscores, and converts to lowercase.
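///
/// # Examples
///
/// An illustrative instrument ID:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_sql_safe_identifier;
/// assert_eq!(make_sql_safe_identifier("EUR/USD.SIM"), "eurusd_sim");
/// ```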
4235#[must_use]
4236pub fn make_sql_safe_identifier(identifier: &str) -> String {
4237 urisafe_instrument_id(identifier)
4238 .replace(['.', '-', ' ', '%'], "_")
4239 .to_lowercase()
4240}
4241
4242/// Extracts the filename from a file path and makes it SQL-safe.
4243///
4244/// For example, from "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
4245/// extracts "`2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z`".
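///
/// # Examples
///
/// A short filename keeps the example readable:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_sql_safe_filename;
/// assert_eq!(extract_sql_safe_filename("a/b/file-1.parquet"), "file_1");
/// assert_eq!(extract_sql_safe_filename(""), "unknown_file");
/// ```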
4246#[must_use]
4247pub fn extract_sql_safe_filename(file_path: &str) -> String {
4248 if file_path.is_empty() {
4249 return "unknown_file".to_string();
4250 }
4251
4252 let filename = file_path.split('/').next_back().unwrap_or("unknown_file");
4253
4254 // Remove .parquet extension
4255 let name_without_ext = if let Some(dot_pos) = filename.rfind(".parquet") {
4256 &filename[..dot_pos]
4257 } else {
4258 filename
4259 };
4260
4261 // Replace characters that can pose problems (hyphens, colons, dots) with underscores
4262 name_without_ext
4263 .replace(['-', ':', '.'], "_")
4264 .to_lowercase()
4265}
4266
4267/// Creates a platform-appropriate local path using `PathBuf`.
4268///
4269/// This function constructs file system paths using the platform's native path separators.
4270/// Use this for local file operations that need to work with the actual file system.
4271///
4272/// # Arguments
4273///
4274/// * `base_path` - The base directory path
4275/// * `components` - Path components to join
4276///
4277/// # Returns
4278///
4279/// A `PathBuf` with platform-appropriate separators
4280///
4281/// # Examples
4282///
4283/// ```rust
4284/// # use nautilus_persistence::backend::catalog::make_local_path;
4285/// let path = make_local_path("/base", &["data", "quotes", "EURUSD"]);
4286/// // On Unix: "/base/data/quotes/EURUSD"
4287/// // On Windows: "\base\data\quotes\EURUSD"
4288/// ```
4289pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
4290 let mut path = PathBuf::from(base_path.as_ref());
4291 for component in components {
4292 path.push(component);
4293 }
4294 path
4295}
4296
4297/// Creates an object store path using forward slashes.
4298///
4299/// Object stores (S3, GCS, etc.) always expect forward slashes regardless of platform.
4300/// Use this when creating paths for object store operations.
4301///
4302/// # Arguments
4303///
4304/// * `base_path` - The base path (can be empty)
4305/// * `components` - Path components to join
4306///
4307/// # Returns
4308///
4309/// A string path with forward slash separators
4310///
4311/// # Examples
4312///
4313/// ```rust
4314/// # use nautilus_persistence::backend::catalog::make_object_store_path;
4315/// let path = make_object_store_path("base", &["data", "quotes", "EURUSD"]);
4316/// assert_eq!(path, "base/data/quotes/EURUSD");
4317/// ```
4318#[must_use]
4319pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
4320 let mut parts = Vec::new();
4321
4322 if !base_path.is_empty() {
4323 let normalized_base = base_path
4324 .replace('\\', "/")
4325 .trim_end_matches('/')
4326 .to_string();
4327
4328 if !normalized_base.is_empty() {
4329 parts.push(normalized_base);
4330 }
4331 }
4332
4333 for component in components {
4334 let normalized_component = component
4335 .replace('\\', "/")
4336 .trim_start_matches('/')
4337 .trim_end_matches('/')
4338 .to_string();
4339
4340 if !normalized_component.is_empty() {
4341 parts.push(normalized_component);
4342 }
4343 }
4344
4345 parts.join("/")
4346}
4347
4348/// Creates an object store path using forward slashes with owned strings.
4349///
4350/// This variant accepts owned strings to avoid lifetime issues.
4351///
4352/// # Arguments
4353///
4354/// * `base_path` - The base path (can be empty)
4355/// * `components` - Path components to join (owned strings)
4356///
4357/// # Returns
4358///
4359/// A string path with forward slash separators
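///
/// # Examples
///
/// A minimal illustration:
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path_owned;
/// let path = make_object_store_path_owned("base", vec!["data".to_string(), "quotes".to_string()]);
/// assert_eq!(path, "base/data/quotes");
/// ```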
4360#[must_use]
4361pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
4362 let mut parts = Vec::new();
4363
4364 if !base_path.is_empty() {
4365 let normalized_base = base_path
4366 .replace('\\', "/")
4367 .trim_end_matches('/')
4368 .to_string();
4369
4370 if !normalized_base.is_empty() {
4371 parts.push(normalized_base);
4372 }
4373 }
4374
4375 for component in components {
4376 let normalized_component = component
4377 .replace('\\', "/")
4378 .trim_start_matches('/')
4379 .trim_end_matches('/')
4380 .to_string();
4381
4382 if !normalized_component.is_empty() {
4383 parts.push(normalized_component);
4384 }
4385 }
4386
4387 parts.join("/")
4388}
4389
4390/// Converts a local `PathBuf` to an object store path string.
4391///
4392/// This function normalizes a local file system path to the forward-slash format
4393/// expected by object stores, handling platform differences.
4394///
4395/// # Arguments
4396///
4397/// * `local_path` - The local `PathBuf` to convert
4398///
4399/// # Returns
4400///
4401/// A string with forward slash separators suitable for object store operations
4402///
4403/// # Examples
4404///
4405/// ```rust
4406/// # use std::path::PathBuf;
4407/// # use nautilus_persistence::backend::catalog::local_to_object_store_path;
4408/// let local_path = PathBuf::from("data").join("quotes").join("EURUSD");
4409/// let object_path = local_to_object_store_path(&local_path);
4410/// assert_eq!(object_path, "data/quotes/EURUSD");
4411/// ```
4412#[must_use]
4413pub fn local_to_object_store_path(local_path: &Path) -> String {
4414 local_path.to_string_lossy().replace('\\', "/")
4415}
4416
4417/// Extracts path components using platform-appropriate path parsing.
4418///
4419/// This function safely parses a path into its components, handling both
4420/// local file system paths and object store paths correctly.
4421///
4422/// # Arguments
4423///
4424/// * `path_str` - The path string to parse
4425///
4426/// # Returns
4427///
4428/// A vector of path components
4429///
4430/// # Examples
4431///
4432/// ```rust
4433/// # use nautilus_persistence::backend::catalog::extract_path_components;
4434/// let components = extract_path_components("data/quotes/EURUSD");
4435/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
4436///
4437/// // Works with both separators
4438/// let components = extract_path_components("data\\quotes\\EURUSD");
4439/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
4440/// ```
4441#[must_use]
4442pub fn extract_path_components(path_str: &str) -> Vec<String> {
4443 // Normalize separators and split
4444 let normalized = path_str.replace('\\', "/");
4445 normalized
4446 .split('/')
4447 .filter(|s| !s.is_empty())
4448 .map(ToString::to_string)
4449 .collect()
4450}
4451
4452/// Checks if a filename's timestamp range intersects with a query interval.
4453///
4454/// This function determines whether a Parquet file (identified by its timestamp-based
4455/// filename) contains data that falls within the specified query time range.
4456///
4457/// # Parameters
4458///
4459/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
4460/// - `start`: Optional start timestamp for the query range.
4461/// - `end`: Optional end timestamp for the query range.
4462///
4463/// # Returns
4464///
4465/// Returns `true` if the file's time range intersects with the query range,
4466/// `false` otherwise. Returns `true` if the filename cannot be parsed.
4467///
4468/// # Examples
4469///
4470/// ```rust
4471/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
4472/// // Example with ISO format filenames
4473/// assert!(query_intersects_filename(
4474/// "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
4475/// Some(1609459200000000000),
4476/// Some(1609545600000000000)
4477/// ));
4478/// ```
4479fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
4480 if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
4481 (start.is_none() || start.unwrap() <= file_end)
4482 && (end.is_none() || file_start <= end.unwrap())
4483 } else {
4484 true
4485 }
4486}
4487
4488/// Parses timestamps from a Parquet filename.
4489///
4490/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
4491/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
4492///
4493/// # Parameters
4494///
4495/// - `filename`: The filename to parse (can be a full path).
4496///
4497/// # Returns
4498///
4499/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
4500/// `None` otherwise.
4501///
4502/// # Examples
4503///
4504/// ```rust
4505/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
4506/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
4507/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
4508/// ```
4509#[must_use]
4510pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
4511 let path = Path::new(filename);
4512 let base_name = path.file_name()?.to_str()?;
4513 let base_filename = base_name.strip_suffix(".parquet")?;
4514 let (first_part, second_part) = base_filename.split_once('_')?;
4515
4516 let first_iso = file_timestamp_to_iso_timestamp(first_part);
4517 let second_iso = file_timestamp_to_iso_timestamp(second_part);
4518
4519 let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
4520 let second_ts = iso_to_unix_nanos(&second_iso).ok()?;
4521
4522 Some((first_ts, second_ts))
4523}
4524
4525/// Checks if a list of closed integer intervals are all mutually disjoint.
4526///
4527/// Two intervals are disjoint if they do not overlap. This function validates that
4528/// all intervals in the list are non-overlapping, which is a requirement for
4529/// maintaining data integrity in the catalog.
4530///
4531/// # Parameters
4532///
4533/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
4534///
4535/// # Returns
4536///
4537/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
4538/// Returns `true` for empty lists or lists with a single interval.
4539///
4540/// # Examples
4541///
4542/// ```rust
4543/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
4544/// // Disjoint intervals
4545/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
4546///
4547/// // Overlapping intervals
4548/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
4549/// ```
4550#[must_use]
4551pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
4552 let n = intervals.len();
4553
4554 if n <= 1 {
4555 return true;
4556 }
4557
4558 let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
4559 sorted_intervals.sort_by_key(|&(start, _)| start);
4560
4561 for i in 0..(n - 1) {
4562 let (_, end1) = sorted_intervals[i];
4563 let (start2, _) = sorted_intervals[i + 1];
4564
4565 if end1 >= start2 {
4566 return false;
4567 }
4568 }
4569
4570 true
4571}
4572
4573/// Checks if intervals are contiguous (adjacent with no gaps).
4574///
4575/// Intervals are contiguous if, when sorted by start time, each interval's start
4576/// timestamp is exactly one more than the previous interval's end timestamp.
4577/// This ensures complete coverage of a time range with no gaps.
4578///
4579/// # Parameters
4580///
4581/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
4582///
4583/// # Returns
4584///
4585/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
4586/// Returns `true` for empty lists or lists with a single interval.
4587///
4588/// # Examples
4589///
4590/// ```rust
4591/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
4592/// // Contiguous intervals
4593/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
4594///
4595/// // Non-contiguous intervals (gap between 5 and 8)
4596/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
4597/// ```
4598#[must_use]
4599pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
4600 let n = intervals.len();
4601 if n <= 1 {
4602 return true;
4603 }
4604
4605 let mut sorted_intervals: Vec<(u64, u64)> = intervals.to_vec();
4606 sorted_intervals.sort_by_key(|&(start, _)| start);
4607
4608 for i in 0..(n - 1) {
4609 let (_, end1) = sorted_intervals[i];
4610 let (start2, _) = sorted_intervals[i + 1];
4611
4612 if end1 + 1 != start2 {
4613 return false;
4614 }
4615 }
4616
4617 true
4618}
4619
4620/// Finds the parts of a query interval that are not covered by existing data intervals.
4621///
4622/// This function calculates the "gaps" in data coverage by comparing a requested
4623/// time range against the intervals covered by existing data files. It's used to
4624/// determine what data needs to be fetched or backfilled.
4625///
4626/// # Parameters
4627///
4628/// - `start`: Start timestamp of the query interval (inclusive).
4629/// - `end`: End timestamp of the query interval (inclusive).
4630/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
4631///
4632/// # Returns
4633///
4634/// Returns a vector of (start, end) tuples representing the gaps in coverage.
4635/// Returns an empty vector if the query range is invalid or fully covered.
4636///
4637/// # Examples
4638///
4639/// ```rust
4640/// # use nautilus_persistence::backend::catalog::query_interval_diff;
4641/// // Query 1-100, have data for 10-30 and 60-80
4642/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
4643/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
4644/// ```
4645fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
4646 if start > end {
4647 return Vec::new();
4648 }
4649
4650 let interval_set = get_interval_set(closed_intervals);
4651 let query_range = (RangeBound::Included(start), RangeBound::Included(end));
4652 let query_diff = interval_set.get_interval_difference(&query_range);
4653 let mut result: Vec<(u64, u64)> = Vec::new();
4654
4655 for interval in query_diff {
4656 if let Some(tuple) = interval_to_tuple(interval, start, end) {
4657 result.push(tuple);
4658 }
4659 }
4660
4661 result
4662}
4663
4664/// Creates an interval tree from closed integer intervals.
4665///
4666/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
4667/// for use with the interval tree data structure, which is used for efficient
4668/// interval operations and gap detection.
4669///
4670/// # Parameters
4671///
4672/// - `intervals`: A slice of closed intervals as (start, end) tuples.
4673///
4674/// # Returns
4675///
4676/// Returns an [`IntervalTree`] containing the converted intervals.
4677///
4678/// # Notes
4679///
4680/// - Invalid intervals (where start > end) are skipped.
4681/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
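///
/// For example, the closed interval `(3, 7)` is inserted as the half-open range `[3, 8)`.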
4682fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
4683 let mut tree = IntervalTree::default();
4684
4685 if intervals.is_empty() {
4686 return tree;
4687 }
4688
4689 for &(start, end) in intervals {
4690 if start > end {
4691 continue;
4692 }
4693
4694 tree.insert((
4695 RangeBound::Included(start),
4696 RangeBound::Excluded(end.saturating_add(1)),
4697 ));
4698 }
4699
4700 tree
4701}
4702
4703/// Converts an interval tree result back to a closed interval tuple.
4704///
4705/// This helper function converts the bounded interval representation used by
4706/// the interval tree back into the (start, end) tuple format used throughout
4707/// the catalog.
4708///
4709/// # Parameters
4710///
4711/// - `interval`: The bounded interval from the interval tree.
4712/// - `query_start`: The start of the original query range.
4713/// - `query_end`: The end of the original query range.
4714///
4715/// # Returns
4716///
4717/// Returns `Some((start, end))` for valid intervals, `None` for empty intervals.
4718fn interval_to_tuple(
4719 interval: (RangeBound<&u64>, RangeBound<&u64>),
4720 query_start: u64,
4721 query_end: u64,
4722) -> Option<(u64, u64)> {
4723 let (bound_start, bound_end) = interval;
4724
4725 let start = match bound_start {
4726 RangeBound::Included(val) => *val,
4727 RangeBound::Excluded(val) => val.saturating_add(1),
4728 RangeBound::Unbounded => query_start,
4729 };
4730
4731 let end = match bound_end {
4732 RangeBound::Included(val) => *val,
4733 RangeBound::Excluded(val) => {
4734 if *val == 0 {
4735 return None; // Empty interval
4736 }
4737 val - 1
4738 }
4739 RangeBound::Unbounded => query_end,
4740 };
4741
4742 if start <= end {
4743 Some((start, end))
4744 } else {
4745 None
4746 }
4747}