Skip to main content

nautilus_tardis/machine/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Deduplication cache for derivative ticker data.
17
18use ahash::AHashMap;
19use nautilus_model::{
20    data::{FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate},
21    identifiers::InstrumentId,
22    types::Price,
23};
24
25/// Caches last-emitted derivative ticker values per instrument to suppress
26/// duplicates when the exchange re-sends unchanged fields.
27#[derive(Debug, Default)]
28pub struct DerivativeTickerCache {
29    funding_rates: AHashMap<InstrumentId, FundingRateUpdate>,
30    mark_prices: AHashMap<InstrumentId, Price>,
31    index_prices: AHashMap<InstrumentId, Price>,
32}
33
34impl DerivativeTickerCache {
35    /// Returns `true` if the funding rate changed (or is new) and should be emitted.
36    pub fn should_emit_funding_rate(&mut self, update: &FundingRateUpdate) -> bool {
37        let id = update.instrument_id;
38
39        if let Some(cached) = self.funding_rates.get(&id)
40            && cached == update
41        {
42            return false;
43        }
44
45        self.funding_rates.insert(id, *update);
46        true
47    }
48
49    /// Returns `true` if the mark price changed (or is new) and should be emitted.
50    pub fn should_emit_mark_price(&mut self, update: &MarkPriceUpdate) -> bool {
51        let id = update.instrument_id;
52
53        if let Some(cached) = self.mark_prices.get(&id)
54            && *cached == update.value
55        {
56            return false;
57        }
58
59        self.mark_prices.insert(id, update.value);
60        true
61    }
62
63    /// Returns `true` if the index price changed (or is new) and should be emitted.
64    pub fn should_emit_index_price(&mut self, update: &IndexPriceUpdate) -> bool {
65        let id = update.instrument_id;
66
67        if let Some(cached) = self.index_prices.get(&id)
68            && *cached == update.value
69        {
70            return false;
71        }
72
73        self.index_prices.insert(id, update.value);
74        true
75    }
76}
77
78#[cfg(test)]
79mod tests {
80    use nautilus_model::{
81        data::{FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate},
82        identifiers::InstrumentId,
83        types::Price,
84    };
85    use rstest::rstest;
86    use rust_decimal_macros::dec;
87
88    use super::*;
89
90    fn instrument_id() -> InstrumentId {
91        InstrumentId::from("BTCUSDT-PERP.BINANCE")
92    }
93
94    #[rstest]
95    fn test_funding_rate_first_value_emits() {
96        let mut cache = DerivativeTickerCache::default();
97        let update = FundingRateUpdate::new(
98            instrument_id(),
99            dec!(0.0001),
100            None,
101            None,
102            1.into(),
103            2.into(),
104        );
105        assert!(cache.should_emit_funding_rate(&update));
106    }
107
108    #[rstest]
109    fn test_funding_rate_duplicate_suppressed() {
110        let mut cache = DerivativeTickerCache::default();
111        let update = FundingRateUpdate::new(
112            instrument_id(),
113            dec!(0.0001),
114            None,
115            None,
116            1.into(),
117            2.into(),
118        );
119        cache.should_emit_funding_rate(&update);
120
121        let duplicate = FundingRateUpdate::new(
122            instrument_id(),
123            dec!(0.0001),
124            None,
125            None,
126            3.into(),
127            4.into(),
128        );
129        assert!(!cache.should_emit_funding_rate(&duplicate));
130    }
131
132    #[rstest]
133    fn test_funding_rate_changed_value_emits() {
134        let mut cache = DerivativeTickerCache::default();
135        let first = FundingRateUpdate::new(
136            instrument_id(),
137            dec!(0.0001),
138            None,
139            None,
140            1.into(),
141            2.into(),
142        );
143        cache.should_emit_funding_rate(&first);
144
145        let changed = FundingRateUpdate::new(
146            instrument_id(),
147            dec!(0.0002),
148            None,
149            None,
150            3.into(),
151            4.into(),
152        );
153        assert!(cache.should_emit_funding_rate(&changed));
154    }
155
156    #[rstest]
157    fn test_mark_price_first_value_emits() {
158        let mut cache = DerivativeTickerCache::default();
159        let update =
160            MarkPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 1.into(), 2.into());
161        assert!(cache.should_emit_mark_price(&update));
162    }
163
164    #[rstest]
165    fn test_mark_price_duplicate_suppressed() {
166        let mut cache = DerivativeTickerCache::default();
167        let update =
168            MarkPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 1.into(), 2.into());
169        cache.should_emit_mark_price(&update);
170
171        let duplicate =
172            MarkPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 3.into(), 4.into());
173        assert!(!cache.should_emit_mark_price(&duplicate));
174    }
175
176    #[rstest]
177    fn test_mark_price_changed_value_emits() {
178        let mut cache = DerivativeTickerCache::default();
179        let first =
180            MarkPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 1.into(), 2.into());
181        cache.should_emit_mark_price(&first);
182
183        let changed =
184            MarkPriceUpdate::new(instrument_id(), Price::new(42001.0, 1), 3.into(), 4.into());
185        assert!(cache.should_emit_mark_price(&changed));
186    }
187
188    #[rstest]
189    fn test_index_price_first_value_emits() {
190        let mut cache = DerivativeTickerCache::default();
191        let update =
192            IndexPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 1.into(), 2.into());
193        assert!(cache.should_emit_index_price(&update));
194    }
195
196    #[rstest]
197    fn test_index_price_duplicate_suppressed() {
198        let mut cache = DerivativeTickerCache::default();
199        let update =
200            IndexPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 1.into(), 2.into());
201        cache.should_emit_index_price(&update);
202
203        let duplicate =
204            IndexPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 3.into(), 4.into());
205        assert!(!cache.should_emit_index_price(&duplicate));
206    }
207
208    #[rstest]
209    fn test_index_price_changed_value_emits() {
210        let mut cache = DerivativeTickerCache::default();
211        let first =
212            IndexPriceUpdate::new(instrument_id(), Price::new(42000.0, 1), 1.into(), 2.into());
213        cache.should_emit_index_price(&first);
214
215        let changed =
216            IndexPriceUpdate::new(instrument_id(), Price::new(42001.0, 1), 3.into(), 4.into());
217        assert!(cache.should_emit_index_price(&changed));
218    }
219}