nautilus_interactive_brokers/gateway/
dockerized.rs1#[cfg(feature = "gateway")]
19use std::{collections::HashMap, fmt::Debug, time::Duration};
20
21#[cfg(feature = "gateway")]
22use anyhow::Context;
23#[cfg(feature = "gateway")]
24use bollard::Docker;
25#[cfg(feature = "gateway")]
26use bollard::container::{
27 Config, CreateContainerOptions, LogOutput, LogsOptions, RemoveContainerOptions,
28};
29#[cfg(feature = "gateway")]
30use bollard::models::{
31 ContainerCreateResponse, HostConfig, PortBinding, RestartPolicy, RestartPolicyNameEnum,
32};
33#[cfg(feature = "gateway")]
34use futures_util::StreamExt;
35#[cfg(feature = "gateway")]
36use serde::{Deserialize, Serialize};
37
38#[cfg(feature = "gateway")]
39use crate::config::DockerizedIBGatewayConfig;
40#[cfg(feature = "gateway")]
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[cfg_attr(
45 feature = "python",
46 pyo3::pyclass(
47 module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
48 from_py_object
49 )
50)]
51pub enum ContainerStatus {
52 NoContainer = 1,
54 ContainerCreated = 2,
56 ContainerStarting = 3,
58 ContainerStopped = 4,
60 NotLoggedIn = 5,
62 Ready = 6,
64 Unknown = 7,
66}
67
/// Manages an Interactive Brokers Gateway running inside a local Docker container.
///
/// `Debug` is implemented by hand so that credentials never appear in formatted output.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
        from_py_object
    )
)]
#[cfg(feature = "gateway")]
pub struct DockerizedIBGateway {
    /// Gateway configuration supplied by the caller.
    config: DockerizedIBGatewayConfig,
    /// Client handle for the local Docker daemon.
    pub(crate) docker: Docker,
    /// Login user, taken from the config or the `TWS_USERNAME` environment variable.
    username: String,
    /// Login password, taken from the config or the `TWS_PASSWORD` environment variable.
    password: String,
    /// Host address the container ports are bound to.
    host: String,
    /// Host-side API port for the configured trading mode.
    port: u16,
    /// Name given to the Docker container.
    container_name: String,
}
97
#[cfg(feature = "gateway")]
impl Debug for DockerizedIBGateway {
    /// Formats the gateway without exposing credentials.
    ///
    /// Only `host`, `port`, `container_name` and the trading mode are printed; the
    /// remaining fields are elided via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            host,
            port,
            container_name,
            config,
            ..
        } = self;

        let mut out = f.debug_struct(stringify!(DockerizedIBGateway));
        out.field("host", host);
        out.field("port", port);
        out.field("container_name", container_name);
        out.field("trading_mode", &config.trading_mode);
        out.finish_non_exhaustive()
    }
}
109
110#[cfg(feature = "gateway")]
111impl DockerizedIBGateway {
/// Base name of the gateway container; the trading mode is appended to it.
pub const CONTAINER_NAME: &'static str = "nautilus-ib-gateway";

/// Host-side API port per trading mode label.
pub const HOST_PORTS: &'static [(&'static str, u16)] = &[("Paper", 4002), ("Live", 4001)];

/// Container-side API port per trading mode label.
pub const CONTAINER_PORTS: &'static [(&'static str, u16)] = &[("Paper", 4004), ("Live", 4003)];

