Skip to main content

nautilus_kraken/data/
futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Futures data client implementation.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, AtomicU64, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30    clients::DataClient,
31    live::{get_data_event_sender, get_runtime},
32    messages::{
33        DataEvent,
34        data::{
35            BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
36            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
37            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
38            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
39            SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
40            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
41            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrumentStatus,
42            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
43        },
44    },
45};
46use nautilus_core::{
47    AtomicMap, AtomicSet,
48    datetime::datetime_to_unix_nanos,
49    nanos::UnixNanos,
50    time::{AtomicTime, get_atomic_clock_realtime},
51};
52use nautilus_model::{
53    data::{Data, OrderBookDeltas, OrderBookDeltas_API, QuoteTick},
54    enums::BookType,
55    identifiers::{ClientId, InstrumentId, Symbol, Venue},
56    instruments::{Instrument, InstrumentAny},
57    orderbook::OrderBook,
58};
59use tokio::task::JoinHandle;
60use tokio_util::sync::CancellationToken;
61
62use crate::{
63    common::consts::KRAKEN_VENUE,
64    config::KrakenDataClientConfig,
65    http::{
66        KrakenFuturesHttpClient, futures::client::KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
67    },
68    websocket::futures::{
69        client::KrakenFuturesWebSocketClient,
70        messages::KrakenFuturesWsMessage,
71        parse::{
72            parse_futures_ws_book_delta, parse_futures_ws_book_snapshot_deltas,
73            parse_futures_ws_funding_rate, parse_futures_ws_index_price,
74            parse_futures_ws_mark_price, parse_futures_ws_trade_tick,
75        },
76    },
77};
78
/// Kraken Futures data client.
///
/// Provides real-time market data from Kraken Futures markets.
///
/// Instruments are fetched over HTTP and cached, while streaming updates received from the
/// WebSocket client are parsed and forwarded through `data_sender`.
#[allow(dead_code)]
#[derive(Debug)]
pub struct KrakenFuturesDataClient {
    /// Real-time clock used to stamp `ts_init` on emitted data and responses.
    clock: &'static AtomicTime,
    /// Identifier reported by `client_id()` and used as the default for responses.
    client_id: ClientId,
    /// Configuration the client was constructed from.
    config: KrakenDataClientConfig,
    /// HTTP client used for instrument requests.
    http: KrakenFuturesHttpClient,
    /// WebSocket client providing the streaming market data feed.
    ws: KrakenFuturesWebSocketClient,
    /// Connection flag set at the end of `connect` and cleared by `stop`/`reset`/`disconnect`.
    is_connected: AtomicBool,
    /// Token used to signal the message handler task to exit; replaced after reset/disconnect.
    cancellation_token: CancellationToken,
    /// Handles of spawned background tasks (the WebSocket message handler).
    tasks: Vec<JoinHandle<()>>,
    /// Instrument cache keyed by instrument ID, shared with the message handler task.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Instruments with an active quote subscription; quotes are derived from book updates.
    quote_instruments: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active book deltas subscription.
    book_instruments: Arc<AtomicSet<InstrumentId>>,
    /// Channel over which data events and responses are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
}
98
99impl KrakenFuturesDataClient {
100    /// Creates a new [`KrakenFuturesDataClient`] instance.
101    pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
102        let cancellation_token = CancellationToken::new();
103
104        let http = KrakenFuturesHttpClient::new(
105            config.environment,
106            config.base_url.clone(),
107            config.timeout_secs,
108            None,
109            None,
110            None,
111            config.proxy_url.clone(),
112            config
113                .max_requests_per_second
114                .unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND),
115        )?;
116
117        let ws = KrakenFuturesWebSocketClient::with_credentials(
118            config.ws_public_url(),
119            config.heartbeat_interval_secs,
120            None,
121            config.transport_backend,
122            config.proxy_url.clone(),
123        );
124
125        Ok(Self {
126            clock: get_atomic_clock_realtime(),
127            client_id,
128            config,
129            http,
130            ws,
131            is_connected: AtomicBool::new(false),
132            cancellation_token,
133            tasks: Vec::new(),
134            instruments: Arc::new(AtomicMap::new()),
135            quote_instruments: Arc::new(AtomicSet::new()),
136            book_instruments: Arc::new(AtomicSet::new()),
137            data_sender: get_data_event_sender(),
138        })
139    }
140
141    /// Returns the cached instruments.
142    #[must_use]
143    pub fn instruments(&self) -> Vec<InstrumentAny> {
144        self.instruments.load().values().cloned().collect()
145    }
146
147    /// Returns a cached instrument by ID.
148    #[must_use]
149    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
150        self.instruments.load().get(instrument_id).cloned()
151    }
152
153    async fn load_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
154        let instruments = self
155            .http
156            .request_instruments()
157            .await
158            .context("Failed to load futures instruments")?;
159
160        self.instruments.rcu(|m| {
161            for instrument in &instruments {
162                m.insert(instrument.id(), instrument.clone());
163            }
164        });
165
166        self.http.cache_instruments(&instruments);
167
168        log::info!(
169            "Loaded instruments: client_id={}, count={}",
170            self.client_id,
171            instruments.len()
172        );
173
174        Ok(instruments)
175    }
176
177    fn spawn_ws<F>(&self, fut: F, context: &'static str)
178    where
179        F: Future<Output = anyhow::Result<()>> + Send + 'static,
180    {
181        get_runtime().spawn(async move {
182            if let Err(e) = fut.await {
183                log::error!("{context}: {e:?}");
184            }
185        });
186    }
187
188    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
189        let mut rx = self
190            .ws
191            .take_output_rx()
192            .context("Failed to take futures WebSocket output receiver")?;
193        let data_sender = self.data_sender.clone();
194        let instruments = self.instruments.clone();
195        let quote_instruments = self.quote_instruments.clone();
196        let book_instruments = self.book_instruments.clone();
197        let book_sequence = Arc::new(AtomicU64::new(0));
198        let cancellation_token = self.cancellation_token.clone();
199        let clock = self.clock;
200
201        let handle = get_runtime().spawn(async move {
202            let mut order_books: AHashMap<InstrumentId, OrderBook> = AHashMap::new();
203            let mut last_quotes: AHashMap<InstrumentId, QuoteTick> = AHashMap::new();
204
205            loop {
206                tokio::select! {
207                    () = cancellation_token.cancelled() => {
208                        log::debug!("Futures message handler cancelled");
209                        break;
210                    }
211                    msg = rx.recv() => {
212                        match msg {
213                            Some(ws_msg) => {
214                                Self::handle_ws_message(
215                                    ws_msg,
216                                    &data_sender,
217                                    &instruments,
218                                    &quote_instruments,
219                                    &book_instruments,
220                                    &mut order_books,
221                                    &mut last_quotes,
222                                    &book_sequence,
223                                    clock,
224                                );
225                            }
226                            None => {
227                                log::debug!("Futures WebSocket stream ended");
228                                break;
229                            }
230                        }
231                    }
232                }
233            }
234        });
235
236        self.tasks.push(handle);
237        Ok(())
238    }
239
240    fn lookup_instrument(
241        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
242        product_id: &str,
243    ) -> Option<InstrumentAny> {
244        let instrument_id = InstrumentId::new(Symbol::new(product_id), *KRAKEN_VENUE);
245        instruments.load().get(&instrument_id).cloned()
246    }
247
248    #[expect(clippy::too_many_arguments)]
249    fn handle_ws_message(
250        msg: KrakenFuturesWsMessage,
251        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
252        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
253        quote_instruments: &Arc<AtomicSet<InstrumentId>>,
254        book_instruments: &Arc<AtomicSet<InstrumentId>>,
255        order_books: &mut AHashMap<InstrumentId, OrderBook>,
256        last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
257        book_sequence: &Arc<AtomicU64>,
258        clock: &'static AtomicTime,
259    ) {
260        let ts_init = clock.get_time_ns();
261
262        match msg {
263            KrakenFuturesWsMessage::Ticker(ticker) => {
264                let Some(instrument) =
265                    Self::lookup_instrument(instruments, ticker.product_id.as_str())
266                else {
267                    log::warn!("No instrument for product_id: {}", ticker.product_id);
268                    return;
269                };
270
271                if let Some(mark) = parse_futures_ws_mark_price(&ticker, &instrument, ts_init)
272                    && let Err(e) = sender.send(DataEvent::Data(Data::MarkPriceUpdate(mark)))
273                {
274                    log::error!("Failed to send mark price: {e}");
275                }
276
277                if let Some(index) = parse_futures_ws_index_price(&ticker, &instrument, ts_init)
278                    && let Err(e) = sender.send(DataEvent::Data(Data::IndexPriceUpdate(index)))
279                {
280                    log::error!("Failed to send index price: {e}");
281                }
282
283                if let Some(funding) = parse_futures_ws_funding_rate(&ticker, &instrument, ts_init)
284                    && let Err(e) = sender.send(DataEvent::FundingRate(funding))
285                {
286                    log::error!("Failed to send funding rate: {e}");
287                }
288            }
289            KrakenFuturesWsMessage::Trade(trade) => {
290                let Some(instrument) =
291                    Self::lookup_instrument(instruments, trade.product_id.as_str())
292                else {
293                    log::warn!("No instrument for product_id: {}", trade.product_id);
294                    return;
295                };
296
297                match parse_futures_ws_trade_tick(&trade, &instrument, ts_init) {
298                    Ok(tick) => {
299                        if let Err(e) = sender.send(DataEvent::Data(Data::Trade(tick))) {
300                            log::error!("Failed to send trade: {e}");
301                        }
302                    }
303                    Err(e) => log::error!("Failed to parse futures trade tick: {e}"),
304                }
305            }
306            KrakenFuturesWsMessage::BookSnapshot(snapshot) => {
307                let Some(instrument) =
308                    Self::lookup_instrument(instruments, snapshot.product_id.as_str())
309                else {
310                    log::warn!("No instrument for product_id: {}", snapshot.product_id);
311                    return;
312                };
313                let instrument_id = instrument.id();
314                let sequence = book_sequence.load(Ordering::Relaxed);
315
316                match parse_futures_ws_book_snapshot_deltas(
317                    &snapshot,
318                    &instrument,
319                    sequence,
320                    ts_init,
321                ) {
322                    Ok(delta_vec) => {
323                        if delta_vec.is_empty() {
324                            return;
325                        }
326                        book_sequence.fetch_add(delta_vec.len() as u64, Ordering::Relaxed);
327                        let deltas = OrderBookDeltas::new(instrument_id, delta_vec);
328
329                        let has_quote_sub = quote_instruments.contains(&instrument_id);
330
331                        if has_quote_sub {
332                            let book = order_books
333                                .entry(instrument_id)
334                                .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
335
336                            if let Err(e) = book.apply_deltas(&deltas) {
337                                log::error!("Failed to apply snapshot deltas to order book: {e}");
338                            } else {
339                                Self::maybe_emit_quote(
340                                    book,
341                                    instrument_id,
342                                    last_quotes,
343                                    ts_init,
344                                    sender,
345                                );
346                            }
347                        }
348
349                        let has_book_sub = book_instruments.contains(&instrument_id);
350
351                        if has_book_sub {
352                            let api_deltas = OrderBookDeltas_API::new(deltas);
353                            if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
354                                log::error!("Failed to send book snapshot deltas: {e}");
355                            }
356                        }
357                    }
358                    Err(e) => log::error!("Failed to parse book snapshot: {e}"),
359                }
360            }
361            KrakenFuturesWsMessage::BookDelta(delta) => {
362                let Some(instrument) =
363                    Self::lookup_instrument(instruments, delta.product_id.as_str())
364                else {
365                    log::warn!("No instrument for product_id: {}", delta.product_id);
366                    return;
367                };
368                let instrument_id = instrument.id();
369                let sequence = book_sequence.fetch_add(1, Ordering::Relaxed);
370                match parse_futures_ws_book_delta(&delta, &instrument, sequence, ts_init) {
371                    Ok(book_delta) => {
372                        let deltas = OrderBookDeltas::new(instrument_id, vec![book_delta]);
373
374                        let has_quote_sub = quote_instruments.contains(&instrument_id);
375
376                        if has_quote_sub && let Some(book) = order_books.get_mut(&instrument_id) {
377                            if let Err(e) = book.apply_deltas(&deltas) {
378                                log::error!("Failed to apply delta to order book: {e}");
379                            } else {
380                                Self::maybe_emit_quote(
381                                    book,
382                                    instrument_id,
383                                    last_quotes,
384                                    ts_init,
385                                    sender,
386                                );
387                            }
388                        }
389
390                        let has_book_sub = book_instruments.contains(&instrument_id);
391
392                        if has_book_sub {
393                            let api_deltas = OrderBookDeltas_API::new(deltas);
394                            if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
395                                log::error!("Failed to send book delta: {e}");
396                            }
397                        }
398                    }
399                    Err(e) => log::error!("Failed to parse book delta: {e}"),
400                }
401            }
402            KrakenFuturesWsMessage::Reconnected => {
403                log::info!("Futures WebSocket reconnected");
404            }
405            KrakenFuturesWsMessage::OpenOrdersCancel(_)
406            | KrakenFuturesWsMessage::OpenOrdersDelta(_)
407            | KrakenFuturesWsMessage::FillsDelta(_)
408            | KrakenFuturesWsMessage::Challenge(_) => {}
409        }
410    }
411
412    fn maybe_emit_quote(
413        book: &OrderBook,
414        instrument_id: InstrumentId,
415        last_quotes: &mut AHashMap<InstrumentId, QuoteTick>,
416        ts_init: UnixNanos,
417        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
418    ) {
419        let (Some(bid_price), Some(ask_price)) = (book.best_bid_price(), book.best_ask_price())
420        else {
421            return;
422        };
423        let (Some(bid_size), Some(ask_size)) = (book.best_bid_size(), book.best_ask_size()) else {
424            return;
425        };
426
427        let bid = bid_price.as_f64();
428        let ask = ask_price.as_f64();
429        if bid > 0.0 && (ask - bid) / bid > 0.25 {
430            log::debug!("Filtered quote with wide spread: bid={bid}, ask={ask}");
431            return;
432        }
433
434        let quote = QuoteTick::new(
435            instrument_id,
436            bid_price,
437            ask_price,
438            bid_size,
439            ask_size,
440            ts_init,
441            ts_init,
442        );
443
444        if matches!(last_quotes.get(&instrument_id), Some(prev) if *prev == quote) {
445            return;
446        }
447
448        last_quotes.insert(instrument_id, quote);
449
450        if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
451            log::error!("Failed to send quote: {e}");
452        }
453    }
454}
455
456#[async_trait(?Send)]
457impl DataClient for KrakenFuturesDataClient {
    /// Returns the identifier this client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
461
    /// Returns the Kraken venue; this client always reports `KRAKEN_VENUE`.
    fn venue(&self) -> Option<Venue> {
        Some(*KRAKEN_VENUE)
    }
465
466    fn start(&mut self) -> anyhow::Result<()> {
467        log::info!(
468            "Starting Futures data client: client_id={}, environment={:?}",
469            self.client_id,
470            self.config.environment
471        );
472        Ok(())
473    }
474
    /// Stops the client by cancelling the shared token and clearing the connected flag.
    ///
    /// Spawned tasks are neither aborted nor joined here, and the token is not replaced.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Stopping Futures data client: {}", self.client_id);
        // Signals the message handler loop to exit
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }
481
482    fn reset(&mut self) -> anyhow::Result<()> {
483        log::info!("Resetting Futures data client: {}", self.client_id);
484        self.cancellation_token.cancel();
485
486        for task in self.tasks.drain(..) {
487            task.abort();
488        }
489
490        let mut ws = self.ws.clone();
491        get_runtime().spawn(async move {
492            let _ = ws.close().await;
493        });
494
495        self.instruments.store(ahash::AHashMap::new());
496
497        self.quote_instruments.store(ahash::AHashSet::new());
498
499        self.is_connected.store(false, Ordering::Relaxed);
500        self.cancellation_token = CancellationToken::new();
501        Ok(())
502    }
503
    /// Disposes of the client by logging and delegating to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::info!("Disposing Futures data client: {}", self.client_id);
        self.stop()
    }
