nautilus_interactive_brokers/common/
connection.rs1use std::{
19 fmt::Debug,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU32, Ordering},
23 },
24 time::Duration,
25};
26
27use anyhow::Context;
28use ibapi::client::Client;
29use nautilus_common::live::get_runtime;
30
31#[derive(Debug, Clone)]
36pub struct ConnectionManager {
37 host: String,
39 port: u16,
41 client_id: i32,
43 is_connected: Arc<AtomicBool>,
45 attempt_count: Arc<AtomicU32>,
47 max_attempts: u32,
49 retry_indefinitely: bool,
51 current_backoff: Arc<std::sync::Mutex<Duration>>,
53 last_disconnection: Arc<std::sync::Mutex<Option<tokio::time::Instant>>>,
55}
56
57impl ConnectionManager {
58 pub fn new(host: String, port: u16, client_id: i32, max_attempts: u32) -> Self {
67 Self {
68 host,
69 port,
70 client_id,
71 is_connected: Arc::new(AtomicBool::new(false)),
72 attempt_count: Arc::new(AtomicU32::new(0)),
73 max_attempts,
74 retry_indefinitely: max_attempts == 0,
75 current_backoff: Arc::new(std::sync::Mutex::new(Duration::from_secs(1))),
76 last_disconnection: Arc::new(std::sync::Mutex::new(None)),
77 }
78 }
79
80 pub async fn connect_with_retry(&self) -> anyhow::Result<Arc<Client>> {
94 const MAX_BACKOFF: Duration = Duration::from_secs(60);
95 let mut attempt = 0;
96 let mut backoff = Duration::from_secs(1);
97
98 loop {
99 attempt += 1;
100 self.attempt_count.store(attempt, Ordering::Relaxed);
101
102 if !self.retry_indefinitely && attempt > self.max_attempts {
103 anyhow::bail!("Failed to connect after {} attempts", self.max_attempts);
104 }
105
106 tracing::info!(
107 "Connection attempt {} to {}:{} (client_id: {})",
108 attempt,
109 self.host,
110 self.port,
111 self.client_id
112 );
113
114 let address = format!("{}:{}", self.host, self.port);
115 match Client::connect(&address, self.client_id).await {
116 Ok(client) => {
117 tracing::info!(
118 "Successfully connected to IB Gateway/TWS at {} (client_id: {})",
119 address,
120 self.client_id
121 );
122
123 self.is_connected.store(true, Ordering::Relaxed);
124 self.attempt_count.store(0, Ordering::Relaxed);
125 *self.current_backoff.lock().unwrap() = Duration::from_secs(1);
126
127 return Ok(Arc::new(client));
128 }
129 Err(e) => {
130 tracing::warn!(
131 "Connection attempt {} failed: {} (backoff: {:?})",
132 attempt,
133 e,
134 backoff
135 );
136
137 if !self.retry_indefinitely && attempt >= self.max_attempts {
138 return Err(e).context(format!(
139 "Failed to connect after {} attempts",
140 self.max_attempts
141 ));
142 }
143
144 tokio::time::sleep(backoff).await;
146 backoff = std::cmp::min(backoff * 2, MAX_BACKOFF);
147 *self.current_backoff.lock().unwrap() = backoff;
148 }
149 }
150 }
151 }
152
153 pub fn is_connected(&self) -> bool {
155 self.is_connected.load(Ordering::Relaxed)
156 }
157
158 pub fn mark_disconnected(&self) {
164 self.is_connected.store(false, Ordering::Relaxed);
165 *self.last_disconnection.lock().unwrap() = Some(tokio::time::Instant::now());
166 }
167
168 pub fn attempt_count(&self) -> u32 {
170 self.attempt_count.load(Ordering::Relaxed)
171 }
172
173 pub fn current_backoff(&self) -> Duration {
179 *self.current_backoff.lock().unwrap()
180 }
181
182 pub fn time_since_disconnection(&self) -> Option<Duration> {
188 self.last_disconnection
189 .lock()
190 .unwrap()
191 .map(|time: tokio::time::Instant| time.elapsed())
192 }
193}
194
195#[derive(Clone)]
199pub struct ConnectionWatchdog {
200 manager: Arc<ConnectionManager>,
202 check_interval: Duration,
204 client: Arc<std::sync::Mutex<Option<Arc<Client>>>>,
206 reconnect_callback: Arc<
208 dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<Arc<Client>>> + Send + Sync + 'static,
209 >,
210 is_running: Arc<AtomicBool>,
212}
213
214impl Debug for ConnectionWatchdog {
215 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216 f.debug_struct(stringify!(ConnectionWatchdog))
217 .field("check_interval", &self.check_interval)
218 .field("is_running", &self.is_running.load(Ordering::Relaxed))
219 .finish_non_exhaustive()
220 }
221}
222
223impl ConnectionWatchdog {
224 pub fn new(
232 manager: Arc<ConnectionManager>,
233 check_interval: Duration,
234 reconnect_callback: Arc<
235 dyn Fn() -> tokio::task::JoinHandle<anyhow::Result<Arc<Client>>> + Send + Sync,
236 >,
237 ) -> Self {
238 Self {
239 manager,
240 check_interval,
241 client: Arc::new(std::sync::Mutex::new(None)),
242 reconnect_callback,
243 is_running: Arc::new(AtomicBool::new(false)),
244 }
245 }
246
247 pub fn set_client(&self, client: Arc<Client>) {
253 *self.client.lock().unwrap() = Some(client);
254 }
255
256 pub fn start(&self) -> tokio::task::JoinHandle<()> {
262 let manager = Arc::clone(&self.manager);
263 let client = Arc::clone(&self.client);
264 let reconnect_callback = Arc::clone(&self.reconnect_callback);
265 let check_interval = self.check_interval;
266 let is_running = Arc::clone(&self.is_running);
267
268 is_running.store(true, Ordering::Relaxed);
269
270 get_runtime().spawn(async move {
271 tracing::info!("Connection watchdog started");
272
273 while is_running.load(Ordering::Relaxed) {
274 tokio::time::sleep(check_interval).await;
275
276 if !manager.is_connected() {
277 tracing::warn!(
278 "Connection watchdog detected disconnection, triggering reconnection"
279 );
280
281 let handle = reconnect_callback();
283
284 match handle.await {
286 Ok(Ok(new_client)) => {
287 tracing::info!("Reconnection successful via watchdog");
288 *client.lock().unwrap() = Some(new_client);
289 manager.is_connected.store(true, Ordering::Relaxed);
290 }
291 Ok(Err(e)) => {
292 tracing::error!("Reconnection failed via watchdog: {}", e);
293 }
294 Err(e) => {
295 tracing::error!("Reconnection task panicked: {}", e);
296 }
297 }
298 }
299 }
300
301 tracing::info!("Connection watchdog stopped");
302 })
303 }
304
305 pub fn stop(&self) {
307 self.is_running.store(false, Ordering::Relaxed);
308 }
309}