1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use indexmap::{IndexMap, IndexSet};
22use nautilus_analysis::analyzer::PortfolioAnalyzer;
23use nautilus_common::{
24 cache::Cache,
25 clock::Clock,
26 enums::LogColor,
27 msgbus::{self, MessagingSwitchboard, TypedHandler},
28};
29use nautilus_core::{WeakCell, datetime::NANOSECONDS_IN_MILLISECOND};
30use nautilus_model::{
31 accounts::{Account, AccountAny},
32 data::{Bar, MarkPriceUpdate, QuoteTick},
33 enums::{OmsType, OrderType, PositionSide, PriceType},
34 events::{AccountState, OrderEventAny, position::PositionEvent},
35 identifiers::{AccountId, InstrumentId, PositionId, Venue},
36 instruments::{Instrument, InstrumentAny},
37 orders::{Order, OrderAny},
38 position::Position,
39 types::{Currency, Money, Price},
40};
41use rust_decimal::Decimal;
42
43use crate::{config::PortfolioConfig, manager::AccountsManager};
44
/// Mutable portfolio state shared between the `Portfolio` facade and the
/// message bus handlers it registers.
struct PortfolioState {
    /// Accounts manager used to recompute account state from open orders and positions.
    accounts: AccountsManager,
    /// Portfolio analyzer; reset together with the rest of the state.
    analyzer: PortfolioAnalyzer,
    /// Cached unrealized PnL per instrument.
    unrealized_pnls: IndexMap<InstrumentId, Money>,
    /// Cached realized PnL per instrument.
    realized_pnls: IndexMap<InstrumentId, Money>,
    // NOTE(review): the four `snapshot_*` maps are only created and cleared in this
    // view; presumably they track position-snapshot PnL bookkeeping - confirm against
    // the code that populates them.
    snapshot_sum_per_position: AHashMap<PositionId, Money>,
    snapshot_last_per_position: AHashMap<PositionId, Money>,
    snapshot_processed_counts: AHashMap<PositionId, usize>,
    snapshot_account_ids: AHashMap<PositionId, AccountId>,
    /// Net signed quantity per instrument, summed over its open positions.
    net_positions: IndexMap<InstrumentId, Decimal>,
    /// Instruments whose PnL calculation could not be completed yet.
    pending_calcs: AHashSet<InstrumentId>,
    // NOTE(review): presumably the latest bar close per instrument used as a price
    // fallback - populated outside this view, confirm.
    bar_close_prices: AHashMap<InstrumentId, Price>,
    /// Result flag written by the order/position initialization routines.
    initialized: bool,
    // NOTE(review): presumably the timestamp of the last account-state log line per
    // account, used for throttling - usage is outside this view, confirm.
    last_account_state_log_ts: AHashMap<AccountId, u64>,
    /// Configured minimum account-state logging interval converted to nanoseconds
    /// (zero when the config leaves it unset).
    min_account_state_logging_interval_ns: u64,
    /// Per venue, the instruments with open positions for which no price was available
    /// during the last mark-to-market pass.
    venues_missing_price: AHashMap<Venue, AHashSet<InstrumentId>>,
}
62
63impl PortfolioState {
64 fn new(
65 clock: Rc<RefCell<dyn Clock>>,
66 cache: Rc<RefCell<Cache>>,
67 config: &PortfolioConfig,
68 ) -> Self {
69 let min_account_state_logging_interval_ns = config
70 .min_account_state_logging_interval_ms
71 .map_or(0, |ms| ms * NANOSECONDS_IN_MILLISECOND);
72
73 Self {
74 accounts: AccountsManager::new(clock, cache),
75 analyzer: PortfolioAnalyzer::default(),
76 unrealized_pnls: IndexMap::new(),
77 realized_pnls: IndexMap::new(),
78 snapshot_sum_per_position: AHashMap::new(),
79 snapshot_last_per_position: AHashMap::new(),
80 snapshot_processed_counts: AHashMap::new(),
81 snapshot_account_ids: AHashMap::new(),
82 net_positions: IndexMap::new(),
83 pending_calcs: AHashSet::new(),
84 bar_close_prices: AHashMap::new(),
85 initialized: false,
86 last_account_state_log_ts: AHashMap::new(),
87 min_account_state_logging_interval_ns,
88 venues_missing_price: AHashMap::new(),
89 }
90 }
91
92 fn reset(&mut self) {
93 log::debug!("RESETTING");
94 self.net_positions.clear();
95 self.unrealized_pnls.clear();
96 self.realized_pnls.clear();
97 self.snapshot_sum_per_position.clear();
98 self.snapshot_last_per_position.clear();
99 self.snapshot_processed_counts.clear();
100 self.snapshot_account_ids.clear();
101 self.pending_calcs.clear();
102 self.bar_close_prices.clear();
103 self.last_account_state_log_ts.clear();
104 self.venues_missing_price.clear();
105 self.analyzer.reset();
106 self.initialized = false;
107 log::debug!("READY");
108 }
109}
110
/// Portfolio facade exposing PnL, exposure, margin and equity queries on top of the
/// shared cache and an internally shared [`PortfolioState`].
pub struct Portfolio {
    /// Shared clock; read for timestamps when account state is recomputed.
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache holding accounts, instruments, orders and positions.
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Mutable state; the registered message handlers hold weak references to it.
    inner: Rc<RefCell<PortfolioState>>,
    /// Configuration captured at construction.
    config: PortfolioConfig,
}
117
118impl Debug for Portfolio {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 f.debug_struct(stringify!(Portfolio)).finish()
121 }
122}
123
124impl Portfolio {
125 pub fn new(
126 cache: Rc<RefCell<Cache>>,
127 clock: Rc<RefCell<dyn Clock>>,
128 config: Option<PortfolioConfig>,
129 ) -> Self {
130 let config = config.unwrap_or_default();
131 let inner = Rc::new(RefCell::new(PortfolioState::new(
132 clock.clone(),
133 cache.clone(),
134 &config,
135 )));
136
137 Self::register_message_handlers(&cache, &clock, &inner, config);
138
139 Self {
140 clock,
141 cache,
142 inner,
143 config,
144 }
145 }
146
147 #[must_use]
152 pub fn clone_shallow(&self) -> Self {
153 Self {
154 clock: self.clock.clone(),
155 cache: self.cache.clone(),
156 inner: self.inner.clone(),
157 config: self.config,
158 }
159 }
160
    /// Builds the typed message handlers and registers them on the message bus.
    ///
    /// Each handler captures clones of the cache/clock handles plus a weak reference
    /// to the portfolio state; when the state has been dropped the handler does
    /// nothing. Bar and mark-price subscriptions are only made when enabled in
    /// `config`.
    fn register_message_handlers(
        cache: &Rc<RefCell<Cache>>,
        clock: &Rc<RefCell<dyn Clock>>,
        inner: &Rc<RefCell<PortfolioState>>,
        config: PortfolioConfig,
    ) {
        // Weak handle so the registered handlers do not keep the state alive.
        let inner_weak = WeakCell::from(Rc::downgrade(inner));

        let update_account_handler = {
            let cache = cache.clone();
            let inner = inner_weak.clone();
            TypedHandler::from(move |event: &AccountState| {
                if let Some(inner_rc) = inner.upgrade() {
                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
                    update_account(&cache, &inner_rc, event);
                }
            })
        };

        let update_position_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            TypedHandler::from(move |event: &PositionEvent| {
                if let Some(inner_rc) = inner.upgrade() {
                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
                    update_position(&cache, &clock, &inner_rc, config, event);
                }
            })
        };

        let update_quote_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            TypedHandler::from(move |quote: &QuoteTick| {
                if let Some(inner_rc) = inner.upgrade() {
                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
                    update_quote_tick(&cache, &clock, &inner_rc, config, quote);
                }
            })
        };

        let update_bar_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            TypedHandler::from(move |bar: &Bar| {
                if let Some(inner_rc) = inner.upgrade() {
                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
                    update_bar(&cache, &clock, &inner_rc, config, bar);
                }
            })
        };

        // Mark price updates only carry the instrument ID into the update routine.
        let update_mark_price_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak.clone();
            TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
                if let Some(inner_rc) = inner.upgrade() {
                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
                    update_instrument_id(
                        &cache,
                        &clock,
                        &inner_rc,
                        config,
                        &mark_price.instrument_id,
                    );
                }
            })
        };

        // Last handler built: takes ownership of the weak handle instead of cloning it.
        let update_order_handler = {
            let cache = cache.clone();
            let clock = clock.clone();
            let inner = inner_weak;
            TypedHandler::from(move |event: &OrderEventAny| {
                if let Some(inner_rc) = inner.upgrade() {
                    let inner_rc: Rc<RefCell<PortfolioState>> = inner_rc.into();
                    update_order(&cache, &clock, &inner_rc, config, event);
                }
            })
        };

        // The account handler is registered both as a direct endpoint and (below) as a
        // topic subscriber, hence the clone here.
        let endpoint = MessagingSwitchboard::portfolio_update_account();
        msgbus::register_account_state_endpoint(endpoint, update_account_handler.clone());

        // NOTE(review): the trailing `Some(10)` argument is presumably the subscription
        // priority - confirm against the msgbus API.
        msgbus::subscribe_quotes("data.quotes.*".into(), update_quote_handler, Some(10));

        if config.bar_updates {
            msgbus::subscribe_bars("data.bars.*EXTERNAL".into(), update_bar_handler, Some(10));
        }

        if config.use_mark_prices {
            msgbus::subscribe_mark_prices(
                "data.mark_prices.*".into(),
                update_mark_price_handler,
                Some(10),
            );
        }
        msgbus::subscribe_order_events("events.order.*".into(), update_order_handler, Some(10));
        msgbus::subscribe_position_events(
            "events.position.*".into(),
            update_position_handler,
            Some(10),
        );
        msgbus::subscribe_account_state(
            "events.account.*".into(),
            update_account_handler,
            Some(10),
        );
    }
