1use std::sync::Arc;
17
18use ahash::AHashMap;
19use arrow::record_batch::RecordBatch;
20use object_store::{ObjectStore, ObjectStoreExt, path::Path as ObjectPath};
21use parquet::{
22 arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
23 file::{
24 metadata::KeyValue,
25 properties::WriterProperties,
26 reader::{FileReader, SerializedFileReader},
27 statistics::Statistics,
28 },
29};
30use url::Url;
31
/// Returns `true` when `scheme` identifies a remote (non-local) storage backend.
pub(crate) fn is_remote_uri_scheme(scheme: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 7] = ["s3", "gs", "gcs", "az", "abfs", "http", "https"];
    REMOTE_SCHEMES.contains(&scheme)
}
38
39pub(crate) fn remote_store_root_url(uri: &str) -> anyhow::Result<Url> {
40 let mut url = Url::parse(uri)?;
41 url.set_path("");
42 url.set_query(None);
43 url.set_fragment(None);
44 Ok(url)
45}
46
47pub(crate) fn remote_full_uri(uri: &str, object_path: &str) -> anyhow::Result<String> {
48 let root = remote_store_root_url(uri)?;
49 let root = root.as_str().trim_end_matches('/');
50 let object_path = object_path.trim_start_matches('/');
51
52 if object_path.is_empty() {
53 Ok(root.to_string())
54 } else {
55 Ok(format!("{root}/{object_path}"))
56 }
57}
58
/// Whether an object-store location is backed by the local filesystem or a
/// remote store.
pub(crate) enum ObjectStoreLocationKind {
    /// Local filesystem (plain paths and `file://` URIs).
    Local,
    /// Remote store; `store_root_url` holds the URI's scheme + authority root.
    Remote { store_root_url: Url },
}
63
/// An object-store handle resolved from a user-supplied path or URI.
pub(crate) struct ObjectStoreLocation {
    /// The backing store implementation.
    pub object_store: Arc<dyn ObjectStore>,
    /// Store-relative base path (empty for local stores).
    pub base_path: String,
    /// The URI this location was created from.
    pub original_uri: String,
    /// Local vs remote classification.
    pub kind: ObjectStoreLocationKind,
}
70
71impl ObjectStoreLocation {
72 pub(crate) fn store_root_url(&self) -> Option<&Url> {
73 match &self.kind {
74 ObjectStoreLocationKind::Local => None,
75 ObjectStoreLocationKind::Remote { store_root_url } => Some(store_root_url),
76 }
77 }
78}
79
80pub async fn write_batch_to_parquet(
86 batch: RecordBatch,
87 path: &str,
88 storage_options: Option<AHashMap<String, String>>,
89 compression: Option<parquet::basic::Compression>,
90 max_row_group_size: Option<usize>,
91) -> anyhow::Result<()> {
92 write_batches_to_parquet(
93 &[batch],
94 path,
95 storage_options,
96 compression,
97 max_row_group_size,
98 )
99 .await
100}
101
/// Writes `batches` to a Parquet file at `path`, creating the backing object
/// store from the path scheme and optional storage options.
///
/// # Errors
///
/// Returns an error if store creation, Parquet encoding, or the upload fails.
pub async fn write_batches_to_parquet(
    batches: &[RecordBatch],
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
    compression: Option<parquet::basic::Compression>,
    max_row_group_size: Option<usize>,
) -> anyhow::Result<()> {
    let (object_store, base_path, _) = create_object_store_from_path(path, storage_options)?;
    // NOTE(review): when `base_path` is non-empty (remote URIs) this joins the
    // full user-supplied `path` under `base_path` — presumably callers pass
    // store-relative paths in that case; confirm against call sites.
    let object_path = if base_path.is_empty() {
        ObjectPath::from(path)
    } else {
        ObjectPath::from(format!("{base_path}/{path}"))
    };

    write_batches_to_object_store(
        batches,
        object_store,
        &object_path,
        compression,
        max_row_group_size,
        None,
    )
    .await
}
131
132pub async fn read_parquet_from_object_store(
141 object_store: Arc<dyn ObjectStore>,
142 path: &ObjectPath,
143) -> anyhow::Result<(Vec<RecordBatch>, Arc<arrow::datatypes::Schema>)> {
144 let result: object_store::GetResult = object_store.get(path).await?;
145 let data = result.bytes().await?;
146 if data.is_empty() {
147 return Ok((
148 Vec::new(),
149 Arc::new(arrow::datatypes::Schema::new(
150 Vec::<arrow::datatypes::Field>::new(),
151 )),
152 ));
153 }
154 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
155 let schema = builder.schema().clone();
156 let reader = builder.build()?;
157 let mut batches = Vec::new();
158 for batch in reader {
159 batches.push(batch?);
160 }
161 Ok((batches, schema))
162}
163
164pub async fn write_batches_to_object_store(
171 batches: &[RecordBatch],
172 object_store: Arc<dyn ObjectStore>,
173 path: &ObjectPath,
174 compression: Option<parquet::basic::Compression>,
175 max_row_group_size: Option<usize>,
176 key_value_metadata: Option<Vec<KeyValue>>,
177) -> anyhow::Result<()> {
178 let mut buffer = Vec::new();
180
181 let mut props_builder = WriterProperties::builder()
182 .set_compression(compression.unwrap_or(parquet::basic::Compression::SNAPPY))
183 .set_max_row_group_row_count(Some(max_row_group_size.unwrap_or(5000)));
184
185 if let Some(kv) = key_value_metadata {
186 props_builder = props_builder.set_key_value_metadata(Some(kv));
187 }
188 let writer_props = props_builder.build();
189
190 let mut writer = ArrowWriter::try_new(&mut buffer, batches[0].schema(), Some(writer_props))?;
191 for batch in batches {
192 writer.write(batch)?;
193 }
194 writer.close()?;
195
196 object_store.put(path, buffer.into()).await?;
198
199 Ok(())
200}
201
202fn deduplicate_record_batches(batches: &[RecordBatch]) -> anyhow::Result<Vec<RecordBatch>> {
212 if batches.is_empty() {
213 return Ok(Vec::new());
214 }
215
216 let schema = batches[0].schema();
217
218 let fields: Vec<arrow_row::SortField> = schema
219 .fields()
220 .iter()
221 .map(|f| arrow_row::SortField::new(f.data_type().clone()))
222 .collect();
223
224 let converter = arrow_row::RowConverter::new(fields)?;
225 let mut seen: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
226 let mut result: Vec<RecordBatch> = Vec::new();
227
228 for batch in batches {
229 let rows = converter.convert_columns(batch.columns())?;
230 let mut indices: Vec<u32> = Vec::new();
231
232 for (i, row) in rows.iter().enumerate() {
233 if seen.insert(row.as_ref().to_vec()) {
234 indices.push(i as u32);
235 }
236 }
237
238 if !indices.is_empty() {
239 let index_array = arrow::array::UInt32Array::from(indices);
240 let deduped_columns: Vec<arrow::array::ArrayRef> = batch
241 .columns()
242 .iter()
243 .map(|col| arrow::compute::take(col.as_ref(), &index_array, None))
244 .collect::<Result<_, _>>()?;
245 result.push(RecordBatch::try_new(schema.clone(), deduped_columns)?);
246 }
247 }
248
249 Ok(result)
250}
251
252pub async fn combine_parquet_files(
258 file_paths: Vec<&str>,
259 new_file_path: &str,
260 storage_options: Option<AHashMap<String, String>>,
261 compression: Option<parquet::basic::Compression>,
262 max_row_group_size: Option<usize>,
263 deduplicate: Option<bool>,
264) -> anyhow::Result<()> {
265 if file_paths.len() <= 1 {
266 return Ok(());
267 }
268
269 let (object_store, base_path, _) =
271 create_object_store_from_path(file_paths[0], storage_options)?;
272
273 let object_paths: Vec<ObjectPath> = file_paths
275 .iter()
276 .map(|path| {
277 if base_path.is_empty() {
278 ObjectPath::from(*path)
279 } else {
280 ObjectPath::from(format!("{base_path}/{path}"))
281 }
282 })
283 .collect();
284
285 let new_object_path = if base_path.is_empty() {
286 ObjectPath::from(new_file_path)
287 } else {
288 ObjectPath::from(format!("{base_path}/{new_file_path}"))
289 };
290
291 combine_parquet_files_from_object_store(
292 object_store,
293 object_paths,
294 &new_object_path,
295 compression,
296 max_row_group_size,
297 deduplicate,
298 )
299 .await
300}
301
302pub async fn combine_parquet_files_from_object_store(
308 object_store: Arc<dyn ObjectStore>,
309 file_paths: Vec<ObjectPath>,
310 new_file_path: &ObjectPath,
311 compression: Option<parquet::basic::Compression>,
312 max_row_group_size: Option<usize>,
313 deduplicate: Option<bool>,
314) -> anyhow::Result<()> {
315 if file_paths.len() <= 1 {
316 return Ok(());
317 }
318
319 let mut all_batches: Vec<RecordBatch> = Vec::new();
320 let mut schema_with_metadata: Option<Arc<arrow::datatypes::Schema>> = None;
321
322 for path in &file_paths {
324 let result: object_store::GetResult = object_store.get(path).await?;
325 let data = result.bytes().await?;
326 let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
327
328 if schema_with_metadata.is_none() {
334 schema_with_metadata = Some(builder.schema().clone());
335 }
336
337 let mut reader = builder.build()?;
338
339 for batch in reader.by_ref() {
340 all_batches.push(batch?);
341 }
342 }
343
344 if let Some(schema) = &schema_with_metadata {
348 all_batches = all_batches
349 .into_iter()
350 .map(|b| {
351 RecordBatch::try_new(schema.clone(), b.columns().to_vec())
352 .expect("schema re-application failed")
353 })
354 .collect();
355 }
356
357 let batches_to_write = if deduplicate.unwrap_or(false) {
359 deduplicate_record_batches(&all_batches)?
360 } else {
361 all_batches
362 };
363
364 write_batches_to_object_store(
366 &batches_to_write,
367 object_store.clone(),
368 new_file_path,
369 compression,
370 max_row_group_size,
371 None,
372 )
373 .await?;
374
375 for path in &file_paths {
377 if path != new_file_path {
378 object_store.delete(path).await?;
379 }
380 }
381
382 Ok(())
383}
384
385pub async fn min_max_from_parquet_metadata(
391 file_path: &str,
392 storage_options: Option<AHashMap<String, String>>,
393 column_name: &str,
394) -> anyhow::Result<(u64, u64)> {
395 let (object_store, base_path, _) = create_object_store_from_path(file_path, storage_options)?;
396 let object_path = if base_path.is_empty() {
397 ObjectPath::from(file_path)
398 } else {
399 ObjectPath::from(format!("{base_path}/{file_path}"))
400 };
401
402 min_max_from_parquet_metadata_object_store(object_store, &object_path, column_name).await
403}
404
405pub async fn min_max_from_parquet_metadata_object_store(
411 object_store: Arc<dyn ObjectStore>,
412 file_path: &ObjectPath,
413 column_name: &str,
414) -> anyhow::Result<(u64, u64)> {
415 let result: object_store::GetResult = object_store.get(file_path).await?;
417 let data = result.bytes().await?;
418 let reader = SerializedFileReader::new(data)?;
419
420 let metadata = reader.metadata();
421 let mut overall_min_value: Option<i64> = None;
422 let mut overall_max_value: Option<i64> = None;
423
424 for i in 0..metadata.num_row_groups() {
426 let row_group = metadata.row_group(i);
427
428 for j in 0..row_group.num_columns() {
430 let col_metadata = row_group.column(j);
431
432 if col_metadata.column_path().string() == column_name {
433 if let Some(stats) = col_metadata.statistics() {
434 if let Statistics::Int64(int64_stats) = stats {
436 if let Some(&min_value) = int64_stats.min_opt()
438 && (overall_min_value.is_none()
439 || min_value < overall_min_value.unwrap())
440 {
441 overall_min_value = Some(min_value);
442 }
443
444 if let Some(&max_value) = int64_stats.max_opt()
446 && (overall_max_value.is_none()
447 || max_value > overall_max_value.unwrap())
448 {
449 overall_max_value = Some(max_value);
450 }
451 } else {
452 anyhow::bail!("Warning: Column name '{column_name}' is not of type i64.");
453 }
454 } else {
455 anyhow::bail!(
456 "Warning: Statistics not available for column '{column_name}' in row group {i}."
457 );
458 }
459 }
460 }
461 }
462
463 if let (Some(min), Some(max)) = (overall_min_value, overall_max_value) {
465 Ok((min as u64, max as u64))
466 } else {
467 anyhow::bail!(
468 "Column '{column_name}' not found or has no Int64 statistics in any row group."
469 )
470 }
471}
472
473pub fn create_object_store_from_path(
492 path: &str,
493 storage_options: Option<AHashMap<String, String>>,
494) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
495 let location = create_object_store_location_from_path(path, storage_options)?;
496 Ok((
497 location.object_store,
498 location.base_path,
499 location.original_uri,
500 ))
501}
502
/// Resolves `path` (plain filesystem path or URI) into an object-store
/// location, selecting the backend implementation from the URI scheme.
///
/// # Errors
///
/// Returns an error if a cloud scheme is used without the `cloud` feature or
/// if the backing store cannot be constructed.
#[cfg_attr(
    not(feature = "cloud"),
    allow(unused_variables, clippy::needless_pass_by_value)
)]
pub(crate) fn create_object_store_location_from_path(
    path: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<ObjectStoreLocation> {
    // Plain filesystem paths are first normalized to `file://` URIs.
    let uri = normalize_path_to_uri(path);

    let (object_store, base_path, original_uri) = match uri.as_str() {
        #[cfg(feature = "cloud")]
        s if s.starts_with("s3://") => create_s3_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("gs://") || s.starts_with("gcs://") => {
            create_gcs_store(&uri, storage_options)
        }
        #[cfg(feature = "cloud")]
        s if s.starts_with("az://") => create_azure_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("abfs://") => create_abfs_store(&uri, storage_options),
        #[cfg(feature = "cloud")]
        s if s.starts_with("http://") || s.starts_with("https://") => {
            create_http_store(&uri, storage_options)
        }
        // Without the `cloud` feature, any remote scheme is a hard error.
        #[cfg(not(feature = "cloud"))]
        s if s.starts_with("s3://")
            || s.starts_with("gs://")
            || s.starts_with("gcs://")
            || s.starts_with("az://")
            || s.starts_with("abfs://")
            || s.starts_with("http://")
            || s.starts_with("https://") =>
        {
            anyhow::bail!("Cloud storage support requires the 'cloud' feature: {uri}")
        }
        s if s.starts_with("file://") => create_local_store(&uri, true),
        _ => create_local_store(&uri, false),
    }?;

    // Remote URIs record their scheme + authority root; everything else is local.
    let kind = Url::parse(&original_uri)
        .ok()
        .filter(|url| is_remote_uri_scheme(url.scheme()))
        .map(|_| {
            remote_store_root_url(&original_uri)
                .map(|store_root_url| ObjectStoreLocationKind::Remote { store_root_url })
        })
        .transpose()?
        .unwrap_or(ObjectStoreLocationKind::Local);

    Ok(ObjectStoreLocation {
        object_store,
        base_path,
        original_uri,
        kind,
    })
}
562
563#[must_use]
582pub fn normalize_path_to_uri(path: &str) -> String {
583 if path.contains("://") {
584 path.to_string()
586 } else {
587 if is_absolute_path(path) {
589 path_to_file_uri(path)
590 } else {
591 let absolute_path = std::env::current_dir().unwrap().join(path);
593 path_to_file_uri(&absolute_path.to_string_lossy())
594 }
595 }
596}
597
/// Returns `true` for Unix absolute (`/...`), UNC (`\\server\...`), and
/// Windows drive (`C:\...` or `C:/...`) paths.
#[must_use]
fn is_absolute_path(path: &str) -> bool {
    // Unix absolute or UNC prefixes.
    if path.starts_with('/') || path.starts_with("\\\\") {
        return true;
    }
    // Windows drive paths: "X:\..." or "X:/..." (needs at least 3 chars).
    path.len() >= 3
        && path.chars().nth(1) == Some(':')
        && matches!(path.chars().nth(2), Some('\\' | '/'))
}
623
/// Converts a native filesystem path into a `file://` URI.
///
/// Windows separators are normalized to `/`; UNC hosts become the URI
/// authority.
#[must_use]
fn path_to_file_uri(path: &str) -> String {
    // Unix absolute path: appended verbatim. Checked first so that paths
    // starting with '/' never fall into the drive-letter branch.
    if path.starts_with('/') {
        return format!("file://{path}");
    }
    // Windows drive path ("C:..."): extra slash before the drive letter.
    if path.len() >= 3 && path.chars().nth(1) == Some(':') {
        return format!("file:///{}", path.replace('\\', "/"));
    }
    // UNC path: the server name becomes the URI authority.
    if let Some(unc) = path.strip_prefix("\\\\") {
        return format!("file://{}", unc.replace('\\', "/"));
    }
    format!("file://{path}")
}
643
/// Converts a `file://` URI back to a native Windows path.
#[cfg(windows)]
pub(crate) fn file_uri_to_native_path(uri: &str) -> String {
    // Drop the scheme ("file://" or bare "file:"), then any leading slashes,
    // then convert URI separators to Windows separators.
    let stripped = uri
        .strip_prefix("file://")
        .or_else(|| uri.strip_prefix("file:"))
        .unwrap_or(uri);
    stripped.trim_start_matches('/').replace('/', "\\")
}
656
/// Converts a `file://` URI back to a native path.
#[cfg(not(windows))]
pub(crate) fn file_uri_to_native_path(uri: &str) -> String {
    // On Unix the URI path is already the native path; just drop the scheme.
    match uri.strip_prefix("file://") {
        Some(native) => native.to_string(),
        None => uri.to_string(),
    }
}
662
663fn create_local_store(
665 uri: &str,
666 is_file_uri: bool,
667) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
668 let path = if is_file_uri {
669 file_uri_to_native_path(uri)
670 } else {
671 uri.to_string()
672 };
673
674 let local_store = object_store::local::LocalFileSystem::new_with_prefix(&path)?;
675 Ok((Arc::new(local_store), String::new(), uri.to_string()))
676}
677
/// Builds an S3 store for `uri`, applying recognized `storage_options` keys.
///
/// # Errors
///
/// Returns an error if the URI is invalid or the builder fails.
#[cfg(feature = "cloud")]
fn create_s3_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, base_path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid S3 URI: missing bucket")?;

    let mut s3 = object_store::aws::AmazonS3Builder::new().with_bucket_name(&bucket);

    for (option, value) in storage_options.unwrap_or_default() {
        s3 = match option.as_str() {
            "endpoint_url" => s3.with_endpoint(&value),
            "region" => s3.with_region(&value),
            "access_key_id" | "key" => s3.with_access_key_id(&value),
            "secret_access_key" | "secret" => s3.with_secret_access_key(&value),
            "session_token" | "token" => s3.with_token(&value),
            "allow_http" => s3.with_allow_http(value.to_lowercase() == "true"),
            _ => {
                log::warn!("Unknown S3 storage option: {option}");
                s3
            }
        };
    }

    Ok((Arc::new(s3.build()?), base_path, uri.to_string()))
}
723
/// Builds a GCS store for `uri`, applying recognized `storage_options` keys.
///
/// # Errors
///
/// Returns an error if the URI is invalid or the builder fails.
#[cfg(feature = "cloud")]
fn create_gcs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, base_path) = parse_url_and_path(uri)?;
    let bucket = extract_host(&url, "Invalid GCS URI: missing bucket")?;

    let mut gcs = object_store::gcp::GoogleCloudStorageBuilder::new().with_bucket_name(&bucket);

    for (option, value) in storage_options.unwrap_or_default() {
        gcs = match option.as_str() {
            "service_account_path" => gcs.with_service_account_path(&value),
            "service_account_key" => gcs.with_service_account_key(&value),
            "project_id" => {
                log::warn!(
                    "project_id should be set via service account or environment variables"
                );
                gcs
            }
            "application_credentials" => {
                // SAFETY: process-global environment mutation, mirroring the
                // documented GCP SDK mechanism for locating credentials.
                unsafe {
                    std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", &value);
                }
                gcs
            }
            _ => {
                log::warn!("Unknown GCS storage option: {option}");
                gcs
            }
        };
    }

    Ok((Arc::new(gcs.build()?), base_path, uri.to_string()))
}
772
/// Builds an Azure blob store for `uri`, applying recognized
/// `storage_options` keys.
///
/// # Errors
///
/// Returns an error if the URI is invalid or the builder fails.
#[cfg(feature = "cloud")]
fn create_azure_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, _) = parse_url_and_path(uri)?;
    let container = extract_host(&url, "Invalid Azure URI: missing container")?;

    let base_path = url.path().trim_start_matches('/').to_string();

    let mut azure =
        object_store::azure::MicrosoftAzureBuilder::new().with_container_name(container);

    for (option, value) in storage_options.unwrap_or_default() {
        azure = match option.as_str() {
            "account_name" => azure.with_account(&value),
            "account_key" => azure.with_access_key(&value),
            "sas_token" => {
                // Split "k1=v1&k2=v2" into pairs; entries without '=' are
                // dropped and only the first '='-separated value is kept.
                let pairs: Vec<(String, String)> = value
                    .split('&')
                    .filter_map(|pair| {
                        let mut parts = pair.split('=');
                        match (parts.next(), parts.next()) {
                            (Some(k), Some(v)) => Some((k.to_string(), v.to_string())),
                            _ => None,
                        }
                    })
                    .collect();
                azure.with_sas_authorization(pairs)
            }
            "client_id" => azure.with_client_id(&value),
            "client_secret" => azure.with_client_secret(&value),
            "tenant_id" => azure.with_tenant_id(&value),
            _ => {
                log::warn!("Unknown Azure storage option: {option}");
                azure
            }
        };
    }

    Ok((Arc::new(azure.build()?), base_path, uri.to_string()))
}
831
/// Builds an Azure store from an `abfs://container@account.host/path` URI,
/// applying recognized `storage_options` keys.
///
/// # Errors
///
/// Returns an error if the URI is invalid or the builder fails.
#[cfg(feature = "cloud")]
fn create_abfs_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (url, base_path) = parse_url_and_path(uri)?;
    let host = extract_host(&url, "Invalid ABFS URI: missing host")?;

    // The account is the first dot-separated label of the host.
    let account = host
        .split('.')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: cannot extract account from host"))?;

    // The container rides in the userinfo part: abfs://<container>@<host>/<path>.
    let container = url
        .username()
        .split('@')
        .next()
        .ok_or_else(|| anyhow::anyhow!("Invalid ABFS URI: missing container"))?;

    let mut azure = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_container_name(container);

    for (option, value) in storage_options.unwrap_or_default() {
        azure = match option.as_str() {
            "account_name" => azure.with_account(&value),
            "account_key" => azure.with_access_key(&value),
            "sas_token" => {
                // Split "k1=v1&k2=v2" into pairs; entries without '=' are
                // dropped and only the first '='-separated value is kept.
                let pairs: Vec<(String, String)> = value
                    .split('&')
                    .filter_map(|pair| {
                        let mut parts = pair.split('=');
                        match (parts.next(), parts.next()) {
                            (Some(k), Some(v)) => Some((k.to_string(), v.to_string())),
                            _ => None,
                        }
                    })
                    .collect();
                azure.with_sas_authorization(pairs)
            }
            "client_id" => azure.with_client_id(&value),
            "client_secret" => azure.with_client_secret(&value),
            "tenant_id" => azure.with_tenant_id(&value),
            _ => {
                log::warn!("Unknown ABFS storage option: {option}");
                azure
            }
        };
    }

    Ok((Arc::new(azure.build()?), base_path, uri.to_string()))
}
902
/// Builds an HTTP(S) store rooted at the scheme + authority of `uri`.
///
/// # Errors
///
/// Returns an error if the URI is invalid or the builder fails.
#[cfg(feature = "cloud")]
fn create_http_store(
    uri: &str,
    storage_options: Option<AHashMap<String, String>>,
) -> anyhow::Result<(Arc<dyn ObjectStore>, String, String)> {
    let (_, base_path) = parse_url_and_path(uri)?;

    let root = remote_store_root_url(uri)?
        .as_str()
        .trim_end_matches('/')
        .to_string();

    let builder = object_store::http::HttpBuilder::new().with_url(root);

    // No HTTP-specific options are supported; warn on anything supplied.
    for (option, _) in storage_options.unwrap_or_default() {
        log::warn!("Unknown HTTP storage option: {option}");
    }

    Ok((Arc::new(builder.build()?), base_path, uri.to_string()))
}
930
/// Parses `uri` and returns the URL plus its path with leading `/` stripped.
///
/// # Errors
///
/// Returns an error if `uri` is not a valid URL.
#[cfg(feature = "cloud")]
fn parse_url_and_path(uri: &str) -> anyhow::Result<(url::Url, String)> {
    let parsed = url::Url::parse(uri)?;
    let relative = parsed.path().trim_start_matches('/').to_string();
    Ok((parsed, relative))
}
938
/// Returns the host component of `url`, or `error_msg` as an error when the
/// URL has no host.
#[cfg(feature = "cloud")]
fn extract_host(url: &url::Url, error_msg: &str) -> anyhow::Result<String> {
    match url.host_str() {
        Some(host) => Ok(host.to_string()),
        None => Err(anyhow::anyhow!("{error_msg}")),
    }
}
946
#[cfg(test)]
mod tests {
    #[cfg(feature = "cloud")]
    use ahash::AHashMap;
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_create_object_store_from_path_local() {
        let temp_dir = std::env::temp_dir().join("nautilus_test");
        std::fs::create_dir_all(&temp_dir).unwrap();

        let result = create_object_store_from_path(temp_dir.to_str().unwrap(), None);
        if let Err(e) = &result {
            println!("Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        // Local stores report an empty base path and echo a `file://` URI.
        assert_eq!(base_path, "");
        assert_eq!(uri, format!("file://{}", temp_dir.to_str().unwrap()));

        std::fs::remove_dir_all(&temp_dir).ok();
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_s3() {
        let mut options = AHashMap::new();
        options.insert(
            "endpoint_url".to_string(),
            "https://test.endpoint.com".to_string(),
        );
        options.insert("region".to_string(), "us-west-2".to_string());
        options.insert("access_key_id".to_string(), "test_key".to_string());
        options.insert("secret_access_key".to_string(), "test_secret".to_string());

        let result = create_object_store_from_path("s3://test-bucket/path", Some(options));
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        // Remote stores expose the key prefix as the base path.
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_azure() {
        let mut options = AHashMap::new();
        options.insert("account_name".to_string(), "testaccount".to_string());
        options.insert("account_key".to_string(), "dGVzdGtleQ==".to_string());

        let result = create_object_store_from_path("az://container/path", Some(options));
        if let Err(e) = &result {
            println!("Azure Error: {e:?}");
        }
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "az://container/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_gcs() {
        let mut options = AHashMap::new();
        options.insert("project_id".to_string(), "test-project".to_string());

        let result = create_object_store_from_path("gs://test-bucket/path", Some(options));
        // GCS construction may fail without credentials; accept both outcomes.
        match result {
            Ok((_, base_path, uri)) => {
                assert_eq!(base_path, "path");
                assert_eq!(uri, "gs://test-bucket/path");
            }
            Err(e) => {
                let error_msg = format!("{e:?}");
                assert!(error_msg.contains("test-bucket") || error_msg.contains("credential"));
            }
        }
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_create_object_store_from_path_empty_options() {
        let result = create_object_store_from_path("s3://test-bucket/path", None);
        assert!(result.is_ok());
        let (_, base_path, uri) = result.unwrap();
        assert_eq!(base_path, "path");
        assert_eq!(uri, "s3://test-bucket/path");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_parse_url_and_path() {
        let result = parse_url_and_path("s3://bucket/path/to/file");
        assert!(result.is_ok());
        let (url, path) = result.unwrap();
        assert_eq!(url.scheme(), "s3");
        assert_eq!(url.host_str().unwrap(), "bucket");
        assert_eq!(path, "path/to/file");
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_remote_store_root_url_preserves_authority() {
        // Ports and userinfo must survive root-URL extraction.
        let https_root = remote_store_root_url("https://example.com:9000/base/path").unwrap();
        assert_eq!(
            https_root.as_str().trim_end_matches('/'),
            "https://example.com:9000"
        );

        let abfs_root =
            remote_store_root_url("abfs://container@account.dfs.core.windows.net/base/path")
                .unwrap();
        assert_eq!(
            abfs_root.as_str().trim_end_matches('/'),
            "abfs://container@account.dfs.core.windows.net"
        );

        // Percent-encoded segments in the object path must pass through intact.
        let full_uri = remote_full_uri(
            "https://example.com:9000/base/path",
            "base/path/data/%5E/file.parquet",
        )
        .unwrap();
        assert_eq!(
            full_uri,
            "https://example.com:9000/base/path/data/%5E/file.parquet"
        );

        let location = create_object_store_location_from_path("s3://test-bucket/path", None)
            .expect("S3 location should be created");
        assert_eq!(location.base_path, "path");
        assert_eq!(
            location
                .store_root_url()
                .expect("S3 should be remote")
                .as_str()
                .trim_end_matches('/'),
            "s3://test-bucket"
        );
    }

    #[rstest]
    #[cfg(feature = "cloud")]
    fn test_extract_host() {
        let url = url::Url::parse("s3://test-bucket/path").unwrap();
        let result = extract_host(&url, "Test error");
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), "test-bucket");
    }

    #[rstest]
    fn test_normalize_path_to_uri() {
        // Unix absolute paths.
        assert_eq!(normalize_path_to_uri("/tmp/test"), "file:///tmp/test");

        // Windows drive paths (both separator styles).
        assert_eq!(
            normalize_path_to_uri("C:\\tmp\\test"),
            "file:///C:/tmp/test"
        );
        assert_eq!(normalize_path_to_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(
            normalize_path_to_uri("D:\\data\\file.txt"),
            "file:///D:/data/file.txt"
        );

        // UNC paths: the server maps into the URI authority.
        assert_eq!(
            normalize_path_to_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );

        // Existing URIs pass through unchanged.
        assert_eq!(
            normalize_path_to_uri("s3://bucket/path"),
            "s3://bucket/path"
        );
        assert_eq!(
            normalize_path_to_uri("file:///tmp/test"),
            "file:///tmp/test"
        );
        assert_eq!(
            normalize_path_to_uri("https://example.com/path"),
            "https://example.com/path"
        );
    }

    #[rstest]
    fn test_is_absolute_path() {
        assert!(is_absolute_path("/tmp/test"));
        assert!(is_absolute_path("/"));

        assert!(is_absolute_path("C:\\tmp\\test"));
        assert!(is_absolute_path("C:/tmp/test"));
        assert!(is_absolute_path("D:\\"));
        assert!(is_absolute_path("Z:/"));

        assert!(is_absolute_path("\\\\server\\share"));
        assert!(is_absolute_path("\\\\localhost\\c$"));

        assert!(!is_absolute_path("tmp/test"));
        assert!(!is_absolute_path("./test"));
        assert!(!is_absolute_path("../test"));
        assert!(!is_absolute_path("test.txt"));

        // Degenerate inputs are never absolute.
        assert!(!is_absolute_path(""));
        assert!(!is_absolute_path("C"));
        assert!(!is_absolute_path("C:"));
        assert!(!is_absolute_path("\\"));
    }

    #[rstest]
    fn test_path_to_file_uri() {
        assert_eq!(path_to_file_uri("/tmp/test"), "file:///tmp/test");
        assert_eq!(path_to_file_uri("/"), "file:///");

        assert_eq!(path_to_file_uri("C:\\tmp\\test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("C:/tmp/test"), "file:///C:/tmp/test");
        assert_eq!(path_to_file_uri("D:\\"), "file:///D:/");

        assert_eq!(
            path_to_file_uri("\\\\server\\share\\file"),
            "file://server/share/file"
        );
        assert_eq!(
            path_to_file_uri("\\\\localhost\\c$\\test"),
            "file://localhost/c$/test"
        );
    }
}