1use std::{
37 fmt::Debug,
38 future::Future,
39 pin::Pin,
40 sync::{
41 Arc,
42 atomic::{AtomicBool, AtomicU64, Ordering},
43 },
44 time::Duration,
45};
46
47use futures_util::future;
48use nautilus_common::live::get_runtime;
49use nautilus_model::{
50 enums::{ContingencyType, OrderSide, OrderType, TimeInForce, TrailingOffsetType, TriggerType},
51 identifiers::{ClientOrderId, InstrumentId, OrderListId},
52 instruments::InstrumentAny,
53 reports::OrderStatusReport,
54 types::{Price, Quantity},
55};
56use tokio::{sync::RwLock, task::JoinHandle, time::interval};
57
58use crate::{
59 common::{
60 consts::BITMEX_HTTP_TESTNET_URL,
61 enums::{BitmexEnvironment, BitmexPegPriceType},
62 },
63 http::client::BitmexHttpClient,
64};
65
/// Abstraction over a client that can submit orders, so the broadcaster can be
/// driven either by the real HTTP client or by a stub in tests.
///
/// Every asynchronous method returns a boxed, pinned future instead of being an
/// `async fn`, which keeps the trait usable behind `Arc<dyn SubmitExecutor>`.
trait SubmitExecutor: Send + Sync {
    /// Hands an instrument definition to the executor.
    fn add_instrument(&self, instrument: InstrumentAny);

    /// Probes the executor; the future resolves to `Ok(())` when the probe
    /// succeeded and to an error otherwise.
    fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>>;

