// nautilus_bitmex/broadcast/canceller.rs
1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Cancel request broadcaster for redundant order cancellation.
17//!
18//! This module provides the [`CancelBroadcaster`] which fans out cancel requests
19//! to multiple HTTP clients in parallel for redundancy. Key design patterns:
20//!
21//! - **Dependency injection via traits**: Uses `CancelExecutor` trait to abstract
22//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
23//! - **Trait objects over generics**: Uses `Arc<dyn CancelExecutor>` to avoid
24//!   generic type parameters on the public API (simpler Python FFI).
25//! - **Short-circuit on first success**: Aborts remaining requests once any client
26//!   succeeds, minimizing latency.
27//! - **Idempotent success handling**: Recognizes "already cancelled" responses as
28//!   successful outcomes.
29
30// TODO: Replace boxed futures in `CancelExecutor` once stable async trait object support
31// lands so we can drop the per-call heap allocation
32
33use std::{
34    fmt::Debug,
35    future::Future,
36    pin::Pin,
37    sync::{
38        Arc,
39        atomic::{AtomicBool, AtomicU64, Ordering},
40    },
41    time::Duration,
42};
43
44use futures_util::future;
45use nautilus_common::live::get_runtime;
46use nautilus_model::{
47    enums::OrderSide,
48    identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
49    instruments::InstrumentAny,
50    reports::OrderStatusReport,
51};
52use tokio::{sync::RwLock, task::JoinHandle, time::interval};
53
54use crate::{
55    common::{consts::BITMEX_HTTP_TESTNET_URL, enums::BitmexEnvironment},
56    http::client::BitmexHttpClient,
57};
58
/// BitMEX error substring indicating the order was already cancelled.
const IDEMPOTENT_ALREADY_CANCELED: &str = "AlreadyCanceled";
/// BitMEX error substring indicating the venue does not recognize the order ID.
const IDEMPOTENT_ORDER_NOT_FOUND: &str = "orderID not found";
/// BitMEX error substring indicating the order is in a state that cannot be
/// cancelled (matched as a substring; treated as idempotent success below).
const IDEMPOTENT_UNABLE_DUE_TO_STATE: &str = "Unable to cancel order due to existing state";
62
/// Trait for order cancellation operations.
///
/// This trait abstracts the execution layer to enable dependency injection and testing
/// without conditional compilation. The broadcaster holds executors as `Arc<dyn CancelExecutor>`
/// to avoid generic type parameters that would complicate the Python FFI boundary.
///
/// # Thread Safety
///
/// All methods must be safe to call concurrently from multiple threads. Implementations
/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
///
/// # Error Handling
///
/// Methods return `anyhow::Result` for flexibility. Implementers should provide
/// meaningful error messages that can be logged and tracked by the broadcaster.
///
/// # Implementation Note
///
/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
/// `Clone`) to be used without modification.
///
/// Async methods return boxed futures so the trait stays object-safe; the `'_`
/// lifetime ties each returned future to the `&self` borrow.
trait CancelExecutor: Send + Sync {
    /// Adds an instrument for caching.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Performs a health check on the executor.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Cancels a single order.
    ///
    /// Both identifiers are optional at the type level; at least one is
    /// expected to identify the order.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;

    /// Cancels multiple orders.
    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;

