Skip to main content

nautilus_binance/futures/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Binance Futures adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
35            SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
36            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
37            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38            subscribe::SubscribeInstrumentStatus, unsubscribe::UnsubscribeInstrumentStatus,
39        },
40    },
41};
42use nautilus_core::{
43    AtomicMap, MUTEX_POISONED,
44    datetime::{NANOSECONDS_IN_MILLISECOND, datetime_to_unix_nanos},
45    nanos::UnixNanos,
46    time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49    data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
50    enums::{BookAction, BookType, MarketStatusAction, OrderSide, RecordFlag},
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53    types::{Price, Quantity},
54};
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60    common::{
61        consts::{BINANCE_BOOK_DEPTHS, BINANCE_VENUE},
62        enums::{BinanceEnvironment, BinanceProductType},
63        parse::bar_spec_to_binance_interval,
64        status::diff_and_emit_statuses,
65        symbol::format_binance_stream_symbol,
66        urls::{get_usdm_ws_route_base_url, get_ws_public_base_url},
67    },
68    config::BinanceDataClientConfig,
69    futures::{
70        http::{
71            client::BinanceFuturesHttpClient, models::BinanceOrderBook, query::BinanceDepthParams,
72        },
73        websocket::streams::{
74            client::BinanceFuturesWebSocketClient,
75            messages::BinanceFuturesWsStreamsMessage,
76            parse_data::{
77                parse_agg_trade, parse_book_ticker, parse_depth_update, parse_kline,
78                parse_mark_price, parse_trade,
79            },
80        },
81    },
82};
83
/// A parsed depth update held back while an order book snapshot is being rebuilt.
///
/// The update IDs are stored next to the deltas so the replay step can drop updates
/// already covered by the snapshot and check continuity between consecutive updates.
#[derive(Debug, Clone)]
struct BufferedDepthUpdate {
    /// Parsed order book deltas for this depth update.
    deltas: OrderBookDeltas,
    /// First update ID carried by the depth message (`U`).
    first_update_id: u64,
    /// Final update ID of this update (`u`), taken from the parsed deltas' sequence.
    final_update_id: u64,
    /// Final update ID of the preceding depth message (`pu`).
    prev_final_update_id: u64,
}
91
92#[derive(Debug, Clone)]
93struct BookBuffer {
94    updates: Vec<BufferedDepthUpdate>,
95    epoch: u64,
96}
97
98impl BookBuffer {
99    fn new(epoch: u64) -> Self {
100        Self {
101            updates: Vec::new(),
102            epoch,
103        }
104    }
105}
106
/// Binance Futures data client for USD-M and COIN-M markets.
#[derive(Debug)]
pub struct BinanceFuturesDataClient {
    /// Realtime atomic clock used to stamp `ts_init` on emitted data.
    clock: &'static AtomicTime,
    /// Identifier supplied by the caller when the client is constructed.
    client_id: ClientId,
    /// Data client configuration (environment, credentials, URL overrides, transport).
    config: BinanceDataClientConfig,
    /// Futures product type; the constructor only accepts `UsdM` or `CoinM`.
    product_type: BinanceProductType,
    /// REST client, used for order book depth snapshots among other requests.
    http_client: BinanceFuturesHttpClient,
    /// WebSocket client built with the configured credentials and, when a WebSocket
    /// base URL override is set, the "market" route.
    ws_client: BinanceFuturesWebSocketClient,
    /// Credential-less WebSocket client pointed at the public stream URL.
    ws_public_client: BinanceFuturesWebSocketClient,
    /// Channel on which data events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Connection flag; starts as `false`.
    is_connected: AtomicBool,
    /// Cancellation token for background work.
    // NOTE(review): how the token is triggered is not visible in this part of the file.
    cancellation_token: CancellationToken,
    /// Handles of spawned tasks; starts empty.
    // NOTE(review): presumably the stream-processing tasks - confirm against `connect`.
    tasks: Vec<JoinHandle<()>>,
    /// Instrument cache keyed by instrument ID; read for price/size precision when
    /// building order book snapshots.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Last known market status per instrument.
    // NOTE(review): presumably consumed by `diff_and_emit_statuses` - confirm.
    status_cache: Arc<AtomicMap<InstrumentId, MarketStatusAction>>,
    /// Per-instrument buffers of depth updates; while an entry exists, incoming depth
    /// updates for that instrument are queued instead of emitted.
    book_buffers: Arc<AtomicMap<InstrumentId, BookBuffer>>,
    /// Active order book subscriptions mapped to the depth used for snapshot requests.
    book_subscriptions: Arc<AtomicMap<InstrumentId, u32>>,
    /// Per-instrument counter for mark price streams.
    // NOTE(review): presumably a subscription reference count - confirm against the
    // subscribe/unsubscribe handlers.
    mark_price_refs: Arc<AtomicMap<InstrumentId, u32>>,
    /// Rebuild epoch, incremented on every WebSocket reconnect so stale snapshot
    /// tasks can be recognised.
    book_epoch: Arc<RwLock<u64>>,
}
128
129impl BinanceFuturesDataClient {
130    /// Creates a new [`BinanceFuturesDataClient`] instance.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error if the client fails to initialize or if the product type
135    /// is not a futures type (UsdM or CoinM).
136    pub fn new(
137        client_id: ClientId,
138        config: BinanceDataClientConfig,
139        product_type: BinanceProductType,
140    ) -> anyhow::Result<Self> {
141        match product_type {
142            BinanceProductType::UsdM | BinanceProductType::CoinM => {}
143            _ => {
144                anyhow::bail!(
145                    "BinanceFuturesDataClient requires UsdM or CoinM product type, was {product_type:?}"
146                );
147            }
148        }
149
150        let clock = get_atomic_clock_realtime();
151        let data_sender = get_data_event_sender();
152
153        let http_client = BinanceFuturesHttpClient::new(
154            product_type,
155            config.environment,
156            clock,
157            config.api_key.clone(),
158            config.api_secret.clone(),
159            config.base_url_http.clone(),
160            None,  // recv_window
161            None,  // timeout_secs
162            None,  // proxy_url
163            false, // treat_expired_as_canceled
164        )?;
165
166        let market_url = config.base_url_ws.clone().map(|url| {
167            if product_type == BinanceProductType::UsdM
168                && config.environment == BinanceEnvironment::Mainnet
169            {
170                get_usdm_ws_route_base_url(&url, "market")
171            } else {
172                url
173            }
174        });
175
176        let ws_client = BinanceFuturesWebSocketClient::new(
177            product_type,
178            config.environment,
179            config.api_key.clone(),
180            config.api_secret.clone(),
181            market_url,
182            Some(20), // Heartbeat interval
183            config.transport_backend,
184        )?;
185
186        let public_url = config.base_url_ws.clone().map_or_else(
187            || get_ws_public_base_url(product_type, config.environment).to_string(),
188            |url| {
189                if product_type == BinanceProductType::UsdM
190                    && config.environment == BinanceEnvironment::Mainnet
191                {
192                    get_usdm_ws_route_base_url(&url, "public")
193                } else {
194                    url
195                }
196            },
197        );
198        let ws_public_client = BinanceFuturesWebSocketClient::new(
199            product_type,
200            config.environment,
201            None,
202            None,
203            Some(public_url),
204            Some(20),
205            config.transport_backend,
206        )?;
207
208        Ok(Self {
209            clock,
210            client_id,
211            config,
212            product_type,
213            http_client,
214            ws_client,
215            ws_public_client,
216            data_sender,
217            is_connected: AtomicBool::new(false),
218            cancellation_token: CancellationToken::new(),
219            tasks: Vec::new(),
220            instruments: Arc::new(AtomicMap::new()),
221            status_cache: Arc::new(AtomicMap::new()),
222            book_buffers: Arc::new(AtomicMap::new()),
223            book_subscriptions: Arc::new(AtomicMap::new()),
224            mark_price_refs: Arc::new(AtomicMap::new()),
225            book_epoch: Arc::new(RwLock::new(0)),
226        })
227    }
228
    /// Returns the venue for this client, copied from the shared `BINANCE_VENUE` value.
    fn venue(&self) -> Venue {
        *BINANCE_VENUE
    }
232
233    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
234        if let Err(e) = sender.send(DataEvent::Data(data)) {
235            log::error!("Failed to emit data event: {e}");
236        }
237    }
238
239    fn spawn_ws<F>(&self, fut: F, context: &'static str)
240    where
241        F: Future<Output = anyhow::Result<()>> + Send + 'static,
242    {
243        get_runtime().spawn(async move {
244            if let Err(e) = fut.await {
245                log::error!("{context}: {e:?}");
246            }
247        });
248    }
249
250    #[expect(clippy::too_many_arguments)]
251    fn handle_ws_message(
252        msg: BinanceFuturesWsStreamsMessage,
253        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
254        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
255        ws_instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
256        book_buffers: &Arc<AtomicMap<InstrumentId, BookBuffer>>,
257        book_subscriptions: &Arc<AtomicMap<InstrumentId, u32>>,
258        book_epoch: &Arc<RwLock<u64>>,
259        http_client: &BinanceFuturesHttpClient,
260        clock: &'static AtomicTime,
261    ) {
262        let ts_init = clock.get_time_ns();
263        let cache = ws_instruments.load();
264
265        match msg {
266            BinanceFuturesWsStreamsMessage::AggTrade(ref trade_msg) => {
267                if let Some(instrument) = cache.get(&trade_msg.symbol) {
268                    match parse_agg_trade(trade_msg, instrument, ts_init) {
269                        Ok(trade) => Self::send_data(data_sender, Data::Trade(trade)),
270                        Err(e) => log::warn!("Failed to parse aggregate trade: {e}"),
271                    }
272                }
273            }
274            BinanceFuturesWsStreamsMessage::Trade(ref trade_msg) => {
275                if let Some(instrument) = cache.get(&trade_msg.symbol) {
276                    match parse_trade(trade_msg, instrument, ts_init) {
277                        Ok(trade) => Self::send_data(data_sender, Data::Trade(trade)),
278                        Err(e) => log::warn!("Failed to parse trade: {e}"),
279                    }
280                }
281            }
282            BinanceFuturesWsStreamsMessage::BookTicker(ref ticker_msg) => {
283                if let Some(instrument) = cache.get(&ticker_msg.symbol) {
284                    match parse_book_ticker(ticker_msg, instrument, ts_init) {
285                        Ok(quote) => Self::send_data(data_sender, Data::Quote(quote)),
286                        Err(e) => log::warn!("Failed to parse book ticker: {e}"),
287                    }
288                }
289            }
290            BinanceFuturesWsStreamsMessage::DepthUpdate(ref depth_msg) => {
291                if let Some(instrument) = cache.get(&depth_msg.symbol) {
292                    match parse_depth_update(depth_msg, instrument, ts_init) {
293                        Ok(deltas) => {
294                            let instrument_id = deltas.instrument_id;
295                            let final_update_id = deltas.sequence;
296                            let first_update_id = depth_msg.first_update_id;
297                            let prev_final_update_id = depth_msg.prev_final_update_id;
298
299                            if book_buffers.contains_key(&instrument_id) {
300                                let mut was_buffered = false;
301                                book_buffers.rcu(|m| {
302                                    was_buffered = false;
303
304                                    if let Some(buffer) = m.get_mut(&instrument_id) {
305                                        buffer.updates.push(BufferedDepthUpdate {
306                                            deltas: deltas.clone(),
307                                            first_update_id,
308                                            final_update_id,
309                                            prev_final_update_id,
310                                        });
311                                        was_buffered = true;
312                                    }
313                                });
314
315                                if was_buffered {
316                                    return;
317                                }
318                            }
319
320                            Self::send_data(
321                                data_sender,
322                                Data::Deltas(OrderBookDeltas_API::new(deltas)),
323                            );
324                        }
325                        Err(e) => log::warn!("Failed to parse depth update: {e}"),
326                    }
327                }
328            }
329            BinanceFuturesWsStreamsMessage::MarkPrice(ref mark_msg) => {
330                if let Some(instrument) = cache.get(&mark_msg.symbol) {
331                    match parse_mark_price(mark_msg, instrument, ts_init) {
332                        Ok((mark_update, index_update, funding_update)) => {
333                            Self::send_data(data_sender, Data::MarkPriceUpdate(mark_update));
334                            Self::send_data(data_sender, Data::IndexPriceUpdate(index_update));
335                            if let Err(e) = data_sender.send(DataEvent::FundingRate(funding_update))
336                            {
337                                log::error!("Failed to emit funding rate: {e}");
338                            }
339                        }
340                        Err(e) => log::warn!("Failed to parse mark price: {e}"),
341                    }
342                }
343            }
344            BinanceFuturesWsStreamsMessage::Kline(ref kline_msg) => {
345                if let Some(instrument) = cache.get(&kline_msg.symbol) {
346                    match parse_kline(kline_msg, instrument, ts_init) {
347                        Ok(Some(bar)) => Self::send_data(data_sender, Data::Bar(bar)),
348                        Ok(None) => {} // Kline not closed yet
349                        Err(e) => log::warn!("Failed to parse kline: {e}"),
350                    }
351                }
352            }
353            BinanceFuturesWsStreamsMessage::ForceOrder(ref liq_msg) => {
354                log::info!(
355                    "Liquidation: {} {:?} {:?} qty={} at price={}",
356                    liq_msg.order.symbol,
357                    liq_msg.order.side,
358                    liq_msg.order.status,
359                    liq_msg.order.original_qty,
360                    liq_msg.order.average_price,
361                );
362            }
363            BinanceFuturesWsStreamsMessage::Ticker(ref ticker_msg) => {
364                log::debug!(
365                    "Ticker: {} last={} vol={}",
366                    ticker_msg.symbol,
367                    ticker_msg.last_price,
368                    ticker_msg.volume,
369                );
370            }
371            // Execution messages ignored by data client
372            BinanceFuturesWsStreamsMessage::AccountUpdate(_)
373            | BinanceFuturesWsStreamsMessage::OrderUpdate(_)
374            | BinanceFuturesWsStreamsMessage::TradeLite(_)
375            | BinanceFuturesWsStreamsMessage::AlgoUpdate(_)
376            | BinanceFuturesWsStreamsMessage::MarginCall(_)
377            | BinanceFuturesWsStreamsMessage::AccountConfigUpdate(_)
378            | BinanceFuturesWsStreamsMessage::ListenKeyExpired => {}
379            BinanceFuturesWsStreamsMessage::Error(e) => {
380                log::error!(
381                    "Binance Futures WebSocket error: code={}, msg={}",
382                    e.code,
383                    e.msg
384                );
385            }
386            BinanceFuturesWsStreamsMessage::Reconnected => {
387                log::info!("WebSocket reconnected, rebuilding order book snapshots");
388
389                let epoch = {
390                    let mut guard = book_epoch.write().expect(MUTEX_POISONED);
391                    *guard = guard.wrapping_add(1);
392                    *guard
393                };
394
395                let subs: Vec<(InstrumentId, u32)> = {
396                    let guard = book_subscriptions.load();
397                    guard.iter().map(|(k, v)| (*k, *v)).collect()
398                };
399
400                for (instrument_id, depth) in subs {
401                    book_buffers.insert(instrument_id, BookBuffer::new(epoch));
402
403                    log::info!(
404                        "OrderBook snapshot rebuild for {instrument_id} @ depth {depth} \
405                        starting (reconnect, epoch={epoch})"
406                    );
407
408                    let http = http_client.clone();
409                    let sender = data_sender.clone();
410                    let buffers = book_buffers.clone();
411                    let insts = instruments.clone();
412
413                    get_runtime().spawn(async move {
414                        Self::fetch_and_emit_snapshot(
415                            http,
416                            sender,
417                            buffers,
418                            insts,
419                            instrument_id,
420                            depth,
421                            epoch,
422                            clock,
423                        )
424                        .await;
425                    });
426                }
427            }
428        }
429    }
430
431    #[expect(clippy::too_many_arguments)]
432    async fn fetch_and_emit_snapshot(
433        http: BinanceFuturesHttpClient,
434        sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
435        buffers: Arc<AtomicMap<InstrumentId, BookBuffer>>,
436        instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
437        instrument_id: InstrumentId,
438        depth: u32,
439        epoch: u64,
440        clock: &'static AtomicTime,
441    ) {
442        Self::fetch_and_emit_snapshot_inner(
443            http,
444            sender,
445            buffers,
446            instruments,
447            instrument_id,
448            depth,
449            epoch,
450            clock,
451            0,
452        )
453        .await;
454    }
455
456    #[expect(clippy::too_many_arguments)]
457    async fn fetch_and_emit_snapshot_inner(
458        http: BinanceFuturesHttpClient,
459        sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
460        buffers: Arc<AtomicMap<InstrumentId, BookBuffer>>,
461        instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
462        instrument_id: InstrumentId,
463        depth: u32,
464        epoch: u64,
465        clock: &'static AtomicTime,
466        retry_count: u32,
467    ) {
468        const MAX_RETRIES: u32 = 3;
469
470        let symbol = format_binance_stream_symbol(&instrument_id).to_uppercase();
471        let params = BinanceDepthParams {
472            symbol,
473            limit: Some(depth),
474        };
475
476        match http.depth(&params).await {
477            Ok(order_book) => {
478                let ts_init = clock.get_time_ns();
479                let last_update_id = order_book.last_update_id as u64;
480
481                // Check if subscription was cancelled or epoch changed
482                {
483                    let guard = buffers.load();
484                    match guard.get(&instrument_id) {
485                        None => {
486                            log::debug!(
487                                "OrderBook subscription for {instrument_id} was cancelled, \
488                                discarding snapshot"
489                            );
490                            return;
491                        }
492                        Some(buffer) if buffer.epoch != epoch => {
493                            log::debug!(
494                                "OrderBook snapshot for {instrument_id} is stale \
495                                (epoch {epoch} != {}), discarding",
496                                buffer.epoch
497                            );
498                            return;
499                        }
500                        _ => {}
501                    }
502                }
503
504                // Get instrument for precision
505                let (price_precision, size_precision) = {
506                    let guard = instruments.load();
507                    match guard.get(&instrument_id) {
508                        Some(inst) => (inst.price_precision(), inst.size_precision()),
509                        None => {
510                            log::error!("No instrument in cache for snapshot: {instrument_id}");
511                            buffers.remove(&instrument_id);
512                            return;
513                        }
514                    }
515                };
516
517                // Validate first applicable update per Binance spec:
518                // First update must satisfy: U <= lastUpdateId+1 AND u >= lastUpdateId+1
519                let first_valid = {
520                    let guard = buffers.load();
521                    guard.get(&instrument_id).and_then(|buffer| {
522                        buffer
523                            .updates
524                            .iter()
525                            .find(|u| u.final_update_id > last_update_id)
526                            .cloned()
527                    })
528                };
529
530                if let Some(first) = &first_valid {
531                    let target = last_update_id + 1;
532                    let valid_overlap =
533                        first.first_update_id <= target && first.final_update_id >= target;
534
535                    if !valid_overlap {
536                        if retry_count < MAX_RETRIES {
537                            log::warn!(
538                                "OrderBook overlap validation failed for {instrument_id}: \
539                                lastUpdateId={last_update_id}, first_update_id={}, \
540                                final_update_id={} (need U <= {} <= u), \
541                                retrying snapshot (attempt {}/{})",
542                                first.first_update_id,
543                                first.final_update_id,
544                                target,
545                                retry_count + 1,
546                                MAX_RETRIES
547                            );
548
549                            buffers.rcu(|m| {
550                                if let Some(buffer) = m.get_mut(&instrument_id)
551                                    && buffer.epoch == epoch
552                                {
553                                    buffer.updates.clear();
554                                }
555                            });
556
557                            Box::pin(Self::fetch_and_emit_snapshot_inner(
558                                http,
559                                sender,
560                                buffers,
561                                instruments,
562                                instrument_id,
563                                depth,
564                                epoch,
565                                clock,
566                                retry_count + 1,
567                            ))
568                            .await;
569                            return;
570                        }
571                        log::error!(
572                            "OrderBook overlap validation failed for {instrument_id} after \
573                            {MAX_RETRIES} retries; book may be inconsistent"
574                        );
575                    }
576                }
577
578                let snapshot_deltas = parse_order_book_snapshot(
579                    &order_book,
580                    instrument_id,
581                    price_precision,
582                    size_precision,
583                    ts_init,
584                );
585
586                if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
587                    OrderBookDeltas_API::new(snapshot_deltas),
588                ))) {
589                    log::error!("Failed to send snapshot: {e}");
590                }
591
592                // Take buffered updates but keep buffer entry during replay
593                let buffered = {
594                    let mut taken = Vec::new();
595                    let mut should_return = false;
596                    buffers.rcu(|m| {
597                        taken = Vec::new();
598                        should_return = false;
599
600                        match m.get_mut(&instrument_id) {
601                            Some(buffer) if buffer.epoch == epoch => {
602                                taken = std::mem::take(&mut buffer.updates);
603                            }
604                            _ => should_return = true,
605                        }
606                    });
607
608                    if should_return {
609                        return;
610                    }
611                    taken
612                };
613
614                // Replay buffered updates with continuity validation
615                let mut replayed = 0;
616                let mut last_final_update_id = last_update_id;
617
618                for update in buffered {
619                    // Drop updates where u <= lastUpdateId
620                    if update.final_update_id <= last_update_id {
621                        continue;
622                    }
623
624                    // Validate continuity: pu should equal last emitted final_update_id
625                    // (for first update, this validates pu == snapshot lastUpdateId)
626                    if update.prev_final_update_id != last_final_update_id {
627                        if retry_count < MAX_RETRIES {
628                            log::warn!(
629                                "OrderBook continuity break for {instrument_id}: \
630                                expected pu={last_final_update_id}, was pu={}, \
631                                triggering resync (attempt {}/{})",
632                                update.prev_final_update_id,
633                                retry_count + 1,
634                                MAX_RETRIES
635                            );
636
637                            buffers.rcu(|m| {
638                                if let Some(buffer) = m.get_mut(&instrument_id)
639                                    && buffer.epoch == epoch
640                                {
641                                    buffer.updates.clear();
642                                }
643                            });
644
645                            Box::pin(Self::fetch_and_emit_snapshot_inner(
646                                http,
647                                sender,
648                                buffers,
649                                instruments,
650                                instrument_id,
651                                depth,
652                                epoch,
653                                clock,
654                                retry_count + 1,
655                            ))
656                            .await;
657                            return;
658                        }
659                        log::error!(
660                            "OrderBook continuity break for {instrument_id} after {MAX_RETRIES} \
661                            retries: expected pu={last_final_update_id}, was pu={}; \
662                            book may be inconsistent",
663                            update.prev_final_update_id
664                        );
665                    }
666
667                    last_final_update_id = update.final_update_id;
668                    replayed += 1;
669
670                    if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
671                        OrderBookDeltas_API::new(update.deltas),
672                    ))) {
673                        log::error!("Failed to send replayed deltas: {e}");
674                    }
675                }
676
677                // Drain any updates that arrived during replay
678                loop {
679                    let more = {
680                        let mut taken = Vec::new();
681                        let mut should_break = false;
682                        buffers.rcu(|m| {
683                            taken = Vec::new();
684                            should_break = false;
685
686                            match m.get_mut(&instrument_id) {
687                                Some(buffer) if buffer.epoch == epoch => {
688                                    if buffer.updates.is_empty() {
689                                        m.remove(&instrument_id);
690                                        should_break = true;
691                                    } else {
692                                        taken = std::mem::take(&mut buffer.updates);
693                                    }
694                                }
695                                _ => should_break = true,
696                            }
697                        });
698
699                        if should_break {
700                            break;
701                        }
702                        taken
703                    };
704
705                    for update in more {
706                        if update.final_update_id <= last_update_id {
707                            continue;
708                        }
709
710                        if update.prev_final_update_id != last_final_update_id {
711                            if retry_count < MAX_RETRIES {
712                                log::warn!(
713                                    "OrderBook continuity break for {instrument_id}: \
714                                    expected pu={last_final_update_id}, was pu={}, \
715                                    triggering resync (attempt {}/{})",
716                                    update.prev_final_update_id,
717                                    retry_count + 1,
718                                    MAX_RETRIES
719                                );
720
721                                buffers.rcu(|m| {
722                                    if let Some(buffer) = m.get_mut(&instrument_id)
723                                        && buffer.epoch == epoch
724                                    {
725                                        buffer.updates.clear();
726                                    }
727                                });
728
729                                Box::pin(Self::fetch_and_emit_snapshot_inner(
730                                    http,
731                                    sender,
732                                    buffers,
733                                    instruments,
734                                    instrument_id,
735                                    depth,
736                                    epoch,
737                                    clock,
738                                    retry_count + 1,
739                                ))
740                                .await;
741                                return;
742                            }
743                            log::error!(
744                                "OrderBook continuity break for {instrument_id} after \
745                                {MAX_RETRIES} retries; book may be inconsistent"
746                            );
747                        }
748
749                        last_final_update_id = update.final_update_id;
750                        replayed += 1;
751
752                        if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
753                            OrderBookDeltas_API::new(update.deltas),
754                        ))) {
755                            log::error!("Failed to send replayed deltas: {e}");
756                        }
757                    }
758                }
759
760                log::info!(
761                    "OrderBook snapshot rebuild for {instrument_id} completed \
762                    (lastUpdateId={last_update_id}, replayed={replayed})"
763                );
764            }
765            Err(e) => {
766                log::error!("Failed to request order book snapshot for {instrument_id}: {e}");
767                buffers.remove(&instrument_id);
768            }
769        }
770    }
771}
772
773fn upsert_instrument(
774    cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
775    instrument: InstrumentAny,
776) {
777    cache.insert(instrument.id(), instrument);
778}
779
780fn parse_order_book_snapshot(
781    order_book: &BinanceOrderBook,
782    instrument_id: InstrumentId,
783    price_precision: u8,
784    size_precision: u8,
785    ts_init: UnixNanos,
786) -> OrderBookDeltas {
787    let sequence = order_book.last_update_id as u64;
788    let ts_event = order_book.transaction_time.map_or(ts_init, |t| {
789        UnixNanos::from((t as u64) * NANOSECONDS_IN_MILLISECOND)
790    });
791
792    let total_levels = order_book.bids.len() + order_book.asks.len();
793    let mut deltas = Vec::with_capacity(total_levels + 1);
794
795    // First delta is CLEAR to reset the book
796    deltas.push(OrderBookDelta::clear(
797        instrument_id,
798        sequence,
799        ts_event,
800        ts_init,
801    ));
802
803    for (i, (price_str, qty_str)) in order_book.bids.iter().enumerate() {
804        let price: f64 = price_str.parse().unwrap_or(0.0);
805        let size: f64 = qty_str.parse().unwrap_or(0.0);
806
807        let is_last = i == order_book.bids.len() - 1 && order_book.asks.is_empty();
808        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
809
810        let order = BookOrder::new(
811            OrderSide::Buy,
812            Price::new(price, price_precision),
813            Quantity::new(size, size_precision),
814            0,
815        );
816
817        deltas.push(OrderBookDelta::new(
818            instrument_id,
819            BookAction::Add,
820            order,
821            flags,
822            sequence,
823            ts_event,
824            ts_init,
825        ));
826    }
827
828    for (i, (price_str, qty_str)) in order_book.asks.iter().enumerate() {
829        let price: f64 = price_str.parse().unwrap_or(0.0);
830        let size: f64 = qty_str.parse().unwrap_or(0.0);
831
832        let is_last = i == order_book.asks.len() - 1;
833        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
834
835        let order = BookOrder::new(
836            OrderSide::Sell,
837            Price::new(price, price_precision),
838            Quantity::new(size, size_precision),
839            0,
840        );
841
842        deltas.push(OrderBookDelta::new(
843            instrument_id,
844            BookAction::Add,
845            order,
846            flags,
847            sequence,
848            ts_event,
849            ts_init,
850        ));
851    }
852
853    OrderBookDeltas::new(instrument_id, deltas)
854}
855
856#[async_trait::async_trait(?Send)]
857impl DataClient for BinanceFuturesDataClient {
858    fn client_id(&self) -> ClientId {
859        self.client_id
860    }
861
862    fn venue(&self) -> Option<Venue> {
863        Some(self.venue())
864    }
865
866    fn start(&mut self) -> anyhow::Result<()> {
867        log::info!(
868            "Started: client_id={}, product_type={:?}, environment={:?}",
869            self.client_id,
870            self.product_type,
871            self.config.environment,
872        );
873        Ok(())
874    }
875
876    fn stop(&mut self) -> anyhow::Result<()> {
877        log::info!("Stopping {id}", id = self.client_id);
878        self.cancellation_token.cancel();
879        self.is_connected.store(false, Ordering::Relaxed);
880        Ok(())
881    }
882
883    fn reset(&mut self) -> anyhow::Result<()> {
884        log::debug!("Resetting {id}", id = self.client_id);
885
886        self.cancellation_token.cancel();
887
888        for task in self.tasks.drain(..) {
889            task.abort();
890        }
891
892        let mut ws = self.ws_client.clone();
893        get_runtime().spawn(async move {
894            let _ = ws.close().await;
895        });
896
897        // Clear subscription state so resubscribes issue fresh WS subscribes
898        self.mark_price_refs.store(AHashMap::new());
899        self.book_subscriptions.store(AHashMap::new());
900        self.book_buffers.store(AHashMap::new());
901
902        self.is_connected.store(false, Ordering::Relaxed);
903        self.cancellation_token = CancellationToken::new();
904        Ok(())
905    }
906
907    fn dispose(&mut self) -> anyhow::Result<()> {
908        log::debug!("Disposing {id}", id = self.client_id);
909        self.stop()
910    }
911
912    async fn connect(&mut self) -> anyhow::Result<()> {
913        if self.is_connected() {
914            return Ok(());
915        }
916
917        // Reinitialize token in case of reconnection after disconnect
918        self.cancellation_token = CancellationToken::new();
919
920        let instruments = self
921            .http_client
922            .request_instruments()
923            .await
924            .context("failed to request Binance Futures instruments")?;
925
926        // Seed the status cache from the HTTP client's instruments cache
927        {
928            let mut inst_map = AHashMap::new();
929            let mut status_map = AHashMap::new();
930
931            for instrument in &instruments {
932                inst_map.insert(instrument.id(), instrument.clone());
933            }
934
935            let http_instruments = self.http_client.instruments_cache();
936            for entry in http_instruments.iter() {
937                let raw_symbol = entry.key();
938                let action = match entry.value() {
939                    crate::futures::http::client::BinanceFuturesInstrument::UsdM(s) => {
940                        MarketStatusAction::from(s.status)
941                    }
942                    crate::futures::http::client::BinanceFuturesInstrument::CoinM(s) => s
943                        .contract_status
944                        .map_or(MarketStatusAction::NotAvailableForTrading, Into::into),
945                };
946
947                for instrument in &instruments {
948                    if instrument.raw_symbol().as_str() == raw_symbol.as_str() {
949                        status_map.insert(instrument.id(), action);
950                        break;
951                    }
952                }
953            }
954
955            self.instruments.store(inst_map);
956            self.status_cache.store(status_map);
957        }
958
959        for instrument in instruments.clone() {
960            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
961                log::warn!("Failed to send instrument: {e}");
962            }
963        }
964
965        self.ws_client.cache_instruments(&instruments);
966        self.ws_public_client.cache_instruments(&instruments);
967
968        log::info!("Connecting to Binance Futures market WebSocket...");
969        self.ws_client.connect().await.map_err(|e| {
970            log::error!("Binance Futures market WebSocket connection failed: {e:?}");
971            anyhow::anyhow!("failed to connect Binance Futures market WebSocket: {e}")
972        })?;
973        log::info!("Binance Futures market WebSocket connected");
974
975        log::info!("Connecting to Binance Futures public WebSocket...");
976        self.ws_public_client.connect().await.map_err(|e| {
977            log::error!("Binance Futures public WebSocket connection failed: {e:?}");
978            anyhow::anyhow!("failed to connect Binance Futures public WebSocket: {e}")
979        })?;
980        log::info!("Binance Futures public WebSocket connected");
981
982        // Spawn market stream handler
983        let stream = self.ws_client.stream();
984        let sender = self.data_sender.clone();
985        let insts = self.instruments.clone();
986        let ws_insts = self.ws_client.instruments_cache();
987        let buffers = self.book_buffers.clone();
988        let book_subs = self.book_subscriptions.clone();
989        let book_epoch = self.book_epoch.clone();
990        let http = self.http_client.clone();
991        let clock = self.clock;
992        let cancel = self.cancellation_token.clone();
993
994        let handle = get_runtime().spawn(async move {
995            pin_mut!(stream);
996
997            loop {
998                tokio::select! {
999                    Some(message) = stream.next() => {
1000                        Self::handle_ws_message(
1001                            message,
1002                            &sender,
1003                            &insts,
1004                            &ws_insts,
1005                            &buffers,
1006                            &book_subs,
1007                            &book_epoch,
1008                            &http,
1009                            clock,
1010                        );
1011                    }
1012                    () = cancel.cancelled() => {
1013                        log::debug!("Market WebSocket stream task cancelled");
1014                        break;
1015                    }
1016                }
1017            }
1018        });
1019        self.tasks.push(handle);
1020
1021        // Spawn public stream handler (book data)
1022        let pub_stream = self.ws_public_client.stream();
1023        let pub_sender = self.data_sender.clone();
1024        let pub_insts = self.instruments.clone();
1025        let pub_ws_insts = self.ws_public_client.instruments_cache();
1026        let pub_buffers = self.book_buffers.clone();
1027        let pub_book_subs = self.book_subscriptions.clone();
1028        let pub_book_epoch = self.book_epoch.clone();
1029        let pub_http = self.http_client.clone();
1030        let pub_cancel = self.cancellation_token.clone();
1031
1032        let pub_handle = get_runtime().spawn(async move {
1033            pin_mut!(pub_stream);
1034
1035            loop {
1036                tokio::select! {
1037                    Some(message) = pub_stream.next() => {
1038                        Self::handle_ws_message(
1039                            message,
1040                            &pub_sender,
1041                            &pub_insts,
1042                            &pub_ws_insts,
1043                            &pub_buffers,
1044                            &pub_book_subs,
1045                            &pub_book_epoch,
1046                            &pub_http,
1047                            clock,
1048                        );
1049                    }
1050                    () = pub_cancel.cancelled() => {
1051                        log::debug!("Public WebSocket stream task cancelled");
1052                        break;
1053                    }
1054                }
1055            }
1056        });
1057        self.tasks.push(pub_handle);
1058
1059        // Spawn instrument status polling task
1060        let poll_secs = self.config.instrument_status_poll_secs;
1061        if poll_secs > 0 {
1062            let poll_http = self.http_client.clone();
1063            let poll_sender = self.data_sender.clone();
1064            let poll_instruments = self.instruments.clone();
1065            let poll_status_cache = self.status_cache.clone();
1066            let poll_cancel = self.cancellation_token.clone();
1067            let poll_clock = self.clock;
1068
1069            let poll_handle = get_runtime().spawn(async move {
1070                let mut interval =
1071                    tokio::time::interval(tokio::time::Duration::from_secs(poll_secs));
1072                interval.tick().await; // Skip first immediate tick
1073
1074                loop {
1075                    tokio::select! {
1076                        _ = interval.tick() => {
1077                            match poll_http.request_symbol_statuses().await {
1078                                Ok(symbol_statuses) => {
1079                                    let ts = poll_clock.get_time_ns();
1080                                    let inst_guard = poll_instruments.load();
1081
1082                                    // Build raw_symbol -> InstrumentId lookup
1083                                    let raw_to_id: AHashMap<Ustr, InstrumentId> = inst_guard
1084                                        .values()
1085                                        .map(|inst| (inst.raw_symbol().inner(), inst.id()))
1086                                        .collect();
1087
1088                                    let mut new_statuses = AHashMap::new();
1089
1090                                    for (raw_symbol, action) in &symbol_statuses {
1091                                        if let Some(&id) = raw_to_id.get(raw_symbol) {
1092                                            new_statuses.insert(id, *action);
1093                                        }
1094                                    }
1095                                    drop(inst_guard);
1096
1097                                    let mut cache = (**poll_status_cache.load()).clone();
1098                                    diff_and_emit_statuses(
1099                                        &new_statuses, &mut cache, &poll_sender, ts, ts,
1100                                    );
1101                                    poll_status_cache.store(cache);
1102                                }
1103                                Err(e) => {
1104                                    log::warn!("Futures instrument status poll failed: {e}");
1105                                }
1106                            }
1107                        }
1108                        () = poll_cancel.cancelled() => {
1109                            log::debug!("Futures instrument status polling task cancelled");
1110                            break;
1111                        }
1112                    }
1113                }
1114            });
1115            self.tasks.push(poll_handle);
1116            log::info!("Futures instrument status polling started: interval={poll_secs}s");
1117        }
1118
1119        self.is_connected.store(true, Ordering::Release);
1120        log::info!("Connected: client_id={}", self.client_id);
1121        Ok(())
1122    }
1123
1124    async fn disconnect(&mut self) -> anyhow::Result<()> {
1125        if self.is_disconnected() {
1126            return Ok(());
1127        }
1128
1129        self.cancellation_token.cancel();
1130
1131        let _ = self.ws_client.close().await;
1132        let _ = self.ws_public_client.close().await;
1133
1134        let handles: Vec<_> = self.tasks.drain(..).collect();
1135        for handle in handles {
1136            if let Err(e) = handle.await {
1137                log::error!("Error joining WebSocket task: {e}");
1138            }
1139        }
1140
1141        // Clear subscription state so resubscribes issue fresh WS subscribes
1142        self.mark_price_refs.store(AHashMap::new());
1143        self.book_subscriptions.store(AHashMap::new());
1144        self.book_buffers.store(AHashMap::new());
1145
1146        self.is_connected.store(false, Ordering::Release);
1147        log::info!("Disconnected: client_id={}", self.client_id);
1148        Ok(())
1149    }
1150
1151    fn is_connected(&self) -> bool {
1152        self.is_connected.load(Ordering::Relaxed)
1153    }
1154
1155    fn is_disconnected(&self) -> bool {
1156        !self.is_connected()
1157    }
1158
1159    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
1160        log::debug!(
1161            "subscribe_instruments: Binance Futures instruments are fetched via HTTP on connect"
1162        );
1163        Ok(())
1164    }
1165
1166    fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
1167        log::debug!(
1168            "subscribe_instrument: Binance Futures instruments are fetched via HTTP on connect"
1169        );
1170        Ok(())
1171    }
1172
1173    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
1174        if cmd.book_type != BookType::L2_MBP {
1175            anyhow::bail!("Binance Futures only supports L2_MBP order book deltas");
1176        }
1177
1178        let instrument_id = cmd.instrument_id;
1179        let depth = cmd.depth.map_or(1000, |d| d.get() as u32);
1180
1181        if !BINANCE_BOOK_DEPTHS.contains(&depth) {
1182            anyhow::bail!(
1183                "Invalid depth {depth} for Binance Futures order book. \
1184                Valid values: {BINANCE_BOOK_DEPTHS:?}"
1185            );
1186        }
1187
1188        // Track subscription for reconnect handling
1189        self.book_subscriptions.insert(instrument_id, depth);
1190
1191        // Bump epoch to invalidate any in-flight snapshot from a prior subscription
1192        let epoch = {
1193            let mut guard = self.book_epoch.write().expect(MUTEX_POISONED);
1194            *guard = guard.wrapping_add(1);
1195            *guard
1196        };
1197
1198        // Start buffering deltas for this instrument
1199        self.book_buffers
1200            .insert(instrument_id, BookBuffer::new(epoch));
1201
1202        log::info!("OrderBook snapshot rebuild for {instrument_id} @ depth {depth} starting");
1203
1204        // Subscribe to WebSocket depth stream (0ms = unthrottled for Futures)
1205        let ws = self.ws_public_client.clone();
1206        let stream = format!("{}@depth@0ms", format_binance_stream_symbol(&instrument_id));
1207
1208        self.spawn_ws(
1209            async move {
1210                ws.subscribe(vec![stream])
1211                    .await
1212                    .context("book deltas subscription")
1213            },
1214            "order book subscription",
1215        );
1216
1217        // Spawn task to fetch HTTP snapshot and replay buffered deltas
1218        let http = self.http_client.clone();
1219        let sender = self.data_sender.clone();
1220        let buffers = self.book_buffers.clone();
1221        let instruments = self.instruments.clone();
1222        let clock = self.clock;
1223
1224        get_runtime().spawn(async move {
1225            Self::fetch_and_emit_snapshot(
1226                http,
1227                sender,
1228                buffers,
1229                instruments,
1230                instrument_id,
1231                depth,
1232                epoch,
1233                clock,
1234            )
1235            .await;
1236        });
1237
1238        Ok(())
1239    }
1240
1241    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
1242        let instrument_id = cmd.instrument_id;
1243        let ws = self.ws_public_client.clone();
1244
1245        // Binance Futures uses bookTicker for best bid/ask (public endpoint)
1246        let stream = format!(
1247            "{}@bookTicker",
1248            format_binance_stream_symbol(&instrument_id)
1249        );
1250
1251        self.spawn_ws(
1252            async move {
1253                ws.subscribe(vec![stream])
1254                    .await
1255                    .context("quotes subscription")
1256            },
1257            "quote subscription",
1258        );
1259        Ok(())
1260    }
1261
1262    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
1263        let instrument_id = cmd.instrument_id;
1264        let ws = self.ws_client.clone();
1265
1266        // Binance Futures uses aggTrade for aggregate trades
1267        let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1268
1269        self.spawn_ws(
1270            async move {
1271                ws.subscribe(vec![stream])
1272                    .await
1273                    .context("trades subscription")
1274            },
1275            "trade subscription",
1276        );
1277        Ok(())
1278    }
1279
1280    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
1281        let bar_type = cmd.bar_type;
1282        let ws = self.ws_client.clone();
1283        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1284
1285        let stream = format!(
1286            "{}@kline_{}",
1287            format_binance_stream_symbol(&bar_type.instrument_id()),
1288            interval.as_str()
1289        );
1290
1291        self.spawn_ws(
1292            async move {
1293                ws.subscribe(vec![stream])
1294                    .await
1295                    .context("bars subscription")
1296            },
1297            "bar subscription",
1298        );
1299        Ok(())
1300    }
1301
1302    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
1303        let instrument_id = cmd.instrument_id;
1304
1305        // Mark/index/funding share the same stream - use ref counting
1306        let should_subscribe = {
1307            let prev = self
1308                .mark_price_refs
1309                .load()
1310                .get(&instrument_id)
1311                .copied()
1312                .unwrap_or(0);
1313            self.mark_price_refs.rcu(|m| {
1314                let count = m.entry(instrument_id).or_insert(0);
1315                *count += 1;
1316            });
1317            prev == 0
1318        };
1319
1320        if should_subscribe {
1321            let ws = self.ws_client.clone();
1322            let stream = format!(
1323                "{}@markPrice@1s",
1324                format_binance_stream_symbol(&instrument_id)
1325            );
1326
1327            self.spawn_ws(
1328                async move {
1329                    ws.subscribe(vec![stream])
1330                        .await
1331                        .context("mark prices subscription")
1332                },
1333                "mark prices subscription",
1334            );
1335        }
1336        Ok(())
1337    }
1338
1339    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
1340        let instrument_id = cmd.instrument_id;
1341
1342        // Mark/index/funding share the same stream - use ref counting
1343        let should_subscribe = {
1344            let prev = self
1345                .mark_price_refs
1346                .load()
1347                .get(&instrument_id)
1348                .copied()
1349                .unwrap_or(0);
1350            self.mark_price_refs.rcu(|m| {
1351                let count = m.entry(instrument_id).or_insert(0);
1352                *count += 1;
1353            });
1354            prev == 0
1355        };
1356
1357        if should_subscribe {
1358            let ws = self.ws_client.clone();
1359            let stream = format!(
1360                "{}@markPrice@1s",
1361                format_binance_stream_symbol(&instrument_id)
1362            );
1363
1364            self.spawn_ws(
1365                async move {
1366                    ws.subscribe(vec![stream])
1367                        .await
1368                        .context("index prices subscription")
1369                },
1370                "index prices subscription",
1371            );
1372        }
1373        Ok(())
1374    }
1375
1376    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
1377        let instrument_id = cmd.instrument_id;
1378
1379        let should_subscribe = {
1380            let prev = self
1381                .mark_price_refs
1382                .load()
1383                .get(&instrument_id)
1384                .copied()
1385                .unwrap_or(0);
1386            self.mark_price_refs.rcu(|m| {
1387                let count = m.entry(instrument_id).or_insert(0);
1388                *count += 1;
1389            });
1390            prev == 0
1391        };
1392
1393        if should_subscribe {
1394            let ws = self.ws_client.clone();
1395            let stream = format!(
1396                "{}@markPrice@1s",
1397                format_binance_stream_symbol(&instrument_id)
1398            );
1399
1400            self.spawn_ws(
1401                async move {
1402                    ws.subscribe(vec![stream])
1403                        .await
1404                        .context("funding rates subscription")
1405                },
1406                "funding rates subscription",
1407            );
1408        }
1409        Ok(())
1410    }
1411
1412    fn subscribe_instrument_status(
1413        &mut self,
1414        cmd: SubscribeInstrumentStatus,
1415    ) -> anyhow::Result<()> {
1416        log::debug!(
1417            "subscribe_instrument_status: {id} (status changes detected via periodic exchange info polling)",
1418            id = cmd.instrument_id,
1419        );
1420        Ok(())
1421    }
1422
1423    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1424        let instrument_id = cmd.instrument_id;
1425        let ws = self.ws_public_client.clone();
1426
1427        // Remove subscription tracking
1428        self.book_subscriptions.remove(&instrument_id);
1429
1430        // Remove buffer to prevent snapshot task from emitting after unsubscribe
1431        self.book_buffers.remove(&instrument_id);
1432
1433        let symbol_lower = format_binance_stream_symbol(&instrument_id);
1434        let streams = vec![
1435            format!("{symbol_lower}@depth"),
1436            format!("{symbol_lower}@depth@0ms"),
1437            format!("{symbol_lower}@depth@100ms"),
1438            format!("{symbol_lower}@depth@250ms"),
1439            format!("{symbol_lower}@depth@500ms"),
1440        ];
1441
1442        self.spawn_ws(
1443            async move {
1444                ws.unsubscribe(streams)
1445                    .await
1446                    .context("book deltas unsubscribe")
1447            },
1448            "order book unsubscribe",
1449        );
1450        Ok(())
1451    }
1452
1453    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1454        let instrument_id = cmd.instrument_id;
1455        let ws = self.ws_public_client.clone();
1456
1457        let stream = format!(
1458            "{}@bookTicker",
1459            format_binance_stream_symbol(&instrument_id)
1460        );
1461
1462        self.spawn_ws(
1463            async move {
1464                ws.unsubscribe(vec![stream])
1465                    .await
1466                    .context("quotes unsubscribe")
1467            },
1468            "quote unsubscribe",
1469        );
1470        Ok(())
1471    }
1472
1473    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1474        let instrument_id = cmd.instrument_id;
1475        let ws = self.ws_client.clone();
1476
1477        let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1478
1479        self.spawn_ws(
1480            async move {
1481                ws.unsubscribe(vec![stream])
1482                    .await
1483                    .context("trades unsubscribe")
1484            },
1485            "trade unsubscribe",
1486        );
1487        Ok(())
1488    }
1489
1490    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1491        let bar_type = cmd.bar_type;
1492        let ws = self.ws_client.clone();
1493        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1494
1495        let stream = format!(
1496            "{}@kline_{}",
1497            format_binance_stream_symbol(&bar_type.instrument_id()),
1498            interval.as_str()
1499        );
1500
1501        self.spawn_ws(
1502            async move {
1503                ws.unsubscribe(vec![stream])
1504                    .await
1505                    .context("bars unsubscribe")
1506            },
1507            "bar unsubscribe",
1508        );
1509        Ok(())
1510    }
1511
1512    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1513        let instrument_id = cmd.instrument_id;
1514
1515        // Mark/index/funding share the same stream - use ref counting
1516        let should_unsubscribe = {
1517            let prev = self.mark_price_refs.load().get(&instrument_id).copied();
1518            match prev {
1519                Some(count) if count <= 1 => {
1520                    self.mark_price_refs.remove(&instrument_id);
1521                    true
1522                }
1523                Some(_) => {
1524                    self.mark_price_refs.rcu(|m| {
1525                        if let Some(count) = m.get_mut(&instrument_id) {
1526                            *count = count.saturating_sub(1);
1527                        }
1528                    });
1529                    false
1530                }
1531                None => false,
1532            }
1533        };
1534
1535        if should_unsubscribe {
1536            let ws = self.ws_client.clone();
1537            let symbol_lower = format_binance_stream_symbol(&instrument_id);
1538            let streams = vec![
1539                format!("{symbol_lower}@markPrice"),
1540                format!("{symbol_lower}@markPrice@1s"),
1541                format!("{symbol_lower}@markPrice@3s"),
1542            ];
1543
1544            self.spawn_ws(
1545                async move {
1546                    ws.unsubscribe(streams)
1547                        .await
1548                        .context("mark prices unsubscribe")
1549                },
1550                "mark prices unsubscribe",
1551            );
1552        }
1553        Ok(())
1554    }
1555
1556    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1557        let instrument_id = cmd.instrument_id;
1558
1559        // Mark/index/funding share the same stream - use ref counting
1560        let should_unsubscribe = {
1561            let prev = self.mark_price_refs.load().get(&instrument_id).copied();
1562            match prev {
1563                Some(count) if count <= 1 => {
1564                    self.mark_price_refs.remove(&instrument_id);
1565                    true
1566                }
1567                Some(_) => {
1568                    self.mark_price_refs.rcu(|m| {
1569                        if let Some(count) = m.get_mut(&instrument_id) {
1570                            *count = count.saturating_sub(1);
1571                        }
1572                    });
1573                    false
1574                }
1575                None => false,
1576            }
1577        };
1578
1579        if should_unsubscribe {
1580            let ws = self.ws_client.clone();
1581            let symbol_lower = format_binance_stream_symbol(&instrument_id);
1582            let streams = vec![
1583                format!("{symbol_lower}@markPrice"),
1584                format!("{symbol_lower}@markPrice@1s"),
1585                format!("{symbol_lower}@markPrice@3s"),
1586            ];
1587
1588            self.spawn_ws(
1589                async move {
1590                    ws.unsubscribe(streams)
1591                        .await
1592                        .context("index prices unsubscribe")
1593                },
1594                "index prices unsubscribe",
1595            );
1596        }
1597        Ok(())
1598    }
1599
1600    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1601        let instrument_id = cmd.instrument_id;
1602
1603        let should_unsubscribe = {
1604            let prev = self.mark_price_refs.load().get(&instrument_id).copied();
1605            match prev {
1606                Some(count) if count <= 1 => {
1607                    self.mark_price_refs.remove(&instrument_id);
1608                    true
1609                }
1610                Some(_) => {
1611                    self.mark_price_refs.rcu(|m| {
1612                        if let Some(count) = m.get_mut(&instrument_id) {
1613                            *count = count.saturating_sub(1);
1614                        }
1615                    });
1616                    false
1617                }
1618                None => false,
1619            }
1620        };
1621
1622        if should_unsubscribe {
1623            let ws = self.ws_client.clone();
1624            let symbol_lower = format_binance_stream_symbol(&instrument_id);
1625            let streams = vec![
1626                format!("{symbol_lower}@markPrice"),
1627                format!("{symbol_lower}@markPrice@1s"),
1628                format!("{symbol_lower}@markPrice@3s"),
1629            ];
1630
1631            self.spawn_ws(
1632                async move {
1633                    ws.unsubscribe(streams)
1634                        .await
1635                        .context("funding rates unsubscribe")
1636                },
1637                "funding rates unsubscribe",
1638            );
1639        }
1640        Ok(())
1641    }
1642
1643    fn unsubscribe_instrument_status(
1644        &mut self,
1645        cmd: &UnsubscribeInstrumentStatus,
1646    ) -> anyhow::Result<()> {
1647        log::debug!(
1648            "unsubscribe_instrument_status: {id}",
1649            id = cmd.instrument_id,
1650        );
1651        Ok(())
1652    }
1653
1654    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1655        let http = self.http_client.clone();
1656        let sender = self.data_sender.clone();
1657        let instruments_cache = self.instruments.clone();
1658        let request_id = request.request_id;
1659        let client_id = request.client_id.unwrap_or(self.client_id);
1660        let venue = self.venue();
1661        let start = request.start;
1662        let end = request.end;
1663        let params = request.params;
1664        let clock = self.clock;
1665        let start_nanos = datetime_to_unix_nanos(start);
1666        let end_nanos = datetime_to_unix_nanos(end);
1667
1668        get_runtime().spawn(async move {
1669            match http.request_instruments().await {
1670                Ok(instruments) => {
1671                    for instrument in &instruments {
1672                        upsert_instrument(&instruments_cache, instrument.clone());
1673                    }
1674
1675                    let response = DataResponse::Instruments(InstrumentsResponse::new(
1676                        request_id,
1677                        client_id,
1678                        venue,
1679                        instruments,
1680                        start_nanos,
1681                        end_nanos,
1682                        clock.get_time_ns(),
1683                        params,
1684                    ));
1685
1686                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1687                        log::error!("Failed to send instruments response: {e}");
1688                    }
1689                }
1690                Err(e) => log::error!("Instruments request failed: {e:?}"),
1691            }
1692        });
1693
1694        Ok(())
1695    }
1696
1697    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1698        let http = self.http_client.clone();
1699        let sender = self.data_sender.clone();
1700        let instruments = self.instruments.clone();
1701        let instrument_id = request.instrument_id;
1702        let request_id = request.request_id;
1703        let client_id = request.client_id.unwrap_or(self.client_id);
1704        let start = request.start;
1705        let end = request.end;
1706        let params = request.params;
1707        let clock = self.clock;
1708        let start_nanos = datetime_to_unix_nanos(start);
1709        let end_nanos = datetime_to_unix_nanos(end);
1710
1711        get_runtime().spawn(async move {
1712            match http.request_instruments().await {
1713                Ok(all_instruments) => {
1714                    for instrument in &all_instruments {
1715                        upsert_instrument(&instruments, instrument.clone());
1716                    }
1717
1718                    let instrument = all_instruments
1719                        .into_iter()
1720                        .find(|i| i.id() == instrument_id);
1721
1722                    if let Some(instrument) = instrument {
1723                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1724                            request_id,
1725                            client_id,
1726                            instrument.id(),
1727                            instrument,
1728                            start_nanos,
1729                            end_nanos,
1730                            clock.get_time_ns(),
1731                            params,
1732                        )));
1733
1734                        if let Err(e) = sender.send(DataEvent::Response(response)) {
1735                            log::error!("Failed to send instrument response: {e}");
1736                        }
1737                    } else {
1738                        log::error!("Instrument not found: {instrument_id}");
1739                    }
1740                }
1741                Err(e) => log::error!("Instrument request failed: {e:?}"),
1742            }
1743        });
1744
1745        Ok(())
1746    }
1747
1748    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1749        let http = self.http_client.clone();
1750        let sender = self.data_sender.clone();
1751        let instrument_id = request.instrument_id;
1752        let limit = request.limit.map(|n| n.get() as u32);
1753        let request_id = request.request_id;
1754        let client_id = request.client_id.unwrap_or(self.client_id);
1755        let params = request.params;
1756        let clock = self.clock;
1757        let start_nanos = datetime_to_unix_nanos(request.start);
1758        let end_nanos = datetime_to_unix_nanos(request.end);
1759
1760        get_runtime().spawn(async move {
1761            match http
1762                .request_trades(instrument_id, limit)
1763                .await
1764                .context("failed to request trades from Binance Futures")
1765            {
1766                Ok(trades) => {
1767                    let response = DataResponse::Trades(TradesResponse::new(
1768                        request_id,
1769                        client_id,
1770                        instrument_id,
1771                        trades,
1772                        start_nanos,
1773                        end_nanos,
1774                        clock.get_time_ns(),
1775                        params,
1776                    ));
1777
1778                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1779                        log::error!("Failed to send trades response: {e}");
1780                    }
1781                }
1782                Err(e) => log::error!("Trade request failed: {e:?}"),
1783            }
1784        });
1785
1786        Ok(())
1787    }
1788
1789    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1790        let http = self.http_client.clone();
1791        let sender = self.data_sender.clone();
1792        let bar_type = request.bar_type;
1793        let start = request.start;
1794        let end = request.end;
1795        let limit = request.limit.map(|n| n.get() as u32);
1796        let request_id = request.request_id;
1797        let client_id = request.client_id.unwrap_or(self.client_id);
1798        let params = request.params;
1799        let clock = self.clock;
1800        let start_nanos = datetime_to_unix_nanos(start);
1801        let end_nanos = datetime_to_unix_nanos(end);
1802
1803        get_runtime().spawn(async move {
1804            match http
1805                .request_bars(bar_type, start, end, limit)
1806                .await
1807                .context("failed to request bars from Binance Futures")
1808            {
1809                Ok(bars) => {
1810                    let response = DataResponse::Bars(BarsResponse::new(
1811                        request_id,
1812                        client_id,
1813                        bar_type,
1814                        bars,
1815                        start_nanos,
1816                        end_nanos,
1817                        clock.get_time_ns(),
1818                        params,
1819                    ));
1820
1821                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1822                        log::error!("Failed to send bars response: {e}");
1823                    }
1824                }
1825                Err(e) => log::error!("Bar request failed: {e:?}"),
1826            }
1827        });
1828
1829        Ok(())
1830    }
1831}