1use std::{cell::RefCell, fmt::Debug, rc::Rc};
23
24use ahash::AHashMap;
25use nautilus_common::{
26 actor::{DataActor, registry::try_get_actor_unchecked},
27 cache::Cache,
28 clock::{Clock, TestClock},
29 component::{
30 Component, dispose_component, register_component_actor, reset_component, start_component,
31 stop_component,
32 },
33 enums::{ComponentState, ComponentTrigger, Environment},
34 messages::execution::TradingCommand,
35 msgbus,
36 msgbus::{
37 Endpoint, MStr, ShareableMessageHandler, TypedHandler, get_message_bus,
38 switchboard::{get_event_orders_topic, get_event_positions_topic},
39 },
40 timer::{TimeEvent, TimeEventCallback},
41};
42use nautilus_core::{UUID4, UnixNanos};
43use nautilus_model::{
44 events::{OrderEventAny, PositionEvent},
45 identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
46};
47use nautilus_portfolio::portfolio::Portfolio;
48use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
49use ustr::Ustr;
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub(crate) enum StrategyCommand {
53 ExitMarket,
54}
55
56fn strategy_control_endpoint(strategy_id: StrategyId) -> MStr<Endpoint> {
57 format!("{strategy_id}.control").into()
58}
59
60pub struct Trader {
76 pub trader_id: TraderId,
78 pub instance_id: UUID4,
80 pub environment: Environment,
82 state: ComponentState,
84 clock: Rc<RefCell<dyn Clock>>,
86 cache: Rc<RefCell<Cache>>,
88 portfolio: Rc<RefCell<Portfolio>>,
90 actor_ids: Vec<ActorId>,
92 strategy_ids: Vec<StrategyId>,
94 strategy_stop_fns: AHashMap<StrategyId, Box<dyn FnMut() -> bool>>,
96 strategy_handler_ids: AHashMap<StrategyId, (Ustr, Ustr)>,
98 exec_algorithm_ids: Vec<ExecAlgorithmId>,
100 clocks: AHashMap<ComponentId, Rc<RefCell<dyn Clock>>>,
102 ts_created: UnixNanos,
104 ts_started: Option<UnixNanos>,
106 ts_stopped: Option<UnixNanos>,
108}
109
110impl Debug for Trader {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 write!(f, "{:?}", stringify!(TraderId)) }
114}
115
116impl Trader {
117 #[must_use]
119 pub fn new(
120 trader_id: TraderId,
121 instance_id: UUID4,
122 environment: Environment,
123 clock: Rc<RefCell<dyn Clock>>,
124 cache: Rc<RefCell<Cache>>,
125 portfolio: Rc<RefCell<Portfolio>>,
126 ) -> Self {
127 let ts_created = clock.borrow().timestamp_ns();
128
129 Self {
130 trader_id,
131 instance_id,
132 environment,
133 state: ComponentState::PreInitialized,
134 clock,
135 cache,
136 portfolio,
137 actor_ids: Vec::new(),
138 strategy_ids: Vec::new(),
139 strategy_stop_fns: AHashMap::new(),
140 strategy_handler_ids: AHashMap::new(),
141 exec_algorithm_ids: Vec::new(),
142 clocks: AHashMap::new(),
143 ts_created,
144 ts_started: None,
145 ts_stopped: None,
146 }
147 }
148
149 #[must_use]
151 pub const fn trader_id(&self) -> TraderId {
152 self.trader_id
153 }
154
155 #[must_use]
157 pub const fn instance_id(&self) -> UUID4 {
158 self.instance_id
159 }
160
161 #[must_use]
163 pub const fn environment(&self) -> Environment {
164 self.environment
165 }
166
167 #[must_use]
169 pub const fn state(&self) -> ComponentState {
170 self.state
171 }
172
173 #[must_use]
175 pub const fn ts_created(&self) -> UnixNanos {
176 self.ts_created
177 }
178
179 #[must_use]
181 pub const fn ts_started(&self) -> Option<UnixNanos> {
182 self.ts_started
183 }
184
185 #[must_use]
187 pub const fn ts_stopped(&self) -> Option<UnixNanos> {
188 self.ts_stopped
189 }
190
191 #[must_use]
193 pub const fn actor_count(&self) -> usize {
194 self.actor_ids.len()
195 }
196
197 #[must_use]
199 pub const fn strategy_count(&self) -> usize {
200 self.strategy_ids.len()
201 }
202
203 #[must_use]
205 pub const fn exec_algorithm_count(&self) -> usize {
206 self.exec_algorithm_ids.len()
207 }
208
209 pub fn get_component_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
211 self.clocks.values().cloned().collect()
212 }
213
214 #[must_use]
216 pub const fn component_count(&self) -> usize {
217 self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
218 }
219
220 #[must_use]
222 pub fn actor_ids(&self) -> Vec<ActorId> {
223 self.actor_ids.clone()
224 }
225
226 #[must_use]
228 pub fn strategy_ids(&self) -> Vec<StrategyId> {
229 self.strategy_ids.clone()
230 }
231
232 #[must_use]
234 pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
235 self.exec_algorithm_ids.clone()
236 }
237
238 pub fn create_component_clock(&mut self, component_id: ComponentId) -> Rc<RefCell<dyn Clock>> {
244 let clock: Rc<RefCell<dyn Clock>> = match self.environment {
245 Environment::Backtest => Rc::new(RefCell::new(TestClock::new())),
246 Environment::Live | Environment::Sandbox => Self::create_live_clock(),
247 };
248 self.clocks.insert(component_id, clock.clone());
249 clock
250 }
251
252 #[cfg(feature = "live")]
253 fn create_live_clock() -> Rc<RefCell<dyn Clock>> {
254 Rc::new(RefCell::new(
255 nautilus_common::live::clock::LiveClock::default(), ))
257 }
258
259 #[cfg(not(feature = "live"))]
260 fn create_live_clock() -> Rc<RefCell<dyn Clock>> {
261 panic!("Live/Sandbox environment requires the 'live' feature to be enabled");
262 }
263
264 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
272 where
273 T: DataActor + Component + Debug + 'static,
274 {
275 self.validate_actor_or_strategy_registration()?;
276
277 let actor_id = actor.actor_id();
278
279 if self.actor_ids.contains(&actor_id) {
281 anyhow::bail!("Actor {actor_id} is already registered");
282 }
283
284 let component_id = ComponentId::new(actor_id.inner().as_str());
285 let clock = self.create_component_clock(component_id);
286
287 let mut actor_mut = actor;
288 actor_mut.register(self.trader_id, clock, self.cache.clone())?;
289
290 self.add_registered_actor(actor_mut)
291 }
292
293 pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
305 where
306 F: FnOnce() -> anyhow::Result<T>,
307 T: DataActor + Component + Debug + 'static,
308 {
309 let actor = factory()?;
310
311 self.add_actor(actor)
312 }
313
314 pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
320 where
321 T: DataActor + Component + Debug + 'static,
322 {
323 let actor_id = actor.actor_id();
324
325 register_component_actor(actor);
327
328 self.actor_ids.push(actor_id);
330
331 log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
332
333 Ok(())
334 }
335
336 pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
346 if self.actor_ids.contains(&actor_id) {
348 anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
349 }
350
351 self.actor_ids.push(actor_id);
353
354 log::debug!(
355 "Added actor ID '{actor_id}' to trader {} for lifecycle management",
356 self.trader_id
357 );
358
359 Ok(())
360 }
361
362 pub fn add_exec_algorithm_id_for_lifecycle(
372 &mut self,
373 exec_algorithm_id: ExecAlgorithmId,
374 ) -> anyhow::Result<()> {
375 if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
376 anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already tracked by trader");
377 }
378
379 self.exec_algorithm_ids.push(exec_algorithm_id);
380
381 log::debug!(
382 "Added exec algorithm ID '{exec_algorithm_id}' to trader {} for lifecycle management",
383 self.trader_id
384 );
385
386 Ok(())
387 }
388
389 pub fn add_strategy_id_with_subscriptions<T>(
400 &mut self,
401 strategy_id: StrategyId,
402 ) -> anyhow::Result<()>
403 where
404 T: Strategy + Component + Debug + 'static,
405 {
406 if self.strategy_ids.contains(&strategy_id) {
407 anyhow::bail!("Strategy '{strategy_id}' is already tracked by trader");
408 }
409
410 let actor_id = Ustr::from(strategy_id.inner().as_str());
411
412 let order_topic = get_event_orders_topic(strategy_id);
414 let order_actor_id = actor_id;
415 let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
416 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
417 strategy.handle_order_event(event.clone());
418 } else {
419 log::error!("Strategy {order_actor_id} not found for order event handling");
420 }
421 });
422 let order_handler_id = order_handler.id();
423 msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
424
425 let position_topic = get_event_positions_topic(strategy_id);
427 let position_handler = TypedHandler::from(move |event: &PositionEvent| {
428 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
429 strategy.handle_position_event(event.clone());
430 } else {
431 log::error!("Strategy {actor_id} not found for position event handling");
432 }
433 });
434 let position_handler_id = position_handler.id();
435 msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
436
437 let control_actor_id = actor_id;
438 let control_handler = TypedHandler::from(move |command: &StrategyCommand| {
439 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&control_actor_id) {
440 match command {
441 StrategyCommand::ExitMarket => {
442 if let Err(e) = strategy.market_exit() {
443 log::error!(
444 "Error handling strategy command for {control_actor_id}: {e}"
445 );
446 }
447 }
448 }
449 } else {
450 log::error!("Strategy {control_actor_id} not found for control handling");
451 }
452 });
453 get_message_bus()
454 .borrow_mut()
455 .endpoint_map::<StrategyCommand>()
456 .register(strategy_control_endpoint(strategy_id), control_handler);
457
458 self.strategy_ids.push(strategy_id);
459 self.strategy_handler_ids
460 .insert(strategy_id, (order_handler_id, position_handler_id));
461
462 let stop_actor_id = actor_id;
464 let stop_fn = Box::new(move || -> bool {
465 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
466 Strategy::stop(&mut *strategy)
467 } else {
468 log::error!("Strategy {stop_actor_id} not found for stop");
469 true
470 }
471 });
472 self.strategy_stop_fns.insert(strategy_id, stop_fn);
473
474 log::debug!(
475 "Added strategy '{strategy_id}' to trader {} with event subscriptions",
476 self.trader_id
477 );
478
479 Ok(())
480 }
481
482 pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
494 where
495 T: Strategy + Component + Debug + 'static,
496 {
497 self.validate_actor_or_strategy_registration()?;
498
499 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
500
501 if self.strategy_ids.contains(&strategy_id) {
503 anyhow::bail!("Strategy {strategy_id} is already registered");
504 }
505
506 let component_id = strategy.component_id();
507 let clock = self.create_component_clock(component_id);
508
509 strategy.core_mut().register(
511 self.trader_id,
512 clock.clone(),
513 self.cache.clone(),
514 self.portfolio.clone(),
515 )?;
516
517 let actor_id = strategy.actor_id().inner();
519 let callback = TimeEventCallback::from(move |event: TimeEvent| {
520 if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
521 actor.handle_time_event(&event);
522 } else {
523 log::error!("Strategy {actor_id} not found for time event handling");
524 }
525 });
526 clock.borrow_mut().register_default_handler(callback);
527
528 strategy.initialize()?;
530
531 register_component_actor(strategy);
533
534 let order_topic = get_event_orders_topic(strategy_id);
535 let order_actor_id = actor_id;
536 let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
537 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
538 strategy.handle_order_event(event.clone());
539 } else {
540 log::error!("Strategy {order_actor_id} not found for order event handling");
541 }
542 });
543 let order_handler_id = order_handler.id();
544 msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
545
546 let position_topic = get_event_positions_topic(strategy_id);
547 let position_handler = TypedHandler::from(move |event: &PositionEvent| {
548 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
549 strategy.handle_position_event(event.clone());
550 } else {
551 log::error!("Strategy {actor_id} not found for position event handling");
552 }
553 });
554 let position_handler_id = position_handler.id();
555 msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
556
557 let control_actor_id = actor_id;
558 let control_handler = TypedHandler::from(move |command: &StrategyCommand| {
559 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&control_actor_id) {
560 match command {
561 StrategyCommand::ExitMarket => {
562 if let Err(e) = strategy.market_exit() {
563 log::error!(
564 "Error handling strategy command for {control_actor_id}: {e}"
565 );
566 }
567 }
568 }
569 } else {
570 log::error!("Strategy {control_actor_id} not found for control handling");
571 }
572 });
573 get_message_bus()
574 .borrow_mut()
575 .endpoint_map::<StrategyCommand>()
576 .register(strategy_control_endpoint(strategy_id), control_handler);
577
578 self.strategy_ids.push(strategy_id);
579 self.strategy_handler_ids
580 .insert(strategy_id, (order_handler_id, position_handler_id));
581
582 let stop_actor_id = actor_id;
583 let stop_fn = Box::new(move || -> bool {
584 if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
585 Strategy::stop(&mut *strategy)
586 } else {
587 log::error!("Strategy {stop_actor_id} not found for stop");
588 true }
590 });
591 self.strategy_stop_fns.insert(strategy_id, stop_fn);
592
593 log::info!(
594 "Registered strategy {strategy_id} with trader {}",
595 self.trader_id
596 );
597
598 Ok(())
599 }
600
601 pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
612 where
613 T: ExecutionAlgorithm + Component + Debug + 'static,
614 {
615 self.validate_exec_algorithm_registration()?;
616
617 let exec_algorithm_id =
618 ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
619
620 if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
621 anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
622 }
623
624 let component_id = exec_algorithm.component_id();
625 let clock = self.create_component_clock(component_id);
626
627 exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
628
629 register_component_actor(exec_algorithm);
630
631 let actor_id = Ustr::from(exec_algorithm_id.inner().as_str());
634 let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
635 let handler = ShareableMessageHandler::from_typed(move |command: &TradingCommand| {
636 if let Some(mut algo) = try_get_actor_unchecked::<T>(&actor_id) {
637 if let Err(e) = algo.execute(command.clone()) {
638 log::error!("Error executing command on algorithm {actor_id}: {e}");
639 }
640 } else {
641 log::error!("Execution algorithm {actor_id} not found in registry");
642 }
643 });
644 msgbus::register_any(endpoint.into(), handler);
645
646 self.exec_algorithm_ids.push(exec_algorithm_id);
647
648 log::info!(
649 "Registered execution algorithm {exec_algorithm_id} with trader {}",
650 self.trader_id
651 );
652
653 Ok(())
654 }
655
656 fn validate_actor_or_strategy_registration(&self) -> anyhow::Result<()> {
662 match self.state {
663 ComponentState::PreInitialized
664 | ComponentState::Ready
665 | ComponentState::Stopped
666 | ComponentState::Running => Ok(()),
667 ComponentState::Disposed => {
668 anyhow::bail!("Cannot add components to disposed trader")
669 }
670 _ => anyhow::bail!("Cannot add components in current state: {}", self.state),
671 }
672 }
673
674 fn validate_exec_algorithm_registration(&self) -> anyhow::Result<()> {
676 match self.state {
677 ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
678 Ok(())
679 }
680 ComponentState::Running => {
681 anyhow::bail!("Cannot add execution algorithms to running trader")
682 }
683 ComponentState::Disposed => {
684 anyhow::bail!("Cannot add components to disposed trader")
685 }
686 _ => anyhow::bail!(
687 "Cannot add execution algorithms in current state: {}",
688 self.state
689 ),
690 }
691 }
692
693 pub fn start_components(&mut self) -> anyhow::Result<()> {
699 for actor_id in &self.actor_ids {
700 log::debug!("Starting actor {actor_id}");
701 start_component(&actor_id.inner())?;
702 }
703
704 for strategy_id in &self.strategy_ids {
705 log::debug!("Starting strategy {strategy_id}");
706 start_component(&strategy_id.inner())?;
707 }
708
709 for exec_algorithm_id in &self.exec_algorithm_ids {
710 log::debug!("Starting execution algorithm {exec_algorithm_id}");
711 start_component(&exec_algorithm_id.inner())?;
712 }
713
714 Ok(())
715 }
716
717 pub fn stop_components(&mut self) -> anyhow::Result<()> {
723 for actor_id in &self.actor_ids {
724 log::debug!("Stopping actor {actor_id}");
725 stop_component(&actor_id.inner())?;
726 }
727
728 for exec_algorithm_id in &self.exec_algorithm_ids {
729 log::debug!("Stopping execution algorithm {exec_algorithm_id}");
730 stop_component(&exec_algorithm_id.inner())?;
731 }
732
733 for strategy_id in self.strategy_ids.clone() {
734 log::debug!("Stopping strategy {strategy_id}");
735 let should_proceed = self
736 .strategy_stop_fns
737 .get_mut(&strategy_id)
738 .is_none_or(|stop_fn| stop_fn());
739
740 if should_proceed {
741 stop_component(&strategy_id.inner())?;
742 }
743 }
744
745 Ok(())
746 }
747
748 pub fn reset_components(&mut self) -> anyhow::Result<()> {
754 for actor_id in &self.actor_ids {
755 log::debug!("Resetting actor {actor_id}");
756 reset_component(&actor_id.inner())?;
757 }
758
759 for strategy_id in &self.strategy_ids {
760 log::debug!("Resetting strategy {strategy_id}");
761 reset_component(&strategy_id.inner())?;
762 }
763
764 for exec_algorithm_id in &self.exec_algorithm_ids {
765 log::debug!("Resetting execution algorithm {exec_algorithm_id}");
766 reset_component(&exec_algorithm_id.inner())?;
767 }
768
769 Ok(())
770 }
771
772 pub fn dispose_components(&mut self) -> anyhow::Result<()> {
778 for actor_id in &self.actor_ids {
779 log::debug!("Disposing actor {actor_id}");
780 dispose_component(&actor_id.inner())?;
781 }
782
783 for strategy_id in &self.strategy_ids {
784 log::debug!("Disposing strategy {strategy_id}");
785 dispose_component(&strategy_id.inner())?;
786 get_message_bus()
787 .borrow_mut()
788 .endpoint_map::<StrategyCommand>()
789 .deregister(strategy_control_endpoint(*strategy_id));
790 }
791
792 for exec_algorithm_id in &self.exec_algorithm_ids {
793 log::debug!("Disposing execution algorithm {exec_algorithm_id}");
794 dispose_component(&exec_algorithm_id.inner())?;
795 let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
796 msgbus::deregister_any(endpoint.into());
797 }
798
799 self.actor_ids.clear();
800 self.strategy_ids.clear();
801 self.strategy_stop_fns.clear();
802 self.strategy_handler_ids.clear();
803 self.exec_algorithm_ids.clear();
804 self.clocks.clear();
805
806 Ok(())
807 }
808
809 pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
815 for strategy_id in &self.strategy_ids {
816 log::debug!("Disposing strategy {strategy_id}");
817 dispose_component(&strategy_id.inner())?;
818 let component_id = ComponentId::new(strategy_id.inner().as_str());
819 self.clocks.remove(&component_id);
820
821 if let Some((order_hid, position_hid)) = self.strategy_handler_ids.get(strategy_id) {
823 let order_topic = get_event_orders_topic(*strategy_id);
824 let position_topic = get_event_positions_topic(*strategy_id);
825 msgbus::remove_order_event_handler(order_topic.into(), *order_hid);
826 msgbus::remove_position_event_handler(position_topic.into(), *position_hid);
827 }
828
829 get_message_bus()
830 .borrow_mut()
831 .endpoint_map::<StrategyCommand>()
832 .deregister(strategy_control_endpoint(*strategy_id));
833 }
834
835 self.strategy_ids.clear();
836 self.strategy_stop_fns.clear();
837 self.strategy_handler_ids.clear();
838
839 Ok(())
840 }
841
842 pub fn clear_actors(&mut self) -> anyhow::Result<()> {
848 for actor_id in &self.actor_ids {
849 log::debug!("Disposing actor {actor_id}");
850 let _ = stop_component(&actor_id.inner());
853 dispose_component(&actor_id.inner())?;
854 let component_id = ComponentId::new(actor_id.inner().as_str());
855 self.clocks.remove(&component_id);
856 }
857
858 self.actor_ids.clear();
859
860 Ok(())
861 }
862
863 pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
869 for exec_algorithm_id in &self.exec_algorithm_ids {
870 log::debug!("Disposing execution algorithm {exec_algorithm_id}");
871 dispose_component(&exec_algorithm_id.inner())?;
872 let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
873 msgbus::deregister_any(endpoint.into());
874 let component_id = ComponentId::new(exec_algorithm_id.inner().as_str());
875 self.clocks.remove(&component_id);
876 }
877
878 self.exec_algorithm_ids.clear();
879
880 Ok(())
881 }
882
883 pub fn start_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
891 if !self.actor_ids.contains(actor_id) {
892 anyhow::bail!("Cannot start actor, {actor_id} not found");
893 }
894 start_component(&actor_id.inner())
895 }
896
897 pub fn stop_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
903 if !self.actor_ids.contains(actor_id) {
904 anyhow::bail!("Cannot stop actor, {actor_id} not found");
905 }
906 stop_component(&actor_id.inner())
907 }
908
909 pub fn remove_actor(&mut self, actor_id: &ActorId) -> anyhow::Result<()> {
918 let pos = self
919 .actor_ids
920 .iter()
921 .position(|id| id == actor_id)
922 .ok_or_else(|| anyhow::anyhow!("Cannot remove actor, {actor_id} not found"))?;
923
924 let _ = stop_component(&actor_id.inner());
926 dispose_component(&actor_id.inner())?;
927
928 self.actor_ids.swap_remove(pos);
929 let component_id = ComponentId::new(actor_id.inner().as_str());
930 self.clocks.remove(&component_id);
931
932 log::info!("Removed actor {actor_id} from trader {}", self.trader_id);
933 Ok(())
934 }
935
936 pub fn start_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
942 if !self.strategy_ids.contains(strategy_id) {
943 anyhow::bail!("Cannot start strategy, {strategy_id} not found");
944 }
945 start_component(&strategy_id.inner())
946 }
947
948 pub fn stop_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()> {
957 if !self.strategy_ids.contains(strategy_id) {
958 anyhow::bail!("Cannot stop strategy, {strategy_id} not found");
959 }
960
961 let should_proceed = self
962 .strategy_stop_fns
963 .get_mut(strategy_id)
964 .is_none_or(|stop_fn| stop_fn());
965
966 if should_proceed {
967 stop_component(&strategy_id.inner())?;
968 }
969
970 Ok(())
971 }
972
973 pub fn market_exit_strategy(
982 trader: &Rc<RefCell<Self>>,
983 strategy_id: &StrategyId,
984 ) -> anyhow::Result<()> {
985 let handler = trader.borrow().strategy_command_handler(strategy_id)?;
986 handler.handle(&StrategyCommand::ExitMarket);
987 Ok(())
988 }
989
990 fn strategy_command_handler(
991 &self,
992 strategy_id: &StrategyId,
993 ) -> anyhow::Result<TypedHandler<StrategyCommand>> {
994 if !self.strategy_ids.contains(strategy_id) {
995 anyhow::bail!("Cannot market exit strategy, {strategy_id} not found");
996 }
997
998 let endpoint = strategy_control_endpoint(*strategy_id);
999 let handler = {
1000 let msgbus = get_message_bus();
1001 msgbus
1002 .borrow_mut()
1003 .endpoint_map::<StrategyCommand>()
1004 .get(endpoint)
1005 .cloned()
1006 };
1007
1008 let Some(handler) = handler else {
1009 anyhow::bail!(
1010 "Cannot exit market for strategy {strategy_id}: control endpoint '{}' not registered",
1011 endpoint.as_str()
1012 );
1013 };
1014
1015 Ok(handler)
1016 }
1017
1018 pub fn remove_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()> {
1027 let pos = self
1028 .strategy_ids
1029 .iter()
1030 .position(|id| id == strategy_id)
1031 .ok_or_else(|| anyhow::anyhow!("Cannot remove strategy, {strategy_id} not found"))?;
1032
1033 let _ = stop_component(&strategy_id.inner());
1035 dispose_component(&strategy_id.inner())?;
1036
1037 if let Some((order_hid, position_hid)) = self.strategy_handler_ids.remove(strategy_id) {
1039 let order_topic = get_event_orders_topic(*strategy_id);
1040 let position_topic = get_event_positions_topic(*strategy_id);
1041 msgbus::remove_order_event_handler(order_topic.into(), order_hid);
1042 msgbus::remove_position_event_handler(position_topic.into(), position_hid);
1043 }
1044
1045 get_message_bus()
1046 .borrow_mut()
1047 .endpoint_map::<StrategyCommand>()
1048 .deregister(strategy_control_endpoint(*strategy_id));
1049
1050 self.strategy_ids.swap_remove(pos);
1051 self.strategy_stop_fns.remove(strategy_id);
1052 let component_id = ComponentId::new(strategy_id.inner().as_str());
1053 self.clocks.remove(&component_id);
1054
1055 log::info!(
1056 "Removed strategy {strategy_id} from trader {}",
1057 self.trader_id
1058 );
1059 Ok(())
1060 }
1061
1062 pub fn initialize(&mut self) -> anyhow::Result<()> {
1072 let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
1073 self.state = new_state;
1074
1075 Ok(())
1076 }
1077
1078 fn on_start(&mut self) -> anyhow::Result<()> {
1079 self.start_components()?;
1080
1081 self.ts_started = Some(self.clock.borrow().timestamp_ns());
1083
1084 Ok(())
1085 }
1086
1087 fn on_stop(&mut self) -> anyhow::Result<()> {
1088 self.stop_components()?;
1089
1090 self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
1091
1092 Ok(())
1093 }
1094
1095 fn on_reset(&mut self) -> anyhow::Result<()> {
1096 self.reset_components()?;
1097
1098 self.ts_started = None;
1099 self.ts_stopped = None;
1100
1101 Ok(())
1102 }
1103
1104 fn on_dispose(&mut self) -> anyhow::Result<()> {
1105 if self.is_running() {
1106 self.stop()?;
1107 }
1108
1109 self.dispose_components()?;
1110
1111 Ok(())
1112 }
1113}
1114
1115impl Component for Trader {
1116 fn component_id(&self) -> ComponentId {
1117 ComponentId::new(format!("Trader-{}", self.trader_id))
1118 }
1119
1120 fn state(&self) -> ComponentState {
1121 self.state
1122 }
1123
1124 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1125 self.state = self.state.transition(&trigger)?;
1126 log::info!("{}", self.state.variant_name());
1127 Ok(())
1128 }
1129
1130 fn register(
1131 &mut self,
1132 _trader_id: TraderId,
1133 _clock: Rc<RefCell<dyn Clock>>,
1134 _cache: Rc<RefCell<Cache>>,
1135 ) -> anyhow::Result<()> {
1136 anyhow::bail!("Trader cannot register with itself")
1137 }
1138
1139 fn on_start(&mut self) -> anyhow::Result<()> {
1140 Self::on_start(self)
1141 }
1142
1143 fn on_stop(&mut self) -> anyhow::Result<()> {
1144 Self::on_stop(self)
1145 }
1146
1147 fn on_reset(&mut self) -> anyhow::Result<()> {
1148 Self::on_reset(self)
1149 }
1150
1151 fn on_dispose(&mut self) -> anyhow::Result<()> {
1152 Self::on_dispose(self)
1153 }
1154}
1155
1156#[cfg(test)]
1157mod tests {
1158 use std::{cell::RefCell, rc::Rc};
1159
1160 use nautilus_common::{
1161 actor::{DataActorCore, data_actor::DataActorConfig},
1162 cache::Cache,
1163 clock::TestClock,
1164 enums::{ComponentState, Environment},
1165 msgbus,
1166 msgbus::{MessageBus, TypedHandler, switchboard::get_event_orders_topic},
1167 nautilus_actor,
1168 };
1169 use nautilus_core::UUID4;
1170 use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
1171 use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
1172 use nautilus_model::{
1173 events::OrderAccepted,
1174 identifiers::{ActorId, ComponentId, TraderId},
1175 orders::OrderAny,
1176 stubs::TestDefault,
1177 };
1178 use nautilus_portfolio::portfolio::Portfolio;
1179 use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
1180 use nautilus_trading::{
1181 ExecutionAlgorithm as ExecutionAlgorithmTrait, ExecutionAlgorithmConfig,
1182 ExecutionAlgorithmCore, nautilus_strategy,
1183 strategy::{config::StrategyConfig, core::StrategyCore},
1184 };
1185 use rstest::rstest;
1186
1187 use super::*;
1188
1189 #[derive(Debug)]
1191 struct TestDataActor {
1192 core: DataActorCore,
1193 }
1194
1195 impl TestDataActor {
1196 fn new(config: DataActorConfig) -> Self {
1197 Self {
1198 core: DataActorCore::new(config),
1199 }
1200 }
1201 }
1202
1203 impl DataActor for TestDataActor {}
1204
1205 nautilus_actor!(TestDataActor);
1206
1207 #[derive(Debug)]
1209 struct TestExecAlgorithm {
1210 core: ExecutionAlgorithmCore,
1211 }
1212
1213 impl TestExecAlgorithm {
1214 fn new(config: ExecutionAlgorithmConfig) -> Self {
1215 Self {
1216 core: ExecutionAlgorithmCore::new(config),
1217 }
1218 }
1219 }
1220
1221 impl DataActor for TestExecAlgorithm {}
1222
1223 nautilus_actor!(TestExecAlgorithm);
1224
1225 impl ExecutionAlgorithmTrait for TestExecAlgorithm {
1226 fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
1227 &mut self.core
1228 }
1229
1230 fn on_order(&mut self, _order: OrderAny) -> anyhow::Result<()> {
1231 Ok(())
1232 }
1233 }
1234
1235 #[derive(Debug)]
1237 struct TestStrategy {
1238 core: StrategyCore,
1239 }
1240
1241 impl TestStrategy {
1242 fn new(config: StrategyConfig) -> Self {
1243 Self {
1244 core: StrategyCore::new(config),
1245 }
1246 }
1247 }
1248
1249 impl DataActor for TestStrategy {}
1250
1251 nautilus_strategy!(TestStrategy);
1252
1253 #[expect(clippy::type_complexity)]
1254 fn create_trader_components() -> (
1255 Rc<RefCell<MessageBus>>,
1256 Rc<RefCell<Cache>>,
1257 Rc<RefCell<Portfolio>>,
1258 Rc<RefCell<DataEngine>>,
1259 Rc<RefCell<RiskEngine>>,
1260 Rc<RefCell<ExecutionEngine>>,
1261 Rc<RefCell<TestClock>>,
1262 ) {
1263 let trader_id = TraderId::test_default();
1264 let instance_id = UUID4::new();
1265 let clock = Rc::new(RefCell::new(TestClock::new()));
1266 clock.borrow_mut().set_time(1_000_000_000u64.into());
1268 let msgbus = Rc::new(RefCell::new(MessageBus::new(
1269 trader_id,
1270 instance_id,
1271 Some("test".to_string()),
1272 None,
1273 )));
1274 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
1275 let portfolio = Rc::new(RefCell::new(Portfolio::new(
1276 cache.clone(),
1277 clock.clone() as Rc<RefCell<dyn Clock>>,
1278 None,
1279 )));
1280 let data_engine = Rc::new(RefCell::new(DataEngine::new(
1281 clock.clone(),
1282 cache.clone(),
1283 Some(DataEngineConfig::default()),
1284 )));
1285
1286 let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
1288 let risk_clock = Rc::new(RefCell::new(TestClock::new()));
1289 let risk_portfolio = Portfolio::new(
1290 risk_cache.clone(),
1291 risk_clock.clone() as Rc<RefCell<dyn Clock>>,
1292 None,
1293 );
1294 let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
1295 RiskEngineConfig::default(),
1296 risk_portfolio,
1297 risk_clock as Rc<RefCell<dyn Clock>>,
1298 risk_cache,
1299 )));
1300 let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
1301 clock.clone(),
1302 cache.clone(),
1303 Some(ExecutionEngineConfig::default()),
1304 )));
1305
1306 (
1307 msgbus,
1308 cache,
1309 portfolio,
1310 data_engine,
1311 risk_engine,
1312 exec_engine,
1313 clock,
1314 )
1315 }
1316
1317 #[rstest]
1318 fn test_trader_creation() {
1319 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1320 create_trader_components();
1321 let trader_id = TraderId::test_default();
1322 let instance_id = UUID4::new();
1323
1324 let trader = Trader::new(
1325 trader_id,
1326 instance_id,
1327 Environment::Backtest,
1328 clock,
1329 cache,
1330 portfolio,
1331 );
1332
1333 assert_eq!(trader.trader_id(), trader_id);
1334 assert_eq!(trader.instance_id(), instance_id);
1335 assert_eq!(trader.environment(), Environment::Backtest);
1336 assert_eq!(trader.state(), ComponentState::PreInitialized);
1337 assert_eq!(trader.actor_count(), 0);
1338 assert_eq!(trader.strategy_count(), 0);
1339 assert_eq!(trader.exec_algorithm_count(), 0);
1340 assert_eq!(trader.component_count(), 0);
1341 assert!(!trader.is_running());
1342 assert!(!trader.is_stopped());
1343 assert!(!trader.is_disposed());
1344 assert!(trader.ts_created() > 0);
1345 assert!(trader.ts_started().is_none());
1346 assert!(trader.ts_stopped().is_none());
1347 }
1348
1349 #[rstest]
1350 fn test_trader_component_id() {
1351 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1352 create_trader_components();
1353 let trader_id = TraderId::from("TRADER-001");
1354 let instance_id = UUID4::new();
1355
1356 let trader = Trader::new(
1357 trader_id,
1358 instance_id,
1359 Environment::Backtest,
1360 clock,
1361 cache,
1362 portfolio,
1363 );
1364
1365 assert_eq!(
1366 trader.component_id(),
1367 ComponentId::from("Trader-TRADER-001")
1368 );
1369 }
1370
1371 #[rstest]
1372 fn test_add_actor_success() {
1373 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1374 create_trader_components();
1375 let trader_id = TraderId::test_default();
1376 let instance_id = UUID4::new();
1377
1378 let mut trader = Trader::new(
1379 trader_id,
1380 instance_id,
1381 Environment::Backtest,
1382 clock,
1383 cache,
1384 portfolio,
1385 );
1386
1387 let actor = TestDataActor::new(DataActorConfig::default());
1388 let actor_id = actor.actor_id();
1389
1390 let result = trader.add_actor(actor);
1391 assert!(result.is_ok());
1392 assert_eq!(trader.actor_count(), 1);
1393 assert_eq!(trader.component_count(), 1);
1394 assert!(trader.actor_ids().contains(&actor_id));
1395 }
1396
1397 #[rstest]
1398 fn test_add_duplicate_actor_fails() {
1399 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1400 create_trader_components();
1401 let trader_id = TraderId::test_default();
1402 let instance_id = UUID4::new();
1403
1404 let mut trader = Trader::new(
1405 trader_id,
1406 instance_id,
1407 Environment::Backtest,
1408 clock,
1409 cache,
1410 portfolio,
1411 );
1412
1413 let config = DataActorConfig {
1414 actor_id: Some(ActorId::from("TestActor")),
1415 ..Default::default()
1416 };
1417 let actor1 = TestDataActor::new(config.clone());
1418 let actor2 = TestDataActor::new(config);
1419
1420 assert!(trader.add_actor(actor1).is_ok());
1422 assert_eq!(trader.actor_count(), 1);
1423
1424 let result = trader.add_actor(actor2);
1426 assert!(result.is_err());
1427 assert!(
1428 result
1429 .unwrap_err()
1430 .to_string()
1431 .contains("already registered")
1432 );
1433 assert_eq!(trader.actor_count(), 1);
1434 }
1435
1436 #[rstest]
1437 fn test_add_strategy_success() {
1438 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1439 create_trader_components();
1440 let trader_id = TraderId::test_default();
1441 let instance_id = UUID4::new();
1442
1443 let mut trader = Trader::new(
1444 trader_id,
1445 instance_id,
1446 Environment::Backtest,
1447 clock,
1448 cache,
1449 portfolio,
1450 );
1451
1452 let config = StrategyConfig {
1453 strategy_id: Some(StrategyId::from("Test-Strategy")),
1454 ..Default::default()
1455 };
1456 let strategy = TestStrategy::new(config);
1457 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1458
1459 let result = trader.add_strategy(strategy);
1460 assert!(result.is_ok());
1461 assert_eq!(trader.strategy_count(), 1);
1462 assert_eq!(trader.component_count(), 1);
1463 assert!(trader.strategy_ids().contains(&strategy_id));
1464 }
1465
1466 #[rstest]
1467 fn test_add_exec_algorithm_success() {
1468 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1469 create_trader_components();
1470 let trader_id = TraderId::test_default();
1471 let instance_id = UUID4::new();
1472
1473 let mut trader = Trader::new(
1474 trader_id,
1475 instance_id,
1476 Environment::Backtest,
1477 clock,
1478 cache,
1479 portfolio,
1480 );
1481
1482 let config = ExecutionAlgorithmConfig {
1483 exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
1484 ..Default::default()
1485 };
1486 let exec_algorithm = TestExecAlgorithm::new(config);
1487 let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
1488
1489 let result = trader.add_exec_algorithm(exec_algorithm);
1490 assert!(result.is_ok());
1491 assert_eq!(trader.exec_algorithm_count(), 1);
1492 assert_eq!(trader.component_count(), 1);
1493 assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
1494 }
1495
1496 #[rstest]
1497 fn test_cannot_add_exec_algorithm_while_running() {
1498 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1499 create_trader_components();
1500 let trader_id = TraderId::test_default();
1501 let instance_id = UUID4::new();
1502
1503 let mut trader = Trader::new(
1504 trader_id,
1505 instance_id,
1506 Environment::Backtest,
1507 clock,
1508 cache,
1509 portfolio,
1510 );
1511 trader.state = ComponentState::Running;
1512
1513 let config = ExecutionAlgorithmConfig {
1514 exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
1515 ..Default::default()
1516 };
1517 let exec_algorithm = TestExecAlgorithm::new(config);
1518
1519 let result = trader.add_exec_algorithm(exec_algorithm);
1520 assert!(result.is_err());
1521 assert_eq!(
1522 result.unwrap_err().to_string(),
1523 "Cannot add execution algorithms to running trader"
1524 );
1525 assert_eq!(trader.exec_algorithm_count(), 0);
1526 }
1527
1528 #[rstest]
1529 fn test_component_lifecycle() {
1530 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1531 create_trader_components();
1532 let trader_id = TraderId::test_default();
1533 let instance_id = UUID4::new();
1534
1535 let mut trader = Trader::new(
1536 trader_id,
1537 instance_id,
1538 Environment::Backtest,
1539 clock,
1540 cache,
1541 portfolio,
1542 );
1543
1544 let actor = TestDataActor::new(DataActorConfig::default());
1546
1547 let strategy_config = StrategyConfig {
1548 strategy_id: Some(StrategyId::from("Test-Strategy")),
1549 ..Default::default()
1550 };
1551 let strategy = TestStrategy::new(strategy_config);
1552
1553 let exec_algorithm_config = ExecutionAlgorithmConfig {
1554 exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
1555 ..Default::default()
1556 };
1557 let exec_algorithm = TestExecAlgorithm::new(exec_algorithm_config);
1558
1559 assert!(trader.add_actor(actor).is_ok());
1560 assert!(trader.add_strategy(strategy).is_ok());
1561 assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
1562 assert_eq!(trader.component_count(), 3);
1563
1564 let start_result = trader.start_components();
1566 assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
1567
1568 assert!(trader.stop_components().is_ok());
1570
1571 assert!(trader.reset_components().is_ok());
1573
1574 assert!(trader.dispose_components().is_ok());
1576 assert_eq!(trader.component_count(), 0);
1577 }
1578
1579 #[rstest]
1580 fn test_trader_component_lifecycle() {
1581 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1582 create_trader_components();
1583 let trader_id = TraderId::test_default();
1584 let instance_id = UUID4::new();
1585
1586 let mut trader = Trader::new(
1587 trader_id,
1588 instance_id,
1589 Environment::Backtest,
1590 clock,
1591 cache,
1592 portfolio,
1593 );
1594
1595 assert_eq!(trader.state(), ComponentState::PreInitialized);
1597 assert!(!trader.is_running());
1598 assert!(!trader.is_stopped());
1599 assert!(!trader.is_disposed());
1600
1601 assert!(trader.start().is_err());
1603
1604 trader.initialize().unwrap();
1606
1607 assert!(trader.start().is_ok());
1609 assert_eq!(trader.state(), ComponentState::Running);
1610 assert!(trader.is_running());
1611 assert!(trader.ts_started().is_some());
1612
1613 assert!(trader.stop().is_ok());
1615 assert_eq!(trader.state(), ComponentState::Stopped);
1616 assert!(trader.is_stopped());
1617 assert!(trader.ts_stopped().is_some());
1618
1619 assert!(trader.reset().is_ok());
1621 assert_eq!(trader.state(), ComponentState::Ready);
1622 assert!(trader.ts_started().is_none());
1623 assert!(trader.ts_stopped().is_none());
1624
1625 assert!(trader.dispose().is_ok());
1627 assert_eq!(trader.state(), ComponentState::Disposed);
1628 assert!(trader.is_disposed());
1629 }
1630
1631 #[rstest]
1632 fn test_market_exit_strategy_fails_when_control_endpoint_missing() {
1633 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1634 create_trader_components();
1635 let trader_id = TraderId::test_default();
1636 let instance_id = UUID4::new();
1637
1638 let mut trader = Trader::new(
1639 trader_id,
1640 instance_id,
1641 Environment::Backtest,
1642 clock,
1643 cache,
1644 portfolio,
1645 );
1646
1647 let config = StrategyConfig {
1648 strategy_id: Some(StrategyId::from("Test-Strategy")),
1649 ..Default::default()
1650 };
1651 let strategy = TestStrategy::new(config);
1652 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1653 trader.add_strategy(strategy).unwrap();
1654
1655 let endpoint = strategy_control_endpoint(strategy_id);
1656 assert!(
1657 get_message_bus()
1658 .borrow_mut()
1659 .endpoint_map::<StrategyCommand>()
1660 .is_registered(endpoint)
1661 );
1662 get_message_bus()
1663 .borrow_mut()
1664 .endpoint_map::<StrategyCommand>()
1665 .deregister(endpoint);
1666
1667 let trader = Rc::new(RefCell::new(trader));
1668 let result = Trader::market_exit_strategy(&trader, &strategy_id);
1669 assert!(result.is_err());
1670 assert_eq!(
1671 result.unwrap_err().to_string(),
1672 format!(
1673 "Cannot exit market for strategy {strategy_id}: control endpoint '{}' not registered",
1674 endpoint.as_str()
1675 )
1676 );
1677 }
1678
1679 #[rstest]
1680 fn test_remove_strategy_deregisters_strategy_endpoint() {
1681 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1682 create_trader_components();
1683 let trader_id = TraderId::test_default();
1684 let instance_id = UUID4::new();
1685
1686 let mut trader = Trader::new(
1687 trader_id,
1688 instance_id,
1689 Environment::Backtest,
1690 clock,
1691 cache,
1692 portfolio,
1693 );
1694
1695 let config = StrategyConfig {
1696 strategy_id: Some(StrategyId::from("Test-Strategy")),
1697 ..Default::default()
1698 };
1699 let strategy = TestStrategy::new(config);
1700 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1701 trader.add_strategy(strategy).unwrap();
1702
1703 let endpoint = strategy_control_endpoint(strategy_id);
1704 assert!(
1705 get_message_bus()
1706 .borrow_mut()
1707 .endpoint_map::<StrategyCommand>()
1708 .is_registered(endpoint)
1709 );
1710
1711 trader.remove_strategy(&strategy_id).unwrap();
1712
1713 assert!(
1714 !get_message_bus()
1715 .borrow_mut()
1716 .endpoint_map::<StrategyCommand>()
1717 .is_registered(endpoint)
1718 );
1719 }
1720
1721 #[rstest]
1722 fn test_can_add_components_while_running() {
1723 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1724 create_trader_components();
1725 let trader_id = TraderId::test_default();
1726 let instance_id = UUID4::new();
1727
1728 let mut trader = Trader::new(
1729 trader_id,
1730 instance_id,
1731 Environment::Backtest,
1732 clock,
1733 cache,
1734 portfolio,
1735 );
1736
1737 trader.state = ComponentState::Running;
1739
1740 let actor = TestDataActor::new(DataActorConfig::default());
1741 let result = trader.add_actor(actor);
1742 assert!(result.is_ok());
1743 assert_eq!(trader.actor_count(), 1);
1744 }
1745
1746 #[rstest]
1747 fn test_cannot_add_components_while_disposed() {
1748 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1749 create_trader_components();
1750 let trader_id = TraderId::test_default();
1751 let instance_id = UUID4::new();
1752
1753 let mut trader = Trader::new(
1754 trader_id,
1755 instance_id,
1756 Environment::Backtest,
1757 clock,
1758 cache,
1759 portfolio,
1760 );
1761
1762 trader.state = ComponentState::Disposed;
1764
1765 let actor = TestDataActor::new(DataActorConfig::default());
1766 let result = trader.add_actor(actor);
1767 assert!(result.is_err());
1768 assert!(result.unwrap_err().to_string().contains("disposed trader"));
1769 }
1770
1771 #[rstest]
1772 fn test_create_component_clock_backtest_creates_individual_clocks() {
1773 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1774 create_trader_components();
1775 let trader_id = TraderId::test_default();
1776 let instance_id = UUID4::new();
1777
1778 let mut trader = Trader::new(
1779 trader_id,
1780 instance_id,
1781 Environment::Backtest,
1782 clock.clone(),
1783 cache,
1784 portfolio,
1785 );
1786
1787 let component_a = ComponentId::new("ACTOR-A");
1788 let component_b = ComponentId::new("ACTOR-B");
1789 let clock_a = trader.create_component_clock(component_a);
1790 let clock_b = trader.create_component_clock(component_b);
1791
1792 assert_ne!(clock_a.as_ptr() as *const _, clock.as_ptr() as *const _);
1794 assert_ne!(clock_a.as_ptr() as *const _, clock_b.as_ptr() as *const _);
1795 }
1796
1797 #[rstest]
1798 fn test_clear_strategies_preserves_other_handlers() {
1799 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1800 create_trader_components();
1801 let trader_id = TraderId::test_default();
1802 let instance_id = UUID4::new();
1803
1804 let mut trader = Trader::new(
1805 trader_id,
1806 instance_id,
1807 Environment::Backtest,
1808 clock,
1809 cache,
1810 portfolio,
1811 );
1812
1813 let config = StrategyConfig {
1814 strategy_id: Some(StrategyId::from("Test-Strategy")),
1815 ..Default::default()
1816 };
1817 let strategy = TestStrategy::new(config);
1818 let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
1819 trader.add_strategy(strategy).unwrap();
1820
1821 let endpoint = strategy_control_endpoint(strategy_id);
1822 assert!(
1823 get_message_bus()
1824 .borrow_mut()
1825 .endpoint_map::<StrategyCommand>()
1826 .is_registered(endpoint)
1827 );
1828
1829 let ext_received = Rc::new(RefCell::new(0));
1831 let ext_clone = ext_received.clone();
1832 let ext_handler =
1833 TypedHandler::from_with_id("exec-algo-handler", move |_: &OrderEventAny| {
1834 *ext_clone.borrow_mut() += 1;
1835 });
1836 let order_topic = get_event_orders_topic(strategy_id);
1837 msgbus::subscribe_order_events(order_topic.into(), ext_handler, None);
1838
1839 trader.clear_strategies().unwrap();
1840 assert_eq!(trader.strategy_count(), 0);
1841 assert!(
1842 !get_message_bus()
1843 .borrow_mut()
1844 .endpoint_map::<StrategyCommand>()
1845 .is_registered(endpoint)
1846 );
1847
1848 let event = OrderEventAny::Accepted(OrderAccepted::test_default());
1849 msgbus::publish_order_event(order_topic, &event);
1850 assert_eq!(*ext_received.borrow(), 1);
1851 }
1852
1853 #[rstest]
1854 fn test_clear_actors_disposes_and_clears_state() {
1855 let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
1856 create_trader_components();
1857 let trader_id = TraderId::test_default();
1858 let instance_id = UUID4::new();
1859
1860 let mut trader = Trader::new(
1861 trader_id,
1862 instance_id,
1863 Environment::Backtest,
1864 clock,
1865 cache,
1866 portfolio,
1867 );
1868
1869 let actor_a = TestDataActor::new(DataActorConfig {
1870 actor_id: Some(ActorId::from("Actor-A")),
1871 ..Default::default()
1872 });
1873 let actor_b = TestDataActor::new(DataActorConfig {
1874 actor_id: Some(ActorId::from("Actor-B")),
1875 ..Default::default()
1876 });
1877 trader.add_actor(actor_a).unwrap();
1878 trader.add_actor(actor_b).unwrap();
1879 assert_eq!(trader.actor_count(), 2);
1880 assert_eq!(
1881 trader.get_component_clocks().len(),
1882 2,
1883 "each registered actor must have a component clock",
1884 );
1885
1886 trader.clear_actors().unwrap();
1887
1888 assert_eq!(trader.actor_count(), 0);
1889 assert!(trader.actor_ids().is_empty());
1890 assert_eq!(
1891 trader.get_component_clocks().len(),
1892 0,
1893 "actor clocks must be dropped after clear_actors",
1894 );
1895 }
1896}