Skip to main content

nautilus_dydx/common/
instrument_cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Thread-safe instrument cache for dYdX adapter.
17//!
18//! This module provides a centralized cache for instrument data that is shared
19//! between HTTP client, WebSocket client, and execution client via `Arc`.
20//!
21//! # Design
22//!
23//! dYdX uses different identifiers in different contexts:
24//! - **InstrumentId** ("BTC-USD-PERP.DYDX"): Nautilus internal identifier (primary key)
25//! - **Market ticker** ("BTC-USD"): Used in public WebSocket channels
26//! - **clob_pair_id** (0, 1, 2...): Used in blockchain transactions and order messages
27//!
28//! This cache provides O(1) lookups by any of these identifiers through internal indices.
29//! Using `InstrumentId` as the primary key provides better type safety and eliminates
30//! redundant conversions.
31//!
32//! # Thread Safety
33//!
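//! Every map is a `DashMap`, so the cache can be shared across multiple async tasks via
//! `Arc<InstrumentCache>` and used without external locking. `DashMap` synchronizes through
//! internal sharded locks rather than being lock-free, and operations that touch several
//! maps (such as `insert`) are not atomic as a whole.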
36
37use std::sync::atomic::{AtomicBool, Ordering};
38
39use dashmap::DashMap;
40use nautilus_model::{
41    identifiers::InstrumentId,
42    instruments::{Instrument, InstrumentAny},
43};
44use ustr::Ustr;
45
46use crate::{grpc::OrderMarketParams, http::models::PerpetualMarket};
47
48/// Thread-safe instrument cache with multiple lookup indices.
49///
50/// Shared between HTTP client, WebSocket client, and execution client via `Arc`.
51/// Provides O(1) lookups by `InstrumentId`, market ticker, or clob_pair_id.
52
53#[derive(Debug, Default)]
54pub struct InstrumentCache {
55    /// Primary storage: InstrumentId → InstrumentAny
56    instruments: DashMap<InstrumentId, InstrumentAny>,
57    /// Index: clob_pair_id (0, 1, 2...) → InstrumentId (direct lookup)
58    clob_pair_id_index: DashMap<u32, InstrumentId>,
59    /// Index: market ticker ("BTC-USD") → InstrumentId (direct lookup)
60    market_index: DashMap<Ustr, InstrumentId>,
61    /// Market parameters: InstrumentId → PerpetualMarket
62    market_params: DashMap<InstrumentId, PerpetualMarket>,
63    /// Whether cache has been initialized with instrument data
64    initialized: AtomicBool,
65}
66
67impl InstrumentCache {
68    /// Creates a new empty instrument cache.
69    #[must_use]
70    pub fn new() -> Self {
71        Self::default()
72    }
73
74    /// Inserts an instrument with its market data.
75    ///
76    /// This populates the primary storage and all lookup indices.
77    pub fn insert(&self, instrument: InstrumentAny, market: PerpetualMarket) {
78        let instrument_id = instrument.id();
79        let ticker = Ustr::from(&market.ticker);
80        let clob_pair_id = market.clob_pair_id;
81
82        // Primary storage
83        self.instruments.insert(instrument_id, instrument);
84
85        // Build indices for reverse lookups (now point directly to InstrumentId)
86        self.clob_pair_id_index.insert(clob_pair_id, instrument_id);
87        self.market_index.insert(ticker, instrument_id);
88
89        // Store full market params for order building
90        self.market_params.insert(instrument_id, market);
91    }
92
93    /// Bulk inserts instruments with their market data.
94    ///
95    /// Marks the cache as initialized after insertion.
96    pub fn insert_many(&self, items: Vec<(InstrumentAny, PerpetualMarket)>) {
97        for (instrument, market) in items {
98            self.insert(instrument, market);
99        }
100        self.initialized.store(true, Ordering::Release);
101    }
102
103    /// Clears all cached data.
104    ///
105    /// Useful for refreshing instruments from the API.
106    pub fn clear(&self) {
107        self.instruments.clear();
108        self.clob_pair_id_index.clear();
109        self.market_index.clear();
110        self.market_params.clear();
111        self.initialized.store(false, Ordering::Release);
112    }
113
114    /// Inserts an instrument without market data.
115    ///
116    /// Derives the market ticker from the instrument symbol by stripping the
117    /// "-PERP" suffix, so `get_by_market()` works. `get_by_clob_id()` requires
118    /// full market params and won't work for instruments inserted this way.
119    pub fn insert_instrument_only(&self, instrument: InstrumentAny) {
120        let instrument_id = instrument.id();
121        let symbol = instrument_id.symbol.as_str();
122        let ticker = symbol.strip_suffix("-PERP").unwrap_or(symbol);
123        self.market_index.insert(Ustr::from(ticker), instrument_id);
124        self.instruments.insert(instrument_id, instrument);
125    }
126
127    /// Bulk inserts instruments without market data (derives market tickers).
128    ///
129    /// Marks the cache as initialized after insertion.
130    pub fn insert_instruments_only(&self, instruments: Vec<InstrumentAny>) {
131        for instrument in instruments {
132            self.insert_instrument_only(instrument);
133        }
134        self.initialized.store(true, Ordering::Release);
135    }
136
137    /// Gets an instrument by InstrumentId.
138    #[must_use]
139    pub fn get(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
140        self.instruments.get(instrument_id).map(|r| r.clone())
141    }
142
143    /// Gets an instrument by market ticker (e.g., "BTC-USD").
144    ///
145    /// This is the identifier used in public WebSocket channels.
146    #[must_use]
147    pub fn get_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
148        let ticker_ustr = Ustr::from(ticker);
149        self.market_index
150            .get(&ticker_ustr)
151            .and_then(|instrument_id| self.instruments.get(&*instrument_id).map(|r| r.clone()))
152    }
153
154    /// Gets an instrument by clob_pair_id (e.g., 0, 1, 2).
155    ///
156    /// This is the identifier used in blockchain transactions and order messages.
157    #[must_use]
158    pub fn get_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
159        self.clob_pair_id_index
160            .get(&clob_pair_id)
161            .and_then(|instrument_id| self.instruments.get(&*instrument_id).map(|r| r.clone()))
162    }
163
164    /// Gets an InstrumentId by clob_pair_id.
165    ///
166    /// Returns directly from index without cloning full instrument.
167    #[must_use]
168    pub fn get_id_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentId> {
169        self.clob_pair_id_index.get(&clob_pair_id).map(|r| *r)
170    }
171
172    /// Gets an InstrumentId by market ticker.
173    ///
174    /// Returns directly from index without cloning full instrument.
175    #[must_use]
176    pub fn get_id_by_market(&self, ticker: &str) -> Option<InstrumentId> {
177        let ticker_ustr = Ustr::from(ticker);
178        self.market_index.get(&ticker_ustr).map(|r| *r)
179    }
180
181    /// Gets full market parameters by InstrumentId.
182    ///
183    /// Returns the complete `PerpetualMarket` data including margin requirements,
184    /// quantization parameters, and current oracle price.
185    #[must_use]
186    pub fn get_market_params(&self, instrument_id: &InstrumentId) -> Option<PerpetualMarket> {
187        self.market_params.get(instrument_id).map(|r| r.clone())
188    }
189
190    /// Gets order market parameters for order building.
191    ///
192    /// Returns the subset of market data needed for constructing orders
193    /// (quantization, clob_pair_id, etc.).
194    #[must_use]
195    pub fn get_order_market_params(
196        &self,
197        instrument_id: &InstrumentId,
198    ) -> Option<OrderMarketParams> {
199        self.get_market_params(instrument_id)
200            .map(|market| OrderMarketParams {
201                atomic_resolution: market.atomic_resolution,
202                clob_pair_id: market.clob_pair_id,
203                oracle_price: market.oracle_price,
204                quantum_conversion_exponent: market.quantum_conversion_exponent,
205                step_base_quantums: market.step_base_quantums,
206                subticks_per_tick: market.subticks_per_tick,
207            })
208    }
209
210    /// Updates oracle price for a market.
211    ///
212    /// Called when receiving price updates via WebSocket `v4_markets` channel.
213    pub fn update_oracle_price(&self, ticker: &str, oracle_price: rust_decimal::Decimal) {
214        let ticker_ustr = Ustr::from(ticker);
215        if let Some(instrument_id) = self.market_index.get(&ticker_ustr)
216            && let Some(mut market) = self.market_params.get_mut(&*instrument_id)
217        {
218            market.oracle_price = Some(oracle_price);
219        }
220    }
221
222    /// Returns whether the cache has been initialized with instrument data.
223    #[must_use]
224    pub fn is_initialized(&self) -> bool {
225        self.initialized.load(Ordering::Acquire)
226    }
227
228    /// Returns the number of cached instruments.
229    #[must_use]
230    pub fn len(&self) -> usize {
231        self.instruments.len()
232    }
233
234    /// Returns whether the cache is empty.
235    #[must_use]
236    pub fn is_empty(&self) -> bool {
237        self.instruments.is_empty()
238    }
239
240    /// Returns all cached instruments.
241    ///
242    /// Useful for WebSocket handler initialization and instrument replay.
243    #[must_use]
244    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
245        self.instruments.iter().map(|r| r.clone()).collect()
246    }
247
248    /// Returns all InstrumentIds.
249    #[must_use]
250    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
251        self.instruments.iter().map(|r| r.value().id()).collect()
252    }
253
254    /// Checks if an instrument exists by InstrumentId.
255    #[must_use]
256    pub fn contains(&self, instrument_id: &InstrumentId) -> bool {
257        self.instruments.contains_key(instrument_id)
258    }
259
260    /// Checks if an instrument exists by clob_pair_id.
261    #[must_use]
262    pub fn contains_clob_id(&self, clob_pair_id: u32) -> bool {
263        self.clob_pair_id_index.contains_key(&clob_pair_id)
264    }
265
266    /// Checks if an instrument exists by market ticker (e.g., "BTC-USD").
267    #[must_use]
268    pub fn contains_market(&self, ticker: &str) -> bool {
269        let ticker_ustr = Ustr::from(ticker);
270        self.market_index.contains_key(&ticker_ustr)
271    }
272
273    /// Returns a HashMap of all instruments keyed by InstrumentId.
274    ///
275    /// This is useful for parsing functions that expect `HashMap<InstrumentId, InstrumentAny>`.
276    /// Note: Creates a snapshot copy, so frequent calls should be avoided.
277    #[must_use]
278    pub fn to_instrument_id_map(&self) -> std::collections::HashMap<InstrumentId, InstrumentAny> {
279        self.instruments
280            .iter()
281            .map(|entry| (entry.value().id(), entry.value().clone()))
282            .collect()
283    }
284
285    /// Returns a HashMap of oracle prices keyed by InstrumentId.
286    ///
287    /// This is useful for parsing functions like `parse_account_state` that need oracle prices.
288    /// Note: Creates a snapshot copy, so frequent calls should be avoided.
289    #[must_use]
290    pub fn to_oracle_prices_map(
291        &self,
292    ) -> std::collections::HashMap<InstrumentId, rust_decimal::Decimal> {
293        self.market_params
294            .iter()
295            .filter_map(|entry| entry.value().oracle_price.map(|p| (*entry.key(), p)))
296            .collect()
297    }
298
299    /// Logs a warning about a missing instrument for a clob_pair_id, listing known mappings.
300    pub fn log_missing_clob_pair_id(&self, clob_pair_id: u32) {
301        let known: Vec<(u32, String)> = self
302            .clob_pair_id_index
303            .iter()
304            .map(|entry| (*entry.key(), entry.value().symbol.as_str().to_string()))
305            .collect();
306
307        log::warn!(
308            "Instrument for clob_pair_id {clob_pair_id} not found in cache. \
309             Known CLOB pair IDs and symbols: {known:?}"
310        );
311    }
312}
313
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::{CryptoPerpetual, InstrumentAny},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use rust_decimal_macros::dec;
    use ustr::Ustr;

    use super::*;
    use crate::common::enums::DydxMarketStatus;

    /// Builds the `InstrumentId` for `symbol` on the "DYDX" venue.
    fn dydx_instrument_id(symbol: &str) -> InstrumentId {
        InstrumentId::new(Symbol::new(symbol), Venue::new("DYDX"))
    }

    /// Builds a BTC/USD-denominated perpetual with the given symbol.
    fn create_test_instrument(symbol: &str) -> InstrumentAny {
        let id = dydx_instrument_id(symbol);
        let perpetual = CryptoPerpetual::new(
            id,
            id.symbol,               // raw symbol mirrors the instrument symbol
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            1,                       // price_precision
            3,                       // size_precision
            Price::new(0.1, 1),      // price_increment
            Quantity::new(0.001, 3), // size_increment
            None,                    // multiplier
            None,                    // lot_size
            None,                    // max_quantity
            None,                    // min_quantity
            None,                    // max_notional
            None,                    // min_notional
            None,                    // max_price
            None,                    // min_price
            None,                    // margin_init
            None,                    // margin_maint
            None,                    // maker_fee
            None,                    // taker_fee
            None,                    // info: Option<Params>
            UnixNanos::default(),    // ts_event
            UnixNanos::default(),    // ts_init
        );
        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Builds an active market record for `ticker` with an oracle price of 50000.
    fn create_test_market(ticker: &str, clob_pair_id: u32) -> PerpetualMarket {
        PerpetualMarket {
            clob_pair_id,
            ticker: Ustr::from(ticker),
            status: DydxMarketStatus::Active,
            base_asset: Some(Ustr::from("BTC")),
            quote_asset: Some(Ustr::from("USD")),
            step_size: dec!(0.001),
            tick_size: dec!(0.1),
            index_price: Some(dec!(50000)),
            oracle_price: Some(dec!(50000)),
            price_change_24h: dec!(0),
            next_funding_rate: dec!(0),
            next_funding_at: None,
            min_order_size: Some(dec!(0.001)),
            market_type: None,
            initial_margin_fraction: dec!(0.05),
            maintenance_margin_fraction: dec!(0.03),
            base_position_notional: None,
            incremental_position_size: None,
            incremental_initial_margin_fraction: None,
            max_position_size: None,
            open_interest: dec!(1000),
            atomic_resolution: -10,
            quantum_conversion_exponent: -9,
            subticks_per_tick: 1000000,
            step_base_quantums: 1000000,
            is_reduce_only: false,
        }
    }

    /// Returns a cache holding only BTC-USD-PERP (clob_pair_id 0), plus its id.
    fn cache_with_btc() -> (InstrumentCache, InstrumentId) {
        let cache = InstrumentCache::new();
        let btc = create_test_instrument("BTC-USD-PERP");
        let btc_id = btc.id();
        cache.insert(btc, create_test_market("BTC-USD", 0));
        (cache, btc_id)
    }

    /// Returns the BTC (clob 0) and ETH (clob 1) pairs used by the bulk-insert tests.
    fn btc_and_eth_items() -> Vec<(InstrumentAny, PerpetualMarket)> {
        vec![
            (
                create_test_instrument("BTC-USD-PERP"),
                create_test_market("BTC-USD", 0),
            ),
            (
                create_test_instrument("ETH-USD-PERP"),
                create_test_market("ETH-USD", 1),
            ),
        ]
    }

    /// Asserts that `found` holds an instrument whose symbol equals `expected`.
    #[track_caller]
    fn assert_symbol(found: Option<InstrumentAny>, expected: &str) {
        assert!(found.is_some());
        assert_eq!(found.unwrap().id().symbol.as_str(), expected);
    }

    #[rstest]
    fn test_insert_and_get() {
        let (cache, btc_id) = cache_with_btc();

        // Lookup through the primary key
        assert_symbol(cache.get(&btc_id), "BTC-USD-PERP");
    }

    #[rstest]
    fn test_get_by_market() {
        let (cache, _) = cache_with_btc();

        // Lookup through the market ticker index
        assert_symbol(cache.get_by_market("BTC-USD"), "BTC-USD-PERP");
    }

    #[rstest]
    fn test_get_by_clob_id() {
        let (cache, _) = cache_with_btc();

        // Lookup through the clob_pair_id index
        assert_symbol(cache.get_by_clob_id(0), "BTC-USD-PERP");

        // An unknown clob_pair_id resolves to nothing
        assert!(cache.get_by_clob_id(999).is_none());
    }

    #[rstest]
    fn test_insert_many() {
        let cache = InstrumentCache::new();
        let items = btc_and_eth_items();

        // The flag only flips once the bulk insert has run
        assert!(!cache.is_initialized());
        cache.insert_many(items);
        assert!(cache.is_initialized());

        assert_eq!(cache.len(), 2);
        for ticker in ["BTC-USD", "ETH-USD"] {
            assert!(cache.get_by_market(ticker).is_some());
        }
        for clob_pair_id in [0, 1] {
            assert!(cache.get_by_clob_id(clob_pair_id).is_some());
        }
    }

    #[rstest]
    fn test_clear() {
        let (cache, _) = cache_with_btc();
        assert_eq!(cache.len(), 1);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(!cache.is_initialized());
    }

    #[rstest]
    fn test_get_market_params() {
        let (cache, btc_id) = cache_with_btc();

        let stored = cache.get_market_params(&btc_id);
        assert!(stored.is_some());

        let stored = stored.unwrap();
        assert_eq!(stored.clob_pair_id, 0);
        assert_eq!(stored.ticker, "BTC-USD");
    }

    #[rstest]
    fn test_update_oracle_price() {
        let (cache, btc_id) = cache_with_btc();

        // Price as inserted
        let before = cache.get_market_params(&btc_id).unwrap();
        assert_eq!(before.oracle_price, Some(dec!(50000)));

        // Price after the update
        cache.update_oracle_price("BTC-USD", dec!(55000));

        let after = cache.get_market_params(&btc_id).unwrap();
        assert_eq!(after.oracle_price, Some(dec!(55000)));
    }

    #[rstest]
    fn test_to_oracle_prices_map() {
        let cache = InstrumentCache::new();
        cache.insert_many(btc_and_eth_items());

        // Only ETH gets a new oracle price
        cache.update_oracle_price("ETH-USD", dec!(3000));

        let prices = cache.to_oracle_prices_map();
        assert_eq!(prices.len(), 2);

        // BTC keeps the fixture default of 50000
        let btc_id = dydx_instrument_id("BTC-USD-PERP");
        assert_eq!(prices.get(&btc_id), Some(&dec!(50000)));

        // ETH reflects the update to 3000
        let eth_id = dydx_instrument_id("ETH-USD-PERP");
        assert_eq!(prices.get(&eth_id), Some(&dec!(3000)));
    }

    #[rstest]
    fn test_get_order_market_params_with_none_oracle_price() {
        let cache = InstrumentCache::new();
        let wti = create_test_instrument("WTI-USD-PERP");
        let wti_id = wti.id();
        let market = PerpetualMarket {
            oracle_price: None,
            ..create_test_market("WTI-USD", 99)
        };

        cache.insert(wti, market);

        let order_params = cache.get_order_market_params(&wti_id).unwrap();
        assert_eq!(order_params.oracle_price, None);
        assert_eq!(order_params.clob_pair_id, 99);
    }

    #[rstest]
    fn test_to_oracle_prices_map_excludes_none() {
        let cache = InstrumentCache::new();

        let wti_market = PerpetualMarket {
            oracle_price: None,
            ..create_test_market("WTI-USD", 99)
        };

        cache.insert_many(vec![
            (
                create_test_instrument("BTC-USD-PERP"),
                create_test_market("BTC-USD", 0),
            ),
            (create_test_instrument("WTI-USD-PERP"), wti_market),
        ]);

        let prices = cache.to_oracle_prices_map();
        assert_eq!(prices.len(), 1);

        let btc_id = dydx_instrument_id("BTC-USD-PERP");
        assert_eq!(prices.get(&btc_id), Some(&dec!(50000)));

        let wti_id = dydx_instrument_id("WTI-USD-PERP");
        assert_eq!(prices.get(&wti_id), None);
    }
}