1use ahash::{AHashMap, AHashSet};
23use nautilus_core::UnixNanos;
24use nautilus_model::{
25 data::{
26 BookOrder, InstrumentClose, InstrumentStatus, OrderBookDelta, OrderBookDeltas, TradeTick,
27 },
28 enums::{
29 AggressorSide, BookAction, InstrumentCloseType, LiquiditySide, MarketStatusAction,
30 OrderSide, OrderType, RecordFlag, TimeInForce,
31 },
32 identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
33 reports::{FillReport, OrderStatusReport},
34 types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::Decimal;
37
38use crate::{
39 common::{
40 consts::{BETFAIR_PRICE_PRECISION, BETFAIR_QUANTITY_PRECISION},
41 enums::{MarketStatus, RunnerStatus, StreamingOrderStatus, resolve_streaming_order_status},
42 parse::{make_instrument_id, parse_millis_timestamp},
43 },
44 data_types::{
45 BetfairBspBookDelta, BetfairRaceProgress, BetfairRaceRunnerData, BetfairStartingPrice,
46 BetfairTicker,
47 },
48 stream::messages::{
49 MarketDefinition, RaceProgressChange, RaceRunnerChange, RunnerChange, UnmatchedOrder,
50 },
51};
52
53pub fn parse_runner_book_deltas(
75 instrument_id: InstrumentId,
76 rc: &RunnerChange,
77 is_snapshot: bool,
78 sequence: u64,
79 ts_event: UnixNanos,
80 ts_init: UnixNanos,
81) -> anyhow::Result<Option<OrderBookDeltas>> {
82 let atb_len = rc.atb.as_ref().map_or(0, Vec::len);
83 let atl_len = rc.atl.as_ref().map_or(0, Vec::len);
84 let total_levels = atb_len + atl_len;
85
86 if total_levels == 0 && !is_snapshot {
87 return Ok(None);
88 }
89
90 let snapshot_flags = if is_snapshot {
91 RecordFlag::F_SNAPSHOT as u8
92 } else {
93 0
94 };
95 let mut deltas = Vec::with_capacity(total_levels + usize::from(is_snapshot));
96
97 if is_snapshot {
98 let mut clear = OrderBookDelta::clear(instrument_id, sequence, ts_event, ts_init);
99
100 if total_levels == 0 {
101 clear.flags |= RecordFlag::F_LAST as u8;
102 }
103 deltas.push(clear);
104 }
105
106 for pv in rc.atb.as_deref().unwrap_or(&[]) {
108 let action = if is_snapshot {
109 BookAction::Add
110 } else if pv.volume == Decimal::ZERO {
111 BookAction::Delete
112 } else {
113 BookAction::Update
114 };
115
116 deltas.push(OrderBookDelta::new(
117 instrument_id,
118 action,
119 BookOrder::new(
120 OrderSide::Buy,
121 Price::from_decimal_dp(pv.price, BETFAIR_PRICE_PRECISION)?,
122 Quantity::from_decimal_dp(pv.volume, BETFAIR_QUANTITY_PRECISION)?,
123 0,
124 ),
125 snapshot_flags,
126 sequence,
127 ts_event,
128 ts_init,
129 ));
130 }
131
132 for pv in rc.atl.as_deref().unwrap_or(&[]) {
134 let action = if is_snapshot {
135 BookAction::Add
136 } else if pv.volume == Decimal::ZERO {
137 BookAction::Delete
138 } else {
139 BookAction::Update
140 };
141
142 deltas.push(OrderBookDelta::new(
143 instrument_id,
144 action,
145 BookOrder::new(
146 OrderSide::Sell,
147 Price::from_decimal_dp(pv.price, BETFAIR_PRICE_PRECISION)?,
148 Quantity::from_decimal_dp(pv.volume, BETFAIR_QUANTITY_PRECISION)?,
149 0,
150 ),
151 snapshot_flags,
152 sequence,
153 ts_event,
154 ts_init,
155 ));
156 }
157
158 if let Some(last) = deltas.last_mut() {
160 last.flags |= RecordFlag::F_LAST as u8;
161 }
162
163 Ok(Some(OrderBookDeltas::new(instrument_id, deltas)))
164}
165
/// Builds a [`TradeTick`] from already-parsed trade fields.
///
/// The aggressor side is always set to [`AggressorSide::NoAggressor`]; all other
/// fields are passed through unchanged.
#[must_use]
pub fn make_trade_tick(
    instrument_id: InstrumentId,
    price: Price,
    size: Quantity,
    trade_id: TradeId,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
) -> TradeTick {
    TradeTick::new(
        instrument_id,
        price,
        size,
        AggressorSide::NoAggressor,
        trade_id,
        ts_event,
        ts_init,
    )
}
189
190#[must_use]
199pub fn parse_instrument_statuses(
200 market_id: &str,
201 def: &MarketDefinition,
202 ts_event: UnixNanos,
203 ts_init: UnixNanos,
204) -> Vec<InstrumentStatus> {
205 let Some(status) = def.status else {
206 return Vec::new();
207 };
208 let Some(runners) = &def.runners else {
209 return Vec::new();
210 };
211 let in_play = def.in_play.unwrap_or(false);
212
213 runners
214 .iter()
215 .map(|rd| {
216 let handicap = rd.hc.unwrap_or(Decimal::ZERO);
217 let instrument_id = make_instrument_id(market_id, rd.id, handicap);
218 let action = match rd.status {
219 Some(RunnerStatus::Removed | RunnerStatus::RemovedVacant) => {
220 MarketStatusAction::Close
221 }
222 _ => match (status, in_play) {
223 (MarketStatus::Inactive, _) => MarketStatusAction::Close,
224 (MarketStatus::Open, false) => MarketStatusAction::PreOpen,
225 (MarketStatus::Open, true) => MarketStatusAction::Trading,
226 (MarketStatus::Suspended, _) => MarketStatusAction::Pause,
227 (MarketStatus::Closed, _) => MarketStatusAction::Close,
228 },
229 };
230 let is_trading = matches!(status, MarketStatus::Open)
231 && in_play
232 && !matches!(
233 rd.status,
234 Some(RunnerStatus::Removed | RunnerStatus::RemovedVacant)
235 );
236 InstrumentStatus::new(
237 instrument_id,
238 action,
239 ts_event,
240 ts_init,
241 None,
242 None,
243 Some(is_trading),
244 None,
245 None,
246 )
247 })
248 .collect()
249}
250
251pub fn make_trade_id(uo: &UnmatchedOrder) -> TradeId {
256 let sm = uo.sm.unwrap_or(Decimal::ZERO);
257 TradeId::new(format!("{}-{sm}", uo.id))
258}
259
/// Tracks per-bet fill state so that incremental fills can be derived from the
/// cumulative values carried on stream order updates.
#[derive(Debug, Default)]
pub struct FillTracker {
    /// Last recorded cumulative matched size, keyed by bet id.
    filled_qty: AHashMap<String, Decimal>,
    /// Last recorded average matched price, keyed by bet id.
    avg_px: AHashMap<String, Decimal>,
    /// Trade ids for which a fill report has already been produced.
    published_trade_ids: AHashSet<String>,
}
272
273impl FillTracker {
274 #[must_use]
276 pub fn new() -> Self {
277 Self::default()
278 }
279
280 #[expect(clippy::too_many_arguments)]
285 pub fn maybe_fill_report(
286 &mut self,
287 uo: &UnmatchedOrder,
288 order_qty: Decimal,
289 instrument_id: InstrumentId,
290 account_id: AccountId,
291 currency: Currency,
292 ts_event: UnixNanos,
293 ts_init: UnixNanos,
294 ) -> Option<FillReport> {
295 let sm = uo.sm?;
296
297 if sm <= Decimal::ZERO {
298 return None;
299 }
300
301 let prev_filled = self
302 .filled_qty
303 .get(&uo.id)
304 .copied()
305 .unwrap_or(Decimal::ZERO);
306
307 if sm <= prev_filled {
308 return None;
309 }
310
311 let order_qty = resolve_stream_order_quantity(order_qty, uo);
312
313 if sm > order_qty {
315 log::warn!(
316 "Rejecting potential overfill for bet_id={}: order_qty={order_qty}, sm={sm}",
317 uo.id,
318 );
319 return None;
320 }
321
322 let trade_id = make_trade_id(uo);
323
324 if self.published_trade_ids.contains(trade_id.as_str()) {
325 return None;
326 }
327
328 let fill_qty_dec = sm - prev_filled;
329 let fill_price = self.compute_fill_price(uo, prev_filled);
330
331 let last_qty = Quantity::from_decimal_dp(fill_qty_dec, BETFAIR_QUANTITY_PRECISION).ok()?;
332 let last_px = Price::from_decimal_dp(fill_price, BETFAIR_PRICE_PRECISION).ok()?;
333
334 self.filled_qty.insert(uo.id.clone(), sm);
336
337 if let Some(avp) = uo.avp {
338 self.avg_px.insert(uo.id.clone(), avp);
339 }
340
341 self.published_trade_ids.insert(trade_id.to_string());
342
343 let venue_order_id = VenueOrderId::from(uo.id.as_str());
344 let order_side = OrderSide::from(uo.side);
345 let client_order_id = uo.rfo.as_deref().map(ClientOrderId::from);
346 let ts_fill = uo.md.map_or(ts_event, parse_millis_timestamp);
347
348 Some(make_fill_report(
349 account_id,
350 instrument_id,
351 venue_order_id,
352 trade_id,
353 order_side,
354 last_qty,
355 last_px,
356 currency,
357 client_order_id,
358 ts_fill,
359 ts_init,
360 ))
361 }
362
363 fn compute_fill_price(&self, uo: &UnmatchedOrder, prev_filled: Decimal) -> Decimal {
370 let Some(avp) = uo.avp else {
371 return uo.p;
372 };
373
374 if prev_filled == Decimal::ZERO {
375 return avp;
376 }
377
378 let Some(prev_avg) = self.avg_px.get(&uo.id).copied() else {
379 return avp;
380 };
381
382 if prev_avg == avp {
383 return avp;
384 }
385
386 let sm = uo.sm.unwrap_or(Decimal::ZERO);
387 let fill_size = sm - prev_filled;
388
389 if fill_size == Decimal::ZERO {
390 return prev_avg;
391 }
392
393 let fill_price = (avp * sm - prev_avg * prev_filled) / fill_size;
394
395 if fill_price <= Decimal::ZERO {
396 log::warn!(
397 "Calculated fill price {fill_price} is invalid for bet_id={}, falling back to avp={avp}",
398 uo.id,
399 );
400 return avp;
401 }
402
403 fill_price
404 }
405
406 pub fn sync_order(&mut self, bet_id: &str, filled_qty: Decimal, avg_px: Decimal) {
412 self.filled_qty.insert(bet_id.to_string(), filled_qty);
413
414 if avg_px > Decimal::ZERO {
415 self.avg_px.insert(bet_id.to_string(), avg_px);
416 }
417 }
418
419 pub fn prune(&mut self, bet_id: &str) {
421 self.filled_qty.remove(bet_id);
422 self.avg_px.remove(bet_id);
423
424 let prefix = format!("{bet_id}-");
425 self.published_trade_ids
426 .retain(|id| !id.starts_with(&prefix));
427 }
428}
429
430#[must_use]
432pub fn has_cancel_quantity(uo: &UnmatchedOrder) -> bool {
433 let sc = uo.sc.unwrap_or(Decimal::ZERO);
434 let sl = uo.sl.unwrap_or(Decimal::ZERO);
435 let sv = uo.sv.unwrap_or(Decimal::ZERO);
436 (sc + sl + sv) > Decimal::ZERO
437}
438
439#[must_use]
441pub fn is_lapsed(uo: &UnmatchedOrder) -> bool {
442 uo.status == StreamingOrderStatus::ExecutionComplete && uo.lsrc.is_some()
443}
444
445pub fn parse_order_status_report(
454 uo: &UnmatchedOrder,
455 instrument_id: InstrumentId,
456 account_id: AccountId,
457 ts_event: UnixNanos,
458 ts_init: UnixNanos,
459) -> anyhow::Result<OrderStatusReport> {
460 let order_side = OrderSide::from(uo.side);
461 let order_type = OrderType::from(uo.ot);
462 let time_in_force = parse_stream_time_in_force(uo)?;
463
464 let size_matched = uo.sm.unwrap_or(Decimal::ZERO);
465 let size_cancelled = uo.sc.unwrap_or(Decimal::ZERO);
466 let size_lapsed = uo.sl.unwrap_or(Decimal::ZERO);
467 let size_voided = uo.sv.unwrap_or(Decimal::ZERO);
468
469 let size_closed = size_cancelled + size_lapsed + size_voided;
471 let order_status = resolve_streaming_order_status(uo.status, size_matched, size_closed);
472
473 let quantity_decimal = stream_order_quantity(uo);
474 anyhow::ensure!(
475 quantity_decimal > Decimal::ZERO,
476 "failed to resolve positive quantity for stream order update {} \
477 (order_type={:?}, persistence_type={:?}, size={}, bsp_liability={:?}, \
478 size_matched={:?}, size_remaining={:?}, size_cancelled={:?}, size_lapsed={:?}, size_voided={:?})",
479 uo.id,
480 uo.ot,
481 uo.pt,
482 uo.s,
483 uo.bsp,
484 uo.sm,
485 uo.sr,
486 uo.sc,
487 uo.sl,
488 uo.sv,
489 );
490 let quantity = Quantity::from_decimal_dp(quantity_decimal, BETFAIR_QUANTITY_PRECISION)?;
491 let filled_qty = Quantity::from_decimal_dp(size_matched, BETFAIR_QUANTITY_PRECISION)?;
492
493 let ts_accepted = parse_millis_timestamp(uo.pd);
494
495 let ts_last = [uo.md, uo.cd, uo.ld]
497 .into_iter()
498 .flatten()
499 .max()
500 .map_or(ts_event, parse_millis_timestamp);
501
502 let venue_order_id = VenueOrderId::from(uo.id.as_str());
503 let client_order_id = uo.rfo.as_deref().map(ClientOrderId::from);
504
505 let price = Price::from_decimal_dp(uo.p, BETFAIR_PRICE_PRECISION)?;
506
507 let mut report = OrderStatusReport::new(
508 account_id,
509 instrument_id,
510 client_order_id,
511 venue_order_id,
512 order_side,
513 order_type,
514 time_in_force,
515 order_status,
516 quantity,
517 filled_qty,
518 ts_accepted,
519 ts_last,
520 ts_init,
521 None,
522 )
523 .with_price(price);
524
525 report.avg_px = uo.avp;
526 if let Some(lsrc) = uo.lsrc {
527 report.cancel_reason = Some(lsrc.to_string());
528 }
529
530 Ok(report)
531}
532
533fn parse_stream_time_in_force(uo: &UnmatchedOrder) -> anyhow::Result<TimeInForce> {
534 match uo.pt {
535 Some(persistence_type) => Ok(TimeInForce::from(persistence_type)),
536 None if matches!(
537 uo.ot,
538 crate::common::enums::StreamingOrderType::LimitOnClose
539 | crate::common::enums::StreamingOrderType::MarketOnClose
540 ) =>
541 {
542 Ok(TimeInForce::AtTheClose)
543 }
544 None => anyhow::bail!("missing persistence type for order update {}", uo.id),
545 }
546}
547
548fn stream_order_quantity(uo: &UnmatchedOrder) -> Decimal {
549 if uo.s > Decimal::ZERO {
550 return uo.s;
551 }
552
553 let lifecycle_qty = uo.sm.unwrap_or(Decimal::ZERO)
554 + uo.sr.unwrap_or(Decimal::ZERO)
555 + uo.sc.unwrap_or(Decimal::ZERO)
556 + uo.sl.unwrap_or(Decimal::ZERO)
557 + uo.sv.unwrap_or(Decimal::ZERO);
558
559 if lifecycle_qty > Decimal::ZERO {
560 return lifecycle_qty;
561 }
562
563 if uses_liability_based_stream_quantity(uo) {
564 return uo.bsp.unwrap_or(Decimal::ZERO);
565 }
566
567 Decimal::ZERO
568}
569
570fn resolve_stream_order_quantity(order_qty: Decimal, uo: &UnmatchedOrder) -> Decimal {
571 if order_qty > Decimal::ZERO {
572 order_qty
573 } else {
574 stream_order_quantity(uo)
575 }
576}
577
578fn uses_liability_based_stream_quantity(uo: &UnmatchedOrder) -> bool {
579 matches!(
580 uo.ot,
581 crate::common::enums::StreamingOrderType::LimitOnClose
582 | crate::common::enums::StreamingOrderType::MarketOnClose
583 )
584}
585
/// Builds a [`FillReport`] from already-parsed fill fields.
///
/// The commission is set to zero in `currency` and the liquidity side to
/// [`LiquiditySide::NoLiquiditySide`]. The two trailing optional arguments of
/// [`FillReport::new`] and the one following `client_order_id` are left as `None`.
#[must_use]
#[expect(clippy::too_many_arguments)]
pub fn make_fill_report(
    account_id: AccountId,
    instrument_id: InstrumentId,
    venue_order_id: VenueOrderId,
    trade_id: TradeId,
    order_side: OrderSide,
    last_qty: Quantity,
    last_px: Price,
    currency: Currency,
    client_order_id: Option<ClientOrderId>,
    ts_event: UnixNanos,
    ts_init: UnixNanos,
) -> FillReport {
    FillReport::new(
        account_id,
        instrument_id,
        venue_order_id,
        trade_id,
        order_side,
        last_qty,
        last_px,
        // Commission is always reported as zero here.
        Money::new(0.0, currency),
        LiquiditySide::NoLiquiditySide,
        client_order_id,
        None,
        ts_event,
        ts_init,
        None,
    )
}
622
623#[must_use]
627pub fn parse_betfair_ticker(
628 instrument_id: InstrumentId,
629 rc: &RunnerChange,
630 ts_event: UnixNanos,
631 ts_init: UnixNanos,
632) -> Option<BetfairTicker> {
633 if rc.ltp.is_none() && rc.tv.is_none() && rc.spn.is_none() && rc.spf.is_none() {
634 return None;
635 }
636
637 Some(BetfairTicker::new(
638 instrument_id,
639 rc.ltp.map_or(f64::NAN, |d| {
640 d.to_string().parse::<f64>().unwrap_or(f64::NAN)
641 }),
642 rc.tv.map_or(f64::NAN, |d| {
643 d.to_string().parse::<f64>().unwrap_or(f64::NAN)
644 }),
645 rc.spn.map_or(f64::NAN, |d| {
646 d.to_string().parse::<f64>().unwrap_or(f64::NAN)
647 }),
648 rc.spf.map_or(f64::NAN, |d| {
649 d.to_string().parse::<f64>().unwrap_or(f64::NAN)
650 }),
651 ts_event,
652 ts_init,
653 ))
654}
655
656#[must_use]
660pub fn parse_betfair_starting_prices(
661 market_id: &str,
662 def: &MarketDefinition,
663 ts_event: UnixNanos,
664 ts_init: UnixNanos,
665) -> Vec<BetfairStartingPrice> {
666 let Some(runners) = &def.runners else {
667 return Vec::new();
668 };
669
670 runners
671 .iter()
672 .filter_map(|rd| {
673 let bsp = rd.bsp?;
674 let handicap = rd.hc.unwrap_or(Decimal::ZERO);
675 let instrument_id = make_instrument_id(market_id, rd.id, handicap);
676 let bsp_f64 = bsp.to_string().parse::<f64>().unwrap_or(f64::NAN);
677 Some(BetfairStartingPrice::new(
678 instrument_id,
679 bsp_f64,
680 ts_event,
681 ts_init,
682 ))
683 })
684 .collect()
685}
686
687#[must_use]
693pub fn parse_bsp_book_deltas(
694 instrument_id: InstrumentId,
695 rc: &RunnerChange,
696 ts_event: UnixNanos,
697 ts_init: UnixNanos,
698) -> Vec<BetfairBspBookDelta> {
699 let spb_len = rc.spb.as_ref().map_or(0, Vec::len);
700 let spl_len = rc.spl.as_ref().map_or(0, Vec::len);
701
702 if spb_len + spl_len == 0 {
703 return Vec::new();
704 }
705
706 let mut result = Vec::with_capacity(spb_len + spl_len);
707
708 for pv in rc.spb.as_deref().unwrap_or(&[]) {
710 let action = if pv.volume == Decimal::ZERO {
711 BookAction::Delete as u32
712 } else {
713 BookAction::Update as u32
714 };
715
716 result.push(BetfairBspBookDelta::new(
717 instrument_id,
718 action,
719 OrderSide::Sell as u32,
720 pv.price.to_string().parse::<f64>().unwrap_or(f64::NAN),
721 pv.volume.to_string().parse::<f64>().unwrap_or(0.0),
722 ts_event,
723 ts_init,
724 ));
725 }
726
727 for pv in rc.spl.as_deref().unwrap_or(&[]) {
729 let action = if pv.volume == Decimal::ZERO {
730 BookAction::Delete as u32
731 } else {
732 BookAction::Update as u32
733 };
734
735 result.push(BetfairBspBookDelta::new(
736 instrument_id,
737 action,
738 OrderSide::Buy as u32,
739 pv.price.to_string().parse::<f64>().unwrap_or(f64::NAN),
740 pv.volume.to_string().parse::<f64>().unwrap_or(0.0),
741 ts_event,
742 ts_init,
743 ));
744 }
745
746 result
747}
748
749#[must_use]
754pub fn parse_instrument_closes(
755 market_id: &str,
756 def: &MarketDefinition,
757 ts_event: UnixNanos,
758 ts_init: UnixNanos,
759) -> Vec<InstrumentClose> {
760 let Some(runners) = &def.runners else {
761 return Vec::new();
762 };
763
764 runners
765 .iter()
766 .filter_map(|rd| {
767 let status = rd.status.as_ref()?;
768 let close_price = match status {
769 RunnerStatus::Winner | RunnerStatus::Placed => Price::from("1.00"),
770 RunnerStatus::Loser | RunnerStatus::Removed | RunnerStatus::RemovedVacant => {
771 Price::from("0.00")
772 }
773 RunnerStatus::Active | RunnerStatus::Hidden => return None,
774 };
775
776 let handicap = rd.hc.unwrap_or(Decimal::ZERO);
777 let instrument_id = make_instrument_id(market_id, rd.id, handicap);
778
779 Some(InstrumentClose::new(
780 instrument_id,
781 close_price,
782 InstrumentCloseType::ContractExpired,
783 ts_event,
784 ts_init,
785 ))
786 })
787 .collect()
788}
789
790#[must_use]
794pub fn parse_race_runner_data(
795 race_id: &str,
796 market_id: &str,
797 rrc: &RaceRunnerChange,
798 ts_event: UnixNanos,
799 ts_init: UnixNanos,
800) -> Option<BetfairRaceRunnerData> {
801 let selection_id = rrc.id?;
802
803 Some(BetfairRaceRunnerData::new(
804 race_id.to_string(),
805 market_id.to_string(),
806 selection_id,
807 rrc.lat.unwrap_or(f64::NAN),
808 rrc.lng.unwrap_or(f64::NAN),
809 rrc.spd.unwrap_or(f64::NAN),
810 rrc.prg.unwrap_or(f64::NAN),
811 rrc.sfq.unwrap_or(f64::NAN),
812 ts_event,
813 ts_init,
814 ))
815}
816
817#[must_use]
819pub fn parse_race_progress(
820 race_id: &str,
821 market_id: &str,
822 rpc: &RaceProgressChange,
823 ts_event: UnixNanos,
824 ts_init: UnixNanos,
825) -> BetfairRaceProgress {
826 let order_json = rpc
827 .ord
828 .as_ref()
829 .map(|v| serde_json::to_string(v).unwrap_or_default())
830 .unwrap_or_default();
831
832 let jumps_json = rpc
833 .jumps
834 .as_ref()
835 .map(|v| serde_json::to_string(v).unwrap_or_default())
836 .unwrap_or_default();
837
838 BetfairRaceProgress::new(
839 race_id.to_string(),
840 market_id.to_string(),
841 rpc.g.clone().unwrap_or_default(),
842 rpc.st.unwrap_or(f64::NAN),
843 rpc.rt.unwrap_or(f64::NAN),
844 rpc.spd.unwrap_or(f64::NAN),
845 rpc.prg.unwrap_or(f64::NAN),
846 order_json,
847 jumps_json,
848 ts_event,
849 ts_init,
850 )
851}
852
853#[cfg(test)]
854mod tests {
855 use nautilus_model::enums::{MarketStatusAction, OrderStatus, TimeInForce};
856 use rstest::rstest;
857
858 use super::*;
859 use crate::{
860 common::{
861 enums::{StreamingOrderType, StreamingPersistenceType, StreamingSide},
862 testing::load_test_json,
863 },
864 stream::messages::{PV, RunnerDefinition, StreamMessage, stream_decode},
865 };
866
867 #[rstest]
868 fn test_parse_runner_book_snapshot() {
869 let data = load_test_json("stream/mcm_live_IMAGE.json");
870 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
871
872 if let StreamMessage::MarketChange(mcm) = msg {
873 let mc = mcm.mc.as_ref().unwrap();
874 let change = &mc[0];
875 let rc = &change.rc.as_ref().unwrap()[0];
876 let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
877
878 let deltas = parse_runner_book_deltas(
879 instrument_id,
880 rc,
881 true,
882 mcm.pt,
883 parse_millis_timestamp(mcm.pt),
884 parse_millis_timestamp(mcm.pt),
885 )
886 .unwrap()
887 .expect("should produce deltas");
888
889 let atb_len = rc.atb.as_ref().unwrap().len();
891 let atl_len = rc.atl.as_ref().unwrap().len();
892 assert_eq!(deltas.deltas.len(), 1 + atb_len + atl_len);
893
894 assert_eq!(deltas.deltas[0].action, BookAction::Clear);
896 assert!(RecordFlag::F_SNAPSHOT.matches(deltas.deltas[0].flags));
897
898 for delta in &deltas.deltas[1..] {
900 assert_eq!(delta.action, BookAction::Add);
901 assert!(RecordFlag::F_SNAPSHOT.matches(delta.flags));
902 }
903
904 let last = deltas.deltas.last().unwrap();
906 assert!(RecordFlag::F_LAST.matches(last.flags));
907
908 let buy_count = deltas
910 .deltas
911 .iter()
912 .filter(|d| d.order.side == OrderSide::Buy)
913 .count();
914 let sell_count = deltas
915 .deltas
916 .iter()
917 .filter(|d| d.order.side == OrderSide::Sell)
918 .count();
919 assert_eq!(buy_count, atb_len);
920 assert_eq!(sell_count, atl_len);
921 } else {
922 panic!("expected MarketChange");
923 }
924 }
925
926 #[rstest]
927 fn test_parse_runner_book_update() {
928 let data = load_test_json("stream/mcm_UPDATE.json");
929 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
930
931 if let StreamMessage::MarketChange(mcm) = msg {
932 let mc = mcm.mc.as_ref().unwrap();
933 let change = &mc[0];
934 let rc = &change.rc.as_ref().unwrap()[0];
935 let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
936
937 let deltas = parse_runner_book_deltas(
938 instrument_id,
939 rc,
940 false,
941 mcm.pt,
942 parse_millis_timestamp(mcm.pt),
943 parse_millis_timestamp(mcm.pt),
944 )
945 .unwrap()
946 .expect("should produce deltas");
947
948 assert!(deltas.deltas.iter().all(|d| d.action != BookAction::Clear));
950
951 let last = deltas.deltas.last().unwrap();
953 assert!(RecordFlag::F_LAST.matches(last.flags));
954
955 for delta in &deltas.deltas {
957 assert!(!RecordFlag::F_SNAPSHOT.matches(delta.flags));
958 }
959 } else {
960 panic!("expected MarketChange");
961 }
962 }
963
964 #[rstest]
965 fn test_parse_runner_book_update_zero_volume_is_delete() {
966 let data = load_test_json("stream/mcm_UPDATE.json");
967 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
968
969 if let StreamMessage::MarketChange(mcm) = msg {
970 let mc = mcm.mc.as_ref().unwrap();
971 let change = &mc[0];
972 let rc = &change.rc.as_ref().unwrap()[0];
973 let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
974
975 let deltas = parse_runner_book_deltas(
976 instrument_id,
977 rc,
978 false,
979 mcm.pt,
980 parse_millis_timestamp(mcm.pt),
981 parse_millis_timestamp(mcm.pt),
982 )
983 .unwrap()
984 .unwrap();
985
986 assert!(
988 deltas.deltas.iter().any(|d| d.action == BookAction::Delete),
989 "zero volume should produce Delete action"
990 );
991 } else {
992 panic!("expected MarketChange");
993 }
994 }
995
    /// A non-snapshot runner change without any `atb`/`atl` levels yields `Ok(None)`.
    #[rstest]
    fn test_parse_runner_book_no_levels_returns_none() {
        // Every optional field is left unset so there are no ladder levels at all.
        let rc = RunnerChange {
            id: 12345,
            hc: None,
            atb: None,
            atl: None,
            batb: None,
            batl: None,
            bdatb: None,
            bdatl: None,
            spb: None,
            spl: None,
            spn: None,
            spf: None,
            trd: None,
            ltp: None,
            tv: None,
        };

        let result = parse_runner_book_deltas(
            make_instrument_id("1.123", 12345, Decimal::ZERO),
            &rc,
            false,
            0,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        assert!(result.unwrap().is_none());
    }
1027
1028 #[rstest]
1029 fn test_make_trade_tick() {
1030 let instrument_id = make_instrument_id("1.180737206", 19248890, Decimal::ZERO);
1031 let tick = make_trade_tick(
1032 instrument_id,
1033 Price::new(2.42, BETFAIR_PRICE_PRECISION),
1034 Quantity::new(100.0, BETFAIR_QUANTITY_PRECISION),
1035 TradeId::from("test-trade-1"),
1036 UnixNanos::default(),
1037 UnixNanos::default(),
1038 );
1039
1040 assert_eq!(tick.instrument_id, instrument_id);
1041 assert_eq!(tick.price.as_f64(), 2.42);
1042 assert_eq!(tick.size.as_f64(), 100.0);
1043 assert_eq!(tick.aggressor_side, AggressorSide::NoAggressor);
1044 }
1045
    /// Builds a [`MarketDefinition`] with the given market status and in-play flag
    /// and a single runner (id `456`) in the given runner status. Every other
    /// field is left as `None`.
    fn make_status_def(
        status: MarketStatus,
        in_play: bool,
        runner_status: RunnerStatus,
    ) -> MarketDefinition {
        MarketDefinition {
            runners: Some(vec![RunnerDefinition {
                id: 456,
                hc: None,
                sort_priority: None,
                name: None,
                status: Some(runner_status),
                adjustment_factor: None,
                bsp: None,
                removal_date: None,
            }]),
            bet_delay: None,
            betting_type: None,
            bsp_market: None,
            bsp_reconciled: None,
            competition_id: None,
            competition_name: None,
            complete: None,
            country_code: None,
            cross_matching: None,
            discount_allowed: None,
            each_way_divisor: None,
            event_id: None,
            event_name: None,
            event_type_id: None,
            event_type_name: None,
            in_play: Some(in_play),
            line_interval: None,
            line_max_unit: None,
            line_min_unit: None,
            market_base_rate: None,
            market_id: None,
            market_name: None,
            market_time: None,
            market_type: None,
            number_of_active_runners: None,
            number_of_winners: None,
            open_date: None,
            persistence_enabled: None,
            price_ladder_definition: None,
            race_type: None,
            regulators: None,
            runners_voidable: None,
            settled_time: None,
            status: Some(status),
            suspend_time: None,
            timezone: None,
            turn_in_play_enabled: None,
            venue: None,
            version: None,
        }
    }
1103
1104 #[rstest]
1105 #[case(MarketStatus::Open, false, MarketStatusAction::PreOpen, false)]
1106 #[case(MarketStatus::Open, true, MarketStatusAction::Trading, true)]
1107 #[case(MarketStatus::Closed, false, MarketStatusAction::Close, false)]
1108 #[case(MarketStatus::Closed, true, MarketStatusAction::Close, false)]
1109 #[case(MarketStatus::Suspended, false, MarketStatusAction::Pause, false)]
1110 #[case(MarketStatus::Suspended, true, MarketStatusAction::Pause, false)]
1111 #[case(MarketStatus::Inactive, false, MarketStatusAction::Close, false)]
1112 fn test_parse_instrument_statuses_market_state(
1113 #[case] status: MarketStatus,
1114 #[case] in_play: bool,
1115 #[case] expected_action: MarketStatusAction,
1116 #[case] expected_is_trading: bool,
1117 ) {
1118 let def = make_status_def(status, in_play, RunnerStatus::Active);
1119 let results =
1120 parse_instrument_statuses("1.123", &def, UnixNanos::default(), UnixNanos::default());
1121
1122 assert_eq!(results.len(), 1);
1123 assert_eq!(results[0].action, expected_action);
1124 assert_eq!(results[0].is_trading, Some(expected_is_trading));
1125 }
1126
1127 #[rstest]
1128 #[case(RunnerStatus::Removed)]
1129 #[case(RunnerStatus::RemovedVacant)]
1130 fn test_parse_instrument_statuses_scratched_runner_closes(#[case] runner_status: RunnerStatus) {
1131 let def = make_status_def(MarketStatus::Open, true, runner_status);
1133 let results =
1134 parse_instrument_statuses("1.123", &def, UnixNanos::default(), UnixNanos::default());
1135
1136 assert_eq!(results.len(), 1);
1137 assert_eq!(results[0].action, MarketStatusAction::Close);
1138 assert_eq!(results[0].is_trading, Some(false));
1139 }
1140
1141 #[rstest]
1142 #[case::missing_runners("runners")]
1143 #[case::missing_status("status")]
1144 fn test_parse_instrument_statuses_returns_empty(#[case] drop_field: &str) {
1145 let mut def = make_status_def(MarketStatus::Open, true, RunnerStatus::Active);
1146 match drop_field {
1147 "runners" => def.runners = None,
1148 "status" => def.status = None,
1149 _ => unreachable!(),
1150 }
1151
1152 let results =
1153 parse_instrument_statuses("1.123", &def, UnixNanos::default(), UnixNanos::default());
1154
1155 assert!(results.is_empty());
1156 }
1157
    /// In an open, in-play market, an active runner trades while removed runners
    /// are closed, and each runner maps to a distinct instrument id.
    #[rstest]
    fn test_parse_instrument_statuses_mixed_runners() {
        let mut def = make_status_def(MarketStatus::Open, true, RunnerStatus::Active);
        // Replace the single default runner with one active and two removed runners.
        def.runners = Some(vec![
            RunnerDefinition {
                id: 101,
                hc: None,
                sort_priority: Some(1),
                name: None,
                status: Some(RunnerStatus::Active),
                adjustment_factor: None,
                bsp: None,
                removal_date: None,
            },
            RunnerDefinition {
                id: 202,
                // Handicap of 2.5.
                hc: Some(Decimal::new(25, 1)),
                sort_priority: Some(2),
                name: None,
                status: Some(RunnerStatus::Removed),
                adjustment_factor: None,
                bsp: None,
                removal_date: None,
            },
            RunnerDefinition {
                id: 303,
                hc: None,
                sort_priority: Some(3),
                name: None,
                status: Some(RunnerStatus::RemovedVacant),
                adjustment_factor: None,
                bsp: None,
                removal_date: None,
            },
        ]);

        let results =
            parse_instrument_statuses("1.999", &def, UnixNanos::default(), UnixNanos::default());

        assert_eq!(results.len(), 3);

        // Active runner follows the market state.
        assert_eq!(results[0].action, MarketStatusAction::Trading);
        assert_eq!(results[0].is_trading, Some(true));

        // Removed runners are closed regardless of the market state.
        assert_eq!(results[1].action, MarketStatusAction::Close);
        assert_eq!(results[1].is_trading, Some(false));

        assert_eq!(results[2].action, MarketStatusAction::Close);
        assert_eq!(results[2].is_trading, Some(false));

        // Instrument ids are pairwise distinct.
        assert_ne!(results[0].instrument_id, results[1].instrument_id);
        assert_ne!(results[1].instrument_id, results[2].instrument_id);
        assert_ne!(results[0].instrument_id, results[2].instrument_id);
    }
1216
1217 #[rstest]
1218 fn test_parse_order_status_report_new_order() {
1219 let data = load_test_json("stream/ocm_NEW_FULL_IMAGE.json");
1220 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1221
1222 if let StreamMessage::OrderChange(ocm) = msg {
1223 let oc = ocm.oc.as_ref().unwrap();
1224 let omc = &oc[0];
1225 let orc = &omc.orc.as_ref().unwrap()[0];
1226 let uo = &orc.uo.as_ref().unwrap()[0];
1227
1228 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1229 let report = parse_order_status_report(
1230 uo,
1231 instrument_id,
1232 AccountId::from("BETFAIR-001"),
1233 parse_millis_timestamp(ocm.pt),
1234 parse_millis_timestamp(ocm.pt),
1235 )
1236 .unwrap();
1237
1238 assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
1240 assert_eq!(report.order_side, OrderSide::Sell); assert_eq!(report.order_type, OrderType::Limit);
1242 assert_eq!(report.filled_qty.as_f64(), 4.75);
1243 assert_eq!(report.quantity.as_f64(), 5.0);
1244 assert!(report.price.is_some());
1245 assert_eq!(report.price.unwrap().as_f64(), 12.0);
1246 } else {
1247 panic!("expected OrderChange");
1248 }
1249 }
1250
1251 #[rstest]
1252 fn test_parse_order_status_report_filled() {
1253 let data = load_test_json("stream/ocm_FILLED.json");
1254 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1255
1256 if let StreamMessage::OrderChange(ocm) = msg {
1257 let oc = ocm.oc.as_ref().unwrap();
1258 let omc = &oc[0];
1259 let orc = &omc.orc.as_ref().unwrap()[0];
1260 let uo = &orc.uo.as_ref().unwrap()[0];
1261
1262 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1263 let report = parse_order_status_report(
1264 uo,
1265 instrument_id,
1266 AccountId::from("BETFAIR-001"),
1267 parse_millis_timestamp(ocm.pt),
1268 parse_millis_timestamp(ocm.pt),
1269 )
1270 .unwrap();
1271
1272 assert_eq!(report.order_status, OrderStatus::Filled);
1273 assert_eq!(report.order_side, OrderSide::Buy); assert_eq!(report.filled_qty.as_f64(), 10.0);
1275 assert_eq!(report.quantity.as_f64(), 10.0);
1276
1277 assert!(report.client_order_id.is_some());
1279 } else {
1280 panic!("expected OrderChange");
1281 }
1282 }
1283
1284 #[rstest]
1285 fn test_parse_order_status_report_cancelled() {
1286 let data = load_test_json("stream/ocm_CANCEL.json");
1287 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1288
1289 if let StreamMessage::OrderChange(ocm) = msg {
1290 let oc = ocm.oc.as_ref().unwrap();
1291 let omc = &oc[0];
1292 let orc = &omc.orc.as_ref().unwrap()[0];
1293 let uo = &orc.uo.as_ref().unwrap()[0];
1294
1295 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1296 let report = parse_order_status_report(
1297 uo,
1298 instrument_id,
1299 AccountId::from("BETFAIR-001"),
1300 parse_millis_timestamp(ocm.pt),
1301 parse_millis_timestamp(ocm.pt),
1302 )
1303 .unwrap();
1304
1305 assert_eq!(report.order_status, OrderStatus::Canceled);
1306 assert_eq!(report.order_side, OrderSide::Sell); assert_eq!(report.filled_qty.as_f64(), 0.0);
1308 assert_eq!(report.quantity.as_f64(), 10.0);
1309 } else {
1310 panic!("expected OrderChange");
1311 }
1312 }
1313
1314 #[rstest]
1315 fn test_parse_order_status_report_duplicate_execution() {
1316 let data = load_test_json("stream/ocm_DUPLICATE_EXECUTION.json");
1317 let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1318
1319 if let StreamMessage::OrderChange(ocm) = &msgs[0] {
1320 let oc = ocm.oc.as_ref().unwrap();
1321 let omc = &oc[0];
1322 let orc = &omc.orc.as_ref().unwrap()[0];
1323 let uo = &orc.uo.as_ref().unwrap()[0];
1324
1325 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1326 let report = parse_order_status_report(
1327 uo,
1328 instrument_id,
1329 AccountId::from("BETFAIR-001"),
1330 parse_millis_timestamp(ocm.pt),
1331 parse_millis_timestamp(ocm.pt),
1332 )
1333 .unwrap();
1334
1335 assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
1337 assert_eq!(report.order_side, OrderSide::Buy); assert_eq!(report.filled_qty.as_f64(), 1.12);
1339 } else {
1340 panic!("expected OrderChange");
1341 }
1342 }
1343
1344 #[rstest]
1345 fn test_parse_order_status_report_multiple_fills() {
1346 let data = load_test_json("stream/ocm_multiple_fills.json");
1347 let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1348
1349 if let StreamMessage::OrderChange(ocm) = &msgs[0] {
1350 let oc = ocm.oc.as_ref().unwrap();
1351 let omc = &oc[0];
1352 let orc = &omc.orc.as_ref().unwrap()[0];
1353 let uo = &orc.uo.as_ref().unwrap()[0];
1354
1355 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1356 let report = parse_order_status_report(
1357 uo,
1358 instrument_id,
1359 AccountId::from("BETFAIR-001"),
1360 parse_millis_timestamp(ocm.pt),
1361 parse_millis_timestamp(ocm.pt),
1362 )
1363 .unwrap();
1364
1365 assert_eq!(report.order_status, OrderStatus::PartiallyFilled);
1367 assert_eq!(report.order_side, OrderSide::Sell); assert_eq!(report.filled_qty.as_f64(), 16.19);
1369 assert!(report.client_order_id.is_some());
1370 assert!(report.avg_px.is_some());
1371 } else {
1372 panic!("expected OrderChange");
1373 }
1374 }
1375
1376 #[rstest]
1377 fn test_parse_runner_book_snapshot_empty_book() {
1378 let rc = RunnerChange {
1380 id: 12345,
1381 hc: None,
1382 atb: Some(vec![]),
1383 atl: Some(vec![]),
1384 batb: None,
1385 batl: None,
1386 bdatb: None,
1387 bdatl: None,
1388 spb: None,
1389 spl: None,
1390 spn: None,
1391 spf: None,
1392 trd: None,
1393 ltp: None,
1394 tv: None,
1395 };
1396
1397 let result = parse_runner_book_deltas(
1398 make_instrument_id("1.123", 12345, Decimal::ZERO),
1399 &rc,
1400 true,
1401 1000,
1402 UnixNanos::default(),
1403 UnixNanos::default(),
1404 )
1405 .unwrap()
1406 .expect("should produce snapshot deltas");
1407
1408 assert_eq!(result.deltas.len(), 1);
1410 assert_eq!(result.deltas[0].action, BookAction::Clear);
1411 assert!(RecordFlag::F_LAST.matches(result.deltas[0].flags));
1412 assert!(RecordFlag::F_SNAPSHOT.matches(result.deltas[0].flags));
1413 }
1414
1415 #[rstest]
1416 fn test_make_fill_report() {
1417 let instrument_id = make_instrument_id("1.180604981", 1209555, Decimal::ZERO);
1418 let fill = make_fill_report(
1419 AccountId::from("BETFAIR-001"),
1420 instrument_id,
1421 VenueOrderId::from("229430281339"),
1422 TradeId::from("229430281339-0"),
1423 OrderSide::Buy,
1424 Quantity::new(10.0, BETFAIR_QUANTITY_PRECISION),
1425 Price::new(1.1, BETFAIR_PRICE_PRECISION),
1426 Currency::GBP(),
1427 None,
1428 UnixNanos::default(),
1429 UnixNanos::default(),
1430 );
1431
1432 assert_eq!(fill.instrument_id, instrument_id);
1433 assert_eq!(fill.order_side, OrderSide::Buy);
1434 assert_eq!(fill.last_qty.as_f64(), 10.0);
1435 assert_eq!(fill.last_px.as_f64(), 1.1);
1436 assert_eq!(fill.commission.as_f64(), 0.0);
1437 assert_eq!(fill.liquidity_side, LiquiditySide::NoLiquiditySide);
1438 }
1439
1440 #[rstest]
1441 fn test_fill_tracker_single_full_fill() {
1442 let data = load_test_json("stream/ocm_FILLED.json");
1443 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1444
1445 if let StreamMessage::OrderChange(ocm) = msg {
1446 let oc = ocm.oc.as_ref().unwrap();
1447 let omc = &oc[0];
1448 let orc = &omc.orc.as_ref().unwrap()[0];
1449 let uo = &orc.uo.as_ref().unwrap()[0];
1450 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1451 let ts = parse_millis_timestamp(ocm.pt);
1452
1453 let mut tracker = FillTracker::new();
1454 let fill = tracker
1455 .maybe_fill_report(
1456 uo,
1457 uo.s,
1458 instrument_id,
1459 AccountId::from("BETFAIR-001"),
1460 Currency::GBP(),
1461 ts,
1462 ts,
1463 )
1464 .expect("should produce fill");
1465
1466 assert_eq!(fill.last_qty.as_f64(), 10.0);
1467 assert_eq!(fill.last_px.as_f64(), 1.1);
1468 assert_eq!(fill.order_side, OrderSide::Buy);
1469 assert!(fill.client_order_id.is_some());
1470 } else {
1471 panic!("expected OrderChange");
1472 }
1473 }
1474
1475 #[rstest]
1476 fn test_fill_tracker_incremental_fills() {
1477 let data = load_test_json("stream/ocm_multiple_fills.json");
1478 let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1479 let instrument_id = make_instrument_id("1.179082386", 50210, Decimal::ZERO);
1480
1481 let mut tracker = FillTracker::new();
1482 let account_id = AccountId::from("BETFAIR-001");
1483 let currency = Currency::GBP();
1484
1485 let uo1 = extract_uo(&msgs[0]);
1487 let ts1 = extract_ts(&msgs[0]);
1488 let fill1 = tracker
1489 .maybe_fill_report(uo1, uo1.s, instrument_id, account_id, currency, ts1, ts1)
1490 .expect("should produce first fill");
1491 assert_eq!(fill1.last_qty.as_f64(), 16.19);
1492 assert_eq!(fill1.last_px.as_f64(), 5.8);
1493
1494 let uo2 = extract_uo(&msgs[1]);
1496 let ts2 = extract_ts(&msgs[1]);
1497 let fill2 = tracker
1498 .maybe_fill_report(uo2, uo2.s, instrument_id, account_id, currency, ts2, ts2)
1499 .expect("should produce second fill");
1500 assert_eq!(fill2.last_qty.as_f64(), 0.77);
1501 assert_eq!(fill2.last_px.as_f64(), 5.8);
1502
1503 let uo3 = extract_uo(&msgs[2]);
1505 let ts3 = extract_ts(&msgs[2]);
1506 let fill3 = tracker
1507 .maybe_fill_report(uo3, uo3.s, instrument_id, account_id, currency, ts3, ts3)
1508 .expect("should produce third fill");
1509 assert_eq!(fill3.last_qty.as_f64(), 0.77);
1510 }
1511
1512 #[rstest]
1513 fn test_fill_tracker_different_price() {
1514 let data = load_test_json("stream/ocm_filled_different_price.json");
1515 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1516
1517 if let StreamMessage::OrderChange(ocm) = msg {
1518 let oc = ocm.oc.as_ref().unwrap();
1519 let omc = &oc[0];
1520 let orc = &omc.orc.as_ref().unwrap()[0];
1521 let uo = &orc.uo.as_ref().unwrap()[0];
1522 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1523 let ts = parse_millis_timestamp(ocm.pt);
1524
1525 let mut tracker = FillTracker::new();
1526 let fill = tracker
1527 .maybe_fill_report(
1528 uo,
1529 uo.s,
1530 instrument_id,
1531 AccountId::from("BETFAIR-001"),
1532 Currency::GBP(),
1533 ts,
1534 ts,
1535 )
1536 .expect("should produce fill");
1537
1538 assert_eq!(fill.last_qty.as_f64(), 20.0);
1540 assert_eq!(fill.last_px.as_f64(), 1.2);
1541 } else {
1542 panic!("expected OrderChange");
1543 }
1544 }
1545
1546 #[rstest]
1547 fn test_fill_tracker_cancel_no_fill() {
1548 let data = load_test_json("stream/ocm_CANCEL.json");
1549 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1550
1551 if let StreamMessage::OrderChange(ocm) = msg {
1552 let oc = ocm.oc.as_ref().unwrap();
1553 let omc = &oc[0];
1554 let orc = &omc.orc.as_ref().unwrap()[0];
1555 let uo = &orc.uo.as_ref().unwrap()[0];
1556 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1557 let ts = parse_millis_timestamp(ocm.pt);
1558
1559 let mut tracker = FillTracker::new();
1560 let result = tracker.maybe_fill_report(
1561 uo,
1562 uo.s,
1563 instrument_id,
1564 AccountId::from("BETFAIR-001"),
1565 Currency::GBP(),
1566 ts,
1567 ts,
1568 );
1569 assert!(result.is_none(), "cancelled order should not produce fill");
1570 } else {
1571 panic!("expected OrderChange");
1572 }
1573 }
1574
1575 #[rstest]
1576 fn test_fill_tracker_lapsed_no_fill() {
1577 let data = load_test_json("stream/ocm_error_fill.json");
1578 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1579
1580 if let StreamMessage::OrderChange(ocm) = msg {
1581 let oc = ocm.oc.as_ref().unwrap();
1582 let omc = &oc[0];
1583 let orc = &omc.orc.as_ref().unwrap()[0];
1584 let uo = &orc.uo.as_ref().unwrap()[0];
1585 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1586 let ts = parse_millis_timestamp(ocm.pt);
1587
1588 let mut tracker = FillTracker::new();
1589 let result = tracker.maybe_fill_report(
1590 uo,
1591 uo.s,
1592 instrument_id,
1593 AccountId::from("BETFAIR-001"),
1594 Currency::GBP(),
1595 ts,
1596 ts,
1597 );
1598 assert!(result.is_none(), "lapsed order should not produce fill");
1599 } else {
1600 panic!("expected OrderChange");
1601 }
1602 }
1603
1604 #[rstest]
1605 fn test_fill_tracker_duplicate_dedup() {
1606 let data = load_test_json("stream/ocm_FILLED.json");
1607 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1608
1609 if let StreamMessage::OrderChange(ocm) = msg {
1610 let oc = ocm.oc.as_ref().unwrap();
1611 let omc = &oc[0];
1612 let orc = &omc.orc.as_ref().unwrap()[0];
1613 let uo = &orc.uo.as_ref().unwrap()[0];
1614 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
1615 let ts = parse_millis_timestamp(ocm.pt);
1616 let account_id = AccountId::from("BETFAIR-001");
1617 let currency = Currency::GBP();
1618
1619 let mut tracker = FillTracker::new();
1620
1621 let fill1 =
1623 tracker.maybe_fill_report(uo, uo.s, instrument_id, account_id, currency, ts, ts);
1624 assert!(fill1.is_some());
1625
1626 let fill2 =
1628 tracker.maybe_fill_report(uo, uo.s, instrument_id, account_id, currency, ts, ts);
1629 assert!(fill2.is_none(), "duplicate fill should be suppressed");
1630 } else {
1631 panic!("expected OrderChange");
1632 }
1633 }
1634
1635 #[rstest]
1636 fn test_fill_tracker_price_back_calculation() {
1637 let data = load_test_json("stream/ocm_multiple_fills.json");
1638 let msgs: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
1639 let instrument_id = make_instrument_id("1.179082386", 50210, Decimal::ZERO);
1640 let account_id = AccountId::from("BETFAIR-001");
1641 let currency = Currency::GBP();
1642 let mut tracker = FillTracker::new();
1643
1644 let uo1 = extract_uo(&msgs[0]);
1646 let ts1 = extract_ts(&msgs[0]);
1647 let fill1 = tracker
1648 .maybe_fill_report(uo1, uo1.s, instrument_id, account_id, currency, ts1, ts1)
1649 .unwrap();
1650 assert_eq!(fill1.last_px.as_f64(), 5.8);
1651
1652 let uo2 = extract_uo(&msgs[1]);
1654 let ts2 = extract_ts(&msgs[1]);
1655 let fill2 = tracker
1656 .maybe_fill_report(uo2, uo2.s, instrument_id, account_id, currency, ts2, ts2)
1657 .unwrap();
1658 assert_eq!(fill2.last_px.as_f64(), 5.8);
1659 }
1660
1661 #[rstest]
1662 fn test_has_cancel_quantity() {
1663 let data = load_test_json("stream/ocm_CANCEL.json");
1664 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1665
1666 if let StreamMessage::OrderChange(ocm) = msg {
1667 let uo = &ocm.oc.as_ref().unwrap()[0].orc.as_ref().unwrap()[0]
1668 .uo
1669 .as_ref()
1670 .unwrap()[0];
1671 assert!(has_cancel_quantity(uo));
1672 } else {
1673 panic!("expected OrderChange");
1674 }
1675 }
1676
1677 #[rstest]
1678 fn test_has_cancel_quantity_filled_order() {
1679 let data = load_test_json("stream/ocm_FILLED.json");
1680 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1681
1682 if let StreamMessage::OrderChange(ocm) = msg {
1683 let uo = &ocm.oc.as_ref().unwrap()[0].orc.as_ref().unwrap()[0]
1684 .uo
1685 .as_ref()
1686 .unwrap()[0];
1687 assert!(!has_cancel_quantity(uo));
1688 } else {
1689 panic!("expected OrderChange");
1690 }
1691 }
1692
1693 fn extract_uo(msg: &StreamMessage) -> &UnmatchedOrder {
1694 if let StreamMessage::OrderChange(ocm) = msg {
1695 &ocm.oc.as_ref().unwrap()[0].orc.as_ref().unwrap()[0]
1696 .uo
1697 .as_ref()
1698 .unwrap()[0]
1699 } else {
1700 panic!("expected OrderChange")
1701 }
1702 }
1703
1704 fn extract_ts(msg: &StreamMessage) -> UnixNanos {
1705 if let StreamMessage::OrderChange(ocm) = msg {
1706 parse_millis_timestamp(ocm.pt)
1707 } else {
1708 panic!("expected OrderChange")
1709 }
1710 }
1711
1712 #[rstest]
1713 fn test_parse_race_runner_data_from_fixture() {
1714 let data = load_test_json("stream/rcm_single.json");
1715 let msg = stream_decode(data.as_bytes()).unwrap();
1716
1717 let StreamMessage::RaceChange(rcm) = msg else {
1718 panic!("expected RaceChange");
1719 };
1720
1721 let race = &rcm.rc.as_ref().unwrap()[0];
1722 let rrc = &race.rrc.as_ref().unwrap()[0];
1723 let ts = parse_millis_timestamp(rcm.pt);
1724
1725 let runner = parse_race_runner_data(
1726 race.id.as_deref().unwrap(),
1727 race.mid.as_deref().unwrap(),
1728 rrc,
1729 ts,
1730 ts,
1731 )
1732 .unwrap();
1733
1734 assert_eq!(runner.race_id, "28587288.1650");
1735 assert_eq!(runner.market_id, "1.1234567");
1736 assert_eq!(runner.selection_id, 7390417);
1737 assert!((runner.latitude - 51.4189543).abs() < 1e-6);
1738 assert!((runner.longitude - (-0.4058491)).abs() < 1e-6);
1739 assert!((runner.speed - 17.8).abs() < 1e-6);
1740 assert!((runner.progress - 2051.0).abs() < 1e-6);
1741 assert!((runner.stride_frequency - 2.07).abs() < 1e-6);
1742 }
1743
1744 #[rstest]
1745 fn test_parse_race_progress_from_fixture() {
1746 let data = load_test_json("stream/rcm_single.json");
1747 let msg = stream_decode(data.as_bytes()).unwrap();
1748
1749 let StreamMessage::RaceChange(rcm) = msg else {
1750 panic!("expected RaceChange");
1751 };
1752
1753 let race = &rcm.rc.as_ref().unwrap()[0];
1754 let rpc = race.rpc.as_ref().unwrap();
1755 let ts = parse_millis_timestamp(rcm.pt);
1756
1757 let progress = parse_race_progress(
1758 race.id.as_deref().unwrap(),
1759 race.mid.as_deref().unwrap(),
1760 rpc,
1761 ts,
1762 ts,
1763 );
1764
1765 assert_eq!(progress.race_id, "28587288.1650");
1766 assert_eq!(progress.market_id, "1.1234567");
1767 assert_eq!(progress.gate_name, "1f");
1768 assert!((progress.sectional_time - 10.6).abs() < 1e-6);
1769 assert!((progress.running_time - 46.7).abs() < 1e-6);
1770 assert!((progress.speed - 17.8).abs() < 1e-6);
1771 assert!((progress.progress - 87.5).abs() < 1e-6);
1772
1773 let order: Vec<i64> = serde_json::from_str(&progress.order).unwrap();
1774 assert_eq!(order, vec![7390417, 5600338, 11527189, 6395118, 8706072]);
1775
1776 let jumps: Vec<serde_json::Value> = serde_json::from_str(&progress.jumps).unwrap();
1777 assert_eq!(jumps.len(), 2);
1778 assert_eq!(jumps[0]["J"], 2);
1779 }
1780
1781 #[rstest]
1782 fn test_parse_race_runner_data_multi_runner() {
1783 let data = load_test_json("stream/rcm_multi_runner.json");
1784 let msg = stream_decode(data.as_bytes()).unwrap();
1785
1786 let StreamMessage::RaceChange(rcm) = msg else {
1787 panic!("expected RaceChange");
1788 };
1789
1790 let race = &rcm.rc.as_ref().unwrap()[0];
1791 let ts = parse_millis_timestamp(rcm.pt);
1792 let race_id = race.id.as_deref().unwrap();
1793 let market_id = race.mid.as_deref().unwrap();
1794
1795 let runners: Vec<_> = race
1796 .rrc
1797 .as_ref()
1798 .unwrap()
1799 .iter()
1800 .filter_map(|rrc| parse_race_runner_data(race_id, market_id, rrc, ts, ts))
1801 .collect();
1802
1803 assert_eq!(runners.len(), 5);
1804 assert_eq!(runners[0].selection_id, 35467839);
1805 assert_eq!(runners[4].selection_id, 41694785);
1806 assert!((runners[0].speed - 16.33).abs() < 1e-6);
1807 assert!((runners[4].speed - 17.11).abs() < 1e-6);
1808 }
1809
1810 #[rstest]
1811 fn test_parse_race_runner_data_missing_id_returns_none() {
1812 let rrc = RaceRunnerChange {
1813 ft: Some(1000),
1814 id: None,
1815 lat: Some(51.0),
1816 lng: Some(-0.4),
1817 spd: Some(15.0),
1818 prg: Some(500.0),
1819 sfq: Some(2.0),
1820 };
1821 let ts = UnixNanos::from(1_000_000_000u64);
1822 let result = parse_race_runner_data("race1", "market1", &rrc, ts, ts);
1823 assert!(result.is_none());
1824 }
1825
1826 #[rstest]
1827 fn test_parse_race_runner_data_absent_fields_are_nan() {
1828 let rrc = RaceRunnerChange {
1829 ft: None,
1830 id: Some(12345),
1831 lat: None,
1832 lng: None,
1833 spd: None,
1834 prg: None,
1835 sfq: None,
1836 };
1837 let ts = UnixNanos::from(1_000_000_000u64);
1838 let runner = parse_race_runner_data("race1", "market1", &rrc, ts, ts).unwrap();
1839 assert!(runner.latitude.is_nan());
1840 assert!(runner.longitude.is_nan());
1841 assert!(runner.speed.is_nan());
1842 assert!(runner.progress.is_nan());
1843 assert!(runner.stride_frequency.is_nan());
1844 }
1845
1846 #[rstest]
1847 fn test_parse_race_progress_absent_fields() {
1848 let rpc = RaceProgressChange {
1849 ft: None,
1850 g: None,
1851 st: None,
1852 rt: None,
1853 spd: None,
1854 prg: None,
1855 ord: None,
1856 jumps: None,
1857 };
1858 let ts = UnixNanos::from(1_000_000_000u64);
1859 let progress = parse_race_progress("race1", "market1", &rpc, ts, ts);
1860 assert_eq!(progress.gate_name, "");
1861 assert!(progress.sectional_time.is_nan());
1862 assert!(progress.running_time.is_nan());
1863 assert_eq!(progress.order, "");
1864 assert_eq!(progress.jumps, "");
1865 }
1866
1867 fn runner_change_with_ticker(
1868 id: u64,
1869 ltp: Option<Decimal>,
1870 tv: Option<Decimal>,
1871 spn: Option<Decimal>,
1872 spf: Option<Decimal>,
1873 ) -> RunnerChange {
1874 RunnerChange {
1875 id,
1876 hc: None,
1877 atb: None,
1878 atl: None,
1879 batb: None,
1880 batl: None,
1881 bdatb: None,
1882 bdatl: None,
1883 spb: None,
1884 spl: None,
1885 spn,
1886 spf,
1887 trd: None,
1888 ltp,
1889 tv,
1890 }
1891 }
1892
1893 #[rstest]
1894 fn test_parse_betfair_ticker_all_fields() {
1895 let rc = runner_change_with_ticker(
1896 9249757,
1897 Some(Decimal::new(55, 1)),
1898 Some(Decimal::new(189032, 2)),
1899 Some(Decimal::new(568, 2)),
1900 Some(Decimal::new(573, 2)),
1901 );
1902 let ts = UnixNanos::from(1_000_000_000u64);
1903 let instrument_id = make_instrument_id("1.185781465", 9249757, Decimal::ZERO);
1904
1905 let ticker = parse_betfair_ticker(instrument_id, &rc, ts, ts).unwrap();
1906
1907 assert_eq!(ticker.instrument_id, instrument_id);
1908 assert!((ticker.last_traded_price - 5.5).abs() < f64::EPSILON);
1909 assert!((ticker.traded_volume - 1890.32).abs() < f64::EPSILON);
1910 assert!((ticker.starting_price_near - 5.68).abs() < f64::EPSILON);
1911 assert!((ticker.starting_price_far - 5.73).abs() < f64::EPSILON);
1912 }
1913
1914 #[rstest]
1915 fn test_parse_betfair_ticker_partial_fields() {
1916 let rc = runner_change_with_ticker(
1917 9249757,
1918 Some(Decimal::new(55, 1)),
1919 Some(Decimal::new(189032, 2)),
1920 None,
1921 None,
1922 );
1923 let ts = UnixNanos::from(1_000_000_000u64);
1924 let instrument_id = make_instrument_id("1.185781465", 9249757, Decimal::ZERO);
1925
1926 let ticker = parse_betfair_ticker(instrument_id, &rc, ts, ts).unwrap();
1927
1928 assert!((ticker.last_traded_price - 5.5).abs() < f64::EPSILON);
1929 assert!((ticker.traded_volume - 1890.32).abs() < f64::EPSILON);
1930 assert!(ticker.starting_price_near.is_nan());
1931 assert!(ticker.starting_price_far.is_nan());
1932 }
1933
1934 #[rstest]
1935 fn test_parse_betfair_ticker_no_fields_returns_none() {
1936 let rc = runner_change_with_ticker(9249757, None, None, None, None);
1937 let ts = UnixNanos::from(1_000_000_000u64);
1938 let instrument_id = make_instrument_id("1.185781465", 9249757, Decimal::ZERO);
1939
1940 assert!(parse_betfair_ticker(instrument_id, &rc, ts, ts).is_none());
1941 }
1942
1943 #[rstest]
1944 fn test_parse_betfair_ticker_only_tv() {
1945 let rc =
1946 runner_change_with_ticker(40273293, None, Some(Decimal::new(320115, 2)), None, None);
1947 let ts = UnixNanos::from(1_000_000_000u64);
1948 let instrument_id = make_instrument_id("1.185781465", 40273293, Decimal::ZERO);
1949
1950 let ticker = parse_betfair_ticker(instrument_id, &rc, ts, ts).unwrap();
1951
1952 assert!(ticker.last_traded_price.is_nan());
1953 assert!((ticker.traded_volume - 3201.15).abs() < f64::EPSILON);
1954 assert!(ticker.starting_price_near.is_nan());
1955 assert!(ticker.starting_price_far.is_nan());
1956 }
1957
1958 #[rstest]
1959 fn test_parse_betfair_ticker_from_fixture() {
1960 let data = load_test_json("stream/mcm_BSP_settled.json");
1961 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
1962
1963 if let StreamMessage::MarketChange(mcm) = msg {
1964 let mc = mcm.mc.as_ref().unwrap();
1965 let change = &mc[0];
1966 let rc_list = change.rc.as_ref().unwrap();
1967
1968 let rc = rc_list.iter().find(|r| r.id == 9249757).unwrap();
1970 let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
1971 let ts = parse_millis_timestamp(mcm.pt);
1972
1973 let ticker = parse_betfair_ticker(instrument_id, rc, ts, ts).unwrap();
1974
1975 assert!((ticker.last_traded_price - 5.5).abs() < f64::EPSILON);
1976 assert!((ticker.traded_volume - 1890.32).abs() < f64::EPSILON);
1977 assert!((ticker.starting_price_near - 5.68).abs() < f64::EPSILON);
1978 assert!((ticker.starting_price_far - 5.73).abs() < f64::EPSILON);
1979
1980 let rc2 = rc_list.iter().find(|r| r.id == 40273293).unwrap();
1982 let instrument_id2 = make_instrument_id(&change.id, rc2.id, Decimal::ZERO);
1983 let ticker2 = parse_betfair_ticker(instrument_id2, rc2, ts, ts).unwrap();
1984
1985 assert!((ticker2.last_traded_price - 2.1).abs() < f64::EPSILON);
1986 assert!((ticker2.traded_volume - 3201.15).abs() < f64::EPSILON);
1987 assert!(ticker2.starting_price_near.is_nan());
1988 assert!(ticker2.starting_price_far.is_nan());
1989
1990 let rc3 = rc_list.iter().find(|r| r.id == 23678734).unwrap();
1992 let instrument_id3 = make_instrument_id(&change.id, rc3.id, Decimal::ZERO);
1993 let ticker3 = parse_betfair_ticker(instrument_id3, rc3, ts, ts).unwrap();
1994
1995 assert!(ticker3.last_traded_price.is_nan());
1996 assert!((ticker3.traded_volume - 0.0).abs() < f64::EPSILON);
1997 } else {
1998 panic!("Expected MarketChange");
1999 }
2000 }
2001
2002 #[rstest]
2003 fn test_parse_betfair_starting_prices_from_fixture() {
2004 let data = load_test_json("stream/mcm_BSP_settled.json");
2005 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2006
2007 if let StreamMessage::MarketChange(mcm) = msg {
2008 let mc = mcm.mc.as_ref().unwrap();
2009 let def = mc[0].market_definition.as_ref().unwrap();
2010 let ts = parse_millis_timestamp(mcm.pt);
2011
2012 let prices = parse_betfair_starting_prices(&mc[0].id, def, ts, ts);
2013
2014 assert_eq!(prices.len(), 3);
2016
2017 let bsp_map: std::collections::HashMap<String, f64> = prices
2018 .iter()
2019 .map(|p| (p.instrument_id.to_string(), p.bsp))
2020 .collect();
2021
2022 let id_winner = make_instrument_id("1.185781465", 9249757, Decimal::ZERO).to_string();
2023 let id_placed = make_instrument_id("1.185781465", 40273293, Decimal::ZERO).to_string();
2024 let id_loser = make_instrument_id("1.185781465", 11120000, Decimal::ZERO).to_string();
2025
2026 assert!((bsp_map[&id_winner] - 5.73).abs() < f64::EPSILON);
2027 assert!((bsp_map[&id_placed] - 2.14).abs() < f64::EPSILON);
2028 assert!((bsp_map[&id_loser] - 28.56).abs() < f64::EPSILON);
2029 } else {
2030 panic!("Expected MarketChange");
2031 }
2032 }
2033
2034 #[rstest]
2035 fn test_parse_betfair_starting_prices_no_runners() {
2036 let def = MarketDefinition {
2037 runners: None,
2038 bet_delay: None,
2039 betting_type: None,
2040 bsp_market: None,
2041 bsp_reconciled: None,
2042 competition_id: None,
2043 competition_name: None,
2044 complete: None,
2045 country_code: None,
2046 cross_matching: None,
2047 discount_allowed: None,
2048 each_way_divisor: None,
2049 event_id: None,
2050 event_name: None,
2051 event_type_id: None,
2052 event_type_name: None,
2053 in_play: None,
2054 line_interval: None,
2055 line_max_unit: None,
2056 line_min_unit: None,
2057 market_base_rate: None,
2058 market_id: None,
2059 market_name: None,
2060 market_time: None,
2061 market_type: None,
2062 number_of_active_runners: None,
2063 number_of_winners: None,
2064 open_date: None,
2065 persistence_enabled: None,
2066 price_ladder_definition: None,
2067 race_type: None,
2068 regulators: None,
2069 runners_voidable: None,
2070 settled_time: None,
2071 status: None,
2072 suspend_time: None,
2073 timezone: None,
2074 turn_in_play_enabled: None,
2075 venue: None,
2076 version: None,
2077 };
2078 let ts = UnixNanos::from(1_000_000_000u64);
2079
2080 let prices = parse_betfair_starting_prices("1.12345", &def, ts, ts);
2081
2082 assert!(prices.is_empty());
2083 }
2084
2085 #[rstest]
2086 fn test_parse_bsp_book_deltas_from_fixture() {
2087 let data = load_test_json("stream/mcm_BSP.json");
2088 let messages: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
2089
2090 let mcm = messages
2092 .iter()
2093 .find_map(|m| match m {
2094 StreamMessage::MarketChange(mcm) => {
2095 let mc = mcm.mc.as_ref()?;
2096 let has_spb = mc.iter().any(|c| {
2097 c.rc.as_ref()
2098 .is_some_and(|rcs| rcs.iter().any(|r| r.spb.is_some()))
2099 });
2100
2101 if has_spb { Some(mcm) } else { None }
2102 }
2103 _ => None,
2104 })
2105 .expect("fixture should contain MCM with spb data");
2106
2107 let mc = mcm.mc.as_ref().unwrap();
2108 let change = &mc[0];
2109 let rc_list = change.rc.as_ref().unwrap();
2110
2111 let rc = rc_list.iter().find(|r| r.id == 9249757).unwrap();
2113 let instrument_id = make_instrument_id(&change.id, rc.id, Decimal::ZERO);
2114 let ts = parse_millis_timestamp(mcm.pt);
2115
2116 let deltas = parse_bsp_book_deltas(instrument_id, rc, ts, ts);
2117
2118 let spb_count = rc.spb.as_ref().unwrap().len();
2119 let spl_count = rc.spl.as_ref().unwrap().len();
2120 assert_eq!(deltas.len(), spb_count + spl_count);
2121
2122 assert_eq!(deltas[0].side, OrderSide::Sell as u32);
2124 assert!((deltas[0].price - 1000.0).abs() < f64::EPSILON);
2125 assert!((deltas[0].size - 33.38).abs() < f64::EPSILON);
2126 assert_eq!(deltas[0].action, BookAction::Update as u32);
2127
2128 let spl_start = spb_count;
2130 assert_eq!(deltas[spl_start].side, OrderSide::Buy as u32);
2131 assert!((deltas[spl_start].price - 7.0).abs() < f64::EPSILON);
2132 assert!((deltas[spl_start].size - 10.0).abs() < f64::EPSILON);
2133 }
2134
2135 #[rstest]
2136 fn test_parse_bsp_book_deltas_zero_volume_is_delete() {
2137 let rc = RunnerChange {
2138 id: 12345,
2139 hc: None,
2140 atb: None,
2141 atl: None,
2142 batb: None,
2143 batl: None,
2144 bdatb: None,
2145 bdatl: None,
2146 spb: Some(vec![PV {
2147 price: Decimal::new(50, 1),
2148 volume: Decimal::ZERO,
2149 }]),
2150 spl: None,
2151 spn: None,
2152 spf: None,
2153 trd: None,
2154 ltp: None,
2155 tv: None,
2156 };
2157 let ts = UnixNanos::from(1_000_000_000u64);
2158 let instrument_id = make_instrument_id("1.12345", 12345, Decimal::ZERO);
2159
2160 let deltas = parse_bsp_book_deltas(instrument_id, &rc, ts, ts);
2161
2162 assert_eq!(deltas.len(), 1);
2163 assert_eq!(deltas[0].action, BookAction::Delete as u32);
2164 assert!((deltas[0].price - 5.0).abs() < f64::EPSILON);
2165 assert!((deltas[0].size - 0.0).abs() < f64::EPSILON);
2166 }
2167
2168 #[rstest]
2169 fn test_parse_bsp_book_deltas_no_spb_spl_returns_empty() {
2170 let rc = runner_change_with_ticker(12345, None, None, None, None);
2171 let ts = UnixNanos::from(1_000_000_000u64);
2172 let instrument_id = make_instrument_id("1.12345", 12345, Decimal::ZERO);
2173
2174 let deltas = parse_bsp_book_deltas(instrument_id, &rc, ts, ts);
2175
2176 assert!(deltas.is_empty());
2177 }
2178
2179 #[rstest]
2180 fn test_parse_instrument_closes_from_fixture() {
2181 let data = load_test_json("stream/mcm_BSP_settled.json");
2182 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2183
2184 if let StreamMessage::MarketChange(mcm) = msg {
2185 let mc = mcm.mc.as_ref().unwrap();
2186 let def = mc[0].market_definition.as_ref().unwrap();
2187 let ts = parse_millis_timestamp(mcm.pt);
2188
2189 let closes = parse_instrument_closes(&mc[0].id, def, ts, ts);
2190
2191 assert_eq!(closes.len(), 4);
2193
2194 let close_map: std::collections::HashMap<String, Price> = closes
2195 .iter()
2196 .map(|c| (c.instrument_id.to_string(), c.close_price))
2197 .collect();
2198
2199 let id_winner = make_instrument_id("1.185781465", 9249757, Decimal::ZERO).to_string();
2200 let id_placed = make_instrument_id("1.185781465", 40273293, Decimal::ZERO).to_string();
2201 let id_loser = make_instrument_id("1.185781465", 11120000, Decimal::ZERO).to_string();
2202 let id_removed = make_instrument_id("1.185781465", 37433527, Decimal::ZERO).to_string();
2203
2204 assert_eq!(close_map[&id_winner], Price::from("1.00"));
2205 assert_eq!(close_map[&id_placed], Price::from("1.00"));
2206 assert_eq!(close_map[&id_loser], Price::from("0.00"));
2207 assert_eq!(close_map[&id_removed], Price::from("0.00"));
2208 } else {
2209 panic!("Expected MarketChange");
2210 }
2211 }
2212
2213 #[rstest]
2214 fn test_parse_instrument_closes_active_runners_excluded() {
2215 let data = load_test_json("stream/mcm_BSP.json");
2216 let messages: Vec<StreamMessage> = serde_json::from_str(&data).unwrap();
2217
2218 let mcm = messages
2220 .iter()
2221 .find_map(|m| match m {
2222 StreamMessage::MarketChange(mcm) => {
2223 let mc = mcm.mc.as_ref()?;
2224 mc.iter()
2225 .find(|c| c.market_definition.is_some())
2226 .map(|_| mcm)
2227 }
2228 _ => None,
2229 })
2230 .expect("fixture should contain MCM with market definition");
2231
2232 let mc = mcm.mc.as_ref().unwrap();
2233 let change = mc.iter().find(|c| c.market_definition.is_some()).unwrap();
2234 let def = change.market_definition.as_ref().unwrap();
2235 let ts = parse_millis_timestamp(mcm.pt);
2236
2237 let closes = parse_instrument_closes(&change.id, def, ts, ts);
2238
2239 assert!(
2240 closes.is_empty(),
2241 "Active runners should not produce close events, found {}",
2242 closes.len()
2243 );
2244 }
2245
2246 fn make_test_uo(
2247 bet_id: &str,
2248 size: Decimal,
2249 sm: Option<Decimal>,
2250 avp: Option<Decimal>,
2251 ) -> UnmatchedOrder {
2252 UnmatchedOrder {
2253 id: bet_id.to_string(),
2254 p: Decimal::new(25, 1),
2255 s: size,
2256 side: StreamingSide::Back,
2257 status: StreamingOrderStatus::Executable,
2258 pt: Some(StreamingPersistenceType::Lapse),
2259 ot: StreamingOrderType::Limit,
2260 pd: 1616568581000,
2261 bsp: None,
2262 rfo: None,
2263 rfs: None,
2264 rc: None,
2265 rac: None,
2266 md: None,
2267 cd: None,
2268 ld: None,
2269 avp,
2270 sm,
2271 sr: None,
2272 sl: None,
2273 sc: None,
2274 sv: None,
2275 lsrc: None,
2276 }
2277 }
2278
2279 #[rstest]
2280 fn test_fill_tracker_sync_order_prevents_duplicate_fill() {
2281 let mut tracker = FillTracker::new();
2282
2283 tracker.sync_order("123456", Decimal::new(10, 0), Decimal::new(25, 1));
2285
2286 let uo = make_test_uo(
2287 "123456",
2288 Decimal::new(20, 0),
2289 Some(Decimal::new(10, 0)),
2290 Some(Decimal::new(25, 1)),
2291 );
2292
2293 let instrument_id = InstrumentId::from("1.234567-123456-0.0.BETFAIR");
2294 let account_id = AccountId::from("BETFAIR-001");
2295 let currency = Currency::from("GBP");
2296 let ts = UnixNanos::default();
2297
2298 let result =
2300 tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2301 assert!(
2302 result.is_none(),
2303 "should not emit fill for already-synced qty"
2304 );
2305 }
2306
2307 #[rstest]
2308 fn test_fill_tracker_sync_order_allows_incremental_fill() {
2309 let mut tracker = FillTracker::new();
2310
2311 tracker.sync_order("123456", Decimal::new(10, 0), Decimal::new(25, 1));
2313
2314 let uo = make_test_uo(
2315 "123456",
2316 Decimal::new(20, 0),
2317 Some(Decimal::new(15, 0)),
2318 Some(Decimal::new(26, 1)),
2319 );
2320
2321 let instrument_id = InstrumentId::from("1.234567-123456-0.0.BETFAIR");
2322 let account_id = AccountId::from("BETFAIR-001");
2323 let currency = Currency::from("GBP");
2324 let ts = UnixNanos::default();
2325
2326 let result =
2328 tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2329 assert!(result.is_some(), "should emit fill for new matched qty");
2330 let fill = result.unwrap();
2331 assert_eq!(fill.last_qty, Quantity::from("5.00"));
2332 }
2333
2334 #[rstest]
2335 fn test_fill_tracker_overfill_rejected() {
2336 let mut tracker = FillTracker::new();
2337
2338 let uo = make_test_uo(
2340 "999001",
2341 Decimal::new(20, 0),
2342 Some(Decimal::new(30, 0)),
2343 Some(Decimal::new(25, 1)),
2344 );
2345
2346 let instrument_id = InstrumentId::from("1.234567-999001-0.0.BETFAIR");
2347 let account_id = AccountId::from("BETFAIR-001");
2348 let currency = Currency::from("GBP");
2349 let ts = UnixNanos::default();
2350
2351 let result =
2352 tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2353 assert!(
2354 result.is_none(),
2355 "overfill (sm > order_qty) should be rejected"
2356 );
2357 }
2358
2359 #[rstest]
2360 fn test_fill_tracker_zero_sm_returns_none() {
2361 let mut tracker = FillTracker::new();
2362
2363 let uo = make_test_uo("999002", Decimal::new(10, 0), Some(Decimal::ZERO), None);
2364
2365 let instrument_id = InstrumentId::from("1.234567-999002-0.0.BETFAIR");
2366 let result = tracker.maybe_fill_report(
2367 &uo,
2368 uo.s,
2369 instrument_id,
2370 AccountId::from("BETFAIR-001"),
2371 Currency::from("GBP"),
2372 UnixNanos::default(),
2373 UnixNanos::default(),
2374 );
2375 assert!(result.is_none(), "zero sm should not produce a fill");
2376 }
2377
2378 #[rstest]
2379 fn test_fill_tracker_no_avp_uses_order_price() {
2380 let data = load_test_json("stream/ocm_FILLED_no_avp.json");
2381 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2382
2383 if let StreamMessage::OrderChange(ocm) = msg {
2384 let oc = ocm.oc.as_ref().unwrap();
2385 let omc = &oc[0];
2386 let orc = &omc.orc.as_ref().unwrap()[0];
2387 let uo = &orc.uo.as_ref().unwrap()[0];
2388 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
2389 let ts = parse_millis_timestamp(ocm.pt);
2390
2391 let mut tracker = FillTracker::new();
2392 let fill = tracker
2393 .maybe_fill_report(
2394 uo,
2395 uo.s,
2396 instrument_id,
2397 AccountId::from("BETFAIR-001"),
2398 Currency::GBP(),
2399 ts,
2400 ts,
2401 )
2402 .expect("should produce fill even without avp");
2403
2404 assert_eq!(fill.last_qty.as_f64(), 25.0);
2406 assert_eq!(fill.last_px.as_f64(), 3.5);
2407 } else {
2408 panic!("expected OrderChange");
2409 }
2410 }
2411
2412 #[rstest]
2413 fn test_fill_tracker_weighted_avg_back_calculation() {
2414 let mut tracker = FillTracker::new();
2415 let instrument_id = InstrumentId::from("1.234567-999003-0.0.BETFAIR");
2416 let account_id = AccountId::from("BETFAIR-001");
2417 let currency = Currency::from("GBP");
2418 let ts = UnixNanos::default();
2419
2420 let uo1 = make_test_uo(
2422 "999003",
2423 Decimal::new(30, 0),
2424 Some(Decimal::new(10, 0)),
2425 Some(Decimal::new(20, 1)),
2426 );
2427 let fill1 = tracker
2428 .maybe_fill_report(&uo1, uo1.s, instrument_id, account_id, currency, ts, ts)
2429 .expect("first fill");
2430 assert_eq!(fill1.last_px.as_f64(), 2.0);
2431 assert_eq!(fill1.last_qty.as_f64(), 10.0);
2432
2433 let uo2 = make_test_uo(
2436 "999003",
2437 Decimal::new(30, 0),
2438 Some(Decimal::new(20, 0)),
2439 Some(Decimal::new(25, 1)),
2440 );
2441 let fill2 = tracker
2442 .maybe_fill_report(&uo2, uo2.s, instrument_id, account_id, currency, ts, ts)
2443 .expect("second fill");
2444 assert_eq!(fill2.last_qty.as_f64(), 10.0);
2445 assert_eq!(fill2.last_px.as_f64(), 3.0);
2446 }
2447
2448 #[rstest]
2449 fn test_fill_tracker_negative_fill_price_falls_back_to_avp() {
2450 let mut tracker = FillTracker::new();
2451 let instrument_id = InstrumentId::from("1.234567-999004-0.0.BETFAIR");
2452 let account_id = AccountId::from("BETFAIR-001");
2453 let currency = Currency::from("GBP");
2454 let ts = UnixNanos::default();
2455
2456 let uo1 = make_test_uo(
2458 "999004",
2459 Decimal::new(20, 0),
2460 Some(Decimal::new(10, 0)),
2461 Some(Decimal::new(50, 1)),
2462 );
2463 tracker
2464 .maybe_fill_report(&uo1, uo1.s, instrument_id, account_id, currency, ts, ts)
2465 .expect("first fill");
2466
2467 let uo2 = make_test_uo(
2471 "999004",
2472 Decimal::new(20, 0),
2473 Some(Decimal::new(15, 0)),
2474 Some(Decimal::new(10, 1)),
2475 );
2476 let fill2 = tracker
2477 .maybe_fill_report(&uo2, uo2.s, instrument_id, account_id, currency, ts, ts)
2478 .expect("second fill should use avp fallback");
2479 assert_eq!(fill2.last_qty.as_f64(), 5.0);
2480 assert_eq!(fill2.last_px.as_f64(), 1.0);
2481 }
2482
2483 #[rstest]
2484 fn test_fill_tracker_prune_clears_state() {
2485 let mut tracker = FillTracker::new();
2486 let instrument_id = InstrumentId::from("1.234567-999005-0.0.BETFAIR");
2487 let account_id = AccountId::from("BETFAIR-001");
2488 let currency = Currency::from("GBP");
2489 let ts = UnixNanos::default();
2490
2491 let uo = make_test_uo(
2493 "999005",
2494 Decimal::new(10, 0),
2495 Some(Decimal::new(10, 0)),
2496 Some(Decimal::new(25, 1)),
2497 );
2498 let fill1 =
2499 tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2500 assert!(fill1.is_some());
2501
2502 let fill2 =
2504 tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2505 assert!(fill2.is_none(), "should be deduplicated");
2506
2507 tracker.prune("999005");
2509
2510 let fill3 =
2512 tracker.maybe_fill_report(&uo, uo.s, instrument_id, account_id, currency, ts, ts);
2513 assert!(fill3.is_some(), "after prune, should produce fill again");
2514 }
2515
2516 #[rstest]
2517 fn test_fill_tracker_sm_none_returns_none() {
2518 let mut tracker = FillTracker::new();
2519
2520 let uo = make_test_uo("999006", Decimal::new(10, 0), None, None);
2522
2523 let instrument_id = InstrumentId::from("1.234567-999006-0.0.BETFAIR");
2524 let result = tracker.maybe_fill_report(
2525 &uo,
2526 uo.s,
2527 instrument_id,
2528 AccountId::from("BETFAIR-001"),
2529 Currency::from("GBP"),
2530 UnixNanos::default(),
2531 UnixNanos::default(),
2532 );
2533 assert!(result.is_none(), "None sm should not produce a fill");
2534 }
2535
2536 #[rstest]
2537 fn test_parse_order_status_report_missing_persistence_type_for_market_on_close() {
2538 let uo = UnmatchedOrder {
2539 s: Decimal::ZERO,
2540 pt: None,
2541 ot: StreamingOrderType::MarketOnClose,
2542 sr: Some(Decimal::new(10, 0)),
2543 ..make_test_uo("999007", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2544 };
2545
2546 let report = parse_order_status_report(
2547 &uo,
2548 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2549 AccountId::from("BETFAIR-001"),
2550 UnixNanos::default(),
2551 UnixNanos::default(),
2552 )
2553 .unwrap();
2554
2555 assert_eq!(report.quantity, Quantity::from("10.00"));
2556 assert_eq!(report.time_in_force, TimeInForce::AtTheClose);
2557 }
2558
2559 #[rstest]
2560 fn test_parse_order_status_report_missing_persistence_type_for_limit_on_close() {
2561 let uo = UnmatchedOrder {
2562 s: Decimal::ZERO,
2563 pt: None,
2564 ot: StreamingOrderType::LimitOnClose,
2565 sr: Some(Decimal::new(10, 0)),
2566 ..make_test_uo("999013", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2567 };
2568
2569 let report = parse_order_status_report(
2570 &uo,
2571 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2572 AccountId::from("BETFAIR-001"),
2573 UnixNanos::default(),
2574 UnixNanos::default(),
2575 )
2576 .unwrap();
2577
2578 assert_eq!(report.quantity, Quantity::from("10.00"));
2579 assert_eq!(report.time_in_force, TimeInForce::AtTheClose);
2580 }
2581
2582 #[rstest]
2583 fn test_parse_order_status_report_market_on_close_uses_bsp_liability() {
2584 let uo = UnmatchedOrder {
2585 s: Decimal::ZERO,
2586 bsp: Some(Decimal::new(20, 1)),
2587 pt: None,
2588 ot: StreamingOrderType::MarketOnClose,
2589 ..make_test_uo("999010", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2590 };
2591
2592 let report = parse_order_status_report(
2593 &uo,
2594 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2595 AccountId::from("BETFAIR-001"),
2596 UnixNanos::default(),
2597 UnixNanos::default(),
2598 )
2599 .unwrap();
2600
2601 assert_eq!(report.quantity, Quantity::from("2.00"));
2602 assert_eq!(report.time_in_force, TimeInForce::AtTheClose);
2603 }
2604
2605 #[rstest]
2606 fn test_parse_order_status_report_fails_for_non_positive_quantity() {
2607 let uo = UnmatchedOrder {
2608 s: Decimal::ZERO,
2609 sm: Some(Decimal::ZERO),
2610 sr: Some(Decimal::ZERO),
2611 sc: Some(Decimal::ZERO),
2612 sl: Some(Decimal::ZERO),
2613 sv: Some(Decimal::ZERO),
2614 ..make_test_uo("999014", Decimal::ZERO, Some(Decimal::ZERO), None)
2615 };
2616
2617 let result = parse_order_status_report(
2618 &uo,
2619 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2620 AccountId::from("BETFAIR-001"),
2621 UnixNanos::default(),
2622 UnixNanos::default(),
2623 );
2624
2625 assert!(result.is_err());
2626 assert!(
2627 result
2628 .unwrap_err()
2629 .to_string()
2630 .contains("failed to resolve positive quantity for stream order update 999014")
2631 );
2632 }
2633
2634 #[rstest]
2635 fn test_parse_order_status_report_includes_lapse_reason() {
2636 let uo = UnmatchedOrder {
2637 status: StreamingOrderStatus::ExecutionComplete,
2638 sl: Some(Decimal::ONE),
2639 lsrc: Some(crate::common::enums::LapseStatusReasonCode::SpInPlay),
2640 ..make_test_uo("999012", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2641 };
2642
2643 let report = parse_order_status_report(
2644 &uo,
2645 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2646 AccountId::from("BETFAIR-001"),
2647 UnixNanos::default(),
2648 UnixNanos::default(),
2649 )
2650 .unwrap();
2651
2652 assert_eq!(report.order_status, OrderStatus::Canceled);
2653 assert_eq!(report.cancel_reason.as_deref(), Some("SP_IN_PLAY"));
2654 }
2655
2656 #[rstest]
2657 fn test_parse_order_status_report_missing_persistence_type_fails_for_limit_order() {
2658 let uo = UnmatchedOrder {
2659 pt: None,
2660 ..make_test_uo("999008", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2661 };
2662
2663 let result = parse_order_status_report(
2664 &uo,
2665 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2666 AccountId::from("BETFAIR-001"),
2667 UnixNanos::default(),
2668 UnixNanos::default(),
2669 );
2670
2671 assert!(result.is_err());
2672 assert_eq!(
2673 result.unwrap_err().to_string(),
2674 "missing persistence type for order update 999008"
2675 );
2676 }
2677
2678 #[rstest]
2679 fn test_fill_tracker_uses_lifecycle_quantity_when_stream_size_is_zero() {
2680 let mut tracker = FillTracker::new();
2681 let uo = UnmatchedOrder {
2682 s: Decimal::ZERO,
2683 sr: Some(Decimal::new(10, 0)),
2684 sm: Some(Decimal::new(5, 0)),
2685 avp: Some(Decimal::new(20, 1)),
2686 ..make_test_uo("999009", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2687 };
2688
2689 let fill = tracker
2690 .maybe_fill_report(
2691 &uo,
2692 uo.s,
2693 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2694 AccountId::from("BETFAIR-001"),
2695 Currency::from("GBP"),
2696 UnixNanos::default(),
2697 UnixNanos::default(),
2698 )
2699 .expect("zero stream size should fall back to lifecycle quantities");
2700
2701 assert_eq!(fill.last_qty, Quantity::from("5.00"));
2702 }
2703
2704 #[rstest]
2705 fn test_fill_tracker_uses_bsp_liability_when_stream_size_is_zero() {
2706 let mut tracker = FillTracker::new();
2707 let uo = UnmatchedOrder {
2708 s: Decimal::ZERO,
2709 bsp: Some(Decimal::new(20, 1)),
2710 pt: None,
2711 ot: StreamingOrderType::MarketOnClose,
2712 sm: Some(Decimal::new(10, 1)),
2713 avp: Some(Decimal::new(20, 1)),
2714 ..make_test_uo("999011", Decimal::new(10, 0), Some(Decimal::ZERO), None)
2715 };
2716
2717 let fill = tracker
2718 .maybe_fill_report(
2719 &uo,
2720 uo.s,
2721 InstrumentId::from("1.234567-123456-0.0.BETFAIR"),
2722 AccountId::from("BETFAIR-001"),
2723 Currency::from("GBP"),
2724 UnixNanos::default(),
2725 UnixNanos::default(),
2726 )
2727 .expect("zero stream size should fall back to bsp liability");
2728
2729 assert_eq!(fill.last_qty, Quantity::from("1.00"));
2730 }
2731
2732 #[rstest]
2733 fn test_fill_tracker_partial_void_still_emits_fill() {
2734 let data = load_test_json("stream/ocm_VOIDED_partial.json");
2735 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2736
2737 if let StreamMessage::OrderChange(ocm) = msg {
2738 let oc = ocm.oc.as_ref().unwrap();
2739 let omc = &oc[0];
2740 let orc = &omc.orc.as_ref().unwrap()[0];
2741 let uo = &orc.uo.as_ref().unwrap()[0];
2742 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
2743 let ts = parse_millis_timestamp(ocm.pt);
2744
2745 let mut tracker = FillTracker::new();
2746 let fill = tracker
2747 .maybe_fill_report(
2748 uo,
2749 uo.s,
2750 instrument_id,
2751 AccountId::from("BETFAIR-001"),
2752 Currency::GBP(),
2753 ts,
2754 ts,
2755 )
2756 .expect("should produce fill for matched portion");
2757
2758 assert_eq!(fill.last_qty.as_f64(), 60.0);
2760 assert_eq!(fill.last_px.as_f64(), 1.5);
2761 assert_eq!(fill.order_side, OrderSide::Sell);
2762 } else {
2763 panic!("expected OrderChange");
2764 }
2765 }
2766
2767 #[rstest]
2768 fn test_fill_tracker_no_fill_when_sv_zero_and_fully_filled() {
2769 let data = load_test_json("stream/ocm_FILLED_sv_zero.json");
2770 let msg: StreamMessage = serde_json::from_str(&data).unwrap();
2771
2772 if let StreamMessage::OrderChange(ocm) = msg {
2773 let oc = ocm.oc.as_ref().unwrap();
2774 let omc = &oc[0];
2775 let orc = &omc.orc.as_ref().unwrap()[0];
2776 let uo = &orc.uo.as_ref().unwrap()[0];
2777 let instrument_id = make_instrument_id(&omc.id, orc.id, Decimal::ZERO);
2778 let ts = parse_millis_timestamp(ocm.pt);
2779
2780 let mut tracker = FillTracker::new();
2781 let fill = tracker
2782 .maybe_fill_report(
2783 uo,
2784 uo.s,
2785 instrument_id,
2786 AccountId::from("BETFAIR-001"),
2787 Currency::GBP(),
2788 ts,
2789 ts,
2790 )
2791 .expect("fully filled order should produce fill");
2792
2793 assert_eq!(fill.last_qty.as_f64(), 50.0);
2794 assert_eq!(fill.last_px.as_f64(), 2.0);
2795
2796 assert_eq!(uo.sv, Some(Decimal::ZERO));
2798 } else {
2799 panic!("expected OrderChange");
2800 }
2801 }
2802}