Skip to main content

nautilus_binance/futures/websocket/streams/
parse_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing utilities for Binance Futures WebSocket JSON messages.
17
18use nautilus_core::nanos::UnixNanos;
19use nautilus_model::{
20    data::{
21        Bar, BarSpecification, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate,
22        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
23    },
24    enums::{
25        AggregationSource, AggressorSide, BarAggregation, BookAction, OrderSide, PriceType,
26        RecordFlag,
27    },
28    identifiers::TradeId,
29    instruments::{Instrument, InstrumentAny},
30    types::{Price, Quantity},
31};
32use rust_decimal::{Decimal, prelude::FromPrimitive};
33use ustr::Ustr;
34
35use super::{
36    error::{BinanceWsError, BinanceWsResult},
37    messages::{
38        BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
39        BinanceFuturesKlineMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesTradeMsg,
40    },
41};
42use crate::common::enums::{BinanceKlineInterval, BinanceWsEventType};
43
44/// Parses an aggregate trade message into a `TradeTick`.
45///
46/// # Errors
47///
48/// Returns an error if parsing fails.
49pub fn parse_agg_trade(
50    msg: &BinanceFuturesAggTradeMsg,
51    instrument: &InstrumentAny,
52    ts_init: UnixNanos,
53) -> BinanceWsResult<TradeTick> {
54    let instrument_id = instrument.id();
55    let price_precision = instrument.price_precision();
56    let size_precision = instrument.size_precision();
57
58    let price = msg
59        .price
60        .parse::<f64>()
61        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
62    let size = msg
63        .quantity
64        .parse::<f64>()
65        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
66
67    let aggressor_side = if msg.is_buyer_maker {
68        AggressorSide::Seller
69    } else {
70        AggressorSide::Buyer
71    };
72
73    let ts_event = UnixNanos::from_millis(msg.trade_time as u64);
74    let trade_id = TradeId::new(msg.agg_trade_id.to_string());
75
76    Ok(TradeTick::new(
77        instrument_id,
78        Price::new(price, price_precision),
79        Quantity::new(size, size_precision),
80        aggressor_side,
81        trade_id,
82        ts_event,
83        ts_init,
84    ))
85}
86
87/// Parses a trade message into a `TradeTick`.
88///
89/// # Errors
90///
91/// Returns an error if parsing fails.
92pub fn parse_trade(
93    msg: &BinanceFuturesTradeMsg,
94    instrument: &InstrumentAny,
95    ts_init: UnixNanos,
96) -> BinanceWsResult<TradeTick> {
97    let instrument_id = instrument.id();
98    let price_precision = instrument.price_precision();
99    let size_precision = instrument.size_precision();
100
101    let price = msg
102        .price
103        .parse::<f64>()
104        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
105    let size = msg
106        .quantity
107        .parse::<f64>()
108        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
109
110    let aggressor_side = if msg.is_buyer_maker {
111        AggressorSide::Seller
112    } else {
113        AggressorSide::Buyer
114    };
115
116    let ts_event = UnixNanos::from_millis(msg.trade_time as u64);
117    let trade_id = TradeId::new(msg.trade_id.to_string());
118
119    Ok(TradeTick::new(
120        instrument_id,
121        Price::new(price, price_precision),
122        Quantity::new(size, size_precision),
123        aggressor_side,
124        trade_id,
125        ts_event,
126        ts_init,
127    ))
128}
129
130/// Parses a book ticker message into a `QuoteTick`.
131///
132/// # Errors
133///
134/// Returns an error if parsing fails.
135pub fn parse_book_ticker(
136    msg: &BinanceFuturesBookTickerMsg,
137    instrument: &InstrumentAny,
138    ts_init: UnixNanos,
139) -> BinanceWsResult<QuoteTick> {
140    let instrument_id = instrument.id();
141    let price_precision = instrument.price_precision();
142    let size_precision = instrument.size_precision();
143
144    let bid_price = msg
145        .best_bid_price
146        .parse::<f64>()
147        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
148    let bid_size = msg
149        .best_bid_qty
150        .parse::<f64>()
151        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
152    let ask_price = msg
153        .best_ask_price
154        .parse::<f64>()
155        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
156    let ask_size = msg
157        .best_ask_qty
158        .parse::<f64>()
159        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
160
161    let ts_event = UnixNanos::from_millis(msg.transaction_time as u64);
162
163    Ok(QuoteTick::new(
164        instrument_id,
165        Price::new(bid_price, price_precision),
166        Price::new(ask_price, price_precision),
167        Quantity::new(bid_size, size_precision),
168        Quantity::new(ask_size, size_precision),
169        ts_event,
170        ts_init,
171    ))
172}
173
174/// Parses a depth update message into `OrderBookDeltas`.
175///
176/// # Errors
177///
178/// Returns an error if parsing fails.
179pub fn parse_depth_update(
180    msg: &BinanceFuturesDepthUpdateMsg,
181    instrument: &InstrumentAny,
182    ts_init: UnixNanos,
183) -> BinanceWsResult<OrderBookDeltas> {
184    let instrument_id = instrument.id();
185    let price_precision = instrument.price_precision();
186    let size_precision = instrument.size_precision();
187
188    let ts_event = UnixNanos::from_millis(msg.transaction_time as u64);
189
190    let mut deltas = Vec::with_capacity(msg.bids.len() + msg.asks.len());
191
192    // Process bids
193    for (i, bid) in msg.bids.iter().enumerate() {
194        let price = bid[0]
195            .parse::<f64>()
196            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
197        let size = bid[1]
198            .parse::<f64>()
199            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
200
201        let action = if size == 0.0 {
202            BookAction::Delete
203        } else {
204            BookAction::Update
205        };
206
207        let is_last = i == msg.bids.len() - 1 && msg.asks.is_empty();
208        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
209
210        let order = BookOrder::new(
211            OrderSide::Buy,
212            Price::new(price, price_precision),
213            Quantity::new(size, size_precision),
214            0,
215        );
216
217        deltas.push(OrderBookDelta::new(
218            instrument_id,
219            action,
220            order,
221            flags,
222            msg.final_update_id,
223            ts_event,
224            ts_init,
225        ));
226    }
227
228    // Process asks
229    for (i, ask) in msg.asks.iter().enumerate() {
230        let price = ask[0]
231            .parse::<f64>()
232            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
233        let size = ask[1]
234            .parse::<f64>()
235            .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
236
237        let action = if size == 0.0 {
238            BookAction::Delete
239        } else {
240            BookAction::Update
241        };
242
243        let is_last = i == msg.asks.len() - 1;
244        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
245
246        let order = BookOrder::new(
247            OrderSide::Sell,
248            Price::new(price, price_precision),
249            Quantity::new(size, size_precision),
250            0,
251        );
252
253        deltas.push(OrderBookDelta::new(
254            instrument_id,
255            action,
256            order,
257            flags,
258            msg.final_update_id,
259            ts_event,
260            ts_init,
261        ));
262    }
263
264    Ok(OrderBookDeltas::new(instrument_id, deltas))
265}
266
267/// Parses a mark price message into `MarkPriceUpdate`, `IndexPriceUpdate`, and `FundingRateUpdate`.
268///
269/// # Errors
270///
271/// Returns an error if parsing fails.
272pub fn parse_mark_price(
273    msg: &BinanceFuturesMarkPriceMsg,
274    instrument: &InstrumentAny,
275    ts_init: UnixNanos,
276) -> BinanceWsResult<(MarkPriceUpdate, IndexPriceUpdate, FundingRateUpdate)> {
277    let instrument_id = instrument.id();
278    let price_precision = instrument.price_precision();
279
280    let mark_price = msg
281        .mark_price
282        .parse::<f64>()
283        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
284    let index_price = msg
285        .index_price
286        .parse::<f64>()
287        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
288    let funding_rate = msg
289        .funding_rate
290        .parse::<f64>()
291        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
292
293    let ts_event = UnixNanos::from_millis(msg.event_time as u64);
294    let next_funding_ns = if msg.next_funding_time > 0 {
295        Some(UnixNanos::from_millis(msg.next_funding_time as u64))
296    } else {
297        None
298    };
299
300    let mark_update = MarkPriceUpdate::new(
301        instrument_id,
302        Price::new(mark_price, price_precision),
303        ts_event,
304        ts_init,
305    );
306
307    let index_update = IndexPriceUpdate::new(
308        instrument_id,
309        Price::new(index_price, price_precision),
310        ts_event,
311        ts_init,
312    );
313
314    let funding_update = FundingRateUpdate::new(
315        instrument_id,
316        Decimal::from_f64(funding_rate).unwrap_or_default(),
317        None, // Binance does not provide the funding interval through WebSocket API
318        next_funding_ns,
319        ts_event,
320        ts_init,
321    );
322
323    Ok((mark_update, index_update, funding_update))
324}
325
326/// Converts a Binance kline interval to a Nautilus `BarSpecification`.
327fn interval_to_bar_spec(interval: BinanceKlineInterval) -> BarSpecification {
328    match interval {
329        BinanceKlineInterval::Second1 => {
330            BarSpecification::new(1, BarAggregation::Second, PriceType::Last)
331        }
332        BinanceKlineInterval::Minute1 => {
333            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last)
334        }
335        BinanceKlineInterval::Minute3 => {
336            BarSpecification::new(3, BarAggregation::Minute, PriceType::Last)
337        }
338        BinanceKlineInterval::Minute5 => {
339            BarSpecification::new(5, BarAggregation::Minute, PriceType::Last)
340        }
341        BinanceKlineInterval::Minute15 => {
342            BarSpecification::new(15, BarAggregation::Minute, PriceType::Last)
343        }
344        BinanceKlineInterval::Minute30 => {
345            BarSpecification::new(30, BarAggregation::Minute, PriceType::Last)
346        }
347        BinanceKlineInterval::Hour1 => {
348            BarSpecification::new(1, BarAggregation::Hour, PriceType::Last)
349        }
350        BinanceKlineInterval::Hour2 => {
351            BarSpecification::new(2, BarAggregation::Hour, PriceType::Last)
352        }
353        BinanceKlineInterval::Hour4 => {
354            BarSpecification::new(4, BarAggregation::Hour, PriceType::Last)
355        }
356        BinanceKlineInterval::Hour6 => {
357            BarSpecification::new(6, BarAggregation::Hour, PriceType::Last)
358        }
359        BinanceKlineInterval::Hour8 => {
360            BarSpecification::new(8, BarAggregation::Hour, PriceType::Last)
361        }
362        BinanceKlineInterval::Hour12 => {
363            BarSpecification::new(12, BarAggregation::Hour, PriceType::Last)
364        }
365        BinanceKlineInterval::Day1 => {
366            BarSpecification::new(1, BarAggregation::Day, PriceType::Last)
367        }
368        BinanceKlineInterval::Day3 => {
369            BarSpecification::new(3, BarAggregation::Day, PriceType::Last)
370        }
371        BinanceKlineInterval::Week1 => {
372            BarSpecification::new(1, BarAggregation::Week, PriceType::Last)
373        }
374        BinanceKlineInterval::Month1 => {
375            BarSpecification::new(1, BarAggregation::Month, PriceType::Last)
376        }
377    }
378}
379
380/// Parses a kline message into a `Bar`.
381///
382/// Returns `None` if the kline is not closed yet.
383///
384/// # Errors
385///
386/// Returns an error if parsing fails.
387pub fn parse_kline(
388    msg: &BinanceFuturesKlineMsg,
389    instrument: &InstrumentAny,
390    ts_init: UnixNanos,
391) -> BinanceWsResult<Option<Bar>> {
392    // Only emit bars when the kline is closed
393    if !msg.kline.is_closed {
394        return Ok(None);
395    }
396
397    let instrument_id = instrument.id();
398    let price_precision = instrument.price_precision();
399    let size_precision = instrument.size_precision();
400
401    let spec = interval_to_bar_spec(msg.kline.interval);
402    let bar_type = BarType::new(instrument_id, spec, AggregationSource::External);
403
404    let open = msg
405        .kline
406        .open
407        .parse::<f64>()
408        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
409    let high = msg
410        .kline
411        .high
412        .parse::<f64>()
413        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
414    let low = msg
415        .kline
416        .low
417        .parse::<f64>()
418        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
419    let close = msg
420        .kline
421        .close
422        .parse::<f64>()
423        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
424    let volume = msg
425        .kline
426        .volume
427        .parse::<f64>()
428        .map_err(|e| BinanceWsError::ParseError(e.to_string()))?;
429
430    // Use the kline close time as the event timestamp
431    let ts_event = UnixNanos::from_millis(msg.kline.close_time as u64);
432
433    let bar = Bar::new(
434        bar_type,
435        Price::new(open, price_precision),
436        Price::new(high, price_precision),
437        Price::new(low, price_precision),
438        Price::new(close, price_precision),
439        Quantity::new(volume, size_precision),
440        ts_event,
441        ts_init,
442    );
443
444    Ok(Some(bar))
445}
446
447/// Extracts the symbol from a raw JSON message.
448pub fn extract_symbol(json: &serde_json::Value) -> Option<Ustr> {
449    json.get("s").and_then(|v| v.as_str()).map(Ustr::from)
450}
451
452/// Extracts the event type from a raw JSON message.
453pub fn extract_event_type(json: &serde_json::Value) -> Option<BinanceWsEventType> {
454    json.get("e")
455        .and_then(|v| serde_json::from_value(v.clone()).ok())
456}
457
458#[cfg(test)]
459mod tests {
460    use rstest::rstest;
461    use serde::de::DeserializeOwned;
462    use serde_json::json;
463
464    use super::*;
465    use crate::{
466        common::{
467            enums::{BinanceOrderStatus, BinanceSide, BinanceTradingStatus},
468            parse::parse_usdm_instrument,
469            testing::{load_fixture_string, load_json_fixture},
470        },
471        futures::{
472            http::models::BinanceFuturesUsdSymbol,
473            websocket::streams::messages::{BinanceFuturesLiquidationMsg, BinanceFuturesTickerMsg},
474        },
475    };
476
477    const PRICE_PRECISION: u8 = 8;
478    const SIZE_PRECISION: u8 = 3;
479
480    fn sample_futures_symbol() -> BinanceFuturesUsdSymbol {
481        BinanceFuturesUsdSymbol {
482            symbol: Ustr::from("BTCUSDT"),
483            pair: Ustr::from("BTCUSDT"),
484            contract_type: "PERPETUAL".to_string(),
485            delivery_date: 4_133_404_800_000,
486            onboard_date: 1_569_398_400_000,
487            status: BinanceTradingStatus::Trading,
488            maint_margin_percent: "2.5000".to_string(),
489            required_margin_percent: "5.0000".to_string(),
490            base_asset: Ustr::from("BTC"),
491            quote_asset: Ustr::from("USDT"),
492            margin_asset: Ustr::from("USDT"),
493            price_precision: PRICE_PRECISION as i32,
494            quantity_precision: SIZE_PRECISION as i32,
495            base_asset_precision: 8,
496            quote_precision: 8,
497            underlying_type: Some("COIN".to_string()),
498            underlying_sub_type: vec!["PoW".to_string()],
499            settle_plan: None,
500            trigger_protect: Some("0.0500".to_string()),
501            liquidation_fee: Some("0.012500".to_string()),
502            market_take_bound: Some("0.05".to_string()),
503            order_types: vec!["LIMIT".to_string(), "MARKET".to_string()],
504            time_in_force: vec!["GTC".to_string(), "IOC".to_string()],
505            filters: vec![
506                json!({
507                    "filterType": "PRICE_FILTER",
508                    "tickSize": "0.00000001",
509                    "maxPrice": "1000000",
510                    "minPrice": "0.00000001"
511                }),
512                json!({
513                    "filterType": "LOT_SIZE",
514                    "stepSize": "0.001",
515                    "maxQty": "1000",
516                    "minQty": "0.001"
517                }),
518            ],
519        }
520    }
521
522    fn sample_instrument() -> InstrumentAny {
523        let ts = UnixNanos::from(1_700_000_000_000_000_000u64);
524        parse_usdm_instrument(&sample_futures_symbol(), ts, ts).unwrap()
525    }
526
527    fn load_market_fixture<T: DeserializeOwned>(filename: &str) -> T {
528        let path = format!("futures/market_data_json/{filename}");
529        serde_json::from_str(&load_fixture_string(&path))
530            .unwrap_or_else(|e| panic!("Failed to parse fixture {path}: {e}"))
531    }
532
533    #[rstest]
534    fn test_parse_agg_trade() {
535        let instrument = sample_instrument();
536        let msg: BinanceFuturesAggTradeMsg = load_market_fixture("agg_trade_stream.json");
537        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
538
539        let trade = parse_agg_trade(&msg, &instrument, ts_init).unwrap();
540
541        assert_eq!(trade.instrument_id, instrument.id());
542        assert_eq!(trade.price, Price::new(0.001, PRICE_PRECISION));
543        assert_eq!(trade.size, Quantity::new(100.0, SIZE_PRECISION));
544        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
545        assert_eq!(trade.trade_id, TradeId::new("5933014"));
546        assert_eq!(trade.ts_event, UnixNanos::from(123_456_785_000_000u64));
547        assert_eq!(trade.ts_init, ts_init);
548    }
549
550    #[rstest]
551    fn test_parse_trade() {
552        let instrument = sample_instrument();
553        let msg: BinanceFuturesTradeMsg = load_market_fixture("trade_stream.json");
554        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
555
556        let trade = parse_trade(&msg, &instrument, ts_init).unwrap();
557
558        assert_eq!(trade.instrument_id, instrument.id());
559        assert_eq!(trade.price, Price::new(0.001, PRICE_PRECISION));
560        assert_eq!(trade.size, Quantity::new(100.0, SIZE_PRECISION));
561        assert_eq!(trade.aggressor_side, AggressorSide::Seller);
562        assert_eq!(trade.trade_id, TradeId::new("5933014"));
563        assert_eq!(trade.ts_event, UnixNanos::from(123_456_785_000_000u64));
564        assert_eq!(trade.ts_init, ts_init);
565    }
566
567    #[rstest]
568    fn test_parse_book_ticker() {
569        let instrument = sample_instrument();
570        let msg: BinanceFuturesBookTickerMsg = load_market_fixture("book_ticker_stream.json");
571        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
572
573        let quote = parse_book_ticker(&msg, &instrument, ts_init).unwrap();
574
575        assert_eq!(quote.instrument_id, instrument.id());
576        assert_eq!(quote.bid_price, Price::new(25.3519, PRICE_PRECISION));
577        assert_eq!(quote.ask_price, Price::new(25.3652, PRICE_PRECISION));
578        assert_eq!(quote.bid_size, Quantity::new(31.21, SIZE_PRECISION));
579        assert_eq!(quote.ask_size, Quantity::new(40.66, SIZE_PRECISION));
580        assert_eq!(
581            quote.ts_event,
582            UnixNanos::from(1_568_014_460_891_000_000u64)
583        );
584        assert_eq!(quote.ts_init, ts_init);
585    }
586
587    #[rstest]
588    fn test_parse_depth_update() {
589        let instrument = sample_instrument();
590        let msg: BinanceFuturesDepthUpdateMsg = load_market_fixture("depth_update_stream.json");
591        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
592
593        let deltas = parse_depth_update(&msg, &instrument, ts_init).unwrap();
594
595        assert_eq!(deltas.instrument_id, instrument.id());
596        assert_eq!(deltas.deltas.len(), 2);
597        assert_eq!(deltas.sequence, 160);
598        assert_eq!(deltas.ts_event, UnixNanos::from(123_456_788_000_000u64));
599        assert_eq!(deltas.ts_init, ts_init);
600        assert_eq!(deltas.deltas[0].action, BookAction::Update);
601        assert_eq!(deltas.deltas[0].order.side, OrderSide::Buy);
602        assert_eq!(
603            deltas.deltas[0].order.price,
604            Price::new(0.0024, PRICE_PRECISION)
605        );
606        assert_eq!(
607            deltas.deltas[0].order.size,
608            Quantity::new(10.0, SIZE_PRECISION)
609        );
610        assert_eq!(deltas.deltas[1].action, BookAction::Update);
611        assert_eq!(deltas.deltas[1].order.side, OrderSide::Sell);
612        assert_eq!(
613            deltas.deltas[1].order.price,
614            Price::new(0.0026, PRICE_PRECISION)
615        );
616        assert_eq!(
617            deltas.deltas[1].order.size,
618            Quantity::new(100.0, SIZE_PRECISION)
619        );
620        assert_eq!(deltas.deltas[1].flags, RecordFlag::F_LAST as u8);
621    }
622
623    #[rstest]
624    fn test_parse_mark_price() {
625        let instrument = sample_instrument();
626        let msg: BinanceFuturesMarkPriceMsg = load_market_fixture("mark_price_stream.json");
627        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
628
629        let (mark, index, funding) = parse_mark_price(&msg, &instrument, ts_init).unwrap();
630
631        assert_eq!(mark.instrument_id, instrument.id());
632        assert_eq!(mark.value, Price::new(11794.15, PRICE_PRECISION));
633        assert_eq!(index.value, Price::new(11784.62659091, PRICE_PRECISION));
634        assert_eq!(mark.ts_event, UnixNanos::from(1_562_305_380_000_000_000u64));
635        assert_eq!(funding.instrument_id, instrument.id());
636        assert_eq!(funding.rate.to_string(), "0.00038167");
637        assert_eq!(
638            funding.next_funding_ns,
639            Some(UnixNanos::from(1_562_306_400_000_000_000u64))
640        );
641        assert_eq!(
642            funding.ts_event,
643            UnixNanos::from(1_562_305_380_000_000_000u64)
644        );
645        assert_eq!(funding.ts_init, ts_init);
646    }
647
648    #[rstest]
649    fn test_parse_kline_closed() {
650        let instrument = sample_instrument();
651        let msg: BinanceFuturesKlineMsg = load_market_fixture("kline_stream_closed.json");
652        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
653
654        let bar = parse_kline(&msg, &instrument, ts_init).unwrap().unwrap();
655
656        assert_eq!(bar.bar_type.instrument_id(), instrument.id());
657        assert_eq!(bar.open, Price::new(0.001, PRICE_PRECISION));
658        assert_eq!(bar.high, Price::new(0.0025, PRICE_PRECISION));
659        assert_eq!(bar.low, Price::new(0.001, PRICE_PRECISION));
660        assert_eq!(bar.close, Price::new(0.002, PRICE_PRECISION));
661        assert_eq!(bar.volume, Quantity::new(1000.0, SIZE_PRECISION));
662        assert_eq!(bar.ts_event, UnixNanos::from(1_638_747_719_999_000_000u64));
663        assert_eq!(bar.ts_init, ts_init);
664    }
665
666    #[rstest]
667    fn test_parse_kline_open_returns_none() {
668        let instrument = sample_instrument();
669        let msg: BinanceFuturesKlineMsg = load_market_fixture("kline_stream_open.json");
670
671        let bar = parse_kline(&msg, &instrument, UnixNanos::default()).unwrap();
672
673        assert!(bar.is_none());
674    }
675
676    #[rstest]
677    fn test_parse_mark_price_funding_rate_fields() {
678        let instrument = sample_instrument();
679        let msg: BinanceFuturesMarkPriceMsg = load_market_fixture("mark_price_stream.json");
680        let ts_init = UnixNanos::from(1_700_000_001_000_000_000u64);
681
682        let (_mark, _index, funding) = parse_mark_price(&msg, &instrument, ts_init).unwrap();
683
684        assert_eq!(funding.instrument_id, instrument.id());
685        assert_eq!(funding.rate.to_string(), "0.00038167");
686        assert!(funding.interval.is_none());
687        assert_eq!(
688            funding.next_funding_ns,
689            Some(UnixNanos::from(1_562_306_400_000_000_000u64))
690        );
691        assert_eq!(
692            funding.ts_event,
693            UnixNanos::from(1_562_305_380_000_000_000u64)
694        );
695        assert_eq!(funding.ts_init, ts_init);
696    }
697
698    #[rstest]
699    fn test_deserialize_liquidation_msg() {
700        let msg: BinanceFuturesLiquidationMsg = load_market_fixture("liquidation_stream.json");
701
702        assert_eq!(msg.event_type, "forceOrder");
703        assert_eq!(msg.event_time, 1_568_014_460_893);
704        assert_eq!(msg.order.symbol, Ustr::from("BTCUSDT"));
705        assert_eq!(msg.order.side, BinanceSide::Sell);
706        assert_eq!(msg.order.original_qty, "0.014");
707        assert_eq!(msg.order.average_price, "9910.12345678");
708        assert_eq!(msg.order.status, BinanceOrderStatus::Filled);
709        assert_eq!(msg.order.accumulated_qty, "0.014");
710        assert_eq!(msg.order.trade_time, 1_568_014_460_893);
711    }
712
713    #[rstest]
714    fn test_deserialize_ticker_msg() {
715        let msg: BinanceFuturesTickerMsg = load_market_fixture("ticker_stream.json");
716
717        assert_eq!(msg.event_type, "24hrTicker");
718        assert_eq!(msg.symbol, Ustr::from("BTCUSDT"));
719        assert_eq!(msg.price_change, "-131.40000000");
720        assert_eq!(msg.price_change_percent, "-0.786");
721        assert_eq!(msg.weighted_avg_price, "16628.97377498");
722        assert_eq!(msg.last_price, "16584.60000000");
723        assert_eq!(msg.open_price, "16716.00000000");
724        assert_eq!(msg.high_price, "16764.89000000");
725        assert_eq!(msg.low_price, "16456.51000000");
726        assert_eq!(msg.volume, "122474.816");
727        assert_eq!(msg.quote_volume, "2036102085.69746400");
728        assert_eq!(msg.num_trades, 142853);
729    }
730
731    #[rstest]
732    fn test_extract_symbol() {
733        let json = load_json_fixture("futures/market_data_json/book_ticker_stream.json");
734
735        let symbol = extract_symbol(&json);
736
737        assert_eq!(symbol, Some(Ustr::from("BNBUSDT")));
738    }
739
740    #[rstest]
741    fn test_extract_event_type() {
742        let json = load_json_fixture("futures/market_data_json/mark_price_stream.json");
743
744        let event_type = extract_event_type(&json);
745
746        assert_eq!(event_type, Some(BinanceWsEventType::MarkPriceUpdate));
747    }
748
749    #[rstest]
750    fn test_extract_event_type_force_order() {
751        let json = load_json_fixture("futures/market_data_json/liquidation_stream.json");
752
753        let event_type = extract_event_type(&json);
754
755        assert_eq!(event_type, Some(BinanceWsEventType::ForceOrder));
756    }
757
758    #[rstest]
759    fn test_extract_event_type_ticker() {
760        let json = load_json_fixture("futures/market_data_json/ticker_stream.json");
761
762        let event_type = extract_event_type(&json);
763
764        assert_eq!(event_type, Some(BinanceWsEventType::Ticker24Hr));
765    }
766}