    /// Cancels all orders for an instrument, optionally filtered by `order_side`.
    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
}
114
// Adapter: exposes `BitmexHttpClient`'s inherent async methods through the
// object-safe `CancelExecutor` trait by boxing each future.
impl CancelExecutor for BitmexHttpClient {
    fn add_instrument(&self, instrument: InstrumentAny) {
        Self::cache_instrument(self, instrument);
    }

    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
        Box::pin(async move {
            // Server time is used as a lightweight liveness probe; the payload
            // is discarded and any client error is flattened into anyhow.
            Self::get_server_time(self)
                .await
                .map(|_| ())
                .map_err(|e| anyhow::anyhow!("{e}"))
        })
    }

    // NOTE: `Self::cancel_order(self, ...)` resolves to the *inherent* method
    // (inherent impls take precedence over trait methods of the same name),
    // which avoids infinite recursion into this trait method.
    fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
        })
    }

    fn cancel_orders(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move {
            Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
        })
    }

    fn cancel_all_orders(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
        Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
    }
}
159
/// Configuration for the cancel broadcaster.
///
/// Construct via [`Default`] and override fields as needed.
#[derive(Debug, Clone)]
pub struct CancelBroadcasterConfig {
    /// Number of HTTP clients in the pool.
    pub pool_size: usize,
    /// BitMEX API key (None will source from environment).
    pub api_key: Option<String>,
    /// BitMEX API secret (None will source from environment).
    pub api_secret: Option<String>,
    /// Base URL for BitMEX HTTP API.
    ///
    /// If `None` and `environment` is testnet, the testnet default URL is used.
    pub base_url: Option<String>,
    /// BitMEX environment (mainnet or testnet).
    pub environment: BitmexEnvironment,
    /// Timeout in seconds for HTTP requests.
    pub timeout_secs: u64,
    /// Maximum number of retry attempts for failed requests.
    pub max_retries: u32,
    /// Initial delay in milliseconds between retry attempts.
    pub retry_delay_ms: u64,
    /// Maximum delay in milliseconds between retry attempts.
    pub retry_delay_max_ms: u64,
    /// Expiration window in milliseconds for signed requests.
    pub recv_window_ms: u64,
    /// Maximum REST burst rate (requests per second).
    pub max_requests_per_second: u32,
    /// Maximum REST rolling rate (requests per minute).
    pub max_requests_per_minute: u32,
    /// Interval in seconds between health check pings.
    pub health_check_interval_secs: u64,
    /// Timeout in seconds for health check requests.
    pub health_check_timeout_secs: u64,
    /// Substrings to identify expected cancel rejections for debug-level logging.
    pub expected_reject_patterns: Vec<String>,
    /// Substrings to identify idempotent success (order already cancelled/not found).
    pub idempotent_success_patterns: Vec<String>,
    /// Optional list of proxy URLs for path diversity.
    ///
    /// Each transport instance uses the proxy at its index. If the list is shorter
    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
    /// are ignored.
    pub proxy_urls: Vec<Option<String>>,
}
202
impl Default for CancelBroadcasterConfig {
    /// Returns mainnet defaults with a two-client pool and no proxies.
    fn default() -> Self {
        Self {
            pool_size: 2,
            api_key: None,
            api_secret: None,
            base_url: None,
            environment: BitmexEnvironment::Mainnet,
            timeout_secs: 60,
            max_retries: 3,
            retry_delay_ms: 1_000,
            retry_delay_max_ms: 5_000,
            recv_window_ms: 10_000,
            max_requests_per_second: 10,
            max_requests_per_minute: 120,
            health_check_interval_secs: 30,
            health_check_timeout_secs: 5,
            // BitMEX rejection message for post-only (`ParticipateDoNotInitiate`)
            // orders — logged at debug level rather than warn.
            expected_reject_patterns: vec![
                "Order had execInst of ParticipateDoNotInitiate".to_string(),
            ],
            // Responses matching these substrings mean the order is already
            // gone; the broadcaster counts them as success.
            idempotent_success_patterns: vec![
                IDEMPOTENT_ALREADY_CANCELED.to_string(),
                IDEMPOTENT_ORDER_NOT_FOUND.to_string(),
                IDEMPOTENT_UNABLE_DUE_TO_STATE.to_string(),
            ],
            proxy_urls: vec![],
        }
    }
}
232
/// Transport client wrapper with health monitoring.
///
/// Cloning is shallow: all fields are `Arc`-shared, so clones observe the
/// same executor, health flag, and counters.
#[derive(Clone)]
struct TransportClient {
    /// Executor wrapped in Arc to enable cloning without requiring Clone on CancelExecutor.
    ///
    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
    /// the executor across multiple TransportClient clones.
    executor: Arc<dyn CancelExecutor>,
    /// Stable identifier used in log messages (e.g. "bitmex-cancel-0").
    client_id: String,
    /// Last-known health state; updated by health checks and cancel outcomes.
    healthy: Arc<AtomicBool>,
    /// Total cancel attempts made via this transport.
    cancel_count: Arc<AtomicU64>,
    /// Total cancel attempts that returned an error via this transport.
    error_count: Arc<AtomicU64>,
}
246
247impl Debug for TransportClient {
248    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249        f.debug_struct(stringify!(TransportClient))
250            .field("client_id", &self.client_id)
251            .field("healthy", &self.healthy)
252            .field("cancel_count", &self.cancel_count)
253            .field("error_count", &self.error_count)
254            .finish()
255    }
256}
257
impl TransportClient {
    /// Wraps `executor`, initially marked healthy with zeroed counters.
    fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
        Self {
            executor: Arc::new(executor),
            client_id,
            healthy: Arc::new(AtomicBool::new(true)),
            cancel_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns the last-known health state.
    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    /// Marks the client healthy so broadcasts include it again.
    fn mark_healthy(&self) {
        self.healthy.store(true, Ordering::Relaxed);
    }

    /// Marks the client unhealthy so broadcasts skip it until a later
    /// successful health check or cancel.
    fn mark_unhealthy(&self) {
        self.healthy.store(false, Ordering::Relaxed);
    }

    /// Returns the number of cancel attempts made via this client.
    fn get_cancel_count(&self) -> u64 {
        self.cancel_count.load(Ordering::Relaxed)
    }

    /// Returns the number of cancel attempts that errored via this client.
    fn get_error_count(&self) -> u64 {
        self.error_count.load(Ordering::Relaxed)
    }

    /// Pings the executor with a `timeout_secs` deadline, updates the health
    /// flag from the outcome, and returns the resulting health state.
    async fn health_check(&self, timeout_secs: u64) -> bool {
        match tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            self.executor.health_check(),
        )
        .await
        {
            Ok(Ok(())) => {
                self.mark_healthy();
                true
            }
            Ok(Err(e)) => {
                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
                self.mark_unhealthy();
                false
            }
            // Outer `Err` means the deadline elapsed before the ping completed.
            Err(_) => {
                log::warn!("Health check timeout for client {}", self.client_id);
                self.mark_unhealthy();
                false
            }
        }
    }

