Skip to main content

nautilus_kraken/websocket/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken Futures v1 streaming API.
17
18use std::{
19    collections::HashMap,
20    sync::{
21        Arc, RwLock,
22        atomic::{AtomicBool, AtomicU8, Ordering},
23    },
24};
25
26use arc_swap::ArcSwap;
27use nautilus_common::live::get_runtime;
28use nautilus_core::AtomicMap;
29use nautilus_model::{
30    identifiers::{
31        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
32    },
33    instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36    mode::ConnectionMode,
37    websocket::{
38        AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
39        channel_message_handler,
40    },
41};
42use tokio_util::sync::CancellationToken;
43
44use super::{
45    handler::{FuturesFeedHandler, FuturesHandlerCommand},
46    messages::{
47        KrakenFuturesChallengeRequest, KrakenFuturesEvent, KrakenFuturesFeed,
48        KrakenFuturesPrivateSubscribeRequest, KrakenFuturesRequest, KrakenFuturesWsMessage,
49    },
50};
51use crate::{
52    common::{credential::KrakenCredential, parse::truncate_cl_ord_id},
53    websocket::error::KrakenWsError,
54};
55
56/// Topic delimiter for Kraken Futures WebSocket subscriptions.
57///
58/// Topics use colon format: `feed:symbol` (e.g., `trades:PF_ETHUSD`).
59pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
60
61/// WebSocket client for the Kraken Futures v1 streaming API.
62#[derive(Debug)]
63#[cfg_attr(
64    feature = "python",
65    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken", from_py_object)
66)]
67#[cfg_attr(
68    feature = "python",
69    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.kraken")
70)]
71pub struct KrakenFuturesWebSocketClient {
72    url: String,
73    heartbeat_secs: u64,
74    signal: Arc<AtomicBool>,
75    connection_mode: Arc<ArcSwap<AtomicU8>>,
76    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<FuturesHandlerCommand>>>,
77    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
78    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
79    subscriptions: SubscriptionState,
80    subscription_payloads: Arc<tokio::sync::RwLock<HashMap<String, String>>>,
81    auth_tracker: AuthTracker,
82    cancellation_token: CancellationToken,
83    credential: Option<KrakenCredential>,
84    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
85    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
86    account_id: Arc<RwLock<Option<AccountId>>>,
87    truncated_id_map: Arc<AtomicMap<String, ClientOrderId>>,
88    order_instrument_map: Arc<AtomicMap<String, InstrumentId>>,
89    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
90    transport_backend: TransportBackend,
91    proxy_url: Option<String>,
92}
93
94impl Clone for KrakenFuturesWebSocketClient {
95    fn clone(&self) -> Self {
96        Self {
97            url: self.url.clone(),
98            heartbeat_secs: self.heartbeat_secs,
99            signal: Arc::clone(&self.signal),
100            connection_mode: Arc::clone(&self.connection_mode),
101            cmd_tx: Arc::clone(&self.cmd_tx),
102            out_rx: self.out_rx.clone(),
103            task_handle: self.task_handle.clone(),
104            subscriptions: self.subscriptions.clone(),
105            subscription_payloads: Arc::clone(&self.subscription_payloads),
106            auth_tracker: self.auth_tracker.clone(),
107            cancellation_token: self.cancellation_token.clone(),
108            credential: self.credential.clone(),
109            original_challenge: Arc::clone(&self.original_challenge),
110            signed_challenge: Arc::clone(&self.signed_challenge),
111            account_id: Arc::clone(&self.account_id),
112            truncated_id_map: Arc::clone(&self.truncated_id_map),
113            order_instrument_map: Arc::clone(&self.order_instrument_map),
114            instruments: Arc::clone(&self.instruments),
115            transport_backend: self.transport_backend,
116            proxy_url: self.proxy_url.clone(),
117        }
118    }
119}
120
121impl KrakenFuturesWebSocketClient {
122    /// Creates a new client with the given URL.
123    #[must_use]
124    pub fn new(url: String, heartbeat_secs: u64, proxy_url: Option<String>) -> Self {
125        Self::with_credentials(
126            url,
127            heartbeat_secs,
128            None,
129            TransportBackend::default(),
130            proxy_url,
131        )
132    }
133
134    /// Creates a new client with API credentials for authenticated feeds.
135    #[must_use]
136    pub fn with_credentials(
137        url: String,
138        heartbeat_secs: u64,
139        credential: Option<KrakenCredential>,
140        transport_backend: TransportBackend,
141        proxy_url: Option<String>,
142    ) -> Self {
143        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
144        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
145        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
146
147        Self {
148            url,
149            heartbeat_secs,
150            signal: Arc::new(AtomicBool::new(false)),
151            connection_mode,
152            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
153            out_rx: None,
154            task_handle: None,
155            subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
156            subscription_payloads: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
157            auth_tracker: AuthTracker::new(),
158            cancellation_token: CancellationToken::new(),
159            credential,
160            original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
161            signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
162            account_id: Arc::new(RwLock::new(None)),
163            truncated_id_map: Arc::new(AtomicMap::new()),
164            order_instrument_map: Arc::new(AtomicMap::new()),
165            instruments: Arc::new(AtomicMap::new()),
166            transport_backend,
167            proxy_url,
168        }
169    }
170
171    /// Returns true if the client has API credentials set.
172    #[must_use]
173    pub fn has_credentials(&self) -> bool {
174        self.credential.is_some()
175    }
176
177    /// Returns the WebSocket URL.
178    #[must_use]
179    pub fn url(&self) -> &str {
180        &self.url
181    }
182
183    /// Returns true if the connection is closed.
184    #[must_use]
185    pub fn is_closed(&self) -> bool {
186        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
187            == ConnectionMode::Closed
188    }
189
190    /// Returns true if the connection is active.
191    #[must_use]
192    pub fn is_active(&self) -> bool {
193        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
194            == ConnectionMode::Active
195    }
196
197    /// Waits until the WebSocket connection is active or timeout.
198    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
199        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
200
201        tokio::time::timeout(timeout, async {
202            while !self.is_active() {
203                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
204            }
205        })
206        .await
207        .map_err(|_| {
208            KrakenWsError::ConnectionError(format!(
209                "WebSocket connection timeout after {timeout_secs} seconds"
210            ))
211        })?;
212
213        Ok(())
214    }
215
216    /// Returns true if the WebSocket is authenticated for private feeds.
217    #[must_use]
218    pub fn is_authenticated(&self) -> bool {
219        self.auth_tracker.is_authenticated()
220    }
221
222    /// Waits until the WebSocket is authenticated or the timeout elapses.
223    ///
224    /// Returns an error on timeout or explicit auth failure.
225    pub async fn wait_until_authenticated(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
226        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
227        if self.auth_tracker.wait_for_authenticated(timeout).await {
228            Ok(())
229        } else {
230            Err(KrakenWsError::AuthenticationError(format!(
231                "Authentication not completed within {timeout_secs} seconds"
232            )))
233        }
234    }
235
236    /// Authenticates the WebSocket connection for private feeds.
237    ///
238    /// Sends a challenge request and waits for the handler to parse the response,
239    /// sign it, and mark the `AuthTracker` successful. Private subscriptions gate
240    /// on the stored challenge / signed-challenge pair.
241    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
242        let credential = self.credential.as_ref().ok_or_else(|| {
243            KrakenWsError::AuthenticationError("API credentials required".to_string())
244        })?;
245
246        let payload = build_challenge_payload(credential)
247            .map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
248
249        let receiver = self.auth_tracker.begin();
250
251        self.cmd_tx
252            .read()
253            .await
254            .send(FuturesHandlerCommand::RequestChallenge { payload })
255            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
256
257        self.auth_tracker
258            .wait_for_result::<KrakenWsError>(tokio::time::Duration::from_secs(10), receiver)
259            .await?;
260
261        log::debug!("Futures WebSocket authentication successful");
262        Ok(())
263    }
264
265    /// Connects to the WebSocket server.
266    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
267        log::debug!("Connecting to Futures WebSocket: {}", self.url);
268
269        self.signal.store(false, Ordering::Relaxed);
270
271        let (raw_handler, raw_rx) = channel_message_handler();
272
273        let ws_config = WebSocketConfig {
274            url: self.url.clone(),
275            headers: vec![],
276            heartbeat: Some(self.heartbeat_secs),
277            heartbeat_msg: None, // Use WebSocket ping frames, not text messages
278            reconnect_timeout_ms: Some(5_000),
279            reconnect_delay_initial_ms: Some(500),
280            reconnect_delay_max_ms: Some(5_000),
281            reconnect_backoff_factor: Some(1.5),
282            reconnect_jitter_ms: Some(250),
283            reconnect_max_attempts: None,
284            idle_timeout_ms: None,
285            backend: self.transport_backend,
286            proxy_url: self.proxy_url.clone(),
287        };
288
289        let ws_client =
290            WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
291                .await
292                .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
293
294        self.connection_mode
295            .store(ws_client.connection_mode_atomic());
296
297        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
298        self.out_rx = Some(Arc::new(out_rx));
299
300        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
301        *self.cmd_tx.write().await = cmd_tx.clone();
302
303        if let Err(e) = cmd_tx.send(FuturesHandlerCommand::SetClient(ws_client)) {
304            return Err(KrakenWsError::ConnectionError(format!(
305                "Failed to send WebSocketClient to handler: {e}"
306            )));
307        }
308
309        let signal = self.signal.clone();
310        let subscriptions = self.subscriptions.clone();
311        let subscription_payloads = self.subscription_payloads.clone();
312        let cmd_tx_for_reconnect = cmd_tx.clone();
313        let credential_for_reconnect = self.credential.clone();
314        let original_challenge_for_reconnect = self.original_challenge.clone();
315        let signed_challenge_for_reconnect = self.signed_challenge.clone();
316        let auth_tracker_for_reconnect = self.auth_tracker.clone();
317
318        let stream_handle = get_runtime().spawn(async move {
319            let mut handler =
320                FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
321            let mut pending_resubscribe = false;
322
323            loop {
324                match handler.next().await {
325                    Some(KrakenFuturesWsMessage::Reconnected) => {
326                        if signal.load(Ordering::Relaxed) {
327                            continue;
328                        }
329                        log::info!("WebSocket reconnected");
330
331                        let confirmed_topics = subscriptions.all_topics();
332                        for topic in &confirmed_topics {
333                            subscriptions.mark_failure(topic);
334                        }
335
336                        auth_tracker_for_reconnect.invalidate();
337                        *original_challenge_for_reconnect.write().await = None;
338                        *signed_challenge_for_reconnect.write().await = None;
339
340                        let payloads = subscription_payloads.read().await.clone();
341
342                        // Resubscribe public topics straight away; they don't depend on auth,
343                        // so don't tie their restoration to the challenge outcome.
344                        resubscribe_public(&cmd_tx_for_reconnect, &subscriptions, &payloads);
345
346                        let has_private =
347                            payloads.keys().any(|k| k == "open_orders" || k == "fills");
348
349                        pending_resubscribe = false;
350
351                        if has_private {
352                            if let Some(ref cred) = credential_for_reconnect {
353                                match build_challenge_payload(cred) {
354                                    Ok(payload) => {
355                                        let _rx = auth_tracker_for_reconnect.begin();
356
357                                        if let Err(e) = cmd_tx_for_reconnect.send(
358                                            FuturesHandlerCommand::RequestChallenge { payload },
359                                        ) {
360                                            log::error!("Failed to queue reconnect challenge: {e}");
361                                        } else {
362                                            pending_resubscribe = true;
363                                        }
364                                    }
365                                    Err(e) => {
366                                        log::error!("Failed to serialize reconnect challenge: {e}");
367                                    }
368                                }
369                            } else {
370                                log::warn!(
371                                    "Private subscriptions exist but no credentials available"
372                                );
373                            }
374                        }
375
376                        if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
377                            log::debug!("Output channel closed: {e}");
378                            break;
379                        }
380                    }
381                    Some(KrakenFuturesWsMessage::Challenge(challenge)) => {
382                        let Some(ref cred) = credential_for_reconnect else {
383                            log::warn!("Challenge received but no credentials configured");
384                            auth_tracker_for_reconnect.fail("no credentials");
385                            continue;
386                        };
387
388                        match cred.sign_ws_challenge(&challenge) {
389                            Ok(signed) => {
390                                *original_challenge_for_reconnect.write().await =
391                                    Some(challenge.clone());
392                                *signed_challenge_for_reconnect.write().await =
393                                    Some(signed.clone());
394                                auth_tracker_for_reconnect.succeed();
395                                log::debug!("Signed WebSocket challenge");
396
397                                if pending_resubscribe {
398                                    let payloads = subscription_payloads.read().await;
399                                    resubscribe_private(
400                                        &cmd_tx_for_reconnect,
401                                        &subscriptions,
402                                        &payloads,
403                                        cred,
404                                        challenge.as_str(),
405                                        signed.as_str(),
406                                    );
407                                    pending_resubscribe = false;
408                                }
409                            }
410                            Err(e) => {
411                                log::error!("Failed to sign challenge: {e}");
412                                auth_tracker_for_reconnect.fail(e.to_string());
413                                pending_resubscribe = false;
414                            }
415                        }
416                    }
417                    Some(msg) => {
418                        if let Err(e) = out_tx.send(msg) {
419                            log::debug!("Output channel closed: {e}");
420                            break;
421                        }
422                    }
423                    None => {
424                        log::debug!("Handler stream ended");
425                        break;
426                    }
427                }
428            }
429
430            log::debug!("Futures handler task exiting");
431        });
432
433        self.task_handle = Some(Arc::new(stream_handle));
434
435        log::debug!("Futures WebSocket connected successfully");
436        Ok(())
437    }
438
439    /// Disconnects from the WebSocket server.
440    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
441        log::debug!("Disconnecting Futures WebSocket");
442
443        self.signal.store(true, Ordering::Relaxed);
444
445        if let Err(e) = self
446            .cmd_tx
447            .read()
448            .await
449            .send(FuturesHandlerCommand::Disconnect)
450        {
451            log::debug!(
452                "Failed to send disconnect command (handler may already be shut down): {e}"
453            );
454        }
455
456        if let Some(task_handle) = self.task_handle.take() {
457            match Arc::try_unwrap(task_handle) {
458                Ok(handle) => {
459                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
460                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
461                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
462                        Err(_) => {
463                            log::warn!("Timeout waiting for task handle");
464                        }
465                    }
466                }
467                Err(arc_handle) => {
468                    log::debug!("Cannot take ownership of task handle, aborting");
469                    arc_handle.abort();
470                }
471            }
472        }
473
474        self.subscriptions.clear();
475        self.subscription_payloads.write().await.clear();
476        self.auth_tracker.fail("Disconnected");
477        Ok(())
478    }
479
480    /// Closes the WebSocket connection.
481    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
482        self.disconnect().await
483    }
484
485    /// Subscribes to mark price updates for the given instrument.
486    pub async fn subscribe_mark_price(
487        &self,
488        instrument_id: InstrumentId,
489    ) -> Result<(), KrakenWsError> {
490        let symbol = instrument_id.symbol;
491        let key = format!("mark:{symbol}");
492
493        if !self.subscriptions.add_reference(&key) {
494            return Ok(());
495        }
496
497        self.subscriptions.mark_subscribe(&key);
498        self.subscriptions.confirm_subscribe(&key);
499        self.ensure_ticker_subscribed(symbol).await
500    }
501
502    /// Unsubscribes from mark price updates for the given instrument.
503    pub async fn unsubscribe_mark_price(
504        &self,
505        instrument_id: InstrumentId,
506    ) -> Result<(), KrakenWsError> {
507        let symbol = instrument_id.symbol;
508        let key = format!("mark:{symbol}");
509
510        if !self.subscriptions.remove_reference(&key) {
511            return Ok(());
512        }
513
514        self.subscriptions.mark_unsubscribe(&key);
515        self.subscriptions.confirm_unsubscribe(&key);
516        self.maybe_unsubscribe_ticker(symbol).await
517    }
518
519    /// Subscribes to index price updates for the given instrument.
520    pub async fn subscribe_index_price(
521        &self,
522        instrument_id: InstrumentId,
523    ) -> Result<(), KrakenWsError> {
524        let symbol = instrument_id.symbol;
525        let key = format!("index:{symbol}");
526
527        if !self.subscriptions.add_reference(&key) {
528            return Ok(());
529        }
530
531        self.subscriptions.mark_subscribe(&key);
532        self.subscriptions.confirm_subscribe(&key);
533        self.ensure_ticker_subscribed(symbol).await
534    }
535
536    /// Unsubscribes from index price updates for the given instrument.
537    pub async fn unsubscribe_index_price(
538        &self,
539        instrument_id: InstrumentId,
540    ) -> Result<(), KrakenWsError> {
541        let symbol = instrument_id.symbol;
542        let key = format!("index:{symbol}");
543
544        if !self.subscriptions.remove_reference(&key) {
545            return Ok(());
546        }
547
548        self.subscriptions.mark_unsubscribe(&key);
549        self.subscriptions.confirm_unsubscribe(&key);
550        self.maybe_unsubscribe_ticker(symbol).await
551    }
552
553    /// Subscribes to funding rate updates for the given instrument.
554    pub async fn subscribe_funding_rate(
555        &self,
556        instrument_id: InstrumentId,
557    ) -> Result<(), KrakenWsError> {
558        let symbol = instrument_id.symbol;
559        let key = format!("funding:{symbol}");
560
561        if !self.subscriptions.add_reference(&key) {
562            return Ok(());
563        }
564
565        self.subscriptions.mark_subscribe(&key);
566        self.subscriptions.confirm_subscribe(&key);
567        self.ensure_ticker_subscribed(symbol).await
568    }
569
570    /// Unsubscribes from funding rate updates for the given instrument.
571    pub async fn unsubscribe_funding_rate(
572        &self,
573        instrument_id: InstrumentId,
574    ) -> Result<(), KrakenWsError> {
575        let symbol = instrument_id.symbol;
576        let key = format!("funding:{symbol}");
577
578        if !self.subscriptions.remove_reference(&key) {
579            return Ok(());
580        }
581
582        self.subscriptions.mark_unsubscribe(&key);
583        self.subscriptions.confirm_unsubscribe(&key);
584        self.maybe_unsubscribe_ticker(symbol).await
585    }
586
587    /// Subscribes to quote updates for the given instrument.
588    ///
589    /// Uses the order book channel for low-latency top-of-book quotes.
590    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
591        let symbol = instrument_id.symbol;
592        let key = format!("quotes:{symbol}");
593
594        if !self.subscriptions.add_reference(&key) {
595            return Ok(());
596        }
597
598        self.subscriptions.mark_subscribe(&key);
599        self.subscriptions.confirm_subscribe(&key);
600
601        // Use book feed for low-latency quotes (not throttled ticker)
602        self.ensure_book_subscribed(symbol).await
603    }
604
605    /// Unsubscribes from quote updates for the given instrument.
606    pub async fn unsubscribe_quotes(
607        &self,
608        instrument_id: InstrumentId,
609    ) -> Result<(), KrakenWsError> {
610        let symbol = instrument_id.symbol;
611        let key = format!("quotes:{symbol}");
612
613        if !self.subscriptions.remove_reference(&key) {
614            return Ok(());
615        }
616
617        self.subscriptions.mark_unsubscribe(&key);
618        self.subscriptions.confirm_unsubscribe(&key);
619        self.maybe_unsubscribe_book(symbol).await
620    }
621
622    /// Subscribes to trade updates for the given instrument.
623    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
624        let symbol = instrument_id.symbol;
625        let key = format!("trades:{symbol}");
626
627        if !self.subscriptions.add_reference(&key) {
628            return Ok(());
629        }
630
631        self.subscriptions.mark_subscribe(&key);
632        let payload = self
633            .send_subscribe_feed(KrakenFuturesFeed::Trade, vec![symbol.to_string()])
634            .await?;
635        self.subscriptions.confirm_subscribe(&key);
636        self.subscription_payloads
637            .write()
638            .await
639            .insert(key, payload);
640        Ok(())
641    }
642
643    /// Unsubscribes from trade updates for the given instrument.
644    pub async fn unsubscribe_trades(
645        &self,
646        instrument_id: InstrumentId,
647    ) -> Result<(), KrakenWsError> {
648        let symbol = instrument_id.symbol;
649        let key = format!("trades:{symbol}");
650
651        if !self.subscriptions.remove_reference(&key) {
652            return Ok(());
653        }
654
655        self.subscriptions.mark_unsubscribe(&key);
656        self.send_unsubscribe_feed(KrakenFuturesFeed::Trade, vec![symbol.to_string()])
657            .await?;
658        self.subscriptions.confirm_unsubscribe(&key);
659        self.subscription_payloads.write().await.remove(&key);
660        Ok(())
661    }
662
663    /// Subscribes to order book updates for the given instrument.
664    ///
665    /// Note: The `depth` parameter is accepted for API compatibility with spot client but is
666    /// not used by Kraken Futures (full book is always returned).
667    pub async fn subscribe_book(
668        &self,
669        instrument_id: InstrumentId,
670        _depth: Option<u32>,
671    ) -> Result<(), KrakenWsError> {
672        let symbol = instrument_id.symbol;
673
674        let deltas_key = format!("deltas:{symbol}");
675        self.subscriptions.add_reference(&deltas_key);
676        self.subscriptions.mark_subscribe(&deltas_key);
677        self.subscriptions.confirm_subscribe(&deltas_key);
678
679        self.ensure_book_subscribed(symbol).await
680    }
681
682    /// Unsubscribes from order book updates for the given instrument.
683    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
684        let symbol = instrument_id.symbol;
685
686        let deltas_key = format!("deltas:{symbol}");
687        self.subscriptions.remove_reference(&deltas_key);
688        self.subscriptions.mark_unsubscribe(&deltas_key);
689        self.subscriptions.confirm_unsubscribe(&deltas_key);
690
691        self.maybe_unsubscribe_book(symbol).await
692    }
693
694    async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
695        let ticker_key = format!("ticker:{symbol}");
696
697        if !self.subscriptions.add_reference(&ticker_key) {
698            return Ok(());
699        }
700
701        self.subscriptions.mark_subscribe(&ticker_key);
702        let payload = self
703            .send_subscribe_feed(KrakenFuturesFeed::Ticker, vec![symbol.to_string()])
704            .await?;
705        self.subscriptions.confirm_subscribe(&ticker_key);
706        self.subscription_payloads
707            .write()
708            .await
709            .insert(ticker_key, payload);
710        Ok(())
711    }
712
713    async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
714        let ticker_key = format!("ticker:{symbol}");
715
716        if !self.subscriptions.remove_reference(&ticker_key) {
717            return Ok(());
718        }
719
720        self.subscriptions.mark_unsubscribe(&ticker_key);
721        self.send_unsubscribe_feed(KrakenFuturesFeed::Ticker, vec![symbol.to_string()])
722            .await?;
723        self.subscriptions.confirm_unsubscribe(&ticker_key);
724        self.subscription_payloads.write().await.remove(&ticker_key);
725        Ok(())
726    }
727
728    async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
729        let book_key = format!("book:{symbol}");
730
731        if !self.subscriptions.add_reference(&book_key) {
732            return Ok(());
733        }
734
735        self.subscriptions.mark_subscribe(&book_key);
736        let payload = self
737            .send_subscribe_feed(KrakenFuturesFeed::Book, vec![symbol.to_string()])
738            .await?;
739        self.subscriptions.confirm_subscribe(&book_key);
740        self.subscription_payloads
741            .write()
742            .await
743            .insert(book_key, payload);
744        Ok(())
745    }
746
747    async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
748        let book_key = format!("book:{symbol}");
749
750        if !self.subscriptions.remove_reference(&book_key) {
751            return Ok(());
752        }
753
754        self.subscriptions.mark_unsubscribe(&book_key);
755        self.send_unsubscribe_feed(KrakenFuturesFeed::Book, vec![symbol.to_string()])
756            .await?;
757        self.subscriptions.confirm_unsubscribe(&book_key);
758        self.subscription_payloads.write().await.remove(&book_key);
759        Ok(())
760    }
761
762    /// Gets the output receiver for processed messages.
763    pub fn take_output_rx(
764        &mut self,
765    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
766        self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
767    }
768
769    /// Set authentication credentials directly (for when challenge is obtained externally).
770    pub async fn set_auth_credentials(
771        &self,
772        original_challenge: String,
773        signed_challenge: String,
774    ) -> Result<(), KrakenWsError> {
775        let _credential = self.credential.as_ref().ok_or_else(|| {
776            KrakenWsError::AuthenticationError("API credentials required".to_string())
777        })?;
778
779        *self.original_challenge.write().await = Some(original_challenge);
780        *self.signed_challenge.write().await = Some(signed_challenge);
781        self.auth_tracker.succeed();
782
783        Ok(())
784    }
785
786    /// Sign a challenge with the API credentials.
787    ///
788    /// Returns the signed challenge on success.
789    pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
790        let credential = self.credential.as_ref().ok_or_else(|| {
791            KrakenWsError::AuthenticationError("API credentials required".to_string())
792        })?;
793
794        credential.sign_ws_challenge(challenge).map_err(|e| {
795            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
796        })
797    }
798
799    /// Complete authentication with a received challenge.
800    pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
801        let credential = self.credential.as_ref().ok_or_else(|| {
802            KrakenWsError::AuthenticationError("API credentials required".to_string())
803        })?;
804
805        let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
806            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
807        })?;
808
809        self.set_auth_credentials(challenge.to_string(), signed_challenge)
810            .await
811    }
812
813    /// Sets the account ID for execution report parsing.
814    pub fn set_account_id(&self, account_id: AccountId) {
815        if let Ok(mut guard) = self.account_id.write() {
816            *guard = Some(account_id);
817        }
818    }
819
820    /// Returns the account ID if set.
821    #[must_use]
822    pub fn account_id(&self) -> Option<AccountId> {
823        self.account_id.read().ok().and_then(|g| *g)
824    }
825
826    /// Returns a reference to the shared account ID.
827    #[must_use]
828    pub fn account_id_shared(&self) -> &Arc<RwLock<Option<AccountId>>> {
829        &self.account_id
830    }
831
832    /// Returns a reference to the truncated ID map.
833    #[must_use]
834    pub fn truncated_id_map(&self) -> &Arc<AtomicMap<String, ClientOrderId>> {
835        &self.truncated_id_map
836    }
837
838    /// Returns a reference to the order-to-instrument map.
839    #[must_use]
840    pub fn order_instrument_map(&self) -> &Arc<AtomicMap<String, InstrumentId>> {
841        &self.order_instrument_map
842    }
843
844    /// Returns a reference to the shared instruments map.
845    #[must_use]
846    pub fn instruments_shared(&self) -> &Arc<AtomicMap<InstrumentId, InstrumentAny>> {
847        &self.instruments
848    }
849
850    /// Returns a reference to the subscription state.
851    #[must_use]
852    pub fn subscriptions(&self) -> &SubscriptionState {
853        &self.subscriptions
854    }
855
856    /// Caches an instrument for execution report parsing.
857    pub fn cache_instrument(&self, instrument: InstrumentAny) {
858        self.instruments.insert(instrument.id(), instrument);
859    }
860
861    /// Caches multiple instruments for execution report parsing.
862    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
863        self.instruments.rcu(|m| {
864            for instrument in instruments {
865                m.insert(instrument.id(), instrument.clone());
866            }
867        });
868    }
869
870    /// Caches a client order for truncated ID resolution and instrument lookup.
871    ///
872    /// Kraken Futures limits client order IDs to 18 characters, so orders with
873    /// longer IDs are truncated. This method stores the mapping from truncated
874    /// to full ID, and from venue order ID to instrument ID for cancel messages.
875    pub fn cache_client_order(
876        &self,
877        client_order_id: ClientOrderId,
878        venue_order_id: Option<VenueOrderId>,
879        instrument_id: InstrumentId,
880        _trader_id: TraderId,
881        _strategy_id: StrategyId,
882    ) {
883        let truncated = truncate_cl_ord_id(&client_order_id);
884
885        if truncated != client_order_id.as_str() {
886            self.truncated_id_map.insert(truncated, client_order_id);
887        }
888
889        if let Some(venue_id) = venue_order_id {
890            self.order_instrument_map
891                .insert(venue_id.to_string(), instrument_id);
892        }
893    }
894
895    /// Subscribes to open orders feed (private, requires authentication).
896    pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
897        let key = "open_orders";
898        if !self.subscriptions.add_reference(key) {
899            return Ok(());
900        }
901
902        self.subscriptions.mark_subscribe(key);
903        let payload = self
904            .send_private_subscribe_feed(KrakenFuturesFeed::OpenOrders)
905            .await?;
906        self.subscriptions.confirm_subscribe(key);
907        self.subscription_payloads
908            .write()
909            .await
910            .insert(key.to_string(), payload);
911        Ok(())
912    }
913
914    /// Subscribes to fills feed (private, requires authentication).
915    pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
916        let key = "fills";
917        if !self.subscriptions.add_reference(key) {
918            return Ok(());
919        }
920
921        self.subscriptions.mark_subscribe(key);
922        let payload = self
923            .send_private_subscribe_feed(KrakenFuturesFeed::Fills)
924            .await?;
925        self.subscriptions.confirm_subscribe(key);
926        self.subscription_payloads
927            .write()
928            .await
929            .insert(key.to_string(), payload);
930        Ok(())
931    }
932
933    /// Subscribes to both open orders and fills (convenience method).
934    pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
935        self.subscribe_open_orders().await?;
936        self.subscribe_fills().await?;
937        Ok(())
938    }
939
940    async fn send_subscribe_feed(
941        &self,
942        feed: KrakenFuturesFeed,
943        product_ids: Vec<String>,
944    ) -> Result<String, KrakenWsError> {
945        let request = KrakenFuturesRequest {
946            event: KrakenFuturesEvent::Subscribe,
947            feed,
948            product_ids,
949        };
950        let payload =
951            serde_json::to_string(&request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
952        self.cmd_tx
953            .read()
954            .await
955            .send(FuturesHandlerCommand::Subscribe {
956                payload: payload.clone(),
957            })
958            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
959        Ok(payload)
960    }
961
962    async fn send_unsubscribe_feed(
963        &self,
964        feed: KrakenFuturesFeed,
965        product_ids: Vec<String>,
966    ) -> Result<(), KrakenWsError> {
967        let request = KrakenFuturesRequest {
968            event: KrakenFuturesEvent::Unsubscribe,
969            feed,
970            product_ids,
971        };
972        let payload =
973            serde_json::to_string(&request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
974        self.cmd_tx
975            .read()
976            .await
977            .send(FuturesHandlerCommand::Unsubscribe { payload })
978            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
979        Ok(())
980    }
981
982    async fn send_private_subscribe_feed(
983        &self,
984        feed: KrakenFuturesFeed,
985    ) -> Result<String, KrakenWsError> {
986        let credential = self.credential.as_ref().ok_or_else(|| {
987            KrakenWsError::AuthenticationError("API credentials required".to_string())
988        })?;
989        let original_challenge = self
990            .original_challenge
991            .read()
992            .await
993            .clone()
994            .ok_or_else(|| {
995                KrakenWsError::AuthenticationError(
996                    "Must authenticate before subscribing to private feeds".to_string(),
997                )
998            })?;
999        let signed_challenge = self.signed_challenge.read().await.clone().ok_or_else(|| {
1000            KrakenWsError::AuthenticationError(
1001                "Must authenticate before subscribing to private feeds".to_string(),
1002            )
1003        })?;
1004
1005        let request = KrakenFuturesPrivateSubscribeRequest {
1006            event: KrakenFuturesEvent::Subscribe,
1007            feed,
1008            api_key: credential.api_key().to_string(),
1009            original_challenge,
1010            signed_challenge,
1011        };
1012        let payload =
1013            serde_json::to_string(&request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
1014        self.cmd_tx
1015            .read()
1016            .await
1017            .send(FuturesHandlerCommand::Subscribe {
1018                payload: payload.clone(),
1019            })
1020            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
1021        Ok(payload)
1022    }
1023}
1024
1025fn update_private_payload_credentials(
1026    payload: &str,
1027    api_key: &str,
1028    original_challenge: &str,
1029    signed_challenge: &str,
1030) -> Option<String> {
1031    let mut value: serde_json::Value = serde_json::from_str(payload).ok()?;
1032    let obj = value.as_object_mut()?;
1033    obj.insert(
1034        "api_key".to_string(),
1035        serde_json::Value::String(api_key.to_string()),
1036    );
1037    obj.insert(
1038        "original_challenge".to_string(),
1039        serde_json::Value::String(original_challenge.to_string()),
1040    );
1041    obj.insert(
1042        "signed_challenge".to_string(),
1043        serde_json::Value::String(signed_challenge.to_string()),
1044    );
1045    serde_json::to_string(&value).ok()
1046}
1047
1048fn build_challenge_payload(credential: &KrakenCredential) -> serde_json::Result<String> {
1049    let request = KrakenFuturesChallengeRequest {
1050        event: KrakenFuturesEvent::Challenge,
1051        api_key: credential.api_key().to_string(),
1052    };
1053    serde_json::to_string(&request)
1054}
1055
1056fn is_private_feed_key(key: &str) -> bool {
1057    key == "open_orders" || key == "fills"
1058}
1059
1060fn resubscribe_public(
1061    cmd_tx: &tokio::sync::mpsc::UnboundedSender<FuturesHandlerCommand>,
1062    subscriptions: &SubscriptionState,
1063    payloads: &HashMap<String, String>,
1064) {
1065    for (key, payload) in payloads {
1066        if is_private_feed_key(key) {
1067            continue;
1068        }
1069
1070        if let Err(e) = cmd_tx.send(FuturesHandlerCommand::Subscribe {
1071            payload: payload.clone(),
1072        }) {
1073            log::error!("Failed to send resubscribe: error={e}, topic={key}");
1074            continue;
1075        }
1076
1077        subscriptions.mark_subscribe(key);
1078    }
1079}
1080
1081fn resubscribe_private(
1082    cmd_tx: &tokio::sync::mpsc::UnboundedSender<FuturesHandlerCommand>,
1083    subscriptions: &SubscriptionState,
1084    payloads: &HashMap<String, String>,
1085    credential: &KrakenCredential,
1086    original_challenge: &str,
1087    signed_challenge: &str,
1088) {
1089    for (key, payload) in payloads {
1090        if !is_private_feed_key(key) {
1091            continue;
1092        }
1093
1094        let Some(updated) = update_private_payload_credentials(
1095            payload,
1096            credential.api_key(),
1097            original_challenge,
1098            signed_challenge,
1099        ) else {
1100            log::error!("Failed to update private payload for {key}");
1101            continue;
1102        };
1103
1104        if let Err(e) = cmd_tx.send(FuturesHandlerCommand::Subscribe { payload: updated }) {
1105            log::error!("Failed to send resubscribe: error={e}, topic={key}");
1106            continue;
1107        }
1108
1109        subscriptions.mark_subscribe(key);
1110    }
1111}
1112
1113#[cfg(test)]
1114mod tests {
1115    use base64::{Engine, engine::general_purpose::STANDARD};
1116    use nautilus_network::websocket::AuthTracker;
1117    use rstest::rstest;
1118
1119    use super::*;
1120
1121    fn test_credential() -> KrakenCredential {
1122        let secret = STANDARD.encode(b"test_secret_key_24bytes!");
1123        KrakenCredential::new("test_key", secret)
1124    }
1125
1126    #[rstest]
1127    fn test_build_challenge_payload_emits_expected_event() {
1128        let credential = test_credential();
1129        let payload = build_challenge_payload(&credential).expect("serializes");
1130        assert!(payload.contains(r#""event":"challenge""#));
1131        assert!(payload.contains(r#""api_key":"test_key""#));
1132    }
1133
1134    #[rstest]
1135    #[tokio::test]
1136    async fn test_resubscribe_public_skips_private_feeds() {
1137        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
1138        let subscriptions = SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER);
1139
1140        let mut payloads = HashMap::new();
1141        payloads.insert(
1142            "trades:PI_XBTUSD".to_string(),
1143            r#"{"event":"subscribe","feed":"trade","product_ids":["PI_XBTUSD"]}"#.to_string(),
1144        );
1145        payloads.insert(
1146            "open_orders".to_string(),
1147            r#"{"event":"subscribe","feed":"open_orders"}"#.to_string(),
1148        );
1149
1150        resubscribe_public(&cmd_tx, &subscriptions, &payloads);
1151
1152        let mut subscribed = Vec::new();
1153        while let Ok(FuturesHandlerCommand::Subscribe { payload }) = cmd_rx.try_recv() {
1154            subscribed.push(payload);
1155        }
1156
1157        assert_eq!(
1158            subscribed.len(),
1159            1,
1160            "only the public feed should resubscribe"
1161        );
1162        assert!(subscribed[0].contains("PI_XBTUSD"));
1163    }
1164
1165    #[rstest]
1166    #[tokio::test]
1167    async fn test_resubscribe_public_restores_publics_even_with_credentialed_client() {
1168        // The reconnect path runs resubscribe_public() unconditionally, so a
1169        // credentialed client's public feeds keep flowing even if re-auth fails.
1170        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
1171        let subscriptions = SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER);
1172
1173        let mut payloads = HashMap::new();
1174        payloads.insert(
1175            "trades:PI_XBTUSD".to_string(),
1176            r#"{"event":"subscribe","feed":"trade","product_ids":["PI_XBTUSD"]}"#.to_string(),
1177        );
1178
1179        resubscribe_public(&cmd_tx, &subscriptions, &payloads);
1180
1181        match cmd_rx.try_recv().expect("public subscribe expected") {
1182            FuturesHandlerCommand::Subscribe { payload } => {
1183                assert!(payload.contains("PI_XBTUSD"));
1184            }
1185            other => panic!("expected Subscribe, was {other:?}"),
1186        }
1187    }
1188
1189    #[rstest]
1190    #[tokio::test]
1191    async fn test_resubscribe_private_patches_credentials() {
1192        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
1193        let subscriptions = SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER);
1194        let credential = test_credential();
1195
1196        let mut payloads = HashMap::new();
1197        payloads.insert(
1198            "open_orders".to_string(),
1199            r#"{"event":"subscribe","feed":"open_orders","api_key":"","original_challenge":"","signed_challenge":""}"#.to_string(),
1200        );
1201        payloads.insert(
1202            "trades:PI_XBTUSD".to_string(),
1203            r#"{"event":"subscribe","feed":"trade","product_ids":["PI_XBTUSD"]}"#.to_string(),
1204        );
1205
1206        resubscribe_private(
1207            &cmd_tx,
1208            &subscriptions,
1209            &payloads,
1210            &credential,
1211            "server-challenge",
1212            "signed-value",
1213        );
1214
1215        let mut subscribed = Vec::new();
1216        while let Ok(FuturesHandlerCommand::Subscribe { payload }) = cmd_rx.try_recv() {
1217            subscribed.push(payload);
1218        }
1219
1220        assert_eq!(
1221            subscribed.len(),
1222            1,
1223            "only the private feed should resubscribe"
1224        );
1225        let value: serde_json::Value =
1226            serde_json::from_str(&subscribed[0]).expect("payload is valid JSON");
1227        assert_eq!(value["event"], "subscribe");
1228        assert_eq!(value["feed"], "open_orders");
1229        assert_eq!(value["api_key"], "test_key");
1230        assert_eq!(value["original_challenge"], "server-challenge");
1231        assert_eq!(value["signed_challenge"], "signed-value");
1232    }
1233
1234    #[rstest]
1235    #[tokio::test]
1236    async fn test_auth_tracker_succeed_completes_wait_for_result() {
1237        let tracker = AuthTracker::new();
1238        let receiver = tracker.begin();
1239
1240        let tracker_for_responder = tracker.clone();
1241
1242        tokio::spawn(async move {
1243            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
1244            tracker_for_responder.succeed();
1245        });
1246
1247        tracker
1248            .wait_for_result::<KrakenWsError>(tokio::time::Duration::from_secs(1), receiver)
1249            .await
1250            .expect("auth should succeed");
1251
1252        assert!(tracker.is_authenticated());
1253    }
1254
1255    #[rstest]
1256    #[tokio::test]
1257    async fn test_auth_tracker_wait_for_result_times_out() {
1258        let tracker = AuthTracker::new();
1259        let receiver = tracker.begin();
1260
1261        let err = tracker
1262            .wait_for_result::<KrakenWsError>(tokio::time::Duration::from_millis(20), receiver)
1263            .await
1264            .expect_err("should time out");
1265
1266        assert!(matches!(err, KrakenWsError::AuthenticationError(_)));
1267        assert!(!tracker.is_authenticated());
1268    }
1269
1270    #[rstest]
1271    #[tokio::test]
1272    async fn test_authenticate_without_credentials_errors() {
1273        let client = KrakenFuturesWebSocketClient::new(
1274            "wss://futures.kraken.com/ws/v1".to_string(),
1275            60,
1276            None,
1277        );
1278
1279        let err = client.authenticate().await.expect_err("should fail");
1280        assert!(
1281            matches!(err, KrakenWsError::AuthenticationError(ref msg) if msg.contains("API credentials required")),
1282            "unexpected error: {err:?}"
1283        );
1284    }
1285
1286    #[rstest]
1287    #[tokio::test]
1288    async fn test_set_auth_credentials_marks_tracker_authenticated() {
1289        let client = KrakenFuturesWebSocketClient::with_credentials(
1290            "wss://futures.kraken.com/ws/v1".to_string(),
1291            60,
1292            Some(test_credential()),
1293            TransportBackend::default(),
1294            None,
1295        );
1296
1297        assert!(!client.is_authenticated());
1298
1299        client
1300            .set_auth_credentials("orig-challenge".to_string(), "signed-challenge".to_string())
1301            .await
1302            .expect("should succeed");
1303
1304        assert!(client.is_authenticated());
1305        client
1306            .wait_until_authenticated(0.05)
1307            .await
1308            .expect("should return immediately");
1309    }
1310
1311    #[rstest]
1312    #[tokio::test]
1313    async fn test_set_auth_credentials_without_credentials_errors() {
1314        let client = KrakenFuturesWebSocketClient::new(
1315            "wss://futures.kraken.com/ws/v1".to_string(),
1316            60,
1317            None,
1318        );
1319
1320        let err = client
1321            .set_auth_credentials("orig".to_string(), "signed".to_string())
1322            .await
1323            .expect_err("should fail");
1324        assert!(matches!(err, KrakenWsError::AuthenticationError(_)));
1325        assert!(!client.is_authenticated());
1326    }
1327
1328    #[rstest]
1329    #[tokio::test]
1330    async fn test_authenticate_with_challenge_updates_state() {
1331        let client = KrakenFuturesWebSocketClient::with_credentials(
1332            "wss://futures.kraken.com/ws/v1".to_string(),
1333            60,
1334            Some(test_credential()),
1335            TransportBackend::default(),
1336            None,
1337        );
1338
1339        client
1340            .authenticate_with_challenge("server-challenge")
1341            .await
1342            .expect("should succeed");
1343
1344        assert!(client.is_authenticated());
1345    }
1346
1347    #[rstest]
1348    #[tokio::test]
1349    async fn test_wait_until_authenticated_resolves_after_success() {
1350        let client = KrakenFuturesWebSocketClient::with_credentials(
1351            "wss://futures.kraken.com/ws/v1".to_string(),
1352            60,
1353            Some(test_credential()),
1354            TransportBackend::default(),
1355            None,
1356        );
1357
1358        let client_for_responder = client.clone();
1359
1360        tokio::spawn(async move {
1361            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
1362            client_for_responder
1363                .set_auth_credentials("orig".to_string(), "signed".to_string())
1364                .await
1365                .expect("succeeds");
1366        });
1367
1368        client
1369            .wait_until_authenticated(1.0)
1370            .await
1371            .expect("should resolve once credentials are set");
1372    }
1373
1374    #[rstest]
1375    #[tokio::test]
1376    async fn test_wait_until_authenticated_times_out() {
1377        let client = KrakenFuturesWebSocketClient::with_credentials(
1378            "wss://futures.kraken.com/ws/v1".to_string(),
1379            60,
1380            Some(test_credential()),
1381            TransportBackend::default(),
1382            None,
1383        );
1384
1385        let err = client
1386            .wait_until_authenticated(0.05)
1387            .await
1388            .expect_err("should time out");
1389        assert!(matches!(err, KrakenWsError::AuthenticationError(_)));
1390    }
1391}