Skip to main content

nautilus_portfolio/
portfolio.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `Portfolio` for all environments.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use indexmap::{IndexMap, IndexSet};
22use nautilus_analysis::analyzer::PortfolioAnalyzer;
23use nautilus_common::{
24    cache::Cache,
25    clock::Clock,
26    enums::LogColor,
27    msgbus::{self, MessagingSwitchboard, TypedHandler},
28};
29use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
30use nautilus_model::{
31    accounts::{Account, AccountAny},
32    data::{Bar, MarkPriceUpdate, QuoteTick},
33    enums::{OmsType, OrderType, PositionSide, PriceType},
34    events::{AccountState, OrderEventAny, position::PositionEvent},
35    identifiers::{AccountId, InstrumentId, PositionId, Venue},
36    instruments::{Instrument, InstrumentAny},
37    orders::{Order, OrderAny},
38    position::Position,
39    types::{Currency, Money, Price},
40};
41use rust_decimal::Decimal;
42
43use crate::{config::PortfolioConfig, manager::AccountsManager};
44
45struct PortfolioState {
46    accounts: AccountsManager,
47    analyzer: PortfolioAnalyzer,
48    unrealized_pnls: IndexMap<InstrumentId, Money>,
49    realized_pnls: IndexMap<InstrumentId, Money>,
50    snapshot_sum_per_position: AHashMap<PositionId, Money>,
51    snapshot_last_per_position: AHashMap<PositionId, Money>,
52    snapshot_processed_counts: AHashMap<PositionId, usize>,
53    snapshot_account_ids: AHashMap<PositionId, AccountId>,
54    net_positions: IndexMap<InstrumentId, Decimal>,
55    pending_calcs: AHashSet<InstrumentId>,
56    bar_close_prices: AHashMap<InstrumentId, Price>,
57    initialized: bool,
58    last_account_state_log_ts: AHashMap<AccountId, u64>,
59    min_account_state_logging_interval_ns: u64,
60    venues_missing_price: AHashMap<Venue, AHashSet<InstrumentId>>,
61}
62
63impl PortfolioState {
64    fn new(
65        clock: Rc<RefCell<dyn Clock>>,
66        cache: Rc<RefCell<Cache>>,
67        config: &PortfolioConfig,
68    ) -> Self {
69        let min_account_state_logging_interval_ns = config
70            .min_account_state_logging_interval_ms
71            .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73        Self {
74            accounts: AccountsManager::new(clock, cache),
75            analyzer: PortfolioAnalyzer::default(),
76            unrealized_pnls: IndexMap::new(),
77            realized_pnls: IndexMap::new(),
78            snapshot_sum_per_position: AHashMap::new(),
79            snapshot_last_per_position: AHashMap::new(),
80            snapshot_processed_counts: AHashMap::new(),
81            snapshot_account_ids: AHashMap::new(),
82            net_positions: IndexMap::new(),
83            pending_calcs: AHashSet::new(),
84            bar_close_prices: AHashMap::new(),
85            initialized: false,
86            last_account_state_log_ts: AHashMap::new(),
87            min_account_state_logging_interval_ns,
88            venues_missing_price: AHashMap::new(),
89        }
90    }
91
92    fn reset(&mut self) {
93        log::debug!("RESETTING");
94        self.net_positions.clear();
95        self.unrealized_pnls.clear();
96        self.realized_pnls.clear();
97        self.snapshot_sum_per_position.clear();
98        self.snapshot_last_per_position.clear();
99        self.snapshot_processed_counts.clear();
100        self.snapshot_account_ids.clear();
101        self.pending_calcs.clear();
102        self.bar_close_prices.clear();
103        self.last_account_state_log_ts.clear();
104        self.venues_missing_price.clear();
105        self.analyzer.reset();
106        self.initialized = false;
107        log::debug!("READY");
108    }
109}
110
111pub struct Portfolio {
112    pub(crate) clock: Rc<RefCell<dyn Clock>>,
113    pub(crate) cache: Rc<RefCell<Cache>>,
114    inner: Rc<RefCell<PortfolioState>>,
115    config: PortfolioConfig,
116}
117
118impl Debug for Portfolio {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        f.debug_struct(stringify!(Portfolio)).finish()
121    }
122}
123
124impl Portfolio {
125    pub fn new(
126        cache: Rc<RefCell<Cache>>,
127        clock: Rc<RefCell<dyn Clock>>,
128        config: Option<PortfolioConfig>,
129    ) -> Self {
130        let config = config.unwrap_or_default();
131        let inner = Rc::new(RefCell::new(PortfolioState::new(
132            clock.clone(),
133            cache.clone(),
134            &config,
135        )));
136
137        Self::register_message_handlers(&cache, &clock, &inner, config);
138
139        Self {
140            clock,
141            cache,
142            inner,
143            config,
144        }
145    }
146
147    /// Creates a shallow clone of the Portfolio that shares the same internal state.
148    ///
149    /// This is useful when multiple components need to reference the same Portfolio
150    /// without creating duplicate msgbus handler registrations.
151    #[must_use]
152    pub fn clone_shallow(&self) -> Self {
153        Self {
154            clock: self.clock.clone(),
155            cache: self.cache.clone(),
156            inner: self.inner.clone(),
157            config: self.config,
158        }
159    }
160
161    fn register_message_handlers(
162        cache: &Rc<RefCell<Cache>>,
163        clock: &Rc<RefCell<dyn Clock>>,
164        inner: &Rc<RefCell<PortfolioState>>,
165        config: PortfolioConfig,
166    ) {
167        let inner_weak = WeakCell::from(Rc::downgrade(inner));
168
169        // Typed handlers for subscriptions
170        let update_account_handler = {
171            let cache = cache.clone();
172            let inner = inner_weak.clone();
173            TypedHandler::from(move |event: &AccountState| {
174                if let Some(inner_rc) = inner.upgrade() {
175                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
176                    update_account(&cache, &inner_rc, event);
177                }
178            })
179        };
180
181        let update_position_handler = {
182            let cache = cache.clone();
183            let clock = clock.clone();
184            let inner = inner_weak.clone();
185            TypedHandler::from(move |event: &PositionEvent| {
186                if let Some(inner_rc) = inner.upgrade() {
187                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
188                    update_position(&cache, &clock, &inner_rc, config, event);
189                }
190            })
191        };
192
193        let update_quote_handler = {
194            let cache = cache.clone();
195            let clock = clock.clone();
196            let inner = inner_weak.clone();
197            TypedHandler::from(move |quote: &QuoteTick| {
198                if let Some(inner_rc) = inner.upgrade() {
199                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
200                    update_quote_tick(&cache, &clock, &inner_rc, config, quote);
201                }
202            })
203        };
204
205        let update_bar_handler = {
206            let cache = cache.clone();
207            let clock = clock.clone();
208            let inner = inner_weak.clone();
209            TypedHandler::from(move |bar: &Bar| {
210                if let Some(inner_rc) = inner.upgrade() {
211                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
212                    update_bar(&cache, &clock, &inner_rc, config, bar);
213                }
214            })
215        };
216
217        let update_mark_price_handler = {
218            let cache = cache.clone();
219            let clock = clock.clone();
220            let inner = inner_weak.clone();
221            TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
222                if let Some(inner_rc) = inner.upgrade() {
223                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
224                    update_instrument_id(
225                        &cache,
226                        &clock,
227                        &inner_rc,
228                        config,
229                        &mark_price.instrument_id,
230                    );
231                }
232            })
233        };
234
235        let update_order_handler = {
236            let cache = cache.clone();
237            let clock = clock.clone();
238            let inner = inner_weak;
239            TypedHandler::from(move |event: &OrderEventAny| {
240                if let Some(inner_rc) = inner.upgrade() {
241                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
242                    update_order(&cache, &clock, &inner_rc, config, event);
243                }
244            })
245        };
246
247        let endpoint = MessagingSwitchboard::portfolio_update_account();
248        msgbus::register_account_state_endpoint(endpoint, update_account_handler.clone());
249
250        msgbus::subscribe_quotes("data.quotes.*".into(), update_quote_handler, Some(10));
251
252        if config.bar_updates {
253            msgbus::subscribe_bars("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
254        }
255
256        if config.use_mark_prices {
257            msgbus::subscribe_mark_prices(
258                "data.mark_prices.*".into(),
259                update_mark_price_handler,
260                Some(10),
261            );
262        }
263        msgbus::subscribe_order_events("events.order.*".into(), update_order_handler, Some(10));
264        msgbus::subscribe_position_events(
265            "events.position.*".into(),
266            update_position_handler,
267            Some(10),
268        );
269        msgbus::subscribe_account_state(
270            "events.account.*".into(),
271            update_account_handler,
272            Some(10),
273        );
274    }
275
276    pub fn reset(&mut self) {
277        log::debug!("RESETTING");
278        self.inner.borrow_mut().reset();
279        log::debug!("READY");
280    }
281
282    /// Returns a reference to the cache.
283    #[must_use]
284    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
285        &self.cache
286    }
287
288    /// Returns `true` if the portfolio has been initialized.
289    #[must_use]
290    pub fn is_initialized(&self) -> bool {
291        self.inner.borrow().initialized
292    }
293
294    /// Returns the locked balances for the given venue.
295    ///
296    /// Locked balances represent funds reserved for open orders.
297    #[must_use]
298    pub fn balances_locked(&self, venue: &Venue) -> IndexMap<Currency, Money> {
299        self.cache.borrow().account_for_venue(venue).map_or_else(
300            || {
301                log::error!("Cannot get balances locked: no account generated for {venue}");
302                IndexMap::new()
303            },
304            AccountAny::balances_locked,
305        )
306    }
307
308    /// Returns the initial margin requirements for the given venue.
309    ///
310    /// Only applicable for margin accounts. Returns empty map for cash accounts.
311    #[must_use]
312    pub fn margins_init(&self, venue: &Venue) -> IndexMap<InstrumentId, Money> {
313        self.cache.borrow().account_for_venue(venue).map_or_else(
314            || {
315                log::error!(
316                    "Cannot get initial (order) margins: no account registered for {venue}"
317                );
318                IndexMap::new()
319            },
320            |account| match account {
321                AccountAny::Margin(margin_account) => margin_account.initial_margins(),
322                AccountAny::Cash(_) | AccountAny::Betting(_) => {
323                    log::warn!("Initial margins not applicable for cash account");
324                    IndexMap::new()
325                }
326            },
327        )
328    }
329
330    /// Returns the maintenance margin requirements for the given venue.
331    ///
332    /// Only applicable for margin accounts. Returns empty map for cash accounts.
333    #[must_use]
334    pub fn margins_maint(&self, venue: &Venue) -> IndexMap<InstrumentId, Money> {
335        self.cache.borrow().account_for_venue(venue).map_or_else(
336            || {
337                log::error!(
338                    "Cannot get maintenance (position) margins: no account registered for {venue}"
339                );
340                IndexMap::new()
341            },
342            |account| match account {
343                AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
344                AccountAny::Cash(_) | AccountAny::Betting(_) => {
345                    log::warn!("Maintenance margins not applicable for cash account");
346                    IndexMap::new()
347                }
348            },
349        )
350    }
351
352    /// Returns the unrealized PnLs for all positions at the given venue.
353    ///
354    /// Calculates mark-to-market PnL based on current market prices.
355    #[must_use]
356    pub fn unrealized_pnls(
357        &mut self,
358        venue: &Venue,
359        account_id: Option<&AccountId>,
360    ) -> IndexMap<Currency, Money> {
361        let instrument_ids = {
362            let cache = self.cache.borrow();
363            let positions = cache.positions(Some(venue), None, None, account_id, None);
364
365            if positions.is_empty() {
366                return IndexMap::new(); // Nothing to calculate
367            }
368
369            // IndexSet preserves the deterministic order of cache.positions
370            // through the dedup so the returned currency map iterates in a
371            // stable order across runs.
372            let instrument_ids: IndexSet<InstrumentId> =
373                positions.iter().map(|p| p.instrument_id).collect();
374
375            instrument_ids
376        };
377
378        let mut unrealized_pnls: IndexMap<Currency, f64> = IndexMap::new();
379
380        for instrument_id in instrument_ids {
381            // The instrument-keyed cache aggregates across all accounts on the
382            // same venue, so bypass it when the caller filters by account_id.
383            if account_id.is_none()
384                && let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id)
385            {
386                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
387                continue;
388            }
389
390            if let Some(pnl) = self.calculate_unrealized_pnl(&instrument_id, account_id) {
391                *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
392            }
393        }
394
395        unrealized_pnls
396            .into_iter()
397            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
398            .collect()
399    }
400
401    /// Returns the realized PnLs for all positions at the given venue.
402    ///
403    /// Calculates total realized profit and loss from closed positions.
404    #[must_use]
405    pub fn realized_pnls(
406        &mut self,
407        venue: &Venue,
408        account_id: Option<&AccountId>,
409    ) -> IndexMap<Currency, Money> {
410        let instrument_ids = {
411            let cache = self.cache.borrow();
412            let positions = cache.positions(Some(venue), None, None, account_id, None);
413
414            if positions.is_empty() {
415                return IndexMap::new(); // Nothing to calculate
416            }
417
418            let instrument_ids: IndexSet<InstrumentId> =
419                positions.iter().map(|p| p.instrument_id).collect();
420
421            instrument_ids
422        };
423
424        let mut realized_pnls: IndexMap<Currency, f64> = IndexMap::new();
425
426        for instrument_id in instrument_ids {
427            // The instrument-keyed cache aggregates across all accounts on the
428            // same venue, so bypass it when the caller filters by account_id.
429            if account_id.is_none()
430                && let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id)
431            {
432                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
433                continue;
434            }
435
436            if let Some(pnl) = self.calculate_realized_pnl(&instrument_id, account_id) {
437                *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
438            }
439        }
440
441        realized_pnls
442            .into_iter()
443            .map(|(currency, amount)| (currency, Money::new(amount, currency)))
444            .collect()
445    }
446
447    #[must_use]
448    pub fn net_exposures(
449        &self,
450        venue: &Venue,
451        account_id: Option<&AccountId>,
452    ) -> Option<IndexMap<Currency, Money>> {
453        let cache = self.cache.borrow();
454        let account = if let Some(id) = account_id {
455            if let Some(account) = cache.account(id) {
456                account
457            } else {
458                log::error!("Cannot calculate net exposures: no account for {id}");
459                return None;
460            }
461        } else if let Some(account) = cache.account_for_venue(venue) {
462            account
463        } else {
464            log::error!("Cannot calculate net exposures: no account registered for {venue}");
465            return None;
466        };
467
468        let positions_open = cache.positions_open(Some(venue), None, None, account_id, None);
469        if positions_open.is_empty() {
470            return Some(IndexMap::new()); // Nothing to calculate
471        }
472
473        let mut net_exposures: IndexMap<Currency, f64> = IndexMap::new();
474
475        for position in positions_open {
476            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
477                instrument
478            } else {
479                log::error!(
480                    "Cannot calculate net exposures: no instrument for {}",
481                    position.instrument_id
482                );
483                return None; // Cannot calculate
484            };
485
486            if position.side == PositionSide::Flat {
487                log::error!(
488                    "Cannot calculate net exposures: position is flat for {}",
489                    position.instrument_id
490                );
491                continue; // Nothing to calculate
492            }
493
494            let price = self.get_price(position)?;
495            let xrate = if let Some(xrate) = self.calculate_xrate_to_base(instrument, account) {
496                xrate
497            } else {
498                log::error!(
499                    // TODO: Improve logging
500                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
501                    instrument.settlement_currency(),
502                    account.base_currency()
503                );
504                return None; // Cannot calculate
505            };
506
507            let settlement_currency = account
508                .base_currency()
509                .unwrap_or_else(|| instrument.settlement_currency());
510
511            let net_exposure = instrument
512                .calculate_notional_value(position.quantity, price, None)
513                .as_f64()
514                * xrate;
515
516            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
517                .round()
518                / 10f64.powi(settlement_currency.precision.into());
519
520            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
521        }
522
523        Some(
524            net_exposures
525                .into_iter()
526                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
527                .collect(),
528        )
529    }
530
531    #[must_use]
532    pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
533        if let Some(pnl) = self
534            .inner
535            .borrow()
536            .unrealized_pnls
537            .get(instrument_id)
538            .copied()
539        {
540            return Some(pnl);
541        }
542
543        let pnl = self.calculate_unrealized_pnl(instrument_id, None)?;
544        self.inner
545            .borrow_mut()
546            .unrealized_pnls
547            .insert(*instrument_id, pnl);
548        Some(pnl)
549    }
550
551    #[must_use]
552    pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
553        if let Some(pnl) = self
554            .inner
555            .borrow()
556            .realized_pnls
557            .get(instrument_id)
558            .copied()
559        {
560            return Some(pnl);
561        }
562
563        let pnl = self.calculate_realized_pnl(instrument_id, None)?;
564        self.inner
565            .borrow_mut()
566            .realized_pnls
567            .insert(*instrument_id, pnl);
568        Some(pnl)
569    }
570
571    /// Returns the total PnL for the given instrument ID.
572    ///
573    /// Total PnL = Realized PnL + Unrealized PnL
574    #[must_use]
575    pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
576        let realized = self.realized_pnl(instrument_id)?;
577        let unrealized = self.unrealized_pnl(instrument_id)?;
578
579        if realized.currency != unrealized.currency {
580            log::error!(
581                "Cannot calculate total PnL: currency mismatch {} vs {}",
582                realized.currency,
583                unrealized.currency
584            );
585            return None;
586        }
587
588        Some(Money::new(
589            realized.as_f64() + unrealized.as_f64(),
590            realized.currency,
591        ))
592    }
593
594    /// Returns the total PnLs for the given venue.
595    ///
596    /// Total PnL = Realized PnL + Unrealized PnL for each currency. Pass `account_id`
597    /// to scope the aggregation to a single account when multiple accounts share the venue.
598    #[must_use]
599    pub fn total_pnls(
600        &mut self,
601        venue: &Venue,
602        account_id: Option<&AccountId>,
603    ) -> IndexMap<Currency, Money> {
604        let realized_pnls = self.realized_pnls(venue, account_id);
605        let unrealized_pnls = self.unrealized_pnls(venue, account_id);
606
607        let mut total_pnls: IndexMap<Currency, Money> = IndexMap::new();
608
609        // Add realized PnLs
610        for (currency, realized) in realized_pnls {
611            total_pnls.insert(currency, realized);
612        }
613
614        // Add unrealized PnLs
615        for (currency, unrealized) in unrealized_pnls {
616            match total_pnls.get_mut(&currency) {
617                Some(total) => {
618                    *total = *total + unrealized;
619                }
620                None => {
621                    total_pnls.insert(currency, unrealized);
622                }
623            }
624        }
625
626        total_pnls
627    }
628
629    /// Returns the per-currency mark-to-market value of open positions at the given venue.
630    ///
631    /// For each open position the valuation uses the portfolio's internal price
632    /// resolution, which prefers mark prices (when configured), falls back to
633    /// side-appropriate bid/ask, then last trade, then the most recent bar close.
634    /// Instruments without any available price are skipped and the venue is flagged
635    /// for a no-price warning. Pass `account_id` to scope the aggregation to a
636    /// single account when multiple accounts share the venue.
637    #[must_use]
638    pub fn mark_values(
639        &mut self,
640        venue: &Venue,
641        account_id: Option<&AccountId>,
642    ) -> IndexMap<Currency, Money> {
643        let mut values: IndexMap<Currency, f64> = IndexMap::new();
644        let mut unpriced: AHashSet<InstrumentId> = AHashSet::new();
645
646        if self.accumulate_mark_values(venue, account_id, &mut values, &mut unpriced) {
647            self.update_missing_price_state(venue, &unpriced);
648        } else if account_id.is_none() {
649            // Only clear the tracker on an unfiltered sweep; otherwise we could
650            // wipe another account's flags on the same venue.
651            self.inner.borrow_mut().venues_missing_price.remove(venue);
652        }
653
654        values
655            .into_iter()
656            .map(|(c, v)| (c, Money::new(v, c)))
657            .collect()
658    }
659
660    /// Returns the per-currency total equity for the given venue.
661    ///
662    /// For cash accounts: `balance.total + Σ mark_value(open positions)` per currency.
663    /// For margin accounts: `balance.total + Σ unrealized_pnl(open positions)` per currency.
664    ///
665    /// Open-position instruments that cannot be priced are tracked via
666    /// [`Portfolio::missing_price_instruments`] (and warned once) for both branches,
667    /// so equity understatement does not go unnoticed. Pass `account_id` to scope
668    /// the aggregation to a single account when multiple accounts share the venue.
669    #[must_use]
670    pub fn equity(
671        &mut self,
672        venue: &Venue,
673        account_id: Option<&AccountId>,
674    ) -> IndexMap<Currency, Money> {
675        let (mut equity, is_margin) = {
676            let cache = self.cache.borrow();
677            let account = match account_id {
678                Some(id) => cache.account(id),
679                None => cache.account_for_venue(venue),
680            };
681
682            match account {
683                Some(account) => {
684                    let equity: IndexMap<Currency, f64> = account
685                        .balances_total()
686                        .into_iter()
687                        .map(|(c, m)| (c, m.as_f64()))
688                        .collect();
689                    (equity, matches!(account, AccountAny::Margin(_)))
690                }
691                None => return IndexMap::new(),
692            }
693        };
694
695        let mut unpriced: AHashSet<InstrumentId> = AHashSet::new();
696
697        if is_margin {
698            // Sum cached unrealized PnLs; fall through to recalculation on cache miss.
699            let instrument_ids: IndexSet<InstrumentId> = {
700                let cache = self.cache.borrow();
701                cache
702                    .positions_open(Some(venue), None, None, account_id, None)
703                    .iter()
704                    .map(|p| p.instrument_id)
705                    .collect()
706            };
707
708            if instrument_ids.is_empty() {
709                if account_id.is_none() {
710                    self.inner.borrow_mut().venues_missing_price.remove(venue);
711                }
712            } else {
713                for instrument_id in instrument_ids {
714                    // The instrument-keyed cache aggregates across all accounts on
715                    // the same venue, so bypass it when the caller filters by
716                    // account_id.
717                    let cached = if account_id.is_none() {
718                        self.inner
719                            .borrow()
720                            .unrealized_pnls
721                            .get(&instrument_id)
722                            .copied()
723                    } else {
724                        None
725                    };
726                    let pnl = match cached {
727                        Some(pnl) => Some(pnl),
728                        None => self.calculate_unrealized_pnl(&instrument_id, account_id),
729                    };
730
731                    match pnl {
732                        Some(pnl) => {
733                            *equity.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
734                        }
735                        None => {
736                            unpriced.insert(instrument_id);
737                        }
738                    }
739                }
740                self.update_missing_price_state(venue, &unpriced);
741            }
742        } else if self.accumulate_mark_values(venue, account_id, &mut equity, &mut unpriced) {
743            self.update_missing_price_state(venue, &unpriced);
744        } else if account_id.is_none() {
745            self.inner.borrow_mut().venues_missing_price.remove(venue);
746        }
747
748        equity
749            .into_iter()
750            .map(|(c, v)| (c, Money::new(v, c)))
751            .collect()
752    }
753
754    /// Returns the instruments currently flagged as unpriced for the given venue.
755    ///
756    /// An entry is added the first time [`Portfolio::mark_values`] cannot source a
757    /// price for an open position (after also emitting a warn log), and removed
758    /// once the instrument is priced again so a subsequent drop re-warns.
759    #[must_use]
760    pub fn missing_price_instruments(&self, venue: &Venue) -> Vec<InstrumentId> {
761        let mut ids: Vec<InstrumentId> = self
762            .inner
763            .borrow()
764            .venues_missing_price
765            .get(venue)
766            .map(|set| set.iter().copied().collect())
767            .unwrap_or_default();
768        // Sort so the public Vec is deterministic even though the underlying
769        // tracking set is AHash-backed.
770        ids.sort();
771        ids
772    }
773
774    fn update_missing_price_state(&self, venue: &Venue, unpriced: &AHashSet<InstrumentId>) {
775        let mut inner = self.inner.borrow_mut();
776        let tracked = inner.venues_missing_price.entry(*venue).or_default();
777
778        // Sort first so the warn-log sequence is deterministic across runs.
779        let mut ids: Vec<InstrumentId> = unpriced.iter().copied().collect();
780        ids.sort();
781        for instrument_id in ids {
782            if tracked.insert(instrument_id) {
783                log::warn!(
784                    "No price available for open position {instrument_id}; \
785                    subscribe to quotes, trades or bars for continuous mark-to-market equity"
786                );
787            }
788        }
789
790        // Instruments that are now priced should be removed so a future price drop re-warns
791        tracked.retain(|id| unpriced.contains(id));
792    }
793
794    // Returns `true` if at least one open position was seen (priced or not),
795    // `false` if the venue is flat. Unpriced instruments are written to
796    // `unpriced` for the caller to flow into `update_missing_price_state`.
797    fn accumulate_mark_values(
798        &self,
799        venue: &Venue,
800        account_id: Option<&AccountId>,
801        values: &mut IndexMap<Currency, f64>,
802        unpriced: &mut AHashSet<InstrumentId>,
803    ) -> bool {
804        let cache = self.cache.borrow();
805        let positions = cache.positions_open(Some(venue), None, None, account_id, None);
806
807        if positions.is_empty() {
808            return false;
809        }
810
811        let account = match account_id {
812            Some(id) => cache.account(id),
813            None => cache.account_for_venue(venue),
814        };
815        let mut xrate_cache: AHashMap<Currency, Option<f64>> = AHashMap::new();
816
817        for position in positions {
818            let sign = match position.side {
819                PositionSide::Long => 1.0,
820                PositionSide::Short => -1.0,
821                PositionSide::Flat | PositionSide::NoPositionSide => continue,
822            };
823
824            let instrument = match cache.instrument(&position.instrument_id) {
825                Some(i) => i,
826                None => {
827                    unpriced.insert(position.instrument_id);
828                    continue;
829                }
830            };
831
832            let price = match self.get_price(position) {
833                Some(p) => p,
834                None => {
835                    unpriced.insert(position.instrument_id);
836                    continue;
837                }
838            };
839
840            let settlement = instrument.settlement_currency();
841            let (xrate, currency) = if self.config.convert_to_account_base_currency
842                && let Some(account) = account
843                && let Some(base_currency) = account.base_currency()
844            {
845                let xrate_opt = *xrate_cache
846                    .entry(settlement)
847                    .or_insert_with(|| self.calculate_xrate_to_base(instrument, account));
848                let xrate = match xrate_opt {
849                    Some(x) => x,
850                    None => {
851                        unpriced.insert(position.instrument_id);
852                        continue;
853                    }
854                };
855                (xrate, base_currency)
856            } else {
857                (1.0, settlement)
858            };
859
860            let notional = position.notional_value(price).as_f64() * xrate;
861            *values.entry(currency).or_insert(0.0) += sign * notional;
862        }
863
864        true
865    }
866
867    #[must_use]
868    pub fn net_exposure(
869        &self,
870        instrument_id: &InstrumentId,
871        account_id: Option<&AccountId>,
872    ) -> Option<Money> {
873        let cache = self.cache.borrow();
874
875        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
876            instrument
877        } else {
878            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
879            return None;
880        };
881
882        let positions_open =
883            cache.positions_open(None, Some(instrument_id), None, account_id, None);
884
885        if positions_open.is_empty() {
886            return Some(Money::new(0.0, instrument.settlement_currency()));
887        }
888
889        let mut net_exposure = 0.0;
890        let mut first_base_currency: Option<Currency> = None;
891        let mut first_account: Option<&AccountAny> = None;
892
893        for position in &positions_open {
894            // Get account for THIS position
895            let account = if let Some(account) = cache.account(&position.account_id) {
896                account
897            } else {
898                log::error!(
899                    "Cannot calculate net exposure: no account for {}",
900                    position.account_id
901                );
902                return None;
903            };
904
905            // Validate consistent base currency across accounts
906            if let Some(base) = account.base_currency() {
907                match first_base_currency {
908                    None => {
909                        first_base_currency = Some(base);
910                        first_account = Some(account);
911                    }
912                    Some(first) if first != base => {
913                        log::error!(
914                            "Cannot calculate net exposure: accounts have different base \
915                            currencies ({first} vs {base}); multi-account aggregation requires \
916                            consistent base currencies"
917                        );
918                        return None;
919                    }
920                    _ => {}
921                }
922            }
923
924            let price = self.get_price(position)?;
925            let xrate = if let Some(xrate) = self.calculate_xrate_to_base(instrument, account) {
926                xrate
927            } else {
928                log::error!(
929                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
930                    instrument.settlement_currency(),
931                    account.base_currency()
932                );
933                return None;
934            };
935
936            let notional_value =
937                instrument.calculate_notional_value(position.quantity, price, None);
938            net_exposure += notional_value.as_f64() * xrate;
939        }
940
941        let settlement_currency = first_account
942            .and_then(|a| a.base_currency())
943            .unwrap_or_else(|| instrument.settlement_currency());
944
945        Some(Money::new(net_exposure, settlement_currency))
946    }
947
948    #[must_use]
949    pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
950        self.inner
951            .borrow()
952            .net_positions
953            .get(instrument_id)
954            .copied()
955            .unwrap_or(Decimal::ZERO)
956    }
957
958    #[must_use]
959    pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
960        self.inner
961            .borrow()
962            .net_positions
963            .get(instrument_id)
964            .copied()
965            .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
966    }
967
968    #[must_use]
969    pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
970        self.inner
971            .borrow()
972            .net_positions
973            .get(instrument_id)
974            .copied()
975            .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
976    }
977
978    #[must_use]
979    pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
980        self.inner
981            .borrow()
982            .net_positions
983            .get(instrument_id)
984            .copied()
985            .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
986    }
987
988    #[must_use]
989    pub fn is_completely_flat(&self) -> bool {
990        for net_position in self.inner.borrow().net_positions.values() {
991            if *net_position != Decimal::ZERO {
992                return false;
993            }
994        }
995        true
996    }
997
998    /// Initializes account margin based on existing open orders.
999    ///
1000    /// # Panics
1001    ///
1002    /// Panics if updating the cache with a mutated account fails.
1003    pub fn initialize_orders(&mut self) {
1004        let mut initialized = true;
1005        let orders_and_instruments = {
1006            let cache = self.cache.borrow();
1007            let all_orders_open = cache.orders_open(None, None, None, None, None);
1008
1009            let mut instruments_with_orders = Vec::new();
1010            let mut instruments = AHashSet::new();
1011
1012            for order in &all_orders_open {
1013                instruments.insert(order.instrument_id());
1014            }
1015
1016            for instrument_id in instruments {
1017                if let Some(instrument) = cache.instrument(&instrument_id) {
1018                    let orders = cache
1019                        .orders_open(None, Some(&instrument_id), None, None, None)
1020                        .into_iter()
1021                        .cloned()
1022                        .collect::<Vec<OrderAny>>();
1023                    instruments_with_orders.push((instrument.clone(), orders));
1024                } else {
1025                    log::error!(
1026                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
1027                    );
1028                    initialized = false;
1029                    break;
1030                }
1031            }
1032            instruments_with_orders
1033        };
1034
1035        for (instrument, orders_open) in &orders_and_instruments {
1036            let account = {
1037                let cache = self.cache.borrow();
1038                if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
1039                    account.clone()
1040                } else {
1041                    log::error!(
1042                        "Cannot update initial (order) margin: no account registered for {}",
1043                        instrument.id().venue
1044                    );
1045                    initialized = false;
1046                    break;
1047                }
1048            };
1049
1050            let result = self.inner.borrow_mut().accounts.update_orders(
1051                &account,
1052                instrument,
1053                orders_open.iter().collect(),
1054                self.clock.borrow().timestamp_ns(),
1055            );
1056
1057            match result {
1058                Some((updated_account, _)) => {
1059                    self.cache
1060                        .borrow_mut()
1061                        .update_account(&updated_account)
1062                        .unwrap();
1063                }
1064                None => {
1065                    initialized = false;
1066                }
1067            }
1068        }
1069
1070        let total_orders = orders_and_instruments
1071            .into_iter()
1072            .map(|(_, orders)| orders.len())
1073            .sum::<usize>();
1074
1075        log::info!(
1076            color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
1077            "Initialized {} open order{}",
1078            total_orders,
1079            if total_orders == 1 { "" } else { "s" }
1080        );
1081
1082        self.inner.borrow_mut().initialized = initialized;
1083    }
1084
1085    /// Initializes account margin based on existing open positions.
1086    ///
1087    /// # Panics
1088    ///
1089    /// Panics if calculation of PnL or updating the cache with a mutated account fails.
1090    pub fn initialize_positions(&mut self) {
1091        self.inner.borrow_mut().unrealized_pnls.clear();
1092        self.inner.borrow_mut().realized_pnls.clear();
1093        let all_positions_open: Vec<Position>;
1094        let mut instruments = AHashSet::new();
1095        {
1096            let cache = self.cache.borrow();
1097            all_positions_open = cache
1098                .positions_open(None, None, None, None, None)
1099                .into_iter()
1100                .cloned()
1101                .collect();
1102
1103            for position in &all_positions_open {
1104                instruments.insert(position.instrument_id);
1105            }
1106        }
1107
1108        let mut initialized = true;
1109
1110        for instrument_id in instruments {
1111            let positions_open: Vec<Position> = {
1112                let cache = self.cache.borrow();
1113                cache
1114                    .positions_open(None, Some(&instrument_id), None, None, None)
1115                    .into_iter()
1116                    .cloned()
1117                    .collect()
1118            };
1119
1120            self.update_net_position(&instrument_id, &positions_open);
1121
1122            if let Some(calculated_unrealized_pnl) =
1123                self.calculate_unrealized_pnl(&instrument_id, None)
1124            {
1125                self.inner
1126                    .borrow_mut()
1127                    .unrealized_pnls
1128                    .insert(instrument_id, calculated_unrealized_pnl);
1129            } else {
1130                log::debug!(
1131                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
1132                );
1133                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
1134            }
1135
1136            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id, None)
1137            {
1138                self.inner
1139                    .borrow_mut()
1140                    .realized_pnls
1141                    .insert(instrument_id, calculated_realized_pnl);
1142            } else {
1143                log::warn!(
1144                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
1145                );
1146                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
1147            }
1148
1149            let cache = self.cache.borrow();
1150            let Some(account) = cache.account_for_venue(&instrument_id.venue).cloned() else {
1151                log::error!(
1152                    "Cannot update maintenance (position) margin: no account registered for {}",
1153                    instrument_id.venue
1154                );
1155                initialized = false;
1156                break;
1157            };
1158
1159            let account = match account {
1160                AccountAny::Cash(_) | AccountAny::Betting(_) => continue,
1161                AccountAny::Margin(margin_account) => margin_account,
1162            };
1163
1164            let Some(instrument) = cache.instrument(&instrument_id).cloned() else {
1165                log::error!(
1166                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
1167                );
1168                initialized = false;
1169                break;
1170            };
1171            let positions: Vec<Position> = cache
1172                .positions_open(None, Some(&instrument_id), None, None, None)
1173                .into_iter()
1174                .cloned()
1175                .collect();
1176            drop(cache);
1177
1178            let result = self.inner.borrow_mut().accounts.update_positions(
1179                &account,
1180                &instrument,
1181                positions.iter().collect(),
1182                self.clock.borrow().timestamp_ns(),
1183            );
1184
1185            match result {
1186                Some((updated_account, _)) => {
1187                    self.cache
1188                        .borrow_mut()
1189                        .update_account(&AccountAny::Margin(updated_account))
1190                        .unwrap();
1191                }
1192                None => {
1193                    initialized = false;
1194                }
1195            }
1196        }
1197
1198        let open_count = all_positions_open.len();
1199        self.inner.borrow_mut().initialized = initialized;
1200        log::info!(
1201            color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
1202            "Initialized {} open position{}",
1203            open_count,
1204            if open_count == 1 { "" } else { "s" }
1205        );
1206    }
1207
1208    /// Updates portfolio calculations based on a new quote tick.
1209    ///
1210    /// Recalculates unrealized PnL for positions affected by the quote update.
1211    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
1212        update_quote_tick(&self.cache, &self.clock, &self.inner, self.config, quote);
1213    }
1214
1215    /// Updates portfolio calculations based on a new bar.
1216    ///
1217    /// Updates cached bar close prices and recalculates unrealized PnL.
1218    pub fn update_bar(&mut self, bar: &Bar) {
1219        update_bar(&self.cache, &self.clock, &self.inner, self.config, bar);
1220    }
1221
1222    /// Updates portfolio with a new account state event.
1223    pub fn update_account(&mut self, event: &AccountState) {
1224        update_account(&self.cache, &self.inner, event);
1225    }
1226
1227    /// Updates portfolio calculations based on an order event.
1228    ///
1229    /// Handles balance updates for order fills and margin calculations for order changes.
1230    pub fn update_order(&mut self, event: &OrderEventAny) {
1231        update_order(&self.cache, &self.clock, &self.inner, self.config, event);
1232    }
1233
1234    /// Updates portfolio calculations based on a position event.
1235    ///
1236    /// Recalculates net positions, unrealized PnL, and margin requirements.
1237    pub fn update_position(&mut self, event: &PositionEvent) {
1238        update_position(&self.cache, &self.clock, &self.inner, self.config, event);
1239    }
1240
1241    fn update_net_position(&self, instrument_id: &InstrumentId, positions_open: &[Position]) {
1242        let mut net_position = Decimal::ZERO;
1243
1244        for open_position in positions_open {
1245            log::debug!("open_position: {open_position}");
1246            net_position += open_position.signed_decimal_qty();
1247        }
1248
1249        let existing_position = self.net_position(instrument_id);
1250        if existing_position != net_position {
1251            self.inner
1252                .borrow_mut()
1253                .net_positions
1254                .insert(*instrument_id, net_position);
1255            log::info!("{instrument_id} net_position={net_position}");
1256        }
1257    }
1258
1259    fn calculate_unrealized_pnl(
1260        &self,
1261        instrument_id: &InstrumentId,
1262        account_id: Option<&AccountId>,
1263    ) -> Option<Money> {
1264        let cache = self.cache.borrow();
1265        let account = match account_id {
1266            Some(id) => cache.account(id),
1267            None => cache.account_for_venue(&instrument_id.venue),
1268        };
1269        let account = if let Some(account) = account {
1270            account
1271        } else {
1272            log::error!(
1273                "Cannot calculate unrealized PnL: no account for {} / {account_id:?}",
1274                instrument_id.venue,
1275            );
1276            return None;
1277        };
1278
1279        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1280            instrument
1281        } else {
1282            log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1283            return None;
1284        };
1285
1286        let currency = account
1287            .base_currency()
1288            .unwrap_or_else(|| instrument.settlement_currency());
1289
1290        let positions_open =
1291            cache.positions_open(None, Some(instrument_id), None, account_id, None);
1292
1293        if positions_open.is_empty() {
1294            return Some(Money::new(0.0, currency));
1295        }
1296
1297        let mut total_pnl = 0.0;
1298
1299        for position in positions_open {
1300            if position.instrument_id != *instrument_id {
1301                continue; // Nothing to calculate
1302            }
1303
1304            if position.side == PositionSide::Flat {
1305                continue; // Nothing to calculate
1306            }
1307
1308            let price = if let Some(price) = self.get_price(position) {
1309                price
1310            } else {
1311                log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1312                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1313                return None; // Cannot calculate
1314            };
1315
1316            let mut pnl = position.unrealized_pnl(price).as_f64();
1317
1318            if let Some(base_currency) = account.base_currency() {
1319                let xrate = if let Some(xrate) = self.calculate_xrate_to_base(instrument, account) {
1320                    xrate
1321                } else {
1322                    log::error!(
1323                        // TODO: Improve logging
1324                        "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1325                        instrument.settlement_currency(),
1326                        base_currency
1327                    );
1328                    self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1329                    return None; // Cannot calculate
1330                };
1331
1332                let scale = 10f64.powi(currency.precision.into());
1333                pnl = ((pnl * xrate) * scale).round() / scale;
1334            }
1335
1336            total_pnl += pnl;
1337        }
1338
1339        Some(Money::new(total_pnl, currency))
1340    }
1341
1342    fn ensure_snapshot_pnls_cached_for(&self, instrument_id: &InstrumentId) {
1343        // Performance: This method maintains an incremental cache of snapshot PnLs
1344        // It only deserializes new snapshots that haven't been processed yet
1345        // Tracks sum and last PnL per position for efficient NETTING OMS support
1346
1347        // Get all position IDs that have snapshots for this instrument
1348        let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1349
1350        if snapshot_position_ids.is_empty() {
1351            return; // Nothing to process
1352        }
1353
1354        let mut rebuild = false;
1355
1356        // Detect purge/reset (count regression) to trigger full rebuild
1357        for position_id in &snapshot_position_ids {
1358            let curr_count = self.cache.borrow().position_snapshot_count(position_id);
1359            let prev_count = self
1360                .inner
1361                .borrow()
1362                .snapshot_processed_counts
1363                .get(position_id)
1364                .copied()
1365                .unwrap_or(0);
1366
1367            if prev_count > curr_count {
1368                rebuild = true;
1369                break;
1370            }
1371        }
1372
1373        if rebuild {
1374            // Full rebuild: process all snapshots from scratch
1375            for position_id in &snapshot_position_ids {
1376                // Track the raw frame count, not the decoded count: snapshots that fail
1377                // to deserialize are skipped and would otherwise make the incremental
1378                // path reprocess trailing valid frames next time.
1379                let snapshot_count = self.cache.borrow().position_snapshot_count(position_id);
1380                let snapshots = self
1381                    .cache
1382                    .borrow()
1383                    .position_snapshots(Some(position_id), None);
1384
1385                let mut sum_pnl: Option<Money> = None;
1386                let mut last_pnl: Option<Money> = None;
1387                let mut snapshot_account_id: Option<AccountId> = None;
1388
1389                for snapshot in snapshots {
1390                    snapshot_account_id.get_or_insert(snapshot.account_id);
1391                    if let Some(realized_pnl) = snapshot.realized_pnl {
1392                        if let Some(sum) = sum_pnl {
1393                            if sum.currency == realized_pnl.currency {
1394                                sum_pnl = Some(sum + realized_pnl);
1395                            }
1396                        } else {
1397                            sum_pnl = Some(realized_pnl);
1398                        }
1399                        last_pnl = Some(realized_pnl);
1400                    }
1401                }
1402
1403                let mut inner = self.inner.borrow_mut();
1404
1405                if let Some(sum) = sum_pnl {
1406                    inner.snapshot_sum_per_position.insert(*position_id, sum);
1407
1408                    if let Some(last) = last_pnl {
1409                        inner.snapshot_last_per_position.insert(*position_id, last);
1410                    }
1411                } else {
1412                    inner.snapshot_sum_per_position.remove(position_id);
1413                    inner.snapshot_last_per_position.remove(position_id);
1414                }
1415
1416                if let Some(account_id) = snapshot_account_id {
1417                    inner.snapshot_account_ids.insert(*position_id, account_id);
1418                } else {
1419                    inner.snapshot_account_ids.remove(position_id);
1420                }
1421
1422                inner
1423                    .snapshot_processed_counts
1424                    .insert(*position_id, snapshot_count);
1425            }
1426        } else {
1427            // Incremental path: only process new snapshots
1428            for position_id in &snapshot_position_ids {
1429                // Compare raw frame counts first so untouched positions skip any
1430                // allocation/serde cost on repeated PnL refreshes.
1431                let curr_count = self.cache.borrow().position_snapshot_count(position_id);
1432                let prev_count = self
1433                    .inner
1434                    .borrow()
1435                    .snapshot_processed_counts
1436                    .get(position_id)
1437                    .copied()
1438                    .unwrap_or(0);
1439
1440                if prev_count >= curr_count {
1441                    continue;
1442                }
1443
1444                let mut sum_pnl = self
1445                    .inner
1446                    .borrow()
1447                    .snapshot_sum_per_position
1448                    .get(position_id)
1449                    .copied();
1450                let mut last_pnl = self
1451                    .inner
1452                    .borrow()
1453                    .snapshot_last_per_position
1454                    .get(position_id)
1455                    .copied();
1456                let mut snapshot_account_id: Option<AccountId> = None;
1457
1458                let new_snapshots = self
1459                    .cache
1460                    .borrow()
1461                    .position_snapshots_from(position_id, prev_count);
1462
1463                for snapshot in new_snapshots {
1464                    snapshot_account_id.get_or_insert(snapshot.account_id);
1465                    if let Some(realized_pnl) = snapshot.realized_pnl {
1466                        if let Some(sum) = sum_pnl {
1467                            if sum.currency == realized_pnl.currency {
1468                                sum_pnl = Some(sum + realized_pnl);
1469                            }
1470                        } else {
1471                            sum_pnl = Some(realized_pnl);
1472                        }
1473                        last_pnl = Some(realized_pnl);
1474                    }
1475                }
1476
1477                let mut inner = self.inner.borrow_mut();
1478
1479                if let Some(sum) = sum_pnl {
1480                    inner.snapshot_sum_per_position.insert(*position_id, sum);
1481
1482                    if let Some(last) = last_pnl {
1483                        inner.snapshot_last_per_position.insert(*position_id, last);
1484                    }
1485                }
1486
1487                if let Some(account_id) = snapshot_account_id
1488                    && !inner.snapshot_account_ids.contains_key(position_id)
1489                {
1490                    inner.snapshot_account_ids.insert(*position_id, account_id);
1491                }
1492
1493                inner
1494                    .snapshot_processed_counts
1495                    .insert(*position_id, curr_count);
1496            }
1497        }
1498    }
1499
1500    fn calculate_realized_pnl(
1501        &self,
1502        instrument_id: &InstrumentId,
1503        account_id: Option<&AccountId>,
1504    ) -> Option<Money> {
1505        // Ensure snapshot PnLs are cached for this instrument
1506        self.ensure_snapshot_pnls_cached_for(instrument_id);
1507
1508        let cache = self.cache.borrow();
1509        let account = match account_id {
1510            Some(id) => cache.account(id),
1511            None => cache.account_for_venue(&instrument_id.venue),
1512        };
1513        let account = if let Some(account) = account {
1514            account
1515        } else {
1516            log::error!(
1517                "Cannot calculate realized PnL: no account for {} / {account_id:?}",
1518                instrument_id.venue,
1519            );
1520            return None;
1521        };
1522
1523        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1524            instrument
1525        } else {
1526            log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1527            return None;
1528        };
1529
1530        let currency = account
1531            .base_currency()
1532            .unwrap_or_else(|| instrument.settlement_currency());
1533
1534        let positions = cache.positions(None, Some(instrument_id), None, account_id, None);
1535
1536        // Filter snapshots by account when requested so closed-position PnL
1537        // from other accounts on the same venue does not leak in. Sort the
1538        // collected IDs so the per-snapshot pending-calcs/early-return path
1539        // and the value accumulation iterate in a deterministic sequence.
1540        let mut snapshot_position_ids: Vec<PositionId> = if let Some(filter_id) = account_id {
1541            let inner = self.inner.borrow();
1542            cache
1543                .position_snapshot_ids(instrument_id)
1544                .into_iter()
1545                .filter(|pid| {
1546                    inner
1547                        .snapshot_account_ids
1548                        .get(pid)
1549                        .is_some_and(|id| id == filter_id)
1550                })
1551                .collect()
1552        } else {
1553            cache
1554                .position_snapshot_ids(instrument_id)
1555                .into_iter()
1556                .collect()
1557        };
1558        snapshot_position_ids.sort();
1559
1560        // Check if we need to use NETTING OMS logic
1561        let is_netting = positions
1562            .iter()
1563            .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1564
1565        let mut total_pnl = 0.0;
1566
1567        if is_netting && !snapshot_position_ids.is_empty() {
1568            // NETTING OMS: Apply 3-case rule for position cycles
1569
1570            for position_id in &snapshot_position_ids {
1571                let is_active = positions.iter().any(|p| p.id == *position_id);
1572
1573                if is_active {
1574                    // Case 1 & 2: Active position - use only the last snapshot PnL
1575                    let last_pnl = self
1576                        .inner
1577                        .borrow()
1578                        .snapshot_last_per_position
1579                        .get(position_id)
1580                        .copied();
1581
1582                    if let Some(last_pnl) = last_pnl {
1583                        let mut pnl = last_pnl.as_f64();
1584
1585                        if let Some(base_currency) = account.base_currency()
1586                            && positions.iter().any(|p| p.id == *position_id)
1587                        {
1588                            let xrate = if let Some(xrate) =
1589                                self.calculate_xrate_to_base(instrument, account)
1590                            {
1591                                xrate
1592                            } else {
1593                                log::error!(
1594                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1595                                    instrument.settlement_currency(),
1596                                    base_currency
1597                                );
1598                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1599                                return Some(Money::new(0.0, currency));
1600                            };
1601
1602                            let scale = 10f64.powi(currency.precision.into());
1603                            pnl = ((pnl * xrate) * scale).round() / scale;
1604                        }
1605
1606                        total_pnl += pnl;
1607                    }
1608                } else {
1609                    // Case 3: Closed position - use sum of all snapshot PnLs
1610                    let sum_pnl = self
1611                        .inner
1612                        .borrow()
1613                        .snapshot_sum_per_position
1614                        .get(position_id)
1615                        .copied();
1616
1617                    if let Some(sum_pnl) = sum_pnl {
1618                        let mut pnl = sum_pnl.as_f64();
1619
1620                        if let Some(base_currency) = account.base_currency() {
1621                            // For closed positions, we don't have entry price, use current rates
1622                            let xrate = cache.get_xrate(
1623                                instrument_id.venue,
1624                                instrument.settlement_currency(),
1625                                base_currency,
1626                                PriceType::Mid,
1627                            );
1628
1629                            if let Some(xrate) = xrate {
1630                                let scale = 10f64.powi(currency.precision.into());
1631                                pnl = ((pnl * xrate) * scale).round() / scale;
1632                            } else {
1633                                log::error!(
1634                                    "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1635                                    instrument.settlement_currency(),
1636                                    base_currency
1637                                );
1638                                self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1639                                return Some(Money::new(0.0, currency));
1640                            }
1641                        }
1642
1643                        total_pnl += pnl;
1644                    }
1645                }
1646            }
1647
1648            // Add realized PnL from current active positions
1649            for position in positions {
1650                if position.instrument_id != *instrument_id {
1651                    continue;
1652                }
1653
1654                if let Some(realized_pnl) = position.realized_pnl {
1655                    let mut pnl = realized_pnl.as_f64();
1656
1657                    if let Some(base_currency) = account.base_currency() {
1658                        let xrate = if let Some(xrate) =
1659                            self.calculate_xrate_to_base(instrument, account)
1660                        {
1661                            xrate
1662                        } else {
1663                            log::error!(
1664                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1665                                instrument.settlement_currency(),
1666                                base_currency
1667                            );
1668                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1669                            return Some(Money::new(0.0, currency));
1670                        };
1671
1672                        let scale = 10f64.powi(currency.precision.into());
1673                        pnl = ((pnl * xrate) * scale).round() / scale;
1674                    }
1675
1676                    total_pnl += pnl;
1677                }
1678            }
1679        } else {
1680            // HEDGING OMS or no snapshots: Simple aggregation
1681            // Add snapshot PnLs (sum all)
1682            for position_id in &snapshot_position_ids {
1683                let sum_pnl = self
1684                    .inner
1685                    .borrow()
1686                    .snapshot_sum_per_position
1687                    .get(position_id)
1688                    .copied();
1689
1690                if let Some(sum_pnl) = sum_pnl {
1691                    let mut pnl = sum_pnl.as_f64();
1692
1693                    if let Some(base_currency) = account.base_currency() {
1694                        let xrate = cache.get_xrate(
1695                            instrument_id.venue,
1696                            instrument.settlement_currency(),
1697                            base_currency,
1698                            PriceType::Mid,
1699                        );
1700
1701                        if let Some(xrate) = xrate {
1702                            let scale = 10f64.powi(currency.precision.into());
1703                            pnl = ((pnl * xrate) * scale).round() / scale;
1704                        } else {
1705                            log::error!(
1706                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1707                                instrument.settlement_currency(),
1708                                base_currency
1709                            );
1710                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1711                            return Some(Money::new(0.0, currency));
1712                        }
1713                    }
1714
1715                    total_pnl += pnl;
1716                }
1717            }
1718
1719            // Add realized PnL from current positions
1720            for position in positions {
1721                if position.instrument_id != *instrument_id {
1722                    continue;
1723                }
1724
1725                if let Some(realized_pnl) = position.realized_pnl {
1726                    let mut pnl = realized_pnl.as_f64();
1727
1728                    if let Some(base_currency) = account.base_currency() {
1729                        let xrate = if let Some(xrate) =
1730                            self.calculate_xrate_to_base(instrument, account)
1731                        {
1732                            xrate
1733                        } else {
1734                            log::error!(
1735                                "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1736                                instrument.settlement_currency(),
1737                                base_currency
1738                            );
1739                            self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1740                            return Some(Money::new(0.0, currency));
1741                        };
1742
1743                        let scale = 10f64.powi(currency.precision.into());
1744                        pnl = ((pnl * xrate) * scale).round() / scale;
1745                    }
1746
1747                    total_pnl += pnl;
1748                }
1749            }
1750        }
1751
1752        Some(Money::new(total_pnl, currency))
1753    }
1754
1755    fn get_price(&self, position: &Position) -> Option<Price> {
1756        let cache = self.cache.borrow();
1757        let instrument_id = &position.instrument_id;
1758
1759        // Check for mark price first if configured
1760        if self.config.use_mark_prices
1761            && let Some(mark_price) = cache.mark_price(instrument_id)
1762        {
1763            return Some(mark_price.value);
1764        }
1765
1766        // Fall back to bid/ask based on position side
1767        let price_type = match position.side {
1768            PositionSide::Long => PriceType::Bid,
1769            PositionSide::Short => PriceType::Ask,
1770            _ => panic!("invalid `PositionSide`, was {}", position.side),
1771        };
1772
1773        cache
1774            .price(instrument_id, price_type)
1775            .or_else(|| cache.price(instrument_id, PriceType::Last))
1776            .or_else(|| {
1777                self.inner
1778                    .borrow()
1779                    .bar_close_prices
1780                    .get(instrument_id)
1781                    .copied()
1782            })
1783    }
1784
1785    fn calculate_xrate_to_base(
1786        &self,
1787        instrument: &InstrumentAny,
1788        account: &AccountAny,
1789    ) -> Option<f64> {
1790        if !self.config.convert_to_account_base_currency {
1791            return Some(1.0); // No conversion needed
1792        }
1793
1794        let base_currency = match account.base_currency() {
1795            Some(base_currency) => base_currency,
1796            None => return Some(1.0),
1797        };
1798
1799        let settlement = instrument.settlement_currency();
1800        let cache = self.cache.borrow();
1801
1802        if self.config.use_mark_xrates
1803            && let Some(xrate) = cache.get_mark_xrate(settlement, base_currency)
1804        {
1805            return Some(xrate);
1806        }
1807
1808        cache.get_xrate(
1809            instrument.id().venue,
1810            settlement,
1811            base_currency,
1812            PriceType::Mid,
1813        )
1814    }
1815}
1816
1817// Helper functions
1818fn update_quote_tick(
1819    cache: &Rc<RefCell<Cache>>,
1820    clock: &Rc<RefCell<dyn Clock>>,
1821    inner: &Rc<RefCell<PortfolioState>>,
1822    config: PortfolioConfig,
1823    quote: &QuoteTick,
1824) {
1825    update_instrument_id(cache, clock, inner, config, &quote.instrument_id);
1826}
1827
1828fn update_bar(
1829    cache: &Rc<RefCell<Cache>>,
1830    clock: &Rc<RefCell<dyn Clock>>,
1831    inner: &Rc<RefCell<PortfolioState>>,
1832    config: PortfolioConfig,
1833    bar: &Bar,
1834) {
1835    let instrument_id = bar.bar_type.instrument_id();
1836    inner
1837        .borrow_mut()
1838        .bar_close_prices
1839        .insert(instrument_id, bar.close);
1840    update_instrument_id(cache, clock, inner, config, &instrument_id);
1841}
1842
1843fn update_instrument_id(
1844    cache: &Rc<RefCell<Cache>>,
1845    clock: &Rc<RefCell<dyn Clock>>,
1846    inner: &Rc<RefCell<PortfolioState>>,
1847    config: PortfolioConfig,
1848    instrument_id: &InstrumentId,
1849) {
1850    inner
1851        .borrow_mut()
1852        .unrealized_pnls
1853        .shift_remove(instrument_id);
1854
1855    if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1856        return;
1857    }
1858
1859    let mut result_maint = None;
1860
1861    // Scoped borrow: must drop before calling AccountsManager (which borrows cache internally)
1862    let (account, instrument, orders_open, positions_open) = {
1863        let cache_ref = cache.borrow();
1864        let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1865            account.clone()
1866        } else {
1867            log::error!(
1868                "Cannot update tick: no account registered for {}",
1869                instrument_id.venue
1870            );
1871            return;
1872        };
1873        let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1874            instrument.clone()
1875        } else {
1876            log::error!("Cannot update tick: no instrument found for {instrument_id}");
1877            return;
1878        };
1879        let orders_open: Vec<OrderAny> = cache_ref
1880            .orders_open(None, Some(instrument_id), None, None, None)
1881            .iter()
1882            .map(|o| (*o).clone())
1883            .collect();
1884        let positions_open: Vec<Position> = cache_ref
1885            .positions_open(None, Some(instrument_id), None, None, None)
1886            .iter()
1887            .map(|p| (*p).clone())
1888            .collect();
1889        (account, instrument, orders_open, positions_open)
1890    };
1891
1892    // No cache borrow held: AccountsManager borrows cache internally for xrate lookups
1893    let result_init = inner.borrow().accounts.update_orders(
1894        &account,
1895        &instrument,
1896        orders_open.iter().collect(),
1897        clock.borrow().timestamp_ns(),
1898    );
1899
1900    if let AccountAny::Margin(ref margin_account) = account {
1901        result_maint = inner.borrow().accounts.update_positions(
1902            margin_account,
1903            &instrument,
1904            positions_open.iter().collect(),
1905            clock.borrow().timestamp_ns(),
1906        );
1907    }
1908
1909    if let Some((ref updated_account, _)) = result_init {
1910        cache.borrow_mut().update_account(updated_account).unwrap();
1911    }
1912
1913    let portfolio_clone = Portfolio {
1914        clock: clock.clone(),
1915        cache: cache.clone(),
1916        inner: inner.clone(),
1917        config,
1918    };
1919
1920    let result_unrealized_pnl: Option<Money> =
1921        portfolio_clone.calculate_unrealized_pnl(instrument_id, None);
1922
1923    if result_init.is_some()
1924        && (matches!(account, AccountAny::Cash(_) | AccountAny::Betting(_))
1925            || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1926    {
1927        inner.borrow_mut().pending_calcs.remove(instrument_id);
1928        if inner.borrow().pending_calcs.is_empty() {
1929            inner.borrow_mut().initialized = true;
1930        }
1931    }
1932}
1933
1934fn update_order(
1935    cache: &Rc<RefCell<Cache>>,
1936    clock: &Rc<RefCell<dyn Clock>>,
1937    inner: &Rc<RefCell<PortfolioState>>,
1938    _config: PortfolioConfig,
1939    event: &OrderEventAny,
1940) {
1941    let account_id = match event.account_id() {
1942        Some(account_id) => account_id,
1943        None => {
1944            return; // No Account Assigned
1945        }
1946    };
1947
1948    // Scoped borrow: must drop before calling AccountsManager (which borrows cache internally)
1949    let (account, instrument, orders_open) = {
1950        let cache_ref = cache.borrow();
1951
1952        let account = if let Some(account) = cache_ref.account(&account_id) {
1953            account.clone()
1954        } else {
1955            log::error!("Cannot update order: no account registered for {account_id}");
1956            return;
1957        };
1958
1959        match &account {
1960            AccountAny::Margin(margin_account) => {
1961                if !margin_account.base.calculate_account_state {
1962                    return;
1963                }
1964            }
1965            AccountAny::Cash(cash_account) => {
1966                if !cash_account.base.calculate_account_state {
1967                    return;
1968                }
1969            }
1970            AccountAny::Betting(betting_account) => {
1971                if !betting_account.base.calculate_account_state {
1972                    return;
1973                }
1974            }
1975        }
1976
1977        match event {
1978            OrderEventAny::Accepted(_)
1979            | OrderEventAny::Canceled(_)
1980            | OrderEventAny::Rejected(_)
1981            | OrderEventAny::Updated(_)
1982            | OrderEventAny::Filled(_) => {}
1983            _ => {
1984                return;
1985            }
1986        }
1987
1988        let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1989            order
1990        } else {
1991            log::error!(
1992                "Cannot update order: {} not found in the cache",
1993                event.client_order_id()
1994            );
1995            return; // No Order Found
1996        };
1997
1998        if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit
1999        {
2000            return; // No change to account state
2001        }
2002
2003        let instrument = if let Some(instrument) = cache_ref.instrument(&event.instrument_id()) {
2004            instrument.clone()
2005        } else {
2006            log::error!(
2007                "Cannot update order: no instrument found for {}",
2008                event.instrument_id()
2009            );
2010            return;
2011        };
2012
2013        let orders_open: Vec<OrderAny> = cache_ref
2014            .orders_open(None, Some(&event.instrument_id()), None, None, None)
2015            .iter()
2016            .map(|o| (*o).clone())
2017            .collect();
2018
2019        (account, instrument, orders_open)
2020    };
2021
2022    // No cache borrow held: AccountsManager borrows cache internally for xrate lookups
2023    if let OrderEventAny::Filled(order_filled) = event {
2024        let _ =
2025            inner
2026                .borrow()
2027                .accounts
2028                .update_balances(account.clone(), &instrument, *order_filled);
2029
2030        let portfolio_clone = Portfolio {
2031            clock: clock.clone(),
2032            cache: cache.clone(),
2033            inner: inner.clone(),
2034            config: PortfolioConfig::default(), // TODO: TBD
2035        };
2036
2037        match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id, None) {
2038            Some(unrealized_pnl) => {
2039                inner
2040                    .borrow_mut()
2041                    .unrealized_pnls
2042                    .insert(event.instrument_id(), unrealized_pnl);
2043            }
2044            None => {
2045                log::error!(
2046                    "Failed to calculate unrealized PnL for instrument {}",
2047                    event.instrument_id()
2048                );
2049            }
2050        }
2051    }
2052
2053    let account_state = inner.borrow().accounts.update_orders(
2054        &account,
2055        &instrument,
2056        orders_open.iter().collect(),
2057        clock.borrow().timestamp_ns(),
2058    );
2059
2060    if let Some((updated_account, account_state)) = account_state {
2061        cache.borrow_mut().update_account(&updated_account).unwrap();
2062        msgbus::publish_account_state(
2063            format!("events.account.{}", account.id()).into(),
2064            &account_state,
2065        );
2066    } else {
2067        log::debug!("Added pending calculation for {}", instrument.id());
2068        inner.borrow_mut().pending_calcs.insert(instrument.id());
2069    }
2070
2071    log::debug!("Updated {event}");
2072}
2073
2074fn update_position(
2075    cache: &Rc<RefCell<Cache>>,
2076    clock: &Rc<RefCell<dyn Clock>>,
2077    inner: &Rc<RefCell<PortfolioState>>,
2078    _config: PortfolioConfig,
2079    event: &PositionEvent,
2080) {
2081    let instrument_id = event.instrument_id();
2082
2083    let positions_open: Vec<Position> = {
2084        let cache_ref = cache.borrow();
2085
2086        cache_ref
2087            .positions_open(None, Some(&instrument_id), None, None, None)
2088            .iter()
2089            .map(|o| (*o).clone())
2090            .collect()
2091    };
2092
2093    log::debug!("position fresh from cache -> {positions_open:?}");
2094
2095    let portfolio_clone = Portfolio {
2096        clock: clock.clone(),
2097        cache: cache.clone(),
2098        inner: inner.clone(),
2099        config: PortfolioConfig::default(), // TODO: TBD
2100    };
2101
2102    portfolio_clone.update_net_position(&instrument_id, &positions_open);
2103
2104    if let Some(calculated_unrealized_pnl) =
2105        portfolio_clone.calculate_unrealized_pnl(&instrument_id, None)
2106    {
2107        inner
2108            .borrow_mut()
2109            .unrealized_pnls
2110            .insert(event.instrument_id(), calculated_unrealized_pnl);
2111    } else {
2112        log::debug!(
2113            "Failed to calculate unrealized PnL for {}, marking as pending",
2114            event.instrument_id()
2115        );
2116        inner
2117            .borrow_mut()
2118            .pending_calcs
2119            .insert(event.instrument_id());
2120    }
2121
2122    if let Some(calculated_realized_pnl) =
2123        portfolio_clone.calculate_realized_pnl(&instrument_id, None)
2124    {
2125        inner
2126            .borrow_mut()
2127            .realized_pnls
2128            .insert(event.instrument_id(), calculated_realized_pnl);
2129    } else {
2130        log::warn!(
2131            "Failed to calculate realized PnL for {}, marking as pending",
2132            event.instrument_id()
2133        );
2134        inner
2135            .borrow_mut()
2136            .pending_calcs
2137            .insert(event.instrument_id());
2138    }
2139
2140    let cache_ref = cache.borrow();
2141    let account = cache_ref.account(&event.account_id());
2142
2143    if let Some(AccountAny::Margin(margin_account)) = account {
2144        if !margin_account.calculate_account_state {
2145            return; // Nothing to calculate
2146        }
2147
2148        let cache_ref = cache.borrow();
2149        let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
2150            instrument
2151        } else {
2152            log::error!("Cannot update position: no instrument found for {instrument_id}");
2153            return;
2154        };
2155
2156        let result = inner.borrow_mut().accounts.update_positions(
2157            margin_account,
2158            instrument,
2159            positions_open.iter().collect(),
2160            clock.borrow().timestamp_ns(),
2161        );
2162        let mut cache_ref = cache.borrow_mut();
2163
2164        if let Some((margin_account, _)) = result {
2165            cache_ref
2166                .update_account(&AccountAny::Margin(margin_account))
2167                .unwrap();
2168        }
2169    } else if account.is_none() {
2170        log::error!(
2171            "Cannot update position: no account registered for {}",
2172            event.account_id()
2173        );
2174    }
2175}
2176
2177fn update_account(
2178    cache: &Rc<RefCell<Cache>>,
2179    inner: &Rc<RefCell<PortfolioState>>,
2180    event: &AccountState,
2181) {
2182    let mut cache_ref = cache.borrow_mut();
2183
2184    if let Some(existing) = cache_ref.account(&event.account_id) {
2185        let mut account = existing.clone();
2186        if let Err(e) = account.apply(event.clone()) {
2187            log::error!("Failed to apply account state: {e}");
2188            return;
2189        }
2190
2191        if let Err(e) = cache_ref.update_account(&account) {
2192            log::error!("Failed to update account: {e}");
2193            return;
2194        }
2195    } else {
2196        let account = match AccountAny::from_events(std::slice::from_ref(event)) {
2197            Ok(account) => account,
2198            Err(e) => {
2199                log::error!("Failed to create account: {e}");
2200                return;
2201            }
2202        };
2203
2204        if let Err(e) = cache_ref.add_account(account) {
2205            log::error!("Failed to add account: {e}");
2206            return;
2207        }
2208    }
2209
2210    // Throttled logging logic
2211    let mut inner_ref = inner.borrow_mut();
2212    let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
2213        let current_ts = event.ts_init.as_u64();
2214        let last_ts = inner_ref
2215            .last_account_state_log_ts
2216            .get(&event.account_id)
2217            .copied()
2218            .unwrap_or(0);
2219
2220        if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
2221        {
2222            inner_ref
2223                .last_account_state_log_ts
2224                .insert(event.account_id, current_ts);
2225            true
2226        } else {
2227            false
2228        }
2229    } else {
2230        true // Throttling disabled, always log
2231    };
2232
2233    if should_log {
2234        log::info!("Updated {event}");
2235    }
2236}