Skip to main content

nautilus_binance/spot/sbe/stream/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance SBE market data stream decoders (schema 1:0).
17//!
18//! These decoders are hand-written for the 4 market data stream message types:
19//! - [`TradesStreamEvent`] - Real-time trade data
20//! - [`BestBidAskStreamEvent`] - Best bid/ask (BBO) updates
21//! - [`DepthSnapshotStreamEvent`] - Order book snapshots (top N levels)
22//! - [`DepthDiffStreamEvent`] - Incremental order book updates
23//!
24//! All decoders return `Result<T, StreamDecodeError>` to safely handle malformed
25//! or truncated network data without panicking.
26
27use std::{error::Error, fmt::Display};
28
29/// Re-exported generic varString/group decoders shared across SBE adapters.
30pub use nautilus_serialization::sbe::{GroupSize16Encoding, GroupSizeEncoding, decode_var_string8};
31
32use crate::spot::sbe::{cursor::SbeCursor, error::SbeDecodeError};
33
34mod best_bid_ask;
35mod depth_diff;
36mod depth_snapshot;
37mod trades;
38
39pub use best_bid_ask::BestBidAskStreamEvent;
40pub use depth_diff::DepthDiffStreamEvent;
41pub use depth_snapshot::DepthSnapshotStreamEvent;
42pub use trades::{Trade, TradesStreamEvent};
43
/// Stream schema ID (from stream_1_0.xml).
///
/// [`MessageHeader::validate_schema`] rejects any header whose `schema_id`
/// differs from this value.
pub const STREAM_SCHEMA_ID: u16 = 1;

/// Stream schema version.
///
/// This is the `0` in the `1:0` schema designation used in the module docs.
pub const STREAM_SCHEMA_VERSION: u16 = 0;

/// Maximum allowed group size to prevent OOM from malicious payloads.
/// Binance depth streams typically have at most 5000 levels.
///
/// NOTE(review): the limit actually applied while decoding group headers lives
/// in the re-exported `GroupSizeEncoding` / `GroupSize16Encoding` decoders, which
/// are not visible here - confirm that it agrees with this constant.
pub const MAX_GROUP_SIZE: usize = 10_000;
53
/// Message template IDs for stream events.
///
/// Each constant is the `template_id` header value identifying one of the
/// stream event types decoded by this module.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10_000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10_001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10_002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10_003;
}
61
/// Errors produced while decoding an SBE market data stream message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The buffer holds fewer bytes (`actual`) than the decoder needs (`expected`).
    BufferTooShort { expected: usize, actual: usize },
    /// A repeating group announces `count` entries, above the safety limit `max`.
    GroupSizeTooLarge { count: usize, max: usize },
    /// The symbol string is not valid UTF-8.
    InvalidUtf8,
    /// The payload carries a discriminant `value` that is not valid for `type_name`.
    InvalidEnumValue { type_name: &'static str, value: u16 },
    /// A numeric value does not fit into the target type `type_name`.
    NumericOverflow { type_name: &'static str },
    /// The encoded value of `field` is invalid.
    InvalidValue { field: &'static str },
    /// The schema ID in the header (`actual`) is not the `expected` one.
    SchemaMismatch { expected: u16, actual: u16 },
    /// The header carries a template ID that is not known.
    UnknownTemplateId(u16),
    /// The fixed block length in the header (`actual`) is not the `expected` one.
    InvalidBlockLength { expected: u16, actual: u16 },
}

impl Display for StreamDecodeError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every payload is `Copy`, so the variants can be matched by value.
        match *self {
            Self::InvalidUtf8 => f.write_str("Invalid UTF-8 in symbol"),
            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
            Self::InvalidValue { field } => write!(f, "Invalid value for {field}"),
            Self::NumericOverflow { type_name } => {
                write!(f, "Numeric value overflows target type {type_name}")
            }
            Self::InvalidEnumValue { type_name, value } => {
                write!(f, "Invalid enum value {value} for {type_name}")
            }
            Self::GroupSizeTooLarge { count, max } => {
                write!(f, "Group size {count} exceeds maximum {max}")
            }
            Self::BufferTooShort { expected, actual } => write!(
                f,
                "Buffer too short: expected {expected} bytes, was {actual}"
            ),
            Self::SchemaMismatch { expected, actual } => {
                write!(f, "Schema mismatch: expected {expected}, was {actual}")
            }
            Self::InvalidBlockLength { expected, actual } => {
                write!(f, "Invalid block length: expected {expected}, was {actual}")
            }
        }
    }
}

