Skip to main content

nautilus_backtest/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for backtest node.
17
18use std::collections::HashMap;
19
20use nautilus_common::{actor::data_actor::ImportableActorConfig, python::actor::PyDataActor};
21use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
22use nautilus_model::identifiers::{ActorId, ComponentId, StrategyId};
23use nautilus_trading::{
24    ImportableStrategyConfig,
25    python::strategy::{PyStrategy, PyStrategyInner},
26};
27use pyo3::{prelude::*, types::PyDict};
28
29use crate::{config::BacktestRunConfig, node::BacktestNode, result::BacktestResult};
30
31#[pyo3_stub_gen::derive::gen_stub_pymethods]
32#[pymethods]
33impl BacktestNode {
34    /// Orchestrates catalog-driven backtests from run configurations.
35    ///
36    /// `BacktestNode` connects the `ParquetDataCatalog` with `BacktestEngine` to load
37    /// historical data and run backtests. Supports both oneshot and streaming modes.
38    #[new]
39    fn py_new(configs: Vec<BacktestRunConfig>) -> PyResult<Self> {
40        Self::new(configs).map_err(to_pyruntime_err)
41    }
42
43    /// Builds backtest engines from the run configurations.
44    ///
45    /// For each config, creates a `BacktestEngine`, adds venues, and loads
46    /// instruments from the catalog.
47    ///
48    /// # Errors
49    ///
50    /// Returns an error if engine creation, venue setup, or instrument loading fails.
51    #[pyo3(name = "build")]
52    fn py_build(&mut self) -> PyResult<()> {
53        self.build().map_err(to_pyruntime_err)
54    }
55
56    /// Runs all configured backtests and returns results.
57    ///
58    /// Automatically calls `build()` if engines have not been created yet.
59    /// For each run config, loads data from the catalog and runs the engine.
60    /// Supports both oneshot (`chunk_size = None`) and streaming modes.
61    ///
62    /// # Errors
63    ///
64    /// Returns an error if building, data loading, or engine execution fails.
65    #[pyo3(name = "run")]
66    fn py_run(&mut self) -> PyResult<Vec<BacktestResult>> {
67        self.run().map_err(to_pyruntime_err)
68    }
69
    /// Disposes all engines and releases resources.
    ///
    /// Exposed to Python as `dispose`; delegates directly to the core `dispose`
    /// method and returns nothing to the caller.
    #[pyo3(name = "dispose")]
    fn py_dispose(&mut self) {
        self.dispose();
    }
75
    /// Imports a Python actor class, instantiates it, and registers it with the
    /// engine belonging to `run_config_id`.
    ///
    /// `config.actor_path` must be in the format `module.path:ClassName`. A config
    /// object is built with `create_config_instance` from `config.config_path` and
    /// `config.config`; when that yields `None` the actor class is called with no
    /// arguments, otherwise the config object is passed as the only positional
    /// argument. The `actor_id`, `log_events`, and `log_commands` attributes of the
    /// config object (when present) are applied to the actor before registration.
    ///
    /// # Errors
    ///
    /// Returns an error if no engine exists for `run_config_id`, if `actor_path` is
    /// malformed, if importing or instantiating the Python class fails, if the
    /// instance cannot be borrowed as a `PyDataActor`, if `actor_id` has an invalid
    /// type or value, if an actor with the same ID is already registered, or if
    /// registration with the trader fails.
    // NOTE(review): no `unsafe` block is visible in this method; presumably the lint
    // is triggered by generated code or the allowance is stale — confirm.
    #[allow(
        unsafe_code,
        reason = "Required for Python actor component registration"
    )]
    #[pyo3(name = "add_actor_from_config")]
    #[expect(clippy::needless_pass_by_value)]
    fn py_add_actor_from_config(
        &mut self,
        // Unused: the body obtains the interpreter token through `Python::attach`
        _py: Python,
        run_config_id: &str,
        config: ImportableActorConfig,
    ) -> PyResult<()> {
        log::debug!("`add_actor_from_config` with: {config:?}");

        // Resolve the engine first so an unknown run config fails before any Python import
        let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
            to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
        })?;

        // Exactly one ':' separator is accepted: `module.path:ClassName`
        let parts: Vec<&str> = config.actor_path.split(':').collect();
        if parts.len() != 2 {
            return Err(to_pyvalue_err(
                "actor_path must be in format 'module.path:ClassName'",
            ));
        }
        let (module_name, class_name) = (parts[0], parts[1]);

        log::info!("Importing actor from module: {module_name} class: {class_name}");

        // Phase 1: Create and configure the Python actor, extract its actor_id
        let (python_actor, actor_id) =
            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
                let actor_module = py
                    .import(module_name)
                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
                let actor_class = actor_module
                    .getattr(class_name)
                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;

                let config_instance =
                    create_config_instance(py, &config.config_path, &config.config)?;

                // Pass the config as the single positional argument when one was built,
                // otherwise construct the actor with no arguments
                let python_actor = if let Some(config_obj) = config_instance.clone() {
                    actor_class.call1((config_obj,))?
                } else {
                    actor_class.call0()?
                };

                log::debug!("Created Python actor instance: {python_actor:?}");

                // Mutably borrow the Rust-side `PyDataActor` behind the Python object;
                // the borrow lasts until the end of this closure
                let mut py_data_actor_ref = python_actor
                    .extract::<PyRefMut<PyDataActor>>()
                    .map_err(Into::<PyErr>::into)
                    .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;

                // Extract inherited config fields from the Python config
                if let Some(config_obj) = config_instance.as_ref() {
                    // A missing or `None` `actor_id` leaves the actor's current ID untouched
                    if let Ok(actor_id) = config_obj.getattr("actor_id")
                        && !actor_id.is_none()
                    {
                        // Accept either a native `ActorId` or a plain string
                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
                            actor_id_val
                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
                            ActorId::new_checked(&actor_id_str)?
                        } else {
                            anyhow::bail!("Invalid `actor_id` type");
                        };
                        py_data_actor_ref.set_actor_id(actor_id_val);
                    }

                    // Applied only when the attribute exists and extracts as `bool`;
                    // anything else is silently ignored
                    if let Ok(log_events) = config_obj.getattr("log_events")
                        && let Ok(log_events_val) = log_events.extract::<bool>()
                    {
                        py_data_actor_ref.set_log_events(log_events_val);
                    }

                    // Same best-effort handling as `log_events`
                    if let Ok(log_commands) = config_obj.getattr("log_commands")
                        && let Ok(log_commands_val) = log_commands.extract::<bool>()
                    {
                        py_data_actor_ref.set_log_commands(log_commands_val);
                    }
                }

                // Store a handle to the Python object on the Rust-side actor
                py_data_actor_ref.set_python_instance(python_actor.clone().unbind());

                // Read the ID after the config has been applied so it reflects any override
                let actor_id = py_data_actor_ref.actor_id();

                Ok((python_actor.unbind(), actor_id))
            })
            .map_err(to_pyruntime_err)?;

        // Validate no duplicate before any mutations
        if engine
            .kernel()
            .trader
            .borrow()
            .actor_ids()
            .contains(&actor_id)
        {
            return Err(to_pyruntime_err(format!(
                "Actor '{actor_id}' is already registered"
            )));
        }

        // Phase 2: Create per-component clock via the trader (individual
        // TestClock in backtest so each actor gets its own default timer handler)
        let trader_id = engine.kernel().config.trader_id();
        let cache = engine.kernel().cache.clone();
        let component_id = ComponentId::new(actor_id.inner().as_str());
        let clock = engine
            .kernel_mut()
            .trader
            .borrow_mut()
            .create_component_clock(component_id);

        // Phase 3: Register the actor with its dedicated clock
        Python::attach(|py| -> anyhow::Result<()> {
            let py_actor = python_actor.bind(py);
            let mut py_data_actor_ref = py_actor
                .extract::<PyRefMut<PyDataActor>>()
                .map_err(Into::<PyErr>::into)
                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;

            py_data_actor_ref
                .register(trader_id, clock, cache)
                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;

            log::debug!(
                "Internal PyDataActor registered: {}, state: {:?}",
                py_data_actor_ref.is_registered(),
                py_data_actor_ref.state()
            );

            Ok(())
        })
        .map_err(to_pyruntime_err)?;

        // Phase 4: Register in global registries and track for lifecycle
        // NOTE(review): this runs in its own `attach` scope with a shared borrow,
        // presumably so the mutable borrow from phase 3 is released first — confirm
        Python::attach(|py| -> anyhow::Result<()> {
            let py_actor = python_actor.bind(py);
            let py_data_actor_ref = py_actor
                .cast::<PyDataActor>()
                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
            py_data_actor_ref.borrow().register_in_global_registries();
            Ok(())
        })
        .map_err(to_pyruntime_err)?;

        // Finally hand the actor ID to the trader so it is tracked for lifecycle
        engine
            .kernel_mut()
            .trader
            .borrow_mut()
            .add_actor_id_for_lifecycle(actor_id)
            .map_err(to_pyruntime_err)?;

        log::info!("Registered Python actor {actor_id}");
        Ok(())
    }
