// nautilus_live/node.rs
1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live trading node built on a single-threaded tokio event loop.
17//!
18//! `LiveNode::run()` drives the system through a `tokio::select!` loop that
19//! multiplexes data events, execution events, trading commands, timers, and
20//! periodic maintenance tasks (reconciliation, purge, prune, audit).
21//!
22//! # Threading model
23//!
24//! The core types (`ExecutionManager`, `ExecutionEngine`, `Cache`) use
25//! `Rc<RefCell<..>>` and are `!Send`. All access happens on the same thread.
26//! The `select!` macro runs one branch to completion (including inner awaits)
27//! before polling the next, so `RefCell` borrows held across `.await` points
28//! within a single branch cannot conflict with borrows in other branches.
29//!
30//! # Startup sequencing
31//!
32//! Startup connects clients in two phases so that instruments are in the
33//! cache before execution clients read them:
34//!
35//! 1. Connect data clients (instruments arrive as buffered `DataEvent`s).
36//! 2. Flush all pending data events and commands into the cache via
37//!    `flush_pending_data`, which loops `try_recv` on the channel receivers
38//!    until no items remain.
39//! 3. Connect execution clients (`load_instruments_from_cache` now finds
40//!    populated instruments).
41//! 4. Drain remaining events, then run reconciliation.
42//!
43//! Both `run()` (integrated event loop) and `start()` (manual lifecycle)
44//! follow this sequence.
45//!
46//! # Reconciliation
47//!
48//! Three sub-checks run on independent intervals: inflight orders, open order
49//! consistency, and position consistency. A single reconciliation timer fires
50//! at the minimum enabled interval. Each tick, the handler checks which
51//! sub-checks are due based on elapsed nanoseconds and runs them in sequence.
52//! The open order and position checks query venues via async HTTP calls,
53//! blocking the select loop for the duration of each query.
54
55use std::{
56    fmt::Debug,
57    sync::{
58        Arc,
59        atomic::{AtomicBool, AtomicU8, Ordering},
60    },
61    time::Duration,
62};
63
64use nautilus_common::{
65    actor::{Actor, DataActor},
66    cache::database::CacheDatabaseAdapter,
67    component::Component,
68    enums::{Environment, LogColor},
69    live::dst,
70    log_info,
71    messages::{
72        DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
73    },
74    timer::TimeEventHandler,
75};
76use nautilus_core::{
77    UUID4, UnixNanos,
78    datetime::{NANOSECONDS_IN_MILLISECOND, mins_to_secs, secs_to_nanos_unchecked},
79};
80use nautilus_model::{
81    events::OrderEventAny,
82    identifiers::{ClientOrderId, StrategyId, TraderId},
83    orders::Order,
84};
85use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
86use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
87use tabled::{Table, Tabled, settings::Style};
88
89use crate::{
90    builder::LiveNodeBuilder,
91    config::LiveNodeConfig,
92    manager::{ExecutionManager, ExecutionManagerConfig},
93    runner::{AsyncRunner, AsyncRunnerChannels},
94};
95
/// Lifecycle state of the `LiveNode` runner.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    #[default]
    Idle = 0,
    Starting = 1,
    Running = 2,
    ShuttingDown = 3,
    Stopped = 4,
}

