Skip to main content

nautilus_serialization/arrow/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Defines the Apache Arrow schema for Nautilus types.
17
18pub mod account_state;
19pub mod bar;
20pub mod close;
21pub mod custom;
22pub mod delta;
23pub mod depth;
24#[cfg(feature = "display")]
25pub mod display;
26pub mod funding;
27pub mod index_price;
28pub mod instrument;
29pub mod instrument_status;
30pub mod json;
31pub mod mark_price;
32pub mod order_event;
33pub mod position_event;
34pub mod quote;
35pub mod report;
36pub mod snapshot;
37pub mod trade;
38
39use std::{
40    collections::HashMap,
41    io::{self, Write},
42};
43
44use arrow::{
45    array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
46    datatypes::{DataType, Schema},
47    error::ArrowError,
48    ipc::writer::StreamWriter,
49    record_batch::RecordBatch,
50};
51use nautilus_model::{
52    data::{
53        Data, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, bar::Bar,
54        close::InstrumentClose, delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick,
55        trade::TradeTick,
56    },
57    types::{
58        PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
59        fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
60        price::PriceRaw,
61        quantity::QuantityRaw,
62    },
63};
64#[cfg(feature = "python")]
65use pyo3::prelude::*;
66
// Metadata key constants
68const KEY_BAR_TYPE: &str = "bar_type";
69pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
70pub const KEY_PRICE_PRECISION: &str = "price_precision";
71pub const KEY_SIZE_PRECISION: &str = "size_precision";
72
/// Errors that can occur while streaming Arrow record batches to an output stream.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An underlying I/O operation failed.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An Arrow encoding or IPC operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// A Python interop call failed (only compiled with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
83
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice was empty; at least one element is required to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata map.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The named column was not present at the expected column index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value in the named field failed to parse; the string carries details.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column had a different Arrow `DataType` than expected (name, index, expected, found).
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// Fixed-size binary values were encoded with a different precision mode
    /// (byte width) than this build expects.
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
         but this build expects {expected_bytes} bytes. The catalog was created with a different \
         precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
         build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        field: &'static str,
        expected_bytes: i32,
        actual_bytes: i32,
    },
    /// An underlying Arrow operation failed.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
