nautilus_binance/spot/websocket/streams/
client.rs1use std::{
28 fmt::Debug,
29 sync::{
30 Arc, Mutex,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33};
34
35use futures_util::Stream;
36use nautilus_common::live::get_runtime;
37use nautilus_core::{AtomicMap, string::secret::REDACTED};
38use nautilus_model::instruments::{Instrument, InstrumentAny};
39use nautilus_network::{
40 mode::ConnectionMode,
41 websocket::{
42 PingHandler, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
43 channel_message_handler,
44 },
45};
46use tokio_util::sync::CancellationToken;
47use ustr::Ustr;
48
49use super::{
50 super::error::{BinanceWsError, BinanceWsResult},
51 handler::BinanceSpotWsFeedHandler,
52 messages::{BinanceSpotWsMessage, BinanceSpotWsStreamsCommand},
53 subscription::{MAX_CONNECTIONS, MAX_STREAMS_PER_CONNECTION},
54};
55use crate::common::{
56 consts::{
57 BINANCE_API_KEY_HEADER, BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION, BINANCE_SPOT_SBE_WS_URL,
58 BINANCE_WS_CONNECTION_QUOTA, BINANCE_WS_SUBSCRIPTION_QUOTA,
59 },
60 credential::Ed25519Credential,
61};
62
63struct ConnectionSlot {
65 cmd_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsStreamsCommand>,
66 streams: Vec<String>,
67 subscriptions_state: SubscriptionState,
68 task_handle: tokio::task::JoinHandle<()>,
69 cancellation_token: CancellationToken,
70 connection_mode: Arc<AtomicU8>,
71}
72
73#[derive(Clone)]
80pub struct BinanceSpotWebSocketClient {
81 url: String,
82 credential: Option<Arc<Ed25519Credential>>,
83 heartbeat: Option<u64>,
84 signal: Arc<AtomicBool>,
85 slots: Arc<Mutex<Vec<ConnectionSlot>>>,
86 out_tx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>>>>,
87 out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsMessage>>>>,
88 request_id_counter: Arc<AtomicU64>,
89 instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
90 transport_backend: TransportBackend,
91}
92
93impl Debug for BinanceSpotWebSocketClient {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct(stringify!(BinanceSpotWebSocketClient))
96 .field("url", &self.url)
97 .field("credential", &self.credential.as_ref().map(|_| REDACTED))
98 .field("heartbeat", &self.heartbeat)
99 .finish_non_exhaustive()
100 }
101}
102
103impl Default for BinanceSpotWebSocketClient {
104 fn default() -> Self {
105 Self::new(None, None, None, None, TransportBackend::default()).unwrap()
106 }
107}
108
109impl BinanceSpotWebSocketClient {
110 pub fn new(
116 url: Option<String>,
117 api_key: Option<String>,
118 api_secret: Option<String>,
119 heartbeat: Option<u64>,
120 transport_backend: TransportBackend,
121 ) -> anyhow::Result<Self> {
122 let url = url.unwrap_or(BINANCE_SPOT_SBE_WS_URL.to_string());
123
124 let credential = match (api_key, api_secret) {
125 (Some(key), Some(secret)) => Some(Arc::new(Ed25519Credential::new(key, &secret)?)),
126 _ => None,
127 };
128
129 Ok(Self {
130 url,
131 credential,
132 heartbeat,
133 signal: Arc::new(AtomicBool::new(false)),
134 slots: Arc::new(Mutex::new(Vec::new())),
135 out_tx: Arc::new(Mutex::new(None)),
136 out_rx: Arc::new(Mutex::new(None)),
137 request_id_counter: Arc::new(AtomicU64::new(1)),
138 instruments_cache: Arc::new(AtomicMap::new()),
139 transport_backend,
140 })
141 }
142
143 #[must_use]
145 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
146 pub fn is_active(&self) -> bool {
147 let slots = self.slots.lock().expect("slots lock poisoned");
148 slots
149 .iter()
150 .any(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8)
151 }
152
153 #[must_use]
155 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
156 pub fn is_closed(&self) -> bool {
157 let slots = self.slots.lock().expect("slots lock poisoned");
158 slots.is_empty()
159 || slots
160 .iter()
161 .all(|s| s.connection_mode.load(Ordering::Relaxed) == ConnectionMode::Closed as u8)
162 }
163
164 #[must_use]
166 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
167 pub fn subscription_count(&self) -> usize {
168 let slots = self.slots.lock().expect("slots lock poisoned");
169 slots.iter().map(|s| s.subscriptions_state.len()).sum()
170 }
171
172 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
178 pub async fn connect(&mut self) -> BinanceWsResult<()> {
179 self.signal.store(false, Ordering::Relaxed);
180
181 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
182 *self.out_tx.lock().expect("out_tx lock poisoned") = Some(out_tx);
183 *self.out_rx.lock().expect("out_rx lock poisoned") = Some(out_rx);
184
185 let slot = self.create_connection().await?;
186 self.slots.lock().expect("slots lock poisoned").push(slot);
187
188 log::info!(
189 "Connected to Binance Spot SBE stream pool: url={}",
190 self.url
191 );
192 Ok(())
193 }
194
195 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
201 pub async fn close(&mut self) -> BinanceWsResult<()> {
202 self.signal.store(true, Ordering::Relaxed);
203
204 let slots: Vec<ConnectionSlot> = {
205 let mut guard = self.slots.lock().expect("slots lock poisoned");
206 guard.drain(..).collect()
207 };
208
209 for slot in slots {
210 slot.cancellation_token.cancel();
211 let _ = slot.cmd_tx.send(BinanceSpotWsStreamsCommand::Disconnect);
212 let _ = slot.task_handle.await;
213 }
214
215 *self.out_tx.lock().expect("out_tx lock poisoned") = None;
216 *self.out_rx.lock().expect("out_rx lock poisoned") = None;
217
218 log::info!("Disconnected from Binance Spot SBE stream pool");
219 Ok(())
220 }
221
222 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
232 pub async fn subscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
233 let new_streams: Vec<String> = {
235 let slots = self.slots.lock().expect("slots lock poisoned");
236 streams
237 .into_iter()
238 .filter(|s| !slots.iter().any(|slot| slot.streams.contains(s)))
239 .collect()
240 };
241
242 if new_streams.is_empty() {
243 return Ok(());
244 }
245
246 loop {
248 let (remaining_capacity, slot_count) = {
249 let slots = self.slots.lock().expect("slots lock poisoned");
250 let cap: usize = slots
251 .iter()
252 .map(|s| MAX_STREAMS_PER_CONNECTION - s.streams.len())
253 .sum();
254 (cap, slots.len())
255 };
256
257 if remaining_capacity >= new_streams.len() || slot_count >= MAX_CONNECTIONS {
258 break;
259 }
260
261 let new_slot = self.create_connection().await?;
262 let slot_count = {
263 let mut slots = self.slots.lock().expect("slots lock poisoned");
264 slots.push(new_slot);
265 slots.len()
266 };
267 log::info!("Pool slot {} connected: url={}", slot_count - 1, self.url);
268 }
269
270 let mut slots = self.slots.lock().expect("slots lock poisoned");
273 let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
274 let mut slot_counts: Vec<usize> = slots.iter().map(|s| s.streams.len()).collect();
275
276 for stream in &new_streams {
277 let slot_idx = slot_counts
278 .iter()
279 .position(|&count| count < MAX_STREAMS_PER_CONNECTION)
280 .ok_or_else(|| {
281 let max_total = MAX_CONNECTIONS * MAX_STREAMS_PER_CONNECTION;
282 BinanceWsError::ClientError(format!(
283 "Pool exhausted: {max_total} total subscriptions \
284 ({MAX_CONNECTIONS} connections x {MAX_STREAMS_PER_CONNECTION} streams)"
285 ))
286 })?;
287
288 slot_counts[slot_idx] += 1;
289
290 if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
291 batch.1.push(stream.clone());
292 } else {
293 slot_batches.push((slot_idx, vec![stream.clone()]));
294 }
295 }
296
297 for (slot_idx, batch) in &slot_batches {
299 slots[*slot_idx]
300 .cmd_tx
301 .send(BinanceSpotWsStreamsCommand::Subscribe {
302 streams: batch.clone(),
303 })
304 .map_err(|e| {
305 BinanceWsError::ClientError(format!(
306 "Handler not available for pool slot {slot_idx}: {e}"
307 ))
308 })?;
309 slots[*slot_idx].streams.extend(batch.iter().cloned());
310 }
311
312 Ok(())
313 }
314
315 #[expect(clippy::missing_panics_doc, reason = "mutex poisoning is not expected")]
321 pub async fn unsubscribe(&self, streams: Vec<String>) -> BinanceWsResult<()> {
322 let mut slots = self.slots.lock().expect("slots lock poisoned");
323 let mut slot_batches: Vec<(usize, Vec<String>)> = Vec::new();
324
325 for stream in &streams {
326 if let Some(slot_idx) = slots.iter().position(|s| s.streams.contains(stream)) {
327 if let Some(batch) = slot_batches.iter_mut().find(|(i, _)| *i == slot_idx) {
328 batch.1.push(stream.clone());
329 } else {
330 slot_batches.push((slot_idx, vec![stream.clone()]));
331 }
332 }
333 }
334
335 for (slot_idx, batch) in &slot_batches {
337 slots[*slot_idx]
338 .cmd_tx
339 .send(BinanceSpotWsStreamsCommand::Unsubscribe {
340 streams: batch.clone(),
341 })
342 .map_err(|e| {
343 BinanceWsError::ClientError(format!(
344 "Handler not available for pool slot {slot_idx}: {e}"
345 ))
346 })?;
347
348 for stream in batch {
349 slots[*slot_idx].streams.retain(|s| s != stream);
350 }
351 }
352
353 Ok(())
354 }
355
356 pub fn stream(&self) -> impl Stream<Item = BinanceSpotWsMessage> + 'static {
365 let out_rx = self.out_rx.lock().expect("out_rx lock poisoned").take();
366 async_stream::stream! {
367 if let Some(mut rx) = out_rx {
368 while let Some(msg) = rx.recv().await {
369 yield msg;
370 }
371 }
372 }
373 }
374
375 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
377 self.instruments_cache.rcu(|m| {
378 for inst in instruments {
379 m.insert(inst.symbol().inner(), inst.clone());
380 }
381 });
382 }
383
384 pub fn cache_instrument(&self, instrument: InstrumentAny) {
386 self.instruments_cache
387 .insert(instrument.symbol().inner(), instrument);
388 }
389
390 #[must_use]
392 pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
393 self.instruments_cache.clone()
394 }
395
396 #[must_use]
398 pub fn get_instrument(&self, symbol: &str) -> Option<InstrumentAny> {
399 self.instruments_cache.get_cloned(&Ustr::from(symbol))
400 }
401
402 async fn create_connection(&self) -> BinanceWsResult<ConnectionSlot> {
403 let out_tx = self
404 .out_tx
405 .lock()
406 .expect("out_tx lock poisoned")
407 .clone()
408 .ok_or_else(|| {
409 BinanceWsError::ClientError("Output channel not initialized".to_string())
410 })?;
411
412 let (raw_handler, raw_rx) = channel_message_handler();
413 let ping_handler: PingHandler = Arc::new(move |_| {});
414
415 let headers = if let Some(ref cred) = self.credential {
416 vec![(
417 BINANCE_API_KEY_HEADER.to_string(),
418 cred.api_key().to_string(),
419 )]
420 } else {
421 vec![]
422 };
423
424 let config = WebSocketConfig {
425 url: self.url.clone(),
426 headers,
427 heartbeat: self.heartbeat,
428 heartbeat_msg: None,
429 reconnect_timeout_ms: Some(5_000),
430 reconnect_delay_initial_ms: Some(500),
431 reconnect_delay_max_ms: Some(5_000),
432 reconnect_backoff_factor: Some(2.0),
433 reconnect_jitter_ms: Some(250),
434 reconnect_max_attempts: None,
435 idle_timeout_ms: None,
436 backend: self.transport_backend,
437 proxy_url: None,
438 };
439
440 let keyed_quotas = vec![(
441 BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION[0].as_str().to_string(),
442 *BINANCE_WS_SUBSCRIPTION_QUOTA,
443 )];
444
445 let client = WebSocketClient::connect(
446 config,
447 Some(raw_handler),
448 Some(ping_handler),
449 None,
450 keyed_quotas,
451 Some(*BINANCE_WS_CONNECTION_QUOTA),
452 )
453 .await
454 .map_err(|e| {
455 log::error!("WebSocket connection failed: {e}");
456 BinanceWsError::NetworkError(e.to_string())
457 })?;
458
459 let connection_mode = client.connection_mode_atomic();
460 let subscriptions_state = SubscriptionState::new('@');
461 let cancellation_token = CancellationToken::new();
462
463 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
464
465 let mut handler = BinanceSpotWsFeedHandler::new(
466 self.signal.clone(),
467 cmd_rx,
468 raw_rx,
469 out_tx.clone(),
470 subscriptions_state.clone(),
471 self.request_id_counter.clone(),
472 );
473
474 cmd_tx
475 .send(BinanceSpotWsStreamsCommand::SetClient(client))
476 .map_err(|e| BinanceWsError::ClientError(format!("Failed to set client: {e}")))?;
477
478 let signal = self.signal.clone();
479 let token = cancellation_token.clone();
480 let subs = subscriptions_state.clone();
481 let resubscribe_tx = cmd_tx.clone();
482
483 let task_handle = get_runtime().spawn(async move {
484 loop {
485 tokio::select! {
486 () = token.cancelled() => {
487 log::debug!("Handler task cancelled");
488 break;
489 }
490 result = handler.next() => {
491 match result {
492 Some(BinanceSpotWsMessage::Reconnected) => {
493 log::info!("WebSocket reconnected, restoring subscriptions");
494 let all_topics = subs.all_topics();
495 for topic in &all_topics {
496 subs.mark_failure(topic);
497 }
498
499 let streams = subs.all_topics();
500 if !streams.is_empty()
501 && let Err(e) = resubscribe_tx.send(BinanceSpotWsStreamsCommand::Subscribe { streams }) {
502 log::error!("Failed to resubscribe after reconnect: {e}");
503 }
504
505 if out_tx.send(BinanceSpotWsMessage::Reconnected).is_err() {
506 log::debug!("Output channel closed");
507 break;
508 }
509 }
510 Some(msg) => {
511 if out_tx.send(msg).is_err() {
512 log::debug!("Output channel closed");
513 break;
514 }
515 }
516 None => {
517 if signal.load(Ordering::Relaxed) {
518 log::debug!("Handler received shutdown signal");
519 } else {
520 log::warn!("Handler loop ended unexpectedly");
521 }
522 break;
523 }
524 }
525 }
526 }
527 }
528 });
529
530 Ok(ConnectionSlot {
531 cmd_tx,
532 streams: Vec::new(),
533 subscriptions_state,
534 task_handle,
535 cancellation_token,
536 connection_mode,
537 })
538 }
539}