1use std::sync::Arc;
19
20use arrow::{
21 array::{
22 Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder, UInt64Builder,
23 },
24 datatypes::{DataType, Field, Schema},
25 error::ArrowError,
26 record_batch::RecordBatch,
27};
28use nautilus_model::{data::OrderBookDelta, enums::BookAction};
29
30use super::{
31 float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
32};
33
34#[must_use]
36pub fn deltas_schema() -> Schema {
37 Schema::new(vec![
38 utf8_field("instrument_id", false),
39 utf8_field("action", false),
40 utf8_field("side", false),
41 float64_field("price", false),
42 float64_field("size", false),
43 utf8_field("order_id", false),
44 Field::new("flags", DataType::UInt8, false),
45 Field::new("sequence", DataType::UInt64, false),
46 timestamp_field("ts_event", false),
47 timestamp_field("ts_init", false),
48 ])
49}
50
51pub fn encode_deltas(data: &[OrderBookDelta]) -> Result<RecordBatch, ArrowError> {
66 let mut instrument_id_builder = StringBuilder::new();
67 let mut action_builder = StringBuilder::new();
68 let mut side_builder = StringBuilder::new();
69 let mut price_builder = Float64Builder::with_capacity(data.len());
70 let mut size_builder = Float64Builder::with_capacity(data.len());
71 let mut order_id_builder = StringBuilder::new();
72 let mut flags_builder = UInt8Builder::with_capacity(data.len());
73 let mut sequence_builder = UInt64Builder::with_capacity(data.len());
74 let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
75 let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
76
77 for delta in data {
78 instrument_id_builder.append_value(delta.instrument_id.to_string());
79 action_builder.append_value(format!("{}", delta.action));
80 side_builder.append_value(format!("{}", delta.order.side));
81
82 if delta.action == BookAction::Clear {
86 price_builder.append_value(f64::NAN);
87 size_builder.append_value(f64::NAN);
88 } else {
89 price_builder.append_value(price_to_f64(&delta.order.price));
90 size_builder.append_value(quantity_to_f64(&delta.order.size));
91 }
92 order_id_builder.append_value(delta.order.order_id.to_string());
93 flags_builder.append_value(delta.flags);
94 sequence_builder.append_value(delta.sequence);
95 ts_event_builder.append_value(unix_nanos_to_i64(delta.ts_event.as_u64()));
96 ts_init_builder.append_value(unix_nanos_to_i64(delta.ts_init.as_u64()));
97 }
98
99 RecordBatch::try_new(
100 Arc::new(deltas_schema()),
101 vec![
102 Arc::new(instrument_id_builder.finish()),
103 Arc::new(action_builder.finish()),
104 Arc::new(side_builder.finish()),
105 Arc::new(price_builder.finish()),
106 Arc::new(size_builder.finish()),
107 Arc::new(order_id_builder.finish()),
108 Arc::new(flags_builder.finish()),
109 Arc::new(sequence_builder.finish()),
110 Arc::new(ts_event_builder.finish()),
111 Arc::new(ts_init_builder.finish()),
112 ],
113 )
114}
115
#[cfg(test)]
mod tests {
    use arrow::{
        array::{
            Array, Float64Array, StringArray, TimestampNanosecondArray, UInt8Array, UInt64Array,
        },
        datatypes::TimeUnit,
    };
    use nautilus_model::{
        data::order::BookOrder,
        enums::{BookAction, OrderSide},
        identifiers::InstrumentId,
        types::{Price, Quantity, price::PRICE_UNDEF, quantity::QUANTITY_UNDEF},
    };
    use rstest::rstest;

    use super::*;

    /// Tolerance used when comparing encoded `f64` prices and sizes.
    const EPSILON: f64 = 1e-9;

