Skip to main content

nautilus_interactive_brokers/execution/
account.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Account management for Interactive Brokers execution client.
17
18use std::{collections::HashMap, sync::Arc};
19
20use anyhow::Context;
21use ibapi::{
22    accounts::{
23        AccountSummary, AccountSummaryResult, AccountSummaryTags,
24        types::{AccountGroup, AccountId as IbAccountId},
25    },
26    client::Client,
27};
28use nautilus_common::{
29    live::runner::get_exec_event_sender,
30    messages::{ExecutionEvent, ExecutionReport},
31};
32use nautilus_core::time::get_atomic_clock_realtime;
33use nautilus_model::{
34    enums::PositionSideSpecified,
35    identifiers::AccountId,
36    instruments::Instrument,
37    reports::PositionStatusReport,
38    types::{AccountBalance, Currency, MarginBalance, Money, Quantity},
39};
40use rust_decimal::{Decimal, prelude::ToPrimitive};
41
42use crate::common::parse::ib_contract_to_instrument_id_simple;
43
44fn raw_ib_account_code(account_id: &AccountId) -> String {
45    account_id
46        .to_string()
47        .strip_prefix("IB-")
48        .unwrap_or(account_id.as_str())
49        .to_string()
50}
51
52/// Subscribe to account summary and parse to balances and margins.
53///
54/// # Arguments
55///
56/// * `client` - The IB API client
57/// * `account_id` - The account ID
58///
59/// # Returns
60///
61/// Returns balances and margins parsed from account summary.
62///
63/// # Errors
64///
65/// Returns an error if subscription fails.
66pub async fn subscribe_account_summary(
67    client: &Arc<Client>,
68    account_id: AccountId,
69) -> anyhow::Result<(Vec<AccountBalance>, Vec<MarginBalance>)> {
70    let raw_account_id = raw_ib_account_code(&account_id);
71    // Request key account summary tags (includes TotalCashValue to match Python account summary info dict).
72    let tags = &[
73        AccountSummaryTags::NET_LIQUIDATION,
74        AccountSummaryTags::TOTAL_CASH_VALUE,
75        AccountSummaryTags::SETTLED_CASH,
76        AccountSummaryTags::BUYING_POWER,
77        AccountSummaryTags::EQUITY_WITH_LOAN_VALUE,
78        AccountSummaryTags::AVAILABLE_FUNDS,
79        AccountSummaryTags::EXCESS_LIQUIDITY,
80        AccountSummaryTags::INIT_MARGIN_REQ,
81        AccountSummaryTags::MAINT_MARGIN_REQ,
82        AccountSummaryTags::CUSHION,
83    ];
84
85    let group = AccountGroup("All".to_string());
86    let mut subscription = client
87        .account_summary(&group, tags)
88        .await
89        .context("Failed to subscribe to account summary")?;
90
91    tracing::info!("Subscribed to account summary for account: {}", account_id);
92
93    // Process initial account summary snapshot
94    // We collect all summary items until the API sends AccountSummaryResult::End, so the
95    // returned balances/margins are complete (matches Python behavior of waiting for all tags).
96    let mut balances: Vec<AccountBalance> = Vec::new();
97    let mut margins: Vec<MarginBalance> = Vec::new();
98
99    while let Some(result) = subscription.next().await {
100        match result {
101            Ok(AccountSummaryResult::Summary(summary)) => {
102                // Filter for the specific account
103                if summary.account != raw_account_id {
104                    continue;
105                }
106
107                match parse_account_summary_to_balance(&summary) {
108                    Ok(balance) => {
109                        // Check if balance already exists for this currency
110                        if let Some(existing) = balances
111                            .iter_mut()
112                            .find(|b| b.total.currency == balance.total.currency)
113                        {
114                            if let Some(merged) = merge_account_summary_balance(
115                                existing,
116                                summary.tag.as_str(),
117                                &summary.value,
118                                &summary.currency,
119                            )? {
120                                *existing = merged;
121                            }
122                        } else {
123                            balances.push(balance);
124                        }
125                    }
126                    Err(e) => {
127                        tracing::warn!("Failed to parse account summary: {}", e);
128                    }
129                }
130
131                // Accumulate margin requirements by currency. IB reports INIT_MARGIN_REQ
132                // and MAINT_MARGIN_REQ as separate summary entries; merge them into one
133                // `MarginBalance` per currency so neither half overwrites the other when
134                // the account-wide margin store keys by `Currency`.
135                merge_account_summary_margin(&mut margins, &summary);
136            }
137            Ok(AccountSummaryResult::End) => {
138                break;
139            }
140            Err(e) => {
141                tracing::warn!("Error receiving account summary: {}", e);
142            }
143        }
144    }
145
146    tracing::info!(
147        "Received account summary: {} balances, {} margins",
148        balances.len(),
149        margins.len()
150    );
151
152    Ok((balances, margins))
153}
154
155fn merge_account_summary_margin(margins: &mut Vec<MarginBalance>, summary: &AccountSummary) {
156    let currency = match parse_currency(&summary.currency) {
157        Ok(currency) => currency,
158        Err(e) => {
159            tracing::warn!("Skipping margin summary with unknown currency: {}", e);
160            return;
161        }
162    };
163    let value = match parse_balance_decimal(&summary.value)
164        .and_then(|d| Money::from_decimal(d, currency).map_err(|e| anyhow::anyhow!(e.to_string())))
165    {
166        Ok(money) => money,
167        Err(e) => {
168            tracing::warn!("Failed to parse margin value '{}': {}", summary.value, e);
169            return;
170        }
171    };
172
173    let existing = margins
174        .iter_mut()
175        .find(|m| m.currency == currency && m.instrument_id.is_none());
176
177    match summary.tag.as_str() {
178        AccountSummaryTags::INIT_MARGIN_REQ => match existing {
179            Some(margin) => margin.initial = value,
180            None => margins.push(MarginBalance::new(value, Money::zero(currency), None)),
181        },
182        AccountSummaryTags::MAINT_MARGIN_REQ => match existing {
183            Some(margin) => margin.maintenance = value,
184            None => margins.push(MarginBalance::new(Money::zero(currency), value, None)),
185        },
186        _ => {}
187    }
188}
189
190fn merge_account_summary_balance(
191    existing: &AccountBalance,
192    tag: &str,
193    value: &str,
194    currency_code: &str,
195) -> anyhow::Result<Option<AccountBalance>> {
196    let currency = parse_currency(currency_code)?;
197
198    match tag {
199        AccountSummaryTags::SETTLED_CASH => {
200            let settled_cash = parse_balance_decimal(value)?;
201            Ok(Some(AccountBalance::from_total_and_locked(
202                settled_cash,
203                Decimal::ZERO,
204                currency,
205            )?))
206        }
207        AccountSummaryTags::NET_LIQUIDATION => {
208            let net_liq = parse_balance_decimal(value)?;
209            Ok(Some(AccountBalance::from_total_and_free(
210                net_liq,
211                existing.free.as_decimal(),
212                currency,
213            )?))
214        }
215        _ => Ok(None),
216    }
217}
218
219/// Subscribe to PnL updates for the account.
220///
221/// This spawns a background task to handle PnL updates.
222///
223/// # Arguments
224///
225/// * `client` - The IB API client
226/// * `account_id` - The account ID
227///
228/// # Errors
229///
230/// Returns an error if subscription fails.
231pub async fn subscribe_pnl(client: &Arc<Client>, account_id: AccountId) -> anyhow::Result<()> {
232    let account = IbAccountId(raw_ib_account_code(&account_id));
233    let mut subscription = client
234        .pnl(&account, None)
235        .await
236        .context("Failed to subscribe to PnL")?;
237
238    tracing::info!("Subscribed to PnL updates for account: {}", account_id);
239
240    // Process PnL updates in background task
241    nautilus_common::live::get_runtime().spawn(async move {
242        while let Some(result) = subscription.next().await {
243            match result {
244                Ok(pnl) => {
245                    tracing::info!(
246                        "PnL update - Daily: {:.2}, Unrealized: {:?}, Realized: {:?}",
247                        pnl.daily_pnl,
248                        pnl.unrealized_pnl,
249                        pnl.realized_pnl
250                    );
251                    // Note: Account state updates are handled by position updates and account summary
252                    // PnL is informational and tracked separately. If needed, account state can be
253                    // generated by subscribing to account summary which includes updated balances.
254                }
255                Err(e) => {
256                    tracing::warn!("Error receiving PnL update: {}", e);
257                }
258            }
259        }
260    });
261
262    Ok(())
263}
264
265/// Track known positions for detecting external changes (e.g., option exercises).
266pub type PositionTracker = Arc<tokio::sync::Mutex<HashMap<i32, Decimal>>>;
267
268/// Create a new position tracker.
269pub fn create_position_tracker() -> PositionTracker {
270    Arc::new(tokio::sync::Mutex::new(HashMap::new()))
271}
272
273/// Check if a position update represents an external change (e.g., option exercise).
274///
275/// # Arguments
276///
277/// * `position_tracker` - Shared position tracker
278/// * `contract_id` - IB contract ID
279/// * `new_quantity` - New position quantity
280///
281/// # Returns
282///
283/// Returns `(is_external_change, old_quantity)` if this is an external change.
284pub async fn check_external_position_change(
285    position_tracker: &PositionTracker,
286    contract_id: i32,
287    new_quantity: Decimal,
288) -> Option<(bool, Decimal)> {
289    let mut tracker = position_tracker.lock().await;
290    let known_quantity = tracker.get(&contract_id).copied().unwrap_or(Decimal::ZERO);
291
292    // Skip zero positions
293    if new_quantity.is_zero() {
294        tracker.remove(&contract_id);
295        return None;
296    }
297
298    // Check if this is an external position change
299    // If quantities match, this is likely from normal trading - not external
300    if known_quantity == new_quantity {
301        return None;
302    }
303
304    // This is a change - determine if it's external
305    // External changes occur when position changes without a corresponding execution
306    // Update tracked position
307    tracker.insert(contract_id, new_quantity);
308
309    // If we had a known position and it changed, it's likely external
310    if known_quantity != Decimal::ZERO && known_quantity != new_quantity {
311        Some((true, known_quantity))
312    } else {
313        // New position or first time seeing it
314        Some((false, known_quantity))
315    }
316}
317
318/// Initialize position tracking with existing positions.
319///
320/// This fetches all current positions and initializes the position tracker
321/// to avoid processing duplicates from execDetails.
322///
323/// # Arguments
324///
325/// * `client` - The IB API client
326/// * `account_id` - The account ID
327/// * `position_tracker` - Shared position tracker to initialize
328///
329/// # Errors
330///
331/// Returns an error if position request fails.
332pub async fn initialize_position_tracking(
333    client: &Arc<Client>,
334    account_id: AccountId,
335    position_tracker: PositionTracker,
336) -> anyhow::Result<()> {
337    let raw_account_id = raw_ib_account_code(&account_id);
338    let mut subscription = client
339        .positions()
340        .await
341        .context("Failed to request positions")?;
342
343    tracing::info!("Initializing position tracking for account: {}", account_id);
344
345    let mut position_count = 0;
346    let mut tracker = position_tracker.lock().await;
347
348    while let Some(result) = subscription.next().await {
349        match result {
350            Ok(ibapi::accounts::PositionUpdate::Position(position)) => {
351                // Filter for the specific account
352                if position.account != raw_account_id {
353                    continue;
354                }
355
356                let contract_id = position.contract.contract_id;
357                let quantity = Decimal::from_f64_retain(position.position).unwrap_or_default();
358
359                // Only track non-zero positions
360                if !quantity.is_zero() {
361                    tracker.insert(contract_id, quantity);
362                    position_count += 1;
363                }
364            }
365            Ok(ibapi::accounts::PositionUpdate::PositionEnd) => {
366                break;
367            }
368            Err(e) => {
369                tracing::warn!("Error receiving position update: {}", e);
370            }
371        }
372    }
373
374    tracing::info!(
375        "Initialized tracking for {} existing positions",
376        position_count
377    );
378
379    Ok(())
380}
381
382/// Subscribe to real-time position updates for detecting external position changes (e.g., option exercises).
383///
384/// This spawns a background task to track position changes and generate position status reports
385/// for external changes.
386///
387/// # Arguments
388///
389/// * `client` - The IB API client
390/// * `account_id` - The account ID
391/// * `position_tracker` - Shared position tracker for detecting external changes
392/// * `instrument_provider` - Instrument provider for resolving contracts to instruments
393///
394/// # Errors
395///
396/// Returns an error if subscription fails.
397pub async fn subscribe_positions(
398    client: &Arc<Client>,
399    account_id: AccountId,
400    position_tracker: PositionTracker,
401    instrument_provider: Arc<crate::providers::instruments::InteractiveBrokersInstrumentProvider>,
402) -> anyhow::Result<()> {
403    let raw_account_id = raw_ib_account_code(&account_id);
404    let mut subscription = client
405        .positions()
406        .await
407        .context("Failed to subscribe to positions")?;
408
409    tracing::info!("Subscribed to position updates for account: {}", account_id);
410
411    let exec_sender = get_exec_event_sender();
412    let clock = get_atomic_clock_realtime();
413
414    // Spawn background task to handle position updates
415    nautilus_common::live::get_runtime().spawn(async move {
416        while let Some(result) = subscription.next().await {
417            match result {
418                Ok(ibapi::accounts::PositionUpdate::Position(position)) => {
419                    if position.account != raw_account_id {
420                        continue;
421                    }
422
423                    let contract_id = position.contract.contract_id;
424                    let new_quantity =
425                        Decimal::from_f64_retain(position.position).unwrap_or_default();
426
427                    // Check if this is an external position change
428                    if let Some((is_external, old_quantity)) =
429                        check_external_position_change(&position_tracker, contract_id, new_quantity)
430                            .await
431                        && is_external
432                    {
433                        tracing::warn!(
434                            "External position change detected (likely option exercise): \
435                                Contract ID {}, quantity change: {} -> {}",
436                            contract_id,
437                            old_quantity,
438                            new_quantity
439                        );
440
441                        // Convert IB contract to instrument ID
442                        match ib_contract_to_instrument_id_simple(&position.contract) {
443                            Ok(instrument_id) => {
444                                // Get instrument for precision
445                                if let Some(instrument) = instrument_provider.find(&instrument_id) {
446                                    // Determine position side
447                                    let position_side = if new_quantity.is_zero() {
448                                        PositionSideSpecified::Flat
449                                    } else if new_quantity > Decimal::ZERO {
450                                        PositionSideSpecified::Long
451                                    } else {
452                                        PositionSideSpecified::Short
453                                    };
454
455                                    let quantity = Quantity::new(
456                                        new_quantity.abs().to_f64().unwrap_or(0.0),
457                                        instrument.size_precision(),
458                                    );
459
460                                    // Convert IB avg_cost to Nautilus Price, accounting for price magnifier and multiplier
461                                    // Python: converted_avg_cost = avg_cost / (multiplier * price_magnifier)
462                                    let avg_px_open = if position.average_cost > 0.0 {
463                                        let price_magnifier = instrument_provider
464                                            .get_price_magnifier(&instrument_id)
465                                            as f64;
466                                        let multiplier = instrument.multiplier().as_f64();
467                                        let converted_avg_cost =
468                                            position.average_cost / (multiplier * price_magnifier);
469                                        let price_precision = instrument.price_precision();
470                                        Some(
471                                            Decimal::from_f64_retain(converted_avg_cost)
472                                                .and_then(|d| {
473                                                    // Round to price precision
474                                                    let rounded =
475                                                        d.round_dp(price_precision as u32);
476                                                    Some(rounded)
477                                                })
478                                                .unwrap_or_default(),
479                                        )
480                                    } else {
481                                        None
482                                    };
483
484                                    let ts_init = clock.get_time_ns();
485
486                                    let report = PositionStatusReport::new(
487                                        account_id,
488                                        instrument_id,
489                                        position_side,
490                                        quantity,
491                                        ts_init,
492                                        ts_init,
493                                        None, // report_id: auto-generated
494                                        None, // venue_position_id
495                                        avg_px_open,
496                                    );
497
498                                    // Send position status report
499                                    let event = ExecutionEvent::Report(
500                                        ExecutionReport::Position(
501                                            Box::new(report),
502                                        ),
503                                    );
504
505                                    if exec_sender.send(event).is_err() {
506                                        tracing::warn!(
507                                            "Failed to send position status report for external change"
508                                        );
509                                    } else {
510                                        tracing::info!(
511                                            "Generated position status report for external change (likely option exercise)"
512                                        );
513                                    }
514                                } else {
515                                    tracing::warn!(
516                                        "Instrument not found for contract ID: {}",
517                                        contract_id
518                                    );
519                                }
520                            }
521                            Err(e) => {
522                                tracing::warn!(
523                                    "Failed to convert contract to instrument ID: {}",
524                                    e
525                                );
526                            }
527                        }
528                    }
529                }
530                Ok(ibapi::accounts::PositionUpdate::PositionEnd) => {
531                    break;
532                }
533                Err(e) => {
534                    tracing::warn!("Error receiving position update: {}", e);
535                }
536            }
537        }
538    });
539
540    Ok(())
541}
542
543/// Parse IB account summary to Nautilus AccountBalance.
544fn parse_account_summary_to_balance(summary: &AccountSummary) -> anyhow::Result<AccountBalance> {
545    let currency = parse_currency(&summary.currency)?;
546    let balance = parse_balance_decimal(&summary.value)?;
547
548    match summary.tag.as_str() {
549        AccountSummaryTags::SETTLED_CASH | AccountSummaryTags::TOTAL_CASH_VALUE => {
550            // Cash balance - free equals total for settled cash
551            AccountBalance::from_total_and_locked(balance, Decimal::ZERO, currency)
552                .map_err(Into::into)
553        }
554        AccountSummaryTags::NET_LIQUIDATION => {
555            // Net liquidation - represents total equity
556            // Free would be calculated from available funds
557            AccountBalance::from_total_and_locked(balance, Decimal::ZERO, currency)
558                .map_err(Into::into)
559        }
560        AccountSummaryTags::BUYING_POWER | AccountSummaryTags::AVAILABLE_FUNDS => {
561            // Available funds - this is the free amount
562            AccountBalance::from_total_and_free(balance, balance, currency).map_err(Into::into)
563        }
564        _ => {
565            // Default: treat as total balance
566            AccountBalance::from_total_and_locked(balance, Decimal::ZERO, currency)
567                .map_err(Into::into)
568        }
569    }
570}
571
572fn parse_balance_decimal(value: &str) -> anyhow::Result<Decimal> {
573    value
574        .parse::<Decimal>()
575        .context(format!("Failed to parse balance value: {}", value))
576}
577
578fn parse_currency(currency: &str) -> anyhow::Result<Currency> {
579    anyhow::ensure!(!currency.is_empty(), "Account summary currency was empty");
580    Ok(Currency::from(currency))
581}
582
#[cfg(test)]
mod tests {
    use ibapi::accounts::AccountSummary;
    use nautilus_model::types::{AccountBalance, Currency, MarginBalance, Money};
    use rstest::rstest;

