Skip to main content

nautilus_bitmex/broadcast/
submitter.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Submit request broadcaster for redundant order submission.
17//!
18//! This module provides the [`SubmitBroadcaster`] which fans out submit requests
19//! to multiple HTTP clients in parallel for redundancy. The broadcaster is triggered
20//! when the `SubmitOrder` command contains `params["broadcast_submit_tries"]`.
21//!
22//! Key design patterns:
23//!
24//! - **Dependency injection via traits**: Uses `SubmitExecutor` trait to abstract
25//!   the HTTP client, enabling testing without `#[cfg(test)]` conditional compilation.
26//! - **Trait objects over generics**: Uses `Arc<dyn SubmitExecutor>` to avoid
27//!   generic type parameters on the public API (simpler Python FFI).
28//! - **Short-circuit on first success**: Aborts remaining requests once any client
29//!   succeeds, minimizing latency.
30//! - **Idempotent rejection handling**: Recognizes duplicate clOrdID as expected
31//!   rejections for debug-level logging without noise.
32
33// TODO: Replace boxed futures in `SubmitExecutor` once stable async trait object support
34// lands so we can drop the per-call heap allocation
35
36use std::{
37    fmt::Debug,
38    future::Future,
39    pin::Pin,
40    sync::{
41        Arc,
42        atomic::{AtomicBool, AtomicU64, Ordering},
43    },
44    time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50    enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
51    identifiers::{ClientOrderId, InstrumentId, OrderListId},
52    instruments::InstrumentAny,
53    reports::OrderStatusReport,
54    types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{
59    common::{
60        consts::BITMEX_HTTP_TESTNET_URL,
61        enums::{BitmexEnvironment, BitmexPegPriceType},
62    },
63    http::client::BitmexHttpClient,
64};
65
66/// Trait for order submission operations.
67///
68/// This trait abstracts the execution layer to enable dependency injection and testing
69/// without conditional compilation. The broadcaster holds executors as `Arc<dyn SubmitExecutor>`
70/// to avoid generic type parameters that would complicate the Python FFI boundary.
71///
72/// # Thread Safety
73///
74/// All methods must be safe to call concurrently from multiple threads. Implementations
75/// should use interior mutability (e.g., `Arc<Mutex<T>>`) if mutable state is required.
76///
77/// # Error Handling
78///
79/// Methods return `anyhow::Result` for flexibility. Implementers should provide
80/// meaningful error messages that can be logged and tracked by the broadcaster.
81///
82/// # Implementation Note
83///
84/// This trait does not require `Clone` because executors are wrapped in `Arc` at the
85/// `TransportClient` level. This allows `BitmexHttpClient` (which doesn't implement
86/// `Clone`) to be used without modification.
87trait SubmitExecutor: Send + Sync {
88    /// Adds an instrument for caching.
89    fn add_instrument(&self, instrument: InstrumentAny);
90
91    /// Performs a health check on the executor.
92    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
93
94    /// Submits a single order.
95    #[expect(clippy::too_many_arguments)]
96    fn submit_order(
97        &self,
98        instrument_id: InstrumentId,
99        client_order_id: ClientOrderId,
100        order_side: OrderSide,
101        order_type: OrderType,
102        quantity: Quantity,
103        time_in_force: TimeInForce,
104        price: Option<Price>,
105        trigger_price: Option<Price>,
106        trigger_type: Option<TriggerType>,
107        trailing_offset: Option<f64>,
108        trailing_offset_type: Option<TrailingOffsetType>,
109        display_qty: Option<Quantity>,
110        post_only: bool,
111        reduce_only: bool,
112        order_list_id: Option<OrderListId>,
113        contingency_type: Option<ContingencyType>,
114        peg_price_type: Option<BitmexPegPriceType>,
115        peg_offset_value: Option<f64>,
116    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
117}
118
119impl SubmitExecutor for BitmexHttpClient {
120    fn add_instrument(&self, instrument: InstrumentAny) {
121        Self::cache_instrument(self, instrument);
122    }
123
124    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
125        Box::pin(async move {
126            Self::get_server_time(self)
127                .await
128                .map(|_| ())
129                .map_err(|e| anyhow::anyhow!("{e}"))
130        })
131    }
132
133    fn submit_order(
134        &self,
135        instrument_id: InstrumentId,
136        client_order_id: ClientOrderId,
137        order_side: OrderSide,
138        order_type: OrderType,
139        quantity: Quantity,
140        time_in_force: TimeInForce,
141        price: Option<Price>,
142        trigger_price: Option<Price>,
143        trigger_type: Option<TriggerType>,
144        trailing_offset: Option<f64>,
145        trailing_offset_type: Option<TrailingOffsetType>,
146        display_qty: Option<Quantity>,
147        post_only: bool,
148        reduce_only: bool,
149        order_list_id: Option<OrderListId>,
150        contingency_type: Option<ContingencyType>,
151        peg_price_type: Option<BitmexPegPriceType>,
152        peg_offset_value: Option<f64>,
153    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
154        Box::pin(async move {
155            Self::submit_order(
156                self,
157                instrument_id,
158                client_order_id,
159                order_side,
160                order_type,
161                quantity,
162                time_in_force,
163                price,
164                trigger_price,
165                trigger_type,
166                trailing_offset,
167                trailing_offset_type,
168                display_qty,
169                post_only,
170                reduce_only,
171                order_list_id,
172                contingency_type,
173                peg_price_type,
174                peg_offset_value,
175            )
176            .await
177        })
178    }
179}
180
181/// Configuration for the submit broadcaster.
182#[derive(Debug, Clone)]
183pub struct SubmitBroadcasterConfig {
184    /// Number of HTTP clients in the pool.
185    pub pool_size: usize,
186    /// BitMEX API key (None will source from environment).
187    pub api_key: Option<String>,
188    /// BitMEX API secret (None will source from environment).
189    pub api_secret: Option<String>,
190    /// Base URL for BitMEX HTTP API.
191    pub base_url: Option<String>,
192    /// BitMEX environment (mainnet or testnet).
193    pub environment: BitmexEnvironment,
194    /// Timeout in seconds for HTTP requests.
195    pub timeout_secs: u64,
196    /// Maximum number of retry attempts for failed requests.
197    pub max_retries: u32,
198    /// Initial delay in milliseconds between retry attempts.
199    pub retry_delay_ms: u64,
200    /// Maximum delay in milliseconds between retry attempts.
201    pub retry_delay_max_ms: u64,
202    /// Expiration window in milliseconds for signed requests.
203    pub recv_window_ms: u64,
204    /// Maximum REST burst rate (requests per second).
205    pub max_requests_per_second: u32,
206    /// Maximum REST rolling rate (requests per minute).
207    pub max_requests_per_minute: u32,
208    /// Interval in seconds between health check pings.
209    pub health_check_interval_secs: u64,
210    /// Timeout in seconds for health check requests.
211    pub health_check_timeout_secs: u64,
212    /// Substrings to identify expected submit rejections for debug-level logging.
213    pub expected_reject_patterns: Vec<String>,
214    /// Optional list of proxy URLs for path diversity.
215    ///
216    /// Each transport instance uses the proxy at its index. If the list is shorter
217    /// than pool_size, remaining transports will use no proxy. If longer, extra proxies
218    /// are ignored.
219    pub proxy_urls: Vec<Option<String>>,
220}
221
222impl Default for SubmitBroadcasterConfig {
223    fn default() -> Self {
224        Self {
225            pool_size: 3,
226            api_key: None,
227            api_secret: None,
228            base_url: None,
229            environment: BitmexEnvironment::Mainnet,
230            timeout_secs: 60,
231            max_retries: 3,
232            retry_delay_ms: 1_000,
233            retry_delay_max_ms: 5_000,
234            recv_window_ms: 10_000,
235            max_requests_per_second: 10,
236            max_requests_per_minute: 120,
237            health_check_interval_secs: 30,
238            health_check_timeout_secs: 5,
239            expected_reject_patterns: vec!["Duplicate clOrdID".to_string()],
240            proxy_urls: vec![],
241        }
242    }
243}
244
245/// Transport client wrapper with health monitoring.
246#[derive(Clone)]
247struct TransportClient {
248    /// Executor wrapped in Arc to enable cloning without requiring Clone on SubmitExecutor.
249    ///
250    /// BitmexHttpClient doesn't implement Clone, so we use reference counting to share
251    /// the executor across multiple TransportClient clones.
252    executor: Arc<dyn SubmitExecutor>,
253    client_id: String,
254    healthy: Arc<AtomicBool>,
255    submit_count: Arc<AtomicU64>,
256    error_count: Arc<AtomicU64>,
257}
258
259impl Debug for TransportClient {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        f.debug_struct(stringify!(TransportClient))
262            .field("client_id", &self.client_id)
263            .field("healthy", &self.healthy)
264            .field("submit_count", &self.submit_count)
265            .field("error_count", &self.error_count)
266            .finish()
267    }
268}
269
270impl TransportClient {
271    fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
272        Self {
273            executor: Arc::new(executor),
274            client_id,
275            healthy: Arc::new(AtomicBool::new(true)),
276            submit_count: Arc::new(AtomicU64::new(0)),
277            error_count: Arc::new(AtomicU64::new(0)),
278        }
279    }
280
281    fn is_healthy(&self) -> bool {
282        self.healthy.load(Ordering::Relaxed)
283    }
284
285    fn mark_healthy(&self) {
286        self.healthy.store(true, Ordering::Relaxed);
287    }
288
289    fn mark_unhealthy(&self) {
290        self.healthy.store(false, Ordering::Relaxed);
291    }
292
293    fn get_submit_count(&self) -> u64 {
294        self.submit_count.load(Ordering::Relaxed)
295    }
296
297    fn get_error_count(&self) -> u64 {
298        self.error_count.load(Ordering::Relaxed)
299    }
300
301    async fn health_check(&self, timeout_secs: u64) -> bool {
302        match tokio::time::timeout(
303            Duration::from_secs(timeout_secs),
304            self.executor.health_check(),
305        )
306        .await
307        {
308            Ok(Ok(())) => {
309                self.mark_healthy();
310                true
311            }
312            Ok(Err(e)) => {
313                log::warn!("Health check failed for client {}: {e:?}", self.client_id);
314                self.mark_unhealthy();
315                false
316            }
317            Err(_) => {
318                log::warn!("Health check timeout for client {}", self.client_id);
319                self.mark_unhealthy();
320                false
321            }
322        }
323    }
324
325    #[expect(clippy::too_many_arguments)]
326    async fn submit_order(
327        &self,
328        instrument_id: InstrumentId,
329        client_order_id: ClientOrderId,
330        order_side: OrderSide,
331        order_type: OrderType,
332        quantity: Quantity,
333        time_in_force: TimeInForce,
334        price: Option<Price>,
335        trigger_price: Option<Price>,
336        trigger_type: Option<TriggerType>,
337        trailing_offset: Option<f64>,
338        trailing_offset_type: Option<TrailingOffsetType>,
339        display_qty: Option<Quantity>,
340        post_only: bool,
341        reduce_only: bool,
342        order_list_id: Option<OrderListId>,
343        contingency_type: Option<ContingencyType>,
344        peg_price_type: Option<BitmexPegPriceType>,
345        peg_offset_value: Option<f64>,
346    ) -> anyhow::Result<OrderStatusReport> {
347        self.submit_count.fetch_add(1, Ordering::Relaxed);
348
349        match self
350            .executor
351            .submit_order(
352                instrument_id,
353                client_order_id,
354                order_side,
355                order_type,
356                quantity,
357                time_in_force,
358                price,
359                trigger_price,
360                trigger_type,
361                trailing_offset,
362                trailing_offset_type,
363                display_qty,
364                post_only,
365                reduce_only,
366                order_list_id,
367                contingency_type,
368                peg_price_type,
369                peg_offset_value,
370            )
371            .await
372        {
373            Ok(report) => {
374                self.mark_healthy();
375                Ok(report)
376            }
377            Err(e) => {
378                self.error_count.fetch_add(1, Ordering::Relaxed);
379                Err(e)
380            }
381        }
382    }
383}
384
385/// Broadcasts submit requests to multiple HTTP clients for redundancy.
386///
387/// This broadcaster fans out submit requests to multiple pre-warmed HTTP clients
388/// in parallel, short-circuits when the first successful acknowledgement is received,
389/// and handles expected rejection patterns (duplicate clOrdID) with appropriate log levels.
390#[cfg_attr(feature = "python", pyo3::pyclass)]
391#[cfg_attr(
392    feature = "python",
393    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
394)]
395#[derive(Debug)]
396pub struct SubmitBroadcaster {
397    config: SubmitBroadcasterConfig,
398    transports: Arc<Vec<TransportClient>>,
399    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
400    running: Arc<AtomicBool>,
401    total_submits: Arc<AtomicU64>,
402    successful_submits: Arc<AtomicU64>,
403    failed_submits: Arc<AtomicU64>,
404    expected_rejects: Arc<AtomicU64>,
405}
406
407impl SubmitBroadcaster {
408    /// Creates a new [`SubmitBroadcaster`] with internal HTTP client pool.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if any HTTP client fails to initialize.
413    pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
414        let mut transports = Vec::with_capacity(config.pool_size);
415
416        let base_url = match config.environment {
417            BitmexEnvironment::Testnet if config.base_url.is_none() => {
418                Some(BITMEX_HTTP_TESTNET_URL.to_string())
419            }
420            _ => config.base_url.clone(),
421        };
422
423        for i in 0..config.pool_size {
424            // Assign proxy from config list, or None if index exceeds list length
425            let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
426
427            let client = BitmexHttpClient::with_credentials(
428                config.api_key.clone(),
429                config.api_secret.clone(),
430                base_url.clone(),
431                config.timeout_secs,
432                config.max_retries,
433                config.retry_delay_ms,
434                config.retry_delay_max_ms,
435                config.recv_window_ms,
436                config.max_requests_per_second,
437                config.max_requests_per_minute,
438                proxy_url,
439            )
440            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
441
442            transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
443        }
444
445        Ok(Self {
446            config,
447            transports: Arc::new(transports),
448            health_check_task: Arc::new(RwLock::new(None)),
449            running: Arc::new(AtomicBool::new(false)),
450            total_submits: Arc::new(AtomicU64::new(0)),
451            successful_submits: Arc::new(AtomicU64::new(0)),
452            failed_submits: Arc::new(AtomicU64::new(0)),
453            expected_rejects: Arc::new(AtomicU64::new(0)),
454        })
455    }
456
457    /// Starts the broadcaster and health check loop.
458    ///
459    /// # Errors
460    ///
461    /// Returns an error if the broadcaster is already running.
462    pub async fn start(&self) -> anyhow::Result<()> {
463        if self.running.load(Ordering::Relaxed) {
464            return Ok(());
465        }
466
467        self.running.store(true, Ordering::Relaxed);
468
469        // Initial health check for all clients
470        self.run_health_checks().await;
471
472        // Start periodic health check task
473        let transports = Arc::clone(&self.transports);
474        let running = Arc::clone(&self.running);
475        let interval_secs = self.config.health_check_interval_secs;
476        let timeout_secs = self.config.health_check_timeout_secs;
477
478        let task = get_runtime().spawn(async move {
479            let mut ticker = interval(Duration::from_secs(interval_secs));
480            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
481
482            loop {
483                ticker.tick().await;
484
485                if !running.load(Ordering::Relaxed) {
486                    break;
487                }
488
489                let tasks: Vec<_> = transports
490                    .iter()
491                    .map(|t| t.health_check(timeout_secs))
492                    .collect();
493
494                let results = future::join_all(tasks).await;
495                let healthy_count = results.iter().filter(|&&r| r).count();
496
497                log::debug!(
498                    "Health check complete: {healthy_count}/{} clients healthy",
499                    results.len()
500                );
501            }
502        });
503
504        *self.health_check_task.write().await = Some(task);
505
506        log::info!(
507            "SubmitBroadcaster started with {} clients",
508            self.transports.len()
509        );
510
511        Ok(())
512    }
513
514    /// Stops the broadcaster and health check loop.
515    pub async fn stop(&self) {
516        if !self.running.load(Ordering::Relaxed) {
517            return;
518        }
519
520        self.running.store(false, Ordering::Relaxed);
521
522        if let Some(task) = self.health_check_task.write().await.take() {
523            task.abort();
524        }
525
526        log::info!("SubmitBroadcaster stopped");
527    }
528
529    async fn run_health_checks(&self) {
530        let tasks: Vec<_> = self
531            .transports
532            .iter()
533            .map(|t| t.health_check(self.config.health_check_timeout_secs))
534            .collect();
535
536        let results = future::join_all(tasks).await;
537        let healthy_count = results.iter().filter(|&&r| r).count();
538
539        log::debug!(
540            "Health check complete: {healthy_count}/{} clients healthy",
541            results.len()
542        );
543    }
544
545    fn is_expected_reject(&self, error_message: &str) -> bool {
546        self.config
547            .expected_reject_patterns
548            .iter()
549            .any(|pattern| error_message.contains(pattern))
550    }
551
552    /// Processes submit request results, handling success and failures.
553    ///
554    /// This helper consolidates the common error handling loop used for submit broadcasts.
555    async fn process_submit_results<T>(
556        &self,
557        mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
558        operation: &str,
559        params: String,
560    ) -> anyhow::Result<T>
561    where
562        T: Send + 'static,
563    {
564        let mut errors = Vec::new();
565        let mut all_duplicate_clordid = true;
566
567        while !handles.is_empty() {
568            let current_handles = std::mem::take(&mut handles);
569            let (result, _idx, remaining) = future::select_all(current_handles).await;
570            handles = remaining.into_iter().collect();
571
572            match result {
573                Ok((client_id, Ok(result))) => {
574                    // First success - abort remaining handles
575                    for handle in &handles {
576                        handle.abort();
577                    }
578                    self.successful_submits.fetch_add(1, Ordering::Relaxed);
579                    log::debug!("{operation} broadcast succeeded [{client_id}] {params}",);
580                    return Ok(result);
581                }
582                Ok((client_id, Err(e))) => {
583                    let error_msg = e.to_string();
584                    let is_duplicate = error_msg.contains("Duplicate clOrdID");
585
586                    if !is_duplicate {
587                        all_duplicate_clordid = false;
588                    }
589
590                    if self.is_expected_reject(&error_msg) {
591                        self.expected_rejects.fetch_add(1, Ordering::Relaxed);
592                        log::debug!(
593                            "Expected {} rejection [{client_id}]: {error_msg} {params}",
594                            operation.to_lowercase(),
595                        );
596                        errors.push(error_msg);
597                    } else {
598                        log::warn!(
599                            "{operation} request failed [{client_id}]: {error_msg} {params}",
600                        );
601                        errors.push(error_msg);
602                    }
603                }
604                Err(e) => {
605                    all_duplicate_clordid = false;
606                    log::warn!("{operation} task join error: {e:?}");
607                    errors.push(format!("Task panicked: {e:?}"));
608                }
609            }
610        }
611
612        // All tasks failed
613        self.failed_submits.fetch_add(1, Ordering::Relaxed);
614
615        // If all errors were "Duplicate clOrdID", this is likely an idempotent scenario
616        // where the order exists but the success response was lost
617        if all_duplicate_clordid && !errors.is_empty() {
618            log::warn!(
619                "All {} requests returned 'Duplicate clOrdID' - order likely exists {params}",
620                operation.to_lowercase(),
621            );
622            anyhow::bail!("IDEMPOTENT_DUPLICATE: Order likely exists but confirmation was lost");
623        }
624
625        log::error!(
626            "All {} requests failed: {errors:?} {params}",
627            operation.to_lowercase(),
628        );
629        Err(anyhow::anyhow!(
630            "All {} requests failed: {:?}",
631            operation.to_lowercase(),
632            errors
633        ))
634    }
635
636    /// Broadcasts a submit request to all healthy clients in parallel.
637    ///
638    /// # Returns
639    ///
640    /// - `Ok(report)` if successfully submitted with a report.
641    /// - `Err` if all requests failed.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if all submit requests fail or no healthy clients are available.
646    #[expect(clippy::too_many_arguments)]
647    pub async fn broadcast_submit(
648        &self,
649        instrument_id: InstrumentId,
650        client_order_id: ClientOrderId,
651        order_side: OrderSide,
652        order_type: OrderType,
653        quantity: Quantity,
654        time_in_force: TimeInForce,
655        price: Option<Price>,
656        trigger_price: Option<Price>,
657        trigger_type: Option<TriggerType>,
658        trailing_offset: Option<f64>,
659        trailing_offset_type: Option<TrailingOffsetType>,
660        display_qty: Option<Quantity>,
661        post_only: bool,
662        reduce_only: bool,
663        order_list_id: Option<OrderListId>,
664        contingency_type: Option<ContingencyType>,
665        submit_tries: Option<usize>,
666        peg_price_type: Option<BitmexPegPriceType>,
667        peg_offset_value: Option<f64>,
668    ) -> anyhow::Result<OrderStatusReport> {
669        self.total_submits.fetch_add(1, Ordering::Relaxed);
670
671        let pool_size = self.config.pool_size;
672        let actual_tries = if let Some(t) = submit_tries {
673            if t > pool_size {
674                // Use log macro for Python visibility for now
675                log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
676            }
677            std::cmp::min(t, pool_size)
678        } else {
679            pool_size
680        };
681
682        log::debug!(
683            "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
684        );
685
686        let healthy_transports: Vec<TransportClient> = self
687            .transports
688            .iter()
689            .filter(|t| t.is_healthy())
690            .take(actual_tries)
691            .cloned()
692            .collect();
693
694        if healthy_transports.is_empty() {
695            self.failed_submits.fetch_add(1, Ordering::Relaxed);
696            anyhow::bail!("No healthy transport clients available");
697        }
698
699        log::debug!(
700            "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
701            healthy_transports.len(),
702        );
703
704        let mut handles = Vec::new();
705
706        for transport in healthy_transports {
707            // All transports use the same client_order_id. If multiple succeed,
708            // BitMEX rejects duplicates with "duplicate clOrdID" (expected rejection).
709            let handle = get_runtime().spawn(async move {
710                let client_id = transport.client_id.clone();
711                let result = transport
712                    .submit_order(
713                        instrument_id,
714                        client_order_id,
715                        order_side,
716                        order_type,
717                        quantity,
718                        time_in_force,
719                        price,
720                        trigger_price,
721                        trigger_type,
722                        trailing_offset,
723                        trailing_offset_type,
724                        display_qty,
725                        post_only,
726                        reduce_only,
727                        order_list_id,
728                        contingency_type,
729                        peg_price_type,
730                        peg_offset_value,
731                    )
732                    .await;
733                (client_id, result)
734            });
735            handles.push(handle);
736        }
737
738        self.process_submit_results(
739            handles,
740            "Submit",
741            format!("(client_order_id={client_order_id:?})"),
742        )
743        .await
744    }
745
746    /// Gets broadcaster metrics.
747    pub fn get_metrics(&self) -> BroadcasterMetrics {
748        let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
749        let total_clients = self.transports.len();
750
751        BroadcasterMetrics {
752            total_submits: self.total_submits.load(Ordering::Relaxed),
753            successful_submits: self.successful_submits.load(Ordering::Relaxed),
754            failed_submits: self.failed_submits.load(Ordering::Relaxed),
755            expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
756            healthy_clients,
757            total_clients,
758        }
759    }
760
761    /// Gets broadcaster metrics (async version for use within async context).
762    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
763        self.get_metrics()
764    }
765
766    /// Gets per-client statistics.
767    pub fn get_client_stats(&self) -> Vec<ClientStats> {
768        self.transports
769            .iter()
770            .map(|t| ClientStats {
771                client_id: t.client_id.clone(),
772                healthy: t.is_healthy(),
773                submit_count: t.get_submit_count(),
774                error_count: t.get_error_count(),
775            })
776            .collect()
777    }
778
779    /// Gets per-client statistics (async version for use within async context).
780    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
781        self.get_client_stats()
782    }
783
784    /// Caches an instrument in all HTTP clients in the pool.
785    pub fn cache_instrument(&self, instrument: &InstrumentAny) {
786        for transport in self.transports.iter() {
787            transport.executor.add_instrument(instrument.clone());
788        }
789    }
790
791    #[must_use]
792    pub fn clone_for_async(&self) -> Self {
793        Self {
794            config: self.config.clone(),
795            transports: Arc::clone(&self.transports),
796            health_check_task: Arc::clone(&self.health_check_task),
797            running: Arc::clone(&self.running),
798            total_submits: Arc::clone(&self.total_submits),
799            successful_submits: Arc::clone(&self.successful_submits),
800            failed_submits: Arc::clone(&self.failed_submits),
801            expected_rejects: Arc::clone(&self.expected_rejects),
802        }
803    }
804
805    #[cfg(test)]
806    fn new_with_transports(
807        config: SubmitBroadcasterConfig,
808        transports: Vec<TransportClient>,
809    ) -> Self {
810        Self {
811            config,
812            transports: Arc::new(transports),
813            health_check_task: Arc::new(RwLock::new(None)),
814            running: Arc::new(AtomicBool::new(false)),
815            total_submits: Arc::new(AtomicU64::new(0)),
816            successful_submits: Arc::new(AtomicU64::new(0)),
817            failed_submits: Arc::new(AtomicU64::new(0)),
818            expected_rejects: Arc::new(AtomicU64::new(0)),
819        }
820    }
821}
822
823/// Broadcaster metrics snapshot.
824#[derive(Debug, Clone)]
825pub struct BroadcasterMetrics {
826    pub total_submits: u64,
827    pub successful_submits: u64,
828    pub failed_submits: u64,
829    pub expected_rejects: u64,
830    pub healthy_clients: usize,
831    pub total_clients: usize,
832}
833
834/// Per-client statistics.
835#[derive(Debug, Clone)]
836pub struct ClientStats {
837    pub client_id: String,
838    pub healthy: bool,
839    pub submit_count: u64,
840    pub error_count: u64,
841}
842
843#[cfg(test)]
844mod tests {
845    use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
846
847    use nautilus_core::UUID4;
848    use nautilus_model::{
849        enums::{
850            ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
851        },
852        identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
853        reports::OrderStatusReport,
854        types::{Price, Quantity},
855    };
856
857    use super::*;
858
859    /// Mock executor for testing.
860    #[derive(Clone)]
861    #[expect(clippy::type_complexity)]
862    struct MockExecutor {
863        handler: Arc<
864            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
865                + Send
866                + Sync,
867        >,
868    }
869
870    impl MockExecutor {
871        fn new<F, Fut>(handler: F) -> Self
872        where
873            F: Fn() -> Fut + Send + Sync + 'static,
874            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
875        {
876            Self {
877                handler: Arc::new(move || Box::pin(handler())),
878            }
879        }
880    }
881
882    impl SubmitExecutor for MockExecutor {
883        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
884            Box::pin(async { Ok(()) })
885        }
886
887        fn submit_order(
888            &self,
889            _instrument_id: InstrumentId,
890            _client_order_id: ClientOrderId,
891            _order_side: OrderSide,
892            _order_type: OrderType,
893            _quantity: Quantity,
894            _time_in_force: TimeInForce,
895            _price: Option<Price>,
896            _trigger_price: Option<Price>,
897            _trigger_type: Option<TriggerType>,
898            _trailing_offset: Option<f64>,
899            _trailing_offset_type: Option<TrailingOffsetType>,
900            _display_qty: Option<Quantity>,
901            _post_only: bool,
902            _reduce_only: bool,
903            _order_list_id: Option<OrderListId>,
904            _contingency_type: Option<ContingencyType>,
905            _peg_price_type: Option<BitmexPegPriceType>,
906            _peg_offset_value: Option<f64>,
907        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
908            (self.handler)()
909        }
910
911        fn add_instrument(&self, _instrument: InstrumentAny) {
912            // No-op for mock
913        }
914    }
915
916    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
917        OrderStatusReport {
918            account_id: AccountId::from("BITMEX-001"),
919            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
920            venue_order_id: VenueOrderId::from(venue_order_id),
921            order_side: OrderSide::Buy,
922            order_type: OrderType::Limit,
923            time_in_force: TimeInForce::Gtc,
924            order_status: OrderStatus::Accepted,
925            price: Some(Price::new(50000.0, 2)),
926            quantity: Quantity::new(100.0, 0),
927            filled_qty: Quantity::new(0.0, 0),
928            report_id: UUID4::new(),
929            ts_accepted: 0.into(),
930            ts_last: 0.into(),
931            ts_init: 0.into(),
932            client_order_id: None,
933            avg_px: None,
934            trigger_price: None,
935            trigger_type: None,
936            contingency_type: ContingencyType::NoContingency,
937            expire_time: None,
938            order_list_id: None,
939            venue_position_id: None,
940            linked_order_ids: None,
941            parent_order_id: None,
942            display_qty: None,
943            limit_offset: None,
944            trailing_offset: None,
945            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
946            post_only: false,
947            reduce_only: false,
948            cancel_reason: None,
949            ts_triggered: None,
950        }
951    }
952
953    fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
954    where
955        F: Fn() -> Fut + Send + Sync + 'static,
956        Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
957    {
958        let executor = MockExecutor::new(handler);
959        TransportClient::new(executor, client_id.to_string())
960    }
961
962    #[tokio::test]
963    async fn test_broadcast_submit_immediate_success() {
964        let report = create_test_report("ORDER-1");
965        let report_clone = report.clone();
966
967        let transports = vec![
968            create_stub_transport("client-0", move || {
969                let report = report_clone.clone();
970                async move { Ok(report) }
971            }),
972            create_stub_transport("client-1", || async {
973                tokio::time::sleep(Duration::from_secs(10)).await;
974                anyhow::bail!("Should be aborted")
975            }),
976        ];
977
978        let config = SubmitBroadcasterConfig::default();
979        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
980
981        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
982        let result = broadcaster
983            .broadcast_submit(
984                instrument_id,
985                ClientOrderId::from("O-123"),
986                OrderSide::Buy,
987                OrderType::Limit,
988                Quantity::new(100.0, 0),
989                TimeInForce::Gtc,
990                Some(Price::new(50000.0, 2)),
991                None,
992                None,
993                None,
994                None,
995                None,
996                false,
997                false,
998                None,
999                None,
1000                None,
1001                None,
1002                None,
1003            )
1004            .await;
1005
1006        assert!(result.is_ok());
1007        let returned_report = result.unwrap();
1008        assert_eq!(returned_report.venue_order_id, report.venue_order_id);
1009
1010        let metrics = broadcaster.get_metrics_async().await;
1011        assert_eq!(metrics.successful_submits, 1);
1012        assert_eq!(metrics.failed_submits, 0);
1013        assert_eq!(metrics.total_submits, 1);
1014    }
1015
1016    #[tokio::test]
1017    async fn test_broadcast_submit_duplicate_clordid_expected() {
1018        let transports = vec![
1019            create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
1020            create_stub_transport("client-1", || async {
1021                tokio::time::sleep(Duration::from_secs(10)).await;
1022                anyhow::bail!("Should be aborted")
1023            }),
1024        ];
1025
1026        let config = SubmitBroadcasterConfig::default();
1027        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1028
1029        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1030        let result = broadcaster
1031            .broadcast_submit(
1032                instrument_id,
1033                ClientOrderId::from("O-123"),
1034                OrderSide::Buy,
1035                OrderType::Limit,
1036                Quantity::new(100.0, 0),
1037                TimeInForce::Gtc,
1038                Some(Price::new(50000.0, 2)),
1039                None,
1040                None,
1041                None,
1042                None,
1043                None,
1044                false,
1045                false,
1046                None,
1047                None,
1048                None,
1049                None,
1050                None,
1051            )
1052            .await;
1053
1054        assert!(result.is_err());
1055
1056        let metrics = broadcaster.get_metrics_async().await;
1057        assert_eq!(metrics.expected_rejects, 1);
1058        assert_eq!(metrics.successful_submits, 0);
1059        assert_eq!(metrics.failed_submits, 1);
1060    }
1061
1062    #[tokio::test]
1063    async fn test_broadcast_submit_all_failures() {
1064        let transports = vec![
1065            create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1066            create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1067        ];
1068
1069        let config = SubmitBroadcasterConfig::default();
1070        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1071
1072        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1073        let result = broadcaster
1074            .broadcast_submit(
1075                instrument_id,
1076                ClientOrderId::from("O-456"),
1077                OrderSide::Sell,
1078                OrderType::Market,
1079                Quantity::new(50.0, 0),
1080                TimeInForce::Ioc,
1081                None,
1082                None,
1083                None,
1084                None,
1085                None,
1086                None,
1087                false,
1088                false,
1089                None,
1090                None,
1091                None,
1092                None,
1093                None,
1094            )
1095            .await;
1096
1097        assert!(result.is_err());
1098        assert!(
1099            result
1100                .unwrap_err()
1101                .to_string()
1102                .contains("All submit requests failed")
1103        );
1104
1105        let metrics = broadcaster.get_metrics_async().await;
1106        assert_eq!(metrics.failed_submits, 1);
1107        assert_eq!(metrics.successful_submits, 0);
1108    }
1109
1110    #[tokio::test]
1111    async fn test_broadcast_submit_no_healthy_clients() {
1112        let transport =
1113            create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1114        transport.healthy.store(false, Ordering::Relaxed);
1115
1116        let config = SubmitBroadcasterConfig::default();
1117        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1118
1119        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1120        let result = broadcaster
1121            .broadcast_submit(
1122                instrument_id,
1123                ClientOrderId::from("O-789"),
1124                OrderSide::Buy,
1125                OrderType::Limit,
1126                Quantity::new(100.0, 0),
1127                TimeInForce::Gtc,
1128                Some(Price::new(50000.0, 2)),
1129                None,
1130                None,
1131                None,
1132                None,
1133                None,
1134                false,
1135                false,
1136                None,
1137                None,
1138                None,
1139                None,
1140                None,
1141            )
1142            .await;
1143
1144        assert!(result.is_err());
1145        assert!(
1146            result
1147                .unwrap_err()
1148                .to_string()
1149                .contains("No healthy transport clients available")
1150        );
1151
1152        let metrics = broadcaster.get_metrics_async().await;
1153        assert_eq!(metrics.failed_submits, 1);
1154    }
1155
1156    #[tokio::test]
1157    async fn test_default_config() {
1158        let report = create_test_report("ORDER-1");
1159        let transports: Vec<TransportClient> = (0..3)
1160            .map(|i| {
1161                let r = report.clone();
1162                create_stub_transport(&format!("client-{i}"), move || {
1163                    let r = r.clone();
1164                    async move { Ok(r) }
1165                })
1166            })
1167            .collect();
1168
1169        let config = SubmitBroadcasterConfig::default();
1170        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1171        let metrics = broadcaster.get_metrics_async().await;
1172
1173        assert_eq!(metrics.total_clients, 3);
1174    }
1175
1176    #[tokio::test]
1177    async fn test_broadcaster_lifecycle() {
1178        let report = create_test_report("ORDER-1");
1179        let transports: Vec<TransportClient> = (0..2)
1180            .map(|i| {
1181                let r = report.clone();
1182                create_stub_transport(&format!("client-{i}"), move || {
1183                    let r = r.clone();
1184                    async move { Ok(r) }
1185                })
1186            })
1187            .collect();
1188
1189        let config = SubmitBroadcasterConfig::default();
1190        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1191
1192        // Should not be running initially
1193        assert!(!broadcaster.running.load(Ordering::Relaxed));
1194
1195        // Start broadcaster
1196        let start_result = broadcaster.start().await;
1197        assert!(start_result.is_ok());
1198        assert!(broadcaster.running.load(Ordering::Relaxed));
1199
1200        // Starting again should be idempotent
1201        let start_again = broadcaster.start().await;
1202        assert!(start_again.is_ok());
1203
1204        // Stop broadcaster
1205        broadcaster.stop().await;
1206        assert!(!broadcaster.running.load(Ordering::Relaxed));
1207
1208        // Stopping again should be safe
1209        broadcaster.stop().await;
1210        assert!(!broadcaster.running.load(Ordering::Relaxed));
1211    }
1212
1213    #[tokio::test]
1214    async fn test_broadcast_submit_metrics_increment() {
1215        let report = create_test_report("ORDER-1");
1216        let report_clone = report.clone();
1217
1218        let transports = vec![create_stub_transport("client-0", move || {
1219            let report = report_clone.clone();
1220            async move { Ok(report) }
1221        })];
1222
1223        let config = SubmitBroadcasterConfig::default();
1224        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1225
1226        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1227        let _ = broadcaster
1228            .broadcast_submit(
1229                instrument_id,
1230                ClientOrderId::from("O-123"),
1231                OrderSide::Buy,
1232                OrderType::Limit,
1233                Quantity::new(100.0, 0),
1234                TimeInForce::Gtc,
1235                Some(Price::new(50000.0, 2)),
1236                None,
1237                None,
1238                None,
1239                None,
1240                None,
1241                false,
1242                false,
1243                None,
1244                None,
1245                None,
1246                None,
1247                None,
1248            )
1249            .await;
1250
1251        let metrics = broadcaster.get_metrics_async().await;
1252        assert_eq!(metrics.total_submits, 1);
1253        assert_eq!(metrics.successful_submits, 1);
1254        assert_eq!(metrics.failed_submits, 0);
1255    }
1256
1257    #[tokio::test]
1258    async fn test_broadcaster_creation_with_pool() {
1259        let report = create_test_report("ORDER-1");
1260        let transports: Vec<TransportClient> = (0..4)
1261            .map(|i| {
1262                let r = report.clone();
1263                create_stub_transport(&format!("client-{i}"), move || {
1264                    let r = r.clone();
1265                    async move { Ok(r) }
1266                })
1267            })
1268            .collect();
1269
1270        let config = SubmitBroadcasterConfig::default();
1271        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1272        let metrics = broadcaster.get_metrics_async().await;
1273        assert_eq!(metrics.total_clients, 4);
1274    }
1275
1276    #[tokio::test]
1277    async fn test_client_stats_collection() {
1278        // Both clients fail so broadcast waits for all of them (no early abort on success).
1279        // This ensures both clients execute and record their stats before the function returns.
1280        let transports = vec![
1281            create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1282            create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1283        ];
1284
1285        let config = SubmitBroadcasterConfig::default();
1286        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1287
1288        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1289        let _ = broadcaster
1290            .broadcast_submit(
1291                instrument_id,
1292                ClientOrderId::from("O-123"),
1293                OrderSide::Buy,
1294                OrderType::Limit,
1295                Quantity::new(100.0, 0),
1296                TimeInForce::Gtc,
1297                Some(Price::new(50000.0, 2)),
1298                None,
1299                None,
1300                None,
1301                None,
1302                None,
1303                false,
1304                false,
1305                None,
1306                None,
1307                None,
1308                None,
1309                None,
1310            )
1311            .await;
1312
1313        let stats = broadcaster.get_client_stats_async().await;
1314        assert_eq!(stats.len(), 2);
1315
1316        let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1317        assert_eq!(client0.submit_count, 1);
1318        assert_eq!(client0.error_count, 1);
1319
1320        let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1321        assert_eq!(client1.submit_count, 1);
1322        assert_eq!(client1.error_count, 1);
1323    }
1324
1325    #[tokio::test]
1326    async fn test_testnet_config_sets_base_url() {
1327        let config = SubmitBroadcasterConfig {
1328            pool_size: 1,
1329            api_key: Some("test_key".to_string()),
1330            api_secret: Some("test_secret".to_string()),
1331            environment: BitmexEnvironment::Testnet,
1332            base_url: None,
1333            ..Default::default()
1334        };
1335
1336        let broadcaster = SubmitBroadcaster::new(config);
1337        assert!(broadcaster.is_ok());
1338    }
1339
1340    #[tokio::test]
1341    async fn test_constructor_honors_default_pool_size() {
1342        let config = SubmitBroadcasterConfig {
1343            api_key: Some("test_key".to_string()),
1344            api_secret: Some("test_secret".to_string()),
1345            base_url: Some("http://127.0.0.1:19999".to_string()),
1346            ..Default::default()
1347        };
1348
1349        let expected_pool = config.pool_size;
1350        let broadcaster = SubmitBroadcaster::new(config).unwrap();
1351        let metrics = broadcaster.get_metrics_async().await;
1352
1353        assert_eq!(metrics.total_clients, expected_pool);
1354    }
1355
1356    #[tokio::test]
1357    async fn test_clone_for_async() {
1358        let report = create_test_report("ORDER-1");
1359        let transports = vec![create_stub_transport("client-0", move || {
1360            let r = report.clone();
1361            async move { Ok(r) }
1362        })];
1363
1364        let config = SubmitBroadcasterConfig::default();
1365        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1366        let cloned = broadcaster.clone_for_async();
1367
1368        // Verify they share the same atomics
1369        broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1370        assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1371    }
1372
1373    #[tokio::test]
1374    async fn test_pattern_matching() {
1375        let config = SubmitBroadcasterConfig {
1376            expected_reject_patterns: vec![
1377                "Duplicate clOrdID".to_string(),
1378                "Order already exists".to_string(),
1379            ],
1380            ..Default::default()
1381        };
1382
1383        let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1384
1385        assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1386        assert!(broadcaster.is_expected_reject("Order already exists in system"));
1387        assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1388        assert!(!broadcaster.is_expected_reject("Internal server error"));
1389    }
1390
1391    #[tokio::test]
1392    async fn test_submit_metrics_with_mixed_responses() {
1393        let report = create_test_report("ORDER-1");
1394        let report_clone = report.clone();
1395
1396        let transports = vec![
1397            create_stub_transport("client-0", move || {
1398                let report = report_clone.clone();
1399                async move { Ok(report) }
1400            }),
1401            create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1402        ];
1403
1404        let config = SubmitBroadcasterConfig::default();
1405        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1406
1407        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1408        let result = broadcaster
1409            .broadcast_submit(
1410                instrument_id,
1411                ClientOrderId::from("O-123"),
1412                OrderSide::Buy,
1413                OrderType::Limit,
1414                Quantity::new(100.0, 0),
1415                TimeInForce::Gtc,
1416                Some(Price::new(50000.0, 2)),
1417                None,
1418                None,
1419                None,
1420                None,
1421                None,
1422                false,
1423                false,
1424                None,
1425                None,
1426                None,
1427                None,
1428                None,
1429            )
1430            .await;
1431
1432        assert!(result.is_ok());
1433
1434        let metrics = broadcaster.get_metrics_async().await;
1435        assert_eq!(metrics.total_submits, 1);
1436        assert_eq!(metrics.successful_submits, 1);
1437        assert_eq!(metrics.failed_submits, 0);
1438    }
1439
1440    #[tokio::test]
1441    async fn test_metrics_initialization_and_health() {
1442        let report = create_test_report("ORDER-1");
1443        let transports: Vec<TransportClient> = (0..2)
1444            .map(|i| {
1445                let r = report.clone();
1446                create_stub_transport(&format!("client-{i}"), move || {
1447                    let r = r.clone();
1448                    async move { Ok(r) }
1449                })
1450            })
1451            .collect();
1452
1453        let config = SubmitBroadcasterConfig::default();
1454        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1455        let metrics = broadcaster.get_metrics_async().await;
1456
1457        assert_eq!(metrics.total_submits, 0);
1458        assert_eq!(metrics.successful_submits, 0);
1459        assert_eq!(metrics.failed_submits, 0);
1460        assert_eq!(metrics.expected_rejects, 0);
1461        assert_eq!(metrics.total_clients, 2);
1462        assert_eq!(metrics.healthy_clients, 2);
1463    }
1464
1465    #[tokio::test]
1466    async fn test_health_check_task_lifecycle() {
1467        let report = create_test_report("ORDER-1");
1468        let transports: Vec<TransportClient> = (0..2)
1469            .map(|i| {
1470                let r = report.clone();
1471                create_stub_transport(&format!("client-{i}"), move || {
1472                    let r = r.clone();
1473                    async move { Ok(r) }
1474                })
1475            })
1476            .collect();
1477
1478        let config = SubmitBroadcasterConfig::default();
1479        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1480
1481        // Start should spawn health check task
1482        broadcaster.start().await.unwrap();
1483        assert!(broadcaster.running.load(Ordering::Relaxed));
1484        assert!(
1485            broadcaster
1486                .health_check_task
1487                .read()
1488                .await
1489                .as_ref()
1490                .is_some()
1491        );
1492
1493        // Stop should clean up task
1494        broadcaster.stop().await;
1495        assert!(!broadcaster.running.load(Ordering::Relaxed));
1496    }
1497
1498    #[tokio::test]
1499    async fn test_expected_reject_pattern_comprehensive() {
1500        let transports = vec![
1501            create_stub_transport("client-0", || async {
1502                anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1503            }),
1504            create_stub_transport("client-1", || async {
1505                tokio::time::sleep(Duration::from_secs(10)).await;
1506                anyhow::bail!("Should be aborted")
1507            }),
1508        ];
1509
1510        let config = SubmitBroadcasterConfig::default();
1511        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1512
1513        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1514        let result = broadcaster
1515            .broadcast_submit(
1516                instrument_id,
1517                ClientOrderId::from("O-123"),
1518                OrderSide::Buy,
1519                OrderType::Limit,
1520                Quantity::new(100.0, 0),
1521                TimeInForce::Gtc,
1522                Some(Price::new(50000.0, 2)),
1523                None,
1524                None,
1525                None,
1526                None,
1527                None,
1528                false,
1529                false,
1530                None,
1531                None,
1532                None,
1533                None,
1534                None,
1535            )
1536            .await;
1537
1538        // All failed with expected reject
1539        assert!(result.is_err());
1540
1541        let metrics = broadcaster.get_metrics_async().await;
1542        assert_eq!(metrics.expected_rejects, 1);
1543        assert_eq!(metrics.failed_submits, 1);
1544        assert_eq!(metrics.successful_submits, 0);
1545    }
1546
1547    #[tokio::test]
1548    async fn test_client_order_id_suffix_for_multiple_clients() {
1549        use std::sync::{Arc, Mutex};
1550
1551        #[derive(Clone)]
1552        struct CaptureExecutor {
1553            captured_ids: Arc<Mutex<Vec<String>>>,
1554            barrier: Arc<tokio::sync::Barrier>,
1555            report: OrderStatusReport,
1556        }
1557
1558        impl SubmitExecutor for CaptureExecutor {
1559            fn health_check(
1560                &self,
1561            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1562                Box::pin(async { Ok(()) })
1563            }
1564
1565            fn submit_order(
1566                &self,
1567                _instrument_id: InstrumentId,
1568                client_order_id: ClientOrderId,
1569                _order_side: OrderSide,
1570                _order_type: OrderType,
1571                _quantity: Quantity,
1572                _time_in_force: TimeInForce,
1573                _price: Option<Price>,
1574                _trigger_price: Option<Price>,
1575                _trigger_type: Option<TriggerType>,
1576                _trailing_offset: Option<f64>,
1577                _trailing_offset_type: Option<TrailingOffsetType>,
1578                _display_qty: Option<Quantity>,
1579                _post_only: bool,
1580                _reduce_only: bool,
1581                _order_list_id: Option<OrderListId>,
1582                _contingency_type: Option<ContingencyType>,
1583                _peg_price_type: Option<BitmexPegPriceType>,
1584                _peg_offset_value: Option<f64>,
1585            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1586            {
1587                // Capture the client_order_id
1588                self.captured_ids
1589                    .lock()
1590                    .unwrap()
1591                    .push(client_order_id.as_str().to_string());
1592                let report = self.report.clone();
1593                let barrier = Arc::clone(&self.barrier);
1594                // Wait for all tasks to capture their IDs before any completes
1595                // (with concurrent execution, first success aborts others)
1596                Box::pin(async move {
1597                    barrier.wait().await;
1598                    Ok(report)
1599                })
1600            }
1601
1602            fn add_instrument(&self, _instrument: InstrumentAny) {}
1603        }
1604
1605        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1606        let barrier = Arc::new(tokio::sync::Barrier::new(3));
1607        let report = create_test_report("ORDER-1");
1608
1609        let transports = vec![
1610            TransportClient::new(
1611                CaptureExecutor {
1612                    captured_ids: Arc::clone(&captured_ids),
1613                    barrier: Arc::clone(&barrier),
1614                    report: report.clone(),
1615                },
1616                "client-0".to_string(),
1617            ),
1618            TransportClient::new(
1619                CaptureExecutor {
1620                    captured_ids: Arc::clone(&captured_ids),
1621                    barrier: Arc::clone(&barrier),
1622                    report: report.clone(),
1623                },
1624                "client-1".to_string(),
1625            ),
1626            TransportClient::new(
1627                CaptureExecutor {
1628                    captured_ids: Arc::clone(&captured_ids),
1629                    barrier: Arc::clone(&barrier),
1630                    report: report.clone(),
1631                },
1632                "client-2".to_string(),
1633            ),
1634        ];
1635
1636        let config = SubmitBroadcasterConfig::default();
1637        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1638
1639        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1640        let result = broadcaster
1641            .broadcast_submit(
1642                instrument_id,
1643                ClientOrderId::from("O-123"),
1644                OrderSide::Buy,
1645                OrderType::Limit,
1646                Quantity::new(100.0, 0),
1647                TimeInForce::Gtc,
1648                Some(Price::new(50000.0, 2)),
1649                None,
1650                None,
1651                None,
1652                None,
1653                None,
1654                false,
1655                false,
1656                None,
1657                None,
1658                None,
1659                None,
1660                None,
1661            )
1662            .await;
1663
1664        assert!(result.is_ok());
1665
1666        // All transports receive the same client_order_id (no suffixing)
1667        let ids = captured_ids.lock().unwrap();
1668        assert_eq!(ids.len(), 3);
1669        assert!(ids.iter().all(|id| id == "O-123")); // All clients get the same ID
1670    }
1671
1672    #[tokio::test]
1673    async fn test_client_order_id_suffix_with_partial_failure() {
1674        use std::sync::{Arc, Mutex};
1675
1676        #[derive(Clone)]
1677        struct CaptureAndFailExecutor {
1678            captured_ids: Arc<Mutex<Vec<String>>>,
1679            barrier: Arc<tokio::sync::Barrier>,
1680            should_succeed: bool,
1681        }
1682
1683        impl SubmitExecutor for CaptureAndFailExecutor {
1684            fn health_check(
1685                &self,
1686            ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1687                Box::pin(async { Ok(()) })
1688            }
1689
1690            fn submit_order(
1691                &self,
1692                _instrument_id: InstrumentId,
1693                client_order_id: ClientOrderId,
1694                _order_side: OrderSide,
1695                _order_type: OrderType,
1696                _quantity: Quantity,
1697                _time_in_force: TimeInForce,
1698                _price: Option<Price>,
1699                _trigger_price: Option<Price>,
1700                _trigger_type: Option<TriggerType>,
1701                _trailing_offset: Option<f64>,
1702                _trailing_offset_type: Option<TrailingOffsetType>,
1703                _display_qty: Option<Quantity>,
1704                _post_only: bool,
1705                _reduce_only: bool,
1706                _order_list_id: Option<OrderListId>,
1707                _contingency_type: Option<ContingencyType>,
1708                _peg_price_type: Option<BitmexPegPriceType>,
1709                _peg_offset_value: Option<f64>,
1710            ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1711            {
1712                // Capture the client_order_id
1713                self.captured_ids
1714                    .lock()
1715                    .unwrap()
1716                    .push(client_order_id.as_str().to_string());
1717                let barrier = Arc::clone(&self.barrier);
1718                let should_succeed = self.should_succeed;
1719                // Wait for all tasks to capture their IDs before any completes
1720                // (with concurrent execution, first success aborts others)
1721                Box::pin(async move {
1722                    barrier.wait().await;
1723
1724                    if should_succeed {
1725                        Ok(create_test_report("ORDER-1"))
1726                    } else {
1727                        anyhow::bail!("Network error")
1728                    }
1729                })
1730            }
1731
1732            fn add_instrument(&self, _instrument: InstrumentAny) {}
1733        }
1734
1735        let captured_ids = Arc::new(Mutex::new(Vec::new()));
1736        let barrier = Arc::new(tokio::sync::Barrier::new(2));
1737
1738        let transports = vec![
1739            TransportClient::new(
1740                CaptureAndFailExecutor {
1741                    captured_ids: Arc::clone(&captured_ids),
1742                    barrier: Arc::clone(&barrier),
1743                    should_succeed: false,
1744                },
1745                "client-0".to_string(),
1746            ),
1747            TransportClient::new(
1748                CaptureAndFailExecutor {
1749                    captured_ids: Arc::clone(&captured_ids),
1750                    barrier: Arc::clone(&barrier),
1751                    should_succeed: true,
1752                },
1753                "client-1".to_string(),
1754            ),
1755        ];
1756
1757        let config = SubmitBroadcasterConfig::default();
1758        let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1759
1760        let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1761        let result = broadcaster
1762            .broadcast_submit(
1763                instrument_id,
1764                ClientOrderId::from("O-456"),
1765                OrderSide::Sell,
1766                OrderType::Market,
1767                Quantity::new(50.0, 0),
1768                TimeInForce::Ioc,
1769                None,
1770                None,
1771                None,
1772                None,
1773                None,
1774                None,
1775                false,
1776                false,
1777                None,
1778                None,
1779                None,
1780                None,
1781                None,
1782            )
1783            .await;
1784
1785        assert!(result.is_ok());
1786
1787        // All transports receive the same client_order_id (no suffixing)
1788        let ids = captured_ids.lock().unwrap();
1789        assert_eq!(ids.len(), 2);
1790        assert!(ids.iter().all(|id| id == "O-456")); // All clients get the same ID
1791    }
1792
1793    #[tokio::test]
1794    async fn test_proxy_urls_populated_from_config() {
1795        let config = SubmitBroadcasterConfig {
1796            pool_size: 3,
1797            api_key: Some("test_key".to_string()),
1798            api_secret: Some("test_secret".to_string()),
1799            proxy_urls: vec![
1800                Some("http://proxy1:8080".to_string()),
1801                Some("http://proxy2:8080".to_string()),
1802                Some("http://proxy3:8080".to_string()),
1803            ],
1804            ..Default::default()
1805        };
1806
1807        assert_eq!(config.proxy_urls.len(), 3);
1808        assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1809        assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1810        assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1811    }
1812}