110
111#[inline]
112fn get_raw_price(bytes: &[u8]) -> PriceRaw {
113    PriceRaw::from_le_bytes(
114        bytes
115            .try_into()
116            .expect("Price raw bytes must be exactly the size of PriceRaw"),
117    )
118}
119
120#[inline]
121fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
122    QuantityRaw::from_le_bytes(
123        bytes
124            .try_into()
125            .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
126    )
127}
128
129/// Gets raw price bytes and corrects for floating-point precision errors in stored data.
130///
131/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
132/// introduces floating-point errors. This corrects the raw value to the nearest valid
133/// multiple of the scale factor for the given precision.
134///
135/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
136#[inline]
137fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
138    let raw = get_raw_price(bytes);
139
140    // Preserve sentinel values unchanged
141    if raw == PRICE_UNDEF || raw == PRICE_ERROR {
142        return raw;
143    }
144
145    correct_price_raw(raw, precision)
146}
147
148/// Gets raw quantity bytes and corrects for floating-point precision errors in stored data.
149///
150/// Data from catalogs may have been created with `int(value * FIXED_SCALAR)` which
151/// introduces floating-point errors. This corrects the raw value to the nearest valid
152/// multiple of the scale factor for the given precision.
153///
154/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
155#[inline]
156fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
157    let raw = get_raw_quantity(bytes);
158
159    // Preserve sentinel values unchanged
160    if raw == QUANTITY_UNDEF {
161        return raw;
162    }
163
164    correct_quantity_raw(raw, precision)
165}
166
167/// Decodes a [`Price`] from raw bytes with bounds validation.
168///
169/// Uses corrected raw values to handle floating-point precision errors in stored data.
170/// Sentinel values (`PRICE_UNDEF`, `PRICE_ERROR`) are preserved unchanged.
171///
172/// # Errors
173///
174/// Returns an [`EncodingError::ParseError`] if the price value is out of bounds.
175pub fn decode_price(
176    bytes: &[u8],
177    precision: u8,
178    field: &'static str,
179    row: usize,
180) -> Result<Price, EncodingError> {
181    let raw = get_corrected_raw_price(bytes, precision);
182    Price::from_raw_checked(raw, precision)
183        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
184}
185
186/// Decodes a [`Quantity`] from raw bytes with bounds validation.
187///
188/// Uses corrected raw values to handle floating-point precision errors in stored data.
189/// Sentinel values (`QUANTITY_UNDEF`) are preserved unchanged.
190///
191/// # Errors
192///
193/// Returns an [`EncodingError::ParseError`] if the quantity value is out of bounds.
194pub fn decode_quantity(
195    bytes: &[u8],
196    precision: u8,
197    field: &'static str,
198    row: usize,
199) -> Result<Quantity, EncodingError> {
200    let raw = get_corrected_raw_quantity(bytes, precision);
201    Quantity::from_raw_checked(raw, precision)
202        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
203}
204
205/// Decodes a [`Price`] from raw bytes, using precision 0 for sentinel values.
206///
207/// For order book data where sentinel values indicate empty levels.
208///
209/// # Errors
210///
211/// Returns an [`EncodingError::ParseError`] if the price value is out of bounds.
212pub fn decode_price_with_sentinel(
213    bytes: &[u8],
214    precision: u8,
215    field: &'static str,
216    row: usize,
217) -> Result<Price, EncodingError> {
218    let raw = get_raw_price(bytes);
219    let (final_raw, final_precision) = if raw == PRICE_UNDEF {
220        (raw, 0)
221    } else {
222        (get_corrected_raw_price(bytes, precision), precision)
223    };
224    Price::from_raw_checked(final_raw, final_precision)
225        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
226}
227
228/// Decodes a [`Quantity`] from raw bytes, using precision 0 for sentinel values.
229///
230/// For order book data where sentinel values indicate empty levels.
231///
232/// # Errors
233///
234/// Returns an [`EncodingError::ParseError`] if the quantity value is out of bounds.
235pub fn decode_quantity_with_sentinel(
236    bytes: &[u8],
237    precision: u8,
238    field: &'static str,
239    row: usize,
240) -> Result<Quantity, EncodingError> {
241    let raw = get_raw_quantity(bytes);
242    let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
243        (raw, 0)
244    } else {
245        (get_corrected_raw_quantity(bytes, precision), precision)
246    };
247    Quantity::from_raw_checked(final_raw, final_precision)
248        .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
249}
250
251/// Provides Apache Arrow schema definitions for data types.
252pub trait ArrowSchemaProvider {
253    /// Returns the Arrow schema for this type with optional metadata.
254    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
255
256    /// Returns a map of field names to their Arrow data types.
257    #[must_use]
258    fn get_schema_map() -> HashMap<String, String> {
259        let schema = Self::get_schema(None);
260        let mut map = HashMap::new();
261
262        for field in schema.fields() {
263            let name = field.name().clone();
264            let data_type = format!("{:?}", field.data_type());
265            map.insert(name, data_type);
266        }
267        map
268    }
269}
270
271/// Encodes data types to Apache Arrow RecordBatch format.
272pub trait EncodeToRecordBatch
273where
274    Self: Sized + ArrowSchemaProvider,
275{
276    /// Encodes a batch of values into an Arrow `RecordBatch` using the provided metadata.
277    ///
278    /// # Errors
279    ///
280    /// Returns an `ArrowError` if the encoding fails.
281    fn encode_batch(
282        metadata: &HashMap<String, String>,
283        data: &[Self],
284    ) -> Result<RecordBatch, ArrowError>;
285
286    /// Returns the metadata for this data element.
287    fn metadata(&self) -> HashMap<String, String>;
288
289    /// Returns the metadata for the first element in a chunk.
290    ///
291    /// # Panics
292    ///
293    /// Panics if `chunk` is empty.
294    fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
295        chunk
296            .first()
297            .map(Self::metadata)
298            .expect("Chunk must have at least one element to encode")
299    }
300}
301
/// Decodes data types from Apache Arrow RecordBatch format.
///
/// Implementors must also convert into the `Data` enum, which enables the
/// blanket [`DecodeTypedFromRecordBatch`] implementation for these types.
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
317
/// Decodes strongly typed values from Apache Arrow RecordBatch format.
///
/// Unlike [`DecodeFromRecordBatch`], this trait does not require `Into<Data>`,
/// so it can be implemented by types outside the core `Data` enum.
pub trait DecodeTypedFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into a vector of values of the implementing type.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_typed_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
333
334impl<T> DecodeTypedFromRecordBatch for T
335where
336    T: DecodeFromRecordBatch,
337{
338    fn decode_typed_batch(
339        metadata: &HashMap<String, String>,
340        record_batch: RecordBatch,
341    ) -> Result<Vec<Self>, EncodingError> {
342        Self::decode_batch(metadata, record_batch)
343    }
344}
345
/// Decodes raw Data objects from Apache Arrow RecordBatch format.
///
/// Produces type-erased `Data` values rather than a concrete element type.
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes a `RecordBatch` into raw `Data` values, using the provided metadata.
    ///
    /// # Errors
    ///
    /// Returns an `EncodingError` if the decoding fails.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
