1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use nautilus_network::{
27 RECONNECTED,
28 websocket::{SubscriptionState, WebSocketClient},
29};
30use serde_json::Value;
31use tokio_tungstenite::tungstenite::Message;
32
33use super::{
34 enums::{KrakenWsChannel, KrakenWsMessageType},
35 messages::{
36 KrakenSpotWsMessage, KrakenWsBookData, KrakenWsExecutionData, KrakenWsMessage,
37 KrakenWsOhlcData, KrakenWsResponse, KrakenWsTickerData, KrakenWsTradeData,
38 },
39};
40
/// Commands consumed by [`SpotFeedHandler`] from its command channel.
#[derive(Debug)]
pub enum SpotHandlerCommand {
    /// Supplies the WebSocket client the handler stores and uses for outbound sends.
    SetClient(WebSocketClient),
    /// Takes the stored client, if any, and disconnects it.
    Disconnect,
    /// Forwards `payload` to the stored client with `send_text`.
    Subscribe { payload: String },
    /// Forwards `payload` to the stored client with `send_text`.
    Unsubscribe { payload: String },
    /// Forwards `payload` to the stored client with `send_text`.
    Ping { payload: String },
}
50
/// Receives raw WebSocket frames and handler commands and turns them into
/// [`KrakenSpotWsMessage`] values.
pub(super) struct SpotFeedHandler {
    /// Stop flag; once it reads `true` the handler stops yielding messages.
    signal: Arc<AtomicBool>,
    /// Client used for outbound sends; `None` until a `SetClient` command
    /// arrives and again after a `Disconnect` command.
    inner: Option<WebSocketClient>,
    /// Incoming [`SpotHandlerCommand`]s.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
    /// Incoming raw WebSocket messages.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Subscription registry consulted when filtering book and ticker data.
    subscriptions: SubscriptionState,
    /// Messages returned by `next` before any new input is read.
    /// NOTE(review): nothing in this part of the file pushes into the queue.
    pending_messages: VecDeque<KrakenSpotWsMessage>,
}
60
61impl SpotFeedHandler {
62 pub(super) fn new(
64 signal: Arc<AtomicBool>,
65 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<SpotHandlerCommand>,
66 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
67 subscriptions: SubscriptionState,
68 ) -> Self {
69 Self {
70 signal,
71 inner: None,
72 cmd_rx,
73 raw_rx,
74 subscriptions,
75 pending_messages: VecDeque::new(),
76 }
77 }
78
79 pub(super) fn is_stopped(&self) -> bool {
80 self.signal.load(Ordering::Relaxed)
81 }
82
83 fn is_subscribed(&self, topic: &str) -> bool {
84 self.subscriptions.all_topics().iter().any(|t| t == topic)
85 }
86
87 pub(super) async fn next(&mut self) -> Option<KrakenSpotWsMessage> {
89 if let Some(msg) = self.pending_messages.pop_front() {
90 return Some(msg);
91 }
92
93 loop {
94 tokio::select! {
95 Some(cmd) = self.cmd_rx.recv() => {
96 match cmd {
97 SpotHandlerCommand::SetClient(client) => {
98 log::debug!("WebSocketClient received by handler");
99 self.inner = Some(client);
100 }
101 SpotHandlerCommand::Disconnect => {
102 log::debug!("Disconnect command received");
103
104 if let Some(client) = self.inner.take() {
105 client.disconnect().await;
106 }
107 }
108 SpotHandlerCommand::Subscribe { payload }
109 | SpotHandlerCommand::Unsubscribe { payload }
110 | SpotHandlerCommand::Ping { payload } => {
111 if let Some(client) = &self.inner
112 && let Err(e) = client.send_text(payload.clone(), None).await
113 {
114 log::error!("Failed to send text: {e}");
115 }
116 }
117 }
118 }
119
120 msg = self.raw_rx.recv() => {
121 let msg = match msg {
122 Some(msg) => msg,
123 None => {
124 log::debug!("WebSocket stream closed");
125 return None;
126 }
127 };
128
129 if let Message::Ping(data) = &msg {
130 log::trace!("Received ping frame with {} bytes", data.len());
131
132 if let Some(client) = &self.inner
133 && let Err(e) = client.send_pong(data.to_vec()).await
134 {
135 log::warn!("Failed to send pong frame: {e}");
136 }
137 continue;
138 }
139
140 if self.signal.load(Ordering::Relaxed) {
141 log::debug!("Stop signal received");
142 return None;
143 }
144
145 let text = match msg {
146 Message::Text(text) => text.to_string(),
147 Message::Binary(data) => {
148 match String::from_utf8(data.to_vec()) {
149 Ok(text) => text,
150 Err(e) => {
151 log::warn!("Failed to decode binary message: {e}");
152 continue;
153 }
154 }
155 }
156 Message::Pong(_) => {
157 log::trace!("Received pong");
158 continue;
159 }
160 Message::Close(_) => {
161 log::info!("WebSocket connection closed");
162 return None;
163 }
164 Message::Frame(_) => {
165 log::trace!("Received raw frame");
166 continue;
167 }
168 _ => continue,
169 };
170
171 if text == RECONNECTED {
172 log::info!("Received WebSocket reconnected signal");
173 return Some(KrakenSpotWsMessage::Reconnected);
174 }
175
176 if let Some(msg) = self.parse_message(&text) {
177 return Some(msg);
178 }
179 }
180 }
181 }
182 }
183
184 fn parse_message(&self, text: &str) -> Option<KrakenSpotWsMessage> {
185 if text.len() < 50 && text.starts_with("{\"channel\":\"") {
187 if text.contains("heartbeat") {
188 log::trace!("Received heartbeat");
189 return None;
190 }
191
192 if text.contains("status") {
193 log::debug!("Received status message");
194 return None;
195 }
196 }
197
198 let value: Value = match serde_json::from_str(text) {
199 Ok(v) => v,
200 Err(e) => {
201 log::warn!("Failed to parse message: {e}");
202 return None;
203 }
204 };
205
206 if value.get("method").is_some() {
208 self.handle_control_message(value);
209 return None;
210 }
211
212 if value.get("channel").is_some() && value.get("data").is_some() {
214 match serde_json::from_value::<KrakenWsMessage>(value) {
215 Ok(msg) => return self.handle_data_message(msg),
216 Err(e) => {
217 log::debug!("Failed to parse data message: {e}");
218 return None;
219 }
220 }
221 }
222
223 log::debug!("Unhandled message structure: {text}");
224 None
225 }
226
227 fn handle_control_message(&self, value: Value) {
228 match serde_json::from_value::<KrakenWsResponse>(value) {
229 Ok(response) => match response {
230 KrakenWsResponse::Subscribe(sub) => {
231 if sub.success {
232 if let Some(result) = &sub.result {
233 log::debug!(
234 "Subscription confirmed: channel={:?}, req_id={:?}",
235 result.channel,
236 sub.req_id
237 );
238 } else {
239 log::debug!("Subscription confirmed: req_id={:?}", sub.req_id);
240 }
241 } else {
242 log::warn!(
243 "Subscription failed: error={:?}, req_id={:?}",
244 sub.error,
245 sub.req_id
246 );
247 }
248 }
249 KrakenWsResponse::Unsubscribe(unsub) => {
250 if unsub.success {
251 log::debug!("Unsubscription confirmed: req_id={:?}", unsub.req_id);
252 } else {
253 log::warn!(
254 "Unsubscription failed: error={:?}, req_id={:?}",
255 unsub.error,
256 unsub.req_id
257 );
258 }
259 }
260 KrakenWsResponse::Pong(pong) => {
261 log::trace!("Received pong: req_id={:?}", pong.req_id);
262 }
263 KrakenWsResponse::Other => {
264 log::debug!("Received unknown control response");
265 }
266 },
267 Err(_) => {
268 log::debug!("Received control message (failed to parse details)");
269 }
270 }
271 }
272
273 fn handle_data_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
274 match msg.channel {
275 KrakenWsChannel::Book => self.handle_book_message(msg),
276 KrakenWsChannel::Ticker => self.handle_ticker_message(msg),
277 KrakenWsChannel::Trade => self.handle_trade_message(msg),
278 KrakenWsChannel::Ohlc => self.handle_ohlc_message(msg),
279 KrakenWsChannel::Executions => self.handle_executions_message(msg),
280 _ => {
281 log::warn!("Unhandled channel: {:?}", msg.channel);
282 None
283 }
284 }
285 }
286
287 fn handle_book_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
288 let is_snapshot = msg.event_type == KrakenWsMessageType::Snapshot;
289 let mut book_data = Vec::new();
290
291 for data in msg.data {
292 match serde_json::from_value::<KrakenWsBookData>(data) {
293 Ok(bd) => {
294 if !self.is_subscribed(&format!("book:{}", bd.symbol)) {
295 continue;
296 }
297 book_data.push(bd);
298 }
299 Err(e) => log::error!("Failed to deserialize book data: {e}"),
300 }
301 }
302
303 if book_data.is_empty() {
304 None
305 } else {
306 Some(KrakenSpotWsMessage::Book {
307 data: book_data,
308 is_snapshot,
309 })
310 }
311 }
312
313 fn handle_ticker_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
314 let mut tickers = Vec::new();
315
316 for data in msg.data {
317 match serde_json::from_value::<KrakenWsTickerData>(data) {
318 Ok(td) => {
319 let symbol = &td.symbol;
320 let quotes_key = format!("quotes:{symbol}");
321 let ticker_key = format!("ticker:{symbol}");
322 if !self.is_subscribed("es_key) && !self.is_subscribed(&ticker_key) {
323 continue;
324 }
325 tickers.push(td);
326 }
327 Err(e) => log::error!("Failed to deserialize ticker data: {e}"),
328 }
329 }
330
331 if tickers.is_empty() {
332 None
333 } else {
334 Some(KrakenSpotWsMessage::Ticker(tickers))
335 }
336 }
337
338 fn handle_trade_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
339 let mut trades = Vec::new();
340
341 for data in msg.data {
342 match serde_json::from_value::<KrakenWsTradeData>(data) {
343 Ok(td) => trades.push(td),
344 Err(e) => log::error!("Failed to deserialize trade data: {e}"),
345 }
346 }
347
348 if trades.is_empty() {
349 None
350 } else {
351 Some(KrakenSpotWsMessage::Trade(trades))
352 }
353 }
354
355 fn handle_ohlc_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
356 let mut ohlc_data = Vec::new();
357
358 for data in msg.data {
359 match serde_json::from_value::<KrakenWsOhlcData>(data) {
360 Ok(od) => ohlc_data.push(od),
361 Err(e) => log::error!("Failed to deserialize OHLC data: {e}"),
362 }
363 }
364
365 if ohlc_data.is_empty() {
366 None
367 } else {
368 Some(KrakenSpotWsMessage::Ohlc(ohlc_data))
369 }
370 }
371
372 fn handle_executions_message(&self, msg: KrakenWsMessage) -> Option<KrakenSpotWsMessage> {
373 let mut executions = Vec::new();
374
375 for data in msg.data {
376 match serde_json::from_value::<KrakenWsExecutionData>(data) {
377 Ok(ed) => executions.push(ed),
378 Err(e) => log::error!("Failed to deserialize execution data: {e}"),
379 }
380 }
381
382 if executions.is_empty() {
383 None
384 } else {
385 Some(KrakenSpotWsMessage::Execution(executions))
386 }
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use rstest::rstest;
393
394 use super::*;
395
396 fn create_test_handler() -> SpotFeedHandler {
397 let signal = Arc::new(AtomicBool::new(false));
398 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
399 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
400 let subscriptions = SubscriptionState::new(':');
401
402 SpotFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
403 }
404
405 #[rstest]
406 fn test_ticker_message_filtered_without_quotes_subscription() {
407 let handler = create_test_handler();
408
409 let json = r#"{
410 "channel": "ticker",
411 "type": "snapshot",
412 "data": [{
413 "symbol": "BTC/USD",
414 "bid": 105944.20,
415 "bid_qty": 2.5,
416 "ask": 105944.30,
417 "ask_qty": 3.2,
418 "last": 105899.40,
419 "volume": 163.28908096,
420 "vwap": 105904.39279,
421 "low": 104711.00,
422 "high": 106613.10,
423 "change": 250.00,
424 "change_pct": 0.24,
425 "timestamp": "2022-12-25T09:30:59.123456Z"
426 }]
427 }"#;
428
429 let result = handler.parse_message(json);
430 assert!(
431 result.is_none(),
432 "Ticker message should be filtered when no quotes subscription exists"
433 );
434 }
435
436 #[rstest]
437 fn test_ticker_message_passes_with_quotes_subscription() {
438 let handler = create_test_handler();
439 handler.subscriptions.mark_subscribe("quotes:BTC/USD");
440 handler.subscriptions.confirm_subscribe("quotes:BTC/USD");
441
442 let json = r#"{
443 "channel": "ticker",
444 "type": "snapshot",
445 "data": [{
446 "symbol": "BTC/USD",
447 "bid": 105944.20,
448 "bid_qty": 2.5,
449 "ask": 105944.30,
450 "ask_qty": 3.2,
451 "last": 105899.40,
452 "volume": 163.28908096,
453 "vwap": 105904.39279,
454 "low": 104711.00,
455 "high": 106613.10,
456 "change": 250.00,
457 "change_pct": 0.24,
458 "timestamp": "2022-12-25T09:30:59.123456Z"
459 }]
460 }"#;
461
462 let result = handler.parse_message(json);
463 assert!(
464 result.is_some(),
465 "Ticker message should pass with quotes subscription"
466 );
467
468 match result.unwrap() {
469 KrakenSpotWsMessage::Ticker(data) => {
470 assert!(!data.is_empty(), "Should have ticker data");
471 }
472 _ => panic!("Expected Ticker message"),
473 }
474 }
475
476 #[rstest]
477 fn test_ticker_message_passes_with_ticker_subscription() {
478 let handler = create_test_handler();
479 handler.subscriptions.mark_subscribe("ticker:BTC/USD");
480 handler.subscriptions.confirm_subscribe("ticker:BTC/USD");
481
482 let json = r#"{
483 "channel": "ticker",
484 "type": "snapshot",
485 "data": [{
486 "symbol": "BTC/USD",
487 "bid": 105944.20,
488 "bid_qty": 2.5,
489 "ask": 105944.30,
490 "ask_qty": 3.2,
491 "last": 105899.40,
492 "volume": 163.28908096,
493 "vwap": 105904.39279,
494 "low": 104711.00,
495 "high": 106613.10,
496 "change": 250.00,
497 "change_pct": 0.24,
498 "timestamp": "2022-12-25T09:30:59.123456Z"
499 }]
500 }"#;
501
502 let result = handler.parse_message(json);
503 assert!(
504 result.is_some(),
505 "Ticker message should pass with ticker: subscription"
506 );
507
508 match result.unwrap() {
509 KrakenSpotWsMessage::Ticker(data) => {
510 assert!(!data.is_empty(), "Should have ticker data");
511 }
512 _ => panic!("Expected Ticker message"),
513 }
514 }
515
516 #[rstest]
517 fn test_book_message_filtered_without_book_subscription() {
518 let handler = create_test_handler();
519
520 let json = r#"{
521 "channel": "book",
522 "type": "snapshot",
523 "data": [{
524 "symbol": "BTC/USD",
525 "bids": [{"price": 105944.20, "qty": 2.5}],
526 "asks": [{"price": 105944.30, "qty": 3.2}],
527 "checksum": 12345,
528 "timestamp": "2023-10-06T17:35:55.440295Z"
529 }]
530 }"#;
531
532 let result = handler.parse_message(json);
533 assert!(
534 result.is_none(),
535 "Book message should be filtered when no book subscription exists"
536 );
537 }
538
539 #[rstest]
540 fn test_book_message_passes_with_book_subscription() {
541 let handler = create_test_handler();
542 handler.subscriptions.mark_subscribe("book:BTC/USD");
543 handler.subscriptions.confirm_subscribe("book:BTC/USD");
544
545 let json = r#"{
546 "channel": "book",
547 "type": "snapshot",
548 "data": [{
549 "symbol": "BTC/USD",
550 "bids": [{"price": 105944.20, "qty": 2.5}],
551 "asks": [{"price": 105944.30, "qty": 3.2}],
552 "checksum": 12345,
553 "timestamp": "2023-10-06T17:35:55.440295Z"
554 }]
555 }"#;
556
557 let result = handler.parse_message(json);
558 assert!(
559 result.is_some(),
560 "Book message should pass with book subscription"
561 );
562
563 match result.unwrap() {
564 KrakenSpotWsMessage::Book { data, is_snapshot } => {
565 assert!(!data.is_empty());
566 assert!(is_snapshot);
567 }
568 _ => panic!("Expected Book message"),
569 }
570 }
571
572 #[rstest]
573 fn test_quotes_and_book_subscriptions_independent() {
574 let handler = create_test_handler();
575 handler.subscriptions.mark_subscribe("quotes:BTC/USD");
576 handler.subscriptions.confirm_subscribe("quotes:BTC/USD");
577
578 let book_json = r#"{
579 "channel": "book",
580 "type": "snapshot",
581 "data": [{
582 "symbol": "BTC/USD",
583 "bids": [{"price": 105944.20, "qty": 2.5}],
584 "asks": [{"price": 105944.30, "qty": 3.2}],
585 "checksum": 12345,
586 "timestamp": "2023-10-06T17:35:55.440295Z"
587 }]
588 }"#;
589
590 let book_result = handler.parse_message(book_json);
591 assert!(
592 book_result.is_none(),
593 "Book message should be filtered without book: subscription"
594 );
595
596 let ticker_json = r#"{
597 "channel": "ticker",
598 "type": "snapshot",
599 "data": [{
600 "symbol": "BTC/USD",
601 "bid": 105944.20,
602 "bid_qty": 2.5,
603 "ask": 105944.30,
604 "ask_qty": 3.2,
605 "last": 105899.40,
606 "volume": 163.28908096,
607 "vwap": 105904.39279,
608 "low": 104711.00,
609 "high": 106613.10,
610 "change": 250.00,
611 "change_pct": 0.24,
612 "timestamp": "2022-12-25T09:30:59.123456Z"
613 }]
614 }"#;
615
616 let ticker_result = handler.parse_message(ticker_json);
617 assert!(
618 ticker_result.is_some(),
619 "Ticker should pass with quotes subscription"
620 );
621 }
622}