275
276 pub fn reset(&mut self) {
277 log::debug!("RESETTING");
278 self.inner.borrow_mut().reset();
279 log::debug!("READY");
280 }
281
    /// Returns a reference to the shared cache handle used by this portfolio.
    #[must_use]
    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
        &self.cache
    }
287
288 #[must_use]
290 pub fn is_initialized(&self) -> bool {
291 self.inner.borrow().initialized
292 }
293
294 #[must_use]
298 pub fn balances_locked(&self, venue: &Venue) -> IndexMap<Currency, Money> {
299 self.cache.borrow().account_for_venue(venue).map_or_else(
300 || {
301 log::error!("Cannot get balances locked: no account generated for {venue}");
302 IndexMap::new()
303 },
304 AccountAny::balances_locked,
305 )
306 }
307
308 #[must_use]
312 pub fn margins_init(&self, venue: &Venue) -> IndexMap<InstrumentId, Money> {
313 self.cache.borrow().account_for_venue(venue).map_or_else(
314 || {
315 log::error!(
316 "Cannot get initial (order) margins: no account registered for {venue}"
317 );
318 IndexMap::new()
319 },
320 |account| match account {
321 AccountAny::Margin(margin_account) => margin_account.initial_margins(),
322 AccountAny::Cash(_) | AccountAny::Betting(_) => {
323 log::warn!("Initial margins not applicable for cash account");
324 IndexMap::new()
325 }
326 },
327 )
328 }
329
330 #[must_use]
334 pub fn margins_maint(&self, venue: &Venue) -> IndexMap<InstrumentId, Money> {
335 self.cache.borrow().account_for_venue(venue).map_or_else(
336 || {
337 log::error!(
338 "Cannot get maintenance (position) margins: no account registered for {venue}"
339 );
340 IndexMap::new()
341 },
342 |account| match account {
343 AccountAny::Margin(margin_account) => margin_account.maintenance_margins(),
344 AccountAny::Cash(_) | AccountAny::Betting(_) => {
345 log::warn!("Maintenance margins not applicable for cash account");
346 IndexMap::new()
347 }
348 },
349 )
350 }
351
352 #[must_use]
356 pub fn unrealized_pnls(
357 &mut self,
358 venue: &Venue,
359 account_id: Option<&AccountId>,
360 ) -> IndexMap<Currency, Money> {
361 let instrument_ids = {
362 let cache = self.cache.borrow();
363 let positions = cache.positions(Some(venue), None, None, account_id, None);
364
365 if positions.is_empty() {
366 return IndexMap::new(); }
368
369 let instrument_ids: IndexSet<InstrumentId> =
373 positions.iter().map(|p| p.instrument_id).collect();
374
375 instrument_ids
376 };
377
378 let mut unrealized_pnls: IndexMap<Currency, f64> = IndexMap::new();
379
380 for instrument_id in instrument_ids {
381 if account_id.is_none()
384 && let Some(&pnl) = self.inner.borrow_mut().unrealized_pnls.get(&instrument_id)
385 {
386 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
387 continue;
388 }
389
390 if let Some(pnl) = self.calculate_unrealized_pnl(&instrument_id, account_id) {
391 *unrealized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
392 }
393 }
394
395 unrealized_pnls
396 .into_iter()
397 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
398 .collect()
399 }
400
401 #[must_use]
405 pub fn realized_pnls(
406 &mut self,
407 venue: &Venue,
408 account_id: Option<&AccountId>,
409 ) -> IndexMap<Currency, Money> {
410 let instrument_ids = {
411 let cache = self.cache.borrow();
412 let positions = cache.positions(Some(venue), None, None, account_id, None);
413
414 if positions.is_empty() {
415 return IndexMap::new(); }
417
418 let instrument_ids: IndexSet<InstrumentId> =
419 positions.iter().map(|p| p.instrument_id).collect();
420
421 instrument_ids
422 };
423
424 let mut realized_pnls: IndexMap<Currency, f64> = IndexMap::new();
425
426 for instrument_id in instrument_ids {
427 if account_id.is_none()
430 && let Some(&pnl) = self.inner.borrow_mut().realized_pnls.get(&instrument_id)
431 {
432 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
433 continue;
434 }
435
436 if let Some(pnl) = self.calculate_realized_pnl(&instrument_id, account_id) {
437 *realized_pnls.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
438 }
439 }
440
441 realized_pnls
442 .into_iter()
443 .map(|(currency, amount)| (currency, Money::new(amount, currency)))
444 .collect()
445 }
446
    /// Returns the net exposures for `venue`, aggregated per currency.
    ///
    /// The account is looked up by `account_id` when given, otherwise by venue. Each
    /// open position's notional value is converted with the exchange rate to the
    /// account base currency, rounded to the target currency precision and summed.
    ///
    /// Returns `None` when the account, an instrument, a price or an exchange rate is
    /// unavailable, and an empty map when there are no open positions.
    #[must_use]
    pub fn net_exposures(
        &self,
        venue: &Venue,
        account_id: Option<&AccountId>,
    ) -> Option<IndexMap<Currency, Money>> {
        let cache = self.cache.borrow();
        let account = if let Some(id) = account_id {
            if let Some(account) = cache.account(id) {
                account
            } else {
                log::error!("Cannot calculate net exposures: no account for {id}");
                return None;
            }
        } else if let Some(account) = cache.account_for_venue(venue) {
            account
        } else {
            log::error!("Cannot calculate net exposures: no account registered for {venue}");
            return None;
        };

        let positions_open = cache.positions_open(Some(venue), None, None, account_id, None);
        if positions_open.is_empty() {
            return Some(IndexMap::new());
        }

        let mut net_exposures: IndexMap<Currency, f64> = IndexMap::new();

        for position in positions_open {
            let instrument = if let Some(instrument) = cache.instrument(&position.instrument_id) {
                instrument
            } else {
                log::error!(
                    "Cannot calculate net exposures: no instrument for {}",
                    position.instrument_id
                );
                return None;
            };

            // A flat position is reported but does not abort the aggregation.
            if position.side == PositionSide::Flat {
                log::error!(
                    "Cannot calculate net exposures: position is flat for {}",
                    position.instrument_id
                );
                continue;
            }

            // A missing price aborts the whole calculation without logging here.
            let price = self.get_price(position)?;
            let xrate = if let Some(xrate) = self.calculate_xrate_to_base(instrument, account) {
                xrate
            } else {
                log::error!(
                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
                    instrument.settlement_currency(),
                    account.base_currency()
                );
                return None;
            };

            // Target currency: the account base currency when set, otherwise the
            // instrument settlement currency.
            let settlement_currency = account
                .base_currency()
                .unwrap_or_else(|| instrument.settlement_currency());

            let net_exposure = instrument
                .calculate_notional_value(position.quantity, price, None)
                .as_f64()
                * xrate;

            // Round to the target currency precision before accumulating.
            let net_exposure = (net_exposure * 10f64.powi(settlement_currency.precision.into()))
                .round()
                / 10f64.powi(settlement_currency.precision.into());

            *net_exposures.entry(settlement_currency).or_insert(0.0) += net_exposure;
        }

        Some(
            net_exposures
                .into_iter()
                .map(|(currency, amount)| (currency, Money::new(amount, currency)))
                .collect(),
        )
    }
