1use std::{
19 fmt,
20 future::Future,
21 sync::{
22 Arc, Mutex,
23 atomic::{AtomicBool, Ordering},
24 },
25};
26
27use ahash::{AHashMap, AHashSet};
28use async_trait::async_trait;
29use nautilus_common::{
30 clients::ExecutionClient,
31 live::{
32 get_runtime,
33 runner::{get_data_event_sender, get_exec_event_sender},
34 },
35 messages::{
36 DataEvent,
37 execution::{
38 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
39 GenerateFillReportsBuilder, GenerateOrderStatusReports,
40 GenerateOrderStatusReportsBuilder, ModifyOrder, QueryOrder, SubmitOrder,
41 SubmitOrderList,
42 },
43 },
44};
45use nautilus_core::{
46 MUTEX_POISONED, UnixNanos,
47 datetime::NANOSECONDS_IN_SECOND,
48 time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
51use nautilus_model::{
52 accounts::AccountAny,
53 data::Data,
54 enums::{AccountType, OmsType, OrderStatus, OrderType, TimeInForce},
55 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue, VenueOrderId},
56 instruments::InstrumentAny,
57 orders::Order,
58 reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
59 types::{AccountBalance, Currency, MarginBalance},
60};
61use nautilus_network::socket::TcpMessageHandler;
62use rust_decimal::Decimal;
63use tokio::task::JoinHandle;
64
65use crate::{
66 common::{
67 consts::{
68 BETFAIR_VENUE, METHOD_CANCEL_ORDERS, METHOD_GET_ACCOUNT_FUNDS,
69 METHOD_LIST_CURRENT_ORDERS, METHOD_PLACE_ORDERS, METHOD_REPLACE_ORDERS,
70 },
71 credential::BetfairCredential,
72 enums::{
73 BetfairOrderStatus, BetfairOrderType, BetfairSide, BetfairTimeInForce,
74 ExecutionReportErrorCode, ExecutionReportStatus, InstructionReportErrorCode,
75 InstructionReportStatus, OrderProjection, PersistenceType, StreamingOrderStatus,
76 StreamingSide,
77 },
78 parse::{
79 extract_market_id, extract_selection_id, make_customer_order_ref,
80 make_customer_order_ref_legacy, make_instrument_id, parse_account_state,
81 parse_millis_timestamp,
82 },
83 types::BetId,
84 },
85 config::BetfairExecConfig,
86 data::custom_data_with_instrument,
87 data_types::{BetfairOrderVoided, register_betfair_custom_data},
88 http::{
89 client::BetfairHttpClient,
90 models::{
91 AccountFundsResponse, CancelExecutionReport, CancelInstruction, CancelOrdersParams,
92 CurrentOrderSummary, CurrentOrderSummaryReport, LimitOnCloseOrder, LimitOrder,
93 ListCurrentOrdersParams, MarketOnCloseOrder, MarketVersion, PlaceExecutionReport,
94 PlaceInstruction, PlaceInstructionReport, PlaceOrdersParams, ReplaceExecutionReport,
95 ReplaceInstruction, ReplaceInstructionReport, ReplaceOrdersParams, TimeRange,
96 },
97 parse::{parse_current_order_fill_report, parse_current_order_report},
98 },
99 stream::{
100 client::BetfairStreamClient,
101 config::BetfairStreamConfig,
102 messages::{StreamMessage, stream_decode},
103 parse::{FillTracker, has_cancel_quantity, parse_order_status_report},
104 },
105};
106
/// Seconds slept between session keep-alive requests (10 hours).
const KEEP_ALIVE_INTERVAL_SECS: u64 = 10 * 60 * 60;

