1use std::{
17 cell::{Cell, Ref, RefCell},
18 rc::Rc,
19 time::Duration,
20};
21
22use nautilus_common::{
23 cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
24 clock::{Clock, TestClock},
25 component::Component,
26 enums::Environment,
27 logging::{
28 headers, init_logging,
29 logger::{LogGuard, LoggerConfig},
30 writer::FileWriterConfig,
31 },
32 messages::system::ShutdownSystem,
33 msgbus::{
34 self, MessageBus, MessagingSwitchboard, ShareableMessageHandler, get_message_bus,
35 set_message_bus,
36 },
37};
38use nautilus_core::{UUID4, UnixNanos};
39use nautilus_data::engine::DataEngine;
40use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
41use nautilus_model::identifiers::{ClientId, TraderId};
42use nautilus_portfolio::portfolio::Portfolio;
43use nautilus_risk::engine::RiskEngine;
44use ustr::Ustr;
45
46use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
47
/// Central system kernel that builds and owns the clock, cache, portfolio,
/// engines, order emulator, and trader for one trader ID.
#[derive(Debug)]
pub struct NautilusKernel {
    /// Name given to the kernel at construction.
    pub name: String,
    /// Instance identifier, taken from the config or defaulted when the config has none.
    pub instance_id: UUID4,
    /// Host name of the machine as reported by `sysinfo`.
    pub machine_id: String,
    /// Configuration the kernel was built from.
    pub config: Box<dyn NautilusKernelConfig>,
    /// Cache shared with the portfolio, engines, order emulator, and trader.
    pub cache: Rc<RefCell<Cache>>,
    /// Clock shared with all components (a `TestClock` for backtests, a live clock otherwise).
    pub clock: Rc<RefCell<dyn Clock>>,
    /// Portfolio built on the shared cache and clock.
    pub portfolio: Rc<RefCell<Portfolio>>,
    /// Guard returned by logging initialization, held for the kernel's lifetime.
    /// NOTE(review): presumably keeps the logging system alive until dropped; confirm against `LogGuard`.
    pub log_guard: LogGuard,
    /// Data engine, registered with the message bus during construction.
    pub data_engine: Rc<RefCell<DataEngine>>,
    /// Risk engine, registered with the message bus during construction.
    pub risk_engine: Rc<RefCell<RiskEngine>>,
    /// Execution engine, registered with the message bus during construction.
    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
    /// Order emulator adapter built with the trader ID, clock, and cache.
    pub order_emulator: OrderEmulatorAdapter,
    /// Trader built on the shared clock, cache, and portfolio.
    pub trader: Rc<RefCell<Trader>>,
    /// Clock timestamp taken at the end of construction.
    pub ts_created: UnixNanos,
    /// Clock timestamp recorded by `start`; `None` until then and after `reset`.
    pub ts_started: Option<UnixNanos>,
    /// Clock timestamp recorded by `finalize_stop`; `None` until then and after `reset`.
    pub ts_shutdown: Option<UnixNanos>,
    /// Flag set when a `ShutdownSystem` command for this trader arrives on the message bus.
    shutdown_requested: Rc<Cell<bool>>,
}
87
88impl NautilusKernel {
/// Returns a [`NautilusKernelBuilder`] seeded with the given name, trader ID, and environment.
#[must_use]
pub const fn builder(
    name: String,
    trader_id: TraderId,
    environment: Environment,
) -> NautilusKernelBuilder {
    NautilusKernelBuilder::new(name, trader_id, environment)
}
98
99 pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
105 let instance_id = config.instance_id().unwrap_or_default();
106 let machine_id = Self::determine_machine_id()?;
107
108 let logger_config = config.logging();
109 let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
110 headers::log_header(
111 config.trader_id(),
112 &machine_id,
113 instance_id,
114 Ustr::from(&name),
115 );
116
117 log::info!("Building system kernel");
118
119 let clock = Self::initialize_clock(&config.environment());
120 let cache = Self::initialize_cache(config.cache());
121
122 let msgbus = Rc::new(RefCell::new(MessageBus::new(
123 config.trader_id(),
124 instance_id,
125 Some(name.clone()),
126 None,
127 )));
128 set_message_bus(msgbus);
129
130 let portfolio = Rc::new(RefCell::new(Portfolio::new(
131 cache.clone(),
132 clock.clone(),
133 config.portfolio(),
134 )));
135
136 let risk_engine = RiskEngine::new(
137 config.risk_engine().unwrap_or_default(),
138 portfolio.borrow().clone_shallow(),
139 clock.clone(),
140 cache.clone(),
141 );
142 let risk_engine = Rc::new(RefCell::new(risk_engine));
143
144 let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
145 let exec_engine = Rc::new(RefCell::new(exec_engine));
146
147 let order_emulator =
148 OrderEmulatorAdapter::new(config.trader_id(), clock.clone(), cache.clone());
149
150 let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
151 let data_engine = Rc::new(RefCell::new(data_engine));
152
153 DataEngine::register_msgbus_handlers(&data_engine);
154 RiskEngine::register_msgbus_handlers(&risk_engine);
155 ExecutionEngine::register_msgbus_handlers(&exec_engine);
156
157 let shutdown_requested = Rc::new(Cell::new(false));
158 Self::register_shutdown_handler(config.trader_id(), shutdown_requested.clone());
159
160 let trader = Rc::new(RefCell::new(Trader::new(
161 config.trader_id(),
162 instance_id,
163 config.environment(),
164 clock.clone(),
165 cache.clone(),
166 portfolio.clone(),
167 )));
168
169 let ts_created = clock.borrow().timestamp_ns();
170
171 Ok(Self {
172 name,
173 instance_id,
174 machine_id,
175 config: Box::new(config),
176 cache,
177 clock,
178 portfolio,
179 log_guard,
180 data_engine,
181 risk_engine,
182 exec_engine,
183 order_emulator,
184 trader,
185 ts_created,
186 ts_started: None,
187 ts_shutdown: None,
188 shutdown_requested,
189 })
190 }
191
192 fn register_shutdown_handler(trader_id: TraderId, shutdown_requested: Rc<Cell<bool>>) {
193 let handler = ShareableMessageHandler::from_typed(move |cmd: &ShutdownSystem| {
194 if cmd.trader_id != trader_id {
195 log::warn!("Received {cmd} not for this trader {trader_id}, ignoring",);
196 return;
197 }
198
199 if shutdown_requested.get() {
200 log::debug!("Shutdown already requested, ignoring {cmd}");
201 return;
202 }
203
204 log::info!("Received {cmd}, requesting shutdown");
205 shutdown_requested.set(true);
206 });
207 let topic = MessagingSwitchboard::shutdown_system_topic();
208 msgbus::subscribe_any(topic.into(), handler, None);
209 }
210
211 fn determine_machine_id() -> anyhow::Result<String> {
212 sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
213 }
214
215 fn initialize_logging(
216 trader_id: TraderId,
217 instance_id: UUID4,
218 config: LoggerConfig,
219 ) -> anyhow::Result<LogGuard> {
220 #[cfg(feature = "tracing-bridge")]
221 let use_tracing = config.use_tracing;
222
223 let log_guard = match init_logging(
224 trader_id,
225 instance_id,
226 config,
227 FileWriterConfig::default(), ) {
229 Ok(guard) => guard,
230 Err(e) => {
231 if e.downcast_ref::<log::SetLoggerError>().is_some() {
236 if let Some(guard) = LogGuard::new() {
237 guard
238 } else {
239 return Err(e.context(
240 "A non-Nautilus logger is already registered; \
241 cannot initialize Nautilus logging",
242 ));
243 }
244 } else {
245 return Err(e);
246 }
247 }
248 };
249
250 #[cfg(feature = "tracing-bridge")]
252 if use_tracing && !nautilus_common::logging::bridge::tracing_is_initialized() {
253 nautilus_common::logging::bridge::init_tracing()?;
254 }
255
256 Ok(log_guard)
257 }
258
259 fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
260 match environment {
261 Environment::Backtest => {
262 let test_clock = TestClock::new();
263 Rc::new(RefCell::new(test_clock))
264 }
265 #[cfg(feature = "live")]
266 Environment::Live | Environment::Sandbox => {
267 let live_clock = nautilus_common::live::clock::LiveClock::default(); Rc::new(RefCell::new(live_clock))
269 }
270 #[cfg(not(feature = "live"))]
271 Environment::Live | Environment::Sandbox => {
272 panic!(
273 "Live/Sandbox environment requires the 'live' feature to be enabled. \
274 Build with `--features live` or add `features = [\"live\"]` to your dependency."
275 );
276 }
277 }
278 }
279
280 fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
281 let cache_config = cache_config.unwrap_or_default();
282
283 let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
285 let cache = Cache::new(Some(cache_config), cache_database);
286
287 Rc::new(RefCell::new(cache))
288 }
289
/// Cancels the timers on the kernel clock.
fn cancel_timers(&self) {
    self.clock.borrow_mut().cancel_timers();
}
293
/// Returns the current timestamp from the kernel clock.
#[must_use]
pub fn generate_timestamp_ns(&self) -> UnixNanos {
    self.clock.borrow().timestamp_ns()
}
298
/// Returns the environment from the kernel configuration.
#[must_use]
pub fn environment(&self) -> Environment {
    self.config.environment()
}
304
/// Returns the kernel name.
#[must_use]
pub const fn name(&self) -> &str {
    self.name.as_str()
}
310
/// Returns the trader ID from the kernel configuration.
#[must_use]
pub fn trader_id(&self) -> TraderId {
    self.config.trader_id()
}
316
317 #[must_use]
319 pub fn machine_id(&self) -> &str {
320 &self.machine_id
321 }
322
/// Returns the kernel instance ID.
#[must_use]
pub const fn instance_id(&self) -> UUID4 {
    self.instance_id
}
328
/// Returns the `delay_post_stop` duration from the kernel configuration.
#[must_use]
pub fn delay_post_stop(&self) -> Duration {
    self.config.delay_post_stop()
}
334
/// Returns the timestamp recorded when the kernel was created.
#[must_use]
pub const fn ts_created(&self) -> UnixNanos {
    self.ts_created
}
340
/// Returns the timestamp recorded when the kernel was last started, if any.
#[must_use]
pub const fn ts_started(&self) -> Option<UnixNanos> {
    self.ts_started
}
346
/// Returns the timestamp recorded when the kernel was last stopped, if any.
#[must_use]
pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
    self.ts_shutdown
}
352
/// Returns whether a shutdown has been requested for this kernel.
#[must_use]
pub fn is_shutdown_requested(&self) -> bool {
    self.shutdown_requested.get()
}
358
/// Clears the shutdown-requested flag so that a later `ShutdownSystem`
/// command can raise it again.
pub fn reset_shutdown_flag(&self) {
    self.shutdown_requested.set(false);
}
366
367 #[must_use]
370 pub fn shutdown_flag(&self) -> Rc<Cell<bool>> {
371 self.shutdown_requested.clone()
372 }
373
/// Returns the `load_state` flag from the kernel configuration.
#[must_use]
pub fn load_state(&self) -> bool {
    self.config.load_state()
}
379
/// Returns the `save_state` flag from the kernel configuration.
#[must_use]
pub fn save_state(&self) -> bool {
    self.config.save_state()
}
385
386 #[must_use]
388 pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
389 self.clock.clone()
390 }
391
392 #[must_use]
394 pub fn cache(&self) -> Rc<RefCell<Cache>> {
395 self.cache.clone()
396 }
397
/// Returns an immutable borrow of the portfolio.
///
/// # Panics
///
/// Panics if the portfolio is currently mutably borrowed.
#[must_use]
pub fn portfolio(&self) -> Ref<'_, Portfolio> {
    self.portfolio.borrow()
}
403
/// Returns an immutable borrow of the data engine.
///
/// # Panics
///
/// Panics if the data engine is currently mutably borrowed.
#[must_use]
pub fn data_engine(&self) -> Ref<'_, DataEngine> {
    self.data_engine.borrow()
}
409
/// Returns a reference to the shared risk engine handle.
#[must_use]
pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
    &self.risk_engine
}
415
/// Returns a reference to the shared execution engine handle.
#[must_use]
pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
    &self.exec_engine
}
421
/// Returns a reference to the shared trader handle.
#[must_use]
pub fn trader(&self) -> &Rc<RefCell<Trader>> {
    &self.trader
}
427
428 pub fn start(&mut self) {
430 log::info!("Starting");
431 self.start_engines();
432
433 log::info!("Initializing trader");
434 if let Err(e) = self.trader.borrow_mut().initialize() {
435 log::error!("Error initializing trader: {e:?}");
436 return;
437 }
438
439 log::info!("Starting clients...");
440
441 if let Err(e) = self.start_clients() {
442 log::error!("Error starting clients: {e:?}");
443 }
444 log::info!("Clients started");
445
446 self.ts_started = Some(self.clock.borrow().timestamp_ns());
447 log::info!("Started");
448 }
449
/// Async counterpart of [`Self::start`].
///
/// It simply calls `start` and contains no await points of its own.
pub async fn start_async(&mut self) {
    self.start();
}
454
455 pub fn start_trader(&mut self) {
459 log::info!("Starting trader...");
460 if let Err(e) = self.trader.borrow_mut().start() {
461 log::error!("Error starting trader: {e:?}");
462 }
463 log::info!("Trader started");
464 }
465
466 pub fn stop_trader(&mut self) {
472 if !self.trader.borrow().is_running() {
473 return;
474 }
475
476 log::info!("Stopping trader...");
477
478 if let Err(e) = self.trader.borrow_mut().stop() {
479 log::error!("Error stopping trader: {e}");
480 }
481 }
482
483 pub async fn finalize_stop(&mut self) {
488 if let Err(e) = self.stop_all_clients() {
490 log::error!("Error stopping clients: {e:?}");
491 }
492
493 self.stop_engines();
494 self.cancel_timers();
495
496 self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
497 log::info!("Stopped");
498 }
499
500 pub fn reset(&mut self) {
502 log::info!("Resetting");
503
504 if let Err(e) = self.trader.borrow_mut().reset() {
505 log::error!("Error resetting trader: {e:?}");
506 }
507
508 self.data_engine.borrow_mut().reset();
509 self.exec_engine.borrow_mut().reset();
510 self.risk_engine.borrow_mut().reset();
511
512 self.ts_started = None;
513 self.ts_shutdown = None;
514
515 log::info!("Reset");
516 }
517
/// Disposes the kernel: the trader, the engines, the cache, and the global
/// message bus, in that order.
///
/// A trader dispose failure is logged and teardown continues.
pub fn dispose(&mut self) {
    log::info!("Disposing");

    if let Err(e) = self.trader.borrow_mut().dispose() {
        log::error!("Error disposing trader: {e:?}");
    }

    // Engines are stopped before they are disposed
    self.stop_engines();

    self.data_engine.borrow_mut().dispose();
    self.exec_engine.borrow_mut().dispose();
    self.risk_engine.borrow_mut().dispose();
    self.cache.borrow_mut().dispose();
    // The message bus is global state rather than a kernel field
    get_message_bus().borrow_mut().dispose();

    log::info!("Disposed");
}
536
/// Starts the data, execution, and risk engines, in that order.
fn start_engines(&self) {
    self.data_engine.borrow_mut().start();
    self.exec_engine.borrow_mut().start();
    self.risk_engine.borrow_mut().start();
}
543
/// Stops the data, execution, and risk engines, in that order.
fn stop_engines(&self) {
    self.data_engine.borrow_mut().stop();
    self.exec_engine.borrow_mut().stop();
    self.risk_engine.borrow_mut().stop();
}
550
551 fn start_clients(&self) -> Result<(), Vec<anyhow::Error>> {
556 let mut errors = Vec::new();
557
558 {
559 let mut exec_engine = self.exec_engine.borrow_mut();
560 let exec_adapters = exec_engine.get_clients_mut();
561
562 for adapter in exec_adapters {
563 if let Err(e) = adapter.start() {
564 log::error!("Error starting execution client {}: {e}", adapter.client_id);
565 errors.push(e);
566 }
567 }
568 }
569
570 if errors.is_empty() {
571 Ok(())
572 } else {
573 Err(errors)
574 }
575 }
576
577 fn stop_all_clients(&self) -> Result<(), Vec<anyhow::Error>> {
582 let mut errors = Vec::new();
583
584 {
585 let mut exec_engine = self.exec_engine.borrow_mut();
586 let exec_adapters = exec_engine.get_clients_mut();
587
588 for adapter in exec_adapters {
589 if let Err(e) = adapter.stop() {
590 log::error!("Error stopping execution client {}: {e}", adapter.client_id);
591 errors.push(e);
592 }
593 }
594 }
595
596 if errors.is_empty() {
597 Ok(())
598 } else {
599 Err(errors)
600 }
601 }
602
/// Connects the data engine's clients.
///
/// The mutable borrow of the data engine is held across the await (hence the
/// lint expectation), so the data engine must not be borrowed elsewhere
/// until the returned future completes.
#[expect(clippy::await_holding_refcell_ref)]
pub async fn connect_data_clients(&mut self) {
    log::info!("Connecting data clients...");
    self.data_engine.borrow_mut().connect().await;
}
612
/// Connects the execution engine's clients.
///
/// The mutable borrow of the execution engine is held across the await
/// (hence the lint expectation), so the execution engine must not be
/// borrowed elsewhere until the returned future completes.
#[expect(clippy::await_holding_refcell_ref)]
pub async fn connect_exec_clients(&mut self) {
    log::info!("Connecting execution clients...");
    self.exec_engine.borrow_mut().connect().await;
}
622
623 #[expect(clippy::await_holding_refcell_ref)] pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
630 log::info!("Disconnecting clients...");
631 self.data_engine.borrow_mut().disconnect().await?;
632 self.exec_engine.borrow_mut().disconnect().await?;
633 Ok(())
634 }
635
636 #[must_use]
638 pub fn check_engines_connected(&self) -> bool {
639 self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
640 }
641
642 #[must_use]
644 pub fn check_engines_disconnected(&self) -> bool {
645 self.data_engine.borrow().check_disconnected()
646 && self.exec_engine.borrow().check_disconnected()
647 }
648
/// Returns the data engine's client connection status as `(ClientId, bool)` pairs.
/// NOTE(review): the flag presumably means "connected"; confirm against `DataEngine`.
#[must_use]
pub fn data_client_connection_status(&self) -> Vec<(ClientId, bool)> {
    self.data_engine.borrow().client_connection_status()
}
654
/// Returns the execution engine's client connection status as `(ClientId, bool)` pairs.
/// NOTE(review): the flag presumably means "connected"; confirm against `ExecutionEngine`.
#[must_use]
pub fn exec_client_connection_status(&self) -> Vec<(ClientId, bool)> {
    self.exec_engine.borrow().client_connection_status()
}
660}
661
#[cfg(all(test, feature = "python"))]
mod tests {
    use nautilus_common::messages::system::ShutdownSystem;
    use nautilus_core::UUID4;
    use rstest::*;
    use ustr::Ustr;

