Skip to main content

nautilus_common/cache/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! In-memory cache for market and execution data, with optional persistent backing.
17//!
18//! Provides methods to load, query, and update cached data such as instruments, orders, and prices.
19
20pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31    collections::VecDeque,
32    fmt::{Debug, Display},
33    time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; // Re-export
39use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42    UUID4, UnixNanos,
43    correctness::{
44        check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45        check_valid_string_ascii,
46    },
47    datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50    accounts::{Account, AccountAny},
51    data::{
52        Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
53        MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
54    },
55    enums::{
56        AggregationSource, ContingencyType, OmsType, OrderSide, PositionSide, PriceType,
57        TriggerType,
58    },
59    identifiers::{
60        AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
61        OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
62    },
63    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
64    orderbook::{
65        OrderBook,
66        own::{OwnOrderBook, should_handle_own_book_order},
67    },
68    orders::{Order, OrderAny, OrderList},
69    position::Position,
70    types::{Currency, Money, Price, Quantity},
71};
72use ustr::Ustr;
73
74use crate::xrate::get_exchange_rate;
75
76/// A common in-memory `Cache` for market and execution related data.
77#[cfg_attr(
78    feature = "python",
79    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
80)]
81pub struct Cache {
82    config: CacheConfig,
83    index: CacheIndex,
84    database: Option<Box<dyn CacheDatabaseAdapter>>,
85    general: AHashMap<String, Bytes>,
86    currencies: AHashMap<Ustr, Currency>,
87    instruments: AHashMap<InstrumentId, InstrumentAny>,
88    synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
89    books: AHashMap<InstrumentId, OrderBook>,
90    own_books: AHashMap<InstrumentId, OwnOrderBook>,
91    quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
92    trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
93    mark_xrates: AHashMap<(Currency, Currency), f64>,
94    mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
95    index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
96    funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
97    instrument_statuses: AHashMap<InstrumentId, VecDeque<InstrumentStatus>>,
98    bars: AHashMap<BarType, VecDeque<Bar>>,
99    greeks: AHashMap<InstrumentId, GreeksData>,
100    option_greeks: AHashMap<InstrumentId, OptionGreeks>,
101    yield_curves: AHashMap<String, YieldCurveData>,
102    accounts: AHashMap<AccountId, AccountAny>,
103    orders: AHashMap<ClientOrderId, OrderAny>,
104    order_lists: AHashMap<OrderListId, OrderList>,
105    positions: AHashMap<PositionId, Position>,
106    position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
107    #[cfg(feature = "defi")]
108    pub(crate) defi: crate::defi::cache::DefiCache,
109}
110
111impl Debug for Cache {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct(stringify!(Cache))
114            .field("config", &self.config)
115            .field("index", &self.index)
116            .field("general", &self.general)
117            .field("currencies", &self.currencies)
118            .field("instruments", &self.instruments)
119            .field("synthetics", &self.synthetics)
120            .field("books", &self.books)
121            .field("own_books", &self.own_books)
122            .field("quotes", &self.quotes)
123            .field("trades", &self.trades)
124            .field("mark_xrates", &self.mark_xrates)
125            .field("mark_prices", &self.mark_prices)
126            .field("index_prices", &self.index_prices)
127            .field("funding_rates", &self.funding_rates)
128            .field("instrument_statuses", &self.instrument_statuses)
129            .field("bars", &self.bars)
130            .field("greeks", &self.greeks)
131            .field("option_greeks", &self.option_greeks)
132            .field("yield_curves", &self.yield_curves)
133            .field("accounts", &self.accounts)
134            .field("orders", &self.orders)
135            .field("order_lists", &self.order_lists)
136            .field("positions", &self.positions)
137            .field("position_snapshots", &self.position_snapshots)
138            .finish()
139    }
140}
141
142impl Default for Cache {
143    /// Creates a new default [`Cache`] instance.
144    fn default() -> Self {
145        Self::new(Some(CacheConfig::default()), None)
146    }
147}
148
149impl Cache {
150    /// Creates a new [`Cache`] instance with optional configuration and database adapter.
151    #[must_use]
152    /// # Note
153    ///
154    /// Uses provided `CacheConfig` or defaults, and optional `CacheDatabaseAdapter` for persistence.
155    pub fn new(
156        config: Option<CacheConfig>,
157        database: Option<Box<dyn CacheDatabaseAdapter>>,
158    ) -> Self {
159        Self {
160            config: config.unwrap_or_default(),
161            index: CacheIndex::default(),
162            database,
163            general: AHashMap::new(),
164            currencies: AHashMap::new(),
165            instruments: AHashMap::new(),
166            synthetics: AHashMap::new(),
167            books: AHashMap::new(),
168            own_books: AHashMap::new(),
169            quotes: AHashMap::new(),
170            trades: AHashMap::new(),
171            mark_xrates: AHashMap::new(),
172            mark_prices: AHashMap::new(),
173            index_prices: AHashMap::new(),
174            funding_rates: AHashMap::new(),
175            instrument_statuses: AHashMap::new(),
176            bars: AHashMap::new(),
177            greeks: AHashMap::new(),
178            option_greeks: AHashMap::new(),
179            yield_curves: AHashMap::new(),
180            accounts: AHashMap::new(),
181            orders: AHashMap::new(),
182            order_lists: AHashMap::new(),
183            positions: AHashMap::new(),
184            position_snapshots: AHashMap::new(),
185            #[cfg(feature = "defi")]
186            defi: crate::defi::cache::DefiCache::default(),
187        }
188    }
189
190    /// Returns the cache instances memory address.
191    #[must_use]
192    pub fn memory_address(&self) -> String {
193        format!("{:?}", std::ptr::from_ref(self))
194    }
195
196    /// Sets the cache database adapter for persistence.
197    ///
198    /// This allows setting or replacing the database adapter after cache construction.
199    pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
200        let type_name = std::any::type_name_of_val(&*database);
201        log::info!("Cache database adapter set: {type_name}");
202        self.database = Some(database);
203    }
204
205    // -- COMMANDS --------------------------------------------------------------------------------
206
207    /// Clears and reloads general entries from the database into the cache.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if loading general cache data fails.
212    pub fn cache_general(&mut self) -> anyhow::Result<()> {
213        self.general = match &mut self.database {
214            Some(db) => db.load()?,
215            None => AHashMap::new(),
216        };
217
218        log::info!(
219            "Cached {} general object(s) from database",
220            self.general.len()
221        );
222        Ok(())
223    }
224
225    /// Loads all core caches (currencies, instruments, accounts, orders, positions) from the database.
226    ///
227    /// # Errors
228    ///
229    /// Returns an error if loading all cache data fails.
230    pub async fn cache_all(&mut self) -> anyhow::Result<()> {
231        let cache_map = match &self.database {
232            Some(db) => db.load_all().await?,
233            None => CacheMap::default(),
234        };
235
236        self.currencies = cache_map.currencies;
237        self.instruments = cache_map.instruments;
238        self.synthetics = cache_map.synthetics;
239        self.accounts = cache_map.accounts;
240        self.orders = cache_map.orders;
241        self.positions = cache_map.positions;
242
243        self.assign_position_ids_to_contingencies();
244        Ok(())
245    }
246
247    /// Clears and reloads the currency cache from the database.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if loading currencies cache fails.
252    pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
253        self.currencies = match &mut self.database {
254            Some(db) => db.load_currencies().await?,
255            None => AHashMap::new(),
256        };
257
258        log::info!("Cached {} currencies from database", self.general.len());
259        Ok(())
260    }
261
262    /// Clears and reloads the instrument cache from the database.
263    ///
264    /// # Errors
265    ///
266    /// Returns an error if loading instruments cache fails.
267    pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
268        self.instruments = match &mut self.database {
269            Some(db) => db.load_instruments().await?,
270            None => AHashMap::new(),
271        };
272
273        log::info!("Cached {} instruments from database", self.general.len());
274        Ok(())
275    }
276
277    /// Clears and reloads the synthetic instrument cache from the database.
278    ///
279    /// # Errors
280    ///
281    /// Returns an error if loading synthetic instruments cache fails.
282    pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
283        self.synthetics = match &mut self.database {
284            Some(db) => db.load_synthetics().await?,
285            None => AHashMap::new(),
286        };
287
288        log::info!(
289            "Cached {} synthetic instruments from database",
290            self.general.len()
291        );
292        Ok(())
293    }
294
295    /// Clears and reloads the account cache from the database.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if loading accounts cache fails.
300    pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
301        self.accounts = match &mut self.database {
302            Some(db) => db.load_accounts().await?,
303            None => AHashMap::new(),
304        };
305
306        log::info!(
307            "Cached {} synthetic instruments from database",
308            self.general.len()
309        );
310        Ok(())
311    }
312
313    /// Clears and reloads the order cache from the database.
314    ///
315    /// # Errors
316    ///
317    /// Returns an error if loading orders cache fails.
318    pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
319        self.orders = match &mut self.database {
320            Some(db) => db.load_orders().await?,
321            None => AHashMap::new(),
322        };
323
324        log::info!("Cached {} orders from database", self.general.len());
325
326        self.assign_position_ids_to_contingencies();
327        Ok(())
328    }
329
330    /// Clears and reloads the position cache from the database.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if loading positions cache fails.
335    pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
336        self.positions = match &mut self.database {
337            Some(db) => db.load_positions().await?,
338            None => AHashMap::new(),
339        };
340
341        log::info!("Cached {} positions from database", self.general.len());
342        Ok(())
343    }
344
345    /// Clears the current cache index and re-build.
346    pub fn build_index(&mut self) {
347        log::debug!("Building index");
348
349        // Index accounts
350        for account_id in self.accounts.keys() {
351            self.index
352                .venue_account
353                .insert(account_id.get_issuer(), *account_id);
354        }
355
356        // Index orders
357        for (client_order_id, order) in &self.orders {
358            let instrument_id = order.instrument_id();
359            let venue = instrument_id.venue;
360            let strategy_id = order.strategy_id();
361
362            // 1: Build index.venue_orders -> {Venue, {ClientOrderId}}
363            self.index
364                .venue_orders
365                .entry(venue)
366                .or_default()
367                .insert(*client_order_id);
368
369            // 2: Build index.order_ids -> {VenueOrderId, ClientOrderId}
370            if let Some(venue_order_id) = order.venue_order_id() {
371                self.index
372                    .venue_order_ids
373                    .insert(venue_order_id, *client_order_id);
374            }
375
376            // 3: Build index.order_position -> {ClientOrderId, PositionId}
377            if let Some(position_id) = order.position_id() {
378                self.index
379                    .order_position
380                    .insert(*client_order_id, position_id);
381            }
382
383            // 4: Build index.order_strategy -> {ClientOrderId, StrategyId}
384            self.index
385                .order_strategy
386                .insert(*client_order_id, order.strategy_id());
387
388            // 5: Build index.instrument_orders -> {InstrumentId, {ClientOrderId}}
389            self.index
390                .instrument_orders
391                .entry(instrument_id)
392                .or_default()
393                .insert(*client_order_id);
394
395            // 6: Build index.strategy_orders -> {StrategyId, {ClientOrderId}}
396            self.index
397                .strategy_orders
398                .entry(strategy_id)
399                .or_default()
400                .insert(*client_order_id);
401
402            // 7: Build index.account_orders -> {AccountId, {ClientOrderId}}
403            if let Some(account_id) = order.account_id() {
404                self.index
405                    .account_orders
406                    .entry(account_id)
407                    .or_default()
408                    .insert(*client_order_id);
409            }
410
411            // 8: Build index.exec_algorithm_orders -> {ExecAlgorithmId, {ClientOrderId}}
412            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
413                self.index
414                    .exec_algorithm_orders
415                    .entry(exec_algorithm_id)
416                    .or_default()
417                    .insert(*client_order_id);
418            }
419
420            // 8: Build index.exec_spawn_orders -> {ClientOrderId, {ClientOrderId}}
421            if let Some(exec_spawn_id) = order.exec_spawn_id() {
422                self.index
423                    .exec_spawn_orders
424                    .entry(exec_spawn_id)
425                    .or_default()
426                    .insert(*client_order_id);
427            }
428
429            // 9: Build index.orders -> {ClientOrderId}
430            self.index.orders.insert(*client_order_id);
431
432            // 10: Build index.orders_active_local -> {ClientOrderId}
433            if order.is_active_local() {
434                self.index.orders_active_local.insert(*client_order_id);
435            }
436
437            // 11: Build index.orders_open -> {ClientOrderId}
438            if order.is_open() {
439                self.index.orders_open.insert(*client_order_id);
440            }
441
442            // 12: Build index.orders_closed -> {ClientOrderId}
443            if order.is_closed() {
444                self.index.orders_closed.insert(*client_order_id);
445            }
446
447            // 13: Build index.orders_emulated -> {ClientOrderId}
448            if let Some(emulation_trigger) = order.emulation_trigger()
449                && emulation_trigger != TriggerType::NoTrigger
450                && !order.is_closed()
451            {
452                self.index.orders_emulated.insert(*client_order_id);
453            }
454
455            // 14: Build index.orders_inflight -> {ClientOrderId}
456            if order.is_inflight() {
457                self.index.orders_inflight.insert(*client_order_id);
458            }
459
460            // 15: Build index.strategies -> {StrategyId}
461            self.index.strategies.insert(strategy_id);
462
463            // 16: Build index.strategies -> {ExecAlgorithmId}
464            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
465                self.index.exec_algorithms.insert(exec_algorithm_id);
466            }
467        }
468
469        // Index positions
470        for (position_id, position) in &self.positions {
471            let instrument_id = position.instrument_id;
472            let venue = instrument_id.venue;
473            let strategy_id = position.strategy_id;
474
475            // 1: Build index.venue_positions -> {Venue, {PositionId}}
476            self.index
477                .venue_positions
478                .entry(venue)
479                .or_default()
480                .insert(*position_id);
481
482            // 2: Build index.position_strategy -> {PositionId, StrategyId}
483            self.index
484                .position_strategy
485                .insert(*position_id, position.strategy_id);
486
487            // 3: Build index.position_orders -> {PositionId, {ClientOrderId}}
488            self.index
489                .position_orders
490                .entry(*position_id)
491                .or_default()
492                .extend(position.client_order_ids());
493
494            // 4: Build index.instrument_positions -> {InstrumentId, {PositionId}}
495            self.index
496                .instrument_positions
497                .entry(instrument_id)
498                .or_default()
499                .insert(*position_id);
500
501            // 5: Build index.strategy_positions -> {StrategyId, {PositionId}}
502            self.index
503                .strategy_positions
504                .entry(strategy_id)
505                .or_default()
506                .insert(*position_id);
507
508            // 6: Build index.account_positions -> {AccountId, {PositionId}}
509            self.index
510                .account_positions
511                .entry(position.account_id)
512                .or_default()
513                .insert(*position_id);
514
515            // 7: Build index.positions -> {PositionId}
516            self.index.positions.insert(*position_id);
517
518            // 8: Build index.positions_open -> {PositionId}
519            if position.is_open() {
520                self.index.positions_open.insert(*position_id);
521            }
522
523            // 9: Build index.positions_closed -> {PositionId}
524            if position.is_closed() {
525                self.index.positions_closed.insert(*position_id);
526            }
527
528            // 10: Build index.strategies -> {StrategyId}
529            self.index.strategies.insert(strategy_id);
530        }
531    }
532
533    /// Returns whether the cache has a backing database.
534    #[must_use]
535    pub const fn has_backing(&self) -> bool {
536        self.config.database.is_some()
537    }
538
539    // Calculate the unrealized profit and loss (PnL) for `position`.
540    #[must_use]
541    pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
542        let quote = if let Some(quote) = self.quote(&position.instrument_id) {
543            quote
544        } else {
545            log::warn!(
546                "Cannot calculate unrealized PnL for {}, no quotes for {}",
547                position.id,
548                position.instrument_id
549            );
550            return None;
551        };
552
553        // Use exit price for mark-to-market: longs exit at bid, shorts exit at ask
554        let last = match position.side {
555            PositionSide::Flat | PositionSide::NoPositionSide => {
556                return Some(Money::new(0.0, position.settlement_currency));
557            }
558            PositionSide::Long => quote.bid_price,
559            PositionSide::Short => quote.ask_price,
560        };
561
562        Some(position.unrealized_pnl(last))
563    }
564
565    /// Checks integrity of data within the cache.
566    ///
567    /// All data should be loaded from the database prior to this call.
568    /// If an error is found then a log error message will also be produced.
569    ///
570    /// # Panics
571    ///
572    /// Panics if failure calling system clock.
573    #[must_use]
574    pub fn check_integrity(&mut self) -> bool {
575        let mut error_count = 0;
576        let failure = "Integrity failure";
577
578        // Get current timestamp in microseconds
579        let timestamp_us = SystemTime::now()
580            .duration_since(UNIX_EPOCH)
581            .expect("Time went backwards")
582            .as_micros();
583
584        log::info!("Checking data integrity");
585
586        // Check object caches
587        for account_id in self.accounts.keys() {
588            if !self
589                .index
590                .venue_account
591                .contains_key(&account_id.get_issuer())
592            {
593                log::error!(
594                    "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
595                );
596                error_count += 1;
597            }
598        }
599
600        for (client_order_id, order) in &self.orders {
601            if !self.index.order_strategy.contains_key(client_order_id) {
602                log::error!(
603                    "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
604                );
605                error_count += 1;
606            }
607
608            if !self.index.orders.contains(client_order_id) {
609                log::error!(
610                    "{failure} in orders: {client_order_id} not found in `self.index.orders`",
611                );
612                error_count += 1;
613            }
614
615            if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
616                log::error!(
617                    "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
618                );
619                error_count += 1;
620            }
621
622            if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
623            {
624                log::error!(
625                    "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
626                );
627                error_count += 1;
628            }
629
630            if order.is_open() && !self.index.orders_open.contains(client_order_id) {
631                log::error!(
632                    "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
633                );
634                error_count += 1;
635            }
636
637            if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
638                log::error!(
639                    "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
640                );
641                error_count += 1;
642            }
643
644            if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
645                if !self
646                    .index
647                    .exec_algorithm_orders
648                    .contains_key(&exec_algorithm_id)
649                {
650                    log::error!(
651                        "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
652                    );
653                    error_count += 1;
654                }
655
656                if order.exec_spawn_id().is_none()
657                    && !self.index.exec_spawn_orders.contains_key(client_order_id)
658                {
659                    log::error!(
660                        "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
661                    );
662                    error_count += 1;
663                }
664            }
665        }
666
667        for (position_id, position) in &self.positions {
668            if !self.index.position_strategy.contains_key(position_id) {
669                log::error!(
670                    "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
671                );
672                error_count += 1;
673            }
674
675            if !self.index.position_orders.contains_key(position_id) {
676                log::error!(
677                    "{failure} in positions: {position_id} not found in `self.index.position_orders`",
678                );
679                error_count += 1;
680            }
681
682            if !self.index.positions.contains(position_id) {
683                log::error!(
684                    "{failure} in positions: {position_id} not found in `self.index.positions`",
685                );
686                error_count += 1;
687            }
688
689            if position.is_open() && !self.index.positions_open.contains(position_id) {
690                log::error!(
691                    "{failure} in positions: {position_id} not found in `self.index.positions_open`",
692                );
693                error_count += 1;
694            }
695
696            if position.is_closed() && !self.index.positions_closed.contains(position_id) {
697                log::error!(
698                    "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
699                );
700                error_count += 1;
701            }
702        }
703
704        // Check indexes
705        for account_id in self.index.venue_account.values() {
706            if !self.accounts.contains_key(account_id) {
707                log::error!(
708                    "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
709                );
710                error_count += 1;
711            }
712        }
713
714        for client_order_id in self.index.venue_order_ids.values() {
715            if !self.orders.contains_key(client_order_id) {
716                log::error!(
717                    "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
718                );
719                error_count += 1;
720            }
721        }
722
723        for client_order_id in self.index.client_order_ids.keys() {
724            if !self.orders.contains_key(client_order_id) {
725                log::error!(
726                    "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
727                );
728                error_count += 1;
729            }
730        }
731
732        for client_order_id in self.index.order_position.keys() {
733            if !self.orders.contains_key(client_order_id) {
734                log::error!(
735                    "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
736                );
737                error_count += 1;
738            }
739        }
740
741        // Check indexes
742        for client_order_id in self.index.order_strategy.keys() {
743            if !self.orders.contains_key(client_order_id) {
744                log::error!(
745                    "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
746                );
747                error_count += 1;
748            }
749        }
750
751        for position_id in self.index.position_strategy.keys() {
752            if !self.positions.contains_key(position_id) {
753                log::error!(
754                    "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
755                );
756                error_count += 1;
757            }
758        }
759
760        for position_id in self.index.position_orders.keys() {
761            if !self.positions.contains_key(position_id) {
762                log::error!(
763                    "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
764                );
765                error_count += 1;
766            }
767        }
768
769        for (instrument_id, client_order_ids) in &self.index.instrument_orders {
770            for client_order_id in client_order_ids {
771                if !self.orders.contains_key(client_order_id) {
772                    log::error!(
773                        "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
774                    );
775                    error_count += 1;
776                }
777            }
778        }
779
780        for instrument_id in self.index.instrument_positions.keys() {
781            if !self.index.instrument_orders.contains_key(instrument_id) {
782                log::error!(
783                    "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
784                );
785                error_count += 1;
786            }
787        }
788
789        for client_order_ids in self.index.strategy_orders.values() {
790            for client_order_id in client_order_ids {
791                if !self.orders.contains_key(client_order_id) {
792                    log::error!(
793                        "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
794                    );
795                    error_count += 1;
796                }
797            }
798        }
799
800        for position_ids in self.index.strategy_positions.values() {
801            for position_id in position_ids {
802                if !self.positions.contains_key(position_id) {
803                    log::error!(
804                        "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
805                    );
806                    error_count += 1;
807                }
808            }
809        }
810
811        for client_order_id in &self.index.orders {
812            if !self.orders.contains_key(client_order_id) {
813                log::error!(
814                    "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
815                );
816                error_count += 1;
817            }
818        }
819
820        for client_order_id in &self.index.orders_emulated {
821            if !self.orders.contains_key(client_order_id) {
822                log::error!(
823                    "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
824                );
825                error_count += 1;
826            }
827        }
828
829        for client_order_id in &self.index.orders_active_local {
830            if !self.orders.contains_key(client_order_id) {
831                log::error!(
832                    "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
833                );
834                error_count += 1;
835            }
836        }
837
838        for client_order_id in &self.index.orders_inflight {
839            if !self.orders.contains_key(client_order_id) {
840                log::error!(
841                    "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
842                );
843                error_count += 1;
844            }
845        }
846
847        for client_order_id in &self.index.orders_open {
848            if !self.orders.contains_key(client_order_id) {
849                log::error!(
850                    "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
851                );
852                error_count += 1;
853            }
854        }
855
856        for client_order_id in &self.index.orders_closed {
857            if !self.orders.contains_key(client_order_id) {
858                log::error!(
859                    "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
860                );
861                error_count += 1;
862            }
863        }
864
865        for position_id in &self.index.positions {
866            if !self.positions.contains_key(position_id) {
867                log::error!(
868                    "{failure} in `index.positions`: {position_id} not found in `self.positions`",
869                );
870                error_count += 1;
871            }
872        }
873
874        for position_id in &self.index.positions_open {
875            if !self.positions.contains_key(position_id) {
876                log::error!(
877                    "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
878                );
879                error_count += 1;
880            }
881        }
882
883        for position_id in &self.index.positions_closed {
884            if !self.positions.contains_key(position_id) {
885                log::error!(
886                    "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
887                );
888                error_count += 1;
889            }
890        }
891
892        for strategy_id in &self.index.strategies {
893            if !self.index.strategy_orders.contains_key(strategy_id) {
894                log::error!(
895                    "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
896                );
897                error_count += 1;
898            }
899        }
900
901        for exec_algorithm_id in &self.index.exec_algorithms {
902            if !self
903                .index
904                .exec_algorithm_orders
905                .contains_key(exec_algorithm_id)
906            {
907                log::error!(
908                    "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
909                );
910                error_count += 1;
911            }
912        }
913
914        let total_us = SystemTime::now()
915            .duration_since(UNIX_EPOCH)
916            .expect("Time went backwards")
917            .as_micros()
918            - timestamp_us;
919
920        if error_count == 0 {
921            log::info!("Integrity check passed in {total_us}μs");
922            true
923        } else {
924            log::error!(
925                "Integrity check failed with {error_count} error{} in {total_us}μs",
926                if error_count == 1 { "" } else { "s" },
927            );
928            false
929        }
930    }
931
932    /// Checks for any residual open state and log warnings if any are found.
933    ///
934    ///'Open state' is considered to be open orders and open positions.
935    #[must_use]
936    pub fn check_residuals(&self) -> bool {
937        log::debug!("Checking residuals");
938
939        let mut residuals = false;
940
941        // Check for any open orders
942        for order in self.orders_open(None, None, None, None, None) {
943            residuals = true;
944            log::warn!("Residual {order}");
945        }
946
947        // Check for any open positions
948        for position in self.positions_open(None, None, None, None, None) {
949            residuals = true;
950            log::warn!("Residual {position}");
951        }
952
953        residuals
954    }
955
956    /// Purges all closed orders from the cache that are older than `buffer_secs`.
957    ///
958    ///
959    /// Only orders that have been closed for at least this amount of time will be purged.
960    /// A value of 0 means purge all closed orders regardless of when they were closed.
961    pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
962        log::debug!(
963            "Purging closed orders{}",
964            if buffer_secs > 0 {
965                format!(" with buffer_secs={buffer_secs}")
966            } else {
967                String::new()
968            }
969        );
970
971        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
972
973        let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
974
975        'outer: for client_order_id in self.index.orders_closed.clone() {
976            if let Some(order) = self.orders.get(&client_order_id)
977                && order.is_closed()
978                && let Some(ts_closed) = order.ts_closed()
979                && ts_closed + buffer_ns <= ts_now
980            {
981                // Check any linked orders (contingency orders)
982                if let Some(linked_order_ids) = order.linked_order_ids() {
983                    for linked_order_id in linked_order_ids {
984                        if let Some(linked_order) = self.orders.get(linked_order_id)
985                            && linked_order.is_open()
986                        {
987                            // Do not purge if linked order still open
988                            continue 'outer;
989                        }
990                    }
991                }
992
993                if let Some(order_list_id) = order.order_list_id() {
994                    affected_order_list_ids.insert(order_list_id);
995                }
996
997                self.purge_order(client_order_id);
998            }
999        }
1000
1001        for order_list_id in affected_order_list_ids {
1002            if let Some(order_list) = self.order_lists.get(&order_list_id) {
1003                let all_purged = order_list
1004                    .client_order_ids
1005                    .iter()
1006                    .all(|id| !self.orders.contains_key(id));
1007
1008                if all_purged {
1009                    self.order_lists.remove(&order_list_id);
1010                    log::info!("Purged {order_list_id}");
1011                }
1012            }
1013        }
1014    }
1015
1016    /// Purges all closed positions from the cache that are older than `buffer_secs`.
1017    pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1018        log::debug!(
1019            "Purging closed positions{}",
1020            if buffer_secs > 0 {
1021                format!(" with buffer_secs={buffer_secs}")
1022            } else {
1023                String::new()
1024            }
1025        );
1026
1027        let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1028
1029        for position_id in self.index.positions_closed.clone() {
1030            if let Some(position) = self.positions.get(&position_id)
1031                && position.is_closed()
1032                && let Some(ts_closed) = position.ts_closed
1033                && ts_closed + buffer_ns <= ts_now
1034            {
1035                self.purge_position(position_id);
1036            }
1037        }
1038    }
1039
1040    /// Purges the order with the `client_order_id` from the cache (if found).
1041    ///
1042    /// For safety, an order is prevented from being purged if it's open.
1043    pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1044        // Check if order exists and is safe to purge before removing
1045        let order = self.orders.get(&client_order_id).cloned();
1046
1047        // Prevent purging open orders
1048        if let Some(ref ord) = order
1049            && ord.is_open()
1050        {
1051            log::warn!("Order {client_order_id} found open when purging, skipping purge");
1052            return;
1053        }
1054
1055        // If order exists in cache, remove it and clean up order-specific indices
1056        if let Some(ref ord) = order {
1057            // Safe to purge
1058            self.orders.remove(&client_order_id);
1059
1060            // Remove order from venue index
1061            if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1062            {
1063                venue_orders.remove(&client_order_id);
1064                if venue_orders.is_empty() {
1065                    self.index.venue_orders.remove(&ord.instrument_id().venue);
1066                }
1067            }
1068
1069            // Remove venue order ID index if exists
1070            if let Some(venue_order_id) = ord.venue_order_id() {
1071                self.index.venue_order_ids.remove(&venue_order_id);
1072            }
1073
1074            // Remove from instrument orders index
1075            if let Some(instrument_orders) =
1076                self.index.instrument_orders.get_mut(&ord.instrument_id())
1077            {
1078                instrument_orders.remove(&client_order_id);
1079                if instrument_orders.is_empty() {
1080                    self.index.instrument_orders.remove(&ord.instrument_id());
1081                }
1082            }
1083
1084            // Remove from position orders index if associated with a position
1085            if let Some(position_id) = ord.position_id()
1086                && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1087            {
1088                position_orders.remove(&client_order_id);
1089                if position_orders.is_empty() {
1090                    self.index.position_orders.remove(&position_id);
1091                }
1092            }
1093
1094            // Remove from exec algorithm orders index if it has an exec algorithm
1095            if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1096                && let Some(exec_algorithm_orders) =
1097                    self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1098            {
1099                exec_algorithm_orders.remove(&client_order_id);
1100                if exec_algorithm_orders.is_empty() {
1101                    self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1102                }
1103            }
1104
1105            // Clean up strategy orders reverse index
1106            if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1107                strategy_orders.remove(&client_order_id);
1108                if strategy_orders.is_empty() {
1109                    self.index.strategy_orders.remove(&ord.strategy_id());
1110                }
1111            }
1112
1113            // Clean up account orders index
1114            if let Some(account_id) = ord.account_id()
1115                && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1116            {
1117                account_orders.remove(&client_order_id);
1118                if account_orders.is_empty() {
1119                    self.index.account_orders.remove(&account_id);
1120                }
1121            }
1122
1123            // Clean up exec spawn reverse index (if this order is a spawned child)
1124            if let Some(exec_spawn_id) = ord.exec_spawn_id()
1125                && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1126            {
1127                spawn_orders.remove(&client_order_id);
1128                if spawn_orders.is_empty() {
1129                    self.index.exec_spawn_orders.remove(&exec_spawn_id);
1130                }
1131            }
1132
1133            log::info!("Purged order {client_order_id}");
1134        } else {
1135            log::warn!("Order {client_order_id} not found when purging");
1136        }
1137
1138        // Always clean up order indices (even if order was not in cache)
1139        self.index.order_position.remove(&client_order_id);
1140        let strategy_id = self.index.order_strategy.remove(&client_order_id);
1141        self.index.order_client.remove(&client_order_id);
1142        self.index.client_order_ids.remove(&client_order_id);
1143
1144        // Clean up reverse index when order not in cache (using forward index)
1145        if let Some(strategy_id) = strategy_id
1146            && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1147        {
1148            strategy_orders.remove(&client_order_id);
1149            if strategy_orders.is_empty() {
1150                self.index.strategy_orders.remove(&strategy_id);
1151            }
1152        }
1153
1154        // Remove spawn parent entry if this order was a spawn root
1155        self.index.exec_spawn_orders.remove(&client_order_id);
1156
1157        self.index.orders.remove(&client_order_id);
1158        self.index.orders_active_local.remove(&client_order_id);
1159        self.index.orders_open.remove(&client_order_id);
1160        self.index.orders_closed.remove(&client_order_id);
1161        self.index.orders_emulated.remove(&client_order_id);
1162        self.index.orders_inflight.remove(&client_order_id);
1163        self.index.orders_pending_cancel.remove(&client_order_id);
1164    }
1165
1166    /// Purges the position with the `position_id` from the cache (if found).
1167    ///
1168    /// For safety, a position is prevented from being purged if it's open.
1169    pub fn purge_position(&mut self, position_id: PositionId) {
1170        // Check if position exists and is safe to purge before removing
1171        let position = self.positions.get(&position_id).cloned();
1172
1173        // Prevent purging open positions
1174        if let Some(ref pos) = position
1175            && pos.is_open()
1176        {
1177            log::warn!("Position {position_id} found open when purging, skipping purge");
1178            return;
1179        }
1180
1181        // If position exists in cache, remove it and clean up position-specific indices
1182        if let Some(ref pos) = position {
1183            self.positions.remove(&position_id);
1184
1185            // Remove from venue positions index
1186            if let Some(venue_positions) =
1187                self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1188            {
1189                venue_positions.remove(&position_id);
1190                if venue_positions.is_empty() {
1191                    self.index.venue_positions.remove(&pos.instrument_id.venue);
1192                }
1193            }
1194
1195            // Remove from instrument positions index
1196            if let Some(instrument_positions) =
1197                self.index.instrument_positions.get_mut(&pos.instrument_id)
1198            {
1199                instrument_positions.remove(&position_id);
1200                if instrument_positions.is_empty() {
1201                    self.index.instrument_positions.remove(&pos.instrument_id);
1202                }
1203            }
1204
1205            // Remove from strategy positions index
1206            if let Some(strategy_positions) =
1207                self.index.strategy_positions.get_mut(&pos.strategy_id)
1208            {
1209                strategy_positions.remove(&position_id);
1210                if strategy_positions.is_empty() {
1211                    self.index.strategy_positions.remove(&pos.strategy_id);
1212                }
1213            }
1214
1215            // Remove from account positions index
1216            if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1217                account_positions.remove(&position_id);
1218                if account_positions.is_empty() {
1219                    self.index.account_positions.remove(&pos.account_id);
1220                }
1221            }
1222
1223            // Remove position ID from orders that reference it
1224            for client_order_id in pos.client_order_ids() {
1225                self.index.order_position.remove(&client_order_id);
1226            }
1227
1228            log::info!("Purged position {position_id}");
1229        } else {
1230            log::warn!("Position {position_id} not found when purging");
1231        }
1232
1233        // Always clean up position indices (even if position not in cache)
1234        self.index.position_strategy.remove(&position_id);
1235        self.index.position_orders.remove(&position_id);
1236        self.index.positions.remove(&position_id);
1237        self.index.positions_open.remove(&position_id);
1238        self.index.positions_closed.remove(&position_id);
1239
1240        // Always clean up position snapshots (even if position not in cache)
1241        self.position_snapshots.remove(&position_id);
1242    }
1243
1244    /// Purges all account state events which are outside the lookback window.
1245    ///
1246    /// Only events which are outside the lookback window will be purged.
1247    /// A value of 0 means purge all account state events.
1248    pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1249        log::debug!(
1250            "Purging account events{}",
1251            if lookback_secs > 0 {
1252                format!(" with lookback_secs={lookback_secs}")
1253            } else {
1254                String::new()
1255            }
1256        );
1257
1258        for account in self.accounts.values_mut() {
1259            let event_count = account.event_count();
1260            account.purge_account_events(ts_now, lookback_secs);
1261            let count_diff = event_count - account.event_count();
1262            if count_diff > 0 {
1263                log::info!(
1264                    "Purged {} event(s) from account {}",
1265                    count_diff,
1266                    account.id()
1267                );
1268            }
1269        }
1270    }
1271
1272    /// Clears the caches index.
1273    pub fn clear_index(&mut self) {
1274        self.index.clear();
1275        log::debug!("Cleared index");
1276    }
1277
1278    /// Resets the cache.
1279    ///
1280    /// All stateful fields are reset to their initial value. Instruments,
1281    /// currencies and synthetics are retained when `drop_instruments_on_reset`
1282    /// is `false` so that repeated backtest runs can reuse the same dataset.
1283    pub fn reset(&mut self) {
1284        log::debug!("Resetting cache");
1285
1286        self.general.clear();
1287        self.books.clear();
1288        self.own_books.clear();
1289        self.quotes.clear();
1290        self.trades.clear();
1291        self.mark_xrates.clear();
1292        self.mark_prices.clear();
1293        self.index_prices.clear();
1294        self.funding_rates.clear();
1295        self.instrument_statuses.clear();
1296        self.bars.clear();
1297        self.accounts.clear();
1298        self.orders.clear();
1299        self.order_lists.clear();
1300        self.positions.clear();
1301        self.position_snapshots.clear();
1302        self.greeks.clear();
1303        self.yield_curves.clear();
1304
1305        if self.config.drop_instruments_on_reset {
1306            self.currencies.clear();
1307            self.instruments.clear();
1308            self.synthetics.clear();
1309        }
1310
1311        #[cfg(feature = "defi")]
1312        {
1313            self.defi.pools.clear();
1314            self.defi.pool_profilers.clear();
1315        }
1316
1317        self.clear_index();
1318
1319        log::info!("Reset cache");
1320    }
1321
1322    /// Dispose of the cache which will close any underlying database adapter.
1323    ///
1324    /// If closing the database connection fails, an error is logged.
1325    pub fn dispose(&mut self) {
1326        self.reset();
1327
1328        if let Some(database) = &mut self.database
1329            && let Err(e) = database.close()
1330        {
1331            log::error!("Failed to close database during dispose: {e}");
1332        }
1333    }
1334
1335    /// Flushes the caches database which permanently removes all persisted data.
1336    ///
1337    /// If flushing the database connection fails, an error is logged.
1338    pub fn flush_db(&mut self) {
1339        if let Some(database) = &mut self.database
1340            && let Err(e) = database.flush()
1341        {
1342            log::error!("Failed to flush database: {e}");
1343        }
1344    }
1345
1346    /// Adds a raw bytes `value` to the cache under the `key`.
1347    ///
1348    /// The cache stores only raw bytes; interpretation is the caller's responsibility.
1349    ///
1350    /// # Errors
1351    ///
1352    /// Returns an error if persisting the entry to the backing database fails.
1353    pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1354        check_valid_string_ascii(key, stringify!(key))?;
1355        check_predicate_false(value.is_empty(), stringify!(value))?;
1356
1357        log::debug!("Adding general {key}");
1358        self.general.insert(key.to_string(), value.clone());
1359
1360        if let Some(database) = &mut self.database {
1361            database.add(key.to_string(), value)?;
1362        }
1363        Ok(())
1364    }
1365
1366    /// Adds an `OrderBook` to the cache.
1367    ///
1368    /// # Errors
1369    ///
1370    /// Returns an error if persisting the order book to the backing database fails.
1371    pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1372        log::debug!("Adding `OrderBook` {}", book.instrument_id);
1373
1374        if self.config.save_market_data
1375            && let Some(database) = &mut self.database
1376        {
1377            database.add_order_book(&book)?;
1378        }
1379
1380        self.books.insert(book.instrument_id, book);
1381        Ok(())
1382    }
1383
1384    /// Adds an `OwnOrderBook` to the cache.
1385    ///
1386    /// # Errors
1387    ///
1388    /// Returns an error if persisting the own order book fails.
1389    pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1390        log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1391
1392        self.own_books.insert(own_book.instrument_id, own_book);
1393        Ok(())
1394    }
1395
1396    /// Adds the `mark_price` update to the cache.
1397    ///
1398    /// # Errors
1399    ///
1400    /// Returns an error if persisting the mark price to the backing database fails.
1401    pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1402        log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1403
1404        if self.config.save_market_data {
1405            // TODO: Placeholder and return Result for consistency
1406        }
1407
1408        let mark_prices_deque = self
1409            .mark_prices
1410            .entry(mark_price.instrument_id)
1411            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1412        mark_prices_deque.push_front(mark_price);
1413        Ok(())
1414    }
1415
1416    /// Adds the `index_price` update to the cache.
1417    ///
1418    /// # Errors
1419    ///
1420    /// Returns an error if persisting the index price to the backing database fails.
1421    pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1422        log::debug!(
1423            "Adding `IndexPriceUpdate` for {}",
1424            index_price.instrument_id
1425        );
1426
1427        if self.config.save_market_data {
1428            // TODO: Placeholder and return Result for consistency
1429        }
1430
1431        let index_prices_deque = self
1432            .index_prices
1433            .entry(index_price.instrument_id)
1434            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1435        index_prices_deque.push_front(index_price);
1436        Ok(())
1437    }
1438
1439    /// Adds the `funding_rate` update to the cache.
1440    ///
1441    /// # Errors
1442    ///
1443    /// Returns an error if persisting the funding rate update to the backing database fails.
1444    pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1445        log::debug!(
1446            "Adding `FundingRateUpdate` for {}",
1447            funding_rate.instrument_id
1448        );
1449
1450        if self.config.save_market_data {
1451            // TODO: Placeholder and return Result for consistency
1452        }
1453
1454        let funding_rates_deque = self
1455            .funding_rates
1456            .entry(funding_rate.instrument_id)
1457            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1458        funding_rates_deque.push_front(funding_rate);
1459        Ok(())
1460    }
1461
1462    /// Adds the given `funding rates` to the cache.
1463    ///
1464    /// # Errors
1465    ///
1466    /// Returns an error if persisting the trade ticks to the backing database fails.
1467    pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1468        check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1469
1470        let instrument_id = funding_rates[0].instrument_id;
1471        log::debug!(
1472            "Adding `FundingRateUpdate`[{}] {instrument_id}",
1473            funding_rates.len()
1474        );
1475
1476        if self.config.save_market_data
1477            && let Some(database) = &mut self.database
1478        {
1479            for funding_rate in funding_rates {
1480                database.add_funding_rate(funding_rate)?;
1481            }
1482        }
1483
1484        let funding_rate_deque = self
1485            .funding_rates
1486            .entry(instrument_id)
1487            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1488
1489        for funding_rate in funding_rates {
1490            funding_rate_deque.push_front(*funding_rate);
1491        }
1492        Ok(())
1493    }
1494
1495    /// Adds the `instrument_status` update to the cache.
1496    ///
1497    /// # Errors
1498    ///
1499    /// Returns an error if persisting the instrument status to the backing database fails.
1500    pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
1501        log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
1502
1503        if self.config.save_market_data {
1504            // TODO: Placeholder and return Result for consistency
1505        }
1506
1507        let statuses_deque = self
1508            .instrument_statuses
1509            .entry(status.instrument_id)
1510            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1511        statuses_deque.push_front(status);
1512        Ok(())
1513    }
1514
1515    /// Adds the `quote` tick to the cache.
1516    ///
1517    /// # Errors
1518    ///
1519    /// Returns an error if persisting the quote tick to the backing database fails.
1520    pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1521        log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1522
1523        if self.config.save_market_data
1524            && let Some(database) = &mut self.database
1525        {
1526            database.add_quote(&quote)?;
1527        }
1528
1529        let quotes_deque = self
1530            .quotes
1531            .entry(quote.instrument_id)
1532            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1533        quotes_deque.push_front(quote);
1534        Ok(())
1535    }
1536
1537    /// Adds the `quotes` to the cache.
1538    ///
1539    /// # Errors
1540    ///
1541    /// Returns an error if persisting the quote ticks to the backing database fails.
1542    pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1543        check_slice_not_empty(quotes, stringify!(quotes))?;
1544
1545        let instrument_id = quotes[0].instrument_id;
1546        log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1547
1548        if self.config.save_market_data
1549            && let Some(database) = &mut self.database
1550        {
1551            for quote in quotes {
1552                database.add_quote(quote)?;
1553            }
1554        }
1555
1556        let quotes_deque = self
1557            .quotes
1558            .entry(instrument_id)
1559            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1560
1561        for quote in quotes {
1562            quotes_deque.push_front(*quote);
1563        }
1564        Ok(())
1565    }
1566
1567    /// Adds the `trade` tick to the cache.
1568    ///
1569    /// # Errors
1570    ///
1571    /// Returns an error if persisting the trade tick to the backing database fails.
1572    pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1573        log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1574
1575        if self.config.save_market_data
1576            && let Some(database) = &mut self.database
1577        {
1578            database.add_trade(&trade)?;
1579        }
1580
1581        let trades_deque = self
1582            .trades
1583            .entry(trade.instrument_id)
1584            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1585        trades_deque.push_front(trade);
1586        Ok(())
1587    }
1588
1589    /// Adds the give `trades` to the cache.
1590    ///
1591    /// # Errors
1592    ///
1593    /// Returns an error if persisting the trade ticks to the backing database fails.
1594    pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1595        check_slice_not_empty(trades, stringify!(trades))?;
1596
1597        let instrument_id = trades[0].instrument_id;
1598        log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1599
1600        if self.config.save_market_data
1601            && let Some(database) = &mut self.database
1602        {
1603            for trade in trades {
1604                database.add_trade(trade)?;
1605            }
1606        }
1607
1608        let trades_deque = self
1609            .trades
1610            .entry(instrument_id)
1611            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1612
1613        for trade in trades {
1614            trades_deque.push_front(*trade);
1615        }
1616        Ok(())
1617    }
1618
1619    /// Adds the `bar` to the cache.
1620    ///
1621    /// # Errors
1622    ///
1623    /// Returns an error if persisting the bar to the backing database fails.
1624    pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1625        log::debug!("Adding `Bar` {}", bar.bar_type);
1626
1627        if self.config.save_market_data
1628            && let Some(database) = &mut self.database
1629        {
1630            database.add_bar(&bar)?;
1631        }
1632
1633        let bars = self
1634            .bars
1635            .entry(bar.bar_type)
1636            .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1637        bars.push_front(bar);
1638        Ok(())
1639    }
1640
1641    /// Adds the `bars` to the cache.
1642    ///
1643    /// # Errors
1644    ///
1645    /// Returns an error if persisting the bars to the backing database fails.
1646    pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1647        check_slice_not_empty(bars, stringify!(bars))?;
1648
1649        let bar_type = bars[0].bar_type;
1650        log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1651
1652        if self.config.save_market_data
1653            && let Some(database) = &mut self.database
1654        {
1655            for bar in bars {
1656                database.add_bar(bar)?;
1657            }
1658        }
1659
1660        let bars_deque = self
1661            .bars
1662            .entry(bar_type)
1663            .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1664
1665        for bar in bars {
1666            bars_deque.push_front(*bar);
1667        }
1668        Ok(())
1669    }
1670
1671    /// Adds the `greeks` data to the cache.
1672    ///
1673    /// # Errors
1674    ///
1675    /// Returns an error if persisting the greeks data to the backing database fails.
1676    pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1677        log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1678
1679        if self.config.save_market_data
1680            && let Some(_database) = &mut self.database
1681        {
1682            // TODO: Implement database.add_greeks(&greeks) when database adapter is updated
1683        }
1684
1685        self.greeks.insert(greeks.instrument_id, greeks);
1686        Ok(())
1687    }
1688
1689    /// Gets the greeks data for the `instrument_id`.
1690    pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1691        self.greeks.get(instrument_id).cloned()
1692    }
1693
1694    /// Adds exchange-provided option greeks to the cache.
1695    pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1696        log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1697        self.option_greeks.insert(greeks.instrument_id, greeks);
1698    }
1699
1700    /// Gets a reference to the exchange-provided option greeks for the `instrument_id`.
1701    #[must_use]
1702    pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1703        self.option_greeks.get(instrument_id)
1704    }
1705
1706    /// Adds the `yield_curve` data to the cache.
1707    ///
1708    /// # Errors
1709    ///
1710    /// Returns an error if persisting the yield curve data to the backing database fails.
1711    pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1712        log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1713
1714        if self.config.save_market_data
1715            && let Some(_database) = &mut self.database
1716        {
1717            // TODO: Implement database.add_yield_curve(&yield_curve) when database adapter is updated
1718        }
1719
1720        self.yield_curves
1721            .insert(yield_curve.curve_name.clone(), yield_curve);
1722        Ok(())
1723    }
1724
1725    /// Gets the yield curve for the `key`.
1726    pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1727        self.yield_curves.get(key).map(|curve| {
1728            let curve_clone = curve.clone();
1729            Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1730                as Box<dyn Fn(f64) -> f64>
1731        })
1732    }
1733
1734    /// Adds the `currency` to the cache.
1735    ///
1736    /// # Errors
1737    ///
1738    /// Returns an error if persisting the currency to the backing database fails.
1739    pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1740        if self.currencies.contains_key(&currency.code) {
1741            return Ok(());
1742        }
1743        log::debug!("Adding `Currency` {}", currency.code);
1744
1745        if let Some(database) = &mut self.database {
1746            database.add_currency(&currency)?;
1747        }
1748
1749        self.currencies.insert(currency.code, currency);
1750        Ok(())
1751    }
1752
1753    /// Adds the `instrument` to the cache.
1754    ///
1755    /// # Errors
1756    ///
1757    /// Returns an error if persisting the instrument to the backing database fails.
1758    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1759        log::debug!("Adding `Instrument` {}", instrument.id());
1760
1761        // Ensure currencies exist in cache - safe to call repeatedly as add_currency is idempotent
1762        if let Some(base_currency) = instrument.base_currency() {
1763            self.add_currency(base_currency)?;
1764        }
1765        self.add_currency(instrument.quote_currency())?;
1766        self.add_currency(instrument.settlement_currency())?;
1767
1768        if let Some(database) = &mut self.database {
1769            database.add_instrument(&instrument)?;
1770        }
1771
1772        self.instruments.insert(instrument.id(), instrument);
1773        Ok(())
1774    }
1775
1776    /// Adds the `synthetic` instrument to the cache.
1777    ///
1778    /// # Errors
1779    ///
1780    /// Returns an error if persisting the synthetic instrument to the backing database fails.
1781    pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1782        log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1783
1784        if let Some(database) = &mut self.database {
1785            database.add_synthetic(&synthetic)?;
1786        }
1787
1788        self.synthetics.insert(synthetic.id, synthetic);
1789        Ok(())
1790    }
1791
1792    /// Adds the `account` to the cache.
1793    ///
1794    /// # Errors
1795    ///
1796    /// Returns an error if persisting the account to the backing database fails.
1797    pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1798        log::debug!("Adding `Account` {}", account.id());
1799
1800        if let Some(database) = &mut self.database {
1801            database.add_account(&account)?;
1802        }
1803
1804        let account_id = account.id();
1805        self.accounts.insert(account_id, account);
1806        self.index
1807            .venue_account
1808            .insert(account_id.get_issuer(), account_id);
1809        Ok(())
1810    }
1811
1812    /// Indexes the `client_order_id` with the `venue_order_id`.
1813    ///
1814    /// The `overwrite` parameter determines whether to overwrite any existing cached identifier.
1815    ///
1816    /// # Errors
1817    ///
1818    /// Returns an error if the existing venue order ID conflicts and overwrite is false.
1819    pub fn add_venue_order_id(
1820        &mut self,
1821        client_order_id: &ClientOrderId,
1822        venue_order_id: &VenueOrderId,
1823        overwrite: bool,
1824    ) -> anyhow::Result<()> {
1825        if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1826            && !overwrite
1827            && existing_venue_order_id != venue_order_id
1828        {
1829            anyhow::bail!(
1830                "Existing {existing_venue_order_id} for {client_order_id}
1831                    did not match the given {venue_order_id}.
1832                    If you are writing a test then try a different `venue_order_id`,
1833                    otherwise this is probably a bug."
1834            );
1835        }
1836
1837        self.index
1838            .client_order_ids
1839            .insert(*client_order_id, *venue_order_id);
1840        self.index
1841            .venue_order_ids
1842            .insert(*venue_order_id, *client_order_id);
1843
1844        Ok(())
1845    }
1846
1847    /// Adds the `order` to the cache indexed with any given identifiers.
1848    ///
1849    /// # Parameters
1850    ///
1851    /// `override_existing`: If the added order should 'override' any existing order and replace
1852    /// it in the cache. This is currently used for emulated orders which are
1853    /// being released and transformed into another type.
1854    ///
1855    /// # Errors
1856    ///
1857    /// Returns an error if not `replace_existing` and the `order.client_order_id` is already contained in the cache.
1858    pub fn add_order(
1859        &mut self,
1860        order: OrderAny,
1861        position_id: Option<PositionId>,
1862        client_id: Option<ClientId>,
1863        replace_existing: bool,
1864    ) -> anyhow::Result<()> {
1865        let instrument_id = order.instrument_id();
1866        let venue = instrument_id.venue;
1867        let client_order_id = order.client_order_id();
1868        let strategy_id = order.strategy_id();
1869        let exec_algorithm_id = order.exec_algorithm_id();
1870        let exec_spawn_id = order.exec_spawn_id();
1871
1872        if !replace_existing {
1873            check_key_not_in_map(
1874                &client_order_id,
1875                &self.orders,
1876                stringify!(client_order_id),
1877                stringify!(orders),
1878            )?;
1879        }
1880
1881        log::debug!("Adding {order:?}");
1882
1883        self.index.orders.insert(client_order_id);
1884
1885        if order.is_active_local() {
1886            self.index.orders_active_local.insert(client_order_id);
1887        }
1888        self.index
1889            .order_strategy
1890            .insert(client_order_id, strategy_id);
1891        self.index.strategies.insert(strategy_id);
1892
1893        // Update venue -> orders index
1894        self.index
1895            .venue_orders
1896            .entry(venue)
1897            .or_default()
1898            .insert(client_order_id);
1899
1900        // Update instrument -> orders index
1901        self.index
1902            .instrument_orders
1903            .entry(instrument_id)
1904            .or_default()
1905            .insert(client_order_id);
1906
1907        // Update strategy -> orders index
1908        self.index
1909            .strategy_orders
1910            .entry(strategy_id)
1911            .or_default()
1912            .insert(client_order_id);
1913
1914        // Update account -> orders index (if account_id known at creation)
1915        if let Some(account_id) = order.account_id() {
1916            self.index
1917                .account_orders
1918                .entry(account_id)
1919                .or_default()
1920                .insert(client_order_id);
1921        }
1922
1923        // Update exec_algorithm -> orders index
1924        if let Some(exec_algorithm_id) = exec_algorithm_id {
1925            self.index.exec_algorithms.insert(exec_algorithm_id);
1926
1927            self.index
1928                .exec_algorithm_orders
1929                .entry(exec_algorithm_id)
1930                .or_default()
1931                .insert(client_order_id);
1932        }
1933
1934        // Update exec_spawn -> orders index
1935        if let Some(exec_spawn_id) = exec_spawn_id {
1936            self.index
1937                .exec_spawn_orders
1938                .entry(exec_spawn_id)
1939                .or_default()
1940                .insert(client_order_id);
1941        }
1942
1943        // Update emulation index
1944        if let Some(emulation_trigger) = order.emulation_trigger()
1945            && emulation_trigger != TriggerType::NoTrigger
1946        {
1947            self.index.orders_emulated.insert(client_order_id);
1948        }
1949
1950        // Index position ID if provided
1951        if let Some(position_id) = position_id {
1952            self.add_position_id(
1953                &position_id,
1954                &order.instrument_id().venue,
1955                &client_order_id,
1956                &strategy_id,
1957            )?;
1958        }
1959
1960        // Index client ID if provided
1961        if let Some(client_id) = client_id {
1962            self.index.order_client.insert(client_order_id, client_id);
1963            log::debug!("Indexed {client_id:?}");
1964        }
1965
1966        if let Some(database) = &mut self.database {
1967            database.add_order(&order, client_id)?;
1968            // TODO: Implement
1969            // if self.config.snapshot_orders {
1970            //     database.snapshot_order_state(order)?;
1971            // }
1972        }
1973
1974        self.orders.insert(client_order_id, order);
1975
1976        Ok(())
1977    }
1978
1979    /// Adds the `order_list` to the cache.
1980    ///
1981    /// # Errors
1982    ///
1983    /// Returns an error if the order list ID is already contained in the cache.
1984    pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1985        let order_list_id = order_list.id;
1986        check_key_not_in_map(
1987            &order_list_id,
1988            &self.order_lists,
1989            stringify!(order_list_id),
1990            stringify!(order_lists),
1991        )?;
1992
1993        log::debug!("Adding {order_list:?}");
1994        self.order_lists.insert(order_list_id, order_list);
1995        Ok(())
1996    }
1997
1998    /// Indexes the `position_id` with the other given IDs.
1999    ///
2000    /// # Errors
2001    ///
2002    /// Returns an error if indexing position ID in the backing database fails.
2003    pub fn add_position_id(
2004        &mut self,
2005        position_id: &PositionId,
2006        venue: &Venue,
2007        client_order_id: &ClientOrderId,
2008        strategy_id: &StrategyId,
2009    ) -> anyhow::Result<()> {
2010        self.index
2011            .order_position
2012            .insert(*client_order_id, *position_id);
2013
2014        // Index: ClientOrderId -> PositionId
2015        if let Some(database) = &mut self.database {
2016            database.index_order_position(*client_order_id, *position_id)?;
2017        }
2018
2019        // Index: PositionId -> StrategyId
2020        self.index
2021            .position_strategy
2022            .insert(*position_id, *strategy_id);
2023
2024        // Index: PositionId -> set[ClientOrderId]
2025        self.index
2026            .position_orders
2027            .entry(*position_id)
2028            .or_default()
2029            .insert(*client_order_id);
2030
2031        // Index: StrategyId -> set[PositionId]
2032        self.index
2033            .strategy_positions
2034            .entry(*strategy_id)
2035            .or_default()
2036            .insert(*position_id);
2037
2038        // Index: Venue -> set[PositionId]
2039        self.index
2040            .venue_positions
2041            .entry(*venue)
2042            .or_default()
2043            .insert(*position_id);
2044
2045        Ok(())
2046    }
2047
2048    // Propagates parent OTO `position_id` to contingent children that are missing one.
2049    //
2050    // Recovers from a partial-write window during fill handling: the fill-time path in the
2051    // execution engine assigns `position_id` to each contingent child in a non-atomic loop
2052    // (`set_position_id` then `add_position_id`), so a crash mid-loop can leave the database
2053    // with the parent updated and some children un-updated. This pass re-applies any missing
2054    // assignments after load. Mirrors the Cython behaviour at
2055    // `nautilus_trader/cache/cache.pyx::_assign_position_id_to_contingencies`.
2056    fn assign_position_ids_to_contingencies(&mut self) {
2057        let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
2058
2059        for parent in self.orders.values() {
2060            if parent.contingency_type() != Some(ContingencyType::Oto) {
2061                continue;
2062            }
2063            let Some(parent_position_id) = parent.position_id() else {
2064                continue;
2065            };
2066            let Some(linked_order_ids) = parent.linked_order_ids() else {
2067                continue;
2068            };
2069
2070            for client_order_id in linked_order_ids {
2071                match self.orders.get(client_order_id) {
2072                    None => {
2073                        log::error!("Contingency order {client_order_id} not found");
2074                    }
2075                    Some(contingent) => {
2076                        if contingent.position_id().is_none() {
2077                            assignments.push((parent_position_id, *client_order_id));
2078                        }
2079                    }
2080                }
2081            }
2082        }
2083
2084        for (position_id, client_order_id) in assignments {
2085            let Some((venue, strategy_id)) =
2086                self.orders.get_mut(&client_order_id).map(|contingent| {
2087                    contingent.set_position_id(Some(position_id));
2088                    (contingent.instrument_id().venue, contingent.strategy_id())
2089                })
2090            else {
2091                continue;
2092            };
2093
2094            // In-memory index updates only. The persistent index entry (if any) was written by
2095            // the original fill-time `add_position_id` call; replaying the database write here
2096            // would invoke `CacheDatabaseAdapter::index_order_position`, which is currently
2097            // `todo!()` on both the Redis and SQL adapters. Until those land, the load-time
2098            // recovery is in-memory-only: sufficient for the current process to operate, but
2099            // not durable across another restart.
2100            self.index
2101                .order_position
2102                .insert(client_order_id, position_id);
2103            self.index
2104                .position_strategy
2105                .insert(position_id, strategy_id);
2106            self.index
2107                .position_orders
2108                .entry(position_id)
2109                .or_default()
2110                .insert(client_order_id);
2111            self.index
2112                .strategy_positions
2113                .entry(strategy_id)
2114                .or_default()
2115                .insert(position_id);
2116            self.index
2117                .venue_positions
2118                .entry(venue)
2119                .or_default()
2120                .insert(position_id);
2121        }
2122    }
2123
2124    /// Adds the `position` to the cache.
2125    ///
2126    /// # Errors
2127    ///
2128    /// Returns an error if persisting the position to the backing database fails.
2129    pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2130        self.positions.insert(position.id, position.clone());
2131        self.index.positions.insert(position.id);
2132        self.index.positions_open.insert(position.id);
2133        self.index.positions_closed.remove(&position.id); // Cleanup for NETTING reopen
2134
2135        log::debug!("Adding {position}");
2136
2137        self.add_position_id(
2138            &position.id,
2139            &position.instrument_id.venue,
2140            &position.opening_order_id,
2141            &position.strategy_id,
2142        )?;
2143
2144        let venue = position.instrument_id.venue;
2145        let venue_positions = self.index.venue_positions.entry(venue).or_default();
2146        venue_positions.insert(position.id);
2147
2148        // Index: InstrumentId -> AHashSet
2149        let instrument_id = position.instrument_id;
2150        let instrument_positions = self
2151            .index
2152            .instrument_positions
2153            .entry(instrument_id)
2154            .or_default();
2155        instrument_positions.insert(position.id);
2156
2157        // Index: AccountId -> AHashSet<PositionId>
2158        self.index
2159            .account_positions
2160            .entry(position.account_id)
2161            .or_default()
2162            .insert(position.id);
2163
2164        if let Some(database) = &mut self.database {
2165            database.add_position(position)?;
2166            // TODO: Implement position snapshots
2167            // if self.snapshot_positions {
2168            //     database.snapshot_position_state(
2169            //         position,
2170            //         position.ts_last,
2171            //         self.calculate_unrealized_pnl(&position),
2172            //     )?;
2173            // }
2174        }
2175
2176        Ok(())
2177    }
2178
2179    /// Updates the `account` in the cache.
2180    ///
2181    /// # Errors
2182    ///
2183    /// Returns an error if updating the account in the database fails.
2184    pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2185        let account_id = account.id();
2186        self.accounts.insert(account_id, account.clone());
2187
2188        if let Some(database) = &mut self.database {
2189            database.update_account(account)?;
2190        }
2191        Ok(())
2192    }
2193
2194    /// Updates the `order` in the cache.
2195    ///
2196    /// # Errors
2197    ///
2198    /// Returns an error if updating the order in the database fails.
2199    pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2200        let client_order_id = order.client_order_id();
2201
2202        if order.is_active_local() {
2203            self.index.orders_active_local.insert(client_order_id);
2204        } else {
2205            self.index.orders_active_local.remove(&client_order_id);
2206        }
2207
2208        // Update venue order ID
2209        if let Some(venue_order_id) = order.venue_order_id() {
2210            // If the order is being modified then we allow a changing `VenueOrderId` to accommodate
2211            // venues which use a cancel+replace update strategy.
2212            if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2213                // TODO: If the last event was `OrderUpdated` then overwrite should be true
2214                self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2215            }
2216        }
2217
2218        // Update in-flight state
2219        if order.is_inflight() {
2220            self.index.orders_inflight.insert(client_order_id);
2221        } else {
2222            self.index.orders_inflight.remove(&client_order_id);
2223        }
2224
2225        // Update open/closed state
2226        if order.is_open() {
2227            self.index.orders_closed.remove(&client_order_id);
2228            self.index.orders_open.insert(client_order_id);
2229        } else if order.is_closed() {
2230            self.index.orders_open.remove(&client_order_id);
2231            self.index.orders_pending_cancel.remove(&client_order_id);
2232            self.index.orders_closed.insert(client_order_id);
2233        }
2234
2235        // Update emulation index
2236        if let Some(emulation_trigger) = order.emulation_trigger()
2237            && emulation_trigger != TriggerType::NoTrigger
2238            && !order.is_closed()
2239        {
2240            self.index.orders_emulated.insert(client_order_id);
2241        } else {
2242            self.index.orders_emulated.remove(&client_order_id);
2243        }
2244
2245        // Update account orders index when account_id becomes available
2246        if let Some(account_id) = order.account_id() {
2247            self.index
2248                .account_orders
2249                .entry(account_id)
2250                .or_default()
2251                .insert(client_order_id);
2252        }
2253
2254        // Update own book
2255        if self.own_order_book(&order.instrument_id()).is_some()
2256            && should_handle_own_book_order(order)
2257        {
2258            self.update_own_order_book(order);
2259        }
2260
2261        if let Some(database) = &mut self.database {
2262            database.update_order(order.last_event())?;
2263            // TODO: Implement order snapshots
2264            // if self.snapshot_orders {
2265            //     database.snapshot_order_state(order)?;
2266            // }
2267        }
2268
2269        // update the order in the cache
2270        self.orders.insert(client_order_id, order.clone());
2271
2272        Ok(())
2273    }
2274
2275    /// Updates the `order` as pending cancel locally.
2276    pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2277        self.index
2278            .orders_pending_cancel
2279            .insert(order.client_order_id());
2280    }
2281
2282    /// Updates the `position` in the cache.
2283    ///
2284    /// # Errors
2285    ///
2286    /// Returns an error if updating the position in the database fails.
2287    pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2288        // Update open/closed state
2289
2290        if position.is_open() {
2291            self.index.positions_open.insert(position.id);
2292            self.index.positions_closed.remove(&position.id);
2293        } else {
2294            self.index.positions_closed.insert(position.id);
2295            self.index.positions_open.remove(&position.id);
2296        }
2297
2298        if let Some(database) = &mut self.database {
2299            database.update_position(position)?;
2300            // TODO: Implement order snapshots
2301            // if self.snapshot_orders {
2302            //     database.snapshot_order_state(order)?;
2303            // }
2304        }
2305
2306        self.positions.insert(position.id, position.clone());
2307
2308        Ok(())
2309    }
2310
2311    /// Creates a snapshot of the `position` by cloning it, assigning a new ID,
2312    /// serializing it, and storing it in the position snapshots.
2313    ///
2314    /// # Errors
2315    ///
2316    /// Returns an error if serializing or storing the position snapshot fails.
2317    pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2318        let position_id = position.id;
2319
2320        let mut copied_position = position.clone();
2321        let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2322        copied_position.id = PositionId::new(new_id);
2323
2324        // Serialize the position (TODO: temporarily just to JSON to remove a dependency)
2325        let position_serialized = serde_json::to_vec(&copied_position)?;
2326
2327        self.position_snapshots
2328            .entry(position_id)
2329            .or_default()
2330            .push(Bytes::from(position_serialized));
2331
2332        log::debug!("Snapshot {copied_position}");
2333        Ok(())
2334    }
2335
2336    /// Creates a snapshot of the `position` state in the database.
2337    ///
2338    /// # Errors
2339    ///
2340    /// Returns an error if snapshotting the position state fails.
2341    pub fn snapshot_position_state(
2342        &mut self,
2343        position: &Position,
2344        // ts_snapshot: u64,
2345        // unrealized_pnl: Option<Money>,
2346        open_only: Option<bool>,
2347    ) -> anyhow::Result<()> {
2348        let open_only = open_only.unwrap_or(true);
2349
2350        if open_only && !position.is_open() {
2351            return Ok(());
2352        }
2353
2354        if let Some(database) = &mut self.database {
2355            database.snapshot_position_state(position).map_err(|e| {
2356                log::error!(
2357                    "Failed to snapshot position state for {}: {e:?}",
2358                    position.id
2359                );
2360                e
2361            })?;
2362        } else {
2363            log::warn!(
2364                "Cannot snapshot position state for {} (no database configured)",
2365                position.id
2366            );
2367        }
2368
2369        // Ok(())
2370        todo!()
2371    }
2372
2373    /// Gets the OMS type for the `position_id`.
2374    #[must_use]
2375    pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2376        // Get OMS type from the index
2377        if self.index.position_strategy.contains_key(position_id) {
2378            // For now, we'll default to NETTING
2379            // TODO: Store and retrieve actual OMS type per position
2380            Some(OmsType::Netting)
2381        } else {
2382            None
2383        }
2384    }
2385
2386    /// Gets the serialized position snapshot frames for the `position_id`.
2387    ///
2388    /// Each element in the returned vector is one JSON-encoded [`Position`] snapshot,
2389    /// in the order they were taken.
2390    #[must_use]
2391    pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
2392        self.position_snapshots
2393            .get(position_id)
2394            .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
2395    }
2396
2397    /// Returns the number of stored snapshot frames for the `position_id`.
2398    ///
2399    /// Returns `0` when no frames are stored. Does not allocate or copy frame bytes.
2400    #[must_use]
2401    pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
2402        self.position_snapshots.get(position_id).map_or(0, Vec::len)
2403    }
2404
2405    /// Returns all position snapshots with the given optional filters.
2406    ///
2407    /// When `position_id` is `Some`, only snapshots for that position are returned.
2408    /// When `account_id` is `Some`, snapshots are filtered to that account.
2409    /// Frames that fail to deserialize are skipped with a warning.
2410    #[must_use]
2411    pub fn position_snapshots(
2412        &self,
2413        position_id: Option<&PositionId>,
2414        account_id: Option<&AccountId>,
2415    ) -> Vec<Position> {
2416        let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
2417            Some(pid) => match self.position_snapshots.get(pid) {
2418                Some(v) => Box::new(v.iter()),
2419                None => Box::new(std::iter::empty()),
2420            },
2421            None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
2422        };
2423
2424        let mut results: Vec<Position> = frames
2425            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2426                Ok(position) => Some(position),
2427                Err(e) => {
2428                    log::warn!("Failed to decode position snapshot: {e}");
2429                    None
2430                }
2431            })
2432            .collect();
2433
2434        if let Some(aid) = account_id {
2435            results.retain(|p| p.account_id == *aid);
2436        }
2437
2438        results
2439    }
2440
2441    /// Returns position snapshots for `position_id` starting from the `skip`th frame.
2442    ///
2443    /// Use this to deserialize only newly appended snapshots when the caller already
2444    /// processed earlier frames. Returns an empty vector when no frames or fewer than
2445    /// `skip` frames are stored. Frames that fail to deserialize are skipped with a warning.
2446    #[must_use]
2447    pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
2448        let Some(frames) = self.position_snapshots.get(position_id) else {
2449            return Vec::new();
2450        };
2451
2452        frames
2453            .iter()
2454            .skip(skip)
2455            .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2456                Ok(position) => Some(position),
2457                Err(e) => {
2458                    log::warn!("Failed to decode position snapshot: {e}");
2459                    None
2460                }
2461            })
2462            .collect()
2463    }
2464
2465    /// Gets position snapshot IDs for the `instrument_id`.
2466    #[must_use]
2467    pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2468        // Get snapshot position IDs that match the instrument
2469        let mut result = AHashSet::new();
2470
2471        for (position_id, _) in &self.position_snapshots {
2472            // Check if this position is for the requested instrument
2473            if let Some(position) = self.positions.get(position_id)
2474                && position.instrument_id == *instrument_id
2475            {
2476                result.insert(*position_id);
2477            }
2478        }
2479        result
2480    }
2481
2482    /// Snapshots the `order` state in the database.
2483    ///
2484    /// # Errors
2485    ///
2486    /// Returns an error if snapshotting the order state fails.
2487    pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2488        let database = if let Some(database) = &self.database {
2489            database
2490        } else {
2491            log::warn!(
2492                "Cannot snapshot order state for {} (no database configured)",
2493                order.client_order_id()
2494            );
2495            return Ok(());
2496        };
2497
2498        database.snapshot_order_state(order)
2499    }
2500
2501    // -- IDENTIFIER QUERIES ----------------------------------------------------------------------
2502
2503    fn build_order_query_filter_set(
2504        &self,
2505        venue: Option<&Venue>,
2506        instrument_id: Option<&InstrumentId>,
2507        strategy_id: Option<&StrategyId>,
2508        account_id: Option<&AccountId>,
2509    ) -> Option<AHashSet<ClientOrderId>> {
2510        let mut query: Option<AHashSet<ClientOrderId>> = None;
2511
2512        if let Some(venue) = venue {
2513            query = Some(
2514                self.index
2515                    .venue_orders
2516                    .get(venue)
2517                    .cloned()
2518                    .unwrap_or_default(),
2519            );
2520        }
2521
2522        if let Some(instrument_id) = instrument_id {
2523            let instrument_orders = self
2524                .index
2525                .instrument_orders
2526                .get(instrument_id)
2527                .cloned()
2528                .unwrap_or_default();
2529
2530            if let Some(existing_query) = &mut query {
2531                *existing_query = existing_query
2532                    .intersection(&instrument_orders)
2533                    .copied()
2534                    .collect();
2535            } else {
2536                query = Some(instrument_orders);
2537            }
2538        }
2539
2540        if let Some(strategy_id) = strategy_id {
2541            let strategy_orders = self
2542                .index
2543                .strategy_orders
2544                .get(strategy_id)
2545                .cloned()
2546                .unwrap_or_default();
2547
2548            if let Some(existing_query) = &mut query {
2549                *existing_query = existing_query
2550                    .intersection(&strategy_orders)
2551                    .copied()
2552                    .collect();
2553            } else {
2554                query = Some(strategy_orders);
2555            }
2556        }
2557
2558        if let Some(account_id) = account_id {
2559            let account_orders = self
2560                .index
2561                .account_orders
2562                .get(account_id)
2563                .cloned()
2564                .unwrap_or_default();
2565
2566            if let Some(existing_query) = &mut query {
2567                *existing_query = existing_query
2568                    .intersection(&account_orders)
2569                    .copied()
2570                    .collect();
2571            } else {
2572                query = Some(account_orders);
2573            }
2574        }
2575
2576        query
2577    }
2578
2579    fn build_position_query_filter_set(
2580        &self,
2581        venue: Option<&Venue>,
2582        instrument_id: Option<&InstrumentId>,
2583        strategy_id: Option<&StrategyId>,
2584        account_id: Option<&AccountId>,
2585    ) -> Option<AHashSet<PositionId>> {
2586        let mut query: Option<AHashSet<PositionId>> = None;
2587
2588        if let Some(venue) = venue {
2589            query = Some(
2590                self.index
2591                    .venue_positions
2592                    .get(venue)
2593                    .cloned()
2594                    .unwrap_or_default(),
2595            );
2596        }
2597
2598        if let Some(instrument_id) = instrument_id {
2599            let instrument_positions = self
2600                .index
2601                .instrument_positions
2602                .get(instrument_id)
2603                .cloned()
2604                .unwrap_or_default();
2605
2606            if let Some(existing_query) = query {
2607                query = Some(
2608                    existing_query
2609                        .intersection(&instrument_positions)
2610                        .copied()
2611                        .collect(),
2612                );
2613            } else {
2614                query = Some(instrument_positions);
2615            }
2616        }
2617
2618        if let Some(strategy_id) = strategy_id {
2619            let strategy_positions = self
2620                .index
2621                .strategy_positions
2622                .get(strategy_id)
2623                .cloned()
2624                .unwrap_or_default();
2625
2626            if let Some(existing_query) = query {
2627                query = Some(
2628                    existing_query
2629                        .intersection(&strategy_positions)
2630                        .copied()
2631                        .collect(),
2632                );
2633            } else {
2634                query = Some(strategy_positions);
2635            }
2636        }
2637
2638        if let Some(account_id) = account_id {
2639            let account_positions = self
2640                .index
2641                .account_positions
2642                .get(account_id)
2643                .cloned()
2644                .unwrap_or_default();
2645
2646            if let Some(existing_query) = query {
2647                query = Some(
2648                    existing_query
2649                        .intersection(&account_positions)
2650                        .copied()
2651                        .collect(),
2652                );
2653            } else {
2654                query = Some(account_positions);
2655            }
2656        }
2657
2658        query
2659    }
2660
2661    /// Retrieves orders corresponding to the `client_order_ids`, optionally filtering by `side`.
2662    ///
2663    /// # Panics
2664    ///
2665    /// Panics if any `client_order_id` in the set is not found in the cache.
2666    fn get_orders_for_ids(
2667        &self,
2668        client_order_ids: &AHashSet<ClientOrderId>,
2669        side: Option<OrderSide>,
2670    ) -> Vec<&OrderAny> {
2671        let side = side.unwrap_or(OrderSide::NoOrderSide);
2672        let mut orders = Vec::new();
2673
2674        for client_order_id in client_order_ids {
2675            let order = self
2676                .orders
2677                .get(client_order_id)
2678                .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2679
2680            if side == OrderSide::NoOrderSide || side == order.order_side() {
2681                orders.push(order);
2682            }
2683        }
2684
2685        // Sort so callers receive a deterministic Vec across runs; the
2686        // underlying client_order_ids set is AHash-backed.
2687        orders.sort_by_key(|o| o.client_order_id());
2688        orders
2689    }
2690
2691    /// Retrieves positions corresponding to the `position_ids`, optionally filtering by `side`.
2692    ///
2693    /// # Panics
2694    ///
2695    /// Panics if any `position_id` in the set is not found in the cache.
2696    fn get_positions_for_ids(
2697        &self,
2698        position_ids: &AHashSet<PositionId>,
2699        side: Option<PositionSide>,
2700    ) -> Vec<&Position> {
2701        let side = side.unwrap_or(PositionSide::NoPositionSide);
2702        let mut positions = Vec::new();
2703
2704        for position_id in position_ids {
2705            let position = self
2706                .positions
2707                .get(position_id)
2708                .unwrap_or_else(|| panic!("Position {position_id} not found"));
2709
2710            if side == PositionSide::NoPositionSide || side == position.side {
2711                positions.push(position);
2712            }
2713        }
2714
2715        // Sort so callers receive a deterministic Vec across runs; the
2716        // underlying position_ids set is AHash-backed.
2717        positions.sort_by_key(|p| p.id);
2718        positions
2719    }
2720
2721    /// Returns the `ClientOrderId`s of all orders.
2722    #[must_use]
2723    pub fn client_order_ids(
2724        &self,
2725        venue: Option<&Venue>,
2726        instrument_id: Option<&InstrumentId>,
2727        strategy_id: Option<&StrategyId>,
2728        account_id: Option<&AccountId>,
2729    ) -> AHashSet<ClientOrderId> {
2730        let query =
2731            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2732
2733        match query {
2734            Some(query) => self.index.orders.intersection(&query).copied().collect(),
2735            None => self.index.orders.clone(),
2736        }
2737    }
2738
2739    /// Returns the `ClientOrderId`s of all open orders.
2740    #[must_use]
2741    pub fn client_order_ids_open(
2742        &self,
2743        venue: Option<&Venue>,
2744        instrument_id: Option<&InstrumentId>,
2745        strategy_id: Option<&StrategyId>,
2746        account_id: Option<&AccountId>,
2747    ) -> AHashSet<ClientOrderId> {
2748        let query =
2749            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2750
2751        match query {
2752            Some(query) => self
2753                .index
2754                .orders_open
2755                .intersection(&query)
2756                .copied()
2757                .collect(),
2758            None => self.index.orders_open.clone(),
2759        }
2760    }
2761
2762    /// Returns the `ClientOrderId`s of all closed orders.
2763    #[must_use]
2764    pub fn client_order_ids_closed(
2765        &self,
2766        venue: Option<&Venue>,
2767        instrument_id: Option<&InstrumentId>,
2768        strategy_id: Option<&StrategyId>,
2769        account_id: Option<&AccountId>,
2770    ) -> AHashSet<ClientOrderId> {
2771        let query =
2772            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2773
2774        match query {
2775            Some(query) => self
2776                .index
2777                .orders_closed
2778                .intersection(&query)
2779                .copied()
2780                .collect(),
2781            None => self.index.orders_closed.clone(),
2782        }
2783    }
2784
2785    /// Returns the `ClientOrderId`s of all locally active orders.
2786    ///
2787    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
2788    /// (a superset of emulated orders).
2789    #[must_use]
2790    pub fn client_order_ids_active_local(
2791        &self,
2792        venue: Option<&Venue>,
2793        instrument_id: Option<&InstrumentId>,
2794        strategy_id: Option<&StrategyId>,
2795        account_id: Option<&AccountId>,
2796    ) -> AHashSet<ClientOrderId> {
2797        let query =
2798            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2799
2800        match query {
2801            Some(query) => self
2802                .index
2803                .orders_active_local
2804                .intersection(&query)
2805                .copied()
2806                .collect(),
2807            None => self.index.orders_active_local.clone(),
2808        }
2809    }
2810
2811    /// Returns the `ClientOrderId`s of all emulated orders.
2812    #[must_use]
2813    pub fn client_order_ids_emulated(
2814        &self,
2815        venue: Option<&Venue>,
2816        instrument_id: Option<&InstrumentId>,
2817        strategy_id: Option<&StrategyId>,
2818        account_id: Option<&AccountId>,
2819    ) -> AHashSet<ClientOrderId> {
2820        let query =
2821            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2822
2823        match query {
2824            Some(query) => self
2825                .index
2826                .orders_emulated
2827                .intersection(&query)
2828                .copied()
2829                .collect(),
2830            None => self.index.orders_emulated.clone(),
2831        }
2832    }
2833
2834    /// Returns the `ClientOrderId`s of all in-flight orders.
2835    #[must_use]
2836    pub fn client_order_ids_inflight(
2837        &self,
2838        venue: Option<&Venue>,
2839        instrument_id: Option<&InstrumentId>,
2840        strategy_id: Option<&StrategyId>,
2841        account_id: Option<&AccountId>,
2842    ) -> AHashSet<ClientOrderId> {
2843        let query =
2844            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2845
2846        match query {
2847            Some(query) => self
2848                .index
2849                .orders_inflight
2850                .intersection(&query)
2851                .copied()
2852                .collect(),
2853            None => self.index.orders_inflight.clone(),
2854        }
2855    }
2856
2857    /// Returns `PositionId`s of all positions.
2858    #[must_use]
2859    pub fn position_ids(
2860        &self,
2861        venue: Option<&Venue>,
2862        instrument_id: Option<&InstrumentId>,
2863        strategy_id: Option<&StrategyId>,
2864        account_id: Option<&AccountId>,
2865    ) -> AHashSet<PositionId> {
2866        let query =
2867            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2868
2869        match query {
2870            Some(query) => self.index.positions.intersection(&query).copied().collect(),
2871            None => self.index.positions.clone(),
2872        }
2873    }
2874
2875    /// Returns the `PositionId`s of all open positions.
2876    #[must_use]
2877    pub fn position_open_ids(
2878        &self,
2879        venue: Option<&Venue>,
2880        instrument_id: Option<&InstrumentId>,
2881        strategy_id: Option<&StrategyId>,
2882        account_id: Option<&AccountId>,
2883    ) -> AHashSet<PositionId> {
2884        let query =
2885            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2886
2887        match query {
2888            Some(query) => self
2889                .index
2890                .positions_open
2891                .intersection(&query)
2892                .copied()
2893                .collect(),
2894            None => self.index.positions_open.clone(),
2895        }
2896    }
2897
2898    /// Returns the `PositionId`s of all closed positions.
2899    #[must_use]
2900    pub fn position_closed_ids(
2901        &self,
2902        venue: Option<&Venue>,
2903        instrument_id: Option<&InstrumentId>,
2904        strategy_id: Option<&StrategyId>,
2905        account_id: Option<&AccountId>,
2906    ) -> AHashSet<PositionId> {
2907        let query =
2908            self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2909
2910        match query {
2911            Some(query) => self
2912                .index
2913                .positions_closed
2914                .intersection(&query)
2915                .copied()
2916                .collect(),
2917            None => self.index.positions_closed.clone(),
2918        }
2919    }
2920
2921    /// Returns the `ComponentId`s of all actors.
2922    #[must_use]
2923    pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2924        self.index.actors.clone()
2925    }
2926
2927    /// Returns the `StrategyId`s of all strategies.
2928    #[must_use]
2929    pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2930        self.index.strategies.clone()
2931    }
2932
2933    /// Returns the `ExecAlgorithmId`s of all execution algorithms.
2934    #[must_use]
2935    pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2936        self.index.exec_algorithms.clone()
2937    }
2938
2939    // -- ORDER QUERIES ---------------------------------------------------------------------------
2940
2941    /// Gets a reference to the order with the `client_order_id` (if found).
2942    #[must_use]
2943    pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2944        self.orders.get(client_order_id)
2945    }
2946
2947    /// Gets cloned orders for the given `client_order_ids`, logging an error for any missing.
2948    #[must_use]
2949    pub fn orders_for_ids(
2950        &self,
2951        client_order_ids: &[ClientOrderId],
2952        context: &dyn Display,
2953    ) -> Vec<OrderAny> {
2954        let mut orders = Vec::with_capacity(client_order_ids.len());
2955        for id in client_order_ids {
2956            match self.orders.get(id) {
2957                Some(order) => orders.push(order.clone()),
2958                None => log::error!("Order {id} not found in cache for {context}"),
2959            }
2960        }
2961        orders
2962    }
2963
2964    /// Gets a reference to the order with the `client_order_id` (if found).
2965    #[must_use]
2966    pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2967        self.orders.get_mut(client_order_id)
2968    }
2969
2970    /// Gets a reference to the client order ID for the `venue_order_id` (if found).
2971    #[must_use]
2972    pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2973        self.index.venue_order_ids.get(venue_order_id)
2974    }
2975
2976    /// Gets a reference to the venue order ID for the `client_order_id` (if found).
2977    #[must_use]
2978    pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2979        self.index.client_order_ids.get(client_order_id)
2980    }
2981
2982    /// Gets a reference to the client ID indexed for then `client_order_id` (if found).
2983    #[must_use]
2984    pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2985        self.index.order_client.get(client_order_id)
2986    }
2987
2988    /// Returns references to all orders matching the optional filter parameters.
2989    #[must_use]
2990    pub fn orders(
2991        &self,
2992        venue: Option<&Venue>,
2993        instrument_id: Option<&InstrumentId>,
2994        strategy_id: Option<&StrategyId>,
2995        account_id: Option<&AccountId>,
2996        side: Option<OrderSide>,
2997    ) -> Vec<&OrderAny> {
2998        let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2999        self.get_orders_for_ids(&client_order_ids, side)
3000    }
3001
3002    /// Returns references to all open orders matching the optional filter parameters.
3003    #[must_use]
3004    pub fn orders_open(
3005        &self,
3006        venue: Option<&Venue>,
3007        instrument_id: Option<&InstrumentId>,
3008        strategy_id: Option<&StrategyId>,
3009        account_id: Option<&AccountId>,
3010        side: Option<OrderSide>,
3011    ) -> Vec<&OrderAny> {
3012        let client_order_ids =
3013            self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
3014        self.get_orders_for_ids(&client_order_ids, side)
3015    }
3016
3017    /// Returns references to all closed orders matching the optional filter parameters.
3018    #[must_use]
3019    pub fn orders_closed(
3020        &self,
3021        venue: Option<&Venue>,
3022        instrument_id: Option<&InstrumentId>,
3023        strategy_id: Option<&StrategyId>,
3024        account_id: Option<&AccountId>,
3025        side: Option<OrderSide>,
3026    ) -> Vec<&OrderAny> {
3027        let client_order_ids =
3028            self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
3029        self.get_orders_for_ids(&client_order_ids, side)
3030    }
3031
3032    /// Returns references to all locally active orders matching the optional filter parameters.
3033    ///
3034    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
3035    /// (a superset of emulated orders).
3036    #[must_use]
3037    pub fn orders_active_local(
3038        &self,
3039        venue: Option<&Venue>,
3040        instrument_id: Option<&InstrumentId>,
3041        strategy_id: Option<&StrategyId>,
3042        account_id: Option<&AccountId>,
3043        side: Option<OrderSide>,
3044    ) -> Vec<&OrderAny> {
3045        let client_order_ids =
3046            self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
3047        self.get_orders_for_ids(&client_order_ids, side)
3048    }
3049
3050    /// Returns references to all emulated orders matching the optional filter parameters.
3051    #[must_use]
3052    pub fn orders_emulated(
3053        &self,
3054        venue: Option<&Venue>,
3055        instrument_id: Option<&InstrumentId>,
3056        strategy_id: Option<&StrategyId>,
3057        account_id: Option<&AccountId>,
3058        side: Option<OrderSide>,
3059    ) -> Vec<&OrderAny> {
3060        let client_order_ids =
3061            self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
3062        self.get_orders_for_ids(&client_order_ids, side)
3063    }
3064
3065    /// Returns references to all in-flight orders matching the optional filter parameters.
3066    #[must_use]
3067    pub fn orders_inflight(
3068        &self,
3069        venue: Option<&Venue>,
3070        instrument_id: Option<&InstrumentId>,
3071        strategy_id: Option<&StrategyId>,
3072        account_id: Option<&AccountId>,
3073        side: Option<OrderSide>,
3074    ) -> Vec<&OrderAny> {
3075        let client_order_ids =
3076            self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
3077        self.get_orders_for_ids(&client_order_ids, side)
3078    }
3079
3080    /// Returns references to all orders for the `position_id`.
3081    #[must_use]
3082    pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
3083        let client_order_ids = self.index.position_orders.get(position_id);
3084        match client_order_ids {
3085            Some(client_order_ids) => {
3086                self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
3087            }
3088            None => Vec::new(),
3089        }
3090    }
3091
3092    /// Returns whether an order with the `client_order_id` exists.
3093    #[must_use]
3094    pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
3095        self.index.orders.contains(client_order_id)
3096    }
3097
3098    /// Returns whether an order with the `client_order_id` is open.
3099    #[must_use]
3100    pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
3101        self.index.orders_open.contains(client_order_id)
3102    }
3103
3104    /// Returns whether an order with the `client_order_id` is closed.
3105    #[must_use]
3106    pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
3107        self.index.orders_closed.contains(client_order_id)
3108    }
3109
3110    /// Returns whether an order with the `client_order_id` is locally active.
3111    ///
3112    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
3113    /// (a superset of emulated orders).
3114    #[must_use]
3115    pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
3116        self.index.orders_active_local.contains(client_order_id)
3117    }
3118
3119    /// Returns whether an order with the `client_order_id` is emulated.
3120    #[must_use]
3121    pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
3122        self.index.orders_emulated.contains(client_order_id)
3123    }
3124
3125    /// Returns whether an order with the `client_order_id` is in-flight.
3126    #[must_use]
3127    pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
3128        self.index.orders_inflight.contains(client_order_id)
3129    }
3130
3131    /// Returns whether an order with the `client_order_id` is `PENDING_CANCEL` locally.
3132    #[must_use]
3133    pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
3134        self.index.orders_pending_cancel.contains(client_order_id)
3135    }
3136
3137    /// Returns the count of all open orders.
3138    #[must_use]
3139    pub fn orders_open_count(
3140        &self,
3141        venue: Option<&Venue>,
3142        instrument_id: Option<&InstrumentId>,
3143        strategy_id: Option<&StrategyId>,
3144        account_id: Option<&AccountId>,
3145        side: Option<OrderSide>,
3146    ) -> usize {
3147        self.orders_open(venue, instrument_id, strategy_id, account_id, side)
3148            .len()
3149    }
3150
3151    /// Returns the count of all closed orders.
3152    #[must_use]
3153    pub fn orders_closed_count(
3154        &self,
3155        venue: Option<&Venue>,
3156        instrument_id: Option<&InstrumentId>,
3157        strategy_id: Option<&StrategyId>,
3158        account_id: Option<&AccountId>,
3159        side: Option<OrderSide>,
3160    ) -> usize {
3161        self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
3162            .len()
3163    }
3164
3165    /// Returns the count of all locally active orders.
3166    ///
3167    /// Locally active orders are in the `INITIALIZED`, `EMULATED`, or `RELEASED` state
3168    /// (a superset of emulated orders).
3169    #[must_use]
3170    pub fn orders_active_local_count(
3171        &self,
3172        venue: Option<&Venue>,
3173        instrument_id: Option<&InstrumentId>,
3174        strategy_id: Option<&StrategyId>,
3175        account_id: Option<&AccountId>,
3176        side: Option<OrderSide>,
3177    ) -> usize {
3178        self.orders_active_local(venue, instrument_id, strategy_id, account_id, side)
3179            .len()
3180    }
3181
3182    /// Returns the count of all emulated orders.
3183    #[must_use]
3184    pub fn orders_emulated_count(
3185        &self,
3186        venue: Option<&Venue>,
3187        instrument_id: Option<&InstrumentId>,
3188        strategy_id: Option<&StrategyId>,
3189        account_id: Option<&AccountId>,
3190        side: Option<OrderSide>,
3191    ) -> usize {
3192        self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
3193            .len()
3194    }
3195
3196    /// Returns the count of all in-flight orders.
3197    #[must_use]
3198    pub fn orders_inflight_count(
3199        &self,
3200        venue: Option<&Venue>,
3201        instrument_id: Option<&InstrumentId>,
3202        strategy_id: Option<&StrategyId>,
3203        account_id: Option<&AccountId>,
3204        side: Option<OrderSide>,
3205    ) -> usize {
3206        self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
3207            .len()
3208    }
3209
3210    /// Returns the count of all orders.
3211    #[must_use]
3212    pub fn orders_total_count(
3213        &self,
3214        venue: Option<&Venue>,
3215        instrument_id: Option<&InstrumentId>,
3216        strategy_id: Option<&StrategyId>,
3217        account_id: Option<&AccountId>,
3218        side: Option<OrderSide>,
3219    ) -> usize {
3220        self.orders(venue, instrument_id, strategy_id, account_id, side)
3221            .len()
3222    }
3223
3224    /// Returns the order list for the `order_list_id`.
3225    #[must_use]
3226    pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
3227        self.order_lists.get(order_list_id)
3228    }
3229
3230    /// Returns all order lists matching the optional filter parameters.
3231    #[must_use]
3232    pub fn order_lists(
3233        &self,
3234        venue: Option<&Venue>,
3235        instrument_id: Option<&InstrumentId>,
3236        strategy_id: Option<&StrategyId>,
3237        account_id: Option<&AccountId>,
3238    ) -> Vec<&OrderList> {
3239        let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
3240
3241        if let Some(venue) = venue {
3242            order_lists.retain(|ol| &ol.instrument_id.venue == venue);
3243        }
3244
3245        if let Some(instrument_id) = instrument_id {
3246            order_lists.retain(|ol| &ol.instrument_id == instrument_id);
3247        }
3248
3249        if let Some(strategy_id) = strategy_id {
3250            order_lists.retain(|ol| &ol.strategy_id == strategy_id);
3251        }
3252
3253        if let Some(account_id) = account_id {
3254            order_lists.retain(|ol| {
3255                ol.client_order_ids.iter().any(|client_order_id| {
3256                    self.orders
3257                        .get(client_order_id)
3258                        .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
3259                })
3260            });
3261        }
3262
3263        order_lists
3264    }
3265
3266    /// Returns whether an order list with the `order_list_id` exists.
3267    #[must_use]
3268    pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
3269        self.order_lists.contains_key(order_list_id)
3270    }
3271
3272    // -- EXEC ALGORITHM QUERIES ------------------------------------------------------------------
3273
3274    /// Returns references to all orders associated with the `exec_algorithm_id` matching the
3275    /// optional filter parameters.
3276    #[must_use]
3277    pub fn orders_for_exec_algorithm(
3278        &self,
3279        exec_algorithm_id: &ExecAlgorithmId,
3280        venue: Option<&Venue>,
3281        instrument_id: Option<&InstrumentId>,
3282        strategy_id: Option<&StrategyId>,
3283        account_id: Option<&AccountId>,
3284        side: Option<OrderSide>,
3285    ) -> Vec<&OrderAny> {
3286        let query =
3287            self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
3288        let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
3289
3290        if let Some(query) = query
3291            && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
3292        {
3293            let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
3294        }
3295
3296        if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
3297            self.get_orders_for_ids(exec_algorithm_order_ids, side)
3298        } else {
3299            Vec::new()
3300        }
3301    }
3302
3303    /// Returns references to all orders with the `exec_spawn_id`.
3304    #[must_use]
3305    pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
3306        self.get_orders_for_ids(
3307            self.index
3308                .exec_spawn_orders
3309                .get(exec_spawn_id)
3310                .unwrap_or(&AHashSet::new()),
3311            None,
3312        )
3313    }
3314
3315    /// Returns the total order quantity for the `exec_spawn_id`.
3316    #[must_use]
3317    pub fn exec_spawn_total_quantity(
3318        &self,
3319        exec_spawn_id: &ClientOrderId,
3320        active_only: bool,
3321    ) -> Option<Quantity> {
3322        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3323
3324        let mut total_quantity: Option<Quantity> = None;
3325
3326        for spawn_order in exec_spawn_orders {
3327            if active_only && spawn_order.is_closed() {
3328                continue;
3329            }
3330
3331            match total_quantity.as_mut() {
3332                Some(total) => *total = *total + spawn_order.quantity(),
3333                None => total_quantity = Some(spawn_order.quantity()),
3334            }
3335        }
3336
3337        total_quantity
3338    }
3339
3340    /// Returns the total filled quantity for all orders with the `exec_spawn_id`.
3341    #[must_use]
3342    pub fn exec_spawn_total_filled_qty(
3343        &self,
3344        exec_spawn_id: &ClientOrderId,
3345        active_only: bool,
3346    ) -> Option<Quantity> {
3347        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3348
3349        let mut total_quantity: Option<Quantity> = None;
3350
3351        for spawn_order in exec_spawn_orders {
3352            if active_only && spawn_order.is_closed() {
3353                continue;
3354            }
3355
3356            match total_quantity.as_mut() {
3357                Some(total) => *total = *total + spawn_order.filled_qty(),
3358                None => total_quantity = Some(spawn_order.filled_qty()),
3359            }
3360        }
3361
3362        total_quantity
3363    }
3364
3365    /// Returns the total leaves quantity for all orders with the `exec_spawn_id`.
3366    #[must_use]
3367    pub fn exec_spawn_total_leaves_qty(
3368        &self,
3369        exec_spawn_id: &ClientOrderId,
3370        active_only: bool,
3371    ) -> Option<Quantity> {
3372        let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3373
3374        let mut total_quantity: Option<Quantity> = None;
3375
3376        for spawn_order in exec_spawn_orders {
3377            if active_only && spawn_order.is_closed() {
3378                continue;
3379            }
3380
3381            match total_quantity.as_mut() {
3382                Some(total) => *total = *total + spawn_order.leaves_qty(),
3383                None => total_quantity = Some(spawn_order.leaves_qty()),
3384            }
3385        }
3386
3387        total_quantity
3388    }
3389
3390    // -- POSITION QUERIES ------------------------------------------------------------------------
3391
3392    /// Returns a reference to the position with the `position_id` (if found).
3393    #[must_use]
3394    pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3395        self.positions.get(position_id)
3396    }
3397
3398    /// Returns a reference to the position for the `client_order_id` (if found).
3399    #[must_use]
3400    pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3401        self.index
3402            .order_position
3403            .get(client_order_id)
3404            .and_then(|position_id| self.positions.get(position_id))
3405    }
3406
3407    /// Returns a reference to the position ID for the `client_order_id` (if found).
3408    #[must_use]
3409    pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3410        self.index.order_position.get(client_order_id)
3411    }
3412
3413    /// Returns a reference to all positions matching the optional filter parameters.
3414    #[must_use]
3415    pub fn positions(
3416        &self,
3417        venue: Option<&Venue>,
3418        instrument_id: Option<&InstrumentId>,
3419        strategy_id: Option<&StrategyId>,
3420        account_id: Option<&AccountId>,
3421        side: Option<PositionSide>,
3422    ) -> Vec<&Position> {
3423        let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3424        self.get_positions_for_ids(&position_ids, side)
3425    }
3426
3427    /// Returns a reference to all open positions matching the optional filter parameters.
3428    #[must_use]
3429    pub fn positions_open(
3430        &self,
3431        venue: Option<&Venue>,
3432        instrument_id: Option<&InstrumentId>,
3433        strategy_id: Option<&StrategyId>,
3434        account_id: Option<&AccountId>,
3435        side: Option<PositionSide>,
3436    ) -> Vec<&Position> {
3437        let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3438        self.get_positions_for_ids(&position_ids, side)
3439    }
3440
3441    /// Returns a reference to all closed positions matching the optional filter parameters.
3442    #[must_use]
3443    pub fn positions_closed(
3444        &self,
3445        venue: Option<&Venue>,
3446        instrument_id: Option<&InstrumentId>,
3447        strategy_id: Option<&StrategyId>,
3448        account_id: Option<&AccountId>,
3449        side: Option<PositionSide>,
3450    ) -> Vec<&Position> {
3451        let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3452        self.get_positions_for_ids(&position_ids, side)
3453    }
3454
3455    /// Returns whether a position with the `position_id` exists.
3456    #[must_use]
3457    pub fn position_exists(&self, position_id: &PositionId) -> bool {
3458        self.index.positions.contains(position_id)
3459    }
3460
3461    /// Returns whether a position with the `position_id` is open.
3462    #[must_use]
3463    pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3464        self.index.positions_open.contains(position_id)
3465    }
3466
3467    /// Returns whether a position with the `position_id` is closed.
3468    #[must_use]
3469    pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3470        self.index.positions_closed.contains(position_id)
3471    }
3472
3473    /// Returns the count of all open positions.
3474    #[must_use]
3475    pub fn positions_open_count(
3476        &self,
3477        venue: Option<&Venue>,
3478        instrument_id: Option<&InstrumentId>,
3479        strategy_id: Option<&StrategyId>,
3480        account_id: Option<&AccountId>,
3481        side: Option<PositionSide>,
3482    ) -> usize {
3483        self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3484            .len()
3485    }
3486
3487    /// Returns the count of all closed positions.
3488    #[must_use]
3489    pub fn positions_closed_count(
3490        &self,
3491        venue: Option<&Venue>,
3492        instrument_id: Option<&InstrumentId>,
3493        strategy_id: Option<&StrategyId>,
3494        account_id: Option<&AccountId>,
3495        side: Option<PositionSide>,
3496    ) -> usize {
3497        self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3498            .len()
3499    }
3500
3501    /// Returns the count of all positions.
3502    #[must_use]
3503    pub fn positions_total_count(
3504        &self,
3505        venue: Option<&Venue>,
3506        instrument_id: Option<&InstrumentId>,
3507        strategy_id: Option<&StrategyId>,
3508        account_id: Option<&AccountId>,
3509        side: Option<PositionSide>,
3510    ) -> usize {
3511        self.positions(venue, instrument_id, strategy_id, account_id, side)
3512            .len()
3513    }
3514
3515    // -- STRATEGY QUERIES ------------------------------------------------------------------------
3516
3517    /// Gets a reference to the strategy ID for the `client_order_id` (if found).
3518    #[must_use]
3519    pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3520        self.index.order_strategy.get(client_order_id)
3521    }
3522
3523    /// Gets a reference to the strategy ID for the `position_id` (if found).
3524    #[must_use]
3525    pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3526        self.index.position_strategy.get(position_id)
3527    }
3528
3529    // -- GENERAL ---------------------------------------------------------------------------------
3530
3531    /// Gets a reference to the general value for the `key` (if found).
3532    ///
3533    /// # Errors
3534    ///
3535    /// Returns an error if the `key` is invalid.
3536    pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3537        check_valid_string_ascii(key, stringify!(key))?;
3538
3539        Ok(self.general.get(key))
3540    }
3541
3542    // -- DATA QUERIES ----------------------------------------------------------------------------
3543
3544    /// Returns the price for the `instrument_id` and `price_type` (if found).
3545    #[must_use]
3546    pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3547        match price_type {
3548            PriceType::Bid => self
3549                .quotes
3550                .get(instrument_id)
3551                .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3552            PriceType::Ask => self
3553                .quotes
3554                .get(instrument_id)
3555                .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3556            PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3557                quotes.front().map(|quote| {
3558                    Price::new(
3559                        f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3560                        quote.bid_price.precision + 1,
3561                    )
3562                })
3563            }),
3564            PriceType::Last => self
3565                .trades
3566                .get(instrument_id)
3567                .and_then(|trades| trades.front().map(|trade| trade.price)),
3568            PriceType::Mark => self
3569                .mark_prices
3570                .get(instrument_id)
3571                .and_then(|marks| marks.front().map(|mark| mark.value)),
3572        }
3573    }
3574
3575    /// Gets all quotes for the `instrument_id`.
3576    #[must_use]
3577    pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3578        self.quotes
3579            .get(instrument_id)
3580            .map(|quotes| quotes.iter().copied().collect())
3581    }
3582
3583    /// Gets all trades for the `instrument_id`.
3584    #[must_use]
3585    pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3586        self.trades
3587            .get(instrument_id)
3588            .map(|trades| trades.iter().copied().collect())
3589    }
3590
3591    /// Gets all mark price updates for the `instrument_id`.
3592    #[must_use]
3593    pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3594        self.mark_prices
3595            .get(instrument_id)
3596            .map(|mark_prices| mark_prices.iter().copied().collect())
3597    }
3598
3599    /// Gets all index price updates for the `instrument_id`.
3600    #[must_use]
3601    pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3602        self.index_prices
3603            .get(instrument_id)
3604            .map(|index_prices| index_prices.iter().copied().collect())
3605    }
3606
3607    /// Gets all funding rate updates for the `instrument_id`.
3608    #[must_use]
3609    pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3610        self.funding_rates
3611            .get(instrument_id)
3612            .map(|funding_rates| funding_rates.iter().copied().collect())
3613    }
3614
3615    /// Gets all instrument status updates for the `instrument_id`.
3616    #[must_use]
3617    pub fn instrument_statuses(
3618        &self,
3619        instrument_id: &InstrumentId,
3620    ) -> Option<Vec<InstrumentStatus>> {
3621        self.instrument_statuses
3622            .get(instrument_id)
3623            .map(|statuses| statuses.iter().copied().collect())
3624    }
3625
3626    /// Gets all bars for the `bar_type`.
3627    #[must_use]
3628    pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3629        self.bars
3630            .get(bar_type)
3631            .map(|bars| bars.iter().copied().collect())
3632    }
3633
3634    /// Gets a reference to the order book for the `instrument_id`.
3635    #[must_use]
3636    pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3637        self.books.get(instrument_id)
3638    }
3639
3640    /// Gets a reference to the order book for the `instrument_id`.
3641    #[must_use]
3642    pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3643        self.books.get_mut(instrument_id)
3644    }
3645
3646    /// Gets a reference to the own order book for the `instrument_id`.
3647    #[must_use]
3648    pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3649        self.own_books.get(instrument_id)
3650    }
3651
3652    /// Gets a reference to the own order book for the `instrument_id`.
3653    #[must_use]
3654    pub fn own_order_book_mut(
3655        &mut self,
3656        instrument_id: &InstrumentId,
3657    ) -> Option<&mut OwnOrderBook> {
3658        self.own_books.get_mut(instrument_id)
3659    }
3660
3661    /// Gets a reference to the latest quote for the `instrument_id`.
3662    #[must_use]
3663    pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3664        self.quotes
3665            .get(instrument_id)
3666            .and_then(|quotes| quotes.front())
3667    }
3668
3669    /// Gets a reference to the quote at `index` for the `instrument_id`.
3670    ///
3671    /// Index 0 is the most recent.
3672    #[must_use]
3673    pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
3674        self.quotes
3675            .get(instrument_id)
3676            .and_then(|quotes| quotes.get(index))
3677    }
3678
3679    /// Gets a reference to the latest trade for the `instrument_id`.
3680    #[must_use]
3681    pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3682        self.trades
3683            .get(instrument_id)
3684            .and_then(|trades| trades.front())
3685    }
3686
3687    /// Gets a reference to the trade at `index` for the `instrument_id`.
3688    ///
3689    /// Index 0 is the most recent.
3690    #[must_use]
3691    pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
3692        self.trades
3693            .get(instrument_id)
3694            .and_then(|trades| trades.get(index))
3695    }
3696
3697    /// Gets a reference to the latest mark price update for the `instrument_id`.
3698    #[must_use]
3699    pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3700        self.mark_prices
3701            .get(instrument_id)
3702            .and_then(|mark_prices| mark_prices.front())
3703    }
3704
3705    /// Gets a reference to the latest index price update for the `instrument_id`.
3706    #[must_use]
3707    pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3708        self.index_prices
3709            .get(instrument_id)
3710            .and_then(|index_prices| index_prices.front())
3711    }
3712
3713    /// Gets a reference to the latest funding rate update for the `instrument_id`.
3714    #[must_use]
3715    pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3716        self.funding_rates
3717            .get(instrument_id)
3718            .and_then(|funding_rates| funding_rates.front())
3719    }
3720
3721    /// Gets a reference to the latest instrument status update for the `instrument_id`.
3722    #[must_use]
3723    pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
3724        self.instrument_statuses
3725            .get(instrument_id)
3726            .and_then(|statuses| statuses.front())
3727    }
3728
3729    /// Gets a reference to the latest bar for the `bar_type`.
3730    #[must_use]
3731    pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3732        self.bars.get(bar_type).and_then(|bars| bars.front())
3733    }
3734
3735    /// Gets a reference to the bar at `index` for the `bar_type`.
3736    ///
3737    /// Index 0 is the most recent.
3738    #[must_use]
3739    pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
3740        self.bars.get(bar_type).and_then(|bars| bars.get(index))
3741    }
3742
3743    /// Gets the order book update count for the `instrument_id`.
3744    #[must_use]
3745    pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3746        self.books
3747            .get(instrument_id)
3748            .map_or(0, |book| book.update_count) as usize
3749    }
3750
3751    /// Gets the quote tick count for the `instrument_id`.
3752    #[must_use]
3753    pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3754        self.quotes
3755            .get(instrument_id)
3756            .map_or(0, std::collections::VecDeque::len)
3757    }
3758
3759    /// Gets the trade tick count for the `instrument_id`.
3760    #[must_use]
3761    pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3762        self.trades
3763            .get(instrument_id)
3764            .map_or(0, std::collections::VecDeque::len)
3765    }
3766
3767    /// Gets the bar count for the `instrument_id`.
3768    #[must_use]
3769    pub fn bar_count(&self, bar_type: &BarType) -> usize {
3770        self.bars
3771            .get(bar_type)
3772            .map_or(0, std::collections::VecDeque::len)
3773    }
3774
3775    /// Returns whether the cache contains an order book for the `instrument_id`.
3776    #[must_use]
3777    pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3778        self.books.contains_key(instrument_id)
3779    }
3780
3781    /// Returns whether the cache contains quotes for the `instrument_id`.
3782    #[must_use]
3783    pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3784        self.quote_count(instrument_id) > 0
3785    }
3786
3787    /// Returns whether the cache contains trades for the `instrument_id`.
3788    #[must_use]
3789    pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3790        self.trade_count(instrument_id) > 0
3791    }
3792
3793    /// Returns whether the cache contains bars for the `bar_type`.
3794    #[must_use]
3795    pub fn has_bars(&self, bar_type: &BarType) -> bool {
3796        self.bar_count(bar_type) > 0
3797    }
3798
3799    #[must_use]
3800    pub fn get_xrate(
3801        &self,
3802        venue: Venue,
3803        from_currency: Currency,
3804        to_currency: Currency,
3805        price_type: PriceType,
3806    ) -> Option<f64> {
3807        if from_currency == to_currency {
3808            // When the source and target currencies are identical,
3809            // no conversion is needed; return an exchange rate of 1.0.
3810            return Some(1.0);
3811        }
3812
3813        let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3814
3815        match get_exchange_rate(
3816            from_currency.code,
3817            to_currency.code,
3818            price_type,
3819            bid_quote,
3820            ask_quote,
3821        ) {
3822            Ok(rate) => rate,
3823            Err(e) => {
3824                log::error!("Failed to calculate xrate: {e}");
3825                None
3826            }
3827        }
3828    }
3829
3830    fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3831        let mut bid_quotes = AHashMap::new();
3832        let mut ask_quotes = AHashMap::new();
3833
3834        for instrument_id in self.instruments.keys() {
3835            if instrument_id.venue != *venue {
3836                continue;
3837            }
3838
3839            let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3840                if let Some(tick) = ticks.front() {
3841                    (tick.bid_price, tick.ask_price)
3842                } else {
3843                    continue; // Empty ticks vector
3844                }
3845            } else {
3846                let bid_bar = self
3847                    .bars
3848                    .iter()
3849                    .find(|(k, _)| {
3850                        k.instrument_id() == *instrument_id
3851                            && matches!(k.spec().price_type, PriceType::Bid)
3852                    })
3853                    .map(|(_, v)| v);
3854
3855                let ask_bar = self
3856                    .bars
3857                    .iter()
3858                    .find(|(k, _)| {
3859                        k.instrument_id() == *instrument_id
3860                            && matches!(k.spec().price_type, PriceType::Ask)
3861                    })
3862                    .map(|(_, v)| v);
3863
3864                match (bid_bar, ask_bar) {
3865                    (Some(bid), Some(ask)) => {
3866                        match (bid.front(), ask.front()) {
3867                            (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3868                            _ => {
3869                                // Empty bar VecDeques
3870                                continue;
3871                            }
3872                        }
3873                    }
3874                    _ => continue,
3875                }
3876            };
3877
3878            bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3879            ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3880        }
3881
3882        (bid_quotes, ask_quotes)
3883    }
3884
3885    /// Returns the mark exchange rate for the given currency pair, or `None` if not set.
3886    #[must_use]
3887    pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3888        self.mark_xrates.get(&(from_currency, to_currency)).copied()
3889    }
3890
3891    /// Sets the mark exchange rate for the given currency pair and automatically sets the inverse rate.
3892    ///
3893    /// # Panics
3894    ///
3895    /// Panics if `xrate` is not positive.
3896    pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3897        assert!(xrate > 0.0, "xrate was zero");
3898        self.mark_xrates.insert((from_currency, to_currency), xrate);
3899        self.mark_xrates
3900            .insert((to_currency, from_currency), 1.0 / xrate);
3901    }
3902
3903    /// Clears the mark exchange rate for the given currency pair.
3904    pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3905        let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3906    }
3907
3908    /// Clears all mark exchange rates.
3909    pub fn clear_mark_xrates(&mut self) {
3910        self.mark_xrates.clear();
3911    }
3912
3913    // -- INSTRUMENT QUERIES ----------------------------------------------------------------------
3914
3915    /// Returns a reference to the instrument for the `instrument_id` (if found).
3916    #[must_use]
3917    pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3918        self.instruments.get(instrument_id)
3919    }
3920
3921    /// Returns references to all instrument IDs for the `venue`.
3922    #[must_use]
3923    pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3924        match venue {
3925            Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3926            None => self.instruments.keys().collect(),
3927        }
3928    }
3929
3930    /// Returns references to all instruments for the `venue`.
3931    #[must_use]
3932    pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3933        self.instruments
3934            .values()
3935            .filter(|i| &i.id().venue == venue)
3936            .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3937            .collect()
3938    }
3939
3940    /// Returns references to all bar types contained in the cache.
3941    #[must_use]
3942    pub fn bar_types(
3943        &self,
3944        instrument_id: Option<&InstrumentId>,
3945        price_type: Option<&PriceType>,
3946        aggregation_source: AggregationSource,
3947    ) -> Vec<&BarType> {
3948        let mut bar_types = self
3949            .bars
3950            .keys()
3951            .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3952            .collect::<Vec<&BarType>>();
3953
3954        if let Some(instrument_id) = instrument_id {
3955            bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3956        }
3957
3958        if let Some(price_type) = price_type {
3959            bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3960        }
3961
3962        bar_types
3963    }
3964
3965    // -- SYNTHETIC QUERIES -----------------------------------------------------------------------
3966
3967    /// Returns a reference to the synthetic instrument for the `instrument_id` (if found).
3968    #[must_use]
3969    pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3970        self.synthetics.get(instrument_id)
3971    }
3972
3973    /// Returns references to instrument IDs for all synthetic instruments contained in the cache.
3974    #[must_use]
3975    pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3976        self.synthetics.keys().collect()
3977    }
3978
3979    /// Returns references to all synthetic instruments contained in the cache.
3980    #[must_use]
3981    pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3982        self.synthetics.values().collect()
3983    }
3984
3985    // -- ACCOUNT QUERIES -----------------------------------------------------------------------
3986
3987    /// Returns a reference to the account for the `account_id` (if found).
3988    #[must_use]
3989    pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3990        self.accounts.get(account_id)
3991    }
3992
3993    /// Returns a reference to the account for the `venue` (if found).
3994    #[must_use]
3995    pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3996        self.index
3997            .venue_account
3998            .get(venue)
3999            .and_then(|account_id| self.accounts.get(account_id))
4000    }
4001
4002    /// Returns a reference to the account ID for the `venue` (if found).
4003    #[must_use]
4004    pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
4005        self.index.venue_account.get(venue)
4006    }
4007
4008    /// Returns references to all accounts for the `account_id`.
4009    #[must_use]
4010    pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
4011        self.accounts
4012            .values()
4013            .filter(|account| &account.id() == account_id)
4014            .collect()
4015    }
4016
4017    /// Updates the own order book with an order.
4018    ///
4019    /// This method adds, updates, or removes an order from the own order book
4020    /// based on the order's current state.
4021    ///
4022    /// Orders without prices (MARKET, etc.) are skipped as they cannot be
4023    /// represented in own books.
4024    pub fn update_own_order_book(&mut self, order: &OrderAny) {
4025        if !order.has_price() {
4026            return;
4027        }
4028
4029        let instrument_id = order.instrument_id();
4030
4031        let own_book = self
4032            .own_books
4033            .entry(instrument_id)
4034            .or_insert_with(|| OwnOrderBook::new(instrument_id));
4035
4036        let own_book_order = order.to_own_book_order();
4037
4038        if order.is_closed() {
4039            if let Err(e) = own_book.delete(own_book_order) {
4040                log::debug!(
4041                    "Failed to delete order {} from own book: {e}",
4042                    order.client_order_id(),
4043                );
4044            } else {
4045                log::debug!("Deleted order {} from own book", order.client_order_id());
4046            }
4047        } else {
4048            // Add or update the order in the own book
4049            if let Err(e) = own_book.update(own_book_order) {
4050                log::debug!(
4051                    "Failed to update order {} in own book: {e}; inserting instead",
4052                    order.client_order_id(),
4053                );
4054                own_book.add(own_book_order);
4055            }
4056            log::debug!("Updated order {} in own book", order.client_order_id());
4057        }
4058    }
4059
4060    /// Force removal of an order from own order books and clean up all indexes.
4061    ///
4062    /// This method is used when order event application fails and we need to ensure
4063    /// terminal orders are properly cleaned up from own books and all relevant indexes.
4064    /// Replicates the index cleanup that update_order performs for closed orders.
4065    pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
4066        let order = match self.orders.get(client_order_id) {
4067            Some(order) => order,
4068            None => return,
4069        };
4070
4071        self.index.orders_open.remove(client_order_id);
4072        self.index.orders_pending_cancel.remove(client_order_id);
4073        self.index.orders_inflight.remove(client_order_id);
4074        self.index.orders_emulated.remove(client_order_id);
4075        self.index.orders_active_local.remove(client_order_id);
4076
4077        if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
4078            && order.has_price()
4079        {
4080            let own_book_order = order.to_own_book_order();
4081            if let Err(e) = own_book.delete(own_book_order) {
4082                log::debug!("Could not force delete {client_order_id} from own book: {e}");
4083            } else {
4084                log::debug!("Force deleted {client_order_id} from own book");
4085            }
4086        }
4087
4088        self.index.orders_closed.insert(*client_order_id);
4089    }
4090
4091    /// Audit all own order books against open and inflight order indexes.
4092    ///
4093    /// Ensures closed orders are removed from own order books. This includes both
4094    /// orders tracked in `orders_open` (ACCEPTED, TRIGGERED, PENDING_*, PARTIALLY_FILLED)
4095    /// and `orders_inflight` (INITIALIZED, SUBMITTED) to prevent false positives
4096    /// during venue latency windows.
4097    pub fn audit_own_order_books(&mut self) {
4098        log::debug!("Starting own books audit");
4099        let start = std::time::Instant::now();
4100
4101        // Build union of open and inflight orders for audit,
4102        // this prevents false positives for SUBMITTED orders during venue latency.
4103        let valid_order_ids: AHashSet<ClientOrderId> = self
4104            .index
4105            .orders_open
4106            .union(&self.index.orders_inflight)
4107            .copied()
4108            .collect();
4109
4110        for own_book in self.own_books.values_mut() {
4111            own_book.audit_open_orders(&valid_order_ids);
4112        }
4113
4114        log::debug!("Completed own books audit in {:?}", start.elapsed());
4115    }
4116}