1use std::{collections::HashMap, sync::Arc, time::SystemTime};
20
21use ahash::AHashSet;
22use async_trait::async_trait;
23use nautilus_common::providers::{InstrumentProvider, InstrumentStore};
24use nautilus_core::UnixNanos;
25use nautilus_model::{
26 identifiers::InstrumentId,
27 instruments::InstrumentAny,
28 types::{Currency, Money},
29};
30use ustr::Ustr;
31
32use crate::{
33 common::{
34 consts::{METHOD_GET_ACCOUNT_DETAILS, METHOD_LIST_MARKET_CATALOGUE},
35 enums::MarketProjection,
36 parse::{extract_market_id, parse_betfair_timestamp, parse_market_catalogue},
37 types::MarketId,
38 },
39 http::{
40 client::BetfairHttpClient,
41 models::{
42 AccountDetailsResponse, FlattenedMarket, ListMarketCatalogueParams, MarketCatalogue,
43 MarketFilter, Navigation, NavigationChild, TimeRange,
44 },
45 },
46};
47
/// Maximum number of market IDs placed in a single market-catalogue request.
///
/// `load_instruments` splits the de-duplicated market-ID list into chunks of
/// this size and issues one request per chunk.
const CATALOGUE_BATCH_SIZE: usize = 50;
50
/// Criteria used to select markets from the flattened navigation tree.
///
/// Every field is optional. A field left as `None` places no restriction on a
/// market; a field holding `Some(..)` must be satisfied for the market to be
/// accepted by [`NavigationFilter::matches`].
#[derive(Clone, Debug, Default)]
pub struct NavigationFilter {
    /// Accepted event type IDs.
    pub event_type_ids: Option<Vec<String>>,
    /// Accepted event type names.
    pub event_type_names: Option<Vec<String>>,
    /// Accepted event IDs.
    pub event_ids: Option<Vec<String>>,
    /// Accepted event country codes.
    pub country_codes: Option<Vec<String>>,
    /// Accepted market types.
    pub market_types: Option<Vec<String>>,
    /// Accepted market IDs.
    pub market_ids: Option<Vec<String>>,
    /// Earliest accepted market start time, as a timestamp string that
    /// `parse_betfair_timestamp` can read.
    pub min_market_start_time: Option<String>,
    /// Latest accepted market start time, as a timestamp string that
    /// `parse_betfair_timestamp` can read.
    pub max_market_start_time: Option<String>,
}
74
75impl NavigationFilter {
76 #[must_use]
78 pub fn matches(&self, market: &FlattenedMarket) -> bool {
79 if let Some(ids) = &self.event_type_ids {
80 match &market.event_type_id {
81 Some(id) => {
82 if !ids.iter().any(|f| f == id) {
83 return false;
84 }
85 }
86 None => return false,
87 }
88 }
89
90 if let Some(names) = &self.event_type_names {
91 match &market.event_type_name {
92 Some(name) => {
93 if !names.iter().any(|f| f == name.as_str()) {
94 return false;
95 }
96 }
97 None => return false,
98 }
99 }
100
101 if let Some(ids) = &self.event_ids {
102 match &market.event_id {
103 Some(id) => {
104 if !ids.iter().any(|f| f == id) {
105 return false;
106 }
107 }
108 None => return false,
109 }
110 }
111
112 if let Some(codes) = &self.country_codes {
113 match &market.event_country_code {
114 Some(cc) => {
115 if !codes.iter().any(|f| f == cc.as_str()) {
116 return false;
117 }
118 }
119 None => return false,
120 }
121 }
122
123 if let Some(types) = &self.market_types {
124 match &market.market_type {
125 Some(mt) => {
126 if !types.iter().any(|f| f == mt.as_str()) {
127 return false;
128 }
129 }
130 None => return false,
131 }
132 }
133
134 if let Some(ids) = &self.market_ids {
135 match &market.market_id {
136 Some(id) => {
137 if !ids.iter().any(|f| f == id) {
138 return false;
139 }
140 }
141 None => return false,
142 }
143 }
144
145 if let Some(min_time) = &self.min_market_start_time {
146 match (
147 &market.market_start_time,
148 parse_betfair_timestamp(min_time).ok(),
149 ) {
150 (Some(start_str), Some(min_ts)) => {
151 if let Ok(start_ts) = parse_betfair_timestamp(start_str)
152 && start_ts < min_ts
153 {
154 return false;
155 }
156 }
157 (None, _) => return false,
158 _ => {}
159 }
160 }
161
162 if let Some(max_time) = &self.max_market_start_time {
163 match (
164 &market.market_start_time,
165 parse_betfair_timestamp(max_time).ok(),
166 ) {
167 (Some(start_str), Some(max_ts)) => {
168 if let Ok(start_ts) = parse_betfair_timestamp(start_str)
169 && start_ts > max_ts
170 {
171 return false;
172 }
173 }
174 (None, _) => return false,
175 _ => {}
176 }
177 }
178
179 true
180 }
181}
182
183#[derive(Debug, Clone, Default)]
185struct NavContext {
186 event_type_id: Option<String>,
187 event_type_name: Option<Ustr>,
188 event_id: Option<String>,
189 event_name: Option<String>,
190 event_country_code: Option<Ustr>,
191}
192
193#[must_use]
199pub fn flatten_navigation(nav: &Navigation) -> Vec<FlattenedMarket> {
200 let mut markets = Vec::new();
201
202 if let Some(children) = &nav.children {
203 collect_markets(children, &NavContext::default(), &mut markets);
204 }
205 markets
206}
207
208fn collect_markets(children: &[NavigationChild], ctx: &NavContext, out: &mut Vec<FlattenedMarket>) {
209 for child in children {
210 match child {
211 NavigationChild::EventType(et) => {
212 let new_ctx = NavContext {
213 event_type_id: et.id.clone(),
214 event_type_name: et.name,
215 ..ctx.clone()
216 };
217
218 if let Some(kids) = &et.children {
219 collect_markets(kids, &new_ctx, out);
220 }
221 }
222 NavigationChild::Group(g) => {
223 if let Some(kids) = &g.children {
224 collect_markets(kids, ctx, out);
225 }
226 }
227 NavigationChild::Event(e) => {
228 let new_ctx = NavContext {
229 event_id: e.id.clone(),
230 event_name: e.name.clone(),
231 event_country_code: e.country_code,
232 ..ctx.clone()
233 };
234
235 if let Some(kids) = &e.children {
236 collect_markets(kids, &new_ctx, out);
237 }
238 }
239 NavigationChild::Race(r) => {
240 if let Some(kids) = &r.children {
241 collect_markets(kids, ctx, out);
242 }
243 }
244 NavigationChild::Market(m) => {
245 out.push(FlattenedMarket {
246 event_type_id: ctx.event_type_id.clone(),
247 event_type_name: ctx.event_type_name,
248 event_id: ctx.event_id.clone(),
249 event_name: ctx.event_name.clone(),
250 event_country_code: ctx.event_country_code,
251 market_id: m.id.clone(),
252 market_name: m.name.clone(),
253 market_type: m.market_type,
254 market_start_time: m.market_start_time.clone(),
255 number_of_winners: m.number_of_winners,
256 });
257 }
258 }
259 }
260}
261
262pub async fn load_instruments(
274 client: &BetfairHttpClient,
275 filter: &NavigationFilter,
276 currency: Currency,
277 min_notional: Option<Money>,
278) -> anyhow::Result<Vec<InstrumentAny>> {
279 let navigation: Navigation = client
280 .send_navigation()
281 .await
282 .map_err(|e| anyhow::anyhow!("{e}"))?;
283
284 let all_markets = flatten_navigation(&navigation);
285
286 let filtered: Vec<&FlattenedMarket> =
287 all_markets.iter().filter(|m| filter.matches(m)).collect();
288
289 log::info!("Found {} markets matching filter", filtered.len());
290
291 let market_ids: Vec<MarketId> = filtered
292 .iter()
293 .filter_map(|m| m.market_id.clone())
294 .collect::<AHashSet<_>>()
295 .into_iter()
296 .collect();
297
298 let time_range =
299 if filter.min_market_start_time.is_some() || filter.max_market_start_time.is_some() {
300 Some(TimeRange {
301 from: filter.min_market_start_time.clone(),
302 to: filter.max_market_start_time.clone(),
303 })
304 } else {
305 None
306 };
307
308 let ts_init = UnixNanos::from(SystemTime::now());
309 let mut all_instruments = Vec::new();
310
311 for chunk in market_ids.chunks(CATALOGUE_BATCH_SIZE) {
312 let params = ListMarketCatalogueParams {
313 filter: MarketFilter {
314 market_ids: Some(chunk.to_vec()),
315 market_start_time: time_range.clone(),
316 ..Default::default()
317 },
318 market_projection: Some(vec![
319 MarketProjection::EventType,
320 MarketProjection::Event,
321 MarketProjection::Competition,
322 MarketProjection::MarketDescription,
323 MarketProjection::RunnerDescription,
324 MarketProjection::RunnerMetadata,
325 MarketProjection::MarketStartTime,
326 ]),
327 max_results: Some(chunk.len() as u32),
328 sort: None,
329 locale: None,
330 };
331
332 let catalogues: Vec<MarketCatalogue> = client
333 .send_betting(METHOD_LIST_MARKET_CATALOGUE, ¶ms)
334 .await
335 .map_err(|e| anyhow::anyhow!("{e}"))?;
336
337 for catalogue in &catalogues {
338 match parse_market_catalogue(catalogue, currency, ts_init, min_notional) {
339 Ok(instruments) => all_instruments.extend(instruments),
340 Err(e) => {
341 log::warn!("Failed to parse catalogue {}: {e}", catalogue.market_id);
342 }
343 }
344 }
345 }
346
347 log::info!("Loaded {} instruments", all_instruments.len());
348 Ok(all_instruments)
349}
350
/// Instrument provider that loads Betfair markets into an [`InstrumentStore`].
#[derive(Debug)]
pub struct BetfairInstrumentProvider {
    /// Instruments loaded so far.
    store: InstrumentStore,
    /// Shared HTTP client used for every request this provider makes.
    http_client: Arc<BetfairHttpClient>,
    /// Base filter applied by `load_all`; per-call overrides are layered on top.
    nav_filter: NavigationFilter,
    /// Currency passed to the catalogue parser when building instruments.
    currency: Currency,
    /// Optional minimum notional passed to the catalogue parser.
    min_notional: Option<Money>,
}
360
361impl BetfairInstrumentProvider {
362 #[must_use]
364 pub fn new(
365 http_client: Arc<BetfairHttpClient>,
366 nav_filter: NavigationFilter,
367 currency: Currency,
368 min_notional: Option<Money>,
369 ) -> Self {
370 Self {
371 store: InstrumentStore::new(),
372 http_client,
373 nav_filter,
374 currency,
375 min_notional,
376 }
377 }
378
379 #[must_use]
381 pub fn currency(&self) -> Currency {
382 self.currency
383 }
384
385 #[must_use]
387 pub fn min_notional(&self) -> Option<Money> {
388 self.min_notional
389 }
390
391 pub async fn get_account_currency(&self) -> anyhow::Result<Currency> {
397 let details: AccountDetailsResponse = self
398 .http_client
399 .send_accounts(METHOD_GET_ACCOUNT_DETAILS, &serde_json::json!({}))
400 .await
401 .map_err(|e| anyhow::anyhow!("{e}"))?;
402
403 let code = details
404 .currency_code
405 .ok_or_else(|| anyhow::anyhow!("No currency_code in account details"))?;
406 code.as_str().parse::<Currency>()
407 }
408
409 fn build_effective_filter(
411 &self,
412 overrides: Option<&HashMap<String, String>>,
413 ) -> NavigationFilter {
414 let Some(overrides) = overrides else {
415 return self.nav_filter.clone();
416 };
417
418 let parse_csv = |key: &str| -> Option<Vec<String>> {
419 overrides
420 .get(key)
421 .map(|v| v.split(',').map(|s| s.trim().to_string()).collect())
422 };
423
424 NavigationFilter {
425 event_type_ids: parse_csv("event_type_ids")
426 .or_else(|| self.nav_filter.event_type_ids.clone()),
427 event_type_names: parse_csv("event_type_names")
428 .or_else(|| self.nav_filter.event_type_names.clone()),
429 event_ids: parse_csv("event_ids").or_else(|| self.nav_filter.event_ids.clone()),
430 country_codes: parse_csv("country_codes")
431 .or_else(|| self.nav_filter.country_codes.clone()),
432 market_types: parse_csv("market_types")
433 .or_else(|| self.nav_filter.market_types.clone()),
434 market_ids: parse_csv("market_ids").or_else(|| self.nav_filter.market_ids.clone()),
435 min_market_start_time: overrides
436 .get("min_market_start_time")
437 .cloned()
438 .or_else(|| self.nav_filter.min_market_start_time.clone()),
439 max_market_start_time: overrides
440 .get("max_market_start_time")
441 .cloned()
442 .or_else(|| self.nav_filter.max_market_start_time.clone()),
443 }
444 }
445}
446
447#[async_trait(?Send)]
448impl InstrumentProvider for BetfairInstrumentProvider {
449 fn store(&self) -> &InstrumentStore {
450 &self.store
451 }
452
453 fn store_mut(&mut self) -> &mut InstrumentStore {
454 &mut self.store
455 }
456
457 async fn load_all(&mut self, filters: Option<&HashMap<String, String>>) -> anyhow::Result<()> {
458 self.store.clear();
459 let effective_filter = self.build_effective_filter(filters);
460 let instruments = load_instruments(
461 &self.http_client,
462 &effective_filter,
463 self.currency,
464 self.min_notional,
465 )
466 .await?;
467 self.store.add_bulk(instruments);
468 self.store.set_initialized();
469 Ok(())
470 }
471
472 async fn load(
473 &mut self,
474 instrument_id: &InstrumentId,
475 _filters: Option<&HashMap<String, String>>,
476 ) -> anyhow::Result<()> {
477 let market_id = extract_market_id(instrument_id)?;
478 let ts_init = UnixNanos::from(SystemTime::now());
479
480 let params = ListMarketCatalogueParams {
481 filter: MarketFilter {
482 market_ids: Some(vec![market_id]),
483 ..Default::default()
484 },
485 market_projection: Some(vec![
486 MarketProjection::EventType,
487 MarketProjection::Event,
488 MarketProjection::Competition,
489 MarketProjection::MarketDescription,
490 MarketProjection::RunnerDescription,
491 MarketProjection::RunnerMetadata,
492 MarketProjection::MarketStartTime,
493 ]),
494 max_results: Some(1),
495 sort: None,
496 locale: None,
497 };
498
499 let catalogues: Vec<MarketCatalogue> = self
500 .http_client
501 .send_betting(METHOD_LIST_MARKET_CATALOGUE, ¶ms)
502 .await
503 .map_err(|e| anyhow::anyhow!("{e}"))?;
504
505 for catalogue in &catalogues {
506 let instruments =
507 parse_market_catalogue(catalogue, self.currency, ts_init, self.min_notional)?;
508
509 for inst in instruments {
510 self.store.add(inst);
511 }
512 }
513
514 Ok(())
515 }
516}
517
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::testing::load_test_json;

    /// Deserializes the navigation-tree fixture shared by every test.
    fn load_navigation_fixture() -> Navigation {
        let raw = load_test_json("rest/navigation_list_navigation.json");
        serde_json::from_str(&raw).unwrap()
    }

    /// Flattened markets of the navigation fixture.
    fn fixture_markets() -> Vec<FlattenedMarket> {
        flatten_navigation(&load_navigation_fixture())
    }

    /// Markets from `markets` accepted by `filter`, in their original order.
    fn select<'a>(
        markets: &'a [FlattenedMarket],
        filter: &NavigationFilter,
    ) -> Vec<&'a FlattenedMarket> {
        markets.iter().filter(|m| filter.matches(m)).collect()
    }

    #[rstest]
    fn test_flatten_navigation() {
        let markets = fixture_markets();

        assert_eq!(markets.len(), 21);

        let first = &markets[0];
        assert!(first.event_type_id.is_some());
        assert!(first.event_type_name.is_some());
        assert!(first.market_id.is_some());
    }

    #[rstest]
    fn test_flatten_navigation_context_propagation() {
        // Every market sits under an event type, so the name must always be set.
        for market in &fixture_markets() {
            assert!(
                market.event_type_name.is_some(),
                "market {:?} missing event_type_name",
                market.market_id,
            );
        }
    }

    #[rstest]
    fn test_filter_default_matches_all() {
        let markets = fixture_markets();
        let accepted = select(&markets, &NavigationFilter::default());

        assert_eq!(accepted.len(), markets.len());
    }

    #[rstest]
    fn test_filter_by_event_type_name() {
        let markets = fixture_markets();
        let filter = NavigationFilter {
            event_type_names: Some(vec!["Horse Racing".to_string()]),
            ..Default::default()
        };

        let matched = select(&markets, &filter);

        assert_eq!(matched.len(), 18);
        for m in matched {
            assert_eq!(m.event_type_name.unwrap().as_str(), "Horse Racing");
        }
    }

    #[rstest]
    fn test_filter_by_market_type() {
        let markets = fixture_markets();
        let filter = NavigationFilter {
            market_types: Some(vec!["WIN".to_string()]),
            ..Default::default()
        };

        let matched = select(&markets, &filter);

        assert_eq!(matched.len(), 1);
        assert_eq!(matched[0].market_type.unwrap().as_str(), "WIN");
    }

    #[rstest]
    fn test_filter_multiple_criteria() {
        let markets = fixture_markets();
        let filter = NavigationFilter {
            event_type_names: Some(vec!["Horse Racing".to_string()]),
            market_types: Some(vec!["ANTEPOST_WIN".to_string()]),
            ..Default::default()
        };

        let matched = select(&markets, &filter);

        assert_eq!(matched.len(), 16);
        for m in matched {
            assert_eq!(m.event_type_name.unwrap().as_str(), "Horse Racing");
            assert_eq!(m.market_type.unwrap().as_str(), "ANTEPOST_WIN");
        }
    }

    #[rstest]
    fn test_filter_no_match() {
        let markets = fixture_markets();
        let filter = NavigationFilter {
            event_type_names: Some(vec!["Cricket".to_string()]),
            ..Default::default()
        };

        assert_eq!(select(&markets, &filter).len(), 0);
    }
}