/// Container-side port that is published when a VNC host port is configured.
pub const VNC_PORT_INTERNAL: u16 = 5900;
123
124 fn host_port_for_mode(trading_mode: crate::config::TradingMode) -> u16 {
125 match trading_mode {
126 crate::config::TradingMode::Paper => 4002,
127 crate::config::TradingMode::Live => 4001,
128 }
129 }
130
131 fn container_port_for_mode(trading_mode: crate::config::TradingMode) -> u16 {
132 match trading_mode {
133 crate::config::TradingMode::Paper => 4004,
134 crate::config::TradingMode::Live => 4003,
135 }
136 }
137
/// Returns `true` when `logs` contains any of the known "login finished" markers.
///
/// Matching is a plain, case-sensitive substring search.
fn logs_indicate_ready(logs: &str) -> bool {
    const READY_MARKERS: [&str; 4] = [
        "Login has completed",
        "Configuration tasks completed",
        "Logged in to",
        "Login successful",
    ];

    READY_MARKERS.iter().any(|&marker| logs.contains(marker))
}
144
145 pub fn new(config: DockerizedIBGatewayConfig) -> anyhow::Result<Self> {
157 let username = config
159 .username
160 .clone()
161 .or_else(|| std::env::var("TWS_USERNAME").ok())
162 .ok_or_else(|| anyhow::anyhow!("username not set nor available in env TWS_USERNAME"))?;
163
164 let password = config
166 .password
167 .clone()
168 .or_else(|| std::env::var("TWS_PASSWORD").ok())
169 .ok_or_else(|| anyhow::anyhow!("password not set nor available in env TWS_PASSWORD"))?;
170
171 let docker = Docker::connect_with_local_defaults().context(
173 "Failed to connect to the local Docker daemon. Ensure Docker is running and the local Docker socket is available",
174 )?;
175
176 let mode_str = match config.trading_mode {
178 crate::config::TradingMode::Paper => "Paper",
179 crate::config::TradingMode::Live => "Live",
180 };
181 let port = Self::host_port_for_mode(config.trading_mode);
182
183 let container_name = format!("{}-{}", Self::CONTAINER_NAME, mode_str).to_lowercase();
185
186 Ok(Self {
187 config,
188 docker,
189 username,
190 password,
191 host: "127.0.0.1".to_string(),
192 port,
193 container_name,
194 })
195 }
196
197 pub fn container_name(&self) -> &str {
199 &self.container_name
200 }
201
202 pub fn host(&self) -> &str {
204 &self.host
205 }
206
207 pub fn port(&self) -> u16 {
209 self.port
210 }
211
212 pub async fn is_logged_in(&self, container_id: &str) -> anyhow::Result<bool> {
222 let logs_options = LogsOptions::<String> {
223 stdout: true,
224 stderr: true,
225 ..Default::default()
226 };
227
228 let mut logs_stream = self.docker.logs(container_id, Some(logs_options));
229
230 let mut logged_in = false;
231
232 while let Some(log_result) = logs_stream.next().await {
233 let log_output = log_result.context("Failed to read log chunk")?;
234 let log_bytes = match log_output {
236 LogOutput::StdOut { message } | LogOutput::StdErr { message } => message,
237 LogOutput::StdIn { message } | LogOutput::Console { message } => message,
238 };
239 let log_string = String::from_utf8_lossy(&log_bytes);
240 if Self::logs_indicate_ready(&log_string) {
241 logged_in = true;
242 break;
243 }
244 }
245
246 Ok(logged_in)
247 }
248
249 pub async fn container_status(&self) -> anyhow::Result<ContainerStatus> {
255 let list_options = bollard::container::ListContainersOptions::<String> {
256 all: true,
257 ..Default::default()
258 };
259 let containers = self
260 .docker
261 .list_containers(Some(list_options))
262 .await
263 .context("Failed to list containers")?;
264
265 let container = containers.iter().find(|c| {
266 c.names
267 .as_ref()
268 .and_then(|names| names.first())
269 .map(|name| name.trim_start_matches('/') == self.container_name)
270 .unwrap_or(false)
271 });
272
273 let Some(container) = container else {
274 return Ok(ContainerStatus::NoContainer);
275 };
276
277 let state = container.state.as_deref().unwrap_or("unknown");
278
279 match state {
280 "running" => {
281 let container_id = container
282 .id
283 .as_ref()
284 .ok_or_else(|| anyhow::anyhow!("Container ID missing"))?;
285
286 if self.is_logged_in(container_id).await.unwrap_or(false) {
287 Ok(ContainerStatus::Ready)
288 } else {
289 Ok(ContainerStatus::ContainerStarting)
290 }
291 }
292 "stopped" | "exited" => Ok(ContainerStatus::ContainerStopped),
293 "created" => Ok(ContainerStatus::ContainerCreated),
294 _ => Ok(ContainerStatus::Unknown),
295 }
296 }
297
298 pub async fn start(&mut self, wait: Option<u64>) -> anyhow::Result<()> {
308 tracing::info!("Ensuring gateway is running");
309
310 let status = self.container_status().await?;
311
312 let broken_statuses = [
313 ContainerStatus::NotLoggedIn,
314 ContainerStatus::ContainerStopped,
315 ContainerStatus::ContainerCreated,
316 ContainerStatus::Unknown,
317 ];
318
319 match status {
320 ContainerStatus::NoContainer => {
321 tracing::debug!("No container, starting");
322 }
323 status if broken_statuses.contains(&status) => {
324 tracing::debug!("Status {:?}, removing existing container", status);
325 self.stop().await?;
326 }
327 ContainerStatus::Ready | ContainerStatus::ContainerStarting => {
328 tracing::info!("Status {:?}, using existing container", status);
329 return Ok(());
330 }
331 _ => {}
332 }
333
334 tracing::debug!("Starting new container");
335
336 let host_port = Self::host_port_for_mode(self.config.trading_mode);
338 let container_port = Self::container_port_for_mode(self.config.trading_mode);
339
340 let mut port_bindings = HashMap::new();
341 port_bindings.insert(
342 format!("{}/tcp", container_port),
343 Some(vec![PortBinding {
344 host_ip: Some(self.host.clone()),
345 host_port: Some(host_port.to_string()),
346 }]),
347 );
348
349 if let Some(vnc_port) = self.config.vnc_port {
350 port_bindings.insert(
351 format!("{}/tcp", Self::VNC_PORT_INTERNAL),
352 Some(vec![PortBinding {
353 host_ip: Some(self.host.clone()),
354 host_port: Some(vnc_port.to_string()),
355 }]),
356 );
357 }
358
359 let mode_str = match self.config.trading_mode {
361 crate::config::TradingMode::Paper => "paper",
362 crate::config::TradingMode::Live => "live",
363 };
364 let env = vec![
365 format!("TWS_USERID={}", self.username),
366 format!("TWS_PASSWORD={}", self.password),
367 format!("TRADING_MODE={}", mode_str),
368 format!(
369 "READ_ONLY_API={}",
370 if self.config.read_only_api {
371 "yes"
372 } else {
373 "no"
374 }
375 ),
376 "EXISTING_SESSION_DETECTED_ACTION=primary".to_string(),
377 ];
378
379 let container_config = Config {
381 image: Some(self.config.container_image.clone()),
382 hostname: Some(self.container_name.clone()),
383 host_config: Some(HostConfig {
384 port_bindings: Some(port_bindings),
385 restart_policy: Some(RestartPolicy {
386 name: Some(RestartPolicyNameEnum::ALWAYS),
387 maximum_retry_count: None,
388 }),
389 ..Default::default()
390 }),
391 env: Some(env),
392 ..Default::default()
393 };
394
395 let create_options = CreateContainerOptions {
397 name: self.container_name.clone(),
398 platform: None,
399 };
400
401 let create_response: ContainerCreateResponse = self
402 .docker
403 .create_container(Some(create_options), container_config)
404 .await
405 .context("Failed to create container")?;
406
407 let container_id = create_response.id;
408
409 self.docker
411 .start_container(
412 &container_id,
413 None::<bollard::container::StartContainerOptions<String>>,
414 )
415 .await
416 .context("Failed to start container")?;
417
418 tracing::info!(
419 "Container `{}` starting, waiting for ready",
420 self.container_name
421 );
422
423 let wait_time = wait.unwrap_or(self.config.timeout);
425 let mut waited = 0u64;
426
427 while waited < wait_time {
428 if self.is_logged_in(&container_id).await.unwrap_or(false) {
429 tracing::info!(
430 "Gateway `{}` ready. VNC port is {:?}",
431 self.container_name,
432 self.config.vnc_port
433 );
434 return Ok(());
435 }
436
437 tracing::debug!("Waiting for IB Gateway to start");
438 tokio::time::sleep(Duration::from_secs(1)).await;
439 waited += 1;
440 }
441
442 anyhow::bail!(
443 "Gateway `{}` not ready after {} seconds",
444 self.container_name,
445 wait_time
446 )
447 }
448
449 pub async fn safe_start(&mut self, wait: Option<u64>) -> anyhow::Result<()> {
459 match self.start(wait).await {
460 Ok(()) => Ok(()),
461 Err(e) if e.to_string().contains("already exists") => {
462 tracing::warn!("Container already exists, continuing");
463 Ok(())
464 }
465 Err(e) => Err(e),
466 }
467 }
468
469 pub async fn stop(&self) -> anyhow::Result<()> {
475 let list_options = bollard::container::ListContainersOptions::<String> {
476 all: true,
477 ..Default::default()
478 };
479 let containers = self
480 .docker
481 .list_containers(Some(list_options))
482 .await
483 .context("Failed to list containers")?;
484
485 let container = containers.iter().find(|c| {
486 c.names
487 .as_ref()
488 .and_then(|names| names.first())
489 .map(|name| name.trim_start_matches('/') == self.container_name)
490 .unwrap_or(false)
491 });
492
493 if let Some(container) = container {
494 if let Some(container_id) = &container.id {
495 if container.state.as_deref() == Some("running") {
497 self.docker
498 .stop_container(
499 container_id,
500 None::<bollard::container::StopContainerOptions>,
501 )
502 .await
503 .context("Failed to stop container")?;
504 }
505
506 let remove_options = RemoveContainerOptions {
508 force: true,
509 ..Default::default()
510 };
511
512 self.docker
513 .remove_container(container_id, Some(remove_options))
514 .await
515 .context("Failed to remove container")?;
516
517 tracing::info!("Stopped and removed container `{}`", self.container_name);
518 }
519 }
520
521 Ok(())
522 }
523}
524
/// Placeholder type compiled when the `gateway` feature is disabled.
#[cfg(not(feature = "gateway"))]
#[derive(Debug)]
pub struct DockerizedIBGateway;
529
530#[cfg(not(feature = "gateway"))]
531impl DockerizedIBGateway {
532 pub fn new(_config: crate::config::DockerizedIBGatewayConfig) -> anyhow::Result<Self> {
536 anyhow::bail!("Gateway feature is not enabled. Build with --features gateway")
537 }
538}
539
#[cfg(all(test, feature = "gateway"))]
mod tests {
    use rstest::rstest;

