nautilus_binance/futures/websocket/streams/
handler.rs1use std::{
23 fmt::Debug,
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU64, Ordering},
27 },
28};
29
30use ahash::AHashMap;
31use nautilus_network::{
32 RECONNECTED,
33 websocket::{SubscriptionState, WebSocketClient},
34};
35
36use super::{
37 messages::{
38 BinanceFuturesAccountConfigMsg, BinanceFuturesAccountUpdateMsg, BinanceFuturesAggTradeMsg,
39 BinanceFuturesAlgoUpdateMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
40 BinanceFuturesKlineMsg, BinanceFuturesLiquidationMsg, BinanceFuturesListenKeyExpiredMsg,
41 BinanceFuturesMarginCallMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesOrderUpdateMsg,
42 BinanceFuturesTickerMsg, BinanceFuturesTradeLiteMsg, BinanceFuturesTradeMsg,
43 BinanceFuturesWsErrorMsg, BinanceFuturesWsErrorResponse, BinanceFuturesWsStreamsCommand,
44 BinanceFuturesWsStreamsMessage, BinanceFuturesWsSubscribeRequest,
45 BinanceFuturesWsSubscribeResponse,
46 },
47 parse_data::extract_event_type,
48};
49use crate::common::{
50 consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION,
51 enums::{BinanceWsEventType, BinanceWsMethod},
52};
53
54pub struct BinanceFuturesDataWsFeedHandler {
60 #[allow(dead_code)]
61 signal: Arc<AtomicBool>,
62 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsStreamsCommand>,
63 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
64 #[allow(dead_code)]
65 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsMessage>,
66 inner: Option<WebSocketClient>,
67 subscriptions_state: SubscriptionState,
68 request_id_counter: Arc<AtomicU64>,
69 pending_requests: AHashMap<u64, Vec<String>>,
70}
71
72impl Debug for BinanceFuturesDataWsFeedHandler {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 f.debug_struct(stringify!(BinanceFuturesDataWsFeedHandler))
75 .field("pending_requests", &self.pending_requests.len())
76 .finish_non_exhaustive()
77 }
78}
79
80impl BinanceFuturesDataWsFeedHandler {
81 pub fn new(
83 signal: Arc<AtomicBool>,
84 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesWsStreamsCommand>,
85 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
86 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceFuturesWsStreamsMessage>,
87 subscriptions_state: SubscriptionState,
88 request_id_counter: Arc<AtomicU64>,
89 ) -> Self {
90 Self {
91 signal,
92 cmd_rx,
93 raw_rx,
94 out_tx,
95 inner: None,
96 subscriptions_state,
97 request_id_counter,
98 pending_requests: AHashMap::new(),
99 }
100 }
101
102 pub async fn next(&mut self) -> Option<BinanceFuturesWsStreamsMessage> {
106 loop {
107 if self.signal.load(Ordering::Relaxed) {
108 return None;
109 }
110
111 tokio::select! {
112 Some(cmd) = self.cmd_rx.recv() => {
113 self.handle_command(cmd).await;
114 }
115 Some(raw) = self.raw_rx.recv() => {
116 if let Some(msg) = self.handle_raw_message(raw).await {
117 return Some(msg);
118 }
119 }
120 else => {
121 return None;
122 }
123 }
124 }
125 }
126
127 async fn handle_command(&mut self, cmd: BinanceFuturesWsStreamsCommand) {
128 match cmd {
129 BinanceFuturesWsStreamsCommand::SetClient(client) => {
130 self.inner = Some(client);
131 }
132 BinanceFuturesWsStreamsCommand::Disconnect => {
133 if let Some(client) = &self.inner {
134 let () = client.disconnect().await;
135 }
136 self.inner = None;
137 }
138 BinanceFuturesWsStreamsCommand::Subscribe { streams } => {
139 self.send_subscribe(streams).await;
140 }
141 BinanceFuturesWsStreamsCommand::Unsubscribe { streams } => {
142 self.send_unsubscribe(streams).await;
143 }
144 }
145 }
146
147 async fn send_subscribe(&mut self, streams: Vec<String>) {
148 let Some(client) = &self.inner else {
149 log::warn!("Cannot subscribe: no client connected");
150 return;
151 };
152
153 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
154
155 self.pending_requests.insert(request_id, streams.clone());
156
157 for stream in &streams {
158 self.subscriptions_state.mark_subscribe(stream);
159 }
160
161 let request = BinanceFuturesWsSubscribeRequest {
162 method: BinanceWsMethod::Subscribe,
163 params: streams,
164 id: request_id,
165 };
166
167 let json = match serde_json::to_string(&request) {
168 Ok(j) => j,
169 Err(e) => {
170 log::error!("Failed to serialize subscribe request: {e}");
171 return;
172 }
173 };
174
175 if let Err(e) = client
176 .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
177 .await
178 {
179 log::error!("Failed to send subscribe request: {e}");
180 }
181 }
182
183 async fn send_unsubscribe(&self, streams: Vec<String>) {
184 let Some(client) = &self.inner else {
185 log::warn!("Cannot unsubscribe: no client connected");
186 return;
187 };
188
189 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
190
191 let request = BinanceFuturesWsSubscribeRequest {
192 method: BinanceWsMethod::Unsubscribe,
193 params: streams.clone(),
194 id: request_id,
195 };
196
197 let json = match serde_json::to_string(&request) {
198 Ok(j) => j,
199 Err(e) => {
200 log::error!("Failed to serialize unsubscribe request: {e}");
201 return;
202 }
203 };
204
205 if let Err(e) = client
206 .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
207 .await
208 {
209 log::error!("Failed to send unsubscribe request: {e}");
210 }
211
212 for stream in &streams {
213 self.subscriptions_state.mark_unsubscribe(stream);
214 self.subscriptions_state.confirm_unsubscribe(stream);
215 }
216 }
217
218 async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<BinanceFuturesWsStreamsMessage> {
219 if let Ok(text) = std::str::from_utf8(&raw)
220 && text == RECONNECTED
221 {
222 log::info!("WebSocket reconnected signal received");
223 return Some(BinanceFuturesWsStreamsMessage::Reconnected);
224 }
225
226 let json: serde_json::Value = match serde_json::from_slice(&raw) {
227 Ok(j) => j,
228 Err(e) => {
229 log::warn!("Failed to parse JSON message: {e}");
230 return None;
231 }
232 };
233
234 if json.get("result").is_some() || json.get("id").is_some() {
235 self.handle_subscription_response(&json);
236 return None;
237 }
238
239 if let Some(code) = json.get("code")
240 && let Some(code) = code.as_i64()
241 {
242 let msg = json
243 .get("msg")
244 .and_then(|m| m.as_str())
245 .unwrap_or("Unknown error")
246 .to_string();
247 return Some(BinanceFuturesWsStreamsMessage::Error(
248 BinanceFuturesWsErrorMsg { code, msg },
249 ));
250 }
251
252 self.handle_stream_data(&json)
253 }
254
255 fn handle_subscription_response(&mut self, json: &serde_json::Value) {
256 if let Ok(response) =
257 serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
258 {
259 if let Some(streams) = self.pending_requests.remove(&response.id) {
260 if response.result.is_none() {
261 for stream in &streams {
262 self.subscriptions_state.confirm_subscribe(stream);
263 }
264 log::debug!("Subscription confirmed: streams={streams:?}");
265 } else {
266 for stream in &streams {
267 self.subscriptions_state.mark_failure(stream);
268 }
269 log::warn!(
270 "Subscription failed: streams={streams:?}, result={:?}",
271 response.result
272 );
273 }
274 }
275 } else if let Ok(error) =
276 serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
277 {
278 if let Some(id) = error.id
279 && let Some(streams) = self.pending_requests.remove(&id)
280 {
281 for stream in &streams {
282 self.subscriptions_state.mark_failure(stream);
283 }
284 }
285 log::warn!(
286 "WebSocket error response: code={}, msg={}",
287 error.code,
288 error.msg
289 );
290 }
291 }
292
293 fn handle_stream_data(
294 &self,
295 json: &serde_json::Value,
296 ) -> Option<BinanceFuturesWsStreamsMessage> {
297 let event_type = extract_event_type(json)?;
298
299 match event_type {
300 BinanceWsEventType::AggTrade => {
301 serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone())
302 .map(BinanceFuturesWsStreamsMessage::AggTrade)
303 .map_err(|e| log::warn!("Failed to parse aggregate trade: {e}"))
304 .ok()
305 }
306 BinanceWsEventType::Trade => {
307 serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone())
308 .map(BinanceFuturesWsStreamsMessage::Trade)
309 .map_err(|e| log::warn!("Failed to parse trade: {e}"))
310 .ok()
311 }
312 BinanceWsEventType::BookTicker => {
313 serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
314 .map(BinanceFuturesWsStreamsMessage::BookTicker)
315 .map_err(|e| log::warn!("Failed to parse book ticker: {e}"))
316 .ok()
317 }
318 BinanceWsEventType::DepthUpdate => {
319 serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
320 .map(BinanceFuturesWsStreamsMessage::DepthUpdate)
321 .map_err(|e| log::warn!("Failed to parse depth update: {e}"))
322 .ok()
323 }
324 BinanceWsEventType::MarkPriceUpdate => {
325 serde_json::from_value::<BinanceFuturesMarkPriceMsg>(json.clone())
326 .map(BinanceFuturesWsStreamsMessage::MarkPrice)
327 .map_err(|e| log::warn!("Failed to parse mark price: {e}"))
328 .ok()
329 }
330 BinanceWsEventType::Kline => {
331 serde_json::from_value::<BinanceFuturesKlineMsg>(json.clone())
332 .map(BinanceFuturesWsStreamsMessage::Kline)
333 .map_err(|e| log::warn!("Failed to parse kline: {e}"))
334 .ok()
335 }
336 BinanceWsEventType::ForceOrder => {
337 serde_json::from_value::<BinanceFuturesLiquidationMsg>(json.clone())
338 .map(BinanceFuturesWsStreamsMessage::ForceOrder)
339 .map_err(|e| log::warn!("Failed to parse force order: {e}"))
340 .ok()
341 }
342 BinanceWsEventType::Ticker24Hr => {
343 serde_json::from_value::<BinanceFuturesTickerMsg>(json.clone())
344 .map(BinanceFuturesWsStreamsMessage::Ticker)
345 .map_err(|e| log::warn!("Failed to parse ticker: {e}"))
346 .ok()
347 }
348 BinanceWsEventType::MiniTicker24Hr => {
349 log::debug!("Mini ticker not yet supported, skipping");
350 None
351 }
352 BinanceWsEventType::AccountUpdate => {
353 serde_json::from_value::<BinanceFuturesAccountUpdateMsg>(json.clone())
354 .map(|msg| {
355 log::debug!(
356 "Account update: reason={:?}, balances={}, positions={}",
357 msg.account.reason,
358 msg.account.balances.len(),
359 msg.account.positions.len()
360 );
361 BinanceFuturesWsStreamsMessage::AccountUpdate(msg)
362 })
363 .map_err(|e| log::warn!("Failed to parse account update: {e}"))
364 .ok()
365 }
366 BinanceWsEventType::OrderTradeUpdate => {
367 serde_json::from_value::<BinanceFuturesOrderUpdateMsg>(json.clone())
368 .map(|msg| {
369 log::debug!(
370 "Order update: symbol={}, order_id={}, exec={:?}, status={:?}",
371 msg.order.symbol,
372 msg.order.order_id,
373 msg.order.execution_type,
374 msg.order.order_status
375 );
376 BinanceFuturesWsStreamsMessage::OrderUpdate(Box::new(msg))
377 })
378 .map_err(|e| log::warn!("Failed to parse order update: {e}"))
379 .ok()
380 }
381 BinanceWsEventType::TradeLite => {
382 serde_json::from_value::<BinanceFuturesTradeLiteMsg>(json.clone())
383 .map(|msg| {
384 log::debug!(
385 "Trade lite: symbol={}, order_id={}, trade_id={}",
386 msg.symbol,
387 msg.order_id,
388 msg.trade_id
389 );
390 BinanceFuturesWsStreamsMessage::TradeLite(Box::new(msg))
391 })
392 .map_err(|e| log::warn!("Failed to parse trade lite: {e}"))
393 .ok()
394 }
395 BinanceWsEventType::AlgoUpdate => {
396 serde_json::from_value::<BinanceFuturesAlgoUpdateMsg>(json.clone())
397 .map(|msg| {
398 log::debug!(
399 "Algo order update: symbol={}, algo_id={}, status={:?}",
400 msg.algo_order.symbol,
401 msg.algo_order.algo_id,
402 msg.algo_order.algo_status
403 );
404 BinanceFuturesWsStreamsMessage::AlgoUpdate(Box::new(msg))
405 })
406 .map_err(|e| log::warn!("Failed to parse algo order update: {e}"))
407 .ok()
408 }
409 BinanceWsEventType::MarginCall => {
410 serde_json::from_value::<BinanceFuturesMarginCallMsg>(json.clone())
411 .map(|msg| {
412 log::warn!(
413 "Margin call: cross_wallet_balance={}, positions_at_risk={}",
414 msg.cross_wallet_balance,
415 msg.positions.len()
416 );
417 BinanceFuturesWsStreamsMessage::MarginCall(msg)
418 })
419 .map_err(|e| log::warn!("Failed to parse margin call: {e}"))
420 .ok()
421 }
422 BinanceWsEventType::AccountConfigUpdate => {
423 serde_json::from_value::<BinanceFuturesAccountConfigMsg>(json.clone())
424 .map(|msg| {
425 if let Some(ref lc) = msg.leverage_config {
426 log::debug!(
427 "Account config update: symbol={}, leverage={}",
428 lc.symbol,
429 lc.leverage
430 );
431 }
432 BinanceFuturesWsStreamsMessage::AccountConfigUpdate(msg)
433 })
434 .map_err(|e| log::warn!("Failed to parse account config update: {e}"))
435 .ok()
436 }
437 BinanceWsEventType::ListenKeyExpired => {
438 if let Ok(msg) =
439 serde_json::from_value::<BinanceFuturesListenKeyExpiredMsg>(json.clone())
440 {
441 log::warn!("Listen key expired at {}", msg.event_time);
442 }
443 Some(BinanceFuturesWsStreamsMessage::ListenKeyExpired)
444 }
445 BinanceWsEventType::Unknown => {
446 log::warn!("Unknown event type in message: {json}");
447 None
448 }
449 }
450 }
451}