Skip to main content

nautilus_betfair/
loader.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! File-based loader for historical Betfair Exchange Streaming data.
//!
//! Reads compressed (gzip or bzip2) or plain JSON files containing
//! newline-delimited Betfair ESA messages and produces Nautilus domain
//! objects. The parsing logic mirrors the live data client handler in
//! [`crate::data`].
22
23use std::{
24    fs::File,
25    io::{BufRead, BufReader},
26    path::Path,
27};
28
29use ahash::AHashMap;
30use anyhow::Context;
31use bzip2::read::BzDecoder;
32use flate2::read::GzDecoder;
33use nautilus_model::{
34    data::{InstrumentClose, InstrumentStatus, OrderBookDeltas, TradeTick},
35    identifiers::{InstrumentId, TradeId},
36    instruments::{Instrument, InstrumentAny},
37    types::{Currency, Money, Price, Quantity},
38};
39use rust_decimal::Decimal;
40
41use crate::{
42    common::{
43        consts::{BETFAIR_PRICE_PRECISION, BETFAIR_QUANTITY_PRECISION},
44        enums::MarketStatus,
45        parse::{make_instrument_id, parse_market_definition, parse_millis_timestamp},
46    },
47    data_types::{
48        BetfairBspBookDelta, BetfairRaceProgress, BetfairRaceRunnerData, BetfairSequenceCompleted,
49        BetfairStartingPrice, BetfairTicker,
50    },
51    stream::{
52        messages::{MCM, RCM, StreamMessage, stream_decode},
53        parse::{
54            make_trade_tick, parse_betfair_starting_prices, parse_betfair_ticker,
55            parse_bsp_book_deltas, parse_instrument_closes, parse_instrument_statuses,
56            parse_race_progress, parse_race_runner_data, parse_runner_book_deltas,
57        },
58    },
59};
60
/// A parsed data item from a Betfair historical file.
///
/// Items are produced by [`BetfairDataLoader`] in replay order. Within each
/// market change, instruments are emitted before any status or close event that
/// references them. Every processed MCM ends with a
/// [`BetfairDataItem::SequenceCompleted`] marker.
#[derive(Debug)]
pub enum BetfairDataItem {
    /// Instrument definition from a market definition.
    Instrument(Box<InstrumentAny>),
    /// Market status change for an instrument, emitted when a market definition
    /// carries a status.
    Status(InstrumentStatus),
    /// Order book snapshot or delta update for a single runner.
    Deltas(OrderBookDeltas),
    /// Incremental trade tick derived from cumulative traded volumes (only the
    /// increase since the previous observation at the same price).
    Trade(TradeTick),
    /// Betfair-specific ticker data (last traded price, traded volume, BSP near/far).
    Ticker(BetfairTicker),
    /// Betfair Starting Price for a runner, parsed from a market definition.
    StartingPrice(BetfairStartingPrice),
    /// BSP book delta (separate from the exchange book). Emitted after the
    /// exchange book deltas of the same market change.
    BspBookDelta(BetfairBspBookDelta),
    /// Instrument close event at market settlement, parsed from a market definition.
    InstrumentClose(InstrumentClose),
    /// Marker emitted after each MCM batch is fully processed.
    SequenceCompleted(BetfairSequenceCompleted),
    /// GPS tracking data for a race runner (from RCM).
    RaceRunnerData(BetfairRaceRunnerData),
    /// Race-level progress data (from RCM).
    RaceProgress(BetfairRaceProgress),
}
87
/// Reads Betfair historical data files and converts them into Nautilus domain objects.
///
/// Each file contains newline-delimited JSON from the Betfair Exchange Streaming API.
/// The loader handles gzip and bzip2 decompression (chosen by file extension),
/// stateful traded volume tracking, and instrument creation from market definitions.
///
/// Cached state persists across loads until [`BetfairDataLoader::reset`] is called.
#[derive(Debug)]
pub struct BetfairDataLoader {
    // Currency passed to `parse_market_definition` for every instrument created.
    currency: Currency,
    // Optional minimum notional passed to `parse_market_definition`.
    min_notional: Option<Money>,
    // Last cumulative traded volume seen per (instrument, price). Used to turn
    // Betfair's cumulative `trd` volumes into incremental trade sizes.
    traded_volumes: AHashMap<(InstrumentId, Decimal), Decimal>,
    // Instruments parsed so far, keyed by instrument ID.
    instruments: AHashMap<InstrumentId, InstrumentAny>,
}
100
101impl BetfairDataLoader {
102    /// Creates a new [`BetfairDataLoader`].
103    #[must_use]
104    pub fn new(currency: Currency, min_notional: Option<Money>) -> Self {
105        Self {
106            currency,
107            min_notional,
108            traded_volumes: AHashMap::new(),
109            instruments: AHashMap::new(),
110        }
111    }
112
113    /// Returns the instruments cached from the most recent load.
114    #[must_use]
115    pub fn instruments(&self) -> &AHashMap<InstrumentId, InstrumentAny> {
116        &self.instruments
117    }
118
119    /// Clears all cached state (instruments and traded volumes).
120    pub fn reset(&mut self) {
121        self.traded_volumes.clear();
122        self.instruments.clear();
123    }
124
125    /// Loads a Betfair historical data file and returns all parsed data items.
126    ///
127    /// Supports gzip-compressed (`.gz`), bzip2-compressed (`.bz2`), and plain JSON files.
128    /// Each line is deserialized as a Betfair stream message. MCM and RCM
129    /// messages are parsed into Nautilus domain objects; other message types
130    /// are skipped.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the file cannot be opened or a line fails to parse.
135    pub fn load(&mut self, filepath: &Path) -> anyhow::Result<Vec<BetfairDataItem>> {
136        let reader = open_reader(filepath)?;
137        let mut items = Vec::new();
138
139        for (line_num, line_result) in reader.lines().enumerate() {
140            let line = line_result.with_context(|| {
141                format!(
142                    "failed to read line {} of '{}'",
143                    line_num + 1,
144                    filepath.display()
145                )
146            })?;
147
148            if line.is_empty() {
149                continue;
150            }
151
152            let msg = match stream_decode(line.as_bytes()) {
153                Ok(msg) => msg,
154                Err(e) => {
155                    log::warn!("Failed to decode line {}: {e}", line_num + 1);
156                    continue;
157                }
158            };
159
160            match msg {
161                StreamMessage::MarketChange(mcm) => self.process_mcm(&mcm, &mut items),
162                StreamMessage::RaceChange(rcm) => Self::process_rcm(&rcm, &mut items),
163                StreamMessage::Connection(_)
164                | StreamMessage::Status(_)
165                | StreamMessage::OrderChange(_) => {}
166            }
167        }
168
169        Ok(items)
170    }
171
172    /// Loads only instrument definitions from a Betfair historical data file.
173    ///
174    /// Scans the file for market definitions and creates instruments, but
175    /// skips all other data processing. Faster than `load()` when only
176    /// instruments are needed.
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if the file cannot be opened or parsed.
181    pub fn load_instruments(&mut self, filepath: &Path) -> anyhow::Result<Vec<InstrumentAny>> {
182        let reader = open_reader(filepath)?;
183
184        for line_result in reader.lines() {
185            let line = line_result?;
186            if line.is_empty() {
187                continue;
188            }
189
190            let msg = match stream_decode(line.as_bytes()) {
191                Ok(msg) => msg,
192                Err(_) => continue,
193            };
194
195            if let StreamMessage::MarketChange(mcm) = msg {
196                let Some(market_changes) = &mcm.mc else {
197                    continue;
198                };
199
200                let ts_init = parse_millis_timestamp(mcm.pt);
201
202                for mc in market_changes {
203                    if let Some(def) = &mc.market_definition
204                        && let Ok(instruments) = parse_market_definition(
205                            &mc.id,
206                            def,
207                            self.currency,
208                            ts_init,
209                            self.min_notional,
210                        )
211                    {
212                        for inst in instruments {
213                            self.instruments.insert(inst.id(), inst);
214                        }
215                    }
216                }
217            }
218        }
219
220        Ok(self.instruments.values().cloned().collect())
221    }
222
223    fn process_mcm(&mut self, mcm: &MCM, items: &mut Vec<BetfairDataItem>) {
224        if mcm.is_heartbeat() {
225            return;
226        }
227
228        let Some(market_changes) = &mcm.mc else {
229            return;
230        };
231
232        let ts_event = parse_millis_timestamp(mcm.pt);
233        let ts_init = ts_event;
234
235        for mc in market_changes {
236            let is_snapshot = mc.img;
237            let mut market_closed = false;
238
239            if let Some(def) = &mc.market_definition {
240                // Emit instruments first so sequential consumers (e.g. the backtest
241                // exchange) have the instrument in cache before any status or close
242                // event references it.
243                match parse_market_definition(
244                    &mc.id,
245                    def,
246                    self.currency,
247                    ts_init,
248                    self.min_notional,
249                ) {
250                    Ok(new_instruments) => {
251                        for inst in &new_instruments {
252                            self.instruments.insert(inst.id(), inst.clone());
253                        }
254
255                        for inst in new_instruments {
256                            items.push(BetfairDataItem::Instrument(Box::new(inst)));
257                        }
258                    }
259                    Err(e) => {
260                        log::warn!("Failed to parse market definition for {}: {e}", mc.id);
261                    }
262                }
263
264                if let Some(status) = &def.status {
265                    market_closed = *status == MarketStatus::Closed;
266
267                    for event in parse_instrument_statuses(&mc.id, def, ts_event, ts_init) {
268                        items.push(BetfairDataItem::Status(event));
269                    }
270                }
271
272                for sp in parse_betfair_starting_prices(&mc.id, def, ts_event, ts_init) {
273                    items.push(BetfairDataItem::StartingPrice(sp));
274                }
275
276                for close in parse_instrument_closes(&mc.id, def, ts_event, ts_init) {
277                    items.push(BetfairDataItem::InstrumentClose(close));
278                }
279            }
280
281            // Non-snapshot deltas and BSP deltas are buffered and flushed after
282            // trades/tickers to mirror the Python `market_change_to_updates`
283            // ordering (book deltas first, then BSP). Snapshots go inline per
284            // runner, also matching Python.
285            let mut buffered_deltas: Vec<OrderBookDeltas> = Vec::new();
286            let mut buffered_bsp_deltas: Vec<BetfairBspBookDelta> = Vec::new();
287
288            if let Some(runner_changes) = &mc.rc {
289                for rc in runner_changes {
290                    let handicap = rc.hc.unwrap_or(Decimal::ZERO);
291                    let instrument_id = make_instrument_id(&mc.id, rc.id, handicap);
292
293                    match parse_runner_book_deltas(
294                        instrument_id,
295                        rc,
296                        is_snapshot,
297                        mcm.pt,
298                        ts_event,
299                        ts_init,
300                    ) {
301                        Ok(Some(deltas)) => {
302                            if is_snapshot {
303                                items.push(BetfairDataItem::Deltas(deltas));
304                            } else {
305                                buffered_deltas.push(deltas);
306                            }
307                        }
308                        Ok(None) => {}
309                        Err(e) => {
310                            log::warn!("Failed to parse book deltas for {instrument_id}: {e}");
311                        }
312                    }
313
314                    if let Some(trades) = &rc.trd {
315                        for pv in trades {
316                            if pv.volume == Decimal::ZERO {
317                                continue;
318                            }
319
320                            let key = (instrument_id, pv.price);
321                            let prev_volume = self
322                                .traded_volumes
323                                .get(&key)
324                                .copied()
325                                .unwrap_or(Decimal::ZERO);
326
327                            if pv.volume <= prev_volume {
328                                continue;
329                            }
330
331                            let trade_volume = pv.volume - prev_volume;
332                            self.traded_volumes.insert(key, pv.volume);
333
334                            let price =
335                                match Price::from_decimal_dp(pv.price, BETFAIR_PRICE_PRECISION) {
336                                    Ok(p) => p,
337                                    Err(e) => {
338                                        log::warn!("Invalid trade price: {e}");
339                                        continue;
340                                    }
341                                };
342                            let size = match Quantity::from_decimal_dp(
343                                trade_volume,
344                                BETFAIR_QUANTITY_PRECISION,
345                            ) {
346                                Ok(q) => q,
347                                Err(e) => {
348                                    log::warn!("Invalid trade size: {e}");
349                                    continue;
350                                }
351                            };
352                            let trade_id =
353                                TradeId::new(format!("{}-{}-{}", mcm.pt, rc.id, pv.price));
354                            let tick = make_trade_tick(
355                                instrument_id,
356                                price,
357                                size,
358                                trade_id,
359                                ts_event,
360                                ts_init,
361                            );
362                            items.push(BetfairDataItem::Trade(tick));
363                        }
364                    }
365
366                    if let Some(ticker) = parse_betfair_ticker(instrument_id, rc, ts_event, ts_init)
367                    {
368                        items.push(BetfairDataItem::Ticker(ticker));
369                    }
370
371                    buffered_bsp_deltas.extend(parse_bsp_book_deltas(
372                        instrument_id,
373                        rc,
374                        ts_event,
375                        ts_init,
376                    ));
377                }
378            }
379
380            for deltas in buffered_deltas {
381                items.push(BetfairDataItem::Deltas(deltas));
382            }
383
384            for bsp_delta in buffered_bsp_deltas {
385                items.push(BetfairDataItem::BspBookDelta(bsp_delta));
386            }
387
388            if market_closed {
389                let prefix = format!("{}-", mc.id);
390                self.traded_volumes
391                    .retain(|k, _| !k.0.symbol.as_str().starts_with(&prefix));
392            }
393        }
394
395        items.push(BetfairDataItem::SequenceCompleted(
396            BetfairSequenceCompleted::new(ts_event, ts_init),
397        ));
398    }
399
400    fn process_rcm(rcm: &RCM, items: &mut Vec<BetfairDataItem>) {
401        let Some(race_changes) = &rcm.rc else {
402            return;
403        };
404
405        let fallback_ts = parse_millis_timestamp(rcm.pt);
406
407        for rc in race_changes {
408            let race_id = rc.id.as_deref().unwrap_or("");
409            let market_id = rc.mid.as_deref().unwrap_or("");
410
411            if let Some(runners) = &rc.rrc {
412                for rrc in runners {
413                    let ts_event = rrc.ft.map_or(fallback_ts, parse_millis_timestamp);
414
415                    if let Some(runner) =
416                        parse_race_runner_data(race_id, market_id, rrc, ts_event, ts_event)
417                    {
418                        items.push(BetfairDataItem::RaceRunnerData(runner));
419                    }
420                }
421            }
422
423            if let Some(rpc) = &rc.rpc {
424                let ts_event = rpc.ft.map_or(fallback_ts, parse_millis_timestamp);
425                let progress = parse_race_progress(race_id, market_id, rpc, ts_event, ts_event);
426                items.push(BetfairDataItem::RaceProgress(progress));
427            }
428        }
429    }
430}
431
432fn open_reader(filepath: &Path) -> anyhow::Result<Box<dyn BufRead>> {
433    let file =
434        File::open(filepath).with_context(|| format!("failed to open '{}'", filepath.display()))?;
435
436    let ext = filepath.extension().and_then(|e| e.to_str()).unwrap_or("");
437
438    if ext.eq_ignore_ascii_case("gz") {
439        Ok(Box::new(BufReader::new(GzDecoder::new(file))))
440    } else if ext.eq_ignore_ascii_case("bz2") {
441        Ok(Box::new(BufReader::new(BzDecoder::new(file))))
442    } else {
443        Ok(Box::new(BufReader::new(file)))
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use std::path::PathBuf;
450
451    use rstest::rstest;
452
453    use super::*;
454    use crate::common::testing::load_test_json;
455
456    fn compact_json(pretty: &str) -> String {
457        let value: serde_json::Value = serde_json::from_str(pretty).unwrap();
458        serde_json::to_string(&value).unwrap()
459    }
460
461    fn local_data_dir() -> PathBuf {
462        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
463            .ancestors()
464            .nth(3)
465            .unwrap()
466            .join("tests/test_data/local/betfair")
467    }
468
469    fn test_data_dir() -> PathBuf {
470        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data")
471    }
472
473    #[rstest]
474    fn test_load_bz2_file() {
475        let filepath = test_data_dir().join("stream/sample.bz2");
476        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
477        let items = loader.load(&filepath).unwrap();
478
479        let instrument_count = items
480            .iter()
481            .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
482            .count();
483        assert!(
484            instrument_count > 0,
485            "should parse instruments from bz2 file"
486        );
487        assert_eq!(loader.instruments().len(), instrument_count);
488
489        let has_sequence = items
490            .iter()
491            .any(|i| matches!(i, BetfairDataItem::SequenceCompleted(_)));
492        assert!(has_sequence, "should emit SequenceCompleted");
493    }
494
495    #[rstest]
496    fn test_load_single_mcm_line() {
497        let data = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
498        let tmp_dir = std::env::temp_dir().join("betfair_test");
499        std::fs::create_dir_all(&tmp_dir).unwrap();
500        let tmp_file = tmp_dir.join("test_single_mcm.json");
501        std::fs::write(&tmp_file, &data).unwrap();
502
503        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
504        let items = loader.load(&tmp_file).unwrap();
505
506        let instrument_count = items
507            .iter()
508            .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
509            .count();
510        assert!(
511            instrument_count > 0,
512            "should parse instruments from market definition"
513        );
514        assert_eq!(loader.instruments().len(), instrument_count);
515
516        let has_sequence = items
517            .iter()
518            .any(|i| matches!(i, BetfairDataItem::SequenceCompleted(_)));
519        assert!(has_sequence, "should emit SequenceCompleted");
520
521        std::fs::remove_file(&tmp_file).ok();
522    }
523
524    #[rstest]
525    fn test_load_mcm_with_book_data() {
526        let sub_image = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
527        let update = compact_json(&load_test_json("stream/mcm_UPDATE.json"));
528
529        let tmp_dir = std::env::temp_dir().join("betfair_test");
530        std::fs::create_dir_all(&tmp_dir).unwrap();
531        let tmp_file = tmp_dir.join("test_book_data.json");
532        std::fs::write(&tmp_file, format!("{sub_image}\n{update}")).unwrap();
533
534        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
535        let items = loader.load(&tmp_file).unwrap();
536
537        let deltas_count = items
538            .iter()
539            .filter(|i| matches!(i, BetfairDataItem::Deltas(_)))
540            .count();
541        assert!(deltas_count > 0, "should parse book deltas");
542
543        std::fs::remove_file(&tmp_file).ok();
544    }
545
546    #[rstest]
547    fn test_load_instruments_only() {
548        let sub_image = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
549        let update = compact_json(&load_test_json("stream/mcm_UPDATE.json"));
550
551        let tmp_dir = std::env::temp_dir().join("betfair_test");
552        std::fs::create_dir_all(&tmp_dir).unwrap();
553        let tmp_file = tmp_dir.join("test_instruments_only.json");
554        std::fs::write(&tmp_file, format!("{sub_image}\n{update}")).unwrap();
555
556        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
557        let instruments = loader.load_instruments(&tmp_file).unwrap();
558
559        assert!(!instruments.is_empty(), "should find instruments");
560        assert_eq!(loader.instruments().len(), instruments.len());
561
562        std::fs::remove_file(&tmp_file).ok();
563    }
564
565    #[rstest]
566    fn test_reset_clears_state() {
567        let data = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
568        let tmp_dir = std::env::temp_dir().join("betfair_test");
569        std::fs::create_dir_all(&tmp_dir).unwrap();
570        let tmp_file = tmp_dir.join("test_reset.json");
571        std::fs::write(&tmp_file, &data).unwrap();
572
573        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
574        loader.load(&tmp_file).unwrap();
575        assert!(!loader.instruments().is_empty());
576
577        loader.reset();
578        assert!(loader.instruments().is_empty());
579        assert!(loader.traded_volumes.is_empty());
580
581        std::fs::remove_file(&tmp_file).ok();
582    }
583
584    #[rstest]
585    fn test_load_bsp_data() {
586        let raw = load_test_json("stream/mcm_BSP.json");
587        let messages: Vec<serde_json::Value> = serde_json::from_str(&raw).unwrap();
588        let lines: Vec<String> = messages
589            .iter()
590            .map(|v| serde_json::to_string(v).unwrap())
591            .collect();
592
593        let tmp_dir = std::env::temp_dir().join("betfair_test");
594        std::fs::create_dir_all(&tmp_dir).unwrap();
595        let tmp_file = tmp_dir.join("test_bsp.json");
596        std::fs::write(&tmp_file, lines.join("\n")).unwrap();
597
598        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
599        let items = loader.load(&tmp_file).unwrap();
600
601        let bsp_count = items
602            .iter()
603            .filter(|i| matches!(i, BetfairDataItem::BspBookDelta(_)))
604            .count();
605        assert!(bsp_count > 0, "should parse BSP book deltas");
606
607        std::fs::remove_file(&tmp_file).ok();
608    }
609
610    #[rstest]
611    fn test_load_market_definition_with_traded_volumes() {
612        let sub_image = compact_json(&load_test_json("stream/mcm_SUB_IMAGE.json"));
613        let update_tv = compact_json(&load_test_json("stream/mcm_UPDATE_tv.json"));
614
615        let tmp_dir = std::env::temp_dir().join("betfair_test");
616        std::fs::create_dir_all(&tmp_dir).unwrap();
617        let tmp_file = tmp_dir.join("test_tv.json");
618        std::fs::write(&tmp_file, format!("{sub_image}\n{update_tv}")).unwrap();
619
620        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
621        let items = loader.load(&tmp_file).unwrap();
622
623        let ticker_count = items
624            .iter()
625            .filter(|i| matches!(i, BetfairDataItem::Ticker(_)))
626            .count();
627        assert!(ticker_count > 0, "should parse ticker data from tv updates");
628
629        std::fs::remove_file(&tmp_file).ok();
630    }
631
632    #[rstest]
633    #[ignore] // Requires user-fetched data in tests/test_data/local/betfair/
634    fn test_load_match_odds_file() {
635        let filepath = local_data_dir().join("1.253378068.gz");
636        if !filepath.exists() {
637            eprintln!("Skipping: {filepath:?} not found");
638            return;
639        }
640
641        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
642        let items = loader.load(&filepath).unwrap();
643
644        let instrument_count = items
645            .iter()
646            .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
647            .count();
648        let deltas_count = items
649            .iter()
650            .filter(|i| matches!(i, BetfairDataItem::Deltas(_)))
651            .count();
652        let trade_count = items
653            .iter()
654            .filter(|i| matches!(i, BetfairDataItem::Trade(_)))
655            .count();
656        let close_count = items
657            .iter()
658            .filter(|i| matches!(i, BetfairDataItem::InstrumentClose(_)))
659            .count();
660
661        println!(
662            "Match odds file: {instrument_count} instruments, {deltas_count} deltas, {trade_count} trades, {close_count} closes"
663        );
664        println!("Total items: {}", items.len());
665
666        // 3 runners (home/draw/away), emitted on each market definition
667        assert!(instrument_count >= 3, "expected at least 3 instruments");
668        assert!(deltas_count > 0, "expected book deltas");
669        assert!(trade_count > 0, "expected trade ticks");
670        assert!(close_count > 0, "expected instrument closes at settlement");
671
672        // Winner should be runner 2426
673        let closes: Vec<_> = items
674            .iter()
675            .filter_map(|i| match i {
676                BetfairDataItem::InstrumentClose(c) => Some(c),
677                _ => None,
678            })
679            .collect();
680        let winner = closes.iter().find(|c| c.close_price == Price::from("1.00"));
681        assert!(winner.is_some(), "expected a winner with close_price 1.00");
682        assert!(
683            winner
684                .unwrap()
685                .instrument_id
686                .symbol
687                .as_str()
688                .contains("2426"),
689            "winner should be runner 2426"
690        );
691    }
692
693    #[rstest]
694    #[ignore] // Requires user-fetched data in tests/test_data/local/betfair/
695    fn test_load_racing_win_file() {
696        let filepath = local_data_dir().join("1.245077076.gz");
697        if !filepath.exists() {
698            eprintln!("Skipping: {filepath:?} not found");
699            return;
700        }
701
702        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
703        let items = loader.load(&filepath).unwrap();
704
705        let instrument_count = items
706            .iter()
707            .filter(|i| matches!(i, BetfairDataItem::Instrument(_)))
708            .count();
709        let deltas_count = items
710            .iter()
711            .filter(|i| matches!(i, BetfairDataItem::Deltas(_)))
712            .count();
713        let trade_count = items
714            .iter()
715            .filter(|i| matches!(i, BetfairDataItem::Trade(_)))
716            .count();
717        let close_count = items
718            .iter()
719            .filter(|i| matches!(i, BetfairDataItem::InstrumentClose(_)))
720            .count();
721
722        println!(
723            "Racing file: {instrument_count} instruments, {deltas_count} deltas, {trade_count} trades, {close_count} closes"
724        );
725        println!("Total items: {}", items.len());
726
727        // 6 runners (though 2 removed during the race)
728        assert!(instrument_count >= 6, "expected at least 6 instruments");
729        assert!(deltas_count > 0, "expected book deltas");
730        assert!(trade_count > 0, "expected trade ticks");
731        assert!(close_count > 0, "expected instrument closes at settlement");
732
733        // Winner should be runner 75925986
734        let closes: Vec<_> = items
735            .iter()
736            .filter_map(|i| match i {
737                BetfairDataItem::InstrumentClose(c) => Some(c),
738                _ => None,
739            })
740            .collect();
741        let winner = closes.iter().find(|c| c.close_price == Price::from("1.00"));
742        assert!(winner.is_some(), "expected a winner with close_price 1.00");
743        assert!(
744            winner
745                .unwrap()
746                .instrument_id
747                .symbol
748                .as_str()
749                .contains("75925986"),
750            "winner should be runner 75925986"
751        );
752    }
753
754    fn write_tmp(contents: &str, name: &str) -> PathBuf {
755        let tmp_dir = std::env::temp_dir().join("betfair_test");
756        std::fs::create_dir_all(&tmp_dir).unwrap();
757        let tmp_file = tmp_dir.join(name);
758        std::fs::write(&tmp_file, contents).unwrap();
759        tmp_file
760    }
761
762    fn find_first(
763        items: &[BetfairDataItem],
764        pred: impl Fn(&BetfairDataItem) -> bool,
765    ) -> Option<usize> {
766        items.iter().position(pred)
767    }
768
769    fn find_last(
770        items: &[BetfairDataItem],
771        pred: impl Fn(&BetfairDataItem) -> bool,
772    ) -> Option<usize> {
773        items.iter().rposition(pred)
774    }
775
776    /// Split the loader output into per-MCM slices using `SequenceCompleted`
777    /// as the delimiter. Each MCM ends with one `SequenceCompleted` item.
778    fn partition_by_mcm(items: &[BetfairDataItem]) -> Vec<&[BetfairDataItem]> {
779        let mut partitions = Vec::new();
780        let mut start = 0;
781
782        for (i, item) in items.iter().enumerate() {
783            if matches!(item, BetfairDataItem::SequenceCompleted(_)) {
784                partitions.push(&items[start..=i]);
785                start = i + 1;
786            }
787        }
788
789        partitions
790    }
791
792    #[rstest]
793    fn test_load_emits_instrument_before_status_and_close() {
794        // The loader must emit `Instrument` events before any `InstrumentStatus`
795        // or `InstrumentClose` in the same MCM so downstream consumers (e.g. the
796        // backtest exchange) have the instrument cached before lifecycle events
797        // are processed.
798        let data = compact_json(&load_test_json("stream/mcm_UPDATE_md.json"));
799        let tmp_file = write_tmp(&data, "test_order_instrument_first.json");
800
801        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
802        let items = loader.load(&tmp_file).unwrap();
803
804        let instrument_idx =
805            find_first(&items, |i| matches!(i, BetfairDataItem::Instrument(_))).unwrap();
806        let status_idx = find_first(&items, |i| matches!(i, BetfairDataItem::Status(_))).unwrap();
807
808        assert!(
809            instrument_idx < status_idx,
810            "Instrument (idx {instrument_idx}) must precede Status (idx {status_idx})"
811        );
812
813        std::fs::remove_file(&tmp_file).ok();
814    }
815
816    #[rstest]
817    fn test_load_emits_instrument_before_close() {
818        // Synthetic fixture: market CLOSED with terminal runner statuses so that
819        // both Instrument and InstrumentClose are emitted within the same MCM.
820        // Instrument must appear first.
821        let mcm = r#"{"op":"mcm","id":1,"pt":1627617202953,"ct":"SUB_IMAGE","mc":[{"id":"1.1","marketDefinition":{"bspMarket":false,"turnInPlayEnabled":false,"persistenceEnabled":false,"marketBaseRate":5,"eventId":"1","eventTypeId":"1","numberOfWinners":1,"bettingType":"ODDS","marketType":"WIN","marketTime":"2021-07-30T03:55:00.000Z","bspReconciled":true,"complete":true,"inPlay":false,"crossMatching":false,"runnersVoidable":false,"numberOfActiveRunners":0,"betDelay":0,"status":"CLOSED","runners":[{"status":"WINNER","sortPriority":1,"id":101},{"status":"LOSER","sortPriority":2,"id":102}],"regulators":["MR_INT"],"discountAllowed":true,"timezone":"UTC","openDate":"2021-07-30T02:45:00.000Z","version":1,"priceLadderDefinition":{"type":"CLASSIC"}}}]}"#;
822        let tmp_file = write_tmp(mcm, "test_order_instrument_before_close.json");
823
824        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
825        let items = loader.load(&tmp_file).unwrap();
826
827        let instrument_idx =
828            find_first(&items, |i| matches!(i, BetfairDataItem::Instrument(_))).unwrap();
829        let close_idx =
830            find_first(&items, |i| matches!(i, BetfairDataItem::InstrumentClose(_))).unwrap();
831
832        assert!(
833            instrument_idx < close_idx,
834            "Instrument (idx {instrument_idx}) must precede InstrumentClose (idx {close_idx})"
835        );
836
837        std::fs::remove_file(&tmp_file).ok();
838    }
839
840    #[rstest]
841    fn test_load_non_snapshot_deltas_tail_after_trades() {
842        // Non-snapshot runner updates must emit book deltas AFTER any trades or
843        // tickers parsed from the same message, matching the Python
844        // `market_change_to_updates` ordering and keeping live/backtest in step.
845        let data = compact_json(&load_test_json("stream/mcm_live_UPDATE.json"));
846        let tmp_file = write_tmp(&data, "test_order_deltas_tail.json");
847
848        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
849        let items = loader.load(&tmp_file).unwrap();
850
851        // Fixture is a single non-snapshot MCM (img=None) with trd + atl on one rc
852        let last_trade_idx = find_last(&items, |i| matches!(i, BetfairDataItem::Trade(_))).unwrap();
853        let first_deltas_idx =
854            find_first(&items, |i| matches!(i, BetfairDataItem::Deltas(_))).unwrap();
855
856        assert!(
857            first_deltas_idx > last_trade_idx,
858            "Deltas (first idx {first_deltas_idx}) must tail after Trade (last idx {last_trade_idx}) on non-snapshot updates"
859        );
860
861        std::fs::remove_file(&tmp_file).ok();
862    }
863
864    #[rstest]
865    fn test_load_snapshot_deltas_emit_inline_before_trades() {
866        // Snapshot messages (mc.img=true) emit Clear+Add deltas inline per runner
867        // so consumers can apply the book state before any trades in the same
868        // MCM. This matches Python's inline-snapshot behaviour.
869        let data = compact_json(&load_test_json(
870            "stream/market_definition_runner_removed.json",
871        ));
872        let tmp_file = write_tmp(&data, "test_order_snapshot_inline.json");
873
874        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
875        let items = loader.load(&tmp_file).unwrap();
876
877        let first_deltas_idx =
878            find_first(&items, |i| matches!(i, BetfairDataItem::Deltas(_))).unwrap();
879        let first_trade_idx =
880            find_first(&items, |i| matches!(i, BetfairDataItem::Trade(_))).unwrap();
881
882        assert!(
883            first_deltas_idx < first_trade_idx,
884            "Snapshot Deltas (first idx {first_deltas_idx}) must emit before Trade (first idx {first_trade_idx})"
885        );
886
887        std::fs::remove_file(&tmp_file).ok();
888    }
889
890    #[rstest]
891    fn test_load_bsp_tails_after_book_deltas() {
892        // Within each MCM, BSP deltas must emit after all regular book deltas.
893        // Python flushes `book_updates` before `bsp_book_updates`; the Rust
894        // loader must do the same to preserve consumer ordering.
895        let raw = load_test_json("stream/mcm_BSP.json");
896        let messages: Vec<serde_json::Value> = serde_json::from_str(&raw).unwrap();
897        let lines: Vec<String> = messages
898            .iter()
899            .map(|v| serde_json::to_string(v).unwrap())
900            .collect();
901        let tmp_file = write_tmp(&lines.join("\n"), "test_order_bsp_tail.json");
902
903        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
904        let items = loader.load(&tmp_file).unwrap();
905
906        let partitions = partition_by_mcm(&items);
907        assert!(
908            !partitions.is_empty(),
909            "expected at least one MCM partition"
910        );
911
912        let mut checked_any = false;
913
914        for partition in partitions {
915            let last_deltas_idx = find_last(partition, |i| matches!(i, BetfairDataItem::Deltas(_)));
916            let first_bsp_idx =
917                find_first(partition, |i| matches!(i, BetfairDataItem::BspBookDelta(_)));
918
919            if let (Some(last_deltas), Some(first_bsp)) = (last_deltas_idx, first_bsp_idx) {
920                assert!(
921                    last_deltas < first_bsp,
922                    "BspBookDelta (first idx {first_bsp}) must tail after Deltas (last idx {last_deltas}) within the same MCM"
923                );
924                checked_any = true;
925            }
926        }
927
928        assert!(
929            checked_any,
930            "expected at least one MCM to contain both Deltas and BspBookDelta"
931        );
932
933        std::fs::remove_file(&tmp_file).ok();
934    }
935
936    #[rstest]
937    fn test_load_emits_close_for_removed_runner_while_market_open() {
938        // Removed runners must fire InstrumentClose as soon as the market
939        // definition reports the Removed status, regardless of whether the
940        // market as a whole is still Open. This matches the Python parser.
941        let mcm = r#"{"op":"mcm","id":1,"pt":1627617202953,"ct":"SUB_IMAGE","mc":[{"id":"1.2","marketDefinition":{"bspMarket":false,"turnInPlayEnabled":false,"persistenceEnabled":false,"marketBaseRate":5,"eventId":"1","eventTypeId":"1","numberOfWinners":1,"bettingType":"ODDS","marketType":"WIN","marketTime":"2021-07-30T03:55:00.000Z","bspReconciled":false,"complete":true,"inPlay":false,"crossMatching":false,"runnersVoidable":false,"numberOfActiveRunners":1,"betDelay":0,"status":"OPEN","runners":[{"status":"ACTIVE","sortPriority":1,"id":201},{"status":"REMOVED","sortPriority":2,"id":202}],"regulators":["MR_INT"],"discountAllowed":true,"timezone":"UTC","openDate":"2021-07-30T02:45:00.000Z","version":1,"priceLadderDefinition":{"type":"CLASSIC"}}}]}"#;
942        let tmp_file = write_tmp(mcm, "test_order_close_for_removed.json");
943
944        let mut loader = BetfairDataLoader::new(Currency::GBP(), None);
945        let items = loader.load(&tmp_file).unwrap();
946
947        let closes: Vec<_> = items
948            .iter()
949            .filter_map(|i| match i {
950                BetfairDataItem::InstrumentClose(c) => Some(c),
951                _ => None,
952            })
953            .collect();
954
955        assert_eq!(
956            closes.len(),
957            1,
958            "Removed runner must produce exactly one InstrumentClose while market is Open"
959        );
960        assert!(
961            closes[0].instrument_id.symbol.as_str().contains("202"),
962            "close must target the removed runner (selection id 202)"
963        );
964
965        std::fs::remove_file(&tmp_file).ok();
966    }
967}