Skip to main content

nautilus_architect_ax/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the AX Exchange adapter.
17
18use std::{
19    future::Future,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use futures_util::{StreamExt, pin_mut};
27use nautilus_common::{
28    clients::ExecutionClient,
29    live::{get_runtime, runner::get_exec_event_sender},
30    messages::execution::{
31        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    AtomicMap, MUTEX_POISONED, UUID4, UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::{AccountType, LiquiditySide, OmsType, OrderSide, OrderStatus, OrderType, TimeInForce},
44    events::{
45        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
46        OrderFilled, OrderRejected, OrderUpdated,
47    },
48    identifiers::{
49        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, Venue, VenueOrderId,
50    },
51    instruments::{Instrument, InstrumentAny},
52    orders::Order,
53    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
54    types::{AccountBalance, MarginBalance, Money, Price, Quantity},
55};
56use tokio::task::JoinHandle;
57use ustr::Ustr;
58
59use crate::{
60    common::{
61        consts::{
62            AX_ACCOUNT_REGISTRATION_TIMEOUT_SECS, AX_AUTH_TOKEN_TTL_EXEC_SECS, AX_POST_ONLY_REJECT,
63            AX_VENUE,
64        },
65        credential::Credential,
66        enums::AxOrderSide,
67        parse::{ax_timestamp_stn_to_unix_nanos, cid_to_client_order_id, quantity_to_contracts},
68    },
69    config::AxExecClientConfig,
70    http::{
71        client::AxHttpClient,
72        models::{AxOrderRejectReason, PreviewAggressiveLimitOrderRequest, ReplaceOrderRequest},
73    },
74    websocket::{
75        AxOrdersWsMessage, AxWsOrderEvent,
76        messages::{AxWsOrder, AxWsTradeExecution, OrderMetadata},
77        orders::{AxOrdersWebSocketClient, OrdersCaches},
78    },
79};
80
81/// Live execution client for the AX Exchange.
82#[derive(Debug)]
83pub struct AxExecutionClient {
84    core: ExecutionClientCore,
85    clock: &'static AtomicTime,
86    config: AxExecClientConfig,
87    emitter: ExecutionEventEmitter,
88    http_client: AxHttpClient,
89    ws_orders: AxOrdersWebSocketClient,
90    ws_stream_handle: Option<JoinHandle<()>>,
91    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
92}
93
94impl AxExecutionClient {
95    /// Creates a new [`AxExecutionClient`].
96    ///
97    /// # Errors
98    ///
99    /// Returns an error if the client fails to initialize.
100    pub fn new(core: ExecutionClientCore, config: AxExecClientConfig) -> anyhow::Result<Self> {
101        let http_client = AxHttpClient::with_credentials(
102            config.api_key.clone().unwrap_or_default(),
103            config.api_secret.clone().unwrap_or_default(),
104            Some(config.http_base_url()),
105            Some(config.orders_base_url()),
106            config.http_timeout_secs,
107            config.max_retries,
108            config.retry_delay_initial_ms,
109            config.retry_delay_max_ms,
110            config.proxy_url.clone(),
111        )?;
112
113        let clock = get_atomic_clock_realtime();
114        let trader_id = core.trader_id;
115        let account_id = core.account_id;
116        let emitter =
117            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
118        let mut ws_url = config.ws_private_url();
119        if config.cancel_on_disconnect {
120            let separator = if ws_url.contains('?') { "&" } else { "?" };
121            ws_url.push_str(&format!("{separator}cancel_on_disconnect=true"));
122        }
123        let ws_orders = AxOrdersWebSocketClient::new(
124            ws_url,
125            account_id,
126            trader_id,
127            config.heartbeat_interval_secs,
128            config.transport_backend,
129            config.proxy_url.clone(),
130        );
131
132        Ok(Self {
133            core,
134            clock,
135            config,
136            emitter,
137            http_client,
138            ws_orders,
139            ws_stream_handle: None,
140            pending_tasks: Mutex::new(Vec::new()),
141        })
142    }
143
144    async fn authenticate(&self) -> anyhow::Result<String> {
145        let credential =
146            Credential::resolve(self.config.api_key.clone(), self.config.api_secret.clone())
147                .context("API credentials not configured")?;
148
149        self.http_client
150            .authenticate(
151                credential.api_key(),
152                credential.api_secret(),
153                AX_AUTH_TOKEN_TTL_EXEC_SECS,
154            )
155            .await
156            .map_err(|e| anyhow::anyhow!("Authentication failed: {e}"))
157    }
158
159    fn update_account_state(&self) {
160        let http_client = self.http_client.clone();
161        let account_id = self.core.account_id;
162        let emitter = self.emitter.clone();
163        let clock = self.clock;
164
165        self.spawn_task("query_account", async move {
166            let account_state = http_client
167                .request_account_state(account_id)
168                .await
169                .context("failed to request AX account state")?;
170            let ts_event = clock.get_time_ns();
171            emitter.emit_account_state(
172                account_state.balances.clone(),
173                account_state.margins.clone(),
174                account_state.is_reported,
175                ts_event,
176            );
177            Ok(())
178        });
179    }
180
181    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
182        let (
183            client_order_id,
184            strategy_id,
185            instrument_id,
186            order_side,
187            order_type,
188            quantity,
189            trigger_price,
190            time_in_force,
191            is_post_only,
192            limit_price,
193        ) = {
194            let cache = self.core.cache();
195            let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
196                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
197            })?;
198            (
199                order.client_order_id(),
200                order.strategy_id(),
201                order.instrument_id(),
202                order.order_side(),
203                order.order_type(),
204                order.quantity(),
205                order.trigger_price(),
206                order.time_in_force(),
207                order.is_post_only(),
208                order.price(),
209            )
210        };
211
212        let ws_orders = self.ws_orders.clone();
213        let emitter = self.emitter.clone();
214        let clock = self.clock;
215        let trader_id = self.core.trader_id;
216
217        let http_client = if order_type == OrderType::Market {
218            Some(self.http_client.clone())
219        } else {
220            None
221        };
222
223        self.spawn_task("submit_order", async move {
224            let result: anyhow::Result<()> = async {
225                // For market orders, get the take-through price from AX.
226                // The preview and submit are not atomic: the book can change
227                // between the two calls. This is safe because submit_order
228                // forces IOC time-in-force for market orders, so the order
229                // fills immediately or is canceled (it cannot rest on the book).
230                // If the book moves past the previewed take-through price the
231                // order may partially fill with the remainder canceled.
232                let price = if order_type == OrderType::Market {
233                    let symbol = instrument_id.symbol.inner();
234                    let ax_side = AxOrderSide::try_from(order_side)
235                        .map_err(|e| anyhow::anyhow!("Invalid order side: {e}"))?;
236                    let qty_contracts = quantity_to_contracts(quantity)?;
237
238                    let request =
239                        PreviewAggressiveLimitOrderRequest::new(symbol, qty_contracts, ax_side);
240                    let response = http_client
241                        .expect("HTTP client should be set for market orders")
242                        .inner
243                        .preview_aggressive_limit_order(&request)
244                        .await
245                        .map_err(|e| {
246                            anyhow::anyhow!("Failed to preview aggressive limit order: {e}")
247                        })?;
248
249                    if response.remaining_quantity > 0 {
250                        log::warn!(
251                            "Market order book depth insufficient: \
252                             filled_qty={} remaining_qty={} for {instrument_id}",
253                            response.filled_quantity,
254                            response.remaining_quantity,
255                        );
256                    }
257
258                    let limit_price_decimal = response.limit_price.ok_or_else(|| {
259                        anyhow::anyhow!(
260                            "No liquidity available for market order on {instrument_id}"
261                        )
262                    })?;
263
264                    let price = Price::from(limit_price_decimal.to_string().as_str());
265                    log::info!("Market order take-through price: {price} for {instrument_id}",);
266                    Some(price)
267                } else {
268                    limit_price
269                };
270
271                ws_orders
272                    .submit_order(
273                        trader_id,
274                        strategy_id,
275                        instrument_id,
276                        client_order_id,
277                        order_side,
278                        order_type,
279                        quantity,
280                        time_in_force,
281                        price,
282                        trigger_price,
283                        is_post_only,
284                    )
285                    .await
286                    .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"))?;
287
288                Ok(())
289            }
290            .await;
291
292            if let Err(e) = result {
293                let ts_event = clock.get_time_ns();
294                emitter.emit_order_rejected_event(
295                    strategy_id,
296                    instrument_id,
297                    client_order_id,
298                    &format!("submit-order-error: {e}"),
299                    ts_event,
300                    false,
301                );
302                anyhow::bail!("{e}");
303            }
304
305            Ok(())
306        });
307
308        Ok(())
309    }
310
311    fn cancel_order_internal(&self, cmd: &CancelOrder) {
312        let ws_orders = self.ws_orders.clone();
313
314        let emitter = self.emitter.clone();
315        let clock = self.clock;
316        let instrument_id = cmd.instrument_id;
317        let client_order_id = cmd.client_order_id;
318        let venue_order_id = cmd.venue_order_id;
319        let strategy_id = cmd.strategy_id;
320
321        self.spawn_task("cancel_order", async move {
322            let result = ws_orders
323                .cancel_order(client_order_id, venue_order_id)
324                .await
325                .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
326
327            if let Err(e) = &result {
328                let ts_event = clock.get_time_ns();
329                emitter.emit_order_cancel_rejected_event(
330                    strategy_id,
331                    instrument_id,
332                    client_order_id,
333                    venue_order_id,
334                    &format!("cancel-order-error: {e}"),
335                    ts_event,
336                );
337                anyhow::bail!("{e}");
338            }
339
340            Ok(())
341        });
342    }
343
344    fn spawn_task<F>(&self, description: &'static str, fut: F)
345    where
346        F: Future<Output = anyhow::Result<()>> + Send + 'static,
347    {
348        let runtime = get_runtime();
349        let handle = runtime.spawn(async move {
350            if let Err(e) = fut.await {
351                log::warn!("{description} failed: {e}");
352            }
353        });
354
355        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
356        tasks.retain(|handle| !handle.is_finished());
357        tasks.push(handle);
358    }
359
360    fn abort_pending_tasks(&self) {
361        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
362        for handle in tasks.drain(..) {
363            handle.abort();
364        }
365    }
366
367    /// Polls the cache until the account is registered or timeout is reached.
368    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
369        let account_id = self.core.account_id;
370
371        if self.core.cache().account(&account_id).is_some() {
372            log::info!("Account {account_id} registered");
373            return Ok(());
374        }
375
376        let start = Instant::now();
377        let timeout = Duration::from_secs_f64(timeout_secs);
378        let interval = Duration::from_millis(10);
379
380        loop {
381            tokio::time::sleep(interval).await;
382
383            if self.core.cache().account(&account_id).is_some() {
384                log::info!("Account {account_id} registered");
385                return Ok(());
386            }
387
388            if start.elapsed() >= timeout {
389                anyhow::bail!(
390                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
391                );
392            }
393        }
394    }
395}
396
397#[async_trait(?Send)]
398impl ExecutionClient for AxExecutionClient {
399    fn is_connected(&self) -> bool {
400        self.core.is_connected()
401    }
402
403    fn client_id(&self) -> ClientId {
404        self.core.client_id
405    }
406
407    fn account_id(&self) -> AccountId {
408        self.core.account_id
409    }
410
411    fn venue(&self) -> Venue {
412        *AX_VENUE
413    }
414
415    fn oms_type(&self) -> OmsType {
416        self.core.oms_type
417    }
418
419    fn get_account(&self) -> Option<AccountAny> {
420        self.core.cache().account(&self.core.account_id).cloned()
421    }
422
423    async fn connect(&mut self) -> anyhow::Result<()> {
424        if self.core.is_connected() {
425            return Ok(());
426        }
427
428        // Reset so requests work after a previous disconnect
429        self.http_client.reset_cancellation_token();
430
431        if !self.core.instruments_initialized() {
432            let instruments = self
433                .http_client
434                .request_instruments(None, None)
435                .await
436                .context("failed to request AX instruments")?;
437
438            if instruments.is_empty() {
439                log::warn!("No instruments returned from AX");
440            } else {
441                log::info!("Loaded {} instruments", instruments.len());
442                self.http_client.cache_instruments(&instruments);
443                self.ws_orders.cache_instruments(&instruments);
444            }
445            self.core.set_instruments_initialized();
446        }
447
448        let token = self.authenticate().await?;
449        self.ws_orders.connect(&token).await?;
450        log::info!("Connected to orders WebSocket");
451
452        let should_spawn = match &self.ws_stream_handle {
453            None => true,
454            Some(handle) => handle.is_finished(),
455        };
456
457        if should_spawn {
458            let stream = self.ws_orders.stream();
459            let emitter = self.emitter.clone();
460            let caches = self.ws_orders.caches().clone();
461            let account_id = self.core.account_id;
462            let instruments_cache = self.ws_orders.instruments_cache();
463            let clock = self.clock;
464
465            let handle = get_runtime().spawn(async move {
466                pin_mut!(stream);
467                while let Some(message) = stream.next().await {
468                    dispatch_ws_message(
469                        message,
470                        &emitter,
471                        &caches,
472                        account_id,
473                        &instruments_cache,
474                        clock,
475                    );
476                }
477            });
478            self.ws_stream_handle = Some(handle);
479        }
480
481        let account_state = self
482            .http_client
483            .request_account_state(self.core.account_id)
484            .await
485            .context("failed to request AX account state")?;
486
487        if !account_state.balances.is_empty() {
488            log::info!(
489                "Received account state with {} balance(s)",
490                account_state.balances.len()
491            );
492        }
493        self.emitter.send_account_state(account_state);
494
495        self.await_account_registered(AX_ACCOUNT_REGISTRATION_TIMEOUT_SECS)
496            .await?;
497
498        self.core.set_connected();
499        log::info!("Connected: client_id={}", self.core.client_id);
500        Ok(())
501    }
502
503    async fn disconnect(&mut self) -> anyhow::Result<()> {
504        if self.core.is_disconnected() {
505            return Ok(());
506        }
507
508        self.abort_pending_tasks();
509        self.http_client.cancel_all_requests();
510
511        self.ws_orders.close().await;
512
513        if let Some(handle) = self.ws_stream_handle.take() {
514            handle.abort();
515        }
516
517        self.core.set_disconnected();
518        log::info!("Disconnected: client_id={}", self.core.client_id);
519        Ok(())
520    }
521
522    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
523        self.update_account_state();
524        Ok(())
525    }
526
527    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
528        let http_client = self.http_client.clone();
529        let account_id = self.core.account_id;
530        let client_order_id = cmd.client_order_id;
531        let venue_order_id = cmd.venue_order_id;
532        let instrument_id = cmd.instrument_id;
533        let emitter = self.emitter.clone();
534
535        // Read immutable order fields from cache before spawning
536        let (order_side, order_type, time_in_force) = {
537            let cache = self.core.cache();
538            match cache.order(&client_order_id) {
539                Some(order) => (
540                    order.order_side(),
541                    order.order_type(),
542                    order.time_in_force(),
543                ),
544                None => (OrderSide::NoOrderSide, OrderType::Limit, TimeInForce::Gtc),
545            }
546        };
547
548        self.spawn_task("query_order", async move {
549            match http_client
550                .request_order_status(
551                    account_id,
552                    instrument_id,
553                    Some(client_order_id),
554                    venue_order_id,
555                    order_side,
556                    order_type,
557                    time_in_force,
558                )
559                .await
560            {
561                Ok(report) => emitter.send_order_status_report(report),
562                Err(e) => log::error!("AX query order failed: {e}"),
563            }
564            Ok(())
565        });
566
567        Ok(())
568    }
569
570    fn generate_account_state(
571        &self,
572        balances: Vec<AccountBalance>,
573        margins: Vec<MarginBalance>,
574        reported: bool,
575        ts_event: UnixNanos,
576    ) -> anyhow::Result<()> {
577        self.emitter
578            .emit_account_state(balances, margins, reported, ts_event);
579        Ok(())
580    }
581
582    fn start(&mut self) -> anyhow::Result<()> {
583        if self.core.is_started() {
584            return Ok(());
585        }
586
587        self.emitter.set_sender(get_exec_event_sender());
588        self.core.set_started();
589        log::info!(
590            "Started: client_id={}, account_id={}, environment={}",
591            self.core.client_id,
592            self.core.account_id,
593            self.config.environment,
594        );
595        Ok(())
596    }
597
598    fn stop(&mut self) -> anyhow::Result<()> {
599        if self.core.is_stopped() {
600            return Ok(());
601        }
602
603        self.core.set_stopped();
604        self.core.set_disconnected();
605
606        if let Some(handle) = self.ws_stream_handle.take() {
607            handle.abort();
608        }
609        self.abort_pending_tasks();
610        log::info!("Stopped: client_id={}", self.core.client_id);
611        Ok(())
612    }
613
614    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
615        {
616            let cache = self.core.cache();
617            let order = cache.order(&cmd.client_order_id).ok_or_else(|| {
618                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
619            })?;
620
621            if order.is_closed() {
622                log::warn!("Cannot submit closed order {}", order.client_order_id());
623                return Ok(());
624            }
625
626            if !matches!(
627                order.order_type(),
628                OrderType::Market | OrderType::Limit | OrderType::StopLimit
629            ) {
630                self.emitter.emit_order_denied(
631                    order,
632                    &format!(
633                        "Unsupported order type: {:?}, \
634                         AX supports MARKET, LIMIT and STOP_LIMIT",
635                        order.order_type(),
636                    ),
637                );
638                return Ok(());
639            }
640
641            if order.time_in_force() == TimeInForce::Gtd {
642                self.emitter.emit_order_denied(
643                    order,
644                    "Unsupported time in force: GTD, \
645                     AX supports GTC, IOC, FOK, and DAY",
646                );
647                return Ok(());
648            }
649
650            log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
651            self.emitter.emit_order_submitted(order);
652        }
653
654        self.submit_order_internal(&cmd)
655    }
656
657    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
658        for (client_order_id, order_init) in cmd
659            .order_list
660            .client_order_ids
661            .iter()
662            .zip(cmd.order_inits.iter())
663        {
664            let submit_cmd = SubmitOrder::new(
665                cmd.trader_id,
666                cmd.client_id,
667                cmd.strategy_id,
668                cmd.instrument_id,
669                *client_order_id,
670                order_init.clone(),
671                cmd.exec_algorithm_id,
672                cmd.position_id,
673                cmd.params.clone(),
674                UUID4::new(),
675                cmd.ts_init,
676            );
677            self.submit_order(submit_cmd)?;
678        }
679        Ok(())
680    }
681
682    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
683        let venue_order_id = match cmd.venue_order_id {
684            Some(ref voi) => *voi,
685            None => {
686                let reason = "Cannot modify order without venue_order_id";
687                log::error!("{reason}");
688                let ts_event = self.clock.get_time_ns();
689                self.emitter.emit_order_modify_rejected_event(
690                    cmd.strategy_id,
691                    cmd.instrument_id,
692                    cmd.client_order_id,
693                    cmd.venue_order_id,
694                    reason,
695                    ts_event,
696                );
697                return Ok(());
698            }
699        };
700
701        let http_client = self.http_client.clone();
702        let emitter = self.emitter.clone();
703        let caches = self.ws_orders.caches().clone();
704        let clock = self.clock;
705        let client_order_id = cmd.client_order_id;
706        let strategy_id = cmd.strategy_id;
707        let instrument_id = cmd.instrument_id;
708        let quantity = cmd.quantity;
709        let price = cmd.price;
710        let trigger_price = cmd.trigger_price;
711
712        self.spawn_task("modify_order", async move {
713            let mut request = ReplaceOrderRequest::new(venue_order_id.as_str());
714
715            if let Some(price) = price {
716                request = request.with_price(price.as_decimal());
717            }
718
719            if let Some(qty) = quantity {
720                let contracts = quantity_to_contracts(qty)?;
721                request = request.with_quantity(contracts);
722            }
723
724            if let Some(trigger) = trigger_price {
725                request = request.with_trigger_price(trigger.as_decimal());
726            }
727
728            match http_client.inner.replace_order(&request).await {
729                Ok(resp) => {
730                    let new_venue_order_id = VenueOrderId::new(&resp.oid);
731                    caches
732                        .venue_to_client_id
733                        .insert(new_venue_order_id, client_order_id);
734                    if let Some(mut entry) = caches.orders_metadata.get_mut(&client_order_id) {
735                        entry.venue_order_id = Some(new_venue_order_id);
736                        entry.pending_trigger_price = trigger_price;
737                    }
738                    log::info!("Order replaced: old={} new={}", request.oid, resp.oid);
739                }
740                Err(e) => {
741                    let reason = format!("modify-order-error: {e}");
742                    let ts_event = clock.get_time_ns();
743                    emitter.emit_order_modify_rejected_event(
744                        strategy_id,
745                        instrument_id,
746                        client_order_id,
747                        Some(VenueOrderId::new(&request.oid)),
748                        &reason,
749                        ts_event,
750                    );
751                    anyhow::bail!("{reason}");
752                }
753            }
754
755            Ok(())
756        });
757
758        Ok(())
759    }
760
761    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
762        self.cancel_order_internal(&cmd);
763        Ok(())
764    }
765
766    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
767        let http_client = self.http_client.clone();
768        let emitter = self.emitter.clone();
769        let clock = self.clock;
770        let instrument_id = cmd.instrument_id;
771        let account_id = self.core.account_id;
772        let trader_id = self.core.trader_id;
773
774        // Snapshot open orders so we can emit cancel events after the HTTP request
775        let open_orders: Vec<(ClientOrderId, Option<VenueOrderId>, StrategyId)> = {
776            let cache = self.core.cache();
777            cache
778                .orders_open(None, Some(&instrument_id), None, None, None)
779                .iter()
780                .map(|o| (o.client_order_id(), o.venue_order_id(), o.strategy_id()))
781                .collect()
782        };
783
784        let caches = self.ws_orders.caches().clone();
785
786        self.spawn_task("cancel_all_orders", async move {
787            match http_client.cancel_all_orders(instrument_id).await {
788                Ok(()) => {
789                    log::info!("Canceled all orders for {instrument_id}");
790
791                    // AX does not push WS cancel confirmations for HTTP-initiated
792                    // cancels, so emit OrderCanceled events locally and clean up
793                    // tracking state to prevent duplicates if WS events arrive
794                    let ts_event = clock.get_time_ns();
795
796                    for (client_order_id, venue_order_id, strategy_id) in &open_orders {
797                        let event = OrderCanceled::new(
798                            trader_id,
799                            *strategy_id,
800                            instrument_id,
801                            *client_order_id,
802                            UUID4::new(),
803                            ts_event,
804                            clock.get_time_ns(),
805                            false,
806                            *venue_order_id,
807                            Some(account_id),
808                        );
809                        emitter.send_order_event(OrderEventAny::Canceled(event));
810
811                        if let Some(voi) = venue_order_id {
812                            caches.venue_to_client_id.remove(voi);
813                        }
814                        caches.orders_metadata.remove(client_order_id);
815                    }
816                }
817                Err(e) => {
818                    log::error!("Failed to cancel all orders for {instrument_id}: {e}");
819                    let ts_event = clock.get_time_ns();
820
821                    for (client_order_id, venue_order_id, strategy_id) in &open_orders {
822                        emitter.emit_order_cancel_rejected_event(
823                            *strategy_id,
824                            instrument_id,
825                            *client_order_id,
826                            *venue_order_id,
827                            &format!("cancel-all-orders-error: {e}"),
828                            ts_event,
829                        );
830                    }
831                }
832            }
833            Ok(())
834        });
835
836        Ok(())
837    }
838
839    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
840        for cancel in &cmd.cancels {
841            self.cancel_order_internal(cancel);
842        }
843        Ok(())
844    }
845
846    async fn generate_order_status_report(
847        &self,
848        cmd: &GenerateOrderStatusReport,
849    ) -> anyhow::Result<Option<OrderStatusReport>> {
850        let cid_map = self.ws_orders.cid_to_client_order_id().clone();
851        let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
852
853        let mut reports = self
854            .http_client
855            .request_order_status_reports(self.core.account_id, Some(cid_resolver))
856            .await?;
857
858        if let Some(instrument_id) = cmd.instrument_id {
859            reports.retain(|report| report.instrument_id == instrument_id);
860        }
861
862        if let Some(client_order_id) = cmd.client_order_id {
863            reports.retain(|report| report.client_order_id == Some(client_order_id));
864        }
865
866        if let Some(venue_order_id) = cmd.venue_order_id {
867            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
868        }
869
870        Ok(reports.into_iter().next())
871    }
872
873    async fn generate_order_status_reports(
874        &self,
875        cmd: &GenerateOrderStatusReports,
876    ) -> anyhow::Result<Vec<OrderStatusReport>> {
877        let cid_map = self.ws_orders.cid_to_client_order_id().clone();
878        let cid_resolver = move |cid: u64| cid_map.get(&cid).map(|v| *v);
879
880        let mut reports = self
881            .http_client
882            .request_order_status_reports(self.core.account_id, Some(cid_resolver))
883            .await?;
884
885        if let Some(instrument_id) = cmd.instrument_id {
886            reports.retain(|report| report.instrument_id == instrument_id);
887        }
888
889        if cmd.open_only {
890            reports.retain(|r| r.order_status.is_open());
891        }
892
893        if let Some(start) = cmd.start {
894            reports.retain(|r| r.ts_last >= start);
895        }
896
897        if let Some(end) = cmd.end {
898            reports.retain(|r| r.ts_last <= end);
899        }
900
901        Ok(reports)
902    }
903
904    async fn generate_fill_reports(
905        &self,
906        cmd: GenerateFillReports,
907    ) -> anyhow::Result<Vec<FillReport>> {
908        let mut reports = self
909            .http_client
910            .request_fill_reports(self.core.account_id)
911            .await?;
912
913        if let Some(instrument_id) = cmd.instrument_id {
914            reports.retain(|report| report.instrument_id == instrument_id);
915        }
916
917        if let Some(venue_order_id) = cmd.venue_order_id {
918            reports.retain(|report| report.venue_order_id.as_str() == venue_order_id.as_str());
919        }
920
921        Ok(reports)
922    }
923
924    async fn generate_position_status_reports(
925        &self,
926        cmd: &GeneratePositionStatusReports,
927    ) -> anyhow::Result<Vec<PositionStatusReport>> {
928        let mut reports = self
929            .http_client
930            .request_position_reports(self.core.account_id)
931            .await?;
932
933        if let Some(instrument_id) = cmd.instrument_id {
934            reports.retain(|report| report.instrument_id == instrument_id);
935        }
936
937        Ok(reports)
938    }
939
940    async fn generate_mass_status(
941        &self,
942        lookback_mins: Option<u64>,
943    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
944        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
945
946        let ts_now = self.clock.get_time_ns();
947
948        let start = lookback_mins.map(|mins| {
949            let lookback_ns = mins * 60 * 1_000_000_000;
950            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
951        });
952
953        let order_cmd = GenerateOrderStatusReports::new(
954            UUID4::new(),
955            ts_now,
956            false, // open_only
957            None,  // instrument_id
958            start,
959            None, // end
960            None, // params
961            None, // correlation_id
962        );
963
964        let fill_cmd = GenerateFillReports::new(
965            UUID4::new(),
966            ts_now,
967            None, // instrument_id
968            None, // venue_order_id
969            start,
970            None, // end
971            None, // params
972            None, // correlation_id
973        );
974
975        let position_cmd = GeneratePositionStatusReports::new(
976            UUID4::new(),
977            ts_now,
978            None, // instrument_id
979            start,
980            None, // end
981            None, // params
982            None, // correlation_id
983        );
984
985        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
986            self.generate_order_status_reports(&order_cmd),
987            self.generate_fill_reports(fill_cmd),
988            self.generate_position_status_reports(&position_cmd),
989        )?;
990
991        log::info!("Received {} OrderStatusReports", order_reports.len());
992        log::info!("Received {} FillReports", fill_reports.len());
993        log::info!("Received {} PositionReports", position_reports.len());
994
995        let mut mass_status = ExecutionMassStatus::new(
996            self.core.client_id,
997            self.core.account_id,
998            *AX_VENUE,
999            ts_now,
1000            None,
1001        );
1002
1003        mass_status.add_order_reports(order_reports);
1004        mass_status.add_fill_reports(fill_reports);
1005        mass_status.add_position_reports(position_reports);
1006
1007        Ok(Some(mass_status))
1008    }
1009
1010    fn register_external_order(
1011        &self,
1012        client_order_id: ClientOrderId,
1013        venue_order_id: VenueOrderId,
1014        instrument_id: InstrumentId,
1015        strategy_id: StrategyId,
1016        _ts_init: UnixNanos,
1017    ) {
1018        self.ws_orders.register_external_order(
1019            client_order_id,
1020            venue_order_id,
1021            instrument_id,
1022            strategy_id,
1023        );
1024    }
1025}
1026
1027/// Dispatches a WebSocket message using the event emitter.
1028fn dispatch_ws_message(
1029    message: AxOrdersWsMessage,
1030    emitter: &ExecutionEventEmitter,
1031    caches: &OrdersCaches,
1032    account_id: AccountId,
1033    instruments: &AtomicMap<Ustr, InstrumentAny>,
1034    clock: &'static AtomicTime,
1035) {
1036    match message {
1037        AxOrdersWsMessage::Event(event) => {
1038            dispatch_order_event(*event, emitter, caches, account_id, instruments, clock);
1039        }
1040        AxOrdersWsMessage::PlaceOrderResponse(resp) => {
1041            log::debug!(
1042                "Place order response: rid={} oid={}",
1043                resp.rid,
1044                resp.res.oid
1045            );
1046        }
1047        AxOrdersWsMessage::CancelOrderResponse(resp) => {
1048            log::debug!(
1049                "Cancel order response: rid={} accepted={}",
1050                resp.rid,
1051                resp.res.cxl_rx
1052            );
1053        }
1054        AxOrdersWsMessage::OpenOrdersResponse(resp) => {
1055            log::debug!("Open orders response: {} orders", resp.res.len());
1056        }
1057        AxOrdersWsMessage::Error(err) => {
1058            log::error!("WebSocket error: {}", err.message);
1059        }
1060        AxOrdersWsMessage::Reconnected => {
1061            log::info!("WebSocket reconnected");
1062        }
1063        AxOrdersWsMessage::Authenticated => {
1064            log::debug!("WebSocket authenticated");
1065        }
1066    }
1067}
1068
1069fn dispatch_order_event(
1070    event: AxWsOrderEvent,
1071    emitter: &ExecutionEventEmitter,
1072    caches: &OrdersCaches,
1073    account_id: AccountId,
1074    instruments: &AtomicMap<Ustr, InstrumentAny>,
1075    clock: &'static AtomicTime,
1076) {
1077    match event {
1078        AxWsOrderEvent::Heartbeat => {}
1079        AxWsOrderEvent::Acknowledged(msg) => {
1080            if let Some(event) =
1081                create_order_accepted(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1082            {
1083                emitter.send_order_event(OrderEventAny::Accepted(event));
1084            } else if let Some(report) = create_order_status_report(
1085                &msg.o,
1086                OrderStatus::Accepted,
1087                msg.ts,
1088                msg.tn,
1089                caches,
1090                account_id,
1091                instruments,
1092                clock,
1093            ) {
1094                emitter.send_order_status_report(report);
1095            }
1096        }
1097        AxWsOrderEvent::PartiallyFilled(msg) => {
1098            dispatch_fill_event(
1099                &msg.o,
1100                &msg.xs,
1101                msg.ts,
1102                msg.tn,
1103                emitter,
1104                caches,
1105                account_id,
1106                instruments,
1107                clock,
1108            );
1109        }
1110        AxWsOrderEvent::Filled(msg) => {
1111            dispatch_fill_event(
1112                &msg.o,
1113                &msg.xs,
1114                msg.ts,
1115                msg.tn,
1116                emitter,
1117                caches,
1118                account_id,
1119                instruments,
1120                clock,
1121            );
1122            cleanup_terminal_order_tracking(&msg.o, caches);
1123        }
1124        AxWsOrderEvent::Canceled(msg) => {
1125            if let Some(event) =
1126                create_order_canceled(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1127            {
1128                emitter.send_order_event(OrderEventAny::Canceled(event));
1129            } else if let Some(report) = create_order_status_report(
1130                &msg.o,
1131                OrderStatus::Canceled,
1132                msg.ts,
1133                msg.tn,
1134                caches,
1135                account_id,
1136                instruments,
1137                clock,
1138            ) {
1139                emitter.send_order_status_report(report);
1140            }
1141            cleanup_terminal_order_tracking(&msg.o, caches);
1142        }
1143        AxWsOrderEvent::Rejected(msg) => {
1144            let known_reason = msg.r.filter(|r| !matches!(r, AxOrderRejectReason::Unknown));
1145            let reason = known_reason
1146                .as_ref()
1147                .map(AsRef::as_ref)
1148                .or(msg.txt.as_deref())
1149                .unwrap_or("UNKNOWN");
1150
1151            if let Some(event) =
1152                create_order_rejected(&msg.o, reason, msg.ts, msg.tn, caches, account_id, clock)
1153            {
1154                emitter.send_order_event(OrderEventAny::Rejected(event));
1155            }
1156            cleanup_terminal_order_tracking(&msg.o, caches);
1157        }
1158        AxWsOrderEvent::Expired(msg) => {
1159            if let Some(event) =
1160                create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1161            {
1162                emitter.send_order_event(OrderEventAny::Expired(event));
1163            } else if let Some(report) = create_order_status_report(
1164                &msg.o,
1165                OrderStatus::Expired,
1166                msg.ts,
1167                msg.tn,
1168                caches,
1169                account_id,
1170                instruments,
1171                clock,
1172            ) {
1173                emitter.send_order_status_report(report);
1174            }
1175            cleanup_terminal_order_tracking(&msg.o, caches);
1176        }
1177        AxWsOrderEvent::Replaced(msg) => {
1178            if let Some(event) =
1179                create_order_updated(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1180            {
1181                emitter.send_order_event(OrderEventAny::Updated(event));
1182            } else if let Some(report) = create_order_status_report(
1183                &msg.o,
1184                OrderStatus::Accepted,
1185                msg.ts,
1186                msg.tn,
1187                caches,
1188                account_id,
1189                instruments,
1190                clock,
1191            ) {
1192                emitter.send_order_status_report(report);
1193            }
1194        }
1195        AxWsOrderEvent::DoneForDay(msg) => {
1196            if let Some(event) =
1197                create_order_expired(&msg.o, msg.ts, msg.tn, caches, account_id, clock)
1198            {
1199                emitter.send_order_event(OrderEventAny::Expired(event));
1200            } else if let Some(report) = create_order_status_report(
1201                &msg.o,
1202                OrderStatus::Expired,
1203                msg.ts,
1204                msg.tn,
1205                caches,
1206                account_id,
1207                instruments,
1208                clock,
1209            ) {
1210                emitter.send_order_status_report(report);
1211            }
1212            cleanup_terminal_order_tracking(&msg.o, caches);
1213        }
1214        AxWsOrderEvent::CancelRejected(msg) => {
1215            let venue_order_id = VenueOrderId::new(&msg.oid);
1216            if let Some(client_order_id) = caches.venue_to_client_id.get(&venue_order_id)
1217                && let Some(metadata) = caches.orders_metadata.get(&client_order_id)
1218            {
1219                let event = OrderCancelRejected::new(
1220                    metadata.trader_id,
1221                    metadata.strategy_id,
1222                    metadata.instrument_id,
1223                    metadata.client_order_id,
1224                    Ustr::from(msg.r.as_ref()),
1225                    UUID4::new(),
1226                    clock.get_time_ns(),
1227                    metadata.ts_init,
1228                    false,
1229                    Some(venue_order_id),
1230                    Some(account_id),
1231                );
1232                emitter.send_order_event(OrderEventAny::CancelRejected(event));
1233            } else {
1234                log::warn!(
1235                    "Could not find metadata for cancel rejected order {}",
1236                    msg.oid
1237                );
1238            }
1239        }
1240    }
1241}
1242
1243#[expect(clippy::too_many_arguments)]
1244fn dispatch_fill_event(
1245    order: &AxWsOrder,
1246    execution: &AxWsTradeExecution,
1247    ts: i64,
1248    tn: i64,
1249    emitter: &ExecutionEventEmitter,
1250    caches: &OrdersCaches,
1251    account_id: AccountId,
1252    instruments: &AtomicMap<Ustr, InstrumentAny>,
1253    clock: &'static AtomicTime,
1254) {
1255    if let Some(event) = create_order_filled(order, execution, ts, tn, caches, account_id, clock) {
1256        emitter.send_order_event(OrderEventAny::Filled(event));
1257    } else if let Some(report) = create_fill_report(
1258        order,
1259        execution,
1260        ts,
1261        tn,
1262        caches,
1263        account_id,
1264        instruments,
1265        clock,
1266    ) {
1267        emitter.send_fill_report(report);
1268    }
1269}
1270
1271pub(crate) fn lookup_order_metadata<'a>(
1272    order: &AxWsOrder,
1273    caches: &'a OrdersCaches,
1274) -> Option<dashmap::mapref::one::Ref<'a, ClientOrderId, OrderMetadata>> {
1275    let venue_order_id = VenueOrderId::new(&order.oid);
1276
1277    if let Some(client_order_id) = caches.venue_to_client_id.get(&venue_order_id)
1278        && let Some(metadata) = caches.orders_metadata.get(&*client_order_id)
1279    {
1280        return Some(metadata);
1281    }
1282
1283    if let Some(cid) = order.cid
1284        && let Some(client_order_id) = caches.cid_to_client_order_id.get(&cid)
1285        && let Some(metadata) = caches.orders_metadata.get(&*client_order_id)
1286    {
1287        return Some(metadata);
1288    }
1289
1290    None
1291}
1292
1293pub(crate) fn create_order_accepted(
1294    order: &AxWsOrder,
1295    event_ts: i64,
1296    event_tn: i64,
1297    caches: &OrdersCaches,
1298    account_id: AccountId,
1299    clock: &'static AtomicTime,
1300) -> Option<OrderAccepted> {
1301    let venue_order_id = VenueOrderId::new(&order.oid);
1302    let metadata = lookup_order_metadata(order, caches)?;
1303
1304    let client_order_id = metadata.client_order_id;
1305    let trader_id = metadata.trader_id;
1306    let strategy_id = metadata.strategy_id;
1307    let instrument_id = metadata.instrument_id;
1308    drop(metadata);
1309
1310    caches
1311        .venue_to_client_id
1312        .insert(venue_order_id, client_order_id);
1313
1314    if let Some(mut entry) = caches.orders_metadata.get_mut(&client_order_id) {
1315        entry.venue_order_id = Some(venue_order_id);
1316    }
1317
1318    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1319        .map_err(|e| log::error!("{e}"))
1320        .ok()?;
1321
1322    Some(OrderAccepted::new(
1323        trader_id,
1324        strategy_id,
1325        instrument_id,
1326        client_order_id,
1327        venue_order_id,
1328        account_id,
1329        UUID4::new(),
1330        ts_event,
1331        clock.get_time_ns(),
1332        false,
1333    ))
1334}
1335
1336pub(crate) fn create_order_updated(
1337    order: &AxWsOrder,
1338    event_ts: i64,
1339    event_tn: i64,
1340    caches: &OrdersCaches,
1341    account_id: AccountId,
1342    clock: &'static AtomicTime,
1343) -> Option<OrderUpdated> {
1344    let metadata = lookup_order_metadata(order, caches)?;
1345
1346    let client_order_id = metadata.client_order_id;
1347    let trader_id = metadata.trader_id;
1348    let strategy_id = metadata.strategy_id;
1349    let instrument_id = metadata.instrument_id;
1350    let price_precision = metadata.price_precision;
1351    let size_precision = metadata.size_precision;
1352    let pending_trigger_price = metadata.pending_trigger_price;
1353    // Use cached venue_order_id (set by HTTP handler) over the WS event oid,
1354    // because AX may report the old oid in the replaced event
1355    let venue_order_id = metadata
1356        .venue_order_id
1357        .unwrap_or_else(|| VenueOrderId::new(&order.oid));
1358    drop(metadata);
1359
1360    caches
1361        .venue_to_client_id
1362        .insert(venue_order_id, client_order_id);
1363
1364    // Consume the pending trigger price now that the replace is confirmed
1365    if let Some(mut entry) = caches.orders_metadata.get_mut(&client_order_id) {
1366        entry.pending_trigger_price = None;
1367    }
1368
1369    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1370        .map_err(|e| log::error!("{e}"))
1371        .ok()?;
1372
1373    let quantity = Quantity::new(order.q as f64, size_precision);
1374    let price = Price::from_decimal_dp(order.p, price_precision).ok();
1375
1376    Some(OrderUpdated::new(
1377        trader_id,
1378        strategy_id,
1379        instrument_id,
1380        client_order_id,
1381        quantity,
1382        UUID4::new(),
1383        ts_event,
1384        clock.get_time_ns(),
1385        false,
1386        Some(venue_order_id),
1387        Some(account_id),
1388        price,
1389        pending_trigger_price,
1390        None, // protection_price
1391        false,
1392    ))
1393}
1394
1395pub(crate) fn create_order_filled(
1396    order: &AxWsOrder,
1397    execution: &AxWsTradeExecution,
1398    event_ts: i64,
1399    event_tn: i64,
1400    caches: &OrdersCaches,
1401    account_id: AccountId,
1402    clock: &'static AtomicTime,
1403) -> Option<OrderFilled> {
1404    let venue_order_id = VenueOrderId::new(&order.oid);
1405    let metadata = lookup_order_metadata(order, caches)?;
1406
1407    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1408        .map_err(|e| log::error!("{e}"))
1409        .ok()?;
1410
1411    let last_qty = Quantity::new(execution.q as f64, metadata.size_precision);
1412    let last_px = Price::from_decimal_dp(execution.p, metadata.price_precision).ok()?;
1413
1414    let order_side: OrderSide = order.d.into();
1415
1416    let liquidity_side = if execution.agg {
1417        LiquiditySide::Taker
1418    } else {
1419        LiquiditySide::Maker
1420    };
1421
1422    Some(OrderFilled::new(
1423        metadata.trader_id,
1424        metadata.strategy_id,
1425        metadata.instrument_id,
1426        metadata.client_order_id,
1427        venue_order_id,
1428        account_id,
1429        TradeId::new(&execution.tid),
1430        order_side,
1431        OrderType::Limit,
1432        last_qty,
1433        last_px,
1434        metadata.quote_currency,
1435        liquidity_side,
1436        UUID4::new(),
1437        ts_event,
1438        clock.get_time_ns(),
1439        false,
1440        None,
1441        None,
1442    ))
1443}
1444
1445pub(crate) fn create_order_canceled(
1446    order: &AxWsOrder,
1447    event_ts: i64,
1448    event_tn: i64,
1449    caches: &OrdersCaches,
1450    account_id: AccountId,
1451    clock: &'static AtomicTime,
1452) -> Option<OrderCanceled> {
1453    let venue_order_id = VenueOrderId::new(&order.oid);
1454    let metadata = lookup_order_metadata(order, caches)?;
1455
1456    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1457        .map_err(|e| log::error!("{e}"))
1458        .ok()?;
1459
1460    Some(OrderCanceled::new(
1461        metadata.trader_id,
1462        metadata.strategy_id,
1463        metadata.instrument_id,
1464        metadata.client_order_id,
1465        UUID4::new(),
1466        ts_event,
1467        clock.get_time_ns(),
1468        false,
1469        Some(venue_order_id),
1470        Some(account_id),
1471    ))
1472}
1473
1474pub(crate) fn create_order_expired(
1475    order: &AxWsOrder,
1476    event_ts: i64,
1477    event_tn: i64,
1478    caches: &OrdersCaches,
1479    account_id: AccountId,
1480    clock: &'static AtomicTime,
1481) -> Option<OrderExpired> {
1482    let venue_order_id = VenueOrderId::new(&order.oid);
1483    let metadata = lookup_order_metadata(order, caches)?;
1484
1485    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1486        .map_err(|e| log::error!("{e}"))
1487        .ok()?;
1488
1489    Some(OrderExpired::new(
1490        metadata.trader_id,
1491        metadata.strategy_id,
1492        metadata.instrument_id,
1493        metadata.client_order_id,
1494        UUID4::new(),
1495        ts_event,
1496        clock.get_time_ns(),
1497        false,
1498        Some(venue_order_id),
1499        Some(account_id),
1500    ))
1501}
1502
1503pub(crate) fn create_order_rejected(
1504    order: &AxWsOrder,
1505    reason: &str,
1506    event_ts: i64,
1507    event_tn: i64,
1508    caches: &OrdersCaches,
1509    account_id: AccountId,
1510    clock: &'static AtomicTime,
1511) -> Option<OrderRejected> {
1512    let metadata = lookup_order_metadata(order, caches)?;
1513
1514    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1515        .map_err(|e| log::error!("{e}"))
1516        .ok()?;
1517    let due_post_only = reason.contains(AX_POST_ONLY_REJECT);
1518
1519    Some(OrderRejected::new(
1520        metadata.trader_id,
1521        metadata.strategy_id,
1522        metadata.instrument_id,
1523        metadata.client_order_id,
1524        account_id,
1525        Ustr::from(reason),
1526        UUID4::new(),
1527        ts_event,
1528        clock.get_time_ns(),
1529        false,
1530        due_post_only,
1531    ))
1532}
1533
1534pub(crate) fn cleanup_terminal_order_tracking(order: &AxWsOrder, caches: &OrdersCaches) {
1535    let venue_order_id = VenueOrderId::new(&order.oid);
1536    let client_order_id = caches
1537        .venue_to_client_id
1538        .remove(&venue_order_id)
1539        .map(|(_, v)| v)
1540        .or_else(|| {
1541            order
1542                .cid
1543                .and_then(|cid| caches.cid_to_client_order_id.remove(&cid).map(|(_, v)| v))
1544        });
1545
1546    if let Some(client_order_id) = client_order_id {
1547        caches.orders_metadata.remove(&client_order_id);
1548    }
1549
1550    if let Some(cid) = order.cid {
1551        caches.cid_to_client_order_id.remove(&cid);
1552    }
1553}
1554
1555#[expect(clippy::too_many_arguments)]
1556fn create_order_status_report(
1557    order: &AxWsOrder,
1558    order_status: OrderStatus,
1559    event_ts: i64,
1560    event_tn: i64,
1561    caches: &OrdersCaches,
1562    account_id: AccountId,
1563    instruments: &AtomicMap<Ustr, InstrumentAny>,
1564    clock: &'static AtomicTime,
1565) -> Option<OrderStatusReport> {
1566    let instruments_snap = instruments.load();
1567    let instrument = instruments_snap.get(&order.s)?;
1568    let venue_order_id = VenueOrderId::new(&order.oid);
1569    let instrument_id = instrument.id();
1570    let order_side = order.d.into();
1571    let time_in_force = order.tif.into();
1572
1573    let quantity = Quantity::new(order.q as f64, instrument.size_precision());
1574    let filled_qty = Quantity::new(order.xq as f64, instrument.size_precision());
1575
1576    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1577        .map_err(|e| log::error!("{e}"))
1578        .ok()?;
1579    let ts_init = clock.get_time_ns();
1580
1581    let client_order_id = order.cid.map(|cid| {
1582        caches
1583            .cid_to_client_order_id
1584            .get(&cid)
1585            .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
1586    });
1587
1588    let mut report = OrderStatusReport::new(
1589        account_id,
1590        instrument_id,
1591        client_order_id,
1592        venue_order_id,
1593        order_side,
1594        OrderType::Limit,
1595        time_in_force,
1596        order_status,
1597        quantity,
1598        filled_qty,
1599        ts_event,
1600        ts_event,
1601        ts_init,
1602        Some(UUID4::new()),
1603    );
1604
1605    if let Ok(price) = Price::from_decimal_dp(order.p, instrument.price_precision()) {
1606        report = report.with_price(price);
1607    }
1608
1609    Some(report)
1610}
1611
1612#[expect(clippy::too_many_arguments)]
1613fn create_fill_report(
1614    order: &AxWsOrder,
1615    execution: &AxWsTradeExecution,
1616    event_ts: i64,
1617    event_tn: i64,
1618    caches: &OrdersCaches,
1619    account_id: AccountId,
1620    instruments: &AtomicMap<Ustr, InstrumentAny>,
1621    clock: &'static AtomicTime,
1622) -> Option<FillReport> {
1623    let instruments_snap = instruments.load();
1624    let instrument = instruments_snap.get(&order.s)?;
1625    let venue_order_id = VenueOrderId::new(&order.oid);
1626    let instrument_id = instrument.id();
1627    let order_side = order.d.into();
1628
1629    let last_qty = Quantity::new(execution.q as f64, instrument.size_precision());
1630    let last_px = Price::from_decimal_dp(execution.p, instrument.price_precision()).ok()?;
1631
1632    let liquidity_side = if execution.agg {
1633        LiquiditySide::Taker
1634    } else {
1635        LiquiditySide::Maker
1636    };
1637
1638    let ts_event = ax_timestamp_stn_to_unix_nanos(event_ts, event_tn)
1639        .map_err(|e| log::error!("{e}"))
1640        .ok()?;
1641    let ts_init = clock.get_time_ns();
1642
1643    let client_order_id = order.cid.map(|cid| {
1644        caches
1645            .cid_to_client_order_id
1646            .get(&cid)
1647            .map_or_else(|| cid_to_client_order_id(cid), |v| *v)
1648    });
1649
1650    // The WS trade execution payload does not include fee data so
1651    // commission is zero here. The REST /fills endpoint (used during
1652    // reconciliation via parse_fill_report) includes accurate fees.
1653    let commission = Money::new(0.0, instrument.quote_currency());
1654
1655    Some(FillReport::new(
1656        account_id,
1657        instrument_id,
1658        venue_order_id,
1659        TradeId::new(&execution.tid),
1660        order_side,
1661        last_qty,
1662        last_px,
1663        commission,
1664        liquidity_side,
1665        client_order_id,
1666        None,
1667        ts_event,
1668        ts_init,
1669        Some(UUID4::new()),
1670    ))
1671}
1672
1673#[cfg(test)]
1674mod tests {
1675    use std::sync::Arc;
1676
1677    use dashmap::DashMap;
1678    use nautilus_core::time::get_atomic_clock_realtime;
1679    use nautilus_model::{
1680        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
1681        types::{Currency, Price, Quantity},
1682    };
1683    use rstest::rstest;
1684    use rust_decimal::Decimal;
1685    use rust_decimal_macros::dec;
1686    use ustr::Ustr;
1687
1688    use super::*;
1689    use crate::{
1690        common::enums::{AxOrderSide, AxOrderStatus, AxTimeInForce},
1691        websocket::{
1692            messages::{AxWsTradeExecution, OrderMetadata},
1693            orders::OrdersCaches,
1694        },
1695    };
1696
1697    fn test_caches() -> OrdersCaches {
1698        OrdersCaches {
1699            orders_metadata: Arc::new(DashMap::new()),
1700            venue_to_client_id: Arc::new(DashMap::new()),
1701            cid_to_client_order_id: Arc::new(DashMap::new()),
1702        }
1703    }
1704
1705    fn test_ws_order(oid: &str, price: Decimal, qty: u64) -> AxWsOrder {
1706        AxWsOrder {
1707            oid: oid.to_string(),
1708            u: "user".to_string(),
1709            s: Ustr::from("BTC-PERP"),
1710            p: price,
1711            q: qty,
1712            xq: 0,
1713            rq: qty,
1714            o: AxOrderStatus::Accepted,
1715            d: AxOrderSide::Buy,
1716            tif: AxTimeInForce::Gtc,
1717            ts: 1609459200,
1718            tn: 0,
1719            cid: None,
1720            tag: None,
1721            txt: None,
1722        }
1723    }
1724
1725    #[rstest]
1726    fn test_create_order_updated_uses_cached_venue_order_id() {
1727        let caches = test_caches();
1728        let clock = get_atomic_clock_realtime();
1729        let account_id = AccountId::from("AX-001");
1730        let client_order_id = ClientOrderId::from("O-001");
1731        let new_venue_id = VenueOrderId::new("NEW-OID");
1732        let trigger = Price::from("49000.00");
1733
1734        let metadata = OrderMetadata {
1735            trader_id: TraderId::from("TRADER-001"),
1736            strategy_id: StrategyId::from("S-001"),
1737            instrument_id: InstrumentId::from("BTC-PERP.AX"),
1738            client_order_id,
1739            venue_order_id: Some(new_venue_id),
1740            ts_init: 0.into(),
1741            size_precision: 0,
1742            price_precision: 2,
1743            quote_currency: Currency::USD(),
1744            pending_trigger_price: Some(trigger),
1745        };
1746        caches.orders_metadata.insert(client_order_id, metadata);
1747        caches
1748            .venue_to_client_id
1749            .insert(new_venue_id, client_order_id);
1750
1751        // WS event carries the OLD oid
1752        let ws_order = test_ws_order("OLD-OID", dec!(50500.00), 100);
1753
1754        // Lookup needs cid path since OLD-OID is not in venue_to_client_id.
1755        // Seed it via cid instead.
1756        let cid_value = 42u64;
1757        caches
1758            .cid_to_client_order_id
1759            .insert(cid_value, client_order_id);
1760        let mut ws_order_with_cid = ws_order;
1761        ws_order_with_cid.cid = Some(cid_value);
1762
1763        let event = create_order_updated(
1764            &ws_order_with_cid,
1765            1609459200,
1766            0,
1767            &caches,
1768            account_id,
1769            clock,
1770        )
1771        .expect("should produce OrderUpdated");
1772
1773        // Uses cached NEW-OID, not the WS event's OLD-OID
1774        assert_eq!(event.venue_order_id, Some(new_venue_id));
1775        assert_eq!(event.trigger_price, Some(trigger));
1776        assert_eq!(event.quantity, Quantity::new(100.0, 0));
1777        assert_eq!(event.price, Some(Price::from("50500.00")));
1778
1779        // Pending trigger consumed
1780        let meta = caches.orders_metadata.get(&client_order_id).unwrap();
1781        assert!(meta.pending_trigger_price.is_none());
1782    }
1783
1784    #[rstest]
1785    fn test_create_order_updated_falls_back_to_ws_oid() {
1786        let caches = test_caches();
1787        let clock = get_atomic_clock_realtime();
1788        let account_id = AccountId::from("AX-001");
1789        let client_order_id = ClientOrderId::from("O-002");
1790        let ws_oid = VenueOrderId::new("WS-OID");
1791
1792        let metadata = OrderMetadata {
1793            trader_id: TraderId::from("TRADER-001"),
1794            strategy_id: StrategyId::from("S-001"),
1795            instrument_id: InstrumentId::from("BTC-PERP.AX"),
1796            client_order_id,
1797            venue_order_id: None,
1798            ts_init: 0.into(),
1799            size_precision: 0,
1800            price_precision: 2,
1801            quote_currency: Currency::USD(),
1802            pending_trigger_price: None,
1803        };
1804        caches.orders_metadata.insert(client_order_id, metadata);
1805        caches.venue_to_client_id.insert(ws_oid, client_order_id);
1806
1807        let ws_order = test_ws_order("WS-OID", dec!(50500.00), 200);
1808
1809        let event = create_order_updated(&ws_order, 1609459200, 0, &caches, account_id, clock)
1810            .expect("should produce OrderUpdated");
1811
1812        assert_eq!(event.venue_order_id, Some(ws_oid));
1813        assert!(event.trigger_price.is_none());
1814    }
1815
1816    fn test_metadata(client_order_id: ClientOrderId, instrument_id: InstrumentId) -> OrderMetadata {
1817        OrderMetadata {
1818            trader_id: TraderId::from("TRADER-001"),
1819            strategy_id: StrategyId::from("S-001"),
1820            instrument_id,
1821            client_order_id,
1822            venue_order_id: None,
1823            ts_init: 0.into(),
1824            size_precision: 0,
1825            price_precision: 2,
1826            quote_currency: Currency::USD(),
1827            pending_trigger_price: None,
1828        }
1829    }
1830
1831    fn test_execution(tid: &str, price: Decimal, qty: u64, agg: bool) -> AxWsTradeExecution {
1832        AxWsTradeExecution {
1833            tid: tid.to_string(),
1834            s: Ustr::from("BTC-PERP"),
1835            q: qty,
1836            p: price,
1837            d: AxOrderSide::Buy,
1838            agg,
1839        }
1840    }
1841
1842    #[rstest]
1843    fn test_create_order_accepted_populates_cache_and_event() {
1844        let caches = test_caches();
1845        let clock = get_atomic_clock_realtime();
1846        let account_id = AccountId::from("AX-001");
1847        let client_order_id = ClientOrderId::from("O-ACK");
1848        let instrument_id = InstrumentId::from("BTC-PERP.AX");
1849        let venue_order_id = VenueOrderId::new("OID-ACK");
1850
1851        caches.orders_metadata.insert(
1852            client_order_id,
1853            test_metadata(client_order_id, instrument_id),
1854        );
1855        let cid_value = 7u64;
1856        caches
1857            .cid_to_client_order_id
1858            .insert(cid_value, client_order_id);
1859
1860        let mut ws_order = test_ws_order(venue_order_id.as_str(), dec!(50500.00), 100);
1861        ws_order.cid = Some(cid_value);
1862
1863        let event = create_order_accepted(&ws_order, 1609459200, 500, &caches, account_id, clock)
1864            .expect("should produce OrderAccepted");
1865
1866        assert_eq!(event.venue_order_id, venue_order_id);
1867        assert_eq!(event.client_order_id, client_order_id);
1868        assert_eq!(event.account_id, account_id);
1869        assert_eq!(event.instrument_id, instrument_id);
1870        assert_eq!(event.trader_id, TraderId::from("TRADER-001"));
1871        assert_eq!(event.strategy_id, StrategyId::from("S-001"));
1872        assert_eq!(
1873            event.ts_event,
1874            UnixNanos::from(1_609_459_200_000_000_500u64)
1875        );
1876
1877        // Side effects on caches
1878        assert_eq!(
1879            *caches.venue_to_client_id.get(&venue_order_id).unwrap(),
1880            client_order_id,
1881        );
1882        let meta = caches.orders_metadata.get(&client_order_id).unwrap();
1883        assert_eq!(meta.venue_order_id, Some(venue_order_id));
1884    }
1885
1886    #[rstest]
1887    fn test_create_order_accepted_returns_none_without_metadata() {
1888        let caches = test_caches();
1889        let clock = get_atomic_clock_realtime();
1890        let account_id = AccountId::from("AX-001");
1891        let ws_order = test_ws_order("OID-UNKNOWN", dec!(100.00), 10);
1892
1893        let result = create_order_accepted(&ws_order, 1609459200, 0, &caches, account_id, clock);
1894        assert!(result.is_none());
1895        assert!(caches.venue_to_client_id.is_empty());
1896    }
1897
1898    #[rstest]
1899    fn test_lookup_order_metadata_cid_fallback() {
1900        let caches = test_caches();
1901        let client_order_id = ClientOrderId::from("O-CID");
1902        let instrument_id = InstrumentId::from("BTC-PERP.AX");
1903        caches.orders_metadata.insert(
1904            client_order_id,
1905            test_metadata(client_order_id, instrument_id),
1906        );
1907        caches.cid_to_client_order_id.insert(99, client_order_id);
1908
1909        let mut ws_order = test_ws_order("UNKNOWN-OID", dec!(0), 0);
1910        ws_order.cid = Some(99);
1911
1912        let found = lookup_order_metadata(&ws_order, &caches).expect("cid fallback should find");
1913        assert_eq!(found.client_order_id, client_order_id);
1914    }
1915
1916    #[rstest]
1917    fn test_lookup_order_metadata_returns_none_when_unknown() {
1918        let caches = test_caches();
1919        let ws_order = test_ws_order("UNKNOWN-OID", dec!(0), 0);
1920        assert!(lookup_order_metadata(&ws_order, &caches).is_none());
1921    }
1922
1923    #[rstest]
1924    #[case(true, LiquiditySide::Taker)]
1925    #[case(false, LiquiditySide::Maker)]
1926    fn test_create_order_filled_maps_liquidity_side(
1927        #[case] agg: bool,
1928        #[case] expected: LiquiditySide,
1929    ) {
1930        let caches = test_caches();
1931        let clock = get_atomic_clock_realtime();
1932        let account_id = AccountId::from("AX-001");
1933        let client_order_id = ClientOrderId::from("O-FILL");
1934        let instrument_id = InstrumentId::from("BTC-PERP.AX");
1935        let venue_order_id = VenueOrderId::new("OID-FILL");
1936
1937        caches.orders_metadata.insert(
1938            client_order_id,
1939            test_metadata(client_order_id, instrument_id),
1940        );
1941        caches
1942            .venue_to_client_id
1943            .insert(venue_order_id, client_order_id);
1944
1945        let order = test_ws_order(venue_order_id.as_str(), dec!(50500.00), 100);
1946        let execution = test_execution("TID-1", dec!(50500.00), 25, agg);
1947
1948        let event = create_order_filled(
1949            &order, &execution, 1609459200, 0, &caches, account_id, clock,
1950        )
1951        .expect("should produce OrderFilled");
1952
1953        assert_eq!(event.venue_order_id, venue_order_id);
1954        assert_eq!(event.client_order_id, client_order_id);
1955        assert_eq!(event.trade_id, TradeId::new("TID-1"));
1956        assert_eq!(event.last_qty, Quantity::new(25.0, 0));
1957        assert_eq!(event.last_px, Price::from("50500.00"));
1958        assert_eq!(event.liquidity_side, expected);
1959    }
1960
1961    #[rstest]
1962    fn test_create_order_canceled_populates_identifiers() {
1963        let caches = test_caches();
1964        let clock = get_atomic_clock_realtime();
1965        let account_id = AccountId::from("AX-001");
1966        let client_order_id = ClientOrderId::from("O-CXL");
1967        let instrument_id = InstrumentId::from("BTC-PERP.AX");
1968        let venue_order_id = VenueOrderId::new("OID-CXL");
1969
1970        caches.orders_metadata.insert(
1971            client_order_id,
1972            test_metadata(client_order_id, instrument_id),
1973        );
1974        caches
1975            .venue_to_client_id
1976            .insert(venue_order_id, client_order_id);
1977
1978        let order = test_ws_order(venue_order_id.as_str(), dec!(100.00), 10);
1979        let event = create_order_canceled(&order, 1609459200, 0, &caches, account_id, clock)
1980            .expect("should produce OrderCanceled");
1981
1982        assert_eq!(event.venue_order_id, Some(venue_order_id));
1983        assert_eq!(event.client_order_id, client_order_id);
1984        assert_eq!(event.account_id, Some(account_id));
1985        assert_eq!(event.instrument_id, instrument_id);
1986    }
1987
1988    #[rstest]
1989    fn test_create_order_expired_populates_identifiers() {
1990        let caches = test_caches();
1991        let clock = get_atomic_clock_realtime();
1992        let account_id = AccountId::from("AX-001");
1993        let client_order_id = ClientOrderId::from("O-EXP");
1994        let instrument_id = InstrumentId::from("BTC-PERP.AX");
1995        let venue_order_id = VenueOrderId::new("OID-EXP");
1996
1997        caches.orders_metadata.insert(
1998            client_order_id,
1999            test_metadata(client_order_id, instrument_id),
2000        );
2001        caches
2002            .venue_to_client_id
2003            .insert(venue_order_id, client_order_id);
2004
2005        let order = test_ws_order(venue_order_id.as_str(), dec!(100.00), 10);
2006        let event = create_order_expired(&order, 1609459200, 0, &caches, account_id, clock)
2007            .expect("should produce OrderExpired");
2008
2009        assert_eq!(event.venue_order_id, Some(venue_order_id));
2010        assert_eq!(event.client_order_id, client_order_id);
2011    }
2012
2013    #[rstest]
2014    fn test_create_order_rejected_sets_due_post_only_when_reason_matches() {
2015        let caches = test_caches();
2016        let clock = get_atomic_clock_realtime();
2017        let account_id = AccountId::from("AX-001");
2018        let client_order_id = ClientOrderId::from("O-REJ");
2019        let instrument_id = InstrumentId::from("BTC-PERP.AX");
2020
2021        caches.orders_metadata.insert(
2022            client_order_id,
2023            test_metadata(client_order_id, instrument_id),
2024        );
2025        caches
2026            .venue_to_client_id
2027            .insert(VenueOrderId::new("OID-REJ"), client_order_id);
2028
2029        let order = test_ws_order("OID-REJ", dec!(100.00), 10);
2030        let reason = AX_POST_ONLY_REJECT;
2031        let event =
2032            create_order_rejected(&order, reason, 1609459200, 0, &caches, account_id, clock)
2033                .expect("should produce OrderRejected");
2034
2035        assert_eq!(event.due_post_only, 1, "post-only reason should set flag");
2036        assert_eq!(event.reason, Ustr::from(reason));
2037    }
2038
2039    #[rstest]
2040    fn test_create_order_rejected_clears_due_post_only_for_other_reasons() {
2041        let caches = test_caches();
2042        let clock = get_atomic_clock_realtime();
2043        let account_id = AccountId::from("AX-001");
2044        let client_order_id = ClientOrderId::from("O-REJ-2");
2045        let instrument_id = InstrumentId::from("BTC-PERP.AX");
2046
2047        caches.orders_metadata.insert(
2048            client_order_id,
2049            test_metadata(client_order_id, instrument_id),
2050        );
2051        caches
2052            .venue_to_client_id
2053            .insert(VenueOrderId::new("OID-REJ-2"), client_order_id);
2054
2055        let order = test_ws_order("OID-REJ-2", dec!(100.00), 10);
2056        let event = create_order_rejected(
2057            &order,
2058            "INSUFFICIENT_MARGIN",
2059            1609459200,
2060            0,
2061            &caches,
2062            account_id,
2063            clock,
2064        )
2065        .expect("should produce OrderRejected");
2066
2067        assert_eq!(event.due_post_only, 0);
2068        assert_eq!(event.reason, Ustr::from("INSUFFICIENT_MARGIN"));
2069    }
2070
2071    #[rstest]
2072    fn test_cleanup_terminal_order_tracking_removes_all_caches() {
2073        let caches = test_caches();
2074        let client_order_id = ClientOrderId::from("O-CLEAN");
2075        let instrument_id = InstrumentId::from("BTC-PERP.AX");
2076        let venue_order_id = VenueOrderId::new("OID-CLEAN");
2077        let cid_value = 123u64;
2078
2079        caches.orders_metadata.insert(
2080            client_order_id,
2081            test_metadata(client_order_id, instrument_id),
2082        );
2083        caches
2084            .venue_to_client_id
2085            .insert(venue_order_id, client_order_id);
2086        caches
2087            .cid_to_client_order_id
2088            .insert(cid_value, client_order_id);
2089
2090        let mut order = test_ws_order(venue_order_id.as_str(), dec!(100.00), 10);
2091        order.cid = Some(cid_value);
2092
2093        cleanup_terminal_order_tracking(&order, &caches);
2094
2095        assert!(caches.orders_metadata.is_empty());
2096        assert!(caches.venue_to_client_id.is_empty());
2097        assert!(caches.cid_to_client_order_id.is_empty());
2098    }
2099
2100    #[rstest]
2101    fn test_cleanup_terminal_order_tracking_via_cid_when_venue_missing() {
2102        let caches = test_caches();
2103        let client_order_id = ClientOrderId::from("O-CLEAN-CID");
2104        let instrument_id = InstrumentId::from("BTC-PERP.AX");
2105        let cid_value = 321u64;
2106
2107        caches.orders_metadata.insert(
2108            client_order_id,
2109            test_metadata(client_order_id, instrument_id),
2110        );
2111        caches
2112            .cid_to_client_order_id
2113            .insert(cid_value, client_order_id);
2114
2115        // Venue id missing from cache
2116        let mut order = test_ws_order("OID-UNKNOWN", dec!(100.00), 10);
2117        order.cid = Some(cid_value);
2118
2119        cleanup_terminal_order_tracking(&order, &caches);
2120
2121        assert!(caches.orders_metadata.is_empty());
2122        assert!(caches.cid_to_client_order_id.is_empty());
2123    }
2124
2125    #[rstest]
2126    fn test_cleanup_terminal_order_tracking_noop_when_unknown() {
2127        let caches = test_caches();
2128        let other = ClientOrderId::from("OTHER");
2129        let instrument_id = InstrumentId::from("BTC-PERP.AX");
2130        caches
2131            .orders_metadata
2132            .insert(other, test_metadata(other, instrument_id));
2133
2134        let order = test_ws_order("OID-NOT-TRACKED", dec!(100.00), 10);
2135        cleanup_terminal_order_tracking(&order, &caches);
2136
2137        // Unrelated metadata still present
2138        assert_eq!(caches.orders_metadata.len(), 1);
2139    }
2140
2141    #[rstest]
2142    fn test_cancel_on_disconnect_url_no_existing_query() {
2143        let mut url = "wss://example.com/orders/ws".to_string();
2144        let separator = if url.contains('?') { "&" } else { "?" };
2145        url.push_str(&format!("{separator}cancel_on_disconnect=true"));
2146        assert_eq!(url, "wss://example.com/orders/ws?cancel_on_disconnect=true");
2147    }
2148
2149    #[rstest]
2150    fn test_cancel_on_disconnect_url_with_existing_query() {
2151        let mut url = "wss://example.com/orders/ws?token=abc".to_string();
2152        let separator = if url.contains('?') { "&" } else { "?" };
2153        url.push_str(&format!("{separator}cancel_on_disconnect=true"));
2154        assert_eq!(
2155            url,
2156            "wss://example.com/orders/ws?token=abc&cancel_on_disconnect=true"
2157        );
2158    }
2159}