    /// Cancels a single order via the wrapped executor, incrementing the
    /// attempt counter up front and the error counter on failure.
    ///
    /// Success re-marks the client healthy; an error does NOT flip the health
    /// flag here — only health checks mark a client unhealthy.
    async fn cancel_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<OrderStatusReport> {
        self.cancel_count.fetch_add(1, Ordering::Relaxed);

        match self
            .executor
            .cancel_order(instrument_id, client_order_id, venue_order_id)
            .await
        {
            Ok(report) => {
                self.mark_healthy();
                Ok(report)
            }
            Err(e) => {
                self.error_count.fetch_add(1, Ordering::Relaxed);
                Err(e)
            }
        }
    }
}
337
/// Broadcasts cancel requests to multiple HTTP clients for redundancy.
///
/// This broadcaster fans out cancel requests to multiple pre-warmed HTTP clients
/// in parallel, short-circuits when the first successful acknowledgement is received,
/// and handles expected rejection patterns with appropriate log levels.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
#[derive(Debug)]
pub struct CancelBroadcaster {
    /// Static configuration (deep-cloned, not shared, by `clone_for_async`).
    config: CancelBroadcasterConfig,
    /// Pool of transport clients broadcasts fan out to (Arc-shared).
    transports: Arc<Vec<TransportClient>>,
    /// Handle of the periodic health check task while running.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Whether `start` has been called without a matching `stop`.
    running: Arc<AtomicBool>,
    /// Broadcast operations initiated (one per broadcast call, not per client).
    total_cancels: Arc<AtomicU64>,
    /// Broadcasts that completed with a genuine success response.
    successful_cancels: Arc<AtomicU64>,
    /// Broadcasts where every client failed or none were healthy.
    failed_cancels: Arc<AtomicU64>,
    /// Individual client rejections matching `expected_reject_patterns`.
    expected_rejects: Arc<AtomicU64>,
    /// Broadcasts resolved via an idempotent "already gone" response.
    idempotent_successes: Arc<AtomicU64>,
}
360
361impl CancelBroadcaster {
    /// Creates a new [`CancelBroadcaster`] with internal HTTP client pool.
    ///
    /// Builds `config.pool_size` independent `BitmexHttpClient`s, each
    /// optionally routed through the proxy at the matching index of
    /// `config.proxy_urls`.
    ///
    /// # Errors
    ///
    /// Returns an error if any HTTP client fails to initialize.
    pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
        let mut transports = Vec::with_capacity(config.pool_size);

        // Testnet with no explicit base URL falls back to the testnet default;
        // otherwise the configured URL (possibly None) is used as-is.
        let base_url = match config.environment {
            BitmexEnvironment::Testnet if config.base_url.is_none() => {
                Some(BITMEX_HTTP_TESTNET_URL.to_string())
            }
            _ => config.base_url.clone(),
        };

        for i in 0..config.pool_size {
            // Assign proxy from config list, or None if index exceeds list length
            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());

            let client = BitmexHttpClient::with_credentials(
                config.api_key.clone(),
                config.api_secret.clone(),
                base_url.clone(),
                config.timeout_secs,
                config.max_retries,
                config.retry_delay_ms,
                config.retry_delay_max_ms,
                config.recv_window_ms,
                config.max_requests_per_second,
                config.max_requests_per_minute,
                proxy_url,
            )
            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;

            transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
        }

        Ok(Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        })
    }
