Skip to main content

nautilus_system/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::{Cell, Ref, RefCell},
18    rc::Rc,
19    time::Duration,
20};
21
22use nautilus_common::{
23    cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
24    clock::{Clock, TestClock},
25    component::Component,
26    enums::Environment,
27    logging::{
28        headers, init_logging,
29        logger::{LogGuard, LoggerConfig},
30        writer::FileWriterConfig,
31    },
32    messages::system::ShutdownSystem,
33    msgbus::{
34        self, MessageBus, MessagingSwitchboard, ShareableMessageHandler, get_message_bus,
35        set_message_bus,
36    },
37};
38use nautilus_core::{UUID4, UnixNanos};
39use nautilus_data::engine::DataEngine;
40use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
41use nautilus_model::identifiers::{ClientId, TraderId};
42use nautilus_portfolio::portfolio::Portfolio;
43use nautilus_risk::engine::RiskEngine;
44use ustr::Ustr;
45
46use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
47
48/// Core Nautilus system kernel.
49///
50/// Orchestrates data and execution engines, cache, clock, and messaging across environments.
51#[derive(Debug)]
52pub struct NautilusKernel {
53    /// The kernel name (for logging and identification).
54    pub name: String,
55    /// The unique instance identifier for this kernel.
56    pub instance_id: UUID4,
57    /// The machine identifier (hostname or similar).
58    pub machine_id: String,
59    /// The kernel configuration.
60    pub config: Box<dyn NautilusKernelConfig>,
61    /// The shared in-memory cache.
62    pub cache: Rc<RefCell<Cache>>,
63    /// The clock driving the kernel.
64    pub clock: Rc<RefCell<dyn Clock>>,
65    /// The portfolio manager.
66    pub portfolio: Rc<RefCell<Portfolio>>,
67    /// Guard for the logging subsystem (keeps logger thread alive).
68    pub log_guard: LogGuard,
69    /// The data engine instance.
70    pub data_engine: Rc<RefCell<DataEngine>>,
71    /// The risk engine instance.
72    pub risk_engine: Rc<RefCell<RiskEngine>>,
73    /// The execution engine instance.
74    pub exec_engine: Rc<RefCell<ExecutionEngine>>,
75    /// The order emulator for handling emulated orders.
76    pub order_emulator: OrderEmulatorAdapter,
77    /// The trader component (shared for [`Controller`](crate::controller::Controller) access).
78    pub trader: Rc<RefCell<Trader>>,
79    /// The UNIX timestamp (nanoseconds) when the kernel was created.
80    pub ts_created: UnixNanos,
81    /// The UNIX timestamp (nanoseconds) when the kernel was last started.
82    pub ts_started: Option<UnixNanos>,
83    /// The UNIX timestamp (nanoseconds) when the kernel was last shutdown.
84    pub ts_shutdown: Option<UnixNanos>,
85    shutdown_requested: Rc<Cell<bool>>,
86}
87
88impl NautilusKernel {
89    /// Create a new [`NautilusKernelBuilder`] for fluent configuration.
90    #[must_use]
91    pub const fn builder(
92        name: String,
93        trader_id: TraderId,
94        environment: Environment,
95    ) -> NautilusKernelBuilder {
96        NautilusKernelBuilder::new(name, trader_id, environment)
97    }
98
99    /// Create a new [`NautilusKernel`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the kernel fails to initialize.
104    pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
105        let instance_id = config.instance_id().unwrap_or_default();
106        let machine_id = Self::determine_machine_id()?;
107
108        let logger_config = config.logging();
109        let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logger_config)?;
110        headers::log_header(
111            config.trader_id(),
112            &machine_id,
113            instance_id,
114            Ustr::from(&name),
115        );
116
117        log::info!("Building system kernel");
118
119        let clock = Self::initialize_clock(&config.environment());
120        let cache = Self::initialize_cache(config.cache());
121
122        let msgbus = Rc::new(RefCell::new(MessageBus::new(
123            config.trader_id(),
124            instance_id,
125            Some(name.clone()),
126            None,
127        )));
128        set_message_bus(msgbus);
129
130        let portfolio = Rc::new(RefCell::new(Portfolio::new(
131            cache.clone(),
132            clock.clone(),
133            config.portfolio(),
134        )));
135
136        let risk_engine = RiskEngine::new(
137            config.risk_engine().unwrap_or_default(),
138            portfolio.borrow().clone_shallow(),
139            clock.clone(),
140            cache.clone(),
141        );
142        let risk_engine = Rc::new(RefCell::new(risk_engine));
143
144        let exec_engine = ExecutionEngine::new(clock.clone(), cache.clone(), config.exec_engine());
145        let exec_engine = Rc::new(RefCell::new(exec_engine));
146
147        let order_emulator =
148            OrderEmulatorAdapter::new(config.trader_id(), clock.clone(), cache.clone());
149
150        let data_engine = DataEngine::new(clock.clone(), cache.clone(), config.data_engine());
151        let data_engine = Rc::new(RefCell::new(data_engine));
152
153        DataEngine::register_msgbus_handlers(&data_engine);
154        RiskEngine::register_msgbus_handlers(&risk_engine);
155        ExecutionEngine::register_msgbus_handlers(&exec_engine);
156
157        let shutdown_requested = Rc::new(Cell::new(false));
158        Self::register_shutdown_handler(config.trader_id(), shutdown_requested.clone());
159
160        let trader = Rc::new(RefCell::new(Trader::new(
161            config.trader_id(),
162            instance_id,
163            config.environment(),
164            clock.clone(),
165            cache.clone(),
166            portfolio.clone(),
167        )));
168
169        let ts_created = clock.borrow().timestamp_ns();
170
171        Ok(Self {
172            name,
173            instance_id,
174            machine_id,
175            config: Box::new(config),
176            cache,
177            clock,
178            portfolio,
179            log_guard,
180            data_engine,
181            risk_engine,
182            exec_engine,
183            order_emulator,
184            trader,
185            ts_created,
186            ts_started: None,
187            ts_shutdown: None,
188            shutdown_requested,
189        })
190    }
191
192    fn register_shutdown_handler(trader_id: TraderId, shutdown_requested: Rc<Cell<bool>>) {
193        let handler = ShareableMessageHandler::from_typed(move |cmd: &ShutdownSystem| {
194            if cmd.trader_id != trader_id {
195                log::warn!("Received {cmd} not for this trader {trader_id}, ignoring",);
196                return;
197            }
198
199            if shutdown_requested.get() {
200                log::debug!("Shutdown already requested, ignoring {cmd}");
201                return;
202            }
203
204            log::info!("Received {cmd}, requesting shutdown");
205            shutdown_requested.set(true);
206        });
207        let topic = MessagingSwitchboard::shutdown_system_topic();
208        msgbus::subscribe_any(topic.into(), handler, None);
209    }
210
211    fn determine_machine_id() -> anyhow::Result<String> {
212        sysinfo::System::host_name().ok_or_else(|| anyhow::anyhow!("Failed to determine hostname"))
213    }
214
215    fn initialize_logging(
216        trader_id: TraderId,
217        instance_id: UUID4,
218        config: LoggerConfig,
219    ) -> anyhow::Result<LogGuard> {
220        #[cfg(feature = "tracing-bridge")]
221        let use_tracing = config.use_tracing;
222
223        let log_guard = match init_logging(
224            trader_id,
225            instance_id,
226            config,
227            FileWriterConfig::default(), // TODO: Properly incorporate file writer config
228        ) {
229            Ok(guard) => guard,
230            Err(e) => {
231                // Only recover from SetLoggerError (logger already registered).
232                // This is common in tests where multiple kernels are created and
233                // the log crate's global logger persists after LogGuard teardown.
234                // Any other error (e.g. thread spawn failure) is propagated.
235                if e.downcast_ref::<log::SetLoggerError>().is_some() {
236                    if let Some(guard) = LogGuard::new() {
237                        guard
238                    } else {
239                        return Err(e.context(
240                            "A non-Nautilus logger is already registered; \
241                             cannot initialize Nautilus logging",
242                        ));
243                    }
244                } else {
245                    return Err(e);
246                }
247            }
248        };
249
250        // Initialize tracing subscriber if enabled (idempotent)
251        #[cfg(feature = "tracing-bridge")]
252        if use_tracing && !nautilus_common::logging::bridge::tracing_is_initialized() {
253            nautilus_common::logging::bridge::init_tracing()?;
254        }
255
256        Ok(log_guard)
257    }
258
259    fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
260        match environment {
261            Environment::Backtest => {
262                let test_clock = TestClock::new();
263                Rc::new(RefCell::new(test_clock))
264            }
265            #[cfg(feature = "live")]
266            Environment::Live | Environment::Sandbox => {
267                let live_clock = nautilus_common::live::clock::LiveClock::default(); // nautilus-import-ok
268                Rc::new(RefCell::new(live_clock))
269            }
270            #[cfg(not(feature = "live"))]
271            Environment::Live | Environment::Sandbox => {
272                panic!(
273                    "Live/Sandbox environment requires the 'live' feature to be enabled. \
274                     Build with `--features live` or add `features = [\"live\"]` to your dependency."
275                );
276            }
277        }
278    }
279
280    fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
281        let cache_config = cache_config.unwrap_or_default();
282
283        // TODO: Placeholder: persistent database adapter can be initialized here (e.g., Redis)
284        let cache_database: Option<Box<dyn CacheDatabaseAdapter>> = None;
285        let cache = Cache::new(Some(cache_config), cache_database);
286
287        Rc::new(RefCell::new(cache))
288    }
289
290    fn cancel_timers(&self) {
291        self.clock.borrow_mut().cancel_timers();
292    }
293
294    #[must_use]
295    pub fn generate_timestamp_ns(&self) -> UnixNanos {
296        self.clock.borrow().timestamp_ns()
297    }
298
299    /// Returns the kernel's environment context (Backtest, Sandbox, Live).
300    #[must_use]
301    pub fn environment(&self) -> Environment {
302        self.config.environment()
303    }
304
305    /// Returns the kernel's name.
306    #[must_use]
307    pub const fn name(&self) -> &str {
308        self.name.as_str()
309    }
310
311    /// Returns the kernel's trader ID.
312    #[must_use]
313    pub fn trader_id(&self) -> TraderId {
314        self.config.trader_id()
315    }
316
317    /// Returns the kernel's machine ID.
318    #[must_use]
319    pub fn machine_id(&self) -> &str {
320        &self.machine_id
321    }
322
323    /// Returns the kernel's instance ID.
324    #[must_use]
325    pub const fn instance_id(&self) -> UUID4 {
326        self.instance_id
327    }
328
329    /// Returns the delay after stopping the node to await residual events before final shutdown.
330    #[must_use]
331    pub fn delay_post_stop(&self) -> Duration {
332        self.config.delay_post_stop()
333    }
334
335    /// Returns the UNIX timestamp (ns) when the kernel was created.
336    #[must_use]
337    pub const fn ts_created(&self) -> UnixNanos {
338        self.ts_created
339    }
340
341    /// Returns the UNIX timestamp (ns) when the kernel was last started.
342    #[must_use]
343    pub const fn ts_started(&self) -> Option<UnixNanos> {
344        self.ts_started
345    }
346
347    /// Returns the UNIX timestamp (ns) when the kernel was last shutdown.
348    #[must_use]
349    pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
350        self.ts_shutdown
351    }
352
353    /// Returns `true` if a `ShutdownSystem` command has been received.
354    #[must_use]
355    pub fn is_shutdown_requested(&self) -> bool {
356        self.shutdown_requested.get()
357    }
358
359    /// Clears the shutdown flag.
360    ///
361    /// Call this before starting a fresh run so a prior `ShutdownSystem`
362    /// command does not abort it.
363    pub fn reset_shutdown_flag(&self) {
364        self.shutdown_requested.set(false);
365    }
366
367    /// Returns a shared handle to the shutdown flag for async runtimes
368    /// that need to poll it outside the kernel's direct borrow.
369    #[must_use]
370    pub fn shutdown_flag(&self) -> Rc<Cell<bool>> {
371        self.shutdown_requested.clone()
372    }
373
374    /// Returns whether the kernel has been configured to load state.
375    #[must_use]
376    pub fn load_state(&self) -> bool {
377        self.config.load_state()
378    }
379
380    /// Returns whether the kernel has been configured to save state.
381    #[must_use]
382    pub fn save_state(&self) -> bool {
383        self.config.save_state()
384    }
385
386    /// Returns the kernel's clock.
387    #[must_use]
388    pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
389        self.clock.clone()
390    }
391
392    /// Returns the kernel's cache.
393    #[must_use]
394    pub fn cache(&self) -> Rc<RefCell<Cache>> {
395        self.cache.clone()
396    }
397
398    /// Returns the kernel's portfolio.
399    #[must_use]
400    pub fn portfolio(&self) -> Ref<'_, Portfolio> {
401        self.portfolio.borrow()
402    }
403
404    /// Returns the kernel's data engine.
405    #[must_use]
406    pub fn data_engine(&self) -> Ref<'_, DataEngine> {
407        self.data_engine.borrow()
408    }
409
410    /// Returns the kernel's risk engine.
411    #[must_use]
412    pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
413        &self.risk_engine
414    }
415
416    /// Returns the kernel's execution engine.
417    #[must_use]
418    pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
419        &self.exec_engine
420    }
421
422    /// Returns the kernel's trader (shared reference).
423    #[must_use]
424    pub fn trader(&self) -> &Rc<RefCell<Trader>> {
425        &self.trader
426    }
427
428    /// Starts the Nautilus system kernel synchronously (for backtest use).
429    pub fn start(&mut self) {
430        log::info!("Starting");
431        self.start_engines();
432
433        log::info!("Initializing trader");
434        if let Err(e) = self.trader.borrow_mut().initialize() {
435            log::error!("Error initializing trader: {e:?}");
436            return;
437        }
438
439        log::info!("Starting clients...");
440
441        if let Err(e) = self.start_clients() {
442            log::error!("Error starting clients: {e:?}");
443        }
444        log::info!("Clients started");
445
446        self.ts_started = Some(self.clock.borrow().timestamp_ns());
447        log::info!("Started");
448    }
449
450    /// Starts the Nautilus system kernel asynchronously.
451    pub async fn start_async(&mut self) {
452        self.start();
453    }
454
455    /// Starts the trader (strategies and actors).
456    ///
457    /// This should be called after clients are connected and instruments are cached.
458    pub fn start_trader(&mut self) {
459        log::info!("Starting trader...");
460        if let Err(e) = self.trader.borrow_mut().start() {
461            log::error!("Error starting trader: {e:?}");
462        }
463        log::info!("Trader started");
464    }
465
466    /// Stops the trader and its registered components.
467    ///
468    /// This method initiates a graceful shutdown of trading components (strategies, actors)
469    /// which may trigger residual events such as order cancellations. The caller should
470    /// continue processing events after calling this method to handle these residual events.
471    pub fn stop_trader(&mut self) {
472        if !self.trader.borrow().is_running() {
473            return;
474        }
475
476        log::info!("Stopping trader...");
477
478        if let Err(e) = self.trader.borrow_mut().stop() {
479            log::error!("Error stopping trader: {e}");
480        }
481    }
482
483    /// Finalizes the kernel shutdown after the grace period.
484    ///
485    /// This method should be called after the residual events grace period has elapsed
486    /// and all remaining events have been processed. It disconnects clients and stops engines.
487    pub async fn finalize_stop(&mut self) {
488        // Stop all adapter clients
489        if let Err(e) = self.stop_all_clients() {
490            log::error!("Error stopping clients: {e:?}");
491        }
492
493        self.stop_engines();
494        self.cancel_timers();
495
496        self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
497        log::info!("Stopped");
498    }
499
500    /// Resets the Nautilus system kernel to its initial state.
501    pub fn reset(&mut self) {
502        log::info!("Resetting");
503
504        if let Err(e) = self.trader.borrow_mut().reset() {
505            log::error!("Error resetting trader: {e:?}");
506        }
507
508        self.data_engine.borrow_mut().reset();
509        self.exec_engine.borrow_mut().reset();
510        self.risk_engine.borrow_mut().reset();
511
512        self.ts_started = None;
513        self.ts_shutdown = None;
514
515        log::info!("Reset");
516    }
517
518    /// Disposes of the Nautilus system kernel, releasing resources.
519    pub fn dispose(&mut self) {
520        log::info!("Disposing");
521
522        if let Err(e) = self.trader.borrow_mut().dispose() {
523            log::error!("Error disposing trader: {e:?}");
524        }
525
526        self.stop_engines();
527
528        self.data_engine.borrow_mut().dispose();
529        self.exec_engine.borrow_mut().dispose();
530        self.risk_engine.borrow_mut().dispose();
531        self.cache.borrow_mut().dispose();
532        get_message_bus().borrow_mut().dispose();
533
534        log::info!("Disposed");
535    }
536
537    /// Starts all engine components.
538    fn start_engines(&self) {
539        self.data_engine.borrow_mut().start();
540        self.exec_engine.borrow_mut().start();
541        self.risk_engine.borrow_mut().start();
542    }
543
544    /// Stops all engine components.
545    fn stop_engines(&self) {
546        self.data_engine.borrow_mut().stop();
547        self.exec_engine.borrow_mut().stop();
548        self.risk_engine.borrow_mut().stop();
549    }
550
551    /// Starts all engine clients.
552    ///
553    /// Note: Async connection (connect/disconnect) is handled by LiveNode for live clients.
554    /// This method only handles synchronous start operations on execution clients.
555    fn start_clients(&self) -> Result<(), Vec<anyhow::Error>> {
556        let mut errors = Vec::new();
557
558        {
559            let mut exec_engine = self.exec_engine.borrow_mut();
560            let exec_adapters = exec_engine.get_clients_mut();
561
562            for adapter in exec_adapters {
563                if let Err(e) = adapter.start() {
564                    log::error!("Error starting execution client {}: {e}", adapter.client_id);
565                    errors.push(e);
566                }
567            }
568        }
569
570        if errors.is_empty() {
571            Ok(())
572        } else {
573            Err(errors)
574        }
575    }
576
577    /// Stops all engine clients.
578    ///
579    /// Note: Async disconnection is handled by LiveNode for live clients.
580    /// This method only handles synchronous stop operations on execution clients.
581    fn stop_all_clients(&self) -> Result<(), Vec<anyhow::Error>> {
582        let mut errors = Vec::new();
583
584        {
585            let mut exec_engine = self.exec_engine.borrow_mut();
586            let exec_adapters = exec_engine.get_clients_mut();
587
588            for adapter in exec_adapters {
589                if let Err(e) = adapter.stop() {
590                    log::error!("Error stopping execution client {}: {e}", adapter.client_id);
591                    errors.push(e);
592                }
593            }
594        }
595
596        if errors.is_empty() {
597            Ok(())
598        } else {
599            Err(errors)
600        }
601    }
602
603    /// Connects data engine clients.
604    ///
605    /// Data clients are connected first so that instruments are published
606    /// and can be drained into the cache before execution clients connect.
607    #[expect(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
608    pub async fn connect_data_clients(&mut self) {
609        log::info!("Connecting data clients...");
610        self.data_engine.borrow_mut().connect().await;
611    }
612
613    /// Connects execution engine clients.
614    ///
615    /// Must be called after data clients are connected and instrument events
616    /// have been drained into the cache, so execution clients can load instruments.
617    #[expect(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
618    pub async fn connect_exec_clients(&mut self) {
619        log::info!("Connecting execution clients...");
620        self.exec_engine.borrow_mut().connect().await;
621    }
622
623    /// Disconnects all engine clients.
624    ///
625    /// # Errors
626    ///
627    /// Returns an error if any client fails to disconnect.
628    #[expect(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
629    pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
630        log::info!("Disconnecting clients...");
631        self.data_engine.borrow_mut().disconnect().await?;
632        self.exec_engine.borrow_mut().disconnect().await?;
633        Ok(())
634    }
635
636    /// Returns `true` if all engine clients are connected.
637    #[must_use]
638    pub fn check_engines_connected(&self) -> bool {
639        self.data_engine.borrow().check_connected() && self.exec_engine.borrow().check_connected()
640    }
641
642    /// Returns `true` if all engine clients are disconnected.
643    #[must_use]
644    pub fn check_engines_disconnected(&self) -> bool {
645        self.data_engine.borrow().check_disconnected()
646            && self.exec_engine.borrow().check_disconnected()
647    }
648
649    /// Returns connection status for all data clients.
650    #[must_use]
651    pub fn data_client_connection_status(&self) -> Vec<(ClientId, bool)> {
652        self.data_engine.borrow().client_connection_status()
653    }
654
655    /// Returns connection status for all execution clients.
656    #[must_use]
657    pub fn exec_client_connection_status(&self) -> Vec<(ClientId, bool)> {
658        self.exec_engine.borrow().client_connection_status()
659    }
660}
661
662#[cfg(all(test, feature = "python"))]
663mod tests {
664    use nautilus_common::messages::system::ShutdownSystem;
665    use nautilus_core::UUID4;
666    use rstest::*;
667    use ustr::Ustr;
668
669    use super::*;
670    use crate::builder::NautilusKernelBuilder;
671
672    #[rstest]
673    fn test_shutdown_system_sets_kernel_flag() {
674        let kernel = NautilusKernelBuilder::default().build().unwrap();
675        assert!(!kernel.is_shutdown_requested());
676
677        let command = ShutdownSystem::new(
678            kernel.trader_id(),
679            Ustr::from("TestComponent"),
680            Some("unit test".to_string()),
681            UUID4::new(),
682            kernel.generate_timestamp_ns(),
683        );
684
685        msgbus::publish_any(
686            MessagingSwitchboard::shutdown_system_topic(),
687            command.as_any(),
688        );
689        assert!(kernel.is_shutdown_requested());
690
691        kernel.reset_shutdown_flag();
692        assert!(!kernel.is_shutdown_requested());
693    }
694
695    #[rstest]
696    fn test_shutdown_system_idempotent() {
697        let kernel = NautilusKernelBuilder::default().build().unwrap();
698
699        let make_cmd = || {
700            ShutdownSystem::new(
701                kernel.trader_id(),
702                Ustr::from("TestComponent"),
703                None,
704                UUID4::new(),
705                kernel.generate_timestamp_ns(),
706            )
707        };
708
709        let topic = MessagingSwitchboard::shutdown_system_topic();
710        msgbus::publish_any(topic, make_cmd().as_any());
711        assert!(kernel.is_shutdown_requested());
712
713        msgbus::publish_any(topic, make_cmd().as_any());
714        assert!(kernel.is_shutdown_requested());
715
716        kernel.reset_shutdown_flag();
717        assert!(!kernel.is_shutdown_requested());
718
719        msgbus::publish_any(topic, make_cmd().as_any());
720        assert!(kernel.is_shutdown_requested());
721    }
722
723    #[rstest]
724    fn test_shutdown_system_ignores_other_trader() {
725        let kernel = NautilusKernelBuilder::default().build().unwrap();
726
727        let command = ShutdownSystem::new(
728            TraderId::from("OTHER-TRADER"),
729            Ustr::from("TestComponent"),
730            None,
731            UUID4::new(),
732            kernel.generate_timestamp_ns(),
733        );
734
735        msgbus::publish_any(
736            MessagingSwitchboard::shutdown_system_topic(),
737            command.as_any(),
738        );
739        assert!(!kernel.is_shutdown_requested());
740    }
741}