nautilus_serialization/arrow/display/
depth.rs1use std::sync::Arc;
19
20use arrow::{
21 array::{
22 ArrayRef, Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder,
23 UInt32Builder, UInt64Builder,
24 },
25 datatypes::{DataType, Field, Schema},
26 error::ArrowError,
27 record_batch::RecordBatch,
28};
29use nautilus_model::data::depth::{DEPTH10_LEN, OrderBookDepth10};
30
31use super::{
32 float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
33};
34
35#[must_use]
42pub fn depth10_schema() -> Schema {
43 let mut fields = Vec::with_capacity(1 + 6 * DEPTH10_LEN + 4);
44 fields.push(utf8_field("instrument_id", false));
45
46 for i in 0..DEPTH10_LEN {
47 fields.push(float64_field(&format!("bid_price_{i}"), false));
48 }
49
50 for i in 0..DEPTH10_LEN {
51 fields.push(float64_field(&format!("ask_price_{i}"), false));
52 }
53
54 for i in 0..DEPTH10_LEN {
55 fields.push(float64_field(&format!("bid_size_{i}"), false));
56 }
57
58 for i in 0..DEPTH10_LEN {
59 fields.push(float64_field(&format!("ask_size_{i}"), false));
60 }
61
62 for i in 0..DEPTH10_LEN {
63 fields.push(Field::new(
64 format!("bid_count_{i}"),
65 DataType::UInt32,
66 false,
67 ));
68 }
69
70 for i in 0..DEPTH10_LEN {
71 fields.push(Field::new(
72 format!("ask_count_{i}"),
73 DataType::UInt32,
74 false,
75 ));
76 }
77
78 fields.push(Field::new("flags", DataType::UInt8, false));
79 fields.push(Field::new("sequence", DataType::UInt64, false));
80 fields.push(timestamp_field("ts_event", false));
81 fields.push(timestamp_field("ts_init", false));
82
83 Schema::new(fields)
84}
85
86pub fn encode_depth10(data: &[OrderBookDepth10]) -> Result<RecordBatch, ArrowError> {
101 let mut instrument_id_builder = StringBuilder::new();
102 let mut bid_price_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
103 .map(|_| Float64Builder::with_capacity(data.len()))
104 .collect();
105 let mut ask_price_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
106 .map(|_| Float64Builder::with_capacity(data.len()))
107 .collect();
108 let mut bid_size_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
109 .map(|_| Float64Builder::with_capacity(data.len()))
110 .collect();
111 let mut ask_size_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
112 .map(|_| Float64Builder::with_capacity(data.len()))
113 .collect();
114 let mut bid_count_builders: Vec<UInt32Builder> = (0..DEPTH10_LEN)
115 .map(|_| UInt32Builder::with_capacity(data.len()))
116 .collect();
117 let mut ask_count_builders: Vec<UInt32Builder> = (0..DEPTH10_LEN)
118 .map(|_| UInt32Builder::with_capacity(data.len()))
119 .collect();
120 let mut flags_builder = UInt8Builder::with_capacity(data.len());
121 let mut sequence_builder = UInt64Builder::with_capacity(data.len());
122 let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
123 let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
124
125 for depth in data {
126 instrument_id_builder.append_value(depth.instrument_id.to_string());
127 for i in 0..DEPTH10_LEN {
128 bid_price_builders[i].append_value(price_to_f64(&depth.bids[i].price));
129 ask_price_builders[i].append_value(price_to_f64(&depth.asks[i].price));
130 bid_size_builders[i].append_value(quantity_to_f64(&depth.bids[i].size));
131 ask_size_builders[i].append_value(quantity_to_f64(&depth.asks[i].size));
132 bid_count_builders[i].append_value(depth.bid_counts[i]);
133 ask_count_builders[i].append_value(depth.ask_counts[i]);
134 }
135 flags_builder.append_value(depth.flags);
136 sequence_builder.append_value(depth.sequence);
137 ts_event_builder.append_value(unix_nanos_to_i64(depth.ts_event.as_u64()));
138 ts_init_builder.append_value(unix_nanos_to_i64(depth.ts_init.as_u64()));
139 }
140
141 let mut columns: Vec<ArrayRef> = Vec::with_capacity(1 + 6 * DEPTH10_LEN + 4);
142 columns.push(Arc::new(instrument_id_builder.finish()));
143
144 for mut b in bid_price_builders {
145 columns.push(Arc::new(b.finish()));
146 }
147
148 for mut b in ask_price_builders {
149 columns.push(Arc::new(b.finish()));
150 }
151
152 for mut b in bid_size_builders {
153 columns.push(Arc::new(b.finish()));
154 }
155
156 for mut b in ask_size_builders {
157 columns.push(Arc::new(b.finish()));
158 }
159
160 for mut b in bid_count_builders {
161 columns.push(Arc::new(b.finish()));
162 }
163
164 for mut b in ask_count_builders {
165 columns.push(Arc::new(b.finish()));
166 }
167
168 columns.push(Arc::new(flags_builder.finish()));
169 columns.push(Arc::new(sequence_builder.finish()));
170 columns.push(Arc::new(ts_event_builder.finish()));
171 columns.push(Arc::new(ts_init_builder.finish()));
172
173 RecordBatch::try_new(Arc::new(depth10_schema()), columns)
174}
175
#[cfg(test)]
mod tests {
    use arrow::{
        array::{
            Array, Float64Array, StringArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
            UInt64Array,
        },
        datatypes::TimeUnit,
    };
    use nautilus_model::{
        data::{order::BookOrder, stubs::stub_depth10},
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    // Per-level column groups, in the order `depth10_schema` lays them out.
    const BID_PRICE: usize = 0;
    const ASK_PRICE: usize = 1;
    const BID_SIZE: usize = 2;
    const ASK_SIZE: usize = 3;
    const BID_COUNT: usize = 4;
    const ASK_COUNT: usize = 5;

    /// Index of the first non-level column (`flags`): it follows `instrument_id` and the
    /// six per-level groups.
    const TRAILER_START: usize = 1 + 6 * DEPTH10_LEN;

    /// Total number of columns in the depth-10 schema (four trailer columns).
    const NUM_COLUMNS: usize = TRAILER_START + 4;

    /// Column index of `level` within the per-level `group` (one of the constants above).
    fn level_col(group: usize, level: usize) -> usize {
        1 + group * DEPTH10_LEN + level
    }

    /// Downcasts column `idx` of `batch` to the concrete array type `T`. Panics with a
    /// descriptive message if the column has a different type.
    fn column<T: 'static>(batch: &RecordBatch, idx: usize) -> &T {
        batch
            .column(idx)
            .as_any()
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("column {idx} is not a {}", std::any::type_name::<T>()))
    }

    /// Asserts that two `f64` values agree within `1e-9`.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < 1e-9,
            "expected {expected}, got {actual}"
        );
    }

    #[rstest]
    fn test_encode_depth10_schema() {
        let batch = encode_depth10(&[]).unwrap();
        let fields = batch.schema().fields().clone();

        assert_eq!(fields.len(), NUM_COLUMNS);

        assert_eq!(fields[0].name(), "instrument_id");
        assert_eq!(fields[0].data_type(), &DataType::Utf8);

        // Check both name and data type for every level of every group.
        let level_groups = [
            (BID_PRICE, "bid_price", DataType::Float64),
            (ASK_PRICE, "ask_price", DataType::Float64),
            (BID_SIZE, "bid_size", DataType::Float64),
            (ASK_SIZE, "ask_size", DataType::Float64),
            (BID_COUNT, "bid_count", DataType::UInt32),
            (ASK_COUNT, "ask_count", DataType::UInt32),
        ];
        for (group, prefix, data_type) in level_groups {
            for level in 0..DEPTH10_LEN {
                let field = &fields[level_col(group, level)];
                assert_eq!(field.name(), &format!("{prefix}_{level}"));
                assert_eq!(field.data_type(), &data_type);
            }
        }

        let trailer = [
            ("flags", DataType::UInt8),
            ("sequence", DataType::UInt64),
            ("ts_event", DataType::Timestamp(TimeUnit::Nanosecond, None)),
            ("ts_init", DataType::Timestamp(TimeUnit::Nanosecond, None)),
        ];
        for (offset, (name, data_type)) in trailer.into_iter().enumerate() {
            let field = &fields[TRAILER_START + offset];
            assert_eq!(field.name(), name);
            assert_eq!(field.data_type(), &data_type);
        }
    }

    #[rstest]
    fn test_encode_depth10_values(stub_depth10: OrderBookDepth10) {
        let batch = encode_depth10(&[stub_depth10]).unwrap();

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), NUM_COLUMNS);

        assert_eq!(
            column::<StringArray>(&batch, 0).value(0),
            stub_depth10.instrument_id.to_string()
        );

        // Every level of every group must be encoded, not just level 0.
        for level in 0..DEPTH10_LEN {
            let bid = &stub_depth10.bids[level];
            let ask = &stub_depth10.asks[level];

            assert_close(
                column::<Float64Array>(&batch, level_col(BID_PRICE, level)).value(0),
                bid.price.as_f64(),
            );
            assert_close(
                column::<Float64Array>(&batch, level_col(ASK_PRICE, level)).value(0),
                ask.price.as_f64(),
            );
            assert_close(
                column::<Float64Array>(&batch, level_col(BID_SIZE, level)).value(0),
                bid.size.as_f64(),
            );
            assert_close(
                column::<Float64Array>(&batch, level_col(ASK_SIZE, level)).value(0),
                ask.size.as_f64(),
            );
            assert_eq!(
                column::<UInt32Array>(&batch, level_col(BID_COUNT, level)).value(0),
                stub_depth10.bid_counts[level]
            );
            assert_eq!(
                column::<UInt32Array>(&batch, level_col(ASK_COUNT, level)).value(0),
                stub_depth10.ask_counts[level]
            );
        }

        assert_eq!(
            column::<UInt8Array>(&batch, TRAILER_START).value(0),
            stub_depth10.flags
        );
        assert_eq!(
            column::<UInt64Array>(&batch, TRAILER_START + 1).value(0),
            stub_depth10.sequence
        );
        assert_eq!(
            column::<TimestampNanosecondArray>(&batch, TRAILER_START + 2).value(0),
            i64::try_from(stub_depth10.ts_event.as_u64()).unwrap()
        );
        assert_eq!(
            column::<TimestampNanosecondArray>(&batch, TRAILER_START + 3).value(0),
            i64::try_from(stub_depth10.ts_init.as_u64()).unwrap()
        );
    }

    #[rstest]
    fn test_encode_depth10_multi_row_values(stub_depth10: OrderBookDepth10) {
        let row0 = stub_depth10;

        // The second row differs at level 0 on both sides and in the trailer, so each
        // assertion below can tell the two rows apart.
        let mut row1 = stub_depth10;
        row1.bids[0] = BookOrder::new(
            row1.bids[0].side,
            Price::from("200.00"),
            Quantity::from(250),
            row1.bids[0].order_id,
        );
        row1.asks[0] = BookOrder::new(
            row1.asks[0].side,
            Price::from("201.00"),
            Quantity::from(350),
            row1.asks[0].order_id,
        );
        row1.bid_counts[0] = 42;
        row1.ask_counts[0] = 43;
        row1.sequence = row0.sequence + 1;

        let batch = encode_depth10(&[row0, row1]).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let bid_price_0 = column::<Float64Array>(&batch, level_col(BID_PRICE, 0));
        let ask_price_0 = column::<Float64Array>(&batch, level_col(ASK_PRICE, 0));
        let bid_size_0 = column::<Float64Array>(&batch, level_col(BID_SIZE, 0));
        let ask_size_0 = column::<Float64Array>(&batch, level_col(ASK_SIZE, 0));
        let bid_count_0 = column::<UInt32Array>(&batch, level_col(BID_COUNT, 0));
        let ask_count_0 = column::<UInt32Array>(&batch, level_col(ASK_COUNT, 0));
        let sequence = column::<UInt64Array>(&batch, TRAILER_START + 1);

        assert_close(bid_price_0.value(0), row0.bids[0].price.as_f64());
        assert_close(bid_price_0.value(1), 200.00);
        assert_close(ask_price_0.value(0), row0.asks[0].price.as_f64());
        assert_close(ask_price_0.value(1), 201.00);
        assert_close(bid_size_0.value(0), row0.bids[0].size.as_f64());
        assert_close(bid_size_0.value(1), 250.0);
        assert_close(ask_size_0.value(0), row0.asks[0].size.as_f64());
        assert_close(ask_size_0.value(1), 350.0);
        assert_eq!(bid_count_0.value(0), row0.bid_counts[0]);
        assert_eq!(bid_count_0.value(1), 42);
        assert_eq!(ask_count_0.value(0), row0.ask_counts[0]);
        assert_eq!(ask_count_0.value(1), 43);
        assert_eq!(sequence.value(0), row0.sequence);
        assert_eq!(sequence.value(1), row1.sequence);
    }

    #[rstest]
    fn test_encode_depth10_empty() {
        let batch = encode_depth10(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        // An empty input still yields the full schema.
        assert_eq!(batch.num_columns(), NUM_COLUMNS);
    }

    #[rstest]
    fn test_encode_depth10_mixed_instruments(stub_depth10: OrderBookDepth10) {
        let mut other = stub_depth10;
        other.instrument_id = InstrumentId::from("MSFT.XNAS");

        let batch = encode_depth10(&[stub_depth10, other]).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = column::<StringArray>(&batch, 0);
        assert_eq!(
            instrument_id_col.value(0),
            stub_depth10.instrument_id.to_string()
        );
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}