Skip to main content

nautilus_betfair/stream/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
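//! Parsing utilities that convert Betfair stream messages into Nautilus domain models.
//!
//! MCM (Market Change Messages) are parsed into order book deltas, trade ticks,
//! instrument status updates and instrument closes, plus the Betfair-specific
//! ticker, starting price and BSP book delta data types. OCM (Order Change
//! Messages) are parsed into order status reports and fill reports. Race change
//! data is parsed into the Betfair race runner and race progress data types.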
21
22use ahash::{AHashMap, AHashSet};
23use nautilus_core::UnixNanos;
24use nautilus_model::{
25    data::{
26        BookOrder, InstrumentClose, InstrumentStatus, OrderBookDelta, OrderBookDeltas, TradeTick,
27    },
28    enums::{
29        AggressorSide, BookAction, InstrumentCloseType, LiquiditySide, MarketStatusAction,
30        OrderSide, OrderType, RecordFlag, TimeInForce,
31    },
32    identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
33    reports::{FillReport, OrderStatusReport},
34    types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::Decimal;
37
38use crate::{
39    common::{
40        consts::{BETFAIR_PRICE_PRECISION, BETFAIR_QUANTITY_PRECISION},
41        enums::{MarketStatus, RunnerStatus, StreamingOrderStatus, resolve_streaming_order_status},
42        parse::{make_instrument_id, parse_millis_timestamp},
43    },
44    data_types::{
45        BetfairBspBookDelta, BetfairRaceProgress, BetfairRaceRunnerData, BetfairStartingPrice,
46        BetfairTicker,
47    },
48    stream::messages::{
49        MarketDefinition, RaceProgressChange, RaceRunnerChange, RunnerChange, UnmatchedOrder,
50    },
51};
52
53/// Parses a single runner's book data into [`OrderBookDeltas`].
54///
55/// Handles both full image snapshots (`is_snapshot = true`) and delta updates.
56///
57/// Only processes price-keyed fields (`atb`/`atl`). Level-indexed fields
58/// (`batb`/`batl`/`bdatb`/`bdatl`) are ignored because correct delta
59/// processing requires stateful level-to-price tracking. The stream
60/// subscription should use `EX_ALL_OFFERS` which populates `atb`/`atl`.
61///
62/// Book side mapping (Betfair exchange convention):
63/// - `atb` (available to back) -> [`OrderSide::Buy`] (bid side)
64/// - `atl` (available to lay) -> [`OrderSide::Sell`] (ask side)
65///
66/// For snapshots: emits a Clear delta followed by Add deltas for each level.
67/// For updates: emits Update or Delete (when volume is zero) deltas.
68///
69/// Returns `Ok(None)` if the runner change contains no processable book data.
70///
71/// # Errors
72///
73/// Returns an error if price or quantity values cannot be converted.
74pub fn parse_runner_book_deltas(
75    instrument_id: InstrumentId,
76    rc: &RunnerChange,
77    is_snapshot: bool,
78    sequence: u64,
79    ts_event: UnixNanos,
80    ts_init: UnixNanos,
81) -> anyhow::Result<Option<OrderBookDeltas>> {
82    let atb_len = rc.atb.as_ref().map_or(0, Vec::len);
83    let atl_len = rc.atl.as_ref().map_or(0, Vec::len);
84    let total_levels = atb_len + atl_len;
85
86    if total_levels == 0 && !is_snapshot {
87        return Ok(None);
88    }
89
90    let snapshot_flags = if is_snapshot {
91        RecordFlag::F_SNAPSHOT as u8
92    } else {
93        0
94    };
95    let mut deltas = Vec::with_capacity(total_levels + usize::from(is_snapshot));
96
97    if is_snapshot {
98        let mut clear = OrderBookDelta::clear(instrument_id, sequence, ts_event, ts_init);
99
100        if total_levels == 0 {
101            clear.flags |= RecordFlag::F_LAST as u8;
102        }
103        deltas.push(clear);
104    }
105
106    // Buy side (bid): atb (available to back) is price-keyed
107    for pv in rc.atb.as_deref().unwrap_or(&[]) {
108        let action = if is_snapshot {
109            BookAction::Add
110        } else if pv.volume == Decimal::ZERO {
111            BookAction::Delete
112        } else {
113            BookAction::Update
114        };
115
116        deltas.push(OrderBookDelta::new(
117            instrument_id,
118            action,
119            BookOrder::new(
120                OrderSide::Buy,
121                Price::from_decimal_dp(pv.price, BETFAIR_PRICE_PRECISION)?,
122                Quantity::from_decimal_dp(pv.volume, BETFAIR_QUANTITY_PRECISION)?,
123                0,
124            ),
125            snapshot_flags,
126            sequence,
127            ts_event,
128            ts_init,
129        ));
130    }
131
132    // Sell side (ask): atl (available to lay) is price-keyed
133    for pv in rc.atl.as_deref().unwrap_or(&[]) {
134        let action = if is_snapshot {
135            BookAction::Add
136        } else if pv.volume == Decimal::ZERO {
137            BookAction::Delete
138        } else {
139            BookAction::Update
140        };
141
142        deltas.push(OrderBookDelta::new(
143            instrument_id,
144            action,
145            BookOrder::new(
146                OrderSide::Sell,
147                Price::from_decimal_dp(pv.price, BETFAIR_PRICE_PRECISION)?,
148                Quantity::from_decimal_dp(pv.volume, BETFAIR_QUANTITY_PRECISION)?,
149                0,
150            ),
151            snapshot_flags,
152            sequence,
153            ts_event,
154            ts_init,
155        ));
156    }
157
158    // Set F_LAST on the final delta
159    if let Some(last) = deltas.last_mut() {
160        last.flags |= RecordFlag::F_LAST as u8;
161    }
162
163    Ok(Some(OrderBookDeltas::new(instrument_id, deltas)))
164}
165
166/// Creates a [`TradeTick`] from stream data.
167///
168/// Betfair does not identify the aggressor side in its stream, so
169/// [`AggressorSide::NoAggressor`] is always used.
170#[must_use]
171pub fn make_trade_tick(
172    instrument_id: InstrumentId,
173    price: Price,
174    size: Quantity,
175    trade_id: TradeId,
176    ts_event: UnixNanos,
177    ts_init: UnixNanos,
178) -> TradeTick {
179    TradeTick::new(
180        instrument_id,
181        price,
182        size,
183        AggressorSide::NoAggressor,
184        trade_id,
185        ts_event,
186        ts_init,
187    )
188}
189
190/// Produces per-runner [`InstrumentStatus`] events from a market definition.
191///
192/// Iterates `def.runners` and maps each runner's lifecycle to a Nautilus status.
193/// Scratched runners (`Removed`, `RemovedVacant`) close immediately regardless
194/// of market-level state. The `in_play` flag distinguishes pre-open (Open + not
195/// in play) from active trading (Open + in play).
196///
197/// Returns an empty vector when `def.status` or `def.runners` is missing.
198#[must_use]
199pub fn parse_instrument_statuses(
200    market_id: &str,
201    def: &MarketDefinition,
202    ts_event: UnixNanos,
203    ts_init: UnixNanos,
204) -> Vec<InstrumentStatus> {
205    let Some(status) = def.status else {
206        return Vec::new();
207    };
208    let Some(runners) = &def.runners else {
209        return Vec::new();
210    };
211    let in_play = def.in_play.unwrap_or(false);
212
213    runners
214        .iter()
215        .map(|rd| {
216            let handicap = rd.hc.unwrap_or(Decimal::ZERO);
217            let instrument_id = make_instrument_id(market_id, rd.id, handicap);
218            let action = match rd.status {
219                Some(RunnerStatus::Removed | RunnerStatus::RemovedVacant) => {
220                    MarketStatusAction::Close
221                }
222                _ => match (status, in_play) {
223                    (MarketStatus::Inactive, _) => MarketStatusAction::Close,
224                    (MarketStatus::Open, false) => MarketStatusAction::PreOpen,
225                    (MarketStatus::Open, true) => MarketStatusAction::Trading,
226                    (MarketStatus::Suspended, _) => MarketStatusAction::Pause,
227                    (MarketStatus::Closed, _) => MarketStatusAction::Close,
228                },
229            };
230            let is_trading = matches!(status, MarketStatus::Open)
231                && in_play
232                && !matches!(
233                    rd.status,
234                    Some(RunnerStatus::Removed | RunnerStatus::RemovedVacant)
235                );
236            InstrumentStatus::new(
237                instrument_id,
238                action,
239                ts_event,
240                ts_init,
241                None,
242                None,
243                Some(is_trading),
244                None,
245                None,
246            )
247        })
248        .collect()
249}
250
251/// Generates a deterministic [`TradeId`] for a Betfair fill.
252///
253/// Uses `bet_id` and cumulative `sm` (size matched) which together uniquely
254/// identify each fill state, since `sm` increases monotonically with each fill.
255pub fn make_trade_id(uo: &UnmatchedOrder) -> TradeId {
256    let sm = uo.sm.unwrap_or(Decimal::ZERO);
257    TradeId::new(format!("{}-{sm}", uo.id))
258}
259
260/// Tracks cumulative fill state per bet to compute incremental fills from the
261/// Betfair OCM stream.
262///
263/// Betfair provides cumulative `sm` (size matched) and `avp` (average price
264/// matched) on each order update. This tracker maintains per-bet state to
265/// derive individual fill quantities and prices for each update.
266#[derive(Debug, Default)]
267pub struct FillTracker {
268    filled_qty: AHashMap<String, Decimal>,
269    avg_px: AHashMap<String, Decimal>,
270    published_trade_ids: AHashSet<String>,
271}
272
273impl FillTracker {
274    /// Creates a new [`FillTracker`] instance.
275    #[must_use]
276    pub fn new() -> Self {
277        Self::default()
278    }
279
280    /// Computes an incremental [`FillReport`] for an unmatched order update.
281    ///
282    /// Returns `None` if no new fill occurred (size matched unchanged,
283    /// duplicate trade ID, or overfill detected).
284    #[expect(clippy::too_many_arguments)]
285    pub fn maybe_fill_report(
286        &mut self,
287        uo: &UnmatchedOrder,
288        order_qty: Decimal,
289        instrument_id: InstrumentId,
290        account_id: AccountId,
291        currency: Currency,
292        ts_event: UnixNanos,
293        ts_init: UnixNanos,
294    ) -> Option<FillReport> {
295        let sm = uo.sm?;
296
297        if sm <= Decimal::ZERO {
298            return None;
299        }
300
301        let prev_filled = self
302            .filled_qty
303            .get(&uo.id)
304            .copied()
305            .unwrap_or(Decimal::ZERO);
306
307        if sm <= prev_filled {
308            return None;
309        }
310
311        let order_qty = resolve_stream_order_quantity(order_qty, uo);
312
313        // Overfill guard
314        if sm > order_qty {
315            log::warn!(
316                "Rejecting potential overfill for bet_id={}: order_qty={order_qty}, sm={sm}",
317                uo.id,
318            );
319            return None;
320        }
321
322        let trade_id = make_trade_id(uo);
323
324        if self.published_trade_ids.contains(trade_id.as_str()) {
325            return None;
326        }
327
328        let fill_qty_dec = sm - prev_filled;
329        let fill_price = self.compute_fill_price(uo, prev_filled);
330
331        let last_qty = Quantity::from_decimal_dp(fill_qty_dec, BETFAIR_QUANTITY_PRECISION).ok()?;
332        let last_px = Price::from_decimal_dp(fill_price, BETFAIR_PRICE_PRECISION).ok()?;
333
334        // Update state before emitting
335        self.filled_qty.insert(uo.id.clone(), sm);
336
337        if let Some(avp) = uo.avp {
338            self.avg_px.insert(uo.id.clone(), avp);
339        }
340
341        self.published_trade_ids.insert(trade_id.to_string());
342
343        let venue_order_id = VenueOrderId::from(uo.id.as_str());
344        let order_side = OrderSide::from(uo.side);
345        let client_order_id = uo.rfo.as_deref().map(ClientOrderId::from);
346        let ts_fill = uo.md.map_or(ts_event, parse_millis_timestamp);
347
348        Some(make_fill_report(
349            account_id,
350            instrument_id,
351            venue_order_id,
352            trade_id,
353            order_side,
354            last_qty,
355            last_px,
356            currency,
357            client_order_id,
358            ts_fill,
359            ts_init,
360        ))
361    }
362
363    /// Back-calculates the individual fill price from Betfair's cumulative
364    /// average price matched (`avp`).
365    ///
366    /// For the first fill, the average price IS the fill price. For subsequent
367    /// fills, the individual price is derived from:
368    /// `fill_price = (avp * sm - prev_avp * prev_sm) / fill_size`
369    fn compute_fill_price(&self, uo: &UnmatchedOrder, prev_filled: Decimal) -> Decimal {
370        let Some(avp) = uo.avp else {
371            return uo.p;
372        };
373
374        if prev_filled == Decimal::ZERO {
375            return avp;
376        }
377
378        let Some(prev_avg) = self.avg_px.get(&uo.id).copied() else {
379            return avp;
380        };
381
382        if prev_avg == avp {
383            return avp;
384        }
385
386        let sm = uo.sm.unwrap_or(Decimal::ZERO);
387        let fill_size = sm - prev_filled;
388
389        if fill_size == Decimal::ZERO {
390            return prev_avg;
391        }
392
393        let fill_price = (avp * sm - prev_avg * prev_filled) / fill_size;
394
395        if fill_price <= Decimal::ZERO {
396            log::warn!(
397                "Calculated fill price {fill_price} is invalid for bet_id={}, falling back to avp={avp}",
398                uo.id,
399            );
400            return avp;
401        }
402
403        fill_price
404    }
405
406    /// Pre-populates state for a bet from existing order data.
407    ///
408    /// Called during reconnect sync so that the first stream update
409    /// computes a correct incremental fill instead of treating the
410    /// cumulative matched size as a new fill.
411    pub fn sync_order(&mut self, bet_id: &str, filled_qty: Decimal, avg_px: Decimal) {
412        self.filled_qty.insert(bet_id.to_string(), filled_qty);
413
414        if avg_px > Decimal::ZERO {
415            self.avg_px.insert(bet_id.to_string(), avg_px);
416        }
417    }
418
419    /// Removes state for a completed bet to prevent unbounded growth.
420    pub fn prune(&mut self, bet_id: &str) {
421        self.filled_qty.remove(bet_id);
422        self.avg_px.remove(bet_id);
423
424        let prefix = format!("{bet_id}-");
425        self.published_trade_ids
426            .retain(|id| !id.starts_with(&prefix));
427    }
428}
429
430/// Returns `true` if the unmatched order has cancel, lapse, or void quantities.
431#[must_use]
432pub fn has_cancel_quantity(uo: &UnmatchedOrder) -> bool {
433    let sc = uo.sc.unwrap_or(Decimal::ZERO);
434    let sl = uo.sl.unwrap_or(Decimal::ZERO);
435    let sv = uo.sv.unwrap_or(Decimal::ZERO);
436    (sc + sl + sv) > Decimal::ZERO
437}
438
439/// Returns `true` if the order is execution-complete and has lapsed.
440#[must_use]
441pub fn is_lapsed(uo: &UnmatchedOrder) -> bool {
442    uo.status == StreamingOrderStatus::ExecutionComplete && uo.lsrc.is_some()
443}
444
445/// Parses a streaming [`UnmatchedOrder`] into a Nautilus [`OrderStatusReport`].
446///
447/// Resolves the Nautilus order status from the Betfair streaming status
448/// plus matched/cancelled quantities.
449///
450/// # Errors
451///
452/// Returns an error if price or quantity values cannot be converted.
453pub fn parse_order_status_report(
454    uo: &UnmatchedOrder,
455    instrument_id: InstrumentId,
456    account_id: AccountId,
457    ts_event: UnixNanos,
458    ts_init: UnixNanos,
459) -> anyhow::Result<OrderStatusReport> {
460    let order_side = OrderSide::from(uo.side);
461    let order_type = OrderType::from(uo.ot);
462    let time_in_force = parse_stream_time_in_force(uo)?;
463
464    let size_matched = uo.sm.unwrap_or(Decimal::ZERO);
465    let size_cancelled = uo.sc.unwrap_or(Decimal::ZERO);
466    let size_lapsed = uo.sl.unwrap_or(Decimal::ZERO);
467    let size_voided = uo.sv.unwrap_or(Decimal::ZERO);
468
469    // Include lapsed/voided in the closed quantity for status resolution
470    let size_closed = size_cancelled + size_lapsed + size_voided;
471    let order_status = resolve_streaming_order_status(uo.status, size_matched, size_closed);
472
473    let quantity_decimal = stream_order_quantity(uo);
474    anyhow::ensure!(
475        quantity_decimal > Decimal::ZERO,
476        "failed to resolve positive quantity for stream order update {} \
477         (order_type={:?}, persistence_type={:?}, size={}, bsp_liability={:?}, \
478         size_matched={:?}, size_remaining={:?}, size_cancelled={:?}, size_lapsed={:?}, size_voided={:?})",
479        uo.id,
480        uo.ot,
481        uo.pt,
482        uo.s,
483        uo.bsp,
484        uo.sm,
485        uo.sr,
486        uo.sc,
487        uo.sl,
488        uo.sv,
489    );
490    let quantity = Quantity::from_decimal_dp(quantity_decimal, BETFAIR_QUANTITY_PRECISION)?;
491    let filled_qty = Quantity::from_decimal_dp(size_matched, BETFAIR_QUANTITY_PRECISION)?;
492
493    let ts_accepted = parse_millis_timestamp(uo.pd);
494
495    // Use the latest lifecycle timestamp, falling back to OCM publish time
496    let ts_last = [uo.md, uo.cd, uo.ld]
497        .into_iter()
498        .flatten()
499        .max()
500        .map_or(ts_event, parse_millis_timestamp);
501
502    let venue_order_id = VenueOrderId::from(uo.id.as_str());
503    let client_order_id = uo.rfo.as_deref().map(ClientOrderId::from);
504
505    let price = Price::from_decimal_dp(uo.p, BETFAIR_PRICE_PRECISION)?;
506
507    let mut report = OrderStatusReport::new(
508        account_id,
509        instrument_id,
510        client_order_id,
511        venue_order_id,
512        order_side,
513        order_type,
514        time_in_force,
515        order_status,
516        quantity,
517        filled_qty,
518        ts_accepted,
519        ts_last,
520        ts_init,
521        None,
522    )
523    .with_price(price);
524
525    report.avg_px = uo.avp;
526    if let Some(lsrc) = uo.lsrc {
527        report.cancel_reason = Some(lsrc.to_string());
528    }
529
530    Ok(report)
531}
532
533fn parse_stream_time_in_force(uo: &UnmatchedOrder) -> anyhow::Result<TimeInForce> {
534    match uo.pt {
535        Some(persistence_type) => Ok(TimeInForce::from(persistence_type)),
536        None if matches!(
537            uo.ot,
538            crate::common::enums::StreamingOrderType::LimitOnClose
539                | crate::common::enums::StreamingOrderType::MarketOnClose
540        ) =>
541        {
542            Ok(TimeInForce::AtTheClose)
543        }
544        None => anyhow::bail!("missing persistence type for order update {}", uo.id),
545    }
546}
547
548fn stream_order_quantity(uo: &UnmatchedOrder) -> Decimal {
549    if uo.s > Decimal::ZERO {
550        return uo.s;
551    }
552
553    let lifecycle_qty = uo.sm.unwrap_or(Decimal::ZERO)
554        + uo.sr.unwrap_or(Decimal::ZERO)
555        + uo.sc.unwrap_or(Decimal::ZERO)
556        + uo.sl.unwrap_or(Decimal::ZERO)
557        + uo.sv.unwrap_or(Decimal::ZERO);
558
559    if lifecycle_qty > Decimal::ZERO {
560        return lifecycle_qty;
561    }
562
563    if uses_liability_based_stream_quantity(uo) {
564        return uo.bsp.unwrap_or(Decimal::ZERO);
565    }
566
567    Decimal::ZERO
568}
569
570fn resolve_stream_order_quantity(order_qty: Decimal, uo: &UnmatchedOrder) -> Decimal {
571    if order_qty > Decimal::ZERO {
572        order_qty
573    } else {
574        stream_order_quantity(uo)
575    }
576}
577
578fn uses_liability_based_stream_quantity(uo: &UnmatchedOrder) -> bool {
579    matches!(
580        uo.ot,
581        crate::common::enums::StreamingOrderType::LimitOnClose
582            | crate::common::enums::StreamingOrderType::MarketOnClose
583    )
584}
585
586/// Creates a [`FillReport`] for a Betfair order fill.
587///
588/// Betfair charges commission on net winnings, not per-fill, so commission
589/// is set to zero. The `liquidity_side` is unknown from the stream.
590#[must_use]
591#[expect(clippy::too_many_arguments)]
592pub fn make_fill_report(
593    account_id: AccountId,
594    instrument_id: InstrumentId,
595    venue_order_id: VenueOrderId,
596    trade_id: TradeId,
597    order_side: OrderSide,
598    last_qty: Quantity,
599    last_px: Price,
600    currency: Currency,
601    client_order_id: Option<ClientOrderId>,
602    ts_event: UnixNanos,
603    ts_init: UnixNanos,
604) -> FillReport {
605    FillReport::new(
606        account_id,
607        instrument_id,
608        venue_order_id,
609        trade_id,
610        order_side,
611        last_qty,
612        last_px,
613        Money::new(0.0, currency),
614        LiquiditySide::NoLiquiditySide,
615        client_order_id,
616        None,
617        ts_event,
618        ts_init,
619        None,
620    )
621}
622
623/// Extracts a [`BetfairTicker`] from a runner change if any ticker fields are present.
624///
625/// Returns `None` when the runner change contains no ltp, tv, spn, or spf data.
626#[must_use]
627pub fn parse_betfair_ticker(
628    instrument_id: InstrumentId,
629    rc: &RunnerChange,
630    ts_event: UnixNanos,
631    ts_init: UnixNanos,
632) -> Option<BetfairTicker> {
633    if rc.ltp.is_none() && rc.tv.is_none() && rc.spn.is_none() && rc.spf.is_none() {
634        return None;
635    }
636
637    Some(BetfairTicker::new(
638        instrument_id,
639        rc.ltp.map_or(f64::NAN, |d| {
640            d.to_string().parse::<f64>().unwrap_or(f64::NAN)
641        }),
642        rc.tv.map_or(f64::NAN, |d| {
643            d.to_string().parse::<f64>().unwrap_or(f64::NAN)
644        }),
645        rc.spn.map_or(f64::NAN, |d| {
646            d.to_string().parse::<f64>().unwrap_or(f64::NAN)
647        }),
648        rc.spf.map_or(f64::NAN, |d| {
649            d.to_string().parse::<f64>().unwrap_or(f64::NAN)
650        }),
651        ts_event,
652        ts_init,
653    ))
654}
655
656/// Extracts [`BetfairStartingPrice`] values from a market definition's runners.
657///
658/// Returns one entry per runner that has a non-None BSP value.
659#[must_use]
660pub fn parse_betfair_starting_prices(
661    market_id: &str,
662    def: &MarketDefinition,
663    ts_event: UnixNanos,
664    ts_init: UnixNanos,
665) -> Vec<BetfairStartingPrice> {
666    let Some(runners) = &def.runners else {
667        return Vec::new();
668    };
669
670    runners
671        .iter()
672        .filter_map(|rd| {
673            let bsp = rd.bsp?;
674            let handicap = rd.hc.unwrap_or(Decimal::ZERO);
675            let instrument_id = make_instrument_id(market_id, rd.id, handicap);
676            let bsp_f64 = bsp.to_string().parse::<f64>().unwrap_or(f64::NAN);
677            Some(BetfairStartingPrice::new(
678                instrument_id,
679                bsp_f64,
680                ts_event,
681                ts_init,
682            ))
683        })
684        .collect()
685}
686
687/// Extracts BSP order book deltas from a runner change's `spb`/`spl` fields.
688///
689/// Returns an empty vec when neither `spb` nor `spl` data is present.
690/// The `side` field uses `OrderSide::Sell` for `spb` (back) and
691/// `OrderSide::Buy` for `spl` (lay), following Betfair's inverted convention.
692#[must_use]
693pub fn parse_bsp_book_deltas(
694    instrument_id: InstrumentId,
695    rc: &RunnerChange,
696    ts_event: UnixNanos,
697    ts_init: UnixNanos,
698) -> Vec<BetfairBspBookDelta> {
699    let spb_len = rc.spb.as_ref().map_or(0, Vec::len);
700    let spl_len = rc.spl.as_ref().map_or(0, Vec::len);
701
702    if spb_len + spl_len == 0 {
703        return Vec::new();
704    }
705
706    let mut result = Vec::with_capacity(spb_len + spl_len);
707
708    // spb (starting price back) -> Sell side (Betfair convention)
709    for pv in rc.spb.as_deref().unwrap_or(&[]) {
710        let action = if pv.volume == Decimal::ZERO {
711            BookAction::Delete as u32
712        } else {
713            BookAction::Update as u32
714        };
715
716        result.push(BetfairBspBookDelta::new(
717            instrument_id,
718            action,
719            OrderSide::Sell as u32,
720            pv.price.to_string().parse::<f64>().unwrap_or(f64::NAN),
721            pv.volume.to_string().parse::<f64>().unwrap_or(0.0),
722            ts_event,
723            ts_init,
724        ));
725    }
726
727    // spl (starting price lay) -> Buy side (Betfair convention)
728    for pv in rc.spl.as_deref().unwrap_or(&[]) {
729        let action = if pv.volume == Decimal::ZERO {
730            BookAction::Delete as u32
731        } else {
732            BookAction::Update as u32
733        };
734
735        result.push(BetfairBspBookDelta::new(
736            instrument_id,
737            action,
738            OrderSide::Buy as u32,
739            pv.price.to_string().parse::<f64>().unwrap_or(f64::NAN),
740            pv.volume.to_string().parse::<f64>().unwrap_or(0.0),
741            ts_event,
742            ts_init,
743        ));
744    }
745
746    result
747}
748
749/// Produces [`InstrumentClose`] events from a market definition's runner statuses.
750///
751/// Winners and placed runners get close price 1.0; losers and removed runners
752/// get close price 0.0. Active runners produce no close event.
753#[must_use]
754pub fn parse_instrument_closes(
755    market_id: &str,
756    def: &MarketDefinition,
757    ts_event: UnixNanos,
758    ts_init: UnixNanos,
759) -> Vec<InstrumentClose> {
760    let Some(runners) = &def.runners else {
761        return Vec::new();
762    };
763
764    runners
765        .iter()
766        .filter_map(|rd| {
767            let status = rd.status.as_ref()?;
768            let close_price = match status {
769                RunnerStatus::Winner | RunnerStatus::Placed => Price::from("1.00"),
770                RunnerStatus::Loser | RunnerStatus::Removed | RunnerStatus::RemovedVacant => {
771                    Price::from("0.00")
772                }
773                RunnerStatus::Active | RunnerStatus::Hidden => return None,
774            };
775
776            let handicap = rd.hc.unwrap_or(Decimal::ZERO);
777            let instrument_id = make_instrument_id(market_id, rd.id, handicap);
778
779            Some(InstrumentClose::new(
780                instrument_id,
781                close_price,
782                InstrumentCloseType::ContractExpired,
783                ts_event,
784                ts_init,
785            ))
786        })
787        .collect()
788}
789
790/// Parses a single [`RaceRunnerChange`] into a [`BetfairRaceRunnerData`].
791///
792/// Returns `None` if the runner change has no selection ID.
793#[must_use]
794pub fn parse_race_runner_data(
795    race_id: &str,
796    market_id: &str,
797    rrc: &RaceRunnerChange,
798    ts_event: UnixNanos,
799    ts_init: UnixNanos,
800) -> Option<BetfairRaceRunnerData> {
801    let selection_id = rrc.id?;
802
803    Some(BetfairRaceRunnerData::new(
804        race_id.to_string(),
805        market_id.to_string(),
806        selection_id,
807        rrc.lat.unwrap_or(f64::NAN),
808        rrc.lng.unwrap_or(f64::NAN),
809        rrc.spd.unwrap_or(f64::NAN),
810        rrc.prg.unwrap_or(f64::NAN),
811        rrc.sfq.unwrap_or(f64::NAN),
812        ts_event,
813        ts_init,
814    ))
815}
816
817/// Parses a [`RaceProgressChange`] into a [`BetfairRaceProgress`].
818#[must_use]
819pub fn parse_race_progress(
820    race_id: &str,
821    market_id: &str,
822    rpc: &RaceProgressChange,
823    ts_event: UnixNanos,
824    ts_init: UnixNanos,
825) -> BetfairRaceProgress {
826    let order_json = rpc
827        .ord
828        .as_ref()
829        .map(|v| serde_json::to_string(v).unwrap_or_default())
830        .unwrap_or_default();
831
832    let jumps_json = rpc
833        .jumps
834        .as_ref()
835        .map(|v| serde_json::to_string(v).unwrap_or_default())
836        .unwrap_or_default();
837
838    BetfairRaceProgress::new(
839        race_id.to_string(),
840        market_id.to_string(),
841        rpc.g.clone().unwrap_or_default(),
842        rpc.st.unwrap_or(f64::NAN),
843        rpc.rt.unwrap_or(f64::NAN),
844        rpc.spd.unwrap_or(f64::NAN),
845        rpc.prg.unwrap_or(f64::NAN),
846        order_json,
847        jumps_json,
848        ts_event,
849        ts_init,
850    )
851}
852
853#[cfg(test)]
854mod tests {
855    use nautilus_model::enums::{MarketStatusAction, OrderStatus, TimeInForce};
856    use rstest::rstest;
857
858    use super::*;
859    use crate::{
860        common::{
861            enums::{StreamingOrderType, StreamingPersistenceType, StreamingSide},
862            testing::load_test_json,
863        },
864        stream::messages::{PV, RunnerDefinition, StreamMessage, stream_decode},
865    };
866
867    #[rstest]
868    fn test_parse_runner_book_snapshot() {
869        let data = load_test_json("stream/mcm_live_IMAGE.json");
870        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
871
872        if let StreamMessage::MarketChange(mcm) = msg {
873            let mc = mcm.mc.as_ref().unwrap();
874            let change = &mc[0];
875            let rc = &change.rc.as_ref().unwrap()[0];
876            let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
877
878            let deltas = parse_runner_book_deltas(
879                instrument_id,
880                rc,
881                true,
882                mcm.pt,
883                parse_millis_timestamp(mcm.pt),
884                parse_millis_timestamp(mcm.pt),
885            )
886            .unwrap()
887            .expect("should produce deltas");
888
889            // Clear + atb levels + atl levels
890            let atb_len = rc.atb.as_ref().unwrap().len();
891            let atl_len = rc.atl.as_ref().unwrap().len();
892            assert_eq!(deltas.deltas.len(), 1 + atb_len + atl_len);
893
894            // First delta is Clear with F_SNAPSHOT
895            assert_eq!(deltas.deltas[0].action, BookAction::Clear);
896            assert!(RecordFlag::F_SNAPSHOT.matches(deltas.deltas[0].flags));
897
898            // Subsequent deltas are Add with F_SNAPSHOT
899            for delta in &deltas.deltas[1..] {
900                assert_eq!(delta.action, BookAction::Add);
901                assert!(RecordFlag::F_SNAPSHOT.matches(delta.flags));
902            }
903
904            // Last delta has F_LAST
905            let last = deltas.deltas.last().unwrap();
906            assert!(RecordFlag::F_LAST.matches(last.flags));
907
908            // Verify buy/sell sides
909            let buy_count = deltas
910                .deltas
911                .iter()
912                .filter(|d| d.order.side == OrderSide::Buy)
913                .count();
914            let sell_count = deltas
915                .deltas
916                .iter()
917                .filter(|d| d.order.side == OrderSide::Sell)
918                .count();
919            assert_eq!(buy_count, atb_len);
920            assert_eq!(sell_count, atl_len);
921        } else {
922            panic!("expected MarketChange");
923        }
924    }
925
926    #[rstest]
927    fn test_parse_runner_book_update() {
928        let data = load_test_json("stream/mcm_UPDATE.json");
929        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
930
931        if let StreamMessage::MarketChange(mcm) = msg {
932            let mc = mcm.mc.as_ref().unwrap();
933            let change = &mc[0];
934            let rc = &change.rc.as_ref().unwrap()[0];
935            let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
936
937            let deltas = parse_runner_book_deltas(
938                instrument_id,
939                rc,
940                false,
941                mcm.pt,
942                parse_millis_timestamp(mcm.pt),
943                parse_millis_timestamp(mcm.pt),
944            )
945            .unwrap()
946            .expect("should produce deltas");
947
948            // No Clear delta for updates
949            assert!(deltas.deltas.iter().all(|d| d.action != BookAction::Clear));
950
951            // Last delta has F_LAST
952            let last = deltas.deltas.last().unwrap();
953            assert!(RecordFlag::F_LAST.matches(last.flags));
954
955            // No snapshot flags
956            for delta in &deltas.deltas {
957                assert!(!RecordFlag::F_SNAPSHOT.matches(delta.flags));
958            }
959        } else {
960            panic!("expected MarketChange");
961        }
962    }
963
964    #[rstest]
965    fn test_parse_runner_book_update_zero_volume_is_delete() {
966        let data = load_test_json("stream/mcm_UPDATE.json");
967        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
968
969        if let StreamMessage::MarketChange(mcm) = msg {
970            let mc = mcm.mc.as_ref().unwrap();
971            let change = &mc[0];
972            let rc = &change.rc.as_ref().unwrap()[0];
973            let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
974
975            let deltas = parse_runner_book_deltas(
976                instrument_id,
977                rc,
978                false,
979                mcm.pt,
980                parse_millis_timestamp(mcm.pt),
981                parse_millis_timestamp(mcm.pt),
982            )
983            .unwrap()
984            .unwrap();
985
986            // atl has [[4.7, 0]] which should be Delete
987            assert!(
988                deltas.deltas.iter().any(|d| d.action == BookAction::Delete),
989                "zero volume should produce Delete action"
990            );
991        } else {
992            panic!("expected MarketChange");
993        }
994    }
995
996    #[rstest]
997    fn test_parse_runner_book_no_levels_returns_none() {
998        let rc = RunnerChange {
999            id: 12345,
1000            hc: None,
1001            atb: None,
1002            atl: None,
1003            batb: None,
1004            batl: None,
1005            bdatb: None,
1006            bdatl: None,
1007            spb: None,
1008            spl: None,
1009            spn: None,
1010            spf: None,
1011            trd: None,
1012            ltp: None,
1013            tv: None,
1014        };
1015
1016        let result = parse_runner_book_deltas(
1017            make_instrument_id("1.123", 12345, Decimal::ZERO),
1018            &rc,
1019            false,
1020            0,
1021            UnixNanos::default(),
1022            UnixNanos::default(),
1023        );
1024
1025        assert!(result.unwrap().is_none());
1026    }
1027
1028    #[rstest]
1029    fn test_make_trade_tick() {
1030        let instrument_id = make_instrument_id("1.180737206", 19248890, Decimal::ZERO);
1031        let tick = make_trade_tick(
1032            instrument_id,
1033            Price::new(2.42, BETFAIR_PRICE_PRECISION),
1034            Quantity::new(100.0, BETFAIR_QUANTITY_PRECISION),
1035            TradeId::from("test-trade-1"),
1036            UnixNanos::default(),
1037            UnixNanos::default(),
1038        );
1039
1040        assert_eq!(tick.instrument_id, instrument_id);
1041        assert_eq!(tick.price.as_f64(), 2.42);
1042        assert_eq!(tick.size.as_f64(), 100.0);
1043        assert_eq!(tick.aggressor_side, AggressorSide::NoAggressor);
1044    }
1045
1046    fn make_status_def(
1047        status: MarketStatus,
1048        in_play: bool,
1049        runner_status: RunnerStatus,
1050    ) -> MarketDefinition {
1051        MarketDefinition {
1052            runners: Some(vec![RunnerDefinition {
1053                id: 456,
1054                hc: None,
1055                sort_priority: None,
1056                name: None,
1057                status: Some(runner_status),
1058                adjustment_factor: None,
1059                bsp: None,
1060                removal_date: None,
1061            }]),
1062            bet_delay: None,
1063            betting_type: None,
1064            bsp_market: None,
1065            bsp_reconciled: None,
1066            competition_id: None,
1067            competition_name: None,
1068            complete: None,
1069            country_code: None,
1070            cross_matching: None,
1071            discount_allowed: None,
1072            each_way_divisor: None,
1073            event_id: None,
1074            event_name: None,
1075            event_type_id: None,
1076            event_type_name: None,
1077            in_play: Some(in_play),
1078            line_interval: None,
1079            line_max_unit: None,
1080            line_min_unit: None,
1081            market_base_rate: None,
1082            market_id: None,
1083            market_name: None,
1084            market_time: None,
1085            market_type: None,
1086            number_of_active_runners: None,
1087            number_of_winners: None,
1088            open_date: None,
1089            persistence_enabled: None,
1090            price_ladder_definition: None,
1091            race_type: None,
1092            regulators: None,
1093            runners_voidable: None,
1094            settled_time: None,
1095            status: Some(status),
1096            suspend_time: None,
1097            timezone: None,
1098            turn_in_play_enabled: None,
1099            venue: None,
1100            version: None,
1101        }
1102    }
1103
1104    #[rstest]
1105    #[case(MarketStatus::Open, false, MarketStatusAction::PreOpen, false)]
1106    #[case(MarketStatus::Open, true, MarketStatusAction::Trading, true)]
1107    #[case(MarketStatus::Closed, false, MarketStatusAction::Close, false)]
1108    #[case(MarketStatus::Closed, true, MarketStatusAction::Close, false)]
1109    #[case(MarketStatus::Suspended, false, MarketStatusAction::Pause, false)]
1110    #[case(MarketStatus::Suspended, true, MarketStatusAction::Pause, false)]
1111    #[case(MarketStatus::Inactive, false, MarketStatusAction::Close, false)]
1112    fn test_parse_instrument_statuses_market_state(
1113        #[case] status: MarketStatus,
1114        #[case] in_play: bool,
1115        #[case] expected_action: MarketStatusAction,
1116        #[case] expected_is_trading: bool,
1117    ) {
1118        let def = make_status_def(status, in_play, RunnerStatus::Active);
1119        let results =
1120            parse_instrument_statuses("1.123", &def, UnixNanos::default(), UnixNanos::default());
1121
1122        assert_eq!(results.len(), 1);
1123        assert_eq!(results[0].action, expected_action);
1124        assert_eq!(results[0].is_trading, Some(expected_is_trading));
1125    }
1126
1127    #[rstest]
1128    #[case(RunnerStatus::Removed)]
1129    #[case(RunnerStatus::RemovedVacant)]
1130    fn test_parse_instrument_statuses_scratched_runner_closes(#[case] runner_status: RunnerStatus) {
1131        // Even with Open + in_play the runner must close when scratched
1132        let def = make_status_def(MarketStatus::Open, true, runner_status);
1133        let results =
1134            parse_instrument_statuses("1.123", &def, UnixNanos::default(), UnixNanos::default());
1135
1136        assert_eq!(results.len(), 1);
1137        assert_eq!(results[0].action, MarketStatusAction::Close);
1138        assert_eq!(results[0].is_trading, Some(false));
1139    }
1140
1141    #[rstest]
1142    #[case::missing_runners("runners")]
1143    #[case::missing_status("status")]
1144    fn test_parse_instrument_statuses_returns_empty(#[case] drop_field: &str) {
1145        let mut def = make_status_def(MarketStatus::Open, true, RunnerStatus::Active);
1146        match drop_field {
1147            "runners" => def.runners = None,
1148            "status" => def.status = None,
1149            _ => unreachable!(),
1150        }
1151
1152        let results =
1153            parse_instrument_statuses("1.123", &def, UnixNanos::default(), UnixNanos::default());
1154
1155        assert!(results.is_empty());
1156    }
1157
1158    #[rstest]
1159    fn test_parse_instrument_statuses_mixed_runners() {
1160        // Three runners in one market definition: an Active runner should follow the
1161        // market-level mapping while Removed/RemovedVacant override to Close. Verify
1162        // that selection id and handicap propagate into the emitted instrument id.
1163        let mut def = make_status_def(MarketStatus::Open, true, RunnerStatus::Active);
1164        def.runners = Some(vec![
1165            RunnerDefinition {
1166                id: 101,
1167                hc: None,
1168                sort_priority: Some(1),
1169                name: None,
1170                status: Some(RunnerStatus::Active),
1171                adjustment_factor: None,
1172                bsp: None,
1173                removal_date: None,
1174            },
1175            RunnerDefinition {
1176                id: 202,
1177                hc: Some(Decimal::new(25, 1)), // 2.5 handicap
1178                sort_priority: Some(2),
1179                name: None,
1180                status: Some(RunnerStatus::Removed),
1181                adjustment_factor: None,
1182                bsp: None,
1183                removal_date: None,
1184            },
1185            RunnerDefinition {
1186                id: 303,
1187                hc: None,
1188                sort_priority: Some(3),
1189                name: None,
1190                status: Some(RunnerStatus::RemovedVacant),
1191                adjustment_factor: None,
1192                bsp: None,
1193                removal_date: None,
1194            },
1195        ]);
1196
1197        let results =
1198            parse_instrument_statuses("1.999", &def, UnixNanos::default(), UnixNanos::default());
1199
1200        assert_eq!(results.len(), 3);
1201
1202        assert_eq!(results[0].action, MarketStatusAction::Trading);
1203        assert_eq!(results[0].is_trading, Some(true));
1204
1205        assert_eq!(results[1].action, MarketStatusAction::Close);
1206        assert_eq!(results[1].is_trading, Some(false));
1207
1208        assert_eq!(results[2].action, MarketStatusAction::Close);
1209        assert_eq!(results[2].is_trading, Some(false));
1210
1211        // Each runner produces a distinct instrument id (selection id + handicap)
1212        assert_ne!(results[0].instrument_id, results[1].instrument_id);
1213        assert_ne!(results[1].instrument_id, results[2].instrument_id);
1214        assert_ne!(results[0].instrument_id, results[2].instrument_id);
1215    }
1216
1217    #[rstest]
1218    fn test_parse_order_status_report_new_order() {
1219        let data = load_test_json("stream/ocm_NEW_FULL_IMAGE.json");
1220        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1221
1222        if let StreamMessage::OrderChange(ocm) = msg {
1223            let oc = ocm.oc.as_ref().unwrap();
1224            let omc = &oc[0];
1225            let orc = &omc.orc.as_ref().unwrap()[0];
1226            let uo = &orc.uo.as_ref().unwrap()[0];
1227
1228            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1229            let report = parse_order_status_report(
1230                uo,
1231                instrument_id,
1232                AccountId::from("BETFAIR-001"),
1233                parse_millis_timestamp(ocm.pt),
1234                parse_millis_timestamp(ocm.pt),
1235            )
1236            .unwrap();
1237
1238            // Partially filled: sm=4.75, sr=0.25, status=E
1239            assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
1240            assert_eq!(report.order_side, OrderSide::Sell); // Back → Sell
1241            assert_eq!(report.order_type, OrderType::Limit);
1242            assert_eq!(report.filled_qty.as_f64(), 4.75);
1243            assert_eq!(report.quantity.as_f64(), 5.0);
1244            assert!(report.price.is_some());
1245            assert_eq!(report.price.unwrap().as_f64(), 12.0);
1246        } else {
1247            panic!("expected OrderChange");
1248        }
1249    }
1250
1251    #[rstest]
1252    fn test_parse_order_status_report_filled() {
1253        let data = load_test_json("stream/ocm_FILLED.json");
1254        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1255
1256        if let StreamMessage::OrderChange(ocm) = msg {
1257            let oc = ocm.oc.as_ref().unwrap();
1258            let omc = &oc[0];
1259            let orc = &omc.orc.as_ref().unwrap()[0];
1260            let uo = &orc.uo.as_ref().unwrap()[0];
1261
1262            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1263            let report = parse_order_status_report(
1264                uo,
1265                instrument_id,
1266                AccountId::from("BETFAIR-001"),
1267                parse_millis_timestamp(ocm.pt),
1268                parse_millis_timestamp(ocm.pt),
1269            )
1270            .unwrap();
1271
1272            assert_eq!(report.order_status, OrderStatus::Filled);
1273            assert_eq!(report.order_side, OrderSide::Buy); // Lay → Buy
1274            assert_eq!(report.filled_qty.as_f64(), 10.0);
1275            assert_eq!(report.quantity.as_f64(), 10.0);
1276
1277            // Has client_order_id from rfo field
1278            assert!(report.client_order_id.is_some());
1279        } else {
1280            panic!("expected OrderChange");
1281        }
1282    }
1283
1284    #[rstest]
1285    fn test_parse_order_status_report_cancelled() {
1286        let data = load_test_json("stream/ocm_CANCEL.json");
1287        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1288
1289        if let StreamMessage::OrderChange(ocm) = msg {
1290            let oc = ocm.oc.as_ref().unwrap();
1291            let omc = &oc[0];
1292            let orc = &omc.orc.as_ref().unwrap()[0];
1293            let uo = &orc.uo.as_ref().unwrap()[0];
1294
1295            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1296            let report = parse_order_status_report(
1297                uo,
1298                instrument_id,
1299                AccountId::from("BETFAIR-001"),
1300                parse_millis_timestamp(ocm.pt),
1301                parse_millis_timestamp(ocm.pt),
1302            )
1303            .unwrap();
1304
1305            assert_eq!(report.order_status, OrderStatus::Canceled);
1306            assert_eq!(report.order_side, OrderSide::Sell); // Back → Sell
1307            assert_eq!(report.filled_qty.as_f64(), 0.0);
1308            assert_eq!(report.quantity.as_f64(), 10.0);
1309        } else {
1310            panic!("expected OrderChange");
1311        }
1312    }
1313
1314    #[rstest]
1315    fn test_parse_order_status_report_duplicate_execution() {
1316        let data = load_test_json("stream/ocm_DUPLICATE_EXECUTION.json");
1317        let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1318
1319        if let StreamMessage::OrderChange(ocm) = &msgs[0] {
1320            let oc = ocm.oc.as_ref().unwrap();
1321            let omc = &oc[0];
1322            let orc = &omc.orc.as_ref().unwrap()[0];
1323            let uo = &orc.uo.as_ref().unwrap()[0];
1324
1325            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1326            let report = parse_order_status_report(
1327                uo,
1328                instrument_id,
1329                AccountId::from("BETFAIR-001"),
1330                parse_millis_timestamp(ocm.pt),
1331                parse_millis_timestamp(ocm.pt),
1332            )
1333            .unwrap();
1334
1335            // Partially filled: sm=1.12, status=E
1336            assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
1337            assert_eq!(report.order_side, OrderSide::Buy); // Lay → Buy
1338            assert_eq!(report.filled_qty.as_f64(), 1.12);
1339        } else {
1340            panic!("expected OrderChange");
1341        }
1342    }
1343
1344    #[rstest]
1345    fn test_parse_order_status_report_multiple_fills() {
1346        let data = load_test_json("stream/ocm_multiple_fills.json");
1347        let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1348
1349        if let StreamMessage::OrderChange(ocm) = &msgs[0] {
1350            let oc = ocm.oc.as_ref().unwrap();
1351            let omc = &oc[0];
1352            let orc = &omc.orc.as_ref().unwrap()[0];
1353            let uo = &orc.uo.as_ref().unwrap()[0];
1354
1355            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1356            let report = parse_order_status_report(
1357                uo,
1358                instrument_id,
1359                AccountId::from("BETFAIR-001"),
1360                parse_millis_timestamp(ocm.pt),
1361                parse_millis_timestamp(ocm.pt),
1362            )
1363            .unwrap();
1364
1365            // Partially filled: sm=16.19, status=E, has rfo
1366            assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
1367            assert_eq!(report.order_side, OrderSide::Sell); // Back → Sell
1368            assert_eq!(report.filled_qty.as_f64(), 16.19);
1369            assert!(report.client_order_id.is_some());
1370            assert!(report.avg_px.is_some());
1371        } else {
1372            panic!("expected OrderChange");
1373        }
1374    }
1375
1376    #[rstest]
1377    fn test_parse_runner_book_snapshot_empty_book() {
1378        // A snapshot with no levels should still produce a clear delta
1379        let rc = RunnerChange {
1380            id: 12345,
1381            hc: None,
1382            atb: Some(vec![]),
1383            atl: Some(vec![]),
1384            batb: None,
1385            batl: None,
1386            bdatb: None,
1387            bdatl: None,
1388            spb: None,
1389            spl: None,
1390            spn: None,
1391            spf: None,
1392            trd: None,
1393            ltp: None,
1394            tv: None,
1395        };
1396
1397        let result = parse_runner_book_deltas(
1398            make_instrument_id("1.123", 12345, Decimal::ZERO),
1399            &rc,
1400            true,
1401            1000,
1402            UnixNanos::default(),
1403            UnixNanos::default(),
1404        )
1405        .unwrap()
1406        .expect("should produce snapshot deltas");
1407
1408        // Just the Clear delta with F_LAST
1409        assert_eq!(result.deltas.len(), 1);
1410        assert_eq!(result.deltas[0].action, BookAction::Clear);
1411        assert!(RecordFlag::F_LAST.matches(result.deltas[0].flags));
1412        assert!(RecordFlag::F_SNAPSHOT.matches(result.deltas[0].flags));
1413    }
1414
1415    #[rstest]
1416    fn test_make_fill_report() {
1417        let instrument_id = make_instrument_id("1.180604981", 1209555, Decimal::ZERO);
1418        let fill = make_fill_report(
1419            AccountId::from("BETFAIR-001"),
1420            instrument_id,
1421            VenueOrderId::from("229430281339"),
1422            TradeId::from("229430281339-0"),
1423            OrderSide::Buy,
1424            Quantity::new(10.0, BETFAIR_QUANTITY_PRECISION),
1425            Price::new(1.1, BETFAIR_PRICE_PRECISION),
1426            Currency::GBP(),
1427            None,
1428            UnixNanos::default(),
1429            UnixNanos::default(),
1430        );
1431
1432        assert_eq!(fill.instrument_id, instrument_id);
1433        assert_eq!(fill.order_side, OrderSide::Buy);
1434        assert_eq!(fill.last_qty.as_f64(), 10.0);
1435        assert_eq!(fill.last_px.as_f64(), 1.1);
1436        assert_eq!(fill.commission.as_f64(), 0.0);
1437        assert_eq!(fill.liquidity_side, LiquiditySide::NoLiquiditySide);
1438    }
1439
1440    #[rstest]
1441    fn test_fill_tracker_single_full_fill() {
1442        let data = load_test_json("stream/ocm_FILLED.json");
1443        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1444
1445        if let StreamMessage::OrderChange(ocm) = msg {
1446            let oc = ocm.oc.as_ref().unwrap();
1447            let omc = &oc[0];
1448            let orc = &omc.orc.as_ref().unwrap()[0];
1449            let uo = &orc.uo.as_ref().unwrap()[0];
1450            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1451            let ts = parse_millis_timestamp(ocm.pt);
1452
1453            let mut tracker = FillTracker::new();
1454            let fill = tracker
1455                .maybe_fill_report(
1456                    uo,
1457                    uo.s,
1458                    instrument_id,
1459                    AccountId::from("BETFAIR-001"),
1460                    Currency::GBP(),
1461                    ts,
1462                    ts,
1463                )
1464                .expect("should produce fill");
1465
1466            assert_eq!(fill.last_qty.as_f64(), 10.0);
1467            assert_eq!(fill.last_px.as_f64(), 1.1);
1468            assert_eq!(fill.order_side, OrderSide::Buy);
1469            assert!(fill.client_order_id.is_some());
1470        } else {
1471            panic!("expected OrderChange");
1472        }
1473    }
1474
1475    #[rstest]
1476    fn test_fill_tracker_incremental_fills() {
1477        let data = load_test_json("stream/ocm_multiple_fills.json");
1478        let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1479        let instrument_id = make_instrument_id("1.179082386", 50210, Decimal::ZERO);
1480
1481        let mut tracker = FillTracker::new();
1482        let account_id = AccountId::from("BETFAIR-001");
1483        let currency = Currency::GBP();
1484
1485        // First fill: sm=16.19 (from zero)
1486        let uo1 = extract_uo(&msgs[0]);
1487        let ts1 = extract_ts(&msgs[0]);
1488        let fill1 = tracker
1489            .maybe_fill_report(uo1, uo1.s, instrument_id, account_id, currency, ts1, ts1)
1490            .expect("should produce first fill");
1491        assert_eq!(fill1.last_qty.as_f64(), 16.19);
1492        assert_eq!(fill1.last_px.as_f64(), 5.8);
1493
1494        // Second fill: sm=16.96 (delta=0.77)
1495        let uo2 = extract_uo(&msgs[1]);
1496        let ts2 = extract_ts(&msgs[1]);
1497        let fill2 = tracker
1498            .maybe_fill_report(uo2, uo2.s, instrument_id, account_id, currency, ts2, ts2)
1499            .expect("should produce second fill");
1500        assert_eq!(fill2.last_qty.as_f64(), 0.77);
1501        assert_eq!(fill2.last_px.as_f64(), 5.8);
1502
1503        // Third fill: sm=17.73 (delta=0.77)
1504        let uo3 = extract_uo(&msgs[2]);
1505        let ts3 = extract_ts(&msgs[2]);
1506        let fill3 = tracker
1507            .maybe_fill_report(uo3, uo3.s, instrument_id, account_id, currency, ts3, ts3)
1508            .expect("should produce third fill");
1509        assert_eq!(fill3.last_qty.as_f64(), 0.77);
1510    }
1511
1512    #[rstest]
1513    fn test_fill_tracker_different_price() {
1514        let data = load_test_json("stream/ocm_filled_different_price.json");
1515        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1516
1517        if let StreamMessage::OrderChange(ocm) = msg {
1518            let oc = ocm.oc.as_ref().unwrap();
1519            let omc = &oc[0];
1520            let orc = &omc.orc.as_ref().unwrap()[0];
1521            let uo = &orc.uo.as_ref().unwrap()[0];
1522            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1523            let ts = parse_millis_timestamp(ocm.pt);
1524
1525            let mut tracker = FillTracker::new();
1526            let fill = tracker
1527                .maybe_fill_report(
1528                    uo,
1529                    uo.s,
1530                    instrument_id,
1531                    AccountId::from("BETFAIR-001"),
1532                    Currency::GBP(),
1533                    ts,
1534                    ts,
1535                )
1536                .expect("should produce fill");
1537
1538            // Order placed at 1.3 but average fill price is 1.2
1539            assert_eq!(fill.last_qty.as_f64(), 20.0);
1540            assert_eq!(fill.last_px.as_f64(), 1.2);
1541        } else {
1542            panic!("expected OrderChange");
1543        }
1544    }
1545
1546    #[rstest]
1547    fn test_fill_tracker_cancel_no_fill() {
1548        let data = load_test_json("stream/ocm_CANCEL.json");
1549        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1550
1551        if let StreamMessage::OrderChange(ocm) = msg {
1552            let oc = ocm.oc.as_ref().unwrap();
1553            let omc = &oc[0];
1554            let orc = &omc.orc.as_ref().unwrap()[0];
1555            let uo = &orc.uo.as_ref().unwrap()[0];
1556            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1557            let ts = parse_millis_timestamp(ocm.pt);
1558
1559            let mut tracker = FillTracker::new();
1560            let result = tracker.maybe_fill_report(
1561                uo,
1562                uo.s,
1563                instrument_id,
1564                AccountId::from("BETFAIR-001"),
1565                Currency::GBP(),
1566                ts,
1567                ts,
1568            );
1569            assert!(result.is_none(), "cancelled order should not produce fill");
1570        } else {
1571            panic!("expected OrderChange");
1572        }
1573    }
1574
1575    #[rstest]
1576    fn test_fill_tracker_lapsed_no_fill() {
1577        let data = load_test_json("stream/ocm_error_fill.json");
1578        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1579
1580        if let StreamMessage::OrderChange(ocm) = msg {
1581            let oc = ocm.oc.as_ref().unwrap();
1582            let omc = &oc[0];
1583            let orc = &omc.orc.as_ref().unwrap()[0];
1584            let uo = &orc.uo.as_ref().unwrap()[0];
1585            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1586            let ts = parse_millis_timestamp(ocm.pt);
1587
1588            let mut tracker = FillTracker::new();
1589            let result = tracker.maybe_fill_report(
1590                uo,
1591                uo.s,
1592                instrument_id,
1593                AccountId::from("BETFAIR-001"),
1594                Currency::GBP(),
1595                ts,
1596                ts,
1597            );
1598            assert!(result.is_none(), "lapsed order should not produce fill");
1599        } else {
1600            panic!("expected OrderChange");
1601        }
1602    }
1603
1604    #[rstest]
1605    fn test_fill_tracker_duplicate_dedup() {
1606        let data = load_test_json("stream/ocm_FILLED.json");
1607        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1608
1609        if let StreamMessage::OrderChange(ocm) = msg {
1610            let oc = ocm.oc.as_ref().unwrap();
1611            let omc = &oc[0];
1612            let orc = &omc.orc.as_ref().unwrap()[0];
1613            let uo = &orc.uo.as_ref().unwrap()[0];
1614            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1615            let ts = parse_millis_timestamp(ocm.pt);
1616            let account_id = AccountId::from("BETFAIR-001");
1617            let currency = Currency::GBP();
1618
1619            let mut tracker = FillTracker::new();
1620
1621            // First call produces fill
1622            let fill1 =
1623                tracker.maybe_fill_report(uo, uo.s, instrument_id, account_id, currency, ts, ts);
1624            assert!(fill1.is_some());
1625
1626            // Second call with same data produces nothing (dedup)
1627            let fill2 =
1628                tracker.maybe_fill_report(uo, uo.s, instrument_id, account_id, currency, ts, ts);
1629            assert!(fill2.is_none(), "duplicate fill should be suppressed");
1630        } else {
1631            panic!("expected OrderChange");
1632        }
1633    }
1634
1635    #[rstest]
1636    fn test_fill_tracker_price_back_calculation() {
1637        let data = load_test_json("stream/ocm_multiple_fills.json");
1638        let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1639        let instrument_id = make_instrument_id("1.179082386", 50210, Decimal::ZERO);
1640        let account_id = AccountId::from("BETFAIR-001");
1641        let currency = Currency::GBP();
1642        let mut tracker = FillTracker::new();
1643
1644        // Process first fill at avp=5.8
1645        let uo1 = extract_uo(&msgs[0]);
1646        let ts1 = extract_ts(&msgs[0]);
1647        let fill1 = tracker
1648            .maybe_fill_report(uo1, uo1.s, instrument_id, account_id, currency, ts1, ts1)
1649            .unwrap();
1650        assert_eq!(fill1.last_px.as_f64(), 5.8);
1651
1652        // Second fill also at avp=5.8 (same price, avg unchanged)
1653        let uo2 = extract_uo(&msgs[1]);
1654        let ts2 = extract_ts(&msgs[1]);
1655        let fill2 = tracker
1656            .maybe_fill_report(uo2, uo2.s, instrument_id, account_id, currency, ts2, ts2)
1657            .unwrap();
1658        assert_eq!(fill2.last_px.as_f64(), 5.8);
1659    }
1660
1661    #[rstest]
1662    fn test_has_cancel_quantity() {
1663        let data = load_test_json("stream/ocm_CANCEL.json");
1664        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1665
1666        if let StreamMessage::OrderChange(ocm) = msg {
1667            let uo = &ocm.oc.as_ref().unwrap()[0].orc.as_ref().unwrap()[0]
1668                .uo
1669                .as_ref()
1670                .unwrap()[0];
1671            assert!(has_cancel_quantity(uo));
1672        } else {
1673            panic!("expected OrderChange");
1674        }
1675    }
1676
1677    #[rstest]
1678    fn test_has_cancel_quantity_filled_order() {
1679        let data = load_test_json("stream/ocm_FILLED.json");
1680        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1681
1682        if let StreamMessage::OrderChange(ocm) = msg {
1683            let uo = &ocm.oc.as_ref().unwrap()[0].orc.as_ref().unwrap()[0]
1684                .uo
1685                .as_ref()
1686                .unwrap()[0];
1687            assert!(!has_cancel_quantity(uo));
1688        } else {
1689            panic!("expected OrderChange");
1690        }
1691    }
1692
1693    fn extract_uo(msg: &StreamMessage) -> &UnmatchedOrder {
1694        if let StreamMessage::OrderChange(ocm) = msg {
1695            &ocm.oc.as_ref().unwrap()[0].orc.as_ref().unwrap()[0]
1696                .uo
1697                .as_ref()
1698                .unwrap()[0]
1699        } else {
1700            panic!("expected OrderChange")
1701        }
1702    }
1703
1704    fn extract_ts(msg: &StreamMessage) -> UnixNanos {
1705        if let StreamMessage::OrderChange(ocm) = msg {
1706            parse_millis_timestamp(ocm.pt)
1707        } else {
1708            panic!("expected OrderChange")
1709        }
1710    }
1711
1712    #[rstest]
1713    fn test_parse_race_runner_data_from_fixture() {
1714        let data = load_test_json("stream/rcm_single.json");
1715        let msg = stream_decode(data.as_bytes()).unwrap();
1716
1717        let StreamMessage::RaceChange(rcm) = msg else {
1718            panic!("expected RaceChange");
1719        };
1720
1721        let race = &rcm.rc.as_ref().unwrap()[0];
1722        let rrc = &race.rrc.as_ref().unwrap()[0];
1723        let ts = parse_millis_timestamp(rcm.pt);
1724
1725        let runner = parse_race_runner_data(
1726            race.id.as_deref().unwrap(),
1727            race.mid.as_deref().unwrap(),
1728            rrc,
1729            ts,
1730            ts,
1731        )
1732        .unwrap();
1733
1734        assert_eq!(runner.race_id, "28587288.1650");
1735        assert_eq!(runner.market_id, "1.1234567");
1736        assert_eq!(runner.selection_id, 7390417);
1737        assert!((runner.latitude - 51.4189543).abs() < 1e-6);
1738        assert!((runner.longitude - (-0.4058491)).abs() < 1e-6);
1739        assert!((runner.speed - 17.8).abs() < 1e-6);
1740        assert!((runner.progress - 2051.0).abs() < 1e-6);
1741        assert!((runner.stride_frequency - 2.07).abs() < 1e-6);
1742    }
1743
1744    #[rstest]
1745    fn test_parse_race_progress_from_fixture() {
1746        let data = load_test_json("stream/rcm_single.json");
1747        let msg = stream_decode(data.as_bytes()).unwrap();
1748
1749        let StreamMessage::RaceChange(rcm) = msg else {
1750            panic!("expected RaceChange");
1751        };
1752
1753        let race = &rcm.rc.as_ref().unwrap()[0];
1754        let rpc = race.rpc.as_ref().unwrap();
1755        let ts = parse_millis_timestamp(rcm.pt);
1756
1757        let progress = parse_race_progress(
1758            race.id.as_deref().unwrap(),
1759            race.mid.as_deref().unwrap(),
1760            rpc,
1761            ts,
1762            ts,
1763        );
1764
1765        assert_eq!(progress.race_id, "28587288.1650");
1766        assert_eq!(progress.market_id, "1.1234567");
1767        assert_eq!(progress.gate_name, "1f");
1768        assert!((progress.sectional_time - 10.6).abs() < 1e-6);
1769        assert!((progress.running_time - 46.7).abs() < 1e-6);
1770        assert!((progress.speed - 17.8).abs() < 1e-6);
1771        assert!((progress.progress - 87.5).abs() < 1e-6);
1772
1773        let order: Vec<i64> = serde_json::from_str(&progress.order).unwrap();
1774        assert_eq!(order, vec![7390417, 5600338, 11527189, 6395118, 8706072]);
1775
1776        let jumps: Vec<serde_json::Value> = serde_json::from_str(&progress.jumps).unwrap();
1777        assert_eq!(jumps.len(), 2);
1778        assert_eq!(jumps[0]["J"], 2);
1779    }
1780
1781    #[rstest]
1782    fn test_parse_race_runner_data_multi_runner() {
1783        let data = load_test_json("stream/rcm_multi_runner.json");
1784        let msg = stream_decode(data.as_bytes()).unwrap();
1785
1786        let StreamMessage::RaceChange(rcm) = msg else {
1787            panic!("expected RaceChange");
1788        };
1789
1790        let race = &rcm.rc.as_ref().unwrap()[0];
1791        let ts = parse_millis_timestamp(rcm.pt);
1792        let race_id = race.id.as_deref().unwrap();
1793        let market_id = race.mid.as_deref().unwrap();
1794
1795        let runners: Vec<_> = race
1796            .rrc
1797            .as_ref()
1798            .unwrap()
1799            .iter()
1800            .filter_map(|rrc| parse_race_runner_data(race_id, market_id, rrc, ts, ts))
1801            .collect();
1802
1803        assert_eq!(runners.len(), 5);
1804        assert_eq!(runners[0].selection_id, 35467839);
1805        assert_eq!(runners[4].selection_id, 41694785);
1806        assert!((runners[0].speed - 16.33).abs() < 1e-6);
1807        assert!((runners[4].speed - 17.11).abs() < 1e-6);
1808    }
1809
1810    #[rstest]
1811    fn test_parse_race_runner_data_missing_id_returns_none() {
1812        let rrc = RaceRunnerChange {
1813            ft: Some(1000),
1814            id: None,
1815            lat: Some(51.0),
1816            lng: Some(-0.4),
1817            spd: Some(15.0),
1818            prg: Some(500.0),
1819            sfq: Some(2.0),
1820        };
1821        let ts = UnixNanos::from(1_000_000_000u64);
1822        let result = parse_race_runner_data("race1", "market1", &rrc, ts, ts);
1823        assert!(result.is_none());
1824    }
1825
1826    #[rstest]
1827    fn test_parse_race_runner_data_absent_fields_are_nan() {
1828        let rrc = RaceRunnerChange {
1829            ft: None,
1830            id: Some(12345),
1831            lat: None,
1832            lng: None,
1833            spd: None,
1834            prg: None,
1835            sfq: None,
1836        };
1837        let ts = UnixNanos::from(1_000_000_000u64);
1838        let runner = parse_race_runner_data("race1", "market1", &rrc, ts, ts).unwrap();
1839        assert!(runner.latitude.is_nan());
1840        assert!(runner.longitude.is_nan());
1841        assert!(runner.speed.is_nan());
1842        assert!(runner.progress.is_nan());
1843        assert!(runner.stride_frequency.is_nan());
1844    }
1845
1846    #[rstest]
1847    fn test_parse_race_progress_absent_fields() {
1848        let rpc = RaceProgressChange {
1849            ft: None,
1850            g: None,
1851            st: None,
1852            rt: None,
1853            spd: None,
1854            prg: None,
1855            ord: None,
1856            jumps: None,
1857        };
1858        let ts = UnixNanos::from(1_000_000_000u64);
1859        let progress = parse_race_progress("race1", "market1", &rpc, ts, ts);
1860        assert_eq!(progress.gate_name, "");
1861        assert!(progress.sectional_time.is_nan());
1862        assert!(progress.running_time.is_nan());
1863        assert_eq!(progress.order, "");
1864        assert_eq!(progress.jumps, "");
1865    }
1866
1867    fn runner_change_with_ticker(
1868        id: u64,
1869        ltp: Option<Decimal>,
1870        tv: Option<Decimal>,
1871        spn: Option<Decimal>,
1872        spf: Option<Decimal>,
1873    ) -> RunnerChange {
1874        RunnerChange {
1875            id,
1876            hc: None,
1877            atb: None,
1878            atl: None,
1879            batb: None,
1880            batl: None,
1881            bdatb: None,
1882            bdatl: None,
1883            spb: None,
1884            spl: None,
1885            spn,
1886            spf,
1887            trd: None,
1888            ltp,
1889            tv,
1890        }
1891    }
1892
1893    #[rstest]
1894    fn test_parse_betfair_ticker_all_fields() {
1895        let rc = runner_change_with_ticker(
1896            9249757,
1897            Some(Decimal::new(55, 1)),
1898            Some(Decimal::new(189032, 2)),
1899            Some(Decimal::new(568, 2)),
1900            Some(Decimal::new(573, 2)),
1901        );
1902        let ts = UnixNanos::from(1_000_000_000u64);
1903        let instrument_id = make_instrument_id("1.185781465", 9249757, Decimal::ZERO);
1904
1905        let ticker = parse_betfair_ticker(instrument_id, &rc, ts, ts).unwrap();
1906
1907        assert_eq!(ticker.instrument_id, instrument_id);
1908        assert!((ticker.last_traded_price - 5.5).abs() < f64::EPSILON);
1909        assert!((ticker.traded_volume - 1890.32).abs() < f64::EPSILON);
1910        assert!((ticker.starting_price_near - 5.68).abs() < f64::EPSILON);
1911        assert!((ticker.starting_price_far - 5.73).abs() < f64::EPSILON);
1912    }
1913
1914    #[rstest]
1915    fn test_parse_betfair_ticker_partial_fields() {
1916        let rc = runner_change_with_ticker(
1917            9249757,
1918            Some(Decimal::new(55, 1)),
1919            Some(Decimal::new(189032, 2)),
1920            None,
1921            None,
1922        );
1923        let ts = UnixNanos::from(1_000_000_000u64);
1924        let instrument_id = make_instrument_id("1.185781465", 9249757, Decimal::ZERO);
1925
1926        let ticker = parse_betfair_ticker(instrument_id, &rc, ts, ts).unwrap();
1927
1928        assert!((ticker.last_traded_price - 5.5).abs() < f64::EPSILON);
1929        assert!((ticker.traded_volume - 1890.32).abs() < f64::EPSILON);
1930        assert!(ticker.starting_price_near.is_nan());
1931        assert!(ticker.starting_price_far.is_nan());
1932    }
1933
1934    #[rstest]
1935    fn test_parse_betfair_ticker_no_fields_returns_none() {
1936        let rc = runner_change_with_ticker(9249757, None, None, None, None);
1937        let ts = UnixNanos::from(1_000_000_000u64);
1938        let instrument_id = make_instrument_id("1.185781465", 9249757, Decimal::ZERO);
1939
1940        assert!(parse_betfair_ticker(instrument_id, &rc, ts, ts).is_none());
1941    }
1942
1943    #[rstest]
1944    fn test_parse_betfair_ticker_only_tv() {
1945        let rc =
1946            runner_change_with_ticker(40273293, None, Some(Decimal::new(320115, 2)), None, None);
1947        let ts = UnixNanos::from(1_000_000_000u64);
1948        let instrument_id = make_instrument_id("1.185781465", 40273293, Decimal::ZERO);
1949
1950        let ticker = parse_betfair_ticker(instrument_id, &rc, ts, ts).unwrap();
1951
1952        assert!(ticker.last_traded_price.is_nan());
1953        assert!((ticker.traded_volume - 3201.15).abs() < f64::EPSILON);
1954        assert!(ticker.starting_price_near.is_nan());
1955        assert!(ticker.starting_price_far.is_nan());
1956    }
1957
1958    #[rstest]
1959    fn test_parse_betfair_ticker_from_fixture() {
1960        let data = load_test_json("stream/mcm_BSP_settled.json");
1961        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1962
1963        if let StreamMessage::MarketChange(mcm) = msg {
1964            let mc = mcm.mc.as_ref().unwrap();
1965            let change = &mc[0];
1966            let rc_list = change.rc.as_ref().unwrap();
1967
1968            // Runner 9249757 has ltp=5.5, tv=1890.32, spn=5.68, spf=5.73
1969            let rc = rc_list.iter().find(|r| r.id == 9249757).unwrap();
1970            let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
1971            let ts = parse_millis_timestamp(mcm.pt);
1972
1973            let ticker = parse_betfair_ticker(instrument_id, rc, ts, ts).unwrap();
1974
1975            assert!((ticker.last_traded_price - 5.5).abs() < f64::EPSILON);
1976            assert!((ticker.traded_volume - 1890.32).abs() < f64::EPSILON);
1977            assert!((ticker.starting_price_near - 5.68).abs() < f64::EPSILON);
1978            assert!((ticker.starting_price_far - 5.73).abs() < f64::EPSILON);
1979
1980            // Runner 40273293 has ltp=2.1, tv=3201.15 but no spn/spf
1981            let rc2 = rc_list.iter().find(|r| r.id == 40273293).unwrap();
1982            let instrument_id2 = make_instrument_id(&change.id, rc2.id, Decimal::ZERO);
1983            let ticker2 = parse_betfair_ticker(instrument_id2, rc2, ts, ts).unwrap();
1984
1985            assert!((ticker2.last_traded_price - 2.1).abs() < f64::EPSILON);
1986            assert!((ticker2.traded_volume - 3201.15).abs() < f64::EPSILON);
1987            assert!(ticker2.starting_price_near.is_nan());
1988            assert!(ticker2.starting_price_far.is_nan());
1989
1990            // Runner 23678734 has only tv=0, no ltp
1991            let rc3 = rc_list.iter().find(|r| r.id == 23678734).unwrap();
1992            let instrument_id3 = make_instrument_id(&change.id, rc3.id, Decimal::ZERO);
1993            let ticker3 = parse_betfair_ticker(instrument_id3, rc3, ts, ts).unwrap();
1994
1995            assert!(ticker3.last_traded_price.is_nan());
1996            assert!((ticker3.traded_volume - 0.0).abs() < f64::EPSILON);
1997        } else {
1998            panic!("Expected MarketChange");
1999        }
2000    }
2001
2002    #[rstest]
2003    fn test_parse_betfair_starting_prices_from_fixture() {
2004        let data = load_test_json("stream/mcm_BSP_settled.json");
2005        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2006
2007        if let StreamMessage::MarketChange(mcm) = msg {
2008            let mc = mcm.mc.as_ref().unwrap();
2009            let def = mc[0].market_definition.as_ref().unwrap();
2010            let ts = parse_millis_timestamp(mcm.pt);
2011
2012            let prices = parse_betfair_starting_prices(&mc[0].id, def, ts, ts);
2013
2014            // 3 runners have bsp values, 1 (REMOVED) does not
2015            assert_eq!(prices.len(), 3);
2016
2017            let bsp_map: std::collections::HashMap<String, f64> = prices
2018                .iter()
2019                .map(|p| (p.instrument_id.to_string(), p.bsp))
2020                .collect();
2021
2022            let id_winner = make_instrument_id("1.185781465", 9249757, Decimal::ZERO).to_string();
2023            let id_placed = make_instrument_id("1.185781465", 40273293, Decimal::ZERO).to_string();
2024            let id_loser = make_instrument_id("1.185781465", 11120000, Decimal::ZERO).to_string();
2025
2026            assert!((bsp_map[&id_winner] - 5.73).abs() < f64::EPSILON);
2027            assert!((bsp_map[&id_placed] - 2.14).abs() < f64::EPSILON);
2028            assert!((bsp_map[&id_loser] - 28.56).abs() < f64::EPSILON);
2029        } else {
2030            panic!("Expected MarketChange");
2031        }
2032    }
2033
2034    #[rstest]
2035    fn test_parse_betfair_starting_prices_no_runners() {
2036        let def = MarketDefinition {
2037            runners: None,
2038            bet_delay: None,
2039            betting_type: None,
2040            bsp_market: None,
2041            bsp_reconciled: None,
2042            competition_id: None,
2043            competition_name: None,
2044            complete: None,
2045            country_code: None,
2046            cross_matching: None,
2047            discount_allowed: None,
2048            each_way_divisor: None,
2049            event_id: None,
2050            event_name: None,
2051            event_type_id: None,
2052            event_type_name: None,
2053            in_play: None,
2054            line_interval: None,
2055            line_max_unit: None,
2056            line_min_unit: None,
2057            market_base_rate: None,
2058            market_id: None,
2059            market_name: None,
2060            market_time: None,
2061            market_type: None,
2062            number_of_active_runners: None,
2063            number_of_winners: None,
2064            open_date: None,
2065            persistence_enabled: None,
2066            price_ladder_definition: None,
2067            race_type: None,
2068            regulators: None,
2069            runners_voidable: None,
2070            settled_time: None,
2071            status: None,
2072            suspend_time: None,
2073            timezone: None,
2074            turn_in_play_enabled: None,
2075            venue: None,
2076            version: None,
2077        };
2078        let ts = UnixNanos::from(1_000_000_000u64);
2079
2080        let prices = parse_betfair_starting_prices("1.12345", &def, ts, ts);
2081
2082        assert!(prices.is_empty());
2083    }
2084
2085    #[rstest]
2086    fn test_parse_bsp_book_deltas_from_fixture() {
2087        let data = load_test_json("stream/mcm_BSP.json");
2088        let messages: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
2089
2090        // Find the MCM with runner changes containing spb/spl data
2091        let mcm = messages
2092            .iter()
2093            .find_map(|m| match m {
2094                StreamMessage::MarketChange(mcm) => {
2095                    let mc = mcm.mc.as_ref()?;
2096                    let has_spb = mc.iter().any(|c| {
2097                        c.rc.as_ref()
2098                            .is_some_and(|rcs| rcs.iter().any(|r| r.spb.is_some()))
2099                    });
2100
2101                    if has_spb { Some(mcm) } else { None }
2102                }
2103                _ => None,
2104            })
2105            .expect("fixture should contain MCM with spb data");
2106
2107        let mc = mcm.mc.as_ref().unwrap();
2108        let change = &mc[0];
2109        let rc_list = change.rc.as_ref().unwrap();
2110
2111        // Runner 9249757 has spb and spl arrays
2112        let rc = rc_list.iter().find(|r| r.id == 9249757).unwrap();
2113        let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
2114        let ts = parse_millis_timestamp(mcm.pt);
2115
2116        let deltas = parse_bsp_book_deltas(instrument_id, rc, ts, ts);
2117
2118        let spb_count = rc.spb.as_ref().unwrap().len();
2119        let spl_count = rc.spl.as_ref().unwrap().len();
2120        assert_eq!(deltas.len(), spb_count + spl_count);
2121
2122        // SPB entries are Sell side
2123        assert_eq!(deltas[0].side, OrderSide::Sell as u32);
2124        assert!((deltas[0].price - 1000.0).abs() < f64::EPSILON);
2125        assert!((deltas[0].size - 33.38).abs() < f64::EPSILON);
2126        assert_eq!(deltas[0].action, BookAction::Update as u32);
2127
2128        // SPL entries are Buy side
2129        let spl_start = spb_count;
2130        assert_eq!(deltas[spl_start].side, OrderSide::Buy as u32);
2131        assert!((deltas[spl_start].price - 7.0).abs() < f64::EPSILON);
2132        assert!((deltas[spl_start].size - 10.0).abs() < f64::EPSILON);
2133    }
2134
2135    #[rstest]
2136    fn test_parse_bsp_book_deltas_zero_volume_is_delete() {
2137        let rc = RunnerChange {
2138            id: 12345,
2139            hc: None,
2140            atb: None,
2141            atl: None,
2142            batb: None,
2143            batl: None,
2144            bdatb: None,
2145            bdatl: None,
2146            spb: Some(vec![PV {
2147                price: Decimal::new(50, 1),
2148                volume: Decimal::ZERO,
2149            }]),
2150            spl: None,
2151            spn: None,
2152            spf: None,
2153            trd: None,
2154            ltp: None,
2155            tv: None,
2156        };
2157        let ts = UnixNanos::from(1_000_000_000u64);
2158        let instrument_id = make_instrument_id("1.12345", 12345, Decimal::ZERO);
2159
2160        let deltas = parse_bsp_book_deltas(instrument_id, &rc, ts, ts);
2161
2162        assert_eq!(deltas.len(), 1);
2163        assert_eq!(deltas[0].action, BookAction::Delete as u32);
2164        assert!((deltas[0].price - 5.0).abs() < f64::EPSILON);
2165        assert!((deltas[0].size - 0.0).abs() < f64::EPSILON);
2166    }
2167
2168    #[rstest]
2169    fn test_parse_bsp_book_deltas_no_spb_spl_returns_empty() {
2170        let rc = runner_change_with_ticker(12345, None, None, None, None);
2171        let ts = UnixNanos::from(1_000_000_000u64);
2172        let instrument_id = make_instrument_id("1.12345", 12345, Decimal::ZERO);
2173
2174        let deltas = parse_bsp_book_deltas(instrument_id, &rc, ts, ts);
2175
2176        assert!(deltas.is_empty());
2177    }
2178
2179    #[rstest]
2180    fn test_parse_instrument_closes_from_fixture() {
2181        let data = load_test_json("stream/mcm_BSP_settled.json");
2182        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2183
2184        if let StreamMessage::MarketChange(mcm) = msg {
2185            let mc = mcm.mc.as_ref().unwrap();
2186            let def = mc[0].market_definition.as_ref().unwrap();
2187            let ts = parse_millis_timestamp(mcm.pt);
2188
2189            let closes = parse_instrument_closes(&mc[0].id, def, ts, ts);
2190
2191            // 4 runners: WINNER, PLACED, LOSER, REMOVED - all produce close events
2192            assert_eq!(closes.len(), 4);
2193
2194            let close_map: std::collections::HashMap<String, Price> = closes
2195                .iter()
2196                .map(|c| (c.instrument_id.to_string(), c.close_price))
2197                .collect();
2198
2199            let id_winner = make_instrument_id("1.185781465", 9249757, Decimal::ZERO).to_string();
2200            let id_placed = make_instrument_id("1.185781465", 40273293, Decimal::ZERO).to_string();
2201            let id_loser = make_instrument_id("1.185781465", 11120000, Decimal::ZERO).to_string();
2202            let id_removed = make_instrument_id("1.185781465", 37433527, Decimal::ZERO).to_string();
2203
2204            assert_eq!(close_map[&id_winner], Price::from("1.00"));
2205            assert_eq!(close_map[&id_placed], Price::from("1.00"));
2206            assert_eq!(close_map[&id_loser], Price::from("0.00"));
2207            assert_eq!(close_map[&id_removed], Price::from("0.00"));
2208        } else {
2209            panic!("Expected MarketChange");
2210        }
2211    }
2212
2213    #[rstest]
2214    fn test_parse_instrument_closes_active_runners_excluded() {
2215        let data = load_test_json("stream/mcm_BSP.json");
2216        let messages: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
2217
2218        // Find the MCM with a market definition containing ACTIVE runners
2219        let mcm = messages
2220            .iter()
2221            .find_map(|m| match m {
2222                StreamMessage::MarketChange(mcm) => {
2223                    let mc = mcm.mc.as_ref()?;
2224                    mc.iter()
2225                        .find(|c| c.market_definition.is_some())
2226                        .map(|_| mcm)
2227                }
2228                _ => None,
2229            })
2230            .expect("fixture should contain MCM with market definition");
2231
2232        let mc = mcm.mc.as_ref().unwrap();
2233        let change = mc.iter().find(|c| c.market_definition.is_some()).unwrap();
2234        let def = change.market_definition.as_ref().unwrap();
2235        let ts = parse_millis_timestamp(mcm.pt);
2236
2237        let closes = parse_instrument_closes(&change.id, def, ts, ts);
2238
2239        assert!(
2240            closes.is_empty(),
2241            "Active runners should not produce close events, found {}",
2242            closes.len()
2243        );
2244    }
2245
2246    fn make_test_uo(
2247        bet_id: &str,
2248        size: Decimal,
2249        sm: Option<Decimal>,
2250        avp: Option<Decimal>,
2251    ) -> UnmatchedOrder {
2252        UnmatchedOrder {
2253            id: bet_id.to_string(),
2254            p: Decimal::new(25, 1),
2255            s: size,
2256            side: StreamingSide::Back,
2257            status: StreamingOrderStatus::Executable,
2258            pt: Some(StreamingPersistenceType::Lapse),
2259            ot: StreamingOrderType::Limit,
2260            pd: 1616568581000,
2261            bsp: None,
2262            rfo: None,
2263            rfs: None,
2264            rc: None,
2265            rac: None,
2266            md: None,
2267            cd: None,
2268            ld: None,
2269            avp,
2270            sm,
2271            sr: None,
2272            sl: None,
2273            sc: None,
2274            sv: None,
2275            lsrc: None,
2276        }
2277    }
2278
2279    #[rstest]
2280    fn test_fill_tracker_sync_order_prevents_duplicate_fill() {
2281        let mut tracker = FillTracker::new();
2282
2283        // Sync existing fill state: 10 matched at 2.5
2284        tracker.sync_order("123456", Decimal::new(10, 0), Decimal::new(25, 1));
2285
2286        let uo = make_test_uo(
2287            "123456",
2288            Decimal::new(20, 0),
2289            Some(Decimal::new(10, 0)),
2290            Some(Decimal::new(25, 1)),
2291        );
2292
2293        let instrument_id = InstrumentId::from("1.234567-123456-0.0.BETFAIR");
2294        let account_id = AccountId::from("BETFAIR-001");
2295        let currency = Currency::from("GBP");
2296        let ts = UnixNanos::default();
2297
2298        // Same sm=10 as synced, should not emit a fill
2299        let result =
2300            tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2301        assert!(
2302            result.is_none(),
2303            "should not emit fill for already-synced qty"
2304        );
2305    }
2306
2307    #[rstest]
2308    fn test_fill_tracker_sync_order_allows_incremental_fill() {
2309        let mut tracker = FillTracker::new();
2310
2311        // Sync: 10 matched at 2.5
2312        tracker.sync_order("123456", Decimal::new(10, 0), Decimal::new(25, 1));
2313
2314        let uo = make_test_uo(
2315            "123456",
2316            Decimal::new(20, 0),
2317            Some(Decimal::new(15, 0)),
2318            Some(Decimal::new(26, 1)),
2319        );
2320
2321        let instrument_id = InstrumentId::from("1.234567-123456-0.0.BETFAIR");
2322        let account_id = AccountId::from("BETFAIR-001");
2323        let currency = Currency::from("GBP");
2324        let ts = UnixNanos::default();
2325
2326        // sm=15 vs synced 10, should emit incremental fill of 5
2327        let result =
2328            tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2329        assert!(result.is_some(), "should emit fill for new matched qty");
2330        let fill = result.unwrap();
2331        assert_eq!(fill.last_qty, Quantity::from("5.00"));
2332    }
2333
2334    #[rstest]
2335    fn test_fill_tracker_overfill_rejected() {
2336        let mut tracker = FillTracker::new();
2337
2338        // sm=30 exceeds order size s=20
2339        let uo = make_test_uo(
2340            "999001",
2341            Decimal::new(20, 0),
2342            Some(Decimal::new(30, 0)),
2343            Some(Decimal::new(25, 1)),
2344        );
2345
2346        let instrument_id = InstrumentId::from("1.234567-999001-0.0.BETFAIR");
2347        let account_id = AccountId::from("BETFAIR-001");
2348        let currency = Currency::from("GBP");
2349        let ts = UnixNanos::default();
2350
2351        let result =
2352            tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2353        assert!(
2354            result.is_none(),
2355            "overfill (sm > order_qty) should be rejected"
2356        );
2357    }
2358
2359    #[rstest]
2360    fn test_fill_tracker_zero_sm_returns_none() {
2361        let mut tracker = FillTracker::new();
2362
2363        let uo = make_test_uo("999002", Decimal::new(10, 0), Some(Decimal::ZERO), None);
2364
2365        let instrument_id = InstrumentId::from("1.234567-999002-0.0.BETFAIR");
2366        let result = tracker.maybe_fill_report(
2367            &uo,
2368            uo.s,
2369            instrument_id,
2370            AccountId::from("BETFAIR-001"),
2371            Currency::from("GBP"),
2372            UnixNanos::default(),
2373            UnixNanos::default(),
2374        );
2375        assert!(result.is_none(), "zero sm should not produce a fill");
2376    }
2377
2378    #[rstest]
2379    fn test_fill_tracker_no_avp_uses_order_price() {
2380        let data = load_test_json("stream/ocm_FILLED_no_avp.json");
2381        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2382
2383        if let StreamMessage::OrderChange(ocm) = msg {
2384            let oc = ocm.oc.as_ref().unwrap();
2385            let omc = &oc[0];
2386            let orc = &omc.orc.as_ref().unwrap()[0];
2387            let uo = &orc.uo.as_ref().unwrap()[0];
2388            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
2389            let ts = parse_millis_timestamp(ocm.pt);
2390
2391            let mut tracker = FillTracker::new();
2392            let fill = tracker
2393                .maybe_fill_report(
2394                    uo,
2395                    uo.s,
2396                    instrument_id,
2397                    AccountId::from("BETFAIR-001"),
2398                    Currency::GBP(),
2399                    ts,
2400                    ts,
2401                )
2402                .expect("should produce fill even without avp");
2403
2404            // No avp field, so fill price falls back to order price (p=3.5)
2405            assert_eq!(fill.last_qty.as_f64(), 25.0);
2406            assert_eq!(fill.last_px.as_f64(), 3.5);
2407        } else {
2408            panic!("expected OrderChange");
2409        }
2410    }
2411
2412    #[rstest]
2413    fn test_fill_tracker_weighted_avg_back_calculation() {
2414        let mut tracker = FillTracker::new();
2415        let instrument_id = InstrumentId::from("1.234567-999003-0.0.BETFAIR");
2416        let account_id = AccountId::from("BETFAIR-001");
2417        let currency = Currency::from("GBP");
2418        let ts = UnixNanos::default();
2419
2420        // First fill: 10 @ avp=2.0
2421        let uo1 = make_test_uo(
2422            "999003",
2423            Decimal::new(30, 0),
2424            Some(Decimal::new(10, 0)),
2425            Some(Decimal::new(20, 1)),
2426        );
2427        let fill1 = tracker
2428            .maybe_fill_report(&uo1, uo1.s, instrument_id, account_id, currency, ts, ts)
2429            .expect("first fill");
2430        assert_eq!(fill1.last_px.as_f64(), 2.0);
2431        assert_eq!(fill1.last_qty.as_f64(), 10.0);
2432
2433        // Second fill: sm=20, avp=2.5
2434        // Back-calc: (2.5*20 - 2.0*10) / 10 = (50-20)/10 = 3.0
2435        let uo2 = make_test_uo(
2436            "999003",
2437            Decimal::new(30, 0),
2438            Some(Decimal::new(20, 0)),
2439            Some(Decimal::new(25, 1)),
2440        );
2441        let fill2 = tracker
2442            .maybe_fill_report(&uo2, uo2.s, instrument_id, account_id, currency, ts, ts)
2443            .expect("second fill");
2444        assert_eq!(fill2.last_qty.as_f64(), 10.0);
2445        assert_eq!(fill2.last_px.as_f64(), 3.0);
2446    }
2447
2448    #[rstest]
2449    fn test_fill_tracker_negative_fill_price_falls_back_to_avp() {
2450        let mut tracker = FillTracker::new();
2451        let instrument_id = InstrumentId::from("1.234567-999004-0.0.BETFAIR");
2452        let account_id = AccountId::from("BETFAIR-001");
2453        let currency = Currency::from("GBP");
2454        let ts = UnixNanos::default();
2455
2456        // First fill: 10 @ avp=5.0
2457        let uo1 = make_test_uo(
2458            "999004",
2459            Decimal::new(20, 0),
2460            Some(Decimal::new(10, 0)),
2461            Some(Decimal::new(50, 1)),
2462        );
2463        tracker
2464            .maybe_fill_report(&uo1, uo1.s, instrument_id, account_id, currency, ts, ts)
2465            .expect("first fill");
2466
2467        // Second fill: sm=15, avp=1.0
2468        // Back-calc: (1.0*15 - 5.0*10) / 5 = (15-50)/5 = -7.0
2469        // Negative price should fall back to avp=1.0
2470        let uo2 = make_test_uo(
2471            "999004",
2472            Decimal::new(20, 0),
2473            Some(Decimal::new(15, 0)),
2474            Some(Decimal::new(10, 1)),
2475        );
2476        let fill2 = tracker
2477            .maybe_fill_report(&uo2, uo2.s, instrument_id, account_id, currency, ts, ts)
2478            .expect("second fill should use avp fallback");
2479        assert_eq!(fill2.last_qty.as_f64(), 5.0);
2480        assert_eq!(fill2.last_px.as_f64(), 1.0);
2481    }
2482
2483    #[rstest]
2484    fn test_fill_tracker_prune_clears_state() {
2485        let mut tracker = FillTracker::new();
2486        let instrument_id = InstrumentId::from("1.234567-999005-0.0.BETFAIR");
2487        let account_id = AccountId::from("BETFAIR-001");
2488        let currency = Currency::from("GBP");
2489        let ts = UnixNanos::default();
2490
2491        // Fill order fully
2492        let uo = make_test_uo(
2493            "999005",
2494            Decimal::new(10, 0),
2495            Some(Decimal::new(10, 0)),
2496            Some(Decimal::new(25, 1)),
2497        );
2498        let fill1 =
2499            tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2500        assert!(fill1.is_some());
2501
2502        // Same data again - deduplicated
2503        let fill2 =
2504            tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2505        assert!(fill2.is_none(), "should be deduplicated");
2506
2507        // Prune the bet
2508        tracker.prune("999005");
2509
2510        // After prune, same data can produce a fill again (simulates re-processing)
2511        let fill3 =
2512            tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2513        assert!(fill3.is_some(), "after prune, should produce fill again");
2514    }
2515
2516    #[rstest]
2517    fn test_fill_tracker_sm_none_returns_none() {
2518        let mut tracker = FillTracker::new();
2519
2520        // sm=None (no matched quantity field at all)
2521        let uo = make_test_uo("999006", Decimal::new(10, 0), None, None);
2522
2523        let instrument_id = InstrumentId::from("1.234567-999006-0.0.BETFAIR");
2524        let result = tracker.maybe_fill_report(
2525            &uo,
2526            uo.s,
2527            instrument_id,
2528            AccountId::from("BETFAIR-001"),
2529            Currency::from("GBP"),
2530            UnixNanos::default(),
2531            UnixNanos::default(),
2532        );
2533        assert!(result.is_none(), "None sm should not produce a fill");
2534    }
2535
2536    #[rstest]
2537    fn test_parse_order_status_report_missing_persistence_type_for_market_on_close() {
2538        let uo = UnmatchedOrder {
2539            s: Decimal::ZERO,
2540            pt: None,
2541            ot: StreamingOrderType::MarketOnClose,
2542            sr: Some(Decimal::new(10, 0)),
2543            ..make_test_uo("999007", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2544        };
2545
2546        let report = parse_order_status_report(
2547            &uo,
2548            InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2549            AccountId::from("BETFAIR-001"),
2550            UnixNanos::default(),
2551            UnixNanos::default(),
2552        )
2553        .unwrap();
2554
2555        assert_eq!(report.quantity, Quantity::from("10.00"));
2556        assert_eq!(report.time_in_force, TimeInForce::AtTheClose);
2557    }
2558
2559    #[rstest]
2560    fn test_parse_order_status_report_missing_persistence_type_for_limit_on_close() {
2561        let uo = UnmatchedOrder {
2562            s: Decimal::ZERO,
2563            pt: None,
2564            ot: StreamingOrderType::LimitOnClose,
2565            sr: Some(Decimal::new(10, 0)),
2566            ..make_test_uo("999013", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2567        };
2568
2569        let report = parse_order_status_report(
2570            &uo,
2571            InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2572            AccountId::from("BETFAIR-001"),
2573            UnixNanos::default(),
2574            UnixNanos::default(),
2575        )
2576        .unwrap();
2577
2578        assert_eq!(report.quantity, Quantity::from("10.00"));
2579        assert_eq!(report.time_in_force, TimeInForce::AtTheClose);
2580    }
2581
2582    #[rstest]
2583    fn test_parse_order_status_report_market_on_close_uses_bsp_liability() {
2584        let uo = UnmatchedOrder {
2585            s: Decimal::ZERO,
2586            bsp: Some(Decimal::new(20, 1)),
2587            pt: None,
2588            ot: StreamingOrderType::MarketOnClose,
2589            ..make_test_uo("999010", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2590        };
2591
2592        let report = parse_order_status_report(
2593            &uo,
2594            InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2595            AccountId::from("BETFAIR-001"),
2596            UnixNanos::default(),
2597            UnixNanos::default(),
2598        )
2599        .unwrap();
2600
2601        assert_eq!(report.quantity, Quantity::from("2.00"));
2602        assert_eq!(report.time_in_force, TimeInForce::AtTheClose);
2603    }
2604
2605    #[rstest]
2606    fn test_parse_order_status_report_fails_for_non_positive_quantity() {
2607        let uo = UnmatchedOrder {
2608            s: Decimal::ZERO,
2609            sm: Some(Decimal::ZERO),
2610            sr: Some(Decimal::ZERO),
2611            sc: Some(Decimal::ZERO),
2612            sl: Some(Decimal::ZERO),
2613            sv: Some(Decimal::ZERO),
2614            ..make_test_uo("999014", Decimal::ZERO, Some(Decimal::ZERO), None)
2615        };
2616
2617        let result = parse_order_status_report(
2618            &uo,
2619            InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2620            AccountId::from("BETFAIR-001"),
2621            UnixNanos::default(),
2622            UnixNanos::default(),
2623        );
2624
2625        assert!(result.is_err());
2626        assert!(
2627            result
2628                .unwrap_err()
2629                .to_string()
2630                .contains("failed to resolve positive quantity for stream order update 999014")
2631        );
2632    }
2633
2634    #[rstest]
2635    fn test_parse_order_status_report_includes_lapse_reason() {
2636        let uo = UnmatchedOrder {
2637            status: StreamingOrderStatus::ExecutionComplete,
2638            sl: Some(Decimal::ONE),
2639            lsrc: Some(crate::common::enums::LapseStatusReasonCode::SpInPlay),
2640            ..make_test_uo("999012", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2641        };
2642
2643        let report = parse_order_status_report(
2644            &uo,
2645            InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2646            AccountId::from("BETFAIR-001"),
2647            UnixNanos::default(),
2648            UnixNanos::default(),
2649        )
2650        .unwrap();
2651
2652        assert_eq!(report.order_status, OrderStatus::Canceled);
2653        assert_eq!(report.cancel_reason.as_deref(), Some("SP_IN_PLAY"));
2654    }
2655
2656    #[rstest]
2657    fn test_parse_order_status_report_missing_persistence_type_fails_for_limit_order() {
2658        let uo = UnmatchedOrder {
2659            pt: None,
2660            ..make_test_uo("999008", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2661        };
2662
2663        let result = parse_order_status_report(
2664            &uo,
2665            InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2666            AccountId::from("BETFAIR-001"),
2667            UnixNanos::default(),
2668            UnixNanos::default(),
2669        );
2670
2671        assert!(result.is_err());
2672        assert_eq!(
2673            result.unwrap_err().to_string(),
2674            "missing persistence type for order update 999008"
2675        );
2676    }
2677
2678    #[rstest]
2679    fn test_fill_tracker_uses_lifecycle_quantity_when_stream_size_is_zero() {
2680        let mut tracker = FillTracker::new();
2681        let uo = UnmatchedOrder {
2682            s: Decimal::ZERO,
2683            sr: Some(Decimal::new(10, 0)),
2684            sm: Some(Decimal::new(5, 0)),
2685            avp: Some(Decimal::new(20, 1)),
2686            ..make_test_uo("999009", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2687        };
2688
2689        let fill = tracker
2690            .maybe_fill_report(
2691                &uo,
2692                uo.s,
2693                InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2694                AccountId::from("BETFAIR-001"),
2695                Currency::from("GBP"),
2696                UnixNanos::default(),
2697                UnixNanos::default(),
2698            )
2699            .expect("zero stream size should fall back to lifecycle quantities");
2700
2701        assert_eq!(fill.last_qty, Quantity::from("5.00"));
2702    }
2703
2704    #[rstest]
2705    fn test_fill_tracker_uses_bsp_liability_when_stream_size_is_zero() {
2706        let mut tracker = FillTracker::new();
2707        let uo = UnmatchedOrder {
2708            s: Decimal::ZERO,
2709            bsp: Some(Decimal::new(20, 1)),
2710            pt: None,
2711            ot: StreamingOrderType::MarketOnClose,
2712            sm: Some(Decimal::new(10, 1)),
2713            avp: Some(Decimal::new(20, 1)),
2714            ..make_test_uo("999011", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2715        };
2716
2717        let fill = tracker
2718            .maybe_fill_report(
2719                &uo,
2720                uo.s,
2721                InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2722                AccountId::from("BETFAIR-001"),
2723                Currency::from("GBP"),
2724                UnixNanos::default(),
2725                UnixNanos::default(),
2726            )
2727            .expect("zero stream size should fall back to bsp liability");
2728
2729        assert_eq!(fill.last_qty, Quantity::from("1.00"));
2730    }
2731
2732    #[rstest]
2733    fn test_fill_tracker_partial_void_still_emits_fill() {
2734        let data = load_test_json("stream/ocm_VOIDED_partial.json");
2735        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2736
2737        if let StreamMessage::OrderChange(ocm) = msg {
2738            let oc = ocm.oc.as_ref().unwrap();
2739            let omc = &oc[0];
2740            let orc = &omc.orc.as_ref().unwrap()[0];
2741            let uo = &orc.uo.as_ref().unwrap()[0];
2742            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
2743            let ts = parse_millis_timestamp(ocm.pt);
2744
2745            let mut tracker = FillTracker::new();
2746            let fill = tracker
2747                .maybe_fill_report(
2748                    uo,
2749                    uo.s,
2750                    instrument_id,
2751                    AccountId::from("BETFAIR-001"),
2752                    Currency::GBP(),
2753                    ts,
2754                    ts,
2755                )
2756                .expect("should produce fill for matched portion");
2757
2758            // Order: s=100, sm=60, sv=40 -> fill qty should be 60
2759            assert_eq!(fill.last_qty.as_f64(), 60.0);
2760            assert_eq!(fill.last_px.as_f64(), 1.5);
2761            assert_eq!(fill.order_side, OrderSide::Sell);
2762        } else {
2763            panic!("expected OrderChange");
2764        }
2765    }
2766
2767    #[rstest]
2768    fn test_fill_tracker_no_fill_when_sv_zero_and_fully_filled() {
2769        let data = load_test_json("stream/ocm_FILLED_sv_zero.json");
2770        let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2771
2772        if let StreamMessage::OrderChange(ocm) = msg {
2773            let oc = ocm.oc.as_ref().unwrap();
2774            let omc = &oc[0];
2775            let orc = &omc.orc.as_ref().unwrap()[0];
2776            let uo = &orc.uo.as_ref().unwrap()[0];
2777            let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
2778            let ts = parse_millis_timestamp(ocm.pt);
2779
2780            let mut tracker = FillTracker::new();
2781            let fill = tracker
2782                .maybe_fill_report(
2783                    uo,
2784                    uo.s,
2785                    instrument_id,
2786                    AccountId::from("BETFAIR-001"),
2787                    Currency::GBP(),
2788                    ts,
2789                    ts,
2790                )
2791                .expect("fully filled order should produce fill");
2792
2793            assert_eq!(fill.last_qty.as_f64(), 50.0);
2794            assert_eq!(fill.last_px.as_f64(), 2.0);
2795
2796            // sv=0, so no void event should be generated (tested separately)
2797            assert_eq!(uo.sv, Some(Decimal::ZERO));
2798        } else {
2799            panic!("expected OrderChange");
2800        }
2801    }
2802}