1use std::{collections::HashMap, time::Duration};
19
20use nautilus_common::{
21 cache::CacheConfig,
22 enums::Environment,
23 factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
24 logging::logger::LoggerConfig,
25 msgbus::database::MessageBusConfig,
26};
27use nautilus_core::UUID4;
28use nautilus_data::client::DataClientAdapter;
29use nautilus_execution::engine::ExecutionEngine;
30use nautilus_model::identifiers::TraderId;
31use nautilus_portfolio::config::PortfolioConfig;
32use nautilus_system::{config::StreamingConfig, kernel::NautilusKernel};
33
34use crate::{
35 config::{LiveDataEngineConfig, LiveExecEngineConfig, LiveNodeConfig, LiveRiskEngineConfig},
36 manager::{ExecutionManager, ExecutionManagerConfig},
37 node::LiveNode,
38 runner::AsyncRunner,
39};
40
41#[derive(Debug)]
46#[cfg_attr(
47 feature = "python",
48 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
49)]
50pub struct LiveNodeBuilder {
51 name: String,
52 config: LiveNodeConfig,
53 data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
54 exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
55 data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
56 exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
57}
58
59impl LiveNodeBuilder {
60 pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
66 match environment {
67 Environment::Sandbox | Environment::Live => {}
68 Environment::Backtest => {
69 anyhow::bail!("LiveNode cannot be used with Backtest environment");
70 }
71 }
72
73 let config = LiveNodeConfig {
74 environment,
75 trader_id,
76 ..Default::default()
77 };
78
79 Ok(Self {
80 name: "LiveNode".to_string(),
81 config,
82 data_client_factories: HashMap::new(),
83 exec_client_factories: HashMap::new(),
84 data_client_configs: HashMap::new(),
85 exec_client_configs: HashMap::new(),
86 })
87 }
88
89 pub fn from_config(config: LiveNodeConfig) -> anyhow::Result<Self> {
95 match config.environment {
96 Environment::Sandbox | Environment::Live => {}
97 Environment::Backtest => {
98 anyhow::bail!("LiveNode cannot be used with Backtest environment");
99 }
100 }
101
102 Ok(Self {
103 name: "LiveNode".to_string(),
104 config,
105 data_client_factories: HashMap::new(),
106 exec_client_factories: HashMap::new(),
107 data_client_configs: HashMap::new(),
108 exec_client_configs: HashMap::new(),
109 })
110 }
111
112 #[must_use]
114 pub fn name(&self) -> &str {
115 &self.name
116 }
117
118 #[must_use]
120 pub fn with_name(mut self, name: impl Into<String>) -> Self {
121 self.name = name.into();
122 self
123 }
124
125 #[must_use]
127 pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
128 self.config.instance_id = Some(instance_id);
129 self
130 }
131
132 #[must_use]
134 pub const fn with_load_state(mut self, load_state: bool) -> Self {
135 self.config.load_state = load_state;
136 self
137 }
138
139 #[must_use]
141 pub const fn with_save_state(mut self, save_state: bool) -> Self {
142 self.config.save_state = save_state;
143 self
144 }
145
146 #[must_use]
148 pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
149 self.config.timeout_connection = Duration::from_secs(timeout_secs);
150 self
151 }
152
153 #[must_use]
155 pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
156 self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
157 self
158 }
159
160 #[must_use]
162 pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
163 self.config.exec_engine.reconciliation = reconciliation;
164 self
165 }
166
167 #[must_use]
169 pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
170 self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
171 self
172 }
173
174 #[must_use]
176 pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
177 self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
178 self
179 }
180
181 #[must_use]
183 pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
184 self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
185 self
186 }
187
188 #[must_use]
190 pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
191 self.config.delay_post_stop = Duration::from_secs(delay_secs);
192 self
193 }
194
195 #[must_use]
197 pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
198 self.config.timeout_shutdown = Duration::from_secs(delay_secs);
199 self
200 }
201
202 #[must_use]
204 pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
205 self.config.cache = Some(config);
206 self
207 }
208
209 #[must_use]
214 pub fn with_msgbus_config(mut self, config: MessageBusConfig) -> Self {
215 self.config.msgbus = Some(config);
216 self
217 }
218
219 #[must_use]
221 pub fn with_portfolio_config(mut self, config: PortfolioConfig) -> Self {
222 self.config.portfolio = Some(config);
223 self
224 }
225
226 #[must_use]
231 pub fn with_streaming_config(mut self, config: StreamingConfig) -> Self {
232 self.config.streaming = Some(config);
233 self
234 }
235
236 #[must_use]
241 pub fn with_data_engine_config(mut self, config: LiveDataEngineConfig) -> Self {
242 self.config.data_engine = config;
243 self
244 }
245
246 #[must_use]
251 pub fn with_risk_engine_config(mut self, config: LiveRiskEngineConfig) -> Self {
252 self.config.risk_engine = config;
253 self
254 }
255
256 #[must_use]
261 pub fn with_exec_engine_config(mut self, config: LiveExecEngineConfig) -> Self {
262 self.config.exec_engine = config;
263 self
264 }
265
266 #[must_use]
268 pub fn with_logging(mut self, logging: LoggerConfig) -> Self {
269 self.config.logging = logging;
270 self
271 }
272
273 pub fn add_data_client(
279 mut self,
280 name: Option<String>,
281 factory: Box<dyn DataClientFactory>,
282 config: Box<dyn ClientConfig>,
283 ) -> anyhow::Result<Self> {
284 let name = name.unwrap_or_else(|| factory.name().to_string());
285
286 if self.data_client_factories.contains_key(&name) {
287 anyhow::bail!("Data client '{name}' is already registered");
288 }
289
290 self.data_client_factories.insert(name.clone(), factory);
291 self.data_client_configs.insert(name, config);
292 Ok(self)
293 }
294
295 pub fn add_exec_client(
301 mut self,
302 name: Option<String>,
303 factory: Box<dyn ExecutionClientFactory>,
304 config: Box<dyn ClientConfig>,
305 ) -> anyhow::Result<Self> {
306 let name = name.unwrap_or_else(|| factory.name().to_string());
307
308 if self.exec_client_factories.contains_key(&name) {
309 anyhow::bail!("Execution client '{name}' is already registered");
310 }
311
312 self.exec_client_factories.insert(name.clone(), factory);
313 self.exec_client_configs.insert(name, config);
314 Ok(self)
315 }
316
317 pub fn build(mut self) -> anyhow::Result<LiveNode> {
328 log::info!(
329 "Building LiveNode with {} data clients and {} execution clients",
330 self.data_client_factories.len(),
331 self.exec_client_factories.len()
332 );
333
334 self.config.validate_runtime_support()?;
335
336 let runner = AsyncRunner::new();
337 runner.bind_senders();
338
339 let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
340
341 for (name, factory) in self.data_client_factories {
342 if let Some(config) = self.data_client_configs.remove(&name) {
343 log::debug!("Creating data client {name}");
344
345 let client =
346 factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
347 let client_id = client.client_id();
348 let venue = client.venue();
349
350 let adapter = DataClientAdapter::new(
351 client_id, venue, true, true, client,
354 );
355
356 kernel
357 .data_engine
358 .borrow_mut()
359 .register_client(adapter, venue);
360
361 log::info!("Registered DataClient-{client_id}");
362 } else {
363 log::warn!("No config found for data client factory {name}");
364 }
365 }
366
367 for (name, factory) in self.exec_client_factories {
368 if let Some(config) = self.exec_client_configs.remove(&name) {
369 log::debug!("Creating execution client {name}");
370
371 let client = factory.create(&name, config.as_ref(), kernel.cache())?;
372 let client_id = client.client_id();
373 let venue = client.venue();
374
375 kernel.exec_engine.borrow_mut().register_client(client)?;
376 ExecutionEngine::subscribe_venue_instruments(&kernel.exec_engine, venue);
377
378 log::info!("Registered ExecutionClient-{client_id}");
379 } else {
380 log::warn!("No config found for execution client factory {name}");
381 }
382 }
383
384 let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
385 .with_trader_id(self.config.trader_id);
386 let exec_manager = ExecutionManager::new(
387 kernel.clock.clone(),
388 kernel.cache.clone(),
389 exec_manager_config,
390 );
391
392 log::info!("Built successfully");
393
394 Ok(LiveNode::new_from_builder(
395 kernel,
396 runner,
397 self.config,
398 exec_manager,
399 ))
400 }
401}