Skip to main content

nautilus_kraken/websocket/futures/
handler.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! WebSocket message handler for Kraken Futures.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use nautilus_network::{
27    RECONNECTED,
28    websocket::{SubscriptionState, WebSocketClient},
29};
30use serde::Deserialize;
31use serde_json::Value;
32use tokio_tungstenite::tungstenite::Message;
33use ustr::Ustr;
34
35use super::messages::{
36    KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChannel,
37    KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrdersCancel,
38    KrakenFuturesOpenOrdersDelta, KrakenFuturesTickerData, KrakenFuturesTradeData,
39    KrakenFuturesWsMessage, classify_futures_message,
40};
41
42/// Commands sent from the outer client to the inner message handler.
43#[derive(Debug)]
44pub enum FuturesHandlerCommand {
45    SetClient(WebSocketClient),
46    Disconnect,
47    Subscribe { payload: String },
48    Unsubscribe { payload: String },
49    RequestChallenge { payload: String },
50}
51
52/// WebSocket message handler for Kraken Futures.
53pub struct FuturesFeedHandler {
54    signal: Arc<AtomicBool>,
55    inner: Option<WebSocketClient>,
56    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<FuturesHandlerCommand>,
57    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
58    subscriptions: SubscriptionState,
59    pending_messages: VecDeque<KrakenFuturesWsMessage>,
60}
61
62impl FuturesFeedHandler {
63    /// Creates a new [`FuturesFeedHandler`] instance.
64    pub fn new(
65        signal: Arc<AtomicBool>,
66        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<FuturesHandlerCommand>,
67        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
68        subscriptions: SubscriptionState,
69    ) -> Self {
70        Self {
71            signal,
72            inner: None,
73            cmd_rx,
74            raw_rx,
75            subscriptions,
76            pending_messages: VecDeque::new(),
77        }
78    }
79
80    pub fn is_stopped(&self) -> bool {
81        self.signal.load(Ordering::Relaxed)
82    }
83
84    fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
85        let channel_ustr = Ustr::from(channel.as_ref());
86        self.subscriptions.is_subscribed(&channel_ustr, symbol)
87    }
88
89    /// Processes messages and commands, returning when stopped or stream ends.
90    pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
91        if let Some(msg) = self.pending_messages.pop_front() {
92            return Some(msg);
93        }
94
95        loop {
96            tokio::select! {
97                Some(cmd) = self.cmd_rx.recv() => {
98                    match cmd {
99                        FuturesHandlerCommand::SetClient(client) => {
100                            log::debug!("WebSocketClient received by futures handler");
101                            self.inner = Some(client);
102                        }
103                        FuturesHandlerCommand::Disconnect => {
104                            log::debug!("Disconnect command received");
105
106                            if let Some(client) = self.inner.take() {
107                                client.disconnect().await;
108                            }
109                            return None;
110                        }
111                        FuturesHandlerCommand::Subscribe { payload }
112                        | FuturesHandlerCommand::Unsubscribe { payload }
113                        | FuturesHandlerCommand::RequestChallenge { payload } => {
114                            if let Some(ref client) = self.inner
115                                && let Err(e) = client.send_text(payload, None).await
116                            {
117                                log::error!("Failed to send text: {e}");
118                            }
119                        }
120                    }
121                }
122
123                msg = self.raw_rx.recv() => {
124                    let msg = match msg {
125                        Some(msg) => msg,
126                        None => {
127                            log::debug!("WebSocket stream closed");
128                            return None;
129                        }
130                    };
131
132                    if self.signal.load(Ordering::Relaxed) {
133                        log::debug!("Stop signal received");
134                        return None;
135                    }
136
137                    match &msg {
138                        Message::Ping(data) => {
139                            let len = data.len();
140                            log::trace!("Received ping frame with {len} bytes");
141
142                            if let Some(client) = &self.inner
143                                && let Err(e) = client.send_pong(data.to_vec()).await
144                            {
145                                log::warn!("Failed to send pong frame: {e}");
146                            }
147                            continue;
148                        }
149                        Message::Pong(_) => {
150                            log::debug!("Received pong from server");
151                            continue;
152                        }
153                        Message::Close(_) => {
154                            log::info!("WebSocket connection closed");
155                            return None;
156                        }
157                        Message::Frame(_) => {
158                            log::trace!("Received raw frame");
159                            continue;
160                        }
161                        _ => {}
162                    }
163
164                    let text: &str = match &msg {
165                        Message::Text(text) => text,
166                        Message::Binary(data) => match std::str::from_utf8(data) {
167                            Ok(s) => s,
168                            Err(_) => continue,
169                        },
170                        _ => continue,
171                    };
172
173                    if text == RECONNECTED {
174                        log::info!("Received WebSocket reconnected signal");
175                        return Some(KrakenFuturesWsMessage::Reconnected);
176                    }
177
178                    self.parse_message(text);
179
180                    if let Some(msg) = self.pending_messages.pop_front() {
181                        return Some(msg);
182                    }
183                }
184            }
185        }
186    }
187
188    fn parse_message(&mut self, text: &str) {
189        let value: Value = match serde_json::from_str(text) {
190            Ok(v) => v,
191            Err(e) => {
192                log::debug!("Failed to parse message as JSON: {e}");
193                return;
194            }
195        };
196
197        match classify_futures_message(&value) {
198            KrakenFuturesMessageType::OpenOrdersSnapshot => {
199                log::debug!(
200                    "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
201                );
202            }
203            KrakenFuturesMessageType::OpenOrdersCancel => {
204                self.handle_open_orders_cancel_value(value);
205            }
206            KrakenFuturesMessageType::OpenOrdersDelta => {
207                self.handle_open_orders_delta_value(value);
208            }
209            KrakenFuturesMessageType::FillsSnapshot => {
210                log::debug!("Skipping fills_snapshot (REST reconciliation handles initial state)");
211            }
212            KrakenFuturesMessageType::FillsDelta => {
213                self.handle_fills_delta_value(value);
214            }
215            KrakenFuturesMessageType::Ticker => {
216                self.handle_ticker_message_value(value);
217            }
218            KrakenFuturesMessageType::TradeSnapshot => {
219                log::debug!("Skipping trade_snapshot (only streaming live trades)");
220            }
221            KrakenFuturesMessageType::Trade => {
222                self.handle_trade_message_value(value);
223            }
224            KrakenFuturesMessageType::BookSnapshot => {
225                self.handle_book_snapshot_value(value);
226            }
227            KrakenFuturesMessageType::BookDelta => {
228                self.handle_book_delta_value(value);
229            }
230            KrakenFuturesMessageType::Info => {
231                log::debug!("Received info message: {text}");
232            }
233            KrakenFuturesMessageType::Pong => {
234                log::debug!("Received text pong response");
235            }
236            KrakenFuturesMessageType::Subscribed => {
237                log::debug!("Subscription confirmed: {text}");
238            }
239            KrakenFuturesMessageType::Unsubscribed => {
240                log::debug!("Unsubscription confirmed: {text}");
241            }
242            KrakenFuturesMessageType::Challenge => {
243                self.handle_challenge_response_value(value);
244            }
245            KrakenFuturesMessageType::Heartbeat => {
246                log::trace!("Heartbeat received");
247            }
248            KrakenFuturesMessageType::Error => {
249                let message = value
250                    .get("message")
251                    .and_then(|v| v.as_str())
252                    .unwrap_or("Unknown error");
253                log::error!("Kraken Futures WebSocket error: {message}");
254            }
255            KrakenFuturesMessageType::Alert => {
256                let message = value
257                    .get("message")
258                    .and_then(|v| v.as_str())
259                    .unwrap_or("Unknown alert");
260                log::warn!("Kraken Futures WebSocket alert: {message}");
261            }
262            KrakenFuturesMessageType::Unknown => {
263                log::warn!("Unhandled futures message: {text}");
264            }
265        }
266    }
267
268    fn handle_challenge_response_value(&mut self, value: Value) {
269        #[derive(Deserialize)]
270        struct ChallengeResponse {
271            message: String,
272        }
273
274        match serde_json::from_value::<ChallengeResponse>(value) {
275            Ok(response) => {
276                let len = response.message.len();
277                log::debug!("Challenge received, length: {len}");
278
279                self.pending_messages
280                    .push_back(KrakenFuturesWsMessage::Challenge(response.message));
281            }
282            Err(e) => {
283                log::error!("Failed to parse challenge response: {e}");
284            }
285        }
286    }
287
288    fn handle_ticker_message_value(&mut self, value: Value) {
289        let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
290            Ok(t) => t,
291            Err(e) => {
292                log::debug!("Failed to parse ticker: {e}");
293                return;
294            }
295        };
296
297        self.pending_messages
298            .push_back(KrakenFuturesWsMessage::Ticker(ticker));
299    }
300
301    fn handle_trade_message_value(&mut self, value: Value) {
302        let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
303            Ok(t) => t,
304            Err(e) => {
305                log::warn!("Failed to parse trade: {e}");
306                return;
307            }
308        };
309
310        if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
311            log::debug!(
312                "Received trade for unsubscribed product: {}",
313                trade.product_id
314            );
315            return;
316        }
317
318        self.pending_messages
319            .push_back(KrakenFuturesWsMessage::Trade(trade));
320    }
321
322    fn handle_book_snapshot_value(&mut self, value: Value) {
323        let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
324            Ok(s) => s,
325            Err(e) => {
326                log::warn!("Failed to parse book snapshot: {e}");
327                return;
328            }
329        };
330
331        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
332        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
333
334        if !has_book && !has_quotes {
335            log::debug!(
336                "Received book snapshot for unsubscribed product: {}",
337                snapshot.product_id
338            );
339            return;
340        }
341
342        self.pending_messages
343            .push_back(KrakenFuturesWsMessage::BookSnapshot(snapshot));
344    }
345
346    fn handle_book_delta_value(&mut self, value: Value) {
347        let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
348            Ok(d) => d,
349            Err(e) => {
350                log::warn!("Failed to parse book delta: {e}");
351                return;
352            }
353        };
354
355        let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
356        let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
357
358        if !has_book && !has_quotes {
359            log::debug!(
360                "Received book delta for unsubscribed product: {}",
361                delta.product_id
362            );
363            return;
364        }
365
366        self.pending_messages
367            .push_back(KrakenFuturesWsMessage::BookDelta(delta));
368    }
369
370    fn handle_open_orders_delta_value(&mut self, value: Value) {
371        let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
372            Ok(d) => d,
373            Err(e) => {
374                log::error!("Failed to parse open_orders delta: {e}");
375                return;
376            }
377        };
378
379        log::debug!(
380            "Received open_orders delta: order_id={}, is_cancel={}, reason={:?}",
381            delta.order.order_id,
382            delta.is_cancel,
383            delta.reason
384        );
385
386        self.pending_messages
387            .push_back(KrakenFuturesWsMessage::OpenOrdersDelta(delta));
388    }
389
390    fn handle_open_orders_cancel_value(&mut self, value: Value) {
391        let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
392            Ok(c) => c,
393            Err(e) => {
394                log::error!("Failed to parse open_orders cancel: {e}");
395                return;
396            }
397        };
398
399        log::debug!(
400            "Received open_orders cancel: order_id={}, cli_ord_id={:?}, reason={:?}",
401            cancel.order_id,
402            cancel.cli_ord_id,
403            cancel.reason
404        );
405
406        self.pending_messages
407            .push_back(KrakenFuturesWsMessage::OpenOrdersCancel(cancel));
408    }
409
410    fn handle_fills_delta_value(&mut self, value: Value) {
411        let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
412            Ok(d) => d,
413            Err(e) => {
414                log::error!("Failed to parse fills delta: {e}");
415                return;
416            }
417        };
418
419        log::debug!("Received fills delta: fill_count={}", delta.fills.len());
420
421        self.pending_messages
422            .push_back(KrakenFuturesWsMessage::FillsDelta(delta));
423    }
424}
425
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Builds a handler with an unset stop signal, fresh channels and no subscriptions.
    fn create_test_handler() -> FuturesFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();

        FuturesFeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            cmd_rx,
            raw_rx,
            SubscriptionState::new(':'),
        )
    }

    /// Marks `topic` as subscribed and confirmed on the handler's subscription state.
    fn subscribe(handler: &mut FuturesFeedHandler, topic: &str) {
        handler.subscriptions.mark_subscribe(topic);
        handler.subscriptions.confirm_subscribe(topic);
    }

    /// Parses `json` and returns the single message the handler must have queued.
    fn parse_single(handler: &mut FuturesFeedHandler, json: &str) -> KrakenFuturesWsMessage {
        handler.parse_message(json);

        assert_eq!(handler.pending_messages.len(), 1);
        handler.pending_messages.pop_front().unwrap()
    }

    #[rstest]
    fn test_parse_ticker_emits_ticker_message() {
        let mut handler = create_test_handler();
        let json = include_str!("../../../test_data/ws_futures_ticker.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::Ticker(ticker) => {
                assert_eq!(ticker.product_id, Ustr::from("PI_XBTUSD"));
                assert_eq!(ticker.bid, Some(21978.5));
                assert_eq!(ticker.ask, Some(21987.0));
            }
            msg => panic!("Expected Ticker message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_trade_emits_trade_message() {
        let mut handler = create_test_handler();
        subscribe(&mut handler, "trades:PI_XBTUSD");
        let json = include_str!("../../../test_data/ws_futures_trade.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::Trade(trade) => {
                assert_eq!(trade.product_id, Ustr::from("PI_XBTUSD"));
                assert_eq!(trade.price, 34969.5);
                assert_eq!(trade.qty, 15000.0);
            }
            msg => panic!("Expected Trade message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_trade_filters_unsubscribed() {
        let mut handler = create_test_handler();

        handler.parse_message(include_str!("../../../test_data/ws_futures_trade.json"));

        assert!(
            handler.pending_messages.is_empty(),
            "Trade for unsubscribed product should be filtered"
        );
    }

    #[rstest]
    fn test_parse_book_snapshot_emits_book_snapshot() {
        let mut handler = create_test_handler();
        subscribe(&mut handler, "book:PI_XBTUSD");
        let json = include_str!("../../../test_data/ws_futures_book_snapshot.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::BookSnapshot(snapshot) => {
                assert_eq!(snapshot.product_id, Ustr::from("PI_XBTUSD"));
                assert_eq!(snapshot.bids.len(), 2);
                assert_eq!(snapshot.asks.len(), 2);
            }
            msg => panic!("Expected BookSnapshot message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_book_snapshot_filters_unsubscribed() {
        let mut handler = create_test_handler();

        handler.parse_message(include_str!(
            "../../../test_data/ws_futures_book_snapshot.json"
        ));

        assert!(
            handler.pending_messages.is_empty(),
            "Book snapshot for unsubscribed product should be filtered"
        );
    }

    #[rstest]
    fn test_parse_book_delta_emits_book_delta() {
        let mut handler = create_test_handler();
        subscribe(&mut handler, "book:PI_XBTUSD");
        let json = include_str!("../../../test_data/ws_futures_book_delta.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::BookDelta(delta) => {
                assert_eq!(delta.product_id, Ustr::from("PI_XBTUSD"));
                assert_eq!(delta.price, 34981.0);
            }
            msg => panic!("Expected BookDelta message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_book_delta_filters_unsubscribed() {
        let mut handler = create_test_handler();

        handler.parse_message(include_str!(
            "../../../test_data/ws_futures_book_delta.json"
        ));

        assert!(
            handler.pending_messages.is_empty(),
            "Book delta for unsubscribed product should be filtered"
        );
    }

    #[rstest]
    fn test_parse_open_orders_cancel_emits_cancel() {
        let mut handler = create_test_handler();
        let json = include_str!("../../../test_data/ws_futures_open_orders_cancel.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::OpenOrdersCancel(cancel) => {
                assert_eq!(cancel.order_id, "660c6b23-8007-48c1-a7c9-4893f4572e8c");
                assert!(cancel.is_cancel);
            }
            msg => panic!("Expected OpenOrdersCancel message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_open_orders_delta_emits_delta() {
        let mut handler = create_test_handler();
        let json = include_str!("../../../test_data/ws_futures_open_orders_delta.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::OpenOrdersDelta(delta) => {
                assert_eq!(delta.order.instrument, Ustr::from("PI_XBTUSD"));
                assert!(!delta.is_cancel);
            }
            msg => panic!("Expected OpenOrdersDelta message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_fills_delta_emits_fills() {
        let mut handler = create_test_handler();
        let json = include_str!("../../../test_data/ws_futures_fills_delta.json");

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::FillsDelta(fills) => {
                assert_eq!(fills.fills.len(), 1);
                assert_eq!(
                    fills.fills[0].fill_id,
                    "6a22a3fb-e18e-4e76-b841-8689735c9158"
                );
            }
            msg => panic!("Expected FillsDelta message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_parse_challenge_emits_challenge_message() {
        let mut handler = create_test_handler();
        let json = r#"{"event":"challenge","message":"server-challenge-abc"}"#;

        match parse_single(&mut handler, json) {
            KrakenFuturesWsMessage::Challenge(challenge) => {
                assert_eq!(challenge, "server-challenge-abc");
            }
            msg => panic!("Expected Challenge message, was {msg:?}"),
        }
    }

    #[rstest]
    fn test_heartbeat_produces_no_message() {
        let mut handler = create_test_handler();

        handler.parse_message(r#"{"feed":"heartbeat","time":1700000000000}"#);

        assert!(handler.pending_messages.is_empty());
    }

    #[rstest]
    fn test_info_event_produces_no_message() {
        let mut handler = create_test_handler();

        handler.parse_message(r#"{"event":"info","version":1}"#);

        assert!(handler.pending_messages.is_empty());
    }
}