nautilus_binance/spot/sbe/stream/
mod.rs1use std::{error::Error, fmt::Display};
28
29pub use nautilus_serialization::sbe::{GroupSize16Encoding, GroupSizeEncoding, decode_var_string8};
31
32use crate::spot::sbe::{cursor::SbeCursor, error::SbeDecodeError};
33
34mod best_bid_ask;
35mod depth_diff;
36mod depth_snapshot;
37mod trades;
38
39pub use best_bid_ask::BestBidAskStreamEvent;
40pub use depth_diff::DepthDiffStreamEvent;
41pub use depth_snapshot::DepthSnapshotStreamEvent;
42pub use trades::{Trade, TradesStreamEvent};
43
/// SBE schema identifier that stream message headers are expected to carry.
///
/// [`MessageHeader::validate_schema`] rejects any header whose `schema_id` differs
/// from this value.
pub const STREAM_SCHEMA_ID: u16 = 1;

/// SBE schema version associated with the stream schema.
///
/// Reported as the `expected` value when a decoder-level version mismatch is
/// converted into a [`StreamDecodeError`].
pub const STREAM_SCHEMA_VERSION: u16 = 0;

/// Upper bound on the number of entries in a repeating group.
///
/// NOTE(review): the enforcement is not visible in this module; presumably the group
/// size decoders reject larger counts to avoid oversized allocations — confirm.
pub const MAX_GROUP_SIZE: usize = 10_000;
53
/// SBE template IDs identifying each kind of stream message.
///
/// The value is carried in the `template_id` field of the message header and
/// selects which event decoder applies to the payload.
pub mod template_id {
    /// Template ID of the trades stream event.
    pub const TRADES_STREAM_EVENT: u16 = 10000;
    /// Template ID of the best bid/ask stream event.
    pub const BEST_BID_ASK_STREAM_EVENT: u16 = 10001;
    /// Template ID of the depth snapshot stream event.
    pub const DEPTH_SNAPSHOT_STREAM_EVENT: u16 = 10002;
    /// Template ID of the depth diff stream event.
    pub const DEPTH_DIFF_STREAM_EVENT: u16 = 10003;
}
61
/// Errors that can occur while decoding an SBE stream message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamDecodeError {
    /// The buffer holds fewer bytes (`actual`) than the decoder needs (`expected`).
    BufferTooShort { expected: usize, actual: usize },
    /// A repeating group announced `count` entries, more than the allowed `max`.
    GroupSizeTooLarge { count: usize, max: usize },
    /// A string field did not contain valid UTF-8.
    InvalidUtf8,
    /// The raw `value` is not a valid discriminant for the enum `type_name`.
    InvalidEnumValue { type_name: &'static str, value: u16 },
    /// A numeric value does not fit into the target type `type_name`.
    NumericOverflow { type_name: &'static str },
    /// The value decoded for `field` is not acceptable.
    InvalidValue { field: &'static str },
    /// The schema identifier in the message (`actual`) is not the `expected` one.
    SchemaMismatch { expected: u16, actual: u16 },
    /// The message header carries a template ID that is not recognised.
    UnknownTemplateId(u16),
    /// The block length in the message (`actual`) differs from the `expected` one.
    InvalidBlockLength { expected: u16, actual: u16 },
}

impl Display for StreamDecodeError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::BufferTooShort { expected, actual } => {
                write!(
                    f,
                    "Buffer too short: expected {expected} bytes, was {actual}"
                )
            }
            Self::GroupSizeTooLarge { count, max } => {
                write!(f, "Group size {count} exceeds maximum {max}")
            }
            Self::InvalidUtf8 => write!(f, "Invalid UTF-8 in symbol"),
            Self::InvalidEnumValue { type_name, value } => {
                write!(f, "Invalid enum value {value} for {type_name}")
            }
            Self::NumericOverflow { type_name } => {
                write!(f, "Numeric value overflows target type {type_name}")
            }
            Self::InvalidValue { field } => write!(f, "Invalid value for {field}"),
            Self::SchemaMismatch { expected, actual } => {
                write!(f, "Schema mismatch: expected {expected}, was {actual}")
            }
            Self::UnknownTemplateId(id) => write!(f, "Unknown template ID: {id}"),
            Self::InvalidBlockLength { expected, actual } => {
                write!(f, "Invalid block length: expected {expected}, was {actual}")
            }
        }
    }
}