233
    /// Imports a Python strategy class, instantiates it, and registers it with the
    /// engine belonging to `run_config_id`.
    ///
    /// `config.strategy_path` must be in the format `module.path:ClassName`. A config
    /// object is built with `create_config_instance` from `config.config_path` and
    /// `config.config`; when that yields `None` the strategy class is called with no
    /// arguments, otherwise the config object is passed as the only positional
    /// argument. The `strategy_id`, `log_events`, and `log_commands` attributes of the
    /// config object (when present) are applied to the strategy before registration.
    ///
    /// # Errors
    ///
    /// Returns an error if no engine exists for `run_config_id`, if `strategy_path`
    /// is malformed, if importing or instantiating the Python class fails, if the
    /// instance cannot be borrowed as a `PyStrategy`, if `strategy_id` has an invalid
    /// type or value, if a strategy with the same ID is already registered, or if
    /// registration with the trader fails.
    // NOTE(review): no `unsafe` block is visible in this method; presumably the lint
    // is triggered by generated code or the allowance is stale — confirm.
    #[allow(
        unsafe_code,
        reason = "Required for Python strategy component registration"
    )]
    #[pyo3(name = "add_strategy_from_config")]
    #[expect(clippy::needless_pass_by_value)]
    fn py_add_strategy_from_config(
        &mut self,
        // Unused: the body obtains the interpreter token through `Python::attach`
        _py: Python,
        run_config_id: &str,
        config: ImportableStrategyConfig,
    ) -> PyResult<()> {
        log::debug!("`add_strategy_from_config` with: {config:?}");

        // Resolve the engine first so an unknown run config fails before any Python import
        let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
            to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
        })?;

        // Exactly one ':' separator is accepted: `module.path:ClassName`
        let parts: Vec<&str> = config.strategy_path.split(':').collect();
        if parts.len() != 2 {
            return Err(to_pyvalue_err(
                "strategy_path must be in format 'module.path:ClassName'",
            ));
        }
        let (module_name, class_name) = (parts[0], parts[1]);

        log::info!("Importing strategy from module: {module_name} class: {class_name}");

        // Phase 1: Create and configure the Python strategy, extract its strategy_id
        let (python_strategy, strategy_id) =
            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
                let strategy_module = py
                    .import(module_name)
                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
                let strategy_class = strategy_module
                    .getattr(class_name)
                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;

                let config_instance =
                    create_config_instance(py, &config.config_path, &config.config)?;

                // Pass the config as the single positional argument when one was built,
                // otherwise construct the strategy with no arguments
                let python_strategy = if let Some(config_obj) = config_instance.clone() {
                    strategy_class.call1((config_obj,))?
                } else {
                    strategy_class.call0()?
                };

                log::debug!("Created Python strategy instance: {python_strategy:?}");

                // Mutably borrow the Rust-side `PyStrategy` behind the Python object;
                // the borrow lasts until the end of this closure
                let mut py_strategy_ref = python_strategy
                    .extract::<PyRefMut<PyStrategy>>()
                    .map_err(Into::<PyErr>::into)
                    .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;

                // Extract inherited config fields from the Python config
                if let Some(config_obj) = config_instance.as_ref() {
                    // A missing or `None` `strategy_id` leaves the current ID untouched
                    if let Ok(strategy_id) = config_obj.getattr("strategy_id")
                        && !strategy_id.is_none()
                    {
                        // Accept either a native `StrategyId` or a plain string
                        let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
                            sid
                        } else if let Ok(sid_str) = strategy_id.extract::<String>() {
                            StrategyId::new_checked(&sid_str)?
                        } else {
                            anyhow::bail!("Invalid `strategy_id` type");
                        };
                        py_strategy_ref.set_strategy_id(strategy_id_val);
                    }

                    // Applied only when the attribute exists and extracts as `bool`;
                    // anything else is silently ignored
                    if let Ok(log_events) = config_obj.getattr("log_events")
                        && let Ok(log_events_val) = log_events.extract::<bool>()
                    {
                        py_strategy_ref.set_log_events(log_events_val);
                    }

                    // Same best-effort handling as `log_events`
                    if let Ok(log_commands) = config_obj.getattr("log_commands")
                        && let Ok(log_commands_val) = log_commands.extract::<bool>()
                    {
                        py_strategy_ref.set_log_commands(log_commands_val);
                    }
                }

                // Store a handle to the Python object on the Rust-side strategy
                py_strategy_ref.set_python_instance(python_strategy.clone().unbind());

                // Read the ID after the config has been applied so it reflects any override
                let strategy_id = py_strategy_ref.strategy_id();

                Ok((python_strategy.unbind(), strategy_id))
            })
            .map_err(to_pyruntime_err)?;

        // Validate no duplicate before any mutations
        if engine
            .kernel()
            .trader
            .borrow()
            .strategy_ids()
            .contains(&strategy_id)
        {
            return Err(to_pyruntime_err(format!(
                "Strategy '{strategy_id}' is already registered"
            )));
        }

        // Phase 2: Create per-component clock via the trader (individual
        // TestClock in backtest so each strategy gets its own default timer handler)
        let trader_id = engine.kernel().config.trader_id();
        let cache = engine.kernel().cache.clone();
        let portfolio = engine.kernel().portfolio.clone();
        let component_id = ComponentId::new(strategy_id.inner().as_str());
        let clock = engine
            .kernel_mut()
            .trader
            .borrow_mut()
            .create_component_clock(component_id);

        // Phase 3: Register the strategy with its dedicated clock
        Python::attach(|py| -> anyhow::Result<()> {
            let py_strategy = python_strategy.bind(py);
            let mut py_strategy_ref = py_strategy
                .extract::<PyRefMut<PyStrategy>>()
                .map_err(Into::<PyErr>::into)
                .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;

            py_strategy_ref
                .register(trader_id, clock, cache, portfolio)
                .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;

            log::debug!(
                "Internal PyStrategy registered: {}",
                py_strategy_ref.is_registered()
            );

            Ok(())
        })
        .map_err(to_pyruntime_err)?;

        // Phase 4: Register in global registries and install event subscriptions
        // NOTE(review): this runs in its own `attach` scope with a shared borrow,
        // presumably so the mutable borrow from phase 3 is released first — confirm
        Python::attach(|py| -> anyhow::Result<()> {
            let py_strategy = python_strategy.bind(py);
            let py_strategy_ref = py_strategy
                .cast::<PyStrategy>()
                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
            py_strategy_ref.borrow().register_in_global_registries();
            Ok(())
        })
        .map_err(to_pyruntime_err)?;

        // Finally hand the strategy ID to the trader, which also sets up its subscriptions
        engine
            .kernel_mut()
            .trader
            .borrow_mut()
            .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
            .map_err(to_pyruntime_err)?;

        log::info!("Registered Python strategy {strategy_id}");
        Ok(())
    }