impl NodeState {
    /// Converts a raw `u8` discriminant back into a `NodeState`.
    ///
    /// # Panics
    ///
    /// Panics if `value` is not a valid `NodeState` discriminant (0-4).
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        match value {
            0 => Self::Idle,
            1 => Self::Starting,
            2 => Self::Running,
            3 => Self::ShuttingDown,
            4 => Self::Stopped,
            _ => panic!("Invalid NodeState value"),
        }
    }

    /// Returns the raw `u8` discriminant of this state.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` when the state is `Running`.
    #[must_use]
    pub const fn is_running(&self) -> bool {
        (*self).as_u8() == Self::Running as u8
    }
}
138
/// A thread-safe handle to control a `LiveNode` from other threads.
///
/// This allows stopping and querying the node's state without requiring the
/// node itself to be Send + Sync.
///
/// Both fields are shared atomics, so every clone of a handle observes the
/// same underlying stop flag and state.
#[derive(Clone, Debug)]
pub struct LiveNodeHandle {
    /// Atomic flag indicating if the node should stop.
    pub(crate) stop_flag: Arc<AtomicBool>,
    /// Atomic state as `NodeState::as_u8()`.
    pub(crate) state: Arc<AtomicU8>,
}
150
151impl Default for LiveNodeHandle {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157impl LiveNodeHandle {
158    /// Creates a new handle with default (`Idle`) state.
159    #[must_use]
160    pub fn new() -> Self {
161        Self {
162            stop_flag: Arc::new(AtomicBool::new(false)),
163            state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
164        }
165    }
166
167    /// Sets the node state (internal use).
168    pub(crate) fn set_state(&self, state: NodeState) {
169        self.state.store(state.as_u8(), Ordering::Relaxed);
170        if state == NodeState::Running {
171            // Clear stop flag when entering running state
172            self.stop_flag.store(false, Ordering::Relaxed);
173        }
174    }
175
176    /// Returns the current node state.
177    #[must_use]
178    pub fn state(&self) -> NodeState {
179        NodeState::from_u8(self.state.load(Ordering::Relaxed))
180    }
181
182    /// Returns whether the node should stop.
183    #[must_use]
184    pub fn should_stop(&self) -> bool {
185        self.stop_flag.load(Ordering::Relaxed)
186    }
187
188    /// Returns whether the node is currently running.
189    #[must_use]
190    pub fn is_running(&self) -> bool {
191        self.state().is_running()
192    }
193
194    /// Signals the node to stop.
195    pub fn stop(&self) {
196        self.stop_flag.store(true, Ordering::Relaxed);
197    }
198}
199
/// High-level abstraction for a live Nautilus system node.
///
/// Provides a simplified interface for running live systems
/// with automatic client management and lifecycle handling.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
pub struct LiveNode {
    /// The underlying system kernel (engines, cache, clock, portfolio).
    kernel: NautilusKernel,
    /// Event-loop runner; `None` once consumed by `run()`.
    runner: Option<AsyncRunner>,
    /// Node configuration captured at build time.
    config: LiveNodeConfig,
    /// Thread-safe control handle shared with external callers.
    handle: LiveNodeHandle,
    /// Manager driving reconciliation and purge/prune maintenance.
    exec_manager: ExecutionManager,
    /// Deadline for draining residual events once shutdown is initiated;
    /// `None` while not shutting down.
    shutdown_deadline: Option<dst::time::Instant>,
    #[cfg(feature = "python")]
    #[allow(dead_code)] // TODO: Under development
    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
224
225impl LiveNode {
226    /// Creates a new `LiveNode` from builder components.
227    ///
228    /// This is an internal constructor used by `LiveNodeBuilder`.
229    #[must_use]
230    pub(crate) fn new_from_builder(
231        kernel: NautilusKernel,
232        runner: AsyncRunner,
233        config: LiveNodeConfig,
234        exec_manager: ExecutionManager,
235    ) -> Self {
236        Self {
237            kernel,
238            runner: Some(runner),
239            config,
240            handle: LiveNodeHandle::new(),
241            exec_manager,
242            shutdown_deadline: None,
243            #[cfg(feature = "python")]
244            python_actors: Vec::new(),
245        }
246    }
247
248    /// Creates a new [`LiveNodeBuilder`] for fluent configuration.
249    ///
250    /// # Errors
251    ///
252    /// Returns an error if the environment is invalid for live trading.
253    pub fn builder(
254        trader_id: TraderId,
255        environment: Environment,
256    ) -> anyhow::Result<LiveNodeBuilder> {
257        LiveNodeBuilder::new(trader_id, environment)
258    }
259
    /// Creates a new [`LiveNode`] directly from a kernel name and optional configuration.
    ///
    /// This is a convenience method for creating a live node with a pre-configured
    /// kernel configuration, bypassing the builder pattern. If no config is provided,
    /// a default configuration will be used. The environment is always forced to
    /// `Live` regardless of what the supplied config contains.
    ///
    /// # Errors
    ///
    /// Returns an error if kernel construction fails.
    pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
        let mut config = config.unwrap_or_default();
        config.environment = Environment::Live;

        // NOTE(review): the environment was just forced to `Live` above, so the
        // `Backtest` arm looks unreachable unless `environment()` derives its
        // value from something other than the field — confirm.
        match config.environment() {
            Environment::Sandbox | Environment::Live => {}
            Environment::Backtest => {
                anyhow::bail!("LiveNode cannot be used with Backtest environment");
            }
        }

        config.validate_runtime_support()?;

        // Runner channels are bound before kernel construction so components
        // created by the kernel can obtain senders.
        let runner = AsyncRunner::new();
        runner.bind_senders();

        let kernel = NautilusKernel::new(name, config.clone())?;

        // The execution manager shares the kernel's clock and cache handles.
        let exec_manager_config =
            ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
        let exec_manager = ExecutionManager::new(
            kernel.clock.clone(),
            kernel.cache.clone(),
            exec_manager_config,
        );

        log::info!("LiveNode built successfully with kernel config");

        Ok(Self {
            kernel,
            runner: Some(runner),
            config,
            handle: LiveNodeHandle::new(),
            exec_manager,
            shutdown_deadline: None,
            #[cfg(feature = "python")]
            python_actors: Vec::new(),
        })
    }
308
309    /// Returns a thread-safe handle to control this node.
310    #[must_use]
311    pub fn handle(&self) -> LiveNodeHandle {
312        self.handle.clone()
313    }
314
315    /// Starts the live node without entering a select loop.
316    ///
317    /// Connects clients, runs reconciliation, and starts the trader, but does
318    /// not consume the runner or drive channel receivers. Channel traffic that
319    /// arrives after startup is not serviced until the caller provides a loop.
320    ///
321    /// For a self-contained entry point that owns the event loop, use [`run`](Self::run).
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if startup fails.
326    pub async fn start(&mut self) -> anyhow::Result<()> {
327        if self.state().is_running() {
328            anyhow::bail!("Already running");
329        }
330
331        if let Some(runner) = self.runner.as_ref() {
332            runner.bind_senders();
333        }
334
335        self.handle.set_state(NodeState::Starting);
336
337        self.kernel.start_async().await;
338
339        // Connect data clients first and flush instrument events into cache
340        self.kernel.connect_data_clients().await;
341
342        if let Some(runner) = self.runner.as_mut() {
343            runner.flush_pending_data();
344        }
345
346        self.kernel.connect_exec_clients().await;
347
348        if !self.await_engines_connected().await {
349            log::error!("Cannot start trader: engine client(s) not connected");
350            self.handle.set_state(NodeState::Running);
351            return Ok(());
352        }
353
354        self.perform_startup_reconciliation().await?;
355
356        self.kernel.start_trader();
357
358        self.handle.set_state(NodeState::Running);
359
360        Ok(())
361    }
362
363    /// Stop the live node.
364    ///
365    /// This method stops the trader, waits for the configured grace period to allow
366    /// residual events to be processed, then finalizes the shutdown sequence.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if shutdown fails.
371    pub async fn stop(&mut self) -> anyhow::Result<()> {
372        if !self.state().is_running() {
373            anyhow::bail!("Not running");
374        }
375
376        self.handle.set_state(NodeState::ShuttingDown);
377
378        self.kernel.stop_trader();
379        let delay = self.kernel.delay_post_stop();
380        log::info!("Awaiting residual events ({delay:?})...");
381
382        dst::time::sleep(delay).await;
383        self.finalize_stop().await
384    }
385
386    /// Awaits engine clients to connect with timeout.
387    ///
388    /// Returns `true` if all engines connected, `false` if timed out.
389    async fn await_engines_connected(&self) -> bool {
390        log::info!(
391            "Awaiting engine connections ({:?} timeout)...",
392            self.config.timeout_connection
393        );
394
395        let start = dst::time::Instant::now();
396        let timeout = self.config.timeout_connection;
397        let interval = Duration::from_millis(100);
398
399        while start.elapsed() < timeout {
400            if self.kernel.check_engines_connected() {
401                log::info!("All engine clients connected");
402                return true;
403            }
404            dst::time::sleep(interval).await;
405        }
406
407        self.log_connection_status();
408        false
409    }
410
411    /// Awaits engine clients to disconnect with timeout.
412    ///
413    /// Logs an error with client status on timeout but does not fail.
414    async fn await_engines_disconnected(&self) {
415        log::info!(
416            "Awaiting engine disconnections ({:?} timeout)...",
417            self.config.timeout_disconnection
418        );
419
420        let start = dst::time::Instant::now();
421        let timeout = self.config.timeout_disconnection;
422        let interval = Duration::from_millis(100);
423
424        while start.elapsed() < timeout {
425            if self.kernel.check_engines_disconnected() {
426                log::info!("All engine clients disconnected");
427                return;
428            }
429            dst::time::sleep(interval).await;
430        }
431
432        log::error!(
433            "Timed out ({:?}) waiting for engines to disconnect\n\
434             DataEngine.check_disconnected() == {}\n\
435             ExecEngine.check_disconnected() == {}",
436            timeout,
437            self.kernel.data_engine().check_disconnected(),
438            self.kernel.exec_engine().borrow().check_disconnected(),
439        );
440    }
441
442    fn log_connection_status(&self) {
443        #[derive(Tabled)]
444        struct ClientStatus {
445            #[tabled(rename = "Client")]
446            client: String,
447            #[tabled(rename = "Type")]
448            client_type: &'static str,
449            #[tabled(rename = "Connected")]
450            connected: bool,
451        }
452
453        let data_status = self.kernel.data_client_connection_status();
454        let exec_status = self.kernel.exec_client_connection_status();
455
456        let mut rows: Vec<ClientStatus> = Vec::new();
457
458        for (client_id, connected) in data_status {
459            rows.push(ClientStatus {
460                client: client_id.to_string(),
461                client_type: "Data",
462                connected,
463            });
464        }
465
466        for (client_id, connected) in exec_status {
467            rows.push(ClientStatus {
468                client: client_id.to_string(),
469                client_type: "Execution",
470                connected,
471            });
472        }
473
474        let table = Table::new(&rows).with(Style::rounded()).to_string();
475
476        log::warn!(
477            "Timed out ({:?}) waiting for engines to connect\n\n{table}\n\n\
478             DataEngine.check_connected() == {}\n\
479             ExecEngine.check_connected() == {}",
480            self.config.timeout_connection,
481            self.kernel.data_engine().check_connected(),
482            self.kernel.exec_engine().borrow().check_connected(),
483        );
484    }
485
    /// Performs startup reconciliation to align internal state with venue state.
    ///
    /// This method queries each execution client for mass status (orders, fills, positions)
    /// and reconciles any discrepancies with the local cache state.
    ///
    /// Clients are processed sequentially; the shared `timeout_reconciliation`
    /// budget is checked before each client, so later clients are skipped once
    /// the budget is exhausted. Per-client failures are logged and treated as
    /// non-fatal.
    ///
    /// # Errors
    ///
    /// Returns an error if reconciliation fails or times out.
    #[expect(clippy::await_holding_refcell_ref)] // Single-threaded runtime, intentional design
    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
        if !self.config.exec_engine.reconciliation {
            log::info!("Startup reconciliation disabled");
            return Ok(());
        }

        log_info!(
            "Starting execution state reconciliation...",
            color = LogColor::Blue
        );

        // Optional lookback window (minutes) passed through to each client's
        // mass-status request.
        let lookback_mins = self
            .config
            .exec_engine
            .reconciliation_lookback_mins
            .map(|m| m as u64);

        let timeout = self.config.timeout_reconciliation;
        let start = dst::time::Instant::now();
        // Snapshot client IDs up front so this engine borrow is released
        // before the loop takes its own borrows.
        let client_ids = self.kernel.exec_engine.borrow().client_ids();

        for client_id in client_ids {
            // Timeout is only checked between clients; an in-flight request
            // is never interrupted.
            if start.elapsed() > timeout {
                log::warn!("Reconciliation timeout reached, stopping early");
                break;
            }

            log_info!(
                "Requesting mass status from {}...",
                client_id,
                color = LogColor::Blue
            );

            // The `borrow_mut` is held across this await; relies on the
            // single-threaded runtime (see the `expect` attribute above and
            // the module-level threading notes).
            let mass_status_result = self
                .kernel
                .exec_engine
                .borrow_mut()
                .generate_mass_status(&client_id, lookback_mins)
                .await;

            match mass_status_result {
                Ok(Some(mass_status)) => {
                    log_info!(
                        "Reconciling ExecutionMassStatus for {}",
                        client_id,
                        color = LogColor::Blue
                    );

                    let exec_engine_rc = self.kernel.exec_engine.clone();

                    let result = self
                        .exec_manager
                        .reconcile_execution_mass_status(mass_status, exec_engine_rc)
                        .await;

                    if result.events.is_empty() {
                        log_info!(
                            "Reconciliation for {} succeeded",
                            client_id,
                            color = LogColor::Blue
                        );
                    } else {
                        log::info!(
                            color = LogColor::Blue as u8;
                            "Reconciliation for {} processed {} events",
                            client_id,
                            result.events.len()
                        );
                    }

                    // Register external orders with execution clients for tracking
                    if !result.external_orders.is_empty() {
                        let exec_engine = self.kernel.exec_engine.borrow();
                        for external in result.external_orders {
                            exec_engine.register_external_order(
                                external.client_order_id,
                                external.venue_order_id,
                                external.instrument_id,
                                external.strategy_id,
                                external.ts_init,
                            );
                        }
                    }
                }
                Ok(None) => {
                    // Non-fatal: continue with the remaining clients.
                    log::warn!(
                        "No mass status available from {client_id} \
                         (likely adapter error when generating reports)"
                    );
                }
                Err(e) => {
                    // Also non-fatal; reconciliation proceeds best-effort.
                    log::warn!("Failed to get mass status from {client_id}: {e}");
                }
            }
        }

        // Rebuild portfolio views from the (now reconciled) cache state.
        self.kernel.portfolio.borrow_mut().initialize_orders();
        self.kernel.portfolio.borrow_mut().initialize_positions();

        let elapsed_secs = start.elapsed().as_secs_f64();
        log_info!(
            "Startup reconciliation completed in {:.2}s",
            elapsed_secs,
            color = LogColor::Blue
        );

        Ok(())
    }
603
604    /// Run the live node with automatic shutdown handling.
605    ///
606    /// This method starts the node, runs indefinitely, and handles graceful shutdown
607    /// on interrupt signals.
608    ///
609    /// # Thread Safety
610    ///
611    /// The event loop runs directly on the current thread (not spawned) because the
612    /// msgbus uses thread-local storage. Endpoints registered by the kernel are only
613    /// accessible from the same thread.
614    ///
615    /// # Shutdown Sequence
616    ///
617    /// 1. Signal received (SIGINT or handle stop).
618    /// 2. Trader components stopped (triggers order cancellations, etc.).
619    /// 3. Event loop continues processing residual events for the configured grace period.
620    /// 4. Kernel finalized, clients disconnected, remaining events drained.
621    ///
622    /// # Errors
623    ///
624    /// Returns an error if the node fails to start or encounters a runtime error.
625    pub async fn run(&mut self) -> anyhow::Result<()> {
626        if self.state().is_running() {
627            anyhow::bail!("Already running");
628        }
629
630        let Some(runner) = self.runner.take() else {
631            anyhow::bail!("Runner already consumed - run() called twice");
632        };
633        runner.bind_senders();
634
635        let AsyncRunnerChannels {
636            mut time_evt_rx,
637            mut data_evt_rx,
638            mut data_cmd_rx,
639            mut exec_evt_rx,
640            mut exec_cmd_rx,
641        } = runner.take_channels();
642
643        log::info!("Event loop starting");
644
645        self.handle.set_state(NodeState::Starting);
646        self.kernel.start_async().await;
647        self.kernel.reset_shutdown_flag();
648
649        let stop_handle = self.handle.clone();
650        let shutdown_flag = self.kernel.shutdown_flag();
651        let mut pending = PendingEvents::default();
652
653        // Startup phase 1: Connect data clients and drain instrument events into cache.
654        // This ensures the cache is populated before execution clients connect.
655        // TODO: Add ctrl_c and stop_handle monitoring here to allow aborting a
656        // hanging startup. Currently signals during startup are ignored, and
657        // any pending stop_flag is cleared when transitioning to Running.
658        drive_with_event_buffering(
659            self.kernel.connect_data_clients(),
660            &mut pending,
661            &mut time_evt_rx,
662            &mut data_evt_rx,
663            &mut data_cmd_rx,
664            &mut exec_evt_rx,
665            &mut exec_cmd_rx,
666        )
667        .await;
668
669        // Flush any data events still queued in the channel receivers that the
670        // select loop did not capture before the connect future resolved, then
671        // drain everything into cache.
672        flush_pending_data(&mut pending, &mut data_evt_rx, &mut data_cmd_rx);
673        debug_assert!(
674            pending.data_evts.is_empty() && pending.data_cmds.is_empty(),
675            "data must be drained into cache before exec clients connect",
676        );
677
678        // Startup phase 2: Connect execution clients (instruments now in cache)
679        let engines_connected = drive_with_event_buffering(
680            self.connect_exec_phase(),
681            &mut pending,
682            &mut time_evt_rx,
683            &mut data_evt_rx,
684            &mut data_cmd_rx,
685            &mut exec_evt_rx,
686            &mut exec_cmd_rx,
687        )
688        .await?;
689
690        // Flush channel receivers and drain all remaining pending events
691        flush_all_pending(
692            &mut pending,
693            &mut time_evt_rx,
694            &mut data_evt_rx,
695            &mut data_cmd_rx,
696            &mut exec_evt_rx,
697            &mut exec_cmd_rx,
698        );
699        debug_assert!(
700            pending.is_empty(),
701            "all startup events must be processed before reconciliation",
702        );
703
704        if engines_connected {
705            // Run reconciliation now that instruments are in cache and start trader
706            self.perform_startup_reconciliation().await?;
707            self.kernel.start_trader();
708        } else {
709            log::error!("Not starting trader: engine client(s) not connected");
710        }
711
712        self.handle.set_state(NodeState::Running);
713
714        let exec_config = &self.config.exec_engine;
715        let inflight_interval_ns =
716            (exec_config.inflight_check_interval_ms as u64) * NANOSECONDS_IN_MILLISECOND;
717        let open_interval_ns = exec_config
718            .open_check_interval_secs
719            .filter(|&s| s > 0.0)
720            .map_or(0, secs_to_nanos_unchecked);
721        let position_interval_ns = exec_config
722            .position_check_interval_secs
723            .filter(|&s| s > 0.0)
724            .map_or(0, secs_to_nanos_unchecked);
725        let has_clients = !self
726            .kernel
727            .exec_engine
728            .borrow()
729            .get_all_clients()
730            .is_empty();
731        let recon_enabled = has_clients
732            && (inflight_interval_ns > 0 || open_interval_ns > 0 || position_interval_ns > 0);
733
734        let recon_min_interval = if recon_enabled {
735            let mut intervals = Vec::new();
736
737            if exec_config.inflight_check_interval_ms > 0 {
738                intervals.push(Duration::from_millis(
739                    exec_config.inflight_check_interval_ms as u64,
740                ));
741            }
742
743            if let Some(s) = exec_config.open_check_interval_secs.filter(|&s| s > 0.0) {
744                intervals.push(Duration::from_secs_f64(s));
745            }
746
747            if let Some(s) = exec_config
748                .position_check_interval_secs
749                .filter(|&s| s > 0.0)
750            {
751                intervals.push(Duration::from_secs_f64(s));
752            }
753            intervals
754                .into_iter()
755                .min()
756                .unwrap_or(Duration::from_secs(1))
757        } else {
758            Duration::from_secs(1) // Unused, timer won't fire
759        };
760
761        // `reconciliation_startup_delay_secs` is a post-reconciliation grace period:
762        // startup reconciliation has already completed above, and this delay offsets
763        // the first periodic tick to let the system stabilize before continuous checks
764        // begin. Matches the legacy Python semantics in `LiveExecutionEngine`.
765        let startup_delay = if self.config.exec_engine.reconciliation {
766            Duration::from_secs_f64(exec_config.reconciliation_startup_delay_secs)
767        } else {
768            Duration::ZERO
769        };
770
771        let recon_start = dst::time::Instant::now() + startup_delay;
772
773        let mut ts_last_inflight = self.exec_manager.generate_timestamp_ns();
774        let mut ts_last_open = ts_last_inflight;
775        let mut ts_last_position = ts_last_inflight;
776
777        // Disabled timers use a far-future interval so they never fire.
778        // All timers start one full interval after the startup delay
779        // so the first tick does not fire immediately.
780        let far_future = Duration::from_secs(86400 * 365 * 100);
781
782        let make_timer = |opt_dur: Option<Duration>| {
783            let dur = opt_dur.unwrap_or(far_future);
784            let mut timer = dst::time::interval_at(recon_start + dur, dur);
785            timer.set_missed_tick_behavior(dst::time::MissedTickBehavior::Delay);
786            timer
787        };
788
789        let mut recon_timer = make_timer(if recon_enabled {
790            Some(recon_min_interval)
791        } else {
792            None
793        });
794
795        let mut purge_orders_timer = make_timer(
796            exec_config
797                .purge_closed_orders_interval_mins
798                .filter(|&m| m > 0)
799                .map(|m| Duration::from_secs(mins_to_secs(m as u64))),
800        );
801
802        let mut purge_positions_timer = make_timer(
803            exec_config
804                .purge_closed_positions_interval_mins
805                .filter(|&m| m > 0)
806                .map(|m| Duration::from_secs(mins_to_secs(m as u64))),
807        );
808
809        let mut purge_account_timer = make_timer(
810            exec_config
811                .purge_account_events_interval_mins
812                .filter(|&m| m > 0)
813                .map(|m| Duration::from_secs(mins_to_secs(m as u64))),
814        );
815
816        let mut own_books_timer = make_timer(
817            exec_config
818                .own_books_audit_interval_secs
819                .filter(|&s| s > 0.0)
820                .map(Duration::from_secs_f64),
821        );
822
823        let mut prune_fills_timer = make_timer(Some(Duration::from_secs(60)));
824
825        // Stop-check timer is not subject to the reconciliation startup delay,
826        // so shutdown signals remain responsive from the moment the node reaches
827        // `Running`. Set `MissedTickBehavior::Skip` so backlog ticks do not fire
828        // a burst after the select arm was suspended by other branches.
829        let mut stop_check_timer = dst::time::interval(Duration::from_millis(100));
830        stop_check_timer.set_missed_tick_behavior(dst::time::MissedTickBehavior::Skip);
831
832        // Running phase: runs until shutdown deadline expires
833        let mut residual_events = 0usize;
834        let ctrl_c = dst::signal::ctrl_c();
835        tokio::pin!(ctrl_c);
836
837        loop {
838            let shutdown_deadline = self.shutdown_deadline;
839            let is_shutting_down = self.state() == NodeState::ShuttingDown;
840            let is_running = self.state() == NodeState::Running;
841
842            tokio::select! {
843                biased;
844
845                // Signal branches first so they are always checked
846                result = &mut ctrl_c, if is_running => {
847                    match result {
848                        Ok(()) => log::info!("Received SIGINT, shutting down"),
849                        Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
850                    }
851                    self.initiate_shutdown();
852                }
853                _ = stop_check_timer.tick(), if is_running => {
854                    if stop_handle.should_stop() {
855                        log::info!("Received stop signal from handle");
856                        self.initiate_shutdown();
857                    } else if shutdown_flag.get() {
858                        log::info!("Received ShutdownSystem command, shutting down");
859                        self.initiate_shutdown();
860                    }
861                }
862                () = async {
863                    match shutdown_deadline {
864                        Some(deadline) => dst::time::sleep_until(deadline).await,
865                        None => std::future::pending::<()>().await,
866                    }
867                }, if self.state() == NodeState::ShuttingDown => {
868                    break;
869                }
870
871                // Housekeeping timers (before event processing to avoid starvation)
872                _ = recon_timer.tick(), if is_running && recon_enabled => {
873                    if let Err(e) = self.run_reconciliation_checks(
874                        inflight_interval_ns,
875                        open_interval_ns,
876                        position_interval_ns,
877                        &mut ts_last_inflight,
878                        &mut ts_last_open,
879                        &mut ts_last_position,
880                    ).await {
881                        log::error!("Reconciliation check error: {e}");
882                    }
883                }
884                _ = purge_orders_timer.tick(), if is_running => {
885                    self.exec_manager.purge_closed_orders();
886                }
887                _ = purge_positions_timer.tick(), if is_running => {
888                    self.exec_manager.purge_closed_positions();
889                }
890                _ = purge_account_timer.tick(), if is_running => {
891                    self.exec_manager.purge_account_events();
892                }
893                _ = own_books_timer.tick(), if is_running => {
894                    self.kernel.cache().borrow_mut().audit_own_order_books();
895                }
896                _ = prune_fills_timer.tick(), if is_running => {
897                    self.exec_manager.prune_recent_fills_cache(60.0);
898                }
899
900                // Event processing branches
901                Some(handler) = time_evt_rx.recv() => {
902                    AsyncRunner::handle_time_event(handler);
903
904                    if is_shutting_down {
905                        log::debug!("Residual time event");
906                        residual_events += 1;
907                    }
908                }
909                Some(evt) = data_evt_rx.recv() => {
910                    if is_shutting_down {
911                        log::debug!("Residual data event: {evt:?}");
912                        residual_events += 1;
913                    }
914                    AsyncRunner::handle_data_event(evt);
915                }
916                Some(cmd) = data_cmd_rx.recv() => {
917                    if is_shutting_down {
918                        log::debug!("Residual data command: {cmd:?}");
919                        residual_events += 1;
920                    }
921                    AsyncRunner::handle_data_command(cmd);
922                }
923                Some(evt) = exec_evt_rx.recv() => {
924                    if is_shutting_down {
925                        log::debug!("Residual exec event: {evt:?}");
926                        residual_events += 1;
927                    }
928
929                    let mut close_ids: Vec<ClientOrderId> = Vec::new();
930
931                    match &evt {
932                        ExecutionEvent::Order(order_evt) => {
933                            self.exec_manager.record_local_activity(order_evt.client_order_id());
934                            match order_evt {
935                                OrderEventAny::Filled(fill) => {
936                                    self.exec_manager.record_position_activity(
937                                        fill.instrument_id,
938                                        fill.ts_event,
939                                    );
940                                    self.exec_manager.mark_fill_processed(fill.trade_id);
941                                }
942                                OrderEventAny::Accepted(_) => {
943                                    self.exec_manager.clear_recon_tracking(
944                                        &order_evt.client_order_id(), true,
945                                    );
946                                }
947                                OrderEventAny::Rejected(_)
948                                | OrderEventAny::Canceled(_)
949                                | OrderEventAny::Expired(_)
950                                | OrderEventAny::Denied(_) => {
951                                    self.exec_manager.clear_recon_tracking(
952                                        &order_evt.client_order_id(), true,
953                                    );
954                                }
955                                _ => {}
956                            }
957                            close_ids.push(order_evt.client_order_id());
958                        }
959                        ExecutionEvent::OrderSubmittedBatch(batch) => {
960                            for submitted in &batch.events {
961                                self.exec_manager.record_local_activity(submitted.client_order_id);
962                            }
963                        }
964                        ExecutionEvent::OrderAcceptedBatch(batch) => {
965                            for accepted in &batch.events {
966                                self.exec_manager.record_local_activity(accepted.client_order_id);
967                                self.exec_manager.clear_recon_tracking(
968                                    &accepted.client_order_id, true,
969                                );
970                            }
971                        }
972                        ExecutionEvent::OrderCanceledBatch(batch) => {
973                            for canceled in &batch.events {
974                                self.exec_manager.record_local_activity(canceled.client_order_id);
975                                self.exec_manager.clear_recon_tracking(
976                                    &canceled.client_order_id, true,
977                                );
978                                close_ids.push(canceled.client_order_id);
979                            }
980                        }
981                        ExecutionEvent::Report(report) => {
982                            if let ExecutionReport::Fill(fill_report) = report
983                                && self.exec_manager.is_fill_recently_processed(&fill_report.trade_id) {
984                                    log::debug!(
985                                        "Skipping recently processed fill report: {}",
986                                        fill_report.trade_id,
987                                    );
988                                    continue;
989                            }
990                            self.exec_manager.observe_execution_report(report);
991                        }
992                        ExecutionEvent::Account(_) => {}
993                    }
994
995                    AsyncRunner::handle_exec_event(evt);
996
997                    // Post-dispatch: clear tracking when order closes
998                    for coid in &close_ids {
999                        let is_closed = self.kernel.cache().borrow()
1000                            .order(coid).is_some_and(|o| o.is_closed());
1001                        if is_closed {
1002                            self.exec_manager.clear_recon_tracking(coid, true);
1003                        }
1004                    }
1005                }
1006                Some(cmd) = exec_cmd_rx.recv() => {
1007                    if is_shutting_down {
1008                        log::debug!("Residual exec command: {cmd:?}");
1009                        residual_events += 1;
1010                    }
1011
1012                    match &cmd {
1013                        TradingCommand::SubmitOrder(submit) => {
1014                            self.exec_manager.register_inflight(submit.client_order_id);
1015                        }
1016                        TradingCommand::SubmitOrderList(submit) => {
1017                            for order_init in &submit.order_inits {
1018                                self.exec_manager.register_inflight(order_init.client_order_id);
1019                            }
1020                        }
1021                        TradingCommand::ModifyOrder(modify) => {
1022                            self.exec_manager.register_inflight(modify.client_order_id);
1023                        }
1024                        TradingCommand::CancelOrder(cancel) => {
1025                            self.exec_manager.register_inflight(cancel.client_order_id);
1026                        }
1027                        _ => {}
1028                    }
1029                    AsyncRunner::handle_exec_command(cmd);
1030                }
1031            }
1032        }
1033
1034        if residual_events > 0 {
1035            log::debug!("Processed {residual_events} residual events during shutdown");
1036        }
1037
1038        let _ = self.kernel.cache().borrow().check_residuals();
1039
1040        self.finalize_stop().await?;
1041
1042        // Handle events that arrived during finalize_stop
1043        self.drain_channels(
1044            &mut time_evt_rx,
1045            &mut data_evt_rx,
1046            &mut data_cmd_rx,
1047            &mut exec_evt_rx,
1048            &mut exec_cmd_rx,
1049        );
1050
1051        log::info!("Event loop stopped");
1052
1053        Ok(())
1054    }
1055
1056    fn process_reconciliation_events(&mut self, events: &[OrderEventAny]) {
1057        if events.is_empty() {
1058            return;
1059        }
1060
1061        log::info!(
1062            "Processing {} reconciliation event{}",
1063            events.len(),
1064            if events.len() == 1 { "" } else { "s" }
1065        );
1066
1067        for event in events {
1068            self.exec_manager
1069                .record_local_activity(event.client_order_id());
1070            if let OrderEventAny::Filled(fill) = event {
1071                self.exec_manager
1072                    .record_position_activity(fill.instrument_id, fill.ts_event);
1073                self.exec_manager.mark_fill_processed(fill.trade_id);
1074            }
1075            self.kernel.exec_engine.borrow_mut().process(event);
1076        }
1077    }
1078
1079    /// Connects execution clients and checks all engines are connected.
1080    ///
1081    /// Returns `true` if all engines connected successfully, `false` otherwise.
1082    /// Must be called after data clients are connected and instrument events drained.
1083    async fn connect_exec_phase(&mut self) -> anyhow::Result<bool> {
1084        self.kernel.connect_exec_clients().await;
1085
1086        if !self.await_engines_connected().await {
1087            return Ok(false);
1088        }
1089
1090        Ok(true)
1091    }
1092
1093    fn initiate_shutdown(&mut self) {
1094        self.kernel.stop_trader();
1095        let delay = self.kernel.delay_post_stop();
1096        log::info!("Awaiting residual events ({delay:?})...");
1097
1098        self.shutdown_deadline = Some(dst::time::Instant::now() + delay);
1099        self.handle.set_state(NodeState::ShuttingDown);
1100    }
1101
1102    async fn finalize_stop(&mut self) -> anyhow::Result<()> {
1103        let disconnect_result = self.kernel.disconnect_clients().await;
1104        if let Err(ref e) = disconnect_result {
1105            log::error!("Error disconnecting clients: {e}");
1106        }
1107
1108        self.await_engines_disconnected().await;
1109        self.kernel.finalize_stop().await;
1110
1111        self.handle.set_state(NodeState::Stopped);
1112
1113        disconnect_result
1114    }
1115
1116    fn drain_channels(
1117        &self,
1118        time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
1119        data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1120        data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1121        exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1122        exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
1123    ) {
1124        let mut drained = 0;
1125
1126        while let Ok(handler) = time_evt_rx.try_recv() {
1127            AsyncRunner::handle_time_event(handler);
1128            drained += 1;
1129        }
1130
1131        while let Ok(cmd) = data_cmd_rx.try_recv() {
1132            AsyncRunner::handle_data_command(cmd);
1133            drained += 1;
1134        }
1135
1136        while let Ok(evt) = data_evt_rx.try_recv() {
1137            AsyncRunner::handle_data_event(evt);
1138            drained += 1;
1139        }
1140
1141        while let Ok(cmd) = exec_cmd_rx.try_recv() {
1142            AsyncRunner::handle_exec_command(cmd);
1143            drained += 1;
1144        }
1145
1146        while let Ok(evt) = exec_evt_rx.try_recv() {
1147            AsyncRunner::handle_exec_event(evt);
1148            drained += 1;
1149        }
1150
1151        if drained > 0 {
1152            log::info!("Drained {drained} remaining events during shutdown");
1153        }
1154    }
1155
    /// Gets the node's environment.
    ///
    /// Delegates to the underlying kernel's configured environment.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.kernel.environment()
    }
