1use std::sync::Arc;
19
20use arrow::{
21 array::{BooleanBuilder, Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22 datatypes::Schema,
23 error::ArrowError,
24 record_batch::RecordBatch,
25};
26use nautilus_model::reports::OrderStatusReport;
27use rust_decimal::prelude::ToPrimitive;
28
29use super::{
30 bool_field, float64_field, quantity_to_f64, timestamp_field, unix_nanos_to_i64, utf8_field,
31};
32
33#[must_use]
35pub fn order_status_report_schema() -> Schema {
36 Schema::new(vec![
37 utf8_field("account_id", false),
38 utf8_field("instrument_id", false),
39 utf8_field("client_order_id", true),
40 utf8_field("venue_order_id", false),
41 utf8_field("order_side", false),
42 utf8_field("order_type", false),
43 utf8_field("time_in_force", false),
44 utf8_field("order_status", false),
45 float64_field("quantity", false),
46 float64_field("filled_qty", false),
47 utf8_field("report_id", false),
48 timestamp_field("ts_accepted", false),
49 timestamp_field("ts_last", false),
50 timestamp_field("ts_init", false),
51 utf8_field("order_list_id", true),
52 utf8_field("venue_position_id", true),
53 utf8_field("linked_order_ids", true),
54 utf8_field("parent_order_id", true),
55 utf8_field("contingency_type", false),
56 timestamp_field("expire_time", true),
57 float64_field("price", true),
58 float64_field("trigger_price", true),
59 utf8_field("trigger_type", true),
60 float64_field("limit_offset", true),
61 float64_field("trailing_offset", true),
62 utf8_field("trailing_offset_type", false),
63 float64_field("avg_px", true),
64 float64_field("display_qty", true),
65 bool_field("post_only", false),
66 bool_field("reduce_only", false),
67 utf8_field("cancel_reason", true),
68 timestamp_field("ts_triggered", true),
69 ])
70}
71
72pub fn encode_order_status_reports(data: &[OrderStatusReport]) -> Result<RecordBatch, ArrowError> {
84 let mut account_id = StringBuilder::new();
85 let mut instrument_id = StringBuilder::new();
86 let mut client_order_id = StringBuilder::new();
87 let mut venue_order_id = StringBuilder::new();
88 let mut order_side = StringBuilder::new();
89 let mut order_type = StringBuilder::new();
90 let mut time_in_force = StringBuilder::new();
91 let mut order_status = StringBuilder::new();
92 let mut quantity = Float64Builder::with_capacity(data.len());
93 let mut filled_qty = Float64Builder::with_capacity(data.len());
94 let mut report_id = StringBuilder::new();
95 let mut ts_accepted = TimestampNanosecondBuilder::with_capacity(data.len());
96 let mut ts_last = TimestampNanosecondBuilder::with_capacity(data.len());
97 let mut ts_init = TimestampNanosecondBuilder::with_capacity(data.len());
98 let mut order_list_id = StringBuilder::new();
99 let mut venue_position_id = StringBuilder::new();
100 let mut linked_order_ids = StringBuilder::new();
101 let mut parent_order_id = StringBuilder::new();
102 let mut contingency_type = StringBuilder::new();
103 let mut expire_time = TimestampNanosecondBuilder::with_capacity(data.len());
104 let mut price = Float64Builder::with_capacity(data.len());
105 let mut trigger_price = Float64Builder::with_capacity(data.len());
106 let mut trigger_type = StringBuilder::new();
107 let mut limit_offset = Float64Builder::with_capacity(data.len());
108 let mut trailing_offset = Float64Builder::with_capacity(data.len());
109 let mut trailing_offset_type = StringBuilder::new();
110 let mut avg_px = Float64Builder::with_capacity(data.len());
111 let mut display_qty = Float64Builder::with_capacity(data.len());
112 let mut post_only = BooleanBuilder::with_capacity(data.len());
113 let mut reduce_only = BooleanBuilder::with_capacity(data.len());
114 let mut cancel_reason = StringBuilder::new();
115 let mut ts_triggered = TimestampNanosecondBuilder::with_capacity(data.len());
116
117 for report in data {
118 account_id.append_value(report.account_id);
119 instrument_id.append_value(report.instrument_id.to_string());
120 client_order_id.append_option(report.client_order_id.map(|v| v.to_string()));
121 venue_order_id.append_value(report.venue_order_id);
122 order_side.append_value(format!("{}", report.order_side));
123 order_type.append_value(format!("{}", report.order_type));
124 time_in_force.append_value(format!("{}", report.time_in_force));
125 order_status.append_value(format!("{}", report.order_status));
126 quantity.append_value(quantity_to_f64(&report.quantity));
127 filled_qty.append_value(quantity_to_f64(&report.filled_qty));
128 report_id.append_value(report.report_id.to_string());
129 ts_accepted.append_value(unix_nanos_to_i64(report.ts_accepted.as_u64()));
130 ts_last.append_value(unix_nanos_to_i64(report.ts_last.as_u64()));
131 ts_init.append_value(unix_nanos_to_i64(report.ts_init.as_u64()));
132 order_list_id.append_option(report.order_list_id.map(|v| v.to_string()));
133 venue_position_id.append_option(report.venue_position_id.map(|v| v.to_string()));
134 linked_order_ids.append_option(report.linked_order_ids.as_ref().map(|ids| {
135 let values: Vec<String> = ids.iter().map(ToString::to_string).collect();
136 serde_json::to_string(&values).unwrap_or_default()
137 }));
138 parent_order_id.append_option(report.parent_order_id.map(|v| v.to_string()));
139 contingency_type.append_value(format!("{}", report.contingency_type));
140 expire_time.append_option(report.expire_time.map(|v| unix_nanos_to_i64(v.as_u64())));
141 price.append_option(report.price.map(|v| v.as_f64()));
142 trigger_price.append_option(report.trigger_price.map(|v| v.as_f64()));
143 trigger_type.append_option(report.trigger_type.map(|v| format!("{v}")));
144 limit_offset.append_option(report.limit_offset.and_then(|v| v.to_f64()));
145 trailing_offset.append_option(report.trailing_offset.and_then(|v| v.to_f64()));
146 trailing_offset_type.append_value(format!("{}", report.trailing_offset_type));
147 avg_px.append_option(report.avg_px.and_then(|v| v.to_f64()));
148 display_qty.append_option(report.display_qty.map(|v| quantity_to_f64(&v)));
149 post_only.append_value(report.post_only);
150 reduce_only.append_value(report.reduce_only);
151 cancel_reason.append_option(report.cancel_reason.clone());
152 ts_triggered.append_option(report.ts_triggered.map(|v| unix_nanos_to_i64(v.as_u64())));
153 }
154
155 RecordBatch::try_new(
156 Arc::new(order_status_report_schema()),
157 vec![
158 Arc::new(account_id.finish()),
159 Arc::new(instrument_id.finish()),
160 Arc::new(client_order_id.finish()),
161 Arc::new(venue_order_id.finish()),
162 Arc::new(order_side.finish()),
163 Arc::new(order_type.finish()),
164 Arc::new(time_in_force.finish()),
165 Arc::new(order_status.finish()),
166 Arc::new(quantity.finish()),
167 Arc::new(filled_qty.finish()),
168 Arc::new(report_id.finish()),
169 Arc::new(ts_accepted.finish()),
170 Arc::new(ts_last.finish()),
171 Arc::new(ts_init.finish()),
172 Arc::new(order_list_id.finish()),
173 Arc::new(venue_position_id.finish()),
174 Arc::new(linked_order_ids.finish()),
175 Arc::new(parent_order_id.finish()),
176 Arc::new(contingency_type.finish()),
177 Arc::new(expire_time.finish()),
178 Arc::new(price.finish()),
179 Arc::new(trigger_price.finish()),
180 Arc::new(trigger_type.finish()),
181 Arc::new(limit_offset.finish()),
182 Arc::new(trailing_offset.finish()),
183 Arc::new(trailing_offset_type.finish()),
184 Arc::new(avg_px.finish()),
185 Arc::new(display_qty.finish()),
186 Arc::new(post_only.finish()),
187 Arc::new(reduce_only.finish()),
188 Arc::new(cancel_reason.finish()),
189 Arc::new(ts_triggered.finish()),
190 ],
191 )
192}
193
#[cfg(test)]
mod tests {
    use arrow::{
        array::{Array, BooleanArray, Float64Array, StringArray, TimestampNanosecondArray},
        datatypes::{DataType, TimeUnit},
    };
    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{
            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
        },
        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
        types::{Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    /// Absolute tolerance used when comparing encoded `f64` cells.
    const EPSILON: f64 = 1e-9;

    /// Number of columns the encoder is expected to emit.
    const COLUMN_COUNT: usize = 32;

    /// Returns column `index` of `batch` downcast to the concrete array type `T`.
    ///
    /// Panics if the column is not of type `T`.
    fn typed_column<T: Array + 'static>(batch: &RecordBatch, index: usize) -> &T {
        batch
            .column(index)
            .as_any()
            .downcast_ref::<T>()
            .unwrap()
    }

    /// Builds a limit-buy report for `instrument_id` whose timestamps derive from `ts`.
    ///
    /// Every optional field except `client_order_id` and `price` is left as `None`.
    fn make_report(instrument_id: &str, ts: u64) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("SIM-001"),
            instrument_id: InstrumentId::from(instrument_id),
            client_order_id: Some(ClientOrderId::from("O-001")),
            venue_order_id: VenueOrderId::from("V-001"),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Accepted,
            quantity: Quantity::from(100),
            filled_qty: Quantity::from(50),
            report_id: UUID4::default(),
            ts_accepted: ts.into(),
            ts_last: (ts + 1_000).into(),
            ts_init: (ts + 1).into(),
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            price: Some(Price::from("100.50")),
            trigger_price: None,
            trigger_type: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            avg_px: None,
            display_qty: None,
            post_only: true,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }

    #[rstest]
    fn test_encode_order_status_reports_schema() {
        let batch = encode_order_status_reports(&[]).unwrap();
        let schema = batch.schema();
        let fields = schema.fields();
        assert_eq!(fields.len(), COLUMN_COUNT);

        // Spot-check one column of each Arrow data type: (index, name, type).
        let expected = [
            (0, "account_id", DataType::Utf8),
            (8, "quantity", DataType::Float64),
            (
                11,
                "ts_accepted",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
            ),
            (28, "post_only", DataType::Boolean),
        ];

        for (index, name, data_type) in expected {
            let field = &fields[index];
            assert_eq!(field.name(), name);
            assert_eq!(field.data_type(), &data_type);
        }
    }

    #[rstest]
    fn test_encode_order_status_reports_values() {
        let batch = encode_order_status_reports(&[make_report("AAPL.XNAS", 1_000_000)]).unwrap();
        assert_eq!(batch.num_rows(), 1);

        let quantity = typed_column::<Float64Array>(&batch, 8).value(0);
        let filled_qty = typed_column::<Float64Array>(&batch, 9).value(0);
        let price = typed_column::<Float64Array>(&batch, 20).value(0);
        let post_only = typed_column::<BooleanArray>(&batch, 28).value(0);
        let ts_accepted = typed_column::<TimestampNanosecondArray>(&batch, 11).value(0);

        assert!((quantity - 100.0).abs() < EPSILON);
        assert!((filled_qty - 50.0).abs() < EPSILON);
        assert!((price - 100.50).abs() < EPSILON);
        assert!(post_only);
        assert_eq!(ts_accepted, 1_000_000);
    }

    #[rstest]
    fn test_encode_order_status_reports_linked_order_ids_round_trip() {
        // Deliberately unsorted so the assertion also pins that order is preserved.
        let linked = vec![
            ClientOrderId::from("O-Z"),
            ClientOrderId::from("O-A"),
            ClientOrderId::from("O-M"),
        ];
        let mut report = make_report("AAPL.XNAS", 1_000);
        report.linked_order_ids = Some(linked);

        let batch = encode_order_status_reports(&[report]).unwrap();
        let linked_col = typed_column::<StringArray>(&batch, 16);
        assert!(!linked_col.is_null(0));

        let decoded: Vec<String> = serde_json::from_str(linked_col.value(0)).unwrap();
        assert_eq!(decoded, vec!["O-Z", "O-A", "O-M"]);
    }

    #[rstest]
    fn test_encode_order_status_reports_linked_order_ids_null_when_absent() {
        let batch = encode_order_status_reports(&[make_report("AAPL.XNAS", 1_000)]).unwrap();
        assert!(typed_column::<StringArray>(&batch, 16).is_null(0));
    }

    #[rstest]
    fn test_encode_order_status_reports_nullable_fields() {
        let batch = encode_order_status_reports(&[make_report("AAPL.XNAS", 1_000)]).unwrap();

        // `make_report` leaves both of these as `None`.
        assert!(typed_column::<Float64Array>(&batch, 21).is_null(0));
        assert!(typed_column::<TimestampNanosecondArray>(&batch, 19).is_null(0));
    }

    #[rstest]
    fn test_encode_order_status_reports_empty() {
        let batch = encode_order_status_reports(&[]).unwrap();
        assert_eq!(batch.num_rows(), 0);
        assert_eq!(batch.schema().fields().len(), COLUMN_COUNT);
    }

    #[rstest]
    fn test_encode_order_status_reports_mixed_instruments() {
        let reports = [make_report("AAPL.XNAS", 1), make_report("MSFT.XNAS", 2)];
        let batch = encode_order_status_reports(&reports).unwrap();

        let instrument_ids = typed_column::<StringArray>(&batch, 1);
        assert_eq!(instrument_ids.value(0), "AAPL.XNAS");
        assert_eq!(instrument_ids.value(1), "MSFT.XNAS");
    }
}