Skip to main content

nautilus_bybit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Bybit.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use nautilus_network::{
24    retry::{RetryManager, create_websocket_retry_manager},
25    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
26};
27use tokio_tungstenite::tungstenite::Message;
28
29use super::{
30    enums::BybitWsOperation,
31    error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
32    messages::{
33        BybitWebSocketError, BybitWsFrame, BybitWsMessage, BybitWsResponse, BybitWsSubscriptionMsg,
34    },
35    parse::parse_bybit_ws_frame,
36};
37
38/// Commands sent from the outer client to the inner message handler.
39#[derive(Debug)]
40pub enum HandlerCommand {
41    SetClient(WebSocketClient),
42    Disconnect,
43    Authenticate { payload: String },
44    Subscribe { topics: Vec<String> },
45    Unsubscribe { topics: Vec<String> },
46    SendText { payload: String },
47}
48
49pub(super) struct BybitWsFeedHandler {
50    signal: Arc<AtomicBool>,
51    inner: Option<WebSocketClient>,
52    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
53    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
54    auth_tracker: AuthTracker,
55    subscriptions: SubscriptionState,
56    retry_manager: RetryManager<BybitWsError>,
57}
58
59impl BybitWsFeedHandler {
60    /// Creates a new [`BybitWsFeedHandler`] instance.
61    pub(super) fn new(
62        signal: Arc<AtomicBool>,
63        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
64        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
65        auth_tracker: AuthTracker,
66        subscriptions: SubscriptionState,
67    ) -> Self {
68        Self {
69            signal,
70            inner: None,
71            cmd_rx,
72            raw_rx,
73            auth_tracker,
74            subscriptions,
75            retry_manager: create_websocket_retry_manager(),
76        }
77    }
78
79    pub(super) fn is_stopped(&self) -> bool {
80        self.signal.load(Ordering::Relaxed)
81    }
82
83    /// Sends a WebSocket message with retry logic.
84    async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
85        if let Some(client) = &self.inner {
86            self.retry_manager
87                .execute_with_retry(
88                    "websocket_send",
89                    || {
90                        let payload = payload.clone();
91                        async move {
92                            client
93                                .send_text(payload, None)
94                                .await
95                                .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
96                        }
97                    },
98                    should_retry_bybit_error,
99                    create_bybit_timeout_error,
100                )
101                .await
102        } else {
103            Err(BybitWsError::ClientError(
104                "No active WebSocket client".to_string(),
105            ))
106        }
107    }
108
109    pub(super) async fn next(&mut self) -> Option<BybitWsMessage> {
110        loop {
111            tokio::select! {
112                Some(cmd) = self.cmd_rx.recv() => {
113                    match cmd {
114                        HandlerCommand::SetClient(client) => {
115                            log::debug!("WebSocketClient received by handler");
116                            self.inner = Some(client);
117                        }
118                        HandlerCommand::Disconnect => {
119                            log::debug!("Disconnect command received");
120
121                            if let Some(client) = self.inner.take() {
122                                client.disconnect().await;
123                            }
124                        }
125                        HandlerCommand::Authenticate { payload } => {
126                            log::debug!("Authenticate command received");
127
128                            if let Err(e) = self.send_with_retry(payload).await {
129                                log::error!("Failed to send authentication after retries: {e}");
130                            }
131                        }
132                        HandlerCommand::Subscribe { topics } => {
133                            for topic in topics {
134                                log::debug!("Subscribing to topic: topic={topic}");
135                                if let Err(e) = self.send_with_retry(topic.clone()).await {
136                                    log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
137                                }
138                            }
139                        }
140                        HandlerCommand::Unsubscribe { topics } => {
141                            for topic in topics {
142                                log::debug!("Unsubscribing from topic: topic={topic}");
143                                if let Err(e) = self.send_with_retry(topic.clone()).await {
144                                    log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
145                                }
146                            }
147                        }
148                        HandlerCommand::SendText { payload } => {
149                            if let Err(e) = self.send_with_retry(payload).await {
150                                log::error!("Error sending text with retry: {e}");
151                            }
152                        }
153                    }
154                }
155
156                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
157                    if self.signal.load(Ordering::Relaxed) {
158                        log::debug!("Stop signal received during idle period");
159                        return None;
160                    }
161                }
162
163                msg = self.raw_rx.recv() => {
164                    let msg = match msg {
165                        Some(msg) => msg,
166                        None => {
167                            log::debug!("WebSocket stream closed");
168                            return None;
169                        }
170                    };
171
172                    if let Message::Ping(data) = &msg {
173                        log::trace!("Received ping frame with {} bytes", data.len());
174
175                        if let Some(client) = &self.inner
176                            && let Err(e) = client.send_pong(data.to_vec()).await
177                        {
178                            log::warn!("Failed to send pong frame: error={e}");
179                        }
180                        continue;
181                    }
182
183                    let frame = match Self::parse_raw_frame(msg) {
184                        Some(frame) => frame,
185                        None => continue,
186                    };
187
188                    if self.signal.load(Ordering::Relaxed) {
189                        log::debug!("Stop signal received");
190                        return None;
191                    }
192
193                    match frame {
194                        BybitWsFrame::Subscription(ref sub_msg) => {
195                            self.handle_subscription_ack(sub_msg);
196                        }
197                        BybitWsFrame::Auth(auth_response) => {
198                            let is_success = auth_response.success.unwrap_or(false)
199                                || (auth_response.ret_code == Some(0));
200
201                            if is_success {
202                                self.auth_tracker.succeed();
203                                log::info!("WebSocket authenticated");
204                            } else {
205                                let error_msg = auth_response
206                                    .ret_msg
207                                    .as_deref()
208                                    .unwrap_or("Authentication rejected");
209                                self.auth_tracker.fail(error_msg);
210                                log::error!("WebSocket authentication failed: error={error_msg}");
211                            }
212                            return Some(BybitWsMessage::Auth(auth_response));
213                        }
214                        BybitWsFrame::ErrorResponse(ref resp) => {
215                            // Failed subscription/unsubscription ACKs arrive as
216                            // ErrorResponse when success=false. Route them through
217                            // the subscription state machine when the op field is
218                            // present, otherwise forward as a generic error.
219                            if let Some(op) = &resp.op {
220                                if *op == BybitWsOperation::Subscribe
221                                    || *op == BybitWsOperation::Unsubscribe
222                                {
223                                    self.handle_subscription_error(resp);
224                                } else {
225                                    let error = BybitWebSocketError::from_response(resp);
226                                    return Some(BybitWsMessage::Error(error));
227                                }
228                            } else {
229                                let error = BybitWebSocketError::from_response(resp);
230                                return Some(BybitWsMessage::Error(error));
231                            }
232                        }
233                        BybitWsFrame::OrderResponse(resp) => {
234                            return Some(BybitWsMessage::OrderResponse(resp));
235                        }
236                        BybitWsFrame::Orderbook(msg) => {
237                            return Some(BybitWsMessage::Orderbook(msg));
238                        }
239                        BybitWsFrame::Trade(msg) => {
240                            return Some(BybitWsMessage::Trade(msg));
241                        }
242                        BybitWsFrame::Kline(msg) => {
243                            return Some(BybitWsMessage::Kline(msg));
244                        }
245                        BybitWsFrame::TickerLinear(msg) => {
246                            return Some(BybitWsMessage::TickerLinear(msg));
247                        }
248                        BybitWsFrame::TickerOption(msg) => {
249                            return Some(BybitWsMessage::TickerOption(msg));
250                        }
251                        BybitWsFrame::AccountOrder(msg) => {
252                            return Some(BybitWsMessage::AccountOrder(msg));
253                        }
254                        BybitWsFrame::AccountExecution(msg) => {
255                            return Some(BybitWsMessage::AccountExecution(msg));
256                        }
257                        BybitWsFrame::AccountWallet(msg) => {
258                            return Some(BybitWsMessage::AccountWallet(msg));
259                        }
260                        BybitWsFrame::AccountPosition(msg) => {
261                            return Some(BybitWsMessage::AccountPosition(msg));
262                        }
263                        BybitWsFrame::Reconnected => {
264                            self.auth_tracker.invalidate();
265                            return Some(BybitWsMessage::Reconnected);
266                        }
267                        BybitWsFrame::Unknown(value) => {
268                            log::debug!("Unknown WebSocket frame: {value}");
269                        }
270                    }
271                }
272            }
273        }
274    }
275
276    fn handle_subscription_ack(&self, sub_msg: &BybitWsSubscriptionMsg) {
277        match sub_msg.op {
278            BybitWsOperation::Subscribe => {
279                if sub_msg.success {
280                    if let Some(topic) = &sub_msg.req_id {
281                        self.subscriptions.confirm_subscribe(topic);
282                        log::debug!("Subscription confirmed: topic={topic}");
283                    } else {
284                        // No req_id, fall back to confirming all pending
285                        for topic in self.subscriptions.pending_subscribe_topics() {
286                            self.subscriptions.confirm_subscribe(&topic);
287                            log::debug!("Subscription confirmed (bulk): topic={topic}");
288                        }
289                    }
290                } else if let Some(topic) = &sub_msg.req_id {
291                    self.subscriptions.mark_failure(topic);
292                    log::warn!(
293                        "Subscription failed: topic={topic}, error={:?}",
294                        sub_msg.ret_msg
295                    );
296                } else {
297                    for topic in self.subscriptions.pending_subscribe_topics() {
298                        self.subscriptions.mark_failure(&topic);
299                        log::warn!(
300                            "Subscription failed (bulk): topic={topic}, error={:?}",
301                            sub_msg.ret_msg
302                        );
303                    }
304                }
305            }
306            BybitWsOperation::Unsubscribe => {
307                if sub_msg.success {
308                    if let Some(topic) = &sub_msg.req_id {
309                        self.subscriptions.confirm_unsubscribe(topic);
310                        log::debug!("Unsubscription confirmed: topic={topic}");
311                    } else {
312                        for topic in self.subscriptions.pending_unsubscribe_topics() {
313                            self.subscriptions.confirm_unsubscribe(&topic);
314                            log::debug!("Unsubscription confirmed (bulk): topic={topic}");
315                        }
316                    }
317                } else {
318                    let topic_desc = sub_msg.req_id.as_deref().unwrap_or("unknown");
319                    log::warn!(
320                        "Unsubscription failed: topic={topic_desc}, error={:?}",
321                        sub_msg.ret_msg
322                    );
323                }
324            }
325            _ => {}
326        }
327    }
328
329    fn handle_subscription_error(&self, resp: &BybitWsResponse) {
330        let topic = resp.req_id.as_deref().unwrap_or("unknown");
331        let error_msg = resp.ret_msg.as_deref().unwrap_or("unknown error");
332
333        match resp.op {
334            Some(BybitWsOperation::Subscribe) => {
335                // Duplicate subscribe is harmless: the topic is active on the
336                // venue, so confirm it instead of looping retries every reconnect.
337                if is_already_subscribed_error(error_msg)
338                    && let Some(ref req_id) = resp.req_id
339                {
340                    self.subscriptions.confirm_subscribe(req_id);
341                    log::debug!("Subscription duplicate ignored: topic={topic}, error={error_msg}");
342                    return;
343                }
344
345                if let Some(ref req_id) = resp.req_id {
346                    self.subscriptions.mark_failure(req_id);
347                } else {
348                    for t in self.subscriptions.pending_subscribe_topics() {
349                        self.subscriptions.mark_failure(&t);
350                    }
351                }
352                log::warn!("Subscription error: topic={topic}, error={error_msg}");
353            }
354            Some(BybitWsOperation::Unsubscribe) => {
355                log::warn!("Unsubscription error: topic={topic}, error={error_msg}");
356            }
357            _ => {}
358        }
359    }
360
361    fn parse_raw_frame(msg: Message) -> Option<BybitWsFrame> {
362        match msg {
363            Message::Text(text) => {
364                if text == nautilus_network::RECONNECTED {
365                    log::info!("Received WebSocket reconnected signal");
366                    return Some(BybitWsFrame::Reconnected);
367                }
368
369                if text.trim().eq_ignore_ascii_case("pong") {
370                    return None;
371                }
372
373                log::trace!("Raw websocket message: {text}");
374
375                let value: serde_json::Value = match serde_json::from_str(&text) {
376                    Ok(v) => v,
377                    Err(e) => {
378                        log::error!("Failed to parse WebSocket message: {e}: {text}");
379                        return None;
380                    }
381                };
382
383                if value
384                    .get("op")
385                    .and_then(serde_json::Value::as_str)
386                    .is_some_and(|op| op == BybitWsOperation::Pong.as_ref())
387                {
388                    return None;
389                }
390
391                Some(parse_bybit_ws_frame(value))
392            }
393            Message::Binary(msg) => {
394                log::debug!("Raw binary: {msg:?}");
395                None
396            }
397            Message::Close(_) => {
398                log::debug!("Received close message, waiting for reconnection");
399                None
400            }
401            _ => None,
402        }
403    }
404}
405
/// Returns `true` when `error_msg` contains the phrase "already subscribed",
/// compared ASCII case-insensitively.
fn is_already_subscribed_error(error_msg: &str) -> bool {
    const NEEDLE: &[u8] = b"already subscribed";

    // Byte-wise scan: the needle is pure ASCII, so it can only ever match
    // ASCII bytes of the message, exactly like a lowercase-then-contains check.
    error_msg
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
411
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::common::{consts::BYBIT_WS_TOPIC_DELIMITER, testing::load_test_json};

    /// Builds a handler with fresh channels, a cleared stop signal and empty
    /// auth/subscription state.
    fn create_test_handler() -> BybitWsFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();

        BybitWsFeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            cmd_rx,
            raw_rx,
            AuthTracker::new(),
            SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
        )
    }

    /// Loads a JSON fixture and parses it into a generic JSON value.
    fn load_value(fixture: &str) -> serde_json::Value {
        serde_json::from_str(&load_test_json(fixture)).unwrap()
    }

    /// Loads a JSON fixture and runs it through the frame parser.
    fn parse_fixture(fixture: &str) -> BybitWsFrame {
        parse_bybit_ws_frame(load_value(fixture))
    }

    /// Builds a subscribe acknowledgement with the given outcome.
    fn subscribe_ack(
        success: bool,
        req_id: Option<&str>,
        ret_msg: Option<&str>,
    ) -> BybitWsSubscriptionMsg {
        BybitWsSubscriptionMsg {
            success,
            op: BybitWsOperation::Subscribe,
            conn_id: None,
            req_id: req_id.map(String::from),
            ret_msg: ret_msg.map(String::from),
        }
    }

    /// Builds a failed subscribe response (ret_code 10001) for `req_id`.
    fn subscribe_error(req_id: &str, ret_msg: &str) -> BybitWsResponse {
        BybitWsResponse {
            op: Some(BybitWsOperation::Subscribe),
            topic: None,
            success: Some(false),
            conn_id: None,
            req_id: Some(req_id.to_string()),
            ret_code: Some(10001),
            ret_msg: Some(ret_msg.to_string()),
        }
    }

    /// Whether `topic` is currently listed as a pending subscribe.
    fn is_pending(handler: &BybitWsFeedHandler, topic: &str) -> bool {
        handler
            .subscriptions
            .pending_subscribe_topics()
            .contains(&topic.to_string())
    }

    #[rstest]
    fn test_handler_initializes() {
        let _handler = create_test_handler();
    }

    #[rstest]
    fn test_parse_frame_auth_success() {
        match parse_fixture("ws_auth_success.json") {
            BybitWsFrame::Auth(auth) => {
                assert_eq!(auth.conn_id.as_deref(), Some("cejreaspqfm9se7usbrg-2xh"));
                assert_eq!(auth.ret_code, Some(0));
                assert_eq!(auth.success, Some(true));
            }
            other => panic!("Expected Auth, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_auth_failure() {
        match parse_fixture("ws_auth_failure.json") {
            BybitWsFrame::ErrorResponse(resp) => {
                assert_eq!(resp.ret_code, Some(10003));
                assert_eq!(resp.ret_msg.as_deref(), Some("Invalid apikey"));
            }
            other => panic!("Expected ErrorResponse, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_subscription_ack() {
        match parse_fixture("ws_subscription_ack.json") {
            BybitWsFrame::Subscription(sub) => {
                assert!(sub.success);
                assert_eq!(sub.op, BybitWsOperation::Subscribe);
                assert_eq!(sub.req_id.as_deref(), Some("sub-orderbook-1"));
            }
            other => panic!("Expected Subscription, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_subscription_failure() {
        match parse_fixture("ws_subscription_failure.json") {
            BybitWsFrame::ErrorResponse(resp) => {
                assert_eq!(
                    resp.ret_msg.as_deref(),
                    Some("Invalid topic: invalid.topic.BTCUSDT")
                );
            }
            other => panic!("Expected ErrorResponse, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_order_response() {
        match parse_fixture("ws_order_response.json") {
            BybitWsFrame::OrderResponse(resp) => {
                assert_eq!(resp.op.as_str(), "order.create");
                assert_eq!(resp.ret_code, 0);
                assert_eq!(resp.ret_msg, "OK");
            }
            other => panic!("Expected OrderResponse, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_orderbook() {
        let frame = parse_fixture("ws_orderbook_snapshot.json");
        assert!(
            matches!(frame, BybitWsFrame::Orderbook(_)),
            "Expected Orderbook, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_trade() {
        let frame = parse_fixture("ws_public_trade.json");
        assert!(
            matches!(frame, BybitWsFrame::Trade(_)),
            "Expected Trade, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_kline() {
        let frame = parse_fixture("ws_kline.json");
        assert!(
            matches!(frame, BybitWsFrame::Kline(_)),
            "Expected Kline, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_ticker_linear() {
        let frame = parse_fixture("ws_ticker_linear.json");
        assert!(
            matches!(frame, BybitWsFrame::TickerLinear(_)),
            "Expected TickerLinear, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_ticker_option() {
        let frame = parse_fixture("ws_ticker_option.json");
        assert!(
            matches!(frame, BybitWsFrame::TickerOption(_)),
            "Expected TickerOption, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_account_order() {
        let frame = parse_fixture("ws_account_order.json");
        assert!(
            matches!(frame, BybitWsFrame::AccountOrder(_)),
            "Expected AccountOrder, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_account_execution() {
        let frame = parse_fixture("ws_account_execution.json");
        assert!(
            matches!(frame, BybitWsFrame::AccountExecution(_)),
            "Expected AccountExecution, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_account_wallet() {
        let frame = parse_fixture("ws_account_wallet.json");
        assert!(
            matches!(frame, BybitWsFrame::AccountWallet(_)),
            "Expected AccountWallet, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_account_position() {
        let frame = parse_fixture("ws_account_position.json");
        assert!(
            matches!(frame, BybitWsFrame::AccountPosition(_)),
            "Expected AccountPosition, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_frame_unknown_message() {
        let value: serde_json::Value = serde_json::json!({"foo": "bar"});
        let frame = parse_bybit_ws_frame(value);
        assert!(
            matches!(frame, BybitWsFrame::Unknown(_)),
            "Expected Unknown, was {frame:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_reconnected_signal() {
        let msg = Message::Text(nautilus_network::RECONNECTED.to_string().into());
        let result = BybitWsFeedHandler::parse_raw_frame(msg);
        assert!(
            matches!(result, Some(BybitWsFrame::Reconnected)),
            "Expected Some(Reconnected), was {result:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_pong_text() {
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Text("pong".into()));
        assert!(result.is_none(), "Expected None for pong, was {result:?}");
    }

    #[rstest]
    fn test_parse_raw_json_pong_message() {
        let msg = Message::Text(
            r#"{"args":["1777226678908"],"conn_id":"yzr7jz02gws1vh60mk5m-hxqdp","op":"pong"}"#
                .into(),
        );
        let result = BybitWsFeedHandler::parse_raw_frame(msg);
        assert!(
            result.is_none(),
            "Expected None for JSON pong, was {result:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_valid_json() {
        let msg = Message::Text(load_test_json("ws_public_trade.json").into());
        let result = BybitWsFeedHandler::parse_raw_frame(msg);
        assert!(
            matches!(result, Some(BybitWsFrame::Trade(_))),
            "Expected Some(Trade), was {result:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_invalid_json() {
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Text("not valid json".into()));
        assert!(
            result.is_none(),
            "Expected None for invalid JSON, was {result:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_binary_message() {
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Binary(vec![0x01, 0x02].into()));
        assert!(result.is_none(), "Expected None for binary, was {result:?}");
    }

    #[rstest]
    fn test_subscription_ack_with_req_id_confirms_only_that_topic() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("orderbook.50.BTCUSDT");
        handler.subscriptions.mark_subscribe("publicTrade.BTCUSDT");

        let ack = subscribe_ack(true, Some("orderbook.50.BTCUSDT"), None);
        handler.handle_subscription_ack(&ack);

        // Only orderbook should be confirmed, trade stays pending
        assert!(is_pending(&handler, "publicTrade.BTCUSDT"));
        assert!(!is_pending(&handler, "orderbook.50.BTCUSDT"));
    }

    #[rstest]
    fn test_subscription_failure_with_req_id_marks_only_that_topic() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("orderbook.50.BTCUSDT");
        handler.subscriptions.mark_subscribe("publicTrade.BTCUSDT");

        let ack = subscribe_ack(false, Some("orderbook.50.BTCUSDT"), Some("Invalid topic"));
        handler.handle_subscription_ack(&ack);

        // Orderbook should be marked as failed (back to pending for retry)
        // Trade should remain pending (unaffected)
        assert!(is_pending(&handler, "orderbook.50.BTCUSDT"));
        assert!(is_pending(&handler, "publicTrade.BTCUSDT"));
    }

    #[rstest]
    fn test_error_response_with_subscribe_op_triggers_mark_failure() {
        let handler = create_test_handler();
        handler
            .subscriptions
            .mark_subscribe("invalid.topic.BTCUSDT");

        let resp = subscribe_error("invalid.topic.BTCUSDT", "Invalid topic");
        handler.handle_subscription_error(&resp);

        // Topic should still be in pending (mark_failure moves confirmed -> pending)
        assert!(is_pending(&handler, "invalid.topic.BTCUSDT"));
    }

    #[rstest]
    fn test_already_subscribed_error_confirms_topic() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("tickers.ETHUSDT");

        let resp = subscribe_error(
            "tickers.ETHUSDT",
            "error:already subscribed,topic:tickers.ETHUSDT",
        );
        handler.handle_subscription_error(&resp);

        assert!(!is_pending(&handler, "tickers.ETHUSDT"));
        let symbols = handler.subscriptions.confirmed();
        let entry = symbols
            .get(&Ustr::from("tickers"))
            .expect("channel present");
        assert!(entry.contains(&Ustr::from("ETHUSDT")));
    }

    #[rstest]
    fn test_subscription_ack_without_req_id_confirms_all_pending() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("orderbook.50.BTCUSDT");
        handler.subscriptions.mark_subscribe("publicTrade.BTCUSDT");

        let ack = subscribe_ack(true, None, None);
        handler.handle_subscription_ack(&ack);

        // Both should be confirmed when no req_id
        assert!(handler.subscriptions.pending_subscribe_topics().is_empty());
    }
}