/// Seconds to wait before retrying a request that was rejected by rate limiting.
const RATE_LIMIT_RETRY_DELAY_SECS: u64 = 5;
112
/// Bookkeeping shared between the order-change stream handler and the
/// execution client while streamed order updates are turned into reports.
#[derive(Debug, Default)]
pub struct OcmState {
    /// Tracker asked for a fill report on each streamed order update; synced
    /// from cached orders and pruned once an order completes.
    pub fill_tracker: FillTracker,
    /// Customer order ref string -> the client order ID it was derived from.
    pub customer_order_refs: AHashMap<String, ClientOrderId>,
    /// Client order IDs that were resolved for a processed stream update.
    pub stream_reported_client_orders: AHashSet<ClientOrderId>,
    /// Bet IDs treated as terminal; later stream updates for them are skipped.
    pub terminal_orders: AHashSet<String>,
    /// Bet IDs whose cancel events are suppressed.
    /// NOTE(review): presumably filled by the replace flow, which is not
    /// visible in this part of the file - confirm.
    pub replaced_venue_order_ids: AHashSet<String>,
    /// `(client_order_id, bet_id)` pairs; while a pair is present, cancels for
    /// it are suppressed and the order's customer order refs are kept.
    /// NOTE(review): presumably inserted while a modify is in flight - confirm.
    pub pending_update_keys: AHashSet<(ClientOrderId, String)>,
}
131
132impl OcmState {
133 pub fn register_customer_order_ref(&mut self, client_order_id: ClientOrderId) {
135 let rfo = make_customer_order_ref(client_order_id.as_str());
136 self.customer_order_refs.insert(rfo, client_order_id);
137 }
138
139 pub fn register_customer_order_ref_with_legacy(&mut self, client_order_id: ClientOrderId) {
144 let rfo = make_customer_order_ref(client_order_id.as_str());
145 let rfo_legacy = make_customer_order_ref_legacy(client_order_id.as_str());
146 self.customer_order_refs.insert(rfo, client_order_id);
147 if rfo_legacy != client_order_id.as_str() {
148 self.customer_order_refs.insert(rfo_legacy, client_order_id);
149 }
150 }
151
152 pub fn remove_customer_order_refs(&mut self, client_order_id: &ClientOrderId) {
154 let rfo = make_customer_order_ref(client_order_id.as_str());
155 let rfo_legacy = make_customer_order_ref_legacy(client_order_id.as_str());
156 self.customer_order_refs.remove(&rfo);
157 self.customer_order_refs.remove(&rfo_legacy);
158 }
159
160 pub fn resolve_client_order_id(&self, rfo: Option<&str>) -> Option<ClientOrderId> {
162 rfo.and_then(|r| self.customer_order_refs.get(r).copied())
163 }
164
165 pub fn try_mark_terminal(&mut self, bet_id: &str) -> bool {
168 !self.terminal_orders.insert(bet_id.to_string())
169 }
170
171 pub fn should_suppress_cancel(&self, client_order_id: &ClientOrderId, bet_id: &str) -> bool {
174 if self.replaced_venue_order_ids.contains(bet_id) {
175 return true;
176 }
177 self.pending_update_keys
178 .contains(&(*client_order_id, bet_id.to_string()))
179 }
180
181 pub fn cleanup_terminal_order(&mut self, client_order_id: &ClientOrderId) {
184 let has_pending = self
185 .pending_update_keys
186 .iter()
187 .any(|(cid, _)| cid == client_order_id);
188
189 if !has_pending {
190 self.remove_customer_order_refs(client_order_id);
191 }
192 }
193
194 pub fn sync_from_orders(&mut self, orders: &[(String, ClientOrderId, Decimal, Decimal, bool)]) {
200 for (bet_id, client_order_id, filled_qty, avg_px, is_closed) in orders {
201 if *is_closed {
202 self.terminal_orders.insert(bet_id.clone());
203 } else {
204 self.register_customer_order_ref_with_legacy(*client_order_id);
205 }
206
207 if *filled_qty > Decimal::ZERO {
208 self.fill_tracker.sync_order(bet_id, *filled_qty, *avg_px);
209 }
210 }
211 }
212}
213
/// Execution client for the Betfair venue, combining an HTTP client for
/// requests with a streaming client for order change messages.
#[derive(Debug)]
pub struct BetfairExecutionClient {
    /// Core providing identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Clock used to timestamp generated reports and account states.
    clock: &'static AtomicTime,
    /// Emitter used to send account states and order/fill reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client, shared with the spawned background tasks.
    http_client: Arc<BetfairHttpClient>,
    /// Order stream client; `None` until `connect` has succeeded.
    stream_client: Option<Arc<BetfairStreamClient>>,
    /// Credential passed to the stream client; its app key is reused when
    /// the stream authentication is refreshed.
    credential: BetfairCredential,
    /// Configuration cloned into the stream client on connect.
    stream_config: BetfairStreamConfig,
    /// Execution client configuration.
    config: BetfairExecConfig,
    /// Currency used when parsing account states and fill reports.
    currency: Currency,
    /// Order-change bookkeeping shared with the stream handler and tasks.
    ocm_state: Arc<Mutex<OcmState>>,
    /// Handles of tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle of the periodic session keep-alive task.
    keep_alive_handle: Option<JoinHandle<()>>,
    /// Handle of the periodic account state polling task.
    account_state_handle: Option<JoinHandle<()>>,
    /// Handle of the task reacting to stream reconnections.
    reconnect_handle: Option<JoinHandle<()>>,
}
232
233impl BetfairExecutionClient {
234 #[must_use]
236 pub fn new(
237 core: ExecutionClientCore,
238 http_client: BetfairHttpClient,
239 credential: BetfairCredential,
240 stream_config: BetfairStreamConfig,
241 config: BetfairExecConfig,
242 currency: Currency,
243 ) -> Self {
244 let clock = get_atomic_clock_realtime();
245 let emitter = ExecutionEventEmitter::new(
246 clock,
247 core.trader_id,
248 core.account_id,
249 AccountType::Betting,
250 None,
251 );
252
253 Self {
254 core,
255 clock,
256 emitter,
257 http_client: Arc::new(http_client),
258 stream_client: None,
259 credential,
260 stream_config,
261 config,
262 currency,
263 ocm_state: Arc::new(Mutex::new(OcmState::default())),
264 pending_tasks: Mutex::new(Vec::new()),
265 keep_alive_handle: None,
266 account_state_handle: None,
267 reconnect_handle: None,
268 }
269 }
270
271 fn spawn_task<F>(&self, description: &'static str, fut: F)
272 where
273 F: Future<Output = anyhow::Result<()>> + Send + 'static,
274 {
275 let runtime = get_runtime();
276 let handle = runtime.spawn(async move {
277 if let Err(e) = fut.await {
278 log::warn!("{description} failed: {e:?}");
279 }
280 });
281
282 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
283 tasks.retain(|handle| !handle.is_finished());
284 tasks.push(handle);
285 }
286
287 fn reconcile_market_ids(&self) -> Option<Vec<String>> {
288 if self.config.reconcile_market_ids_only
289 && let Some(ids) = &self.config.reconcile_market_ids
290 {
291 return Some(ids.clone());
292 }
293 self.config.stream_market_ids_filter.clone()
294 }
295
296 fn get_market_version(&self, instrument_id: &InstrumentId) -> Option<MarketVersion> {
302 if !self.config.use_market_version {
303 return None;
304 }
305
306 let cache = self.core.cache();
307 let instrument = cache.instrument(instrument_id)?;
308
309 if let InstrumentAny::Betting(betting) = instrument {
310 let version = betting.info.as_ref()?.get_i64("version")?;
311 return Some(MarketVersion {
312 version: Some(version),
313 });
314 }
315
316 None
317 }
318
319 fn sync_ocm_state_from_cache(&self) {
322 let cache = self.core.cache();
323 let venue = *BETFAIR_VENUE;
324 let orders = cache.orders(Some(&venue), None, None, None, None);
325
326 let order_data: Vec<_> = orders
327 .iter()
328 .filter_map(|order| {
329 let venue_order_id = order.venue_order_id()?;
330 let bet_id = venue_order_id.to_string();
331 let filled_qty = order.filled_qty().as_decimal();
332 let avg_px = order.avg_px().map_or(Decimal::ZERO, |px| {
333 Decimal::try_from(px).unwrap_or(Decimal::ZERO)
334 });
335 Some((
336 bet_id,
337 order.client_order_id(),
338 filled_qty,
339 avg_px,
340 order.is_closed(),
341 ))
342 })
343 .collect();
344
345 let mut state = self.ocm_state.lock().expect(MUTEX_POISONED);
346 state.sync_from_orders(&order_data);
347
348 log::info!("Synced OCM state from {} cached orders", order_data.len());
349 }
350
351 fn abort_pending_tasks(&self) {
352 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
353 for handle in tasks.drain(..) {
354 handle.abort();
355 }
356 }
357
358 fn abort_background_tasks(&mut self) {
359 if let Some(handle) = self.keep_alive_handle.take() {
360 handle.abort();
361 }
362
363 if let Some(handle) = self.account_state_handle.take() {
364 handle.abort();
365 }
366
367 if let Some(handle) = self.reconnect_handle.take() {
368 handle.abort();
369 }
370 }
371
372 #[expect(clippy::too_many_arguments)]
373 fn create_ocm_handler(
374 emitter: ExecutionEventEmitter,
375 account_id: AccountId,
376 currency: Currency,
377 ocm_state: Arc<Mutex<OcmState>>,
378 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
379 market_ids_filter: Option<ahash::AHashSet<String>>,
380 ignore_external_orders: bool,
381 reconnect_tx: tokio::sync::mpsc::UnboundedSender<()>,
382 ) -> TcpMessageHandler {
383 let has_initial_connection = Arc::new(AtomicBool::new(false));
384
385 Arc::new(move |data: &[u8]| {
386 let msg = match stream_decode(data) {
387 Ok(msg) => msg,
388 Err(e) => {
389 log::warn!("Failed to decode stream message: {e}");
390 return;
391 }
392 };
393
394 match msg {
395 StreamMessage::OrderChange(ocm) => {
396 if ocm.is_heartbeat() {
397 return;
398 }
399
400 let Some(order_changes) = &ocm.oc else {
401 return;
402 };
403
404 let ts_event = parse_millis_timestamp(ocm.pt);
405 let ts_init = ts_event;
406
407 for omc in order_changes {
408 if let Some(ref filter) = market_ids_filter
409 && !filter.contains(&omc.id)
410 {
411 continue;
412 }
413 let Some(orc_list) = &omc.orc else {
414 continue;
415 };
416
417 for orc in orc_list {
418 let handicap = orc.hc.unwrap_or(Decimal::ZERO);
419 let instrument_id = make_instrument_id(&omc.id, orc.id, handicap);
420
421 let Some(unmatched_orders) = &orc.uo else {
422 continue;
423 };
424
425 for uo in unmatched_orders {
426 if ignore_external_orders && uo.rfo.is_none() {
427 continue;
428 }
429
430 Self::process_unmatched_order(
431 uo,
432 instrument_id,
433 account_id,
434 currency,
435 &emitter,
436 &ocm_state,
437 ts_event,
438 ts_init,
439 );
440
441 if uo.status == StreamingOrderStatus::ExecutionComplete
442 && uo.sv.is_some_and(|sv| sv > Decimal::ZERO)
443 {
444 let sv = uo.sv.unwrap();
445 let side_str = match uo.side {
446 StreamingSide::Back => "BACK",
447 StreamingSide::Lay => "LAY",
448 };
449 let dec_to_f64 = |d: Decimal| -> f64 {
450 d.to_string().parse::<f64>().unwrap_or(0.0)
451 };
452 let voided = BetfairOrderVoided::new(
453 instrument_id,
454 uo.rfo.as_deref().unwrap_or("").to_string(),
455 uo.id.clone(),
456 dec_to_f64(sv),
457 dec_to_f64(uo.p),
458 dec_to_f64(uo.s),
459 side_str.to_string(),
460 uo.avp.map_or(f64::NAN, dec_to_f64),
461 uo.sm.map_or(f64::NAN, dec_to_f64),
462 String::new(),
463 ts_event,
464 ts_init,
465 );
466 log::info!("Order voided: bet_id={}, size_voided={sv}", uo.id,);
467 let custom = custom_data_with_instrument(
468 Arc::new(voided),
469 instrument_id,
470 );
471
472 if let Err(e) =
473 data_sender.send(DataEvent::Data(Data::Custom(custom)))
474 {
475 log::warn!("Failed to send voided event: {e}");
476 }
477 }
478 }
479 }
480 }
481 }
482 StreamMessage::Connection(_) => {
483 if has_initial_connection.swap(true, Ordering::SeqCst) {
484 log::info!("Betfair execution stream reconnected");
485 let _ = reconnect_tx.send(());
486 } else {
487 log::info!("Betfair execution stream connected");
488 }
489 }
490 StreamMessage::Status(status) => {
491 if status.connection_closed {
492 log::error!(
493 "Betfair execution stream closed: {:?} - {:?}",
494 status.error_code,
495 status.error_message,
496 );
497 }
498 }
499 StreamMessage::MarketChange(_) | StreamMessage::RaceChange(_) => {}
500 }
501 })
502 }
503
/// Turns one streamed unmatched order into reports sent through `emitter`.
///
/// Sends a fill report when the fill tracker produces one, then always sends
/// the order status report. When the order is execution-complete it is
/// recorded as terminal, pruned from the fill tracker, and its customer
/// order refs are cleaned up.
///
/// Returns `true` when the status report was sent. Returns `false` when the
/// update was skipped: the report could not be parsed, the state mutex is
/// poisoned, the bet ID is already terminal, or the cancel was suppressed
/// or detected as a duplicate.
#[expect(clippy::too_many_arguments)]
fn process_unmatched_order(
    uo: &crate::stream::messages::UnmatchedOrder,
    instrument_id: InstrumentId,
    account_id: AccountId,
    currency: Currency,
    emitter: &ExecutionEventEmitter,
    ocm_state: &Arc<Mutex<OcmState>>,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
) -> bool {
    let mut report =
        match parse_order_status_report(uo, instrument_id, account_id, ts_event, ts_init) {
            Ok(report) => report,
            Err(e) => {
                log::warn!("Failed to parse order status report for {instrument_id}: {e}");
                return false;
            }
        };

    // The guard is held for the rest of the function, so all checks and
    // updates below see one consistent view of the state.
    let Ok(mut state) = ocm_state.lock() else {
        log::error!("OcmState mutex poisoned");
        return false;
    };

    // Nothing more is reported for a bet that already reached a terminal state.
    if state.terminal_orders.contains(&uo.id) {
        return false;
    }

    let resolved_client_order_id = state.resolve_client_order_id(uo.rfo.as_deref());

    // Prefer the registered client order ID over whatever the parser set.
    if resolved_client_order_id.is_some() {
        report.client_order_id = resolved_client_order_id;
    }

    // Cancel handling applies only to completed orders with a cancel quantity
    // whose client order ID could be resolved.
    if uo.status == StreamingOrderStatus::ExecutionComplete
        && has_cancel_quantity(uo)
        && let Some(ref client_oid) = resolved_client_order_id
    {
        if state.should_suppress_cancel(client_oid, &uo.id) {
            log::debug!(
                "Suppressing cancel for bet_id={} (pending replace or already replaced)",
                uo.id,
            );
            return false;
        }

        // `try_mark_terminal` returns `true` when the bet ID was already recorded.
        if state.try_mark_terminal(&uo.id) {
            log::debug!("Duplicate terminal event for bet_id={}, skipping", uo.id);
            return false;
        }
    }

    if let Some(client_oid) = resolved_client_order_id {
        state.stream_reported_client_orders.insert(client_oid);
    }

    // The fill report, if any, is sent before the order status report.
    if let Some(mut fill_report) = state.fill_tracker.maybe_fill_report(
        uo,
        uo.s,
        instrument_id,
        account_id,
        currency,
        ts_event,
        ts_init,
    ) {
        if resolved_client_order_id.is_some() {
            fill_report.client_order_id = resolved_client_order_id;
        }
        log::debug!(
            "Fill: bet_id={}, last_qty={}, last_px={}",
            uo.id,
            fill_report.last_qty,
            fill_report.last_px,
        );
        emitter.send_fill_report(fill_report);
    }

    if report.order_status == OrderStatus::Canceled
        && let Some(reason) = report.cancel_reason.as_deref()
    {
        // Falls back to the bet ID when the report has no client order ID.
        log::info!(
            "Betfair order {} ({}) canceled: reason={}, matched={}, canceled={}, lapsed={}, voided={}",
            report
                .client_order_id
                .unwrap_or_else(|| ClientOrderId::from(uo.id.as_str())),
            uo.id,
            reason,
            uo.sm.unwrap_or(Decimal::ZERO),
            uo.sc.unwrap_or(Decimal::ZERO),
            uo.sl.unwrap_or(Decimal::ZERO),
            uo.sv.unwrap_or(Decimal::ZERO),
        );
    }

    emitter.send_order_status_report(report);

    // A completed order is final: record it, drop its fill tracking, and
    // release its ref mappings.
    if uo.status == StreamingOrderStatus::ExecutionComplete {
        state.terminal_orders.insert(uo.id.clone());
        state.fill_tracker.prune(&uo.id);

        if let Some(ref client_oid) = resolved_client_order_id {
            state.cleanup_terminal_order(client_oid);
        }
    }

    true
}
617}
618
619#[async_trait(?Send)]
620impl ExecutionClient for BetfairExecutionClient {
/// Returns whether the core reports this client as connected.
fn is_connected(&self) -> bool {
    self.core.is_connected()
}
624
/// Returns the client ID held by the core.
fn client_id(&self) -> ClientId {
    self.core.client_id
}
628
/// Returns the account ID held by the core.
fn account_id(&self) -> AccountId {
    self.core.account_id
}
632
/// Returns the Betfair venue constant.
fn venue(&self) -> Venue {
    *BETFAIR_VENUE
}
636
/// Returns the OMS type held by the core.
fn oms_type(&self) -> OmsType {
    self.core.oms_type
}
640
/// Returns a clone of this client's account from the cache, or `None` when
/// the cache holds no account for the core's account ID.
fn get_account(&self) -> Option<AccountAny> {
    self.core.cache().account(&self.core.account_id).cloned()
}
644
/// Emits an account state built from the given balances and margins.
///
/// All arguments are forwarded unchanged to the emitter; this method
/// itself always returns `Ok(())`.
fn generate_account_state(
    &self,
    balances: Vec<AccountBalance>,
    margins: Vec<MarginBalance>,
    reported: bool,
    ts_event: UnixNanos,
) -> anyhow::Result<()> {
    self.emitter
        .emit_account_state(balances, margins, reported, ts_event);
    Ok(())
}
656
657 fn start(&mut self) -> anyhow::Result<()> {
658 if self.core.is_started() {
659 return Ok(());
660 }
661
662 let sender = get_exec_event_sender();
663 self.emitter.set_sender(sender);
664 self.core.set_started();
665
666 log::info!(
667 "Started: client_id={}, account_id={}",
668 self.core.client_id,
669 self.core.account_id,
670 );
671 Ok(())
672 }
673
674 fn stop(&mut self) -> anyhow::Result<()> {
675 if self.core.is_stopped() {
676 return Ok(());
677 }
678
679 self.core.set_stopped();
680 self.core.set_disconnected();
681 self.abort_background_tasks();
682 self.abort_pending_tasks();
683 log::info!("Stopped: client_id={}", self.core.client_id);
684 Ok(())
685 }
686
/// Connects the HTTP and streaming clients and starts the background tasks.
///
/// Steps, in order: connect over HTTP, fetch and emit the initial account
/// state, sync the order-change state from the cache, open the order stream
/// and subscribe to orders, then spawn the keep-alive task, the optional
/// account state polling task, and the reconnect task. The core is marked
/// connected only after all of that succeeded. Does nothing when already
/// connected.
///
/// # Errors
///
/// Returns an error when the HTTP login, the account funds request, the
/// account state parsing, the stream connection or the order subscription
/// fails, or when no session token is available after login.
async fn connect(&mut self) -> anyhow::Result<()> {
    if self.core.is_connected() {
        return Ok(());
    }

    register_betfair_custom_data();

    self.http_client
        .connect()
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    let funds: AccountFundsResponse = self
        .http_client
        .send_accounts(METHOD_GET_ACCOUNT_FUNDS, serde_json::json!({}))
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    // The same clock reading is used for both timestamps of the account state.
    let ts_init = self.clock.get_time_ns();
    let account_state = parse_account_state(
        &funds,
        self.core.account_id,
        self.currency,
        ts_init,
        ts_init,
    )?;
    self.emitter.send_account_state(account_state);

    let session_token = self
        .http_client
        .session_token()
        .await
        .ok_or_else(|| anyhow::anyhow!("No session token after login"))?;

    // Seed the order-change state before any stream message can arrive.
    self.sync_ocm_state_from_cache();

    let market_ids_filter = self
        .config
        .stream_market_ids_filter
        .as_ref()
        .map(|ids| ids.iter().cloned().collect::<ahash::AHashSet<String>>());

    // The stream handler signals reconnections; the reconnect task below consumes them.
    let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel();

    let handler = Self::create_ocm_handler(
        self.emitter.clone(),
        self.core.account_id,
        self.currency,
        Arc::clone(&self.ocm_state),
        get_data_event_sender(),
        market_ids_filter,
        self.config.ignore_external_orders,
        reconnect_tx,
    );

    let stream_client = BetfairStreamClient::connect(
        &self.credential,
        session_token,
        handler,
        self.stream_config.clone(),
    )
    .await
    .map_err(|e| anyhow::anyhow!("{e}"))?;

    let stream_client = Arc::new(stream_client);

    stream_client
        .subscribe_orders(None, None)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    self.stream_client = Some(stream_client);

    let keep_alive_client = Arc::clone(&self.http_client);
    // `stream_client` was set to `Some` just above, so the unwrap cannot fail.
    let keep_alive_stream = Arc::clone(self.stream_client.as_ref().unwrap());
    let keep_alive_app_key = self.credential.app_key().to_string();

    // Keep-alive task: refreshes the session periodically and pushes the
    // current session token to the stream client.
    self.keep_alive_handle = Some(get_runtime().spawn(async move {
        let interval = tokio::time::Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS);
        loop {
            tokio::time::sleep(interval).await;

            match keep_alive_client.keep_alive().await {
                Ok(()) => {}
                Err(ref e) if e.is_login_failed() => {
                    log::warn!("Betfair execution session expired, attempting re-login: {e}");
                    if let Err(e) = keep_alive_client.reconnect().await {
                        log::error!("Betfair execution re-login failed: {e}");
                        continue;
                    }
                }
                Err(e) => {
                    log::warn!("Betfair execution keep-alive failed (transient): {e}");
                    continue;
                }
            }

            if let Some(token) = keep_alive_client.session_token().await {
                keep_alive_stream.update_auth(&keep_alive_app_key, token);
            }
            log::debug!("Betfair execution session keep-alive sent");
        }
    }));

    // Account state polling task: only started when enabled with a non-zero interval.
    if self.config.calculate_account_state && self.config.request_account_state_secs > 0 {
        let acct_client = Arc::clone(&self.http_client);
        let acct_emitter = self.emitter.clone();
        let acct_id = self.core.account_id;
        let acct_currency = self.currency;
        let acct_clock = self.clock;
        let interval_secs = self.config.request_account_state_secs;
        self.account_state_handle = Some(get_runtime().spawn(async move {
            let interval = tokio::time::Duration::from_secs(interval_secs);
            loop {
                tokio::time::sleep(interval).await;

                match acct_client
                    .send_accounts::<AccountFundsResponse, _>(
                        METHOD_GET_ACCOUNT_FUNDS,
                        serde_json::json!({}),
                    )
                    .await
                {
                    Ok(funds) => {
                        let ts_init = acct_clock.get_time_ns();

                        match parse_account_state(
                            &funds,
                            acct_id,
                            acct_currency,
                            ts_init,
                            ts_init,
                        ) {
                            Ok(state) => acct_emitter.send_account_state(state),
                            Err(e) => log::warn!("Failed to parse account state: {e}"),
                        }
                    }
                    Err(e) => log::warn!("Failed to fetch account state: {e}"),
                }
            }
        }));
    }

    let reconnect_http = Arc::clone(&self.http_client);
    let reconnect_stream = Arc::clone(self.stream_client.as_ref().unwrap());
    let reconnect_app_key = self.credential.app_key().to_string();
    let reconnect_emitter = self.emitter.clone();
    let reconnect_clock = self.clock;
    let reconnect_acct_id = self.core.account_id;
    let reconnect_currency = self.currency;

    // Reconnect task: on each reconnection signal, refresh the session,
    // update the stream authentication and re-emit the account state.
    self.reconnect_handle = Some(get_runtime().spawn(async move {
        while reconnect_rx.recv().await.is_some() {
            log::info!("Handling execution stream reconnection");

            match reconnect_http.keep_alive().await {
                Ok(()) => {}
                Err(ref e) if e.is_login_failed() => {
                    log::warn!("Session expired on reconnect, attempting re-login: {e}");
                    if let Err(e) = reconnect_http.reconnect().await {
                        log::error!("Re-login failed on reconnect: {e}");
                        continue;
                    }
                }
                Err(e) => {
                    log::warn!("Keep-alive failed on reconnect (transient): {e}");
                    continue;
                }
            }

            if let Some(token) = reconnect_http.session_token().await {
                reconnect_stream.update_auth(&reconnect_app_key, token);
            }

            match reconnect_http
                .send_accounts::<AccountFundsResponse, _>(
                    METHOD_GET_ACCOUNT_FUNDS,
                    serde_json::json!({}),
                )
                .await
            {
                Ok(funds) => {
                    let ts_init = reconnect_clock.get_time_ns();

                    match parse_account_state(
                        &funds,
                        reconnect_acct_id,
                        reconnect_currency,
                        ts_init,
                        ts_init,
                    ) {
                        Ok(state) => reconnect_emitter.send_account_state(state),
                        Err(e) => {
                            log::warn!("Failed to parse account state on reconnect: {e}");
                        }
                    }
                }
                Err(e) => log::warn!("Failed to fetch account state on reconnect: {e}"),
            }
        }
    }));

    self.core.set_connected();

    log::info!("Connected: client_id={}", self.core.client_id);
    Ok(())
}
896
897 async fn disconnect(&mut self) -> anyhow::Result<()> {
898 if self.core.is_disconnected() {
899 return Ok(());
900 }
901
902 self.abort_background_tasks();
903 self.abort_pending_tasks();
904
905 if let Some(client) = &self.stream_client {
906 client.close().await;
907 }
908
909 self.http_client.disconnect().await;
910 self.core.set_disconnected();
911
912 log::info!("Disconnected: client_id={}", self.core.client_id);
913 Ok(())
914 }
915
916 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
917 let http_client = Arc::clone(&self.http_client);
918 let emitter = self.emitter.clone();
919 let account_id = self.core.account_id;
920 let ocm_state = Arc::clone(&self.ocm_state);
921 let clock = self.clock;
922 let client_order_id = cmd.client_order_id;
923 let venue_order_id = cmd.venue_order_id;
924 let instrument_id = cmd.instrument_id;
925
926 self.spawn_task("query_order", async move {
927 let mut candidates: Vec<CurrentOrderSummary> = Vec::new();
928 let mut seen_bet_ids: AHashSet<String> = AHashSet::new();
929
930 let rfo = make_customer_order_ref(client_order_id.as_str());
934 let rfo_params = list_current_orders_filter_ref(rfo.clone());
935 match list_current_orders_with_retry(&http_client, &rfo_params).await {
936 Ok(r) => extend_unique(&mut candidates, &mut seen_bet_ids, r.current_orders),
937 Err(e) => log::warn!("Betfair query_order ref lookup failed: {e}"),
938 }
939
940 if candidates.is_empty() {
941 let rfo_legacy = make_customer_order_ref_legacy(client_order_id.as_str());
942 if rfo_legacy != rfo {
943 let legacy_params = list_current_orders_filter_ref(rfo_legacy);
944 match list_current_orders_with_retry(&http_client, &legacy_params).await {
945 Ok(r) => {
946 extend_unique(&mut candidates, &mut seen_bet_ids, r.current_orders);
947 }
948 Err(e) => log::warn!("Betfair query_order legacy lookup failed: {e}"),
949 }
950 }
951 }
952
953 if let Some(ref bet_id) = venue_order_id {
957 let params = list_current_orders_filter_bet_id(bet_id.to_string());
958 match list_current_orders_with_retry(&http_client, ¶ms).await {
959 Ok(r) => extend_unique(&mut candidates, &mut seen_bet_ids, r.current_orders),
960 Err(e) => log::warn!("Betfair query_order bet_id lookup failed: {e}"),
961 }
962 }
963
964 if candidates.is_empty() {
965 log::warn!(
966 "Betfair query_order found no order for client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
967 );
968 return Ok(());
969 }
970
971 let Some(order) = select_order_for_query(
972 &candidates,
973 instrument_id,
974 client_order_id,
975 venue_order_id,
976 ) else {
977 return Ok(());
978 };
979
980 let ts_init = clock.get_time_ns();
981 let mut report = match parse_current_order_report(order, account_id, ts_init) {
982 Ok(r) => r,
983 Err(e) => {
984 log::error!("Failed to parse order report for {}: {e}", order.bet_id);
985 return Ok(());
986 }
987 };
988
989 if report.client_order_id.is_none()
990 && let Some(rfo) = order.customer_order_ref.as_deref()
991 && let Ok(state) = ocm_state.lock()
992 && let Some(full_id) = state.resolve_client_order_id(Some(rfo))
993 {
994 report.client_order_id = Some(full_id);
995 }
996
997 if report.client_order_id.is_none() {
998 report.client_order_id = Some(client_order_id);
999 }
1000
1001 emitter.send_order_status_report(report);
1002 Ok(())
1003 });
1004
1005 Ok(())
1006 }
1007
1008 async fn generate_mass_status(
1009 &self,
1010 lookback_mins: Option<u64>,
1011 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1012 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1013
1014 let ts_now = self.clock.get_time_ns();
1015 let start = lookback_mins.map(|mins| {
1016 let lookback_ns = mins
1017 .saturating_mul(60)
1018 .saturating_mul(NANOSECONDS_IN_SECOND);
1019 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1020 });
1021
1022 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1023 .ts_init(ts_now)
1024 .open_only(false)
1025 .start(start)
1026 .build()
1027 .map_err(|e| anyhow::anyhow!("{e}"))?;
1028
1029 let fill_cmd = GenerateFillReportsBuilder::default()
1030 .ts_init(ts_now)
1031 .start(start)
1032 .build()
1033 .map_err(|e| anyhow::anyhow!("{e}"))?;
1034
1035 let (order_reports, fill_reports) = tokio::try_join!(
1036 self.generate_order_status_reports(&order_cmd),
1037 self.generate_fill_reports(fill_cmd),
1038 )?;
1039
1040 log::info!("Received {} OrderStatusReports", order_reports.len());
1041 log::info!("Received {} FillReports", fill_reports.len());
1042
1043 let mut mass_status = ExecutionMassStatus::new(
1044 self.core.client_id,
1045 self.core.account_id,
1046 *BETFAIR_VENUE,
1047 ts_now,
1048 None,
1049 );
1050
1051 mass_status.add_order_reports(order_reports);
1052 mass_status.add_fill_reports(fill_reports);
1053
1054 Ok(Some(mass_status))
1055 }
1056
1057 async fn generate_order_status_reports(
1058 &self,
1059 cmd: &GenerateOrderStatusReports,
1060 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1061 let order_projection = if cmd.open_only {
1062 Some(OrderProjection::Executable)
1063 } else {
1064 Some(OrderProjection::All)
1065 };
1066
1067 let ts_init = self.clock.get_time_ns();
1068 let mut reports = Vec::new();
1069 let mut from_record: u32 = 0;
1070
1071 loop {
1072 let params = ListCurrentOrdersParams {
1073 bet_ids: None,
1074 market_ids: self.reconcile_market_ids(),
1075 order_projection,
1076 customer_order_refs: None,
1077 customer_strategy_refs: None,
1078 date_range: None,
1079 order_by: None,
1080 sort_dir: None,
1081 from_record: if from_record > 0 {
1082 Some(from_record)
1083 } else {
1084 None
1085 },
1086 record_count: None,
1087 };
1088
1089 let response: CurrentOrderSummaryReport = match self
1090 .http_client
1091 .send_betting(METHOD_LIST_CURRENT_ORDERS, ¶ms)
1092 .await
1093 {
1094 Ok(r) => r,
1095 Err(e) if e.is_session_error() || e.is_rate_limit_error() => {
1096 if e.is_rate_limit_error() {
1097 log::warn!("Rate limited, retrying in {RATE_LIMIT_RETRY_DELAY_SECS}s");
1098 tokio::time::sleep(tokio::time::Duration::from_secs(
1099 RATE_LIMIT_RETRY_DELAY_SECS,
1100 ))
1101 .await;
1102 } else {
1103 log::warn!("Session error, refreshing session");
1104
1105 if self.http_client.keep_alive().await.is_err() {
1106 let _ = self.http_client.reconnect().await;
1107 }
1108 }
1109 self.http_client
1110 .send_betting(METHOD_LIST_CURRENT_ORDERS, ¶ms)
1111 .await
1112 .map_err(|e| anyhow::anyhow!("{e}"))?
1113 }
1114 Err(e) => anyhow::bail!("{e}"),
1115 };
1116
1117 let page_size = response.current_orders.len() as u32;
1118
1119 for order in &response.current_orders {
1120 match parse_current_order_report(order, self.core.account_id, ts_init) {
1121 Ok(mut r) => {
1122 if let Some(ref rfo) = order.customer_order_ref
1123 && let Ok(state) = self.ocm_state.lock()
1124 && let Some(full_id) = state.resolve_client_order_id(Some(rfo.as_str()))
1125 {
1126 r.client_order_id = Some(full_id);
1127 }
1128 reports.push(r);
1129 }
1130 Err(e) => {
1131 log::warn!("Failed to parse order report for {}: {e}", order.bet_id);
1132 }
1133 }
1134 }
1135
1136 if !response.more_available {
1137 break;
1138 }
1139
1140 from_record += page_size;
1141 }
1142
1143 log::info!("Generated {} order status reports", reports.len());
1144 Ok(reports)
1145 }
1146
1147 async fn generate_fill_reports(
1148 &self,
1149 cmd: GenerateFillReports,
1150 ) -> anyhow::Result<Vec<FillReport>> {
1151 let date_range = match (cmd.start, cmd.end) {
1152 (Some(start), Some(end)) => Some(TimeRange {
1153 from: Some(start.to_rfc3339()),
1154 to: Some(end.to_rfc3339()),
1155 }),
1156 (Some(start), None) => Some(TimeRange {
1157 from: Some(start.to_rfc3339()),
1158 to: None,
1159 }),
1160 (None, Some(end)) => Some(TimeRange {
1161 from: None,
1162 to: Some(end.to_rfc3339()),
1163 }),
1164 (None, None) => None,
1165 };
1166
1167 let ts_init = self.clock.get_time_ns();
1168 let mut reports = Vec::new();
1169 let mut from_record: u32 = 0;
1170
1171 loop {
1172 let params = ListCurrentOrdersParams {
1173 bet_ids: None,
1174 market_ids: self.reconcile_market_ids(),
1175 order_projection: Some(OrderProjection::All),
1176 customer_order_refs: None,
1177 customer_strategy_refs: None,
1178 date_range: date_range.clone(),
1179 order_by: None,
1180 sort_dir: None,
1181 from_record: if from_record > 0 {
1182 Some(from_record)
1183 } else {
1184 None
1185 },
1186 record_count: None,
1187 };
1188
1189 let response: CurrentOrderSummaryReport = match self
1190 .http_client
1191 .send_betting(METHOD_LIST_CURRENT_ORDERS, ¶ms)
1192 .await
1193 {
1194 Ok(r) => r,
1195 Err(e) if e.is_session_error() || e.is_rate_limit_error() => {
1196 if e.is_rate_limit_error() {
1197 log::warn!("Rate limited, retrying in {RATE_LIMIT_RETRY_DELAY_SECS}s");
1198 tokio::time::sleep(tokio::time::Duration::from_secs(
1199 RATE_LIMIT_RETRY_DELAY_SECS,
1200 ))
1201 .await;
1202 } else {
1203 log::warn!("Session error, refreshing session");
1204
1205 if self.http_client.keep_alive().await.is_err() {
1206 let _ = self.http_client.reconnect().await;
1207 }
1208 }
1209 self.http_client
1210 .send_betting(METHOD_LIST_CURRENT_ORDERS, ¶ms)
1211 .await
1212 .map_err(|e| anyhow::anyhow!("{e}"))?
1213 }
1214 Err(e) => anyhow::bail!("{e}"),
1215 };
1216
1217 let page_size = response.current_orders.len() as u32;
1218
1219 for order in &response.current_orders {
1220 let size_matched = order.size_matched.unwrap_or(Decimal::ZERO);
1221 if size_matched == Decimal::ZERO {
1222 continue;
1223 }
1224
1225 match parse_current_order_fill_report(
1226 order,
1227 self.core.account_id,
1228 self.currency,
1229 ts_init,
1230 ) {
1231 Ok(mut r) => {
1232 if let Some(ref rfo) = order.customer_order_ref
1233 && let Ok(state) = self.ocm_state.lock()
1234 && let Some(full_id) = state.resolve_client_order_id(Some(rfo.as_str()))
1235 {
1236 r.client_order_id = Some(full_id);
1237 }
1238 reports.push(r);
1239 }
1240 Err(e) => {
1241 log::warn!("Failed to parse fill report for {}: {e}", order.bet_id);
1242 }
1243 }
1244 }
1245
1246 if !response.more_available {
1247 break;
1248 }
1249
1250 from_record += page_size;
1251 }
1252
1253 log::info!("Generated {} fill reports", reports.len());
1254 Ok(reports)
1255 }
1256
1257 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1258 let order = self.core.get_order(&cmd.client_order_id)?;
1259
1260 if order.is_closed() {
1261 log::warn!("Cannot submit closed order {}", order.client_order_id());
1262 return Ok(());
1263 }
1264
1265 if let Ok(mut state) = self.ocm_state.lock() {
1266 state.register_customer_order_ref(order.client_order_id());
1267 }
1268
1269 let instrument_id = order.instrument_id();
1270 let market_id = extract_market_id(&instrument_id)?;
1271 let (selection_id, handicap) = extract_selection_id(&instrument_id)?;
1272
1273 let side = BetfairSide::from(order.order_side());
1274 let size = order.quantity().as_decimal();
1275 let handicap_opt = if handicap == Decimal::ZERO {
1276 None
1277 } else {
1278 Some(handicap)
1279 };
1280 let customer_order_ref = Some(make_customer_order_ref(order.client_order_id().as_str()));
1281
1282 let instruction = match order.order_type() {
1283 OrderType::Limit => {
1284 let price = order
1285 .price()
1286 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?
1287 .as_decimal();
1288
1289 if matches!(
1292 order.time_in_force(),
1293 TimeInForce::AtTheClose | TimeInForce::AtTheOpen
1294 ) {
1295 PlaceInstruction {
1296 order_type: BetfairOrderType::LimitOnClose,
1297 selection_id,
1298 handicap: handicap_opt,
1299 side,
1300 limit_order: None,
1301 limit_on_close_order: Some(LimitOnCloseOrder {
1302 liability: size,
1303 price,
1304 }),
1305 market_on_close_order: None,
1306 customer_order_ref,
1307 }
1308 } else {
1309 let (persistence_type, time_in_force, min_fill_size) =
1310 match order.time_in_force() {
1311 TimeInForce::Ioc => (
1312 None,
1313 Some(BetfairTimeInForce::FillOrKill),
1314 Some(Decimal::ZERO),
1315 ),
1316 TimeInForce::Fok => (None, Some(BetfairTimeInForce::FillOrKill), None),
1317 TimeInForce::Gtc => (Some(PersistenceType::Persist), None, None),
1318 _ => (Some(PersistenceType::Lapse), None, None),
1319 };
1320
1321 PlaceInstruction {
1322 order_type: BetfairOrderType::Limit,
1323 selection_id,
1324 handicap: handicap_opt,
1325 side,
1326 limit_order: Some(LimitOrder {
1327 size,
1328 price,
1329 persistence_type,
1330 time_in_force,
1331 min_fill_size,
1332 bet_target_type: None,
1333 bet_target_size: None,
1334 }),
1335 limit_on_close_order: None,
1336 market_on_close_order: None,
1337 customer_order_ref,
1338 }
1339 }
1340 }
1341 OrderType::Market => {
1342 if order.time_in_force() != TimeInForce::AtTheClose {
1343 anyhow::bail!(
1344 "Market orders on Betfair are only supported with AtTheClose \
1345 time in force (BSP MarketOnClose)"
1346 );
1347 }
1348 PlaceInstruction {
1349 order_type: BetfairOrderType::MarketOnClose,
1350 selection_id,
1351 handicap: handicap_opt,
1352 side,
1353 limit_order: None,
1354 limit_on_close_order: None,
1355 market_on_close_order: Some(MarketOnCloseOrder { liability: size }),
1356 customer_order_ref,
1357 }
1358 }
1359 other => {
1360 anyhow::bail!("Unsupported order type for Betfair: {other:?}");
1361 }
1362 };
1363
1364 let market_version = self.get_market_version(&instrument_id);
1365
1366 let params = PlaceOrdersParams {
1367 market_id,
1368 instructions: vec![instruction],
1369 customer_ref: None,
1370 market_version,
1371 customer_strategy_ref: None,
1372 };
1373
1374 let client_order_id = order.client_order_id();
1375 let strategy_id = order.strategy_id();
1376
1377 log::debug!("OrderSubmitted client_order_id={client_order_id}");
1378 self.emitter.emit_order_submitted(&order);
1379
1380 let http_client = Arc::clone(&self.http_client);
1381 let emitter = self.emitter.clone();
1382 let clock = self.clock;
1383 let ocm_state = Arc::clone(&self.ocm_state);
1384
1385 self.spawn_task("submit-order", async move {
1386 let report: PlaceExecutionReport = match http_client
1387 .send_betting_order(METHOD_PLACE_ORDERS, ¶ms)
1388 .await
1389 {
1390 Ok(r) => r,
1391 Err(e) => {
1392 if e.is_order_placement_ambiguous() {
1396 log::warn!(
1397 "Ambiguous submit response for {client_order_id}: {e}. \
1398 Order may be live, awaiting OCM reconciliation",
1399 );
1400 return Ok(());
1401 }
1402
1403 let ts_event = clock.get_time_ns();
1404 emitter.emit_order_rejected_event(
1405 strategy_id,
1406 instrument_id,
1407 client_order_id,
1408 &format!("submit-order error: {e}"),
1409 ts_event,
1410 false,
1411 );
1412 return Ok(());
1413 }
1414 };
1415
1416 if report.status == ExecutionReportStatus::Timeout {
1417 log::warn!(
1418 "Betfair Timeout for {client_order_id}. \
1419 Order may be live, awaiting OCM reconciliation",
1420 );
1421 return Ok(());
1422 }
1423
1424 if let Some(instruction_reports) = &report.instruction_reports {
1425 if let Some(ir) = instruction_reports.first() {
1426 if ir.status == InstructionReportStatus::Failure {
1427 let reason = format_place_instruction_reason(ir, &report);
1428 let ts_event = clock.get_time_ns();
1429 emitter.emit_order_rejected_event(
1430 strategy_id,
1431 instrument_id,
1432 client_order_id,
1433 &reason,
1434 ts_event,
1435 false,
1436 );
1437 return Ok(());
1438 }
1439
1440 if let Some(bet_id) = &ir.bet_id {
1441 let venue_order_id = VenueOrderId::from(bet_id.as_str());
1442 let ts_event = clock.get_time_ns();
1443
1444 if should_emit_http_accept(&ocm_state, &client_order_id) {
1445 emitter.emit_order_accepted(&order, venue_order_id, ts_event);
1446 }
1447 }
1448 } else if report.status == ExecutionReportStatus::Failure
1449 || report.status == ExecutionReportStatus::ProcessedWithErrors
1450 {
1451 let reason = format_betfair_reason(
1452 report.error_message.as_deref(),
1453 report.error_code,
1454 None,
1455 "unknown error",
1456 );
1457 let ts_event = clock.get_time_ns();
1458 emitter.emit_order_rejected_event(
1459 strategy_id,
1460 instrument_id,
1461 client_order_id,
1462 &reason,
1463 ts_event,
1464 false,
1465 );
1466 }
1467 } else if report.status == ExecutionReportStatus::Failure
1468 || report.status == ExecutionReportStatus::ProcessedWithErrors
1469 {
1470 let reason = format_betfair_reason(
1471 report.error_message.as_deref(),
1472 report.error_code,
1473 None,
1474 "unknown error",
1475 );
1476 let ts_event = clock.get_time_ns();
1477 emitter.emit_order_rejected_event(
1478 strategy_id,
1479 instrument_id,
1480 client_order_id,
1481 &reason,
1482 ts_event,
1483 false,
1484 );
1485 }
1486
1487 Ok(())
1488 });
1489
1490 Ok(())
1491 }
1492
1493 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1494 let instrument_id = cmd.instrument_id;
1495 let market_id = extract_market_id(&instrument_id)?;
1496
1497 let venue_order_id = cmd
1498 .venue_order_id
1499 .ok_or_else(|| anyhow::anyhow!("Cannot cancel order without venue_order_id"))?;
1500 let bet_id: BetId = venue_order_id.to_string();
1501
1502 let params = CancelOrdersParams {
1503 market_id: Some(market_id),
1504 instructions: Some(vec![CancelInstruction {
1505 bet_id,
1506 size_reduction: None,
1507 }]),
1508 customer_ref: None,
1509 };
1510
1511 let client_order_id = cmd.client_order_id;
1512 let strategy_id = cmd.strategy_id;
1513 let http_client = Arc::clone(&self.http_client);
1514 let emitter = self.emitter.clone();
1515 let clock = self.clock;
1516
1517 self.spawn_task("cancel-order", async move {
1518 let result: Result<CancelExecutionReport, _> = http_client
1519 .send_betting_order(METHOD_CANCEL_ORDERS, ¶ms)
1520 .await;
1521
1522 let report = match result {
1523 Ok(r) => r,
1524 Err(e) => {
1525 let ts_event = clock.get_time_ns();
1526 emitter.emit_order_cancel_rejected_event(
1527 strategy_id,
1528 instrument_id,
1529 client_order_id,
1530 Some(venue_order_id),
1531 &format!("cancel-order error: {e}"),
1532 ts_event,
1533 );
1534 return Ok(());
1535 }
1536 };
1537
1538 if report.status == ExecutionReportStatus::Timeout {
1539 log::warn!(
1540 "Betfair Timeout for cancel {client_order_id}. \
1541 Cancel may be delayed (in-play), awaiting OCM reconciliation",
1542 );
1543 return Ok(());
1544 }
1545
1546 if let Some(instruction_reports) = &report.instruction_reports
1547 && !instruction_reports.is_empty()
1548 {
1549 for ir in instruction_reports {
1550 match ir.status {
1551 InstructionReportStatus::Success => {}
1552 InstructionReportStatus::Timeout => {
1553 log::warn!(
1554 "Cancel instruction timeout for {client_order_id}",
1555 );
1556 }
1557 InstructionReportStatus::Failure => {
1558 if ir.error_code
1559 == Some(InstructionReportErrorCode::BetTakenOrLapsed)
1560 {
1561 log::debug!(
1562 "Cancel {client_order_id}: BetTakenOrLapsed, treating as success",
1563 );
1564 continue;
1565 }
1566
1567 let reason = format_cancel_instruction_reason(
1568 ir.error_message.as_deref(),
1569 ir.error_code,
1570 report.error_message.as_deref(),
1571 report.error_code,
1572 );
1573 let ts_event = clock.get_time_ns();
1574 emitter.emit_order_cancel_rejected_event(
1575 strategy_id,
1576 instrument_id,
1577 client_order_id,
1578 Some(venue_order_id),
1579 &reason,
1580 ts_event,
1581 );
1582 return Ok(());
1583 }
1584 }
1585 }
1586 } else if report.status != ExecutionReportStatus::Success {
1587 let reason = format_betfair_reason(
1588 report.error_message.as_deref(),
1589 report.error_code,
1590 None,
1591 "unknown error",
1592 );
1593 let ts_event = clock.get_time_ns();
1594 emitter.emit_order_cancel_rejected_event(
1595 strategy_id,
1596 instrument_id,
1597 client_order_id,
1598 Some(venue_order_id),
1599 &reason,
1600 ts_event,
1601 );
1602 }
1603
1604 Ok(())
1605 });
1606
1607 Ok(())
1608 }
1609
1610 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1611 let instrument_id = cmd.instrument_id;
1612 let market_id = extract_market_id(&instrument_id)?;
1613
1614 let venue_order_id = cmd
1615 .venue_order_id
1616 .ok_or_else(|| anyhow::anyhow!("Cannot modify order without venue_order_id"))?;
1617 let bet_id: BetId = venue_order_id.to_string();
1618
1619 let existing_order = self.core.get_order(&cmd.client_order_id);
1621 let has_price_change = match (&cmd.price, &existing_order) {
1622 (Some(new_price), Ok(order)) => order.price() != Some(*new_price),
1623 (Some(_), Err(_)) => true,
1624 (None, _) => false,
1625 };
1626 let has_quantity_change = match (&cmd.quantity, &existing_order) {
1627 (Some(new_qty), Ok(order)) => order.quantity() != *new_qty,
1628 (Some(_), Err(_)) => true,
1629 (None, _) => false,
1630 };
1631
1632 if has_price_change && has_quantity_change {
1634 let ts_event = self.clock.get_time_ns();
1635 self.emitter.emit_order_modify_rejected_event(
1636 cmd.strategy_id,
1637 instrument_id,
1638 cmd.client_order_id,
1639 Some(venue_order_id),
1640 "cannot modify price and quantity simultaneously on Betfair",
1641 ts_event,
1642 );
1643 return Ok(());
1644 }
1645
1646 let client_order_id = cmd.client_order_id;
1647 let strategy_id = cmd.strategy_id;
1648 let http_client = Arc::clone(&self.http_client);
1649 let emitter = self.emitter.clone();
1650 let clock = self.clock;
1651
1652 if has_price_change {
1653 let new_price = cmd.price.unwrap().as_decimal();
1654 let old_bet_id = bet_id.clone();
1655
1656 if let Ok(mut state) = self.ocm_state.lock() {
1660 state
1661 .pending_update_keys
1662 .insert((client_order_id, old_bet_id.clone()));
1663 }
1664
1665 let market_version = self.get_market_version(&instrument_id);
1666
1667 let params = ReplaceOrdersParams {
1668 market_id,
1669 instructions: vec![ReplaceInstruction { bet_id, new_price }],
1670 customer_ref: None,
1671 market_version,
1672 };
1673
1674 let ocm_state = Arc::clone(&self.ocm_state);
1675
1676 self.spawn_task("modify-order-price", async move {
1677 let result: Result<ReplaceExecutionReport, _> = http_client
1678 .send_betting_order(METHOD_REPLACE_ORDERS, ¶ms)
1679 .await;
1680
1681 match result {
1682 Ok(report) if report.status == ExecutionReportStatus::Success => {
1683 if let Ok(mut state) = ocm_state.lock() {
1684 state
1685 .pending_update_keys
1686 .remove(&(client_order_id, old_bet_id.clone()));
1687 state.replaced_venue_order_ids.insert(old_bet_id);
1688 }
1689 }
1690 Ok(report) if report.status == ExecutionReportStatus::Timeout => {
1691 log::warn!(
1692 "Betfair Timeout for modify {client_order_id}. \
1693 Replace may be pending, awaiting OCM reconciliation",
1694 );
1695 }
1696 Ok(report) => {
1697 if let Ok(mut state) = ocm_state.lock() {
1698 state
1699 .pending_update_keys
1700 .remove(&(client_order_id, old_bet_id));
1701 }
1702
1703 if let Some(instruction_reports) = &report.instruction_reports
1704 && !instruction_reports.is_empty()
1705 {
1706 for ir in instruction_reports {
1707 match ir.status {
1708 InstructionReportStatus::Success => {}
1709 InstructionReportStatus::Timeout => {
1710 log::warn!(
1711 "Replace instruction timeout for {client_order_id}",
1712 );
1713 }
1714 InstructionReportStatus::Failure => {
1715 let reason = format_replace_instruction_reason(ir, &report);
1716 let ts_event = clock.get_time_ns();
1717 emitter.emit_order_modify_rejected_event(
1718 strategy_id,
1719 instrument_id,
1720 client_order_id,
1721 Some(venue_order_id),
1722 &reason,
1723 ts_event,
1724 );
1725 return Ok(());
1726 }
1727 }
1728 }
1729 }
1730
1731 let reason = format_betfair_reason(
1732 report.error_message.as_deref(),
1733 report.error_code,
1734 None,
1735 "unknown error",
1736 );
1737 let ts_event = clock.get_time_ns();
1738 emitter.emit_order_modify_rejected_event(
1739 strategy_id,
1740 instrument_id,
1741 client_order_id,
1742 Some(venue_order_id),
1743 &reason,
1744 ts_event,
1745 );
1746 }
1747 Err(e) => {
1748 if let Ok(mut state) = ocm_state.lock() {
1749 state
1750 .pending_update_keys
1751 .remove(&(client_order_id, old_bet_id));
1752 }
1753 let ts_event = clock.get_time_ns();
1754 emitter.emit_order_modify_rejected_event(
1755 strategy_id,
1756 instrument_id,
1757 client_order_id,
1758 Some(venue_order_id),
1759 &format!("modify-order error: {e}"),
1760 ts_event,
1761 );
1762 }
1763 }
1764
1765 Ok(())
1766 });
1767 } else if has_quantity_change {
1768 let order = self.core.get_order(&client_order_id)?;
1770 let existing_qty = order.quantity().as_decimal();
1771 let new_qty = cmd.quantity.unwrap().as_decimal();
1772
1773 if new_qty >= existing_qty {
1774 let ts_event = self.clock.get_time_ns();
1775 self.emitter.emit_order_modify_rejected_event(
1776 strategy_id,
1777 instrument_id,
1778 client_order_id,
1779 Some(venue_order_id),
1780 "can only reduce quantity on Betfair",
1781 ts_event,
1782 );
1783 return Ok(());
1784 }
1785
1786 let size_reduction = existing_qty - new_qty;
1787 let params = CancelOrdersParams {
1788 market_id: Some(market_id),
1789 instructions: Some(vec![CancelInstruction {
1790 bet_id,
1791 size_reduction: Some(size_reduction),
1792 }]),
1793 customer_ref: None,
1794 };
1795
1796 self.spawn_task("modify-order-quantity", async move {
1797 let result: Result<CancelExecutionReport, _> = http_client
1798 .send_betting_order(METHOD_CANCEL_ORDERS, ¶ms)
1799 .await;
1800
1801 match result {
1802 Ok(report) if report.status != ExecutionReportStatus::Success => {
1803 let reason = format_betfair_reason(
1804 report.error_message.as_deref(),
1805 report.error_code,
1806 None,
1807 "unknown error",
1808 );
1809 let ts_event = clock.get_time_ns();
1810 emitter.emit_order_modify_rejected_event(
1811 strategy_id,
1812 instrument_id,
1813 client_order_id,
1814 Some(venue_order_id),
1815 &reason,
1816 ts_event,
1817 );
1818 }
1819 Err(e) => {
1820 let ts_event = clock.get_time_ns();
1821 emitter.emit_order_modify_rejected_event(
1822 strategy_id,
1823 instrument_id,
1824 client_order_id,
1825 Some(venue_order_id),
1826 &format!("modify-order error: {e}"),
1827 ts_event,
1828 );
1829 }
1830 Ok(_) => {}
1831 }
1832
1833 Ok(())
1834 });
1835 } else {
1836 let ts_event = self.clock.get_time_ns();
1837 self.emitter.emit_order_modify_rejected_event(
1838 strategy_id,
1839 instrument_id,
1840 client_order_id,
1841 Some(venue_order_id),
1842 "no effective change in price or quantity",
1843 ts_event,
1844 );
1845 }
1846
1847 Ok(())
1848 }
1849
1850 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1851 let instrument_id = cmd.instrument_id;
1852 let market_id = extract_market_id(&instrument_id)?;
1853
1854 let params = CancelOrdersParams {
1855 market_id: Some(market_id),
1856 instructions: None,
1857 customer_ref: None,
1858 };
1859
1860 let http_client = Arc::clone(&self.http_client);
1861
1862 self.spawn_task("cancel-all-orders", async move {
1863 let result = http_client
1864 .send_betting_order::<serde_json::Value, _>(METHOD_CANCEL_ORDERS, ¶ms)
1865 .await;
1866
1867 if let Err(e) = result {
1868 log::warn!("Failed to cancel all orders: {e}");
1869 }
1870
1871 Ok(())
1872 });
1873
1874 Ok(())
1875 }
1876
1877 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1878 let instrument_id = cmd.instrument_id;
1879 let market_id = extract_market_id(&instrument_id)?;
1880
1881 let mut instructions = Vec::new();
1882 let mut valid_cancels = Vec::new();
1883
1884 for cancel in &cmd.cancels {
1885 match cancel.venue_order_id {
1886 Some(venue_order_id) => {
1887 let bet_id: BetId = venue_order_id.to_string();
1888 instructions.push(CancelInstruction {
1889 bet_id,
1890 size_reduction: None,
1891 });
1892 valid_cancels.push(cancel);
1893 }
1894 None => {
1895 let ts_event = self.clock.get_time_ns();
1896 self.emitter.emit_order_cancel_rejected_event(
1897 cancel.strategy_id,
1898 cancel.instrument_id,
1899 cancel.client_order_id,
1900 None,
1901 "no venue_order_id",
1902 ts_event,
1903 );
1904 }
1905 }
1906 }
1907
1908 if valid_cancels.is_empty() {
1909 return Ok(());
1910 }
1911
1912 let params = CancelOrdersParams {
1913 market_id: Some(market_id),
1914 instructions: Some(instructions),
1915 customer_ref: None,
1916 };
1917
1918 let cancel_data: Vec<_> = valid_cancels
1919 .iter()
1920 .map(|c| {
1921 (
1922 c.strategy_id,
1923 c.instrument_id,
1924 c.client_order_id,
1925 c.venue_order_id,
1926 )
1927 })
1928 .collect();
1929
1930 let http_client = Arc::clone(&self.http_client);
1931 let emitter = self.emitter.clone();
1932 let clock = self.clock;
1933
1934 self.spawn_task("batch-cancel-orders", async move {
1935 let report: CancelExecutionReport = match http_client
1936 .send_betting_order(METHOD_CANCEL_ORDERS, ¶ms)
1937 .await
1938 {
1939 Ok(r) => r,
1940 Err(e) => {
1941 let ts_event = clock.get_time_ns();
1942
1943 for (strategy_id, instr_id, client_oid, venue_oid) in &cancel_data {
1944 emitter.emit_order_cancel_rejected_event(
1945 *strategy_id,
1946 *instr_id,
1947 *client_oid,
1948 *venue_oid,
1949 &format!("batch-cancel error: {e}"),
1950 ts_event,
1951 );
1952 }
1953 return Ok(());
1954 }
1955 };
1956
1957 if report.status == ExecutionReportStatus::Failure {
1958 let reason = format_betfair_reason(
1959 report.error_message.as_deref(),
1960 report.error_code,
1961 None,
1962 "unknown error",
1963 );
1964
1965 if report.instruction_reports.is_none() {
1966 let ts_event = clock.get_time_ns();
1967
1968 for (strategy_id, instr_id, client_oid, venue_oid) in &cancel_data {
1969 emitter.emit_order_cancel_rejected_event(
1970 *strategy_id,
1971 *instr_id,
1972 *client_oid,
1973 *venue_oid,
1974 &reason,
1975 ts_event,
1976 );
1977 }
1978 return Ok(());
1979 }
1980 }
1981
1982 if let Some(instruction_reports) = &report.instruction_reports {
1983 for (ir, (strategy_id, instr_id, client_oid, venue_oid)) in
1984 instruction_reports.iter().zip(cancel_data.iter())
1985 {
1986 match ir.status {
1987 InstructionReportStatus::Success => {}
1988 InstructionReportStatus::Timeout => {
1989 log::warn!(
1990 "Cancel timeout for {client_oid}: leaving order state unchanged",
1991 );
1992 }
1993 InstructionReportStatus::Failure => {
1994 if ir.error_code == Some(InstructionReportErrorCode::BetTakenOrLapsed) {
1996 continue;
1997 }
1998
1999 let reason = format_cancel_instruction_reason(
2000 ir.error_message.as_deref(),
2001 ir.error_code,
2002 report.error_message.as_deref(),
2003 report.error_code,
2004 );
2005 let ts_event = clock.get_time_ns();
2006 emitter.emit_order_cancel_rejected_event(
2007 *strategy_id,
2008 *instr_id,
2009 *client_oid,
2010 *venue_oid,
2011 &reason,
2012 ts_event,
2013 );
2014 }
2015 }
2016 }
2017 }
2018
2019 Ok(())
2020 });
2021
2022 Ok(())
2023 }
2024
2025 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
2026 let instrument_id = cmd.instrument_id;
2027 let market_id = extract_market_id(&instrument_id)?;
2028 let (selection_id, handicap) = extract_selection_id(&instrument_id)?;
2029
2030 let handicap_opt = if handicap == Decimal::ZERO {
2031 None
2032 } else {
2033 Some(handicap)
2034 };
2035
2036 let mut instructions = Vec::new();
2037 let mut order_snapshots = Vec::new();
2038
2039 for client_order_id in &cmd.order_list.client_order_ids {
2040 let order = self.core.get_order(client_order_id)?;
2041
2042 if order.is_closed() {
2043 log::warn!("Skipping closed order {client_order_id}");
2044 continue;
2045 }
2046
2047 if let Ok(mut state) = self.ocm_state.lock() {
2048 state.register_customer_order_ref(order.client_order_id());
2049 }
2050
2051 let side = BetfairSide::from(order.order_side());
2052 let size = order.quantity().as_decimal();
2053 let customer_order_ref =
2054 Some(make_customer_order_ref(order.client_order_id().as_str()));
2055
2056 let instruction = match order.order_type() {
2057 OrderType::Limit => {
2058 let price = order
2059 .price()
2060 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?
2061 .as_decimal();
2062
2063 if matches!(
2064 order.time_in_force(),
2065 TimeInForce::AtTheClose | TimeInForce::AtTheOpen
2066 ) {
2067 PlaceInstruction {
2068 order_type: BetfairOrderType::LimitOnClose,
2069 selection_id,
2070 handicap: handicap_opt,
2071 side,
2072 limit_order: None,
2073 limit_on_close_order: Some(LimitOnCloseOrder {
2074 liability: size,
2075 price,
2076 }),
2077 market_on_close_order: None,
2078 customer_order_ref,
2079 }
2080 } else {
2081 let (persistence_type, time_in_force, min_fill_size) = match order
2082 .time_in_force()
2083 {
2084 TimeInForce::Ioc => (
2085 None,
2086 Some(BetfairTimeInForce::FillOrKill),
2087 Some(Decimal::ZERO),
2088 ),
2089 TimeInForce::Fok => (None, Some(BetfairTimeInForce::FillOrKill), None),
2090 TimeInForce::Gtc => (Some(PersistenceType::Persist), None, None),
2091 _ => (Some(PersistenceType::Lapse), None, None),
2092 };
2093
2094 PlaceInstruction {
2095 order_type: BetfairOrderType::Limit,
2096 selection_id,
2097 handicap: handicap_opt,
2098 side,
2099 limit_order: Some(LimitOrder {
2100 size,
2101 price,
2102 persistence_type,
2103 time_in_force,
2104 min_fill_size,
2105 bet_target_type: None,
2106 bet_target_size: None,
2107 }),
2108 limit_on_close_order: None,
2109 market_on_close_order: None,
2110 customer_order_ref,
2111 }
2112 }
2113 }
2114 OrderType::Market => {
2115 if order.time_in_force() != TimeInForce::AtTheClose {
2116 anyhow::bail!(
2117 "Market orders on Betfair are only supported with AtTheClose \
2118 time in force (BSP MarketOnClose)"
2119 );
2120 }
2121 PlaceInstruction {
2122 order_type: BetfairOrderType::MarketOnClose,
2123 selection_id,
2124 handicap: handicap_opt,
2125 side,
2126 limit_order: None,
2127 limit_on_close_order: None,
2128 market_on_close_order: Some(MarketOnCloseOrder { liability: size }),
2129 customer_order_ref,
2130 }
2131 }
2132 other => {
2133 anyhow::bail!("Unsupported order type for Betfair: {other:?}");
2134 }
2135 };
2136
2137 instructions.push(instruction);
2138 order_snapshots.push((order.client_order_id(), order.strategy_id(), order.clone()));
2139 }
2140
2141 if instructions.is_empty() {
2142 return Ok(());
2143 }
2144
2145 let market_version = self.get_market_version(&instrument_id);
2146
2147 let params = PlaceOrdersParams {
2148 market_id,
2149 instructions,
2150 customer_ref: None,
2151 market_version,
2152 customer_strategy_ref: None,
2153 };
2154
2155 for (_, _, order) in &order_snapshots {
2156 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
2157 self.emitter.emit_order_submitted(order);
2158 }
2159
2160 let http_client = Arc::clone(&self.http_client);
2161 let emitter = self.emitter.clone();
2162 let clock = self.clock;
2163 let ocm_state = Arc::clone(&self.ocm_state);
2164
2165 self.spawn_task("submit-order-list", async move {
2166 let report: PlaceExecutionReport = match http_client
2167 .send_betting_order(METHOD_PLACE_ORDERS, ¶ms)
2168 .await
2169 {
2170 Ok(r) => r,
2171 Err(e) => {
2172 if e.is_order_placement_ambiguous() {
2173 log::warn!(
2174 "Ambiguous submit response for order list: {e}. \
2175 Orders may be live, awaiting OCM reconciliation",
2176 );
2177 return Ok(());
2178 }
2179
2180 let ts_event = clock.get_time_ns();
2181
2182 for (client_oid, strategy_id, _) in &order_snapshots {
2183 emitter.emit_order_rejected_event(
2184 *strategy_id,
2185 instrument_id,
2186 *client_oid,
2187 &format!("submit-order-list error: {e}"),
2188 ts_event,
2189 false,
2190 );
2191 }
2192 return Ok(());
2193 }
2194 };
2195
2196 if report.status == ExecutionReportStatus::Failure {
2197 let reason = format_betfair_reason(
2198 report.error_message.as_deref(),
2199 report.error_code,
2200 None,
2201 "unknown error",
2202 );
2203
2204 if report.instruction_reports.is_none() {
2205 let ts_event = clock.get_time_ns();
2206
2207 for (client_oid, strategy_id, _) in &order_snapshots {
2208 emitter.emit_order_rejected_event(
2209 *strategy_id,
2210 instrument_id,
2211 *client_oid,
2212 &reason,
2213 ts_event,
2214 false,
2215 );
2216 }
2217 return Ok(());
2218 }
2219 }
2220
2221 if report.status == ExecutionReportStatus::Timeout {
2222 log::warn!(
2223 "Betfair Timeout for order list. \
2224 Orders may be live, awaiting OCM reconciliation",
2225 );
2226 return Ok(());
2227 }
2228
2229 if let Some(instruction_reports) = &report.instruction_reports {
2230 for (ir, (client_oid, strategy_id, order)) in
2231 instruction_reports.iter().zip(order_snapshots.iter())
2232 {
2233 match ir.status {
2234 InstructionReportStatus::Success => {
2235 if let Some(bet_id) = &ir.bet_id {
2236 let venue_order_id = VenueOrderId::from(bet_id.as_str());
2237 let ts_event = clock.get_time_ns();
2238
2239 if should_emit_http_accept(&ocm_state, client_oid) {
2240 emitter.emit_order_accepted(order, venue_order_id, ts_event);
2241 }
2242 }
2243 }
2244 InstructionReportStatus::Timeout => {
2245 log::warn!(
2246 "Submit timeout for {client_oid}: \
2247 leaving SUBMITTED for reconciliation",
2248 );
2249 }
2250 InstructionReportStatus::Failure => {
2251 let reason = format_place_instruction_reason(ir, &report);
2252 let ts_event = clock.get_time_ns();
2253 emitter.emit_order_rejected_event(
2254 *strategy_id,
2255 instrument_id,
2256 *client_oid,
2257 &reason,
2258 ts_event,
2259 false,
2260 );
2261 }
2262 }
2263 }
2264 }
2265
2266 Ok(())
2267 });
2268
2269 Ok(())
2270 }
2271}
2272
2273fn list_current_orders_filter_bet_id(bet_id: String) -> ListCurrentOrdersParams {
2274 ListCurrentOrdersParams {
2275 bet_ids: Some(vec![bet_id]),
2276 market_ids: None,
2277 order_projection: None,
2278 customer_order_refs: None,
2279 customer_strategy_refs: None,
2280 date_range: None,
2281 order_by: None,
2282 sort_dir: None,
2283 from_record: None,
2284 record_count: None,
2285 }
2286}
2287
2288fn list_current_orders_filter_ref(customer_order_ref: String) -> ListCurrentOrdersParams {
2289 ListCurrentOrdersParams {
2290 bet_ids: None,
2291 market_ids: None,
2292 order_projection: None,
2293 customer_order_refs: Some(vec![customer_order_ref]),
2294 customer_strategy_refs: None,
2295 date_range: None,
2296 order_by: None,
2297 sort_dir: None,
2298 from_record: None,
2299 record_count: None,
2300 }
2301}
2302
2303fn extend_unique(
2304 candidates: &mut Vec<CurrentOrderSummary>,
2305 seen: &mut AHashSet<String>,
2306 orders: Vec<CurrentOrderSummary>,
2307) {
2308 for order in orders {
2309 if seen.insert(order.bet_id.clone()) {
2310 candidates.push(order);
2311 }
2312 }
2313}
2314
2315fn select_order_for_query(
2316 orders: &[CurrentOrderSummary],
2317 expected_instrument_id: InstrumentId,
2318 expected_client_order_id: ClientOrderId,
2319 expected_venue_order_id: Option<VenueOrderId>,
2320) -> Option<&CurrentOrderSummary> {
2321 let matching: Vec<&CurrentOrderSummary> = orders
2322 .iter()
2323 .filter(|o| {
2324 make_instrument_id(&o.market_id, o.selection_id, o.handicap) == expected_instrument_id
2325 })
2326 .collect();
2327
2328 let candidates: Vec<&CurrentOrderSummary> = if matching.is_empty() {
2329 if let Some(vid) = expected_venue_order_id
2334 && let Some(order) = orders.iter().find(|o| o.bet_id == vid.as_str())
2335 {
2336 return Some(order);
2337 }
2338 log::warn!(
2339 "Betfair query_order returned {} orders for client_order_id={expected_client_order_id}, none matching instrument {expected_instrument_id}; skipping to avoid cross-instrument reconciliation",
2340 orders.len(),
2341 );
2342 return None;
2343 } else {
2344 matching
2345 };
2346
2347 let executable: Vec<&CurrentOrderSummary> = candidates
2350 .iter()
2351 .copied()
2352 .filter(|o| o.status == BetfairOrderStatus::Executable)
2353 .collect();
2354
2355 let pool = if executable.is_empty() {
2356 candidates
2357 } else {
2358 executable
2359 };
2360
2361 pool.into_iter()
2364 .max_by(|a, b| a.placed_date.cmp(&b.placed_date))
2365}
2366
2367async fn list_current_orders_with_retry(
2368 http_client: &Arc<BetfairHttpClient>,
2369 params: &ListCurrentOrdersParams,
2370) -> anyhow::Result<CurrentOrderSummaryReport> {
2371 match http_client
2372 .send_betting(METHOD_LIST_CURRENT_ORDERS, params)
2373 .await
2374 {
2375 Ok(r) => Ok(r),
2376 Err(e) if e.is_session_error() || e.is_rate_limit_error() => {
2377 if e.is_rate_limit_error() {
2378 log::warn!("Rate limited, retrying in {RATE_LIMIT_RETRY_DELAY_SECS}s");
2379 tokio::time::sleep(tokio::time::Duration::from_secs(
2380 RATE_LIMIT_RETRY_DELAY_SECS,
2381 ))
2382 .await;
2383 } else {
2384 log::warn!("Session error, refreshing session");
2385
2386 if http_client.keep_alive().await.is_err() {
2387 let _ = http_client.reconnect().await;
2388 }
2389 }
2390 http_client
2391 .send_betting(METHOD_LIST_CURRENT_ORDERS, params)
2392 .await
2393 .map_err(|e| anyhow::anyhow!("{e}"))
2394 }
2395 Err(e) => Err(anyhow::anyhow!("{e}")),
2396 }
2397}
2398
2399fn should_emit_http_accept(
2400 ocm_state: &Arc<Mutex<OcmState>>,
2401 client_order_id: &ClientOrderId,
2402) -> bool {
2403 let Ok(state) = ocm_state.lock() else {
2404 log::error!("OcmState mutex poisoned");
2405 return true;
2406 };
2407
2408 if state
2409 .stream_reported_client_orders
2410 .contains(client_order_id)
2411 {
2412 log::info!(
2413 "Suppressing late HTTP acceptance for {client_order_id}: OCM already reported order state"
2414 );
2415 return false;
2416 }
2417
2418 true
2419}
2420
/// Builds a human-readable reason string from the pieces of a Betfair error.
///
/// Precedence:
/// 1. a non-blank `error_message` (trimmed), suffixed with ` (<code:?>)` when an
///    `error_code` is present;
/// 2. the `Debug` rendering of `error_code`;
/// 3. `fallback`, when it is not blank (returned as given, not trimmed);
/// 4. `unknown`.
fn format_betfair_reason(
    error_message: Option<&str>,
    error_code: Option<impl fmt::Debug>,
    fallback: Option<String>,
    unknown: &str,
) -> String {
    // Blank or whitespace-only messages count as absent.
    let message = error_message.map(str::trim).filter(|text| !text.is_empty());

    match (message, error_code) {
        (Some(text), Some(code)) => format!("{text} ({code:?})"),
        (Some(text), None) => text.to_string(),
        (None, Some(code)) => format!("{code:?}"),
        (None, None) => fallback
            .filter(|text| !text.trim().is_empty())
            .unwrap_or_else(|| unknown.to_string()),
    }
}
2442
2443fn format_place_instruction_reason(
2444 instruction_report: &PlaceInstructionReport,
2445 report: &PlaceExecutionReport,
2446) -> String {
2447 format_betfair_reason(
2448 instruction_report.error_message.as_deref(),
2449 instruction_report.error_code,
2450 report_fallback(report.error_message.as_deref(), report.error_code),
2451 "unknown error",
2452 )
2453}
2454
2455fn format_cancel_instruction_reason(
2456 error_message: Option<&str>,
2457 error_code: Option<InstructionReportErrorCode>,
2458 report_error_message: Option<&str>,
2459 report_error_code: Option<ExecutionReportErrorCode>,
2460) -> String {
2461 format_betfair_reason(
2462 error_message,
2463 error_code,
2464 report_fallback(report_error_message, report_error_code),
2465 "unknown instruction error",
2466 )
2467}
2468
2469fn format_replace_instruction_reason(
2470 instruction_report: &ReplaceInstructionReport,
2471 report: &ReplaceExecutionReport,
2472) -> String {
2473 let nested_reason = instruction_report
2474 .place_instruction_report
2475 .as_ref()
2476 .and_then(|ir| instruction_fallback(ir.error_message.as_deref(), ir.error_code))
2477 .or_else(|| {
2478 instruction_report
2479 .cancel_instruction_report
2480 .as_ref()
2481 .and_then(|ir| instruction_fallback(ir.error_message.as_deref(), ir.error_code))
2482 });
2483
2484 format_betfair_reason(
2485 instruction_report.error_message.as_deref(),
2486 instruction_report.error_code,
2487 nested_reason
2488 .or_else(|| report_fallback(report.error_message.as_deref(), report.error_code)),
2489 "unknown instruction error",
2490 )
2491}
2492
2493fn report_fallback(
2494 error_message: Option<&str>,
2495 error_code: Option<ExecutionReportErrorCode>,
2496) -> Option<String> {
2497 error_message
2498 .map(str::trim)
2499 .filter(|s| !s.is_empty())
2500 .map(str::to_string)
2501 .or_else(|| error_code.map(|code| format!("{code:?}")))
2502}
2503
2504fn instruction_fallback(
2505 error_message: Option<&str>,
2506 error_code: Option<InstructionReportErrorCode>,
2507) -> Option<String> {
2508 error_message
2509 .map(str::trim)
2510 .filter(|s| !s.is_empty())
2511 .map(str::to_string)
2512 .or_else(|| error_code.map(|code| format!("{code:?}")))
2513}
2514
#[cfg(test)]
mod tests {
    use nautilus_model::types::Quantity;
    use rstest::rstest;
    use rust_decimal::Decimal;

