nautilus_binance/spot/websocket/streams/
handler.rs1use std::{
23 collections::VecDeque,
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU64, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use nautilus_network::{
32 RECONNECTED,
33 websocket::{SubscriptionState, WebSocketClient},
34};
35use tokio_tungstenite::tungstenite::Message;
36use ustr::Ustr;
37
38pub use super::parse::{MarketDataMessage, decode_market_data};
39use super::{
40 messages::{
41 BinanceSpotWsMessage, BinanceSpotWsStreamsCommand, BinanceWsErrorMsg,
42 BinanceWsErrorResponse, BinanceWsResponse, BinanceWsSubscription,
43 },
44 parse::decode_market_data as decode_sbe,
45};
46use crate::common::consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION;
47
48pub(super) struct BinanceSpotWsFeedHandler {
54 #[allow(dead_code)]
55 signal: Arc<AtomicBool>,
56 inner: Option<WebSocketClient>,
57 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsStreamsCommand>,
58 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
59 #[allow(dead_code)]
60 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
61 subscriptions: SubscriptionState,
62 request_id_counter: Arc<AtomicU64>,
63 pending_messages: VecDeque<BinanceSpotWsMessage>,
64 pending_requests: AHashMap<u64, Vec<String>>,
65}
66
67impl BinanceSpotWsFeedHandler {
68 pub(super) fn new(
70 signal: Arc<AtomicBool>,
71 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceSpotWsStreamsCommand>,
72 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
73 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
74 subscriptions: SubscriptionState,
75 request_id_counter: Arc<AtomicU64>,
76 ) -> Self {
77 Self {
78 signal,
79 inner: None,
80 cmd_rx,
81 raw_rx,
82 out_tx,
83 subscriptions,
84 request_id_counter,
85 pending_messages: VecDeque::new(),
86 pending_requests: AHashMap::new(),
87 }
88 }
89
90 pub(super) async fn next(&mut self) -> Option<BinanceSpotWsMessage> {
94 if let Some(message) = self.pending_messages.pop_front() {
95 return Some(message);
96 }
97
98 loop {
99 tokio::select! {
100 Some(cmd) = self.cmd_rx.recv() => {
101 match cmd {
102 BinanceSpotWsStreamsCommand::SetClient(client) => {
103 log::debug!("Handler received WebSocket client");
104 self.inner = Some(client);
105 }
106 BinanceSpotWsStreamsCommand::Disconnect => {
107 log::debug!("Handler disconnecting WebSocket client");
108 self.inner = None;
109 return None;
110 }
111 BinanceSpotWsStreamsCommand::Subscribe { streams } => {
112 if let Err(e) = self.handle_subscribe(streams).await {
113 log::error!("Failed to handle subscribe command: {e}");
114 }
115 }
116 BinanceSpotWsStreamsCommand::Unsubscribe { streams } => {
117 if let Err(e) = self.handle_unsubscribe(streams).await {
118 log::error!("Failed to handle unsubscribe command: {e}");
119 }
120 }
121 }
122 }
123 Some(msg) = self.raw_rx.recv() => {
124 if let Message::Text(ref text) = msg
125 && text.as_str() == RECONNECTED
126 {
127 log::info!("Handler received reconnection signal");
128 return Some(BinanceSpotWsMessage::Reconnected);
129 }
130
131 let messages = self.handle_message(msg);
132 if !messages.is_empty() {
133 let mut iter = messages.into_iter();
134 let first = iter.next();
135 self.pending_messages.extend(iter);
136
137 if let Some(msg) = first {
138 return Some(msg);
139 }
140 }
141 }
142 else => {
143 return None;
144 }
145 }
146 }
147 }
148
149 fn handle_message(&mut self, msg: Message) -> Vec<BinanceSpotWsMessage> {
150 match msg {
151 Message::Binary(data) => self.handle_binary_frame(&data),
152 Message::Text(text) => self.handle_text_frame(&text),
153 Message::Close(_) => {
154 log::debug!("Received close frame");
155 vec![]
156 }
157 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => vec![],
158 }
159 }
160
161 fn handle_binary_frame(&self, data: &[u8]) -> Vec<BinanceSpotWsMessage> {
162 match decode_sbe(data) {
163 Ok(MarketDataMessage::Trades(event)) => {
164 vec![BinanceSpotWsMessage::Trades(event)]
165 }
166 Ok(MarketDataMessage::BestBidAsk(event)) => {
167 vec![BinanceSpotWsMessage::BestBidAsk(event)]
168 }
169 Ok(MarketDataMessage::DepthSnapshot(event)) => {
170 vec![BinanceSpotWsMessage::DepthSnapshot(event)]
171 }
172 Ok(MarketDataMessage::DepthDiff(event)) => {
173 vec![BinanceSpotWsMessage::DepthDiff(event)]
174 }
175 Err(e) => {
176 log::error!("SBE decode error: {e}");
177 vec![BinanceSpotWsMessage::RawBinary(data.to_vec())]
178 }
179 }
180 }
181
182 fn handle_text_frame(&mut self, text: &str) -> Vec<BinanceSpotWsMessage> {
183 if let Ok(response) = serde_json::from_str::<BinanceWsResponse>(text) {
184 self.handle_subscription_response(&response);
185 return vec![];
186 }
187
188 if let Ok(error) = serde_json::from_str::<BinanceWsErrorResponse>(text) {
189 if let Some(id) = error.id
190 && let Some(streams) = self.pending_requests.remove(&id)
191 {
192 for stream in &streams {
193 self.subscriptions.mark_failure(stream);
194 }
195 log::warn!(
196 "Subscription request failed: id={id}, streams={streams:?}, code={}, msg={}",
197 error.code,
198 error.msg
199 );
200 }
201 return vec![BinanceSpotWsMessage::Error(BinanceWsErrorMsg {
202 code: error.code,
203 msg: error.msg,
204 })];
205 }
206
207 if let Ok(value) = serde_json::from_str(text) {
208 vec![BinanceSpotWsMessage::RawJson(value)]
209 } else {
210 log::warn!("Failed to parse JSON message: {text}");
211 vec![]
212 }
213 }
214
215 fn handle_subscription_response(&mut self, response: &BinanceWsResponse) {
216 if let Some(streams) = self.pending_requests.remove(&response.id) {
217 if response.result.is_none() {
218 for stream in &streams {
219 self.subscriptions.confirm_subscribe(stream);
220 }
221 log::debug!("Subscription confirmed: streams={streams:?}");
222 } else {
223 for stream in &streams {
224 self.subscriptions.mark_failure(stream);
225 }
226 log::warn!(
227 "Subscription failed: streams={streams:?}, result={:?}",
228 response.result
229 );
230 }
231 } else {
232 log::debug!("Received response for unknown request: id={}", response.id);
233 }
234 }
235
236 async fn handle_subscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
237 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
238 let request = BinanceWsSubscription::subscribe(streams.clone(), request_id);
239 let payload = serde_json::to_string(&request)?;
240
241 self.pending_requests.insert(request_id, streams.clone());
242
243 for stream in &streams {
244 self.subscriptions.mark_subscribe(stream);
245 }
246
247 self.send_text(
248 payload,
249 Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
250 )
251 .await?;
252 Ok(())
253 }
254
255 async fn handle_unsubscribe(&self, streams: Vec<String>) -> anyhow::Result<()> {
256 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
257 let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
258 let payload = serde_json::to_string(&request)?;
259
260 self.send_text(
261 payload,
262 Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
263 )
264 .await?;
265
266 for stream in &streams {
267 self.subscriptions.mark_unsubscribe(stream);
268 self.subscriptions.confirm_unsubscribe(stream);
269 }
270
271 Ok(())
272 }
273
274 async fn send_text(
275 &self,
276 payload: String,
277 rate_limit_keys: Option<&[Ustr]>,
278 ) -> anyhow::Result<()> {
279 let Some(client) = &self.inner else {
280 anyhow::bail!("No active WebSocket client");
281 };
282 client
283 .send_text(payload, rate_limit_keys)
284 .await
285 .map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
286 Ok(())
287 }
288}