1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
35 SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
36 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
37 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38 subscribe::SubscribeInstrumentStatus, unsubscribe::UnsubscribeInstrumentStatus,
39 },
40 },
41};
42use nautilus_core::{
43 AtomicMap, MUTEX_POISONED,
44 datetime::{NANOSECONDS_IN_MILLISECOND, datetime_to_unix_nanos},
45 nanos::UnixNanos,
46 time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49 data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
50 enums::{BookAction, BookType, MarketStatusAction, OrderSide, RecordFlag},
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53 types::{Price, Quantity},
54};
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60 common::{
61 consts::{BINANCE_BOOK_DEPTHS, BINANCE_VENUE},
62 enums::{BinanceEnvironment, BinanceProductType},
63 parse::bar_spec_to_binance_interval,
64 status::diff_and_emit_statuses,
65 symbol::format_binance_stream_symbol,
66 urls::{get_usdm_ws_route_base_url, get_ws_public_base_url},
67 },
68 config::BinanceDataClientConfig,
69 futures::{
70 http::{
71 client::BinanceFuturesHttpClient, models::BinanceOrderBook, query::BinanceDepthParams,
72 },
73 websocket::streams::{
74 client::BinanceFuturesWebSocketClient,
75 messages::BinanceFuturesWsStreamsMessage,
76 parse_data::{
77 parse_agg_trade, parse_book_ticker, parse_depth_update, parse_kline,
78 parse_mark_price, parse_trade,
79 },
80 },
81 },
82};
83
84#[derive(Debug, Clone)]
85struct BufferedDepthUpdate {
86 deltas: OrderBookDeltas,
87 first_update_id: u64,
88 final_update_id: u64,
89 prev_final_update_id: u64,
90}
91
92#[derive(Debug, Clone)]
93struct BookBuffer {
94 updates: Vec<BufferedDepthUpdate>,
95 epoch: u64,
96}
97
98impl BookBuffer {
99 fn new(epoch: u64) -> Self {
100 Self {
101 updates: Vec::new(),
102 epoch,
103 }
104 }
105}
106
107#[derive(Debug)]
109pub struct BinanceFuturesDataClient {
110 clock: &'static AtomicTime,
111 client_id: ClientId,
112 config: BinanceDataClientConfig,
113 product_type: BinanceProductType,
114 http_client: BinanceFuturesHttpClient,
115 ws_client: BinanceFuturesWebSocketClient,
116 ws_public_client: BinanceFuturesWebSocketClient,
117 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
118 is_connected: AtomicBool,
119 cancellation_token: CancellationToken,
120 tasks: Vec<JoinHandle<()>>,
121 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
122 status_cache: Arc<AtomicMap<InstrumentId, MarketStatusAction>>,
123 book_buffers: Arc<AtomicMap<InstrumentId, BookBuffer>>,
124 book_subscriptions: Arc<AtomicMap<InstrumentId, u32>>,
125 mark_price_refs: Arc<AtomicMap<InstrumentId, u32>>,
126 book_epoch: Arc<RwLock<u64>>,
127}
128
129impl BinanceFuturesDataClient {
130 pub fn new(
137 client_id: ClientId,
138 config: BinanceDataClientConfig,
139 product_type: BinanceProductType,
140 ) -> anyhow::Result<Self> {
141 match product_type {
142 BinanceProductType::UsdM | BinanceProductType::CoinM => {}
143 _ => {
144 anyhow::bail!(
145 "BinanceFuturesDataClient requires UsdM or CoinM product type, was {product_type:?}"
146 );
147 }
148 }
149
150 let clock = get_atomic_clock_realtime();
151 let data_sender = get_data_event_sender();
152
153 let http_client = BinanceFuturesHttpClient::new(
154 product_type,
155 config.environment,
156 clock,
157 config.api_key.clone(),
158 config.api_secret.clone(),
159 config.base_url_http.clone(),
160 None, None, None, false, )?;
165
166 let market_url = config.base_url_ws.clone().map(|url| {
167 if product_type == BinanceProductType::UsdM
168 && config.environment == BinanceEnvironment::Mainnet
169 {
170 get_usdm_ws_route_base_url(&url, "market")
171 } else {
172 url
173 }
174 });
175
176 let ws_client = BinanceFuturesWebSocketClient::new(
177 product_type,
178 config.environment,
179 config.api_key.clone(),
180 config.api_secret.clone(),
181 market_url,
182 Some(20), config.transport_backend,
184 )?;
185
186 let public_url = config.base_url_ws.clone().map_or_else(
187 || get_ws_public_base_url(product_type, config.environment).to_string(),
188 |url| {
189 if product_type == BinanceProductType::UsdM
190 && config.environment == BinanceEnvironment::Mainnet
191 {
192 get_usdm_ws_route_base_url(&url, "public")
193 } else {
194 url
195 }
196 },
197 );
198 let ws_public_client = BinanceFuturesWebSocketClient::new(
199 product_type,
200 config.environment,
201 None,
202 None,
203 Some(public_url),
204 Some(20),
205 config.transport_backend,
206 )?;
207
208 Ok(Self {
209 clock,
210 client_id,
211 config,
212 product_type,
213 http_client,
214 ws_client,
215 ws_public_client,
216 data_sender,
217 is_connected: AtomicBool::new(false),
218 cancellation_token: CancellationToken::new(),
219 tasks: Vec::new(),
220 instruments: Arc::new(AtomicMap::new()),
221 status_cache: Arc::new(AtomicMap::new()),
222 book_buffers: Arc::new(AtomicMap::new()),
223 book_subscriptions: Arc::new(AtomicMap::new()),
224 mark_price_refs: Arc::new(AtomicMap::new()),
225 book_epoch: Arc::new(RwLock::new(0)),
226 })
227 }
228
229 fn venue(&self) -> Venue {
230 *BINANCE_VENUE
231 }
232
233 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
234 if let Err(e) = sender.send(DataEvent::Data(data)) {
235 log::error!("Failed to emit data event: {e}");
236 }
237 }
238
239 fn spawn_ws<F>(&self, fut: F, context: &'static str)
240 where
241 F: Future<Output = anyhow::Result<()>> + Send + 'static,
242 {
243 get_runtime().spawn(async move {
244 if let Err(e) = fut.await {
245 log::error!("{context}: {e:?}");
246 }
247 });
248 }
249
250 #[expect(clippy::too_many_arguments)]
251 fn handle_ws_message(
252 msg: BinanceFuturesWsStreamsMessage,
253 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
254 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
255 ws_instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
256 book_buffers: &Arc<AtomicMap<InstrumentId, BookBuffer>>,
257 book_subscriptions: &Arc<AtomicMap<InstrumentId, u32>>,
258 book_epoch: &Arc<RwLock<u64>>,
259 http_client: &BinanceFuturesHttpClient,
260 clock: &'static AtomicTime,
261 ) {
262 let ts_init = clock.get_time_ns();
263 let cache = ws_instruments.load();
264
265 match msg {
266 BinanceFuturesWsStreamsMessage::AggTrade(ref trade_msg) => {
267 if let Some(instrument) = cache.get(&trade_msg.symbol) {
268 match parse_agg_trade(trade_msg, instrument, ts_init) {
269 Ok(trade) => Self::send_data(data_sender, Data::Trade(trade)),
270 Err(e) => log::warn!("Failed to parse aggregate trade: {e}"),
271 }
272 }
273 }
274 BinanceFuturesWsStreamsMessage::Trade(ref trade_msg) => {
275 if let Some(instrument) = cache.get(&trade_msg.symbol) {
276 match parse_trade(trade_msg, instrument, ts_init) {
277 Ok(trade) => Self::send_data(data_sender, Data::Trade(trade)),
278 Err(e) => log::warn!("Failed to parse trade: {e}"),
279 }
280 }
281 }
282 BinanceFuturesWsStreamsMessage::BookTicker(ref ticker_msg) => {
283 if let Some(instrument) = cache.get(&ticker_msg.symbol) {
284 match parse_book_ticker(ticker_msg, instrument, ts_init) {
285 Ok(quote) => Self::send_data(data_sender, Data::Quote(quote)),
286 Err(e) => log::warn!("Failed to parse book ticker: {e}"),
287 }
288 }
289 }
290 BinanceFuturesWsStreamsMessage::DepthUpdate(ref depth_msg) => {
291 if let Some(instrument) = cache.get(&depth_msg.symbol) {
292 match parse_depth_update(depth_msg, instrument, ts_init) {
293 Ok(deltas) => {
294 let instrument_id = deltas.instrument_id;
295 let final_update_id = deltas.sequence;
296 let first_update_id = depth_msg.first_update_id;
297 let prev_final_update_id = depth_msg.prev_final_update_id;
298
299 if book_buffers.contains_key(&instrument_id) {
300 let mut was_buffered = false;
301 book_buffers.rcu(|m| {
302 was_buffered = false;
303
304 if let Some(buffer) = m.get_mut(&instrument_id) {
305 buffer.updates.push(BufferedDepthUpdate {
306 deltas: deltas.clone(),
307 first_update_id,
308 final_update_id,
309 prev_final_update_id,
310 });
311 was_buffered = true;
312 }
313 });
314
315 if was_buffered {
316 return;
317 }
318 }
319
320 Self::send_data(
321 data_sender,
322 Data::Deltas(OrderBookDeltas_API::new(deltas)),
323 );
324 }
325 Err(e) => log::warn!("Failed to parse depth update: {e}"),
326 }
327 }
328 }
329 BinanceFuturesWsStreamsMessage::MarkPrice(ref mark_msg) => {
330 if let Some(instrument) = cache.get(&mark_msg.symbol) {
331 match parse_mark_price(mark_msg, instrument, ts_init) {
332 Ok((mark_update, index_update, funding_update)) => {
333 Self::send_data(data_sender, Data::MarkPriceUpdate(mark_update));
334 Self::send_data(data_sender, Data::IndexPriceUpdate(index_update));
335 if let Err(e) = data_sender.send(DataEvent::FundingRate(funding_update))
336 {
337 log::error!("Failed to emit funding rate: {e}");
338 }
339 }
340 Err(e) => log::warn!("Failed to parse mark price: {e}"),
341 }
342 }
343 }
344 BinanceFuturesWsStreamsMessage::Kline(ref kline_msg) => {
345 if let Some(instrument) = cache.get(&kline_msg.symbol) {
346 match parse_kline(kline_msg, instrument, ts_init) {
347 Ok(Some(bar)) => Self::send_data(data_sender, Data::Bar(bar)),
348 Ok(None) => {} Err(e) => log::warn!("Failed to parse kline: {e}"),
350 }
351 }
352 }
353 BinanceFuturesWsStreamsMessage::ForceOrder(ref liq_msg) => {
354 log::info!(
355 "Liquidation: {} {:?} {:?} qty={} at price={}",
356 liq_msg.order.symbol,
357 liq_msg.order.side,
358 liq_msg.order.status,
359 liq_msg.order.original_qty,
360 liq_msg.order.average_price,
361 );
362 }
363 BinanceFuturesWsStreamsMessage::Ticker(ref ticker_msg) => {
364 log::debug!(
365 "Ticker: {} last={} vol={}",
366 ticker_msg.symbol,
367 ticker_msg.last_price,
368 ticker_msg.volume,
369 );
370 }
371 BinanceFuturesWsStreamsMessage::AccountUpdate(_)
373 | BinanceFuturesWsStreamsMessage::OrderUpdate(_)
374 | BinanceFuturesWsStreamsMessage::TradeLite(_)
375 | BinanceFuturesWsStreamsMessage::AlgoUpdate(_)
376 | BinanceFuturesWsStreamsMessage::MarginCall(_)
377 | BinanceFuturesWsStreamsMessage::AccountConfigUpdate(_)
378 | BinanceFuturesWsStreamsMessage::ListenKeyExpired => {}
379 BinanceFuturesWsStreamsMessage::Error(e) => {
380 log::error!(
381 "Binance Futures WebSocket error: code={}, msg={}",
382 e.code,
383 e.msg
384 );
385 }
386 BinanceFuturesWsStreamsMessage::Reconnected => {
387 log::info!("WebSocket reconnected, rebuilding order book snapshots");
388
389 let epoch = {
390 let mut guard = book_epoch.write().expect(MUTEX_POISONED);
391 *guard = guard.wrapping_add(1);
392 *guard
393 };
394
395 let subs: Vec<(InstrumentId, u32)> = {
396 let guard = book_subscriptions.load();
397 guard.iter().map(|(k, v)| (*k, *v)).collect()
398 };
399
400 for (instrument_id, depth) in subs {
401 book_buffers.insert(instrument_id, BookBuffer::new(epoch));
402
403 log::info!(
404 "OrderBook snapshot rebuild for {instrument_id} @ depth {depth} \
405 starting (reconnect, epoch={epoch})"
406 );
407
408 let http = http_client.clone();
409 let sender = data_sender.clone();
410 let buffers = book_buffers.clone();
411 let insts = instruments.clone();
412
413 get_runtime().spawn(async move {
414 Self::fetch_and_emit_snapshot(
415 http,
416 sender,
417 buffers,
418 insts,
419 instrument_id,
420 depth,
421 epoch,
422 clock,
423 )
424 .await;
425 });
426 }
427 }
428 }
429 }
430
431 #[expect(clippy::too_many_arguments)]
432 async fn fetch_and_emit_snapshot(
433 http: BinanceFuturesHttpClient,
434 sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
435 buffers: Arc<AtomicMap<InstrumentId, BookBuffer>>,
436 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
437 instrument_id: InstrumentId,
438 depth: u32,
439 epoch: u64,
440 clock: &'static AtomicTime,
441 ) {
442 Self::fetch_and_emit_snapshot_inner(
443 http,
444 sender,
445 buffers,
446 instruments,
447 instrument_id,
448 depth,
449 epoch,
450 clock,
451 0,
452 )
453 .await;
454 }
455
456 #[expect(clippy::too_many_arguments)]
457 async fn fetch_and_emit_snapshot_inner(
458 http: BinanceFuturesHttpClient,
459 sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
460 buffers: Arc<AtomicMap<InstrumentId, BookBuffer>>,
461 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
462 instrument_id: InstrumentId,
463 depth: u32,
464 epoch: u64,
465 clock: &'static AtomicTime,
466 retry_count: u32,
467 ) {
468 const MAX_RETRIES: u32 = 3;
469
470 let symbol = format_binance_stream_symbol(&instrument_id).to_uppercase();
471 let params = BinanceDepthParams {
472 symbol,
473 limit: Some(depth),
474 };
475
476 match http.depth(¶ms).await {
477 Ok(order_book) => {
478 let ts_init = clock.get_time_ns();
479 let last_update_id = order_book.last_update_id as u64;
480
481 {
483 let guard = buffers.load();
484 match guard.get(&instrument_id) {
485 None => {
486 log::debug!(
487 "OrderBook subscription for {instrument_id} was cancelled, \
488 discarding snapshot"
489 );
490 return;
491 }
492 Some(buffer) if buffer.epoch != epoch => {
493 log::debug!(
494 "OrderBook snapshot for {instrument_id} is stale \
495 (epoch {epoch} != {}), discarding",
496 buffer.epoch
497 );
498 return;
499 }
500 _ => {}
501 }
502 }
503
504 let (price_precision, size_precision) = {
506 let guard = instruments.load();
507 match guard.get(&instrument_id) {
508 Some(inst) => (inst.price_precision(), inst.size_precision()),
509 None => {
510 log::error!("No instrument in cache for snapshot: {instrument_id}");
511 buffers.remove(&instrument_id);
512 return;
513 }
514 }
515 };
516
517 let first_valid = {
520 let guard = buffers.load();
521 guard.get(&instrument_id).and_then(|buffer| {
522 buffer
523 .updates
524 .iter()
525 .find(|u| u.final_update_id > last_update_id)
526 .cloned()
527 })
528 };
529
530 if let Some(first) = &first_valid {
531 let target = last_update_id + 1;
532 let valid_overlap =
533 first.first_update_id <= target && first.final_update_id >= target;
534
535 if !valid_overlap {
536 if retry_count < MAX_RETRIES {
537 log::warn!(
538 "OrderBook overlap validation failed for {instrument_id}: \
539 lastUpdateId={last_update_id}, first_update_id={}, \
540 final_update_id={} (need U <= {} <= u), \
541 retrying snapshot (attempt {}/{})",
542 first.first_update_id,
543 first.final_update_id,
544 target,
545 retry_count + 1,
546 MAX_RETRIES
547 );
548
549 buffers.rcu(|m| {
550 if let Some(buffer) = m.get_mut(&instrument_id)
551 && buffer.epoch == epoch
552 {
553 buffer.updates.clear();
554 }
555 });
556
557 Box::pin(Self::fetch_and_emit_snapshot_inner(
558 http,
559 sender,
560 buffers,
561 instruments,
562 instrument_id,
563 depth,
564 epoch,
565 clock,
566 retry_count + 1,
567 ))
568 .await;
569 return;
570 }
571 log::error!(
572 "OrderBook overlap validation failed for {instrument_id} after \
573 {MAX_RETRIES} retries; book may be inconsistent"
574 );
575 }
576 }
577
578 let snapshot_deltas = parse_order_book_snapshot(
579 &order_book,
580 instrument_id,
581 price_precision,
582 size_precision,
583 ts_init,
584 );
585
586 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
587 OrderBookDeltas_API::new(snapshot_deltas),
588 ))) {
589 log::error!("Failed to send snapshot: {e}");
590 }
591
592 let buffered = {
594 let mut taken = Vec::new();
595 let mut should_return = false;
596 buffers.rcu(|m| {
597 taken = Vec::new();
598 should_return = false;
599
600 match m.get_mut(&instrument_id) {
601 Some(buffer) if buffer.epoch == epoch => {
602 taken = std::mem::take(&mut buffer.updates);
603 }
604 _ => should_return = true,
605 }
606 });
607
608 if should_return {
609 return;
610 }
611 taken
612 };
613
614 let mut replayed = 0;
616 let mut last_final_update_id = last_update_id;
617
618 for update in buffered {
619 if update.final_update_id <= last_update_id {
621 continue;
622 }
623
624 if update.prev_final_update_id != last_final_update_id {
627 if retry_count < MAX_RETRIES {
628 log::warn!(
629 "OrderBook continuity break for {instrument_id}: \
630 expected pu={last_final_update_id}, was pu={}, \
631 triggering resync (attempt {}/{})",
632 update.prev_final_update_id,
633 retry_count + 1,
634 MAX_RETRIES
635 );
636
637 buffers.rcu(|m| {
638 if let Some(buffer) = m.get_mut(&instrument_id)
639 && buffer.epoch == epoch
640 {
641 buffer.updates.clear();
642 }
643 });
644
645 Box::pin(Self::fetch_and_emit_snapshot_inner(
646 http,
647 sender,
648 buffers,
649 instruments,
650 instrument_id,
651 depth,
652 epoch,
653 clock,
654 retry_count + 1,
655 ))
656 .await;
657 return;
658 }
659 log::error!(
660 "OrderBook continuity break for {instrument_id} after {MAX_RETRIES} \
661 retries: expected pu={last_final_update_id}, was pu={}; \
662 book may be inconsistent",
663 update.prev_final_update_id
664 );
665 }
666
667 last_final_update_id = update.final_update_id;
668 replayed += 1;
669
670 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
671 OrderBookDeltas_API::new(update.deltas),
672 ))) {
673 log::error!("Failed to send replayed deltas: {e}");
674 }
675 }
676
677 loop {
679 let more = {
680 let mut taken = Vec::new();
681 let mut should_break = false;
682 buffers.rcu(|m| {
683 taken = Vec::new();
684 should_break = false;
685
686 match m.get_mut(&instrument_id) {
687 Some(buffer) if buffer.epoch == epoch => {
688 if buffer.updates.is_empty() {
689 m.remove(&instrument_id);
690 should_break = true;
691 } else {
692 taken = std::mem::take(&mut buffer.updates);
693 }
694 }
695 _ => should_break = true,
696 }
697 });
698
699 if should_break {
700 break;
701 }
702 taken
703 };
704
705 for update in more {
706 if update.final_update_id <= last_update_id {
707 continue;
708 }
709
710 if update.prev_final_update_id != last_final_update_id {
711 if retry_count < MAX_RETRIES {
712 log::warn!(
713 "OrderBook continuity break for {instrument_id}: \
714 expected pu={last_final_update_id}, was pu={}, \
715 triggering resync (attempt {}/{})",
716 update.prev_final_update_id,
717 retry_count + 1,
718 MAX_RETRIES
719 );
720
721 buffers.rcu(|m| {
722 if let Some(buffer) = m.get_mut(&instrument_id)
723 && buffer.epoch == epoch
724 {
725 buffer.updates.clear();
726 }
727 });
728
729 Box::pin(Self::fetch_and_emit_snapshot_inner(
730 http,
731 sender,
732 buffers,
733 instruments,
734 instrument_id,
735 depth,
736 epoch,
737 clock,
738 retry_count + 1,
739 ))
740 .await;
741 return;
742 }
743 log::error!(
744 "OrderBook continuity break for {instrument_id} after \
745 {MAX_RETRIES} retries; book may be inconsistent"
746 );
747 }
748
749 last_final_update_id = update.final_update_id;
750 replayed += 1;
751
752 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
753 OrderBookDeltas_API::new(update.deltas),
754 ))) {
755 log::error!("Failed to send replayed deltas: {e}");
756 }
757 }
758 }
759
760 log::info!(
761 "OrderBook snapshot rebuild for {instrument_id} completed \
762 (lastUpdateId={last_update_id}, replayed={replayed})"
763 );
764 }
765 Err(e) => {
766 log::error!("Failed to request order book snapshot for {instrument_id}: {e}");
767 buffers.remove(&instrument_id);
768 }
769 }
770 }
771}
772
773fn upsert_instrument(
774 cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
775 instrument: InstrumentAny,
776) {
777 cache.insert(instrument.id(), instrument);
778}
779
780fn parse_order_book_snapshot(
781 order_book: &BinanceOrderBook,
782 instrument_id: InstrumentId,
783 price_precision: u8,
784 size_precision: u8,
785 ts_init: UnixNanos,
786) -> OrderBookDeltas {
787 let sequence = order_book.last_update_id as u64;
788 let ts_event = order_book.transaction_time.map_or(ts_init, |t| {
789 UnixNanos::from((t as u64) * NANOSECONDS_IN_MILLISECOND)
790 });
791
792 let total_levels = order_book.bids.len() + order_book.asks.len();
793 let mut deltas = Vec::with_capacity(total_levels + 1);
794
795 deltas.push(OrderBookDelta::clear(
797 instrument_id,
798 sequence,
799 ts_event,
800 ts_init,
801 ));
802
803 for (i, (price_str, qty_str)) in order_book.bids.iter().enumerate() {
804 let price: f64 = price_str.parse().unwrap_or(0.0);
805 let size: f64 = qty_str.parse().unwrap_or(0.0);
806
807 let is_last = i == order_book.bids.len() - 1 && order_book.asks.is_empty();
808 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
809
810 let order = BookOrder::new(
811 OrderSide::Buy,
812 Price::new(price, price_precision),
813 Quantity::new(size, size_precision),
814 0,
815 );
816
817 deltas.push(OrderBookDelta::new(
818 instrument_id,
819 BookAction::Add,
820 order,
821 flags,
822 sequence,
823 ts_event,
824 ts_init,
825 ));
826 }
827
828 for (i, (price_str, qty_str)) in order_book.asks.iter().enumerate() {
829 let price: f64 = price_str.parse().unwrap_or(0.0);
830 let size: f64 = qty_str.parse().unwrap_or(0.0);
831
832 let is_last = i == order_book.asks.len() - 1;
833 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
834
835 let order = BookOrder::new(
836 OrderSide::Sell,
837 Price::new(price, price_precision),
838 Quantity::new(size, size_precision),
839 0,
840 );
841
842 deltas.push(OrderBookDelta::new(
843 instrument_id,
844 BookAction::Add,
845 order,
846 flags,
847 sequence,
848 ts_event,
849 ts_init,
850 ));
851 }
852
853 OrderBookDeltas::new(instrument_id, deltas)
854}
855
856#[async_trait::async_trait(?Send)]
857impl DataClient for BinanceFuturesDataClient {
858 fn client_id(&self) -> ClientId {
859 self.client_id
860 }
861
862 fn venue(&self) -> Option<Venue> {
863 Some(self.venue())
864 }
865
866 fn start(&mut self) -> anyhow::Result<()> {
867 log::info!(
868 "Started: client_id={}, product_type={:?}, environment={:?}",
869 self.client_id,
870 self.product_type,
871 self.config.environment,
872 );
873 Ok(())
874 }
875
876 fn stop(&mut self) -> anyhow::Result<()> {
877 log::info!("Stopping {id}", id = self.client_id);
878 self.cancellation_token.cancel();
879 self.is_connected.store(false, Ordering::Relaxed);
880 Ok(())
881 }
882
883 fn reset(&mut self) -> anyhow::Result<()> {
884 log::debug!("Resetting {id}", id = self.client_id);
885
886 self.cancellation_token.cancel();
887
888 for task in self.tasks.drain(..) {
889 task.abort();
890 }
891
892 let mut ws = self.ws_client.clone();
893 get_runtime().spawn(async move {
894 let _ = ws.close().await;
895 });
896
897 self.mark_price_refs.store(AHashMap::new());
899 self.book_subscriptions.store(AHashMap::new());
900 self.book_buffers.store(AHashMap::new());
901
902 self.is_connected.store(false, Ordering::Relaxed);
903 self.cancellation_token = CancellationToken::new();
904 Ok(())
905 }
906
907 fn dispose(&mut self) -> anyhow::Result<()> {
908 log::debug!("Disposing {id}", id = self.client_id);
909 self.stop()
910 }
911
912 async fn connect(&mut self) -> anyhow::Result<()> {
913 if self.is_connected() {
914 return Ok(());
915 }
916
917 self.cancellation_token = CancellationToken::new();
919
920 let instruments = self
921 .http_client
922 .request_instruments()
923 .await
924 .context("failed to request Binance Futures instruments")?;
925
926 {
928 let mut inst_map = AHashMap::new();
929 let mut status_map = AHashMap::new();
930
931 for instrument in &instruments {
932 inst_map.insert(instrument.id(), instrument.clone());
933 }
934
935 let http_instruments = self.http_client.instruments_cache();
936 for entry in http_instruments.iter() {
937 let raw_symbol = entry.key();
938 let action = match entry.value() {
939 crate::futures::http::client::BinanceFuturesInstrument::UsdM(s) => {
940 MarketStatusAction::from(s.status)
941 }
942 crate::futures::http::client::BinanceFuturesInstrument::CoinM(s) => s
943 .contract_status
944 .map_or(MarketStatusAction::NotAvailableForTrading, Into::into),
945 };
946
947 for instrument in &instruments {
948 if instrument.raw_symbol().as_str() == raw_symbol.as_str() {
949 status_map.insert(instrument.id(), action);
950 break;
951 }
952 }
953 }
954
955 self.instruments.store(inst_map);
956 self.status_cache.store(status_map);
957 }
958
959 for instrument in instruments.clone() {
960 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
961 log::warn!("Failed to send instrument: {e}");
962 }
963 }
964
965 self.ws_client.cache_instruments(&instruments);
966 self.ws_public_client.cache_instruments(&instruments);
967
968 log::info!("Connecting to Binance Futures market WebSocket...");
969 self.ws_client.connect().await.map_err(|e| {
970 log::error!("Binance Futures market WebSocket connection failed: {e:?}");
971 anyhow::anyhow!("failed to connect Binance Futures market WebSocket: {e}")
972 })?;
973 log::info!("Binance Futures market WebSocket connected");
974
975 log::info!("Connecting to Binance Futures public WebSocket...");
976 self.ws_public_client.connect().await.map_err(|e| {
977 log::error!("Binance Futures public WebSocket connection failed: {e:?}");
978 anyhow::anyhow!("failed to connect Binance Futures public WebSocket: {e}")
979 })?;
980 log::info!("Binance Futures public WebSocket connected");
981
982 let stream = self.ws_client.stream();
984 let sender = self.data_sender.clone();
985 let insts = self.instruments.clone();
986 let ws_insts = self.ws_client.instruments_cache();
987 let buffers = self.book_buffers.clone();
988 let book_subs = self.book_subscriptions.clone();
989 let book_epoch = self.book_epoch.clone();
990 let http = self.http_client.clone();
991 let clock = self.clock;
992 let cancel = self.cancellation_token.clone();
993
994 let handle = get_runtime().spawn(async move {
995 pin_mut!(stream);
996
997 loop {
998 tokio::select! {
999 Some(message) = stream.next() => {
1000 Self::handle_ws_message(
1001 message,
1002 &sender,
1003 &insts,
1004 &ws_insts,
1005 &buffers,
1006 &book_subs,
1007 &book_epoch,
1008 &http,
1009 clock,
1010 );
1011 }
1012 () = cancel.cancelled() => {
1013 log::debug!("Market WebSocket stream task cancelled");
1014 break;
1015 }
1016 }
1017 }
1018 });
1019 self.tasks.push(handle);
1020
1021 let pub_stream = self.ws_public_client.stream();
1023 let pub_sender = self.data_sender.clone();
1024 let pub_insts = self.instruments.clone();
1025 let pub_ws_insts = self.ws_public_client.instruments_cache();
1026 let pub_buffers = self.book_buffers.clone();
1027 let pub_book_subs = self.book_subscriptions.clone();
1028 let pub_book_epoch = self.book_epoch.clone();
1029 let pub_http = self.http_client.clone();
1030 let pub_cancel = self.cancellation_token.clone();
1031
1032 let pub_handle = get_runtime().spawn(async move {
1033 pin_mut!(pub_stream);
1034
1035 loop {
1036 tokio::select! {
1037 Some(message) = pub_stream.next() => {
1038 Self::handle_ws_message(
1039 message,
1040 &pub_sender,
1041 &pub_insts,
1042 &pub_ws_insts,
1043 &pub_buffers,
1044 &pub_book_subs,
1045 &pub_book_epoch,
1046 &pub_http,
1047 clock,
1048 );
1049 }
1050 () = pub_cancel.cancelled() => {
1051 log::debug!("Public WebSocket stream task cancelled");
1052 break;
1053 }
1054 }
1055 }
1056 });
1057 self.tasks.push(pub_handle);
1058
1059 let poll_secs = self.config.instrument_status_poll_secs;
1061 if poll_secs > 0 {
1062 let poll_http = self.http_client.clone();
1063 let poll_sender = self.data_sender.clone();
1064 let poll_instruments = self.instruments.clone();
1065 let poll_status_cache = self.status_cache.clone();
1066 let poll_cancel = self.cancellation_token.clone();
1067 let poll_clock = self.clock;
1068
1069 let poll_handle = get_runtime().spawn(async move {
1070 let mut interval =
1071 tokio::time::interval(tokio::time::Duration::from_secs(poll_secs));
1072 interval.tick().await; loop {
1075 tokio::select! {
1076 _ = interval.tick() => {
1077 match poll_http.request_symbol_statuses().await {
1078 Ok(symbol_statuses) => {
1079 let ts = poll_clock.get_time_ns();
1080 let inst_guard = poll_instruments.load();
1081
1082 let raw_to_id: AHashMap<Ustr, InstrumentId> = inst_guard
1084 .values()
1085 .map(|inst| (inst.raw_symbol().inner(), inst.id()))
1086 .collect();
1087
1088 let mut new_statuses = AHashMap::new();
1089
1090 for (raw_symbol, action) in &symbol_statuses {
1091 if let Some(&id) = raw_to_id.get(raw_symbol) {
1092 new_statuses.insert(id, *action);
1093 }
1094 }
1095 drop(inst_guard);
1096
1097 let mut cache = (**poll_status_cache.load()).clone();
1098 diff_and_emit_statuses(
1099 &new_statuses, &mut cache, &poll_sender, ts, ts,
1100 );
1101 poll_status_cache.store(cache);
1102 }
1103 Err(e) => {
1104 log::warn!("Futures instrument status poll failed: {e}");
1105 }
1106 }
1107 }
1108 () = poll_cancel.cancelled() => {
1109 log::debug!("Futures instrument status polling task cancelled");
1110 break;
1111 }
1112 }
1113 }
1114 });
1115 self.tasks.push(poll_handle);
1116 log::info!("Futures instrument status polling started: interval={poll_secs}s");
1117 }
1118
1119 self.is_connected.store(true, Ordering::Release);
1120 log::info!("Connected: client_id={}", self.client_id);
1121 Ok(())
1122 }
1123
1124 async fn disconnect(&mut self) -> anyhow::Result<()> {
1125 if self.is_disconnected() {
1126 return Ok(());
1127 }
1128
1129 self.cancellation_token.cancel();
1130
1131 let _ = self.ws_client.close().await;
1132 let _ = self.ws_public_client.close().await;
1133
1134 let handles: Vec<_> = self.tasks.drain(..).collect();
1135 for handle in handles {
1136 if let Err(e) = handle.await {
1137 log::error!("Error joining WebSocket task: {e}");
1138 }
1139 }
1140
1141 self.mark_price_refs.store(AHashMap::new());
1143 self.book_subscriptions.store(AHashMap::new());
1144 self.book_buffers.store(AHashMap::new());
1145
1146 self.is_connected.store(false, Ordering::Release);
1147 log::info!("Disconnected: client_id={}", self.client_id);
1148 Ok(())
1149 }
1150
1151 fn is_connected(&self) -> bool {
1152 self.is_connected.load(Ordering::Relaxed)
1153 }
1154
1155 fn is_disconnected(&self) -> bool {
1156 !self.is_connected()
1157 }
1158
1159 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
1160 log::debug!(
1161 "subscribe_instruments: Binance Futures instruments are fetched via HTTP on connect"
1162 );
1163 Ok(())
1164 }
1165
1166 fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
1167 log::debug!(
1168 "subscribe_instrument: Binance Futures instruments are fetched via HTTP on connect"
1169 );
1170 Ok(())
1171 }
1172
1173 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1174 if cmd.book_type != BookType::L2_MBP {
1175 anyhow::bail!("Binance Futures only supports L2_MBP order book deltas");
1176 }
1177
1178 let instrument_id = cmd.instrument_id;
1179 let depth = cmd.depth.map_or(1000, |d| d.get() as u32);
1180
1181 if !BINANCE_BOOK_DEPTHS.contains(&depth) {
1182 anyhow::bail!(
1183 "Invalid depth {depth} for Binance Futures order book. \
1184 Valid values: {BINANCE_BOOK_DEPTHS:?}"
1185 );
1186 }
1187
1188 self.book_subscriptions.insert(instrument_id, depth);
1190
1191 let epoch = {
1193 let mut guard = self.book_epoch.write().expect(MUTEX_POISONED);
1194 *guard = guard.wrapping_add(1);
1195 *guard
1196 };
1197
1198 self.book_buffers
1200 .insert(instrument_id, BookBuffer::new(epoch));
1201
1202 log::info!("OrderBook snapshot rebuild for {instrument_id} @ depth {depth} starting");
1203
1204 let ws = self.ws_public_client.clone();
1206 let stream = format!("{}@depth@0ms", format_binance_stream_symbol(&instrument_id));
1207
1208 self.spawn_ws(
1209 async move {
1210 ws.subscribe(vec![stream])
1211 .await
1212 .context("book deltas subscription")
1213 },
1214 "order book subscription",
1215 );
1216
1217 let http = self.http_client.clone();
1219 let sender = self.data_sender.clone();
1220 let buffers = self.book_buffers.clone();
1221 let instruments = self.instruments.clone();
1222 let clock = self.clock;
1223
1224 get_runtime().spawn(async move {
1225 Self::fetch_and_emit_snapshot(
1226 http,
1227 sender,
1228 buffers,
1229 instruments,
1230 instrument_id,
1231 depth,
1232 epoch,
1233 clock,
1234 )
1235 .await;
1236 });
1237
1238 Ok(())
1239 }
1240
1241 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1242 let instrument_id = cmd.instrument_id;
1243 let ws = self.ws_public_client.clone();
1244
1245 let stream = format!(
1247 "{}@bookTicker",
1248 format_binance_stream_symbol(&instrument_id)
1249 );
1250
1251 self.spawn_ws(
1252 async move {
1253 ws.subscribe(vec![stream])
1254 .await
1255 .context("quotes subscription")
1256 },
1257 "quote subscription",
1258 );
1259 Ok(())
1260 }
1261
1262 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1263 let instrument_id = cmd.instrument_id;
1264 let ws = self.ws_client.clone();
1265
1266 let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1268
1269 self.spawn_ws(
1270 async move {
1271 ws.subscribe(vec![stream])
1272 .await
1273 .context("trades subscription")
1274 },
1275 "trade subscription",
1276 );
1277 Ok(())
1278 }
1279
1280 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1281 let bar_type = cmd.bar_type;
1282 let ws = self.ws_client.clone();
1283 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1284
1285 let stream = format!(
1286 "{}@kline_{}",
1287 format_binance_stream_symbol(&bar_type.instrument_id()),
1288 interval.as_str()
1289 );
1290
1291 self.spawn_ws(
1292 async move {
1293 ws.subscribe(vec![stream])
1294 .await
1295 .context("bars subscription")
1296 },
1297 "bar subscription",
1298 );
1299 Ok(())
1300 }
1301
1302 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
1303 let instrument_id = cmd.instrument_id;
1304
1305 let should_subscribe = {
1307 let prev = self
1308 .mark_price_refs
1309 .load()
1310 .get(&instrument_id)
1311 .copied()
1312 .unwrap_or(0);
1313 self.mark_price_refs.rcu(|m| {
1314 let count = m.entry(instrument_id).or_insert(0);
1315 *count += 1;
1316 });
1317 prev == 0
1318 };
1319
1320 if should_subscribe {
1321 let ws = self.ws_client.clone();
1322 let stream = format!(
1323 "{}@markPrice@1s",
1324 format_binance_stream_symbol(&instrument_id)
1325 );
1326
1327 self.spawn_ws(
1328 async move {
1329 ws.subscribe(vec![stream])
1330 .await
1331 .context("mark prices subscription")
1332 },
1333 "mark prices subscription",
1334 );
1335 }
1336 Ok(())
1337 }
1338
1339 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1340 let instrument_id = cmd.instrument_id;
1341
1342 let should_subscribe = {
1344 let prev = self
1345 .mark_price_refs
1346 .load()
1347 .get(&instrument_id)
1348 .copied()
1349 .unwrap_or(0);
1350 self.mark_price_refs.rcu(|m| {
1351 let count = m.entry(instrument_id).or_insert(0);
1352 *count += 1;
1353 });
1354 prev == 0
1355 };
1356
1357 if should_subscribe {
1358 let ws = self.ws_client.clone();
1359 let stream = format!(
1360 "{}@markPrice@1s",
1361 format_binance_stream_symbol(&instrument_id)
1362 );
1363
1364 self.spawn_ws(
1365 async move {
1366 ws.subscribe(vec![stream])
1367 .await
1368 .context("index prices subscription")
1369 },
1370 "index prices subscription",
1371 );
1372 }
1373 Ok(())
1374 }
1375
1376 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
1377 let instrument_id = cmd.instrument_id;
1378
1379 let should_subscribe = {
1380 let prev = self
1381 .mark_price_refs
1382 .load()
1383 .get(&instrument_id)
1384 .copied()
1385 .unwrap_or(0);
1386 self.mark_price_refs.rcu(|m| {
1387 let count = m.entry(instrument_id).or_insert(0);
1388 *count += 1;
1389 });
1390 prev == 0
1391 };
1392
1393 if should_subscribe {
1394 let ws = self.ws_client.clone();
1395 let stream = format!(
1396 "{}@markPrice@1s",
1397 format_binance_stream_symbol(&instrument_id)
1398 );
1399
1400 self.spawn_ws(
1401 async move {
1402 ws.subscribe(vec![stream])
1403 .await
1404 .context("funding rates subscription")
1405 },
1406 "funding rates subscription",
1407 );
1408 }
1409 Ok(())
1410 }
1411
1412 fn subscribe_instrument_status(
1413 &mut self,
1414 cmd: SubscribeInstrumentStatus,
1415 ) -> anyhow::Result<()> {
1416 log::debug!(
1417 "subscribe_instrument_status: {id} (status changes detected via periodic exchange info polling)",
1418 id = cmd.instrument_id,
1419 );
1420 Ok(())
1421 }
1422
1423 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1424 let instrument_id = cmd.instrument_id;
1425 let ws = self.ws_public_client.clone();
1426
1427 self.book_subscriptions.remove(&instrument_id);
1429
1430 self.book_buffers.remove(&instrument_id);
1432
1433 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1434 let streams = vec![
1435 format!("{symbol_lower}@depth"),
1436 format!("{symbol_lower}@depth@0ms"),
1437 format!("{symbol_lower}@depth@100ms"),
1438 format!("{symbol_lower}@depth@250ms"),
1439 format!("{symbol_lower}@depth@500ms"),
1440 ];
1441
1442 self.spawn_ws(
1443 async move {
1444 ws.unsubscribe(streams)
1445 .await
1446 .context("book deltas unsubscribe")
1447 },
1448 "order book unsubscribe",
1449 );
1450 Ok(())
1451 }
1452
1453 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1454 let instrument_id = cmd.instrument_id;
1455 let ws = self.ws_public_client.clone();
1456
1457 let stream = format!(
1458 "{}@bookTicker",
1459 format_binance_stream_symbol(&instrument_id)
1460 );
1461
1462 self.spawn_ws(
1463 async move {
1464 ws.unsubscribe(vec![stream])
1465 .await
1466 .context("quotes unsubscribe")
1467 },
1468 "quote unsubscribe",
1469 );
1470 Ok(())
1471 }
1472
1473 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1474 let instrument_id = cmd.instrument_id;
1475 let ws = self.ws_client.clone();
1476
1477 let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1478
1479 self.spawn_ws(
1480 async move {
1481 ws.unsubscribe(vec![stream])
1482 .await
1483 .context("trades unsubscribe")
1484 },
1485 "trade unsubscribe",
1486 );
1487 Ok(())
1488 }
1489
1490 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1491 let bar_type = cmd.bar_type;
1492 let ws = self.ws_client.clone();
1493 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1494
1495 let stream = format!(
1496 "{}@kline_{}",
1497 format_binance_stream_symbol(&bar_type.instrument_id()),
1498 interval.as_str()
1499 );
1500
1501 self.spawn_ws(
1502 async move {
1503 ws.unsubscribe(vec![stream])
1504 .await
1505 .context("bars unsubscribe")
1506 },
1507 "bar unsubscribe",
1508 );
1509 Ok(())
1510 }
1511
1512 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1513 let instrument_id = cmd.instrument_id;
1514
1515 let should_unsubscribe = {
1517 let prev = self.mark_price_refs.load().get(&instrument_id).copied();
1518 match prev {
1519 Some(count) if count <= 1 => {
1520 self.mark_price_refs.remove(&instrument_id);
1521 true
1522 }
1523 Some(_) => {
1524 self.mark_price_refs.rcu(|m| {
1525 if let Some(count) = m.get_mut(&instrument_id) {
1526 *count = count.saturating_sub(1);
1527 }
1528 });
1529 false
1530 }
1531 None => false,
1532 }
1533 };
1534
1535 if should_unsubscribe {
1536 let ws = self.ws_client.clone();
1537 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1538 let streams = vec![
1539 format!("{symbol_lower}@markPrice"),
1540 format!("{symbol_lower}@markPrice@1s"),
1541 format!("{symbol_lower}@markPrice@3s"),
1542 ];
1543
1544 self.spawn_ws(
1545 async move {
1546 ws.unsubscribe(streams)
1547 .await
1548 .context("mark prices unsubscribe")
1549 },
1550 "mark prices unsubscribe",
1551 );
1552 }
1553 Ok(())
1554 }
1555
1556 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1557 let instrument_id = cmd.instrument_id;
1558
1559 let should_unsubscribe = {
1561 let prev = self.mark_price_refs.load().get(&instrument_id).copied();
1562 match prev {
1563 Some(count) if count <= 1 => {
1564 self.mark_price_refs.remove(&instrument_id);
1565 true
1566 }
1567 Some(_) => {
1568 self.mark_price_refs.rcu(|m| {
1569 if let Some(count) = m.get_mut(&instrument_id) {
1570 *count = count.saturating_sub(1);
1571 }
1572 });
1573 false
1574 }
1575 None => false,
1576 }
1577 };
1578
1579 if should_unsubscribe {
1580 let ws = self.ws_client.clone();
1581 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1582 let streams = vec![
1583 format!("{symbol_lower}@markPrice"),
1584 format!("{symbol_lower}@markPrice@1s"),
1585 format!("{symbol_lower}@markPrice@3s"),
1586 ];
1587
1588 self.spawn_ws(
1589 async move {
1590 ws.unsubscribe(streams)
1591 .await
1592 .context("index prices unsubscribe")
1593 },
1594 "index prices unsubscribe",
1595 );
1596 }
1597 Ok(())
1598 }
1599
1600 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1601 let instrument_id = cmd.instrument_id;
1602
1603 let should_unsubscribe = {
1604 let prev = self.mark_price_refs.load().get(&instrument_id).copied();
1605 match prev {
1606 Some(count) if count <= 1 => {
1607 self.mark_price_refs.remove(&instrument_id);
1608 true
1609 }
1610 Some(_) => {
1611 self.mark_price_refs.rcu(|m| {
1612 if let Some(count) = m.get_mut(&instrument_id) {
1613 *count = count.saturating_sub(1);
1614 }
1615 });
1616 false
1617 }
1618 None => false,
1619 }
1620 };
1621
1622 if should_unsubscribe {
1623 let ws = self.ws_client.clone();
1624 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1625 let streams = vec![
1626 format!("{symbol_lower}@markPrice"),
1627 format!("{symbol_lower}@markPrice@1s"),
1628 format!("{symbol_lower}@markPrice@3s"),
1629 ];
1630
1631 self.spawn_ws(
1632 async move {
1633 ws.unsubscribe(streams)
1634 .await
1635 .context("funding rates unsubscribe")
1636 },
1637 "funding rates unsubscribe",
1638 );
1639 }
1640 Ok(())
1641 }
1642
1643 fn unsubscribe_instrument_status(
1644 &mut self,
1645 cmd: &UnsubscribeInstrumentStatus,
1646 ) -> anyhow::Result<()> {
1647 log::debug!(
1648 "unsubscribe_instrument_status: {id}",
1649 id = cmd.instrument_id,
1650 );
1651 Ok(())
1652 }
1653
1654 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1655 let http = self.http_client.clone();
1656 let sender = self.data_sender.clone();
1657 let instruments_cache = self.instruments.clone();
1658 let request_id = request.request_id;
1659 let client_id = request.client_id.unwrap_or(self.client_id);
1660 let venue = self.venue();
1661 let start = request.start;
1662 let end = request.end;
1663 let params = request.params;
1664 let clock = self.clock;
1665 let start_nanos = datetime_to_unix_nanos(start);
1666 let end_nanos = datetime_to_unix_nanos(end);
1667
1668 get_runtime().spawn(async move {
1669 match http.request_instruments().await {
1670 Ok(instruments) => {
1671 for instrument in &instruments {
1672 upsert_instrument(&instruments_cache, instrument.clone());
1673 }
1674
1675 let response = DataResponse::Instruments(InstrumentsResponse::new(
1676 request_id,
1677 client_id,
1678 venue,
1679 instruments,
1680 start_nanos,
1681 end_nanos,
1682 clock.get_time_ns(),
1683 params,
1684 ));
1685
1686 if let Err(e) = sender.send(DataEvent::Response(response)) {
1687 log::error!("Failed to send instruments response: {e}");
1688 }
1689 }
1690 Err(e) => log::error!("Instruments request failed: {e:?}"),
1691 }
1692 });
1693
1694 Ok(())
1695 }
1696
1697 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1698 let http = self.http_client.clone();
1699 let sender = self.data_sender.clone();
1700 let instruments = self.instruments.clone();
1701 let instrument_id = request.instrument_id;
1702 let request_id = request.request_id;
1703 let client_id = request.client_id.unwrap_or(self.client_id);
1704 let start = request.start;
1705 let end = request.end;
1706 let params = request.params;
1707 let clock = self.clock;
1708 let start_nanos = datetime_to_unix_nanos(start);
1709 let end_nanos = datetime_to_unix_nanos(end);
1710
1711 get_runtime().spawn(async move {
1712 match http.request_instruments().await {
1713 Ok(all_instruments) => {
1714 for instrument in &all_instruments {
1715 upsert_instrument(&instruments, instrument.clone());
1716 }
1717
1718 let instrument = all_instruments
1719 .into_iter()
1720 .find(|i| i.id() == instrument_id);
1721
1722 if let Some(instrument) = instrument {
1723 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1724 request_id,
1725 client_id,
1726 instrument.id(),
1727 instrument,
1728 start_nanos,
1729 end_nanos,
1730 clock.get_time_ns(),
1731 params,
1732 )));
1733
1734 if let Err(e) = sender.send(DataEvent::Response(response)) {
1735 log::error!("Failed to send instrument response: {e}");
1736 }
1737 } else {
1738 log::error!("Instrument not found: {instrument_id}");
1739 }
1740 }
1741 Err(e) => log::error!("Instrument request failed: {e:?}"),
1742 }
1743 });
1744
1745 Ok(())
1746 }
1747
1748 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1749 let http = self.http_client.clone();
1750 let sender = self.data_sender.clone();
1751 let instrument_id = request.instrument_id;
1752 let limit = request.limit.map(|n| n.get() as u32);
1753 let request_id = request.request_id;
1754 let client_id = request.client_id.unwrap_or(self.client_id);
1755 let params = request.params;
1756 let clock = self.clock;
1757 let start_nanos = datetime_to_unix_nanos(request.start);
1758 let end_nanos = datetime_to_unix_nanos(request.end);
1759
1760 get_runtime().spawn(async move {
1761 match http
1762 .request_trades(instrument_id, limit)
1763 .await
1764 .context("failed to request trades from Binance Futures")
1765 {
1766 Ok(trades) => {
1767 let response = DataResponse::Trades(TradesResponse::new(
1768 request_id,
1769 client_id,
1770 instrument_id,
1771 trades,
1772 start_nanos,
1773 end_nanos,
1774 clock.get_time_ns(),
1775 params,
1776 ));
1777
1778 if let Err(e) = sender.send(DataEvent::Response(response)) {
1779 log::error!("Failed to send trades response: {e}");
1780 }
1781 }
1782 Err(e) => log::error!("Trade request failed: {e:?}"),
1783 }
1784 });
1785
1786 Ok(())
1787 }
1788
1789 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1790 let http = self.http_client.clone();
1791 let sender = self.data_sender.clone();
1792 let bar_type = request.bar_type;
1793 let start = request.start;
1794 let end = request.end;
1795 let limit = request.limit.map(|n| n.get() as u32);
1796 let request_id = request.request_id;
1797 let client_id = request.client_id.unwrap_or(self.client_id);
1798 let params = request.params;
1799 let clock = self.clock;
1800 let start_nanos = datetime_to_unix_nanos(start);
1801 let end_nanos = datetime_to_unix_nanos(end);
1802
1803 get_runtime().spawn(async move {
1804 match http
1805 .request_bars(bar_type, start, end, limit)
1806 .await
1807 .context("failed to request bars from Binance Futures")
1808 {
1809 Ok(bars) => {
1810 let response = DataResponse::Bars(BarsResponse::new(
1811 request_id,
1812 client_id,
1813 bar_type,
1814 bars,
1815 start_nanos,
1816 end_nanos,
1817 clock.get_time_ns(),
1818 params,
1819 ));
1820
1821 if let Err(e) = sender.send(DataEvent::Response(response)) {
1822 log::error!("Failed to send bars response: {e}");
1823 }
1824 }
1825 Err(e) => log::error!("Bar request failed: {e:?}"),
1826 }
1827 });
1828
1829 Ok(())
1830 }
1831}