nautilus_binance/futures/websocket/trading/
client.rs1use std::{
27 fmt::Debug,
28 num::NonZeroU32,
29 sync::{
30 Arc, LazyLock, Mutex,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_core::string::secret::REDACTED;
38use nautilus_network::{
39 mode::ConnectionMode,
40 ratelimiter::quota::Quota,
41 websocket::{
42 PingHandler, TransportBackend, WebSocketClient, WebSocketConfig, channel_message_handler,
43 },
44};
45use tokio_util::sync::CancellationToken;
46use ustr::Ustr;
47
48use super::{
49 error::{BinanceFuturesWsApiError, BinanceFuturesWsApiResult},
50 handler::BinanceFuturesWsTradingHandler,
51 messages::{BinanceFuturesWsTradingCommand, BinanceFuturesWsTradingMessage},
52};
53use crate::{
54 common::{
55 consts::{BINANCE_API_KEY_HEADER, BINANCE_FUTURES_USD_WS_API_URL},
56 credential::SigningCredential,
57 },
58 futures::http::query::{
59 BinanceCancelOrderParams, BinanceModifyOrderParams, BinanceNewOrderParams,
60 },
61};
62
63pub static BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
67 LazyLock::new(|| [Ustr::from("futures_order")]);
68
69#[expect(clippy::missing_panics_doc)]
72#[must_use]
73pub fn binance_futures_ws_order_quota() -> Quota {
74 Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant")
75}
76
77#[derive(Clone)]
83pub struct BinanceFuturesWsTradingClient {
84 url: String,
85 credential: Arc<SigningCredential>,
86 heartbeat: Option<u64>,
87 signal: Arc<AtomicBool>,
88 connection_mode: Arc<ArcSwap<AtomicU8>>,
89 cmd_tx: Arc<
90 tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingCommand>>,
91 >,
92 out_rx:
93 Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingMessage>>>>,
94 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
95 request_id_counter: Arc<AtomicU64>,
96 cancellation_token: CancellationToken,
97 transport_backend: TransportBackend,
98}
99
100impl Debug for BinanceFuturesWsTradingClient {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct(stringify!(BinanceFuturesWsTradingClient))
103 .field("url", &self.url)
104 .field("credential", &REDACTED)
105 .field("heartbeat", &self.heartbeat)
106 .finish_non_exhaustive()
107 }
108}
109
110impl BinanceFuturesWsTradingClient {
111 #[must_use]
113 pub fn new(
114 url: Option<String>,
115 api_key: String,
116 api_secret: String,
117 heartbeat: Option<u64>,
118 transport_backend: TransportBackend,
119 ) -> Self {
120 let url = url.unwrap_or_else(|| BINANCE_FUTURES_USD_WS_API_URL.to_string());
121 let credential = Arc::new(SigningCredential::new(api_key, api_secret));
122
123 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
124
125 Self {
126 url,
127 credential,
128 heartbeat,
129 signal: Arc::new(AtomicBool::new(false)),
130 connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
131 ConnectionMode::Closed as u8,
132 )))),
133 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
134 out_rx: Arc::new(Mutex::new(None)),
135 task_handle: None,
136 request_id_counter: Arc::new(AtomicU64::new(1)),
137 cancellation_token: CancellationToken::new(),
138 transport_backend,
139 }
140 }
141
142 #[must_use]
144 pub fn is_active(&self) -> bool {
145 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
146 mode_u8 == ConnectionMode::Active as u8
147 }
148
149 #[must_use]
151 pub fn is_closed(&self) -> bool {
152 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
153 mode_u8 == ConnectionMode::Closed as u8
154 }
155
156 pub fn next_request_id(&self) -> String {
157 let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
158 format!("req-{id}")
159 }
160
161 #[expect(clippy::missing_panics_doc)]
168 pub async fn connect(&mut self) -> BinanceFuturesWsApiResult<()> {
169 self.signal.store(false, Ordering::Relaxed);
170 self.cancellation_token = CancellationToken::new();
171
172 let (raw_handler, raw_rx) = channel_message_handler();
173 let ping_handler: PingHandler = Arc::new(move |_| {});
174
175 let headers = vec![(
176 BINANCE_API_KEY_HEADER.to_string(),
177 self.credential.api_key().to_string(),
178 )];
179
180 let config = WebSocketConfig {
181 url: self.url.clone(),
182 headers,
183 heartbeat: self.heartbeat,
184 heartbeat_msg: None,
185 reconnect_timeout_ms: Some(5_000),
186 reconnect_delay_initial_ms: Some(500),
187 reconnect_delay_max_ms: Some(5_000),
188 reconnect_backoff_factor: Some(2.0),
189 reconnect_jitter_ms: Some(250),
190 reconnect_max_attempts: None,
191 idle_timeout_ms: None,
192 backend: self.transport_backend,
193 proxy_url: None,
194 };
195
196 let keyed_quotas = vec![(
197 BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER[0]
198 .as_str()
199 .to_string(),
200 binance_futures_ws_order_quota(),
201 )];
202
203 let client = WebSocketClient::connect(
204 config,
205 Some(raw_handler),
206 Some(ping_handler),
207 None,
208 keyed_quotas,
209 Some(binance_futures_ws_order_quota()),
210 )
211 .await
212 .map_err(|e| BinanceFuturesWsApiError::ConnectionError(e.to_string()))?;
213
214 self.connection_mode.store(client.connection_mode_atomic());
215
216 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
217 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
218
219 {
220 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
221 *rx_guard = Some(out_rx);
222 }
223
224 {
225 let mut tx_guard = self.cmd_tx.write().await;
226 *tx_guard = cmd_tx;
227 }
228
229 let signal = self.signal.clone();
230 let credential = self.credential.clone();
231 let mut handler =
232 BinanceFuturesWsTradingHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
233
234 self.cmd_tx
235 .read()
236 .await
237 .send(BinanceFuturesWsTradingCommand::SetClient(client))
238 .map_err(|e| BinanceFuturesWsApiError::HandlerUnavailable(e.to_string()))?;
239
240 let cancellation_token = self.cancellation_token.clone();
241
242 let handle = get_runtime().spawn(async move {
243 tokio::select! {
244 () = cancellation_token.cancelled() => {
245 log::debug!("Handler task cancelled");
246 }
247 _ = handler.run() => {
248 log::debug!("Handler run completed");
249 }
250 }
251 });
252
253 self.task_handle = Some(Arc::new(handle));
254
255 Ok(())
256 }
257
258 pub async fn disconnect(&mut self) {
260 self.signal.store(true, Ordering::Relaxed);
261
262 if let Err(e) = self
263 .cmd_tx
264 .read()
265 .await
266 .send(BinanceFuturesWsTradingCommand::Disconnect)
267 {
268 log::warn!("Failed to send disconnect command: {e}");
269 }
270
271 self.cancellation_token.cancel();
272
273 if let Some(handle) = self.task_handle.take()
274 && let Ok(handle) = Arc::try_unwrap(handle)
275 {
276 let _ = handle.await;
277 }
278 }
279
280 pub async fn place_order(
286 &self,
287 params: BinanceNewOrderParams,
288 ) -> BinanceFuturesWsApiResult<String> {
289 let id = self.next_request_id();
290 self.place_order_with_id(id.clone(), params).await?;
291 Ok(id)
292 }
293
294 pub async fn place_order_with_id(
300 &self,
301 id: String,
302 params: BinanceNewOrderParams,
303 ) -> BinanceFuturesWsApiResult<()> {
304 let cmd = BinanceFuturesWsTradingCommand::PlaceOrder { id, params };
305 self.send_cmd(cmd).await
306 }
307
308 pub async fn cancel_order(
314 &self,
315 params: BinanceCancelOrderParams,
316 ) -> BinanceFuturesWsApiResult<String> {
317 let id = self.next_request_id();
318 self.cancel_order_with_id(id.clone(), params).await?;
319 Ok(id)
320 }
321
322 pub async fn cancel_order_with_id(
328 &self,
329 id: String,
330 params: BinanceCancelOrderParams,
331 ) -> BinanceFuturesWsApiResult<()> {
332 let cmd = BinanceFuturesWsTradingCommand::CancelOrder { id, params };
333 self.send_cmd(cmd).await
334 }
335
336 pub async fn modify_order(
342 &self,
343 params: BinanceModifyOrderParams,
344 ) -> BinanceFuturesWsApiResult<String> {
345 let id = self.next_request_id();
346 self.modify_order_with_id(id.clone(), params).await?;
347 Ok(id)
348 }
349
350 pub async fn modify_order_with_id(
356 &self,
357 id: String,
358 params: BinanceModifyOrderParams,
359 ) -> BinanceFuturesWsApiResult<()> {
360 let cmd = BinanceFuturesWsTradingCommand::ModifyOrder { id, params };
361 self.send_cmd(cmd).await
362 }
363
364 pub async fn recv(&self) -> Option<BinanceFuturesWsTradingMessage> {
372 let rx_opt = {
373 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
374 rx_guard.take()
375 };
376
377 if let Some(mut rx) = rx_opt {
378 let result = rx.recv().await;
379
380 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
381 *rx_guard = Some(rx);
382 result
383 } else {
384 None
385 }
386 }
387
388 async fn send_cmd(&self, cmd: BinanceFuturesWsTradingCommand) -> BinanceFuturesWsApiResult<()> {
389 self.cmd_tx
390 .read()
391 .await
392 .send(cmd)
393 .map_err(|e| BinanceFuturesWsApiError::HandlerUnavailable(e.to_string()))
394 }
395}