508
    /// Returns the current value of the connected flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::SeqCst)
    }
512
    /// Returns `true` when the client is not connected (the negation of `is_connected`).
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
516
517    async fn connect(&mut self) -> anyhow::Result<()> {
518        if self.is_connected() {
519            return Ok(());
520        }
521
522        let instruments = self.load_instruments().await?;
523
524        self.ws
525            .connect()
526            .await
527            .context("Failed to connect futures WebSocket")?;
528        self.ws
529            .wait_until_active(10.0)
530            .await
531            .context("Futures WebSocket failed to become active")?;
532
533        self.spawn_message_handler()?;
534
535        for instrument in instruments {
536            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
537                log::error!("Failed to send instrument: {e}");
538            }
539        }
540
541        self.is_connected.store(true, Ordering::Release);
542        log::info!(
543            "Connected: client_id={}, product_type=Futures",
544            self.client_id
545        );
546        Ok(())
547    }
548
549    async fn disconnect(&mut self) -> anyhow::Result<()> {
550        if self.is_disconnected() {
551            return Ok(());
552        }
553
554        self.cancellation_token.cancel();
555        let _ = self.ws.close().await;
556
557        for handle in self.tasks.drain(..) {
558            if let Err(e) = handle.await {
559                log::error!("Error joining WebSocket task: {e:?}");
560            }
561        }
562
563        self.cancellation_token = CancellationToken::new();
564
565        self.quote_instruments.store(ahash::AHashSet::new());
566        self.is_connected.store(false, Ordering::Relaxed);
567
568        log::info!("Disconnected: client_id={}", self.client_id);
569        Ok(())
570    }
571
    /// No-op: only logs, since instruments are fetched via HTTP on connect.
    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
        log::debug!("subscribe_instruments: Kraken instruments are fetched via HTTP on connect");
        Ok(())
    }
