nautilus_kraken/websocket/futures/
handler.rs1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use nautilus_network::{
27 RECONNECTED,
28 websocket::{SubscriptionState, WebSocketClient},
29};
30use serde::Deserialize;
31use serde_json::Value;
32use tokio_tungstenite::tungstenite::Message;
33use ustr::Ustr;
34
35use super::messages::{
36 KrakenFuturesBookDelta, KrakenFuturesBookSnapshot, KrakenFuturesChannel,
37 KrakenFuturesFillsDelta, KrakenFuturesMessageType, KrakenFuturesOpenOrdersCancel,
38 KrakenFuturesOpenOrdersDelta, KrakenFuturesTickerData, KrakenFuturesTradeData,
39 KrakenFuturesWsMessage, classify_futures_message,
40};
41
42#[derive(Debug)]
44pub enum FuturesHandlerCommand {
45 SetClient(WebSocketClient),
46 Disconnect,
47 Subscribe { payload: String },
48 Unsubscribe { payload: String },
49 RequestChallenge { payload: String },
50}
51
52pub struct FuturesFeedHandler {
54 signal: Arc<AtomicBool>,
55 inner: Option<WebSocketClient>,
56 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<FuturesHandlerCommand>,
57 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
58 subscriptions: SubscriptionState,
59 pending_messages: VecDeque<KrakenFuturesWsMessage>,
60}
61
62impl FuturesFeedHandler {
63 pub fn new(
65 signal: Arc<AtomicBool>,
66 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<FuturesHandlerCommand>,
67 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
68 subscriptions: SubscriptionState,
69 ) -> Self {
70 Self {
71 signal,
72 inner: None,
73 cmd_rx,
74 raw_rx,
75 subscriptions,
76 pending_messages: VecDeque::new(),
77 }
78 }
79
80 pub fn is_stopped(&self) -> bool {
81 self.signal.load(Ordering::Relaxed)
82 }
83
84 fn is_subscribed(&self, channel: KrakenFuturesChannel, symbol: &Ustr) -> bool {
85 let channel_ustr = Ustr::from(channel.as_ref());
86 self.subscriptions.is_subscribed(&channel_ustr, symbol)
87 }
88
89 pub async fn next(&mut self) -> Option<KrakenFuturesWsMessage> {
91 if let Some(msg) = self.pending_messages.pop_front() {
92 return Some(msg);
93 }
94
95 loop {
96 tokio::select! {
97 Some(cmd) = self.cmd_rx.recv() => {
98 match cmd {
99 FuturesHandlerCommand::SetClient(client) => {
100 log::debug!("WebSocketClient received by futures handler");
101 self.inner = Some(client);
102 }
103 FuturesHandlerCommand::Disconnect => {
104 log::debug!("Disconnect command received");
105
106 if let Some(client) = self.inner.take() {
107 client.disconnect().await;
108 }
109 return None;
110 }
111 FuturesHandlerCommand::Subscribe { payload }
112 | FuturesHandlerCommand::Unsubscribe { payload }
113 | FuturesHandlerCommand::RequestChallenge { payload } => {
114 if let Some(ref client) = self.inner
115 && let Err(e) = client.send_text(payload, None).await
116 {
117 log::error!("Failed to send text: {e}");
118 }
119 }
120 }
121 }
122
123 msg = self.raw_rx.recv() => {
124 let msg = match msg {
125 Some(msg) => msg,
126 None => {
127 log::debug!("WebSocket stream closed");
128 return None;
129 }
130 };
131
132 if self.signal.load(Ordering::Relaxed) {
133 log::debug!("Stop signal received");
134 return None;
135 }
136
137 match &msg {
138 Message::Ping(data) => {
139 let len = data.len();
140 log::trace!("Received ping frame with {len} bytes");
141
142 if let Some(client) = &self.inner
143 && let Err(e) = client.send_pong(data.to_vec()).await
144 {
145 log::warn!("Failed to send pong frame: {e}");
146 }
147 continue;
148 }
149 Message::Pong(_) => {
150 log::debug!("Received pong from server");
151 continue;
152 }
153 Message::Close(_) => {
154 log::info!("WebSocket connection closed");
155 return None;
156 }
157 Message::Frame(_) => {
158 log::trace!("Received raw frame");
159 continue;
160 }
161 _ => {}
162 }
163
164 let text: &str = match &msg {
165 Message::Text(text) => text,
166 Message::Binary(data) => match std::str::from_utf8(data) {
167 Ok(s) => s,
168 Err(_) => continue,
169 },
170 _ => continue,
171 };
172
173 if text == RECONNECTED {
174 log::info!("Received WebSocket reconnected signal");
175 return Some(KrakenFuturesWsMessage::Reconnected);
176 }
177
178 self.parse_message(text);
179
180 if let Some(msg) = self.pending_messages.pop_front() {
181 return Some(msg);
182 }
183 }
184 }
185 }
186 }
187
188 fn parse_message(&mut self, text: &str) {
189 let value: Value = match serde_json::from_str(text) {
190 Ok(v) => v,
191 Err(e) => {
192 log::debug!("Failed to parse message as JSON: {e}");
193 return;
194 }
195 };
196
197 match classify_futures_message(&value) {
198 KrakenFuturesMessageType::OpenOrdersSnapshot => {
199 log::debug!(
200 "Skipping open_orders_snapshot (REST reconciliation handles initial state)"
201 );
202 }
203 KrakenFuturesMessageType::OpenOrdersCancel => {
204 self.handle_open_orders_cancel_value(value);
205 }
206 KrakenFuturesMessageType::OpenOrdersDelta => {
207 self.handle_open_orders_delta_value(value);
208 }
209 KrakenFuturesMessageType::FillsSnapshot => {
210 log::debug!("Skipping fills_snapshot (REST reconciliation handles initial state)");
211 }
212 KrakenFuturesMessageType::FillsDelta => {
213 self.handle_fills_delta_value(value);
214 }
215 KrakenFuturesMessageType::Ticker => {
216 self.handle_ticker_message_value(value);
217 }
218 KrakenFuturesMessageType::TradeSnapshot => {
219 log::debug!("Skipping trade_snapshot (only streaming live trades)");
220 }
221 KrakenFuturesMessageType::Trade => {
222 self.handle_trade_message_value(value);
223 }
224 KrakenFuturesMessageType::BookSnapshot => {
225 self.handle_book_snapshot_value(value);
226 }
227 KrakenFuturesMessageType::BookDelta => {
228 self.handle_book_delta_value(value);
229 }
230 KrakenFuturesMessageType::Info => {
231 log::debug!("Received info message: {text}");
232 }
233 KrakenFuturesMessageType::Pong => {
234 log::debug!("Received text pong response");
235 }
236 KrakenFuturesMessageType::Subscribed => {
237 log::debug!("Subscription confirmed: {text}");
238 }
239 KrakenFuturesMessageType::Unsubscribed => {
240 log::debug!("Unsubscription confirmed: {text}");
241 }
242 KrakenFuturesMessageType::Challenge => {
243 self.handle_challenge_response_value(value);
244 }
245 KrakenFuturesMessageType::Heartbeat => {
246 log::trace!("Heartbeat received");
247 }
248 KrakenFuturesMessageType::Error => {
249 let message = value
250 .get("message")
251 .and_then(|v| v.as_str())
252 .unwrap_or("Unknown error");
253 log::error!("Kraken Futures WebSocket error: {message}");
254 }
255 KrakenFuturesMessageType::Alert => {
256 let message = value
257 .get("message")
258 .and_then(|v| v.as_str())
259 .unwrap_or("Unknown alert");
260 log::warn!("Kraken Futures WebSocket alert: {message}");
261 }
262 KrakenFuturesMessageType::Unknown => {
263 log::warn!("Unhandled futures message: {text}");
264 }
265 }
266 }
267
268 fn handle_challenge_response_value(&mut self, value: Value) {
269 #[derive(Deserialize)]
270 struct ChallengeResponse {
271 message: String,
272 }
273
274 match serde_json::from_value::<ChallengeResponse>(value) {
275 Ok(response) => {
276 let len = response.message.len();
277 log::debug!("Challenge received, length: {len}");
278
279 self.pending_messages
280 .push_back(KrakenFuturesWsMessage::Challenge(response.message));
281 }
282 Err(e) => {
283 log::error!("Failed to parse challenge response: {e}");
284 }
285 }
286 }
287
288 fn handle_ticker_message_value(&mut self, value: Value) {
289 let ticker = match serde_json::from_value::<KrakenFuturesTickerData>(value) {
290 Ok(t) => t,
291 Err(e) => {
292 log::debug!("Failed to parse ticker: {e}");
293 return;
294 }
295 };
296
297 self.pending_messages
298 .push_back(KrakenFuturesWsMessage::Ticker(ticker));
299 }
300
301 fn handle_trade_message_value(&mut self, value: Value) {
302 let trade = match serde_json::from_value::<KrakenFuturesTradeData>(value) {
303 Ok(t) => t,
304 Err(e) => {
305 log::warn!("Failed to parse trade: {e}");
306 return;
307 }
308 };
309
310 if !self.is_subscribed(KrakenFuturesChannel::Trades, &trade.product_id) {
311 log::debug!(
312 "Received trade for unsubscribed product: {}",
313 trade.product_id
314 );
315 return;
316 }
317
318 self.pending_messages
319 .push_back(KrakenFuturesWsMessage::Trade(trade));
320 }
321
322 fn handle_book_snapshot_value(&mut self, value: Value) {
323 let snapshot = match serde_json::from_value::<KrakenFuturesBookSnapshot>(value) {
324 Ok(s) => s,
325 Err(e) => {
326 log::warn!("Failed to parse book snapshot: {e}");
327 return;
328 }
329 };
330
331 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &snapshot.product_id);
332 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &snapshot.product_id);
333
334 if !has_book && !has_quotes {
335 log::debug!(
336 "Received book snapshot for unsubscribed product: {}",
337 snapshot.product_id
338 );
339 return;
340 }
341
342 self.pending_messages
343 .push_back(KrakenFuturesWsMessage::BookSnapshot(snapshot));
344 }
345
346 fn handle_book_delta_value(&mut self, value: Value) {
347 let delta = match serde_json::from_value::<KrakenFuturesBookDelta>(value) {
348 Ok(d) => d,
349 Err(e) => {
350 log::warn!("Failed to parse book delta: {e}");
351 return;
352 }
353 };
354
355 let has_book = self.is_subscribed(KrakenFuturesChannel::Book, &delta.product_id);
356 let has_quotes = self.is_subscribed(KrakenFuturesChannel::Quotes, &delta.product_id);
357
358 if !has_book && !has_quotes {
359 log::debug!(
360 "Received book delta for unsubscribed product: {}",
361 delta.product_id
362 );
363 return;
364 }
365
366 self.pending_messages
367 .push_back(KrakenFuturesWsMessage::BookDelta(delta));
368 }
369
370 fn handle_open_orders_delta_value(&mut self, value: Value) {
371 let delta = match serde_json::from_value::<KrakenFuturesOpenOrdersDelta>(value) {
372 Ok(d) => d,
373 Err(e) => {
374 log::error!("Failed to parse open_orders delta: {e}");
375 return;
376 }
377 };
378
379 log::debug!(
380 "Received open_orders delta: order_id={}, is_cancel={}, reason={:?}",
381 delta.order.order_id,
382 delta.is_cancel,
383 delta.reason
384 );
385
386 self.pending_messages
387 .push_back(KrakenFuturesWsMessage::OpenOrdersDelta(delta));
388 }
389
390 fn handle_open_orders_cancel_value(&mut self, value: Value) {
391 let cancel = match serde_json::from_value::<KrakenFuturesOpenOrdersCancel>(value) {
392 Ok(c) => c,
393 Err(e) => {
394 log::error!("Failed to parse open_orders cancel: {e}");
395 return;
396 }
397 };
398
399 log::debug!(
400 "Received open_orders cancel: order_id={}, cli_ord_id={:?}, reason={:?}",
401 cancel.order_id,
402 cancel.cli_ord_id,
403 cancel.reason
404 );
405
406 self.pending_messages
407 .push_back(KrakenFuturesWsMessage::OpenOrdersCancel(cancel));
408 }
409
410 fn handle_fills_delta_value(&mut self, value: Value) {
411 let delta = match serde_json::from_value::<KrakenFuturesFillsDelta>(value) {
412 Ok(d) => d,
413 Err(e) => {
414 log::error!("Failed to parse fills delta: {e}");
415 return;
416 }
417 };
418
419 log::debug!("Received fills delta: fill_count={}", delta.fills.len());
420
421 self.pending_messages
422 .push_back(KrakenFuturesWsMessage::FillsDelta(delta));
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use rstest::rstest;
429
430 use super::*;
431
432 fn create_test_handler() -> FuturesFeedHandler {
433 let signal = Arc::new(AtomicBool::new(false));
434 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
435 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
436 let subscriptions = SubscriptionState::new(':');
437
438 FuturesFeedHandler::new(signal, cmd_rx, raw_rx, subscriptions)
439 }
440
441 #[rstest]
442 fn test_parse_ticker_emits_ticker_message() {
443 let mut handler = create_test_handler();
444 let json = include_str!("../../../test_data/ws_futures_ticker.json");
445
446 handler.parse_message(json);
447
448 assert_eq!(handler.pending_messages.len(), 1);
449 let msg = handler.pending_messages.pop_front().unwrap();
450 let KrakenFuturesWsMessage::Ticker(ticker) = msg else {
451 panic!("Expected Ticker message, was {msg:?}");
452 };
453 assert_eq!(ticker.product_id, Ustr::from("PI_XBTUSD"));
454 assert_eq!(ticker.bid, Some(21978.5));
455 assert_eq!(ticker.ask, Some(21987.0));
456 }
457
458 #[rstest]
459 fn test_parse_trade_emits_trade_message() {
460 let mut handler = create_test_handler();
461 handler.subscriptions.mark_subscribe("trades:PI_XBTUSD");
462 handler.subscriptions.confirm_subscribe("trades:PI_XBTUSD");
463
464 let json = include_str!("../../../test_data/ws_futures_trade.json");
465
466 handler.parse_message(json);
467
468 assert_eq!(handler.pending_messages.len(), 1);
469 let msg = handler.pending_messages.pop_front().unwrap();
470 let KrakenFuturesWsMessage::Trade(trade) = msg else {
471 panic!("Expected Trade message, was {msg:?}");
472 };
473 assert_eq!(trade.product_id, Ustr::from("PI_XBTUSD"));
474 assert_eq!(trade.price, 34969.5);
475 assert_eq!(trade.qty, 15000.0);
476 }
477
478 #[rstest]
479 fn test_parse_trade_filters_unsubscribed() {
480 let mut handler = create_test_handler();
481 let json = include_str!("../../../test_data/ws_futures_trade.json");
482
483 handler.parse_message(json);
484
485 assert!(
486 handler.pending_messages.is_empty(),
487 "Trade for unsubscribed product should be filtered"
488 );
489 }
490
491 #[rstest]
492 fn test_parse_book_snapshot_emits_book_snapshot() {
493 let mut handler = create_test_handler();
494 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
495 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
496
497 let json = include_str!("../../../test_data/ws_futures_book_snapshot.json");
498
499 handler.parse_message(json);
500
501 assert_eq!(handler.pending_messages.len(), 1);
502 let msg = handler.pending_messages.pop_front().unwrap();
503 let KrakenFuturesWsMessage::BookSnapshot(snapshot) = msg else {
504 panic!("Expected BookSnapshot message, was {msg:?}");
505 };
506 assert_eq!(snapshot.product_id, Ustr::from("PI_XBTUSD"));
507 assert_eq!(snapshot.bids.len(), 2);
508 assert_eq!(snapshot.asks.len(), 2);
509 }
510
511 #[rstest]
512 fn test_parse_book_snapshot_filters_unsubscribed() {
513 let mut handler = create_test_handler();
514 let json = include_str!("../../../test_data/ws_futures_book_snapshot.json");
515
516 handler.parse_message(json);
517
518 assert!(
519 handler.pending_messages.is_empty(),
520 "Book snapshot for unsubscribed product should be filtered"
521 );
522 }
523
524 #[rstest]
525 fn test_parse_book_delta_emits_book_delta() {
526 let mut handler = create_test_handler();
527 handler.subscriptions.mark_subscribe("book:PI_XBTUSD");
528 handler.subscriptions.confirm_subscribe("book:PI_XBTUSD");
529
530 let json = include_str!("../../../test_data/ws_futures_book_delta.json");
531
532 handler.parse_message(json);
533
534 assert_eq!(handler.pending_messages.len(), 1);
535 let msg = handler.pending_messages.pop_front().unwrap();
536 let KrakenFuturesWsMessage::BookDelta(delta) = msg else {
537 panic!("Expected BookDelta message, was {msg:?}");
538 };
539 assert_eq!(delta.product_id, Ustr::from("PI_XBTUSD"));
540 assert_eq!(delta.price, 34981.0);
541 }
542
543 #[rstest]
544 fn test_parse_book_delta_filters_unsubscribed() {
545 let mut handler = create_test_handler();
546 let json = include_str!("../../../test_data/ws_futures_book_delta.json");
547
548 handler.parse_message(json);
549
550 assert!(
551 handler.pending_messages.is_empty(),
552 "Book delta for unsubscribed product should be filtered"
553 );
554 }
555
556 #[rstest]
557 fn test_parse_open_orders_cancel_emits_cancel() {
558 let mut handler = create_test_handler();
559 let json = include_str!("../../../test_data/ws_futures_open_orders_cancel.json");
560
561 handler.parse_message(json);
562
563 assert_eq!(handler.pending_messages.len(), 1);
564 let msg = handler.pending_messages.pop_front().unwrap();
565 let KrakenFuturesWsMessage::OpenOrdersCancel(cancel) = msg else {
566 panic!("Expected OpenOrdersCancel message, was {msg:?}");
567 };
568 assert_eq!(cancel.order_id, "660c6b23-8007-48c1-a7c9-4893f4572e8c");
569 assert!(cancel.is_cancel);
570 }
571
572 #[rstest]
573 fn test_parse_open_orders_delta_emits_delta() {
574 let mut handler = create_test_handler();
575 let json = include_str!("../../../test_data/ws_futures_open_orders_delta.json");
576
577 handler.parse_message(json);
578
579 assert_eq!(handler.pending_messages.len(), 1);
580 let msg = handler.pending_messages.pop_front().unwrap();
581 let KrakenFuturesWsMessage::OpenOrdersDelta(delta) = msg else {
582 panic!("Expected OpenOrdersDelta message, was {msg:?}");
583 };
584 assert_eq!(delta.order.instrument, Ustr::from("PI_XBTUSD"));
585 assert!(!delta.is_cancel);
586 }
587
588 #[rstest]
589 fn test_parse_fills_delta_emits_fills() {
590 let mut handler = create_test_handler();
591 let json = include_str!("../../../test_data/ws_futures_fills_delta.json");
592
593 handler.parse_message(json);
594
595 assert_eq!(handler.pending_messages.len(), 1);
596 let msg = handler.pending_messages.pop_front().unwrap();
597 let KrakenFuturesWsMessage::FillsDelta(fills) = msg else {
598 panic!("Expected FillsDelta message, was {msg:?}");
599 };
600 assert_eq!(fills.fills.len(), 1);
601 assert_eq!(
602 fills.fills[0].fill_id,
603 "6a22a3fb-e18e-4e76-b841-8689735c9158"
604 );
605 }
606
607 #[rstest]
608 fn test_parse_challenge_emits_challenge_message() {
609 let mut handler = create_test_handler();
610 let json = r#"{"event":"challenge","message":"server-challenge-abc"}"#;
611
612 handler.parse_message(json);
613
614 assert_eq!(handler.pending_messages.len(), 1);
615 let msg = handler.pending_messages.pop_front().unwrap();
616 let KrakenFuturesWsMessage::Challenge(challenge) = msg else {
617 panic!("Expected Challenge message, was {msg:?}");
618 };
619 assert_eq!(challenge, "server-challenge-abc");
620 }
621
622 #[rstest]
623 fn test_heartbeat_produces_no_message() {
624 let mut handler = create_test_handler();
625 let json = r#"{"feed":"heartbeat","time":1700000000000}"#;
626
627 handler.parse_message(json);
628
629 assert!(handler.pending_messages.is_empty());
630 }
631
632 #[rstest]
633 fn test_info_event_produces_no_message() {
634 let mut handler = create_test_handler();
635 let json = r#"{"event":"info","version":1}"#;
636
637 handler.parse_message(json);
638
639 assert!(handler.pending_messages.is_empty());
640 }
641}