1use std::collections::HashMap;
17
18use arrow::{datatypes::Schema, error::ArrowError, record_batch::RecordBatch};
19use nautilus_model::events::{
20 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied, OrderEmulated, OrderExpired,
21 OrderFilled, OrderInitialized, OrderModifyRejected, OrderPendingCancel, OrderPendingUpdate,
22 OrderRejected, OrderReleased, OrderSubmitted, OrderTriggered, OrderUpdated,
23};
24
25use super::{
26 ArrowSchemaProvider, DecodeTypedFromRecordBatch, EncodeToRecordBatch, EncodingError,
27 KEY_INSTRUMENT_ID,
28 json::{JsonFieldSpec, decode_batch, encode_batch, metadata_for_type, schema_for_type},
29};
30
31const ORDER_INITIALIZED_FIELDS: &[JsonFieldSpec] = &[
32 JsonFieldSpec::utf8("trader_id", false),
33 JsonFieldSpec::utf8("strategy_id", false),
34 JsonFieldSpec::utf8("instrument_id", false),
35 JsonFieldSpec::utf8("client_order_id", false),
36 JsonFieldSpec::utf8("order_side", false),
37 JsonFieldSpec::utf8("order_type", false),
38 JsonFieldSpec::utf8("quantity", false),
39 JsonFieldSpec::utf8("time_in_force", false),
40 JsonFieldSpec::boolean("post_only", false),
41 JsonFieldSpec::boolean("reduce_only", false),
42 JsonFieldSpec::boolean("quote_quantity", false),
43 JsonFieldSpec::boolean("reconciliation", false),
44 JsonFieldSpec::utf8("event_id", false),
45 JsonFieldSpec::u64("ts_event", false),
46 JsonFieldSpec::u64("ts_init", false),
47 JsonFieldSpec::utf8("price", true),
48 JsonFieldSpec::utf8("trigger_price", true),
49 JsonFieldSpec::utf8("trigger_type", true),
50 JsonFieldSpec::utf8("limit_offset", true),
51 JsonFieldSpec::utf8("trailing_offset", true),
52 JsonFieldSpec::utf8("trailing_offset_type", true),
53 JsonFieldSpec::u64("expire_time", true),
54 JsonFieldSpec::utf8("display_qty", true),
55 JsonFieldSpec::utf8("emulation_trigger", true),
56 JsonFieldSpec::utf8("trigger_instrument_id", true),
57 JsonFieldSpec::utf8("contingency_type", true),
58 JsonFieldSpec::utf8("order_list_id", true),
59 JsonFieldSpec::utf8_json("linked_order_ids", true),
60 JsonFieldSpec::utf8("parent_order_id", true),
61 JsonFieldSpec::utf8("exec_algorithm_id", true),
62 JsonFieldSpec::utf8_json("exec_algorithm_params", true),
63 JsonFieldSpec::utf8("exec_spawn_id", true),
64 JsonFieldSpec::utf8_json("tags", true),
65];
66
67const ORDER_DENIED_FIELDS: &[JsonFieldSpec] = &[
68 JsonFieldSpec::utf8("trader_id", false),
69 JsonFieldSpec::utf8("strategy_id", false),
70 JsonFieldSpec::utf8("instrument_id", false),
71 JsonFieldSpec::utf8("client_order_id", false),
72 JsonFieldSpec::utf8("reason", false),
73 JsonFieldSpec::utf8("event_id", false),
74 JsonFieldSpec::u64("ts_event", false),
75 JsonFieldSpec::u64("ts_init", false),
76];
77
78const ORDER_EMULATED_FIELDS: &[JsonFieldSpec] = &[
79 JsonFieldSpec::utf8("trader_id", false),
80 JsonFieldSpec::utf8("strategy_id", false),
81 JsonFieldSpec::utf8("instrument_id", false),
82 JsonFieldSpec::utf8("client_order_id", false),
83 JsonFieldSpec::utf8("event_id", false),
84 JsonFieldSpec::u64("ts_event", false),
85 JsonFieldSpec::u64("ts_init", false),
86];
87
88const ORDER_SUBMITTED_FIELDS: &[JsonFieldSpec] = &[
89 JsonFieldSpec::utf8("trader_id", false),
90 JsonFieldSpec::utf8("strategy_id", false),
91 JsonFieldSpec::utf8("instrument_id", false),
92 JsonFieldSpec::utf8("client_order_id", false),
93 JsonFieldSpec::utf8("account_id", false),
94 JsonFieldSpec::utf8("event_id", false),
95 JsonFieldSpec::u64("ts_event", false),
96 JsonFieldSpec::u64("ts_init", false),
97];
98
99const ORDER_ACCEPTED_FIELDS: &[JsonFieldSpec] = &[
100 JsonFieldSpec::utf8("trader_id", false),
101 JsonFieldSpec::utf8("strategy_id", false),
102 JsonFieldSpec::utf8("instrument_id", false),
103 JsonFieldSpec::utf8("client_order_id", false),
104 JsonFieldSpec::utf8("venue_order_id", false),
105 JsonFieldSpec::utf8("account_id", false),
106 JsonFieldSpec::utf8("event_id", false),
107 JsonFieldSpec::u64("ts_event", false),
108 JsonFieldSpec::u64("ts_init", false),
109 JsonFieldSpec::u64("reconciliation", false),
110];
111
112const ORDER_REJECTED_FIELDS: &[JsonFieldSpec] = &[
113 JsonFieldSpec::utf8("trader_id", false),
114 JsonFieldSpec::utf8("strategy_id", false),
115 JsonFieldSpec::utf8("instrument_id", false),
116 JsonFieldSpec::utf8("client_order_id", false),
117 JsonFieldSpec::utf8("account_id", false),
118 JsonFieldSpec::utf8("reason", false),
119 JsonFieldSpec::utf8("event_id", false),
120 JsonFieldSpec::u64("ts_event", false),
121 JsonFieldSpec::u64("ts_init", false),
122 JsonFieldSpec::u64("reconciliation", false),
123 JsonFieldSpec::u64("due_post_only", false),
124];
125
126const ORDER_PENDING_CANCEL_FIELDS: &[JsonFieldSpec] = &[
127 JsonFieldSpec::utf8("trader_id", false),
128 JsonFieldSpec::utf8("strategy_id", false),
129 JsonFieldSpec::utf8("instrument_id", false),
130 JsonFieldSpec::utf8("client_order_id", false),
131 JsonFieldSpec::utf8("account_id", false),
132 JsonFieldSpec::utf8("event_id", false),
133 JsonFieldSpec::u64("ts_event", false),
134 JsonFieldSpec::u64("ts_init", false),
135 JsonFieldSpec::u64("reconciliation", false),
136 JsonFieldSpec::utf8("venue_order_id", true),
137];
138
139const ORDER_CANCELED_FIELDS: &[JsonFieldSpec] = &[
140 JsonFieldSpec::utf8("trader_id", false),
141 JsonFieldSpec::utf8("strategy_id", false),
142 JsonFieldSpec::utf8("instrument_id", false),
143 JsonFieldSpec::utf8("client_order_id", false),
144 JsonFieldSpec::utf8("event_id", false),
145 JsonFieldSpec::u64("ts_event", false),
146 JsonFieldSpec::u64("ts_init", false),
147 JsonFieldSpec::u64("reconciliation", false),
148 JsonFieldSpec::utf8("venue_order_id", true),
149 JsonFieldSpec::utf8("account_id", true),
150];
151
152const ORDER_CANCEL_REJECTED_FIELDS: &[JsonFieldSpec] = &[
153 JsonFieldSpec::utf8("trader_id", false),
154 JsonFieldSpec::utf8("strategy_id", false),
155 JsonFieldSpec::utf8("instrument_id", false),
156 JsonFieldSpec::utf8("client_order_id", false),
157 JsonFieldSpec::utf8("reason", false),
158 JsonFieldSpec::utf8("event_id", false),
159 JsonFieldSpec::u64("ts_event", false),
160 JsonFieldSpec::u64("ts_init", false),
161 JsonFieldSpec::u64("reconciliation", false),
162 JsonFieldSpec::utf8("venue_order_id", true),
163 JsonFieldSpec::utf8("account_id", true),
164];
165
166const ORDER_EXPIRED_FIELDS: &[JsonFieldSpec] = &[
167 JsonFieldSpec::utf8("trader_id", false),
168 JsonFieldSpec::utf8("strategy_id", false),
169 JsonFieldSpec::utf8("instrument_id", false),
170 JsonFieldSpec::utf8("client_order_id", false),
171 JsonFieldSpec::utf8("event_id", false),
172 JsonFieldSpec::u64("ts_event", false),
173 JsonFieldSpec::u64("ts_init", false),
174 JsonFieldSpec::u64("reconciliation", false),
175 JsonFieldSpec::utf8("venue_order_id", true),
176 JsonFieldSpec::utf8("account_id", true),
177];
178
179const ORDER_TRIGGERED_FIELDS: &[JsonFieldSpec] = &[
180 JsonFieldSpec::utf8("trader_id", false),
181 JsonFieldSpec::utf8("strategy_id", false),
182 JsonFieldSpec::utf8("instrument_id", false),
183 JsonFieldSpec::utf8("client_order_id", false),
184 JsonFieldSpec::utf8("event_id", false),
185 JsonFieldSpec::u64("ts_event", false),
186 JsonFieldSpec::u64("ts_init", false),
187 JsonFieldSpec::u64("reconciliation", false),
188 JsonFieldSpec::utf8("venue_order_id", true),
189 JsonFieldSpec::utf8("account_id", true),
190];
191
192const ORDER_PENDING_UPDATE_FIELDS: &[JsonFieldSpec] = &[
193 JsonFieldSpec::utf8("trader_id", false),
194 JsonFieldSpec::utf8("strategy_id", false),
195 JsonFieldSpec::utf8("instrument_id", false),
196 JsonFieldSpec::utf8("client_order_id", false),
197 JsonFieldSpec::utf8("account_id", false),
198 JsonFieldSpec::utf8("event_id", false),
199 JsonFieldSpec::u64("ts_event", false),
200 JsonFieldSpec::u64("ts_init", false),
201 JsonFieldSpec::u64("reconciliation", false),
202 JsonFieldSpec::utf8("venue_order_id", true),
203];
204
205const ORDER_RELEASED_FIELDS: &[JsonFieldSpec] = &[
206 JsonFieldSpec::utf8("trader_id", false),
207 JsonFieldSpec::utf8("strategy_id", false),
208 JsonFieldSpec::utf8("instrument_id", false),
209 JsonFieldSpec::utf8("client_order_id", false),
210 JsonFieldSpec::utf8("released_price", false),
211 JsonFieldSpec::utf8("event_id", false),
212 JsonFieldSpec::u64("ts_event", false),
213 JsonFieldSpec::u64("ts_init", false),
214];
215
216const ORDER_MODIFY_REJECTED_FIELDS: &[JsonFieldSpec] = &[
217 JsonFieldSpec::utf8("trader_id", false),
218 JsonFieldSpec::utf8("strategy_id", false),
219 JsonFieldSpec::utf8("instrument_id", false),
220 JsonFieldSpec::utf8("client_order_id", false),
221 JsonFieldSpec::utf8("reason", false),
222 JsonFieldSpec::utf8("event_id", false),
223 JsonFieldSpec::u64("ts_event", false),
224 JsonFieldSpec::u64("ts_init", false),
225 JsonFieldSpec::u64("reconciliation", false),
226 JsonFieldSpec::utf8("venue_order_id", true),
227 JsonFieldSpec::utf8("account_id", true),
228];
229
230const ORDER_UPDATED_FIELDS: &[JsonFieldSpec] = &[
231 JsonFieldSpec::utf8("trader_id", false),
232 JsonFieldSpec::utf8("strategy_id", false),
233 JsonFieldSpec::utf8("instrument_id", false),
234 JsonFieldSpec::utf8("client_order_id", false),
235 JsonFieldSpec::utf8("venue_order_id", true),
236 JsonFieldSpec::utf8("account_id", true),
237 JsonFieldSpec::utf8("quantity", false),
238 JsonFieldSpec::utf8("price", true),
239 JsonFieldSpec::utf8("trigger_price", true),
240 JsonFieldSpec::utf8("protection_price", true),
241 JsonFieldSpec::boolean("is_quote_quantity", false),
242 JsonFieldSpec::utf8("event_id", false),
243 JsonFieldSpec::u64("ts_event", false),
244 JsonFieldSpec::u64("ts_init", false),
245 JsonFieldSpec::u64("reconciliation", false),
246];
247
248const ORDER_FILLED_FIELDS: &[JsonFieldSpec] = &[
249 JsonFieldSpec::utf8("trader_id", false),
250 JsonFieldSpec::utf8("strategy_id", false),
251 JsonFieldSpec::utf8("instrument_id", false),
252 JsonFieldSpec::utf8("client_order_id", false),
253 JsonFieldSpec::utf8("venue_order_id", false),
254 JsonFieldSpec::utf8("account_id", false),
255 JsonFieldSpec::utf8("trade_id", false),
256 JsonFieldSpec::utf8("order_side", false),
257 JsonFieldSpec::utf8("order_type", false),
258 JsonFieldSpec::utf8("last_qty", false),
259 JsonFieldSpec::utf8("last_px", false),
260 JsonFieldSpec::utf8("currency", false),
261 JsonFieldSpec::utf8("liquidity_side", false),
262 JsonFieldSpec::utf8("event_id", false),
263 JsonFieldSpec::u64("ts_event", false),
264 JsonFieldSpec::u64("ts_init", false),
265 JsonFieldSpec::boolean("reconciliation", false),
266 JsonFieldSpec::utf8("position_id", true),
267 JsonFieldSpec::utf8("commission", true),
268];
269
270fn instrument_metadata(type_name: &'static str, instrument_id: &str) -> HashMap<String, String> {
271 let mut metadata = metadata_for_type(type_name);
272 metadata.insert(KEY_INSTRUMENT_ID.to_string(), instrument_id.to_string());
273 metadata
274}
275
276macro_rules! impl_order_event_arrow {
277 ($type:ty, $type_name:expr, $fields:expr) => {
278 impl ArrowSchemaProvider for $type {
279 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
280 schema_for_type($type_name, metadata, $fields)
281 }
282 }
283
284 impl EncodeToRecordBatch for $type {
285 fn encode_batch(
286 metadata: &HashMap<String, String>,
287 data: &[Self],
288 ) -> Result<RecordBatch, ArrowError> {
289 encode_batch($type_name, metadata, data, $fields)
290 }
291
292 fn metadata(&self) -> HashMap<String, String> {
293 instrument_metadata($type_name, &self.instrument_id.to_string())
294 }
295 }
296
297 impl DecodeTypedFromRecordBatch for $type {
298 fn decode_typed_batch(
299 metadata: &HashMap<String, String>,
300 record_batch: RecordBatch,
301 ) -> Result<Vec<Self>, EncodingError> {
302 decode_batch(metadata, &record_batch, $fields, Some($type_name))
303 }
304 }
305 };
306}
307
308impl_order_event_arrow!(
309 OrderInitialized,
310 "OrderInitialized",
311 ORDER_INITIALIZED_FIELDS
312);
313impl_order_event_arrow!(OrderDenied, "OrderDenied", ORDER_DENIED_FIELDS);
314impl_order_event_arrow!(OrderEmulated, "OrderEmulated", ORDER_EMULATED_FIELDS);
315impl_order_event_arrow!(OrderSubmitted, "OrderSubmitted", ORDER_SUBMITTED_FIELDS);
316impl_order_event_arrow!(OrderAccepted, "OrderAccepted", ORDER_ACCEPTED_FIELDS);
317impl_order_event_arrow!(OrderRejected, "OrderRejected", ORDER_REJECTED_FIELDS);
318impl_order_event_arrow!(
319 OrderPendingCancel,
320 "OrderPendingCancel",
321 ORDER_PENDING_CANCEL_FIELDS
322);
323impl_order_event_arrow!(OrderCanceled, "OrderCanceled", ORDER_CANCELED_FIELDS);
324impl_order_event_arrow!(
325 OrderCancelRejected,
326 "OrderCancelRejected",
327 ORDER_CANCEL_REJECTED_FIELDS
328);
329impl_order_event_arrow!(OrderExpired, "OrderExpired", ORDER_EXPIRED_FIELDS);
330impl_order_event_arrow!(OrderTriggered, "OrderTriggered", ORDER_TRIGGERED_FIELDS);
331impl_order_event_arrow!(
332 OrderPendingUpdate,
333 "OrderPendingUpdate",
334 ORDER_PENDING_UPDATE_FIELDS
335);
336impl_order_event_arrow!(OrderReleased, "OrderReleased", ORDER_RELEASED_FIELDS);
337impl_order_event_arrow!(
338 OrderModifyRejected,
339 "OrderModifyRejected",
340 ORDER_MODIFY_REJECTED_FIELDS
341);
342impl_order_event_arrow!(OrderUpdated, "OrderUpdated", ORDER_UPDATED_FIELDS);
343impl_order_event_arrow!(OrderFilled, "OrderFilled", ORDER_FILLED_FIELDS);
344
345#[cfg(test)]
346mod tests {
347 use std::str::FromStr;
348
349 use nautilus_model::events::order::stubs::{
350 order_accepted, order_cancel_rejected, order_denied_max_submitted_rate, order_emulated,
351 order_expired, order_filled, order_initialized_buy_limit, order_modify_rejected,
352 order_pending_cancel, order_pending_update, order_rejected_insufficient_margin,
353 order_released, order_submitted, order_triggered, order_updated,
354 };
355 use rstest::rstest;
356 use rust_decimal::Decimal;
357
358 use super::*;
359
360 #[rstest]
361 fn test_order_initialized_round_trip(order_initialized_buy_limit: OrderInitialized) {
362 let event = OrderInitialized {
363 limit_offset: Some(Decimal::from_str("0.123456789123456789").unwrap()),
364 trailing_offset: Some(Decimal::from_str("0.987654321987654321").unwrap()),
365 ..order_initialized_buy_limit
366 };
367 let metadata = event.metadata();
368 let batch =
369 OrderInitialized::encode_batch(&metadata, std::slice::from_ref(&event)).unwrap();
370 let decoded =
371 OrderInitialized::decode_typed_batch(batch.schema().metadata(), batch).unwrap();
372
373 assert_eq!(decoded, vec![event]);
374 }
375
376 #[rstest]
377 fn test_order_filled_round_trip(order_filled: OrderFilled) {
378 let event = order_filled;
379 let metadata = event.metadata();
380 let batch = OrderFilled::encode_batch(&metadata, &[event]).unwrap();
381 let decoded = OrderFilled::decode_typed_batch(batch.schema().metadata(), batch).unwrap();
382
383 assert_eq!(decoded, vec![event]);
384 }
385
386 fn roundtrip<T>(event: T)
387 where
388 T: ArrowSchemaProvider
389 + EncodeToRecordBatch
390 + DecodeTypedFromRecordBatch
391 + Clone
392 + PartialEq
393 + std::fmt::Debug,
394 {
395 let metadata = event.metadata();
396 let batch = T::encode_batch(&metadata, std::slice::from_ref(&event)).unwrap();
397 let decoded = T::decode_typed_batch(batch.schema().metadata(), batch).unwrap();
398 assert_eq!(decoded, vec![event]);
399 }
400
401 #[rstest]
402 fn test_order_denied_round_trip(order_denied_max_submitted_rate: OrderDenied) {
403 roundtrip(order_denied_max_submitted_rate);
404 }
405
406 #[rstest]
407 fn test_order_submitted_round_trip(order_submitted: OrderSubmitted) {
408 roundtrip(order_submitted);
409 }
410
411 #[rstest]
412 fn test_order_accepted_round_trip(order_accepted: OrderAccepted) {
413 roundtrip(order_accepted);
414 }
415
416 #[rstest]
417 fn test_order_rejected_round_trip(order_rejected_insufficient_margin: OrderRejected) {
418 roundtrip(order_rejected_insufficient_margin);
419 }
420
421 #[rstest]
422 fn test_order_canceled_round_trip() {
423 use nautilus_model::events::OrderCanceled;
424 roundtrip(OrderCanceled::default());
425 }
426
427 #[rstest]
428 fn test_order_updated_round_trip(order_updated: OrderUpdated) {
429 roundtrip(order_updated);
430 }
431
432 #[rstest]
433 fn test_order_triggered_round_trip(order_triggered: OrderTriggered) {
434 roundtrip(order_triggered);
435 }
436
437 #[rstest]
438 fn test_order_expired_round_trip(order_expired: OrderExpired) {
439 roundtrip(order_expired);
440 }
441
442 #[rstest]
443 fn test_order_pending_update_round_trip(order_pending_update: OrderPendingUpdate) {
444 roundtrip(order_pending_update);
445 }
446
447 #[rstest]
448 fn test_order_pending_cancel_round_trip(order_pending_cancel: OrderPendingCancel) {
449 roundtrip(order_pending_cancel);
450 }
451
452 #[rstest]
453 fn test_order_cancel_rejected_round_trip(order_cancel_rejected: OrderCancelRejected) {
454 roundtrip(order_cancel_rejected);
455 }
456
457 #[rstest]
458 fn test_order_modify_rejected_round_trip(order_modify_rejected: OrderModifyRejected) {
459 roundtrip(order_modify_rejected);
460 }
461
462 #[rstest]
463 fn test_order_emulated_round_trip(order_emulated: OrderEmulated) {
464 roundtrip(order_emulated);
465 }
466
467 #[rstest]
468 fn test_order_released_round_trip(order_released: OrderReleased) {
469 roundtrip(order_released);
470 }
471}