1161
    /// Gets a reference to the underlying kernel.
    ///
    /// Provides read access to kernel-owned components such as the cache and
    /// trader.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
1167
    /// Gets an exclusive reference to the underlying kernel.
    ///
    /// Allows direct kernel mutation; prefer the higher-level `add_*` methods
    /// on the node where they suffice.
    #[must_use]
    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
1173
    /// Gets the node's trader ID.
    ///
    /// Delegates to the underlying kernel.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.kernel.trader_id()
    }
1179
    /// Gets the node's instance ID.
    ///
    /// Delegates to the underlying kernel.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.kernel.instance_id()
    }
1185
    /// Returns the current node state.
    ///
    /// Read from the shared handle, so it reflects transitions made by the
    /// event loop (e.g. `Running` -> `ShuttingDown` -> `Stopped`).
    #[must_use]
    pub fn state(&self) -> NodeState {
        self.handle.state()
    }
1191
    /// Checks if the live node is currently running.
    ///
    /// Convenience wrapper over [`Self::state`].
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.state().is_running()
    }
1197
1198    /// Sets the cache database adapter for persistence.
1199    ///
1200    /// This allows setting a database adapter (e.g., PostgreSQL, Redis) after the node
1201    /// is built but before it starts running. The database adapter is used to persist
1202    /// cache data for recovery and state management.
1203    ///
1204    /// # Errors
1205    ///
1206    /// Returns an error if the node is already running.
1207    pub fn set_cache_database(
1208        &mut self,
1209        database: Box<dyn CacheDatabaseAdapter>,
1210    ) -> anyhow::Result<()> {
1211        if self.state() != NodeState::Idle {
1212            anyhow::bail!(
1213                "Cannot set cache database while node is running, set it before calling start()"
1214            );
1215        }
1216
1217        self.kernel.cache().borrow_mut().set_database(database);
1218        Ok(())
1219    }
1220
    /// Returns the execution manager.
    ///
    /// The manager owns the reconciliation, purge, and fill-cache bookkeeping
    /// driven by the event loop.
    #[must_use]
    pub fn exec_manager(&self) -> &ExecutionManager {
        &self.exec_manager
    }
