1use std::{
56 fmt::Debug,
57 sync::{
58 Arc,
59 atomic::{AtomicBool, AtomicU8, Ordering},
60 },
61 time::Duration,
62};
63
64use nautilus_common::{
65 actor::{Actor, DataActor},
66 cache::database::CacheDatabaseAdapter,
67 component::Component,
68 enums::{Environment, LogColor},
69 live::dst,
70 log_info,
71 messages::{
72 DataEvent, ExecutionEvent, ExecutionReport, data::DataCommand, execution::TradingCommand,
73 },
74 timer::TimeEventHandler,
75};
76use nautilus_core::{
77 UUID4, UnixNanos,
78 datetime::{NANOSECONDS_IN_MILLISECOND, mins_to_secs, secs_to_nanos_unchecked},
79};
80use nautilus_model::{
81 events::OrderEventAny,
82 identifiers::{ClientOrderId, StrategyId, TraderId},
83 orders::Order,
84};
85use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
86use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
87use tabled::{Table, Tabled, settings::Style};
88
89use crate::{
90 builder::LiveNodeBuilder,
91 config::LiveNodeConfig,
92 manager::{ExecutionManager, ExecutionManagerConfig},
93 runner::{AsyncRunner, AsyncRunnerChannels},
94};
95
/// Lifecycle state of a live node.
///
/// The enum is `#[repr(u8)]` so that a state can be round-tripped through a
/// single byte (see [`NodeState::as_u8`] and [`NodeState::from_u8`]).
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    /// Initial state; also the `Default`.
    #[default]
    Idle = 0,
    /// Startup is in progress.
    Starting = 1,
    /// The node is running.
    Running = 2,
    /// Shutdown has been initiated but not yet completed.
    ShuttingDown = 3,
    /// The node has stopped.
    Stopped = 4,
}

impl NodeState {
    /// Rebuilds a [`NodeState`] from its raw `u8` discriminant.
    ///
    /// # Panics
    ///
    /// Panics with `"Invalid NodeState value"` if `value` is not the
    /// discriminant of any variant.
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        // Named discriminants keep this table in lock-step with the enum definition.
        const IDLE: u8 = NodeState::Idle as u8;
        const STARTING: u8 = NodeState::Starting as u8;
        const RUNNING: u8 = NodeState::Running as u8;
        const SHUTTING_DOWN: u8 = NodeState::ShuttingDown as u8;
        const STOPPED: u8 = NodeState::Stopped as u8;

        match value {
            IDLE => Self::Idle,
            STARTING => Self::Starting,
            RUNNING => Self::Running,
            SHUTTING_DOWN => Self::ShuttingDown,
            STOPPED => Self::Stopped,
            _ => panic!("Invalid NodeState value"),
        }
    }

    /// Returns the raw `u8` discriminant of this state.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` only for [`NodeState::Running`].
    #[must_use]
    pub const fn is_running(&self) -> bool {
        (*self as u8) == (Self::Running as u8)
    }
}
138
/// Cloneable handle for observing a live node's state and requesting a stop.
///
/// Both fields are `Arc`-wrapped atomics, so every clone of a handle shares
/// the same underlying stop flag and state byte.
#[derive(Clone, Debug)]
pub struct LiveNodeHandle {
    /// Stop request flag; set to `true` by [`LiveNodeHandle::stop`].
    pub(crate) stop_flag: Arc<AtomicBool>,
    /// Current [`NodeState`], stored as its `u8` discriminant.
    pub(crate) state: Arc<AtomicU8>,
}
150
impl Default for LiveNodeHandle {
    /// Equivalent to [`LiveNodeHandle::new`].
    fn default() -> Self {
        Self::new()
    }
}
156
157impl LiveNodeHandle {
158 #[must_use]
160 pub fn new() -> Self {
161 Self {
162 stop_flag: Arc::new(AtomicBool::new(false)),
163 state: Arc::new(AtomicU8::new(NodeState::Idle.as_u8())),
164 }
165 }
166
167 pub(crate) fn set_state(&self, state: NodeState) {
169 self.state.store(state.as_u8(), Ordering::Relaxed);
170 if state == NodeState::Running {
171 self.stop_flag.store(false, Ordering::Relaxed);
173 }
174 }
175
176 #[must_use]
178 pub fn state(&self) -> NodeState {
179 NodeState::from_u8(self.state.load(Ordering::Relaxed))
180 }
181
182 #[must_use]
184 pub fn should_stop(&self) -> bool {
185 self.stop_flag.load(Ordering::Relaxed)
186 }
187
188 #[must_use]
190 pub fn is_running(&self) -> bool {
191 self.state().is_running()
192 }
193
194 pub fn stop(&self) {
196 self.stop_flag.store(true, Ordering::Relaxed);
197 }
198}
199
/// Live node combining a [`NautilusKernel`] with an async runner, an
/// execution manager, and a shared [`LiveNodeHandle`].
///
/// With the `python` feature enabled the type is also exposed as an
/// `unsendable` pyo3 class.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
pub struct LiveNode {
    /// The kernel this node drives.
    kernel: NautilusKernel,
    /// Runner owning the event channels; `run()` takes it, leaving `None`.
    runner: Option<AsyncRunner>,
    /// Node configuration.
    config: LiveNodeConfig,
    /// Shared state / stop handle; clones are handed out by `handle()`.
    handle: LiveNodeHandle,
    /// Execution manager used for reconciliation, purging and tracking.
    exec_manager: ExecutionManager,
    /// Instant at which the `run()` loop exits after a shutdown was initiated;
    /// set by `initiate_shutdown()`.
    shutdown_deadline: Option<dst::time::Instant>,
    // NOTE(review): only ever initialized empty in this part of the file —
    // presumably retains Python actor objects registered elsewhere; confirm.
    #[cfg(feature = "python")]
    #[allow(dead_code)]
    python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
224
225impl LiveNode {
226 #[must_use]
230 pub(crate) fn new_from_builder(
231 kernel: NautilusKernel,
232 runner: AsyncRunner,
233 config: LiveNodeConfig,
234 exec_manager: ExecutionManager,
235 ) -> Self {
236 Self {
237 kernel,
238 runner: Some(runner),
239 config,
240 handle: LiveNodeHandle::new(),
241 exec_manager,
242 shutdown_deadline: None,
243 #[cfg(feature = "python")]
244 python_actors: Vec::new(),
245 }
246 }
247
    /// Returns a [`LiveNodeBuilder`] for the given trader ID and environment.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`LiveNodeBuilder::new`].
    pub fn builder(
        trader_id: TraderId,
        environment: Environment,
    ) -> anyhow::Result<LiveNodeBuilder> {
        LiveNodeBuilder::new(trader_id, environment)
    }
259
260 pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
270 let mut config = config.unwrap_or_default();
271 config.environment = Environment::Live;
272
273 match config.environment() {
274 Environment::Sandbox | Environment::Live => {}
275 Environment::Backtest => {
276 anyhow::bail!("LiveNode cannot be used with Backtest environment");
277 }
278 }
279
280 config.validate_runtime_support()?;
281
282 let runner = AsyncRunner::new();
283 runner.bind_senders();
284
285 let kernel = NautilusKernel::new(name, config.clone())?;
286
287 let exec_manager_config =
288 ExecutionManagerConfig::from(&config.exec_engine).with_trader_id(config.trader_id);
289 let exec_manager = ExecutionManager::new(
290 kernel.clock.clone(),
291 kernel.cache.clone(),
292 exec_manager_config,
293 );
294
295 log::info!("LiveNode built successfully with kernel config");
296
297 Ok(Self {
298 kernel,
299 runner: Some(runner),
300 config,
301 handle: LiveNodeHandle::new(),
302 exec_manager,
303 shutdown_deadline: None,
304 #[cfg(feature = "python")]
305 python_actors: Vec::new(),
306 })
307 }
308
    /// Returns a clone of this node's [`LiveNodeHandle`].
    ///
    /// The clone shares the node's stop flag and state, so it can be used to
    /// request a stop or observe the state from elsewhere.
    #[must_use]
    pub fn handle(&self) -> LiveNodeHandle {
        self.handle.clone()
    }
314
315 pub async fn start(&mut self) -> anyhow::Result<()> {
327 if self.state().is_running() {
328 anyhow::bail!("Already running");
329 }
330
331 if let Some(runner) = self.runner.as_ref() {
332 runner.bind_senders();
333 }
334
335 self.handle.set_state(NodeState::Starting);
336
337 self.kernel.start_async().await;
338
339 self.kernel.connect_data_clients().await;
341
342 if let Some(runner) = self.runner.as_mut() {
343 runner.flush_pending_data();
344 }
345
346 self.kernel.connect_exec_clients().await;
347
348 if !self.await_engines_connected().await {
349 log::error!("Cannot start trader: engine client(s) not connected");
350 self.handle.set_state(NodeState::Running);
351 return Ok(());
352 }
353
354 self.perform_startup_reconciliation().await?;
355
356 self.kernel.start_trader();
357
358 self.handle.set_state(NodeState::Running);
359
360 Ok(())
361 }
362
363 pub async fn stop(&mut self) -> anyhow::Result<()> {
372 if !self.state().is_running() {
373 anyhow::bail!("Not running");
374 }
375
376 self.handle.set_state(NodeState::ShuttingDown);
377
378 self.kernel.stop_trader();
379 let delay = self.kernel.delay_post_stop();
380 log::info!("Awaiting residual events ({delay:?})...");
381
382 dst::time::sleep(delay).await;
383 self.finalize_stop().await
384 }
385
386 async fn await_engines_connected(&self) -> bool {
390 log::info!(
391 "Awaiting engine connections ({:?} timeout)...",
392 self.config.timeout_connection
393 );
394
395 let start = dst::time::Instant::now();
396 let timeout = self.config.timeout_connection;
397 let interval = Duration::from_millis(100);
398
399 while start.elapsed() < timeout {
400 if self.kernel.check_engines_connected() {
401 log::info!("All engine clients connected");
402 return true;
403 }
404 dst::time::sleep(interval).await;
405 }
406
407 self.log_connection_status();
408 false
409 }
410
411 async fn await_engines_disconnected(&self) {
415 log::info!(
416 "Awaiting engine disconnections ({:?} timeout)...",
417 self.config.timeout_disconnection
418 );
419
420 let start = dst::time::Instant::now();
421 let timeout = self.config.timeout_disconnection;
422 let interval = Duration::from_millis(100);
423
424 while start.elapsed() < timeout {
425 if self.kernel.check_engines_disconnected() {
426 log::info!("All engine clients disconnected");
427 return;
428 }
429 dst::time::sleep(interval).await;
430 }
431
432 log::error!(
433 "Timed out ({:?}) waiting for engines to disconnect\n\
434 DataEngine.check_disconnected() == {}\n\
435 ExecEngine.check_disconnected() == {}",
436 timeout,
437 self.kernel.data_engine().check_disconnected(),
438 self.kernel.exec_engine().borrow().check_disconnected(),
439 );
440 }
441
442 fn log_connection_status(&self) {
443 #[derive(Tabled)]
444 struct ClientStatus {
445 #[tabled(rename = "Client")]
446 client: String,
447 #[tabled(rename = "Type")]
448 client_type: &'static str,
449 #[tabled(rename = "Connected")]
450 connected: bool,
451 }
452
453 let data_status = self.kernel.data_client_connection_status();
454 let exec_status = self.kernel.exec_client_connection_status();
455
456 let mut rows: Vec<ClientStatus> = Vec::new();
457
458 for (client_id, connected) in data_status {
459 rows.push(ClientStatus {
460 client: client_id.to_string(),
461 client_type: "Data",
462 connected,
463 });
464 }
465
466 for (client_id, connected) in exec_status {
467 rows.push(ClientStatus {
468 client: client_id.to_string(),
469 client_type: "Execution",
470 connected,
471 });
472 }
473
474 let table = Table::new(&rows).with(Style::rounded()).to_string();
475
476 log::warn!(
477 "Timed out ({:?}) waiting for engines to connect\n\n{table}\n\n\
478 DataEngine.check_connected() == {}\n\
479 ExecEngine.check_connected() == {}",
480 self.config.timeout_connection,
481 self.kernel.data_engine().check_connected(),
482 self.kernel.exec_engine().borrow().check_connected(),
483 );
484 }
485
    /// Reconciles execution state with each execution client at startup.
    ///
    /// Does nothing (beyond logging) when `config.exec_engine.reconciliation`
    /// is disabled. Otherwise, for every execution client ID it requests a
    /// mass status, hands it to the execution manager for reconciliation and
    /// registers any external orders the manager reports. Afterwards the
    /// portfolio's orders and positions are initialized.
    ///
    /// A client that yields no mass status or an error is logged as a warning
    /// and skipped; it does not fail the whole reconciliation.
    ///
    /// # Errors
    ///
    /// No path in this body returns `Err`; the `Result` return type is kept
    /// for callers that use `?`.
    // The exec engine `RefCell` borrow is held across the `.await` of
    // `generate_mass_status`, hence the lint expectation.
    #[expect(clippy::await_holding_refcell_ref)]
    async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
        if !self.config.exec_engine.reconciliation {
            log::info!("Startup reconciliation disabled");
            return Ok(());
        }