530
531 #[must_use]
532 pub fn unrealized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
533 if let Some(pnl) = self
534 .inner
535 .borrow()
536 .unrealized_pnls
537 .get(instrument_id)
538 .copied()
539 {
540 return Some(pnl);
541 }
542
543 let pnl = self.calculate_unrealized_pnl(instrument_id, None)?;
544 self.inner
545 .borrow_mut()
546 .unrealized_pnls
547 .insert(*instrument_id, pnl);
548 Some(pnl)
549 }
550
551 #[must_use]
552 pub fn realized_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
553 if let Some(pnl) = self
554 .inner
555 .borrow()
556 .realized_pnls
557 .get(instrument_id)
558 .copied()
559 {
560 return Some(pnl);
561 }
562
563 let pnl = self.calculate_realized_pnl(instrument_id, None)?;
564 self.inner
565 .borrow_mut()
566 .realized_pnls
567 .insert(*instrument_id, pnl);
568 Some(pnl)
569 }
570
571 #[must_use]
575 pub fn total_pnl(&mut self, instrument_id: &InstrumentId) -> Option<Money> {
576 let realized = self.realized_pnl(instrument_id)?;
577 let unrealized = self.unrealized_pnl(instrument_id)?;
578
579 if realized.currency != unrealized.currency {
580 log::error!(
581 "Cannot calculate total PnL: currency mismatch {} vs {}",
582 realized.currency,
583 unrealized.currency
584 );
585 return None;
586 }
587
588 Some(Money::new(
589 realized.as_f64() + unrealized.as_f64(),
590 realized.currency,
591 ))
592 }
593
594 #[must_use]
599 pub fn total_pnls(
600 &mut self,
601 venue: &Venue,
602 account_id: Option<&AccountId>,
603 ) -> IndexMap<Currency, Money> {
604 let realized_pnls = self.realized_pnls(venue, account_id);
605 let unrealized_pnls = self.unrealized_pnls(venue, account_id);
606
607 let mut total_pnls: IndexMap<Currency, Money> = IndexMap::new();
608
609 for (currency, realized) in realized_pnls {
611 total_pnls.insert(currency, realized);
612 }
613
614 for (currency, unrealized) in unrealized_pnls {
616 match total_pnls.get_mut(¤cy) {
617 Some(total) => {
618 *total = *total + unrealized;
619 }
620 None => {
621 total_pnls.insert(currency, unrealized);
622 }
623 }
624 }
625
626 total_pnls
627 }
628
629 #[must_use]
638 pub fn mark_values(
639 &mut self,
640 venue: &Venue,
641 account_id: Option<&AccountId>,
642 ) -> IndexMap<Currency, Money> {
643 let mut values: IndexMap<Currency, f64> = IndexMap::new();
644 let mut unpriced: AHashSet<InstrumentId> = AHashSet::new();
645
646 if self.accumulate_mark_values(venue, account_id, &mut values, &mut unpriced) {
647 self.update_missing_price_state(venue, &unpriced);
648 } else if account_id.is_none() {
649 self.inner.borrow_mut().venues_missing_price.remove(venue);
652 }
653
654 values
655 .into_iter()
656 .map(|(c, v)| (c, Money::new(v, c)))
657 .collect()
658 }
659
    /// Returns the account equity for `venue`, aggregated per currency.
    ///
    /// Starts from the account's total balances. For margin accounts the unrealized
    /// PnL of every instrument with open positions is added; for all other account
    /// types the signed mark values of the open positions are added. The per-venue
    /// record of instruments lacking a price is refreshed along the way.
    ///
    /// Returns an empty map when no account is found.
    #[must_use]
    pub fn equity(
        &mut self,
        venue: &Venue,
        account_id: Option<&AccountId>,
    ) -> IndexMap<Currency, Money> {
        // Scoped so the cache borrow ends before the calculations below.
        let (mut equity, is_margin) = {
            let cache = self.cache.borrow();
            let account = match account_id {
                Some(id) => cache.account(id),
                None => cache.account_for_venue(venue),
            };

            match account {
                Some(account) => {
                    let equity: IndexMap<Currency, f64> = account
                        .balances_total()
                        .into_iter()
                        .map(|(c, m)| (c, m.as_f64()))
                        .collect();
                    (equity, matches!(account, AccountAny::Margin(_)))
                }
                None => return IndexMap::new(),
            }
        };

        let mut unpriced: AHashSet<InstrumentId> = AHashSet::new();

        if is_margin {
            let instrument_ids: IndexSet<InstrumentId> = {
                let cache = self.cache.borrow();
                cache
                    .positions_open(Some(venue), None, None, account_id, None)
                    .iter()
                    .map(|p| p.instrument_id)
                    .collect()
            };

            if instrument_ids.is_empty() {
                // Nothing open: drop the missing-price record for unfiltered queries.
                if account_id.is_none() {
                    self.inner.borrow_mut().venues_missing_price.remove(venue);
                }
            } else {
                for instrument_id in instrument_ids {
                    // Cached PnL is only used for unfiltered queries.
                    let cached = if account_id.is_none() {
                        self.inner
                            .borrow()
                            .unrealized_pnls
                            .get(&instrument_id)
                            .copied()
                    } else {
                        None
                    };
                    let pnl = match cached {
                        Some(pnl) => Some(pnl),
                        None => self.calculate_unrealized_pnl(&instrument_id, account_id),
                    };

                    match pnl {
                        Some(pnl) => {
                            *equity.entry(pnl.currency).or_insert(0.0) += pnl.as_f64();
                        }
                        None => {
                            // No PnL available: treat the instrument as unpriced.
                            unpriced.insert(instrument_id);
                        }
                    }
                }
                self.update_missing_price_state(venue, &unpriced);
            }
        } else if self.accumulate_mark_values(venue, account_id, &mut equity, &mut unpriced) {
            self.update_missing_price_state(venue, &unpriced);
        } else if account_id.is_none() {
            self.inner.borrow_mut().venues_missing_price.remove(venue);
        }

        equity
            .into_iter()
            .map(|(c, v)| (c, Money::new(v, c)))
            .collect()
    }
753
754 #[must_use]
760 pub fn missing_price_instruments(&self, venue: &Venue) -> Vec<InstrumentId> {
761 let mut ids: Vec<InstrumentId> = self
762 .inner
763 .borrow()
764 .venues_missing_price
765 .get(venue)
766 .map(|set| set.iter().copied().collect())
767 .unwrap_or_default();
768 ids.sort();
771 ids
772 }
773
774 fn update_missing_price_state(&self, venue: &Venue, unpriced: &AHashSet<InstrumentId>) {
775 let mut inner = self.inner.borrow_mut();
776 let tracked = inner.venues_missing_price.entry(*venue).or_default();
777
778 let mut ids: Vec<InstrumentId> = unpriced.iter().copied().collect();
780 ids.sort();
781 for instrument_id in ids {
782 if tracked.insert(instrument_id) {
783 log::warn!(
784 "No price available for open position {instrument_id}; \
785 subscribe to quotes, trades or bars for continuous mark-to-market equity"
786 );
787 }
788 }
789
790 tracked.retain(|id| unpriced.contains(id));
792 }
793
    /// Adds the signed mark values of the open positions at `venue` into `values`.
    ///
    /// Long positions add their notional value and short positions subtract it.
    /// Positions whose instrument, price or required exchange rate is unavailable are
    /// skipped and their instrument ID is recorded in `unpriced`.
    ///
    /// Returns `false` when there are no open positions (nothing is touched), `true`
    /// otherwise.
    fn accumulate_mark_values(
        &self,
        venue: &Venue,
        account_id: Option<&AccountId>,
        values: &mut IndexMap<Currency, f64>,
        unpriced: &mut AHashSet<InstrumentId>,
    ) -> bool {
        let cache = self.cache.borrow();
        let positions = cache.positions_open(Some(venue), None, None, account_id, None);

        if positions.is_empty() {
            return false;
        }

        let account = match account_id {
            Some(id) => cache.account(id),
            None => cache.account_for_venue(venue),
        };
        // Exchange rate lookups memoized per settlement currency, failures included.
        let mut xrate_cache: AHashMap<Currency, Option<f64>> = AHashMap::new();

        for position in positions {
            let sign = match position.side {
                PositionSide::Long => 1.0,
                PositionSide::Short => -1.0,
                PositionSide::Flat | PositionSide::NoPositionSide => continue,
            };

            let instrument = match cache.instrument(&position.instrument_id) {
                Some(i) => i,
                None => {
                    unpriced.insert(position.instrument_id);
                    continue;
                }
            };

            let price = match self.get_price(position) {
                Some(p) => p,
                None => {
                    unpriced.insert(position.instrument_id);
                    continue;
                }
            };

            let settlement = instrument.settlement_currency();
            // Convert to the account base currency only when configured and the
            // account actually has one; otherwise keep the settlement currency.
            let (xrate, currency) = if self.config.convert_to_account_base_currency
                && let Some(account) = account
                && let Some(base_currency) = account.base_currency()
            {
                let xrate_opt = *xrate_cache
                    .entry(settlement)
                    .or_insert_with(|| self.calculate_xrate_to_base(instrument, account));
                let xrate = match xrate_opt {
                    Some(x) => x,
                    None => {
                        unpriced.insert(position.instrument_id);
                        continue;
                    }
                };
                (xrate, base_currency)
            } else {
                (1.0, settlement)
            };

            let notional = position.notional_value(price).as_f64() * xrate;
            *values.entry(currency).or_insert(0.0) += sign * notional;
        }

        true
    }
