1pub mod config;
21pub mod database;
22pub mod fifo;
23pub mod quote;
24
25mod index;
26
27#[cfg(test)]
28mod tests;
29
30use std::{
31 collections::VecDeque,
32 fmt::{Debug, Display},
33 time::{SystemTime, UNIX_EPOCH},
34};
35
36use ahash::{AHashMap, AHashSet};
37use bytes::Bytes;
38pub use config::CacheConfig; use database::{CacheDatabaseAdapter, CacheMap};
40use index::CacheIndex;
41use nautilus_core::{
42 UUID4, UnixNanos,
43 correctness::{
44 check_key_not_in_map, check_predicate_false, check_slice_not_empty,
45 check_valid_string_ascii,
46 },
47 datetime::secs_to_nanos_unchecked,
48};
49use nautilus_model::{
50 accounts::{Account, AccountAny},
51 data::{
52 Bar, BarType, FundingRateUpdate, GreeksData, IndexPriceUpdate, InstrumentStatus,
53 MarkPriceUpdate, QuoteTick, TradeTick, YieldCurveData, option_chain::OptionGreeks,
54 },
55 enums::{
56 AggregationSource, ContingencyType, OmsType, OrderSide, PositionSide, PriceType,
57 TriggerType,
58 },
59 identifiers::{
60 AccountId, ClientId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
61 OrderListId, PositionId, StrategyId, Venue, VenueOrderId,
62 },
63 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
64 orderbook::{
65 OrderBook,
66 own::{OwnOrderBook, should_handle_own_book_order},
67 },
68 orders::{Order, OrderAny, OrderList},
69 position::Position,
70 types::{Currency, Money, Price, Quantity},
71};
72use ustr::Ustr;
73
74use crate::xrate::get_exchange_rate;
75
76#[cfg_attr(
78 feature = "python",
79 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", unsendable)
80)]
81pub struct Cache {
82 config: CacheConfig,
83 index: CacheIndex,
84 database: Option<Box<dyn CacheDatabaseAdapter>>,
85 general: AHashMap<String, Bytes>,
86 currencies: AHashMap<Ustr, Currency>,
87 instruments: AHashMap<InstrumentId, InstrumentAny>,
88 synthetics: AHashMap<InstrumentId, SyntheticInstrument>,
89 books: AHashMap<InstrumentId, OrderBook>,
90 own_books: AHashMap<InstrumentId, OwnOrderBook>,
91 quotes: AHashMap<InstrumentId, VecDeque<QuoteTick>>,
92 trades: AHashMap<InstrumentId, VecDeque<TradeTick>>,
93 mark_xrates: AHashMap<(Currency, Currency), f64>,
94 mark_prices: AHashMap<InstrumentId, VecDeque<MarkPriceUpdate>>,
95 index_prices: AHashMap<InstrumentId, VecDeque<IndexPriceUpdate>>,
96 funding_rates: AHashMap<InstrumentId, VecDeque<FundingRateUpdate>>,
97 instrument_statuses: AHashMap<InstrumentId, VecDeque<InstrumentStatus>>,
98 bars: AHashMap<BarType, VecDeque<Bar>>,
99 greeks: AHashMap<InstrumentId, GreeksData>,
100 option_greeks: AHashMap<InstrumentId, OptionGreeks>,
101 yield_curves: AHashMap<String, YieldCurveData>,
102 accounts: AHashMap<AccountId, AccountAny>,
103 orders: AHashMap<ClientOrderId, OrderAny>,
104 order_lists: AHashMap<OrderListId, OrderList>,
105 positions: AHashMap<PositionId, Position>,
106 position_snapshots: AHashMap<PositionId, Vec<Bytes>>,
107 #[cfg(feature = "defi")]
108 pub(crate) defi: crate::defi::cache::DefiCache,
109}
110
111impl Debug for Cache {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct(stringify!(Cache))
114 .field("config", &self.config)
115 .field("index", &self.index)
116 .field("general", &self.general)
117 .field("currencies", &self.currencies)
118 .field("instruments", &self.instruments)
119 .field("synthetics", &self.synthetics)
120 .field("books", &self.books)
121 .field("own_books", &self.own_books)
122 .field("quotes", &self.quotes)
123 .field("trades", &self.trades)
124 .field("mark_xrates", &self.mark_xrates)
125 .field("mark_prices", &self.mark_prices)
126 .field("index_prices", &self.index_prices)
127 .field("funding_rates", &self.funding_rates)
128 .field("instrument_statuses", &self.instrument_statuses)
129 .field("bars", &self.bars)
130 .field("greeks", &self.greeks)
131 .field("option_greeks", &self.option_greeks)
132 .field("yield_curves", &self.yield_curves)
133 .field("accounts", &self.accounts)
134 .field("orders", &self.orders)
135 .field("order_lists", &self.order_lists)
136 .field("positions", &self.positions)
137 .field("position_snapshots", &self.position_snapshots)
138 .finish()
139 }
140}
141
142impl Default for Cache {
143 fn default() -> Self {
145 Self::new(Some(CacheConfig::default()), None)
146 }
147}
148
149impl Cache {
150 #[must_use]
152 pub fn new(
156 config: Option<CacheConfig>,
157 database: Option<Box<dyn CacheDatabaseAdapter>>,
158 ) -> Self {
159 Self {
160 config: config.unwrap_or_default(),
161 index: CacheIndex::default(),
162 database,
163 general: AHashMap::new(),
164 currencies: AHashMap::new(),
165 instruments: AHashMap::new(),
166 synthetics: AHashMap::new(),
167 books: AHashMap::new(),
168 own_books: AHashMap::new(),
169 quotes: AHashMap::new(),
170 trades: AHashMap::new(),
171 mark_xrates: AHashMap::new(),
172 mark_prices: AHashMap::new(),
173 index_prices: AHashMap::new(),
174 funding_rates: AHashMap::new(),
175 instrument_statuses: AHashMap::new(),
176 bars: AHashMap::new(),
177 greeks: AHashMap::new(),
178 option_greeks: AHashMap::new(),
179 yield_curves: AHashMap::new(),
180 accounts: AHashMap::new(),
181 orders: AHashMap::new(),
182 order_lists: AHashMap::new(),
183 positions: AHashMap::new(),
184 position_snapshots: AHashMap::new(),
185 #[cfg(feature = "defi")]
186 defi: crate::defi::cache::DefiCache::default(),
187 }
188 }
189
190 #[must_use]
192 pub fn memory_address(&self) -> String {
193 format!("{:?}", std::ptr::from_ref(self))
194 }
195
196 pub fn set_database(&mut self, database: Box<dyn CacheDatabaseAdapter>) {
200 let type_name = std::any::type_name_of_val(&*database);
201 log::info!("Cache database adapter set: {type_name}");
202 self.database = Some(database);
203 }
204
205 pub fn cache_general(&mut self) -> anyhow::Result<()> {
213 self.general = match &mut self.database {
214 Some(db) => db.load()?,
215 None => AHashMap::new(),
216 };
217
218 log::info!(
219 "Cached {} general object(s) from database",
220 self.general.len()
221 );
222 Ok(())
223 }
224
225 pub async fn cache_all(&mut self) -> anyhow::Result<()> {
231 let cache_map = match &self.database {
232 Some(db) => db.load_all().await?,
233 None => CacheMap::default(),
234 };
235
236 self.currencies = cache_map.currencies;
237 self.instruments = cache_map.instruments;
238 self.synthetics = cache_map.synthetics;
239 self.accounts = cache_map.accounts;
240 self.orders = cache_map.orders;
241 self.positions = cache_map.positions;
242
243 self.assign_position_ids_to_contingencies();
244 Ok(())
245 }
246
247 pub async fn cache_currencies(&mut self) -> anyhow::Result<()> {
253 self.currencies = match &mut self.database {
254 Some(db) => db.load_currencies().await?,
255 None => AHashMap::new(),
256 };
257
258 log::info!("Cached {} currencies from database", self.general.len());
259 Ok(())
260 }
261
262 pub async fn cache_instruments(&mut self) -> anyhow::Result<()> {
268 self.instruments = match &mut self.database {
269 Some(db) => db.load_instruments().await?,
270 None => AHashMap::new(),
271 };
272
273 log::info!("Cached {} instruments from database", self.general.len());
274 Ok(())
275 }
276
277 pub async fn cache_synthetics(&mut self) -> anyhow::Result<()> {
283 self.synthetics = match &mut self.database {
284 Some(db) => db.load_synthetics().await?,
285 None => AHashMap::new(),
286 };
287
288 log::info!(
289 "Cached {} synthetic instruments from database",
290 self.general.len()
291 );
292 Ok(())
293 }
294
295 pub async fn cache_accounts(&mut self) -> anyhow::Result<()> {
301 self.accounts = match &mut self.database {
302 Some(db) => db.load_accounts().await?,
303 None => AHashMap::new(),
304 };
305
306 log::info!(
307 "Cached {} synthetic instruments from database",
308 self.general.len()
309 );
310 Ok(())
311 }
312
313 pub async fn cache_orders(&mut self) -> anyhow::Result<()> {
319 self.orders = match &mut self.database {
320 Some(db) => db.load_orders().await?,
321 None => AHashMap::new(),
322 };
323
324 log::info!("Cached {} orders from database", self.general.len());
325
326 self.assign_position_ids_to_contingencies();
327 Ok(())
328 }
329
330 pub async fn cache_positions(&mut self) -> anyhow::Result<()> {
336 self.positions = match &mut self.database {
337 Some(db) => db.load_positions().await?,
338 None => AHashMap::new(),
339 };
340
341 log::info!("Cached {} positions from database", self.general.len());
342 Ok(())
343 }
344
345 pub fn build_index(&mut self) {
347 log::debug!("Building index");
348
349 for account_id in self.accounts.keys() {
351 self.index
352 .venue_account
353 .insert(account_id.get_issuer(), *account_id);
354 }
355
356 for (client_order_id, order) in &self.orders {
358 let instrument_id = order.instrument_id();
359 let venue = instrument_id.venue;
360 let strategy_id = order.strategy_id();
361
362 self.index
364 .venue_orders
365 .entry(venue)
366 .or_default()
367 .insert(*client_order_id);
368
369 if let Some(venue_order_id) = order.venue_order_id() {
371 self.index
372 .venue_order_ids
373 .insert(venue_order_id, *client_order_id);
374 }
375
376 if let Some(position_id) = order.position_id() {
378 self.index
379 .order_position
380 .insert(*client_order_id, position_id);
381 }
382
383 self.index
385 .order_strategy
386 .insert(*client_order_id, order.strategy_id());
387
388 self.index
390 .instrument_orders
391 .entry(instrument_id)
392 .or_default()
393 .insert(*client_order_id);
394
395 self.index
397 .strategy_orders
398 .entry(strategy_id)
399 .or_default()
400 .insert(*client_order_id);
401
402 if let Some(account_id) = order.account_id() {
404 self.index
405 .account_orders
406 .entry(account_id)
407 .or_default()
408 .insert(*client_order_id);
409 }
410
411 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
413 self.index
414 .exec_algorithm_orders
415 .entry(exec_algorithm_id)
416 .or_default()
417 .insert(*client_order_id);
418 }
419
420 if let Some(exec_spawn_id) = order.exec_spawn_id() {
422 self.index
423 .exec_spawn_orders
424 .entry(exec_spawn_id)
425 .or_default()
426 .insert(*client_order_id);
427 }
428
429 self.index.orders.insert(*client_order_id);
431
432 if order.is_active_local() {
434 self.index.orders_active_local.insert(*client_order_id);
435 }
436
437 if order.is_open() {
439 self.index.orders_open.insert(*client_order_id);
440 }
441
442 if order.is_closed() {
444 self.index.orders_closed.insert(*client_order_id);
445 }
446
447 if let Some(emulation_trigger) = order.emulation_trigger()
449 && emulation_trigger != TriggerType::NoTrigger
450 && !order.is_closed()
451 {
452 self.index.orders_emulated.insert(*client_order_id);
453 }
454
455 if order.is_inflight() {
457 self.index.orders_inflight.insert(*client_order_id);
458 }
459
460 self.index.strategies.insert(strategy_id);
462
463 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
465 self.index.exec_algorithms.insert(exec_algorithm_id);
466 }
467 }
468
469 for (position_id, position) in &self.positions {
471 let instrument_id = position.instrument_id;
472 let venue = instrument_id.venue;
473 let strategy_id = position.strategy_id;
474
475 self.index
477 .venue_positions
478 .entry(venue)
479 .or_default()
480 .insert(*position_id);
481
482 self.index
484 .position_strategy
485 .insert(*position_id, position.strategy_id);
486
487 self.index
489 .position_orders
490 .entry(*position_id)
491 .or_default()
492 .extend(position.client_order_ids());
493
494 self.index
496 .instrument_positions
497 .entry(instrument_id)
498 .or_default()
499 .insert(*position_id);
500
501 self.index
503 .strategy_positions
504 .entry(strategy_id)
505 .or_default()
506 .insert(*position_id);
507
508 self.index
510 .account_positions
511 .entry(position.account_id)
512 .or_default()
513 .insert(*position_id);
514
515 self.index.positions.insert(*position_id);
517
518 if position.is_open() {
520 self.index.positions_open.insert(*position_id);
521 }
522
523 if position.is_closed() {
525 self.index.positions_closed.insert(*position_id);
526 }
527
528 self.index.strategies.insert(strategy_id);
530 }
531 }
532
533 #[must_use]
535 pub const fn has_backing(&self) -> bool {
536 self.config.database.is_some()
537 }
538
539 #[must_use]
541 pub fn calculate_unrealized_pnl(&self, position: &Position) -> Option<Money> {
542 let quote = if let Some(quote) = self.quote(&position.instrument_id) {
543 quote
544 } else {
545 log::warn!(
546 "Cannot calculate unrealized PnL for {}, no quotes for {}",
547 position.id,
548 position.instrument_id
549 );
550 return None;
551 };
552
553 let last = match position.side {
555 PositionSide::Flat | PositionSide::NoPositionSide => {
556 return Some(Money::new(0.0, position.settlement_currency));
557 }
558 PositionSide::Long => quote.bid_price,
559 PositionSide::Short => quote.ask_price,
560 };
561
562 Some(position.unrealized_pnl(last))
563 }
564
565 #[must_use]
574 pub fn check_integrity(&mut self) -> bool {
575 let mut error_count = 0;
576 let failure = "Integrity failure";
577
578 let timestamp_us = SystemTime::now()
580 .duration_since(UNIX_EPOCH)
581 .expect("Time went backwards")
582 .as_micros();
583
584 log::info!("Checking data integrity");
585
586 for account_id in self.accounts.keys() {
588 if !self
589 .index
590 .venue_account
591 .contains_key(&account_id.get_issuer())
592 {
593 log::error!(
594 "{failure} in accounts: {account_id} not found in `self.index.venue_account`",
595 );
596 error_count += 1;
597 }
598 }
599
600 for (client_order_id, order) in &self.orders {
601 if !self.index.order_strategy.contains_key(client_order_id) {
602 log::error!(
603 "{failure} in orders: {client_order_id} not found in `self.index.order_strategy`"
604 );
605 error_count += 1;
606 }
607
608 if !self.index.orders.contains(client_order_id) {
609 log::error!(
610 "{failure} in orders: {client_order_id} not found in `self.index.orders`",
611 );
612 error_count += 1;
613 }
614
615 if order.is_inflight() && !self.index.orders_inflight.contains(client_order_id) {
616 log::error!(
617 "{failure} in orders: {client_order_id} not found in `self.index.orders_inflight`",
618 );
619 error_count += 1;
620 }
621
622 if order.is_active_local() && !self.index.orders_active_local.contains(client_order_id)
623 {
624 log::error!(
625 "{failure} in orders: {client_order_id} not found in `self.index.orders_active_local`",
626 );
627 error_count += 1;
628 }
629
630 if order.is_open() && !self.index.orders_open.contains(client_order_id) {
631 log::error!(
632 "{failure} in orders: {client_order_id} not found in `self.index.orders_open`",
633 );
634 error_count += 1;
635 }
636
637 if order.is_closed() && !self.index.orders_closed.contains(client_order_id) {
638 log::error!(
639 "{failure} in orders: {client_order_id} not found in `self.index.orders_closed`",
640 );
641 error_count += 1;
642 }
643
644 if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
645 if !self
646 .index
647 .exec_algorithm_orders
648 .contains_key(&exec_algorithm_id)
649 {
650 log::error!(
651 "{failure} in orders: {client_order_id} not found in `self.index.exec_algorithm_orders`",
652 );
653 error_count += 1;
654 }
655
656 if order.exec_spawn_id().is_none()
657 && !self.index.exec_spawn_orders.contains_key(client_order_id)
658 {
659 log::error!(
660 "{failure} in orders: {client_order_id} not found in `self.index.exec_spawn_orders`",
661 );
662 error_count += 1;
663 }
664 }
665 }
666
667 for (position_id, position) in &self.positions {
668 if !self.index.position_strategy.contains_key(position_id) {
669 log::error!(
670 "{failure} in positions: {position_id} not found in `self.index.position_strategy`",
671 );
672 error_count += 1;
673 }
674
675 if !self.index.position_orders.contains_key(position_id) {
676 log::error!(
677 "{failure} in positions: {position_id} not found in `self.index.position_orders`",
678 );
679 error_count += 1;
680 }
681
682 if !self.index.positions.contains(position_id) {
683 log::error!(
684 "{failure} in positions: {position_id} not found in `self.index.positions`",
685 );
686 error_count += 1;
687 }
688
689 if position.is_open() && !self.index.positions_open.contains(position_id) {
690 log::error!(
691 "{failure} in positions: {position_id} not found in `self.index.positions_open`",
692 );
693 error_count += 1;
694 }
695
696 if position.is_closed() && !self.index.positions_closed.contains(position_id) {
697 log::error!(
698 "{failure} in positions: {position_id} not found in `self.index.positions_closed`",
699 );
700 error_count += 1;
701 }
702 }
703
704 for account_id in self.index.venue_account.values() {
706 if !self.accounts.contains_key(account_id) {
707 log::error!(
708 "{failure} in `index.venue_account`: {account_id} not found in `self.accounts`",
709 );
710 error_count += 1;
711 }
712 }
713
714 for client_order_id in self.index.venue_order_ids.values() {
715 if !self.orders.contains_key(client_order_id) {
716 log::error!(
717 "{failure} in `index.venue_order_ids`: {client_order_id} not found in `self.orders`",
718 );
719 error_count += 1;
720 }
721 }
722
723 for client_order_id in self.index.client_order_ids.keys() {
724 if !self.orders.contains_key(client_order_id) {
725 log::error!(
726 "{failure} in `index.client_order_ids`: {client_order_id} not found in `self.orders`",
727 );
728 error_count += 1;
729 }
730 }
731
732 for client_order_id in self.index.order_position.keys() {
733 if !self.orders.contains_key(client_order_id) {
734 log::error!(
735 "{failure} in `index.order_position`: {client_order_id} not found in `self.orders`",
736 );
737 error_count += 1;
738 }
739 }
740
741 for client_order_id in self.index.order_strategy.keys() {
743 if !self.orders.contains_key(client_order_id) {
744 log::error!(
745 "{failure} in `index.order_strategy`: {client_order_id} not found in `self.orders`",
746 );
747 error_count += 1;
748 }
749 }
750
751 for position_id in self.index.position_strategy.keys() {
752 if !self.positions.contains_key(position_id) {
753 log::error!(
754 "{failure} in `index.position_strategy`: {position_id} not found in `self.positions`",
755 );
756 error_count += 1;
757 }
758 }
759
760 for position_id in self.index.position_orders.keys() {
761 if !self.positions.contains_key(position_id) {
762 log::error!(
763 "{failure} in `index.position_orders`: {position_id} not found in `self.positions`",
764 );
765 error_count += 1;
766 }
767 }
768
769 for (instrument_id, client_order_ids) in &self.index.instrument_orders {
770 for client_order_id in client_order_ids {
771 if !self.orders.contains_key(client_order_id) {
772 log::error!(
773 "{failure} in `index.instrument_orders`: {instrument_id} not found in `self.orders`",
774 );
775 error_count += 1;
776 }
777 }
778 }
779
780 for instrument_id in self.index.instrument_positions.keys() {
781 if !self.index.instrument_orders.contains_key(instrument_id) {
782 log::error!(
783 "{failure} in `index.instrument_positions`: {instrument_id} not found in `index.instrument_orders`",
784 );
785 error_count += 1;
786 }
787 }
788
789 for client_order_ids in self.index.strategy_orders.values() {
790 for client_order_id in client_order_ids {
791 if !self.orders.contains_key(client_order_id) {
792 log::error!(
793 "{failure} in `index.strategy_orders`: {client_order_id} not found in `self.orders`",
794 );
795 error_count += 1;
796 }
797 }
798 }
799
800 for position_ids in self.index.strategy_positions.values() {
801 for position_id in position_ids {
802 if !self.positions.contains_key(position_id) {
803 log::error!(
804 "{failure} in `index.strategy_positions`: {position_id} not found in `self.positions`",
805 );
806 error_count += 1;
807 }
808 }
809 }
810
811 for client_order_id in &self.index.orders {
812 if !self.orders.contains_key(client_order_id) {
813 log::error!(
814 "{failure} in `index.orders`: {client_order_id} not found in `self.orders`",
815 );
816 error_count += 1;
817 }
818 }
819
820 for client_order_id in &self.index.orders_emulated {
821 if !self.orders.contains_key(client_order_id) {
822 log::error!(
823 "{failure} in `index.orders_emulated`: {client_order_id} not found in `self.orders`",
824 );
825 error_count += 1;
826 }
827 }
828
829 for client_order_id in &self.index.orders_active_local {
830 if !self.orders.contains_key(client_order_id) {
831 log::error!(
832 "{failure} in `index.orders_active_local`: {client_order_id} not found in `self.orders`",
833 );
834 error_count += 1;
835 }
836 }
837
838 for client_order_id in &self.index.orders_inflight {
839 if !self.orders.contains_key(client_order_id) {
840 log::error!(
841 "{failure} in `index.orders_inflight`: {client_order_id} not found in `self.orders`",
842 );
843 error_count += 1;
844 }
845 }
846
847 for client_order_id in &self.index.orders_open {
848 if !self.orders.contains_key(client_order_id) {
849 log::error!(
850 "{failure} in `index.orders_open`: {client_order_id} not found in `self.orders`",
851 );
852 error_count += 1;
853 }
854 }
855
856 for client_order_id in &self.index.orders_closed {
857 if !self.orders.contains_key(client_order_id) {
858 log::error!(
859 "{failure} in `index.orders_closed`: {client_order_id} not found in `self.orders`",
860 );
861 error_count += 1;
862 }
863 }
864
865 for position_id in &self.index.positions {
866 if !self.positions.contains_key(position_id) {
867 log::error!(
868 "{failure} in `index.positions`: {position_id} not found in `self.positions`",
869 );
870 error_count += 1;
871 }
872 }
873
874 for position_id in &self.index.positions_open {
875 if !self.positions.contains_key(position_id) {
876 log::error!(
877 "{failure} in `index.positions_open`: {position_id} not found in `self.positions`",
878 );
879 error_count += 1;
880 }
881 }
882
883 for position_id in &self.index.positions_closed {
884 if !self.positions.contains_key(position_id) {
885 log::error!(
886 "{failure} in `index.positions_closed`: {position_id} not found in `self.positions`",
887 );
888 error_count += 1;
889 }
890 }
891
892 for strategy_id in &self.index.strategies {
893 if !self.index.strategy_orders.contains_key(strategy_id) {
894 log::error!(
895 "{failure} in `index.strategies`: {strategy_id} not found in `index.strategy_orders`",
896 );
897 error_count += 1;
898 }
899 }
900
901 for exec_algorithm_id in &self.index.exec_algorithms {
902 if !self
903 .index
904 .exec_algorithm_orders
905 .contains_key(exec_algorithm_id)
906 {
907 log::error!(
908 "{failure} in `index.exec_algorithms`: {exec_algorithm_id} not found in `index.exec_algorithm_orders`",
909 );
910 error_count += 1;
911 }
912 }
913
914 let total_us = SystemTime::now()
915 .duration_since(UNIX_EPOCH)
916 .expect("Time went backwards")
917 .as_micros()
918 - timestamp_us;
919
920 if error_count == 0 {
921 log::info!("Integrity check passed in {total_us}μs");
922 true
923 } else {
924 log::error!(
925 "Integrity check failed with {error_count} error{} in {total_us}μs",
926 if error_count == 1 { "" } else { "s" },
927 );
928 false
929 }
930 }
931
932 #[must_use]
936 pub fn check_residuals(&self) -> bool {
937 log::debug!("Checking residuals");
938
939 let mut residuals = false;
940
941 for order in self.orders_open(None, None, None, None, None) {
943 residuals = true;
944 log::warn!("Residual {order}");
945 }
946
947 for position in self.positions_open(None, None, None, None, None) {
949 residuals = true;
950 log::warn!("Residual {position}");
951 }
952
953 residuals
954 }
955
956 pub fn purge_closed_orders(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
962 log::debug!(
963 "Purging closed orders{}",
964 if buffer_secs > 0 {
965 format!(" with buffer_secs={buffer_secs}")
966 } else {
967 String::new()
968 }
969 );
970
971 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
972
973 let mut affected_order_list_ids: AHashSet<OrderListId> = AHashSet::new();
974
975 'outer: for client_order_id in self.index.orders_closed.clone() {
976 if let Some(order) = self.orders.get(&client_order_id)
977 && order.is_closed()
978 && let Some(ts_closed) = order.ts_closed()
979 && ts_closed + buffer_ns <= ts_now
980 {
981 if let Some(linked_order_ids) = order.linked_order_ids() {
983 for linked_order_id in linked_order_ids {
984 if let Some(linked_order) = self.orders.get(linked_order_id)
985 && linked_order.is_open()
986 {
987 continue 'outer;
989 }
990 }
991 }
992
993 if let Some(order_list_id) = order.order_list_id() {
994 affected_order_list_ids.insert(order_list_id);
995 }
996
997 self.purge_order(client_order_id);
998 }
999 }
1000
1001 for order_list_id in affected_order_list_ids {
1002 if let Some(order_list) = self.order_lists.get(&order_list_id) {
1003 let all_purged = order_list
1004 .client_order_ids
1005 .iter()
1006 .all(|id| !self.orders.contains_key(id));
1007
1008 if all_purged {
1009 self.order_lists.remove(&order_list_id);
1010 log::info!("Purged {order_list_id}");
1011 }
1012 }
1013 }
1014 }
1015
1016 pub fn purge_closed_positions(&mut self, ts_now: UnixNanos, buffer_secs: u64) {
1018 log::debug!(
1019 "Purging closed positions{}",
1020 if buffer_secs > 0 {
1021 format!(" with buffer_secs={buffer_secs}")
1022 } else {
1023 String::new()
1024 }
1025 );
1026
1027 let buffer_ns = secs_to_nanos_unchecked(buffer_secs as f64);
1028
1029 for position_id in self.index.positions_closed.clone() {
1030 if let Some(position) = self.positions.get(&position_id)
1031 && position.is_closed()
1032 && let Some(ts_closed) = position.ts_closed
1033 && ts_closed + buffer_ns <= ts_now
1034 {
1035 self.purge_position(position_id);
1036 }
1037 }
1038 }
1039
1040 pub fn purge_order(&mut self, client_order_id: ClientOrderId) {
1044 let order = self.orders.get(&client_order_id).cloned();
1046
1047 if let Some(ref ord) = order
1049 && ord.is_open()
1050 {
1051 log::warn!("Order {client_order_id} found open when purging, skipping purge");
1052 return;
1053 }
1054
1055 if let Some(ref ord) = order {
1057 self.orders.remove(&client_order_id);
1059
1060 if let Some(venue_orders) = self.index.venue_orders.get_mut(&ord.instrument_id().venue)
1062 {
1063 venue_orders.remove(&client_order_id);
1064 if venue_orders.is_empty() {
1065 self.index.venue_orders.remove(&ord.instrument_id().venue);
1066 }
1067 }
1068
1069 if let Some(venue_order_id) = ord.venue_order_id() {
1071 self.index.venue_order_ids.remove(&venue_order_id);
1072 }
1073
1074 if let Some(instrument_orders) =
1076 self.index.instrument_orders.get_mut(&ord.instrument_id())
1077 {
1078 instrument_orders.remove(&client_order_id);
1079 if instrument_orders.is_empty() {
1080 self.index.instrument_orders.remove(&ord.instrument_id());
1081 }
1082 }
1083
1084 if let Some(position_id) = ord.position_id()
1086 && let Some(position_orders) = self.index.position_orders.get_mut(&position_id)
1087 {
1088 position_orders.remove(&client_order_id);
1089 if position_orders.is_empty() {
1090 self.index.position_orders.remove(&position_id);
1091 }
1092 }
1093
1094 if let Some(exec_algorithm_id) = ord.exec_algorithm_id()
1096 && let Some(exec_algorithm_orders) =
1097 self.index.exec_algorithm_orders.get_mut(&exec_algorithm_id)
1098 {
1099 exec_algorithm_orders.remove(&client_order_id);
1100 if exec_algorithm_orders.is_empty() {
1101 self.index.exec_algorithm_orders.remove(&exec_algorithm_id);
1102 }
1103 }
1104
1105 if let Some(strategy_orders) = self.index.strategy_orders.get_mut(&ord.strategy_id()) {
1107 strategy_orders.remove(&client_order_id);
1108 if strategy_orders.is_empty() {
1109 self.index.strategy_orders.remove(&ord.strategy_id());
1110 }
1111 }
1112
1113 if let Some(account_id) = ord.account_id()
1115 && let Some(account_orders) = self.index.account_orders.get_mut(&account_id)
1116 {
1117 account_orders.remove(&client_order_id);
1118 if account_orders.is_empty() {
1119 self.index.account_orders.remove(&account_id);
1120 }
1121 }
1122
1123 if let Some(exec_spawn_id) = ord.exec_spawn_id()
1125 && let Some(spawn_orders) = self.index.exec_spawn_orders.get_mut(&exec_spawn_id)
1126 {
1127 spawn_orders.remove(&client_order_id);
1128 if spawn_orders.is_empty() {
1129 self.index.exec_spawn_orders.remove(&exec_spawn_id);
1130 }
1131 }
1132
1133 log::info!("Purged order {client_order_id}");
1134 } else {
1135 log::warn!("Order {client_order_id} not found when purging");
1136 }
1137
1138 self.index.order_position.remove(&client_order_id);
1140 let strategy_id = self.index.order_strategy.remove(&client_order_id);
1141 self.index.order_client.remove(&client_order_id);
1142 self.index.client_order_ids.remove(&client_order_id);
1143
1144 if let Some(strategy_id) = strategy_id
1146 && let Some(strategy_orders) = self.index.strategy_orders.get_mut(&strategy_id)
1147 {
1148 strategy_orders.remove(&client_order_id);
1149 if strategy_orders.is_empty() {
1150 self.index.strategy_orders.remove(&strategy_id);
1151 }
1152 }
1153
1154 self.index.exec_spawn_orders.remove(&client_order_id);
1156
1157 self.index.orders.remove(&client_order_id);
1158 self.index.orders_active_local.remove(&client_order_id);
1159 self.index.orders_open.remove(&client_order_id);
1160 self.index.orders_closed.remove(&client_order_id);
1161 self.index.orders_emulated.remove(&client_order_id);
1162 self.index.orders_inflight.remove(&client_order_id);
1163 self.index.orders_pending_cancel.remove(&client_order_id);
1164 }
1165
1166 pub fn purge_position(&mut self, position_id: PositionId) {
1170 let position = self.positions.get(&position_id).cloned();
1172
1173 if let Some(ref pos) = position
1175 && pos.is_open()
1176 {
1177 log::warn!("Position {position_id} found open when purging, skipping purge");
1178 return;
1179 }
1180
1181 if let Some(ref pos) = position {
1183 self.positions.remove(&position_id);
1184
1185 if let Some(venue_positions) =
1187 self.index.venue_positions.get_mut(&pos.instrument_id.venue)
1188 {
1189 venue_positions.remove(&position_id);
1190 if venue_positions.is_empty() {
1191 self.index.venue_positions.remove(&pos.instrument_id.venue);
1192 }
1193 }
1194
1195 if let Some(instrument_positions) =
1197 self.index.instrument_positions.get_mut(&pos.instrument_id)
1198 {
1199 instrument_positions.remove(&position_id);
1200 if instrument_positions.is_empty() {
1201 self.index.instrument_positions.remove(&pos.instrument_id);
1202 }
1203 }
1204
1205 if let Some(strategy_positions) =
1207 self.index.strategy_positions.get_mut(&pos.strategy_id)
1208 {
1209 strategy_positions.remove(&position_id);
1210 if strategy_positions.is_empty() {
1211 self.index.strategy_positions.remove(&pos.strategy_id);
1212 }
1213 }
1214
1215 if let Some(account_positions) = self.index.account_positions.get_mut(&pos.account_id) {
1217 account_positions.remove(&position_id);
1218 if account_positions.is_empty() {
1219 self.index.account_positions.remove(&pos.account_id);
1220 }
1221 }
1222
1223 for client_order_id in pos.client_order_ids() {
1225 self.index.order_position.remove(&client_order_id);
1226 }
1227
1228 log::info!("Purged position {position_id}");
1229 } else {
1230 log::warn!("Position {position_id} not found when purging");
1231 }
1232
1233 self.index.position_strategy.remove(&position_id);
1235 self.index.position_orders.remove(&position_id);
1236 self.index.positions.remove(&position_id);
1237 self.index.positions_open.remove(&position_id);
1238 self.index.positions_closed.remove(&position_id);
1239
1240 self.position_snapshots.remove(&position_id);
1242 }
1243
1244 pub fn purge_account_events(&mut self, ts_now: UnixNanos, lookback_secs: u64) {
1249 log::debug!(
1250 "Purging account events{}",
1251 if lookback_secs > 0 {
1252 format!(" with lookback_secs={lookback_secs}")
1253 } else {
1254 String::new()
1255 }
1256 );
1257
1258 for account in self.accounts.values_mut() {
1259 let event_count = account.event_count();
1260 account.purge_account_events(ts_now, lookback_secs);
1261 let count_diff = event_count - account.event_count();
1262 if count_diff > 0 {
1263 log::info!(
1264 "Purged {} event(s) from account {}",
1265 count_diff,
1266 account.id()
1267 );
1268 }
1269 }
1270 }
1271
1272 pub fn clear_index(&mut self) {
1274 self.index.clear();
1275 log::debug!("Cleared index");
1276 }
1277
1278 pub fn reset(&mut self) {
1284 log::debug!("Resetting cache");
1285
1286 self.general.clear();
1287 self.books.clear();
1288 self.own_books.clear();
1289 self.quotes.clear();
1290 self.trades.clear();
1291 self.mark_xrates.clear();
1292 self.mark_prices.clear();
1293 self.index_prices.clear();
1294 self.funding_rates.clear();
1295 self.instrument_statuses.clear();
1296 self.bars.clear();
1297 self.accounts.clear();
1298 self.orders.clear();
1299 self.order_lists.clear();
1300 self.positions.clear();
1301 self.position_snapshots.clear();
1302 self.greeks.clear();
1303 self.yield_curves.clear();
1304
1305 if self.config.drop_instruments_on_reset {
1306 self.currencies.clear();
1307 self.instruments.clear();
1308 self.synthetics.clear();
1309 }
1310
1311 #[cfg(feature = "defi")]
1312 {
1313 self.defi.pools.clear();
1314 self.defi.pool_profilers.clear();
1315 }
1316
1317 self.clear_index();
1318
1319 log::info!("Reset cache");
1320 }
1321
1322 pub fn dispose(&mut self) {
1326 self.reset();
1327
1328 if let Some(database) = &mut self.database
1329 && let Err(e) = database.close()
1330 {
1331 log::error!("Failed to close database during dispose: {e}");
1332 }
1333 }
1334
1335 pub fn flush_db(&mut self) {
1339 if let Some(database) = &mut self.database
1340 && let Err(e) = database.flush()
1341 {
1342 log::error!("Failed to flush database: {e}");
1343 }
1344 }
1345
1346 pub fn add(&mut self, key: &str, value: Bytes) -> anyhow::Result<()> {
1354 check_valid_string_ascii(key, stringify!(key))?;
1355 check_predicate_false(value.is_empty(), stringify!(value))?;
1356
1357 log::debug!("Adding general {key}");
1358 self.general.insert(key.to_string(), value.clone());
1359
1360 if let Some(database) = &mut self.database {
1361 database.add(key.to_string(), value)?;
1362 }
1363 Ok(())
1364 }
1365
1366 pub fn add_order_book(&mut self, book: OrderBook) -> anyhow::Result<()> {
1372 log::debug!("Adding `OrderBook` {}", book.instrument_id);
1373
1374 if self.config.save_market_data
1375 && let Some(database) = &mut self.database
1376 {
1377 database.add_order_book(&book)?;
1378 }
1379
1380 self.books.insert(book.instrument_id, book);
1381 Ok(())
1382 }
1383
1384 pub fn add_own_order_book(&mut self, own_book: OwnOrderBook) -> anyhow::Result<()> {
1390 log::debug!("Adding `OwnOrderBook` {}", own_book.instrument_id);
1391
1392 self.own_books.insert(own_book.instrument_id, own_book);
1393 Ok(())
1394 }
1395
1396 pub fn add_mark_price(&mut self, mark_price: MarkPriceUpdate) -> anyhow::Result<()> {
1402 log::debug!("Adding `MarkPriceUpdate` for {}", mark_price.instrument_id);
1403
1404 if self.config.save_market_data {
1405 }
1407
1408 let mark_prices_deque = self
1409 .mark_prices
1410 .entry(mark_price.instrument_id)
1411 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1412 mark_prices_deque.push_front(mark_price);
1413 Ok(())
1414 }
1415
1416 pub fn add_index_price(&mut self, index_price: IndexPriceUpdate) -> anyhow::Result<()> {
1422 log::debug!(
1423 "Adding `IndexPriceUpdate` for {}",
1424 index_price.instrument_id
1425 );
1426
1427 if self.config.save_market_data {
1428 }
1430
1431 let index_prices_deque = self
1432 .index_prices
1433 .entry(index_price.instrument_id)
1434 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1435 index_prices_deque.push_front(index_price);
1436 Ok(())
1437 }
1438
1439 pub fn add_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> anyhow::Result<()> {
1445 log::debug!(
1446 "Adding `FundingRateUpdate` for {}",
1447 funding_rate.instrument_id
1448 );
1449
1450 if self.config.save_market_data {
1451 }
1453
1454 let funding_rates_deque = self
1455 .funding_rates
1456 .entry(funding_rate.instrument_id)
1457 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1458 funding_rates_deque.push_front(funding_rate);
1459 Ok(())
1460 }
1461
1462 pub fn add_funding_rates(&mut self, funding_rates: &[FundingRateUpdate]) -> anyhow::Result<()> {
1468 check_slice_not_empty(funding_rates, stringify!(funding_rates))?;
1469
1470 let instrument_id = funding_rates[0].instrument_id;
1471 log::debug!(
1472 "Adding `FundingRateUpdate`[{}] {instrument_id}",
1473 funding_rates.len()
1474 );
1475
1476 if self.config.save_market_data
1477 && let Some(database) = &mut self.database
1478 {
1479 for funding_rate in funding_rates {
1480 database.add_funding_rate(funding_rate)?;
1481 }
1482 }
1483
1484 let funding_rate_deque = self
1485 .funding_rates
1486 .entry(instrument_id)
1487 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1488
1489 for funding_rate in funding_rates {
1490 funding_rate_deque.push_front(*funding_rate);
1491 }
1492 Ok(())
1493 }
1494
1495 pub fn add_instrument_status(&mut self, status: InstrumentStatus) -> anyhow::Result<()> {
1501 log::debug!("Adding `InstrumentStatus` for {}", status.instrument_id);
1502
1503 if self.config.save_market_data {
1504 }
1506
1507 let statuses_deque = self
1508 .instrument_statuses
1509 .entry(status.instrument_id)
1510 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1511 statuses_deque.push_front(status);
1512 Ok(())
1513 }
1514
1515 pub fn add_quote(&mut self, quote: QuoteTick) -> anyhow::Result<()> {
1521 log::debug!("Adding `QuoteTick` {}", quote.instrument_id);
1522
1523 if self.config.save_market_data
1524 && let Some(database) = &mut self.database
1525 {
1526 database.add_quote("e)?;
1527 }
1528
1529 let quotes_deque = self
1530 .quotes
1531 .entry(quote.instrument_id)
1532 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1533 quotes_deque.push_front(quote);
1534 Ok(())
1535 }
1536
1537 pub fn add_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
1543 check_slice_not_empty(quotes, stringify!(quotes))?;
1544
1545 let instrument_id = quotes[0].instrument_id;
1546 log::debug!("Adding `QuoteTick`[{}] {instrument_id}", quotes.len());
1547
1548 if self.config.save_market_data
1549 && let Some(database) = &mut self.database
1550 {
1551 for quote in quotes {
1552 database.add_quote(quote)?;
1553 }
1554 }
1555
1556 let quotes_deque = self
1557 .quotes
1558 .entry(instrument_id)
1559 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1560
1561 for quote in quotes {
1562 quotes_deque.push_front(*quote);
1563 }
1564 Ok(())
1565 }
1566
1567 pub fn add_trade(&mut self, trade: TradeTick) -> anyhow::Result<()> {
1573 log::debug!("Adding `TradeTick` {}", trade.instrument_id);
1574
1575 if self.config.save_market_data
1576 && let Some(database) = &mut self.database
1577 {
1578 database.add_trade(&trade)?;
1579 }
1580
1581 let trades_deque = self
1582 .trades
1583 .entry(trade.instrument_id)
1584 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1585 trades_deque.push_front(trade);
1586 Ok(())
1587 }
1588
1589 pub fn add_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
1595 check_slice_not_empty(trades, stringify!(trades))?;
1596
1597 let instrument_id = trades[0].instrument_id;
1598 log::debug!("Adding `TradeTick`[{}] {instrument_id}", trades.len());
1599
1600 if self.config.save_market_data
1601 && let Some(database) = &mut self.database
1602 {
1603 for trade in trades {
1604 database.add_trade(trade)?;
1605 }
1606 }
1607
1608 let trades_deque = self
1609 .trades
1610 .entry(instrument_id)
1611 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1612
1613 for trade in trades {
1614 trades_deque.push_front(*trade);
1615 }
1616 Ok(())
1617 }
1618
1619 pub fn add_bar(&mut self, bar: Bar) -> anyhow::Result<()> {
1625 log::debug!("Adding `Bar` {}", bar.bar_type);
1626
1627 if self.config.save_market_data
1628 && let Some(database) = &mut self.database
1629 {
1630 database.add_bar(&bar)?;
1631 }
1632
1633 let bars = self
1634 .bars
1635 .entry(bar.bar_type)
1636 .or_insert_with(|| VecDeque::with_capacity(self.config.bar_capacity));
1637 bars.push_front(bar);
1638 Ok(())
1639 }
1640
1641 pub fn add_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
1647 check_slice_not_empty(bars, stringify!(bars))?;
1648
1649 let bar_type = bars[0].bar_type;
1650 log::debug!("Adding `Bar`[{}] {bar_type}", bars.len());
1651
1652 if self.config.save_market_data
1653 && let Some(database) = &mut self.database
1654 {
1655 for bar in bars {
1656 database.add_bar(bar)?;
1657 }
1658 }
1659
1660 let bars_deque = self
1661 .bars
1662 .entry(bar_type)
1663 .or_insert_with(|| VecDeque::with_capacity(self.config.tick_capacity));
1664
1665 for bar in bars {
1666 bars_deque.push_front(*bar);
1667 }
1668 Ok(())
1669 }
1670
1671 pub fn add_greeks(&mut self, greeks: GreeksData) -> anyhow::Result<()> {
1677 log::debug!("Adding `GreeksData` {}", greeks.instrument_id);
1678
1679 if self.config.save_market_data
1680 && let Some(_database) = &mut self.database
1681 {
1682 }
1684
1685 self.greeks.insert(greeks.instrument_id, greeks);
1686 Ok(())
1687 }
1688
1689 pub fn greeks(&self, instrument_id: &InstrumentId) -> Option<GreeksData> {
1691 self.greeks.get(instrument_id).cloned()
1692 }
1693
1694 pub fn add_option_greeks(&mut self, greeks: OptionGreeks) {
1696 log::debug!("Adding `OptionGreeks` {}", greeks.instrument_id);
1697 self.option_greeks.insert(greeks.instrument_id, greeks);
1698 }
1699
1700 #[must_use]
1702 pub fn option_greeks(&self, instrument_id: &InstrumentId) -> Option<&OptionGreeks> {
1703 self.option_greeks.get(instrument_id)
1704 }
1705
1706 pub fn add_yield_curve(&mut self, yield_curve: YieldCurveData) -> anyhow::Result<()> {
1712 log::debug!("Adding `YieldCurveData` {}", yield_curve.curve_name);
1713
1714 if self.config.save_market_data
1715 && let Some(_database) = &mut self.database
1716 {
1717 }
1719
1720 self.yield_curves
1721 .insert(yield_curve.curve_name.clone(), yield_curve);
1722 Ok(())
1723 }
1724
1725 pub fn yield_curve(&self, key: &str) -> Option<Box<dyn Fn(f64) -> f64>> {
1727 self.yield_curves.get(key).map(|curve| {
1728 let curve_clone = curve.clone();
1729 Box::new(move |expiry_in_years: f64| curve_clone.get_rate(expiry_in_years))
1730 as Box<dyn Fn(f64) -> f64>
1731 })
1732 }
1733
1734 pub fn add_currency(&mut self, currency: Currency) -> anyhow::Result<()> {
1740 if self.currencies.contains_key(¤cy.code) {
1741 return Ok(());
1742 }
1743 log::debug!("Adding `Currency` {}", currency.code);
1744
1745 if let Some(database) = &mut self.database {
1746 database.add_currency(¤cy)?;
1747 }
1748
1749 self.currencies.insert(currency.code, currency);
1750 Ok(())
1751 }
1752
1753 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
1759 log::debug!("Adding `Instrument` {}", instrument.id());
1760
1761 if let Some(base_currency) = instrument.base_currency() {
1763 self.add_currency(base_currency)?;
1764 }
1765 self.add_currency(instrument.quote_currency())?;
1766 self.add_currency(instrument.settlement_currency())?;
1767
1768 if let Some(database) = &mut self.database {
1769 database.add_instrument(&instrument)?;
1770 }
1771
1772 self.instruments.insert(instrument.id(), instrument);
1773 Ok(())
1774 }
1775
1776 pub fn add_synthetic(&mut self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
1782 log::debug!("Adding `SyntheticInstrument` {}", synthetic.id);
1783
1784 if let Some(database) = &mut self.database {
1785 database.add_synthetic(&synthetic)?;
1786 }
1787
1788 self.synthetics.insert(synthetic.id, synthetic);
1789 Ok(())
1790 }
1791
1792 pub fn add_account(&mut self, account: AccountAny) -> anyhow::Result<()> {
1798 log::debug!("Adding `Account` {}", account.id());
1799
1800 if let Some(database) = &mut self.database {
1801 database.add_account(&account)?;
1802 }
1803
1804 let account_id = account.id();
1805 self.accounts.insert(account_id, account);
1806 self.index
1807 .venue_account
1808 .insert(account_id.get_issuer(), account_id);
1809 Ok(())
1810 }
1811
1812 pub fn add_venue_order_id(
1820 &mut self,
1821 client_order_id: &ClientOrderId,
1822 venue_order_id: &VenueOrderId,
1823 overwrite: bool,
1824 ) -> anyhow::Result<()> {
1825 if let Some(existing_venue_order_id) = self.index.client_order_ids.get(client_order_id)
1826 && !overwrite
1827 && existing_venue_order_id != venue_order_id
1828 {
1829 anyhow::bail!(
1830 "Existing {existing_venue_order_id} for {client_order_id}
1831 did not match the given {venue_order_id}.
1832 If you are writing a test then try a different `venue_order_id`,
1833 otherwise this is probably a bug."
1834 );
1835 }
1836
1837 self.index
1838 .client_order_ids
1839 .insert(*client_order_id, *venue_order_id);
1840 self.index
1841 .venue_order_ids
1842 .insert(*venue_order_id, *client_order_id);
1843
1844 Ok(())
1845 }
1846
1847 pub fn add_order(
1859 &mut self,
1860 order: OrderAny,
1861 position_id: Option<PositionId>,
1862 client_id: Option<ClientId>,
1863 replace_existing: bool,
1864 ) -> anyhow::Result<()> {
1865 let instrument_id = order.instrument_id();
1866 let venue = instrument_id.venue;
1867 let client_order_id = order.client_order_id();
1868 let strategy_id = order.strategy_id();
1869 let exec_algorithm_id = order.exec_algorithm_id();
1870 let exec_spawn_id = order.exec_spawn_id();
1871
1872 if !replace_existing {
1873 check_key_not_in_map(
1874 &client_order_id,
1875 &self.orders,
1876 stringify!(client_order_id),
1877 stringify!(orders),
1878 )?;
1879 }
1880
1881 log::debug!("Adding {order:?}");
1882
1883 self.index.orders.insert(client_order_id);
1884
1885 if order.is_active_local() {
1886 self.index.orders_active_local.insert(client_order_id);
1887 }
1888 self.index
1889 .order_strategy
1890 .insert(client_order_id, strategy_id);
1891 self.index.strategies.insert(strategy_id);
1892
1893 self.index
1895 .venue_orders
1896 .entry(venue)
1897 .or_default()
1898 .insert(client_order_id);
1899
1900 self.index
1902 .instrument_orders
1903 .entry(instrument_id)
1904 .or_default()
1905 .insert(client_order_id);
1906
1907 self.index
1909 .strategy_orders
1910 .entry(strategy_id)
1911 .or_default()
1912 .insert(client_order_id);
1913
1914 if let Some(account_id) = order.account_id() {
1916 self.index
1917 .account_orders
1918 .entry(account_id)
1919 .or_default()
1920 .insert(client_order_id);
1921 }
1922
1923 if let Some(exec_algorithm_id) = exec_algorithm_id {
1925 self.index.exec_algorithms.insert(exec_algorithm_id);
1926
1927 self.index
1928 .exec_algorithm_orders
1929 .entry(exec_algorithm_id)
1930 .or_default()
1931 .insert(client_order_id);
1932 }
1933
1934 if let Some(exec_spawn_id) = exec_spawn_id {
1936 self.index
1937 .exec_spawn_orders
1938 .entry(exec_spawn_id)
1939 .or_default()
1940 .insert(client_order_id);
1941 }
1942
1943 if let Some(emulation_trigger) = order.emulation_trigger()
1945 && emulation_trigger != TriggerType::NoTrigger
1946 {
1947 self.index.orders_emulated.insert(client_order_id);
1948 }
1949
1950 if let Some(position_id) = position_id {
1952 self.add_position_id(
1953 &position_id,
1954 &order.instrument_id().venue,
1955 &client_order_id,
1956 &strategy_id,
1957 )?;
1958 }
1959
1960 if let Some(client_id) = client_id {
1962 self.index.order_client.insert(client_order_id, client_id);
1963 log::debug!("Indexed {client_id:?}");
1964 }
1965
1966 if let Some(database) = &mut self.database {
1967 database.add_order(&order, client_id)?;
1968 }
1973
1974 self.orders.insert(client_order_id, order);
1975
1976 Ok(())
1977 }
1978
1979 pub fn add_order_list(&mut self, order_list: OrderList) -> anyhow::Result<()> {
1985 let order_list_id = order_list.id;
1986 check_key_not_in_map(
1987 &order_list_id,
1988 &self.order_lists,
1989 stringify!(order_list_id),
1990 stringify!(order_lists),
1991 )?;
1992
1993 log::debug!("Adding {order_list:?}");
1994 self.order_lists.insert(order_list_id, order_list);
1995 Ok(())
1996 }
1997
1998 pub fn add_position_id(
2004 &mut self,
2005 position_id: &PositionId,
2006 venue: &Venue,
2007 client_order_id: &ClientOrderId,
2008 strategy_id: &StrategyId,
2009 ) -> anyhow::Result<()> {
2010 self.index
2011 .order_position
2012 .insert(*client_order_id, *position_id);
2013
2014 if let Some(database) = &mut self.database {
2016 database.index_order_position(*client_order_id, *position_id)?;
2017 }
2018
2019 self.index
2021 .position_strategy
2022 .insert(*position_id, *strategy_id);
2023
2024 self.index
2026 .position_orders
2027 .entry(*position_id)
2028 .or_default()
2029 .insert(*client_order_id);
2030
2031 self.index
2033 .strategy_positions
2034 .entry(*strategy_id)
2035 .or_default()
2036 .insert(*position_id);
2037
2038 self.index
2040 .venue_positions
2041 .entry(*venue)
2042 .or_default()
2043 .insert(*position_id);
2044
2045 Ok(())
2046 }
2047
2048 fn assign_position_ids_to_contingencies(&mut self) {
2057 let mut assignments: Vec<(PositionId, ClientOrderId)> = Vec::new();
2058
2059 for parent in self.orders.values() {
2060 if parent.contingency_type() != Some(ContingencyType::Oto) {
2061 continue;
2062 }
2063 let Some(parent_position_id) = parent.position_id() else {
2064 continue;
2065 };
2066 let Some(linked_order_ids) = parent.linked_order_ids() else {
2067 continue;
2068 };
2069
2070 for client_order_id in linked_order_ids {
2071 match self.orders.get(client_order_id) {
2072 None => {
2073 log::error!("Contingency order {client_order_id} not found");
2074 }
2075 Some(contingent) => {
2076 if contingent.position_id().is_none() {
2077 assignments.push((parent_position_id, *client_order_id));
2078 }
2079 }
2080 }
2081 }
2082 }
2083
2084 for (position_id, client_order_id) in assignments {
2085 let Some((venue, strategy_id)) =
2086 self.orders.get_mut(&client_order_id).map(|contingent| {
2087 contingent.set_position_id(Some(position_id));
2088 (contingent.instrument_id().venue, contingent.strategy_id())
2089 })
2090 else {
2091 continue;
2092 };
2093
2094 self.index
2101 .order_position
2102 .insert(client_order_id, position_id);
2103 self.index
2104 .position_strategy
2105 .insert(position_id, strategy_id);
2106 self.index
2107 .position_orders
2108 .entry(position_id)
2109 .or_default()
2110 .insert(client_order_id);
2111 self.index
2112 .strategy_positions
2113 .entry(strategy_id)
2114 .or_default()
2115 .insert(position_id);
2116 self.index
2117 .venue_positions
2118 .entry(venue)
2119 .or_default()
2120 .insert(position_id);
2121 }
2122 }
2123
2124 pub fn add_position(&mut self, position: &Position, _oms_type: OmsType) -> anyhow::Result<()> {
2130 self.positions.insert(position.id, position.clone());
2131 self.index.positions.insert(position.id);
2132 self.index.positions_open.insert(position.id);
2133 self.index.positions_closed.remove(&position.id); log::debug!("Adding {position}");
2136
2137 self.add_position_id(
2138 &position.id,
2139 &position.instrument_id.venue,
2140 &position.opening_order_id,
2141 &position.strategy_id,
2142 )?;
2143
2144 let venue = position.instrument_id.venue;
2145 let venue_positions = self.index.venue_positions.entry(venue).or_default();
2146 venue_positions.insert(position.id);
2147
2148 let instrument_id = position.instrument_id;
2150 let instrument_positions = self
2151 .index
2152 .instrument_positions
2153 .entry(instrument_id)
2154 .or_default();
2155 instrument_positions.insert(position.id);
2156
2157 self.index
2159 .account_positions
2160 .entry(position.account_id)
2161 .or_default()
2162 .insert(position.id);
2163
2164 if let Some(database) = &mut self.database {
2165 database.add_position(position)?;
2166 }
2175
2176 Ok(())
2177 }
2178
2179 pub fn update_account(&mut self, account: &AccountAny) -> anyhow::Result<()> {
2185 let account_id = account.id();
2186 self.accounts.insert(account_id, account.clone());
2187
2188 if let Some(database) = &mut self.database {
2189 database.update_account(account)?;
2190 }
2191 Ok(())
2192 }
2193
2194 pub fn update_order(&mut self, order: &OrderAny) -> anyhow::Result<()> {
2200 let client_order_id = order.client_order_id();
2201
2202 if order.is_active_local() {
2203 self.index.orders_active_local.insert(client_order_id);
2204 } else {
2205 self.index.orders_active_local.remove(&client_order_id);
2206 }
2207
2208 if let Some(venue_order_id) = order.venue_order_id() {
2210 if !self.index.venue_order_ids.contains_key(&venue_order_id) {
2213 self.add_venue_order_id(&order.client_order_id(), &venue_order_id, false)?;
2215 }
2216 }
2217
2218 if order.is_inflight() {
2220 self.index.orders_inflight.insert(client_order_id);
2221 } else {
2222 self.index.orders_inflight.remove(&client_order_id);
2223 }
2224
2225 if order.is_open() {
2227 self.index.orders_closed.remove(&client_order_id);
2228 self.index.orders_open.insert(client_order_id);
2229 } else if order.is_closed() {
2230 self.index.orders_open.remove(&client_order_id);
2231 self.index.orders_pending_cancel.remove(&client_order_id);
2232 self.index.orders_closed.insert(client_order_id);
2233 }
2234
2235 if let Some(emulation_trigger) = order.emulation_trigger()
2237 && emulation_trigger != TriggerType::NoTrigger
2238 && !order.is_closed()
2239 {
2240 self.index.orders_emulated.insert(client_order_id);
2241 } else {
2242 self.index.orders_emulated.remove(&client_order_id);
2243 }
2244
2245 if let Some(account_id) = order.account_id() {
2247 self.index
2248 .account_orders
2249 .entry(account_id)
2250 .or_default()
2251 .insert(client_order_id);
2252 }
2253
2254 if self.own_order_book(&order.instrument_id()).is_some()
2256 && should_handle_own_book_order(order)
2257 {
2258 self.update_own_order_book(order);
2259 }
2260
2261 if let Some(database) = &mut self.database {
2262 database.update_order(order.last_event())?;
2263 }
2268
2269 self.orders.insert(client_order_id, order.clone());
2271
2272 Ok(())
2273 }
2274
2275 pub fn update_order_pending_cancel_local(&mut self, order: &OrderAny) {
2277 self.index
2278 .orders_pending_cancel
2279 .insert(order.client_order_id());
2280 }
2281
2282 pub fn update_position(&mut self, position: &Position) -> anyhow::Result<()> {
2288 if position.is_open() {
2291 self.index.positions_open.insert(position.id);
2292 self.index.positions_closed.remove(&position.id);
2293 } else {
2294 self.index.positions_closed.insert(position.id);
2295 self.index.positions_open.remove(&position.id);
2296 }
2297
2298 if let Some(database) = &mut self.database {
2299 database.update_position(position)?;
2300 }
2305
2306 self.positions.insert(position.id, position.clone());
2307
2308 Ok(())
2309 }
2310
2311 pub fn snapshot_position(&mut self, position: &Position) -> anyhow::Result<()> {
2318 let position_id = position.id;
2319
2320 let mut copied_position = position.clone();
2321 let new_id = format!("{}-{}", position_id.as_str(), UUID4::new());
2322 copied_position.id = PositionId::new(new_id);
2323
2324 let position_serialized = serde_json::to_vec(&copied_position)?;
2326
2327 self.position_snapshots
2328 .entry(position_id)
2329 .or_default()
2330 .push(Bytes::from(position_serialized));
2331
2332 log::debug!("Snapshot {copied_position}");
2333 Ok(())
2334 }
2335
2336 pub fn snapshot_position_state(
2342 &mut self,
2343 position: &Position,
2344 open_only: Option<bool>,
2347 ) -> anyhow::Result<()> {
2348 let open_only = open_only.unwrap_or(true);
2349
2350 if open_only && !position.is_open() {
2351 return Ok(());
2352 }
2353
2354 if let Some(database) = &mut self.database {
2355 database.snapshot_position_state(position).map_err(|e| {
2356 log::error!(
2357 "Failed to snapshot position state for {}: {e:?}",
2358 position.id
2359 );
2360 e
2361 })?;
2362 } else {
2363 log::warn!(
2364 "Cannot snapshot position state for {} (no database configured)",
2365 position.id
2366 );
2367 }
2368
2369 todo!()
2371 }
2372
2373 #[must_use]
2375 pub fn oms_type(&self, position_id: &PositionId) -> Option<OmsType> {
2376 if self.index.position_strategy.contains_key(position_id) {
2378 Some(OmsType::Netting)
2381 } else {
2382 None
2383 }
2384 }
2385
2386 #[must_use]
2391 pub fn position_snapshot_bytes(&self, position_id: &PositionId) -> Option<Vec<Vec<u8>>> {
2392 self.position_snapshots
2393 .get(position_id)
2394 .map(|frames| frames.iter().map(|b| b.to_vec()).collect())
2395 }
2396
2397 #[must_use]
2401 pub fn position_snapshot_count(&self, position_id: &PositionId) -> usize {
2402 self.position_snapshots.get(position_id).map_or(0, Vec::len)
2403 }
2404
2405 #[must_use]
2411 pub fn position_snapshots(
2412 &self,
2413 position_id: Option<&PositionId>,
2414 account_id: Option<&AccountId>,
2415 ) -> Vec<Position> {
2416 let frames: Box<dyn Iterator<Item = &Bytes> + '_> = match position_id {
2417 Some(pid) => match self.position_snapshots.get(pid) {
2418 Some(v) => Box::new(v.iter()),
2419 None => Box::new(std::iter::empty()),
2420 },
2421 None => Box::new(self.position_snapshots.values().flat_map(|v| v.iter())),
2422 };
2423
2424 let mut results: Vec<Position> = frames
2425 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2426 Ok(position) => Some(position),
2427 Err(e) => {
2428 log::warn!("Failed to decode position snapshot: {e}");
2429 None
2430 }
2431 })
2432 .collect();
2433
2434 if let Some(aid) = account_id {
2435 results.retain(|p| p.account_id == *aid);
2436 }
2437
2438 results
2439 }
2440
2441 #[must_use]
2447 pub fn position_snapshots_from(&self, position_id: &PositionId, skip: usize) -> Vec<Position> {
2448 let Some(frames) = self.position_snapshots.get(position_id) else {
2449 return Vec::new();
2450 };
2451
2452 frames
2453 .iter()
2454 .skip(skip)
2455 .filter_map(|bytes| match serde_json::from_slice::<Position>(bytes) {
2456 Ok(position) => Some(position),
2457 Err(e) => {
2458 log::warn!("Failed to decode position snapshot: {e}");
2459 None
2460 }
2461 })
2462 .collect()
2463 }
2464
2465 #[must_use]
2467 pub fn position_snapshot_ids(&self, instrument_id: &InstrumentId) -> AHashSet<PositionId> {
2468 let mut result = AHashSet::new();
2470
2471 for (position_id, _) in &self.position_snapshots {
2472 if let Some(position) = self.positions.get(position_id)
2474 && position.instrument_id == *instrument_id
2475 {
2476 result.insert(*position_id);
2477 }
2478 }
2479 result
2480 }
2481
2482 pub fn snapshot_order_state(&self, order: &OrderAny) -> anyhow::Result<()> {
2488 let database = if let Some(database) = &self.database {
2489 database
2490 } else {
2491 log::warn!(
2492 "Cannot snapshot order state for {} (no database configured)",
2493 order.client_order_id()
2494 );
2495 return Ok(());
2496 };
2497
2498 database.snapshot_order_state(order)
2499 }
2500
2501 fn build_order_query_filter_set(
2504 &self,
2505 venue: Option<&Venue>,
2506 instrument_id: Option<&InstrumentId>,
2507 strategy_id: Option<&StrategyId>,
2508 account_id: Option<&AccountId>,
2509 ) -> Option<AHashSet<ClientOrderId>> {
2510 let mut query: Option<AHashSet<ClientOrderId>> = None;
2511
2512 if let Some(venue) = venue {
2513 query = Some(
2514 self.index
2515 .venue_orders
2516 .get(venue)
2517 .cloned()
2518 .unwrap_or_default(),
2519 );
2520 }
2521
2522 if let Some(instrument_id) = instrument_id {
2523 let instrument_orders = self
2524 .index
2525 .instrument_orders
2526 .get(instrument_id)
2527 .cloned()
2528 .unwrap_or_default();
2529
2530 if let Some(existing_query) = &mut query {
2531 *existing_query = existing_query
2532 .intersection(&instrument_orders)
2533 .copied()
2534 .collect();
2535 } else {
2536 query = Some(instrument_orders);
2537 }
2538 }
2539
2540 if let Some(strategy_id) = strategy_id {
2541 let strategy_orders = self
2542 .index
2543 .strategy_orders
2544 .get(strategy_id)
2545 .cloned()
2546 .unwrap_or_default();
2547
2548 if let Some(existing_query) = &mut query {
2549 *existing_query = existing_query
2550 .intersection(&strategy_orders)
2551 .copied()
2552 .collect();
2553 } else {
2554 query = Some(strategy_orders);
2555 }
2556 }
2557
2558 if let Some(account_id) = account_id {
2559 let account_orders = self
2560 .index
2561 .account_orders
2562 .get(account_id)
2563 .cloned()
2564 .unwrap_or_default();
2565
2566 if let Some(existing_query) = &mut query {
2567 *existing_query = existing_query
2568 .intersection(&account_orders)
2569 .copied()
2570 .collect();
2571 } else {
2572 query = Some(account_orders);
2573 }
2574 }
2575
2576 query
2577 }
2578
2579 fn build_position_query_filter_set(
2580 &self,
2581 venue: Option<&Venue>,
2582 instrument_id: Option<&InstrumentId>,
2583 strategy_id: Option<&StrategyId>,
2584 account_id: Option<&AccountId>,
2585 ) -> Option<AHashSet<PositionId>> {
2586 let mut query: Option<AHashSet<PositionId>> = None;
2587
2588 if let Some(venue) = venue {
2589 query = Some(
2590 self.index
2591 .venue_positions
2592 .get(venue)
2593 .cloned()
2594 .unwrap_or_default(),
2595 );
2596 }
2597
2598 if let Some(instrument_id) = instrument_id {
2599 let instrument_positions = self
2600 .index
2601 .instrument_positions
2602 .get(instrument_id)
2603 .cloned()
2604 .unwrap_or_default();
2605
2606 if let Some(existing_query) = query {
2607 query = Some(
2608 existing_query
2609 .intersection(&instrument_positions)
2610 .copied()
2611 .collect(),
2612 );
2613 } else {
2614 query = Some(instrument_positions);
2615 }
2616 }
2617
2618 if let Some(strategy_id) = strategy_id {
2619 let strategy_positions = self
2620 .index
2621 .strategy_positions
2622 .get(strategy_id)
2623 .cloned()
2624 .unwrap_or_default();
2625
2626 if let Some(existing_query) = query {
2627 query = Some(
2628 existing_query
2629 .intersection(&strategy_positions)
2630 .copied()
2631 .collect(),
2632 );
2633 } else {
2634 query = Some(strategy_positions);
2635 }
2636 }
2637
2638 if let Some(account_id) = account_id {
2639 let account_positions = self
2640 .index
2641 .account_positions
2642 .get(account_id)
2643 .cloned()
2644 .unwrap_or_default();
2645
2646 if let Some(existing_query) = query {
2647 query = Some(
2648 existing_query
2649 .intersection(&account_positions)
2650 .copied()
2651 .collect(),
2652 );
2653 } else {
2654 query = Some(account_positions);
2655 }
2656 }
2657
2658 query
2659 }
2660
2661 fn get_orders_for_ids(
2667 &self,
2668 client_order_ids: &AHashSet<ClientOrderId>,
2669 side: Option<OrderSide>,
2670 ) -> Vec<&OrderAny> {
2671 let side = side.unwrap_or(OrderSide::NoOrderSide);
2672 let mut orders = Vec::new();
2673
2674 for client_order_id in client_order_ids {
2675 let order = self
2676 .orders
2677 .get(client_order_id)
2678 .unwrap_or_else(|| panic!("Order {client_order_id} not found"));
2679
2680 if side == OrderSide::NoOrderSide || side == order.order_side() {
2681 orders.push(order);
2682 }
2683 }
2684
2685 orders.sort_by_key(|o| o.client_order_id());
2688 orders
2689 }
2690
2691 fn get_positions_for_ids(
2697 &self,
2698 position_ids: &AHashSet<PositionId>,
2699 side: Option<PositionSide>,
2700 ) -> Vec<&Position> {
2701 let side = side.unwrap_or(PositionSide::NoPositionSide);
2702 let mut positions = Vec::new();
2703
2704 for position_id in position_ids {
2705 let position = self
2706 .positions
2707 .get(position_id)
2708 .unwrap_or_else(|| panic!("Position {position_id} not found"));
2709
2710 if side == PositionSide::NoPositionSide || side == position.side {
2711 positions.push(position);
2712 }
2713 }
2714
2715 positions.sort_by_key(|p| p.id);
2718 positions
2719 }
2720
2721 #[must_use]
2723 pub fn client_order_ids(
2724 &self,
2725 venue: Option<&Venue>,
2726 instrument_id: Option<&InstrumentId>,
2727 strategy_id: Option<&StrategyId>,
2728 account_id: Option<&AccountId>,
2729 ) -> AHashSet<ClientOrderId> {
2730 let query =
2731 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2732
2733 match query {
2734 Some(query) => self.index.orders.intersection(&query).copied().collect(),
2735 None => self.index.orders.clone(),
2736 }
2737 }
2738
2739 #[must_use]
2741 pub fn client_order_ids_open(
2742 &self,
2743 venue: Option<&Venue>,
2744 instrument_id: Option<&InstrumentId>,
2745 strategy_id: Option<&StrategyId>,
2746 account_id: Option<&AccountId>,
2747 ) -> AHashSet<ClientOrderId> {
2748 let query =
2749 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2750
2751 match query {
2752 Some(query) => self
2753 .index
2754 .orders_open
2755 .intersection(&query)
2756 .copied()
2757 .collect(),
2758 None => self.index.orders_open.clone(),
2759 }
2760 }
2761
2762 #[must_use]
2764 pub fn client_order_ids_closed(
2765 &self,
2766 venue: Option<&Venue>,
2767 instrument_id: Option<&InstrumentId>,
2768 strategy_id: Option<&StrategyId>,
2769 account_id: Option<&AccountId>,
2770 ) -> AHashSet<ClientOrderId> {
2771 let query =
2772 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2773
2774 match query {
2775 Some(query) => self
2776 .index
2777 .orders_closed
2778 .intersection(&query)
2779 .copied()
2780 .collect(),
2781 None => self.index.orders_closed.clone(),
2782 }
2783 }
2784
2785 #[must_use]
2790 pub fn client_order_ids_active_local(
2791 &self,
2792 venue: Option<&Venue>,
2793 instrument_id: Option<&InstrumentId>,
2794 strategy_id: Option<&StrategyId>,
2795 account_id: Option<&AccountId>,
2796 ) -> AHashSet<ClientOrderId> {
2797 let query =
2798 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2799
2800 match query {
2801 Some(query) => self
2802 .index
2803 .orders_active_local
2804 .intersection(&query)
2805 .copied()
2806 .collect(),
2807 None => self.index.orders_active_local.clone(),
2808 }
2809 }
2810
2811 #[must_use]
2813 pub fn client_order_ids_emulated(
2814 &self,
2815 venue: Option<&Venue>,
2816 instrument_id: Option<&InstrumentId>,
2817 strategy_id: Option<&StrategyId>,
2818 account_id: Option<&AccountId>,
2819 ) -> AHashSet<ClientOrderId> {
2820 let query =
2821 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2822
2823 match query {
2824 Some(query) => self
2825 .index
2826 .orders_emulated
2827 .intersection(&query)
2828 .copied()
2829 .collect(),
2830 None => self.index.orders_emulated.clone(),
2831 }
2832 }
2833
2834 #[must_use]
2836 pub fn client_order_ids_inflight(
2837 &self,
2838 venue: Option<&Venue>,
2839 instrument_id: Option<&InstrumentId>,
2840 strategy_id: Option<&StrategyId>,
2841 account_id: Option<&AccountId>,
2842 ) -> AHashSet<ClientOrderId> {
2843 let query =
2844 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
2845
2846 match query {
2847 Some(query) => self
2848 .index
2849 .orders_inflight
2850 .intersection(&query)
2851 .copied()
2852 .collect(),
2853 None => self.index.orders_inflight.clone(),
2854 }
2855 }
2856
2857 #[must_use]
2859 pub fn position_ids(
2860 &self,
2861 venue: Option<&Venue>,
2862 instrument_id: Option<&InstrumentId>,
2863 strategy_id: Option<&StrategyId>,
2864 account_id: Option<&AccountId>,
2865 ) -> AHashSet<PositionId> {
2866 let query =
2867 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2868
2869 match query {
2870 Some(query) => self.index.positions.intersection(&query).copied().collect(),
2871 None => self.index.positions.clone(),
2872 }
2873 }
2874
2875 #[must_use]
2877 pub fn position_open_ids(
2878 &self,
2879 venue: Option<&Venue>,
2880 instrument_id: Option<&InstrumentId>,
2881 strategy_id: Option<&StrategyId>,
2882 account_id: Option<&AccountId>,
2883 ) -> AHashSet<PositionId> {
2884 let query =
2885 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2886
2887 match query {
2888 Some(query) => self
2889 .index
2890 .positions_open
2891 .intersection(&query)
2892 .copied()
2893 .collect(),
2894 None => self.index.positions_open.clone(),
2895 }
2896 }
2897
2898 #[must_use]
2900 pub fn position_closed_ids(
2901 &self,
2902 venue: Option<&Venue>,
2903 instrument_id: Option<&InstrumentId>,
2904 strategy_id: Option<&StrategyId>,
2905 account_id: Option<&AccountId>,
2906 ) -> AHashSet<PositionId> {
2907 let query =
2908 self.build_position_query_filter_set(venue, instrument_id, strategy_id, account_id);
2909
2910 match query {
2911 Some(query) => self
2912 .index
2913 .positions_closed
2914 .intersection(&query)
2915 .copied()
2916 .collect(),
2917 None => self.index.positions_closed.clone(),
2918 }
2919 }
2920
2921 #[must_use]
2923 pub fn actor_ids(&self) -> AHashSet<ComponentId> {
2924 self.index.actors.clone()
2925 }
2926
2927 #[must_use]
2929 pub fn strategy_ids(&self) -> AHashSet<StrategyId> {
2930 self.index.strategies.clone()
2931 }
2932
2933 #[must_use]
2935 pub fn exec_algorithm_ids(&self) -> AHashSet<ExecAlgorithmId> {
2936 self.index.exec_algorithms.clone()
2937 }
2938
2939 #[must_use]
2943 pub fn order(&self, client_order_id: &ClientOrderId) -> Option<&OrderAny> {
2944 self.orders.get(client_order_id)
2945 }
2946
2947 #[must_use]
2949 pub fn orders_for_ids(
2950 &self,
2951 client_order_ids: &[ClientOrderId],
2952 context: &dyn Display,
2953 ) -> Vec<OrderAny> {
2954 let mut orders = Vec::with_capacity(client_order_ids.len());
2955 for id in client_order_ids {
2956 match self.orders.get(id) {
2957 Some(order) => orders.push(order.clone()),
2958 None => log::error!("Order {id} not found in cache for {context}"),
2959 }
2960 }
2961 orders
2962 }
2963
2964 #[must_use]
2966 pub fn mut_order(&mut self, client_order_id: &ClientOrderId) -> Option<&mut OrderAny> {
2967 self.orders.get_mut(client_order_id)
2968 }
2969
2970 #[must_use]
2972 pub fn client_order_id(&self, venue_order_id: &VenueOrderId) -> Option<&ClientOrderId> {
2973 self.index.venue_order_ids.get(venue_order_id)
2974 }
2975
2976 #[must_use]
2978 pub fn venue_order_id(&self, client_order_id: &ClientOrderId) -> Option<&VenueOrderId> {
2979 self.index.client_order_ids.get(client_order_id)
2980 }
2981
2982 #[must_use]
2984 pub fn client_id(&self, client_order_id: &ClientOrderId) -> Option<&ClientId> {
2985 self.index.order_client.get(client_order_id)
2986 }
2987
2988 #[must_use]
2990 pub fn orders(
2991 &self,
2992 venue: Option<&Venue>,
2993 instrument_id: Option<&InstrumentId>,
2994 strategy_id: Option<&StrategyId>,
2995 account_id: Option<&AccountId>,
2996 side: Option<OrderSide>,
2997 ) -> Vec<&OrderAny> {
2998 let client_order_ids = self.client_order_ids(venue, instrument_id, strategy_id, account_id);
2999 self.get_orders_for_ids(&client_order_ids, side)
3000 }
3001
3002 #[must_use]
3004 pub fn orders_open(
3005 &self,
3006 venue: Option<&Venue>,
3007 instrument_id: Option<&InstrumentId>,
3008 strategy_id: Option<&StrategyId>,
3009 account_id: Option<&AccountId>,
3010 side: Option<OrderSide>,
3011 ) -> Vec<&OrderAny> {
3012 let client_order_ids =
3013 self.client_order_ids_open(venue, instrument_id, strategy_id, account_id);
3014 self.get_orders_for_ids(&client_order_ids, side)
3015 }
3016
3017 #[must_use]
3019 pub fn orders_closed(
3020 &self,
3021 venue: Option<&Venue>,
3022 instrument_id: Option<&InstrumentId>,
3023 strategy_id: Option<&StrategyId>,
3024 account_id: Option<&AccountId>,
3025 side: Option<OrderSide>,
3026 ) -> Vec<&OrderAny> {
3027 let client_order_ids =
3028 self.client_order_ids_closed(venue, instrument_id, strategy_id, account_id);
3029 self.get_orders_for_ids(&client_order_ids, side)
3030 }
3031
3032 #[must_use]
3037 pub fn orders_active_local(
3038 &self,
3039 venue: Option<&Venue>,
3040 instrument_id: Option<&InstrumentId>,
3041 strategy_id: Option<&StrategyId>,
3042 account_id: Option<&AccountId>,
3043 side: Option<OrderSide>,
3044 ) -> Vec<&OrderAny> {
3045 let client_order_ids =
3046 self.client_order_ids_active_local(venue, instrument_id, strategy_id, account_id);
3047 self.get_orders_for_ids(&client_order_ids, side)
3048 }
3049
3050 #[must_use]
3052 pub fn orders_emulated(
3053 &self,
3054 venue: Option<&Venue>,
3055 instrument_id: Option<&InstrumentId>,
3056 strategy_id: Option<&StrategyId>,
3057 account_id: Option<&AccountId>,
3058 side: Option<OrderSide>,
3059 ) -> Vec<&OrderAny> {
3060 let client_order_ids =
3061 self.client_order_ids_emulated(venue, instrument_id, strategy_id, account_id);
3062 self.get_orders_for_ids(&client_order_ids, side)
3063 }
3064
3065 #[must_use]
3067 pub fn orders_inflight(
3068 &self,
3069 venue: Option<&Venue>,
3070 instrument_id: Option<&InstrumentId>,
3071 strategy_id: Option<&StrategyId>,
3072 account_id: Option<&AccountId>,
3073 side: Option<OrderSide>,
3074 ) -> Vec<&OrderAny> {
3075 let client_order_ids =
3076 self.client_order_ids_inflight(venue, instrument_id, strategy_id, account_id);
3077 self.get_orders_for_ids(&client_order_ids, side)
3078 }
3079
3080 #[must_use]
3082 pub fn orders_for_position(&self, position_id: &PositionId) -> Vec<&OrderAny> {
3083 let client_order_ids = self.index.position_orders.get(position_id);
3084 match client_order_ids {
3085 Some(client_order_ids) => {
3086 self.get_orders_for_ids(&client_order_ids.iter().copied().collect(), None)
3087 }
3088 None => Vec::new(),
3089 }
3090 }
3091
3092 #[must_use]
3094 pub fn order_exists(&self, client_order_id: &ClientOrderId) -> bool {
3095 self.index.orders.contains(client_order_id)
3096 }
3097
3098 #[must_use]
3100 pub fn is_order_open(&self, client_order_id: &ClientOrderId) -> bool {
3101 self.index.orders_open.contains(client_order_id)
3102 }
3103
3104 #[must_use]
3106 pub fn is_order_closed(&self, client_order_id: &ClientOrderId) -> bool {
3107 self.index.orders_closed.contains(client_order_id)
3108 }
3109
3110 #[must_use]
3115 pub fn is_order_active_local(&self, client_order_id: &ClientOrderId) -> bool {
3116 self.index.orders_active_local.contains(client_order_id)
3117 }
3118
3119 #[must_use]
3121 pub fn is_order_emulated(&self, client_order_id: &ClientOrderId) -> bool {
3122 self.index.orders_emulated.contains(client_order_id)
3123 }
3124
3125 #[must_use]
3127 pub fn is_order_inflight(&self, client_order_id: &ClientOrderId) -> bool {
3128 self.index.orders_inflight.contains(client_order_id)
3129 }
3130
3131 #[must_use]
3133 pub fn is_order_pending_cancel_local(&self, client_order_id: &ClientOrderId) -> bool {
3134 self.index.orders_pending_cancel.contains(client_order_id)
3135 }
3136
3137 #[must_use]
3139 pub fn orders_open_count(
3140 &self,
3141 venue: Option<&Venue>,
3142 instrument_id: Option<&InstrumentId>,
3143 strategy_id: Option<&StrategyId>,
3144 account_id: Option<&AccountId>,
3145 side: Option<OrderSide>,
3146 ) -> usize {
3147 self.orders_open(venue, instrument_id, strategy_id, account_id, side)
3148 .len()
3149 }
3150
3151 #[must_use]
3153 pub fn orders_closed_count(
3154 &self,
3155 venue: Option<&Venue>,
3156 instrument_id: Option<&InstrumentId>,
3157 strategy_id: Option<&StrategyId>,
3158 account_id: Option<&AccountId>,
3159 side: Option<OrderSide>,
3160 ) -> usize {
3161 self.orders_closed(venue, instrument_id, strategy_id, account_id, side)
3162 .len()
3163 }
3164
3165 #[must_use]
3170 pub fn orders_active_local_count(
3171 &self,
3172 venue: Option<&Venue>,
3173 instrument_id: Option<&InstrumentId>,
3174 strategy_id: Option<&StrategyId>,
3175 account_id: Option<&AccountId>,
3176 side: Option<OrderSide>,
3177 ) -> usize {
3178 self.orders_active_local(venue, instrument_id, strategy_id, account_id, side)
3179 .len()
3180 }
3181
3182 #[must_use]
3184 pub fn orders_emulated_count(
3185 &self,
3186 venue: Option<&Venue>,
3187 instrument_id: Option<&InstrumentId>,
3188 strategy_id: Option<&StrategyId>,
3189 account_id: Option<&AccountId>,
3190 side: Option<OrderSide>,
3191 ) -> usize {
3192 self.orders_emulated(venue, instrument_id, strategy_id, account_id, side)
3193 .len()
3194 }
3195
3196 #[must_use]
3198 pub fn orders_inflight_count(
3199 &self,
3200 venue: Option<&Venue>,
3201 instrument_id: Option<&InstrumentId>,
3202 strategy_id: Option<&StrategyId>,
3203 account_id: Option<&AccountId>,
3204 side: Option<OrderSide>,
3205 ) -> usize {
3206 self.orders_inflight(venue, instrument_id, strategy_id, account_id, side)
3207 .len()
3208 }
3209
3210 #[must_use]
3212 pub fn orders_total_count(
3213 &self,
3214 venue: Option<&Venue>,
3215 instrument_id: Option<&InstrumentId>,
3216 strategy_id: Option<&StrategyId>,
3217 account_id: Option<&AccountId>,
3218 side: Option<OrderSide>,
3219 ) -> usize {
3220 self.orders(venue, instrument_id, strategy_id, account_id, side)
3221 .len()
3222 }
3223
3224 #[must_use]
3226 pub fn order_list(&self, order_list_id: &OrderListId) -> Option<&OrderList> {
3227 self.order_lists.get(order_list_id)
3228 }
3229
3230 #[must_use]
3232 pub fn order_lists(
3233 &self,
3234 venue: Option<&Venue>,
3235 instrument_id: Option<&InstrumentId>,
3236 strategy_id: Option<&StrategyId>,
3237 account_id: Option<&AccountId>,
3238 ) -> Vec<&OrderList> {
3239 let mut order_lists = self.order_lists.values().collect::<Vec<&OrderList>>();
3240
3241 if let Some(venue) = venue {
3242 order_lists.retain(|ol| &ol.instrument_id.venue == venue);
3243 }
3244
3245 if let Some(instrument_id) = instrument_id {
3246 order_lists.retain(|ol| &ol.instrument_id == instrument_id);
3247 }
3248
3249 if let Some(strategy_id) = strategy_id {
3250 order_lists.retain(|ol| &ol.strategy_id == strategy_id);
3251 }
3252
3253 if let Some(account_id) = account_id {
3254 order_lists.retain(|ol| {
3255 ol.client_order_ids.iter().any(|client_order_id| {
3256 self.orders
3257 .get(client_order_id)
3258 .is_some_and(|order| order.account_id().as_ref() == Some(account_id))
3259 })
3260 });
3261 }
3262
3263 order_lists
3264 }
3265
3266 #[must_use]
3268 pub fn order_list_exists(&self, order_list_id: &OrderListId) -> bool {
3269 self.order_lists.contains_key(order_list_id)
3270 }
3271
3272 #[must_use]
3277 pub fn orders_for_exec_algorithm(
3278 &self,
3279 exec_algorithm_id: &ExecAlgorithmId,
3280 venue: Option<&Venue>,
3281 instrument_id: Option<&InstrumentId>,
3282 strategy_id: Option<&StrategyId>,
3283 account_id: Option<&AccountId>,
3284 side: Option<OrderSide>,
3285 ) -> Vec<&OrderAny> {
3286 let query =
3287 self.build_order_query_filter_set(venue, instrument_id, strategy_id, account_id);
3288 let exec_algorithm_order_ids = self.index.exec_algorithm_orders.get(exec_algorithm_id);
3289
3290 if let Some(query) = query
3291 && let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids
3292 {
3293 let _exec_algorithm_order_ids = exec_algorithm_order_ids.intersection(&query);
3294 }
3295
3296 if let Some(exec_algorithm_order_ids) = exec_algorithm_order_ids {
3297 self.get_orders_for_ids(exec_algorithm_order_ids, side)
3298 } else {
3299 Vec::new()
3300 }
3301 }
3302
3303 #[must_use]
3305 pub fn orders_for_exec_spawn(&self, exec_spawn_id: &ClientOrderId) -> Vec<&OrderAny> {
3306 self.get_orders_for_ids(
3307 self.index
3308 .exec_spawn_orders
3309 .get(exec_spawn_id)
3310 .unwrap_or(&AHashSet::new()),
3311 None,
3312 )
3313 }
3314
3315 #[must_use]
3317 pub fn exec_spawn_total_quantity(
3318 &self,
3319 exec_spawn_id: &ClientOrderId,
3320 active_only: bool,
3321 ) -> Option<Quantity> {
3322 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3323
3324 let mut total_quantity: Option<Quantity> = None;
3325
3326 for spawn_order in exec_spawn_orders {
3327 if active_only && spawn_order.is_closed() {
3328 continue;
3329 }
3330
3331 match total_quantity.as_mut() {
3332 Some(total) => *total = *total + spawn_order.quantity(),
3333 None => total_quantity = Some(spawn_order.quantity()),
3334 }
3335 }
3336
3337 total_quantity
3338 }
3339
3340 #[must_use]
3342 pub fn exec_spawn_total_filled_qty(
3343 &self,
3344 exec_spawn_id: &ClientOrderId,
3345 active_only: bool,
3346 ) -> Option<Quantity> {
3347 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3348
3349 let mut total_quantity: Option<Quantity> = None;
3350
3351 for spawn_order in exec_spawn_orders {
3352 if active_only && spawn_order.is_closed() {
3353 continue;
3354 }
3355
3356 match total_quantity.as_mut() {
3357 Some(total) => *total = *total + spawn_order.filled_qty(),
3358 None => total_quantity = Some(spawn_order.filled_qty()),
3359 }
3360 }
3361
3362 total_quantity
3363 }
3364
3365 #[must_use]
3367 pub fn exec_spawn_total_leaves_qty(
3368 &self,
3369 exec_spawn_id: &ClientOrderId,
3370 active_only: bool,
3371 ) -> Option<Quantity> {
3372 let exec_spawn_orders = self.orders_for_exec_spawn(exec_spawn_id);
3373
3374 let mut total_quantity: Option<Quantity> = None;
3375
3376 for spawn_order in exec_spawn_orders {
3377 if active_only && spawn_order.is_closed() {
3378 continue;
3379 }
3380
3381 match total_quantity.as_mut() {
3382 Some(total) => *total = *total + spawn_order.leaves_qty(),
3383 None => total_quantity = Some(spawn_order.leaves_qty()),
3384 }
3385 }
3386
3387 total_quantity
3388 }
3389
3390 #[must_use]
3394 pub fn position(&self, position_id: &PositionId) -> Option<&Position> {
3395 self.positions.get(position_id)
3396 }
3397
3398 #[must_use]
3400 pub fn position_for_order(&self, client_order_id: &ClientOrderId) -> Option<&Position> {
3401 self.index
3402 .order_position
3403 .get(client_order_id)
3404 .and_then(|position_id| self.positions.get(position_id))
3405 }
3406
3407 #[must_use]
3409 pub fn position_id(&self, client_order_id: &ClientOrderId) -> Option<&PositionId> {
3410 self.index.order_position.get(client_order_id)
3411 }
3412
3413 #[must_use]
3415 pub fn positions(
3416 &self,
3417 venue: Option<&Venue>,
3418 instrument_id: Option<&InstrumentId>,
3419 strategy_id: Option<&StrategyId>,
3420 account_id: Option<&AccountId>,
3421 side: Option<PositionSide>,
3422 ) -> Vec<&Position> {
3423 let position_ids = self.position_ids(venue, instrument_id, strategy_id, account_id);
3424 self.get_positions_for_ids(&position_ids, side)
3425 }
3426
3427 #[must_use]
3429 pub fn positions_open(
3430 &self,
3431 venue: Option<&Venue>,
3432 instrument_id: Option<&InstrumentId>,
3433 strategy_id: Option<&StrategyId>,
3434 account_id: Option<&AccountId>,
3435 side: Option<PositionSide>,
3436 ) -> Vec<&Position> {
3437 let position_ids = self.position_open_ids(venue, instrument_id, strategy_id, account_id);
3438 self.get_positions_for_ids(&position_ids, side)
3439 }
3440
3441 #[must_use]
3443 pub fn positions_closed(
3444 &self,
3445 venue: Option<&Venue>,
3446 instrument_id: Option<&InstrumentId>,
3447 strategy_id: Option<&StrategyId>,
3448 account_id: Option<&AccountId>,
3449 side: Option<PositionSide>,
3450 ) -> Vec<&Position> {
3451 let position_ids = self.position_closed_ids(venue, instrument_id, strategy_id, account_id);
3452 self.get_positions_for_ids(&position_ids, side)
3453 }
3454
3455 #[must_use]
3457 pub fn position_exists(&self, position_id: &PositionId) -> bool {
3458 self.index.positions.contains(position_id)
3459 }
3460
3461 #[must_use]
3463 pub fn is_position_open(&self, position_id: &PositionId) -> bool {
3464 self.index.positions_open.contains(position_id)
3465 }
3466
3467 #[must_use]
3469 pub fn is_position_closed(&self, position_id: &PositionId) -> bool {
3470 self.index.positions_closed.contains(position_id)
3471 }
3472
3473 #[must_use]
3475 pub fn positions_open_count(
3476 &self,
3477 venue: Option<&Venue>,
3478 instrument_id: Option<&InstrumentId>,
3479 strategy_id: Option<&StrategyId>,
3480 account_id: Option<&AccountId>,
3481 side: Option<PositionSide>,
3482 ) -> usize {
3483 self.positions_open(venue, instrument_id, strategy_id, account_id, side)
3484 .len()
3485 }
3486
3487 #[must_use]
3489 pub fn positions_closed_count(
3490 &self,
3491 venue: Option<&Venue>,
3492 instrument_id: Option<&InstrumentId>,
3493 strategy_id: Option<&StrategyId>,
3494 account_id: Option<&AccountId>,
3495 side: Option<PositionSide>,
3496 ) -> usize {
3497 self.positions_closed(venue, instrument_id, strategy_id, account_id, side)
3498 .len()
3499 }
3500
3501 #[must_use]
3503 pub fn positions_total_count(
3504 &self,
3505 venue: Option<&Venue>,
3506 instrument_id: Option<&InstrumentId>,
3507 strategy_id: Option<&StrategyId>,
3508 account_id: Option<&AccountId>,
3509 side: Option<PositionSide>,
3510 ) -> usize {
3511 self.positions(venue, instrument_id, strategy_id, account_id, side)
3512 .len()
3513 }
3514
3515 #[must_use]
3519 pub fn strategy_id_for_order(&self, client_order_id: &ClientOrderId) -> Option<&StrategyId> {
3520 self.index.order_strategy.get(client_order_id)
3521 }
3522
3523 #[must_use]
3525 pub fn strategy_id_for_position(&self, position_id: &PositionId) -> Option<&StrategyId> {
3526 self.index.position_strategy.get(position_id)
3527 }
3528
3529 pub fn get(&self, key: &str) -> anyhow::Result<Option<&Bytes>> {
3537 check_valid_string_ascii(key, stringify!(key))?;
3538
3539 Ok(self.general.get(key))
3540 }
3541
3542 #[must_use]
3546 pub fn price(&self, instrument_id: &InstrumentId, price_type: PriceType) -> Option<Price> {
3547 match price_type {
3548 PriceType::Bid => self
3549 .quotes
3550 .get(instrument_id)
3551 .and_then(|quotes| quotes.front().map(|quote| quote.bid_price)),
3552 PriceType::Ask => self
3553 .quotes
3554 .get(instrument_id)
3555 .and_then(|quotes| quotes.front().map(|quote| quote.ask_price)),
3556 PriceType::Mid => self.quotes.get(instrument_id).and_then(|quotes| {
3557 quotes.front().map(|quote| {
3558 Price::new(
3559 f64::midpoint(quote.ask_price.as_f64(), quote.bid_price.as_f64()),
3560 quote.bid_price.precision + 1,
3561 )
3562 })
3563 }),
3564 PriceType::Last => self
3565 .trades
3566 .get(instrument_id)
3567 .and_then(|trades| trades.front().map(|trade| trade.price)),
3568 PriceType::Mark => self
3569 .mark_prices
3570 .get(instrument_id)
3571 .and_then(|marks| marks.front().map(|mark| mark.value)),
3572 }
3573 }
3574
3575 #[must_use]
3577 pub fn quotes(&self, instrument_id: &InstrumentId) -> Option<Vec<QuoteTick>> {
3578 self.quotes
3579 .get(instrument_id)
3580 .map(|quotes| quotes.iter().copied().collect())
3581 }
3582
3583 #[must_use]
3585 pub fn trades(&self, instrument_id: &InstrumentId) -> Option<Vec<TradeTick>> {
3586 self.trades
3587 .get(instrument_id)
3588 .map(|trades| trades.iter().copied().collect())
3589 }
3590
3591 #[must_use]
3593 pub fn mark_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<MarkPriceUpdate>> {
3594 self.mark_prices
3595 .get(instrument_id)
3596 .map(|mark_prices| mark_prices.iter().copied().collect())
3597 }
3598
3599 #[must_use]
3601 pub fn index_prices(&self, instrument_id: &InstrumentId) -> Option<Vec<IndexPriceUpdate>> {
3602 self.index_prices
3603 .get(instrument_id)
3604 .map(|index_prices| index_prices.iter().copied().collect())
3605 }
3606
3607 #[must_use]
3609 pub fn funding_rates(&self, instrument_id: &InstrumentId) -> Option<Vec<FundingRateUpdate>> {
3610 self.funding_rates
3611 .get(instrument_id)
3612 .map(|funding_rates| funding_rates.iter().copied().collect())
3613 }
3614
3615 #[must_use]
3617 pub fn instrument_statuses(
3618 &self,
3619 instrument_id: &InstrumentId,
3620 ) -> Option<Vec<InstrumentStatus>> {
3621 self.instrument_statuses
3622 .get(instrument_id)
3623 .map(|statuses| statuses.iter().copied().collect())
3624 }
3625
3626 #[must_use]
3628 pub fn bars(&self, bar_type: &BarType) -> Option<Vec<Bar>> {
3629 self.bars
3630 .get(bar_type)
3631 .map(|bars| bars.iter().copied().collect())
3632 }
3633
3634 #[must_use]
3636 pub fn order_book(&self, instrument_id: &InstrumentId) -> Option<&OrderBook> {
3637 self.books.get(instrument_id)
3638 }
3639
3640 #[must_use]
3642 pub fn order_book_mut(&mut self, instrument_id: &InstrumentId) -> Option<&mut OrderBook> {
3643 self.books.get_mut(instrument_id)
3644 }
3645
3646 #[must_use]
3648 pub fn own_order_book(&self, instrument_id: &InstrumentId) -> Option<&OwnOrderBook> {
3649 self.own_books.get(instrument_id)
3650 }
3651
3652 #[must_use]
3654 pub fn own_order_book_mut(
3655 &mut self,
3656 instrument_id: &InstrumentId,
3657 ) -> Option<&mut OwnOrderBook> {
3658 self.own_books.get_mut(instrument_id)
3659 }
3660
3661 #[must_use]
3663 pub fn quote(&self, instrument_id: &InstrumentId) -> Option<&QuoteTick> {
3664 self.quotes
3665 .get(instrument_id)
3666 .and_then(|quotes| quotes.front())
3667 }
3668
3669 #[must_use]
3673 pub fn quote_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&QuoteTick> {
3674 self.quotes
3675 .get(instrument_id)
3676 .and_then(|quotes| quotes.get(index))
3677 }
3678
3679 #[must_use]
3681 pub fn trade(&self, instrument_id: &InstrumentId) -> Option<&TradeTick> {
3682 self.trades
3683 .get(instrument_id)
3684 .and_then(|trades| trades.front())
3685 }
3686
3687 #[must_use]
3691 pub fn trade_at_index(&self, instrument_id: &InstrumentId, index: usize) -> Option<&TradeTick> {
3692 self.trades
3693 .get(instrument_id)
3694 .and_then(|trades| trades.get(index))
3695 }
3696
3697 #[must_use]
3699 pub fn mark_price(&self, instrument_id: &InstrumentId) -> Option<&MarkPriceUpdate> {
3700 self.mark_prices
3701 .get(instrument_id)
3702 .and_then(|mark_prices| mark_prices.front())
3703 }
3704
3705 #[must_use]
3707 pub fn index_price(&self, instrument_id: &InstrumentId) -> Option<&IndexPriceUpdate> {
3708 self.index_prices
3709 .get(instrument_id)
3710 .and_then(|index_prices| index_prices.front())
3711 }
3712
3713 #[must_use]
3715 pub fn funding_rate(&self, instrument_id: &InstrumentId) -> Option<&FundingRateUpdate> {
3716 self.funding_rates
3717 .get(instrument_id)
3718 .and_then(|funding_rates| funding_rates.front())
3719 }
3720
3721 #[must_use]
3723 pub fn instrument_status(&self, instrument_id: &InstrumentId) -> Option<&InstrumentStatus> {
3724 self.instrument_statuses
3725 .get(instrument_id)
3726 .and_then(|statuses| statuses.front())
3727 }
3728
3729 #[must_use]
3731 pub fn bar(&self, bar_type: &BarType) -> Option<&Bar> {
3732 self.bars.get(bar_type).and_then(|bars| bars.front())
3733 }
3734
3735 #[must_use]
3739 pub fn bar_at_index(&self, bar_type: &BarType, index: usize) -> Option<&Bar> {
3740 self.bars.get(bar_type).and_then(|bars| bars.get(index))
3741 }
3742
3743 #[must_use]
3745 pub fn book_update_count(&self, instrument_id: &InstrumentId) -> usize {
3746 self.books
3747 .get(instrument_id)
3748 .map_or(0, |book| book.update_count) as usize
3749 }
3750
3751 #[must_use]
3753 pub fn quote_count(&self, instrument_id: &InstrumentId) -> usize {
3754 self.quotes
3755 .get(instrument_id)
3756 .map_or(0, std::collections::VecDeque::len)
3757 }
3758
3759 #[must_use]
3761 pub fn trade_count(&self, instrument_id: &InstrumentId) -> usize {
3762 self.trades
3763 .get(instrument_id)
3764 .map_or(0, std::collections::VecDeque::len)
3765 }
3766
3767 #[must_use]
3769 pub fn bar_count(&self, bar_type: &BarType) -> usize {
3770 self.bars
3771 .get(bar_type)
3772 .map_or(0, std::collections::VecDeque::len)
3773 }
3774
3775 #[must_use]
3777 pub fn has_order_book(&self, instrument_id: &InstrumentId) -> bool {
3778 self.books.contains_key(instrument_id)
3779 }
3780
3781 #[must_use]
3783 pub fn has_quote_ticks(&self, instrument_id: &InstrumentId) -> bool {
3784 self.quote_count(instrument_id) > 0
3785 }
3786
3787 #[must_use]
3789 pub fn has_trade_ticks(&self, instrument_id: &InstrumentId) -> bool {
3790 self.trade_count(instrument_id) > 0
3791 }
3792
3793 #[must_use]
3795 pub fn has_bars(&self, bar_type: &BarType) -> bool {
3796 self.bar_count(bar_type) > 0
3797 }
3798
3799 #[must_use]
3800 pub fn get_xrate(
3801 &self,
3802 venue: Venue,
3803 from_currency: Currency,
3804 to_currency: Currency,
3805 price_type: PriceType,
3806 ) -> Option<f64> {
3807 if from_currency == to_currency {
3808 return Some(1.0);
3811 }
3812
3813 let (bid_quote, ask_quote) = self.build_quote_table(&venue);
3814
3815 match get_exchange_rate(
3816 from_currency.code,
3817 to_currency.code,
3818 price_type,
3819 bid_quote,
3820 ask_quote,
3821 ) {
3822 Ok(rate) => rate,
3823 Err(e) => {
3824 log::error!("Failed to calculate xrate: {e}");
3825 None
3826 }
3827 }
3828 }
3829
3830 fn build_quote_table(&self, venue: &Venue) -> (AHashMap<String, f64>, AHashMap<String, f64>) {
3831 let mut bid_quotes = AHashMap::new();
3832 let mut ask_quotes = AHashMap::new();
3833
3834 for instrument_id in self.instruments.keys() {
3835 if instrument_id.venue != *venue {
3836 continue;
3837 }
3838
3839 let (bid_price, ask_price) = if let Some(ticks) = self.quotes.get(instrument_id) {
3840 if let Some(tick) = ticks.front() {
3841 (tick.bid_price, tick.ask_price)
3842 } else {
3843 continue; }
3845 } else {
3846 let bid_bar = self
3847 .bars
3848 .iter()
3849 .find(|(k, _)| {
3850 k.instrument_id() == *instrument_id
3851 && matches!(k.spec().price_type, PriceType::Bid)
3852 })
3853 .map(|(_, v)| v);
3854
3855 let ask_bar = self
3856 .bars
3857 .iter()
3858 .find(|(k, _)| {
3859 k.instrument_id() == *instrument_id
3860 && matches!(k.spec().price_type, PriceType::Ask)
3861 })
3862 .map(|(_, v)| v);
3863
3864 match (bid_bar, ask_bar) {
3865 (Some(bid), Some(ask)) => {
3866 match (bid.front(), ask.front()) {
3867 (Some(bid_bar), Some(ask_bar)) => (bid_bar.close, ask_bar.close),
3868 _ => {
3869 continue;
3871 }
3872 }
3873 }
3874 _ => continue,
3875 }
3876 };
3877
3878 bid_quotes.insert(instrument_id.symbol.to_string(), bid_price.as_f64());
3879 ask_quotes.insert(instrument_id.symbol.to_string(), ask_price.as_f64());
3880 }
3881
3882 (bid_quotes, ask_quotes)
3883 }
3884
3885 #[must_use]
3887 pub fn get_mark_xrate(&self, from_currency: Currency, to_currency: Currency) -> Option<f64> {
3888 self.mark_xrates.get(&(from_currency, to_currency)).copied()
3889 }
3890
3891 pub fn set_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency, xrate: f64) {
3897 assert!(xrate > 0.0, "xrate was zero");
3898 self.mark_xrates.insert((from_currency, to_currency), xrate);
3899 self.mark_xrates
3900 .insert((to_currency, from_currency), 1.0 / xrate);
3901 }
3902
3903 pub fn clear_mark_xrate(&mut self, from_currency: Currency, to_currency: Currency) {
3905 let _ = self.mark_xrates.remove(&(from_currency, to_currency));
3906 }
3907
3908 pub fn clear_mark_xrates(&mut self) {
3910 self.mark_xrates.clear();
3911 }
3912
3913 #[must_use]
3917 pub fn instrument(&self, instrument_id: &InstrumentId) -> Option<&InstrumentAny> {
3918 self.instruments.get(instrument_id)
3919 }
3920
3921 #[must_use]
3923 pub fn instrument_ids(&self, venue: Option<&Venue>) -> Vec<&InstrumentId> {
3924 match venue {
3925 Some(v) => self.instruments.keys().filter(|i| &i.venue == v).collect(),
3926 None => self.instruments.keys().collect(),
3927 }
3928 }
3929
3930 #[must_use]
3932 pub fn instruments(&self, venue: &Venue, underlying: Option<&Ustr>) -> Vec<&InstrumentAny> {
3933 self.instruments
3934 .values()
3935 .filter(|i| &i.id().venue == venue)
3936 .filter(|i| underlying.is_none_or(|u| i.underlying() == Some(*u)))
3937 .collect()
3938 }
3939
3940 #[must_use]
3942 pub fn bar_types(
3943 &self,
3944 instrument_id: Option<&InstrumentId>,
3945 price_type: Option<&PriceType>,
3946 aggregation_source: AggregationSource,
3947 ) -> Vec<&BarType> {
3948 let mut bar_types = self
3949 .bars
3950 .keys()
3951 .filter(|bar_type| bar_type.aggregation_source() == aggregation_source)
3952 .collect::<Vec<&BarType>>();
3953
3954 if let Some(instrument_id) = instrument_id {
3955 bar_types.retain(|bar_type| bar_type.instrument_id() == *instrument_id);
3956 }
3957
3958 if let Some(price_type) = price_type {
3959 bar_types.retain(|bar_type| &bar_type.spec().price_type == price_type);
3960 }
3961
3962 bar_types
3963 }
3964
3965 #[must_use]
3969 pub fn synthetic(&self, instrument_id: &InstrumentId) -> Option<&SyntheticInstrument> {
3970 self.synthetics.get(instrument_id)
3971 }
3972
3973 #[must_use]
3975 pub fn synthetic_ids(&self) -> Vec<&InstrumentId> {
3976 self.synthetics.keys().collect()
3977 }
3978
3979 #[must_use]
3981 pub fn synthetics(&self) -> Vec<&SyntheticInstrument> {
3982 self.synthetics.values().collect()
3983 }
3984
3985 #[must_use]
3989 pub fn account(&self, account_id: &AccountId) -> Option<&AccountAny> {
3990 self.accounts.get(account_id)
3991 }
3992
3993 #[must_use]
3995 pub fn account_for_venue(&self, venue: &Venue) -> Option<&AccountAny> {
3996 self.index
3997 .venue_account
3998 .get(venue)
3999 .and_then(|account_id| self.accounts.get(account_id))
4000 }
4001
4002 #[must_use]
4004 pub fn account_id(&self, venue: &Venue) -> Option<&AccountId> {
4005 self.index.venue_account.get(venue)
4006 }
4007
4008 #[must_use]
4010 pub fn accounts(&self, account_id: &AccountId) -> Vec<&AccountAny> {
4011 self.accounts
4012 .values()
4013 .filter(|account| &account.id() == account_id)
4014 .collect()
4015 }
4016
4017 pub fn update_own_order_book(&mut self, order: &OrderAny) {
4025 if !order.has_price() {
4026 return;
4027 }
4028
4029 let instrument_id = order.instrument_id();
4030
4031 let own_book = self
4032 .own_books
4033 .entry(instrument_id)
4034 .or_insert_with(|| OwnOrderBook::new(instrument_id));
4035
4036 let own_book_order = order.to_own_book_order();
4037
4038 if order.is_closed() {
4039 if let Err(e) = own_book.delete(own_book_order) {
4040 log::debug!(
4041 "Failed to delete order {} from own book: {e}",
4042 order.client_order_id(),
4043 );
4044 } else {
4045 log::debug!("Deleted order {} from own book", order.client_order_id());
4046 }
4047 } else {
4048 if let Err(e) = own_book.update(own_book_order) {
4050 log::debug!(
4051 "Failed to update order {} in own book: {e}; inserting instead",
4052 order.client_order_id(),
4053 );
4054 own_book.add(own_book_order);
4055 }
4056 log::debug!("Updated order {} in own book", order.client_order_id());
4057 }
4058 }
4059
4060 pub fn force_remove_from_own_order_book(&mut self, client_order_id: &ClientOrderId) {
4066 let order = match self.orders.get(client_order_id) {
4067 Some(order) => order,
4068 None => return,
4069 };
4070
4071 self.index.orders_open.remove(client_order_id);
4072 self.index.orders_pending_cancel.remove(client_order_id);
4073 self.index.orders_inflight.remove(client_order_id);
4074 self.index.orders_emulated.remove(client_order_id);
4075 self.index.orders_active_local.remove(client_order_id);
4076
4077 if let Some(own_book) = self.own_books.get_mut(&order.instrument_id())
4078 && order.has_price()
4079 {
4080 let own_book_order = order.to_own_book_order();
4081 if let Err(e) = own_book.delete(own_book_order) {
4082 log::debug!("Could not force delete {client_order_id} from own book: {e}");
4083 } else {
4084 log::debug!("Force deleted {client_order_id} from own book");
4085 }
4086 }
4087
4088 self.index.orders_closed.insert(*client_order_id);
4089 }
4090
4091 pub fn audit_own_order_books(&mut self) {
4098 log::debug!("Starting own books audit");
4099 let start = std::time::Instant::now();
4100
4101 let valid_order_ids: AHashSet<ClientOrderId> = self
4104 .index
4105 .orders_open
4106 .union(&self.index.orders_inflight)
4107 .copied()
4108 .collect();
4109
4110 for own_book in self.own_books.values_mut() {
4111 own_book.audit_open_orders(&valid_order_ids);
4112 }
4113
4114 log::debug!("Completed own books audit in {:?}", start.elapsed());
4115 }
4116}