        log_info!(
            "Starting execution state reconciliation...",
            color = LogColor::Blue
        );

        // Optional lookback window, widened to u64 for `generate_mass_status`.
        let lookback_mins = self
            .config
            .exec_engine
            .reconciliation_lookback_mins
            .map(|m| m as u64);

        let timeout = self.config.timeout_reconciliation;
        let start = dst::time::Instant::now();
        let client_ids = self.kernel.exec_engine.borrow().client_ids();

        for client_id in client_ids {
            // The timeout is only checked between clients; a request already
            // in flight is not interrupted.
            if start.elapsed() > timeout {
                log::warn!("Reconciliation timeout reached, stopping early");
                break;
            }

            log_info!(
                "Requesting mass status from {}...",
                client_id,
                color = LogColor::Blue
            );

            let mass_status_result = self
                .kernel
                .exec_engine
                .borrow_mut()
                .generate_mass_status(&client_id, lookback_mins)
                .await;

            match mass_status_result {
                Ok(Some(mass_status)) => {
                    log_info!(
                        "Reconciling ExecutionMassStatus for {}",
                        client_id,
                        color = LogColor::Blue
                    );

                    // The manager receives its own handle to the exec engine.
                    let exec_engine_rc = self.kernel.exec_engine.clone();

                    let result = self
                        .exec_manager
                        .reconcile_execution_mass_status(mass_status, exec_engine_rc)
                        .await;

                    if result.events.is_empty() {
                        log_info!(
                            "Reconciliation for {} succeeded",
                            client_id,
                            color = LogColor::Blue
                        );
                    } else {
                        log::info!(
                            color = LogColor::Blue as u8;
                            "Reconciliation for {} processed {} events",
                            client_id,
                            result.events.len()
                        );
                    }

                    // Register every external order reported by the manager
                    // with the exec engine.
                    if !result.external_orders.is_empty() {
                        let exec_engine = self.kernel.exec_engine.borrow();
                        for external in result.external_orders {
                            exec_engine.register_external_order(
                                external.client_order_id,
                                external.venue_order_id,
                                external.instrument_id,
                                external.strategy_id,
                                external.ts_init,
                            );
                        }
                    }
                }
                Ok(None) => {
                    log::warn!(
                        "No mass status available from {client_id} \
                        (likely adapter error when generating reports)"
                    );
                }
                Err(e) => {
                    log::warn!("Failed to get mass status from {client_id}: {e}");
                }
            }
        }

        // Runs even if the loop stopped early on timeout.
        self.kernel.portfolio.borrow_mut().initialize_orders();
        self.kernel.portfolio.borrow_mut().initialize_positions();

        let elapsed_secs = start.elapsed().as_secs_f64();
        log_info!(
            "Startup reconciliation completed in {:.2}s",
            elapsed_secs,
            color = LogColor::Blue
        );

        Ok(())
    }
