1use std::{
19 collections::HashMap,
20 sync::{
21 Arc, RwLock,
22 atomic::{AtomicBool, AtomicU8, Ordering},
23 },
24};
25
26use arc_swap::ArcSwap;
27use nautilus_common::live::get_runtime;
28use nautilus_core::AtomicMap;
29use nautilus_model::{
30 identifiers::{
31 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
32 },
33 instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36 mode::ConnectionMode,
37 websocket::{
38 AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
39 channel_message_handler,
40 },
41};
42use tokio_util::sync::CancellationToken;
43
44use super::{
45 handler::{FuturesFeedHandler, FuturesHandlerCommand},
46 messages::{
47 KrakenFuturesChallengeRequest, KrakenFuturesEvent, KrakenFuturesFeed,
48 KrakenFuturesPrivateSubscribeRequest, KrakenFuturesRequest, KrakenFuturesWsMessage,
49 },
50};
51use crate::{
52 common::{credential::KrakenCredential, parse::truncate_cl_ord_id},
53 websocket::error::KrakenWsError,
54};
55
/// Delimiter handed to `SubscriptionState::new` when this client builds its subscription state.
///
/// Topic keys created in this file use it between the channel name and the symbol
/// (for example `trades:<symbol>` or `book:<symbol>`).
pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
60
/// WebSocket client for the Kraken Futures feed.
///
/// Most state is wrapped in `Arc` so that clones of the client (see the manual `Clone` impl)
/// share it with the original.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.kraken")
)]
pub struct KrakenFuturesWebSocketClient {
    /// Endpoint URL used when building the `WebSocketConfig` in `connect`.
    url: String,
    /// Heartbeat value forwarded to `WebSocketConfig::heartbeat`.
    heartbeat_secs: u64,
    /// Shutdown flag: cleared in `connect`, set in `disconnect`; the handler task skips
    /// reconnect processing while it is set.
    signal: Arc<AtomicBool>,
    /// Connection-mode atomic; replaced with the live socket's atomic in `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; replaced with a fresh channel in `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<FuturesHandlerCommand>>>,
    /// Receiver for messages forwarded by the handler task; handed out by `take_output_rx`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
    /// Join handle of the handler task spawned in `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription bookkeeping keyed by topic.
    subscriptions: SubscriptionState,
    /// Topic key -> serialized subscribe payload, replayed after a reconnect.
    subscription_payloads: Arc<tokio::sync::RwLock<HashMap<String, String>>>,
    /// Tracks the outcome of the challenge-based authentication flow.
    auth_tracker: AuthTracker,
    /// Created in the constructor; no other use is visible in this part of the file.
    cancellation_token: CancellationToken,
    /// API credential; `None` for a public-only client.
    credential: Option<KrakenCredential>,
    /// Most recent challenge string received from the venue, if any.
    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
    /// Signature of `original_challenge`, if any.
    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
    /// Account identifier set through `set_account_id`.
    account_id: Arc<RwLock<Option<AccountId>>>,
    /// Truncated client order ID string -> full `ClientOrderId` (filled by `cache_client_order`).
    truncated_id_map: Arc<AtomicMap<String, ClientOrderId>>,
    /// Venue order ID string -> `InstrumentId` (filled by `cache_client_order`).
    order_instrument_map: Arc<AtomicMap<String, InstrumentId>>,
    /// Instrument cache keyed by instrument ID.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Transport backend forwarded to `WebSocketConfig::backend`.
    transport_backend: TransportBackend,
    /// Optional proxy URL forwarded to `WebSocketConfig::proxy_url`.
    proxy_url: Option<String>,
}
93
94impl Clone for KrakenFuturesWebSocketClient {
95 fn clone(&self) -> Self {
96 Self {
97 url: self.url.clone(),
98 heartbeat_secs: self.heartbeat_secs,
99 signal: Arc::clone(&self.signal),
100 connection_mode: Arc::clone(&self.connection_mode),
101 cmd_tx: Arc::clone(&self.cmd_tx),
102 out_rx: self.out_rx.clone(),
103 task_handle: self.task_handle.clone(),
104 subscriptions: self.subscriptions.clone(),
105 subscription_payloads: Arc::clone(&self.subscription_payloads),
106 auth_tracker: self.auth_tracker.clone(),
107 cancellation_token: self.cancellation_token.clone(),
108 credential: self.credential.clone(),
109 original_challenge: Arc::clone(&self.original_challenge),
110 signed_challenge: Arc::clone(&self.signed_challenge),
111 account_id: Arc::clone(&self.account_id),
112 truncated_id_map: Arc::clone(&self.truncated_id_map),
113 order_instrument_map: Arc::clone(&self.order_instrument_map),
114 instruments: Arc::clone(&self.instruments),
115 transport_backend: self.transport_backend,
116 proxy_url: self.proxy_url.clone(),
117 }
118 }
119}
120
121impl KrakenFuturesWebSocketClient {
122 #[must_use]
124 pub fn new(url: String, heartbeat_secs: u64, proxy_url: Option<String>) -> Self {
125 Self::with_credentials(
126 url,
127 heartbeat_secs,
128 None,
129 TransportBackend::default(),
130 proxy_url,
131 )
132 }
133
134 #[must_use]
136 pub fn with_credentials(
137 url: String,
138 heartbeat_secs: u64,
139 credential: Option<KrakenCredential>,
140 transport_backend: TransportBackend,
141 proxy_url: Option<String>,
142 ) -> Self {
143 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
144 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
145 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
146
147 Self {
148 url,
149 heartbeat_secs,
150 signal: Arc::new(AtomicBool::new(false)),
151 connection_mode,
152 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
153 out_rx: None,
154 task_handle: None,
155 subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
156 subscription_payloads: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
157 auth_tracker: AuthTracker::new(),
158 cancellation_token: CancellationToken::new(),
159 credential,
160 original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
161 signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
162 account_id: Arc::new(RwLock::new(None)),
163 truncated_id_map: Arc::new(AtomicMap::new()),
164 order_instrument_map: Arc::new(AtomicMap::new()),
165 instruments: Arc::new(AtomicMap::new()),
166 transport_backend,
167 proxy_url,
168 }
169 }
170
171 #[must_use]
173 pub fn has_credentials(&self) -> bool {
174 self.credential.is_some()
175 }
176
177 #[must_use]
179 pub fn url(&self) -> &str {
180 &self.url
181 }
182
183 #[must_use]
185 pub fn is_closed(&self) -> bool {
186 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
187 == ConnectionMode::Closed
188 }
189
190 #[must_use]
192 pub fn is_active(&self) -> bool {
193 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
194 == ConnectionMode::Active
195 }
196
197 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
199 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
200
201 tokio::time::timeout(timeout, async {
202 while !self.is_active() {
203 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
204 }
205 })
206 .await
207 .map_err(|_| {
208 KrakenWsError::ConnectionError(format!(
209 "WebSocket connection timeout after {timeout_secs} seconds"
210 ))
211 })?;
212
213 Ok(())
214 }
215
216 #[must_use]
218 pub fn is_authenticated(&self) -> bool {
219 self.auth_tracker.is_authenticated()
220 }
221
222 pub async fn wait_until_authenticated(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
226 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
227 if self.auth_tracker.wait_for_authenticated(timeout).await {
228 Ok(())
229 } else {
230 Err(KrakenWsError::AuthenticationError(format!(
231 "Authentication not completed within {timeout_secs} seconds"
232 )))
233 }
234 }
235
236 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
242 let credential = self.credential.as_ref().ok_or_else(|| {
243 KrakenWsError::AuthenticationError("API credentials required".to_string())
244 })?;
245
246 let payload = build_challenge_payload(credential)
247 .map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
248
249 let receiver = self.auth_tracker.begin();
250
251 self.cmd_tx
252 .read()
253 .await
254 .send(FuturesHandlerCommand::RequestChallenge { payload })
255 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
256
257 self.auth_tracker
258 .wait_for_result::<KrakenWsError>(tokio::time::Duration::from_secs(10), receiver)
259 .await?;
260
261 log::debug!("Futures WebSocket authentication successful");
262 Ok(())
263 }
264
265 pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
267 log::debug!("Connecting to Futures WebSocket: {}", self.url);
268
269 self.signal.store(false, Ordering::Relaxed);
270
271 let (raw_handler, raw_rx) = channel_message_handler();
272
273 let ws_config = WebSocketConfig {
274 url: self.url.clone(),
275 headers: vec![],
276 heartbeat: Some(self.heartbeat_secs),
277 heartbeat_msg: None, reconnect_timeout_ms: Some(5_000),
279 reconnect_delay_initial_ms: Some(500),
280 reconnect_delay_max_ms: Some(5_000),
281 reconnect_backoff_factor: Some(1.5),
282 reconnect_jitter_ms: Some(250),
283 reconnect_max_attempts: None,
284 idle_timeout_ms: None,
285 backend: self.transport_backend,
286 proxy_url: self.proxy_url.clone(),
287 };
288
289 let ws_client =
290 WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
291 .await
292 .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
293
294 self.connection_mode
295 .store(ws_client.connection_mode_atomic());
296
297 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
298 self.out_rx = Some(Arc::new(out_rx));
299
300 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
301 *self.cmd_tx.write().await = cmd_tx.clone();
302
303 if let Err(e) = cmd_tx.send(FuturesHandlerCommand::SetClient(ws_client)) {
304 return Err(KrakenWsError::ConnectionError(format!(
305 "Failed to send WebSocketClient to handler: {e}"
306 )));
307 }
308
309 let signal = self.signal.clone();
310 let subscriptions = self.subscriptions.clone();
311 let subscription_payloads = self.subscription_payloads.clone();
312 let cmd_tx_for_reconnect = cmd_tx.clone();
313 let credential_for_reconnect = self.credential.clone();
314 let original_challenge_for_reconnect = self.original_challenge.clone();
315 let signed_challenge_for_reconnect = self.signed_challenge.clone();
316 let auth_tracker_for_reconnect = self.auth_tracker.clone();
317
318 let stream_handle = get_runtime().spawn(async move {
319 let mut handler =
320 FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
321 let mut pending_resubscribe = false;
322
323 loop {
324 match handler.next().await {
325 Some(KrakenFuturesWsMessage::Reconnected) => {
326 if signal.load(Ordering::Relaxed) {
327 continue;
328 }
329 log::info!("WebSocket reconnected");
330
331 let confirmed_topics = subscriptions.all_topics();
332 for topic in &confirmed_topics {
333 subscriptions.mark_failure(topic);
334 }
335
336 auth_tracker_for_reconnect.invalidate();
337 *original_challenge_for_reconnect.write().await = None;
338 *signed_challenge_for_reconnect.write().await = None;
339
340 let payloads = subscription_payloads.read().await.clone();
341
342 resubscribe_public(&cmd_tx_for_reconnect, &subscriptions, &payloads);
345
346 let has_private =
347 payloads.keys().any(|k| k == "open_orders" || k == "fills");
348
349 pending_resubscribe = false;
350
351 if has_private {
352 if let Some(ref cred) = credential_for_reconnect {
353 match build_challenge_payload(cred) {
354 Ok(payload) => {
355 let _rx = auth_tracker_for_reconnect.begin();
356
357 if let Err(e) = cmd_tx_for_reconnect.send(
358 FuturesHandlerCommand::RequestChallenge { payload },
359 ) {
360 log::error!("Failed to queue reconnect challenge: {e}");
361 } else {
362 pending_resubscribe = true;
363 }
364 }
365 Err(e) => {
366 log::error!("Failed to serialize reconnect challenge: {e}");
367 }
368 }
369 } else {
370 log::warn!(
371 "Private subscriptions exist but no credentials available"
372 );
373 }
374 }
375
376 if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
377 log::debug!("Output channel closed: {e}");
378 break;
379 }
380 }
381 Some(KrakenFuturesWsMessage::Challenge(challenge)) => {
382 let Some(ref cred) = credential_for_reconnect else {
383 log::warn!("Challenge received but no credentials configured");
384 auth_tracker_for_reconnect.fail("no credentials");
385 continue;
386 };
387
388 match cred.sign_ws_challenge(&challenge) {
389 Ok(signed) => {
390 *original_challenge_for_reconnect.write().await =
391 Some(challenge.clone());
392 *signed_challenge_for_reconnect.write().await =
393 Some(signed.clone());
394 auth_tracker_for_reconnect.succeed();
395 log::debug!("Signed WebSocket challenge");
396
397 if pending_resubscribe {
398 let payloads = subscription_payloads.read().await;
399 resubscribe_private(
400 &cmd_tx_for_reconnect,
401 &subscriptions,
402 &payloads,
403 cred,
404 challenge.as_str(),
405 signed.as_str(),
406 );
407 pending_resubscribe = false;
408 }
409 }
410 Err(e) => {
411 log::error!("Failed to sign challenge: {e}");
412 auth_tracker_for_reconnect.fail(e.to_string());
413 pending_resubscribe = false;
414 }
415 }
416 }
417 Some(msg) => {
418 if let Err(e) = out_tx.send(msg) {
419 log::debug!("Output channel closed: {e}");
420 break;
421 }
422 }
423 None => {
424 log::debug!("Handler stream ended");
425 break;
426 }
427 }
428 }
429
430 log::debug!("Futures handler task exiting");
431 });
432
433 self.task_handle = Some(Arc::new(stream_handle));
434
435 log::debug!("Futures WebSocket connected successfully");
436 Ok(())
437 }
438
439 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
441 log::debug!("Disconnecting Futures WebSocket");
442
443 self.signal.store(true, Ordering::Relaxed);
444
445 if let Err(e) = self
446 .cmd_tx
447 .read()
448 .await
449 .send(FuturesHandlerCommand::Disconnect)
450 {
451 log::debug!(
452 "Failed to send disconnect command (handler may already be shut down): {e}"
453 );
454 }
455
456 if let Some(task_handle) = self.task_handle.take() {
457 match Arc::try_unwrap(task_handle) {
458 Ok(handle) => {
459 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
460 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
461 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
462 Err(_) => {
463 log::warn!("Timeout waiting for task handle");
464 }
465 }
466 }
467 Err(arc_handle) => {
468 log::debug!("Cannot take ownership of task handle, aborting");
469 arc_handle.abort();
470 }
471 }
472 }
473
474 self.subscriptions.clear();
475 self.subscription_payloads.write().await.clear();
476 self.auth_tracker.fail("Disconnected");
477 Ok(())
478 }
479
480 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
482 self.disconnect().await
483 }
484
485 pub async fn subscribe_mark_price(
487 &self,
488 instrument_id: InstrumentId,
489 ) -> Result<(), KrakenWsError> {
490 let symbol = instrument_id.symbol;
491 let key = format!("mark:{symbol}");
492
493 if !self.subscriptions.add_reference(&key) {
494 return Ok(());
495 }
496
497 self.subscriptions.mark_subscribe(&key);
498 self.subscriptions.confirm_subscribe(&key);
499 self.ensure_ticker_subscribed(symbol).await
500 }
501
502 pub async fn unsubscribe_mark_price(
504 &self,
505 instrument_id: InstrumentId,
506 ) -> Result<(), KrakenWsError> {
507 let symbol = instrument_id.symbol;
508 let key = format!("mark:{symbol}");
509
510 if !self.subscriptions.remove_reference(&key) {
511 return Ok(());
512 }
513
514 self.subscriptions.mark_unsubscribe(&key);
515 self.subscriptions.confirm_unsubscribe(&key);
516 self.maybe_unsubscribe_ticker(symbol).await
517 }
518
519 pub async fn subscribe_index_price(
521 &self,
522 instrument_id: InstrumentId,
523 ) -> Result<(), KrakenWsError> {
524 let symbol = instrument_id.symbol;
525 let key = format!("index:{symbol}");
526
527 if !self.subscriptions.add_reference(&key) {
528 return Ok(());
529 }
530
531 self.subscriptions.mark_subscribe(&key);
532 self.subscriptions.confirm_subscribe(&key);
533 self.ensure_ticker_subscribed(symbol).await
534 }
535
536 pub async fn unsubscribe_index_price(
538 &self,
539 instrument_id: InstrumentId,
540 ) -> Result<(), KrakenWsError> {
541 let symbol = instrument_id.symbol;
542 let key = format!("index:{symbol}");
543
544 if !self.subscriptions.remove_reference(&key) {
545 return Ok(());
546 }
547
548 self.subscriptions.mark_unsubscribe(&key);
549 self.subscriptions.confirm_unsubscribe(&key);
550 self.maybe_unsubscribe_ticker(symbol).await
551 }
552
553 pub async fn subscribe_funding_rate(
555 &self,
556 instrument_id: InstrumentId,
557 ) -> Result<(), KrakenWsError> {
558 let symbol = instrument_id.symbol;
559 let key = format!("funding:{symbol}");
560
561 if !self.subscriptions.add_reference(&key) {
562 return Ok(());
563 }
564
565 self.subscriptions.mark_subscribe(&key);
566 self.subscriptions.confirm_subscribe(&key);
567 self.ensure_ticker_subscribed(symbol).await
568 }
569
570 pub async fn unsubscribe_funding_rate(
572 &self,
573 instrument_id: InstrumentId,
574 ) -> Result<(), KrakenWsError> {
575 let symbol = instrument_id.symbol;
576 let key = format!("funding:{symbol}");
577
578 if !self.subscriptions.remove_reference(&key) {
579 return Ok(());
580 }
581
582 self.subscriptions.mark_unsubscribe(&key);
583 self.subscriptions.confirm_unsubscribe(&key);
584 self.maybe_unsubscribe_ticker(symbol).await
585 }
586
587 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
591 let symbol = instrument_id.symbol;
592 let key = format!("quotes:{symbol}");
593
594 if !self.subscriptions.add_reference(&key) {
595 return Ok(());
596 }
597
598 self.subscriptions.mark_subscribe(&key);
599 self.subscriptions.confirm_subscribe(&key);
600
601 self.ensure_book_subscribed(symbol).await
603 }
604
605 pub async fn unsubscribe_quotes(
607 &self,
608 instrument_id: InstrumentId,
609 ) -> Result<(), KrakenWsError> {
610 let symbol = instrument_id.symbol;
611 let key = format!("quotes:{symbol}");
612
613 if !self.subscriptions.remove_reference(&key) {
614 return Ok(());
615 }
616
617 self.subscriptions.mark_unsubscribe(&key);
618 self.subscriptions.confirm_unsubscribe(&key);
619 self.maybe_unsubscribe_book(symbol).await
620 }
621
622 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
624 let symbol = instrument_id.symbol;
625 let key = format!("trades:{symbol}");
626
627 if !self.subscriptions.add_reference(&key) {
628 return Ok(());
629 }
630
631 self.subscriptions.mark_subscribe(&key);
632 let payload = self
633 .send_subscribe_feed(KrakenFuturesFeed::Trade, vec![symbol.to_string()])
634 .await?;
635 self.subscriptions.confirm_subscribe(&key);
636 self.subscription_payloads
637 .write()
638 .await
639 .insert(key, payload);
640 Ok(())
641 }
642
643 pub async fn unsubscribe_trades(
645 &self,
646 instrument_id: InstrumentId,
647 ) -> Result<(), KrakenWsError> {
648 let symbol = instrument_id.symbol;
649 let key = format!("trades:{symbol}");
650
651 if !self.subscriptions.remove_reference(&key) {
652 return Ok(());
653 }
654
655 self.subscriptions.mark_unsubscribe(&key);
656 self.send_unsubscribe_feed(KrakenFuturesFeed::Trade, vec![symbol.to_string()])
657 .await?;
658 self.subscriptions.confirm_unsubscribe(&key);
659 self.subscription_payloads.write().await.remove(&key);
660 Ok(())
661 }
662
663 pub async fn subscribe_book(
668 &self,
669 instrument_id: InstrumentId,
670 _depth: Option<u32>,
671 ) -> Result<(), KrakenWsError> {
672 let symbol = instrument_id.symbol;
673
674 let deltas_key = format!("deltas:{symbol}");
675 self.subscriptions.add_reference(&deltas_key);
676 self.subscriptions.mark_subscribe(&deltas_key);
677 self.subscriptions.confirm_subscribe(&deltas_key);
678
679 self.ensure_book_subscribed(symbol).await
680 }
681
682 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
684 let symbol = instrument_id.symbol;
685
686 let deltas_key = format!("deltas:{symbol}");
687 self.subscriptions.remove_reference(&deltas_key);
688 self.subscriptions.mark_unsubscribe(&deltas_key);
689 self.subscriptions.confirm_unsubscribe(&deltas_key);
690
691 self.maybe_unsubscribe_book(symbol).await
692 }
693
694 async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
695 let ticker_key = format!("ticker:{symbol}");
696
697 if !self.subscriptions.add_reference(&ticker_key) {
698 return Ok(());
699 }
700
701 self.subscriptions.mark_subscribe(&ticker_key);
702 let payload = self
703 .send_subscribe_feed(KrakenFuturesFeed::Ticker, vec![symbol.to_string()])
704 .await?;
705 self.subscriptions.confirm_subscribe(&ticker_key);
706 self.subscription_payloads
707 .write()
708 .await
709 .insert(ticker_key, payload);
710 Ok(())
711 }
712
713 async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
714 let ticker_key = format!("ticker:{symbol}");
715
716 if !self.subscriptions.remove_reference(&ticker_key) {
717 return Ok(());
718 }
719
720 self.subscriptions.mark_unsubscribe(&ticker_key);
721 self.send_unsubscribe_feed(KrakenFuturesFeed::Ticker, vec![symbol.to_string()])
722 .await?;
723 self.subscriptions.confirm_unsubscribe(&ticker_key);
724 self.subscription_payloads.write().await.remove(&ticker_key);
725 Ok(())
726 }
727
728 async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
729 let book_key = format!("book:{symbol}");
730
731 if !self.subscriptions.add_reference(&book_key) {
732 return Ok(());
733 }
734
735 self.subscriptions.mark_subscribe(&book_key);
736 let payload = self
737 .send_subscribe_feed(KrakenFuturesFeed::Book, vec![symbol.to_string()])
738 .await?;
739 self.subscriptions.confirm_subscribe(&book_key);
740 self.subscription_payloads
741 .write()
742 .await
743 .insert(book_key, payload);
744 Ok(())
745 }
746
747 async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
748 let book_key = format!("book:{symbol}");
749
750 if !self.subscriptions.remove_reference(&book_key) {
751 return Ok(());
752 }
753
754 self.subscriptions.mark_unsubscribe(&book_key);
755 self.send_unsubscribe_feed(KrakenFuturesFeed::Book, vec![symbol.to_string()])
756 .await?;
757 self.subscriptions.confirm_unsubscribe(&book_key);
758 self.subscription_payloads.write().await.remove(&book_key);
759 Ok(())
760 }
761
762 pub fn take_output_rx(
764 &mut self,
765 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
766 self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
767 }
768
769 pub async fn set_auth_credentials(
771 &self,
772 original_challenge: String,
773 signed_challenge: String,
774 ) -> Result<(), KrakenWsError> {
775 let _credential = self.credential.as_ref().ok_or_else(|| {
776 KrakenWsError::AuthenticationError("API credentials required".to_string())
777 })?;
778
779 *self.original_challenge.write().await = Some(original_challenge);
780 *self.signed_challenge.write().await = Some(signed_challenge);
781 self.auth_tracker.succeed();
782
783 Ok(())
784 }
785
786 pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
790 let credential = self.credential.as_ref().ok_or_else(|| {
791 KrakenWsError::AuthenticationError("API credentials required".to_string())
792 })?;
793
794 credential.sign_ws_challenge(challenge).map_err(|e| {
795 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
796 })
797 }
798
799 pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
801 let credential = self.credential.as_ref().ok_or_else(|| {
802 KrakenWsError::AuthenticationError("API credentials required".to_string())
803 })?;
804
805 let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
806 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
807 })?;
808
809 self.set_auth_credentials(challenge.to_string(), signed_challenge)
810 .await
811 }
812
813 pub fn set_account_id(&self, account_id: AccountId) {
815 if let Ok(mut guard) = self.account_id.write() {
816 *guard = Some(account_id);
817 }
818 }
819
820 #[must_use]
822 pub fn account_id(&self) -> Option<AccountId> {
823 self.account_id.read().ok().and_then(|g| *g)
824 }
825
826 #[must_use]
828 pub fn account_id_shared(&self) -> &Arc<RwLock<Option<AccountId>>> {
829 &self.account_id
830 }
831
832 #[must_use]
834 pub fn truncated_id_map(&self) -> &Arc<AtomicMap<String, ClientOrderId>> {
835 &self.truncated_id_map
836 }
837
838 #[must_use]
840 pub fn order_instrument_map(&self) -> &Arc<AtomicMap<String, InstrumentId>> {
841 &self.order_instrument_map
842 }
843
844 #[must_use]
846 pub fn instruments_shared(&self) -> &Arc<AtomicMap<InstrumentId, InstrumentAny>> {
847 &self.instruments
848 }
849
850 #[must_use]
852 pub fn subscriptions(&self) -> &SubscriptionState {
853 &self.subscriptions
854 }
855
856 pub fn cache_instrument(&self, instrument: InstrumentAny) {
858 self.instruments.insert(instrument.id(), instrument);
859 }
860
861 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
863 self.instruments.rcu(|m| {
864 for instrument in instruments {
865 m.insert(instrument.id(), instrument.clone());
866 }
867 });
868 }
869
870 pub fn cache_client_order(
876 &self,
877 client_order_id: ClientOrderId,
878 venue_order_id: Option<VenueOrderId>,
879 instrument_id: InstrumentId,
880 _trader_id: TraderId,
881 _strategy_id: StrategyId,
882 ) {
883 let truncated = truncate_cl_ord_id(&client_order_id);
884
885 if truncated != client_order_id.as_str() {
886 self.truncated_id_map.insert(truncated, client_order_id);
887 }
888
889 if let Some(venue_id) = venue_order_id {
890 self.order_instrument_map
891 .insert(venue_id.to_string(), instrument_id);
892 }
893 }
894
895 pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
897 let key = "open_orders";
898 if !self.subscriptions.add_reference(key) {
899 return Ok(());
900 }
901
902 self.subscriptions.mark_subscribe(key);
903 let payload = self
904 .send_private_subscribe_feed(KrakenFuturesFeed::OpenOrders)
905 .await?;
906 self.subscriptions.confirm_subscribe(key);
907 self.subscription_payloads
908 .write()
909 .await
910 .insert(key.to_string(), payload);
911 Ok(())
912 }
913
914 pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
916 let key = "fills";
917 if !self.subscriptions.add_reference(key) {
918 return Ok(());
919 }
920
921 self.subscriptions.mark_subscribe(key);
922 let payload = self
923 .send_private_subscribe_feed(KrakenFuturesFeed::Fills)
924 .await?;
925 self.subscriptions.confirm_subscribe(key);
926 self.subscription_payloads
927 .write()
928 .await
929 .insert(key.to_string(), payload);
930 Ok(())
931 }
932
933 pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
935 self.subscribe_open_orders().await?;
936 self.subscribe_fills().await?;
937 Ok(())
938 }
939
940 async fn send_subscribe_feed(
941 &self,
942 feed: KrakenFuturesFeed,
943 product_ids: Vec<String>,
944 ) -> Result<String, KrakenWsError> {
945 let request = KrakenFuturesRequest {
946 event: KrakenFuturesEvent::Subscribe,
947 feed,
948 product_ids,
949 };
950 let payload =
951 serde_json::to_string(&request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
952 self.cmd_tx
953 .read()
954 .await
955 .send(FuturesHandlerCommand::Subscribe {
956 payload: payload.clone(),
957 })
958 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
959 Ok(payload)
960 }
961
962 async fn send_unsubscribe_feed(
963 &self,
964 feed: KrakenFuturesFeed,
965 product_ids: Vec<String>,
966 ) -> Result<(), KrakenWsError> {
967 let request = KrakenFuturesRequest {
968 event: KrakenFuturesEvent::Unsubscribe,
969 feed,
970 product_ids,
971 };
972 let payload =
973 serde_json::to_string(&request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
974 self.cmd_tx
975 .read()
976 .await
977 .send(FuturesHandlerCommand::Unsubscribe { payload })
978 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
979 Ok(())
980 }
981
982 async fn send_private_subscribe_feed(
983 &self,
984 feed: KrakenFuturesFeed,
985 ) -> Result<String, KrakenWsError> {
986 let credential = self.credential.as_ref().ok_or_else(|| {
987 KrakenWsError::AuthenticationError("API credentials required".to_string())
988 })?;
989 let original_challenge = self
990 .original_challenge
991 .read()
992 .await
993 .clone()
994 .ok_or_else(|| {
995 KrakenWsError::AuthenticationError(
996 "Must authenticate before subscribing to private feeds".to_string(),
997 )
998 })?;
999 let signed_challenge = self.signed_challenge.read().await.clone().ok_or_else(|| {
1000 KrakenWsError::AuthenticationError(
1001 "Must authenticate before subscribing to private feeds".to_string(),
1002 )
1003 })?;
1004
1005 let request = KrakenFuturesPrivateSubscribeRequest {
1006 event: KrakenFuturesEvent::Subscribe,
1007 feed,
1008 api_key: credential.api_key().to_string(),
1009 original_challenge,
1010 signed_challenge,
1011 };
1012 let payload =
1013 serde_json::to_string(&request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
1014 self.cmd_tx
1015 .read()
1016 .await
1017 .send(FuturesHandlerCommand::Subscribe {
1018 payload: payload.clone(),
1019 })
1020 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
1021 Ok(payload)
1022 }
1023}
1024
1025fn update_private_payload_credentials(
1026 payload: &str,
1027 api_key: &str,
1028 original_challenge: &str,
1029 signed_challenge: &str,
1030) -> Option<String> {
1031 let mut value: serde_json::Value = serde_json::from_str(payload).ok()?;
1032 let obj = value.as_object_mut()?;
1033 obj.insert(
1034 "api_key".to_string(),
1035 serde_json::Value::String(api_key.to_string()),
1036 );
1037 obj.insert(
1038 "original_challenge".to_string(),
1039 serde_json::Value::String(original_challenge.to_string()),
1040 );
1041 obj.insert(
1042 "signed_challenge".to_string(),
1043 serde_json::Value::String(signed_challenge.to_string()),
1044 );
1045 serde_json::to_string(&value).ok()
1046}
1047
1048fn build_challenge_payload(credential: &KrakenCredential) -> serde_json::Result<String> {
1049 let request = KrakenFuturesChallengeRequest {
1050 event: KrakenFuturesEvent::Challenge,
1051 api_key: credential.api_key().to_string(),
1052 };
1053 serde_json::to_string(&request)
1054}
1055
/// Returns `true` when `key` names one of the private feeds (`open_orders` or `fills`).
///
/// The comparison is exact and case-sensitive.
fn is_private_feed_key(key: &str) -> bool {
    matches!(key, "open_orders" | "fills")
}
1059
1060fn resubscribe_public(
1061 cmd_tx: &tokio::sync::mpsc::UnboundedSender<FuturesHandlerCommand>,
1062 subscriptions: &SubscriptionState,
1063 payloads: &HashMap<String, String>,
1064) {
1065 for (key, payload) in payloads {
1066 if is_private_feed_key(key) {
1067 continue;
1068 }
1069
1070 if let Err(e) = cmd_tx.send(FuturesHandlerCommand::Subscribe {
1071 payload: payload.clone(),
1072 }) {
1073 log::error!("Failed to send resubscribe: error={e}, topic={key}");
1074 continue;
1075 }
1076
1077 subscriptions.mark_subscribe(key);
1078 }
1079}
1080
1081fn resubscribe_private(
1082 cmd_tx: &tokio::sync::mpsc::UnboundedSender<FuturesHandlerCommand>,
1083 subscriptions: &SubscriptionState,
1084 payloads: &HashMap<String, String>,
1085 credential: &KrakenCredential,
1086 original_challenge: &str,
1087 signed_challenge: &str,
1088) {
1089 for (key, payload) in payloads {
1090 if !is_private_feed_key(key) {
1091 continue;
1092 }
1093
1094 let Some(updated) = update_private_payload_credentials(
1095 payload,
1096 credential.api_key(),
1097 original_challenge,
1098 signed_challenge,
1099 ) else {
1100 log::error!("Failed to update private payload for {key}");
1101 continue;
1102 };
1103
1104 if let Err(e) = cmd_tx.send(FuturesHandlerCommand::Subscribe { payload: updated }) {
1105 log::error!("Failed to send resubscribe: error={e}, topic={key}");
1106 continue;
1107 }
1108
1109 subscriptions.mark_subscribe(key);
1110 }
1111}
1112
#[cfg(test)]
mod tests {
    use base64::{Engine, engine::general_purpose::STANDARD};
    use nautilus_network::websocket::AuthTracker;
    use rstest::rstest;

    use super::*;

    // Fixture values shared by several tests in this module.
    const WS_URL: &str = "wss://futures.kraken.com/ws/v1";
    const TRADES_KEY: &str = "trades:PI_XBTUSD";
    const TRADES_PAYLOAD: &str =
        r#"{"event":"subscribe","feed":"trade","product_ids":["PI_XBTUSD"]}"#;
    const OPEN_ORDERS_KEY: &str = "open_orders";
    const OPEN_ORDERS_PAYLOAD: &str = r#"{"event":"subscribe","feed":"open_orders"}"#;
    const OPEN_ORDERS_BLANK_AUTH_PAYLOAD: &str = r#"{"event":"subscribe","feed":"open_orders","api_key":"","original_challenge":"","signed_challenge":""}"#;

    /// Builds a credential with API key `test_key` and a base64-encoded dummy secret.
    fn test_credential() -> KrakenCredential {
        KrakenCredential::new("test_key", STANDARD.encode(b"test_secret_key_24bytes!"))
    }

    /// Builds a client through `new`, passing `None` as its final argument.
    fn client_without_credentials() -> KrakenFuturesWebSocketClient {
        KrakenFuturesWebSocketClient::new(WS_URL.to_string(), 60, None)
    }

    /// Builds a client through `with_credentials`, supplying [`test_credential`].
    fn client_with_credentials() -> KrakenFuturesWebSocketClient {
        KrakenFuturesWebSocketClient::with_credentials(
            WS_URL.to_string(),
            60,
            Some(test_credential()),
            TransportBackend::default(),
            None,
        )
    }

    // The challenge payload must carry the `challenge` event and the API key.
    #[rstest]
    fn test_build_challenge_payload_emits_expected_event() {
        let cred = test_credential();
        let json = build_challenge_payload(&cred).expect("serializes");

        assert!(json.contains(r#""event":"challenge""#));
        assert!(json.contains(r#""api_key":"test_key""#));
    }

    // With one public and one private payload stored, exactly one Subscribe
    // command is expected and it must be the public one.
    #[rstest]
    #[tokio::test]
    async fn test_resubscribe_public_skips_private_feeds() {
        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
        let subscriptions = SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER);
        let payloads = HashMap::from([
            (TRADES_KEY.to_string(), TRADES_PAYLOAD.to_string()),
            (OPEN_ORDERS_KEY.to_string(), OPEN_ORDERS_PAYLOAD.to_string()),
        ]);

        resubscribe_public(&cmd_tx, &subscriptions, &payloads);

        // Collect Subscribe payloads until the channel is empty or yields anything else.
        let sent: Vec<_> = std::iter::from_fn(|| match cmd_rx.try_recv() {
            Ok(FuturesHandlerCommand::Subscribe { payload }) => Some(payload),
            _ => None,
        })
        .collect();

        assert_eq!(sent.len(), 1, "only the public feed should resubscribe");
        assert!(sent[0].contains("PI_XBTUSD"));
    }

    // A lone public payload must come back as a Subscribe command for its product.
    #[rstest]
    #[tokio::test]
    async fn test_resubscribe_public_restores_publics_even_with_credentialed_client() {
        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
        let subscriptions = SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER);
        let payloads = HashMap::from([(TRADES_KEY.to_string(), TRADES_PAYLOAD.to_string())]);

        resubscribe_public(&cmd_tx, &subscriptions, &payloads);

        let first = cmd_rx.try_recv().expect("public subscribe expected");
        match first {
            FuturesHandlerCommand::Subscribe { payload } => {
                assert!(payload.contains("PI_XBTUSD"));
            }
            other => panic!("expected Subscribe, was {other:?}"),
        }
    }

    // Only the private payload is re-sent, with the API key and both challenge
    // values filled in.
    #[rstest]
    #[tokio::test]
    async fn test_resubscribe_private_patches_credentials() {
        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<FuturesHandlerCommand>();
        let subscriptions = SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER);
        let credential = test_credential();
        let payloads = HashMap::from([
            (
                OPEN_ORDERS_KEY.to_string(),
                OPEN_ORDERS_BLANK_AUTH_PAYLOAD.to_string(),
            ),
            (TRADES_KEY.to_string(), TRADES_PAYLOAD.to_string()),
        ]);

        resubscribe_private(
            &cmd_tx,
            &subscriptions,
            &payloads,
            &credential,
            "server-challenge",
            "signed-value",
        );

        // Collect Subscribe payloads until the channel is empty or yields anything else.
        let sent: Vec<_> = std::iter::from_fn(|| match cmd_rx.try_recv() {
            Ok(FuturesHandlerCommand::Subscribe { payload }) => Some(payload),
            _ => None,
        })
        .collect();

        assert_eq!(sent.len(), 1, "only the private feed should resubscribe");

        let json: serde_json::Value =
            serde_json::from_str(&sent[0]).expect("payload is valid JSON");
        assert_eq!(json["event"], "subscribe");
        assert_eq!(json["feed"], "open_orders");
        assert_eq!(json["api_key"], "test_key");
        assert_eq!(json["original_challenge"], "server-challenge");
        assert_eq!(json["signed_challenge"], "signed-value");
    }

    // `succeed()` called from another task must complete a pending wait.
    #[rstest]
    #[tokio::test]
    async fn test_auth_tracker_succeed_completes_wait_for_result() {
        let tracker = AuthTracker::new();
        let receiver = tracker.begin();
        let responder = tracker.clone();

        // Report success shortly after the wait below has started.
        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
            responder.succeed();
        });

        let timeout = tokio::time::Duration::from_secs(1);
        let outcome = tracker
            .wait_for_result::<KrakenWsError>(timeout, receiver)
            .await;
        outcome.expect("auth should succeed");

        assert!(tracker.is_authenticated());
    }

    // With nobody resolving the tracker, the wait must end in an authentication error.
    #[rstest]
    #[tokio::test]
    async fn test_auth_tracker_wait_for_result_times_out() {
        let tracker = AuthTracker::new();
        let receiver = tracker.begin();

        let timeout = tokio::time::Duration::from_millis(20);
        let outcome = tracker
            .wait_for_result::<KrakenWsError>(timeout, receiver)
            .await;
        let err = outcome.expect_err("should time out");

        assert!(matches!(err, KrakenWsError::AuthenticationError(_)));
        assert!(!tracker.is_authenticated());
    }

    // `authenticate` on a client built without credentials must report that they are required.
    #[rstest]
    #[tokio::test]
    async fn test_authenticate_without_credentials_errors() {
        let client = client_without_credentials();

        let err = client.authenticate().await.expect_err("should fail");

        assert!(
            matches!(err, KrakenWsError::AuthenticationError(ref msg) if msg.contains("API credentials required")),
            "unexpected error: {err:?}"
        );
    }

    // Setting the challenge pair flips the client to authenticated, after which
    // `wait_until_authenticated` succeeds within a short timeout.
    #[rstest]
    #[tokio::test]
    async fn test_set_auth_credentials_marks_tracker_authenticated() {
        let client = client_with_credentials();
        assert!(!client.is_authenticated());

        let original = "orig-challenge".to_string();
        let signed = "signed-challenge".to_string();
        client
            .set_auth_credentials(original, signed)
            .await
            .expect("should succeed");

        assert!(client.is_authenticated());

        let waited = client.wait_until_authenticated(0.05).await;
        waited.expect("should return immediately");
    }

    // Without credentials the call fails and the client stays unauthenticated.
    #[rstest]
    #[tokio::test]
    async fn test_set_auth_credentials_without_credentials_errors() {
        let client = client_without_credentials();

        let outcome = client
            .set_auth_credentials("orig".to_string(), "signed".to_string())
            .await;
        let err = outcome.expect_err("should fail");

        assert!(matches!(err, KrakenWsError::AuthenticationError(_)));
        assert!(!client.is_authenticated());
    }

    // Supplying a server challenge to a credentialed client marks it authenticated.
    #[rstest]
    #[tokio::test]
    async fn test_authenticate_with_challenge_updates_state() {
        let client = client_with_credentials();

        let outcome = client.authenticate_with_challenge("server-challenge").await;
        outcome.expect("should succeed");

        assert!(client.is_authenticated());
    }

    // A clone setting credentials from another task must unblock the waiter.
    #[rstest]
    #[tokio::test]
    async fn test_wait_until_authenticated_resolves_after_success() {
        let client = client_with_credentials();
        let responder = client.clone();

        // Provide the credentials shortly after the wait below has started.
        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
            let outcome = responder
                .set_auth_credentials("orig".to_string(), "signed".to_string())
                .await;
            outcome.expect("succeeds");
        });

        let waited = client.wait_until_authenticated(1.0).await;
        waited.expect("should resolve once credentials are set");
    }

    // With nothing authenticating the client, the wait must end in an authentication error.
    #[rstest]
    #[tokio::test]
    async fn test_wait_until_authenticated_times_out() {
        let client = client_with_credentials();

        let waited = client.wait_until_authenticated(0.05).await;
        let err = waited.expect_err("should time out");

        assert!(matches!(err, KrakenWsError::AuthenticationError(_)));
    }
}