411
412    /// Starts the broadcaster and health check loop.
413    ///
414    /// # Errors
415    ///
416    /// Returns an error if the broadcaster is already running.
417    pub async fn start(&self) -> anyhow::Result<()> {
418        if self.running.load(Ordering::Relaxed) {
419            return Ok(());
420        }
421
422        self.running.store(true, Ordering::Relaxed);
423
424        // Initial health check for all clients
425        self.run_health_checks().await;
426
427        // Start periodic health check task
428        let transports = Arc::clone(&self.transports);
429        let running = Arc::clone(&self.running);
430        let interval_secs = self.config.health_check_interval_secs;
431        let timeout_secs = self.config.health_check_timeout_secs;
432
433        let task = get_runtime().spawn(async move {
434            let mut ticker = interval(Duration::from_secs(interval_secs));
435            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
436
437            loop {
438                ticker.tick().await;
439
440                if !running.load(Ordering::Relaxed) {
441                    break;
442                }
443
444                let tasks: Vec<_> = transports
445                    .iter()
446                    .map(|t| t.health_check(timeout_secs))
447                    .collect();
448
449                let results = future::join_all(tasks).await;
450                let healthy_count = results.iter().filter(|&&r| r).count();
451
452                log::debug!(
453                    "Health check complete: {}/{} clients healthy",
454                    healthy_count,
455                    results.len()
456                );
457            }
458        });
459
460        *self.health_check_task.write().await = Some(task);
461
462        log::info!(
463            "CancelBroadcaster started with {} clients",
464            self.transports.len()
465        );
466
467        Ok(())
468    }
469
470    /// Stops the broadcaster and health check loop.
471    pub async fn stop(&self) {
472        if !self.running.load(Ordering::Relaxed) {
473            return;
474        }
475
476        self.running.store(false, Ordering::Relaxed);
477
478        if let Some(task) = self.health_check_task.write().await.take() {
479            task.abort();
480        }
481
482        log::info!("CancelBroadcaster stopped");
483    }
484
485    async fn run_health_checks(&self) {
486        let tasks: Vec<_> = self
487            .transports
488            .iter()
489            .map(|t| t.health_check(self.config.health_check_timeout_secs))
490            .collect();
491
492        let results = future::join_all(tasks).await;
493        let healthy_count = results.iter().filter(|&&r| r).count();
494
495        log::debug!(
496            "Health check complete: {}/{} clients healthy",
497            healthy_count,
498            results.len()
499        );
500    }
501
502    fn is_expected_reject(&self, error_message: &str) -> bool {
503        self.config
504            .expected_reject_patterns
505            .iter()
506            .any(|pattern| error_message.contains(pattern))
507    }
508
509    fn is_idempotent_success(&self, error_message: &str) -> bool {
510        self.config
511            .idempotent_success_patterns
512            .iter()
513            .any(|pattern| error_message.contains(pattern))
514    }
515
    /// Processes cancel request results, handling success, idempotent success, and failures.
    ///
    /// This helper consolidates the common error handling loop used across all broadcast methods.
    ///
    /// Results are consumed in completion order via `select_all`:
    /// - the first genuine success aborts the remaining tasks and returns its value;
    /// - the first idempotent "already gone" error aborts the rest and returns
    ///   `idempotent_result()`;
    /// - every other error (including task join errors) is collected, and once
    ///   all tasks have finished an aggregate failure is returned.
    async fn process_cancel_results<T>(
        &self,
        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
        idempotent_result: impl FnOnce() -> anyhow::Result<T>,
        operation: &str,
        params: String,
        idempotent_reason: &str,
    ) -> anyhow::Result<T>
    where
        T: Send + 'static,
    {
        let mut errors = Vec::new();

        while !handles.is_empty() {
            // `select_all` needs ownership; take the vec and rebuild it from
            // the still-pending remainder each round.
            let current_handles = std::mem::take(&mut handles);
            let (result, _idx, remaining) = future::select_all(current_handles).await;
            handles = remaining.into_iter().collect();

            match result {
                Ok((client_id, Ok(result))) => {
                    // First success - abort remaining handles
                    for handle in &handles {
                        handle.abort();
                    }
                    self.successful_cancels.fetch_add(1, Ordering::Relaxed);

                    log::debug!("{operation} broadcast succeeded [{client_id}] {params}");

                    return Ok(result);
                }
                Ok((client_id, Err(e))) => {
                    let error_msg = e.to_string();

                    if self.is_idempotent_success(&error_msg) {
                        // First idempotent success - abort remaining handles and return success
                        for handle in &handles {
                            handle.abort();
                        }
                        self.idempotent_successes.fetch_add(1, Ordering::Relaxed);

                        log::debug!(
                            "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
                        );

                        return idempotent_result();
                    }

                    if self.is_expected_reject(&error_msg) {
                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
                        log::debug!(
                            "Expected {} rejection [{}]: {} {}",
                            operation.to_lowercase(),
                            client_id,
                            error_msg,
                            params
                        );
                        errors.push(error_msg);
                    } else {
                        log::warn!(
                            "{operation} request failed [{client_id}]: {error_msg} {params}"
                        );
                        errors.push(error_msg);
                    }
                }
                // Join error (e.g. a panicked task) is folded into the error
                // list like any other failure.
                Err(e) => {
                    log::warn!("{operation} task join error: {e:?}");
                    errors.push(format!("Task panicked: {e:?}"));
                }
            }
        }

        // All tasks failed
        self.failed_cancels.fetch_add(1, Ordering::Relaxed);
        log::error!(
            "All {} requests failed: {errors:?} {params}",
            operation.to_lowercase(),
        );
        Err(anyhow::anyhow!(
            "All {} requests failed: {errors:?}",
            operation.to_lowercase(),
        ))
    }
601
    /// Broadcasts a single cancel request to all healthy clients in parallel.
    ///
    /// # Returns
    ///
    /// - `Ok(Some(report))` if successfully cancelled with a report.
    /// - `Ok(None)` if the order was already cancelled (idempotent success).
    /// - `Err` if all requests failed.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_id: Option<ClientOrderId>,
        venue_order_id: Option<VenueOrderId>,
    ) -> anyhow::Result<Option<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Snapshot of currently-healthy transports; clones share state via Arc.
        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        // Fan out one task per healthy transport; routes through
        // `TransportClient::cancel_order` so per-client counters are updated.
        for transport in healthy_transports {
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .cancel_order(instrument_id, client_order_id, venue_order_id)
                    .await
                    .map(Some); // Wrap success in Some for Option<OrderStatusReport>
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(None),
            "Cancel",
            format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
            "order already cancelled/not found",
        )
        .await
    }
