// nautilus_persistence/backend/custom.rs
use std::{collections::HashMap, sync::Arc};
23
24use datafusion::arrow::{
25 array::{Array, StringArray},
26 datatypes::{DataType as ArrowDataType, Field, Schema},
27 record_batch::RecordBatch,
28};
29use nautilus_core::UnixNanos;
30use nautilus_model::data::{
31 Bar, CustomData, CustomDataTrait, Data, IndexPriceUpdate, MarkPriceUpdate, OrderBookDelta,
32 OrderBookDepth10, QuoteTick, TradeTick, close::InstrumentClose, encode_custom_to_arrow,
33};
34use nautilus_serialization::arrow::DecodeDataFromRecordBatch;
35#[cfg(feature = "python")]
36use nautilus_serialization::arrow::custom::CustomDataDecoder;
37
38#[must_use]
41pub fn schema_with_data_type_column(base_schema: &Schema, type_name: &str) -> Schema {
42 let mut fields: Vec<_> = base_schema.fields().iter().cloned().collect();
43 fields.push(Arc::new(Field::new(
44 "data_type",
45 ArrowDataType::Utf8,
46 false,
47 )));
48 let mut meta = base_schema.metadata().clone();
49 meta.insert("type_name".to_string(), type_name.to_string());
50 Schema::new_with_metadata(fields, meta)
51}
52
53pub fn augment_batch_with_data_type_column(
60 batch: &RecordBatch,
61 data_type_json: &str,
62 type_name: &str,
63 dt_meta: Option<&HashMap<String, String>>,
64) -> anyhow::Result<RecordBatch> {
65 let num_rows = batch.num_rows();
66 let data_type_array: Arc<dyn Array> = Arc::new(StringArray::from(
67 (0..num_rows)
68 .map(|_| Some(data_type_json))
69 .collect::<Vec<_>>(),
70 ));
71 let schema = batch.schema();
72 let mut fields: Vec<_> = schema.fields().iter().cloned().collect();
73 fields.push(Arc::new(Field::new(
74 "data_type",
75 ArrowDataType::Utf8,
76 false,
77 )));
78 let mut meta = schema.metadata().clone();
79 meta.insert("type_name".to_string(), type_name.to_string());
80
81 if let Some(m) = dt_meta {
82 meta.extend(m.clone());
83 }
84 let new_schema = Arc::new(Schema::new_with_metadata(fields, meta));
85 let mut columns = batch.columns().to_vec();
86 columns.push(data_type_array);
87 let new_batch = RecordBatch::try_new(new_schema, columns)
88 .map_err(|e| anyhow::anyhow!("Failed to merge custom data type metadata: {e}"))?;
89 Ok(new_batch)
90}
91
/// Sanitizes `identifier` into a safe relative directory path.
///
/// Splits on `/` and drops empty segments (covering leading, trailing, and
/// repeated separators) plus the `..` and `.` segments, so the result can
/// never escape — or alias via `.` — the intended parent directory.
#[must_use]
fn safe_directory_identifier(identifier: &str) -> String {
    identifier
        .split('/')
        .filter(|s| !s.is_empty() && *s != ".." && *s != ".")
        .collect::<Vec<_>>()
        .join("/")
}

