nautilus_persistence/backend/custom.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this code except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Custom data persistence: shared helpers and orchestration.
//!
//! Centralizes the logic for appending the `data_type` column and metadata to Arrow batches
//! (Parquet/Feather), along with custom-data write preparation, path construction, and decode
//! logic, so the catalog delegates here instead of inlining custom-specific branching.

use std::{collections::HashMap, sync::Arc};

use datafusion::arrow::{
    array::{Array, StringArray},
    datatypes::{DataType as ArrowDataType, Field, Schema},
    record_batch::RecordBatch,
};
use nautilus_core::UnixNanos;
use nautilus_model::data::{
    Bar, CustomData, CustomDataTrait, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
    OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose, encode_custom_to_arrow,
};
use nautilus_serialization::arrow::DecodeDataFromRecordBatch;
#[cfg(feature = "python")]
use nautilus_serialization::arrow::custom::CustomDataDecoder;

/// Builds a schema that adds the `data_type` column and `type_name` metadata to a base schema.
/// Used when creating a Feather buffer for custom data (single type per writer).
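///
/// # Examples
///
/// A minimal sketch; the base schema here is illustrative, not taken from a real codec:
///
/// ```ignore
/// use datafusion::arrow::datatypes::{DataType, Field, Schema};
///
/// let base = Schema::new(vec![Field::new("value", DataType::Float64, false)]);
/// let schema = schema_with_data_type_column(&base, "MyCustomData");
///
/// // Original fields are kept, a non-nullable `data_type` Utf8 column is appended,
/// // and `type_name` is recorded in the schema metadata.
/// assert_eq!(schema.fields().len(), 2);
/// assert_eq!(schema.metadata()["type_name"], "MyCustomData");
/// ```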
#[must_use]
pub fn schema_with_data_type_column(base_schema: &Schema, type_name: &str) -> Schema {
    let mut fields: Vec<_> = base_schema.fields().iter().cloned().collect();
    fields.push(Arc::new(Field::new(
        "data_type",
        ArrowDataType::Utf8,
        false,
    )));
    let mut meta = base_schema.metadata().clone();
    meta.insert("type_name".to_string(), type_name.to_string());
    Schema::new_with_metadata(fields, meta)
}

/// Appends a `data_type` column (one JSON string per row) and adds `type_name` plus optional
/// metadata to the batch schema. Used by both the Parquet catalog and the Feather writer for
/// catalog-compatible output.
///
/// # Errors
///
/// Returns an error if the new `RecordBatch` cannot be created.
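///
/// # Examples
///
/// A minimal sketch; the batch contents and the JSON payload are illustrative:
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion::arrow::{
///     array::Float64Array,
///     datatypes::{DataType, Field, Schema},
///     record_batch::RecordBatch,
/// };
///
/// let schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Float64, false)]));
/// let batch = RecordBatch::try_new(schema, vec![Arc::new(Float64Array::from(vec![1.0, 2.0]))])?;
///
/// let augmented = augment_batch_with_data_type_column(
///     &batch,
///     r#"{"type":"MyCustomData"}"#, // serialized data type, repeated on every row
///     "MyCustomData",
///     None,
/// )?;
/// assert_eq!(augmented.num_columns(), batch.num_columns() + 1);
/// ```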
pub fn augment_batch_with_data_type_column(
    batch: &RecordBatch,
    data_type_json: &str,
    type_name: &str,
    dt_meta: Option<&HashMap<String, String>>,
) -> anyhow::Result<RecordBatch> {
    let num_rows = batch.num_rows();
    let data_type_array: Arc<dyn Array> = Arc::new(StringArray::from(
        (0..num_rows)
            .map(|_| Some(data_type_json))
            .collect::<Vec<_>>(),
    ));
    let schema = batch.schema();
    let mut fields: Vec<_> = schema.fields().iter().cloned().collect();
    fields.push(Arc::new(Field::new(
        "data_type",
        ArrowDataType::Utf8,
        false,
    )));
    let mut meta = schema.metadata().clone();
    meta.insert("type_name".to_string(), type_name.to_string());

    if let Some(m) = dt_meta {
        meta.extend(m.clone());
    }
    let new_schema = Arc::new(Schema::new_with_metadata(fields, meta));
    let mut columns = batch.columns().to_vec();
    columns.push(data_type_array);
    let new_batch = RecordBatch::try_new(new_schema, columns)
        .map_err(|e| anyhow::anyhow!("Failed to merge custom data type metadata: {e}"))?;
    Ok(new_batch)
}

/// Normalizes a custom data identifier for use in directory paths.
/// Replaces `//` with `/`, and filters out empty segments and `..` to prevent path traversal.
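///
/// # Examples
///
/// Illustrative inputs (module-private, so not runnable as a doctest):
///
/// ```ignore
/// assert_eq!(safe_directory_identifier("BINANCE//BTC-USDT"), "BINANCE/BTC-USDT");
/// assert_eq!(safe_directory_identifier("../secret"), "secret");
/// ```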
#[must_use]
fn safe_directory_identifier(identifier: &str) -> String {
    let normalized = identifier.replace("//", "/");
    let segments: Vec<&str> = normalized
        .split('/')
        .filter(|s| !s.is_empty() && *s != "..")
        .collect();
    segments.join("/")
}

/// Returns path components for custom data: `["data", "custom", type_name, ...identifier segments]`.
/// Used by the catalog to build full object-store paths via `make_object_store_path_owned`.
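///
/// # Examples
///
/// A sketch with an illustrative type name and identifier:
///
/// ```ignore
/// let components = custom_data_path_components("MyCustomData", Some("BINANCE/BTC-USDT"));
/// assert_eq!(components, vec!["data", "custom", "MyCustomData", "BINANCE", "BTC-USDT"]);
/// ```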
#[must_use]
pub fn custom_data_path_components(type_name: &str, identifier: Option<&str>) -> Vec<String> {
    let mut components = vec![
        "data".to_string(),
        "custom".to_string(),
        type_name.to_string(),
    ];

    if let Some(id) = identifier {
        let safe = safe_directory_identifier(id);
        if !safe.is_empty() {
            for segment in safe.split('/') {
                components.push(segment.to_string());
            }
        }
    }
    components
}

/// Prepares a batch of custom data for writing: encodes it to Arrow, augments it with the
/// `data_type` column, and returns the type identity and timestamp range so the catalog can
/// build the path and perform I/O.
///
/// # Errors
///
/// Returns an error if encoding or augmentation fails, or if the type is not registered.
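///
/// # Examples
///
/// A sketch of the expected call site; `load_my_custom_data` is hypothetical, and the concrete
/// type must be registered for Arrow encoding before writing:
///
/// ```ignore
/// let data: Vec<CustomData> = load_my_custom_data(); // hypothetical source of custom data
/// let (batch, type_name, identifier, start_ts, end_ts) = prepare_custom_data_batch(data)?;
///
/// // The catalog can now derive the object-store path and write `batch`.
/// let components = custom_data_path_components(&type_name, identifier.as_deref());
/// ```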
pub fn prepare_custom_data_batch(
    data: Vec<CustomData>,
) -> anyhow::Result<(RecordBatch, String, Option<String>, UnixNanos, UnixNanos)> {
    if data.is_empty() {
        anyhow::bail!("prepare_custom_data_batch called with empty data");
    }

    let first_custom = data.first().unwrap();
    let type_name = first_custom.data.type_name();
    let identifier = first_custom.data_type.identifier().map(String::from);
    let dt_meta = first_custom.data_type.metadata_string_map();
    let data_type_json = first_custom
        .data_type
        .to_persistence_json()
        .map_err(|e| anyhow::anyhow!("Failed to serialize data_type for persistence: {e}"))?;

    let items: Vec<Arc<dyn CustomDataTrait>> =
        data.into_iter().map(|c| Arc::clone(&c.data)).collect();
    let first = items.first().unwrap();

    let start_ts = first.ts_init();
    let end_ts = items.last().unwrap().ts_init();

    let batch = encode_custom_to_arrow(type_name, &items)
        .map_err(|e| anyhow::anyhow!("Failed to encode custom data to Arrow: {e}"))?
        .ok_or_else(|| {
            anyhow::anyhow!(
                "Custom data type \"{type_name}\" is not registered for Arrow encoding; \
                 call register_custom_data_class or ensure_custom_data_registered before writing"
            )
        })?;
    let batch =
        augment_batch_with_data_type_column(&batch, &data_type_json, type_name, dt_meta.as_ref())?;

    Ok((batch, type_name.to_string(), identifier, start_ts, end_ts))
}

/// Decodes a `RecordBatch` into `Data` objects based on metadata.
///
/// Supports both standard data types and custom data types when `allow_custom_fallback`
/// is true (e.g. when decoding files under `custom/`). When false, unknown type names
/// produce an error instead of attempting a custom decode.
///
/// # Errors
///
/// Returns an error if decoding fails or the type is unknown (and custom fallback is not allowed).
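///
/// # Examples
///
/// A sketch, assuming `batch` came from a catalog file whose schema metadata carries
/// `type_name` (e.g. `"quotes"`):
///
/// ```ignore
/// let metadata = batch.schema().metadata().clone();
/// let data = decode_batch_to_data(&metadata, batch, false)?;
/// ```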
pub fn decode_batch_to_data(
    metadata: &HashMap<String, String>,
    batch: RecordBatch,
    allow_custom_fallback: bool,
) -> anyhow::Result<Vec<Data>> {
    let type_name = metadata
        .get("type_name")
        .cloned()
        // Batches carrying only `bar_type` metadata (no `type_name`) are treated as bars
        .or_else(|| metadata.get("bar_type").map(|_| "bars".to_string()))
        .ok_or_else(|| anyhow::anyhow!("Missing type_name in metadata"))?;

    match type_name.as_str() {
        "QuoteTick" | "quotes" => Ok(QuoteTick::decode_data_batch(metadata, batch)?),
        "TradeTick" | "trades" => Ok(TradeTick::decode_data_batch(metadata, batch)?),
        "Bar" | "bars" => Ok(Bar::decode_data_batch(metadata, batch)?),
        "OrderBookDelta" | "order_book_deltas" => {
            Ok(OrderBookDelta::decode_data_batch(metadata, batch)?)
        }
        "OrderBookDepth10" | "order_book_depths" => {
            Ok(OrderBookDepth10::decode_data_batch(metadata, batch)?)
        }
        "MarkPriceUpdate" | "mark_price_updates" => {
            Ok(MarkPriceUpdate::decode_data_batch(metadata, batch)?)
        }
        "IndexPriceUpdate" | "index_price_updates" => {
            Ok(IndexPriceUpdate::decode_data_batch(metadata, batch)?)
        }
        "InstrumentClose" | "instrument_closes" => {
            Ok(InstrumentClose::decode_data_batch(metadata, batch)?)
        }
        _ => {
            if allow_custom_fallback {
                #[cfg(feature = "python")]
                {
                    Ok(CustomDataDecoder::decode_data_batch(metadata, batch)?)
                }
                #[cfg(not(feature = "python"))]
                {
                    anyhow::bail!("Unknown data type: {type_name}")
                }
            } else {
                anyhow::bail!(
                    "Unknown data type: {type_name}; custom decode only allowed in custom data context"
                )
            }
        }
    }
}

/// Decodes multiple `RecordBatch`es (e.g. from custom data files) into a single `Vec<Data>`.
/// Optionally replaces the `ts_init` column with `ts_event` before decoding each batch.
///
/// # Errors
///
/// Returns an error if any batch fails to decode.
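///
/// # Examples
///
/// A sketch, assuming `batches` were read from a single custom-data file (all sharing one schema):
///
/// ```ignore
/// let data = decode_custom_batches_to_data(batches, true)?;
/// ```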
pub fn decode_custom_batches_to_data(
    batches: Vec<RecordBatch>,
    use_ts_event_for_ts_init: bool,
) -> anyhow::Result<Vec<Data>> {
    let mut file_data = Vec::new();
    let schema = batches.first().map(|b| b.schema()).ok_or_else(|| {
        anyhow::anyhow!("decode_custom_batches_to_data called with empty batches")
    })?;

    for mut batch in batches {
        if use_ts_event_for_ts_init {
            let column_names: Vec<String> =
                schema.fields().iter().map(|f| f.name().clone()).collect();

            if let (Some(ts_event_idx), Some(ts_init_idx)) = (
                column_names.iter().position(|n| n == "ts_event"),
                column_names.iter().position(|n| n == "ts_init"),
            ) {
                let mut new_columns = batch.columns().to_vec();
                // Overwrite the ts_init column with the ts_event array
                new_columns[ts_init_idx] = new_columns[ts_event_idx].clone();
                batch = RecordBatch::try_new(schema.clone(), new_columns)
                    .map_err(|e| anyhow::anyhow!("Failed to create new batch: {e}"))?;
            }
        }
        let metadata = batch.schema().metadata().clone();
        let decoded = decode_batch_to_data(&metadata, batch, true)?;
        file_data.extend(decoded);
    }
    Ok(file_data)
}