// No underlying source error is carried, so the default `Error` methods suffice.
impl Error for StreamDecodeError {}
117
118impl From<SbeDecodeError> for StreamDecodeError {
119 fn from(err: SbeDecodeError) -> Self {
120 match err {
121 SbeDecodeError::BufferTooShort { expected, actual } => {
122 Self::BufferTooShort { expected, actual }
123 }
124 SbeDecodeError::SchemaMismatch { expected, actual } => {
125 Self::SchemaMismatch { expected, actual }
126 }
127 SbeDecodeError::VersionMismatch { .. } => Self::SchemaMismatch {
128 expected: STREAM_SCHEMA_VERSION,
129 actual: 0,
130 },
131 SbeDecodeError::UnknownTemplateId(id) => Self::UnknownTemplateId(id),
132 SbeDecodeError::GroupSizeTooLarge { count, max } => Self::GroupSizeTooLarge {
133 count: count as usize,
134 max: max as usize,
135 },
136 SbeDecodeError::InvalidBlockLength { expected, actual } => {
137 Self::InvalidBlockLength { expected, actual }
138 }
139 SbeDecodeError::InvalidUtf8 => Self::InvalidUtf8,
140 SbeDecodeError::InvalidEnumValue { type_name, value } => {
141 Self::InvalidEnumValue { type_name, value }
142 }
143 SbeDecodeError::NumericOverflow { type_name } => Self::NumericOverflow { type_name },
144 SbeDecodeError::InvalidValue { field } => Self::InvalidValue { field },
145 }
146 }
147}
148
149#[derive(Debug, Clone, Copy)]
151pub struct MessageHeader {
152 pub block_length: u16,
153 pub template_id: u16,
154 pub schema_id: u16,
155 pub version: u16,
156}
157
158impl MessageHeader {
159 pub const ENCODED_LENGTH: usize = 8;
160
161 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
167 if buf.len() < Self::ENCODED_LENGTH {
168 return Err(StreamDecodeError::BufferTooShort {
169 expected: Self::ENCODED_LENGTH,
170 actual: buf.len(),
171 });
172 }
173 Ok(Self {
174 block_length: u16::from_le_bytes([buf[0], buf[1]]),
175 template_id: u16::from_le_bytes([buf[2], buf[3]]),
176 schema_id: u16::from_le_bytes([buf[4], buf[5]]),
177 version: u16::from_le_bytes([buf[6], buf[7]]),
178 })
179 }
180
181 pub fn validate_schema(&self) -> Result<(), StreamDecodeError> {
187 if self.schema_id != STREAM_SCHEMA_ID {
188 return Err(StreamDecodeError::SchemaMismatch {
189 expected: STREAM_SCHEMA_ID,
190 actual: self.schema_id,
191 });
192 }
193 Ok(())
194 }
195}
196
197#[derive(Debug, Clone, Copy)]
199pub struct PriceLevel {
200 pub price_mantissa: i64,
202 pub qty_mantissa: i64,
204}
205
206impl PriceLevel {
207 pub const ENCODED_LENGTH: usize = 16;
208
209 pub fn decode(cursor: &mut SbeCursor<'_>) -> Result<Self, SbeDecodeError> {
215 Ok(Self {
216 price_mantissa: cursor.read_i64_le()?,
217 qty_mantissa: cursor.read_i64_le()?,
218 })
219 }
220}
221
/// Converts a decimal `mantissa` with a base-10 `exponent` into an `f64`,
/// i.e. computes `mantissa * 10^exponent`.
///
/// For negative exponents the mantissa is divided by the positive power of ten
/// rather than multiplied by its reciprocal: `10^-k` is not exactly representable
/// in binary floating point, so multiplying would add a second rounding error
/// (e.g. `3 * 0.1` yields `0.30000000000000004` instead of `0.3`).
///
/// Mantissas with a magnitude above 2^53 cannot be represented exactly as `f64`
/// and are rounded by the integer-to-float conversion.
#[inline]
#[must_use]
pub fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
    let value = mantissa as f64;
    // `unsigned_abs` also handles `i8::MIN`, whose magnitude does not fit in `i8`.
    let scale = 10_f64.powi(i32::from(exponent.unsigned_abs()));
    if exponent < 0 {
        value / scale
    } else {
        value * scale
    }
}
228
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::spot::sbe::error::SbeDecodeError;

    #[rstest]
    fn test_mantissa_to_f64() {
        assert!((mantissa_to_f64(12345, -2) - 123.45).abs() < 1e-10);
        assert!((mantissa_to_f64(100, 0) - 100.0).abs() < 1e-10);
        assert!((mantissa_to_f64(5, 3) - 5000.0).abs() < 1e-10);
    }

    #[rstest]
    fn test_message_header_too_short() {
        // One byte short of the 8-byte header.
        let buf = [0u8; 7];
        let err = MessageHeader::decode(&buf).unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::BufferTooShort {
                expected: 8,
                actual: 7
            }
        );
    }

    #[rstest]
    fn test_message_header_decode_valid() {
        // Four little-endian u16 fields: block_length, template_id, schema_id, version.
        let mut buf = [0u8; MessageHeader::ENCODED_LENGTH];
        buf[0..2].copy_from_slice(&50u16.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes());
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf[6..8].copy_from_slice(&STREAM_SCHEMA_VERSION.to_le_bytes());

        let header = MessageHeader::decode(&buf).unwrap();
        assert_eq!(header.block_length, 50);
        assert_eq!(header.template_id, template_id::BEST_BID_ASK_STREAM_EVENT);
        assert_eq!(header.schema_id, STREAM_SCHEMA_ID);
        assert_eq!(header.version, STREAM_SCHEMA_VERSION);
        assert!(header.validate_schema().is_ok());
    }

    #[rstest]
    fn test_group_size_too_large() {
        let mut buf = [0u8; 6];
        // Bytes 0-1 stay zero; bytes 2-5 carry a count one above the allowed maximum.
        let count = (MAX_GROUP_SIZE + 1) as u32;
        buf[2..6].copy_from_slice(&count.to_le_bytes());

        let err = GroupSizeEncoding::decode(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::GroupSizeTooLarge { .. }));
    }

    #[rstest]
    fn test_decode_var_string8_empty_buffer() {
        let err = decode_var_string8(&[]).unwrap_err();
        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_var_string8_truncated() {
        // Length prefix announces 10 bytes but only 4 follow.
        let buf = [10u8, b'H', b'E', b'L', b'L'];
        let err = decode_var_string8(&buf).unwrap_err();
        assert!(matches!(err, SbeDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_var_string8_valid() {
        let buf = [5u8, b'H', b'E', b'L', b'L', b'O'];
        let (s, consumed) = decode_var_string8(&buf).unwrap();
        assert_eq!(s, "HELLO");
        // One length byte plus five payload bytes.
        assert_eq!(consumed, 6);
    }

    #[rstest]
    fn test_schema_validation() {
        let header = MessageHeader {
            block_length: 50,
            template_id: 10001,
            schema_id: 99,
            version: 0,
        };
        let err = header.validate_schema().unwrap_err();
        assert_eq!(
            err,
            StreamDecodeError::SchemaMismatch {
                expected: STREAM_SCHEMA_ID,
                actual: 99
            }
        );
    }

    #[rstest]
    fn test_decode_error_conversion_preserves_new_variants() {
        assert_eq!(
            StreamDecodeError::from(SbeDecodeError::InvalidEnumValue {
                type_name: "AggressorSide",
                value: 99,
            }),
            StreamDecodeError::InvalidEnumValue {
                type_name: "AggressorSide",
                value: 99,
            }
        );
        assert_eq!(
            StreamDecodeError::from(SbeDecodeError::NumericOverflow { type_name: "Price" }),
            StreamDecodeError::NumericOverflow { type_name: "Price" }
        );
        assert_eq!(
            StreamDecodeError::from(SbeDecodeError::InvalidValue {
                field: "Price.precision",
            }),
            StreamDecodeError::InvalidValue {
                field: "Price.precision",
            }
        );
    }
}