866
    /// Returns the net exposure for `instrument_id`, optionally filtered by
    /// `account_id`.
    ///
    /// The notional value of each open position is converted with the exchange rate
    /// for that position's account and summed. The result is denominated in the
    /// account base currency when one is set, otherwise in the instrument settlement
    /// currency.
    ///
    /// Returns `None` when the instrument, an account, a price or an exchange rate is
    /// unavailable, or when the accounts involved have differing base currencies.
    /// Returns zero in the settlement currency when there are no open positions.
    #[must_use]
    pub fn net_exposure(
        &self,
        instrument_id: &InstrumentId,
        account_id: Option<&AccountId>,
    ) -> Option<Money> {
        let cache = self.cache.borrow();

        let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
            instrument
        } else {
            log::error!("Cannot calculate net exposure: no instrument for {instrument_id}");
            return None;
        };

        let positions_open =
            cache.positions_open(None, Some(instrument_id), None, account_id, None);

        if positions_open.is_empty() {
            return Some(Money::new(0.0, instrument.settlement_currency()));
        }

        let mut net_exposure = 0.0;
        // First base currency encountered, plus the account that supplied it.
        let mut first_base_currency: Option<Currency> = None;
        let mut first_account: Option<&AccountAny> = None;

        for position in &positions_open {
            let account = if let Some(account) = cache.account(&position.account_id) {
                account
            } else {
                log::error!(
                    "Cannot calculate net exposure: no account for {}",
                    position.account_id
                );
                return None;
            };

            // Amounts in different base currencies cannot be summed into one figure.
            if let Some(base) = account.base_currency() {
                match first_base_currency {
                    None => {
                        first_base_currency = Some(base);
                        first_account = Some(account);
                    }
                    Some(first) if first != base => {
                        log::error!(
                            "Cannot calculate net exposure: accounts have different base \
                             currencies ({first} vs {base}); multi-account aggregation requires \
                             consistent base currencies"
                        );
                        return None;
                    }
                    _ => {}
                }
            }

            // A missing price aborts the calculation without logging here.
            let price = self.get_price(position)?;
            let xrate = if let Some(xrate) = self.calculate_xrate_to_base(instrument, account) {
                xrate
            } else {
                log::error!(
                    "Cannot calculate net exposures: insufficient data for {}/{:?}",
                    instrument.settlement_currency(),
                    account.base_currency()
                );
                return None;
            };

            let notional_value =
                instrument.calculate_notional_value(position.quantity, price, None);
            net_exposure += notional_value.as_f64() * xrate;
        }

        let settlement_currency = first_account
            .and_then(|a| a.base_currency())
            .unwrap_or_else(|| instrument.settlement_currency());

        Some(Money::new(net_exposure, settlement_currency))
    }
947
948 #[must_use]
949 pub fn net_position(&self, instrument_id: &InstrumentId) -> Decimal {
950 self.inner
951 .borrow()
952 .net_positions
953 .get(instrument_id)
954 .copied()
955 .unwrap_or(Decimal::ZERO)
956 }
957
958 #[must_use]
959 pub fn is_net_long(&self, instrument_id: &InstrumentId) -> bool {
960 self.inner
961 .borrow()
962 .net_positions
963 .get(instrument_id)
964 .copied()
965 .map_or_else(|| false, |net_position| net_position > Decimal::ZERO)
966 }
967
968 #[must_use]
969 pub fn is_net_short(&self, instrument_id: &InstrumentId) -> bool {
970 self.inner
971 .borrow()
972 .net_positions
973 .get(instrument_id)
974 .copied()
975 .map_or_else(|| false, |net_position| net_position < Decimal::ZERO)
976 }
977
978 #[must_use]
979 pub fn is_flat(&self, instrument_id: &InstrumentId) -> bool {
980 self.inner
981 .borrow()
982 .net_positions
983 .get(instrument_id)
984 .copied()
985 .map_or_else(|| true, |net_position| net_position == Decimal::ZERO)
986 }
987
988 #[must_use]
989 pub fn is_completely_flat(&self) -> bool {
990 for net_position in self.inner.borrow().net_positions.values() {
991 if *net_position != Decimal::ZERO {
992 return false;
993 }
994 }
995 true
996 }
997
    /// Recomputes account state from all currently open orders.
    ///
    /// Open orders are grouped per instrument; for each group the accounts manager
    /// updates the venue account and the result is written back to the cache. The
    /// `initialized` flag is set to `false` when an instrument or account is missing
    /// or an update produces no result, and `true` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if writing the updated account back to the cache fails.
    pub fn initialize_orders(&mut self) {
        let mut initialized = true;
        // Snapshot (instrument, open orders) pairs as owned values so the cache borrow
        // is released before accounts are updated.
        let orders_and_instruments = {
            let cache = self.cache.borrow();
            let all_orders_open = cache.orders_open(None, None, None, None, None);

            let mut instruments_with_orders = Vec::new();
            let mut instruments = AHashSet::new();

            for order in &all_orders_open {
                instruments.insert(order.instrument_id());
            }

            for instrument_id in instruments {
                if let Some(instrument) = cache.instrument(&instrument_id) {
                    let orders = cache
                        .orders_open(None, Some(&instrument_id), None, None, None)
                        .into_iter()
                        .cloned()
                        .collect::<Vec<OrderAny>>();
                    instruments_with_orders.push((instrument.clone(), orders));
                } else {
                    log::error!(
                        "Cannot update initial (order) margin: no instrument found for {instrument_id}"
                    );
                    // Collection stops at the first unknown instrument.
                    initialized = false;
                    break;
                }
            }
            instruments_with_orders
        };

        for (instrument, orders_open) in &orders_and_instruments {
            let account = {
                let cache = self.cache.borrow();
                if let Some(account) = cache.account_for_venue(&instrument.id().venue) {
                    account.clone()
                } else {
                    log::error!(
                        "Cannot update initial (order) margin: no account registered for {}",
                        instrument.id().venue
                    );
                    initialized = false;
                    break;
                }
            };

            let result = self.inner.borrow_mut().accounts.update_orders(
                &account,
                instrument,
                orders_open.iter().collect(),
                self.clock.borrow().timestamp_ns(),
            );

            match result {
                Some((updated_account, _)) => {
                    self.cache
                        .borrow_mut()
                        .update_account(&updated_account)
                        .unwrap();
                }
                None => {
                    initialized = false;
                }
            }
        }

        // Counts only the orders that were collected above.
        let total_orders = orders_and_instruments
            .into_iter()
            .map(|(_, orders)| orders.len())
            .sum::<usize>();

        log::info!(
            color = if total_orders > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
            "Initialized {} open order{}",
            total_orders,
            if total_orders == 1 { "" } else { "s" }
        );

        self.inner.borrow_mut().initialized = initialized;
    }
