Skip to main content

nautilus_serialization/arrow/display/
report.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Display-mode Arrow encoder for [`OrderStatusReport`].
17
18use std::sync::Arc;
19
20use arrow::{
21    array::{BooleanBuilder, Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22    datatypes::Schema,
23    error::ArrowError,
24    record_batch::RecordBatch,
25};
26use nautilus_model::reports::OrderStatusReport;
27use rust_decimal::prelude::ToPrimitive;
28
29use super::{
30    bool_field, float64_field, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
31};
32
33/// Returns the display-mode Arrow schema for [`OrderStatusReport`].
34#[must_use]
35pub fn order_status_report_schema() -> Schema {
36    Schema::new(vec![
37        utf8_field("account_id", false),
38        utf8_field("instrument_id", false),
39        utf8_field("client_order_id", true),
40        utf8_field("venue_order_id", false),
41        utf8_field("order_side", false),
42        utf8_field("order_type", false),
43        utf8_field("time_in_force", false),
44        utf8_field("order_status", false),
45        float64_field("quantity", false),
46        float64_field("filled_qty", false),
47        utf8_field("report_id", false),
48        timestamp_field("ts_accepted", false),
49        timestamp_field("ts_last", false),
50        timestamp_field("ts_init", false),
51        utf8_field("order_list_id", true),
52        utf8_field("venue_position_id", true),
53        utf8_field("linked_order_ids", true),
54        utf8_field("parent_order_id", true),
55        utf8_field("contingency_type", false),
56        timestamp_field("expire_time", true),
57        float64_field("price", true),
58        float64_field("trigger_price", true),
59        utf8_field("trigger_type", true),
60        float64_field("limit_offset", true),
61        float64_field("trailing_offset", true),
62        utf8_field("trailing_offset_type", false),
63        float64_field("avg_px", true),
64        float64_field("display_qty", true),
65        bool_field("post_only", false),
66        bool_field("reduce_only", false),
67        utf8_field("cancel_reason", true),
68        timestamp_field("ts_triggered", true),
69    ])
70}
71
72/// Encodes order status reports as a display-friendly Arrow [`RecordBatch`].
73///
74/// Emits `Float64` columns for quantities, prices, and offsets,
75/// `Timestamp(Nanosecond)` columns for all time fields, and `Utf8` columns for
76/// identifiers and enums. Mixed-instrument batches are supported.
77///
78/// Returns an empty [`RecordBatch`] with the correct schema when `data` is empty.
79///
80/// # Errors
81///
82/// Returns an [`ArrowError`] if the Arrow `RecordBatch` cannot be constructed.
83pub fn encode_order_status_reports(data: &[OrderStatusReport]) -> Result<RecordBatch, ArrowError> {
84    let mut account_id = StringBuilder::new();
85    let mut instrument_id = StringBuilder::new();
86    let mut client_order_id = StringBuilder::new();
87    let mut venue_order_id = StringBuilder::new();
88    let mut order_side = StringBuilder::new();
89    let mut order_type = StringBuilder::new();
90    let mut time_in_force = StringBuilder::new();
91    let mut order_status = StringBuilder::new();
92    let mut quantity = Float64Builder::with_capacity(data.len());
93    let mut filled_qty = Float64Builder::with_capacity(data.len());
94    let mut report_id = StringBuilder::new();
95    let mut ts_accepted = TimestampNanosecondBuilder::with_capacity(data.len());
96    let mut ts_last = TimestampNanosecondBuilder::with_capacity(data.len());
97    let mut ts_init = TimestampNanosecondBuilder::with_capacity(data.len());
98    let mut order_list_id = StringBuilder::new();
99    let mut venue_position_id = StringBuilder::new();
100    let mut linked_order_ids = StringBuilder::new();
101    let mut parent_order_id = StringBuilder::new();
102    let mut contingency_type = StringBuilder::new();
103    let mut expire_time = TimestampNanosecondBuilder::with_capacity(data.len());
104    let mut price = Float64Builder::with_capacity(data.len());
105    let mut trigger_price = Float64Builder::with_capacity(data.len());
106    let mut trigger_type = StringBuilder::new();
107    let mut limit_offset = Float64Builder::with_capacity(data.len());
108    let mut trailing_offset = Float64Builder::with_capacity(data.len());
109    let mut trailing_offset_type = StringBuilder::new();
110    let mut avg_px = Float64Builder::with_capacity(data.len());
111    let mut display_qty = Float64Builder::with_capacity(data.len());
112    let mut post_only = BooleanBuilder::with_capacity(data.len());
113    let mut reduce_only = BooleanBuilder::with_capacity(data.len());
114    let mut cancel_reason = StringBuilder::new();
115    let mut ts_triggered = TimestampNanosecondBuilder::with_capacity(data.len());
116
117    for report in data {
118        account_id.append_value(report.account_id);
119        instrument_id.append_value(report.instrument_id.to_string());
120        client_order_id.append_option(report.client_order_id.map(|v| v.to_string()));
121        venue_order_id.append_value(report.venue_order_id);
122        order_side.append_value(format!("{}", report.order_side));
123        order_type.append_value(format!("{}", report.order_type));
124        time_in_force.append_value(format!("{}", report.time_in_force));
125        order_status.append_value(format!("{}", report.order_status));
126        quantity.append_value(quantity_to_f64(&report.quantity));
127        filled_qty.append_value(quantity_to_f64(&report.filled_qty));
128        report_id.append_value(report.report_id.to_string());
129        ts_accepted.append_value(unix_nanos_to_i64(report.ts_accepted.as_u64()));
130        ts_last.append_value(unix_nanos_to_i64(report.ts_last.as_u64()));
131        ts_init.append_value(unix_nanos_to_i64(report.ts_init.as_u64()));
132        order_list_id.append_option(report.order_list_id.map(|v| v.to_string()));
133        venue_position_id.append_option(report.venue_position_id.map(|v| v.to_string()));
134        linked_order_ids.append_option(report.linked_order_ids.as_ref().map(|ids| {
135            let values: Vec<String> = ids.iter().map(ToString::to_string).collect();
136            serde_json::to_string(&values).unwrap_or_default()
137        }));
138        parent_order_id.append_option(report.parent_order_id.map(|v| v.to_string()));
139        contingency_type.append_value(format!("{}", report.contingency_type));
140        expire_time.append_option(report.expire_time.map(|v| unix_nanos_to_i64(v.as_u64())));
141        price.append_option(report.price.map(|v| v.as_f64()));
142        trigger_price.append_option(report.trigger_price.map(|v| v.as_f64()));
143        trigger_type.append_option(report.trigger_type.map(|v| format!("{v}")));
144        limit_offset.append_option(report.limit_offset.and_then(|v| v.to_f64()));
145        trailing_offset.append_option(report.trailing_offset.and_then(|v| v.to_f64()));
146        trailing_offset_type.append_value(format!("{}", report.trailing_offset_type));
147        avg_px.append_option(report.avg_px.and_then(|v| v.to_f64()));
148        display_qty.append_option(report.display_qty.map(|v| quantity_to_f64(&v)));
149        post_only.append_value(report.post_only);
150        reduce_only.append_value(report.reduce_only);
151        cancel_reason.append_option(report.cancel_reason.clone());
152        ts_triggered.append_option(report.ts_triggered.map(|v| unix_nanos_to_i64(v.as_u64())));
153    }
154
155    RecordBatch::try_new(
156        Arc::new(order_status_report_schema()),
157        vec![
158            Arc::new(account_id.finish()),
159            Arc::new(instrument_id.finish()),
160            Arc::new(client_order_id.finish()),
161            Arc::new(venue_order_id.finish()),
162            Arc::new(order_side.finish()),
163            Arc::new(order_type.finish()),
164            Arc::new(time_in_force.finish()),
165            Arc::new(order_status.finish()),
166            Arc::new(quantity.finish()),
167            Arc::new(filled_qty.finish()),
168            Arc::new(report_id.finish()),
169            Arc::new(ts_accepted.finish()),
170            Arc::new(ts_last.finish()),
171            Arc::new(ts_init.finish()),
172            Arc::new(order_list_id.finish()),
173            Arc::new(venue_position_id.finish()),
174            Arc::new(linked_order_ids.finish()),
175            Arc::new(parent_order_id.finish()),
176            Arc::new(contingency_type.finish()),
177            Arc::new(expire_time.finish()),
178            Arc::new(price.finish()),
179            Arc::new(trigger_price.finish()),
180            Arc::new(trigger_type.finish()),
181            Arc::new(limit_offset.finish()),
182            Arc::new(trailing_offset.finish()),
183            Arc::new(trailing_offset_type.finish()),
184            Arc::new(avg_px.finish()),
185            Arc::new(display_qty.finish()),
186            Arc::new(post_only.finish()),
187            Arc::new(reduce_only.finish()),
188            Arc::new(cancel_reason.finish()),
189            Arc::new(ts_triggered.finish()),
190        ],
191    )
192}
193
194#[cfg(test)]
195mod tests {
196    use arrow::{
197        array::{Array, BooleanArray, Float64Array, StringArray, TimestampNanosecondArray},
198        datatypes::{DataType, TimeUnit},
199    };
200    use nautilus_core::UUID4;
201    use nautilus_model::{
202        enums::{
203            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
204        },
205        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
206        types::{Price, Quantity},
207    };
208    use rstest::rstest;
209
210    use super::*;
211
212    fn make_report(instrument_id: &str, ts: u64) -> OrderStatusReport {
213        OrderStatusReport {
214            account_id: AccountId::from("SIM-001"),
215            instrument_id: InstrumentId::from(instrument_id),
216            client_order_id: Some(ClientOrderId::from("O-001")),
217            venue_order_id: VenueOrderId::from("V-001"),
218            order_side: OrderSide::Buy,
219            order_type: OrderType::Limit,
220            time_in_force: TimeInForce::Gtc,
221            order_status: OrderStatus::Accepted,
222            quantity: Quantity::from(100),
223            filled_qty: Quantity::from(50),
224            report_id: UUID4::default(),
225            ts_accepted: ts.into(),
226            ts_last: (ts + 1_000).into(),
227            ts_init: (ts + 1).into(),
228            order_list_id: None,
229            venue_position_id: None,
230            linked_order_ids: None,
231            parent_order_id: None,
232            contingency_type: ContingencyType::NoContingency,
233            expire_time: None,
234            price: Some(Price::from("100.50")),
235            trigger_price: None,
236            trigger_type: None,
237            limit_offset: None,
238            trailing_offset: None,
239            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
240            avg_px: None,
241            display_qty: None,
242            post_only: true,
243            reduce_only: false,
244            cancel_reason: None,
245            ts_triggered: None,
246        }
247    }
248
249    #[rstest]
250    fn test_encode_order_status_reports_schema() {
251        let batch = encode_order_status_reports(&[]).unwrap();
252        let schema = batch.schema();
253        let fields = schema.fields();
254        assert_eq!(fields.len(), 32);
255        assert_eq!(fields[0].name(), "account_id");
256        assert_eq!(fields[0].data_type(), &DataType::Utf8);
257        assert_eq!(fields[8].name(), "quantity");
258        assert_eq!(fields[8].data_type(), &DataType::Float64);
259        assert_eq!(fields[11].name(), "ts_accepted");
260        assert_eq!(
261            fields[11].data_type(),
262            &DataType::Timestamp(TimeUnit::Nanosecond, None)
263        );
264        assert_eq!(fields[28].name(), "post_only");
265        assert_eq!(fields[28].data_type(), &DataType::Boolean);
266    }
267
268    #[rstest]
269    fn test_encode_order_status_reports_values() {
270        let reports = vec![make_report("AAPL.XNAS", 1_000_000)];
271        let batch = encode_order_status_reports(&reports).unwrap();
272
273        assert_eq!(batch.num_rows(), 1);
274
275        let quantity_col = batch
276            .column(8)
277            .as_any()
278            .downcast_ref::<Float64Array>()
279            .unwrap();
280        let filled_qty_col = batch
281            .column(9)
282            .as_any()
283            .downcast_ref::<Float64Array>()
284            .unwrap();
285        let price_col = batch
286            .column(20)
287            .as_any()
288            .downcast_ref::<Float64Array>()
289            .unwrap();
290        let post_only_col = batch
291            .column(28)
292            .as_any()
293            .downcast_ref::<BooleanArray>()
294            .unwrap();
295        let ts_accepted_col = batch
296            .column(11)
297            .as_any()
298            .downcast_ref::<TimestampNanosecondArray>()
299            .unwrap();
300
301        assert!((quantity_col.value(0) - 100.0).abs() < 1e-9);
302        assert!((filled_qty_col.value(0) - 50.0).abs() < 1e-9);
303        assert!((price_col.value(0) - 100.50).abs() < 1e-9);
304        assert!(post_only_col.value(0));
305        assert_eq!(ts_accepted_col.value(0), 1_000_000);
306    }
307
308    #[rstest]
309    fn test_encode_order_status_reports_linked_order_ids_round_trip() {
310        let mut report = make_report("AAPL.XNAS", 1_000);
311        report.linked_order_ids = Some(vec![
312            ClientOrderId::from("O-Z"),
313            ClientOrderId::from("O-A"),
314            ClientOrderId::from("O-M"),
315        ]);
316        let batch = encode_order_status_reports(&[report]).unwrap();
317
318        let linked_col = batch
319            .column(16)
320            .as_any()
321            .downcast_ref::<StringArray>()
322            .unwrap();
323        assert!(!linked_col.is_null(0));
324
325        let parsed: Vec<String> = serde_json::from_str(linked_col.value(0)).unwrap();
326        assert_eq!(parsed, vec!["O-Z", "O-A", "O-M"]);
327    }
328
329    #[rstest]
330    fn test_encode_order_status_reports_linked_order_ids_null_when_absent() {
331        let batch = encode_order_status_reports(&[make_report("AAPL.XNAS", 1_000)]).unwrap();
332        let linked_col = batch
333            .column(16)
334            .as_any()
335            .downcast_ref::<StringArray>()
336            .unwrap();
337        assert!(linked_col.is_null(0));
338    }
339
340    #[rstest]
341    fn test_encode_order_status_reports_nullable_fields() {
342        let reports = vec![make_report("AAPL.XNAS", 1_000)];
343        let batch = encode_order_status_reports(&reports).unwrap();
344
345        let trigger_price_col = batch
346            .column(21)
347            .as_any()
348            .downcast_ref::<Float64Array>()
349            .unwrap();
350        let expire_time_col = batch
351            .column(19)
352            .as_any()
353            .downcast_ref::<TimestampNanosecondArray>()
354            .unwrap();
355
356        assert!(trigger_price_col.is_null(0));
357        assert!(expire_time_col.is_null(0));
358    }
359
360    #[rstest]
361    fn test_encode_order_status_reports_empty() {
362        let batch = encode_order_status_reports(&[]).unwrap();
363        assert_eq!(batch.num_rows(), 0);
364        assert_eq!(batch.schema().fields().len(), 32);
365    }
366
367    #[rstest]
368    fn test_encode_order_status_reports_mixed_instruments() {
369        let reports = vec![make_report("AAPL.XNAS", 1), make_report("MSFT.XNAS", 2)];
370        let batch = encode_order_status_reports(&reports).unwrap();
371
372        let instrument_id_col = batch
373            .column(1)
374            .as_any()
375            .downcast_ref::<StringArray>()
376            .unwrap();
377        assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
378        assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
379    }
380}