Skip to main content

nautilus_binance/spot/websocket/streams/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket message handler.
17//!
18//! The handler is a stateless I/O boundary: it decodes raw SBE binary frames
19//! into venue-specific event types and emits them on the output channel.
20//! Domain conversion happens in the data client layer.
21
22use std::{
23    collections::VecDeque,
24    sync::{
25        Arc,
26        atomic::{AtomicBool, AtomicU64, Ordering},
27    },
28};
29
30use ahash::AHashMap;
31use nautilus_network::{
32    RECONNECTED,
33    websocket::{SubscriptionState, WebSocketClient},
34};
35use tokio_tungstenite::tungstenite::Message;
36use ustr::Ustr;
37
38pub use super::parse::{MarketDataMessage, decode_market_data};
39use super::{
40    messages::{
41        BinanceSpotWsMessage, BinanceSpotWsStreamsCommand, BinanceWsErrorMsg,
42        BinanceWsErrorResponse, BinanceWsResponse, BinanceWsSubscription,
43    },
44    parse::decode_market_data as decode_sbe,
45};
46use crate::common::consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION;
47
/// Binance Spot WebSocket feed handler.
///
/// Decodes raw SBE binary frames into venue-specific event types without
/// performing domain conversion. The data client layer owns instrument
/// lookups and Nautilus type construction.
pub(super) struct BinanceSpotWsFeedHandler {
    /// Shared flag supplied at construction. It is stored but not read by any
    /// method visible here, hence the `dead_code` allowance.
    #[allow(dead_code)]
    signal: Arc<AtomicBool>,
    /// Active WebSocket client: installed by the `SetClient` command and
    /// cleared by `Disconnect`. Outbound sends fail while this is `None`.
    inner: Option<WebSocketClient>,
    /// Incoming commands (set client, disconnect, subscribe, unsubscribe).
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsStreamsCommand>,
    /// Incoming raw WebSocket frames, including the reconnection sentinel.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Output channel supplied at construction. It is stored but not written
    /// by any method visible here, hence the `dead_code` allowance.
    #[allow(dead_code)]
    out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
    /// Per-stream subscription bookkeeping (pending / confirmed / failed).
    subscriptions: SubscriptionState,
    /// Shared counter used to allocate ids for outbound requests.
    request_id_counter: Arc<AtomicU64>,
    /// Decoded messages produced by one frame but not yet returned by `next`.
    pending_messages: VecDeque<BinanceSpotWsMessage>,
    /// Subscribe requests awaiting a venue response, keyed by request id and
    /// mapping to the streams named in that request.
    pending_requests: AHashMap<u64, Vec<String>>,
}
66
67impl BinanceSpotWsFeedHandler {
68    /// Creates a new handler instance.
69    pub(super) fn new(
70        signal: Arc<AtomicBool>,
71        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsStreamsCommand>,
72        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
73        out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
74        subscriptions: SubscriptionState,
75        request_id_counter: Arc<AtomicU64>,
76    ) -> Self {
77        Self {
78            signal,
79            inner: None,
80            cmd_rx,
81            raw_rx,
82            out_tx,
83            subscriptions,
84            request_id_counter,
85            pending_messages: VecDeque::new(),
86            pending_requests: AHashMap::new(),
87        }
88    }
89
90    /// Returns the next message from the handler.
91    ///
92    /// Processes both commands and raw WebSocket messages.
93    pub(super) async fn next(&mut self) -> Option<BinanceSpotWsMessage> {
94        if let Some(message) = self.pending_messages.pop_front() {
95            return Some(message);
96        }
97
98        loop {
99            tokio::select! {
100                Some(cmd) = self.cmd_rx.recv() => {
101                    match cmd {
102                        BinanceSpotWsStreamsCommand::SetClient(client) => {
103                            log::debug!("Handler received WebSocket client");
104                            self.inner = Some(client);
105                        }
106                        BinanceSpotWsStreamsCommand::Disconnect => {
107                            log::debug!("Handler disconnecting WebSocket client");
108                            self.inner = None;
109                            return None;
110                        }
111                        BinanceSpotWsStreamsCommand::Subscribe { streams } => {
112                            if let Err(e) = self.handle_subscribe(streams).await {
113                                log::error!("Failed to handle subscribe command: {e}");
114                            }
115                        }
116                        BinanceSpotWsStreamsCommand::Unsubscribe { streams } => {
117                            if let Err(e) = self.handle_unsubscribe(streams).await {
118                                log::error!("Failed to handle unsubscribe command: {e}");
119                            }
120                        }
121                    }
122                }
123                Some(msg) = self.raw_rx.recv() => {
124                    if let Message::Text(ref text) = msg
125                        && text.as_str() == RECONNECTED
126                    {
127                        log::info!("Handler received reconnection signal");
128                        return Some(BinanceSpotWsMessage::Reconnected);
129                    }
130
131                    let messages = self.handle_message(msg);
132                    if !messages.is_empty() {
133                        let mut iter = messages.into_iter();
134                        let first = iter.next();
135                        self.pending_messages.extend(iter);
136
137                        if let Some(msg) = first {
138                            return Some(msg);
139                        }
140                    }
141                }
142                else => {
143                    return None;
144                }
145            }
146        }
147    }
148
149    fn handle_message(&mut self, msg: Message) -> Vec<BinanceSpotWsMessage> {
150        match msg {
151            Message::Binary(data) => self.handle_binary_frame(&data),
152            Message::Text(text) => self.handle_text_frame(&text),
153            Message::Close(_) => {
154                log::debug!("Received close frame");
155                vec![]
156            }
157            Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => vec![],
158        }
159    }
160
161    fn handle_binary_frame(&self, data: &[u8]) -> Vec<BinanceSpotWsMessage> {
162        match decode_sbe(data) {
163            Ok(MarketDataMessage::Trades(event)) => {
164                vec![BinanceSpotWsMessage::Trades(event)]
165            }
166            Ok(MarketDataMessage::BestBidAsk(event)) => {
167                vec![BinanceSpotWsMessage::BestBidAsk(event)]
168            }
169            Ok(MarketDataMessage::DepthSnapshot(event)) => {
170                vec![BinanceSpotWsMessage::DepthSnapshot(event)]
171            }
172            Ok(MarketDataMessage::DepthDiff(event)) => {
173                vec![BinanceSpotWsMessage::DepthDiff(event)]
174            }
175            Err(e) => {
176                log::error!("SBE decode error: {e}");
177                vec![BinanceSpotWsMessage::RawBinary(data.to_vec())]
178            }
179        }
180    }
181
182    fn handle_text_frame(&mut self, text: &str) -> Vec<BinanceSpotWsMessage> {
183        if let Ok(response) = serde_json::from_str::<BinanceWsResponse>(text) {
184            self.handle_subscription_response(&response);
185            return vec![];
186        }
187
188        if let Ok(error) = serde_json::from_str::<BinanceWsErrorResponse>(text) {
189            if let Some(id) = error.id
190                && let Some(streams) = self.pending_requests.remove(&id)
191            {
192                for stream in &streams {
193                    self.subscriptions.mark_failure(stream);
194                }
195                log::warn!(
196                    "Subscription request failed: id={id}, streams={streams:?}, code={}, msg={}",
197                    error.code,
198                    error.msg
199                );
200            }
201            return vec![BinanceSpotWsMessage::Error(BinanceWsErrorMsg {
202                code: error.code,
203                msg: error.msg,
204            })];
205        }
206
207        if let Ok(value) = serde_json::from_str(text) {
208            vec![BinanceSpotWsMessage::RawJson(value)]
209        } else {
210            log::warn!("Failed to parse JSON message: {text}");
211            vec![]
212        }
213    }
214
215    fn handle_subscription_response(&mut self, response: &BinanceWsResponse) {
216        if let Some(streams) = self.pending_requests.remove(&response.id) {
217            if response.result.is_none() {
218                for stream in &streams {
219                    self.subscriptions.confirm_subscribe(stream);
220                }
221                log::debug!("Subscription confirmed: streams={streams:?}");
222            } else {
223                for stream in &streams {
224                    self.subscriptions.mark_failure(stream);
225                }
226                log::warn!(
227                    "Subscription failed: streams={streams:?}, result={:?}",
228                    response.result
229                );
230            }
231        } else {
232            log::debug!("Received response for unknown request: id={}", response.id);
233        }
234    }
235
236    async fn handle_subscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
237        let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
238        let request = BinanceWsSubscription::subscribe(streams.clone(), request_id);
239        let payload = serde_json::to_string(&request)?;
240
241        self.pending_requests.insert(request_id, streams.clone());
242
243        for stream in &streams {
244            self.subscriptions.mark_subscribe(stream);
245        }
246
247        self.send_text(
248            payload,
249            Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
250        )
251        .await?;
252        Ok(())
253    }
254
255    async fn handle_unsubscribe(&self, streams: Vec<String>) -> anyhow::Result<()> {
256        let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
257        let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
258        let payload = serde_json::to_string(&request)?;
259
260        self.send_text(
261            payload,
262            Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
263        )
264        .await?;
265
266        for stream in &streams {
267            self.subscriptions.mark_unsubscribe(stream);
268            self.subscriptions.confirm_unsubscribe(stream);
269        }
270
271        Ok(())
272    }
273
274    async fn send_text(
275        &self,
276        payload: String,
277        rate_limit_keys: Option<&[Ustr]>,
278    ) -> anyhow::Result<()> {
279        let Some(client) = &self.inner else {
280            anyhow::bail!("No active WebSocket client");
281        };
282        client
283            .send_text(payload, rate_limit_keys)
284            .await
285            .map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
286        Ok(())
287    }
288}