361
/// Writes RecordBatch data to output streams.
///
/// Blanket-implemented for every `std::io::Write` type below.
pub trait WriteStream {
    /// Writes a `RecordBatch` to the implementing output stream.
    ///
    /// # Errors
    ///
    /// Returns a `DataStreamingError` if writing or finishing the stream fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
371
372impl<T: Write> WriteStream for T {
373    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
374        let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
375        writer.write(record_batch)?;
376        writer.finish()?;
377        Ok(())
378    }
379}
380
381/// Extracts a string column, accepting both Utf8 (`StringArray`) and Utf8View (`StringViewArray`).
382/// Parquet may return Utf8View when reading, so this handles both formats.
383///
384/// # Errors
385///
386/// Returns an error if:
387/// - `column_index` is out of range: `EncodingError::MissingColumn`.
388/// - The column type is neither Utf8 nor Utf8View: `EncodingError::InvalidColumnType`.
389pub fn extract_column_string<'a>(
390    cols: &'a [ArrayRef],
391    column_key: &'static str,
392    column_index: usize,
393) -> Result<StringColumnRef<'a>, EncodingError> {
394    let column_values = cols
395        .get(column_index)
396        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
397    let dt = column_values.data_type();
398    if let Some(arr) = column_values.as_any().downcast_ref::<StringArray>() {
399        Ok(StringColumnRef::Utf8(arr))
400    } else if let Some(arr) = column_values.as_any().downcast_ref::<StringViewArray>() {
401        Ok(StringColumnRef::Utf8View(arr))
402    } else {
403        Err(EncodingError::InvalidColumnType(
404            column_key,
405            column_index,
406            DataType::Utf8,
407            dt.clone(),
408        ))
409    }
410}
411
/// Reference to a string column, either Utf8 or Utf8View.
#[derive(Debug)]
pub enum StringColumnRef<'a> {
    /// Borrowed `StringArray` (Arrow `Utf8` layout).
    Utf8(&'a StringArray),
    /// Borrowed `StringViewArray` (Arrow `Utf8View` layout).
    Utf8View(&'a StringViewArray),
}
418
impl StringColumnRef<'_> {
    /// Returns the string value at row `i`.
    ///
    /// NOTE(review): delegates to the arrow array's `value`, which presumably
    /// panics if `i` is out of bounds — confirm against the arrow-rs docs.
    #[inline]
    #[must_use]
    pub fn value(&self, i: usize) -> &str {
        match self {
            Self::Utf8(arr) => arr.value(i),
            Self::Utf8View(arr) => arr.value(i),
        }
    }
}
430
431/// Extracts and downcasts the specified `column_key` column from an Arrow array slice.
432///
433/// # Errors
434///
435/// Returns an error if:
436/// - `column_index` is out of range: `EncodingError::MissingColumn`.
437/// - The column type does not match `expected_type`: `EncodingError::InvalidColumnType`.
438pub fn extract_column<'a, T: Array + 'static>(
439    cols: &'a [ArrayRef],
440    column_key: &'static str,
441    column_index: usize,
442    expected_type: DataType,
443) -> Result<&'a T, EncodingError> {
444    let column_values = cols
445        .get(column_index)
446        .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
447    let downcasted_values =
448        column_values
449            .as_any()
450            .downcast_ref::<T>()
451            .ok_or(EncodingError::InvalidColumnType(
452                column_key,
453                column_index,
454                expected_type,
455                column_values.data_type().clone(),
456            ))?;
457    Ok(downcasted_values)
458}
459
460/// Validates that a [`FixedSizeBinaryArray`] has the expected precision byte width.
461///
462/// This detects precision mode mismatches that occur when catalog data was encoded
463/// with a different precision mode (64-bit standard vs 128-bit high-precision).
464///
465/// # Errors
466///
467/// Returns [`EncodingError::PrecisionMismatch`] if the actual byte width doesn't
468/// match [`PRECISION_BYTES`].
469pub fn validate_precision_bytes(
470    array: &FixedSizeBinaryArray,
471    field: &'static str,
472) -> Result<(), EncodingError> {
473    let actual = array.value_length();
474    if actual != PRECISION_BYTES {
475        return Err(EncodingError::PrecisionMismatch {
476            field,
477            expected_bytes: PRECISION_BYTES,
478            actual_bytes: actual,
479        });
480    }
481    Ok(())
482}
483
484/// Converts a vector of `OrderBookDelta` into an Arrow `RecordBatch`.
485///
486/// # Errors
487///
488/// Returns an error if:
489/// - `data` is empty: `EncodingError::EmptyData`.
490/// - Encoding fails: `EncodingError::ArrowError`.
491pub fn book_deltas_to_arrow_record_batch_bytes(
492    data: &[OrderBookDelta],
493) -> Result<RecordBatch, EncodingError> {
494    if data.is_empty() {
495        return Err(EncodingError::EmptyData);
496    }
497
498    // Extract metadata from chunk
499    let metadata = OrderBookDelta::chunk_metadata(data);
500    OrderBookDelta::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
501}
502
503/// Converts a vector of `OrderBookDepth10` into an Arrow `RecordBatch`.
504///
505/// # Errors
506///
507/// Returns an error if:
508/// - `data` is empty: `EncodingError::EmptyData`.
509/// - Encoding fails: `EncodingError::ArrowError`.
510#[expect(clippy::missing_panics_doc)] // Guarded by empty check
511pub fn book_depth10_to_arrow_record_batch_bytes(
512    data: &[OrderBookDepth10],
513) -> Result<RecordBatch, EncodingError> {
514    if data.is_empty() {
515        return Err(EncodingError::EmptyData);
516    }
517
518    // Take first element and extract metadata
519    let first = data.first().unwrap();
520    let metadata = first.metadata();
521    OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
522}
523
524/// Converts a vector of `QuoteTick` into an Arrow `RecordBatch`.
525///
526/// # Errors
527///
528/// Returns an error if:
529/// - `data` is empty: `EncodingError::EmptyData`.
530/// - Encoding fails: `EncodingError::ArrowError`.
531#[expect(clippy::missing_panics_doc)] // Guarded by empty check
532pub fn quotes_to_arrow_record_batch_bytes(
533    data: &[QuoteTick],
534) -> Result<RecordBatch, EncodingError> {
535    if data.is_empty() {
536        return Err(EncodingError::EmptyData);
537    }
538
539    // Take first element and extract metadata
540    let first = data.first().unwrap();
541    let metadata = first.metadata();
542    QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
543}
544
545/// Converts a vector of `TradeTick` into an Arrow `RecordBatch`.
546///
547/// # Errors
548///
549/// Returns an error if:
550/// - `data` is empty: `EncodingError::EmptyData`.
551/// - Encoding fails: `EncodingError::ArrowError`.
552#[expect(clippy::missing_panics_doc)] // Guarded by empty check
553pub fn trades_to_arrow_record_batch_bytes(
554    data: &[TradeTick],
555) -> Result<RecordBatch, EncodingError> {
556    if data.is_empty() {
557        return Err(EncodingError::EmptyData);
558    }
559
560    // Take first element and extract metadata
561    let first = data.first().unwrap();
562    let metadata = first.metadata();
563    TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
564}
565
566/// Converts a vector of `Bar` into an Arrow `RecordBatch`.
567///
568/// # Errors
569///
570/// Returns an error if:
571/// - `data` is empty: `EncodingError::EmptyData`.
572/// - Encoding fails: `EncodingError::ArrowError`.
573#[expect(clippy::missing_panics_doc)] // Guarded by empty check
574pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
575    if data.is_empty() {
576        return Err(EncodingError::EmptyData);
577    }
578
579    // Take first element and extract metadata
580    let first = data.first().unwrap();
581    let metadata = first.metadata();
582    Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
583}
584
585/// Converts a vector of `MarkPriceUpdate` into an Arrow `RecordBatch`.
586///
587/// # Errors
588///
589/// Returns an error if:
590/// - `data` is empty: `EncodingError::EmptyData`.
591/// - Encoding fails: `EncodingError::ArrowError`.
592#[expect(clippy::missing_panics_doc)] // Guarded by empty check
593pub fn mark_prices_to_arrow_record_batch_bytes(
594    data: &[MarkPriceUpdate],
595) -> Result<RecordBatch, EncodingError> {
596    if data.is_empty() {
597        return Err(EncodingError::EmptyData);
598    }
599
600    // Take first element and extract metadata
601    let first = data.first().unwrap();
602    let metadata = first.metadata();
603    MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
604}
605
606/// Converts a vector of `IndexPriceUpdate` into an Arrow `RecordBatch`.
607///
608/// # Errors
609///
610/// Returns an error if:
611/// - `data` is empty: `EncodingError::EmptyData`.
612/// - Encoding fails: `EncodingError::ArrowError`.
613#[expect(clippy::missing_panics_doc)] // Guarded by empty check
614pub fn index_prices_to_arrow_record_batch_bytes(
615    data: &[IndexPriceUpdate],
616) -> Result<RecordBatch, EncodingError> {
617    if data.is_empty() {
618        return Err(EncodingError::EmptyData);
619    }
620
621    // Take first element and extract metadata
622    let first = data.first().unwrap();
623    let metadata = first.metadata();
624    IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
625}
626
627/// Converts a vector of `InstrumentStatus` into an Arrow `RecordBatch`.
628///
629/// # Errors
630///
631/// Returns an error if:
632/// - `data` is empty: `EncodingError::EmptyData`.
633/// - Encoding fails: `EncodingError::ArrowError`.
634#[expect(clippy::missing_panics_doc)] // Guarded by empty check
635pub fn instrument_status_to_arrow_record_batch_bytes(
636    data: &[InstrumentStatus],
637) -> Result<RecordBatch, EncodingError> {
638    if data.is_empty() {
639        return Err(EncodingError::EmptyData);
640    }
641
642    let first = data.first().unwrap();
643    let metadata = first.metadata();
644    InstrumentStatus::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
645}
646
647/// Converts a vector of `InstrumentClose` into an Arrow `RecordBatch`.
648///
649/// # Errors
650///
651/// Returns an error if:
652/// - `data` is empty: `EncodingError::EmptyData`.
653/// - Encoding fails: `EncodingError::ArrowError`.
654#[expect(clippy::missing_panics_doc)] // Guarded by empty check
655pub fn instrument_closes_to_arrow_record_batch_bytes(
656    data: &[InstrumentClose],
657) -> Result<RecordBatch, EncodingError> {
658    if data.is_empty() {
659        return Err(EncodingError::EmptyData);
660    }
661
662    // Take first element and extract metadata
663    let first = data.first().unwrap();
664    let metadata = first.metadata();
665    InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
666}