1use std::{collections::HashMap, sync::Arc};
19
20use anyhow::Context;
21use ibapi::{
22 accounts::{
23 AccountSummary, AccountSummaryResult, AccountSummaryTags,
24 types::{AccountGroup, AccountId as IbAccountId},
25 },
26 client::Client,
27};
28use nautilus_common::{
29 live::runner::get_exec_event_sender,
30 messages::{ExecutionEvent, ExecutionReport},
31};
32use nautilus_core::time::get_atomic_clock_realtime;
33use nautilus_model::{
34 enums::PositionSideSpecified,
35 identifiers::AccountId,
36 instruments::Instrument,
37 reports::PositionStatusReport,
38 types::{AccountBalance, Currency, MarginBalance, Money, Quantity},
39};
40use rust_decimal::{Decimal, prelude::ToPrimitive};
41
42use crate::common::parse::ib_contract_to_instrument_id_simple;
43
44fn raw_ib_account_code(account_id: &AccountId) -> String {
45 account_id
46 .to_string()
47 .strip_prefix("IB-")
48 .unwrap_or(account_id.as_str())
49 .to_string()
50}
51
52pub async fn subscribe_account_summary(
67 client: &Arc<Client>,
68 account_id: AccountId,
69) -> anyhow::Result<(Vec<AccountBalance>, Vec<MarginBalance>)> {
70 let raw_account_id = raw_ib_account_code(&account_id);
71 let tags = &[
73 AccountSummaryTags::NET_LIQUIDATION,
74 AccountSummaryTags::TOTAL_CASH_VALUE,
75 AccountSummaryTags::SETTLED_CASH,
76 AccountSummaryTags::BUYING_POWER,
77 AccountSummaryTags::EQUITY_WITH_LOAN_VALUE,
78 AccountSummaryTags::AVAILABLE_FUNDS,
79 AccountSummaryTags::EXCESS_LIQUIDITY,
80 AccountSummaryTags::INIT_MARGIN_REQ,
81 AccountSummaryTags::MAINT_MARGIN_REQ,
82 AccountSummaryTags::CUSHION,
83 ];
84
85 let group = AccountGroup("All".to_string());
86 let mut subscription = client
87 .account_summary(&group, tags)
88 .await
89 .context("Failed to subscribe to account summary")?;
90
91 tracing::info!("Subscribed to account summary for account: {}", account_id);
92
93 let mut balances: Vec<AccountBalance> = Vec::new();
97 let mut margins: Vec<MarginBalance> = Vec::new();
98
99 while let Some(result) = subscription.next().await {
100 match result {
101 Ok(AccountSummaryResult::Summary(summary)) => {
102 if summary.account != raw_account_id {
104 continue;
105 }
106
107 match parse_account_summary_to_balance(&summary) {
108 Ok(balance) => {
109 if let Some(existing) = balances
111 .iter_mut()
112 .find(|b| b.total.currency == balance.total.currency)
113 {
114 if let Some(merged) = merge_account_summary_balance(
115 existing,
116 summary.tag.as_str(),
117 &summary.value,
118 &summary.currency,
119 )? {
120 *existing = merged;
121 }
122 } else {
123 balances.push(balance);
124 }
125 }
126 Err(e) => {
127 tracing::warn!("Failed to parse account summary: {}", e);
128 }
129 }
130
131 merge_account_summary_margin(&mut margins, &summary);
136 }
137 Ok(AccountSummaryResult::End) => {
138 break;
139 }
140 Err(e) => {
141 tracing::warn!("Error receiving account summary: {}", e);
142 }
143 }
144 }
145
146 tracing::info!(
147 "Received account summary: {} balances, {} margins",
148 balances.len(),
149 margins.len()
150 );
151
152 Ok((balances, margins))
153}
154
155fn merge_account_summary_margin(margins: &mut Vec<MarginBalance>, summary: &AccountSummary) {
156 let currency = match parse_currency(&summary.currency) {
157 Ok(currency) => currency,
158 Err(e) => {
159 tracing::warn!("Skipping margin summary with unknown currency: {}", e);
160 return;
161 }
162 };
163 let value = match parse_balance_decimal(&summary.value)
164 .and_then(|d| Money::from_decimal(d, currency).map_err(|e| anyhow::anyhow!(e.to_string())))
165 {
166 Ok(money) => money,
167 Err(e) => {
168 tracing::warn!("Failed to parse margin value '{}': {}", summary.value, e);
169 return;
170 }
171 };
172
173 let existing = margins
174 .iter_mut()
175 .find(|m| m.currency == currency && m.instrument_id.is_none());
176
177 match summary.tag.as_str() {
178 AccountSummaryTags::INIT_MARGIN_REQ => match existing {
179 Some(margin) => margin.initial = value,
180 None => margins.push(MarginBalance::new(value, Money::zero(currency), None)),
181 },
182 AccountSummaryTags::MAINT_MARGIN_REQ => match existing {
183 Some(margin) => margin.maintenance = value,
184 None => margins.push(MarginBalance::new(Money::zero(currency), value, None)),
185 },
186 _ => {}
187 }
188}
189
190fn merge_account_summary_balance(
191 existing: &AccountBalance,
192 tag: &str,
193 value: &str,
194 currency_code: &str,
195) -> anyhow::Result<Option<AccountBalance>> {
196 let currency = parse_currency(currency_code)?;
197
198 match tag {
199 AccountSummaryTags::SETTLED_CASH => {
200 let settled_cash = parse_balance_decimal(value)?;
201 Ok(Some(AccountBalance::from_total_and_locked(
202 settled_cash,
203 Decimal::ZERO,
204 currency,
205 )?))
206 }
207 AccountSummaryTags::NET_LIQUIDATION => {
208 let net_liq = parse_balance_decimal(value)?;
209 Ok(Some(AccountBalance::from_total_and_free(
210 net_liq,
211 existing.free.as_decimal(),
212 currency,
213 )?))
214 }
215 _ => Ok(None),
216 }
217}
218
219pub async fn subscribe_pnl(client: &Arc<Client>, account_id: AccountId) -> anyhow::Result<()> {
232 let account = IbAccountId(raw_ib_account_code(&account_id));
233 let mut subscription = client
234 .pnl(&account, None)
235 .await
236 .context("Failed to subscribe to PnL")?;
237
238 tracing::info!("Subscribed to PnL updates for account: {}", account_id);
239
240 nautilus_common::live::get_runtime().spawn(async move {
242 while let Some(result) = subscription.next().await {
243 match result {
244 Ok(pnl) => {
245 tracing::info!(
246 "PnL update - Daily: {:.2}, Unrealized: {:?}, Realized: {:?}",
247 pnl.daily_pnl,
248 pnl.unrealized_pnl,
249 pnl.realized_pnl
250 );
251 }
255 Err(e) => {
256 tracing::warn!("Error receiving PnL update: {}", e);
257 }
258 }
259 }
260 });
261
262 Ok(())
263}
264
265pub type PositionTracker = Arc<tokio::sync::Mutex<HashMap<i32, Decimal>>>;
267
268pub fn create_position_tracker() -> PositionTracker {
270 Arc::new(tokio::sync::Mutex::new(HashMap::new()))
271}
272
273pub async fn check_external_position_change(
285 position_tracker: &PositionTracker,
286 contract_id: i32,
287 new_quantity: Decimal,
288) -> Option<(bool, Decimal)> {
289 let mut tracker = position_tracker.lock().await;
290 let known_quantity = tracker.get(&contract_id).copied().unwrap_or(Decimal::ZERO);
291
292 if new_quantity.is_zero() {
294 tracker.remove(&contract_id);
295 return None;
296 }
297
298 if known_quantity == new_quantity {
301 return None;
302 }
303
304 tracker.insert(contract_id, new_quantity);
308
309 if known_quantity != Decimal::ZERO && known_quantity != new_quantity {
311 Some((true, known_quantity))
312 } else {
313 Some((false, known_quantity))
315 }
316}
317
318pub async fn initialize_position_tracking(
333 client: &Arc<Client>,
334 account_id: AccountId,
335 position_tracker: PositionTracker,
336) -> anyhow::Result<()> {
337 let raw_account_id = raw_ib_account_code(&account_id);
338 let mut subscription = client
339 .positions()
340 .await
341 .context("Failed to request positions")?;
342
343 tracing::info!("Initializing position tracking for account: {}", account_id);
344
345 let mut position_count = 0;
346 let mut tracker = position_tracker.lock().await;
347
348 while let Some(result) = subscription.next().await {
349 match result {
350 Ok(ibapi::accounts::PositionUpdate::Position(position)) => {
351 if position.account != raw_account_id {
353 continue;
354 }
355
356 let contract_id = position.contract.contract_id;
357 let quantity = Decimal::from_f64_retain(position.position).unwrap_or_default();
358
359 if !quantity.is_zero() {
361 tracker.insert(contract_id, quantity);
362 position_count += 1;
363 }
364 }
365 Ok(ibapi::accounts::PositionUpdate::PositionEnd) => {
366 break;
367 }
368 Err(e) => {
369 tracing::warn!("Error receiving position update: {}", e);
370 }
371 }
372 }
373
374 tracing::info!(
375 "Initialized tracking for {} existing positions",
376 position_count
377 );
378
379 Ok(())
380}
381
382pub async fn subscribe_positions(
398 client: &Arc<Client>,
399 account_id: AccountId,
400 position_tracker: PositionTracker,
401 instrument_provider: Arc<crate::providers::instruments::InteractiveBrokersInstrumentProvider>,
402) -> anyhow::Result<()> {
403 let raw_account_id = raw_ib_account_code(&account_id);
404 let mut subscription = client
405 .positions()
406 .await
407 .context("Failed to subscribe to positions")?;
408
409 tracing::info!("Subscribed to position updates for account: {}", account_id);
410
411 let exec_sender = get_exec_event_sender();
412 let clock = get_atomic_clock_realtime();
413
414 nautilus_common::live::get_runtime().spawn(async move {
416 while let Some(result) = subscription.next().await {
417 match result {
418 Ok(ibapi::accounts::PositionUpdate::Position(position)) => {
419 if position.account != raw_account_id {
420 continue;
421 }
422
423 let contract_id = position.contract.contract_id;
424 let new_quantity =
425 Decimal::from_f64_retain(position.position).unwrap_or_default();
426
427 if let Some((is_external, old_quantity)) =
429 check_external_position_change(&position_tracker, contract_id, new_quantity)
430 .await
431 && is_external
432 {
433 tracing::warn!(
434 "External position change detected (likely option exercise): \
435 Contract ID {}, quantity change: {} -> {}",
436 contract_id,
437 old_quantity,
438 new_quantity
439 );
440
441 match ib_contract_to_instrument_id_simple(&position.contract) {
443 Ok(instrument_id) => {
444 if let Some(instrument) = instrument_provider.find(&instrument_id) {
446 let position_side = if new_quantity.is_zero() {
448 PositionSideSpecified::Flat
449 } else if new_quantity > Decimal::ZERO {
450 PositionSideSpecified::Long
451 } else {
452 PositionSideSpecified::Short
453 };
454
455 let quantity = Quantity::new(
456 new_quantity.abs().to_f64().unwrap_or(0.0),
457 instrument.size_precision(),
458 );
459
460 let avg_px_open = if position.average_cost > 0.0 {
463 let price_magnifier = instrument_provider
464 .get_price_magnifier(&instrument_id)
465 as f64;
466 let multiplier = instrument.multiplier().as_f64();
467 let converted_avg_cost =
468 position.average_cost / (multiplier * price_magnifier);
469 let price_precision = instrument.price_precision();
470 Some(
471 Decimal::from_f64_retain(converted_avg_cost)
472 .and_then(|d| {
473 let rounded =
475 d.round_dp(price_precision as u32);
476 Some(rounded)
477 })
478 .unwrap_or_default(),
479 )
480 } else {
481 None
482 };
483
484 let ts_init = clock.get_time_ns();
485
486 let report = PositionStatusReport::new(
487 account_id,
488 instrument_id,
489 position_side,
490 quantity,
491 ts_init,
492 ts_init,
493 None, None, avg_px_open,
496 );
497
498 let event = ExecutionEvent::Report(
500 ExecutionReport::Position(
501 Box::new(report),
502 ),
503 );
504
505 if exec_sender.send(event).is_err() {
506 tracing::warn!(
507 "Failed to send position status report for external change"
508 );
509 } else {
510 tracing::info!(
511 "Generated position status report for external change (likely option exercise)"
512 );
513 }
514 } else {
515 tracing::warn!(
516 "Instrument not found for contract ID: {}",
517 contract_id
518 );
519 }
520 }
521 Err(e) => {
522 tracing::warn!(
523 "Failed to convert contract to instrument ID: {}",
524 e
525 );
526 }
527 }
528 }
529 }
530 Ok(ibapi::accounts::PositionUpdate::PositionEnd) => {
531 break;
532 }
533 Err(e) => {
534 tracing::warn!("Error receiving position update: {}", e);
535 }
536 }
537 }
538 });
539
540 Ok(())
541}
542
543fn parse_account_summary_to_balance(summary: &AccountSummary) -> anyhow::Result<AccountBalance> {
545 let currency = parse_currency(&summary.currency)?;
546 let balance = parse_balance_decimal(&summary.value)?;
547
548 match summary.tag.as_str() {
549 AccountSummaryTags::SETTLED_CASH | AccountSummaryTags::TOTAL_CASH_VALUE => {
550 AccountBalance::from_total_and_locked(balance, Decimal::ZERO, currency)
552 .map_err(Into::into)
553 }
554 AccountSummaryTags::NET_LIQUIDATION => {
555 AccountBalance::from_total_and_locked(balance, Decimal::ZERO, currency)
558 .map_err(Into::into)
559 }
560 AccountSummaryTags::BUYING_POWER | AccountSummaryTags::AVAILABLE_FUNDS => {
561 AccountBalance::from_total_and_free(balance, balance, currency).map_err(Into::into)
563 }
564 _ => {
565 AccountBalance::from_total_and_locked(balance, Decimal::ZERO, currency)
567 .map_err(Into::into)
568 }
569 }
570}
571
572fn parse_balance_decimal(value: &str) -> anyhow::Result<Decimal> {
573 value
574 .parse::<Decimal>()
575 .context(format!("Failed to parse balance value: {}", value))
576}
577
578fn parse_currency(currency: &str) -> anyhow::Result<Currency> {
579 anyhow::ensure!(!currency.is_empty(), "Account summary currency was empty");
580 Ok(Currency::from(currency))
581}
582
#[cfg(test)]
mod tests {
    use ibapi::accounts::AccountSummary;
    use nautilus_model::types::{AccountBalance, Currency, MarginBalance, Money};
    use rstest::rstest;

