1use std::{
19 future::Future,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30 clients::DataClient,
31 live::{get_data_event_sender, get_runtime},
32 messages::{
33 DataEvent,
34 data::{
35 BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
36 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
37 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
38 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
39 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
40 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
41 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrumentStatus,
42 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
43 },
44 },
45};
46use nautilus_core::{
47 AtomicMap, AtomicSet,
48 datetime::datetime_to_unix_nanos,
49 nanos::UnixNanos,
50 time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53 data::{Data, OrderBookDeltas, OrderBookDeltas_API, QuoteTick},
54 enums::BookType,
55 identifiers::{ClientId, InstrumentId, Symbol, Venue},
56 instruments::{Instrument, InstrumentAny},
57 orderbook::OrderBook,
58};
59use tokio::task::JoinHandle;
60use tokio_util::sync::CancellationToken;
61
62use crate::{
63 common::consts::KRAKEN_VENUE,
64 config::KrakenDataClientConfig,
65 http::{
66 KrakenFuturesHttpClient, futures::client::KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
67 },
68 websocket::futures::{
69 client::KrakenFuturesWebSocketClient,
70 messages::KrakenFuturesWsMessage,
71 parse::{
72 parse_futures_ws_book_delta, parse_futures_ws_book_snapshot_deltas,
73 parse_futures_ws_funding_rate, parse_futures_ws_index_price,
74 parse_futures_ws_mark_price, parse_futures_ws_trade_tick,
75 },
76 },
77};
78
/// Market data client for Kraken Futures.
///
/// Combines an HTTP client (instrument loading and historical data requests) with a
/// WebSocket client (streaming subscriptions) and publishes everything it produces as
/// [`DataEvent`]s on `data_sender`.
// NOTE(review): the reason for the blanket `dead_code` allowance is not evident from this
// file; confirm it is still required.
#[allow(dead_code)]
#[derive(Debug)]
pub struct KrakenFuturesDataClient {
    /// Clock used to stamp `ts_init` on streamed data and the timestamp on responses.
    clock: &'static AtomicTime,
    /// Identifier reported by this client and used as the default on responses.
    client_id: ClientId,
    /// Configuration the client was created with.
    config: KrakenDataClientConfig,
    /// HTTP client used to load instruments and serve historical data requests.
    http: KrakenFuturesHttpClient,
    /// WebSocket client used for streaming subscriptions.
    ws: KrakenFuturesWebSocketClient,
    /// Set once `connect` completes; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token used to ask the spawned message handler task to exit.
    cancellation_token: CancellationToken,
    /// Join handles of the spawned message handler tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Instrument cache keyed by instrument ID, shared with the message handler task.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Instruments with an active quote subscription (quotes are derived from book updates).
    quote_instruments: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active order book delta subscription.
    book_instruments: Arc<AtomicSet<InstrumentId>>,
    /// Channel on which all data events and responses are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
98
99impl KrakenFuturesDataClient {
100 pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
102 let cancellation_token = CancellationToken::new();
103
104 let http = KrakenFuturesHttpClient::new(
105 config.environment,
106 config.base_url.clone(),
107 config.timeout_secs,
108 None,
109 None,
110 None,
111 config.proxy_url.clone(),
112 config
113 .max_requests_per_second
114 .unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND),
115 )?;
116
117 let ws = KrakenFuturesWebSocketClient::with_credentials(
118 config.ws_public_url(),
119 config.heartbeat_interval_secs,
120 None,
121 config.transport_backend,
122 config.proxy_url.clone(),
123 );
124
125 Ok(Self {
126 clock: get_atomic_clock_realtime(),
127 client_id,
128 config,
129 http,
130 ws,
131 is_connected: AtomicBool::new(false),
132 cancellation_token,
133 tasks: Vec::new(),
134 instruments: Arc::new(AtomicMap::new()),
135 quote_instruments: Arc::new(AtomicSet::new()),
136 book_instruments: Arc::new(AtomicSet::new()),
137 data_sender: get_data_event_sender(),
138 })
139 }
140
141 #[must_use]
143 pub fn instruments(&self) -> Vec<InstrumentAny> {
144 self.instruments.load().values().cloned().collect()
145 }
146
147 #[must_use]
149 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
150 self.instruments.load().get(instrument_id).cloned()
151 }
152
153 async fn load_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
154 let instruments = self
155 .http
156 .request_instruments()
157 .await
158 .context("Failed to load futures instruments")?;
159
160 self.instruments.rcu(|m| {
161 for instrument in &instruments {
162 m.insert(instrument.id(), instrument.clone());
163 }
164 });
165
166 self.http.cache_instruments(&instruments);
167
168 log::info!(
169 "Loaded instruments: client_id={}, count={}",
170 self.client_id,
171 instruments.len()
172 );
173
174 Ok(instruments)
175 }
176
177 fn spawn_ws<F>(&self, fut: F, context: &'static str)
178 where
179 F: Future<Output = anyhow::Result<()>> + Send + 'static,
180 {
181 get_runtime().spawn(async move {
182 if let Err(e) = fut.await {
183 log::error!("{context}: {e:?}");
184 }
185 });
186 }
187
188 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
189 let mut rx = self
190 .ws
191 .take_output_rx()
192 .context("Failed to take futures WebSocket output receiver")?;
193 let data_sender = self.data_sender.clone();
194 let instruments = self.instruments.clone();
195 let quote_instruments = self.quote_instruments.clone();
196 let book_instruments = self.book_instruments.clone();
197 let book_sequence = Arc::new(AtomicU64::new(0));
198 let cancellation_token = self.cancellation_token.clone();
199 let clock = self.clock;
200
201 let handle = get_runtime().spawn(async move {
202 let mut order_books: AHashMap<InstrumentId, OrderBook> = AHashMap::new();
203 let mut last_quotes: AHashMap<InstrumentId, QuoteTick> = AHashMap::new();
204
205 loop {
206 tokio::select! {
207 () = cancellation_token.cancelled() => {
208 log::debug!("Futures message handler cancelled");
209 break;
210 }
211 msg = rx.recv() => {
212 match msg {
213 Some(ws_msg) => {
214 Self::handle_ws_message(
215 ws_msg,
216 &data_sender,
217 &instruments,
218 "e_instruments,
219 &book_instruments,
220 &mut order_books,
221 &mut last_quotes,
222 &book_sequence,
223 clock,
224 );
225 }
226 None => {
227 log::debug!("Futures WebSocket stream ended");
228 break;
229 }
230 }
231 }
232 }
233 }
234 });
235
236 self.tasks.push(handle);
237 Ok(())
238 }
239
240 fn lookup_instrument(
241 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
242 product_id: &str,
243 ) -> Option<InstrumentAny> {
244 let instrument_id = InstrumentId::new(Symbol::new(product_id), *KRAKEN_VENUE);
245 instruments.load().get(&instrument_id).cloned()
246 }
247
248 #[expect(clippy::too_many_arguments)]
249 fn handle_ws_message(
250 msg: KrakenFuturesWsMessage,
251 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
252 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
253 quote_instruments: &Arc<AtomicSet<InstrumentId>>,
254 book_instruments: &Arc<AtomicSet<InstrumentId>>,
255 order_books: &mut AHashMap<InstrumentId, OrderBook>,
256 last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
257 book_sequence: &Arc<AtomicU64>,
258 clock: &'static AtomicTime,
259 ) {
260 let ts_init = clock.get_time_ns();
261
262 match msg {
263 KrakenFuturesWsMessage::Ticker(ticker) => {
264 let Some(instrument) =
265 Self::lookup_instrument(instruments, ticker.product_id.as_str())
266 else {
267 log::warn!("No instrument for product_id: {}", ticker.product_id);
268 return;
269 };
270
271 if let Some(mark) = parse_futures_ws_mark_price(&ticker, &instrument, ts_init)
272 && let Err(e) = sender.send(DataEvent::Data(Data::MarkPriceUpdate(mark)))
273 {
274 log::error!("Failed to send mark price: {e}");
275 }
276
277 if let Some(index) = parse_futures_ws_index_price(&ticker, &instrument, ts_init)
278 && let Err(e) = sender.send(DataEvent::Data(Data::IndexPriceUpdate(index)))
279 {
280 log::error!("Failed to send index price: {e}");
281 }
282
283 if let Some(funding) = parse_futures_ws_funding_rate(&ticker, &instrument, ts_init)
284 && let Err(e) = sender.send(DataEvent::FundingRate(funding))
285 {
286 log::error!("Failed to send funding rate: {e}");
287 }
288 }
289 KrakenFuturesWsMessage::Trade(trade) => {
290 let Some(instrument) =
291 Self::lookup_instrument(instruments, trade.product_id.as_str())
292 else {
293 log::warn!("No instrument for product_id: {}", trade.product_id);
294 return;
295 };
296
297 match parse_futures_ws_trade_tick(&trade, &instrument, ts_init) {
298 Ok(tick) => {
299 if let Err(e) = sender.send(DataEvent::Data(Data::Trade(tick))) {
300 log::error!("Failed to send trade: {e}");
301 }
302 }
303 Err(e) => log::error!("Failed to parse futures trade tick: {e}"),
304 }
305 }
306 KrakenFuturesWsMessage::BookSnapshot(snapshot) => {
307 let Some(instrument) =
308 Self::lookup_instrument(instruments, snapshot.product_id.as_str())
309 else {
310 log::warn!("No instrument for product_id: {}", snapshot.product_id);
311 return;
312 };
313 let instrument_id = instrument.id();
314 let sequence = book_sequence.load(Ordering::Relaxed);
315
316 match parse_futures_ws_book_snapshot_deltas(
317 &snapshot,
318 &instrument,
319 sequence,
320 ts_init,
321 ) {
322 Ok(delta_vec) => {
323 if delta_vec.is_empty() {
324 return;
325 }
326 book_sequence.fetch_add(delta_vec.len() as u64, Ordering::Relaxed);
327 let deltas = OrderBookDeltas::new(instrument_id, delta_vec);
328
329 let has_quote_sub = quote_instruments.contains(&instrument_id);
330
331 if has_quote_sub {
332 let book = order_books
333 .entry(instrument_id)
334 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
335
336 if let Err(e) = book.apply_deltas(&deltas) {
337 log::error!("Failed to apply snapshot deltas to order book: {e}");
338 } else {
339 Self::maybe_emit_quote(
340 book,
341 instrument_id,
342 last_quotes,
343 ts_init,
344 sender,
345 );
346 }
347 }
348
349 let has_book_sub = book_instruments.contains(&instrument_id);
350
351 if has_book_sub {
352 let api_deltas = OrderBookDeltas_API::new(deltas);
353 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
354 log::error!("Failed to send book snapshot deltas: {e}");
355 }
356 }
357 }
358 Err(e) => log::error!("Failed to parse book snapshot: {e}"),
359 }
360 }
361 KrakenFuturesWsMessage::BookDelta(delta) => {
362 let Some(instrument) =
363 Self::lookup_instrument(instruments, delta.product_id.as_str())
364 else {
365 log::warn!("No instrument for product_id: {}", delta.product_id);
366 return;
367 };
368 let instrument_id = instrument.id();
369 let sequence = book_sequence.fetch_add(1, Ordering::Relaxed);
370 match parse_futures_ws_book_delta(&delta, &instrument, sequence, ts_init) {
371 Ok(book_delta) => {
372 let deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
373
374 let has_quote_sub = quote_instruments.contains(&instrument_id);
375
376 if has_quote_sub && let Some(book) = order_books.get_mut(&instrument_id) {
377 if let Err(e) = book.apply_deltas(&deltas) {
378 log::error!("Failed to apply delta to order book: {e}");
379 } else {
380 Self::maybe_emit_quote(
381 book,
382 instrument_id,
383 last_quotes,
384 ts_init,
385 sender,
386 );
387 }
388 }
389
390 let has_book_sub = book_instruments.contains(&instrument_id);
391
392 if has_book_sub {
393 let api_deltas = OrderBookDeltas_API::new(deltas);
394 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
395 log::error!("Failed to send book delta: {e}");
396 }
397 }
398 }
399 Err(e) => log::error!("Failed to parse book delta: {e}"),
400 }
401 }
402 KrakenFuturesWsMessage::Reconnected => {
403 log::info!("Futures WebSocket reconnected");
404 }
405 KrakenFuturesWsMessage::OpenOrdersCancel(_)
406 | KrakenFuturesWsMessage::OpenOrdersDelta(_)
407 | KrakenFuturesWsMessage::FillsDelta(_)
408 | KrakenFuturesWsMessage::Challenge(_) => {}
409 }
410 }
411
412 fn maybe_emit_quote(
413 book: &OrderBook,
414 instrument_id: InstrumentId,
415 last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
416 ts_init: UnixNanos,
417 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
418 ) {
419 let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price())
420 else {
421 return;
422 };
423 let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size()) else {
424 return;
425 };
426
427 let bid = bid_price.as_f64();
428 let ask = ask_price.as_f64();
429 if bid > 0.0 && (ask - bid) / bid > 0.25 {
430 log::debug!("Filtered quote with wide spread: bid={bid}, ask={ask}");
431 return;
432 }
433
434 let quote = QuoteTick::new(
435 instrument_id,
436 bid_price,
437 ask_price,
438 bid_size,
439 ask_size,
440 ts_init,
441 ts_init,
442 );
443
444 if matches!(last_quotes.get(&instrument_id), Some(prev) if *prev == quote) {
445 return;
446 }
447
448 last_quotes.insert(instrument_id, quote);
449
450 if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
451 log::error!("Failed to send quote: {e}");
452 }
453 }
454}
455
456#[async_trait(?Send)]
457impl DataClient for KrakenFuturesDataClient {
458 fn client_id(&self) -> ClientId {
459 self.client_id
460 }
461
462 fn venue(&self) -> Option<Venue> {
463 Some(*KRAKEN_VENUE)
464 }
465
466 fn start(&mut self) -> anyhow::Result<()> {
467 log::info!(
468 "Starting Futures data client: client_id={}, environment={:?}",
469 self.client_id,
470 self.config.environment
471 );
472 Ok(())
473 }
474
475 fn stop(&mut self) -> anyhow::Result<()> {
476 log::info!("Stopping Futures data client: {}", self.client_id);
477 self.cancellation_token.cancel();
478 self.is_connected.store(false, Ordering::Relaxed);
479 Ok(())
480 }
481
482 fn reset(&mut self) -> anyhow::Result<()> {
483 log::info!("Resetting Futures data client: {}", self.client_id);
484 self.cancellation_token.cancel();
485
486 for task in self.tasks.drain(..) {
487 task.abort();
488 }
489
490 let mut ws = self.ws.clone();
491 get_runtime().spawn(async move {
492 let _ = ws.close().await;
493 });
494
495 self.instruments.store(ahash::AHashMap::new());
496
497 self.quote_instruments.store(ahash::AHashSet::new());
498
499 self.is_connected.store(false, Ordering::Relaxed);
500 self.cancellation_token = CancellationToken::new();
501 Ok(())
502 }
503
504 fn dispose(&mut self) -> anyhow::Result<()> {
505 log::info!("Disposing Futures data client: {}", self.client_id);
506 self.stop()
507 }
508
509 fn is_connected(&self) -> bool {
510 self.is_connected.load(Ordering::SeqCst)
511 }
512
513 fn is_disconnected(&self) -> bool {
514 !self.is_connected()
515 }
516
517 async fn connect(&mut self) -> anyhow::Result<()> {
518 if self.is_connected() {
519 return Ok(());
520 }
521
522 let instruments = self.load_instruments().await?;
523
524 self.ws
525 .connect()
526 .await
527 .context("Failed to connect futures WebSocket")?;
528 self.ws
529 .wait_until_active(10.0)
530 .await
531 .context("Futures WebSocket failed to become active")?;
532
533 self.spawn_message_handler()?;
534
535 for instrument in instruments {
536 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
537 log::error!("Failed to send instrument: {e}");
538 }
539 }
540
541 self.is_connected.store(true, Ordering::Release);
542 log::info!(
543 "Connected: client_id={}, product_type=Futures",
544 self.client_id
545 );
546 Ok(())
547 }
548
549 async fn disconnect(&mut self) -> anyhow::Result<()> {
550 if self.is_disconnected() {
551 return Ok(());
552 }
553
554 self.cancellation_token.cancel();
555 let _ = self.ws.close().await;
556
557 for handle in self.tasks.drain(..) {
558 if let Err(e) = handle.await {
559 log::error!("Error joining WebSocket task: {e:?}");
560 }
561 }
562
563 self.cancellation_token = CancellationToken::new();
564
565 self.quote_instruments.store(ahash::AHashSet::new());
566 self.is_connected.store(false, Ordering::Relaxed);
567
568 log::info!("Disconnected: client_id={}", self.client_id);
569 Ok(())
570 }
571
    /// Accepts the subscription without doing any work: no request is sent, only a debug
    /// message is logged.
    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
        log::debug!("subscribe_instruments: Kraken instruments are fetched via HTTP on connect");
        Ok(())
    }
