Skip to main content

nautilus_live/
builder.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Builder for constructing [`LiveNode`] instances.
17
18use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::{
21    cache::CacheConfig,
22    enums::Environment,
23    factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
24    logging::logger::LoggerConfig,
25    msgbus::database::MessageBusConfig,
26};
27use nautilus_core::UUID4;
28use nautilus_data::client::DataClientAdapter;
29use nautilus_execution::engine::ExecutionEngine;
30use nautilus_model::identifiers::TraderId;
31use nautilus_portfolio::config::PortfolioConfig;
32use nautilus_system::{config::StreamingConfig, kernel::NautilusKernel};
33
34use crate::{
35    config::{LiveDataEngineConfig, LiveExecEngineConfig, LiveNodeConfig, LiveRiskEngineConfig},
36    manager::{ExecutionManager, ExecutionManagerConfig},
37    node::LiveNode,
38    runner::AsyncRunner,
39};
40
41/// Builder for constructing a [`LiveNode`] with a fluent API.
42///
43/// Provides configuration options specific to live nodes,
44/// including client factory registration and timeout settings.
45#[derive(Debug)]
46#[cfg_attr(
47    feature = "python",
48    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
49)]
50pub struct LiveNodeBuilder {
51    name: String,
52    config: LiveNodeConfig,
53    data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
54    exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
55    data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
56    exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
57}
58
59impl LiveNodeBuilder {
60    /// Creates a new [`LiveNodeBuilder`] with required parameters.
61    ///
62    /// # Errors
63    ///
64    /// Returns an error if `environment` is invalid (BACKTEST).
65    pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
66        match environment {
67            Environment::Sandbox | Environment::Live => {}
68            Environment::Backtest => {
69                anyhow::bail!("LiveNode cannot be used with Backtest environment");
70            }
71        }
72
73        let config = LiveNodeConfig {
74            environment,
75            trader_id,
76            ..Default::default()
77        };
78
79        Ok(Self {
80            name: "LiveNode".to_string(),
81            config,
82            data_client_factories: HashMap::new(),
83            exec_client_factories: HashMap::new(),
84            data_client_configs: HashMap::new(),
85            exec_client_configs: HashMap::new(),
86        })
87    }
88
89    /// Creates a new [`LiveNodeBuilder`] from an existing [`LiveNodeConfig`].
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the config's environment is invalid (BACKTEST).
94    pub fn from_config(config: LiveNodeConfig) -> anyhow::Result<Self> {
95        match config.environment {
96            Environment::Sandbox | Environment::Live => {}
97            Environment::Backtest => {
98                anyhow::bail!("LiveNode cannot be used with Backtest environment");
99            }
100        }
101
102        Ok(Self {
103            name: "LiveNode".to_string(),
104            config,
105            data_client_factories: HashMap::new(),
106            exec_client_factories: HashMap::new(),
107            data_client_configs: HashMap::new(),
108            exec_client_configs: HashMap::new(),
109        })
110    }
111
112    /// Returns the name for the node.
113    #[must_use]
114    pub fn name(&self) -> &str {
115        &self.name
116    }
117
118    /// Set the name for the node.
119    #[must_use]
120    pub fn with_name(mut self, name: impl Into<String>) -> Self {
121        self.name = name.into();
122        self
123    }
124
125    /// Set the instance ID for the node.
126    #[must_use]
127    pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
128        self.config.instance_id = Some(instance_id);
129        self
130    }
131
132    /// Configure whether to load state on startup.
133    #[must_use]
134    pub const fn with_load_state(mut self, load_state: bool) -> Self {
135        self.config.load_state = load_state;
136        self
137    }
138
139    /// Configure whether to save state on shutdown.
140    #[must_use]
141    pub const fn with_save_state(mut self, save_state: bool) -> Self {
142        self.config.save_state = save_state;
143        self
144    }
145
146    /// Set the connection timeout in seconds.
147    #[must_use]
148    pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
149        self.config.timeout_connection = Duration::from_secs(timeout_secs);
150        self
151    }
152
153    /// Set the reconciliation timeout in seconds.
154    #[must_use]
155    pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
156        self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
157        self
158    }
159
160    /// Configure whether to run startup reconciliation.
161    #[must_use]
162    pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
163        self.config.exec_engine.reconciliation = reconciliation;
164        self
165    }
166
167    /// Set the reconciliation lookback in minutes.
168    #[must_use]
169    pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
170        self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
171        self
172    }
173
174    /// Set the portfolio initialization timeout in seconds.
175    #[must_use]
176    pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
177        self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
178        self
179    }
180
181    /// Set the disconnection timeout in seconds.
182    #[must_use]
183    pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
184        self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
185        self
186    }
187
188    /// Set the post-stop delay in seconds.
189    #[must_use]
190    pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
191        self.config.delay_post_stop = Duration::from_secs(delay_secs);
192        self
193    }
194
195    /// Set the shutdown timeout in seconds.
196    #[must_use]
197    pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
198        self.config.timeout_shutdown = Duration::from_secs(delay_secs);
199        self
200    }
201
202    /// Set the cache configuration.
203    #[must_use]
204    pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
205        self.config.cache = Some(config);
206        self
207    }
208
209    /// Set the message bus configuration.
210    ///
211    /// The Rust live runtime does not support this setting yet.
212    /// `build()` returns an error when it is set.
213    #[must_use]
214    pub fn with_msgbus_config(mut self, config: MessageBusConfig) -> Self {
215        self.config.msgbus = Some(config);
216        self
217    }
218
219    /// Set the portfolio configuration.
220    #[must_use]
221    pub fn with_portfolio_config(mut self, config: PortfolioConfig) -> Self {
222        self.config.portfolio = Some(config);
223        self
224    }
225
226    /// Set the streaming configuration.
227    ///
228    /// The Rust live runtime does not support this setting yet.
229    /// `build()` returns an error when it is set.
230    #[must_use]
231    pub fn with_streaming_config(mut self, config: StreamingConfig) -> Self {
232        self.config.streaming = Some(config);
233        self
234    }
235
236    /// Set the data engine configuration.
237    ///
238    /// The Rust live runtime currently supports only the default `qsize`.
239    /// `build()` returns an error for other values.
240    #[must_use]
241    pub fn with_data_engine_config(mut self, config: LiveDataEngineConfig) -> Self {
242        self.config.data_engine = config;
243        self
244    }
245
246    /// Set the risk engine configuration.
247    ///
248    /// The Rust live runtime currently supports only the default `qsize`.
249    /// `build()` returns an error for other values.
250    #[must_use]
251    pub fn with_risk_engine_config(mut self, config: LiveRiskEngineConfig) -> Self {
252        self.config.risk_engine = config;
253        self
254    }
255
256    /// Set the execution engine configuration.
257    ///
258    /// The Rust live runtime currently supports only the default `qsize`.
259    /// `build()` returns an error for other values.
260    #[must_use]
261    pub fn with_exec_engine_config(mut self, config: LiveExecEngineConfig) -> Self {
262        self.config.exec_engine = config;
263        self
264    }
265
266    /// Set the logging configuration.
267    #[must_use]
268    pub fn with_logging(mut self, logging: LoggerConfig) -> Self {
269        self.config.logging = logging;
270        self
271    }
272
273    /// Adds a data client factory with configuration.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if a client with the same name is already registered.
278    pub fn add_data_client(
279        mut self,
280        name: Option<String>,
281        factory: Box<dyn DataClientFactory>,
282        config: Box<dyn ClientConfig>,
283    ) -> anyhow::Result<Self> {
284        let name = name.unwrap_or_else(|| factory.name().to_string());
285
286        if self.data_client_factories.contains_key(&name) {
287            anyhow::bail!("Data client '{name}' is already registered");
288        }
289
290        self.data_client_factories.insert(name.clone(), factory);
291        self.data_client_configs.insert(name, config);
292        Ok(self)
293    }
294
295    /// Adds an execution client factory with configuration.
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if a client with the same name is already registered.
300    pub fn add_exec_client(
301        mut self,
302        name: Option<String>,
303        factory: Box<dyn ExecutionClientFactory>,
304        config: Box<dyn ClientConfig>,
305    ) -> anyhow::Result<Self> {
306        let name = name.unwrap_or_else(|| factory.name().to_string());
307
308        if self.exec_client_factories.contains_key(&name) {
309            anyhow::bail!("Execution client '{name}' is already registered");
310        }
311
312        self.exec_client_factories.insert(name.clone(), factory);
313        self.exec_client_configs.insert(name, config);
314        Ok(self)
315    }
316
317    /// Build the [`LiveNode`] with the configured settings.
318    ///
319    /// This will:
320    /// 1. Build the underlying kernel.
321    /// 2. Create clients using factories.
322    /// 3. Register clients with engines.
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if node construction fails.
327    pub fn build(mut self) -> anyhow::Result<LiveNode> {
328        log::info!(
329            "Building LiveNode with {} data clients and {} execution clients",
330            self.data_client_factories.len(),
331            self.exec_client_factories.len()
332        );
333
334        self.config.validate_runtime_support()?;
335
336        let runner = AsyncRunner::new();
337        runner.bind_senders();
338
339        let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
340
341        for (name, factory) in self.data_client_factories {
342            if let Some(config) = self.data_client_configs.remove(&name) {
343                log::debug!("Creating data client {name}");
344
345                let client =
346                    factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
347                let client_id = client.client_id();
348                let venue = client.venue();
349
350                let adapter = DataClientAdapter::new(
351                    client_id, venue, true, // handles_order_book_deltas
352                    true, // handles_order_book_snapshots
353                    client,
354                );
355
356                kernel
357                    .data_engine
358                    .borrow_mut()
359                    .register_client(adapter, venue);
360
361                log::info!("Registered DataClient-{client_id}");
362            } else {
363                log::warn!("No config found for data client factory {name}");
364            }
365        }
366
367        for (name, factory) in self.exec_client_factories {
368            if let Some(config) = self.exec_client_configs.remove(&name) {
369                log::debug!("Creating execution client {name}");
370
371                let client = factory.create(&name, config.as_ref(), kernel.cache())?;
372                let client_id = client.client_id();
373                let venue = client.venue();
374
375                kernel.exec_engine.borrow_mut().register_client(client)?;
376                ExecutionEngine::subscribe_venue_instruments(&kernel.exec_engine, venue);
377
378                log::info!("Registered ExecutionClient-{client_id}");
379            } else {
380                log::warn!("No config found for execution client factory {name}");
381            }
382        }
383
384        let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
385            .with_trader_id(self.config.trader_id);
386        let exec_manager = ExecutionManager::new(
387            kernel.clock.clone(),
388            kernel.cache.clone(),
389            exec_manager_config,
390        );
391
392        log::info!("Built successfully");
393
394        Ok(LiveNode::new_from_builder(
395            kernel,
396            runner,
397            self.config,
398            exec_manager,
399        ))
400    }
401}