nautilus_binance/futures/websocket/streams/
client.rs1use std::{
28 fmt::Debug,
29 sync::{
30 Arc, Mutex,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33};
34
35use futures_util::Stream;
36use nautilus_common::live::get_runtime;
37use nautilus_core::{AtomicMap, string::secret::REDACTED};
38use nautilus_model::instruments::{Instrument, InstrumentAny};
39use nautilus_network::{
40 mode::ConnectionMode,
41 websocket::{
42 PingHandler, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
43 channel_message_handler,
44 },
45};
46use tokio_tungstenite::tungstenite::Message;
47use tokio_util::sync::CancellationToken;
48use ustr::Ustr;
49
50use super::{
51 error::{BinanceWsError, BinanceWsResult},
52 handler::BinanceFuturesDataWsFeedHandler,
53 messages::{BinanceFuturesWsStreamsCommand, BinanceFuturesWsStreamsMessage},
54};
55use crate::common::{
56 consts::{
57 BINANCE_API_KEY_HEADER, BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION, BINANCE_WS_CONNECTION_QUOTA,
58 BINANCE_WS_SUBSCRIPTION_QUOTA,
59 },
60 credential::SigningCredential,
61 enums::{BinanceEnvironment, BinanceProductType},
62 urls::get_ws_base_url,
63};
64
65pub const MAX_STREAMS_PER_CONNECTION: usize = 200;
67
68const MAX_CONNECTIONS: usize = 20;
70
71struct ConnectionSlot {
73 cmd_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsCommand>,
74 streams: Vec<String>,
75 subscriptions_state: SubscriptionState,
76 handler_task: tokio::task::JoinHandle<()>,
77 bytes_task: tokio::task::JoinHandle<()>,
78 cancellation_token: CancellationToken,
79 connection_mode: Arc<AtomicU8>,
80}
81
82#[derive(Clone)]
89pub struct BinanceFuturesWebSocketClient {
90 url: String,
91 product_type: BinanceProductType,
92 credential: Option<Arc<SigningCredential>>,
93 heartbeat: Option<u64>,
94 signal: Arc<AtomicBool>,
95 slots: Arc<Mutex<Vec<ConnectionSlot>>>,
96 out_tx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsMessage>>>>,
97 out_rx:
98 Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsStreamsMessage>>>>,
99 request_id_counter: Arc<AtomicU64>,
100 instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
101 transport_backend: TransportBackend,
102}
103
104impl Debug for BinanceFuturesWebSocketClient {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct(stringify!(BinanceFuturesWebSocketClient))
107 .field("url", &self.url)
108 .field("product_type", &self.product_type)
109 .field("credential", &self.credential.as_ref().map(|_| REDACTED))
110 .field("heartbeat", &self.heartbeat)
111 .finish_non_exhaustive()
112 }
113}
114
115impl BinanceFuturesWebSocketClient {
116 pub fn new(
124 product_type: BinanceProductType,
125 environment: BinanceEnvironment,
126 api_key: Option<String>,
127 api_secret: Option<String>,
128 url_override: Option<String>,
129 heartbeat: Option<u64>,
130 transport_backend: TransportBackend,
131 ) -> anyhow::Result<Self> {
132 match product_type {
133 BinanceProductType::UsdM | BinanceProductType::CoinM => {}
134 _ => {
135 anyhow::bail!(
136 "BinanceFuturesWebSocketClient requires UsdM or CoinM product type, was {product_type:?}"
137 );
138 }
139 }
140
141 let url =
142 url_override.unwrap_or_else(|| get_ws_base_url(product_type, environment).to_string());
143
144 let credential = match (api_key, api_secret) {
145 (Some(key), Some(secret)) => Some(Arc::new(SigningCredential::new(key, secret))),
146 _ => None,
147 };
148
149 Ok(Self {
150 url,
151 product_type,
152 credential,
153 heartbeat,
154 signal: Arc::new(AtomicBool::new(false)),
155 slots: Arc::new(Mutex::new(Vec::new())),
156 out_tx: Arc::new(Mutex::new(None)),
157 out_rx: Arc::new(Mutex::new(None)),
158 request_id_counter: Arc::new(AtomicU64::new(1)),
159 instruments_cache: Arc::new(AtomicMap::new()),
160 transport_backend,
161 })
162 }
163
164 #[must_use]
166 pub const fn product_type(&self) -> BinanceProductType {
167 self.product_type
168 }
169
170 #[must_use]
172 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
173 pub fn is_active(&self) -> bool {
174 let slots = self.slots.lock().expect("slots lock poisoned");
175 slots
176 .iter()
177 .any(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8)
178 }
179
180 #[must_use]
182 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
183 pub fn is_closed(&self) -> bool {
184 let slots = self.slots.lock().expect("slots lock poisoned");
185 slots.is_empty()
186 || slots
187 .iter()
188 .all(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Closed as u8)
189 }
190
191 #[must_use]
193 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
194 pub fn subscription_count(&self) -> usize {
195 let slots = self.slots.lock().expect("slots lock poisoned");
196 slots.iter().map(|s| s.subscriptions_state.len()).sum()
197 }
198
199 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
205 pub async fn connect(&mut self) -> BinanceWsResult<()> {
206 self.signal.store(false, Ordering::Relaxed);
207
208 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
209 *self.out_tx.lock().expect("out_tx lock poisoned") = Some(out_tx);
210 *self.out_rx.lock().expect("out_rx lock poisoned") = Some(out_rx);
211
212 let slot = self.create_connection().await?;
213 self.slots.lock().expect("slots lock poisoned").push(slot);
214
215 log::info!(
216 "Connected to Binance Futures stream pool: url={}, product_type={:?}",
217 self.url,
218 self.product_type
219 );
220 Ok(())
221 }
222
223 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
229 pub async fn close(&mut self) -> BinanceWsResult<()> {
230 self.signal.store(true, Ordering::Relaxed);
231
232 let slots: Vec<ConnectionSlot> = {
233 let mut guard = self.slots.lock().expect("slots lock poisoned");
234 guard.drain(..).collect()
235 };
236
237 for slot in slots {
238 slot.cancellation_token.cancel();
239 let _ = slot.cmd_tx.send(BinanceFuturesWsStreamsCommand::Disconnect);
240 let _ = slot.handler_task.await;
241 slot.bytes_task.abort();
242 }
243
244 *self.out_tx.lock().expect("out_tx lock poisoned") = None;
245 *self.out_rx.lock().expect("out_rx lock poisoned") = None;
246
247 log::info!("Disconnected from Binance Futures stream pool");
248 Ok(())
249 }
250
251 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
261 pub async fn subscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
262 let new_streams: Vec<String> = {
264 let slots = self.slots.lock().expect("slots lock poisoned");
265 streams
266 .into_iter()
267 .filter(|s| !slots.iter().any(|slot| slot.streams.contains(s)))
268 .collect()
269 };
270
271 if new_streams.is_empty() {
272 return Ok(());
273 }
274
275 loop {
277 let (remaining_capacity, slot_count) = {
278 let slots = self.slots.lock().expect("slots lock poisoned");
279 let cap: usize = slots
280 .iter()
281 .map(|s| MAX_STREAMS_PER_CONNECTION - s.streams.len())
282 .sum();
283 (cap, slots.len())
284 };
285
286 if remaining_capacity >= new_streams.len() || slot_count >= MAX_CONNECTIONS {
287 break;
288 }
289
290 let new_slot = self.create_connection().await?;
291 let slot_count = {
292 let mut slots = self.slots.lock().expect("slots lock poisoned");
293 slots.push(new_slot);
294 slots.len()
295 };
296 log::info!(
297 "Pool slot {} connected: url={}, product_type={:?}",
298 slot_count - 1,
299 self.url,
300 self.product_type
301 );
302 }
303
304 let mut slots = self.slots.lock().expect("slots lock poisoned");
307 let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
308 let mut slot_counts: Vec<usize> = slots.iter().map(|s| s.streams.len()).collect();
309
310 for stream in &new_streams {
311 let slot_idx = slot_counts
312 .iter()
313 .position(|&count| count < MAX_STREAMS_PER_CONNECTION)
314 .ok_or_else(|| {
315 let max_total = MAX_CONNECTIONS * MAX_STREAMS_PER_CONNECTION;
316 BinanceWsError::ClientError(format!(
317 "Pool exhausted: {max_total} total subscriptions \
318 ({MAX_CONNECTIONS} connections x {MAX_STREAMS_PER_CONNECTION} streams)"
319 ))
320 })?;
321
322 slot_counts[slot_idx] += 1;
323
324 if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
325 batch.1.push(stream.clone());
326 } else {
327 slot_batches.push((slot_idx, vec![stream.clone()]));
328 }
329 }
330
331 for (slot_idx, batch) in &slot_batches {
333 slots[*slot_idx]
334 .cmd_tx
335 .send(BinanceFuturesWsStreamsCommand::Subscribe {
336 streams: batch.clone(),
337 })
338 .map_err(|e| {
339 BinanceWsError::ClientError(format!(
340 "Handler not available for pool slot {slot_idx}: {e}"
341 ))
342 })?;
343 slots[*slot_idx].streams.extend(batch.iter().cloned());
344 }
345
346 Ok(())
347 }
348
349 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
355 pub async fn unsubscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
356 let mut slots = self.slots.lock().expect("slots lock poisoned");
357 let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
358
359 for stream in &streams {
360 if let Some(slot_idx) = slots.iter().position(|s| s.streams.contains(stream)) {
361 if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
362 batch.1.push(stream.clone());
363 } else {
364 slot_batches.push((slot_idx, vec![stream.clone()]));
365 }
366 }
367 }
368
369 for (slot_idx, batch) in &slot_batches {
371 slots[*slot_idx]
372 .cmd_tx
373 .send(BinanceFuturesWsStreamsCommand::Unsubscribe {
374 streams: batch.clone(),
375 })
376 .map_err(|e| {
377 BinanceWsError::ClientError(format!(
378 "Handler not available for pool slot {slot_idx}: {e}"
379 ))
380 })?;
381
382 for stream in batch {
383 slots[*slot_idx].streams.retain(|s| s != stream);
384 }
385 }
386
387 Ok(())
388 }
389
390 pub fn stream(&self) -> impl Stream<Item = BinanceFuturesWsStreamsMessage> + 'static {
399 let out_rx = self.out_rx.lock().expect("out_rx lock poisoned").take();
400 async_stream::stream! {
401 if let Some(mut rx) = out_rx {
402 while let Some(msg) = rx.recv().await {
403 yield msg;
404 }
405 }
406 }
407 }
408
409 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
411 self.instruments_cache.rcu(|m| {
412 for inst in instruments {
413 m.insert(inst.raw_symbol().inner(), inst.clone());
414 }
415 });
416 }
417
418 pub fn cache_instrument(&self, instrument: InstrumentAny) {
420 self.instruments_cache
421 .insert(instrument.raw_symbol().inner(), instrument);
422 }
423
424 #[must_use]
426 pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
427 self.instruments_cache.clone()
428 }
429
430 #[must_use]
432 pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
433 self.instruments_cache.get_cloned(&Ustr::from(symbol))
434 }
435
436 async fn create_connection(&self) -> BinanceWsResult<ConnectionSlot> {
437 let out_tx = self
438 .out_tx
439 .lock()
440 .expect("out_tx lock poisoned")
441 .clone()
442 .ok_or_else(|| {
443 BinanceWsError::ClientError("Output channel not initialized".to_string())
444 })?;
445
446 let (raw_handler, raw_rx) = channel_message_handler();
447 let ping_handler: PingHandler = Arc::new(move |_| {});
448
449 let headers = if let Some(ref cred) = self.credential {
450 vec![(
451 BINANCE_API_KEY_HEADER.to_string(),
452 cred.api_key().to_string(),
453 )]
454 } else {
455 vec![]
456 };
457
458 let config = WebSocketConfig {
459 url: self.url.clone(),
460 headers,
461 heartbeat: self.heartbeat,
462 heartbeat_msg: None,
463 reconnect_timeout_ms: Some(5_000),
464 reconnect_delay_initial_ms: Some(500),
465 reconnect_delay_max_ms: Some(5_000),
466 reconnect_backoff_factor: Some(2.0),
467 reconnect_jitter_ms: Some(250),
468 reconnect_max_attempts: None,
469 idle_timeout_ms: None,
470 backend: self.transport_backend,
471 proxy_url: None,
472 };
473
474 let keyed_quotas = vec![(
475 BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION[0].as_str().to_string(),
476 *BINANCE_WS_SUBSCRIPTION_QUOTA,
477 )];
478
479 let client = WebSocketClient::connect(
480 config,
481 Some(raw_handler),
482 Some(ping_handler),
483 None,
484 keyed_quotas,
485 Some(*BINANCE_WS_CONNECTION_QUOTA),
486 )
487 .await
488 .map_err(|e| BinanceWsError::NetworkError(e.to_string()))?;
489
490 let connection_mode = client.connection_mode_atomic();
491 let subscriptions_state = SubscriptionState::new('@');
492 let cancellation_token = CancellationToken::new();
493
494 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
495
496 let (bytes_tx, bytes_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
498
499 let bytes_task = get_runtime().spawn(async move {
500 let mut raw_rx = raw_rx;
501 while let Some(msg) = raw_rx.recv().await {
502 let data = match msg {
503 Message::Binary(data) => data.to_vec(),
504 Message::Text(text) => text.as_bytes().to_vec(),
505 Message::Close(_) => break,
506 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => continue,
507 };
508
509 if bytes_tx.send(data).is_err() {
510 break;
511 }
512 }
513 });
514
515 let mut handler = BinanceFuturesDataWsFeedHandler::new(
516 self.signal.clone(),
517 cmd_rx,
518 bytes_rx,
519 out_tx.clone(),
520 subscriptions_state.clone(),
521 self.request_id_counter.clone(),
522 );
523
524 cmd_tx
525 .send(BinanceFuturesWsStreamsCommand::SetClient(client))
526 .map_err(|e| BinanceWsError::ClientError(format!("Failed to set client: {e}")))?;
527
528 let signal = self.signal.clone();
529 let token = cancellation_token.clone();
530 let subs = subscriptions_state.clone();
531 let resubscribe_tx = cmd_tx.clone();
532
533 let handler_task = get_runtime().spawn(async move {
534 loop {
535 tokio::select! {
536 () = token.cancelled() => {
537 log::debug!("Handler task cancelled");
538 break;
539 }
540 result = handler.next() => {
541 match result {
542 Some(BinanceFuturesWsStreamsMessage::Reconnected) => {
543 log::info!("WebSocket reconnected, restoring subscriptions");
544 let all_topics = subs.all_topics();
545 for topic in &all_topics {
546 subs.mark_failure(topic);
547 }
548
549 let streams = subs.all_topics();
550 if !streams.is_empty()
551 && let Err(e) = resubscribe_tx.send(BinanceFuturesWsStreamsCommand::Subscribe { streams }) {
552 log::error!("Failed to resubscribe after reconnect: {e}");
553 }
554
555 if out_tx.send(BinanceFuturesWsStreamsMessage::Reconnected).is_err() {
556 log::debug!("Output channel closed");
557 break;
558 }
559 }
560 Some(msg) => {
561 if out_tx.send(msg).is_err() {
562 log::debug!("Output channel closed");
563 break;
564 }
565 }
566 None => {
567 if signal.load(Ordering::Relaxed) {
568 log::debug!("Handler received shutdown signal");
569 } else {
570 log::warn!("Handler loop ended unexpectedly");
571 }
572 break;
573 }
574 }
575 }
576 }
577 }
578 });
579
580 Ok(ConnectionSlot {
581 cmd_tx,
582 streams: Vec::new(),
583 subscriptions_state,
584 handler_task,
585 bytes_task,
586 cancellation_token,
587 connection_mode,
588 })
589 }
590}