576
    /// Accepts the subscription without doing any work: no request is sent, only a debug
    /// message is logged.
    fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
        log::debug!("subscribe_instrument: Kraken instruments are fetched via HTTP on connect");
        Ok(())
    }
581
582 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
583 let instrument_id = cmd.instrument_id;
584 let depth = cmd.depth;
585
586 if cmd.book_type != BookType::L2_MBP {
587 log::warn!(
588 "Book type {:?} not supported by Kraken, skipping subscription",
589 cmd.book_type
590 );
591 return Ok(());
592 }
593
594 self.book_instruments.insert(instrument_id);
595
596 let ws = self.ws.clone();
597 self.spawn_ws(
598 async move {
599 ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
600 .await
601 .map_err(|e| anyhow::anyhow!("{e}"))
602 },
603 "subscribe book",
604 );
605
606 log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
607 Ok(())
608 }
609
610 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
611 let instrument_id = cmd.instrument_id;
612 let ws = self.ws.clone();
613
614 self.quote_instruments.insert(instrument_id);
615
616 self.spawn_ws(
617 async move {
618 ws.subscribe_quotes(instrument_id)
619 .await
620 .map_err(|e| anyhow::anyhow!("{e}"))
621 },
622 "subscribe quotes",
623 );
624
625 log::info!("Subscribed to quotes: instrument_id={instrument_id}");
626 Ok(())
627 }
628
629 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
630 let instrument_id = cmd.instrument_id;
631 let ws = self.ws.clone();
632
633 self.spawn_ws(
634 async move {
635 ws.subscribe_trades(instrument_id)
636 .await
637 .map_err(|e| anyhow::anyhow!("{e}"))
638 },
639 "subscribe trades",
640 );
641
642 log::info!("Subscribed to trades: instrument_id={instrument_id}");
643 Ok(())
644 }
645
646 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
647 let instrument_id = cmd.instrument_id;
648 let ws = self.ws.clone();
649
650 self.spawn_ws(
651 async move {
652 ws.subscribe_mark_price(instrument_id)
653 .await
654 .map_err(|e| anyhow::anyhow!("{e}"))
655 },
656 "subscribe mark price",
657 );
658
659 log::info!("Subscribed to mark price: instrument_id={instrument_id}");
660 Ok(())
661 }
662
663 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
664 let instrument_id = cmd.instrument_id;
665 let ws = self.ws.clone();
666
667 self.spawn_ws(
668 async move {
669 ws.subscribe_index_price(instrument_id)
670 .await
671 .map_err(|e| anyhow::anyhow!("{e}"))
672 },
673 "subscribe index price",
674 );
675
676 log::info!("Subscribed to index price: instrument_id={instrument_id}");
677 Ok(())
678 }
679
680 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
681 let instrument_id = cmd.instrument_id;
682 let ws = self.ws.clone();
683
684 self.spawn_ws(
685 async move {
686 ws.subscribe_funding_rate(instrument_id)
687 .await
688 .map_err(|e| anyhow::anyhow!("{e}"))
689 },
690 "subscribe funding rate",
691 );
692
693 log::info!("Subscribed to funding rate: instrument_id={instrument_id}");
694 Ok(())
695 }
696
    /// Rejects the bar subscription softly: a warning naming the bar type is logged, no
    /// request is sent, and `Ok(())` is returned.
    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
        log::warn!(
            "Cannot subscribe to {} bars: Kraken Futures does not support EXTERNAL bar streaming",
            cmd.bar_type
        );
        Ok(())
    }
