nautilus_bitmex/websocket/
handler.rs1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use nautilus_network::{
24 RECONNECTED,
25 retry::{RetryManager, create_websocket_retry_manager},
26 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
27};
28use tokio_tungstenite::tungstenite::Message;
29
30use super::{
31 enums::{BitmexWsAuthAction, BitmexWsOperation},
32 error::BitmexWsError,
33 messages::{BitmexHttpRequest, BitmexWsFrame, BitmexWsMessage},
34};
35
/// Commands delivered to [`BitmexWsFeedHandler`] over its command channel and
/// processed inside `BitmexWsFeedHandler::next`.
#[derive(Debug)]
pub enum HandlerCommand {
    /// Installs the `WebSocketClient` the handler uses for all outbound traffic.
    SetClient(WebSocketClient),
    /// Takes the current client (if any) out of the handler and disconnects it.
    Disconnect,
    /// Sends `payload` as a text frame, using the handler's retry policy.
    Authenticate { payload: String },
    /// Sends each entry of `topics` verbatim as its own text frame, with retries.
    Subscribe { topics: Vec<String> },
    /// Sends each entry of `topics` verbatim as its own text frame, with retries.
    Unsubscribe { topics: Vec<String> },
}
50
/// Inner message handler for the BitMEX WebSocket feed.
///
/// It consumes [`HandlerCommand`]s and raw WebSocket messages, and turns the
/// latter into [`BitmexWsMessage`] values via `next`.
pub(super) struct BitmexWsFeedHandler {
    /// Stop flag; `next` returns `None` once it observes this as `true`.
    signal: Arc<AtomicBool>,
    /// Active client; `None` until `SetClient` arrives and again after `Disconnect`.
    inner: Option<WebSocketClient>,
    /// Incoming commands for the handler.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket messages to be parsed.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send` to forward parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<BitmexWsMessage>,
    /// Updated with the outcome of authentication responses.
    auth_tracker: AuthTracker,
    /// Updated when subscribe/unsubscribe acknowledgements arrive.
    subscriptions: SubscriptionState,
    /// Retry policy applied to outbound text sends.
    retry_manager: RetryManager<BitmexWsError>,
}
61
62impl BitmexWsFeedHandler {
63 pub(super) fn new(
65 signal: Arc<AtomicBool>,
66 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
67 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
68 out_tx: tokio::sync::mpsc::UnboundedSender<BitmexWsMessage>,
69 auth_tracker: AuthTracker,
70 subscriptions: SubscriptionState,
71 ) -> Self {
72 Self {
73 signal,
74 inner: None,
75 cmd_rx,
76 raw_rx,
77 out_tx,
78 auth_tracker,
79 subscriptions,
80 retry_manager: create_websocket_retry_manager(),
81 }
82 }
83
84 pub(super) fn is_stopped(&self) -> bool {
85 self.signal.load(Ordering::Relaxed)
86 }
87
88 pub(super) fn send(&self, msg: BitmexWsMessage) -> Result<(), ()> {
89 self.out_tx.send(msg).map_err(|_| ())
90 }
91
92 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
94 if let Some(client) = &self.inner {
95 self.retry_manager
96 .execute_with_retry(
97 "websocket_send",
98 || {
99 let payload = payload.clone();
100 async move {
101 client.send_text(payload, None).await.map_err(|e| {
102 BitmexWsError::ClientError(format!("Send failed: {e}"))
103 })
104 }
105 },
106 should_retry_bitmex_error,
107 create_bitmex_timeout_error,
108 )
109 .await
110 .map_err(|e| anyhow::anyhow!("{e}"))
111 } else {
112 Err(anyhow::anyhow!("No active WebSocket client"))
113 }
114 }
115
116 pub(super) async fn next(&mut self) -> Option<BitmexWsMessage> {
117 loop {
118 tokio::select! {
119 Some(cmd) = self.cmd_rx.recv() => {
120 match cmd {
121 HandlerCommand::SetClient(client) => {
122 log::debug!("WebSocketClient received by handler");
123 self.inner = Some(client);
124 }
125 HandlerCommand::Disconnect => {
126 log::debug!("Disconnect command received");
127
128 if let Some(client) = self.inner.take() {
129 client.disconnect().await;
130 }
131 }
132 HandlerCommand::Authenticate { payload } => {
133 log::debug!("Authenticate command received");
134
135 if let Err(e) = self.send_with_retry(payload).await {
136 log::error!("Failed to send authentication after retries: {e}");
137 }
138 }
139 HandlerCommand::Subscribe { topics } => {
140 for topic in topics {
141 log::debug!("Subscribing to topic: {topic}");
142 if let Err(e) = self.send_with_retry(topic.clone()).await {
143 log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
144 }
145 }
146 }
147 HandlerCommand::Unsubscribe { topics } => {
148 for topic in topics {
149 log::debug!("Unsubscribing from topic: {topic}");
150 if let Err(e) = self.send_with_retry(topic.clone()).await {
151 log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
152 }
153 }
154 }
155 }
156 }
157
158 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
159 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
160 log::debug!("Stop signal received during idle period");
161 return None;
162 }
163 }
164
165 msg = self.raw_rx.recv() => {
166 let msg = match msg {
167 Some(msg) => msg,
168 None => {
169 log::debug!("WebSocket stream closed");
170 return None;
171 }
172 };
173
174 if let Message::Ping(data) = &msg {
176 log::trace!("Received ping frame with {} bytes", data.len());
177
178 if let Some(client) = &self.inner
179 && let Err(e) = client.send_pong(data.to_vec()).await
180 {
181 log::warn!("Failed to send pong frame: {e}");
182 }
183 continue;
184 }
185
186 let event = match Self::parse_raw_message(msg) {
187 Some(event) => event,
188 None => continue,
189 };
190
191 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
192 log::debug!("Stop signal received");
193 return None;
194 }
195
196 match event {
197 BitmexWsFrame::Reconnected => {
198 return Some(BitmexWsMessage::Reconnected);
199 }
200 BitmexWsFrame::Subscription {
201 success,
202 subscribe,
203 request,
204 error,
205 } => {
206 if let Some(msg) = self.handle_subscription_message(
207 success,
208 subscribe.as_ref(),
209 request.as_ref(),
210 error.as_deref(),
211 ) {
212 return Some(msg);
213 }
214 }
215 BitmexWsFrame::Table(table_msg) => {
216 return Some(BitmexWsMessage::Table(table_msg));
217 }
218 BitmexWsFrame::Welcome { .. } | BitmexWsFrame::Error { .. } => {}
219 }
220 }
221
222 else => {
224 log::debug!("Handler shutting down: stream ended or command channel closed");
225 return None;
226 }
227 }
228 }
229 }
230
231 fn parse_raw_message(msg: Message) -> Option<BitmexWsFrame> {
232 match msg {
233 Message::Text(text) => {
234 if text == RECONNECTED {
235 log::info!("Received WebSocket reconnected signal");
236 return Some(BitmexWsFrame::Reconnected);
237 }
238
239 log::trace!("Raw websocket message: {text}");
240
241 if Self::is_heartbeat_message(&text) {
242 log::trace!("Ignoring heartbeat control message: {text}");
243 return None;
244 }
245
246 match serde_json::from_str(&text) {
247 Ok(msg) => match &msg {
248 BitmexWsFrame::Welcome {
249 version,
250 heartbeat_enabled,
251 limit,
252 ..
253 } => {
254 log::info!(
255 "Welcome to the BitMEX Realtime API: version={}, heartbeat={}, rate_limit={:?}",
256 version,
257 heartbeat_enabled,
258 limit.as_ref().and_then(|l| l.remaining),
259 );
260 }
261 BitmexWsFrame::Subscription { .. } => return Some(msg),
262 BitmexWsFrame::Error { status, error, .. } => {
263 log::error!(
264 "Received error from BitMEX: status={status}, error={error}",
265 );
266 }
267 _ => return Some(msg),
268 },
269 Err(e) => {
270 log::error!("Failed to parse WebSocket message: {e}: {text}");
271 }
272 }
273 }
274 Message::Binary(msg) => {
275 log::debug!("Raw binary: {msg:?}");
276 }
277 Message::Close(_) => {
278 log::debug!("Received close message, waiting for reconnection");
279 }
280 Message::Ping(data) => {
281 log::trace!("Ping frame with {} bytes (already handled)", data.len());
283 }
284 Message::Pong(data) => {
285 log::trace!("Received pong frame with {} bytes", data.len());
286 }
287 Message::Frame(frame) => {
288 log::debug!("Received raw frame: {frame:?}");
289 }
290 }
291
292 None
293 }
294
295 fn is_heartbeat_message(text: &str) -> bool {
296 let trimmed = text.trim();
297
298 if !trimmed.starts_with('{') || trimmed.len() > 64 {
299 return false;
300 }
301
302 trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
303 }
304
305 fn handle_subscription_ack(
306 &self,
307 success: bool,
308 request: Option<&BitmexHttpRequest>,
309 subscribe: Option<&String>,
310 error: Option<&str>,
311 ) {
312 let topics = Self::topics_from_request(request, subscribe);
313
314 if topics.is_empty() {
315 log::debug!("Subscription acknowledgement without topics");
316 return;
317 }
318
319 for topic in topics {
320 if success {
321 self.subscriptions.confirm_subscribe(topic);
322 log::debug!("Subscription confirmed: topic={topic}");
323 } else {
324 self.subscriptions.mark_failure(topic);
325 let reason = error.unwrap_or("Subscription rejected");
326 log::error!("Subscription failed: topic={topic}, error={reason}");
327 }
328 }
329 }
330
331 fn handle_unsubscribe_ack(
332 &self,
333 success: bool,
334 request: Option<&BitmexHttpRequest>,
335 subscribe: Option<&String>,
336 error: Option<&str>,
337 ) {
338 let topics = Self::topics_from_request(request, subscribe);
339
340 if topics.is_empty() {
341 log::debug!("Unsubscription acknowledgement without topics");
342 return;
343 }
344
345 for topic in topics {
346 if success {
347 log::debug!("Unsubscription confirmed: topic={topic}");
348 self.subscriptions.confirm_unsubscribe(topic);
349 } else {
350 let reason = error.unwrap_or("Unsubscription rejected");
351 log::error!(
352 "Unsubscription failed - restoring subscription: topic={topic}, error={reason}",
353 );
354 self.subscriptions.confirm_unsubscribe(topic); self.subscriptions.mark_subscribe(topic); self.subscriptions.confirm_subscribe(topic); }
359 }
360 }
361
362 fn topics_from_request<'a>(
363 request: Option<&'a BitmexHttpRequest>,
364 fallback: Option<&'a String>,
365 ) -> Vec<&'a str> {
366 if let Some(req) = request
367 && !req.args.is_empty()
368 {
369 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
370 }
371
372 fallback.into_iter().map(|topic| topic.as_str()).collect()
373 }
374
375 fn handle_subscription_message(
376 &self,
377 success: bool,
378 subscribe: Option<&String>,
379 request: Option<&BitmexHttpRequest>,
380 error: Option<&str>,
381 ) -> Option<BitmexWsMessage> {
382 if let Some(req) = request {
383 if req
384 .op
385 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
386 {
387 if success {
388 log::info!("WebSocket authenticated");
389 self.auth_tracker.succeed();
390 return Some(BitmexWsMessage::Authenticated);
391 } else {
392 let reason = error.unwrap_or("Authentication rejected").to_string();
393 log::error!("WebSocket authentication failed: {reason}");
394 self.auth_tracker.fail(reason);
395 }
396 return None;
397 }
398
399 if req
400 .op
401 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
402 {
403 self.handle_subscription_ack(success, request, subscribe, error);
404 return None;
405 }
406
407 if req
408 .op
409 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
410 {
411 self.handle_unsubscribe_ack(success, request, subscribe, error);
412 return None;
413 }
414 }
415
416 if subscribe.is_some() {
417 self.handle_subscription_ack(success, request, subscribe, error);
418 return None;
419 }
420
421 if let Some(error) = error {
422 log::warn!("Unhandled subscription control message: success={success}, error={error}");
423 }
424
425 None
426 }
427}
428
429pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
431 match error {
432 BitmexWsError::TungsteniteError(_) => true, BitmexWsError::ClientError(msg) => {
434 let msg_lower = msg.to_lowercase();
436 msg_lower.contains("timeout")
437 || msg_lower.contains("timed out")
438 || msg_lower.contains("connection")
439 || msg_lower.contains("network")
440 }
441 _ => false,
442 }
443}
444
/// Wraps `msg` in a [`BitmexWsError::ClientError`].
///
/// Passed to the retry manager in `send_with_retry` as its error constructor.
pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
    BitmexWsError::ClientError(msg)
}
449
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Compact ping/pong objects are heartbeats; other ops are not.
    #[rstest]
    fn test_is_heartbeat_message_detection() {
        assert!(BitmexWsFeedHandler::is_heartbeat_message(
            "{\"op\":\"ping\"}"
        ));
        assert!(BitmexWsFeedHandler::is_heartbeat_message(
            "{\"op\":\"pong\"}"
        ));
        assert!(!BitmexWsFeedHandler::is_heartbeat_message(
            "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
        ));
    }

    /// Leading and trailing whitespace must not defeat detection.
    #[rstest]
    fn test_is_heartbeat_message_ignores_surrounding_whitespace() {
        assert!(BitmexWsFeedHandler::is_heartbeat_message(
            "  {\"op\":\"ping\"}\n"
        ));
    }

    /// Text that is not a JSON object is never a heartbeat.
    #[rstest]
    fn test_is_heartbeat_message_rejects_non_object_text() {
        assert!(!BitmexWsFeedHandler::is_heartbeat_message("ping"));
        assert!(!BitmexWsFeedHandler::is_heartbeat_message("pong"));
        assert!(!BitmexWsFeedHandler::is_heartbeat_message(""));
    }

    /// Payloads longer than 64 bytes are rejected even if they contain the marker.
    #[rstest]
    fn test_is_heartbeat_message_rejects_oversized_payload() {
        let oversized = format!("{{\"op\":\"ping\",\"pad\":\"{}\"}}", "x".repeat(64));
        assert!(oversized.len() > 64);
        assert!(!BitmexWsFeedHandler::is_heartbeat_message(&oversized));
    }
}
468}