1use std::sync::Arc;
19
20use nautilus_core::AtomicMap;
21use nautilus_model::{
22 identifiers::InstrumentId,
23 instruments::{Instrument, InstrumentAny},
24};
25
26use crate::{
27 common::enums::{CoinbaseFuturesAssetType, CoinbaseProductType},
28 http::{
29 client::CoinbaseHttpClient,
30 models::{Product, ProductsResponse},
31 parse::{is_perpetual_product, parse_instrument},
32 },
33};
34
35#[derive(Debug, Clone)]
41pub struct CoinbaseInstrumentProvider {
42 client: CoinbaseHttpClient,
43}
44
45impl CoinbaseInstrumentProvider {
46 #[must_use]
48 pub fn new(client: CoinbaseHttpClient) -> Self {
49 Self { client }
50 }
51
52 #[must_use]
54 pub fn instruments(&self) -> &Arc<AtomicMap<InstrumentId, InstrumentAny>> {
55 self.client.instruments()
56 }
57
58 #[must_use]
60 pub fn count(&self) -> usize {
61 self.client.instruments().len()
62 }
63
64 #[must_use]
66 pub fn get(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
67 self.client.instruments().get_cloned(instrument_id)
68 }
69
70 pub async fn load_all(&self) -> anyhow::Result<Vec<InstrumentAny>> {
76 let json = self
77 .client
78 .get_products()
79 .await
80 .map_err(|e| anyhow::anyhow!("Failed to fetch products: {e}"))?;
81
82 self.load_from_products_response(&json)
83 }
84
85 pub async fn load_all_filtered(
91 &self,
92 product_type: CoinbaseProductType,
93 ) -> anyhow::Result<Vec<InstrumentAny>> {
94 let json = self
95 .client
96 .get_products()
97 .await
98 .map_err(|e| anyhow::anyhow!("Failed to fetch products: {e}"))?;
99
100 self.load_from_products_response_filtered(&json, product_type)
101 }
102
103 pub async fn load(&self, product_id: &str) -> anyhow::Result<InstrumentAny> {
109 let json = self
110 .client
111 .get_product(product_id)
112 .await
113 .map_err(|e| anyhow::anyhow!("Failed to fetch product '{product_id}': {e}"))?;
114
115 self.load_from_product_response(&json)
116 }
117
118 pub fn load_from_products_response(
126 &self,
127 json: &serde_json::Value,
128 ) -> anyhow::Result<Vec<InstrumentAny>> {
129 let response: ProductsResponse =
130 serde_json::from_value(json.clone()).map_err(|e| anyhow::anyhow!("{e}"))?;
131
132 let instruments = self.parse_and_cache_products(&response.products)?;
133 self.client.record_product_aliases(&response.products);
137 Ok(instruments)
138 }
139
140 pub fn load_from_products_response_filtered(
146 &self,
147 json: &serde_json::Value,
148 product_type: CoinbaseProductType,
149 ) -> anyhow::Result<Vec<InstrumentAny>> {
150 let response: ProductsResponse =
151 serde_json::from_value(json.clone()).map_err(|e| anyhow::anyhow!("{e}"))?;
152
153 let filtered: Vec<&Product> = response
154 .products
155 .iter()
156 .filter(|p| p.product_type == product_type)
157 .collect();
158
159 let instruments = self.parse_and_cache_product_refs(&filtered)?;
160 self.client.record_product_aliases(&response.products);
164 Ok(instruments)
165 }
166
167 pub fn load_from_product_response(
175 &self,
176 json: &serde_json::Value,
177 ) -> anyhow::Result<InstrumentAny> {
178 let product: Product =
179 serde_json::from_value(json.clone()).map_err(|e| anyhow::anyhow!("{e}"))?;
180
181 anyhow::ensure!(
182 is_supported_product(&product),
183 "Unsupported product '{}' (type={}, non_crypto={})",
184 product.product_id,
185 product.product_type,
186 product
187 .future_product_details
188 .as_ref()
189 .is_some_and(|d| d.non_crypto),
190 );
191
192 let ts_init = self.client.ts_now();
193 let instrument = parse_instrument(&product, ts_init)?;
194
195 self.cache_instrument(&instrument);
196 self.client
197 .record_product_aliases(std::slice::from_ref(&product));
198
199 Ok(instrument)
200 }
201
202 fn parse_and_cache_products(&self, products: &[Product]) -> anyhow::Result<Vec<InstrumentAny>> {
203 let ts_init = self.client.ts_now();
204 let mut instruments = Vec::with_capacity(products.len());
205
206 for product in products {
207 if !is_supported_product(product) {
208 log::debug!("Skipping unsupported product '{}'", product.product_id);
209 continue;
210 }
211 let instrument = parse_instrument(product, ts_init)?;
212 instruments.push(instrument);
213 }
214
215 self.cache_instruments(&instruments);
216
217 Ok(instruments)
218 }
219
220 fn parse_and_cache_product_refs(
221 &self,
222 products: &[&Product],
223 ) -> anyhow::Result<Vec<InstrumentAny>> {
224 let ts_init = self.client.ts_now();
225 let mut instruments = Vec::with_capacity(products.len());
226
227 for product in products {
228 if !is_supported_product(product) {
229 log::debug!("Skipping unsupported product '{}'", product.product_id);
230 continue;
231 }
232 let instrument = parse_instrument(product, ts_init)?;
233 instruments.push(instrument);
234 }
235
236 self.cache_instruments(&instruments);
237
238 Ok(instruments)
239 }
240
241 fn cache_instrument(&self, instrument: &InstrumentAny) {
242 self.client.instruments().rcu(|m| {
243 m.insert(instrument.id(), instrument.clone());
244 });
245 }
246
247 fn cache_instruments(&self, instruments: &[InstrumentAny]) {
248 self.client.instruments().rcu(|m| {
249 for instrument in instruments {
250 m.insert(instrument.id(), instrument.clone());
251 }
252 });
253 }
254}
255
256fn is_supported_product(product: &Product) -> bool {
260 match product.product_type {
261 CoinbaseProductType::Unknown => false,
262 CoinbaseProductType::Future => {
263 match &product.future_product_details {
264 Some(details) => {
265 if details.non_crypto {
266 return false;
267 }
268
269 match details.futures_asset_type {
270 Some(CoinbaseFuturesAssetType::Crypto) | None => true,
271 Some(_) => false,
272 }
273 }
274 None => is_perpetual_product(product),
277 }
278 }
279 CoinbaseProductType::Spot => true,
280 }
281}
282
#[cfg(test)]
mod tests {
    use nautilus_model::{instruments::Instrument, types::Quantity};
    use rstest::rstest;
    use serde_json::json;

