Skip to main content

nautilus_kraken/websocket/spot_v2/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Kraken Spot v2.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use nautilus_network::{
27    RECONNECTED,
28    websocket::{SubscriptionState, WebSocketClient},
29};
30use serde_json::Value;
31use tokio_tungstenite::tungstenite::Message;
32
33use super::{
34    enums::{KrakenWsChannel, KrakenWsMessageType},
35    messages::{
36        KrakenSpotWsMessage, KrakenWsBookData, KrakenWsExecutionData, KrakenWsMessage,
37        KrakenWsOhlcData, KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData,
38    },
39};
40
41/// Commands sent from the outer client to the inner message handler.
42#[derive(Debug)]
43pub enum SpotHandlerCommand {
44    SetClient(WebSocketClient),
45    Disconnect,
46    Subscribe { payload: String },
47    Unsubscribe { payload: String },
48    Ping { payload: String },
49}
50
51/// WebSocket message handler for Kraken Spot v2.
52pub(super) struct SpotFeedHandler {
53    signal: Arc<AtomicBool>,
54    inner: Option<WebSocketClient>,
55    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
56    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
57    subscriptions: SubscriptionState,
58    pending_messages: VecDeque<KrakenSpotWsMessage>,
59}
60
61impl SpotFeedHandler {
62    /// Creates a new [`SpotFeedHandler`] instance.
63    pub(super) fn new(
64        signal: Arc<AtomicBool>,
65        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
66        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
67        subscriptions: SubscriptionState,
68    ) -> Self {
69        Self {
70            signal,
71            inner: None,
72            cmd_rx,
73            raw_rx,
74            subscriptions,
75            pending_messages: VecDeque::new(),
76        }
77    }
78
79    pub(super) fn is_stopped(&self) -> bool {
80        self.signal.load(Ordering::Relaxed)
81    }
82
83    fn is_subscribed(&self, topic: &str) -> bool {
84        self.subscriptions.all_topics().iter().any(|t| t == topic)
85    }
86
87    /// Processes messages and commands, returning when stopped or stream ends.
88    pub(super) async fn next(&mut self) -> Option<KrakenSpotWsMessage> {
89        if let Some(msg) = self.pending_messages.pop_front() {
90            return Some(msg);
91        }
92
93        loop {
94            tokio::select! {
95                Some(cmd) = self.cmd_rx.recv() => {
96                    match cmd {
97                        SpotHandlerCommand::SetClient(client) => {
98                            log::debug!("WebSocketClient received by handler");
99                            self.inner = Some(client);
100                        }
101                        SpotHandlerCommand::Disconnect => {
102                            log::debug!("Disconnect command received");
103
104                            if let Some(client) = self.inner.take() {
105                                client.disconnect().await;
106                            }
107                        }
108                        SpotHandlerCommand::Subscribe { payload }
109                        | SpotHandlerCommand::Unsubscribe { payload }
110                        | SpotHandlerCommand::Ping { payload } => {
111                            if let Some(client) = &self.inner
112                                && let Err(e) = client.send_text(payload.clone(), None).await
113                            {
114                                log::error!("Failed to send text: {e}");
115                            }
116                        }
117                    }
118                }
119
120                msg = self.raw_rx.recv() => {
121                    let msg = match msg {
122                        Some(msg) => msg,
123                        None => {
124                            log::debug!("WebSocket stream closed");
125                            return None;
126                        }
127                    };
128
129                    if let Message::Ping(data) = &msg {
130                        log::trace!("Received ping frame with {} bytes", data.len());
131
132                        if let Some(client) = &self.inner
133                            && let Err(e) = client.send_pong(data.to_vec()).await
134                        {
135                            log::warn!("Failed to send pong frame: {e}");
136                        }
137                        continue;
138                    }
139
140                    if self.signal.load(Ordering::Relaxed) {
141                        log::debug!("Stop signal received");
142                        return None;
143                    }
144
145                    let text = match msg {
146                        Message::Text(text) => text.to_string(),
147                        Message::Binary(data) => {
148                            match String::from_utf8(data.to_vec()) {
149                                Ok(text) => text,
150                                Err(e) => {
151                                    log::warn!("Failed to decode binary message: {e}");
152                                    continue;
153                                }
154                            }
155                        }
156                        Message::Pong(_) => {
157                            log::trace!("Received pong");
158                            continue;
159                        }
160                        Message::Close(_) => {
161                            log::info!("WebSocket connection closed");
162                            return None;
163                        }
164                        Message::Frame(_) => {
165                            log::trace!("Received raw frame");
166                            continue;
167                        }
168                        _ => continue,
169                    };
170
171                    if text == RECONNECTED {
172                        log::info!("Received WebSocket reconnected signal");
173                        return Some(KrakenSpotWsMessage::Reconnected);
174                    }
175
176                    if let Some(msg) = self.parse_message(&text) {
177                        return Some(msg);
178                    }
179                }
180            }
181        }
182    }
183
184    fn parse_message(&self, text: &str) -> Option<KrakenSpotWsMessage> {
185        // Fast pre-filter for high-frequency control messages (no JSON parsing)
186        if text.len() < 50 && text.starts_with("{\"channel\":\"") {
187            if text.contains("heartbeat") {
188                log::trace!("Received heartbeat");
189                return None;
190            }
191
192            if text.contains("status") {
193                log::debug!("Received status message");
194                return None;
195            }
196        }
197
198        let value: Value = match serde_json::from_str(text) {
199            Ok(v) => v,
200            Err(e) => {
201                log::warn!("Failed to parse message: {e}");
202                return None;
203            }
204        };
205
206        // Control messages have "method" field
207        if value.get("method").is_some() {
208            self.handle_control_message(value);
209            return None;
210        }
211
212        // Data messages have "channel" and "data" fields
213        if value.get("channel").is_some() && value.get("data").is_some() {
214            match serde_json::from_value::<KrakenWsMessage>(value) {
215                Ok(msg) => return self.handle_data_message(msg),
216                Err(e) => {
217                    log::debug!("Failed to parse data message: {e}");
218                    return None;
219                }
220            }
221        }
222
223        log::debug!("Unhandled message structure: {text}");
224        None
225    }
226
227    fn handle_control_message(&self, value: Value) {
228        match serde_json::from_value::<KrakenWsResponse>(value) {
229            Ok(response) => match response {
230                KrakenWsResponse::Subscribe(sub) => {
231                    if sub.success {
232                        if let Some(result) = &sub.result {
233                            log::debug!(
234                                "Subscription confirmed: channel={:?}, req_id={:?}",
235                                result.channel,
236                                sub.req_id
237                            );
238                        } else {
239                            log::debug!("Subscription confirmed: req_id={:?}", sub.req_id);
240                        }
241                    } else {
242                        log::warn!(
243                            "Subscription failed: error={:?}, req_id={:?}",
244                            sub.error,
245                            sub.req_id
246                        );
247                    }
248                }
249                KrakenWsResponse::Unsubscribe(unsub) => {
250                    if unsub.success {
251                        log::debug!("Unsubscription confirmed: req_id={:?}", unsub.req_id);
252                    } else {
253                        log::warn!(
254                            "Unsubscription failed: error={:?}, req_id={:?}",
255                            unsub.error,
256                            unsub.req_id
257                        );
258                    }
259                }
260                KrakenWsResponse::Pong(pong) => {
261                    log::trace!("Received pong: req_id={:?}", pong.req_id);
262                }
263                KrakenWsResponse::Other => {
264                    log::debug!("Received unknown control response");
265                }
266            },
267            Err(_) => {
268                log::debug!("Received control message (failed to parse details)");
269            }
270        }
271    }
272
273    fn handle_data_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
274        match msg.channel {
275            KrakenWsChannel::Book => self.handle_book_message(msg),
276            KrakenWsChannel::Ticker => self.handle_ticker_message(msg),
277            KrakenWsChannel::Trade => self.handle_trade_message(msg),
278            KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg),
279            KrakenWsChannel::Executions => self.handle_executions_message(msg),
280            _ => {
281                log::warn!("Unhandled channel: {:?}", msg.channel);
282                None
283            }
284        }
285    }
286
287    fn handle_book_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
288        let is_snapshot = msg.event_type == KrakenWsMessageType::Snapshot;
289        let mut book_data = Vec::new();
290
291        for data in msg.data {
292            match serde_json::from_value::<KrakenWsBookData>(data) {
293                Ok(bd) => {
294                    if !self.is_subscribed(&format!("book:{}", bd.symbol)) {
295                        continue;
296                    }
297                    book_data.push(bd);
298                }
299                Err(e) => log::error!("Failed to deserialize book data: {e}"),
300            }
301        }
302
303        if book_data.is_empty() {
304            None
305        } else {
306            Some(KrakenSpotWsMessage::Book {
307                data: book_data,
308                is_snapshot,
309            })
310        }
311    }
312
313    fn handle_ticker_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
314        let mut tickers = Vec::new();
315
316        for data in msg.data {
317            match serde_json::from_value::<KrakenWsTickerData>(data) {
318                Ok(td) => {
319                    let symbol = &td.symbol;
320                    let quotes_key = format!("quotes:{symbol}");
321                    let ticker_key = format!("ticker:{symbol}");
322                    if !self.is_subscribed(&quotes_key) && !self.is_subscribed(&ticker_key) {
323                        continue;
324                    }
325                    tickers.push(td);
326                }
327                Err(e) => log::error!("Failed to deserialize ticker data: {e}"),
328            }
329        }
330
331        if tickers.is_empty() {
332            None
333        } else {
334            Some(KrakenSpotWsMessage::Ticker(tickers))
335        }
336    }
337
338    fn handle_trade_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
339        let mut trades = Vec::new();
340
341        for data in msg.data {
342            match serde_json::from_value::<KrakenWsTradeData>(data) {
343                Ok(td) => trades.push(td),
344                Err(e) => log::error!("Failed to deserialize trade data: {e}"),
345            }
346        }
347
348        if trades.is_empty() {
349            None
350        } else {
351            Some(KrakenSpotWsMessage::Trade(trades))
352        }
353    }
354
355    fn handle_ohlc_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
356        let mut ohlc_data = Vec::new();
357
358        for data in msg.data {
359            match serde_json::from_value::<KrakenWsOhlcData>(data) {
360                Ok(od) => ohlc_data.push(od),
361                Err(e) => log::error!("Failed to deserialize OHLC data: {e}"),
362            }
363        }
364
365        if ohlc_data.is_empty() {
366            None
367        } else {
368            Some(KrakenSpotWsMessage::Ohlc(ohlc_data))
369        }
370    }
371
372    fn handle_executions_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
373        let mut executions = Vec::new();
374
375        for data in msg.data {
376            match serde_json::from_value::<KrakenWsExecutionData>(data) {
377                Ok(ed) => executions.push(ed),
378                Err(e) => log::error!("Failed to deserialize execution data: {e}"),
379            }
380        }
381
382        if executions.is_empty() {
383            None
384        } else {
385            Some(KrakenSpotWsMessage::Execution(executions))
386        }
387    }
388}
389
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Ticker snapshot for `BTC/USD` shared by the ticker tests.
    const TICKER_SNAPSHOT_JSON: &str = r#"{
            "channel": "ticker",
            "type": "snapshot",
            "data": [{
                "symbol": "BTC/USD",
                "bid": 105944.20,
                "bid_qty": 2.5,
                "ask": 105944.30,
                "ask_qty": 3.2,
                "last": 105899.40,
                "volume": 163.28908096,
                "vwap": 105904.39279,
                "low": 104711.00,
                "high": 106613.10,
                "change": 250.00,
                "change_pct": 0.24,
                "timestamp": "2022-12-25T09:30:59.123456Z"
            }]
        }"#;

    /// Book snapshot for `BTC/USD` shared by the book tests.
    const BOOK_SNAPSHOT_JSON: &str = r#"{
            "channel": "book",
            "type": "snapshot",
            "data": [{
                "symbol": "BTC/USD",
                "bids": [{"price": 105944.20, "qty": 2.5}],
                "asks": [{"price": 105944.30, "qty": 3.2}],
                "checksum": 12345,
                "timestamp": "2023-10-06T17:35:55.440295Z"
            }]
        }"#;

    /// Builds a handler with no client and no subscriptions.
    fn create_test_handler() -> SpotFeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();

        SpotFeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            cmd_rx,
            raw_rx,
            SubscriptionState::new(':'),
        )
    }

    /// Builds a handler whose only confirmed subscription is `topic`.
    fn handler_subscribed_to(topic: &str) -> SpotFeedHandler {
        let handler = create_test_handler();
        handler.subscriptions.mark_subscribe(topic);
        handler.subscriptions.confirm_subscribe(topic);
        handler
    }

    #[rstest]
    fn test_ticker_message_filtered_without_quotes_subscription() {
        let handler = create_test_handler();

        let result = handler.parse_message(TICKER_SNAPSHOT_JSON);

        assert!(
            result.is_none(),
            "Ticker message should be filtered when no quotes subscription exists"
        );
    }

    #[rstest]
    fn test_ticker_message_passes_with_quotes_subscription() {
        let handler = handler_subscribed_to("quotes:BTC/USD");

        let result = handler.parse_message(TICKER_SNAPSHOT_JSON);

        assert!(
            result.is_some(),
            "Ticker message should pass with quotes subscription"
        );

        let KrakenSpotWsMessage::Ticker(data) = result.unwrap() else {
            panic!("Expected Ticker message")
        };
        assert!(!data.is_empty(), "Should have ticker data");
    }

    #[rstest]
    fn test_ticker_message_passes_with_ticker_subscription() {
        let handler = handler_subscribed_to("ticker:BTC/USD");

        let result = handler.parse_message(TICKER_SNAPSHOT_JSON);

        assert!(
            result.is_some(),
            "Ticker message should pass with ticker: subscription"
        );

        let KrakenSpotWsMessage::Ticker(data) = result.unwrap() else {
            panic!("Expected Ticker message")
        };
        assert!(!data.is_empty(), "Should have ticker data");
    }

    #[rstest]
    fn test_book_message_filtered_without_book_subscription() {
        let handler = create_test_handler();

        let result = handler.parse_message(BOOK_SNAPSHOT_JSON);

        assert!(
            result.is_none(),
            "Book message should be filtered when no book subscription exists"
        );
    }

    #[rstest]
    fn test_book_message_passes_with_book_subscription() {
        let handler = handler_subscribed_to("book:BTC/USD");

        let result = handler.parse_message(BOOK_SNAPSHOT_JSON);

        assert!(
            result.is_some(),
            "Book message should pass with book subscription"
        );

        let KrakenSpotWsMessage::Book { data, is_snapshot } = result.unwrap() else {
            panic!("Expected Book message")
        };
        assert!(!data.is_empty());
        assert!(is_snapshot);
    }

    #[rstest]
    fn test_quotes_and_book_subscriptions_independent() {
        // Only the quotes topic is subscribed, so book data must still be filtered.
        let handler = handler_subscribed_to("quotes:BTC/USD");

        let book_result = handler.parse_message(BOOK_SNAPSHOT_JSON);
        assert!(
            book_result.is_none(),
            "Book message should be filtered without book: subscription"
        );

        let ticker_result = handler.parse_message(TICKER_SNAPSHOT_JSON);
        assert!(
            ticker_result.is_some(),
            "Ticker should pass with quotes subscription"
        );
    }
}