1use std::{
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::{
38 Arc,
39 atomic::{AtomicBool, AtomicU64, Ordering},
40 },
41 time::Duration,
42};
43
44use futures_util::future;
45use nautilus_common::live::get_runtime;
46use nautilus_model::{
47 enums::OrderSide,
48 identifiers::{ClientOrderId, InstrumentId, VenueOrderId},
49 instruments::InstrumentAny,
50 reports::OrderStatusReport,
51};
52use tokio::{sync::RwLock, task::JoinHandle, time::interval};
53
54use crate::{
55 common::{consts::BITMEX_HTTP_TESTNET_URL, enums::BitmexEnvironment},
56 http::client::BitmexHttpClient,
57};
58
59const IDEMPOTENT_ALREADY_CANCELED: &str = "AlreadyCanceled";
60const IDEMPOTENT_ORDER_NOT_FOUND: &str = "orderID not found";
61const IDEMPOTENT_UNABLE_DUE_TO_STATE: &str = "Unable to cancel order due to existing state";
62
63trait CancelExecutor: Send + Sync {
85 fn add_instrument(&self, instrument: InstrumentAny);
87
88 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;
90
91 fn cancel_order(
93 &self,
94 instrument_id: InstrumentId,
95 client_order_id: Option<ClientOrderId>,
96 venue_order_id: Option<VenueOrderId>,
97 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
98
99 fn cancel_orders(
101 &self,
102 instrument_id: InstrumentId,
103 client_order_ids: Option<Vec<ClientOrderId>>,
104 venue_order_ids: Option<Vec<VenueOrderId>>,
105 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
106
107 fn cancel_all_orders(
109 &self,
110 instrument_id: InstrumentId,
111 order_side: Option<OrderSide>,
112 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>;
113}
114
115impl CancelExecutor for BitmexHttpClient {
116 fn add_instrument(&self, instrument: InstrumentAny) {
117 Self::cache_instrument(self, instrument);
118 }
119
120 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
121 Box::pin(async move {
122 Self::get_server_time(self)
123 .await
124 .map(|_| ())
125 .map_err(|e| anyhow::anyhow!("{e}"))
126 })
127 }
128
129 fn cancel_order(
130 &self,
131 instrument_id: InstrumentId,
132 client_order_id: Option<ClientOrderId>,
133 venue_order_id: Option<VenueOrderId>,
134 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
135 Box::pin(async move {
136 Self::cancel_order(self, instrument_id, client_order_id, venue_order_id).await
137 })
138 }
139
140 fn cancel_orders(
141 &self,
142 instrument_id: InstrumentId,
143 client_order_ids: Option<Vec<ClientOrderId>>,
144 venue_order_ids: Option<Vec<VenueOrderId>>,
145 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
146 Box::pin(async move {
147 Self::cancel_orders(self, instrument_id, client_order_ids, venue_order_ids).await
148 })
149 }
150
151 fn cancel_all_orders(
152 &self,
153 instrument_id: InstrumentId,
154 order_side: Option<OrderSide>,
155 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>> {
156 Box::pin(async move { Self::cancel_all_orders(self, instrument_id, order_side).await })
157 }
158}
159
160#[derive(Debug, Clone)]
162pub struct CancelBroadcasterConfig {
163 pub pool_size: usize,
165 pub api_key: Option<String>,
167 pub api_secret: Option<String>,
169 pub base_url: Option<String>,
171 pub environment: BitmexEnvironment,
173 pub timeout_secs: u64,
175 pub max_retries: u32,
177 pub retry_delay_ms: u64,
179 pub retry_delay_max_ms: u64,
181 pub recv_window_ms: u64,
183 pub max_requests_per_second: u32,
185 pub max_requests_per_minute: u32,
187 pub health_check_interval_secs: u64,
189 pub health_check_timeout_secs: u64,
191 pub expected_reject_patterns: Vec<String>,
193 pub idempotent_success_patterns: Vec<String>,
195 pub proxy_urls: Vec<Option<String>>,
201}
202
203impl Default for CancelBroadcasterConfig {
204 fn default() -> Self {
205 Self {
206 pool_size: 2,
207 api_key: None,
208 api_secret: None,
209 base_url: None,
210 environment: BitmexEnvironment::Mainnet,
211 timeout_secs: 60,
212 max_retries: 3,
213 retry_delay_ms: 1_000,
214 retry_delay_max_ms: 5_000,
215 recv_window_ms: 10_000,
216 max_requests_per_second: 10,
217 max_requests_per_minute: 120,
218 health_check_interval_secs: 30,
219 health_check_timeout_secs: 5,
220 expected_reject_patterns: vec![
221 "Order had execInst of ParticipateDoNotInitiate".to_string(),
222 ],
223 idempotent_success_patterns: vec![
224 IDEMPOTENT_ALREADY_CANCELED.to_string(),
225 IDEMPOTENT_ORDER_NOT_FOUND.to_string(),
226 IDEMPOTENT_UNABLE_DUE_TO_STATE.to_string(),
227 ],
228 proxy_urls: vec![],
229 }
230 }
231}
232
233#[derive(Clone)]
235struct TransportClient {
236 executor: Arc<dyn CancelExecutor>,
241 client_id: String,
242 healthy: Arc<AtomicBool>,
243 cancel_count: Arc<AtomicU64>,
244 error_count: Arc<AtomicU64>,
245}
246
247impl Debug for TransportClient {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 f.debug_struct(stringify!(TransportClient))
250 .field("client_id", &self.client_id)
251 .field("healthy", &self.healthy)
252 .field("cancel_count", &self.cancel_count)
253 .field("error_count", &self.error_count)
254 .finish()
255 }
256}
257
258impl TransportClient {
259 fn new<E: CancelExecutor + 'static>(executor: E, client_id: String) -> Self {
260 Self {
261 executor: Arc::new(executor),
262 client_id,
263 healthy: Arc::new(AtomicBool::new(true)),
264 cancel_count: Arc::new(AtomicU64::new(0)),
265 error_count: Arc::new(AtomicU64::new(0)),
266 }
267 }
268
269 fn is_healthy(&self) -> bool {
270 self.healthy.load(Ordering::Relaxed)
271 }
272
273 fn mark_healthy(&self) {
274 self.healthy.store(true, Ordering::Relaxed);
275 }
276
277 fn mark_unhealthy(&self) {
278 self.healthy.store(false, Ordering::Relaxed);
279 }
280
281 fn get_cancel_count(&self) -> u64 {
282 self.cancel_count.load(Ordering::Relaxed)
283 }
284
285 fn get_error_count(&self) -> u64 {
286 self.error_count.load(Ordering::Relaxed)
287 }
288
289 async fn health_check(&self, timeout_secs: u64) -> bool {
290 match tokio::time::timeout(
291 Duration::from_secs(timeout_secs),
292 self.executor.health_check(),
293 )
294 .await
295 {
296 Ok(Ok(())) => {
297 self.mark_healthy();
298 true
299 }
300 Ok(Err(e)) => {
301 log::warn!("Health check failed for client {}: {e:?}", self.client_id);
302 self.mark_unhealthy();
303 false
304 }
305 Err(_) => {
306 log::warn!("Health check timeout for client {}", self.client_id);
307 self.mark_unhealthy();
308 false
309 }
310 }
311 }
312
313 async fn cancel_order(
314 &self,
315 instrument_id: InstrumentId,
316 client_order_id: Option<ClientOrderId>,
317 venue_order_id: Option<VenueOrderId>,
318 ) -> anyhow::Result<OrderStatusReport> {
319 self.cancel_count.fetch_add(1, Ordering::Relaxed);
320
321 match self
322 .executor
323 .cancel_order(instrument_id, client_order_id, venue_order_id)
324 .await
325 {
326 Ok(report) => {
327 self.mark_healthy();
328 Ok(report)
329 }
330 Err(e) => {
331 self.error_count.fetch_add(1, Ordering::Relaxed);
332 Err(e)
333 }
334 }
335 }
336}
337
338#[cfg_attr(feature = "python", pyo3::pyclass)]
344#[cfg_attr(
345 feature = "python",
346 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
347)]
348#[derive(Debug)]
349pub struct CancelBroadcaster {
350 config: CancelBroadcasterConfig,
351 transports: Arc<Vec<TransportClient>>,
352 health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
353 running: Arc<AtomicBool>,
354 total_cancels: Arc<AtomicU64>,
355 successful_cancels: Arc<AtomicU64>,
356 failed_cancels: Arc<AtomicU64>,
357 expected_rejects: Arc<AtomicU64>,
358 idempotent_successes: Arc<AtomicU64>,
359}
360
361impl CancelBroadcaster {
362 pub fn new(config: CancelBroadcasterConfig) -> anyhow::Result<Self> {
368 let mut transports = Vec::with_capacity(config.pool_size);
369
370 let base_url = match config.environment {
371 BitmexEnvironment::Testnet if config.base_url.is_none() => {
372 Some(BITMEX_HTTP_TESTNET_URL.to_string())
373 }
374 _ => config.base_url.clone(),
375 };
376
377 for i in 0..config.pool_size {
378 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
380
381 let client = BitmexHttpClient::with_credentials(
382 config.api_key.clone(),
383 config.api_secret.clone(),
384 base_url.clone(),
385 config.timeout_secs,
386 config.max_retries,
387 config.retry_delay_ms,
388 config.retry_delay_max_ms,
389 config.recv_window_ms,
390 config.max_requests_per_second,
391 config.max_requests_per_minute,
392 proxy_url,
393 )
394 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
395
396 transports.push(TransportClient::new(client, format!("bitmex-cancel-{i}")));
397 }
398
399 Ok(Self {
400 config,
401 transports: Arc::new(transports),
402 health_check_task: Arc::new(RwLock::new(None)),
403 running: Arc::new(AtomicBool::new(false)),
404 total_cancels: Arc::new(AtomicU64::new(0)),
405 successful_cancels: Arc::new(AtomicU64::new(0)),
406 failed_cancels: Arc::new(AtomicU64::new(0)),
407 expected_rejects: Arc::new(AtomicU64::new(0)),
408 idempotent_successes: Arc::new(AtomicU64::new(0)),
409 })
410 }
411
412 pub async fn start(&self) -> anyhow::Result<()> {
418 if self.running.load(Ordering::Relaxed) {
419 return Ok(());
420 }
421
422 self.running.store(true, Ordering::Relaxed);
423
424 self.run_health_checks().await;
426
427 let transports = Arc::clone(&self.transports);
429 let running = Arc::clone(&self.running);
430 let interval_secs = self.config.health_check_interval_secs;
431 let timeout_secs = self.config.health_check_timeout_secs;
432
433 let task = get_runtime().spawn(async move {
434 let mut ticker = interval(Duration::from_secs(interval_secs));
435 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
436
437 loop {
438 ticker.tick().await;
439
440 if !running.load(Ordering::Relaxed) {
441 break;
442 }
443
444 let tasks: Vec<_> = transports
445 .iter()
446 .map(|t| t.health_check(timeout_secs))
447 .collect();
448
449 let results = future::join_all(tasks).await;
450 let healthy_count = results.iter().filter(|&&r| r).count();
451
452 log::debug!(
453 "Health check complete: {}/{} clients healthy",
454 healthy_count,
455 results.len()
456 );
457 }
458 });
459
460 *self.health_check_task.write().await = Some(task);
461
462 log::info!(
463 "CancelBroadcaster started with {} clients",
464 self.transports.len()
465 );
466
467 Ok(())
468 }
469
470 pub async fn stop(&self) {
472 if !self.running.load(Ordering::Relaxed) {
473 return;
474 }
475
476 self.running.store(false, Ordering::Relaxed);
477
478 if let Some(task) = self.health_check_task.write().await.take() {
479 task.abort();
480 }
481
482 log::info!("CancelBroadcaster stopped");
483 }
484
485 async fn run_health_checks(&self) {
486 let tasks: Vec<_> = self
487 .transports
488 .iter()
489 .map(|t| t.health_check(self.config.health_check_timeout_secs))
490 .collect();
491
492 let results = future::join_all(tasks).await;
493 let healthy_count = results.iter().filter(|&&r| r).count();
494
495 log::debug!(
496 "Health check complete: {}/{} clients healthy",
497 healthy_count,
498 results.len()
499 );
500 }
501
502 fn is_expected_reject(&self, error_message: &str) -> bool {
503 self.config
504 .expected_reject_patterns
505 .iter()
506 .any(|pattern| error_message.contains(pattern))
507 }
508
509 fn is_idempotent_success(&self, error_message: &str) -> bool {
510 self.config
511 .idempotent_success_patterns
512 .iter()
513 .any(|pattern| error_message.contains(pattern))
514 }
515
516 async fn process_cancel_results<T>(
520 &self,
521 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
522 idempotent_result: impl FnOnce() -> anyhow::Result<T>,
523 operation: &str,
524 params: String,
525 idempotent_reason: &str,
526 ) -> anyhow::Result<T>
527 where
528 T: Send + 'static,
529 {
530 let mut errors = Vec::new();
531
532 while !handles.is_empty() {
533 let current_handles = std::mem::take(&mut handles);
534 let (result, _idx, remaining) = future::select_all(current_handles).await;
535 handles = remaining.into_iter().collect();
536
537 match result {
538 Ok((client_id, Ok(result))) => {
539 for handle in &handles {
541 handle.abort();
542 }
543 self.successful_cancels.fetch_add(1, Ordering::Relaxed);
544
545 log::debug!("{operation} broadcast succeeded [{client_id}] {params}");
546
547 return Ok(result);
548 }
549 Ok((client_id, Err(e))) => {
550 let error_msg = e.to_string();
551
552 if self.is_idempotent_success(&error_msg) {
553 for handle in &handles {
555 handle.abort();
556 }
557 self.idempotent_successes.fetch_add(1, Ordering::Relaxed);
558
559 log::debug!(
560 "Idempotent success [{client_id}] - {idempotent_reason}: {error_msg} {params}",
561 );
562
563 return idempotent_result();
564 }
565
566 if self.is_expected_reject(&error_msg) {
567 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
568 log::debug!(
569 "Expected {} rejection [{}]: {} {}",
570 operation.to_lowercase(),
571 client_id,
572 error_msg,
573 params
574 );
575 errors.push(error_msg);
576 } else {
577 log::warn!(
578 "{operation} request failed [{client_id}]: {error_msg} {params}"
579 );
580 errors.push(error_msg);
581 }
582 }
583 Err(e) => {
584 log::warn!("{operation} task join error: {e:?}");
585 errors.push(format!("Task panicked: {e:?}"));
586 }
587 }
588 }
589
590 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
592 log::error!(
593 "All {} requests failed: {errors:?} {params}",
594 operation.to_lowercase(),
595 );
596 Err(anyhow::anyhow!(
597 "All {} requests failed: {errors:?}",
598 operation.to_lowercase(),
599 ))
600 }
601
602 pub async fn broadcast_cancel(
614 &self,
615 instrument_id: InstrumentId,
616 client_order_id: Option<ClientOrderId>,
617 venue_order_id: Option<VenueOrderId>,
618 ) -> anyhow::Result<Option<OrderStatusReport>> {
619 self.total_cancels.fetch_add(1, Ordering::Relaxed);
620
621 let healthy_transports: Vec<TransportClient> = self
622 .transports
623 .iter()
624 .filter(|t| t.is_healthy())
625 .cloned()
626 .collect();
627
628 if healthy_transports.is_empty() {
629 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
630 anyhow::bail!("No healthy transport clients available");
631 }
632
633 let mut handles = Vec::new();
634
635 for transport in healthy_transports {
636 let handle = get_runtime().spawn(async move {
637 let client_id = transport.client_id.clone();
638 let result = transport
639 .cancel_order(instrument_id, client_order_id, venue_order_id)
640 .await
641 .map(Some); (client_id, result)
643 });
644 handles.push(handle);
645 }
646
647 self.process_cancel_results(
648 handles,
649 || Ok(None),
650 "Cancel",
651 format!("(client_order_id={client_order_id:?}, venue_order_id={venue_order_id:?})"),
652 "order already cancelled/not found",
653 )
654 .await
655 }
656
657 pub async fn broadcast_batch_cancel(
663 &self,
664 instrument_id: InstrumentId,
665 client_order_ids: Option<Vec<ClientOrderId>>,
666 venue_order_ids: Option<Vec<VenueOrderId>>,
667 ) -> anyhow::Result<Vec<OrderStatusReport>> {
668 self.total_cancels.fetch_add(1, Ordering::Relaxed);
669
670 let healthy_transports: Vec<TransportClient> = self
671 .transports
672 .iter()
673 .filter(|t| t.is_healthy())
674 .cloned()
675 .collect();
676
677 if healthy_transports.is_empty() {
678 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
679 anyhow::bail!("No healthy transport clients available");
680 }
681
682 let mut handles = Vec::new();
683
684 for transport in healthy_transports {
685 let client_order_ids_clone = client_order_ids.clone();
686 let venue_order_ids_clone = venue_order_ids.clone();
687 let handle = get_runtime().spawn(async move {
688 let client_id = transport.client_id.clone();
689 let result = transport
690 .executor
691 .cancel_orders(instrument_id, client_order_ids_clone, venue_order_ids_clone)
692 .await;
693 (client_id, result)
694 });
695 handles.push(handle);
696 }
697
698 self.process_cancel_results(
699 handles,
700 || Ok(Vec::new()),
701 "Batch cancel",
702 format!("(client_order_ids={client_order_ids:?}, venue_order_ids={venue_order_ids:?})"),
703 "orders already cancelled/not found",
704 )
705 .await
706 }
707
708 pub async fn broadcast_cancel_all(
714 &self,
715 instrument_id: InstrumentId,
716 order_side: Option<OrderSide>,
717 ) -> anyhow::Result<Vec<OrderStatusReport>> {
718 self.total_cancels.fetch_add(1, Ordering::Relaxed);
719
720 let healthy_transports: Vec<TransportClient> = self
721 .transports
722 .iter()
723 .filter(|t| t.is_healthy())
724 .cloned()
725 .collect();
726
727 if healthy_transports.is_empty() {
728 self.failed_cancels.fetch_add(1, Ordering::Relaxed);
729 anyhow::bail!("No healthy transport clients available");
730 }
731
732 let mut handles = Vec::new();
733
734 for transport in healthy_transports {
735 let handle = get_runtime().spawn(async move {
736 let client_id = transport.client_id.clone();
737 let result = transport
738 .executor
739 .cancel_all_orders(instrument_id, order_side)
740 .await;
741 (client_id, result)
742 });
743 handles.push(handle);
744 }
745
746 self.process_cancel_results(
747 handles,
748 || Ok(Vec::new()),
749 "Cancel all",
750 format!("(instrument_id={instrument_id}, order_side={order_side:?})"),
751 "no orders to cancel",
752 )
753 .await
754 }
755
756 pub fn get_metrics(&self) -> BroadcasterMetrics {
758 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
759 let total_clients = self.transports.len();
760
761 BroadcasterMetrics {
762 total_cancels: self.total_cancels.load(Ordering::Relaxed),
763 successful_cancels: self.successful_cancels.load(Ordering::Relaxed),
764 failed_cancels: self.failed_cancels.load(Ordering::Relaxed),
765 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
766 idempotent_successes: self.idempotent_successes.load(Ordering::Relaxed),
767 healthy_clients,
768 total_clients,
769 }
770 }
771
772 pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
774 self.get_metrics()
775 }
776
777 pub fn get_client_stats(&self) -> Vec<ClientStats> {
779 self.transports
780 .iter()
781 .map(|t| ClientStats {
782 client_id: t.client_id.clone(),
783 healthy: t.is_healthy(),
784 cancel_count: t.get_cancel_count(),
785 error_count: t.get_error_count(),
786 })
787 .collect()
788 }
789
790 pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
792 self.get_client_stats()
793 }
794
795 pub fn cache_instrument(&self, instrument: &InstrumentAny) {
797 for transport in self.transports.iter() {
798 transport.executor.add_instrument(instrument.clone());
799 }
800 }
801
802 #[must_use]
803 pub fn clone_for_async(&self) -> Self {
804 Self {
805 config: self.config.clone(),
806 transports: Arc::clone(&self.transports),
807 health_check_task: Arc::clone(&self.health_check_task),
808 running: Arc::clone(&self.running),
809 total_cancels: Arc::clone(&self.total_cancels),
810 successful_cancels: Arc::clone(&self.successful_cancels),
811 failed_cancels: Arc::clone(&self.failed_cancels),
812 expected_rejects: Arc::clone(&self.expected_rejects),
813 idempotent_successes: Arc::clone(&self.idempotent_successes),
814 }
815 }
816
817 #[cfg(test)]
818 fn new_with_transports(
819 config: CancelBroadcasterConfig,
820 transports: Vec<TransportClient>,
821 ) -> Self {
822 Self {
823 config,
824 transports: Arc::new(transports),
825 health_check_task: Arc::new(RwLock::new(None)),
826 running: Arc::new(AtomicBool::new(false)),
827 total_cancels: Arc::new(AtomicU64::new(0)),
828 successful_cancels: Arc::new(AtomicU64::new(0)),
829 failed_cancels: Arc::new(AtomicU64::new(0)),
830 expected_rejects: Arc::new(AtomicU64::new(0)),
831 idempotent_successes: Arc::new(AtomicU64::new(0)),
832 }
833 }
834}
835
836#[derive(Debug, Clone)]
838pub struct BroadcasterMetrics {
839 pub total_cancels: u64,
840 pub successful_cancels: u64,
841 pub failed_cancels: u64,
842 pub expected_rejects: u64,
843 pub idempotent_successes: u64,
844 pub healthy_clients: usize,
845 pub total_clients: usize,
846}
847
848#[derive(Debug, Clone)]
850pub struct ClientStats {
851 pub client_id: String,
852 pub healthy: bool,
853 pub cancel_count: u64,
854 pub error_count: u64,
855}
856
857#[cfg(test)]
858mod tests {
859 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
860
861 use nautilus_core::UUID4;
862 use nautilus_model::{
863 enums::{
864 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
865 },
866 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
867 reports::OrderStatusReport,
868 types::{Price, Quantity},
869 };
870
871 use super::*;
872
873 #[derive(Clone)]
875 #[expect(clippy::type_complexity)]
876 struct MockExecutor {
877 handler: Arc<
878 dyn Fn(
879 InstrumentId,
880 Option<ClientOrderId>,
881 Option<VenueOrderId>,
882 )
883 -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
884 + Send
885 + Sync,
886 >,
887 }
888
889 impl MockExecutor {
890 fn new<F, Fut>(handler: F) -> Self
891 where
892 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
893 + Send
894 + Sync
895 + 'static,
896 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
897 {
898 Self {
899 handler: Arc::new(move |id, cid, vid| Box::pin(handler(id, cid, vid))),
900 }
901 }
902 }
903
904 impl CancelExecutor for MockExecutor {
905 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
906 Box::pin(async { Ok(()) })
907 }
908
909 fn cancel_order(
910 &self,
911 instrument_id: InstrumentId,
912 client_order_id: Option<ClientOrderId>,
913 venue_order_id: Option<VenueOrderId>,
914 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
915 (self.handler)(instrument_id, client_order_id, venue_order_id)
916 }
917
918 fn cancel_orders(
919 &self,
920 _instrument_id: InstrumentId,
921 _client_order_ids: Option<Vec<ClientOrderId>>,
922 _venue_order_ids: Option<Vec<VenueOrderId>>,
923 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
924 {
925 Box::pin(async { Ok(Vec::new()) })
926 }
927
928 fn cancel_all_orders(
929 &self,
930 instrument_id: InstrumentId,
931 _order_side: Option<OrderSide>,
932 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<OrderStatusReport>>> + Send + '_>>
933 {
934 let handler = Arc::clone(&self.handler);
936 Box::pin(async move {
937 let result = handler(instrument_id, None, None).await;
939 match result {
940 Ok(_) => Ok(Vec::new()),
941 Err(e) => Err(e),
942 }
943 })
944 }
945
946 fn add_instrument(&self, _instrument: InstrumentAny) {
947 }
949 }
950
951 fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
952 OrderStatusReport {
953 account_id: AccountId::from("BITMEX-001"),
954 instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
955 venue_order_id: VenueOrderId::from(venue_order_id),
956 order_side: OrderSide::Buy,
957 order_type: OrderType::Limit,
958 time_in_force: TimeInForce::Gtc,
959 order_status: OrderStatus::Canceled,
960 price: Some(Price::new(50000.0, 2)),
961 quantity: Quantity::new(100.0, 0),
962 filled_qty: Quantity::new(0.0, 0),
963 report_id: UUID4::new(),
964 ts_accepted: 0.into(),
965 ts_last: 0.into(),
966 ts_init: 0.into(),
967 client_order_id: None,
968 avg_px: None,
969 trigger_price: None,
970 trigger_type: None,
971 contingency_type: ContingencyType::NoContingency,
972 expire_time: None,
973 order_list_id: None,
974 venue_position_id: None,
975 linked_order_ids: None,
976 parent_order_id: None,
977 display_qty: None,
978 limit_offset: None,
979 trailing_offset: None,
980 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
981 post_only: false,
982 reduce_only: false,
983 cancel_reason: None,
984 ts_triggered: None,
985 }
986 }
987
988 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
989 where
990 F: Fn(InstrumentId, Option<ClientOrderId>, Option<VenueOrderId>) -> Fut
991 + Send
992 + Sync
993 + 'static,
994 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
995 {
996 let executor = MockExecutor::new(handler);
997 TransportClient::new(executor, client_id.to_string())
998 }
999
1000 #[tokio::test]
1001 async fn test_broadcast_cancel_immediate_success() {
1002 let report = create_test_report("ORDER-1");
1003 let report_clone = report.clone();
1004
1005 let transports = vec![
1006 create_stub_transport("client-0", move |_, _, _| {
1007 let report = report_clone.clone();
1008 async move { Ok(report) }
1009 }),
1010 create_stub_transport("client-1", |_, _, _| async {
1011 tokio::time::sleep(Duration::from_secs(10)).await;
1012 anyhow::bail!("Should be aborted")
1013 }),
1014 ];
1015
1016 let config = CancelBroadcasterConfig::default();
1017 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1018
1019 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1020 let result = broadcaster
1021 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1022 .await;
1023
1024 assert!(result.is_ok());
1025 let returned_report = result.unwrap();
1026 assert!(returned_report.is_some());
1027 assert_eq!(
1028 returned_report.unwrap().venue_order_id,
1029 report.venue_order_id
1030 );
1031
1032 let metrics = broadcaster.get_metrics_async().await;
1033 assert_eq!(metrics.successful_cancels, 1);
1034 assert_eq!(metrics.failed_cancels, 0);
1035 assert_eq!(metrics.total_cancels, 1);
1036 }
1037
1038 #[tokio::test]
1039 async fn test_broadcast_cancel_idempotent_success() {
1040 let transports = vec![
1041 create_stub_transport("client-0", |_, _, _| async {
1042 anyhow::bail!("AlreadyCanceled")
1043 }),
1044 create_stub_transport("client-1", |_, _, _| async {
1045 tokio::time::sleep(Duration::from_secs(10)).await;
1046 anyhow::bail!("Should be aborted")
1047 }),
1048 ];
1049
1050 let config = CancelBroadcasterConfig::default();
1051 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1052
1053 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1054 let result = broadcaster
1055 .broadcast_cancel(instrument_id, None, Some(VenueOrderId::from("12345")))
1056 .await;
1057
1058 assert!(result.is_ok());
1059 assert!(result.unwrap().is_none());
1060
1061 let metrics = broadcaster.get_metrics_async().await;
1062 assert_eq!(metrics.idempotent_successes, 1);
1063 assert_eq!(metrics.successful_cancels, 0);
1064 assert_eq!(metrics.failed_cancels, 0);
1065 }
1066
1067 #[tokio::test]
1068 async fn test_broadcast_cancel_mixed_idempotent_and_failure() {
1069 let transports = vec![
1070 create_stub_transport("client-0", |_, _, _| async {
1071 anyhow::bail!("502 Bad Gateway")
1072 }),
1073 create_stub_transport("client-1", |_, _, _| async {
1074 anyhow::bail!("orderID not found")
1075 }),
1076 ];
1077
1078 let config = CancelBroadcasterConfig::default();
1079 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1080
1081 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1082 let result = broadcaster
1083 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-456")), None)
1084 .await;
1085
1086 assert!(result.is_ok());
1087 assert!(result.unwrap().is_none());
1088
1089 let metrics = broadcaster.get_metrics_async().await;
1090 assert_eq!(metrics.idempotent_successes, 1);
1091 assert_eq!(metrics.failed_cancels, 0);
1092 }
1093
1094 #[tokio::test]
1095 async fn test_broadcast_cancel_all_failures() {
1096 let transports = vec![
1097 create_stub_transport("client-0", |_, _, _| async {
1098 anyhow::bail!("502 Bad Gateway")
1099 }),
1100 create_stub_transport("client-1", |_, _, _| async {
1101 anyhow::bail!("Connection refused")
1102 }),
1103 ];
1104
1105 let config = CancelBroadcasterConfig::default();
1106 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1107
1108 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1109 let result = broadcaster.broadcast_cancel_all(instrument_id, None).await;
1110
1111 assert!(result.is_err());
1112 assert!(
1113 result
1114 .unwrap_err()
1115 .to_string()
1116 .contains("All cancel all requests failed")
1117 );
1118
1119 let metrics = broadcaster.get_metrics_async().await;
1120 assert_eq!(metrics.failed_cancels, 1);
1121 assert_eq!(metrics.successful_cancels, 0);
1122 assert_eq!(metrics.idempotent_successes, 0);
1123 }
1124
1125 #[tokio::test]
1126 async fn test_broadcast_cancel_no_healthy_clients() {
1127 let transport = create_stub_transport("client-0", |_, _, _| async {
1128 Ok(create_test_report("ORDER-1"))
1129 });
1130 transport.healthy.store(false, Ordering::Relaxed);
1131
1132 let config = CancelBroadcasterConfig::default();
1133 let broadcaster = CancelBroadcaster::new_with_transports(config, vec![transport]);
1134
1135 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1136 let result = broadcaster
1137 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-789")), None)
1138 .await;
1139
1140 assert!(result.is_err());
1141 assert!(
1142 result
1143 .unwrap_err()
1144 .to_string()
1145 .contains("No healthy transport clients available")
1146 );
1147
1148 let metrics = broadcaster.get_metrics_async().await;
1149 assert_eq!(metrics.failed_cancels, 1);
1150 }
1151
1152 #[tokio::test]
1153 async fn test_broadcast_cancel_metrics_increment() {
1154 let report1 = create_test_report("ORDER-1");
1155 let report1_clone = report1.clone();
1156 let report2 = create_test_report("ORDER-2");
1157 let report2_clone = report2.clone();
1158
1159 let transports = vec![
1160 create_stub_transport("client-0", move |_, _, _| {
1161 let report = report1_clone.clone();
1162 async move { Ok(report) }
1163 }),
1164 create_stub_transport("client-1", move |_, _, _| {
1165 let report = report2_clone.clone();
1166 async move { Ok(report) }
1167 }),
1168 ];
1169
1170 let config = CancelBroadcasterConfig::default();
1171 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1172
1173 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1174
1175 let _ = broadcaster
1176 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-1")), None)
1177 .await;
1178
1179 let _ = broadcaster
1180 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-2")), None)
1181 .await;
1182
1183 let metrics = broadcaster.get_metrics_async().await;
1184 assert_eq!(metrics.total_cancels, 2);
1185 assert_eq!(metrics.successful_cancels, 2);
1186 }
1187
1188 #[tokio::test]
1189 async fn test_broadcast_cancel_expected_reject_pattern() {
1190 let transports = vec![
1191 create_stub_transport("client-0", |_, _, _| async {
1192 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1193 }),
1194 create_stub_transport("client-1", |_, _, _| async {
1195 anyhow::bail!("Order had execInst of ParticipateDoNotInitiate")
1196 }),
1197 ];
1198
1199 let config = CancelBroadcasterConfig::default();
1200 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1201
1202 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1203 let result = broadcaster
1204 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-PDI")), None)
1205 .await;
1206
1207 assert!(result.is_err());
1208
1209 let metrics = broadcaster.get_metrics_async().await;
1210 assert_eq!(metrics.expected_rejects, 2);
1211 assert_eq!(metrics.failed_cancels, 1);
1212 }
1213
1214 #[tokio::test]
1215 async fn test_broadcaster_creation_with_pool() {
1216 let transports = vec![
1217 create_stub_transport("client-0", |_, _, _| async {
1218 Ok(create_test_report("ORDER-1"))
1219 }),
1220 create_stub_transport("client-1", |_, _, _| async {
1221 Ok(create_test_report("ORDER-1"))
1222 }),
1223 create_stub_transport("client-2", |_, _, _| async {
1224 Ok(create_test_report("ORDER-1"))
1225 }),
1226 ];
1227
1228 let config = CancelBroadcasterConfig::default();
1229 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1230 let metrics = broadcaster.get_metrics_async().await;
1231
1232 assert_eq!(metrics.total_clients, 3);
1233 assert_eq!(metrics.total_cancels, 0);
1234 assert_eq!(metrics.successful_cancels, 0);
1235 assert_eq!(metrics.failed_cancels, 0);
1236 }
1237
1238 #[tokio::test]
1239 async fn test_broadcaster_lifecycle() {
1240 let transports = vec![
1241 create_stub_transport("client-0", |_, _, _| async {
1242 Ok(create_test_report("ORDER-1"))
1243 }),
1244 create_stub_transport("client-1", |_, _, _| async {
1245 Ok(create_test_report("ORDER-1"))
1246 }),
1247 ];
1248
1249 let config = CancelBroadcasterConfig::default();
1250 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1251
1252 assert!(!broadcaster.running.load(Ordering::Relaxed));
1254
1255 let start_result = broadcaster.start().await;
1257 assert!(start_result.is_ok());
1258 assert!(broadcaster.running.load(Ordering::Relaxed));
1259
1260 let start_again = broadcaster.start().await;
1262 assert!(start_again.is_ok());
1263
1264 broadcaster.stop().await;
1266 assert!(!broadcaster.running.load(Ordering::Relaxed));
1267
1268 broadcaster.stop().await;
1270 assert!(!broadcaster.running.load(Ordering::Relaxed));
1271 }
1272
1273 #[tokio::test]
1274 async fn test_client_stats_collection() {
1275 let transports = vec![
1276 create_stub_transport("client-0", |_, _, _| async {
1277 Ok(create_test_report("ORDER-1"))
1278 }),
1279 create_stub_transport("client-1", |_, _, _| async {
1280 Ok(create_test_report("ORDER-1"))
1281 }),
1282 ];
1283
1284 let config = CancelBroadcasterConfig::default();
1285 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1286 let stats = broadcaster.get_client_stats_async().await;
1287
1288 assert_eq!(stats.len(), 2);
1289 assert_eq!(stats[0].client_id, "client-0");
1290 assert_eq!(stats[1].client_id, "client-1");
1291 assert!(stats[0].healthy); assert!(stats[1].healthy);
1293 assert_eq!(stats[0].cancel_count, 0);
1294 assert_eq!(stats[1].cancel_count, 0);
1295 assert_eq!(stats[0].error_count, 0);
1296 assert_eq!(stats[1].error_count, 0);
1297 }
1298
1299 #[tokio::test]
1300 async fn test_testnet_config_sets_base_url() {
1301 let config = CancelBroadcasterConfig {
1302 pool_size: 1,
1303 api_key: Some("test_key".to_string()),
1304 api_secret: Some("test_secret".to_string()),
1305 base_url: None,
1306 environment: BitmexEnvironment::Testnet,
1307 timeout_secs: 5,
1308 max_retries: 3,
1309 retry_delay_ms: 1_000,
1310 retry_delay_max_ms: 5_000,
1311 recv_window_ms: 10_000,
1312 max_requests_per_second: 10,
1313 max_requests_per_minute: 120,
1314 health_check_interval_secs: 60,
1315 health_check_timeout_secs: 5,
1316 expected_reject_patterns: vec![],
1317 idempotent_success_patterns: vec![],
1318 proxy_urls: vec![],
1319 };
1320
1321 let broadcaster = CancelBroadcaster::new(config);
1322 assert!(broadcaster.is_ok());
1323 }
1324
1325 #[tokio::test]
1326 async fn test_constructor_honors_default_pool_size() {
1327 let config = CancelBroadcasterConfig {
1328 api_key: Some("test_key".to_string()),
1329 api_secret: Some("test_secret".to_string()),
1330 base_url: Some("http://127.0.0.1:19999".to_string()),
1331 ..Default::default()
1332 };
1333
1334 let expected_pool = config.pool_size;
1335 let broadcaster = CancelBroadcaster::new(config).unwrap();
1336 let metrics = broadcaster.get_metrics_async().await;
1337
1338 assert_eq!(metrics.total_clients, expected_pool);
1339 }
1340
1341 #[tokio::test]
1342 async fn test_default_config() {
1343 let transports = vec![
1344 create_stub_transport("client-0", |_, _, _| async {
1345 Ok(create_test_report("ORDER-1"))
1346 }),
1347 create_stub_transport("client-1", |_, _, _| async {
1348 Ok(create_test_report("ORDER-1"))
1349 }),
1350 ];
1351
1352 let config = CancelBroadcasterConfig::default();
1353 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1354 let metrics = broadcaster.get_metrics_async().await;
1355
1356 assert_eq!(metrics.total_clients, 2);
1358 }
1359
1360 #[tokio::test]
1361 async fn test_clone_for_async() {
1362 let transports = vec![create_stub_transport("client-0", |_, _, _| async {
1363 Ok(create_test_report("ORDER-1"))
1364 })];
1365
1366 let config = CancelBroadcasterConfig::default();
1367 let broadcaster1 = CancelBroadcaster::new_with_transports(config, transports);
1368
1369 broadcaster1.total_cancels.fetch_add(1, Ordering::Relaxed);
1371
1372 let broadcaster2 = broadcaster1.clone_for_async();
1374 let metrics2 = broadcaster2.get_metrics_async().await;
1375
1376 assert_eq!(metrics2.total_cancels, 1); broadcaster2
1380 .successful_cancels
1381 .fetch_add(5, Ordering::Relaxed);
1382
1383 let metrics1 = broadcaster1.get_metrics_async().await;
1385 assert_eq!(metrics1.successful_cancels, 5);
1386 }
1387
1388 #[tokio::test]
1389 async fn test_pattern_matching() {
1390 let transports = vec![create_stub_transport("client-0", |_, _, _| async {
1392 Ok(create_test_report("ORDER-1"))
1393 })];
1394
1395 let config = CancelBroadcasterConfig {
1396 expected_reject_patterns: vec![
1397 "ParticipateDoNotInitiate".to_string(),
1398 "Close-only".to_string(),
1399 ],
1400 idempotent_success_patterns: vec![
1401 "AlreadyCanceled".to_string(),
1402 "orderID not found".to_string(),
1403 "Unable to cancel".to_string(),
1404 ],
1405 ..Default::default()
1406 };
1407
1408 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1409
1410 assert!(broadcaster.is_expected_reject("Order had execInst of ParticipateDoNotInitiate"));
1412 assert!(broadcaster.is_expected_reject("This is a Close-only order"));
1413 assert!(!broadcaster.is_expected_reject("Connection timeout"));
1414
1415 assert!(broadcaster.is_idempotent_success("AlreadyCanceled"));
1417 assert!(broadcaster.is_idempotent_success("Error: orderID not found for this account"));
1418 assert!(broadcaster.is_idempotent_success("Unable to cancel order due to existing state"));
1419 assert!(!broadcaster.is_idempotent_success("502 Bad Gateway"));
1420 }
1421
1422 #[tokio::test]
1426 async fn test_broadcast_batch_cancel_structure() {
1427 let transports = vec![
1429 create_stub_transport("client-0", |_, _, _| async {
1430 Ok(create_test_report("ORDER-1"))
1431 }),
1432 create_stub_transport("client-1", |_, _, _| async {
1433 Ok(create_test_report("ORDER-1"))
1434 }),
1435 ];
1436
1437 let config = CancelBroadcasterConfig {
1438 idempotent_success_patterns: vec!["AlreadyCanceled".to_string()],
1439 ..Default::default()
1440 };
1441
1442 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1443 let metrics = broadcaster.get_metrics_async().await;
1444
1445 assert_eq!(metrics.total_clients, 2);
1447 assert_eq!(metrics.total_cancels, 0);
1448 assert_eq!(metrics.successful_cancels, 0);
1449 assert_eq!(metrics.failed_cancels, 0);
1450 }
1451
1452 #[tokio::test]
1453 async fn test_broadcast_cancel_all_structure() {
1454 let transports = vec![
1456 create_stub_transport("client-0", |_, _, _| async {
1457 Ok(create_test_report("ORDER-1"))
1458 }),
1459 create_stub_transport("client-1", |_, _, _| async {
1460 Ok(create_test_report("ORDER-1"))
1461 }),
1462 create_stub_transport("client-2", |_, _, _| async {
1463 Ok(create_test_report("ORDER-1"))
1464 }),
1465 ];
1466
1467 let config = CancelBroadcasterConfig {
1468 idempotent_success_patterns: vec!["orderID not found".to_string()],
1469 ..Default::default()
1470 };
1471
1472 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1473 let metrics = broadcaster.get_metrics_async().await;
1474
1475 assert_eq!(metrics.total_clients, 3);
1477 assert_eq!(metrics.healthy_clients, 3);
1478 assert_eq!(metrics.total_cancels, 0);
1479 }
1480
1481 #[tokio::test]
1483 async fn test_single_cancel_metrics_with_mixed_responses() {
1484 let transports = vec![
1487 create_stub_transport("client-0", |_, _, _| async {
1488 anyhow::bail!("Connection timeout")
1489 }),
1490 create_stub_transport("client-1", |_, _, _| async {
1491 anyhow::bail!("AlreadyCanceled")
1492 }),
1493 ];
1494
1495 let config = CancelBroadcasterConfig::default();
1496 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1497
1498 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1499 let result = broadcaster
1500 .broadcast_cancel(instrument_id, Some(ClientOrderId::from("O-123")), None)
1501 .await;
1502
1503 assert!(result.is_ok());
1505 assert!(result.unwrap().is_none());
1506
1507 let metrics = broadcaster.get_metrics_async().await;
1509 assert_eq!(
1510 metrics.failed_cancels, 0,
1511 "Idempotent success should not increment failed_cancels"
1512 );
1513 assert_eq!(metrics.idempotent_successes, 1);
1514 assert_eq!(metrics.successful_cancels, 0);
1515 }
1516
1517 #[tokio::test]
1518 async fn test_metrics_initialization_and_health() {
1519 let transports = vec![
1521 create_stub_transport("client-0", |_, _, _| async {
1522 Ok(create_test_report("ORDER-1"))
1523 }),
1524 create_stub_transport("client-1", |_, _, _| async {
1525 Ok(create_test_report("ORDER-1"))
1526 }),
1527 create_stub_transport("client-2", |_, _, _| async {
1528 Ok(create_test_report("ORDER-1"))
1529 }),
1530 create_stub_transport("client-3", |_, _, _| async {
1531 Ok(create_test_report("ORDER-1"))
1532 }),
1533 ];
1534
1535 let config = CancelBroadcasterConfig::default();
1536 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1537 let metrics = broadcaster.get_metrics_async().await;
1538
1539 assert_eq!(metrics.total_cancels, 0);
1541 assert_eq!(metrics.successful_cancels, 0);
1542 assert_eq!(metrics.failed_cancels, 0);
1543 assert_eq!(metrics.expected_rejects, 0);
1544 assert_eq!(metrics.idempotent_successes, 0);
1545
1546 assert_eq!(metrics.healthy_clients, 4);
1548 assert_eq!(metrics.total_clients, 4);
1549 }
1550
1551 #[tokio::test]
1553 async fn test_health_check_task_lifecycle() {
1554 let transports = vec![create_stub_transport("client-0", |_, _, _| async {
1555 Ok(create_test_report("ORDER-1"))
1556 })];
1557
1558 let config = CancelBroadcasterConfig {
1559 health_check_interval_secs: 1, health_check_timeout_secs: 1,
1561 ..Default::default()
1562 };
1563
1564 let broadcaster = CancelBroadcaster::new_with_transports(config, transports);
1565
1566 broadcaster.start().await.unwrap();
1568 assert!(broadcaster.running.load(Ordering::Relaxed));
1569
1570 {
1572 let task_guard = broadcaster.health_check_task.read().await;
1573 assert!(task_guard.is_some());
1574 }
1575
1576 broadcaster.stop().await;
1578 assert!(!broadcaster.running.load(Ordering::Relaxed));
1579
1580 {
1582 let task_guard = broadcaster.health_check_task.read().await;
1583 assert!(task_guard.is_none());
1584 }
1585 }
1586}