    /// Returns column `index` of `batch` downcast to the concrete Arrow array type `T`.
    ///
    /// Panics if the column holds a different array type, failing the calling test.
    fn column<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("column {index} has an unexpected array type"))
    }

    /// Asserts that the first row of `batch` has NaN in both the price and size columns.
    ///
    /// `label` prefixes the failure message so the calling scenario is identifiable.
    fn assert_first_row_price_and_size_are_nan(batch: &RecordBatch, label: &str) {
        let price_col = column::<Float64Array>(batch, 3);
        let size_col = column::<Float64Array>(batch, 4);

        assert!(price_col.value(0).is_nan(), "{label} price should be NaN");
        assert!(size_col.value(0).is_nan(), "{label} size should be NaN");
    }

    /// Builds a delta with a fixed size of 100, zero flags, and `ts_init = ts + 1`.
    fn make_delta(
        instrument_id: &str,
        action: BookAction,
        side: OrderSide,
        price: &str,
        order_id: u64,
        sequence: u64,
        ts: u64,
    ) -> OrderBookDelta {
        OrderBookDelta {
            instrument_id: InstrumentId::from(instrument_id),
            action,
            order: BookOrder {
                side,
                price: Price::from(price),
                size: Quantity::from(100),
                order_id,
            },
            flags: 0,
            sequence,
            ts_event: ts.into(),
            ts_init: (ts + 1).into(),
        }
    }

    #[rstest]
    fn test_encode_deltas_schema() {
        let batch = encode_deltas(&[]).unwrap();
        let fields = batch.schema().fields().clone();

        // Expected (name, data type) for every column, in schema order.
        let expected = [
            ("instrument_id", DataType::Utf8),
            ("action", DataType::Utf8),
            ("side", DataType::Utf8),
            ("price", DataType::Float64),
            ("size", DataType::Float64),
            ("order_id", DataType::Utf8),
            ("flags", DataType::UInt8),
            ("sequence", DataType::UInt64),
            ("ts_event", DataType::Timestamp(TimeUnit::Nanosecond, None)),
            ("ts_init", DataType::Timestamp(TimeUnit::Nanosecond, None)),
        ];

        assert_eq!(fields.len(), expected.len());

        for (index, (field, (name, data_type))) in fields.iter().zip(&expected).enumerate() {
            assert_eq!(field.name(), *name, "name of column {index}");
            assert_eq!(field.data_type(), data_type, "data type of column {index}");
        }
    }

    #[rstest]
    fn test_encode_deltas_values() {
        let deltas = [
            make_delta(
                "AAPL.XNAS",
                BookAction::Add,
                OrderSide::Buy,
                "100.10",
                1,
                10,
                1_000,
            ),
            make_delta(
                "AAPL.XNAS",
                BookAction::Update,
                OrderSide::Sell,
                "100.20",
                2,
                11,
                2_000,
            ),
        ];
        let batch = encode_deltas(&deltas).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let action_col = column::<StringArray>(&batch, 1);
        let side_col = column::<StringArray>(&batch, 2);
        let price_col = column::<Float64Array>(&batch, 3);
        let size_col = column::<Float64Array>(&batch, 4);
        let order_id_col = column::<StringArray>(&batch, 5);
        let flags_col = column::<UInt8Array>(&batch, 6);
        let sequence_col = column::<UInt64Array>(&batch, 7);
        let ts_event_col = column::<TimestampNanosecondArray>(&batch, 8);
        let ts_init_col = column::<TimestampNanosecondArray>(&batch, 9);

        assert_eq!(action_col.value(0), BookAction::Add.to_string());
        assert_eq!(action_col.value(1), BookAction::Update.to_string());
        assert_eq!(side_col.value(0), OrderSide::Buy.to_string());
        assert_eq!(side_col.value(1), OrderSide::Sell.to_string());
        assert!((price_col.value(0) - 100.10).abs() < EPSILON);
        assert!((price_col.value(1) - 100.20).abs() < EPSILON);
        assert!((size_col.value(0) - 100.0).abs() < EPSILON);
        assert!((size_col.value(1) - 100.0).abs() < EPSILON);
        assert_eq!(order_id_col.value(0), "1");
        assert_eq!(order_id_col.value(1), "2");
        assert_eq!(flags_col.value(0), 0);
        assert_eq!(flags_col.value(1), 0);
        assert_eq!(sequence_col.value(0), 10);
        assert_eq!(sequence_col.value(1), 11);
        assert_eq!(ts_event_col.value(0), 1_000);
        assert_eq!(ts_event_col.value(1), 2_000);
        // `make_delta` sets `ts_init` one nanosecond after `ts_event`.
        assert_eq!(ts_init_col.value(0), 1_001);
        assert_eq!(ts_init_col.value(1), 2_001);
    }

    #[rstest]
    fn test_encode_deltas_empty() {
        let batch = encode_deltas(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
    }

    #[rstest]
    fn test_encode_deltas_live_clear_renders_as_nan() {
        let clear = OrderBookDelta::clear(InstrumentId::from("AAPL.XNAS"), 1, 1.into(), 2.into());

        let batch = encode_deltas(&[clear]).unwrap();

        assert_first_row_price_and_size_are_nan(&batch, "live clear");
    }

    #[rstest]
    fn test_encode_deltas_clear_sentinels_render_as_nan() {
        // A Clear delta built by hand with the undefined price/quantity raw values.
        let clear = OrderBookDelta {
            instrument_id: InstrumentId::from("AAPL.XNAS"),
            action: BookAction::Clear,
            order: BookOrder {
                side: OrderSide::NoOrderSide,
                price: Price::from_raw(PRICE_UNDEF, 0),
                size: Quantity::from_raw(QUANTITY_UNDEF, 0),
                order_id: 0,
            },
            flags: 0,
            sequence: 1,
            ts_event: 1.into(),
            ts_init: 2.into(),
        };

        let batch = encode_deltas(&[clear]).unwrap();

        assert_first_row_price_and_size_are_nan(&batch, "clear");
    }

    #[rstest]
    fn test_encode_deltas_mixed_instruments() {
        let deltas = [
            make_delta(
                "AAPL.XNAS",
                BookAction::Add,
                OrderSide::Buy,
                "100.10",
                1,
                1,
                1,
            ),
            make_delta(
                "MSFT.XNAS",
                BookAction::Add,
                OrderSide::Sell,
                "250.00",
                2,
                1,
                2,
            ),
        ];
        let batch = encode_deltas(&deltas).unwrap();
        let instrument_id_col = column::<StringArray>(&batch, 0);

        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}