Skip to main content

nautilus_serialization/arrow/display/
delta.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`OrderBookDelta`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{
22        Float64Builder, StringBuilder, TimestampNanosecondBuilder, UInt8Builder, UInt64Builder,
23    },
24    datatypes::{DataType, Field, Schema},
25    error::ArrowError,
26    record_batch::RecordBatch,
27};
28use nautilus_model::{data::OrderBookDelta, enums::BookAction};
29
30use super::{
31    float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
32};
33
34/// Returns the display-mode Arrow schema for [`OrderBookDelta`].
35#[must_use]
36pub fn deltas_schema() -> Schema {
37    Schema::new(vec![
38        utf8_field("instrument_id", false),
39        utf8_field("action", false),
40        utf8_field("side", false),
41        float64_field("price", false),
42        float64_field("size", false),
43        utf8_field("order_id", false),
44        Field::new("flags", DataType::UInt8, false),
45        Field::new("sequence", DataType::UInt64, false),
46        timestamp_field("ts_event", false),
47        timestamp_field("ts_init", false),
48    ])
49}
50
51/// Encodes order book deltas as a display-friendly Arrow [`RecordBatch`].
52///
53/// Prices and sizes render as `Float64`, action and side render as `Utf8`
54/// via their `Display` implementations, and `order_id` becomes `Utf8` so
55/// numerically large IDs survive display in dashboards. Mixed-instrument
56/// batches are supported. Precision is lost on the conversion to `f64`;
57/// use [`crate::arrow::book_deltas_to_arrow_record_batch_bytes`] for catalog
58/// storage.
59///
60/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
61///
62/// # Errors
63///
64/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
65pub fn encode_deltas(data: &[OrderBookDelta]) -> Result<RecordBatch, ArrowError> {
66    let mut instrument_id_builder = StringBuilder::new();
67    let mut action_builder = StringBuilder::new();
68    let mut side_builder = StringBuilder::new();
69    let mut price_builder = Float64Builder::with_capacity(data.len());
70    let mut size_builder = Float64Builder::with_capacity(data.len());
71    let mut order_id_builder = StringBuilder::new();
72    let mut flags_builder = UInt8Builder::with_capacity(data.len());
73    let mut sequence_builder = UInt64Builder::with_capacity(data.len());
74    let mut ts_event_builder = TimestampNanosecondBuilder::with_capacity(data.len());
75    let mut ts_init_builder = TimestampNanosecondBuilder::with_capacity(data.len());
76
77    for delta in data {
78        instrument_id_builder.append_value(delta.instrument_id.to_string());
79        action_builder.append_value(format!("{}", delta.action));
80        side_builder.append_value(format!("{}", delta.order.side));
81
82        // A `Clear` delta carries a `NULL_ORDER` (zero price/size) and has no
83        // meaningful order to render; emit `NaN` so dashboards show empty
84        // cells rather than a phantom zero-priced order.
85        if delta.action == BookAction::Clear {
86            price_builder.append_value(f64::NAN);
87            size_builder.append_value(f64::NAN);
88        } else {
89            price_builder.append_value(price_to_f64(&delta.order.price));
90            size_builder.append_value(quantity_to_f64(&delta.order.size));
91        }
92        order_id_builder.append_value(delta.order.order_id.to_string());
93        flags_builder.append_value(delta.flags);
94        sequence_builder.append_value(delta.sequence);
95        ts_event_builder.append_value(unix_nanos_to_i64(delta.ts_event.as_u64()));
96        ts_init_builder.append_value(unix_nanos_to_i64(delta.ts_init.as_u64()));
97    }
98
99    RecordBatch::try_new(
100        Arc::new(deltas_schema()),
101        vec![
102            Arc::new(instrument_id_builder.finish()),
103            Arc::new(action_builder.finish()),
104            Arc::new(side_builder.finish()),
105            Arc::new(price_builder.finish()),
106            Arc::new(size_builder.finish()),
107            Arc::new(order_id_builder.finish()),
108            Arc::new(flags_builder.finish()),
109            Arc::new(sequence_builder.finish()),
110            Arc::new(ts_event_builder.finish()),
111            Arc::new(ts_init_builder.finish()),
112        ],
113    )
114}
115
#[cfg(test)]
mod tests {
    use arrow::{
        array::{
            Array, Float64Array, StringArray, TimestampNanosecondArray, UInt8Array, UInt64Array,
        },
        datatypes::TimeUnit,
    };
    use nautilus_model::{
        data::order::BookOrder,
        enums::{BookAction, OrderSide},
        identifiers::InstrumentId,
        types::{Price, Quantity, price::PRICE_UNDEF, quantity::QUANTITY_UNDEF},
    };
    use rstest::rstest;

    use super::*;

