nautilus_binance/spot/sbe/stream/
depth_snapshot.rs1use ustr::Ustr;
32
33use super::{MessageHeader, PriceLevel, StreamDecodeError};
34use crate::spot::sbe::cursor::SbeCursor;
35
36#[derive(Debug, Clone)]
38pub struct DepthSnapshotStreamEvent {
39 pub event_time_us: i64,
41 pub book_update_id: i64,
43 pub price_exponent: i8,
45 pub qty_exponent: i8,
47 pub bids: Vec<PriceLevel>,
49 pub asks: Vec<PriceLevel>,
51 pub symbol: Ustr,
53}
54
55impl DepthSnapshotStreamEvent {
56 pub const BLOCK_LENGTH: usize = 18;
58
59 pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
66 let header = MessageHeader::decode(buf)?;
67 header.validate_schema()?;
68 Self::decode_validated(buf)
69 }
70
71 pub(crate) fn decode_validated(buf: &[u8]) -> Result<Self, StreamDecodeError> {
73 let mut cursor = SbeCursor::new_at(buf, MessageHeader::ENCODED_LENGTH);
74 Self::decode_body(&mut cursor)
75 }
76
77 #[inline]
78 fn decode_body(cursor: &mut SbeCursor<'_>) -> Result<Self, StreamDecodeError> {
79 let event_time_us = cursor.read_i64_le()?;
80 let book_update_id = cursor.read_i64_le()?;
81 let price_exponent = cursor.read_i8()?;
82 let qty_exponent = cursor.read_i8()?;
83
84 let (bid_block_length, num_bids) = cursor.read_group_header_16()?;
85 let bids = cursor.read_group(bid_block_length, u32::from(num_bids), PriceLevel::decode)?;
86
87 let (ask_block_length, num_asks) = cursor.read_group_header_16()?;
88 let asks = cursor.read_group(ask_block_length, u32::from(num_asks), PriceLevel::decode)?;
89
90 let symbol = Ustr::from(cursor.read_var_string8_ref()?);
91
92 Ok(Self {
93 event_time_us,
94 book_update_id,
95 price_exponent,
96 qty_exponent,
97 bids,
98 asks,
99 symbol,
100 })
101 }
102
103 #[inline]
105 #[must_use]
106 pub fn level_price(&self, level: &PriceLevel) -> f64 {
107 super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
108 }
109
110 #[inline]
112 #[must_use]
113 pub fn level_qty(&self, level: &PriceLevel) -> f64 {
114 super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
115 }
116}
117
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::spot::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    /// Byte length of one encoded level: an `i64` price mantissa followed by
    /// an `i64` quantity mantissa.
    const LEVEL_BLOCK_LEN: u16 = 16;

    /// Appends one repeating group to `buf`: a 4-byte group header (entry
    /// block length, entry count) followed by `count` levels.
    ///
    /// `price_at` maps the level index to its price mantissa; every level
    /// shares the same `qty` mantissa.
    fn push_levels(buf: &mut Vec<u8>, count: usize, price_at: impl Fn(i64) -> i64, qty: i64) {
        buf.extend_from_slice(&LEVEL_BLOCK_LEN.to_le_bytes());
        buf.extend_from_slice(&(count as u16).to_le_bytes());

        for i in 0..count {
            buf.extend_from_slice(&price_at(i as i64).to_le_bytes());
            buf.extend_from_slice(&qty.to_le_bytes());
        }
    }

    /// Builds a complete, well-formed message (header + body) carrying the
    /// requested number of bid and ask levels and the symbol `BTCUSDT`.
    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
        let mut buf = Vec::new();

        // 8-byte message header: block length, template ID, schema ID, and a
        // trailing zero u16 (presumably the schema version).
        buf.extend_from_slice(&18u16.to_le_bytes());
        buf.extend_from_slice(&template_id::DEPTH_SNAPSHOT_STREAM_EVENT.to_le_bytes());
        buf.extend_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf.extend_from_slice(&0u16.to_le_bytes());

        // Fixed 18-byte block: event time, book update ID, price/qty exponents
        buf.extend_from_slice(&1000000i64.to_le_bytes());
        buf.extend_from_slice(&12345i64.to_le_bytes());
        buf.extend_from_slice(&(-2i8).to_le_bytes());
        buf.extend_from_slice(&(-8i8).to_le_bytes());

        // Bids step down from 4200000, asks step up from 4200100
        push_levels(&mut buf, num_bids, |i| 4200000i64 - i * 100, 100000000i64);
        push_levels(&mut buf, num_asks, |i| 4200100i64 + i * 100, 200000000i64);

        // Symbol: one length byte followed by the string bytes
        buf.push(7);
        buf.extend_from_slice(b"BTCUSDT");

        buf
    }

    #[rstest]
    fn test_decode_valid() {
        let buf = make_valid_buffer(5, 5);
        let event = DepthSnapshotStreamEvent::decode(&buf).unwrap();

        assert_eq!(event.event_time_us, 1000000);
        assert_eq!(event.book_update_id, 12345);
        assert_eq!(event.bids.len(), 5);
        assert_eq!(event.asks.len(), 5);
        assert_eq!(event.symbol, "BTCUSDT");
    }

    #[rstest]
    fn test_decode_empty_books() {
        let buf = make_valid_buffer(0, 0);
        let event = DepthSnapshotStreamEvent::decode(&buf).unwrap();

        assert!(event.bids.is_empty());
        assert!(event.asks.is_empty());
    }

    #[rstest]
    fn test_decode_truncated() {
        let mut buf = make_valid_buffer(10, 10);
        // Cut the message off partway through the bid levels
        buf.truncate(100);

        let err = DepthSnapshotStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        let mut buf = make_valid_buffer(5, 5);
        // Overwrite the schema ID in the header
        buf[4..6].copy_from_slice(&99u16.to_le_bytes());

        let err = DepthSnapshotStreamEvent::decode(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }

    #[rstest]
    fn test_decode_validated_matches_decode() {
        let buf = make_valid_buffer(2, 3);

        let checked = DepthSnapshotStreamEvent::decode(&buf).unwrap();
        let unchecked = DepthSnapshotStreamEvent::decode_validated(&buf).unwrap();

        assert_eq!(unchecked.event_time_us, checked.event_time_us);
        assert_eq!(unchecked.book_update_id, checked.book_update_id);
        assert_eq!(unchecked.price_exponent, checked.price_exponent);
        assert_eq!(unchecked.qty_exponent, checked.qty_exponent);
        assert_eq!(unchecked.bids.len(), checked.bids.len());
        assert_eq!(unchecked.asks.len(), checked.asks.len());
        assert_eq!(unchecked.symbol, checked.symbol);
    }
}