603
    /// Runs the node until shutdown completes.
    ///
    /// Takes ownership of the runner's channels, starts the kernel, connects
    /// data and execution clients, optionally reconciles and starts the
    /// trader, then services the event loop. The loop exits once a shutdown
    /// has been initiated (Ctrl-C, the handle's stop flag, or the kernel's
    /// shutdown flag) and the shutdown deadline has passed. Finally the stop
    /// is finalized and any remaining channel items are drained.
    ///
    /// # Errors
    ///
    /// Returns an error if the node is already running, if the runner was
    /// already consumed by a previous `run()`, if the execution connect phase
    /// or startup reconciliation fails, or if finalizing the stop fails.
    pub async fn run(&mut self) -> anyhow::Result<()> {
        if self.state().is_running() {
            anyhow::bail!("Already running");
        }

        // The runner can only be taken once; a second `run()` fails here.
        let Some(runner) = self.runner.take() else {
            anyhow::bail!("Runner already consumed - run() called twice");
        };
        runner.bind_senders();

        let AsyncRunnerChannels {
            mut time_evt_rx,
            mut data_evt_rx,
            mut data_cmd_rx,
            mut exec_evt_rx,
            mut exec_cmd_rx,
        } = runner.take_channels();

        log::info!("Event loop starting");

        self.handle.set_state(NodeState::Starting);
        self.kernel.start_async().await;
        self.kernel.reset_shutdown_flag();

        let stop_handle = self.handle.clone();
        let shutdown_flag = self.kernel.shutdown_flag();
        let mut pending = PendingEvents::default();

        // Phase 1: connect data clients.
        // NOTE(review): `drive_with_event_buffering` is defined outside this
        // view — presumably it polls the future while buffering channel
        // items into `pending`; confirm.
        drive_with_event_buffering(
            self.kernel.connect_data_clients(),
            &mut pending,
            &mut time_evt_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        )
        .await;

        // Process buffered and queued data items before exec clients connect.
        flush_pending_data(&mut pending, &mut data_evt_rx, &mut data_cmd_rx);
        debug_assert!(
            pending.data_evts.is_empty() && pending.data_cmds.is_empty(),
            "data must be drained into cache before exec clients connect",
        );

        // Phase 2: connect execution clients and wait for engine connections.
        let engines_connected = drive_with_event_buffering(
            self.connect_exec_phase(),
            &mut pending,
            &mut time_evt_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        )
        .await?;

        // Process everything that accumulated during startup.
        flush_all_pending(
            &mut pending,
            &mut time_evt_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );
        debug_assert!(
            pending.is_empty(),
            "all startup events must be processed before reconciliation",
        );

        if engines_connected {
            self.perform_startup_reconciliation().await?;
            self.kernel.start_trader();
        } else {
            log::error!("Not starting trader: engine client(s) not connected");
        }

        // The node is marked running even when the trader was not started.
        self.handle.set_state(NodeState::Running);

        // Periodic check intervals in nanoseconds; 0 means "disabled".
        let exec_config = &self.config.exec_engine;
        let inflight_interval_ns =
            (exec_config.inflight_check_interval_ms as u64) * NANOSECONDS_IN_MILLISECOND;
        let open_interval_ns = exec_config
            .open_check_interval_secs
            .filter(|&s| s > 0.0)
            .map_or(0, secs_to_nanos_unchecked);
        let position_interval_ns = exec_config
            .position_check_interval_secs
            .filter(|&s| s > 0.0)
            .map_or(0, secs_to_nanos_unchecked);
        let has_clients = !self
            .kernel
            .exec_engine
            .borrow()
            .get_all_clients()
            .is_empty();
        // Periodic reconciliation needs at least one exec client and at least
        // one enabled check.
        let recon_enabled = has_clients
            && (inflight_interval_ns > 0 || open_interval_ns > 0 || position_interval_ns > 0);

        // The recon timer ticks at the smallest enabled interval; each check
        // decides for itself whether it is due (see `run_reconciliation_checks`).
        let recon_min_interval = if recon_enabled {
            let mut intervals = Vec::new();

            if exec_config.inflight_check_interval_ms > 0 {
                intervals.push(Duration::from_millis(
                    exec_config.inflight_check_interval_ms as u64,
                ));
            }

            if let Some(s) = exec_config.open_check_interval_secs.filter(|&s| s > 0.0) {
                intervals.push(Duration::from_secs_f64(s));
            }

            if let Some(s) = exec_config
                .position_check_interval_secs
                .filter(|&s| s > 0.0)
            {
                intervals.push(Duration::from_secs_f64(s));
            }
            intervals
                .into_iter()
                .min()
                .unwrap_or(Duration::from_secs(1))
        } else {
            Duration::from_secs(1)
        };

        // Periodic timers are offset by the configured startup delay, but
        // only when startup reconciliation is enabled.
        let startup_delay = if self.config.exec_engine.reconciliation {
            Duration::from_secs_f64(exec_config.reconciliation_startup_delay_secs)
        } else {
            Duration::ZERO
        };

        let recon_start = dst::time::Instant::now() + startup_delay;

        let mut ts_last_inflight = self.exec_manager.generate_timestamp_ns();
        let mut ts_last_open = ts_last_inflight;
        let mut ts_last_position = ts_last_inflight;

        // Roughly 100 years: period used for timers that are disabled, so
        // they effectively never fire.
        let far_future = Duration::from_secs(86400 * 365 * 100);

        // Builds an interval timer whose first tick is one period after
        // `recon_start`; `None` yields the never-firing period above.
        let make_timer = |opt_dur: Option<Duration>| {
            let dur = opt_dur.unwrap_or(far_future);
            let mut timer = dst::time::interval_at(recon_start + dur, dur);
            timer.set_missed_tick_behavior(dst::time::MissedTickBehavior::Delay);
            timer
        };

        let mut recon_timer = make_timer(if recon_enabled {
            Some(recon_min_interval)
        } else {
            None
        });

        let mut purge_orders_timer = make_timer(
            exec_config
                .purge_closed_orders_interval_mins
                .filter(|&m| m > 0)
                .map(|m| Duration::from_secs(mins_to_secs(m as u64))),
        );

        let mut purge_positions_timer = make_timer(
            exec_config
                .purge_closed_positions_interval_mins
                .filter(|&m| m > 0)
                .map(|m| Duration::from_secs(mins_to_secs(m as u64))),
        );

        let mut purge_account_timer = make_timer(
            exec_config
                .purge_account_events_interval_mins
                .filter(|&m| m > 0)
                .map(|m| Duration::from_secs(mins_to_secs(m as u64))),
        );

        let mut own_books_timer = make_timer(
            exec_config
                .own_books_audit_interval_secs
                .filter(|&s| s > 0.0)
                .map(Duration::from_secs_f64),
        );

        // The recent-fills cache is pruned every 60 seconds.
        let mut prune_fills_timer = make_timer(Some(Duration::from_secs(60)));

        // Polls the handle's stop flag and the kernel shutdown flag every 100 ms.
        let mut stop_check_timer = dst::time::interval(Duration::from_millis(100));
        stop_check_timer.set_missed_tick_behavior(dst::time::MissedTickBehavior::Skip);

        // Counts items handled after shutdown was initiated.
        let mut residual_events = 0usize;
        let ctrl_c = dst::signal::ctrl_c();
        tokio::pin!(ctrl_c);

        loop {
            // Snapshot state once per iteration for the branch preconditions.
            let shutdown_deadline = self.shutdown_deadline;
            let is_shutting_down = self.state() == NodeState::ShuttingDown;
            let is_running = self.state() == NodeState::Running;

            tokio::select! {
                // Branches are polled in declaration order: shutdown triggers
                // first, then periodic timers, then the channels.
                biased;

                result = &mut ctrl_c, if is_running => {
                    match result {
                        Ok(()) => log::info!("Received SIGINT, shutting down"),
                        Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
                    }
                    self.initiate_shutdown();
                }
                _ = stop_check_timer.tick(), if is_running => {
                    if stop_handle.should_stop() {
                        log::info!("Received stop signal from handle");
                        self.initiate_shutdown();
                    } else if shutdown_flag.get() {
                        log::info!("Received ShutdownSystem command, shutting down");
                        self.initiate_shutdown();
                    }
                }
                // Once shutting down, leave the loop when the deadline set by
                // `initiate_shutdown` is reached.
                () = async {
                    match shutdown_deadline {
                        Some(deadline) => dst::time::sleep_until(deadline).await,
                        None => std::future::pending::<()>().await,
                    }
                }, if self.state() == NodeState::ShuttingDown => {
                    break;
                }

                _ = recon_timer.tick(), if is_running && recon_enabled => {
                    // A failed check is logged and does not stop the loop.
                    if let Err(e) = self.run_reconciliation_checks(
                        inflight_interval_ns,
                        open_interval_ns,
                        position_interval_ns,
                        &mut ts_last_inflight,
                        &mut ts_last_open,
                        &mut ts_last_position,
                    ).await {
                        log::error!("Reconciliation check error: {e}");
                    }
                }
                _ = purge_orders_timer.tick(), if is_running => {
                    self.exec_manager.purge_closed_orders();
                }
                _ = purge_positions_timer.tick(), if is_running => {
                    self.exec_manager.purge_closed_positions();
                }
                _ = purge_account_timer.tick(), if is_running => {
                    self.exec_manager.purge_account_events();
                }
                _ = own_books_timer.tick(), if is_running => {
                    self.kernel.cache().borrow_mut().audit_own_order_books();
                }
                _ = prune_fills_timer.tick(), if is_running => {
                    self.exec_manager.prune_recent_fills_cache(60.0);
                }

                // Channel branches have no precondition, so they keep being
                // serviced while shutting down (counted as residual events).
                Some(handler) = time_evt_rx.recv() => {
                    AsyncRunner::handle_time_event(handler);

                    if is_shutting_down {
                        log::debug!("Residual time event");
                        residual_events += 1;
                    }
                }
                Some(evt) = data_evt_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual data event: {evt:?}");
                        residual_events += 1;
                    }
                    AsyncRunner::handle_data_event(evt);
                }
                Some(cmd) = data_cmd_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual data command: {cmd:?}");
                        residual_events += 1;
                    }
                    AsyncRunner::handle_data_command(cmd);
                }
                Some(evt) = exec_evt_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual exec event: {evt:?}");
                        residual_events += 1;
                    }

                    // Orders to re-check for closure after the event has been
                    // handled.
                    let mut close_ids: Vec<ClientOrderId> = Vec::new();

                    // Update the manager's tracking before dispatching.
                    match &evt {
                        ExecutionEvent::Order(order_evt) => {
                            self.exec_manager.record_local_activity(order_evt.client_order_id());
                            match order_evt {
                                OrderEventAny::Filled(fill) => {
                                    self.exec_manager.record_position_activity(
                                        fill.instrument_id,
                                        fill.ts_event,
                                    );
                                    self.exec_manager.mark_fill_processed(fill.trade_id);
                                }
                                OrderEventAny::Accepted(_) => {
                                    self.exec_manager.clear_recon_tracking(
                                        &order_evt.client_order_id(), true,
                                    );
                                }
                                OrderEventAny::Rejected(_)
                                | OrderEventAny::Canceled(_)
                                | OrderEventAny::Expired(_)
                                | OrderEventAny::Denied(_) => {
                                    self.exec_manager.clear_recon_tracking(
                                        &order_evt.client_order_id(), true,
                                    );
                                }
                                _ => {}
                            }
                            close_ids.push(order_evt.client_order_id());
                        }
                        ExecutionEvent::OrderSubmittedBatch(batch) => {
                            for submitted in &batch.events {
                                self.exec_manager.record_local_activity(submitted.client_order_id);
                            }
                        }
                        ExecutionEvent::OrderAcceptedBatch(batch) => {
                            for accepted in &batch.events {
                                self.exec_manager.record_local_activity(accepted.client_order_id);
                                self.exec_manager.clear_recon_tracking(
                                    &accepted.client_order_id, true,
                                );
                            }
                        }
                        ExecutionEvent::OrderCanceledBatch(batch) => {
                            for canceled in &batch.events {
                                self.exec_manager.record_local_activity(canceled.client_order_id);
                                self.exec_manager.clear_recon_tracking(
                                    &canceled.client_order_id, true,
                                );
                                close_ids.push(canceled.client_order_id);
                            }
                        }
                        ExecutionEvent::Report(report) => {
                            // A fill report for a fill that was already
                            // processed is dropped without being dispatched.
                            if let ExecutionReport::Fill(fill_report) = report
                                && self.exec_manager.is_fill_recently_processed(&fill_report.trade_id) {
                                log::debug!(
                                    "Skipping recently processed fill report: {}",
                                    fill_report.trade_id,
                                );
                                continue;
                            }
                            self.exec_manager.observe_execution_report(report);
                        }
                        ExecutionEvent::Account(_) => {}
                    }

                    AsyncRunner::handle_exec_event(evt);

                    // After dispatch, clear tracking for orders the cache now
                    // reports as closed.
                    for coid in &close_ids {
                        let is_closed = self.kernel.cache().borrow()
                            .order(coid).is_some_and(|o| o.is_closed());
                        if is_closed {
                            self.exec_manager.clear_recon_tracking(coid, true);
                        }
                    }
                }
                Some(cmd) = exec_cmd_rx.recv() => {
                    if is_shutting_down {
                        log::debug!("Residual exec command: {cmd:?}");
                        residual_events += 1;
                    }

                    // Register affected orders as in-flight before the
                    // command is dispatched.
                    match &cmd {
                        TradingCommand::SubmitOrder(submit) => {
                            self.exec_manager.register_inflight(submit.client_order_id);
                        }
                        TradingCommand::SubmitOrderList(submit) => {
                            for order_init in &submit.order_inits {
                                self.exec_manager.register_inflight(order_init.client_order_id);
                            }
                        }
                        TradingCommand::ModifyOrder(modify) => {
                            self.exec_manager.register_inflight(modify.client_order_id);
                        }
                        TradingCommand::CancelOrder(cancel) => {
                            self.exec_manager.register_inflight(cancel.client_order_id);
                        }
                        _ => {}
                    }
                    AsyncRunner::handle_exec_command(cmd);
                }
            }
        }

        if residual_events > 0 {
            log::debug!("Processed {residual_events} residual events during shutdown");
        }

        // The result of the residual check is intentionally discarded.
        let _ = self.kernel.cache().borrow().check_residuals();

        self.finalize_stop().await?;

        // Handle anything that arrived while the stop was being finalized.
        // NOTE(review): skipped when `finalize_stop` returns an error.
        self.drain_channels(
            &mut time_evt_rx,
            &mut data_evt_rx,
            &mut data_cmd_rx,
            &mut exec_evt_rx,
            &mut exec_cmd_rx,
        );

        log::info!("Event loop stopped");

        Ok(())
    }