impl Error for StreamDecodeError {}
117
118impl From<SbeDecodeError> for StreamDecodeError {
119    fn from(err: SbeDecodeError) -> Self {
120        match err {
121            SbeDecodeError::BufferTooShort { expected, actual } => {
122                Self::BufferTooShort { expected, actual }
123            }
124            SbeDecodeError::SchemaMismatch { expected, actual } => {
125                Self::SchemaMismatch { expected, actual }
126            }
127            SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
128                expected: STREAM_SCHEMA_VERSION,
129                actual: 0,
130            },
131            SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
132            SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
133                count: count as usize,
134                max: max as usize,
135            },
136            SbeDecodeError::InvalidBlockLength { expected, actual } => {
137                Self::InvalidBlockLength { expected, actual }
138            }
139            SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
140            SbeDecodeError::InvalidEnumValue { type_name, value } => {
141                Self::InvalidEnumValue { type_name, value }
142            }
143            SbeDecodeError::NumericOverflow { type_name } => Self::NumericOverflow { type_name },
144            SbeDecodeError::InvalidValue { field } => Self::InvalidValue { field },
145        }
146    }
147}
148
149/// SBE message header (8 bytes).
150#[derive(Debug, Clone, Copy)]
151pub struct MessageHeader {
152    pub block_length: u16,
153    pub template_id: u16,
154    pub schema_id: u16,
155    pub version: u16,
156}
157
158impl MessageHeader {
159    pub const ENCODED_LENGTH: usize = 8;
160
161    /// Decode message header from buffer.
162    ///
163    /// # Errors
164    ///
165    /// Returns error if buffer is less than 8 bytes.
166    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
167        if buf.len() < Self::ENCODED_LENGTH {
168            return Err(StreamDecodeError::BufferTooShort {
169                expected: Self::ENCODED_LENGTH,
170                actual: buf.len(),
171            });
172        }
173        Ok(Self {
174            block_length: u16::from_le_bytes([buf[0], buf[1]]),
175            template_id: u16::from_le_bytes([buf[2], buf[3]]),
176            schema_id: u16::from_le_bytes([buf[4], buf[5]]),
177            version: u16::from_le_bytes([buf[6], buf[7]]),
178        })
179    }
180
181    /// Validate schema ID matches expected stream schema.
182    ///
183    /// # Errors
184    ///
185    /// Returns `SchemaMismatch` if the schema ID does not match [`STREAM_SCHEMA_ID`].
186    pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
187        if self.schema_id != STREAM_SCHEMA_ID {
188            return Err(StreamDecodeError::SchemaMismatch {
189                expected: STREAM_SCHEMA_ID,
190                actual: self.schema_id,
191            });
192        }
193        Ok(())
194    }
195}
196
197/// Price/quantity level in order book.
198#[derive(Debug, Clone, Copy)]
199pub struct PriceLevel {
200    /// Price mantissa (multiply by 10^exponent to get actual price).
201    pub price_mantissa: i64,
202    /// Quantity mantissa (multiply by 10^exponent to get actual quantity).
203    pub qty_mantissa: i64,
204}
205
206impl PriceLevel {
207    pub const ENCODED_LENGTH: usize = 16;
208
209    /// Decode price level from cursor.
210    ///
211    /// # Errors
212    ///
213    /// Returns error if buffer is too short.
214    pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
215        Ok(Self {
216            price_mantissa: cursor.read_i64_le()?,
217            qty_mantissa: cursor.read_i64_le()?,
218        })
219    }
220}
221
/// Convert mantissa and exponent to f64, i.e. `mantissa * 10^exponent`.
///
/// For a negative exponent the mantissa is divided by the positive power of
/// ten instead of multiplied by its reciprocal. Small powers of ten are exact
/// in `f64` while their reciprocals (`0.1`, `0.01`, ...) are not, so the
/// division avoids a second rounding error: `3 x 10^-1` yields `0.3` rather
/// than `0.30000000000000004`.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    let value = mantissa as f64;
    // `unsigned_abs` also covers `i8::MIN`, whose magnitude does not fit in `i8`.
    let scale = 10_f64.powi(i32::from(exponent.unsigned_abs()));

    if exponent < 0 {
        value / scale
    } else {
        value * scale
    }
}
228
/// Unit tests for the shared stream decoding primitives defined in this module.
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::spot::sbe::error::SbeDecodeError;

    #[rstest]
    fn test_mantissa_to_f64() {
        assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
        assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
        assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
    }

    #[rstest]
    fn test_message_header_too_short() {
        let buf = [0u8; 7];
        let err = MessageHeader::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 8,
                actual: 7
            }
        );
    }

    // Happy path: a well-formed header round-trips every field and passes
    // schema validation.
    #[rstest]
    fn test_message_header_decode_valid() {
        let mut buf = [0u8; MessageHeader::ENCODED_LENGTH];
        buf[0..2].copy_from_slice(&50u16.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes());
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf[6..8].copy_from_slice(&STREAM_SCHEMA_VERSION.to_le_bytes());

        let header = MessageHeader::decode(&buf).unwrap();
        assert_eq!(header.block_length, 50);
        assert_eq!(header.template_id, template_id::BEST_BID_ASK_STREAM_EVENT);
        assert_eq!(header.schema_id, STREAM_SCHEMA_ID);
        assert_eq!(header.version, STREAM_SCHEMA_VERSION);
        assert!(header.validate_schema().is_ok());
    }

    #[rstest]
    fn test_group_size_too_large() {
        // Craft a buffer with num_in_group = MAX_GROUP_SIZE + 1
        let mut buf = [0u8; 6];
        let count = (MAX_GROUP_SIZE + 1) as u32;
        buf[2..6].copy_from_slice(&count.to_le_bytes());

        let err = GroupSizeEncoding::decode(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::GroupSizeTooLarge { .. }));
    }

    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let err = decode_var_string8(&[]).unwrap_err();
        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_var_string8_truncated() {
        // Length says 10 bytes, but only 4 follow the length byte
        let buf = [10u8, b'H', b'E', b'L', b'L'];
        let err = decode_var_string8(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_var_string8_valid() {
        let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
        let (s, consumed) = decode_var_string8(&buf).unwrap();
        assert_eq!(s, "HELLO");
        assert_eq!(consumed, 6);
    }

    #[rstest]
    fn test_schema_validation() {
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            schema_id: 99, // Wrong schema
            version: 0,
        };
        let err = header.validate_schema().unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::SchemaMismatch {
                expected: STREAM_SCHEMA_ID,
                actual: 99
            }
        );
    }

    #[rstest]
    fn test_decode_error_conversion_preserves_new_variants() {
        assert_eq!(
            StreamDecodeError::from(SbeDecodeError::InvalidEnumValue {
                type_name: "AggressorSide",
                value: 99,
            }),
            StreamDecodeError::InvalidEnumValue {
                type_name: "AggressorSide",
                value: 99,
            }
        );
        assert_eq!(
            StreamDecodeError::from(SbeDecodeError::NumericOverflow { type_name: "Price" }),
            StreamDecodeError::NumericOverflow { type_name: "Price" }
        );
        assert_eq!(
            StreamDecodeError::from(SbeDecodeError::InvalidValue {
                field: "Price.precision",
            }),
            StreamDecodeError::InvalidValue {
                field: "Price.precision",
            }
        );
    }
}