Skip to main content

nautilus_system/
trader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Central orchestrator for managing actors, strategies, and execution algorithms.
17//!
18//! The `Trader` component serves as the primary coordination layer between the kernel
19//! and individual trading components. It manages component lifecycles, provides
20//! unique identification, and coordinates with system engines.
21
22use std::{cell::RefCell, fmt::Debug, rc::Rc};
23
24use ahash::AHashMap;
25use nautilus_common::{
26    actor::{DataActor, registry::try_get_actor_unchecked},
27    cache::Cache,
28    clock::{Clock, TestClock},
29    component::{
30        Component, dispose_component, register_component_actor, reset_component, start_component,
31        stop_component,
32    },
33    enums::{ComponentState, ComponentTrigger, Environment},
34    messages::execution::TradingCommand,
35    msgbus,
36    msgbus::{
37        Endpoint, MStr, ShareableMessageHandler, TypedHandler, get_message_bus,
38        switchboard::{get_event_orders_topic, get_event_positions_topic},
39    },
40    timer::{TimeEvent, TimeEventCallback},
41};
42use nautilus_core::{UUID4, UnixNanos};
43use nautilus_model::{
44    events::{OrderEventAny, PositionEvent},
45    identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
46};
47use nautilus_portfolio::portfolio::Portfolio;
48use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
49use ustr::Ustr;
50
/// Control commands the trader delivers to a strategy through its control endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StrategyCommand {
    /// Asks the strategy to run its `market_exit` routine.
    ExitMarket,
}
55
56fn strategy_control_endpoint(strategy_id: StrategyId) -> MStr<Endpoint> {
57    format!("{strategy_id}.control").into()
58}
59
60/// Central orchestrator for managing trading components.
61///
62/// The `Trader` manages the lifecycle and coordination of actors, strategies,
63/// and execution algorithms within the trading system. It provides component
64/// registration, state management, and integration with system engines.
65///
66/// # Notes
67///
68/// Strategies implement `Strategy::stop() -> bool` which returns whether to proceed
69/// with the component stop. This enables `manage_stop` behavior where the strategy
70/// can defer stopping until a market exit completes.
71///
72/// We store type-erased closures because the component registry stores trait objects
73/// and we need to call `Strategy::stop()` which requires the concrete type. The
74/// closure is created during `add_strategy` when the concrete type `T` is known.
75pub struct Trader {
76    /// The unique trader identifier.
77    pub trader_id: TraderId,
78    /// The unique instance identifier.
79    pub instance_id: UUID4,
80    /// The trading environment context.
81    pub environment: Environment,
82    /// Component state for lifecycle management.
83    state: ComponentState,
84    /// System clock for timestamping.
85    clock: Rc<RefCell<dyn Clock>>,
86    /// System cache for data storage.
87    cache: Rc<RefCell<Cache>>,
88    /// Portfolio reference for strategy registration.
89    portfolio: Rc<RefCell<Portfolio>>,
90    /// Registered actor IDs (actors stored in global registry).
91    actor_ids: Vec<ActorId>,
92    /// Registered strategy IDs (strategies stored in global registry).
93    strategy_ids: Vec<StrategyId>,
94    /// Strategy stop functions for managed stop behavior.
95    strategy_stop_fns: AHashMap<StrategyId, Box<dyn FnMut() -> bool>>,
96    /// Msgbus handler IDs for strategy event subscriptions (order, position).
97    strategy_handler_ids: AHashMap<StrategyId, (Ustr, Ustr)>,
98    /// Registered exec algorithm IDs (algorithms stored in global registry).
99    exec_algorithm_ids: Vec<ExecAlgorithmId>,
100    /// Component clocks for individual components.
101    clocks: AHashMap<ComponentId, Rc<RefCell<dyn Clock>>>,
102    /// Timestamp when the trader was created.
103    ts_created: UnixNanos,
104    /// Timestamp when the trader was last started.
105    ts_started: Option<UnixNanos>,
106    /// Timestamp when the trader was last stopped.
107    ts_stopped: Option<UnixNanos>,
108}
109
110impl Debug for Trader {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        write!(f, "{:?}", stringify!(TraderId)) // TODO
113    }
114}
115
116impl Trader {
117    /// Creates a new [`Trader`] instance.
118    #[must_use]
119    pub fn new(
120        trader_id: TraderId,
121        instance_id: UUID4,
122        environment: Environment,
123        clock: Rc<RefCell<dyn Clock>>,
124        cache: Rc<RefCell<Cache>>,
125        portfolio: Rc<RefCell<Portfolio>>,
126    ) -> Self {
127        let ts_created = clock.borrow().timestamp_ns();
128
129        Self {
130            trader_id,
131            instance_id,
132            environment,
133            state: ComponentState::PreInitialized,
134            clock,
135            cache,
136            portfolio,
137            actor_ids: Vec::new(),
138            strategy_ids: Vec::new(),
139            strategy_stop_fns: AHashMap::new(),
140            strategy_handler_ids: AHashMap::new(),
141            exec_algorithm_ids: Vec::new(),
142            clocks: AHashMap::new(),
143            ts_created,
144            ts_started: None,
145            ts_stopped: None,
146        }
147    }
148
149    /// Returns the trader ID.
150    #[must_use]
151    pub const fn trader_id(&self) -> TraderId {
152        self.trader_id
153    }
154
155    /// Returns the instance ID.
156    #[must_use]
157    pub const fn instance_id(&self) -> UUID4 {
158        self.instance_id
159    }
160
161    /// Returns the trading environment.
162    #[must_use]
163    pub const fn environment(&self) -> Environment {
164        self.environment
165    }
166
167    /// Returns the current component state.
168    #[must_use]
169    pub const fn state(&self) -> ComponentState {
170        self.state
171    }
172
173    /// Returns the timestamp when the trader was created (UNIX nanoseconds).
174    #[must_use]
175    pub const fn ts_created(&self) -> UnixNanos {
176        self.ts_created
177    }
178
179    /// Returns the timestamp when the trader was last started (UNIX nanoseconds).
180    #[must_use]
181    pub const fn ts_started(&self) -> Option<UnixNanos> {
182        self.ts_started
183    }
184
185    /// Returns the timestamp when the trader was last stopped (UNIX nanoseconds).
186    #[must_use]
187    pub const fn ts_stopped(&self) -> Option<UnixNanos> {
188        self.ts_stopped
189    }
190
191    /// Returns the number of registered actors.
192    #[must_use]
193    pub const fn actor_count(&self) -> usize {
194        self.actor_ids.len()
195    }
196
197    /// Returns the number of registered strategies.
198    #[must_use]
199    pub const fn strategy_count(&self) -> usize {
200        self.strategy_ids.len()
201    }
202
203    /// Returns the number of registered execution algorithms.
204    #[must_use]
205    pub const fn exec_algorithm_count(&self) -> usize {
206        self.exec_algorithm_ids.len()
207    }
208
209    /// Returns references to all component clocks for backtest time advancement.
210    pub fn get_component_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
211        self.clocks.values().cloned().collect()
212    }
213
214    /// Returns the total number of registered components.
215    #[must_use]
216    pub const fn component_count(&self) -> usize {
217        self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
218    }
219
220    /// Returns a list of all registered actor IDs.
221    #[must_use]
222    pub fn actor_ids(&self) -> Vec<ActorId> {
223        self.actor_ids.clone()
224    }
225
226    /// Returns a list of all registered strategy IDs.
227    #[must_use]
228    pub fn strategy_ids(&self) -> Vec<StrategyId> {
229        self.strategy_ids.clone()
230    }
231
232    /// Returns a list of all registered execution algorithm IDs.
233    #[must_use]
234    pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
235        self.exec_algorithm_ids.clone()
236    }
237
238    /// Creates a clock for a component and registers it for time advancement.
239    ///
240    /// Each component gets its own clock instance so that the default time event
241    /// callback registered on each clock is independent. In backtest mode, the
242    /// clocks are also used for deterministic time advancement by the engine.
243    pub fn create_component_clock(&mut self, component_id: ComponentId) -> Rc<RefCell<dyn Clock>> {
244        let clock: Rc<RefCell<dyn Clock>> = match self.environment {
245            Environment::Backtest => Rc::new(RefCell::new(TestClock::new())),
246            Environment::Live | Environment::Sandbox => Self::create_live_clock(),
247        };
248        self.clocks.insert(component_id, clock.clone());
249        clock
250    }
251
252    #[cfg(feature = "live")]
253    fn create_live_clock() -> Rc<RefCell<dyn Clock>> {
254        Rc::new(RefCell::new(
255            nautilus_common::live::clock::LiveClock::default(), // nautilus-import-ok
256        ))
257    }
258
259    #[cfg(not(feature = "live"))]
260    fn create_live_clock() -> Rc<RefCell<dyn Clock>> {
261        panic!("Live/Sandbox environment requires the 'live' feature to be enabled");
262    }
263
264    /// Adds an actor to the trader.
265    ///
266    /// # Errors
267    ///
268    /// Returns an error if:
269    /// - The trader is not in a valid state for adding components.
270    /// - An actor with the same ID is already registered.
271    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
272    where
273        T: DataActor + Component + Debug + 'static,
274    {
275        self.validate_actor_or_strategy_registration()?;
276
277        let actor_id = actor.actor_id();
278
279        // Check for duplicate registration
280        if self.actor_ids.contains(&actor_id) {
281            anyhow::bail!("Actor {actor_id} is already registered");
282        }
283
284        let component_id = ComponentId::new(actor_id.inner().as_str());
285        let clock = self.create_component_clock(component_id);
286
287        let mut actor_mut = actor;
288        actor_mut.register(self.trader_id, clock, self.cache.clone())?;
289
290        self.add_registered_actor(actor_mut)
291    }
292
293    /// Adds an actor to the trader using a factory function.
294    ///
295    /// The factory function is called at registration time to create the actor,
296    /// avoiding cloning issues with non-cloneable actor types.
297    ///
298    /// # Errors
299    ///
300    /// Returns an error if:
301    /// - The factory function fails to create the actor.
302    /// - The trader is not in a valid state for adding components.
303    /// - An actor with the same ID is already registered.
304    pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
305    where
306        F: FnOnce() -> anyhow::Result<T>,
307        T: DataActor + Component + Debug + 'static,
308    {
309        let actor = factory()?;
310
311        self.add_actor(actor)
312    }
313
314    /// Adds an already registered actor to the trader's component registry.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if the actor cannot be registered in the component registry.
319    pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
320    where
321        T: DataActor + Component + Debug + 'static,
322    {
323        let actor_id = actor.actor_id();
324
325        // Register in both component and actor registries (this consumes the actor)
326        register_component_actor(actor);
327
328        // Store actor ID for lifecycle management
329        self.actor_ids.push(actor_id);
330
331        log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
332
333        Ok(())
334    }
335
336    /// Adds an actor ID to the trader's lifecycle management without consuming the actor.
337    ///
338    /// This is useful when the actor is already registered in the global component registry
339    /// but the trader needs to track it for lifecycle management. The caller is responsible
340    /// for ensuring the actor is properly registered in the global registries.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if the actor ID is already tracked by this trader.
345    pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
346        // Check for duplicate registration
347        if self.actor_ids.contains(&actor_id) {
348            anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
349        }
350
351        // Store actor ID for lifecycle management
352        self.actor_ids.push(actor_id);
353
354        log::debug!(
355            "Added actor ID '{actor_id}' to trader {} for lifecycle management",
356            self.trader_id
357        );
358
359        Ok(())
360    }
361
362    /// Adds an externally-registered execution algorithm ID to the trader for lifecycle management.
363    ///
364    /// The execution algorithm must already be registered in the global component and actor
365    /// registries. This method only tracks the ID so the trader can manage the algorithm's
366    /// lifecycle (start/stop/dispose).
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if an execution algorithm with the same ID is already tracked.
371    pub fn add_exec_algorithm_id_for_lifecycle(
372        &mut self,
373        exec_algorithm_id: ExecAlgorithmId,
374    ) -> anyhow::Result<()> {
375        if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
376            anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already tracked by trader");
377        }
378
379        self.exec_algorithm_ids.push(exec_algorithm_id);
380
381        log::debug!(
382            "Added exec algorithm ID '{exec_algorithm_id}' to trader {} for lifecycle management",
383            self.trader_id
384        );
385
386        Ok(())
387    }
388
389    /// Adds an externally-registered strategy to the trader for lifecycle management
390    /// and installs its order/position event subscriptions, stop hook, and control endpoint.
391    ///
392    /// The strategy must already be registered in the global component and actor
393    /// registries. The generic parameter `T` must match the concrete type stored
394    /// in those registries so that the typed event handlers can retrieve it.
395    ///
396    /// # Errors
397    ///
398    /// Returns an error if the strategy ID is already tracked by this trader.
399    pub fn add_strategy_id_with_subscriptions<T>(
400        &mut self,
401        strategy_id: StrategyId,
402    ) -> anyhow::Result<()>
403    where
404        T: Strategy + Component + Debug + 'static,
405    {
406        if self.strategy_ids.contains(&strategy_id) {
407            anyhow::bail!("Strategy '{strategy_id}' is already tracked by trader");
408        }
409
410        let actor_id = Ustr::from(strategy_id.inner().as_str());
411
412        // Subscribe to order events for this strategy
413        let order_topic = get_event_orders_topic(strategy_id);
414        let order_actor_id = actor_id;
415        let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
416            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
417                strategy.handle_order_event(event.clone());
418            } else {
419                log::error!("Strategy {order_actor_id} not found for order event handling");
420            }
421        });
422        let order_handler_id = order_handler.id();
423        msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
424
425        // Subscribe to position events for this strategy
426        let position_topic = get_event_positions_topic(strategy_id);
427        let position_handler = TypedHandler::from(move |event: &PositionEvent| {
428            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
429                strategy.handle_position_event(event.clone());
430            } else {
431                log::error!("Strategy {actor_id} not found for position event handling");
432            }
433        });
434        let position_handler_id = position_handler.id();
435        msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
436
437        let control_actor_id = actor_id;
438        let control_handler = TypedHandler::from(move |command: &StrategyCommand| {
439            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&control_actor_id) {
440                match command {
441                    StrategyCommand::ExitMarket => {
442                        if let Err(e) = strategy.market_exit() {
443                            log::error!(
444                                "Error handling strategy command for {control_actor_id}: {e}"
445                            );
446                        }
447                    }
448                }
449            } else {
450                log::error!("Strategy {control_actor_id} not found for control handling");
451            }
452        });
453        get_message_bus()
454            .borrow_mut()
455            .endpoint_map::<StrategyCommand>()
456            .register(strategy_control_endpoint(strategy_id), control_handler);
457
458        self.strategy_ids.push(strategy_id);
459        self.strategy_handler_ids
460            .insert(strategy_id, (order_handler_id, position_handler_id));
461
462        // Register stop hook
463        let stop_actor_id = actor_id;
464        let stop_fn = Box::new(move || -> bool {
465            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
466                Strategy::stop(&mut *strategy)
467            } else {
468                log::error!("Strategy {stop_actor_id} not found for stop");
469                true
470            }
471        });
472        self.strategy_stop_fns.insert(strategy_id, stop_fn);
473
474        log::debug!(
475            "Added strategy '{strategy_id}' to trader {} with event subscriptions",
476            self.trader_id
477        );
478
479        Ok(())
480    }
481
482    /// Adds a strategy to the trader.
483    ///
484    /// Strategies are registered in both the component registry (for lifecycle management)
485    /// and the actor registry (for data callbacks via msgbus). The strategy's `StrategyCore`
486    /// is also registered with the portfolio for order management.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if:
491    /// - The trader is not in a valid state for adding components.
492    /// - A strategy with the same ID is already registered.
493    pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
494    where
495        T: Strategy + Component + Debug + 'static,
496    {
497        self.validate_actor_or_strategy_registration()?;
498
499        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
500
501        // Check for duplicate registration
502        if self.strategy_ids.contains(&strategy_id) {
503            anyhow::bail!("Strategy {strategy_id} is already registered");
504        }
505
506        let component_id = strategy.component_id();
507        let clock = self.create_component_clock(component_id);
508
509        // Register strategy core with portfolio for order management
510        strategy.core_mut().register(
511            self.trader_id,
512            clock.clone(),
513            self.cache.clone(),
514            self.portfolio.clone(),
515        )?;
516
517        // Register default time event handler for this strategy
518        let actor_id = strategy.actor_id().inner();
519        let callback = TimeEventCallback::from(move |event: TimeEvent| {
520            if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
521                actor.handle_time_event(&event);
522            } else {
523                log::error!("Strategy {actor_id} not found for time event handling");
524            }
525        });
526        clock.borrow_mut().register_default_handler(callback);
527
528        // Transition to Ready state
529        strategy.initialize()?;
530
531        // Register in both component and actor registries
532        register_component_actor(strategy);
533
534        let order_topic = get_event_orders_topic(strategy_id);
535        let order_actor_id = actor_id;
536        let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
537            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
538                strategy.handle_order_event(event.clone());
539            } else {
540                log::error!("Strategy {order_actor_id} not found for order event handling");
541            }
542        });
543        let order_handler_id = order_handler.id();
544        msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
545
546        let position_topic = get_event_positions_topic(strategy_id);
547        let position_handler = TypedHandler::from(move |event: &PositionEvent| {
548            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
549                strategy.handle_position_event(event.clone());
550            } else {
551                log::error!("Strategy {actor_id} not found for position event handling");
552            }
553        });
554        let position_handler_id = position_handler.id();
555        msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
556
557        let control_actor_id = actor_id;
558        let control_handler = TypedHandler::from(move |command: &StrategyCommand| {
559            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&control_actor_id) {
560                match command {
561                    StrategyCommand::ExitMarket => {
562                        if let Err(e) = strategy.market_exit() {
563                            log::error!(
564                                "Error handling strategy command for {control_actor_id}: {e}"
565                            );
566                        }
567                    }
568                }
569            } else {
570                log::error!("Strategy {control_actor_id} not found for control handling");
571            }
572        });
573        get_message_bus()
574            .borrow_mut()
575            .endpoint_map::<StrategyCommand>()
576            .register(strategy_control_endpoint(strategy_id), control_handler);
577
578        self.strategy_ids.push(strategy_id);
579        self.strategy_handler_ids
580            .insert(strategy_id, (order_handler_id, position_handler_id));
581
582        let stop_actor_id = actor_id;
583        let stop_fn = Box::new(move || -> bool {
584            if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
585                Strategy::stop(&mut *strategy)
586            } else {
587                log::error!("Strategy {stop_actor_id} not found for stop");
588                true // Proceed with component stop anyway
589            }
590        });
591        self.strategy_stop_fns.insert(strategy_id, stop_fn);
592
593        log::info!(
594            "Registered strategy {strategy_id} with trader {}",
595            self.trader_id
596        );
597
598        Ok(())
599    }
600
601    /// Adds an execution algorithm to the trader.
602    ///
603    /// Execution algorithms are registered in both the component registry (for lifecycle
604    /// management) and the actor registry (for data callbacks via msgbus).
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if:
609    /// - The trader is not in a valid state for adding components.
610    /// - An execution algorithm with the same ID is already registered.
611    pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
612    where
613        T: ExecutionAlgorithm + Component + Debug + 'static,
614    {
615        self.validate_exec_algorithm_registration()?;
616
617        let exec_algorithm_id =
618            ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
619
620        if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
621            anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
622        }
623
624        let component_id = exec_algorithm.component_id();
625        let clock = self.create_component_clock(component_id);
626
627        exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
628
629        register_component_actor(exec_algorithm);
630
631        // Register the {id}.execute endpoint so the order manager can
632        // route TradingCommands to this algorithm via msgbus::send_any
633        let actor_id = Ustr::from(exec_algorithm_id.inner().as_str());
634        let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
635        let handler = ShareableMessageHandler::from_typed(move |command: &TradingCommand| {
636            if let Some(mut algo) = try_get_actor_unchecked::<T>(&actor_id) {
637                if let Err(e) = algo.execute(command.clone()) {
638                    log::error!("Error executing command on algorithm {actor_id}: {e}");
639                }
640            } else {
641                log::error!("Execution algorithm {actor_id} not found in registry");
642            }
643        });
644        msgbus::register_any(endpoint.into(), handler);
645
646        self.exec_algorithm_ids.push(exec_algorithm_id);
647
648        log::info!(
649            "Registered execution algorithm {exec_algorithm_id} with trader {}",
650            self.trader_id
651        );
652
653        Ok(())
654    }
655
656    /// Validates that the trader is in a valid state for actor and strategy registration.
657    ///
658    /// Actors and strategies can be added while the trader is `PreInitialized`, `Ready`,
659    /// `Stopped`, or `Running`. This enables the [`Controller`](crate::controller::Controller)
660    /// to add them at runtime.
661    fn validate_actor_or_strategy_registration(&self) -> anyhow::Result<()> {
662        match self.state {
663            ComponentState::PreInitialized
664            | ComponentState::Ready
665            | ComponentState::Stopped
666            | ComponentState::Running => Ok(()),
667            ComponentState::Disposed => {
668                anyhow::bail!("Cannot add components to disposed trader")
669            }
670            _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
671        }
672    }
673
674    /// Validates that the trader is in a valid state for execution algorithm registration.
675    fn validate_exec_algorithm_registration(&self) -> anyhow::Result<()> {
676        match self.state {
677            ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
678                Ok(())
679            }
680            ComponentState::Running => {
681                anyhow::bail!("Cannot add execution algorithms to running trader")
682            }
683            ComponentState::Disposed => {
684                anyhow::bail!("Cannot add components to disposed trader")
685            }
686            _ => anyhow::bail!(
687                "Cannot add execution algorithms in current state: {}",
688                self.state
689            ),
690        }
691    }
692
693    /// Starts all registered components.
694    ///
695    /// # Errors
696    ///
697    /// Returns an error if any component fails to start.
698    pub fn start_components(&mut self) -> anyhow::Result<()> {
699        for actor_id in &self.actor_ids {
700            log::debug!("Starting actor {actor_id}");
701            start_component(&actor_id.inner())?;
702        }
703
704        for strategy_id in &self.strategy_ids {
705            log::debug!("Starting strategy {strategy_id}");
706            start_component(&strategy_id.inner())?;
707        }
708
709        for exec_algorithm_id in &self.exec_algorithm_ids {
710            log::debug!("Starting execution algorithm {exec_algorithm_id}");
711            start_component(&exec_algorithm_id.inner())?;
712        }
713
714        Ok(())
715    }
716
717    /// Stops all registered components.
718    ///
719    /// # Errors
720    ///
721    /// Returns an error if any component fails to stop.
722    pub fn stop_components(&mut self) -> anyhow::Result<()> {
723        for actor_id in &self.actor_ids {
724            log::debug!("Stopping actor {actor_id}");
725            stop_component(&actor_id.inner())?;
726        }
727
728        for exec_algorithm_id in &self.exec_algorithm_ids {
729            log::debug!("Stopping execution algorithm {exec_algorithm_id}");
730            stop_component(&exec_algorithm_id.inner())?;
731        }
732
733        for strategy_id in self.strategy_ids.clone() {
734            log::debug!("Stopping strategy {strategy_id}");
735            let should_proceed = self
736                .strategy_stop_fns
737                .get_mut(&strategy_id)
738                .is_none_or(|stop_fn| stop_fn());
739
740            if should_proceed {
741                stop_component(&strategy_id.inner())?;
742            }
743        }
744
745        Ok(())
746    }
747
748    /// Resets all registered components.
749    ///
750    /// # Errors
751    ///
752    /// Returns an error if any component fails to reset.
753    pub fn reset_components(&mut self) -> anyhow::Result<()> {
754        for actor_id in &self.actor_ids {
755            log::debug!("Resetting actor {actor_id}");
756            reset_component(&actor_id.inner())?;
757        }
758
759        for strategy_id in &self.strategy_ids {
760            log::debug!("Resetting strategy {strategy_id}");
761            reset_component(&strategy_id.inner())?;
762        }
763
764        for exec_algorithm_id in &self.exec_algorithm_ids {
765            log::debug!("Resetting execution algorithm {exec_algorithm_id}");
766            reset_component(&exec_algorithm_id.inner())?;
767        }
768
769        Ok(())
770    }
771
772    /// Disposes of all registered components.
773    ///
774    /// # Errors
775    ///
776    /// Returns an error if any component fails to dispose.
777    pub fn dispose_components(&mut self) -> anyhow::Result<()> {
778        for actor_id in &self.actor_ids {
779            log::debug!("Disposing actor {actor_id}");
780            dispose_component(&actor_id.inner())?;
781        }
782
783        for strategy_id in &self.strategy_ids {
784            log::debug!("Disposing strategy {strategy_id}");
785            dispose_component(&strategy_id.inner())?;
786            get_message_bus()
787                .borrow_mut()
788                .endpoint_map::<StrategyCommand>()
789                .deregister(strategy_control_endpoint(*strategy_id));
790        }
791
792        for exec_algorithm_id in &self.exec_algorithm_ids {
793            log::debug!("Disposing execution algorithm {exec_algorithm_id}");
794            dispose_component(&exec_algorithm_id.inner())?;
795            let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
796            msgbus::deregister_any(endpoint.into());
797        }
798
799        self.actor_ids.clear();
800        self.strategy_ids.clear();
801        self.strategy_stop_fns.clear();
802        self.strategy_handler_ids.clear();
803        self.exec_algorithm_ids.clear();
804        self.clocks.clear();
805
806        Ok(())
807    }
808
809    /// Clears all registered strategies, disposing each and removing their clocks.
810    ///
811    /// # Errors
812    ///
813    /// Returns an error if any strategy fails to dispose.
814    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
815        for strategy_id in &self.strategy_ids {
816            log::debug!("Disposing strategy {strategy_id}");
817            dispose_component(&strategy_id.inner())?;
818            let component_id = ComponentId::new(strategy_id.inner().as_str());
819            self.clocks.remove(&component_id);
820
821            // Remove only this strategy's own msgbus handlers
822            if let Some((order_hid, position_hid)) = self.strategy_handler_ids.get(strategy_id) {
823                let order_topic = get_event_orders_topic(*strategy_id);
824                let position_topic = get_event_positions_topic(*strategy_id);
825                msgbus::remove_order_event_handler(order_topic.into(), *order_hid);
826                msgbus::remove_position_event_handler(position_topic.into(), *position_hid);
827            }
828
829            get_message_bus()
830                .borrow_mut()
831                .endpoint_map::<StrategyCommand>()
832                .deregister(strategy_control_endpoint(*strategy_id));
833        }
834
835        self.strategy_ids.clear();
836        self.strategy_stop_fns.clear();
837        self.strategy_handler_ids.clear();
838
839        Ok(())
840    }
841
842    /// Clears all registered actors, disposing each and removing their clocks.
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if any actor fails to dispose.
847    pub fn clear_actors(&mut self) -> anyhow::Result<()> {
848        for actor_id in &self.actor_ids {
849            log::debug!("Disposing actor {actor_id}");
850            // Stop if running before disposal; ignore stop failures so a single
851            // misbehaving actor does not leave the rest in a half-cleared state.
852            let _ = stop_component(&actor_id.inner());
853            dispose_component(&actor_id.inner())?;
854            let component_id = ComponentId::new(actor_id.inner().as_str());
855            self.clocks.remove(&component_id);
856        }
857
858        self.actor_ids.clear();
859
860        Ok(())
861    }
862
863    /// Clears all registered execution algorithms, disposing each and removing their clocks.
864    ///
865    /// # Errors
866    ///
867    /// Returns an error if any execution algorithm fails to dispose.
868    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
869        for exec_algorithm_id in &self.exec_algorithm_ids {
870            log::debug!("Disposing execution algorithm {exec_algorithm_id}");
871            dispose_component(&exec_algorithm_id.inner())?;
872            let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
873            msgbus::deregister_any(endpoint.into());
874            let component_id = ComponentId::new(exec_algorithm_id.inner().as_str());
875            self.clocks.remove(&component_id);
876        }
877
878        self.exec_algorithm_ids.clear();
879
880        Ok(())
881    }
882
883    // -- Individual component management ----------------------------------------
884
885    /// Starts the actor with the given `actor_id`.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if the actor is not registered or cannot be started.
890    pub fn start_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
891        if !self.actor_ids.contains(actor_id) {
892            anyhow::bail!("Cannot start actor, {actor_id} not found");
893        }
894        start_component(&actor_id.inner())
895    }
896
897    /// Stops the actor with the given `actor_id`.
898    ///
899    /// # Errors
900    ///
901    /// Returns an error if the actor is not registered or cannot be stopped.
902    pub fn stop_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
903        if !self.actor_ids.contains(actor_id) {
904            anyhow::bail!("Cannot stop actor, {actor_id} not found");
905        }
906        stop_component(&actor_id.inner())
907    }
908
909    /// Removes the actor with the given `actor_id`.
910    ///
911    /// Will stop the actor first if it is currently running. Disposes the actor
912    /// and removes it from the trader's tracking.
913    ///
914    /// # Errors
915    ///
916    /// Returns an error if the actor is not registered.
917    pub fn remove_actor(&mut self, actor_id: &ActorId) -> anyhow::Result<()> {
918        let pos = self
919            .actor_ids
920            .iter()
921            .position(|id| id == actor_id)
922            .ok_or_else(|| anyhow::anyhow!("Cannot remove actor, {actor_id} not found"))?;
923
924        // Stop if running, then dispose
925        let _ = stop_component(&actor_id.inner());
926        dispose_component(&actor_id.inner())?;
927
928        self.actor_ids.swap_remove(pos);
929        let component_id = ComponentId::new(actor_id.inner().as_str());
930        self.clocks.remove(&component_id);
931
932        log::info!("Removed actor {actor_id} from trader {}", self.trader_id);
933        Ok(())
934    }
935
936    /// Starts the strategy with the given `strategy_id`.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the strategy is not registered or cannot be started.
941    pub fn start_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
942        if !self.strategy_ids.contains(strategy_id) {
943            anyhow::bail!("Cannot start strategy, {strategy_id} not found");
944        }
945        start_component(&strategy_id.inner())
946    }
947
948    /// Stops the strategy with the given `strategy_id`.
949    ///
950    /// Respects the `manage_stop` behavior — if the strategy's stop function
951    /// returns `false`, the component stop is deferred until market exit completes.
952    ///
953    /// # Errors
954    ///
955    /// Returns an error if the strategy is not registered or cannot be stopped.
956    pub fn stop_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()> {
957        if !self.strategy_ids.contains(strategy_id) {
958            anyhow::bail!("Cannot stop strategy, {strategy_id} not found");
959        }
960
961        let should_proceed = self
962            .strategy_stop_fns
963            .get_mut(strategy_id)
964            .is_none_or(|stop_fn| stop_fn());
965
966        if should_proceed {
967            stop_component(&strategy_id.inner())?;
968        }
969
970        Ok(())
971    }
972
973    /// Exits the market for the strategy with the given `strategy_id`.
974    ///
975    /// Sends a strategy command to the strategy's control endpoint. The strategy
976    /// then performs its own managed market exit.
977    ///
978    /// # Errors
979    ///
980    /// Returns an error if the strategy is not registered or its control endpoint is missing.
981    pub fn market_exit_strategy(
982        trader: &Rc<RefCell<Self>>,
983        strategy_id: &StrategyId,
984    ) -> anyhow::Result<()> {
985        let handler = trader.borrow().strategy_command_handler(strategy_id)?;
986        handler.handle(&StrategyCommand::ExitMarket);
987        Ok(())
988    }
989
990    fn strategy_command_handler(
991        &self,
992        strategy_id: &StrategyId,
993    ) -> anyhow::Result<TypedHandler<StrategyCommand>> {
994        if !self.strategy_ids.contains(strategy_id) {
995            anyhow::bail!("Cannot market exit strategy, {strategy_id} not found");
996        }
997
998        let endpoint = strategy_control_endpoint(*strategy_id);
999        let handler = {
1000            let msgbus = get_message_bus();
1001            msgbus
1002                .borrow_mut()
1003                .endpoint_map::<StrategyCommand>()
1004                .get(endpoint)
1005                .cloned()
1006        };
1007
1008        let Some(handler) = handler else {
1009            anyhow::bail!(
1010                "Cannot exit market for strategy {strategy_id}: control endpoint '{}' not registered",
1011                endpoint.as_str()
1012            );
1013        };
1014
1015        Ok(handler)
1016    }
1017
1018    /// Removes the strategy with the given `strategy_id`.
1019    ///
1020    /// Will stop the strategy first if it is currently running. Disposes the strategy
1021    /// and removes it from the trader's tracking along with its event subscriptions.
1022    ///
1023    /// # Errors
1024    ///
1025    /// Returns an error if the strategy is not registered.
1026    pub fn remove_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()> {
1027        let pos = self
1028            .strategy_ids
1029            .iter()
1030            .position(|id| id == strategy_id)
1031            .ok_or_else(|| anyhow::anyhow!("Cannot remove strategy, {strategy_id} not found"))?;
1032
1033        // Stop if running, then dispose
1034        let _ = stop_component(&strategy_id.inner());
1035        dispose_component(&strategy_id.inner())?;
1036
1037        // Clean up event subscriptions
1038        if let Some((order_hid, position_hid)) = self.strategy_handler_ids.remove(strategy_id) {
1039            let order_topic = get_event_orders_topic(*strategy_id);
1040            let position_topic = get_event_positions_topic(*strategy_id);
1041            msgbus::remove_order_event_handler(order_topic.into(), order_hid);
1042            msgbus::remove_position_event_handler(position_topic.into(), position_hid);
1043        }
1044
1045        get_message_bus()
1046            .borrow_mut()
1047            .endpoint_map::<StrategyCommand>()
1048            .deregister(strategy_control_endpoint(*strategy_id));
1049
1050        self.strategy_ids.swap_remove(pos);
1051        self.strategy_stop_fns.remove(strategy_id);
1052        let component_id = ComponentId::new(strategy_id.inner().as_str());
1053        self.clocks.remove(&component_id);
1054
1055        log::info!(
1056            "Removed strategy {strategy_id} from trader {}",
1057            self.trader_id
1058        );
1059        Ok(())
1060    }
1061
1062    // -- Lifecycle management ---------------------------------------------------
1063
1064    /// Initializes the trader, transitioning from `PreInitialized` to `Ready` state.
1065    ///
1066    /// This method must be called before starting the trader.
1067    ///
1068    /// # Errors
1069    ///
1070    /// Returns an error if the trader cannot be initialized from its current state.
1071    pub fn initialize(&mut self) -> anyhow::Result<()> {
1072        let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
1073        self.state = new_state;
1074
1075        Ok(())
1076    }
1077
1078    fn on_start(&mut self) -> anyhow::Result<()> {
1079        self.start_components()?;
1080
1081        // Transition to running state
1082        self.ts_started = Some(self.clock.borrow().timestamp_ns());
1083
1084        Ok(())
1085    }
1086
1087    fn on_stop(&mut self) -> anyhow::Result<()> {
1088        self.stop_components()?;
1089
1090        self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
1091
1092        Ok(())
1093    }
1094
1095    fn on_reset(&mut self) -> anyhow::Result<()> {
1096        self.reset_components()?;
1097
1098        self.ts_started = None;
1099        self.ts_stopped = None;
1100
1101        Ok(())
1102    }
1103
1104    fn on_dispose(&mut self) -> anyhow::Result<()> {
1105        if self.is_running() {
1106            self.stop()?;
1107        }
1108
1109        self.dispose_components()?;
1110
1111        Ok(())
1112    }
1113}
1114
1115impl Component for Trader {
1116    fn component_id(&self) -> ComponentId {
1117        ComponentId::new(format!("Trader-{}", self.trader_id))
1118    }
1119
1120    fn state(&self) -> ComponentState {
1121        self.state
1122    }
1123
1124    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1125        self.state = self.state.transition(&trigger)?;
1126        log::info!("{}", self.state.variant_name());
1127        Ok(())
1128    }
1129
1130    fn register(
1131        &mut self,
1132        _trader_id: TraderId,
1133        _clock: Rc<RefCell<dyn Clock>>,
1134        _cache: Rc<RefCell<Cache>>,
1135    ) -> anyhow::Result<()> {
1136        anyhow::bail!("Trader cannot register with itself")
1137    }
1138
1139    fn on_start(&mut self) -> anyhow::Result<()> {
1140        Self::on_start(self)
1141    }
1142
1143    fn on_stop(&mut self) -> anyhow::Result<()> {
1144        Self::on_stop(self)
1145    }
1146
1147    fn on_reset(&mut self) -> anyhow::Result<()> {
1148        Self::on_reset(self)
1149    }
1150
1151    fn on_dispose(&mut self) -> anyhow::Result<()> {
1152        Self::on_dispose(self)
1153    }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158    use std::{cell::RefCell, rc::Rc};
1159
1160    use nautilus_common::{
1161        actor::{DataActorCore, data_actor::DataActorConfig},
1162        cache::Cache,
1163        clock::TestClock,
1164        enums::{ComponentState, Environment},
1165        msgbus,
1166        msgbus::{MessageBus, TypedHandler, switchboard::get_event_orders_topic},
1167        nautilus_actor,
1168    };
1169    use nautilus_core::UUID4;
1170    use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
1171    use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
1172    use nautilus_model::{
1173        events::OrderAccepted,
1174        identifiers::{ActorId, ComponentId, TraderId},
1175        orders::OrderAny,
1176        stubs::TestDefault,
1177    };
1178    use nautilus_portfolio::portfolio::Portfolio;
1179    use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
1180    use nautilus_trading::{
1181        ExecutionAlgorithm as ExecutionAlgorithmTrait, ExecutionAlgorithmConfig,
1182        ExecutionAlgorithmCore, nautilus_strategy,
1183        strategy::{config::StrategyConfig, core::StrategyCore},
1184    };
1185    use rstest::rstest;
1186
1187    use super::*;
1188
1189    // Simple DataActor wrapper for testing
1190    #[derive(Debug)]
1191    struct TestDataActor {
1192        core: DataActorCore,
1193    }
1194
1195    impl TestDataActor {
1196        fn new(config: DataActorConfig) -> Self {
1197            Self {
1198                core: DataActorCore::new(config),
1199            }
1200        }
1201    }
1202
1203    impl DataActor for TestDataActor {}
1204
1205    nautilus_actor!(TestDataActor);
1206
1207    // Simple ExecutionAlgorithm wrapper for testing
1208    #[derive(Debug)]
1209    struct TestExecAlgorithm {
1210        core: ExecutionAlgorithmCore,
1211    }
1212
1213    impl TestExecAlgorithm {
1214        fn new(config: ExecutionAlgorithmConfig) -> Self {
1215            Self {
1216                core: ExecutionAlgorithmCore::new(config),
1217            }
1218        }
1219    }
1220
1221    impl DataActor for TestExecAlgorithm {}
1222
1223    nautilus_actor!(TestExecAlgorithm);
1224
1225    impl ExecutionAlgorithmTrait for TestExecAlgorithm {
1226        fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
1227            &mut self.core
1228        }
1229
1230        fn on_order(&mut self, _order: OrderAny) -> anyhow::Result<()> {
1231            Ok(())
1232        }
1233    }
1234
1235    // Simple Strategy wrapper for testing
1236    #[derive(Debug)]
1237    struct TestStrategy {
1238        core: StrategyCore,
1239    }
1240
1241    impl TestStrategy {
1242        fn new(config: StrategyConfig) -> Self {
1243            Self {
1244                core: StrategyCore::new(config),
1245            }
1246        }
1247    }
1248
1249    impl DataActor for TestStrategy {}
1250
1251    nautilus_strategy!(TestStrategy);
1252
1253    #[expect(clippy::type_complexity)]
1254    fn create_trader_components() -> (
1255        Rc<RefCell<MessageBus>>,
1256        Rc<RefCell<Cache>>,
1257        Rc<RefCell<Portfolio>>,
1258        Rc<RefCell<DataEngine>>,
1259        Rc<RefCell<RiskEngine>>,
1260        Rc<RefCell<ExecutionEngine>>,
1261        Rc<RefCell<TestClock>>,
1262    ) {
1263        let trader_id = TraderId::test_default();
1264        let instance_id = UUID4::new();
1265        let clock = Rc::new(RefCell::new(TestClock::new()));
1266        // Set the clock to a non-zero time for test purposes
1267        clock.borrow_mut().set_time(1_000_000_000u64.into());
1268        let msgbus = Rc::new(RefCell::new(MessageBus::new(
1269            trader_id,
1270            instance_id,
1271            Some("test".to_string()),
1272            None,
1273        )));
1274        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
1275        let portfolio = Rc::new(RefCell::new(Portfolio::new(
1276            cache.clone(),
1277            clock.clone() as Rc<RefCell<dyn Clock>>,
1278            None,
1279        )));
1280        let data_engine = Rc::new(RefCell::new(DataEngine::new(
1281            clock.clone(),
1282            cache.clone(),
1283            Some(DataEngineConfig::default()),
1284        )));
1285
1286        // Create separate cache and clock instances for RiskEngine to avoid borrowing conflicts
1287        let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
1288        let risk_clock = Rc::new(RefCell::new(TestClock::new()));
1289        let risk_portfolio = Portfolio::new(
1290            risk_cache.clone(),
1291            risk_clock.clone() as Rc<RefCell<dyn Clock>>,
1292            None,
1293        );
1294        let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
1295            RiskEngineConfig::default(),
1296            risk_portfolio,
1297            risk_clock as Rc<RefCell<dyn Clock>>,
1298            risk_cache,
1299        )));
1300        let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
1301            clock.clone(),
1302            cache.clone(),
1303            Some(ExecutionEngineConfig::default()),
1304        )));
1305
1306        (
1307            msgbus,
1308            cache,
1309            portfolio,
1310            data_engine,
1311            risk_engine,
1312            exec_engine,
1313            clock,
1314        )
1315    }
1316
1317    #[rstest]
1318    fn test_trader_creation() {
1319        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1320            create_trader_components();
1321        let trader_id = TraderId::test_default();
1322        let instance_id = UUID4::new();
1323
1324        let trader = Trader::new(
1325            trader_id,
1326            instance_id,
1327            Environment::Backtest,
1328            clock,
1329            cache,
1330            portfolio,
1331        );
1332
1333        assert_eq!(trader.trader_id(), trader_id);
1334        assert_eq!(trader.instance_id(), instance_id);
1335        assert_eq!(trader.environment(), Environment::Backtest);
1336        assert_eq!(trader.state(), ComponentState::PreInitialized);
1337        assert_eq!(trader.actor_count(), 0);
1338        assert_eq!(trader.strategy_count(), 0);
1339        assert_eq!(trader.exec_algorithm_count(), 0);
1340        assert_eq!(trader.component_count(), 0);
1341        assert!(!trader.is_running());
1342        assert!(!trader.is_stopped());
1343        assert!(!trader.is_disposed());
1344        assert!(trader.ts_created() > 0);
1345        assert!(trader.ts_started().is_none());
1346        assert!(trader.ts_stopped().is_none());
1347    }
1348
1349    #[rstest]
1350    fn test_trader_component_id() {
1351        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1352            create_trader_components();
1353        let trader_id = TraderId::from("TRADER-001");
1354        let instance_id = UUID4::new();
1355
1356        let trader = Trader::new(
1357            trader_id,
1358            instance_id,
1359            Environment::Backtest,
1360            clock,
1361            cache,
1362            portfolio,
1363        );
1364
1365        assert_eq!(
1366            trader.component_id(),
1367            ComponentId::from("Trader-TRADER-001")
1368        );
1369    }
1370
1371    #[rstest]
1372    fn test_add_actor_success() {
1373        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1374            create_trader_components();
1375        let trader_id = TraderId::test_default();
1376        let instance_id = UUID4::new();
1377
1378        let mut trader = Trader::new(
1379            trader_id,
1380            instance_id,
1381            Environment::Backtest,
1382            clock,
1383            cache,
1384            portfolio,
1385        );
1386
1387        let actor = TestDataActor::new(DataActorConfig::default());
1388        let actor_id = actor.actor_id();
1389
1390        let result = trader.add_actor(actor);
1391        assert!(result.is_ok());
1392        assert_eq!(trader.actor_count(), 1);
1393        assert_eq!(trader.component_count(), 1);
1394        assert!(trader.actor_ids().contains(&actor_id));
1395    }
1396
1397    #[rstest]
1398    fn test_add_duplicate_actor_fails() {
1399        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1400            create_trader_components();
1401        let trader_id = TraderId::test_default();
1402        let instance_id = UUID4::new();
1403
1404        let mut trader = Trader::new(
1405            trader_id,
1406            instance_id,
1407            Environment::Backtest,
1408            clock,
1409            cache,
1410            portfolio,
1411        );
1412
1413        let config = DataActorConfig {
1414            actor_id: Some(ActorId::from("TestActor")),
1415            ..Default::default()
1416        };
1417        let actor1 = TestDataActor::new(config.clone());
1418        let actor2 = TestDataActor::new(config);
1419
1420        // First addition should succeed
1421        assert!(trader.add_actor(actor1).is_ok());
1422        assert_eq!(trader.actor_count(), 1);
1423
1424        // Second addition should fail
1425        let result = trader.add_actor(actor2);
1426        assert!(result.is_err());
1427        assert!(
1428            result
1429                .unwrap_err()
1430                .to_string()
1431                .contains("already registered")
1432        );
1433        assert_eq!(trader.actor_count(), 1);
1434    }
1435
1436    #[rstest]
1437    fn test_add_strategy_success() {
1438        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1439            create_trader_components();
1440        let trader_id = TraderId::test_default();
1441        let instance_id = UUID4::new();
1442
1443        let mut trader = Trader::new(
1444            trader_id,
1445            instance_id,
1446            Environment::Backtest,
1447            clock,
1448            cache,
1449            portfolio,
1450        );
1451
1452        let config = StrategyConfig {
1453            strategy_id: Some(StrategyId::from("Test-Strategy")),
1454            ..Default::default()
1455        };
1456        let strategy = TestStrategy::new(config);
1457        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1458
1459        let result = trader.add_strategy(strategy);
1460        assert!(result.is_ok());
1461        assert_eq!(trader.strategy_count(), 1);
1462        assert_eq!(trader.component_count(), 1);
1463        assert!(trader.strategy_ids().contains(&strategy_id));
1464    }
1465
1466    #[rstest]
1467    fn test_add_exec_algorithm_success() {
1468        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1469            create_trader_components();
1470        let trader_id = TraderId::test_default();
1471        let instance_id = UUID4::new();
1472
1473        let mut trader = Trader::new(
1474            trader_id,
1475            instance_id,
1476            Environment::Backtest,
1477            clock,
1478            cache,
1479            portfolio,
1480        );
1481
1482        let config = ExecutionAlgorithmConfig {
1483            exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
1484            ..Default::default()
1485        };
1486        let exec_algorithm = TestExecAlgorithm::new(config);
1487        let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
1488
1489        let result = trader.add_exec_algorithm(exec_algorithm);
1490        assert!(result.is_ok());
1491        assert_eq!(trader.exec_algorithm_count(), 1);
1492        assert_eq!(trader.component_count(), 1);
1493        assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
1494    }
1495
1496    #[rstest]
1497    fn test_cannot_add_exec_algorithm_while_running() {
1498        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1499            create_trader_components();
1500        let trader_id = TraderId::test_default();
1501        let instance_id = UUID4::new();
1502
1503        let mut trader = Trader::new(
1504            trader_id,
1505            instance_id,
1506            Environment::Backtest,
1507            clock,
1508            cache,
1509            portfolio,
1510        );
1511        trader.state = ComponentState::Running;
1512
1513        let config = ExecutionAlgorithmConfig {
1514            exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
1515            ..Default::default()
1516        };
1517        let exec_algorithm = TestExecAlgorithm::new(config);
1518
1519        let result = trader.add_exec_algorithm(exec_algorithm);
1520        assert!(result.is_err());
1521        assert_eq!(
1522            result.unwrap_err().to_string(),
1523            "Cannot add execution algorithms to running trader"
1524        );
1525        assert_eq!(trader.exec_algorithm_count(), 0);
1526    }
1527
1528    #[rstest]
1529    fn test_component_lifecycle() {
1530        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1531            create_trader_components();
1532        let trader_id = TraderId::test_default();
1533        let instance_id = UUID4::new();
1534
1535        let mut trader = Trader::new(
1536            trader_id,
1537            instance_id,
1538            Environment::Backtest,
1539            clock,
1540            cache,
1541            portfolio,
1542        );
1543
1544        // Add components
1545        let actor = TestDataActor::new(DataActorConfig::default());
1546
1547        let strategy_config = StrategyConfig {
1548            strategy_id: Some(StrategyId::from("Test-Strategy")),
1549            ..Default::default()
1550        };
1551        let strategy = TestStrategy::new(strategy_config);
1552
1553        let exec_algorithm_config = ExecutionAlgorithmConfig {
1554            exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
1555            ..Default::default()
1556        };
1557        let exec_algorithm = TestExecAlgorithm::new(exec_algorithm_config);
1558
1559        assert!(trader.add_actor(actor).is_ok());
1560        assert!(trader.add_strategy(strategy).is_ok());
1561        assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1562        assert_eq!(trader.component_count(), 3);
1563
1564        // Test start components
1565        let start_result = trader.start_components();
1566        assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1567
1568        // Test stop components
1569        assert!(trader.stop_components().is_ok());
1570
1571        // Test reset components
1572        assert!(trader.reset_components().is_ok());
1573
1574        // Test dispose components
1575        assert!(trader.dispose_components().is_ok());
1576        assert_eq!(trader.component_count(), 0);
1577    }
1578
1579    #[rstest]
1580    fn test_trader_component_lifecycle() {
1581        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1582            create_trader_components();
1583        let trader_id = TraderId::test_default();
1584        let instance_id = UUID4::new();
1585
1586        let mut trader = Trader::new(
1587            trader_id,
1588            instance_id,
1589            Environment::Backtest,
1590            clock,
1591            cache,
1592            portfolio,
1593        );
1594
1595        // Initially pre-initialized
1596        assert_eq!(trader.state(), ComponentState::PreInitialized);
1597        assert!(!trader.is_running());
1598        assert!(!trader.is_stopped());
1599        assert!(!trader.is_disposed());
1600
1601        // Cannot start from pre-initialized state
1602        assert!(trader.start().is_err());
1603
1604        // Simulate initialization (normally done by kernel)
1605        trader.initialize().unwrap();
1606
1607        // Test start
1608        assert!(trader.start().is_ok());
1609        assert_eq!(trader.state(), ComponentState::Running);
1610        assert!(trader.is_running());
1611        assert!(trader.ts_started().is_some());
1612
1613        // Test stop
1614        assert!(trader.stop().is_ok());
1615        assert_eq!(trader.state(), ComponentState::Stopped);
1616        assert!(trader.is_stopped());
1617        assert!(trader.ts_stopped().is_some());
1618
1619        // Test reset
1620        assert!(trader.reset().is_ok());
1621        assert_eq!(trader.state(), ComponentState::Ready);
1622        assert!(trader.ts_started().is_none());
1623        assert!(trader.ts_stopped().is_none());
1624
1625        // Test dispose
1626        assert!(trader.dispose().is_ok());
1627        assert_eq!(trader.state(), ComponentState::Disposed);
1628        assert!(trader.is_disposed());
1629    }
1630
1631    #[rstest]
1632    fn test_market_exit_strategy_fails_when_control_endpoint_missing() {
1633        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1634            create_trader_components();
1635        let trader_id = TraderId::test_default();
1636        let instance_id = UUID4::new();
1637
1638        let mut trader = Trader::new(
1639            trader_id,
1640            instance_id,
1641            Environment::Backtest,
1642            clock,
1643            cache,
1644            portfolio,
1645        );
1646
1647        let config = StrategyConfig {
1648            strategy_id: Some(StrategyId::from("Test-Strategy")),
1649            ..Default::default()
1650        };
1651        let strategy = TestStrategy::new(config);
1652        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1653        trader.add_strategy(strategy).unwrap();
1654
1655        let endpoint = strategy_control_endpoint(strategy_id);
1656        assert!(
1657            get_message_bus()
1658                .borrow_mut()
1659                .endpoint_map::<StrategyCommand>()
1660                .is_registered(endpoint)
1661        );
1662        get_message_bus()
1663            .borrow_mut()
1664            .endpoint_map::<StrategyCommand>()
1665            .deregister(endpoint);
1666
1667        let trader = Rc::new(RefCell::new(trader));
1668        let result = Trader::market_exit_strategy(&trader, &strategy_id);
1669        assert!(result.is_err());
1670        assert_eq!(
1671            result.unwrap_err().to_string(),
1672            format!(
1673                "Cannot exit market for strategy {strategy_id}: control endpoint '{}' not registered",
1674                endpoint.as_str()
1675            )
1676        );
1677    }
1678
1679    #[rstest]
1680    fn test_remove_strategy_deregisters_strategy_endpoint() {
1681        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1682            create_trader_components();
1683        let trader_id = TraderId::test_default();
1684        let instance_id = UUID4::new();
1685
1686        let mut trader = Trader::new(
1687            trader_id,
1688            instance_id,
1689            Environment::Backtest,
1690            clock,
1691            cache,
1692            portfolio,
1693        );
1694
1695        let config = StrategyConfig {
1696            strategy_id: Some(StrategyId::from("Test-Strategy")),
1697            ..Default::default()
1698        };
1699        let strategy = TestStrategy::new(config);
1700        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1701        trader.add_strategy(strategy).unwrap();
1702
1703        let endpoint = strategy_control_endpoint(strategy_id);
1704        assert!(
1705            get_message_bus()
1706                .borrow_mut()
1707                .endpoint_map::<StrategyCommand>()
1708                .is_registered(endpoint)
1709        );
1710
1711        trader.remove_strategy(&strategy_id).unwrap();
1712
1713        assert!(
1714            !get_message_bus()
1715                .borrow_mut()
1716                .endpoint_map::<StrategyCommand>()
1717                .is_registered(endpoint)
1718        );
1719    }
1720
1721    #[rstest]
1722    fn test_can_add_components_while_running() {
1723        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1724            create_trader_components();
1725        let trader_id = TraderId::test_default();
1726        let instance_id = UUID4::new();
1727
1728        let mut trader = Trader::new(
1729            trader_id,
1730            instance_id,
1731            Environment::Backtest,
1732            clock,
1733            cache,
1734            portfolio,
1735        );
1736
1737        // Simulate running state
1738        trader.state = ComponentState::Running;
1739
1740        let actor = TestDataActor::new(DataActorConfig::default());
1741        let result = trader.add_actor(actor);
1742        assert!(result.is_ok());
1743        assert_eq!(trader.actor_count(), 1);
1744    }
1745
1746    #[rstest]
1747    fn test_cannot_add_components_while_disposed() {
1748        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1749            create_trader_components();
1750        let trader_id = TraderId::test_default();
1751        let instance_id = UUID4::new();
1752
1753        let mut trader = Trader::new(
1754            trader_id,
1755            instance_id,
1756            Environment::Backtest,
1757            clock,
1758            cache,
1759            portfolio,
1760        );
1761
1762        // Simulate disposed state
1763        trader.state = ComponentState::Disposed;
1764
1765        let actor = TestDataActor::new(DataActorConfig::default());
1766        let result = trader.add_actor(actor);
1767        assert!(result.is_err());
1768        assert!(result.unwrap_err().to_string().contains("disposed trader"));
1769    }
1770
1771    #[rstest]
1772    fn test_create_component_clock_backtest_creates_individual_clocks() {
1773        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1774            create_trader_components();
1775        let trader_id = TraderId::test_default();
1776        let instance_id = UUID4::new();
1777
1778        let mut trader = Trader::new(
1779            trader_id,
1780            instance_id,
1781            Environment::Backtest,
1782            clock.clone(),
1783            cache,
1784            portfolio,
1785        );
1786
1787        let component_a = ComponentId::new("ACTOR-A");
1788        let component_b = ComponentId::new("ACTOR-B");
1789        let clock_a = trader.create_component_clock(component_a);
1790        let clock_b = trader.create_component_clock(component_b);
1791
1792        // Each component gets its own clock instance
1793        assert_ne!(clock_a.as_ptr() as *const _, clock.as_ptr() as *const _);
1794        assert_ne!(clock_a.as_ptr() as *const _, clock_b.as_ptr() as *const _);
1795    }
1796
1797    #[rstest]
1798    fn test_clear_strategies_preserves_other_handlers() {
1799        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1800            create_trader_components();
1801        let trader_id = TraderId::test_default();
1802        let instance_id = UUID4::new();
1803
1804        let mut trader = Trader::new(
1805            trader_id,
1806            instance_id,
1807            Environment::Backtest,
1808            clock,
1809            cache,
1810            portfolio,
1811        );
1812
1813        let config = StrategyConfig {
1814            strategy_id: Some(StrategyId::from("Test-Strategy")),
1815            ..Default::default()
1816        };
1817        let strategy = TestStrategy::new(config);
1818        let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1819        trader.add_strategy(strategy).unwrap();
1820
1821        let endpoint = strategy_control_endpoint(strategy_id);
1822        assert!(
1823            get_message_bus()
1824                .borrow_mut()
1825                .endpoint_map::<StrategyCommand>()
1826                .is_registered(endpoint)
1827        );
1828
1829        // Simulate an exec algorithm subscribing to the same strategy topic
1830        let ext_received = Rc::new(RefCell::new(0));
1831        let ext_clone = ext_received.clone();
1832        let ext_handler =
1833            TypedHandler::from_with_id("exec-algo-handler", move |_: &OrderEventAny| {
1834                *ext_clone.borrow_mut() += 1;
1835            });
1836        let order_topic = get_event_orders_topic(strategy_id);
1837        msgbus::subscribe_order_events(order_topic.into(), ext_handler, None);
1838
1839        trader.clear_strategies().unwrap();
1840        assert_eq!(trader.strategy_count(), 0);
1841        assert!(
1842            !get_message_bus()
1843                .borrow_mut()
1844                .endpoint_map::<StrategyCommand>()
1845                .is_registered(endpoint)
1846        );
1847
1848        let event = OrderEventAny::Accepted(OrderAccepted::test_default());
1849        msgbus::publish_order_event(order_topic, &event);
1850        assert_eq!(*ext_received.borrow(), 1);
1851    }
1852
1853    #[rstest]
1854    fn test_clear_actors_disposes_and_clears_state() {
1855        let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1856            create_trader_components();
1857        let trader_id = TraderId::test_default();
1858        let instance_id = UUID4::new();
1859
1860        let mut trader = Trader::new(
1861            trader_id,
1862            instance_id,
1863            Environment::Backtest,
1864            clock,
1865            cache,
1866            portfolio,
1867        );
1868
1869        let actor_a = TestDataActor::new(DataActorConfig {
1870            actor_id: Some(ActorId::from("Actor-A")),
1871            ..Default::default()
1872        });
1873        let actor_b = TestDataActor::new(DataActorConfig {
1874            actor_id: Some(ActorId::from("Actor-B")),
1875            ..Default::default()
1876        });
1877        trader.add_actor(actor_a).unwrap();
1878        trader.add_actor(actor_b).unwrap();
1879        assert_eq!(trader.actor_count(), 2);
1880        assert_eq!(
1881            trader.get_component_clocks().len(),
1882            2,
1883            "each registered actor must have a component clock",
1884        );
1885
1886        trader.clear_actors().unwrap();
1887
1888        assert_eq!(trader.actor_count(), 0);
1889        assert!(trader.actor_ids().is_empty());
1890        assert_eq!(
1891            trader.get_component_clocks().len(),
1892            0,
1893            "actor clocks must be dropped after clear_actors",
1894        );
1895    }
1896}