// nautilus_serialization/arrow/display/quote.rs
use std::sync::Arc;

use arrow::{
    array::{Float64Builder, StringBuilder, TimestampNanosecondBuilder},
    datatypes::Schema,
    error::ArrowError,
    record_batch::RecordBatch,
};
use nautilus_model::data::QuoteTick;

use super::{
    float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
};
31
32#[must_use]
34pub fn quotes_schema() -> Schema {
35 Schema::new(vec![
36 utf8_field("instrument_id", false),
37 float64_field("bid_price", false),
38 float64_field("ask_price", false),
39 float64_field("bid_size", false),
40 float64_field("ask_size", false),
41 timestamp_field("ts_event", false),
42 timestamp_field("ts_init", false),
43 ])
44}
45
46pub fn encode_quotes(data: &[QuoteTick]) -> Result<RecordBatch, ArrowError> {
60 let mut instrument_id_builder = StringBuilder::new();
61 let mut bid_price_builder = Float64Builder::with_capacity(data.len());
62 let mut ask_price_builder = Float64Builder::with_capacity(data.len());
63 let mut bid_size_builder = Float64Builder::with_capacity(data.len());
64 let mut ask_size_builder = Float64Builder::with_capacity(data.len());
65 let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
66 let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
67
68 for quote in data {
69 instrument_id_builder.append_value(quote.instrument_id.to_string());
70 bid_price_builder.append_value(price_to_f64("e.bid_price));
71 ask_price_builder.append_value(price_to_f64("e.ask_price));
72 bid_size_builder.append_value(quantity_to_f64("e.bid_size));
73 ask_size_builder.append_value(quantity_to_f64("e.ask_size));
74 ts_event_builder.append_value(unix_nanos_to_i64(quote.ts_event.as_u64()));
75 ts_init_builder.append_value(unix_nanos_to_i64(quote.ts_init.as_u64()));
76 }
77
78 RecordBatch::try_new(
79 Arc::new(quotes_schema()),
80 vec![
81 Arc::new(instrument_id_builder.finish()),
82 Arc::new(bid_price_builder.finish()),
83 Arc::new(ask_price_builder.finish()),
84 Arc::new(bid_size_builder.finish()),
85 Arc::new(ask_size_builder.finish()),
86 Arc::new(ts_event_builder.finish()),
87 Arc::new(ts_init_builder.finish()),
88 ],
89 )
90}
91
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_model::{
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Absolute tolerance used when comparing decoded `f64` column values.
    const EPSILON: f64 = 1e-9;

    /// Builds a quote with fixed sizes (bid 1_000, ask 500) and `ts_init = ts + 1`.
    fn make_quote(instrument_id: &str, bid: &str, ask: &str, ts: u64) -> QuoteTick {
        QuoteTick {
            instrument_id: InstrumentId::from(instrument_id),
            bid_price: Price::from(bid),
            ask_price: Price::from(ask),
            bid_size: Quantity::from(1_000),
            ask_size: Quantity::from(500),
            ts_event: ts.into(),
            ts_init: (ts + 1).into(),
        }
    }

    /// Returns column `index` of `batch` downcast to the concrete array type `T`.
    ///
    /// Panics if the column is not of type `T`, which fails the calling test.
    fn column_as<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("column {} has an unexpected array type", index))
    }

    #[rstest]
    fn test_encode_quotes_schema() {
        let quotes = vec![make_quote("AAPL.XNAS", "100.10", "100.20", 1)];
        let batch = encode_quotes(&quotes).unwrap();

        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), 7);
        assert_eq!(fields[0].name(), "instrument_id");
        assert_eq!(fields[0].data_type(), &DataType::Utf8);
        assert_eq!(fields[1].name(), "bid_price");
        assert_eq!(fields[1].data_type(), &DataType::Float64);
        assert_eq!(fields[2].name(), "ask_price");
        assert_eq!(fields[2].data_type(), &DataType::Float64);
        assert_eq!(fields[3].name(), "bid_size");
        assert_eq!(fields[3].data_type(), &DataType::Float64);
        assert_eq!(fields[4].name(), "ask_size");
        assert_eq!(fields[4].data_type(), &DataType::Float64);
        assert_eq!(fields[5].name(), "ts_event");
        assert_eq!(
            fields[5].data_type(),
            &DataType::Timestamp(TimeUnit::Nanosecond, None)
        );
        assert_eq!(fields[6].name(), "ts_init");
        assert_eq!(
            fields[6].data_type(),
            &DataType::Timestamp(TimeUnit::Nanosecond, None)
        );
    }

    #[rstest]
    fn test_encode_quotes_values() {
        let quotes = vec![
            make_quote("AAPL.XNAS", "100.10", "100.20", 1_000_000_000),
            make_quote("AAPL.XNAS", "100.15", "100.25", 2_000_000_000),
        ];
        let batch = encode_quotes(&quotes).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = column_as::<StringArray>(&batch, 0);
        let bid_price_col = column_as::<Float64Array>(&batch, 1);
        let ask_price_col = column_as::<Float64Array>(&batch, 2);
        let bid_size_col = column_as::<Float64Array>(&batch, 3);
        let ask_size_col = column_as::<Float64Array>(&batch, 4);
        let ts_event_col = column_as::<TimestampNanosecondArray>(&batch, 5);
        let ts_init_col = column_as::<TimestampNanosecondArray>(&batch, 6);

        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "AAPL.XNAS");
        assert!((bid_price_col.value(0) - 100.10).abs() < EPSILON);
        assert!((bid_price_col.value(1) - 100.15).abs() < EPSILON);
        assert!((ask_price_col.value(0) - 100.20).abs() < EPSILON);
        assert!((ask_price_col.value(1) - 100.25).abs() < EPSILON);
        // `make_quote` uses the same sizes for every quote, so check both rows.
        assert!((bid_size_col.value(0) - 1_000.0).abs() < EPSILON);
        assert!((bid_size_col.value(1) - 1_000.0).abs() < EPSILON);
        assert!((ask_size_col.value(0) - 500.0).abs() < EPSILON);
        assert!((ask_size_col.value(1) - 500.0).abs() < EPSILON);
        assert_eq!(ts_event_col.value(0), 1_000_000_000);
        assert_eq!(ts_event_col.value(1), 2_000_000_000);
        assert_eq!(ts_init_col.value(0), 1_000_000_001);
        assert_eq!(ts_init_col.value(1), 2_000_000_001);
    }

    #[rstest]
    fn test_encode_quotes_empty() {
        let batch = encode_quotes(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().fields().len(), 7);
    }

    #[rstest]
    fn test_encode_quotes_mixed_instruments() {
        let quotes = vec![
            make_quote("AAPL.XNAS", "100.10", "100.20", 1),
            make_quote("MSFT.XNAS", "250.00", "250.05", 2),
        ];
        let batch = encode_quotes(&quotes).unwrap();

        let instrument_id_col = column_as::<StringArray>(&batch, 0);
        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}