Skip to main content

nautilus_binance/spot/sbe/stream/
depth_snapshot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Depth snapshot stream event decoder.
//!
//! Message layout (after the 8-byte message header), fixed root block of 18 bytes first:
//! - eventTime: i64 (microseconds)
//! - bookUpdateId: i64
//! - priceExponent: i8
//! - qtyExponent: i8
//! - bids group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
//!   - price: i64 (mantissa)
//!   - qty: i64 (mantissa)
//! - asks group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
//!   - price: i64 (mantissa)
//!   - qty: i64 (mantissa)
//! - symbol: varString8 (u8 length prefix followed by the symbol bytes)
30
31use ustr::Ustr;
32
33use super::{MessageHeader, PriceLevel, StreamDecodeError};
34use crate::spot::sbe::cursor::SbeCursor;
35
36/// Depth snapshot stream event (top N levels of order book).
37#[derive(Debug, Clone)]
38pub struct DepthSnapshotStreamEvent {
39    /// Event timestamp in microseconds.
40    pub event_time_us: i64,
41    /// Book update ID for sequencing.
42    pub book_update_id: i64,
43    /// Price exponent (prices = mantissa * 10^exponent).
44    pub price_exponent: i8,
45    /// Quantity exponent (quantities = mantissa * 10^exponent).
46    pub qty_exponent: i8,
47    /// Bid levels (best bid first).
48    pub bids: Vec<PriceLevel>,
49    /// Ask levels (best ask first).
50    pub asks: Vec<PriceLevel>,
51    /// Trading symbol.
52    pub symbol: Ustr,
53}
54
55impl DepthSnapshotStreamEvent {
56    /// Fixed block length (excluding header, groups, and variable-length data).
57    pub const BLOCK_LENGTH: usize = 18;
58
59    /// Decode from SBE buffer (including 8-byte header).
60    ///
61    /// # Errors
62    ///
63    /// Returns error if buffer is too short, group size exceeds limits,
64    /// or data is otherwise invalid.
65    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
66        let header = MessageHeader::decode(buf)?;
67        header.validate_schema()?;
68        Self::decode_validated(buf)
69    }
70
71    /// Decode from an SBE buffer whose header has already been validated.
72    pub(crate) fn decode_validated(buf: &[u8]) -> Result<Self, StreamDecodeError> {
73        let mut cursor = SbeCursor::new_at(buf, MessageHeader::ENCODED_LENGTH);
74        Self::decode_body(&mut cursor)
75    }
76
77    #[inline]
78    fn decode_body(cursor: &mut SbeCursor<'_>) -> Result<Self, StreamDecodeError> {
79        let event_time_us = cursor.read_i64_le()?;
80        let book_update_id = cursor.read_i64_le()?;
81        let price_exponent = cursor.read_i8()?;
82        let qty_exponent = cursor.read_i8()?;
83
84        let (bid_block_length, num_bids) = cursor.read_group_header_16()?;
85        let bids = cursor.read_group(bid_block_length, u32::from(num_bids), PriceLevel::decode)?;
86
87        let (ask_block_length, num_asks) = cursor.read_group_header_16()?;
88        let asks = cursor.read_group(ask_block_length, u32::from(num_asks), PriceLevel::decode)?;
89
90        let symbol = Ustr::from(cursor.read_var_string8_ref()?);
91
92        Ok(Self {
93            event_time_us,
94            book_update_id,
95            price_exponent,
96            qty_exponent,
97            bids,
98            asks,
99            symbol,
100        })
101    }
102
103    /// Get price as f64 for a level.
104    #[inline]
105    #[must_use]
106    pub fn level_price(&self, level: &PriceLevel) -> f64 {
107        super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
108    }
109
110    /// Get quantity as f64 for a level.
111    #[inline]
112    #[must_use]
113    pub fn level_qty(&self, level: &PriceLevel) -> f64 {
114        super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
115    }
116}
117
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::spot::sbe::stream::{STREAM_SCHEMA_ID, template_id};

    /// Builds a well-formed depth snapshot message (header + body) with the
    /// requested number of bid and ask levels and the symbol "BTCUSDT".
    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
        const HEADER_LEN: usize = 8;
        const ROOT_BLOCK_LEN: u16 = 18;
        const LEVEL_BLOCK_LEN: u16 = 16;
        const GROUP_HEADER_LEN: usize = 4;
        const SYMBOL: &[u8] = b"BTCUSDT";

        let levels_len = (num_bids + num_asks) * usize::from(LEVEL_BLOCK_LEN);
        let total_len = HEADER_LEN
            + usize::from(ROOT_BLOCK_LEN)
            + 2 * GROUP_HEADER_LEN
            + levels_len
            + 1
            + SYMBOL.len();
        let mut buf: Vec<u8> = Vec::with_capacity(total_len);

        // Message header: block length, template ID, schema ID, version.
        buf.extend_from_slice(&ROOT_BLOCK_LEN.to_le_bytes());
        buf.extend_from_slice(&template_id::DEPTH_SNAPSHOT_STREAM_EVENT.to_le_bytes());
        buf.extend_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf.extend_from_slice(&0u16.to_le_bytes());

        // Fixed root block.
        buf.extend_from_slice(&1000000i64.to_le_bytes()); // event_time_us
        buf.extend_from_slice(&12345i64.to_le_bytes()); // book_update_id
        buf.extend_from_slice(&(-2i8).to_le_bytes()); // price_exponent
        buf.extend_from_slice(&(-8i8).to_le_bytes()); // qty_exponent

        // Bids: group header, then descending prices.
        buf.extend_from_slice(&LEVEL_BLOCK_LEN.to_le_bytes());
        buf.extend_from_slice(&(num_bids as u16).to_le_bytes());
        for i in 0..num_bids {
            let price = 4200000i64 - i as i64 * 100;
            buf.extend_from_slice(&price.to_le_bytes());
            buf.extend_from_slice(&100000000i64.to_le_bytes());
        }

        // Asks: group header, then ascending prices.
        buf.extend_from_slice(&LEVEL_BLOCK_LEN.to_le_bytes());
        buf.extend_from_slice(&(num_asks as u16).to_le_bytes());
        for i in 0..num_asks {
            let price = 4200100i64 + i as i64 * 100;
            buf.extend_from_slice(&price.to_le_bytes());
            buf.extend_from_slice(&200000000i64.to_le_bytes());
        }

        // Symbol: one length byte followed by the raw bytes.
        buf.push(SYMBOL.len() as u8);
        buf.extend_from_slice(SYMBOL);

        buf
    }

    #[rstest]
    fn test_decode_valid() {
        let message = make_valid_buffer(5, 5);
        let decoded = DepthSnapshotStreamEvent::decode(&message).unwrap();

        assert_eq!(decoded.event_time_us, 1000000);
        assert_eq!(decoded.book_update_id, 12345);
        assert_eq!(decoded.bids.len(), 5);
        assert_eq!(decoded.asks.len(), 5);
        assert_eq!(decoded.symbol, "BTCUSDT");
    }

    #[rstest]
    fn test_decode_empty_books() {
        let message = make_valid_buffer(0, 0);
        let decoded = DepthSnapshotStreamEvent::decode(&message).unwrap();

        assert!(decoded.bids.is_empty());
        assert!(decoded.asks.is_empty());
    }

    #[rstest]
    fn test_decode_truncated() {
        let full = make_valid_buffer(10, 10);
        // Cut the message off in the middle of its levels.
        let truncated = &full[..100];

        let err = DepthSnapshotStreamEvent::decode(truncated).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        let mut message = make_valid_buffer(5, 5);
        // Overwrite the schema ID field of the header with an unexpected value.
        let [lo, hi] = 99u16.to_le_bytes();
        message[4] = lo;
        message[5] = hi;

        let err = DepthSnapshotStreamEvent::decode(&message).unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }

    #[rstest]
    fn test_decode_validated_matches_decode() {
        let message = make_valid_buffer(2, 3);

        let via_decode = DepthSnapshotStreamEvent::decode(&message).unwrap();
        let via_validated = DepthSnapshotStreamEvent::decode_validated(&message).unwrap();

        assert_eq!(via_validated.event_time_us, via_decode.event_time_us);
        assert_eq!(via_validated.book_update_id, via_decode.book_update_id);
        assert_eq!(via_validated.price_exponent, via_decode.price_exponent);
        assert_eq!(via_validated.qty_exponent, via_decode.qty_exponent);
        assert_eq!(via_validated.bids.len(), via_decode.bids.len());
        assert_eq!(via_validated.asks.len(), via_decode.asks.len());
        assert_eq!(via_validated.symbol, via_decode.symbol);
    }
}