704
    /// Accepts the instrument status subscription without sending any request; only an
    /// informational message naming the instrument is logged.
    fn subscribe_instrument_status(
        &mut self,
        cmd: SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        log::info!(
            "subscribe_instrument_status: {} (status changes detected via periodic instrument polling)",
            cmd.instrument_id,
        );
        Ok(())
    }
715
716 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
717 let instrument_id = cmd.instrument_id;
718
719 self.book_instruments.remove(&instrument_id);
720
721 let ws = self.ws.clone();
722 self.spawn_ws(
723 async move {
724 ws.unsubscribe_book(instrument_id)
725 .await
726 .map_err(|e| anyhow::anyhow!("{e}"))
727 },
728 "unsubscribe book",
729 );
730
731 log::info!("Unsubscribed from book: instrument_id={instrument_id}");
732 Ok(())
733 }
734
735 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
736 let instrument_id = cmd.instrument_id;
737 let ws = self.ws.clone();
738
739 self.quote_instruments.remove(&instrument_id);
740
741 self.spawn_ws(
742 async move {
743 ws.unsubscribe_quotes(instrument_id)
744 .await
745 .map_err(|e| anyhow::anyhow!("{e}"))
746 },
747 "unsubscribe quotes",
748 );
749
750 log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
751 Ok(())
752 }
753
754 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
755 let instrument_id = cmd.instrument_id;
756 let ws = self.ws.clone();
757
758 self.spawn_ws(
759 async move {
760 ws.unsubscribe_trades(instrument_id)
761 .await
762 .map_err(|e| anyhow::anyhow!("{e}"))
763 },
764 "unsubscribe trades",
765 );
766
767 log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
768 Ok(())
769 }
770
771 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
772 let instrument_id = cmd.instrument_id;
773 let ws = self.ws.clone();
774
775 self.spawn_ws(
776 async move {
777 ws.unsubscribe_mark_price(instrument_id)
778 .await
779 .map_err(|e| anyhow::anyhow!("{e}"))
780 },
781 "unsubscribe mark price",
782 );
783
784 log::info!("Unsubscribed from mark price: instrument_id={instrument_id}");
785 Ok(())
786 }
787
788 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
789 let instrument_id = cmd.instrument_id;
790 let ws = self.ws.clone();
791
792 self.spawn_ws(
793 async move {
794 ws.unsubscribe_index_price(instrument_id)
795 .await
796 .map_err(|e| anyhow::anyhow!("{e}"))
797 },
798 "unsubscribe index price",
799 );
800
801 log::info!("Unsubscribed from index price: instrument_id={instrument_id}");
802 Ok(())
803 }
804
805 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
806 let instrument_id = cmd.instrument_id;
807 let ws = self.ws.clone();
808
809 self.spawn_ws(
810 async move {
811 ws.unsubscribe_funding_rate(instrument_id)
812 .await
813 .map_err(|e| anyhow::anyhow!("{e}"))
814 },
815 "unsubscribe funding rate",
816 );
817
818 log::info!("Unsubscribed from funding rate: instrument_id={instrument_id}");
819 Ok(())
820 }
821
    /// No-op: `subscribe_bars` never creates a subscription, so there is nothing to undo.
    fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
        Ok(())
    }
