Skip to main content

nautilus_interactive_brokers/gateway/
dockerized.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Dockerized IB Gateway management.
17
18#[cfg(feature = "gateway")]
19use std::{collections::HashMap, fmt::Debug, time::Duration};
20
21#[cfg(feature = "gateway")]
22use anyhow::Context;
23#[cfg(feature = "gateway")]
24use bollard::Docker;
25#[cfg(feature = "gateway")]
26use bollard::container::{
27    Config, CreateContainerOptions, LogOutput, LogsOptions, RemoveContainerOptions,
28};
29#[cfg(feature = "gateway")]
30use bollard::models::{
31    ContainerCreateResponse, HostConfig, PortBinding, RestartPolicy, RestartPolicyNameEnum,
32};
33#[cfg(feature = "gateway")]
34use futures_util::StreamExt;
35#[cfg(feature = "gateway")]
36use serde::{Deserialize, Serialize};
37
38#[cfg(feature = "gateway")]
39use crate::config::DockerizedIBGatewayConfig;
40#[cfg(feature = "gateway")]
41
42/// Container status enumeration.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[cfg_attr(
45    feature = "python",
46    pyo3::pyclass(
47        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
48        from_py_object
49    )
50)]
51pub enum ContainerStatus {
52    /// No container exists.
53    NoContainer = 1,
54    /// Container has been created but not started.
55    ContainerCreated = 2,
56    /// Container is starting.
57    ContainerStarting = 3,
58    /// Container has stopped.
59    ContainerStopped = 4,
60    /// Container is running but not logged in.
61    NotLoggedIn = 5,
62    /// Container is ready (running and logged in).
63    Ready = 6,
64    /// Unknown container status.
65    Unknown = 7,
66}
67
68/// Dockerized IB Gateway manager.
69///
70/// This struct manages the lifecycle of Interactive Brokers Gateway Docker containers,
71/// including creation, starting, stopping, and status checking.
72#[derive(Clone)]
73#[cfg_attr(
74    feature = "python",
75    pyo3::pyclass(
76        module = "nautilus_trader.core.nautilus_pyo3.interactive_brokers",
77        from_py_object
78    )
79)]
80#[cfg(feature = "gateway")]
81pub struct DockerizedIBGateway {
82    /// Configuration for the gateway.
83    config: DockerizedIBGatewayConfig,
84    /// Docker client.
85    pub(crate) docker: Docker,
86    /// Username for IB account.
87    username: String,
88    /// Password for IB account.
89    password: String,
90    /// Host address (always 127.0.0.1).
91    host: String,
92    /// Port for the gateway.
93    port: u16,
94    /// Container name.
95    container_name: String,
96}
97
98#[cfg(feature = "gateway")]
99impl Debug for DockerizedIBGateway {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.debug_struct(stringify!(DockerizedIBGateway))
102            .field("host", &self.host)
103            .field("port", &self.port)
104            .field("container_name", &self.container_name)
105            .field("trading_mode", &self.config.trading_mode)
106            .finish_non_exhaustive()
107    }
108}
109
110#[cfg(feature = "gateway")]
111impl DockerizedIBGateway {
112    /// Base container name.
113    pub const CONTAINER_NAME: &'static str = "nautilus-ib-gateway";
114
115    /// Host API ports by trading mode.
116    pub const HOST_PORTS: &'static [(&'static str, u16)] = &[("Paper", 4002), ("Live", 4001)];
117
118    /// Container API ports exposed by the IB Gateway image.
119    pub const CONTAINER_PORTS: &'static [(&'static str, u16)] = &[("Paper", 4004), ("Live", 4003)];
120
121    /// Internal VNC port.
122    pub const VNC_PORT_INTERNAL: u16 = 5900;
123
124    fn host_port_for_mode(trading_mode: crate::config::TradingMode) -> u16 {
125        match trading_mode {
126            crate::config::TradingMode::Paper => 4002,
127            crate::config::TradingMode::Live => 4001,
128        }
129    }
130
131    fn container_port_for_mode(trading_mode: crate::config::TradingMode) -> u16 {
132        match trading_mode {
133            crate::config::TradingMode::Paper => 4004,
134            crate::config::TradingMode::Live => 4003,
135        }
136    }
137
138    fn logs_indicate_ready(logs: &str) -> bool {
139        logs.contains("Login has completed")
140            || logs.contains("Configuration tasks completed")
141            || logs.contains("Logged in to")
142            || logs.contains("Login successful")
143    }
144
145    /// Create a new DockerizedIBGateway from configuration.
146    ///
147    /// # Arguments
148    ///
149    /// * `config` - Configuration for the gateway
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if:
154    /// - Username or password is not provided and not available in environment variables
155    /// - Docker client creation fails
156    pub fn new(config: DockerizedIBGatewayConfig) -> anyhow::Result<Self> {
157        // Load username from config or environment (clone to avoid partial move)
158        let username = config
159            .username
160            .clone()
161            .or_else(|| std::env::var("TWS_USERNAME").ok())
162            .ok_or_else(|| anyhow::anyhow!("username not set nor available in env TWS_USERNAME"))?;
163
164        // Load password from config or environment (clone to avoid partial move)
165        let password = config
166            .password
167            .clone()
168            .or_else(|| std::env::var("TWS_PASSWORD").ok())
169            .ok_or_else(|| anyhow::anyhow!("password not set nor available in env TWS_PASSWORD"))?;
170
171        // Connect to Docker
172        let docker = Docker::connect_with_local_defaults().context(
173            "Failed to connect to the local Docker daemon. Ensure Docker is running and the local Docker socket is available",
174        )?;
175
176        // Determine port based on trading mode
177        let mode_str = match config.trading_mode {
178            crate::config::TradingMode::Paper => "Paper",
179            crate::config::TradingMode::Live => "Live",
180        };
181        let port = Self::host_port_for_mode(config.trading_mode);
182
183        // Generate container name
184        let container_name = format!("{}-{}", Self::CONTAINER_NAME, mode_str).to_lowercase();
185
186        Ok(Self {
187            config,
188            docker,
189            username,
190            password,
191            host: "127.0.0.1".to_string(),
192            port,
193            container_name,
194        })
195    }
196
197    /// Get the container name.
198    pub fn container_name(&self) -> &str {
199        &self.container_name
200    }
201
202    /// Get the host address.
203    pub fn host(&self) -> &str {
204        &self.host
205    }
206
207    /// Get the port.
208    pub fn port(&self) -> u16 {
209        self.port
210    }
211
212    /// Check if the container is logged in by examining logs.
213    ///
214    /// # Arguments
215    ///
216    /// * `container_id` - The container ID to check
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if log retrieval fails.
221    pub async fn is_logged_in(&self, container_id: &str) -> anyhow::Result<bool> {
222        let logs_options = LogsOptions::<String> {
223            stdout: true,
224            stderr: true,
225            ..Default::default()
226        };
227
228        let mut logs_stream = self.docker.logs(container_id, Some(logs_options));
229
230        let mut logged_in = false;
231
232        while let Some(log_result) = logs_stream.next().await {
233            let log_output = log_result.context("Failed to read log chunk")?;
234            // Handle LogOutput enum variants
235            let log_bytes = match log_output {
236                LogOutput::StdOut { message } | LogOutput::StdErr { message } => message,
237                LogOutput::StdIn { message } | LogOutput::Console { message } => message,
238            };
239            let log_string = String::from_utf8_lossy(&log_bytes);
240            if Self::logs_indicate_ready(&log_string) {
241                logged_in = true;
242                break;
243            }
244        }
245
246        Ok(logged_in)
247    }
248
249    /// Get the current container status.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if container inspection fails.
254    pub async fn container_status(&self) -> anyhow::Result<ContainerStatus> {
255        let list_options = bollard::container::ListContainersOptions::<String> {
256            all: true,
257            ..Default::default()
258        };
259        let containers = self
260            .docker
261            .list_containers(Some(list_options))
262            .await
263            .context("Failed to list containers")?;
264
265        let container = containers.iter().find(|c| {
266            c.names
267                .as_ref()
268                .and_then(|names| names.first())
269                .map(|name| name.trim_start_matches('/') == self.container_name)
270                .unwrap_or(false)
271        });
272
273        let Some(container) = container else {
274            return Ok(ContainerStatus::NoContainer);
275        };
276
277        let state = container.state.as_deref().unwrap_or("unknown");
278
279        match state {
280            "running" => {
281                let container_id = container
282                    .id
283                    .as_ref()
284                    .ok_or_else(|| anyhow::anyhow!("Container ID missing"))?;
285
286                if self.is_logged_in(container_id).await.unwrap_or(false) {
287                    Ok(ContainerStatus::Ready)
288                } else {
289                    Ok(ContainerStatus::ContainerStarting)
290                }
291            }
292            "stopped" | "exited" => Ok(ContainerStatus::ContainerStopped),
293            "created" => Ok(ContainerStatus::ContainerCreated),
294            _ => Ok(ContainerStatus::Unknown),
295        }
296    }
297
298    /// Start the gateway container.
299    ///
300    /// # Arguments
301    ///
302    /// * `wait` - Optional wait time in seconds (overrides config timeout)
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if container creation or startup fails.
307    pub async fn start(&mut self, wait: Option<u64>) -> anyhow::Result<()> {
308        tracing::info!("Ensuring gateway is running");
309
310        let status = self.container_status().await?;
311
312        let broken_statuses = [
313            ContainerStatus::NotLoggedIn,
314            ContainerStatus::ContainerStopped,
315            ContainerStatus::ContainerCreated,
316            ContainerStatus::Unknown,
317        ];
318
319        match status {
320            ContainerStatus::NoContainer => {
321                tracing::debug!("No container, starting");
322            }
323            status if broken_statuses.contains(&status) => {
324                tracing::debug!("Status {:?}, removing existing container", status);
325                self.stop().await?;
326            }
327            ContainerStatus::Ready | ContainerStatus::ContainerStarting => {
328                tracing::info!("Status {:?}, using existing container", status);
329                return Ok(());
330            }
331            _ => {}
332        }
333
334        tracing::debug!("Starting new container");
335
336        // Determine port mappings
337        let host_port = Self::host_port_for_mode(self.config.trading_mode);
338        let container_port = Self::container_port_for_mode(self.config.trading_mode);
339
340        let mut port_bindings = HashMap::new();
341        port_bindings.insert(
342            format!("{}/tcp", container_port),
343            Some(vec![PortBinding {
344                host_ip: Some(self.host.clone()),
345                host_port: Some(host_port.to_string()),
346            }]),
347        );
348
349        if let Some(vnc_port) = self.config.vnc_port {
350            port_bindings.insert(
351                format!("{}/tcp", Self::VNC_PORT_INTERNAL),
352                Some(vec![PortBinding {
353                    host_ip: Some(self.host.clone()),
354                    host_port: Some(vnc_port.to_string()),
355                }]),
356            );
357        }
358
359        // Prepare environment variables
360        let mode_str = match self.config.trading_mode {
361            crate::config::TradingMode::Paper => "paper",
362            crate::config::TradingMode::Live => "live",
363        };
364        let env = vec![
365            format!("TWS_USERID={}", self.username),
366            format!("TWS_PASSWORD={}", self.password),
367            format!("TRADING_MODE={}", mode_str),
368            format!(
369                "READ_ONLY_API={}",
370                if self.config.read_only_api {
371                    "yes"
372                } else {
373                    "no"
374                }
375            ),
376            "EXISTING_SESSION_DETECTED_ACTION=primary".to_string(),
377        ];
378
379        // Create container configuration
380        let container_config = Config {
381            image: Some(self.config.container_image.clone()),
382            hostname: Some(self.container_name.clone()),
383            host_config: Some(HostConfig {
384                port_bindings: Some(port_bindings),
385                restart_policy: Some(RestartPolicy {
386                    name: Some(RestartPolicyNameEnum::ALWAYS),
387                    maximum_retry_count: None,
388                }),
389                ..Default::default()
390            }),
391            env: Some(env),
392            ..Default::default()
393        };
394
395        // Create container
396        let create_options = CreateContainerOptions {
397            name: self.container_name.clone(),
398            platform: None,
399        };
400
401        let create_response: ContainerCreateResponse = self
402            .docker
403            .create_container(Some(create_options), container_config)
404            .await
405            .context("Failed to create container")?;
406
407        let container_id = create_response.id;
408
409        // Start container
410        self.docker
411            .start_container(
412                &container_id,
413                None::<bollard::container::StartContainerOptions<String>>,
414            )
415            .await
416            .context("Failed to start container")?;
417
418        tracing::info!(
419            "Container `{}` starting, waiting for ready",
420            self.container_name
421        );
422
423        // Wait for container to be ready
424        let wait_time = wait.unwrap_or(self.config.timeout);
425        let mut waited = 0u64;
426
427        while waited < wait_time {
428            if self.is_logged_in(&container_id).await.unwrap_or(false) {
429                tracing::info!(
430                    "Gateway `{}` ready. VNC port is {:?}",
431                    self.container_name,
432                    self.config.vnc_port
433                );
434                return Ok(());
435            }
436
437            tracing::debug!("Waiting for IB Gateway to start");
438            tokio::time::sleep(Duration::from_secs(1)).await;
439            waited += 1;
440        }
441
442        anyhow::bail!(
443            "Gateway `{}` not ready after {} seconds",
444            self.container_name,
445            wait_time
446        )
447    }
448
449    /// Safely start the gateway, handling container already exists errors.
450    ///
451    /// # Arguments
452    ///
453    /// * `wait` - Optional wait time in seconds
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if startup fails (other than container exists).
458    pub async fn safe_start(&mut self, wait: Option<u64>) -> anyhow::Result<()> {
459        match self.start(wait).await {
460            Ok(()) => Ok(()),
461            Err(e) if e.to_string().contains("already exists") => {
462                tracing::warn!("Container already exists, continuing");
463                Ok(())
464            }
465            Err(e) => Err(e),
466        }
467    }
468
469    /// Stop and remove the gateway container.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if container stop or removal fails.
474    pub async fn stop(&self) -> anyhow::Result<()> {
475        let list_options = bollard::container::ListContainersOptions::<String> {
476            all: true,
477            ..Default::default()
478        };
479        let containers = self
480            .docker
481            .list_containers(Some(list_options))
482            .await
483            .context("Failed to list containers")?;
484
485        let container = containers.iter().find(|c| {
486            c.names
487                .as_ref()
488                .and_then(|names| names.first())
489                .map(|name| name.trim_start_matches('/') == self.container_name)
490                .unwrap_or(false)
491        });
492
493        if let Some(container) = container {
494            if let Some(container_id) = &container.id {
495                // Stop container if running
496                if container.state.as_deref() == Some("running") {
497                    self.docker
498                        .stop_container(
499                            container_id,
500                            None::<bollard::container::StopContainerOptions>,
501                        )
502                        .await
503                        .context("Failed to stop container")?;
504                }
505
506                // Remove container
507                let remove_options = RemoveContainerOptions {
508                    force: true,
509                    ..Default::default()
510                };
511
512                self.docker
513                    .remove_container(container_id, Some(remove_options))
514                    .await
515                    .context("Failed to remove container")?;
516
517                tracing::info!("Stopped and removed container `{}`", self.container_name);
518            }
519        }
520
521        Ok(())
522    }
523}
524
525/// Stub implementation when gateway feature is disabled.
526#[cfg(not(feature = "gateway"))]
527#[derive(Debug)]
528pub struct DockerizedIBGateway;
529
530#[cfg(not(feature = "gateway"))]
531impl DockerizedIBGateway {
532    /// # Errors
533    ///
534    /// Returns an error if the Dockerized IB Gateway cannot be created or started.
535    pub fn new(_config: crate::config::DockerizedIBGatewayConfig) -> anyhow::Result<Self> {
536        anyhow::bail!("Gateway feature is not enabled. Build with --features gateway")
537    }
538}
539
540#[cfg(all(test, feature = "gateway"))]
541mod tests {
542    use rstest::rstest;
543
544    use super::DockerizedIBGateway;
545    use crate::config::TradingMode;
546
547    #[rstest]
548    #[case(TradingMode::Paper, 4002)]
549    #[case(TradingMode::Live, 4001)]
550    fn host_port_matches_trading_mode(#[case] trading_mode: TradingMode, #[case] expected: u16) {
551        assert_eq!(
552            DockerizedIBGateway::host_port_for_mode(trading_mode),
553            expected
554        );
555    }
556
557    #[rstest]
558    #[case(TradingMode::Paper, 4004)]
559    #[case(TradingMode::Live, 4003)]
560    fn container_port_matches_trading_mode(
561        #[case] trading_mode: TradingMode,
562        #[case] expected: u16,
563    ) {
564        assert_eq!(
565            DockerizedIBGateway::container_port_for_mode(trading_mode),
566            expected
567        );
568    }
569
570    #[rstest]
571    #[case(TradingMode::Paper, 4002)]
572    #[case(TradingMode::Live, 4001)]
573    fn new_reports_the_host_api_port(#[case] trading_mode: TradingMode, #[case] expected: u16) {
574        let gateway = DockerizedIBGateway::new(
575            crate::config::DockerizedIBGatewayConfig::builder()
576                .username("test-user".to_string())
577                .password("test-password".to_string())
578                .trading_mode(trading_mode)
579                .build(),
580        )
581        .unwrap();
582
583        assert_eq!(gateway.port(), expected);
584    }
585
586    #[rstest]
587    #[case("Forking ::: Starting IBC Gateway", false)]
588    #[case("Started IB Gateway", false)]
589    #[case("Login has completed", true)]
590    #[case("Configuration tasks completed", true)]
591    #[case("Logged in to backend", true)]
592    #[case("Login successful", true)]
593    fn ready_log_markers_are_strict(#[case] logs: &str, #[case] expected: bool) {
594        assert_eq!(DockerizedIBGateway::logs_indicate_ready(logs), expected);
595    }
596}