1084
    /// Rebuilds position-derived state from all currently open positions.
    ///
    /// Clears the cached PnLs, then for every instrument with open positions:
    /// refreshes the net position, recalculates and caches unrealized and realized
    /// PnL (marking the instrument as pending when a calculation fails), and - for
    /// margin accounts - updates the account from the open positions and writes it
    /// back to the cache. The `initialized` flag is set to `false` when an account or
    /// instrument is missing or an update produces no result, and `true` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if writing the updated account back to the cache fails.
    pub fn initialize_positions(&mut self) {
        self.inner.borrow_mut().unrealized_pnls.clear();
        self.inner.borrow_mut().realized_pnls.clear();
        let all_positions_open: Vec<Position>;
        let mut instruments = AHashSet::new();
        {
            // Scoped so the cache borrow is released before the per-instrument work.
            let cache = self.cache.borrow();
            all_positions_open = cache
                .positions_open(None, None, None, None, None)
                .into_iter()
                .cloned()
                .collect();

            for position in &all_positions_open {
                instruments.insert(position.instrument_id);
            }
        }

        let mut initialized = true;

        for instrument_id in instruments {
            let positions_open: Vec<Position> = {
                let cache = self.cache.borrow();
                cache
                    .positions_open(None, Some(&instrument_id), None, None, None)
                    .into_iter()
                    .cloned()
                    .collect()
            };

            self.update_net_position(&instrument_id, &positions_open);

            if let Some(calculated_unrealized_pnl) =
                self.calculate_unrealized_pnl(&instrument_id, None)
            {
                self.inner
                    .borrow_mut()
                    .unrealized_pnls
                    .insert(instrument_id, calculated_unrealized_pnl);
            } else {
                log::debug!(
                    "Failed to calculate unrealized PnL for {instrument_id}, marking as pending"
                );
                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
            }

            if let Some(calculated_realized_pnl) = self.calculate_realized_pnl(&instrument_id, None)
            {
                self.inner
                    .borrow_mut()
                    .realized_pnls
                    .insert(instrument_id, calculated_realized_pnl);
            } else {
                log::warn!(
                    "Failed to calculate realized PnL for {instrument_id}, marking as pending"
                );
                self.inner.borrow_mut().pending_calcs.insert(instrument_id);
            }

            let cache = self.cache.borrow();
            let Some(account) = cache.account_for_venue(&instrument_id.venue).cloned() else {
                log::error!(
                    "Cannot update maintenance (position) margin: no account registered for {}",
                    instrument_id.venue
                );
                initialized = false;
                break;
            };

            // Only margin accounts get the position-based account update below.
            let account = match account {
                AccountAny::Cash(_) | AccountAny::Betting(_) => continue,
                AccountAny::Margin(margin_account) => margin_account,
            };

            let Some(instrument) = cache.instrument(&instrument_id).cloned() else {
                log::error!(
                    "Cannot update maintenance (position) margin: no instrument found for {instrument_id}"
                );
                initialized = false;
                break;
            };
            let positions: Vec<Position> = cache
                .positions_open(None, Some(&instrument_id), None, None, None)
                .into_iter()
                .cloned()
                .collect();
            // Release the shared borrow before the cache is mutably borrowed below.
            drop(cache);

            let result = self.inner.borrow_mut().accounts.update_positions(
                &account,
                &instrument,
                positions.iter().collect(),
                self.clock.borrow().timestamp_ns(),
            );

            match result {
                Some((updated_account, _)) => {
                    self.cache
                        .borrow_mut()
                        .update_account(&AccountAny::Margin(updated_account))
                        .unwrap();
                }
                None => {
                    initialized = false;
                }
            }
        }

        let open_count = all_positions_open.len();
        self.inner.borrow_mut().initialized = initialized;
        log::info!(
            color = if open_count > 0 { LogColor::Blue as u8 } else { LogColor::Normal as u8 };
            "Initialized {} open position{}",
            open_count,
            if open_count == 1 { "" } else { "s" }
        );
    }
1207
    /// Applies a quote tick by delegating to the module-level `update_quote_tick`
    /// routine with this portfolio's cache, clock, state and config.
    pub fn update_quote_tick(&mut self, quote: &QuoteTick) {
        update_quote_tick(&self.cache, &self.clock, &self.inner, self.config, quote);
    }
1214
    /// Applies a bar by delegating to the module-level `update_bar` routine with this
    /// portfolio's cache, clock, state and config.
    pub fn update_bar(&mut self, bar: &Bar) {
        update_bar(&self.cache, &self.clock, &self.inner, self.config, bar);
    }
1221
    /// Applies an account state event by delegating to the module-level
    /// `update_account` routine with this portfolio's cache and state.
    pub fn update_account(&mut self, event: &AccountState) {
        update_account(&self.cache, &self.inner, event);
    }
1226
    /// Applies an order event by delegating to the module-level `update_order`
    /// routine with this portfolio's cache, clock, state and config.
    pub fn update_order(&mut self, event: &OrderEventAny) {
        update_order(&self.cache, &self.clock, &self.inner, self.config, event);
    }
1233
    /// Applies a position event by delegating to the module-level `update_position`
    /// routine with this portfolio's cache, clock, state and config.
    pub fn update_position(&mut self, event: &PositionEvent) {
        update_position(&self.cache, &self.clock, &self.inner, self.config, event);
    }
