1use std::sync::{
22 Arc,
23 atomic::{AtomicBool, Ordering},
24};
25
26use anyhow::Context;
27use nautilus_common::{
28 clients::DataClient,
29 live::{runner::get_data_event_sender, runtime::get_runtime},
30 messages::{
31 DataEvent,
32 data::{
33 BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
34 RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
35 SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
36 SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
37 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
38 UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeMarkPrices,
39 UnsubscribeQuotes, UnsubscribeTrades,
40 },
41 },
42};
43use nautilus_core::{
44 AtomicMap,
45 datetime::datetime_to_unix_nanos,
46 time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49 data::{Data, OrderBookDeltas_API},
50 enums::{BarAggregation, BookType, OrderSide},
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53 orderbook::OrderBook,
54};
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59pub(crate) mod poll;
60
61use crate::{
62 common::{
63 consts::COINBASE_VENUE, credential::CoinbaseCredential, enums::CoinbaseWsChannel,
64 parse::bar_type_to_granularity,
65 },
66 config::CoinbaseDataClientConfig,
67 data::poll::DerivPollManager,
68 http::{
69 client::{CoinbaseHttpClient, data_client_retry_config},
70 models::{CandlesResponse, PriceBook, TickerResponse},
71 parse::{parse_bar, parse_product_book_snapshot, parse_trade_tick},
72 },
73 provider::CoinbaseInstrumentProvider,
74 websocket::{client::CoinbaseWebSocketClient, handler::NautilusWsMessage},
75};
76
77#[derive(Debug)]
83pub struct CoinbaseDataClient {
84 client_id: ClientId,
85 #[allow(dead_code)]
86 config: CoinbaseDataClientConfig,
87 http_client: CoinbaseHttpClient,
88 ws_client: CoinbaseWebSocketClient,
89 provider: CoinbaseInstrumentProvider,
90 is_connected: AtomicBool,
91 cancellation_token: CancellationToken,
92 tasks: Vec<JoinHandle<()>>,
93 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
94 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
95 deriv_polls: DerivPollManager,
96 clock: &'static AtomicTime,
97}
98
99impl CoinbaseDataClient {
100 pub fn new(client_id: ClientId, config: CoinbaseDataClientConfig) -> anyhow::Result<Self> {
106 let clock = get_atomic_clock_realtime();
107 let data_sender = get_data_event_sender();
108
109 let retry_config = data_client_retry_config();
110
111 let http_client = match CoinbaseCredential::resolve(
112 config.api_key.as_deref(),
113 config.api_secret.as_deref(),
114 ) {
115 Some(credential) => CoinbaseHttpClient::with_credentials(
116 credential,
117 config.environment,
118 config.http_timeout_secs,
119 config.proxy_url.clone(),
120 Some(retry_config),
121 )
122 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
123 None => CoinbaseHttpClient::new(
124 config.environment,
125 config.http_timeout_secs,
126 config.proxy_url.clone(),
127 Some(retry_config),
128 )
129 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
130 };
131
132 if let Some(url) = &config.base_url_rest {
133 http_client.set_base_url(url.clone());
134 }
135
136 let ws_url = config.ws_url();
137 let ws_client = CoinbaseWebSocketClient::new(
138 &ws_url,
139 config.transport_backend,
140 config.proxy_url.clone(),
141 );
142 let provider = CoinbaseInstrumentProvider::new(http_client.clone());
143
144 let deriv_polls = DerivPollManager::new(
145 http_client.clone(),
146 data_sender.clone(),
147 clock,
148 config.derivatives_poll_interval_secs,
149 );
150
151 Ok(Self {
152 client_id,
153 config,
154 http_client,
155 ws_client,
156 provider,
157 is_connected: AtomicBool::new(false),
158 cancellation_token: CancellationToken::new(),
159 tasks: Vec::new(),
160 data_sender,
161 instruments: Arc::new(AtomicMap::new()),
162 deriv_polls,
163 clock,
164 })
165 }
166
167 fn venue(&self) -> Venue {
168 *COINBASE_VENUE
169 }
170
171 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
172 let instruments = self
173 .provider
174 .load_all()
175 .await
176 .context("failed to fetch instruments during bootstrap")?;
177
178 self.instruments.rcu(|m| {
179 for instrument in &instruments {
180 m.insert(instrument.id(), instrument.clone());
181 }
182 });
183
184 for instrument in &instruments {
185 self.ws_client.update_instrument(instrument.clone()).await;
186 }
187
188 log::info!("Bootstrapped {} instruments", instruments.len());
189 Ok(instruments)
190 }
191
192 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
193 self.ws_client
194 .connect()
195 .await
196 .context("failed to connect to Coinbase WebSocket")?;
197
198 let mut out_rx = self
199 .ws_client
200 .take_out_rx()
201 .ok_or_else(|| anyhow::anyhow!("WebSocket output receiver not available"))?;
202
203 let data_sender = self.data_sender.clone();
204 let cancellation_token = self.cancellation_token.clone();
205
206 let task = get_runtime().spawn(async move {
207 log::info!("Coinbase WebSocket consumption loop started");
208
209 loop {
210 tokio::select! {
211 () = cancellation_token.cancelled() => {
212 log::info!("WebSocket consumption loop cancelled");
213 break;
214 }
215 msg_opt = out_rx.recv() => {
216 match msg_opt {
217 Some(msg) => dispatch_ws_message(msg, &data_sender),
218 None => {
219 log::debug!("WebSocket output channel closed");
220 break;
221 }
222 }
223 }
224 }
225 }
226
227 log::info!("Coinbase WebSocket consumption loop finished");
228 });
229
230 self.tasks.push(task);
231 log::info!("WebSocket consumption task spawned");
232 Ok(())
233 }
234
235 fn product_id(instrument_id: InstrumentId) -> Ustr {
236 instrument_id.symbol.inner()
237 }
238
239 fn resolve_wire_product_id(&self, subscribed: Ustr) -> Ustr {
246 self.http_client
247 .product_aliases()
248 .get_cloned(&subscribed)
249 .filter(|alias| !alias.is_empty())
250 .unwrap_or(subscribed)
251 }
252}
253
254fn dispatch_ws_message(
255 msg: NautilusWsMessage,
256 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
257) {
258 match msg {
259 NautilusWsMessage::Trade(trade) => {
260 if let Err(e) = data_sender.send(DataEvent::Data(Data::Trade(trade))) {
261 log::error!("Failed to send trade tick: {e}");
262 }
263 }
264 NautilusWsMessage::Quote(quote) => {
265 if let Err(e) = data_sender.send(DataEvent::Data(Data::Quote(quote))) {
266 log::error!("Failed to send quote tick: {e}");
267 }
268 }
269 NautilusWsMessage::Deltas(deltas) => {
270 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
271 OrderBookDeltas_API::new(deltas),
272 ))) {
273 log::error!("Failed to send order book deltas: {e}");
274 }
275 }
276 NautilusWsMessage::Bar(bar) => {
277 if let Err(e) = data_sender.send(DataEvent::Data(Data::Bar(bar))) {
278 log::error!("Failed to send bar: {e}");
279 }
280 }
281 NautilusWsMessage::Reconnected => {
282 log::info!("WebSocket reconnected");
283 }
284 NautilusWsMessage::Error(e) => {
285 log::error!("WebSocket error: {e}");
286 }
287 NautilusWsMessage::UserOrder(_) => {
288 log::debug!("Dropping user-channel update received on the data client");
290 }
291 NautilusWsMessage::FuturesBalanceSummary(_) => {
292 log::debug!("Dropping futures_balance_summary event received on the data client");
294 }
295 }
296}
297
298#[async_trait::async_trait(?Send)]
299impl DataClient for CoinbaseDataClient {
300 fn client_id(&self) -> ClientId {
301 self.client_id
302 }
303
304 fn venue(&self) -> Option<Venue> {
305 Some(Self::venue(self))
306 }
307
308 fn start(&mut self) -> anyhow::Result<()> {
309 log::info!(
310 "Starting Coinbase data client: client_id={}, environment={:?}",
311 self.client_id,
312 self.config.environment,
313 );
314 Ok(())
315 }
316
317 fn stop(&mut self) -> anyhow::Result<()> {
318 log::info!("Stopping Coinbase data client {}", self.client_id);
319 self.cancellation_token.cancel();
320 self.deriv_polls.shutdown();
321 self.is_connected.store(false, Ordering::Relaxed);
322 Ok(())
323 }
324
325 fn reset(&mut self) -> anyhow::Result<()> {
326 log::debug!("Resetting Coinbase data client {}", self.client_id);
327 self.cancellation_token.cancel();
328 self.deriv_polls.shutdown();
329 self.is_connected.store(false, Ordering::Relaxed);
330 self.cancellation_token = CancellationToken::new();
331 self.tasks.clear();
332 Ok(())
333 }
334
335 fn dispose(&mut self) -> anyhow::Result<()> {
336 log::debug!("Disposing Coinbase data client {}", self.client_id);
337 self.stop()
338 }
339
340 fn is_connected(&self) -> bool {
341 self.is_connected.load(Ordering::Acquire)
342 }
343
344 fn is_disconnected(&self) -> bool {
345 !self.is_connected()
346 }
347
348 async fn connect(&mut self) -> anyhow::Result<()> {
349 if self.is_connected() {
350 return Ok(());
351 }
352
353 self.cancellation_token = CancellationToken::new();
354
355 let instruments = self
356 .bootstrap_instruments()
357 .await
358 .context("failed to bootstrap instruments")?;
359
360 for instrument in instruments {
361 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
362 log::warn!("Failed to send instrument: {e}");
363 }
364 }
365
366 self.spawn_ws()
367 .await
368 .context("failed to spawn WebSocket client")?;
369
370 self.deriv_polls.resume();
376
377 self.is_connected.store(true, Ordering::Relaxed);
378 log::info!("Connected: client_id={}", self.client_id);
379
380 Ok(())
381 }
382
383 async fn disconnect(&mut self) -> anyhow::Result<()> {
384 if !self.is_connected() {
385 return Ok(());
386 }
387
388 self.cancellation_token.cancel();
389 self.deriv_polls.shutdown();
390
391 for task in self.tasks.drain(..) {
392 if let Err(e) = task.await {
393 log::error!("Error waiting for task to complete: {e}");
394 }
395 }
396
397 self.ws_client.disconnect().await;
398 self.instruments.store(ahash::AHashMap::new());
399 self.is_connected.store(false, Ordering::Relaxed);
400 log::info!("Disconnected: client_id={}", self.client_id);
401
402 Ok(())
403 }
404
405 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
406 let instruments = self.instruments.load();
407
408 if let Some(instrument) = instruments.get(&cmd.instrument_id) {
409 if let Err(e) = self
410 .data_sender
411 .send(DataEvent::Instrument(instrument.clone()))
412 {
413 log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
414 }
415 } else {
416 log::warn!("Instrument {} not found in cache", cmd.instrument_id);
417 }
418
419 Ok(())
420 }
421
422 fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
423 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
424
425 if subscription.book_type != BookType::L2_MBP {
426 anyhow::bail!("Coinbase only supports L2_MBP order book deltas");
427 }
428
429 let ws = self.ws_client.clone();
430 let subscribed_id = Self::product_id(subscription.instrument_id);
431 let wire_id = self.resolve_wire_product_id(subscribed_id);
432 if wire_id != subscribed_id {
433 ws.register_subscription_alias(wire_id, subscribed_id);
434 }
435
436 get_runtime().spawn(async move {
437 if let Err(e) = ws.subscribe(CoinbaseWsChannel::Level2, &[wire_id]).await {
438 log::error!("Failed to subscribe to book deltas: {e:?}");
439 }
440 });
441
442 Ok(())
443 }
444
445 fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
446 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
447
448 let ws = self.ws_client.clone();
449 let subscribed_id = Self::product_id(subscription.instrument_id);
450 let wire_id = self.resolve_wire_product_id(subscribed_id);
451 if wire_id != subscribed_id {
452 ws.register_subscription_alias(wire_id, subscribed_id);
453 }
454
455 get_runtime().spawn(async move {
456 if let Err(e) = ws.subscribe(CoinbaseWsChannel::Ticker, &[wire_id]).await {
457 log::error!("Failed to subscribe to quotes: {e:?}");
458 }
459 });
460
461 Ok(())
462 }
463
464 fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
465 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
466
467 let ws = self.ws_client.clone();
468 let subscribed_id = Self::product_id(subscription.instrument_id);
469 let wire_id = self.resolve_wire_product_id(subscribed_id);
470 if wire_id != subscribed_id {
471 ws.register_subscription_alias(wire_id, subscribed_id);
472 }
473
474 get_runtime().spawn(async move {
475 if let Err(e) = ws
476 .subscribe(CoinbaseWsChannel::MarketTrades, &[wire_id])
477 .await
478 {
479 log::error!("Failed to subscribe to trades: {e:?}");
480 }
481 });
482
483 Ok(())
484 }
485
486 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
487 anyhow::bail!(
493 "Coinbase Advanced Trade does not publish mark prices; \
494 cannot subscribe for {}",
495 cmd.instrument_id
496 )
497 }
498
499 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
500 log::debug!("Subscribing to index prices: {}", cmd.instrument_id);
501 self.deriv_polls.subscribe_index(cmd.instrument_id);
502 Ok(())
503 }
504
505 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
506 log::debug!("Subscribing to funding rates: {}", cmd.instrument_id);
507 self.deriv_polls.subscribe_funding(cmd.instrument_id);
508 Ok(())
509 }
510
511 fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
512 log::debug!("Subscribing to bars: {}", subscription.bar_type);
513
514 let instrument_id = subscription.bar_type.instrument_id();
515
516 if !self.instruments.contains_key(&instrument_id) {
517 anyhow::bail!("Instrument {instrument_id} not found");
518 }
519
520 let bar_type = subscription.bar_type;
521 let subscribed_id = Self::product_id(instrument_id);
522 let wire_id = self.resolve_wire_product_id(subscribed_id);
523 if wire_id != subscribed_id {
524 self.ws_client
525 .register_subscription_alias(wire_id, subscribed_id);
526 }
527 let key = wire_id.to_string();
528
529 self.ws_client.register_bar_type(key.clone(), bar_type);
531
532 let mut ws = self.ws_client.clone();
533
534 get_runtime().spawn(async move {
535 ws.add_bar_type(key, bar_type).await;
536
537 if let Err(e) = ws.subscribe(CoinbaseWsChannel::Candles, &[wire_id]).await {
538 log::error!("Failed to subscribe to bars: {e:?}");
539 }
540 });
541
542 Ok(())
543 }
544
545 fn unsubscribe_instrument(
555 &mut self,
556 _unsubscription: &UnsubscribeInstrument,
557 ) -> anyhow::Result<()> {
558 Ok(())
560 }
561
562 fn unsubscribe_book_deltas(
563 &mut self,
564 unsubscription: &UnsubscribeBookDeltas,
565 ) -> anyhow::Result<()> {
566 log::debug!(
567 "Unsubscribing from book deltas: {}",
568 unsubscription.instrument_id
569 );
570
571 let ws = self.ws_client.clone();
572 let subscribed_id = Self::product_id(unsubscription.instrument_id);
573 let wire_id = self.resolve_wire_product_id(subscribed_id);
574
575 get_runtime().spawn(async move {
576 if let Err(e) = ws.unsubscribe(CoinbaseWsChannel::Level2, &[wire_id]).await {
577 log::error!("Failed to unsubscribe from book deltas: {e:?}");
578 }
579 });
580
581 Ok(())
582 }
583
584 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
585 log::debug!(
586 "Unsubscribing from quotes: {}",
587 unsubscription.instrument_id
588 );
589
590 let ws = self.ws_client.clone();
591 let subscribed_id = Self::product_id(unsubscription.instrument_id);
592 let wire_id = self.resolve_wire_product_id(subscribed_id);
593
594 get_runtime().spawn(async move {
595 if let Err(e) = ws.unsubscribe(CoinbaseWsChannel::Ticker, &[wire_id]).await {
596 log::error!("Failed to unsubscribe from quotes: {e:?}");
597 }
598 });
599
600 Ok(())
601 }
602
603 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
604 log::debug!(
605 "Unsubscribing from trades: {}",
606 unsubscription.instrument_id
607 );
608
609 let ws = self.ws_client.clone();
610 let subscribed_id = Self::product_id(unsubscription.instrument_id);
611 let wire_id = self.resolve_wire_product_id(subscribed_id);
612
613 get_runtime().spawn(async move {
614 if let Err(e) = ws
615 .unsubscribe(CoinbaseWsChannel::MarketTrades, &[wire_id])
616 .await
617 {
618 log::error!("Failed to unsubscribe from trades: {e:?}");
619 }
620 });
621
622 Ok(())
623 }
624
625 fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
626 Ok(())
627 }
628
629 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
630 log::debug!("Unsubscribing from index prices: {}", cmd.instrument_id);
631 self.deriv_polls.unsubscribe_index(cmd.instrument_id);
632 Ok(())
633 }
634
635 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
636 log::debug!("Unsubscribing from funding rates: {}", cmd.instrument_id);
637 self.deriv_polls.unsubscribe_funding(cmd.instrument_id);
638 Ok(())
639 }
640
641 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
642 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
643
644 let instrument_id = unsubscription.bar_type.instrument_id();
645 let subscribed_id = Self::product_id(instrument_id);
646 let wire_id = self.resolve_wire_product_id(subscribed_id);
647 let ws = self.ws_client.clone();
648
649 get_runtime().spawn(async move {
650 if let Err(e) = ws.unsubscribe(CoinbaseWsChannel::Candles, &[wire_id]).await {
651 log::error!("Failed to unsubscribe from bars: {e:?}");
652 }
653 });
654
655 Ok(())
656 }
657
658 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
659 log::debug!("Requesting all instruments");
660
661 let provider = self.provider.clone();
662 let sender = self.data_sender.clone();
663 let instruments_cache = self.instruments.clone();
664 let ws = self.ws_client.clone();
665 let request_id = request.request_id;
666 let client_id = request.client_id.unwrap_or(self.client_id);
667 let venue = Self::venue(self);
668 let start_nanos = datetime_to_unix_nanos(request.start);
669 let end_nanos = datetime_to_unix_nanos(request.end);
670 let params = request.params;
671 let clock = self.clock;
672
673 get_runtime().spawn(async move {
674 match provider.load_all().await {
675 Ok(instruments) => {
676 instruments_cache.rcu(|m| {
677 for instrument in &instruments {
678 m.insert(instrument.id(), instrument.clone());
679 }
680 });
681
682 for instrument in &instruments {
683 ws.update_instrument(instrument.clone()).await;
684 }
685
686 let response = DataResponse::Instruments(InstrumentsResponse::new(
687 request_id,
688 client_id,
689 venue,
690 instruments,
691 start_nanos,
692 end_nanos,
693 clock.get_time_ns(),
694 params,
695 ));
696
697 if let Err(e) = sender.send(DataEvent::Response(response)) {
698 log::error!("Failed to send instruments response: {e}");
699 }
700 }
701 Err(e) => {
702 log::error!("Failed to fetch instruments: {e:?}");
703 }
704 }
705 });
706
707 Ok(())
708 }
709
710 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
711 log::debug!("Requesting instrument: {}", request.instrument_id);
712
713 let provider = self.provider.clone();
714 let sender = self.data_sender.clone();
715 let instruments_cache = self.instruments.clone();
716 let ws = self.ws_client.clone();
717 let instrument_id = request.instrument_id;
718 let product_id = instrument_id.symbol.to_string();
719 let request_id = request.request_id;
720 let client_id = request.client_id.unwrap_or(self.client_id);
721 let start_nanos = datetime_to_unix_nanos(request.start);
722 let end_nanos = datetime_to_unix_nanos(request.end);
723 let params = request.params;
724 let clock = self.clock;
725
726 get_runtime().spawn(async move {
727 match provider.load(&product_id).await {
728 Ok(instrument) => {
729 instruments_cache.rcu(|m| {
730 m.insert(instrument.id(), instrument.clone());
731 });
732 ws.update_instrument(instrument.clone()).await;
733
734 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
735 request_id,
736 client_id,
737 instrument.id(),
738 instrument,
739 start_nanos,
740 end_nanos,
741 clock.get_time_ns(),
742 params,
743 )));
744
745 if let Err(e) = sender.send(DataEvent::Response(response)) {
746 log::error!("Failed to send instrument response: {e}");
747 }
748 }
749 Err(e) => {
750 log::error!("Failed to fetch instrument {instrument_id}: {e:?}");
751 }
752 }
753 });
754
755 Ok(())
756 }
757
758 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
759 let instrument_id = request.instrument_id;
760 let product_id = instrument_id.symbol.to_string();
761
762 let instruments = self.instruments.load();
763 let instrument = instruments
764 .get(&instrument_id)
765 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
766 let price_precision = instrument.price_precision();
767 let size_precision = instrument.size_precision();
768 let depth = request.depth.map(|d| d.get() as u32);
769
770 let http = self.http_client.clone();
771 let sender = self.data_sender.clone();
772 let client_id = request.client_id.unwrap_or(self.client_id);
773 let request_id = request.request_id;
774 let params = request.params;
775 let clock = self.clock;
776
777 get_runtime().spawn(async move {
778 match http.get_product_book(&product_id, depth).await {
779 Ok(json) => {
780 let pricebook_value = json.get("pricebook").cloned().unwrap_or(json);
781
782 let pricebook: PriceBook = match serde_json::from_value(pricebook_value) {
783 Ok(b) => b,
784 Err(e) => {
785 log::error!("Failed to parse product book: {e}");
786 return;
787 }
788 };
789
790 let ts_init = clock.get_time_ns();
791
792 match parse_product_book_snapshot(
793 &pricebook,
794 instrument_id,
795 price_precision,
796 size_precision,
797 ts_init,
798 ) {
799 Ok(deltas) => {
800 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
801
802 for delta in &deltas.deltas {
803 if delta.order.side != OrderSide::NoOrderSide {
804 book.add(
805 delta.order,
806 delta.flags,
807 delta.sequence,
808 delta.ts_event,
809 );
810 }
811 }
812
813 let response = DataResponse::Book(BookResponse::new(
814 request_id,
815 client_id,
816 instrument_id,
817 book,
818 None,
819 None,
820 clock.get_time_ns(),
821 params,
822 ));
823
824 if let Err(e) = sender.send(DataEvent::Response(response)) {
825 log::error!("Failed to send book snapshot response: {e}");
826 }
827 }
828 Err(e) => {
829 log::error!("Failed to parse book snapshot for {instrument_id}: {e}");
830 }
831 }
832 }
833 Err(e) => {
834 log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
835 }
836 }
837 });
838
839 Ok(())
840 }
841
842 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
843 log::debug!("Requesting trades for {}", request.instrument_id);
844
845 let instrument_id = request.instrument_id;
846 let product_id = instrument_id.symbol.to_string();
847
848 let instruments = self.instruments.load();
849 let instrument = instruments
850 .get(&instrument_id)
851 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
852 let price_precision = instrument.price_precision();
853 let size_precision = instrument.size_precision();
854
855 let http = self.http_client.clone();
856 let sender = self.data_sender.clone();
857 let request_id = request.request_id;
858 let client_id = request.client_id.unwrap_or(self.client_id);
859 let limit = request.limit.map_or(100, |n| n.get() as u32);
860 let start_nanos = datetime_to_unix_nanos(request.start);
861 let end_nanos = datetime_to_unix_nanos(request.end);
862 let params = request.params;
863 let clock = self.clock;
864
865 get_runtime().spawn(async move {
866 match http.get_market_trades(&product_id, limit).await {
867 Ok(json) => {
868 let ticker: TickerResponse = match serde_json::from_value(json) {
869 Ok(r) => r,
870 Err(e) => {
871 log::error!("Failed to parse trades response: {e}");
872 return;
873 }
874 };
875
876 let ts_init = clock.get_time_ns();
877 let mut trades: Vec<_> = ticker
878 .trades
879 .iter()
880 .filter_map(|trade| {
881 parse_trade_tick(
882 trade,
883 instrument_id,
884 price_precision,
885 size_precision,
886 ts_init,
887 )
888 .map_err(|e| log::warn!("Failed to parse trade: {e}"))
889 .ok()
890 })
891 .collect();
892
893 trades.sort_by_key(|t| t.ts_event);
895
896 let response = DataResponse::Trades(TradesResponse::new(
897 request_id,
898 client_id,
899 instrument_id,
900 trades,
901 start_nanos,
902 end_nanos,
903 clock.get_time_ns(),
904 params,
905 ));
906
907 if let Err(e) = sender.send(DataEvent::Response(response)) {
908 log::error!("Failed to send trades response: {e}");
909 }
910 }
911 Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
912 }
913 });
914
915 Ok(())
916 }
917
918 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
919 log::debug!("Requesting bars for {}", request.bar_type);
920
921 let bar_type = request.bar_type;
922 let granularity = bar_type_to_granularity(&bar_type)?;
923 let instrument_id = bar_type.instrument_id();
924 let product_id = instrument_id.symbol.to_string();
925
926 let instruments = self.instruments.load();
927 let instrument = instruments
928 .get(&instrument_id)
929 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
930 let price_precision = instrument.price_precision();
931 let size_precision = instrument.size_precision();
932
933 let http = self.http_client.clone();
934 let sender = self.data_sender.clone();
935 let request_id = request.request_id;
936 let client_id = request.client_id.unwrap_or(self.client_id);
937 let start = request.start;
938 let end = request.end;
939 let limit = request.limit.map(|n| n.get());
940 let start_nanos = datetime_to_unix_nanos(start);
941 let end_nanos = datetime_to_unix_nanos(end);
942 let params = request.params;
943 let clock = self.clock;
944
945 get_runtime().spawn(async move {
946 let now = chrono::Utc::now();
947 let end_secs = end.unwrap_or(now).timestamp().to_string();
948 let start_secs = if let Some(s) = start {
949 s.timestamp().to_string()
950 } else {
951 let spec = bar_type.spec();
952 let step_secs = match spec.aggregation {
953 BarAggregation::Minute => spec.step.get() as i64 * 60,
954 BarAggregation::Hour => spec.step.get() as i64 * 3600,
955 BarAggregation::Day => spec.step.get() as i64 * 86400,
956 _ => 60,
957 };
958 let count = limit.unwrap_or(300) as i64;
959 let end_ts = end.unwrap_or(now).timestamp();
960 (end_ts - count * step_secs).to_string()
961 };
962
963 let granularity_str = granularity.to_string();
964
965 match http
966 .get_candles(&product_id, &start_secs, &end_secs, &granularity_str)
967 .await
968 {
969 Ok(json) => {
970 let candles_response: CandlesResponse = match serde_json::from_value(json) {
971 Ok(r) => r,
972 Err(e) => {
973 log::error!("Failed to parse candles response: {e}");
974 return;
975 }
976 };
977
978 let ts_init = clock.get_time_ns();
979 let mut bars: Vec<_> = candles_response
980 .candles
981 .iter()
982 .filter_map(|candle| {
983 parse_bar(candle, bar_type, price_precision, size_precision, ts_init)
984 .map_err(|e| log::warn!("Failed to parse bar: {e}"))
985 .ok()
986 })
987 .collect();
988
989 bars.sort_by_key(|b| b.ts_event);
990
991 if let Some(limit) = limit
992 && bars.len() > limit
993 {
994 bars.drain(..bars.len() - limit);
995 }
996
997 let response = DataResponse::Bars(BarsResponse::new(
998 request_id,
999 client_id,
1000 bar_type,
1001 bars,
1002 start_nanos,
1003 end_nanos,
1004 clock.get_time_ns(),
1005 params,
1006 ));
1007
1008 if let Err(e) = sender.send(DataEvent::Response(response)) {
1009 log::error!("Failed to send bars response: {e}");
1010 }
1011 }
1012 Err(e) => log::error!("Bar request failed: {e:?}"),
1013 }
1014 });
1015
1016 Ok(())
1017 }
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022 use nautilus_common::{
1023 live::runner::set_data_event_sender, messages::data::SubscribeMarkPrices,
1024 };
1025 use nautilus_core::{UUID4, UnixNanos};
1026 use nautilus_model::identifiers::InstrumentId;
1027 use rstest::rstest;
1028
1029 use super::*;
1030
1031 #[rstest]
1036 #[tokio::test]
1037 async fn test_subscribe_mark_prices_rejects_with_explicit_error() {
1038 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
1039 set_data_event_sender(tx);
1040
1041 let config = CoinbaseDataClientConfig::default();
1042 let mut client = CoinbaseDataClient::new(ClientId::new("COINBASE"), config)
1043 .expect("client construction");
1044
1045 let instrument_id = InstrumentId::from("BIP-20DEC30-CDE.COINBASE");
1046 let cmd = SubscribeMarkPrices::new(
1047 instrument_id,
1048 Some(ClientId::new("COINBASE")),
1049 None,
1050 UUID4::new(),
1051 UnixNanos::default(),
1052 None,
1053 None,
1054 );
1055
1056 let err = client
1057 .subscribe_mark_prices(cmd)
1058 .expect_err("must reject mark-price subscriptions");
1059 let msg = err.to_string();
1060 assert!(
1061 msg.contains("mark prices"),
1062 "error must mention mark prices, was: {msg}"
1063 );
1064 assert!(
1065 msg.contains("BIP-20DEC30-CDE.COINBASE"),
1066 "error must name the instrument, was: {msg}"
1067 );
1068 }
1069}