Skip to main content

nautilus_persistence/backend/
catalog.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parquet data catalog for efficient storage and retrieval of financial market data.
17//!
18//! This module provides a data catalog implementation that uses Apache Parquet
19//! format for storing financial market data with object store backends. The catalog supports
20//! various data types including quotes, trades, bars, order book data, and other market events.
21//!
22//! # Key Features
23//!
24//! - **Object Store Integration**: Works with local filesystems, S3, and other object stores.
25//! - **Data Type Support**: Handles all major financial data types (quotes, trades, bars, etc.).
26//! - **Time-based Organization**: Organizes data by timestamp ranges for efficient querying.
27//! - **Consolidation**: Merges multiple files to optimize storage and query performance.
28//! - **Validation**: Ensures data integrity with timestamp ordering and interval validation.
29//!
30//! # Architecture
31//!
32//! The catalog organizes data in a hierarchical structure:
33//! ```text
34//! data/
35//! ├── quotes/
36//! │   └── INSTRUMENT_ID/
37//! │       └── start_ts-end_ts.parquet
38//! ├── trades/
39//! │   └── INSTRUMENT_ID/
40//! │       └── start_ts-end_ts.parquet
41//! └── bars/
42//!     └── INSTRUMENT_ID/
43//!         └── start_ts-end_ts.parquet
44//! ```
45//!
//! # Usage
//!
//! ```rust,no_run
//! use std::path::Path;
//! use nautilus_persistence::backend::catalog::ParquetDataCatalog;
//!
//! // Create a new catalog (the base path is taken by reference)
//! let catalog = ParquetDataCatalog::new(
//!     Path::new("/path/to/data"),
//!     None,        // storage_options
//!     Some(5000),  // batch_size
//!     None,        // compression (defaults to SNAPPY)
//!     None,        // max_row_group_size (defaults to 5000)
//! );
//!
//! // Write data to the catalog (data, start, end, skip_disjoint_check)
//! // catalog.write_to_parquet(data, None, None, None)?;
//! ```
64
65use std::{
66    borrow::Cow,
67    collections::{BTreeMap, HashSet},
68    fmt::Debug,
69    io::Cursor,
70    ops::Bound as RangeBound,
71    path::{Path, PathBuf},
72    sync::Arc,
73};
74
75use ahash::AHashMap;
76use datafusion::arrow::record_batch::RecordBatch;
77use futures::StreamExt;
78use itertools::Itertools;
79use nautilus_common::live::get_runtime;
80use nautilus_core::{
81    UnixNanos,
82    datetime::{iso8601_to_unix_nanos, unix_nanos_to_iso8601},
83    string::{conversions::to_snake_case, urlencoding},
84};
85use nautilus_model::{
86    data::{
87        Bar, CustomData, Data, FundingRateUpdate, HasTsInit, IndexPriceUpdate, InstrumentStatus,
88        MarkPriceUpdate, OrderBookDelta, OrderBookDepth10, QuoteTick, TradeTick,
89        close::InstrumentClose, is_monotonically_increasing_by_init, to_variant,
90    },
91    events::{
92        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied,
93        OrderEmulated, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
94        OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSnapshot,
95        OrderSubmitted, OrderTriggered, OrderUpdated, PositionAdjusted, PositionChanged,
96        PositionClosed, PositionOpened, PositionSnapshot,
97    },
98    instruments::InstrumentAny,
99    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
100};
101use nautilus_serialization::arrow::{
102    DecodeDataFromRecordBatch, DecodeTypedFromRecordBatch, EncodeToRecordBatch,
103    custom::CustomDataDecoder,
104};
105use object_store::{ObjectStore, ObjectStoreExt, path::Path as ObjectPath};
106use serde::Serialize;
107use unbounded_interval_tree::interval_tree::IntervalTree;
108
109use super::{
110    custom::{
111        custom_data_path_components, decode_batch_to_data as orchestration_decode_batch_to_data,
112        decode_custom_batches_to_data as orchestration_decode_custom_batches_to_data,
113        prepare_custom_data_batch,
114    },
115    session::{self, DataBackendSession, QueryResult, build_query},
116};
117use crate::parquet::{
118    is_remote_uri_scheme, read_parquet_from_object_store, remote_full_uri, remote_store_root_url,
119    write_batches_to_object_store,
120};
121
122/// A high-performance data catalog for storing and retrieving financial market data using Apache Parquet format.
123///
124/// The `ParquetDataCatalog` provides a solution for managing large volumes of financial
125/// market data with efficient storage, querying, and consolidation capabilities. It supports various
126/// object store backends including local filesystems, AWS S3, and other cloud storage providers.
127///
128/// # Features
129///
130/// - **Efficient Storage**: Uses Apache Parquet format with configurable compression.
131/// - **Object Store Backend**: Supports multiple storage backends through the `object_store` crate.
132/// - **Time-based Organization**: Organizes data by timestamp ranges for optimal query performance.
133/// - **Data Validation**: Ensures timestamp ordering and interval consistency.
134/// - **Consolidation**: Merges multiple files to reduce storage overhead and improve query speed.
135/// - **Type Safety**: Strongly typed data handling with compile-time guarantees.
136///
137/// # Data Organization
138///
139/// Data is organized hierarchically by data type and instrument:
140/// - `data/{data_type}/{instrument_id}/{start_ts}-{end_ts}.parquet`.
141/// - Files are named with their timestamp ranges for efficient range queries.
142/// - Intervals are validated to be disjoint to prevent data overlap.
143///
144/// # Performance Considerations
145///
146/// - **Batch Size**: Controls memory usage during data processing.
147/// - **Compression**: SNAPPY compression provides good balance of speed and size.
148/// - **Row Group Size**: Affects query performance and memory usage.
149/// - **File Consolidation**: Reduces the number of files for better query performance.
150pub struct ParquetDataCatalog {
151    /// The base path for data storage within the object store.
152    pub base_path: String,
153    /// The original URI provided when creating the catalog.
154    pub original_uri: String,
155    /// The object store backend for data persistence.
156    pub object_store: Arc<dyn ObjectStore>,
157    /// The DataFusion session for query execution.
158    pub session: DataBackendSession,
159    /// The number of records to process in each batch.
160    pub batch_size: usize,
161    /// The compression algorithm used for Parquet files.
162    pub compression: parquet::basic::Compression,
163    /// The maximum number of rows in each Parquet row group.
164    pub max_row_group_size: usize,
165}
166
167impl Debug for ParquetDataCatalog {
168    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169        f.debug_struct(stringify!(ParquetDataCatalog))
170            .field("base_path", &self.base_path)
171            .finish()
172    }
173}
174
175impl ParquetDataCatalog {
176    /// Creates a new [`ParquetDataCatalog`] instance from a local file path.
177    ///
178    /// This is a convenience constructor that converts a local path to a URI format
179    /// and delegates to [`Self::from_uri`].
180    ///
181    /// # Parameters
182    ///
183    /// - `base_path`: The base directory path for data storage.
184    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options.
185    /// - `batch_size`: Number of records to process in each batch (default: 5000).
186    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
187    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
188    ///
189    /// # Panics
190    ///
191    /// Panics if the path cannot be converted to a valid URI or if the object store
192    /// cannot be created from the path.
193    ///
194    /// # Examples
195    ///
196    /// ```rust,no_run
197    /// use std::path::PathBuf;
198    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
199    ///
200    /// let catalog = ParquetDataCatalog::new(
201    ///     PathBuf::from("/tmp/nautilus_data"),
202    ///     None,        // no storage options
203    ///     Some(1000),  // smaller batch size
204    ///     None,        // default compression
205    ///     None,        // default row group size
206    /// );
207    /// ```
208    #[must_use]
209    pub fn new(
210        base_path: &Path,
211        storage_options: Option<AHashMap<String, String>>,
212        batch_size: Option<usize>,
213        compression: Option<parquet::basic::Compression>,
214        max_row_group_size: Option<usize>,
215    ) -> Self {
216        let path_str = base_path.to_string_lossy().to_string();
217        Self::from_uri(
218            &path_str,
219            storage_options,
220            batch_size,
221            compression,
222            max_row_group_size,
223        )
224        .expect("Failed to create catalog from path")
225    }
226
227    /// Creates a new [`ParquetDataCatalog`] instance from a URI with optional storage options.
228    ///
229    /// Supports various URI schemes including local file paths and multiple cloud storage backends
230    /// supported by the `object_store` crate.
231    ///
232    /// # Supported URI Schemes
233    ///
234    /// - **AWS S3**: `s3://bucket/path`.
235    /// - **Google Cloud Storage**: `gs://bucket/path` or `gcs://bucket/path`.
236    /// - **Azure Blob Storage**: `az://container/path` or `abfs://container@account.dfs.core.windows.net/path`.
237    /// - **HTTP/WebDAV**: `http://` or `https://`.
238    /// - **Local files**: `file://path` or plain paths.
239    ///
240    /// # Parameters
241    ///
242    /// - `uri`: The URI for the data storage location.
243    /// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
244    ///   - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
245    ///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
246    ///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
247    /// - `batch_size`: Number of records to process in each batch (default: 5000).
248    /// - `compression`: Parquet compression algorithm (default: SNAPPY).
249    /// - `max_row_group_size`: Maximum rows per Parquet row group (default: 5000).
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if:
254    /// - The URI format is invalid or unsupported.
255    /// - The object store cannot be created or accessed.
256    /// - Authentication fails for cloud storage backends.
257    ///
258    /// # Examples
259    ///
260    /// ```rust,no_run
261    /// use ahash::AHashMap;
262    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
263    ///
264    /// // Local filesystem
265    /// let local_catalog = ParquetDataCatalog::from_uri(
266    ///     "/tmp/nautilus_data",
267    ///     None, None, None, None
268    /// )?;
269    ///
270    /// // S3 bucket
271    /// let s3_catalog = ParquetDataCatalog::from_uri(
272    ///     "s3://my-bucket/nautilus-data",
273    ///     None, None, None, None
274    /// )?;
275    ///
276    /// // Google Cloud Storage
277    /// let gcs_catalog = ParquetDataCatalog::from_uri(
278    ///     "gs://my-bucket/nautilus-data",
279    ///     None, None, None, None
280    /// )?;
281    ///
282    /// // Azure Blob Storage
283    /// let azure_catalog = ParquetDataCatalog::from_uri(
284    ///     "az://container/nautilus-data",
285    ///     storage_options, None, None, None
286    /// )?;
287    ///
288    /// // S3 with custom endpoint and credentials
289    /// let mut storage_options = HashMap::new();
290    /// storage_options.insert("endpoint_url".to_string(), "https://my-s3-endpoint.com".to_string());
291    /// storage_options.insert("access_key_id".to_string(), "my-key".to_string());
292    /// storage_options.insert("secret_access_key".to_string(), "my-secret".to_string());
293    ///
294    /// let s3_catalog = ParquetDataCatalog::from_uri(
295    ///     "s3://my-bucket/nautilus-data",
296    ///     Some(storage_options),
297    ///     None, None, None,
298    /// )?;
299    /// # Ok::<(), anyhow::Error>(())
300    /// ```
301    pub fn from_uri(
302        uri: &str,
303        storage_options: Option<AHashMap<String, String>>,
304        batch_size: Option<usize>,
305        compression: Option<parquet::basic::Compression>,
306        max_row_group_size: Option<usize>,
307    ) -> anyhow::Result<Self> {
308        let batch_size = batch_size.unwrap_or(5000);
309        let compression = compression.unwrap_or(parquet::basic::Compression::SNAPPY);
310        let max_row_group_size = max_row_group_size.unwrap_or(5000);
311
312        let location =
313            crate::parquet::create_object_store_location_from_path(uri, storage_options)?;
314
315        Ok(Self {
316            base_path: location.base_path,
317            original_uri: location.original_uri,
318            object_store: location.object_store,
319            session: session::DataBackendSession::new(batch_size),
320            batch_size,
321            compression,
322            max_row_group_size,
323        })
324    }
325
326    /// Returns the base path of the catalog for testing purposes.
327    #[must_use]
328    pub fn get_base_path(&self) -> String {
329        self.base_path.clone()
330    }
331
332    /// Resets the backend session to clear any cached table registrations.
333    ///
334    /// This is useful during catalog operations when files are being modified
335    /// and we need to ensure fresh data is loaded.
336    pub fn reset_session(&mut self) {
337        self.session.clear_registered_tables();
338    }
339
340    /// Writes mixed data types to the catalog by separating them into type-specific collections.
341    ///
342    /// This method takes a heterogeneous collection of market data and separates it by type,
343    /// then writes each type to its appropriate location in the catalog. This is useful when
344    /// processing mixed data streams or bulk data imports.
345    ///
346    /// # Parameters
347    ///
348    /// - `data`: A vector of mixed [`Data`] enum variants.
349    /// - `start`: Optional start timestamp to override the data's natural range.
350    /// - `end`: Optional end timestamp to override the data's natural range.
351    ///
352    /// # Notes
353    ///
354    /// - Data is automatically sorted by type before writing.
355    /// - Each data type is written to its own directory structure.
356    /// - Instrument data handling is not yet implemented (TODO).
357    ///
358    /// # Examples
359    ///
360    /// ```rust,no_run
361    /// use nautilus_model::data::Data;
362    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
363    ///
364    /// let catalog = ParquetDataCatalog::new(/* ... */);
365    /// let mixed_data: Vec<Data> = vec![/* mixed data types */];
366    ///
367    /// catalog.write_data_enum(mixed_data, None, None)?;
368    /// ```
369    pub fn write_data_enum(
370        &self,
371        data: &[Data],
372        start: Option<UnixNanos>,
373        end: Option<UnixNanos>,
374        skip_disjoint_check: Option<bool>,
375    ) -> anyhow::Result<()> {
376        let mut deltas: Vec<OrderBookDelta> = Vec::new();
377        let mut depth10s: Vec<OrderBookDepth10> = Vec::new();
378        let mut quotes: Vec<QuoteTick> = Vec::new();
379        let mut trades: Vec<TradeTick> = Vec::new();
380        let mut bars: Vec<Bar> = Vec::new();
381        let mut mark_prices: Vec<MarkPriceUpdate> = Vec::new();
382        let mut index_prices: Vec<IndexPriceUpdate> = Vec::new();
383        let mut statuses: Vec<InstrumentStatus> = Vec::new();
384        let mut closes: Vec<InstrumentClose> = Vec::new();
385        // Group custom data by full DataType identity (type_name + identifier + metadata)
386        // so each batch is written to the correct path with consistent schema/metadata.
387        let custom_data_key = |c: &CustomData| {
388            (
389                c.data_type.type_name().to_string(),
390                c.data_type.identifier().map(String::from),
391                c.data_type.metadata_str(),
392            )
393        };
394        let mut custom_data: AHashMap<(String, Option<String>, String), Vec<CustomData>> =
395            AHashMap::new();
396
397        for d in data.iter().cloned() {
398            match d {
399                Data::Deltas(_) => {}
400                Data::Delta(d) => {
401                    deltas.push(d);
402                }
403                Data::Depth10(d) => {
404                    depth10s.push(*d);
405                }
406                Data::Quote(d) => {
407                    quotes.push(d);
408                }
409                Data::Trade(d) => {
410                    trades.push(d);
411                }
412                Data::Bar(d) => {
413                    bars.push(d);
414                }
415                Data::MarkPriceUpdate(p) => {
416                    mark_prices.push(p);
417                }
418                Data::IndexPriceUpdate(p) => {
419                    index_prices.push(p);
420                }
421                Data::InstrumentStatus(s) => {
422                    statuses.push(s);
423                }
424                Data::InstrumentClose(c) => {
425                    closes.push(c);
426                }
427                Data::Custom(c) => {
428                    custom_data.entry(custom_data_key(&c)).or_default().push(c);
429                }
430            }
431        }
432
433        // Instruments are handled separately via write_instruments method
434
435        self.write_to_parquet(deltas, start, end, skip_disjoint_check)?;
436        self.write_to_parquet(depth10s, start, end, skip_disjoint_check)?;
437        self.write_to_parquet(quotes, start, end, skip_disjoint_check)?;
438        self.write_to_parquet(trades, start, end, skip_disjoint_check)?;
439        self.write_to_parquet(bars, start, end, skip_disjoint_check)?;
440        self.write_to_parquet(mark_prices, start, end, skip_disjoint_check)?;
441        self.write_to_parquet(index_prices, start, end, skip_disjoint_check)?;
442        self.write_to_parquet(statuses, start, end, skip_disjoint_check)?;
443        self.write_to_parquet(closes, start, end, skip_disjoint_check)?;
444
445        for (_, items) in custom_data {
446            self.write_custom_data_batch(items, start, end, skip_disjoint_check)?;
447        }
448
449        Ok(())
450    }
451
452    /// Writes typed data to a Parquet file in the catalog.
453    ///
454    /// This is the core method for persisting market data to the catalog. It handles data
455    /// validation, batching, compression, and ensures proper file organization with
456    /// timestamp-based naming.
457    ///
458    /// # Type Parameters
459    ///
460    /// - `T`: The data type to write, must implement required traits for serialization and cataloging.
461    ///
462    /// # Parameters
463    ///
464    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
465    /// - `start`: Optional start timestamp to override the natural data range.
466    /// - `end`: Optional end timestamp to override the natural data range.
467    ///
468    /// # Returns
469    ///
470    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
471    /// If the target file already exists, returns the path without writing (skips write).
472    ///
473    /// # Errors
474    ///
475    /// Returns an error if:
476    /// - Data serialization to Arrow record batches fails.
477    /// - Object store write operations fail.
478    /// - File path construction fails.
479    /// - Writing would create non-disjoint timestamp intervals.
480    ///
481    /// # Panics
482    ///
483    /// Panics if:
484    /// - Data timestamps are not in ascending order.
485    /// - Record batches are empty after conversion.
486    /// - Required metadata is missing from the schema.
487    ///
488    /// # Examples
489    ///
490    /// ```rust,no_run
491    /// use nautilus_model::data::QuoteTick;
492    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
493    ///
494    /// let catalog = ParquetDataCatalog::new(/* ... */);
495    /// let quotes: Vec<QuoteTick> = vec![/* quote data */];
496    ///
497    /// let path = catalog.write_to_parquet(quotes, None, None)?;
498    /// println!("Data written to: {:?}", path);
499    /// # Ok::<(), anyhow::Error>(())
500    /// ```
501    pub fn write_to_parquet<T>(
502        &self,
503        data: Vec<T>,
504        start: Option<UnixNanos>,
505        end: Option<UnixNanos>,
506        skip_disjoint_check: Option<bool>,
507    ) -> anyhow::Result<PathBuf>
508    where
509        T: HasTsInit + EncodeToRecordBatch + CatalogPathPrefix,
510    {
511        if data.is_empty() {
512            return Ok(PathBuf::new());
513        }
514
515        let type_name = to_snake_case(std::any::type_name::<T>());
516        Self::check_ascending_timestamps(&data, &type_name)?;
517
518        let start_ts = start.unwrap_or(data.first().unwrap().ts_init());
519        let end_ts = end.unwrap_or(data.last().unwrap().ts_init());
520
521        let batches = self.data_to_record_batches(data)?;
522        let schema = batches.first().expect("Batches are empty.").schema();
523
524        let identifier = if T::path_prefix() == "bars" {
525            schema.metadata.get("bar_type").cloned()
526        } else {
527            schema.metadata.get("instrument_id").cloned()
528        };
529
530        let directory = self.make_path(T::path_prefix(), identifier.as_deref())?;
531        let filename = timestamps_to_filename(start_ts, end_ts);
532        let path = PathBuf::from(format!("{directory}/{filename}"));
533        let object_path = self.to_object_path(&path.to_string_lossy())?;
534
535        let file_exists = self.execute_async(async {
536            let exists: bool = self.object_store.head(&object_path).await.is_ok();
537            Ok(exists)
538        })?;
539
540        if file_exists {
541            log::info!("File {} already exists, skipping write", path.display());
542            return Ok(path);
543        }
544
545        if !skip_disjoint_check.unwrap_or(false) {
546            let current_intervals = self.get_directory_intervals(&directory)?;
547            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
548            let mut new_intervals = current_intervals.clone();
549            new_intervals.push(new_interval);
550
551            if !are_intervals_disjoint(&new_intervals) {
552                anyhow::bail!(
553                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
554                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
555                );
556            }
557        }
558
559        log::info!(
560            "Writing {} batches of {type_name} data to {}",
561            batches.len(),
562            path.display(),
563        );
564
565        self.execute_async(async {
566            write_batches_to_object_store(
567                &batches,
568                self.object_store.clone(),
569                &object_path,
570                Some(self.compression),
571                Some(self.max_row_group_size),
572                None,
573            )
574            .await
575        })?;
576
577        Ok(path)
578    }
579
580    /// Writes custom data to a Parquet file in the catalog.
581    ///
582    /// This method handles writing custom data types that implement `CustomDataTrait`.
583    /// Custom data is organized by type name in a `custom/{type_name}/` directory structure.
584    ///
585    /// # Parameters
586    ///
587    /// - `data`: Vector of custom data items to write (must be in ascending timestamp order).
588    /// - `start`: Optional start timestamp to override the natural data range.
589    /// - `end`: Optional end timestamp to override the natural data range.
590    /// - `skip_disjoint_check`: Whether to skip interval disjointness validation.
591    ///
592    /// # Returns
593    ///
594    /// Returns the [`PathBuf`] of the created file, or an empty path if no data was provided.
595    ///
596    /// # Errors
597    ///
598    /// Returns an error if:
599    /// - Data serialization to Arrow record batches fails.
600    /// - Object store write operations fail.
601    /// - File path construction fails.
602    /// - Writing would create non-disjoint timestamp intervals (unless skipped).
603    pub fn write_custom_data_batch(
604        &self,
605        data: Vec<CustomData>,
606        start: Option<UnixNanos>,
607        end: Option<UnixNanos>,
608        skip_disjoint_check: Option<bool>,
609    ) -> anyhow::Result<PathBuf> {
610        if data.is_empty() {
611            return Ok(PathBuf::new());
612        }
613
614        let (batch, type_name, identifier, start_ts, end_ts) = prepare_custom_data_batch(data)?;
615        let start_ts = start.unwrap_or(start_ts);
616        let end_ts = end.unwrap_or(end_ts);
617        let batches = vec![batch];
618
619        let directory = self.make_path_custom_data(&type_name, identifier.as_deref())?;
620        let filename = timestamps_to_filename(start_ts, end_ts);
621        let path = PathBuf::from(format!("{directory}/{filename}"));
622        let object_path = self.to_object_path(&path.to_string_lossy())?;
623
624        let file_exists = self.execute_async(async {
625            let exists: bool = self.object_store.head(&object_path).await.is_ok();
626            Ok(exists)
627        })?;
628
629        if file_exists {
630            log::info!("File {} already exists, skipping write", path.display());
631            return Ok(path);
632        }
633
634        if !skip_disjoint_check.unwrap_or(false) {
635            let current_intervals = self.get_directory_intervals(&directory)?;
636            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
637            let mut new_intervals = current_intervals.clone();
638            new_intervals.push(new_interval);
639
640            if !are_intervals_disjoint(&new_intervals) {
641                anyhow::bail!(
642                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
643                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
644                );
645            }
646        }
647
648        self.execute_async(async {
649            write_batches_to_object_store(
650                &batches,
651                self.object_store.clone(),
652                &object_path,
653                Some(self.compression),
654                Some(self.max_row_group_size),
655                None,
656            )
657            .await
658        })?;
659
660        Ok(path)
661    }
662
663    /// Writes instruments to Parquet files in the catalog.
664    ///
665    /// Instruments are stored under their instrument ID directory using timestamp-ranged
666    /// file names, allowing multiple historical versions of the same instrument to be
667    /// appended over time:
668    /// `data/instruments/{instrument_id}/{start_ts}-{end_ts}.parquet`
669    ///
670    /// # Parameters
671    ///
672    /// - `instruments`: Vector of instruments to write.
673    ///
674    /// # Returns
675    ///
676    /// Returns a vector of paths to the created files.
677    ///
678    /// # Errors
679    ///
680    /// Returns an error if:
681    /// - Data serialization fails.
682    /// - Object store write operations fail.
683    /// - File path construction fails.
684    ///
685    /// # Examples
686    ///
687    /// ```rust,no_run
688    /// use nautilus_model::instruments::InstrumentAny;
689    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
690    ///
691    /// let catalog = ParquetDataCatalog::new(/* ... */);
692    /// let instruments: Vec<InstrumentAny> = vec![/* instruments */];
693    ///
694    /// let paths = catalog.write_instruments(instruments)?;
695    /// # Ok::<(), anyhow::Error>(())
696    /// ```
697    pub fn write_instruments(
698        &self,
699        instruments: Vec<InstrumentAny>,
700    ) -> anyhow::Result<Vec<PathBuf>> {
701        use nautilus_model::instruments::Instrument;
702
703        if instruments.is_empty() {
704            return Ok(Vec::new());
705        }
706
707        // Group instruments by concrete type and instrument_id so mixed InstrumentAny
708        // inputs are written as separate parquet batches with stable ordering.
709        let mut by_type_and_id: BTreeMap<(String, String), Vec<InstrumentAny>> = BTreeMap::new();
710
711        for instrument in instruments {
712            let instrument_type = Self::instrument_type_name(&instrument).to_string();
713            let instrument_id = Instrument::id(&instrument).to_string();
714            by_type_and_id
715                .entry((instrument_type, instrument_id))
716                .or_default()
717                .push(instrument);
718        }
719
720        let mut paths = Vec::new();
721
722        for ((_instrument_type, instrument_id), instrument_group) in by_type_and_id {
723            Self::check_ascending_timestamps(&instrument_group, "instrument")?;
724
725            let start_ts = HasTsInit::ts_init(instrument_group.first().unwrap());
726            let end_ts = HasTsInit::ts_init(instrument_group.last().unwrap());
727            let batches = self.data_to_record_batches(instrument_group)?;
728            if batches.is_empty() {
729                continue;
730            }
731
732            let directory = self.make_path("instruments", Some(instrument_id.as_str()))?;
733            let filename = timestamps_to_filename(start_ts, end_ts);
734            let path = PathBuf::from(format!("{directory}/{filename}"));
735            let object_path = self.to_object_path(&path.to_string_lossy())?;
736
737            let file_exists = self
738                .execute_async(async { Ok(self.object_store.head(&object_path).await.is_ok()) })?;
739
740            if file_exists {
741                log::info!(
742                    "Instrument file {} already exists, skipping write",
743                    path.display()
744                );
745                paths.push(path);
746                continue;
747            }
748
749            let current_intervals = self.get_directory_intervals(&directory)?;
750            let new_interval = (start_ts.as_u64(), end_ts.as_u64());
751            let mut new_intervals = current_intervals.clone();
752            new_intervals.push(new_interval);
753
754            if !are_intervals_disjoint(&new_intervals) {
755                anyhow::bail!(
756                    "Writing file {filename} with interval ({start_ts}, {end_ts}) would create \
757                    non-disjoint intervals. Existing intervals: {current_intervals:?}"
758                );
759            }
760
761            log::info!(
762                "Writing {} batches of instrument data for {instrument_id} to {}",
763                batches.len(),
764                path.display(),
765            );
766
767            // ArrowWriter stores the full schema (including "class" metadata) in ARROW:schema.
768            // When reading, use the builder's schema for metadata (see query_instruments).
769            self.execute_async(async {
770                write_batches_to_object_store(
771                    &batches,
772                    self.object_store.clone(),
773                    &object_path,
774                    Some(self.compression),
775                    Some(self.max_row_group_size),
776                    None,
777                )
778                .await
779            })?;
780
781            paths.push(path);
782        }
783
784        Ok(paths)
785    }
786
787    /// Queries instruments from the catalog.
788    ///
789    /// Instruments are stored by instrument ID in timestamp-ranged parquet files under
790    /// `data/instruments/{instrument_id}/`. Legacy `instrument.parquet` files are still
791    /// supported for backwards compatibility.
792    ///
793    /// # Parameters
794    ///
795    /// - `instrument_ids`: Optional list of instrument IDs to filter by. If `None`, queries all instruments.
796    ///
797    /// # Returns
798    ///
799    /// Returns a vector of `InstrumentAny` instances, or an error if the operation fails.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if:
804    /// - File discovery fails.
805    /// - File reading fails.
806    /// - Data deserialization fails.
807    ///
808    /// # Examples
809    ///
810    /// ```rust,no_run
811    /// use nautilus_model::instruments::InstrumentAny;
812    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
813    ///
814    /// let catalog = ParquetDataCatalog::new(/* ... */);
815    ///
816    /// // Query all instruments
817    /// let instruments = catalog.query_instruments(None)?;
818    ///
819    /// // Query specific instruments
820    /// let instruments = catalog.query_instruments(Some(vec!["EUR/USD.SIM".to_string()]))?;
821    /// # Ok::<(), anyhow::Error>(())
822    /// ```
823    pub fn query_instruments(
824        &self,
825        instrument_ids: Option<&[String]>,
826    ) -> anyhow::Result<Vec<InstrumentAny>> {
827        self.query_instruments_filtered(instrument_ids, None, None)
828    }
829
830    /// Queries instruments from the catalog with optional timestamp filtering.
831    ///
832    /// This reads all matching parquet files under `data/instruments/{instrument_id}/`,
833    /// including legacy `instrument.parquet` files, decodes the records back to
834    /// `InstrumentAny`, and filters them by `ts_init` when a range is provided.
835    pub fn query_instruments_filtered(
836        &self,
837        instrument_ids: Option<&[String]>,
838        start: Option<UnixNanos>,
839        end: Option<UnixNanos>,
840    ) -> anyhow::Result<Vec<InstrumentAny>> {
841        use nautilus_serialization::arrow::instrument::decode_instrument_any_batch;
842
843        let base_dir = self.make_path("instruments", None)?;
844        let mut all_instruments = Vec::new();
845        let start_u64 = start.map(|ts| ts.as_u64());
846        let end_u64 = end.map(|ts| ts.as_u64());
847
848        let list_result = self.execute_async(async {
849            let prefix = ObjectPath::from(format!("{base_dir}/"));
850            let mut stream = self.object_store.list(Some(&prefix));
851            let mut objects = Vec::new();
852            while let Some(object) = stream.next().await {
853                objects.push(object?);
854            }
855            Ok::<Vec<_>, anyhow::Error>(objects)
856        })?;
857
858        let mut instrument_files = Vec::new();
859
860        for object in list_result {
861            let path_str = object.location.to_string();
862            if !path_str.ends_with(".parquet") {
863                continue;
864            }
865
866            let path_parts: Vec<&str> = path_str.split('/').collect();
867            if path_parts.len() < 2 {
868                continue;
869            }
870
871            let instrument_id_dir = path_parts[path_parts.len() - 2];
872
873            if let Some(ids) = instrument_ids
874                && !ids
875                    .iter()
876                    .map(|id| urisafe_instrument_id(id))
877                    .any(|x| x.as_str() == urisafe_instrument_id(instrument_id_dir))
878            {
879                continue;
880            }
881
882            let include_file = if path_str.ends_with("/instrument.parquet") {
883                true
884            } else {
885                query_intersects_filename(&path_str, start_u64, end_u64)
886            };
887
888            if include_file {
889                instrument_files.push(path_str);
890            }
891        }
892
893        instrument_files.sort();
894
895        for file_path in instrument_files {
896            let object_path = self.to_object_path_parsed(&file_path)?;
897            let (batches, builder_schema) = self.execute_async(async {
898                read_parquet_from_object_store(self.object_store.clone(), &object_path).await
899            })?;
900
901            let metadata: std::collections::HashMap<String, String> =
902                builder_schema.metadata().clone();
903
904            for batch in batches {
905                let mut instruments = decode_instrument_any_batch(&metadata, &batch)?;
906
907                if start.is_some() || end.is_some() {
908                    instruments.retain(|instrument| {
909                        let ts = HasTsInit::ts_init(instrument).as_u64();
910                        start_u64.is_none_or(|value| ts >= value)
911                            && end_u64.is_none_or(|value| ts <= value)
912                    });
913                }
914                all_instruments.extend(instruments);
915            }
916        }
917
918        all_instruments.sort_by_key(HasTsInit::ts_init);
919
920        Ok(all_instruments)
921    }
922
923    /// Writes typed data to a JSON file in the catalog.
924    ///
925    /// This method provides an alternative to Parquet format for data export and debugging.
926    /// JSON files are human-readable but less efficient for large datasets.
927    ///
928    /// # Type Parameters
929    ///
930    /// - `T`: The data type to write, must implement serialization and cataloging traits.
931    ///
932    /// # Parameters
933    ///
934    /// - `data`: Vector of data records to write (must be in ascending timestamp order).
935    /// - `path`: Optional custom directory path (defaults to catalog's standard structure).
936    /// - `write_metadata`: Whether to write a separate metadata file alongside the data.
937    ///
938    /// # Returns
939    ///
940    /// Returns the [`PathBuf`] of the created JSON file.
941    ///
942    /// # Errors
943    ///
944    /// Returns an error if:
945    /// - JSON serialization fails.
946    /// - Object store write operations fail.
947    /// - File path construction fails.
948    ///
949    /// # Panics
950    ///
951    /// Panics if data timestamps are not in ascending order.
952    ///
953    /// # Examples
954    ///
955    /// ```rust,no_run
956    /// use std::path::PathBuf;
957    /// use nautilus_model::data::TradeTick;
958    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
959    ///
960    /// let catalog = ParquetDataCatalog::new(/* ... */);
961    /// let trades: Vec<TradeTick> = vec![/* trade data */];
962    ///
963    /// let path = catalog.write_to_json(
964    ///     trades,
965    ///     Some(PathBuf::from("/custom/path")),
966    ///     true  // write metadata
967    /// )?;
968    /// # Ok::<(), anyhow::Error>(())
969    /// ```
970    pub fn write_to_json<T>(
971        &self,
972        data: Vec<T>,
973        path: Option<PathBuf>,
974        write_metadata: bool,
975    ) -> anyhow::Result<PathBuf>
976    where
977        T: HasTsInit + Serialize + CatalogPathPrefix + EncodeToRecordBatch,
978    {
979        if data.is_empty() {
980            return Ok(PathBuf::new());
981        }
982
983        let type_name = to_snake_case(std::any::type_name::<T>());
984        Self::check_ascending_timestamps(&data, &type_name)?;
985
986        let start_ts = data.first().unwrap().ts_init();
987        let end_ts = data.last().unwrap().ts_init();
988
989        let directory =
990            path.unwrap_or_else(|| PathBuf::from(self.make_path(T::path_prefix(), None).unwrap()));
991        let filename = timestamps_to_filename(start_ts, end_ts).replace(".parquet", ".json");
992        let json_path = directory.join(&filename);
993
994        log::info!(
995            "Writing {} records of {type_name} data to {}",
996            data.len(),
997            json_path.display(),
998        );
999
1000        if write_metadata {
1001            let metadata = T::chunk_metadata(&data);
1002            let metadata_path = json_path.with_extension("metadata.json");
1003            log::info!("Writing metadata to {}", metadata_path.display());
1004
1005            // Use object store for metadata file
1006            let metadata_object_path = ObjectPath::from(metadata_path.to_string_lossy().as_ref());
1007            let metadata_json = serde_json::to_vec_pretty(&metadata)?;
1008            self.execute_async(async {
1009                let _: object_store::PutResult = self
1010                    .object_store
1011                    .put(&metadata_object_path, metadata_json.into())
1012                    .await?;
1013                Ok(())
1014            })?;
1015        }
1016
1017        // Use object store for main JSON file
1018        let json_object_path = ObjectPath::from(json_path.to_string_lossy().as_ref());
1019        let json_data = serde_json::to_vec_pretty(&serde_json::to_value(data)?)?;
1020        self.execute_async(async {
1021            let _: object_store::PutResult = self
1022                .object_store
1023                .put(&json_object_path, json_data.into())
1024                .await?;
1025            Ok(())
1026        })?;
1027
1028        Ok(json_path)
1029    }
1030
1031    /// Validates that data timestamps are in ascending order.
1032    ///
1033    /// # Parameters
1034    ///
1035    /// - `data`: Slice of data records to validate.
1036    /// - `type_name`: Name of the data type for error messages.
1037    ///
1038    pub fn check_ascending_timestamps<T: HasTsInit>(
1039        data: &[T],
1040        type_name: &str,
1041    ) -> anyhow::Result<()> {
1042        if !data
1043            .array_windows()
1044            .all(|[a, b]| a.ts_init() <= b.ts_init())
1045        {
1046            anyhow::bail!("{type_name} timestamps must be in ascending order");
1047        }
1048
1049        Ok(())
1050    }
1051
1052    fn instrument_type_name(instrument: &InstrumentAny) -> &'static str {
1053        match instrument {
1054            InstrumentAny::Betting(_) => "BettingInstrument",
1055            InstrumentAny::BinaryOption(_) => "BinaryOption",
1056            InstrumentAny::Cfd(_) => "Cfd",
1057            InstrumentAny::Commodity(_) => "Commodity",
1058            InstrumentAny::CryptoFuture(_) => "CryptoFuture",
1059            InstrumentAny::CryptoOption(_) => "CryptoOption",
1060            InstrumentAny::CryptoPerpetual(_) => "CryptoPerpetual",
1061            InstrumentAny::CurrencyPair(_) => "CurrencyPair",
1062            InstrumentAny::Equity(_) => "Equity",
1063            InstrumentAny::FuturesContract(_) => "FuturesContract",
1064            InstrumentAny::FuturesSpread(_) => "FuturesSpread",
1065            InstrumentAny::IndexInstrument(_) => "IndexInstrument",
1066            InstrumentAny::OptionContract(_) => "OptionContract",
1067            InstrumentAny::OptionSpread(_) => "OptionSpread",
1068            InstrumentAny::PerpetualContract(_) => "PerpetualContract",
1069            InstrumentAny::TokenizedAsset(_) => "TokenizedAsset",
1070        }
1071    }
1072
1073    /// Converts data into Arrow record batches for Parquet serialization.
1074    ///
1075    /// This method chunks the data according to the configured batch size and converts
1076    /// each chunk into an Arrow record batch with appropriate metadata.
1077    ///
1078    /// # Type Parameters
1079    ///
1080    /// - `T`: The data type to convert, must implement required encoding traits.
1081    ///
1082    /// # Parameters
1083    ///
1084    /// - `data`: Vector of data records to convert.
1085    ///
1086    /// # Returns
1087    ///
1088    /// Returns a vector of Arrow [`RecordBatch`] instances ready for Parquet serialization.
1089    ///
1090    /// # Errors
1091    ///
1092    /// Returns an error if record batch encoding fails for any chunk.
1093    pub fn data_to_record_batches<T>(&self, data: Vec<T>) -> anyhow::Result<Vec<RecordBatch>>
1094    where
1095        T: HasTsInit + EncodeToRecordBatch,
1096    {
1097        let mut batches = Vec::new();
1098
1099        for chunk in &data.into_iter().chunks(self.batch_size) {
1100            let data = chunk.collect_vec();
1101            let metadata = EncodeToRecordBatch::chunk_metadata(&data);
1102            let record_batch = T::encode_batch(&metadata, &data)?;
1103            batches.push(record_batch);
1104        }
1105
1106        Ok(batches)
1107    }
1108
1109    /// Extends the timestamp range of an existing Parquet file by renaming it.
1110    ///
1111    /// This method finds an existing file that is adjacent to the specified time range
1112    /// and renames it to include the new range. This is useful when appending data
1113    /// that extends the time coverage of existing files.
1114    ///
1115    /// # Parameters
1116    ///
1117    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1118    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1119    /// - `start`: Start timestamp of the new range to extend to.
1120    /// - `end`: End timestamp of the new range to extend to.
1121    ///
1122    /// # Returns
1123    ///
1124    /// Returns `Ok(())` on success, or an error if the operation fails.
1125    ///
1126    /// # Errors
1127    ///
1128    /// Returns an error if:
1129    /// - The directory path cannot be constructed.
1130    /// - No adjacent file is found to extend.
1131    /// - File rename operations fail.
1132    /// - Interval validation fails after extension.
1133    ///
1134    /// # Examples
1135    ///
1136    /// ```rust,no_run
1137    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1138    /// use nautilus_core::UnixNanos;
1139    ///
1140    /// let catalog = ParquetDataCatalog::new(/* ... */);
1141    ///
1142    /// // Extend a file's range backwards or forwards
1143    /// catalog.extend_file_name(
1144    ///     "quotes",
1145    ///     Some("BTC/USD.SIM".to_string()),
1146    ///     UnixNanos::from(1609459200000000000),
1147    ///     UnixNanos::from(1609545600000000000)
1148    /// )?;
1149    /// # Ok::<(), anyhow::Error>(())
1150    /// ```
1151    pub fn extend_file_name(
1152        &self,
1153        data_cls: &str,
1154        identifier: Option<&str>,
1155        start: UnixNanos,
1156        end: UnixNanos,
1157    ) -> anyhow::Result<()> {
1158        let directory = self.make_path(data_cls, identifier)?;
1159        let intervals = self.get_directory_intervals(&directory)?;
1160
1161        let start = start.as_u64();
1162        let end = end.as_u64();
1163
1164        for interval in intervals {
1165            if interval.0 == end + 1 {
1166                // Extend backwards: new file covers [start, interval.1]
1167                self.rename_parquet_file(&directory, interval.0, interval.1, start, interval.1)?;
1168                break;
1169            } else if interval.1 == start - 1 {
1170                // Extend forwards: new file covers [interval.0, end]
1171                self.rename_parquet_file(&directory, interval.0, interval.1, interval.0, end)?;
1172                break;
1173            }
1174        }
1175
1176        let intervals = self.get_directory_intervals(&directory)?;
1177
1178        if !are_intervals_disjoint(&intervals) {
1179            anyhow::bail!("Intervals are not disjoint after extending a file");
1180        }
1181
1182        Ok(())
1183    }
1184
1185    /// Lists all Parquet files in a specified directory.
1186    ///
1187    /// This method scans a directory and returns the full paths of all files with the `.parquet`
1188    /// extension. It works with both local filesystems and remote object stores, making it
1189    /// suitable for various storage backends.
1190    ///
1191    /// # Parameters
1192    ///
1193    /// - `directory`: The directory path to scan for Parquet files.
1194    ///
1195    /// # Returns
1196    ///
1197    /// Returns a vector of full file paths (as strings) for all Parquet files found in the directory.
1198    /// The paths are relative to the object store root and suitable for use with object store operations.
1199    /// Returns an empty vector if the directory doesn't exist or contains no Parquet files.
1200    ///
1201    /// # Errors
1202    ///
1203    /// Returns an error if:
1204    /// - Object store listing operations fail.
1205    /// - Directory access is denied.
1206    /// - Network issues occur (for remote object stores).
1207    ///
1208    /// # Notes
1209    ///
1210    /// - Only files ending with `.parquet` are included.
1211    /// - Subdirectories are not recursively scanned.
1212    /// - File paths are returned in the order provided by the object store.
1213    /// - Works with all supported object store backends (local, S3, GCS, Azure, etc.).
1214    ///
1215    /// # Examples
1216    ///
1217    /// ```rust,no_run
1218    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1219    ///
1220    /// let catalog = ParquetDataCatalog::new(/* ... */);
1221    /// let files = catalog.list_parquet_files("data/quotes/EURUSD")?;
1222    ///
1223    /// for file in files {
1224    ///     println!("Found Parquet file: {}", file);
1225    /// }
1226    /// # Ok::<(), anyhow::Error>(())
1227    /// ```
1228    pub fn list_parquet_files(&self, directory: &str) -> anyhow::Result<Vec<String>> {
1229        self.execute_async(async {
1230            let prefix = ObjectPath::from(format!("{directory}/"));
1231            let mut stream = self.object_store.list(Some(&prefix));
1232            let mut files = Vec::new();
1233
1234            while let Some(object) = stream.next().await {
1235                let object = object?;
1236                if object.location.as_ref().ends_with(".parquet") {
1237                    files.push(object.location.to_string());
1238                }
1239            }
1240            Ok::<Vec<String>, anyhow::Error>(files)
1241        })
1242    }
1243
1244    /// Lists all instrument identifiers for a specific data type.
1245    ///
1246    /// This method scans the data directory for a given data type and extracts
1247    /// all unique instrument identifiers from the directory structure.
1248    ///
1249    /// # Parameters
1250    ///
1251    /// - `data_type`: The data type directory name (e.g., "quotes", "trades", "bars").
1252    ///
1253    /// # Returns
1254    ///
1255    /// Returns a vector of instrument identifier strings.
1256    ///
1257    /// # Errors
1258    ///
1259    /// Returns an error if directory listing fails.
1260    pub fn list_instruments(&self, data_type: &str) -> anyhow::Result<Vec<String>> {
1261        self.execute_async(async {
1262            let prefix = ObjectPath::from(format!("data/{data_type}/"));
1263            let mut stream = self.object_store.list(Some(&prefix));
1264            let mut instruments = HashSet::new();
1265
1266            while let Some(object) = stream.next().await {
1267                let object = object?;
1268                let path = object.location.as_ref();
1269                let parts: Vec<&str> = path.split('/').collect();
1270                if parts.len() >= 3 {
1271                    instruments.insert(parts[2].to_string());
1272                }
1273            }
1274            Ok::<Vec<String>, anyhow::Error>(instruments.into_iter().collect())
1275        })
1276    }
1277
1278    /// Lists Parquet files matching specific criteria (data type, identifiers, time range).
1279    ///
1280    /// This method finds all Parquet files that match the specified criteria by filtering
1281    /// files based on their directory structure and filename timestamps.
1282    ///
1283    /// # Parameters
1284    ///
1285    /// - `data_type`: The data type directory name (e.g., "quotes", "trades", "custom/MyType").
1286    /// - `identifiers`: Optional list of identifiers to filter by.
1287    /// - `start`: Optional start timestamp to filter files by their time range.
1288    /// - `end`: Optional end timestamp to filter files by their time range.
1289    ///
1290    /// # Returns
1291    ///
1292    /// Returns a vector of file paths that match the criteria.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns an error if directory listing or file filtering fails.
1297    pub fn list_parquet_files_with_criteria(
1298        &self,
1299        data_type: &str,
1300        identifiers: Option<&[String]>,
1301        start: Option<UnixNanos>,
1302        end: Option<UnixNanos>,
1303    ) -> anyhow::Result<Vec<String>> {
1304        let mut all_files = Vec::new();
1305
1306        let start_u64 = start.map(|s| s.as_u64());
1307        let end_u64 = end.map(|e| e.as_u64());
1308
1309        let base_dir = self.make_path(data_type, None)?;
1310
1311        // Use recursive listing to match Python's glob behavior
1312        let list_result = self.execute_async(async {
1313            let prefix = ObjectPath::from(format!("{base_dir}/"));
1314            let mut stream = self.object_store.list(Some(&prefix));
1315            let mut objects = Vec::new();
1316            while let Some(object) = stream.next().await {
1317                objects.push(object?);
1318            }
1319            Ok::<Vec<_>, anyhow::Error>(objects)
1320        })?;
1321
1322        for object in list_result {
1323            let path_str = object.location.to_string();
1324
1325            // Filter by identifiers if provided
1326            if let Some(ids) = identifiers {
1327                let path_components = extract_path_components(&path_str);
1328                let mut matches = false;
1329
1330                for id in ids {
1331                    if path_components.iter().any(|c| c.contains(id)) {
1332                        matches = true;
1333                        break;
1334                    }
1335                }
1336
1337                if !matches {
1338                    continue;
1339                }
1340            }
1341
1342            // Filter by timestamp range if filename can be parsed
1343            if path_str.ends_with(".parquet")
1344                && query_intersects_filename(&path_str, start_u64, end_u64)
1345            {
1346                all_files.push(path_str);
1347            }
1348        }
1349
1350        Ok(all_files)
1351    }
1352
1353    /// Helper method to reconstruct full URI for remote object store paths
1354    #[must_use]
1355    pub fn reconstruct_full_uri(&self, path_str: &str) -> String {
1356        if path_str.contains("://") {
1357            return path_str.to_string();
1358        }
1359
1360        // Check if this is a remote URI scheme that needs reconstruction
1361        if self.is_remote_uri() {
1362            let path = self.path_under_base(path_str);
1363            if let Ok(uri) = remote_full_uri(&self.original_uri, &path) {
1364                return uri;
1365            }
1366        }
1367
1368        // For local paths, extract the directory from the original URI
1369        if self.original_uri.starts_with("file://") {
1370            // Extract the path from the file:// URI
1371            if let Ok(url) = url::Url::parse(&self.original_uri)
1372                && let Ok(base_path) = url.to_file_path()
1373            {
1374                // Use platform-appropriate path separator for display
1375                // but object store paths always use forward slashes
1376                let base_str = base_path.to_string_lossy();
1377                return self.join_paths(&base_str, path_str);
1378            }
1379        }
1380
1381        // For local paths without file:// prefix, use the original URI as base
1382        if self.base_path.is_empty() {
1383            // If base_path is empty and not a file URI, try using original_uri as base
1384            if self.original_uri.contains("://") {
1385                // Fallback: return the path as-is
1386                path_str.to_string()
1387            } else {
1388                self.join_paths(self.original_uri.trim_end_matches('/'), path_str)
1389            }
1390        } else {
1391            let base = self.base_path.trim_end_matches('/');
1392            self.join_paths(base, path_str)
1393        }
1394    }
1395
1396    /// Helper method to join paths using forward slashes (object store convention)
1397    #[must_use]
1398    fn join_paths(&self, base: &str, path: &str) -> String {
1399        make_object_store_path(base, &[path])
1400    }
1401
1402    /// Resolves a path for use with DataFusion (avoiding Windows path doubling for file://).
1403    /// Returns the path as-is if it is already a full URI or absolute; otherwise builds
1404    /// file:// base + path for local catalogs or reconstruct_full_uri for remote.
1405    #[must_use]
1406    fn resolve_path_for_datafusion(&self, path: &str) -> String {
1407        if path.contains("://") {
1408            return path.to_string();
1409        }
1410
1411        if path.starts_with('/') {
1412            return path.to_string();
1413        }
1414
1415        if self.original_uri.starts_with("file://") {
1416            let base = self.original_uri.trim_end_matches('/');
1417            let path_trimmed = path.trim_end_matches('/');
1418            return format!("{base}/{path_trimmed}");
1419        }
1420        self.reconstruct_full_uri(path)
1421    }
1422
1423    /// Like resolve_path_for_datafusion but ensures the result ends with a trailing slash.
1424    #[must_use]
1425    fn resolve_directory_for_datafusion(&self, directory: &str) -> String {
1426        let mut resolved = self.resolve_path_for_datafusion(directory);
1427        if !resolved.ends_with('/') {
1428            resolved.push('/');
1429        }
1430        resolved
1431    }
1432
1433    /// Returns the path string to push in query_files result list: relative for file://,
1434    /// full URI for remote (so callers can pass to resolve_path_for_datafusion later).
1435    #[must_use]
1436    fn path_for_query_list(&self, path: &str) -> String {
1437        if self.original_uri.starts_with("file://") {
1438            path.to_string()
1439        } else {
1440            self.reconstruct_full_uri(path)
1441        }
1442    }
1443
1444    /// Returns the native path string for the catalog root (for std::fs). Only valid when
1445    /// !is_remote_uri(); uses parquet's file_uri_to_native_path for file:// URIs.
1446    #[must_use]
1447    fn native_base_path_string(&self) -> String {
1448        if self.original_uri.starts_with("file://") {
1449            crate::parquet::file_uri_to_native_path(&self.original_uri)
1450        } else {
1451            self.original_uri.clone()
1452        }
1453    }
1454
1455    /// Helper method to check if the original URI uses a remote object store scheme
1456    #[must_use]
1457    pub fn is_remote_uri(&self) -> bool {
1458        self.original_uri
1459            .split_once("://")
1460            .is_some_and(|(scheme, _)| is_remote_uri_scheme(scheme))
1461    }
1462
1463    /// Executes a query against the catalog to retrieve market data of a specific type.
1464    ///
1465    /// This is the primary method for querying data from the catalog. It registers the appropriate
1466    /// object store with the DataFusion session, finds all relevant Parquet files, and executes
1467    /// the query across them. The method supports filtering by instrument IDs, time ranges, and
1468    /// custom SQL WHERE clauses.
1469    ///
1470    /// # Type Parameters
1471    ///
1472    /// - `T`: The data type to query, must implement required traits for deserialization and cataloging.
1473    ///
1474    /// # Parameters
1475    ///
1476    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings (e.g., "EUR/USD.SIM")
1477    ///   or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If `None`, queries all identifiers.
1478    ///   For bars, partial matching is supported (e.g., "EUR/USD.SIM" will match "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1479    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
1480    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
1481    /// - `where_clause`: Optional SQL WHERE clause for additional filtering (e.g., "price > 100").
1482    /// - `files`: Optional list of specific files to query. If provided, skips file discovery.
1483    /// - `optimize_file_loading`: If `true` (default), registers entire directories with DataFusion,
1484    ///   which is more efficient for managing many files. If `false`, registers each file individually
1485    ///   (needed for operations like consolidation where precise file control is required).
1486    ///
1487    /// # Returns
1488    ///
1489    /// Returns a [`QueryResult`] containing the query execution context and data.
1490    /// Use [`QueryResult::collect()`] to retrieve the actual data records.
1491    ///
1492    /// # Errors
1493    ///
1494    /// Returns an error if:
1495    /// - Object store registration fails for remote URIs.
1496    /// - File discovery fails.
1497    /// - DataFusion query execution fails.
1498    /// - Data deserialization fails.
1499    ///
1500    /// # Performance Notes
1501    ///
1502    /// - Files are automatically filtered by timestamp ranges before querying.
1503    /// - DataFusion optimizes queries across multiple Parquet files.
1504    /// - Use specific instrument IDs and time ranges to improve performance.
1505    /// - WHERE clauses are pushed down to the Parquet reader when possible.
1506    /// - Directory-based registration (`optimize_file_loading=true`) is more efficient for queries
1507    ///   with many files, as it reduces the number of table registrations.
1508    ///
1509    /// # Examples
1510    ///
1511    /// ```rust,no_run
1512    /// use nautilus_model::data::QuoteTick;
1513    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1514    /// use nautilus_core::UnixNanos;
1515    ///
1516    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1517    ///
1518    /// // Query all quote data (uses directory-based registration by default)
1519    /// let result = catalog.query::<QuoteTick>(None, None, None, None, None, true)?;
1520    /// let quotes = result.collect();
1521    ///
1522    /// // Query specific instruments within a time range
1523    /// let result = catalog.query::<QuoteTick>(
1524    ///     Some(vec!["EUR/USD.SIM".to_string(), "GBP/USD.SIM".to_string()]),
1525    ///     Some(UnixNanos::from(1609459200000000000)),
1526    ///     Some(UnixNanos::from(1609545600000000000)),
1527    ///     None,
1528    ///     None,
1529    ///     true
1530    /// )?;
1531    ///
1532    /// // Query with custom WHERE clause and file-based registration
1533    /// let result = catalog.query::<QuoteTick>(
1534    ///     Some(vec!["EUR/USD.SIM".to_string()]),
1535    ///     None,
1536    ///     None,
1537    ///     Some("bid_price > 1.2000"),
1538    ///     None,
1539    ///     false  // Use file-based registration for precise control
1540    /// )?;
1541    /// # Ok::<(), anyhow::Error>(())
1542    /// ```
1543    pub fn query<T>(
1544        &mut self,
1545        identifiers: Option<Vec<String>>,
1546        start: Option<UnixNanos>,
1547        end: Option<UnixNanos>,
1548        where_clause: Option<&str>,
1549        files: Option<Vec<String>>,
1550        optimize_file_loading: bool,
1551    ) -> anyhow::Result<QueryResult>
1552    where
1553        T: DecodeDataFromRecordBatch + CatalogPathPrefix,
1554    {
1555        // Register the object store with the session for remote URIs only.
1556        // For local file:// we do not register: we pass full file URLs to register_parquet
1557        // so DataFusion's default file provider handles them (avoids path doubling on Windows
1558        // where a registered store would receive a path that gets prefixed again).
1559        self.register_remote_object_store()?;
1560
1561        let files_list = if let Some(files) = files {
1562            files
1563        } else {
1564            self.query_files(T::path_prefix(), identifiers, start, end)?
1565        };
1566
1567        if optimize_file_loading {
1568            // Use directory-based registration for efficiency. DataFusion handles
1569            // reading all files in each directory, which is more memory-efficient
1570            // than registering many individual file tables.
1571            let directories: HashSet<String> = files_list
1572                .iter()
1573                .filter_map(|file_uri| {
1574                    // Extract directory path (everything except the filename)
1575                    let path = Path::new(file_uri);
1576                    path.parent().map(|p| p.to_string_lossy().to_string())
1577                })
1578                .collect();
1579
1580            for directory in directories {
1581                // Extract identifier from directory path (last component)
1582                let path_parts: Vec<&str> = directory.split('/').collect();
1583                let identifier = if path_parts.is_empty() {
1584                    "unknown".to_string()
1585                } else {
1586                    path_parts[path_parts.len() - 1].to_string()
1587                };
1588                let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1589
1590                // Create table name from path_prefix and identifier (no filename component)
1591                let table_name = format!("{}_{}", T::path_prefix(), safe_sql_identifier);
1592                let query = build_query(&table_name, start, end, where_clause);
1593
1594                let resolved_path = self.resolve_directory_for_datafusion(&directory);
1595
1596                self.session
1597                    .add_file::<T>(&table_name, &resolved_path, Some(&query), None)?;
1598            }
1599        } else {
1600            // Register files individually (for operations requiring precise file control)
1601            for file_uri in &files_list {
1602                // Extract identifier from file path and filename to create meaningful table names
1603                let identifier = extract_identifier_from_path(file_uri);
1604                let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1605                let safe_filename = extract_sql_safe_filename(file_uri);
1606
1607                // Create table name from path_prefix, identifier, and filename
1608                let table_name = format!(
1609                    "{}_{}_{}",
1610                    T::path_prefix(),
1611                    safe_sql_identifier,
1612                    safe_filename
1613                );
1614                let query = build_query(&table_name, start, end, where_clause);
1615
1616                let resolved_path = self.resolve_path_for_datafusion(file_uri);
1617                self.session
1618                    .add_file::<T>(&table_name, &resolved_path, Some(&query), None)?;
1619            }
1620        }
1621
1622        Ok(self.session.get_query_result())
1623    }
1624
1625    /// Queries typed data from the catalog and returns results as a strongly-typed vector.
1626    ///
1627    /// This is a convenience method that wraps the generic `query` method and automatically
1628    /// collects and converts the results into a vector of the specific data type. It handles
1629    /// the type conversion from the generic [`Data`] enum to the concrete type `T`.
1630    ///
1631    /// # Type Parameters
1632    ///
1633    /// - `T`: The specific data type to query and return. Must implement required traits for
1634    ///   deserialization, cataloging, and conversion from the [`Data`] enum.
1635    ///
1636    /// # Parameters
1637    ///
1638    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings (e.g., "EUR/USD.SIM")
1639    ///   or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If `None`, queries all identifiers.
1640    ///   For bars, partial matching is supported (e.g., "EUR/USD.SIM" will match "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1641    /// - `start`: Optional start timestamp for filtering (inclusive). If `None`, queries from the beginning.
1642    /// - `end`: Optional end timestamp for filtering (inclusive). If `None`, queries to the end.
1643    /// - `where_clause`: Optional SQL WHERE clause for additional filtering. Use standard SQL syntax
1644    ///   with column names matching the Parquet schema (e.g., "`bid_price` > 1.2000", "volume > 1000").
1645    ///
1646    /// # Returns
1647    ///
1648    /// Returns a vector of the specific data type `T`, sorted by timestamp. The vector will be
1649    /// empty if no data matches the query criteria.
1650    ///
1651    /// # Errors
1652    ///
1653    /// Returns an error if:
1654    /// - The underlying query execution fails.
1655    /// - Data type conversion fails.
1656    /// - Object store access fails.
1657    /// - Invalid WHERE clause syntax is provided.
1658    ///
1659    /// # Performance Considerations
1660    ///
1661    /// - Use specific instrument IDs and time ranges to minimize data scanning.
1662    /// - WHERE clauses are pushed down to Parquet readers when possible.
1663    /// - Results are automatically sorted by timestamp during collection.
1664    /// - Memory usage scales with the amount of data returned.
1665    ///
1666    /// # Examples
1667    ///
1668    /// ```rust,no_run
1669    /// use nautilus_model::data::{QuoteTick, TradeTick, Bar};
1670    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1671    /// use nautilus_core::UnixNanos;
1672    ///
1673    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1674    ///
1675    /// // Query all quotes for a specific instrument
1676    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
1677    ///     Some(vec!["EUR/USD.SIM".to_string()]),
1678    ///     None,
1679    ///     None,
1680    ///     None,
1681    ///     None,
1682    ///     true
1683    /// )?;
1684    ///
1685    /// // Query trades within a specific time range
1686    /// let trades: Vec<TradeTick> = catalog.query_typed_data(
1687    ///     Some(vec!["BTC/USD.SIM".to_string()]),
1688    ///     Some(UnixNanos::from(1609459200000000000)),
1689    ///     Some(UnixNanos::from(1609545600000000000)),
1690    ///     None,
1691    ///     None,
1692    ///     true
1693    /// )?;
1694    ///
1695    /// // Query bars with volume filter (using instrument_id - partial match for bar_type)
1696    /// let bars: Vec<Bar> = catalog.query_typed_data(
1697    ///     Some(vec!["AAPL.NASDAQ".to_string()]),
1698    ///     None,
1699    ///     None,
1700    ///     Some("volume > 1000000"),
1701    ///     None,
1702    ///     true
1703    /// )?;
1704    ///
1705    /// // Query bars with specific bar_type
1706    /// let bars: Vec<Bar> = catalog.query_typed_data(
1707    ///     Some(vec!["AAPL.NASDAQ-1-MINUTE-LAST-EXTERNAL".to_string()]),
1708    ///     None,
1709    ///     None,
1710    ///     None,
1711    ///     None,
1712    ///     true
1713    /// )?;
1714    ///
1715    /// // Query multiple instruments with price filter
1716    /// let quotes: Vec<QuoteTick> = catalog.query_typed_data(
1717    ///     Some(vec!["EUR/USD.SIM".to_string(), "GBP/USD.SIM".to_string()]),
1718    ///     None,
1719    ///     None,
1720    ///     Some("bid_price > 1.2000 AND ask_price < 1.3000"),
1721    ///     None,
1722    ///     true
1723    /// )?;
1724    /// # Ok::<(), anyhow::Error>(())
1725    /// ```
1726    pub fn query_typed_data<T>(
1727        &mut self,
1728        identifiers: Option<Vec<String>>,
1729        start: Option<UnixNanos>,
1730        end: Option<UnixNanos>,
1731        where_clause: Option<&str>,
1732        files: Option<Vec<String>>,
1733        optimize_file_loading: bool,
1734    ) -> anyhow::Result<Vec<T>>
1735    where
1736        T: DecodeDataFromRecordBatch + CatalogPathPrefix + TryFrom<Data>,
1737    {
1738        // Reset session to allow repeated queries (streams are consumed on each query)
1739        self.reset_session();
1740
1741        let query_result = self.query::<T>(
1742            identifiers,
1743            start,
1744            end,
1745            where_clause,
1746            files,
1747            optimize_file_loading,
1748        )?;
1749        let all_data = query_result.collect();
1750
1751        // Convert Data enum variants to specific type T using to_variant
1752        Ok(to_variant::<T>(all_data))
1753    }
1754
1755    /// Queries typed records that are not represented by the [`Data`] enum.
1756    pub fn query_typed<T>(
1757        &mut self,
1758        identifiers: Option<Vec<String>>,
1759        start: Option<UnixNanos>,
1760        end: Option<UnixNanos>,
1761        where_clause: Option<&str>,
1762        files: Option<Vec<String>>,
1763        optimize_file_loading: bool,
1764    ) -> anyhow::Result<Vec<T>>
1765    where
1766        T: DecodeTypedFromRecordBatch + CatalogPathPrefix + HasTsInit,
1767    {
1768        self.reset_session();
1769
1770        self.register_remote_object_store()?;
1771
1772        let files_list = if let Some(files) = files {
1773            files
1774        } else {
1775            self.query_files(T::path_prefix(), identifiers, start, end)?
1776        };
1777
1778        let mut all_records = Vec::new();
1779
1780        if optimize_file_loading {
1781            let directories: HashSet<String> = files_list
1782                .iter()
1783                .filter_map(|file_uri| {
1784                    Path::new(file_uri)
1785                        .parent()
1786                        .map(|path| path.to_string_lossy().to_string())
1787                })
1788                .collect();
1789
1790            for directory in directories {
1791                let path_parts: Vec<&str> = directory.split('/').collect();
1792                let identifier = if path_parts.is_empty() {
1793                    "unknown".to_string()
1794                } else {
1795                    path_parts[path_parts.len() - 1].to_string()
1796                };
1797                let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1798                let table_name = format!("{}_{}", T::path_prefix(), safe_sql_identifier);
1799                let query = build_query(&table_name, start, end, where_clause);
1800                let resolved_path = self.resolve_directory_for_datafusion(&directory);
1801                let batches = self.session.collect_query_batches(
1802                    &table_name,
1803                    &resolved_path,
1804                    Some(&query),
1805                )?;
1806
1807                all_records.extend(self.convert_record_batches_to_typed::<T>(batches)?);
1808            }
1809        } else {
1810            for file_uri in &files_list {
1811                let identifier = extract_identifier_from_path(file_uri);
1812                let safe_sql_identifier = make_sql_safe_identifier(&identifier);
1813                let safe_filename = extract_sql_safe_filename(file_uri);
1814                let table_name = format!(
1815                    "{}_{}_{}",
1816                    T::path_prefix(),
1817                    safe_sql_identifier,
1818                    safe_filename
1819                );
1820                let query = build_query(&table_name, start, end, where_clause);
1821                let resolved_path = self.resolve_path_for_datafusion(file_uri);
1822                let batches = self.session.collect_query_batches(
1823                    &table_name,
1824                    &resolved_path,
1825                    Some(&query),
1826                )?;
1827
1828                all_records.extend(self.convert_record_batches_to_typed::<T>(batches)?);
1829            }
1830        }
1831
1832        if !is_monotonically_increasing_by_init(&all_records) {
1833            all_records.sort_by_key(|record| record.ts_init());
1834        }
1835
1836        Ok(all_records)
1837    }
1838
1839    /// Queries custom data dynamically by type name.
1840    ///
1841    /// This method allows querying custom data types without compile-time knowledge of the type.
1842    /// It uses dynamic schema decoding based on the type name stored in metadata.
1843    ///
1844    /// # Parameters
1845    ///
1846    /// - `type_name`: The name of the custom data type to query.
1847    /// - `identifiers`: Optional list of instrument identifiers to filter by.
1848    /// - `start`: Optional start timestamp for filtering.
1849    /// - `end`: Optional end timestamp for filtering.
1850    /// - `where_clause`: Optional SQL WHERE clause for additional filtering.
1851    /// - `files`: Optional list of specific files to query.
1852    /// - `_optimize_file_loading`: Whether to optimize file loading (currently unused).
1853    ///
1854    /// # Returns
1855    ///
1856    /// Returns a vector of `Data` enum variants containing the custom data.
1857    ///
1858    /// # Errors
1859    ///
1860    /// Returns an error if:
1861    /// - File discovery fails.
1862    /// - Data decoding fails.
1863    /// - Query execution fails.
1864    #[expect(clippy::too_many_arguments)]
1865    pub fn query_custom_data_dynamic(
1866        &mut self,
1867        type_name: &str,
1868        identifiers: Option<&[String]>,
1869        start: Option<UnixNanos>,
1870        end: Option<UnixNanos>,
1871        where_clause: Option<&str>,
1872        files: Option<Vec<String>>,
1873        _optimize_file_loading: bool,
1874    ) -> anyhow::Result<Vec<Data>> {
1875        self.reset_session();
1876
1877        self.register_remote_object_store()?;
1878
1879        let path_prefix = format!("custom/{type_name}");
1880
1881        let files = if let Some(f) = files {
1882            f.into_iter()
1883                .map(|p| self.to_object_path(&p).map(|op| op.to_string()))
1884                .collect::<anyhow::Result<Vec<_>>>()?
1885        } else {
1886            self.list_parquet_files_with_criteria(&path_prefix, identifiers, start, end)?
1887        };
1888
1889        if files.is_empty() {
1890            return Ok(Vec::new());
1891        }
1892
1893        let table_name = "custom_data_table";
1894
1895        // Use CustomDataDecoder for all custom data. Pass type_name so decode can look up
1896        // the type when Parquet/DataFusion does not preserve schema metadata. Callers must
1897        // ensure Rust custom types are registered via ensure_custom_data_registered::<T>().
1898        for file in files {
1899            let resolved_path = self.resolve_path_for_datafusion(&file);
1900            let sql_query = build_query(table_name, start, end, where_clause);
1901
1902            self.session
1903                .add_file::<CustomDataDecoder>(
1904                    table_name,
1905                    &resolved_path,
1906                    Some(&sql_query),
1907                    Some(type_name),
1908                )
1909                .map_err(|e| anyhow::anyhow!(e.to_string()))?;
1910        }
1911
1912        let query_result = self.session.get_query_result();
1913        Ok(query_result.collect())
1914    }
1915
1916    /// Queries all Parquet files for a specific data type and optional instrument IDs.
1917    ///
1918    /// This method finds all Parquet files that match the specified criteria and returns
1919    /// their full URIs. The files are filtered by data type, instrument IDs (if provided),
1920    /// and timestamp range (if provided).
1921    ///
1922    /// # Parameters
1923    ///
1924    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1925    /// - `identifiers`: Optional list of identifiers to filter by. Can be instrument_id strings
1926    ///   (e.g., "EUR/USD.SIM") or bar_type strings (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1927    ///   For bars, partial matching is supported.
1928    /// - `start`: Optional start timestamp to filter files by their time range.
1929    /// - `end`: Optional end timestamp to filter files by their time range.
1930    ///
1931    /// # Returns
1932    ///
1933    /// Returns a vector of file URI strings that match the query criteria,
1934    /// or an error if the query fails.
1935    ///
1936    /// # Errors
1937    ///
1938    /// Returns an error if:
1939    /// - The directory path cannot be constructed.
1940    /// - Object store listing operations fail.
1941    /// - URI reconstruction fails.
1942    ///
1943    /// # Examples
1944    ///
1945    /// ```rust,no_run
1946    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1947    /// use nautilus_core::UnixNanos;
1948    ///
1949    /// let catalog = ParquetDataCatalog::new(/* ... */);
1950    ///
1951    /// // Query all quote files
1952    /// let files = catalog.query_files("quotes", None, None, None)?;
1953    ///
1954    /// // Query trade files for specific instruments within a time range
1955    /// let files = catalog.query_files(
1956    ///     "trades",
1957    ///     Some(vec!["BTC/USD.SIM".to_string(), "ETH/USD.SIM".to_string()]),
1958    ///     Some(UnixNanos::from(1609459200000000000)),
1959    ///     Some(UnixNanos::from(1609545600000000000))
1960    /// )?;
1961    /// # Ok::<(), anyhow::Error>(())
1962    /// ```
1963    pub fn query_files(
1964        &self,
1965        data_cls: &str,
1966        identifiers: Option<Vec<String>>,
1967        start: Option<UnixNanos>,
1968        end: Option<UnixNanos>,
1969    ) -> anyhow::Result<Vec<String>> {
1970        let mut files = Vec::new();
1971
1972        let start_u64 = start.map(|s| s.as_u64());
1973        let end_u64 = end.map(|e| e.as_u64());
1974
1975        let base_dir = self.make_path(data_cls, None)?;
1976
1977        // Use recursive listing to match Python's glob behavior
1978        let list_result = self.execute_async(async {
1979            let prefix = ObjectPath::from(format!("{base_dir}/"));
1980            let mut stream = self.object_store.list(Some(&prefix));
1981            let mut objects = Vec::new();
1982            while let Some(object) = stream.next().await {
1983                objects.push(object?);
1984            }
1985            Ok::<Vec<_>, anyhow::Error>(objects)
1986        })?;
1987
1988        let mut file_paths: Vec<String> = list_result
1989            .into_iter()
1990            .filter_map(|object| {
1991                let path_str = object.location.to_string();
1992                if path_str.ends_with(".parquet") {
1993                    Some(path_str)
1994                } else {
1995                    None
1996                }
1997            })
1998            .collect();
1999
2000        // Apply identifier filtering if provided
2001        if let Some(identifiers) = identifiers {
2002            let safe_identifiers: Vec<String> = identifiers
2003                .iter()
2004                .map(|id| urisafe_instrument_id(id))
2005                .collect();
2006
2007            // Exact match by default for instrument_ids or bar_types
2008            let exact_match_file_paths: Vec<String> = file_paths
2009                .iter()
2010                .filter(|file_path| {
2011                    // Extract the directory name (second to last path component)
2012                    let path_parts: Vec<&str> = file_path.split('/').collect();
2013                    if path_parts.len() >= 2 {
2014                        let dir_name = path_parts[path_parts.len() - 2];
2015                        safe_identifiers.iter().any(|safe_id| safe_id == dir_name)
2016                    } else {
2017                        false
2018                    }
2019                })
2020                .cloned()
2021                .collect();
2022
2023            if exact_match_file_paths.is_empty() && data_cls == "bars" {
2024                file_paths.retain(|file_path| {
2025                    let path_parts: Vec<&str> = file_path.split('/').collect();
2026                    if path_parts.len() >= 2 {
2027                        let dir_name = path_parts[path_parts.len() - 2];
2028                        if let Some(bar_instrument_id) = extract_bar_type_instrument_id(dir_name) {
2029                            safe_identifiers.iter().any(|id| id == bar_instrument_id)
2030                        } else {
2031                            false
2032                        }
2033                    } else {
2034                        false
2035                    }
2036                });
2037            } else {
2038                file_paths = exact_match_file_paths;
2039            }
2040        }
2041
2042        // Apply timestamp filtering
2043        file_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
2044
2045        for file_path in file_paths {
2046            files.push(self.path_for_query_list(&file_path));
2047        }
2048
2049        Ok(files)
2050    }
2051
2052    pub fn quote_ticks(
2053        &mut self,
2054        instrument_ids: Option<Vec<String>>,
2055        start: Option<UnixNanos>,
2056        end: Option<UnixNanos>,
2057    ) -> anyhow::Result<Vec<QuoteTick>> {
2058        self.query_typed_data::<QuoteTick>(instrument_ids, start, end, None, None, true)
2059    }
2060
2061    /// Queries trade tick data for the specified instrument(s) and time range.
2062    pub fn trade_ticks(
2063        &mut self,
2064        instrument_ids: Option<Vec<String>>,
2065        start: Option<UnixNanos>,
2066        end: Option<UnixNanos>,
2067    ) -> anyhow::Result<Vec<TradeTick>> {
2068        self.query_typed_data::<TradeTick>(instrument_ids, start, end, None, None, true)
2069    }
2070
2071    /// Queries bar data for the specified instrument(s) and time range.
2072    pub fn bars(
2073        &mut self,
2074        instrument_ids: Option<Vec<String>>,
2075        start: Option<UnixNanos>,
2076        end: Option<UnixNanos>,
2077    ) -> anyhow::Result<Vec<Bar>> {
2078        self.query_typed_data::<Bar>(instrument_ids, start, end, None, None, true)
2079    }
2080
2081    /// Queries order book delta data for the specified instrument(s) and time range.
2082    pub fn order_book_deltas(
2083        &mut self,
2084        instrument_ids: Option<Vec<String>>,
2085        start: Option<UnixNanos>,
2086        end: Option<UnixNanos>,
2087    ) -> anyhow::Result<Vec<OrderBookDelta>> {
2088        self.query_typed_data::<OrderBookDelta>(instrument_ids, start, end, None, None, true)
2089    }
2090
2091    /// Queries order book depth L10 data for the specified instrument(s) and time range.
2092    pub fn order_book_depth10(
2093        &mut self,
2094        instrument_ids: Option<Vec<String>>,
2095        start: Option<UnixNanos>,
2096        end: Option<UnixNanos>,
2097    ) -> anyhow::Result<Vec<OrderBookDepth10>> {
2098        self.query_typed_data::<OrderBookDepth10>(instrument_ids, start, end, None, None, true)
2099    }
2100
2101    /// Queries instrument close data for the specified instrument(s) and time range.
2102    pub fn instrument_closes(
2103        &mut self,
2104        instrument_ids: Option<Vec<String>>,
2105        start: Option<UnixNanos>,
2106        end: Option<UnixNanos>,
2107    ) -> anyhow::Result<Vec<InstrumentClose>> {
2108        self.query_typed_data::<InstrumentClose>(instrument_ids, start, end, None, None, true)
2109    }
2110
/// Queries instrument definitions, optionally restricted to the given instrument IDs.
///
/// `_start` and `_end` are accepted but not used: the call is forwarded to
/// `query_instruments` with `instrument_ids` only, so no time range is applied here.
///
/// # Errors
///
/// Returns any error produced by `query_instruments`.
pub fn instruments(
    &self,
    instrument_ids: Option<&[String]>,
    _start: Option<UnixNanos>,
    _end: Option<UnixNanos>,
) -> anyhow::Result<Vec<InstrumentAny>> {
    // Only the identifier filter is forwarded; the time bounds are ignored.
    self.query_instruments(instrument_ids)
}
2120
2121    /// Retrieves a list of file paths for a given data type.
2122    ///
2123    /// This method constructs a path pattern to find all parquet files
2124    /// associated with the specified data type in the catalog's directory structure.
2125    ///
2126    /// # Parameters
2127    ///
2128    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades", "bars").
2129    ///
2130    /// # Returns
2131    ///
2132    /// Returns a vector of file paths matching the data type, or an error if the operation fails.
2133    ///
2134    /// # Errors
2135    ///
2136    /// Returns an error if:
2137    /// - Object store listing operations fail.
2138    /// - Directory access is denied.
2139    ///
2140    /// # Examples
2141    ///
2142    /// ```rust,no_run
2143    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2144    ///
2145    /// let catalog = ParquetDataCatalog::new(/* ... */);
2146    /// let files = catalog.get_file_list_from_data_cls("quotes")?;
2147    ///
2148    /// for file in files {
2149    ///     println!("Found file: {}", file);
2150    /// }
2151    /// # Ok::<(), anyhow::Error>(())
2152    /// ```
2153    pub fn get_file_list_from_data_cls(&self, data_cls: &str) -> anyhow::Result<Vec<String>> {
2154        let base_dir = self.make_path(data_cls, None)?;
2155
2156        let list_result = self.execute_async(async {
2157            let prefix = ObjectPath::from(format!("{base_dir}/"));
2158            let mut stream = self.object_store.list(Some(&prefix));
2159            let mut objects = Vec::new();
2160            while let Some(object) = stream.next().await {
2161                objects.push(object?);
2162            }
2163            Ok::<Vec<_>, anyhow::Error>(objects)
2164        })?;
2165
2166        let file_paths: Vec<String> = list_result
2167            .into_iter()
2168            .filter_map(|object| {
2169                let path_str = object.location.to_string();
2170                if path_str.ends_with(".parquet") {
2171                    Some(path_str)
2172                } else {
2173                    None
2174                }
2175            })
2176            .collect();
2177
2178        Ok(file_paths)
2179    }
2180
2181    /// Filters a list of file paths based on identifiers and time range.
2182    ///
2183    /// This method filters the provided file paths by:
2184    /// 1. Matching identifiers (exact match for instruments, prefix match for bars)
2185    /// 2. Intersecting with the specified time range
2186    ///
2187    /// # Parameters
2188    ///
2189    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades", "bars").
2190    /// - `file_paths`: List of file paths to filter.
2191    /// - `identifiers`: Optional list of identifiers to match against file paths.
2192    /// - `start`: Optional start timestamp for filtering.
2193    /// - `end`: Optional end timestamp for filtering.
2194    ///
2195    /// # Returns
2196    ///
2197    /// Returns a filtered vector of file paths that match the criteria.
2198    ///
2199    /// # Notes
2200    ///
2201    /// For Bar data types, if exact identifier matching fails, the function attempts
2202    /// partial matching by checking if the file's identifier starts with the provided identifier
2203    /// followed by a dash (to match bar type patterns).
2204    ///
2205    /// # Examples
2206    ///
2207    /// ```rust,no_run
2208    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2209    /// use nautilus_core::UnixNanos;
2210    ///
2211    /// let catalog = ParquetDataCatalog::new(/* ... */);
2212    /// let all_files = catalog.get_file_list_from_data_cls("quotes")?;
2213    ///
2214    /// let filtered = catalog.filter_files(
2215    ///     "quotes",
2216    ///     all_files,
2217    ///     Some(vec!["EUR/USD.SIM".to_string()]),
2218    ///     Some(UnixNanos::from(1609459200000000000)),
2219    ///     Some(UnixNanos::from(1609545600000000000))
2220    /// )?;
2221    /// # Ok::<(), anyhow::Error>(())
2222    /// ```
2223    pub fn filter_files(
2224        &self,
2225        data_cls: &str,
2226        file_paths: Vec<String>,
2227        identifiers: Option<Vec<String>>,
2228        start: Option<UnixNanos>,
2229        end: Option<UnixNanos>,
2230    ) -> anyhow::Result<Vec<String>> {
2231        let mut filtered_paths = file_paths;
2232
2233        // Apply identifier filtering if provided
2234        if let Some(identifiers) = identifiers {
2235            let safe_identifiers: Vec<String> = identifiers
2236                .iter()
2237                .map(|id| urisafe_instrument_id(id))
2238                .collect();
2239
2240            // Extract directory names from file paths
2241            let file_safe_identifiers: Vec<String> = filtered_paths
2242                .iter()
2243                .map(|file_path| {
2244                    let path_parts: Vec<&str> = file_path.split('/').collect();
2245                    if path_parts.len() >= 2 {
2246                        path_parts[path_parts.len() - 2].to_string()
2247                    } else {
2248                        String::new()
2249                    }
2250                })
2251                .collect();
2252
2253            // Exact match by default for instrument_ids or bar_types
2254            let exact_match_file_paths: Vec<String> = filtered_paths
2255                .iter()
2256                .enumerate()
2257                .filter_map(|(i, file_path)| {
2258                    let dir_name = &file_safe_identifiers[i];
2259                    if safe_identifiers.iter().any(|safe_id| safe_id == dir_name) {
2260                        Some(file_path.clone())
2261                    } else {
2262                        None
2263                    }
2264                })
2265                .collect();
2266
2267            if exact_match_file_paths.is_empty() && data_cls == "bars" {
2268                // Partial match of instrument_ids in bar_types for bars
2269                filtered_paths.retain(|file_path| {
2270                    let path_parts: Vec<&str> = file_path.split('/').collect();
2271                    if path_parts.len() >= 2 {
2272                        let dir_name = path_parts[path_parts.len() - 2];
2273                        safe_identifiers
2274                            .iter()
2275                            .any(|safe_id| dir_name.starts_with(&format!("{safe_id}-")))
2276                    } else {
2277                        false
2278                    }
2279                });
2280            } else {
2281                filtered_paths = exact_match_file_paths;
2282            }
2283        }
2284
2285        // Apply timestamp filtering
2286        let start_u64 = start.map(|s| s.as_u64());
2287        let end_u64 = end.map(|e| e.as_u64());
2288        filtered_paths.retain(|file_path| query_intersects_filename(file_path, start_u64, end_u64));
2289
2290        Ok(filtered_paths)
2291    }
2292
2293    /// Finds the missing time intervals for a specific data type and instrument ID.
2294    ///
2295    /// This method compares a requested time range against the existing data coverage
2296    /// and returns the gaps that need to be filled. This is useful for determining
2297    /// what data needs to be fetched or backfilled.
2298    ///
2299    /// # Parameters
2300    ///
2301    /// - `start`: Start timestamp of the requested range (Unix nanoseconds).
2302    /// - `end`: End timestamp of the requested range (Unix nanoseconds).
2303    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2304    /// - `instrument_id`: Optional instrument ID to target a specific instrument's data.
2305    ///
2306    /// # Returns
2307    ///
2308    /// Returns a vector of (start, end) tuples representing the missing intervals,
2309    /// or an error if the operation fails.
2310    ///
2311    /// # Errors
2312    ///
2313    /// Returns an error if:
2314    /// - The directory path cannot be constructed.
2315    /// - Interval retrieval fails.
2316    /// - Gap calculation fails.
2317    ///
2318    /// # Examples
2319    ///
2320    /// ```rust,no_run
2321    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2322    ///
2323    /// let catalog = ParquetDataCatalog::new(/* ... */);
2324    ///
2325    /// // Find missing intervals for quote data
2326    /// let missing = catalog.get_missing_intervals_for_request(
2327    ///     1609459200000000000,  // start
2328    ///     1609545600000000000,  // end
2329    ///     "quotes",
2330    ///     Some("BTCUSD".to_string())
2331    /// )?;
2332    ///
2333    /// for (start, end) in missing {
2334    ///     println!("Missing data from {} to {}", start, end);
2335    /// }
2336    /// # Ok::<(), anyhow::Error>(())
2337    /// ```
2338    pub fn get_missing_intervals_for_request(
2339        &self,
2340        start: u64,
2341        end: u64,
2342        data_cls: &str,
2343        identifier: Option<&str>,
2344    ) -> anyhow::Result<Vec<(u64, u64)>> {
2345        let intervals = self.get_intervals(data_cls, identifier)?;
2346
2347        Ok(query_interval_diff(start, end, &intervals))
2348    }
2349
2350    /// Gets the first (earliest) timestamp for a specific data type and identifier.
2351    ///
2352    /// This method finds the earliest timestamp covered by existing data files for
2353    /// the specified data type and identifier. This is useful for determining
2354    /// the oldest data available or for incremental data updates.
2355    ///
2356    /// # Parameters
2357    ///
2358    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2359    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2360    ///
2361    /// # Returns
2362    ///
2363    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
2364    /// or an error if the operation fails.
2365    ///
2366    /// # Errors
2367    ///
2368    /// Returns an error if:
2369    /// - The directory path cannot be constructed.
2370    /// - Interval retrieval fails.
2371    ///
2372    /// # Note
2373    ///
2374    /// Unlike the Python implementation, this method does not check subclasses of the
2375    /// data type. The Python version checks `[data_cls, *data_cls.__subclasses__()]` to
2376    /// handle cases where subclasses might use different directory names. Since Rust
2377    /// works with string names rather than types, subclass checking is not possible.
2378    /// In practice, most subclasses map to the same directory name via `class_to_filename`,
2379    /// so this difference is typically not significant.
2380    ///
2381    /// # Examples
2382    ///
2383    /// ```rust,no_run
2384    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2385    ///
2386    /// let catalog = ParquetDataCatalog::new(/* ... */);
2387    ///
2388    /// // Get the first timestamp for quote data
2389    /// if let Some(first_ts) = catalog.query_first_timestamp("quotes", Some("BTCUSD".to_string()))? {
2390    ///     println!("First quote timestamp: {}", first_ts);
2391    /// } else {
2392    ///     println!("No quote data found");
2393    /// }
2394    /// # Ok::<(), anyhow::Error>(())
2395    /// ```
2396    pub fn query_first_timestamp(
2397        &self,
2398        data_cls: &str,
2399        identifier: Option<&str>,
2400    ) -> anyhow::Result<Option<u64>> {
2401        let intervals = self.get_intervals(data_cls, identifier)?;
2402
2403        if intervals.is_empty() {
2404            return Ok(None);
2405        }
2406
2407        Ok(Some(intervals.first().unwrap().0))
2408    }
2409
2410    /// Gets the last (most recent) timestamp for a specific data type and identifier.
2411    ///
2412    /// This method finds the latest timestamp covered by existing data files for
2413    /// the specified data type and identifier. This is useful for determining
2414    /// the most recent data available or for incremental data updates.
2415    ///
2416    /// # Parameters
2417    ///
2418    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2419    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2420    ///
2421    /// # Returns
2422    ///
2423    /// Returns `Some(timestamp)` if data exists, `None` if no data is found,
2424    /// or an error if the operation fails.
2425    ///
2426    /// # Errors
2427    ///
2428    /// Returns an error if:
2429    /// - The directory path cannot be constructed.
2430    /// - Interval retrieval fails.
2431    ///
2432    /// # Note
2433    ///
2434    /// Unlike the Python implementation, this method does not check subclasses of the
2435    /// data type. The Python version checks `[data_cls, *data_cls.__subclasses__()]` to
2436    /// handle cases where subclasses might use different directory names. Since Rust
2437    /// works with string names rather than types, subclass checking is not possible.
2438    /// In practice, most subclasses map to the same directory name via `class_to_filename`,
2439    /// so this difference is typically not significant.
2440    ///
2441    /// # Examples
2442    ///
2443    /// ```rust,no_run
2444    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2445    ///
2446    /// let catalog = ParquetDataCatalog::new(/* ... */);
2447    ///
2448    /// // Get the last timestamp for quote data
2449    /// if let Some(last_ts) = catalog.query_last_timestamp("quotes", Some("BTCUSD".to_string()))? {
2450    ///     println!("Last quote timestamp: {}", last_ts);
2451    /// } else {
2452    ///     println!("No quote data found");
2453    /// }
2454    /// # Ok::<(), anyhow::Error>(())
2455    /// ```
2456    pub fn query_last_timestamp(
2457        &self,
2458        data_cls: &str,
2459        identifier: Option<&str>,
2460    ) -> anyhow::Result<Option<u64>> {
2461        let intervals = self.get_intervals(data_cls, identifier)?;
2462
2463        if intervals.is_empty() {
2464            return Ok(None);
2465        }
2466
2467        Ok(Some(intervals.last().unwrap().1))
2468    }
2469
2470    /// Gets the time intervals covered by Parquet files for a specific data type and identifier.
2471    ///
2472    /// This method returns all time intervals covered by existing data files for the
2473    /// specified data type and identifier. The intervals are sorted by start time and
2474    /// represent the complete data coverage available.
2475    ///
2476    /// # Parameters
2477    ///
2478    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
2479    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
2480    ///
2481    /// # Returns
2482    ///
2483    /// Returns a vector of (start, end) tuples representing the covered intervals,
2484    /// sorted by start time, or an error if the operation fails.
2485    ///
2486    /// # Errors
2487    ///
2488    /// Returns an error if:
2489    /// - The directory path cannot be constructed.
2490    /// - Directory listing fails.
2491    /// - Filename parsing fails.
2492    ///
2493    /// # Examples
2494    ///
2495    /// ```rust,no_run
2496    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2497    ///
2498    /// let catalog = ParquetDataCatalog::new(/* ... */);
2499    ///
2500    /// // Get all intervals for quote data
2501    /// let intervals = catalog.get_intervals("quotes", Some("BTCUSD".to_string()))?;
2502    /// for (start, end) in intervals {
2503    ///     println!("Data available from {} to {}", start, end);
2504    /// }
2505    /// # Ok::<(), anyhow::Error>(())
2506    /// ```
2507    pub fn get_intervals(
2508        &self,
2509        data_cls: &str,
2510        identifier: Option<&str>,
2511    ) -> anyhow::Result<Vec<(u64, u64)>> {
2512        let directory = self.make_path(data_cls, identifier)?;
2513        let intervals = self.get_directory_intervals(&directory)?;
2514
2515        if identifier.is_none() {
2516            // `get_directory_intervals` already recursed through every per-identifier
2517            // subdirectory via `object_store.list`, so intervals from different
2518            // identifiers can overlap. Merge overlaps into a disjoint sorted union
2519            // so callers like `query_last_timestamp` see the true max end and
2520            // `consolidate_data_by_period` sees contiguous coverage.
2521            let mut merged: Vec<(u64, u64)> = Vec::new();
2522
2523            for interval in intervals {
2524                if let Some(last) = merged.last_mut()
2525                    && interval.0 <= last.1
2526                {
2527                    last.1 = last.1.max(interval.1);
2528                    continue;
2529                }
2530                merged.push(interval);
2531            }
2532
2533            return Ok(merged);
2534        }
2535
2536        // For bars, fall back to partial matching when the exact directory
2537        // doesn't exist (callers may pass an instrument_id like "EUR/USD.SIM"
2538        // but bars are stored under bar_type dirs like "EURUSD.SIM-1-MINUTE-...")
2539
2540        if !intervals.is_empty() || data_cls != "bars" {
2541            return Ok(intervals);
2542        }
2543
2544        let safe_id = urisafe_instrument_id(identifier.unwrap());
2545
2546        // Use relative path so list_directory_stems doesn't double-prefix
2547        // for remote catalogs (make_path already includes base_path)
2548        let bars_subdir = format!("data/{data_cls}");
2549        let subdirs = self.list_directory_stems(&bars_subdir)?;
2550
2551        let mut all_intervals = Vec::new();
2552
2553        for subdir in &subdirs {
2554            let decoded = urlencoding::decode(subdir).unwrap_or(Cow::Borrowed(subdir));
2555
2556            if extract_bar_type_instrument_id(&decoded) == Some(safe_id.as_str()) {
2557                // Use decoded name to avoid double percent-encoding
2558                // (to_object_path uses Path::from which re-encodes)
2559                let subdir_path = self.make_path(data_cls, Some(&decoded))?;
2560                all_intervals.extend(self.get_directory_intervals(&subdir_path)?);
2561            }
2562        }
2563
2564        all_intervals.sort_by_key(|&(start, _)| start);
2565
2566        // Merge overlapping intervals from different bar types so that
2567        // last().1 reliably gives the maximum end timestamp
2568        let mut merged: Vec<(u64, u64)> = Vec::new();
2569
2570        for interval in all_intervals {
2571            if let Some(last) = merged.last_mut()
2572                && interval.0 <= last.1
2573            {
2574                last.1 = last.1.max(interval.1);
2575                continue;
2576            }
2577            merged.push(interval);
2578        }
2579
2580        Ok(merged)
2581    }
2582
2583    /// Gets the time intervals covered by Parquet files in a specific directory.
2584    ///
2585    /// This method scans a directory for Parquet files and extracts the timestamp ranges
2586    /// from their filenames. It's used internally by other methods to determine data coverage
2587    /// and is essential for interval-based operations like gap detection and consolidation.
2588    ///
2589    /// # Parameters
2590    ///
2591    /// - `directory`: The directory path to scan for Parquet files.
2592    ///
2593    /// # Returns
2594    ///
2595    /// Returns a vector of (start, end) tuples representing the time intervals covered
2596    /// by files in the directory, sorted by start timestamp. Returns an empty vector
2597    /// if the directory doesn't exist or contains no valid Parquet files.
2598    ///
2599    /// # Errors
2600    ///
2601    /// Returns an error if:
2602    /// - Object store listing operations fail.
2603    /// - Directory access is denied.
2604    ///
2605    /// # Notes
2606    ///
2607    /// - Only files with valid timestamp-based filenames are included.
2608    /// - Files with unparsable names are silently ignored.
2609    /// - The method works with both local and remote object stores.
2610    /// - Results are automatically sorted by start timestamp.
2611    ///
2612    /// # Examples
2613    ///
2614    /// ```rust,no_run
2615    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2616    ///
2617    /// let catalog = ParquetDataCatalog::new(/* ... */);
2618    /// let intervals = catalog.get_directory_intervals("data/quotes/EURUSD")?;
2619    ///
2620    /// for (start, end) in intervals {
2621    ///     println!("File covers {} to {}", start, end);
2622    /// }
2623    /// # Ok::<(), anyhow::Error>(())
2624    /// ```
2625    pub fn get_directory_intervals(&self, directory: &str) -> anyhow::Result<Vec<(u64, u64)>> {
2626        // Use object store for all operations
2627        // Convert directory to object path format (consistent with how files are written)
2628        // For local stores with empty base_path, to_object_path returns path as-is.
2629        // For remote stores, to_object_path preserves or prepends the catalog base path.
2630        let object_dir = self.to_object_path(directory)?;
2631        let list_result = self.execute_async(async {
2632            // Ensure trailing slash for directory listing
2633            let dir_str = format!("{}/", object_dir.as_ref());
2634            let prefix = ObjectPath::from(dir_str);
2635            let mut stream = self.object_store.list(Some(&prefix));
2636            let mut objects = Vec::new();
2637            while let Some(object) = stream.next().await {
2638                objects.push(object?);
2639            }
2640            Ok::<Vec<_>, anyhow::Error>(objects)
2641        })?;
2642
2643        let mut intervals = Vec::new();
2644
2645        for object in list_result {
2646            let path_str = object.location.to_string();
2647            if path_str.ends_with(".parquet")
2648                && let Some(interval) = parse_filename_timestamps(&path_str)
2649            {
2650                intervals.push(interval);
2651            }
2652        }
2653
2654        intervals.sort_by_key(|&(start, _)| start);
2655
2656        Ok(intervals)
2657    }
2658
2659    /// Constructs a directory path for storing data of a specific type and instrument.
2660    ///
2661    /// This method builds the hierarchical directory structure used by the catalog to organize
2662    /// data by type and instrument. The path follows the pattern: `{base_path}/data/{type_name}/{instrument_id}`.
2663    /// Instrument IDs are automatically converted to URI-safe format by removing forward slashes.
2664    ///
2665    /// # Parameters
2666    ///
2667    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
2668    /// - `identifier`: Optional identifier. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If provided, creates a subdirectory for the identifier. If `None`, returns the path to the data type directory.
2669    ///
2670    /// # Returns
2671    ///
2672    /// Returns the constructed directory path as a string, or an error if path construction fails.
2673    ///
2674    /// # Errors
2675    ///
2676    /// Returns an error if:
2677    /// - The instrument ID contains invalid characters that cannot be made URI-safe.
2678    /// - Path construction fails due to system limitations.
2679    ///
2680    /// # Path Structure
2681    ///
2682    /// - Without identifier: `{base_path}/data/{type_name}`.
2683    /// - With identifier: `{base_path}/data/{type_name}/{safe_identifier}`.
2684    /// - If `base_path` is empty: `data/{type_name}[/{safe_identifier}]`.
2685    ///
2686    /// # Examples
2687    ///
2688    /// ```rust,no_run
2689    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2690    ///
2691    /// let catalog = ParquetDataCatalog::new(/* ... */);
2692    ///
2693    /// // Path for all quote data
2694    /// let quotes_path = catalog.make_path("quotes", None)?;
2695    /// // Returns: "/base/path/data/quotes"
2696    ///
2697    /// // Path for specific instrument quotes
2698    /// let eurusd_quotes = catalog.make_path("quotes", Some("EUR/USD".to_string()))?;
2699    /// // Returns: "/base/path/data/quotes/EURUSD" (slash removed)
2700    ///
2701    /// // Path for bar data with complex instrument ID
2702    /// let bars_path = catalog.make_path("bars", Some("BTC/USD-1H".to_string()))?;
2703    /// // Returns: "/base/path/data/bars/BTCUSD-1H"
2704    /// # Ok::<(), anyhow::Error>(())
2705    /// ```
2706    pub fn make_path(&self, type_name: &str, identifier: Option<&str>) -> anyhow::Result<String> {
2707        let mut components = vec!["data".to_string(), type_name.to_string()];
2708
2709        if let Some(id) = identifier {
2710            let safe_id = urisafe_instrument_id(id);
2711            components.push(safe_id);
2712        }
2713
2714        let path = make_object_store_path_owned(&self.base_path, components);
2715        Ok(path)
2716    }
2717
2718    /// Builds the directory path for custom data: `data/custom/{type_name}[/{identifier segments}]`.
2719    /// Identifier can contain `//` for subdirectories (normalized to `/`); path is safe for writing.
2720    pub fn make_path_custom_data(
2721        &self,
2722        type_name: &str,
2723        identifier: Option<&str>,
2724    ) -> anyhow::Result<String> {
2725        let components = custom_data_path_components(type_name, identifier);
2726        let path = make_object_store_path_owned(&self.base_path, components);
2727        Ok(path)
2728    }
2729
2730    /// Helper method to rename a parquet file by moving it via object store operations
2731    fn rename_parquet_file(
2732        &self,
2733        directory: &str,
2734        old_start: u64,
2735        old_end: u64,
2736        new_start: u64,
2737        new_end: u64,
2738    ) -> anyhow::Result<()> {
2739        let old_filename =
2740            timestamps_to_filename(UnixNanos::from(old_start), UnixNanos::from(old_end));
2741        let old_path = format!("{directory}/{old_filename}");
2742        let old_object_path = self.to_object_path(&old_path)?;
2743
2744        let new_filename =
2745            timestamps_to_filename(UnixNanos::from(new_start), UnixNanos::from(new_end));
2746        let new_path = format!("{directory}/{new_filename}");
2747        let new_object_path = self.to_object_path(&new_path)?;
2748
2749        self.move_file(&old_object_path, &new_object_path)
2750    }
2751
2752    /// Converts a catalog path string to an [`ObjectPath`] for object store operations.
2753    ///
2754    /// This method handles the conversion between catalog-relative paths and object store paths,
2755    /// taking into account the catalog's base path configuration. It automatically preserves the
2756    /// base path prefix for remote catalogs and strips it for local catalog paths.
2757    ///
2758    /// # Parameters
2759    ///
2760    /// - `path`: The catalog path string to convert. Can be absolute or relative.
2761    ///
2762    /// # Returns
2763    ///
2764    /// Returns an [`ObjectPath`] suitable for use with object store operations.
2765    ///
2766    /// # Path Handling
2767    ///
2768    /// - If `base_path` is empty, the path is used as-is.
2769    /// - If `base_path` is set for a remote catalog, it's preserved or prepended.
2770    /// - If `base_path` is set for a local catalog, it's stripped from the path if present.
2771    /// - Trailing slashes and backslashes are automatically handled.
2772    /// - The resulting path is relative to the object store root.
2773    /// - All paths are normalized to use forward slashes (object store convention).
2774    ///
2775    /// # Errors
2776    ///
2777    /// Returns an error for remote catalogs when `path` is a full URI whose scheme/host
2778    /// does not match the catalog's own root (cross-bucket misuse). Without this guard
2779    /// the caller could silently write to or read from the wrong bucket.
2780    ///
2781    /// # Examples
2782    ///
2783    /// Local catalog paths (absolute or relative) strip the catalog's base directory:
2784    ///
2785    /// ```rust,no_run
2786    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2787    /// # let catalog: ParquetDataCatalog = unimplemented!();
2788    /// let object_path = catalog.to_object_path("/base/data/quotes/file.parquet")?;
2789    /// // ObjectPath("data/quotes/file.parquet")
2790    /// # Ok::<(), anyhow::Error>(())
2791    /// ```
2792    ///
2793    /// Remote catalog paths (relative or full URI) preserve or prepend the base prefix:
2794    ///
2795    /// ```rust,no_run
2796    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2797    /// # let catalog: ParquetDataCatalog = unimplemented!();
2798    /// let object_path = catalog.to_object_path("data/trades/file.parquet")?;
2799    /// // ObjectPath("base/data/trades/file.parquet")
2800    /// # Ok::<(), anyhow::Error>(())
2801    /// ```
2802    pub fn to_object_path(&self, path: &str) -> anyhow::Result<ObjectPath> {
2803        Ok(ObjectPath::from(self.object_store_path(path)?))
2804    }
2805
2806    fn register_remote_object_store(&mut self) -> anyhow::Result<()> {
2807        if self.is_remote_uri() {
2808            let base_url = remote_store_root_url(&self.original_uri)?;
2809            self.session
2810                .register_object_store(&base_url, self.object_store.clone());
2811        }
2812
2813        Ok(())
2814    }
2815
2816    /// Converts a path string to [`ObjectPath`] using parse (no percent-encoding).
2817    ///
2818    /// Use this for paths that were returned by the object store (e.g. from `list()`),
2819    /// which may already be percent-encoded. Using [`Self::to_object_path`] (which uses
2820    /// `Path::from`) on such paths would double-encode (e.g. `%5E` -> `%255E`).
2821    ///
2822    /// # Errors
2823    ///
2824    /// Returns an error for the same cross-bucket case as [`Self::to_object_path`], or
2825    /// when the resulting string fails [`ObjectPath::parse`].
2826    pub fn to_object_path_parsed(&self, path: &str) -> anyhow::Result<ObjectPath> {
2827        let to_parse = self.object_store_path(path)?;
2828        ObjectPath::parse(&to_parse).map_err(anyhow::Error::from)
2829    }
2830
2831    fn object_store_path(&self, path: &str) -> anyhow::Result<String> {
2832        let normalized_path = path.replace('\\', "/");
2833
2834        if self.is_remote_uri() {
2835            if normalized_path.contains("://") {
2836                let path_under_root = self.remote_uri_object_path(&normalized_path)?;
2837                return Ok(self.path_under_base(&path_under_root));
2838            }
2839
2840            return Ok(self.path_under_base(&normalized_path));
2841        }
2842
2843        Ok(self.path_without_local_base(&normalized_path))
2844    }
2845
2846    fn remote_uri_object_path(&self, path: &str) -> anyhow::Result<String> {
2847        let path_url = url::Url::parse(path)
2848            .map_err(|e| anyhow::anyhow!("Failed to parse object store URI {path}: {e}"))?;
2849        if !is_remote_uri_scheme(path_url.scheme()) {
2850            anyhow::bail!(
2851                "URI {path} uses non-remote scheme {} for remote catalog at {}",
2852                path_url.scheme(),
2853                self.original_uri,
2854            );
2855        }
2856
2857        let catalog_root = remote_store_root_url(&self.original_uri)?;
2858        let path_root = remote_store_root_url(path)?;
2859        if catalog_root.as_str().trim_end_matches('/') != path_root.as_str().trim_end_matches('/') {
2860            anyhow::bail!(
2861                "Cross-store URI {path} (root {}) does not belong to catalog rooted at {} ({})",
2862                path_root.as_str().trim_end_matches('/'),
2863                self.original_uri,
2864                catalog_root.as_str().trim_end_matches('/'),
2865            );
2866        }
2867
2868        // The URL crate keeps the path component percent-encoded (e.g. `%5E`),
2869        // so preserve that encoding for `ObjectPath::parse` round-trips through
2870        // `object_store::list`/`get`.
2871        Ok(path_url.path().trim_start_matches('/').to_string())
2872    }
2873
2874    fn path_without_local_base(&self, path: &str) -> String {
2875        let base_path = if self.base_path.is_empty() {
2876            self.native_base_path_string()
2877        } else {
2878            self.base_path.clone()
2879        };
2880        let normalized_base = base_path.replace('\\', "/");
2881        let base = normalized_base.trim_end_matches('/');
2882
2883        if base.is_empty() {
2884            path.to_string()
2885        } else if path == base {
2886            String::new()
2887        } else if let Some(without_base) = path.strip_prefix(&format!("{base}/")) {
2888            without_base.to_string()
2889        } else {
2890            path.to_string()
2891        }
2892    }
2893
2894    fn path_under_base(&self, path: &str) -> String {
2895        let normalized_path = path.replace('\\', "/");
2896        let path = normalized_path
2897            .trim_start_matches('/')
2898            .trim_end_matches('/');
2899
2900        if self.base_path.is_empty() {
2901            return path.to_string();
2902        }
2903
2904        let normalized_base = self.base_path.replace('\\', "/");
2905        let base = normalized_base
2906            .trim_start_matches('/')
2907            .trim_end_matches('/');
2908
2909        if base.is_empty() || path == base || path.starts_with(&format!("{base}/")) {
2910            path.to_string()
2911        } else if path.is_empty() {
2912            base.to_string()
2913        } else {
2914            make_object_store_path(base, &[path])
2915        }
2916    }
2917
2918    #[allow(dead_code)]
2919    fn to_file_path(&self, path: &ObjectPath) -> String {
2920        path.to_string()
2921    }
2922
2923    /// Helper method to move a file using object store rename operation
2924    pub fn move_file(&self, old_path: &ObjectPath, new_path: &ObjectPath) -> anyhow::Result<()> {
2925        self.execute_async(async {
2926            self.object_store
2927                .rename(old_path, new_path)
2928                .await
2929                .map_err(anyhow::Error::from)
2930        })
2931    }
2932
2933    /// Helper method to execute async operations with a runtime
2934    pub fn execute_async<F, R>(&self, future: F) -> anyhow::Result<R>
2935    where
2936        F: std::future::Future<Output = anyhow::Result<R>>,
2937    {
2938        let rt = get_runtime();
2939        rt.block_on(future)
2940    }
2941
2942    /// Lists directory stems (directory names without path) in a subdirectory.
2943    ///
2944    /// This method scans a subdirectory and returns the names of all immediate
2945    /// subdirectories. It's used to list data types, backtest runs, and live runs.
2946    ///
2947    /// # Parameters
2948    ///
2949    /// - `subdirectory`: The subdirectory path to scan (e.g., "data", "backtest", "live").
2950    ///
2951    /// # Returns
2952    ///
2953    /// Returns a vector of directory names (stems) found in the subdirectory,
2954    /// or an error if the operation fails.
2955    ///
2956    /// # Errors
2957    ///
2958    /// Returns an error if:
2959    /// - Object store listing operations fail.
2960    /// - Directory access is denied.
2961    ///
2962    /// # Examples
2963    ///
2964    /// ```rust,no_run
2965    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
2966    ///
2967    /// let catalog = ParquetDataCatalog::new(/* ... */);
2968    ///
2969    /// // List all data types
2970    /// let data_types = catalog.list_directory_stems("data")?;
2971    /// for data_type in data_types {
2972    ///     println!("Found data type: {}", data_type);
2973    /// }
2974    /// # Ok::<(), anyhow::Error>(())
2975    /// ```
2976    pub fn list_directory_stems(&self, subdirectory: &str) -> anyhow::Result<Vec<String>> {
2977        // For local filesystem paths, use filesystem operations to detect empty directories
2978        // For remote object stores, we can only list directories that contain files
2979        if !self.is_remote_uri() {
2980            let directory = PathBuf::from(self.native_base_path_string()).join(subdirectory);
2981
2982            // Check if directory exists
2983            if !directory.exists() {
2984                return Ok(Vec::new());
2985            }
2986
2987            // List all entries in the directory
2988            let mut directories = Vec::new();
2989
2990            if let Ok(entries) = std::fs::read_dir(&directory) {
2991                for entry in entries.flatten() {
2992                    if let Ok(file_type) = entry.file_type()
2993                        && file_type.is_dir()
2994                    {
2995                        // Use file_name() to get the directory name (not file_stem which removes extension)
2996                        if let Some(name) = entry.path().file_name() {
2997                            directories.push(name.to_string_lossy().to_string());
2998                        }
2999                    }
3000                }
3001            }
3002            directories.sort();
3003            return Ok(directories);
3004        }
3005
3006        // For remote URIs, use object store listing (only lists directories with files)
3007        let directory = make_object_store_path(&self.base_path, &[subdirectory]);
3008
3009        let list_result = self.execute_async(async {
3010            let prefix = ObjectPath::from(format!("{directory}/"));
3011            let mut stream = self.object_store.list(Some(&prefix));
3012            let mut directories = Vec::new();
3013            let mut seen_dirs = std::collections::HashSet::new();
3014
3015            while let Some(object) = stream.next().await {
3016                let object = object?;
3017                let path_str = object.location.to_string();
3018
3019                // Extract the immediate subdirectory name
3020                if let Some(relative_path) = path_str.strip_prefix(&format!("{directory}/")) {
3021                    let parts: Vec<&str> = relative_path.split('/').collect();
3022                    if let Some(first_part) = parts.first()
3023                        && !first_part.is_empty()
3024                        && !seen_dirs.contains(*first_part)
3025                    {
3026                        seen_dirs.insert(first_part.to_string());
3027                        directories.push(first_part.to_string());
3028                    }
3029                }
3030            }
3031
3032            Ok::<Vec<String>, anyhow::Error>(directories)
3033        })?;
3034
3035        Ok(list_result)
3036    }
3037
3038    /// Lists all data types available in the catalog.
3039    ///
3040    /// This method returns the names of all data type directories in the catalog.
3041    /// Data types correspond to different kinds of market data (e.g., "quotes", "trades", "bars").
3042    ///
3043    /// # Returns
3044    ///
3045    /// Returns a vector of data type names, or an error if the operation fails.
3046    ///
3047    /// # Errors
3048    ///
3049    /// Returns an error if:
3050    /// - Object store listing operations fail.
3051    /// - Directory access is denied.
3052    ///
3053    /// # Examples
3054    ///
3055    /// ```rust,no_run
3056    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3057    ///
3058    /// let catalog = ParquetDataCatalog::new(/* ... */);
3059    ///
3060    /// // List all data types
3061    /// let data_types = catalog.list_data_types()?;
3062    /// for data_type in data_types {
3063    ///     println!("Available data type: {}", data_type);
3064    /// }
3065    /// # Ok::<(), anyhow::Error>(())
3066    /// ```
3067    ///
3068    pub fn list_data_types(&self) -> anyhow::Result<Vec<String>> {
        // Delegates to `list_directory_stems` for the "data" subdirectory and returns its
        // result unchanged.
3069        self.list_directory_stems("data")
3070    }
3071
3072    /// Data types that are not persisted by the Rust feather writer or catalog.
3073    fn is_excluded_stream_data_type(_name: &str) -> bool {
3074        false
3075    }
3076
3077    /// Lists all backtest run IDs available in the catalog.
3078    ///
3079    /// This method returns the names of all backtest run directories in the catalog.
3080    /// Each backtest run corresponds to a specific backtest execution instance.
3081    ///
3082    /// # Returns
3083    ///
3084    /// Returns a vector of backtest run IDs, or an error if the operation fails.
3085    ///
3086    /// # Errors
3087    ///
3088    /// Returns an error if:
3089    /// - Object store listing operations fail.
3090    /// - Directory access is denied.
3091    ///
3092    /// # Examples
3093    ///
3094    /// ```rust,no_run
3095    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3096    ///
3097    /// let catalog = ParquetDataCatalog::new(/* ... */);
3098    ///
3099    /// // List all backtest runs
3100    /// let runs = catalog.list_backtest_runs()?;
3101    /// for run_id in runs {
3102    ///     println!("Backtest run: {}", run_id);
3103    /// }
3104    /// # Ok::<(), anyhow::Error>(())
3105    /// ```
3106    pub fn list_backtest_runs(&self) -> anyhow::Result<Vec<String>> {
        // Delegates to `list_directory_stems` for the "backtest" subdirectory and returns
        // its result unchanged.
3107        self.list_directory_stems("backtest")
3108    }
3109
3110    /// Lists all live run IDs available in the catalog.
3111    ///
3112    /// This method returns the names of all live run directories in the catalog.
3113    /// Each live run corresponds to a specific live trading execution instance.
3114    ///
3115    /// # Returns
3116    ///
3117    /// Returns a vector of live run IDs, or an error if the operation fails.
3118    ///
3119    /// # Errors
3120    ///
3121    /// Returns an error if:
3122    /// - Object store listing operations fail.
3123    /// - Directory access is denied.
3124    ///
3125    /// # Examples
3126    ///
3127    /// ```rust,no_run
3128    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3129    ///
3130    /// let catalog = ParquetDataCatalog::new(/* ... */);
3131    ///
3132    /// // List all live runs
3133    /// let runs = catalog.list_live_runs()?;
3134    /// for run_id in runs {
3135    ///     println!("Live run: {}", run_id);
3136    /// }
3137    /// # Ok::<(), anyhow::Error>(())
3138    /// ```
3139    pub fn list_live_runs(&self) -> anyhow::Result<Vec<String>> {
        // Delegates to `list_directory_stems` for the "live" subdirectory and returns its
        // result unchanged.
3140        self.list_directory_stems("live")
3141    }
3142
3143    /// Reads data from a live run instance.
3144    ///
3145    /// This method reads all data associated with a specific live run instance
3146    /// from feather files stored in the catalog.
3147    ///
3148    /// # Parameters
3149    ///
3150    /// - `instance_id`: The ID of the live run instance to read.
3151    ///
3152    /// # Returns
3153    ///
3154    /// Returns a vector of `Data` objects from the live run, sorted by timestamp,
3155    /// or an error if the operation fails.
3156    ///
3157    /// # Errors
3158    ///
3159    /// Returns an error if:
3160    /// - Listing the instance directory in the object store fails.
3161    /// - Feather file reading fails.
3162    /// - Data deserialization fails.
3163    ///
3164    /// # Note
3165    ///
3166    /// Delegates to `read_run_data` with the "live" subdirectory. For local paths an
3167    /// instance directory that does not exist yields an empty vector, not an error.
3168    ///
3169    /// # Examples
3170    ///
3171    /// ```rust,no_run
3172    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3173    ///
3174    /// let catalog = ParquetDataCatalog::new(/* ... */);
3175    ///
3176    /// // Read data from a live run
3177    /// let data = catalog.read_live_run("instance-123")?;
3178    /// for item in data {
3179    ///     println!("Data: {:?}", item);
3180    /// }
3181    /// # Ok::<(), anyhow::Error>(())
3182    /// ```
3183    pub fn read_live_run(&self, instance_id: &str) -> anyhow::Result<Vec<Data>> {
        // All of the work happens in the shared run reader; only the subdirectory differs
        // from `read_backtest`.
3184        self.read_run_data("live", instance_id)
3185    }
3186
3187    /// Reads data from a backtest run instance.
3188    ///
3189    /// This method reads all data associated with a specific backtest run instance
3190    /// from feather files stored in the catalog.
3191    ///
3192    /// # Parameters
3193    ///
3194    /// - `instance_id`: The ID of the backtest run instance to read.
3195    ///
3196    /// # Returns
3197    ///
3198    /// Returns a vector of `Data` objects from the backtest run, sorted by timestamp,
3199    /// or an error if the operation fails.
3200    ///
3201    /// # Errors
3202    ///
3203    /// Returns an error if:
3204    /// - Listing the instance directory in the object store fails.
3205    /// - Feather file reading fails.
3206    /// - Data deserialization fails.
3207    ///
    /// For local paths an instance directory that does not exist yields an empty vector,
    /// not an error.
    ///