    use super::{
        AccountSummaryTags, merge_account_summary_balance, merge_account_summary_margin,
        parse_currency,
    };

    /// Builds an account summary row for the fixed test account `DU123`.
    fn margin_summary(tag: &str, value: &str, currency: &str) -> AccountSummary {
        AccountSummary {
            account: String::from("DU123"),
            tag: String::from(tag),
            value: String::from(value),
            currency: String::from(currency),
        }
    }

    /// Feeds each `(tag, value, currency)` row through the margin merge, in order.
    fn merge_rows(rows: &[(&str, &str, &str)]) -> Vec<MarginBalance> {
        let mut margins: Vec<MarginBalance> = Vec::new();
        for &(tag, value, currency) in rows {
            merge_account_summary_margin(&mut margins, &margin_summary(tag, value, currency));
        }
        margins
    }

    // Checks the arithmetic used to turn an average cost into a price.
    #[rstest]
    fn test_ib_avg_cost_to_price_conversion() {
        let converted = 100.0 / (10.0 * 2.0);
        assert_eq!(converted, 5.0);

        let price_magnifier2 = 10;
        let converted2 = 1_500_000.0 / (50.0 * (price_magnifier2 as f64));
        assert_eq!(converted2, 3000.0);
    }

    #[rstest]
    fn test_parse_currency_rejects_empty_string() {
        let result = parse_currency("");

        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert_eq!(message, "Account summary currency was empty");
    }