1226
    /// Returns a mutable reference to the execution manager.
    ///
    /// See [`Self::exec_manager`] for the shared accessor.
    #[must_use]
    pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
        &mut self.exec_manager
    }
1232
1233    /// Adds an actor to the trader.
1234    ///
1235    /// This method provides a high-level interface for adding actors to the underlying
1236    /// trader without requiring direct access to the kernel. Actors should be added
1237    /// after the node is built but before starting the node.
1238    ///
1239    /// # Errors
1240    ///
1241    /// Returns an error if:
1242    /// - The trader is not in a valid state for adding components.
1243    /// - An actor with the same ID is already registered.
1244    /// - The node is currently running.
1245    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
1246    where
1247        T: DataActor + Component + Actor + 'static,
1248    {
1249        if self.state() != NodeState::Idle {
1250            anyhow::bail!(
1251                "Cannot add actor while node is running, add actors before calling start()"
1252            );
1253        }
1254
1255        self.kernel.trader.borrow_mut().add_actor(actor)
1256    }
1257
1258    /// Adds an actor to the live node using a factory function.
1259    ///
1260    /// The factory function is called at registration time to create the actor,
1261    /// avoiding cloning issues with non-cloneable actor types.
1262    ///
1263    /// # Errors
1264    ///
1265    /// Returns an error if:
1266    /// - The node is currently running.
1267    /// - The factory function fails to create the actor.
1268    /// - The underlying trader registration fails.
1269    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
1270    where
1271        F: FnOnce() -> anyhow::Result<T>,
1272        T: DataActor + Component + Actor + 'static,
1273    {
1274        if self.state() != NodeState::Idle {
1275            anyhow::bail!(
1276                "Cannot add actor while node is running, add actors before calling start()"
1277            );
1278        }
1279
1280        self.kernel
1281            .trader
1282            .borrow_mut()
1283            .add_actor_from_factory(factory)
1284    }
1285
1286    /// Adds a strategy to the trader.
1287    ///
1288    /// Strategies are registered in both the component registry (for lifecycle management)
1289    /// and the actor registry (for data callbacks via msgbus).
1290    ///
1291    /// # Errors
1292    ///
1293    /// Returns an error if:
1294    /// - The node is currently running.
1295    /// - A strategy with the same ID is already registered.
1296    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
1297    where
1298        T: Strategy + Component + Debug + 'static,
1299    {
1300        if self.state() != NodeState::Idle {
1301            anyhow::bail!(
1302                "Cannot add strategy while node is running, add strategies before calling start()"
1303            );
1304        }
1305
1306        // Register external order claims before adding strategy (which moves it)
1307        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
1308        if let Some(claims) = strategy.external_order_claims() {
1309            for instrument_id in claims {
1310                self.exec_manager
1311                    .claim_external_orders(instrument_id, strategy_id);
1312            }
1313            log_info!(
1314                "Registered external order claims for {}: {:?}",
1315                strategy_id,
1316                strategy.external_order_claims(),
1317                color = LogColor::Blue
1318            );
1319        }
1320
1321        self.kernel.trader.borrow_mut().add_strategy(strategy)
1322    }
1323
1324    /// Adds an execution algorithm to the trader.
1325    ///
1326    /// Execution algorithms are registered in both the component registry (for lifecycle
1327    /// management) and the actor registry (for data callbacks via msgbus).
1328    ///
1329    /// # Errors
1330    ///
1331    /// Returns an error if:
1332    /// - The node is currently running.
1333    /// - An execution algorithm with the same ID is already registered.
1334    pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
1335    where
1336        T: ExecutionAlgorithm + Component + Debug + 'static,
1337    {
1338        if self.state() != NodeState::Idle {
1339            anyhow::bail!(
1340                "Cannot add exec algorithm while node is running, add exec algorithms before calling start()"
1341            );
1342        }
1343
1344        self.kernel
1345            .trader
1346            .borrow_mut()
1347            .add_exec_algorithm(exec_algorithm)
1348    }
1349
    // Runs up to three reconciliation sub-checks (inflight, open orders,
    // positions), each gated by its own interval. A single recon_timer in
    // the select! loop fires at the minimum enabled interval; this method
    // then checks which sub-checks are actually due.
    //
    // The exec_engine borrow is held across the async venue queries because
    // get_all_clients() returns references into the engine's client map.
    // This is safe: select! runs one branch to completion, so no other
    // branch can borrow the same RefCells concurrently.
    //
    // An interval of 0 disables the corresponding sub-check. Observing
    // `ShuttingDown` before any due sub-check returns early, skipping that
    // sub-check AND any later ones due this tick.
    #[expect(clippy::await_holding_refcell_ref)]
    async fn run_reconciliation_checks(
        &mut self,
        inflight_interval_ns: u64,
        open_interval_ns: u64,
        position_interval_ns: u64,
        ts_last_inflight: &mut UnixNanos,
        ts_last_open: &mut UnixNanos,
        ts_last_position: &mut UnixNanos,
    ) -> anyhow::Result<()> {
        // Single timestamp for all three due-checks so they stay consistent.
        let ts_now = self.exec_manager.generate_timestamp_ns();

        // Inflight orders: local check producing events plus follow-up
        // venue query commands.
        if inflight_interval_ns > 0 && (ts_now - *ts_last_inflight).as_u64() >= inflight_interval_ns
        {
            if self.state() == NodeState::ShuttingDown {
                return Ok(());
            }
            let result = self.exec_manager.check_inflight_orders();
            self.process_reconciliation_events(&result.events);
            // Queries produced by the check are dispatched as ordinary
            // exec commands.
            for cmd in result.queries {
                AsyncRunner::handle_exec_command(cmd);
            }
            *ts_last_inflight = ts_now;
        }

        // Open orders: queries venues via the engine's clients (async).
        if open_interval_ns > 0 && (ts_now - *ts_last_open).as_u64() >= open_interval_ns {
            if self.state() == NodeState::ShuttingDown {
                return Ok(());
            }
            let eng_ref = self.kernel.exec_engine.borrow();
            let clients = eng_ref.get_all_clients();
            let events = self.exec_manager.check_open_orders(&clients).await;
            // Release the shared engine borrow before processing events,
            // which borrows the engine mutably.
            drop(clients);
            drop(eng_ref);
            self.process_reconciliation_events(&events);
            *ts_last_open = ts_now;
        }

        // Positions: same borrow/drop pattern as the open-orders check.
        if position_interval_ns > 0 && (ts_now - *ts_last_position).as_u64() >= position_interval_ns
        {
            if self.state() == NodeState::ShuttingDown {
                return Ok(());
            }
            let eng_ref = self.kernel.exec_engine.borrow();
            let clients = eng_ref.get_all_clients();
            let events = self
                .exec_manager
                .check_positions_consistency(&clients)
                .await;
            drop(clients);
            drop(eng_ref);
            self.process_reconciliation_events(&events);
            *ts_last_position = ts_now;
        }

        Ok(())
    }