    use super::*;
    use crate::common::testing::load_test_fixture;

    /// Builds a provider around a default-constructed HTTP client.
    fn provider() -> CoinbaseInstrumentProvider {
        CoinbaseInstrumentProvider::new(CoinbaseHttpClient::default())
    }

    /// Loads the named test fixture and parses it as JSON.
    fn fixture_json(name: &str) -> serde_json::Value {
        serde_json::from_str(&load_test_fixture(name)).unwrap()
    }

    /// Builds a [`Product`] from the single-product fixture, overriding its
    /// `product_type` and `future_product_details` (JSON null when `None`).
    fn make_product(product_type: &str, future_details: Option<serde_json::Value>) -> Product {
        let mut json = fixture_json("http_product.json");
        json["product_type"] = serde_json::Value::String(product_type.to_string());
        json["future_product_details"] = future_details.unwrap_or(serde_json::Value::Null);

        serde_json::from_value(json).unwrap()
    }

    #[rstest]
    fn test_provider_starts_empty() {
        assert_eq!(provider().count(), 0);
    }

    #[rstest]
    fn test_load_single_spot_product() {
        let provider = provider();
        let payload = fixture_json("http_product.json");

        let instrument = provider.load_from_product_response(&payload).unwrap();
        let instrument_id = instrument.id();

        assert!(matches!(instrument, InstrumentAny::CurrencyPair(_)));
        assert_eq!(instrument_id.symbol.as_str(), "BTC-USD");
        assert_eq!(instrument_id.venue.as_str(), "COINBASE");
        assert_eq!(provider.count(), 1);
        assert!(provider.get(&instrument_id).is_some());
    }

