nautilus_serialization/arrow/display/
order_filled.rs1use std::sync::Arc;
19
20use arrow::{
21 array::{BooleanBuilder, Float64Builder, StringBuilder, TimestampNanosecondBuilder},
22 datatypes::Schema,
23 error::ArrowError,
24 record_batch::RecordBatch,
25};
26use nautilus_model::events::OrderFilled;
27
28use super::{
29 bool_field, float64_field, price_to_f64, quantity_to_f64, timestamp_field, unix_nanos_to_i64,
30 utf8_field,
31};
32
33#[must_use]
35pub fn order_filled_schema() -> Schema {
36 Schema::new(vec![
37 utf8_field("trader_id", false),
38 utf8_field("strategy_id", false),
39 utf8_field("instrument_id", false),
40 utf8_field("client_order_id", false),
41 utf8_field("venue_order_id", false),
42 utf8_field("account_id", false),
43 utf8_field("trade_id", false),
44 utf8_field("order_side", false),
45 utf8_field("order_type", false),
46 float64_field("last_qty", false),
47 float64_field("last_px", false),
48 utf8_field("currency", false),
49 utf8_field("liquidity_side", false),
50 utf8_field("event_id", false),
51 timestamp_field("ts_event", false),
52 timestamp_field("ts_init", false),
53 bool_field("reconciliation", false),
54 utf8_field("position_id", true),
55 utf8_field("commission", true),
56 ])
57}
58
59pub fn encode_order_fills(data: &[OrderFilled]) -> Result<RecordBatch, ArrowError> {
72 let mut trader_id = StringBuilder::new();
73 let mut strategy_id = StringBuilder::new();
74 let mut instrument_id = StringBuilder::new();
75 let mut client_order_id = StringBuilder::new();
76 let mut venue_order_id = StringBuilder::new();
77 let mut account_id = StringBuilder::new();
78 let mut trade_id = StringBuilder::new();
79 let mut order_side = StringBuilder::new();
80 let mut order_type = StringBuilder::new();
81 let mut last_qty = Float64Builder::with_capacity(data.len());
82 let mut last_px = Float64Builder::with_capacity(data.len());
83 let mut currency = StringBuilder::new();
84 let mut liquidity_side = StringBuilder::new();
85 let mut event_id = StringBuilder::new();
86 let mut ts_event = TimestampNanosecondBuilder::with_capacity(data.len());
87 let mut ts_init = TimestampNanosecondBuilder::with_capacity(data.len());
88 let mut reconciliation = BooleanBuilder::with_capacity(data.len());
89 let mut position_id = StringBuilder::new();
90 let mut commission = StringBuilder::new();
91
92 for fill in data {
93 trader_id.append_value(fill.trader_id);
94 strategy_id.append_value(fill.strategy_id);
95 instrument_id.append_value(fill.instrument_id.to_string());
96 client_order_id.append_value(fill.client_order_id);
97 venue_order_id.append_value(fill.venue_order_id);
98 account_id.append_value(fill.account_id);
99 trade_id.append_value(fill.trade_id.to_string());
100 order_side.append_value(format!("{}", fill.order_side));
101 order_type.append_value(format!("{}", fill.order_type));
102 last_qty.append_value(quantity_to_f64(&fill.last_qty));
103 last_px.append_value(price_to_f64(&fill.last_px));
104 currency.append_value(fill.currency.to_string());
105 liquidity_side.append_value(format!("{}", fill.liquidity_side));
106 event_id.append_value(fill.event_id.to_string());
107 ts_event.append_value(unix_nanos_to_i64(fill.ts_event.as_u64()));
108 ts_init.append_value(unix_nanos_to_i64(fill.ts_init.as_u64()));
109 reconciliation.append_value(fill.reconciliation);
110 position_id.append_option(fill.position_id.map(|v| v.to_string()));
111 commission.append_option(fill.commission.map(|v| format!("{v}")));
112 }
113
114 RecordBatch::try_new(
115 Arc::new(order_filled_schema()),
116 vec![
117 Arc::new(trader_id.finish()),
118 Arc::new(strategy_id.finish()),
119 Arc::new(instrument_id.finish()),
120 Arc::new(client_order_id.finish()),
121 Arc::new(venue_order_id.finish()),
122 Arc::new(account_id.finish()),
123 Arc::new(trade_id.finish()),
124 Arc::new(order_side.finish()),
125 Arc::new(order_type.finish()),
126 Arc::new(last_qty.finish()),
127 Arc::new(last_px.finish()),
128 Arc::new(currency.finish()),
129 Arc::new(liquidity_side.finish()),
130 Arc::new(event_id.finish()),
131 Arc::new(ts_event.finish()),
132 Arc::new(ts_init.finish()),
133 Arc::new(reconciliation.finish()),
134 Arc::new(position_id.finish()),
135 Arc::new(commission.finish()),
136 ],
137 )
138}
139
140#[cfg(test)]
141mod tests {
142 use arrow::{
143 array::{Array, BooleanArray, Float64Array, StringArray, TimestampNanosecondArray},
144 datatypes::{DataType, TimeUnit},
145 };
146 use nautilus_core::UUID4;
147 use nautilus_model::{
148 enums::{LiquiditySide, OrderSide, OrderType},
149 identifiers::{
150 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId,
151 VenueOrderId,
152 },
153 types::{Currency, Money, Price, Quantity},
154 };
155 use rstest::rstest;
156
157 use super::*;
158
159 fn make_fill(instrument_id: &str, commission: Option<Money>, ts: u64) -> OrderFilled {
160 OrderFilled {
161 trader_id: TraderId::from("TESTER-001"),
162 strategy_id: StrategyId::from("S-001"),
163 instrument_id: InstrumentId::from(instrument_id),
164 client_order_id: ClientOrderId::from("O-001"),
165 venue_order_id: VenueOrderId::from("V-001"),
166 account_id: AccountId::from("SIM-001"),
167 trade_id: TradeId::new("T-001"),
168 order_side: OrderSide::Buy,
169 order_type: OrderType::Limit,
170 last_qty: Quantity::from(100),
171 last_px: Price::from("50.25"),
172 currency: Currency::USD(),
173 liquidity_side: LiquiditySide::Maker,
174 event_id: UUID4::default(),
175 ts_event: ts.into(),
176 ts_init: (ts + 1).into(),
177 reconciliation: false,
178 position_id: Some(PositionId::from("P-001")),
179 commission,
180 }
181 }
182
183 #[rstest]
184 fn test_encode_order_fills_schema() {
185 let batch = encode_order_fills(&[]).unwrap();
186 let schema = batch.schema();
187 let fields = schema.fields();
188 assert_eq!(fields.len(), 19);
189 assert_eq!(fields[0].name(), "trader_id");
190 assert_eq!(fields[0].data_type(), &DataType::Utf8);
191 assert_eq!(fields[9].name(), "last_qty");
192 assert_eq!(fields[9].data_type(), &DataType::Float64);
193 assert_eq!(fields[10].name(), "last_px");
194 assert_eq!(fields[10].data_type(), &DataType::Float64);
195 assert_eq!(fields[14].name(), "ts_event");
196 assert_eq!(
197 fields[14].data_type(),
198 &DataType::Timestamp(TimeUnit::Nanosecond, None)
199 );
200 assert_eq!(fields[16].name(), "reconciliation");
201 assert_eq!(fields[16].data_type(), &DataType::Boolean);
202 assert_eq!(fields[18].name(), "commission");
203 assert!(fields[18].is_nullable());
204 }
205
206 #[rstest]
207 fn test_encode_order_fills_values() {
208 let commission = Money::new(10.50, Currency::USD());
209 let fills = vec![make_fill("AAPL.XNAS", Some(commission), 1_000)];
210 let batch = encode_order_fills(&fills).unwrap();
211
212 assert_eq!(batch.num_rows(), 1);
213
214 let last_qty_col = batch
215 .column(9)
216 .as_any()
217 .downcast_ref::<Float64Array>()
218 .unwrap();
219 let last_px_col = batch
220 .column(10)
221 .as_any()
222 .downcast_ref::<Float64Array>()
223 .unwrap();
224 let ts_event_col = batch
225 .column(14)
226 .as_any()
227 .downcast_ref::<TimestampNanosecondArray>()
228 .unwrap();
229 let reconciliation_col = batch
230 .column(16)
231 .as_any()
232 .downcast_ref::<BooleanArray>()
233 .unwrap();
234 let commission_col = batch
235 .column(18)
236 .as_any()
237 .downcast_ref::<StringArray>()
238 .unwrap();
239
240 assert!((last_qty_col.value(0) - 100.0).abs() < 1e-9);
241 assert!((last_px_col.value(0) - 50.25).abs() < 1e-9);
242 assert_eq!(ts_event_col.value(0), 1_000);
243 assert!(!reconciliation_col.value(0));
244 assert_eq!(commission_col.value(0), "10.50 USD");
245 }
246
247 #[rstest]
248 fn test_encode_order_fills_null_commission() {
249 let fills = vec![make_fill("AAPL.XNAS", None, 1_000)];
250 let batch = encode_order_fills(&fills).unwrap();
251
252 let commission_col = batch
253 .column(18)
254 .as_any()
255 .downcast_ref::<StringArray>()
256 .unwrap();
257 assert!(commission_col.is_null(0));
258 }
259
260 #[rstest]
261 fn test_encode_order_fills_empty() {
262 let batch = encode_order_fills(&[]).unwrap();
263 assert_eq!(batch.num_rows(), 0);
264 assert_eq!(batch.schema().fields().len(), 19);
265 }
266
267 #[rstest]
268 fn test_encode_order_fills_mixed_instruments() {
269 let fills = vec![
270 make_fill("AAPL.XNAS", None, 1),
271 make_fill("MSFT.XNAS", None, 2),
272 ];
273 let batch = encode_order_fills(&fills).unwrap();
274
275 let instrument_id_col = batch
276 .column(2)
277 .as_any()
278 .downcast_ref::<StringArray>()
279 .unwrap();
280 assert_eq!(instrument_id_col.value(0), "AAPL.XNAS");
281 assert_eq!(instrument_id_col.value(1), "MSFT.XNAS");
282 }
283}