Skip to main content

nautilus_betfair/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client for the Betfair adapter.
17
18use std::{
19    fmt,
20    future::Future,
21    sync::{
22        Arc, Mutex,
23        atomic::{AtomicBool, Ordering},
24    },
25};
26
27use ahash::{AHashMap, AHashSet};
28use async_trait::async_trait;
29use nautilus_common::{
30    clients::ExecutionClient,
31    live::{
32        get_runtime,
33        runner::{get_data_event_sender, get_exec_event_sender},
34    },
35    messages::{
36        DataEvent,
37        execution::{
38            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
39            GenerateFillReportsBuilder, GenerateOrderStatusReports,
40            GenerateOrderStatusReportsBuilder, ModifyOrder, QueryOrder, SubmitOrder,
41            SubmitOrderList,
42        },
43    },
44};
45use nautilus_core::{
46    MUTEX_POISONED, UnixNanos,
47    datetime::NANOSECONDS_IN_SECOND,
48    time::{AtomicTime, get_atomic_clock_realtime},
49};
50use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
51use nautilus_model::{
52    accounts::AccountAny,
53    data::Data,
54    enums::{AccountType, OmsType, OrderStatus, OrderType, TimeInForce},
55    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue, VenueOrderId},
56    instruments::InstrumentAny,
57    orders::Order,
58    reports::{ExecutionMassStatus, FillReport, OrderStatusReport},
59    types::{AccountBalance, Currency, MarginBalance},
60};
61use nautilus_network::socket::TcpMessageHandler;
62use rust_decimal::Decimal;
63use tokio::task::JoinHandle;
64
65use crate::{
66    common::{
67        consts::{
68            BETFAIR_VENUE, METHOD_CANCEL_ORDERS, METHOD_GET_ACCOUNT_FUNDS,
69            METHOD_LIST_CURRENT_ORDERS, METHOD_PLACE_ORDERS, METHOD_REPLACE_ORDERS,
70        },
71        credential::BetfairCredential,
72        enums::{
73            BetfairOrderStatus, BetfairOrderType, BetfairSide, BetfairTimeInForce,
74            ExecutionReportErrorCode, ExecutionReportStatus, InstructionReportErrorCode,
75            InstructionReportStatus, OrderProjection, PersistenceType, StreamingOrderStatus,
76            StreamingSide,
77        },
78        parse::{
79            extract_market_id, extract_selection_id, make_customer_order_ref,
80            make_customer_order_ref_legacy, make_instrument_id, parse_account_state,
81            parse_millis_timestamp,
82        },
83        types::BetId,
84    },
85    config::BetfairExecConfig,
86    data::custom_data_with_instrument,
87    data_types::{BetfairOrderVoided, register_betfair_custom_data},
88    http::{
89        client::BetfairHttpClient,
90        models::{
91            AccountFundsResponse, CancelExecutionReport, CancelInstruction, CancelOrdersParams,
92            CurrentOrderSummary, CurrentOrderSummaryReport, LimitOnCloseOrder, LimitOrder,
93            ListCurrentOrdersParams, MarketOnCloseOrder, MarketVersion, PlaceExecutionReport,
94            PlaceInstruction, PlaceInstructionReport, PlaceOrdersParams, ReplaceExecutionReport,
95            ReplaceInstruction, ReplaceInstructionReport, ReplaceOrdersParams, TimeRange,
96        },
97        parse::{parse_current_order_fill_report, parse_current_order_report},
98    },
99    stream::{
100        client::BetfairStreamClient,
101        config::BetfairStreamConfig,
102        messages::{StreamMessage, stream_decode},
103        parse::{FillTracker, has_cancel_quantity, parse_order_status_report},
104    },
105};
106
107/// Keep-alive interval in seconds (10 hours, matching Python default).
108const KEEP_ALIVE_INTERVAL_SECS: u64 = 36_000;
109
110/// Delay in seconds before retrying after a rate limit error.
111const RATE_LIMIT_RETRY_DELAY_SECS: u64 = 5;
112
113/// Shared mutable state for the OCM stream handler.
114///
115/// Accessed by both the TCP reader closure and the execution client methods
116/// (submit, modify, connect/disconnect). All access goes through `Arc<Mutex<>>`.
117#[derive(Debug, Default)]
118pub struct OcmState {
119    pub fill_tracker: FillTracker,
120    /// Maps customer_order_ref (rfo) to ClientOrderId for stream resolution.
121    pub customer_order_refs: AHashMap<String, ClientOrderId>,
122    /// Client order IDs that already received an OCM order status update.
123    pub stream_reported_client_orders: AHashSet<ClientOrderId>,
124    /// Bet IDs that have received a terminal event (cancel, lapse, fill-complete).
125    pub terminal_orders: AHashSet<String>,
126    /// Old bet IDs from replace operations, to suppress late stream updates.
127    pub replaced_venue_order_ids: AHashSet<String>,
128    /// (client_order_id, old_bet_id) pairs for in-flight replace operations.
129    pub pending_update_keys: AHashSet<(ClientOrderId, String)>,
130}
131
132impl OcmState {
133    /// Registers a customer_order_ref mapping for a new order.
134    pub fn register_customer_order_ref(&mut self, client_order_id: ClientOrderId) {
135        let rfo = make_customer_order_ref(client_order_id.as_str());
136        self.customer_order_refs.insert(rfo, client_order_id);
137    }
138
139    /// Registers both current and legacy customer_order_ref truncations.
140    ///
141    /// Used during reconnect sync for pre-existing orders that may
142    /// have been placed with either truncation format.
143    pub fn register_customer_order_ref_with_legacy(&mut self, client_order_id: ClientOrderId) {
144        let rfo = make_customer_order_ref(client_order_id.as_str());
145        let rfo_legacy = make_customer_order_ref_legacy(client_order_id.as_str());
146        self.customer_order_refs.insert(rfo, client_order_id);
147        if rfo_legacy != client_order_id.as_str() {
148            self.customer_order_refs.insert(rfo_legacy, client_order_id);
149        }
150    }
151
152    /// Removes customer_order_ref mappings for a client_order_id.
153    pub fn remove_customer_order_refs(&mut self, client_order_id: &ClientOrderId) {
154        let rfo = make_customer_order_ref(client_order_id.as_str());
155        let rfo_legacy = make_customer_order_ref_legacy(client_order_id.as_str());
156        self.customer_order_refs.remove(&rfo);
157        self.customer_order_refs.remove(&rfo_legacy);
158    }
159
160    /// Resolves a client_order_id from the unmatched order's rfo field.
161    pub fn resolve_client_order_id(&self, rfo: Option<&str>) -> Option<ClientOrderId> {
162        rfo.and_then(|r| self.customer_order_refs.get(r).copied())
163    }
164
165    /// Returns `true` if the bet_id already has a terminal event and should be skipped.
166    /// Otherwise marks it as terminal and returns `false`.
167    pub fn try_mark_terminal(&mut self, bet_id: &str) -> bool {
168        !self.terminal_orders.insert(bet_id.to_string())
169    }
170
171    /// Returns `true` if a cancel/lapse for this bet should be suppressed
172    /// because a replace operation is pending or the bet was already replaced.
173    pub fn should_suppress_cancel(&self, client_order_id: &ClientOrderId, bet_id: &str) -> bool {
174        if self.replaced_venue_order_ids.contains(bet_id) {
175            return true;
176        }
177        self.pending_update_keys
178            .contains(&(*client_order_id, bet_id.to_string()))
179    }
180
181    /// Cleans up customer_order_ref mappings for a terminal order,
182    /// unless a pending replace exists for this client_order_id.
183    pub fn cleanup_terminal_order(&mut self, client_order_id: &ClientOrderId) {
184        let has_pending = self
185            .pending_update_keys
186            .iter()
187            .any(|(cid, _)| cid == client_order_id);
188
189        if !has_pending {
190            self.remove_customer_order_refs(client_order_id);
191        }
192    }
193
194    /// Syncs fill tracker state from existing order fills.
195    ///
196    /// Pre-populates filled quantities and average prices so that
197    /// the first stream update after reconnect computes correct
198    /// incremental fills instead of treating cumulative size as new.
199    pub fn sync_from_orders(&mut self, orders: &[(String, ClientOrderId, Decimal, Decimal, bool)]) {
200        for (bet_id, client_order_id, filled_qty, avg_px, is_closed) in orders {
201            if *is_closed {
202                self.terminal_orders.insert(bet_id.clone());
203            } else {
204                self.register_customer_order_ref_with_legacy(*client_order_id);
205            }
206
207            if *filled_qty > Decimal::ZERO {
208                self.fill_tracker.sync_order(bet_id, *filled_qty, *avg_px);
209            }
210        }
211    }
212}
213
214/// Betfair live execution client.
215#[derive(Debug)]
216pub struct BetfairExecutionClient {
217    core: ExecutionClientCore,
218    clock: &'static AtomicTime,
219    emitter: ExecutionEventEmitter,
220    http_client: Arc<BetfairHttpClient>,
221    stream_client: Option<Arc<BetfairStreamClient>>,
222    credential: BetfairCredential,
223    stream_config: BetfairStreamConfig,
224    config: BetfairExecConfig,
225    currency: Currency,
226    ocm_state: Arc<Mutex<OcmState>>,
227    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
228    keep_alive_handle: Option<JoinHandle<()>>,
229    account_state_handle: Option<JoinHandle<()>>,
230    reconnect_handle: Option<JoinHandle<()>>,
231}
232
233impl BetfairExecutionClient {
234    /// Creates a new [`BetfairExecutionClient`] instance.
235    #[must_use]
236    pub fn new(
237        core: ExecutionClientCore,
238        http_client: BetfairHttpClient,
239        credential: BetfairCredential,
240        stream_config: BetfairStreamConfig,
241        config: BetfairExecConfig,
242        currency: Currency,
243    ) -> Self {
244        let clock = get_atomic_clock_realtime();
245        let emitter = ExecutionEventEmitter::new(
246            clock,
247            core.trader_id,
248            core.account_id,
249            AccountType::Betting,
250            None,
251        );
252
253        Self {
254            core,
255            clock,
256            emitter,
257            http_client: Arc::new(http_client),
258            stream_client: None,
259            credential,
260            stream_config,
261            config,
262            currency,
263            ocm_state: Arc::new(Mutex::new(OcmState::default())),
264            pending_tasks: Mutex::new(Vec::new()),
265            keep_alive_handle: None,
266            account_state_handle: None,
267            reconnect_handle: None,
268        }
269    }
270
271    fn spawn_task<F>(&self, description: &'static str, fut: F)
272    where
273        F: Future<Output = anyhow::Result<()>> + Send + 'static,
274    {
275        let runtime = get_runtime();
276        let handle = runtime.spawn(async move {
277            if let Err(e) = fut.await {
278                log::warn!("{description} failed: {e:?}");
279            }
280        });
281
282        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
283        tasks.retain(|handle| !handle.is_finished());
284        tasks.push(handle);
285    }
286
287    fn reconcile_market_ids(&self) -> Option<Vec<String>> {
288        if self.config.reconcile_market_ids_only
289            && let Some(ids) = &self.config.reconcile_market_ids
290        {
291            return Some(ids.clone());
292        }
293        self.config.stream_market_ids_filter.clone()
294    }
295
296    /// Returns the market version for price protection on order placement.
297    ///
298    /// When `use_market_version` is enabled, reads the `version` field from
299    /// the instrument's `info` metadata. Betfair lapses orders submitted with
300    /// a stale version rather than matching against a moved book.
301    fn get_market_version(&self, instrument_id: &InstrumentId) -> Option<MarketVersion> {
302        if !self.config.use_market_version {
303            return None;
304        }
305
306        let cache = self.core.cache();
307        let instrument = cache.instrument(instrument_id)?;
308
309        if let InstrumentAny::Betting(betting) = instrument {
310            let version = betting.info.as_ref()?.get_i64("version")?;
311            return Some(MarketVersion {
312                version: Some(version),
313            });
314        }
315
316        None
317    }
318
319    /// Pre-populates OCM state from cached orders to prevent duplicate fills
320    /// and terminal events after reconnect.
321    fn sync_ocm_state_from_cache(&self) {
322        let cache = self.core.cache();
323        let venue = *BETFAIR_VENUE;
324        let orders = cache.orders(Some(&venue), None, None, None, None);
325
326        let order_data: Vec<_> = orders
327            .iter()
328            .filter_map(|order| {
329                let venue_order_id = order.venue_order_id()?;
330                let bet_id = venue_order_id.to_string();
331                let filled_qty = order.filled_qty().as_decimal();
332                let avg_px = order.avg_px().map_or(Decimal::ZERO, |px| {
333                    Decimal::try_from(px).unwrap_or(Decimal::ZERO)
334                });
335                Some((
336                    bet_id,
337                    order.client_order_id(),
338                    filled_qty,
339                    avg_px,
340                    order.is_closed(),
341                ))
342            })
343            .collect();
344
345        let mut state = self.ocm_state.lock().expect(MUTEX_POISONED);
346        state.sync_from_orders(&order_data);
347
348        log::info!("Synced OCM state from {} cached orders", order_data.len());
349    }
350
351    fn abort_pending_tasks(&self) {
352        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
353        for handle in tasks.drain(..) {
354            handle.abort();
355        }
356    }
357
358    fn abort_background_tasks(&mut self) {
359        if let Some(handle) = self.keep_alive_handle.take() {
360            handle.abort();
361        }
362
363        if let Some(handle) = self.account_state_handle.take() {
364            handle.abort();
365        }
366
367        if let Some(handle) = self.reconnect_handle.take() {
368            handle.abort();
369        }
370    }
371
372    #[expect(clippy::too_many_arguments)]
373    fn create_ocm_handler(
374        emitter: ExecutionEventEmitter,
375        account_id: AccountId,
376        currency: Currency,
377        ocm_state: Arc<Mutex<OcmState>>,
378        data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
379        market_ids_filter: Option<ahash::AHashSet<String>>,
380        ignore_external_orders: bool,
381        reconnect_tx: tokio::sync::mpsc::UnboundedSender<()>,
382    ) -> TcpMessageHandler {
383        let has_initial_connection = Arc::new(AtomicBool::new(false));
384
385        Arc::new(move |data: &[u8]| {
386            let msg = match stream_decode(data) {
387                Ok(msg) => msg,
388                Err(e) => {
389                    log::warn!("Failed to decode stream message: {e}");
390                    return;
391                }
392            };
393
394            match msg {
395                StreamMessage::OrderChange(ocm) => {
396                    if ocm.is_heartbeat() {
397                        return;
398                    }
399
400                    let Some(order_changes) = &ocm.oc else {
401                        return;
402                    };
403
404                    let ts_event = parse_millis_timestamp(ocm.pt);
405                    let ts_init = ts_event;
406
407                    for omc in order_changes {
408                        if let Some(ref filter) = market_ids_filter
409                            && !filter.contains(&omc.id)
410                        {
411                            continue;
412                        }
413                        let Some(orc_list) = &omc.orc else {
414                            continue;
415                        };
416
417                        for orc in orc_list {
418                            let handicap = orc.hc.unwrap_or(Decimal::ZERO);
419                            let instrument_id = make_instrument_id(&omc.id, orc.id, handicap);
420
421                            let Some(unmatched_orders) = &orc.uo else {
422                                continue;
423                            };
424
425                            for uo in unmatched_orders {
426                                if ignore_external_orders && uo.rfo.is_none() {
427                                    continue;
428                                }
429
430                                Self::process_unmatched_order(
431                                    uo,
432                                    instrument_id,
433                                    account_id,
434                                    currency,
435                                    &emitter,
436                                    &ocm_state,
437                                    ts_event,
438                                    ts_init,
439                                );
440
441                                if uo.status == StreamingOrderStatus::ExecutionComplete
442                                    && uo.sv.is_some_and(|sv| sv > Decimal::ZERO)
443                                {
444                                    let sv = uo.sv.unwrap();
445                                    let side_str = match uo.side {
446                                        StreamingSide::Back => "BACK",
447                                        StreamingSide::Lay => "LAY",
448                                    };
449                                    let dec_to_f64 = |d: Decimal| -> f64 {
450                                        d.to_string().parse::<f64>().unwrap_or(0.0)
451                                    };
452                                    let voided = BetfairOrderVoided::new(
453                                        instrument_id,
454                                        uo.rfo.as_deref().unwrap_or("").to_string(),
455                                        uo.id.clone(),
456                                        dec_to_f64(sv),
457                                        dec_to_f64(uo.p),
458                                        dec_to_f64(uo.s),
459                                        side_str.to_string(),
460                                        uo.avp.map_or(f64::NAN, dec_to_f64),
461                                        uo.sm.map_or(f64::NAN, dec_to_f64),
462                                        String::new(),
463                                        ts_event,
464                                        ts_init,
465                                    );
466                                    log::info!("Order voided: bet_id={}, size_voided={sv}", uo.id,);
467                                    let custom = custom_data_with_instrument(
468                                        Arc::new(voided),
469                                        instrument_id,
470                                    );
471
472                                    if let Err(e) =
473                                        data_sender.send(DataEvent::Data(Data::Custom(custom)))
474                                    {
475                                        log::warn!("Failed to send voided event: {e}");
476                                    }
477                                }
478                            }
479                        }
480                    }
481                }
482                StreamMessage::Connection(_) => {
483                    if has_initial_connection.swap(true, Ordering::SeqCst) {
484                        log::info!("Betfair execution stream reconnected");
485                        let _ = reconnect_tx.send(());
486                    } else {
487                        log::info!("Betfair execution stream connected");
488                    }
489                }
490                StreamMessage::Status(status) => {
491                    if status.connection_closed {
492                        log::error!(
493                            "Betfair execution stream closed: {:?} - {:?}",
494                            status.error_code,
495                            status.error_message,
496                        );
497                    }
498                }
499                StreamMessage::MarketChange(_) | StreamMessage::RaceChange(_) => {}
500            }
501        })
502    }
503
504    #[expect(clippy::too_many_arguments)]
505    fn process_unmatched_order(
506        uo: &crate::stream::messages::UnmatchedOrder,
507        instrument_id: InstrumentId,
508        account_id: AccountId,
509        currency: Currency,
510        emitter: &ExecutionEventEmitter,
511        ocm_state: &Arc<Mutex<OcmState>>,
512        ts_event: UnixNanos,
513        ts_init: UnixNanos,
514    ) -> bool {
515        let mut report =
516            match parse_order_status_report(uo, instrument_id, account_id, ts_event, ts_init) {
517                Ok(report) => report,
518                Err(e) => {
519                    log::warn!("Failed to parse order status report for {instrument_id}: {e}");
520                    return false;
521                }
522            };
523
524        let Ok(mut state) = ocm_state.lock() else {
525            log::error!("OcmState mutex poisoned");
526            return false;
527        };
528
529        if state.terminal_orders.contains(&uo.id) {
530            return false;
531        }
532
533        let resolved_client_order_id = state.resolve_client_order_id(uo.rfo.as_deref());
534
535        // Patch the truncated rfo-derived client_order_id with the full
536        // resolved value so downstream reconciliation matches correctly.
537        if resolved_client_order_id.is_some() {
538            report.client_order_id = resolved_client_order_id;
539        }
540
541        if uo.status == StreamingOrderStatus::ExecutionComplete
542            && has_cancel_quantity(uo)
543            && let Some(ref client_oid) = resolved_client_order_id
544        {
545            if state.should_suppress_cancel(client_oid, &uo.id) {
546                log::debug!(
547                    "Suppressing cancel for bet_id={} (pending replace or already replaced)",
548                    uo.id,
549                );
550                return false;
551            }
552
553            if state.try_mark_terminal(&uo.id) {
554                log::debug!("Duplicate terminal event for bet_id={}, skipping", uo.id);
555                return false;
556            }
557        }
558
559        if let Some(client_oid) = resolved_client_order_id {
560            state.stream_reported_client_orders.insert(client_oid);
561        }
562
563        // Emit fill reports before order status reports so reconciliation does
564        // not infer a duplicate fill from the cumulative filled_qty on the
565        // status report.
566        if let Some(mut fill_report) = state.fill_tracker.maybe_fill_report(
567            uo,
568            uo.s,
569            instrument_id,
570            account_id,
571            currency,
572            ts_event,
573            ts_init,
574        ) {
575            if resolved_client_order_id.is_some() {
576                fill_report.client_order_id = resolved_client_order_id;
577            }
578            log::debug!(
579                "Fill: bet_id={}, last_qty={}, last_px={}",
580                uo.id,
581                fill_report.last_qty,
582                fill_report.last_px,
583            );
584            emitter.send_fill_report(fill_report);
585        }
586
587        if report.order_status == OrderStatus::Canceled
588            && let Some(reason) = report.cancel_reason.as_deref()
589        {
590            log::info!(
591                "Betfair order {} ({}) canceled: reason={}, matched={}, canceled={}, lapsed={}, voided={}",
592                report
593                    .client_order_id
594                    .unwrap_or_else(|| ClientOrderId::from(uo.id.as_str())),
595                uo.id,
596                reason,
597                uo.sm.unwrap_or(Decimal::ZERO),
598                uo.sc.unwrap_or(Decimal::ZERO),
599                uo.sl.unwrap_or(Decimal::ZERO),
600                uo.sv.unwrap_or(Decimal::ZERO),
601            );
602        }
603
604        emitter.send_order_status_report(report);
605
606        if uo.status == StreamingOrderStatus::ExecutionComplete {
607            state.terminal_orders.insert(uo.id.clone());
608            state.fill_tracker.prune(&uo.id);
609
610            if let Some(ref client_oid) = resolved_client_order_id {
611                state.cleanup_terminal_order(client_oid);
612            }
613        }
614
615        true
616    }
617}
618
619#[async_trait(?Send)]
620impl ExecutionClient for BetfairExecutionClient {
621    fn is_connected(&self) -> bool {
622        self.core.is_connected()
623    }
624
625    fn client_id(&self) -> ClientId {
626        self.core.client_id
627    }
628
629    fn account_id(&self) -> AccountId {
630        self.core.account_id
631    }
632
633    fn venue(&self) -> Venue {
634        *BETFAIR_VENUE
635    }
636
637    fn oms_type(&self) -> OmsType {
638        self.core.oms_type
639    }
640
641    fn get_account(&self) -> Option<AccountAny> {
642        self.core.cache().account(&self.core.account_id).cloned()
643    }
644
645    fn generate_account_state(
646        &self,
647        balances: Vec<AccountBalance>,
648        margins: Vec<MarginBalance>,
649        reported: bool,
650        ts_event: UnixNanos,
651    ) -> anyhow::Result<()> {
652        self.emitter
653            .emit_account_state(balances, margins, reported, ts_event);
654        Ok(())
655    }
656
657    fn start(&mut self) -> anyhow::Result<()> {
658        if self.core.is_started() {
659            return Ok(());
660        }
661
662        let sender = get_exec_event_sender();
663        self.emitter.set_sender(sender);
664        self.core.set_started();
665
666        log::info!(
667            "Started: client_id={}, account_id={}",
668            self.core.client_id,
669            self.core.account_id,
670        );
671        Ok(())
672    }
673
674    fn stop(&mut self) -> anyhow::Result<()> {
675        if self.core.is_stopped() {
676            return Ok(());
677        }
678
679        self.core.set_stopped();
680        self.core.set_disconnected();
681        self.abort_background_tasks();
682        self.abort_pending_tasks();
683        log::info!("Stopped: client_id={}", self.core.client_id);
684        Ok(())
685    }
686
687    async fn connect(&mut self) -> anyhow::Result<()> {
688        if self.core.is_connected() {
689            return Ok(());
690        }
691
692        register_betfair_custom_data();
693
694        self.http_client
695            .connect()
696            .await
697            .map_err(|e| anyhow::anyhow!("{e}"))?;
698
699        let funds: AccountFundsResponse = self
700            .http_client
701            .send_accounts(METHOD_GET_ACCOUNT_FUNDS, serde_json::json!({}))
702            .await
703            .map_err(|e| anyhow::anyhow!("{e}"))?;
704
705        let ts_init = self.clock.get_time_ns();
706        let account_state = parse_account_state(
707            &funds,
708            self.core.account_id,
709            self.currency,
710            ts_init,
711            ts_init,
712        )?;
713        self.emitter.send_account_state(account_state);
714
715        let session_token = self
716            .http_client
717            .session_token()
718            .await
719            .ok_or_else(|| anyhow::anyhow!("No session token after login"))?;
720
721        // Sync OCM state from cached orders before stream connects
722        self.sync_ocm_state_from_cache();
723
724        let market_ids_filter = self
725            .config
726            .stream_market_ids_filter
727            .as_ref()
728            .map(|ids| ids.iter().cloned().collect::<ahash::AHashSet<String>>());
729
730        let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel();
731
732        let handler = Self::create_ocm_handler(
733            self.emitter.clone(),
734            self.core.account_id,
735            self.currency,
736            Arc::clone(&self.ocm_state),
737            get_data_event_sender(),
738            market_ids_filter,
739            self.config.ignore_external_orders,
740            reconnect_tx,
741        );
742
743        let stream_client = BetfairStreamClient::connect(
744            &self.credential,
745            session_token,
746            handler,
747            self.stream_config.clone(),
748        )
749        .await
750        .map_err(|e| anyhow::anyhow!("{e}"))?;
751
752        let stream_client = Arc::new(stream_client);
753
754        stream_client
755            .subscribe_orders(None, None)
756            .await
757            .map_err(|e| anyhow::anyhow!("{e}"))?;
758
759        self.stream_client = Some(stream_client);
760
761        // Spawn periodic keep-alive to prevent session expiry
762        let keep_alive_client = Arc::clone(&self.http_client);
763        let keep_alive_stream = Arc::clone(self.stream_client.as_ref().unwrap());
764        let keep_alive_app_key = self.credential.app_key().to_string();
765
766        self.keep_alive_handle = Some(get_runtime().spawn(async move {
767            let interval = tokio::time::Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS);
768            loop {
769                tokio::time::sleep(interval).await;
770
771                match keep_alive_client.keep_alive().await {
772                    Ok(()) => {}
773                    Err(ref e) if e.is_login_failed() => {
774                        log::warn!("Betfair execution session expired, attempting re-login: {e}");
775                        if let Err(e) = keep_alive_client.reconnect().await {
776                            log::error!("Betfair execution re-login failed: {e}");
777                            continue;
778                        }
779                    }
780                    Err(e) => {
781                        log::warn!("Betfair execution keep-alive failed (transient): {e}");
782                        continue;
783                    }
784                }
785
786                if let Some(token) = keep_alive_client.session_token().await {
787                    keep_alive_stream.update_auth(&keep_alive_app_key, token);
788                }
789                log::debug!("Betfair execution session keep-alive sent");
790            }
791        }));
792
793        if self.config.calculate_account_state && self.config.request_account_state_secs > 0 {
794            let acct_client = Arc::clone(&self.http_client);
795            let acct_emitter = self.emitter.clone();
796            let acct_id = self.core.account_id;
797            let acct_currency = self.currency;
798            let acct_clock = self.clock;
799            let interval_secs = self.config.request_account_state_secs;
800            self.account_state_handle = Some(get_runtime().spawn(async move {
801                let interval = tokio::time::Duration::from_secs(interval_secs);
802                loop {
803                    tokio::time::sleep(interval).await;
804
805                    match acct_client
806                        .send_accounts::<AccountFundsResponse, _>(
807                            METHOD_GET_ACCOUNT_FUNDS,
808                            serde_json::json!({}),
809                        )
810                        .await
811                    {
812                        Ok(funds) => {
813                            let ts_init = acct_clock.get_time_ns();
814
815                            match parse_account_state(
816                                &funds,
817                                acct_id,
818                                acct_currency,
819                                ts_init,
820                                ts_init,
821                            ) {
822                                Ok(state) => acct_emitter.send_account_state(state),
823                                Err(e) => log::warn!("Failed to parse account state: {e}"),
824                            }
825                        }
826                        Err(e) => log::warn!("Failed to fetch account state: {e}"),
827                    }
828                }
829            }));
830        }
831
832        let reconnect_http = Arc::clone(&self.http_client);
833        let reconnect_stream = Arc::clone(self.stream_client.as_ref().unwrap());
834        let reconnect_app_key = self.credential.app_key().to_string();
835        let reconnect_emitter = self.emitter.clone();
836        let reconnect_clock = self.clock;
837        let reconnect_acct_id = self.core.account_id;
838        let reconnect_currency = self.currency;
839
840        self.reconnect_handle = Some(get_runtime().spawn(async move {
841            while reconnect_rx.recv().await.is_some() {
842                log::info!("Handling execution stream reconnection");
843
844                match reconnect_http.keep_alive().await {
845                    Ok(()) => {}
846                    Err(ref e) if e.is_login_failed() => {
847                        log::warn!("Session expired on reconnect, attempting re-login: {e}");
848                        if let Err(e) = reconnect_http.reconnect().await {
849                            log::error!("Re-login failed on reconnect: {e}");
850                            continue;
851                        }
852                    }
853                    Err(e) => {
854                        log::warn!("Keep-alive failed on reconnect (transient): {e}");
855                        continue;
856                    }
857                }
858
859                if let Some(token) = reconnect_http.session_token().await {
860                    reconnect_stream.update_auth(&reconnect_app_key, token);
861                }
862
863                match reconnect_http
864                    .send_accounts::<AccountFundsResponse, _>(
865                        METHOD_GET_ACCOUNT_FUNDS,
866                        serde_json::json!({}),
867                    )
868                    .await
869                {
870                    Ok(funds) => {
871                        let ts_init = reconnect_clock.get_time_ns();
872
873                        match parse_account_state(
874                            &funds,
875                            reconnect_acct_id,
876                            reconnect_currency,
877                            ts_init,
878                            ts_init,
879                        ) {
880                            Ok(state) => reconnect_emitter.send_account_state(state),
881                            Err(e) => {
882                                log::warn!("Failed to parse account state on reconnect: {e}");
883                            }
884                        }
885                    }
886                    Err(e) => log::warn!("Failed to fetch account state on reconnect: {e}"),
887                }
888            }
889        }));
890
891        self.core.set_connected();
892
893        log::info!("Connected: client_id={}", self.core.client_id);
894        Ok(())
895    }
896
897    async fn disconnect(&mut self) -> anyhow::Result<()> {
898        if self.core.is_disconnected() {
899            return Ok(());
900        }
901
902        self.abort_background_tasks();
903        self.abort_pending_tasks();
904
905        if let Some(client) = &self.stream_client {
906            client.close().await;
907        }
908
909        self.http_client.disconnect().await;
910        self.core.set_disconnected();
911
912        log::info!("Disconnected: client_id={}", self.core.client_id);
913        Ok(())
914    }
915
916    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
917        let http_client = Arc::clone(&self.http_client);
918        let emitter = self.emitter.clone();
919        let account_id = self.core.account_id;
920        let ocm_state = Arc::clone(&self.ocm_state);
921        let clock = self.clock;
922        let client_order_id = cmd.client_order_id;
923        let venue_order_id = cmd.venue_order_id;
924        let instrument_id = cmd.instrument_id;
925
926        self.spawn_task("query_order", async move {
927            let mut candidates: Vec<CurrentOrderSummary> = Vec::new();
928            let mut seen_bet_ids: AHashSet<String> = AHashSet::new();
929
930            // Customer_order_ref lookup: Betfair reuses the ref across a
931            // replace (old bet cancelled + new bet live), so this returns the
932            // live replacement even when the cached bet_id is stale.
933            let rfo = make_customer_order_ref(client_order_id.as_str());
934            let rfo_params = list_current_orders_filter_ref(rfo.clone());
935            match list_current_orders_with_retry(&http_client, &rfo_params).await {
936                Ok(r) => extend_unique(&mut candidates, &mut seen_bet_ids, r.current_orders),
937                Err(e) => log::warn!("Betfair query_order ref lookup failed: {e}"),
938            }
939
940            if candidates.is_empty() {
941                let rfo_legacy = make_customer_order_ref_legacy(client_order_id.as_str());
942                if rfo_legacy != rfo {
943                    let legacy_params = list_current_orders_filter_ref(rfo_legacy);
944                    match list_current_orders_with_retry(&http_client, &legacy_params).await {
945                        Ok(r) => {
946                            extend_unique(&mut candidates, &mut seen_bet_ids, r.current_orders);
947                        }
948                        Err(e) => log::warn!("Betfair query_order legacy lookup failed: {e}"),
949                    }
950                }
951            }
952
953            // Always also query by bet_id when known. This rescues
954            // pre-existing orders without a recognizable ref and orders whose
955            // ref-based results came back as foreign-market collisions only.
956            if let Some(ref bet_id) = venue_order_id {
957                let params = list_current_orders_filter_bet_id(bet_id.to_string());
958                match list_current_orders_with_retry(&http_client, &params).await {
959                    Ok(r) => extend_unique(&mut candidates, &mut seen_bet_ids, r.current_orders),
960                    Err(e) => log::warn!("Betfair query_order bet_id lookup failed: {e}"),
961                }
962            }
963
964            if candidates.is_empty() {
965                log::warn!(
966                    "Betfair query_order found no order for client_order_id={client_order_id}, venue_order_id={venue_order_id:?}",
967                );
968                return Ok(());
969            }
970
971            let Some(order) = select_order_for_query(
972                &candidates,
973                instrument_id,
974                client_order_id,
975                venue_order_id,
976            ) else {
977                return Ok(());
978            };
979
980            let ts_init = clock.get_time_ns();
981            let mut report = match parse_current_order_report(order, account_id, ts_init) {
982                Ok(r) => r,
983                Err(e) => {
984                    log::error!("Failed to parse order report for {}: {e}", order.bet_id);
985                    return Ok(());
986                }
987            };
988
989            if report.client_order_id.is_none()
990                && let Some(rfo) = order.customer_order_ref.as_deref()
991                && let Ok(state) = ocm_state.lock()
992                && let Some(full_id) = state.resolve_client_order_id(Some(rfo))
993            {
994                report.client_order_id = Some(full_id);
995            }
996
997            if report.client_order_id.is_none() {
998                report.client_order_id = Some(client_order_id);
999            }
1000
1001            emitter.send_order_status_report(report);
1002            Ok(())
1003        });
1004
1005        Ok(())
1006    }
1007
1008    async fn generate_mass_status(
1009        &self,
1010        lookback_mins: Option<u64>,
1011    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1012        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1013
1014        let ts_now = self.clock.get_time_ns();
1015        let start = lookback_mins.map(|mins| {
1016            let lookback_ns = mins
1017                .saturating_mul(60)
1018                .saturating_mul(NANOSECONDS_IN_SECOND);
1019            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1020        });
1021
1022        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1023            .ts_init(ts_now)
1024            .open_only(false)
1025            .start(start)
1026            .build()
1027            .map_err(|e| anyhow::anyhow!("{e}"))?;
1028
1029        let fill_cmd = GenerateFillReportsBuilder::default()
1030            .ts_init(ts_now)
1031            .start(start)
1032            .build()
1033            .map_err(|e| anyhow::anyhow!("{e}"))?;
1034
1035        let (order_reports, fill_reports) = tokio::try_join!(
1036            self.generate_order_status_reports(&order_cmd),
1037            self.generate_fill_reports(fill_cmd),
1038        )?;
1039
1040        log::info!("Received {} OrderStatusReports", order_reports.len());
1041        log::info!("Received {} FillReports", fill_reports.len());
1042
1043        let mut mass_status = ExecutionMassStatus::new(
1044            self.core.client_id,
1045            self.core.account_id,
1046            *BETFAIR_VENUE,
1047            ts_now,
1048            None,
1049        );
1050
1051        mass_status.add_order_reports(order_reports);
1052        mass_status.add_fill_reports(fill_reports);
1053
1054        Ok(Some(mass_status))
1055    }
1056
1057    async fn generate_order_status_reports(
1058        &self,
1059        cmd: &GenerateOrderStatusReports,
1060    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1061        let order_projection = if cmd.open_only {
1062            Some(OrderProjection::Executable)
1063        } else {
1064            Some(OrderProjection::All)
1065        };
1066
1067        let ts_init = self.clock.get_time_ns();
1068        let mut reports = Vec::new();
1069        let mut from_record: u32 = 0;
1070
1071        loop {
1072            let params = ListCurrentOrdersParams {
1073                bet_ids: None,
1074                market_ids: self.reconcile_market_ids(),
1075                order_projection,
1076                customer_order_refs: None,
1077                customer_strategy_refs: None,
1078                date_range: None,
1079                order_by: None,
1080                sort_dir: None,
1081                from_record: if from_record > 0 {
1082                    Some(from_record)
1083                } else {
1084                    None
1085                },
1086                record_count: None,
1087            };
1088
1089            let response: CurrentOrderSummaryReport = match self
1090                .http_client
1091                .send_betting(METHOD_LIST_CURRENT_ORDERS, &params)
1092                .await
1093            {
1094                Ok(r) => r,
1095                Err(e) if e.is_session_error() || e.is_rate_limit_error() => {
1096                    if e.is_rate_limit_error() {
1097                        log::warn!("Rate limited, retrying in {RATE_LIMIT_RETRY_DELAY_SECS}s");
1098                        tokio::time::sleep(tokio::time::Duration::from_secs(
1099                            RATE_LIMIT_RETRY_DELAY_SECS,
1100                        ))
1101                        .await;
1102                    } else {
1103                        log::warn!("Session error, refreshing session");
1104
1105                        if self.http_client.keep_alive().await.is_err() {
1106                            let _ = self.http_client.reconnect().await;
1107                        }
1108                    }
1109                    self.http_client
1110                        .send_betting(METHOD_LIST_CURRENT_ORDERS, &params)
1111                        .await
1112                        .map_err(|e| anyhow::anyhow!("{e}"))?
1113                }
1114                Err(e) => anyhow::bail!("{e}"),
1115            };
1116
1117            let page_size = response.current_orders.len() as u32;
1118
1119            for order in &response.current_orders {
1120                match parse_current_order_report(order, self.core.account_id, ts_init) {
1121                    Ok(mut r) => {
1122                        if let Some(ref rfo) = order.customer_order_ref
1123                            && let Ok(state) = self.ocm_state.lock()
1124                            && let Some(full_id) = state.resolve_client_order_id(Some(rfo.as_str()))
1125                        {
1126                            r.client_order_id = Some(full_id);
1127                        }
1128                        reports.push(r);
1129                    }
1130                    Err(e) => {
1131                        log::warn!("Failed to parse order report for {}: {e}", order.bet_id);
1132                    }
1133                }
1134            }
1135
1136            if !response.more_available {
1137                break;
1138            }
1139
1140            from_record += page_size;
1141        }
1142
1143        log::info!("Generated {} order status reports", reports.len());
1144        Ok(reports)
1145    }
1146
1147    async fn generate_fill_reports(
1148        &self,
1149        cmd: GenerateFillReports,
1150    ) -> anyhow::Result<Vec<FillReport>> {
1151        let date_range = match (cmd.start, cmd.end) {
1152            (Some(start), Some(end)) => Some(TimeRange {
1153                from: Some(start.to_rfc3339()),
1154                to: Some(end.to_rfc3339()),
1155            }),
1156            (Some(start), None) => Some(TimeRange {
1157                from: Some(start.to_rfc3339()),
1158                to: None,
1159            }),
1160            (None, Some(end)) => Some(TimeRange {
1161                from: None,
1162                to: Some(end.to_rfc3339()),
1163            }),
1164            (None, None) => None,
1165        };
1166
1167        let ts_init = self.clock.get_time_ns();
1168        let mut reports = Vec::new();
1169        let mut from_record: u32 = 0;
1170
1171        loop {
1172            let params = ListCurrentOrdersParams {
1173                bet_ids: None,
1174                market_ids: self.reconcile_market_ids(),
1175                order_projection: Some(OrderProjection::All),
1176                customer_order_refs: None,
1177                customer_strategy_refs: None,
1178                date_range: date_range.clone(),
1179                order_by: None,
1180                sort_dir: None,
1181                from_record: if from_record > 0 {
1182                    Some(from_record)
1183                } else {
1184                    None
1185                },
1186                record_count: None,
1187            };
1188
1189            let response: CurrentOrderSummaryReport = match self
1190                .http_client
1191                .send_betting(METHOD_LIST_CURRENT_ORDERS, &params)
1192                .await
1193            {
1194                Ok(r) => r,
1195                Err(e) if e.is_session_error() || e.is_rate_limit_error() => {
1196                    if e.is_rate_limit_error() {
1197                        log::warn!("Rate limited, retrying in {RATE_LIMIT_RETRY_DELAY_SECS}s");
1198                        tokio::time::sleep(tokio::time::Duration::from_secs(
1199                            RATE_LIMIT_RETRY_DELAY_SECS,
1200                        ))
1201                        .await;
1202                    } else {
1203                        log::warn!("Session error, refreshing session");
1204
1205                        if self.http_client.keep_alive().await.is_err() {
1206                            let _ = self.http_client.reconnect().await;
1207                        }
1208                    }
1209                    self.http_client
1210                        .send_betting(METHOD_LIST_CURRENT_ORDERS, &params)
1211                        .await
1212                        .map_err(|e| anyhow::anyhow!("{e}"))?
1213                }
1214                Err(e) => anyhow::bail!("{e}"),
1215            };
1216
1217            let page_size = response.current_orders.len() as u32;
1218
1219            for order in &response.current_orders {
1220                let size_matched = order.size_matched.unwrap_or(Decimal::ZERO);
1221                if size_matched == Decimal::ZERO {
1222                    continue;
1223                }
1224
1225                match parse_current_order_fill_report(
1226                    order,
1227                    self.core.account_id,
1228                    self.currency,
1229                    ts_init,
1230                ) {
1231                    Ok(mut r) => {
1232                        if let Some(ref rfo) = order.customer_order_ref
1233                            && let Ok(state) = self.ocm_state.lock()
1234                            && let Some(full_id) = state.resolve_client_order_id(Some(rfo.as_str()))
1235                        {
1236                            r.client_order_id = Some(full_id);
1237                        }
1238                        reports.push(r);
1239                    }
1240                    Err(e) => {
1241                        log::warn!("Failed to parse fill report for {}: {e}", order.bet_id);
1242                    }
1243                }
1244            }
1245
1246            if !response.more_available {
1247                break;
1248            }
1249
1250            from_record += page_size;
1251        }
1252
1253        log::info!("Generated {} fill reports", reports.len());
1254        Ok(reports)
1255    }
1256
1257    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1258        let order = self.core.get_order(&cmd.client_order_id)?;
1259
1260        if order.is_closed() {
1261            log::warn!("Cannot submit closed order {}", order.client_order_id());
1262            return Ok(());
1263        }
1264
1265        if let Ok(mut state) = self.ocm_state.lock() {
1266            state.register_customer_order_ref(order.client_order_id());
1267        }
1268
1269        let instrument_id = order.instrument_id();
1270        let market_id = extract_market_id(&instrument_id)?;
1271        let (selection_id, handicap) = extract_selection_id(&instrument_id)?;
1272
1273        let side = BetfairSide::from(order.order_side());
1274        let size = order.quantity().as_decimal();
1275        let handicap_opt = if handicap == Decimal::ZERO {
1276            None
1277        } else {
1278            Some(handicap)
1279        };
1280        let customer_order_ref = Some(make_customer_order_ref(order.client_order_id().as_str()));
1281
1282        let instruction = match order.order_type() {
1283            OrderType::Limit => {
1284                let price = order
1285                    .price()
1286                    .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?
1287                    .as_decimal();
1288
1289                // BSP LimitOnClose: participates in starting price calculation
1290                // with a price limit, using liability instead of size
1291                if matches!(
1292                    order.time_in_force(),
1293                    TimeInForce::AtTheClose | TimeInForce::AtTheOpen
1294                ) {
1295                    PlaceInstruction {
1296                        order_type: BetfairOrderType::LimitOnClose,
1297                        selection_id,
1298                        handicap: handicap_opt,
1299                        side,
1300                        limit_order: None,
1301                        limit_on_close_order: Some(LimitOnCloseOrder {
1302                            liability: size,
1303                            price,
1304                        }),
1305                        market_on_close_order: None,
1306                        customer_order_ref,
1307                    }
1308                } else {
1309                    let (persistence_type, time_in_force, min_fill_size) =
1310                        match order.time_in_force() {
1311                            TimeInForce::Ioc => (
1312                                None,
1313                                Some(BetfairTimeInForce::FillOrKill),
1314                                Some(Decimal::ZERO),
1315                            ),
1316                            TimeInForce::Fok => (None, Some(BetfairTimeInForce::FillOrKill), None),
1317                            TimeInForce::Gtc => (Some(PersistenceType::Persist), None, None),
1318                            _ => (Some(PersistenceType::Lapse), None, None),
1319                        };
1320
1321                    PlaceInstruction {
1322                        order_type: BetfairOrderType::Limit,
1323                        selection_id,
1324                        handicap: handicap_opt,
1325                        side,
1326                        limit_order: Some(LimitOrder {
1327                            size,
1328                            price,
1329                            persistence_type,
1330                            time_in_force,
1331                            min_fill_size,
1332                            bet_target_type: None,
1333                            bet_target_size: None,
1334                        }),
1335                        limit_on_close_order: None,
1336                        market_on_close_order: None,
1337                        customer_order_ref,
1338                    }
1339                }
1340            }
1341            OrderType::Market => {
1342                if order.time_in_force() != TimeInForce::AtTheClose {
1343                    anyhow::bail!(
1344                        "Market orders on Betfair are only supported with AtTheClose \
1345                         time in force (BSP MarketOnClose)"
1346                    );
1347                }
1348                PlaceInstruction {
1349                    order_type: BetfairOrderType::MarketOnClose,
1350                    selection_id,
1351                    handicap: handicap_opt,
1352                    side,
1353                    limit_order: None,
1354                    limit_on_close_order: None,
1355                    market_on_close_order: Some(MarketOnCloseOrder { liability: size }),
1356                    customer_order_ref,
1357                }
1358            }
1359            other => {
1360                anyhow::bail!("Unsupported order type for Betfair: {other:?}");
1361            }
1362        };
1363
1364        let market_version = self.get_market_version(&instrument_id);
1365
1366        let params = PlaceOrdersParams {
1367            market_id,
1368            instructions: vec![instruction],
1369            customer_ref: None,
1370            market_version,
1371            customer_strategy_ref: None,
1372        };
1373
1374        let client_order_id = order.client_order_id();
1375        let strategy_id = order.strategy_id();
1376
1377        log::debug!("OrderSubmitted client_order_id={client_order_id}");
1378        self.emitter.emit_order_submitted(&order);
1379
1380        let http_client = Arc::clone(&self.http_client);
1381        let emitter = self.emitter.clone();
1382        let clock = self.clock;
1383        let ocm_state = Arc::clone(&self.ocm_state);
1384
1385        self.spawn_task("submit-order", async move {
1386            let report: PlaceExecutionReport = match http_client
1387                .send_betting_order(METHOD_PLACE_ORDERS, &params)
1388                .await
1389            {
1390                Ok(r) => r,
1391                Err(e) => {
1392                    // Transport errors (502, timeout, network reset) may mean the
1393                    // order was placed but the response was lost. Do not reject
1394                    // because the OCM stream will reconcile via customerOrderRef.
1395                    if e.is_order_placement_ambiguous() {
1396                        log::warn!(
1397                            "Ambiguous submit response for {client_order_id}: {e}. \
1398                             Order may be live, awaiting OCM reconciliation",
1399                        );
1400                        return Ok(());
1401                    }
1402
1403                    let ts_event = clock.get_time_ns();
1404                    emitter.emit_order_rejected_event(
1405                        strategy_id,
1406                        instrument_id,
1407                        client_order_id,
1408                        &format!("submit-order error: {e}"),
1409                        ts_event,
1410                        false,
1411                    );
1412                    return Ok(());
1413                }
1414            };
1415
1416            if report.status == ExecutionReportStatus::Timeout {
1417                log::warn!(
1418                    "Betfair Timeout for {client_order_id}. \
1419                     Order may be live, awaiting OCM reconciliation",
1420                );
1421                return Ok(());
1422            }
1423
1424            if let Some(instruction_reports) = &report.instruction_reports {
1425                if let Some(ir) = instruction_reports.first() {
1426                    if ir.status == InstructionReportStatus::Failure {
1427                        let reason = format_place_instruction_reason(ir, &report);
1428                        let ts_event = clock.get_time_ns();
1429                        emitter.emit_order_rejected_event(
1430                            strategy_id,
1431                            instrument_id,
1432                            client_order_id,
1433                            &reason,
1434                            ts_event,
1435                            false,
1436                        );
1437                        return Ok(());
1438                    }
1439
1440                    if let Some(bet_id) = &ir.bet_id {
1441                        let venue_order_id = VenueOrderId::from(bet_id.as_str());
1442                        let ts_event = clock.get_time_ns();
1443
1444                        if should_emit_http_accept(&ocm_state, &client_order_id) {
1445                            emitter.emit_order_accepted(&order, venue_order_id, ts_event);
1446                        }
1447                    }
1448                } else if report.status == ExecutionReportStatus::Failure
1449                    || report.status == ExecutionReportStatus::ProcessedWithErrors
1450                {
1451                    let reason = format_betfair_reason(
1452                        report.error_message.as_deref(),
1453                        report.error_code,
1454                        None,
1455                        "unknown error",
1456                    );
1457                    let ts_event = clock.get_time_ns();
1458                    emitter.emit_order_rejected_event(
1459                        strategy_id,
1460                        instrument_id,
1461                        client_order_id,
1462                        &reason,
1463                        ts_event,
1464                        false,
1465                    );
1466                }
1467            } else if report.status == ExecutionReportStatus::Failure
1468                || report.status == ExecutionReportStatus::ProcessedWithErrors
1469            {
1470                let reason = format_betfair_reason(
1471                    report.error_message.as_deref(),
1472                    report.error_code,
1473                    None,
1474                    "unknown error",
1475                );
1476                let ts_event = clock.get_time_ns();
1477                emitter.emit_order_rejected_event(
1478                    strategy_id,
1479                    instrument_id,
1480                    client_order_id,
1481                    &reason,
1482                    ts_event,
1483                    false,
1484                );
1485            }
1486
1487            Ok(())
1488        });
1489
1490        Ok(())
1491    }
1492
1493    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1494        let instrument_id = cmd.instrument_id;
1495        let market_id = extract_market_id(&instrument_id)?;
1496
1497        let venue_order_id = cmd
1498            .venue_order_id
1499            .ok_or_else(|| anyhow::anyhow!("Cannot cancel order without venue_order_id"))?;
1500        let bet_id: BetId = venue_order_id.to_string();
1501
1502        let params = CancelOrdersParams {
1503            market_id: Some(market_id),
1504            instructions: Some(vec![CancelInstruction {
1505                bet_id,
1506                size_reduction: None,
1507            }]),
1508            customer_ref: None,
1509        };
1510
1511        let client_order_id = cmd.client_order_id;
1512        let strategy_id = cmd.strategy_id;
1513        let http_client = Arc::clone(&self.http_client);
1514        let emitter = self.emitter.clone();
1515        let clock = self.clock;
1516
1517        self.spawn_task("cancel-order", async move {
1518            let result: Result<CancelExecutionReport, _> = http_client
1519                .send_betting_order(METHOD_CANCEL_ORDERS, &params)
1520                .await;
1521
1522            let report = match result {
1523                Ok(r) => r,
1524                Err(e) => {
1525                    let ts_event = clock.get_time_ns();
1526                    emitter.emit_order_cancel_rejected_event(
1527                        strategy_id,
1528                        instrument_id,
1529                        client_order_id,
1530                        Some(venue_order_id),
1531                        &format!("cancel-order error: {e}"),
1532                        ts_event,
1533                    );
1534                    return Ok(());
1535                }
1536            };
1537
1538            if report.status == ExecutionReportStatus::Timeout {
1539                log::warn!(
1540                    "Betfair Timeout for cancel {client_order_id}. \
1541                     Cancel may be delayed (in-play), awaiting OCM reconciliation",
1542                );
1543                return Ok(());
1544            }
1545
1546            if let Some(instruction_reports) = &report.instruction_reports
1547                && !instruction_reports.is_empty()
1548            {
1549                for ir in instruction_reports {
1550                    match ir.status {
1551                        InstructionReportStatus::Success => {}
1552                        InstructionReportStatus::Timeout => {
1553                            log::warn!(
1554                                "Cancel instruction timeout for {client_order_id}",
1555                            );
1556                        }
1557                        InstructionReportStatus::Failure => {
1558                            if ir.error_code
1559                                == Some(InstructionReportErrorCode::BetTakenOrLapsed)
1560                            {
1561                                log::debug!(
1562                                    "Cancel {client_order_id}: BetTakenOrLapsed, treating as success",
1563                                );
1564                                continue;
1565                            }
1566
1567                            let reason = format_cancel_instruction_reason(
1568                                ir.error_message.as_deref(),
1569                                ir.error_code,
1570                                report.error_message.as_deref(),
1571                                report.error_code,
1572                            );
1573                            let ts_event = clock.get_time_ns();
1574                            emitter.emit_order_cancel_rejected_event(
1575                                strategy_id,
1576                                instrument_id,
1577                                client_order_id,
1578                                Some(venue_order_id),
1579                                &reason,
1580                                ts_event,
1581                            );
1582                            return Ok(());
1583                        }
1584                    }
1585                }
1586            } else if report.status != ExecutionReportStatus::Success {
1587                let reason = format_betfair_reason(
1588                    report.error_message.as_deref(),
1589                    report.error_code,
1590                    None,
1591                    "unknown error",
1592                );
1593                let ts_event = clock.get_time_ns();
1594                emitter.emit_order_cancel_rejected_event(
1595                    strategy_id,
1596                    instrument_id,
1597                    client_order_id,
1598                    Some(venue_order_id),
1599                    &reason,
1600                    ts_event,
1601                );
1602            }
1603
1604            Ok(())
1605        });
1606
1607        Ok(())
1608    }
1609
1610    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1611        let instrument_id = cmd.instrument_id;
1612        let market_id = extract_market_id(&instrument_id)?;
1613
1614        let venue_order_id = cmd
1615            .venue_order_id
1616            .ok_or_else(|| anyhow::anyhow!("Cannot modify order without venue_order_id"))?;
1617        let bet_id: BetId = venue_order_id.to_string();
1618
1619        // Compare against existing order to determine actual changes
1620        let existing_order = self.core.get_order(&cmd.client_order_id);
1621        let has_price_change = match (&cmd.price, &existing_order) {
1622            (Some(new_price), Ok(order)) => order.price() != Some(*new_price),
1623            (Some(_), Err(_)) => true,
1624            (None, _) => false,
1625        };
1626        let has_quantity_change = match (&cmd.quantity, &existing_order) {
1627            (Some(new_qty), Ok(order)) => order.quantity() != *new_qty,
1628            (Some(_), Err(_)) => true,
1629            (None, _) => false,
1630        };
1631
1632        // Betfair does not support atomic price+quantity modification
1633        if has_price_change && has_quantity_change {
1634            let ts_event = self.clock.get_time_ns();
1635            self.emitter.emit_order_modify_rejected_event(
1636                cmd.strategy_id,
1637                instrument_id,
1638                cmd.client_order_id,
1639                Some(venue_order_id),
1640                "cannot modify price and quantity simultaneously on Betfair",
1641                ts_event,
1642            );
1643            return Ok(());
1644        }
1645
1646        let client_order_id = cmd.client_order_id;
1647        let strategy_id = cmd.strategy_id;
1648        let http_client = Arc::clone(&self.http_client);
1649        let emitter = self.emitter.clone();
1650        let clock = self.clock;
1651
1652        if has_price_change {
1653            let new_price = cmd.price.unwrap().as_decimal();
1654            let old_bet_id = bet_id.clone();
1655
1656            // Track pending replace so the OCM handler suppresses the
1657            // cancel event for the old bet that Betfair emits as part
1658            // of the replace operation.
1659            if let Ok(mut state) = self.ocm_state.lock() {
1660                state
1661                    .pending_update_keys
1662                    .insert((client_order_id, old_bet_id.clone()));
1663            }
1664
1665            let market_version = self.get_market_version(&instrument_id);
1666
1667            let params = ReplaceOrdersParams {
1668                market_id,
1669                instructions: vec![ReplaceInstruction { bet_id, new_price }],
1670                customer_ref: None,
1671                market_version,
1672            };
1673
1674            let ocm_state = Arc::clone(&self.ocm_state);
1675
1676            self.spawn_task("modify-order-price", async move {
1677                let result: Result<ReplaceExecutionReport, _> = http_client
1678                    .send_betting_order(METHOD_REPLACE_ORDERS, &params)
1679                    .await;
1680
1681                match result {
1682                    Ok(report) if report.status == ExecutionReportStatus::Success => {
1683                        if let Ok(mut state) = ocm_state.lock() {
1684                            state
1685                                .pending_update_keys
1686                                .remove(&(client_order_id, old_bet_id.clone()));
1687                            state.replaced_venue_order_ids.insert(old_bet_id);
1688                        }
1689                    }
1690                    Ok(report) if report.status == ExecutionReportStatus::Timeout => {
1691                        log::warn!(
1692                            "Betfair Timeout for modify {client_order_id}. \
1693                             Replace may be pending, awaiting OCM reconciliation",
1694                        );
1695                    }
1696                    Ok(report) => {
1697                        if let Ok(mut state) = ocm_state.lock() {
1698                            state
1699                                .pending_update_keys
1700                                .remove(&(client_order_id, old_bet_id));
1701                        }
1702
1703                        if let Some(instruction_reports) = &report.instruction_reports
1704                            && !instruction_reports.is_empty()
1705                        {
1706                            for ir in instruction_reports {
1707                                match ir.status {
1708                                    InstructionReportStatus::Success => {}
1709                                    InstructionReportStatus::Timeout => {
1710                                        log::warn!(
1711                                            "Replace instruction timeout for {client_order_id}",
1712                                        );
1713                                    }
1714                                    InstructionReportStatus::Failure => {
1715                                        let reason = format_replace_instruction_reason(ir, &report);
1716                                        let ts_event = clock.get_time_ns();
1717                                        emitter.emit_order_modify_rejected_event(
1718                                            strategy_id,
1719                                            instrument_id,
1720                                            client_order_id,
1721                                            Some(venue_order_id),
1722                                            &reason,
1723                                            ts_event,
1724                                        );
1725                                        return Ok(());
1726                                    }
1727                                }
1728                            }
1729                        }
1730
1731                        let reason = format_betfair_reason(
1732                            report.error_message.as_deref(),
1733                            report.error_code,
1734                            None,
1735                            "unknown error",
1736                        );
1737                        let ts_event = clock.get_time_ns();
1738                        emitter.emit_order_modify_rejected_event(
1739                            strategy_id,
1740                            instrument_id,
1741                            client_order_id,
1742                            Some(venue_order_id),
1743                            &reason,
1744                            ts_event,
1745                        );
1746                    }
1747                    Err(e) => {
1748                        if let Ok(mut state) = ocm_state.lock() {
1749                            state
1750                                .pending_update_keys
1751                                .remove(&(client_order_id, old_bet_id));
1752                        }
1753                        let ts_event = clock.get_time_ns();
1754                        emitter.emit_order_modify_rejected_event(
1755                            strategy_id,
1756                            instrument_id,
1757                            client_order_id,
1758                            Some(venue_order_id),
1759                            &format!("modify-order error: {e}"),
1760                            ts_event,
1761                        );
1762                    }
1763                }
1764
1765                Ok(())
1766            });
1767        } else if has_quantity_change {
1768            // Quantity reduction via partial cancel
1769            let order = self.core.get_order(&client_order_id)?;
1770            let existing_qty = order.quantity().as_decimal();
1771            let new_qty = cmd.quantity.unwrap().as_decimal();
1772
1773            if new_qty >= existing_qty {
1774                let ts_event = self.clock.get_time_ns();
1775                self.emitter.emit_order_modify_rejected_event(
1776                    strategy_id,
1777                    instrument_id,
1778                    client_order_id,
1779                    Some(venue_order_id),
1780                    "can only reduce quantity on Betfair",
1781                    ts_event,
1782                );
1783                return Ok(());
1784            }
1785
1786            let size_reduction = existing_qty - new_qty;
1787            let params = CancelOrdersParams {
1788                market_id: Some(market_id),
1789                instructions: Some(vec![CancelInstruction {
1790                    bet_id,
1791                    size_reduction: Some(size_reduction),
1792                }]),
1793                customer_ref: None,
1794            };
1795
1796            self.spawn_task("modify-order-quantity", async move {
1797                let result: Result<CancelExecutionReport, _> = http_client
1798                    .send_betting_order(METHOD_CANCEL_ORDERS, &params)
1799                    .await;
1800
1801                match result {
1802                    Ok(report) if report.status != ExecutionReportStatus::Success => {
1803                        let reason = format_betfair_reason(
1804                            report.error_message.as_deref(),
1805                            report.error_code,
1806                            None,
1807                            "unknown error",
1808                        );
1809                        let ts_event = clock.get_time_ns();
1810                        emitter.emit_order_modify_rejected_event(
1811                            strategy_id,
1812                            instrument_id,
1813                            client_order_id,
1814                            Some(venue_order_id),
1815                            &reason,
1816                            ts_event,
1817                        );
1818                    }
1819                    Err(e) => {
1820                        let ts_event = clock.get_time_ns();
1821                        emitter.emit_order_modify_rejected_event(
1822                            strategy_id,
1823                            instrument_id,
1824                            client_order_id,
1825                            Some(venue_order_id),
1826                            &format!("modify-order error: {e}"),
1827                            ts_event,
1828                        );
1829                    }
1830                    Ok(_) => {}
1831                }
1832
1833                Ok(())
1834            });
1835        } else {
1836            let ts_event = self.clock.get_time_ns();
1837            self.emitter.emit_order_modify_rejected_event(
1838                strategy_id,
1839                instrument_id,
1840                client_order_id,
1841                Some(venue_order_id),
1842                "no effective change in price or quantity",
1843                ts_event,
1844            );
1845        }
1846
1847        Ok(())
1848    }
1849
1850    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1851        let instrument_id = cmd.instrument_id;
1852        let market_id = extract_market_id(&instrument_id)?;
1853
1854        let params = CancelOrdersParams {
1855            market_id: Some(market_id),
1856            instructions: None,
1857            customer_ref: None,
1858        };
1859
1860        let http_client = Arc::clone(&self.http_client);
1861
1862        self.spawn_task("cancel-all-orders", async move {
1863            let result = http_client
1864                .send_betting_order::<serde_json::Value, _>(METHOD_CANCEL_ORDERS, &params)
1865                .await;
1866
1867            if let Err(e) = result {
1868                log::warn!("Failed to cancel all orders: {e}");
1869            }
1870
1871            Ok(())
1872        });
1873
1874        Ok(())
1875    }
1876
1877    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1878        let instrument_id = cmd.instrument_id;
1879        let market_id = extract_market_id(&instrument_id)?;
1880
1881        let mut instructions = Vec::new();
1882        let mut valid_cancels = Vec::new();
1883
1884        for cancel in &cmd.cancels {
1885            match cancel.venue_order_id {
1886                Some(venue_order_id) => {
1887                    let bet_id: BetId = venue_order_id.to_string();
1888                    instructions.push(CancelInstruction {
1889                        bet_id,
1890                        size_reduction: None,
1891                    });
1892                    valid_cancels.push(cancel);
1893                }
1894                None => {
1895                    let ts_event = self.clock.get_time_ns();
1896                    self.emitter.emit_order_cancel_rejected_event(
1897                        cancel.strategy_id,
1898                        cancel.instrument_id,
1899                        cancel.client_order_id,
1900                        None,
1901                        "no venue_order_id",
1902                        ts_event,
1903                    );
1904                }
1905            }
1906        }
1907
1908        if valid_cancels.is_empty() {
1909            return Ok(());
1910        }
1911
1912        let params = CancelOrdersParams {
1913            market_id: Some(market_id),
1914            instructions: Some(instructions),
1915            customer_ref: None,
1916        };
1917
1918        let cancel_data: Vec<_> = valid_cancels
1919            .iter()
1920            .map(|c| {
1921                (
1922                    c.strategy_id,
1923                    c.instrument_id,
1924                    c.client_order_id,
1925                    c.venue_order_id,
1926                )
1927            })
1928            .collect();
1929
1930        let http_client = Arc::clone(&self.http_client);
1931        let emitter = self.emitter.clone();
1932        let clock = self.clock;
1933
1934        self.spawn_task("batch-cancel-orders", async move {
1935            let report: CancelExecutionReport = match http_client
1936                .send_betting_order(METHOD_CANCEL_ORDERS, &params)
1937                .await
1938            {
1939                Ok(r) => r,
1940                Err(e) => {
1941                    let ts_event = clock.get_time_ns();
1942
1943                    for (strategy_id, instr_id, client_oid, venue_oid) in &cancel_data {
1944                        emitter.emit_order_cancel_rejected_event(
1945                            *strategy_id,
1946                            *instr_id,
1947                            *client_oid,
1948                            *venue_oid,
1949                            &format!("batch-cancel error: {e}"),
1950                            ts_event,
1951                        );
1952                    }
1953                    return Ok(());
1954                }
1955            };
1956
1957            if report.status == ExecutionReportStatus::Failure {
1958                let reason = format_betfair_reason(
1959                    report.error_message.as_deref(),
1960                    report.error_code,
1961                    None,
1962                    "unknown error",
1963                );
1964
1965                if report.instruction_reports.is_none() {
1966                    let ts_event = clock.get_time_ns();
1967
1968                    for (strategy_id, instr_id, client_oid, venue_oid) in &cancel_data {
1969                        emitter.emit_order_cancel_rejected_event(
1970                            *strategy_id,
1971                            *instr_id,
1972                            *client_oid,
1973                            *venue_oid,
1974                            &reason,
1975                            ts_event,
1976                        );
1977                    }
1978                    return Ok(());
1979                }
1980            }
1981
1982            if let Some(instruction_reports) = &report.instruction_reports {
1983                for (ir, (strategy_id, instr_id, client_oid, venue_oid)) in
1984                    instruction_reports.iter().zip(cancel_data.iter())
1985                {
1986                    match ir.status {
1987                        InstructionReportStatus::Success => {}
1988                        InstructionReportStatus::Timeout => {
1989                            log::warn!(
1990                                "Cancel timeout for {client_oid}: leaving order state unchanged",
1991                            );
1992                        }
1993                        InstructionReportStatus::Failure => {
1994                            // BetTakenOrLapsed means the bet already completed, treat as success
1995                            if ir.error_code == Some(InstructionReportErrorCode::BetTakenOrLapsed) {
1996                                continue;
1997                            }
1998
1999                            let reason = format_cancel_instruction_reason(
2000                                ir.error_message.as_deref(),
2001                                ir.error_code,
2002                                report.error_message.as_deref(),
2003                                report.error_code,
2004                            );
2005                            let ts_event = clock.get_time_ns();
2006                            emitter.emit_order_cancel_rejected_event(
2007                                *strategy_id,
2008                                *instr_id,
2009                                *client_oid,
2010                                *venue_oid,
2011                                &reason,
2012                                ts_event,
2013                            );
2014                        }
2015                    }
2016                }
2017            }
2018
2019            Ok(())
2020        });
2021
2022        Ok(())
2023    }
2024
2025    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
2026        let instrument_id = cmd.instrument_id;
2027        let market_id = extract_market_id(&instrument_id)?;
2028        let (selection_id, handicap) = extract_selection_id(&instrument_id)?;
2029
2030        let handicap_opt = if handicap == Decimal::ZERO {
2031            None
2032        } else {
2033            Some(handicap)
2034        };
2035
2036        let mut instructions = Vec::new();
2037        let mut order_snapshots = Vec::new();
2038
2039        for client_order_id in &cmd.order_list.client_order_ids {
2040            let order = self.core.get_order(client_order_id)?;
2041
2042            if order.is_closed() {
2043                log::warn!("Skipping closed order {client_order_id}");
2044                continue;
2045            }
2046
2047            if let Ok(mut state) = self.ocm_state.lock() {
2048                state.register_customer_order_ref(order.client_order_id());
2049            }
2050
2051            let side = BetfairSide::from(order.order_side());
2052            let size = order.quantity().as_decimal();
2053            let customer_order_ref =
2054                Some(make_customer_order_ref(order.client_order_id().as_str()));
2055
2056            let instruction = match order.order_type() {
2057                OrderType::Limit => {
2058                    let price = order
2059                        .price()
2060                        .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?
2061                        .as_decimal();
2062
2063                    if matches!(
2064                        order.time_in_force(),
2065                        TimeInForce::AtTheClose | TimeInForce::AtTheOpen
2066                    ) {
2067                        PlaceInstruction {
2068                            order_type: BetfairOrderType::LimitOnClose,
2069                            selection_id,
2070                            handicap: handicap_opt,
2071                            side,
2072                            limit_order: None,
2073                            limit_on_close_order: Some(LimitOnCloseOrder {
2074                                liability: size,
2075                                price,
2076                            }),
2077                            market_on_close_order: None,
2078                            customer_order_ref,
2079                        }
2080                    } else {
2081                        let (persistence_type, time_in_force, min_fill_size) = match order
2082                            .time_in_force()
2083                        {
2084                            TimeInForce::Ioc => (
2085                                None,
2086                                Some(BetfairTimeInForce::FillOrKill),
2087                                Some(Decimal::ZERO),
2088                            ),
2089                            TimeInForce::Fok => (None, Some(BetfairTimeInForce::FillOrKill), None),
2090                            TimeInForce::Gtc => (Some(PersistenceType::Persist), None, None),
2091                            _ => (Some(PersistenceType::Lapse), None, None),
2092                        };
2093
2094                        PlaceInstruction {
2095                            order_type: BetfairOrderType::Limit,
2096                            selection_id,
2097                            handicap: handicap_opt,
2098                            side,
2099                            limit_order: Some(LimitOrder {
2100                                size,
2101                                price,
2102                                persistence_type,
2103                                time_in_force,
2104                                min_fill_size,
2105                                bet_target_type: None,
2106                                bet_target_size: None,
2107                            }),
2108                            limit_on_close_order: None,
2109                            market_on_close_order: None,
2110                            customer_order_ref,
2111                        }
2112                    }
2113                }
2114                OrderType::Market => {
2115                    if order.time_in_force() != TimeInForce::AtTheClose {
2116                        anyhow::bail!(
2117                            "Market orders on Betfair are only supported with AtTheClose \
2118                             time in force (BSP MarketOnClose)"
2119                        );
2120                    }
2121                    PlaceInstruction {
2122                        order_type: BetfairOrderType::MarketOnClose,
2123                        selection_id,
2124                        handicap: handicap_opt,
2125                        side,
2126                        limit_order: None,
2127                        limit_on_close_order: None,
2128                        market_on_close_order: Some(MarketOnCloseOrder { liability: size }),
2129                        customer_order_ref,
2130                    }
2131                }
2132                other => {
2133                    anyhow::bail!("Unsupported order type for Betfair: {other:?}");
2134                }
2135            };
2136
2137            instructions.push(instruction);
2138            order_snapshots.push((order.client_order_id(), order.strategy_id(), order.clone()));
2139        }
2140
2141        if instructions.is_empty() {
2142            return Ok(());
2143        }
2144
2145        let market_version = self.get_market_version(&instrument_id);
2146
2147        let params = PlaceOrdersParams {
2148            market_id,
2149            instructions,
2150            customer_ref: None,
2151            market_version,
2152            customer_strategy_ref: None,
2153        };
2154
2155        for (_, _, order) in &order_snapshots {
2156            log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
2157            self.emitter.emit_order_submitted(order);
2158        }
2159
2160        let http_client = Arc::clone(&self.http_client);
2161        let emitter = self.emitter.clone();
2162        let clock = self.clock;
2163        let ocm_state = Arc::clone(&self.ocm_state);
2164
2165        self.spawn_task("submit-order-list", async move {
2166            let report: PlaceExecutionReport = match http_client
2167                .send_betting_order(METHOD_PLACE_ORDERS, &params)
2168                .await
2169            {
2170                Ok(r) => r,
2171                Err(e) => {
2172                    if e.is_order_placement_ambiguous() {
2173                        log::warn!(
2174                            "Ambiguous submit response for order list: {e}. \
2175                             Orders may be live, awaiting OCM reconciliation",
2176                        );
2177                        return Ok(());
2178                    }
2179
2180                    let ts_event = clock.get_time_ns();
2181
2182                    for (client_oid, strategy_id, _) in &order_snapshots {
2183                        emitter.emit_order_rejected_event(
2184                            *strategy_id,
2185                            instrument_id,
2186                            *client_oid,
2187                            &format!("submit-order-list error: {e}"),
2188                            ts_event,
2189                            false,
2190                        );
2191                    }
2192                    return Ok(());
2193                }
2194            };
2195
2196            if report.status == ExecutionReportStatus::Failure {
2197                let reason = format_betfair_reason(
2198                    report.error_message.as_deref(),
2199                    report.error_code,
2200                    None,
2201                    "unknown error",
2202                );
2203
2204                if report.instruction_reports.is_none() {
2205                    let ts_event = clock.get_time_ns();
2206
2207                    for (client_oid, strategy_id, _) in &order_snapshots {
2208                        emitter.emit_order_rejected_event(
2209                            *strategy_id,
2210                            instrument_id,
2211                            *client_oid,
2212                            &reason,
2213                            ts_event,
2214                            false,
2215                        );
2216                    }
2217                    return Ok(());
2218                }
2219            }
2220
2221            if report.status == ExecutionReportStatus::Timeout {
2222                log::warn!(
2223                    "Betfair Timeout for order list. \
2224                     Orders may be live, awaiting OCM reconciliation",
2225                );
2226                return Ok(());
2227            }
2228
2229            if let Some(instruction_reports) = &report.instruction_reports {
2230                for (ir, (client_oid, strategy_id, order)) in
2231                    instruction_reports.iter().zip(order_snapshots.iter())
2232                {
2233                    match ir.status {
2234                        InstructionReportStatus::Success => {
2235                            if let Some(bet_id) = &ir.bet_id {
2236                                let venue_order_id = VenueOrderId::from(bet_id.as_str());
2237                                let ts_event = clock.get_time_ns();
2238
2239                                if should_emit_http_accept(&ocm_state, client_oid) {
2240                                    emitter.emit_order_accepted(order, venue_order_id, ts_event);
2241                                }
2242                            }
2243                        }
2244                        InstructionReportStatus::Timeout => {
2245                            log::warn!(
2246                                "Submit timeout for {client_oid}: \
2247                                 leaving SUBMITTED for reconciliation",
2248                            );
2249                        }
2250                        InstructionReportStatus::Failure => {
2251                            let reason = format_place_instruction_reason(ir, &report);
2252                            let ts_event = clock.get_time_ns();
2253                            emitter.emit_order_rejected_event(
2254                                *strategy_id,
2255                                instrument_id,
2256                                *client_oid,
2257                                &reason,
2258                                ts_event,
2259                                false,
2260                            );
2261                        }
2262                    }
2263                }
2264            }
2265
2266            Ok(())
2267        });
2268
2269        Ok(())
2270    }
2271}
2272
2273fn list_current_orders_filter_bet_id(bet_id: String) -> ListCurrentOrdersParams {
2274    ListCurrentOrdersParams {
2275        bet_ids: Some(vec![bet_id]),
2276        market_ids: None,
2277        order_projection: None,
2278        customer_order_refs: None,
2279        customer_strategy_refs: None,
2280        date_range: None,
2281        order_by: None,
2282        sort_dir: None,
2283        from_record: None,
2284        record_count: None,
2285    }
2286}
2287
2288fn list_current_orders_filter_ref(customer_order_ref: String) -> ListCurrentOrdersParams {
2289    ListCurrentOrdersParams {
2290        bet_ids: None,
2291        market_ids: None,
2292        order_projection: None,
2293        customer_order_refs: Some(vec![customer_order_ref]),
2294        customer_strategy_refs: None,
2295        date_range: None,
2296        order_by: None,
2297        sort_dir: None,
2298        from_record: None,
2299        record_count: None,
2300    }
2301}
2302
2303fn extend_unique(
2304    candidates: &mut Vec<CurrentOrderSummary>,
2305    seen: &mut AHashSet<String>,
2306    orders: Vec<CurrentOrderSummary>,
2307) {
2308    for order in orders {
2309        if seen.insert(order.bet_id.clone()) {
2310            candidates.push(order);
2311        }
2312    }
2313}
2314
2315fn select_order_for_query(
2316    orders: &[CurrentOrderSummary],
2317    expected_instrument_id: InstrumentId,
2318    expected_client_order_id: ClientOrderId,
2319    expected_venue_order_id: Option<VenueOrderId>,
2320) -> Option<&CurrentOrderSummary> {
2321    let matching: Vec<&CurrentOrderSummary> = orders
2322        .iter()
2323        .filter(|o| {
2324            make_instrument_id(&o.market_id, o.selection_id, o.handicap) == expected_instrument_id
2325        })
2326        .collect();
2327
2328    let candidates: Vec<&CurrentOrderSummary> = if matching.is_empty() {
2329        // No instrument match: accept only an exact venue_order_id hit
2330        // (pre-existing orders without a recognizable customer_order_ref).
2331        // A lone foreign-instrument candidate is not enough, since a 32-char
2332        // customer_order_ref collision can surface a single unrelated bet.
2333        if let Some(vid) = expected_venue_order_id
2334            && let Some(order) = orders.iter().find(|o| o.bet_id == vid.as_str())
2335        {
2336            return Some(order);
2337        }
2338        log::warn!(
2339            "Betfair query_order returned {} orders for client_order_id={expected_client_order_id}, none matching instrument {expected_instrument_id}; skipping to avoid cross-instrument reconciliation",
2340            orders.len(),
2341        );
2342        return None;
2343    } else {
2344        matching
2345    };
2346
2347    // Prefer EXECUTABLE so a live replacement wins over a cancelled
2348    // predecessor sharing the same customer_order_ref.
2349    let executable: Vec<&CurrentOrderSummary> = candidates
2350        .iter()
2351        .copied()
2352        .filter(|o| o.status == BetfairOrderStatus::Executable)
2353        .collect();
2354
2355    let pool = if executable.is_empty() {
2356        candidates
2357    } else {
2358        executable
2359    };
2360
2361    // Tiebreaker: most recently placed bet. Picks the replacement over the
2362    // predecessor even when both are already terminal by poll time.
2363    pool.into_iter()
2364        .max_by(|a, b| a.placed_date.cmp(&b.placed_date))
2365}
2366
2367async fn list_current_orders_with_retry(
2368    http_client: &Arc<BetfairHttpClient>,
2369    params: &ListCurrentOrdersParams,
2370) -> anyhow::Result<CurrentOrderSummaryReport> {
2371    match http_client
2372        .send_betting(METHOD_LIST_CURRENT_ORDERS, params)
2373        .await
2374    {
2375        Ok(r) => Ok(r),
2376        Err(e) if e.is_session_error() || e.is_rate_limit_error() => {
2377            if e.is_rate_limit_error() {
2378                log::warn!("Rate limited, retrying in {RATE_LIMIT_RETRY_DELAY_SECS}s");
2379                tokio::time::sleep(tokio::time::Duration::from_secs(
2380                    RATE_LIMIT_RETRY_DELAY_SECS,
2381                ))
2382                .await;
2383            } else {
2384                log::warn!("Session error, refreshing session");
2385
2386                if http_client.keep_alive().await.is_err() {
2387                    let _ = http_client.reconnect().await;
2388                }
2389            }
2390            http_client
2391                .send_betting(METHOD_LIST_CURRENT_ORDERS, params)
2392                .await
2393                .map_err(|e| anyhow::anyhow!("{e}"))
2394        }
2395        Err(e) => Err(anyhow::anyhow!("{e}")),
2396    }
2397}
2398
2399fn should_emit_http_accept(
2400    ocm_state: &Arc<Mutex<OcmState>>,
2401    client_order_id: &ClientOrderId,
2402) -> bool {
2403    let Ok(state) = ocm_state.lock() else {
2404        log::error!("OcmState mutex poisoned");
2405        return true;
2406    };
2407
2408    if state
2409        .stream_reported_client_orders
2410        .contains(client_order_id)
2411    {
2412        log::info!(
2413            "Suppressing late HTTP acceptance for {client_order_id}: OCM already reported order state"
2414        );
2415        return false;
2416    }
2417
2418    true
2419}
2420
2421fn format_betfair_reason(
2422    error_message: Option<&str>,
2423    error_code: Option<impl fmt::Debug>,
2424    fallback: Option<String>,
2425    unknown: &str,
2426) -> String {
2427    if let Some(message) = error_message
2428        .map(str::trim)
2429        .filter(|message| !message.is_empty())
2430    {
2431        return match error_code {
2432            Some(code) => format!("{message} ({code:?})"),
2433            None => message.to_string(),
2434        };
2435    }
2436
2437    error_code
2438        .map(|code| format!("{code:?}"))
2439        .or(fallback.filter(|s| !s.trim().is_empty()))
2440        .unwrap_or_else(|| unknown.to_string())
2441}
2442
2443fn format_place_instruction_reason(
2444    instruction_report: &PlaceInstructionReport,
2445    report: &PlaceExecutionReport,
2446) -> String {
2447    format_betfair_reason(
2448        instruction_report.error_message.as_deref(),
2449        instruction_report.error_code,
2450        report_fallback(report.error_message.as_deref(), report.error_code),
2451        "unknown error",
2452    )
2453}
2454
2455fn format_cancel_instruction_reason(
2456    error_message: Option<&str>,
2457    error_code: Option<InstructionReportErrorCode>,
2458    report_error_message: Option<&str>,
2459    report_error_code: Option<ExecutionReportErrorCode>,
2460) -> String {
2461    format_betfair_reason(
2462        error_message,
2463        error_code,
2464        report_fallback(report_error_message, report_error_code),
2465        "unknown instruction error",
2466    )
2467}
2468
2469fn format_replace_instruction_reason(
2470    instruction_report: &ReplaceInstructionReport,
2471    report: &ReplaceExecutionReport,
2472) -> String {
2473    let nested_reason = instruction_report
2474        .place_instruction_report
2475        .as_ref()
2476        .and_then(|ir| instruction_fallback(ir.error_message.as_deref(), ir.error_code))
2477        .or_else(|| {
2478            instruction_report
2479                .cancel_instruction_report
2480                .as_ref()
2481                .and_then(|ir| instruction_fallback(ir.error_message.as_deref(), ir.error_code))
2482        });
2483
2484    format_betfair_reason(
2485        instruction_report.error_message.as_deref(),
2486        instruction_report.error_code,
2487        nested_reason
2488            .or_else(|| report_fallback(report.error_message.as_deref(), report.error_code)),
2489        "unknown instruction error",
2490    )
2491}
2492
2493fn report_fallback(
2494    error_message: Option<&str>,
2495    error_code: Option<ExecutionReportErrorCode>,
2496) -> Option<String> {
2497    error_message
2498        .map(str::trim)
2499        .filter(|s| !s.is_empty())
2500        .map(str::to_string)
2501        .or_else(|| error_code.map(|code| format!("{code:?}")))
2502}
2503
2504fn instruction_fallback(
2505    error_message: Option<&str>,
2506    error_code: Option<InstructionReportErrorCode>,
2507) -> Option<String> {
2508    error_message
2509        .map(str::trim)
2510        .filter(|s| !s.is_empty())
2511        .map(str::to_string)
2512        .or_else(|| error_code.map(|code| format!("{code:?}")))
2513}
2514
2515#[cfg(test)]
2516mod tests {
2517    use nautilus_model::types::Quantity;
2518    use rstest::rstest;
2519    use rust_decimal::Decimal;
2520
2521    use super::*;
2522
2523    #[rstest]
2524    #[case(
2525        Some("Price out of range"),
2526        Some(InstructionReportErrorCode::InvalidOdds),
2527        None,
2528        "unknown",
2529        "Price out of range (InvalidOdds)"
2530    )]
2531    #[case(
2532        Some("Price out of range"),
2533        None,
2534        None,
2535        "unknown",
2536        "Price out of range"
2537    )]
2538    #[case(
2539        None,
2540        Some(InstructionReportErrorCode::ErrorInOrder),
2541        None,
2542        "unknown",
2543        "ErrorInOrder"
2544    )]
2545    #[case(None, None, Some("report-level msg".to_string()), "unknown", "report-level msg")]
2546    #[case(None, None, None, "unknown error", "unknown error")]
2547    #[case(
2548        Some("  "),
2549        Some(InstructionReportErrorCode::ErrorInOrder),
2550        None,
2551        "unknown",
2552        "ErrorInOrder"
2553    )]
2554    #[case(Some(""), None, Some(String::new()), "fallback", "fallback")]
2555    #[case(Some("  \n "), None, Some("  ".to_string()), "unknown", "unknown")]
2556    fn test_format_betfair_reason(
2557        #[case] error_message: Option<&str>,
2558        #[case] error_code: Option<InstructionReportErrorCode>,
2559        #[case] fallback: Option<String>,
2560        #[case] unknown: &str,
2561        #[case] expected: &str,
2562    ) {
2563        assert_eq!(
2564            format_betfair_reason(error_message, error_code, fallback, unknown),
2565            expected,
2566        );
2567    }
2568
2569    #[rstest]
2570    fn test_ocm_state_register_and_resolve() {
2571        let mut state = OcmState::default();
2572        let client_oid = ClientOrderId::from("O-20240101-001");
2573
2574        state.register_customer_order_ref(client_oid);
2575
2576        let rfo = make_customer_order_ref(client_oid.as_str());
2577        let resolved = state.resolve_client_order_id(Some(&rfo));
2578        assert_eq!(resolved, Some(client_oid));
2579    }
2580
2581    #[rstest]
2582    fn test_ocm_state_resolve_none_for_unknown_rfo() {
2583        let state = OcmState::default();
2584        assert!(state.resolve_client_order_id(Some("unknown")).is_none());
2585        assert!(state.resolve_client_order_id(None).is_none());
2586    }
2587
2588    #[rstest]
2589    fn test_ocm_state_register_with_legacy() {
2590        let mut state = OcmState::default();
2591        let id = "O-20240101-550e8400-e29b-41d4-a716-446655440000";
2592        let client_oid = ClientOrderId::from(id);
2593
2594        state.register_customer_order_ref_with_legacy(client_oid);
2595
2596        let rfo_current = make_customer_order_ref(id);
2597        let rfo_legacy = make_customer_order_ref_legacy(id);
2598        assert_ne!(rfo_current, rfo_legacy);
2599
2600        assert_eq!(
2601            state.resolve_client_order_id(Some(&rfo_current)),
2602            Some(client_oid)
2603        );
2604        assert_eq!(
2605            state.resolve_client_order_id(Some(&rfo_legacy)),
2606            Some(client_oid)
2607        );
2608    }
2609
2610    #[rstest]
2611    fn test_ocm_state_remove_customer_order_refs() {
2612        let mut state = OcmState::default();
2613        let id = "O-20240101-550e8400-e29b-41d4-a716-446655440000";
2614        let client_oid = ClientOrderId::from(id);
2615
2616        state.register_customer_order_ref_with_legacy(client_oid);
2617        state.remove_customer_order_refs(&client_oid);
2618
2619        let rfo_current = make_customer_order_ref(id);
2620        let rfo_legacy = make_customer_order_ref_legacy(id);
2621        assert!(state.resolve_client_order_id(Some(&rfo_current)).is_none());
2622        assert!(state.resolve_client_order_id(Some(&rfo_legacy)).is_none());
2623    }
2624
2625    #[rstest]
2626    fn test_should_emit_http_accept_without_stream_report() {
2627        let state = Arc::new(Mutex::new(OcmState::default()));
2628        let client_oid = ClientOrderId::from("O-001");
2629
2630        assert!(should_emit_http_accept(&state, &client_oid));
2631    }
2632
2633    #[rstest]
2634    fn test_should_not_emit_http_accept_after_stream_report() {
2635        let client_oid = ClientOrderId::from("O-001");
2636        let mut inner = OcmState::default();
2637        inner.stream_reported_client_orders.insert(client_oid);
2638        let state = Arc::new(Mutex::new(inner));
2639
2640        assert!(!should_emit_http_accept(&state, &client_oid));
2641    }
2642
2643    #[rstest]
2644    fn test_ocm_state_terminal_deduplication() {
2645        let mut state = OcmState::default();
2646
2647        // First call marks as terminal, returns false (not duplicate)
2648        assert!(!state.try_mark_terminal("bet123"));
2649
2650        // Second call returns true (already terminal)
2651        assert!(state.try_mark_terminal("bet123"));
2652    }
2653
2654    #[rstest]
2655    fn test_ocm_state_suppress_cancel_for_replaced() {
2656        let mut state = OcmState::default();
2657        let client_oid = ClientOrderId::from("O-001");
2658
2659        state.replaced_venue_order_ids.insert("old_bet".to_string());
2660        assert!(state.should_suppress_cancel(&client_oid, "old_bet"));
2661        assert!(!state.should_suppress_cancel(&client_oid, "new_bet"));
2662    }
2663
2664    #[rstest]
2665    fn test_ocm_state_suppress_cancel_for_pending_replace() {
2666        let mut state = OcmState::default();
2667        let client_oid = ClientOrderId::from("O-001");
2668
2669        state
2670            .pending_update_keys
2671            .insert((client_oid, "old_bet".to_string()));
2672
2673        assert!(state.should_suppress_cancel(&client_oid, "old_bet"));
2674        assert!(!state.should_suppress_cancel(&client_oid, "other_bet"));
2675    }
2676
2677    #[rstest]
2678    fn test_ocm_state_cleanup_terminal_with_pending_replace() {
2679        let mut state = OcmState::default();
2680        let client_oid = ClientOrderId::from("O-001");
2681
2682        state.register_customer_order_ref(client_oid);
2683        state
2684            .pending_update_keys
2685            .insert((client_oid, "old_bet".to_string()));
2686
2687        // Should NOT remove refs because replace is pending
2688        state.cleanup_terminal_order(&client_oid);
2689        let rfo = make_customer_order_ref(client_oid.as_str());
2690        assert!(state.resolve_client_order_id(Some(&rfo)).is_some());
2691    }
2692
2693    #[rstest]
2694    fn test_ocm_state_cleanup_terminal_without_pending() {
2695        let mut state = OcmState::default();
2696        let client_oid = ClientOrderId::from("O-001");
2697
2698        state.register_customer_order_ref(client_oid);
2699
2700        // Should remove refs because no pending replace
2701        state.cleanup_terminal_order(&client_oid);
2702        let rfo = make_customer_order_ref(client_oid.as_str());
2703        assert!(state.resolve_client_order_id(Some(&rfo)).is_none());
2704    }
2705
2706    #[rstest]
2707    fn test_ocm_state_sync_from_orders() {
2708        let mut state = OcmState::default();
2709
2710        let orders = vec![
2711            (
2712                "bet1".to_string(),
2713                ClientOrderId::from("O-001"),
2714                Decimal::new(10, 0),
2715                Decimal::new(25, 1),
2716                false,
2717            ),
2718            (
2719                "bet2".to_string(),
2720                ClientOrderId::from("O-002"),
2721                Decimal::new(5, 0),
2722                Decimal::new(30, 1),
2723                true,
2724            ),
2725        ];
2726
2727        state.sync_from_orders(&orders);
2728
2729        // Open order: should have customer_order_ref registered
2730        let rfo1 = make_customer_order_ref("O-001");
2731        assert!(state.resolve_client_order_id(Some(&rfo1)).is_some());
2732
2733        // Closed order: should be in terminal_orders, no customer_order_ref
2734        assert!(state.terminal_orders.contains("bet2"));
2735        let rfo2 = make_customer_order_ref("O-002");
2736        assert!(state.resolve_client_order_id(Some(&rfo2)).is_none());
2737    }
2738
2739    #[rstest]
2740    fn test_reconnect_signal_not_sent_on_initial_connection() {
2741        let has_initial_connection = Arc::new(AtomicBool::new(false));
2742        let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
2743
2744        let has_initial = Arc::clone(&has_initial_connection);
2745        let handler = move |_data: &[u8]| {
2746            if has_initial.swap(true, Ordering::SeqCst) {
2747                let _ = reconnect_tx.send(());
2748            }
2749        };
2750
2751        // First connection message: no signal
2752        handler(br#"{"op":"connection","connectionId":"abc"}"#);
2753        assert!(reconnect_rx.try_recv().is_err());
2754        assert!(has_initial_connection.load(Ordering::SeqCst));
2755    }
2756
2757    #[rstest]
2758    fn test_reconnect_signal_sent_on_subsequent_connection() {
2759        let has_initial_connection = Arc::new(AtomicBool::new(false));
2760        let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
2761
2762        let has_initial = Arc::clone(&has_initial_connection);
2763        let tx = reconnect_tx;
2764        let handler = move |_data: &[u8]| {
2765            if has_initial.swap(true, Ordering::SeqCst) {
2766                let _ = tx.send(());
2767            }
2768        };
2769
2770        // First connection: no signal
2771        handler(br#"{"op":"connection","connectionId":"abc"}"#);
2772        assert!(reconnect_rx.try_recv().is_err());
2773
2774        // Second connection: signal sent
2775        handler(br#"{"op":"connection","connectionId":"def"}"#);
2776        assert!(reconnect_rx.try_recv().is_ok());
2777
2778        // Third connection: signal sent again
2779        handler(br#"{"op":"connection","connectionId":"ghi"}"#);
2780        assert!(reconnect_rx.try_recv().is_ok());
2781    }
2782
2783    #[rstest]
2784    fn test_ocm_state_persists_across_reconnections() {
2785        let ocm_state = Arc::new(Mutex::new(OcmState::default()));
2786
2787        // Populate state before "reconnect"
2788        {
2789            let mut state = ocm_state.lock().unwrap();
2790            let orders = vec![
2791                (
2792                    "bet1".to_string(),
2793                    ClientOrderId::from("O-001"),
2794                    Decimal::new(10, 0),
2795                    Decimal::new(25, 1),
2796                    false,
2797                ),
2798                (
2799                    "bet2".to_string(),
2800                    ClientOrderId::from("O-002"),
2801                    Decimal::ZERO,
2802                    Decimal::ZERO,
2803                    true,
2804                ),
2805            ];
2806            state.sync_from_orders(&orders);
2807        }
2808
2809        // Verify state survives (simulates reconnection where Arc<Mutex<OcmState>> persists)
2810        let state = ocm_state.lock().unwrap();
2811        let rfo = make_customer_order_ref("O-001");
2812        assert_eq!(
2813            state.resolve_client_order_id(Some(&rfo)),
2814            Some(ClientOrderId::from("O-001")),
2815        );
2816        assert!(state.terminal_orders.contains("bet2"));
2817        assert!(!state.terminal_orders.contains("bet1"));
2818    }
2819
2820    #[rstest]
2821    fn test_ocm_state_sync_from_orders_populates_fill_tracker() {
2822        let mut state = OcmState::default();
2823
2824        let orders = vec![(
2825            "bet_fill".to_string(),
2826            ClientOrderId::from("O-FILL-001"),
2827            Decimal::new(15, 0),
2828            Decimal::new(30, 1),
2829            false,
2830        )];
2831
2832        state.sync_from_orders(&orders);
2833
2834        // Fill tracker should be pre-populated so that a stream update with
2835        // sm=15 does NOT produce a duplicate fill
2836        let uo = crate::stream::messages::UnmatchedOrder {
2837            id: "bet_fill".to_string(),
2838            p: Decimal::new(30, 1),
2839            s: Decimal::new(20, 0),
2840            side: crate::common::enums::StreamingSide::Back,
2841            status: crate::common::enums::StreamingOrderStatus::Executable,
2842            pt: Some(crate::common::enums::StreamingPersistenceType::Lapse),
2843            ot: crate::common::enums::StreamingOrderType::Limit,
2844            pd: 1617863365000,
2845            bsp: None,
2846            rfo: Some("O-FILL-001".to_string()),
2847            rfs: None,
2848            rc: None,
2849            rac: None,
2850            md: None,
2851            cd: None,
2852            ld: None,
2853            avp: Some(Decimal::new(30, 1)),
2854            sm: Some(Decimal::new(15, 0)),
2855            sr: None,
2856            sl: None,
2857            sc: None,
2858            sv: None,
2859            lsrc: None,
2860        };
2861
2862        let instrument_id = InstrumentId::from("1.234567-12345-0.0.BETFAIR");
2863        let result = state.fill_tracker.maybe_fill_report(
2864            &uo,
2865            uo.s,
2866            instrument_id,
2867            AccountId::from("BETFAIR-001"),
2868            Currency::from("GBP"),
2869            UnixNanos::default(),
2870            UnixNanos::default(),
2871        );
2872
2873        assert!(
2874            result.is_none(),
2875            "synced fill should prevent duplicate fill report"
2876        );
2877    }
2878
2879    #[rstest]
2880    fn test_ocm_state_sync_from_orders_incremental_fill_after_sync() {
2881        let mut state = OcmState::default();
2882
2883        let orders = vec![(
2884            "bet_inc".to_string(),
2885            ClientOrderId::from("O-INC-001"),
2886            Decimal::new(10, 0),
2887            Decimal::new(25, 1),
2888            false,
2889        )];
2890
2891        state.sync_from_orders(&orders);
2892
2893        // Stream update with sm=18 (8 more than synced 10)
2894        let uo = crate::stream::messages::UnmatchedOrder {
2895            id: "bet_inc".to_string(),
2896            p: Decimal::new(25, 1),
2897            s: Decimal::new(20, 0),
2898            side: crate::common::enums::StreamingSide::Lay,
2899            status: crate::common::enums::StreamingOrderStatus::Executable,
2900            pt: Some(crate::common::enums::StreamingPersistenceType::Persist),
2901            ot: crate::common::enums::StreamingOrderType::Limit,
2902            pd: 1617863365000,
2903            bsp: None,
2904            rfo: Some("O-INC-001".to_string()),
2905            rfs: None,
2906            rc: None,
2907            rac: None,
2908            md: None,
2909            cd: None,
2910            ld: None,
2911            avp: Some(Decimal::new(26, 1)),
2912            sm: Some(Decimal::new(18, 0)),
2913            sr: None,
2914            sl: None,
2915            sc: None,
2916            sv: None,
2917            lsrc: None,
2918        };
2919
2920        let instrument_id = InstrumentId::from("1.234567-12345-0.0.BETFAIR");
2921        let result = state.fill_tracker.maybe_fill_report(
2922            &uo,
2923            uo.s,
2924            instrument_id,
2925            AccountId::from("BETFAIR-001"),
2926            Currency::from("GBP"),
2927            UnixNanos::default(),
2928            UnixNanos::default(),
2929        );
2930
2931        let fill = result.expect("should produce incremental fill of 8");
2932        assert_eq!(fill.last_qty, Quantity::from("8.00"));
2933    }
2934
2935    #[rstest]
2936    fn test_ocm_state_sync_from_orders_zero_filled_not_synced() {
2937        let mut state = OcmState::default();
2938
2939        let orders = vec![(
2940            "bet_zero".to_string(),
2941            ClientOrderId::from("O-ZERO-001"),
2942            Decimal::ZERO,
2943            Decimal::ZERO,
2944            false,
2945        )];
2946
2947        state.sync_from_orders(&orders);
2948
2949        // RFO should still be registered even if no fills
2950        let rfo = make_customer_order_ref("O-ZERO-001");
2951        assert!(state.resolve_client_order_id(Some(&rfo)).is_some());
2952
2953        // A stream update with sm=5 should produce a fill (not blocked by sync)
2954        let uo = crate::stream::messages::UnmatchedOrder {
2955            id: "bet_zero".to_string(),
2956            p: Decimal::new(30, 1),
2957            s: Decimal::new(10, 0),
2958            side: crate::common::enums::StreamingSide::Back,
2959            status: crate::common::enums::StreamingOrderStatus::Executable,
2960            pt: Some(crate::common::enums::StreamingPersistenceType::Lapse),
2961            ot: crate::common::enums::StreamingOrderType::Limit,
2962            pd: 1617863365000,
2963            bsp: None,
2964            rfo: None,
2965            rfs: None,
2966            rc: None,
2967            rac: None,
2968            md: None,
2969            cd: None,
2970            ld: None,
2971            avp: Some(Decimal::new(30, 1)),
2972            sm: Some(Decimal::new(5, 0)),
2973            sr: None,
2974            sl: None,
2975            sc: None,
2976            sv: None,
2977            lsrc: None,
2978        };
2979        let instrument_id = InstrumentId::from("1.234567-12345-0.0.BETFAIR");
2980        let result = state.fill_tracker.maybe_fill_report(
2981            &uo,
2982            uo.s,
2983            instrument_id,
2984            AccountId::from("BETFAIR-001"),
2985            Currency::from("GBP"),
2986            UnixNanos::default(),
2987            UnixNanos::default(),
2988        );
2989        assert!(
2990            result.is_some(),
2991            "zero-filled order should not block new fills"
2992        );
2993    }
2994
2995    #[rstest]
2996    fn test_ocm_state_sync_multiple_open_and_closed() {
2997        let mut state = OcmState::default();
2998
2999        let orders = vec![
3000            (
3001                "bet_a".to_string(),
3002                ClientOrderId::from("O-A"),
3003                Decimal::new(5, 0),
3004                Decimal::new(20, 1),
3005                false,
3006            ),
3007            (
3008                "bet_b".to_string(),
3009                ClientOrderId::from("O-B"),
3010                Decimal::ZERO,
3011                Decimal::ZERO,
3012                true,
3013            ),
3014            (
3015                "bet_c".to_string(),
3016                ClientOrderId::from("O-C"),
3017                Decimal::new(100, 0),
3018                Decimal::new(15, 1),
3019                true,
3020            ),
3021            (
3022                "bet_d".to_string(),
3023                ClientOrderId::from("O-D"),
3024                Decimal::ZERO,
3025                Decimal::ZERO,
3026                false,
3027            ),
3028        ];
3029
3030        state.sync_from_orders(&orders);
3031
3032        // Open orders have RFO registered
3033        assert!(
3034            state
3035                .resolve_client_order_id(Some(&make_customer_order_ref("O-A")))
3036                .is_some()
3037        );
3038        assert!(
3039            state
3040                .resolve_client_order_id(Some(&make_customer_order_ref("O-D")))
3041                .is_some()
3042        );
3043
3044        // Closed orders are terminal
3045        assert!(state.terminal_orders.contains("bet_b"));
3046        assert!(state.terminal_orders.contains("bet_c"));
3047        assert!(!state.terminal_orders.contains("bet_a"));
3048        assert!(!state.terminal_orders.contains("bet_d"));
3049
3050        // Closed orders do NOT get RFO registered
3051        assert!(
3052            state
3053                .resolve_client_order_id(Some(&make_customer_order_ref("O-B")))
3054                .is_none()
3055        );
3056    }
3057
3058    fn make_summary(
3059        bet_id: &str,
3060        market_id: &str,
3061        selection_id: u64,
3062        handicap: Decimal,
3063        status: BetfairOrderStatus,
3064        placed_date: &str,
3065    ) -> CurrentOrderSummary {
3066        CurrentOrderSummary {
3067            bet_id: bet_id.to_string(),
3068            market_id: market_id.to_string(),
3069            selection_id,
3070            handicap,
3071            price_size: crate::http::models::PriceSize {
3072                price: Decimal::new(20, 1),
3073                size: Decimal::new(10, 0),
3074            },
3075            bsp_liability: Decimal::ZERO,
3076            side: BetfairSide::Back,
3077            status,
3078            persistence_type: PersistenceType::Lapse,
3079            order_type: BetfairOrderType::Limit,
3080            placed_date: placed_date.to_string(),
3081            matched_date: None,
3082            average_price_matched: None,
3083            size_matched: None,
3084            size_remaining: Some(Decimal::new(10, 0)),
3085            size_lapsed: None,
3086            size_cancelled: None,
3087            size_voided: None,
3088            regulator_auth_code: None,
3089            regulator_code: None,
3090            customer_order_ref: None,
3091            customer_strategy_ref: None,
3092        }
3093    }
3094
3095    #[rstest]
3096    fn test_select_order_for_query_single_executable() {
3097        let cid = ClientOrderId::from("O-001");
3098        let orders = vec![make_summary(
3099            "bet_1",
3100            "1.100",
3101            12345,
3102            Decimal::ZERO,
3103            BetfairOrderStatus::Executable,
3104            "2026-04-18T10:00:00Z",
3105        )];
3106        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3107
3108        let selected = select_order_for_query(&orders, expected, cid, None);
3109        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_1"));
3110    }
3111
3112    #[rstest]
3113    fn test_select_order_for_query_single_terminal() {
3114        let cid = ClientOrderId::from("O-001");
3115        let orders = vec![make_summary(
3116            "bet_1",
3117            "1.100",
3118            12345,
3119            Decimal::ZERO,
3120            BetfairOrderStatus::ExecutionComplete,
3121            "2026-04-18T10:00:00Z",
3122        )];
3123        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3124
3125        let selected = select_order_for_query(&orders, expected, cid, None);
3126        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_1"));
3127    }
3128
3129    #[rstest]
3130    fn test_select_order_for_query_replace_prefers_executable() {
3131        let cid = ClientOrderId::from("O-001");
3132        let orders = vec![
3133            make_summary(
3134                "bet_old",
3135                "1.100",
3136                12345,
3137                Decimal::ZERO,
3138                BetfairOrderStatus::ExecutionComplete,
3139                "2026-04-18T10:00:00Z",
3140            ),
3141            make_summary(
3142                "bet_new",
3143                "1.100",
3144                12345,
3145                Decimal::ZERO,
3146                BetfairOrderStatus::Executable,
3147                "2026-04-18T10:05:00Z",
3148            ),
3149        ];
3150        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3151
3152        let selected = select_order_for_query(&orders, expected, cid, None);
3153        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_new"));
3154    }
3155
3156    #[rstest]
3157    fn test_select_order_for_query_multiple_executable_prefers_most_recent() {
3158        let cid = ClientOrderId::from("O-001");
3159        let orders = vec![
3160            make_summary(
3161                "bet_old",
3162                "1.100",
3163                12345,
3164                Decimal::ZERO,
3165                BetfairOrderStatus::Executable,
3166                "2026-04-18T10:00:00Z",
3167            ),
3168            make_summary(
3169                "bet_new",
3170                "1.100",
3171                12345,
3172                Decimal::ZERO,
3173                BetfairOrderStatus::Executable,
3174                "2026-04-18T10:05:00Z",
3175            ),
3176        ];
3177        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3178
3179        let selected = select_order_for_query(&orders, expected, cid, None);
3180        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_new"));
3181    }
3182
3183    #[rstest]
3184    fn test_select_order_for_query_multiple_terminal_prefers_most_recent() {
3185        let cid = ClientOrderId::from("O-001");
3186        let orders = vec![
3187            make_summary(
3188                "bet_old",
3189                "1.100",
3190                12345,
3191                Decimal::ZERO,
3192                BetfairOrderStatus::ExecutionComplete,
3193                "2026-04-18T10:00:00Z",
3194            ),
3195            make_summary(
3196                "bet_new",
3197                "1.100",
3198                12345,
3199                Decimal::ZERO,
3200                BetfairOrderStatus::ExecutionComplete,
3201                "2026-04-18T10:05:00Z",
3202            ),
3203        ];
3204        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3205
3206        let selected = select_order_for_query(&orders, expected, cid, None);
3207        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_new"));
3208    }
3209
3210    #[rstest]
3211    fn test_select_order_for_query_foreign_only_without_vid_returns_none() {
3212        let cid = ClientOrderId::from("O-001");
3213        let orders = vec![make_summary(
3214            "bet_foreign",
3215            "1.999",
3216            99999,
3217            Decimal::ZERO,
3218            BetfairOrderStatus::Executable,
3219            "2026-04-18T10:00:00Z",
3220        )];
3221        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3222
3223        let selected = select_order_for_query(&orders, expected, cid, None);
3224        assert!(selected.is_none());
3225    }
3226
3227    #[rstest]
3228    fn test_select_order_for_query_foreign_only_with_vid_match_returns_match() {
3229        let cid = ClientOrderId::from("O-001");
3230        let orders = vec![make_summary(
3231            "bet_foreign",
3232            "1.999",
3233            99999,
3234            Decimal::ZERO,
3235            BetfairOrderStatus::Executable,
3236            "2026-04-18T10:00:00Z",
3237        )];
3238        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3239        let vid = VenueOrderId::from("bet_foreign");
3240
3241        let selected = select_order_for_query(&orders, expected, cid, Some(vid));
3242        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_foreign"));
3243    }
3244
3245    #[rstest]
3246    fn test_select_order_for_query_foreign_only_vid_mismatch_returns_none() {
3247        let cid = ClientOrderId::from("O-001");
3248        let orders = vec![
3249            make_summary(
3250                "bet_foreign_1",
3251                "1.999",
3252                99999,
3253                Decimal::ZERO,
3254                BetfairOrderStatus::Executable,
3255                "2026-04-18T10:00:00Z",
3256            ),
3257            make_summary(
3258                "bet_foreign_2",
3259                "1.888",
3260                88888,
3261                Decimal::ZERO,
3262                BetfairOrderStatus::Executable,
3263                "2026-04-18T10:05:00Z",
3264            ),
3265        ];
3266        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3267        let vid = VenueOrderId::from("bet_unknown");
3268
3269        let selected = select_order_for_query(&orders, expected, cid, Some(vid));
3270        assert!(selected.is_none());
3271    }
3272
3273    #[rstest]
3274    fn test_select_order_for_query_mixed_returns_matching_instrument() {
3275        let cid = ClientOrderId::from("O-001");
3276        let orders = vec![
3277            make_summary(
3278                "bet_foreign",
3279                "1.999",
3280                99999,
3281                Decimal::ZERO,
3282                BetfairOrderStatus::Executable,
3283                "2026-04-18T10:05:00Z",
3284            ),
3285            make_summary(
3286                "bet_match",
3287                "1.100",
3288                12345,
3289                Decimal::ZERO,
3290                BetfairOrderStatus::ExecutionComplete,
3291                "2026-04-18T10:00:00Z",
3292            ),
3293        ];
3294        let expected = make_instrument_id("1.100", 12345, Decimal::ZERO);
3295
3296        let selected = select_order_for_query(&orders, expected, cid, None);
3297        assert_eq!(selected.map(|o| o.bet_id.as_str()), Some("bet_match"));
3298    }
3299
3300    #[rstest]
3301    fn test_extend_unique_filters_duplicates() {
3302        let mut candidates: Vec<CurrentOrderSummary> = Vec::new();
3303        let mut seen: AHashSet<String> = AHashSet::new();
3304
3305        let orders = vec![
3306            make_summary(
3307                "bet_1",
3308                "1.100",
3309                12345,
3310                Decimal::ZERO,
3311                BetfairOrderStatus::Executable,
3312                "2026-04-18T10:00:00Z",
3313            ),
3314            make_summary(
3315                "bet_1",
3316                "1.100",
3317                12345,
3318                Decimal::ZERO,
3319                BetfairOrderStatus::Executable,
3320                "2026-04-18T10:01:00Z",
3321            ),
3322            make_summary(
3323                "bet_2",
3324                "1.100",
3325                12345,
3326                Decimal::ZERO,
3327                BetfairOrderStatus::Executable,
3328                "2026-04-18T10:02:00Z",
3329            ),
3330        ];
3331
3332        extend_unique(&mut candidates, &mut seen, orders);
3333
3334        assert_eq!(candidates.len(), 2);
3335        assert_eq!(candidates[0].bet_id, "bet_1");
3336        assert_eq!(candidates[0].placed_date, "2026-04-18T10:00:00Z");
3337        assert_eq!(candidates[1].bet_id, "bet_2");
3338        assert!(seen.contains("bet_1"));
3339        assert!(seen.contains("bet_2"));
3340    }
3341
3342    #[rstest]
3343    fn test_extend_unique_skips_already_seen() {
3344        let mut candidates: Vec<CurrentOrderSummary> = vec![make_summary(
3345            "bet_1",
3346            "1.100",
3347            12345,
3348            Decimal::ZERO,
3349            BetfairOrderStatus::Executable,
3350            "2026-04-18T10:00:00Z",
3351        )];
3352        let mut seen: AHashSet<String> = AHashSet::new();
3353        seen.insert("bet_1".to_string());
3354
3355        let orders = vec![make_summary(
3356            "bet_1",
3357            "1.100",
3358            12345,
3359            Decimal::ZERO,
3360            BetfairOrderStatus::Executable,
3361            "2026-04-18T10:05:00Z",
3362        )];
3363
3364        extend_unique(&mut candidates, &mut seen, orders);
3365
3366        assert_eq!(candidates.len(), 1);
3367        assert_eq!(candidates[0].placed_date, "2026-04-18T10:00:00Z");
3368    }
3369
3370    #[rstest]
3371    fn test_list_current_orders_filter_bet_id_sets_only_bet_ids() {
3372        let params = list_current_orders_filter_bet_id("bet_abc".to_string());
3373
3374        assert_eq!(
3375            params.bet_ids.as_deref(),
3376            Some(&["bet_abc".to_string()][..])
3377        );
3378        assert!(params.customer_order_refs.is_none());
3379        assert!(params.market_ids.is_none());
3380        assert!(params.order_projection.is_none());
3381        assert!(params.customer_strategy_refs.is_none());
3382        assert!(params.date_range.is_none());
3383        assert!(params.order_by.is_none());
3384        assert!(params.sort_dir.is_none());
3385        assert!(params.from_record.is_none());
3386        assert!(params.record_count.is_none());
3387    }
3388
3389    #[rstest]
3390    fn test_list_current_orders_filter_ref_sets_only_customer_order_refs() {
3391        let params = list_current_orders_filter_ref("rfo_abc".to_string());
3392
3393        assert_eq!(
3394            params.customer_order_refs.as_deref(),
3395            Some(&["rfo_abc".to_string()][..])
3396        );
3397        assert!(params.bet_ids.is_none());
3398        assert!(params.market_ids.is_none());
3399        assert!(params.order_projection.is_none());
3400        assert!(params.customer_strategy_refs.is_none());
3401        assert!(params.date_range.is_none());
3402        assert!(params.order_by.is_none());
3403        assert!(params.sort_dir.is_none());
3404        assert!(params.from_record.is_none());
3405        assert!(params.record_count.is_none());
3406    }
3407}