Skip to main content

nautilus_kraken/data/
spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Spot data client implementation.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, Mutex,
22        atomic::{AtomicBool, AtomicU64, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use futures_util::StreamExt;
30use nautilus_common::{
31    clients::DataClient,
32    live::{get_data_event_sender, get_runtime},
33    messages::{
34        DataEvent,
35        data::{
36            BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
37            RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
38            SubscribeBars, SubscribeBookDeltas, SubscribeIndexPrices, SubscribeInstrument,
39            SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
40            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
41            UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
42            UnsubscribeQuotes, UnsubscribeTrades,
43        },
44    },
45};
46use nautilus_core::{
47    AtomicMap, UnixNanos,
48    datetime::datetime_to_unix_nanos,
49    time::{AtomicTime, get_atomic_clock_realtime},
50};
51use nautilus_model::{
52    data::{Bar, Data, OrderBookDeltas, OrderBookDeltas_API},
53    enums::{AggregationSource, BookType},
54    identifiers::{ClientId, InstrumentId, Symbol, Venue},
55    instruments::{Instrument, InstrumentAny},
56};
57use tokio::task::JoinHandle;
58use tokio_util::sync::CancellationToken;
59use ustr::Ustr;
60
61type OhlcBufferKey = (Ustr, u32);
62type OhlcBuffer = Arc<Mutex<AHashMap<OhlcBufferKey, (Bar, UnixNanos)>>>;
63
64use crate::{
65    common::consts::KRAKEN_VENUE,
66    config::KrakenDataClientConfig,
67    http::{KrakenSpotHttpClient, spot::client::KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND},
68    websocket::spot_v2::{
69        client::KrakenSpotWebSocketClient,
70        messages::KrakenSpotWsMessage,
71        parse::{parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar},
72    },
73};
74
75/// Kraken Spot data client.
76///
77/// Provides real-time market data from Kraken Spot markets through WebSocket v2.
78#[allow(dead_code)]
79#[derive(Debug)]
80pub struct KrakenSpotDataClient {
81    clock: &'static AtomicTime,
82    client_id: ClientId,
83    config: KrakenDataClientConfig,
84    http: KrakenSpotHttpClient,
85    ws: KrakenSpotWebSocketClient,
86    is_connected: AtomicBool,
87    cancellation_token: CancellationToken,
88    tasks: Vec<JoinHandle<()>>,
89    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
90    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
91}
92
93impl KrakenSpotDataClient {
94    /// Creates a new [`KrakenSpotDataClient`] instance.
95    pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
96        let cancellation_token = CancellationToken::new();
97
98        let http = KrakenSpotHttpClient::new(
99            config.environment,
100            config.base_url.clone(),
101            config.timeout_secs,
102            None,
103            None,
104            None,
105            config.proxy_url.clone(),
106            config
107                .max_requests_per_second
108                .unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND),
109        )?;
110
111        let ws = KrakenSpotWebSocketClient::new(
112            config.clone(),
113            cancellation_token.clone(),
114            config.proxy_url.clone(),
115        );
116
117        Ok(Self {
118            clock: get_atomic_clock_realtime(),
119            client_id,
120            config,
121            http,
122            ws,
123            is_connected: AtomicBool::new(false),
124            cancellation_token,
125            tasks: Vec::new(),
126            instruments: Arc::new(AtomicMap::new()),
127            data_sender: get_data_event_sender(),
128        })
129    }
130
131    /// Returns the cached instruments.
132    #[must_use]
133    pub fn instruments(&self) -> Vec<InstrumentAny> {
134        self.instruments.load().values().cloned().collect()
135    }
136
137    /// Returns a cached instrument by ID.
138    #[must_use]
139    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
140        self.instruments.load().get(instrument_id).cloned()
141    }
142
143    async fn load_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
144        let instruments = self
145            .http
146            .request_instruments(None)
147            .await
148            .context("Failed to load spot instruments")?;
149
150        self.instruments.rcu(|m| {
151            for instrument in &instruments {
152                m.insert(instrument.id(), instrument.clone());
153            }
154        });
155
156        self.http.cache_instruments(&instruments);
157
158        log::info!(
159            "Loaded instruments: client_id={}, count={}",
160            self.client_id,
161            instruments.len()
162        );
163
164        Ok(instruments)
165    }
166
167    fn spawn_ws<F>(&self, fut: F, context: &'static str)
168    where
169        F: Future<Output = anyhow::Result<()>> + Send + 'static,
170    {
171        get_runtime().spawn(async move {
172            if let Err(e) = fut.await {
173                log::error!("{context}: {e:?}");
174            }
175        });
176    }
177
178    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
179        let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
180        let data_sender = self.data_sender.clone();
181        let instruments = self.instruments.clone();
182        let book_sequence = Arc::new(AtomicU64::new(0));
183        let ohlc_buffer: OhlcBuffer = Arc::new(Mutex::new(AHashMap::new()));
184        let cancellation_token = self.cancellation_token.clone();
185        let clock = self.clock;
186
187        let handle = get_runtime().spawn(async move {
188            tokio::pin!(stream);
189
190            loop {
191                tokio::select! {
192                    () = cancellation_token.cancelled() => {
193                        log::debug!("Spot message handler cancelled");
194                        Self::flush_ohlc_buffer(&ohlc_buffer, &data_sender);
195                        break;
196                    }
197                    msg = stream.next() => {
198                        match msg {
199                            Some(ws_msg) => {
200                                Self::handle_ws_message(
201                                    ws_msg,
202                                    &data_sender,
203                                    &instruments,
204                                    &book_sequence,
205                                    &ohlc_buffer,
206                                    clock,
207                                );
208                            }
209                            None => {
210                                log::debug!("Spot WebSocket stream ended");
211                                Self::flush_ohlc_buffer(&ohlc_buffer, &data_sender);
212                                break;
213                            }
214                        }
215                    }
216                }
217            }
218        });
219
220        self.tasks.push(handle);
221        Ok(())
222    }
223
224    fn lookup_instrument(
225        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
226        symbol: &str,
227    ) -> Option<InstrumentAny> {
228        let instrument_id = InstrumentId::new(Symbol::new(symbol), *KRAKEN_VENUE);
229        instruments.load().get(&instrument_id).cloned()
230    }
231
232    fn flush_ohlc_buffer(
233        ohlc_buffer: &OhlcBuffer,
234        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
235    ) {
236        let Ok(mut buffer) = ohlc_buffer.lock() else {
237            return;
238        };
239        let bars: Vec<Bar> = buffer.drain().map(|(_, (bar, _))| bar).collect();
240        for bar in bars {
241            if let Err(e) = sender.send(DataEvent::Data(Data::Bar(bar))) {
242                log::error!("Failed to send buffered bar: {e}");
243            }
244        }
245    }
246
247    fn handle_ws_message(
248        msg: KrakenSpotWsMessage,
249        sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
250        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
251        book_sequence: &Arc<AtomicU64>,
252        ohlc_buffer: &OhlcBuffer,
253        clock: &'static AtomicTime,
254    ) {
255        let ts_init = clock.get_time_ns();
256
257        match msg {
258            KrakenSpotWsMessage::Ticker(tickers) => {
259                for ticker in &tickers {
260                    let Some(instrument) =
261                        Self::lookup_instrument(instruments, ticker.symbol.as_str())
262                    else {
263                        log::warn!("No instrument for symbol: {}", ticker.symbol);
264                        continue;
265                    };
266
267                    match parse_quote_tick(ticker, &instrument, ts_init) {
268                        Ok(quote) => {
269                            if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
270                                log::error!("Failed to send quote: {e}");
271                            }
272                        }
273                        Err(e) => log::error!("Failed to parse quote tick: {e}"),
274                    }
275                }
276            }
277            KrakenSpotWsMessage::Trade(trades) => {
278                for trade in &trades {
279                    let Some(instrument) =
280                        Self::lookup_instrument(instruments, trade.symbol.as_str())
281                    else {
282                        log::warn!("No instrument for symbol: {}", trade.symbol);
283                        continue;
284                    };
285
286                    match parse_trade_tick(trade, &instrument, ts_init) {
287                        Ok(tick) => {
288                            if let Err(e) = sender.send(DataEvent::Data(Data::Trade(tick))) {
289                                log::error!("Failed to send trade: {e}");
290                            }
291                        }
292                        Err(e) => log::error!("Failed to parse trade tick: {e}"),
293                    }
294                }
295            }
296            KrakenSpotWsMessage::Book {
297                data,
298                is_snapshot: _,
299            } => {
300                for book in &data {
301                    let Some(instrument) =
302                        Self::lookup_instrument(instruments, book.symbol.as_str())
303                    else {
304                        log::warn!("No instrument for symbol: {}", book.symbol);
305                        continue;
306                    };
307                    let sequence = book_sequence.load(Ordering::Relaxed);
308                    match parse_book_deltas(book, &instrument, sequence, ts_init) {
309                        Ok(delta_vec) => {
310                            if delta_vec.is_empty() {
311                                continue;
312                            }
313                            book_sequence.fetch_add(delta_vec.len() as u64, Ordering::Relaxed);
314                            let deltas = OrderBookDeltas::new(instrument.id(), delta_vec);
315                            let api_deltas = OrderBookDeltas_API::new(deltas);
316                            if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
317                                log::error!("Failed to send deltas: {e}");
318                            }
319                        }
320                        Err(e) => log::error!("Failed to parse book deltas: {e}"),
321                    }
322                }
323            }
324            KrakenSpotWsMessage::Ohlc(ohlc_data) => {
325                let Ok(mut buffer) = ohlc_buffer.lock() else {
326                    log::error!("OHLC buffer lock poisoned");
327                    return;
328                };
329
330                for ohlc in &ohlc_data {
331                    let Some(instrument) =
332                        Self::lookup_instrument(instruments, ohlc.symbol.as_str())
333                    else {
334                        log::warn!("No instrument for symbol: {}", ohlc.symbol);
335                        continue;
336                    };
337
338                    match parse_ws_bar(ohlc, &instrument, ts_init) {
339                        Ok(new_bar) => {
340                            let key: (Ustr, u32) = (ohlc.symbol, ohlc.interval);
341                            let new_interval_begin = UnixNanos::from(
342                                ohlc.interval_begin.timestamp_nanos_opt().unwrap_or(0) as u64,
343                            );
344
345                            if let Some((buffered_bar, buffered_begin)) = buffer.get(&key)
346                                && new_interval_begin != *buffered_begin
347                                && let Err(e) =
348                                    sender.send(DataEvent::Data(Data::Bar(*buffered_bar)))
349                            {
350                                log::error!("Failed to send bar: {e}");
351                            }
352
353                            buffer.insert(key, (new_bar, new_interval_begin));
354                        }
355                        Err(e) => log::error!("Failed to parse bar: {e}"),
356                    }
357                }
358            }
359            KrakenSpotWsMessage::Execution(_) => {}
360            KrakenSpotWsMessage::Reconnected => {
361                log::info!("Spot WebSocket reconnected");
362            }
363        }
364    }
365}
366
367#[async_trait(?Send)]
368impl DataClient for KrakenSpotDataClient {
369    fn client_id(&self) -> ClientId {
370        self.client_id
371    }
372
373    fn venue(&self) -> Option<Venue> {
374        Some(*KRAKEN_VENUE)
375    }
376
377    fn start(&mut self) -> anyhow::Result<()> {
378        log::info!(
379            "Starting Spot data client: client_id={}, environment={:?}",
380            self.client_id,
381            self.config.environment
382        );
383        Ok(())
384    }
385
386    fn stop(&mut self) -> anyhow::Result<()> {
387        log::info!("Stopping Spot data client: {}", self.client_id);
388        self.cancellation_token.cancel();
389        self.is_connected.store(false, Ordering::Relaxed);
390        Ok(())
391    }
392
393    fn reset(&mut self) -> anyhow::Result<()> {
394        log::info!("Resetting Spot data client: {}", self.client_id);
395        self.cancellation_token.cancel();
396
397        for task in self.tasks.drain(..) {
398            task.abort();
399        }
400
401        let mut ws = self.ws.clone();
402        get_runtime().spawn(async move {
403            let _ = ws.close().await;
404        });
405
406        self.instruments.store(ahash::AHashMap::new());
407
408        self.is_connected.store(false, Ordering::Relaxed);
409        self.cancellation_token = CancellationToken::new();
410        Ok(())
411    }
412
413    fn dispose(&mut self) -> anyhow::Result<()> {
414        log::info!("Disposing Spot data client: {}", self.client_id);
415        self.stop()
416    }
417
418    fn is_connected(&self) -> bool {
419        self.is_connected.load(Ordering::SeqCst)
420    }
421
422    fn is_disconnected(&self) -> bool {
423        !self.is_connected()
424    }
425
426    async fn connect(&mut self) -> anyhow::Result<()> {
427        if self.is_connected() {
428            return Ok(());
429        }
430
431        let instruments = self.load_instruments().await?;
432
433        self.ws
434            .connect()
435            .await
436            .context("Failed to connect spot WebSocket")?;
437        self.ws
438            .wait_until_active(10.0)
439            .await
440            .context("Spot WebSocket failed to become active")?;
441
442        self.spawn_message_handler()?;
443
444        for instrument in instruments {
445            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
446                log::error!("Failed to send instrument: {e}");
447            }
448        }
449
450        self.is_connected.store(true, Ordering::Release);
451        log::info!("Connected: client_id={}, product_type=Spot", self.client_id);
452        Ok(())
453    }
454
455    async fn disconnect(&mut self) -> anyhow::Result<()> {
456        if self.is_disconnected() {
457            return Ok(());
458        }
459
460        self.cancellation_token.cancel();
461        let _ = self.ws.close().await;
462
463        for handle in self.tasks.drain(..) {
464            if let Err(e) = handle.await {
465                log::error!("Error joining WebSocket task: {e:?}");
466            }
467        }
468
469        self.cancellation_token = CancellationToken::new();
470        self.is_connected.store(false, Ordering::Relaxed);
471
472        log::info!("Disconnected: client_id={}", self.client_id);
473        Ok(())
474    }
475
476    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
477        log::debug!("subscribe_instruments: Kraken instruments are fetched via HTTP on connect");
478        Ok(())
479    }
480
481    fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
482        log::debug!("subscribe_instrument: Kraken instruments are fetched via HTTP on connect");
483        Ok(())
484    }
485
486    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
487        let instrument_id = cmd.instrument_id;
488        let depth = cmd.depth;
489
490        if cmd.book_type != BookType::L2_MBP {
491            log::warn!(
492                "Book type {:?} not supported by Kraken, skipping subscription",
493                cmd.book_type
494            );
495            return Ok(());
496        }
497
498        if let Some(d) = depth {
499            let d_val = d.get();
500            if !matches!(d_val, 10 | 25 | 100 | 500 | 1000) {
501                log::warn!("Invalid depth {d_val} for Kraken Spot, valid: 10, 25, 100, 500, 1000");
502                return Ok(());
503            }
504        }
505
506        let ws = self.ws.clone();
507        self.spawn_ws(
508            async move {
509                ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
510                    .await
511                    .map_err(|e| anyhow::anyhow!("{e}"))
512            },
513            "subscribe book",
514        );
515
516        log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
517        Ok(())
518    }
519
520    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
521        let instrument_id = cmd.instrument_id;
522        let ws = self.ws.clone();
523
524        self.spawn_ws(
525            async move {
526                ws.subscribe_quotes(instrument_id)
527                    .await
528                    .map_err(|e| anyhow::anyhow!("{e}"))
529            },
530            "subscribe quotes",
531        );
532
533        log::info!("Subscribed to quotes: instrument_id={instrument_id}");
534        Ok(())
535    }
536
537    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
538        let instrument_id = cmd.instrument_id;
539        let ws = self.ws.clone();
540
541        self.spawn_ws(
542            async move {
543                ws.subscribe_trades(instrument_id)
544                    .await
545                    .map_err(|e| anyhow::anyhow!("{e}"))
546            },
547            "subscribe trades",
548        );
549
550        log::info!("Subscribed to trades: instrument_id={instrument_id}");
551        Ok(())
552    }
553
554    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
555        log::warn!(
556            "Mark price subscription not supported for Spot instrument {}",
557            cmd.instrument_id
558        );
559        Ok(())
560    }
561
562    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
563        log::warn!(
564            "Index price subscription not supported for Spot instrument {}",
565            cmd.instrument_id
566        );
567        Ok(())
568    }
569
570    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
571        let bar_type = cmd.bar_type;
572
573        if bar_type.aggregation_source() != AggregationSource::External {
574            log::warn!("Cannot subscribe to {bar_type} bars: only EXTERNAL bars supported");
575            return Ok(());
576        }
577
578        if !bar_type.spec().is_time_aggregated() {
579            log::warn!("Cannot subscribe to {bar_type} bars: only time-based bars supported");
580            return Ok(());
581        }
582
583        let ws = self.ws.clone();
584        self.spawn_ws(
585            async move {
586                ws.subscribe_bars(bar_type)
587                    .await
588                    .map_err(|e| anyhow::anyhow!("{e}"))
589            },
590            "subscribe bars",
591        );
592
593        log::info!("Subscribed to bars: bar_type={bar_type}");
594        Ok(())
595    }
596
597    fn subscribe_instrument_status(
598        &mut self,
599        cmd: SubscribeInstrumentStatus,
600    ) -> anyhow::Result<()> {
601        log::info!(
602            "subscribe_instrument_status: {} (status changes detected via periodic instrument polling)",
603            cmd.instrument_id,
604        );
605        Ok(())
606    }
607
608    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
609        let instrument_id = cmd.instrument_id;
610        let ws = self.ws.clone();
611
612        self.spawn_ws(
613            async move {
614                ws.unsubscribe_book(instrument_id)
615                    .await
616                    .map_err(|e| anyhow::anyhow!("{e}"))
617            },
618            "unsubscribe book",
619        );
620
621        log::info!("Unsubscribed from book: instrument_id={instrument_id}");
622        Ok(())
623    }
624
625    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
626        let instrument_id = cmd.instrument_id;
627        let ws = self.ws.clone();
628
629        self.spawn_ws(
630            async move {
631                ws.unsubscribe_quotes(instrument_id)
632                    .await
633                    .map_err(|e| anyhow::anyhow!("{e}"))
634            },
635            "unsubscribe quotes",
636        );
637
638        log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
639        Ok(())
640    }
641
642    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
643        let instrument_id = cmd.instrument_id;
644        let ws = self.ws.clone();
645
646        self.spawn_ws(
647            async move {
648                ws.unsubscribe_trades(instrument_id)
649                    .await
650                    .map_err(|e| anyhow::anyhow!("{e}"))
651            },
652            "unsubscribe trades",
653        );
654
655        log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
656        Ok(())
657    }
658
659    fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
660        Ok(())
661    }
662
663    fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
664        Ok(())
665    }
666
667    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
668        let bar_type = cmd.bar_type;
669        let ws = self.ws.clone();
670
671        self.spawn_ws(
672            async move {
673                ws.unsubscribe_bars(bar_type)
674                    .await
675                    .map_err(|e| anyhow::anyhow!("{e}"))
676            },
677            "unsubscribe bars",
678        );
679
680        log::info!("Unsubscribed from bars: bar_type={bar_type}");
681        Ok(())
682    }
683
684    fn unsubscribe_instrument_status(
685        &mut self,
686        _cmd: &UnsubscribeInstrumentStatus,
687    ) -> anyhow::Result<()> {
688        Ok(())
689    }
690
691    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
692        let http = self.http.clone();
693        let sender = self.data_sender.clone();
694        let instruments_cache = self.instruments.clone();
695        let request_id = request.request_id;
696        let client_id = request.client_id.unwrap_or(self.client_id);
697        let venue = *KRAKEN_VENUE;
698        let start_nanos = datetime_to_unix_nanos(request.start);
699        let end_nanos = datetime_to_unix_nanos(request.end);
700        let params = request.params;
701        let clock = self.clock;
702
703        get_runtime().spawn(async move {
704            match http.request_instruments(None).await {
705                Ok(instruments) => {
706                    instruments_cache.rcu(|m| {
707                        for instrument in &instruments {
708                            m.insert(instrument.id(), instrument.clone());
709                        }
710                    });
711                    http.cache_instruments(&instruments);
712
713                    let response = DataResponse::Instruments(InstrumentsResponse::new(
714                        request_id,
715                        client_id,
716                        venue,
717                        instruments,
718                        start_nanos,
719                        end_nanos,
720                        clock.get_time_ns(),
721                        params,
722                    ));
723
724                    if let Err(e) = sender.send(DataEvent::Response(response)) {
725                        log::error!("Failed to send instruments response: {e}");
726                    }
727                }
728                Err(e) => log::error!("Instruments request failed: {e:?}"),
729            }
730        });
731
732        Ok(())
733    }
734
735    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
736        let http = self.http.clone();
737        let sender = self.data_sender.clone();
738        let instruments = self.instruments.clone();
739        let instrument_id = request.instrument_id;
740        let request_id = request.request_id;
741        let client_id = request.client_id.unwrap_or(self.client_id);
742        let start_nanos = datetime_to_unix_nanos(request.start);
743        let end_nanos = datetime_to_unix_nanos(request.end);
744        let params = request.params;
745        let clock = self.clock;
746
747        get_runtime().spawn(async move {
748            if let Some(instrument) = instruments.load().get(&instrument_id) {
749                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
750                    request_id,
751                    client_id,
752                    instrument.id(),
753                    instrument.clone(),
754                    start_nanos,
755                    end_nanos,
756                    clock.get_time_ns(),
757                    params,
758                )));
759
760                if let Err(e) = sender.send(DataEvent::Response(response)) {
761                    log::error!("Failed to send instrument response: {e}");
762                }
763                return;
764            }
765
766            match http.request_instruments(None).await {
767                Ok(all_instruments) => {
768                    instruments.rcu(|m| {
769                        for instrument in &all_instruments {
770                            m.insert(instrument.id(), instrument.clone());
771                        }
772                    });
773                    http.cache_instruments(&all_instruments);
774
775                    let instrument = all_instruments
776                        .into_iter()
777                        .find(|i| i.id() == instrument_id);
778
779                    if let Some(instrument) = instrument {
780                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
781                            request_id,
782                            client_id,
783                            instrument.id(),
784                            instrument,
785                            start_nanos,
786                            end_nanos,
787                            clock.get_time_ns(),
788                            params,
789                        )));
790
791                        if let Err(e) = sender.send(DataEvent::Response(response)) {
792                            log::error!("Failed to send instrument response: {e}");
793                        }
794                    } else {
795                        log::error!("Instrument not found: {instrument_id}");
796                    }
797                }
798                Err(e) => log::error!("Instrument request failed: {e:?}"),
799            }
800        });
801
802        Ok(())
803    }
804    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
805        let http = self.http.clone();
806        let sender = self.data_sender.clone();
807        let instrument_id = request.instrument_id;
808        let start = request.start;
809        let end = request.end;
810        let limit = request.limit.map(|n| n.get() as u64);
811        let request_id = request.request_id;
812        let client_id = request.client_id.unwrap_or(self.client_id);
813        let params = request.params;
814        let clock = self.clock;
815        let start_nanos = datetime_to_unix_nanos(start);
816        let end_nanos = datetime_to_unix_nanos(end);
817
818        get_runtime().spawn(async move {
819            match http.request_trades(instrument_id, start, end, limit).await {
820                Ok(trades) => {
821                    let response = DataResponse::Trades(TradesResponse::new(
822                        request_id,
823                        client_id,
824                        instrument_id,
825                        trades,
826                        start_nanos,
827                        end_nanos,
828                        clock.get_time_ns(),
829                        params,
830                    ));
831
832                    if let Err(e) = sender.send(DataEvent::Response(response)) {
833                        log::error!("Failed to send trades response: {e}");
834                    }
835                }
836                Err(e) => log::error!("Trades request failed: {e:?}"),
837            }
838        });
839
840        Ok(())
841    }
842
843    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
844        let http = self.http.clone();
845        let sender = self.data_sender.clone();
846        let bar_type = request.bar_type;
847        let start = request.start;
848        let end = request.end;
849        let limit = request.limit.map(|n| n.get() as u64);
850        let request_id = request.request_id;
851        let client_id = request.client_id.unwrap_or(self.client_id);
852        let params = request.params;
853        let clock = self.clock;
854        let start_nanos = datetime_to_unix_nanos(start);
855        let end_nanos = datetime_to_unix_nanos(end);
856
857        get_runtime().spawn(async move {
858            match http.request_bars(bar_type, start, end, limit).await {
859                Ok(bars) => {
860                    let response = DataResponse::Bars(BarsResponse::new(
861                        request_id,
862                        client_id,
863                        bar_type,
864                        bars,
865                        start_nanos,
866                        end_nanos,
867                        clock.get_time_ns(),
868                        params,
869                    ));
870
871                    if let Err(e) = sender.send(DataEvent::Response(response)) {
872                        log::error!("Failed to send bars response: {e}");
873                    }
874                }
875                Err(e) => log::error!("Bars request failed: {e:?}"),
876            }
877        });
878
879        Ok(())
880    }
881
882    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
883        let http = self.http.clone();
884        let sender = self.data_sender.clone();
885        let instrument_id = request.instrument_id;
886        let depth = request.depth.map(|n| n.get() as u32);
887        let request_id = request.request_id;
888        let client_id = request.client_id.unwrap_or(self.client_id);
889        let params = request.params;
890        let clock = self.clock;
891
892        get_runtime().spawn(async move {
893            match http.request_book_snapshot(instrument_id, depth).await {
894                Ok(book) => {
895                    let response = DataResponse::Book(BookResponse::new(
896                        request_id,
897                        client_id,
898                        instrument_id,
899                        book,
900                        None,
901                        None,
902                        clock.get_time_ns(),
903                        params,
904                    ));
905
906                    if let Err(e) = sender.send(DataEvent::Response(response)) {
907                        log::error!("Failed to send book snapshot response: {e}");
908                    }
909                }
910                Err(e) => log::error!("Book snapshot request failed: {e:?}"),
911            }
912        });
913
914        Ok(())
915    }
916}
917
#[cfg(test)]
mod tests {
    use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
    use nautilus_model::identifiers::ClientId;
    use rstest::rstest;