576
    /// No-op: only logs, since instruments are fetched via HTTP on connect.
    fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
        log::debug!("subscribe_instrument: Kraken instruments are fetched via HTTP on connect");
        Ok(())
    }
581
582    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
583        let instrument_id = cmd.instrument_id;
584        let depth = cmd.depth;
585
586        if cmd.book_type != BookType::L2_MBP {
587            log::warn!(
588                "Book type {:?} not supported by Kraken, skipping subscription",
589                cmd.book_type
590            );
591            return Ok(());
592        }
593
594        self.book_instruments.insert(instrument_id);
595
596        let ws = self.ws.clone();
597        self.spawn_ws(
598            async move {
599                ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
600                    .await
601                    .map_err(|e| anyhow::anyhow!("{e}"))
602            },
603            "subscribe book",
604        );
605
606        log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
607        Ok(())
608    }
609
610    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
611        let instrument_id = cmd.instrument_id;
612        let ws = self.ws.clone();
613
614        self.quote_instruments.insert(instrument_id);
615
616        self.spawn_ws(
617            async move {
618                ws.subscribe_quotes(instrument_id)
619                    .await
620                    .map_err(|e| anyhow::anyhow!("{e}"))
621            },
622            "subscribe quotes",
623        );
624
625        log::info!("Subscribed to quotes: instrument_id={instrument_id}");
626        Ok(())
627    }
628
629    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
630        let instrument_id = cmd.instrument_id;
631        let ws = self.ws.clone();
632
633        self.spawn_ws(
634            async move {
635                ws.subscribe_trades(instrument_id)
636                    .await
637                    .map_err(|e| anyhow::anyhow!("{e}"))
638            },
639            "subscribe trades",
640        );
641
642        log::info!("Subscribed to trades: instrument_id={instrument_id}");
643        Ok(())
644    }
645
646    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
647        let instrument_id = cmd.instrument_id;
648        let ws = self.ws.clone();
649
650        self.spawn_ws(
651            async move {
652                ws.subscribe_mark_price(instrument_id)
653                    .await
654                    .map_err(|e| anyhow::anyhow!("{e}"))
655            },
656            "subscribe mark price",
657        );
658
659        log::info!("Subscribed to mark price: instrument_id={instrument_id}");
660        Ok(())
661    }
662
663    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
664        let instrument_id = cmd.instrument_id;
665        let ws = self.ws.clone();
666
667        self.spawn_ws(
668            async move {
669                ws.subscribe_index_price(instrument_id)
670                    .await
671                    .map_err(|e| anyhow::anyhow!("{e}"))
672            },
673            "subscribe index price",
674        );
675
676        log::info!("Subscribed to index price: instrument_id={instrument_id}");
677        Ok(())
678    }
679
680    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
681        let instrument_id = cmd.instrument_id;
682        let ws = self.ws.clone();
683
684        self.spawn_ws(
685            async move {
686                ws.subscribe_funding_rate(instrument_id)
687                    .await
688                    .map_err(|e| anyhow::anyhow!("{e}"))
689            },
690            "subscribe funding rate",
691        );
692
693        log::info!("Subscribed to funding rate: instrument_id={instrument_id}");
694        Ok(())
695    }
696
    /// No-op: bar streaming is not supported, so the request is logged as a warning and
    /// `Ok(())` is returned without subscribing.
    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
        log::warn!(
            "Cannot subscribe to {} bars: Kraken Futures does not support EXTERNAL bar streaming",
            cmd.bar_type
        );
        Ok(())
    }
