nautilus_serialization/arrow/display/
mark_price.rs1use std::sync::Arc;
19
20use arrow::{
21 array::{Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22 datatypes::Schema,
23 error::ArrowError,
24 record_batch::RecordBatch,
25};
26use nautilus_model::data::MarkPriceUpdate;
27
28use super::{float64_field, price_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field};
29
30#[must_use]
32pub fn mark_prices_schema() -> Schema {
33 Schema::new(vec![
34 utf8_field("instrument_id", false),
35 float64_field("value", false),
36 timestamp_field("ts_event", false),
37 timestamp_field("ts_init", false),
38 ])
39}
40
41pub fn encode_mark_prices(data: &[MarkPriceUpdate]) -> Result<RecordBatch, ArrowError> {
54 let mut instrument_id_builder = StringBuilder::new();
55 let mut value_builder = Float64Builder::with_capacity(data.len());
56 let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
57 let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
58
59 for update in data {
60 instrument_id_builder.append_value(update.instrument_id.to_string());
61 value_builder.append_value(price_to_f64(&update.value));
62 ts_event_builder.append_value(unix_nanos_to_i64(update.ts_event.as_u64()));
63 ts_init_builder.append_value(unix_nanos_to_i64(update.ts_init.as_u64()));
64 }
65
66 RecordBatch::try_new(
67 Arc::new(mark_prices_schema()),
68 vec![
69 Arc::new(instrument_id_builder.finish()),
70 Arc::new(value_builder.finish()),
71 Arc::new(ts_event_builder.finish()),
72 Arc::new(ts_init_builder.finish()),
73 ],
74 )
75}
76
77#[cfg(test)]
78mod tests {
79 use arrow::{
80 array::{Array, Float64Array, StringArray, TimestampNanosecondArray},
81 datatypes::{DataType, TimeUnit},
82 };
83 use nautilus_model::{identifiers::InstrumentId, types::Price};
84 use rstest::rstest;
85
86 use super::*;
87
88 fn make_update(instrument_id: &str, value: &str, ts: u64) -> MarkPriceUpdate {
89 MarkPriceUpdate {
90 instrument_id: InstrumentId::from(instrument_id),
91 value: Price::from(value),
92 ts_event: ts.into(),
93 ts_init: (ts + 1).into(),
94 }
95 }
96
97 #[rstest]
98 fn test_encode_mark_prices_schema() {
99 let batch = encode_mark_prices(&[]).unwrap();
100 let fields = batch.schema().fields().clone();
101 assert_eq!(fields.len(), 4);
102 assert_eq!(fields[0].name(), "instrument_id");
103 assert_eq!(fields[0].data_type(), &DataType::Utf8);
104 assert_eq!(fields[1].name(), "value");
105 assert_eq!(fields[1].data_type(), &DataType::Float64);
106 assert_eq!(fields[2].name(), "ts_event");
107 assert_eq!(
108 fields[2].data_type(),
109 &DataType::Timestamp(TimeUnit::Nanosecond, None)
110 );
111 assert_eq!(fields[3].name(), "ts_init");
112 }
113
114 #[rstest]
115 fn test_encode_mark_prices_values() {
116 let updates = vec![
117 make_update("BTC-USDT.BINANCE", "50200.00", 1_000),
118 make_update("BTC-USDT.BINANCE", "50300.00", 2_000),
119 ];
120 let batch = encode_mark_prices(&updates).unwrap();
121
122 assert_eq!(batch.num_rows(), 2);
123
124 let instrument_id_col = batch
125 .column(0)
126 .as_any()
127 .downcast_ref::<StringArray>()
128 .unwrap();
129 let value_col = batch
130 .column(1)
131 .as_any()
132 .downcast_ref::<Float64Array>()
133 .unwrap();
134 let ts_event_col = batch
135 .column(2)
136 .as_any()
137 .downcast_ref::<TimestampNanosecondArray>()
138 .unwrap();
139
140 assert_eq!(instrument_id_col.value(0), "BTC-USDT.BINANCE");
141 assert!((value_col.value(0) - 50_200.00).abs() < 1e-9);
142 assert!((value_col.value(1) - 50_300.00).abs() < 1e-9);
143 assert_eq!(ts_event_col.value(0), 1_000);
144 }
145
146 #[rstest]
147 fn test_encode_mark_prices_empty() {
148 let batch = encode_mark_prices(&[]).unwrap();
149 assert_eq!(batch.num_rows(), 0);
150 }
151
152 #[rstest]
153 fn test_encode_mark_prices_mixed_instruments() {
154 let updates = vec![
155 make_update("BTC-USDT.BINANCE", "50200.00", 1),
156 make_update("ETH-USDT.BINANCE", "2500.00", 2),
157 ];
158 let batch = encode_mark_prices(&updates).unwrap();
159 let instrument_id_col = batch
160 .column(0)
161 .as_any()
162 .downcast_ref::<StringArray>()
163 .unwrap();
164 assert_eq!(instrument_id_col.value(0), "BTC-USDT.BINANCE");
165 assert_eq!(instrument_id_col.value(1), "ETH-USDT.BINANCE");
166 }
167}