Skip to main content

nautilus_binance/spot/websocket/trading/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket API client for SBE trading.
17//!
18//! ## Connection Details
19//!
20//! - Endpoint: `ws-api.binance.com:443/ws-api/v3`
21//! - Authentication: Ed25519 signature per request
22//! - SBE responses: Enabled via `responseFormat=sbe` query parameter
23//! - Connection validity: 24 hours
24//! - Ping/pong: Every 20 seconds
25
26use std::{
27    fmt::Debug,
28    num::NonZeroU32,
29    sync::{
30        Arc, LazyLock, Mutex,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_core::string::secret::REDACTED;
38use nautilus_network::{
39    mode::ConnectionMode,
40    ratelimiter::quota::Quota,
41    websocket::{
42        PingHandler, TransportBackend, WebSocketClient, WebSocketConfig, channel_message_handler,
43    },
44};
45use tokio_util::sync::CancellationToken;
46use ustr::Ustr;
47
48use super::{
49    error::{BinanceWsApiError, BinanceWsApiResult},
50    handler::BinanceSpotWsTradingHandler,
51    messages::{BinanceSpotWsTradingCommand, BinanceSpotWsTradingMessage},
52};
53use crate::{
54    common::{
55        consts::{BINANCE_API_KEY_HEADER, BINANCE_SPOT_SBE_WS_API_URL},
56        credential::SigningCredential,
57    },
58    spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
59};
60
/// Environment variable key for Binance API key.
///
/// Consulted by [`BinanceSpotWsTradingClient::with_env`] when no explicit API key is supplied.
pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";

/// Environment variable key for Binance API secret.
///
/// Consulted by [`BinanceSpotWsTradingClient::with_env`] when no explicit API secret is supplied.
pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";

/// Pre-interned rate limit key for order operations (place/cancel/replace).
///
/// Binance WebSocket API: 1200 requests per minute per IP (20/sec).
///
/// The single entry (`"order"`) is registered as a keyed quota, paired with
/// [`binance_ws_order_quota`], when the client connects.
pub static BINANCE_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
    LazyLock::new(|| [Ustr::from("order")]);
72
73/// Binance WebSocket API order rate limit: 1200 per minute (20/sec).
74///
75/// Based on Binance documentation for WebSocket API rate limits.
76// Constant values are provably valid
77#[expect(clippy::missing_panics_doc)]
78#[must_use]
79pub fn binance_ws_order_quota() -> Quota {
80    Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant")
81}
82
/// Binance Spot WebSocket API client for SBE trading.
///
/// This client provides order management via WebSocket with SBE-encoded responses,
/// complementing the HTTP client with lower-latency order submission.
///
/// Cloning the client shares every `Arc`-wrapped field (command sender, output
/// receiver, request ID counter, connection mode) with the original.
#[derive(Clone)]
pub struct BinanceSpotWsTradingClient {
    /// WebSocket endpoint URL; `new` falls back to `BINANCE_SPOT_SBE_WS_API_URL` when none is given.
    url: String,
    /// Signing credential built from the API key and secret; its API key is sent as a
    /// connection header and the credential itself is handed to the handler on connect.
    credential: Arc<SigningCredential>,
    /// Optional heartbeat setting forwarded to the WebSocket configuration on connect
    /// (units are not visible in this file — TODO confirm).
    heartbeat: Option<u64>,
    /// Flag shared with the handler: cleared on `connect`, set on `disconnect`.
    signal: Arc<AtomicBool>,
    /// Connection mode atomic; starts as `Closed` and is swapped for the underlying
    /// WebSocket client's atomic on connect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the handler; replaced with a live channel on every connect.
    cmd_tx:
        Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingCommand>>>,
    /// Receiver for messages emitted by the handler; `None` until the first connect.
    out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingMessage>>>>,
    /// Join handle of the spawned handler task, if one has been started.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Monotonic counter (starting at 1) backing `next_request_id`.
    request_id_counter: Arc<AtomicU64>,
    /// Token used to cancel the handler task on disconnect; recreated on every connect.
    cancellation_token: CancellationToken,
    /// Transport backend forwarded to the WebSocket configuration on connect.
    transport_backend: TransportBackend,
}
102
103impl Debug for BinanceSpotWsTradingClient {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        f.debug_struct(stringify!(BinanceSpotWsTradingClient))
106            .field("url", &self.url)
107            .field("credential", &REDACTED)
108            .field("heartbeat", &self.heartbeat)
109            .finish_non_exhaustive()
110    }
111}
112
113impl BinanceSpotWsTradingClient {
114    /// Creates a new [`BinanceSpotWsTradingClient`] instance.
115    #[must_use]
116    pub fn new(
117        url: Option<String>,
118        api_key: String,
119        api_secret: String,
120        heartbeat: Option<u64>,
121        transport_backend: TransportBackend,
122    ) -> Self {
123        let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
124        let credential = Arc::new(SigningCredential::new(api_key, api_secret));
125
126        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
127
128        Self {
129            url,
130            credential,
131            heartbeat,
132            signal: Arc::new(AtomicBool::new(false)),
133            connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
134                ConnectionMode::Closed as u8,
135            )))),
136            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
137            out_rx: Arc::new(Mutex::new(None)),
138            task_handle: None,
139            request_id_counter: Arc::new(AtomicU64::new(1)),
140            cancellation_token: CancellationToken::new(),
141            transport_backend,
142        }
143    }
144
145    /// Creates a new client with credentials sourced from environment variables.
146    ///
147    /// Falls back to env vars if `api_key` or `api_secret` are `None`:
148    /// - `BINANCE_API_KEY` for the API key
149    /// - `BINANCE_API_SECRET` for the API secret
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if credentials are missing from environment.
154    pub fn with_env(
155        url: Option<String>,
156        api_key: Option<String>,
157        api_secret: Option<String>,
158        heartbeat: Option<u64>,
159        transport_backend: TransportBackend,
160    ) -> anyhow::Result<Self> {
161        let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
162        let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
163        Ok(Self::new(
164            url,
165            api_key,
166            api_secret,
167            heartbeat,
168            transport_backend,
169        ))
170    }
171
172    /// Creates a new client with credentials loaded entirely from environment variables.
173    ///
174    /// Reads:
175    /// - `BINANCE_API_KEY` for the API key
176    /// - `BINANCE_API_SECRET` for the API secret
177    ///
178    /// # Errors
179    ///
180    /// Returns an error if environment variables are missing.
181    pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
182        Self::with_env(url, None, None, heartbeat, TransportBackend::default())
183    }
184
185    /// Returns whether the client is actively connected.
186    #[must_use]
187    pub fn is_active(&self) -> bool {
188        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
189        mode_u8 == ConnectionMode::Active as u8
190    }
191
192    /// Returns whether the client is closed.
193    #[must_use]
194    pub fn is_closed(&self) -> bool {
195        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
196        mode_u8 == ConnectionMode::Closed as u8
197    }
198
199    /// Generates the next request ID.
200    pub fn next_request_id(&self) -> String {
201        let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
202        format!("req-{id}")
203    }
204
    /// Connects to the WebSocket API server.
    ///
    /// Opens the WebSocket connection, installs fresh command/output channels,
    /// hands the connected client to a new handler, and spawns the handler task
    /// on the shared runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if connection fails, or if the freshly created command
    /// channel rejects the initial `SetClient` command.
    // Mutex poisoning is not documented individually
    #[expect(clippy::missing_panics_doc)]
    pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
        // Reset the state that `disconnect` flips: clear the signal flag and start
        // from an un-cancelled token.
        // NOTE(review): neither the previous token nor a previous `task_handle` is
        // cancelled here, so calling `connect` on an already-connected client appears
        // to leave the earlier handler task running — confirm callers disconnect first.
        self.signal.store(false, Ordering::Relaxed);
        self.cancellation_token = CancellationToken::new();

        let (raw_handler, raw_rx) = channel_message_handler();
        // Ping callback is intentionally a no-op
        let ping_handler: PingHandler = Arc::new(move |_| {});

        // The API key is sent as a header on the WebSocket handshake
        let headers = vec![(
            BINANCE_API_KEY_HEADER.to_string(),
            self.credential.api_key().to_string(),
        )];

        let config = WebSocketConfig {
            url: self.url.clone(),
            headers,
            heartbeat: self.heartbeat,
            heartbeat_msg: None,
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: Some(500),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(2.0),
            reconnect_jitter_ms: Some(250),
            reconnect_max_attempts: None,
            idle_timeout_ms: None,
            backend: self.transport_backend,
            proxy_url: None,
        };

        // Configure rate limits for order operations
        let keyed_quotas = vec![(
            BINANCE_WS_RATE_LIMIT_KEY_ORDER[0].as_str().to_string(),
            binance_ws_order_quota(),
        )];

        let client = WebSocketClient::connect(
            config,
            Some(raw_handler),
            Some(ping_handler),
            None,
            keyed_quotas,
            Some(binance_ws_order_quota()), // Default quota for all operations
        )
        .await
        .map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;

        // Track the underlying client's connection mode so `is_active`/`is_closed`
        // reflect the live connection.
        self.connection_mode.store(client.connection_mode_atomic());

        // Fresh channels for this connection: commands flow to the handler,
        // messages flow back out to `recv`.
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();

        {
            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
            *rx_guard = Some(out_rx);
        }

        {
            let mut tx_guard = self.cmd_tx.write().await;
            *tx_guard = cmd_tx;
        }

        let signal = self.signal.clone();
        let credential = self.credential.clone();
        let mut handler =
            BinanceSpotWsTradingHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);

        // Queue the connected client as the first command; the channel is unbounded,
        // so this does not require the handler task to be running yet.
        self.cmd_tx
            .read()
            .await
            .send(BinanceSpotWsTradingCommand::SetClient(client))
            .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;

        let cancellation_token = self.cancellation_token.clone();

        // Run the handler until it finishes on its own or the token is cancelled
        let handle = get_runtime().spawn(async move {
            tokio::select! {
                () = cancellation_token.cancelled() => {
                    log::debug!("Handler task cancelled");
                }
                _ = handler.run() => {
                    log::debug!("Handler run completed");
                }
            }
        });

        self.task_handle = Some(Arc::new(handle));

        Ok(())
    }