656
    /// Broadcasts a batch cancel request to all healthy clients in parallel.
    ///
    /// Idempotent success ("orders already gone") returns an empty vec.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_batch_cancel(
        &self,
        instrument_id: InstrumentId,
        client_order_ids: Option<Vec<ClientOrderId>>,
        venue_order_ids: Option<Vec<VenueOrderId>>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Snapshot of currently-healthy transports; clones share state via Arc.
        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        for transport in healthy_transports {
            // ID lists are cloned per task because each spawned future needs
            // its own owned copy.
            let client_order_ids_clone = client_order_ids.clone();
            let venue_order_ids_clone = venue_order_ids.clone();
            // NOTE(review): calls `transport.executor` directly, so the
            // per-client cancel/error counters and health marking in
            // `TransportClient::cancel_order` are bypassed here — confirm
            // this is intended.
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Batch cancel",
            format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
            "orders already cancelled/not found",
        )
        .await
    }
707
    /// Broadcasts a cancel all request to all healthy clients in parallel.
    ///
    /// Idempotent success ("no orders to cancel") returns an empty vec.
    ///
    /// # Errors
    ///
    /// Returns an error if all cancel requests fail or no healthy clients are available.
    pub async fn broadcast_cancel_all(
        &self,
        instrument_id: InstrumentId,
        order_side: Option<OrderSide>,
    ) -> anyhow::Result<Vec<OrderStatusReport>> {
        self.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Snapshot of currently-healthy transports; clones share state via Arc.
        let healthy_transports: Vec<TransportClient> = self
            .transports
            .iter()
            .filter(|t| t.is_healthy())
            .cloned()
            .collect();

        if healthy_transports.is_empty() {
            self.failed_cancels.fetch_add(1, Ordering::Relaxed);
            anyhow::bail!("No healthy transport clients available");
        }

        let mut handles = Vec::new();

        for transport in healthy_transports {
            // NOTE(review): calls `transport.executor` directly, so the
            // per-client cancel/error counters and health marking in
            // `TransportClient::cancel_order` are bypassed here — confirm
            // this is intended.
            let handle = get_runtime().spawn(async move {
                let client_id = transport.client_id.clone();
                let result = transport
                    .executor
                    .cancel_all_orders(instrument_id, order_side)
                    .await;
                (client_id, result)
            });
            handles.push(handle);
        }

        self.process_cancel_results(
            handles,
            || Ok(Vec::new()),
            "Cancel all",
            format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
            "no orders to cancel",
        )
        .await
    }
755
756    /// Gets broadcaster metrics.
757    pub fn get_metrics(&self) -> BroadcasterMetrics {
758        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
759        let total_clients = self.transports.len();
760
761        BroadcasterMetrics {
762            total_cancels: self.total_cancels.load(Ordering::Relaxed),
763            successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
764            failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
765            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
766            idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
767            healthy_clients,
768            total_clients,
769        }
770    }
771
    /// Gets broadcaster metrics (async version for use within async context).
    ///
    /// Thin delegation to [`Self::get_metrics`]; performs no awaiting itself.
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }
776
777    /// Gets per-client statistics.
778    pub fn get_client_stats(&self) -> Vec<ClientStats> {
779        self.transports
780            .iter()
781            .map(|t| ClientStats {
782                client_id: t.client_id.clone(),
783                healthy: t.is_healthy(),
784                cancel_count: t.get_cancel_count(),
785                error_count: t.get_error_count(),
786            })
787            .collect()
788    }
789
    /// Gets per-client statistics (async version for use within async context).
    ///
    /// Thin delegation to [`Self::get_client_stats`]; performs no awaiting itself.
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }
794
795    /// Caches an instrument in all HTTP clients in the pool.
796    pub fn cache_instrument(&self, instrument: &InstrumentAny) {
797        for transport in self.transports.iter() {
798            transport.executor.add_instrument(instrument.clone());
799        }
800    }
801
    /// Returns a handle that shares all internal state with `self`.
    ///
    /// Transports, counters, the running flag, and the task handle are
    /// `Arc`-shared, so the returned value observes and affects the same
    /// broadcaster; only `config` is deep-cloned. NOTE(review): this is
    /// equivalent to a `Clone` impl — presumably kept as a named method to
    /// keep `Clone` off the public (pyclass) API; confirm before deriving.
    #[must_use]
    pub fn clone_for_async(&self) -> Self {
        Self {
            config: self.config.clone(),
            transports: Arc::clone(&self.transports),
            health_check_task: Arc::clone(&self.health_check_task),
            running: Arc::clone(&self.running),
            total_cancels: Arc::clone(&self.total_cancels),
            successful_cancels: Arc::clone(&self.successful_cancels),
            failed_cancels: Arc::clone(&self.failed_cancels),
            expected_rejects: Arc::clone(&self.expected_rejects),
            idempotent_successes: Arc::clone(&self.idempotent_successes),
        }
    }
816
    /// Test-only constructor that injects pre-built transports (e.g. mock
    /// executors) instead of creating real HTTP clients.
    #[cfg(test)]
    fn new_with_transports(
        config: CancelBroadcasterConfig,
        transports: Vec<TransportClient>,
    ) -> Self {
        Self {
            config,
            transports: Arc::new(transports),
            health_check_task: Arc::new(RwLock::new(None)),
            running: Arc::new(AtomicBool::new(false)),
            total_cancels: Arc::new(AtomicU64::new(0)),
            successful_cancels: Arc::new(AtomicU64::new(0)),
            failed_cancels: Arc::new(AtomicU64::new(0)),
            expected_rejects: Arc::new(AtomicU64::new(0)),
            idempotent_successes: Arc::new(AtomicU64::new(0)),
        }
    }
