Skip to main content

nautilus_coinbase/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Coinbase Advanced Trade data client for NautilusTrader.
17//!
18//! Implements the [`DataClient`] trait, providing market data subscriptions and
19//! historical data requests through the Coinbase Advanced Trade API.
20
21use std::sync::{
22    Arc,
23    atomic::{AtomicBool, Ordering},
24};
25
26use anyhow::Context;
27use nautilus_common::{
28    clients::DataClient,
29    live::{runner::get_data_event_sender, runtime::get_runtime},
30    messages::{
31        DataEvent,
32        data::{
33            BarsResponse, BookResponse, DataResponse, InstrumentResponse, InstrumentsResponse,
34            RequestBars, RequestBookSnapshot, RequestInstrument, RequestInstruments, RequestTrades,
35            SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
36            SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
37            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
38            UnsubscribeIndexPrices, UnsubscribeInstrument, UnsubscribeMarkPrices,
39            UnsubscribeQuotes, UnsubscribeTrades,
40        },
41    },
42};
43use nautilus_core::{
44    AtomicMap,
45    datetime::datetime_to_unix_nanos,
46    time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49    data::{Data, OrderBookDeltas_API},
50    enums::{BarAggregation, BookType, OrderSide},
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53    orderbook::OrderBook,
54};
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59pub(crate) mod poll;
60
61use crate::{
62    common::{
63        consts::COINBASE_VENUE, credential::CoinbaseCredential, enums::CoinbaseWsChannel,
64        parse::bar_type_to_granularity,
65    },
66    config::CoinbaseDataClientConfig,
67    data::poll::DerivPollManager,
68    http::{
69        client::{CoinbaseHttpClient, data_client_retry_config},
70        models::{CandlesResponse, PriceBook, TickerResponse},
71        parse::{parse_bar, parse_product_book_snapshot, parse_trade_tick},
72    },
73    provider::CoinbaseInstrumentProvider,
74    websocket::{client::CoinbaseWebSocketClient, handler::NautilusWsMessage},
75};
76
77/// Data client for Coinbase Advanced Trade.
78///
79/// Owns an HTTP client, WebSocket client, and instrument provider. Bootstraps
80/// instruments on connect, subscribes to WS channels for live data, and handles
81/// historical data requests through the REST API.
82#[derive(Debug)]
83pub struct CoinbaseDataClient {
84    client_id: ClientId,
85    #[allow(dead_code)]
86    config: CoinbaseDataClientConfig,
87    http_client: CoinbaseHttpClient,
88    ws_client: CoinbaseWebSocketClient,
89    provider: CoinbaseInstrumentProvider,
90    is_connected: AtomicBool,
91    cancellation_token: CancellationToken,
92    tasks: Vec<JoinHandle<()>>,
93    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
94    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
95    deriv_polls: DerivPollManager,
96    clock: &'static AtomicTime,
97}
98
99impl CoinbaseDataClient {
100    /// Creates a new [`CoinbaseDataClient`] instance.
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if the HTTP client fails to initialize.
105    pub fn new(client_id: ClientId, config: CoinbaseDataClientConfig) -> anyhow::Result<Self> {
106        let clock = get_atomic_clock_realtime();
107        let data_sender = get_data_event_sender();
108
109        let retry_config = data_client_retry_config();
110
111        let http_client = match CoinbaseCredential::resolve(
112            config.api_key.as_deref(),
113            config.api_secret.as_deref(),
114        ) {
115            Some(credential) => CoinbaseHttpClient::with_credentials(
116                credential,
117                config.environment,
118                config.http_timeout_secs,
119                config.proxy_url.clone(),
120                Some(retry_config),
121            )
122            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
123            None => CoinbaseHttpClient::new(
124                config.environment,
125                config.http_timeout_secs,
126                config.proxy_url.clone(),
127                Some(retry_config),
128            )
129            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
130        };
131
132        if let Some(url) = &config.base_url_rest {
133            http_client.set_base_url(url.clone());
134        }
135
136        let ws_url = config.ws_url();
137        let ws_client = CoinbaseWebSocketClient::new(
138            &ws_url,
139            config.transport_backend,
140            config.proxy_url.clone(),
141        );
142        let provider = CoinbaseInstrumentProvider::new(http_client.clone());
143
144        let deriv_polls = DerivPollManager::new(
145            http_client.clone(),
146            data_sender.clone(),
147            clock,
148            config.derivatives_poll_interval_secs,
149        );
150
151        Ok(Self {
152            client_id,
153            config,
154            http_client,
155            ws_client,
156            provider,
157            is_connected: AtomicBool::new(false),
158            cancellation_token: CancellationToken::new(),
159            tasks: Vec::new(),
160            data_sender,
161            instruments: Arc::new(AtomicMap::new()),
162            deriv_polls,
163            clock,
164        })
165    }
166
167    fn venue(&self) -> Venue {
168        *COINBASE_VENUE
169    }
170
171    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
172        let instruments = self
173            .provider
174            .load_all()
175            .await
176            .context("failed to fetch instruments during bootstrap")?;
177
178        self.instruments.rcu(|m| {
179            for instrument in &instruments {
180                m.insert(instrument.id(), instrument.clone());
181            }
182        });
183
184        for instrument in &instruments {
185            self.ws_client.update_instrument(instrument.clone()).await;
186        }
187
188        log::info!("Bootstrapped {} instruments", instruments.len());
189        Ok(instruments)
190    }
191
192    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
193        self.ws_client
194            .connect()
195            .await
196            .context("failed to connect to Coinbase WebSocket")?;
197
198        let mut out_rx = self
199            .ws_client
200            .take_out_rx()
201            .ok_or_else(|| anyhow::anyhow!("WebSocket output receiver not available"))?;
202
203        let data_sender = self.data_sender.clone();
204        let cancellation_token = self.cancellation_token.clone();
205
206        let task = get_runtime().spawn(async move {
207            log::info!("Coinbase WebSocket consumption loop started");
208
209            loop {
210                tokio::select! {
211                    () = cancellation_token.cancelled() => {
212                        log::info!("WebSocket consumption loop cancelled");
213                        break;
214                    }
215                    msg_opt = out_rx.recv() => {
216                        match msg_opt {
217                            Some(msg) => dispatch_ws_message(msg, &data_sender),
218                            None => {
219                                log::debug!("WebSocket output channel closed");
220                                break;
221                            }
222                        }
223                    }
224                }
225            }
226
227            log::info!("Coinbase WebSocket consumption loop finished");
228        });
229
230        self.tasks.push(task);
231        log::info!("WebSocket consumption task spawned");
232        Ok(())
233    }
234
235    fn product_id(instrument_id: InstrumentId) -> Ustr {
236        instrument_id.symbol.inner()
237    }
238
239    // Resolves a caller-supplied product id to Coinbase's canonical alias (if
240    // any). Coinbase consolidates aliased pairs into a single book server-side
241    // and rewrites WS subscription confirmations and inbound messages to use
242    // the canonical id (e.g. BTC-USDC -> BTC-USD), so we must subscribe with
243    // the canonical id and remember the mapping so inbound messages can be
244    // re-keyed to what the strategy actually subscribed to.
245    fn resolve_wire_product_id(&self, subscribed: Ustr) -> Ustr {
246        self.http_client
247            .product_aliases()
248            .get_cloned(&subscribed)
249            .filter(|alias| !alias.is_empty())
250            .unwrap_or(subscribed)
251    }
252}
253
254fn dispatch_ws_message(
255    msg: NautilusWsMessage,
256    data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
257) {
258    match msg {
259        NautilusWsMessage::Trade(trade) => {
260            if let Err(e) = data_sender.send(DataEvent::Data(Data::Trade(trade))) {
261                log::error!("Failed to send trade tick: {e}");
262            }
263        }
264        NautilusWsMessage::Quote(quote) => {
265            if let Err(e) = data_sender.send(DataEvent::Data(Data::Quote(quote))) {
266                log::error!("Failed to send quote tick: {e}");
267            }
268        }
269        NautilusWsMessage::Deltas(deltas) => {
270            if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
271                OrderBookDeltas_API::new(deltas),
272            ))) {
273                log::error!("Failed to send order book deltas: {e}");
274            }
275        }
276        NautilusWsMessage::Bar(bar) => {
277            if let Err(e) = data_sender.send(DataEvent::Data(Data::Bar(bar))) {
278                log::error!("Failed to send bar: {e}");
279            }
280        }
281        NautilusWsMessage::Reconnected => {
282            log::info!("WebSocket reconnected");
283        }
284        NautilusWsMessage::Error(e) => {
285            log::error!("WebSocket error: {e}");
286        }
287        NautilusWsMessage::UserOrder(_) => {
288            // User-channel execution reports are consumed by the execution client
289            log::debug!("Dropping user-channel update received on the data client");
290        }
291        NautilusWsMessage::FuturesBalanceSummary(_) => {
292            // Futures balance summary events are consumed by the execution client
293            log::debug!("Dropping futures_balance_summary event received on the data client");
294        }
295    }
296}
297
298#[async_trait::async_trait(?Send)]
299impl DataClient for CoinbaseDataClient {
300    fn client_id(&self) -> ClientId {
301        self.client_id
302    }
303
304    fn venue(&self) -> Option<Venue> {
305        Some(Self::venue(self))
306    }
307
308    fn start(&mut self) -> anyhow::Result<()> {
309        log::info!(
310            "Starting Coinbase data client: client_id={}, environment={:?}",
311            self.client_id,
312            self.config.environment,
313        );
314        Ok(())
315    }
316
317    fn stop(&mut self) -> anyhow::Result<()> {
318        log::info!("Stopping Coinbase data client {}", self.client_id);
319        self.cancellation_token.cancel();
320        self.deriv_polls.shutdown();
321        self.is_connected.store(false, Ordering::Relaxed);
322        Ok(())
323    }
324
325    fn reset(&mut self) -> anyhow::Result<()> {
326        log::debug!("Resetting Coinbase data client {}", self.client_id);
327        self.cancellation_token.cancel();
328        self.deriv_polls.shutdown();
329        self.is_connected.store(false, Ordering::Relaxed);
330        self.cancellation_token = CancellationToken::new();
331        self.tasks.clear();
332        Ok(())
333    }
334
335    fn dispose(&mut self) -> anyhow::Result<()> {
336        log::debug!("Disposing Coinbase data client {}", self.client_id);
337        self.stop()
338    }
339
340    fn is_connected(&self) -> bool {
341        self.is_connected.load(Ordering::Acquire)
342    }
343
344    fn is_disconnected(&self) -> bool {
345        !self.is_connected()
346    }
347
348    async fn connect(&mut self) -> anyhow::Result<()> {
349        if self.is_connected() {
350            return Ok(());
351        }
352
353        self.cancellation_token = CancellationToken::new();
354
355        let instruments = self
356            .bootstrap_instruments()
357            .await
358            .context("failed to bootstrap instruments")?;
359
360        for instrument in instruments {
361            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
362                log::warn!("Failed to send instrument: {e}");
363            }
364        }
365
366        self.spawn_ws()
367            .await
368            .context("failed to spawn WebSocket client")?;
369
370        // Re-spawn polling tasks for any derivatives subscriptions that
371        // survived a previous disconnect. The data engine's client adapter
372        // remembers the subscription set and suppresses duplicate subscribe
373        // commands, so without this resume step index-price and
374        // funding-rate streams would stay dark after a reconnect.
375        self.deriv_polls.resume();
376
377        self.is_connected.store(true, Ordering::Relaxed);
378        log::info!("Connected: client_id={}", self.client_id);
379
380        Ok(())
381    }
382
383    async fn disconnect(&mut self) -> anyhow::Result<()> {
384        if !self.is_connected() {
385            return Ok(());
386        }
387
388        self.cancellation_token.cancel();
389        self.deriv_polls.shutdown();
390
391        for task in self.tasks.drain(..) {
392            if let Err(e) = task.await {
393                log::error!("Error waiting for task to complete: {e}");
394            }
395        }
396
397        self.ws_client.disconnect().await;
398        self.instruments.store(ahash::AHashMap::new());
399        self.is_connected.store(false, Ordering::Relaxed);
400        log::info!("Disconnected: client_id={}", self.client_id);
401
402        Ok(())
403    }
404
405    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
406        let instruments = self.instruments.load();
407
408        if let Some(instrument) = instruments.get(&cmd.instrument_id) {
409            if let Err(e) = self
410                .data_sender
411                .send(DataEvent::Instrument(instrument.clone()))
412            {
413                log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
414            }
415        } else {
416            log::warn!("Instrument {} not found in cache", cmd.instrument_id);
417        }
418
419        Ok(())
420    }
421
422    fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
423        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
424
425        if subscription.book_type != BookType::L2_MBP {
426            anyhow::bail!("Coinbase only supports L2_MBP order book deltas");
427        }
428
429        let ws = self.ws_client.clone();
430        let subscribed_id = Self::product_id(subscription.instrument_id);
431        let wire_id = self.resolve_wire_product_id(subscribed_id);
432        if wire_id != subscribed_id {
433            ws.register_subscription_alias(wire_id, subscribed_id);
434        }
435
436        get_runtime().spawn(async move {
437            if let Err(e) = ws.subscribe(CoinbaseWsChannel::Level2, &[wire_id]).await {
438                log::error!("Failed to subscribe to book deltas: {e:?}");
439            }
440        });
441
442        Ok(())
443    }
444
445    fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
446        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
447
448        let ws = self.ws_client.clone();
449        let subscribed_id = Self::product_id(subscription.instrument_id);
450        let wire_id = self.resolve_wire_product_id(subscribed_id);
451        if wire_id != subscribed_id {
452            ws.register_subscription_alias(wire_id, subscribed_id);
453        }
454
455        get_runtime().spawn(async move {
456            if let Err(e) = ws.subscribe(CoinbaseWsChannel::Ticker, &[wire_id]).await {
457                log::error!("Failed to subscribe to quotes: {e:?}");
458            }
459        });
460
461        Ok(())
462    }
463
464    fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
465        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
466
467        let ws = self.ws_client.clone();
468        let subscribed_id = Self::product_id(subscription.instrument_id);
469        let wire_id = self.resolve_wire_product_id(subscribed_id);
470        if wire_id != subscribed_id {
471            ws.register_subscription_alias(wire_id, subscribed_id);
472        }
473
474        get_runtime().spawn(async move {
475            if let Err(e) = ws
476                .subscribe(CoinbaseWsChannel::MarketTrades, &[wire_id])
477                .await
478            {
479                log::error!("Failed to subscribe to trades: {e:?}");
480            }
481        });
482
483        Ok(())
484    }
485
486    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
487        // Coinbase Advanced Trade does not publish a live mark price for its
488        // perpetuals on either WS or REST. `settlement_price` is the prior
489        // daily settlement and drifts from the live index, so synthesizing a
490        // mark from it would be misleading. Reject explicitly so callers
491        // failing this subscription know why.
492        anyhow::bail!(
493            "Coinbase Advanced Trade does not publish mark prices; \
494             cannot subscribe for {}",
495            cmd.instrument_id
496        )
497    }
498
499    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
500        log::debug!("Subscribing to index prices: {}", cmd.instrument_id);
501        self.deriv_polls.subscribe_index(cmd.instrument_id);
502        Ok(())
503    }
504
505    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
506        log::debug!("Subscribing to funding rates: {}", cmd.instrument_id);
507        self.deriv_polls.subscribe_funding(cmd.instrument_id);
508        Ok(())
509    }
510
511    fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
512        log::debug!("Subscribing to bars: {}", subscription.bar_type);
513
514        let instrument_id = subscription.bar_type.instrument_id();
515
516        if !self.instruments.contains_key(&instrument_id) {
517            anyhow::bail!("Instrument {instrument_id} not found");
518        }
519
520        let bar_type = subscription.bar_type;
521        let subscribed_id = Self::product_id(instrument_id);
522        let wire_id = self.resolve_wire_product_id(subscribed_id);
523        if wire_id != subscribed_id {
524            self.ws_client
525                .register_subscription_alias(wire_id, subscribed_id);
526        }
527        let key = wire_id.to_string();
528
529        // Register on the original client so the bar type persists across clones
530        self.ws_client.register_bar_type(key.clone(), bar_type);
531
532        let mut ws = self.ws_client.clone();
533
534        get_runtime().spawn(async move {
535            ws.add_bar_type(key, bar_type).await;
536
537            if let Err(e) = ws.subscribe(CoinbaseWsChannel::Candles, &[wire_id]).await {
538                log::error!("Failed to subscribe to bars: {e:?}");
539            }
540        });
541
542        Ok(())
543    }
544
545    // Unsubscribe paths intentionally do NOT call
546    // `unregister_subscription_alias`. The same canonical wire id is shared
547    // across multiple data channels (ticker, market_trades, level2,
548    // candles), so dropping the entry on the first unsubscribe would cause
549    // every still-active channel for the same alias to mistag inbound
550    // messages. The mapping is stable per product for the process lifetime
551    // and the venue does not deliver messages for products that aren't
552    // subscribed to, so leaving it in place is safe.
553
554    fn unsubscribe_instrument(
555        &mut self,
556        _unsubscription: &UnsubscribeInstrument,
557    ) -> anyhow::Result<()> {
558        // `subscribe_instrument` only replays cached state; no venue subscription to tear down.
559        Ok(())
560    }
561
562    fn unsubscribe_book_deltas(
563        &mut self,
564        unsubscription: &UnsubscribeBookDeltas,
565    ) -> anyhow::Result<()> {
566        log::debug!(
567            "Unsubscribing from book deltas: {}",
568            unsubscription.instrument_id
569        );
570
571        let ws = self.ws_client.clone();
572        let subscribed_id = Self::product_id(unsubscription.instrument_id);
573        let wire_id = self.resolve_wire_product_id(subscribed_id);
574
575        get_runtime().spawn(async move {
576            if let Err(e) = ws.unsubscribe(CoinbaseWsChannel::Level2, &[wire_id]).await {
577                log::error!("Failed to unsubscribe from book deltas: {e:?}");
578            }
579        });
580
581        Ok(())
582    }
583
584    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
585        log::debug!(
586            "Unsubscribing from quotes: {}",
587            unsubscription.instrument_id
588        );
589
590        let ws = self.ws_client.clone();
591        let subscribed_id = Self::product_id(unsubscription.instrument_id);
592        let wire_id = self.resolve_wire_product_id(subscribed_id);
593
594        get_runtime().spawn(async move {
595            if let Err(e) = ws.unsubscribe(CoinbaseWsChannel::Ticker, &[wire_id]).await {
596                log::error!("Failed to unsubscribe from quotes: {e:?}");
597            }
598        });
599
600        Ok(())
601    }
602
603    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
604        log::debug!(
605            "Unsubscribing from trades: {}",
606            unsubscription.instrument_id
607        );
608
609        let ws = self.ws_client.clone();
610        let subscribed_id = Self::product_id(unsubscription.instrument_id);
611        let wire_id = self.resolve_wire_product_id(subscribed_id);
612
613        get_runtime().spawn(async move {
614            if let Err(e) = ws
615                .unsubscribe(CoinbaseWsChannel::MarketTrades, &[wire_id])
616                .await
617            {
618                log::error!("Failed to unsubscribe from trades: {e:?}");
619            }
620        });
621
622        Ok(())
623    }
624
625    fn unsubscribe_mark_prices(&mut self, _cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
626        Ok(())
627    }
628
629    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
630        log::debug!("Unsubscribing from index prices: {}", cmd.instrument_id);
631        self.deriv_polls.unsubscribe_index(cmd.instrument_id);
632        Ok(())
633    }
634
635    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
636        log::debug!("Unsubscribing from funding rates: {}", cmd.instrument_id);
637        self.deriv_polls.unsubscribe_funding(cmd.instrument_id);
638        Ok(())
639    }
640
641    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
642        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
643
644        let instrument_id = unsubscription.bar_type.instrument_id();
645        let subscribed_id = Self::product_id(instrument_id);
646        let wire_id = self.resolve_wire_product_id(subscribed_id);
647        let ws = self.ws_client.clone();
648
649        get_runtime().spawn(async move {
650            if let Err(e) = ws.unsubscribe(CoinbaseWsChannel::Candles, &[wire_id]).await {
651                log::error!("Failed to unsubscribe from bars: {e:?}");
652            }
653        });
654
655        Ok(())
656    }
657
658    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
659        log::debug!("Requesting all instruments");
660
661        let provider = self.provider.clone();
662        let sender = self.data_sender.clone();
663        let instruments_cache = self.instruments.clone();
664        let ws = self.ws_client.clone();
665        let request_id = request.request_id;
666        let client_id = request.client_id.unwrap_or(self.client_id);
667        let venue = Self::venue(self);
668        let start_nanos = datetime_to_unix_nanos(request.start);
669        let end_nanos = datetime_to_unix_nanos(request.end);
670        let params = request.params;
671        let clock = self.clock;
672
673        get_runtime().spawn(async move {
674            match provider.load_all().await {
675                Ok(instruments) => {
676                    instruments_cache.rcu(|m| {
677                        for instrument in &instruments {
678                            m.insert(instrument.id(), instrument.clone());
679                        }
680                    });
681
682                    for instrument in &instruments {
683                        ws.update_instrument(instrument.clone()).await;
684                    }
685
686                    let response = DataResponse::Instruments(InstrumentsResponse::new(
687                        request_id,
688                        client_id,
689                        venue,
690                        instruments,
691                        start_nanos,
692                        end_nanos,
693                        clock.get_time_ns(),
694                        params,
695                    ));
696
697                    if let Err(e) = sender.send(DataEvent::Response(response)) {
698                        log::error!("Failed to send instruments response: {e}");
699                    }
700                }
701                Err(e) => {
702                    log::error!("Failed to fetch instruments: {e:?}");
703                }
704            }
705        });
706
707        Ok(())
708    }
709
710    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
711        log::debug!("Requesting instrument: {}", request.instrument_id);
712
713        let provider = self.provider.clone();
714        let sender = self.data_sender.clone();
715        let instruments_cache = self.instruments.clone();
716        let ws = self.ws_client.clone();
717        let instrument_id = request.instrument_id;
718        let product_id = instrument_id.symbol.to_string();
719        let request_id = request.request_id;
720        let client_id = request.client_id.unwrap_or(self.client_id);
721        let start_nanos = datetime_to_unix_nanos(request.start);
722        let end_nanos = datetime_to_unix_nanos(request.end);
723        let params = request.params;
724        let clock = self.clock;
725
726        get_runtime().spawn(async move {
727            match provider.load(&product_id).await {
728                Ok(instrument) => {
729                    instruments_cache.rcu(|m| {
730                        m.insert(instrument.id(), instrument.clone());
731                    });
732                    ws.update_instrument(instrument.clone()).await;
733
734                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
735                        request_id,
736                        client_id,
737                        instrument.id(),
738                        instrument,
739                        start_nanos,
740                        end_nanos,
741                        clock.get_time_ns(),
742                        params,
743                    )));
744
745                    if let Err(e) = sender.send(DataEvent::Response(response)) {
746                        log::error!("Failed to send instrument response: {e}");
747                    }
748                }
749                Err(e) => {
750                    log::error!("Failed to fetch instrument {instrument_id}: {e:?}");
751                }
752            }
753        });
754
755        Ok(())
756    }
757
758    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
759        let instrument_id = request.instrument_id;
760        let product_id = instrument_id.symbol.to_string();
761
762        let instruments = self.instruments.load();
763        let instrument = instruments
764            .get(&instrument_id)
765            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
766        let price_precision = instrument.price_precision();
767        let size_precision = instrument.size_precision();
768        let depth = request.depth.map(|d| d.get() as u32);
769
770        let http = self.http_client.clone();
771        let sender = self.data_sender.clone();
772        let client_id = request.client_id.unwrap_or(self.client_id);
773        let request_id = request.request_id;
774        let params = request.params;
775        let clock = self.clock;
776
777        get_runtime().spawn(async move {
778            match http.get_product_book(&product_id, depth).await {
779                Ok(json) => {
780                    let pricebook_value = json.get("pricebook").cloned().unwrap_or(json);
781
782                    let pricebook: PriceBook = match serde_json::from_value(pricebook_value) {
783                        Ok(b) => b,
784                        Err(e) => {
785                            log::error!("Failed to parse product book: {e}");
786                            return;
787                        }
788                    };
789
790                    let ts_init = clock.get_time_ns();
791
792                    match parse_product_book_snapshot(
793                        &pricebook,
794                        instrument_id,
795                        price_precision,
796                        size_precision,
797                        ts_init,
798                    ) {
799                        Ok(deltas) => {
800                            let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
801
802                            for delta in &deltas.deltas {
803                                if delta.order.side != OrderSide::NoOrderSide {
804                                    book.add(
805                                        delta.order,
806                                        delta.flags,
807                                        delta.sequence,
808                                        delta.ts_event,
809                                    );
810                                }
811                            }
812
813                            let response = DataResponse::Book(BookResponse::new(
814                                request_id,
815                                client_id,
816                                instrument_id,
817                                book,
818                                None,
819                                None,
820                                clock.get_time_ns(),
821                                params,
822                            ));
823
824                            if let Err(e) = sender.send(DataEvent::Response(response)) {
825                                log::error!("Failed to send book snapshot response: {e}");
826                            }
827                        }
828                        Err(e) => {
829                            log::error!("Failed to parse book snapshot for {instrument_id}: {e}");
830                        }
831                    }
832                }
833                Err(e) => {
834                    log::error!("Book snapshot request failed for {instrument_id}: {e:?}");
835                }
836            }
837        });
838
839        Ok(())
840    }
841
842    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
843        log::debug!("Requesting trades for {}", request.instrument_id);
844
845        let instrument_id = request.instrument_id;
846        let product_id = instrument_id.symbol.to_string();
847
848        let instruments = self.instruments.load();
849        let instrument = instruments
850            .get(&instrument_id)
851            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
852        let price_precision = instrument.price_precision();
853        let size_precision = instrument.size_precision();
854
855        let http = self.http_client.clone();
856        let sender = self.data_sender.clone();
857        let request_id = request.request_id;
858        let client_id = request.client_id.unwrap_or(self.client_id);
859        let limit = request.limit.map_or(100, |n| n.get() as u32);
860        let start_nanos = datetime_to_unix_nanos(request.start);
861        let end_nanos = datetime_to_unix_nanos(request.end);
862        let params = request.params;
863        let clock = self.clock;
864
865        get_runtime().spawn(async move {
866            match http.get_market_trades(&product_id, limit).await {
867                Ok(json) => {
868                    let ticker: TickerResponse = match serde_json::from_value(json) {
869                        Ok(r) => r,
870                        Err(e) => {
871                            log::error!("Failed to parse trades response: {e}");
872                            return;
873                        }
874                    };
875
876                    let ts_init = clock.get_time_ns();
877                    let mut trades: Vec<_> = ticker
878                        .trades
879                        .iter()
880                        .filter_map(|trade| {
881                            parse_trade_tick(
882                                trade,
883                                instrument_id,
884                                price_precision,
885                                size_precision,
886                                ts_init,
887                            )
888                            .map_err(|e| log::warn!("Failed to parse trade: {e}"))
889                            .ok()
890                        })
891                        .collect();
892
893                    // Coinbase returns newest-first; sort ascending
894                    trades.sort_by_key(|t| t.ts_event);
895
896                    let response = DataResponse::Trades(TradesResponse::new(
897                        request_id,
898                        client_id,
899                        instrument_id,
900                        trades,
901                        start_nanos,
902                        end_nanos,
903                        clock.get_time_ns(),
904                        params,
905                    ));
906
907                    if let Err(e) = sender.send(DataEvent::Response(response)) {
908                        log::error!("Failed to send trades response: {e}");
909                    }
910                }
911                Err(e) => log::error!("Trades request failed for {instrument_id}: {e:?}"),
912            }
913        });
914
915        Ok(())
916    }
917
918    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
919        log::debug!("Requesting bars for {}", request.bar_type);
920
921        let bar_type = request.bar_type;
922        let granularity = bar_type_to_granularity(&bar_type)?;
923        let instrument_id = bar_type.instrument_id();
924        let product_id = instrument_id.symbol.to_string();
925
926        let instruments = self.instruments.load();
927        let instrument = instruments
928            .get(&instrument_id)
929            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
930        let price_precision = instrument.price_precision();
931        let size_precision = instrument.size_precision();
932
933        let http = self.http_client.clone();
934        let sender = self.data_sender.clone();
935        let request_id = request.request_id;
936        let client_id = request.client_id.unwrap_or(self.client_id);
937        let start = request.start;
938        let end = request.end;
939        let limit = request.limit.map(|n| n.get());
940        let start_nanos = datetime_to_unix_nanos(start);
941        let end_nanos = datetime_to_unix_nanos(end);
942        let params = request.params;
943        let clock = self.clock;
944
945        get_runtime().spawn(async move {
946            let now = chrono::Utc::now();
947            let end_secs = end.unwrap_or(now).timestamp().to_string();
948            let start_secs = if let Some(s) = start {
949                s.timestamp().to_string()
950            } else {
951                let spec = bar_type.spec();
952                let step_secs = match spec.aggregation {
953                    BarAggregation::Minute => spec.step.get() as i64 * 60,
954                    BarAggregation::Hour => spec.step.get() as i64 * 3600,
955                    BarAggregation::Day => spec.step.get() as i64 * 86400,
956                    _ => 60,
957                };
958                let count = limit.unwrap_or(300) as i64;
959                let end_ts = end.unwrap_or(now).timestamp();
960                (end_ts - count * step_secs).to_string()
961            };
962
963            let granularity_str = granularity.to_string();
964
965            match http
966                .get_candles(&product_id, &start_secs, &end_secs, &granularity_str)
967                .await
968            {
969                Ok(json) => {
970                    let candles_response: CandlesResponse = match serde_json::from_value(json) {
971                        Ok(r) => r,
972                        Err(e) => {
973                            log::error!("Failed to parse candles response: {e}");
974                            return;
975                        }
976                    };
977
978                    let ts_init = clock.get_time_ns();
979                    let mut bars: Vec<_> = candles_response
980                        .candles
981                        .iter()
982                        .filter_map(|candle| {
983                            parse_bar(candle, bar_type, price_precision, size_precision, ts_init)
984                                .map_err(|e| log::warn!("Failed to parse bar: {e}"))
985                                .ok()
986                        })
987                        .collect();
988
989                    bars.sort_by_key(|b| b.ts_event);
990
991                    if let Some(limit) = limit
992                        && bars.len() > limit
993                    {
994                        bars.drain(..bars.len() - limit);
995                    }
996
997                    let response = DataResponse::Bars(BarsResponse::new(
998                        request_id,
999                        client_id,
1000                        bar_type,
1001                        bars,
1002                        start_nanos,
1003                        end_nanos,
1004                        clock.get_time_ns(),
1005                        params,
1006                    ));
1007
1008                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1009                        log::error!("Failed to send bars response: {e}");
1010                    }
1011                }
1012                Err(e) => log::error!("Bar request failed: {e:?}"),
1013            }
1014        });
1015
1016        Ok(())
1017    }
1018}
1019
#[cfg(test)]
mod tests {
    use nautilus_common::{
        live::runner::set_data_event_sender, messages::data::SubscribeMarkPrices,
    };
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::identifiers::InstrumentId;
    use rstest::rstest;