1240
1241 fn update_net_position(&self, instrument_id: &InstrumentId, positions_open: &[Position]) {
1242 let mut net_position = Decimal::ZERO;
1243
1244 for open_position in positions_open {
1245 log::debug!("open_position: {open_position}");
1246 net_position += open_position.signed_decimal_qty();
1247 }
1248
1249 let existing_position = self.net_position(instrument_id);
1250 if existing_position != net_position {
1251 self.inner
1252 .borrow_mut()
1253 .net_positions
1254 .insert(*instrument_id, net_position);
1255 log::info!("{instrument_id} net_position={net_position}");
1256 }
1257 }
1258
1259 fn calculate_unrealized_pnl(
1260 &self,
1261 instrument_id: &InstrumentId,
1262 account_id: Option<&AccountId>,
1263 ) -> Option<Money> {
1264 let cache = self.cache.borrow();
1265 let account = match account_id {
1266 Some(id) => cache.account(id),
1267 None => cache.account_for_venue(&instrument_id.venue),
1268 };
1269 let account = if let Some(account) = account {
1270 account
1271 } else {
1272 log::error!(
1273 "Cannot calculate unrealized PnL: no account for {} / {account_id:?}",
1274 instrument_id.venue,
1275 );
1276 return None;
1277 };
1278
1279 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1280 instrument
1281 } else {
1282 log::error!("Cannot calculate unrealized PnL: no instrument for {instrument_id}");
1283 return None;
1284 };
1285
1286 let currency = account
1287 .base_currency()
1288 .unwrap_or_else(|| instrument.settlement_currency());
1289
1290 let positions_open =
1291 cache.positions_open(None, Some(instrument_id), None, account_id, None);
1292
1293 if positions_open.is_empty() {
1294 return Some(Money::new(0.0, currency));
1295 }
1296
1297 let mut total_pnl = 0.0;
1298
1299 for position in positions_open {
1300 if position.instrument_id != *instrument_id {
1301 continue; }
1303
1304 if position.side == PositionSide::Flat {
1305 continue; }
1307
1308 let price = if let Some(price) = self.get_price(position) {
1309 price
1310 } else {
1311 log::debug!("Cannot calculate unrealized PnL: no prices for {instrument_id}");
1312 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1313 return None; };
1315
1316 let mut pnl = position.unrealized_pnl(price).as_f64();
1317
1318 if let Some(base_currency) = account.base_currency() {
1319 let xrate = if let Some(xrate) = self.calculate_xrate_to_base(instrument, account) {
1320 xrate
1321 } else {
1322 log::error!(
1323 "Cannot calculate unrealized PnL: insufficient data for {}/{}",
1325 instrument.settlement_currency(),
1326 base_currency
1327 );
1328 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1329 return None; };
1331
1332 let scale = 10f64.powi(currency.precision.into());
1333 pnl = ((pnl * xrate) * scale).round() / scale;
1334 }
1335
1336 total_pnl += pnl;
1337 }
1338
1339 Some(Money::new(total_pnl, currency))
1340 }
1341
1342 fn ensure_snapshot_pnls_cached_for(&self, instrument_id: &InstrumentId) {
1343 let snapshot_position_ids = self.cache.borrow().position_snapshot_ids(instrument_id);
1349
1350 if snapshot_position_ids.is_empty() {
1351 return; }
1353
1354 let mut rebuild = false;
1355
1356 for position_id in &snapshot_position_ids {
1358 let curr_count = self.cache.borrow().position_snapshot_count(position_id);
1359 let prev_count = self
1360 .inner
1361 .borrow()
1362 .snapshot_processed_counts
1363 .get(position_id)
1364 .copied()
1365 .unwrap_or(0);
1366
1367 if prev_count > curr_count {
1368 rebuild = true;
1369 break;
1370 }
1371 }
1372
1373 if rebuild {
1374 for position_id in &snapshot_position_ids {
1376 let snapshot_count = self.cache.borrow().position_snapshot_count(position_id);
1380 let snapshots = self
1381 .cache
1382 .borrow()
1383 .position_snapshots(Some(position_id), None);
1384
1385 let mut sum_pnl: Option<Money> = None;
1386 let mut last_pnl: Option<Money> = None;
1387 let mut snapshot_account_id: Option<AccountId> = None;
1388
1389 for snapshot in snapshots {
1390 snapshot_account_id.get_or_insert(snapshot.account_id);
1391 if let Some(realized_pnl) = snapshot.realized_pnl {
1392 if let Some(sum) = sum_pnl {
1393 if sum.currency == realized_pnl.currency {
1394 sum_pnl = Some(sum + realized_pnl);
1395 }
1396 } else {
1397 sum_pnl = Some(realized_pnl);
1398 }
1399 last_pnl = Some(realized_pnl);
1400 }
1401 }
1402
1403 let mut inner = self.inner.borrow_mut();
1404
1405 if let Some(sum) = sum_pnl {
1406 inner.snapshot_sum_per_position.insert(*position_id, sum);
1407
1408 if let Some(last) = last_pnl {
1409 inner.snapshot_last_per_position.insert(*position_id, last);
1410 }
1411 } else {
1412 inner.snapshot_sum_per_position.remove(position_id);
1413 inner.snapshot_last_per_position.remove(position_id);
1414 }
1415
1416 if let Some(account_id) = snapshot_account_id {
1417 inner.snapshot_account_ids.insert(*position_id, account_id);
1418 } else {
1419 inner.snapshot_account_ids.remove(position_id);
1420 }
1421
1422 inner
1423 .snapshot_processed_counts
1424 .insert(*position_id, snapshot_count);
1425 }
1426 } else {
1427 for position_id in &snapshot_position_ids {
1429 let curr_count = self.cache.borrow().position_snapshot_count(position_id);
1432 let prev_count = self
1433 .inner
1434 .borrow()
1435 .snapshot_processed_counts
1436 .get(position_id)
1437 .copied()
1438 .unwrap_or(0);
1439
1440 if prev_count >= curr_count {
1441 continue;
1442 }
1443
1444 let mut sum_pnl = self
1445 .inner
1446 .borrow()
1447 .snapshot_sum_per_position
1448 .get(position_id)
1449 .copied();
1450 let mut last_pnl = self
1451 .inner
1452 .borrow()
1453 .snapshot_last_per_position
1454 .get(position_id)
1455 .copied();
1456 let mut snapshot_account_id: Option<AccountId> = None;
1457
1458 let new_snapshots = self
1459 .cache
1460 .borrow()
1461 .position_snapshots_from(position_id, prev_count);
1462
1463 for snapshot in new_snapshots {
1464 snapshot_account_id.get_or_insert(snapshot.account_id);
1465 if let Some(realized_pnl) = snapshot.realized_pnl {
1466 if let Some(sum) = sum_pnl {
1467 if sum.currency == realized_pnl.currency {
1468 sum_pnl = Some(sum + realized_pnl);
1469 }
1470 } else {
1471 sum_pnl = Some(realized_pnl);
1472 }
1473 last_pnl = Some(realized_pnl);
1474 }
1475 }
1476
1477 let mut inner = self.inner.borrow_mut();
1478
1479 if let Some(sum) = sum_pnl {
1480 inner.snapshot_sum_per_position.insert(*position_id, sum);
1481
1482 if let Some(last) = last_pnl {
1483 inner.snapshot_last_per_position.insert(*position_id, last);
1484 }
1485 }
1486
1487 if let Some(account_id) = snapshot_account_id
1488 && !inner.snapshot_account_ids.contains_key(position_id)
1489 {
1490 inner.snapshot_account_ids.insert(*position_id, account_id);
1491 }
1492
1493 inner
1494 .snapshot_processed_counts
1495 .insert(*position_id, curr_count);
1496 }
1497 }
1498 }
1499
1500 fn calculate_realized_pnl(
1501 &self,
1502 instrument_id: &InstrumentId,
1503 account_id: Option<&AccountId>,
1504 ) -> Option<Money> {
1505 self.ensure_snapshot_pnls_cached_for(instrument_id);
1507
1508 let cache = self.cache.borrow();
1509 let account = match account_id {
1510 Some(id) => cache.account(id),
1511 None => cache.account_for_venue(&instrument_id.venue),
1512 };
1513 let account = if let Some(account) = account {
1514 account
1515 } else {
1516 log::error!(
1517 "Cannot calculate realized PnL: no account for {} / {account_id:?}",
1518 instrument_id.venue,
1519 );
1520 return None;
1521 };
1522
1523 let instrument = if let Some(instrument) = cache.instrument(instrument_id) {
1524 instrument
1525 } else {
1526 log::error!("Cannot calculate realized PnL: no instrument for {instrument_id}");
1527 return None;
1528 };
1529
1530 let currency = account
1531 .base_currency()
1532 .unwrap_or_else(|| instrument.settlement_currency());
1533
1534 let positions = cache.positions(None, Some(instrument_id), None, account_id, None);
1535
1536 let mut snapshot_position_ids: Vec<PositionId> = if let Some(filter_id) = account_id {
1541 let inner = self.inner.borrow();
1542 cache
1543 .position_snapshot_ids(instrument_id)
1544 .into_iter()
1545 .filter(|pid| {
1546 inner
1547 .snapshot_account_ids
1548 .get(pid)
1549 .is_some_and(|id| id == filter_id)
1550 })
1551 .collect()
1552 } else {
1553 cache
1554 .position_snapshot_ids(instrument_id)
1555 .into_iter()
1556 .collect()
1557 };
1558 snapshot_position_ids.sort();
1559
1560 let is_netting = positions
1562 .iter()
1563 .any(|p| cache.oms_type(&p.id) == Some(OmsType::Netting));
1564
1565 let mut total_pnl = 0.0;
1566
1567 if is_netting && !snapshot_position_ids.is_empty() {
1568 for position_id in &snapshot_position_ids {
1571 let is_active = positions.iter().any(|p| p.id == *position_id);
1572
1573 if is_active {
1574 let last_pnl = self
1576 .inner
1577 .borrow()
1578 .snapshot_last_per_position
1579 .get(position_id)
1580 .copied();
1581
1582 if let Some(last_pnl) = last_pnl {
1583 let mut pnl = last_pnl.as_f64();
1584
1585 if let Some(base_currency) = account.base_currency()
1586 && positions.iter().any(|p| p.id == *position_id)
1587 {
1588 let xrate = if let Some(xrate) =
1589 self.calculate_xrate_to_base(instrument, account)
1590 {
1591 xrate
1592 } else {
1593 log::error!(
1594 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1595 instrument.settlement_currency(),
1596 base_currency
1597 );
1598 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1599 return Some(Money::new(0.0, currency));
1600 };
1601
1602 let scale = 10f64.powi(currency.precision.into());
1603 pnl = ((pnl * xrate) * scale).round() / scale;
1604 }
1605
1606 total_pnl += pnl;
1607 }
1608 } else {
1609 let sum_pnl = self
1611 .inner
1612 .borrow()
1613 .snapshot_sum_per_position
1614 .get(position_id)
1615 .copied();
1616
1617 if let Some(sum_pnl) = sum_pnl {
1618 let mut pnl = sum_pnl.as_f64();
1619
1620 if let Some(base_currency) = account.base_currency() {
1621 let xrate = cache.get_xrate(
1623 instrument_id.venue,
1624 instrument.settlement_currency(),
1625 base_currency,
1626 PriceType::Mid,
1627 );
1628
1629 if let Some(xrate) = xrate {
1630 let scale = 10f64.powi(currency.precision.into());
1631 pnl = ((pnl * xrate) * scale).round() / scale;
1632 } else {
1633 log::error!(
1634 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1635 instrument.settlement_currency(),
1636 base_currency
1637 );
1638 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1639 return Some(Money::new(0.0, currency));
1640 }
1641 }
1642
1643 total_pnl += pnl;
1644 }
1645 }
1646 }
1647
1648 for position in positions {
1650 if position.instrument_id != *instrument_id {
1651 continue;
1652 }
1653
1654 if let Some(realized_pnl) = position.realized_pnl {
1655 let mut pnl = realized_pnl.as_f64();
1656
1657 if let Some(base_currency) = account.base_currency() {
1658 let xrate = if let Some(xrate) =
1659 self.calculate_xrate_to_base(instrument, account)
1660 {
1661 xrate
1662 } else {
1663 log::error!(
1664 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1665 instrument.settlement_currency(),
1666 base_currency
1667 );
1668 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1669 return Some(Money::new(0.0, currency));
1670 };
1671
1672 let scale = 10f64.powi(currency.precision.into());
1673 pnl = ((pnl * xrate) * scale).round() / scale;
1674 }
1675
1676 total_pnl += pnl;
1677 }
1678 }
1679 } else {
1680 for position_id in &snapshot_position_ids {
1683 let sum_pnl = self
1684 .inner
1685 .borrow()
1686 .snapshot_sum_per_position
1687 .get(position_id)
1688 .copied();
1689
1690 if let Some(sum_pnl) = sum_pnl {
1691 let mut pnl = sum_pnl.as_f64();
1692
1693 if let Some(base_currency) = account.base_currency() {
1694 let xrate = cache.get_xrate(
1695 instrument_id.venue,
1696 instrument.settlement_currency(),
1697 base_currency,
1698 PriceType::Mid,
1699 );
1700
1701 if let Some(xrate) = xrate {
1702 let scale = 10f64.powi(currency.precision.into());
1703 pnl = ((pnl * xrate) * scale).round() / scale;
1704 } else {
1705 log::error!(
1706 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1707 instrument.settlement_currency(),
1708 base_currency
1709 );
1710 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1711 return Some(Money::new(0.0, currency));
1712 }
1713 }
1714
1715 total_pnl += pnl;
1716 }
1717 }
1718
1719 for position in positions {
1721 if position.instrument_id != *instrument_id {
1722 continue;
1723 }
1724
1725 if let Some(realized_pnl) = position.realized_pnl {
1726 let mut pnl = realized_pnl.as_f64();
1727
1728 if let Some(base_currency) = account.base_currency() {
1729 let xrate = if let Some(xrate) =
1730 self.calculate_xrate_to_base(instrument, account)
1731 {
1732 xrate
1733 } else {
1734 log::error!(
1735 "Cannot calculate realized PnL: insufficient exchange rate data for {}/{}, marking as pending calculation",
1736 instrument.settlement_currency(),
1737 base_currency
1738 );
1739 self.inner.borrow_mut().pending_calcs.insert(*instrument_id);
1740 return Some(Money::new(0.0, currency));
1741 };
1742
1743 let scale = 10f64.powi(currency.precision.into());
1744 pnl = ((pnl * xrate) * scale).round() / scale;
1745 }
1746
1747 total_pnl += pnl;
1748 }
1749 }
1750 }
1751
1752 Some(Money::new(total_pnl, currency))
1753 }
1754
1755 fn get_price(&self, position: &Position) -> Option<Price> {
1756 let cache = self.cache.borrow();
1757 let instrument_id = &position.instrument_id;
1758
1759 if self.config.use_mark_prices
1761 && let Some(mark_price) = cache.mark_price(instrument_id)
1762 {
1763 return Some(mark_price.value);
1764 }
1765
1766 let price_type = match position.side {
1768 PositionSide::Long => PriceType::Bid,
1769 PositionSide::Short => PriceType::Ask,
1770 _ => panic!("invalid `PositionSide`, was {}", position.side),
1771 };
1772
1773 cache
1774 .price(instrument_id, price_type)
1775 .or_else(|| cache.price(instrument_id, PriceType::Last))
1776 .or_else(|| {
1777 self.inner
1778 .borrow()
1779 .bar_close_prices
1780 .get(instrument_id)
1781 .copied()
1782 })
1783 }
1784
1785 fn calculate_xrate_to_base(
1786 &self,
1787 instrument: &InstrumentAny,
1788 account: &AccountAny,
1789 ) -> Option<f64> {
1790 if !self.config.convert_to_account_base_currency {
1791 return Some(1.0); }
1793
1794 let base_currency = match account.base_currency() {
1795 Some(base_currency) => base_currency,
1796 None => return Some(1.0),
1797 };
1798
1799 let settlement = instrument.settlement_currency();
1800 let cache = self.cache.borrow();
1801
1802 if self.config.use_mark_xrates
1803 && let Some(xrate) = cache.get_mark_xrate(settlement, base_currency)
1804 {
1805 return Some(xrate);
1806 }
1807
1808 cache.get_xrate(
1809 instrument.id().venue,
1810 settlement,
1811 base_currency,
1812 PriceType::Mid,
1813 )
1814 }
1815}
1816
1817fn update_quote_tick(
1819 cache: &Rc<RefCell<Cache>>,
1820 clock: &Rc<RefCell<dyn Clock>>,
1821 inner: &Rc<RefCell<PortfolioState>>,
1822 config: PortfolioConfig,
1823 quote: &QuoteTick,
1824) {
1825 update_instrument_id(cache, clock, inner, config, "e.instrument_id);
1826}
1827
1828fn update_bar(
1829 cache: &Rc<RefCell<Cache>>,
1830 clock: &Rc<RefCell<dyn Clock>>,
1831 inner: &Rc<RefCell<PortfolioState>>,
1832 config: PortfolioConfig,
1833 bar: &Bar,
1834) {
1835 let instrument_id = bar.bar_type.instrument_id();
1836 inner
1837 .borrow_mut()
1838 .bar_close_prices
1839 .insert(instrument_id, bar.close);
1840 update_instrument_id(cache, clock, inner, config, &instrument_id);
1841}
1842
1843fn update_instrument_id(
1844 cache: &Rc<RefCell<Cache>>,
1845 clock: &Rc<RefCell<dyn Clock>>,
1846 inner: &Rc<RefCell<PortfolioState>>,
1847 config: PortfolioConfig,
1848 instrument_id: &InstrumentId,
1849) {
1850 inner
1851 .borrow_mut()
1852 .unrealized_pnls
1853 .shift_remove(instrument_id);
1854
1855 if inner.borrow().initialized || !inner.borrow().pending_calcs.contains(instrument_id) {
1856 return;
1857 }
1858
1859 let mut result_maint = None;
1860
1861 let (account, instrument, orders_open, positions_open) = {
1863 let cache_ref = cache.borrow();
1864 let account = if let Some(account) = cache_ref.account_for_venue(&instrument_id.venue) {
1865 account.clone()
1866 } else {
1867 log::error!(
1868 "Cannot update tick: no account registered for {}",
1869 instrument_id.venue
1870 );
1871 return;
1872 };
1873 let instrument = if let Some(instrument) = cache_ref.instrument(instrument_id) {
1874 instrument.clone()
1875 } else {
1876 log::error!("Cannot update tick: no instrument found for {instrument_id}");
1877 return;
1878 };
1879 let orders_open: Vec<OrderAny> = cache_ref
1880 .orders_open(None, Some(instrument_id), None, None, None)
1881 .iter()
1882 .map(|o| (*o).clone())
1883 .collect();
1884 let positions_open: Vec<Position> = cache_ref
1885 .positions_open(None, Some(instrument_id), None, None, None)
1886 .iter()
1887 .map(|p| (*p).clone())
1888 .collect();
1889 (account, instrument, orders_open, positions_open)
1890 };
1891
1892 let result_init = inner.borrow().accounts.update_orders(
1894 &account,
1895 &instrument,
1896 orders_open.iter().collect(),
1897 clock.borrow().timestamp_ns(),
1898 );
1899
1900 if let AccountAny::Margin(ref margin_account) = account {
1901 result_maint = inner.borrow().accounts.update_positions(
1902 margin_account,
1903 &instrument,
1904 positions_open.iter().collect(),
1905 clock.borrow().timestamp_ns(),
1906 );
1907 }
1908
1909 if let Some((ref updated_account, _)) = result_init {
1910 cache.borrow_mut().update_account(updated_account).unwrap();
1911 }
1912
1913 let portfolio_clone = Portfolio {
1914 clock: clock.clone(),
1915 cache: cache.clone(),
1916 inner: inner.clone(),
1917 config,
1918 };
1919
1920 let result_unrealized_pnl: Option<Money> =
1921 portfolio_clone.calculate_unrealized_pnl(instrument_id, None);
1922
1923 if result_init.is_some()
1924 && (matches!(account, AccountAny::Cash(_) | AccountAny::Betting(_))
1925 || (result_maint.is_some() && result_unrealized_pnl.is_some()))
1926 {
1927 inner.borrow_mut().pending_calcs.remove(instrument_id);
1928 if inner.borrow().pending_calcs.is_empty() {
1929 inner.borrow_mut().initialized = true;
1930 }
1931 }
1932}
1933
1934fn update_order(
1935 cache: &Rc<RefCell<Cache>>,
1936 clock: &Rc<RefCell<dyn Clock>>,
1937 inner: &Rc<RefCell<PortfolioState>>,
1938 _config: PortfolioConfig,
1939 event: &OrderEventAny,
1940) {
1941 let account_id = match event.account_id() {
1942 Some(account_id) => account_id,
1943 None => {
1944 return; }
1946 };
1947
1948 let (account, instrument, orders_open) = {
1950 let cache_ref = cache.borrow();
1951
1952 let account = if let Some(account) = cache_ref.account(&account_id) {
1953 account.clone()
1954 } else {
1955 log::error!("Cannot update order: no account registered for {account_id}");
1956 return;
1957 };
1958
1959 match &account {
1960 AccountAny::Margin(margin_account) => {
1961 if !margin_account.base.calculate_account_state {
1962 return;
1963 }
1964 }
1965 AccountAny::Cash(cash_account) => {
1966 if !cash_account.base.calculate_account_state {
1967 return;
1968 }
1969 }
1970 AccountAny::Betting(betting_account) => {
1971 if !betting_account.base.calculate_account_state {
1972 return;
1973 }
1974 }
1975 }
1976
1977 match event {
1978 OrderEventAny::Accepted(_)
1979 | OrderEventAny::Canceled(_)
1980 | OrderEventAny::Rejected(_)
1981 | OrderEventAny::Updated(_)
1982 | OrderEventAny::Filled(_) => {}
1983 _ => {
1984 return;
1985 }
1986 }
1987
1988 let order = if let Some(order) = cache_ref.order(&event.client_order_id()) {
1989 order
1990 } else {
1991 log::error!(
1992 "Cannot update order: {} not found in the cache",
1993 event.client_order_id()
1994 );
1995 return; };
1997
1998 if matches!(event, OrderEventAny::Rejected(_)) && order.order_type() != OrderType::StopLimit
1999 {
2000 return; }
2002
2003 let instrument = if let Some(instrument) = cache_ref.instrument(&event.instrument_id()) {
2004 instrument.clone()
2005 } else {
2006 log::error!(
2007 "Cannot update order: no instrument found for {}",
2008 event.instrument_id()
2009 );
2010 return;
2011 };
2012
2013 let orders_open: Vec<OrderAny> = cache_ref
2014 .orders_open(None, Some(&event.instrument_id()), None, None, None)
2015 .iter()
2016 .map(|o| (*o).clone())
2017 .collect();
2018
2019 (account, instrument, orders_open)
2020 };
2021
2022 if let OrderEventAny::Filled(order_filled) = event {
2024 let _ =
2025 inner
2026 .borrow()
2027 .accounts
2028 .update_balances(account.clone(), &instrument, *order_filled);
2029
2030 let portfolio_clone = Portfolio {
2031 clock: clock.clone(),
2032 cache: cache.clone(),
2033 inner: inner.clone(),
2034 config: PortfolioConfig::default(), };
2036
2037 match portfolio_clone.calculate_unrealized_pnl(&order_filled.instrument_id, None) {
2038 Some(unrealized_pnl) => {
2039 inner
2040 .borrow_mut()
2041 .unrealized_pnls
2042 .insert(event.instrument_id(), unrealized_pnl);
2043 }
2044 None => {
2045 log::error!(
2046 "Failed to calculate unrealized PnL for instrument {}",
2047 event.instrument_id()
2048 );
2049 }
2050 }
2051 }
2052
2053 let account_state = inner.borrow().accounts.update_orders(
2054 &account,
2055 &instrument,
2056 orders_open.iter().collect(),
2057 clock.borrow().timestamp_ns(),
2058 );
2059
2060 if let Some((updated_account, account_state)) = account_state {
2061 cache.borrow_mut().update_account(&updated_account).unwrap();
2062 msgbus::publish_account_state(
2063 format!("events.account.{}", account.id()).into(),
2064 &account_state,
2065 );
2066 } else {
2067 log::debug!("Added pending calculation for {}", instrument.id());
2068 inner.borrow_mut().pending_calcs.insert(instrument.id());
2069 }
2070
2071 log::debug!("Updated {event}");
2072}
2073
2074fn update_position(
2075 cache: &Rc<RefCell<Cache>>,
2076 clock: &Rc<RefCell<dyn Clock>>,
2077 inner: &Rc<RefCell<PortfolioState>>,
2078 _config: PortfolioConfig,
2079 event: &PositionEvent,
2080) {
2081 let instrument_id = event.instrument_id();
2082
2083 let positions_open: Vec<Position> = {
2084 let cache_ref = cache.borrow();
2085
2086 cache_ref
2087 .positions_open(None, Some(&instrument_id), None, None, None)
2088 .iter()
2089 .map(|o| (*o).clone())
2090 .collect()
2091 };
2092
2093 log::debug!("position fresh from cache -> {positions_open:?}");
2094
2095 let portfolio_clone = Portfolio {
2096 clock: clock.clone(),
2097 cache: cache.clone(),
2098 inner: inner.clone(),
2099 config: PortfolioConfig::default(), };
2101
2102 portfolio_clone.update_net_position(&instrument_id, &positions_open);
2103
2104 if let Some(calculated_unrealized_pnl) =
2105 portfolio_clone.calculate_unrealized_pnl(&instrument_id, None)
2106 {
2107 inner
2108 .borrow_mut()
2109 .unrealized_pnls
2110 .insert(event.instrument_id(), calculated_unrealized_pnl);
2111 } else {
2112 log::debug!(
2113 "Failed to calculate unrealized PnL for {}, marking as pending",
2114 event.instrument_id()
2115 );
2116 inner
2117 .borrow_mut()
2118 .pending_calcs
2119 .insert(event.instrument_id());
2120 }
2121
2122 if let Some(calculated_realized_pnl) =
2123 portfolio_clone.calculate_realized_pnl(&instrument_id, None)
2124 {
2125 inner
2126 .borrow_mut()
2127 .realized_pnls
2128 .insert(event.instrument_id(), calculated_realized_pnl);
2129 } else {
2130 log::warn!(
2131 "Failed to calculate realized PnL for {}, marking as pending",
2132 event.instrument_id()
2133 );
2134 inner
2135 .borrow_mut()
2136 .pending_calcs
2137 .insert(event.instrument_id());
2138 }
2139
2140 let cache_ref = cache.borrow();
2141 let account = cache_ref.account(&event.account_id());
2142
2143 if let Some(AccountAny::Margin(margin_account)) = account {
2144 if !margin_account.calculate_account_state {
2145 return; }
2147
2148 let cache_ref = cache.borrow();
2149 let instrument = if let Some(instrument) = cache_ref.instrument(&instrument_id) {
2150 instrument
2151 } else {
2152 log::error!("Cannot update position: no instrument found for {instrument_id}");
2153 return;
2154 };
2155
2156 let result = inner.borrow_mut().accounts.update_positions(
2157 margin_account,
2158 instrument,
2159 positions_open.iter().collect(),
2160 clock.borrow().timestamp_ns(),
2161 );
2162 let mut cache_ref = cache.borrow_mut();
2163
2164 if let Some((margin_account, _)) = result {
2165 cache_ref
2166 .update_account(&AccountAny::Margin(margin_account))
2167 .unwrap();
2168 }
2169 } else if account.is_none() {
2170 log::error!(
2171 "Cannot update position: no account registered for {}",
2172 event.account_id()
2173 );
2174 }
2175}
2176
2177fn update_account(
2178 cache: &Rc<RefCell<Cache>>,
2179 inner: &Rc<RefCell<PortfolioState>>,
2180 event: &AccountState,
2181) {
2182 let mut cache_ref = cache.borrow_mut();
2183
2184 if let Some(existing) = cache_ref.account(&event.account_id) {
2185 let mut account = existing.clone();
2186 if let Err(e) = account.apply(event.clone()) {
2187 log::error!("Failed to apply account state: {e}");
2188 return;
2189 }
2190
2191 if let Err(e) = cache_ref.update_account(&account) {
2192 log::error!("Failed to update account: {e}");
2193 return;
2194 }
2195 } else {
2196 let account = match AccountAny::from_events(std::slice::from_ref(event)) {
2197 Ok(account) => account,
2198 Err(e) => {
2199 log::error!("Failed to create account: {e}");
2200 return;
2201 }
2202 };
2203
2204 if let Err(e) = cache_ref.add_account(account) {
2205 log::error!("Failed to add account: {e}");
2206 return;
2207 }
2208 }
2209
2210 let mut inner_ref = inner.borrow_mut();
2212 let should_log = if inner_ref.min_account_state_logging_interval_ns > 0 {
2213 let current_ts = event.ts_init.as_u64();
2214 let last_ts = inner_ref
2215 .last_account_state_log_ts
2216 .get(&event.account_id)
2217 .copied()
2218 .unwrap_or(0);
2219
2220 if last_ts == 0 || (current_ts - last_ts) >= inner_ref.min_account_state_logging_interval_ns
2221 {
2222 inner_ref
2223 .last_account_state_log_ts
2224 .insert(event.account_id, current_ts);
2225 true
2226 } else {
2227 false
2228 }
2229 } else {
2230 true };
2232
2233 if should_log {
2234 log::info!("Updated {event}");
2235 }
2236}