nautilus_binance/spot/websocket/trading/
client.rs1use std::{
27 fmt::Debug,
28 num::NonZeroU32,
29 sync::{
30 Arc, LazyLock, Mutex,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_core::string::secret::REDACTED;
38use nautilus_network::{
39 mode::ConnectionMode,
40 ratelimiter::quota::Quota,
41 websocket::{
42 PingHandler, TransportBackend, WebSocketClient, WebSocketConfig, channel_message_handler,
43 },
44};
45use tokio_util::sync::CancellationToken;
46use ustr::Ustr;
47
48use super::{
49 error::{BinanceWsApiError, BinanceWsApiResult},
50 handler::BinanceSpotWsTradingHandler,
51 messages::{BinanceSpotWsTradingCommand, BinanceSpotWsTradingMessage},
52};
53use crate::{
54 common::{
55 consts::{BINANCE_API_KEY_HEADER, BINANCE_SPOT_SBE_WS_API_URL},
56 credential::SigningCredential,
57 },
58 spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
59};
60
61pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";
63
64pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";
66
67pub static BINANCE_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
71 LazyLock::new(|| [Ustr::from("order")]);
72
73#[expect(clippy::missing_panics_doc)]
78#[must_use]
79pub fn binance_ws_order_quota() -> Quota {
80 Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant")
81}
82
83#[derive(Clone)]
88pub struct BinanceSpotWsTradingClient {
89 url: String,
90 credential: Arc<SigningCredential>,
91 heartbeat: Option<u64>,
92 signal: Arc<AtomicBool>,
93 connection_mode: Arc<ArcSwap<AtomicU8>>,
94 cmd_tx:
95 Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<BinanceSpotWsTradingCommand>>>,
96 out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsTradingMessage>>>>,
97 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
98 request_id_counter: Arc<AtomicU64>,
99 cancellation_token: CancellationToken,
100 transport_backend: TransportBackend,
101}
102
103impl Debug for BinanceSpotWsTradingClient {
104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105 f.debug_struct(stringify!(BinanceSpotWsTradingClient))
106 .field("url", &self.url)
107 .field("credential", &REDACTED)
108 .field("heartbeat", &self.heartbeat)
109 .finish_non_exhaustive()
110 }
111}
112
113impl BinanceSpotWsTradingClient {
114 #[must_use]
116 pub fn new(
117 url: Option<String>,
118 api_key: String,
119 api_secret: String,
120 heartbeat: Option<u64>,
121 transport_backend: TransportBackend,
122 ) -> Self {
123 let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
124 let credential = Arc::new(SigningCredential::new(api_key, api_secret));
125
126 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
127
128 Self {
129 url,
130 credential,
131 heartbeat,
132 signal: Arc::new(AtomicBool::new(false)),
133 connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
134 ConnectionMode::Closed as u8,
135 )))),
136 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
137 out_rx: Arc::new(Mutex::new(None)),
138 task_handle: None,
139 request_id_counter: Arc::new(AtomicU64::new(1)),
140 cancellation_token: CancellationToken::new(),
141 transport_backend,
142 }
143 }
144
145 pub fn with_env(
155 url: Option<String>,
156 api_key: Option<String>,
157 api_secret: Option<String>,
158 heartbeat: Option<u64>,
159 transport_backend: TransportBackend,
160 ) -> anyhow::Result<Self> {
161 let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
162 let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
163 Ok(Self::new(
164 url,
165 api_key,
166 api_secret,
167 heartbeat,
168 transport_backend,
169 ))
170 }
171
172 pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
182 Self::with_env(url, None, None, heartbeat, TransportBackend::default())
183 }
184
185 #[must_use]
187 pub fn is_active(&self) -> bool {
188 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
189 mode_u8 == ConnectionMode::Active as u8
190 }
191
192 #[must_use]
194 pub fn is_closed(&self) -> bool {
195 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
196 mode_u8 == ConnectionMode::Closed as u8
197 }
198
199 pub fn next_request_id(&self) -> String {
201 let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
202 format!("req-{id}")
203 }
204
205 #[expect(clippy::missing_panics_doc)]
212 pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
213 self.signal.store(false, Ordering::Relaxed);
214 self.cancellation_token = CancellationToken::new();
215
216 let (raw_handler, raw_rx) = channel_message_handler();
217 let ping_handler: PingHandler = Arc::new(move |_| {});
218
219 let headers = vec![(
220 BINANCE_API_KEY_HEADER.to_string(),
221 self.credential.api_key().to_string(),
222 )];
223
224 let config = WebSocketConfig {
225 url: self.url.clone(),
226 headers,
227 heartbeat: self.heartbeat,
228 heartbeat_msg: None,
229 reconnect_timeout_ms: Some(5_000),
230 reconnect_delay_initial_ms: Some(500),
231 reconnect_delay_max_ms: Some(5_000),
232 reconnect_backoff_factor: Some(2.0),
233 reconnect_jitter_ms: Some(250),
234 reconnect_max_attempts: None,
235 idle_timeout_ms: None,
236 backend: self.transport_backend,
237 proxy_url: None,
238 };
239
240 let keyed_quotas = vec![(
242 BINANCE_WS_RATE_LIMIT_KEY_ORDER[0].as_str().to_string(),
243 binance_ws_order_quota(),
244 )];
245
246 let client = WebSocketClient::connect(
247 config,
248 Some(raw_handler),
249 Some(ping_handler),
250 None,
251 keyed_quotas,
252 Some(binance_ws_order_quota()), )
254 .await
255 .map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;
256
257 self.connection_mode.store(client.connection_mode_atomic());
258
259 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
260 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
261
262 {
263 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
264 *rx_guard = Some(out_rx);
265 }
266
267 {
268 let mut tx_guard = self.cmd_tx.write().await;
269 *tx_guard = cmd_tx;
270 }
271
272 let signal = self.signal.clone();
273 let credential = self.credential.clone();
274 let mut handler =
275 BinanceSpotWsTradingHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
276
277 self.cmd_tx
278 .read()
279 .await
280 .send(BinanceSpotWsTradingCommand::SetClient(client))
281 .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;
282
283 let cancellation_token = self.cancellation_token.clone();
284
285 let handle = get_runtime().spawn(async move {
286 tokio::select! {
287 () = cancellation_token.cancelled() => {
288 log::debug!("Handler task cancelled");
289 }
290 _ = handler.run() => {
291 log::debug!("Handler run completed");
292 }
293 }
294 });
295
296 self.task_handle = Some(Arc::new(handle));
297
298 Ok(())
299 }
300
301 pub async fn disconnect(&mut self) {
303 self.signal.store(true, Ordering::Relaxed);
304
305 if let Err(e) = self
306 .cmd_tx
307 .read()
308 .await
309 .send(BinanceSpotWsTradingCommand::Disconnect)
310 {
311 log::warn!("Failed to send disconnect command: {e}");
312 }
313
314 self.cancellation_token.cancel();
315
316 if let Some(handle) = self.task_handle.take()
317 && let Ok(handle) = Arc::try_unwrap(handle)
318 {
319 let _ = handle.await;
320 }
321 }
322
323 pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
329 let id = self.next_request_id();
330 self.place_order_with_id(id.clone(), params).await?;
331 Ok(id)
332 }
333
334 pub async fn place_order_with_id(
340 &self,
341 id: String,
342 params: NewOrderParams,
343 ) -> BinanceWsApiResult<()> {
344 let cmd = BinanceSpotWsTradingCommand::PlaceOrder { id, params };
345 self.send_cmd(cmd).await
346 }
347
348 pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
354 let id = self.next_request_id();
355 self.cancel_order_with_id(id.clone(), params).await?;
356 Ok(id)
357 }
358
359 pub async fn cancel_order_with_id(
365 &self,
366 id: String,
367 params: CancelOrderParams,
368 ) -> BinanceWsApiResult<()> {
369 let cmd = BinanceSpotWsTradingCommand::CancelOrder { id, params };
370 self.send_cmd(cmd).await
371 }
372
373 pub async fn cancel_replace_order(
379 &self,
380 params: CancelReplaceOrderParams,
381 ) -> BinanceWsApiResult<String> {
382 let id = self.next_request_id();
383 self.cancel_replace_order_with_id(id.clone(), params)
384 .await?;
385 Ok(id)
386 }
387
388 pub async fn cancel_replace_order_with_id(
394 &self,
395 id: String,
396 params: CancelReplaceOrderParams,
397 ) -> BinanceWsApiResult<()> {
398 let cmd = BinanceSpotWsTradingCommand::CancelReplaceOrder { id, params };
399 self.send_cmd(cmd).await
400 }
401
402 pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
408 let id = self.next_request_id();
409 let cmd = BinanceSpotWsTradingCommand::CancelAllOrders {
410 id: id.clone(),
411 symbol: symbol.into(),
412 };
413 self.send_cmd(cmd).await?;
414 Ok(id)
415 }
416
417 pub async fn recv(&self) -> Option<BinanceSpotWsTradingMessage> {
425 let rx_opt = {
427 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
428 rx_guard.take()
429 };
430
431 if let Some(mut rx) = rx_opt {
432 let result = rx.recv().await;
433
434 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
435 *rx_guard = Some(rx);
436 result
437 } else {
438 None
439 }
440 }
441
442 pub async fn session_logon(&self) -> BinanceWsApiResult<()> {
448 self.send_cmd(BinanceSpotWsTradingCommand::SessionLogon)
449 .await
450 }
451
452 pub async fn subscribe_user_data(&self) -> BinanceWsApiResult<()> {
458 self.send_cmd(BinanceSpotWsTradingCommand::SubscribeUserData)
459 .await
460 }
461
462 async fn send_cmd(&self, cmd: BinanceSpotWsTradingCommand) -> BinanceWsApiResult<()> {
463 self.cmd_tx
464 .read()
465 .await
466 .send(cmd)
467 .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
468 }
469}