    #[rstest]
    fn test_net_liquidation_merge_clamps_free_to_total() {
        // Existing balance has more free funds than the incoming net liquidation.
        let existing = AccountBalance::from_total_and_free(
            "120.00".parse().unwrap(),
            "120.00".parse().unwrap(),
            Currency::USD(),
        )
        .unwrap();

        let outcome = merge_account_summary_balance(
            &existing,
            AccountSummaryTags::NET_LIQUIDATION,
            "100.00",
            "USD",
        );
        let merged = outcome.unwrap().unwrap();

        assert_eq!(merged.total.as_decimal(), "100.00".parse().unwrap());
        assert_eq!(merged.locked.as_decimal(), "0.00".parse().unwrap());
        assert_eq!(merged.free.as_decimal(), "100.00".parse().unwrap());
    }

    #[rstest]
    fn test_merge_account_summary_margin_combines_init_and_maint() {
        let margins = merge_rows(&[
            (AccountSummaryTags::INIT_MARGIN_REQ, "500.00", "USD"),
            (AccountSummaryTags::MAINT_MARGIN_REQ, "250.00", "USD"),
        ]);

        assert_eq!(margins.len(), 1);
        let margin = &margins[0];
        assert!(margin.instrument_id.is_none());
        assert_eq!(margin.currency, Currency::USD());
        assert_eq!(margin.initial, Money::from("500.00 USD"));
        assert_eq!(margin.maintenance, Money::from("250.00 USD"));
    }