704
    /// No-op beyond logging: no WebSocket subscription is sent for instrument status.
    // NOTE(review): the log message refers to periodic instrument polling, which is not
    // visible in this part of the file — confirm where that polling is implemented.
    fn subscribe_instrument_status(
        &mut self,
        cmd: SubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        log::info!(
            "subscribe_instrument_status: {} (status changes detected via periodic instrument polling)",
            cmd.instrument_id,
        );
        Ok(())
    }
715
716    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
717        let instrument_id = cmd.instrument_id;
718
719        self.book_instruments.remove(&instrument_id);
720
721        let ws = self.ws.clone();
722        self.spawn_ws(
723            async move {
724                ws.unsubscribe_book(instrument_id)
725                    .await
726                    .map_err(|e| anyhow::anyhow!("{e}"))
727            },
728            "unsubscribe book",
729        );
730
731        log::info!("Unsubscribed from book: instrument_id={instrument_id}");
732        Ok(())
733    }
734
735    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
736        let instrument_id = cmd.instrument_id;
737        let ws = self.ws.clone();
738
739        self.quote_instruments.remove(&instrument_id);
740
741        self.spawn_ws(
742            async move {
743                ws.unsubscribe_quotes(instrument_id)
744                    .await
745                    .map_err(|e| anyhow::anyhow!("{e}"))
746            },
747            "unsubscribe quotes",
748        );
749
750        log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
751        Ok(())
752    }
753
754    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
755        let instrument_id = cmd.instrument_id;
756        let ws = self.ws.clone();
757
758        self.spawn_ws(
759            async move {
760                ws.unsubscribe_trades(instrument_id)
761                    .await
762                    .map_err(|e| anyhow::anyhow!("{e}"))
763            },
764            "unsubscribe trades",
765        );
766
767        log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
768        Ok(())
769    }
770
771    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
772        let instrument_id = cmd.instrument_id;
773        let ws = self.ws.clone();
774
775        self.spawn_ws(
776            async move {
777                ws.unsubscribe_mark_price(instrument_id)
778                    .await
779                    .map_err(|e| anyhow::anyhow!("{e}"))
780            },
781            "unsubscribe mark price",
782        );
783
784        log::info!("Unsubscribed from mark price: instrument_id={instrument_id}");
785        Ok(())
786    }
787
788    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
789        let instrument_id = cmd.instrument_id;
790        let ws = self.ws.clone();
791
792        self.spawn_ws(
793            async move {
794                ws.unsubscribe_index_price(instrument_id)
795                    .await
796                    .map_err(|e| anyhow::anyhow!("{e}"))
797            },
798            "unsubscribe index price",
799        );
800
801        log::info!("Unsubscribed from index price: instrument_id={instrument_id}");
802        Ok(())
803    }
804
805    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
806        let instrument_id = cmd.instrument_id;
807        let ws = self.ws.clone();
808
809        self.spawn_ws(
810            async move {
811                ws.unsubscribe_funding_rate(instrument_id)
812                    .await
813                    .map_err(|e| anyhow::anyhow!("{e}"))
814            },
815            "unsubscribe funding rate",
816        );
817
818        log::info!("Unsubscribed from funding rate: instrument_id={instrument_id}");
819        Ok(())
820    }
821
    /// No-op: `subscribe_bars` never creates a subscription, so there is nothing to undo.
    fn unsubscribe_bars(&mut self, _cmd: &UnsubscribeBars) -> anyhow::Result<()> {
        Ok(())
    }