    #[rstest]
    fn test_load_spot_products_from_list() {
        let provider = provider();
        let payload = fixture_json("http_products.json");

        let instruments = provider.load_from_products_response(&payload).unwrap();

        assert_eq!(instruments.len(), 2);
        assert_eq!(provider.count(), 2);

        for instrument in &instruments {
            assert!(matches!(instrument, InstrumentAny::CurrencyPair(_)));
            assert!(provider.get(&instrument.id()).is_some());
        }
    }

    #[rstest]
    fn test_load_future_products_distinguishes_perp_and_dated() {
        let provider = provider();
        let payload = fixture_json("http_products_future.json");

        let instruments = provider.load_from_products_response(&payload).unwrap();

        assert_eq!(instruments.len(), 2);
        assert_eq!(provider.count(), 2);

        let (perp, dated) = (&instruments[0], &instruments[1]);
        assert!(
            matches!(perp, InstrumentAny::CryptoPerpetual(_)),
            "Expected CryptoPerpetual, was {:?}",
            perp
        );
        assert!(
            matches!(dated, InstrumentAny::CryptoFuture(_)),
            "Expected CryptoFuture, was {:?}",
            dated
        );
    }

    #[rstest]
    fn test_load_filtered_spot_only() {
        let provider = provider();
        let payload = fixture_json("http_products.json");

        let instruments = provider
            .load_from_products_response_filtered(&payload, CoinbaseProductType::Spot)
            .unwrap();

        assert_eq!(instruments.len(), 2);
        assert!(
            instruments
                .iter()
                .all(|instrument| matches!(instrument, InstrumentAny::CurrencyPair(_)))
        );
    }

    #[rstest]
    fn test_load_filtered_future_excludes_spot() {
        let provider = provider();
        let payload = fixture_json("http_products.json");

        let instruments = provider
            .load_from_products_response_filtered(&payload, CoinbaseProductType::Future)
            .unwrap();

        assert_eq!(instruments.len(), 0);
        assert_eq!(provider.count(), 0);
    }

    #[rstest]
    fn test_cache_updates_on_reload() {
        let provider = provider();
        let payload = fixture_json("http_product.json");

        let first = provider.load_from_product_response(&payload).unwrap();
        assert_eq!(provider.count(), 1);

        // Loading the same product again must replace, not duplicate, the entry.
        let second = provider.load_from_product_response(&payload).unwrap();
        assert_eq!(provider.count(), 1);
        assert_eq!(first.id(), second.id());
    }

    #[rstest]
    fn test_cache_accumulates_across_loads() {
        let provider = provider();

        let spot_payload = fixture_json("http_products.json");
        provider.load_from_products_response(&spot_payload).unwrap();
        assert_eq!(provider.count(), 2);

        let future_payload = fixture_json("http_products_future.json");
        provider
            .load_from_products_response(&future_payload)
            .unwrap();
        assert_eq!(provider.count(), 4);
    }

    #[rstest]
    fn test_get_returns_none_for_missing_instrument() {
        let missing_id = InstrumentId::from("MISSING-PAIR.COINBASE");
        assert!(provider().get(&missing_id).is_none());
    }

    #[rstest]
    fn test_load_from_invalid_json_returns_error() {
        let invalid = json!({"not_a_product": true});
        assert!(provider().load_from_product_response(&invalid).is_err());
    }

    #[rstest]
    fn test_load_from_invalid_products_response_returns_error() {
        let invalid = json!({"not_products": []});
        assert!(provider().load_from_products_response(&invalid).is_err());
    }

    #[rstest]
    fn test_spot_instrument_precision() {
        let payload = fixture_json("http_product.json");

        let instrument = provider().load_from_product_response(&payload).unwrap();

        assert_eq!(instrument.price_precision(), 2);
        assert_eq!(instrument.size_precision(), 8);
    }

    #[rstest]
    fn test_perpetual_instrument_fields() {
        let payload = fixture_json("http_products_future.json");

        let instruments = provider().load_from_products_response(&payload).unwrap();
        let InstrumentAny::CryptoPerpetual(perp) = &instruments[0] else {
            panic!("Expected CryptoPerpetual, was {:?}", &instruments[0]);
        };

        assert_eq!(perp.base_currency().unwrap().code.as_str(), "BTC");
        assert_eq!(perp.quote_currency().code.as_str(), "USD");
        assert_eq!(perp.multiplier, Quantity::from("0.01"));
    }

