Skip to main content

nautilus_binance/futures/websocket/trading/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures WebSocket Trading API client.
17//!
18//! ## Connection details
19//!
20//! - Endpoint: `ws-fapi.binance.com/ws-fapi/v1` (USD-M only)
21//! - Authentication: HMAC-SHA256 signature per request
22//! - JSON request/response pattern
23//! - Connection validity: 24 hours
24//! - Ping/pong: every 20 seconds
25
26use std::{
27    fmt::Debug,
28    num::NonZeroU32,
29    sync::{
30        Arc, LazyLock, Mutex,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_core::string::secret::REDACTED;
38use nautilus_network::{
39    mode::ConnectionMode,
40    ratelimiter::quota::Quota,
41    websocket::{
42        PingHandler, TransportBackend, WebSocketClient, WebSocketConfig, channel_message_handler,
43    },
44};
45use tokio_util::sync::CancellationToken;
46use ustr::Ustr;
47
48use super::{
49    error::{BinanceFuturesWsApiError, BinanceFuturesWsApiResult},
50    handler::BinanceFuturesWsTradingHandler,
51    messages::{BinanceFuturesWsTradingCommand, BinanceFuturesWsTradingMessage},
52};
53use crate::{
54    common::{
55        consts::{BINANCE_API_KEY_HEADER, BINANCE_FUTURES_USD_WS_API_URL},
56        credential::SigningCredential,
57    },
58    futures::http::query::{
59        BinanceCancelOrderParams, BinanceModifyOrderParams, BinanceNewOrderParams,
60    },
61};
62
63/// Pre-interned rate limit key for futures order operations (place/cancel/modify).
64///
65/// Binance Futures WebSocket API: 1200 requests per minute per IP (20/sec).
66pub static BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
67    LazyLock::new(|| [Ustr::from("futures_order")]);
68
69/// Returns the Binance Futures WebSocket API order rate limit quota (1200 per minute).
70// Constant values are provably valid
71#[expect(clippy::missing_panics_doc)]
72#[must_use]
73pub fn binance_futures_ws_order_quota() -> Quota {
74    Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant")
75}
76
77/// Binance Futures WebSocket Trading API client.
78///
79/// Provides order management via WebSocket with JSON responses,
80/// complementing the HTTP client with lower-latency order submission.
81/// Only available for USD-M Futures.
82#[derive(Clone)]
83pub struct BinanceFuturesWsTradingClient {
84    url: String,
85    credential: Arc<SigningCredential>,
86    heartbeat: Option<u64>,
87    signal: Arc<AtomicBool>,
88    connection_mode: Arc<ArcSwap<AtomicU8>>,
89    cmd_tx: Arc<
90        tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingCommand>>,
91    >,
92    out_rx:
93        Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingMessage>>>>,
94    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
95    request_id_counter: Arc<AtomicU64>,
96    cancellation_token: CancellationToken,
97    transport_backend: TransportBackend,
98}
99
100impl Debug for BinanceFuturesWsTradingClient {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct(stringify!(BinanceFuturesWsTradingClient))
103            .field("url", &self.url)
104            .field("credential", &REDACTED)
105            .field("heartbeat", &self.heartbeat)
106            .finish_non_exhaustive()
107    }
108}
109
110impl BinanceFuturesWsTradingClient {
111    /// Creates a new [`BinanceFuturesWsTradingClient`] instance.
112    #[must_use]
113    pub fn new(
114        url: Option<String>,
115        api_key: String,
116        api_secret: String,
117        heartbeat: Option<u64>,
118        transport_backend: TransportBackend,
119    ) -> Self {
120        let url = url.unwrap_or_else(|| BINANCE_FUTURES_USD_WS_API_URL.to_string());
121        let credential = Arc::new(SigningCredential::new(api_key, api_secret));
122
123        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
124
125        Self {
126            url,
127            credential,
128            heartbeat,
129            signal: Arc::new(AtomicBool::new(false)),
130            connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
131                ConnectionMode::Closed as u8,
132            )))),
133            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
134            out_rx: Arc::new(Mutex::new(None)),
135            task_handle: None,
136            request_id_counter: Arc::new(AtomicU64::new(1)),
137            cancellation_token: CancellationToken::new(),
138            transport_backend,
139        }
140    }
141
142    /// Returns whether the client is actively connected.
143    #[must_use]
144    pub fn is_active(&self) -> bool {
145        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
146        mode_u8 == ConnectionMode::Active as u8
147    }
148
149    /// Returns whether the client is closed.
150    #[must_use]
151    pub fn is_closed(&self) -> bool {
152        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
153        mode_u8 == ConnectionMode::Closed as u8
154    }
155
156    pub fn next_request_id(&self) -> String {
157        let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
158        format!("req-{id}")
159    }
160
161    /// Connects to the WebSocket Trading API server.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if connection fails.
166    // Mutex poisoning is not documented individually
167    #[expect(clippy::missing_panics_doc)]
168    pub async fn connect(&mut self) -> BinanceFuturesWsApiResult<()> {
169        self.signal.store(false, Ordering::Relaxed);
170        self.cancellation_token = CancellationToken::new();
171
172        let (raw_handler, raw_rx) = channel_message_handler();
173        let ping_handler: PingHandler = Arc::new(move |_| {});
174
175        let headers = vec![(
176            BINANCE_API_KEY_HEADER.to_string(),
177            self.credential.api_key().to_string(),
178        )];
179
180        let config = WebSocketConfig {
181            url: self.url.clone(),
182            headers,
183            heartbeat: self.heartbeat,
184            heartbeat_msg: None,
185            reconnect_timeout_ms: Some(5_000),
186            reconnect_delay_initial_ms: Some(500),
187            reconnect_delay_max_ms: Some(5_000),
188            reconnect_backoff_factor: Some(2.0),
189            reconnect_jitter_ms: Some(250),
190            reconnect_max_attempts: None,
191            idle_timeout_ms: None,
192            backend: self.transport_backend,
193            proxy_url: None,
194        };
195
196        let keyed_quotas = vec![(
197            BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER[0]
198                .as_str()
199                .to_string(),
200            binance_futures_ws_order_quota(),
201        )];
202
203        let client = WebSocketClient::connect(
204            config,
205            Some(raw_handler),
206            Some(ping_handler),
207            None,
208            keyed_quotas,
209            Some(binance_futures_ws_order_quota()),
210        )
211        .await
212        .map_err(|e| BinanceFuturesWsApiError::ConnectionError(e.to_string()))?;
213
214        self.connection_mode.store(client.connection_mode_atomic());
215
216        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
217        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
218
219        {
220            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
221            *rx_guard = Some(out_rx);
222        }
223
224        {
225            let mut tx_guard = self.cmd_tx.write().await;
226            *tx_guard = cmd_tx;
227        }
228
229        let signal = self.signal.clone();
230        let credential = self.credential.clone();
231        let mut handler =
232            BinanceFuturesWsTradingHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
233
234        self.cmd_tx
235            .read()
236            .await
237            .send(BinanceFuturesWsTradingCommand::SetClient(client))
238            .map_err(|e| BinanceFuturesWsApiError::HandlerUnavailable(e.to_string()))?;
239
240        let cancellation_token = self.cancellation_token.clone();
241
242        let handle = get_runtime().spawn(async move {
243            tokio::select! {
244                () = cancellation_token.cancelled() => {
245                    log::debug!("Handler task cancelled");
246                }
247                _ = handler.run() => {
248                    log::debug!("Handler run completed");
249                }
250            }
251        });
252
253        self.task_handle = Some(Arc::new(handle));
254
255        Ok(())
256    }
257
258    /// Disconnects from the WebSocket Trading API server.
259    pub async fn disconnect(&mut self) {
260        self.signal.store(true, Ordering::Relaxed);
261
262        if let Err(e) = self
263            .cmd_tx
264            .read()
265            .await
266            .send(BinanceFuturesWsTradingCommand::Disconnect)
267        {
268            log::warn!("Failed to send disconnect command: {e}");
269        }
270
271        self.cancellation_token.cancel();
272
273        if let Some(handle) = self.task_handle.take()
274            && let Ok(handle) = Arc::try_unwrap(handle)
275        {
276            let _ = handle.await;
277        }
278    }
279
280    /// Places a new order via the WebSocket Trading API.
281    ///
282    /// # Errors
283    ///
284    /// Returns an error if the handler is unavailable.
285    pub async fn place_order(
286        &self,
287        params: BinanceNewOrderParams,
288    ) -> BinanceFuturesWsApiResult<String> {
289        let id = self.next_request_id();
290        self.place_order_with_id(id.clone(), params).await?;
291        Ok(id)
292    }
293
294    /// Places a new order via the WebSocket Trading API using a pre-generated request ID.
295    ///
296    /// # Errors
297    ///
298    /// Returns an error if the handler is unavailable.
299    pub async fn place_order_with_id(
300        &self,
301        id: String,
302        params: BinanceNewOrderParams,
303    ) -> BinanceFuturesWsApiResult<()> {
304        let cmd = BinanceFuturesWsTradingCommand::PlaceOrder { id, params };
305        self.send_cmd(cmd).await
306    }
307
308    /// Cancels an order via the WebSocket Trading API.
309    ///
310    /// # Errors
311    ///
312    /// Returns an error if the handler is unavailable.
313    pub async fn cancel_order(
314        &self,
315        params: BinanceCancelOrderParams,
316    ) -> BinanceFuturesWsApiResult<String> {
317        let id = self.next_request_id();
318        self.cancel_order_with_id(id.clone(), params).await?;
319        Ok(id)
320    }
321
322    /// Cancels an order via the WebSocket Trading API using a pre-generated request ID.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the handler is unavailable.
327    pub async fn cancel_order_with_id(
328        &self,
329        id: String,
330        params: BinanceCancelOrderParams,
331    ) -> BinanceFuturesWsApiResult<()> {
332        let cmd = BinanceFuturesWsTradingCommand::CancelOrder { id, params };
333        self.send_cmd(cmd).await
334    }
335
336    /// Modifies an order via the WebSocket Trading API (in-place amendment).
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the handler is unavailable.
341    pub async fn modify_order(
342        &self,
343        params: BinanceModifyOrderParams,
344    ) -> BinanceFuturesWsApiResult<String> {
345        let id = self.next_request_id();
346        self.modify_order_with_id(id.clone(), params).await?;
347        Ok(id)
348    }
349
350    /// Modifies an order via the WebSocket Trading API using a pre-generated request ID.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the handler is unavailable.
355    pub async fn modify_order_with_id(
356        &self,
357        id: String,
358        params: BinanceModifyOrderParams,
359    ) -> BinanceFuturesWsApiResult<()> {
360        let cmd = BinanceFuturesWsTradingCommand::ModifyOrder { id, params };
361        self.send_cmd(cmd).await
362    }
363
364    /// Receives the next message from the handler.
365    ///
366    /// Returns `None` if the receiver is closed or not initialized.
367    ///
368    /// # Panics
369    ///
370    /// Panics if the internal output receiver mutex is poisoned.
371    pub async fn recv(&self) -> Option<BinanceFuturesWsTradingMessage> {
372        let rx_opt = {
373            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
374            rx_guard.take()
375        };
376
377        if let Some(mut rx) = rx_opt {
378            let result = rx.recv().await;
379
380            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
381            *rx_guard = Some(rx);
382            result
383        } else {
384            None
385        }
386    }
387
388    async fn send_cmd(&self, cmd: BinanceFuturesWsTradingCommand) -> BinanceFuturesWsApiResult<()> {
389        self.cmd_tx
390            .read()
391            .await
392            .send(cmd)
393            .map_err(|e| BinanceFuturesWsApiError::HandlerUnavailable(e.to_string()))
394    }
395}