Skip to main content

nautilus_trading/algorithm/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core component for execution algorithms.
17
18use std::{
19    cell::RefCell,
20    fmt::Debug,
21    ops::{Deref, DerefMut},
22    rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use indexmap::IndexMap;
27use nautilus_common::{
28    actor::{DataActorConfig, DataActorCore},
29    cache::Cache,
30    clock::Clock,
31    msgbus::TypedHandler,
32};
33use nautilus_model::{
34    events::{OrderEventAny, PositionEvent},
35    identifiers::{ActorId, ClientOrderId, ExecAlgorithmId, StrategyId, TraderId},
36    orders::{OrderAny, OrderList},
37    types::Quantity,
38};
39
40use super::config::ExecutionAlgorithmConfig;
41
42/// Holds event handlers for strategy event subscriptions.
43#[derive(Clone, Debug)]
44pub struct StrategyEventHandlers {
45    /// The topic string for order events.
46    pub order_topic: String,
47    /// The handler for order events.
48    pub order_handler: TypedHandler<OrderEventAny>,
49    /// The topic string for position events.
50    pub position_topic: String,
51    /// The handler for position events.
52    pub position_handler: TypedHandler<PositionEvent>,
53}
54
55/// The core component of an [`ExecutionAlgorithm`](super::ExecutionAlgorithm).
56///
57/// This struct manages the internal state for execution algorithms including
58/// spawn ID tracking and strategy subscriptions. It wraps a [`DataActorCore`]
59/// to provide data actor capabilities.
60///
61/// User algorithms should hold this as a member and implement `Deref`/`DerefMut`
62/// to satisfy the trait bounds of [`ExecutionAlgorithm`](super::ExecutionAlgorithm).
63pub struct ExecutionAlgorithmCore {
64    /// The underlying data actor core.
65    pub actor: DataActorCore,
66    /// The execution algorithm configuration.
67    pub config: ExecutionAlgorithmConfig,
68    /// The execution algorithm ID.
69    pub exec_algorithm_id: ExecAlgorithmId,
70    /// Maps primary order client IDs to their spawn sequence counter.
71    exec_spawn_ids: AHashMap<ClientOrderId, u32>,
72    /// Tracks strategies that have been subscribed to for events.
73    subscribed_strategies: AHashSet<StrategyId>,
74    /// Tracks pending spawn reductions for quantity restoration on denial/rejection.
75    pending_spawn_reductions: AHashMap<ClientOrderId, Quantity>,
76    /// Maps strategies to their event handlers for cleanup on reset.
77    strategy_event_handlers: IndexMap<StrategyId, StrategyEventHandlers>,
78}
79
80impl Debug for ExecutionAlgorithmCore {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct(stringify!(ExecutionAlgorithmCore))
83            .field("actor", &self.actor)
84            .field("config", &self.config)
85            .field("exec_algorithm_id", &self.exec_algorithm_id)
86            .field("exec_spawn_ids", &self.exec_spawn_ids.len())
87            .field("subscribed_strategies", &self.subscribed_strategies.len())
88            .field(
89                "pending_spawn_reductions",
90                &self.pending_spawn_reductions.len(),
91            )
92            .field(
93                "strategy_event_handlers",
94                &self.strategy_event_handlers.len(),
95            )
96            .finish()
97    }
98}
99
100impl ExecutionAlgorithmCore {
101    /// Creates a new [`ExecutionAlgorithmCore`] instance.
102    ///
103    /// # Panics
104    ///
105    /// Panics if `config.exec_algorithm_id` is `None`.
106    #[must_use]
107    pub fn new(config: ExecutionAlgorithmConfig) -> Self {
108        let exec_algorithm_id = config
109            .exec_algorithm_id
110            .expect("ExecutionAlgorithmConfig must have exec_algorithm_id set");
111
112        let actor_config = DataActorConfig {
113            actor_id: Some(ActorId::from(exec_algorithm_id.inner().as_str())),
114            log_events: config.log_events,
115            log_commands: config.log_commands,
116        };
117
118        Self {
119            actor: DataActorCore::new(actor_config),
120            config,
121            exec_algorithm_id,
122            exec_spawn_ids: AHashMap::new(),
123            subscribed_strategies: AHashSet::new(),
124            pending_spawn_reductions: AHashMap::new(),
125            strategy_event_handlers: IndexMap::new(),
126        }
127    }
128
129    /// Registers the execution algorithm with the trading engine components.
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if registration with the actor core fails.
134    pub fn register(
135        &mut self,
136        trader_id: TraderId,
137        clock: Rc<RefCell<dyn Clock>>,
138        cache: Rc<RefCell<Cache>>,
139    ) -> anyhow::Result<()> {
140        self.actor.register(trader_id, clock, cache)
141    }
142
143    /// Returns the execution algorithm ID.
144    #[must_use]
145    pub fn id(&self) -> ExecAlgorithmId {
146        self.exec_algorithm_id
147    }
148
149    /// Generates the next spawn client order ID for a primary order.
150    ///
151    /// The generated ID follows the pattern: `{primary_id}-E{sequence}`.
152    #[must_use]
153    pub fn spawn_client_order_id(&mut self, primary_id: &ClientOrderId) -> ClientOrderId {
154        let sequence = self
155            .exec_spawn_ids
156            .entry(*primary_id)
157            .and_modify(|s| *s += 1)
158            .or_insert(1);
159
160        ClientOrderId::new(format!("{primary_id}-E{sequence}"))
161    }
162
163    /// Returns the current spawn sequence for a primary order, if any.
164    #[must_use]
165    pub fn spawn_sequence(&self, primary_id: &ClientOrderId) -> Option<u32> {
166        self.exec_spawn_ids.get(primary_id).copied()
167    }
168
169    /// Checks if a strategy has been subscribed to for events.
170    #[must_use]
171    pub fn is_strategy_subscribed(&self, strategy_id: &StrategyId) -> bool {
172        self.subscribed_strategies.contains(strategy_id)
173    }
174
175    /// Marks a strategy as subscribed for events.
176    pub fn add_subscribed_strategy(&mut self, strategy_id: StrategyId) {
177        self.subscribed_strategies.insert(strategy_id);
178    }
179
180    /// Stores the event handlers for a strategy subscription.
181    pub fn store_strategy_event_handlers(
182        &mut self,
183        strategy_id: StrategyId,
184        handlers: StrategyEventHandlers,
185    ) {
186        self.strategy_event_handlers.insert(strategy_id, handlers);
187    }
188
189    /// Takes and returns all stored strategy event handlers, clearing the internal map.
190    pub fn take_strategy_event_handlers(&mut self) -> IndexMap<StrategyId, StrategyEventHandlers> {
191        std::mem::take(&mut self.strategy_event_handlers)
192    }
193
194    /// Clears all spawn tracking state.
195    pub fn clear_spawn_ids(&mut self) {
196        self.exec_spawn_ids.clear();
197    }
198
199    /// Clears all strategy subscriptions.
200    pub fn clear_subscribed_strategies(&mut self) {
201        self.subscribed_strategies.clear();
202    }
203
204    /// Tracks a pending spawn reduction for potential restoration.
205    pub fn track_pending_spawn_reduction(&mut self, spawn_id: ClientOrderId, quantity: Quantity) {
206        self.pending_spawn_reductions.insert(spawn_id, quantity);
207    }
208
209    /// Removes and returns the pending spawn reduction for an order, if any.
210    pub fn take_pending_spawn_reduction(&mut self, spawn_id: &ClientOrderId) -> Option<Quantity> {
211        self.pending_spawn_reductions.remove(spawn_id)
212    }
213
214    /// Clears all pending spawn reductions.
215    pub fn clear_pending_spawn_reductions(&mut self) {
216        self.pending_spawn_reductions.clear();
217    }
218
219    /// Resets the core to its initial state.
220    ///
221    /// Note: This clears handler storage but does NOT unsubscribe from msgbus.
222    /// Call `unsubscribe_all_strategy_events` first to properly unsubscribe.
223    pub fn reset(&mut self) {
224        self.exec_spawn_ids.clear();
225        self.subscribed_strategies.clear();
226        self.pending_spawn_reductions.clear();
227        self.strategy_event_handlers.clear();
228    }
229
230    /// Returns the order for the given client order ID from the cache.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if the order is not found in the cache.
235    pub fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
236        self.cache()
237            .order(client_order_id)
238            .cloned()
239            .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
240    }
241
242    /// Returns all orders for the given order list from the cache.
243    ///
244    /// # Errors
245    ///
246    /// Returns an error if any order is not found in the cache.
247    pub fn get_orders_for_list(&self, order_list: &OrderList) -> anyhow::Result<Vec<OrderAny>> {
248        order_list
249            .client_order_ids
250            .iter()
251            .map(|id| self.get_order(id))
252            .collect()
253    }
254}
255
256impl Deref for ExecutionAlgorithmCore {
257    type Target = DataActorCore;
258    fn deref(&self) -> &Self::Target {
259        &self.actor
260    }
261}
262
263impl DerefMut for ExecutionAlgorithmCore {
264    fn deref_mut(&mut self) -> &mut Self::Target {
265        &mut self.actor
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use rstest::rstest;
272
273    use super::*;
274
275    fn create_test_config() -> ExecutionAlgorithmConfig {
276        ExecutionAlgorithmConfig {
277            exec_algorithm_id: Some(ExecAlgorithmId::new("TWAP")),
278            ..Default::default()
279        }
280    }
281
282    #[rstest]
283    fn test_core_new() {
284        let config = create_test_config();
285        let core = ExecutionAlgorithmCore::new(config.clone());
286
287        assert_eq!(core.exec_algorithm_id, ExecAlgorithmId::new("TWAP"));
288        assert_eq!(core.config.log_events, config.log_events);
289        assert!(core.exec_spawn_ids.is_empty());
290        assert!(core.subscribed_strategies.is_empty());
291    }
292
293    #[rstest]
294    fn test_spawn_client_order_id_sequence() {
295        let config = create_test_config();
296        let mut core = ExecutionAlgorithmCore::new(config);
297
298        let primary_id = ClientOrderId::new("O-001");
299
300        let spawn1 = core.spawn_client_order_id(&primary_id);
301        assert_eq!(spawn1.as_str(), "O-001-E1");
302
303        let spawn2 = core.spawn_client_order_id(&primary_id);
304        assert_eq!(spawn2.as_str(), "O-001-E2");
305
306        let spawn3 = core.spawn_client_order_id(&primary_id);
307        assert_eq!(spawn3.as_str(), "O-001-E3");
308    }
309
310    #[rstest]
311    fn test_spawn_client_order_id_different_primaries() {
312        let config = create_test_config();
313        let mut core = ExecutionAlgorithmCore::new(config);
314
315        let primary1 = ClientOrderId::new("O-001");
316        let primary2 = ClientOrderId::new("O-002");
317
318        let spawn1_1 = core.spawn_client_order_id(&primary1);
319        let spawn2_1 = core.spawn_client_order_id(&primary2);
320        let spawn1_2 = core.spawn_client_order_id(&primary1);
321
322        assert_eq!(spawn1_1.as_str(), "O-001-E1");
323        assert_eq!(spawn2_1.as_str(), "O-002-E1");
324        assert_eq!(spawn1_2.as_str(), "O-001-E2");
325    }
326
327    #[rstest]
328    fn test_spawn_sequence() {
329        let config = create_test_config();
330        let mut core = ExecutionAlgorithmCore::new(config);
331
332        let primary_id = ClientOrderId::new("O-001");
333
334        assert_eq!(core.spawn_sequence(&primary_id), None);
335
336        let _ = core.spawn_client_order_id(&primary_id);
337        assert_eq!(core.spawn_sequence(&primary_id), Some(1));
338
339        let _ = core.spawn_client_order_id(&primary_id);
340        assert_eq!(core.spawn_sequence(&primary_id), Some(2));
341    }
342
343    #[rstest]
344    fn test_strategy_subscription_tracking() {
345        let config = create_test_config();
346        let mut core = ExecutionAlgorithmCore::new(config);
347
348        let strategy_id = StrategyId::new("TEST-001");
349
350        assert!(!core.is_strategy_subscribed(&strategy_id));
351
352        core.add_subscribed_strategy(strategy_id);
353        assert!(core.is_strategy_subscribed(&strategy_id));
354    }
355
356    #[rstest]
357    fn test_clear_spawn_ids() {
358        let config = create_test_config();
359        let mut core = ExecutionAlgorithmCore::new(config);
360
361        let primary_id = ClientOrderId::new("O-001");
362        let _ = core.spawn_client_order_id(&primary_id);
363
364        assert!(core.spawn_sequence(&primary_id).is_some());
365
366        core.clear_spawn_ids();
367        assert!(core.spawn_sequence(&primary_id).is_none());
368    }
369
370    #[rstest]
371    fn test_reset() {
372        let config = create_test_config();
373        let mut core = ExecutionAlgorithmCore::new(config);
374
375        let primary_id = ClientOrderId::new("O-001");
376        let strategy_id = StrategyId::new("TEST-001");
377
378        let _ = core.spawn_client_order_id(&primary_id);
379        core.add_subscribed_strategy(strategy_id);
380
381        core.reset();
382
383        assert!(core.spawn_sequence(&primary_id).is_none());
384        assert!(!core.is_strategy_subscribed(&strategy_id));
385    }
386
387    #[rstest]
388    fn test_deref_to_data_actor_core() {
389        let config = create_test_config();
390        let core = ExecutionAlgorithmCore::new(config);
391
392        // Should be able to access DataActorCore methods via Deref
393        assert!(core.trader_id().is_none());
394    }
395}