    /// Submits one order described by the arguments and resolves to the
    /// resulting order status report, or to an error if the submission failed.
    #[expect(clippy::too_many_arguments)]
    fn submit_order(
        &self,
        instrument_id: InstrumentId,
        client_order_id: ClientOrderId,
        order_side: OrderSide,
        order_type: OrderType,
        quantity: Quantity,
        time_in_force: TimeInForce,
        price: Option<Price>,
        trigger_price: Option<Price>,
        trigger_type: Option<TriggerType>,
        trailing_offset: Option<f64>,
        trailing_offset_type: Option<TrailingOffsetType>,
        display_qty: Option<Quantity>,
        post_only: bool,
        reduce_only: bool,
        order_list_id: Option<OrderListId>,
        contingency_type: Option<ContingencyType>,
        peg_price_type: Option<BitmexPegPriceType>,
        peg_offset_value: Option<f64>,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>;
}
118
119impl SubmitExecutor for BitmexHttpClient {
120 fn add_instrument(&self, instrument: InstrumentAny) {
121 Self::cache_instrument(self, instrument);
122 }
123
124 fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
125 Box::pin(async move {
126 Self::get_server_time(self)
127 .await
128 .map(|_| ())
129 .map_err(|e| anyhow::anyhow!("{e}"))
130 })
131 }
132
133 fn submit_order(
134 &self,
135 instrument_id: InstrumentId,
136 client_order_id: ClientOrderId,
137 order_side: OrderSide,
138 order_type: OrderType,
139 quantity: Quantity,
140 time_in_force: TimeInForce,
141 price: Option<Price>,
142 trigger_price: Option<Price>,
143 trigger_type: Option<TriggerType>,
144 trailing_offset: Option<f64>,
145 trailing_offset_type: Option<TrailingOffsetType>,
146 display_qty: Option<Quantity>,
147 post_only: bool,
148 reduce_only: bool,
149 order_list_id: Option<OrderListId>,
150 contingency_type: Option<ContingencyType>,
151 peg_price_type: Option<BitmexPegPriceType>,
152 peg_offset_value: Option<f64>,
153 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
154 Box::pin(async move {
155 Self::submit_order(
156 self,
157 instrument_id,
158 client_order_id,
159 order_side,
160 order_type,
161 quantity,
162 time_in_force,
163 price,
164 trigger_price,
165 trigger_type,
166 trailing_offset,
167 trailing_offset_type,
168 display_qty,
169 post_only,
170 reduce_only,
171 order_list_id,
172 contingency_type,
173 peg_price_type,
174 peg_offset_value,
175 )
176 .await
177 })
178 }
179}
180
/// Settings for [`SubmitBroadcaster`].
///
/// Most fields are handed unchanged to `BitmexHttpClient::with_credentials` for
/// every client in the pool; their precise semantics are defined by that client.
#[derive(Debug, Clone)]
pub struct SubmitBroadcasterConfig {
    /// Number of HTTP clients created by `SubmitBroadcaster::new`; also the
    /// upper bound on how many transports a single broadcast will use.
    pub pool_size: usize,
    /// API key passed to every HTTP client.
    pub api_key: Option<String>,
    /// API secret passed to every HTTP client.
    pub api_secret: Option<String>,
    /// Base URL override. When `None` and `environment` is `Testnet`, the
    /// testnet URL constant is used instead.
    pub base_url: Option<String>,
    /// Target environment; used to choose the default base URL (see `base_url`).
    pub environment: BitmexEnvironment,
    /// Request timeout forwarded to the HTTP client (presumably seconds, per the name).
    pub timeout_secs: u64,
    /// Retry limit forwarded to the HTTP client.
    pub max_retries: u32,
    /// Retry delay forwarded to the HTTP client (presumably the initial delay, in ms).
    pub retry_delay_ms: u64,
    /// Maximum retry delay forwarded to the HTTP client (presumably ms).
    pub retry_delay_max_ms: u64,
    /// Receive window forwarded to the HTTP client (presumably ms).
    pub recv_window_ms: u64,
    /// Per-second request limit forwarded to the HTTP client.
    pub max_requests_per_second: u32,
    /// Per-minute request limit forwarded to the HTTP client.
    pub max_requests_per_minute: u32,
    /// Period, in seconds, of the background health check task.
    pub health_check_interval_secs: u64,
    /// Timeout, in seconds, applied to each individual health check.
    pub health_check_timeout_secs: u64,
    /// Substrings identifying "expected" rejections: an error message containing
    /// any of them is counted in `expected_rejects` and logged at debug level.
    pub expected_reject_patterns: Vec<String>,
    /// Proxy URL per client: the entry at index `i` is used for the `i`-th
    /// client; a missing entry or `None` means no proxy is passed for it.
    pub proxy_urls: Vec<Option<String>>,
}
221
222impl Default for SubmitBroadcasterConfig {
223 fn default() -> Self {
224 Self {
225 pool_size: 3,
226 api_key: None,
227 api_secret: None,
228 base_url: None,
229 environment: BitmexEnvironment::Mainnet,
230 timeout_secs: 60,
231 max_retries: 3,
232 retry_delay_ms: 1_000,
233 retry_delay_max_ms: 5_000,
234 recv_window_ms: 10_000,
235 max_requests_per_second: 10,
236 max_requests_per_minute: 120,
237 health_check_interval_secs: 30,
238 health_check_timeout_secs: 5,
239 expected_reject_patterns: vec!["Duplicate clOrdID".to_string()],
240 proxy_urls: vec![],
241 }
242 }
243}
244
/// One member of the broadcaster's pool: an executor plus its health flag and
/// per-client counters.
///
/// All state sits behind `Arc`s, so a clone shares the executor, flag and
/// counters with the value it was cloned from.
#[derive(Clone)]
struct TransportClient {
    /// The executor that performs submissions and health probes.
    executor: Arc<dyn SubmitExecutor>,
    /// Identifier used in log messages and in `ClientStats`.
    client_id: String,
    /// Result of the most recent health probe; also set back to `true` by a
    /// successful submission.
    healthy: Arc<AtomicBool>,
    /// Number of submissions attempted through this transport.
    submit_count: Arc<AtomicU64>,
    /// Number of those submissions that returned an error.
    error_count: Arc<AtomicU64>,
}
258
259impl Debug for TransportClient {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 f.debug_struct(stringify!(TransportClient))
262 .field("client_id", &self.client_id)
263 .field("healthy", &self.healthy)
264 .field("submit_count", &self.submit_count)
265 .field("error_count", &self.error_count)
266 .finish()
267 }
268}
269
270impl TransportClient {
271 fn new<E: SubmitExecutor + 'static>(executor: E, client_id: String) -> Self {
272 Self {
273 executor: Arc::new(executor),
274 client_id,
275 healthy: Arc::new(AtomicBool::new(true)),
276 submit_count: Arc::new(AtomicU64::new(0)),
277 error_count: Arc::new(AtomicU64::new(0)),
278 }
279 }
280
281 fn is_healthy(&self) -> bool {
282 self.healthy.load(Ordering::Relaxed)
283 }
284
285 fn mark_healthy(&self) {
286 self.healthy.store(true, Ordering::Relaxed);
287 }
288
289 fn mark_unhealthy(&self) {
290 self.healthy.store(false, Ordering::Relaxed);
291 }
292
293 fn get_submit_count(&self) -> u64 {
294 self.submit_count.load(Ordering::Relaxed)
295 }
296
297 fn get_error_count(&self) -> u64 {
298 self.error_count.load(Ordering::Relaxed)
299 }
300
301 async fn health_check(&self, timeout_secs: u64) -> bool {
302 match tokio::time::timeout(
303 Duration::from_secs(timeout_secs),
304 self.executor.health_check(),
305 )
306 .await
307 {
308 Ok(Ok(())) => {
309 self.mark_healthy();
310 true
311 }
312 Ok(Err(e)) => {
313 log::warn!("Health check failed for client {}: {e:?}", self.client_id);
314 self.mark_unhealthy();
315 false
316 }
317 Err(_) => {
318 log::warn!("Health check timeout for client {}", self.client_id);
319 self.mark_unhealthy();
320 false
321 }
322 }
323 }
324
325 #[expect(clippy::too_many_arguments)]
326 async fn submit_order(
327 &self,
328 instrument_id: InstrumentId,
329 client_order_id: ClientOrderId,
330 order_side: OrderSide,
331 order_type: OrderType,
332 quantity: Quantity,
333 time_in_force: TimeInForce,
334 price: Option<Price>,
335 trigger_price: Option<Price>,
336 trigger_type: Option<TriggerType>,
337 trailing_offset: Option<f64>,
338 trailing_offset_type: Option<TrailingOffsetType>,
339 display_qty: Option<Quantity>,
340 post_only: bool,
341 reduce_only: bool,
342 order_list_id: Option<OrderListId>,
343 contingency_type: Option<ContingencyType>,
344 peg_price_type: Option<BitmexPegPriceType>,
345 peg_offset_value: Option<f64>,
346 ) -> anyhow::Result<OrderStatusReport> {
347 self.submit_count.fetch_add(1, Ordering::Relaxed);
348
349 match self
350 .executor
351 .submit_order(
352 instrument_id,
353 client_order_id,
354 order_side,
355 order_type,
356 quantity,
357 time_in_force,
358 price,
359 trigger_price,
360 trigger_type,
361 trailing_offset,
362 trailing_offset_type,
363 display_qty,
364 post_only,
365 reduce_only,
366 order_list_id,
367 contingency_type,
368 peg_price_type,
369 peg_offset_value,
370 )
371 .await
372 {
373 Ok(report) => {
374 self.mark_healthy();
375 Ok(report)
376 }
377 Err(e) => {
378 self.error_count.fetch_add(1, Ordering::Relaxed);
379 Err(e)
380 }
381 }
382 }
383}
384
/// Sends one order submission to several transports concurrently and returns
/// the first success, while tracking transport health and aggregate counters.
///
/// All mutable state sits behind `Arc`s so that `clone_for_async` can produce a
/// second handle sharing the same transports, counters and health task.
#[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
#[derive(Debug)]
pub struct SubmitBroadcaster {
    /// Settings the broadcaster was built with.
    config: SubmitBroadcasterConfig,
    /// The pool of transports orders are broadcast to.
    transports: Arc<Vec<TransportClient>>,
    /// Handle of the periodic health check task, present while one is stored by `start`.
    health_check_task: Arc<RwLock<Option<JoinHandle<()>>>>,
    /// Set by `start`, cleared by `stop`; the health check task exits when it sees it cleared.
    running: Arc<AtomicBool>,
    /// Number of `broadcast_submit` calls.
    total_submits: Arc<AtomicU64>,
    /// Number of broadcasts in which some transport succeeded.
    successful_submits: Arc<AtomicU64>,
    /// Number of broadcasts in which no transport succeeded (or none was available).
    failed_submits: Arc<AtomicU64>,
    /// Number of individual transport errors matching an expected-reject pattern.
    expected_rejects: Arc<AtomicU64>,
}
406
407impl SubmitBroadcaster {
408 pub fn new(config: SubmitBroadcasterConfig) -> anyhow::Result<Self> {
414 let mut transports = Vec::with_capacity(config.pool_size);
415
416 let base_url = match config.environment {
417 BitmexEnvironment::Testnet if config.base_url.is_none() => {
418 Some(BITMEX_HTTP_TESTNET_URL.to_string())
419 }
420 _ => config.base_url.clone(),
421 };
422
423 for i in 0..config.pool_size {
424 let proxy_url = config.proxy_urls.get(i).and_then(|p| p.clone());
426
427 let client = BitmexHttpClient::with_credentials(
428 config.api_key.clone(),
429 config.api_secret.clone(),
430 base_url.clone(),
431 config.timeout_secs,
432 config.max_retries,
433 config.retry_delay_ms,
434 config.retry_delay_max_ms,
435 config.recv_window_ms,
436 config.max_requests_per_second,
437 config.max_requests_per_minute,
438 proxy_url,
439 )
440 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client {i}: {e}"))?;
441
442 transports.push(TransportClient::new(client, format!("bitmex-submit-{i}")));
443 }
444
445 Ok(Self {
446 config,
447 transports: Arc::new(transports),
448 health_check_task: Arc::new(RwLock::new(None)),
449 running: Arc::new(AtomicBool::new(false)),
450 total_submits: Arc::new(AtomicU64::new(0)),
451 successful_submits: Arc::new(AtomicU64::new(0)),
452 failed_submits: Arc::new(AtomicU64::new(0)),
453 expected_rejects: Arc::new(AtomicU64::new(0)),
454 })
455 }
456
457 pub async fn start(&self) -> anyhow::Result<()> {
463 if self.running.load(Ordering::Relaxed) {
464 return Ok(());
465 }
466
467 self.running.store(true, Ordering::Relaxed);
468
469 self.run_health_checks().await;
471
472 let transports = Arc::clone(&self.transports);
474 let running = Arc::clone(&self.running);
475 let interval_secs = self.config.health_check_interval_secs;
476 let timeout_secs = self.config.health_check_timeout_secs;
477
478 let task = get_runtime().spawn(async move {
479 let mut ticker = interval(Duration::from_secs(interval_secs));
480 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
481
482 loop {
483 ticker.tick().await;
484
485 if !running.load(Ordering::Relaxed) {
486 break;
487 }
488
489 let tasks: Vec<_> = transports
490 .iter()
491 .map(|t| t.health_check(timeout_secs))
492 .collect();
493
494 let results = future::join_all(tasks).await;
495 let healthy_count = results.iter().filter(|&&r| r).count();
496
497 log::debug!(
498 "Health check complete: {healthy_count}/{} clients healthy",
499 results.len()
500 );
501 }
502 });
503
504 *self.health_check_task.write().await = Some(task);
505
506 log::info!(
507 "SubmitBroadcaster started with {} clients",
508 self.transports.len()
509 );
510
511 Ok(())
512 }
513
514 pub async fn stop(&self) {
516 if !self.running.load(Ordering::Relaxed) {
517 return;
518 }
519
520 self.running.store(false, Ordering::Relaxed);
521
522 if let Some(task) = self.health_check_task.write().await.take() {
523 task.abort();
524 }
525
526 log::info!("SubmitBroadcaster stopped");
527 }
528
529 async fn run_health_checks(&self) {
530 let tasks: Vec<_> = self
531 .transports
532 .iter()
533 .map(|t| t.health_check(self.config.health_check_timeout_secs))
534 .collect();
535
536 let results = future::join_all(tasks).await;
537 let healthy_count = results.iter().filter(|&&r| r).count();
538
539 log::debug!(
540 "Health check complete: {healthy_count}/{} clients healthy",
541 results.len()
542 );
543 }
544
545 fn is_expected_reject(&self, error_message: &str) -> bool {
546 self.config
547 .expected_reject_patterns
548 .iter()
549 .any(|pattern| error_message.contains(pattern))
550 }
551
552 async fn process_submit_results<T>(
556 &self,
557 mut handles: Vec<JoinHandle<(String, anyhow::Result<T>)>>,
558 operation: &str,
559 params: String,
560 ) -> anyhow::Result<T>
561 where
562 T: Send + 'static,
563 {
564 let mut errors = Vec::new();
565 let mut all_duplicate_clordid = true;
566
567 while !handles.is_empty() {
568 let current_handles = std::mem::take(&mut handles);
569 let (result, _idx, remaining) = future::select_all(current_handles).await;
570 handles = remaining.into_iter().collect();
571
572 match result {
573 Ok((client_id, Ok(result))) => {
574 for handle in &handles {
576 handle.abort();
577 }
578 self.successful_submits.fetch_add(1, Ordering::Relaxed);
579 log::debug!("{operation} broadcast succeeded [{client_id}] {params}",);
580 return Ok(result);
581 }
582 Ok((client_id, Err(e))) => {
583 let error_msg = e.to_string();
584 let is_duplicate = error_msg.contains("Duplicate clOrdID");
585
586 if !is_duplicate {
587 all_duplicate_clordid = false;
588 }
589
590 if self.is_expected_reject(&error_msg) {
591 self.expected_rejects.fetch_add(1, Ordering::Relaxed);
592 log::debug!(
593 "Expected {} rejection [{client_id}]: {error_msg} {params}",
594 operation.to_lowercase(),
595 );
596 errors.push(error_msg);
597 } else {
598 log::warn!(
599 "{operation} request failed [{client_id}]: {error_msg} {params}",
600 );
601 errors.push(error_msg);
602 }
603 }
604 Err(e) => {
605 all_duplicate_clordid = false;
606 log::warn!("{operation} task join error: {e:?}");
607 errors.push(format!("Task panicked: {e:?}"));
608 }
609 }
610 }
611
612 self.failed_submits.fetch_add(1, Ordering::Relaxed);
614
615 if all_duplicate_clordid && !errors.is_empty() {
618 log::warn!(
619 "All {} requests returned 'Duplicate clOrdID' - order likely exists {params}",
620 operation.to_lowercase(),
621 );
622 anyhow::bail!("IDEMPOTENT_DUPLICATE: Order likely exists but confirmation was lost");
623 }
624
625 log::error!(
626 "All {} requests failed: {errors:?} {params}",
627 operation.to_lowercase(),
628 );
629 Err(anyhow::anyhow!(
630 "All {} requests failed: {:?}",
631 operation.to_lowercase(),
632 errors
633 ))
634 }
635
636 #[expect(clippy::too_many_arguments)]
647 pub async fn broadcast_submit(
648 &self,
649 instrument_id: InstrumentId,
650 client_order_id: ClientOrderId,
651 order_side: OrderSide,
652 order_type: OrderType,
653 quantity: Quantity,
654 time_in_force: TimeInForce,
655 price: Option<Price>,
656 trigger_price: Option<Price>,
657 trigger_type: Option<TriggerType>,
658 trailing_offset: Option<f64>,
659 trailing_offset_type: Option<TrailingOffsetType>,
660 display_qty: Option<Quantity>,
661 post_only: bool,
662 reduce_only: bool,
663 order_list_id: Option<OrderListId>,
664 contingency_type: Option<ContingencyType>,
665 submit_tries: Option<usize>,
666 peg_price_type: Option<BitmexPegPriceType>,
667 peg_offset_value: Option<f64>,
668 ) -> anyhow::Result<OrderStatusReport> {
669 self.total_submits.fetch_add(1, Ordering::Relaxed);
670
671 let pool_size = self.config.pool_size;
672 let actual_tries = if let Some(t) = submit_tries {
673 if t > pool_size {
674 log::warn!("submit_tries={t} exceeds pool_size={pool_size}, capping at pool_size");
676 }
677 std::cmp::min(t, pool_size)
678 } else {
679 pool_size
680 };
681
682 log::debug!(
683 "Submit broadcast requested for client_order_id={client_order_id} (tries={actual_tries}/{pool_size})",
684 );
685
686 let healthy_transports: Vec<TransportClient> = self
687 .transports
688 .iter()
689 .filter(|t| t.is_healthy())
690 .take(actual_tries)
691 .cloned()
692 .collect();
693
694 if healthy_transports.is_empty() {
695 self.failed_submits.fetch_add(1, Ordering::Relaxed);
696 anyhow::bail!("No healthy transport clients available");
697 }
698
699 log::debug!(
700 "Broadcasting submit to {} clients: client_order_id={client_order_id}, instrument_id={instrument_id}",
701 healthy_transports.len(),
702 );
703
704 let mut handles = Vec::new();
705
706 for transport in healthy_transports {
707 let handle = get_runtime().spawn(async move {
710 let client_id = transport.client_id.clone();
711 let result = transport
712 .submit_order(
713 instrument_id,
714 client_order_id,
715 order_side,
716 order_type,
717 quantity,
718 time_in_force,
719 price,
720 trigger_price,
721 trigger_type,
722 trailing_offset,
723 trailing_offset_type,
724 display_qty,
725 post_only,
726 reduce_only,
727 order_list_id,
728 contingency_type,
729 peg_price_type,
730 peg_offset_value,
731 )
732 .await;
733 (client_id, result)
734 });
735 handles.push(handle);
736 }
737
738 self.process_submit_results(
739 handles,
740 "Submit",
741 format!("(client_order_id={client_order_id:?})"),
742 )
743 .await
744 }
745
746 pub fn get_metrics(&self) -> BroadcasterMetrics {
748 let healthy_clients = self.transports.iter().filter(|t| t.is_healthy()).count();
749 let total_clients = self.transports.len();
750
751 BroadcasterMetrics {
752 total_submits: self.total_submits.load(Ordering::Relaxed),
753 successful_submits: self.successful_submits.load(Ordering::Relaxed),
754 failed_submits: self.failed_submits.load(Ordering::Relaxed),
755 expected_rejects: self.expected_rejects.load(Ordering::Relaxed),
756 healthy_clients,
757 total_clients,
758 }
759 }
760
    /// Async wrapper around [`Self::get_metrics`]; it performs no awaiting of
    /// its own and returns the same snapshot.
    pub async fn get_metrics_async(&self) -> BroadcasterMetrics {
        self.get_metrics()
    }
765
766 pub fn get_client_stats(&self) -> Vec<ClientStats> {
768 self.transports
769 .iter()
770 .map(|t| ClientStats {
771 client_id: t.client_id.clone(),
772 healthy: t.is_healthy(),
773 submit_count: t.get_submit_count(),
774 error_count: t.get_error_count(),
775 })
776 .collect()
777 }
778
    /// Async wrapper around [`Self::get_client_stats`]; it performs no awaiting
    /// of its own and returns the same list.
    pub async fn get_client_stats_async(&self) -> Vec<ClientStats> {
        self.get_client_stats()
    }
783
784 pub fn cache_instrument(&self, instrument: &InstrumentAny) {
786 for transport in self.transports.iter() {
787 transport.executor.add_instrument(instrument.clone());
788 }
789 }
790
791 #[must_use]
792 pub fn clone_for_async(&self) -> Self {
793 Self {
794 config: self.config.clone(),
795 transports: Arc::clone(&self.transports),
796 health_check_task: Arc::clone(&self.health_check_task),
797 running: Arc::clone(&self.running),
798 total_submits: Arc::clone(&self.total_submits),
799 successful_submits: Arc::clone(&self.successful_submits),
800 failed_submits: Arc::clone(&self.failed_submits),
801 expected_rejects: Arc::clone(&self.expected_rejects),
802 }
803 }
804
805 #[cfg(test)]
806 fn new_with_transports(
807 config: SubmitBroadcasterConfig,
808 transports: Vec<TransportClient>,
809 ) -> Self {
810 Self {
811 config,
812 transports: Arc::new(transports),
813 health_check_task: Arc::new(RwLock::new(None)),
814 running: Arc::new(AtomicBool::new(false)),
815 total_submits: Arc::new(AtomicU64::new(0)),
816 successful_submits: Arc::new(AtomicU64::new(0)),
817 failed_submits: Arc::new(AtomicU64::new(0)),
818 expected_rejects: Arc::new(AtomicU64::new(0)),
819 }
820 }
821}
822
/// Point-in-time snapshot of the broadcaster's aggregate counters.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared directly (for example
/// in assertions) and `Default` for an all-zero snapshot.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BroadcasterMetrics {
    /// Number of broadcast submit calls.
    pub total_submits: u64,
    /// Number of broadcasts in which some transport succeeded.
    pub successful_submits: u64,
    /// Number of broadcasts in which no transport succeeded.
    pub failed_submits: u64,
    /// Number of individual transport errors that matched an expected-reject pattern.
    pub expected_rejects: u64,
    /// Number of transports flagged healthy when the snapshot was taken.
    pub healthy_clients: usize,
    /// Total number of transports in the pool.
    pub total_clients: usize,
}
833
/// Point-in-time statistics for a single transport client.
///
/// Derives `PartialEq`/`Eq` so entries can be compared directly (for example in
/// assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientStats {
    /// Identifier of the transport client.
    pub client_id: String,
    /// Whether the client was flagged healthy when the statistics were read.
    pub healthy: bool,
    /// Number of submissions attempted through the client.
    pub submit_count: u64,
    /// Number of those submissions that returned an error.
    pub error_count: u64,
}
842
843#[cfg(test)]
844mod tests {
845 use std::{str::FromStr, sync::atomic::Ordering, time::Duration};
846
847 use nautilus_core::UUID4;
848 use nautilus_model::{
849 enums::{
850 ContingencyType, OrderSide, OrderStatus, OrderType, TimeInForce, TrailingOffsetType,
851 },
852 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
853 reports::OrderStatusReport,
854 types::{Price, Quantity},
855 };
856
857 use super::*;
858
    /// Test double for [`SubmitExecutor`] whose submit behaviour is supplied by
    /// a closure.
    #[derive(Clone)]
    #[expect(clippy::type_complexity)]
    struct MockExecutor {
        /// Called once per `submit_order`; builds the future that produces the
        /// canned outcome.
        handler: Arc<
            dyn Fn() -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send>>
                + Send
                + Sync,
        >,
    }
869
    impl MockExecutor {
        /// Wraps `handler` so that every call boxes and pins the future it
        /// returns, matching the type stored in the `handler` field.
        fn new<F, Fut>(handler: F) -> Self
        where
            F: Fn() -> Fut + Send + Sync + 'static,
            Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
        {
            Self {
                handler: Arc::new(move || Box::pin(handler())),
            }
        }
    }
881
    /// Stub implementation: health checks always pass, submissions delegate to
    /// the stored handler, and instruments are ignored.
    impl SubmitExecutor for MockExecutor {
        /// Always resolves to `Ok(())`.
        fn health_check(&self) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
            Box::pin(async { Ok(()) })
        }

        /// Ignores every argument and returns the future built by the handler.
        fn submit_order(
            &self,
            _instrument_id: InstrumentId,
            _client_order_id: ClientOrderId,
            _order_side: OrderSide,
            _order_type: OrderType,
            _quantity: Quantity,
            _time_in_force: TimeInForce,
            _price: Option<Price>,
            _trigger_price: Option<Price>,
            _trigger_type: Option<TriggerType>,
            _trailing_offset: Option<f64>,
            _trailing_offset_type: Option<TrailingOffsetType>,
            _display_qty: Option<Quantity>,
            _post_only: bool,
            _reduce_only: bool,
            _order_list_id: Option<OrderListId>,
            _contingency_type: Option<ContingencyType>,
            _peg_price_type: Option<BitmexPegPriceType>,
            _peg_offset_value: Option<f64>,
        ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>> {
            (self.handler)()
        }

        /// Intentionally does nothing: the mock keeps no instrument state.
        fn add_instrument(&self, _instrument: InstrumentAny) {
        }
    }
915
    /// Builds an accepted buy-limit order report for `XBTUSD.BITMEX` carrying
    /// the given venue order id; every optional field is left empty.
    fn create_test_report(venue_order_id: &str) -> OrderStatusReport {
        OrderStatusReport {
            account_id: AccountId::from("BITMEX-001"),
            instrument_id: InstrumentId::from_str("XBTUSD.BITMEX").unwrap(),
            venue_order_id: VenueOrderId::from(venue_order_id),
            order_side: OrderSide::Buy,
            order_type: OrderType::Limit,
            time_in_force: TimeInForce::Gtc,
            order_status: OrderStatus::Accepted,
            price: Some(Price::new(50000.0, 2)),
            quantity: Quantity::new(100.0, 0),
            filled_qty: Quantity::new(0.0, 0),
            report_id: UUID4::new(),
            ts_accepted: 0.into(),
            ts_last: 0.into(),
            ts_init: 0.into(),
            client_order_id: None,
            avg_px: None,
            trigger_price: None,
            trigger_type: None,
            contingency_type: ContingencyType::NoContingency,
            expire_time: None,
            order_list_id: None,
            venue_position_id: None,
            linked_order_ids: None,
            parent_order_id: None,
            display_qty: None,
            limit_offset: None,
            trailing_offset: None,
            trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
            post_only: false,
            reduce_only: false,
            cancel_reason: None,
            ts_triggered: None,
        }
    }
952
953 fn create_stub_transport<F, Fut>(client_id: &str, handler: F) -> TransportClient
954 where
955 F: Fn() -> Fut + Send + Sync + 'static,
956 Fut: Future<Output = anyhow::Result<OrderStatusReport>> + Send + 'static,
957 {
958 let executor = MockExecutor::new(handler);
959 TransportClient::new(executor, client_id.to_string())
960 }
961
962 #[tokio::test]
963 async fn test_broadcast_submit_immediate_success() {
964 let report = create_test_report("ORDER-1");
965 let report_clone = report.clone();
966
967 let transports = vec![
968 create_stub_transport("client-0", move || {
969 let report = report_clone.clone();
970 async move { Ok(report) }
971 }),
972 create_stub_transport("client-1", || async {
973 tokio::time::sleep(Duration::from_secs(10)).await;
974 anyhow::bail!("Should be aborted")
975 }),
976 ];
977
978 let config = SubmitBroadcasterConfig::default();
979 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
980
981 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
982 let result = broadcaster
983 .broadcast_submit(
984 instrument_id,
985 ClientOrderId::from("O-123"),
986 OrderSide::Buy,
987 OrderType::Limit,
988 Quantity::new(100.0, 0),
989 TimeInForce::Gtc,
990 Some(Price::new(50000.0, 2)),
991 None,
992 None,
993 None,
994 None,
995 None,
996 false,
997 false,
998 None,
999 None,
1000 None,
1001 None,
1002 None,
1003 )
1004 .await;
1005
1006 assert!(result.is_ok());
1007 let returned_report = result.unwrap();
1008 assert_eq!(returned_report.venue_order_id, report.venue_order_id);
1009
1010 let metrics = broadcaster.get_metrics_async().await;
1011 assert_eq!(metrics.successful_submits, 1);
1012 assert_eq!(metrics.failed_submits, 0);
1013 assert_eq!(metrics.total_submits, 1);
1014 }
1015
1016 #[tokio::test]
1017 async fn test_broadcast_submit_duplicate_clordid_expected() {
1018 let transports = vec![
1019 create_stub_transport("client-0", || async { anyhow::bail!("Duplicate clOrdID") }),
1020 create_stub_transport("client-1", || async {
1021 tokio::time::sleep(Duration::from_secs(10)).await;
1022 anyhow::bail!("Should be aborted")
1023 }),
1024 ];
1025
1026 let config = SubmitBroadcasterConfig::default();
1027 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1028
1029 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1030 let result = broadcaster
1031 .broadcast_submit(
1032 instrument_id,
1033 ClientOrderId::from("O-123"),
1034 OrderSide::Buy,
1035 OrderType::Limit,
1036 Quantity::new(100.0, 0),
1037 TimeInForce::Gtc,
1038 Some(Price::new(50000.0, 2)),
1039 None,
1040 None,
1041 None,
1042 None,
1043 None,
1044 false,
1045 false,
1046 None,
1047 None,
1048 None,
1049 None,
1050 None,
1051 )
1052 .await;
1053
1054 assert!(result.is_err());
1055
1056 let metrics = broadcaster.get_metrics_async().await;
1057 assert_eq!(metrics.expected_rejects, 1);
1058 assert_eq!(metrics.successful_submits, 0);
1059 assert_eq!(metrics.failed_submits, 1);
1060 }
1061
1062 #[tokio::test]
1063 async fn test_broadcast_submit_all_failures() {
1064 let transports = vec![
1065 create_stub_transport("client-0", || async { anyhow::bail!("502 Bad Gateway") }),
1066 create_stub_transport("client-1", || async { anyhow::bail!("Connection refused") }),
1067 ];
1068
1069 let config = SubmitBroadcasterConfig::default();
1070 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1071
1072 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1073 let result = broadcaster
1074 .broadcast_submit(
1075 instrument_id,
1076 ClientOrderId::from("O-456"),
1077 OrderSide::Sell,
1078 OrderType::Market,
1079 Quantity::new(50.0, 0),
1080 TimeInForce::Ioc,
1081 None,
1082 None,
1083 None,
1084 None,
1085 None,
1086 None,
1087 false,
1088 false,
1089 None,
1090 None,
1091 None,
1092 None,
1093 None,
1094 )
1095 .await;
1096
1097 assert!(result.is_err());
1098 assert!(
1099 result
1100 .unwrap_err()
1101 .to_string()
1102 .contains("All submit requests failed")
1103 );
1104
1105 let metrics = broadcaster.get_metrics_async().await;
1106 assert_eq!(metrics.failed_submits, 1);
1107 assert_eq!(metrics.successful_submits, 0);
1108 }
1109
1110 #[tokio::test]
1111 async fn test_broadcast_submit_no_healthy_clients() {
1112 let transport =
1113 create_stub_transport("client-0", || async { Ok(create_test_report("ORDER-1")) });
1114 transport.healthy.store(false, Ordering::Relaxed);
1115
1116 let config = SubmitBroadcasterConfig::default();
1117 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![transport]);
1118
1119 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1120 let result = broadcaster
1121 .broadcast_submit(
1122 instrument_id,
1123 ClientOrderId::from("O-789"),
1124 OrderSide::Buy,
1125 OrderType::Limit,
1126 Quantity::new(100.0, 0),
1127 TimeInForce::Gtc,
1128 Some(Price::new(50000.0, 2)),
1129 None,
1130 None,
1131 None,
1132 None,
1133 None,
1134 false,
1135 false,
1136 None,
1137 None,
1138 None,
1139 None,
1140 None,
1141 )
1142 .await;
1143
1144 assert!(result.is_err());
1145 assert!(
1146 result
1147 .unwrap_err()
1148 .to_string()
1149 .contains("No healthy transport clients available")
1150 );
1151
1152 let metrics = broadcaster.get_metrics_async().await;
1153 assert_eq!(metrics.failed_submits, 1);
1154 }
1155
1156 #[tokio::test]
1157 async fn test_default_config() {
1158 let report = create_test_report("ORDER-1");
1159 let transports: Vec<TransportClient> = (0..3)
1160 .map(|i| {
1161 let r = report.clone();
1162 create_stub_transport(&format!("client-{i}"), move || {
1163 let r = r.clone();
1164 async move { Ok(r) }
1165 })
1166 })
1167 .collect();
1168
1169 let config = SubmitBroadcasterConfig::default();
1170 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1171 let metrics = broadcaster.get_metrics_async().await;
1172
1173 assert_eq!(metrics.total_clients, 3);
1174 }
1175
1176 #[tokio::test]
1177 async fn test_broadcaster_lifecycle() {
1178 let report = create_test_report("ORDER-1");
1179 let transports: Vec<TransportClient> = (0..2)
1180 .map(|i| {
1181 let r = report.clone();
1182 create_stub_transport(&format!("client-{i}"), move || {
1183 let r = r.clone();
1184 async move { Ok(r) }
1185 })
1186 })
1187 .collect();
1188
1189 let config = SubmitBroadcasterConfig::default();
1190 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1191
1192 assert!(!broadcaster.running.load(Ordering::Relaxed));
1194
1195 let start_result = broadcaster.start().await;
1197 assert!(start_result.is_ok());
1198 assert!(broadcaster.running.load(Ordering::Relaxed));
1199
1200 let start_again = broadcaster.start().await;
1202 assert!(start_again.is_ok());
1203
1204 broadcaster.stop().await;
1206 assert!(!broadcaster.running.load(Ordering::Relaxed));
1207
1208 broadcaster.stop().await;
1210 assert!(!broadcaster.running.load(Ordering::Relaxed));
1211 }
1212
1213 #[tokio::test]
1214 async fn test_broadcast_submit_metrics_increment() {
1215 let report = create_test_report("ORDER-1");
1216 let report_clone = report.clone();
1217
1218 let transports = vec![create_stub_transport("client-0", move || {
1219 let report = report_clone.clone();
1220 async move { Ok(report) }
1221 })];
1222
1223 let config = SubmitBroadcasterConfig::default();
1224 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1225
1226 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1227 let _ = broadcaster
1228 .broadcast_submit(
1229 instrument_id,
1230 ClientOrderId::from("O-123"),
1231 OrderSide::Buy,
1232 OrderType::Limit,
1233 Quantity::new(100.0, 0),
1234 TimeInForce::Gtc,
1235 Some(Price::new(50000.0, 2)),
1236 None,
1237 None,
1238 None,
1239 None,
1240 None,
1241 false,
1242 false,
1243 None,
1244 None,
1245 None,
1246 None,
1247 None,
1248 )
1249 .await;
1250
1251 let metrics = broadcaster.get_metrics_async().await;
1252 assert_eq!(metrics.total_submits, 1);
1253 assert_eq!(metrics.successful_submits, 1);
1254 assert_eq!(metrics.failed_submits, 0);
1255 }
1256
1257 #[tokio::test]
1258 async fn test_broadcaster_creation_with_pool() {
1259 let report = create_test_report("ORDER-1");
1260 let transports: Vec<TransportClient> = (0..4)
1261 .map(|i| {
1262 let r = report.clone();
1263 create_stub_transport(&format!("client-{i}"), move || {
1264 let r = r.clone();
1265 async move { Ok(r) }
1266 })
1267 })
1268 .collect();
1269
1270 let config = SubmitBroadcasterConfig::default();
1271 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1272 let metrics = broadcaster.get_metrics_async().await;
1273 assert_eq!(metrics.total_clients, 4);
1274 }
1275
1276 #[tokio::test]
1277 async fn test_client_stats_collection() {
1278 let transports = vec![
1281 create_stub_transport("client-0", || async { anyhow::bail!("Timeout error") }),
1282 create_stub_transport("client-1", || async { anyhow::bail!("Connection error") }),
1283 ];
1284
1285 let config = SubmitBroadcasterConfig::default();
1286 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1287
1288 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1289 let _ = broadcaster
1290 .broadcast_submit(
1291 instrument_id,
1292 ClientOrderId::from("O-123"),
1293 OrderSide::Buy,
1294 OrderType::Limit,
1295 Quantity::new(100.0, 0),
1296 TimeInForce::Gtc,
1297 Some(Price::new(50000.0, 2)),
1298 None,
1299 None,
1300 None,
1301 None,
1302 None,
1303 false,
1304 false,
1305 None,
1306 None,
1307 None,
1308 None,
1309 None,
1310 )
1311 .await;
1312
1313 let stats = broadcaster.get_client_stats_async().await;
1314 assert_eq!(stats.len(), 2);
1315
1316 let client0 = stats.iter().find(|s| s.client_id == "client-0").unwrap();
1317 assert_eq!(client0.submit_count, 1);
1318 assert_eq!(client0.error_count, 1);
1319
1320 let client1 = stats.iter().find(|s| s.client_id == "client-1").unwrap();
1321 assert_eq!(client1.submit_count, 1);
1322 assert_eq!(client1.error_count, 1);
1323 }
1324
1325 #[tokio::test]
1326 async fn test_testnet_config_sets_base_url() {
1327 let config = SubmitBroadcasterConfig {
1328 pool_size: 1,
1329 api_key: Some("test_key".to_string()),
1330 api_secret: Some("test_secret".to_string()),
1331 environment: BitmexEnvironment::Testnet,
1332 base_url: None,
1333 ..Default::default()
1334 };
1335
1336 let broadcaster = SubmitBroadcaster::new(config);
1337 assert!(broadcaster.is_ok());
1338 }
1339
1340 #[tokio::test]
1341 async fn test_constructor_honors_default_pool_size() {
1342 let config = SubmitBroadcasterConfig {
1343 api_key: Some("test_key".to_string()),
1344 api_secret: Some("test_secret".to_string()),
1345 base_url: Some("http://127.0.0.1:19999".to_string()),
1346 ..Default::default()
1347 };
1348
1349 let expected_pool = config.pool_size;
1350 let broadcaster = SubmitBroadcaster::new(config).unwrap();
1351 let metrics = broadcaster.get_metrics_async().await;
1352
1353 assert_eq!(metrics.total_clients, expected_pool);
1354 }
1355
1356 #[tokio::test]
1357 async fn test_clone_for_async() {
1358 let report = create_test_report("ORDER-1");
1359 let transports = vec![create_stub_transport("client-0", move || {
1360 let r = report.clone();
1361 async move { Ok(r) }
1362 })];
1363
1364 let config = SubmitBroadcasterConfig::default();
1365 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1366 let cloned = broadcaster.clone_for_async();
1367
1368 broadcaster.total_submits.fetch_add(1, Ordering::Relaxed);
1370 assert_eq!(cloned.total_submits.load(Ordering::Relaxed), 1);
1371 }
1372
1373 #[tokio::test]
1374 async fn test_pattern_matching() {
1375 let config = SubmitBroadcasterConfig {
1376 expected_reject_patterns: vec![
1377 "Duplicate clOrdID".to_string(),
1378 "Order already exists".to_string(),
1379 ],
1380 ..Default::default()
1381 };
1382
1383 let broadcaster = SubmitBroadcaster::new_with_transports(config, vec![]);
1384
1385 assert!(broadcaster.is_expected_reject("Error: Duplicate clOrdID for order"));
1386 assert!(broadcaster.is_expected_reject("Order already exists in system"));
1387 assert!(!broadcaster.is_expected_reject("Rate limit exceeded"));
1388 assert!(!broadcaster.is_expected_reject("Internal server error"));
1389 }
1390
1391 #[tokio::test]
1392 async fn test_submit_metrics_with_mixed_responses() {
1393 let report = create_test_report("ORDER-1");
1394 let report_clone = report.clone();
1395
1396 let transports = vec![
1397 create_stub_transport("client-0", move || {
1398 let report = report_clone.clone();
1399 async move { Ok(report) }
1400 }),
1401 create_stub_transport("client-1", || async { anyhow::bail!("Timeout") }),
1402 ];
1403
1404 let config = SubmitBroadcasterConfig::default();
1405 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1406
1407 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1408 let result = broadcaster
1409 .broadcast_submit(
1410 instrument_id,
1411 ClientOrderId::from("O-123"),
1412 OrderSide::Buy,
1413 OrderType::Limit,
1414 Quantity::new(100.0, 0),
1415 TimeInForce::Gtc,
1416 Some(Price::new(50000.0, 2)),
1417 None,
1418 None,
1419 None,
1420 None,
1421 None,
1422 false,
1423 false,
1424 None,
1425 None,
1426 None,
1427 None,
1428 None,
1429 )
1430 .await;
1431
1432 assert!(result.is_ok());
1433
1434 let metrics = broadcaster.get_metrics_async().await;
1435 assert_eq!(metrics.total_submits, 1);
1436 assert_eq!(metrics.successful_submits, 1);
1437 assert_eq!(metrics.failed_submits, 0);
1438 }
1439
1440 #[tokio::test]
1441 async fn test_metrics_initialization_and_health() {
1442 let report = create_test_report("ORDER-1");
1443 let transports: Vec<TransportClient> = (0..2)
1444 .map(|i| {
1445 let r = report.clone();
1446 create_stub_transport(&format!("client-{i}"), move || {
1447 let r = r.clone();
1448 async move { Ok(r) }
1449 })
1450 })
1451 .collect();
1452
1453 let config = SubmitBroadcasterConfig::default();
1454 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1455 let metrics = broadcaster.get_metrics_async().await;
1456
1457 assert_eq!(metrics.total_submits, 0);
1458 assert_eq!(metrics.successful_submits, 0);
1459 assert_eq!(metrics.failed_submits, 0);
1460 assert_eq!(metrics.expected_rejects, 0);
1461 assert_eq!(metrics.total_clients, 2);
1462 assert_eq!(metrics.healthy_clients, 2);
1463 }
1464
1465 #[tokio::test]
1466 async fn test_health_check_task_lifecycle() {
1467 let report = create_test_report("ORDER-1");
1468 let transports: Vec<TransportClient> = (0..2)
1469 .map(|i| {
1470 let r = report.clone();
1471 create_stub_transport(&format!("client-{i}"), move || {
1472 let r = r.clone();
1473 async move { Ok(r) }
1474 })
1475 })
1476 .collect();
1477
1478 let config = SubmitBroadcasterConfig::default();
1479 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1480
1481 broadcaster.start().await.unwrap();
1483 assert!(broadcaster.running.load(Ordering::Relaxed));
1484 assert!(
1485 broadcaster
1486 .health_check_task
1487 .read()
1488 .await
1489 .as_ref()
1490 .is_some()
1491 );
1492
1493 broadcaster.stop().await;
1495 assert!(!broadcaster.running.load(Ordering::Relaxed));
1496 }
1497
1498 #[tokio::test]
1499 async fn test_expected_reject_pattern_comprehensive() {
1500 let transports = vec![
1501 create_stub_transport("client-0", || async {
1502 anyhow::bail!("Duplicate clOrdID: O-123 already exists")
1503 }),
1504 create_stub_transport("client-1", || async {
1505 tokio::time::sleep(Duration::from_secs(10)).await;
1506 anyhow::bail!("Should be aborted")
1507 }),
1508 ];
1509
1510 let config = SubmitBroadcasterConfig::default();
1511 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1512
1513 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1514 let result = broadcaster
1515 .broadcast_submit(
1516 instrument_id,
1517 ClientOrderId::from("O-123"),
1518 OrderSide::Buy,
1519 OrderType::Limit,
1520 Quantity::new(100.0, 0),
1521 TimeInForce::Gtc,
1522 Some(Price::new(50000.0, 2)),
1523 None,
1524 None,
1525 None,
1526 None,
1527 None,
1528 false,
1529 false,
1530 None,
1531 None,
1532 None,
1533 None,
1534 None,
1535 )
1536 .await;
1537
1538 assert!(result.is_err());
1540
1541 let metrics = broadcaster.get_metrics_async().await;
1542 assert_eq!(metrics.expected_rejects, 1);
1543 assert_eq!(metrics.failed_submits, 1);
1544 assert_eq!(metrics.successful_submits, 0);
1545 }
1546
1547 #[tokio::test]
1548 async fn test_client_order_id_suffix_for_multiple_clients() {
1549 use std::sync::{Arc, Mutex};
1550
1551 #[derive(Clone)]
1552 struct CaptureExecutor {
1553 captured_ids: Arc<Mutex<Vec<String>>>,
1554 barrier: Arc<tokio::sync::Barrier>,
1555 report: OrderStatusReport,
1556 }
1557
1558 impl SubmitExecutor for CaptureExecutor {
1559 fn health_check(
1560 &self,
1561 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1562 Box::pin(async { Ok(()) })
1563 }
1564
1565 fn submit_order(
1566 &self,
1567 _instrument_id: InstrumentId,
1568 client_order_id: ClientOrderId,
1569 _order_side: OrderSide,
1570 _order_type: OrderType,
1571 _quantity: Quantity,
1572 _time_in_force: TimeInForce,
1573 _price: Option<Price>,
1574 _trigger_price: Option<Price>,
1575 _trigger_type: Option<TriggerType>,
1576 _trailing_offset: Option<f64>,
1577 _trailing_offset_type: Option<TrailingOffsetType>,
1578 _display_qty: Option<Quantity>,
1579 _post_only: bool,
1580 _reduce_only: bool,
1581 _order_list_id: Option<OrderListId>,
1582 _contingency_type: Option<ContingencyType>,
1583 _peg_price_type: Option<BitmexPegPriceType>,
1584 _peg_offset_value: Option<f64>,
1585 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1586 {
1587 self.captured_ids
1589 .lock()
1590 .unwrap()
1591 .push(client_order_id.as_str().to_string());
1592 let report = self.report.clone();
1593 let barrier = Arc::clone(&self.barrier);
1594 Box::pin(async move {
1597 barrier.wait().await;
1598 Ok(report)
1599 })
1600 }
1601
1602 fn add_instrument(&self, _instrument: InstrumentAny) {}
1603 }
1604
1605 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1606 let barrier = Arc::new(tokio::sync::Barrier::new(3));
1607 let report = create_test_report("ORDER-1");
1608
1609 let transports = vec![
1610 TransportClient::new(
1611 CaptureExecutor {
1612 captured_ids: Arc::clone(&captured_ids),
1613 barrier: Arc::clone(&barrier),
1614 report: report.clone(),
1615 },
1616 "client-0".to_string(),
1617 ),
1618 TransportClient::new(
1619 CaptureExecutor {
1620 captured_ids: Arc::clone(&captured_ids),
1621 barrier: Arc::clone(&barrier),
1622 report: report.clone(),
1623 },
1624 "client-1".to_string(),
1625 ),
1626 TransportClient::new(
1627 CaptureExecutor {
1628 captured_ids: Arc::clone(&captured_ids),
1629 barrier: Arc::clone(&barrier),
1630 report: report.clone(),
1631 },
1632 "client-2".to_string(),
1633 ),
1634 ];
1635
1636 let config = SubmitBroadcasterConfig::default();
1637 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1638
1639 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1640 let result = broadcaster
1641 .broadcast_submit(
1642 instrument_id,
1643 ClientOrderId::from("O-123"),
1644 OrderSide::Buy,
1645 OrderType::Limit,
1646 Quantity::new(100.0, 0),
1647 TimeInForce::Gtc,
1648 Some(Price::new(50000.0, 2)),
1649 None,
1650 None,
1651 None,
1652 None,
1653 None,
1654 false,
1655 false,
1656 None,
1657 None,
1658 None,
1659 None,
1660 None,
1661 )
1662 .await;
1663
1664 assert!(result.is_ok());
1665
1666 let ids = captured_ids.lock().unwrap();
1668 assert_eq!(ids.len(), 3);
1669 assert!(ids.iter().all(|id| id == "O-123")); }
1671
1672 #[tokio::test]
1673 async fn test_client_order_id_suffix_with_partial_failure() {
1674 use std::sync::{Arc, Mutex};
1675
1676 #[derive(Clone)]
1677 struct CaptureAndFailExecutor {
1678 captured_ids: Arc<Mutex<Vec<String>>>,
1679 barrier: Arc<tokio::sync::Barrier>,
1680 should_succeed: bool,
1681 }
1682
1683 impl SubmitExecutor for CaptureAndFailExecutor {
1684 fn health_check(
1685 &self,
1686 ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + '_>> {
1687 Box::pin(async { Ok(()) })
1688 }
1689
1690 fn submit_order(
1691 &self,
1692 _instrument_id: InstrumentId,
1693 client_order_id: ClientOrderId,
1694 _order_side: OrderSide,
1695 _order_type: OrderType,
1696 _quantity: Quantity,
1697 _time_in_force: TimeInForce,
1698 _price: Option<Price>,
1699 _trigger_price: Option<Price>,
1700 _trigger_type: Option<TriggerType>,
1701 _trailing_offset: Option<f64>,
1702 _trailing_offset_type: Option<TrailingOffsetType>,
1703 _display_qty: Option<Quantity>,
1704 _post_only: bool,
1705 _reduce_only: bool,
1706 _order_list_id: Option<OrderListId>,
1707 _contingency_type: Option<ContingencyType>,
1708 _peg_price_type: Option<BitmexPegPriceType>,
1709 _peg_offset_value: Option<f64>,
1710 ) -> Pin<Box<dyn Future<Output = anyhow::Result<OrderStatusReport>> + Send + '_>>
1711 {
1712 self.captured_ids
1714 .lock()
1715 .unwrap()
1716 .push(client_order_id.as_str().to_string());
1717 let barrier = Arc::clone(&self.barrier);
1718 let should_succeed = self.should_succeed;
1719 Box::pin(async move {
1722 barrier.wait().await;
1723
1724 if should_succeed {
1725 Ok(create_test_report("ORDER-1"))
1726 } else {
1727 anyhow::bail!("Network error")
1728 }
1729 })
1730 }
1731
1732 fn add_instrument(&self, _instrument: InstrumentAny) {}
1733 }
1734
1735 let captured_ids = Arc::new(Mutex::new(Vec::new()));
1736 let barrier = Arc::new(tokio::sync::Barrier::new(2));
1737
1738 let transports = vec![
1739 TransportClient::new(
1740 CaptureAndFailExecutor {
1741 captured_ids: Arc::clone(&captured_ids),
1742 barrier: Arc::clone(&barrier),
1743 should_succeed: false,
1744 },
1745 "client-0".to_string(),
1746 ),
1747 TransportClient::new(
1748 CaptureAndFailExecutor {
1749 captured_ids: Arc::clone(&captured_ids),
1750 barrier: Arc::clone(&barrier),
1751 should_succeed: true,
1752 },
1753 "client-1".to_string(),
1754 ),
1755 ];
1756
1757 let config = SubmitBroadcasterConfig::default();
1758 let broadcaster = SubmitBroadcaster::new_with_transports(config, transports);
1759
1760 let instrument_id = InstrumentId::from_str("XBTUSD.BITMEX").unwrap();
1761 let result = broadcaster
1762 .broadcast_submit(
1763 instrument_id,
1764 ClientOrderId::from("O-456"),
1765 OrderSide::Sell,
1766 OrderType::Market,
1767 Quantity::new(50.0, 0),
1768 TimeInForce::Ioc,
1769 None,
1770 None,
1771 None,
1772 None,
1773 None,
1774 None,
1775 false,
1776 false,
1777 None,
1778 None,
1779 None,
1780 None,
1781 None,
1782 )
1783 .await;
1784
1785 assert!(result.is_ok());
1786
1787 let ids = captured_ids.lock().unwrap();
1789 assert_eq!(ids.len(), 2);
1790 assert!(ids.iter().all(|id| id == "O-456")); }
1792
1793 #[tokio::test]
1794 async fn test_proxy_urls_populated_from_config() {
1795 let config = SubmitBroadcasterConfig {
1796 pool_size: 3,
1797 api_key: Some("test_key".to_string()),
1798 api_secret: Some("test_secret".to_string()),
1799 proxy_urls: vec![
1800 Some("http://proxy1:8080".to_string()),
1801 Some("http://proxy2:8080".to_string()),
1802 Some("http://proxy3:8080".to_string()),
1803 ],
1804 ..Default::default()
1805 };
1806
1807 assert_eq!(config.proxy_urls.len(), 3);
1808 assert_eq!(config.proxy_urls[0], Some("http://proxy1:8080".to_string()));
1809 assert_eq!(config.proxy_urls[1], Some("http://proxy2:8080".to_string()));
1810 assert_eq!(config.proxy_urls[2], Some("http://proxy3:8080".to_string()));
1811 }
1812}