Skip to main content

nautilus_coinbase/
provider.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Instrument provider for loading and caching Coinbase instruments.
17
18use std::sync::Arc;
19
20use nautilus_core::AtomicMap;
21use nautilus_model::{
22    identifiers::InstrumentId,
23    instruments::{Instrument, InstrumentAny},
24};
25
26use crate::{
27    common::enums::{CoinbaseFuturesAssetType, CoinbaseProductType},
28    http::{
29        client::CoinbaseHttpClient,
30        models::{Product, ProductsResponse},
31        parse::{is_perpetual_product, parse_instrument},
32    },
33};
34
35/// Loads and caches Coinbase instruments.
36///
37/// Wraps a [`CoinbaseHttpClient`] and provides methods for loading instruments
38/// from the REST API or from pre-fetched JSON responses. Parsed instruments are
39/// cached in the HTTP client's shared `AtomicMap`.
40#[derive(Debug, Clone)]
41pub struct CoinbaseInstrumentProvider {
42    client: CoinbaseHttpClient,
43}
44
45impl CoinbaseInstrumentProvider {
46    /// Creates a new [`CoinbaseInstrumentProvider`].
47    #[must_use]
48    pub fn new(client: CoinbaseHttpClient) -> Self {
49        Self { client }
50    }
51
52    /// Returns a reference to the instrument cache.
53    #[must_use]
54    pub fn instruments(&self) -> &Arc<AtomicMap<InstrumentId, InstrumentAny>> {
55        self.client.instruments()
56    }
57
58    /// Returns the number of cached instruments.
59    #[must_use]
60    pub fn count(&self) -> usize {
61        self.client.instruments().len()
62    }
63
64    /// Returns a cached instrument by ID, if present.
65    #[must_use]
66    pub fn get(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
67        self.client.instruments().get_cloned(instrument_id)
68    }
69
70    /// Loads all instruments from the Coinbase REST API and caches them.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the HTTP request fails or the response cannot be parsed.
75    pub async fn load_all(&self) -> anyhow::Result<Vec<InstrumentAny>> {
76        let json = self
77            .client
78            .get_products()
79            .await
80            .map_err(|e| anyhow::anyhow!("Failed to fetch products: {e}"))?;
81
82        self.load_from_products_response(&json)
83    }
84
85    /// Loads all instruments of a specific product type from the REST API.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the HTTP request fails or the response cannot be parsed.
90    pub async fn load_all_filtered(
91        &self,
92        product_type: CoinbaseProductType,
93    ) -> anyhow::Result<Vec<InstrumentAny>> {
94        let json = self
95            .client
96            .get_products()
97            .await
98            .map_err(|e| anyhow::anyhow!("Failed to fetch products: {e}"))?;
99
100        self.load_from_products_response_filtered(&json, product_type)
101    }
102
103    /// Loads a single instrument by product ID from the REST API.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the HTTP request fails or the response cannot be parsed.
108    pub async fn load(&self, product_id: &str) -> anyhow::Result<InstrumentAny> {
109        let json = self
110            .client
111            .get_product(product_id)
112            .await
113            .map_err(|e| anyhow::anyhow!("Failed to fetch product '{product_id}': {e}"))?;
114
115        self.load_from_product_response(&json)
116    }
117
118    /// Parses a products list response and caches the instruments.
119    ///
120    /// Expects the JSON shape returned by `GET /products`: `{"products": [...]}`.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the JSON cannot be deserialized or any product fails to parse.
125    pub fn load_from_products_response(
126        &self,
127        json: &serde_json::Value,
128    ) -> anyhow::Result<Vec<InstrumentAny>> {
129        let response: ProductsResponse =
130            serde_json::from_value(json.clone()).map_err(|e| anyhow::anyhow!("{e}"))?;
131
132        let instruments = self.parse_and_cache_products(&response.products)?;
133        // Populate the alias map so subscribe paths (which only see the parsed
134        // `InstrumentAny`) can resolve a caller-supplied product id back to the
135        // canonical id Coinbase uses on the wire.
136        self.client.record_product_aliases(&response.products);
137        Ok(instruments)
138    }
139
140    /// Parses a products list response, filtering by product type, and caches the instruments.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the JSON cannot be deserialized or any product fails to parse.
145    pub fn load_from_products_response_filtered(
146        &self,
147        json: &serde_json::Value,
148        product_type: CoinbaseProductType,
149    ) -> anyhow::Result<Vec<InstrumentAny>> {
150        let response: ProductsResponse =
151            serde_json::from_value(json.clone()).map_err(|e| anyhow::anyhow!("{e}"))?;
152
153        let filtered: Vec<&Product> = response
154            .products
155            .iter()
156            .filter(|p| p.product_type == product_type)
157            .collect();
158
159        let instruments = self.parse_and_cache_product_refs(&filtered)?;
160        // Filtering throws away non-matching products, but their alias
161        // metadata is independent of the type filter and should still be
162        // recorded so subsequent subscribes can resolve aliased pairs.
163        self.client.record_product_aliases(&response.products);
164        Ok(instruments)
165    }
166
167    /// Parses a single product response and caches the instrument.
168    ///
169    /// Expects the JSON shape returned by `GET /products/{product_id}`.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the JSON cannot be deserialized or the product fails to parse.
174    pub fn load_from_product_response(
175        &self,
176        json: &serde_json::Value,
177    ) -> anyhow::Result<InstrumentAny> {
178        let product: Product =
179            serde_json::from_value(json.clone()).map_err(|e| anyhow::anyhow!("{e}"))?;
180
181        anyhow::ensure!(
182            is_supported_product(&product),
183            "Unsupported product '{}' (type={}, non_crypto={})",
184            product.product_id,
185            product.product_type,
186            product
187                .future_product_details
188                .as_ref()
189                .is_some_and(|d| d.non_crypto),
190        );
191
192        let ts_init = self.client.ts_now();
193        let instrument = parse_instrument(&product, ts_init)?;
194
195        self.cache_instrument(&instrument);
196        self.client
197            .record_product_aliases(std::slice::from_ref(&product));
198
199        Ok(instrument)
200    }
201
202    fn parse_and_cache_products(&self, products: &[Product]) -> anyhow::Result<Vec<InstrumentAny>> {
203        let ts_init = self.client.ts_now();
204        let mut instruments = Vec::with_capacity(products.len());
205
206        for product in products {
207            if !is_supported_product(product) {
208                log::debug!("Skipping unsupported product '{}'", product.product_id);
209                continue;
210            }
211            let instrument = parse_instrument(product, ts_init)?;
212            instruments.push(instrument);
213        }
214
215        self.cache_instruments(&instruments);
216
217        Ok(instruments)
218    }
219
220    fn parse_and_cache_product_refs(
221        &self,
222        products: &[&Product],
223    ) -> anyhow::Result<Vec<InstrumentAny>> {
224        let ts_init = self.client.ts_now();
225        let mut instruments = Vec::with_capacity(products.len());
226
227        for product in products {
228            if !is_supported_product(product) {
229                log::debug!("Skipping unsupported product '{}'", product.product_id);
230                continue;
231            }
232            let instrument = parse_instrument(product, ts_init)?;
233            instruments.push(instrument);
234        }
235
236        self.cache_instruments(&instruments);
237
238        Ok(instruments)
239    }
240
241    fn cache_instrument(&self, instrument: &InstrumentAny) {
242        self.client.instruments().rcu(|m| {
243            m.insert(instrument.id(), instrument.clone());
244        });
245    }
246
247    fn cache_instruments(&self, instruments: &[InstrumentAny]) {
248        self.client.instruments().rcu(|m| {
249            for instrument in instruments {
250                m.insert(instrument.id(), instrument.clone());
251            }
252        });
253    }
254}
255
256/// Returns whether a product is supported for instrument parsing.
257///
258/// Rejects unknown product types and non-crypto futures (energy, metals, stocks).
259fn is_supported_product(product: &Product) -> bool {
260    match product.product_type {
261        CoinbaseProductType::Unknown => false,
262        CoinbaseProductType::Future => {
263            match &product.future_product_details {
264                Some(details) => {
265                    if details.non_crypto {
266                        return false;
267                    }
268
269                    match details.futures_asset_type {
270                        Some(CoinbaseFuturesAssetType::Crypto) | None => true,
271                        Some(_) => false,
272                    }
273                }
274                // Dated futures need contract_expiry from details; perpetuals
275                // can still be parsed via the display_name heuristic
276                None => is_perpetual_product(product),
277            }
278        }
279        CoinbaseProductType::Spot => true,
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use nautilus_model::{instruments::Instrument, types::Quantity};
286    use rstest::rstest;
287    use serde_json::json;
288
289    use super::*;
290    use crate::common::testing::load_test_fixture;
291
292    fn provider() -> CoinbaseInstrumentProvider {
293        CoinbaseInstrumentProvider::new(CoinbaseHttpClient::default())
294    }
295
296    #[rstest]
297    fn test_provider_starts_empty() {
298        let provider = provider();
299        assert_eq!(provider.count(), 0);
300    }
301
302    #[rstest]
303    fn test_load_single_spot_product() {
304        let provider = provider();
305        let json_str = load_test_fixture("http_product.json");
306        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
307
308        let instrument = provider.load_from_product_response(&json).unwrap();
309
310        assert!(matches!(instrument, InstrumentAny::CurrencyPair(_)));
311        assert_eq!(instrument.id().symbol.as_str(), "BTC-USD");
312        assert_eq!(instrument.id().venue.as_str(), "COINBASE");
313        assert_eq!(provider.count(), 1);
314        assert!(provider.get(&instrument.id()).is_some());
315    }
316
317    #[rstest]
318    fn test_load_spot_products_from_list() {
319        let provider = provider();
320        let json_str = load_test_fixture("http_products.json");
321        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
322
323        let instruments = provider.load_from_products_response(&json).unwrap();
324
325        assert_eq!(instruments.len(), 2);
326        assert_eq!(provider.count(), 2);
327
328        for inst in &instruments {
329            assert!(matches!(inst, InstrumentAny::CurrencyPair(_)));
330            assert!(provider.get(&inst.id()).is_some());
331        }
332    }
333
334    #[rstest]
335    fn test_load_future_products_distinguishes_perp_and_dated() {
336        let provider = provider();
337        let json_str = load_test_fixture("http_products_future.json");
338        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
339
340        let instruments = provider.load_from_products_response(&json).unwrap();
341
342        assert_eq!(instruments.len(), 2);
343        assert_eq!(provider.count(), 2);
344
345        assert!(
346            matches!(&instruments[0], InstrumentAny::CryptoPerpetual(_)),
347            "Expected CryptoPerpetual, was {:?}",
348            &instruments[0]
349        );
350        assert!(
351            matches!(&instruments[1], InstrumentAny::CryptoFuture(_)),
352            "Expected CryptoFuture, was {:?}",
353            &instruments[1]
354        );
355    }
356
357    #[rstest]
358    fn test_load_filtered_spot_only() {
359        let provider = provider();
360        let json_str = load_test_fixture("http_products.json");
361        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
362
363        let instruments = provider
364            .load_from_products_response_filtered(&json, CoinbaseProductType::Spot)
365            .unwrap();
366
367        assert_eq!(instruments.len(), 2);
368        for inst in &instruments {
369            assert!(matches!(inst, InstrumentAny::CurrencyPair(_)));
370        }
371    }
372
373    #[rstest]
374    fn test_load_filtered_future_excludes_spot() {
375        let provider = provider();
376        let json_str = load_test_fixture("http_products.json");
377        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
378
379        let instruments = provider
380            .load_from_products_response_filtered(&json, CoinbaseProductType::Future)
381            .unwrap();
382
383        assert_eq!(instruments.len(), 0);
384        assert_eq!(provider.count(), 0);
385    }
386
387    #[rstest]
388    fn test_cache_updates_on_reload() {
389        let provider = provider();
390        let json_str = load_test_fixture("http_product.json");
391        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
392
393        let first = provider.load_from_product_response(&json).unwrap();
394        assert_eq!(provider.count(), 1);
395
396        let second = provider.load_from_product_response(&json).unwrap();
397        assert_eq!(provider.count(), 1);
398        assert_eq!(first.id(), second.id());
399    }
400
401    #[rstest]
402    fn test_cache_accumulates_across_loads() {
403        let provider = provider();
404
405        let spot_json_str = load_test_fixture("http_products.json");
406        let spot_json: serde_json::Value = serde_json::from_str(&spot_json_str).unwrap();
407        provider.load_from_products_response(&spot_json).unwrap();
408        assert_eq!(provider.count(), 2);
409
410        let future_json_str = load_test_fixture("http_products_future.json");
411        let future_json: serde_json::Value = serde_json::from_str(&future_json_str).unwrap();
412        provider.load_from_products_response(&future_json).unwrap();
413        assert_eq!(provider.count(), 4);
414    }
415
416    #[rstest]
417    fn test_get_returns_none_for_missing_instrument() {
418        let provider = provider();
419        let missing_id = InstrumentId::from("MISSING-PAIR.COINBASE");
420        assert!(provider.get(&missing_id).is_none());
421    }
422
423    #[rstest]
424    fn test_load_from_invalid_json_returns_error() {
425        let provider = provider();
426        let invalid = json!({"not_a_product": true});
427        assert!(provider.load_from_product_response(&invalid).is_err());
428    }
429
430    #[rstest]
431    fn test_load_from_invalid_products_response_returns_error() {
432        let provider = provider();
433        let invalid = json!({"not_products": []});
434        assert!(provider.load_from_products_response(&invalid).is_err());
435    }
436
437    #[rstest]
438    fn test_spot_instrument_precision() {
439        let provider = provider();
440        let json_str = load_test_fixture("http_product.json");
441        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
442
443        let instrument = provider.load_from_product_response(&json).unwrap();
444
445        assert_eq!(instrument.price_precision(), 2);
446        assert_eq!(instrument.size_precision(), 8);
447    }
448
449    #[rstest]
450    fn test_perpetual_instrument_fields() {
451        let provider = provider();
452        let json_str = load_test_fixture("http_products_future.json");
453        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
454
455        let instruments = provider.load_from_products_response(&json).unwrap();
456        let perp = match &instruments[0] {
457            InstrumentAny::CryptoPerpetual(p) => p,
458            other => panic!("Expected CryptoPerpetual, was {other:?}"),
459        };
460
461        assert_eq!(perp.base_currency().unwrap().code.as_str(), "BTC");
462        assert_eq!(perp.quote_currency().code.as_str(), "USD");
463        assert_eq!(perp.multiplier, Quantity::from("0.01"));
464    }
465
466    #[rstest]
467    fn test_future_instrument_has_expiry() {
468        let provider = provider();
469        let json_str = load_test_fixture("http_products_future.json");
470        let json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
471
472        let instruments = provider.load_from_products_response(&json).unwrap();
473        let future = match &instruments[1] {
474            InstrumentAny::CryptoFuture(f) => f,
475            other => panic!("Expected CryptoFuture, was {other:?}"),
476        };
477
478        // 2026-04-24T15:00:00Z
479        assert_eq!(future.expiration_ns.as_u64(), 1_777_042_800_000_000_000);
480        assert_eq!(future.base_currency().unwrap().code.as_str(), "BTC");
481    }
482
483    /// Loads the spot fixture JSON and patches `product_type` and
484    /// `future_product_details` to build a Product with controlled support fields.
485    fn make_product(product_type: &str, future_details: Option<serde_json::Value>) -> Product {
486        let json_str = load_test_fixture("http_product.json");
487        let mut json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
488        json["product_type"] = serde_json::Value::String(product_type.to_string());
489
490        if let Some(details) = future_details {
491            json["future_product_details"] = details;
492        } else {
493            json["future_product_details"] = serde_json::Value::Null;
494        }
495
496        serde_json::from_value(json).unwrap()
497    }
498
499    #[rstest]
500    #[case::spot("SPOT", None, true)]
501    #[case::future_crypto(
502        "FUTURE",
503        Some(json!({"venue":"cde","contract_code":"BIT","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"0.01","contract_root_unit":"BTC","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_CRYPTO"})),
504        true
505    )]
506    #[case::future_no_details("FUTURE", None, false)]
507    #[case::future_no_asset_type(
508        "FUTURE",
509        Some(json!({"venue":"cde","contract_code":"BIT","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"0.01","contract_root_unit":"BTC","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false})),
510        true
511    )]
512    #[case::unknown("UNKNOWN_PRODUCT_TYPE", None, false)]
513    #[case::future_non_crypto(
514        "FUTURE",
515        Some(json!({"venue":"cde","contract_code":"BIT","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"0.01","contract_root_unit":"BTC","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":true,"futures_asset_type":"FUTURES_ASSET_TYPE_CRYPTO"})),
516        false
517    )]
518    #[case::future_energy(
519        "FUTURE",
520        Some(json!({"venue":"cde","contract_code":"OIL","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"1","contract_root_unit":"OIL","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_ENERGY"})),
521        false
522    )]
523    #[case::future_metals(
524        "FUTURE",
525        Some(json!({"venue":"cde","contract_code":"GLD","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"1","contract_root_unit":"GLD","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_METALS"})),
526        false
527    )]
528    #[case::future_stocks(
529        "FUTURE",
530        Some(json!({"venue":"cde","contract_code":"SPX","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"1","contract_root_unit":"SPX","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_STOCKS"})),
531        false
532    )]
533    fn test_is_supported_product(
534        #[case] product_type: &str,
535        #[case] future_details: Option<serde_json::Value>,
536        #[case] expected: bool,
537    ) {
538        let product = make_product(product_type, future_details);
539        assert_eq!(is_supported_product(&product), expected);
540    }
541
542    #[rstest]
543    fn test_load_from_product_response_rejects_unsupported() {
544        let provider = provider();
545        let json_str = load_test_fixture("http_product.json");
546        let mut json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
547        json["product_type"] = serde_json::Value::String("OPTIONS".to_string());
548
549        let result = provider.load_from_product_response(&json);
550
551        assert!(result.is_err());
552        let err_msg = result.unwrap_err().to_string();
553        assert!(
554            err_msg.contains("Unsupported product"),
555            "Expected 'Unsupported product' in error, was: {err_msg}"
556        );
557    }
558
559    #[rstest]
560    fn test_future_no_details_with_perp_display_name_is_supported() {
561        let json_str = load_test_fixture("http_product.json");
562        let mut json: serde_json::Value = serde_json::from_str(&json_str).unwrap();
563        json["product_type"] = serde_json::Value::String("FUTURE".to_string());
564        json["display_name"] = serde_json::Value::String("BTC PERP".to_string());
565        json["future_product_details"] = serde_json::Value::Null;
566
567        let product: Product = serde_json::from_value(json).unwrap();
568
569        assert!(is_supported_product(&product));
570    }
571
572    #[rstest]
573    fn test_future_no_details_without_perp_display_name_is_unsupported() {
574        let product = make_product("FUTURE", None);
575        assert!(!is_supported_product(&product));
576    }
577
578    #[rstest]
579    fn test_bulk_load_skips_unsupported_products() {
580        let provider = provider();
581        let spot_json_str = load_test_fixture("http_product.json");
582        let spot_json: serde_json::Value = serde_json::from_str(&spot_json_str).unwrap();
583
584        let mut unknown_json = spot_json.clone();
585        unknown_json["product_id"] = serde_json::Value::String("UNKNOWN-PAIR".to_string());
586        unknown_json["product_type"] = serde_json::Value::String("OPTIONS".to_string());
587
588        let response = json!({
589            "products": [spot_json, unknown_json],
590            "num_products": 2
591        });
592
593        let instruments = provider.load_from_products_response(&response).unwrap();
594
595        assert_eq!(instruments.len(), 1);
596        assert_eq!(instruments[0].id().symbol.as_str(), "BTC-USD");
597        assert_eq!(provider.count(), 1);
598    }
599}