300
301    /// Disconnects from the WebSocket API server.
302    pub async fn disconnect(&mut self) {
303        self.signal.store(true, Ordering::Relaxed);
304
305        if let Err(e) = self
306            .cmd_tx
307            .read()
308            .await
309            .send(BinanceSpotWsTradingCommand::Disconnect)
310        {
311            log::warn!("Failed to send disconnect command: {e}");
312        }
313
314        self.cancellation_token.cancel();
315
316        if let Some(handle) = self.task_handle.take()
317            && let Ok(handle) = Arc::try_unwrap(handle)
318        {
319            let _ = handle.await;
320        }
321    }
322
323    /// Places a new order via WebSocket API.
324    ///
325    /// # Errors
326    ///
327    /// Returns an error if the handler is unavailable.
328    pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
329        let id = self.next_request_id();
330        self.place_order_with_id(id.clone(), params).await?;
331        Ok(id)
332    }
333
334    /// Places a new order via WebSocket API using a pre-generated request ID.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the handler is unavailable.
339    pub async fn place_order_with_id(
340        &self,
341        id: String,
342        params: NewOrderParams,
343    ) -> BinanceWsApiResult<()> {
344        let cmd = BinanceSpotWsTradingCommand::PlaceOrder { id, params };
345        self.send_cmd(cmd).await
346    }
347
348    /// Cancels an order via WebSocket API.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if the handler is unavailable.
353    pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
354        let id = self.next_request_id();
355        self.cancel_order_with_id(id.clone(), params).await?;
356        Ok(id)
357    }
358
359    /// Cancels an order via WebSocket API using a pre-generated request ID.
360    ///
361    /// # Errors
362    ///
363    /// Returns an error if the handler is unavailable.
364    pub async fn cancel_order_with_id(
365        &self,
366        id: String,
367        params: CancelOrderParams,
368    ) -> BinanceWsApiResult<()> {
369        let cmd = BinanceSpotWsTradingCommand::CancelOrder { id, params };
370        self.send_cmd(cmd).await
371    }
372
373    /// Cancels and replaces an order atomically via WebSocket API.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if the handler is unavailable.
378    pub async fn cancel_replace_order(
379        &self,
380        params: CancelReplaceOrderParams,
381    ) -> BinanceWsApiResult<String> {
382        let id = self.next_request_id();
383        self.cancel_replace_order_with_id(id.clone(), params)
384            .await?;
385        Ok(id)
386    }
387
388    /// Cancels and replaces an order atomically via WebSocket API using a pre-generated request ID.
389    ///
390    /// # Errors
391    ///
392    /// Returns an error if the handler is unavailable.
393    pub async fn cancel_replace_order_with_id(
394        &self,
395        id: String,
396        params: CancelReplaceOrderParams,
397    ) -> BinanceWsApiResult<()> {
398        let cmd = BinanceSpotWsTradingCommand::CancelReplaceOrder { id, params };
399        self.send_cmd(cmd).await
400    }
401
402    /// Cancels all open orders for a symbol via WebSocket API.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the handler is unavailable.
407    pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
408        let id = self.next_request_id();
409        let cmd = BinanceSpotWsTradingCommand::CancelAllOrders {
410            id: id.clone(),
411            symbol: symbol.into(),
412        };
413        self.send_cmd(cmd).await?;
414        Ok(id)
415    }
416
417    /// Receives the next message from the handler.
418    ///
419    /// Returns `None` if the receiver is closed or not initialized.
420    ///
421    /// # Panics
422    ///
423    /// Panics if the internal output receiver mutex is poisoned.
424    pub async fn recv(&self) -> Option<BinanceSpotWsTradingMessage> {
425        // Take the receiver out of the mutex to avoid holding it across await
426        let rx_opt = {
427            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
428            rx_guard.take()
429        };
430
431        if let Some(mut rx) = rx_opt {
432            let result = rx.recv().await;
433
434            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
435            *rx_guard = Some(rx);
436            result
437        } else {
438            None
439        }
440    }
441
442    /// Authenticates the WebSocket session via `session.logon`.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the handler is unavailable.
447    pub async fn session_logon(&self) -> BinanceWsApiResult<()> {
448        self.send_cmd(BinanceSpotWsTradingCommand::SessionLogon)
449            .await
450    }
451
452    /// Subscribes to the user data stream via `userDataStream.subscribe`.
453    ///
454    /// # Errors
455    ///
456    /// Returns an error if the handler is unavailable.
457    pub async fn subscribe_user_data(&self) -> BinanceWsApiResult<()> {
458        self.send_cmd(BinanceSpotWsTradingCommand::SubscribeUserData)
459            .await
460    }
461
462    async fn send_cmd(&self, cmd: BinanceSpotWsTradingCommand) -> BinanceWsApiResult<()> {
463        self.cmd_tx
464            .read()
465            .await
466            .send(cmd)
467            .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
468    }
469}