nautilus_interactive_brokers/common/
shared_client.rs1use std::{
23 collections::HashMap,
24 fmt::Debug,
25 ops::Deref,
26 sync::{Arc, LazyLock, Mutex},
27 time::Duration,
28};
29
30use anyhow::Context;
31use ibapi::client::Client;
32
33#[derive(Clone, Debug, Eq, Hash, PartialEq)]
35struct ConnectionKey(String, u16, i32);
36
37type RegistryMap = HashMap<ConnectionKey, (Arc<Client>, u32)>;
39
40static REGISTRY: LazyLock<Arc<Mutex<RegistryMap>>> =
42 LazyLock::new(|| Arc::new(Mutex::new(HashMap::new())));
43
44pub struct SharedClientHandle {
47 client: Arc<Client>,
48 registry: Arc<Mutex<RegistryMap>>,
49 key: ConnectionKey,
50}
51
52impl Debug for SharedClientHandle {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 f.debug_struct(stringify!(SharedClientHandle))
55 .field("key", &self.key)
56 .finish_non_exhaustive()
57 }
58}
59
60impl SharedClientHandle {
61 fn new(client: Arc<Client>, registry: Arc<Mutex<RegistryMap>>, key: ConnectionKey) -> Self {
62 Self {
63 client,
64 registry,
65 key,
66 }
67 }
68
69 pub fn as_arc(&self) -> &Arc<Client> {
71 &self.client
72 }
73}
74
75impl Deref for SharedClientHandle {
76 type Target = Client;
77
78 fn deref(&self) -> &Self::Target {
79 self.client.as_ref()
80 }
81}
82
83impl Drop for SharedClientHandle {
84 fn drop(&mut self) {
85 if let Ok(mut guard) = self.registry.lock()
86 && let Some((_, ref_count)) = guard.get_mut(&self.key)
87 {
88 *ref_count = ref_count.saturating_sub(1);
89 if *ref_count == 0 {
90 guard.remove(&self.key);
91 tracing::debug!(
92 "Shared IB client removed from registry (host={}, port={}, client_id={})",
93 self.key.0,
94 self.key.1,
95 self.key.2
96 );
97 }
98 }
99 }
100}
101
102pub async fn get_or_connect(
110 host: &str,
111 port: u16,
112 client_id: i32,
113 connection_timeout_secs: u64,
114) -> anyhow::Result<SharedClientHandle> {
115 let key = ConnectionKey(host.to_string(), port, client_id);
116 let registry = Arc::clone(®ISTRY);
117
118 log::debug!(
119 "Acquiring shared IB client (host={}, port={}, client_id={}, timeout_secs={})",
120 host,
121 port,
122 client_id,
123 connection_timeout_secs
124 );
125
126 let (reuse_client, ref_count_val) = {
127 let mut guard = registry
128 .lock()
129 .map_err(|e| anyhow::anyhow!("Registry mutex poisoned: {e}"))?;
130
131 if let Some((client, ref_count)) = guard.get_mut(&key) {
132 *ref_count += 1;
133 let ref_count_val = *ref_count;
134 let client = Arc::clone(client);
135 (Some(client), ref_count_val)
136 } else {
137 (None, 0)
138 }
139 };
140
141 if let Some(client) = reuse_client {
142 log::debug!(
143 "Reusing shared IB client (host={}, port={}, client_id={}, ref_count={})",
144 host,
145 port,
146 client_id,
147 ref_count_val
148 );
149 return Ok(SharedClientHandle::new(client, registry, key));
150 }
151
152 let address = format!("{host}:{port}");
153 let connect_timeout = Duration::from_secs(connection_timeout_secs);
154 log::debug!(
155 "No shared IB client found, establishing new connection to {} with timeout {:?}",
156 address,
157 connect_timeout
158 );
159 let client = tokio::time::timeout(connect_timeout, Client::connect(&address, client_id))
160 .await
161 .map_err(|_| {
162 anyhow::anyhow!(
163 "Timed out connecting to IB Gateway/TWS after {}s",
164 connection_timeout_secs
165 )
166 })?
167 .context("Failed to connect to IB Gateway/TWS")?;
168 let client = Arc::new(client);
169
170 {
171 let mut guard = registry
172 .lock()
173 .map_err(|e| anyhow::anyhow!("Registry mutex poisoned: {e}"))?;
174 log::debug!(
175 "Registering shared IB client in registry (host={}, port={}, client_id={})",
176 host,
177 port,
178 client_id
179 );
180 guard.insert(key.clone(), (Arc::clone(&client), 1));
181 }
182
183 tracing::info!(
184 "Registered new shared IB client (host={}, port={}, client_id={})",
185 host,
186 port,
187 client_id
188 );
189
190 Ok(SharedClientHandle::new(client, registry, key))
191}