1055
1056 fn process_reconciliation_events(&mut self, events: &[OrderEventAny]) {
1057 if events.is_empty() {
1058 return;
1059 }
1060
1061 log::info!(
1062 "Processing {} reconciliation event{}",
1063 events.len(),
1064 if events.len() == 1 { "" } else { "s" }
1065 );
1066
1067 for event in events {
1068 self.exec_manager
1069 .record_local_activity(event.client_order_id());
1070 if let OrderEventAny::Filled(fill) = event {
1071 self.exec_manager
1072 .record_position_activity(fill.instrument_id, fill.ts_event);
1073 self.exec_manager.mark_fill_processed(fill.trade_id);
1074 }
1075 self.kernel.exec_engine.borrow_mut().process(event);
1076 }
1077 }
1078
1079 async fn connect_exec_phase(&mut self) -> anyhow::Result<bool> {
1084 self.kernel.connect_exec_clients().await;
1085
1086 if !self.await_engines_connected().await {
1087 return Ok(false);
1088 }
1089
1090 Ok(true)
1091 }
1092
1093 fn initiate_shutdown(&mut self) {
1094 self.kernel.stop_trader();
1095 let delay = self.kernel.delay_post_stop();
1096 log::info!("Awaiting residual events ({delay:?})...");
1097
1098 self.shutdown_deadline = Some(dst::time::Instant::now() + delay);
1099 self.handle.set_state(NodeState::ShuttingDown);
1100 }
1101
1102 async fn finalize_stop(&mut self) -> anyhow::Result<()> {
1103 let disconnect_result = self.kernel.disconnect_clients().await;
1104 if let Err(ref e) = disconnect_result {
1105 log::error!("Error disconnecting clients: {e}");
1106 }
1107
1108 self.await_engines_disconnected().await;
1109 self.kernel.finalize_stop().await;
1110
1111 self.handle.set_state(NodeState::Stopped);
1112
1113 disconnect_result
1114 }
1115
1116 fn drain_channels(
1117 &self,
1118 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
1119 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1120 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1121 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1122 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
1123 ) {
1124 let mut drained = 0;
1125
1126 while let Ok(handler) = time_evt_rx.try_recv() {
1127 AsyncRunner::handle_time_event(handler);
1128 drained += 1;
1129 }
1130
1131 while let Ok(cmd) = data_cmd_rx.try_recv() {
1132 AsyncRunner::handle_data_command(cmd);
1133 drained += 1;
1134 }
1135
1136 while let Ok(evt) = data_evt_rx.try_recv() {
1137 AsyncRunner::handle_data_event(evt);
1138 drained += 1;
1139 }
1140
1141 while let Ok(cmd) = exec_cmd_rx.try_recv() {
1142 AsyncRunner::handle_exec_command(cmd);
1143 drained += 1;
1144 }
1145
1146 while let Ok(evt) = exec_evt_rx.try_recv() {
1147 AsyncRunner::handle_exec_event(evt);
1148 drained += 1;
1149 }
1150
1151 if drained > 0 {
1152 log::info!("Drained {drained} remaining events during shutdown");
1153 }
1154 }
1155
    /// Returns the environment reported by the kernel.
    #[must_use]
    pub fn environment(&self) -> Environment {
        self.kernel.environment()
    }
1161
    /// Returns a shared reference to the underlying kernel.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
1167
    /// Returns a mutable reference to the underlying kernel.
    #[must_use]
    pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
1173
    /// Returns the trader ID reported by the kernel.
    #[must_use]
    pub fn trader_id(&self) -> TraderId {
        self.kernel.trader_id()
    }
1179
    /// Returns the instance ID reported by the kernel.
    #[must_use]
    pub const fn instance_id(&self) -> UUID4 {
        self.kernel.instance_id()
    }
1185
    /// Returns the node's current [`NodeState`], as read from its handle.
    #[must_use]
    pub fn state(&self) -> NodeState {
        self.handle.state()
    }
1191
    /// Returns `true` if the node's current state is [`NodeState::Running`].
    #[must_use]
    pub fn is_running(&self) -> bool {
        self.state().is_running()
    }
1197
1198 pub fn set_cache_database(
1208 &mut self,
1209 database: Box<dyn CacheDatabaseAdapter>,
1210 ) -> anyhow::Result<()> {
1211 if self.state() != NodeState::Idle {
1212 anyhow::bail!(
1213 "Cannot set cache database while node is running, set it before calling start()"
1214 );
1215 }
1216
1217 self.kernel.cache().borrow_mut().set_database(database);
1218 Ok(())
1219 }
1220
    /// Returns a shared reference to the node's execution manager.
    #[must_use]
    pub fn exec_manager(&self) -> &ExecutionManager {
        &self.exec_manager
    }
1226
    /// Returns a mutable reference to the node's execution manager.
    #[must_use]
    pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
        &mut self.exec_manager
    }