834}
835
/// Broadcaster metrics snapshot.
///
/// Point-in-time view of the counters aggregated across all transport
/// clients in the pool.
#[derive(Debug, Clone)]
pub struct BroadcasterMetrics {
    /// Total number of cancel broadcasts attempted.
    pub total_cancels: u64,
    /// Broadcasts that completed with a successful cancel response.
    pub successful_cancels: u64,
    /// Broadcasts where no client produced a success or an idempotent
    /// success (e.g. every client errored unexpectedly).
    pub failed_cancels: u64,
    /// Rejections matching the configured expected-reject patterns
    /// (e.g. post-only `ParticipateDoNotInitiate` rejects).
    pub expected_rejects: u64,
    /// Failures matching the configured idempotent-success patterns
    /// (e.g. "AlreadyCanceled", "orderID not found") treated as success.
    pub idempotent_successes: u64,
    /// Number of transport clients currently reporting healthy.
    pub healthy_clients: usize,
    /// Total number of transport clients in the pool.
    pub total_clients: usize,
}
847
/// Per-client statistics.
///
/// Snapshot of a single transport client's state, as collected by
/// `CancelBroadcaster::get_client_stats`.
#[derive(Debug, Clone)]
pub struct ClientStats {
    /// Identifier of the transport client (e.g. "client-0").
    pub client_id: String,
    /// Whether the client is currently considered healthy.
    pub healthy: bool,
    /// Number of cancel requests recorded for this client.
    pub cancel_count: u64,
    /// Number of errors recorded for this client.
    pub error_count: u64,
}
856
#[cfg(test)]
mod tests {
    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};

    use nautilus_core::UUID4;
    use nautilus_model::{
        enums::{
            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
        },
        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
        reports::OrderStatusReport,
        types::{Price, Quantity},
    };

    use super::*;

    /// Mock executor for testing.
    ///
    /// Wraps a user-supplied async closure so each test can script the
    /// outcome of `cancel_order` without any HTTP traffic.
    #[derive(Clone)]
    #[expect(clippy::type_complexity)]
    struct MockExecutor {
        // Shared so the mock stays `Clone`; the closure returns a boxed
        // future to match the `CancelExecutor` trait's return type.
        handler: Arc<
            dyn Fn(
                    InstrumentId,
                    Option<ClientOrderId>,
                    Option<VenueOrderId>,
                )
                    -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
                + Send
                + Sync,
        >,
    }

    impl MockExecutor {
        /// Creates a mock whose `cancel_order` delegates to `handler`.
        fn new<F, Fut>(handler: F) -> Self
        where
            F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
                + Send
                + Sync
                + 'static,
            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
        {
            Self {
                // Box-pin here once so callers can hand in plain async closures.
                handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
            }
        }
    }

    impl CancelExecutor for MockExecutor {
        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
            // Mock clients always report healthy.
            Box::pin(async { Ok(()) })
        }

        fn cancel_order(
            &self,
            instrument_id: InstrumentId,
            client_order_id: Option<ClientOrderId>,
            venue_order_id: Option<VenueOrderId>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
            (self.handler)(instrument_id, client_order_id, venue_order_id)
        }

        fn cancel_orders(
            &self,
            _instrument_id: InstrumentId,
            _client_order_ids: Option<Vec<ClientOrderId>>,
            _venue_order_ids: Option<Vec<VenueOrderId>>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
        {
            // Batch cancel is not scripted by the handler; always succeeds empty.
            Box::pin(async { Ok(Vec::new()) })
        }

        fn cancel_all_orders(
            &self,
            instrument_id: InstrumentId,
            _order_side: Option<OrderSide>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
        {
            // Try to get result from the single-order handler to propagate errors
            let handler = Arc::clone(&self.handler);
            Box::pin(async move {
                // Call the handler to check if it would fail
                let result = handler(instrument_id, None, None).await;
                match result {
                    Ok(_) => Ok(Vec::new()),
                    Err(e) => Err(e),
                }
            })
        }

        fn add_instrument(&self, _instrument: InstrumentAny) {
            // No-op for mock
        }
    }

    /// Builds a canceled-order status report with fixed XBTUSD test values.
    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("BITMEX-001"),
            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            venue_order_id: VenueOrderId::from(venue_order_id),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Canceled,
            price: Some(Price::new(50000.0, 2)),
            quantity: Quantity::new(100.0, 0),
            filled_qty: Quantity::new(0.0, 0),
            report_id: UUID4::new(),
            ts_accepted: 0.into(),
            ts_last: 0.into(),
            ts_init: 0.into(),
            client_order_id: None,
            avg_px: None,
            trigger_price: None,
            trigger_type: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            display_qty: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            post_only: false,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }

    /// Builds a `TransportClient` backed by a `MockExecutor` running `handler`.
    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
    where
        F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
            + Send
            + Sync
            + 'static,
        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
    {
        let executor = MockExecutor::new(handler);
        TransportClient::new(executor, client_id.to_string())
    }

    #[tokio::test]
    async fn test_broadcast_cancel_immediate_success() {
        // First client succeeds immediately; the deliberately slow second
        // client must be aborted (short-circuit on first success).
        let report = create_test_report("ORDER-1");
        let report_clone = report.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        assert!(result.is_ok());
        let returned_report = result.unwrap();
        assert!(returned_report.is_some());
        assert_eq!(
            returned_report.unwrap().venue_order_id,
            report.venue_order_id
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.successful_cancels, 1);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.total_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_idempotent_success() {
        // An "AlreadyCanceled" error matches the idempotent-success patterns
        // and resolves the broadcast as success (with no report).
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                anyhow::bail!("Should be aborted")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
        // One hard failure plus one idempotent-pattern error: the idempotent
        // outcome wins and the broadcast resolves as success overall.
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("orderID not found")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
            .await;

        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_failures() {
        // Every client fails with a non-idempotent error: the cancel-all
        // broadcast surfaces an error and counts one failed cancel.
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("502 Bad Gateway")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Connection refused")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("All cancel all requests failed")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.idempotent_successes, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_no_healthy_clients() {
        // With the only transport marked unhealthy the broadcast fails fast.
        let transport = create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        });
        transport.healthy.store(false, Ordering::Relaxed);

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
            .await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("No healthy transport clients available")
        );

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_metrics_increment() {
        // Two successful broadcasts should increment totals accordingly.
        let report1 = create_test_report("ORDER-1");
        let report1_clone = report1.clone();
        let report2 = create_test_report("ORDER-2");
        let report2_clone = report2.clone();

        let transports = vec![
            create_stub_transport("client-0", move |_, _, _| {
                let report = report1_clone.clone();
                async move { Ok(report) }
            }),
            create_stub_transport("client-1", move |_, _, _| {
                let report = report2_clone.clone();
                async move { Ok(report) }
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
            .await;

        let _ = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
            .await;

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.total_cancels, 2);
        assert_eq!(metrics.successful_cancels, 2);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_expected_reject_pattern() {
        // Both clients reject with an expected-reject pattern: each match is
        // counted, but the broadcast still resolves as a (single) failure.
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
            .await;

        assert!(result.is_err());

        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(metrics.expected_rejects, 2);
        assert_eq!(metrics.failed_cancels, 1);
    }

    #[tokio::test]
    async fn test_broadcaster_creation_with_pool() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-2", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcaster_lifecycle() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        // Should not be running initially
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Start broadcaster
        let start_result = broadcaster.start().await;
        assert!(start_result.is_ok());
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Starting again should be idempotent
        let start_again = broadcaster.start().await;
        assert!(start_again.is_ok());

        // Stop broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Stopping again should be safe
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));
    }

    #[tokio::test]
    async fn test_client_stats_collection() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
        let stats = broadcaster.get_client_stats_async().await;

        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].client_id, "client-0");
        assert_eq!(stats[1].client_id, "client-1");
        assert!(stats[0].healthy); // Should be healthy initially
        assert!(stats[1].healthy);
        assert_eq!(stats[0].cancel_count, 0);
        assert_eq!(stats[1].cancel_count, 0);
        assert_eq!(stats[0].error_count, 0);
        assert_eq!(stats[1].error_count, 0);
    }

    #[tokio::test]
    async fn test_testnet_config_sets_base_url() {
        // With `base_url: None`, construction must derive the URL from the
        // testnet environment rather than failing.
        let config = CancelBroadcasterConfig {
            pool_size: 1,
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: None,
            environment: BitmexEnvironment::Testnet,
            timeout_secs: 5,
            max_retries: 3,
            retry_delay_ms: 1_000,
            retry_delay_max_ms: 5_000,
            recv_window_ms: 10_000,
            max_requests_per_second: 10,
            max_requests_per_minute: 120,
            health_check_interval_secs: 60,
            health_check_timeout_secs: 5,
            expected_reject_patterns: vec![],
            idempotent_success_patterns: vec![],
            proxy_urls: vec![],
        };

        let broadcaster = CancelBroadcaster::new(config);
        assert!(broadcaster.is_ok());
    }

    #[tokio::test]
    async fn test_constructor_honors_default_pool_size() {
        let config = CancelBroadcasterConfig {
            api_key: Some("test_key".to_string()),
            api_secret: Some("test_secret".to_string()),
            base_url: Some("http://127.0.0.1:19999".to_string()),
            ..Default::default()
        };

        let expected_pool = config.pool_size;
        let broadcaster = CancelBroadcaster::new(config).unwrap();
        let metrics = broadcaster.get_metrics_async().await;

        assert_eq!(metrics.total_clients, expected_pool);
    }

    #[tokio::test]
    async fn test_default_config() {
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
        let metrics = broadcaster.get_metrics_async().await;

        // Two stub transports were supplied above (matching the default
        // pool_size of 2), so total_clients reflects that count
        assert_eq!(metrics.total_clients, 2);
    }

    #[tokio::test]
    async fn test_clone_for_async() {
        let transports = vec![create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        })];

        let config = CancelBroadcasterConfig::default();
        let broadcaster1 = CancelBroadcaster::new_with_transports(config, transports);

        // Increment a metric on original
        broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);

        // Clone should share the same atomic
        let broadcaster2 = broadcaster1.clone_for_async();
        let metrics2 = broadcaster2.get_metrics_async().await;

        assert_eq!(metrics2.total_cancels, 1); // Should see the increment

        // Modify through clone
        broadcaster2
            .successful_cancels
            .fetch_add(5, Ordering::Relaxed);

        // Original should see the change
        let metrics1 = broadcaster1.get_metrics_async().await;
        assert_eq!(metrics1.successful_cancels, 5);
    }

    #[tokio::test]
    async fn test_pattern_matching() {
        // Test that pattern matching works for expected rejects and idempotent successes
        let transports = vec![create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        })];

        let config = CancelBroadcasterConfig {
            expected_reject_patterns: vec![
                "ParticipateDoNotInitiate".to_string(),
                "Close-only".to_string(),
            ],
            idempotent_success_patterns: vec![
                "AlreadyCanceled".to_string(),
                "orderID not found".to_string(),
                "Unable to cancel".to_string(),
            ],
            ..Default::default()
        };

        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        // Test expected reject patterns
        assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
        assert!(broadcaster.is_expected_reject("This is a Close-only order"));
        assert!(!broadcaster.is_expected_reject("Connection timeout"));

        // Test idempotent success patterns
        assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
        assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
        assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
        assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
    }

    // Happy-path coverage for broadcast_batch_cancel and broadcast_cancel_all
    // Note: These use simplified stubs since batch/cancel-all bypass test_handler
    // Full HTTP mocking tested in integration tests
    #[tokio::test]
    async fn test_broadcast_batch_cancel_structure() {
        // Validates broadcaster structure and metric initialization
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig {
            idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
            ..Default::default()
        };

        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
        let metrics = broadcaster.get_metrics_async().await;

        // Verify initial state
        assert_eq!(metrics.total_clients, 2);
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
    }

    #[tokio::test]
    async fn test_broadcast_cancel_all_structure() {
        // Validates broadcaster structure for cancel_all operations
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-2", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig {
            idempotent_success_patterns: vec!["orderID not found".to_string()],
            ..Default::default()
        };

        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
        let metrics = broadcaster.get_metrics_async().await;

        // Verify pool size and initial metrics
        assert_eq!(metrics.total_clients, 3);
        assert_eq!(metrics.healthy_clients, 3);
        assert_eq!(metrics.total_cancels, 0);
    }

    // Metric health tests - validates that idempotent successes don't increment failed_cancels
    #[tokio::test]
    async fn test_single_cancel_metrics_with_mixed_responses() {
        // Test similar to test_broadcast_cancel_mixed_idempotent_and_failure
        // but explicitly validates metric health
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                anyhow::bail!("Connection timeout")
            }),
            create_stub_transport("client-1", |_, _, _| async {
                anyhow::bail!("AlreadyCanceled")
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
        let result = broadcaster
            .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
            .await;

        // Should succeed with idempotent
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());

        // Verify metrics: idempotent success doesn't count as failure
        let metrics = broadcaster.get_metrics_async().await;
        assert_eq!(
            metrics.failed_cancels, 0,
            "Idempotent success should not increment failed_cancels"
        );
        assert_eq!(metrics.idempotent_successes, 1);
        assert_eq!(metrics.successful_cancels, 0);
    }

    #[tokio::test]
    async fn test_metrics_initialization_and_health() {
        // Validates that metrics start at zero and clients start healthy
        let transports = vec![
            create_stub_transport("client-0", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-1", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-2", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
            create_stub_transport("client-3", |_, _, _| async {
                Ok(create_test_report("ORDER-1"))
            }),
        ];

        let config = CancelBroadcasterConfig::default();
        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
        let metrics = broadcaster.get_metrics_async().await;

        // All metrics should start at zero
        assert_eq!(metrics.total_cancels, 0);
        assert_eq!(metrics.successful_cancels, 0);
        assert_eq!(metrics.failed_cancels, 0);
        assert_eq!(metrics.expected_rejects, 0);
        assert_eq!(metrics.idempotent_successes, 0);

        // All clients should start healthy
        assert_eq!(metrics.healthy_clients, 4);
        assert_eq!(metrics.total_clients, 4);
    }

    // Health-check task lifecycle test
    #[tokio::test]
    async fn test_health_check_task_lifecycle() {
        let transports = vec![create_stub_transport("client-0", |_, _, _| async {
            Ok(create_test_report("ORDER-1"))
        })];

        let config = CancelBroadcasterConfig {
            health_check_interval_secs: 1, // Very short interval
            health_check_timeout_secs: 1,
            ..Default::default()
        };

        let broadcaster = CancelBroadcaster::new_with_transports(config, transports);

        // Start the broadcaster
        broadcaster.start().await.unwrap();
        assert!(broadcaster.running.load(Ordering::Relaxed));

        // Verify task handle exists
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_some());
        }

        // Stop the broadcaster
        broadcaster.stop().await;
        assert!(!broadcaster.running.load(Ordering::Relaxed));

        // Verify task handle has been cleared
        {
            let task_guard = broadcaster.health_check_task.read().await;
            assert!(task_guard.is_none());
        }
    }
}