nautilus_backtest/python/
node.rs1use std::collections::HashMap;
19
20use nautilus_common::{actor::data_actor::ImportableActorConfig, python::actor::PyDataActor};
21use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
22use nautilus_model::identifiers::{ActorId, ComponentId, StrategyId};
23use nautilus_trading::{
24 ImportableStrategyConfig,
25 python::strategy::{PyStrategy, PyStrategyInner},
26};
27use pyo3::{prelude::*, types::PyDict};
28
29use crate::{config::BacktestRunConfig, node::BacktestNode, result::BacktestResult};
30
31#[pyo3_stub_gen::derive::gen_stub_pymethods]
32#[pymethods]
33impl BacktestNode {
34 #[new]
39 fn py_new(configs: Vec<BacktestRunConfig>) -> PyResult<Self> {
40 Self::new(configs).map_err(to_pyruntime_err)
41 }
42
43 #[pyo3(name = "build")]
52 fn py_build(&mut self) -> PyResult<()> {
53 self.build().map_err(to_pyruntime_err)
54 }
55
56 #[pyo3(name = "run")]
66 fn py_run(&mut self) -> PyResult<Vec<BacktestResult>> {
67 self.run().map_err(to_pyruntime_err)
68 }
69
70 #[pyo3(name = "dispose")]
72 fn py_dispose(&mut self) {
73 self.dispose();
74 }
75
76 #[allow(
77 unsafe_code,
78 reason = "Required for Python actor component registration"
79 )]
80 #[pyo3(name = "add_actor_from_config")]
81 #[expect(clippy::needless_pass_by_value)]
82 fn py_add_actor_from_config(
83 &mut self,
84 _py: Python,
85 run_config_id: &str,
86 config: ImportableActorConfig,
87 ) -> PyResult<()> {
88 log::debug!("`add_actor_from_config` with: {config:?}");
89
90 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
91 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
92 })?;
93
94 let parts: Vec<&str> = config.actor_path.split(':').collect();
95 if parts.len() != 2 {
96 return Err(to_pyvalue_err(
97 "actor_path must be in format 'module.path:ClassName'",
98 ));
99 }
100 let (module_name, class_name) = (parts[0], parts[1]);
101
102 log::info!("Importing actor from module: {module_name} class: {class_name}");
103
104 let (python_actor, actor_id) =
106 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
107 let actor_module = py
108 .import(module_name)
109 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
110 let actor_class = actor_module
111 .getattr(class_name)
112 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
113
114 let config_instance =
115 create_config_instance(py, &config.config_path, &config.config)?;
116
117 let python_actor = if let Some(config_obj) = config_instance.clone() {
118 actor_class.call1((config_obj,))?
119 } else {
120 actor_class.call0()?
121 };
122
123 log::debug!("Created Python actor instance: {python_actor:?}");
124
125 let mut py_data_actor_ref = python_actor
126 .extract::<PyRefMut<PyDataActor>>()
127 .map_err(Into::<PyErr>::into)
128 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
129
130 if let Some(config_obj) = config_instance.as_ref() {
132 if let Ok(actor_id) = config_obj.getattr("actor_id")
133 && !actor_id.is_none()
134 {
135 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
136 actor_id_val
137 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
138 ActorId::new_checked(&actor_id_str)?
139 } else {
140 anyhow::bail!("Invalid `actor_id` type");
141 };
142 py_data_actor_ref.set_actor_id(actor_id_val);
143 }
144
145 if let Ok(log_events) = config_obj.getattr("log_events")
146 && let Ok(log_events_val) = log_events.extract::<bool>()
147 {
148 py_data_actor_ref.set_log_events(log_events_val);
149 }
150
151 if let Ok(log_commands) = config_obj.getattr("log_commands")
152 && let Ok(log_commands_val) = log_commands.extract::<bool>()
153 {
154 py_data_actor_ref.set_log_commands(log_commands_val);
155 }
156 }
157
158 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
159
160 let actor_id = py_data_actor_ref.actor_id();
161
162 Ok((python_actor.unbind(), actor_id))
163 })
164 .map_err(to_pyruntime_err)?;
165
166 if engine
168 .kernel()
169 .trader
170 .borrow()
171 .actor_ids()
172 .contains(&actor_id)
173 {
174 return Err(to_pyruntime_err(format!(
175 "Actor '{actor_id}' is already registered"
176 )));
177 }
178
179 let trader_id = engine.kernel().config.trader_id();
182 let cache = engine.kernel().cache.clone();
183 let component_id = ComponentId::new(actor_id.inner().as_str());
184 let clock = engine
185 .kernel_mut()
186 .trader
187 .borrow_mut()
188 .create_component_clock(component_id);
189
190 Python::attach(|py| -> anyhow::Result<()> {
192 let py_actor = python_actor.bind(py);
193 let mut py_data_actor_ref = py_actor
194 .extract::<PyRefMut<PyDataActor>>()
195 .map_err(Into::<PyErr>::into)
196 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
197
198 py_data_actor_ref
199 .register(trader_id, clock, cache)
200 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
201
202 log::debug!(
203 "Internal PyDataActor registered: {}, state: {:?}",
204 py_data_actor_ref.is_registered(),
205 py_data_actor_ref.state()
206 );
207
208 Ok(())
209 })
210 .map_err(to_pyruntime_err)?;
211
212 Python::attach(|py| -> anyhow::Result<()> {
214 let py_actor = python_actor.bind(py);
215 let py_data_actor_ref = py_actor
216 .cast::<PyDataActor>()
217 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
218 py_data_actor_ref.borrow().register_in_global_registries();
219 Ok(())
220 })
221 .map_err(to_pyruntime_err)?;
222
223 engine
224 .kernel_mut()
225 .trader
226 .borrow_mut()
227 .add_actor_id_for_lifecycle(actor_id)
228 .map_err(to_pyruntime_err)?;
229
230 log::info!("Registered Python actor {actor_id}");
231 Ok(())
232 }
233
234 #[allow(
235 unsafe_code,
236 reason = "Required for Python strategy component registration"
237 )]
238 #[pyo3(name = "add_strategy_from_config")]
239 #[expect(clippy::needless_pass_by_value)]
240 fn py_add_strategy_from_config(
241 &mut self,
242 _py: Python,
243 run_config_id: &str,
244 config: ImportableStrategyConfig,
245 ) -> PyResult<()> {
246 log::debug!("`add_strategy_from_config` with: {config:?}");
247
248 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
249 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
250 })?;
251
252 let parts: Vec<&str> = config.strategy_path.split(':').collect();
253 if parts.len() != 2 {
254 return Err(to_pyvalue_err(
255 "strategy_path must be in format 'module.path:ClassName'",
256 ));
257 }
258 let (module_name, class_name) = (parts[0], parts[1]);
259
260 log::info!("Importing strategy from module: {module_name} class: {class_name}");
261
262 let (python_strategy, strategy_id) =
264 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
265 let strategy_module = py
266 .import(module_name)
267 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
268 let strategy_class = strategy_module
269 .getattr(class_name)
270 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
271
272 let config_instance =
273 create_config_instance(py, &config.config_path, &config.config)?;
274
275 let python_strategy = if let Some(config_obj) = config_instance.clone() {
276 strategy_class.call1((config_obj,))?
277 } else {
278 strategy_class.call0()?
279 };
280
281 log::debug!("Created Python strategy instance: {python_strategy:?}");
282
283 let mut py_strategy_ref = python_strategy
284 .extract::<PyRefMut<PyStrategy>>()
285 .map_err(Into::<PyErr>::into)
286 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
287
288 if let Some(config_obj) = config_instance.as_ref() {
290 if let Ok(strategy_id) = config_obj.getattr("strategy_id")
291 && !strategy_id.is_none()
292 {
293 let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
294 sid
295 } else if let Ok(sid_str) = strategy_id.extract::<String>() {
296 StrategyId::new_checked(&sid_str)?
297 } else {
298 anyhow::bail!("Invalid `strategy_id` type");
299 };
300 py_strategy_ref.set_strategy_id(strategy_id_val);
301 }
302
303 if let Ok(log_events) = config_obj.getattr("log_events")
304 && let Ok(log_events_val) = log_events.extract::<bool>()
305 {
306 py_strategy_ref.set_log_events(log_events_val);
307 }
308
309 if let Ok(log_commands) = config_obj.getattr("log_commands")
310 && let Ok(log_commands_val) = log_commands.extract::<bool>()
311 {
312 py_strategy_ref.set_log_commands(log_commands_val);
313 }
314 }
315
316 py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
317
318 let strategy_id = py_strategy_ref.strategy_id();
319
320 Ok((python_strategy.unbind(), strategy_id))
321 })
322 .map_err(to_pyruntime_err)?;
323
324 if engine
326 .kernel()
327 .trader
328 .borrow()
329 .strategy_ids()
330 .contains(&strategy_id)
331 {
332 return Err(to_pyruntime_err(format!(
333 "Strategy '{strategy_id}' is already registered"
334 )));
335 }
336
337 let trader_id = engine.kernel().config.trader_id();
340 let cache = engine.kernel().cache.clone();
341 let portfolio = engine.kernel().portfolio.clone();
342 let component_id = ComponentId::new(strategy_id.inner().as_str());
343 let clock = engine
344 .kernel_mut()
345 .trader
346 .borrow_mut()
347 .create_component_clock(component_id);
348
349 Python::attach(|py| -> anyhow::Result<()> {
351 let py_strategy = python_strategy.bind(py);
352 let mut py_strategy_ref = py_strategy
353 .extract::<PyRefMut<PyStrategy>>()
354 .map_err(Into::<PyErr>::into)
355 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
356
357 py_strategy_ref
358 .register(trader_id, clock, cache, portfolio)
359 .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
360
361 log::debug!(
362 "Internal PyStrategy registered: {}",
363 py_strategy_ref.is_registered()
364 );
365
366 Ok(())
367 })
368 .map_err(to_pyruntime_err)?;
369
370 Python::attach(|py| -> anyhow::Result<()> {
372 let py_strategy = python_strategy.bind(py);
373 let py_strategy_ref = py_strategy
374 .cast::<PyStrategy>()
375 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
376 py_strategy_ref.borrow().register_in_global_registries();
377 Ok(())
378 })
379 .map_err(to_pyruntime_err)?;
380
381 engine
382 .kernel_mut()
383 .trader
384 .borrow_mut()
385 .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
386 .map_err(to_pyruntime_err)?;
387
388 log::info!("Registered Python strategy {strategy_id}");
389 Ok(())
390 }
391
392 fn __repr__(&self) -> String {
393 format!("{self:?}")
394 }
395}
396
397pub(crate) fn create_config_instance<'py>(
398 py: Python<'py>,
399 config_path: &str,
400 config: &HashMap<String, serde_json::Value>,
401) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
402 if config_path.is_empty() && config.is_empty() {
403 log::debug!("No config_path or empty config, using None");
404 return Ok(None);
405 }
406
407 let config_parts: Vec<&str> = config_path.split(':').collect();
408 if config_parts.len() != 2 {
409 anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
410 }
411 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
412
413 log::debug!(
414 "Importing config class from module: {config_module_name} class: {config_class_name}"
415 );
416
417 let config_module = py
418 .import(config_module_name)
419 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
420 let config_class = config_module
421 .getattr(config_class_name)
422 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
423
424 let py_dict = PyDict::new(py);
426
427 for (key, value) in config {
428 let json_str = serde_json::to_string(value)
429 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
430 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
431 py_dict.set_item(key, py_value)?;
432 }
433
434 log::debug!("Created config dict: {py_dict:?}");
435
436 let config_instance = match config_class.call((), Some(&py_dict)) {
438 Ok(instance) => {
439 log::debug!("Created config instance with kwargs");
440 instance
441 }
442 Err(kwargs_err) => {
443 log::debug!("Failed to create config with kwargs: {kwargs_err}");
444
445 match config_class.call0() {
446 Ok(instance) => {
447 log::debug!("Created default config instance, setting attributes");
448 for (key, value) in config {
449 let json_str = serde_json::to_string(value).map_err(|e| {
450 anyhow::anyhow!("Failed to serialize config value: {e}")
451 })?;
452 let py_value = PyModule::import(py, "json")?.call_method(
453 "loads",
454 (json_str,),
455 None,
456 )?;
457
458 if let Err(setattr_err) = instance.setattr(key, py_value) {
459 log::warn!("Failed to set attribute {key}: {setattr_err}");
460 }
461 }
462
463 if instance.hasattr("__post_init__")? {
466 instance.call_method0("__post_init__")?;
467 }
468
469 instance
470 }
471 Err(default_err) => {
472 anyhow::bail!(
473 "Failed to create config instance. \
474 Tried kwargs: {kwargs_err}, default: {default_err}"
475 );
476 }
477 }
478 }
479 };
480
481 log::debug!("Created config instance: {config_instance:?}");
482
483 Ok(Some(config_instance))
484}