825
    /// No-op: `subscribe_instrument_status` only logs, so there is nothing to undo.
    fn unsubscribe_instrument_status(
        &mut self,
        _cmd: &UnsubscribeInstrumentStatus,
    ) -> anyhow::Result<()> {
        Ok(())
    }
832
833    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
834        let http = self.http.clone();
835        let sender = self.data_sender.clone();
836        let instruments_cache = self.instruments.clone();
837        let request_id = request.request_id;
838        let client_id = request.client_id.unwrap_or(self.client_id);
839        let venue = *KRAKEN_VENUE;
840        let start_nanos = datetime_to_unix_nanos(request.start);
841        let end_nanos = datetime_to_unix_nanos(request.end);
842        let params = request.params;
843        let clock = self.clock;
844
845        get_runtime().spawn(async move {
846            match http.request_instruments().await {
847                Ok(instruments) => {
848                    instruments_cache.rcu(|m| {
849                        for instrument in &instruments {
850                            m.insert(instrument.id(), instrument.clone());
851                        }
852                    });
853                    http.cache_instruments(&instruments);
854
855                    let response = DataResponse::Instruments(InstrumentsResponse::new(
856                        request_id,
857                        client_id,
858                        venue,
859                        instruments,
860                        start_nanos,
861                        end_nanos,
862                        clock.get_time_ns(),
863                        params,
864                    ));
865
866                    if let Err(e) = sender.send(DataEvent::Response(response)) {
867                        log::error!("Failed to send instruments response: {e}");
868                    }
869                }
870                Err(e) => log::error!("Instruments request failed: {e:?}"),
871            }
872        });
873
874        Ok(())
875    }
876
877    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
878        let http = self.http.clone();
879        let sender = self.data_sender.clone();
880        let instruments = self.instruments.clone();
881        let instrument_id = request.instrument_id;
882        let request_id = request.request_id;
883        let client_id = request.client_id.unwrap_or(self.client_id);
884        let start_nanos = datetime_to_unix_nanos(request.start);
885        let end_nanos = datetime_to_unix_nanos(request.end);
886        let params = request.params;
887        let clock = self.clock;
888
889        get_runtime().spawn(async move {
890            if let Some(instrument) = instruments.load().get(&instrument_id) {
891                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
892                    request_id,
893                    client_id,
894                    instrument.id(),
895                    instrument.clone(),
896                    start_nanos,
897                    end_nanos,
898                    clock.get_time_ns(),
899                    params,
900                )));
901
902                if let Err(e) = sender.send(DataEvent::Response(response)) {
903                    log::error!("Failed to send instrument response: {e}");
904                }
905                return;
906            }
907
908            match http.request_instruments().await {
909                Ok(all_instruments) => {
910                    instruments.rcu(|m| {
911                        for instrument in &all_instruments {
912                            m.insert(instrument.id(), instrument.clone());
913                        }
914                    });
915                    http.cache_instruments(&all_instruments);
916
917                    let instrument = all_instruments
918                        .into_iter()
919                        .find(|i| i.id() == instrument_id);
920
921                    if let Some(instrument) = instrument {
922                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
923                            request_id,
924                            client_id,
925                            instrument.id(),
926                            instrument,
927                            start_nanos,
928                            end_nanos,
929                            clock.get_time_ns(),
930                            params,
931                        )));
932
933                        if let Err(e) = sender.send(DataEvent::Response(response)) {
934                            log::error!("Failed to send instrument response: {e}");
935                        }
936                    } else {
937                        log::error!("Instrument not found: {instrument_id}");
938                    }
939                }
940                Err(e) => log::error!("Instrument request failed: {e:?}"),
941            }
942        });
943
944        Ok(())
945    }
946
947    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
948        let http = self.http.clone();
949        let sender = self.data_sender.clone();
950        let instrument_id = request.instrument_id;
951        let start = request.start;
952        let end = request.end;
953        let limit = request.limit.map(|n| n.get() as u64);
954        let request_id = request.request_id;
955        let client_id = request.client_id.unwrap_or(self.client_id);
956        let params = request.params;
957        let clock = self.clock;
958        let start_nanos = datetime_to_unix_nanos(start);
959        let end_nanos = datetime_to_unix_nanos(end);
960
961        get_runtime().spawn(async move {
962            match http.request_trades(instrument_id, start, end, limit).await {
963                Ok(trades) => {
964                    let response = DataResponse::Trades(TradesResponse::new(
965                        request_id,
966                        client_id,
967                        instrument_id,
968                        trades,
969                        start_nanos,
970                        end_nanos,
971                        clock.get_time_ns(),
972                        params,
973                    ));
974
975                    if let Err(e) = sender.send(DataEvent::Response(response)) {
976                        log::error!("Failed to send trades response: {e}");
977                    }
978                }
979                Err(e) => log::error!("Trades request failed: {e:?}"),
980            }
981        });
982
983        Ok(())
984    }
985
986    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
987        let http = self.http.clone();
988        let sender = self.data_sender.clone();
989        let bar_type = request.bar_type;
990        let start = request.start;
991        let end = request.end;
992        let limit = request.limit.map(|n| n.get() as u64);
993        let request_id = request.request_id;
994        let client_id = request.client_id.unwrap_or(self.client_id);
995        let params = request.params;
996        let clock = self.clock;
997        let start_nanos = datetime_to_unix_nanos(start);
998        let end_nanos = datetime_to_unix_nanos(end);
999
1000        get_runtime().spawn(async move {
1001            match http.request_bars(bar_type, start, end, limit).await {
1002                Ok(bars) => {
1003                    let response = DataResponse::Bars(BarsResponse::new(
1004                        request_id,
1005                        client_id,
1006                        bar_type,
1007                        bars,
1008                        start_nanos,
1009                        end_nanos,
1010                        clock.get_time_ns(),
1011                        params,
1012                    ));
1013
1014                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1015                        log::error!("Failed to send bars response: {e}");
1016                    }
1017                }
1018                Err(e) => log::error!("Bars request failed: {e:?}"),
1019            }
1020        });
1021
1022        Ok(())
1023    }
1024
1025    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1026        let http = self.http.clone();
1027        let sender = self.data_sender.clone();
1028        let instrument_id = request.instrument_id;
1029        let depth = request.depth.map(|n| n.get() as u32);
1030        let request_id = request.request_id;
1031        let client_id = request.client_id.unwrap_or(self.client_id);
1032        let params = request.params;
1033        let clock = self.clock;
1034
1035        get_runtime().spawn(async move {
1036            match http.request_book_snapshot(instrument_id, depth).await {
1037                Ok(book) => {
1038                    let response = DataResponse::Book(BookResponse::new(
1039                        request_id,
1040                        client_id,
1041                        instrument_id,
1042                        book,
1043                        None,
1044                        None,
1045                        clock.get_time_ns(),
1046                        params,
1047                    ));
1048
1049                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1050                        log::error!("Failed to send book snapshot response: {e}");
1051                    }
1052                }
1053                Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1054            }
1055        });
1056
1057        Ok(())
1058    }
1059
1060    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1061        let http = self.http.clone();
1062        let sender = self.data_sender.clone();
1063        let instrument_id = request.instrument_id;
1064        let start = request.start;
1065        let end = request.end;
1066        let limit = request.limit.map(|n| n.get());
1067        let request_id = request.request_id;
1068        let client_id = request.client_id.unwrap_or(self.client_id);
1069        let start_nanos = datetime_to_unix_nanos(start);
1070        let end_nanos = datetime_to_unix_nanos(end);
1071        let params = request.params;
1072        let clock = self.clock;
1073
1074        get_runtime().spawn(async move {
1075            match http
1076                .request_funding_rates(instrument_id, start, end, limit)
1077                .await
1078            {
1079                Ok(rates) => {
1080                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
1081                        request_id,
1082                        client_id,
1083                        instrument_id,
1084                        rates,
1085                        start_nanos,
1086                        end_nanos,
1087                        clock.get_time_ns(),
1088                        params,
1089                    ));
1090
1091                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1092                        log::error!("Failed to send funding rates response: {e}");
1093                    }
1094                }
1095                Err(e) => log::error!("Funding rates request failed: {e:?}"),
1096            }
1097        });
1098
1099        Ok(())
1100    }
1101}
1102
#[cfg(test)]
mod tests {
    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
    use nautilus_model::identifiers::ClientId;
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::KrakenProductType, config::KrakenDataClientConfig};

    /// Registers a data event sender so a client can be constructed in tests.
    ///
    /// The receiving half of the channel is dropped when this function returns.
    fn setup_test_env() {
        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        set_data_event_sender(sender);
    }

    /// Default client configuration with the product type set to futures.
    fn futures_config() -> KrakenDataClientConfig {
        KrakenDataClientConfig {
            product_type: KrakenProductType::Futures,
            ..Default::default()
        }
    }

    /// A freshly constructed client reports its identity and starts out
    /// disconnected with no instruments.
    #[rstest]
    fn test_futures_data_client_new() {
        setup_test_env();
        let client_id = ClientId::from("KRAKEN");

        let result = KrakenFuturesDataClient::new(client_id, futures_config());
        assert!(result.is_ok());

        let client = result.unwrap();
        assert_eq!(client.client_id(), client_id);
        assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
        assert!(!client.is_connected());
        assert!(client.is_disconnected());
        assert!(client.instruments().is_empty());
    }

    /// `start` followed by `stop` both succeed and leave the client disconnected.
    #[rstest]
    fn test_futures_data_client_start_stop() {
        setup_test_env();
        let client_id = ClientId::from("KRAKEN");
        let mut client = KrakenFuturesDataClient::new(client_id, futures_config()).unwrap();

        assert!(client.start().is_ok());
        assert!(client.stop().is_ok());
        assert!(client.is_disconnected());
    }
}