    use super::*;

    /// Builds an executable limit `UnmatchedOrder` stream message for fill-tracker tests.
    ///
    /// Defaults: side `Back`, persistence `Lapse`, `avp` equal to `price`, fixed placed
    /// date, and every other optional field `None`. Tests needing different values
    /// override them with struct-update syntax.
    fn make_unmatched_order(
        bet_id: &str,
        price: Decimal,
        size: Decimal,
        size_matched: Decimal,
        rfo: Option<&str>,
    ) -> crate::stream::messages::UnmatchedOrder {
        crate::stream::messages::UnmatchedOrder {
            id: bet_id.to_string(),
            p: price,
            s: size,
            side: crate::common::enums::StreamingSide::Back,
            status: crate::common::enums::StreamingOrderStatus::Executable,
            pt: Some(crate::common::enums::StreamingPersistenceType::Lapse),
            ot: crate::common::enums::StreamingOrderType::Limit,
            pd: 1617863365000,
            bsp: None,
            rfo: rfo.map(str::to_string),
            rfs: None,
            rc: None,
            rac: None,
            md: None,
            cd: None,
            ld: None,
            avp: Some(price),
            sm: Some(size_matched),
            sr: None,
            sl: None,
            sc: None,
            sv: None,
            lsrc: None,
        }
    }