/// Builds the catalog path components for a custom data type.
///
/// The layout is `data/custom/<type_name>[/<identifier segment>...]`; the
/// optional identifier is sanitized via `safe_directory_identifier` before
/// being split into individual components.
#[must_use]
pub fn custom_data_path_components(type_name: &str, identifier: Option<&str>) -> Vec<String> {
    let mut components = vec![
        "data".to_string(),
        "custom".to_string(),
        type_name.to_string(),
    ];

    if let Some(id) = identifier {
        let safe = safe_directory_identifier(id);
        if !safe.is_empty() {
            components.extend(safe.split('/').map(String::from));
        }
    }
    components
}
124
125pub fn prepare_custom_data_batch(
132 data: Vec<CustomData>,
133) -> anyhow::Result<(RecordBatch, String, Option<String>, UnixNanos, UnixNanos)> {
134 if data.is_empty() {
135 anyhow::bail!("prepare_custom_data_batch called with empty data");
136 }
137
138 let first_custom = data.first().unwrap();
139 let type_name = first_custom.data.type_name();
140 let identifier = first_custom.data_type.identifier().map(String::from);
141 let dt_meta = first_custom.data_type.metadata_string_map();
142 let data_type_json = first_custom
143 .data_type
144 .to_persistence_json()
145 .map_err(|e| anyhow::anyhow!("Failed to serialize data_type for persistence: {e}"))?;
146
147 let items: Vec<Arc<dyn CustomDataTrait>> =
148 data.into_iter().map(|c| Arc::clone(&c.data)).collect();
149 let first = items.first().unwrap();
150
151 let start_ts = first.ts_init();
152 let end_ts = items.last().unwrap().ts_init();
153
154 let batch = encode_custom_to_arrow(type_name, &items)
155 .map_err(|e| anyhow::anyhow!("Failed to encode custom data to Arrow: {e}"))?
156 .ok_or_else(|| {
157 anyhow::anyhow!(
158 "Custom data type \"{type_name}\" is not registered for Arrow encoding; \
159 call register_custom_data_class or ensure_custom_data_registered before writing"
160 )
161 })?;
162 let batch =
163 augment_batch_with_data_type_column(&batch, &data_type_json, type_name, dt_meta.as_ref())?;
164
165 Ok((batch, type_name.to_string(), identifier, start_ts, end_ts))
166}
167
168pub fn decode_batch_to_data(
178 metadata: &HashMap<String, String>,
179 batch: RecordBatch,
180 allow_custom_fallback: bool,
181) -> anyhow::Result<Vec<Data>> {
182 let type_name = metadata
183 .get("type_name")
184 .cloned()
185 .or_else(|| metadata.get("bar_type").map(|_| "bars".to_string()))
186 .ok_or_else(|| anyhow::anyhow!("Missing type_name in metadata"))?;
187
188 match type_name.as_str() {
189 "QuoteTick" | "quotes" => Ok(QuoteTick::decode_data_batch(metadata, batch)?),
190 "TradeTick" | "trades" => Ok(TradeTick::decode_data_batch(metadata, batch)?),
191 "Bar" | "bars" => Ok(Bar::decode_data_batch(metadata, batch)?),
192 "OrderBookDelta" | "order_book_deltas" => {
193 Ok(OrderBookDelta::decode_data_batch(metadata, batch)?)
194 }
195 "OrderBookDepth10" | "order_book_depths" => {
196 Ok(OrderBookDepth10::decode_data_batch(metadata, batch)?)
197 }
198 "MarkPriceUpdate" | "mark_price_updates" => {
199 Ok(MarkPriceUpdate::decode_data_batch(metadata, batch)?)
200 }
201 "IndexPriceUpdate" | "index_price_updates" => {
202 Ok(IndexPriceUpdate::decode_data_batch(metadata, batch)?)
203 }
204 "InstrumentClose" | "instrument_closes" => {
205 Ok(InstrumentClose::decode_data_batch(metadata, batch)?)
206 }
207 _ => {
208 if allow_custom_fallback {
209 #[cfg(feature = "python")]
210 {
211 Ok(CustomDataDecoder::decode_data_batch(metadata, batch)?)
212 }
213 #[cfg(not(feature = "python"))]
214 {
215 anyhow::bail!("Unknown data type: {type_name}")
216 }
217 } else {
218 anyhow::bail!(
219 "Unknown data type: {type_name}; custom decode only allowed in custom data context"
220 )
221 }
222 }
223 }
224}
225
226pub fn decode_custom_batches_to_data(
233 batches: Vec<RecordBatch>,
234 use_ts_event_for_ts_init: bool,
235) -> anyhow::Result<Vec<Data>> {
236 let mut file_data = Vec::new();
237 let schema = batches.first().map(|b| b.schema()).ok_or_else(|| {
238 anyhow::anyhow!("decode_custom_batches_to_data called with empty batches")
239 })?;
240
241 for mut batch in batches {
242 if use_ts_event_for_ts_init {
243 let column_names: Vec<String> =
244 schema.fields().iter().map(|f| f.name().clone()).collect();
245
246 if let (Some(ts_event_idx), Some(ts_init_idx)) = (
247 column_names.iter().position(|n| n == "ts_event"),
248 column_names.iter().position(|n| n == "ts_init"),
249 ) {
250 let mut new_columns = batch.columns().to_vec();
251 new_columns[ts_init_idx] = new_columns[ts_event_idx].clone();
252 batch = RecordBatch::try_new(schema.clone(), new_columns)
253 .map_err(|e| anyhow::anyhow!("Failed to create new batch: {e}"))?;
254 }
255 }
256 let metadata = batch.schema().metadata().clone();
257 let decoded = decode_batch_to_data(&metadata, batch, true)?;
258 file_data.extend(decoded);
259 }
260 Ok(file_data)
261}