    use super::*;
    use crate::config::KrakenDataClientConfig;

    /// Registers a data event sender so a client can be constructed in a test.
    ///
    /// The receiving half of the channel is dropped when this function returns.
    fn setup_test_env() {
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        set_data_event_sender(tx);
    }

    /// A client built from the default config reports its ID and venue, starts out
    /// disconnected, and holds no instruments.
    #[rstest]
    fn test_spot_data_client_new() {
        setup_test_env();

        let default_config = KrakenDataClientConfig::default();
        let built = KrakenSpotDataClient::new(ClientId::from("KRAKEN"), default_config);
        assert!(built.is_ok());

        let spot_client = built.unwrap();
        assert_eq!(spot_client.client_id(), ClientId::from("KRAKEN"));
        assert_eq!(spot_client.venue(), Some(*KRAKEN_VENUE));
        assert!(!spot_client.is_connected());
        assert!(spot_client.is_disconnected());
        assert!(spot_client.instruments().is_empty());
    }

    /// `start` followed by `stop` both succeed and leave the client disconnected.
    #[rstest]
    fn test_spot_data_client_start_stop() {
        setup_test_env();

        let default_config = KrakenDataClientConfig::default();
        let mut spot_client =
            KrakenSpotDataClient::new(ClientId::from("KRAKEN"), default_config).unwrap();

        let started = spot_client.start();
        assert!(started.is_ok());

        let stopped = spot_client.stop();
        assert!(stopped.is_ok());

        assert!(spot_client.is_disconnected());
    }
}