    use super::*;
    use crate::builder::NautilusKernelBuilder;

    /// Builds a `ShutdownSystem` command addressed to `trader_id`, timestamped
    /// by the kernel clock.
    fn shutdown_command(
        kernel: &NautilusKernel,
        trader_id: TraderId,
        reason: Option<String>,
    ) -> ShutdownSystem {
        ShutdownSystem::new(
            trader_id,
            Ustr::from("TestComponent"),
            reason,
            UUID4::new(),
            kernel.generate_timestamp_ns(),
        )
    }

    /// Publishes `command` on the shutdown-system topic.
    fn publish(command: ShutdownSystem) {
        msgbus::publish_any(
            MessagingSwitchboard::shutdown_system_topic(),
            command.as_any(),
        );
    }

    #[rstest]
    fn test_shutdown_system_sets_kernel_flag() {
        let kernel = NautilusKernelBuilder::default().build().unwrap();
        assert!(!kernel.is_shutdown_requested());

        let reason = Some("unit test".to_string());
        publish(shutdown_command(&kernel, kernel.trader_id(), reason));
        assert!(kernel.is_shutdown_requested());

        kernel.reset_shutdown_flag();
        assert!(!kernel.is_shutdown_requested());
    }

    #[rstest]
    fn test_shutdown_system_idempotent() {
        let kernel = NautilusKernelBuilder::default().build().unwrap();

        // First command raises the flag
        publish(shutdown_command(&kernel, kernel.trader_id(), None));
        assert!(kernel.is_shutdown_requested());

        // A repeated command leaves it raised
        publish(shutdown_command(&kernel, kernel.trader_id(), None));
        assert!(kernel.is_shutdown_requested());

        kernel.reset_shutdown_flag();
        assert!(!kernel.is_shutdown_requested());

        // After a reset the flag can be raised again
        publish(shutdown_command(&kernel, kernel.trader_id(), None));
        assert!(kernel.is_shutdown_requested());
    }

    #[rstest]
    fn test_shutdown_system_ignores_other_trader() {
        let kernel = NautilusKernelBuilder::default().build().unwrap();

        let foreign_trader = TraderId::from("OTHER-TRADER");
        publish(shutdown_command(&kernel, foreign_trader, None));
        assert!(!kernel.is_shutdown_requested());
    }
}