// nautilus_persistence/parquet.rs
1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::Arc;
17
18use ahash::AHashMap;
19use arrow::record_batch::RecordBatch;
20use object_store::{ObjectStore, ObjectStoreExt, path::Path as ObjectPath};
21use parquet::{
22    arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
23    file::{
24        metadata::KeyValue,
25        properties::WriterProperties,
26        reader::{FileReader, SerializedFileReader},
27        statistics::Statistics,
28    },
29};
30use url::Url;
31
/// Returns `true` when `scheme` denotes a remote object store (cloud or HTTP).
pub(crate) fn is_remote_uri_scheme(scheme: &str) -> bool {
    // Schemes handled by the cloud/HTTP store builders below
    const REMOTE_SCHEMES: [&str; 7] = ["s3", "gs", "gcs", "az", "abfs", "http", "https"];
    REMOTE_SCHEMES.contains(&scheme)
}
38
39pub(crate) fn remote_store_root_url(uri: &str) -> anyhow::Result<Url> {
40    let mut url = Url::parse(uri)?;
41    url.set_path("");
42    url.set_query(None);
43    url.set_fragment(None);
44    Ok(url)
45}
46
47pub(crate) fn remote_full_uri(uri: &str, object_path: &str) -> anyhow::Result<String> {
48    let root = remote_store_root_url(uri)?;
49    let root = root.as_str().trim_end_matches('/');
50    let object_path = object_path.trim_start_matches('/');
51
52    if object_path.is_empty() {
53        Ok(root.to_string())
54    } else {
55        Ok(format!("{root}/{object_path}"))
56    }
57}
58
/// Distinguishes local file-system stores from remote (cloud/HTTP) stores.
pub(crate) enum ObjectStoreLocationKind {
    /// Backed by the local file system (a `file://` URI or plain path).
    Local,
    /// Backed by a remote store; `store_root_url` is the original URI with
    /// path, query, and fragment stripped.
    Remote { store_root_url: Url },
}
63
/// An object store handle together with the path information it was created from.
pub(crate) struct ObjectStoreLocation {
    /// The underlying store implementation (local, S3, GCS, Azure, or HTTP).
    pub object_store: Arc<dyn ObjectStore>,
    /// Prefix to prepend to relative object paths within the store
    /// (empty string for local stores).
    pub base_path: String,
    /// The normalized URI the store was created from.
    pub original_uri: String,
    /// Whether this location is local or remote (with its store root URL).
    pub kind: ObjectStoreLocationKind,
}
70
71impl ObjectStoreLocation {
72    pub(crate) fn store_root_url(&self) -> Option<&Url> {
73        match &self.kind {
74            ObjectStoreLocationKind::Local => None,
75            ObjectStoreLocationKind::Remote { store_root_url } => Some(store_root_url),
76        }
77    }
78}
79
80/// Writes a `RecordBatch` to a Parquet file using object store, with optional compression.
81///
82/// # Errors
83///
84/// Returns an error if writing to Parquet fails or any I/O operation fails.
85pub async fn write_batch_to_parquet(
86    batch: RecordBatch,
87    path: &str,
88    storage_options: Option<AHashMap<String, String>>,
89    compression: Option<parquet::basic::Compression>,
90    max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92    write_batches_to_parquet(
93        &[batch],
94        path,
95        storage_options,
96        compression,
97        max_row_group_size,
98    )
99    .await
100}
101
102/// Writes multiple `RecordBatch` items to a Parquet file using object store, with optional compression, row group sizing, and storage options.
103///
104/// # Errors
105///
106/// Returns an error if writing to Parquet fails or any I/O operation fails.
107pub async fn write_batches_to_parquet(
108    batches: &[RecordBatch],
109    path: &str,
110    storage_options: Option<AHashMap<String, String>>,
111    compression: Option<parquet::basic::Compression>,
112    max_row_group_size: Option<usize>,
113) -> anyhow::Result<()> {
114    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
115    let object_path = if base_path.is_empty() {
116        ObjectPath::from(path)
117    } else {
118        ObjectPath::from(format!("{base_path}/{path}"))
119    };
120
121    write_batches_to_object_store(
122        batches,
123        object_store,
124        &object_path,
125        compression,
126        max_row_group_size,
127        None,
128    )
129    .await
130}
131
132/// Reads a Parquet file from an object store and returns all record batches plus
133/// the Arrow schema from the builder. The builder's schema includes metadata restored
134/// from the file's `ARROW:schema` key_value_metadata; use it for decoding instead of
135/// each batch's schema (which has metadata stripped).
136///
137/// # Errors
138///
139/// Returns an error if the path cannot be read or Parquet parsing fails.
140pub async fn read_parquet_from_object_store(
141    object_store: Arc<dyn ObjectStore>,
142    path: &ObjectPath,
143) -> anyhow::Result<(Vec<RecordBatch>, Arc<arrow::datatypes::Schema>)> {
144    let result: object_store::GetResult = object_store.get(path).await?;
145    let data = result.bytes().await?;
146    if data.is_empty() {
147        return Ok((
148            Vec::new(),
149            Arc::new(arrow::datatypes::Schema::new(
150                Vec::<arrow::datatypes::Field>::new(),
151            )),
152        ));
153    }
154    let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
155    let schema = builder.schema().clone();
156    let reader = builder.build()?;
157    let mut batches = Vec::new();
158    for batch in reader {
159        batches.push(batch?);
160    }
161    Ok((batches, schema))
162}
163
164/// Writes multiple `RecordBatch` items to an object store URI, with optional compression,
165/// row group sizing, and key_value_metadata (e.g. for instrument "class" so it survives roundtrip).
166///
167/// # Errors
168///
169/// Returns an error if writing to Parquet fails or any I/O operation fails.
170pub async fn write_batches_to_object_store(
171    batches: &[RecordBatch],
172    object_store: Arc<dyn ObjectStore>,
173    path: &ObjectPath,
174    compression: Option<parquet::basic::Compression>,
175    max_row_group_size: Option<usize>,
176    key_value_metadata: Option<Vec<KeyValue>>,
177) -> anyhow::Result<()> {
178    // Create a temporary buffer to write the parquet data
179    let mut buffer = Vec::new();
180
181    let mut props_builder = WriterProperties::builder()
182        .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
183        .set_max_row_group_row_count(Some(max_row_group_size.unwrap_or(5000)));
184
185    if let Some(kv) = key_value_metadata {
186        props_builder = props_builder.set_key_value_metadata(Some(kv));
187    }
188    let writer_props = props_builder.build();
189
190    let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
191    for batch in batches {
192        writer.write(batch)?;
193    }
194    writer.close()?;
195
196    // Upload the buffer to object store
197    object_store.put(path, buffer.into()).await?;
198
199    Ok(())
200}
201
202/// Deduplicates a slice of `RecordBatch` items, removing rows that are identical across all columns.
203///
204/// Rows are compared by encoding each row to a canonical byte sequence using Arrow's row format.
205/// Only the first occurrence of each unique row is retained; the relative order of unique rows
206/// is preserved.
207///
208/// # Errors
209///
210/// Returns an error if the row converter cannot be constructed or if the `take` kernel fails.
211fn deduplicate_record_batches(batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> {
212    if batches.is_empty() {
213        return Ok(Vec::new());
214    }
215
216    let schema = batches[0].schema();
217
218    let fields: Vec<arrow_row::SortField> = schema
219        .fields()
220        .iter()
221        .map(|f| arrow_row::SortField::new(f.data_type().clone()))
222        .collect();
223
224    let converter = arrow_row::RowConverter::new(fields)?;
225    let mut seen: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
226    let mut result: Vec<RecordBatch> = Vec::new();
227
228    for batch in batches {
229        let rows = converter.convert_columns(batch.columns())?;
230        let mut indices: Vec<u32> = Vec::new();
231
232        for (i, row) in rows.iter().enumerate() {
233            if seen.insert(row.as_ref().to_vec()) {
234                indices.push(i as u32);
235            }
236        }
237
238        if !indices.is_empty() {
239            let index_array = arrow::array::UInt32Array::from(indices);
240            let deduped_columns: Vec<arrow::array::ArrayRef> = batch
241                .columns()
242                .iter()
243                .map(|col| arrow::compute::take(col.as_ref(), &index_array, None))
244                .collect::<Result<_, _>>()?;
245            result.push(RecordBatch::try_new(schema.clone(), deduped_columns)?);
246        }
247    }
248
249    Ok(result)
250}
251
252/// Combines multiple Parquet files using object store with storage options
253///
254/// # Errors
255///
256/// Returns an error if file reading or writing fails.
257pub async fn combine_parquet_files(
258    file_paths: Vec<&str>,
259    new_file_path: &str,
260    storage_options: Option<AHashMap<String, String>>,
261    compression: Option<parquet::basic::Compression>,
262    max_row_group_size: Option<usize>,
263    deduplicate: Option<bool>,
264) -> anyhow::Result<()> {
265    if file_paths.len() <= 1 {
266        return Ok(());
267    }
268
269    // Create object store from the first file path (assuming all files are in the same store)
270    let (object_store, base_path, _) =
271        create_object_store_from_path(file_paths[0], storage_options)?;
272
273    // Convert string paths to ObjectPath
274    let object_paths: Vec<ObjectPath> = file_paths
275        .iter()
276        .map(|path| {
277            if base_path.is_empty() {
278                ObjectPath::from(*path)
279            } else {
280                ObjectPath::from(format!("{base_path}/{path}"))
281            }
282        })
283        .collect();
284
285    let new_object_path = if base_path.is_empty() {
286        ObjectPath::from(new_file_path)
287    } else {
288        ObjectPath::from(format!("{base_path}/{new_file_path}"))
289    };
290
291    combine_parquet_files_from_object_store(
292        object_store,
293        object_paths,
294        &new_object_path,
295        compression,
296        max_row_group_size,
297        deduplicate,
298    )
299    .await
300}
301
302/// Combines multiple Parquet files from object store
303///
304/// # Errors
305///
306/// Returns an error if file reading or writing fails.
307pub async fn combine_parquet_files_from_object_store(
308    object_store: Arc<dyn ObjectStore>,
309    file_paths: Vec<ObjectPath>,
310    new_file_path: &ObjectPath,
311    compression: Option<parquet::basic::Compression>,
312    max_row_group_size: Option<usize>,
313    deduplicate: Option<bool>,
314) -> anyhow::Result<()> {
315    if file_paths.len() <= 1 {
316        return Ok(());
317    }
318
319    let mut all_batches: Vec<RecordBatch> = Vec::new();
320    let mut schema_with_metadata: Option<Arc<arrow::datatypes::Schema>> = None;
321
322    // Read all files from object store
323    for path in &file_paths {
324        let result: object_store::GetResult = object_store.get(path).await?;
325        let data = result.bytes().await?;
326        let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
327
328        // Capture the schema from the first file's builder; it includes the Arrow
329        // schema-level metadata (e.g. bar_type, instrument_id) restored from the
330        // Parquet ARROW:schema key_value_metadata entry.  Individual RecordBatch
331        // objects returned by the reader have this metadata stripped, so we need
332        // to preserve it separately and re-apply it when writing the combined file.
333        if schema_with_metadata.is_none() {
334            schema_with_metadata = Some(builder.schema().clone());
335        }
336
337        let mut reader = builder.build()?;
338
339        for batch in reader.by_ref() {
340            all_batches.push(batch?);
341        }
342    }
343
344    // Re-apply the preserved schema metadata to all collected batches so that
345    // write_batches_to_object_store (which uses batches[0].schema()) can encode
346    // the correct Arrow schema metadata into the combined output file.
347    if let Some(schema) = &schema_with_metadata {
348        all_batches = all_batches
349            .into_iter()
350            .map(|b| {
351                RecordBatch::try_new(schema.clone(), b.columns().to_vec())
352                    .expect("schema re-application failed")
353            })
354            .collect();
355    }
356
357    // Deduplicate rows if requested
358    let batches_to_write = if deduplicate.unwrap_or(false) {
359        deduplicate_record_batches(&all_batches)?
360    } else {
361        all_batches
362    };
363
364    // Write combined batches to new location
365    write_batches_to_object_store(
366        &batches_to_write,
367        object_store.clone(),
368        new_file_path,
369        compression,
370        max_row_group_size,
371        None,
372    )
373    .await?;
374
375    // Remove the merged files
376    for path in &file_paths {
377        if path != new_file_path {
378            object_store.delete(path).await?;
379        }
380    }
381
382    Ok(())
383}
384
385/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata using object store with storage options.
386///
387/// # Errors
388///
389/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
390pub async fn min_max_from_parquet_metadata(
391    file_path: &str,
392    storage_options: Option<AHashMap<String, String>>,
393    column_name: &str,
394) -> anyhow::Result<(u64, u64)> {
395    let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
396    let object_path = if base_path.is_empty() {
397        ObjectPath::from(file_path)
398    } else {
399        ObjectPath::from(format!("{base_path}/{file_path}"))
400    };
401
402    min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
403}
404
405/// Extracts the minimum and maximum i64 values for the specified `column_name` from a Parquet file's metadata in object store.
406///
407/// # Errors
408///
409/// Returns an error if the file cannot be read, metadata parsing fails, or the column is missing or has no statistics.
410pub async fn min_max_from_parquet_metadata_object_store(
411    object_store: Arc<dyn ObjectStore>,
412    file_path: &ObjectPath,
413    column_name: &str,
414) -> anyhow::Result<(u64, u64)> {
415    // Download the parquet file from object store
416    let result: object_store::GetResult = object_store.get(file_path).await?;
417    let data = result.bytes().await?;
418    let reader = SerializedFileReader::new(data)?;
419
420    let metadata = reader.metadata();
421    let mut overall_min_value: Option<i64> = None;
422    let mut overall_max_value: Option<i64> = None;
423
424    // Iterate through all row groups
425    for i in 0..metadata.num_row_groups() {
426        let row_group = metadata.row_group(i);
427
428        // Iterate through all columns in this row group
429        for j in 0..row_group.num_columns() {
430            let col_metadata = row_group.column(j);
431
432            if col_metadata.column_path().string() == column_name {
433                if let Some(stats) = col_metadata.statistics() {
434                    // Check if we have Int64 statistics
435                    if let Statistics::Int64(int64_stats) = stats {
436                        // Extract min value if available
437                        if let Some(&min_value) = int64_stats.min_opt()
438                            && (overall_min_value.is_none()
439                                || min_value < overall_min_value.unwrap())
440                        {
441                            overall_min_value = Some(min_value);
442                        }
443
444                        // Extract max value if available
445                        if let Some(&max_value) = int64_stats.max_opt()
446                            && (overall_max_value.is_none()
447                                || max_value > overall_max_value.unwrap())
448                        {
449                            overall_max_value = Some(max_value);
450                        }
451                    } else {
452                        anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
453                    }
454                } else {
455                    anyhow::bail!(
456                        "Warning: Statistics not available for column '{column_name}' in row group {i}."
457                    );
458                }
459            }
460        }
461    }
462
463    // Return the min/max pair if both are available
464    if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
465        Ok((min as u64, max as u64))
466    } else {
467        anyhow::bail!(
468            "Column '{column_name}' not found or has no Int64 statistics in any row group."
469        )
470    }
471}
472
473/// Creates an object store from a URI string with optional storage options.
474///
475/// Supports multiple cloud storage providers:
476/// - AWS S3: `s3://bucket/path`
477/// - Google Cloud Storage: `gs://bucket/path` or `gcs://bucket/path`
478/// - Azure Blob Storage: `az://account/container/path` or `abfs://container@account.dfs.core.windows.net/path`
479/// - HTTP/WebDAV: `http://` or `https://`
480/// - Local files: `file://path` or plain paths
481///
482/// # Parameters
483///
484/// - `path`: The URI string for the storage location.
485/// - `storage_options`: Optional `HashMap` containing storage-specific configuration options:
486///   - For S3: `endpoint_url`, region, `access_key_id`, `secret_access_key`, `session_token`, etc.
487///   - For GCS: `service_account_path`, `service_account_key`, `project_id`, etc.
488///   - For Azure: `account_name`, `account_key`, `sas_token`, etc.
489///
490/// Returns a tuple of (`ObjectStore`, `base_path`, `normalized_uri`)
491pub fn create_object_store_from_path(
492    path: &str,
493    storage_options: Option<AHashMap<String, String>>,
494) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
495    let location = create_object_store_location_from_path(path, storage_options)?;
496    Ok((
497        location.object_store,
498        location.base_path,
499        location.original_uri,
500    ))
501}
502
// `storage_options` is only consumed by the cloud-feature arms,
// so keep the allow scoped to the no-cloud build.
#[cfg_attr(
    not(feature = "cloud"),
    allow(unused_variables, clippy::needless_pass_by_value)
)]
/// Resolves a path or URI into an [`ObjectStoreLocation`], dispatching on the URI scheme.
///
/// Local paths are first normalized to `file://` URIs; cloud schemes require the
/// `cloud` feature and otherwise produce an error.
///
/// # Errors
///
/// Returns an error if the URI targets a cloud scheme without the `cloud` feature,
/// or if the selected store builder fails.
pub(crate) fn create_object_store_location_from_path(
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<ObjectStoreLocation> {
    let uri = normalize_path_to_uri(path);

    // Dispatch on scheme prefix; each helper returns (store, base_path, original_uri)
    let (object_store, base_path, original_uri) = match uri.as_str() {
        #[cfg(feature = "cloud")]
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        #[cfg(feature = "cloud")]
        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        #[cfg(not(feature = "cloud"))]
        s if s.starts_with("s3://")
            || s.starts_with("gs://")
            || s.starts_with("gcs://")
            || s.starts_with("az://")
            || s.starts_with("abfs://")
            || s.starts_with("http://")
            || s.starts_with("https://") =>
        {
            anyhow::bail!("Cloud storage support requires the 'cloud' feature: {uri}")
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        _ => create_local_store(&uri, false), // Fallback: assume local path
    }?;

    // Classify as Remote only when the URI parses and has a remote scheme;
    // the root URL derivation error (if any) is propagated via transpose.
    let kind = Url::parse(&original_uri)
        .ok()
        .filter(|url| is_remote_uri_scheme(url.scheme()))
        .map(|_| {
            remote_store_root_url(&original_uri)
                .map(|store_root_url| ObjectStoreLocationKind::Remote { store_root_url })
        })
        .transpose()?
        .unwrap_or(ObjectStoreLocationKind::Local);

    Ok(ObjectStoreLocation {
        object_store,
        base_path,
        original_uri,
        kind,
    })
}
562
563/// Normalizes a path to URI format for consistent object store usage.
564///
565/// If the path is already a URI (contains "://"), returns it as-is.
566/// Otherwise, converts local paths to file:// URIs with proper cross-platform handling.
567///
568/// Supported URI schemes:
569/// - `s3://` for AWS S3
570/// - `gs://` or `gcs://` for Google Cloud Storage
571/// - `az://` or `abfs://` for Azure Blob Storage
572/// - `http://` or `https://` for HTTP/WebDAV
573/// - `file://` for local files
574///
575/// # Cross-platform Path Handling
576///
577/// - Unix absolute paths: `/path/to/file` → `file:///path/to/file`
578/// - Windows drive paths: `C:\path\to\file` → `file:///C:/path/to/file`
579/// - Windows UNC paths: `\\server\share\file` → `file://server/share/file`
580/// - Relative paths: converted to absolute using current directory
581#[must_use]
582pub fn normalize_path_to_uri(path: &str) -> String {
583    if path.contains("://") {
584        // Already a URI - return as-is
585        path.to_string()
586    } else {
587        // Convert local path to file:// URI with cross-platform support
588        if is_absolute_path(path) {
589            path_to_file_uri(path)
590        } else {
591            // Relative path - make it absolute first
592            let absolute_path = std::env::current_dir().unwrap().join(path);
593            path_to_file_uri(&absolute_path.to_string_lossy())
594        }
595    }
596}
597
/// Checks if a path is absolute on the current platform.
///
/// Recognizes Unix absolute paths (`/...`), Windows drive paths (`C:\` or `C:/`),
/// and Windows UNC paths (`\\server\share`).
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Windows drive path like `C:\` or `C:/` (byte-length guard matches the
    // original >= 3 check before inspecting the second and third chars)
    let is_windows_drive = path.len() >= 3
        && path.chars().nth(1) == Some(':')
        && matches!(path.chars().nth(2), Some('\\' | '/'));

    path.starts_with('/') || is_windows_drive || path.starts_with("\\\\")
}
623
/// Converts an absolute path to a file:// URI with proper platform handling.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    match path {
        // Unix absolute path
        p if p.starts_with('/') => format!("file://{p}"),
        // Windows drive path - normalize separators and add proper prefix
        p if p.len() >= 3 && p.chars().nth(1) == Some(':') => {
            format!("file:///{}", p.replace('\\', "/"))
        }
        p => match p.strip_prefix("\\\\") {
            // Windows UNC path \\server\share -> file://server/share
            Some(rest) => format!("file://{}", rest.replace('\\', "/")),
            // Fallback - treat as relative to root
            None => format!("file://{p}"),
        },
    }
}
643
/// Converts a file:// URI to a native path for the current platform.
/// On Windows, "file:///C:/x/y" becomes "C:\x\y" so LocalFileSystem and std::fs work correctly.
#[cfg(windows)]
pub(crate) fn file_uri_to_native_path(uri: &str) -> String {
    let without_scheme = match uri.strip_prefix("file://") {
        Some(rest) => rest,
        None => uri.strip_prefix("file:").unwrap_or(uri),
    };
    // Strip leading slashes so "/C:/x/y" becomes "C:/x/y", then use native separators
    without_scheme.trim_start_matches('/').replace('/', "\\")
}
656
/// Converts a file:// URI to a path string for Unix (no-op; object_store accepts slash paths).
#[cfg(not(windows))]
pub(crate) fn file_uri_to_native_path(uri: &str) -> String {
    match uri.strip_prefix("file://") {
        Some(path) => path.to_string(),
        None => uri.to_string(),
    }
}
662
663/// Helper function to create local file system object store
664fn create_local_store(
665    uri: &str,
666    is_file_uri: bool,
667) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
668    let path = if is_file_uri {
669        file_uri_to_native_path(uri)
670    } else {
671        uri.to_string()
672    };
673
674    let local_store = object_store::local::LocalFileSystem::new_with_prefix(&path)?;
675    Ok((Arc::new(local_store), String::new(), uri.to_string()))
676}
677
/// Helper function to create S3 object store with options.
#[cfg(feature = "cloud")]
fn create_s3_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut s3_builder = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    // Apply storage options if provided; each match arm yields the updated builder
    for (key, value) in storage_options.unwrap_or_default() {
        s3_builder = match key.as_str() {
            "endpoint_url" => s3_builder.with_endpoint(&value),
            "region" => s3_builder.with_region(&value),
            "access_key_id" | "key" => s3_builder.with_access_key_id(&value),
            "secret_access_key" | "secret" => s3_builder.with_secret_access_key(&value),
            "session_token" | "token" => s3_builder.with_token(&value),
            "allow_http" => s3_builder.with_allow_http(value.to_lowercase() == "true"),
            _ => {
                // Ignore unknown options for forward compatibility
                log::warn!("Unknown S3 storage option: {key}");
                s3_builder
            }
        };
    }

    let store = s3_builder.build()?;
    Ok((Arc::new(store), path, uri.to_string()))
}
723
/// Helper function to create GCS object store with options.
#[cfg(feature = "cloud")]
fn create_gcs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut gcs_builder =
        object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    // Apply storage options if provided; each match arm yields the updated builder
    for (key, value) in storage_options.unwrap_or_default() {
        gcs_builder = match key.as_str() {
            "service_account_path" => gcs_builder.with_service_account_path(&value),
            "service_account_key" => gcs_builder.with_service_account_key(&value),
            "project_id" => {
                // Note: GoogleCloudStorageBuilder doesn't have with_project_id method
                // This would need to be handled via environment variables or service account
                log::warn!(
                    "project_id should be set via service account or environment variables"
                );
                gcs_builder
            }
            "application_credentials" => {
                // Set GOOGLE_APPLICATION_CREDENTIALS env var required by Google auth libraries.
                // SAFETY: std::env::set_var is marked unsafe because it mutates global state and
                // can break signal-safe code. We only call it during configuration before any
                // multi-threaded work starts, so it is considered safe in this context.
                unsafe {
                    std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                }
                gcs_builder
            }
            _ => {
                // Ignore unknown options for forward compatibility
                log::warn!("Unknown GCS storage option: {key}");
                gcs_builder
            }
        };
    }

    let store = gcs_builder.build()?;
    Ok((Arc::new(store), path, uri.to_string()))
}
772
/// Helper function to create Azure object store with options.
#[cfg(feature = "cloud")]
fn create_azure_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let container = extract_host(&url, "Invalid Azure URI: missing container")?;

    // For az:// URIs the object path is the URL path, minus its leading slash
    let path = url.path().trim_start_matches('/').to_string();

    let mut azure_builder =
        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);

    // Apply storage options if provided; each match arm yields the updated builder
    for (key, value) in storage_options.unwrap_or_default() {
        azure_builder = match key.as_str() {
            "account_name" => azure_builder.with_account(&value),
            "account_key" => azure_builder.with_access_key(&value),
            "sas_token" => {
                // Parse SAS token as query string parameters; pairs without a
                // `=` are dropped, and only the first two segments are kept
                let query_pairs: Vec<(String, String)> = value
                    .split('&')
                    .filter_map(|pair| {
                        let mut segments = pair.split('=');
                        Some((segments.next()?.to_string(), segments.next()?.to_string()))
                    })
                    .collect();
                azure_builder.with_sas_authorization(query_pairs)
            }
            "client_id" => azure_builder.with_client_id(&value),
            "client_secret" => azure_builder.with_client_secret(&value),
            "tenant_id" => azure_builder.with_tenant_id(&value),
            _ => {
                // Ignore unknown options for forward compatibility
                log::warn!("Unknown Azure storage option: {key}");
                azure_builder
            }
        };
    }

    let store = azure_builder.build()?;
    Ok((Arc::new(store), path, uri.to_string()))
}
831
/// Helper function to create Azure object store from abfs:// URI with options.
///
/// Expects the form `abfs://container@account.dfs.core.windows.net/path`:
/// the account is the first dot-separated segment of the host and the
/// container is the userinfo portion of the URI. Recognized
/// `storage_options` are applied the same way as for `az://` URIs.
///
/// # Errors
///
/// Returns an error if the URI cannot be parsed, the host or account
/// segment is missing/empty, the container (userinfo) is empty, or the
/// Azure store builder rejects the configuration.
#[cfg(feature = "cloud")]
fn create_abfs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    // Extract account from host (account.dfs.core.windows.net).
    // `split('.').next()` always yields a segment, so additionally reject an
    // empty one (e.g. a host starting with '.').
    let account = host
        .split('.')
        .next()
        .filter(|segment| !segment.is_empty())
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    // Extract container from the userinfo part. `Url::username` never
    // includes the '@' separator, so the previous `split('@').next()` could
    // never fail — check for emptiness explicitly instead so a URI without a
    // container is actually rejected.
    let container = url.username();
    if container.is_empty() {
        anyhow::bail!("Invalid ABFS URI: missing container");
    }

    let mut builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    // Apply storage options if provided (same as Azure store)
    if let Some(options) = storage_options {
        for (key, value) in options {
            match key.as_str() {
                "account_name" => builder = builder.with_account(&value),
                "account_key" => builder = builder.with_access_key(&value),
                "sas_token" => {
                    // Parse the SAS token as query-string pairs. `split_once`
                    // splits on the FIRST '=' only, preserving any '=' inside
                    // the value (e.g. padded base64 signatures) that a naive
                    // split-on-every-'=' would truncate.
                    let query_pairs: Vec<(String, String)> = value
                        .split('&')
                        .filter_map(|pair| {
                            pair.split_once('=')
                                .map(|(k, v)| (k.to_string(), v.to_string()))
                        })
                        .collect();
                    builder = builder.with_sas_authorization(query_pairs);
                }
                "client_id" => builder = builder.with_client_id(&value),
                "client_secret" => builder = builder.with_client_secret(&value),
                "tenant_id" => builder = builder.with_tenant_id(&value),
                _ => {
                    // Ignore unknown options for forward compatibility
                    log::warn!("Unknown ABFS storage option: {key}");
                }
            }
        }
    }

    let azure_store = builder.build()?;
    Ok((Arc::new(azure_store), path, uri.to_string()))
}
902
/// Helper function to create HTTP object store with options.
///
/// Derives the store's base URL from the root (scheme + authority) of `uri`
/// and returns the built store together with the URL path component and the
/// original URI.
///
/// # Errors
///
/// Returns an error if the URI cannot be parsed or the HTTP store builder
/// rejects the configuration.
#[cfg(feature = "cloud")]
fn create_http_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (_, path) = parse_url_and_path(uri)?;
    let root = remote_store_root_url(uri)?;
    let base_url = root.as_str().trim_end_matches('/').to_string();

    let builder = object_store::http::HttpBuilder::new().with_url(base_url);

    // The HTTP builder has limited configuration options; most HTTP-specific
    // settings would be handled via client options. Warn on every provided
    // option but otherwise ignore them for forward compatibility.
    for unknown_key in storage_options.into_iter().flatten().map(|(k, _)| k) {
        log::warn!("Unknown HTTP storage option: {unknown_key}");
    }

    let http_store = builder.build()?;
    Ok((Arc::new(http_store), path, uri.to_string()))
}
930
/// Helper function to parse URL and extract path component.
///
/// Returns the parsed [`url::Url`] together with its path stripped of any
/// leading `/` characters.
#[cfg(feature = "cloud")]
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let parsed = url::Url::parse(uri)?;
    let relative_path = parsed.path().trim_start_matches('/').to_string();
    Ok((parsed, relative_path))
}
938
/// Helper function to extract host from URL with error handling.
///
/// Returns the host as an owned `String`, or an error carrying `error_msg`
/// when the URL has no host component.
#[cfg(feature = "cloud")]
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    match url.host_str() {
        Some(host) => Ok(host.to_string()),
        None => anyhow::bail!("{error_msg}"),
    }
}
946
#[cfg(test)]
mod tests {
    #[cfg(feature = "cloud")]
    use ahash::AHashMap;
    use rstest::rstest;

