1use std::sync::{
19 Arc, Mutex,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use async_trait::async_trait;
25use nautilus_common::{
26 clients::DataClient,
27 live::{get_runtime, runner::get_data_event_sender},
28 messages::{
29 DataEvent,
30 data::{
31 SubscribeBookDeltas, SubscribeInstrumentStatus, SubscribeTrades, UnsubscribeBookDeltas,
32 UnsubscribeInstrumentStatus, UnsubscribeTrades,
33 },
34 },
35 providers::InstrumentProvider,
36};
37use nautilus_core::{AtomicMap, Params};
38use nautilus_model::{
39 data::{
40 CustomData, CustomDataTrait, Data, DataType, OrderBookDeltas, OrderBookDeltas_API,
41 TradeTick,
42 },
43 identifiers::{ClientId, InstrumentId, TradeId, Venue},
44 instruments::{Instrument, InstrumentAny},
45 types::{Currency, Money, Price, Quantity},
46};
47use nautilus_network::socket::TcpMessageHandler;
48use rust_decimal::Decimal;
49use tokio::task::JoinHandle;
50
51use crate::{
52 common::{
53 consts::{
54 BETFAIR_PRICE_PRECISION, BETFAIR_QUANTITY_PRECISION, BETFAIR_RACE_STREAM_HOST,
55 BETFAIR_VENUE,
56 },
57 credential::BetfairCredential,
58 enums::{MarketDataFilterField, MarketStatus},
59 parse::{
60 extract_market_id, make_instrument_id, parse_market_definition, parse_millis_timestamp,
61 },
62 },
63 config::BetfairDataConfig,
64 data_types::{BetfairSequenceCompleted, register_betfair_custom_data},
65 http::client::BetfairHttpClient,
66 provider::{BetfairInstrumentProvider, NavigationFilter},
67 stream::{
68 client::{BetfairRaceStreamClient, BetfairStreamClient},
69 config::BetfairStreamConfig,
70 messages::{MarketDataFilter, StreamMarketFilter, StreamMessage, stream_decode},
71 parse::{
72 make_trade_tick, parse_betfair_starting_prices, parse_betfair_ticker,
73 parse_bsp_book_deltas, parse_instrument_closes, parse_instrument_statuses,
74 parse_race_progress, parse_race_runner_data, parse_runner_book_deltas,
75 },
76 },
77};
78
79const KEEP_ALIVE_INTERVAL_SECS: u64 = 36_000;
81
82pub(crate) fn custom_data_with_instrument(
85 value: Arc<dyn CustomDataTrait>,
86 instrument_id: InstrumentId,
87) -> CustomData {
88 let mut metadata = Params::new();
89 metadata.insert(
90 "instrument_id".to_string(),
91 serde_json::Value::String(instrument_id.to_string()),
92 );
93 let data_type = DataType::new(
94 value.type_name(),
95 Some(metadata),
96 Some(instrument_id.to_string()),
97 );
98 CustomData::new(value, data_type)
99}
100
101#[derive(Debug)]
103pub struct BetfairDataClient {
104 client_id: ClientId,
105 http_client: Arc<BetfairHttpClient>,
106 provider: BetfairInstrumentProvider,
107 stream_client: Option<Arc<BetfairStreamClient>>,
108 race_stream_client: Option<Arc<BetfairRaceStreamClient>>,
109 credential: BetfairCredential,
110 stream_config: BetfairStreamConfig,
111 config: BetfairDataConfig,
112 currency: Currency,
113 is_connected: AtomicBool,
114 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
115 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
116 subscribed_market_ids: AHashSet<String>,
117 keep_alive_handle: Option<JoinHandle<()>>,
118 reconnect_handle: Option<JoinHandle<()>>,
119 race_fatal_handle: Option<JoinHandle<()>>,
120}
121
122impl BetfairDataClient {
123 #[must_use]
125 #[expect(clippy::too_many_arguments)]
126 pub fn new(
127 client_id: ClientId,
128 http_client: BetfairHttpClient,
129 credential: BetfairCredential,
130 stream_config: BetfairStreamConfig,
131 config: BetfairDataConfig,
132 nav_filter: NavigationFilter,
133 currency: Currency,
134 min_notional: Option<Money>,
135 ) -> Self {
136 let data_sender = get_data_event_sender();
137 let http_client = Arc::new(http_client);
138 let provider = BetfairInstrumentProvider::new(
139 Arc::clone(&http_client),
140 nav_filter,
141 currency,
142 min_notional,
143 );
144
145 Self {
146 client_id,
147 http_client,
148 provider,
149 stream_client: None,
150 race_stream_client: None,
151 credential,
152 stream_config,
153 config,
154 currency,
155 is_connected: AtomicBool::new(false),
156 data_sender,
157 instruments: Arc::new(AtomicMap::new()),
158 subscribed_market_ids: AHashSet::new(),
159 keep_alive_handle: None,
160 reconnect_handle: None,
161 race_fatal_handle: None,
162 }
163 }
164
165 fn create_stream_handler(
166 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
167 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
168 currency: Currency,
169 min_notional: Option<Money>,
170 reconnect_tx: tokio::sync::mpsc::UnboundedSender<()>,
171 ) -> TcpMessageHandler {
172 let traded_volumes: Arc<Mutex<AHashMap<(InstrumentId, Decimal), Decimal>>> =
175 Arc::new(Mutex::new(AHashMap::new()));
176 let has_initial_connection = Arc::new(AtomicBool::new(false));
177
178 Arc::new(move |data: &[u8]| {
179 let msg = match stream_decode(data) {
180 Ok(msg) => msg,
181 Err(e) => {
182 log::warn!("Failed to decode stream message: {e}");
183 return;
184 }
185 };
186
187 match msg {
188 StreamMessage::MarketChange(mcm) => {
189 if mcm.is_heartbeat() {
190 return;
191 }
192
193 let Some(market_changes) = &mcm.mc else {
194 return;
195 };
196
197 let ts_event = parse_millis_timestamp(mcm.pt);
198 let ts_init = ts_event;
199
200 for mc in market_changes {
201 let is_snapshot = mc.img;
202 let mut market_closed = false;
203
204 if let Some(def) = &mc.market_definition {
205 match parse_market_definition(
209 &mc.id,
210 def,
211 currency,
212 ts_init,
213 min_notional,
214 ) {
215 Ok(new_instruments) => {
216 instruments.rcu(|m| {
217 for inst in &new_instruments {
218 m.insert(inst.id(), inst.clone());
219 }
220 });
221
222 for inst in new_instruments {
223 if let Err(e) =
224 data_sender.send(DataEvent::Instrument(inst))
225 {
226 log::warn!("Failed to send instrument: {e}");
227 }
228 }
229 }
230 Err(e) => {
231 log::warn!(
232 "Failed to parse market definition for {}: {e}",
233 mc.id
234 );
235 }
236 }
237
238 if let Some(status) = &def.status {
239 market_closed = *status == MarketStatus::Closed;
240
241 for event in
242 parse_instrument_statuses(&mc.id, def, ts_event, ts_init)
243 {
244 if let Err(e) =
245 data_sender.send(DataEvent::InstrumentStatus(event))
246 {
247 log::warn!("Failed to send instrument status: {e}");
248 }
249 }
250 }
251
252 for sp in parse_betfair_starting_prices(&mc.id, def, ts_event, ts_init)
253 {
254 let instrument_id = sp.instrument_id;
255 let custom =
256 custom_data_with_instrument(Arc::new(sp), instrument_id);
257
258 if let Err(e) =
259 data_sender.send(DataEvent::Data(Data::Custom(custom)))
260 {
261 log::warn!("Failed to send starting price: {e}");
262 }
263 }
264
265 for close in parse_instrument_closes(&mc.id, def, ts_event, ts_init) {
266 if let Err(e) =
267 data_sender.send(DataEvent::Data(Data::InstrumentClose(close)))
268 {
269 log::warn!("Failed to send instrument close: {e}");
270 }
271 }
272 }
273
274 let mut buffered_deltas: Vec<OrderBookDeltas> = Vec::new();
278 let mut buffered_bsp_customs: Vec<CustomData> = Vec::new();
279
280 if let Some(runner_changes) = &mc.rc {
281 for rc in runner_changes {
282 let handicap = rc.hc.unwrap_or(Decimal::ZERO);
283 let instrument_id = make_instrument_id(&mc.id, rc.id, handicap);
284
285 match parse_runner_book_deltas(
286 instrument_id,
287 rc,
288 is_snapshot,
289 mcm.pt,
290 ts_event,
291 ts_init,
292 ) {
293 Ok(Some(deltas)) => {
294 if is_snapshot {
295 if let Err(e) = data_sender.send(DataEvent::Data(
296 Data::Deltas(OrderBookDeltas_API::new(deltas)),
297 )) {
298 log::warn!("Failed to send book deltas: {e}");
299 }
300 } else {
301 buffered_deltas.push(deltas);
302 }
303 }
304 Ok(None) => {}
305 Err(e) => {
306 log::warn!(
307 "Failed to parse book deltas for {instrument_id}: {e}"
308 );
309 }
310 }
311
312 if let Some(trades) = &rc.trd {
313 let mut volumes = traded_volumes.lock().unwrap();
314
315 for pv in trades {
316 if pv.volume == Decimal::ZERO {
317 continue;
318 }
319
320 let key = (instrument_id, pv.price);
321 let prev_volume =
322 volumes.get(&key).copied().unwrap_or(Decimal::ZERO);
323
324 if pv.volume <= prev_volume {
325 continue;
326 }
327
328 let trade_volume = pv.volume - prev_volume;
329 volumes.insert(key, pv.volume);
330
331 let price = match Price::from_decimal_dp(
332 pv.price,
333 BETFAIR_PRICE_PRECISION,
334 ) {
335 Ok(p) => p,
336 Err(e) => {
337 log::warn!("Invalid trade price: {e}");
338 continue;
339 }
340 };
341 let size = match Quantity::from_decimal_dp(
342 trade_volume,
343 BETFAIR_QUANTITY_PRECISION,
344 ) {
345 Ok(q) => q,
346 Err(e) => {
347 log::warn!("Invalid trade size: {e}");
348 continue;
349 }
350 };
351 let trade_id = TradeId::new(format!(
352 "{}-{}-{}",
353 mcm.pt, rc.id, pv.price
354 ));
355 let tick: TradeTick = make_trade_tick(
356 instrument_id,
357 price,
358 size,
359 trade_id,
360 ts_event,
361 ts_init,
362 );
363
364 if let Err(e) =
365 data_sender.send(DataEvent::Data(Data::Trade(tick)))
366 {
367 log::warn!("Failed to send trade tick: {e}");
368 }
369 }
370 }
371
372 if let Some(ticker) =
373 parse_betfair_ticker(instrument_id, rc, ts_event, ts_init)
374 {
375 let custom = custom_data_with_instrument(
376 Arc::new(ticker),
377 instrument_id,
378 );
379
380 if let Err(e) =
381 data_sender.send(DataEvent::Data(Data::Custom(custom)))
382 {
383 log::warn!("Failed to send ticker: {e}");
384 }
385 }
386
387 for bsp_delta in
388 parse_bsp_book_deltas(instrument_id, rc, ts_event, ts_init)
389 {
390 buffered_bsp_customs.push(custom_data_with_instrument(
391 Arc::new(bsp_delta),
392 instrument_id,
393 ));
394 }
395 }
396 }
397
398 for deltas in buffered_deltas {
399 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
400 OrderBookDeltas_API::new(deltas),
401 ))) {
402 log::warn!("Failed to send book deltas: {e}");
403 }
404 }
405
406 for custom in buffered_bsp_customs {
407 if let Err(e) = data_sender.send(DataEvent::Data(Data::Custom(custom)))
408 {
409 log::warn!("Failed to send BSP book delta: {e}");
410 }
411 }
412
413 if market_closed {
414 let prefix = format!("{}-", mc.id);
415
416 if let Ok(mut volumes) = traded_volumes.lock() {
417 volumes.retain(|k, _| !k.0.symbol.as_str().starts_with(&prefix));
418 }
419 }
420 }
421
422 let completed = BetfairSequenceCompleted::new(ts_event, ts_init);
423 let custom = CustomData::from_arc(Arc::new(completed));
424 if let Err(e) = data_sender.send(DataEvent::Data(Data::Custom(custom))) {
425 log::warn!("Failed to send sequence completed: {e}");
426 }
427 }
428 StreamMessage::Connection(_) => {
429 if has_initial_connection.swap(true, Ordering::SeqCst) {
430 log::info!("Betfair data stream reconnected");
431 let _ = reconnect_tx.send(());
432 } else {
433 log::info!("Betfair data stream connected");
434 }
435 }
436 StreamMessage::Status(status) => {
437 if status.connection_closed {
438 log::error!(
439 "Betfair stream closed: {:?} - {:?}",
440 status.error_code,
441 status.error_message,
442 );
443 }
444 }
445 StreamMessage::RaceChange(rcm) => {
446 if let Some(race_changes) = &rcm.rc {
447 let fallback_ts = parse_millis_timestamp(rcm.pt);
448
449 for rc in race_changes {
450 let race_id = rc.id.as_deref().unwrap_or("");
451 let market_id = rc.mid.as_deref().unwrap_or("");
452
453 if let Some(runners) = &rc.rrc {
454 for rrc in runners {
455 let ts_event =
456 rrc.ft.map_or(fallback_ts, parse_millis_timestamp);
457
458 if let Some(runner) = parse_race_runner_data(
459 race_id, market_id, rrc, ts_event, ts_event,
460 ) {
461 let selection_id = rrc.id.unwrap_or(0);
462 let mut metadata = Params::new();
463 metadata.insert(
464 "selection_id".to_string(),
465 serde_json::Value::Number(selection_id.into()),
466 );
467 let value: Arc<dyn CustomDataTrait> = Arc::new(runner);
468 let data_type =
469 DataType::new(value.type_name(), Some(metadata), None);
470 let custom = CustomData::new(value, data_type);
471
472 if let Err(e) =
473 data_sender.send(DataEvent::Data(Data::Custom(custom)))
474 {
475 log::warn!("Failed to send race runner data: {e}");
476 }
477 }
478 }
479 }
480
481 if let Some(rpc) = &rc.rpc {
482 let ts_event = rpc.ft.map_or(fallback_ts, parse_millis_timestamp);
483 let progress = parse_race_progress(
484 race_id, market_id, rpc, ts_event, ts_event,
485 );
486 let mut metadata = Params::new();
487 metadata.insert(
488 "race_id".to_string(),
489 serde_json::Value::String(race_id.to_string()),
490 );
491 let value: Arc<dyn CustomDataTrait> = Arc::new(progress);
492 let data_type =
493 DataType::new(value.type_name(), Some(metadata), None);
494 let custom = CustomData::new(value, data_type);
495
496 if let Err(e) =
497 data_sender.send(DataEvent::Data(Data::Custom(custom)))
498 {
499 log::warn!("Failed to send race progress: {e}");
500 }
501 }
502 }
503 }
504 }
505 StreamMessage::OrderChange(_) => {}
506 }
507 })
508 }
509}
510
511#[async_trait(?Send)]
512impl DataClient for BetfairDataClient {
513 fn client_id(&self) -> ClientId {
514 self.client_id
515 }
516
517 fn venue(&self) -> Option<Venue> {
518 Some(*BETFAIR_VENUE)
519 }
520
521 fn start(&mut self) -> anyhow::Result<()> {
522 log::info!("Starting Betfair data client: {}", self.client_id);
523 Ok(())
524 }
525
526 fn stop(&mut self) -> anyhow::Result<()> {
527 log::info!("Stopping Betfair data client: {}", self.client_id);
528
529 if let Some(handle) = self.keep_alive_handle.take() {
530 handle.abort();
531 }
532
533 if let Some(handle) = self.reconnect_handle.take() {
534 handle.abort();
535 }
536
537 if let Some(handle) = self.race_fatal_handle.take() {
538 handle.abort();
539 }
540 self.is_connected.store(false, Ordering::Relaxed);
541 Ok(())
542 }
543
544 fn reset(&mut self) -> anyhow::Result<()> {
545 log::info!("Resetting Betfair data client: {}", self.client_id);
546
547 if let Some(handle) = self.keep_alive_handle.take() {
548 handle.abort();
549 }
550
551 if let Some(handle) = self.reconnect_handle.take() {
552 handle.abort();
553 }
554
555 if let Some(handle) = self.race_fatal_handle.take() {
556 handle.abort();
557 }
558 self.is_connected.store(false, Ordering::Relaxed);
559 self.stream_client = None;
560 self.race_stream_client = None;
561 self.provider.store_mut().clear();
562 self.subscribed_market_ids.clear();
563
564 self.instruments.store(AHashMap::new());
565 Ok(())
566 }
567
568 fn dispose(&mut self) -> anyhow::Result<()> {
569 log::info!("Disposing Betfair data client: {}", self.client_id);
570 self.stop()
571 }
572
573 fn is_connected(&self) -> bool {
574 self.is_connected.load(Ordering::SeqCst)
575 }
576
577 fn is_disconnected(&self) -> bool {
578 !self.is_connected()
579 }
580
581 async fn connect(&mut self) -> anyhow::Result<()> {
582 if self.is_connected() {
583 return Ok(());
584 }
585
586 register_betfair_custom_data();
587
588 self.http_client
589 .connect()
590 .await
591 .map_err(|e| anyhow::anyhow!("{e}"))?;
592
593 self.provider.load_all(None).await?;
594
595 let loaded: Vec<InstrumentAny> = self
596 .provider
597 .store()
598 .list_all()
599 .into_iter()
600 .cloned()
601 .collect();
602
603 self.instruments.rcu(|m| {
604 for inst in &loaded {
605 m.insert(inst.id(), inst.clone());
606 }
607 });
608
609 for inst in &loaded {
610 if let Err(e) = self.data_sender.send(DataEvent::Instrument(inst.clone())) {
611 log::warn!("Failed to send instrument: {e}");
612 }
613 }
614
615 log::info!("Cached {} instruments for {}", loaded.len(), self.client_id,);
616
617 let session_token = self
618 .http_client
619 .session_token()
620 .await
621 .ok_or_else(|| anyhow::anyhow!("No session token after login"))?;
622
623 let (reconnect_tx, mut reconnect_rx) = tokio::sync::mpsc::unbounded_channel();
624
625 let handler = Self::create_stream_handler(
626 self.data_sender.clone(),
627 Arc::clone(&self.instruments),
628 self.currency,
629 self.provider.min_notional(),
630 reconnect_tx.clone(),
631 );
632
633 let stream_client = BetfairStreamClient::connect(
634 &self.credential,
635 session_token,
636 handler,
637 self.stream_config.clone(),
638 )
639 .await
640 .map_err(|e| anyhow::anyhow!("{e}"))?;
641
642 self.stream_client = Some(Arc::new(stream_client));
643
644 if self.config.subscribe_race_data {
645 let race_config = BetfairStreamConfig {
646 host: BETFAIR_RACE_STREAM_HOST.to_string(),
647 ..self.stream_config.clone()
648 };
649
650 let race_session = self
651 .http_client
652 .session_token()
653 .await
654 .ok_or_else(|| anyhow::anyhow!("No session token for race stream"))?;
655
656 let race_handler = Self::create_stream_handler(
657 self.data_sender.clone(),
658 Arc::clone(&self.instruments),
659 self.currency,
660 self.provider.min_notional(),
661 reconnect_tx.clone(),
662 );
663
664 let (race_fatal_tx, mut race_fatal_rx) = tokio::sync::mpsc::unbounded_channel();
665
666 match BetfairRaceStreamClient::connect(
667 &self.credential,
668 race_session,
669 race_handler,
670 race_config,
671 race_fatal_tx,
672 )
673 .await
674 {
675 Ok(client) => {
676 let race_client = Arc::new(client);
677 self.race_stream_client = Some(Arc::clone(&race_client));
678
679 if let Some(handle) = self.race_fatal_handle.take() {
680 handle.abort();
681 }
682
683 self.race_fatal_handle = Some(get_runtime().spawn(async move {
684 if race_fatal_rx.recv().await.is_some() {
685 log::error!(
686 "Betfair race stream permanently disabled due to fatal error"
687 );
688 race_client.close().await;
689 }
690 }));
691
692 log::info!("Betfair race stream connected");
693 }
694 Err(e) => {
695 log::warn!("Betfair race stream connect failed: {e}");
696 self.race_stream_client = None;
697 }
698 }
699 }
700
701 if let Some(handle) = self.keep_alive_handle.take() {
703 handle.abort();
704 }
705
706 let keep_alive_client = Arc::clone(&self.http_client);
708 let keep_alive_stream = Arc::clone(self.stream_client.as_ref().unwrap());
709 let keep_alive_race_stream = self.race_stream_client.as_ref().map(Arc::clone);
710 let keep_alive_app_key = self.credential.app_key().to_string();
711
712 self.keep_alive_handle = Some(get_runtime().spawn(async move {
713 let interval = tokio::time::Duration::from_secs(KEEP_ALIVE_INTERVAL_SECS);
714 loop {
715 tokio::time::sleep(interval).await;
716
717 match keep_alive_client.keep_alive().await {
718 Ok(()) => {}
719 Err(ref e) if e.is_login_failed() => {
720 log::warn!("Betfair session expired, attempting re-login: {e}");
721 if let Err(e) = keep_alive_client.reconnect().await {
722 log::error!("Betfair re-login failed: {e}");
723 continue;
724 }
725 }
726 Err(e) => {
727 log::warn!("Betfair keep-alive failed (transient): {e}");
728 continue;
729 }
730 }
731
732 if let Some(token) = keep_alive_client.session_token().await {
733 keep_alive_stream.update_auth(&keep_alive_app_key, token.clone());
734
735 if let Some(ref race_stream) = keep_alive_race_stream {
736 race_stream.update_auth(&keep_alive_app_key, token);
737 }
738 }
739 log::debug!("Betfair session keep-alive sent");
740 }
741 }));
742
743 let reconnect_http = Arc::clone(&self.http_client);
745 let reconnect_stream = Arc::clone(self.stream_client.as_ref().unwrap());
746 let reconnect_race_stream = self.race_stream_client.as_ref().map(Arc::clone);
747 let reconnect_app_key = self.credential.app_key().to_string();
748
749 self.reconnect_handle = Some(get_runtime().spawn(async move {
750 while reconnect_rx.recv().await.is_some() {
751 log::info!("Handling data stream reconnection");
752
753 match reconnect_http.keep_alive().await {
754 Ok(()) => {}
755 Err(ref e) if e.is_login_failed() => {
756 log::warn!("Session expired on reconnect, attempting re-login: {e}");
757 if let Err(e) = reconnect_http.reconnect().await {
758 log::error!("Re-login failed on reconnect: {e}");
759 continue;
760 }
761 }
762 Err(e) => {
763 log::warn!("Keep-alive failed on reconnect (transient): {e}");
764 continue;
765 }
766 }
767
768 if let Some(token) = reconnect_http.session_token().await {
769 reconnect_stream.update_auth(&reconnect_app_key, token.clone());
770
771 if let Some(ref race_stream) = reconnect_race_stream {
772 race_stream.update_auth(&reconnect_app_key, token);
773 }
774 }
775 }
776 }));
777
778 self.is_connected.store(true, Ordering::Release);
779
780 log::info!("Betfair data client connected: {}", self.client_id);
781 Ok(())
782 }
783
784 async fn disconnect(&mut self) -> anyhow::Result<()> {
785 if self.is_disconnected() {
786 return Ok(());
787 }
788
789 if let Some(handle) = self.keep_alive_handle.take() {
790 handle.abort();
791 }
792
793 if let Some(handle) = self.reconnect_handle.take() {
794 handle.abort();
795 }
796
797 if let Some(handle) = self.race_fatal_handle.take() {
798 handle.abort();
799 }
800
801 if let Some(client) = &self.race_stream_client {
802 client.close().await;
803 }
804 self.race_stream_client = None;
805
806 if let Some(client) = &self.stream_client {
807 client.close().await;
808 }
809
810 self.http_client.disconnect().await;
811 self.is_connected.store(false, Ordering::Relaxed);
812 self.subscribed_market_ids.clear();
813
814 log::info!("Betfair data client disconnected: {}", self.client_id);
815 Ok(())
816 }
817
818 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
819 let instrument_id = cmd.instrument_id;
820 let market_id = extract_market_id(&instrument_id)?;
821
822 if !self.subscribed_market_ids.insert(market_id.clone()) {
823 log::debug!("Book deltas already subscribed for market {market_id}");
824 return Ok(());
825 }
826
827 let stream_client = Arc::clone(
828 self.stream_client
829 .as_ref()
830 .ok_or_else(|| anyhow::anyhow!("Stream client not connected"))?,
831 );
832
833 let all_ids: Vec<String> = self.subscribed_market_ids.iter().cloned().collect();
834
835 let market_filter = StreamMarketFilter {
836 market_ids: Some(all_ids),
837 ..Default::default()
838 };
839
840 let data_filter = MarketDataFilter {
841 fields: Some(vec![
842 MarketDataFilterField::ExAllOffers,
843 MarketDataFilterField::ExTraded,
844 MarketDataFilterField::ExTradedVol,
845 MarketDataFilterField::ExLtp,
846 MarketDataFilterField::ExMarketDef,
847 MarketDataFilterField::SpTraded,
848 MarketDataFilterField::SpProjected,
849 ]),
850 ladder_levels: None,
851 };
852
853 let conflate_ms = self.config.stream_conflate_ms;
854
855 nautilus_common::live::get_runtime().spawn(async move {
856 if let Err(e) = stream_client
857 .subscribe_markets(market_filter, data_filter, None, conflate_ms)
858 .await
859 {
860 log::error!("Failed to subscribe to market data: {e}");
861 }
862 });
863
864 log::info!("Subscribing to book deltas for {instrument_id}");
865 Ok(())
866 }
867
868 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
869 log::info!(
870 "Unsubscribe book deltas not supported for Betfair: {}",
871 cmd.instrument_id
872 );
873 Ok(())
874 }
875
876 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
877 log::debug!(
879 "Trade data included in book subscription for {}",
880 cmd.instrument_id
881 );
882 Ok(())
883 }
884
885 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
886 log::info!(
887 "Unsubscribe trades not supported for Betfair: {}",
888 cmd.instrument_id
889 );
890 Ok(())
891 }
892
893 fn subscribe_instrument_status(
894 &mut self,
895 cmd: SubscribeInstrumentStatus,
896 ) -> anyhow::Result<()> {
897 log::debug!(
899 "Instrument status included in book subscription for {}",
900 cmd.instrument_id
901 );
902 Ok(())
903 }
904
905 fn unsubscribe_instrument_status(
906 &mut self,
907 cmd: &UnsubscribeInstrumentStatus,
908 ) -> anyhow::Result<()> {
909 log::info!(
910 "Unsubscribe instrument status not supported for Betfair: {}",
911 cmd.instrument_id
912 );
913 Ok(())
914 }
915}