Skip to main content

nautilus_interactive_brokers/common/
connection.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Connection management utilities for Interactive Brokers adapter.
17
18use std::{
19    fmt::Debug,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, AtomicU32, Ordering},
23    },
24    time::Duration,
25};
26
27use anyhow::Context;
28use ibapi::client::Client;
29use nautilus_common::live::get_runtime;
30
31/// Connection manager for Interactive Brokers clients.
32///
33/// Handles automatic reconnection with exponential backoff, connection monitoring,
34/// and subscription resubscription on reconnect.
35#[derive(Debug, Clone)]
36pub struct ConnectionManager {
37    /// Host address for IB Gateway/TWS.
38    host: String,
39    /// Port for IB Gateway/TWS.
40    port: u16,
41    /// Client ID.
42    client_id: i32,
43    /// Current connection state.
44    is_connected: Arc<AtomicBool>,
45    /// Connection attempt counter.
46    attempt_count: Arc<AtomicU32>,
47    /// Maximum connection attempts (0 = infinite).
48    max_attempts: u32,
49    /// Whether to retry indefinitely.
50    retry_indefinitely: bool,
51    /// Current backoff duration.
52    current_backoff: Arc<std::sync::Mutex<Duration>>,
53    /// Last disconnection time.
54    last_disconnection: Arc<std::sync::Mutex<Option<tokio::time::Instant>>>,
55}
56
57impl ConnectionManager {
58    /// Create a new connection manager.
59    ///
60    /// # Arguments
61    ///
62    /// * `host` - Host address
63    /// * `port` - Port number
64    /// * `client_id` - Client ID
65    /// * `max_attempts` - Maximum connection attempts (0 = infinite)
66    pub fn new(host: String, port: u16, client_id: i32, max_attempts: u32) -> Self {
67        Self {
68            host,
69            port,
70            client_id,
71            is_connected: Arc::new(AtomicBool::new(false)),
72            attempt_count: Arc::new(AtomicU32::new(0)),
73            max_attempts,
74            retry_indefinitely: max_attempts == 0,
75            current_backoff: Arc::new(std::sync::Mutex::new(Duration::from_secs(1))),
76            last_disconnection: Arc::new(std::sync::Mutex::new(None)),
77        }
78    }
79
80    /// Connect to IB Gateway/TWS with automatic retry.
81    ///
82    /// # Returns
83    ///
84    /// Returns the connected client on success.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if connection fails after max attempts.
89    ///
90    /// # Panics
91    ///
92    /// Panics if the mutex is poisoned.
93    pub async fn connect_with_retry(&self) -> anyhow::Result<Arc<Client>> {
94        const MAX_BACKOFF: Duration = Duration::from_secs(60);
95        let mut attempt = 0;
96        let mut backoff = Duration::from_secs(1);
97
98        loop {
99            attempt += 1;
100            self.attempt_count.store(attempt, Ordering::Relaxed);
101
102            if !self.retry_indefinitely && attempt > self.max_attempts {
103                anyhow::bail!("Failed to connect after {} attempts", self.max_attempts);
104            }
105
106            tracing::info!(
107                "Connection attempt {} to {}:{} (client_id: {})",
108                attempt,
109                self.host,
110                self.port,
111                self.client_id
112            );
113
114            let address = format!("{}:{}", self.host, self.port);
115            match Client::connect(&address, self.client_id).await {
116                Ok(client) => {
117                    tracing::info!(
118                        "Successfully connected to IB Gateway/TWS at {} (client_id: {})",
119                        address,
120                        self.client_id
121                    );
122
123                    self.is_connected.store(true, Ordering::Relaxed);
124                    self.attempt_count.store(0, Ordering::Relaxed);
125                    *self.current_backoff.lock().unwrap() = Duration::from_secs(1);
126
127                    return Ok(Arc::new(client));
128                }
129                Err(e) => {
130                    tracing::warn!(
131                        "Connection attempt {} failed: {} (backoff: {:?})",
132                        attempt,
133                        e,
134                        backoff
135                    );
136
137                    if !self.retry_indefinitely && attempt >= self.max_attempts {
138                        return Err(e).context(format!(
139                            "Failed to connect after {} attempts",
140                            self.max_attempts
141                        ));
142                    }
143
144                    // Exponential backoff
145                    tokio::time::sleep(backoff).await;
146                    backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
147                    *self.current_backoff.lock().unwrap() = backoff;
148                }
149            }
150        }
151    }
152
153    /// Check if currently connected.
154    pub fn is_connected(&self) -> bool {
155        self.is_connected.load(Ordering::Relaxed)
156    }
157
158    /// Mark connection as disconnected.
159    ///
160    /// # Panics
161    ///
162    /// Panics if the mutex is poisoned.
163    pub fn mark_disconnected(&self) {
164        self.is_connected.store(false, Ordering::Relaxed);
165        *self.last_disconnection.lock().unwrap() = Some(tokio::time::Instant::now());
166    }
167
168    /// Get current attempt count.
169    pub fn attempt_count(&self) -> u32 {
170        self.attempt_count.load(Ordering::Relaxed)
171    }
172
173    /// Get current backoff duration.
174    ///
175    /// # Panics
176    ///
177    /// Panics if the mutex is poisoned.
178    pub fn current_backoff(&self) -> Duration {
179        *self.current_backoff.lock().unwrap()
180    }
181
182    /// Get time since last disconnection.
183    ///
184    /// # Panics
185    ///
186    /// Panics if the mutex is poisoned.
187    pub fn time_since_disconnection(&self) -> Option<Duration> {
188        self.last_disconnection
189            .lock()
190            .unwrap()
191            .map(|time: tokio::time::Instant| time.elapsed())
192    }
193}
194
195/// Connection watchdog for monitoring connection health.
196///
197/// Periodically checks connection status and triggers reconnection if needed.
198#[derive(Clone)]
199pub struct ConnectionWatchdog {
200    /// Connection manager.
201    manager: Arc<ConnectionManager>,
202    /// Check interval.
203    check_interval: Duration,
204    /// Client reference (for health checks).
205    client: Arc<std::sync::Mutex<Option<Arc<Client>>>>,
206    /// Callback to call when reconnection is needed.
207    reconnect_callback: Arc<
208        dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<Arc<Client>>> + Send + Sync + 'static,
209    >,
210    /// Whether the watchdog is running.
211    is_running: Arc<AtomicBool>,
212}
213
214impl Debug for ConnectionWatchdog {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        f.debug_struct(stringify!(ConnectionWatchdog))
217            .field("check_interval", &self.check_interval)
218            .field("is_running", &self.is_running.load(Ordering::Relaxed))
219            .finish_non_exhaustive()
220    }
221}
222
223impl ConnectionWatchdog {
224    /// Create a new connection watchdog.
225    ///
226    /// # Arguments
227    ///
228    /// * `manager` - Connection manager
229    /// * `check_interval` - Interval between health checks
230    /// * `reconnect_callback` - Callback to trigger reconnection
231    pub fn new(
232        manager: Arc<ConnectionManager>,
233        check_interval: Duration,
234        reconnect_callback: Arc<
235            dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<Arc<Client>>> + Send + Sync,
236        >,
237    ) -> Self {
238        Self {
239            manager,
240            check_interval,
241            client: Arc::new(std::sync::Mutex::new(None)),
242            reconnect_callback,
243            is_running: Arc::new(AtomicBool::new(false)),
244        }
245    }
246
247    /// Set the client reference for health checks.
248    ///
249    /// # Panics
250    ///
251    /// Panics if the mutex is poisoned.
252    pub fn set_client(&self, client: Arc<Client>) {
253        *self.client.lock().unwrap() = Some(client);
254    }
255
256    /// Start the watchdog.
257    ///
258    /// # Panics
259    ///
260    /// Panics if a mutex is poisoned.
261    pub fn start(&self) -> tokio::task::JoinHandle<()> {
262        let manager = Arc::clone(&self.manager);
263        let client = Arc::clone(&self.client);
264        let reconnect_callback = Arc::clone(&self.reconnect_callback);
265        let check_interval = self.check_interval;
266        let is_running = Arc::clone(&self.is_running);
267
268        is_running.store(true, Ordering::Relaxed);
269
270        get_runtime().spawn(async move {
271            tracing::info!("Connection watchdog started");
272
273            while is_running.load(Ordering::Relaxed) {
274                tokio::time::sleep(check_interval).await;
275
276                if !manager.is_connected() {
277                    tracing::warn!(
278                        "Connection watchdog detected disconnection, triggering reconnection"
279                    );
280
281                    // Trigger reconnection
282                    let handle = reconnect_callback();
283
284                    // Wait for reconnection attempt
285                    match handle.await {
286                        Ok(Ok(new_client)) => {
287                            tracing::info!("Reconnection successful via watchdog");
288                            *client.lock().unwrap() = Some(new_client);
289                            manager.is_connected.store(true, Ordering::Relaxed);
290                        }
291                        Ok(Err(e)) => {
292                            tracing::error!("Reconnection failed via watchdog: {}", e);
293                        }
294                        Err(e) => {
295                            tracing::error!("Reconnection task panicked: {}", e);
296                        }
297                    }
298                }
299            }
300
301            tracing::info!("Connection watchdog stopped");
302        })
303    }
304
305    /// Stop the watchdog.
306    pub fn stop(&self) {
307        self.is_running.store(false, Ordering::Relaxed);
308    }
309}