3208    /// # Examples
3209    ///
3210    /// ```rust,no_run
3211    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3212    ///
3213    /// let catalog = ParquetDataCatalog::new(/* ... */);
3214    ///
3215    /// // Read data from a backtest run
3216    /// let data = catalog.read_backtest("instance-123")?;
3217    /// for item in data {
3218    ///     println!("Data: {:?}", item);
3219    /// }
3220    /// # Ok::<(), anyhow::Error>(())
3221    /// ```
3222    pub fn read_backtest(&self, instance_id: &str) -> anyhow::Result<Vec<Data>> {
        // All of the work happens in the shared run reader; only the subdirectory differs
        // from `read_live_run`.
3223        self.read_run_data("backtest", instance_id)
3224    }
3225
3226    /// Helper function to read data from a run instance (backtest or live).
3227    ///
3228    /// This function reads all data associated with a specific run instance
3229    /// from feather files stored in the catalog.
3230    ///
3231    /// # Parameters
3232    ///
3233    /// - `subdirectory`: The subdirectory name ("backtest" or "live").
3234    /// - `instance_id`: The ID of the run instance to read.
3235    ///
3236    /// # Returns
3237    ///
3238    /// Returns a vector of `Data` objects from the run, sorted by timestamp,
3239    /// or an error if the operation fails.
3240    fn read_run_data(&self, subdirectory: &str, instance_id: &str) -> anyhow::Result<Vec<Data>> {
3241        // List all data types in the instance directory
3242        let instance_dir = make_object_store_path(&self.base_path, &[subdirectory, instance_id]);
3243
3244        // List directories under the instance directory
3245        let data_types = if self.is_remote_uri() {
3246            // For remote URIs, use object store listing
3247
3248            self.execute_async(async {
3249                let prefix = ObjectPath::from(format!("{instance_dir}/"));
3250                let mut stream = self.object_store.list(Some(&prefix));
3251                let mut directories = Vec::new();
3252                let mut seen_dirs = std::collections::HashSet::new();
3253
3254                while let Some(object) = stream.next().await {
3255                    let object = object?;
3256                    let path_str = object.location.to_string();
3257
3258                    // Extract the immediate subdirectory name
3259                    if let Some(relative_path) = path_str.strip_prefix(&format!("{instance_dir}/"))
3260                    {
3261                        let parts: Vec<&str> = relative_path.split('/').collect();
3262                        if let Some(first_part) = parts.first()
3263                            && !first_part.is_empty()
3264                            && !seen_dirs.contains(*first_part)
3265                        {
3266                            seen_dirs.insert(first_part.to_string());
3267                            directories.push(first_part.to_string());
3268                        }
3269                    }
3270                }
3271
3272                Ok::<Vec<String>, anyhow::Error>(directories)
3273            })?
3274        } else {
3275            // For local filesystem paths
3276            let directory = PathBuf::from(self.native_base_path_string())
3277                .join(subdirectory)
3278                .join(instance_id);
3279
3280            if !directory.exists() {
3281                return Ok(Vec::new());
3282            }
3283
3284            let mut directories = Vec::new();
3285
3286            if let Ok(entries) = std::fs::read_dir(&directory) {
3287                for entry in entries.flatten() {
3288                    if let Ok(file_type) = entry.file_type()
3289                        && file_type.is_dir()
3290                        && let Some(name) = entry.path().file_name()
3291                    {
3292                        directories.push(name.to_string_lossy().to_string());
3293                    }
3294                }
3295            }
3296            directories.sort();
3297            directories
3298        };
3299
3300        if data_types.is_empty() {
3301            // No data types found - return empty vector
3302            return Ok(Vec::new());
3303        }
3304
3305        let mut all_data: Vec<Data> = Vec::new();
3306
3307        // Process each persisted data type.
3308        for data_cls in data_types
3309            .into_iter()
3310            .filter(|s| !Self::is_excluded_stream_data_type(s))
3311        {
3312            // List all feather files for this data type
3313            let feather_files = self.list_feather_files(
3314                subdirectory,
3315                instance_id,
3316                &data_cls,
3317                None, // No identifier filtering - read all
3318            )?;
3319
3320            if feather_files.is_empty() {
3321                continue; // Skip if no files found
3322            }
3323
3324            // Process each feather file
3325            for file_path in feather_files {
3326                // Read the feather file (may contain multiple batches)
3327                let batches = self.read_feather_file(&file_path)?;
3328
3329                if batches.is_empty() {
3330                    continue; // Skip empty or invalid files
3331                }
3332
3333                // Convert RecordBatches to Data objects based on data_cls
3334                let file_data: Vec<Data> = match data_cls.as_str() {
3335                    "quotes" => {
3336                        let quotes: Vec<QuoteTick> =
3337                            self.convert_record_batches_to_data(batches, false)?;
3338                        quotes.into_iter().map(Data::from).collect()
3339                    }
3340                    "trades" => {
3341                        let trades: Vec<TradeTick> =
3342                            self.convert_record_batches_to_data(batches, false)?;
3343                        trades.into_iter().map(Data::from).collect()
3344                    }
3345                    "order_book_deltas" => {
3346                        let deltas: Vec<OrderBookDelta> =
3347                            self.convert_record_batches_to_data(batches, false)?;
3348                        deltas.into_iter().map(Data::from).collect()
3349                    }
3350                    "order_book_depths" => {
3351                        let depths: Vec<OrderBookDepth10> =
3352                            self.convert_record_batches_to_data(batches, false)?;
3353                        depths.into_iter().map(Data::from).collect()
3354                    }
3355                    "bars" => {
3356                        let bars: Vec<Bar> = self.convert_record_batches_to_data(batches, false)?;
3357                        bars.into_iter().map(Data::from).collect()
3358                    }
3359                    "index_prices" => {
3360                        let prices: Vec<IndexPriceUpdate> =
3361                            self.convert_record_batches_to_data(batches, false)?;
3362                        prices.into_iter().map(Data::from).collect()
3363                    }
3364                    "mark_prices" => {
3365                        let prices: Vec<MarkPriceUpdate> =
3366                            self.convert_record_batches_to_data(batches, false)?;
3367                        prices.into_iter().map(Data::from).collect()
3368                    }
3369                    "instrument_status" => {
3370                        let statuses: Vec<InstrumentStatus> =
3371                            self.convert_record_batches_to_data(batches, false)?;
3372                        statuses.into_iter().map(Data::from).collect()
3373                    }
3374                    "instrument_closes" => {
3375                        let closes: Vec<InstrumentClose> =
3376                            self.convert_record_batches_to_data(batches, false)?;
3377                        closes.into_iter().map(Data::from).collect()
3378                    }
3379                    _ => {
3380                        if data_cls.starts_with("custom/") {
3381                            self.decode_custom_batches_to_data(batches, false)?
3382                        } else {
3383                            // Unknown data type - skip it
3384                            continue;
3385                        }
3386                    }
3387                };
3388
3389                all_data.extend(file_data);
3390            }
3391        }
3392
3393        // Sort all data by timestamp (ts_init)
3394        all_data.sort_by(|a, b| {
3395            let ts_a = a.ts_init();
3396            let ts_b = b.ts_init();
3397            ts_a.cmp(&ts_b)
3398        });
3399
3400        Ok(all_data)
3401    }
3402
3403    /// Decodes multiple record batches of custom data (data_cls starts with "custom/") into a single
3404    /// `Vec<Data>`. Optionally replaces `ts_init` column with `ts_event` before decoding.
3405    ///
3406    /// # Errors
3407    ///
3408    /// Returns an error if any batch fails to decode.
3409    fn decode_custom_batches_to_data(
3410        &self,
3411        batches: Vec<RecordBatch>,
3412        use_ts_event_for_ts_init: bool,
3413    ) -> anyhow::Result<Vec<Data>> {
        // Forwards both arguments unchanged to `orchestration_decode_custom_batches_to_data`;
        // `self` is not used here.
3414        orchestration_decode_custom_batches_to_data(batches, use_ts_event_for_ts_init)
3415    }
3416
3417    /// Decodes a RecordBatch to Data objects based on metadata.
3418    ///
3419    /// This method determines the data type from metadata and decodes the batch accordingly.
3420    /// It supports both standard data types and custom data types when `allow_custom_fallback`
3421    /// is true (e.g. when called from `decode_custom_batches_to_data` for files under
3422    /// `custom/`). When false, unknown type names produce an error instead of attempting
3423    /// custom decode, so malformed or typo'd built-in metadata fails explicitly.
3424    ///
3425    /// # Parameters
3426    ///
3427    /// - `metadata`: Schema metadata containing type information.
3428    /// - `batch`: The RecordBatch to decode.
3429    /// - `allow_custom_fallback`: If true, unknown type_name is decoded via custom data
3430    ///   registry; if false, unknown type_name returns an error.
3431    ///
3432    /// # Returns
3433    ///
3434    /// Returns a vector of Data enum variants.
3435    ///
3436    /// # Errors
3437    ///
3438    /// Returns an error if decoding fails or the type is unknown (and custom fallback not allowed).
3439    #[allow(dead_code)] // used by tests
3440    fn decode_batch_to_data(
3441        &self,
3442        metadata: &std::collections::HashMap<String, String>,
3443        batch: RecordBatch,
3444        allow_custom_fallback: bool,
3445    ) -> anyhow::Result<Vec<Data>> {
        // Forwards all three arguments unchanged to `orchestration_decode_batch_to_data`;
        // `self` is not used here.
3446        orchestration_decode_batch_to_data(metadata, batch, allow_custom_fallback)
3447    }
3448
3449    /// Converts stream data from feather files to parquet files.
3450    ///
3451    /// This method reads data from feather files generated during a backtest or live run
3452    /// and writes it to the catalog in parquet format. It's useful for converting temporary
3453    /// stream data into a more permanent and queryable format.
3454    ///
3455    /// # Parameters
3456    ///
3457    /// - `instance_id`: The ID of the backtest or live run instance.
3458    /// - `data_cls`: The data class name (e.g., "quotes", "trades", "bars").
3459    /// - `subdirectory`: The subdirectory containing the feather files. Either "backtest" or "live" (default: "backtest").
3460    /// - `identifiers`: Optional list of identifiers to filter by (instrument IDs or bar types).
3461    /// - `use_ts_event_for_ts_init`: If true, replaces the `ts_init` column with `ts_event` column values before deserializing.
3462    ///
3463    /// # Returns
3464    ///
3465    /// Returns `Ok(())` on success, or an error if the operation fails.
3466    ///
3467    /// # Errors
3468    ///
3469    /// Returns an error if:
3470    /// - The instance ID doesn't exist.
3471    /// - Feather file listing fails.
3472    /// - Feather file reading fails.
3473    /// - Data deserialization fails.
3474    /// - Writing to parquet fails.
3475    ///
3476    /// # Note
3477    ///
3478    /// The conversion is carried out for each feather file in four steps:
3479    /// - Listing feather files in the specified subdirectory
3480    /// - Reading feather files (Arrow IPC stream reading)
3481    /// - Converting Arrow tables to Nautilus data objects
3482    /// - Writing data to the catalog using existing write methods
3483    ///
3484    /// # Examples
3485    ///
3486    /// ```rust,no_run
3487    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
3488    ///
3489    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
3490    ///
3491    /// // Convert backtest stream data to parquet
3492    /// catalog.convert_stream_to_data(
3493    ///     "instance-123",
3494    ///     "quotes",
3495    ///     Some("backtest"),
3496    ///     None,
3497    ///     false
3498    /// )?;
3499    /// # Ok::<(), anyhow::Error>(())
3500    /// ```
3501    /// Lists feather files for a specific data class in a subdirectory.
3502    ///
3503    /// This helper function finds all `.feather` files in the specified subdirectory
3504    /// (backtest or live) for the given instance ID and data class.
3505    fn list_feather_files(
3506        &self,
3507        subdirectory: &str,
3508        instance_id: &str,
3509        data_name: &str,
3510        identifiers: Option<&[String]>,
3511    ) -> anyhow::Result<Vec<String>> {
3512        // Construct the base directory path: {subdirectory}/{instance_id}/{data_name}
3513        let base_dir = make_object_store_path(&self.base_path, &[subdirectory, instance_id]);
3514        let data_dir = make_object_store_path(&base_dir, &[data_name]);
3515
3516        let mut files = Vec::new();
3517
3518        // Try to list files in the data directory (for per-instrument subdirectories)
3519        let subdir_prefix = ObjectPath::from(format!("{data_dir}/"));
3520        let list_result = self.execute_async(async {
3521            let mut stream = self.object_store.list(Some(&subdir_prefix));
3522            let mut subdirs = Vec::new();
3523            let mut flat_files = Vec::new();
3524
3525            while let Some(object) = stream.next().await {
3526                let object = object?;
3527                let path_str = object.location.to_string();
3528
3529                // Check if this is a subdirectory (per-instrument) or a flat file
3530                if let Some(relative_path) = path_str.strip_prefix(&format!("{data_dir}/")) {
3531                    if relative_path.ends_with(".feather") {
3532                        // Flat file format: {data_name}_*.feather
3533                        if path_str.contains(&format!("{data_name}_")) {
3534                            flat_files.push(path_str);
3535                        }
3536                    } else {
3537                        // This might be a subdirectory - check if it contains feather files
3538                        let subdir_path = format!("{path_str}/");
3539                        let mut subdir_stream = self
3540                            .object_store
3541                            .list(Some(&ObjectPath::from(subdir_path.as_str())));
3542
3543                        while let Some(subdir_object) = subdir_stream.next().await {
3544                            let subdir_object = subdir_object?;
3545                            let subdir_file_path = subdir_object.location.to_string();
3546
3547                            if subdir_file_path.ends_with(".feather") {
3548                                // Check identifier filter if provided
3549                                if let Some(identifiers) = identifiers {
3550                                    let subdir_name = relative_path.split('/').next().unwrap_or("");
3551                                    if !identifiers.iter().any(|id| subdir_name.contains(id)) {
3552                                        continue;
3553                                    }
3554                                }
3555                                subdirs.push(subdir_file_path);
3556                            }
3557                        }
3558                    }
3559                }
3560            }
3561
3562            Ok::<Vec<String>, anyhow::Error>([subdirs, flat_files].concat())
3563        })?;
3564
3565        files.extend(list_result);
3566        files.sort();
3567        Ok(files)
3568    }
3569
3570    /// Reads a feather file and returns all RecordBatches.
3571    ///
3572    /// This function reads an Arrow IPC stream file from the object store
3573    /// and returns all RecordBatches contained within it.
3574    fn read_feather_file(&self, file_path: &str) -> anyhow::Result<Vec<RecordBatch>> {
3575        use datafusion::arrow::ipc::reader::StreamReader;
3576
3577        let bytes = self.execute_async(async {
3578            let path = ObjectPath::from(file_path);
3579            let result = self.object_store.get(&path).await?;
3580            let bytes = result.bytes().await?;
3581            Ok::<_, anyhow::Error>(bytes)
3582        })?;
3583
3584        if bytes.is_empty() {
3585            return Ok(Vec::new());
3586        }
3587
3588        // Read the Arrow IPC stream
3589        let cursor = Cursor::new(bytes.as_ref());
3590        let reader = StreamReader::try_new(cursor, None)
3591            .map_err(|e| anyhow::anyhow!("Failed to create StreamReader: {e}"))?;
3592
3593        // Read all batches
3594        let mut batches = Vec::new();
3595
3596        for batch_result in reader {
3597            let batch = batch_result.map_err(|e| anyhow::anyhow!("Failed to read batch: {e}"))?;
3598            batches.push(batch);
3599        }
3600
3601        Ok(batches)
3602    }
3603
3604    /// Converts RecordBatches to Data objects, optionally replacing ts_init with ts_event.
    ///
    /// Thin wrapper over `convert_record_batches_to_data_with_bar_type_conversion` that
    /// always passes `false` for the bar type conversion flag.