    #[rstest]
    fn test_future_instrument_has_expiry() {
        let payload = fixture_json("http_products_future.json");

        let instruments = provider().load_from_products_response(&payload).unwrap();
        let InstrumentAny::CryptoFuture(future) = &instruments[1] else {
            panic!("Expected CryptoFuture, was {:?}", &instruments[1]);
        };

        assert_eq!(future.expiration_ns.as_u64(), 1_777_042_800_000_000_000);
        assert_eq!(future.base_currency().unwrap().code.as_str(), "BTC");
    }

    #[rstest]
    #[case::spot("SPOT", None, true)]
    #[case::future_crypto(
        "FUTURE",
        Some(json!({"venue":"cde","contract_code":"BIT","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"0.01","contract_root_unit":"BTC","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_CRYPTO"})),
        true
    )]
    #[case::future_no_details("FUTURE", None, false)]
    #[case::future_no_asset_type(
        "FUTURE",
        Some(json!({"venue":"cde","contract_code":"BIT","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"0.01","contract_root_unit":"BTC","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false})),
        true
    )]
    #[case::unknown("UNKNOWN_PRODUCT_TYPE", None, false)]
    #[case::future_non_crypto(
        "FUTURE",
        Some(json!({"venue":"cde","contract_code":"BIT","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"0.01","contract_root_unit":"BTC","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":true,"futures_asset_type":"FUTURES_ASSET_TYPE_CRYPTO"})),
        false
    )]
    #[case::future_energy(
        "FUTURE",
        Some(json!({"venue":"cde","contract_code":"OIL","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"1","contract_root_unit":"OIL","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_ENERGY"})),
        false
    )]
    #[case::future_metals(
        "FUTURE",
        Some(json!({"venue":"cde","contract_code":"GLD","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"1","contract_root_unit":"GLD","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_METALS"})),
        false
    )]
    #[case::future_stocks(
        "FUTURE",
        Some(json!({"venue":"cde","contract_code":"SPX","contract_expiry":"2026-04-24T15:00:00Z","contract_size":"1","contract_root_unit":"SPX","group_description":"","contract_expiry_timezone":"","group_short_description":"","risk_managed_by":"MANAGED_BY_FCM","contract_expiry_type":"EXPIRING","contract_display_name":"","non_crypto":false,"futures_asset_type":"FUTURES_ASSET_TYPE_STOCKS"})),
        false
    )]
    fn test_is_supported_product(
        #[case] product_type: &str,
        #[case] future_details: Option<serde_json::Value>,
        #[case] expected: bool,
    ) {
        let product = make_product(product_type, future_details);
        assert_eq!(is_supported_product(&product), expected);
    }

    #[rstest]
    fn test_load_from_product_response_rejects_unsupported() {
        let provider = provider();
        let mut payload = fixture_json("http_product.json");
        payload["product_type"] = json!("OPTIONS");

        let result = provider.load_from_product_response(&payload);

        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("Unsupported product"),
            "Expected 'Unsupported product' in error, was: {err_msg}"
        );
    }

    #[rstest]
    fn test_future_no_details_with_perp_display_name_is_supported() {
        let mut payload = fixture_json("http_product.json");
        payload["product_type"] = json!("FUTURE");
        payload["display_name"] = json!("BTC PERP");
        payload["future_product_details"] = serde_json::Value::Null;

        let product: Product = serde_json::from_value(payload).unwrap();

        assert!(is_supported_product(&product));
    }

    #[rstest]
    fn test_future_no_details_without_perp_display_name_is_unsupported() {
        assert!(!is_supported_product(&make_product("FUTURE", None)));
    }

    #[rstest]
    fn test_bulk_load_skips_unsupported_products() {
        let provider = provider();
        let spot_json = fixture_json("http_product.json");

        // Same product, but with an ID and type the provider does not support.
        let mut unknown_json = spot_json.clone();
        unknown_json["product_id"] = json!("UNKNOWN-PAIR");
        unknown_json["product_type"] = json!("OPTIONS");

        let response = json!({
            "products": [spot_json, unknown_json],
            "num_products": 2
        });

        let instruments = provider.load_from_products_response(&response).unwrap();

        assert_eq!(instruments.len(), 1);
        assert_eq!(instruments[0].id().symbol.as_str(), "BTC-USD");
        assert_eq!(provider.count(), 1);
    }
}