391
    /// Returns the node's `Debug` formatting as its Python `repr()` string.
    fn __repr__(&self) -> String {
        format!("{self:?}")
    }
395}
396
397pub(crate) fn create_config_instance<'py>(
398    py: Python<'py>,
399    config_path: &str,
400    config: &HashMap<String, serde_json::Value>,
401) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
402    if config_path.is_empty() && config.is_empty() {
403        log::debug!("No config_path or empty config, using None");
404        return Ok(None);
405    }
406
407    let config_parts: Vec<&str> = config_path.split(':').collect();
408    if config_parts.len() != 2 {
409        anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
410    }
411    let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
412
413    log::debug!(
414        "Importing config class from module: {config_module_name} class: {config_class_name}"
415    );
416
417    let config_module = py
418        .import(config_module_name)
419        .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
420    let config_class = config_module
421        .getattr(config_class_name)
422        .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
423
424    // Convert config dict to Python dict
425    let py_dict = PyDict::new(py);
426
427    for (key, value) in config {
428        let json_str = serde_json::to_string(value)
429            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
430        let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
431        py_dict.set_item(key, py_value)?;
432    }
433
434    log::debug!("Created config dict: {py_dict:?}");
435
436    // Try kwargs first, then default constructor with setattr
437    let config_instance = match config_class.call((), Some(&py_dict)) {
438        Ok(instance) => {
439            log::debug!("Created config instance with kwargs");
440            instance
441        }
442        Err(kwargs_err) => {
443            log::debug!("Failed to create config with kwargs: {kwargs_err}");
444
445            match config_class.call0() {
446                Ok(instance) => {
447                    log::debug!("Created default config instance, setting attributes");
448                    for (key, value) in config {
449                        let json_str = serde_json::to_string(value).map_err(|e| {
450                            anyhow::anyhow!("Failed to serialize config value: {e}")
451                        })?;
452                        let py_value = PyModule::import(py, "json")?.call_method(
453                            "loads",
454                            (json_str,),
455                            None,
456                        )?;
457
458                        if let Err(setattr_err) = instance.setattr(key, py_value) {
459                            log::warn!("Failed to set attribute {key}: {setattr_err}");
460                        }
461                    }
462
463                    // Only call __post_init__ if it exists (setattr path
464                    // needs it, kwargs path already triggered it via __init__)
465                    if instance.hasattr("__post_init__")? {
466                        instance.call_method0("__post_init__")?;
467                    }
468
469                    instance
470                }
471                Err(default_err) => {
472                    anyhow::bail!(
473                        "Failed to create config instance. \
474                         Tried kwargs: {kwargs_err}, default: {default_err}"
475                    );
476                }
477            }
478        }
479    };
480
481    log::debug!("Created config instance: {config_instance:?}");
482
483    Ok(Some(config_instance))
484}