nautilus_trading/algorithm/
core.rs1use std::{
19 cell::RefCell,
20 fmt::Debug,
21 ops::{Deref, DerefMut},
22 rc::Rc,
23};
24
25use ahash::{AHashMap, AHashSet};
26use indexmap::IndexMap;
27use nautilus_common::{
28 actor::{DataActorConfig, DataActorCore},
29 cache::Cache,
30 clock::Clock,
31 msgbus::TypedHandler,
32};
33use nautilus_model::{
34 events::{OrderEventAny, PositionEvent},
35 identifiers::{ActorId, ClientOrderId, ExecAlgorithmId, StrategyId, TraderId},
36 orders::{OrderAny, OrderList},
37 types::Quantity,
38};
39
40use super::config::ExecutionAlgorithmConfig;
41
42#[derive(Clone, Debug)]
44pub struct StrategyEventHandlers {
45 pub order_topic: String,
47 pub order_handler: TypedHandler<OrderEventAny>,
49 pub position_topic: String,
51 pub position_handler: TypedHandler<PositionEvent>,
53}
54
55pub struct ExecutionAlgorithmCore {
64 pub actor: DataActorCore,
66 pub config: ExecutionAlgorithmConfig,
68 pub exec_algorithm_id: ExecAlgorithmId,
70 exec_spawn_ids: AHashMap<ClientOrderId, u32>,
72 subscribed_strategies: AHashSet<StrategyId>,
74 pending_spawn_reductions: AHashMap<ClientOrderId, Quantity>,
76 strategy_event_handlers: IndexMap<StrategyId, StrategyEventHandlers>,
78}
79
80impl Debug for ExecutionAlgorithmCore {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct(stringify!(ExecutionAlgorithmCore))
83 .field("actor", &self.actor)
84 .field("config", &self.config)
85 .field("exec_algorithm_id", &self.exec_algorithm_id)
86 .field("exec_spawn_ids", &self.exec_spawn_ids.len())
87 .field("subscribed_strategies", &self.subscribed_strategies.len())
88 .field(
89 "pending_spawn_reductions",
90 &self.pending_spawn_reductions.len(),
91 )
92 .field(
93 "strategy_event_handlers",
94 &self.strategy_event_handlers.len(),
95 )
96 .finish()
97 }
98}
99
100impl ExecutionAlgorithmCore {
101 #[must_use]
107 pub fn new(config: ExecutionAlgorithmConfig) -> Self {
108 let exec_algorithm_id = config
109 .exec_algorithm_id
110 .expect("ExecutionAlgorithmConfig must have exec_algorithm_id set");
111
112 let actor_config = DataActorConfig {
113 actor_id: Some(ActorId::from(exec_algorithm_id.inner().as_str())),
114 log_events: config.log_events,
115 log_commands: config.log_commands,
116 };
117
118 Self {
119 actor: DataActorCore::new(actor_config),
120 config,
121 exec_algorithm_id,
122 exec_spawn_ids: AHashMap::new(),
123 subscribed_strategies: AHashSet::new(),
124 pending_spawn_reductions: AHashMap::new(),
125 strategy_event_handlers: IndexMap::new(),
126 }
127 }
128
129 pub fn register(
135 &mut self,
136 trader_id: TraderId,
137 clock: Rc<RefCell<dyn Clock>>,
138 cache: Rc<RefCell<Cache>>,
139 ) -> anyhow::Result<()> {
140 self.actor.register(trader_id, clock, cache)
141 }
142
143 #[must_use]
145 pub fn id(&self) -> ExecAlgorithmId {
146 self.exec_algorithm_id
147 }
148
149 #[must_use]
153 pub fn spawn_client_order_id(&mut self, primary_id: &ClientOrderId) -> ClientOrderId {
154 let sequence = self
155 .exec_spawn_ids
156 .entry(*primary_id)
157 .and_modify(|s| *s += 1)
158 .or_insert(1);
159
160 ClientOrderId::new(format!("{primary_id}-E{sequence}"))
161 }
162
163 #[must_use]
165 pub fn spawn_sequence(&self, primary_id: &ClientOrderId) -> Option<u32> {
166 self.exec_spawn_ids.get(primary_id).copied()
167 }
168
169 #[must_use]
171 pub fn is_strategy_subscribed(&self, strategy_id: &StrategyId) -> bool {
172 self.subscribed_strategies.contains(strategy_id)
173 }
174
175 pub fn add_subscribed_strategy(&mut self, strategy_id: StrategyId) {
177 self.subscribed_strategies.insert(strategy_id);
178 }
179
180 pub fn store_strategy_event_handlers(
182 &mut self,
183 strategy_id: StrategyId,
184 handlers: StrategyEventHandlers,
185 ) {
186 self.strategy_event_handlers.insert(strategy_id, handlers);
187 }
188
189 pub fn take_strategy_event_handlers(&mut self) -> IndexMap<StrategyId, StrategyEventHandlers> {
191 std::mem::take(&mut self.strategy_event_handlers)
192 }
193
194 pub fn clear_spawn_ids(&mut self) {
196 self.exec_spawn_ids.clear();
197 }
198
199 pub fn clear_subscribed_strategies(&mut self) {
201 self.subscribed_strategies.clear();
202 }
203
204 pub fn track_pending_spawn_reduction(&mut self, spawn_id: ClientOrderId, quantity: Quantity) {
206 self.pending_spawn_reductions.insert(spawn_id, quantity);
207 }
208
209 pub fn take_pending_spawn_reduction(&mut self, spawn_id: &ClientOrderId) -> Option<Quantity> {
211 self.pending_spawn_reductions.remove(spawn_id)
212 }
213
214 pub fn clear_pending_spawn_reductions(&mut self) {
216 self.pending_spawn_reductions.clear();
217 }
218
219 pub fn reset(&mut self) {
224 self.exec_spawn_ids.clear();
225 self.subscribed_strategies.clear();
226 self.pending_spawn_reductions.clear();
227 self.strategy_event_handlers.clear();
228 }
229
230 pub fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
236 self.cache()
237 .order(client_order_id)
238 .cloned()
239 .ok_or_else(|| anyhow::anyhow!("Order not found in cache for {client_order_id}"))
240 }
241
242 pub fn get_orders_for_list(&self, order_list: &OrderList) -> anyhow::Result<Vec<OrderAny>> {
248 order_list
249 .client_order_ids
250 .iter()
251 .map(|id| self.get_order(id))
252 .collect()
253 }
254}
255
256impl Deref for ExecutionAlgorithmCore {
257 type Target = DataActorCore;
258 fn deref(&self) -> &Self::Target {
259 &self.actor
260 }
261}
262
263impl DerefMut for ExecutionAlgorithmCore {
264 fn deref_mut(&mut self) -> &mut Self::Target {
265 &mut self.actor
266 }
267}
268
269#[cfg(test)]
270mod tests {
271 use rstest::rstest;
272
273 use super::*;
274
275 fn create_test_config() -> ExecutionAlgorithmConfig {
276 ExecutionAlgorithmConfig {
277 exec_algorithm_id: Some(ExecAlgorithmId::new("TWAP")),
278 ..Default::default()
279 }
280 }
281
282 #[rstest]
283 fn test_core_new() {
284 let config = create_test_config();
285 let core = ExecutionAlgorithmCore::new(config.clone());
286
287 assert_eq!(core.exec_algorithm_id, ExecAlgorithmId::new("TWAP"));
288 assert_eq!(core.config.log_events, config.log_events);
289 assert!(core.exec_spawn_ids.is_empty());
290 assert!(core.subscribed_strategies.is_empty());
291 }
292
293 #[rstest]
294 fn test_spawn_client_order_id_sequence() {
295 let config = create_test_config();
296 let mut core = ExecutionAlgorithmCore::new(config);
297
298 let primary_id = ClientOrderId::new("O-001");
299
300 let spawn1 = core.spawn_client_order_id(&primary_id);
301 assert_eq!(spawn1.as_str(), "O-001-E1");
302
303 let spawn2 = core.spawn_client_order_id(&primary_id);
304 assert_eq!(spawn2.as_str(), "O-001-E2");
305
306 let spawn3 = core.spawn_client_order_id(&primary_id);
307 assert_eq!(spawn3.as_str(), "O-001-E3");
308 }
309
310 #[rstest]
311 fn test_spawn_client_order_id_different_primaries() {
312 let config = create_test_config();
313 let mut core = ExecutionAlgorithmCore::new(config);
314
315 let primary1 = ClientOrderId::new("O-001");
316 let primary2 = ClientOrderId::new("O-002");
317
318 let spawn1_1 = core.spawn_client_order_id(&primary1);
319 let spawn2_1 = core.spawn_client_order_id(&primary2);
320 let spawn1_2 = core.spawn_client_order_id(&primary1);
321
322 assert_eq!(spawn1_1.as_str(), "O-001-E1");
323 assert_eq!(spawn2_1.as_str(), "O-002-E1");
324 assert_eq!(spawn1_2.as_str(), "O-001-E2");
325 }
326
327 #[rstest]
328 fn test_spawn_sequence() {
329 let config = create_test_config();
330 let mut core = ExecutionAlgorithmCore::new(config);
331
332 let primary_id = ClientOrderId::new("O-001");
333
334 assert_eq!(core.spawn_sequence(&primary_id), None);
335
336 let _ = core.spawn_client_order_id(&primary_id);
337 assert_eq!(core.spawn_sequence(&primary_id), Some(1));
338
339 let _ = core.spawn_client_order_id(&primary_id);
340 assert_eq!(core.spawn_sequence(&primary_id), Some(2));
341 }
342
343 #[rstest]
344 fn test_strategy_subscription_tracking() {
345 let config = create_test_config();
346 let mut core = ExecutionAlgorithmCore::new(config);
347
348 let strategy_id = StrategyId::new("TEST-001");
349
350 assert!(!core.is_strategy_subscribed(&strategy_id));
351
352 core.add_subscribed_strategy(strategy_id);
353 assert!(core.is_strategy_subscribed(&strategy_id));
354 }
355
356 #[rstest]
357 fn test_clear_spawn_ids() {
358 let config = create_test_config();
359 let mut core = ExecutionAlgorithmCore::new(config);
360
361 let primary_id = ClientOrderId::new("O-001");
362 let _ = core.spawn_client_order_id(&primary_id);
363
364 assert!(core.spawn_sequence(&primary_id).is_some());
365
366 core.clear_spawn_ids();
367 assert!(core.spawn_sequence(&primary_id).is_none());
368 }
369
370 #[rstest]
371 fn test_reset() {
372 let config = create_test_config();
373 let mut core = ExecutionAlgorithmCore::new(config);
374
375 let primary_id = ClientOrderId::new("O-001");
376 let strategy_id = StrategyId::new("TEST-001");
377
378 let _ = core.spawn_client_order_id(&primary_id);
379 core.add_subscribed_strategy(strategy_id);
380
381 core.reset();
382
383 assert!(core.spawn_sequence(&primary_id).is_none());
384 assert!(!core.is_strategy_subscribed(&strategy_id));
385 }
386
387 #[rstest]
388 fn test_deref_to_data_actor_core() {
389 let config = create_test_config();
390 let core = ExecutionAlgorithmCore::new(config);
391
392 assert!(core.trader_id().is_none());
394 }
395}