nautilus_persistence/backend/catalog_operations.rs
// -------------------------------------------------------------------------------------------------
// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
// https://nautechsystems.io
//
// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Catalog operations for data consolidation and reset functionality.
//!
//! This module contains the consolidation and reset operations for the `ParquetDataCatalog`.
//! These operations are separated into their own module for better organization and maintainability.

use ahash::{AHashMap, AHashSet};
use futures::StreamExt;
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, CustomData, Data, HasTsInit, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
    OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose,
};
use nautilus_serialization::arrow::{DecodeDataFromRecordBatch, EncodeToRecordBatch};
use object_store::{ObjectStore, path::Path as ObjectPath};

use crate::{
    backend::catalog::{
        CatalogPathPrefix, ParquetDataCatalog, are_intervals_contiguous, are_intervals_disjoint,
        extract_path_components, make_object_store_path, parse_filename_timestamps,
        timestamps_to_filename,
    },
    parquet::{
        combine_parquet_files_from_object_store, min_max_from_parquet_metadata_object_store,
    },
};

/// Information about a consolidation query to be executed.
///
/// This struct encapsulates all the information needed to execute a single consolidation
/// operation, including the data range to query and file naming strategy.
///
/// # Fields
///
/// - `query_start`: Start timestamp for the data query range (inclusive, in nanoseconds).
/// - `query_end`: End timestamp for the data query range (inclusive, in nanoseconds).
/// - `use_period_boundaries`: If true, uses period boundaries for file naming; if false, uses actual data timestamps.
///
/// # Usage
///
/// This struct is used internally by the consolidation system to plan and execute
/// data consolidation operations. It allows the system to:
/// - Separate query planning from execution.
/// - Handle complex scenarios like data splitting.
/// - Optimize file naming strategies.
/// - Batch multiple operations efficiently.
/// - Maintain file contiguity across periods.
///
/// # Examples
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::ConsolidationQuery;
///
/// // Regular consolidation query
/// let query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609545600000000000,
///     use_period_boundaries: true,
/// };
///
/// // Split operation to preserve data
/// let split_query = ConsolidationQuery {
///     query_start: 1609459200000000000,
///     query_end: 1609462800000000000,
///     use_period_boundaries: false,
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ConsolidationQuery {
    /// Start timestamp for the query range (inclusive, in nanoseconds).
    pub query_start: u64,
    /// End timestamp for the query range (inclusive, in nanoseconds).
    pub query_end: u64,
    /// Whether to use period boundaries for file naming (true) or actual data timestamps (false).
    pub use_period_boundaries: bool,
}

/// Information about a deletion operation to be executed.
///
/// This struct encapsulates all the information needed to execute a single deletion
/// operation, including the type of operation and file handling details.
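///
/// # Examples
///
/// A minimal sketch with illustrative values; the file path is hypothetical:
///
/// ```rust,no_run
/// use nautilus_persistence::backend::catalog_operations::DeleteOperation;
///
/// // A split operation preserving data before a deletion range
/// let split = DeleteOperation {
///     operation_type: "split_before".to_string(),
///     files: vec!["data/quotes/EURUSD.SIM/part.parquet".to_string()],
///     query_start: 1609459200000000000,
///     query_end: 1609462799999999999,
///     file_start_ns: 1609459200000000000,
///     file_end_ns: 1609462799999999999,
/// };
/// ```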
#[derive(Debug, Clone)]
pub struct DeleteOperation {
    /// Type of deletion operation (`remove`, `split_before`, `split_after`).
    pub operation_type: String,
    /// List of files involved in this operation.
    pub files: Vec<String>,
    /// Start timestamp for data query (used for split operations).
    pub query_start: u64,
    /// End timestamp for data query (used for split operations).
    pub query_end: u64,
    /// Start timestamp for new file naming (used for split operations).
    pub file_start_ns: u64,
    /// End timestamp for new file naming (used for split operations).
    pub file_end_ns: u64,
}

impl ParquetDataCatalog {
    /// Consolidates all data files in the catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog
    /// catalog.consolidate_catalog(None, None, None, None)?;
    ///
    /// // Consolidate only files within a specific time range
    /// catalog.consolidate_catalog(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog(
        &self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.consolidate_directory(
                &directory,
                start,
                end,
                ensure_contiguous_files,
                deduplicate,
            )?;
        }

        Ok(())
    }

    /// Consolidates data files for a specific data type and identifier.
    ///
    /// This method consolidates Parquet files within a specific directory (defined by data type
    /// and optional identifier) by merging multiple files into a single file. This improves
    /// query performance and can reduce storage overhead.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an `instrument_id` (e.g., "EUR/USD.SIM") or a `bar_type` (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous (default: true).
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File consolidation operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files for a specific instrument
    /// catalog.consolidate_data(
    ///     "quotes",
    ///     Some("BTCUSD"),
    ///     None,
    ///     None,
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Consolidate trade files within a time range
    /// catalog.consolidate_data(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(true),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data(
        &self,
        type_name: &str,
        identifier: Option<&str>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> anyhow::Result<()> {
        let directory = self.make_path(type_name, identifier)?;
        self.consolidate_directory(&directory, start, end, ensure_contiguous_files, deduplicate)
    }

    /// Consolidates Parquet files within a specific directory by merging them into a single file.
    ///
    /// This internal method performs the actual consolidation work for a single directory.
    /// It identifies files within the specified time range, validates their intervals,
    /// and combines them into a single Parquet file with optimized storage.
    ///
    /// # Parameters
    ///
    /// - `directory`: The directory path containing Parquet files to consolidate.
    /// - `start`: Optional start timestamp to limit consolidation to files within this range.
    /// - `end`: Optional end timestamp to limit consolidation to files within this range.
    /// - `ensure_contiguous_files`: Whether to validate that consolidated intervals are contiguous.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Behavior
    ///
    /// - Skips consolidation if directory contains 1 or fewer files.
    /// - Filters files by timestamp range if start/end are specified.
    /// - Sorts intervals by start timestamp before consolidation.
    /// - Creates a new file spanning the entire time range of input files.
    /// - Validates interval disjointness after consolidation (if enabled).
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File combination operations fail.
    /// - Interval validation fails (when `ensure_contiguous_files` is true).
    /// - Object store operations fail.
    fn consolidate_directory(
        &self,
        directory: &str,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
        deduplicate: Option<bool>,
    ) -> anyhow::Result<()> {
        let parquet_files = self.list_parquet_files(directory)?;

        if parquet_files.len() <= 1 {
            return Ok(());
        }

        let mut files_to_consolidate = Vec::new();
        let mut intervals = Vec::new();
        let start = start.map(|t| t.as_u64());
        let end = end.map(|t| t.as_u64());

        for file in parquet_files {
            if let Some(interval) = parse_filename_timestamps(&file) {
                let (interval_start, interval_end) = interval;
                let include_file = match (start, end) {
                    (Some(s), Some(e)) => interval_start >= s && interval_end <= e,
                    (Some(s), None) => interval_start >= s,
                    (None, Some(e)) => interval_end <= e,
                    (None, None) => true,
                };

                if include_file {
                    files_to_consolidate.push(file);
                    intervals.push(interval);
                }
            }
        }

        intervals.sort_by_key(|&(start, _)| start);

        if !intervals.is_empty() {
            let file_name = timestamps_to_filename(
                UnixNanos::from(intervals[0].0),
                UnixNanos::from(intervals.last().unwrap().1),
            );
            let path = make_object_store_path(directory, &[&file_name]);

            // Convert string paths to ObjectPath for the function call
            let object_paths: Vec<ObjectPath> = files_to_consolidate
                .iter()
                .map(|path| ObjectPath::from(path.as_str()))
                .collect();

            self.execute_async(async {
                combine_parquet_files_from_object_store(
                    self.object_store.clone(),
                    object_paths,
                    &ObjectPath::from(path),
                    Some(self.compression),
                    Some(self.max_row_group_size),
                    deduplicate,
                )
                .await
            })?;
        }

        if ensure_contiguous_files.unwrap_or(true) && !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after consolidating a directory");
        }

        Ok(())
    }

    /// Consolidates all data files in the catalog by splitting them into fixed time periods.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and consolidates them by period. A leaf directory is one that contains files but no subdirectories.
    /// This is a convenience method that effectively calls `consolidate_data_by_period` for all data types
    /// and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for the consolidation range. Only files with timestamps
    ///   greater than or equal to this value will be consolidated. If None, all files
    ///   from the beginning of time will be considered.
    /// - `end`: Optional end timestamp for the consolidation range. Only files with timestamps
    ///   less than or equal to this value will be consolidated. If None, all files
    ///   up to the end of time will be considered.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails for any directory.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - Data type extraction from path fails.
    /// - Period-based consolidation operations fail.
    ///
    /// # Notes
    ///
    /// - This operation can be resource-intensive for large catalogs with many data types
    ///   and instruments.
    /// - The consolidation process splits data into fixed time periods rather than combining
    ///   all files into a single file per directory.
    /// - Uses the same period-based consolidation logic as `consolidate_data_by_period`.
    /// - Original files are removed and replaced with period-based consolidated files.
    /// - This method is useful for periodic maintenance of the catalog to standardize
    ///   file organization by time periods.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all files in the catalog by 1-day periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate only files within a specific time range by 1-hour periods
    /// catalog.consolidate_catalog_by_period(
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_catalog_by_period(
        &mut self,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            let (data_cls, identifier) =
                self.extract_data_cls_and_identifier_from_path(&directory)?;

            if let Some(data_cls_name) = data_cls {
                let identifier_ref = identifier.as_deref();
                // Use match statement to call the generic consolidate_data_by_period for various types
                match data_cls_name.as_str() {
                    "quotes" => {
                        self.consolidate_data_by_period_generic::<QuoteTick>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "trades" => {
                        self.consolidate_data_by_period_generic::<TradeTick>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_deltas" => {
                        self.consolidate_data_by_period_generic::<OrderBookDelta>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "order_book_depths" => {
                        self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "bars" => {
                        self.consolidate_data_by_period_generic::<Bar>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "index_prices" => {
                        self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "mark_prices" => {
                        self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    "instrument_closes" => {
                        self.consolidate_data_by_period_generic::<InstrumentClose>(
                            identifier_ref,
                            period_nanos,
                            start,
                            end,
                            ensure_contiguous_files,
                        )?;
                    }
                    _ => {
                        // Check if it's a custom data type (starts with "custom/")
                        if data_cls_name.starts_with("custom/") {
                            // Extract the custom type name (everything after "custom/")
                            let custom_type_name = data_cls_name.strip_prefix("custom/").unwrap();
                            self.consolidate_custom_data_by_period(
                                custom_type_name,
                                identifier_ref,
                                period_nanos,
                                start,
                                end,
                                ensure_contiguous_files,
                            )?;
                        } else {
                            // Skip unknown data types
                            log::warn!("Unknown data type for consolidation: {data_cls_name}");
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Extracts data class and identifier from a directory path.
    ///
    /// This method parses a directory path to extract the data type and optional
    /// instrument identifier. It's used to determine what type of data consolidation
    /// to perform for each directory.
    ///
    /// # Parameters
    ///
    /// - `path`: The directory path to parse.
    ///
    /// # Returns
    ///
    /// Returns a tuple of (`data_class`, identifier) where both are optional strings.
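    ///
    /// # Examples
    ///
    /// A minimal sketch with a hypothetical catalog path (catalog construction
    /// elided, as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Components after "data" are interpreted as the data type and identifier
    /// let (data_cls, identifier) =
    ///     catalog.extract_data_cls_and_identifier_from_path("data/quotes/EURUSD.SIM")?;
    /// assert_eq!(data_cls.as_deref(), Some("quotes"));
    /// assert_eq!(identifier.as_deref(), Some("EURUSD.SIM"));
    /// # Ok::<(), anyhow::Error>(())
    /// ```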
    pub fn extract_data_cls_and_identifier_from_path(
        &self,
        path: &str,
    ) -> anyhow::Result<(Option<String>, Option<String>)> {
        // Use cross-platform path parsing
        let path_components = extract_path_components(path);

        // Find the "data" directory in the path
        if let Some(data_index) = path_components.iter().position(|part| part == "data")
            && data_index + 1 < path_components.len()
        {
            let second = &path_components[data_index + 1];

            // Custom data: data/custom/TypeName[/identifier segments...]
            if *second == "custom" && data_index + 2 < path_components.len() {
                let type_name = path_components[data_index + 2].clone();
                let data_cls = format!("custom/{type_name}");
                let identifier = if data_index + 3 < path_components.len() {
                    Some(path_components[data_index + 3..].join("/"))
                } else {
                    None
                };
                return Ok((Some(data_cls), identifier));
            }

            let data_cls = second.clone();
            let identifier = if data_index + 2 < path_components.len() {
                Some(path_components[data_index + 2].clone())
            } else {
                None
            };

            return Ok((Some(data_cls), identifier));
        }

        // If we can't parse the path, return None for both
        Ok((None, None))
    }

    /// Consolidates data files by splitting them into fixed time periods.
    ///
    /// This method queries data by period and writes consolidated files immediately,
    /// using efficient period-based consolidation logic. When start/end boundaries intersect existing files,
    /// the function automatically splits those files to preserve all data.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional instrument ID to consolidate. If None, consolidates all instruments.
    /// - `period_nanos`: The period duration for consolidation in nanoseconds. Default is 1 day (86400000000000).
    ///   Examples: 3600000000000 (1 hour), 604800000000000 (7 days), 1800000000000 (30 minutes)
    /// - `start`: Optional start timestamp for consolidation range. If None, uses earliest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `end`: Optional end timestamp for consolidation range. If None, uses latest available data.
    ///   If specified and intersects existing files, those files will be split to preserve
    ///   data outside the consolidation range.
    /// - `ensure_contiguous_files`: If true, uses period boundaries for file naming.
    ///   If false, uses actual data timestamps for file naming.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - Uses two-phase approach: first determines all queries, then executes them.
    /// - Groups intervals into contiguous groups to preserve holes between groups.
    /// - Allows consolidation across multiple files within each contiguous group.
    /// - Skips queries if target files already exist for efficiency.
    /// - Original files are removed immediately after querying each period.
    /// - When `ensure_contiguous_files=false`, file timestamps match actual data range.
    /// - When `ensure_contiguous_files=true`, file timestamps use period boundaries.
    /// - Uses modulo arithmetic for efficient period boundary calculation.
    /// - Preserves holes in data by preventing queries from spanning across gaps.
    /// - Automatically splits files at start/end boundaries to preserve all data.
    /// - Split operations are executed before consolidation to ensure data preservation.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files by 1-day periods
    /// catalog.consolidate_data_by_period(
    ///     "quotes",
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true)
    /// )?;
    ///
    /// // Consolidate specific instrument by 1-hour periods
    /// catalog.consolidate_data_by_period(
    ///     "trades",
    ///     Some("BTCUSD"),
    ///     Some(3600000000000), // 1 hour in nanoseconds
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000)),
    ///     Some(false)
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn consolidate_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        // Use match statement to call the generic consolidate_data_by_period for various types
        match type_name {
            "quotes" => {
                self.consolidate_data_by_period_generic::<QuoteTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "trades" => {
                self.consolidate_data_by_period_generic::<TradeTick>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_deltas" => {
                self.consolidate_data_by_period_generic::<OrderBookDelta>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "order_book_depths" => {
                self.consolidate_data_by_period_generic::<OrderBookDepth10>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "bars" => {
                self.consolidate_data_by_period_generic::<Bar>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "index_prices" => {
                self.consolidate_data_by_period_generic::<IndexPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "mark_prices" => {
                self.consolidate_data_by_period_generic::<MarkPriceUpdate>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            "instrument_closes" => {
                self.consolidate_data_by_period_generic::<InstrumentClose>(
                    identifier,
                    period_nanos,
                    start,
                    end,
                    ensure_contiguous_files,
                )?;
            }
            _ => {
                // Check if it's a custom data type (starts with "custom/")
                if type_name.starts_with("custom/") {
                    // Extract the custom type name (everything after "custom/")
                    let custom_type_name = type_name.strip_prefix("custom/").unwrap();
                    self.consolidate_custom_data_by_period(
                        custom_type_name,
                        identifier,
                        period_nanos,
                        start,
                        end,
                        ensure_contiguous_files,
                    )?;
                } else {
                    anyhow::bail!("Unknown data type for consolidation: {type_name}");
                }
            }
        }

        Ok(())
    }

    /// Generic consolidation of data files by splitting them into fixed time periods.
    ///
    /// This is a type-safe version of `consolidate_data_by_period` that uses generic types
    /// to ensure compile-time correctness and enable reuse across different data types.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type to consolidate, must implement required traits for serialization.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to target a specific instrument's data.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
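    ///
    /// # Examples
    ///
    /// A minimal sketch (catalog construction elided, as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Consolidate all quote files into 1-day periods, naming files by
    /// // period boundaries
    /// catalog.consolidate_data_by_period_generic::<QuoteTick>(
    ///     None,
    ///     Some(86400000000000), // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     Some(true),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```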
    pub fn consolidate_data_by_period_generic<T>(
        &mut self,
        identifier: Option<&str>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Use get_intervals for cleaner implementation
        let intervals = self.get_intervals(T::path_prefix(), identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            T::path_prefix(),
            identifier,
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(T::path_prefix(), identifier)?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Capture the overall window's left bound before the loop consumes queries_to_execute;
        // a source file is only deleted when its interval is fully consumed by the consolidation.
        let overall_query_start = queries_to_execute[0].query_start;

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query data for this period using query_typed_data
            let instrument_ids = identifier.map(|id| vec![id.to_string()]);

            // Use optimize_file_loading=false to match Python behavior:
            // During consolidation, we want to read only the specific files being consolidated,
            // not the entire directory. This ensures precise file control during consolidation.
            let period_data = self.query_typed_data::<T>(
                instrument_ids,
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
                false, // optimize_file_loading=false for precise file control during consolidation
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                let start = file_start_ns.unwrap();
                (start, query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Write consolidated data for this period using write_to_parquet
            // Use skip_disjoint_check since we're managing file removal carefully
            let start_ts = UnixNanos::from(final_start_ns);
            let end_ts = UnixNanos::from(final_end_ns);
            self.write_to_parquet(period_data, Some(start_ts), Some(end_ts), Some(true))?;

            // Delete files fully consumed by this period; keep straddlers so no data is lost
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                    && interval.0 >= overall_query_start
                {
                    existing_files.retain(|f| f != &file);
                    self.delete_file(&file)?;
                }
            }

            // Reset so next period starts a new contiguous segment
            file_start_ns = None;
        }

        Ok(())
    }

    /// Consolidates custom data files by splitting them into fixed time periods.
    ///
    /// This method provides consolidation for custom data types that don't have compile-time
    /// type information. It uses dynamic querying and writing methods.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The custom data type name (without "custom/" prefix).
    /// - `identifier`: Optional instrument ID to consolidate.
    /// - `period_nanos`: Optional period size in nanoseconds (default: 1 day).
    /// - `start`: Optional start timestamp for consolidation range.
    /// - `end`: Optional end timestamp for consolidation range.
    /// - `ensure_contiguous_files`: Optional flag to control file naming strategy.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if consolidation fails.
    fn consolidate_custom_data_by_period(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        period_nanos: Option<u64>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: Option<bool>,
    ) -> anyhow::Result<()> {
        let period_nanos = period_nanos.unwrap_or(86400000000000); // Default: 1 day
        let ensure_contiguous_files = ensure_contiguous_files.unwrap_or(true);

        // Get intervals for the custom data type
        let path_prefix = format!("custom/{type_name}");
        let intervals = self.get_intervals(&path_prefix, identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to consolidate
        }

        // Use auxiliary function to prepare all queries for execution
        let queries_to_execute = self.prepare_consolidation_queries(
            &path_prefix,
            identifier,
            &intervals,
            period_nanos,
            start,
            end,
            ensure_contiguous_files,
        )?;

        if queries_to_execute.is_empty() {
            return Ok(()); // No queries to execute
        }

        // Get directory for file operations
        let directory = self.make_path(&path_prefix, identifier)?;
        let mut existing_files = self.list_parquet_files(&directory)?;
        existing_files.sort();

        // Capture the overall window's left bound before the loop consumes queries_to_execute;
        // a source file is only deleted when its interval is fully consumed by the consolidation.
        let overall_query_start = queries_to_execute[0].query_start;

        // Phase 2: Execute queries, write, and delete
        let mut file_start_ns: Option<u64> = None; // Track contiguity across periods

        for query_info in queries_to_execute {
            // Query custom data for this period using query_custom_data_dynamic
            let instrument_ids = identifier.map(|id| vec![id.to_string()]);

            let period_data = self.query_custom_data_dynamic(
                type_name,
                instrument_ids.as_deref(),
                Some(UnixNanos::from(query_info.query_start)),
                Some(UnixNanos::from(query_info.query_end)),
                None,
                Some(existing_files.clone()),
                false, // optimize_file_loading=false for precise file control during consolidation
            )?;

            if period_data.is_empty() {
                // Skip if no data found, but maintain contiguity by using query start
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                continue;
            }

            // Determine final file timestamps
            let (final_start_ns, final_end_ns) = if query_info.use_period_boundaries {
                // Use period boundaries for file naming, maintaining contiguity
                if file_start_ns.is_none() {
                    file_start_ns = Some(query_info.query_start);
                }
                let start = file_start_ns.unwrap();
                (start, query_info.query_end)
            } else {
                // Use actual data timestamps for file naming
                let first_ts = period_data.first().unwrap().ts_init().as_u64();
                let last_ts = period_data.last().unwrap().ts_init().as_u64();
                (first_ts, last_ts)
            };

            // Check again if target file exists (in case it was created during this process)
            let target_filename = format!(
                "{}/{}",
                directory,
                timestamps_to_filename(
                    UnixNanos::from(final_start_ns),
                    UnixNanos::from(final_end_ns)
                )
            );

            if self.file_exists(&target_filename)? {
                // Skip if target file already exists
                continue;
            }

            // Group custom data by type for writing
            let mut custom_data_by_type: AHashMap<String, Vec<CustomData>> = AHashMap::new();

            for data in period_data {
                if let Data::Custom(c) = data {
                    let type_name_str = c.data.type_name().to_string();
                    custom_data_by_type
                        .entry(type_name_str)
                        .or_default()
                        .push(c);
                }
            }

            // Write consolidated data for each type
            for (_, items) in custom_data_by_type {
                let start_ts = UnixNanos::from(final_start_ns);
                let end_ts = UnixNanos::from(final_end_ns);
                self.write_custom_data_batch(items, Some(start_ts), Some(end_ts), Some(true))?;
            }

            // Delete files fully consumed by this period; keep straddlers so no data is lost
            for file in existing_files.clone() {
                if let Some(interval) = parse_filename_timestamps(&file)
                    && interval.1 <= query_info.query_end
                    && interval.0 >= overall_query_start
                {
                    existing_files.retain(|f| f != &file);
                    self.delete_file(&file)?;
                }
            }

            // Reset so next period starts a new contiguous segment
            file_start_ns = None;
        }

        Ok(())
    }

    /// Deletes custom data within a specified time range.
    ///
    /// This method provides deletion for custom data types that don't have compile-time
    /// type information. It uses dynamic querying and writing methods.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The custom data type name (without "custom/" prefix).
    /// - `identifier`: Optional instrument ID to delete data for.
    /// - `start`: Optional start timestamp for the deletion range.
    /// - `end`: Optional end timestamp for the deletion range.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    fn delete_custom_data_range(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let path_prefix = format!("custom/{type_name}");

        // Get intervals for the custom data type
        let intervals = self.get_intervals(&path_prefix, identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to process
        }

        // Prepare all operations for execution
        let operations_to_execute =
            self.prepare_delete_operations(&path_prefix, identifier, &intervals, start, end)?;

        if operations_to_execute.is_empty() {
            return Ok(()); // No operations to execute
        }

        // Execute all operations
        let mut files_to_remove = AHashSet::<String>::new();

        for operation in operations_to_execute {
            // Reset the session before each operation
            self.reset_session();

            match operation.operation_type.as_str() {
                "split_before" => {
                    // Query custom data before the deletion range and write it
                    let instrument_ids = identifier.map(|id| vec![id.to_string()]);
                    let before_data = self.query_custom_data_dynamic(
                        type_name,
                        instrument_ids.as_deref(),
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                        false,
                    )?;

                    if !before_data.is_empty() {
                        // Group custom data by type for writing
                        let mut custom_data_by_type: AHashMap<String, Vec<CustomData>> =
                            AHashMap::new();

                        for data in before_data {
                            if let Data::Custom(c) = data {
                                let type_name_str = c.data.type_name().to_string();
                                custom_data_by_type
                                    .entry(type_name_str)
                                    .or_default()
                                    .push(c);
                            }
                        }

                        // Write data for each type
                        for (_, items) in custom_data_by_type {
                            let start_ts = UnixNanos::from(operation.file_start_ns);
                            let end_ts = UnixNanos::from(operation.file_end_ns);
                            self.write_custom_data_batch(
                                items,
                                Some(start_ts),
                                Some(end_ts),
                                Some(true),
                            )?;
                        }
                    }
                }
                "split_after" => {
                    // Query custom data after the deletion range and write it
                    let instrument_ids = identifier.map(|id| vec![id.to_string()]);
                    let after_data = self.query_custom_data_dynamic(
                        type_name,
                        instrument_ids.as_deref(),
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                        false,
                    )?;

                    if !after_data.is_empty() {
                        // Group custom data by type for writing
                        let mut custom_data_by_type: AHashMap<String, Vec<CustomData>> =
                            AHashMap::new();

                        for data in after_data {
                            if let Data::Custom(c) = data {
                                let type_name_str = c.data.type_name().to_string();
                                custom_data_by_type
                                    .entry(type_name_str)
                                    .or_default()
                                    .push(c);
                            }
                        }

                        // Write data for each type
                        for (_, items) in custom_data_by_type {
                            let start_ts = UnixNanos::from(operation.file_start_ns);
                            let end_ts = UnixNanos::from(operation.file_end_ns);
                            self.write_custom_data_batch(
                                items,
                                Some(start_ts),
                                Some(end_ts),
                                Some(true),
                            )?;
                        }
                    }
                }
                _ => {
                    // For "remove" operations, just mark files for removal
                }
            }

            // Mark files for removal (applies to all operation types)
            for file in operation.files {
                files_to_remove.insert(file);
            }
        }

        // Remove all files that were processed
        for file in files_to_remove {
            if let Err(e) = self.delete_file(&file) {
                log::warn!("Failed to delete file {file}: {e}");
            }
        }

        Ok(())
    }

    /// Prepares all queries for consolidation by filtering, grouping, and handling splits.
    ///
    /// This auxiliary function handles all the preparation logic for consolidation:
    /// 1. Filters intervals by time range.
    /// 2. Groups intervals into contiguous groups.
    /// 3. Identifies and creates split operations for data preservation.
    /// 4. Generates period-based consolidation queries.
    /// 5. Checks for existing target files.
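    ///
    /// # Examples
    ///
    /// A minimal sketch (catalog construction elided, as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Two contiguous hour-long intervals fall within one daily period,
    /// // so a single consolidation query is produced (assuming no target
    /// // file already exists)
    /// let intervals = vec![(0, 3599999999999), (3600000000000, 7199999999999)];
    /// let queries = catalog.prepare_consolidation_queries(
    ///     "quotes",
    ///     None,
    ///     &intervals,
    ///     86400000000000, // 1 day in nanoseconds
    ///     None,
    ///     None,
    ///     true,
    /// )?;
    /// assert_eq!(queries.len(), 1);
    /// # Ok::<(), anyhow::Error>(())
    /// ```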
    #[expect(clippy::too_many_arguments)]
    pub fn prepare_consolidation_queries(
        &self,
        type_name: &str,
        identifier: Option<&str>,
        intervals: &[(u64, u64)],
        period_nanos: u64,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
        ensure_contiguous_files: bool,
    ) -> anyhow::Result<Vec<ConsolidationQuery>> {
        // Filter intervals by time range if specified
        let used_start = start.map(|s| s.as_u64());
        let used_end = end.map(|e| e.as_u64());

        let mut filtered_intervals = Vec::new();

        for &(interval_start, interval_end) in intervals {
            // Check if interval overlaps with the specified range
            if (used_start.is_none() || used_start.unwrap() <= interval_end)
                && (used_end.is_none() || interval_start <= used_end.unwrap())
            {
                filtered_intervals.push((interval_start, interval_end));
            }
        }

        if filtered_intervals.is_empty() {
            return Ok(Vec::new()); // No intervals in the specified range
        }

        // Check contiguity of filtered intervals if required
        if ensure_contiguous_files && !are_intervals_contiguous(&filtered_intervals) {
            anyhow::bail!(
                "Intervals are not contiguous. When ensure_contiguous_files=true, \
                all files in the consolidation range must have contiguous timestamps."
            );
        }

        // Group intervals by the target period: split only when the gap between files
        // exceeds one period, since sub-period gaps land in the same consolidated file.
        let contiguous_groups = self.group_contiguous_intervals(&filtered_intervals, period_nanos);

        let mut queries_to_execute = Vec::new();

        // Handle interval splitting by creating split operations for data preservation
        if !filtered_intervals.is_empty() {
            if let Some(start_ts) = used_start {
                let first_interval = filtered_intervals[0];
                if first_interval.0 < start_ts && start_ts <= first_interval.1 {
                    // Split before start: preserve data from interval_start to start-1
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: first_interval.0,
                        query_end: start_ts - 1,
                        use_period_boundaries: false,
                    });
                }
            }

            if let Some(end_ts) = used_end {
                let last_interval = filtered_intervals[filtered_intervals.len() - 1];
                if last_interval.0 <= end_ts && end_ts < last_interval.1 {
                    // Split after end: preserve data from end+1 to interval_end
                    queries_to_execute.push(ConsolidationQuery {
                        query_start: end_ts + 1,
                        query_end: last_interval.1,
                        use_period_boundaries: false,
                    });
                }
            }
        }

        // Generate period-based consolidation queries for each contiguous group
        for group in contiguous_groups {
            let group_start = group[0].0;
            let group_end = group[group.len() - 1].1;

            // Apply start/end filtering to the group
            let effective_start = used_start.map_or(group_start, |s| s.max(group_start));
            let effective_end = used_end.map_or(group_end, |e| e.min(group_end));

            if effective_start > effective_end {
                continue; // Skip if no overlap
            }

            // Generate period-based queries within this contiguous group
            let mut current_start_ns = (effective_start / period_nanos) * period_nanos;

            // Add safety check to prevent infinite loops (match Python logic)
            let max_iterations = 10000;
            let mut iteration_count = 0;

            while current_start_ns <= effective_end {
                iteration_count += 1;
                if iteration_count > max_iterations {
                    // Safety break to prevent infinite loops
                    break;
                }
                let current_end_ns = (current_start_ns + period_nanos - 1).min(effective_end);

                // Check if target file already exists (only when ensure_contiguous_files is true)
                if ensure_contiguous_files {
                    let directory = self.make_path(type_name, identifier)?;
                    let target_filename = format!(
                        "{}/{}",
                        directory,
                        timestamps_to_filename(
                            UnixNanos::from(current_start_ns),
                            UnixNanos::from(current_end_ns)
                        )
                    );

                    if self.file_exists(&target_filename)? {
                        // Skip if target file already exists
                        current_start_ns += period_nanos;
                        continue;
                    }
                }

                // Add query to execution list
                queries_to_execute.push(ConsolidationQuery {
                    query_start: current_start_ns,
                    query_end: current_end_ns,
                    use_period_boundaries: ensure_contiguous_files,
                });

                // Move to next period
                current_start_ns += period_nanos;

                if current_start_ns > effective_end {
                    break;
                }
            }
        }

        // Sort queries by start date to enable efficient file removal:
        // files can be removed once interval.1 <= query_info.query_end,
        // and processing in chronological order ensures optimal cleanup.
        queries_to_execute.sort_by_key(|q| q.query_start);

        Ok(queries_to_execute)
    }

    /// Groups intervals for period-based consolidation.
    ///
    /// Groups adjacent intervals into the same bucket unless the gap between them exceeds
    /// `period_nanos`. Sub-period gaps land in the same consolidated file anyway, so they
    /// do not warrant a split. Gaps larger than one period represent genuine data holes.
    ///
    /// # Parameters
    ///
    /// - `intervals`: A slice of timestamp intervals as (start, end) tuples, sorted by start.
    /// - `period_nanos`: The target consolidation period; gaps larger than this split groups.
    ///
    /// # Returns
    ///
    /// Returns a vector of groups. Returns an empty vector if the input is empty.
    ///
    /// # Examples
    ///
    /// ```text
    /// Legacy chunked files with period=86_400_000_000_000 (1 day):
    /// [(1,5), (6,10), (11,15)] -> [[(1,5), (6,10), (11,15)]]
    ///
    /// Small period=1 with mixed gaps:
    /// [(1,5), (8,10), (12,15)] -> [[(1,5)], [(8,10)], [(12,15)]]
    /// ```
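    ///
    /// A runnable sketch of the second case (catalog construction elided,
    /// as in the other examples):
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Gaps of 3 and 2 both exceed period_nanos=1, so each interval is its own group
    /// let groups = catalog.group_contiguous_intervals(&[(1, 5), (8, 10), (12, 15)], 1);
    /// assert_eq!(groups, vec![vec![(1, 5)], vec![(8, 10)], vec![(12, 15)]]);
    /// ```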
    #[must_use]
    pub fn group_contiguous_intervals(
        &self,
        intervals: &[(u64, u64)],
        period_nanos: u64,
    ) -> Vec<Vec<(u64, u64)>> {
        if intervals.is_empty() {
            return Vec::new();
        }

        // Split groups only when the gap between files exceeds one period,
        // since sub-period gaps land in the same consolidated file anyway.
        // This works for both legacy chunked files (gap ~1ns) and fragment-per-flush
        // catalogs (gap ~bar interval) without inferring spacing from the data.
        let mut contiguous_groups = Vec::new();
        let mut current_group = vec![intervals[0]];

        for i in 1..intervals.len() {
            let prev_end = intervals[i - 1].1;
            let curr_start = intervals[i].0;

            if curr_start.saturating_sub(prev_end) > period_nanos {
                contiguous_groups.push(current_group);
                current_group = vec![intervals[i]];
            } else {
                current_group.push(intervals[i]);
            }
        }

        contiguous_groups.push(current_group);

        contiguous_groups
    }

    /// Checks if a file exists in the object store.
    ///
    /// This method performs a HEAD operation on the object store to determine if a file
    /// exists without downloading its content. It works with both local and remote object stores.
    ///
    /// # Parameters
    ///
    /// - `path`: The file path to check, relative to the catalog structure.
    ///
    /// # Returns
    ///
    /// Returns `true` if the file exists, `false` if it doesn't exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the object store operation fails due to network issues,
    /// authentication problems, or other I/O errors.
    fn file_exists(&self, path: &str) -> anyhow::Result<bool> {
        let object_path = self.to_object_path(path)?;
        let exists = self.execute_async(async {
            let result: bool = self.object_store.head(&object_path).await.is_ok();
            Ok(result)
        })?;
        Ok(exists)
    }

    /// Deletes a file from the object store.
    ///
    /// This method removes a file from the object store. The operation is permanent
    /// and cannot be undone. It works with both local filesystems and remote object stores.
    ///
    /// # Parameters
    ///
    /// - `path`: The file path to delete, relative to the catalog structure.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on successful deletion.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The file doesn't exist.
    /// - Permission is denied.
    /// - Network issues occur (for remote stores).
    /// - The object store operation fails.
    ///
    /// # Safety
    ///
    /// This operation is irreversible. Ensure the file is no longer needed before deletion.
    fn delete_file(&self, path: &str) -> anyhow::Result<()> {
        let object_path = self.to_object_path(path)?;
        self.execute_async(async {
            self.object_store
                .delete(&object_path)
                .await
                .map_err(anyhow::Error::from)
        })?;
        Ok(())
    }

    /// Resets the filenames of all Parquet files in the catalog to match their actual content timestamps.
    ///
    /// This method scans all leaf data directories in the catalog and renames files based on
    /// the actual timestamp range of their content. This is useful when files have been
    /// modified or when filename conventions have changed.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory listing fails.
    /// - File metadata reading fails.
    /// - File rename operations fail.
    /// - Interval validation fails after renaming.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset all filenames in the catalog
    /// catalog.reset_all_file_names()?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn reset_all_file_names(&self) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            self.reset_file_names(&directory)?;
        }

        Ok(())
    }

    /// Resets the filenames of Parquet files for a specific data type and identifier.
    ///
    /// This method renames files in a specific directory based on the actual timestamp
    /// range of their content. This is useful for correcting filenames after data
    /// modifications or when filename conventions have changed.
    ///
    /// # Parameters
    ///
    /// - `data_cls`: The data type directory name (e.g., "quotes", "trades").
    /// - `identifier`: Optional identifier to target a specific instrument's data. Can be an `instrument_id` (e.g., "EUR/USD.SIM") or a `bar_type` (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL").
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if the operation fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File metadata reading fails.
    /// - File rename operations fail.
    /// - Interval validation fails after renaming.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Reset filenames for all quote files
    /// catalog.reset_data_file_names("quotes", None)?;
    ///
    /// // Reset filenames for a specific instrument's trade files
    /// catalog.reset_data_file_names("trades", Some("BTCUSD"))?;
1566 /// # Ok::<(), anyhow::Error>(())
1567 /// ```
1568 pub fn reset_data_file_names(
1569 &self,
1570 data_cls: &str,
1571 identifier: Option<&str>,
1572 ) -> anyhow::Result<()> {
1573 let directory = self.make_path(data_cls, identifier)?;
1574 self.reset_file_names(&directory)
1575 }
1576
1577 /// Resets the filenames of Parquet files in a directory to match their actual content timestamps.
1578 ///
1579 /// This internal method scans all Parquet files in a directory, reads their metadata to
1580 /// determine the actual timestamp range of their content, and renames the files accordingly.
1581 /// This ensures that filenames accurately reflect the data they contain.
1582 ///
1583 /// # Parameters
1584 ///
1585 /// - `directory`: The directory path containing Parquet files to rename.
1586 ///
1587 /// # Returns
1588 ///
1589 /// Returns `Ok(())` on success, or an error if the operation fails.
1590 ///
1591 /// # Process
1592 ///
1593 /// 1. Lists all Parquet files in the directory
1594 /// 2. For each file, reads metadata to extract min/max timestamps
1595 /// 3. Generates a new filename based on actual timestamp range
1596 /// 4. Moves the file to the new name using object store operations
1597 /// 5. Validates that intervals remain disjoint after renaming
1598 ///
1599 /// # Errors
1600 ///
1601 /// Returns an error if:
1602 /// - Directory listing fails.
1603 /// - Metadata reading fails for any file.
1604 /// - File move operations fail.
1605 /// - Interval validation fails after renaming.
1606 /// - Object store operations fail.
1607 ///
1608 /// # Notes
1609 ///
1610 /// - This operation can be time-consuming for directories with many files.
1611 /// - Files are processed sequentially to avoid conflicts.
1612 /// - The operation is atomic per file but not across the entire directory.
1613 fn reset_file_names(&self, directory: &str) -> anyhow::Result<()> {
1614 let parquet_files = self.list_parquet_files(directory)?;
1615
        for file in parquet_files {
            let object_path = ObjectPath::from(file.as_str());
            let (first_ts, last_ts) = self.execute_async(async {
                min_max_from_parquet_metadata_object_store(
                    self.object_store.clone(),
                    &object_path,
                    "ts_init",
                )
                .await
            })?;

            let new_filename =
                timestamps_to_filename(UnixNanos::from(first_ts), UnixNanos::from(last_ts));
            let new_file_path = make_object_store_path(directory, &[&new_filename]);
            let new_object_path = ObjectPath::from(new_file_path);

            self.move_file(&object_path, &new_object_path)?;
        }

        let intervals = self.get_directory_intervals(directory)?;

        if !are_intervals_disjoint(&intervals) {
            anyhow::bail!("Intervals are not disjoint after resetting file names");
        }

        Ok(())
    }

    /// Finds all leaf data directories in the catalog.
    ///
    /// A leaf directory is one that contains data files but no subdirectories.
    /// This method is used to identify directories that can be processed for
    /// consolidation or other operations.
    ///
    /// # Returns
    ///
    /// Returns a vector of directory path strings representing leaf directories,
    /// or an error if directory traversal fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Object store listing operations fail.
    /// - Directory structure cannot be analyzed.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// let leaf_dirs = catalog.find_leaf_data_directories()?;
    /// for dir in leaf_dirs {
    ///     println!("Found leaf directory: {dir}");
    /// }
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn find_leaf_data_directories(&self) -> anyhow::Result<Vec<String>> {
        let data_dir = make_object_store_path(&self.base_path, &["data"]);

        let leaf_dirs = self.execute_async(async {
            let mut all_paths = AHashSet::new();
            let mut directories = AHashSet::new();
            let mut files_in_dirs = AHashMap::new();

            // List all objects under the data directory
            let prefix = ObjectPath::from(format!("{data_dir}/"));
            let mut stream = self.object_store.list(Some(&prefix));

            while let Some(object) = stream.next().await {
                let object = object?;
                let path_str = object.location.to_string();
                all_paths.insert(path_str.clone());

                // Extract directory path
                if let Some(parent) = std::path::Path::new(&path_str).parent() {
                    let parent_str = parent.to_string_lossy().to_string();
                    directories.insert(parent_str.clone());

                    // Track files in each directory
                    files_in_dirs
                        .entry(parent_str)
                        .or_insert_with(Vec::new)
                        .push(path_str);
                }
            }

            // Find leaf directories (directories with files but no subdirectories)
            let mut leaf_dirs = Vec::new();

            for dir in &directories {
                let has_files = files_in_dirs
                    .get(dir)
                    .is_some_and(|files| !files.is_empty());
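                // Joining an empty component yields the directory prefix with a
                // trailing separator, so any other collected directory that starts
                // with it lies strictly below `dir`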
                let has_subdirs = directories
                    .iter()
                    .any(|d| d.starts_with(&make_object_store_path(dir, &[""])) && d != dir);

                if has_files && !has_subdirs {
                    leaf_dirs.push(dir.clone());
                }
            }

            Ok::<Vec<String>, anyhow::Error>(leaf_dirs)
        })?;

        Ok(leaf_dirs)
    }

    /// Deletes data within a specified time range for a specific data type and identifier.
    ///
    /// This method identifies all parquet files that intersect with the specified time range
    /// and handles them appropriately:
    /// - Files completely within the range are deleted.
    /// - Files partially overlapping the range are split to preserve data outside the range.
    /// - The original intersecting files are removed after processing.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name (e.g., "quotes", "trades", "bars").
    /// - `identifier`: Optional identifier to delete data for. Can be an instrument_id (e.g., "EUR/USD.SIM") or a bar_type (e.g., "EUR/USD.SIM-1-MINUTE-LAST-EXTERNAL"). If None, deletes data across all identifiers.
    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The directory path cannot be constructed.
    /// - File operations fail.
    /// - Data querying or writing fails.
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone.
    /// - Files that partially overlap the deletion range are split to preserve data outside the range.
    /// - The method ensures data integrity by using atomic operations where possible.
    /// - Empty directories are not automatically removed after deletion.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data for a specific instrument
    /// catalog.delete_data_range(
    ///     "quotes",
    ///     Some("BTCUSD"),
    ///     None,
    ///     None
    /// )?;
    ///
    /// // Delete trade data within a specific time range
    /// catalog.delete_data_range(
    ///     "trades",
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
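    ///
    /// // Delete data stored under the "custom/" prefix ("signals" is a
    /// // hypothetical custom data type name, used purely for illustration)
    /// catalog.delete_data_range("custom/signals", None, None, None)?;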
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn delete_data_range(
        &mut self,
        type_name: &str,
        identifier: Option<&str>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        // Dispatch to the generic implementation for each supported data type
        match type_name {
            "quotes" => self.delete_data_range_generic::<QuoteTick>(identifier, start, end),
            "trades" => self.delete_data_range_generic::<TradeTick>(identifier, start, end),
            "bars" => self.delete_data_range_generic::<Bar>(identifier, start, end),
            "order_book_deltas" => {
                self.delete_data_range_generic::<OrderBookDelta>(identifier, start, end)
            }
            "order_book_depth10" => {
                self.delete_data_range_generic::<OrderBookDepth10>(identifier, start, end)
            }
            _ => {
                // Custom data types are namespaced under a "custom/" prefix
                if let Some(custom_type_name) = type_name.strip_prefix("custom/") {
                    self.delete_custom_data_range(custom_type_name, identifier, start, end)
                } else {
                    anyhow::bail!("Unsupported data type: {type_name}");
                }
            }
        }
    }

    /// Deletes data within a specified time range across the entire catalog.
    ///
    /// This method identifies all leaf directories in the catalog that contain parquet files
    /// and deletes data within the specified time range from each directory. A leaf directory
    /// is one that contains files but no subdirectories. This is a convenience method that
    /// effectively calls `delete_data_range` for all data types and instrument IDs in the catalog.
    ///
    /// # Parameters
    ///
    /// - `start`: Optional start timestamp for the deletion range. If None, deletes from the beginning.
    /// - `end`: Optional end timestamp for the deletion range. If None, deletes to the end.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Directory traversal fails.
    /// - Data class extraction from paths fails.
    /// - Individual delete operations fail.
    ///
    /// # Notes
    ///
    /// - This operation permanently removes data and cannot be undone.
    /// - The deletion process handles file intersections intelligently by splitting files
    ///   when they partially overlap with the deletion range.
    /// - Files completely within the deletion range are removed entirely.
    /// - Files partially overlapping the deletion range are split to preserve data outside the range.
    /// - This method is useful for bulk data cleanup operations across the entire catalog.
    /// - Empty directories are not automatically removed after deletion.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    /// use nautilus_core::UnixNanos;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all data before a specific date across the entire catalog
    /// catalog.delete_catalog_range(
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000))
    /// )?;
    ///
    /// // Delete all data within a specific range across the entire catalog
    /// catalog.delete_catalog_range(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     Some(UnixNanos::from(1609545600000000000))
    /// )?;
    ///
    /// // Delete all data after a specific date across the entire catalog
    /// catalog.delete_catalog_range(
    ///     Some(UnixNanos::from(1609459200000000000)),
    ///     None
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```
    pub fn delete_catalog_range(
        &mut self,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()> {
        let leaf_directories = self.find_leaf_data_directories()?;

        for directory in leaf_directories {
            if let Ok((Some(data_type), identifier)) =
                self.extract_data_cls_and_identifier_from_path(&directory)
            {
                // Delegate to delete_data_range for this directory's type and identifier
                if let Err(e) =
                    self.delete_data_range(&data_type, identifier.as_deref(), start, end)
                {
                    log::warn!("Failed to delete data in directory {directory}: {e}");
                    // Continue with other directories instead of failing completely
                }
            }
        }

        Ok(())
    }

    /// Generic implementation for deleting data within a specified time range.
    ///
    /// This method provides the core deletion logic that works with any data type
    /// that implements the required traits. It handles file intersection analysis,
    /// data splitting for partial overlaps, and file cleanup.
    ///
    /// # Type Parameters
    ///
    /// - `T`: The data type that implements required traits for catalog operations.
    ///
    /// # Parameters
    ///
    /// - `identifier`: Optional instrument ID to delete data for.
    /// - `start`: Optional start timestamp for the deletion range.
    /// - `end`: Optional end timestamp for the deletion range.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` on success, or an error if deletion fails.
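    ///
    /// # Errors
    ///
    /// Returns an error if interval retrieval, operation preparation, querying,
    /// or rewriting fails.
    ///
    /// # Examples
    ///
    /// A minimal sketch (constructor arguments elided, as in the other examples
    /// in this module):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_model::data::QuoteTick;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let mut catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // Delete all quote data up to a given timestamp for one instrument
    /// catalog.delete_data_range_generic::<QuoteTick>(
    ///     Some("EUR/USD.SIM"),
    ///     None,
    ///     Some(UnixNanos::from(1609459200000000000)),
    /// )?;
    /// # Ok::<(), anyhow::Error>(())
    /// ```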
    pub fn delete_data_range_generic<T>(
        &mut self,
        identifier: Option<&str>,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<()>
    where
        T: DecodeDataFromRecordBatch
            + CatalogPathPrefix
            + EncodeToRecordBatch
            + HasTsInit
            + TryFrom<Data>
            + Clone,
    {
        // Work from the existing file intervals for this data type and identifier
        let intervals = self.get_intervals(T::path_prefix(), identifier)?;

        if intervals.is_empty() {
            return Ok(()); // No files to process
        }

        // Prepare all operations for execution
        let operations_to_execute =
            self.prepare_delete_operations(T::path_prefix(), identifier, &intervals, start, end)?;

        if operations_to_execute.is_empty() {
            return Ok(()); // No operations to execute
        }

        // Execute all operations
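        // Collected into a set because a file split on both sides of the deletion
        // range appears in two operations but must be deleted only once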
        let mut files_to_remove = AHashSet::<String>::new();

        for operation in operations_to_execute {
            // Reset the session before each operation so fresh data is loaded;
            // this clears cached table registrations that could interfere with file operations
            self.reset_session();

            match operation.operation_type.as_str() {
                "split_before" | "split_after" => {
                    // Both split variants preserve the data on one side of the
                    // deletion range: query it, then rewrite it before the
                    // original file is removed
                    let instrument_ids = identifier.map(|id| vec![id.to_string()]);
                    let preserved_data = self.query_typed_data::<T>(
                        instrument_ids,
                        Some(UnixNanos::from(operation.query_start)),
                        Some(UnixNanos::from(operation.query_end)),
                        None,
                        Some(operation.files.clone()),
                        false, // optimize_file_loading=false for precise file control
                    )?;

                    if !preserved_data.is_empty() {
                        let start_ts = UnixNanos::from(operation.file_start_ns);
                        let end_ts = UnixNanos::from(operation.file_end_ns);
                        self.write_to_parquet(
                            preserved_data,
                            Some(start_ts),
                            Some(end_ts),
                            Some(true),
                        )?;
                    }
                }
                _ => {
                    // "remove" operations have nothing to rewrite; their files are
                    // simply marked for removal below
                }
            }

            // Mark files for removal (applies to all operation types)
            for file in operation.files {
                files_to_remove.insert(file);
            }
        }

        // Remove all files that were processed
        for file in files_to_remove {
            if let Err(e) = self.delete_file(&file) {
                log::warn!("Failed to delete file {file}: {e}");
            }
        }

        Ok(())
    }

    /// Prepares all operations for data deletion by identifying files that need to be
    /// split or removed.
    ///
    /// This auxiliary function handles all the preparation logic for deletion:
    /// 1. Filters intervals by time range.
    /// 2. Identifies files that intersect with the deletion range.
    /// 3. Creates split operations for files that partially overlap.
    /// 4. Generates removal operations for files completely within the range.
    ///
    /// # Parameters
    ///
    /// - `type_name`: The data type directory name for path generation.
    /// - `identifier`: Optional instrument identifier for path generation.
    /// - `intervals`: List of (`start_ts`, `end_ts`) tuples representing existing file intervals.
    /// - `start`: Optional start timestamp for deletion range.
    /// - `end`: Optional end timestamp for deletion range.
    ///
    /// # Returns
    ///
    /// Returns a vector of `DeleteOperation` structs ready for execution.
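    ///
    /// # Errors
    ///
    /// Returns an error if the directory path cannot be constructed.
    ///
    /// # Examples
    ///
    /// A minimal sketch of the splitting logic (constructor arguments elided;
    /// the timestamps are illustrative):
    ///
    /// ```rust,no_run
    /// use nautilus_core::UnixNanos;
    /// use nautilus_persistence::backend::catalog::ParquetDataCatalog;
    ///
    /// let catalog = ParquetDataCatalog::new(/* ... */);
    ///
    /// // One file covers [1_000, 5_000] ns; deleting [2_000, 3_000] should yield
    /// // a "split_before" operation keeping [1_000, 1_999] and a "split_after"
    /// // operation keeping [3_001, 5_000]; the original file is removed later
    /// let operations = catalog.prepare_delete_operations(
    ///     "quotes",
    ///     None,
    ///     &[(1_000, 5_000)],
    ///     Some(UnixNanos::from(2_000)),
    ///     Some(UnixNanos::from(3_000)),
    /// )?;
    /// assert_eq!(operations.len(), 2);
    /// # Ok::<(), anyhow::Error>(())
    /// ```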
    pub fn prepare_delete_operations(
        &self,
        type_name: &str,
        identifier: Option<&str>,
        intervals: &[(u64, u64)],
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<DeleteOperation>> {
        // Convert start/end to nanoseconds
        let delete_start_ns = start.map(|s| s.as_u64());
        let delete_end_ns = end.map(|e| e.as_u64());

        let mut operations = Vec::new();

        // Get directory for file path construction
        let directory = self.make_path(type_name, identifier)?;

        // Process each interval (which represents an actual file)
        for &(file_start_ns, file_end_ns) in intervals {
            // Closed intervals intersect when each starts no later than the other
            // ends; a missing deletion bound is treated as unbounded
            let intersects = delete_start_ns.is_none_or(|s| s <= file_end_ns)
                && delete_end_ns.is_none_or(|e| file_start_ns <= e);

            if !intersects {
                continue; // File doesn't intersect with the deletion range
            }

            // Construct the file path from the interval timestamps
            let filename = timestamps_to_filename(
                UnixNanos::from(file_start_ns),
                UnixNanos::from(file_end_ns),
            );
            let file_path = make_object_store_path(&directory, &[&filename]);

            // Determine which kind of operation is needed
            let file_completely_within_range = delete_start_ns.is_none_or(|s| s <= file_start_ns)
                && delete_end_ns.is_none_or(|e| file_end_ns <= e);

            if file_completely_within_range {
                // File is completely within the deletion range - just mark it for removal
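                // (the query/file range fields are unused for removals, so they are zeroed)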
                operations.push(DeleteOperation {
                    operation_type: "remove".to_string(),
                    files: vec![file_path],
                    query_start: 0,
                    query_end: 0,
                    file_start_ns: 0,
                    file_end_ns: 0,
                });
            } else {
                // File partially overlaps - split it to preserve data outside the range
                if let Some(delete_start) = delete_start_ns
                    && file_start_ns < delete_start
                {
                    // Keep data before the deletion range; query bounds are
                    // inclusive, so stop one nanosecond before the deletion start
                    operations.push(DeleteOperation {
                        operation_type: "split_before".to_string(),
                        files: vec![file_path.clone()],
                        query_start: file_start_ns,
                        query_end: delete_start.saturating_sub(1),
                        file_start_ns,
                        file_end_ns: delete_start.saturating_sub(1),
                    });
                }

                if let Some(delete_end) = delete_end_ns
                    && delete_end < file_end_ns
                {
                    // Keep data after the deletion range; resume one nanosecond
                    // after the deletion end
                    operations.push(DeleteOperation {
                        operation_type: "split_after".to_string(),
                        files: vec![file_path.clone()],
                        query_start: delete_end.saturating_add(1),
                        query_end: file_end_ns,
                        file_start_ns: delete_end.saturating_add(1),
                        file_end_ns,
                    });
                }
            }
        }

        Ok(operations)
    }
}