1pub mod account_state;
19pub mod bar;
20pub mod close;
21pub mod custom;
22pub mod delta;
23pub mod depth;
24#[cfg(feature = "display")]
25pub mod display;
26pub mod funding;
27pub mod index_price;
28pub mod instrument;
29pub mod instrument_status;
30pub mod json;
31pub mod mark_price;
32pub mod order_event;
33pub mod position_event;
34pub mod quote;
35pub mod report;
36pub mod snapshot;
37pub mod trade;
38
39use std::{
40 collections::HashMap,
41 io::{self, Write},
42};
43
44use arrow::{
45 array::{Array, ArrayRef, FixedSizeBinaryArray, StringArray, StringViewArray},
46 datatypes::{DataType, Schema},
47 error::ArrowError,
48 ipc::writer::StreamWriter,
49 record_batch::RecordBatch,
50};
51use nautilus_model::{
52 data::{
53 Data, IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, bar::Bar,
54 close::InstrumentClose, delta::OrderBookDelta, depth::OrderBookDepth10, quote::QuoteTick,
55 trade::TradeTick,
56 },
57 types::{
58 PRICE_ERROR, PRICE_UNDEF, Price, QUANTITY_UNDEF, Quantity,
59 fixed::{PRECISION_BYTES, correct_price_raw, correct_quantity_raw},
60 price::PriceRaw,
61 quantity::QuantityRaw,
62 },
63};
64#[cfg(feature = "python")]
65use pyo3::prelude::*;
66
/// Metadata key under which a bar type is stored.
// NOTE: made `pub` for consistency with the sibling metadata keys below
// (all four are written/read together as record-batch metadata).
pub const KEY_BAR_TYPE: &str = "bar_type";
/// Metadata key under which an instrument ID is stored.
pub const KEY_INSTRUMENT_ID: &str = "instrument_id";
/// Metadata key under which a price precision is stored.
pub const KEY_PRICE_PRECISION: &str = "price_precision";
/// Metadata key under which a size precision is stored.
pub const KEY_SIZE_PRECISION: &str = "size_precision";
72
/// Errors that can occur while streaming Arrow record batches to a writer.
#[derive(thiserror::Error, Debug)]
pub enum DataStreamingError {
    /// An I/O failure from the underlying destination.
    #[error("I/O error: {0}")]
    IoError(#[from] io::Error),
    /// An error raised by the Arrow library while encoding or writing.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
    /// An error crossing the Python FFI boundary (only with the `python` feature).
    #[cfg(feature = "python")]
    #[error("Python error: {0}")]
    PythonError(#[from] PyErr),
}
83
/// Errors that can occur while encoding or decoding Arrow record batches.
#[derive(thiserror::Error, Debug)]
pub enum EncodingError {
    /// The input slice contained no elements to encode.
    #[error("Empty data")]
    EmptyData,
    /// A required metadata key was absent from the batch metadata.
    #[error("Missing metadata key: `{0}`")]
    MissingMetadata(&'static str),
    /// The named column was not present at the expected index.
    #[error("Missing data column: `{0}` at index {1}")]
    MissingColumn(&'static str, usize),
    /// A value failed to parse; the string carries row-level detail.
    #[error("Error parsing `{0}`: {1}")]
    ParseError(&'static str, String),
    /// A column existed but had a different Arrow data type than expected.
    #[error("Invalid column type `{0}` at index {1}: expected {2}, found {3}")]
    InvalidColumnType(&'static str, usize, DataType, DataType),
    /// Fixed-size binary values in the catalog have a different byte width
    /// than this build's `PRECISION_BYTES` (standard vs high precision build).
    #[error(
        "Precision mode mismatch for `{field}`: catalog data has {actual_bytes} byte values, \
        but this build expects {expected_bytes} bytes. The catalog was created with a different \
        precision mode (standard=8 bytes, high=16 bytes). Rebuild the catalog or change your \
        build's precision mode. See: https://nautilustrader.io/docs/latest/getting_started/installation#precision-mode"
    )]
    PrecisionMismatch {
        /// The field (column) on which the mismatch was detected.
        field: &'static str,
        /// Byte width this build expects (`PRECISION_BYTES`).
        expected_bytes: i32,
        /// Byte width actually found in the catalog data.
        actual_bytes: i32,
    },
    /// An error raised by the Arrow library.
    #[error("Arrow error: {0}")]
    ArrowError(#[from] arrow::error::ArrowError),
}
110
111#[inline]
112fn get_raw_price(bytes: &[u8]) -> PriceRaw {
113 PriceRaw::from_le_bytes(
114 bytes
115 .try_into()
116 .expect("Price raw bytes must be exactly the size of PriceRaw"),
117 )
118}
119
120#[inline]
121fn get_raw_quantity(bytes: &[u8]) -> QuantityRaw {
122 QuantityRaw::from_le_bytes(
123 bytes
124 .try_into()
125 .expect("Quantity raw bytes must be exactly the size of QuantityRaw"),
126 )
127}
128
129#[inline]
137fn get_corrected_raw_price(bytes: &[u8], precision: u8) -> PriceRaw {
138 let raw = get_raw_price(bytes);
139
140 if raw == PRICE_UNDEF || raw == PRICE_ERROR {
142 return raw;
143 }
144
145 correct_price_raw(raw, precision)
146}
147
148#[inline]
156fn get_corrected_raw_quantity(bytes: &[u8], precision: u8) -> QuantityRaw {
157 let raw = get_raw_quantity(bytes);
158
159 if raw == QUANTITY_UNDEF {
161 return raw;
162 }
163
164 correct_quantity_raw(raw, precision)
165}
166
167pub fn decode_price(
176 bytes: &[u8],
177 precision: u8,
178 field: &'static str,
179 row: usize,
180) -> Result<Price, EncodingError> {
181 let raw = get_corrected_raw_price(bytes, precision);
182 Price::from_raw_checked(raw, precision)
183 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
184}
185
186pub fn decode_quantity(
195 bytes: &[u8],
196 precision: u8,
197 field: &'static str,
198 row: usize,
199) -> Result<Quantity, EncodingError> {
200 let raw = get_corrected_raw_quantity(bytes, precision);
201 Quantity::from_raw_checked(raw, precision)
202 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
203}
204
205pub fn decode_price_with_sentinel(
213 bytes: &[u8],
214 precision: u8,
215 field: &'static str,
216 row: usize,
217) -> Result<Price, EncodingError> {
218 let raw = get_raw_price(bytes);
219 let (final_raw, final_precision) = if raw == PRICE_UNDEF {
220 (raw, 0)
221 } else {
222 (get_corrected_raw_price(bytes, precision), precision)
223 };
224 Price::from_raw_checked(final_raw, final_precision)
225 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
226}
227
228pub fn decode_quantity_with_sentinel(
236 bytes: &[u8],
237 precision: u8,
238 field: &'static str,
239 row: usize,
240) -> Result<Quantity, EncodingError> {
241 let raw = get_raw_quantity(bytes);
242 let (final_raw, final_precision) = if raw == QUANTITY_UNDEF {
243 (raw, 0)
244 } else {
245 (get_corrected_raw_quantity(bytes, precision), precision)
246 };
247 Quantity::from_raw_checked(final_raw, final_precision)
248 .map_err(|e| EncodingError::ParseError(field, format!("row {row}: {e}")))
249}
250
251pub trait ArrowSchemaProvider {
253 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema;
255
256 #[must_use]
258 fn get_schema_map() -> HashMap<String, String> {
259 let schema = Self::get_schema(None);
260 let mut map = HashMap::new();
261
262 for field in schema.fields() {
263 let name = field.name().clone();
264 let data_type = format!("{:?}", field.data_type());
265 map.insert(name, data_type);
266 }
267 map
268 }
269}
270
271pub trait EncodeToRecordBatch
273where
274 Self: Sized + ArrowSchemaProvider,
275{
276 fn encode_batch(
282 metadata: &HashMap<String, String>,
283 data: &[Self],
284 ) -> Result<RecordBatch, ArrowError>;
285
286 fn metadata(&self) -> HashMap<String, String>;
288
289 fn chunk_metadata(chunk: &[Self]) -> HashMap<String, String> {
295 chunk
296 .first()
297 .map(Self::metadata)
298 .expect("Chunk must have at least one element to encode")
299 }
300}
301
/// Decodes values of `Self` from an Arrow [`RecordBatch`].
pub trait DecodeFromRecordBatch
where
    Self: Sized + Into<Data> + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of `Self`, using `metadata` for
    /// batch-level context (e.g. instrument ID and precisions).
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the batch cannot be decoded.
    fn decode_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
317
/// Decodes strongly-typed values from an Arrow [`RecordBatch`].
///
/// Unlike [`DecodeFromRecordBatch`], this trait does not require
/// `Self: Into<Data>` (see the blanket impl below).
pub trait DecodeTypedFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of `Self` using `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the batch cannot be decoded.
    fn decode_typed_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Self>, EncodingError>;
}
333
334impl<T> DecodeTypedFromRecordBatch for T
335where
336 T: DecodeFromRecordBatch,
337{
338 fn decode_typed_batch(
339 metadata: &HashMap<String, String>,
340 record_batch: RecordBatch,
341 ) -> Result<Vec<Self>, EncodingError> {
342 Self::decode_batch(metadata, record_batch)
343 }
344}
345
/// Decodes type-erased [`Data`] values from an Arrow [`RecordBatch`].
pub trait DecodeDataFromRecordBatch
where
    Self: Sized + ArrowSchemaProvider,
{
    /// Decodes `record_batch` into a vector of [`Data`] using `metadata`.
    ///
    /// # Errors
    ///
    /// Returns an [`EncodingError`] if the batch cannot be decoded.
    fn decode_data_batch(
        metadata: &HashMap<String, String>,
        record_batch: RecordBatch,
    ) -> Result<Vec<Data>, EncodingError>;
}
361
/// A destination to which Arrow record batches can be written.
pub trait WriteStream {
    /// Writes `record_batch` to the underlying stream.
    ///
    /// # Errors
    ///
    /// Returns a [`DataStreamingError`] if writing fails.
    fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError>;
}
371
372impl<T: Write> WriteStream for T {
373 fn write(&mut self, record_batch: &RecordBatch) -> Result<(), DataStreamingError> {
374 let mut writer = StreamWriter::try_new(self, &record_batch.schema())?;
375 writer.write(record_batch)?;
376 writer.finish()?;
377 Ok(())
378 }
379}
380
381pub fn extract_column_string<'a>(
390 cols: &'a [ArrayRef],
391 column_key: &'static str,
392 column_index: usize,
393) -> Result<StringColumnRef<'a>, EncodingError> {
394 let column_values = cols
395 .get(column_index)
396 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
397 let dt = column_values.data_type();
398 if let Some(arr) = column_values.as_any().downcast_ref::<StringArray>() {
399 Ok(StringColumnRef::Utf8(arr))
400 } else if let Some(arr) = column_values.as_any().downcast_ref::<StringViewArray>() {
401 Ok(StringColumnRef::Utf8View(arr))
402 } else {
403 Err(EncodingError::InvalidColumnType(
404 column_key,
405 column_index,
406 DataType::Utf8,
407 dt.clone(),
408 ))
409 }
410}
411
/// A borrowed reference to a string column in either of Arrow's two string
/// encodings, allowing uniform row access via [`StringColumnRef::value`].
#[derive(Debug)]
pub enum StringColumnRef<'a> {
    /// A standard `Utf8` string array.
    Utf8(&'a StringArray),
    /// A `Utf8View` string array.
    Utf8View(&'a StringViewArray),
}
418
419impl StringColumnRef<'_> {
420 #[inline]
422 #[must_use]
423 pub fn value(&self, i: usize) -> &str {
424 match self {
425 Self::Utf8(arr) => arr.value(i),
426 Self::Utf8View(arr) => arr.value(i),
427 }
428 }
429}
430
431pub fn extract_column<'a, T: Array + 'static>(
439 cols: &'a [ArrayRef],
440 column_key: &'static str,
441 column_index: usize,
442 expected_type: DataType,
443) -> Result<&'a T, EncodingError> {
444 let column_values = cols
445 .get(column_index)
446 .ok_or(EncodingError::MissingColumn(column_key, column_index))?;
447 let downcasted_values =
448 column_values
449 .as_any()
450 .downcast_ref::<T>()
451 .ok_or(EncodingError::InvalidColumnType(
452 column_key,
453 column_index,
454 expected_type,
455 column_values.data_type().clone(),
456 ))?;
457 Ok(downcasted_values)
458}
459
460pub fn validate_precision_bytes(
470 array: &FixedSizeBinaryArray,
471 field: &'static str,
472) -> Result<(), EncodingError> {
473 let actual = array.value_length();
474 if actual != PRECISION_BYTES {
475 return Err(EncodingError::PrecisionMismatch {
476 field,
477 expected_bytes: PRECISION_BYTES,
478 actual_bytes: actual,
479 });
480 }
481 Ok(())
482}
483
484pub fn book_deltas_to_arrow_record_batch_bytes(
492 data: &[OrderBookDelta],
493) -> Result<RecordBatch, EncodingError> {
494 if data.is_empty() {
495 return Err(EncodingError::EmptyData);
496 }
497
498 let metadata = OrderBookDelta::chunk_metadata(data);
500 OrderBookDelta::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
501}
502
503#[expect(clippy::missing_panics_doc)] pub fn book_depth10_to_arrow_record_batch_bytes(
512 data: &[OrderBookDepth10],
513) -> Result<RecordBatch, EncodingError> {
514 if data.is_empty() {
515 return Err(EncodingError::EmptyData);
516 }
517
518 let first = data.first().unwrap();
520 let metadata = first.metadata();
521 OrderBookDepth10::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
522}
523
524#[expect(clippy::missing_panics_doc)] pub fn quotes_to_arrow_record_batch_bytes(
533 data: &[QuoteTick],
534) -> Result<RecordBatch, EncodingError> {
535 if data.is_empty() {
536 return Err(EncodingError::EmptyData);
537 }
538
539 let first = data.first().unwrap();
541 let metadata = first.metadata();
542 QuoteTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
543}
544
545#[expect(clippy::missing_panics_doc)] pub fn trades_to_arrow_record_batch_bytes(
554 data: &[TradeTick],
555) -> Result<RecordBatch, EncodingError> {
556 if data.is_empty() {
557 return Err(EncodingError::EmptyData);
558 }
559
560 let first = data.first().unwrap();
562 let metadata = first.metadata();
563 TradeTick::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
564}
565
566#[expect(clippy::missing_panics_doc)] pub fn bars_to_arrow_record_batch_bytes(data: &[Bar]) -> Result<RecordBatch, EncodingError> {
575 if data.is_empty() {
576 return Err(EncodingError::EmptyData);
577 }
578
579 let first = data.first().unwrap();
581 let metadata = first.metadata();
582 Bar::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
583}
584
585#[expect(clippy::missing_panics_doc)] pub fn mark_prices_to_arrow_record_batch_bytes(
594 data: &[MarkPriceUpdate],
595) -> Result<RecordBatch, EncodingError> {
596 if data.is_empty() {
597 return Err(EncodingError::EmptyData);
598 }
599
600 let first = data.first().unwrap();
602 let metadata = first.metadata();
603 MarkPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
604}
605
606#[expect(clippy::missing_panics_doc)] pub fn index_prices_to_arrow_record_batch_bytes(
615 data: &[IndexPriceUpdate],
616) -> Result<RecordBatch, EncodingError> {
617 if data.is_empty() {
618 return Err(EncodingError::EmptyData);
619 }
620
621 let first = data.first().unwrap();
623 let metadata = first.metadata();
624 IndexPriceUpdate::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
625}
626
627#[expect(clippy::missing_panics_doc)] pub fn instrument_status_to_arrow_record_batch_bytes(
636 data: &[InstrumentStatus],
637) -> Result<RecordBatch, EncodingError> {
638 if data.is_empty() {
639 return Err(EncodingError::EmptyData);
640 }
641
642 let first = data.first().unwrap();
643 let metadata = first.metadata();
644 InstrumentStatus::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
645}
646
647#[expect(clippy::missing_panics_doc)] pub fn instrument_closes_to_arrow_record_batch_bytes(
656 data: &[InstrumentClose],
657) -> Result<RecordBatch, EncodingError> {
658 if data.is_empty() {
659 return Err(EncodingError::EmptyData);
660 }
661
662 let first = data.first().unwrap();
664 let metadata = first.metadata();
665 InstrumentClose::encode_batch(&metadata, data).map_err(EncodingError::ArrowError)
666}