    use super::{
        AccountSummaryTags, merge_account_summary_balance, merge_account_summary_margin,
        parse_currency,
    };

    /// Builds a summary entry for the fixed test account `DU123`.
    fn margin_summary(tag: &str, value: &str, currency: &str) -> AccountSummary {
        AccountSummary {
            account: "DU123".to_owned(),
            tag: tag.to_owned(),
            value: value.to_owned(),
            currency: currency.to_owned(),
        }
    }

    /// Runs `(tag, value, currency)` entries through the margin merge, in order.
    fn merged_margins(entries: &[(&str, &str, &str)]) -> Vec<MarginBalance> {
        let mut margins: Vec<MarginBalance> = Vec::new();
        for &(tag, value, currency) in entries {
            merge_account_summary_margin(&mut margins, &margin_summary(tag, value, currency));
        }
        margins
    }

    /// Verifies the IB avg cost to Nautilus price conversion formula used in position parsing.
    /// Python: converted_avg_cost = avg_cost / (multiplier * price_magnifier)
    #[rstest]
    fn test_ib_avg_cost_to_price_conversion() {
        let cases = [
            (100.0, 10.0, 2.0, 5.0),
            (1_500_000.0, 50.0, f64::from(10), 3000.0),
        ];

        for (avg_cost, multiplier, price_magnifier, expected) in cases {
            let converted = avg_cost / (multiplier * price_magnifier);
            assert_eq!(converted, expected);
        }
    }