825
    /// No-op: `subscribe_instrument_status` never creates a subscription, so there is
    /// nothing to undo.
    fn unsubscribe_instrument_status(
        &mut self,
        _cmd: &UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        Ok(())
    }
832
833 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
834 let http = self.http.clone();
835 let sender = self.data_sender.clone();
836 let instruments_cache = self.instruments.clone();
837 let request_id = request.request_id;
838 let client_id = request.client_id.unwrap_or(self.client_id);
839 let venue = *KRAKEN_VENUE;
840 let start_nanos = datetime_to_unix_nanos(request.start);
841 let end_nanos = datetime_to_unix_nanos(request.end);
842 let params = request.params;
843 let clock = self.clock;
844
845 get_runtime().spawn(async move {
846 match http.request_instruments().await {
847 Ok(instruments) => {
848 instruments_cache.rcu(|m| {
849 for instrument in &instruments {
850 m.insert(instrument.id(), instrument.clone());
851 }
852 });
853 http.cache_instruments(&instruments);
854
855 let response = DataResponse::Instruments(InstrumentsResponse::new(
856 request_id,
857 client_id,
858 venue,
859 instruments,
860 start_nanos,
861 end_nanos,
862 clock.get_time_ns(),
863 params,
864 ));
865
866 if let Err(e) = sender.send(DataEvent::Response(response)) {
867 log::error!("Failed to send instruments response: {e}");
868 }
869 }
870 Err(e) => log::error!("Instruments request failed: {e:?}"),
871 }
872 });
873
874 Ok(())
875 }
876
877 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
878 let http = self.http.clone();
879 let sender = self.data_sender.clone();
880 let instruments = self.instruments.clone();
881 let instrument_id = request.instrument_id;
882 let request_id = request.request_id;
883 let client_id = request.client_id.unwrap_or(self.client_id);
884 let start_nanos = datetime_to_unix_nanos(request.start);
885 let end_nanos = datetime_to_unix_nanos(request.end);
886 let params = request.params;
887 let clock = self.clock;
888
889 get_runtime().spawn(async move {
890 if let Some(instrument) = instruments.load().get(&instrument_id) {
891 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
892 request_id,
893 client_id,
894 instrument.id(),
895 instrument.clone(),
896 start_nanos,
897 end_nanos,
898 clock.get_time_ns(),
899 params,
900 )));
901
902 if let Err(e) = sender.send(DataEvent::Response(response)) {
903 log::error!("Failed to send instrument response: {e}");
904 }
905 return;
906 }
907
908 match http.request_instruments().await {
909 Ok(all_instruments) => {
910 instruments.rcu(|m| {
911 for instrument in &all_instruments {
912 m.insert(instrument.id(), instrument.clone());
913 }
914 });
915 http.cache_instruments(&all_instruments);
916
917 let instrument = all_instruments
918 .into_iter()
919 .find(|i| i.id() == instrument_id);
920
921 if let Some(instrument) = instrument {
922 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
923 request_id,
924 client_id,
925 instrument.id(),
926 instrument,
927 start_nanos,
928 end_nanos,
929 clock.get_time_ns(),
930 params,
931 )));
932
933 if let Err(e) = sender.send(DataEvent::Response(response)) {
934 log::error!("Failed to send instrument response: {e}");
935 }
936 } else {
937 log::error!("Instrument not found: {instrument_id}");
938 }
939 }
940 Err(e) => log::error!("Instrument request failed: {e:?}"),
941 }
942 });
943
944 Ok(())
945 }
946
947 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
948 let http = self.http.clone();
949 let sender = self.data_sender.clone();
950 let instrument_id = request.instrument_id;
951 let start = request.start;
952 let end = request.end;
953 let limit = request.limit.map(|n| n.get() as u64);
954 let request_id = request.request_id;
955 let client_id = request.client_id.unwrap_or(self.client_id);
956 let params = request.params;
957 let clock = self.clock;
958 let start_nanos = datetime_to_unix_nanos(start);
959 let end_nanos = datetime_to_unix_nanos(end);
960
961 get_runtime().spawn(async move {
962 match http.request_trades(instrument_id, start, end, limit).await {
963 Ok(trades) => {
964 let response = DataResponse::Trades(TradesResponse::new(
965 request_id,
966 client_id,
967 instrument_id,
968 trades,
969 start_nanos,
970 end_nanos,
971 clock.get_time_ns(),
972 params,
973 ));
974
975 if let Err(e) = sender.send(DataEvent::Response(response)) {
976 log::error!("Failed to send trades response: {e}");
977 }
978 }
979 Err(e) => log::error!("Trades request failed: {e:?}"),
980 }
981 });
982
983 Ok(())
984 }
985
986 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
987 let http = self.http.clone();
988 let sender = self.data_sender.clone();
989 let bar_type = request.bar_type;
990 let start = request.start;
991 let end = request.end;
992 let limit = request.limit.map(|n| n.get() as u64);
993 let request_id = request.request_id;
994 let client_id = request.client_id.unwrap_or(self.client_id);
995 let params = request.params;
996 let clock = self.clock;
997 let start_nanos = datetime_to_unix_nanos(start);
998 let end_nanos = datetime_to_unix_nanos(end);
999
1000 get_runtime().spawn(async move {
1001 match http.request_bars(bar_type, start, end, limit).await {
1002 Ok(bars) => {
1003 let response = DataResponse::Bars(BarsResponse::new(
1004 request_id,
1005 client_id,
1006 bar_type,
1007 bars,
1008 start_nanos,
1009 end_nanos,
1010 clock.get_time_ns(),
1011 params,
1012 ));
1013
1014 if let Err(e) = sender.send(DataEvent::Response(response)) {
1015 log::error!("Failed to send bars response: {e}");
1016 }
1017 }
1018 Err(e) => log::error!("Bars request failed: {e:?}"),
1019 }
1020 });
1021
1022 Ok(())
1023 }
1024
1025 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1026 let http = self.http.clone();
1027 let sender = self.data_sender.clone();
1028 let instrument_id = request.instrument_id;
1029 let depth = request.depth.map(|n| n.get() as u32);
1030 let request_id = request.request_id;
1031 let client_id = request.client_id.unwrap_or(self.client_id);
1032 let params = request.params;
1033 let clock = self.clock;
1034
1035 get_runtime().spawn(async move {
1036 match http.request_book_snapshot(instrument_id, depth).await {
1037 Ok(book) => {
1038 let response = DataResponse::Book(BookResponse::new(
1039 request_id,
1040 client_id,
1041 instrument_id,
1042 book,
1043 None,
1044 None,
1045 clock.get_time_ns(),
1046 params,
1047 ));
1048
1049 if let Err(e) = sender.send(DataEvent::Response(response)) {
1050 log::error!("Failed to send book snapshot response: {e}");
1051 }
1052 }
1053 Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1054 }
1055 });
1056
1057 Ok(())
1058 }
1059
1060 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1061 let http = self.http.clone();
1062 let sender = self.data_sender.clone();
1063 let instrument_id = request.instrument_id;
1064 let start = request.start;
1065 let end = request.end;
1066 let limit = request.limit.map(|n| n.get());
1067 let request_id = request.request_id;
1068 let client_id = request.client_id.unwrap_or(self.client_id);
1069 let start_nanos = datetime_to_unix_nanos(start);
1070 let end_nanos = datetime_to_unix_nanos(end);
1071 let params = request.params;
1072 let clock = self.clock;
1073
1074 get_runtime().spawn(async move {
1075 match http
1076 .request_funding_rates(instrument_id, start, end, limit)
1077 .await
1078 {
1079 Ok(rates) => {
1080 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1081 request_id,
1082 client_id,
1083 instrument_id,
1084 rates,
1085 start_nanos,
1086 end_nanos,
1087 clock.get_time_ns(),
1088 params,
1089 ));
1090
1091 if let Err(e) = sender.send(DataEvent::Response(response)) {
1092 log::error!("Failed to send funding rates response: {e}");
1093 }
1094 }
1095 Err(e) => log::error!("Funding rates request failed: {e:?}"),
1096 }
1097 });
1098
1099 Ok(())
1100 }
1101}
1102
#[cfg(test)]
mod tests {
    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
    use nautilus_model::identifiers::ClientId;
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::KrakenProductType, config::KrakenDataClientConfig};

    /// Installs a data event sender so the client constructor can obtain one.
    ///
    /// The receiving half is dropped when this function returns; these tests never read
    /// from the channel.
    fn setup_test_env() {
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        set_data_event_sender(sender);
    }

    /// Default configuration with the product type set to futures.
    fn futures_config() -> KrakenDataClientConfig {
        KrakenDataClientConfig {
            product_type: KrakenProductType::Futures,
            ..Default::default()
        }
    }

    #[rstest]
    fn test_futures_data_client_new() {
        setup_test_env();
        let expected_id = ClientId::from("KRAKEN");

        let client = KrakenFuturesDataClient::new(expected_id, futures_config());
        assert!(client.is_ok());

        // A freshly built client reports its identity and starts out disconnected and empty.
        let client = client.unwrap();
        assert_eq!(client.client_id(), expected_id);
        assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
        assert!(!client.is_connected());
        assert!(client.is_disconnected());
        assert!(client.instruments().is_empty());
    }

    #[rstest]
    fn test_futures_data_client_start_stop() {
        setup_test_env();
        let mut client =
            KrakenFuturesDataClient::new(ClientId::from("KRAKEN"), futures_config()).unwrap();

        assert!(client.start().is_ok());
        assert!(client.stop().is_ok());
        assert!(client.is_disconnected());
    }
}