Skip to main content

nautilus_bitmex/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for BitMEX.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use nautilus_network::{
24    RECONNECTED,
25    retry::{RetryManager, create_websocket_retry_manager},
26    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
27};
28use tokio_tungstenite::tungstenite::Message;
29
30use super::{
31    enums::{BitmexWsAuthAction, BitmexWsOperation},
32    error::BitmexWsError,
33    messages::{BitmexHttpRequest, BitmexWsFrame, BitmexWsMessage},
34};
35
/// Commands sent from the outer client to the inner message handler.
///
/// The handler receives these over its command channel and applies each one inside
/// `BitmexWsFeedHandler::next`.
#[derive(Debug)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    ///
    /// Replaces whatever client the handler currently holds.
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler takes its client (if it has one) and awaits `disconnect()` on it.
    Disconnect,
    /// Send authentication payload to the WebSocket.
    ///
    /// `payload` is sent as-is as a text frame.
    Authenticate { payload: String },
    /// Subscribe to the given topics.
    ///
    /// Each entry is sent as-is as its own text frame.
    Subscribe { topics: Vec<String> },
    /// Unsubscribe from the given topics.
    ///
    /// Each entry is sent as-is as its own text frame.
    Unsubscribe { topics: Vec<String> },
}
50
/// Inner message handler that runs the BitMEX WebSocket receive loop.
///
/// It applies [`HandlerCommand`]s from the outer client, parses raw WebSocket messages,
/// and yields [`BitmexWsMessage`]s from `next`.
pub(super) struct BitmexWsFeedHandler {
    /// Shared stop flag; `next` returns `None` once it observes this set.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client, if one has been supplied via [`HandlerCommand::SetClient`].
    inner: Option<WebSocketClient>,
    /// Receives commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Receives raw WebSocket messages to parse.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Output channel used by `send` to forward parsed messages.
    out_tx: tokio::sync::mpsc::UnboundedSender<BitmexWsMessage>,
    /// Updated with the outcome of authentication responses.
    auth_tracker: AuthTracker,
    /// Updated when subscribe/unsubscribe acknowledgements arrive.
    subscriptions: SubscriptionState,
    /// Drives retries for outbound text frames in `send_with_retry`.
    retry_manager: RetryManager<BitmexWsError>,
}
61
62impl BitmexWsFeedHandler {
    /// Creates a new [`BitmexWsFeedHandler`] instance.
    ///
    /// The handler starts without a WebSocket client (`inner` is `None`); one is supplied
    /// later through [`HandlerCommand::SetClient`]. The retry manager is built with
    /// `create_websocket_retry_manager()`.
    pub(super) fn new(
        signal: Arc<AtomicBool>,
        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
        out_tx: tokio::sync::mpsc::UnboundedSender<BitmexWsMessage>,
        auth_tracker: AuthTracker,
        subscriptions: SubscriptionState,
    ) -> Self {
        Self {
            signal,
            inner: None,
            cmd_rx,
            raw_rx,
            out_tx,
            auth_tracker,
            subscriptions,
            retry_manager: create_websocket_retry_manager(),
        }
    }
83
    /// Returns `true` once the shared stop signal has been set.
    ///
    /// The flag is read with a relaxed atomic load.
    pub(super) fn is_stopped(&self) -> bool {
        self.signal.load(Ordering::Relaxed)
    }
87
    /// Forwards `msg` on the handler's output channel.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the channel send fails; the underlying send error (and the
    /// message it carries) is discarded.
    pub(super) fn send(&self, msg: BitmexWsMessage) -> Result<(), ()> {
        self.out_tx.send(msg).map_err(|_| ())
    }
91
92    /// Sends a WebSocket message with retry logic.
93    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
94        if let Some(client) = &self.inner {
95            self.retry_manager
96                .execute_with_retry(
97                    "websocket_send",
98                    || {
99                        let payload = payload.clone();
100                        async move {
101                            client.send_text(payload, None).await.map_err(|e| {
102                                BitmexWsError::ClientError(format!("Send failed: {e}"))
103                            })
104                        }
105                    },
106                    should_retry_bitmex_error,
107                    create_bitmex_timeout_error,
108                )
109                .await
110                .map_err(|e| anyhow::anyhow!("{e}"))
111        } else {
112            Err(anyhow::anyhow!("No active WebSocket client"))
113        }
114    }
115
116    pub(super) async fn next(&mut self) -> Option<BitmexWsMessage> {
117        loop {
118            tokio::select! {
119                Some(cmd) = self.cmd_rx.recv() => {
120                    match cmd {
121                        HandlerCommand::SetClient(client) => {
122                            log::debug!("WebSocketClient received by handler");
123                            self.inner = Some(client);
124                        }
125                        HandlerCommand::Disconnect => {
126                            log::debug!("Disconnect command received");
127
128                            if let Some(client) = self.inner.take() {
129                                client.disconnect().await;
130                            }
131                        }
132                        HandlerCommand::Authenticate { payload } => {
133                            log::debug!("Authenticate command received");
134
135                            if let Err(e) = self.send_with_retry(payload).await {
136                                log::error!("Failed to send authentication after retries: {e}");
137                            }
138                        }
139                        HandlerCommand::Subscribe { topics } => {
140                            for topic in topics {
141                                log::debug!("Subscribing to topic: {topic}");
142                                if let Err(e) = self.send_with_retry(topic.clone()).await {
143                                    log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
144                                }
145                            }
146                        }
147                        HandlerCommand::Unsubscribe { topics } => {
148                            for topic in topics {
149                                log::debug!("Unsubscribing from topic: {topic}");
150                                if let Err(e) = self.send_with_retry(topic.clone()).await {
151                                    log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
152                                }
153                            }
154                        }
155                    }
156                }
157
158                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
159                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
160                        log::debug!("Stop signal received during idle period");
161                        return None;
162                    }
163                }
164
165                msg = self.raw_rx.recv() => {
166                    let msg = match msg {
167                        Some(msg) => msg,
168                        None => {
169                            log::debug!("WebSocket stream closed");
170                            return None;
171                        }
172                    };
173
174                    // Handle ping frames directly for minimal latency
175                    if let Message::Ping(data) = &msg {
176                        log::trace!("Received ping frame with {} bytes", data.len());
177
178                        if let Some(client) = &self.inner
179                            && let Err(e) = client.send_pong(data.to_vec()).await
180                        {
181                            log::warn!("Failed to send pong frame: {e}");
182                        }
183                        continue;
184                    }
185
186                    let event = match Self::parse_raw_message(msg) {
187                        Some(event) => event,
188                        None => continue,
189                    };
190
191                    if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
192                        log::debug!("Stop signal received");
193                        return None;
194                    }
195
196                    match event {
197                        BitmexWsFrame::Reconnected => {
198                            return Some(BitmexWsMessage::Reconnected);
199                        }
200                        BitmexWsFrame::Subscription {
201                            success,
202                            subscribe,
203                            request,
204                            error,
205                        } => {
206                            if let Some(msg) = self.handle_subscription_message(
207                                success,
208                                subscribe.as_ref(),
209                                request.as_ref(),
210                                error.as_deref(),
211                            ) {
212                                return Some(msg);
213                            }
214                        }
215                        BitmexWsFrame::Table(table_msg) => {
216                            return Some(BitmexWsMessage::Table(table_msg));
217                        }
218                        BitmexWsFrame::Welcome { .. } | BitmexWsFrame::Error { .. } => {}
219                    }
220                }
221
222                // Handle shutdown - either channel closed or stream ended
223                else => {
224                    log::debug!("Handler shutting down: stream ended or command channel closed");
225                    return None;
226                }
227            }
228        }
229    }
230
231    fn parse_raw_message(msg: Message) -> Option<BitmexWsFrame> {
232        match msg {
233            Message::Text(text) => {
234                if text == RECONNECTED {
235                    log::info!("Received WebSocket reconnected signal");
236                    return Some(BitmexWsFrame::Reconnected);
237                }
238
239                log::trace!("Raw websocket message: {text}");
240
241                if Self::is_heartbeat_message(&text) {
242                    log::trace!("Ignoring heartbeat control message: {text}");
243                    return None;
244                }
245
246                match serde_json::from_str(&text) {
247                    Ok(msg) => match &msg {
248                        BitmexWsFrame::Welcome {
249                            version,
250                            heartbeat_enabled,
251                            limit,
252                            ..
253                        } => {
254                            log::info!(
255                                "Welcome to the BitMEX Realtime API: version={}, heartbeat={}, rate_limit={:?}",
256                                version,
257                                heartbeat_enabled,
258                                limit.as_ref().and_then(|l| l.remaining),
259                            );
260                        }
261                        BitmexWsFrame::Subscription { .. } => return Some(msg),
262                        BitmexWsFrame::Error { status, error, .. } => {
263                            log::error!(
264                                "Received error from BitMEX: status={status}, error={error}",
265                            );
266                        }
267                        _ => return Some(msg),
268                    },
269                    Err(e) => {
270                        log::error!("Failed to parse WebSocket message: {e}: {text}");
271                    }
272                }
273            }
274            Message::Binary(msg) => {
275                log::debug!("Raw binary: {msg:?}");
276            }
277            Message::Close(_) => {
278                log::debug!("Received close message, waiting for reconnection");
279            }
280            Message::Ping(data) => {
281                // Handled in select! loop before parse_raw_message
282                log::trace!("Ping frame with {} bytes (already handled)", data.len());
283            }
284            Message::Pong(data) => {
285                log::trace!("Received pong frame with {} bytes", data.len());
286            }
287            Message::Frame(frame) => {
288                log::debug!("Received raw frame: {frame:?}");
289            }
290        }
291
292        None
293    }
294
295    fn is_heartbeat_message(text: &str) -> bool {
296        let trimmed = text.trim();
297
298        if !trimmed.starts_with('{') || trimmed.len() > 64 {
299            return false;
300        }
301
302        trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
303    }
304
305    fn handle_subscription_ack(
306        &self,
307        success: bool,
308        request: Option<&BitmexHttpRequest>,
309        subscribe: Option<&String>,
310        error: Option<&str>,
311    ) {
312        let topics = Self::topics_from_request(request, subscribe);
313
314        if topics.is_empty() {
315            log::debug!("Subscription acknowledgement without topics");
316            return;
317        }
318
319        for topic in topics {
320            if success {
321                self.subscriptions.confirm_subscribe(topic);
322                log::debug!("Subscription confirmed: topic={topic}");
323            } else {
324                self.subscriptions.mark_failure(topic);
325                let reason = error.unwrap_or("Subscription rejected");
326                log::error!("Subscription failed: topic={topic}, error={reason}");
327            }
328        }
329    }
330
331    fn handle_unsubscribe_ack(
332        &self,
333        success: bool,
334        request: Option<&BitmexHttpRequest>,
335        subscribe: Option<&String>,
336        error: Option<&str>,
337    ) {
338        let topics = Self::topics_from_request(request, subscribe);
339
340        if topics.is_empty() {
341            log::debug!("Unsubscription acknowledgement without topics");
342            return;
343        }
344
345        for topic in topics {
346            if success {
347                log::debug!("Unsubscription confirmed: topic={topic}");
348                self.subscriptions.confirm_unsubscribe(topic);
349            } else {
350                let reason = error.unwrap_or("Unsubscription rejected");
351                log::error!(
352                    "Unsubscription failed - restoring subscription: topic={topic}, error={reason}",
353                );
354                // Venue rejected unsubscribe, so we're still subscribed. Restore state:
355                self.subscriptions.confirm_unsubscribe(topic); // Clear pending_unsubscribe
356                self.subscriptions.mark_subscribe(topic); // Mark as subscribing
357                self.subscriptions.confirm_subscribe(topic); // Confirm subscription
358            }
359        }
360    }
361
362    fn topics_from_request<'a>(
363        request: Option<&'a BitmexHttpRequest>,
364        fallback: Option<&'a String>,
365    ) -> Vec<&'a str> {
366        if let Some(req) = request
367            && !req.args.is_empty()
368        {
369            return req.args.iter().filter_map(|arg| arg.as_str()).collect();
370        }
371
372        fallback.into_iter().map(|topic| topic.as_str()).collect()
373    }
374
375    fn handle_subscription_message(
376        &self,
377        success: bool,
378        subscribe: Option<&String>,
379        request: Option<&BitmexHttpRequest>,
380        error: Option<&str>,
381    ) -> Option<BitmexWsMessage> {
382        if let Some(req) = request {
383            if req
384                .op
385                .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
386            {
387                if success {
388                    log::info!("WebSocket authenticated");
389                    self.auth_tracker.succeed();
390                    return Some(BitmexWsMessage::Authenticated);
391                } else {
392                    let reason = error.unwrap_or("Authentication rejected").to_string();
393                    log::error!("WebSocket authentication failed: {reason}");
394                    self.auth_tracker.fail(reason);
395                }
396                return None;
397            }
398
399            if req
400                .op
401                .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
402            {
403                self.handle_subscription_ack(success, request, subscribe, error);
404                return None;
405            }
406
407            if req
408                .op
409                .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
410            {
411                self.handle_unsubscribe_ack(success, request, subscribe, error);
412                return None;
413            }
414        }
415
416        if subscribe.is_some() {
417            self.handle_subscription_ack(success, request, subscribe, error);
418            return None;
419        }
420
421        if let Some(error) = error {
422            log::warn!("Unhandled subscription control message: success={success}, error={error}");
423        }
424
425        None
426    }
427}
428
429/// Returns `true` when a BitMEX error should be retried.
430pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
431    match error {
432        BitmexWsError::TungsteniteError(_) => true, // Network errors are retryable
433        BitmexWsError::ClientError(msg) => {
434            // Retry on timeout and connection errors (case-insensitive)
435            let msg_lower = msg.to_lowercase();
436            msg_lower.contains("timeout")
437                || msg_lower.contains("timed out")
438                || msg_lower.contains("connection")
439                || msg_lower.contains("network")
440        }
441        _ => false,
442    }
443}
444
/// Creates a timeout error for BitMEX retry logic.
///
/// Wraps `msg` in [`BitmexWsError::ClientError`]. `send_with_retry` passes this function
/// to the retry manager as its timeout-error factory.
pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
    BitmexWsError::ClientError(msg)
}
449
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Heartbeat detection over representative payloads, including the whitespace,
    /// prefix, and empty-input edge cases.
    #[rstest]
    #[case::ping("{\"op\":\"ping\"}", true)]
    #[case::pong("{\"op\":\"pong\"}", true)]
    #[case::surrounding_whitespace("  {\"op\":\"ping\"}\n", true)]
    #[case::subscribe_request("{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}", false)]
    #[case::bare_ping_text("ping", false)]
    #[case::empty("", false)]
    fn test_is_heartbeat_message_detection(#[case] text: &str, #[case] expected: bool) {
        assert_eq!(
            BitmexWsFeedHandler::is_heartbeat_message(text),
            expected,
            "text={text:?}"
        );
    }

    /// Payloads longer than 64 bytes are never treated as heartbeats, even when they
    /// contain the ping op; a payload of exactly 64 bytes still qualifies.
    #[rstest]
    fn test_is_heartbeat_message_length_limit() {
        let at_limit = format!("{{\"op\":\"ping\",\"p\":\"{}\"}}", "x".repeat(44));
        assert_eq!(at_limit.len(), 64);
        assert!(BitmexWsFeedHandler::is_heartbeat_message(&at_limit));

        let over_limit = format!("{{\"op\":\"ping\",\"p\":\"{}\"}}", "x".repeat(45));
        assert_eq!(over_limit.len(), 65);
        assert!(!BitmexWsFeedHandler::is_heartbeat_message(&over_limit));
    }
}