Skip to main content

nautilus_betfair/
data_types.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Betfair-specific custom data types.
17//!
18//! These types carry Betfair domain data through the Nautilus data engine as
19//! [`CustomData`](nautilus_model::data::CustomData). Each type uses the
20//! `#[custom_data(pyo3)]` macro which generates `CustomDataTrait`, Arrow codec, and
21//! serialization implementations.
22//!
23//! Call [`register_betfair_custom_data`] once (e.g. during client `connect()`)
24//! to register all types for JSON and Arrow encoding.
25//!
26//! Absent optional float values use `f64::NAN` as the sentinel, matching
27//! Betfair's convention for missing starting price values.
28
29use nautilus_core::UnixNanos;
30use nautilus_model::identifiers::InstrumentId;
31use nautilus_persistence_macros::custom_data;
32
33/// Serde helpers for f64 fields that use NaN as a sentinel for absent values.
34/// Serializes NaN as JSON `null` and deserializes `null` back to NaN,
35/// avoiding `serde_json` errors on non-finite floats.
36mod nan_as_null {
37    pub fn serialize<S: serde::Serializer>(v: &f64, s: S) -> Result<S::Ok, S::Error> {
38        if v.is_nan() {
39            s.serialize_none()
40        } else {
41            s.serialize_f64(*v)
42        }
43    }
44
45    pub fn deserialize<'de, D: serde::Deserializer<'de>>(d: D) -> Result<f64, D::Error> {
46        use serde::Deserialize;
47        Ok(Option::<f64>::deserialize(d)?.unwrap_or(f64::NAN))
48    }
49}
50
51/// Betfair ticker data from MCM runner changes.
52///
53/// Carries last traded price, traded volume, and starting price
54/// near/far values per runner. Fields are `f64::NAN` when absent.
55#[custom_data(pyo3)]
56pub struct BetfairTicker {
57    /// The instrument ID for this ticker.
58    pub instrument_id: InstrumentId,
59    /// Last traded price.
60    #[serde(
61        serialize_with = "nan_as_null::serialize",
62        deserialize_with = "nan_as_null::deserialize"
63    )]
64    pub last_traded_price: f64,
65    /// Total traded volume.
66    #[serde(
67        serialize_with = "nan_as_null::serialize",
68        deserialize_with = "nan_as_null::deserialize"
69    )]
70    pub traded_volume: f64,
71    /// Starting price near (projected BSP from matched portion).
72    #[serde(
73        serialize_with = "nan_as_null::serialize",
74        deserialize_with = "nan_as_null::deserialize"
75    )]
76    pub starting_price_near: f64,
77    /// Starting price far (projected BSP from unmatched portion).
78    #[serde(
79        serialize_with = "nan_as_null::serialize",
80        deserialize_with = "nan_as_null::deserialize"
81    )]
82    pub starting_price_far: f64,
83    /// UNIX timestamp (nanoseconds) when the data event occurred.
84    pub ts_event: UnixNanos,
85    /// UNIX timestamp (nanoseconds) when the instance was initialized.
86    pub ts_init: UnixNanos,
87}
88
89/// Realized Betfair Starting Price (BSP) for a runner.
90///
91/// Emitted from the market definition when a runner's BSP is determined.
92#[custom_data(pyo3)]
93pub struct BetfairStartingPrice {
94    /// The instrument ID for this starting price.
95    pub instrument_id: InstrumentId,
96    /// The realized best starting price value.
97    pub bsp: f64,
98    /// UNIX timestamp (nanoseconds) when the data event occurred.
99    pub ts_event: UnixNanos,
100    /// UNIX timestamp (nanoseconds) when the instance was initialized.
101    pub ts_init: UnixNanos,
102}
103
104/// BSP order book delta from starting price back/lay arrays.
105///
106/// Mirrors `OrderBookDelta` fields as a custom data type so strategies
107/// can subscribe specifically to BSP book updates (spb/spl) separately
108/// from the exchange order book (atb/atl).
109#[custom_data(pyo3)]
110pub struct BetfairBspBookDelta {
111    /// The instrument ID for this BSP delta.
112    pub instrument_id: InstrumentId,
113    /// The book action (add/update/delete/clear) as `BookAction` u8.
114    pub action: u32,
115    /// The order side as `OrderSide` u8.
116    pub side: u32,
117    /// The price level.
118    pub price: f64,
119    /// The size at this price level.
120    pub size: f64,
121    /// UNIX timestamp (nanoseconds) when the data event occurred.
122    pub ts_event: UnixNanos,
123    /// UNIX timestamp (nanoseconds) when the instance was initialized.
124    pub ts_init: UnixNanos,
125}
126
127/// Marker emitted after all changes in a single MCM batch are processed.
128///
129/// Strategies can use this to know when a coherent set of market updates
130/// has been fully delivered.
131#[custom_data(pyo3)]
132pub struct BetfairSequenceCompleted {
133    /// UNIX timestamp (nanoseconds) when the data event occurred.
134    pub ts_event: UnixNanos,
135    /// UNIX timestamp (nanoseconds) when the instance was initialized.
136    pub ts_init: UnixNanos,
137}
138
139/// Betfair order void event (e.g. VAR void).
140///
141/// Published when a matched bet is retroactively voided by Betfair, such as
142/// when a goal is disallowed following a VAR review.
143#[custom_data(pyo3)]
144pub struct BetfairOrderVoided {
145    /// The instrument ID for the voided order.
146    pub instrument_id: InstrumentId,
147    /// The client order ID.
148    pub client_order_id: String,
149    /// The venue (Betfair) order ID (bet ID).
150    pub venue_order_id: String,
151    /// The size that was voided.
152    pub size_voided: f64,
153    /// The order price.
154    pub price: f64,
155    /// The original order size.
156    pub size: f64,
157    /// The order side ("BACK" or "LAY").
158    pub side: String,
159    /// The average price matched. `f64::NAN` if absent.
160    #[serde(
161        serialize_with = "nan_as_null::serialize",
162        deserialize_with = "nan_as_null::deserialize"
163    )]
164    pub avg_price_matched: f64,
165    /// The total size matched. `f64::NAN` if absent.
166    #[serde(
167        serialize_with = "nan_as_null::serialize",
168        deserialize_with = "nan_as_null::deserialize"
169    )]
170    pub size_matched: f64,
171    /// The void reason. Empty string if absent.
172    pub reason: String,
173    /// UNIX timestamp (nanoseconds) when the data event occurred.
174    pub ts_event: UnixNanos,
175    /// UNIX timestamp (nanoseconds) when the instance was initialized.
176    pub ts_init: UnixNanos,
177}
178
179/// GPS tracking data for a single runner from RCM (Race Change Messages).
180///
181/// Betfair's Total Performance Data (TPD) provides real-time GPS positions,
182/// speed, and stride frequency for each runner in supported races.
183#[custom_data(pyo3)]
184pub struct BetfairRaceRunnerData {
185    /// Race identifier (e.g. "28587288.1650").
186    pub race_id: String,
187    /// Betfair market identifier.
188    pub market_id: String,
189    /// Betfair selection identifier.
190    pub selection_id: i64,
191    /// GPS latitude coordinate. `f64::NAN` if absent.
192    #[serde(
193        serialize_with = "nan_as_null::serialize",
194        deserialize_with = "nan_as_null::deserialize"
195    )]
196    pub latitude: f64,
197    /// GPS longitude coordinate. `f64::NAN` if absent.
198    #[serde(
199        serialize_with = "nan_as_null::serialize",
200        deserialize_with = "nan_as_null::deserialize"
201    )]
202    pub longitude: f64,
203    /// Speed in m/s (Doppler-derived). `f64::NAN` if absent.
204    #[serde(
205        serialize_with = "nan_as_null::serialize",
206        deserialize_with = "nan_as_null::deserialize"
207    )]
208    pub speed: f64,
209    /// Distance to finish in meters. `f64::NAN` if absent.
210    #[serde(
211        serialize_with = "nan_as_null::serialize",
212        deserialize_with = "nan_as_null::deserialize"
213    )]
214    pub progress: f64,
215    /// Stride frequency in Hz. `f64::NAN` if absent.
216    #[serde(
217        serialize_with = "nan_as_null::serialize",
218        deserialize_with = "nan_as_null::deserialize"
219    )]
220    pub stride_frequency: f64,
221    /// UNIX timestamp (nanoseconds) when the data event occurred.
222    pub ts_event: UnixNanos,
223    /// UNIX timestamp (nanoseconds) when the instance was initialized.
224    pub ts_init: UnixNanos,
225}
226
227/// Race-level progress from RCM (Race Change Messages).
228///
229/// Provides sectional timing, race order, and obstacle data for the
230/// overall race rather than individual runners.
231#[custom_data(pyo3)]
232pub struct BetfairRaceProgress {
233    /// Race identifier (e.g. "28587288.1650").
234    pub race_id: String,
235    /// Betfair market identifier.
236    pub market_id: String,
237    /// Gate/sectional name (e.g. "1f", "2f", "Finish"). Empty if absent.
238    pub gate_name: String,
239    /// Sectional time in seconds. `f64::NAN` if absent.
240    #[serde(
241        serialize_with = "nan_as_null::serialize",
242        deserialize_with = "nan_as_null::deserialize"
243    )]
244    pub sectional_time: f64,
245    /// Running time since race start in seconds. `f64::NAN` if absent.
246    #[serde(
247        serialize_with = "nan_as_null::serialize",
248        deserialize_with = "nan_as_null::deserialize"
249    )]
250    pub running_time: f64,
251    /// Speed of lead horse in m/s. `f64::NAN` if absent.
252    #[serde(
253        serialize_with = "nan_as_null::serialize",
254        deserialize_with = "nan_as_null::deserialize"
255    )]
256    pub speed: f64,
257    /// Distance to finish for leading horse in meters. `f64::NAN` if absent.
258    #[serde(
259        serialize_with = "nan_as_null::serialize",
260        deserialize_with = "nan_as_null::deserialize"
261    )]
262    pub progress: f64,
263    /// Runner order by selection ID (JSON-encoded array). Empty if absent.
264    pub order: String,
265    /// Jump obstacles (JSON-encoded array of {"J":int,"L":float}). Empty if absent.
266    pub jumps: String,
267    /// UNIX timestamp (nanoseconds) when the data event occurred.
268    pub ts_event: UnixNanos,
269    /// UNIX timestamp (nanoseconds) when the instance was initialized.
270    pub ts_init: UnixNanos,
271}
272
273/// Registers all Betfair custom data types for JSON and Arrow encoding.
274///
275/// This must be called once before emitting or persisting Betfair custom data.
276/// Safe to call multiple times (idempotent via internal `Once` guards).
277pub fn register_betfair_custom_data() {
278    nautilus_serialization::ensure_custom_data_registered::<BetfairTicker>();
279    nautilus_serialization::ensure_custom_data_registered::<BetfairStartingPrice>();
280    nautilus_serialization::ensure_custom_data_registered::<BetfairBspBookDelta>();
281    nautilus_serialization::ensure_custom_data_registered::<BetfairSequenceCompleted>();
282    nautilus_serialization::ensure_custom_data_registered::<BetfairOrderVoided>();
283    nautilus_serialization::ensure_custom_data_registered::<BetfairRaceRunnerData>();
284    nautilus_serialization::ensure_custom_data_registered::<BetfairRaceProgress>();
285}
286
287#[cfg(test)]
288mod tests {
289    use nautilus_serialization::arrow::ArrowSchemaProvider;
290    use rstest::rstest;
291
292    use super::*;
293
294    #[rstest]
295    fn test_betfair_ticker_schema() {
296        let schema = BetfairTicker::get_schema(None);
297        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
298        assert!(field_names.contains(&"instrument_id".to_string()));
299        assert!(field_names.contains(&"last_traded_price".to_string()));
300        assert!(field_names.contains(&"traded_volume".to_string()));
301        assert!(field_names.contains(&"starting_price_near".to_string()));
302        assert!(field_names.contains(&"starting_price_far".to_string()));
303        assert!(field_names.contains(&"ts_event".to_string()));
304        assert!(field_names.contains(&"ts_init".to_string()));
305    }
306
307    #[rstest]
308    fn test_betfair_starting_price_schema() {
309        let schema = BetfairStartingPrice::get_schema(None);
310        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
311        assert!(field_names.contains(&"instrument_id".to_string()));
312        assert!(field_names.contains(&"bsp".to_string()));
313        assert!(field_names.contains(&"ts_event".to_string()));
314        assert!(field_names.contains(&"ts_init".to_string()));
315    }
316
317    #[rstest]
318    fn test_betfair_bsp_book_delta_schema() {
319        let schema = BetfairBspBookDelta::get_schema(None);
320        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
321        assert!(field_names.contains(&"instrument_id".to_string()));
322        assert!(field_names.contains(&"action".to_string()));
323        assert!(field_names.contains(&"side".to_string()));
324        assert!(field_names.contains(&"price".to_string()));
325        assert!(field_names.contains(&"size".to_string()));
326        assert!(field_names.contains(&"ts_event".to_string()));
327        assert!(field_names.contains(&"ts_init".to_string()));
328    }
329
330    #[rstest]
331    fn test_betfair_sequence_completed_schema() {
332        let schema = BetfairSequenceCompleted::get_schema(None);
333        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
334        assert!(field_names.contains(&"ts_event".to_string()));
335        assert!(field_names.contains(&"ts_init".to_string()));
336    }
337
338    #[rstest]
339    fn test_betfair_order_voided_schema() {
340        let schema = BetfairOrderVoided::get_schema(None);
341        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
342        assert!(field_names.contains(&"instrument_id".to_string()));
343        assert!(field_names.contains(&"client_order_id".to_string()));
344        assert!(field_names.contains(&"venue_order_id".to_string()));
345        assert!(field_names.contains(&"size_voided".to_string()));
346        assert!(field_names.contains(&"reason".to_string()));
347    }
348
349    #[rstest]
350    fn test_register_betfair_custom_data_is_idempotent() {
351        register_betfair_custom_data();
352        register_betfair_custom_data();
353    }
354
355    #[rstest]
356    fn test_betfair_race_runner_data_schema() {
357        let schema = BetfairRaceRunnerData::get_schema(None);
358        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
359        assert!(field_names.contains(&"race_id".to_string()));
360        assert!(field_names.contains(&"market_id".to_string()));
361        assert!(field_names.contains(&"selection_id".to_string()));
362        assert!(field_names.contains(&"latitude".to_string()));
363        assert!(field_names.contains(&"longitude".to_string()));
364        assert!(field_names.contains(&"speed".to_string()));
365        assert!(field_names.contains(&"progress".to_string()));
366        assert!(field_names.contains(&"stride_frequency".to_string()));
367    }
368
369    #[rstest]
370    fn test_betfair_race_progress_schema() {
371        let schema = BetfairRaceProgress::get_schema(None);
372        let field_names: Vec<_> = schema.fields().iter().map(|f| f.name().clone()).collect();
373        assert!(field_names.contains(&"race_id".to_string()));
374        assert!(field_names.contains(&"market_id".to_string()));
375        assert!(field_names.contains(&"gate_name".to_string()));
376        assert!(field_names.contains(&"sectional_time".to_string()));
377        assert!(field_names.contains(&"running_time".to_string()));
378        assert!(field_names.contains(&"speed".to_string()));
379        assert!(field_names.contains(&"progress".to_string()));
380        assert!(field_names.contains(&"order".to_string()));
381        assert!(field_names.contains(&"jumps".to_string()));
382    }
383
384    #[rstest]
385    fn test_race_runner_data_nan_json_roundtrip() {
386        let data = BetfairRaceRunnerData::new(
387            "28587288.1650".to_string(),
388            "1.1234567".to_string(),
389            7390417,
390            51.4189543,
391            -0.4058491,
392            17.8,
393            f64::NAN,
394            f64::NAN,
395            UnixNanos::from(1_000_000_000u64),
396            UnixNanos::from(1_000_000_000u64),
397        );
398
399        let json = serde_json::to_string(&data).unwrap();
400        assert!(json.contains("\"progress\":null"));
401        assert!(json.contains("\"stride_frequency\":null"));
402        assert!(json.contains("\"latitude\":51.4189543"));
403
404        let parsed: BetfairRaceRunnerData = serde_json::from_str(&json).unwrap();
405        assert!(parsed.progress.is_nan());
406        assert!(parsed.stride_frequency.is_nan());
407        assert_eq!(parsed.latitude, 51.4189543);
408        assert_eq!(parsed.selection_id, 7390417);
409    }
410
411    #[rstest]
412    fn test_betfair_ticker_nan_json_roundtrip() {
413        let ticker = BetfairTicker::new(
414            InstrumentId::from("1.234-56789-0.0.BETFAIR"),
415            1.5,
416            100.0,
417            f64::NAN,
418            f64::NAN,
419            UnixNanos::from(1_000_000_000u64),
420            UnixNanos::from(1_000_000_000u64),
421        );
422
423        let json = serde_json::to_string(&ticker).unwrap();
424        assert!(json.contains("\"starting_price_near\":null"));
425        assert!(json.contains("\"starting_price_far\":null"));
426        assert!(json.contains("\"last_traded_price\":1.5"));
427
428        let parsed: BetfairTicker = serde_json::from_str(&json).unwrap();
429        assert!(parsed.starting_price_near.is_nan());
430        assert!(parsed.starting_price_far.is_nan());
431        assert_eq!(parsed.last_traded_price, 1.5);
432    }
433}