Skip to main content

nautilus_binance/futures/websocket/streams/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures WebSocket handler for JSON streams.
17//!
18//! The handler is a stateless I/O boundary: it deserializes raw JSON into
19//! venue-specific types and emits them on the output channel. Domain conversion
20//! happens in the data and execution client layers.
21
22use std::{
23    fmt::Debug,
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU64, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use nautilus_network::{
32    RECONNECTED,
33    websocket::{SubscriptionState, WebSocketClient},
34};
35
36use super::{
37    messages::{
38        BinanceFuturesAccountConfigMsg, BinanceFuturesAccountUpdateMsg, BinanceFuturesAggTradeMsg,
39        BinanceFuturesAlgoUpdateMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
40        BinanceFuturesKlineMsg, BinanceFuturesLiquidationMsg, BinanceFuturesListenKeyExpiredMsg,
41        BinanceFuturesMarginCallMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesOrderUpdateMsg,
42        BinanceFuturesTickerMsg, BinanceFuturesTradeLiteMsg, BinanceFuturesTradeMsg,
43        BinanceFuturesWsErrorMsg, BinanceFuturesWsErrorResponse, BinanceFuturesWsStreamsCommand,
44        BinanceFuturesWsStreamsMessage, BinanceFuturesWsSubscribeRequest,
45        BinanceFuturesWsSubscribeResponse,
46    },
47    parse_data::extract_event_type,
48};
49use crate::common::{
50    consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION,
51    enums::{BinanceWsEventType, BinanceWsMethod},
52};
53
54/// Handler for Binance Futures WebSocket JSON streams.
55///
56/// Deserializes raw JSON into venue-specific types without performing
57/// domain conversion. The data and execution client layers own instrument
58/// lookups and Nautilus type construction.
59pub struct BinanceFuturesDataWsFeedHandler {
60    #[allow(dead_code)]
61    signal: Arc<AtomicBool>,
62    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsStreamsCommand>,
63    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
64    #[allow(dead_code)]
65    out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsMessage>,
66    inner: Option<WebSocketClient>,
67    subscriptions_state: SubscriptionState,
68    request_id_counter: Arc<AtomicU64>,
69    pending_requests: AHashMap<u64, Vec<String>>,
70}
71
72impl Debug for BinanceFuturesDataWsFeedHandler {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        f.debug_struct(stringify!(BinanceFuturesDataWsFeedHandler))
75            .field("pending_requests", &self.pending_requests.len())
76            .finish_non_exhaustive()
77    }
78}
79
80impl BinanceFuturesDataWsFeedHandler {
81    /// Creates a new handler instance.
82    pub fn new(
83        signal: Arc<AtomicBool>,
84        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsStreamsCommand>,
85        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
86        out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsMessage>,
87        subscriptions_state: SubscriptionState,
88        request_id_counter: Arc<AtomicU64>,
89    ) -> Self {
90        Self {
91            signal,
92            cmd_rx,
93            raw_rx,
94            out_tx,
95            inner: None,
96            subscriptions_state,
97            request_id_counter,
98            pending_requests: AHashMap::new(),
99        }
100    }
101
102    /// Returns the next message from the handler.
103    ///
104    /// Processes both commands and raw WebSocket messages.
105    pub async fn next(&mut self) -> Option<BinanceFuturesWsStreamsMessage> {
106        loop {
107            if self.signal.load(Ordering::Relaxed) {
108                return None;
109            }
110
111            tokio::select! {
112                Some(cmd) = self.cmd_rx.recv() => {
113                    self.handle_command(cmd).await;
114                }
115                Some(raw) = self.raw_rx.recv() => {
116                    if let Some(msg) = self.handle_raw_message(raw).await {
117                        return Some(msg);
118                    }
119                }
120                else => {
121                    return None;
122                }
123            }
124        }
125    }
126
127    async fn handle_command(&mut self, cmd: BinanceFuturesWsStreamsCommand) {
128        match cmd {
129            BinanceFuturesWsStreamsCommand::SetClient(client) => {
130                self.inner = Some(client);
131            }
132            BinanceFuturesWsStreamsCommand::Disconnect => {
133                if let Some(client) = &self.inner {
134                    let () = client.disconnect().await;
135                }
136                self.inner = None;
137            }
138            BinanceFuturesWsStreamsCommand::Subscribe { streams } => {
139                self.send_subscribe(streams).await;
140            }
141            BinanceFuturesWsStreamsCommand::Unsubscribe { streams } => {
142                self.send_unsubscribe(streams).await;
143            }
144        }
145    }
146
147    async fn send_subscribe(&mut self, streams: Vec<String>) {
148        let Some(client) = &self.inner else {
149            log::warn!("Cannot subscribe: no client connected");
150            return;
151        };
152
153        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
154
155        self.pending_requests.insert(request_id, streams.clone());
156
157        for stream in &streams {
158            self.subscriptions_state.mark_subscribe(stream);
159        }
160
161        let request = BinanceFuturesWsSubscribeRequest {
162            method: BinanceWsMethod::Subscribe,
163            params: streams,
164            id: request_id,
165        };
166
167        let json = match serde_json::to_string(&request) {
168            Ok(j) => j,
169            Err(e) => {
170                log::error!("Failed to serialize subscribe request: {e}");
171                return;
172            }
173        };
174
175        if let Err(e) = client
176            .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
177            .await
178        {
179            log::error!("Failed to send subscribe request: {e}");
180        }
181    }
182
183    async fn send_unsubscribe(&self, streams: Vec<String>) {
184        let Some(client) = &self.inner else {
185            log::warn!("Cannot unsubscribe: no client connected");
186            return;
187        };
188
189        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
190
191        let request = BinanceFuturesWsSubscribeRequest {
192            method: BinanceWsMethod::Unsubscribe,
193            params: streams.clone(),
194            id: request_id,
195        };
196
197        let json = match serde_json::to_string(&request) {
198            Ok(j) => j,
199            Err(e) => {
200                log::error!("Failed to serialize unsubscribe request: {e}");
201                return;
202            }
203        };
204
205        if let Err(e) = client
206            .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
207            .await
208        {
209            log::error!("Failed to send unsubscribe request: {e}");
210        }
211
212        for stream in &streams {
213            self.subscriptions_state.mark_unsubscribe(stream);
214            self.subscriptions_state.confirm_unsubscribe(stream);
215        }
216    }
217
218    async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<BinanceFuturesWsStreamsMessage> {
219        if let Ok(text) = std::str::from_utf8(&raw)
220            && text == RECONNECTED
221        {
222            log::info!("WebSocket reconnected signal received");
223            return Some(BinanceFuturesWsStreamsMessage::Reconnected);
224        }
225
226        let json: serde_json::Value = match serde_json::from_slice(&raw) {
227            Ok(j) => j,
228            Err(e) => {
229                log::warn!("Failed to parse JSON message: {e}");
230                return None;
231            }
232        };
233
234        if json.get("result").is_some() || json.get("id").is_some() {
235            self.handle_subscription_response(&json);
236            return None;
237        }
238
239        if let Some(code) = json.get("code")
240            && let Some(code) = code.as_i64()
241        {
242            let msg = json
243                .get("msg")
244                .and_then(|m| m.as_str())
245                .unwrap_or("Unknown error")
246                .to_string();
247            return Some(BinanceFuturesWsStreamsMessage::Error(
248                BinanceFuturesWsErrorMsg { code, msg },
249            ));
250        }
251
252        self.handle_stream_data(&json)
253    }
254
255    fn handle_subscription_response(&mut self, json: &serde_json::Value) {
256        if let Ok(response) =
257            serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
258        {
259            if let Some(streams) = self.pending_requests.remove(&response.id) {
260                if response.result.is_none() {
261                    for stream in &streams {
262                        self.subscriptions_state.confirm_subscribe(stream);
263                    }
264                    log::debug!("Subscription confirmed: streams={streams:?}");
265                } else {
266                    for stream in &streams {
267                        self.subscriptions_state.mark_failure(stream);
268                    }
269                    log::warn!(
270                        "Subscription failed: streams={streams:?}, result={:?}",
271                        response.result
272                    );
273                }
274            }
275        } else if let Ok(error) =
276            serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
277        {
278            if let Some(id) = error.id
279                && let Some(streams) = self.pending_requests.remove(&id)
280            {
281                for stream in &streams {
282                    self.subscriptions_state.mark_failure(stream);
283                }
284            }
285            log::warn!(
286                "WebSocket error response: code={}, msg={}",
287                error.code,
288                error.msg
289            );
290        }
291    }
292
293    fn handle_stream_data(
294        &self,
295        json: &serde_json::Value,
296    ) -> Option<BinanceFuturesWsStreamsMessage> {
297        let event_type = extract_event_type(json)?;
298
299        match event_type {
300            BinanceWsEventType::AggTrade => {
301                serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone())
302                    .map(BinanceFuturesWsStreamsMessage::AggTrade)
303                    .map_err(|e| log::warn!("Failed to parse aggregate trade: {e}"))
304                    .ok()
305            }
306            BinanceWsEventType::Trade => {
307                serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone())
308                    .map(BinanceFuturesWsStreamsMessage::Trade)
309                    .map_err(|e| log::warn!("Failed to parse trade: {e}"))
310                    .ok()
311            }
312            BinanceWsEventType::BookTicker => {
313                serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
314                    .map(BinanceFuturesWsStreamsMessage::BookTicker)
315                    .map_err(|e| log::warn!("Failed to parse book ticker: {e}"))
316                    .ok()
317            }
318            BinanceWsEventType::DepthUpdate => {
319                serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
320                    .map(BinanceFuturesWsStreamsMessage::DepthUpdate)
321                    .map_err(|e| log::warn!("Failed to parse depth update: {e}"))
322                    .ok()
323            }
324            BinanceWsEventType::MarkPriceUpdate => {
325                serde_json::from_value::<BinanceFuturesMarkPriceMsg>(json.clone())
326                    .map(BinanceFuturesWsStreamsMessage::MarkPrice)
327                    .map_err(|e| log::warn!("Failed to parse mark price: {e}"))
328                    .ok()
329            }
330            BinanceWsEventType::Kline => {
331                serde_json::from_value::<BinanceFuturesKlineMsg>(json.clone())
332                    .map(BinanceFuturesWsStreamsMessage::Kline)
333                    .map_err(|e| log::warn!("Failed to parse kline: {e}"))
334                    .ok()
335            }
336            BinanceWsEventType::ForceOrder => {
337                serde_json::from_value::<BinanceFuturesLiquidationMsg>(json.clone())
338                    .map(BinanceFuturesWsStreamsMessage::ForceOrder)
339                    .map_err(|e| log::warn!("Failed to parse force order: {e}"))
340                    .ok()
341            }
342            BinanceWsEventType::Ticker24Hr => {
343                serde_json::from_value::<BinanceFuturesTickerMsg>(json.clone())
344                    .map(BinanceFuturesWsStreamsMessage::Ticker)
345                    .map_err(|e| log::warn!("Failed to parse ticker: {e}"))
346                    .ok()
347            }
348            BinanceWsEventType::MiniTicker24Hr => {
349                log::debug!("Mini ticker not yet supported, skipping");
350                None
351            }
352            BinanceWsEventType::AccountUpdate => {
353                serde_json::from_value::<BinanceFuturesAccountUpdateMsg>(json.clone())
354                    .map(|msg| {
355                        log::debug!(
356                            "Account update: reason={:?}, balances={}, positions={}",
357                            msg.account.reason,
358                            msg.account.balances.len(),
359                            msg.account.positions.len()
360                        );
361                        BinanceFuturesWsStreamsMessage::AccountUpdate(msg)
362                    })
363                    .map_err(|e| log::warn!("Failed to parse account update: {e}"))
364                    .ok()
365            }
366            BinanceWsEventType::OrderTradeUpdate => {
367                serde_json::from_value::<BinanceFuturesOrderUpdateMsg>(json.clone())
368                    .map(|msg| {
369                        log::debug!(
370                            "Order update: symbol={}, order_id={}, exec={:?}, status={:?}",
371                            msg.order.symbol,
372                            msg.order.order_id,
373                            msg.order.execution_type,
374                            msg.order.order_status
375                        );
376                        BinanceFuturesWsStreamsMessage::OrderUpdate(Box::new(msg))
377                    })
378                    .map_err(|e| log::warn!("Failed to parse order update: {e}"))
379                    .ok()
380            }
381            BinanceWsEventType::TradeLite => {
382                serde_json::from_value::<BinanceFuturesTradeLiteMsg>(json.clone())
383                    .map(|msg| {
384                        log::debug!(
385                            "Trade lite: symbol={}, order_id={}, trade_id={}",
386                            msg.symbol,
387                            msg.order_id,
388                            msg.trade_id
389                        );
390                        BinanceFuturesWsStreamsMessage::TradeLite(Box::new(msg))
391                    })
392                    .map_err(|e| log::warn!("Failed to parse trade lite: {e}"))
393                    .ok()
394            }
395            BinanceWsEventType::AlgoUpdate => {
396                serde_json::from_value::<BinanceFuturesAlgoUpdateMsg>(json.clone())
397                    .map(|msg| {
398                        log::debug!(
399                            "Algo order update: symbol={}, algo_id={}, status={:?}",
400                            msg.algo_order.symbol,
401                            msg.algo_order.algo_id,
402                            msg.algo_order.algo_status
403                        );
404                        BinanceFuturesWsStreamsMessage::AlgoUpdate(Box::new(msg))
405                    })
406                    .map_err(|e| log::warn!("Failed to parse algo order update: {e}"))
407                    .ok()
408            }
409            BinanceWsEventType::MarginCall => {
410                serde_json::from_value::<BinanceFuturesMarginCallMsg>(json.clone())
411                    .map(|msg| {
412                        log::warn!(
413                            "Margin call: cross_wallet_balance={}, positions_at_risk={}",
414                            msg.cross_wallet_balance,
415                            msg.positions.len()
416                        );
417                        BinanceFuturesWsStreamsMessage::MarginCall(msg)
418                    })
419                    .map_err(|e| log::warn!("Failed to parse margin call: {e}"))
420                    .ok()
421            }
422            BinanceWsEventType::AccountConfigUpdate => {
423                serde_json::from_value::<BinanceFuturesAccountConfigMsg>(json.clone())
424                    .map(|msg| {
425                        if let Some(ref lc) = msg.leverage_config {
426                            log::debug!(
427                                "Account config update: symbol={}, leverage={}",
428                                lc.symbol,
429                                lc.leverage
430                            );
431                        }
432                        BinanceFuturesWsStreamsMessage::AccountConfigUpdate(msg)
433                    })
434                    .map_err(|e| log::warn!("Failed to parse account config update: {e}"))
435                    .ok()
436            }
437            BinanceWsEventType::ListenKeyExpired => {
438                if let Ok(msg) =
439                    serde_json::from_value::<BinanceFuturesListenKeyExpiredMsg>(json.clone())
440                {
441                    log::warn!("Listen key expired at {}", msg.event_time);
442                }
443                Some(BinanceFuturesWsStreamsMessage::ListenKeyExpired)
444            }
445            BinanceWsEventType::Unknown => {
446                log::warn!("Unknown event type in message: {json}");
447                None
448            }
449        }
450    }
451}