1use std::{
19 future::Future,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28 clients::ExecutionClient,
29 live::{get_runtime, runner::get_exec_event_sender},
30 messages::execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 AtomicMap, MUTEX_POISONED, UUID4, UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::{AccountType, LiquiditySide, OmsType, OrderSide, OrderStatus, OrderType, TimeInForce},
44 events::{
45 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
46 OrderFilled, OrderRejected, OrderUpdated,
47 },
48 identifiers::{
49 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, Venue, VenueOrderId,
50 },
51 instruments::{Instrument, InstrumentAny},
52 orders::Order,
53 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
54 types::{AccountBalance, MarginBalance, Money, Price, Quantity},
55};
56use tokio::task::JoinHandle;
57use ustr::Ustr;
58
59use crate::{
60 common::{
61 consts::{
62 AX_ACCOUNT_REGISTRATION_TIMEOUT_SECS, AX_AUTH_TOKEN_TTL_EXEC_SECS, AX_POST_ONLY_REJECT,
63 AX_VENUE,
64 },
65 credential::Credential,
66 enums::AxOrderSide,
67 parse::{ax_timestamp_stn_to_unix_nanos, cid_to_client_order_id, quantity_to_contracts},
68 },
69 config::AxExecClientConfig,
70 http::{
71 client::AxHttpClient,
72 models::{AxOrderRejectReason, PreviewAggressiveLimitOrderRequest, ReplaceOrderRequest},
73 },
74 websocket::{
75 AxOrdersWsMessage, AxWsOrderEvent,
76 messages::{AxWsOrder, AxWsTradeExecution, OrderMetadata},
77 orders::{AxOrdersWebSocketClient, OrdersCaches},
78 },
79};
80
81#[derive(Debug)]
83pub struct AxExecutionClient {
84 core: ExecutionClientCore,
85 clock: &'static AtomicTime,
86 config: AxExecClientConfig,
87 emitter: ExecutionEventEmitter,
88 http_client: AxHttpClient,
89 ws_orders: AxOrdersWebSocketClient,
90 ws_stream_handle: Option<JoinHandle<()>>,
91 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
92}
93
94impl AxExecutionClient {
95 pub fn new(core: ExecutionClientCore, config: AxExecClientConfig) -> anyhow::Result<Self> {
101 let http_client = AxHttpClient::with_credentials(
102 config.api_key.clone().unwrap_or_default(),
103 config.api_secret.clone().unwrap_or_default(),
104 Some(config.http_base_url()),
105 Some(config.orders_base_url()),
106 config.http_timeout_secs,
107 config.max_retries,
108 config.retry_delay_initial_ms,
109 config.retry_delay_max_ms,
110 config.proxy_url.clone(),
111 )?;
112
113 let clock = get_atomic_clock_realtime();
114 let trader_id = core.trader_id;
115 let account_id = core.account_id;
116 let emitter =
117 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
118 let mut ws_url = config.ws_private_url();
119 if config.cancel_on_disconnect {
120 let separator = if ws_url.contains('?') { "&" } else { "?" };
121 ws_url.push_str(&format!("{separator}cancel_on_disconnect=true"));
122 }
123 let ws_orders = AxOrdersWebSocketClient::new(
124 ws_url,
125 account_id,
126 trader_id,
127 config.heartbeat_interval_secs,
128 config.transport_backend,
129 config.proxy_url.clone(),
130 );
131
132 Ok(Self {
133 core,
134 clock,
135 config,
136 emitter,
137 http_client,
138 ws_orders,
139 ws_stream_handle: None,
140 pending_tasks: Mutex::new(Vec::new()),
141 })
142 }
143
144 async fn authenticate(&self) -> anyhow::Result<String> {
145 let credential =
146 Credential::resolve(self.config.api_key.clone(), self.config.api_secret.clone())
147 .context("API credentials not configured")?;
148
149 self.http_client
150 .authenticate(
151 credential.api_key(),
152 credential.api_secret(),
153 AX_AUTH_TOKEN_TTL_EXEC_SECS,
154 )
155 .await
156 .map_err(|e| anyhow::anyhow!("Authentication failed: {e}"))
157 }
158
159 fn update_account_state(&self) {
160 let http_client = self.http_client.clone();
161 let account_id = self.core.account_id;
162 let emitter = self.emitter.clone();
163 let clock = self.clock;
164
165 self.spawn_task("query_account", async move {
166 let account_state = http_client
167 .request_account_state(account_id)
168 .await
169 .context("failed to request AX account state")?;
170 let ts_event = clock.get_time_ns();
171 emitter.emit_account_state(
172 account_state.balances.clone(),
173 account_state.margins.clone(),
174 account_state.is_reported,
175 ts_event,
176 );
177 Ok(())
178 });
179 }
180
181 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
182 let (
183 client_order_id,
184 strategy_id,
185 instrument_id,
186 order_side,
187 order_type,
188 quantity,
189 trigger_price,
190 time_in_force,
191 is_post_only,
192 limit_price,
193 ) = {
194 let cache = self.core.cache();
195 let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
196 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
197 })?;
198 (
199 order.client_order_id(),
200 order.strategy_id(),
201 order.instrument_id(),
202 order.order_side(),
203 order.order_type(),
204 order.quantity(),
205 order.trigger_price(),
206 order.time_in_force(),
207 order.is_post_only(),
208 order.price(),
209 )
210 };
211
212 let ws_orders = self.ws_orders.clone();
213 let emitter = self.emitter.clone();
214 let clock = self.clock;
215 let trader_id = self.core.trader_id;
216
217 let http_client = if order_type == OrderType::Market {
218 Some(self.http_client.clone())
219 } else {
220 None
221 };
222
223 self.spawn_task("submit_order", async move {
224 let result: anyhow::Result<()> = async {
225 let price = if order_type == OrderType::Market {
233 let symbol = instrument_id.symbol.inner();
234 let ax_side = AxOrderSide::try_from(order_side)
235 .map_err(|e| anyhow::anyhow!("Invalid order side: {e}"))?;
236 let qty_contracts = quantity_to_contracts(quantity)?;
237
238 let request =
239 PreviewAggressiveLimitOrderRequest::new(symbol, qty_contracts, ax_side);
240 let response = http_client
241 .expect("HTTP client should be set for market orders")
242 .inner
243 .preview_aggressive_limit_order(&request)
244 .await
245 .map_err(|e| {
246 anyhow::anyhow!("Failed to preview aggressive limit order: {e}")
247 })?;
248
249 if response.remaining_quantity > 0 {
250 log::warn!(
251 "Market order book depth insufficient: \
252 filled_qty={} remaining_qty={} for {instrument_id}",
253 response.filled_quantity,
254 response.remaining_quantity,
255 );
256 }
257
258 let limit_price_decimal = response.limit_price.ok_or_else(|| {
259 anyhow::anyhow!(
260 "No liquidity available for market order on {instrument_id}"
261 )
262 })?;
263
264 let price = Price::from(limit_price_decimal.to_string().as_str());
265 log::info!("Market order take-through price: {price} for {instrument_id}",);
266 Some(price)
267 } else {
268 limit_price
269 };
270
271 ws_orders
272 .submit_order(
273 trader_id,
274 strategy_id,
275 instrument_id,
276 client_order_id,
277 order_side,
278 order_type,
279 quantity,
280 time_in_force,
281 price,
282 trigger_price,
283 is_post_only,
284 )
285 .await
286 .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"))?;
287
288 Ok(())
289 }
290 .await;
291
292 if let Err(e) = result {
293 let ts_event = clock.get_time_ns();
294 emitter.emit_order_rejected_event(
295 strategy_id,
296 instrument_id,
297 client_order_id,
298 &format!("submit-order-error: {e}"),
299 ts_event,
300 false,
301 );
302 anyhow::bail!("{e}");
303 }
304
305 Ok(())
306 });
307
308 Ok(())
309 }
310
311 fn cancel_order_internal(&self, cmd: &CancelOrder) {
312 let ws_orders = self.ws_orders.clone();
313
314 let emitter = self.emitter.clone();
315 let clock = self.clock;
316 let instrument_id = cmd.instrument_id;
317 let client_order_id = cmd.client_order_id;
318 let venue_order_id = cmd.venue_order_id;
319 let strategy_id = cmd.strategy_id;
320
321 self.spawn_task("cancel_order", async move {
322 let result = ws_orders
323 .cancel_order(client_order_id, venue_order_id)
324 .await
325 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
326
327 if let Err(e) = &result {
328 let ts_event = clock.get_time_ns();
329 emitter.emit_order_cancel_rejected_event(
330 strategy_id,
331 instrument_id,
332 client_order_id,
333 venue_order_id,
334 &format!("cancel-order-error: {e}"),
335 ts_event,
336 );
337 anyhow::bail!("{e}");
338 }
339
340 Ok(())
341 });
342 }
343
344 fn spawn_task<F>(&self, description: &'static str, fut: F)
345 where
346 F: Future<Output = anyhow::Result<()>> + Send + 'static,
347 {
348 let runtime = get_runtime();
349 let handle = runtime.spawn(async move {
350 if let Err(e) = fut.await {
351 log::warn!("{description} failed: {e}");
352 }
353 });
354
355 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
356 tasks.retain(|handle| !handle.is_finished());
357 tasks.push(handle);
358 }
359
360 fn abort_pending_tasks(&self) {
361 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
362 for handle in tasks.drain(..) {
363 handle.abort();
364 }
365 }
366
367 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
369 let account_id = self.core.account_id;
370
371 if self.core.cache().account(&account_id).is_some() {
372 log::info!("Account {account_id} registered");
373 return Ok(());
374 }
375
376 let start = Instant::now();
377 let timeout = Duration::from_secs_f64(timeout_secs);
378 let interval = Duration::from_millis(10);
379
380 loop {
381 tokio::time::sleep(interval).await;
382
383 if self.core.cache().account(&account_id).is_some() {
384 log::info!("Account {account_id} registered");
385 return Ok(());
386 }
387
388 if start.elapsed() >= timeout {
389 anyhow::bail!(
390 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
391 );
392 }
393 }
394 }
395}
396
397#[async_trait(?Send)]
398impl ExecutionClient for AxExecutionClient {
399 fn is_connected(&self) -> bool {
400 self.core.is_connected()
401 }
402
403 fn client_id(&self) -> ClientId {
404 self.core.client_id
405 }
406
407 fn account_id(&self) -> AccountId {
408 self.core.account_id
409 }
410
411 fn venue(&self) -> Venue {
412 *AX_VENUE
413 }
414
415 fn oms_type(&self) -> OmsType {
416 self.core.oms_type
417 }
418
419 fn get_account(&self) -> Option<AccountAny> {
420 self.core.cache().account(&self.core.account_id).cloned()
421 }
422
423 async fn connect(&mut self) -> anyhow::Result<()> {
424 if self.core.is_connected() {
425 return Ok(());
426 }
427
428 self.http_client.reset_cancellation_token();
430
431 if !self.core.instruments_initialized() {
432 let instruments = self
433 .http_client
434 .request_instruments(None, None)
435 .await
436 .context("failed to request AX instruments")?;
437
438 if instruments.is_empty() {
439 log::warn!("No instruments returned from AX");
440 } else {
441 log::info!("Loaded {} instruments", instruments.len());
442 self.http_client.cache_instruments(&instruments);
443 self.ws_orders.cache_instruments(&instruments);
444 }
445 self.core.set_instruments_initialized();
446 }
447
448 let token = self.authenticate().await?;
449 self.ws_orders.connect(&token).await?;
450 log::info!("Connected to orders WebSocket");
451
452 let should_spawn = match &self.ws_stream_handle {
453 None => true,
454 Some(handle) => handle.is_finished(),
455 };
456
457 if should_spawn {
458 let stream = self.ws_orders.stream();
459 let emitter = self.emitter.clone();
460 let caches = self.ws_orders.caches().clone();
461 let account_id = self.core.account_id;
462 let instruments_cache = self.ws_orders.instruments_cache();
463 let clock = self.clock;
464
465 let handle = get_runtime().spawn(async move {
466 pin_mut!(stream);
467 while let Some(message) = stream.next().await {
468 dispatch_ws_message(
469 message,
470 &emitter,
471 &caches,
472 account_id,
473 &instruments_cache,
474 clock,
475 );
476 }
477 });
478 self.ws_stream_handle = Some(handle);
479 }
480
481 let account_state = self
482 .http_client
483 .request_account_state(self.core.account_id)
484 .await
485 .context("failed to request AX account state")?;
486
487 if !account_state.balances.is_empty() {
488 log::info!(
489 "Received account state with {} balance(s)",
490 account_state.balances.len()
491 );
492 }
493 self.emitter.send_account_state(account_state);
494
495 self.await_account_registered(AX_ACCOUNT_REGISTRATION_TIMEOUT_SECS)
496 .await?;
497
498 self.core.set_connected();
499 log::info!("Connected: client_id={}", self.core.client_id);
500 Ok(())
501 }
502
503 async fn disconnect(&mut self) -> anyhow::Result<()> {
504 if self.core.is_disconnected() {
505 return Ok(());
506 }
507
508 self.abort_pending_tasks();
509 self.http_client.cancel_all_requests();
510
511 self.ws_orders.close().await;
512
513 if let Some(handle) = self.ws_stream_handle.take() {
514 handle.abort();
515 }
516
517 self.core.set_disconnected();
518 log::info!("Disconnected: client_id={}", self.core.client_id);
519 Ok(())
520 }
521
522 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
523 self.update_account_state();
524 Ok(())
525 }
526
527 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
528 let http_client = self.http_client.clone();
529 let account_id = self.core.account_id;
530 let client_order_id = cmd.client_order_id;
531 let venue_order_id = cmd.venue_order_id;
532 let instrument_id = cmd.instrument_id;
533 let emitter = self.emitter.clone();
534
535 let (order_side, order_type, time_in_force) = {
537 let cache = self.core.cache();
538 match cache.order(&client_order_id) {
539 Some(order) => (
540 order.order_side(),
541 order.order_type(),
542 order.time_in_force(),
543 ),
544 None => (OrderSide::NoOrderSide, OrderType::Limit, TimeInForce::Gtc),
545 }
546 };
547
548 self.spawn_task("query_order", async move {
549 match http_client
550 .request_order_status(
551 account_id,
552 instrument_id,
553 Some(client_order_id),
554 venue_order_id,
555 order_side,
556 order_type,
557 time_in_force,
558 )
559 .await
560 {
561 Ok(report) => emitter.send_order_status_report(report),
562 Err(e) => log::error!("AX query order failed: {e}"),
563 }
564 Ok(())
565 });
566
567 Ok(())
568 }
569
570 fn generate_account_state(
571 &self,
572 balances: Vec<AccountBalance>,
573 margins: Vec<MarginBalance>,
574 reported: bool,
575 ts_event: UnixNanos,
576 ) -> anyhow::Result<()> {
577 self.emitter
578 .emit_account_state(balances, margins, reported, ts_event);
579 Ok(())
580 }
581
582 fn start(&mut self) -> anyhow::Result<()> {
583 if self.core.is_started() {
584 return Ok(());
585 }
586
587 self.emitter.set_sender(get_exec_event_sender());
588 self.core.set_started();
589 log::info!(
590 "Started: client_id={}, account_id={}, environment={}",
591 self.core.client_id,
592 self.core.account_id,
593 self.config.environment,
594 );
595 Ok(())
596 }
597
598 fn stop(&mut self) -> anyhow::Result<()> {
599 if self.core.is_stopped() {
600 return Ok(());
601 }
602
603 self.core.set_stopped();
604 self.core.set_disconnected();
605
606 if let Some(handle) = self.ws_stream_handle.take() {
607 handle.abort();
608 }
609 self.abort_pending_tasks();
610 log::info!("Stopped: client_id={}", self.core.client_id);
611 Ok(())
612 }
613
614 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
615 {
616 let cache = self.core.cache();
617 let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
618 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
619 })?;
620
621 if order.is_closed() {
622 log::warn!("Cannot submit closed order {}", order.client_order_id());
623 return Ok(());
624 }
625
626 if !matches!(
627 order.order_type(),
628 OrderType::Market | OrderType::Limit | OrderType::StopLimit
629 ) {
630 self.emitter.emit_order_denied(
631 order,
632 &format!(
633 "Unsupported order type: {:?}, \
634 AX supports MARKET, LIMIT and STOP_LIMIT",
635 order.order_type(),
636 ),
637 );
638 return Ok(());
639 }
640
641 if order.time_in_force() == TimeInForce::Gtd {
642 self.emitter.emit_order_denied(
643 order,
644 "Unsupported time in force: GTD, \
645 AX supports GTC, IOC, FOK, and DAY",
646 );
647 return Ok(());
648 }
649
650 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
651 self.emitter.emit_order_submitted(order);
652 }
653
654 self.submit_order_internal(&cmd)
655 }
656
657 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
658 for (client_order_id, order_init) in cmd
659 .order_list
660 .client_order_ids
661 .iter()
662 .zip(cmd.order_inits.iter())
663 {
664 let submit_cmd = SubmitOrder::new(
665 cmd.trader_id,
666 cmd.client_id,
667 cmd.strategy_id,
668 cmd.instrument_id,
669 *client_order_id,
670 order_init.clone(),
671 cmd.exec_algorithm_id,
672 cmd.position_id,
673 cmd.params.clone(),
674 UUID4::new(),
675 cmd.ts_init,
676 );
677 self.submit_order(submit_cmd)?;
678 }
679 Ok(())
680 }
681
682 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
683 let venue_order_id = match cmd.venue_order_id {
684 Some(ref voi) => *voi,
685 None => {
686 let reason = "Cannot modify order without venue_order_id";
687 log::error!("{reason}");
688 let ts_event = self.clock.get_time_ns();
689 self.emitter.emit_order_modify_rejected_event(
690 cmd.strategy_id,
691 cmd.instrument_id,
692 cmd.client_order_id,
693 cmd.venue_order_id,
694 reason,
695 ts_event,
696 );
697 return Ok(());
698 }
699 };
700
701 let http_client = self.http_client.clone();
702 let emitter = self.emitter.clone();
703 let caches = self.ws_orders.caches().clone();
704 let clock = self.clock;
705 let client_order_id = cmd.client_order_id;
706 let strategy_id = cmd.strategy_id;
707 let instrument_id = cmd.instrument_id;
708 let quantity = cmd.quantity;
709 let price = cmd.price;
710 let trigger_price = cmd.trigger_price;
711
712 self.spawn_task("modify_order", async move {
713 let mut request = ReplaceOrderRequest::new(venue_order_id.as_str());
714
715 if let Some(price) = price {
716 request = request.with_price(price.as_decimal());
717 }
718
719 if let Some(qty) = quantity {
720 let contracts = quantity_to_contracts(qty)?;
721 request = request.with_quantity(contracts);
722 }
723
724 if let Some(trigger) = trigger_price {
725 request = request.with_trigger_price(trigger.as_decimal());
726 }
727
728 match http_client.inner.replace_order(&request).await {
729 Ok(resp) => {
730 let new_venue_order_id = VenueOrderId::new(&resp.oid);
731 caches
732 .venue_to_client_id
733 .insert(new_venue_order_id, client_order_id);
734 if let Some(mut entry) = caches.orders_metadata.get_mut(&client_order_id) {
735 entry.venue_order_id = Some(new_venue_order_id);
736 entry.pending_trigger_price = trigger_price;
737 }
738 log::info!("Order replaced: old={} new={}", request.oid, resp.oid);
739 }
740 Err(e) => {
741 let reason = format!("modify-order-error: {e}");
742 let ts_event = clock.get_time_ns();
743 emitter.emit_order_modify_rejected_event(
744 strategy_id,
745 instrument_id,
746 client_order_id,
747 Some(VenueOrderId::new(&request.oid)),
748 &reason,
749 ts_event,
750 );
751 anyhow::bail!("{reason}");
752 }
753 }
754
755 Ok(())
756 });
757
758 Ok(())
759 }
760
761 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
762 self.cancel_order_internal(&cmd);
763 Ok(())
764 }
765
766 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
767 let http_client = self.http_client.clone();
768 let emitter = self.emitter.clone();
769 let clock = self.clock;
770 let instrument_id = cmd.instrument_id;
771 let account_id = self.core.account_id;
772 let trader_id = self.core.trader_id;
773
774 let open_orders: Vec<(ClientOrderId, Option<VenueOrderId>, StrategyId)> = {
776 let cache = self.core.cache();
777 cache
778 .orders_open(None, Some(&instrument_id), None, None, None)
779 .iter()
780 .map(|o| (o.client_order_id(), o.venue_order_id(), o.strategy_id()))
781 .collect()
782 };
783
784 let caches = self.ws_orders.caches().clone();
785
786 self.spawn_task("cancel_all_orders", async move {
787 match http_client.cancel_all_orders(instrument_id).await {
788 Ok(()) => {
789 log::info!("Canceled all orders for {instrument_id}");
790
791 let ts_event = clock.get_time_ns();
795
796 for (client_order_id, venue_order_id, strategy_id) in &open_orders {
797 let event = OrderCanceled::new(
798 trader_id,
799 *strategy_id,
800 instrument_id,
801 *client_order_id,
802 UUID4::new(),
803 ts_event,
804 clock.get_time_ns(),
805 false,
806 *venue_order_id,
807 Some(account_id),
808 );
809 emitter.send_order_event(OrderEventAny::Canceled(event));
810
811 if let Some(voi) = venue_order_id {
812 caches.venue_to_client_id.remove(voi);
813 }
814 caches.orders_metadata.remove(client_order_id);
815 }
816 }
817 Err(e) => {
818 log::error!("Failed to cancel all orders for {instrument_id}: {e}");
819 let ts_event = clock.get_time_ns();
820
821 for (client_order_id, venue_order_id, strategy_id) in &open_orders {
822 emitter.emit_order_cancel_rejected_event(
823 *strategy_id,
824 instrument_id,
825 *client_order_id,
826 *venue_order_id,
827 &format!("cancel-all-orders-error: {e}"),
828 ts_event,
829 );
830 }
831 }
832 }
833 Ok(())
834 });
835
836 Ok(())
837 }
838
839 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
840 for cancel in &cmd.cancels {
841 self.cancel_order_internal(cancel);
842 }
843 Ok(())
844 }
845
846 async fn generate_order_status_report(
847 &self,
848 cmd: &GenerateOrderStatusReport,
849 ) -> anyhow::Result<Option<OrderStatusReport>> {
850 let cid_map = self.ws_orders.cid_to_client_order_id().clone();
851 let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
852
853 let mut reports = self
854 .http_client
855 .request_order_status_reports(self.core.account_id, Some(cid_resolver))
856 .await?;
857
858 if let Some(instrument_id) = cmd.instrument_id {
859 reports.retain(|report| report.instrument_id == instrument_id);
860 }
861
862 if let Some(client_order_id) = cmd.client_order_id {
863 reports.retain(|report| report.client_order_id == Some(client_order_id));
864 }
865
866 if let Some(venue_order_id) = cmd.venue_order_id {
867 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
868 }
869
870 Ok(reports.into_iter().next())
871 }
872
873 async fn generate_order_status_reports(
874 &self,
875 cmd: &GenerateOrderStatusReports,
876 ) -> anyhow::Result<Vec<OrderStatusReport>> {
877 let cid_map = self.ws_orders.cid_to_client_order_id().clone();
878 let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
879
880 let mut reports = self
881 .http_client
882 .request_order_status_reports(self.core.account_id, Some(cid_resolver))
883 .await?;
884
885 if let Some(instrument_id) = cmd.instrument_id {
886 reports.retain(|report| report.instrument_id == instrument_id);
887 }
888
889 if cmd.open_only {
890 reports.retain(|r| r.order_status.is_open());
891 }
892
893 if let Some(start) = cmd.start {
894 reports.retain(|r| r.ts_last >= start);
895 }
896
897 if let Some(end) = cmd.end {
898 reports.retain(|r| r.ts_last <= end);
899 }
900
901 Ok(reports)
902 }
903
904 async fn generate_fill_reports(
905 &self,
906 cmd: GenerateFillReports,
907 ) -> anyhow::Result<Vec<FillReport>> {
908 let mut reports = self
909 .http_client
910 .request_fill_reports(self.core.account_id)
911 .await?;
912
913 if let Some(instrument_id) = cmd.instrument_id {
914 reports.retain(|report| report.instrument_id == instrument_id);
915 }
916
917 if let Some(venue_order_id) = cmd.venue_order_id {
918 reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
919 }
920
921 Ok(reports)
922 }
923
924 async fn generate_position_status_reports(
925 &self,
926 cmd: &GeneratePositionStatusReports,
927 ) -> anyhow::Result<Vec<PositionStatusReport>> {
928 let mut reports = self
929 .http_client
930 .request_position_reports(self.core.account_id)
931 .await?;
932
933 if let Some(instrument_id) = cmd.instrument_id {
934 reports.retain(|report| report.instrument_id == instrument_id);
935 }
936
937 Ok(reports)
938 }
939
940 async fn generate_mass_status(
941 &self,
942 lookback_mins: Option<u64>,
943 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
944 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
945
946 let ts_now = self.clock.get_time_ns();
947
948 let start = lookback_mins.map(|mins| {
949 let lookback_ns = mins * 60 * 1_000_000_000;
950 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
951 });
952
953 let order_cmd = GenerateOrderStatusReports::new(
954 UUID4::new(),
955 ts_now,
956 false, None, start,
959 None, None, None, );
963
964 let fill_cmd = GenerateFillReports::new(
965 UUID4::new(),
966 ts_now,
967 None, None, start,
970 None, None, None, );
974
975 let position_cmd = GeneratePositionStatusReports::new(
976 UUID4::new(),
977 ts_now,
978 None, start,
980 None, None, None, );
984
985 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
986 self.generate_order_status_reports(&order_cmd),
987 self.generate_fill_reports(fill_cmd),
988 self.generate_position_status_reports(&position_cmd),
989 )?;
990
991 log::info!("Received {} OrderStatusReports", order_reports.len());
992 log::info!("Received {} FillReports", fill_reports.len());
993 log::info!("Received {} PositionReports", position_reports.len());
994
995 let mut mass_status = ExecutionMassStatus::new(
996 self.core.client_id,
997 self.core.account_id,
998 *AX_VENUE,
999 ts_now,
1000 None,
1001 );
1002
1003 mass_status.add_order_reports(order_reports);
1004 mass_status.add_fill_reports(fill_reports);
1005 mass_status.add_position_reports(position_reports);
1006
1007 Ok(Some(mass_status))
1008 }
1009
1010 fn register_external_order(
1011 &self,
1012 client_order_id: ClientOrderId,
1013 venue_order_id: VenueOrderId,
1014 instrument_id: InstrumentId,
1015 strategy_id: StrategyId,
1016 _ts_init: UnixNanos,
1017 ) {
1018 self.ws_orders.register_external_order(
1019 client_order_id,
1020 venue_order_id,
1021 instrument_id,
1022 strategy_id,
1023 );
1024 }
1025}
1026
1027fn dispatch_ws_message(
1029 message: AxOrdersWsMessage,
1030 emitter: &ExecutionEventEmitter,
1031 caches: &OrdersCaches,
1032 account_id: AccountId,
1033 instruments: &AtomicMap<Ustr, InstrumentAny>,
1034 clock: &'static AtomicTime,
1035) {
1036 match message {
1037 AxOrdersWsMessage::Event(event) => {
1038 dispatch_order_event(*event, emitter, caches, account_id, instruments, clock);
1039 }
1040 AxOrdersWsMessage::PlaceOrderResponse(resp) => {
1041 log::debug!(
1042 "Place order response: rid={} oid={}",
1043 resp.rid,
1044 resp.res.oid
1045 );
1046 }
1047 AxOrdersWsMessage::CancelOrderResponse(resp) => {
1048 log::debug!(
1049 "Cancel order response: rid={} accepted={}",
1050 resp.rid,
1051 resp.res.cxl_rx
1052 );
1053 }
1054 AxOrdersWsMessage::OpenOrdersResponse(resp) => {
1055 log::debug!("Open orders response: {} orders", resp.res.len());
1056 }
1057 AxOrdersWsMessage::Error(err) => {
1058 log::error!("WebSocket error: {}", err.message);
1059 }
1060 AxOrdersWsMessage::Reconnected => {
1061 log::info!("WebSocket reconnected");
1062 }
1063 AxOrdersWsMessage::Authenticated => {
1064 log::debug!("WebSocket authenticated");
1065 }
1066 }
1067}
1068
1069fn dispatch_order_event(
1070 event: AxWsOrderEvent,
1071 emitter: &ExecutionEventEmitter,
1072 caches: &OrdersCaches,
1073 account_id: AccountId,
1074 instruments: &AtomicMap<Ustr, InstrumentAny>,
1075 clock: &'static AtomicTime,
1076) {
1077 match event {
1078 AxWsOrderEvent::Heartbeat => {}
1079 AxWsOrderEvent::Acknowledged(msg) => {
1080 if let Some(event) =
1081 create_order_accepted(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1082 {
1083 emitter.send_order_event(OrderEventAny::Accepted(event));
1084 } else if let Some(report) = create_order_status_report(
1085 &msg.o,
1086 OrderStatus::Accepted,
1087 msg.ts,
1088 msg.tn,
1089 caches,
1090 account_id,
1091 instruments,
1092 clock,
1093 ) {
1094 emitter.send_order_status_report(report);
1095 }
1096 }
1097 AxWsOrderEvent::PartiallyFilled(msg) => {
1098 dispatch_fill_event(
1099 &msg.o,
1100 &msg.xs,
1101 msg.ts,
1102 msg.tn,
1103 emitter,
1104 caches,
1105 account_id,
1106 instruments,
1107 clock,
1108 );
1109 }
1110 AxWsOrderEvent::Filled(msg) => {
1111 dispatch_fill_event(
1112 &msg.o,
1113 &msg.xs,
1114 msg.ts,
1115 msg.tn,
1116 emitter,
1117 caches,
1118 account_id,
1119 instruments,
1120 clock,
1121 );
1122 cleanup_terminal_order_tracking(&msg.o, caches);
1123 }
1124 AxWsOrderEvent::Canceled(msg) => {
1125 if let Some(event) =
1126 create_order_canceled(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1127 {
1128 emitter.send_order_event(OrderEventAny::Canceled(event));
1129 } else if let Some(report) = create_order_status_report(
1130 &msg.o,
1131 OrderStatus::Canceled,
1132 msg.ts,
1133 msg.tn,
1134 caches,
1135 account_id,
1136 instruments,
1137 clock,
1138 ) {
1139 emitter.send_order_status_report(report);
1140 }
1141 cleanup_terminal_order_tracking(&msg.o, caches);
1142 }
1143 AxWsOrderEvent::Rejected(msg) => {
1144 let known_reason = msg.r.filter(|r| !matches!(r, AxOrderRejectReason::Unknown));
1145 let reason = known_reason
1146 .as_ref()
1147 .map(AsRef::as_ref)
1148 .or(msg.txt.as_deref())
1149 .unwrap_or("UNKNOWN");
1150
1151 if let Some(event) =
1152 create_order_rejected(&msg.o, reason, msg.ts, msg.tn, caches, account_id, clock)
1153 {
1154 emitter.send_order_event(OrderEventAny::Rejected(event));
1155 }
1156 cleanup_terminal_order_tracking(&msg.o, caches);
1157 }
1158 AxWsOrderEvent::Expired(msg) => {
1159 if let Some(event) =
1160 create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1161 {
1162 emitter.send_order_event(OrderEventAny::Expired(event));
1163 } else if let Some(report) = create_order_status_report(
1164 &msg.o,
1165 OrderStatus::Expired,
1166 msg.ts,
1167 msg.tn,
1168 caches,
1169 account_id,
1170 instruments,
1171 clock,
1172 ) {
1173 emitter.send_order_status_report(report);
1174 }
1175 cleanup_terminal_order_tracking(&msg.o, caches);
1176 }
1177 AxWsOrderEvent::Replaced(msg) => {
1178 if let Some(event) =
1179 create_order_updated(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1180 {
1181 emitter.send_order_event(OrderEventAny::Updated(event));
1182 } else if let Some(report) = create_order_status_report(
1183 &msg.o,
1184 OrderStatus::Accepted,
1185 msg.ts,
1186 msg.tn,
1187 caches,
1188 account_id,
1189 instruments,
1190 clock,
1191 ) {
1192 emitter.send_order_status_report(report);
1193 }
1194 }
1195 AxWsOrderEvent::DoneForDay(msg) => {
1196 if let Some(event) =
1197 create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1198 {
1199 emitter.send_order_event(OrderEventAny::Expired(event));
1200 } else if let Some(report) = create_order_status_report(
1201 &msg.o,
1202 OrderStatus::Expired,
1203 msg.ts,
1204 msg.tn,
1205 caches,
1206 account_id,
1207 instruments,
1208 clock,
1209 ) {
1210 emitter.send_order_status_report(report);
1211 }
1212 cleanup_terminal_order_tracking(&msg.o, caches);
1213 }
1214 AxWsOrderEvent::CancelRejected(msg) => {
1215 let venue_order_id = VenueOrderId::new(&msg.oid);
1216 if let Some(client_order_id) = caches.venue_to_client_id.get(&venue_order_id)
1217 && let Some(metadata) = caches.orders_metadata.get(&client_order_id)
1218 {
1219 let event = OrderCancelRejected::new(
1220 metadata.trader_id,
1221 metadata.strategy_id,
1222 metadata.instrument_id,
1223 metadata.client_order_id,
1224 Ustr::from(msg.r.as_ref()),
1225 UUID4::new(),
1226 clock.get_time_ns(),
1227 metadata.ts_init,
1228 false,
1229 Some(venue_order_id),
1230 Some(account_id),
1231 );
1232 emitter.send_order_event(OrderEventAny::CancelRejected(event));
1233 } else {
1234 log::warn!(
1235 "Could not find metadata for cancel rejected order {}",
1236 msg.oid
1237 );
1238 }
1239 }
1240 }
1241}
1242
1243#[expect(clippy::too_many_arguments)]
1244fn dispatch_fill_event(
1245 order: &AxWsOrder,
1246 execution: &AxWsTradeExecution,
1247 ts: i64,
1248 tn: i64,
1249 emitter: &ExecutionEventEmitter,
1250 caches: &OrdersCaches,
1251 account_id: AccountId,
1252 instruments: &AtomicMap<Ustr, InstrumentAny>,
1253 clock: &'static AtomicTime,
1254) {
1255 if let Some(event) = create_order_filled(order, execution, ts, tn, caches, account_id, clock) {
1256 emitter.send_order_event(OrderEventAny::Filled(event));
1257 } else if let Some(report) = create_fill_report(
1258 order,
1259 execution,
1260 ts,
1261 tn,
1262 caches,
1263 account_id,
1264 instruments,
1265 clock,
1266 ) {
1267 emitter.send_fill_report(report);
1268 }
1269}
1270
1271pub(crate) fn lookup_order_metadata<'a>(
1272 order: &AxWsOrder,
1273 caches: &'a OrdersCaches,
1274) -> Option<dashmap::mapref::one::Ref<'a, ClientOrderId, OrderMetadata>> {
1275 let venue_order_id = VenueOrderId::new(&order.oid);
1276
1277 if let Some(client_order_id) = caches.venue_to_client_id.get(&venue_order_id)
1278 && let Some(metadata) = caches.orders_metadata.get(&*client_order_id)
1279 {
1280 return Some(metadata);
1281 }
1282
1283 if let Some(cid) = order.cid
1284 && let Some(client_order_id) = caches.cid_to_client_order_id.get(&cid)
1285 && let Some(metadata) = caches.orders_metadata.get(&*client_order_id)
1286 {
1287 return Some(metadata);
1288 }
1289
1290 None
1291}
1292
1293pub(crate) fn create_order_accepted(
1294 order: &AxWsOrder,
1295 event_ts: i64,
1296 event_tn: i64,
1297 caches: &OrdersCaches,
1298 account_id: AccountId,
1299 clock: &'static AtomicTime,
1300) -> Option<OrderAccepted> {
1301 let venue_order_id = VenueOrderId::new(&order.oid);
1302 let metadata = lookup_order_metadata(order, caches)?;
1303
1304 let client_order_id = metadata.client_order_id;
1305 let trader_id = metadata.trader_id;
1306 let strategy_id = metadata.strategy_id;
1307 let instrument_id = metadata.instrument_id;
1308 drop(metadata);
1309
1310 caches
1311 .venue_to_client_id
1312 .insert(venue_order_id, client_order_id);
1313
1314 if let Some(mut entry) = caches.orders_metadata.get_mut(&client_order_id) {
1315 entry.venue_order_id = Some(venue_order_id);
1316 }
1317
1318 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1319 .map_err(|e| log::error!("{e}"))
1320 .ok()?;
1321
1322 Some(OrderAccepted::new(
1323 trader_id,
1324 strategy_id,
1325 instrument_id,
1326 client_order_id,
1327 venue_order_id,
1328 account_id,
1329 UUID4::new(),
1330 ts_event,
1331 clock.get_time_ns(),
1332 false,
1333 ))
1334}
1335
1336pub(crate) fn create_order_updated(
1337 order: &AxWsOrder,
1338 event_ts: i64,
1339 event_tn: i64,
1340 caches: &OrdersCaches,
1341 account_id: AccountId,
1342 clock: &'static AtomicTime,
1343) -> Option<OrderUpdated> {
1344 let metadata = lookup_order_metadata(order, caches)?;
1345
1346 let client_order_id = metadata.client_order_id;
1347 let trader_id = metadata.trader_id;
1348 let strategy_id = metadata.strategy_id;
1349 let instrument_id = metadata.instrument_id;
1350 let price_precision = metadata.price_precision;
1351 let size_precision = metadata.size_precision;
1352 let pending_trigger_price = metadata.pending_trigger_price;
1353 let venue_order_id = metadata
1356 .venue_order_id
1357 .unwrap_or_else(|| VenueOrderId::new(&order.oid));
1358 drop(metadata);
1359
1360 caches
1361 .venue_to_client_id
1362 .insert(venue_order_id, client_order_id);
1363
1364 if let Some(mut entry) = caches.orders_metadata.get_mut(&client_order_id) {
1366 entry.pending_trigger_price = None;
1367 }
1368
1369 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1370 .map_err(|e| log::error!("{e}"))
1371 .ok()?;
1372
1373 let quantity = Quantity::new(order.q as f64, size_precision);
1374 let price = Price::from_decimal_dp(order.p, price_precision).ok();
1375
1376 Some(OrderUpdated::new(
1377 trader_id,
1378 strategy_id,
1379 instrument_id,
1380 client_order_id,
1381 quantity,
1382 UUID4::new(),
1383 ts_event,
1384 clock.get_time_ns(),
1385 false,
1386 Some(venue_order_id),
1387 Some(account_id),
1388 price,
1389 pending_trigger_price,
1390 None, false,
1392 ))
1393}
1394
1395pub(crate) fn create_order_filled(
1396 order: &AxWsOrder,
1397 execution: &AxWsTradeExecution,
1398 event_ts: i64,
1399 event_tn: i64,
1400 caches: &OrdersCaches,
1401 account_id: AccountId,
1402 clock: &'static AtomicTime,
1403) -> Option<OrderFilled> {
1404 let venue_order_id = VenueOrderId::new(&order.oid);
1405 let metadata = lookup_order_metadata(order, caches)?;
1406
1407 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1408 .map_err(|e| log::error!("{e}"))
1409 .ok()?;
1410
1411 let last_qty = Quantity::new(execution.q as f64, metadata.size_precision);
1412 let last_px = Price::from_decimal_dp(execution.p, metadata.price_precision).ok()?;
1413
1414 let order_side: OrderSide = order.d.into();
1415
1416 let liquidity_side = if execution.agg {
1417 LiquiditySide::Taker
1418 } else {
1419 LiquiditySide::Maker
1420 };
1421
1422 Some(OrderFilled::new(
1423 metadata.trader_id,
1424 metadata.strategy_id,
1425 metadata.instrument_id,
1426 metadata.client_order_id,
1427 venue_order_id,
1428 account_id,
1429 TradeId::new(&execution.tid),
1430 order_side,
1431 OrderType::Limit,
1432 last_qty,
1433 last_px,
1434 metadata.quote_currency,
1435 liquidity_side,
1436 UUID4::new(),
1437 ts_event,
1438 clock.get_time_ns(),
1439 false,
1440 None,
1441 None,
1442 ))
1443}
1444
1445pub(crate) fn create_order_canceled(
1446 order: &AxWsOrder,
1447 event_ts: i64,
1448 event_tn: i64,
1449 caches: &OrdersCaches,
1450 account_id: AccountId,
1451 clock: &'static AtomicTime,
1452) -> Option<OrderCanceled> {
1453 let venue_order_id = VenueOrderId::new(&order.oid);
1454 let metadata = lookup_order_metadata(order, caches)?;
1455
1456 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1457 .map_err(|e| log::error!("{e}"))
1458 .ok()?;
1459
1460 Some(OrderCanceled::new(
1461 metadata.trader_id,
1462 metadata.strategy_id,
1463 metadata.instrument_id,
1464 metadata.client_order_id,
1465 UUID4::new(),
1466 ts_event,
1467 clock.get_time_ns(),
1468 false,
1469 Some(venue_order_id),
1470 Some(account_id),
1471 ))
1472}
1473
1474pub(crate) fn create_order_expired(
1475 order: &AxWsOrder,
1476 event_ts: i64,
1477 event_tn: i64,
1478 caches: &OrdersCaches,
1479 account_id: AccountId,
1480 clock: &'static AtomicTime,
1481) -> Option<OrderExpired> {
1482 let venue_order_id = VenueOrderId::new(&order.oid);
1483 let metadata = lookup_order_metadata(order, caches)?;
1484
1485 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1486 .map_err(|e| log::error!("{e}"))
1487 .ok()?;
1488
1489 Some(OrderExpired::new(
1490 metadata.trader_id,
1491 metadata.strategy_id,
1492 metadata.instrument_id,
1493 metadata.client_order_id,
1494 UUID4::new(),
1495 ts_event,
1496 clock.get_time_ns(),
1497 false,
1498 Some(venue_order_id),
1499 Some(account_id),
1500 ))
1501}
1502
1503pub(crate) fn create_order_rejected(
1504 order: &AxWsOrder,
1505 reason: &str,
1506 event_ts: i64,
1507 event_tn: i64,
1508 caches: &OrdersCaches,
1509 account_id: AccountId,
1510 clock: &'static AtomicTime,
1511) -> Option<OrderRejected> {
1512 let metadata = lookup_order_metadata(order, caches)?;
1513
1514 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1515 .map_err(|e| log::error!("{e}"))
1516 .ok()?;
1517 let due_post_only = reason.contains(AX_POST_ONLY_REJECT);
1518
1519 Some(OrderRejected::new(
1520 metadata.trader_id,
1521 metadata.strategy_id,
1522 metadata.instrument_id,
1523 metadata.client_order_id,
1524 account_id,
1525 Ustr::from(reason),
1526 UUID4::new(),
1527 ts_event,
1528 clock.get_time_ns(),
1529 false,
1530 due_post_only,
1531 ))
1532}
1533
1534pub(crate) fn cleanup_terminal_order_tracking(order: &AxWsOrder, caches: &OrdersCaches) {
1535 let venue_order_id = VenueOrderId::new(&order.oid);
1536 let client_order_id = caches
1537 .venue_to_client_id
1538 .remove(&venue_order_id)
1539 .map(|(_, v)| v)
1540 .or_else(|| {
1541 order
1542 .cid
1543 .and_then(|cid| caches.cid_to_client_order_id.remove(&cid).map(|(_, v)| v))
1544 });
1545
1546 if let Some(client_order_id) = client_order_id {
1547 caches.orders_metadata.remove(&client_order_id);
1548 }
1549
1550 if let Some(cid) = order.cid {
1551 caches.cid_to_client_order_id.remove(&cid);
1552 }
1553}
1554
1555#[expect(clippy::too_many_arguments)]
1556fn create_order_status_report(
1557 order: &AxWsOrder,
1558 order_status: OrderStatus,
1559 event_ts: i64,
1560 event_tn: i64,
1561 caches: &OrdersCaches,
1562 account_id: AccountId,
1563 instruments: &AtomicMap<Ustr, InstrumentAny>,
1564 clock: &'static AtomicTime,
1565) -> Option<OrderStatusReport> {
1566 let instruments_snap = instruments.load();
1567 let instrument = instruments_snap.get(&order.s)?;
1568 let venue_order_id = VenueOrderId::new(&order.oid);
1569 let instrument_id = instrument.id();
1570 let order_side = order.d.into();
1571 let time_in_force = order.tif.into();
1572
1573 let quantity = Quantity::new(order.q as f64, instrument.size_precision());
1574 let filled_qty = Quantity::new(order.xq as f64, instrument.size_precision());
1575
1576 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1577 .map_err(|e| log::error!("{e}"))
1578 .ok()?;
1579 let ts_init = clock.get_time_ns();
1580
1581 let client_order_id = order.cid.map(|cid| {
1582 caches
1583 .cid_to_client_order_id
1584 .get(&cid)
1585 .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
1586 });
1587
1588 let mut report = OrderStatusReport::new(
1589 account_id,
1590 instrument_id,
1591 client_order_id,
1592 venue_order_id,
1593 order_side,
1594 OrderType::Limit,
1595 time_in_force,
1596 order_status,
1597 quantity,
1598 filled_qty,
1599 ts_event,
1600 ts_event,
1601 ts_init,
1602 Some(UUID4::new()),
1603 );
1604
1605 if let Ok(price) = Price::from_decimal_dp(order.p, instrument.price_precision()) {
1606 report = report.with_price(price);
1607 }
1608
1609 Some(report)
1610}
1611
1612#[expect(clippy::too_many_arguments)]
1613fn create_fill_report(
1614 order: &AxWsOrder,
1615 execution: &AxWsTradeExecution,
1616 event_ts: i64,
1617 event_tn: i64,
1618 caches: &OrdersCaches,
1619 account_id: AccountId,
1620 instruments: &AtomicMap<Ustr, InstrumentAny>,
1621 clock: &'static AtomicTime,
1622) -> Option<FillReport> {
1623 let instruments_snap = instruments.load();
1624 let instrument = instruments_snap.get(&order.s)?;
1625 let venue_order_id = VenueOrderId::new(&order.oid);
1626 let instrument_id = instrument.id();
1627 let order_side = order.d.into();
1628
1629 let last_qty = Quantity::new(execution.q as f64, instrument.size_precision());
1630 let last_px = Price::from_decimal_dp(execution.p, instrument.price_precision()).ok()?;
1631
1632 let liquidity_side = if execution.agg {
1633 LiquiditySide::Taker
1634 } else {
1635 LiquiditySide::Maker
1636 };
1637
1638 let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1639 .map_err(|e| log::error!("{e}"))
1640 .ok()?;
1641 let ts_init = clock.get_time_ns();
1642
1643 let client_order_id = order.cid.map(|cid| {
1644 caches
1645 .cid_to_client_order_id
1646 .get(&cid)
1647 .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
1648 });
1649
1650 let commission = Money::new(0.0, instrument.quote_currency());
1654
1655 Some(FillReport::new(
1656 account_id,
1657 instrument_id,
1658 venue_order_id,
1659 TradeId::new(&execution.tid),
1660 order_side,
1661 last_qty,
1662 last_px,
1663 commission,
1664 liquidity_side,
1665 client_order_id,
1666 None,
1667 ts_event,
1668 ts_init,
1669 Some(UUID4::new()),
1670 ))
1671}
1672
1673#[cfg(test)]
1674mod tests {
1675 use std::sync::Arc;
1676
1677 use dashmap::DashMap;
1678 use nautilus_core::time::get_atomic_clock_realtime;
1679 use nautilus_model::{
1680 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
1681 types::{Currency, Price, Quantity},
1682 };
1683 use rstest::rstest;
1684 use rust_decimal::Decimal;
1685 use rust_decimal_macros::dec;
1686 use ustr::Ustr;
1687
1688 use super::*;
1689 use crate::{
1690 common::enums::{AxOrderSide, AxOrderStatus, AxTimeInForce},
1691 websocket::{
1692 messages::{AxWsTradeExecution, OrderMetadata},
1693 orders::OrdersCaches,
1694 },
1695 };
1696
1697 fn test_caches() -> OrdersCaches {
1698 OrdersCaches {
1699 orders_metadata: Arc::new(DashMap::new()),
1700 venue_to_client_id: Arc::new(DashMap::new()),
1701 cid_to_client_order_id: Arc::new(DashMap::new()),
1702 }
1703 }
1704
1705 fn test_ws_order(oid: &str, price: Decimal, qty: u64) -> AxWsOrder {
1706 AxWsOrder {
1707 oid: oid.to_string(),
1708 u: "user".to_string(),
1709 s: Ustr::from("BTC-PERP"),
1710 p: price,
1711 q: qty,
1712 xq: 0,
1713 rq: qty,
1714 o: AxOrderStatus::Accepted,
1715 d: AxOrderSide::Buy,
1716 tif: AxTimeInForce::Gtc,
1717 ts: 1609459200,
1718 tn: 0,
1719 cid: None,
1720 tag: None,
1721 txt: None,
1722 }
1723 }
1724
1725 #[rstest]
1726 fn test_create_order_updated_uses_cached_venue_order_id() {
1727 let caches = test_caches();
1728 let clock = get_atomic_clock_realtime();
1729 let account_id = AccountId::from("AX-001");
1730 let client_order_id = ClientOrderId::from("O-001");
1731 let new_venue_id = VenueOrderId::new("NEW-OID");
1732 let trigger = Price::from("49000.00");
1733
1734 let metadata = OrderMetadata {
1735 trader_id: TraderId::from("TRADER-001"),
1736 strategy_id: StrategyId::from("S-001"),
1737 instrument_id: InstrumentId::from("BTC-PERP.AX"),
1738 client_order_id,
1739 venue_order_id: Some(new_venue_id),
1740 ts_init: 0.into(),
1741 size_precision: 0,
1742 price_precision: 2,
1743 quote_currency: Currency::USD(),
1744 pending_trigger_price: Some(trigger),
1745 };
1746 caches.orders_metadata.insert(client_order_id, metadata);
1747 caches
1748 .venue_to_client_id
1749 .insert(new_venue_id, client_order_id);
1750
1751 let ws_order = test_ws_order("OLD-OID", dec!(50500.00), 100);
1753
1754 let cid_value = 42u64;
1757 caches
1758 .cid_to_client_order_id
1759 .insert(cid_value, client_order_id);
1760 let mut ws_order_with_cid = ws_order;
1761 ws_order_with_cid.cid = Some(cid_value);
1762
1763 let event = create_order_updated(
1764 &ws_order_with_cid,
1765 1609459200,
1766 0,
1767 &caches,
1768 account_id,
1769 clock,
1770 )
1771 .expect("should produce OrderUpdated");
1772
1773 assert_eq!(event.venue_order_id, Some(new_venue_id));
1775 assert_eq!(event.trigger_price, Some(trigger));
1776 assert_eq!(event.quantity, Quantity::new(100.0, 0));
1777 assert_eq!(event.price, Some(Price::from("50500.00")));
1778
1779 let meta = caches.orders_metadata.get(&client_order_id).unwrap();
1781 assert!(meta.pending_trigger_price.is_none());
1782 }
1783
1784 #[rstest]
1785 fn test_create_order_updated_falls_back_to_ws_oid() {
1786 let caches = test_caches();
1787 let clock = get_atomic_clock_realtime();
1788 let account_id = AccountId::from("AX-001");
1789 let client_order_id = ClientOrderId::from("O-002");
1790 let ws_oid = VenueOrderId::new("WS-OID");
1791
1792 let metadata = OrderMetadata {
1793 trader_id: TraderId::from("TRADER-001"),
1794 strategy_id: StrategyId::from("S-001"),
1795 instrument_id: InstrumentId::from("BTC-PERP.AX"),
1796 client_order_id,
1797 venue_order_id: None,
1798 ts_init: 0.into(),
1799 size_precision: 0,
1800 price_precision: 2,
1801 quote_currency: Currency::USD(),
1802 pending_trigger_price: None,
1803 };
1804 caches.orders_metadata.insert(client_order_id, metadata);
1805 caches.venue_to_client_id.insert(ws_oid, client_order_id);
1806
1807 let ws_order = test_ws_order("WS-OID", dec!(50500.00), 200);
1808
1809 let event = create_order_updated(&ws_order, 1609459200, 0, &caches, account_id, clock)
1810 .expect("should produce OrderUpdated");
1811
1812 assert_eq!(event.venue_order_id, Some(ws_oid));
1813 assert!(event.trigger_price.is_none());
1814 }
1815
1816 fn test_metadata(client_order_id: ClientOrderId, instrument_id: InstrumentId) -> OrderMetadata {
1817 OrderMetadata {
1818 trader_id: TraderId::from("TRADER-001"),
1819 strategy_id: StrategyId::from("S-001"),
1820 instrument_id,
1821 client_order_id,
1822 venue_order_id: None,
1823 ts_init: 0.into(),
1824 size_precision: 0,
1825 price_precision: 2,
1826 quote_currency: Currency::USD(),
1827 pending_trigger_price: None,
1828 }
1829 }
1830
1831 fn test_execution(tid: &str, price: Decimal, qty: u64, agg: bool) -> AxWsTradeExecution {
1832 AxWsTradeExecution {
1833 tid: tid.to_string(),
1834 s: Ustr::from("BTC-PERP"),
1835 q: qty,
1836 p: price,
1837 d: AxOrderSide::Buy,
1838 agg,
1839 }
1840 }
1841
1842 #[rstest]
1843 fn test_create_order_accepted_populates_cache_and_event() {
1844 let caches = test_caches();
1845 let clock = get_atomic_clock_realtime();
1846 let account_id = AccountId::from("AX-001");
1847 let client_order_id = ClientOrderId::from("O-ACK");
1848 let instrument_id = InstrumentId::from("BTC-PERP.AX");
1849 let venue_order_id = VenueOrderId::new("OID-ACK");
1850
1851 caches.orders_metadata.insert(
1852 client_order_id,
1853 test_metadata(client_order_id, instrument_id),
1854 );
1855 let cid_value = 7u64;
1856 caches
1857 .cid_to_client_order_id
1858 .insert(cid_value, client_order_id);
1859
1860 let mut ws_order = test_ws_order(venue_order_id.as_str(), dec!(50500.00), 100);
1861 ws_order.cid = Some(cid_value);
1862
1863 let event = create_order_accepted(&ws_order, 1609459200, 500, &caches, account_id, clock)
1864 .expect("should produce OrderAccepted");
1865
1866 assert_eq!(event.venue_order_id, venue_order_id);
1867 assert_eq!(event.client_order_id, client_order_id);
1868 assert_eq!(event.account_id, account_id);
1869 assert_eq!(event.instrument_id, instrument_id);
1870 assert_eq!(event.trader_id, TraderId::from("TRADER-001"));
1871 assert_eq!(event.strategy_id, StrategyId::from("S-001"));
1872 assert_eq!(
1873 event.ts_event,
1874 UnixNanos::from(1_609_459_200_000_000_500u64)
1875 );
1876
1877 assert_eq!(
1879 *caches.venue_to_client_id.get(&venue_order_id).unwrap(),
1880 client_order_id,
1881 );
1882 let meta = caches.orders_metadata.get(&client_order_id).unwrap();
1883 assert_eq!(meta.venue_order_id, Some(venue_order_id));
1884 }
1885
1886 #[rstest]
1887 fn test_create_order_accepted_returns_none_without_metadata() {
1888 let caches = test_caches();
1889 let clock = get_atomic_clock_realtime();
1890 let account_id = AccountId::from("AX-001");
1891 let ws_order = test_ws_order("OID-UNKNOWN", dec!(100.00), 10);
1892
1893 let result = create_order_accepted(&ws_order, 1609459200, 0, &caches, account_id, clock);
1894 assert!(result.is_none());
1895 assert!(caches.venue_to_client_id.is_empty());
1896 }
1897
1898 #[rstest]
1899 fn test_lookup_order_metadata_cid_fallback() {
1900 let caches = test_caches();
1901 let client_order_id = ClientOrderId::from("O-CID");
1902 let instrument_id = InstrumentId::from("BTC-PERP.AX");
1903 caches.orders_metadata.insert(
1904 client_order_id,
1905 test_metadata(client_order_id, instrument_id),
1906 );
1907 caches.cid_to_client_order_id.insert(99, client_order_id);
1908
1909 let mut ws_order = test_ws_order("UNKNOWN-OID", dec!(0), 0);
1910 ws_order.cid = Some(99);
1911
1912 let found = lookup_order_metadata(&ws_order, &caches).expect("cid fallback should find");
1913 assert_eq!(found.client_order_id, client_order_id);
1914 }
1915
1916 #[rstest]
1917 fn test_lookup_order_metadata_returns_none_when_unknown() {
1918 let caches = test_caches();
1919 let ws_order = test_ws_order("UNKNOWN-OID", dec!(0), 0);
1920 assert!(lookup_order_metadata(&ws_order, &caches).is_none());
1921 }
1922
1923 #[rstest]
1924 #[case(true, LiquiditySide::Taker)]
1925 #[case(false, LiquiditySide::Maker)]
1926 fn test_create_order_filled_maps_liquidity_side(
1927 #[case] agg: bool,
1928 #[case] expected: LiquiditySide,
1929 ) {
1930 let caches = test_caches();
1931 let clock = get_atomic_clock_realtime();
1932 let account_id = AccountId::from("AX-001");
1933 let client_order_id = ClientOrderId::from("O-FILL");
1934 let instrument_id = InstrumentId::from("BTC-PERP.AX");
1935 let venue_order_id = VenueOrderId::new("OID-FILL");
1936
1937 caches.orders_metadata.insert(
1938 client_order_id,
1939 test_metadata(client_order_id, instrument_id),
1940 );
1941 caches
1942 .venue_to_client_id
1943 .insert(venue_order_id, client_order_id);
1944
1945 let order = test_ws_order(venue_order_id.as_str(), dec!(50500.00), 100);
1946 let execution = test_execution("TID-1", dec!(50500.00), 25, agg);
1947
1948 let event = create_order_filled(
1949 &order, &execution, 1609459200, 0, &caches, account_id, clock,
1950 )
1951 .expect("should produce OrderFilled");
1952
1953 assert_eq!(event.venue_order_id, venue_order_id);
1954 assert_eq!(event.client_order_id, client_order_id);
1955 assert_eq!(event.trade_id, TradeId::new("TID-1"));
1956 assert_eq!(event.last_qty, Quantity::new(25.0, 0));
1957 assert_eq!(event.last_px, Price::from("50500.00"));
1958 assert_eq!(event.liquidity_side, expected);
1959 }
1960
1961 #[rstest]
1962 fn test_create_order_canceled_populates_identifiers() {
1963 let caches = test_caches();
1964 let clock = get_atomic_clock_realtime();
1965 let account_id = AccountId::from("AX-001");
1966 let client_order_id = ClientOrderId::from("O-CXL");
1967 let instrument_id = InstrumentId::from("BTC-PERP.AX");
1968 let venue_order_id = VenueOrderId::new("OID-CXL");
1969
1970 caches.orders_metadata.insert(
1971 client_order_id,
1972 test_metadata(client_order_id, instrument_id),
1973 );
1974 caches
1975 .venue_to_client_id
1976 .insert(venue_order_id, client_order_id);
1977
1978 let order = test_ws_order(venue_order_id.as_str(), dec!(100.00), 10);
1979 let event = create_order_canceled(&order, 1609459200, 0, &caches, account_id, clock)
1980 .expect("should produce OrderCanceled");
1981
1982 assert_eq!(event.venue_order_id, Some(venue_order_id));
1983 assert_eq!(event.client_order_id, client_order_id);
1984 assert_eq!(event.account_id, Some(account_id));
1985 assert_eq!(event.instrument_id, instrument_id);
1986 }
1987
1988 #[rstest]
1989 fn test_create_order_expired_populates_identifiers() {
1990 let caches = test_caches();
1991 let clock = get_atomic_clock_realtime();
1992 let account_id = AccountId::from("AX-001");
1993 let client_order_id = ClientOrderId::from("O-EXP");
1994 let instrument_id = InstrumentId::from("BTC-PERP.AX");
1995 let venue_order_id = VenueOrderId::new("OID-EXP");
1996
1997 caches.orders_metadata.insert(
1998 client_order_id,
1999 test_metadata(client_order_id, instrument_id),
2000 );
2001 caches
2002 .venue_to_client_id
2003 .insert(venue_order_id, client_order_id);
2004
2005 let order = test_ws_order(venue_order_id.as_str(), dec!(100.00), 10);
2006 let event = create_order_expired(&order, 1609459200, 0, &caches, account_id, clock)
2007 .expect("should produce OrderExpired");
2008
2009 assert_eq!(event.venue_order_id, Some(venue_order_id));
2010 assert_eq!(event.client_order_id, client_order_id);
2011 }
2012
2013 #[rstest]
2014 fn test_create_order_rejected_sets_due_post_only_when_reason_matches() {
2015 let caches = test_caches();
2016 let clock = get_atomic_clock_realtime();
2017 let account_id = AccountId::from("AX-001");
2018 let client_order_id = ClientOrderId::from("O-REJ");
2019 let instrument_id = InstrumentId::from("BTC-PERP.AX");
2020
2021 caches.orders_metadata.insert(
2022 client_order_id,
2023 test_metadata(client_order_id, instrument_id),
2024 );
2025 caches
2026 .venue_to_client_id
2027 .insert(VenueOrderId::new("OID-REJ"), client_order_id);
2028
2029 let order = test_ws_order("OID-REJ", dec!(100.00), 10);
2030 let reason = AX_POST_ONLY_REJECT;
2031 let event =
2032 create_order_rejected(&order, reason, 1609459200, 0, &caches, account_id, clock)
2033 .expect("should produce OrderRejected");
2034
2035 assert_eq!(event.due_post_only, 1, "post-only reason should set flag");
2036 assert_eq!(event.reason, Ustr::from(reason));
2037 }
2038
2039 #[rstest]
2040 fn test_create_order_rejected_clears_due_post_only_for_other_reasons() {
2041 let caches = test_caches();
2042 let clock = get_atomic_clock_realtime();
2043 let account_id = AccountId::from("AX-001");
2044 let client_order_id = ClientOrderId::from("O-REJ-2");
2045 let instrument_id = InstrumentId::from("BTC-PERP.AX");
2046
2047 caches.orders_metadata.insert(
2048 client_order_id,
2049 test_metadata(client_order_id, instrument_id),
2050 );
2051 caches
2052 .venue_to_client_id
2053 .insert(VenueOrderId::new("OID-REJ-2"), client_order_id);
2054
2055 let order = test_ws_order("OID-REJ-2", dec!(100.00), 10);
2056 let event = create_order_rejected(
2057 &order,
2058 "INSUFFICIENT_MARGIN",
2059 1609459200,
2060 0,
2061 &caches,
2062 account_id,
2063 clock,
2064 )
2065 .expect("should produce OrderRejected");
2066
2067 assert_eq!(event.due_post_only, 0);
2068 assert_eq!(event.reason, Ustr::from("INSUFFICIENT_MARGIN"));
2069 }
2070
2071 #[rstest]
2072 fn test_cleanup_terminal_order_tracking_removes_all_caches() {
2073 let caches = test_caches();
2074 let client_order_id = ClientOrderId::from("O-CLEAN");
2075 let instrument_id = InstrumentId::from("BTC-PERP.AX");
2076 let venue_order_id = VenueOrderId::new("OID-CLEAN");
2077 let cid_value = 123u64;
2078
2079 caches.orders_metadata.insert(
2080 client_order_id,
2081 test_metadata(client_order_id, instrument_id),
2082 );
2083 caches
2084 .venue_to_client_id
2085 .insert(venue_order_id, client_order_id);
2086 caches
2087 .cid_to_client_order_id
2088 .insert(cid_value, client_order_id);
2089
2090 let mut order = test_ws_order(venue_order_id.as_str(), dec!(100.00), 10);
2091 order.cid = Some(cid_value);
2092
2093 cleanup_terminal_order_tracking(&order, &caches);
2094
2095 assert!(caches.orders_metadata.is_empty());
2096 assert!(caches.venue_to_client_id.is_empty());
2097 assert!(caches.cid_to_client_order_id.is_empty());
2098 }
2099
2100 #[rstest]
2101 fn test_cleanup_terminal_order_tracking_via_cid_when_venue_missing() {
2102 let caches = test_caches();
2103 let client_order_id = ClientOrderId::from("O-CLEAN-CID");
2104 let instrument_id = InstrumentId::from("BTC-PERP.AX");
2105 let cid_value = 321u64;
2106
2107 caches.orders_metadata.insert(
2108 client_order_id,
2109 test_metadata(client_order_id, instrument_id),
2110 );
2111 caches
2112 .cid_to_client_order_id
2113 .insert(cid_value, client_order_id);
2114
2115 let mut order = test_ws_order("OID-UNKNOWN", dec!(100.00), 10);
2117 order.cid = Some(cid_value);
2118
2119 cleanup_terminal_order_tracking(&order, &caches);
2120
2121 assert!(caches.orders_metadata.is_empty());
2122 assert!(caches.cid_to_client_order_id.is_empty());
2123 }
2124
2125 #[rstest]
2126 fn test_cleanup_terminal_order_tracking_noop_when_unknown() {
2127 let caches = test_caches();
2128 let other = ClientOrderId::from("OTHER");
2129 let instrument_id = InstrumentId::from("BTC-PERP.AX");
2130 caches
2131 .orders_metadata
2132 .insert(other, test_metadata(other, instrument_id));
2133
2134 let order = test_ws_order("OID-NOT-TRACKED", dec!(100.00), 10);
2135 cleanup_terminal_order_tracking(&order, &caches);
2136
2137 assert_eq!(caches.orders_metadata.len(), 1);
2139 }
2140
2141 #[rstest]
2142 fn test_cancel_on_disconnect_url_no_existing_query() {
2143 let mut url = "wss://example.com/orders/ws".to_string();
2144 let separator = if url.contains('?') { "&" } else { "?" };
2145 url.push_str(&format!("{separator}cancel_on_disconnect=true"));
2146 assert_eq!(url, "wss://example.com/orders/ws?cancel_on_disconnect=true");
2147 }
2148
2149 #[rstest]
2150 fn test_cancel_on_disconnect_url_with_existing_query() {
2151 let mut url = "wss://example.com/orders/ws?token=abc".to_string();
2152 let separator = if url.contains('?') { "&" } else { "?" };
2153 url.push_str(&format!("{separator}cancel_on_disconnect=true"));
2154 assert_eq!(
2155 url,
2156 "wss://example.com/orders/ws?token=abc&cancel_on_disconnect=true"
2157 );
2158 }
2159}