1416}
1417
1418/// Flushes data events and commands from both `pending` and the channel receivers
1419/// into the cache, looping until no progress is made.
1420///
1421/// This closes the gap where `drive_with_event_buffering` exits as soon as its
1422/// driven future resolves (biased select), leaving items in the channel receivers
1423/// that were not captured into `pending`.
1424fn flush_pending_data(
1425    pending: &mut PendingEvents,
1426    data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1427    data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1428) {
1429    loop {
1430        let mut progressed = pending.drain_data();
1431
1432        while let Ok(evt) = data_evt_rx.try_recv() {
1433            AsyncRunner::handle_data_event(evt);
1434            progressed = true;
1435        }
1436
1437        while let Ok(cmd) = data_cmd_rx.try_recv() {
1438            AsyncRunner::handle_data_command(cmd);
1439            progressed = true;
1440        }
1441
1442        if !progressed {
1443            break;
1444        }
1445    }
1446}
1447
1448/// Flushes all channel receivers into `pending`, then drains everything.
1449///
1450/// Unlike [`flush_pending_data`] this is a single pass, not a drain-until-quiet
1451/// loop. Sufficient for phase 2 where the goal is to capture items the biased
1452/// select did not poll before the connect future resolved.
1453fn flush_all_pending(
1454    pending: &mut PendingEvents,
1455    time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
1456    data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1457    data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1458    exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1459    exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
1460) {
1461    // Flush channel receivers into pending
1462    while let Ok(handler) = time_evt_rx.try_recv() {
1463        AsyncRunner::handle_time_event(handler);
1464    }
1465
1466    while let Ok(evt) = data_evt_rx.try_recv() {
1467        pending.data_evts.push(evt);
1468    }
1469
1470    while let Ok(cmd) = data_cmd_rx.try_recv() {
1471        pending.data_cmds.push(cmd);
1472    }
1473
1474    while let Ok(evt) = exec_evt_rx.try_recv() {
1475        match evt {
1476            ExecutionEvent::Account(_) => {
1477                AsyncRunner::handle_exec_event(evt);
1478            }
1479            ExecutionEvent::Report(report) => {
1480                pending.exec_reports.push(report);
1481            }
1482            ExecutionEvent::Order(order_evt) => {
1483                pending.order_evts.push(order_evt);
1484            }
1485            ExecutionEvent::OrderSubmittedBatch(batch) => {
1486                for submitted in batch {
1487                    pending.order_evts.push(OrderEventAny::Submitted(submitted));
1488                }
1489            }
1490            ExecutionEvent::OrderAcceptedBatch(batch) => {
1491                for accepted in batch {
1492                    pending.order_evts.push(OrderEventAny::Accepted(accepted));
1493                }
1494            }
1495            ExecutionEvent::OrderCanceledBatch(batch) => {
1496                for canceled in batch {
1497                    pending.order_evts.push(OrderEventAny::Canceled(canceled));
1498                }
1499            }
1500        }
1501    }
1502
1503    while let Ok(cmd) = exec_cmd_rx.try_recv() {
1504        pending.exec_cmds.push(cmd);
1505    }
1506
1507    pending.drain();
1508}
1509
/// Drives a future to completion while buffering channel events.
///
/// Time events are handled immediately. Account events are forwarded directly.
/// All other events are buffered in `pending` for later processing.
///
/// The select is `biased` with the driven future polled first, so this
/// returns as soon as the future resolves even if channel items remain
/// unread — callers flush those afterwards (see `flush_all_pending`).
async fn drive_with_event_buffering<F: std::future::Future>(
    future: F,
    pending: &mut PendingEvents,
    time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
    data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
    exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
    exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
) -> F::Output {
    tokio::pin!(future);

    loop {
        tokio::select! {
            biased;

            result = &mut future => {
                break result;
            }
            // Timer callbacks run immediately so clocks stay responsive.
            Some(handler) = time_evt_rx.recv() => {
                AsyncRunner::handle_time_event(handler);
            }
            // Data traffic is buffered untouched for later draining.
            Some(evt) = data_evt_rx.recv() => {
                pending.data_evts.push(evt);
            }
            Some(cmd) = data_cmd_rx.recv() => {
                pending.data_cmds.push(cmd);
            }
            Some(evt) = exec_evt_rx.recv() => {
                // Account events are safe to process immediately. Report and
                // Order events need ExecEngine borrow_mut which may conflict
                // with the borrow held by the driven future.
                match evt {
                    ExecutionEvent::Account(_) => {
                        AsyncRunner::handle_exec_event(evt);
                    }
                    ExecutionEvent::Report(report) => {
                        pending.exec_reports.push(report);
                    }
                    ExecutionEvent::Order(order_evt) => {
                        pending.order_evts.push(order_evt);
                    }
                    // Batch events are flattened into individual order events.
                    ExecutionEvent::OrderSubmittedBatch(batch) => {
                        for submitted in batch {
                            pending.order_evts.push(OrderEventAny::Submitted(submitted));
                        }
                    }
                    ExecutionEvent::OrderAcceptedBatch(batch) => {
                        for accepted in batch {
                            pending.order_evts.push(OrderEventAny::Accepted(accepted));
                        }
                    }
                    ExecutionEvent::OrderCanceledBatch(batch) => {
                        for canceled in batch {
                            pending.order_evts.push(OrderEventAny::Canceled(canceled));
                        }
                    }
                }
            }
            Some(cmd) = exec_cmd_rx.recv() => {
                pending.exec_cmds.push(cmd);
            }
        }
    }
}
1578
/// Events and commands captured while a startup future is being driven
/// (see [`drive_with_event_buffering`] and the flush helpers above).
#[derive(Default)]
struct PendingEvents {
    /// Buffered data commands awaiting dispatch.
    data_cmds: Vec<DataCommand>,
    /// Buffered data events awaiting dispatch.
    data_evts: Vec<DataEvent>,
    /// Buffered trading commands awaiting dispatch.
    exec_cmds: Vec<TradingCommand>,
    /// Buffered execution reports awaiting dispatch.
    exec_reports: Vec<ExecutionReport>,
    /// Buffered order events (batch events are flattened into these).
    order_evts: Vec<OrderEventAny>,
}
1587
1588impl PendingEvents {
1589    fn is_empty(&self) -> bool {
1590        self.data_evts.is_empty()
1591            && self.data_cmds.is_empty()
1592            && self.exec_cmds.is_empty()
1593            && self.exec_reports.is_empty()
1594            && self.order_evts.is_empty()
1595    }
1596
1597    /// Drains only data events and commands into the cache.
1598    ///
1599    /// Returns `true` if any events or commands were drained.
1600    fn drain_data(&mut self) -> bool {
1601        let total = self.data_evts.len() + self.data_cmds.len();
1602
1603        if total > 0 {
1604            log::debug!(
1605                "Draining {total} data events/commands into cache \
1606                 (data_evts={}, data_cmds={})",
1607                self.data_evts.len(),
1608                self.data_cmds.len(),
1609            );
1610        }
1611
1612        for evt in self.data_evts.drain(..) {
1613            AsyncRunner::handle_data_event(evt);
1614        }
1615
1616        for cmd in self.data_cmds.drain(..) {
1617            AsyncRunner::handle_data_command(cmd);
1618        }
1619
1620        total > 0
1621    }
1622
1623    /// Drains all remaining pending events.
1624    fn drain(&mut self) {
1625        let total = self.data_evts.len()
1626            + self.data_cmds.len()
1627            + self.exec_cmds.len()
1628            + self.exec_reports.len()
1629            + self.order_evts.len();
1630
1631        if total > 0 {
1632            log::debug!(
1633                "Processing {total} events/commands queued during startup \
1634                 (data_evts={}, data_cmds={}, exec_cmds={}, exec_reports={}, order_evts={})",
1635                self.data_evts.len(),
1636                self.data_cmds.len(),
1637                self.exec_cmds.len(),
1638                self.exec_reports.len(),
1639                self.order_evts.len()
1640            );
1641        }
1642
1643        for evt in self.data_evts.drain(..) {
1644            AsyncRunner::handle_data_event(evt);
1645        }
1646
1647        for cmd in self.data_cmds.drain(..) {
1648            AsyncRunner::handle_data_command(cmd);
1649        }
1650
1651        for report in self.exec_reports.drain(..) {
1652            AsyncRunner::handle_exec_event(ExecutionEvent::Report(report));
1653        }
1654
1655        for cmd in self.exec_cmds.drain(..) {
1656            AsyncRunner::handle_exec_command(cmd);
1657        }
1658
1659        for evt in self.order_evts.drain(..) {
1660            AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
1661        }
1662    }
1663}
1664
#[cfg(test)]
mod tests {
    // Covers: NodeState conversions, LiveNodeHandle state/stop-flag
    // semantics, LiveNode builder validation, PendingEvents draining, and
    // the flush_* startup helpers (including batch-event expansion).

