Skip to main content

nautilus_live/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for live node.
17
18use std::{cell::RefCell, collections::HashMap, rc::Rc};
19
20use nautilus_common::{
21    actor::data_actor::ImportableActorConfig, cache::CacheConfig, enums::Environment,
22    live::get_runtime, logging::logger::LoggerConfig, python::actor::PyDataActor,
23};
24#[cfg(feature = "examples")]
25use nautilus_core::python::to_pytype_err;
26use nautilus_core::{
27    UUID4,
28    python::{to_pyruntime_err, to_pyvalue_err},
29};
30use nautilus_model::identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId};
31use nautilus_portfolio::config::PortfolioConfig;
32use nautilus_system::get_global_pyo3_registry;
33use nautilus_trading::{
34    ImportableExecAlgorithmConfig, ImportableStrategyConfig,
35    python::strategy::{PyStrategy, PyStrategyInner},
36};
37use pyo3::{
38    prelude::*,
39    types::{PyDict, PyTuple},
40};
41use serde_json;
42
43use crate::{
44    builder::LiveNodeBuilder,
45    config::{LiveDataEngineConfig, LiveExecEngineConfig, LiveNodeConfig, LiveRiskEngineConfig},
46    node::LiveNode,
47};
48
49#[pyo3_stub_gen::derive::gen_stub_pymethods]
50#[pymethods]
51impl LiveNode {
52    #[staticmethod]
53    #[pyo3(name = "build")]
54    #[pyo3(signature = (name, config=None))]
55    fn py_build(name: String, config: Option<LiveNodeConfig>) -> PyResult<Self> {
56        Self::build(name, config).map_err(to_pyruntime_err)
57    }
58
59    #[staticmethod]
60    #[pyo3(name = "builder")]
61    fn py_builder(
62        name: String,
63        trader_id: TraderId,
64        environment: Environment,
65    ) -> PyResult<LiveNodeBuilderPy> {
66        match Self::builder(trader_id, environment) {
67            Ok(builder) => Ok(LiveNodeBuilderPy {
68                inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
69            }),
70            Err(e) => Err(to_pyruntime_err(e)),
71        }
72    }
73
74    #[getter]
75    #[pyo3(name = "environment")]
76    fn py_environment(&self) -> Environment {
77        self.environment()
78    }
79
80    #[getter]
81    #[pyo3(name = "trader_id")]
82    fn py_trader_id(&self) -> TraderId {
83        self.trader_id()
84    }
85
86    #[getter]
87    #[pyo3(name = "instance_id")]
88    const fn py_instance_id(&self) -> UUID4 {
89        self.instance_id()
90    }
91
92    #[getter]
93    #[pyo3(name = "is_running")]
94    fn py_is_running(&self) -> bool {
95        self.is_running()
96    }
97
98    #[pyo3(name = "start")]
99    fn py_start(&mut self) -> PyResult<()> {
100        if self.is_running() {
101            return Err(to_pyruntime_err("LiveNode is already running"));
102        }
103
104        // Non-blocking start - just start the node in the background
105        get_runtime().block_on(async { self.start().await.map_err(to_pyruntime_err) })
106    }
107
108    #[pyo3(name = "run")]
109    fn py_run(&mut self, py: Python) -> PyResult<()> {
110        if self.is_running() {
111            return Err(to_pyruntime_err("LiveNode is already running"));
112        }
113
114        // Get a handle for coordinating with the signal checker
115        let handle = self.handle();
116
117        // Import signal module
118        let signal_module = py.import("signal")?;
119        let original_handler =
120            signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; // Save original SIGINT handler (signal 2)
121
122        // Set up a custom signal handler that uses our handle
123        let handle_for_signal = handle;
124        let signal_callback = pyo3::types::PyCFunction::new_closure(
125            py,
126            None,
127            None,
128            move |_args: &pyo3::Bound<'_, PyTuple>,
129                  _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
130                  -> PyResult<()> {
131                log::info!("Python signal handler called");
132                handle_for_signal.stop();
133                Ok(())
134            },
135        )?;
136
137        // Install our signal handler
138        signal_module.call_method1("signal", (2, signal_callback))?;
139
140        // Run the node and restore signal handler afterward
141        let result =
142            { get_runtime().block_on(async { self.run().await.map_err(to_pyruntime_err) }) };
143
144        // Restore original signal handler
145        signal_module.call_method1("signal", (2, original_handler))?;
146
147        result
148    }
149
150    #[pyo3(name = "stop")]
151    fn py_stop(&self) -> PyResult<()> {
152        if !self.is_running() {
153            return Err(to_pyruntime_err("LiveNode is not running"));
154        }
155
156        // Use the handle to signal stop - this is thread-safe and doesn't require async
157        self.handle().stop();
158        Ok(())
159    }
160
161    #[allow(
162        unsafe_code,
163        reason = "Required for Python actor component registration"
164    )]
165    #[pyo3(name = "add_actor_from_config")]
166    #[expect(clippy::needless_pass_by_value)]
167    fn py_add_actor_from_config(
168        &mut self,
169        _py: Python,
170        config: ImportableActorConfig,
171    ) -> PyResult<()> {
172        log::debug!("`add_actor_from_config` with: {config:?}");
173
174        // Extract module and class name from actor_path
175        let parts: Vec<&str> = config.actor_path.split(':').collect();
176        if parts.len() != 2 {
177            return Err(to_pyvalue_err(
178                "actor_path must be in format 'module.path:ClassName'",
179            ));
180        }
181        let (module_name, class_name) = (parts[0], parts[1]);
182
183        log::info!("Importing actor from module: {module_name} class: {class_name}");
184
185        // Phase 1: Create and configure the Python actor, extract its actor_id
186        let (python_actor, actor_id) =
187            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
188                let actor_module = py
189                    .import(module_name)
190                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
191                let actor_class = actor_module
192                    .getattr(class_name)
193                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
194
195                let config_instance =
196                    create_config_instance(py, &config.config_path, &config.config)?;
197
198                let python_actor = if let Some(config_obj) = config_instance.clone() {
199                    actor_class.call1((config_obj,))?
200                } else {
201                    actor_class.call0()?
202                };
203
204                log::debug!("Created Python actor instance: {python_actor:?}");
205
206                let mut py_data_actor_ref = python_actor
207                    .extract::<PyRefMut<PyDataActor>>()
208                    .map_err(Into::<PyErr>::into)
209                    .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
210
211                // Extract inherited config fields from the Python config
212                if let Some(config_obj) = config_instance.as_ref() {
213                    if let Ok(actor_id) = config_obj.getattr("actor_id")
214                        && !actor_id.is_none()
215                    {
216                        let actor_id_val = if let Ok(aid) = actor_id.extract::<ActorId>() {
217                            aid
218                        } else if let Ok(aid_str) = actor_id.extract::<String>() {
219                            ActorId::new_checked(&aid_str)?
220                        } else {
221                            anyhow::bail!("Invalid `actor_id` type");
222                        };
223                        py_data_actor_ref.set_actor_id(actor_id_val);
224                    }
225
226                    if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
227                        py_data_actor_ref.set_log_events(val);
228                    }
229
230                    if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
231                        py_data_actor_ref.set_log_commands(val);
232                    }
233                }
234
235                py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
236
237                let actor_id = py_data_actor_ref.actor_id();
238
239                Ok((python_actor.unbind(), actor_id))
240            })
241            .map_err(to_pyruntime_err)?;
242
243        // Validate no duplicate before any mutations
244        if self
245            .kernel()
246            .trader
247            .borrow()
248            .actor_ids()
249            .contains(&actor_id)
250        {
251            return Err(to_pyruntime_err(format!(
252                "Actor '{actor_id}' is already registered"
253            )));
254        }
255
256        // Phase 2: Create per-component clock via the trader.
257        // This requires `&mut self` access to the kernel, which cannot be held
258        // inside a `Python::attach` block, hence the separate phases.
259        let trader_id = self.kernel().trader_id();
260        let cache = self.kernel().cache();
261        let component_id = ComponentId::new(actor_id.inner().as_str());
262        let clock = self
263            .kernel_mut()
264            .trader
265            .borrow_mut()
266            .create_component_clock(component_id);
267
268        // Phase 3: Register the actor with its dedicated clock
269        Python::attach(|py| -> anyhow::Result<()> {
270            let py_actor = python_actor.bind(py);
271            let mut py_data_actor_ref = py_actor
272                .extract::<PyRefMut<PyDataActor>>()
273                .map_err(Into::<PyErr>::into)
274                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
275
276            py_data_actor_ref
277                .register(trader_id, clock, cache)
278                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
279
280            log::debug!(
281                "Internal PyDataActor registered: {}, state: {:?}",
282                py_data_actor_ref.is_registered(),
283                py_data_actor_ref.state()
284            );
285
286            Ok(())
287        })
288        .map_err(to_pyruntime_err)?;
289
290        // Phase 4: Register in global registries and track for lifecycle
291        Python::attach(|py| -> anyhow::Result<()> {
292            let py_actor = python_actor.bind(py);
293            let py_data_actor_ref = py_actor
294                .cast::<PyDataActor>()
295                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
296            py_data_actor_ref.borrow().register_in_global_registries();
297            Ok(())
298        })
299        .map_err(to_pyruntime_err)?;
300
301        self.kernel_mut()
302            .trader
303            .borrow_mut()
304            .add_actor_id_for_lifecycle(actor_id)
305            .map_err(to_pyruntime_err)?;
306
307        log::info!("Registered Python actor {actor_id}");
308        Ok(())
309    }
310
311    #[allow(
312        unsafe_code,
313        reason = "Required for Python strategy component registration"
314    )]
315    #[pyo3(name = "add_strategy_from_config")]
316    #[expect(clippy::needless_pass_by_value)]
317    fn py_add_strategy_from_config(
318        &mut self,
319        _py: Python,
320        config: ImportableStrategyConfig,
321    ) -> PyResult<()> {
322        log::debug!("`add_strategy_from_config` with: {config:?}");
323
324        // Extract module and class name from strategy_path
325        let parts: Vec<&str> = config.strategy_path.split(':').collect();
326        if parts.len() != 2 {
327            return Err(to_pyvalue_err(
328                "strategy_path must be in format 'module.path:ClassName'",
329            ));
330        }
331        let (module_name, class_name) = (parts[0], parts[1]);
332
333        log::info!("Importing strategy from module: {module_name} class: {class_name}");
334
335        // Phase 1: Create and configure the Python strategy, extract its strategy_id
336        let (python_strategy, strategy_id) =
337            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
338                let strategy_module = py
339                    .import(module_name)
340                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
341                let strategy_class = strategy_module
342                    .getattr(class_name)
343                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
344
345                let config_instance =
346                    create_config_instance(py, &config.config_path, &config.config)?;
347
348                let python_strategy = if let Some(config_obj) = config_instance.clone() {
349                    strategy_class.call1((config_obj,))?
350                } else {
351                    strategy_class.call0()?
352                };
353
354                log::debug!("Created Python strategy instance: {python_strategy:?}");
355
356                let mut py_strategy_ref = python_strategy
357                    .extract::<PyRefMut<PyStrategy>>()
358                    .map_err(Into::<PyErr>::into)
359                    .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
360
361                // Extract inherited config fields from the Python config
362                if let Some(config_obj) = config_instance.as_ref() {
363                    if let Ok(strategy_id) = config_obj.getattr("strategy_id")
364                        && !strategy_id.is_none()
365                    {
366                        let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
367                            sid
368                        } else if let Ok(sid_str) = strategy_id.extract::<String>() {
369                            StrategyId::new_checked(&sid_str)?
370                        } else {
371                            anyhow::bail!("Invalid `strategy_id` type");
372                        };
373                        py_strategy_ref.set_strategy_id(strategy_id_val);
374                    }
375
376                    if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
377                        py_strategy_ref.set_log_events(val);
378                    }
379
380                    if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
381                        py_strategy_ref.set_log_commands(val);
382                    }
383                }
384
385                py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
386
387                let strategy_id = py_strategy_ref.strategy_id();
388
389                Ok((python_strategy.unbind(), strategy_id))
390            })
391            .map_err(to_pyruntime_err)?;
392
393        // Validate no duplicate before any mutations
394        if self
395            .kernel()
396            .trader
397            .borrow()
398            .strategy_ids()
399            .contains(&strategy_id)
400        {
401            return Err(to_pyruntime_err(format!(
402                "Strategy '{strategy_id}' is already registered"
403            )));
404        }
405
406        // Phase 2: Create per-component clock via the trader.
407        // This requires `&mut self` access to the kernel, which cannot be held
408        // inside a `Python::attach` block, hence the separate phases.
409        let trader_id = self.kernel().trader_id();
410        let cache = self.kernel().cache();
411        let portfolio = self.kernel().portfolio.clone();
412        let component_id = ComponentId::new(strategy_id.inner().as_str());
413        let clock = self
414            .kernel_mut()
415            .trader
416            .borrow_mut()
417            .create_component_clock(component_id);
418
419        // Phase 3: Register the strategy with its dedicated clock
420        Python::attach(|py| -> anyhow::Result<()> {
421            let py_strategy = python_strategy.bind(py);
422            let mut py_strategy_ref = py_strategy
423                .extract::<PyRefMut<PyStrategy>>()
424                .map_err(Into::<PyErr>::into)
425                .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
426
427            py_strategy_ref
428                .register(trader_id, clock, cache, portfolio)
429                .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
430
431            log::debug!(
432                "Internal PyStrategy registered: {}",
433                py_strategy_ref.is_registered()
434            );
435
436            Ok(())
437        })
438        .map_err(to_pyruntime_err)?;
439
440        // Phase 4: Register in global registries and install event subscriptions
441        Python::attach(|py| -> anyhow::Result<()> {
442            let py_strategy = python_strategy.bind(py);
443            let py_strategy_ref = py_strategy
444                .cast::<PyStrategy>()
445                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
446            py_strategy_ref.borrow().register_in_global_registries();
447            Ok(())
448        })
449        .map_err(to_pyruntime_err)?;
450
451        // TODO: Register external order claims with the execution manager
452        // once PyStrategy exposes external_order_claims publicly.
453        // Currently external_order_claims are only handled for Rust-native
454        // strategies via LiveNode::add_strategy<T>().
455        Python::attach(|py| {
456            let py_strategy = python_strategy.bind(py);
457            if let Ok(claims) = py_strategy.getattr("external_order_claims")
458                && !claims.is_none()
459                && claims.len().unwrap_or(0) > 0
460            {
461                log::warn!(
462                    "Strategy '{strategy_id}' has external_order_claims configured, \
463                     but these are not yet supported for Python strategies on the Rust backend"
464                );
465            }
466        });
467
468        self.kernel_mut()
469            .trader
470            .borrow_mut()
471            .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
472            .map_err(to_pyruntime_err)?;
473
474        log::info!("Registered Python strategy {strategy_id}");
475        Ok(())
476    }
477
478    /// Adds a native Rust strategy from its config.
479    ///
480    /// The config type determines which built-in strategy is constructed.
481    /// All execution happens in Rust; Python is the configuration layer.
482    #[cfg(feature = "examples")]
483    #[pyo3(name = "add_native_strategy")]
484    fn py_add_native_strategy(&mut self, config: &Bound<'_, PyAny>) -> PyResult<()> {
485        use nautilus_trading::examples::strategies::{
486            DeltaNeutralVol, DeltaNeutralVolConfig, EmaCross, EmaCrossConfig, GridMarketMaker,
487            GridMarketMakerConfig, HurstVpinDirectional, HurstVpinDirectionalConfig,
488        };
489
490        if let Ok(config) = config.extract::<EmaCrossConfig>() {
491            self.add_strategy(EmaCross::from_config(config))
492                .map_err(to_pyruntime_err)
493        } else if let Ok(config) = config.extract::<GridMarketMakerConfig>() {
494            self.add_strategy(GridMarketMaker::new(config))
495                .map_err(to_pyruntime_err)
496        } else if let Ok(config) = config.extract::<DeltaNeutralVolConfig>() {
497            self.add_strategy(DeltaNeutralVol::new(config))
498                .map_err(to_pyruntime_err)
499        } else if let Ok(config) = config.extract::<HurstVpinDirectionalConfig>() {
500            self.add_strategy(HurstVpinDirectional::new(config))
501                .map_err(to_pyruntime_err)
502        } else {
503            let type_name = config.get_type().name()?;
504            Err(to_pytype_err(format!(
505                "Unsupported native strategy config type: {type_name}",
506            )))
507        }
508    }
509
510    /// Adds a native Rust actor from its config.
511    ///
512    /// The config type determines which built-in actor is constructed.
513    /// All execution happens in Rust; Python is the configuration layer.
514    #[cfg(feature = "examples")]
515    #[pyo3(name = "add_native_actor")]
516    fn py_add_native_actor(&mut self, config: &Bound<'_, PyAny>) -> PyResult<()> {
517        use nautilus_trading::examples::actors::{BookImbalanceActor, BookImbalanceActorConfig};
518
519        if let Ok(config) = config.extract::<BookImbalanceActorConfig>() {
520            self.add_actor(BookImbalanceActor::from_config(config))
521                .map_err(to_pyruntime_err)
522        } else {
523            let type_name = config.get_type().name()?;
524            Err(to_pytype_err(format!(
525                "Unsupported native actor config type: {type_name}",
526            )))
527        }
528    }
529
530    #[allow(
531        unsafe_code,
532        reason = "Required for Python exec algorithm component registration"
533    )]
534    #[pyo3(name = "add_exec_algorithm_from_config")]
535    #[expect(clippy::needless_pass_by_value)]
536    fn py_add_exec_algorithm_from_config(
537        &mut self,
538        _py: Python,
539        config: ImportableExecAlgorithmConfig,
540    ) -> PyResult<()> {
541        if self.is_running() {
542            return Err(to_pyruntime_err(
543                "Cannot add exec algorithm while node is running",
544            ));
545        }
546
547        log::debug!("`add_exec_algorithm_from_config` with: {config:?}");
548
549        let parts: Vec<&str> = config.exec_algorithm_path.split(':').collect();
550        if parts.len() != 2 {
551            return Err(to_pyvalue_err(
552                "exec_algorithm_path must be in format 'module.path:ClassName'",
553            ));
554        }
555        let (module_name, class_name) = (parts[0], parts[1]);
556
557        log::info!("Importing exec algorithm from module: {module_name} class: {class_name}");
558
559        // Phase 1: Create and configure the Python exec algorithm, extract its actor_id
560        let (python_exec_algorithm, actor_id) =
561            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
562                let algo_module = py
563                    .import(module_name)
564                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
565                let algo_class = algo_module
566                    .getattr(class_name)
567                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
568
569                let config_instance =
570                    create_config_instance(py, &config.config_path, &config.config)?;
571
572                let python_exec_algorithm = if let Some(config_obj) = config_instance.clone() {
573                    algo_class.call1((config_obj,))?
574                } else {
575                    algo_class.call0()?
576                };
577
578                log::debug!("Created Python exec algorithm instance: {python_exec_algorithm:?}");
579
580                let mut py_data_actor_ref = python_exec_algorithm
581                    .extract::<PyRefMut<PyDataActor>>()
582                    .map_err(Into::<PyErr>::into)
583                    .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
584
585                // Extract ID from config: prefer exec_algorithm_id, fall back to actor_id
586                if let Some(config_obj) = config_instance.as_ref() {
587                    let id_attr = config_obj
588                        .getattr("exec_algorithm_id")
589                        .ok()
590                        .filter(|v| !v.is_none())
591                        .or_else(|| config_obj.getattr("actor_id").ok().filter(|v| !v.is_none()));
592
593                    if let Some(id_value) = id_attr {
594                        let actor_id_val = if let Ok(eaid) = id_value.extract::<ExecAlgorithmId>() {
595                            ActorId::new(eaid.inner().as_str())
596                        } else if let Ok(aid) = id_value.extract::<ActorId>() {
597                            aid
598                        } else if let Ok(aid_str) = id_value.extract::<String>() {
599                            ActorId::new_checked(&aid_str)?
600                        } else {
601                            anyhow::bail!("Invalid `exec_algorithm_id`/`actor_id` type");
602                        };
603                        py_data_actor_ref.set_actor_id(actor_id_val);
604                    }
605
606                    if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
607                        py_data_actor_ref.set_log_events(val);
608                    }
609
610                    if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
611                        py_data_actor_ref.set_log_commands(val);
612                    }
613                }
614
615                py_data_actor_ref.set_python_instance(python_exec_algorithm.clone().unbind());
616
617                let actor_id = py_data_actor_ref.actor_id();
618
619                Ok((python_exec_algorithm.unbind(), actor_id))
620            })
621            .map_err(to_pyruntime_err)?;
622
623        let exec_algorithm_id = ExecAlgorithmId::from(actor_id.inner().as_str());
624
625        if self
626            .kernel()
627            .trader
628            .borrow()
629            .exec_algorithm_ids()
630            .contains(&exec_algorithm_id)
631        {
632            return Err(to_pyruntime_err(format!(
633                "Execution algorithm '{exec_algorithm_id}' is already registered"
634            )));
635        }
636
637        // Phase 2: Create per-component clock via the trader.
638        // This requires `&mut self` access to the kernel, which cannot be held
639        // inside a `Python::attach` block, hence the separate phases.
640        let trader_id = self.kernel().trader_id();
641        let cache = self.kernel().cache();
642        let component_id = ComponentId::new(actor_id.inner().as_str());
643        let clock = self
644            .kernel_mut()
645            .trader
646            .borrow_mut()
647            .create_component_clock(component_id);
648
649        // Phase 3: Register the exec algorithm with its dedicated clock
650        Python::attach(|py| -> anyhow::Result<()> {
651            let py_algo = python_exec_algorithm.bind(py);
652            let mut py_data_actor_ref = py_algo
653                .extract::<PyRefMut<PyDataActor>>()
654                .map_err(Into::<PyErr>::into)
655                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
656
657            py_data_actor_ref
658                .register(trader_id, clock, cache)
659                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
660
661            log::debug!(
662                "Internal PyDataActor registered: {}, state: {:?}",
663                py_data_actor_ref.is_registered(),
664                py_data_actor_ref.state()
665            );
666
667            Ok(())
668        })
669        .map_err(to_pyruntime_err)?;
670
671        // Phase 4: Register in global registries and track for lifecycle
672        Python::attach(|py| -> anyhow::Result<()> {
673            let py_algo = python_exec_algorithm.bind(py);
674            let py_data_actor_ref = py_algo
675                .cast::<PyDataActor>()
676                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
677            py_data_actor_ref.borrow().register_in_global_registries();
678            Ok(())
679        })
680        .map_err(to_pyruntime_err)?;
681
682        self.kernel_mut()
683            .trader
684            .borrow_mut()
685            .add_exec_algorithm_id_for_lifecycle(exec_algorithm_id)
686            .map_err(to_pyruntime_err)?;
687
688        log::info!("Registered Python exec algorithm {exec_algorithm_id}");
689        Ok(())
690    }
691
692    fn __repr__(&self) -> String {
693        format!(
694            "LiveNode(trader_id={}, environment={:?}, running={})",
695            self.trader_id(),
696            self.environment(),
697            self.is_running()
698        )
699    }
700}
701
702/// Python wrapper for `LiveNodeBuilder` that uses interior mutability
703/// to work around PyO3's shared ownership model.
704#[derive(Debug)]
705#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
706#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")]
707pub struct LiveNodeBuilderPy {
708    inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
709}
710
711#[pyo3_stub_gen::derive::gen_stub_pymethods]
712#[pymethods]
713impl LiveNodeBuilderPy {
714    #[pyo3(name = "with_instance_id")]
715    fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
716        let mut inner_ref = self.inner.borrow_mut();
717        if let Some(builder) = inner_ref.take() {
718            *inner_ref = Some(builder.with_instance_id(instance_id));
719            Ok(Self {
720                inner: self.inner.clone(),
721            })
722        } else {
723            Err(to_pyruntime_err("Builder already consumed"))
724        }
725    }
726
727    #[pyo3(name = "with_load_state")]
728    fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
729        let mut inner_ref = self.inner.borrow_mut();
730        if let Some(builder) = inner_ref.take() {
731            *inner_ref = Some(builder.with_load_state(load_state));
732            Ok(Self {
733                inner: self.inner.clone(),
734            })
735        } else {
736            Err(to_pyruntime_err("Builder already consumed"))
737        }
738    }
739
740    #[pyo3(name = "with_save_state")]
741    fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
742        let mut inner_ref = self.inner.borrow_mut();
743        if let Some(builder) = inner_ref.take() {
744            *inner_ref = Some(builder.with_save_state(save_state));
745            Ok(Self {
746                inner: self.inner.clone(),
747            })
748        } else {
749            Err(to_pyruntime_err("Builder already consumed"))
750        }
751    }
752
753    #[pyo3(name = "with_timeout_connection")]
754    fn py_with_timeout_connection(&self, timeout_secs: u64) -> PyResult<Self> {
755        let mut inner_ref = self.inner.borrow_mut();
756        if let Some(builder) = inner_ref.take() {
757            *inner_ref = Some(builder.with_timeout_connection(timeout_secs));
758            Ok(Self {
759                inner: self.inner.clone(),
760            })
761        } else {
762            Err(to_pyruntime_err("Builder already consumed"))
763        }
764    }
765
766    #[pyo3(name = "with_timeout_reconciliation")]
767    fn py_with_timeout_reconciliation(&self, timeout_secs: u64) -> PyResult<Self> {
768        let mut inner_ref = self.inner.borrow_mut();
769        if let Some(builder) = inner_ref.take() {
770            *inner_ref = Some(builder.with_timeout_reconciliation(timeout_secs));
771            Ok(Self {
772                inner: self.inner.clone(),
773            })
774        } else {
775            Err(to_pyruntime_err("Builder already consumed"))
776        }
777    }
778
779    #[pyo3(name = "with_timeout_portfolio")]
780    fn py_with_timeout_portfolio(&self, timeout_secs: u64) -> PyResult<Self> {
781        let mut inner_ref = self.inner.borrow_mut();
782        if let Some(builder) = inner_ref.take() {
783            *inner_ref = Some(builder.with_timeout_portfolio(timeout_secs));
784            Ok(Self {
785                inner: self.inner.clone(),
786            })
787        } else {
788            Err(to_pyruntime_err("Builder already consumed"))
789        }
790    }
791
792    #[pyo3(name = "with_timeout_disconnection_secs")]
793    fn py_with_timeout_disconnection_secs(&self, timeout_secs: u64) -> PyResult<Self> {
794        let mut inner_ref = self.inner.borrow_mut();
795        if let Some(builder) = inner_ref.take() {
796            *inner_ref = Some(builder.with_timeout_disconnection_secs(timeout_secs));
797            Ok(Self {
798                inner: self.inner.clone(),
799            })
800        } else {
801            Err(to_pyruntime_err("Builder already consumed"))
802        }
803    }
804
805    #[pyo3(name = "with_delay_post_stop_secs")]
806    fn py_with_delay_post_stop_secs(&self, delay_secs: u64) -> PyResult<Self> {
807        let mut inner_ref = self.inner.borrow_mut();
808        if let Some(builder) = inner_ref.take() {
809            *inner_ref = Some(builder.with_delay_post_stop_secs(delay_secs));
810            Ok(Self {
811                inner: self.inner.clone(),
812            })
813        } else {
814            Err(to_pyruntime_err("Builder already consumed"))
815        }
816    }
817
818    #[pyo3(name = "with_delay_shutdown_secs")]
819    fn py_with_delay_shutdown_secs(&self, delay_secs: u64) -> PyResult<Self> {
820        let mut inner_ref = self.inner.borrow_mut();
821        if let Some(builder) = inner_ref.take() {
822            *inner_ref = Some(builder.with_delay_shutdown_secs(delay_secs));
823            Ok(Self {
824                inner: self.inner.clone(),
825            })
826        } else {
827            Err(to_pyruntime_err("Builder already consumed"))
828        }
829    }
830
831    #[pyo3(name = "with_reconciliation")]
832    fn py_with_reconciliation(&self, reconciliation: bool) -> PyResult<Self> {
833        let mut inner_ref = self.inner.borrow_mut();
834        if let Some(builder) = inner_ref.take() {
835            *inner_ref = Some(builder.with_reconciliation(reconciliation));
836            Ok(Self {
837                inner: self.inner.clone(),
838            })
839        } else {
840            Err(to_pyruntime_err("Builder already consumed"))
841        }
842    }
843
844    #[pyo3(name = "with_reconciliation_lookback_mins")]
845    fn py_with_reconciliation_lookback_mins(&self, mins: u32) -> PyResult<Self> {
846        let mut inner_ref = self.inner.borrow_mut();
847        if let Some(builder) = inner_ref.take() {
848            *inner_ref = Some(builder.with_reconciliation_lookback_mins(mins));
849            Ok(Self {
850                inner: self.inner.clone(),
851            })
852        } else {
853            Err(to_pyruntime_err("Builder already consumed"))
854        }
855    }
856
857    #[pyo3(name = "with_cache_config")]
858    fn py_with_cache_config(&self, config: CacheConfig) -> PyResult<Self> {
859        let mut inner_ref = self.inner.borrow_mut();
860        if let Some(builder) = inner_ref.take() {
861            *inner_ref = Some(builder.with_cache_config(config));
862            Ok(Self {
863                inner: self.inner.clone(),
864            })
865        } else {
866            Err(to_pyruntime_err("Builder already consumed"))
867        }
868    }
869
870    #[pyo3(name = "with_portfolio_config")]
871    fn py_with_portfolio_config(&self, config: PortfolioConfig) -> PyResult<Self> {
872        let mut inner_ref = self.inner.borrow_mut();
873        if let Some(builder) = inner_ref.take() {
874            *inner_ref = Some(builder.with_portfolio_config(config));
875            Ok(Self {
876                inner: self.inner.clone(),
877            })
878        } else {
879            Err(to_pyruntime_err("Builder already consumed"))
880        }
881    }
882
883    #[pyo3(name = "with_data_engine_config")]
884    fn py_with_data_engine_config(&self, config: LiveDataEngineConfig) -> PyResult<Self> {
885        let mut inner_ref = self.inner.borrow_mut();
886        if let Some(builder) = inner_ref.take() {
887            *inner_ref = Some(builder.with_data_engine_config(config));
888            Ok(Self {
889                inner: self.inner.clone(),
890            })
891        } else {
892            Err(to_pyruntime_err("Builder already consumed"))
893        }
894    }
895
896    #[pyo3(name = "with_risk_engine_config")]
897    fn py_with_risk_engine_config(&self, config: LiveRiskEngineConfig) -> PyResult<Self> {
898        let mut inner_ref = self.inner.borrow_mut();
899        if let Some(builder) = inner_ref.take() {
900            *inner_ref = Some(builder.with_risk_engine_config(config));
901            Ok(Self {
902                inner: self.inner.clone(),
903            })
904        } else {
905            Err(to_pyruntime_err("Builder already consumed"))
906        }
907    }
908
909    #[pyo3(name = "with_exec_engine_config")]
910    fn py_with_exec_engine_config(&self, config: LiveExecEngineConfig) -> PyResult<Self> {
911        let mut inner_ref = self.inner.borrow_mut();
912        if let Some(builder) = inner_ref.take() {
913            *inner_ref = Some(builder.with_exec_engine_config(config));
914            Ok(Self {
915                inner: self.inner.clone(),
916            })
917        } else {
918            Err(to_pyruntime_err("Builder already consumed"))
919        }
920    }
921
922    #[pyo3(name = "with_logging")]
923    fn py_with_logging(&self, logging: LoggerConfig) -> PyResult<Self> {
924        let mut inner_ref = self.inner.borrow_mut();
925        if let Some(builder) = inner_ref.take() {
926            *inner_ref = Some(builder.with_logging(logging));
927            Ok(Self {
928                inner: self.inner.clone(),
929            })
930        } else {
931            Err(to_pyruntime_err("Builder already consumed"))
932        }
933    }
934
935    #[pyo3(name = "add_data_client")]
936    #[expect(clippy::needless_pass_by_value)]
937    fn py_add_data_client(
938        &self,
939        name: Option<String>,
940        factory: Py<PyAny>,
941        config: Py<PyAny>,
942    ) -> PyResult<Self> {
943        let mut inner_ref = self.inner.borrow_mut();
944        if let Some(builder) = inner_ref.take() {
945            Python::attach(|py| -> PyResult<Self> {
946                // Use the global registry to extract Py<PyAny>s to trait objects
947                let registry = get_global_pyo3_registry();
948
949                let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
950                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
951
952                // Use the factory name from the original factory for the client name
953                let factory_name = factory
954                    .getattr(py, "name")?
955                    .call0(py)?
956                    .extract::<String>(py)?;
957                let client_name = name.unwrap_or(factory_name);
958
959                // Add the data client to the builder using boxed trait objects
960                match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
961                    Ok(updated_builder) => {
962                        *inner_ref = Some(updated_builder);
963                        Ok(Self {
964                            inner: self.inner.clone(),
965                        })
966                    }
967                    Err(e) => Err(to_pyruntime_err(format!("Failed to add data client: {e}"))),
968                }
969            })
970        } else {
971            Err(to_pyruntime_err("Builder already consumed"))
972        }
973    }
974
975    #[pyo3(name = "add_exec_client")]
976    #[expect(clippy::needless_pass_by_value)]
977    fn py_add_exec_client(
978        &self,
979        name: Option<String>,
980        factory: Py<PyAny>,
981        config: Py<PyAny>,
982    ) -> PyResult<Self> {
983        let mut inner_ref = self.inner.borrow_mut();
984        if let Some(builder) = inner_ref.take() {
985            Python::attach(|py| -> PyResult<Self> {
986                let registry = get_global_pyo3_registry();
987
988                let boxed_factory = registry.extract_exec_factory(py, factory.clone_ref(py))?;
989                let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
990
991                let factory_name = factory
992                    .getattr(py, "name")?
993                    .call0(py)?
994                    .extract::<String>(py)?;
995                let client_name = name.unwrap_or(factory_name);
996
997                match builder.add_exec_client(Some(client_name), boxed_factory, boxed_config) {
998                    Ok(updated_builder) => {
999                        *inner_ref = Some(updated_builder);
1000                        Ok(Self {
1001                            inner: self.inner.clone(),
1002                        })
1003                    }
1004                    Err(e) => Err(to_pyruntime_err(format!("Failed to add exec client: {e}"))),
1005                }
1006            })
1007        } else {
1008            Err(to_pyruntime_err("Builder already consumed"))
1009        }
1010    }
1011
1012    #[pyo3(name = "build")]
1013    fn py_build(&self) -> PyResult<LiveNode> {
1014        let mut inner_ref = self.inner.borrow_mut();
1015        if let Some(builder) = inner_ref.take() {
1016            match builder.build() {
1017                Ok(node) => Ok(node),
1018                Err(e) => Err(to_pyruntime_err(e)),
1019            }
1020        } else {
1021            Err(to_pyruntime_err("Builder already consumed"))
1022        }
1023    }
1024
1025    fn __repr__(&self) -> String {
1026        format!("{self:?}")
1027    }
1028}
1029
1030/// Creates a Python config instance from a config path and config dictionary.
1031///
1032/// This helper is shared between `add_actor_from_config` and `add_strategy_from_config`.
1033/// It handles:
1034/// 1. Importing the config class from the module path
1035/// 2. Converting the `HashMap<String, serde_json::Value>` to a Python dict
1036/// 3. Trying kwargs-first construction, falling back to default + setattr
1037/// 4. Calling `__post_init__` for dataclasses when using the setattr path
1038fn create_config_instance<'py>(
1039    py: Python<'py>,
1040    config_path: &str,
1041    config: &HashMap<String, serde_json::Value>,
1042) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
1043    if config_path.is_empty() && config.is_empty() {
1044        log::debug!("No config_path or empty config, using None");
1045        return Ok(None);
1046    }
1047
1048    let config_parts: Vec<&str> = config_path.split(':').collect();
1049    if config_parts.len() != 2 {
1050        anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
1051    }
1052    let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
1053
1054    log::debug!(
1055        "Importing config class from module: {config_module_name} class: {config_class_name}"
1056    );
1057
1058    let config_module = py
1059        .import(config_module_name)
1060        .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
1061    let config_class = config_module
1062        .getattr(config_class_name)
1063        .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
1064
1065    // Convert config dict to Python dict
1066    let py_dict = PyDict::new(py);
1067
1068    for (key, value) in config {
1069        let json_str = serde_json::to_string(value)
1070            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
1071        let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
1072        py_dict.set_item(key, py_value)?;
1073    }
1074
1075    log::debug!("Created config dict: {py_dict:?}");
1076
1077    // Try kwargs first, then default constructor with setattr
1078    let config_instance = match config_class.call((), Some(&py_dict)) {
1079        Ok(instance) => {
1080            log::debug!("Created config instance with kwargs");
1081            instance
1082        }
1083        Err(kwargs_err) => {
1084            log::debug!("Failed to create config with kwargs: {kwargs_err}");
1085
1086            match config_class.call0() {
1087                Ok(instance) => {
1088                    log::debug!("Created default config instance, setting attributes");
1089                    for (key, value) in config {
1090                        let json_str = serde_json::to_string(value).map_err(|e| {
1091                            anyhow::anyhow!("Failed to serialize config value: {e}")
1092                        })?;
1093                        let py_value = PyModule::import(py, "json")?.call_method(
1094                            "loads",
1095                            (json_str,),
1096                            None,
1097                        )?;
1098
1099                        if let Err(setattr_err) = instance.setattr(key, py_value) {
1100                            log::warn!("Failed to set attribute {key}: {setattr_err}");
1101                        }
1102                    }
1103
1104                    // Only call __post_init__ if it exists (setattr path
1105                    // needs it, kwargs path already triggered it via __init__)
1106                    if instance.hasattr("__post_init__")? {
1107                        instance.call_method0("__post_init__")?;
1108                    }
1109
1110                    instance
1111                }
1112                Err(default_err) => {
1113                    anyhow::bail!(
1114                        "Failed to create config instance. \
1115                         Tried kwargs: {kwargs_err}, default: {default_err}"
1116                    );
1117                }
1118            }
1119        }
1120    };
1121
1122    log::debug!("Created config instance: {config_instance:?}");
1123
1124    Ok(Some(config_instance))
1125}
1126
1127/// Extracts an optional boolean attribute from a Python config object.
1128///
1129/// Returns `None` if the attribute doesn't exist or isn't a bool,
1130/// without raising an error (config fields are optional overrides).
1131fn extract_bool_config_attr(config_obj: &Bound<'_, PyAny>, attr: &str) -> Option<bool> {
1132    config_obj
1133        .getattr(attr)
1134        .ok()
1135        .and_then(|val| val.extract::<bool>().ok())
1136}
1137
1138#[cfg(all(test, feature = "python"))]
1139mod tests {
1140    use std::{
1141        any::Any,
1142        cell::RefCell,
1143        collections::HashMap,
1144        ffi::CString,
1145        fmt::Debug,
1146        rc::Rc,
1147        sync::{
1148            Arc,
1149            atomic::{AtomicBool, AtomicUsize, Ordering},
1150        },
1151        time::Duration,
1152    };
1153
1154    use async_trait::async_trait;
1155    use nautilus_common::{
1156        cache::Cache,
1157        clients::DataClient,
1158        clock::Clock,
1159        enums::Environment,
1160        factories::{ClientConfig, DataClientFactory},
1161        live::runner::get_data_event_sender,
1162        messages::{
1163            DataEvent, DataResponse,
1164            data::{BarsResponse, RequestBars},
1165        },
1166        msgbus::get_message_bus,
1167    };
1168    use nautilus_core::UnixNanos;
1169    use nautilus_model::{
1170        data::{Bar, BarType},
1171        identifiers::{ClientId, TraderId, Venue},
1172        types::{Price, Quantity},
1173    };
1174    use nautilus_trading::{ImportableStrategyConfig, python::strategy::PyStrategy};
1175    use pyo3::{
1176        Python,
1177        types::{PyAnyMethods, PyDict, PyModule, PyModuleMethods},
1178    };
1179
1180    use super::LiveNode;
1181    #[derive(Debug, Default)]
1182    struct TestDataClientConfig;
1183
1184    impl ClientConfig for TestDataClientConfig {
1185        fn as_any(&self) -> &dyn Any {
1186            self
1187        }
1188    }
1189
1190    #[derive(Debug)]
1191    struct TestHistoricalBarsDataClientFactory {
1192        request_count: Arc<AtomicUsize>,
1193        response_sent_count: Arc<AtomicUsize>,
1194        handler_visible_count: Arc<AtomicUsize>,
1195    }
1196
1197    impl TestHistoricalBarsDataClientFactory {
1198        fn new(
1199            request_count: Arc<AtomicUsize>,
1200            response_sent_count: Arc<AtomicUsize>,
1201            handler_visible_count: Arc<AtomicUsize>,
1202        ) -> Self {
1203            Self {
1204                request_count,
1205                response_sent_count,
1206                handler_visible_count,
1207            }
1208        }
1209    }
1210
1211    impl DataClientFactory for TestHistoricalBarsDataClientFactory {
1212        fn create(
1213            &self,
1214            name: &str,
1215            _config: &dyn ClientConfig,
1216            _cache: Rc<RefCell<Cache>>,
1217            _clock: Rc<RefCell<dyn Clock>>,
1218        ) -> anyhow::Result<Box<dyn DataClient>> {
1219            Ok(Box::new(TestHistoricalBarsDataClient::new(
1220                ClientId::from(name),
1221                Venue::from("SIM"),
1222                self.request_count.clone(),
1223                self.response_sent_count.clone(),
1224                self.handler_visible_count.clone(),
1225            )))
1226        }
1227
1228        fn name(&self) -> &'static str {
1229            "TEST_DATA"
1230        }
1231
1232        fn config_type(&self) -> &'static str {
1233            "TestDataClientConfig"
1234        }
1235    }
1236
1237    #[derive(Debug)]
1238    struct TestHistoricalBarsDataClient {
1239        client_id: ClientId,
1240        venue: Venue,
1241        connected: Arc<AtomicBool>,
1242        request_count: Arc<AtomicUsize>,
1243        response_sent_count: Arc<AtomicUsize>,
1244        handler_visible_count: Arc<AtomicUsize>,
1245    }
1246
1247    impl TestHistoricalBarsDataClient {
1248        fn new(
1249            client_id: ClientId,
1250            venue: Venue,
1251            request_count: Arc<AtomicUsize>,
1252            response_sent_count: Arc<AtomicUsize>,
1253            handler_visible_count: Arc<AtomicUsize>,
1254        ) -> Self {
1255            Self {
1256                client_id,
1257                venue,
1258                connected: Arc::new(AtomicBool::new(false)),
1259                request_count,
1260                response_sent_count,
1261                handler_visible_count,
1262            }
1263        }
1264
1265        fn make_bar(bar_type: BarType) -> Bar {
1266            Bar::new(
1267                bar_type,
1268                Price::from("1.0000"),
1269                Price::from("1.1000"),
1270                Price::from("0.9000"),
1271                Price::from("1.0500"),
1272                Quantity::from("1000"),
1273                UnixNanos::from(1_700_000_000_000_000_000u64),
1274                UnixNanos::from(1_700_000_000_000_000_001u64),
1275            )
1276        }
1277    }
1278
1279    #[async_trait(?Send)]
1280    impl DataClient for TestHistoricalBarsDataClient {
1281        fn client_id(&self) -> ClientId {
1282            self.client_id
1283        }
1284
1285        fn venue(&self) -> Option<Venue> {
1286            Some(self.venue)
1287        }
1288
1289        fn start(&mut self) -> anyhow::Result<()> {
1290            Ok(())
1291        }
1292
1293        fn stop(&mut self) -> anyhow::Result<()> {
1294            Ok(())
1295        }
1296
1297        fn reset(&mut self) -> anyhow::Result<()> {
1298            Ok(())
1299        }
1300
1301        fn dispose(&mut self) -> anyhow::Result<()> {
1302            Ok(())
1303        }
1304
1305        fn is_connected(&self) -> bool {
1306            self.connected.load(Ordering::Relaxed)
1307        }
1308
1309        fn is_disconnected(&self) -> bool {
1310            !self.is_connected()
1311        }
1312
1313        async fn connect(&mut self) -> anyhow::Result<()> {
1314            self.connected.store(true, Ordering::Relaxed);
1315            Ok(())
1316        }
1317
1318        async fn disconnect(&mut self) -> anyhow::Result<()> {
1319            self.connected.store(false, Ordering::Relaxed);
1320            Ok(())
1321        }
1322
1323        fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1324            self.request_count.fetch_add(1, Ordering::Relaxed);
1325
1326            if get_message_bus()
1327                .borrow()
1328                .get_response_handler(&request.request_id)
1329                .is_some()
1330            {
1331                self.handler_visible_count.fetch_add(1, Ordering::Relaxed);
1332            }
1333
1334            let sender = get_data_event_sender();
1335            let client_id = self.client_id;
1336            let response_sent_count = self.response_sent_count.clone();
1337            let response = BarsResponse::new(
1338                request.request_id,
1339                client_id,
1340                request.bar_type,
1341                vec![Self::make_bar(request.bar_type)],
1342                None,
1343                None,
1344                UnixNanos::from(1_700_000_000_000_000_002u64),
1345                None,
1346            );
1347
1348            tokio::spawn(async move {
1349                tokio::time::sleep(Duration::from_millis(10)).await;
1350                response_sent_count.fetch_add(1, Ordering::Relaxed);
1351                sender
1352                    .send(DataEvent::Response(DataResponse::Bars(response)))
1353                    .expect("test bars response should send");
1354            });
1355
1356            Ok(())
1357        }
1358    }
1359
1360    fn install_tracking_strategy_module(py: Python<'_>, module_name: &str) {
1361        let module = PyModule::new(py, module_name).expect("test module should create");
1362        module
1363            .setattr("Strategy", py.get_type::<PyStrategy>())
1364            .expect("Strategy type should bind");
1365        module
1366            .setattr("BarType", py.get_type::<BarType>())
1367            .expect("BarType type should bind");
1368        module
1369            .setattr("RESULTS", PyDict::new(py))
1370            .expect("RESULTS should bind");
1371
1372        let code = CString::new(
1373            r#"
1374RESULTS["on_start"] = 0
1375RESULTS["on_historical_bars"] = 0
1376RESULTS["historical_bar_count"] = 0
1377RESULTS["last_request_id"] = ""
1378
1379class HistoricalBarsStrategy(Strategy):
1380    def __init__(self):
1381        super().__init__()
1382        self.bar_type = BarType.from_str("AUDUSD.SIM-1-MINUTE-LAST-EXTERNAL")
1383
1384    def on_start(self):
1385        RESULTS["on_start"] += 1
1386        RESULTS["last_request_id"] = self.request_bars(self.bar_type)
1387
1388    def on_stop(self):
1389        pass
1390
1391    def on_historical_bars(self, bars):
1392        RESULTS["on_historical_bars"] += 1
1393        RESULTS["historical_bar_count"] += len(bars)
1394"#,
1395        )
1396        .expect("python test code should be valid CString");
1397
1398        py.run(code.as_c_str(), Some(&module.dict()), None)
1399            .expect("test strategy code should execute");
1400
1401        let sys_modules = py
1402            .import("sys")
1403            .expect("sys should import")
1404            .getattr("modules")
1405            .expect("sys.modules should exist");
1406        sys_modules
1407            .set_item(module_name, module)
1408            .expect("test strategy module should register");
1409    }
1410
1411    fn get_results(py: Python<'_>, module_name: &str) -> (usize, usize, usize) {
1412        let module = py
1413            .import(module_name)
1414            .expect("test strategy module should import");
1415        let results_obj = module.getattr("RESULTS").expect("RESULTS should exist");
1416        let results = results_obj
1417            .cast::<PyDict>()
1418            .expect("RESULTS should be a dict");
1419
1420        let on_start = results
1421            .get_item("on_start")
1422            .expect("on_start key should exist")
1423            .extract::<usize>()
1424            .expect("on_start should extract");
1425        let on_historical_bars = results
1426            .get_item("on_historical_bars")
1427            .expect("on_historical_bars key should exist")
1428            .extract::<usize>()
1429            .expect("on_historical_bars should extract");
1430        let historical_bar_count = results
1431            .get_item("historical_bar_count")
1432            .expect("historical_bar_count key should exist")
1433            .extract::<usize>()
1434            .expect("historical_bar_count should extract");
1435
1436        (on_start, on_historical_bars, historical_bar_count)
1437    }
1438
1439    #[tokio::test(flavor = "current_thread")]
1440    async fn test_live_node_pystrategy_request_bars_dispatches_on_historical_bars() {
1441        Python::initialize();
1442
1443        let module_name = "test_live_node_historical_bars_strategy";
1444        Python::attach(|py| install_tracking_strategy_module(py, module_name));
1445
1446        let request_count = Arc::new(AtomicUsize::new(0));
1447        let response_sent_count = Arc::new(AtomicUsize::new(0));
1448        let handler_visible_count = Arc::new(AtomicUsize::new(0));
1449        let factory = TestHistoricalBarsDataClientFactory::new(
1450            request_count.clone(),
1451            response_sent_count.clone(),
1452            handler_visible_count.clone(),
1453        );
1454        let config = TestDataClientConfig;
1455
1456        let mut node = LiveNode::builder(TraderId::from("TESTER-001"), Environment::Sandbox)
1457            .unwrap()
1458            .with_reconciliation(false)
1459            .with_delay_post_stop_secs(0)
1460            .with_timeout_connection(1)
1461            .add_data_client(
1462                Some("TEST_DATA".to_string()),
1463                Box::new(factory),
1464                Box::new(config),
1465            )
1466            .unwrap()
1467            .build()
1468            .unwrap();
1469
1470        let importable = ImportableStrategyConfig {
1471            strategy_path: format!("{module_name}:HistoricalBarsStrategy"),
1472            config_path: String::new(),
1473            config: HashMap::new(),
1474        };
1475
1476        Python::attach(|py| {
1477            node.py_add_strategy_from_config(py, importable)
1478                .expect("strategy should register");
1479        });
1480
1481        let handle = node.handle();
1482        let stop_handle = handle.clone();
1483        let response_sent_count_for_stop = response_sent_count.clone();
1484
1485        tokio::spawn(async move {
1486            let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
1487
1488            loop {
1489                if response_sent_count_for_stop.load(Ordering::Relaxed) == 1
1490                    || tokio::time::Instant::now() >= deadline
1491                {
1492                    break;
1493                }
1494                tokio::time::sleep(Duration::from_millis(20)).await;
1495            }
1496            tokio::time::sleep(Duration::from_millis(250)).await;
1497            stop_handle.stop();
1498        });
1499
1500        node.run().await.expect("node should run cleanly");
1501
1502        let (on_start, on_historical_bars, historical_bar_count) =
1503            Python::attach(|py| get_results(py, module_name));
1504
1505        assert_eq!(request_count.load(Ordering::Relaxed), 1);
1506        assert_eq!(handler_visible_count.load(Ordering::Relaxed), 1);
1507        assert_eq!(response_sent_count.load(Ordering::Relaxed), 1);
1508        assert_eq!(on_start, 1);
1509        assert_eq!(on_historical_bars, 1);
1510        assert_eq!(historical_bar_count, 1);
1511    }
1512}