Skip to main content

nautilus_binance/spot/sbe/stream/
depth_diff.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Depth diff stream event decoder.
17//!
18//! Message layout (after 8-byte header):
19//! - eventTime: i64 (microseconds)
20//! - firstBookUpdateId: i64
21//! - lastBookUpdateId: i64
22//! - priceExponent: i8
23//! - qtyExponent: i8
24//! - bids group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
25//!   - price: i64 (mantissa)
26//!   - qty: i64 (mantissa)
27//! - asks group (groupSize16Encoding: u16 blockLength + u16 numInGroup):
28//!   - price: i64 (mantissa)
29//!   - qty: i64 (mantissa)
30//! - symbol: varString8
31
32use ustr::Ustr;
33
34use super::{MessageHeader, PriceLevel, StreamDecodeError};
35use crate::spot::sbe::cursor::SbeCursor;
36
37/// Depth diff stream event (incremental order book updates).
38#[derive(Debug, Clone)]
39pub struct DepthDiffStreamEvent {
40    /// Event timestamp in microseconds.
41    pub event_time_us: i64,
42    /// First book update ID in this diff.
43    pub first_book_update_id: i64,
44    /// Last book update ID in this diff.
45    pub last_book_update_id: i64,
46    /// Price exponent (prices = mantissa * 10^exponent).
47    pub price_exponent: i8,
48    /// Quantity exponent (quantities = mantissa * 10^exponent).
49    pub qty_exponent: i8,
50    /// Bid level updates (qty=0 means remove level).
51    pub bids: Vec<PriceLevel>,
52    /// Ask level updates (qty=0 means remove level).
53    pub asks: Vec<PriceLevel>,
54    /// Trading symbol.
55    pub symbol: Ustr,
56}
57
58impl DepthDiffStreamEvent {
59    /// Fixed block length (excluding header, groups, and variable-length data).
60    pub const BLOCK_LENGTH: usize = 26;
61
62    /// Decode from SBE buffer (including 8-byte header).
63    ///
64    /// # Errors
65    ///
66    /// Returns error if buffer is too short, group size exceeds limits,
67    /// or data is otherwise invalid.
68    pub fn decode(buf: &[u8]) -> Result<Self, StreamDecodeError> {
69        let header = MessageHeader::decode(buf)?;
70        header.validate_schema()?;
71        Self::decode_validated(buf)
72    }
73
74    /// Decode from an SBE buffer whose header has already been validated.
75    pub(crate) fn decode_validated(buf: &[u8]) -> Result<Self, StreamDecodeError> {
76        let mut cursor = SbeCursor::new_at(buf, MessageHeader::ENCODED_LENGTH);
77        Self::decode_body(&mut cursor)
78    }
79
80    #[inline]
81    fn decode_body(cursor: &mut SbeCursor<'_>) -> Result<Self, StreamDecodeError> {
82        let event_time_us = cursor.read_i64_le()?;
83        let first_book_update_id = cursor.read_i64_le()?;
84        let last_book_update_id = cursor.read_i64_le()?;
85        let price_exponent = cursor.read_i8()?;
86        let qty_exponent = cursor.read_i8()?;
87
88        let (bid_block_length, num_bids) = cursor.read_group_header_16()?;
89        let bids = cursor.read_group(bid_block_length, u32::from(num_bids), PriceLevel::decode)?;
90
91        let (ask_block_length, num_asks) = cursor.read_group_header_16()?;
92        let asks = cursor.read_group(ask_block_length, u32::from(num_asks), PriceLevel::decode)?;
93
94        let symbol = Ustr::from(cursor.read_var_string8_ref()?);
95
96        Ok(Self {
97            event_time_us,
98            first_book_update_id,
99            last_book_update_id,
100            price_exponent,
101            qty_exponent,
102            bids,
103            asks,
104            symbol,
105        })
106    }
107
108    /// Get price as f64 for a level.
109    #[inline]
110    #[must_use]
111    pub fn level_price(&self, level: &PriceLevel) -> f64 {
112        super::mantissa_to_f64(level.price_mantissa, self.price_exponent)
113    }
114
115    /// Get quantity as f64 for a level.
116    #[inline]
117    #[must_use]
118    pub fn level_qty(&self, level: &PriceLevel) -> f64 {
119        super::mantissa_to_f64(level.qty_mantissa, self.qty_exponent)
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use rstest::rstest;
126
127    use super::*;
128    use crate::spot::sbe::stream::{STREAM_SCHEMA_ID, template_id};
129
130    fn make_valid_buffer(num_bids: usize, num_asks: usize) -> Vec<u8> {
131        let level_block_len = 16u16;
132        let body_size = 26
133            + 4
134            + (num_bids * level_block_len as usize)
135            + 4
136            + (num_asks * level_block_len as usize)
137            + 8;
138        let mut buf = vec![0u8; 8 + body_size];
139
140        // Header
141        buf[0..2].copy_from_slice(&26u16.to_le_bytes()); // block_length
142        buf[2..4].copy_from_slice(&template_id::DEPTH_DIFF_STREAM_EVENT.to_le_bytes());
143        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
144        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
145
146        // Body
147        let body = &mut buf[8..];
148        body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); // event_time_us
149        body[8..16].copy_from_slice(&12345i64.to_le_bytes()); // first_book_update_id
150        body[16..24].copy_from_slice(&12350i64.to_le_bytes()); // last_book_update_id
151        body[24] = (-2i8) as u8; // price_exponent
152        body[25] = (-8i8) as u8; // qty_exponent
153
154        let mut offset = 26;
155
156        // Bids group header
157        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
158        body[offset + 2..offset + 4].copy_from_slice(&(num_bids as u16).to_le_bytes());
159        offset += 4;
160
161        // Bids
162        for i in 0..num_bids {
163            body[offset..offset + 8].copy_from_slice(&(4200000i64 - i as i64 * 100).to_le_bytes());
164            body[offset + 8..offset + 16].copy_from_slice(&100000000i64.to_le_bytes());
165            offset += level_block_len as usize;
166        }
167
168        // Asks group header
169        body[offset..offset + 2].copy_from_slice(&level_block_len.to_le_bytes());
170        body[offset + 2..offset + 4].copy_from_slice(&(num_asks as u16).to_le_bytes());
171        offset += 4;
172
173        // Asks
174        for i in 0..num_asks {
175            body[offset..offset + 8].copy_from_slice(&(4200100i64 + i as i64 * 100).to_le_bytes());
176            body[offset + 8..offset + 16].copy_from_slice(&200000000i64.to_le_bytes());
177            offset += level_block_len as usize;
178        }
179
180        // Symbol: "BTCUSDT"
181        body[offset] = 7;
182        body[offset + 1..offset + 8].copy_from_slice(b"BTCUSDT");
183
184        buf
185    }
186
187    #[rstest]
188    fn test_decode_valid() {
189        let buf = make_valid_buffer(3, 2);
190        let event = DepthDiffStreamEvent::decode(&buf).unwrap();
191
192        assert_eq!(event.event_time_us, 1000000);
193        assert_eq!(event.first_book_update_id, 12345);
194        assert_eq!(event.last_book_update_id, 12350);
195        assert_eq!(event.bids.len(), 3);
196        assert_eq!(event.asks.len(), 2);
197        assert_eq!(event.symbol, "BTCUSDT");
198    }
199
200    #[rstest]
201    fn test_decode_empty_updates() {
202        let buf = make_valid_buffer(0, 0);
203        let event = DepthDiffStreamEvent::decode(&buf).unwrap();
204
205        assert!(event.bids.is_empty());
206        assert!(event.asks.is_empty());
207    }
208
209    #[rstest]
210    fn test_decode_truncated() {
211        let mut buf = make_valid_buffer(5, 5);
212        buf.truncate(60); // Truncate in the middle
213        let err = DepthDiffStreamEvent::decode(&buf).unwrap_err();
214        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
215    }
216
217    #[rstest]
218    fn test_decode_wrong_schema() {
219        let mut buf = make_valid_buffer(3, 2);
220        buf[4..6].copy_from_slice(&99u16.to_le_bytes());
221        let err = DepthDiffStreamEvent::decode(&buf).unwrap_err();
222        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
223    }
224
225    #[rstest]
226    fn test_decode_validated_matches_decode() {
227        let buf = make_valid_buffer(3, 2);
228
229        let decode_event = DepthDiffStreamEvent::decode(&buf).unwrap();
230        let validated_event = DepthDiffStreamEvent::decode_validated(&buf).unwrap();
231
232        assert_eq!(validated_event.event_time_us, decode_event.event_time_us);
233        assert_eq!(
234            validated_event.first_book_update_id,
235            decode_event.first_book_update_id
236        );
237        assert_eq!(
238            validated_event.last_book_update_id,
239            decode_event.last_book_update_id
240        );
241        assert_eq!(validated_event.price_exponent, decode_event.price_exponent);
242        assert_eq!(validated_event.qty_exponent, decode_event.qty_exponent);
243        assert_eq!(validated_event.bids.len(), decode_event.bids.len());
244        assert_eq!(validated_event.asks.len(), decode_event.asks.len());
245        assert_eq!(validated_event.symbol, decode_event.symbol);
246    }
247}