Skip to main content

nautilus_betfair/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client for the Betfair adapter.
17
18use std::sync::{
19    Arc, Mutex,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use async_trait::async_trait;
25use nautilus_common::{
26    clients::DataClient,
27    live::{get_runtime, runner::get_data_event_sender},
28    messages::{
29        DataEvent,
30        data::{
31            SubscribeBookDeltas, SubscribeInstrumentStatus, SubscribeTrades, UnsubscribeBookDeltas,
32            UnsubscribeInstrumentStatus, UnsubscribeTrades,
33        },
34    },
35    providers::InstrumentProvider,
36};
37use nautilus_core::{AtomicMap, Params};
38use nautilus_model::{
39    data::{
40        CustomData, CustomDataTrait, Data, DataType, OrderBookDeltas, OrderBookDeltas_API,
41        TradeTick,
42    },
43    identifiers::{ClientId, InstrumentId, TradeId, Venue},
44    instruments::{Instrument, InstrumentAny},
45    types::{Currency, Money, Price, Quantity},
46};
47use nautilus_network::socket::TcpMessageHandler;
48use rust_decimal::Decimal;
49use tokio::task::JoinHandle;
50
51use crate::{
52    common::{
53        consts::{
54            BETFAIR_PRICE_PRECISION, BETFAIR_QUANTITY_PRECISION, BETFAIR_RACE_STREAM_HOST,
55            BETFAIR_VENUE,
56        },
57        credential::BetfairCredential,
58        enums::{MarketDataFilterField, MarketStatus},
59        parse::{
60            extract_market_id, make_instrument_id, parse_market_definition, parse_millis_timestamp,
61        },
62    },
63    config::BetfairDataConfig,
64    data_types::{BetfairSequenceCompleted, register_betfair_custom_data},
65    http::client::BetfairHttpClient,
66    provider::{BetfairInstrumentProvider, NavigationFilter},
67    stream::{
68        client::{BetfairRaceStreamClient, BetfairStreamClient},
69        config::BetfairStreamConfig,
70        messages::{MarketDataFilter, StreamMarketFilter, StreamMessage, stream_decode},
71        parse::{
72            make_trade_tick, parse_betfair_starting_prices, parse_betfair_ticker,
73            parse_bsp_book_deltas, parse_instrument_closes, parse_instrument_statuses,
74            parse_race_progress, parse_race_runner_data, parse_runner_book_deltas,
75        },
76    },
77};
78
79/// Keep-alive interval in seconds (10 hours, matching Python default).
80const KEEP_ALIVE_INTERVAL_SECS: u64 = 36_000;
81
82/// Wraps a custom data value with its instrument_id in both metadata (for
83/// topic routing) and identifier (for catalog partitioning).
84pub(crate) fn custom_data_with_instrument(
85    value: Arc<dyn CustomDataTrait>,
86    instrument_id: InstrumentId,
87) -> CustomData {
88    let mut metadata = Params::new();
89    metadata.insert(
90        "instrument_id".to_string(),
91        serde_json::Value::String(instrument_id.to_string()),
92    );
93    let data_type = DataType::new(
94        value.type_name(),
95        Some(metadata),
96        Some(instrument_id.to_string()),
97    );
98    CustomData::new(value, data_type)
99}
100
101/// Betfair live data client.
102#[derive(Debug)]
103pub struct BetfairDataClient {
104    client_id: ClientId,
105    http_client: Arc<BetfairHttpClient>,
106    provider: BetfairInstrumentProvider,
107    stream_client: Option<Arc<BetfairStreamClient>>,
108    race_stream_client: Option<Arc<BetfairRaceStreamClient>>,
109    credential: BetfairCredential,
110    stream_config: BetfairStreamConfig,
111    config: BetfairDataConfig,
112    currency: Currency,
113    is_connected: AtomicBool,
114    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
115    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
116    subscribed_market_ids: AHashSet<String>,
117    keep_alive_handle: Option<JoinHandle<()>>,
118    reconnect_handle: Option<JoinHandle<()>>,
119    race_fatal_handle: Option<JoinHandle<()>>,
120}
121
122impl BetfairDataClient {
123    /// Creates a new [`BetfairDataClient`] instance.
124    #[must_use]
125    #[expect(clippy::too_many_arguments)]
126    pub fn new(
127        client_id: ClientId,
128        http_client: BetfairHttpClient,
129        credential: BetfairCredential,
130        stream_config: BetfairStreamConfig,
131        config: BetfairDataConfig,
132        nav_filter: NavigationFilter,
133        currency: Currency,
134        min_notional: Option<Money>,
135    ) -> Self {
136        let data_sender = get_data_event_sender();
137        let http_client = Arc::new(http_client);
138        let provider = BetfairInstrumentProvider::new(
139            Arc::clone(&http_client),
140            nav_filter,
141            currency,
142            min_notional,
143        );
144
145        Self {
146            client_id,
147            http_client,
148            provider,
149            stream_client: None,
150            race_stream_client: None,
151            credential,
152            stream_config,
153            config,
154            currency,
155            is_connected: AtomicBool::new(false),
156            data_sender,
157            instruments: Arc::new(AtomicMap::new()),
158            subscribed_market_ids: AHashSet::new(),
159            keep_alive_handle: None,
160            reconnect_handle: None,
161            race_fatal_handle: None,
162        }
163    }
164
165    fn create_stream_handler(
166        data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
167        instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
168        currency: Currency,
169        min_notional: Option<Money>,
170        reconnect_tx: tokio::sync::mpsc::UnboundedSender<()>,
171    ) -> TcpMessageHandler {
172        // Track cumulative traded volumes per (instrument_id, price) to compute
173        // incremental trade sizes. Betfair `trd` fields report totals, not deltas.
174        let traded_volumes: Arc<Mutex<AHashMap<(InstrumentId, Decimal), Decimal>>> =
175            Arc::new(Mutex::new(AHashMap::new()));
176        let has_initial_connection = Arc::new(AtomicBool::new(false));
177
178        Arc::new(move |data: &[u8]| {
179            let msg = match stream_decode(data) {
180                Ok(msg) => msg,
181                Err(e) => {
182                    log::warn!("Failed to decode stream message: {e}");
183                    return;
184                }
185            };
186
187            match msg {
188                StreamMessage::MarketChange(mcm) => {
189                    if mcm.is_heartbeat() {
190                        return;
191                    }
192
193                    let Some(market_changes) = &mcm.mc else {
194                        return;
195                    };
196
197                    let ts_event = parse_millis_timestamp(mcm.pt);
198                    let ts_init = ts_event;
199
200                    for mc in market_changes {
201                        let is_snapshot = mc.img;
202                        let mut market_closed = false;
203
204                        if let Some(def) = &mc.market_definition {
205                            // Emit instruments first so downstream consumers (DataEngine,
206                            // BacktestExchange) have the instrument cached before any status
207                            // or close event references it.
208                            match parse_market_definition(
209                                &mc.id,
210                                def,
211                                currency,
212                                ts_init,
213                                min_notional,
214                            ) {
215                                Ok(new_instruments) => {
216                                    instruments.rcu(|m| {
217                                        for inst in &new_instruments {
218                                            m.insert(inst.id(), inst.clone());
219                                        }
220                                    });
221
222                                    for inst in new_instruments {
223                                        if let Err(e) =
224                                            data_sender.send(DataEvent::Instrument(inst))
225                                        {
226                                            log::warn!("Failed to send instrument: {e}");
227                                        }
228                                    }
229                                }
230                                Err(e) => {
231                                    log::warn!(
232                                        "Failed to parse market definition for {}: {e}",
233                                        mc.id
234                                    );
235                                }
236                            }
237
238                            if let Some(status) = &def.status {
239                                market_closed = *status == MarketStatus::Closed;
240
241                                for event in
242                                    parse_instrument_statuses(&mc.id, def, ts_event, ts_init)
243                                {
244                                    if let Err(e) =
245                                        data_sender.send(DataEvent::InstrumentStatus(event))
246                                    {
247                                        log::warn!("Failed to send instrument status: {e}");
248                                    }
249                                }
250                            }
251
252                            for sp in parse_betfair_starting_prices(&mc.id, def, ts_event, ts_init)
253                            {
254                                let instrument_id = sp.instrument_id;
255                                let custom =
256                                    custom_data_with_instrument(Arc::new(sp), instrument_id);
257
258                                if let Err(e) =
259                                    data_sender.send(DataEvent::Data(Data::Custom(custom)))
260                                {
261                                    log::warn!("Failed to send starting price: {e}");
262                                }
263                            }
264
265                            for close in parse_instrument_closes(&mc.id, def, ts_event, ts_init) {
266                                if let Err(e) =
267                                    data_sender.send(DataEvent::Data(Data::InstrumentClose(close)))
268                                {
269                                    log::warn!("Failed to send instrument close: {e}");
270                                }
271                            }
272                        }
273
274                        // Non-snapshot deltas and BSP deltas are buffered and flushed after
275                        // trades/tickers to mirror the Python `market_change_to_updates`
276                        // ordering (book deltas first, then BSP). Snapshots go inline.
277                        let mut buffered_deltas: Vec<OrderBookDeltas> = Vec::new();
278                        let mut buffered_bsp_customs: Vec<CustomData> = Vec::new();
279
280                        if let Some(runner_changes) = &mc.rc {
281                            for rc in runner_changes {
282                                let handicap = rc.hc.unwrap_or(Decimal::ZERO);
283                                let instrument_id = make_instrument_id(&mc.id, rc.id, handicap);
284
285                                match parse_runner_book_deltas(
286                                    instrument_id,
287                                    rc,
288                                    is_snapshot,
289                                    mcm.pt,
290                                    ts_event,
291                                    ts_init,
292                                ) {
293                                    Ok(Some(deltas)) => {
294                                        if is_snapshot {
295                                            if let Err(e) = data_sender.send(DataEvent::Data(
296                                                Data::Deltas(OrderBookDeltas_API::new(deltas)),
297                                            )) {
298                                                log::warn!("Failed to send book deltas: {e}");
299                                            }
300                                        } else {
301                                            buffered_deltas.push(deltas);
302                                        }
303                                    }
304                                    Ok(None) => {}
305                                    Err(e) => {
306                                        log::warn!(
307                                            "Failed to parse book deltas for {instrument_id}: {e}"
308                                        );
309                                    }
310                                }
311
312                                if let Some(trades) = &rc.trd {
313                                    let mut volumes = traded_volumes.lock().unwrap();
314
315                                    for pv in trades {
316                                        if pv.volume == Decimal::ZERO {
317                                            continue;
318                                        }
319
320                                        let key = (instrument_id, pv.price);
321                                        let prev_volume =
322                                            volumes.get(&key).copied().unwrap_or(Decimal::ZERO);
323
324                                        if pv.volume <= prev_volume {
325                                            continue;
326                                        }
327
328                                        let trade_volume = pv.volume - prev_volume;
329                                        volumes.insert(key, pv.volume);
330
331                                        let price = match Price::from_decimal_dp(
332                                            pv.price,
333                                            BETFAIR_PRICE_PRECISION,
334                                        ) {
335                                            Ok(p) => p,
336                                            Err(e) => {
337                                                log::warn!("Invalid trade price: {e}");
338                                                continue;
339                                            }
340                                        };
341                                        let size = match Quantity::from_decimal_dp(
342                                            trade_volume,
343                                            BETFAIR_QUANTITY_PRECISION,
344                                        ) {
345                                            Ok(q) => q,
346                                            Err(e) => {
347                                                log::warn!("Invalid trade size: {e}");
348                                                continue;
349                                            }
350                                        };
351                                        let trade_id = TradeId::new(format!(
352                                            "{}-{}-{}",
353                                            mcm.pt, rc.id, pv.price
354                                        ));
355                                        let tick: TradeTick = make_trade_tick(
356                                            instrument_id,
357                                            price,
358                                            size,
359                                            trade_id,
360                                            ts_event,
361                                            ts_init,
362                                        );
363
364                                        if let Err(e) =
365                                            data_sender.send(DataEvent::Data(Data::Trade(tick)))
366                                        {
367                                            log::warn!("Failed to send trade tick: {e}");
368                                        }
369                                    }
370                                }
371
372                                if let Some(ticker) =
373                                    parse_betfair_ticker(instrument_id, rc, ts_event, ts_init)
374                                {
375                                    let custom = custom_data_with_instrument(
376                                        Arc::new(ticker),
377                                        instrument_id,
378                                    );
379
380                                    if let Err(e) =
381                                        data_sender.send(DataEvent::Data(Data::Custom(custom)))
382                                    {
383                                        log::warn!("Failed to send ticker: {e}");
384                                    }
385                                }
386
387                                for bsp_delta in
388                                    parse_bsp_book_deltas(instrument_id, rc, ts_event, ts_init)
389                                {
390                                    buffered_bsp_customs.push(custom_data_with_instrument(
391                                        Arc::new(bsp_delta),
392                                        instrument_id,
393                                    ));
394                                }
395                            }
396                        }
397
398                        for deltas in buffered_deltas {
399                            if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
400                                OrderBookDeltas_API::new(deltas),
401                            ))) {
402                                log::warn!("Failed to send book deltas: {e}");
403                            }
404                        }
405
406                        for custom in buffered_bsp_customs {
407                            if let Err(e) = data_sender.send(DataEvent::Data(Data::Custom(custom)))
408                            {
409                                log::warn!("Failed to send BSP book delta: {e}");
410                            }
411                        }
412
413                        if market_closed {
414                            let prefix = format!("{}-", mc.id);
415
416                            if let Ok(mut volumes) = traded_volumes.lock() {
417                                volumes.retain(|k, _| !k.0.symbol.as_str().starts_with(&prefix));
418                            }
419                        }
420                    }
421
422                    let completed = BetfairSequenceCompleted::new(ts_event, ts_init);
423                    let custom = CustomData::from_arc(Arc::new(completed));
424                    if let Err(e) = data_sender.send(DataEvent::Data(Data::Custom(custom))) {
425                        log::warn!("Failed to send sequence completed: {e}");
426                    }
427                }
428                StreamMessage::Connection(_) => {
429                    if has_initial_connection.swap(true, Ordering::SeqCst) {
430                        log::info!("Betfair data stream reconnected");
431                        let _ = reconnect_tx.send(());
432                    } else {
433                        log::info!("Betfair data stream connected");
434                    }
435                }
436                StreamMessage::Status(status) => {
437                    if status.connection_closed {
438                        log::error!(
439                            "Betfair stream closed: {:?} - {:?}",
440                            status.error_code,
441                            status.error_message,
442                        );
443                    }
444                }
445                StreamMessage::RaceChange(rcm) => {
446                    if let Some(race_changes) = &rcm.rc {
447                        let fallback_ts = parse_millis_timestamp(rcm.pt);
448
449                        for rc in race_changes {
450                            let race_id = rc.id.as_deref().unwrap_or("");
451                            let market_id = rc.mid.as_deref().unwrap_or("");
452
453                            if let Some(runners) = &rc.rrc {
454                                for rrc in runners {
455                                    let ts_event =
456                                        rrc.ft.map_or(fallback_ts, parse_millis_timestamp);
457
458                                    if let Some(runner) = parse_race_runner_data(
459                                        race_id, market_id, rrc, ts_event, ts_event,
460                                    ) {
461                                        let selection_id = rrc.id.unwrap_or(0);
462                                        let mut metadata = Params::new();
463                                        metadata.insert(
464                                            "selection_id".to_string(),
465                                            serde_json::Value::Number(selection_id.into()),
466                                        );
467                                        let value: Arc<dyn CustomDataTrait> = Arc::new(runner);
468                                        let data_type =
469                                            DataType::new(value.type_name(), Some(metadata), None);
470                                        let custom = CustomData::new(value, data_type);
471
472                                        if let Err(e) =
473                                            data_sender.send(DataEvent::Data(Data::Custom(custom)))
474                                        {
475                                            log::warn!("Failed to send race runner data: {e}");
476                                        }
477                                    }
478                                }
479                            }
480
481                            if let Some(rpc) = &rc.rpc {
482                                let ts_event = rpc.ft.map_or(fallback_ts, parse_millis_timestamp);
483                                let progress = parse_race_progress(
484                                    race_id, market_id, rpc, ts_event, ts_event,
485                                );
486                                let mut metadata = Params::new();
487                                metadata.insert(
488                                    "race_id".to_string(),
489                                    serde_json::Value::String(race_id.to_string()),
490                                );
491                                let value: Arc<dyn CustomDataTrait> = Arc::new(progress);
492                                let data_type =
493                                    DataType::new(value.type_name(), Some(metadata), None);
494                                let custom = CustomData::new(value, data_type);
495
496                                if let Err(e) =
497                                    data_sender.send(DataEvent::Data(Data::Custom(custom)))
498                                {
499                                    log::warn!("Failed to send race progress: {e}");
500                                }
501                            }
502                        }
503                    }
504                }
505                StreamMessage::OrderChange(_) => {}
506            }
507        })
508    }
509}
510
511#[async_trait(?Send)]
512impl DataClient for BetfairDataClient {
513    fn client_id(&self) -> ClientId {
514        self.client_id
515    }
516
517    fn venue(&self) -> Option<Venue> {
518        Some(*BETFAIR_VENUE)
519    }
520
521    fn start(&mut self) -> anyhow::Result<()> {
522        log::info!("Starting Betfair data client: {}", self.client_id);
523        Ok(())
524    }
525
526    fn stop(&mut self) -> anyhow::Result<()> {
527        log::info!("Stopping Betfair data client: {}", self.client_id);
528
529        if let Some(handle) = self.keep_alive_handle.take() {
530            handle.abort();
531        }
532
533        if let Some(handle) = self.reconnect_handle.take() {
534            handle.abort();
535        }
536
537        if let Some(handle) = self.race_fatal_handle.take() {
538            handle.abort();
539        }
540        self.is_connected.store(false, Ordering::Relaxed);
541        Ok(())
542    }
543
544    fn reset(&mut self) -> anyhow::Result<()> {
545        log::info!("Resetting Betfair data client: {}", self.client_id);
546
547        if let Some(handle) = self.keep_alive_handle.take() {
548            handle.abort();
549        }
550
551        if let Some(handle) = self.reconnect_handle.take() {
552            handle.abort();
553        }
554
555        if let Some(handle) = self.race_fatal_handle.take() {
556            handle.abort();
557        }
558        self.is_connected.store(false, Ordering::Relaxed);
559        self.stream_client = None;
560        self.race_stream_client = None;
561        self.provider.store_mut().clear();
562        self.subscribed_market_ids.clear();
563
564        self.instruments.store(AHashMap::new());
565        Ok(())
566    }
567
568    fn dispose(&mut self) -> anyhow::Result<()> {
569        log::info!("Disposing Betfair data client: {}", self.client_id);
570        self.stop()
571    }
572
573    fn is_connected(&self) -> bool {
574        self.is_connected.load(Ordering::SeqCst)
575    }
576
577    fn is_disconnected(&self) -> bool {
578        !self.is_connected()
579    }
580
581    async fn connect(&mut self) -> anyhow::Result<()> {
582        if self.is_connected() {
583            return Ok(());
584        }
585
586        register_betfair_custom_data();
587
588        self.http_client
589            .connect()
590            .await
591            .map_err(|e| anyhow::anyhow!("{e}"))?;
592
593        self.provider.load_all(None).await?;
594
595        let loaded: Vec<InstrumentAny> = self
596            .provider
597            .store()
598            .list_all()
599            .into_iter()
600            .cloned()
601            .collect();
602
603        self.instruments.rcu(|m| {
604            for inst in &loaded {
605                m.insert(inst.id(), inst.clone());
606            }
607        });
608
609        for inst in &loaded {
610            if let Err(e) = self.data_sender.send(DataEvent::Instrument(inst.clone())) {
611                log::warn!("Failed to send instrument: {e}");
612            }
613        }
614
615        log::info!("Cached {} instruments for {}", loaded.len(), self.client_id,);
616
617        let session_token = self
618            .http_client
619            .session_token()
620            .await
621            .ok_or_else(|| anyhow::anyhow!("No session token after login"))?;
622
623        let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel();
624
625        let handler = Self::create_stream_handler(
626            self.data_sender.clone(),
627            Arc::clone(&self.instruments),
628            self.currency,
629            self.provider.min_notional(),
630            reconnect_tx.clone(),
631        );
632
633        let stream_client = BetfairStreamClient::connect(
634            &self.credential,
635            session_token,
636            handler,
637            self.stream_config.clone(),
638        )
639        .await
640        .map_err(|e| anyhow::anyhow!("{e}"))?;
641
642        self.stream_client = Some(Arc::new(stream_client));
643
644        if self.config.subscribe_race_data {
645            let race_config = BetfairStreamConfig {
646                host: BETFAIR_RACE_STREAM_HOST.to_string(),
647                ..self.stream_config.clone()
648            };
649
650            let race_session = self
651                .http_client
652                .session_token()
653                .await
654                .ok_or_else(|| anyhow::anyhow!("No session token for race stream"))?;
655
656            let race_handler = Self::create_stream_handler(
657                self.data_sender.clone(),
658                Arc::clone(&self.instruments),
659                self.currency,
660                self.provider.min_notional(),
661                reconnect_tx.clone(),
662            );
663
664            let (race_fatal_tx, mut race_fatal_rx) = tokio::sync::mpsc::unbounded_channel();
665
666            match BetfairRaceStreamClient::connect(
667                &self.credential,
668                race_session,
669                race_handler,
670                race_config,
671                race_fatal_tx,
672            )
673            .await
674            {
675                Ok(client) => {
676                    let race_client = Arc::new(client);
677                    self.race_stream_client = Some(Arc::clone(&race_client));
678
679                    if let Some(handle) = self.race_fatal_handle.take() {
680                        handle.abort();
681                    }
682
683                    self.race_fatal_handle = Some(get_runtime().spawn(async move {
684                        if race_fatal_rx.recv().await.is_some() {
685                            log::error!(
686                                "Betfair race stream permanently disabled due to fatal error"
687                            );
688                            race_client.close().await;
689                        }
690                    }));
691
692                    log::info!("Betfair race stream connected");
693                }
694                Err(e) => {
695                    log::warn!("Betfair race stream connect failed: {e}");
696                    self.race_stream_client = None;
697                }
698            }
699        }
700
701        // Abort any existing keep-alive task before spawning a new one
702        if let Some(handle) = self.keep_alive_handle.take() {
703            handle.abort();
704        }
705
706        // Spawn periodic keep-alive to prevent session expiry
707        let keep_alive_client = Arc::clone(&self.http_client);
708        let keep_alive_stream = Arc::clone(self.stream_client.as_ref().unwrap());
709        let keep_alive_race_stream = self.race_stream_client.as_ref().map(Arc::clone);
710        let keep_alive_app_key = self.credential.app_key().to_string();
711
712        self.keep_alive_handle = Some(get_runtime().spawn(async move {
713            let interval = tokio::time::Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS);
714            loop {
715                tokio::time::sleep(interval).await;
716
717                match keep_alive_client.keep_alive().await {
718                    Ok(()) => {}
719                    Err(ref e) if e.is_login_failed() => {
720                        log::warn!("Betfair session expired, attempting re-login: {e}");
721                        if let Err(e) = keep_alive_client.reconnect().await {
722                            log::error!("Betfair re-login failed: {e}");
723                            continue;
724                        }
725                    }
726                    Err(e) => {
727                        log::warn!("Betfair keep-alive failed (transient): {e}");
728                        continue;
729                    }
730                }
731
732                if let Some(token) = keep_alive_client.session_token().await {
733                    keep_alive_stream.update_auth(&keep_alive_app_key, token.clone());
734
735                    if let Some(ref race_stream) = keep_alive_race_stream {
736                        race_stream.update_auth(&keep_alive_app_key, token);
737                    }
738                }
739                log::debug!("Betfair session keep-alive sent");
740            }
741        }));
742
743        // Spawn reconnect handler to refresh session on stream reconnection
744        let reconnect_http = Arc::clone(&self.http_client);
745        let reconnect_stream = Arc::clone(self.stream_client.as_ref().unwrap());
746        let reconnect_race_stream = self.race_stream_client.as_ref().map(Arc::clone);
747        let reconnect_app_key = self.credential.app_key().to_string();
748
749        self.reconnect_handle = Some(get_runtime().spawn(async move {
750            while reconnect_rx.recv().await.is_some() {
751                log::info!("Handling data stream reconnection");
752
753                match reconnect_http.keep_alive().await {
754                    Ok(()) => {}
755                    Err(ref e) if e.is_login_failed() => {
756                        log::warn!("Session expired on reconnect, attempting re-login: {e}");
757                        if let Err(e) = reconnect_http.reconnect().await {
758                            log::error!("Re-login failed on reconnect: {e}");
759                            continue;
760                        }
761                    }
762                    Err(e) => {
763                        log::warn!("Keep-alive failed on reconnect (transient): {e}");
764                        continue;
765                    }
766                }
767
768                if let Some(token) = reconnect_http.session_token().await {
769                    reconnect_stream.update_auth(&reconnect_app_key, token.clone());
770
771                    if let Some(ref race_stream) = reconnect_race_stream {
772                        race_stream.update_auth(&reconnect_app_key, token);
773                    }
774                }
775            }
776        }));
777
778        self.is_connected.store(true, Ordering::Release);
779
780        log::info!("Betfair data client connected: {}", self.client_id);
781        Ok(())
782    }
783
784    async fn disconnect(&mut self) -> anyhow::Result<()> {
785        if self.is_disconnected() {
786            return Ok(());
787        }
788
789        if let Some(handle) = self.keep_alive_handle.take() {
790            handle.abort();
791        }
792
793        if let Some(handle) = self.reconnect_handle.take() {
794            handle.abort();
795        }
796
797        if let Some(handle) = self.race_fatal_handle.take() {
798            handle.abort();
799        }
800
801        if let Some(client) = &self.race_stream_client {
802            client.close().await;
803        }
804        self.race_stream_client = None;
805
806        if let Some(client) = &self.stream_client {
807            client.close().await;
808        }
809
810        self.http_client.disconnect().await;
811        self.is_connected.store(false, Ordering::Relaxed);
812        self.subscribed_market_ids.clear();
813
814        log::info!("Betfair data client disconnected: {}", self.client_id);
815        Ok(())
816    }
817
818    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
819        let instrument_id = cmd.instrument_id;
820        let market_id = extract_market_id(&instrument_id)?;
821
822        if !self.subscribed_market_ids.insert(market_id.clone()) {
823            log::debug!("Book deltas already subscribed for market {market_id}");
824            return Ok(());
825        }
826
827        let stream_client = Arc::clone(
828            self.stream_client
829                .as_ref()
830                .ok_or_else(|| anyhow::anyhow!("Stream client not connected"))?,
831        );
832
833        let all_ids: Vec<String> = self.subscribed_market_ids.iter().cloned().collect();
834
835        let market_filter = StreamMarketFilter {
836            market_ids: Some(all_ids),
837            ..Default::default()
838        };
839
840        let data_filter = MarketDataFilter {
841            fields: Some(vec![
842                MarketDataFilterField::ExAllOffers,
843                MarketDataFilterField::ExTraded,
844                MarketDataFilterField::ExTradedVol,
845                MarketDataFilterField::ExLtp,
846                MarketDataFilterField::ExMarketDef,
847                MarketDataFilterField::SpTraded,
848                MarketDataFilterField::SpProjected,
849            ]),
850            ladder_levels: None,
851        };
852
853        let conflate_ms = self.config.stream_conflate_ms;
854
855        nautilus_common::live::get_runtime().spawn(async move {
856            if let Err(e) = stream_client
857                .subscribe_markets(market_filter, data_filter, None, conflate_ms)
858                .await
859            {
860                log::error!("Failed to subscribe to market data: {e}");
861            }
862        });
863
864        log::info!("Subscribing to book deltas for {instrument_id}");
865        Ok(())
866    }
867
868    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
869        log::info!(
870            "Unsubscribe book deltas not supported for Betfair: {}",
871            cmd.instrument_id
872        );
873        Ok(())
874    }
875
876    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
877        // Trades are included in market subscription via EX_TRADED
878        log::debug!(
879            "Trade data included in book subscription for {}",
880            cmd.instrument_id
881        );
882        Ok(())
883    }
884
885    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
886        log::info!(
887            "Unsubscribe trades not supported for Betfair: {}",
888            cmd.instrument_id
889        );
890        Ok(())
891    }
892
893    fn subscribe_instrument_status(
894        &mut self,
895        cmd: SubscribeInstrumentStatus,
896    ) -> anyhow::Result<()> {
897        // Instrument status is included in market subscription via EX_MARKET_DEF
898        log::debug!(
899            "Instrument status included in book subscription for {}",
900            cmd.instrument_id
901        );
902        Ok(())
903    }
904
905    fn unsubscribe_instrument_status(
906        &mut self,
907        cmd: &UnsubscribeInstrumentStatus,
908    ) -> anyhow::Result<()> {
909        log::info!(
910            "Unsubscribe instrument status not supported for Betfair: {}",
911            cmd.instrument_id
912        );
913        Ok(())
914    }
915}