Skip to main content

nautilus_interactive_brokers/common/
shared_client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Shared IB API client connection per (host, port, client_id).
17//!
18//! Data, execution, and historical clients use a single TCP connection per logical
19//! connection to avoid client ID conflicts and redundant connections (parity with
20//! Python's get_cached_ib_client).
21
22use std::{
23    collections::HashMap,
24    fmt::Debug,
25    ops::Deref,
26    sync::{Arc, LazyLock, Mutex},
27    time::Duration,
28};
29
30use anyhow::Context;
31use ibapi::client::Client;
32
/// Lookup key for the connection registry.
///
/// The tuple fields are, in order: host, port, and client ID. Two keys are equal
/// (and hash identically) only when all three components match.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ConnectionKey(String, u16, i32);
36
37/// Registry entry: shared client and its ref count.
38type RegistryMap = HashMap<ConnectionKey, (Arc<Client>, u32)>;
39
40/// Global registry: one shared client per (host, port, client_id) with ref count.
41static REGISTRY: LazyLock<Arc<Mutex<RegistryMap>>> =
42    LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
43
44/// Handle to a shared IB client; when dropped, ref count is decremented and the
45/// connection is removed from the registry when the count reaches zero.
46pub struct SharedClientHandle {
47    client: Arc<Client>,
48    registry: Arc<Mutex<RegistryMap>>,
49    key: ConnectionKey,
50}
51
52impl Debug for SharedClientHandle {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        f.debug_struct(stringify!(SharedClientHandle))
55            .field("key", &self.key)
56            .finish_non_exhaustive()
57    }
58}
59
60impl SharedClientHandle {
61    fn new(client: Arc<Client>, registry: Arc<Mutex<RegistryMap>>, key: ConnectionKey) -> Self {
62        Self {
63            client,
64            registry,
65            key,
66        }
67    }
68
69    /// Returns a reference to the underlying `Arc<Client>` for call sites that need it.
70    pub fn as_arc(&self) -> &Arc<Client> {
71        &self.client
72    }
73}
74
75impl Deref for SharedClientHandle {
76    type Target = Client;
77
78    fn deref(&self) -> &Self::Target {
79        self.client.as_ref()
80    }
81}
82
83impl Drop for SharedClientHandle {
84    fn drop(&mut self) {
85        if let Ok(mut guard) = self.registry.lock()
86            && let Some((_, ref_count)) = guard.get_mut(&self.key)
87        {
88            *ref_count = ref_count.saturating_sub(1);
89            if *ref_count == 0 {
90                guard.remove(&self.key);
91                tracing::debug!(
92                    "Shared IB client removed from registry (host={}, port={}, client_id={})",
93                    self.key.0,
94                    self.key.1,
95                    self.key.2
96                );
97            }
98        }
99    }
100}
101
102/// Returns a handle to the shared IB client for the given (host, port, client_id).
103/// If a connection already exists, its ref count is incremented and the same client
104/// is returned. Otherwise a new connection is established and registered.
105///
106/// # Errors
107///
108/// Returns an error if connecting to IB Gateway/TWS fails.
109pub async fn get_or_connect(
110    host: &str,
111    port: u16,
112    client_id: i32,
113    connection_timeout_secs: u64,
114) -> anyhow::Result<SharedClientHandle> {
115    let key = ConnectionKey(host.to_string(), port, client_id);
116    let registry = Arc::clone(&REGISTRY);
117
118    log::debug!(
119        "Acquiring shared IB client (host={}, port={}, client_id={}, timeout_secs={})",
120        host,
121        port,
122        client_id,
123        connection_timeout_secs
124    );
125
126    let (reuse_client, ref_count_val) = {
127        let mut guard = registry
128            .lock()
129            .map_err(|e| anyhow::anyhow!("Registry mutex poisoned: {e}"))?;
130
131        if let Some((client, ref_count)) = guard.get_mut(&key) {
132            *ref_count += 1;
133            let ref_count_val = *ref_count;
134            let client = Arc::clone(client);
135            (Some(client), ref_count_val)
136        } else {
137            (None, 0)
138        }
139    };
140
141    if let Some(client) = reuse_client {
142        log::debug!(
143            "Reusing shared IB client (host={}, port={}, client_id={}, ref_count={})",
144            host,
145            port,
146            client_id,
147            ref_count_val
148        );
149        return Ok(SharedClientHandle::new(client, registry, key));
150    }
151
152    let address = format!("{host}:{port}");
153    let connect_timeout = Duration::from_secs(connection_timeout_secs);
154    log::debug!(
155        "No shared IB client found, establishing new connection to {} with timeout {:?}",
156        address,
157        connect_timeout
158    );
159    let client = tokio::time::timeout(connect_timeout, Client::connect(&address, client_id))
160        .await
161        .map_err(|_| {
162            anyhow::anyhow!(
163                "Timed out connecting to IB Gateway/TWS after {}s",
164                connection_timeout_secs
165            )
166        })?
167        .context("Failed to connect to IB Gateway/TWS")?;
168    let client = Arc::new(client);
169
170    {
171        let mut guard = registry
172            .lock()
173            .map_err(|e| anyhow::anyhow!("Registry mutex poisoned: {e}"))?;
174        log::debug!(
175            "Registering shared IB client in registry (host={}, port={}, client_id={})",
176            host,
177            port,
178            client_id
179        );
180        guard.insert(key.clone(), (Arc::clone(&client), 1));
181    }
182
183    tracing::info!(
184        "Registered new shared IB client (host={}, port={}, client_id={})",
185        host,
186        port,
187        client_id
188    );
189
190    Ok(SharedClientHandle::new(client, registry, key))
191}