1use std::{
24 fs::File,
25 io::{BufRead, BufReader},
26 path::Path,
27};
28
29use ahash::AHashMap;
30use anyhow::Context;
31use bzip2::read::BzDecoder;
32use flate2::read::GzDecoder;
33use nautilus_model::{
34 data::{InstrumentClose, InstrumentStatus, OrderBookDeltas, TradeTick},
35 identifiers::{InstrumentId, TradeId},
36 instruments::{Instrument, InstrumentAny},
37 types::{Currency, Money, Price, Quantity},
38};
39use rust_decimal::Decimal;
40
41use crate::{
42 common::{
43 consts::{BETFAIR_PRICE_PRECISION, BETFAIR_QUANTITY_PRECISION},
44 enums::MarketStatus,
45 parse::{make_instrument_id, parse_market_definition, parse_millis_timestamp},
46 },
47 data_types::{
48 BetfairBspBookDelta, BetfairRaceProgress, BetfairRaceRunnerData, BetfairSequenceCompleted,
49 BetfairStartingPrice, BetfairTicker,
50 },
51 stream::{
52 messages::{MCM, RCM, StreamMessage, stream_decode},
53 parse::{
54 make_trade_tick, parse_betfair_starting_prices, parse_betfair_ticker,
55 parse_bsp_book_deltas, parse_instrument_closes, parse_instrument_statuses,
56 parse_race_progress, parse_race_runner_data, parse_runner_book_deltas,
57 },
58 },
59};
60
61#[derive(Debug)]
63pub enum BetfairDataItem {
64 Instrument(Box<InstrumentAny>),
66 Status(InstrumentStatus),
68 Deltas(OrderBookDeltas),
70 Trade(TradeTick),
72 Ticker(BetfairTicker),
74 StartingPrice(BetfairStartingPrice),
76 BspBookDelta(BetfairBspBookDelta),
78 InstrumentClose(InstrumentClose),
80 SequenceCompleted(BetfairSequenceCompleted),
82 RaceRunnerData(BetfairRaceRunnerData),
84 RaceProgress(BetfairRaceProgress),
86}
87
88#[derive(Debug)]
94pub struct BetfairDataLoader {
95 currency: Currency,
96 min_notional: Option<Money>,
97 traded_volumes: AHashMap<(InstrumentId, Decimal), Decimal>,
98 instruments: AHashMap<InstrumentId, InstrumentAny>,
99}
100
101impl BetfairDataLoader {
102 #[must_use]
104 pub fn new(currency: Currency, min_notional: Option<Money>) -> Self {
105 Self {
106 currency,
107 min_notional,
108 traded_volumes: AHashMap::new(),
109 instruments: AHashMap::new(),
110 }
111 }
112
113 #[must_use]
115 pub fn instruments(&self) -> &AHashMap<InstrumentId, InstrumentAny> {
116 &self.instruments
117 }
118
119 pub fn reset(&mut self) {
121 self.traded_volumes.clear();
122 self.instruments.clear();
123 }
124
125 pub fn load(&mut self, filepath: &Path) -> anyhow::Result<Vec<BetfairDataItem>> {
136 let reader = open_reader(filepath)?;
137 let mut items = Vec::new();
138
139 for (line_num, line_result) in reader.lines().enumerate() {
140 let line = line_result.with_context(|| {
141 format!(
142 "failed to read line {} of '{}'",
143 line_num + 1,
144 filepath.display()
145 )
146 })?;
147
148 if line.is_empty() {
149 continue;
150 }
151
152 let msg = match stream_decode(line.as_bytes()) {
153 Ok(msg) => msg,
154 Err(e) => {
155 log::warn!("Failed to decode line {}: {e}", line_num + 1);
156 continue;
157 }
158 };
159
160 match msg {
161 StreamMessage::MarketChange(mcm) => self.process_mcm(&mcm, &mut items),
162 StreamMessage::RaceChange(rcm) => Self::process_rcm(&rcm, &mut items),
163 StreamMessage::Connection(_)
164 | StreamMessage::Status(_)
165 | StreamMessage::OrderChange(_) => {}
166 }
167 }
168
169 Ok(items)
170 }
171
172 pub fn load_instruments(&mut self, filepath: &Path) -> anyhow::Result<Vec<InstrumentAny>> {
182 let reader = open_reader(filepath)?;
183
184 for line_result in reader.lines() {
185 let line = line_result?;
186 if line.is_empty() {
187 continue;
188 }
189
190 let msg = match stream_decode(line.as_bytes()) {
191 Ok(msg) => msg,
192 Err(_) => continue,
193 };
194
195 if let StreamMessage::MarketChange(mcm) = msg {
196 let Some(market_changes) = &mcm.mc else {
197 continue;
198 };
199
200 let ts_init = parse_millis_timestamp(mcm.pt);
201
202 for mc in market_changes {
203 if let Some(def) = &mc.market_definition
204 && let Ok(instruments) = parse_market_definition(
205 &mc.id,
206 def,
207 self.currency,
208 ts_init,
209 self.min_notional,
210 )
211 {
212 for inst in instruments {
213 self.instruments.insert(inst.id(), inst);
214 }
215 }
216 }
217 }
218 }
219
220 Ok(self.instruments.values().cloned().collect())
221 }
222
223 fn process_mcm(&mut self, mcm: &MCM, items: &mut Vec<BetfairDataItem>) {
224 if mcm.is_heartbeat() {
225 return;
226 }
227
228 let Some(market_changes) = &mcm.mc else {
229 return;
230 };
231
232 let ts_event = parse_millis_timestamp(mcm.pt);
233 let ts_init = ts_event;
234
235 for mc in market_changes {
236 let is_snapshot = mc.img;
237 let mut market_closed = false;
238
239 if let Some(def) = &mc.market_definition {
240 match parse_market_definition(
244 &mc.id,
245 def,
246 self.currency,
247 ts_init,
248 self.min_notional,
249 ) {
250 Ok(new_instruments) => {
251 for inst in &new_instruments {
252 self.instruments.insert(inst.id(), inst.clone());
253 }
254
255 for inst in new_instruments {
256 items.push(BetfairDataItem::Instrument(Box::new(inst)));
257 }
258 }
259 Err(e) => {
260 log::warn!("Failed to parse market definition for {}: {e}", mc.id);
261 }
262 }
263
264 if let Some(status) = &def.status {
265 market_closed = *status == MarketStatus::Closed;
266
267 for event in parse_instrument_statuses(&mc.id, def, ts_event, ts_init) {
268 items.push(BetfairDataItem::Status(event));
269 }
270 }
271
272 for sp in parse_betfair_starting_prices(&mc.id, def, ts_event, ts_init) {
273 items.push(BetfairDataItem::StartingPrice(sp));
274 }
275
276 for close in parse_instrument_closes(&mc.id, def, ts_event, ts_init) {
277 items.push(BetfairDataItem::InstrumentClose(close));
278 }
279 }
280
281 let mut buffered_deltas: Vec<OrderBookDeltas> = Vec::new();
286 let mut buffered_bsp_deltas: Vec<BetfairBspBookDelta> = Vec::new();
287
288 if let Some(runner_changes) = &mc.rc {
289 for rc in runner_changes {
290 let handicap = rc.hc.unwrap_or(Decimal::ZERO);
291 let instrument_id = make_instrument_id(&mc.id, rc.id, handicap);
292
293 match parse_runner_book_deltas(
294 instrument_id,
295 rc,
296 is_snapshot,
297 mcm.pt,
298 ts_event,
299 ts_init,
300 ) {
301 Ok(Some(deltas)) => {
302 if is_snapshot {
303 items.push(BetfairDataItem::Deltas(deltas));
304 } else {
305 buffered_deltas.push(deltas);
306 }
307 }
308 Ok(None) => {}
309 Err(e) => {
310 log::warn!("Failed to parse book deltas for {instrument_id}: {e}");
311 }
312 }
313
314 if let Some(trades) = &rc.trd {
315 for pv in trades {
316 if pv.volume == Decimal::ZERO {
317 continue;
318 }
319
320 let key = (instrument_id, pv.price);
321 let prev_volume = self
322 .traded_volumes
323 .get(&key)
324 .copied()
325 .unwrap_or(Decimal::ZERO);
326
327 if pv.volume <= prev_volume {
328 continue;
329 }
330
331 let trade_volume = pv.volume - prev_volume;
332 self.traded_volumes.insert(key, pv.volume);
333
334 let price =
335 match Price::from_decimal_dp(pv.price, BETFAIR_PRICE_PRECISION) {
336 Ok(p) => p,
337 Err(e) => {
338 log::warn!("Invalid trade price: {e}");
339 continue;
340 }
341 };
342 let size = match Quantity::from_decimal_dp(
343 trade_volume,
344 BETFAIR_QUANTITY_PRECISION,
345 ) {
346 Ok(q) => q,
347 Err(e) => {
348 log::warn!("Invalid trade size: {e}");
349 continue;
350 }
351 };
352 let trade_id =
353 TradeId::new(format!("{}-{}-{}", mcm.pt, rc.id, pv.price));
354 let tick = make_trade_tick(
355 instrument_id,
356 price,
357 size,
358 trade_id,
359 ts_event,
360 ts_init,
361 );
362 items.push(BetfairDataItem::Trade(tick));
363 }
364 }
365
366 if let Some(ticker) = parse_betfair_ticker(instrument_id, rc, ts_event, ts_init)
367 {
368 items.push(BetfairDataItem::Ticker(ticker));
369 }
370
371 buffered_bsp_deltas.extend(parse_bsp_book_deltas(
372 instrument_id,
373 rc,
374 ts_event,
375 ts_init,
376 ));
377 }
378 }
379
380 for deltas in buffered_deltas {
381 items.push(BetfairDataItem::Deltas(deltas));
382 }
383
384 for bsp_delta in buffered_bsp_deltas {
385 items.push(BetfairDataItem::BspBookDelta(bsp_delta));
386 }
387
388 if market_closed {
389 let prefix = format!("{}-", mc.id);
390 self.traded_volumes
391 .retain(|k, _| !k.0.symbol.as_str().starts_with(&prefix));
392 }
393 }
394
395 items.push(BetfairDataItem::SequenceCompleted(
396 BetfairSequenceCompleted::new(ts_event, ts_init),
397 ));
398 }
399
400 fn process_rcm(rcm: &RCM, items: &mut Vec<BetfairDataItem>) {
401 let Some(race_changes) = &rcm.rc else {
402 return;
403 };
404
405 let fallback_ts = parse_millis_timestamp(rcm.pt);
406
407 for rc in race_changes {
408 let race_id = rc.id.as_deref().unwrap_or("");
409 let market_id = rc.mid.as_deref().unwrap_or("");
410
411 if let Some(runners) = &rc.rrc {
412 for rrc in runners {
413 let ts_event = rrc.ft.map_or(fallback_ts, parse_millis_timestamp);
414
415 if let Some(runner) =
416 parse_race_runner_data(race_id, market_id, rrc, ts_event, ts_event)
417 {
418 items.push(BetfairDataItem::RaceRunnerData(runner));
419 }
420 }
421 }
422
423 if let Some(rpc) = &rc.rpc {
424 let ts_event = rpc.ft.map_or(fallback_ts, parse_millis_timestamp);
425 let progress = parse_race_progress(race_id, market_id, rpc, ts_event, ts_event);
426 items.push(BetfairDataItem::RaceProgress(progress));
427 }
428 }
429 }
430}
431
432fn open_reader(filepath: &Path) -> anyhow::Result<Box<dyn BufRead>> {
433 let file =
434 File::open(filepath).with_context(|| format!("failed to open '{}'", filepath.display()))?;
435
436 let ext = filepath.extension().and_then(|e| e.to_str()).unwrap_or("");
437
438 if ext.eq_ignore_ascii_case("gz") {
439 Ok(Box::new(BufReader::new(GzDecoder::new(file))))
440 } else if ext.eq_ignore_ascii_case("bz2") {
441 Ok(Box::new(BufReader::new(BzDecoder::new(file))))
442 } else {
443 Ok(Box::new(BufReader::new(file)))
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use std::path::PathBuf;
450
451 use rstest::rstest;
452
453 use super::*;
454 use crate::common::testing::load_test_json;
455
456 fn compact_json(pretty: &str) -> String {
457 let value: serde_json::Value = serde_json::from_str(pretty).unwrap();
458 serde_json::to_string(&value).unwrap()
459 }
460
461 fn local_data_dir() -> PathBuf {
462 PathBuf::from(env!("CARGO_MANIFEST_DIR"))
463 .ancestors()
464 .nth(3)
465 .unwrap()
466 .join("tests/test_data/local/betfair")
467 }
468
469 fn test_data_dir() -> PathBuf {
470 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data")
471 }
472
473 #[rstest]
474 fn test_load_bz2_file() {
475 let filepath = test_data_dir().join("stream/sample.bz2");
476 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
477 let items = loader.load(&filepath).unwrap();
478
479 let instrument_count = items
480 .iter()
481 .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
482 .count();
483 assert!(
484 instrument_count > 0,
485 "should parse instruments from bz2 file"
486 );
487 assert_eq!(loader.instruments().len(), instrument_count);
488
489 let has_sequence = items
490 .iter()
491 .any(|i| matches!(i, BetfairDataItem::SequenceCompleted(_)));
492 assert!(has_sequence, "should emit SequenceCompleted");
493 }
494
495 #[rstest]
496 fn test_load_single_mcm_line() {
497 let data = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
498 let tmp_dir = std::env::temp_dir().join("betfair_test");
499 std::fs::create_dir_all(&tmp_dir).unwrap();
500 let tmp_file = tmp_dir.join("test_single_mcm.json");
501 std::fs::write(&tmp_file, &data).unwrap();
502
503 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
504 let items = loader.load(&tmp_file).unwrap();
505
506 let instrument_count = items
507 .iter()
508 .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
509 .count();
510 assert!(
511 instrument_count > 0,
512 "should parse instruments from market definition"
513 );
514 assert_eq!(loader.instruments().len(), instrument_count);
515
516 let has_sequence = items
517 .iter()
518 .any(|i| matches!(i, BetfairDataItem::SequenceCompleted(_)));
519 assert!(has_sequence, "should emit SequenceCompleted");
520
521 std::fs::remove_file(&tmp_file).ok();
522 }
523
524 #[rstest]
525 fn test_load_mcm_with_book_data() {
526 let sub_image = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
527 let update = compact_json(&load_test_json("stream/mcm_UPDATE.json"));
528
529 let tmp_dir = std::env::temp_dir().join("betfair_test");
530 std::fs::create_dir_all(&tmp_dir).unwrap();
531 let tmp_file = tmp_dir.join("test_book_data.json");
532 std::fs::write(&tmp_file, format!("{sub_image}\n{update}")).unwrap();
533
534 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
535 let items = loader.load(&tmp_file).unwrap();
536
537 let deltas_count = items
538 .iter()
539 .filter(|i| matches!(i, BetfairDataItem::Deltas(_)))
540 .count();
541 assert!(deltas_count > 0, "should parse book deltas");
542
543 std::fs::remove_file(&tmp_file).ok();
544 }
545
546 #[rstest]
547 fn test_load_instruments_only() {
548 let sub_image = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
549 let update = compact_json(&load_test_json("stream/mcm_UPDATE.json"));
550
551 let tmp_dir = std::env::temp_dir().join("betfair_test");
552 std::fs::create_dir_all(&tmp_dir).unwrap();
553 let tmp_file = tmp_dir.join("test_instruments_only.json");
554 std::fs::write(&tmp_file, format!("{sub_image}\n{update}")).unwrap();
555
556 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
557 let instruments = loader.load_instruments(&tmp_file).unwrap();
558
559 assert!(!instruments.is_empty(), "should find instruments");
560 assert_eq!(loader.instruments().len(), instruments.len());
561
562 std::fs::remove_file(&tmp_file).ok();
563 }
564
565 #[rstest]
566 fn test_reset_clears_state() {
567 let data = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
568 let tmp_dir = std::env::temp_dir().join("betfair_test");
569 std::fs::create_dir_all(&tmp_dir).unwrap();
570 let tmp_file = tmp_dir.join("test_reset.json");
571 std::fs::write(&tmp_file, &data).unwrap();
572
573 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
574 loader.load(&tmp_file).unwrap();
575 assert!(!loader.instruments().is_empty());
576
577 loader.reset();
578 assert!(loader.instruments().is_empty());
579 assert!(loader.traded_volumes.is_empty());
580
581 std::fs::remove_file(&tmp_file).ok();
582 }
583
584 #[rstest]
585 fn test_load_bsp_data() {
586 let raw = load_test_json("stream/mcm_BSP.json");
587 let messages: Vec<serde_json::Value> = serde_json::from_str(&raw).unwrap();
588 let lines: Vec<String> = messages
589 .iter()
590 .map(|v| serde_json::to_string(v).unwrap())
591 .collect();
592
593 let tmp_dir = std::env::temp_dir().join("betfair_test");
594 std::fs::create_dir_all(&tmp_dir).unwrap();
595 let tmp_file = tmp_dir.join("test_bsp.json");
596 std::fs::write(&tmp_file, lines.join("\n")).unwrap();
597
598 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
599 let items = loader.load(&tmp_file).unwrap();
600
601 let bsp_count = items
602 .iter()
603 .filter(|i| matches!(i, BetfairDataItem::BspBookDelta(_)))
604 .count();
605 assert!(bsp_count > 0, "should parse BSP book deltas");
606
607 std::fs::remove_file(&tmp_file).ok();
608 }
609
610 #[rstest]
611 fn test_load_market_definition_with_traded_volumes() {
612 let sub_image = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
613 let update_tv = compact_json(&load_test_json("stream/mcm_UPDATE_tv.json"));
614
615 let tmp_dir = std::env::temp_dir().join("betfair_test");
616 std::fs::create_dir_all(&tmp_dir).unwrap();
617 let tmp_file = tmp_dir.join("test_tv.json");
618 std::fs::write(&tmp_file, format!("{sub_image}\n{update_tv}")).unwrap();
619
620 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
621 let items = loader.load(&tmp_file).unwrap();
622
623 let ticker_count = items
624 .iter()
625 .filter(|i| matches!(i, BetfairDataItem::Ticker(_)))
626 .count();
627 assert!(ticker_count > 0, "should parse ticker data from tv updates");
628
629 std::fs::remove_file(&tmp_file).ok();
630 }
631
632 #[rstest]
633 #[ignore] fn test_load_match_odds_file() {
635 let filepath = local_data_dir().join("1.253378068.gz");
636 if !filepath.exists() {
637 eprintln!("Skipping: {filepath:?} not found");
638 return;
639 }
640
641 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
642 let items = loader.load(&filepath).unwrap();
643
644 let instrument_count = items
645 .iter()
646 .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
647 .count();
648 let deltas_count = items
649 .iter()
650 .filter(|i| matches!(i, BetfairDataItem::Deltas(_)))
651 .count();
652 let trade_count = items
653 .iter()
654 .filter(|i| matches!(i, BetfairDataItem::Trade(_)))
655 .count();
656 let close_count = items
657 .iter()
658 .filter(|i| matches!(i, BetfairDataItem::InstrumentClose(_)))
659 .count();
660
661 println!(
662 "Match odds file: {instrument_count} instruments, {deltas_count} deltas, {trade_count} trades, {close_count} closes"
663 );
664 println!("Total items: {}", items.len());
665
666 assert!(instrument_count >= 3, "expected at least 3 instruments");
668 assert!(deltas_count > 0, "expected book deltas");
669 assert!(trade_count > 0, "expected trade ticks");
670 assert!(close_count > 0, "expected instrument closes at settlement");
671
672 let closes: Vec<_> = items
674 .iter()
675 .filter_map(|i| match i {
676 BetfairDataItem::InstrumentClose(c) => Some(c),
677 _ => None,
678 })
679 .collect();
680 let winner = closes.iter().find(|c| c.close_price == Price::from("1.00"));
681 assert!(winner.is_some(), "expected a winner with close_price 1.00");
682 assert!(
683 winner
684 .unwrap()
685 .instrument_id
686 .symbol
687 .as_str()
688 .contains("2426"),
689 "winner should be runner 2426"
690 );
691 }
692
693 #[rstest]
694 #[ignore] fn test_load_racing_win_file() {
696 let filepath = local_data_dir().join("1.245077076.gz");
697 if !filepath.exists() {
698 eprintln!("Skipping: {filepath:?} not found");
699 return;
700 }
701
702 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
703 let items = loader.load(&filepath).unwrap();
704
705 let instrument_count = items
706 .iter()
707 .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
708 .count();
709 let deltas_count = items
710 .iter()
711 .filter(|i| matches!(i, BetfairDataItem::Deltas(_)))
712 .count();
713 let trade_count = items
714 .iter()
715 .filter(|i| matches!(i, BetfairDataItem::Trade(_)))
716 .count();
717 let close_count = items
718 .iter()
719 .filter(|i| matches!(i, BetfairDataItem::InstrumentClose(_)))
720 .count();
721
722 println!(
723 "Racing file: {instrument_count} instruments, {deltas_count} deltas, {trade_count} trades, {close_count} closes"
724 );
725 println!("Total items: {}", items.len());
726
727 assert!(instrument_count >= 6, "expected at least 6 instruments");
729 assert!(deltas_count > 0, "expected book deltas");
730 assert!(trade_count > 0, "expected trade ticks");
731 assert!(close_count > 0, "expected instrument closes at settlement");
732
733 let closes: Vec<_> = items
735 .iter()
736 .filter_map(|i| match i {
737 BetfairDataItem::InstrumentClose(c) => Some(c),
738 _ => None,
739 })
740 .collect();
741 let winner = closes.iter().find(|c| c.close_price == Price::from("1.00"));
742 assert!(winner.is_some(), "expected a winner with close_price 1.00");
743 assert!(
744 winner
745 .unwrap()
746 .instrument_id
747 .symbol
748 .as_str()
749 .contains("75925986"),
750 "winner should be runner 75925986"
751 );
752 }
753
754 fn write_tmp(contents: &str, name: &str) -> PathBuf {
755 let tmp_dir = std::env::temp_dir().join("betfair_test");
756 std::fs::create_dir_all(&tmp_dir).unwrap();
757 let tmp_file = tmp_dir.join(name);
758 std::fs::write(&tmp_file, contents).unwrap();
759 tmp_file
760 }
761
762 fn find_first(
763 items: &[BetfairDataItem],
764 pred: impl Fn(&BetfairDataItem) -> bool,
765 ) -> Option<usize> {
766 items.iter().position(pred)
767 }
768
769 fn find_last(
770 items: &[BetfairDataItem],
771 pred: impl Fn(&BetfairDataItem) -> bool,
772 ) -> Option<usize> {
773 items.iter().rposition(pred)
774 }
775
776 fn partition_by_mcm(items: &[BetfairDataItem]) -> Vec<&[BetfairDataItem]> {
779 let mut partitions = Vec::new();
780 let mut start = 0;
781
782 for (i, item) in items.iter().enumerate() {
783 if matches!(item, BetfairDataItem::SequenceCompleted(_)) {
784 partitions.push(&items[start..=i]);
785 start = i + 1;
786 }
787 }
788
789 partitions
790 }
791
792 #[rstest]
793 fn test_load_emits_instrument_before_status_and_close() {
794 let data = compact_json(&load_test_json("stream/mcm_UPDATE_md.json"));
799 let tmp_file = write_tmp(&data, "test_order_instrument_first.json");
800
801 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
802 let items = loader.load(&tmp_file).unwrap();
803
804 let instrument_idx =
805 find_first(&items, |i| matches!(i, BetfairDataItem::Instrument(_))).unwrap();
806 let status_idx = find_first(&items, |i| matches!(i, BetfairDataItem::Status(_))).unwrap();
807
808 assert!(
809 instrument_idx < status_idx,
810 "Instrument (idx {instrument_idx}) must precede Status (idx {status_idx})"
811 );
812
813 std::fs::remove_file(&tmp_file).ok();
814 }
815
816 #[rstest]
817 fn test_load_emits_instrument_before_close() {
818 let mcm = r#"{"op":"mcm","id":1,"pt":1627617202953,"ct":"SUB_IMAGE","mc":[{"id":"1.1","marketDefinition":{"bspMarket":false,"turnInPlayEnabled":false,"persistenceEnabled":false,"marketBaseRate":5,"eventId":"1","eventTypeId":"1","numberOfWinners":1,"bettingType":"ODDS","marketType":"WIN","marketTime":"2021-07-30T03:55:00.000Z","bspReconciled":true,"complete":true,"inPlay":false,"crossMatching":false,"runnersVoidable":false,"numberOfActiveRunners":0,"betDelay":0,"status":"CLOSED","runners":[{"status":"WINNER","sortPriority":1,"id":101},{"status":"LOSER","sortPriority":2,"id":102}],"regulators":["MR_INT"],"discountAllowed":true,"timezone":"UTC","openDate":"2021-07-30T02:45:00.000Z","version":1,"priceLadderDefinition":{"type":"CLASSIC"}}}]}"#;
822 let tmp_file = write_tmp(mcm, "test_order_instrument_before_close.json");
823
824 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
825 let items = loader.load(&tmp_file).unwrap();
826
827 let instrument_idx =
828 find_first(&items, |i| matches!(i, BetfairDataItem::Instrument(_))).unwrap();
829 let close_idx =
830 find_first(&items, |i| matches!(i, BetfairDataItem::InstrumentClose(_))).unwrap();
831
832 assert!(
833 instrument_idx < close_idx,
834 "Instrument (idx {instrument_idx}) must precede InstrumentClose (idx {close_idx})"
835 );
836
837 std::fs::remove_file(&tmp_file).ok();
838 }
839
840 #[rstest]
841 fn test_load_non_snapshot_deltas_tail_after_trades() {
842 let data = compact_json(&load_test_json("stream/mcm_live_UPDATE.json"));
846 let tmp_file = write_tmp(&data, "test_order_deltas_tail.json");
847
848 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
849 let items = loader.load(&tmp_file).unwrap();
850
851 let last_trade_idx = find_last(&items, |i| matches!(i, BetfairDataItem::Trade(_))).unwrap();
853 let first_deltas_idx =
854 find_first(&items, |i| matches!(i, BetfairDataItem::Deltas(_))).unwrap();
855
856 assert!(
857 first_deltas_idx > last_trade_idx,
858 "Deltas (first idx {first_deltas_idx}) must tail after Trade (last idx {last_trade_idx}) on non-snapshot updates"
859 );
860
861 std::fs::remove_file(&tmp_file).ok();
862 }
863
864 #[rstest]
865 fn test_load_snapshot_deltas_emit_inline_before_trades() {
866 let data = compact_json(&load_test_json(
870 "stream/market_definition_runner_removed.json",
871 ));
872 let tmp_file = write_tmp(&data, "test_order_snapshot_inline.json");
873
874 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
875 let items = loader.load(&tmp_file).unwrap();
876
877 let first_deltas_idx =
878 find_first(&items, |i| matches!(i, BetfairDataItem::Deltas(_))).unwrap();
879 let first_trade_idx =
880 find_first(&items, |i| matches!(i, BetfairDataItem::Trade(_))).unwrap();
881
882 assert!(
883 first_deltas_idx < first_trade_idx,
884 "Snapshot Deltas (first idx {first_deltas_idx}) must emit before Trade (first idx {first_trade_idx})"
885 );
886
887 std::fs::remove_file(&tmp_file).ok();
888 }
889
890 #[rstest]
891 fn test_load_bsp_tails_after_book_deltas() {
892 let raw = load_test_json("stream/mcm_BSP.json");
896 let messages: Vec<serde_json::Value> = serde_json::from_str(&raw).unwrap();
897 let lines: Vec<String> = messages
898 .iter()
899 .map(|v| serde_json::to_string(v).unwrap())
900 .collect();
901 let tmp_file = write_tmp(&lines.join("\n"), "test_order_bsp_tail.json");
902
903 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
904 let items = loader.load(&tmp_file).unwrap();
905
906 let partitions = partition_by_mcm(&items);
907 assert!(
908 !partitions.is_empty(),
909 "expected at least one MCM partition"
910 );
911
912 let mut checked_any = false;
913
914 for partition in partitions {
915 let last_deltas_idx = find_last(partition, |i| matches!(i, BetfairDataItem::Deltas(_)));
916 let first_bsp_idx =
917 find_first(partition, |i| matches!(i, BetfairDataItem::BspBookDelta(_)));
918
919 if let (Some(last_deltas), Some(first_bsp)) = (last_deltas_idx, first_bsp_idx) {
920 assert!(
921 last_deltas < first_bsp,
922 "BspBookDelta (first idx {first_bsp}) must tail after Deltas (last idx {last_deltas}) within the same MCM"
923 );
924 checked_any = true;
925 }
926 }
927
928 assert!(
929 checked_any,
930 "expected at least one MCM to contain both Deltas and BspBookDelta"
931 );
932
933 std::fs::remove_file(&tmp_file).ok();
934 }
935
936 #[rstest]
937 fn test_load_emits_close_for_removed_runner_while_market_open() {
938 let mcm = r#"{"op":"mcm","id":1,"pt":1627617202953,"ct":"SUB_IMAGE","mc":[{"id":"1.2","marketDefinition":{"bspMarket":false,"turnInPlayEnabled":false,"persistenceEnabled":false,"marketBaseRate":5,"eventId":"1","eventTypeId":"1","numberOfWinners":1,"bettingType":"ODDS","marketType":"WIN","marketTime":"2021-07-30T03:55:00.000Z","bspReconciled":false,"complete":true,"inPlay":false,"crossMatching":false,"runnersVoidable":false,"numberOfActiveRunners":1,"betDelay":0,"status":"OPEN","runners":[{"status":"ACTIVE","sortPriority":1,"id":201},{"status":"REMOVED","sortPriority":2,"id":202}],"regulators":["MR_INT"],"discountAllowed":true,"timezone":"UTC","openDate":"2021-07-30T02:45:00.000Z","version":1,"priceLadderDefinition":{"type":"CLASSIC"}}}]}"#;
942 let tmp_file = write_tmp(mcm, "test_order_close_for_removed.json");
943
944 let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
945 let items = loader.load(&tmp_file).unwrap();
946
947 let closes: Vec<_> = items
948 .iter()
949 .filter_map(|i| match i {
950 BetfairDataItem::InstrumentClose(c) => Some(c),
951 _ => None,
952 })
953 .collect();
954
955 assert_eq!(
956 closes.len(),
957 1,
958 "Removed runner must produce exactly one InstrumentClose while market is Open"
959 );
960 assert!(
961 closes[0].instrument_id.symbol.as_str().contains("202"),
962 "close must target the removed runner (selection id 202)"
963 );
964
965 std::fs::remove_file(&tmp_file).ok();
966 }
967}