1232
1233 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
1246 where
1247 T: DataActor + Component + Actor + 'static,
1248 {
1249 if self.state() != NodeState::Idle {
1250 anyhow::bail!(
1251 "Cannot add actor while node is running, add actors before calling start()"
1252 );
1253 }
1254
1255 self.kernel.trader.borrow_mut().add_actor(actor)
1256 }
1257
1258 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
1270 where
1271 F: FnOnce() -> anyhow::Result<T>,
1272 T: DataActor + Component + Actor + 'static,
1273 {
1274 if self.state() != NodeState::Idle {
1275 anyhow::bail!(
1276 "Cannot add actor while node is running, add actors before calling start()"
1277 );
1278 }
1279
1280 self.kernel
1281 .trader
1282 .borrow_mut()
1283 .add_actor_from_factory(factory)
1284 }
1285
1286 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
1297 where
1298 T: Strategy + Component + Debug + 'static,
1299 {
1300 if self.state() != NodeState::Idle {
1301 anyhow::bail!(
1302 "Cannot add strategy while node is running, add strategies before calling start()"
1303 );
1304 }
1305
1306 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
1308 if let Some(claims) = strategy.external_order_claims() {
1309 for instrument_id in claims {
1310 self.exec_manager
1311 .claim_external_orders(instrument_id, strategy_id);
1312 }
1313 log_info!(
1314 "Registered external order claims for {}: {:?}",
1315 strategy_id,
1316 strategy.external_order_claims(),
1317 color = LogColor::Blue
1318 );
1319 }
1320
1321 self.kernel.trader.borrow_mut().add_strategy(strategy)
1322 }
1323
1324 pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
1335 where
1336 T: ExecutionAlgorithm + Component + Debug + 'static,
1337 {
1338 if self.state() != NodeState::Idle {
1339 anyhow::bail!(
1340 "Cannot add exec algorithm while node is running, add exec algorithms before calling start()"
1341 );
1342 }
1343
1344 self.kernel
1345 .trader
1346 .borrow_mut()
1347 .add_exec_algorithm(exec_algorithm)
1348 }
1349
    /// Runs whichever periodic reconciliation checks are due.
    ///
    /// There are three independent checks: in-flight orders, open orders and
    /// position consistency. A check runs when its interval is non-zero and
    /// at least that many nanoseconds have passed since its `ts_last_*`
    /// timestamp; the timestamp is then advanced to the time sampled at the
    /// top of this call. Events produced by a check are fed through
    /// `process_reconciliation_events`.
    ///
    /// If the node is found to be `ShuttingDown` when a due check is about to
    /// run, the function returns immediately and the remaining checks are
    /// skipped.
    ///
    /// # Errors
    ///
    /// No path in this body returns `Err`; the `Result` return type is kept
    /// for the caller.
    // The exec engine `RefCell` borrow (`eng_ref`) is held across the
    // `.await` of the open-order and position checks, hence the expectation.
    #[expect(clippy::await_holding_refcell_ref)]
    async fn run_reconciliation_checks(
        &mut self,
        inflight_interval_ns: u64,
        open_interval_ns: u64,
        position_interval_ns: u64,
        ts_last_inflight: &mut UnixNanos,
        ts_last_open: &mut UnixNanos,
        ts_last_position: &mut UnixNanos,
    ) -> anyhow::Result<()> {
        // One timestamp is shared by all three checks in this call.
        let ts_now = self.exec_manager.generate_timestamp_ns();

        if inflight_interval_ns > 0 && (ts_now - *ts_last_inflight).as_u64() >= inflight_interval_ns
        {
            if self.state() == NodeState::ShuttingDown {
                return Ok(());
            }
            let result = self.exec_manager.check_inflight_orders();
            self.process_reconciliation_events(&result.events);
            // Queries produced by the check are dispatched as exec commands.
            for cmd in result.queries {
                AsyncRunner::handle_exec_command(cmd);
            }
            *ts_last_inflight = ts_now;
        }

        if open_interval_ns > 0 && (ts_now - *ts_last_open).as_u64() >= open_interval_ns {
            if self.state() == NodeState::ShuttingDown {
                return Ok(());
            }
            let eng_ref = self.kernel.exec_engine.borrow();
            let clients = eng_ref.get_all_clients();
            let events = self.exec_manager.check_open_orders(&clients).await;
            // Release the engine borrow before events are processed, which
            // needs `borrow_mut` on the same engine.
            drop(clients);
            drop(eng_ref);
            self.process_reconciliation_events(&events);
            *ts_last_open = ts_now;
        }

        if position_interval_ns > 0 && (ts_now - *ts_last_position).as_u64() >= position_interval_ns
        {
            if self.state() == NodeState::ShuttingDown {
                return Ok(());
            }
            let eng_ref = self.kernel.exec_engine.borrow();
            let clients = eng_ref.get_all_clients();
            let events = self
                .exec_manager
                .check_positions_consistency(&clients)
                .await;
            // Same as above: drop the borrow before processing events.
            drop(clients);
            drop(eng_ref);
            self.process_reconciliation_events(&events);
            *ts_last_position = ts_now;
        }

        Ok(())
    }