    #[rstest]
    fn test_parse_currency_rejects_empty_string() {
        let outcome = parse_currency("");
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert_eq!(message, "Account summary currency was empty");
    }

    #[rstest]
    fn test_net_liquidation_merge_clamps_free_to_total() {
        // Existing balance has more free funds than the incoming net liquidation value.
        let existing = AccountBalance::from_total_and_free(
            "120.00".parse().unwrap(),
            "120.00".parse().unwrap(),
            Currency::USD(),
        )
        .unwrap();

        let outcome = merge_account_summary_balance(
            &existing,
            AccountSummaryTags::NET_LIQUIDATION,
            "100.00",
            "USD",
        );
        let merged = outcome.unwrap().unwrap();

        assert_eq!(merged.total.as_decimal(), "100.00".parse().unwrap());
        assert_eq!(merged.locked.as_decimal(), "0.00".parse().unwrap());
        assert_eq!(merged.free.as_decimal(), "100.00".parse().unwrap());
    }

    #[rstest]
    fn test_merge_account_summary_margin_combines_init_and_maint() {
        // Regression: `INIT_MARGIN_REQ` and `MAINT_MARGIN_REQ` arrive as separate summary
        // entries. Both must end up in one `MarginBalance` per currency so neither half
        // overwrites the other once the account-wide store keys by `Currency`.
        let margins = merged_margins(&[
            (AccountSummaryTags::INIT_MARGIN_REQ, "500.00", "USD"),
            (AccountSummaryTags::MAINT_MARGIN_REQ, "250.00", "USD"),
        ]);

        assert_eq!(margins.len(), 1);
        let margin = &margins[0];
        assert!(margin.instrument_id.is_none());
        assert_eq!(margin.currency, Currency::USD());
        assert_eq!(margin.initial, Money::from("500.00 USD"));
        assert_eq!(margin.maintenance, Money::from("250.00 USD"));
    }

