Skip to main content

nautilus_binance/futures/websocket/streams/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures WebSocket client for JSON market data streams.
17//!
18//! ## Connection Details
19//!
20//! - USD-M Endpoint: `wss://fstream.binance.com/market/ws`
21//! - COIN-M Endpoint: `wss://dstream.binance.com/ws`
22//! - Max streams: 200 per connection
23//! - Max connections: 20 per pool (up to 4,000 total streams)
24//! - Connection validity: 24 hours
25//! - Ping/pong: Every 3 minutes
26
27use std::{
28    fmt::Debug,
29    sync::{
30        Arc, Mutex,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33};
34
35use futures_util::Stream;
36use nautilus_common::live::get_runtime;
37use nautilus_core::{AtomicMap, string::secret::REDACTED};
38use nautilus_model::instruments::{Instrument, InstrumentAny};
39use nautilus_network::{
40    mode::ConnectionMode,
41    websocket::{
42        PingHandler, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
43        channel_message_handler,
44    },
45};
46use tokio_tungstenite::tungstenite::Message;
47use tokio_util::sync::CancellationToken;
48use ustr::Ustr;
49
50use super::{
51    error::{BinanceWsError, BinanceWsResult},
52    handler::BinanceFuturesDataWsFeedHandler,
53    messages::{BinanceFuturesWsStreamsCommand, BinanceFuturesWsStreamsMessage},
54};
55use crate::common::{
56    consts::{
57        BINANCE_API_KEY_HEADER, BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION, BINANCE_WS_CONNECTION_QUOTA,
58        BINANCE_WS_SUBSCRIPTION_QUOTA,
59    },
60    credential::SigningCredential,
61    enums::{BinanceEnvironment, BinanceProductType},
62    urls::get_ws_base_url,
63};
64
65/// Maximum streams per WebSocket connection for Futures.
66pub const MAX_STREAMS_PER_CONNECTION: usize = 200;
67
68/// Maximum connections per pool.
69const MAX_CONNECTIONS: usize = 20;
70
71// State for a single WebSocket connection within the pool
72struct ConnectionSlot {
73    cmd_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsCommand>,
74    streams: Vec<String>,
75    subscriptions_state: SubscriptionState,
76    handler_task: tokio::task::JoinHandle<()>,
77    bytes_task: tokio::task::JoinHandle<()>,
78    cancellation_token: CancellationToken,
79    connection_mode: Arc<AtomicU8>,
80}
81
82/// Binance Futures WebSocket client for JSON market data streams.
83///
84/// Manages a pool of up to 20 connections, each supporting up to 200 streams.
85/// New connections are created automatically when subscribing exceeds the current
86/// connection's stream limit. All connections feed into a single output stream,
87/// transparent to the data client.
88#[derive(Clone)]
89pub struct BinanceFuturesWebSocketClient {
90    url: String,
91    product_type: BinanceProductType,
92    credential: Option<Arc<SigningCredential>>,
93    heartbeat: Option<u64>,
94    signal: Arc<AtomicBool>,
95    slots: Arc<Mutex<Vec<ConnectionSlot>>>,
96    out_tx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsMessage>>>>,
97    out_rx:
98        Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsStreamsMessage>>>>,
99    request_id_counter: Arc<AtomicU64>,
100    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
101    transport_backend: TransportBackend,
102}
103
104impl Debug for BinanceFuturesWebSocketClient {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct(stringify!(BinanceFuturesWebSocketClient))
107            .field("url", &self.url)
108            .field("product_type", &self.product_type)
109            .field("credential", &self.credential.as_ref().map(|_| REDACTED))
110            .field("heartbeat", &self.heartbeat)
111            .finish_non_exhaustive()
112    }
113}
114
115impl BinanceFuturesWebSocketClient {
116    /// Creates a new [`BinanceFuturesWebSocketClient`] instance.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if:
121    /// - `product_type` is not a futures type (UsdM or CoinM).
122    /// - Credential creation fails.
123    pub fn new(
124        product_type: BinanceProductType,
125        environment: BinanceEnvironment,
126        api_key: Option<String>,
127        api_secret: Option<String>,
128        url_override: Option<String>,
129        heartbeat: Option<u64>,
130        transport_backend: TransportBackend,
131    ) -> anyhow::Result<Self> {
132        match product_type {
133            BinanceProductType::UsdM | BinanceProductType::CoinM => {}
134            _ => {
135                anyhow::bail!(
136                    "BinanceFuturesWebSocketClient requires UsdM or CoinM product type, was {product_type:?}"
137                );
138            }
139        }
140
141        let url =
142            url_override.unwrap_or_else(|| get_ws_base_url(product_type, environment).to_string());
143
144        let credential = match (api_key, api_secret) {
145            (Some(key), Some(secret)) => Some(Arc::new(SigningCredential::new(key, secret))),
146            _ => None,
147        };
148
149        Ok(Self {
150            url,
151            product_type,
152            credential,
153            heartbeat,
154            signal: Arc::new(AtomicBool::new(false)),
155            slots: Arc::new(Mutex::new(Vec::new())),
156            out_tx: Arc::new(Mutex::new(None)),
157            out_rx: Arc::new(Mutex::new(None)),
158            request_id_counter: Arc::new(AtomicU64::new(1)),
159            instruments_cache: Arc::new(AtomicMap::new()),
160            transport_backend,
161        })
162    }
163
164    /// Returns the product type (UsdM or CoinM).
165    #[must_use]
166    pub const fn product_type(&self) -> BinanceProductType {
167        self.product_type
168    }
169
170    /// Returns whether any connection in the pool is active.
171    #[must_use]
172    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
173    pub fn is_active(&self) -> bool {
174        let slots = self.slots.lock().expect("slots lock poisoned");
175        slots
176            .iter()
177            .any(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8)
178    }
179
180    /// Returns whether all connections in the pool are closed.
181    #[must_use]
182    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
183    pub fn is_closed(&self) -> bool {
184        let slots = self.slots.lock().expect("slots lock poisoned");
185        slots.is_empty()
186            || slots
187                .iter()
188                .all(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Closed as u8)
189    }
190
191    /// Returns the total number of confirmed subscriptions across all connections.
192    #[must_use]
193    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
194    pub fn subscription_count(&self) -> usize {
195        let slots = self.slots.lock().expect("slots lock poisoned");
196        slots.iter().map(|s| s.subscriptions_state.len()).sum()
197    }
198
199    /// Connects the first WebSocket connection in the pool.
200    ///
201    /// # Errors
202    ///
203    /// Returns an error if connection fails.
204    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
205    pub async fn connect(&mut self) -> BinanceWsResult<()> {
206        self.signal.store(false, Ordering::Relaxed);
207
208        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
209        *self.out_tx.lock().expect("out_tx lock poisoned") = Some(out_tx);
210        *self.out_rx.lock().expect("out_rx lock poisoned") = Some(out_rx);
211
212        let slot = self.create_connection().await?;
213        self.slots.lock().expect("slots lock poisoned").push(slot);
214
215        log::info!(
216            "Connected to Binance Futures stream pool: url={}, product_type={:?}",
217            self.url,
218            self.product_type
219        );
220        Ok(())
221    }
222
223    /// Closes all WebSocket connections in the pool.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if disconnect fails.
228    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
229    pub async fn close(&mut self) -> BinanceWsResult<()> {
230        self.signal.store(true, Ordering::Relaxed);
231
232        let slots: Vec<ConnectionSlot> = {
233            let mut guard = self.slots.lock().expect("slots lock poisoned");
234            guard.drain(..).collect()
235        };
236
237        for slot in slots {
238            slot.cancellation_token.cancel();
239            let _ = slot.cmd_tx.send(BinanceFuturesWsStreamsCommand::Disconnect);
240            let _ = slot.handler_task.await;
241            slot.bytes_task.abort();
242        }
243
244        *self.out_tx.lock().expect("out_tx lock poisoned") = None;
245        *self.out_rx.lock().expect("out_rx lock poisoned") = None;
246
247        log::info!("Disconnected from Binance Futures stream pool");
248        Ok(())
249    }
250
251    /// Subscribes to the specified streams.
252    ///
253    /// Streams are distributed across pool connections. New connections are created
254    /// automatically when existing ones reach the 200-stream limit, up to a maximum
255    /// of 20 connections.
256    ///
257    /// # Errors
258    ///
259    /// Returns an error if the pool is exhausted or command delivery fails.
260    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
261    pub async fn subscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
262        // Phase 1: filter already-subscribed streams (brief lock)
263        let new_streams: Vec<String> = {
264            let slots = self.slots.lock().expect("slots lock poisoned");
265            streams
266                .into_iter()
267                .filter(|s| !slots.iter().any(|slot| slot.streams.contains(s)))
268                .collect()
269        };
270
271        if new_streams.is_empty() {
272            return Ok(());
273        }
274
275        // Phase 2: create connections if needed (no lock held during async connect)
276        loop {
277            let (remaining_capacity, slot_count) = {
278                let slots = self.slots.lock().expect("slots lock poisoned");
279                let cap: usize = slots
280                    .iter()
281                    .map(|s| MAX_STREAMS_PER_CONNECTION - s.streams.len())
282                    .sum();
283                (cap, slots.len())
284            };
285
286            if remaining_capacity >= new_streams.len() || slot_count >= MAX_CONNECTIONS {
287                break;
288            }
289
290            let new_slot = self.create_connection().await?;
291            let slot_count = {
292                let mut slots = self.slots.lock().expect("slots lock poisoned");
293                slots.push(new_slot);
294                slots.len()
295            };
296            log::info!(
297                "Pool slot {} connected: url={}, product_type={:?}",
298                slot_count - 1,
299                self.url,
300                self.product_type
301            );
302        }
303
304        // Phase 3: assign streams to slots and send commands (brief lock).
305        // Stage assignments first so a capacity error leaves slots unchanged.
306        let mut slots = self.slots.lock().expect("slots lock poisoned");
307        let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
308        let mut slot_counts: Vec<usize> = slots.iter().map(|s| s.streams.len()).collect();
309
310        for stream in &new_streams {
311            let slot_idx = slot_counts
312                .iter()
313                .position(|&count| count < MAX_STREAMS_PER_CONNECTION)
314                .ok_or_else(|| {
315                    let max_total = MAX_CONNECTIONS * MAX_STREAMS_PER_CONNECTION;
316                    BinanceWsError::ClientError(format!(
317                        "Pool exhausted: {max_total} total subscriptions \
318                         ({MAX_CONNECTIONS} connections x {MAX_STREAMS_PER_CONNECTION} streams)"
319                    ))
320                })?;
321
322            slot_counts[slot_idx] += 1;
323
324            if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
325                batch.1.push(stream.clone());
326            } else {
327                slot_batches.push((slot_idx, vec![stream.clone()]));
328            }
329        }
330
331        // Send commands first; only update slot state on success
332        for (slot_idx, batch) in &slot_batches {
333            slots[*slot_idx]
334                .cmd_tx
335                .send(BinanceFuturesWsStreamsCommand::Subscribe {
336                    streams: batch.clone(),
337                })
338                .map_err(|e| {
339                    BinanceWsError::ClientError(format!(
340                        "Handler not available for pool slot {slot_idx}: {e}"
341                    ))
342                })?;
343            slots[*slot_idx].streams.extend(batch.iter().cloned());
344        }
345
346        Ok(())
347    }
348
349    /// Unsubscribes from the specified streams.
350    ///
351    /// # Errors
352    ///
353    /// Returns an error if command delivery fails.
354    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
355    pub async fn unsubscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
356        let mut slots = self.slots.lock().expect("slots lock poisoned");
357        let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
358
359        for stream in &streams {
360            if let Some(slot_idx) = slots.iter().position(|s| s.streams.contains(stream)) {
361                if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
362                    batch.1.push(stream.clone());
363                } else {
364                    slot_batches.push((slot_idx, vec![stream.clone()]));
365                }
366            }
367        }
368
369        // Send commands first; only update slot state on success
370        for (slot_idx, batch) in &slot_batches {
371            slots[*slot_idx]
372                .cmd_tx
373                .send(BinanceFuturesWsStreamsCommand::Unsubscribe {
374                    streams: batch.clone(),
375                })
376                .map_err(|e| {
377                    BinanceWsError::ClientError(format!(
378                        "Handler not available for pool slot {slot_idx}: {e}"
379                    ))
380                })?;
381
382            for stream in batch {
383                slots[*slot_idx].streams.retain(|s| s != stream);
384            }
385        }
386
387        Ok(())
388    }
389
390    /// Returns a stream of messages from all WebSocket connections.
391    ///
392    /// This method can only be called once per connection lifecycle. Subsequent calls
393    /// return an empty stream.
394    ///
395    /// # Panics
396    ///
397    /// Panics if the internal output receiver mutex is poisoned.
398    pub fn stream(&self) -> impl Stream<Item = BinanceFuturesWsStreamsMessage> + 'static {
399        let out_rx = self.out_rx.lock().expect("out_rx lock poisoned").take();
400        async_stream::stream! {
401            if let Some(mut rx) = out_rx {
402                while let Some(msg) = rx.recv().await {
403                    yield msg;
404                }
405            }
406        }
407    }
408
409    /// Bulk initialize the instrument cache.
410    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
411        self.instruments_cache.rcu(|m| {
412            for inst in instruments {
413                m.insert(inst.raw_symbol().inner(), inst.clone());
414            }
415        });
416    }
417
418    /// Update a single instrument in the cache.
419    pub fn cache_instrument(&self, instrument: InstrumentAny) {
420        self.instruments_cache
421            .insert(instrument.raw_symbol().inner(), instrument);
422    }
423
424    /// Returns a shared reference to the instruments cache.
425    #[must_use]
426    pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
427        self.instruments_cache.clone()
428    }
429
430    /// Returns an instrument from the cache by raw symbol.
431    #[must_use]
432    pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
433        self.instruments_cache.get_cloned(&Ustr::from(symbol))
434    }
435
436    async fn create_connection(&self) -> BinanceWsResult<ConnectionSlot> {
437        let out_tx = self
438            .out_tx
439            .lock()
440            .expect("out_tx lock poisoned")
441            .clone()
442            .ok_or_else(|| {
443                BinanceWsError::ClientError("Output channel not initialized".to_string())
444            })?;
445
446        let (raw_handler, raw_rx) = channel_message_handler();
447        let ping_handler: PingHandler = Arc::new(move |_| {});
448
449        let headers = if let Some(ref cred) = self.credential {
450            vec![(
451                BINANCE_API_KEY_HEADER.to_string(),
452                cred.api_key().to_string(),
453            )]
454        } else {
455            vec![]
456        };
457
458        let config = WebSocketConfig {
459            url: self.url.clone(),
460            headers,
461            heartbeat: self.heartbeat,
462            heartbeat_msg: None,
463            reconnect_timeout_ms: Some(5_000),
464            reconnect_delay_initial_ms: Some(500),
465            reconnect_delay_max_ms: Some(5_000),
466            reconnect_backoff_factor: Some(2.0),
467            reconnect_jitter_ms: Some(250),
468            reconnect_max_attempts: None,
469            idle_timeout_ms: None,
470            backend: self.transport_backend,
471            proxy_url: None,
472        };
473
474        let keyed_quotas = vec![(
475            BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION[0].as_str().to_string(),
476            *BINANCE_WS_SUBSCRIPTION_QUOTA,
477        )];
478
479        let client = WebSocketClient::connect(
480            config,
481            Some(raw_handler),
482            Some(ping_handler),
483            None,
484            keyed_quotas,
485            Some(*BINANCE_WS_CONNECTION_QUOTA),
486        )
487        .await
488        .map_err(|e| BinanceWsError::NetworkError(e.to_string()))?;
489
490        let connection_mode = client.connection_mode_atomic();
491        let subscriptions_state = SubscriptionState::new('@');
492        let cancellation_token = CancellationToken::new();
493
494        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
495
496        // Convert raw Message frames to Vec<u8> for the JSON handler
497        let (bytes_tx, bytes_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
498
499        let bytes_task = get_runtime().spawn(async move {
500            let mut raw_rx = raw_rx;
501            while let Some(msg) = raw_rx.recv().await {
502                let data = match msg {
503                    Message::Binary(data) => data.to_vec(),
504                    Message::Text(text) => text.as_bytes().to_vec(),
505                    Message::Close(_) => break,
506                    Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
507                };
508
509                if bytes_tx.send(data).is_err() {
510                    break;
511                }
512            }
513        });
514
515        let mut handler = BinanceFuturesDataWsFeedHandler::new(
516            self.signal.clone(),
517            cmd_rx,
518            bytes_rx,
519            out_tx.clone(),
520            subscriptions_state.clone(),
521            self.request_id_counter.clone(),
522        );
523
524        cmd_tx
525            .send(BinanceFuturesWsStreamsCommand::SetClient(client))
526            .map_err(|e| BinanceWsError::ClientError(format!("Failed to set client: {e}")))?;
527
528        let signal = self.signal.clone();
529        let token = cancellation_token.clone();
530        let subs = subscriptions_state.clone();
531        let resubscribe_tx = cmd_tx.clone();
532
533        let handler_task = get_runtime().spawn(async move {
534            loop {
535                tokio::select! {
536                    () = token.cancelled() => {
537                        log::debug!("Handler task cancelled");
538                        break;
539                    }
540                    result = handler.next() => {
541                        match result {
542                            Some(BinanceFuturesWsStreamsMessage::Reconnected) => {
543                                log::info!("WebSocket reconnected, restoring subscriptions");
544                                let all_topics = subs.all_topics();
545                                for topic in &all_topics {
546                                    subs.mark_failure(topic);
547                                }
548
549                                let streams = subs.all_topics();
550                                if !streams.is_empty()
551                                    && let Err(e) = resubscribe_tx.send(BinanceFuturesWsStreamsCommand::Subscribe { streams }) {
552                                        log::error!("Failed to resubscribe after reconnect: {e}");
553                                    }
554
555                                if out_tx.send(BinanceFuturesWsStreamsMessage::Reconnected).is_err() {
556                                    log::debug!("Output channel closed");
557                                    break;
558                                }
559                            }
560                            Some(msg) => {
561                                if out_tx.send(msg).is_err() {
562                                    log::debug!("Output channel closed");
563                                    break;
564                                }
565                            }
566                            None => {
567                                if signal.load(Ordering::Relaxed) {
568                                    log::debug!("Handler received shutdown signal");
569                                } else {
570                                    log::warn!("Handler loop ended unexpectedly");
571                                }
572                                break;
573                            }
574                        }
575                    }
576                }
577            }
578        });
579
580        Ok(ConnectionSlot {
581            cmd_tx,
582            streams: Vec::new(),
583            subscriptions_state,
584            handler_task,
585            bytes_task,
586            cancellation_token,
587            connection_mode,
588        })
589    }
590}