1use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20 data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
21 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
22 identifiers::TradeId,
23 instruments::{Instrument, InstrumentAny},
24 types::{Price, Quantity},
25};
26
27use crate::spot::sbe::stream::{
28 BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
29 StreamDecodeError, TradesStreamEvent, template_id,
30};
31
/// A decoded market data stream message.
///
/// Each variant wraps the decoded event struct for one stream template; this
/// is the value produced by [`decode_market_data`].
#[derive(Debug)]
pub enum MarketDataMessage {
    /// A trades stream event.
    Trades(TradesStreamEvent),
    /// A best bid/ask stream event.
    BestBidAsk(BestBidAskStreamEvent),
    /// A depth snapshot stream event.
    DepthSnapshot(DepthSnapshotStreamEvent),
    /// A depth diff stream event.
    DepthDiff(DepthDiffStreamEvent),
}
44
45pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
55 let header = MessageHeader::decode(buf)?;
56 header.validate_schema()?;
57
58 match header.template_id {
59 template_id::TRADES_STREAM_EVENT => Ok(MarketDataMessage::Trades(
60 TradesStreamEvent::decode_validated(buf)?,
61 )),
62 template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
63 BestBidAskStreamEvent::decode_validated(buf)?,
64 )),
65 template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
66 DepthSnapshotStreamEvent::decode_validated(buf)?,
67 )),
68 template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
69 DepthDiffStreamEvent::decode_validated(buf)?,
70 )),
71 _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
72 }
73}
74
75pub fn parse_trades_event(event: &TradesStreamEvent, instrument: &InstrumentAny) -> Vec<Data> {
77 let instrument_id = instrument.id();
78 let price_precision = instrument.price_precision();
79 let size_precision = instrument.size_precision();
80
81 event
82 .trades
83 .iter()
84 .map(|t| {
85 let price = Price::from_mantissa_exponent(
86 t.price_mantissa,
87 event.price_exponent,
88 price_precision,
89 );
90 let size = Quantity::from_mantissa_exponent(
91 t.qty_mantissa as u64,
92 event.qty_exponent,
93 size_precision,
94 );
95 let ts_event = UnixNanos::from_micros(event.transact_time_us as u64);
96
97 let trade = TradeTick::new(
98 instrument_id,
99 price,
100 size,
101 if t.is_buyer_maker {
102 AggressorSide::Seller
103 } else {
104 AggressorSide::Buyer
105 },
106 TradeId::new(t.id.to_string()),
107 ts_event,
108 ts_event,
109 );
110 Data::from(trade)
111 })
112 .collect()
113}
114
115pub fn parse_bbo_event(event: &BestBidAskStreamEvent, instrument: &InstrumentAny) -> QuoteTick {
117 let instrument_id = instrument.id();
118 let price_precision = instrument.price_precision();
119 let size_precision = instrument.size_precision();
120
121 let bid_price = Price::from_mantissa_exponent(
122 event.bid_price_mantissa,
123 event.price_exponent,
124 price_precision,
125 );
126 let bid_size = Quantity::from_mantissa_exponent(
127 event.bid_qty_mantissa as u64,
128 event.qty_exponent,
129 size_precision,
130 );
131 let ask_price = Price::from_mantissa_exponent(
132 event.ask_price_mantissa,
133 event.price_exponent,
134 price_precision,
135 );
136 let ask_size = Quantity::from_mantissa_exponent(
137 event.ask_qty_mantissa as u64,
138 event.qty_exponent,
139 size_precision,
140 );
141 let ts_event = UnixNanos::from_micros(event.event_time_us as u64);
142
143 QuoteTick::new(
144 instrument_id,
145 bid_price,
146 ask_price,
147 bid_size,
148 ask_size,
149 ts_event,
150 ts_event,
151 )
152}
153
154pub fn parse_depth_snapshot(
158 event: &DepthSnapshotStreamEvent,
159 instrument: &InstrumentAny,
160) -> Option<OrderBookDeltas> {
161 let instrument_id = instrument.id();
162 let price_precision = instrument.price_precision();
163 let size_precision = instrument.size_precision();
164 let ts_event = UnixNanos::from_micros(event.event_time_us as u64);
165
166 let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len() + 1);
167
168 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event));
170
171 for (i, level) in event.bids.iter().enumerate() {
173 let price = Price::from_mantissa_exponent(
174 level.price_mantissa,
175 event.price_exponent,
176 price_precision,
177 );
178 let size = Quantity::from_mantissa_exponent(
179 level.qty_mantissa as u64,
180 event.qty_exponent,
181 size_precision,
182 );
183 let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
184 RecordFlag::F_LAST as u8
185 } else {
186 0
187 };
188
189 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
190
191 deltas.push(OrderBookDelta::new(
192 instrument_id,
193 BookAction::Add,
194 order,
195 flags,
196 0,
197 ts_event,
198 ts_event,
199 ));
200 }
201
202 for (i, level) in event.asks.iter().enumerate() {
204 let price = Price::from_mantissa_exponent(
205 level.price_mantissa,
206 event.price_exponent,
207 price_precision,
208 );
209 let size = Quantity::from_mantissa_exponent(
210 level.qty_mantissa as u64,
211 event.qty_exponent,
212 size_precision,
213 );
214 let flags = if i == event.asks.len() - 1 {
215 RecordFlag::F_LAST as u8
216 } else {
217 0
218 };
219
220 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
221
222 deltas.push(OrderBookDelta::new(
223 instrument_id,
224 BookAction::Add,
225 order,
226 flags,
227 0,
228 ts_event,
229 ts_event,
230 ));
231 }
232
233 if deltas.len() <= 1 {
236 return None;
237 }
238
239 Some(OrderBookDeltas::new(instrument_id, deltas))
240}
241
242pub fn parse_depth_diff(
246 event: &DepthDiffStreamEvent,
247 instrument: &InstrumentAny,
248) -> Option<OrderBookDeltas> {
249 let instrument_id = instrument.id();
250 let price_precision = instrument.price_precision();
251 let size_precision = instrument.size_precision();
252 let ts_event = UnixNanos::from_micros(event.event_time_us as u64);
253
254 let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len());
255
256 for (i, level) in event.bids.iter().enumerate() {
258 let price = Price::from_mantissa_exponent(
259 level.price_mantissa,
260 event.price_exponent,
261 price_precision,
262 );
263 let size = Quantity::from_mantissa_exponent(
264 level.qty_mantissa as u64,
265 event.qty_exponent,
266 size_precision,
267 );
268
269 let action = if level.qty_mantissa == 0 {
271 BookAction::Delete
272 } else {
273 BookAction::Update
274 };
275
276 let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
277 RecordFlag::F_LAST as u8
278 } else {
279 0
280 };
281
282 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
283
284 deltas.push(OrderBookDelta::new(
285 instrument_id,
286 action,
287 order,
288 flags,
289 0,
290 ts_event,
291 ts_event,
292 ));
293 }
294
295 for (i, level) in event.asks.iter().enumerate() {
297 let price = Price::from_mantissa_exponent(
298 level.price_mantissa,
299 event.price_exponent,
300 price_precision,
301 );
302 let size = Quantity::from_mantissa_exponent(
303 level.qty_mantissa as u64,
304 event.qty_exponent,
305 size_precision,
306 );
307
308 let action = if level.qty_mantissa == 0 {
309 BookAction::Delete
310 } else {
311 BookAction::Update
312 };
313
314 let flags = if i == event.asks.len() - 1 {
315 RecordFlag::F_LAST as u8
316 } else {
317 0
318 };
319
320 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
321
322 deltas.push(OrderBookDelta::new(
323 instrument_id,
324 action,
325 order,
326 flags,
327 0,
328 ts_event,
329 ts_event,
330 ));
331 }
332
333 if deltas.is_empty() {
334 return None;
335 }
336
337 Some(OrderBookDeltas::new(instrument_id, deltas))
338}
339
// Unit tests for stream decoding and for conversion of decoded events into
// model data types.
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::parse::parse_spot_instrument_sbe,
        spot::{
            http::models::{
                BinanceLotSizeFilterSbe, BinancePriceFilterSbe, BinanceSymbolFiltersSbe,
                BinanceSymbolSbe,
            },
            sbe::stream::{PriceLevel, STREAM_SCHEMA_ID, Trade},
        },
    };

    /// Builds a hand-encoded best bid/ask message: an 8-byte header followed by
    /// the message body, all integers little-endian.
    fn make_bbo_buffer() -> Vec<u8> {
        let mut buf = vec![0u8; 70];

        // Message header.
        buf[0..2].copy_from_slice(&50u16.to_le_bytes()); // presumably the block length — TODO confirm
        buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes()); // template ID
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes()); // schema ID
        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // presumably the schema version — TODO confirm

        // Message body. Field meanings below other than the event time and the
        // symbol are inferred from the event struct's fields — TODO confirm
        // against the schema.
        let body = &mut buf[8..];
        body[0..8].copy_from_slice(&1000000i64.to_le_bytes()); // event time (microseconds)
        body[8..16].copy_from_slice(&12345i64.to_le_bytes()); // presumably the book update ID
        body[16] = (-2i8) as u8; // presumably the price exponent
        body[17] = (-8i8) as u8; // presumably the quantity exponent
        body[18..26].copy_from_slice(&4200000i64.to_le_bytes()); // presumably the bid price mantissa
        body[26..34].copy_from_slice(&100000000i64.to_le_bytes()); // presumably the bid quantity mantissa
        body[34..42].copy_from_slice(&4200100i64.to_le_bytes()); // presumably the ask price mantissa
        body[42..50].copy_from_slice(&200000000i64.to_le_bytes()); // presumably the ask quantity mantissa
        body[50] = 7; // length prefix of the 7-byte symbol that follows
        body[51..58].copy_from_slice(b"BTCUSDT");

        buf
    }

    /// Builds an ETHUSDT spot instrument via `parse_spot_instrument_sbe` for use
    /// by the parsing tests.
    fn sample_instrument() -> InstrumentAny {
        let symbol = BinanceSymbolSbe {
            symbol: "ETHUSDT".to_string(),
            base_asset: "ETH".to_string(),
            quote_asset: "USDT".to_string(),
            base_asset_precision: 8,
            quote_asset_precision: 8,
            status: 0,
            order_types: 0,
            iceberg_allowed: true,
            oco_allowed: true,
            oto_allowed: false,
            quote_order_qty_market_allowed: true,
            allow_trailing_stop: true,
            cancel_replace_allowed: true,
            amend_allowed: true,
            is_spot_trading_allowed: true,
            is_margin_trading_allowed: false,
            filters: BinanceSymbolFiltersSbe {
                price_filter: Some(BinancePriceFilterSbe {
                    price_exponent: -8,
                    min_price: 1_000_000,
                    max_price: 100_000_000_000_000,
                    tick_size: 1_000_000,
                }),
                lot_size_filter: Some(BinanceLotSizeFilterSbe {
                    qty_exponent: -8,
                    min_qty: 10_000,
                    max_qty: 900_000_000_000,
                    step_size: 10_000,
                }),
            },
            permissions: vec![vec!["SPOT".to_string()]],
        };

        let ts = UnixNanos::from(1_700_000_000_000_000_000u64);
        parse_spot_instrument_sbe(&symbol, ts, ts).unwrap()
    }

    // An empty buffer cannot hold a header.
    #[rstest]
    fn test_decode_empty_buffer() {
        let err = decode_market_data(&[]).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    // Five bytes is shorter than the 8-byte header written by the fixtures.
    #[rstest]
    fn test_decode_short_buffer() {
        let buf = [0u8; 5];
        let err = decode_market_data(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    // A header with a schema ID of 99 must be rejected as a schema mismatch.
    #[rstest]
    fn test_decode_wrong_schema() {
        let mut buf = [0u8; 100];
        buf[0..2].copy_from_slice(&50u16.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes());
        buf[4..6].copy_from_slice(&99u16.to_le_bytes()); // schema ID expected to be rejected
        buf[6..8].copy_from_slice(&0u16.to_le_bytes());

        let err = decode_market_data(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }

    // A valid schema with template ID 9999 must be reported as unknown.
    #[rstest]
    fn test_decode_unknown_template() {
        let mut buf = [0u8; 100];
        buf[0..2].copy_from_slice(&50u16.to_le_bytes());
        buf[2..4].copy_from_slice(&9999u16.to_le_bytes()); // template ID with no decoder
        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
        buf[6..8].copy_from_slice(&0u16.to_le_bytes());

        let err = decode_market_data(&buf).unwrap_err();
        assert!(matches!(err, StreamDecodeError::UnknownTemplateId(9999)));
    }

    // The hand-encoded buffer decodes to a BestBidAsk message with the encoded
    // event time and symbol.
    #[rstest]
    fn test_decode_valid_best_bid_ask() {
        let buf = make_bbo_buffer();
        let msg = decode_market_data(&buf).unwrap();

        match msg {
            MarketDataMessage::BestBidAsk(event) => {
                assert_eq!(event.event_time_us, 1_000_000);
                assert_eq!(event.symbol, Ustr::from("BTCUSDT"));
            }
            _ => panic!("Expected BestBidAsk"),
        }
    }

    // Two trades: the first with the buyer as taker, the second with the buyer
    // as maker, so the aggressor sides must be Buyer then Seller.
    #[rstest]
    fn test_parse_trades_event() {
        let instrument = sample_instrument();
        let event = TradesStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            transact_time_us: 1_700_000_000_100_000,
            price_exponent: -2,
            qty_exponent: -4,
            trades: vec![
                Trade {
                    id: 1,
                    price_mantissa: 12_345,
                    qty_mantissa: 25_000,
                    is_buyer_maker: false,
                },
                Trade {
                    id: 2,
                    price_mantissa: 12_340,
                    qty_mantissa: 10_000,
                    is_buyer_maker: true,
                },
            ],
            symbol: Ustr::from("ETHUSDT"),
        };

        let data = parse_trades_event(&event, &instrument);

        assert_eq!(data.len(), 2);
        match &data[0] {
            Data::Trade(trade) => {
                assert_eq!(trade.instrument_id, instrument.id());
                assert_eq!(trade.price, Price::new(123.45, 2));
                assert_eq!(trade.size, Quantity::new(2.5, 4));
                assert_eq!(trade.aggressor_side, AggressorSide::Buyer);
                assert_eq!(trade.trade_id, TradeId::new("1"));
                // Timestamps come from the transaction time, not the event time.
                assert_eq!(
                    trade.ts_event,
                    UnixNanos::from(1_700_000_000_100_000_000u64)
                );
                assert_eq!(trade.ts_init, UnixNanos::from(1_700_000_000_100_000_000u64));
            }
            other => panic!("Expected trade data, was {other:?}"),
        }

        match &data[1] {
            Data::Trade(trade) => assert_eq!(trade.aggressor_side, AggressorSide::Seller),
            other => panic!("Expected trade data, was {other:?}"),
        }
    }

    // Checks prices, sizes and timestamps of the quote built from a BBO event.
    #[rstest]
    fn test_parse_bbo_event() {
        let instrument = sample_instrument();
        let event = BestBidAskStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            book_update_id: 123,
            price_exponent: -2,
            qty_exponent: -4,
            bid_price_mantissa: 12_345,
            bid_qty_mantissa: 25_000,
            ask_price_mantissa: 12_350,
            ask_qty_mantissa: 30_000,
            symbol: Ustr::from("ETHUSDT"),
        };

        let quote = parse_bbo_event(&event, &instrument);

        assert_eq!(quote.instrument_id, instrument.id());
        assert_eq!(quote.bid_price, Price::new(123.45, 2));
        assert_eq!(quote.ask_price, Price::new(123.50, 2));
        assert_eq!(quote.bid_size, Quantity::new(2.5, 4));
        assert_eq!(quote.ask_size, Quantity::new(3.0, 4));
        assert_eq!(
            quote.ts_event,
            UnixNanos::from(1_700_000_000_000_000_000u64)
        );
        assert_eq!(quote.ts_init, UnixNanos::from(1_700_000_000_000_000_000u64));
    }

    // One bid and one ask: expect Clear, Add(bid), Add(ask) with F_LAST on the
    // final delta.
    #[rstest]
    fn test_parse_depth_snapshot() {
        let instrument = sample_instrument();
        let event = DepthSnapshotStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            book_update_id: 123,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![PriceLevel {
                price_mantissa: 12_345,
                qty_mantissa: 25_000,
            }],
            asks: vec![PriceLevel {
                price_mantissa: 12_350,
                qty_mantissa: 30_000,
            }],
            symbol: Ustr::from("ETHUSDT"),
        };

        let deltas = parse_depth_snapshot(&event, &instrument).unwrap();

        assert_eq!(deltas.instrument_id, instrument.id());
        assert_eq!(deltas.deltas.len(), 3);
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);
        assert_eq!(deltas.deltas[1].action, BookAction::Add);
        assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
        assert_eq!(deltas.deltas[1].order.price, Price::new(123.45, 2));
        assert_eq!(deltas.deltas[1].order.size, Quantity::new(2.5, 4));
        assert_eq!(deltas.deltas[2].action, BookAction::Add);
        assert_eq!(deltas.deltas[2].order.side, OrderSide::Sell);
        assert_eq!(deltas.deltas[2].order.price, Price::new(123.50, 2));
        assert_eq!(deltas.deltas[2].order.size, Quantity::new(3.0, 4));
        assert_eq!(deltas.deltas[2].flags, RecordFlag::F_LAST as u8);
        assert_eq!(
            deltas.ts_event,
            UnixNanos::from(1_700_000_000_000_000_000u64)
        );
    }

    // A snapshot with no levels yields no deltas batch at all.
    #[rstest]
    fn test_parse_depth_snapshot_empty_returns_none() {
        let instrument = sample_instrument();
        let event = DepthSnapshotStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            book_update_id: 123,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![],
            asks: vec![],
            symbol: Ustr::from("ETHUSDT"),
        };

        let deltas = parse_depth_snapshot(&event, &instrument);

        assert!(deltas.is_none());
    }

    // Two bids (one with zero quantity) and one ask: expect Update, Delete,
    // Update with F_LAST on the final delta.
    #[rstest]
    fn test_parse_depth_diff() {
        let instrument = sample_instrument();
        let event = DepthDiffStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            first_book_update_id: 100,
            last_book_update_id: 101,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![
                PriceLevel {
                    price_mantissa: 12_345,
                    qty_mantissa: 25_000,
                },
                PriceLevel {
                    price_mantissa: 12_340,
                    qty_mantissa: 0,
                },
            ],
            asks: vec![PriceLevel {
                price_mantissa: 12_350,
                qty_mantissa: 30_000,
            }],
            symbol: Ustr::from("ETHUSDT"),
        };

        let deltas = parse_depth_diff(&event, &instrument).unwrap();

        assert_eq!(deltas.instrument_id, instrument.id());
        assert_eq!(deltas.deltas.len(), 3);
        assert_eq!(deltas.deltas[0].action, BookAction::Update);
        assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
        assert_eq!(deltas.deltas[1].action, BookAction::Delete);
        assert_eq!(deltas.deltas[1].order.side, OrderSide::Buy);
        assert_eq!(deltas.deltas[2].action, BookAction::Update);
        assert_eq!(deltas.deltas[2].order.side, OrderSide::Sell);
        assert_eq!(deltas.deltas[2].flags, RecordFlag::F_LAST as u8);
        assert_eq!(
            deltas.ts_event,
            UnixNanos::from(1_700_000_000_000_000_000u64)
        );
    }

    // A diff with no levels yields no deltas batch at all.
    #[rstest]
    fn test_parse_depth_diff_empty_returns_none() {
        let instrument = sample_instrument();
        let event = DepthDiffStreamEvent {
            event_time_us: 1_700_000_000_000_000,
            first_book_update_id: 100,
            last_book_update_id: 101,
            price_exponent: -2,
            qty_exponent: -4,
            bids: vec![],
            asks: vec![],
            symbol: Ustr::from("ETHUSDT"),
        };

        let deltas = parse_depth_diff(&event, &instrument);

        assert!(deltas.is_none());
    }
}