    #[rstest]
    fn test_merge_account_summary_margin_order_independent() {
        // Same entries as above, delivered in the opposite order.
        let margins = merged_margins(&[
            (AccountSummaryTags::MAINT_MARGIN_REQ, "250.00", "USD"),
            (AccountSummaryTags::INIT_MARGIN_REQ, "500.00", "USD"),
        ]);

        assert_eq!(margins.len(), 1);
        let margin = &margins[0];
        assert_eq!(margin.initial, Money::from("500.00 USD"));
        assert_eq!(margin.maintenance, Money::from("250.00 USD"));
    }

    #[rstest]
    fn test_merge_account_summary_margin_separates_currencies() {
        let margins = merged_margins(&[
            (AccountSummaryTags::INIT_MARGIN_REQ, "500.00", "USD"),
            (AccountSummaryTags::INIT_MARGIN_REQ, "400.00", "EUR"),
        ]);

        assert_eq!(margins.len(), 2);

        let initial_for = |currency: Currency| {
            margins
                .iter()
                .find(|m| m.currency == currency)
                .unwrap()
                .initial
        };
        assert_eq!(initial_for(Currency::USD()), Money::from("500.00 USD"));
        assert_eq!(initial_for(Currency::EUR()), Money::from("400.00 EUR"));
    }
}