Skip to main content

nautilus_serialization/arrow/display/
depth.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`OrderBookDepth10`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{
22        ArrayRef, Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder,
23        UInt32Builder, UInt64Builder,
24    },
25    datatypes::{DataType, Field, Schema},
26    error::ArrowError,
27    record_batch::RecordBatch,
28};
29use nautilus_model::data::depth::{DEPTH10_LEN, OrderBookDepth10};
30
31use super::{
32    float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
33};
34
35/// Returns the display-mode Arrow schema for [`OrderBookDepth10`].
36///
37/// Column order: `instrument_id`, then all `bid_price_{0..N}`,
38/// `ask_price_{0..N}`, `bid_size_{0..N}`, `ask_size_{0..N}`,
39/// `bid_count_{0..N}`, `ask_count_{0..N}`, then `flags`, `sequence`,
40/// `ts_event`, `ts_init`.
41#[must_use]
42pub fn depth10_schema() -> Schema {
43    let mut fields = Vec::with_capacity(1 + 6 * DEPTH10_LEN + 4);
44    fields.push(utf8_field("instrument_id", false));
45
46    for i in 0..DEPTH10_LEN {
47        fields.push(float64_field(&format!("bid_price_{i}"), false));
48    }
49
50    for i in 0..DEPTH10_LEN {
51        fields.push(float64_field(&format!("ask_price_{i}"), false));
52    }
53
54    for i in 0..DEPTH10_LEN {
55        fields.push(float64_field(&format!("bid_size_{i}"), false));
56    }
57
58    for i in 0..DEPTH10_LEN {
59        fields.push(float64_field(&format!("ask_size_{i}"), false));
60    }
61
62    for i in 0..DEPTH10_LEN {
63        fields.push(Field::new(
64            format!("bid_count_{i}"),
65            DataType::UInt32,
66            false,
67        ));
68    }
69
70    for i in 0..DEPTH10_LEN {
71        fields.push(Field::new(
72            format!("ask_count_{i}"),
73            DataType::UInt32,
74            false,
75        ));
76    }
77
78    fields.push(Field::new("flags", DataType::UInt8, false));
79    fields.push(Field::new("sequence", DataType::UInt64, false));
80    fields.push(timestamp_field("ts_event", false));
81    fields.push(timestamp_field("ts_init", false));
82
83    Schema::new(fields)
84}
85
86/// Encodes depth-10 snapshots as a display-friendly Arrow [`RecordBatch`].
87///
88/// Emits `Float64` columns per level for prices and sizes, `UInt32` columns
89/// per level for counts, a `Utf8` `instrument_id` column, and
90/// `Timestamp(Nanosecond)` columns for event and init times. Mixed-instrument
91/// batches are supported. Precision is lost on the conversion to `f64`; use
92/// [`crate::arrow::book_depth10_to_arrow_record_batch_bytes`] for catalog
93/// storage.
94///
95/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
96///
97/// # Errors
98///
99/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
100pub fn encode_depth10(data: &[OrderBookDepth10]) -> Result<RecordBatch, ArrowError> {
101    let mut instrument_id_builder = StringBuilder::new();
102    let mut bid_price_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
103        .map(|_| Float64Builder::with_capacity(data.len()))
104        .collect();
105    let mut ask_price_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
106        .map(|_| Float64Builder::with_capacity(data.len()))
107        .collect();
108    let mut bid_size_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
109        .map(|_| Float64Builder::with_capacity(data.len()))
110        .collect();
111    let mut ask_size_builders: Vec<Float64Builder> = (0..DEPTH10_LEN)
112        .map(|_| Float64Builder::with_capacity(data.len()))
113        .collect();
114    let mut bid_count_builders: Vec<UInt32Builder> = (0..DEPTH10_LEN)
115        .map(|_| UInt32Builder::with_capacity(data.len()))
116        .collect();
117    let mut ask_count_builders: Vec<UInt32Builder> = (0..DEPTH10_LEN)
118        .map(|_| UInt32Builder::with_capacity(data.len()))
119        .collect();
120    let mut flags_builder = UInt8Builder::with_capacity(data.len());
121    let mut sequence_builder = UInt64Builder::with_capacity(data.len());
122    let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
123    let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
124
125    for depth in data {
126        instrument_id_builder.append_value(depth.instrument_id.to_string());
127        for i in 0..DEPTH10_LEN {
128            bid_price_builders[i].append_value(price_to_f64(&depth.bids[i].price));
129            ask_price_builders[i].append_value(price_to_f64(&depth.asks[i].price));
130            bid_size_builders[i].append_value(quantity_to_f64(&depth.bids[i].size));
131            ask_size_builders[i].append_value(quantity_to_f64(&depth.asks[i].size));
132            bid_count_builders[i].append_value(depth.bid_counts[i]);
133            ask_count_builders[i].append_value(depth.ask_counts[i]);
134        }
135        flags_builder.append_value(depth.flags);
136        sequence_builder.append_value(depth.sequence);
137        ts_event_builder.append_value(unix_nanos_to_i64(depth.ts_event.as_u64()));
138        ts_init_builder.append_value(unix_nanos_to_i64(depth.ts_init.as_u64()));
139    }
140
141    let mut columns: Vec<ArrayRef> = Vec::with_capacity(1 + 6 * DEPTH10_LEN + 4);
142    columns.push(Arc::new(instrument_id_builder.finish()));
143
144    for mut b in bid_price_builders {
145        columns.push(Arc::new(b.finish()));
146    }
147
148    for mut b in ask_price_builders {
149        columns.push(Arc::new(b.finish()));
150    }
151
152    for mut b in bid_size_builders {
153        columns.push(Arc::new(b.finish()));
154    }
155
156    for mut b in ask_size_builders {
157        columns.push(Arc::new(b.finish()));
158    }
159
160    for mut b in bid_count_builders {
161        columns.push(Arc::new(b.finish()));
162    }
163
164    for mut b in ask_count_builders {
165        columns.push(Arc::new(b.finish()));
166    }
167
168    columns.push(Arc::new(flags_builder.finish()));
169    columns.push(Arc::new(sequence_builder.finish()));
170    columns.push(Arc::new(ts_event_builder.finish()));
171    columns.push(Arc::new(ts_init_builder.finish()));
172
173    RecordBatch::try_new(Arc::new(depth10_schema()), columns)
174}
175
#[cfg(test)]
mod tests {
    use arrow::{
        array::{
            Array, Float64Array, StringArray, TimestampNanosecondArray, UInt8Array, UInt32Array,
            UInt64Array,
        },
        datatypes::TimeUnit,
    };
    use nautilus_model::{
        data::{order::BookOrder, stubs::stub_depth10},
        identifiers::InstrumentId,
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_encode_depth10_schema() {
        let batch = encode_depth10(&[]).unwrap();
        let fields = batch.schema().fields().clone();

        let expected_len = 1 + 6 * DEPTH10_LEN + 4;
        assert_eq!(fields.len(), expected_len);

        assert_eq!(fields[0].name(), "instrument_id");
        assert_eq!(fields[0].data_type(), &DataType::Utf8);

        // Each per-level group occupies DEPTH10_LEN consecutive columns after
        // `instrument_id`; check both the name and the data type of every one.
        let level_groups = [
            ("bid_price", DataType::Float64),
            ("ask_price", DataType::Float64),
            ("bid_size", DataType::Float64),
            ("ask_size", DataType::Float64),
            ("bid_count", DataType::UInt32),
            ("ask_count", DataType::UInt32),
        ];
        for (group, (prefix, data_type)) in level_groups.iter().enumerate() {
            for i in 0..DEPTH10_LEN {
                let field = &fields[1 + group * DEPTH10_LEN + i];
                assert_eq!(field.name(), &format!("{prefix}_{i}"));
                assert_eq!(field.data_type(), data_type);
            }
        }

        let trailer_start = 1 + 6 * DEPTH10_LEN;
        let ts_type = DataType::Timestamp(TimeUnit::Nanosecond, None);
        assert_eq!(fields[trailer_start].name(), "flags");
        assert_eq!(fields[trailer_start].data_type(), &DataType::UInt8);
        assert_eq!(fields[trailer_start + 1].name(), "sequence");
        assert_eq!(fields[trailer_start + 1].data_type(), &DataType::UInt64);
        assert_eq!(fields[trailer_start + 2].name(), "ts_event");
        assert_eq!(fields[trailer_start + 2].data_type(), &ts_type);
        assert_eq!(fields[trailer_start + 3].name(), "ts_init");
        assert_eq!(fields[trailer_start + 3].data_type(), &ts_type);
    }

    #[rstest]
    fn test_encode_depth10_values(stub_depth10: OrderBookDepth10) {
        let data = vec![stub_depth10];
        let batch = encode_depth10(&data).unwrap();

        assert_eq!(batch.num_rows(), 1);

        let instrument_id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(
            instrument_id_col.value(0),
            stub_depth10.instrument_id.to_string()
        );

        // Row-0 readers for per-level columns, addressed by absolute column index.
        let f64_at = |col: usize| {
            batch
                .column(col)
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .value(0)
        };
        let u32_at = |col: usize| {
            batch
                .column(col)
                .as_any()
                .downcast_ref::<UInt32Array>()
                .unwrap()
                .value(0)
        };
        let approx = |actual: f64, expected: f64| (actual - expected).abs() < 1e-9;

        // Check every level, not just level 0, so a shifted or transposed
        // column group is caught.
        for i in 0..DEPTH10_LEN {
            let bid = &stub_depth10.bids[i];
            let ask = &stub_depth10.asks[i];
            assert!(approx(f64_at(1 + i), bid.price.as_f64()));
            assert!(approx(f64_at(1 + DEPTH10_LEN + i), ask.price.as_f64()));
            assert!(approx(f64_at(1 + 2 * DEPTH10_LEN + i), bid.size.as_f64()));
            assert!(approx(f64_at(1 + 3 * DEPTH10_LEN + i), ask.size.as_f64()));
            assert_eq!(u32_at(1 + 4 * DEPTH10_LEN + i), stub_depth10.bid_counts[i]);
            assert_eq!(u32_at(1 + 5 * DEPTH10_LEN + i), stub_depth10.ask_counts[i]);
        }

        let trailer_start = 1 + 6 * DEPTH10_LEN;
        let flags_col = batch
            .column(trailer_start)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        let sequence_col = batch
            .column(trailer_start + 1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        let ts_event_col = batch
            .column(trailer_start + 2)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        let ts_init_col = batch
            .column(trailer_start + 3)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();

        assert_eq!(flags_col.value(0), stub_depth10.flags);
        assert_eq!(sequence_col.value(0), stub_depth10.sequence);
        assert_eq!(ts_event_col.value(0), stub_depth10.ts_event.as_u64() as i64);
        assert_eq!(ts_init_col.value(0), stub_depth10.ts_init.as_u64() as i64);
    }

    #[rstest]
    fn test_encode_depth10_multi_row_values(stub_depth10: OrderBookDepth10) {
        // Guards against row-indexing bugs in the wide depth10 schema by
        // placing distinct values at the same level across two rows and
        // asserting each row-column independently.
        let row0 = stub_depth10;
        let mut row1 = stub_depth10;
        row1.bids[0] = BookOrder::new(
            row1.bids[0].side,
            Price::from("200.00"),
            Quantity::from(250),
            row1.bids[0].order_id,
        );
        row1.asks[0] = BookOrder::new(
            row1.asks[0].side,
            Price::from("201.00"),
            Quantity::from(350),
            row1.asks[0].order_id,
        );
        row1.bid_counts[0] = 42;
        row1.ask_counts[0] = 43;

        let batch = encode_depth10(&[row0, row1]).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let bid_price_0 = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let ask_price_0 = batch
            .column(1 + DEPTH10_LEN)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let bid_size_0 = batch
            .column(1 + 2 * DEPTH10_LEN)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let ask_size_0 = batch
            .column(1 + 3 * DEPTH10_LEN)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        let bid_count_0 = batch
            .column(1 + 4 * DEPTH10_LEN)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();
        let ask_count_0 = batch
            .column(1 + 5 * DEPTH10_LEN)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        assert!((bid_price_0.value(0) - row0.bids[0].price.as_f64()).abs() < 1e-9);
        assert!((bid_price_0.value(1) - 200.00).abs() < 1e-9);
        assert!((ask_price_0.value(0) - row0.asks[0].price.as_f64()).abs() < 1e-9);
        assert!((ask_price_0.value(1) - 201.00).abs() < 1e-9);
        assert!((bid_size_0.value(0) - row0.bids[0].size.as_f64()).abs() < 1e-9);
        assert!((bid_size_0.value(1) - 250.0).abs() < 1e-9);
        assert!((ask_size_0.value(0) - row0.asks[0].size.as_f64()).abs() < 1e-9);
        assert!((ask_size_0.value(1) - 350.0).abs() < 1e-9);
        assert_eq!(bid_count_0.value(0), row0.bid_counts[0]);
        assert_eq!(bid_count_0.value(1), 42);
        assert_eq!(ask_count_0.value(0), row0.ask_counts[0]);
        assert_eq!(ask_count_0.value(1), 43);
    }

    #[rstest]
    fn test_encode_depth10_empty() {
        let batch = encode_depth10(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        // The empty batch must still carry the full display schema.
        assert_eq!(*batch.schema(), depth10_schema());
    }

    #[rstest]
    fn test_encode_depth10_mixed_instruments(stub_depth10: OrderBookDepth10) {
        let mut other = stub_depth10;
        other.instrument_id = InstrumentId::from("MSFT.XNAS");

        let data = vec![stub_depth10, other];
        let batch = encode_depth10(&data).unwrap();
        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(
            instrument_id_col.value(0),
            stub_depth10.instrument_id.to_string()
        );
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}