    use super::DockerizedIBGateway;
    use crate::config::{DockerizedIBGatewayConfig, TradingMode};

    /// The host-side API port depends only on the trading mode.
    #[rstest]
    #[case(TradingMode::Paper, 4002)]
    #[case(TradingMode::Live, 4001)]
    fn host_port_matches_trading_mode(#[case] trading_mode: TradingMode, #[case] expected: u16) {
        let actual = DockerizedIBGateway::host_port_for_mode(trading_mode);

        assert_eq!(actual, expected);
    }

    /// The container-side API port depends only on the trading mode.
    #[rstest]
    #[case(TradingMode::Paper, 4004)]
    #[case(TradingMode::Live, 4003)]
    fn container_port_matches_trading_mode(
        #[case] trading_mode: TradingMode,
        #[case] expected: u16,
    ) {
        let actual = DockerizedIBGateway::container_port_for_mode(trading_mode);

        assert_eq!(actual, expected);
    }

    /// A freshly built gateway reports the host-side port, not the container-side one.
    #[rstest]
    #[case(TradingMode::Paper, 4002)]
    #[case(TradingMode::Live, 4001)]
    fn new_reports_the_host_api_port(#[case] trading_mode: TradingMode, #[case] expected: u16) {
        let config = DockerizedIBGatewayConfig::builder()
            .username("test-user".to_string())
            .password("test-password".to_string())
            .trading_mode(trading_mode)
            .build();

        let gateway = DockerizedIBGateway::new(config).unwrap();

        assert_eq!(gateway.port(), expected);
    }

    /// Start-up chatter must not be mistaken for a completed login.
    #[rstest]
    #[case("Forking ::: Starting IBC Gateway", false)]
    #[case("Started IB Gateway", false)]
    #[case("Login has completed", true)]
    #[case("Configuration tasks completed", true)]
    #[case("Logged in to backend", true)]
    #[case("Login successful", true)]
    fn ready_log_markers_are_strict(#[case] logs: &str, #[case] expected: bool) {
        let ready = DockerizedIBGateway::logs_indicate_ready(logs);

        assert_eq!(ready, expected);
    }
}