1416}
1417
1418fn flush_pending_data(
1425 pending: &mut PendingEvents,
1426 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1427 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1428) {
1429 loop {
1430 let mut progressed = pending.drain_data();
1431
1432 while let Ok(evt) = data_evt_rx.try_recv() {
1433 AsyncRunner::handle_data_event(evt);
1434 progressed = true;
1435 }
1436
1437 while let Ok(cmd) = data_cmd_rx.try_recv() {
1438 AsyncRunner::handle_data_command(cmd);
1439 progressed = true;
1440 }
1441
1442 if !progressed {
1443 break;
1444 }
1445 }
1446}
1447
1448fn flush_all_pending(
1454 pending: &mut PendingEvents,
1455 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
1456 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1457 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1458 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1459 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
1460) {
1461 while let Ok(handler) = time_evt_rx.try_recv() {
1463 AsyncRunner::handle_time_event(handler);
1464 }
1465
1466 while let Ok(evt) = data_evt_rx.try_recv() {
1467 pending.data_evts.push(evt);
1468 }
1469
1470 while let Ok(cmd) = data_cmd_rx.try_recv() {
1471 pending.data_cmds.push(cmd);
1472 }
1473
1474 while let Ok(evt) = exec_evt_rx.try_recv() {
1475 match evt {
1476 ExecutionEvent::Account(_) => {
1477 AsyncRunner::handle_exec_event(evt);
1478 }
1479 ExecutionEvent::Report(report) => {
1480 pending.exec_reports.push(report);
1481 }
1482 ExecutionEvent::Order(order_evt) => {
1483 pending.order_evts.push(order_evt);
1484 }
1485 ExecutionEvent::OrderSubmittedBatch(batch) => {
1486 for submitted in batch {
1487 pending.order_evts.push(OrderEventAny::Submitted(submitted));
1488 }
1489 }
1490 ExecutionEvent::OrderAcceptedBatch(batch) => {
1491 for accepted in batch {
1492 pending.order_evts.push(OrderEventAny::Accepted(accepted));
1493 }
1494 }
1495 ExecutionEvent::OrderCanceledBatch(batch) => {
1496 for canceled in batch {
1497 pending.order_evts.push(OrderEventAny::Canceled(canceled));
1498 }
1499 }
1500 }
1501 }
1502
1503 while let Ok(cmd) = exec_cmd_rx.try_recv() {
1504 pending.exec_cmds.push(cmd);
1505 }
1506
1507 pending.drain();
1508}
1509
1510async fn drive_with_event_buffering<F: std::future::Future>(
1515 future: F,
1516 pending: &mut PendingEvents,
1517 time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
1518 data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1519 data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
1520 exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1521 exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
1522) -> F::Output {
1523 tokio::pin!(future);
1524
1525 loop {
1526 tokio::select! {
1527 biased;
1528
1529 result = &mut future => {
1530 break result;
1531 }
1532 Some(handler) = time_evt_rx.recv() => {
1533 AsyncRunner::handle_time_event(handler);
1534 }
1535 Some(evt) = data_evt_rx.recv() => {
1536 pending.data_evts.push(evt);
1537 }
1538 Some(cmd) = data_cmd_rx.recv() => {
1539 pending.data_cmds.push(cmd);
1540 }
1541 Some(evt) = exec_evt_rx.recv() => {
1542 match evt {
1546 ExecutionEvent::Account(_) => {
1547 AsyncRunner::handle_exec_event(evt);
1548 }
1549 ExecutionEvent::Report(report) => {
1550 pending.exec_reports.push(report);
1551 }
1552 ExecutionEvent::Order(order_evt) => {
1553 pending.order_evts.push(order_evt);
1554 }
1555 ExecutionEvent::OrderSubmittedBatch(batch) => {
1556 for submitted in batch {
1557 pending.order_evts.push(OrderEventAny::Submitted(submitted));
1558 }
1559 }
1560 ExecutionEvent::OrderAcceptedBatch(batch) => {
1561 for accepted in batch {
1562 pending.order_evts.push(OrderEventAny::Accepted(accepted));
1563 }
1564 }
1565 ExecutionEvent::OrderCanceledBatch(batch) => {
1566 for canceled in batch {
1567 pending.order_evts.push(OrderEventAny::Canceled(canceled));
1568 }
1569 }
1570 }
1571 }
1572 Some(cmd) = exec_cmd_rx.recv() => {
1573 pending.exec_cmds.push(cmd);
1574 }
1575 }
1576 }
1577}
1578
1579#[derive(Default)]
1580struct PendingEvents {
1581 data_cmds: Vec<DataCommand>,
1582 data_evts: Vec<DataEvent>,
1583 exec_cmds: Vec<TradingCommand>,
1584 exec_reports: Vec<ExecutionReport>,
1585 order_evts: Vec<OrderEventAny>,
1586}
1587
1588impl PendingEvents {
1589 fn is_empty(&self) -> bool {
1590 self.data_evts.is_empty()
1591 && self.data_cmds.is_empty()
1592 && self.exec_cmds.is_empty()
1593 && self.exec_reports.is_empty()
1594 && self.order_evts.is_empty()
1595 }
1596
1597 fn drain_data(&mut self) -> bool {
1601 let total = self.data_evts.len() + self.data_cmds.len();
1602
1603 if total > 0 {
1604 log::debug!(
1605 "Draining {total} data events/commands into cache \
1606 (data_evts={}, data_cmds={})",
1607 self.data_evts.len(),
1608 self.data_cmds.len(),
1609 );
1610 }
1611
1612 for evt in self.data_evts.drain(..) {
1613 AsyncRunner::handle_data_event(evt);
1614 }
1615
1616 for cmd in self.data_cmds.drain(..) {
1617 AsyncRunner::handle_data_command(cmd);
1618 }
1619
1620 total > 0
1621 }
1622
1623 fn drain(&mut self) {
1625 let total = self.data_evts.len()
1626 + self.data_cmds.len()
1627 + self.exec_cmds.len()
1628 + self.exec_reports.len()
1629 + self.order_evts.len();
1630
1631 if total > 0 {
1632 log::debug!(
1633 "Processing {total} events/commands queued during startup \
1634 (data_evts={}, data_cmds={}, exec_cmds={}, exec_reports={}, order_evts={})",
1635 self.data_evts.len(),
1636 self.data_cmds.len(),
1637 self.exec_cmds.len(),
1638 self.exec_reports.len(),
1639 self.order_evts.len()
1640 );
1641 }
1642
1643 for evt in self.data_evts.drain(..) {
1644 AsyncRunner::handle_data_event(evt);
1645 }
1646
1647 for cmd in self.data_cmds.drain(..) {
1648 AsyncRunner::handle_data_command(cmd);
1649 }
1650
1651 for report in self.exec_reports.drain(..) {
1652 AsyncRunner::handle_exec_event(ExecutionEvent::Report(report));
1653 }
1654
1655 for cmd in self.exec_cmds.drain(..) {
1656 AsyncRunner::handle_exec_command(cmd);
1657 }
1658
1659 for evt in self.order_evts.drain(..) {
1660 AsyncRunner::handle_exec_event(ExecutionEvent::Order(evt));
1661 }
1662 }
1663}
1664
1665#[cfg(test)]
1666mod tests {
1667 #[cfg(feature = "python")]
1668 use std::sync::Arc;
1669
1670 #[cfg(feature = "python")]
1671 use nautilus_common::runner::{
1672 SyncDataCommandSender, SyncTradingCommandSender, replace_data_cmd_sender,
1673 replace_exec_cmd_sender,
1674 };
1675 use nautilus_model::identifiers::TraderId;
1676 use rstest::*;
1677
1678 use super::*;
1679
1680 #[rstest]
1681 #[case(0, NodeState::Idle)]
1682 #[case(1, NodeState::Starting)]
1683 #[case(2, NodeState::Running)]
1684 #[case(3, NodeState::ShuttingDown)]
1685 #[case(4, NodeState::Stopped)]
1686 fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
1687 assert_eq!(NodeState::from_u8(value), expected);
1688 }
1689
1690 #[rstest]
1691 #[case(5)]
1692 #[case(255)]
1693 #[should_panic(expected = "Invalid NodeState value")]
1694 fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
1695 let _ = NodeState::from_u8(value);
1696 }
1697
1698 #[rstest]
1699 fn test_node_state_roundtrip() {
1700 for state in [
1701 NodeState::Idle,
1702 NodeState::Starting,
1703 NodeState::Running,
1704 NodeState::ShuttingDown,
1705 NodeState::Stopped,
1706 ] {
1707 assert_eq!(NodeState::from_u8(state.as_u8()), state);
1708 }
1709 }
1710
1711 #[rstest]
1712 fn test_node_state_is_running_only_for_running() {
1713 assert!(!NodeState::Idle.is_running());
1714 assert!(!NodeState::Starting.is_running());
1715 assert!(NodeState::Running.is_running());
1716 assert!(!NodeState::ShuttingDown.is_running());
1717 assert!(!NodeState::Stopped.is_running());
1718 }
1719
1720 #[rstest]
1721 fn test_handle_initial_state() {
1722 let handle = LiveNodeHandle::new();
1723
1724 assert_eq!(handle.state(), NodeState::Idle);
1725 assert!(!handle.should_stop());
1726 assert!(!handle.is_running());
1727 }
1728
1729 #[rstest]
1730 fn test_handle_stop_sets_flag() {
1731 let handle = LiveNodeHandle::new();
1732
1733 handle.stop();
1734
1735 assert!(handle.should_stop());
1736 }
1737
1738 #[rstest]
1739 fn test_handle_set_state_running_clears_stop_flag() {
1740 let handle = LiveNodeHandle::new();
1741 handle.stop();
1742 assert!(handle.should_stop());
1743
1744 handle.set_state(NodeState::Running);
1745
1746 assert!(!handle.should_stop());
1747 assert!(handle.is_running());
1748 assert_eq!(handle.state(), NodeState::Running);
1749 }
1750
1751 #[rstest]
1752 fn test_handle_node_state_transitions() {
1753 let handle = LiveNodeHandle::new();
1754 assert_eq!(handle.state(), NodeState::Idle);
1755
1756 handle.set_state(NodeState::Starting);
1757 assert_eq!(handle.state(), NodeState::Starting);
1758 assert!(!handle.is_running());
1759
1760 handle.set_state(NodeState::Running);
1761 assert_eq!(handle.state(), NodeState::Running);
1762 assert!(handle.is_running());
1763
1764 handle.set_state(NodeState::ShuttingDown);
1765 assert_eq!(handle.state(), NodeState::ShuttingDown);
1766 assert!(!handle.is_running());
1767
1768 handle.set_state(NodeState::Stopped);
1769 assert_eq!(handle.state(), NodeState::Stopped);
1770 assert!(!handle.is_running());
1771 }
1772
1773 #[rstest]
1774 fn test_handle_clone_shares_state_bidirectionally() {
1775 let handle1 = LiveNodeHandle::new();
1776 let handle2 = handle1.clone();
1777
1778 handle1.stop();
1780 assert!(handle2.should_stop());
1781
1782 handle2.set_state(NodeState::Running);
1784 assert_eq!(handle1.state(), NodeState::Running);
1785 }
1786
1787 #[rstest]
1788 fn test_handle_stop_flag_independent_of_state() {
1789 let handle = LiveNodeHandle::new();
1790
1791 handle.set_state(NodeState::Starting);
1793 handle.stop();
1794 assert!(handle.should_stop());
1795 assert_eq!(handle.state(), NodeState::Starting);
1796
1797 handle.set_state(NodeState::ShuttingDown);
1799 assert!(handle.should_stop()); handle.set_state(NodeState::Running);
1802 assert!(!handle.should_stop()); }
1804
1805 #[rstest]
1806 fn test_builder_creation() {
1807 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
1808
1809 assert!(result.is_ok());
1810 }
1811
1812 #[rstest]
1813 fn test_builder_rejects_backtest() {
1814 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Backtest);
1815
1816 assert!(result.is_err());
1817 assert!(result.unwrap_err().to_string().contains("Backtest"));
1818 }
1819
1820 #[rstest]
1821 fn test_builder_accepts_live_environment() {
1822 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live);
1823
1824 assert!(result.is_ok());
1825 }
1826
1827 #[rstest]
1828 fn test_builder_accepts_sandbox_environment() {
1829 let result = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox);
1830
1831 assert!(result.is_ok());
1832 }
1833
1834 #[rstest]
1835 fn test_builder_fluent_api_chaining() {
1836 let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live)
1837 .unwrap()
1838 .with_name("TestNode")
1839 .with_instance_id(UUID4::new())
1840 .with_load_state(false)
1841 .with_save_state(true)
1842 .with_timeout_connection(30)
1843 .with_timeout_reconciliation(60)
1844 .with_reconciliation(true)
1845 .with_reconciliation_lookback_mins(120)
1846 .with_timeout_portfolio(10)
1847 .with_timeout_disconnection_secs(5)
1848 .with_delay_post_stop_secs(3)
1849 .with_delay_shutdown_secs(10);
1850
1851 assert_eq!(builder.name(), "TestNode");
1852 }
1853
/// A freshly built node is idle and reports the trader id and environment it was
/// built with.
#[cfg(feature = "python")]
#[rstest]
fn test_node_build_and_initial_state() {
    let trader_id = TraderId::from("TRADER-001");
    let builder = LiveNode::builder(trader_id, Environment::Sandbox).unwrap();

    let node = builder.with_name("TestNode").build().unwrap();

    assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
    assert_eq!(node.environment(), Environment::Sandbox);
    assert_eq!(node.state(), NodeState::Idle);
    assert!(!node.is_running());
}
1868
/// Building succeeds after the sync command senders were installed, and again after
/// the first node has been dropped; both nodes start out idle.
#[cfg(feature = "python")]
#[rstest]
fn test_node_build_replaces_stale_runner_senders() {
    // Install the sync senders before any node exists.
    replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
    replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));

    let first_builder =
        LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).unwrap();
    let first = first_builder.with_name("FirstNode").build().unwrap();
    assert_eq!(first.state(), NodeState::Idle);
    drop(first);

    let second_builder =
        LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).unwrap();
    let second = second_builder.with_name("SecondNode").build().unwrap();

    assert_eq!(second.state(), NodeState::Idle);
    assert!(!second.is_running());
}
1893
/// The handle obtained from a freshly built node reports the idle, non-running state.
#[cfg(feature = "python")]
#[rstest]
fn test_node_handle_reflects_node_state() {
    let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).unwrap();
    let node = builder.with_name("TestNode").build().unwrap();

    let node_handle = node.handle();

    assert!(!node_handle.is_running());
    assert_eq!(node_handle.state(), NodeState::Idle);
}
1908
1909 #[rstest]
1910 fn test_pending_drain_data_returns_false_when_empty() {
1911 let mut pending = PendingEvents::default();
1912
1913 assert!(!pending.drain_data());
1914 }
1915
1916 #[rstest]
1917 fn test_pending_drain_data_returns_true_when_non_empty() {
1918 use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};
1919
1920 let mut pending = PendingEvents::default();
1921 pending
1922 .data_evts
1923 .push(DataEvent::Instrument(InstrumentAny::CryptoPerpetual(
1924 crypto_perpetual_ethusdt(),
1925 )));
1926
1927 assert!(pending.drain_data());
1928 assert!(pending.data_evts.is_empty());
1929 }
1930
1931 fn stub_data_event() -> DataEvent {
1932 use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};
1933
1934 DataEvent::Instrument(InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt()))
1935 }
1936
1937 fn stub_data_command() -> DataCommand {
1938 use nautilus_common::messages::data::{SubscribeCommand, subscribe::SubscribeInstruments};
1939 use nautilus_core::{UUID4, UnixNanos};
1940 use nautilus_model::identifiers::Venue;
1941
1942 DataCommand::Subscribe(SubscribeCommand::Instruments(SubscribeInstruments::new(
1943 None,
1944 Venue::from("TEST"),
1945 UUID4::new(),
1946 UnixNanos::default(),
1947 None,
1948 None,
1949 )))
1950 }
1951
1952 #[rstest]
1953 fn test_flush_pending_data_drains_events_and_commands() {
1954 let (evt_tx, mut evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1955 let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1956
1957 let mut pending = PendingEvents::default();
1958
1959 pending.data_evts.push(stub_data_event());
1961 pending.data_cmds.push(stub_data_command());
1962
1963 evt_tx.send(stub_data_event()).unwrap();
1965 cmd_tx.send(stub_data_command()).unwrap();
1966
1967 flush_pending_data(&mut pending, &mut evt_rx, &mut cmd_rx);
1968
1969 assert!(pending.data_evts.is_empty());
1970 assert!(pending.data_cmds.is_empty());
1971 assert!(evt_rx.try_recv().is_err());
1972 assert!(cmd_rx.try_recv().is_err());
1973 }
1974
1975 #[rstest]
1976 fn test_flush_pending_data_drains_mixed_sources() {
1977 let (evt_tx, mut evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1978 let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
1979
1980 let mut pending = PendingEvents::default();
1981
1982 pending.data_evts.push(stub_data_event());
1984 cmd_tx.send(stub_data_command()).unwrap();
1985
1986 evt_tx.send(stub_data_event()).unwrap();
1988 evt_tx.send(stub_data_event()).unwrap();
1989 cmd_tx.send(stub_data_command()).unwrap();
1990
1991 flush_pending_data(&mut pending, &mut evt_rx, &mut cmd_rx);
1992
1993 assert!(pending.data_evts.is_empty());
1994 assert!(pending.data_cmds.is_empty());
1995 assert!(evt_rx.try_recv().is_err());
1996 assert!(cmd_rx.try_recv().is_err());
1997 }
1998
1999 fn stub_time_event_handler() -> TimeEventHandler {
2000 use std::rc::Rc;
2001
2002 use nautilus_common::timer::{TimeEvent, TimeEventCallback, TimeEventHandler};
2003 use nautilus_core::{UUID4, UnixNanos};
2004 use ustr::Ustr;
2005
2006 TimeEventHandler::new(
2007 TimeEvent::new(
2008 Ustr::from("test-timer"),
2009 UUID4::new(),
2010 UnixNanos::default(),
2011 UnixNanos::default(),
2012 ),
2013 TimeEventCallback::RustLocal(Rc::new(|_| {})),
2014 )
2015 }
2016
2017 fn stub_trading_command() -> TradingCommand {
2018 use nautilus_common::messages::execution::query::QueryAccount;
2019 use nautilus_core::{UUID4, UnixNanos};
2020 use nautilus_model::identifiers::AccountId;
2021
2022 TradingCommand::QueryAccount(QueryAccount::new(
2023 TraderId::from("TESTER-001"),
2024 None,
2025 AccountId::from("TEST-001"),
2026 UUID4::new(),
2027 UnixNanos::default(),
2028 None,
2029 ))
2030 }
2031
2032 fn stub_exec_event() -> ExecutionEvent {
2033 use nautilus_model::{
2034 enums::{LiquiditySide, OrderSide},
2035 identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
2036 reports::FillReport,
2037 types::{Money, Price, Quantity},
2038 };
2039
2040 ExecutionEvent::Report(ExecutionReport::Fill(Box::new(FillReport::new(
2041 AccountId::from("TEST-001"),
2042 InstrumentId::from("TEST.VENUE"),
2043 VenueOrderId::from("V-001"),
2044 TradeId::from("T-001"),
2045 OrderSide::Buy,
2046 Quantity::from("1.0"),
2047 Price::from("100.0"),
2048 Money::from("0.01 USD"),
2049 LiquiditySide::Maker,
2050 None,
2051 None,
2052 nautilus_core::UnixNanos::default(),
2053 nautilus_core::UnixNanos::default(),
2054 None,
2055 ))))
2056 }
2057
2058 #[rstest]
2059 fn test_flush_all_pending_drains_all_channel_types() {
2060 let (time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
2061 let (data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2062 let (data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
2063 let (exec_evt_tx, mut exec_evt_rx) =
2064 tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
2065 let (exec_cmd_tx, mut exec_cmd_rx) =
2066 tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
2067
2068 let mut pending = PendingEvents::default();
2069
2070 pending.data_evts.push(stub_data_event());
2072 pending.data_cmds.push(stub_data_command());
2073
2074 time_tx.send(stub_time_event_handler()).unwrap();
2076 data_evt_tx.send(stub_data_event()).unwrap();
2077 data_cmd_tx.send(stub_data_command()).unwrap();
2078 exec_evt_tx.send(stub_exec_event()).unwrap();
2079 exec_cmd_tx.send(stub_trading_command()).unwrap();
2080
2081 flush_all_pending(
2082 &mut pending,
2083 &mut time_rx,
2084 &mut data_evt_rx,
2085 &mut data_cmd_rx,
2086 &mut exec_evt_rx,
2087 &mut exec_cmd_rx,
2088 );
2089
2090 assert!(pending.data_evts.is_empty());
2091 assert!(pending.data_cmds.is_empty());
2092 assert!(pending.exec_reports.is_empty());
2093 assert!(pending.exec_cmds.is_empty());
2094 assert!(pending.order_evts.is_empty());
2095 assert!(time_rx.try_recv().is_err());
2096 assert!(data_evt_rx.try_recv().is_err());
2097 assert!(data_cmd_rx.try_recv().is_err());
2098 assert!(exec_evt_rx.try_recv().is_err());
2099 assert!(exec_cmd_rx.try_recv().is_err());
2100 }
2101
2102 fn stub_order_event() -> ExecutionEvent {
2103 use nautilus_core::{UUID4, UnixNanos};
2104 use nautilus_model::{
2105 events::OrderSubmitted,
2106 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId},
2107 };
2108
2109 ExecutionEvent::Order(OrderEventAny::Submitted(OrderSubmitted::new(
2110 TraderId::from("TESTER-001"),
2111 StrategyId::from("S-001"),
2112 InstrumentId::from("TEST.VENUE"),
2113 ClientOrderId::from("O-001"),
2114 AccountId::from("TEST-001"),
2115 UUID4::new(),
2116 UnixNanos::default(),
2117 UnixNanos::default(),
2118 )))
2119 }
2120
2121 fn stub_account_event() -> ExecutionEvent {
2122 use nautilus_core::{UUID4, UnixNanos};
2123 use nautilus_model::{
2124 enums::AccountType, events::account::state::AccountState, identifiers::AccountId,
2125 };
2126
2127 ExecutionEvent::Account(AccountState::new(
2128 AccountId::from("TEST-001"),
2129 AccountType::Cash,
2130 vec![],
2131 vec![],
2132 true,
2133 UUID4::new(),
2134 UnixNanos::default(),
2135 UnixNanos::default(),
2136 None,
2137 ))
2138 }
2139
2140 #[rstest]
2141 fn test_flush_all_pending_routes_order_event_to_order_evts() {
2142 let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
2143 let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2144 let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
2145 let (exec_evt_tx, mut exec_evt_rx) =
2146 tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
2147 let (_exec_cmd_tx, mut exec_cmd_rx) =
2148 tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
2149
2150 let mut pending = PendingEvents::default();
2151
2152 exec_evt_tx.send(stub_order_event()).unwrap();
2153 exec_evt_tx.send(stub_exec_event()).unwrap();
2154
2155 flush_all_pending(
2156 &mut pending,
2157 &mut time_rx,
2158 &mut data_evt_rx,
2159 &mut data_cmd_rx,
2160 &mut exec_evt_rx,
2161 &mut exec_cmd_rx,
2162 );
2163
2164 assert!(pending.order_evts.is_empty());
2166 assert!(pending.exec_reports.is_empty());
2167 assert!(exec_evt_rx.try_recv().is_err());
2168 }
2169
2170 #[rstest]
2171 fn test_flush_all_pending_routes_account_event_immediately() {
2172 let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
2173 let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2174 let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
2175 let (exec_evt_tx, mut exec_evt_rx) =
2176 tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
2177 let (_exec_cmd_tx, mut exec_cmd_rx) =
2178 tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
2179
2180 let mut pending = PendingEvents::default();
2181
2182 exec_evt_tx.send(stub_account_event()).unwrap();
2183
2184 flush_all_pending(
2185 &mut pending,
2186 &mut time_rx,
2187 &mut data_evt_rx,
2188 &mut data_cmd_rx,
2189 &mut exec_evt_rx,
2190 &mut exec_cmd_rx,
2191 );
2192
2193 assert!(pending.exec_reports.is_empty());
2195 assert!(pending.order_evts.is_empty());
2196 assert!(pending.exec_cmds.is_empty());
2197 assert!(exec_evt_rx.try_recv().is_err());
2198 }
2199
2200 #[rstest]
2201 fn test_pending_is_empty_when_default() {
2202 let pending = PendingEvents::default();
2203
2204 assert!(pending.is_empty());
2205 }
2206
2207 #[rstest]
2208 fn test_pending_is_empty_false_with_data_evt() {
2209 let mut pending = PendingEvents::default();
2210 pending.data_evts.push(stub_data_event());
2211
2212 assert!(!pending.is_empty());
2213 }
2214
2215 #[rstest]
2216 fn test_pending_is_empty_false_with_data_cmd() {
2217 let mut pending = PendingEvents::default();
2218 pending.data_cmds.push(stub_data_command());
2219
2220 assert!(!pending.is_empty());
2221 }
2222
2223 #[rstest]
2224 fn test_pending_is_empty_false_with_exec_cmd() {
2225 let mut pending = PendingEvents::default();
2226 pending.exec_cmds.push(stub_trading_command());
2227
2228 assert!(!pending.is_empty());
2229 }
2230
2231 #[rstest]
2232 fn test_pending_is_empty_false_with_exec_report() {
2233 let mut pending = PendingEvents::default();
2234
2235 if let ExecutionEvent::Report(report) = stub_exec_event() {
2236 pending.exec_reports.push(report);
2237 }
2238
2239 assert!(!pending.is_empty());
2240 }
2241
2242 #[rstest]
2243 fn test_pending_is_empty_false_with_order_evt() {
2244 let mut pending = PendingEvents::default();
2245
2246 if let ExecutionEvent::Order(order_evt) = stub_order_event() {
2247 pending.order_evts.push(order_evt);
2248 }
2249
2250 assert!(!pending.is_empty());
2251 }
2252
2253 fn stub_submitted_batch_event() -> ExecutionEvent {
2254 use nautilus_core::{UUID4, UnixNanos};
2255 use nautilus_model::{
2256 events::{OrderSubmitted, OrderSubmittedBatch},
2257 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId},
2258 };
2259
2260 let events = vec![
2261 OrderSubmitted::new(
2262 TraderId::from("TESTER-001"),
2263 StrategyId::from("S-001"),
2264 InstrumentId::from("TEST.VENUE"),
2265 ClientOrderId::from("O-001"),
2266 AccountId::from("TEST-001"),
2267 UUID4::new(),
2268 UnixNanos::default(),
2269 UnixNanos::default(),
2270 ),
2271 OrderSubmitted::new(
2272 TraderId::from("TESTER-001"),
2273 StrategyId::from("S-001"),
2274 InstrumentId::from("TEST.VENUE"),
2275 ClientOrderId::from("O-002"),
2276 AccountId::from("TEST-001"),
2277 UUID4::new(),
2278 UnixNanos::default(),
2279 UnixNanos::default(),
2280 ),
2281 ];
2282
2283 ExecutionEvent::OrderSubmittedBatch(OrderSubmittedBatch::new(events))
2284 }
2285
2286 fn stub_canceled_batch_event() -> ExecutionEvent {
2287 use nautilus_core::{UUID4, UnixNanos};
2288 use nautilus_model::{
2289 events::{OrderCanceled, OrderCanceledBatch},
2290 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId},
2291 };
2292
2293 let events = vec![
2294 OrderCanceled::new(
2295 TraderId::from("TESTER-001"),
2296 StrategyId::from("S-001"),
2297 InstrumentId::from("TEST.VENUE"),
2298 ClientOrderId::from("O-001"),
2299 UUID4::new(),
2300 UnixNanos::default(),
2301 UnixNanos::default(),
2302 false,
2303 None,
2304 Some(AccountId::from("TEST-001")),
2305 ),
2306 OrderCanceled::new(
2307 TraderId::from("TESTER-001"),
2308 StrategyId::from("S-001"),
2309 InstrumentId::from("TEST.VENUE"),
2310 ClientOrderId::from("O-002"),
2311 UUID4::new(),
2312 UnixNanos::default(),
2313 UnixNanos::default(),
2314 false,
2315 None,
2316 Some(AccountId::from("TEST-001")),
2317 ),
2318 ];
2319
2320 ExecutionEvent::OrderCanceledBatch(OrderCanceledBatch::new(events))
2321 }
2322
2323 #[rstest]
2324 fn test_flush_all_pending_buffers_submitted_batch_as_individual_events() {
2325 let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
2326 let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2327 let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
2328 let (exec_evt_tx, mut exec_evt_rx) =
2329 tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
2330 let (_exec_cmd_tx, mut exec_cmd_rx) =
2331 tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
2332
2333 let mut pending = PendingEvents::default();
2334
2335 exec_evt_tx.send(stub_submitted_batch_event()).unwrap();
2336
2337 flush_all_pending(
2338 &mut pending,
2339 &mut time_rx,
2340 &mut data_evt_rx,
2341 &mut data_cmd_rx,
2342 &mut exec_evt_rx,
2343 &mut exec_cmd_rx,
2344 );
2345
2346 assert!(pending.order_evts.is_empty());
2348 assert!(exec_evt_rx.try_recv().is_err());
2349 }
2350
2351 #[rstest]
2352 fn test_flush_all_pending_buffers_canceled_batch_as_individual_events() {
2353 let (_time_tx, mut time_rx) = tokio::sync::mpsc::unbounded_channel::<TimeEventHandler>();
2354 let (_data_evt_tx, mut data_evt_rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
2355 let (_data_cmd_tx, mut data_cmd_rx) = tokio::sync::mpsc::unbounded_channel::<DataCommand>();
2356 let (exec_evt_tx, mut exec_evt_rx) =
2357 tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
2358 let (_exec_cmd_tx, mut exec_cmd_rx) =
2359 tokio::sync::mpsc::unbounded_channel::<TradingCommand>();
2360
2361 let mut pending = PendingEvents::default();
2362
2363 exec_evt_tx.send(stub_canceled_batch_event()).unwrap();
2364
2365 flush_all_pending(
2366 &mut pending,
2367 &mut time_rx,
2368 &mut data_evt_rx,
2369 &mut data_cmd_rx,
2370 &mut exec_evt_rx,
2371 &mut exec_cmd_rx,
2372 );
2373
2374 assert!(pending.order_evts.is_empty());
2376 assert!(exec_evt_rx.try_recv().is_err());
2377 }
2378
2379 #[rstest]
2380 fn test_flush_all_pending_expands_batch_into_order_evts_before_drain() {
2381 use nautilus_model::identifiers::ClientOrderId;
2382
2383 let (exec_evt_tx, mut exec_evt_rx) =
2384 tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
2385
2386 exec_evt_tx.send(stub_canceled_batch_event()).unwrap();
2387
2388 let mut pending = PendingEvents::default();
2389
2390 while let Ok(evt) = exec_evt_rx.try_recv() {
2392 match evt {
2393 ExecutionEvent::Account(_) => {
2394 AsyncRunner::handle_exec_event(evt);
2395 }
2396 ExecutionEvent::Report(report) => {
2397 pending.exec_reports.push(report);
2398 }
2399 ExecutionEvent::Order(order_evt) => {
2400 pending.order_evts.push(order_evt);
2401 }
2402 ExecutionEvent::OrderSubmittedBatch(batch) => {
2403 for submitted in batch {
2404 pending.order_evts.push(OrderEventAny::Submitted(submitted));
2405 }
2406 }
2407 ExecutionEvent::OrderAcceptedBatch(batch) => {
2408 for accepted in batch {
2409 pending.order_evts.push(OrderEventAny::Accepted(accepted));
2410 }
2411 }
2412 ExecutionEvent::OrderCanceledBatch(batch) => {
2413 for canceled in batch {
2414 pending.order_evts.push(OrderEventAny::Canceled(canceled));
2415 }
2416 }
2417 }
2418 }
2419
2420 assert_eq!(pending.order_evts.len(), 2);
2421 assert!(
2422 matches!(&pending.order_evts[0], OrderEventAny::Canceled(c) if c.client_order_id == ClientOrderId::from("O-001"))
2423 );
2424 assert!(
2425 matches!(&pending.order_evts[1], OrderEventAny::Canceled(c) if c.client_order_id == ClientOrderId::from("O-002"))
2426 );
2427 }
2428}