    /// Downcasts column `index` of `batch` to the concrete Arrow array type `T`.
    ///
    /// Panics (failing the calling test) when the column holds a different
    /// array type.
    fn column<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .expect("column has an unexpected Arrow array type")
    }

    /// Builds a delta with a fixed size of `100`, zero flags, `ts_event = ts`
    /// and `ts_init = ts + 1`.
    fn make_delta(
        instrument_id: &str,
        action: BookAction,
        side: OrderSide,
        price: &str,
        order_id: u64,
        sequence: u64,
        ts: u64,
    ) -> OrderBookDelta {
        OrderBookDelta {
            instrument_id: InstrumentId::from(instrument_id),
            action,
            order: BookOrder {
                side,
                price: Price::from(price),
                size: Quantity::from(100),
                order_id,
            },
            flags: 0,
            sequence,
            ts_event: ts.into(),
            ts_init: (ts + 1).into(),
        }
    }

    #[rstest]
    fn test_encode_deltas_schema() {
        let batch = encode_deltas(&[]).unwrap();
        let fields = batch.schema().fields().clone();

        // Expected (name, type) for every column, in schema order.
        let expected = [
            ("instrument_id", DataType::Utf8),
            ("action", DataType::Utf8),
            ("side", DataType::Utf8),
            ("price", DataType::Float64),
            ("size", DataType::Float64),
            ("order_id", DataType::Utf8),
            ("flags", DataType::UInt8),
            ("sequence", DataType::UInt64),
            (
                "ts_event",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
            ),
            ("ts_init", DataType::Timestamp(TimeUnit::Nanosecond, None)),
        ];

        assert_eq!(fields.len(), expected.len());
        for (field, (name, data_type)) in fields.iter().zip(&expected) {
            assert_eq!(field.name(), *name);
            assert_eq!(field.data_type(), data_type, "type of column `{}`", name);
        }
    }

    #[rstest]
    fn test_encode_deltas_values() {
        let deltas = vec![
            make_delta(
                "AAPL.XNAS",
                BookAction::Add,
                OrderSide::Buy,
                "100.10",
                1,
                10,
                1_000,
            ),
            make_delta(
                "AAPL.XNAS",
                BookAction::Update,
                OrderSide::Sell,
                "100.20",
                2,
                11,
                2_000,
            ),
        ];
        let batch = encode_deltas(&deltas).unwrap();

        assert_eq!(batch.num_rows(), 2);

        let instrument_id_col = column::<StringArray>(&batch, 0);
        let action_col = column::<StringArray>(&batch, 1);
        let side_col = column::<StringArray>(&batch, 2);
        let price_col = column::<Float64Array>(&batch, 3);
        let size_col = column::<Float64Array>(&batch, 4);
        let order_id_col = column::<StringArray>(&batch, 5);
        let flags_col = column::<UInt8Array>(&batch, 6);
        let sequence_col = column::<UInt64Array>(&batch, 7);
        let ts_event_col = column::<TimestampNanosecondArray>(&batch, 8);
        let ts_init_col = column::<TimestampNanosecondArray>(&batch, 9);

        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "AAPL.XNAS");
        assert_eq!(action_col.value(0), format!("{}", BookAction::Add));
        assert_eq!(action_col.value(1), format!("{}", BookAction::Update));
        assert_eq!(side_col.value(0), format!("{}", OrderSide::Buy));
        assert_eq!(side_col.value(1), format!("{}", OrderSide::Sell));
        assert!((price_col.value(0) - 100.10).abs() < 1e-9);
        assert!((price_col.value(1) - 100.20).abs() < 1e-9);
        assert!((size_col.value(0) - 100.0).abs() < 1e-9);
        assert!((size_col.value(1) - 100.0).abs() < 1e-9);
        assert_eq!(order_id_col.value(0), "1");
        assert_eq!(order_id_col.value(1), "2");
        assert_eq!(flags_col.value(0), 0);
        assert_eq!(flags_col.value(1), 0);
        assert_eq!(sequence_col.value(0), 10);
        assert_eq!(sequence_col.value(1), 11);
        assert_eq!(ts_event_col.value(0), 1_000);
        assert_eq!(ts_event_col.value(1), 2_000);
        // `make_delta` sets `ts_init` to `ts + 1`.
        assert_eq!(ts_init_col.value(0), 1_001);
        assert_eq!(ts_init_col.value(1), 2_001);
    }

    #[rstest]
    fn test_encode_deltas_empty() {
        let batch = encode_deltas(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
    }

    #[rstest]
    fn test_encode_deltas_live_clear_renders_as_nan() {
        let clear = OrderBookDelta::clear(InstrumentId::from("AAPL.XNAS"), 1, 1.into(), 2.into());

        let batch = encode_deltas(&[clear]).unwrap();
        let price_col = column::<Float64Array>(&batch, 3);
        let size_col = column::<Float64Array>(&batch, 4);

        assert_eq!(batch.num_rows(), 1);
        assert!(
            price_col.value(0).is_nan(),
            "live clear price should be NaN"
        );
        assert!(size_col.value(0).is_nan(), "live clear size should be NaN");
    }

    #[rstest]
    fn test_encode_deltas_clear_sentinels_render_as_nan() {
        // A `Clear` delta built by hand with undefined price/size sentinels
        // must render the same way as one built via `OrderBookDelta::clear`.
        let clear = OrderBookDelta {
            instrument_id: InstrumentId::from("AAPL.XNAS"),
            action: BookAction::Clear,
            order: BookOrder {
                side: OrderSide::NoOrderSide,
                price: Price::from_raw(PRICE_UNDEF, 0),
                size: Quantity::from_raw(QUANTITY_UNDEF, 0),
                order_id: 0,
            },
            flags: 0,
            sequence: 1,
            ts_event: 1.into(),
            ts_init: 2.into(),
        };

        let batch = encode_deltas(&[clear]).unwrap();
        let price_col = column::<Float64Array>(&batch, 3);
        let size_col = column::<Float64Array>(&batch, 4);

        assert_eq!(batch.num_rows(), 1);
        assert!(price_col.value(0).is_nan(), "clear price should be NaN");
        assert!(size_col.value(0).is_nan(), "clear size should be NaN");
    }

    #[rstest]
    fn test_encode_deltas_mixed_instruments() {
        let deltas = vec![
            make_delta(
                "AAPL.XNAS",
                BookAction::Add,
                OrderSide::Buy,
                "100.10",
                1,
                1,
                1,
            ),
            make_delta(
                "MSFT.XNAS",
                BookAction::Add,
                OrderSide::Sell,
                "250.00",
                2,
                1,
                2,
            ),
        ];
        let batch = encode_deltas(&deltas).unwrap();
        let instrument_id_col = column::<StringArray>(&batch, 0);

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
    }
}