    #[rstest]
    fn test_merge_account_summary_margin_order_independent() {
        // Same rows as the combining test, delivered maintenance-first.
        let margins = merge_rows(&[
            (AccountSummaryTags::MAINT_MARGIN_REQ, "250.00", "USD"),
            (AccountSummaryTags::INIT_MARGIN_REQ, "500.00", "USD"),
        ]);

        assert_eq!(margins.len(), 1);
        let margin = &margins[0];
        assert_eq!(margin.initial, Money::from("500.00 USD"));
        assert_eq!(margin.maintenance, Money::from("250.00 USD"));
    }

    #[rstest]
    fn test_merge_account_summary_margin_separates_currencies() {
        let margins = merge_rows(&[
            (AccountSummaryTags::INIT_MARGIN_REQ, "500.00", "USD"),
            (AccountSummaryTags::INIT_MARGIN_REQ, "400.00", "EUR"),
        ]);

        assert_eq!(margins.len(), 2);
        let usd = margins
            .iter()
            .find(|m| m.currency == Currency::USD())
            .unwrap();
        let eur = margins
            .iter()
            .find(|m| m.currency == Currency::EUR())
            .unwrap();
        assert_eq!(usd.initial, Money::from("500.00 USD"));
        assert_eq!(eur.initial, Money::from("400.00 EUR"));
    }
}
723}