    #[cfg(feature = "python")]
    use std::sync::Arc;

    #[cfg(feature = "python")]
    use nautilus_common::runner::{
        SyncDataCommandSender, SyncTradingCommandSender, replace_data_cmd_sender,
        replace_exec_cmd_sender,
    };
    use nautilus_model::identifiers::TraderId;
    use rstest::*;

    use super::*;

    // -- NodeState ---------------------------------------------------------

    #[rstest]
    #[case(0, NodeState::Idle)]
    #[case(1, NodeState::Starting)]
    #[case(2, NodeState::Running)]
    #[case(3, NodeState::ShuttingDown)]
    #[case(4, NodeState::Stopped)]
    fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
        assert_eq!(NodeState::from_u8(value), expected);
    }

    #[rstest]
    #[case(5)]
    #[case(255)]
    #[should_panic(expected = "Invalid NodeState value")]
    fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
        let _ = NodeState::from_u8(value);
    }

    #[rstest]
    fn test_node_state_roundtrip() {
        // from_u8(as_u8(state)) must be the identity for every variant.
        for state in [
            NodeState::Idle,
            NodeState::Starting,
            NodeState::Running,
            NodeState::ShuttingDown,
            NodeState::Stopped,
        ] {
            assert_eq!(NodeState::from_u8(state.as_u8()), state);
        }
    }

    #[rstest]
    fn test_node_state_is_running_only_for_running() {
        assert!(!NodeState::Idle.is_running());
        assert!(!NodeState::Starting.is_running());
        assert!(NodeState::Running.is_running());
        assert!(!NodeState::ShuttingDown.is_running());
        assert!(!NodeState::Stopped.is_running());
    }

    // -- LiveNodeHandle ----------------------------------------------------

    #[rstest]
    fn test_handle_initial_state() {
        let handle = LiveNodeHandle::new();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.should_stop());
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_stop_sets_flag() {
        let handle = LiveNodeHandle::new();

        handle.stop();

        assert!(handle.should_stop());
    }

    #[rstest]
    fn test_handle_set_state_running_clears_stop_flag() {
        let handle = LiveNodeHandle::new();
        handle.stop();
        assert!(handle.should_stop());

        handle.set_state(NodeState::Running);

        assert!(!handle.should_stop());
        assert!(handle.is_running());
        assert_eq!(handle.state(), NodeState::Running);
    }

    #[rstest]
    fn test_handle_node_state_transitions() {
        // Walk the full lifecycle; is_running() must only be true in Running.
        let handle = LiveNodeHandle::new();
        assert_eq!(handle.state(), NodeState::Idle);

        handle.set_state(NodeState::Starting);
        assert_eq!(handle.state(), NodeState::Starting);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Running);
        assert_eq!(handle.state(), NodeState::Running);
        assert!(handle.is_running());

        handle.set_state(NodeState::ShuttingDown);
        assert_eq!(handle.state(), NodeState::ShuttingDown);
        assert!(!handle.is_running());

        handle.set_state(NodeState::Stopped);
        assert_eq!(handle.state(), NodeState::Stopped);
        assert!(!handle.is_running());
    }

    #[rstest]
    fn test_handle_clone_shares_state_bidirectionally() {
        let handle1 = LiveNodeHandle::new();
        let handle2 = handle1.clone();

        // Mutation from handle1 visible in handle2
        handle1.stop();
        assert!(handle2.should_stop());

        // Mutation from handle2 visible in handle1
        handle2.set_state(NodeState::Running);
        assert_eq!(handle1.state(), NodeState::Running);
    }

    #[rstest]
    fn test_handle_stop_flag_independent_of_state() {
        let handle = LiveNodeHandle::new();

        // Stop flag can be set regardless of state
        handle.set_state(NodeState::Starting);
        handle.stop();
        assert!(handle.should_stop());
        assert_eq!(handle.state(), NodeState::Starting);

        // Only Running state clears the stop flag
        handle.set_state(NodeState::ShuttingDown);
        assert!(handle.should_stop()); // Still set

        handle.set_state(NodeState::Running);
        assert!(!handle.should_stop()); // Cleared
    }

    // -- LiveNode builder --------------------------------------------------

    #[rstest]
    fn test_builder_creation() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_builder_rejects_backtest() {
        // A live node must not be built for the Backtest environment.
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Backtest"));
    }

    #[rstest]
    fn test_builder_accepts_live_environment() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_builder_accepts_sandbox_environment() {
        let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_builder_fluent_api_chaining() {
        // Exercises every with_* setter once; only the name is asserted here.
        let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
            .unwrap()
            .with_name("TestNode")
            .with_instance_id(UUID4::new())
            .with_load_state(false)
            .with_save_state(true)
            .with_timeout_connection(30)
            .with_timeout_reconciliation(60)
            .with_reconciliation(true)
            .with_reconciliation_lookback_mins(120)
            .with_timeout_portfolio(10)
            .with_timeout_disconnection_secs(5)
            .with_delay_post_stop_secs(3)
            .with_delay_shutdown_secs(10);

        assert_eq!(builder.name(), "TestNode");
    }

    // -- LiveNode build (python-gated: build() requires the python feature) --

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_build_and_initial_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        assert_eq!(node.state(), NodeState::Idle);
        assert!(!node.is_running());
        assert_eq!(node.environment(), Environment::Sandbox);
        assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_build_replaces_stale_runner_senders() {
        // Install sync senders first so build() must replace them; a second
        // build after dropping the first node must also succeed.
        replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
        replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));

        let first = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("FirstNode")
            .build()
            .unwrap();

        assert_eq!(first.state(), NodeState::Idle);
        drop(first);

        let second = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("SecondNode")
            .build()
            .unwrap();

        assert_eq!(second.state(), NodeState::Idle);
        assert!(!second.is_running());
    }

    #[cfg(feature = "python")]
    #[rstest]
    fn test_node_handle_reflects_node_state() {
        let node = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox)
            .unwrap()
            .with_name("TestNode")
            .build()
            .unwrap();

        let handle = node.handle();

        assert_eq!(handle.state(), NodeState::Idle);
        assert!(!handle.is_running());
    }

    // -- PendingEvents drains ----------------------------------------------

    #[rstest]
    fn test_pending_drain_data_returns_false_when_empty() {
        let mut pending = PendingEvents::default();

        assert!(!pending.drain_data());
    }

    #[rstest]
    fn test_pending_drain_data_returns_true_when_non_empty() {
        use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};

        let mut pending = PendingEvents::default();
        pending
            .data_evts
            .push(DataEvent::Instrument(InstrumentAny::CryptoPerpetual(
                crypto_perpetual_ethusdt(),
            )));

        assert!(pending.drain_data());
        assert!(pending.data_evts.is_empty());
    }

    // Minimal DataEvent: an instrument update built from a model stub.
    fn stub_data_event() -> DataEvent {
        use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};

        DataEvent::Instrument(InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt()))
    }

    // Minimal DataCommand: a subscribe-instruments request for a test venue.
    fn stub_data_command() -> DataCommand {
        use nautilus_common::messages::data::{SubscribeCommand, subscribe::SubscribeInstruments};
        use nautilus_core::{UUID4, UnixNanos};
        use nautilus_model::identifiers::Venue;

        DataCommand::Subscribe(SubscribeCommand::Instruments(SubscribeInstruments::new(
            None,
            Venue::from("TEST"),
            UUID4::new(),
            UnixNanos::default(),
            None,
            None,
        )))
    }

    // -- flush_pending_data ------------------------------------------------

    #[rstest]
    fn test_flush_pending_data_drains_events_and_commands() {
        let (evt_tx, mut evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();

        let mut pending = PendingEvents::default();

        // Pre-load pending (items captured by the select loop)
        pending.data_evts.push(stub_data_event());
        pending.data_cmds.push(stub_data_command());

        // Pre-load channels (items missed by the select loop)
        evt_tx.send(stub_data_event()).unwrap();
        cmd_tx.send(stub_data_command()).unwrap();

        flush_pending_data(&mut pending, &mut evt_rx, &mut cmd_rx);

        // Both the pending buffers and the channels must be fully drained.
        assert!(pending.data_evts.is_empty());
        assert!(pending.data_cmds.is_empty());
        assert!(evt_rx.try_recv().is_err());
        assert!(cmd_rx.try_recv().is_err());
    }

    #[rstest]
    fn test_flush_pending_data_drains_mixed_sources() {
        let (evt_tx, mut evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();

        let mut pending = PendingEvents::default();

        // First pass: pending has an event, channel has a command
        pending.data_evts.push(stub_data_event());
        cmd_tx.send(stub_data_command()).unwrap();

        // Second pass: channel has items that simulate arrival during first drain
        evt_tx.send(stub_data_event()).unwrap();
        evt_tx.send(stub_data_event()).unwrap();
        cmd_tx.send(stub_data_command()).unwrap();

        flush_pending_data(&mut pending, &mut evt_rx, &mut cmd_rx);

        assert!(pending.data_evts.is_empty());
        assert!(pending.data_cmds.is_empty());
        assert!(evt_rx.try_recv().is_err());
        assert!(cmd_rx.try_recv().is_err());
    }

    // Timer handler with a no-op Rust-local callback.
    fn stub_time_event_handler() -> TimeEventHandler {
        use std::rc::Rc;

        use nautilus_common::timer::{TimeEvent, TimeEventCallback, TimeEventHandler};
        use nautilus_core::{UUID4, UnixNanos};
        use ustr::Ustr;

        TimeEventHandler::new(
            TimeEvent::new(
                Ustr::from("test-timer"),
                UUID4::new(),
                UnixNanos::default(),
                UnixNanos::default(),
            ),
            TimeEventCallback::RustLocal(Rc::new(|_| {})),
        )
    }

    // Minimal TradingCommand: an account query.
    fn stub_trading_command() -> TradingCommand {
        use nautilus_common::messages::execution::query::QueryAccount;
        use nautilus_core::{UUID4, UnixNanos};
        use nautilus_model::identifiers::AccountId;

        TradingCommand::QueryAccount(QueryAccount::new(
            TraderId::from("TESTER-001"),
            None,
            AccountId::from("TEST-001"),
            UUID4::new(),
            UnixNanos::default(),
            None,
        ))
    }

    // ExecutionEvent::Report variant wrapping a stub fill report.
    fn stub_exec_event() -> ExecutionEvent {
        use nautilus_model::{
            enums::{LiquiditySide, OrderSide},
            identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
            reports::FillReport,
            types::{Money, Price, Quantity},
        };

        ExecutionEvent::Report(ExecutionReport::Fill(Box::new(FillReport::new(
            AccountId::from("TEST-001"),
            InstrumentId::from("TEST.VENUE"),
            VenueOrderId::from("V-001"),
            TradeId::from("T-001"),
            OrderSide::Buy,
            Quantity::from("1.0"),
            Price::from("100.0"),
            Money::from("0.01 USD"),
            LiquiditySide::Maker,
            None,
            None,
            nautilus_core::UnixNanos::default(),
            nautilus_core::UnixNanos::default(),
            None,
        ))))
    }

    // -- flush_all_pending -------------------------------------------------

    #[rstest]
    fn test_flush_all_pending_drains_all_channel_types() {
        let (time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
        let (data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (exec_evt_tx, mut exec_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (exec_cmd_tx, mut exec_cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<TradingCommand>();

        let mut pending = PendingEvents::default();

        // Pre-load pending with data items
        pending.data_evts.push(stub_data_event());
        pending.data_cmds.push(stub_data_command());

        // Pre-load all channel types
        time_tx.send(stub_time_event_handler()).unwrap();
        data_evt_tx.send(stub_data_event()).unwrap();
        data_cmd_tx.send(stub_data_command()).unwrap();
        exec_evt_tx.send(stub_exec_event()).unwrap();
        exec_cmd_tx.send(stub_trading_command()).unwrap();

        flush_all_pending(
            &mut pending,
            &mut time_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        // Every pending buffer and every channel must be empty afterwards.
        assert!(pending.data_evts.is_empty());
        assert!(pending.data_cmds.is_empty());
        assert!(pending.exec_reports.is_empty());
        assert!(pending.exec_cmds.is_empty());
        assert!(pending.order_evts.is_empty());
        assert!(time_rx.try_recv().is_err());
        assert!(data_evt_rx.try_recv().is_err());
        assert!(data_cmd_rx.try_recv().is_err());
        assert!(exec_evt_rx.try_recv().is_err());
        assert!(exec_cmd_rx.try_recv().is_err());
    }

    // ExecutionEvent::Order variant wrapping a stub OrderSubmitted event.
    fn stub_order_event() -> ExecutionEvent {
        use nautilus_core::{UUID4, UnixNanos};
        use nautilus_model::{
            events::OrderSubmitted,
            identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId},
        };

        ExecutionEvent::Order(OrderEventAny::Submitted(OrderSubmitted::new(
            TraderId::from("TESTER-001"),
            StrategyId::from("S-001"),
            InstrumentId::from("TEST.VENUE"),
            ClientOrderId::from("O-001"),
            AccountId::from("TEST-001"),
            UUID4::new(),
            UnixNanos::default(),
            UnixNanos::default(),
        )))
    }

    // ExecutionEvent::Account variant wrapping a stub cash account state.
    fn stub_account_event() -> ExecutionEvent {
        use nautilus_core::{UUID4, UnixNanos};
        use nautilus_model::{
            enums::AccountType, events::account::state::AccountState, identifiers::AccountId,
        };

        ExecutionEvent::Account(AccountState::new(
            AccountId::from("TEST-001"),
            AccountType::Cash,
            vec![],
            vec![],
            true,
            UUID4::new(),
            UnixNanos::default(),
            UnixNanos::default(),
            None,
        ))
    }

    #[rstest]
    fn test_flush_all_pending_routes_order_event_to_order_evts() {
        let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
        let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (exec_evt_tx, mut exec_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, mut exec_cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<TradingCommand>();

        let mut pending = PendingEvents::default();

        exec_evt_tx.send(stub_order_event()).unwrap();
        exec_evt_tx.send(stub_exec_event()).unwrap();

        flush_all_pending(
            &mut pending,
            &mut time_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        // Both order and report events are drained by pending.drain()
        assert!(pending.order_evts.is_empty());
        assert!(pending.exec_reports.is_empty());
        assert!(exec_evt_rx.try_recv().is_err());
    }

    #[rstest]
    fn test_flush_all_pending_routes_account_event_immediately() {
        let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
        let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (exec_evt_tx, mut exec_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, mut exec_cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<TradingCommand>();

        let mut pending = PendingEvents::default();

        exec_evt_tx.send(stub_account_event()).unwrap();

        flush_all_pending(
            &mut pending,
            &mut time_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        // Account events are forwarded immediately, never buffered in pending
        assert!(pending.exec_reports.is_empty());
        assert!(pending.order_evts.is_empty());
        assert!(pending.exec_cmds.is_empty());
        assert!(exec_evt_rx.try_recv().is_err());
    }

    // -- PendingEvents::is_empty -------------------------------------------

    #[rstest]
    fn test_pending_is_empty_when_default() {
        let pending = PendingEvents::default();

        assert!(pending.is_empty());
    }

    #[rstest]
    fn test_pending_is_empty_false_with_data_evt() {
        let mut pending = PendingEvents::default();
        pending.data_evts.push(stub_data_event());

        assert!(!pending.is_empty());
    }

    #[rstest]
    fn test_pending_is_empty_false_with_data_cmd() {
        let mut pending = PendingEvents::default();
        pending.data_cmds.push(stub_data_command());

        assert!(!pending.is_empty());
    }

    #[rstest]
    fn test_pending_is_empty_false_with_exec_cmd() {
        let mut pending = PendingEvents::default();
        pending.exec_cmds.push(stub_trading_command());

        assert!(!pending.is_empty());
    }

    #[rstest]
    fn test_pending_is_empty_false_with_exec_report() {
        let mut pending = PendingEvents::default();

        if let ExecutionEvent::Report(report) = stub_exec_event() {
            pending.exec_reports.push(report);
        }

        assert!(!pending.is_empty());
    }

    #[rstest]
    fn test_pending_is_empty_false_with_order_evt() {
        let mut pending = PendingEvents::default();

        if let ExecutionEvent::Order(order_evt) = stub_order_event() {
            pending.order_evts.push(order_evt);
        }

        assert!(!pending.is_empty());
    }

    // -- Batch event expansion ---------------------------------------------

    // Batch of two OrderSubmitted events (O-001, O-002).
    fn stub_submitted_batch_event() -> ExecutionEvent {
        use nautilus_core::{UUID4, UnixNanos};
        use nautilus_model::{
            events::{OrderSubmitted, OrderSubmittedBatch},
            identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId},
        };

        let events = vec![
            OrderSubmitted::new(
                TraderId::from("TESTER-001"),
                StrategyId::from("S-001"),
                InstrumentId::from("TEST.VENUE"),
                ClientOrderId::from("O-001"),
                AccountId::from("TEST-001"),
                UUID4::new(),
                UnixNanos::default(),
                UnixNanos::default(),
            ),
            OrderSubmitted::new(
                TraderId::from("TESTER-001"),
                StrategyId::from("S-001"),
                InstrumentId::from("TEST.VENUE"),
                ClientOrderId::from("O-002"),
                AccountId::from("TEST-001"),
                UUID4::new(),
                UnixNanos::default(),
                UnixNanos::default(),
            ),
        ];

        ExecutionEvent::OrderSubmittedBatch(OrderSubmittedBatch::new(events))
    }

    // Batch of two OrderCanceled events (O-001, O-002).
    fn stub_canceled_batch_event() -> ExecutionEvent {
        use nautilus_core::{UUID4, UnixNanos};
        use nautilus_model::{
            events::{OrderCanceled, OrderCanceledBatch},
            identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId},
        };

        let events = vec![
            OrderCanceled::new(
                TraderId::from("TESTER-001"),
                StrategyId::from("S-001"),
                InstrumentId::from("TEST.VENUE"),
                ClientOrderId::from("O-001"),
                UUID4::new(),
                UnixNanos::default(),
                UnixNanos::default(),
                false,
                None,
                Some(AccountId::from("TEST-001")),
            ),
            OrderCanceled::new(
                TraderId::from("TESTER-001"),
                StrategyId::from("S-001"),
                InstrumentId::from("TEST.VENUE"),
                ClientOrderId::from("O-002"),
                UUID4::new(),
                UnixNanos::default(),
                UnixNanos::default(),
                false,
                None,
                Some(AccountId::from("TEST-001")),
            ),
        ];

        ExecutionEvent::OrderCanceledBatch(OrderCanceledBatch::new(events))
    }

    #[rstest]
    fn test_flush_all_pending_buffers_submitted_batch_as_individual_events() {
        let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
        let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (exec_evt_tx, mut exec_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, mut exec_cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<TradingCommand>();

        let mut pending = PendingEvents::default();

        exec_evt_tx.send(stub_submitted_batch_event()).unwrap();

        flush_all_pending(
            &mut pending,
            &mut time_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        // Batch should be unpacked into individual Submitted events then drained
        assert!(pending.order_evts.is_empty());
        assert!(exec_evt_rx.try_recv().is_err());
    }

    #[rstest]
    fn test_flush_all_pending_buffers_canceled_batch_as_individual_events() {
        let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
        let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
        let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
        let (exec_evt_tx, mut exec_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
        let (_exec_cmd_tx, mut exec_cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<TradingCommand>();

        let mut pending = PendingEvents::default();

        exec_evt_tx.send(stub_canceled_batch_event()).unwrap();

        flush_all_pending(
            &mut pending,
            &mut time_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        // Batch should be unpacked into individual Canceled events then drained
        assert!(pending.order_evts.is_empty());
        assert!(exec_evt_rx.try_recv().is_err());
    }

    #[rstest]
    fn test_flush_all_pending_expands_batch_into_order_evts_before_drain() {
        use nautilus_model::identifiers::ClientOrderId;

        let (exec_evt_tx, mut exec_evt_rx) =
            tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();

        exec_evt_tx.send(stub_canceled_batch_event()).unwrap();

        let mut pending = PendingEvents::default();

        // Manually replicate what flush_all_pending does before drain
        while let Ok(evt) = exec_evt_rx.try_recv() {
            match evt {
                ExecutionEvent::Account(_) => {
                    AsyncRunner::handle_exec_event(evt);
                }
                ExecutionEvent::Report(report) => {
                    pending.exec_reports.push(report);
                }
                ExecutionEvent::Order(order_evt) => {
                    pending.order_evts.push(order_evt);
                }
                ExecutionEvent::OrderSubmittedBatch(batch) => {
                    for submitted in batch {
                        pending.order_evts.push(OrderEventAny::Submitted(submitted));
                    }
                }
                ExecutionEvent::OrderAcceptedBatch(batch) => {
                    for accepted in batch {
                        pending.order_evts.push(OrderEventAny::Accepted(accepted));
                    }
                }
                ExecutionEvent::OrderCanceledBatch(batch) => {
                    for canceled in batch {
                        pending.order_evts.push(OrderEventAny::Canceled(canceled));
                    }
                }
            }
        }

        // Batch order must be preserved: O-001 first, then O-002.
        assert_eq!(pending.order_evts.len(), 2);
        assert!(
            matches!(&pending.order_evts[0], OrderEventAny::Canceled(c) if c.client_order_id == ClientOrderId::from("O-001"))
        );
        assert!(
            matches!(&pending.order_evts[1], OrderEventAny::Canceled(c) if c.client_order_id == ClientOrderId::from("O-002"))
        );
    }
}