1use std::{
19 future::Future,
20 sync::{
21 Arc, Mutex,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use anyhow::Context;
28use async_trait::async_trait;
29use futures_util::StreamExt;
30use nautilus_common::{
31 clients::DataClient,
32 live::{get_data_event_sender, get_runtime},
33 messages::{
34 DataEvent,
35 data::{
36 BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
37 RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
38 SubscribeBars, SubscribeBookDeltas, SubscribeIndexPrices, SubscribeInstrument,
39 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
40 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
41 UnsubscribeIndexPrices, UnsubscribeInstrumentStatus, UnsubscribeMarkPrices,
42 UnsubscribeQuotes, UnsubscribeTrades,
43 },
44 },
45};
46use nautilus_core::{
47 AtomicMap, UnixNanos,
48 datetime::datetime_to_unix_nanos,
49 time::{AtomicTime, get_atomic_clock_realtime},
50};
51use nautilus_model::{
52 data::{Bar, Data, OrderBookDeltas, OrderBookDeltas_API},
53 enums::{AggregationSource, BookType},
54 identifiers::{ClientId, InstrumentId, Symbol, Venue},
55 instruments::{Instrument, InstrumentAny},
56};
57use tokio::task::JoinHandle;
58use tokio_util::sync::CancellationToken;
59use ustr::Ustr;
60
61type OhlcBufferKey = (Ustr, u32);
62type OhlcBuffer = Arc<Mutex<AHashMap<OhlcBufferKey, (Bar, UnixNanos)>>>;
63
64use crate::{
65 common::consts::KRAKEN_VENUE,
66 config::KrakenDataClientConfig,
67 http::{KrakenSpotHttpClient, spot::client::KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND},
68 websocket::spot_v2::{
69 client::KrakenSpotWebSocketClient,
70 messages::KrakenSpotWsMessage,
71 parse::{parse_book_deltas, parse_quote_tick, parse_trade_tick, parse_ws_bar},
72 },
73};
74
75#[allow(dead_code)]
79#[derive(Debug)]
80pub struct KrakenSpotDataClient {
81 clock: &'static AtomicTime,
82 client_id: ClientId,
83 config: KrakenDataClientConfig,
84 http: KrakenSpotHttpClient,
85 ws: KrakenSpotWebSocketClient,
86 is_connected: AtomicBool,
87 cancellation_token: CancellationToken,
88 tasks: Vec<JoinHandle<()>>,
89 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
90 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
91}
92
93impl KrakenSpotDataClient {
94 pub fn new(client_id: ClientId, config: KrakenDataClientConfig) -> anyhow::Result<Self> {
96 let cancellation_token = CancellationToken::new();
97
98 let http = KrakenSpotHttpClient::new(
99 config.environment,
100 config.base_url.clone(),
101 config.timeout_secs,
102 None,
103 None,
104 None,
105 config.proxy_url.clone(),
106 config
107 .max_requests_per_second
108 .unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND),
109 )?;
110
111 let ws = KrakenSpotWebSocketClient::new(
112 config.clone(),
113 cancellation_token.clone(),
114 config.proxy_url.clone(),
115 );
116
117 Ok(Self {
118 clock: get_atomic_clock_realtime(),
119 client_id,
120 config,
121 http,
122 ws,
123 is_connected: AtomicBool::new(false),
124 cancellation_token,
125 tasks: Vec::new(),
126 instruments: Arc::new(AtomicMap::new()),
127 data_sender: get_data_event_sender(),
128 })
129 }
130
131 #[must_use]
133 pub fn instruments(&self) -> Vec<InstrumentAny> {
134 self.instruments.load().values().cloned().collect()
135 }
136
137 #[must_use]
139 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
140 self.instruments.load().get(instrument_id).cloned()
141 }
142
143 async fn load_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
144 let instruments = self
145 .http
146 .request_instruments(None)
147 .await
148 .context("Failed to load spot instruments")?;
149
150 self.instruments.rcu(|m| {
151 for instrument in &instruments {
152 m.insert(instrument.id(), instrument.clone());
153 }
154 });
155
156 self.http.cache_instruments(&instruments);
157
158 log::info!(
159 "Loaded instruments: client_id={}, count={}",
160 self.client_id,
161 instruments.len()
162 );
163
164 Ok(instruments)
165 }
166
167 fn spawn_ws<F>(&self, fut: F, context: &'static str)
168 where
169 F: Future<Output = anyhow::Result<()>> + Send + 'static,
170 {
171 get_runtime().spawn(async move {
172 if let Err(e) = fut.await {
173 log::error!("{context}: {e:?}");
174 }
175 });
176 }
177
178 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
179 let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
180 let data_sender = self.data_sender.clone();
181 let instruments = self.instruments.clone();
182 let book_sequence = Arc::new(AtomicU64::new(0));
183 let ohlc_buffer: OhlcBuffer = Arc::new(Mutex::new(AHashMap::new()));
184 let cancellation_token = self.cancellation_token.clone();
185 let clock = self.clock;
186
187 let handle = get_runtime().spawn(async move {
188 tokio::pin!(stream);
189
190 loop {
191 tokio::select! {
192 () = cancellation_token.cancelled() => {
193 log::debug!("Spot message handler cancelled");
194 Self::flush_ohlc_buffer(&ohlc_buffer, &data_sender);
195 break;
196 }
197 msg = stream.next() => {
198 match msg {
199 Some(ws_msg) => {
200 Self::handle_ws_message(
201 ws_msg,
202 &data_sender,
203 &instruments,
204 &book_sequence,
205 &ohlc_buffer,
206 clock,
207 );
208 }
209 None => {
210 log::debug!("Spot WebSocket stream ended");
211 Self::flush_ohlc_buffer(&ohlc_buffer, &data_sender);
212 break;
213 }
214 }
215 }
216 }
217 }
218 });
219
220 self.tasks.push(handle);
221 Ok(())
222 }
223
224 fn lookup_instrument(
225 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
226 symbol: &str,
227 ) -> Option<InstrumentAny> {
228 let instrument_id = InstrumentId::new(Symbol::new(symbol), *KRAKEN_VENUE);
229 instruments.load().get(&instrument_id).cloned()
230 }
231
232 fn flush_ohlc_buffer(
233 ohlc_buffer: &OhlcBuffer,
234 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
235 ) {
236 let Ok(mut buffer) = ohlc_buffer.lock() else {
237 return;
238 };
239 let bars: Vec<Bar> = buffer.drain().map(|(_, (bar, _))| bar).collect();
240 for bar in bars {
241 if let Err(e) = sender.send(DataEvent::Data(Data::Bar(bar))) {
242 log::error!("Failed to send buffered bar: {e}");
243 }
244 }
245 }
246
247 fn handle_ws_message(
248 msg: KrakenSpotWsMessage,
249 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
250 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
251 book_sequence: &Arc<AtomicU64>,
252 ohlc_buffer: &OhlcBuffer,
253 clock: &'static AtomicTime,
254 ) {
255 let ts_init = clock.get_time_ns();
256
257 match msg {
258 KrakenSpotWsMessage::Ticker(tickers) => {
259 for ticker in &tickers {
260 let Some(instrument) =
261 Self::lookup_instrument(instruments, ticker.symbol.as_str())
262 else {
263 log::warn!("No instrument for symbol: {}", ticker.symbol);
264 continue;
265 };
266
267 match parse_quote_tick(ticker, &instrument, ts_init) {
268 Ok(quote) => {
269 if let Err(e) = sender.send(DataEvent::Data(Data::Quote(quote))) {
270 log::error!("Failed to send quote: {e}");
271 }
272 }
273 Err(e) => log::error!("Failed to parse quote tick: {e}"),
274 }
275 }
276 }
277 KrakenSpotWsMessage::Trade(trades) => {
278 for trade in &trades {
279 let Some(instrument) =
280 Self::lookup_instrument(instruments, trade.symbol.as_str())
281 else {
282 log::warn!("No instrument for symbol: {}", trade.symbol);
283 continue;
284 };
285
286 match parse_trade_tick(trade, &instrument, ts_init) {
287 Ok(tick) => {
288 if let Err(e) = sender.send(DataEvent::Data(Data::Trade(tick))) {
289 log::error!("Failed to send trade: {e}");
290 }
291 }
292 Err(e) => log::error!("Failed to parse trade tick: {e}"),
293 }
294 }
295 }
296 KrakenSpotWsMessage::Book {
297 data,
298 is_snapshot: _,
299 } => {
300 for book in &data {
301 let Some(instrument) =
302 Self::lookup_instrument(instruments, book.symbol.as_str())
303 else {
304 log::warn!("No instrument for symbol: {}", book.symbol);
305 continue;
306 };
307 let sequence = book_sequence.load(Ordering::Relaxed);
308 match parse_book_deltas(book, &instrument, sequence, ts_init) {
309 Ok(delta_vec) => {
310 if delta_vec.is_empty() {
311 continue;
312 }
313 book_sequence.fetch_add(delta_vec.len() as u64, Ordering::Relaxed);
314 let deltas = OrderBookDeltas::new(instrument.id(), delta_vec);
315 let api_deltas = OrderBookDeltas_API::new(deltas);
316 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(api_deltas))) {
317 log::error!("Failed to send deltas: {e}");
318 }
319 }
320 Err(e) => log::error!("Failed to parse book deltas: {e}"),
321 }
322 }
323 }
324 KrakenSpotWsMessage::Ohlc(ohlc_data) => {
325 let Ok(mut buffer) = ohlc_buffer.lock() else {
326 log::error!("OHLC buffer lock poisoned");
327 return;
328 };
329
330 for ohlc in &ohlc_data {
331 let Some(instrument) =
332 Self::lookup_instrument(instruments, ohlc.symbol.as_str())
333 else {
334 log::warn!("No instrument for symbol: {}", ohlc.symbol);
335 continue;
336 };
337
338 match parse_ws_bar(ohlc, &instrument, ts_init) {
339 Ok(new_bar) => {
340 let key: (Ustr, u32) = (ohlc.symbol, ohlc.interval);
341 let new_interval_begin = UnixNanos::from(
342 ohlc.interval_begin.timestamp_nanos_opt().unwrap_or(0) as u64,
343 );
344
345 if let Some((buffered_bar, buffered_begin)) = buffer.get(&key)
346 && new_interval_begin != *buffered_begin
347 && let Err(e) =
348 sender.send(DataEvent::Data(Data::Bar(*buffered_bar)))
349 {
350 log::error!("Failed to send bar: {e}");
351 }
352
353 buffer.insert(key, (new_bar, new_interval_begin));
354 }
355 Err(e) => log::error!("Failed to parse bar: {e}"),
356 }
357 }
358 }
359 KrakenSpotWsMessage::Execution(_) => {}
360 KrakenSpotWsMessage::Reconnected => {
361 log::info!("Spot WebSocket reconnected");
362 }
363 }
364 }
365}
366
367#[async_trait(?Send)]
368impl DataClient for KrakenSpotDataClient {
369 fn client_id(&self) -> ClientId {
370 self.client_id
371 }
372
373 fn venue(&self) -> Option<Venue> {
374 Some(*KRAKEN_VENUE)
375 }
376
377 fn start(&mut self) -> anyhow::Result<()> {
378 log::info!(
379 "Starting Spot data client: client_id={}, environment={:?}",
380 self.client_id,
381 self.config.environment
382 );
383 Ok(())
384 }
385
386 fn stop(&mut self) -> anyhow::Result<()> {
387 log::info!("Stopping Spot data client: {}", self.client_id);
388 self.cancellation_token.cancel();
389 self.is_connected.store(false, Ordering::Relaxed);
390 Ok(())
391 }
392
393 fn reset(&mut self) -> anyhow::Result<()> {
394 log::info!("Resetting Spot data client: {}", self.client_id);
395 self.cancellation_token.cancel();
396
397 for task in self.tasks.drain(..) {
398 task.abort();
399 }
400
401 let mut ws = self.ws.clone();
402 get_runtime().spawn(async move {
403 let _ = ws.close().await;
404 });
405
406 self.instruments.store(ahash::AHashMap::new());
407
408 self.is_connected.store(false, Ordering::Relaxed);
409 self.cancellation_token = CancellationToken::new();
410 Ok(())
411 }
412
413 fn dispose(&mut self) -> anyhow::Result<()> {
414 log::info!("Disposing Spot data client: {}", self.client_id);
415 self.stop()
416 }
417
418 fn is_connected(&self) -> bool {
419 self.is_connected.load(Ordering::SeqCst)
420 }
421
422 fn is_disconnected(&self) -> bool {
423 !self.is_connected()
424 }
425
426 async fn connect(&mut self) -> anyhow::Result<()> {
427 if self.is_connected() {
428 return Ok(());
429 }
430
431 let instruments = self.load_instruments().await?;
432
433 self.ws
434 .connect()
435 .await
436 .context("Failed to connect spot WebSocket")?;
437 self.ws
438 .wait_until_active(10.0)
439 .await
440 .context("Spot WebSocket failed to become active")?;
441
442 self.spawn_message_handler()?;
443
444 for instrument in instruments {
445 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
446 log::error!("Failed to send instrument: {e}");
447 }
448 }
449
450 self.is_connected.store(true, Ordering::Release);
451 log::info!("Connected: client_id={}, product_type=Spot", self.client_id);
452 Ok(())
453 }
454
455 async fn disconnect(&mut self) -> anyhow::Result<()> {
456 if self.is_disconnected() {
457 return Ok(());
458 }
459
460 self.cancellation_token.cancel();
461 let _ = self.ws.close().await;
462
463 for handle in self.tasks.drain(..) {
464 if let Err(e) = handle.await {
465 log::error!("Error joining WebSocket task: {e:?}");
466 }
467 }
468
469 self.cancellation_token = CancellationToken::new();
470 self.is_connected.store(false, Ordering::Relaxed);
471
472 log::info!("Disconnected: client_id={}", self.client_id);
473 Ok(())
474 }
475
476 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
477 log::debug!("subscribe_instruments: Kraken instruments are fetched via HTTP on connect");
478 Ok(())
479 }
480
481 fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
482 log::debug!("subscribe_instrument: Kraken instruments are fetched via HTTP on connect");
483 Ok(())
484 }
485
486 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
487 let instrument_id = cmd.instrument_id;
488 let depth = cmd.depth;
489
490 if cmd.book_type != BookType::L2_MBP {
491 log::warn!(
492 "Book type {:?} not supported by Kraken, skipping subscription",
493 cmd.book_type
494 );
495 return Ok(());
496 }
497
498 if let Some(d) = depth {
499 let d_val = d.get();
500 if !matches!(d_val, 10 | 25 | 100 | 500 | 1000) {
501 log::warn!("Invalid depth {d_val} for Kraken Spot, valid: 10, 25, 100, 500, 1000");
502 return Ok(());
503 }
504 }
505
506 let ws = self.ws.clone();
507 self.spawn_ws(
508 async move {
509 ws.subscribe_book(instrument_id, depth.map(|d| d.get() as u32))
510 .await
511 .map_err(|e| anyhow::anyhow!("{e}"))
512 },
513 "subscribe book",
514 );
515
516 log::info!("Subscribed to book: instrument_id={instrument_id}, depth={depth:?}");
517 Ok(())
518 }
519
520 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
521 let instrument_id = cmd.instrument_id;
522 let ws = self.ws.clone();
523
524 self.spawn_ws(
525 async move {
526 ws.subscribe_quotes(instrument_id)
527 .await
528 .map_err(|e| anyhow::anyhow!("{e}"))
529 },
530 "subscribe quotes",
531 );
532
533 log::info!("Subscribed to quotes: instrument_id={instrument_id}");
534 Ok(())
535 }
536
537 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
538 let instrument_id = cmd.instrument_id;
539 let ws = self.ws.clone();
540
541 self.spawn_ws(
542 async move {
543 ws.subscribe_trades(instrument_id)
544 .await
545 .map_err(|e| anyhow::anyhow!("{e}"))
546 },
547 "subscribe trades",
548 );
549
550 log::info!("Subscribed to trades: instrument_id={instrument_id}");
551 Ok(())
552 }
553
554 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
555 log::warn!(
556 "Mark price subscription not supported for Spot instrument {}",
557 cmd.instrument_id
558 );
559 Ok(())
560 }
561
562 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
563 log::warn!(
564 "Index price subscription not supported for Spot instrument {}",
565 cmd.instrument_id
566 );
567 Ok(())
568 }
569
570 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
571 let bar_type = cmd.bar_type;
572
573 if bar_type.aggregation_source() != AggregationSource::External {
574 log::warn!("Cannot subscribe to {bar_type} bars: only EXTERNAL bars supported");
575 return Ok(());
576 }
577
578 if !bar_type.spec().is_time_aggregated() {
579 log::warn!("Cannot subscribe to {bar_type} bars: only time-based bars supported");
580 return Ok(());
581 }
582
583 let ws = self.ws.clone();
584 self.spawn_ws(
585 async move {
586 ws.subscribe_bars(bar_type)
587 .await
588 .map_err(|e| anyhow::anyhow!("{e}"))
589 },
590 "subscribe bars",
591 );
592
593 log::info!("Subscribed to bars: bar_type={bar_type}");
594 Ok(())
595 }
596
597 fn subscribe_instrument_status(
598 &mut self,
599 cmd: SubscribeInstrumentStatus,
600 ) -> anyhow::Result<()> {
601 log::info!(
602 "subscribe_instrument_status: {} (status changes detected via periodic instrument polling)",
603 cmd.instrument_id,
604 );
605 Ok(())
606 }
607
608 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
609 let instrument_id = cmd.instrument_id;
610 let ws = self.ws.clone();
611
612 self.spawn_ws(
613 async move {
614 ws.unsubscribe_book(instrument_id)
615 .await
616 .map_err(|e| anyhow::anyhow!("{e}"))
617 },
618 "unsubscribe book",
619 );
620
621 log::info!("Unsubscribed from book: instrument_id={instrument_id}");
622 Ok(())
623 }
624
625 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
626 let instrument_id = cmd.instrument_id;
627 let ws = self.ws.clone();
628
629 self.spawn_ws(
630 async move {
631 ws.unsubscribe_quotes(instrument_id)
632 .await
633 .map_err(|e| anyhow::anyhow!("{e}"))
634 },
635 "unsubscribe quotes",
636 );
637
638 log::info!("Unsubscribed from quotes: instrument_id={instrument_id}");
639 Ok(())
640 }
641
642 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
643 let instrument_id = cmd.instrument_id;
644 let ws = self.ws.clone();
645
646 self.spawn_ws(
647 async move {
648 ws.unsubscribe_trades(instrument_id)
649 .await
650 .map_err(|e| anyhow::anyhow!("{e}"))
651 },
652 "unsubscribe trades",
653 );
654
655 log::info!("Unsubscribed from trades: instrument_id={instrument_id}");
656 Ok(())
657 }
658
659 fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
660 Ok(())
661 }
662
663 fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
664 Ok(())
665 }
666
667 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
668 let bar_type = cmd.bar_type;
669 let ws = self.ws.clone();
670
671 self.spawn_ws(
672 async move {
673 ws.unsubscribe_bars(bar_type)
674 .await
675 .map_err(|e| anyhow::anyhow!("{e}"))
676 },
677 "unsubscribe bars",
678 );
679
680 log::info!("Unsubscribed from bars: bar_type={bar_type}");
681 Ok(())
682 }
683
684 fn unsubscribe_instrument_status(
685 &mut self,
686 _cmd: &UnsubscribeInstrumentStatus,
687 ) -> anyhow::Result<()> {
688 Ok(())
689 }
690
691 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
692 let http = self.http.clone();
693 let sender = self.data_sender.clone();
694 let instruments_cache = self.instruments.clone();
695 let request_id = request.request_id;
696 let client_id = request.client_id.unwrap_or(self.client_id);
697 let venue = *KRAKEN_VENUE;
698 let start_nanos = datetime_to_unix_nanos(request.start);
699 let end_nanos = datetime_to_unix_nanos(request.end);
700 let params = request.params;
701 let clock = self.clock;
702
703 get_runtime().spawn(async move {
704 match http.request_instruments(None).await {
705 Ok(instruments) => {
706 instruments_cache.rcu(|m| {
707 for instrument in &instruments {
708 m.insert(instrument.id(), instrument.clone());
709 }
710 });
711 http.cache_instruments(&instruments);
712
713 let response = DataResponse::Instruments(InstrumentsResponse::new(
714 request_id,
715 client_id,
716 venue,
717 instruments,
718 start_nanos,
719 end_nanos,
720 clock.get_time_ns(),
721 params,
722 ));
723
724 if let Err(e) = sender.send(DataEvent::Response(response)) {
725 log::error!("Failed to send instruments response: {e}");
726 }
727 }
728 Err(e) => log::error!("Instruments request failed: {e:?}"),
729 }
730 });
731
732 Ok(())
733 }
734
735 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
736 let http = self.http.clone();
737 let sender = self.data_sender.clone();
738 let instruments = self.instruments.clone();
739 let instrument_id = request.instrument_id;
740 let request_id = request.request_id;
741 let client_id = request.client_id.unwrap_or(self.client_id);
742 let start_nanos = datetime_to_unix_nanos(request.start);
743 let end_nanos = datetime_to_unix_nanos(request.end);
744 let params = request.params;
745 let clock = self.clock;
746
747 get_runtime().spawn(async move {
748 if let Some(instrument) = instruments.load().get(&instrument_id) {
749 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
750 request_id,
751 client_id,
752 instrument.id(),
753 instrument.clone(),
754 start_nanos,
755 end_nanos,
756 clock.get_time_ns(),
757 params,
758 )));
759
760 if let Err(e) = sender.send(DataEvent::Response(response)) {
761 log::error!("Failed to send instrument response: {e}");
762 }
763 return;
764 }
765
766 match http.request_instruments(None).await {
767 Ok(all_instruments) => {
768 instruments.rcu(|m| {
769 for instrument in &all_instruments {
770 m.insert(instrument.id(), instrument.clone());
771 }
772 });
773 http.cache_instruments(&all_instruments);
774
775 let instrument = all_instruments
776 .into_iter()
777 .find(|i| i.id() == instrument_id);
778
779 if let Some(instrument) = instrument {
780 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
781 request_id,
782 client_id,
783 instrument.id(),
784 instrument,
785 start_nanos,
786 end_nanos,
787 clock.get_time_ns(),
788 params,
789 )));
790
791 if let Err(e) = sender.send(DataEvent::Response(response)) {
792 log::error!("Failed to send instrument response: {e}");
793 }
794 } else {
795 log::error!("Instrument not found: {instrument_id}");
796 }
797 }
798 Err(e) => log::error!("Instrument request failed: {e:?}"),
799 }
800 });
801
802 Ok(())
803 }
804 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
805 let http = self.http.clone();
806 let sender = self.data_sender.clone();
807 let instrument_id = request.instrument_id;
808 let start = request.start;
809 let end = request.end;
810 let limit = request.limit.map(|n| n.get() as u64);
811 let request_id = request.request_id;
812 let client_id = request.client_id.unwrap_or(self.client_id);
813 let params = request.params;
814 let clock = self.clock;
815 let start_nanos = datetime_to_unix_nanos(start);
816 let end_nanos = datetime_to_unix_nanos(end);
817
818 get_runtime().spawn(async move {
819 match http.request_trades(instrument_id, start, end, limit).await {
820 Ok(trades) => {
821 let response = DataResponse::Trades(TradesResponse::new(
822 request_id,
823 client_id,
824 instrument_id,
825 trades,
826 start_nanos,
827 end_nanos,
828 clock.get_time_ns(),
829 params,
830 ));
831
832 if let Err(e) = sender.send(DataEvent::Response(response)) {
833 log::error!("Failed to send trades response: {e}");
834 }
835 }
836 Err(e) => log::error!("Trades request failed: {e:?}"),
837 }
838 });
839
840 Ok(())
841 }
842
843 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
844 let http = self.http.clone();
845 let sender = self.data_sender.clone();
846 let bar_type = request.bar_type;
847 let start = request.start;
848 let end = request.end;
849 let limit = request.limit.map(|n| n.get() as u64);
850 let request_id = request.request_id;
851 let client_id = request.client_id.unwrap_or(self.client_id);
852 let params = request.params;
853 let clock = self.clock;
854 let start_nanos = datetime_to_unix_nanos(start);
855 let end_nanos = datetime_to_unix_nanos(end);
856
857 get_runtime().spawn(async move {
858 match http.request_bars(bar_type, start, end, limit).await {
859 Ok(bars) => {
860 let response = DataResponse::Bars(BarsResponse::new(
861 request_id,
862 client_id,
863 bar_type,
864 bars,
865 start_nanos,
866 end_nanos,
867 clock.get_time_ns(),
868 params,
869 ));
870
871 if let Err(e) = sender.send(DataEvent::Response(response)) {
872 log::error!("Failed to send bars response: {e}");
873 }
874 }
875 Err(e) => log::error!("Bars request failed: {e:?}"),
876 }
877 });
878
879 Ok(())
880 }
881
882 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
883 let http = self.http.clone();
884 let sender = self.data_sender.clone();
885 let instrument_id = request.instrument_id;
886 let depth = request.depth.map(|n| n.get() as u32);
887 let request_id = request.request_id;
888 let client_id = request.client_id.unwrap_or(self.client_id);
889 let params = request.params;
890 let clock = self.clock;
891
892 get_runtime().spawn(async move {
893 match http.request_book_snapshot(instrument_id, depth).await {
894 Ok(book) => {
895 let response = DataResponse::Book(BookResponse::new(
896 request_id,
897 client_id,
898 instrument_id,
899 book,
900 None,
901 None,
902 clock.get_time_ns(),
903 params,
904 ));
905
906 if let Err(e) = sender.send(DataEvent::Response(response)) {
907 log::error!("Failed to send book snapshot response: {e}");
908 }
909 }
910 Err(e) => log::error!("Book snapshot request failed: {e:?}"),
911 }
912 });
913
914 Ok(())
915 }
916}
917
918#[cfg(test)]
919mod tests {
920 use nautilus_common::{live::runner::set_data_event_sender, messages::DataEvent};
921 use nautilus_model::identifiers::ClientId;
922 use rstest::rstest;
923
924 use super::*;
925 use crate::config::KrakenDataClientConfig;
926
927 fn setup_test_env() {
928 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
929 set_data_event_sender(sender);
930 }
931
932 #[rstest]
933 fn test_spot_data_client_new() {
934 setup_test_env();
935 let config = KrakenDataClientConfig::default();
936 let client = KrakenSpotDataClient::new(ClientId::from("KRAKEN"), config);
937 assert!(client.is_ok());
938
939 let client = client.unwrap();
940 assert_eq!(client.client_id(), ClientId::from("KRAKEN"));
941 assert_eq!(client.venue(), Some(*KRAKEN_VENUE));
942 assert!(!client.is_connected());
943 assert!(client.is_disconnected());
944 assert!(client.instruments().is_empty());
945 }
946
947 #[rstest]
948 fn test_spot_data_client_start_stop() {
949 setup_test_env();
950 let config = KrakenDataClientConfig::default();
951 let mut client = KrakenSpotDataClient::new(ClientId::from("KRAKEN"), config).unwrap();
952
953 assert!(client.start().is_ok());
954 assert!(client.stop().is_ok());
955 assert!(client.is_disconnected());
956 }
957}