1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use nautilus_network::{
24 retry::{RetryManager, create_websocket_retry_manager},
25 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
26};
27use tokio_tungstenite::tungstenite::Message;
28
29use super::{
30 enums::BybitWsOperation,
31 error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
32 messages::{
33 BybitWebSocketError, BybitWsFrame, BybitWsMessage, BybitWsResponse, BybitWsSubscriptionMsg,
34 },
35 parse::parse_bybit_ws_frame,
36};
37
38#[derive(Debug)]
40pub enum HandlerCommand {
41 SetClient(WebSocketClient),
42 Disconnect,
43 Authenticate { payload: String },
44 Subscribe { topics: Vec<String> },
45 Unsubscribe { topics: Vec<String> },
46 SendText { payload: String },
47}
48
49pub(super) struct BybitWsFeedHandler {
50 signal: Arc<AtomicBool>,
51 inner: Option<WebSocketClient>,
52 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
53 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
54 auth_tracker: AuthTracker,
55 subscriptions: SubscriptionState,
56 retry_manager: RetryManager<BybitWsError>,
57}
58
59impl BybitWsFeedHandler {
60 pub(super) fn new(
62 signal: Arc<AtomicBool>,
63 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
64 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
65 auth_tracker: AuthTracker,
66 subscriptions: SubscriptionState,
67 ) -> Self {
68 Self {
69 signal,
70 inner: None,
71 cmd_rx,
72 raw_rx,
73 auth_tracker,
74 subscriptions,
75 retry_manager: create_websocket_retry_manager(),
76 }
77 }
78
79 pub(super) fn is_stopped(&self) -> bool {
80 self.signal.load(Ordering::Relaxed)
81 }
82
83 async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
85 if let Some(client) = &self.inner {
86 self.retry_manager
87 .execute_with_retry(
88 "websocket_send",
89 || {
90 let payload = payload.clone();
91 async move {
92 client
93 .send_text(payload, None)
94 .await
95 .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
96 }
97 },
98 should_retry_bybit_error,
99 create_bybit_timeout_error,
100 )
101 .await
102 } else {
103 Err(BybitWsError::ClientError(
104 "No active WebSocket client".to_string(),
105 ))
106 }
107 }
108
109 pub(super) async fn next(&mut self) -> Option<BybitWsMessage> {
110 loop {
111 tokio::select! {
112 Some(cmd) = self.cmd_rx.recv() => {
113 match cmd {
114 HandlerCommand::SetClient(client) => {
115 log::debug!("WebSocketClient received by handler");
116 self.inner = Some(client);
117 }
118 HandlerCommand::Disconnect => {
119 log::debug!("Disconnect command received");
120
121 if let Some(client) = self.inner.take() {
122 client.disconnect().await;
123 }
124 }
125 HandlerCommand::Authenticate { payload } => {
126 log::debug!("Authenticate command received");
127
128 if let Err(e) = self.send_with_retry(payload).await {
129 log::error!("Failed to send authentication after retries: {e}");
130 }
131 }
132 HandlerCommand::Subscribe { topics } => {
133 for topic in topics {
134 log::debug!("Subscribing to topic: topic={topic}");
135 if let Err(e) = self.send_with_retry(topic.clone()).await {
136 log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
137 }
138 }
139 }
140 HandlerCommand::Unsubscribe { topics } => {
141 for topic in topics {
142 log::debug!("Unsubscribing from topic: topic={topic}");
143 if let Err(e) = self.send_with_retry(topic.clone()).await {
144 log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
145 }
146 }
147 }
148 HandlerCommand::SendText { payload } => {
149 if let Err(e) = self.send_with_retry(payload).await {
150 log::error!("Error sending text with retry: {e}");
151 }
152 }
153 }
154 }
155
156 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
157 if self.signal.load(Ordering::Relaxed) {
158 log::debug!("Stop signal received during idle period");
159 return None;
160 }
161 }
162
163 msg = self.raw_rx.recv() => {
164 let msg = match msg {
165 Some(msg) => msg,
166 None => {
167 log::debug!("WebSocket stream closed");
168 return None;
169 }
170 };
171
172 if let Message::Ping(data) = &msg {
173 log::trace!("Received ping frame with {} bytes", data.len());
174
175 if let Some(client) = &self.inner
176 && let Err(e) = client.send_pong(data.to_vec()).await
177 {
178 log::warn!("Failed to send pong frame: error={e}");
179 }
180 continue;
181 }
182
183 let frame = match Self::parse_raw_frame(msg) {
184 Some(frame) => frame,
185 None => continue,
186 };
187
188 if self.signal.load(Ordering::Relaxed) {
189 log::debug!("Stop signal received");
190 return None;
191 }
192
193 match frame {
194 BybitWsFrame::Subscription(ref sub_msg) => {
195 self.handle_subscription_ack(sub_msg);
196 }
197 BybitWsFrame::Auth(auth_response) => {
198 let is_success = auth_response.success.unwrap_or(false)
199 || (auth_response.ret_code == Some(0));
200
201 if is_success {
202 self.auth_tracker.succeed();
203 log::info!("WebSocket authenticated");
204 } else {
205 let error_msg = auth_response
206 .ret_msg
207 .as_deref()
208 .unwrap_or("Authentication rejected");
209 self.auth_tracker.fail(error_msg);
210 log::error!("WebSocket authentication failed: error={error_msg}");
211 }
212 return Some(BybitWsMessage::Auth(auth_response));
213 }
214 BybitWsFrame::ErrorResponse(ref resp) => {
215 if let Some(op) = &resp.op {
220 if *op == BybitWsOperation::Subscribe
221 || *op == BybitWsOperation::Unsubscribe
222 {
223 self.handle_subscription_error(resp);
224 } else {
225 let error = BybitWebSocketError::from_response(resp);
226 return Some(BybitWsMessage::Error(error));
227 }
228 } else {
229 let error = BybitWebSocketError::from_response(resp);
230 return Some(BybitWsMessage::Error(error));
231 }
232 }
233 BybitWsFrame::OrderResponse(resp) => {
234 return Some(BybitWsMessage::OrderResponse(resp));
235 }
236 BybitWsFrame::Orderbook(msg) => {
237 return Some(BybitWsMessage::Orderbook(msg));
238 }
239 BybitWsFrame::Trade(msg) => {
240 return Some(BybitWsMessage::Trade(msg));
241 }
242 BybitWsFrame::Kline(msg) => {
243 return Some(BybitWsMessage::Kline(msg));
244 }
245 BybitWsFrame::TickerLinear(msg) => {
246 return Some(BybitWsMessage::TickerLinear(msg));
247 }
248 BybitWsFrame::TickerOption(msg) => {
249 return Some(BybitWsMessage::TickerOption(msg));
250 }
251 BybitWsFrame::AccountOrder(msg) => {
252 return Some(BybitWsMessage::AccountOrder(msg));
253 }
254 BybitWsFrame::AccountExecution(msg) => {
255 return Some(BybitWsMessage::AccountExecution(msg));
256 }
257 BybitWsFrame::AccountWallet(msg) => {
258 return Some(BybitWsMessage::AccountWallet(msg));
259 }
260 BybitWsFrame::AccountPosition(msg) => {
261 return Some(BybitWsMessage::AccountPosition(msg));
262 }
263 BybitWsFrame::Reconnected => {
264 self.auth_tracker.invalidate();
265 return Some(BybitWsMessage::Reconnected);
266 }
267 BybitWsFrame::Unknown(value) => {
268 log::debug!("Unknown WebSocket frame: {value}");
269 }
270 }
271 }
272 }
273 }
274 }
275
276 fn handle_subscription_ack(&self, sub_msg: &BybitWsSubscriptionMsg) {
277 match sub_msg.op {
278 BybitWsOperation::Subscribe => {
279 if sub_msg.success {
280 if let Some(topic) = &sub_msg.req_id {
281 self.subscriptions.confirm_subscribe(topic);
282 log::debug!("Subscription confirmed: topic={topic}");
283 } else {
284 for topic in self.subscriptions.pending_subscribe_topics() {
286 self.subscriptions.confirm_subscribe(&topic);
287 log::debug!("Subscription confirmed (bulk): topic={topic}");
288 }
289 }
290 } else if let Some(topic) = &sub_msg.req_id {
291 self.subscriptions.mark_failure(topic);
292 log::warn!(
293 "Subscription failed: topic={topic}, error={:?}",
294 sub_msg.ret_msg
295 );
296 } else {
297 for topic in self.subscriptions.pending_subscribe_topics() {
298 self.subscriptions.mark_failure(&topic);
299 log::warn!(
300 "Subscription failed (bulk): topic={topic}, error={:?}",
301 sub_msg.ret_msg
302 );
303 }
304 }
305 }
306 BybitWsOperation::Unsubscribe => {
307 if sub_msg.success {
308 if let Some(topic) = &sub_msg.req_id {
309 self.subscriptions.confirm_unsubscribe(topic);
310 log::debug!("Unsubscription confirmed: topic={topic}");
311 } else {
312 for topic in self.subscriptions.pending_unsubscribe_topics() {
313 self.subscriptions.confirm_unsubscribe(&topic);
314 log::debug!("Unsubscription confirmed (bulk): topic={topic}");
315 }
316 }
317 } else {
318 let topic_desc = sub_msg.req_id.as_deref().unwrap_or("unknown");
319 log::warn!(
320 "Unsubscription failed: topic={topic_desc}, error={:?}",
321 sub_msg.ret_msg
322 );
323 }
324 }
325 _ => {}
326 }
327 }
328
329 fn handle_subscription_error(&self, resp: &BybitWsResponse) {
330 let topic = resp.req_id.as_deref().unwrap_or("unknown");
331 let error_msg = resp.ret_msg.as_deref().unwrap_or("unknown error");
332
333 match resp.op {
334 Some(BybitWsOperation::Subscribe) => {
335 if is_already_subscribed_error(error_msg)
338 && let Some(ref req_id) = resp.req_id
339 {
340 self.subscriptions.confirm_subscribe(req_id);
341 log::debug!("Subscription duplicate ignored: topic={topic}, error={error_msg}");
342 return;
343 }
344
345 if let Some(ref req_id) = resp.req_id {
346 self.subscriptions.mark_failure(req_id);
347 } else {
348 for t in self.subscriptions.pending_subscribe_topics() {
349 self.subscriptions.mark_failure(&t);
350 }
351 }
352 log::warn!("Subscription error: topic={topic}, error={error_msg}");
353 }
354 Some(BybitWsOperation::Unsubscribe) => {
355 log::warn!("Unsubscription error: topic={topic}, error={error_msg}");
356 }
357 _ => {}
358 }
359 }
360
361 fn parse_raw_frame(msg: Message) -> Option<BybitWsFrame> {
362 match msg {
363 Message::Text(text) => {
364 if text == nautilus_network::RECONNECTED {
365 log::info!("Received WebSocket reconnected signal");
366 return Some(BybitWsFrame::Reconnected);
367 }
368
369 if text.trim().eq_ignore_ascii_case("pong") {
370 return None;
371 }
372
373 log::trace!("Raw websocket message: {text}");
374
375 let value: serde_json::Value = match serde_json::from_str(&text) {
376 Ok(v) => v,
377 Err(e) => {
378 log::error!("Failed to parse WebSocket message: {e}: {text}");
379 return None;
380 }
381 };
382
383 if value
384 .get("op")
385 .and_then(serde_json::Value::as_str)
386 .is_some_and(|op| op == BybitWsOperation::Pong.as_ref())
387 {
388 return None;
389 }
390
391 Some(parse_bybit_ws_frame(value))
392 }
393 Message::Binary(msg) => {
394 log::debug!("Raw binary: {msg:?}");
395 None
396 }
397 Message::Close(_) => {
398 log::debug!("Received close message, waiting for reconnection");
399 None
400 }
401 _ => None,
402 }
403 }
404}
405
/// Returns `true` when `error_msg` contains "already subscribed", compared
/// ASCII case-insensitively.
fn is_already_subscribed_error(error_msg: &str) -> bool {
    const NEEDLE: &[u8] = b"already subscribed";

    // The needle is pure ASCII, so a byte-window scan gives the same answer as
    // lowercasing the whole message and searching it.
    error_msg
        .as_bytes()
        .windows(NEEDLE.len())
        .any(|window| window.eq_ignore_ascii_case(NEEDLE))
}
411
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::common::{consts::BYBIT_WS_TOPIC_DELIMITER, testing::load_test_json};

    /// Builds a handler with fresh channels, auth tracker and subscription state.
    fn create_test_handler() -> BybitWsFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();

        BybitWsFeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            cmd_rx,
            raw_rx,
            AuthTracker::new(),
            SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
        )
    }

    /// Loads a JSON fixture and parses it into a `serde_json::Value`.
    fn load_value(fixture: &str) -> serde_json::Value {
        serde_json::from_str(&load_test_json(fixture)).unwrap()
    }

    /// Loads a JSON fixture and classifies it as a WebSocket frame.
    fn parse_fixture(fixture: &str) -> BybitWsFrame {
        parse_bybit_ws_frame(load_value(fixture))
    }

    /// Reports whether `topic` is currently in the pending-subscribe set.
    fn is_pending(handler: &BybitWsFeedHandler, topic: &str) -> bool {
        handler
            .subscriptions
            .pending_subscribe_topics()
            .iter()
            .any(|pending| pending.as_str() == topic)
    }

    #[rstest]
    fn test_handler_initializes() {
        let _handler = create_test_handler();
    }

    #[rstest]
    fn test_parse_frame_auth_success() {
        match parse_fixture("ws_auth_success.json") {
            BybitWsFrame::Auth(auth) => {
                assert_eq!(auth.conn_id.as_deref(), Some("cejreaspqfm9se7usbrg-2xh"));
                assert_eq!(auth.ret_code, Some(0));
                assert_eq!(auth.success, Some(true));
            }
            other => panic!("Expected Auth, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_auth_failure() {
        match parse_fixture("ws_auth_failure.json") {
            BybitWsFrame::ErrorResponse(resp) => {
                assert_eq!(resp.ret_code, Some(10003));
                assert_eq!(resp.ret_msg.as_deref(), Some("Invalid apikey"));
            }
            other => panic!("Expected ErrorResponse, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_subscription_ack() {
        match parse_fixture("ws_subscription_ack.json") {
            BybitWsFrame::Subscription(sub) => {
                assert!(sub.success);
                assert_eq!(sub.op, BybitWsOperation::Subscribe);
                assert_eq!(sub.req_id.as_deref(), Some("sub-orderbook-1"));
            }
            other => panic!("Expected Subscription, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_subscription_failure() {
        match parse_fixture("ws_subscription_failure.json") {
            BybitWsFrame::ErrorResponse(resp) => {
                assert_eq!(
                    resp.ret_msg.as_deref(),
                    Some("Invalid topic: invalid.topic.BTCUSDT")
                );
            }
            other => panic!("Expected ErrorResponse, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_order_response() {
        match parse_fixture("ws_order_response.json") {
            BybitWsFrame::OrderResponse(resp) => {
                assert_eq!(resp.op.as_str(), "order.create");
                assert_eq!(resp.ret_code, 0);
                assert_eq!(resp.ret_msg, "OK");
            }
            other => panic!("Expected OrderResponse, was {other:?}"),
        }
    }

    #[rstest]
    fn test_parse_frame_orderbook() {
        let frame = parse_fixture("ws_orderbook_snapshot.json");
        let is_expected = matches!(frame, BybitWsFrame::Orderbook(_));
        assert!(is_expected, "Expected Orderbook, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_trade() {
        let frame = parse_fixture("ws_public_trade.json");
        let is_expected = matches!(frame, BybitWsFrame::Trade(_));
        assert!(is_expected, "Expected Trade, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_kline() {
        let frame = parse_fixture("ws_kline.json");
        let is_expected = matches!(frame, BybitWsFrame::Kline(_));
        assert!(is_expected, "Expected Kline, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_ticker_linear() {
        let frame = parse_fixture("ws_ticker_linear.json");
        let is_expected = matches!(frame, BybitWsFrame::TickerLinear(_));
        assert!(is_expected, "Expected TickerLinear, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_ticker_option() {
        let frame = parse_fixture("ws_ticker_option.json");
        let is_expected = matches!(frame, BybitWsFrame::TickerOption(_));
        assert!(is_expected, "Expected TickerOption, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_account_order() {
        let frame = parse_fixture("ws_account_order.json");
        let is_expected = matches!(frame, BybitWsFrame::AccountOrder(_));
        assert!(is_expected, "Expected AccountOrder, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_account_execution() {
        let frame = parse_fixture("ws_account_execution.json");
        let is_expected = matches!(frame, BybitWsFrame::AccountExecution(_));
        assert!(is_expected, "Expected AccountExecution, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_account_wallet() {
        let frame = parse_fixture("ws_account_wallet.json");
        let is_expected = matches!(frame, BybitWsFrame::AccountWallet(_));
        assert!(is_expected, "Expected AccountWallet, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_account_position() {
        let frame = parse_fixture("ws_account_position.json");
        let is_expected = matches!(frame, BybitWsFrame::AccountPosition(_));
        assert!(is_expected, "Expected AccountPosition, was {frame:?}");
    }

    #[rstest]
    fn test_parse_frame_unknown_message() {
        let value: serde_json::Value = serde_json::json!({"foo": "bar"});
        let frame = parse_bybit_ws_frame(value);
        let is_expected = matches!(frame, BybitWsFrame::Unknown(_));
        assert!(is_expected, "Expected Unknown, was {frame:?}");
    }

    #[rstest]
    fn test_parse_raw_reconnected_signal() {
        let msg = Message::Text(nautilus_network::RECONNECTED.to_string().into());
        let result = BybitWsFeedHandler::parse_raw_frame(msg);
        let is_expected = matches!(result, Some(BybitWsFrame::Reconnected));
        assert!(is_expected, "Expected Some(Reconnected), was {result:?}");
    }

    #[rstest]
    fn test_parse_raw_pong_text() {
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Text("pong".into()));
        assert!(result.is_none(), "Expected None for pong, was {result:?}");
    }

    #[rstest]
    fn test_parse_raw_json_pong_message() {
        let raw = r#"{"args":["1777226678908"],"conn_id":"yzr7jz02gws1vh60mk5m-hxqdp","op":"pong"}"#;
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Text(raw.into()));
        assert!(
            result.is_none(),
            "Expected None for JSON pong, was {result:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_valid_json() {
        let msg = Message::Text(load_test_json("ws_public_trade.json").into());
        let result = BybitWsFeedHandler::parse_raw_frame(msg);
        let is_expected = matches!(result, Some(BybitWsFrame::Trade(_)));
        assert!(is_expected, "Expected Some(Trade), was {result:?}");
    }

    #[rstest]
    fn test_parse_raw_invalid_json() {
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Text("not valid json".into()));
        assert!(
            result.is_none(),
            "Expected None for invalid JSON, was {result:?}"
        );
    }

    #[rstest]
    fn test_parse_raw_binary_message() {
        let result = BybitWsFeedHandler::parse_raw_frame(Message::Binary(vec![0x01, 0x02].into()));
        assert!(result.is_none(), "Expected None for binary, was {result:?}");
    }

    #[rstest]
    fn test_subscription_ack_with_req_id_confirms_only_that_topic() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("orderbook.50.BTCUSDT");
        handler.subscriptions.mark_subscribe("publicTrade.BTCUSDT");

        handler.handle_subscription_ack(&BybitWsSubscriptionMsg {
            success: true,
            op: BybitWsOperation::Subscribe,
            conn_id: None,
            req_id: Some("orderbook.50.BTCUSDT".to_string()),
            ret_msg: None,
        });

        // Only the acknowledged topic leaves the pending set.
        assert!(is_pending(&handler, "publicTrade.BTCUSDT"));
        assert!(!is_pending(&handler, "orderbook.50.BTCUSDT"));
    }

    #[rstest]
    fn test_subscription_failure_with_req_id_marks_only_that_topic() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("orderbook.50.BTCUSDT");
        handler.subscriptions.mark_subscribe("publicTrade.BTCUSDT");

        handler.handle_subscription_ack(&BybitWsSubscriptionMsg {
            success: false,
            op: BybitWsOperation::Subscribe,
            conn_id: None,
            req_id: Some("orderbook.50.BTCUSDT".to_string()),
            ret_msg: Some("Invalid topic".to_string()),
        });

        // Both topics are still reported as pending after the failure.
        assert!(is_pending(&handler, "orderbook.50.BTCUSDT"));
        assert!(is_pending(&handler, "publicTrade.BTCUSDT"));
    }

    #[rstest]
    fn test_error_response_with_subscribe_op_triggers_mark_failure() {
        let handler = create_test_handler();
        handler
            .subscriptions
            .mark_subscribe("invalid.topic.BTCUSDT");

        handler.handle_subscription_error(&BybitWsResponse {
            op: Some(BybitWsOperation::Subscribe),
            topic: None,
            success: Some(false),
            conn_id: None,
            req_id: Some("invalid.topic.BTCUSDT".to_string()),
            ret_code: Some(10001),
            ret_msg: Some("Invalid topic".to_string()),
        });

        assert!(is_pending(&handler, "invalid.topic.BTCUSDT"));
    }

    #[rstest]
    fn test_already_subscribed_error_confirms_topic() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("tickers.ETHUSDT");

        handler.handle_subscription_error(&BybitWsResponse {
            op: Some(BybitWsOperation::Subscribe),
            topic: None,
            success: Some(false),
            conn_id: None,
            req_id: Some("tickers.ETHUSDT".to_string()),
            ret_code: Some(10001),
            ret_msg: Some("error:already subscribed,topic:tickers.ETHUSDT".to_string()),
        });

        assert!(!is_pending(&handler, "tickers.ETHUSDT"));

        let confirmed = handler.subscriptions.confirmed();
        let channel_symbols = confirmed
            .get(&Ustr::from("tickers"))
            .expect("channel present");
        assert!(channel_symbols.contains(&Ustr::from("ETHUSDT")));
    }

    #[rstest]
    fn test_subscription_ack_without_req_id_confirms_all_pending() {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe("orderbook.50.BTCUSDT");
        handler.subscriptions.mark_subscribe("publicTrade.BTCUSDT");

        handler.handle_subscription_ack(&BybitWsSubscriptionMsg {
            success: true,
            op: BybitWsOperation::Subscribe,
            conn_id: None,
            req_id: None,
            ret_msg: None,
        });

        assert!(handler.subscriptions.pending_subscribe_topics().is_empty());
    }
}