Skip to main content

nautilus_betfair/stream/
messages.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Betfair Exchange Stream API message definitions.
17//!
18//! The stream protocol uses newline-delimited JSON with an `op` field to
19//! discriminate message types. Field names are abbreviated for bandwidth
20//! efficiency (e.g. `pt` for publish time, `mc` for market changes).
21//!
22//! # References
23//!
24//! <https://docs.developer.betfair.com/display/1smk3cen4v3lu3yomq5qye0ni/Exchange+Stream+API>
25
26use std::str::FromStr;
27
28use ahash::AHashMap;
29use nautilus_core::serialization::{deserialize_decimal, deserialize_optional_decimal};
30use rust_decimal::Decimal;
31use serde::{Deserialize, Deserializer, Serialize, de::Visitor};
32use ustr::Ustr;
33
34use crate::common::{
35    consts::{STREAM_OP_AUTHENTICATION, STREAM_OP_HEARTBEAT, STREAM_OP_RACE_SUBSCRIPTION},
36    enums::{
37        ChangeType, LapseStatusReasonCode, MarketBettingType, MarketDataFilterField, MarketStatus,
38        PriceLadderType, RunnerStatus, SegmentType, StatusErrorCode, StreamingOrderStatus,
39        StreamingOrderType, StreamingPersistenceType, StreamingSide,
40    },
41    types::{
42        Handicap, MarketId, SelectionId, deserialize_optional_string_lenient,
43        deserialize_selection_id,
44    },
45};
46
47/// Top-level streaming message, discriminated by the `op` field.
48///
49/// Deserializing a raw JSON line into this enum replaces the Python
50/// `stream_decode()` function from `betfair_parser`.
51#[derive(Debug, Clone, Deserialize)]
52#[serde(tag = "op")]
53pub enum StreamMessage {
54    #[serde(rename = "connection")]
55    Connection(Connection),
56    #[serde(rename = "status")]
57    Status(Status),
58    #[serde(rename = "mcm")]
59    MarketChange(MCM),
60    #[serde(rename = "ocm")]
61    OrderChange(OCM),
62    #[serde(rename = "rcm")]
63    RaceChange(RCM),
64}
65
66/// Connection confirmation sent on stream connect.
67#[derive(Debug, Clone, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct Connection {
70    pub id: Option<u64>,
71    pub connection_id: String,
72}
73
74/// Status response for errors or informational messages.
75#[derive(Debug, Clone, Deserialize)]
76#[serde(rename_all = "camelCase")]
77pub struct Status {
78    pub id: Option<u64>,
79    pub connection_closed: bool,
80    pub connection_id: Option<String>,
81    pub connections_available: Option<u32>,
82    pub error_code: Option<StatusErrorCode>,
83    pub error_message: Option<String>,
84    pub status_code: Option<String>,
85}
86
87/// Market Change Message (MCM) - price/market data updates.
88#[derive(Debug, Clone, Deserialize)]
89pub struct MCM {
90    pub id: Option<u64>,
91    /// Publish time (epoch millis).
92    pub pt: u64,
93    /// Token used for resubscription.
94    pub clk: Option<String>,
95    /// Initial clock token (sent on first image).
96    #[serde(rename = "initialClk")]
97    pub initial_clk: Option<String>,
98    /// Market changes (None on heartbeat).
99    pub mc: Option<Vec<MarketChange>>,
100    /// Change type.
101    pub ct: Option<ChangeType>,
102    /// Conflation interval in milliseconds.
103    #[serde(rename = "conflateMs")]
104    pub conflate_ms: Option<u64>,
105    /// Heartbeat interval in milliseconds.
106    #[serde(rename = "heartbeatMs")]
107    pub heartbeat_ms: Option<u64>,
108    /// Segment type for large messages.
109    #[serde(rename = "segmentType")]
110    pub segment_type: Option<SegmentType>,
111    pub status: Option<i32>,
112}
113
114impl MCM {
115    #[must_use]
116    pub fn is_heartbeat(&self) -> bool {
117        self.ct == Some(ChangeType::Heartbeat)
118    }
119}
120
121/// Order Change Message (OCM) - order/position updates.
122#[derive(Debug, Clone, Deserialize)]
123pub struct OCM {
124    pub id: Option<u64>,
125    /// Publish time (epoch millis).
126    pub pt: u64,
127    pub clk: Option<String>,
128    #[serde(rename = "initialClk")]
129    pub initial_clk: Option<String>,
130    /// Order market changes (None on heartbeat).
131    pub oc: Option<Vec<OrderMarketChange>>,
132    pub ct: Option<ChangeType>,
133    #[serde(rename = "conflateMs")]
134    pub conflate_ms: Option<u64>,
135    #[serde(rename = "heartbeatMs")]
136    pub heartbeat_ms: Option<u64>,
137    #[serde(rename = "segmentType")]
138    pub segment_type: Option<SegmentType>,
139    pub status: Option<i32>,
140}
141
142impl OCM {
143    #[must_use]
144    pub fn is_heartbeat(&self) -> bool {
145        self.ct == Some(ChangeType::Heartbeat)
146    }
147}
148
149/// Delta update for a single market.
150#[derive(Debug, Clone, Deserialize)]
151pub struct MarketChange {
152    /// Market identifier.
153    pub id: MarketId,
154    /// Runner changes.
155    pub rc: Option<Vec<RunnerChange>>,
156    /// Whether there was a conflation.
157    pub con: Option<bool>,
158    /// Whether this is a full image (vs delta).
159    #[serde(default)]
160    pub img: bool,
161    /// Full market definition (sent on subscription or change).
162    #[serde(rename = "marketDefinition")]
163    pub market_definition: Option<MarketDefinition>,
164    /// Total volume matched on this market.
165    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
166    pub tv: Option<Decimal>,
167}
168
169/// Delta update for a single runner (selection).
170#[derive(Debug, Clone, Deserialize)]
171pub struct RunnerChange {
172    /// Selection identifier.
173    #[serde(deserialize_with = "deserialize_selection_id")]
174    pub id: SelectionId,
175    /// Handicap value.
176    pub hc: Option<Handicap>,
177    /// Available to back.
178    pub atb: Option<Vec<PV>>,
179    /// Available to lay.
180    pub atl: Option<Vec<PV>>,
181    /// Best available to back (depth).
182    pub batb: Option<Vec<LPV>>,
183    /// Best available to lay (depth).
184    pub batl: Option<Vec<LPV>>,
185    /// Best display available to back.
186    pub bdatb: Option<Vec<LPV>>,
187    /// Best display available to lay.
188    pub bdatl: Option<Vec<LPV>>,
189    /// Starting price back.
190    pub spb: Option<Vec<PV>>,
191    /// Starting price lay.
192    pub spl: Option<Vec<PV>>,
193    /// Starting price near (projected SP).
194    #[serde(default, deserialize_with = "deserialize_optional_decimal_lenient")]
195    pub spn: Option<Decimal>,
196    /// Starting price far (actual BSP).
197    #[serde(default, deserialize_with = "deserialize_optional_decimal_lenient")]
198    pub spf: Option<Decimal>,
199    /// Traded volume by price level.
200    pub trd: Option<Vec<PV>>,
201    /// Last traded price.
202    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
203    pub ltp: Option<Decimal>,
204    /// Total volume matched on this runner.
205    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
206    pub tv: Option<Decimal>,
207}
208
209fn deserialize_optional_decimal_lenient<'de, D>(
210    deserializer: D,
211) -> Result<Option<Decimal>, D::Error>
212where
213    D: Deserializer<'de>,
214{
215    struct LenientOptionalDecimalVisitor;
216
217    impl Visitor<'_> for LenientOptionalDecimalVisitor {
218        type Value = Option<Decimal>;
219
220        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
221            formatter.write_str("null or a decimal number as string, integer, or float")
222        }
223
224        fn visit_str<E: serde::de::Error>(self, value: &str) -> Result<Self::Value, E> {
225            Ok(parse_optional_decimal_lenient(value))
226        }
227
228        fn visit_string<E: serde::de::Error>(self, value: String) -> Result<Self::Value, E> {
229            self.visit_str(&value)
230        }
231
232        fn visit_i64<E: serde::de::Error>(self, value: i64) -> Result<Self::Value, E> {
233            Ok(Some(Decimal::from(value)))
234        }
235
236        fn visit_u64<E: serde::de::Error>(self, value: u64) -> Result<Self::Value, E> {
237            Ok(Some(Decimal::from(value)))
238        }
239
240        fn visit_i128<E: serde::de::Error>(self, value: i128) -> Result<Self::Value, E> {
241            Ok(Some(Decimal::from(value)))
242        }
243
244        fn visit_u128<E: serde::de::Error>(self, value: u128) -> Result<Self::Value, E> {
245            Ok(Some(Decimal::from(value)))
246        }
247
248        fn visit_f64<E: serde::de::Error>(self, value: f64) -> Result<Self::Value, E> {
249            Ok(Decimal::try_from(value).ok())
250        }
251
252        fn visit_unit<E: serde::de::Error>(self) -> Result<Self::Value, E> {
253            Ok(None)
254        }
255
256        fn visit_none<E: serde::de::Error>(self) -> Result<Self::Value, E> {
257            Ok(None)
258        }
259    }
260
261    deserializer.deserialize_any(LenientOptionalDecimalVisitor)
262}
263
264fn parse_optional_decimal_lenient(value: &str) -> Option<Decimal> {
265    let trimmed = value.trim();
266    if trimmed.is_empty() || is_non_finite_decimal(trimmed) {
267        return None;
268    }
269
270    if trimmed.contains('e') || trimmed.contains('E') {
271        Decimal::from_scientific(trimmed).ok()
272    } else {
273        Decimal::from_str(trimmed).ok()
274    }
275}
276
277fn is_non_finite_decimal(value: &str) -> bool {
278    value.eq_ignore_ascii_case("nan")
279        || value.eq_ignore_ascii_case("inf")
280        || value.eq_ignore_ascii_case("+inf")
281        || value.eq_ignore_ascii_case("-inf")
282        || value.eq_ignore_ascii_case("infinity")
283        || value.eq_ignore_ascii_case("+infinity")
284        || value.eq_ignore_ascii_case("-infinity")
285}
286
287/// Full market definition snapshot.
288#[derive(Debug, Clone, Deserialize)]
289#[serde(rename_all = "camelCase")]
290pub struct MarketDefinition {
291    pub bet_delay: Option<i32>,
292    pub betting_type: Option<MarketBettingType>,
293    pub bsp_market: Option<bool>,
294    pub bsp_reconciled: Option<bool>,
295    pub competition_id: Option<String>,
296    pub competition_name: Option<String>,
297    pub complete: Option<bool>,
298    pub country_code: Option<Ustr>,
299    pub cross_matching: Option<bool>,
300    pub discount_allowed: Option<bool>,
301    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
302    pub each_way_divisor: Option<Decimal>,
303    pub event_id: Option<String>,
304    pub event_name: Option<String>,
305    #[serde(default, deserialize_with = "deserialize_optional_string_lenient")]
306    pub event_type_id: Option<String>,
307    pub event_type_name: Option<Ustr>,
308    pub in_play: Option<bool>,
309    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
310    pub line_interval: Option<Decimal>,
311    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
312    pub line_max_unit: Option<Decimal>,
313    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
314    pub line_min_unit: Option<Decimal>,
315    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
316    pub market_base_rate: Option<Decimal>,
317    pub market_id: Option<MarketId>,
318    pub market_name: Option<String>,
319    pub market_time: Option<String>,
320    pub market_type: Option<Ustr>,
321    pub number_of_active_runners: Option<u32>,
322    pub number_of_winners: Option<u32>,
323    pub open_date: Option<String>,
324    pub persistence_enabled: Option<bool>,
325    pub price_ladder_definition: Option<PriceLadderDefinition>,
326    pub race_type: Option<Ustr>,
327    pub regulators: Option<Vec<Ustr>>,
328    pub runners: Option<Vec<RunnerDefinition>>,
329    pub runners_voidable: Option<bool>,
330    pub settled_time: Option<String>,
331    pub status: Option<MarketStatus>,
332    pub suspend_time: Option<String>,
333    pub timezone: Option<Ustr>,
334    pub turn_in_play_enabled: Option<bool>,
335    pub venue: Option<Ustr>,
336    pub version: Option<u64>,
337}
338
339/// Runner (selection) definition within a market definition.
340#[derive(Debug, Clone, Deserialize)]
341#[serde(rename_all = "camelCase")]
342pub struct RunnerDefinition {
343    #[serde(deserialize_with = "deserialize_selection_id")]
344    pub id: SelectionId,
345    pub hc: Option<Handicap>,
346    pub sort_priority: Option<u32>,
347    pub name: Option<String>,
348    pub status: Option<RunnerStatus>,
349    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
350    pub adjustment_factor: Option<Decimal>,
351    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
352    pub bsp: Option<Decimal>,
353    pub removal_date: Option<String>,
354}
355
356/// Price ladder definition within a market definition.
357#[derive(Debug, Clone, Deserialize)]
358pub struct PriceLadderDefinition {
359    #[serde(rename = "type")]
360    pub ladder_type: Option<PriceLadderType>,
361}
362
363// Betfair encodes price-volume types as JSON arrays: [price, volume] and
364// [level, price, volume] respectively.
365
366/// Price-volume pair, serialized as a JSON array `[price, volume]`.
367#[derive(Debug, Clone, Copy, PartialEq)]
368pub struct PV {
369    pub price: Decimal,
370    pub volume: Decimal,
371}
372
373impl<'de> Deserialize<'de> for PV {
374    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
375    where
376        D: serde::Deserializer<'de>,
377    {
378        // Handles both `[price, volume]` and `[level, price, volume]` (RESUB_DELTA)
379        let arr: Vec<Decimal> = Deserialize::deserialize(deserializer)?;
380        match arr.len() {
381            2 => Ok(Self {
382                price: arr[0],
383                volume: arr[1],
384            }),
385            3 => Ok(Self {
386                price: arr[1],
387                volume: arr[2],
388            }),
389            n => Err(serde::de::Error::invalid_length(n, &"2 or 3 elements")),
390        }
391    }
392}
393
394impl Serialize for PV {
395    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
396    where
397        S: serde::Serializer,
398    {
399        (self.price, self.volume).serialize(serializer)
400    }
401}
402
403/// Level-price-volume triple, serialized as a JSON array `[level, price, volume]`.
404#[derive(Debug, Clone, Copy, PartialEq)]
405pub struct LPV {
406    pub level: u32,
407    pub price: Decimal,
408    pub volume: Decimal,
409}
410
411impl<'de> Deserialize<'de> for LPV {
412    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
413    where
414        D: serde::Deserializer<'de>,
415    {
416        let arr: (u32, Decimal, Decimal) = Deserialize::deserialize(deserializer)?;
417        Ok(Self {
418            level: arr.0,
419            price: arr.1,
420            volume: arr.2,
421        })
422    }
423}
424
425impl Serialize for LPV {
426    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
427    where
428        S: serde::Serializer,
429    {
430        (self.level, self.price, self.volume).serialize(serializer)
431    }
432}
433
434/// Order changes for a single market.
435#[derive(Debug, Clone, Deserialize)]
436pub struct OrderMarketChange {
437    /// Market identifier.
438    pub id: MarketId,
439    #[serde(rename = "accountId")]
440    pub account_id: Option<u64>,
441    pub closed: Option<bool>,
442    #[serde(rename = "fullImage", default)]
443    pub full_image: bool,
444    /// Order runner changes.
445    pub orc: Option<Vec<OrderRunnerChange>>,
446}
447
448/// Order changes for a single runner within a market.
449#[derive(Debug, Clone, Deserialize)]
450pub struct OrderRunnerChange {
451    /// Selection identifier.
452    #[serde(deserialize_with = "deserialize_selection_id")]
453    pub id: SelectionId,
454    #[serde(rename = "fullImage", default)]
455    pub full_image: bool,
456    /// Handicap.
457    pub hc: Option<Handicap>,
458    /// Matched backs.
459    pub mb: Option<Vec<MatchedOrder>>,
460    /// Matched lays.
461    pub ml: Option<Vec<MatchedOrder>>,
462    /// Strategy match changes, keyed by customer strategy ref.
463    pub smc: Option<AHashMap<String, StrategyMatchChange>>,
464    /// Unmatched orders.
465    pub uo: Option<Vec<UnmatchedOrder>>,
466}
467
468/// Matched order (price-size pair), serialized as `[price, size]`.
469#[derive(Debug, Clone, Copy, PartialEq)]
470pub struct MatchedOrder {
471    pub price: Decimal,
472    pub size: Decimal,
473}
474
475impl<'de> Deserialize<'de> for MatchedOrder {
476    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
477    where
478        D: serde::Deserializer<'de>,
479    {
480        let arr: (Decimal, Decimal) = Deserialize::deserialize(deserializer)?;
481        Ok(Self {
482            price: arr.0,
483            size: arr.1,
484        })
485    }
486}
487
488impl Serialize for MatchedOrder {
489    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
490    where
491        S: serde::Serializer,
492    {
493        (self.price, self.size).serialize(serializer)
494    }
495}
496
497/// Strategy-level match changes.
498#[derive(Debug, Clone, Deserialize)]
499pub struct StrategyMatchChange {
500    /// Matched backs.
501    pub mb: Option<Vec<MatchedOrder>>,
502    /// Matched lays.
503    pub ml: Option<Vec<MatchedOrder>>,
504}
505
506/// Unmatched order on the streaming API.
507#[derive(Debug, Clone, Deserialize)]
508pub struct UnmatchedOrder {
509    /// Bet identifier.
510    pub id: String,
511    /// Price.
512    #[serde(deserialize_with = "deserialize_decimal")]
513    pub p: Decimal,
514    /// Size.
515    #[serde(deserialize_with = "deserialize_decimal")]
516    pub s: Decimal,
517    /// Side (B=Back, L=Lay).
518    pub side: StreamingSide,
519    /// Order status (E=Executable, EC=ExecutionComplete).
520    pub status: StreamingOrderStatus,
521    /// Persistence type (L=Lapse, P=Persist, MOC=MarketOnClose).
522    ///
523    /// Betfair can omit this on some BSP market-on-close order updates.
524    #[serde(default)]
525    pub pt: Option<StreamingPersistenceType>,
526    /// Order type (L=Limit, LOC=LimitOnClose, MOC=MarketOnClose).
527    pub ot: StreamingOrderType,
528    /// Placed date (epoch millis).
529    pub pd: u64,
530    /// BSP liability.
531    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
532    pub bsp: Option<Decimal>,
533    /// Customer strategy reference.
534    pub rfo: Option<String>,
535    /// Regulator reference.
536    pub rfs: Option<String>,
537    /// Customer order reference.
538    pub rc: Option<String>,
539    /// Regulator auth code.
540    pub rac: Option<String>,
541    /// Matched date (epoch millis).
542    pub md: Option<u64>,
543    /// Cancelled date (epoch millis).
544    pub cd: Option<u64>,
545    /// Lapsed date (epoch millis).
546    pub ld: Option<u64>,
547    /// Average price matched.
548    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
549    pub avp: Option<Decimal>,
550    /// Size matched.
551    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
552    pub sm: Option<Decimal>,
553    /// Size remaining.
554    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
555    pub sr: Option<Decimal>,
556    /// Size lapsed.
557    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
558    pub sl: Option<Decimal>,
559    /// Size cancelled.
560    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
561    pub sc: Option<Decimal>,
562    /// Size voided.
563    #[serde(default, deserialize_with = "deserialize_optional_decimal")]
564    pub sv: Option<Decimal>,
565    /// Lapse status reason code.
566    pub lsrc: Option<LapseStatusReasonCode>,
567}
568
569/// Authentication request sent on stream connect.
570#[derive(Debug, Clone, Serialize)]
571pub struct Authentication {
572    pub op: String,
573    pub id: Option<u64>,
574    #[serde(rename = "appKey")]
575    pub app_key: String,
576    pub session: String,
577}
578
579impl Authentication {
580    /// Creates a new authentication request.
581    #[must_use]
582    pub fn new(app_key: String, session: String) -> Self {
583        Self {
584            op: STREAM_OP_AUTHENTICATION.to_string(),
585            id: None,
586            app_key,
587            session,
588        }
589    }
590}
591
592/// Market subscription request.
593#[derive(Debug, Clone, Serialize)]
594#[serde(rename_all = "camelCase")]
595pub struct MarketSubscription {
596    pub op: String,
597    pub id: Option<u64>,
598    pub market_filter: StreamMarketFilter,
599    pub market_data_filter: MarketDataFilter,
600    #[serde(skip_serializing_if = "Option::is_none")]
601    pub clk: Option<String>,
602    #[serde(skip_serializing_if = "Option::is_none")]
603    pub conflate_ms: Option<u64>,
604    #[serde(skip_serializing_if = "Option::is_none")]
605    pub heartbeat_ms: Option<u64>,
606    #[serde(skip_serializing_if = "Option::is_none")]
607    pub initial_clk: Option<String>,
608    #[serde(skip_serializing_if = "Option::is_none")]
609    pub segmentation_enabled: Option<bool>,
610}
611
612/// Order subscription request.
613#[derive(Debug, Clone, Serialize)]
614#[serde(rename_all = "camelCase")]
615pub struct OrderSubscription {
616    pub op: String,
617    pub id: Option<u64>,
618    #[serde(skip_serializing_if = "Option::is_none")]
619    pub order_filter: Option<OrderFilter>,
620    #[serde(skip_serializing_if = "Option::is_none")]
621    pub clk: Option<String>,
622    #[serde(skip_serializing_if = "Option::is_none")]
623    pub conflate_ms: Option<u64>,
624    #[serde(skip_serializing_if = "Option::is_none")]
625    pub heartbeat_ms: Option<u64>,
626    #[serde(skip_serializing_if = "Option::is_none")]
627    pub initial_clk: Option<String>,
628    #[serde(skip_serializing_if = "Option::is_none")]
629    pub segmentation_enabled: Option<bool>,
630}
631
632/// Race stream subscription request.
633#[derive(Debug, Clone, Serialize)]
634#[serde(rename_all = "camelCase")]
635pub struct RaceSubscription {
636    pub op: String,
637    pub id: Option<u64>,
638}
639
640impl RaceSubscription {
641    #[must_use]
642    pub fn new(id: u64) -> Self {
643        Self {
644            op: STREAM_OP_RACE_SUBSCRIPTION.to_string(),
645            id: Some(id),
646        }
647    }
648}
649
650/// Heartbeat request to keep the connection alive.
651#[derive(Debug, Clone, Serialize)]
652pub struct StreamHeartbeat {
653    pub op: String,
654    pub id: Option<u64>,
655}
656
657impl StreamHeartbeat {
658    #[must_use]
659    pub fn new() -> Self {
660        Self {
661            op: STREAM_OP_HEARTBEAT.to_string(),
662            id: None,
663        }
664    }
665}
666
667impl Default for StreamHeartbeat {
668    fn default() -> Self {
669        Self::new()
670    }
671}
672
673/// Market filter for streaming subscriptions.
674#[derive(Debug, Clone, Default, Serialize, Deserialize)]
675#[serde(rename_all = "camelCase")]
676pub struct StreamMarketFilter {
677    #[serde(skip_serializing_if = "Option::is_none")]
678    pub betting_types: Option<Vec<MarketBettingType>>,
679    #[serde(skip_serializing_if = "Option::is_none")]
680    pub bsp_market: Option<bool>,
681    #[serde(skip_serializing_if = "Option::is_none")]
682    pub country_codes: Option<Vec<Ustr>>,
683    #[serde(skip_serializing_if = "Option::is_none")]
684    pub event_ids: Option<Vec<String>>,
685    #[serde(skip_serializing_if = "Option::is_none")]
686    pub event_type_ids: Option<Vec<String>>,
687    #[serde(skip_serializing_if = "Option::is_none")]
688    pub market_ids: Option<Vec<MarketId>>,
689    #[serde(skip_serializing_if = "Option::is_none")]
690    pub market_types: Option<Vec<Ustr>>,
691    #[serde(skip_serializing_if = "Option::is_none")]
692    pub race_types: Option<Vec<Ustr>>,
693    #[serde(skip_serializing_if = "Option::is_none")]
694    pub turn_in_play_enabled: Option<bool>,
695    #[serde(skip_serializing_if = "Option::is_none")]
696    pub venues: Option<Vec<Ustr>>,
697}
698
699/// Market data filter for streaming subscriptions.
700#[derive(Debug, Clone, Default, Serialize, Deserialize)]
701#[serde(rename_all = "camelCase")]
702pub struct MarketDataFilter {
703    #[serde(skip_serializing_if = "Option::is_none")]
704    pub fields: Option<Vec<MarketDataFilterField>>,
705    #[serde(skip_serializing_if = "Option::is_none")]
706    pub ladder_levels: Option<u32>,
707}
708
709/// Order filter for streaming subscriptions.
710#[derive(Debug, Clone, Serialize, Deserialize)]
711#[serde(rename_all = "camelCase")]
712pub struct OrderFilter {
713    #[serde(default = "default_true")]
714    pub include_overall_position: bool,
715    #[serde(skip_serializing_if = "Option::is_none")]
716    pub customer_strategy_refs: Option<Vec<String>>,
717    #[serde(default)]
718    pub partition_matched_by_strategy_ref: bool,
719    #[serde(skip_serializing_if = "Option::is_none")]
720    pub account_ids: Option<Vec<u64>>,
721}
722
723impl Default for OrderFilter {
724    fn default() -> Self {
725        Self {
726            include_overall_position: true,
727            customer_strategy_refs: None,
728            partition_matched_by_strategy_ref: false,
729            account_ids: None,
730        }
731    }
732}
733
734fn default_true() -> bool {
735    true
736}
737
738/// Race Change Message (RCM) - live GPS tracking data (Total Performance Data).
739#[derive(Debug, Clone, Deserialize)]
740pub struct RCM {
741    pub id: Option<u64>,
742    /// Publish time (epoch millis).
743    pub pt: u64,
744    /// Clock token (may be integer or string depending on feed state).
745    pub clk: Option<serde_json::Value>,
746    /// Race changes (None on heartbeat).
747    pub rc: Option<Vec<RaceChange>>,
748}
749
750/// Delta update for a single race within an RCM.
751#[derive(Debug, Clone, Deserialize)]
752pub struct RaceChange {
753    /// Race identifier (e.g. "28587288.1650").
754    pub id: Option<String>,
755    /// Betfair market identifier.
756    pub mid: Option<String>,
757    /// Individual runner GPS data changes.
758    pub rrc: Option<Vec<RaceRunnerChange>>,
759    /// Overall race progress summary.
760    pub rpc: Option<RaceProgressChange>,
761}
762
763/// GPS tracking data for a single runner.
764#[derive(Debug, Clone, Deserialize)]
765pub struct RaceRunnerChange {
766    /// Feed time (epoch millis).
767    pub ft: Option<u64>,
768    /// Selection identifier.
769    pub id: Option<i64>,
770    /// Latitude (GPS coordinate).
771    pub lat: Option<f64>,
772    /// Longitude (GPS coordinate).
773    #[serde(rename = "long")]
774    pub lng: Option<f64>,
775    /// Speed in m/s (Doppler-derived).
776    pub spd: Option<f64>,
777    /// Distance to finish in meters.
778    pub prg: Option<f64>,
779    /// Stride frequency in Hz.
780    pub sfq: Option<f64>,
781}
782
783/// Race-level progress summary.
784#[derive(Debug, Clone, Deserialize)]
785pub struct RaceProgressChange {
786    /// Feed time (epoch millis).
787    pub ft: Option<u64>,
788    /// Gate/sectional name (e.g. "1f", "2f", "Finish").
789    pub g: Option<String>,
790    /// Sectional time in seconds.
791    pub st: Option<f64>,
792    /// Running time since race start in seconds.
793    pub rt: Option<f64>,
794    /// Speed of lead horse in m/s.
795    pub spd: Option<f64>,
796    /// Distance to finish for leading horse in meters.
797    pub prg: Option<f64>,
798    /// Runner order by selection ID (current race position).
799    pub ord: Option<Vec<i64>>,
800    /// Obstacle data for jump races.
801    #[serde(rename = "J")]
802    pub jumps: Option<Vec<Jump>>,
803}
804
805/// Jump obstacle location data.
806#[derive(Debug, Clone, Serialize, Deserialize)]
807pub struct Jump {
808    /// Jump number.
809    #[serde(rename = "J")]
810    pub number: i32,
811    /// Distance from finish line in meters.
812    #[serde(rename = "L")]
813    pub distance: f64,
814}
815
816/// Decode a single JSON stream line into a [`StreamMessage`].
817///
818/// # Errors
819///
820/// Returns an error if the JSON is malformed or the `op` field is missing/unknown.
821pub fn stream_decode(data: &[u8]) -> Result<StreamMessage, serde_json::Error> {
822    serde_json::from_slice(data)
823}
824
825#[cfg(test)]
826mod tests {
827    use rstest::rstest;
828
829    use super::*;
830    use crate::common::testing::load_test_json;
831
832    #[rstest]
833    #[case("stream/ocm_NEW_FULL_IMAGE.json")]
834    #[case("stream/ocm_FILLED.json")]
835    #[case("stream/ocm_FULL_IMAGE.json")]
836    #[case("stream/ocm_FULL_IMAGE_STRATEGY.json")]
837    #[case("stream/ocm_CANCEL.json")]
838    #[case("stream/ocm_UPDATE.json")]
839    #[case("stream/ocm_SUB_IMAGE.json")]
840    #[case("stream/ocm_MIXED.json")]
841    #[case("stream/ocm_EMPTY_IMAGE.json")]
842    #[case("stream/ocm_error_fill.json")]
843    #[case("stream/ocm_filled_different_price.json")]
844    #[case("stream/ocm_order_update.json")]
845    fn test_stream_decode_ocm_fixtures(#[case] fixture: &str) {
846        let data = load_test_json(fixture);
847        let msg = stream_decode(data.as_bytes()).unwrap_or_else(|e| panic!("{fixture}: {e}"));
848        assert!(matches!(msg, StreamMessage::OrderChange(_)), "{fixture}");
849    }
850
851    #[rstest]
852    #[case("stream/mcm_SUB_IMAGE.json")]
853    #[case("stream/mcm_SUB_IMAGE_no_market_def.json")]
854    #[case("stream/mcm_UPDATE.json")]
855    #[case("stream/mcm_UPDATE_md.json")]
856    #[case("stream/mcm_UPDATE_tv.json")]
857    #[case("stream/mcm_HEARTBEAT.json")]
858    #[case("stream/mcm_RESUB_DELTA.json")]
859    #[case("stream/mcm_live_IMAGE.json")]
860    #[case("stream/mcm_live_UPDATE.json")]
861    #[case("stream/mcm_latency.json")]
862    #[case("stream/market_definition_racing.json")]
863    #[case("stream/market_definition_runner_removed.json")]
864    fn test_stream_decode_mcm_fixtures(#[case] fixture: &str) {
865        let data = load_test_json(fixture);
866        let msg = stream_decode(data.as_bytes()).unwrap_or_else(|e| panic!("{fixture}: {e}"));
867        assert!(matches!(msg, StreamMessage::MarketChange(_)), "{fixture}");
868    }
869
870    /// Fixtures containing a JSON array of multiple MCM messages.
871    #[rstest]
872    #[case("stream/mcm_BSP.json")]
873    #[case("stream/market_updates.json")]
874    fn test_stream_decode_mcm_multi_fixtures(#[case] fixture: &str) {
875        let data = load_test_json(fixture);
876        let msgs: Vec<StreamMessage> =
877            serde_json::from_str(&data).unwrap_or_else(|e| panic!("{fixture}: {e}"));
878        assert!(!msgs.is_empty(), "{fixture}: empty array");
879        for msg in &msgs {
880            assert!(matches!(msg, StreamMessage::MarketChange(_)), "{fixture}");
881        }
882    }
883
884    /// Fixtures containing a JSON array of multiple OCM messages.
885    #[rstest]
886    #[case("stream/ocm_multiple_fills.json")]
887    #[case("stream/ocm_DUPLICATE_EXECUTION.json")]
888    fn test_stream_decode_ocm_multi_fixtures(#[case] fixture: &str) {
889        let data = load_test_json(fixture);
890        let msgs: Vec<StreamMessage> =
891            serde_json::from_str(&data).unwrap_or_else(|e| panic!("{fixture}: {e}"));
892        assert!(!msgs.is_empty(), "{fixture}: empty array");
893        for msg in &msgs {
894            assert!(matches!(msg, StreamMessage::OrderChange(_)), "{fixture}");
895        }
896    }
897
898    #[rstest]
899    fn test_stream_decode_connection() {
900        let data = load_test_json("stream/connection.json");
901        let msg = stream_decode(data.as_bytes()).unwrap();
902        match msg {
903            StreamMessage::Connection(conn) => {
904                assert_eq!(conn.connection_id, "002-051134157842-432409");
905            }
906            other => panic!("Expected Connection, was {other:?}"),
907        }
908    }
909
910    #[rstest]
911    fn test_stream_decode_status() {
912        let data = load_test_json("stream/status.json");
913        let msg = stream_decode(data.as_bytes()).unwrap();
914        assert!(matches!(msg, StreamMessage::Status(_)));
915    }
916
917    #[rstest]
918    fn test_stream_decode_lenient_sp_fields() {
919        let data = r#"{
920            "op":"mcm",
921            "pt":1773304044929,
922            "mc":[{
923                "id":"1.255095842",
924                "rc":[{
925                    "id":96146807,
926                    "spn":"Infinity",
927                    "spf":"NaN",
928                    "ltp":5.0,
929                    "tv":10.63
930                }]
931            }]
932        }"#;
933
934        let msg = stream_decode(data.as_bytes()).unwrap();
935
936        match msg {
937            StreamMessage::MarketChange(mcm) => {
938                let rc = &mcm.mc.as_ref().unwrap()[0].rc.as_ref().unwrap()[0];
939                assert_eq!(rc.spn, None);
940                assert_eq!(rc.spf, None);
941                assert_eq!(rc.ltp, Some(Decimal::new(50, 1)));
942                assert_eq!(rc.tv, Some(Decimal::new(1063, 2)));
943            }
944            other => panic!("Expected MarketChange, was {other:?}"),
945        }
946    }
947
948    #[rstest]
949    fn test_market_definition_standalone() {
950        let data = load_test_json("stream/market_definition.json");
951        let _def: MarketDefinition = serde_json::from_str(&data).unwrap();
952    }
953
954    #[rstest]
955    #[case("rest/market_definition_open.json")]
956    #[case("rest/market_definition_closed.json")]
957    #[case("rest/market_definition_runner_removed.json")]
958    fn test_market_definition_response_fixtures(#[case] fixture: &str) {
959        let data = load_test_json(fixture);
960        let _def: MarketDefinition = serde_json::from_str(&data).unwrap();
961    }
962
963    #[rstest]
964    fn test_stream_decode_rcm_single() {
965        let data = load_test_json("stream/rcm_single.json");
966        let msg = stream_decode(data.as_bytes()).unwrap();
967        match msg {
968            StreamMessage::RaceChange(rcm) => {
969                let rc = rcm.rc.as_ref().unwrap();
970                assert_eq!(rc.len(), 1);
971
972                let race = &rc[0];
973                assert_eq!(race.id.as_deref(), Some("28587288.1650"));
974                assert_eq!(race.mid.as_deref(), Some("1.1234567"));
975
976                let runners = race.rrc.as_ref().unwrap();
977                assert_eq!(runners.len(), 1);
978                assert_eq!(runners[0].id, Some(7390417));
979                assert!((runners[0].lat.unwrap() - 51.4189543).abs() < 1e-6);
980                assert!((runners[0].spd.unwrap() - 17.8).abs() < 1e-6);
981                assert!((runners[0].sfq.unwrap() - 2.07).abs() < 1e-6);
982
983                let progress = race.rpc.as_ref().unwrap();
984                assert_eq!(progress.g.as_deref(), Some("1f"));
985                assert!((progress.st.unwrap() - 10.6).abs() < 1e-6);
986                assert!((progress.rt.unwrap() - 46.7).abs() < 1e-6);
987
988                let order = progress.ord.as_ref().unwrap();
989                assert_eq!(order.len(), 5);
990                assert_eq!(order[0], 7390417);
991
992                let jumps = progress.jumps.as_ref().unwrap();
993                assert_eq!(jumps.len(), 2);
994                assert_eq!(jumps[0].number, 2);
995                assert!((jumps[0].distance - 370.1).abs() < 1e-6);
996            }
997            other => panic!("Expected RaceChange, was {other:?}"),
998        }
999    }
1000
1001    #[rstest]
1002    fn test_stream_decode_rcm_multi_runner() {
1003        let data = load_test_json("stream/rcm_multi_runner.json");
1004        let msg = stream_decode(data.as_bytes()).unwrap();
1005        match msg {
1006            StreamMessage::RaceChange(rcm) => {
1007                let rc = rcm.rc.as_ref().unwrap();
1008                let runners = rc[0].rrc.as_ref().unwrap();
1009                assert_eq!(runners.len(), 5);
1010
1011                let ids: Vec<i64> = runners.iter().filter_map(|r| r.id).collect();
1012                assert_eq!(ids, vec![35467839, 24947967, 299569, 31422647, 41694785]);
1013            }
1014            other => panic!("Expected RaceChange, was {other:?}"),
1015        }
1016    }
1017
1018    #[rstest]
1019    fn test_stream_decode_ocm_voided() {
1020        let data = load_test_json("stream/ocm_VOIDED.json");
1021        let msg = stream_decode(data.as_bytes()).unwrap();
1022        match msg {
1023            StreamMessage::OrderChange(ocm) => {
1024                let oc = ocm.oc.as_ref().unwrap();
1025                let orc = oc[0].orc.as_ref().unwrap();
1026                let uo = &orc[0].uo.as_ref().unwrap()[0];
1027                assert_eq!(uo.sv.unwrap(), rust_decimal::Decimal::from(50));
1028                assert_eq!(uo.sm.unwrap(), rust_decimal::Decimal::from(50));
1029                assert_eq!(uo.s, rust_decimal::Decimal::from(100));
1030            }
1031            other => panic!("Expected OrderChange, was {other:?}"),
1032        }
1033    }
1034
1035    #[rstest]
1036    fn test_stream_decode_ocm_missing_persistence_type_for_market_on_close() {
1037        let data = r#"{
1038            "op":"ocm",
1039            "id":1,
1040            "pt":1775175455685,
1041            "clk":"clk-1",
1042            "oc":[{
1043                "id":"1.256134154",
1044                "orc":[{
1045                    "id":77465280,
1046                    "uo":[{
1047                        "id":"424009603606",
1048                        "p":1.01,
1049                        "s":2.00,
1050                        "side":"B",
1051                        "status":"E",
1052                        "ot":"MOC",
1053                        "pd":1775175455000,
1054                        "sr":2.00
1055                    }]
1056                }]
1057            }]
1058        }"#;
1059
1060        let msg = stream_decode(data.as_bytes()).unwrap();
1061
1062        match msg {
1063            StreamMessage::OrderChange(ocm) => {
1064                let oc = ocm.oc.as_ref().unwrap();
1065                let orc = oc[0].orc.as_ref().unwrap();
1066                let uo = &orc[0].uo.as_ref().unwrap()[0];
1067                assert_eq!(uo.pt, None);
1068                assert_eq!(
1069                    uo.ot,
1070                    crate::common::enums::StreamingOrderType::MarketOnClose
1071                );
1072            }
1073            other => panic!("Expected OrderChange, was {other:?}"),
1074        }
1075    }
1076}