Skip to main content

nautilus_binance/spot/websocket/streams/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for Binance Spot WebSocket SBE messages.
17
18use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20    data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
21    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
22    identifiers::TradeId,
23    instruments::{Instrument, InstrumentAny},
24    types::{Price, Quantity},
25};
26
27use crate::spot::sbe::stream::{
28    BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
29    StreamDecodeError, TradesStreamEvent, template_id,
30};
31
32/// Decoded market data message.
33#[derive(Debug)]
34pub enum MarketDataMessage {
35    /// Trade event.
36    Trades(TradesStreamEvent),
37    /// Best bid/ask update.
38    BestBidAsk(BestBidAskStreamEvent),
39    /// Order book snapshot.
40    DepthSnapshot(DepthSnapshotStreamEvent),
41    /// Order book diff update.
42    DepthDiff(DepthDiffStreamEvent),
43}
44
45/// Decode an SBE binary frame into a market data message.
46///
47/// Validates the message header (including schema ID) and routes to the
48/// appropriate decoder based on template ID.
49///
50/// # Errors
51///
52/// Returns an error if the buffer is too short, schema validation fails,
53/// or the template ID is unknown.
54pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
55    let header = MessageHeader::decode(buf)?;
56    header.validate_schema()?;
57
58    match header.template_id {
59        template_id::TRADES_STREAM_EVENT => Ok(MarketDataMessage::Trades(
60            TradesStreamEvent::decode_validated(buf)?,
61        )),
62        template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
63            BestBidAskStreamEvent::decode_validated(buf)?,
64        )),
65        template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
66            DepthSnapshotStreamEvent::decode_validated(buf)?,
67        )),
68        template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
69            DepthDiffStreamEvent::decode_validated(buf)?,
70        )),
71        _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
72    }
73}
74
75/// Parses a trades stream event into a vector of `TradeTick`.
76pub fn parse_trades_event(event: &TradesStreamEvent, instrument: &InstrumentAny) -> Vec<Data> {
77    let instrument_id = instrument.id();
78    let price_precision = instrument.price_precision();
79    let size_precision = instrument.size_precision();
80
81    event
82        .trades
83        .iter()
84        .map(|t| {
85            let price = Price::from_mantissa_exponent(
86                t.price_mantissa,
87                event.price_exponent,
88                price_precision,
89            );
90            let size = Quantity::from_mantissa_exponent(
91                t.qty_mantissa as u64,
92                event.qty_exponent,
93                size_precision,
94            );
95            let ts_event = UnixNanos::from_micros(event.transact_time_us as u64);
96
97            let trade = TradeTick::new(
98                instrument_id,
99                price,
100                size,
101                if t.is_buyer_maker {
102                    AggressorSide::Seller
103                } else {
104                    AggressorSide::Buyer
105                },
106                TradeId::new(t.id.to_string()),
107                ts_event,
108                ts_event,
109            );
110            Data::from(trade)
111        })
112        .collect()
113}
114
115/// Parses a best bid/ask event into a `QuoteTick`.
116pub fn parse_bbo_event(event: &BestBidAskStreamEvent, instrument: &InstrumentAny) -> QuoteTick {
117    let instrument_id = instrument.id();
118    let price_precision = instrument.price_precision();
119    let size_precision = instrument.size_precision();
120
121    let bid_price = Price::from_mantissa_exponent(
122        event.bid_price_mantissa,
123        event.price_exponent,
124        price_precision,
125    );
126    let bid_size = Quantity::from_mantissa_exponent(
127        event.bid_qty_mantissa as u64,
128        event.qty_exponent,
129        size_precision,
130    );
131    let ask_price = Price::from_mantissa_exponent(
132        event.ask_price_mantissa,
133        event.price_exponent,
134        price_precision,
135    );
136    let ask_size = Quantity::from_mantissa_exponent(
137        event.ask_qty_mantissa as u64,
138        event.qty_exponent,
139        size_precision,
140    );
141    let ts_event = UnixNanos::from_micros(event.event_time_us as u64);
142
143    QuoteTick::new(
144        instrument_id,
145        bid_price,
146        ask_price,
147        bid_size,
148        ask_size,
149        ts_event,
150        ts_event,
151    )
152}
153
154/// Parses a depth snapshot event into `OrderBookDeltas`.
155///
156/// Returns `None` if the snapshot contains no levels.
157pub fn parse_depth_snapshot(
158    event: &DepthSnapshotStreamEvent,
159    instrument: &InstrumentAny,
160) -> Option<OrderBookDeltas> {
161    let instrument_id = instrument.id();
162    let price_precision = instrument.price_precision();
163    let size_precision = instrument.size_precision();
164    let ts_event = UnixNanos::from_micros(event.event_time_us as u64);
165
166    let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len() + 1);
167
168    // Add clear delta first
169    deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event));
170
171    // Add bid levels
172    for (i, level) in event.bids.iter().enumerate() {
173        let price = Price::from_mantissa_exponent(
174            level.price_mantissa,
175            event.price_exponent,
176            price_precision,
177        );
178        let size = Quantity::from_mantissa_exponent(
179            level.qty_mantissa as u64,
180            event.qty_exponent,
181            size_precision,
182        );
183        let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
184            RecordFlag::F_LAST as u8
185        } else {
186            0
187        };
188
189        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
190
191        deltas.push(OrderBookDelta::new(
192            instrument_id,
193            BookAction::Add,
194            order,
195            flags,
196            0,
197            ts_event,
198            ts_event,
199        ));
200    }
201
202    // Add ask levels
203    for (i, level) in event.asks.iter().enumerate() {
204        let price = Price::from_mantissa_exponent(
205            level.price_mantissa,
206            event.price_exponent,
207            price_precision,
208        );
209        let size = Quantity::from_mantissa_exponent(
210            level.qty_mantissa as u64,
211            event.qty_exponent,
212            size_precision,
213        );
214        let flags = if i == event.asks.len() - 1 {
215            RecordFlag::F_LAST as u8
216        } else {
217            0
218        };
219
220        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
221
222        deltas.push(OrderBookDelta::new(
223            instrument_id,
224            BookAction::Add,
225            order,
226            flags,
227            0,
228            ts_event,
229            ts_event,
230        ));
231    }
232
233    // A snapshot that only contains the synthetic clear delta has no book levels
234    // to apply and is treated as "no usable update".
235    if deltas.len() <= 1 {
236        return None;
237    }
238
239    Some(OrderBookDeltas::new(instrument_id, deltas))
240}
241
242/// Parses a depth diff event into `OrderBookDeltas`.
243///
244/// Returns `None` if the diff contains no updates.
245pub fn parse_depth_diff(
246    event: &DepthDiffStreamEvent,
247    instrument: &InstrumentAny,
248) -> Option<OrderBookDeltas> {
249    let instrument_id = instrument.id();
250    let price_precision = instrument.price_precision();
251    let size_precision = instrument.size_precision();
252    let ts_event = UnixNanos::from_micros(event.event_time_us as u64);
253
254    let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len());
255
256    // Add bid updates
257    for (i, level) in event.bids.iter().enumerate() {
258        let price = Price::from_mantissa_exponent(
259            level.price_mantissa,
260            event.price_exponent,
261            price_precision,
262        );
263        let size = Quantity::from_mantissa_exponent(
264            level.qty_mantissa as u64,
265            event.qty_exponent,
266            size_precision,
267        );
268
269        // Zero size means delete, otherwise update
270        let action = if level.qty_mantissa == 0 {
271            BookAction::Delete
272        } else {
273            BookAction::Update
274        };
275
276        let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
277            RecordFlag::F_LAST as u8
278        } else {
279            0
280        };
281
282        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
283
284        deltas.push(OrderBookDelta::new(
285            instrument_id,
286            action,
287            order,
288            flags,
289            0,
290            ts_event,
291            ts_event,
292        ));
293    }
294
295    // Add ask updates
296    for (i, level) in event.asks.iter().enumerate() {
297        let price = Price::from_mantissa_exponent(
298            level.price_mantissa,
299            event.price_exponent,
300            price_precision,
301        );
302        let size = Quantity::from_mantissa_exponent(
303            level.qty_mantissa as u64,
304            event.qty_exponent,
305            size_precision,
306        );
307
308        let action = if level.qty_mantissa == 0 {
309            BookAction::Delete
310        } else {
311            BookAction::Update
312        };
313
314        let flags = if i == event.asks.len() - 1 {
315            RecordFlag::F_LAST as u8
316        } else {
317            0
318        };
319
320        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
321
322        deltas.push(OrderBookDelta::new(
323            instrument_id,
324            action,
325            order,
326            flags,
327            0,
328            ts_event,
329            ts_event,
330        ));
331    }
332
333    if deltas.is_empty() {
334        return None;
335    }
336
337    Some(OrderBookDeltas::new(instrument_id, deltas))
338}
339
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::parse::parse_spot_instrument_sbe,
        spot::{
            http::models::{
                BinanceLotSizeFilterSbe, BinancePriceFilterSbe, BinanceSymbolFiltersSbe,
                BinanceSymbolSbe,
            },
            sbe::stream::{PriceLevel, STREAM_SCHEMA_ID, Trade},
        },
    };

    /// Writes an 8-byte message header into the start of `buf`, using a block
    /// length of 50 and version 0 with the given little-endian template and
    /// schema ID bytes.
    fn write_header(buf: &mut [u8], template: [u8; 2], schema: [u8; 2]) {
        buf[0..2].copy_from_slice(&50u16.to_le_bytes()); // block_length
        buf[2..4].copy_from_slice(&template);
        buf[4..6].copy_from_slice(&schema);
        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
    }

    /// Builds a valid best bid/ask frame for symbol "BTCUSDT".
    fn make_bbo_buffer() -> Vec<u8> {
        let mut frame = vec![0u8; 70];

        write_header(
            &mut frame,
            template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes(),
            STREAM_SCHEMA_ID.to_le_bytes(),
        );

        let body = &mut frame[8..];

        // Fixed-width i64 fields as (offset within body, value)
        let wide_fields: [(usize, i64); 6] = [
            (0, 1_000_000),    // event_time_us
            (8, 12_345),       // book_update_id
            (18, 4_200_000),   // bid_price
            (26, 100_000_000), // bid_qty
            (34, 4_200_100),   // ask_price
            (42, 200_000_000), // ask_qty
        ];
        for (offset, value) in wide_fields {
            body[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
        }

        body[16] = (-2i8) as u8; // price_exponent
        body[17] = (-8i8) as u8; // qty_exponent

        // Symbol: length byte followed by "BTCUSDT" (7 bytes)
        body[50] = 7;
        body[51..58].copy_from_slice(b"BTCUSDT");

        frame
    }

    /// Builds the ETHUSDT spot instrument used by the parsing tests.
    fn sample_instrument() -> InstrumentAny {
        let price_filter = BinancePriceFilterSbe {
            price_exponent: -8,
            min_price: 1_000_000,
            max_price: 100_000_000_000_000,
            tick_size: 1_000_000,
        };
        let lot_size_filter = BinanceLotSizeFilterSbe {
            qty_exponent: -8,
            min_qty: 10_000,
            max_qty: 900_000_000_000,
            step_size: 10_000,
        };

        let definition = BinanceSymbolSbe {
            symbol: String::from("ETHUSDT"),
            base_asset: String::from("ETH"),
            quote_asset: String::from("USDT"),
            base_asset_precision: 8,
            quote_asset_precision: 8,
            status: 0,
            order_types: 0,
            iceberg_allowed: true,
            oco_allowed: true,
            oto_allowed: false,
            quote_order_qty_market_allowed: true,
            allow_trailing_stop: true,
            cancel_replace_allowed: true,
            amend_allowed: true,
            is_spot_trading_allowed: true,
            is_margin_trading_allowed: false,
            filters: BinanceSymbolFiltersSbe {
                price_filter: Some(price_filter),
                lot_size_filter: Some(lot_size_filter),
            },
            permissions: vec![vec![String::from("SPOT")]],
        };

        let ts = UnixNanos::from(1_700_000_000_000_000_000u64);
        parse_spot_instrument_sbe(&definition, ts, ts).unwrap()
    }

    #[rstest]
    fn test_decode_empty_buffer() {
        let result = decode_market_data(&[]);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    #[rstest]
    fn test_decode_short_buffer() {
        let result = decode_market_data(&[0u8; 5]);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        let mut frame = [0u8; 100];
        write_header(
            &mut frame,
            template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes(),
            99u16.to_le_bytes(), // Wrong schema
        );

        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::SchemaMismatch { .. })
        ));
    }

    #[rstest]
    fn test_decode_unknown_template() {
        let mut frame = [0u8; 100];
        write_header(
            &mut frame,
            9999u16.to_le_bytes(), // Unknown template
            STREAM_SCHEMA_ID.to_le_bytes(),
        );

        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::UnknownTemplateId(9999))
        ));
    }

    #[rstest]
    fn test_decode_valid_best_bid_ask() {
        let frame = make_bbo_buffer();

        let MarketDataMessage::BestBidAsk(event) = decode_market_data(&frame).unwrap() else {
            panic!("Expected BestBidAsk");
        };

        assert_eq!(event.event_time_us, 1_000_000);
        assert_eq!(event.symbol, Ustr::from("BTCUSDT"));
    }

    #[rstest]
    fn test_parse_trades_event() {
        let instrument = sample_instrument();
        let taker_buy = Trade {
            id: 1,
            price_mantissa: 12_345,
            qty_mantissa: 25_000,
            is_buyer_maker: false,
        };
        let taker_sell = Trade {
            id: 2,
            price_mantissa: 12_340,
            qty_mantissa: 10_000,
            is_buyer_maker: true,
        };
        let event = TradesStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            transact_time_us: 1_700_000_000_100_000,
            price_exponent: -2,
            qty_exponent: -4,
            trades: vec![taker_buy, taker_sell],
            symbol: Ustr::from("ETHUSDT"),
        };
        // Trades are stamped with the transaction time, not the event time
        let expected_ts = UnixNanos::from(1_700_000_000_100_000_000u64);

        let data = parse_trades_event(&event, &instrument);

        assert_eq!(data.len(), 2);

        let Data::Trade(first) = &data[0] else {
            panic!("Expected trade data, was {:?}", &data[0]);
        };
        assert_eq!(first.instrument_id, instrument.id());
        assert_eq!(first.price, Price::new(123.45, 2));
        assert_eq!(first.size, Quantity::new(2.5, 4));
        assert_eq!(first.aggressor_side, AggressorSide::Buyer);
        assert_eq!(first.trade_id, TradeId::new("1"));
        assert_eq!(first.ts_event, expected_ts);
        assert_eq!(first.ts_init, expected_ts);

        let Data::Trade(second) = &data[1] else {
            panic!("Expected trade data, was {:?}", &data[1]);
        };
        assert_eq!(second.aggressor_side, AggressorSide::Seller);
    }

    #[rstest]
    fn test_parse_bbo_event() {
        let instrument = sample_instrument();
        let event = BestBidAskStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            book_update_id: 123,
            price_exponent: -2,
            qty_exponent: -4,
            bid_price_mantissa: 12_345,
            bid_qty_mantissa: 25_000,
            ask_price_mantissa: 12_350,
            ask_qty_mantissa: 30_000,
            symbol: Ustr::from("ETHUSDT"),
        };
        let expected_ts = UnixNanos::from(1_700_000_000_000_000_000u64);

        let quote = parse_bbo_event(&event, &instrument);

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, Price::new(123.45, 2));
        assert_eq!(quote.ask_price, Price::new(123.50, 2));
        assert_eq!(quote.bid_size, Quantity::new(2.5, 4));
        assert_eq!(quote.ask_size, Quantity::new(3.0, 4));
        assert_eq!(quote.ts_event, expected_ts);
        assert_eq!(quote.ts_init, expected_ts);
    }

    #[rstest]
    fn test_parse_depth_snapshot() {
        let instrument = sample_instrument();
        let best_bid = PriceLevel {
            price_mantissa: 12_345,
            qty_mantissa: 25_000,
        };
        let best_ask = PriceLevel {
            price_mantissa: 12_350,
            qty_mantissa: 30_000,
        };
        let event = DepthSnapshotStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            book_update_id: 123,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![best_bid],
            asks: vec![best_ask],
            symbol: Ustr::from("ETHUSDT"),
        };

        let deltas = parse_depth_snapshot(&event, &instrument).unwrap();

        assert_eq!(deltas.instrument_id, instrument.id());
        assert_eq!(deltas.deltas.len(), 3);

        let clear = &deltas.deltas[0];
        assert_eq!(clear.action, BookAction::Clear);

        let bid = &deltas.deltas[1];
        assert_eq!(bid.action, BookAction::Add);
        assert_eq!(bid.order.side, OrderSide::Buy);
        assert_eq!(bid.order.price, Price::new(123.45, 2));
        assert_eq!(bid.order.size, Quantity::new(2.5, 4));

        let ask = &deltas.deltas[2];
        assert_eq!(ask.action, BookAction::Add);
        assert_eq!(ask.order.side, OrderSide::Sell);
        assert_eq!(ask.order.price, Price::new(123.50, 2));
        assert_eq!(ask.order.size, Quantity::new(3.0, 4));
        assert_eq!(ask.flags, RecordFlag::F_LAST as u8);

        assert_eq!(
            deltas.ts_event,
            UnixNanos::from(1_700_000_000_000_000_000u64)
        );
    }

    #[rstest]
    fn test_parse_depth_snapshot_empty_returns_none() {
        let instrument = sample_instrument();
        let event = DepthSnapshotStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            book_update_id: 123,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![],
            asks: vec![],
            symbol: Ustr::from("ETHUSDT"),
        };

        assert!(parse_depth_snapshot(&event, &instrument).is_none());
    }

    #[rstest]
    fn test_parse_depth_diff() {
        let instrument = sample_instrument();
        let level = |price_mantissa, qty_mantissa| PriceLevel {
            price_mantissa,
            qty_mantissa,
        };
        let event = DepthDiffStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            first_book_update_id: 100,
            last_book_update_id: 101,
            price_exponent: -2,
            qty_exponent: -4,
            // The second bid has zero quantity and must map to a delete
            bids: vec![level(12_345, 25_000), level(12_340, 0)],
            asks: vec![level(12_350, 30_000)],
            symbol: Ustr::from("ETHUSDT"),
        };

        let deltas = parse_depth_diff(&event, &instrument).unwrap();

        assert_eq!(deltas.instrument_id, instrument.id());
        assert_eq!(deltas.deltas.len(), 3);

        let updated_bid = &deltas.deltas[0];
        assert_eq!(updated_bid.action, BookAction::Update);
        assert_eq!(updated_bid.order.side, OrderSide::Buy);

        let deleted_bid = &deltas.deltas[1];
        assert_eq!(deleted_bid.action, BookAction::Delete);
        assert_eq!(deleted_bid.order.side, OrderSide::Buy);

        let updated_ask = &deltas.deltas[2];
        assert_eq!(updated_ask.action, BookAction::Update);
        assert_eq!(updated_ask.order.side, OrderSide::Sell);
        assert_eq!(updated_ask.flags, RecordFlag::F_LAST as u8);

        assert_eq!(
            deltas.ts_event,
            UnixNanos::from(1_700_000_000_000_000_000u64)
        );
    }

    #[rstest]
    fn test_parse_depth_diff_empty_returns_none() {
        let instrument = sample_instrument();
        let event = DepthDiffStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            first_book_update_id: 100,
            last_book_update_id: 101,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![],
            asks: vec![],
            symbol: Ustr::from("ETHUSDT"),
        };

        assert!(parse_depth_diff(&event, &instrument).is_none());
    }
}
671}