1use std::str::FromStr;
27
28use ahash::AHashMap;
29use nautilus_core::serialization::{deserialize_decimal, deserialize_optional_decimal};
30use rust_decimal::Decimal;
31use serde::{Deserialize, Deserializer, Serialize, de::Visitor};
32use ustr::Ustr;
33
34use crate::common::{
35 consts::{STREAM_OP_AUTHENTICATION, STREAM_OP_HEARTBEAT, STREAM_OP_RACE_SUBSCRIPTION},
36 enums::{
37 ChangeType, LapseStatusReasonCode, MarketBettingType, MarketDataFilterField, MarketStatus,
38 PriceLadderType, RunnerStatus, SegmentType, StatusErrorCode, StreamingOrderStatus,
39 StreamingOrderType, StreamingPersistenceType, StreamingSide,
40 },
41 types::{
42 Handicap, MarketId, SelectionId, deserialize_optional_string_lenient,
43 deserialize_selection_id,
44 },
45};
46
47#[derive(Debug, Clone, Deserialize)]
52#[serde(tag = "op")]
53pub enum StreamMessage {
54 #[serde(rename = "connection")]
55 Connection(Connection),
56 #[serde(rename = "status")]
57 Status(Status),
58 #[serde(rename = "mcm")]
59 MarketChange(MCM),
60 #[serde(rename = "ocm")]
61 OrderChange(OCM),
62 #[serde(rename = "rcm")]
63 RaceChange(RCM),
64}
65
66#[derive(Debug, Clone, Deserialize)]
68#[serde(rename_all = "camelCase")]
69pub struct Connection {
70 pub id: Option<u64>,
71 pub connection_id: String,
72}
73
74#[derive(Debug, Clone, Deserialize)]
76#[serde(rename_all = "camelCase")]
77pub struct Status {
78 pub id: Option<u64>,
79 pub connection_closed: bool,
80 pub connection_id: Option<String>,
81 pub connections_available: Option<u32>,
82 pub error_code: Option<StatusErrorCode>,
83 pub error_message: Option<String>,
84 pub status_code: Option<String>,
85}
86
87#[derive(Debug, Clone, Deserialize)]
89pub struct MCM {
90 pub id: Option<u64>,
91 pub pt: u64,
93 pub clk: Option<String>,
95 #[serde(rename = "initialClk")]
97 pub initial_clk: Option<String>,
98 pub mc: Option<Vec<MarketChange>>,
100 pub ct: Option<ChangeType>,
102 #[serde(rename = "conflateMs")]
104 pub conflate_ms: Option<u64>,
105 #[serde(rename = "heartbeatMs")]
107 pub heartbeat_ms: Option<u64>,
108 #[serde(rename = "segmentType")]
110 pub segment_type: Option<SegmentType>,
111 pub status: Option<i32>,
112}
113
114impl MCM {
115 #[must_use]
116 pub fn is_heartbeat(&self) -> bool {
117 self.ct == Some(ChangeType::Heartbeat)
118 }
119}
120
121#[derive(Debug, Clone, Deserialize)]
123pub struct OCM {
124 pub id: Option<u64>,
125 pub pt: u64,
127 pub clk: Option<String>,
128 #[serde(rename = "initialClk")]
129 pub initial_clk: Option<String>,
130 pub oc: Option<Vec<OrderMarketChange>>,
132 pub ct: Option<ChangeType>,
133 #[serde(rename = "conflateMs")]
134 pub conflate_ms: Option<u64>,
135 #[serde(rename = "heartbeatMs")]
136 pub heartbeat_ms: Option<u64>,
137 #[serde(rename = "segmentType")]
138 pub segment_type: Option<SegmentType>,
139 pub status: Option<i32>,
140}
141
142impl OCM {
143 #[must_use]
144 pub fn is_heartbeat(&self) -> bool {
145 self.ct == Some(ChangeType::Heartbeat)
146 }
147}
148
149#[derive(Debug, Clone, Deserialize)]
151pub struct MarketChange {
152 pub id: MarketId,
154 pub rc: Option<Vec<RunnerChange>>,
156 pub con: Option<bool>,
158 #[serde(default)]
160 pub img: bool,
161 #[serde(rename = "marketDefinition")]
163 pub market_definition: Option<MarketDefinition>,
164 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
166 pub tv: Option<Decimal>,
167}
168
169#[derive(Debug, Clone, Deserialize)]
171pub struct RunnerChange {
172 #[serde(deserialize_with = "deserialize_selection_id")]
174 pub id: SelectionId,
175 pub hc: Option<Handicap>,
177 pub atb: Option<Vec<PV>>,
179 pub atl: Option<Vec<PV>>,
181 pub batb: Option<Vec<LPV>>,
183 pub batl: Option<Vec<LPV>>,
185 pub bdatb: Option<Vec<LPV>>,
187 pub bdatl: Option<Vec<LPV>>,
189 pub spb: Option<Vec<PV>>,
191 pub spl: Option<Vec<PV>>,
193 #[serde(default, deserialize_with = "deserialize_optional_decimal_lenient")]
195 pub spn: Option<Decimal>,
196 #[serde(default, deserialize_with = "deserialize_optional_decimal_lenient")]
198 pub spf: Option<Decimal>,
199 pub trd: Option<Vec<PV>>,
201 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
203 pub ltp: Option<Decimal>,
204 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
206 pub tv: Option<Decimal>,
207}
208
209fn deserialize_optional_decimal_lenient<'de, D>(
210 deserializer: D,
211) -> Result<Option<Decimal>, D::Error>
212where
213 D: Deserializer<'de>,
214{
215 struct LenientOptionalDecimalVisitor;
216
217 impl Visitor<'_> for LenientOptionalDecimalVisitor {
218 type Value = Option<Decimal>;
219
220 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
221 formatter.write_str("null or a decimal number as string, integer, or float")
222 }
223
224 fn visit_str<E: serde::de::Error>(self, value: &str) -> Result<Self::Value, E> {
225 Ok(parse_optional_decimal_lenient(value))
226 }
227
228 fn visit_string<E: serde::de::Error>(self, value: String) -> Result<Self::Value, E> {
229 self.visit_str(&value)
230 }
231
232 fn visit_i64<E: serde::de::Error>(self, value: i64) -> Result<Self::Value, E> {
233 Ok(Some(Decimal::from(value)))
234 }
235
236 fn visit_u64<E: serde::de::Error>(self, value: u64) -> Result<Self::Value, E> {
237 Ok(Some(Decimal::from(value)))
238 }
239
240 fn visit_i128<E: serde::de::Error>(self, value: i128) -> Result<Self::Value, E> {
241 Ok(Some(Decimal::from(value)))
242 }
243
244 fn visit_u128<E: serde::de::Error>(self, value: u128) -> Result<Self::Value, E> {
245 Ok(Some(Decimal::from(value)))
246 }
247
248 fn visit_f64<E: serde::de::Error>(self, value: f64) -> Result<Self::Value, E> {
249 Ok(Decimal::try_from(value).ok())
250 }
251
252 fn visit_unit<E: serde::de::Error>(self) -> Result<Self::Value, E> {
253 Ok(None)
254 }
255
256 fn visit_none<E: serde::de::Error>(self) -> Result<Self::Value, E> {
257 Ok(None)
258 }
259 }
260
261 deserializer.deserialize_any(LenientOptionalDecimalVisitor)
262}
263
264fn parse_optional_decimal_lenient(value: &str) -> Option<Decimal> {
265 let trimmed = value.trim();
266 if trimmed.is_empty() || is_non_finite_decimal(trimmed) {
267 return None;
268 }
269
270 if trimmed.contains('e') || trimmed.contains('E') {
271 Decimal::from_scientific(trimmed).ok()
272 } else {
273 Decimal::from_str(trimmed).ok()
274 }
275}
276
277fn is_non_finite_decimal(value: &str) -> bool {
278 value.eq_ignore_ascii_case("nan")
279 || value.eq_ignore_ascii_case("inf")
280 || value.eq_ignore_ascii_case("+inf")
281 || value.eq_ignore_ascii_case("-inf")
282 || value.eq_ignore_ascii_case("infinity")
283 || value.eq_ignore_ascii_case("+infinity")
284 || value.eq_ignore_ascii_case("-infinity")
285}
286
287#[derive(Debug, Clone, Deserialize)]
289#[serde(rename_all = "camelCase")]
290pub struct MarketDefinition {
291 pub bet_delay: Option<i32>,
292 pub betting_type: Option<MarketBettingType>,
293 pub bsp_market: Option<bool>,
294 pub bsp_reconciled: Option<bool>,
295 pub competition_id: Option<String>,
296 pub competition_name: Option<String>,
297 pub complete: Option<bool>,
298 pub country_code: Option<Ustr>,
299 pub cross_matching: Option<bool>,
300 pub discount_allowed: Option<bool>,
301 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
302 pub each_way_divisor: Option<Decimal>,
303 pub event_id: Option<String>,
304 pub event_name: Option<String>,
305 #[serde(default, deserialize_with = "deserialize_optional_string_lenient")]
306 pub event_type_id: Option<String>,
307 pub event_type_name: Option<Ustr>,
308 pub in_play: Option<bool>,
309 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
310 pub line_interval: Option<Decimal>,
311 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
312 pub line_max_unit: Option<Decimal>,
313 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
314 pub line_min_unit: Option<Decimal>,
315 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
316 pub market_base_rate: Option<Decimal>,
317 pub market_id: Option<MarketId>,
318 pub market_name: Option<String>,
319 pub market_time: Option<String>,
320 pub market_type: Option<Ustr>,
321 pub number_of_active_runners: Option<u32>,
322 pub number_of_winners: Option<u32>,
323 pub open_date: Option<String>,
324 pub persistence_enabled: Option<bool>,
325 pub price_ladder_definition: Option<PriceLadderDefinition>,
326 pub race_type: Option<Ustr>,
327 pub regulators: Option<Vec<Ustr>>,
328 pub runners: Option<Vec<RunnerDefinition>>,
329 pub runners_voidable: Option<bool>,
330 pub settled_time: Option<String>,
331 pub status: Option<MarketStatus>,
332 pub suspend_time: Option<String>,
333 pub timezone: Option<Ustr>,
334 pub turn_in_play_enabled: Option<bool>,
335 pub venue: Option<Ustr>,
336 pub version: Option<u64>,
337}
338
339#[derive(Debug, Clone, Deserialize)]
341#[serde(rename_all = "camelCase")]
342pub struct RunnerDefinition {
343 #[serde(deserialize_with = "deserialize_selection_id")]
344 pub id: SelectionId,
345 pub hc: Option<Handicap>,
346 pub sort_priority: Option<u32>,
347 pub name: Option<String>,
348 pub status: Option<RunnerStatus>,
349 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
350 pub adjustment_factor: Option<Decimal>,
351 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
352 pub bsp: Option<Decimal>,
353 pub removal_date: Option<String>,
354}
355
356#[derive(Debug, Clone, Deserialize)]
358pub struct PriceLadderDefinition {
359 #[serde(rename = "type")]
360 pub ladder_type: Option<PriceLadderType>,
361}
362
363#[derive(Debug, Clone, Copy, PartialEq)]
368pub struct PV {
369 pub price: Decimal,
370 pub volume: Decimal,
371}
372
373impl<'de> Deserialize<'de> for PV {
374 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
375 where
376 D: serde::Deserializer<'de>,
377 {
378 let arr: Vec<Decimal> = Deserialize::deserialize(deserializer)?;
380 match arr.len() {
381 2 => Ok(Self {
382 price: arr[0],
383 volume: arr[1],
384 }),
385 3 => Ok(Self {
386 price: arr[1],
387 volume: arr[2],
388 }),
389 n => Err(serde::de::Error::invalid_length(n, &"2 or 3 elements")),
390 }
391 }
392}
393
394impl Serialize for PV {
395 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
396 where
397 S: serde::Serializer,
398 {
399 (self.price, self.volume).serialize(serializer)
400 }
401}
402
403#[derive(Debug, Clone, Copy, PartialEq)]
405pub struct LPV {
406 pub level: u32,
407 pub price: Decimal,
408 pub volume: Decimal,
409}
410
411impl<'de> Deserialize<'de> for LPV {
412 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
413 where
414 D: serde::Deserializer<'de>,
415 {
416 let arr: (u32, Decimal, Decimal) = Deserialize::deserialize(deserializer)?;
417 Ok(Self {
418 level: arr.0,
419 price: arr.1,
420 volume: arr.2,
421 })
422 }
423}
424
425impl Serialize for LPV {
426 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
427 where
428 S: serde::Serializer,
429 {
430 (self.level, self.price, self.volume).serialize(serializer)
431 }
432}
433
434#[derive(Debug, Clone, Deserialize)]
436pub struct OrderMarketChange {
437 pub id: MarketId,
439 #[serde(rename = "accountId")]
440 pub account_id: Option<u64>,
441 pub closed: Option<bool>,
442 #[serde(rename = "fullImage", default)]
443 pub full_image: bool,
444 pub orc: Option<Vec<OrderRunnerChange>>,
446}
447
448#[derive(Debug, Clone, Deserialize)]
450pub struct OrderRunnerChange {
451 #[serde(deserialize_with = "deserialize_selection_id")]
453 pub id: SelectionId,
454 #[serde(rename = "fullImage", default)]
455 pub full_image: bool,
456 pub hc: Option<Handicap>,
458 pub mb: Option<Vec<MatchedOrder>>,
460 pub ml: Option<Vec<MatchedOrder>>,
462 pub smc: Option<AHashMap<String, StrategyMatchChange>>,
464 pub uo: Option<Vec<UnmatchedOrder>>,
466}
467
468#[derive(Debug, Clone, Copy, PartialEq)]
470pub struct MatchedOrder {
471 pub price: Decimal,
472 pub size: Decimal,
473}
474
475impl<'de> Deserialize<'de> for MatchedOrder {
476 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
477 where
478 D: serde::Deserializer<'de>,
479 {
480 let arr: (Decimal, Decimal) = Deserialize::deserialize(deserializer)?;
481 Ok(Self {
482 price: arr.0,
483 size: arr.1,
484 })
485 }
486}
487
488impl Serialize for MatchedOrder {
489 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
490 where
491 S: serde::Serializer,
492 {
493 (self.price, self.size).serialize(serializer)
494 }
495}
496
497#[derive(Debug, Clone, Deserialize)]
499pub struct StrategyMatchChange {
500 pub mb: Option<Vec<MatchedOrder>>,
502 pub ml: Option<Vec<MatchedOrder>>,
504}
505
506#[derive(Debug, Clone, Deserialize)]
508pub struct UnmatchedOrder {
509 pub id: String,
511 #[serde(deserialize_with = "deserialize_decimal")]
513 pub p: Decimal,
514 #[serde(deserialize_with = "deserialize_decimal")]
516 pub s: Decimal,
517 pub side: StreamingSide,
519 pub status: StreamingOrderStatus,
521 #[serde(default)]
525 pub pt: Option<StreamingPersistenceType>,
526 pub ot: StreamingOrderType,
528 pub pd: u64,
530 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
532 pub bsp: Option<Decimal>,
533 pub rfo: Option<String>,
535 pub rfs: Option<String>,
537 pub rc: Option<String>,
539 pub rac: Option<String>,
541 pub md: Option<u64>,
543 pub cd: Option<u64>,
545 pub ld: Option<u64>,
547 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
549 pub avp: Option<Decimal>,
550 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
552 pub sm: Option<Decimal>,
553 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
555 pub sr: Option<Decimal>,
556 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
558 pub sl: Option<Decimal>,
559 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
561 pub sc: Option<Decimal>,
562 #[serde(default, deserialize_with = "deserialize_optional_decimal")]
564 pub sv: Option<Decimal>,
565 pub lsrc: Option<LapseStatusReasonCode>,
567}
568
569#[derive(Debug, Clone, Serialize)]
571pub struct Authentication {
572 pub op: String,
573 pub id: Option<u64>,
574 #[serde(rename = "appKey")]
575 pub app_key: String,
576 pub session: String,
577}
578
579impl Authentication {
580 #[must_use]
582 pub fn new(app_key: String, session: String) -> Self {
583 Self {
584 op: STREAM_OP_AUTHENTICATION.to_string(),
585 id: None,
586 app_key,
587 session,
588 }
589 }
590}
591
592#[derive(Debug, Clone, Serialize)]
594#[serde(rename_all = "camelCase")]
595pub struct MarketSubscription {
596 pub op: String,
597 pub id: Option<u64>,
598 pub market_filter: StreamMarketFilter,
599 pub market_data_filter: MarketDataFilter,
600 #[serde(skip_serializing_if = "Option::is_none")]
601 pub clk: Option<String>,
602 #[serde(skip_serializing_if = "Option::is_none")]
603 pub conflate_ms: Option<u64>,
604 #[serde(skip_serializing_if = "Option::is_none")]
605 pub heartbeat_ms: Option<u64>,
606 #[serde(skip_serializing_if = "Option::is_none")]
607 pub initial_clk: Option<String>,
608 #[serde(skip_serializing_if = "Option::is_none")]
609 pub segmentation_enabled: Option<bool>,
610}
611
612#[derive(Debug, Clone, Serialize)]
614#[serde(rename_all = "camelCase")]
615pub struct OrderSubscription {
616 pub op: String,
617 pub id: Option<u64>,
618 #[serde(skip_serializing_if = "Option::is_none")]
619 pub order_filter: Option<OrderFilter>,
620 #[serde(skip_serializing_if = "Option::is_none")]
621 pub clk: Option<String>,
622 #[serde(skip_serializing_if = "Option::is_none")]
623 pub conflate_ms: Option<u64>,
624 #[serde(skip_serializing_if = "Option::is_none")]
625 pub heartbeat_ms: Option<u64>,
626 #[serde(skip_serializing_if = "Option::is_none")]
627 pub initial_clk: Option<String>,
628 #[serde(skip_serializing_if = "Option::is_none")]
629 pub segmentation_enabled: Option<bool>,
630}
631
632#[derive(Debug, Clone, Serialize)]
634#[serde(rename_all = "camelCase")]
635pub struct RaceSubscription {
636 pub op: String,
637 pub id: Option<u64>,
638}
639
640impl RaceSubscription {
641 #[must_use]
642 pub fn new(id: u64) -> Self {
643 Self {
644 op: STREAM_OP_RACE_SUBSCRIPTION.to_string(),
645 id: Some(id),
646 }
647 }
648}
649
650#[derive(Debug, Clone, Serialize)]
652pub struct StreamHeartbeat {
653 pub op: String,
654 pub id: Option<u64>,
655}
656
657impl StreamHeartbeat {
658 #[must_use]
659 pub fn new() -> Self {
660 Self {
661 op: STREAM_OP_HEARTBEAT.to_string(),
662 id: None,
663 }
664 }
665}
666
667impl Default for StreamHeartbeat {
668 fn default() -> Self {
669 Self::new()
670 }
671}
672
673#[derive(Debug, Clone, Default, Serialize, Deserialize)]
675#[serde(rename_all = "camelCase")]
676pub struct StreamMarketFilter {
677 #[serde(skip_serializing_if = "Option::is_none")]
678 pub betting_types: Option<Vec<MarketBettingType>>,
679 #[serde(skip_serializing_if = "Option::is_none")]
680 pub bsp_market: Option<bool>,
681 #[serde(skip_serializing_if = "Option::is_none")]
682 pub country_codes: Option<Vec<Ustr>>,
683 #[serde(skip_serializing_if = "Option::is_none")]
684 pub event_ids: Option<Vec<String>>,
685 #[serde(skip_serializing_if = "Option::is_none")]
686 pub event_type_ids: Option<Vec<String>>,
687 #[serde(skip_serializing_if = "Option::is_none")]
688 pub market_ids: Option<Vec<MarketId>>,
689 #[serde(skip_serializing_if = "Option::is_none")]
690 pub market_types: Option<Vec<Ustr>>,
691 #[serde(skip_serializing_if = "Option::is_none")]
692 pub race_types: Option<Vec<Ustr>>,
693 #[serde(skip_serializing_if = "Option::is_none")]
694 pub turn_in_play_enabled: Option<bool>,
695 #[serde(skip_serializing_if = "Option::is_none")]
696 pub venues: Option<Vec<Ustr>>,
697}
698
699#[derive(Debug, Clone, Default, Serialize, Deserialize)]
701#[serde(rename_all = "camelCase")]
702pub struct MarketDataFilter {
703 #[serde(skip_serializing_if = "Option::is_none")]
704 pub fields: Option<Vec<MarketDataFilterField>>,
705 #[serde(skip_serializing_if = "Option::is_none")]
706 pub ladder_levels: Option<u32>,
707}
708
709#[derive(Debug, Clone, Serialize, Deserialize)]
711#[serde(rename_all = "camelCase")]
712pub struct OrderFilter {
713 #[serde(default = "default_true")]
714 pub include_overall_position: bool,
715 #[serde(skip_serializing_if = "Option::is_none")]
716 pub customer_strategy_refs: Option<Vec<String>>,
717 #[serde(default)]
718 pub partition_matched_by_strategy_ref: bool,
719 #[serde(skip_serializing_if = "Option::is_none")]
720 pub account_ids: Option<Vec<u64>>,
721}
722
723impl Default for OrderFilter {
724 fn default() -> Self {
725 Self {
726 include_overall_position: true,
727 customer_strategy_refs: None,
728 partition_matched_by_strategy_ref: false,
729 account_ids: None,
730 }
731 }
732}
733
734fn default_true() -> bool {
735 true
736}
737
738#[derive(Debug, Clone, Deserialize)]
740pub struct RCM {
741 pub id: Option<u64>,
742 pub pt: u64,
744 pub clk: Option<serde_json::Value>,
746 pub rc: Option<Vec<RaceChange>>,
748}
749
750#[derive(Debug, Clone, Deserialize)]
752pub struct RaceChange {
753 pub id: Option<String>,
755 pub mid: Option<String>,
757 pub rrc: Option<Vec<RaceRunnerChange>>,
759 pub rpc: Option<RaceProgressChange>,
761}
762
763#[derive(Debug, Clone, Deserialize)]
765pub struct RaceRunnerChange {
766 pub ft: Option<u64>,
768 pub id: Option<i64>,
770 pub lat: Option<f64>,
772 #[serde(rename = "long")]
774 pub lng: Option<f64>,
775 pub spd: Option<f64>,
777 pub prg: Option<f64>,
779 pub sfq: Option<f64>,
781}
782
783#[derive(Debug, Clone, Deserialize)]
785pub struct RaceProgressChange {
786 pub ft: Option<u64>,
788 pub g: Option<String>,
790 pub st: Option<f64>,
792 pub rt: Option<f64>,
794 pub spd: Option<f64>,
796 pub prg: Option<f64>,
798 pub ord: Option<Vec<i64>>,
800 #[serde(rename = "J")]
802 pub jumps: Option<Vec<Jump>>,
803}
804
805#[derive(Debug, Clone, Serialize, Deserialize)]
807pub struct Jump {
808 #[serde(rename = "J")]
810 pub number: i32,
811 #[serde(rename = "L")]
813 pub distance: f64,
814}
815
816pub fn stream_decode(data: &[u8]) -> Result<StreamMessage, serde_json::Error> {
822 serde_json::from_slice(data)
823}
824
825#[cfg(test)]
826mod tests {
827 use rstest::rstest;
828
829 use super::*;
830 use crate::common::testing::load_test_json;
831
832 #[rstest]
833 #[case("stream/ocm_NEW_FULL_IMAGE.json")]
834 #[case("stream/ocm_FILLED.json")]
835 #[case("stream/ocm_FULL_IMAGE.json")]
836 #[case("stream/ocm_FULL_IMAGE_STRATEGY.json")]
837 #[case("stream/ocm_CANCEL.json")]
838 #[case("stream/ocm_UPDATE.json")]
839 #[case("stream/ocm_SUB_IMAGE.json")]
840 #[case("stream/ocm_MIXED.json")]
841 #[case("stream/ocm_EMPTY_IMAGE.json")]
842 #[case("stream/ocm_error_fill.json")]
843 #[case("stream/ocm_filled_different_price.json")]
844 #[case("stream/ocm_order_update.json")]
845 fn test_stream_decode_ocm_fixtures(#[case] fixture: &str) {
846 let data = load_test_json(fixture);
847 let msg = stream_decode(data.as_bytes()).unwrap_or_else(|e| panic!("{fixture}: {e}"));
848 assert!(matches!(msg, StreamMessage::OrderChange(_)), "{fixture}");
849 }
850
851 #[rstest]
852 #[case("stream/mcm_SUB_IMAGE.json")]
853 #[case("stream/mcm_SUB_IMAGE_no_market_def.json")]
854 #[case("stream/mcm_UPDATE.json")]
855 #[case("stream/mcm_UPDATE_md.json")]
856 #[case("stream/mcm_UPDATE_tv.json")]
857 #[case("stream/mcm_HEARTBEAT.json")]
858 #[case("stream/mcm_RESUB_DELTA.json")]
859 #[case("stream/mcm_live_IMAGE.json")]
860 #[case("stream/mcm_live_UPDATE.json")]
861 #[case("stream/mcm_latency.json")]
862 #[case("stream/market_definition_racing.json")]
863 #[case("stream/market_definition_runner_removed.json")]
864 fn test_stream_decode_mcm_fixtures(#[case] fixture: &str) {
865 let data = load_test_json(fixture);
866 let msg = stream_decode(data.as_bytes()).unwrap_or_else(|e| panic!("{fixture}: {e}"));
867 assert!(matches!(msg, StreamMessage::MarketChange(_)), "{fixture}");
868 }
869
870 #[rstest]
872 #[case("stream/mcm_BSP.json")]
873 #[case("stream/market_updates.json")]
874 fn test_stream_decode_mcm_multi_fixtures(#[case] fixture: &str) {
875 let data = load_test_json(fixture);
876 let msgs: Vec<StreamMessage> =
877 serde_json::from_str(&data).unwrap_or_else(|e| panic!("{fixture}: {e}"));
878 assert!(!msgs.is_empty(), "{fixture}: empty array");
879 for msg in &msgs {
880 assert!(matches!(msg, StreamMessage::MarketChange(_)), "{fixture}");
881 }
882 }
883
884 #[rstest]
886 #[case("stream/ocm_multiple_fills.json")]
887 #[case("stream/ocm_DUPLICATE_EXECUTION.json")]
888 fn test_stream_decode_ocm_multi_fixtures(#[case] fixture: &str) {
889 let data = load_test_json(fixture);
890 let msgs: Vec<StreamMessage> =
891 serde_json::from_str(&data).unwrap_or_else(|e| panic!("{fixture}: {e}"));
892 assert!(!msgs.is_empty(), "{fixture}: empty array");
893 for msg in &msgs {
894 assert!(matches!(msg, StreamMessage::OrderChange(_)), "{fixture}");
895 }
896 }
897
898 #[rstest]
899 fn test_stream_decode_connection() {
900 let data = load_test_json("stream/connection.json");
901 let msg = stream_decode(data.as_bytes()).unwrap();
902 match msg {
903 StreamMessage::Connection(conn) => {
904 assert_eq!(conn.connection_id, "002-051134157842-432409");
905 }
906 other => panic!("Expected Connection, was {other:?}"),
907 }
908 }
909
910 #[rstest]
911 fn test_stream_decode_status() {
912 let data = load_test_json("stream/status.json");
913 let msg = stream_decode(data.as_bytes()).unwrap();
914 assert!(matches!(msg, StreamMessage::Status(_)));
915 }
916
917 #[rstest]
918 fn test_stream_decode_lenient_sp_fields() {
919 let data = r#"{
920 "op":"mcm",
921 "pt":1773304044929,
922 "mc":[{
923 "id":"1.255095842",
924 "rc":[{
925 "id":96146807,
926 "spn":"Infinity",
927 "spf":"NaN",
928 "ltp":5.0,
929 "tv":10.63
930 }]
931 }]
932 }"#;
933
934 let msg = stream_decode(data.as_bytes()).unwrap();
935
936 match msg {
937 StreamMessage::MarketChange(mcm) => {
938 let rc = &mcm.mc.as_ref().unwrap()[0].rc.as_ref().unwrap()[0];
939 assert_eq!(rc.spn, None);
940 assert_eq!(rc.spf, None);
941 assert_eq!(rc.ltp, Some(Decimal::new(50, 1)));
942 assert_eq!(rc.tv, Some(Decimal::new(1063, 2)));
943 }
944 other => panic!("Expected MarketChange, was {other:?}"),
945 }
946 }
947
948 #[rstest]
949 fn test_market_definition_standalone() {
950 let data = load_test_json("stream/market_definition.json");
951 let _def: MarketDefinition = serde_json::from_str(&data).unwrap();
952 }
953
954 #[rstest]
955 #[case("rest/market_definition_open.json")]
956 #[case("rest/market_definition_closed.json")]
957 #[case("rest/market_definition_runner_removed.json")]
958 fn test_market_definition_response_fixtures(#[case] fixture: &str) {
959 let data = load_test_json(fixture);
960 let _def: MarketDefinition = serde_json::from_str(&data).unwrap();
961 }
962
963 #[rstest]
964 fn test_stream_decode_rcm_single() {
965 let data = load_test_json("stream/rcm_single.json");
966 let msg = stream_decode(data.as_bytes()).unwrap();
967 match msg {
968 StreamMessage::RaceChange(rcm) => {
969 let rc = rcm.rc.as_ref().unwrap();
970 assert_eq!(rc.len(), 1);
971
972 let race = &rc[0];
973 assert_eq!(race.id.as_deref(), Some("28587288.1650"));
974 assert_eq!(race.mid.as_deref(), Some("1.1234567"));
975
976 let runners = race.rrc.as_ref().unwrap();
977 assert_eq!(runners.len(), 1);
978 assert_eq!(runners[0].id, Some(7390417));
979 assert!((runners[0].lat.unwrap() - 51.4189543).abs() < 1e-6);
980 assert!((runners[0].spd.unwrap() - 17.8).abs() < 1e-6);
981 assert!((runners[0].sfq.unwrap() - 2.07).abs() < 1e-6);
982
983 let progress = race.rpc.as_ref().unwrap();
984 assert_eq!(progress.g.as_deref(), Some("1f"));
985 assert!((progress.st.unwrap() - 10.6).abs() < 1e-6);
986 assert!((progress.rt.unwrap() - 46.7).abs() < 1e-6);
987
988 let order = progress.ord.as_ref().unwrap();
989 assert_eq!(order.len(), 5);
990 assert_eq!(order[0], 7390417);
991
992 let jumps = progress.jumps.as_ref().unwrap();
993 assert_eq!(jumps.len(), 2);
994 assert_eq!(jumps[0].number, 2);
995 assert!((jumps[0].distance - 370.1).abs() < 1e-6);
996 }
997 other => panic!("Expected RaceChange, was {other:?}"),
998 }
999 }
1000
1001 #[rstest]
1002 fn test_stream_decode_rcm_multi_runner() {
1003 let data = load_test_json("stream/rcm_multi_runner.json");
1004 let msg = stream_decode(data.as_bytes()).unwrap();
1005 match msg {
1006 StreamMessage::RaceChange(rcm) => {
1007 let rc = rcm.rc.as_ref().unwrap();
1008 let runners = rc[0].rrc.as_ref().unwrap();
1009 assert_eq!(runners.len(), 5);
1010
1011 let ids: Vec<i64> = runners.iter().filter_map(|r| r.id).collect();
1012 assert_eq!(ids, vec![35467839, 24947967, 299569, 31422647, 41694785]);
1013 }
1014 other => panic!("Expected RaceChange, was {other:?}"),
1015 }
1016 }
1017
1018 #[rstest]
1019 fn test_stream_decode_ocm_voided() {
1020 let data = load_test_json("stream/ocm_VOIDED.json");
1021 let msg = stream_decode(data.as_bytes()).unwrap();
1022 match msg {
1023 StreamMessage::OrderChange(ocm) => {
1024 let oc = ocm.oc.as_ref().unwrap();
1025 let orc = oc[0].orc.as_ref().unwrap();
1026 let uo = &orc[0].uo.as_ref().unwrap()[0];
1027 assert_eq!(uo.sv.unwrap(), rust_decimal::Decimal::from(50));
1028 assert_eq!(uo.sm.unwrap(), rust_decimal::Decimal::from(50));
1029 assert_eq!(uo.s, rust_decimal::Decimal::from(100));
1030 }
1031 other => panic!("Expected OrderChange, was {other:?}"),
1032 }
1033 }
1034
1035 #[rstest]
1036 fn test_stream_decode_ocm_missing_persistence_type_for_market_on_close() {
1037 let data = r#"{
1038 "op":"ocm",
1039 "id":1,
1040 "pt":1775175455685,
1041 "clk":"clk-1",
1042 "oc":[{
1043 "id":"1.256134154",
1044 "orc":[{
1045 "id":77465280,
1046 "uo":[{
1047 "id":"424009603606",
1048 "p":1.01,
1049 "s":2.00,
1050 "side":"B",
1051 "status":"E",
1052 "ot":"MOC",
1053 "pd":1775175455000,
1054 "sr":2.00
1055 }]
1056 }]
1057 }]
1058 }"#;
1059
1060 let msg = stream_decode(data.as_bytes()).unwrap();
1061
1062 match msg {
1063 StreamMessage::OrderChange(ocm) => {
1064 let oc = ocm.oc.as_ref().unwrap();
1065 let orc = oc[0].orc.as_ref().unwrap();
1066 let uo = &orc[0].uo.as_ref().unwrap()[0];
1067 assert_eq!(uo.pt, None);
1068 assert_eq!(
1069 uo.ot,
1070 crate::common::enums::StreamingOrderType::MarketOnClose
1071 );
1072 }
1073 other => panic!("Expected OrderChange, was {other:?}"),
1074 }
1075 }
1076}