Skip to main content

nautilus_binance/spot/websocket/streams/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket client for SBE market data streams.
17//!
18//! ## Connection Details
19//!
20//! - Endpoint: `stream-sbe.binance.com` or `stream-sbe.binance.com:9443`
21//! - Authentication: Ed25519 API key in `X-MBX-APIKEY` header
22//! - Max streams: 1024 per connection
23//! - Max connections: 20 per pool (up to 20,480 total streams)
24//! - Connection validity: 24 hours
25//! - Ping/pong: Every 20 seconds
26
27use std::{
28    fmt::Debug,
29    sync::{
30        Arc, Mutex,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33};
34
35use futures_util::Stream;
36use nautilus_common::live::get_runtime;
37use nautilus_core::{AtomicMap, string::secret::REDACTED};
38use nautilus_model::instruments::{Instrument, InstrumentAny};
39use nautilus_network::{
40    mode::ConnectionMode,
41    websocket::{
42        PingHandler, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
43        channel_message_handler,
44    },
45};
46use tokio_util::sync::CancellationToken;
47use ustr::Ustr;
48
49use super::{
50    super::error::{BinanceWsError, BinanceWsResult},
51    handler::BinanceSpotWsFeedHandler,
52    messages::{BinanceSpotWsMessage, BinanceSpotWsStreamsCommand},
53    subscription::{MAX_CONNECTIONS, MAX_STREAMS_PER_CONNECTION},
54};
55use crate::common::{
56    consts::{
57        BINANCE_API_KEY_HEADER, BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION, BINANCE_SPOT_SBE_WS_URL,
58        BINANCE_WS_CONNECTION_QUOTA, BINANCE_WS_SUBSCRIPTION_QUOTA,
59    },
60    credential::Ed25519Credential,
61};
62
/// State for a single WebSocket connection within the pool.
///
/// One slot is produced per call to `create_connection` and owned by the
/// client's `slots` vector until `close` drains it.
struct ConnectionSlot {
    /// Command channel into this connection's feed handler (subscribe, unsubscribe,
    /// disconnect, client hand-over).
    cmd_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsStreamsCommand>,
    /// Stream names assigned to this slot by `subscribe`/`unsubscribe`; used for
    /// capacity accounting and for locating the slot that owns a stream.
    streams: Vec<String>,
    /// Subscription tracking state; a clone is handed to the feed handler and the
    /// forwarding task, and its `len()` feeds `subscription_count`.
    subscriptions_state: SubscriptionState,
    /// Handle of the spawned task that forwards handler messages to the shared
    /// output channel; awaited in `close`.
    task_handle: tokio::task::JoinHandle<()>,
    /// Token cancelled in `close` to stop the forwarding task.
    cancellation_token: CancellationToken,
    /// Connection mode atomic obtained from the underlying `WebSocketClient`,
    /// compared against `ConnectionMode` discriminants.
    connection_mode: Arc<AtomicU8>,
}
72
/// Binance Spot WebSocket client for SBE market data streams.
///
/// Manages a pool of up to 20 connections, each supporting up to 1024 streams.
/// New connections are created automatically when subscribing exceeds the current
/// connection's stream limit. All connections feed into a single output stream,
/// transparent to the data client.
///
/// Apart from `url`, `heartbeat` and `transport_backend`, every field is behind an
/// `Arc`, so clones of this client refer to the same pool, channels and caches.
#[derive(Clone)]
pub struct BinanceSpotWebSocketClient {
    /// WebSocket endpoint used for every pool connection.
    url: String,
    /// Optional Ed25519 credential; when present its API key is sent as a
    /// connection header.
    credential: Option<Arc<Ed25519Credential>>,
    /// Heartbeat setting forwarded to `WebSocketConfig::heartbeat`
    /// (presumably an interval in seconds — TODO confirm against `WebSocketConfig`).
    heartbeat: Option<u64>,
    /// Shutdown flag: cleared in `connect`, set in `close`, shared with the handlers.
    signal: Arc<AtomicBool>,
    /// The connection pool.
    slots: Arc<Mutex<Vec<ConnectionSlot>>>,
    /// Sender side of the merged output channel; cloned into each new connection.
    /// `None` until `connect` is called and again after `close`.
    out_tx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>>>>,
    /// Receiver side of the merged output channel; taken by `stream`.
    out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsMessage>>>>,
    /// Request id counter shared by all feed handlers; starts at 1.
    request_id_counter: Arc<AtomicU64>,
    /// Instruments keyed by symbol.
    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Transport backend forwarded to `WebSocketConfig::backend`.
    transport_backend: TransportBackend,
}
92
93impl Debug for BinanceSpotWebSocketClient {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct(stringify!(BinanceSpotWebSocketClient))
96            .field("url", &self.url)
97            .field("credential", &self.credential.as_ref().map(|_| REDACTED))
98            .field("heartbeat", &self.heartbeat)
99            .finish_non_exhaustive()
100    }
101}
102
103impl Default for BinanceSpotWebSocketClient {
104    fn default() -> Self {
105        Self::new(None, None, None, None, TransportBackend::default()).unwrap()
106    }
107}
108
109impl BinanceSpotWebSocketClient {
110    /// Creates a new [`BinanceSpotWebSocketClient`] instance.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if credential creation fails.
115    pub fn new(
116        url: Option<String>,
117        api_key: Option<String>,
118        api_secret: Option<String>,
119        heartbeat: Option<u64>,
120        transport_backend: TransportBackend,
121    ) -> anyhow::Result<Self> {
122        let url = url.unwrap_or(BINANCE_SPOT_SBE_WS_URL.to_string());
123
124        let credential = match (api_key, api_secret) {
125            (Some(key), Some(secret)) => Some(Arc::new(Ed25519Credential::new(key, &secret)?)),
126            _ => None,
127        };
128
129        Ok(Self {
130            url,
131            credential,
132            heartbeat,
133            signal: Arc::new(AtomicBool::new(false)),
134            slots: Arc::new(Mutex::new(Vec::new())),
135            out_tx: Arc::new(Mutex::new(None)),
136            out_rx: Arc::new(Mutex::new(None)),
137            request_id_counter: Arc::new(AtomicU64::new(1)),
138            instruments_cache: Arc::new(AtomicMap::new()),
139            transport_backend,
140        })
141    }
142
143    /// Returns whether any connection in the pool is active.
144    #[must_use]
145    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
146    pub fn is_active(&self) -> bool {
147        let slots = self.slots.lock().expect("slots lock poisoned");
148        slots
149            .iter()
150            .any(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8)
151    }
152
153    /// Returns whether all connections in the pool are closed.
154    #[must_use]
155    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
156    pub fn is_closed(&self) -> bool {
157        let slots = self.slots.lock().expect("slots lock poisoned");
158        slots.is_empty()
159            || slots
160                .iter()
161                .all(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Closed as u8)
162    }
163
164    /// Returns the total number of confirmed subscriptions across all connections.
165    #[must_use]
166    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
167    pub fn subscription_count(&self) -> usize {
168        let slots = self.slots.lock().expect("slots lock poisoned");
169        slots.iter().map(|s| s.subscriptions_state.len()).sum()
170    }
171
172    /// Connects the first WebSocket connection in the pool.
173    ///
174    /// # Errors
175    ///
176    /// Returns an error if connection fails.
177    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
178    pub async fn connect(&mut self) -> BinanceWsResult<()> {
179        self.signal.store(false, Ordering::Relaxed);
180
181        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
182        *self.out_tx.lock().expect("out_tx lock poisoned") = Some(out_tx);
183        *self.out_rx.lock().expect("out_rx lock poisoned") = Some(out_rx);
184
185        let slot = self.create_connection().await?;
186        self.slots.lock().expect("slots lock poisoned").push(slot);
187
188        log::info!(
189            "Connected to Binance Spot SBE stream pool: url={}",
190            self.url
191        );
192        Ok(())
193    }
194
195    /// Closes all WebSocket connections in the pool.
196    ///
197    /// # Errors
198    ///
199    /// Returns an error if disconnect fails.
200    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
201    pub async fn close(&mut self) -> BinanceWsResult<()> {
202        self.signal.store(true, Ordering::Relaxed);
203
204        let slots: Vec<ConnectionSlot> = {
205            let mut guard = self.slots.lock().expect("slots lock poisoned");
206            guard.drain(..).collect()
207        };
208
209        for slot in slots {
210            slot.cancellation_token.cancel();
211            let _ = slot.cmd_tx.send(BinanceSpotWsStreamsCommand::Disconnect);
212            let _ = slot.task_handle.await;
213        }
214
215        *self.out_tx.lock().expect("out_tx lock poisoned") = None;
216        *self.out_rx.lock().expect("out_rx lock poisoned") = None;
217
218        log::info!("Disconnected from Binance Spot SBE stream pool");
219        Ok(())
220    }
221
222    /// Subscribes to the specified streams.
223    ///
224    /// Streams are distributed across pool connections. New connections are created
225    /// automatically when existing ones reach the 1024-stream limit, up to a maximum
226    /// of 20 connections.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the pool is exhausted or command delivery fails.
231    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
232    pub async fn subscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
233        // Phase 1: filter already-subscribed streams (brief lock)
234        let new_streams: Vec<String> = {
235            let slots = self.slots.lock().expect("slots lock poisoned");
236            streams
237                .into_iter()
238                .filter(|s| !slots.iter().any(|slot| slot.streams.contains(s)))
239                .collect()
240        };
241
242        if new_streams.is_empty() {
243            return Ok(());
244        }
245
246        // Phase 2: create connections if needed (no lock held during async connect)
247        loop {
248            let (remaining_capacity, slot_count) = {
249                let slots = self.slots.lock().expect("slots lock poisoned");
250                let cap: usize = slots
251                    .iter()
252                    .map(|s| MAX_STREAMS_PER_CONNECTION - s.streams.len())
253                    .sum();
254                (cap, slots.len())
255            };
256
257            if remaining_capacity >= new_streams.len() || slot_count >= MAX_CONNECTIONS {
258                break;
259            }
260
261            let new_slot = self.create_connection().await?;
262            let slot_count = {
263                let mut slots = self.slots.lock().expect("slots lock poisoned");
264                slots.push(new_slot);
265                slots.len()
266            };
267            log::info!("Pool slot {} connected: url={}", slot_count - 1, self.url);
268        }
269
270        // Phase 3: assign streams to slots and send commands (brief lock).
271        // Stage assignments first so a capacity error leaves slots unchanged.
272        let mut slots = self.slots.lock().expect("slots lock poisoned");
273        let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
274        let mut slot_counts: Vec<usize> = slots.iter().map(|s| s.streams.len()).collect();
275
276        for stream in &new_streams {
277            let slot_idx = slot_counts
278                .iter()
279                .position(|&count| count < MAX_STREAMS_PER_CONNECTION)
280                .ok_or_else(|| {
281                    let max_total = MAX_CONNECTIONS * MAX_STREAMS_PER_CONNECTION;
282                    BinanceWsError::ClientError(format!(
283                        "Pool exhausted: {max_total} total subscriptions \
284                         ({MAX_CONNECTIONS} connections x {MAX_STREAMS_PER_CONNECTION} streams)"
285                    ))
286                })?;
287
288            slot_counts[slot_idx] += 1;
289
290            if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
291                batch.1.push(stream.clone());
292            } else {
293                slot_batches.push((slot_idx, vec![stream.clone()]));
294            }
295        }
296
297        // Send commands first; only update slot state on success
298        for (slot_idx, batch) in &slot_batches {
299            slots[*slot_idx]
300                .cmd_tx
301                .send(BinanceSpotWsStreamsCommand::Subscribe {
302                    streams: batch.clone(),
303                })
304                .map_err(|e| {
305                    BinanceWsError::ClientError(format!(
306                        "Handler not available for pool slot {slot_idx}: {e}"
307                    ))
308                })?;
309            slots[*slot_idx].streams.extend(batch.iter().cloned());
310        }
311
312        Ok(())
313    }
314
315    /// Unsubscribes from the specified streams.
316    ///
317    /// # Errors
318    ///
319    /// Returns an error if command delivery fails.
320    #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
321    pub async fn unsubscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
322        let mut slots = self.slots.lock().expect("slots lock poisoned");
323        let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
324
325        for stream in &streams {
326            if let Some(slot_idx) = slots.iter().position(|s| s.streams.contains(stream)) {
327                if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
328                    batch.1.push(stream.clone());
329                } else {
330                    slot_batches.push((slot_idx, vec![stream.clone()]));
331                }
332            }
333        }
334
335        // Send commands first; only update slot state on success
336        for (slot_idx, batch) in &slot_batches {
337            slots[*slot_idx]
338                .cmd_tx
339                .send(BinanceSpotWsStreamsCommand::Unsubscribe {
340                    streams: batch.clone(),
341                })
342                .map_err(|e| {
343                    BinanceWsError::ClientError(format!(
344                        "Handler not available for pool slot {slot_idx}: {e}"
345                    ))
346                })?;
347
348            for stream in batch {
349                slots[*slot_idx].streams.retain(|s| s != stream);
350            }
351        }
352
353        Ok(())
354    }
355
356    /// Returns a stream of messages from all WebSocket connections.
357    ///
358    /// This method can only be called once per connection lifecycle. Subsequent calls
359    /// return an empty stream.
360    ///
361    /// # Panics
362    ///
363    /// Panics if the internal output receiver mutex is poisoned.
364    pub fn stream(&self) -> impl Stream<Item = BinanceSpotWsMessage> + 'static {
365        let out_rx = self.out_rx.lock().expect("out_rx lock poisoned").take();
366        async_stream::stream! {
367            if let Some(mut rx) = out_rx {
368                while let Some(msg) = rx.recv().await {
369                    yield msg;
370                }
371            }
372        }
373    }
374
375    /// Bulk initialize the instrument cache.
376    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
377        self.instruments_cache.rcu(|m| {
378            for inst in instruments {
379                m.insert(inst.symbol().inner(), inst.clone());
380            }
381        });
382    }
383
384    /// Update a single instrument in the cache.
385    pub fn cache_instrument(&self, instrument: InstrumentAny) {
386        self.instruments_cache
387            .insert(instrument.symbol().inner(), instrument);
388    }
389
390    /// Returns a shared reference to the instruments cache.
391    #[must_use]
392    pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
393        self.instruments_cache.clone()
394    }
395
396    /// Returns an instrument from the cache by symbol.
397    #[must_use]
398    pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
399        self.instruments_cache.get_cloned(&Ustr::from(symbol))
400    }
401
402    async fn create_connection(&self) -> BinanceWsResult<ConnectionSlot> {
403        let out_tx = self
404            .out_tx
405            .lock()
406            .expect("out_tx lock poisoned")
407            .clone()
408            .ok_or_else(|| {
409                BinanceWsError::ClientError("Output channel not initialized".to_string())
410            })?;
411
412        let (raw_handler, raw_rx) = channel_message_handler();
413        let ping_handler: PingHandler = Arc::new(move |_| {});
414
415        let headers = if let Some(ref cred) = self.credential {
416            vec![(
417                BINANCE_API_KEY_HEADER.to_string(),
418                cred.api_key().to_string(),
419            )]
420        } else {
421            vec![]
422        };
423
424        let config = WebSocketConfig {
425            url: self.url.clone(),
426            headers,
427            heartbeat: self.heartbeat,
428            heartbeat_msg: None,
429            reconnect_timeout_ms: Some(5_000),
430            reconnect_delay_initial_ms: Some(500),
431            reconnect_delay_max_ms: Some(5_000),
432            reconnect_backoff_factor: Some(2.0),
433            reconnect_jitter_ms: Some(250),
434            reconnect_max_attempts: None,
435            idle_timeout_ms: None,
436            backend: self.transport_backend,
437            proxy_url: None,
438        };
439
440        let keyed_quotas = vec![(
441            BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION[0].as_str().to_string(),
442            *BINANCE_WS_SUBSCRIPTION_QUOTA,
443        )];
444
445        let client = WebSocketClient::connect(
446            config,
447            Some(raw_handler),
448            Some(ping_handler),
449            None,
450            keyed_quotas,
451            Some(*BINANCE_WS_CONNECTION_QUOTA),
452        )
453        .await
454        .map_err(|e| {
455            log::error!("WebSocket connection failed: {e}");
456            BinanceWsError::NetworkError(e.to_string())
457        })?;
458
459        let connection_mode = client.connection_mode_atomic();
460        let subscriptions_state = SubscriptionState::new('@');
461        let cancellation_token = CancellationToken::new();
462
463        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
464
465        let mut handler = BinanceSpotWsFeedHandler::new(
466            self.signal.clone(),
467            cmd_rx,
468            raw_rx,
469            out_tx.clone(),
470            subscriptions_state.clone(),
471            self.request_id_counter.clone(),
472        );
473
474        cmd_tx
475            .send(BinanceSpotWsStreamsCommand::SetClient(client))
476            .map_err(|e| BinanceWsError::ClientError(format!("Failed to set client: {e}")))?;
477
478        let signal = self.signal.clone();
479        let token = cancellation_token.clone();
480        let subs = subscriptions_state.clone();
481        let resubscribe_tx = cmd_tx.clone();
482
483        let task_handle = get_runtime().spawn(async move {
484            loop {
485                tokio::select! {
486                    () = token.cancelled() => {
487                        log::debug!("Handler task cancelled");
488                        break;
489                    }
490                    result = handler.next() => {
491                        match result {
492                            Some(BinanceSpotWsMessage::Reconnected) => {
493                                log::info!("WebSocket reconnected, restoring subscriptions");
494                                let all_topics = subs.all_topics();
495                                for topic in &all_topics {
496                                    subs.mark_failure(topic);
497                                }
498
499                                let streams = subs.all_topics();
500                                if !streams.is_empty()
501                                    && let Err(e) = resubscribe_tx.send(BinanceSpotWsStreamsCommand::Subscribe { streams }) {
502                                        log::error!("Failed to resubscribe after reconnect: {e}");
503                                    }
504
505                                if out_tx.send(BinanceSpotWsMessage::Reconnected).is_err() {
506                                    log::debug!("Output channel closed");
507                                    break;
508                                }
509                            }
510                            Some(msg) => {
511                                if out_tx.send(msg).is_err() {
512                                    log::debug!("Output channel closed");
513                                    break;
514                                }
515                            }
516                            None => {
517                                if signal.load(Ordering::Relaxed) {
518                                    log::debug!("Handler received shutdown signal");
519                                } else {
520                                    log::warn!("Handler loop ended unexpectedly");
521                                }
522                                break;
523                            }
524                        }
525                    }
526                }
527            }
528        });
529
530        Ok(ConnectionSlot {
531            cmd_tx,
532            streams: Vec::new(),
533            subscriptions_state,
534            task_handle,
535            cancellation_token,
536            connection_mode,
537        })
538    }
539}