3605    fn convert_record_batches_to_data<T>(
3606        &self,
3607        batches: Vec<RecordBatch>,
3608        use_ts_event_for_ts_init: bool,
3609    ) -> anyhow::Result<Vec<T>>
3610    where
3611        T: DecodeDataFromRecordBatch + TryFrom<Data>,
3612    {
3613        self.convert_record_batches_to_data_with_bar_type_conversion(
3614            batches,
3615            use_ts_event_for_ts_init,
            // convert_bar_type_to_external: the `bar_type` metadata is left as stored
3616            false,
3617        )
3618    }
3619
3620    /// Converts RecordBatches to Data objects with optional transforms for stream conversion.
3621    fn convert_record_batches_to_data_with_bar_type_conversion<T>(
3622        &self,
3623        batches: Vec<RecordBatch>,
3624        use_ts_event_for_ts_init: bool,
3625        convert_bar_type_to_external: bool,
3626    ) -> anyhow::Result<Vec<T>>
3627    where
3628        T: DecodeDataFromRecordBatch + TryFrom<Data>,
3629    {
3630        if batches.is_empty() {
3631            return Ok(Vec::new());
3632        }
3633
3634        // Get schema and metadata from first batch
3635        let schema = batches[0].schema();
3636        let mut metadata = schema.metadata().clone();
3637
3638        // Convert bar_type from INTERNAL to EXTERNAL if requested
3639        if convert_bar_type_to_external
3640            && let Some(bar_type_str) = metadata.get("bar_type").cloned()
3641            && bar_type_str.ends_with("-INTERNAL")
3642        {
3643            let external = bar_type_str.replace("-INTERNAL", "-EXTERNAL");
3644            metadata.insert("bar_type".to_string(), external);
3645        }
3646
3647        // Process each batch
3648        let mut all_data = Vec::new();
3649
3650        for mut batch in batches {
3651            // Handle ts_event/ts_init replacement if requested
3652            if use_ts_event_for_ts_init {
3653                let column_names: Vec<String> =
3654                    schema.fields().iter().map(|f| f.name().clone()).collect();
3655
3656                let ts_event_idx = column_names
3657                    .iter()
3658                    .position(|n| n == "ts_event")
3659                    .ok_or_else(|| anyhow::anyhow!("ts_event column not found"))?;
3660                let ts_init_idx = column_names
3661                    .iter()
3662                    .position(|n| n == "ts_init")
3663                    .ok_or_else(|| anyhow::anyhow!("ts_init column not found"))?;
3664
3665                // Create new arrays with ts_init replaced by ts_event
3666                let mut new_columns = batch.columns().to_vec();
3667                new_columns[ts_init_idx] = new_columns[ts_event_idx].clone();
3668
3669                // Create new batch with updated columns
3670                batch = RecordBatch::try_new(schema.clone(), new_columns)
3671                    .map_err(|e| anyhow::anyhow!("Failed to create new batch: {e}"))?;
3672            }
3673
3674            // Decode the batch to Data objects
3675            let data_vec = T::decode_data_batch(&metadata, batch)
3676                .map_err(|e| anyhow::anyhow!("Failed to decode batch: {e}"))?;
3677
3678            all_data.extend(data_vec);
3679        }
3680
3681        // Convert Data enum to specific type T
3682        Ok(to_variant::<T>(all_data))
3683    }
3684
3685    /// Converts RecordBatches directly to strongly typed values.
3686    fn convert_record_batches_to_typed<T>(
3687        &self,
3688        batches: Vec<RecordBatch>,
3689    ) -> anyhow::Result<Vec<T>>
3690    where
3691        T: DecodeTypedFromRecordBatch,
3692    {
3693        if batches.is_empty() {
3694            return Ok(Vec::new());
3695        }
3696
3697        let mut all_data = Vec::new();
3698
3699        for batch in batches {
3700            let metadata = batch.schema().metadata().clone();
3701            let decoded = T::decode_typed_batch(&metadata, batch)
3702                .map_err(|e| anyhow::anyhow!("Failed to decode batch: {e}"))?;
3703            all_data.extend(decoded);
3704        }
3705
3706        Ok(all_data)
3707    }
3708
3709    fn write_typed_batches<T>(&self, batches: Vec<RecordBatch>) -> anyhow::Result<()>
3710    where
3711        T: DecodeTypedFromRecordBatch + EncodeToRecordBatch + CatalogPathPrefix + HasTsInit,
3712    {
3713        let mut data: Vec<T> = self.convert_record_batches_to_typed(batches)?;
3714
3715        if !is_monotonically_increasing_by_init(&data) {
3716            data.sort_by_key(|item| item.ts_init());
3717        }
3718
3719        self.write_to_parquet(data, None, None, None)?;
3720        Ok(())
3721    }
3722
3723    pub fn convert_stream_to_data(
3724        &mut self,
3725        instance_id: &str,
3726        data_cls: &str,
3727        subdirectory: Option<&str>,
3728        identifiers: Option<&[String]>,
3729        use_ts_event_for_ts_init: bool,
3730    ) -> anyhow::Result<()> {
3731        let subdirectory = subdirectory.unwrap_or("backtest");
3732
3733        // Skip unsupported stream data types without error.
3734        if Self::is_excluded_stream_data_type(data_cls) {
3735            return Ok(());
3736        }
3737
3738        // Convert data class name to filename (e.g., "quotes" -> "quotes")
3739        // The data_cls should already be in the correct format (snake_case)
3740        let data_name = to_snake_case(data_cls);
3741
3742        // List all feather files for this data class
3743        let feather_files =
3744            self.list_feather_files(subdirectory, instance_id, &data_name, identifiers)?;
3745
3746        if feather_files.is_empty() {
3747            return Ok(());
3748        }
3749
3750        // Process each feather file independently so that each file's identifier
3751        // (instrument_id or bar_type from schema metadata) is preserved when writing
3752        // to parquet. This matches the Python _convert_feather_table_to_parquet approach.
3753        let convert_bar_type = data_cls == "bars";
3754
3755        for file_path in feather_files {
3756            let batches = self.read_feather_file(&file_path)?;
3757
3758            if batches.is_empty() {
3759                continue;
3760            }
3761
3762            match data_cls {
3763                "quotes" => {
3764                    let mut data: Vec<QuoteTick> =
3765                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3766
3767                    if !is_monotonically_increasing_by_init(&data) {
3768                        data.sort_by_key(|d| d.ts_init);
3769                    }
3770                    self.write_to_parquet(data, None, None, None)?;
3771                }
3772                "trades" => {
3773                    let mut data: Vec<TradeTick> =
3774                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3775
3776                    if !is_monotonically_increasing_by_init(&data) {
3777                        data.sort_by_key(|d| d.ts_init);
3778                    }
3779                    self.write_to_parquet(data, None, None, None)?;
3780                }
3781                "order_book_deltas" => {
3782                    let mut data: Vec<OrderBookDelta> =
3783                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3784
3785                    if !is_monotonically_increasing_by_init(&data) {
3786                        data.sort_by_key(|d| d.ts_init);
3787                    }
3788                    self.write_to_parquet(data, None, None, None)?;
3789                }
3790                "order_book_depths" => {
3791                    let mut data: Vec<OrderBookDepth10> =
3792                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3793
3794                    if !is_monotonically_increasing_by_init(&data) {
3795                        data.sort_by_key(|d| d.ts_init);
3796                    }
3797                    self.write_to_parquet(data, None, None, None)?;
3798                }
3799                "bars" => {
3800                    let mut data: Vec<Bar> = self
3801                        .convert_record_batches_to_data_with_bar_type_conversion(
3802                            batches,
3803                            use_ts_event_for_ts_init,
3804                            convert_bar_type,
3805                        )?;
3806
3807                    if !is_monotonically_increasing_by_init(&data) {
3808                        data.sort_by_key(|d| d.ts_init);
3809                    }
3810                    self.write_to_parquet(data, None, None, None)?;
3811                }
3812                "index_prices" => {
3813                    let mut data: Vec<IndexPriceUpdate> =
3814                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3815
3816                    if !is_monotonically_increasing_by_init(&data) {
3817                        data.sort_by_key(|d| d.ts_init);
3818                    }
3819                    self.write_to_parquet(data, None, None, None)?;
3820                }
3821                "mark_prices" => {
3822                    let mut data: Vec<MarkPriceUpdate> =
3823                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3824
3825                    if !is_monotonically_increasing_by_init(&data) {
3826                        data.sort_by_key(|d| d.ts_init);
3827                    }
3828                    self.write_to_parquet(data, None, None, None)?;
3829                }
3830                "instrument_status" => {
3831                    self.write_typed_batches::<InstrumentStatus>(batches)?;
3832                }
3833                "instrument_closes" => {
3834                    let mut data: Vec<InstrumentClose> =
3835                        self.convert_record_batches_to_data(batches, use_ts_event_for_ts_init)?;
3836
3837                    if !is_monotonically_increasing_by_init(&data) {
3838                        data.sort_by_key(|d| d.ts_init);
3839                    }
3840                    self.write_to_parquet(data, None, None, None)?;
3841                }
3842                "funding_rate_update" => {
3843                    self.write_typed_batches::<FundingRateUpdate>(batches)?;
3844                }
3845                "account_state" => {
3846                    self.write_typed_batches::<AccountState>(batches)?;
3847                }
3848                "order_initialized" => {
3849                    self.write_typed_batches::<OrderInitialized>(batches)?;
3850                }
3851                "order_denied" => {
3852                    self.write_typed_batches::<OrderDenied>(batches)?;
3853                }
3854                "order_emulated" => {
3855                    self.write_typed_batches::<OrderEmulated>(batches)?;
3856                }
3857                "order_submitted" => {
3858                    self.write_typed_batches::<OrderSubmitted>(batches)?;
3859                }
3860                "order_accepted" => {
3861                    self.write_typed_batches::<OrderAccepted>(batches)?;
3862                }
3863                "order_rejected" => {
3864                    self.write_typed_batches::<OrderRejected>(batches)?;
3865                }
3866                "order_pending_cancel" => {
3867                    self.write_typed_batches::<OrderPendingCancel>(batches)?;
3868                }
3869                "order_canceled" => {
3870                    self.write_typed_batches::<OrderCanceled>(batches)?;
3871                }
3872                "order_cancel_rejected" => {
3873                    self.write_typed_batches::<OrderCancelRejected>(batches)?;
3874                }
3875                "order_expired" => {
3876                    self.write_typed_batches::<OrderExpired>(batches)?;
3877                }
3878                "order_triggered" => {
3879                    self.write_typed_batches::<OrderTriggered>(batches)?;
3880                }
3881                "order_pending_update" => {
3882                    self.write_typed_batches::<OrderPendingUpdate>(batches)?;
3883                }
3884                "order_released" => {
3885                    self.write_typed_batches::<OrderReleased>(batches)?;
3886                }
3887                "order_modify_rejected" => {
3888                    self.write_typed_batches::<OrderModifyRejected>(batches)?;
3889                }
3890                "order_updated" => {
3891                    self.write_typed_batches::<OrderUpdated>(batches)?;
3892                }
3893                "order_filled" => {
3894                    self.write_typed_batches::<OrderFilled>(batches)?;
3895                }
3896                "position_opened" => {
3897                    self.write_typed_batches::<PositionOpened>(batches)?;
3898                }
3899                "position_changed" => {
3900                    self.write_typed_batches::<PositionChanged>(batches)?;
3901                }
3902                "position_closed" => {
3903                    self.write_typed_batches::<PositionClosed>(batches)?;
3904                }
3905                "position_adjusted" => {
3906                    self.write_typed_batches::<PositionAdjusted>(batches)?;
3907                }
3908                "order_snapshot" => {
3909                    self.write_typed_batches::<OrderSnapshot>(batches)?;
3910                }
3911                "position_snapshot" => {
3912                    self.write_typed_batches::<PositionSnapshot>(batches)?;
3913                }
3914                "order_status_report" => {
3915                    self.write_typed_batches::<OrderStatusReport>(batches)?;
3916                }
3917                "fill_report" => {
3918                    self.write_typed_batches::<FillReport>(batches)?;
3919                }
3920                "position_status_report" => {
3921                    self.write_typed_batches::<PositionStatusReport>(batches)?;
3922                }
3923                "execution_mass_status" => {
3924                    self.write_typed_batches::<ExecutionMassStatus>(batches)?;
3925                }
3926                _ => {
3927                    if data_cls.starts_with("custom/") {
3928                        let data =
3929                            self.decode_custom_batches_to_data(batches, use_ts_event_for_ts_init)?;
3930                        let custom_items: Vec<CustomData> = data
3931                            .into_iter()
3932                            .filter_map(|d| match d {
3933                                Data::Custom(c) => Some(c),
3934                                _ => None,
3935                            })
3936                            .collect();
3937
3938                        if !custom_items.is_empty() {
3939                            self.write_custom_data_batch(custom_items, None, None, None)?;
3940                        }
3941                    } else {
3942                        anyhow::bail!("Unknown data class: {data_cls}");
3943                    }
3944                }
3945            }
3946        }
3947
3948        Ok(())
3949    }
3950}
3951
/// Trait for providing catalog path prefixes for different data types.
///
/// This trait enables type-safe organization of data within the catalog by providing
/// a standardized way to determine the directory structure for each data type.
/// Each data type maps to a specific subdirectory within the catalog's data folder.
///
/// The prefix is exposed through an associated function (no `self` receiver), so it is
/// a property of the type itself and can be queried without constructing a value.
///
/// # Implementation
///
/// Types implementing this trait should return a static string that represents
/// the directory name where data of that type should be stored.
///
/// # Examples
///
/// ```rust
/// use nautilus_persistence::backend::catalog::CatalogPathPrefix;
/// use nautilus_model::data::QuoteTick;
///
/// assert_eq!(QuoteTick::path_prefix(), "quotes");
/// ```
pub trait CatalogPathPrefix {
    /// Returns the path prefix (directory name) for this data type.
    ///
    /// # Returns
    ///
    /// A static string representing the directory name where this data type is stored.
    fn path_prefix() -> &'static str;
}
3979
/// Generates a [`CatalogPathPrefix`] implementation for a single data type.
///
/// The expansion is one `impl` block whose `path_prefix` function evaluates to the
/// supplied prefix expression, which keeps the per-type registrations below down to
/// one line each.
///
/// # Parameters
///
/// - `$data_type`: The type that receives the trait implementation.
/// - `$prefix`: Expression yielding the `&'static str` path prefix for that type.
macro_rules! impl_catalog_path_prefix {
    ($data_type:ty, $prefix:expr) => {
        impl CatalogPathPrefix for $data_type {
            fn path_prefix() -> &'static str {
                $prefix
            }
        }
    };
}
3998
// Standard implementations for financial data types.
// Each line registers one type together with the directory name that its
// `CatalogPathPrefix::path_prefix` implementation returns.
impl_catalog_path_prefix!(QuoteTick, "quotes");
impl_catalog_path_prefix!(TradeTick, "trades");
impl_catalog_path_prefix!(OrderBookDelta, "order_book_deltas");
impl_catalog_path_prefix!(OrderBookDepth10, "order_book_depths");
impl_catalog_path_prefix!(Bar, "bars");
impl_catalog_path_prefix!(IndexPriceUpdate, "index_prices");
impl_catalog_path_prefix!(MarkPriceUpdate, "mark_prices");
impl_catalog_path_prefix!(FundingRateUpdate, "funding_rate_update");
impl_catalog_path_prefix!(InstrumentStatus, "instrument_status");
impl_catalog_path_prefix!(InstrumentClose, "instrument_closes");
impl_catalog_path_prefix!(InstrumentAny, "instruments");
impl_catalog_path_prefix!(AccountState, "account_state");
impl_catalog_path_prefix!(OrderInitialized, "order_initialized");
impl_catalog_path_prefix!(OrderDenied, "order_denied");
impl_catalog_path_prefix!(OrderEmulated, "order_emulated");
impl_catalog_path_prefix!(OrderSubmitted, "order_submitted");
impl_catalog_path_prefix!(OrderAccepted, "order_accepted");
impl_catalog_path_prefix!(OrderRejected, "order_rejected");
impl_catalog_path_prefix!(OrderPendingCancel, "order_pending_cancel");
impl_catalog_path_prefix!(OrderCanceled, "order_canceled");
impl_catalog_path_prefix!(OrderCancelRejected, "order_cancel_rejected");
impl_catalog_path_prefix!(OrderExpired, "order_expired");
impl_catalog_path_prefix!(OrderTriggered, "order_triggered");
impl_catalog_path_prefix!(OrderPendingUpdate, "order_pending_update");
impl_catalog_path_prefix!(OrderReleased, "order_released");
impl_catalog_path_prefix!(OrderModifyRejected, "order_modify_rejected");
impl_catalog_path_prefix!(OrderUpdated, "order_updated");
impl_catalog_path_prefix!(OrderFilled, "order_filled");
impl_catalog_path_prefix!(PositionOpened, "position_opened");
impl_catalog_path_prefix!(PositionChanged, "position_changed");
impl_catalog_path_prefix!(PositionClosed, "position_closed");
impl_catalog_path_prefix!(PositionAdjusted, "position_adjusted");
impl_catalog_path_prefix!(OrderSnapshot, "order_snapshot");
impl_catalog_path_prefix!(PositionSnapshot, "position_snapshot");
impl_catalog_path_prefix!(OrderStatusReport, "order_status_report");
impl_catalog_path_prefix!(FillReport, "fill_report");
impl_catalog_path_prefix!(PositionStatusReport, "position_status_report");
impl_catalog_path_prefix!(ExecutionMassStatus, "execution_mass_status");
4038
4039/// Converts timestamps to a filename using ISO 8601 format.
4040///
4041/// This function converts two Unix nanosecond timestamps to a filename that uses
4042/// ISO 8601 format with filesystem-safe characters. The format matches the Python
4043/// implementation for consistency.
4044///
4045/// # Parameters
4046///
4047/// - `timestamp_1`: First timestamp in Unix nanoseconds.
4048/// - `timestamp_2`: Second timestamp in Unix nanoseconds.
4049///
4050/// # Returns
4051///
4052/// Returns a filename string in the format: "`iso_timestamp_1_iso_timestamp_2.parquet`".
4053///
4054/// # Examples
4055///
4056/// ```rust
4057/// # use nautilus_persistence::backend::catalog::timestamps_to_filename;
4058/// # use nautilus_core::UnixNanos;
4059/// let filename = timestamps_to_filename(
4060///     UnixNanos::from(1609459200000000000),
4061///     UnixNanos::from(1609545600000000000)
4062/// );
4063/// // Returns something like: "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet"
4064/// ```
4065#[must_use]
4066pub fn timestamps_to_filename(timestamp_1: UnixNanos, timestamp_2: UnixNanos) -> String {
4067    let datetime_1 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_1));
4068    let datetime_2 = iso_timestamp_to_file_timestamp(&unix_nanos_to_iso8601(timestamp_2));
4069
4070    format!("{datetime_1}_{datetime_2}.parquet")
4071}
4072
/// Makes an ISO 8601 timestamp usable as part of a filename.
///
/// Every `:` and `.` in the input is replaced by `-`; all other characters are
/// copied through unchanged.
///
/// # Parameters
///
/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Returns
///
/// Returns a filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::iso_timestamp_to_file_timestamp;
/// let safe_timestamp = iso_timestamp_to_file_timestamp("2023-10-26T07:30:50.123456789Z");
/// assert_eq!(safe_timestamp, "2023-10-26T07-30-50-123456789Z");
/// ```
fn iso_timestamp_to_file_timestamp(iso_timestamp: &str) -> String {
    iso_timestamp
        .chars()
        .map(|ch| if matches!(ch, ':' | '.') { '-' } else { ch })
        .collect()
}
4096
/// Restores an ISO 8601 timestamp from its filesystem-safe form.
///
/// This is the inverse of `iso_timestamp_to_file_timestamp`: within the time portion
/// (the text after the first `T`, minus a trailing `Z` if present) the last hyphen
/// becomes the `.` in front of the fractional seconds and the remaining hyphens become
/// `:`. The date portion is left untouched and the result always ends with `Z`.
///
/// # Parameters
///
/// - `file_timestamp`: Filesystem-safe timestamp string (e.g., "2023-10-26T07-30-50-123456789Z").
///
/// # Returns
///
/// Returns an ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::file_timestamp_to_iso_timestamp;
/// let iso_timestamp = file_timestamp_to_iso_timestamp("2023-10-26T07-30-50-123456789Z");
/// assert_eq!(iso_timestamp, "2023-10-26T07:30:50.123456789Z");
/// ```
fn file_timestamp_to_iso_timestamp(file_timestamp: &str) -> String {
    // Without a 'T' the whole input is treated as the date and the time is empty.
    let (date, raw_time) = match file_timestamp.split_once('T') {
        Some(halves) => halves,
        None => (file_timestamp, ""),
    };
    let clock = raw_time.strip_suffix('Z').unwrap_or(raw_time);

    let iso_clock = match clock.rsplit_once('-') {
        // The final hyphen separates the fractional seconds; the rest delimit h/m/s.
        Some((hms, fraction)) => format!("{hms}.{fraction}").replace('-', ":"),
        // No hyphen at all: nothing to translate.
        None => clock.to_string(),
    };

    format!("{date}T{iso_clock}Z")
}
4138
4139/// Converts an ISO 8601 timestamp string to Unix nanoseconds.
4140///
4141/// This function parses an ISO 8601 timestamp and converts it to Unix nanoseconds.
4142/// It's used to convert parsed timestamps back to the internal representation.
4143///
4144/// # Parameters
4145///
4146/// - `iso_timestamp`: ISO 8601 timestamp string (e.g., "2023-10-26T07:30:50.123456789Z").
4147///
4148/// # Returns
4149///
4150/// Returns `Ok(u64)` with the Unix nanoseconds timestamp, or an error if parsing fails.
4151///
4152/// # Examples
4153///
4154/// ```rust
4155/// # use nautilus_persistence::backend::catalog::iso_to_unix_nanos;
4156/// let nanos = iso_to_unix_nanos("2021-01-01T00:00:00.000000000Z").unwrap();
4157/// assert_eq!(nanos, 1609459200000000000);
4158/// ```
4159fn iso_to_unix_nanos(iso_timestamp: &str) -> anyhow::Result<u64> {
4160    Ok(iso8601_to_unix_nanos(iso_timestamp)?.into())
4161}
4162
/// Produces a URI-safe form of an instrument ID.
///
/// Forward slashes are dropped (e.g., "BTC/USD" becomes "BTCUSD") and carets are
/// replaced with underscores, so the result can be used as a directory name. Every
/// other character is kept as-is.
///
/// # Parameters
///
/// - `instrument_id`: The original instrument ID string.
///
/// # Returns
///
/// A URI-safe version of the instrument ID with forward slashes removed and carets replaced.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::urisafe_instrument_id;
/// assert_eq!(urisafe_instrument_id("BTC/USD"), "BTCUSD");
/// assert_eq!(urisafe_instrument_id("EUR-USD"), "EUR-USD");
/// assert_eq!(urisafe_instrument_id("^SPX.CBOE"), "_SPX.CBOE");
/// ```
pub fn urisafe_instrument_id(instrument_id: &str) -> String {
    instrument_id
        .chars()
        .filter_map(|ch| match ch {
            '/' => None,
            '^' => Some('_'),
            other => Some(other),
        })
        .collect()
}
4189
// Pull the instrument ID out of a bar type directory name.
// Accepted layouts:
//   {id}-{step}-{agg}-{price}-{source}
//   {id}-{step}-{agg}-{price}-{source}@{step}-{agg}-{source}
// Anything from the first '@' onward (the composite suffix) is ignored. The remaining
// text is split from the right into at most five '-'-separated fields, so hyphens
// inside the instrument ID itself stay intact. Returns `None` when fewer than five
// fields are present or the step field contains a non-digit character.
fn extract_bar_type_instrument_id(bar_type_dir: &str) -> Option<&str> {
    let standard = bar_type_dir
        .split_once('@')
        .map_or(bar_type_dir, |(head, _composite)| head);

    // Fields arrive right-to-left: source, price_type, agg, step, instrument_id.
    let mut fields = standard.rsplitn(5, '-');
    let _source = fields.next()?;
    let _price_type = fields.next()?;
    let _aggregation = fields.next()?;
    let step = fields.next()?;
    let instrument_id = fields.next()?;

    step.chars()
        .all(|c| c.is_ascii_digit())
        .then_some(instrument_id)
}
4205
/// Normalizes a custom data identifier for use in directory paths.
///
/// The identifier is split on `/`; empty segments (which is what repeated, leading, or
/// trailing slashes produce) and `..` segments are discarded to prevent path traversal,
/// and the surviving segments are re-joined with single `/` separators.
///
/// # Parameters
///
/// - `identifier`: The raw identifier, possibly containing `/`-separated segments.
///
/// # Returns
///
/// The cleaned identifier; an empty string if no segment survives.
#[must_use]
pub fn safe_directory_identifier(identifier: &str) -> String {
    // Dropping empty segments already collapses any run of slashes, so no separate
    // "//" -> "/" pre-pass (and its extra allocation) is needed.
    identifier
        .split('/')
        .filter(|segment| !segment.is_empty() && *segment != "..")
        .collect::<Vec<_>>()
        .join("/")
}
4217
/// Extracts the identifier from a file path.
///
/// The identifier is the second-to-last `/`-separated component, i.e. the name of the
/// directory that directly contains the file. For example, from
/// "`data/quote_tick/EURUSD/file.parquet`" this yields "EURUSD". When the path has no
/// `/` at all, the literal "unknown" is returned.
#[must_use]
pub fn extract_identifier_from_path(file_path: &str) -> String {
    // Walking from the right, index 0 is the file name and index 1 its parent directory.
    file_path
        .rsplit('/')
        .nth(1)
        .map_or_else(|| "unknown".to_string(), ToString::to_string)
}
4231
4232/// Makes an identifier safe for use in SQL table names.
4233///
4234/// Removes forward slashes, replaces dots, hyphens, and spaces with underscores, and converts to lowercase.
4235#[must_use]
4236pub fn make_sql_safe_identifier(identifier: &str) -> String {
4237    urisafe_instrument_id(identifier)
4238        .replace(['.', '-', ' ', '%'], "_")
4239        .to_lowercase()
4240}
4241
/// Extracts the filename from a file path and makes it SQL-safe.
///
/// The last `/`-separated component is taken, everything from the last occurrence of
/// ".parquet" onward is cut off, hyphens, colons, and dots are replaced with
/// underscores, and the result is lowercased. An empty path yields "`unknown_file`".
///
/// For example, from "data/quote_tick/EURUSD/2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
/// extracts "`2021_01_01t00_00_00_000000000z_2021_01_02t00_00_00_000000000z`".
#[must_use]
pub fn extract_sql_safe_filename(file_path: &str) -> String {
    if file_path.is_empty() {
        return "unknown_file".to_string();
    }

    let leaf = file_path.rsplit('/').next().unwrap_or("unknown_file");

    // Drop the ".parquet" extension (and anything after it) when present.
    let stem = leaf.rfind(".parquet").map_or(leaf, |cut| &leaf[..cut]);

    // Replace the characters that are awkward in SQL identifiers.
    stem.chars()
        .map(|ch| if matches!(ch, '-' | ':' | '.') { '_' } else { ch })
        .collect::<String>()
        .to_lowercase()
}
4266
/// Builds a local filesystem path from a base directory and a list of components.
///
/// The components are appended to the base one after another with `PathBuf` push
/// semantics, so the platform's native separator is used. Use this for local file
/// operations that need to work with the actual file system.
///
/// # Arguments
///
/// * `base_path` - The base directory path
/// * `components` - Path components to join
///
/// # Returns
///
/// A `PathBuf` with platform-appropriate separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_local_path;
/// let path = make_local_path("/base", &["data", "quotes", "EURUSD"]);
/// // On Unix: "/base/data/quotes/EURUSD"
/// // On Windows: "\base\data\quotes\EURUSD"
/// ```
pub fn make_local_path<P: AsRef<Path>>(base_path: P, components: &[&str]) -> PathBuf {
    let mut joined = base_path.as_ref().to_path_buf();
    // `extend` pushes each component in order, exactly like repeated `push` calls.
    joined.extend(components);
    joined
}
4296
/// Creates an object store path using forward slashes.
///
/// Object stores (S3, GCS, etc.) always expect forward slashes regardless of platform.
/// Backslashes in the base and in every component are converted to `/`. Trailing
/// slashes are trimmed from the base, leading and trailing slashes are trimmed from
/// each component, and parts that end up empty are skipped before joining with `/`.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join
///
/// # Returns
///
/// A string path with forward slash separators
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::make_object_store_path;
/// let path = make_object_store_path("base", &["data", "quotes", "EURUSD"]);
/// assert_eq!(path, "base/data/quotes/EURUSD");
/// ```
#[must_use]
pub fn make_object_store_path(base_path: &str, components: &[&str]) -> String {
    // Only the trailing side of the base is trimmed, so a leading '/' survives.
    let base = base_path.replace('\\', "/");
    let base = base.trim_end_matches('/');

    let tail = components.iter().filter_map(|component| {
        let unified = component.replace('\\', "/");
        let trimmed = unified.trim_matches('/');
        (!trimmed.is_empty()).then(|| trimmed.to_string())
    });

    (!base.is_empty())
        .then(|| base.to_string())
        .into_iter()
        .chain(tail)
        .collect::<Vec<String>>()
        .join("/")
}
4347
/// Creates an object store path using forward slashes with owned strings.
///
/// Same normalization as the borrowed variant, but the components are passed as owned
/// `String`s to avoid lifetime issues at the call site: backslashes become `/`, the
/// base loses trailing slashes, each component loses leading and trailing slashes, and
/// empty parts are skipped before joining with `/`.
///
/// # Arguments
///
/// * `base_path` - The base path (can be empty)
/// * `components` - Path components to join (owned strings)
///
/// # Returns
///
/// A string path with forward slash separators
#[must_use]
pub fn make_object_store_path_owned(base_path: &str, components: Vec<String>) -> String {
    let forward = |raw: &str| raw.replace('\\', "/");

    let base = forward(base_path);
    let base = base.trim_end_matches('/');

    let mut pieces: Vec<String> = Vec::with_capacity(components.len() + 1);
    if !base.is_empty() {
        pieces.push(base.to_owned());
    }

    pieces.extend(
        components
            .iter()
            .map(|component| forward(component.as_str()).trim_matches('/').to_owned())
            .filter(|cleaned| !cleaned.is_empty()),
    );

    pieces.join("/")
}
4389
/// Converts a local path to an object store path string.
///
/// The path is rendered lossily as a string and every backslash is turned into a
/// forward slash, which is the separator object stores expect.
///
/// # Arguments
///
/// * `local_path` - The local `PathBuf` to convert
///
/// # Returns
///
/// A string with forward slash separators suitable for object store operations
///
/// # Examples
///
/// ```rust
/// # use std::path::PathBuf;
/// # use nautilus_persistence::backend::catalog::local_to_object_store_path;
/// let local_path = PathBuf::from("data").join("quotes").join("EURUSD");
/// let object_path = local_to_object_store_path(&local_path);
/// assert_eq!(object_path, "data/quotes/EURUSD");
/// ```
#[must_use]
pub fn local_to_object_store_path(local_path: &Path) -> String {
    local_path
        .to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
4416
/// Splits a path string into its non-empty components.
///
/// Both `/` and `\` are treated as separators, so the function works for local file
/// system paths and object store paths alike. Empty components (from leading,
/// trailing, or repeated separators) are dropped.
///
/// # Arguments
///
/// * `path_str` - The path string to parse
///
/// # Returns
///
/// A vector of path components
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::extract_path_components;
/// let components = extract_path_components("data/quotes/EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
///
/// // Works with both separators
/// let components = extract_path_components("data\\quotes\\EURUSD");
/// assert_eq!(components, vec!["data", "quotes", "EURUSD"]);
/// ```
#[must_use]
pub fn extract_path_components(path_str: &str) -> Vec<String> {
    path_str
        .split(['/', '\\'])
        .filter(|part| !part.is_empty())
        .map(str::to_owned)
        .collect()
}
4451
4452/// Checks if a filename's timestamp range intersects with a query interval.
4453///
4454/// This function determines whether a Parquet file (identified by its timestamp-based
4455/// filename) contains data that falls within the specified query time range.
4456///
4457/// # Parameters
4458///
4459/// - `filename`: The filename to check (format: "`iso_timestamp_1_iso_timestamp_2.parquet`").
4460/// - `start`: Optional start timestamp for the query range.
4461/// - `end`: Optional end timestamp for the query range.
4462///
4463/// # Returns
4464///
4465/// Returns `true` if the file's time range intersects with the query range,
4466/// `false` otherwise. Returns `true` if the filename cannot be parsed.
4467///
4468/// # Examples
4469///
4470/// ```rust
4471/// # use nautilus_persistence::backend::catalog::query_intersects_filename;
4472/// // Example with ISO format filenames
4473/// assert!(query_intersects_filename(
4474///     "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet",
4475///     Some(1609459200000000000),
4476///     Some(1609545600000000000)
4477/// ));
4478/// ```
4479fn query_intersects_filename(filename: &str, start: Option<u64>, end: Option<u64>) -> bool {
4480    if let Some((file_start, file_end)) = parse_filename_timestamps(filename) {
4481        (start.is_none() || start.unwrap() <= file_end)
4482            && (end.is_none() || file_start <= end.unwrap())
4483    } else {
4484        true
4485    }
4486}
4487
4488/// Parses timestamps from a Parquet filename.
4489///
4490/// Extracts the start and end timestamps from filenames that follow the ISO 8601 format:
4491/// "`iso_timestamp_1_iso_timestamp_2.parquet`" (e.g., "2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet")
4492///
4493/// # Parameters
4494///
4495/// - `filename`: The filename to parse (can be a full path).
4496///
4497/// # Returns
4498///
4499/// Returns `Some((start_ts, end_ts))` if the filename matches the expected format,
4500/// `None` otherwise.
4501///
4502/// # Examples
4503///
4504/// ```rust
4505/// # use nautilus_persistence::backend::catalog::parse_filename_timestamps;
4506/// assert!(parse_filename_timestamps("2021-01-01T00-00-00-000000000Z_2021-01-02T00-00-00-000000000Z.parquet").is_some());
4507/// assert_eq!(parse_filename_timestamps("invalid.parquet"), None);
4508/// ```
4509#[must_use]
4510pub fn parse_filename_timestamps(filename: &str) -> Option<(u64, u64)> {
4511    let path = Path::new(filename);
4512    let base_name = path.file_name()?.to_str()?;
4513    let base_filename = base_name.strip_suffix(".parquet")?;
4514    let (first_part, second_part) = base_filename.split_once('_')?;
4515
4516    let first_iso = file_timestamp_to_iso_timestamp(first_part);
4517    let second_iso = file_timestamp_to_iso_timestamp(second_part);
4518
4519    let first_ts = iso_to_unix_nanos(&first_iso).ok()?;
4520    let second_ts = iso_to_unix_nanos(&second_iso).ok()?;
4521
4522    Some((first_ts, second_ts))
4523}
4524
/// Checks if a list of closed integer intervals are all mutually disjoint.
///
/// The intervals are sorted by start and each one is compared with its successor.
/// Because the intervals are closed, two intervals that merely share an endpoint
/// (the end of one equals the start of the next) count as overlapping.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are disjoint, `false` if any overlap is found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_disjoint;
/// // Disjoint intervals
/// assert!(are_intervals_disjoint(&[(1, 5), (10, 15), (20, 25)]));
///
/// // Overlapping intervals
/// assert!(!are_intervals_disjoint(&[(1, 10), (5, 15)]));
/// ```
#[must_use]
pub fn are_intervals_disjoint(intervals: &[(u64, u64)]) -> bool {
    let mut by_start = intervals.to_vec();
    by_start.sort_by_key(|interval| interval.0);

    // With zero or one interval there are no neighbouring pairs, so `all` is true.
    by_start
        .windows(2)
        .all(|pair| pair[0].1 < pair[1].0)
}
4572
/// Checks if intervals are contiguous (adjacent with no gaps).
///
/// Intervals are contiguous if, when sorted by start time, each interval's start
/// timestamp is exactly one more than the previous interval's end timestamp.
/// This ensures complete coverage of a time range with no gaps.
///
/// An interval ending at `u64::MAX` can have no successor, so it is reported as
/// non-contiguous with whatever follows it instead of overflowing.
///
/// # Parameters
///
/// - `intervals`: A slice of timestamp intervals as (start, end) tuples.
///
/// # Returns
///
/// Returns `true` if all intervals are contiguous, `false` if any gaps are found.
/// Returns `true` for empty lists or lists with a single interval.
///
/// # Examples
///
/// ```rust
/// # use nautilus_persistence::backend::catalog::are_intervals_contiguous;
/// // Contiguous intervals
/// assert!(are_intervals_contiguous(&[(1, 5), (6, 10), (11, 15)]));
///
/// // Non-contiguous intervals (gap between 5 and 8)
/// assert!(!are_intervals_contiguous(&[(1, 5), (8, 10)]));
/// ```
#[must_use]
pub fn are_intervals_contiguous(intervals: &[(u64, u64)]) -> bool {
    let mut sorted_intervals = intervals.to_vec();
    sorted_intervals.sort_by_key(|&(start, _)| start);

    // With zero or one interval there are no neighbouring pairs, so `all` is true.
    sorted_intervals.windows(2).all(|pair| {
        let (_, prev_end) = pair[0];
        let (next_start, _) = pair[1];

        // `checked_add` avoids the overflow of `prev_end + 1` at `u64::MAX`
        // (a panic in debug builds, a wrap to 0 in release builds).
        prev_end.checked_add(1) == Some(next_start)
    })
}
4619
4620/// Finds the parts of a query interval that are not covered by existing data intervals.
4621///
4622/// This function calculates the "gaps" in data coverage by comparing a requested
4623/// time range against the intervals covered by existing data files. It's used to
4624/// determine what data needs to be fetched or backfilled.
4625///
4626/// # Parameters
4627///
4628/// - `start`: Start timestamp of the query interval (inclusive).
4629/// - `end`: End timestamp of the query interval (inclusive).
4630/// - `closed_intervals`: Existing data intervals as (start, end) tuples.
4631///
4632/// # Returns
4633///
4634/// Returns a vector of (start, end) tuples representing the gaps in coverage.
4635/// Returns an empty vector if the query range is invalid or fully covered.
4636///
4637/// # Examples
4638///
4639/// ```rust
4640/// # use nautilus_persistence::backend::catalog::query_interval_diff;
4641/// // Query 1-100, have data for 10-30 and 60-80
4642/// let gaps = query_interval_diff(1, 100, &[(10, 30), (60, 80)]);
4643/// assert_eq!(gaps, vec![(1, 9), (31, 59), (81, 100)]);
4644/// ```
4645fn query_interval_diff(start: u64, end: u64, closed_intervals: &[(u64, u64)]) -> Vec<(u64, u64)> {
4646    if start > end {
4647        return Vec::new();
4648    }
4649
4650    let interval_set = get_interval_set(closed_intervals);
4651    let query_range = (RangeBound::Included(start), RangeBound::Included(end));
4652    let query_diff = interval_set.get_interval_difference(&query_range);
4653    let mut result: Vec<(u64, u64)> = Vec::new();
4654
4655    for interval in query_diff {
4656        if let Some(tuple) = interval_to_tuple(interval, start, end) {
4657            result.push(tuple);
4658        }
4659    }
4660
4661    result
4662}
4663
4664/// Creates an interval tree from closed integer intervals.
4665///
4666/// This function converts closed intervals [a, b] into half-open intervals [a, b+1)
4667/// for use with the interval tree data structure, which is used for efficient
4668/// interval operations and gap detection.
4669///
4670/// # Parameters
4671///
4672/// - `intervals`: A slice of closed intervals as (start, end) tuples.
4673///
4674/// # Returns
4675///
4676/// Returns an [`IntervalTree`] containing the converted intervals.
4677///
4678/// # Notes
4679///
4680/// - Invalid intervals (where start > end) are skipped.
4681/// - Uses saturating addition to prevent overflow when converting to half-open intervals.
4682fn get_interval_set(intervals: &[(u64, u64)]) -> IntervalTree<u64> {
4683    let mut tree = IntervalTree::default();
4684
4685    if intervals.is_empty() {
4686        return tree;
4687    }
4688
4689    for &(start, end) in intervals {
4690        if start > end {
4691            continue;
4692        }
4693
4694        tree.insert((
4695            RangeBound::Included(start),
4696            RangeBound::Excluded(end.saturating_add(1)),
4697        ));
4698    }
4699
4700    tree
4701}
4702
/// Converts a pair of range bounds back to a closed interval tuple.
///
/// This helper turns the bounded interval representation into the closed
/// `(start, end)` tuple format used by the interval helpers in this module.
/// Exclusive bounds are moved one step inwards, and unbounded sides fall back to
/// the corresponding edge of the original query range.
///
/// # Parameters
///
/// - `interval`: The (start bound, end bound) pair to convert.
/// - `query_start`: The start of the original query range, used for an unbounded start.
/// - `query_end`: The end of the original query range, used for an unbounded end.
///
/// # Returns
///
/// Returns `Some((start, end))` for non-empty intervals, `None` for empty intervals.
/// An interval is empty when its resolved start exceeds its resolved end, when its
/// end bound is `Excluded(0)`, or when its start bound is `Excluded(u64::MAX)`.
fn interval_to_tuple(
    interval: (RangeBound<&u64>, RangeBound<&u64>),
    query_start: u64,
    query_end: u64,
) -> Option<(u64, u64)> {
    let (bound_start, bound_end) = interval;

    let start = match bound_start {
        RangeBound::Included(&val) => val,
        // No `u64` lies above an exclusive lower bound of `u64::MAX`: empty interval
        RangeBound::Excluded(&val) => val.checked_add(1)?,
        RangeBound::Unbounded => query_start,
    };

    let end = match bound_end {
        RangeBound::Included(&val) => val,
        // No `u64` lies below an exclusive upper bound of `0`: empty interval
        RangeBound::Excluded(&val) => val.checked_sub(1)?,
        RangeBound::Unbounded => query_end,
    };

    (start <= end).then_some((start, end))
}