    use super::*;

    /// A local filesystem path is accepted, yields an empty base path, and
    /// the returned URI is normalized to `file://` form.
    #[rstest]
    fn test_create_object_store_from_path_local() {
        // Create a temporary directory for testing
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "");
        // The URI should be normalized to file:// format
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        // Clean up
        std::fs::remove_dir_all(&temp_dir).ok();
    }

    /// An `s3://` URI with full credential options parses into the expected
    /// base path and preserves the original URI.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_s3() {
        let mut options = AHashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    /// An `az://` URI with account credentials parses into the expected base
    /// path and preserves the original URI.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_azure() {
        let mut options = AHashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        // Use a valid base64 encoded key for testing
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string()); // "testkey" in base64

        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }

    /// A `gs://` URI parses correctly; store creation may still fail on
    /// missing credentials, in which case the error should mention the
    /// bucket or the credential problem.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_gcs() {
        // Test GCS without service account (will use default credentials or fail gracefully)
        let mut options = AHashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS might fail due to missing credentials, but we're testing the path parsing
        // The function should at least parse the URI correctly before failing on auth
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                // Expected to fail due to missing credentials, but should contain bucket info
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    /// Passing no storage options at all still succeeds for an S3 URI.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    /// `parse_url_and_path` splits an S3 URI into scheme, host, and a path
    /// with the leading slash stripped.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    /// The root URL keeps the full authority (port, userinfo) while dropping
    /// path/query/fragment, and `remote_full_uri` joins object paths onto it
    /// without double-encoding.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_remote_store_root_url_preserves_authority() {
        let https_root = remote_store_root_url("https://example.com:9000/base/path").unwrap();
        assert_eq!(
            https_root.as_str().trim_end_matches('/'),
            "https://example.com:9000"
        );

        let abfs_root =
            remote_store_root_url("abfs://container@account.dfs.core.windows.net/base/path")
                .unwrap();
        assert_eq!(
            abfs_root.as_str().trim_end_matches('/'),
            "abfs://container@account.dfs.core.windows.net"
        );

        let full_uri = remote_full_uri(
            "https://example.com:9000/base/path",
            "base/path/data/%5E/file.parquet",
        )
        .unwrap();
        assert_eq!(
            full_uri,
            "https://example.com:9000/base/path/data/%5E/file.parquet"
        );

        let location = create_object_store_location_from_path("s3://test-bucket/path", None)
            .expect("S3 location should be created");
        assert_eq!(location.base_path, "path");
        assert_eq!(
            location
                .store_root_url()
                .expect("S3 should be remote")
                .as_str()
                .trim_end_matches('/'),
            "s3://test-bucket"
        );
    }

    /// `extract_host` returns the bucket/host portion of a URI.
    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    /// Path normalization: Unix/Windows absolute paths become `file://`
    /// URIs, while inputs that are already URIs pass through unchanged.
    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute paths
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // Windows UNC paths
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // Already URIs - should remain unchanged
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    /// `is_absolute_path` recognizes Unix roots, Windows drive letters, and
    /// UNC shares, and rejects relative paths and degenerate inputs.
    #[rstest]
    fn test_is_absolute_path() {
        // Unix absolute paths
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        // Windows drive paths
        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        // Windows UNC paths
        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        // Relative paths
        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        // Edge cases
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    /// `path_to_file_uri` converts absolute paths to `file://` URIs, with
    /// backslashes normalized and UNC hosts placed in the authority.
    #[rstest]
    fn test_path_to_file_uri() {
        // Unix absolute paths
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        // Windows drive paths
        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        // Windows UNC paths
        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}