    use super::*;

    /// Instrument used to exercise the unsupported mark-price subscription path.
    const PERP_INSTRUMENT: &str = "BIP-20DEC30-CDE.COINBASE";

    // Guards against a regression where `subscribe_mark_prices` quietly returns
    // `Ok(())`: Coinbase Advanced Trade does not publish live mark prices for its
    // perpetuals, so the call has to fail with an error that both mentions mark
    // prices and names the offending instrument.
    #[rstest]
    #[tokio::test]
    async fn test_subscribe_mark_prices_rejects_with_explicit_error() {
        // The receiver is bound (not discarded) so the channel stays open for
        // the duration of the test.
        let (events_tx, _events_rx) = tokio::sync::mpsc::unbounded_channel();
        set_data_event_sender(events_tx);

        let mut client = CoinbaseDataClient::new(
            ClientId::new("COINBASE"),
            CoinbaseDataClientConfig::default(),
        )
        .expect("client construction");

        let rejection = client
            .subscribe_mark_prices(SubscribeMarkPrices::new(
                InstrumentId::from(PERP_INSTRUMENT),
                Some(ClientId::new("COINBASE")),
                None,
                UUID4::new(),
                UnixNanos::default(),
                None,
                None,
            ))
            .expect_err("must reject mark-price subscriptions");

        let msg = rejection.to_string();
        assert!(
            msg.contains("mark prices"),
            "error must mention mark prices, was: {msg}"
        );
        assert!(
            msg.contains(PERP_INSTRUMENT),
            "error must name the instrument, was: {msg}"
        );
    }
}