
nautilus_persistence/backend/catalog_operations.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Catalog operations for data consolidation and reset functionality.
//!
//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
//! These operations are separated into their own module for better organization and maintainability.

use ahash::{AHashMap, AHashSet};
use futures::StreamExt;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, CustomData, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
    OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStoreExt, path::Path as ObjectPath};

use crate::{
    backend::catalog::{
        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
        extract_path_components, make_object_store_path, parse_filename_timestamps,
        timestamps_to_filename,
    },
    parquet::{
        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    },
};

/// Information about a consolidation query to be executed.
///
/// This struct encapsulates all the information needed to execute a single consolidation
/// operation, including the data range to query and file naming strategy.
///
/// # Fields
///
/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
///
/// # Usage
///
/// This struct is used internally by the consolidation system to plan and execute
/// data consolidation operations. It allows the system to:
/// - Separate query planning from execution.
/// - Handle complex scenarios like data splitting.
/// - Optimize file naming strategies.
/// - Batch multiple operations efficiently.
/// - Maintain file contiguity across periods.
///
/// # Examples
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
///
/// // Regular consolidation query
/// let query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609545600000000000,
///     use_period_boundaries: true,
/// };
///
/// // Split operation to preserve data
/// let split_query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609462800000000000,
///     use_period_boundaries: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ConsolidationQuery {
    /// Start timestamp for the query range (inclusive, in nanoseconds).
    pub query_start: u64,
    /// End timestamp for the query range (inclusive, in nanoseconds).
    pub query_end: u64,
    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false).
    pub use_period_boundaries: bool,
}
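///
/// # Examples
///
/// A minimal sketch of constructing a split operation; the file path and
/// timestamps below are illustrative placeholders, not values produced by a
/// real catalog:
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// let operation = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/EURUSD.SIM/part.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```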
#[derive(Debug, Clone)]
pub struct DeleteOperation {
    /// Type of deletion operation ("remove", "`split_before`", "`split_after`").
    pub operation_type: String,
    /// List of files involved in this operation.
    pub files: Vec<String>,
    /// Start timestamp for data query (used for split operations).
    pub query_start: u64,
    /// End timestamp for data query (used for split operations).
    pub query_end: u64,
    /// Start timestamp for new file naming (used for split operations).
    pub file_start_ns: u64,
    /// End timestamp for new file naming (used for split operations).
    pub file_end_ns: u64,
}

impl ParquetDataCatalog {
    /// Consolidates all data files in the catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    /// - `deduplicate`: Optional flag to deduplicate rows when combining files.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(
                &directory,
                start,
                end,
                ensure_contiguous_files,
                deduplicate,
            )?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and identifier.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional identifier) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an
    ///   `instrument_id` (e.g., "EUR/USD.SIM") or a `bar_type` (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    /// - `deduplicate`: Optional flag to deduplicate rows when combining files.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
219    ///     Some("BTCUSD".to_string()),
    ///     None,
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        identifier: Option<&str>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(type_name, identifier)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files, deduplicate)
    }

    /// Consolidates Parquet files within a specific directory by merging them into a single file.
    ///
    /// This internal method performs the actual consolidation work for a single directory.
    /// It identifies files within the specified time range, validates their intervals,
    /// and combines them into a single Parquet file with optimized storage.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to consolidate.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
    /// - `deduplicate`: Optional flag to deduplicate rows when combining files.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Behavior
    ///
    /// - Skips consolidation if directory contains 1 or fewer files.
    /// - Filters files by timestamp range if start/end are specified.
    /// - Sorts intervals by start timestamp before consolidation.
    /// - Creates a new file spanning the entire time range of input files.
    /// - Validates interval disjointness after consolidation (if enabled).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File combination operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    /// - Object store operations fail.
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
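                // Keep only files whose interval lies fully inside the requested range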
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
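            // Name the consolidated file to span the first interval's start to the last interval's end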
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = make_object_store_path(directory, &[&file_name]);

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                    deduplicate,
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
        }

        Ok(())
    }

    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - Data type extraction from path fails.
    /// - Period-based consolidation operations fail.
    ///
    /// # Notes
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog by 1-day periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate only files within a specific time range by 1-hour periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let (data_cls, identifier) =
                self.extract_data_cls_and_identifier_from_path(&directory)?;

            if let Some(data_cls_name) = data_cls {
                let identifier_ref = identifier.as_deref();
                // Use match statement to call the generic consolidate_data_by_period for various types
                match data_cls_name.as_str() {
                    "quotes" => {
                        self.consolidate_data_by_period_generic::<QuoteTick>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "trades" => {
                        self.consolidate_data_by_period_generic::<TradeTick>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_deltas" => {
                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_depths" => {
                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "bars" => {
                        self.consolidate_data_by_period_generic::<Bar>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "index_prices" => {
                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "mark_prices" => {
                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "instrument_closes" => {
                        self.consolidate_data_by_period_generic::<InstrumentClose>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    _ => {
                        // Check if it's a custom data type (starts with "custom/")
                        if data_cls_name.starts_with("custom/") {
                            // Extract the custom type name (everything after "custom/")
                            let custom_type_name = data_cls_name.strip_prefix("custom/").unwrap();
                            self.consolidate_custom_data_by_period(
                                custom_type_name,
                                identifier_ref,
                                period_nanos,
                                start,
                                end,
                                ensure_contiguous_files,
                            )?;
                        } else {
                            // Skip unknown data types
                            log::warn!("Unknown data type for consolidation: {data_cls_name}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Extracts data class and identifier from a directory path.
    ///
    /// This method parses a directory path to extract the data type and optional
    /// instrument identifier. It's used to determine what type of data consolidation
    /// to perform for each directory.
    ///
    /// # Parameters
    ///
    /// - `path`: The directory path to parse.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
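    ///
    /// # Examples
    ///
    /// A sketch of the expected parsing for a standard catalog layout (the
    /// path below is illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("data/quotes/EURUSD.SIM")?;
    /// assert_eq!(data_cls.as_deref(), Some("quotes"));
    /// assert_eq!(identifier.as_deref(), Some("EURUSD.SIM"));
    /// # Ok::<(), anyhow::Error>(())
    /// ```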
    pub fn extract_data_cls_and_identifier_from_path(
        &self,
        path: &str,
    ) -> anyhow::Result<(Option<String>, Option<String>)> {
        // Use cross-platform path parsing
        let path_components = extract_path_components(path);

        // Find the "data" directory in the path
        if let Some(data_index) = path_components.iter().position(|part| part == "data")
            && data_index + 1 < path_components.len()
        {
            let second = &path_components[data_index + 1];

            // Custom data: data/custom/TypeName[/identifier segments...]
            if *second == "custom" && data_index + 2 < path_components.len() {
                let type_name = path_components[data_index + 2].clone();
                let data_cls = format!("custom/{type_name}");
                let identifier = if data_index + 3 < path_components.len() {
                    Some(path_components[data_index + 3..].join("/"))
                } else {
                    None
                };
                return Ok((Some(data_cls), identifier));
            }

            let data_cls = second.clone();
            let identifier = if data_index + 2 < path_components.len() {
                Some(path_components[data_index + 2].clone())
            } else {
                None
            };

            return Ok((Some(data_cls), identifier));
        }

        // If we can't parse the path, return None for both
        Ok((None, None))
    }

    /// Consolidates data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - Uses a two-phase approach: first determines all queries, then executes them.
    /// - Groups intervals into contiguous groups to preserve holes between groups.
    /// - Allows consolidation across multiple files within each contiguous group.
    /// - Skips queries if target files already exist for efficiency.
    /// - Original files are removed immediately after querying each period.
    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
    /// - Uses modulo arithmetic for efficient period boundary calculation.
    /// - Preserves holes in data by preventing queries from spanning across gaps.
    /// - Automatically splits files at start/end boundaries to preserve all data.
    /// - Split operations are executed before consolidation to ensure data preservation.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate specific instrument by 1-hour periods
    /// catalog.consolidate_data_by_period(
    ///     "trades",
653    ///     Some("BTCUSD".to_string()),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        // Use match statement to call the generic consolidate_data_by_period for various types
        match type_name {
            "quotes" => {
                self.consolidate_data_by_period_generic::<QuoteTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "trades" => {
                self.consolidate_data_by_period_generic::<TradeTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_deltas" => {
                self.consolidate_data_by_period_generic::<OrderBookDelta>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_depths" => {
                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "bars" => {
                self.consolidate_data_by_period_generic::<Bar>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "index_prices" => {
                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "mark_prices" => {
                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "instrument_closes" => {
                self.consolidate_data_by_period_generic::<InstrumentClose>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            _ => {
                // Check if it's a custom data type (starts with "custom/")
                if type_name.starts_with("custom/") {
                    // Extract the custom type name (everything after "custom/")
                    let custom_type_name = type_name.strip_prefix("custom/").unwrap();
                    self.consolidate_custom_data_by_period(
                        custom_type_name,
                        identifier,
                        period_nanos,
                        start,
                        end,
                        ensure_contiguous_files,
                    )?;
                } else {
                    anyhow::bail!("Unknown data type for consolidation: {type_name}");
                }
            }
        }

        Ok(())
    }

    /// Consolidates data files by splitting them into fixed time periods (generic implementation).
    ///
    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
    /// to ensure compile-time correctness and enable reuse across different data types.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to consolidate, must implement required traits for serialization.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
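    ///
    /// # Examples
    ///
    /// A sketch of consolidating quotes into 1-day periods; the instrument ID
    /// is an illustrative placeholder:
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_model::data::QuoteTick;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     Some("EUR/USD.SIM"),
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```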
    pub fn consolidate_data_by_period_generic<T>(
        &mut self,
        identifier: Option<&str>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Use get_intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            T::path_prefix(),
            identifier,
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(T::path_prefix(), identifier)?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Capture the overall window's left bound before the loop consumes queries_to_execute;
        // a source file is only deleted when its interval is fully consumed by the consolidation.
        let overall_query_start = queries_to_execute[0].query_start;

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query data for this period using query_typed_data
            let instrument_ids = identifier.map(|id| vec![id.to_string()]);

            // Use optimize_file_loading=false to match Python behavior:
            // During consolidation, we want to read only the specific files being consolidated,
            // not the entire directory. This ensures precise file control during consolidation.
            let period_data = self.query_typed_data::<T>(
                instrument_ids,
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
                false, // optimize_file_loading=false for precise file control during consolidation
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                let start = file_start_ns.unwrap();
                (start, query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Write consolidated data for this period using write_to_parquet
            // Use skip_disjoint_check since we're managing file removal carefully
            let start_ts = UnixNanos::from(final_start_ns);
            let end_ts = UnixNanos::from(final_end_ns);
            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;

            // Delete files fully consumed by this period; keep straddlers so no data is lost
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                    && interval.0 >= overall_query_start
                {
                    existing_files.retain(|f| f != &file);
                    self.delete_file(&file)?;
                }
            }

            // Reset so next period starts a new contiguous segment
            file_start_ns = None;
        }

        Ok(())
    }

    /// Consolidates custom data files by splitting them into fixed time periods.
    ///
    /// This method provides consolidation for custom data types that don't have compile-time
    /// type information. It uses dynamic querying and writing methods.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The custom data type name (without "custom/" prefix).
    /// - `identifier`: Optional instrument ID to consolidate.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    fn consolidate_custom_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Get intervals for the custom data type
        let path_prefix = format!("custom/{type_name}");
        let intervals = self.get_intervals(&path_prefix, identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            &path_prefix,
            identifier,
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(&path_prefix, identifier)?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Capture the overall window's left bound before the loop consumes queries_to_execute;
        // a source file is only deleted when its interval is fully consumed by the consolidation.
        let overall_query_start = queries_to_execute[0].query_start;

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query custom data for this period using query_custom_data_dynamic
            let instrument_ids = identifier.map(|id| vec![id.to_string()]);

            let period_data = self.query_custom_data_dynamic(
                type_name,
                instrument_ids.as_deref(),
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
                false, // optimize_file_loading=false for precise file control during consolidation
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                let start = file_start_ns.unwrap();
                (start, query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Group custom data by type for writing
            let mut custom_data_by_type: AHashMap<String, Vec<CustomData>> = AHashMap::new();

            for data in period_data {
                if let Data::Custom(c) = data {
                    let type_name_str = c.data.type_name().to_string();
                    custom_data_by_type
                        .entry(type_name_str)
                        .or_default()
                        .push(c);
                }
            }

            // Write consolidated data for each type
            for (_, items) in custom_data_by_type {
                let start_ts = UnixNanos::from(final_start_ns);
                let end_ts = UnixNanos::from(final_end_ns);
                self.write_custom_data_batch(items, Some(start_ts), Some(end_ts), Some(true))?;
            }

            // Delete files fully consumed by this period; keep straddlers so no data is lost
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                    && interval.0 >= overall_query_start
                {
                    existing_files.retain(|f| f != &file);
                    self.delete_file(&file)?;
                }
            }

            // Reset so next period starts a new contiguous segment
            file_start_ns = None;
        }

        Ok(())
    }

    /// Deletes custom data within a specified time range.
    ///
    /// This method provides deletion for custom data types that don't have compile-time
    /// type information. It uses dynamic querying and writing methods.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The custom data type name (without "custom/" prefix).
    /// - `identifier`: Optional instrument ID to delete data for.
    /// - `start`: Optional start timestamp for the deletion range.
    /// - `end`: Optional end timestamp for the deletion range.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    fn delete_custom_data_range(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let path_prefix = format!("custom/{type_name}");

        // Get intervals for the custom data type
        let intervals = self.get_intervals(&path_prefix, identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to process
        }

        // Prepare all operations for execution
        let operations_to_execute =
            self.prepare_delete_operations(&path_prefix, identifier, &intervals, start, end)?;

        if operations_to_execute.is_empty() {
            return Ok(()); // No operations to execute
        }

        // Execute all operations
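        // (a set ensures each file is deleted at most once across operations)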
        let mut files_to_remove = AHashSet::<String>::new();

        for operation in operations_to_execute {
            // Reset the session before each operation
            self.reset_session();

            match operation.operation_type.as_str() {
                "split_before" => {
                    // Query custom data before the deletion range and write it
                    let instrument_ids = identifier.map(|id| vec![id.to_string()]);
                    let before_data = self.query_custom_data_dynamic(
                        type_name,
                        instrument_ids.as_deref(),
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                        false,
                    )?;

                    if !before_data.is_empty() {
                        // Group custom data by type for writing
                        let mut custom_data_by_type: AHashMap<String, Vec<CustomData>> =
                            AHashMap::new();

                        for data in before_data {
                            if let Data::Custom(c) = data {
                                let type_name_str = c.data.type_name().to_string();
                                custom_data_by_type
                                    .entry(type_name_str)
                                    .or_default()
                                    .push(c);
                            }
                        }

                        // Write data for each type
                        for (_, items) in custom_data_by_type {
                            let start_ts = UnixNanos::from(operation.file_start_ns);
                            let end_ts = UnixNanos::from(operation.file_end_ns);
                            self.write_custom_data_batch(
                                items,
                                Some(start_ts),
                                Some(end_ts),
                                Some(true),
                            )?;
                        }
                    }
                }
                "split_after" => {
                    // Query custom data after the deletion range and write it
                    let instrument_ids = identifier.map(|id| vec![id.to_string()]);
                    let after_data = self.query_custom_data_dynamic(
                        type_name,
                        instrument_ids.as_deref(),
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                        false,
                    )?;

                    if !after_data.is_empty() {
                        // Group custom data by type for writing
                        let mut custom_data_by_type: AHashMap<String, Vec<CustomData>> =
                            AHashMap::new();

                        for data in after_data {
                            if let Data::Custom(c) = data {
                                let type_name_str = c.data.type_name().to_string();
                                custom_data_by_type
                                    .entry(type_name_str)
                                    .or_default()
                                    .push(c);
                            }
                        }

                        // Write data for each type
                        for (_, items) in custom_data_by_type {
                            let start_ts = UnixNanos::from(operation.file_start_ns);
                            let end_ts = UnixNanos::from(operation.file_end_ns);
                            self.write_custom_data_batch(
                                items,
                                Some(start_ts),
                                Some(end_ts),
                                Some(true),
                            )?;
                        }
                    }
                }
                _ => {
                    // For "remove" operations, just mark files for removal
                }
            }

            // Mark files for removal (applies to all operation types)
            for file in operation.files {
                files_to_remove.insert(file);
            }
        }

        // Remove all files that were processed
        for file in files_to_remove {
            if let Err(e) = self.delete_file(&file) {
                log::warn!("Failed to delete file {file}: {e}");
            }
        }

        Ok(())
    }

    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
    ///
    /// This auxiliary function handles all the preparation logic for consolidation:
    /// 1. Filters intervals by time range.
    /// 2. Groups intervals into contiguous groups.
    /// 3. Identifies and creates split operations for data preservation.
    /// 4. Generates period-based consolidation queries.
    /// 5. Checks for existing target files.
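    ///
    /// # Examples
    ///
    /// A sketch of planning daily consolidation queries over already-listed
    /// intervals (the interval values and identifier are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let intervals = vec![(1609459200000000000, 1609545599999999999)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     Some("EUR/USD.SIM"),
    ///     &intervals,
    ///     86400000000000, // 1-day periods
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```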
    #[expect(clippy::too_many_arguments)]
    pub fn prepare_consolidation_queries(
        &self,
        type_name: &str,
        identifier: Option<&str>,
        intervals: &[(u64, u64)],
        period_nanos: u64,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: bool,
    ) -> anyhow::Result<Vec<ConsolidationQuery>> {
        // Filter intervals by time range if specified
        let used_start = start.map(|s| s.as_u64());
        let used_end = end.map(|e| e.as_u64());

        let mut filtered_intervals = Vec::new();

        for &(interval_start, interval_end) in intervals {
            // Check if interval overlaps with the specified range
            if (used_start.is_none() || used_start.unwrap() <= interval_end)
                && (used_end.is_none() || interval_start <= used_end.unwrap())
            {
                filtered_intervals.push((interval_start, interval_end));
            }
        }

        if filtered_intervals.is_empty() {
            return Ok(Vec::new()); // No intervals in the specified range
        }

        // Check contiguity of filtered intervals if required
        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
            anyhow::bail!(
                "Intervals are not contiguous. When ensure_contiguous_files=true, \
                 all files in the consolidation range must have contiguous timestamps."
            );
        }

        // Group intervals by the target period: split only when the gap between files
        // exceeds one period, since sub-period gaps land in the same consolidated file.
        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals, period_nanos);

        let mut queries_to_execute = Vec::new();

        // Handle interval splitting by creating split operations for data preservation
        if !filtered_intervals.is_empty() {
            if let Some(start_ts) = used_start {
                let first_interval = filtered_intervals[0];
                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
                    // Split before start: preserve data from interval_start to start-1
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: first_interval.0,
                        query_end: start_ts - 1,
                        use_period_boundaries: false,
                    });
                }
            }

            if let Some(end_ts) = used_end {
                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
                    // Split after end: preserve data from end+1 to interval_end
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: end_ts + 1,
                        query_end: last_interval.1,
                        use_period_boundaries: false,
                    });
                }
            }
        }

        // Generate period-based consolidation queries for each contiguous group
        for group in contiguous_groups {
            let group_start = group[0].0;
            let group_end = group[group.len() - 1].1;

            // Apply start/end filtering to the group
            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));

            if effective_start > effective_end {
                continue; // Skip if no overlap
            }

            // Generate period-based queries within this contiguous group
1316            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;
1317
            // Safety check to prevent infinite loops (mirrors the Python implementation)
1319            let max_iterations = 10000;
1320            let mut iteration_count = 0;
1321
1322            while current_start_ns <= effective_end {
1323                iteration_count += 1;
1324                if iteration_count > max_iterations {
1325                    // Safety break to prevent infinite loops
1326                    break;
1327                }
1328                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);
1329
1330                // Check if target file already exists (only when ensure_contiguous_files is true)
1331                if ensure_contiguous_files {
1332                    let directory = self.make_path(type_name, identifier)?;
1333                    let target_filename = format!(
1334                        "{}/{}",
1335                        directory,
1336                        timestamps_to_filename(
1337                            UnixNanos::from(current_start_ns),
1338                            UnixNanos::from(current_end_ns)
1339                        )
1340                    );
1341
1342                    if self.file_exists(&target_filename)? {
1343                        // Skip if target file already exists
1344                        current_start_ns += period_nanos;
1345                        continue;
1346                    }
1347                }
1348
1349                // Add query to execution list
1350                queries_to_execute.push(ConsolidationQuery {
1351                    query_start: current_start_ns,
1352                    query_end: current_end_ns,
1353                    use_period_boundaries: ensure_contiguous_files,
1354                });
1355
                // Move to the next period; the loop condition handles termination
                current_start_ns += period_nanos;
1362            }
1363        }
1364
        // Sort queries by start date to enable efficient file removal:
        // a file can be removed once its interval end is <= the current query end,
        // and processing in chronological order ensures optimal cleanup
1368        queries_to_execute.sort_by_key(|q| q.query_start);
1369
1370        Ok(queries_to_execute)
1371    }
1372
1373    /// Groups intervals for period-based consolidation.
1374    ///
1375    /// Groups adjacent intervals into the same bucket unless the gap between them exceeds
1376    /// `period_nanos`. Sub-period gaps land in the same consolidated file anyway, so they
1377    /// do not warrant a split. Gaps larger than one period represent genuine data holes.
1378    ///
1379    /// # Parameters
1380    ///
1381    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples, sorted by start.
1382    /// - `period_nanos`: The target consolidation period; gaps larger than this split groups.
1383    ///
1384    /// # Returns
1385    ///
1386    /// Returns a vector of groups. Returns an empty vector if the input is empty.
1387    ///
1388    /// # Examples
1389    ///
1390    /// ```text
1391    /// Legacy chunked files with period=86_400_000_000_000 (1 day):
1392    ///   [(1,5), (6,10), (11,15)] -> [[(1,5), (6,10), (11,15)]]
1393    ///
1394    /// Small period=1 with mixed gaps:
1395    ///   [(1,5), (8,10), (12,15)] -> [[(1,5)], [(8,10)], [(12,15)]]
1396    /// ```
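    ///
    /// A runnable sketch of the same rule (interval values are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // The gap between (1, 5) and (8, 10) is 3 > period_nanos = 1, so the group splits
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (8, 10)], 1);
    /// assert_eq!(groups, vec![vec![(1, 5)], vec![(8, 10)]]);
    /// ```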
1397    #[must_use]
1398    pub fn group_contiguous_intervals(
1399        &self,
1400        intervals: &[(u64, u64)],
1401        period_nanos: u64,
1402    ) -> Vec<Vec<(u64, u64)>> {
1403        if intervals.is_empty() {
1404            return Vec::new();
1405        }
1406
1407        // Split groups only when the gap between files exceeds one period,
1408        // since sub-period gaps land in the same consolidated file anyway.
1409        // This works for both legacy chunked files (gap ~1ns) and fragment-per-flush
1410        // catalogs (gap ~bar interval) without inferring spacing from the data.
1411        let mut contiguous_groups = Vec::new();
1412        let mut current_group = vec![intervals[0]];
1413
1414        for i in 1..intervals.len() {
1415            let prev_end = intervals[i - 1].1;
1416            let curr_start = intervals[i].0;
1417
1418            if curr_start.saturating_sub(prev_end) > period_nanos {
1419                contiguous_groups.push(current_group);
1420                current_group = vec![intervals[i]];
1421            } else {
1422                current_group.push(intervals[i]);
1423            }
1424        }
1425
1426        contiguous_groups.push(current_group);
1427
1428        contiguous_groups
1429    }
1430
1431    /// Checks if a file exists in the object store.
1432    ///
1433    /// This method performs a HEAD operation on the object store to determine if a file
1434    /// exists without downloading its content. It works with both local and remote object stores.
1435    ///
1436    /// # Parameters
1437    ///
1438    /// - `path`: The file path to check, relative to the catalog structure.
1439    ///
1440    /// # Returns
1441    ///
1442    /// Returns `true` if the file exists, `false` if it doesn't exist.
1443    ///
1444    /// # Errors
1445    ///
1446    /// Returns an error if the object store operation fails due to network issues,
1447    /// authentication problems, or other I/O errors.
    fn file_exists(&self, path: &str) -> anyhow::Result<bool> {
        let object_path = self.to_object_path(path)?;
        let exists = self.execute_async(async {
            // Map `NotFound` to `false`; propagate genuine I/O failures rather than
            // silently reporting the file as missing
            match self.object_store.head(&object_path).await {
                Ok(_) => Ok(true),
                Err(object_store::Error::NotFound { .. }) => Ok(false),
                Err(e) => Err(anyhow::Error::from(e)),
            }
        })?;
        Ok(exists)
    }
1456
1457    /// Deletes a file from the object store.
1458    ///
1459    /// This method removes a file from the object store. The operation is permanent
1460    /// and cannot be undone. It works with both local filesystems and remote object stores.
1461    ///
1462    /// # Parameters
1463    ///
1464    /// - `path`: The file path to delete, relative to the catalog structure.
1465    ///
1466    /// # Returns
1467    ///
1468    /// Returns `Ok(())` on successful deletion.
1469    ///
1470    /// # Errors
1471    ///
1472    /// Returns an error if:
1473    /// - The file doesn't exist.
1474    /// - Permission is denied.
1475    /// - Network issues occur (for remote stores).
1476    /// - The object store operation fails.
1477    ///
1478    /// # Safety
1479    ///
1480    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
1481    fn delete_file(&self, path: &str) -> anyhow::Result<()> {
1482        let object_path = self.to_object_path(path)?;
1483        self.execute_async(async {
1484            self.object_store
1485                .delete(&object_path)
1486                .await
1487                .map_err(anyhow::Error::from)
1488        })?;
1489        Ok(())
1490    }
1491
1492    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
1493    ///
1494    /// This method scans all leaf data directories in the catalog and renames files based on
1495    /// the actual timestamp range of their content. This is useful when files have been
1496    /// modified or when filename conventions have changed.
1497    ///
1498    /// # Returns
1499    ///
1500    /// Returns `Ok(())` on success, or an error if the operation fails.
1501    ///
1502    /// # Errors
1503    ///
1504    /// Returns an error if:
1505    /// - Directory listing fails.
1506    /// - File metadata reading fails.
1507    /// - File rename operations fail.
1508    /// - Interval validation fails after renaming.
1509    ///
1510    /// # Examples
1511    ///
1512    /// ```rust,no_run
1513    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1514    ///
1515    /// let catalog = ParquetDataCatalog::new(/* ... */);
1516    ///
1517    /// // Reset all filenames in the catalog
1518    /// catalog.reset_all_file_names()?;
1519    /// # Ok::<(), anyhow::Error>(())
1520    /// ```
1521    pub fn reset_all_file_names(&self) -> anyhow::Result<()> {
1522        let leaf_directories = self.find_leaf_data_directories()?;
1523
1524        for directory in leaf_directories {
1525            self.reset_file_names(&directory)?;
1526        }
1527
1528        Ok(())
1529    }
1530
1531    /// Resets the filenames of Parquet files for a specific data type and identifier.
1532    ///
1533    /// This method renames files in a specific directory based on the actual timestamp
1534    /// range of their content. This is useful for correcting filenames after data
1535    /// modifications or when filename conventions have changed.
1536    ///
1537    /// # Parameters
1538    ///
1539    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
1540    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
1541    ///
1542    /// # Returns
1543    ///
1544    /// Returns `Ok(())` on success, or an error if the operation fails.
1545    ///
1546    /// # Errors
1547    ///
1548    /// Returns an error if:
1549    /// - The directory path cannot be constructed.
1550    /// - File metadata reading fails.
1551    /// - File rename operations fail.
1552    /// - Interval validation fails after renaming.
1553    ///
1554    /// # Examples
1555    ///
1556    /// ```rust,no_run
1557    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1558    ///
1559    /// let catalog = ParquetDataCatalog::new(/* ... */);
1560    ///
1561    /// // Reset filenames for all quote files
1562    /// catalog.reset_data_file_names("quotes", None)?;
1563    ///
1564    /// // Reset filenames for a specific instrument's trade files
    /// catalog.reset_data_file_names("trades", Some("BTCUSD"))?;
1566    /// # Ok::<(), anyhow::Error>(())
1567    /// ```
1568    pub fn reset_data_file_names(
1569        &self,
1570        data_cls: &str,
1571        identifier: Option<&str>,
1572    ) -> anyhow::Result<()> {
1573        let directory = self.make_path(data_cls, identifier)?;
1574        self.reset_file_names(&directory)
1575    }
1576
1577    /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1578    ///
1579    /// This internal method scans all Parquet files in a directory, reads their metadata to
1580    /// determine the actual timestamp range of their content, and renames the files accordingly.
1581    /// This ensures that filenames accurately reflect the data they contain.
1582    ///
1583    /// # Parameters
1584    ///
1585    /// - `directory`: The directory path containing Parquet files to rename.
1586    ///
1587    /// # Returns
1588    ///
1589    /// Returns `Ok(())` on success, or an error if the operation fails.
1590    ///
1591    /// # Process
1592    ///
    /// 1. Lists all Parquet files in the directory.
    /// 2. For each file, reads metadata to extract min/max timestamps.
    /// 3. Generates a new filename based on the actual timestamp range.
    /// 4. Moves the file to the new name using object store operations.
    /// 5. Validates that intervals remain disjoint after renaming.
1598    ///
1599    /// # Errors
1600    ///
1601    /// Returns an error if:
1602    /// - Directory listing fails.
1603    /// - Metadata reading fails for any file.
1604    /// - File move operations fail.
1605    /// - Interval validation fails after renaming.
1606    /// - Object store operations fail.
1607    ///
1608    /// # Notes
1609    ///
1610    /// - This operation can be time-consuming for directories with many files.
1611    /// - Files are processed sequentially to avoid conflicts.
1612    /// - The operation is atomic per file but not across the entire directory.
1613    fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
1614        let parquet_files = self.list_parquet_files(directory)?;
1615
1616        for file in parquet_files {
1617            let object_path = ObjectPath::from(file.as_str());
1618            let (first_ts, last_ts) = self.execute_async(async {
1619                min_max_from_parquet_metadata_object_store(
1620                    self.object_store.clone(),
1621                    &object_path,
1622                    "ts_init",
1623                )
1624                .await
1625            })?;
1626
1627            let new_filename =
1628                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
1629            let new_file_path = make_object_store_path(directory, &[&new_filename]);
1630            let new_object_path = ObjectPath::from(new_file_path);
1631
1632            self.move_file(&object_path, &new_object_path)?;
1633        }
1634
1635        let intervals = self.get_directory_intervals(directory)?;
1636
1637        if !are_intervals_disjoint(&intervals) {
1638            anyhow::bail!("Intervals are not disjoint after resetting file names");
1639        }
1640
1641        Ok(())
1642    }
1643
1644    /// Finds all leaf data directories in the catalog.
1645    ///
1646    /// A leaf directory is one that contains data files but no subdirectories.
1647    /// This method is used to identify directories that can be processed for
1648    /// consolidation or other operations.
1649    ///
1650    /// # Returns
1651    ///
1652    /// Returns a vector of directory path strings representing leaf directories,
1653    /// or an error if directory traversal fails.
1654    ///
1655    /// # Errors
1656    ///
1657    /// Returns an error if:
1658    /// - Object store listing operations fail.
1659    /// - Directory structure cannot be analyzed.
1660    ///
1661    /// # Examples
1662    ///
1663    /// ```rust,no_run
1664    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1665    ///
1666    /// let catalog = ParquetDataCatalog::new(/* ... */);
1667    ///
1668    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
1669    /// for dir in leaf_dirs {
1670    ///     println!("Found leaf directory: {}", dir);
1671    /// }
1672    /// # Ok::<(), anyhow::Error>(())
1673    /// ```
1674    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
1675        let data_dir = make_object_store_path(&self.base_path, &["data"]);
1676
        let leaf_dirs = self.execute_async(async {
            let mut directories = AHashSet::new();
            let mut files_in_dirs = AHashMap::new();
1681
1682            // List all objects under the data directory
1683            let prefix = ObjectPath::from(format!("{data_dir}/"));
1684            let mut stream = self.object_store.list(Some(&prefix));
1685
1686            while let Some(object) = stream.next().await {
1687                let object = object?;
                let path_str = object.location.to_string();
1690
1691                // Extract directory path
1692                if let Some(parent) = std::path::Path::new(&path_str).parent() {
1693                    let parent_str = parent.to_string_lossy().to_string();
1694                    directories.insert(parent_str.clone());
1695
1696                    // Track files in each directory
1697                    files_in_dirs
1698                        .entry(parent_str)
1699                        .or_insert_with(Vec::new)
1700                        .push(path_str);
1701                }
1702            }
1703
1704            // Find leaf directories (directories with files but no subdirectories)
1705            let mut leaf_dirs = Vec::new();
1706
1707            for dir in &directories {
1708                let has_files = files_in_dirs
1709                    .get(dir)
1710                    .is_some_and(|files| !files.is_empty());
1711                let has_subdirs = directories
1712                    .iter()
1713                    .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);
1714
1715                if has_files && !has_subdirs {
1716                    leaf_dirs.push(dir.clone());
1717                }
1718            }
1719
1720            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
1721        })?;
1722
1723        Ok(leaf_dirs)
1724    }
1725
1726    /// Deletes data within a specified time range for a specific data type and identifier.
1727    ///
1728    /// This method identifies all parquet files that intersect with the specified time range
1729    /// and handles them appropriately:
    /// - Files completely within the range are deleted.
    /// - Files partially overlapping the range are split to preserve data outside the range.
    /// - The original intersecting files are removed after processing.
1733    ///
1734    /// # Parameters
1735    ///
1736    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
1737    /// - `identifier`: Optional identifier to delete data for. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If None, deletes data across all identifiers.
1738    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1739    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1740    ///
1741    /// # Returns
1742    ///
1743    /// Returns `Ok(())` on success, or an error if deletion fails.
1744    ///
1745    /// # Errors
1746    ///
1747    /// Returns an error if:
1748    /// - The directory path cannot be constructed.
1749    /// - File operations fail.
1750    /// - Data querying or writing fails.
1751    ///
1752    /// # Notes
1753    ///
1754    /// - This operation permanently removes data and cannot be undone.
1755    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
1756    /// - The method ensures data integrity by using atomic operations where possible.
1757    /// - Empty directories are not automatically removed after deletion.
1758    ///
1759    /// # Examples
1760    ///
1761    /// ```rust,no_run
1762    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1763    /// use nautilus_core::UnixNanos;
1764    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1766    ///
1767    /// // Delete all quote data for a specific instrument
1768    /// catalog.delete_data_range(
1769    ///     "quotes",
    ///     Some("BTCUSD"),
1771    ///     None,
1772    ///     None
1773    /// )?;
1774    ///
1775    /// // Delete trade data within a specific time range
1776    /// catalog.delete_data_range(
1777    ///     "trades",
1778    ///     None,
1779    ///     Some(UnixNanos::from(1609459200000000000)),
1780    ///     Some(UnixNanos::from(1609545600000000000))
1781    /// )?;
1782    /// # Ok::<(), anyhow::Error>(())
1783    /// ```
1784    pub fn delete_data_range(
1785        &mut self,
1786        type_name: &str,
1787        identifier: Option<&str>,
1788        start: Option<UnixNanos>,
1789        end: Option<UnixNanos>,
1790    ) -> anyhow::Result<()> {
        // Dispatch to the generic implementation for the concrete data type
1792        match type_name {
1793            "quotes" => self.delete_data_range_generic::<QuoteTick>(identifier, start, end),
1794            "trades" => self.delete_data_range_generic::<TradeTick>(identifier, start, end),
1795            "bars" => self.delete_data_range_generic::<Bar>(identifier, start, end),
1796            "order_book_deltas" => {
1797                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
1798            }
1799            "order_book_depth10" => {
1800                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
1801            }
1802            _ => {
                // Custom data types are namespaced under the "custom/" prefix
                if let Some(custom_type_name) = type_name.strip_prefix("custom/") {
                    self.delete_custom_data_range(custom_type_name, identifier, start, end)
                } else {
                    anyhow::bail!("Unsupported data type: {type_name}");
                }
1811            }
1812        }
1813    }
1814
1815    /// Deletes data within a specified time range across the entire catalog.
1816    ///
1817    /// This method identifies all leaf directories in the catalog that contain parquet files
1818    /// and deletes data within the specified time range from each directory. A leaf directory
1819    /// is one that contains files but no subdirectories. This is a convenience method that
1820    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
1821    ///
1822    /// # Parameters
1823    ///
1824    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
1825    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
1826    ///
1827    /// # Returns
1828    ///
1829    /// Returns `Ok(())` on success, or an error if deletion fails.
1830    ///
1831    /// # Errors
1832    ///
1833    /// Returns an error if:
1834    /// - Directory traversal fails.
1835    /// - Data class extraction from paths fails.
1836    /// - Individual delete operations fail.
1837    ///
1838    /// # Notes
1839    ///
1840    /// - This operation permanently removes data and cannot be undone.
1841    /// - The deletion process handles file intersections intelligently by splitting files
1842    ///   when they partially overlap with the deletion range.
1843    /// - Files completely within the deletion range are removed entirely.
1844    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
1845    /// - This method is useful for bulk data cleanup operations across the entire catalog.
1846    /// - Empty directories are not automatically removed after deletion.
1847    ///
1848    /// # Examples
1849    ///
1850    /// ```rust,no_run
1851    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
1852    /// use nautilus_core::UnixNanos;
1853    ///
1854    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
1855    ///
1856    /// // Delete all data before a specific date across entire catalog
1857    /// catalog.delete_catalog_range(
1858    ///     None,
1859    ///     Some(UnixNanos::from(1609459200000000000))
1860    /// )?;
1861    ///
1862    /// // Delete all data within a specific range across entire catalog
1863    /// catalog.delete_catalog_range(
1864    ///     Some(UnixNanos::from(1609459200000000000)),
1865    ///     Some(UnixNanos::from(1609545600000000000))
1866    /// )?;
1867    ///
1868    /// // Delete all data after a specific date across entire catalog
1869    /// catalog.delete_catalog_range(
1870    ///     Some(UnixNanos::from(1609459200000000000)),
1871    ///     None
1872    /// )?;
1873    /// # Ok::<(), anyhow::Error>(())
1874    /// ```
1875    pub fn delete_catalog_range(
1876        &mut self,
1877        start: Option<UnixNanos>,
1878        end: Option<UnixNanos>,
1879    ) -> anyhow::Result<()> {
1880        let leaf_directories = self.find_leaf_data_directories()?;
1881
1882        for directory in leaf_directories {
1883            if let Ok((Some(data_type), identifier)) =
1884                self.extract_data_cls_and_identifier_from_path(&directory)
1885            {
1886                // Call the existing delete_data_range method
1887                if let Err(e) =
1888                    self.delete_data_range(&data_type, identifier.as_deref(), start, end)
1889                {
1890                    log::warn!("Failed to delete data in directory {directory}: {e}");
1891                    // Continue with other directories instead of failing completely
1892                }
1893            }
1894        }
1895
1896        Ok(())
1897    }
1898
1899    /// Generic implementation for deleting data within a specified time range.
1900    ///
1901    /// This method provides the core deletion logic that works with any data type
1902    /// that implements the required traits. It handles file intersection analysis,
1903    /// data splitting for partial overlaps, and file cleanup.
1904    ///
1905    /// # Type Parameters
1906    ///
1907    /// - `T`: The data type that implements required traits for catalog operations.
1908    ///
1909    /// # Parameters
1910    ///
1911    /// - `identifier`: Optional instrument ID to delete data for.
1912    /// - `start`: Optional start timestamp for the deletion range.
1913    /// - `end`: Optional end timestamp for the deletion range.
1914    ///
1915    /// # Returns
1916    ///
1917    /// Returns `Ok(())` on success, or an error if deletion fails.
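    ///
    /// # Examples
    ///
    /// A minimal sketch (the instrument ID is illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data for one instrument
    /// catalog.delete_data_range_generic::<QuoteTick>(Some("EUR/USD.SIM"), None, None)?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```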
1918    pub fn delete_data_range_generic<T>(
1919        &mut self,
1920        identifier: Option<&str>,
1921        start: Option<UnixNanos>,
1922        end: Option<UnixNanos>,
1923    ) -> anyhow::Result<()>
1924    where
1925        T: DecodeDataFromRecordBatch
1926            + CatalogPathPrefix
1927            + EncodeToRecordBatch
1928            + HasTsInit
1929            + TryFrom<Data>
1930            + Clone,
1931    {
        // Plan operations from the existing file intervals
1933        let intervals = self.get_intervals(T::path_prefix(), identifier)?;
1934
1935        if intervals.is_empty() {
1936            return Ok(()); // No files to process
1937        }
1938
1939        // Prepare all operations for execution
1940        let operations_to_execute =
1941            self.prepare_delete_operations(T::path_prefix(), identifier, &intervals, start, end)?;
1942
1943        if operations_to_execute.is_empty() {
1944            return Ok(()); // No operations to execute
1945        }
1946
1947        // Execute all operations
1948        let mut files_to_remove = AHashSet::<String>::new();
1949
1950        for operation in operations_to_execute {
1951            // Reset the session before each operation to ensure fresh data is loaded
1952            // This clears any cached table registrations that might interfere with file operations
1953            self.reset_session();
1954
            match operation.operation_type.as_str() {
                "split_before" | "split_after" => {
                    // Query the data to preserve on the kept side of the deletion
                    // range and rewrite it to a new file
                    let instrument_ids = identifier.map(|id| vec![id.to_string()]);
                    let preserved_data = self.query_typed_data::<T>(
                        instrument_ids,
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                        false, // optimize_file_loading=false for precise file control
                    )?;

                    if !preserved_data.is_empty() {
                        let start_ts = UnixNanos::from(operation.file_start_ns);
                        let end_ts = UnixNanos::from(operation.file_end_ns);
                        self.write_to_parquet(
                            preserved_data,
                            Some(start_ts),
                            Some(end_ts),
                            Some(true),
                        )?;
                    }
                }
                _ => {
                    // For "remove" operations, no data needs to be preserved
                }
            }
2008
2009            // Mark files for removal (applies to all operation types)
2010            for file in operation.files {
2011                files_to_remove.insert(file);
2012            }
2013        }
2014
2015        // Remove all files that were processed
2016        for file in files_to_remove {
2017            if let Err(e) = self.delete_file(&file) {
2018                log::warn!("Failed to delete file {file}: {e}");
2019            }
2020        }
2021
2022        Ok(())
2023    }
2024
2025    /// Prepares all operations for data deletion by identifying files that need to be
2026    /// split or removed.
2027    ///
2028    /// This auxiliary function handles all the preparation logic for deletion:
2029    /// 1. Filters intervals by time range
2030    /// 2. Identifies files that intersect with the deletion range
2031    /// 3. Creates split operations for files that partially overlap
2032    /// 4. Generates removal operations for files completely within the range
2033    ///
2034    /// # Parameters
2035    ///
2036    /// - `type_name`: The data type directory name for path generation.
2037    /// - `identifier`: Optional instrument identifier for path generation.
2038    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
2039    /// - `start`: Optional start timestamp for deletion range.
2040    /// - `end`: Optional end timestamp for deletion range.
2041    ///
2042    /// # Returns
2043    ///
2044    /// Returns a vector of `DeleteOperation` structs ready for execution.
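    ///
    /// # Examples
    ///
    /// A minimal sketch of planning a mid-file deletion (timestamps are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Deleting the middle of one file interval yields a "split_before" and a
    /// // "split_after" operation
    /// let ops = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     None,
    ///     &[(1_000, 2_000)],
    ///     Some(UnixNanos::from(1_200)),
    ///     Some(UnixNanos::from(1_800)),
    /// )?;
    /// assert_eq!(ops.len(), 2);
    /// # Ok::<(), anyhow::Error>(())
    /// ```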
2045    pub fn prepare_delete_operations(
2046        &self,
2047        type_name: &str,
2048        identifier: Option<&str>,
2049        intervals: &[(u64, u64)],
2050        start: Option<UnixNanos>,
2051        end: Option<UnixNanos>,
2052    ) -> anyhow::Result<Vec<DeleteOperation>> {
2053        // Convert start/end to nanoseconds
2054        let delete_start_ns = start.map(|s| s.as_u64());
2055        let delete_end_ns = end.map(|e| e.as_u64());
2056
2057        let mut operations = Vec::new();
2058
2059        // Get directory for file path construction
2060        let directory = self.make_path(type_name, identifier)?;
2061
2062        // Process each interval (which represents an actual file)
2063        for &(file_start_ns, file_end_ns) in intervals {
2064            // Check if file intersects with deletion range
2065            let intersects = (delete_start_ns.is_none() || delete_start_ns.unwrap() <= file_end_ns)
2066                && (delete_end_ns.is_none() || file_start_ns <= delete_end_ns.unwrap());
2067
2068            if !intersects {
2069                continue; // File doesn't intersect with deletion range
2070            }
2071
2072            // Construct file path from interval timestamps
2073            let filename = timestamps_to_filename(
2074                UnixNanos::from(file_start_ns),
2075                UnixNanos::from(file_end_ns),
2076            );
2077            let file_path = make_object_store_path(&directory, &[&filename]);
2078
2079            // Determine what type of operation is needed
2080            let file_completely_within_range = (delete_start_ns.is_none()
2081                || delete_start_ns.unwrap() <= file_start_ns)
2082                && (delete_end_ns.is_none() || file_end_ns <= delete_end_ns.unwrap());
2083
2084            if file_completely_within_range {
                // File is completely within deletion range - just mark for removal;
                // query and file ranges are unused for plain removals
                operations.push(DeleteOperation {
                    operation_type: "remove".to_string(),
                    files: vec![file_path],
                    query_start: 0,
                    query_end: 0,
                    file_start_ns: 0,
                    file_end_ns: 0,
                });
2094            } else {
2095                // File partially overlaps - need to split
2096                if let Some(delete_start) = delete_start_ns
2097                    && file_start_ns < delete_start
2098                {
2099                    // Keep data before deletion range
2100                    operations.push(DeleteOperation {
2101                        operation_type: "split_before".to_string(),
2102                        files: vec![file_path.clone()],
2103                        query_start: file_start_ns,
                        query_end: delete_start.saturating_sub(1), // Keep strictly before delete_start
2105                        file_start_ns,
2106                        file_end_ns: delete_start.saturating_sub(1),
2107                    });
2108                }
2109
2110                if let Some(delete_end) = delete_end_ns
2111                    && delete_end < file_end_ns
2112                {
2113                    // Keep data after deletion range
2114                    operations.push(DeleteOperation {
2115                        operation_type: "split_after".to_string(),
2116                        files: vec![file_path.clone()],
                        query_start: delete_end.saturating_add(1), // Keep strictly after delete_end
2118                        query_end: file_end_ns,
2119                        file_start_ns: delete_end.saturating_add(1),
2120                        file_end_ns,
2121                    });
2122                }
2123            }
2124        }
2125
2126        Ok(operations)
2127    }
2128}