Skip to main content

nautilus_binance/futures/websocket/trading/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Binance Futures WebSocket Trading API message handler.
//!
//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
//! processes commands from the client via an unbounded channel.
//!
//! Unlike the Spot handler, which decodes SBE binary responses, the Futures handler
//! works with JSON text responses throughout.
24
25use std::{
26    fmt::Debug,
27    sync::{
28        Arc,
29        atomic::{AtomicBool, Ordering},
30    },
31};
32
33use ahash::AHashMap;
34use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
35use tokio_tungstenite::tungstenite::Message;
36
37use super::{
38    client::BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER,
39    error::{BinanceFuturesWsApiError, BinanceFuturesWsApiResult},
40    messages::{
41        BinanceFuturesWsTradingCommand, BinanceFuturesWsTradingMessage,
42        BinanceFuturesWsTradingRequest, BinanceFuturesWsTradingRequestMeta,
43        BinanceFuturesWsTradingResponse, method,
44    },
45};
46use crate::{
47    common::credential::SigningCredential,
48    futures::http::query::{
49        BinanceCancelOrderParams, BinanceModifyOrderParams, BinanceNewOrderParams,
50    },
51};
52
53/// Binance Futures WebSocket Trading API handler.
54///
55/// Runs in a dedicated Tokio task, processing commands from the client
56/// and transforming raw WebSocket JSON messages into Nautilus domain events.
57pub struct BinanceFuturesWsTradingHandler {
58    signal: Arc<AtomicBool>,
59    inner: Option<WebSocketClient>,
60    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingCommand>,
61    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
62    out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingMessage>,
63    credential: Arc<SigningCredential>,
64    pending_requests: AHashMap<String, BinanceFuturesWsTradingRequestMeta>,
65}
66
67impl Debug for BinanceFuturesWsTradingHandler {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct(stringify!(BinanceFuturesWsTradingHandler))
70            .field("inner", &self.inner.as_ref().map(|_| "<client>"))
71            .field(
72                "pending_requests",
73                &format!("{} pending", self.pending_requests.len()),
74            )
75            .finish_non_exhaustive()
76    }
77}
78
79impl BinanceFuturesWsTradingHandler {
80    /// Creates a new handler instance.
81    #[must_use]
82    pub fn new(
83        signal: Arc<AtomicBool>,
84        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsTradingCommand>,
85        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
86        out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsTradingMessage>,
87        credential: Arc<SigningCredential>,
88    ) -> Self {
89        Self {
90            signal,
91            inner: None,
92            cmd_rx,
93            raw_rx,
94            out_tx,
95            credential,
96            pending_requests: AHashMap::new(),
97        }
98    }
99
100    /// Runs the main event loop for commands and raw messages.
101    ///
102    /// Returns `false` when disconnected or the signal is set.
103    pub async fn run(&mut self) -> bool {
104        loop {
105            if self.signal.load(Ordering::Relaxed) {
106                return false;
107            }
108
109            tokio::select! {
110                Some(cmd) = self.cmd_rx.recv() => {
111                    match cmd {
112                        BinanceFuturesWsTradingCommand::SetClient(client) => {
113                            log::debug!("Handler received WebSocket client");
114                            self.inner = Some(client);
115                            self.emit(BinanceFuturesWsTradingMessage::Connected);
116                        }
117                        BinanceFuturesWsTradingCommand::Disconnect => {
118                            log::debug!("Handler disconnecting WebSocket client");
119                            self.inner = None;
120                            return false;
121                        }
122                        BinanceFuturesWsTradingCommand::PlaceOrder { id, params } => {
123                            if let Err(e) = self.handle_place_order(id.clone(), params).await {
124                                log::error!("Failed to handle place order command: {e}");
125                                self.emit(BinanceFuturesWsTradingMessage::OrderRejected {
126                                    request_id: id,
127                                    code: -1,
128                                    msg: e.to_string(),
129                                });
130                            }
131                        }
132                        BinanceFuturesWsTradingCommand::CancelOrder { id, params } => {
133                            if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
134                                log::error!("Failed to handle cancel order command: {e}");
135                                self.emit(BinanceFuturesWsTradingMessage::CancelRejected {
136                                    request_id: id,
137                                    code: -1,
138                                    msg: e.to_string(),
139                                });
140                            }
141                        }
142                        BinanceFuturesWsTradingCommand::ModifyOrder { id, params } => {
143                            if let Err(e) = self.handle_modify_order(id.clone(), params).await {
144                                log::error!("Failed to handle modify order command: {e}");
145                                self.emit(BinanceFuturesWsTradingMessage::ModifyRejected {
146                                    request_id: id,
147                                    code: -1,
148                                    msg: e.to_string(),
149                                });
150                            }
151                        }
152                    }
153                }
154                Some(msg) = self.raw_rx.recv() => {
155                    if let Message::Text(ref text) = msg
156                        && text.as_str() == RECONNECTED
157                    {
158                        log::info!("Handler received reconnection signal");
159                        self.fail_pending_requests();
160                        self.emit(BinanceFuturesWsTradingMessage::Reconnected);
161                        continue;
162                    }
163
164                    self.handle_message(msg);
165                }
166                else => {
167                    return false;
168                }
169            }
170        }
171    }
172
173    fn emit(&self, msg: BinanceFuturesWsTradingMessage) {
174        if let Err(e) = self.out_tx.send(msg) {
175            log::error!("Failed to send message to output channel: {e}");
176        }
177    }
178
179    fn fail_pending_requests(&mut self) {
180        if self.pending_requests.is_empty() {
181            return;
182        }
183
184        let count = self.pending_requests.len();
185        log::warn!("Failing {count} pending requests after reconnection");
186
187        let pending = std::mem::take(&mut self.pending_requests);
188        for (request_id, meta) in pending {
189            let msg = self.create_rejection(
190                request_id,
191                -1,
192                "Connection lost before response received".to_string(),
193                meta,
194            );
195            self.emit(msg);
196        }
197    }
198
199    async fn handle_place_order(
200        &mut self,
201        id: String,
202        params: BinanceNewOrderParams,
203    ) -> BinanceFuturesWsApiResult<()> {
204        let params_json = serde_json::to_value(&params)
205            .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
206        let signed_params = self.sign_params(params_json)?;
207
208        let request = BinanceFuturesWsTradingRequest::new(&id, method::ORDER_PLACE, signed_params);
209        self.pending_requests
210            .insert(id.clone(), BinanceFuturesWsTradingRequestMeta::PlaceOrder);
211        self.send_request(request).await
212    }
213
214    async fn handle_cancel_order(
215        &mut self,
216        id: String,
217        params: BinanceCancelOrderParams,
218    ) -> BinanceFuturesWsApiResult<()> {
219        let params_json = serde_json::to_value(&params)
220            .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
221        let signed_params = self.sign_params(params_json)?;
222
223        let request = BinanceFuturesWsTradingRequest::new(&id, method::ORDER_CANCEL, signed_params);
224        self.pending_requests
225            .insert(id.clone(), BinanceFuturesWsTradingRequestMeta::CancelOrder);
226        self.send_request(request).await
227    }
228
229    async fn handle_modify_order(
230        &mut self,
231        id: String,
232        params: BinanceModifyOrderParams,
233    ) -> BinanceFuturesWsApiResult<()> {
234        let params_json = serde_json::to_value(&params)
235            .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
236        let signed_params = self.sign_params(params_json)?;
237
238        let request = BinanceFuturesWsTradingRequest::new(&id, method::ORDER_MODIFY, signed_params);
239        self.pending_requests
240            .insert(id.clone(), BinanceFuturesWsTradingRequestMeta::ModifyOrder);
241        self.send_request(request).await
242    }
243
244    fn sign_params(
245        &self,
246        mut params: serde_json::Value,
247    ) -> BinanceFuturesWsApiResult<serde_json::Value> {
248        let timestamp = std::time::SystemTime::now()
249            .duration_since(std::time::UNIX_EPOCH)
250            .map_err(|e| BinanceFuturesWsApiError::ClientError(e.to_string()))?
251            .as_millis() as i64;
252
253        if let Some(obj) = params.as_object_mut() {
254            obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
255            obj.insert(
256                "apiKey".to_string(),
257                serde_json::json!(self.credential.api_key()),
258            );
259        }
260
261        let query_string = serde_urlencoded::to_string(&params)
262            .map_err(|e| BinanceFuturesWsApiError::ClientError(e.to_string()))?;
263        let signature = self.credential.sign(&query_string);
264
265        if let Some(obj) = params.as_object_mut() {
266            obj.insert("signature".to_string(), serde_json::json!(signature));
267        }
268
269        Ok(params)
270    }
271
272    async fn send_request(
273        &mut self,
274        request: BinanceFuturesWsTradingRequest,
275    ) -> BinanceFuturesWsApiResult<()> {
276        let client = self.inner.as_mut().ok_or_else(|| {
277            BinanceFuturesWsApiError::ConnectionError("WebSocket not connected".to_string())
278        })?;
279
280        let json = serde_json::to_string(&request)
281            .map_err(|e| BinanceFuturesWsApiError::JsonError(e.to_string()))?;
282
283        log::debug!(
284            "Sending Futures WS Trading API request id={} method={}",
285            request.id,
286            request.method
287        );
288
289        client
290            .send_text(
291                json,
292                Some(BINANCE_FUTURES_WS_RATE_LIMIT_KEY_ORDER.as_slice()),
293            )
294            .await
295            .map_err(|e| {
296                BinanceFuturesWsApiError::ConnectionError(format!("Failed to send request: {e}"))
297            })?;
298
299        Ok(())
300    }
301
302    fn handle_message(&mut self, msg: Message) {
303        match msg {
304            Message::Text(text) => self.handle_text_response(&text),
305            Message::Ping(_) | Message::Pong(_) => {}
306            Message::Close(frame) => {
307                log::debug!("WebSocket closed: {frame:?}");
308            }
309            Message::Binary(_) | Message::Frame(_) => {}
310        }
311    }
312
313    fn handle_text_response(&mut self, text: &str) {
314        let response: BinanceFuturesWsTradingResponse = match serde_json::from_str(text) {
315            Ok(r) => r,
316            Err(e) => {
317                log::warn!("Failed to parse WS Trading API response: {e}");
318                return;
319            }
320        };
321
322        let Some(meta) = self.pending_requests.remove(&response.id) else {
323            log::warn!("Received response for unknown request ID: {}", response.id);
324            return;
325        };
326
327        if response.status != 200 {
328            let (code, msg) = response.error.map(|e| (e.code, e.msg)).unwrap_or((
329                -1,
330                format!("Request failed with status {}", response.status),
331            ));
332            let rejection = self.create_rejection(response.id, code, msg, meta);
333            self.emit(rejection);
334            return;
335        }
336
337        let Some(result) = response.result else {
338            log::warn!(
339                "Missing result in success response for request {}",
340                response.id
341            );
342            return;
343        };
344
345        match meta {
346            BinanceFuturesWsTradingRequestMeta::PlaceOrder => {
347                match serde_json::from_value(result) {
348                    Ok(order) => {
349                        self.emit(BinanceFuturesWsTradingMessage::OrderAccepted {
350                            request_id: response.id,
351                            response: Box::new(order),
352                        });
353                    }
354                    Err(e) => {
355                        log::error!("Failed to deserialize order response: {e}");
356                        self.emit(BinanceFuturesWsTradingMessage::Error(e.to_string()));
357                    }
358                }
359            }
360            BinanceFuturesWsTradingRequestMeta::CancelOrder => match serde_json::from_value(result)
361            {
362                Ok(order) => {
363                    self.emit(BinanceFuturesWsTradingMessage::OrderCanceled {
364                        request_id: response.id,
365                        response: Box::new(order),
366                    });
367                }
368                Err(e) => {
369                    log::error!("Failed to deserialize cancel response: {e}");
370                    self.emit(BinanceFuturesWsTradingMessage::Error(e.to_string()));
371                }
372            },
373            BinanceFuturesWsTradingRequestMeta::ModifyOrder => match serde_json::from_value(result)
374            {
375                Ok(order) => {
376                    self.emit(BinanceFuturesWsTradingMessage::OrderModified {
377                        request_id: response.id,
378                        response: Box::new(order),
379                    });
380                }
381                Err(e) => {
382                    log::error!("Failed to deserialize modify response: {e}");
383                    self.emit(BinanceFuturesWsTradingMessage::Error(e.to_string()));
384                }
385            },
386        }
387    }
388
389    fn create_rejection(
390        &self,
391        request_id: String,
392        code: i32,
393        msg: String,
394        meta: BinanceFuturesWsTradingRequestMeta,
395    ) -> BinanceFuturesWsTradingMessage {
396        match meta {
397            BinanceFuturesWsTradingRequestMeta::PlaceOrder => {
398                BinanceFuturesWsTradingMessage::OrderRejected {
399                    request_id,
400                    code,
401                    msg,
402                }
403            }
404            BinanceFuturesWsTradingRequestMeta::CancelOrder => {
405                BinanceFuturesWsTradingMessage::CancelRejected {
406                    request_id,
407                    code,
408                    msg,
409                }
410            }
411            BinanceFuturesWsTradingRequestMeta::ModifyOrder => {
412                BinanceFuturesWsTradingMessage::ModifyRejected {
413                    request_id,
414                    code,
415                    msg,
416                }
417            }
418        }
419    }
420}