    #[rstest]
    #[case(
        Some("Price out of range"),
        Some(InstructionReportErrorCode::InvalidOdds),
        None,
        "unknown",
        "Price out of range (InvalidOdds)"
    )]
    #[case(
        Some("Price out of range"),
        None,
        None,
        "unknown",
        "Price out of range"
    )]
    #[case(
        None,
        Some(InstructionReportErrorCode::ErrorInOrder),
        None,
        "unknown",
        "ErrorInOrder"
    )]
    #[case(None, None, Some("report-level msg".to_string()), "unknown", "report-level msg")]
    #[case(None, None, None, "unknown error", "unknown error")]
    #[case(
        Some(" "),
        Some(InstructionReportErrorCode::ErrorInOrder),
        None,
        "unknown",
        "ErrorInOrder"
    )]
    #[case(Some(""), None, Some(String::new()), "fallback", "fallback")]
    #[case(Some(" \n "), None, Some(" ".to_string()), "unknown", "unknown")]
    fn test_format_betfair_reason(
        #[case] error_message: Option<&str>,
        #[case] error_code: Option<InstructionReportErrorCode>,
        #[case] fallback: Option<String>,
        #[case] unknown: &str,
        #[case] expected: &str,
    ) {
        assert_eq!(
            format_betfair_reason(error_message, error_code, fallback, unknown),
            expected,
        );
    }

    #[rstest]
    fn test_ocm_state_register_and_resolve() {
        let mut state = OcmState::default();
        let client_oid = ClientOrderId::from("O-20240101-001");

        state.register_customer_order_ref(client_oid);

        let rfo = make_customer_order_ref(client_oid.as_str());
        let resolved = state.resolve_client_order_id(Some(&rfo));
        assert_eq!(resolved, Some(client_oid));
    }

    #[rstest]
    fn test_ocm_state_resolve_none_for_unknown_rfo() {
        let state = OcmState::default();
        assert!(state.resolve_client_order_id(Some("unknown")).is_none());
        assert!(state.resolve_client_order_id(None).is_none());
    }

    #[rstest]
    fn test_ocm_state_register_with_legacy() {
        let mut state = OcmState::default();
        let id = "O-20240101-550e8400-e29b-41d4-a716-446655440000";
        let client_oid = ClientOrderId::from(id);

        state.register_customer_order_ref_with_legacy(client_oid);

        // The current and legacy refs must differ for this id, and both must resolve.
        let rfo_current = make_customer_order_ref(id);
        let rfo_legacy = make_customer_order_ref_legacy(id);
        assert_ne!(rfo_current, rfo_legacy);

        assert_eq!(
            state.resolve_client_order_id(Some(&rfo_current)),
            Some(client_oid)
        );
        assert_eq!(
            state.resolve_client_order_id(Some(&rfo_legacy)),
            Some(client_oid)
        );
    }

    #[rstest]
    fn test_ocm_state_remove_customer_order_refs() {
        let mut state = OcmState::default();
        let id = "O-20240101-550e8400-e29b-41d4-a716-446655440000";
        let client_oid = ClientOrderId::from(id);

        state.register_customer_order_ref_with_legacy(client_oid);
        state.remove_customer_order_refs(&client_oid);

        // Removal must drop both the current and the legacy mapping.
        let rfo_current = make_customer_order_ref(id);
        let rfo_legacy = make_customer_order_ref_legacy(id);
        assert!(state.resolve_client_order_id(Some(&rfo_current)).is_none());
        assert!(state.resolve_client_order_id(Some(&rfo_legacy)).is_none());
    }

    #[rstest]
    fn test_should_emit_http_accept_without_stream_report() {
        let state = Arc::new(Mutex::new(OcmState::default()));
        let client_oid = ClientOrderId::from("O-001");

        assert!(should_emit_http_accept(&state, &client_oid));
    }

    #[rstest]
    fn test_should_not_emit_http_accept_after_stream_report() {
        let client_oid = ClientOrderId::from("O-001");
        let mut inner = OcmState::default();
        inner.stream_reported_client_orders.insert(client_oid);
        let state = Arc::new(Mutex::new(inner));

        assert!(!should_emit_http_accept(&state, &client_oid));
    }

    #[rstest]
    fn test_ocm_state_terminal_deduplication() {
        let mut state = OcmState::default();

        // First mark for a bet id returns false.
        assert!(!state.try_mark_terminal("bet123"));

        // A repeated mark for the same bet id returns true.
        assert!(state.try_mark_terminal("bet123"));
    }

    #[rstest]
    fn test_ocm_state_suppress_cancel_for_replaced() {
        let mut state = OcmState::default();
        let client_oid = ClientOrderId::from("O-001");

        state.replaced_venue_order_ids.insert("old_bet".to_string());
        assert!(state.should_suppress_cancel(&client_oid, "old_bet"));
        assert!(!state.should_suppress_cancel(&client_oid, "new_bet"));
    }

    #[rstest]
    fn test_ocm_state_suppress_cancel_for_pending_replace() {
        let mut state = OcmState::default();
        let client_oid = ClientOrderId::from("O-001");

        state
            .pending_update_keys
            .insert((client_oid, "old_bet".to_string()));

        assert!(state.should_suppress_cancel(&client_oid, "old_bet"));
        assert!(!state.should_suppress_cancel(&client_oid, "other_bet"));
    }

    #[rstest]
    fn test_ocm_state_cleanup_terminal_with_pending_replace() {
        let mut state = OcmState::default();
        let client_oid = ClientOrderId::from("O-001");

        state.register_customer_order_ref(client_oid);
        state
            .pending_update_keys
            .insert((client_oid, "old_bet".to_string()));

        state.cleanup_terminal_order(&client_oid);

        // With a pending update key present, the ref mapping must survive cleanup.
        let rfo = make_customer_order_ref(client_oid.as_str());
        assert!(state.resolve_client_order_id(Some(&rfo)).is_some());
    }

    #[rstest]
    fn test_ocm_state_cleanup_terminal_without_pending() {
        let mut state = OcmState::default();
        let client_oid = ClientOrderId::from("O-001");

        state.register_customer_order_ref(client_oid);

        state.cleanup_terminal_order(&client_oid);

        // Without a pending update key, cleanup removes the ref mapping.
        let rfo = make_customer_order_ref(client_oid.as_str());
        assert!(state.resolve_client_order_id(Some(&rfo)).is_none());
    }

    #[rstest]
    fn test_ocm_state_sync_from_orders() {
        let mut state = OcmState::default();

        let orders = vec![
            (
                "bet1".to_string(),
                ClientOrderId::from("O-001"),
                Decimal::new(10, 0),
                Decimal::new(25, 1),
                false,
            ),
            (
                "bet2".to_string(),
                ClientOrderId::from("O-002"),
                Decimal::new(5, 0),
                Decimal::new(30, 1),
                true,
            ),
        ];

        state.sync_from_orders(&orders);

        // Entry flagged `false`: its ref resolves.
        let rfo1 = make_customer_order_ref("O-001");
        assert!(state.resolve_client_order_id(Some(&rfo1)).is_some());

        // Entry flagged `true`: recorded as terminal and its ref does not resolve.
        assert!(state.terminal_orders.contains("bet2"));
        let rfo2 = make_customer_order_ref("O-002");
        assert!(state.resolve_client_order_id(Some(&rfo2)).is_none());
    }

    #[rstest]
    fn test_reconnect_signal_not_sent_on_initial_connection() {
        let has_initial_connection = Arc::new(AtomicBool::new(false));
        let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let has_initial = Arc::clone(&has_initial_connection);
        let handler = move |_data: &[u8]| {
            if has_initial.swap(true, Ordering::SeqCst) {
                let _ = reconnect_tx.send(());
            }
        };

        handler(br#"{"op":"connection","connectionId":"abc"}"#);

        // The first connection only flips the flag; no signal is sent.
        assert!(reconnect_rx.try_recv().is_err());
        assert!(has_initial_connection.load(Ordering::SeqCst));
    }

    #[rstest]
    fn test_reconnect_signal_sent_on_subsequent_connection() {
        let has_initial_connection = Arc::new(AtomicBool::new(false));
        let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

        let has_initial = Arc::clone(&has_initial_connection);
        let tx = reconnect_tx;
        let handler = move |_data: &[u8]| {
            if has_initial.swap(true, Ordering::SeqCst) {
                let _ = tx.send(());
            }
        };

        // First connection: no signal.
        handler(br#"{"op":"connection","connectionId":"abc"}"#);
        assert!(reconnect_rx.try_recv().is_err());

        // Second connection: signal sent.
        handler(br#"{"op":"connection","connectionId":"def"}"#);
        assert!(reconnect_rx.try_recv().is_ok());

        // Third connection: signal sent again.
        handler(br#"{"op":"connection","connectionId":"ghi"}"#);
        assert!(reconnect_rx.try_recv().is_ok());
    }

    #[rstest]
    fn test_ocm_state_persists_across_reconnections() {
        let ocm_state = Arc::new(Mutex::new(OcmState::default()));

        {
            // Populate through one lock scope, then release the guard.
            let mut state = ocm_state.lock().unwrap();
            let orders = vec![
                (
                    "bet1".to_string(),
                    ClientOrderId::from("O-001"),
                    Decimal::new(10, 0),
                    Decimal::new(25, 1),
                    false,
                ),
                (
                    "bet2".to_string(),
                    ClientOrderId::from("O-002"),
                    Decimal::ZERO,
                    Decimal::ZERO,
                    true,
                ),
            ];
            state.sync_from_orders(&orders);
        }

        // A fresh lock on the same shared state must observe the synced data.
        let state = ocm_state.lock().unwrap();
        let rfo = make_customer_order_ref("O-001");
        assert_eq!(
            state.resolve_client_order_id(Some(&rfo)),
            Some(ClientOrderId::from("O-001")),
        );
        assert!(state.terminal_orders.contains("bet2"));
        assert!(!state.terminal_orders.contains("bet1"));
    }

    #[rstest]
    fn test_ocm_state_sync_from_orders_populates_fill_tracker() {
        let mut state = OcmState::default();

        let orders = vec![(
            "bet_fill".to_string(),
            ClientOrderId::from("O-FILL-001"),
            Decimal::new(15, 0),
            Decimal::new(30, 1),
            false,
        )];

        state.sync_from_orders(&orders);

        // Stream update reporting the same matched size (15) that was synced.
        let uo = make_unmatched_order(
            "bet_fill",
            Decimal::new(30, 1),
            Decimal::new(20, 0),
            Decimal::new(15, 0),
            Some("O-FILL-001"),
        );

        let instrument_id = InstrumentId::from("1.234567-12345-0.0.BETFAIR");
        let result = state.fill_tracker.maybe_fill_report(
            &uo,
            uo.s,
            instrument_id,
            AccountId::from("BETFAIR-001"),
            Currency::from("GBP"),
            UnixNanos::default(),
            UnixNanos::default(),
        );

        assert!(
            result.is_none(),
            "synced fill should prevent duplicate fill report"
        );
    }

    #[rstest]
    fn test_ocm_state_sync_from_orders_incremental_fill_after_sync() {
        let mut state = OcmState::default();

        let orders = vec![(
            "bet_inc".to_string(),
            ClientOrderId::from("O-INC-001"),
            Decimal::new(10, 0),
            Decimal::new(25, 1),
            false,
        )];

        state.sync_from_orders(&orders);

        // Stream update reporting matched size 18 after 10 was synced.
        let uo = crate::stream::messages::UnmatchedOrder {
            side: crate::common::enums::StreamingSide::Lay,
            pt: Some(crate::common::enums::StreamingPersistenceType::Persist),
            avp: Some(Decimal::new(26, 1)),
            ..make_unmatched_order(
                "bet_inc",
                Decimal::new(25, 1),
                Decimal::new(20, 0),
                Decimal::new(18, 0),
                Some("O-INC-001"),
            )
        };

        let instrument_id = InstrumentId::from("1.234567-12345-0.0.BETFAIR");
        let result = state.fill_tracker.maybe_fill_report(
            &uo,
            uo.s,
            instrument_id,
            AccountId::from("BETFAIR-001"),
            Currency::from("GBP"),
            UnixNanos::default(),
            UnixNanos::default(),
        );

        let fill = result.expect("should produce incremental fill of 8");
        assert_eq!(fill.last_qty, Quantity::from("8.00"));
    }

    #[rstest]
    fn test_ocm_state_sync_from_orders_zero_filled_not_synced() {
        let mut state = OcmState::default();

        let orders = vec![(
            "bet_zero".to_string(),
            ClientOrderId::from("O-ZERO-001"),
            Decimal::ZERO,
            Decimal::ZERO,
            false,
        )];

        state.sync_from_orders(&orders);

        // The ref is still registered for the zero-filled order.
        let rfo = make_customer_order_ref("O-ZERO-001");
        assert!(state.resolve_client_order_id(Some(&rfo)).is_some());

        // Stream update reporting a first match of 5 on the zero-filled order.
        let uo = make_unmatched_order(
            "bet_zero",
            Decimal::new(30, 1),
            Decimal::new(10, 0),
            Decimal::new(5, 0),
            None,
        );
        let instrument_id = InstrumentId::from("1.234567-12345-0.0.BETFAIR");
        let result = state.fill_tracker.maybe_fill_report(
            &uo,
            uo.s,
            instrument_id,
            AccountId::from("BETFAIR-001"),
            Currency::from("GBP"),
            UnixNanos::default(),
            UnixNanos::default(),
        );
        assert!(
            result.is_some(),
            "zero-filled order should not block new fills"
        );
    }

    #[rstest]
    fn test_ocm_state_sync_multiple_open_and_closed() {
        let mut state = OcmState::default();

        let orders = vec![
            (
                "bet_a".to_string(),
                ClientOrderId::from("O-A"),
                Decimal::new(5, 0),
                Decimal::new(20, 1),
                false,
            ),
            (
                "bet_b".to_string(),
                ClientOrderId::from("O-B"),
                Decimal::ZERO,
                Decimal::ZERO,
                true,
            ),
            (
                "bet_c".to_string(),
                ClientOrderId::from("O-C"),
                Decimal::new(100, 0),
                Decimal::new(15, 1),
                true,
            ),
            (
                "bet_d".to_string(),
                ClientOrderId::from("O-D"),
                Decimal::ZERO,
                Decimal::ZERO,
                false,
            ),
        ];

        state.sync_from_orders(&orders);

        // Entries flagged `false` keep resolvable refs.
        assert!(
            state
                .resolve_client_order_id(Some(&make_customer_order_ref("O-A")))
                .is_some()
        );
        assert!(
            state
                .resolve_client_order_id(Some(&make_customer_order_ref("O-D")))
                .is_some()
        );

        // Only entries flagged `true` are recorded as terminal.
        assert!(state.terminal_orders.contains("bet_b"));
        assert!(state.terminal_orders.contains("bet_c"));
        assert!(!state.terminal_orders.contains("bet_a"));
        assert!(!state.terminal_orders.contains("bet_d"));

        // Terminal entries do not get a ref mapping.
        assert!(
            state
                .resolve_client_order_id(Some(&make_customer_order_ref("O-B")))
                .is_none()
        );
    }

    /// Builds a `CurrentOrderSummary` for a Back limit order (price 2.0, size 10, size
    /// remaining 10, persistence `Lapse`) with the given identifiers, status and placed date.
    fn make_summary(
        bet_id: &str,
        market_id: &str,
        selection_id: u64,
        handicap: Decimal,
        status: BetfairOrderStatus,
        placed_date: &str,
    ) -> CurrentOrderSummary {
        CurrentOrderSummary {
            bet_id: bet_id.to_string(),
            market_id: market_id.to_string(),
            selection_id,
            handicap,
            price_size: crate::http::models::PriceSize {
                price: Decimal::new(20, 1),
                size: Decimal::new(10, 0),
            },
            bsp_liability: Decimal::ZERO,
            side: BetfairSide::Back,
            status,
            persistence_type: PersistenceType::Lapse,
            order_type: BetfairOrderType::Limit,
            placed_date: placed_date.to_string(),
            matched_date: None,
            average_price_matched: None,
            size_matched: None,
            size_remaining: Some(Decimal::new(10, 0)),
            size_lapsed: None,
            size_cancelled: None,
            size_voided: None,
            regulator_auth_code: None,
            regulator_code: None,
            customer_order_ref: None,
            customer_strategy_ref: None,
        }
    }

    #[rstest]
    fn test_select_order_for_query_single_executable() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![make_summary(
            "bet_1",
            "1.100",
            12345,
            Decimal::ZERO,
            BetfairOrderStatus::Executable,
            "2026-04-18T10:00:00Z",
        )];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_1"));
    }

    #[rstest]
    fn test_select_order_for_query_single_terminal() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![make_summary(
            "bet_1",
            "1.100",
            12345,
            Decimal::ZERO,
            BetfairOrderStatus::ExecutionComplete,
            "2026-04-18T10:00:00Z",
        )];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_1"));
    }

    #[rstest]
    fn test_select_order_for_query_replace_prefers_executable() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![
            make_summary(
                "bet_old",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::ExecutionComplete,
                "2026-04-18T10:00:00Z",
            ),
            make_summary(
                "bet_new",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:05:00Z",
            ),
        ];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_new"));
    }

    #[rstest]
    fn test_select_order_for_query_multiple_executable_prefers_most_recent() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![
            make_summary(
                "bet_old",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:00:00Z",
            ),
            make_summary(
                "bet_new",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:05:00Z",
            ),
        ];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_new"));
    }

    #[rstest]
    fn test_select_order_for_query_multiple_terminal_prefers_most_recent() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![
            make_summary(
                "bet_old",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::ExecutionComplete,
                "2026-04-18T10:00:00Z",
            ),
            make_summary(
                "bet_new",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::ExecutionComplete,
                "2026-04-18T10:05:00Z",
            ),
        ];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_new"));
    }

    #[rstest]
    fn test_select_order_for_query_foreign_only_without_vid_returns_none() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![make_summary(
            "bet_foreign",
            "1.999",
            99999,
            Decimal::ZERO,
            BetfairOrderStatus::Executable,
            "2026-04-18T10:00:00Z",
        )];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert!(selected.is_none());
    }

    #[rstest]
    fn test_select_order_for_query_foreign_only_with_vid_match_returns_match() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![make_summary(
            "bet_foreign",
            "1.999",
            99999,
            Decimal::ZERO,
            BetfairOrderStatus::Executable,
            "2026-04-18T10:00:00Z",
        )];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
        let vid = VenueOrderId::from("bet_foreign");

        let selected = select_order_for_query(&orders, expected, cid, Some(vid));
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_foreign"));
    }

    #[rstest]
    fn test_select_order_for_query_foreign_only_vid_mismatch_returns_none() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![
            make_summary(
                "bet_foreign_1",
                "1.999",
                99999,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:00:00Z",
            ),
            make_summary(
                "bet_foreign_2",
                "1.888",
                88888,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:05:00Z",
            ),
        ];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
        let vid = VenueOrderId::from("bet_unknown");

        let selected = select_order_for_query(&orders, expected, cid, Some(vid));
        assert!(selected.is_none());
    }

    #[rstest]
    fn test_select_order_for_query_mixed_returns_matching_instrument() {
        let cid = ClientOrderId::from("O-001");
        let orders = vec![
            make_summary(
                "bet_foreign",
                "1.999",
                99999,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:05:00Z",
            ),
            make_summary(
                "bet_match",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::ExecutionComplete,
                "2026-04-18T10:00:00Z",
            ),
        ];
        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);

        let selected = select_order_for_query(&orders, expected, cid, None);
        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_match"));
    }

    #[rstest]
    fn test_extend_unique_filters_duplicates() {
        let mut candidates: Vec<CurrentOrderSummary> = Vec::new();
        let mut seen: AHashSet<String> = AHashSet::new();

        let orders = vec![
            make_summary(
                "bet_1",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:00:00Z",
            ),
            make_summary(
                "bet_1",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:01:00Z",
            ),
            make_summary(
                "bet_2",
                "1.100",
                12345,
                Decimal::ZERO,
                BetfairOrderStatus::Executable,
                "2026-04-18T10:02:00Z",
            ),
        ];

        extend_unique(&mut candidates, &mut seen, orders);

        // The first occurrence of a duplicated bet id is the one kept.
        assert_eq!(candidates.len(), 2);
        assert_eq!(candidates[0].bet_id, "bet_1");
        assert_eq!(candidates[0].placed_date, "2026-04-18T10:00:00Z");
        assert_eq!(candidates[1].bet_id, "bet_2");
        assert!(seen.contains("bet_1"));
        assert!(seen.contains("bet_2"));
    }

    #[rstest]
    fn test_extend_unique_skips_already_seen() {
        let mut candidates: Vec<CurrentOrderSummary> = vec![make_summary(
            "bet_1",
            "1.100",
            12345,
            Decimal::ZERO,
            BetfairOrderStatus::Executable,
            "2026-04-18T10:00:00Z",
        )];
        let mut seen: AHashSet<String> = AHashSet::new();
        seen.insert("bet_1".to_string());

        let orders = vec![make_summary(
            "bet_1",
            "1.100",
            12345,
            Decimal::ZERO,
            BetfairOrderStatus::Executable,
            "2026-04-18T10:05:00Z",
        )];

        extend_unique(&mut candidates, &mut seen, orders);

        // The pre-existing candidate is left untouched.
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].placed_date, "2026-04-18T10:00:00Z");
    }

    #[rstest]
    fn test_list_current_orders_filter_bet_id_sets_only_bet_ids() {
        let params = list_current_orders_filter_bet_id("bet_abc".to_string());

        assert_eq!(
            params.bet_ids.as_deref(),
            Some(&["bet_abc".to_string()][..])
        );
        assert!(params.customer_order_refs.is_none());
        assert!(params.market_ids.is_none());
        assert!(params.order_projection.is_none());
        assert!(params.customer_strategy_refs.is_none());
        assert!(params.date_range.is_none());
        assert!(params.order_by.is_none());
        assert!(params.sort_dir.is_none());
        assert!(params.from_record.is_none());
        assert!(params.record_count.is_none());
    }

    #[rstest]
    fn test_list_current_orders_filter_ref_sets_only_customer_order_refs() {
        let params = list_current_orders_filter_ref("rfo_abc".to_string());

        assert_eq!(
            params.customer_order_refs.as_deref(),
            Some(&["rfo_abc".to_string()][..])
        );
        assert!(params.bet_ids.is_none());
        assert!(params.market_ids.is_none());
        assert!(params.order_projection.is_none());
        assert!(params.customer_strategy_refs.is_none());
        assert!(params.date_range.is_none());
        assert!(params.order_by.is_none());
        assert!(params.sort_dir.is_none());
        assert!(params.from_record.is_none());
        assert!(params.record_count.is_none());
    }
}