1use std::{cell::RefCell, collections::HashMap, rc::Rc};
19
20use nautilus_common::{
21 actor::data_actor::ImportableActorConfig, cache::CacheConfig, enums::Environment,
22 live::get_runtime, logging::logger::LoggerConfig, python::actor::PyDataActor,
23};
24#[cfg(feature = "examples")]
25use nautilus_core::python::to_pytype_err;
26use nautilus_core::{
27 UUID4,
28 python::{to_pyruntime_err, to_pyvalue_err},
29};
30use nautilus_model::identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId};
31use nautilus_portfolio::config::PortfolioConfig;
32use nautilus_system::get_global_pyo3_registry;
33use nautilus_trading::{
34 ImportableExecAlgorithmConfig, ImportableStrategyConfig,
35 python::strategy::{PyStrategy, PyStrategyInner},
36};
37use pyo3::{
38 prelude::*,
39 types::{PyDict, PyTuple},
40};
41use serde_json;
42
43use crate::{
44 builder::LiveNodeBuilder,
45 config::{LiveDataEngineConfig, LiveExecEngineConfig, LiveNodeConfig, LiveRiskEngineConfig},
46 node::LiveNode,
47};
48
49#[pyo3_stub_gen::derive::gen_stub_pymethods]
50#[pymethods]
51impl LiveNode {
52 #[staticmethod]
53 #[pyo3(name = "build")]
54 #[pyo3(signature = (name, config=None))]
55 fn py_build(name: String, config: Option<LiveNodeConfig>) -> PyResult<Self> {
56 Self::build(name, config).map_err(to_pyruntime_err)
57 }
58
59 #[staticmethod]
60 #[pyo3(name = "builder")]
61 fn py_builder(
62 name: String,
63 trader_id: TraderId,
64 environment: Environment,
65 ) -> PyResult<LiveNodeBuilderPy> {
66 match Self::builder(trader_id, environment) {
67 Ok(builder) => Ok(LiveNodeBuilderPy {
68 inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
69 }),
70 Err(e) => Err(to_pyruntime_err(e)),
71 }
72 }
73
74 #[getter]
75 #[pyo3(name = "environment")]
76 fn py_environment(&self) -> Environment {
77 self.environment()
78 }
79
80 #[getter]
81 #[pyo3(name = "trader_id")]
82 fn py_trader_id(&self) -> TraderId {
83 self.trader_id()
84 }
85
86 #[getter]
87 #[pyo3(name = "instance_id")]
88 const fn py_instance_id(&self) -> UUID4 {
89 self.instance_id()
90 }
91
92 #[getter]
93 #[pyo3(name = "is_running")]
94 fn py_is_running(&self) -> bool {
95 self.is_running()
96 }
97
98 #[pyo3(name = "start")]
99 fn py_start(&mut self) -> PyResult<()> {
100 if self.is_running() {
101 return Err(to_pyruntime_err("LiveNode is already running"));
102 }
103
104 get_runtime().block_on(async { self.start().await.map_err(to_pyruntime_err) })
106 }
107
108 #[pyo3(name = "run")]
109 fn py_run(&mut self, py: Python) -> PyResult<()> {
110 if self.is_running() {
111 return Err(to_pyruntime_err("LiveNode is already running"));
112 }
113
114 let handle = self.handle();
116
117 let signal_module = py.import("signal")?;
119 let original_handler =
120 signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?; let handle_for_signal = handle;
124 let signal_callback = pyo3::types::PyCFunction::new_closure(
125 py,
126 None,
127 None,
128 move |_args: &pyo3::Bound<'_, PyTuple>,
129 _kwargs: Option<&pyo3::Bound<'_, PyDict>>|
130 -> PyResult<()> {
131 log::info!("Python signal handler called");
132 handle_for_signal.stop();
133 Ok(())
134 },
135 )?;
136
137 signal_module.call_method1("signal", (2, signal_callback))?;
139
140 let result =
142 { get_runtime().block_on(async { self.run().await.map_err(to_pyruntime_err) }) };
143
144 signal_module.call_method1("signal", (2, original_handler))?;
146
147 result
148 }
149
150 #[pyo3(name = "stop")]
151 fn py_stop(&self) -> PyResult<()> {
152 if !self.is_running() {
153 return Err(to_pyruntime_err("LiveNode is not running"));
154 }
155
156 self.handle().stop();
158 Ok(())
159 }
160
161 #[allow(
162 unsafe_code,
163 reason = "Required for Python actor component registration"
164 )]
165 #[pyo3(name = "add_actor_from_config")]
166 #[expect(clippy::needless_pass_by_value)]
167 fn py_add_actor_from_config(
168 &mut self,
169 _py: Python,
170 config: ImportableActorConfig,
171 ) -> PyResult<()> {
172 log::debug!("`add_actor_from_config` with: {config:?}");
173
174 let parts: Vec<&str> = config.actor_path.split(':').collect();
176 if parts.len() != 2 {
177 return Err(to_pyvalue_err(
178 "actor_path must be in format 'module.path:ClassName'",
179 ));
180 }
181 let (module_name, class_name) = (parts[0], parts[1]);
182
183 log::info!("Importing actor from module: {module_name} class: {class_name}");
184
185 let (python_actor, actor_id) =
187 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
188 let actor_module = py
189 .import(module_name)
190 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
191 let actor_class = actor_module
192 .getattr(class_name)
193 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
194
195 let config_instance =
196 create_config_instance(py, &config.config_path, &config.config)?;
197
198 let python_actor = if let Some(config_obj) = config_instance.clone() {
199 actor_class.call1((config_obj,))?
200 } else {
201 actor_class.call0()?
202 };
203
204 log::debug!("Created Python actor instance: {python_actor:?}");
205
206 let mut py_data_actor_ref = python_actor
207 .extract::<PyRefMut<PyDataActor>>()
208 .map_err(Into::<PyErr>::into)
209 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
210
211 if let Some(config_obj) = config_instance.as_ref() {
213 if let Ok(actor_id) = config_obj.getattr("actor_id")
214 && !actor_id.is_none()
215 {
216 let actor_id_val = if let Ok(aid) = actor_id.extract::<ActorId>() {
217 aid
218 } else if let Ok(aid_str) = actor_id.extract::<String>() {
219 ActorId::new_checked(&aid_str)?
220 } else {
221 anyhow::bail!("Invalid `actor_id` type");
222 };
223 py_data_actor_ref.set_actor_id(actor_id_val);
224 }
225
226 if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
227 py_data_actor_ref.set_log_events(val);
228 }
229
230 if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
231 py_data_actor_ref.set_log_commands(val);
232 }
233 }
234
235 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
236
237 let actor_id = py_data_actor_ref.actor_id();
238
239 Ok((python_actor.unbind(), actor_id))
240 })
241 .map_err(to_pyruntime_err)?;
242
243 if self
245 .kernel()
246 .trader
247 .borrow()
248 .actor_ids()
249 .contains(&actor_id)
250 {
251 return Err(to_pyruntime_err(format!(
252 "Actor '{actor_id}' is already registered"
253 )));
254 }
255
256 let trader_id = self.kernel().trader_id();
260 let cache = self.kernel().cache();
261 let component_id = ComponentId::new(actor_id.inner().as_str());
262 let clock = self
263 .kernel_mut()
264 .trader
265 .borrow_mut()
266 .create_component_clock(component_id);
267
268 Python::attach(|py| -> anyhow::Result<()> {
270 let py_actor = python_actor.bind(py);
271 let mut py_data_actor_ref = py_actor
272 .extract::<PyRefMut<PyDataActor>>()
273 .map_err(Into::<PyErr>::into)
274 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
275
276 py_data_actor_ref
277 .register(trader_id, clock, cache)
278 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
279
280 log::debug!(
281 "Internal PyDataActor registered: {}, state: {:?}",
282 py_data_actor_ref.is_registered(),
283 py_data_actor_ref.state()
284 );
285
286 Ok(())
287 })
288 .map_err(to_pyruntime_err)?;
289
290 Python::attach(|py| -> anyhow::Result<()> {
292 let py_actor = python_actor.bind(py);
293 let py_data_actor_ref = py_actor
294 .cast::<PyDataActor>()
295 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
296 py_data_actor_ref.borrow().register_in_global_registries();
297 Ok(())
298 })
299 .map_err(to_pyruntime_err)?;
300
301 self.kernel_mut()
302 .trader
303 .borrow_mut()
304 .add_actor_id_for_lifecycle(actor_id)
305 .map_err(to_pyruntime_err)?;
306
307 log::info!("Registered Python actor {actor_id}");
308 Ok(())
309 }
310
311 #[allow(
312 unsafe_code,
313 reason = "Required for Python strategy component registration"
314 )]
315 #[pyo3(name = "add_strategy_from_config")]
316 #[expect(clippy::needless_pass_by_value)]
317 fn py_add_strategy_from_config(
318 &mut self,
319 _py: Python,
320 config: ImportableStrategyConfig,
321 ) -> PyResult<()> {
322 log::debug!("`add_strategy_from_config` with: {config:?}");
323
324 let parts: Vec<&str> = config.strategy_path.split(':').collect();
326 if parts.len() != 2 {
327 return Err(to_pyvalue_err(
328 "strategy_path must be in format 'module.path:ClassName'",
329 ));
330 }
331 let (module_name, class_name) = (parts[0], parts[1]);
332
333 log::info!("Importing strategy from module: {module_name} class: {class_name}");
334
335 let (python_strategy, strategy_id) =
337 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
338 let strategy_module = py
339 .import(module_name)
340 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
341 let strategy_class = strategy_module
342 .getattr(class_name)
343 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
344
345 let config_instance =
346 create_config_instance(py, &config.config_path, &config.config)?;
347
348 let python_strategy = if let Some(config_obj) = config_instance.clone() {
349 strategy_class.call1((config_obj,))?
350 } else {
351 strategy_class.call0()?
352 };
353
354 log::debug!("Created Python strategy instance: {python_strategy:?}");
355
356 let mut py_strategy_ref = python_strategy
357 .extract::<PyRefMut<PyStrategy>>()
358 .map_err(Into::<PyErr>::into)
359 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
360
361 if let Some(config_obj) = config_instance.as_ref() {
363 if let Ok(strategy_id) = config_obj.getattr("strategy_id")
364 && !strategy_id.is_none()
365 {
366 let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
367 sid
368 } else if let Ok(sid_str) = strategy_id.extract::<String>() {
369 StrategyId::new_checked(&sid_str)?
370 } else {
371 anyhow::bail!("Invalid `strategy_id` type");
372 };
373 py_strategy_ref.set_strategy_id(strategy_id_val);
374 }
375
376 if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
377 py_strategy_ref.set_log_events(val);
378 }
379
380 if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
381 py_strategy_ref.set_log_commands(val);
382 }
383 }
384
385 py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
386
387 let strategy_id = py_strategy_ref.strategy_id();
388
389 Ok((python_strategy.unbind(), strategy_id))
390 })
391 .map_err(to_pyruntime_err)?;
392
393 if self
395 .kernel()
396 .trader
397 .borrow()
398 .strategy_ids()
399 .contains(&strategy_id)
400 {
401 return Err(to_pyruntime_err(format!(
402 "Strategy '{strategy_id}' is already registered"
403 )));
404 }
405
406 let trader_id = self.kernel().trader_id();
410 let cache = self.kernel().cache();
411 let portfolio = self.kernel().portfolio.clone();
412 let component_id = ComponentId::new(strategy_id.inner().as_str());
413 let clock = self
414 .kernel_mut()
415 .trader
416 .borrow_mut()
417 .create_component_clock(component_id);
418
419 Python::attach(|py| -> anyhow::Result<()> {
421 let py_strategy = python_strategy.bind(py);
422 let mut py_strategy_ref = py_strategy
423 .extract::<PyRefMut<PyStrategy>>()
424 .map_err(Into::<PyErr>::into)
425 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
426
427 py_strategy_ref
428 .register(trader_id, clock, cache, portfolio)
429 .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
430
431 log::debug!(
432 "Internal PyStrategy registered: {}",
433 py_strategy_ref.is_registered()
434 );
435
436 Ok(())
437 })
438 .map_err(to_pyruntime_err)?;
439
440 Python::attach(|py| -> anyhow::Result<()> {
442 let py_strategy = python_strategy.bind(py);
443 let py_strategy_ref = py_strategy
444 .cast::<PyStrategy>()
445 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
446 py_strategy_ref.borrow().register_in_global_registries();
447 Ok(())
448 })
449 .map_err(to_pyruntime_err)?;
450
451 Python::attach(|py| {
456 let py_strategy = python_strategy.bind(py);
457 if let Ok(claims) = py_strategy.getattr("external_order_claims")
458 && !claims.is_none()
459 && claims.len().unwrap_or(0) > 0
460 {
461 log::warn!(
462 "Strategy '{strategy_id}' has external_order_claims configured, \
463 but these are not yet supported for Python strategies on the Rust backend"
464 );
465 }
466 });
467
468 self.kernel_mut()
469 .trader
470 .borrow_mut()
471 .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
472 .map_err(to_pyruntime_err)?;
473
474 log::info!("Registered Python strategy {strategy_id}");
475 Ok(())
476 }
477
478 #[cfg(feature = "examples")]
483 #[pyo3(name = "add_native_strategy")]
484 fn py_add_native_strategy(&mut self, config: &Bound<'_, PyAny>) -> PyResult<()> {
485 use nautilus_trading::examples::strategies::{
486 DeltaNeutralVol, DeltaNeutralVolConfig, EmaCross, EmaCrossConfig, GridMarketMaker,
487 GridMarketMakerConfig, HurstVpinDirectional, HurstVpinDirectionalConfig,
488 };
489
490 if let Ok(config) = config.extract::<EmaCrossConfig>() {
491 self.add_strategy(EmaCross::from_config(config))
492 .map_err(to_pyruntime_err)
493 } else if let Ok(config) = config.extract::<GridMarketMakerConfig>() {
494 self.add_strategy(GridMarketMaker::new(config))
495 .map_err(to_pyruntime_err)
496 } else if let Ok(config) = config.extract::<DeltaNeutralVolConfig>() {
497 self.add_strategy(DeltaNeutralVol::new(config))
498 .map_err(to_pyruntime_err)
499 } else if let Ok(config) = config.extract::<HurstVpinDirectionalConfig>() {
500 self.add_strategy(HurstVpinDirectional::new(config))
501 .map_err(to_pyruntime_err)
502 } else {
503 let type_name = config.get_type().name()?;
504 Err(to_pytype_err(format!(
505 "Unsupported native strategy config type: {type_name}",
506 )))
507 }
508 }
509
510 #[cfg(feature = "examples")]
515 #[pyo3(name = "add_native_actor")]
516 fn py_add_native_actor(&mut self, config: &Bound<'_, PyAny>) -> PyResult<()> {
517 use nautilus_trading::examples::actors::{BookImbalanceActor, BookImbalanceActorConfig};
518
519 if let Ok(config) = config.extract::<BookImbalanceActorConfig>() {
520 self.add_actor(BookImbalanceActor::from_config(config))
521 .map_err(to_pyruntime_err)
522 } else {
523 let type_name = config.get_type().name()?;
524 Err(to_pytype_err(format!(
525 "Unsupported native actor config type: {type_name}",
526 )))
527 }
528 }
529
530 #[allow(
531 unsafe_code,
532 reason = "Required for Python exec algorithm component registration"
533 )]
534 #[pyo3(name = "add_exec_algorithm_from_config")]
535 #[expect(clippy::needless_pass_by_value)]
536 fn py_add_exec_algorithm_from_config(
537 &mut self,
538 _py: Python,
539 config: ImportableExecAlgorithmConfig,
540 ) -> PyResult<()> {
541 if self.is_running() {
542 return Err(to_pyruntime_err(
543 "Cannot add exec algorithm while node is running",
544 ));
545 }
546
547 log::debug!("`add_exec_algorithm_from_config` with: {config:?}");
548
549 let parts: Vec<&str> = config.exec_algorithm_path.split(':').collect();
550 if parts.len() != 2 {
551 return Err(to_pyvalue_err(
552 "exec_algorithm_path must be in format 'module.path:ClassName'",
553 ));
554 }
555 let (module_name, class_name) = (parts[0], parts[1]);
556
557 log::info!("Importing exec algorithm from module: {module_name} class: {class_name}");
558
559 let (python_exec_algorithm, actor_id) =
561 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
562 let algo_module = py
563 .import(module_name)
564 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
565 let algo_class = algo_module
566 .getattr(class_name)
567 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
568
569 let config_instance =
570 create_config_instance(py, &config.config_path, &config.config)?;
571
572 let python_exec_algorithm = if let Some(config_obj) = config_instance.clone() {
573 algo_class.call1((config_obj,))?
574 } else {
575 algo_class.call0()?
576 };
577
578 log::debug!("Created Python exec algorithm instance: {python_exec_algorithm:?}");
579
580 let mut py_data_actor_ref = python_exec_algorithm
581 .extract::<PyRefMut<PyDataActor>>()
582 .map_err(Into::<PyErr>::into)
583 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
584
585 if let Some(config_obj) = config_instance.as_ref() {
587 let id_attr = config_obj
588 .getattr("exec_algorithm_id")
589 .ok()
590 .filter(|v| !v.is_none())
591 .or_else(|| config_obj.getattr("actor_id").ok().filter(|v| !v.is_none()));
592
593 if let Some(id_value) = id_attr {
594 let actor_id_val = if let Ok(eaid) = id_value.extract::<ExecAlgorithmId>() {
595 ActorId::new(eaid.inner().as_str())
596 } else if let Ok(aid) = id_value.extract::<ActorId>() {
597 aid
598 } else if let Ok(aid_str) = id_value.extract::<String>() {
599 ActorId::new_checked(&aid_str)?
600 } else {
601 anyhow::bail!("Invalid `exec_algorithm_id`/`actor_id` type");
602 };
603 py_data_actor_ref.set_actor_id(actor_id_val);
604 }
605
606 if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
607 py_data_actor_ref.set_log_events(val);
608 }
609
610 if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
611 py_data_actor_ref.set_log_commands(val);
612 }
613 }
614
615 py_data_actor_ref.set_python_instance(python_exec_algorithm.clone().unbind());
616
617 let actor_id = py_data_actor_ref.actor_id();
618
619 Ok((python_exec_algorithm.unbind(), actor_id))
620 })
621 .map_err(to_pyruntime_err)?;
622
623 let exec_algorithm_id = ExecAlgorithmId::from(actor_id.inner().as_str());
624
625 if self
626 .kernel()
627 .trader
628 .borrow()
629 .exec_algorithm_ids()
630 .contains(&exec_algorithm_id)
631 {
632 return Err(to_pyruntime_err(format!(
633 "Execution algorithm '{exec_algorithm_id}' is already registered"
634 )));
635 }
636
637 let trader_id = self.kernel().trader_id();
641 let cache = self.kernel().cache();
642 let component_id = ComponentId::new(actor_id.inner().as_str());
643 let clock = self
644 .kernel_mut()
645 .trader
646 .borrow_mut()
647 .create_component_clock(component_id);
648
649 Python::attach(|py| -> anyhow::Result<()> {
651 let py_algo = python_exec_algorithm.bind(py);
652 let mut py_data_actor_ref = py_algo
653 .extract::<PyRefMut<PyDataActor>>()
654 .map_err(Into::<PyErr>::into)
655 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
656
657 py_data_actor_ref
658 .register(trader_id, clock, cache)
659 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
660
661 log::debug!(
662 "Internal PyDataActor registered: {}, state: {:?}",
663 py_data_actor_ref.is_registered(),
664 py_data_actor_ref.state()
665 );
666
667 Ok(())
668 })
669 .map_err(to_pyruntime_err)?;
670
671 Python::attach(|py| -> anyhow::Result<()> {
673 let py_algo = python_exec_algorithm.bind(py);
674 let py_data_actor_ref = py_algo
675 .cast::<PyDataActor>()
676 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
677 py_data_actor_ref.borrow().register_in_global_registries();
678 Ok(())
679 })
680 .map_err(to_pyruntime_err)?;
681
682 self.kernel_mut()
683 .trader
684 .borrow_mut()
685 .add_exec_algorithm_id_for_lifecycle(exec_algorithm_id)
686 .map_err(to_pyruntime_err)?;
687
688 log::info!("Registered Python exec algorithm {exec_algorithm_id}");
689 Ok(())
690 }
691
692 fn __repr__(&self) -> String {
693 format!(
694 "LiveNode(trader_id={}, environment={:?}, running={})",
695 self.trader_id(),
696 self.environment(),
697 self.is_running()
698 )
699 }
700}
701
702#[derive(Debug)]
705#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
706#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")]
707pub struct LiveNodeBuilderPy {
708 inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
709}
710
711#[pyo3_stub_gen::derive::gen_stub_pymethods]
712#[pymethods]
713impl LiveNodeBuilderPy {
714 #[pyo3(name = "with_instance_id")]
715 fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
716 let mut inner_ref = self.inner.borrow_mut();
717 if let Some(builder) = inner_ref.take() {
718 *inner_ref = Some(builder.with_instance_id(instance_id));
719 Ok(Self {
720 inner: self.inner.clone(),
721 })
722 } else {
723 Err(to_pyruntime_err("Builder already consumed"))
724 }
725 }
726
727 #[pyo3(name = "with_load_state")]
728 fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
729 let mut inner_ref = self.inner.borrow_mut();
730 if let Some(builder) = inner_ref.take() {
731 *inner_ref = Some(builder.with_load_state(load_state));
732 Ok(Self {
733 inner: self.inner.clone(),
734 })
735 } else {
736 Err(to_pyruntime_err("Builder already consumed"))
737 }
738 }
739
740 #[pyo3(name = "with_save_state")]
741 fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
742 let mut inner_ref = self.inner.borrow_mut();
743 if let Some(builder) = inner_ref.take() {
744 *inner_ref = Some(builder.with_save_state(save_state));
745 Ok(Self {
746 inner: self.inner.clone(),
747 })
748 } else {
749 Err(to_pyruntime_err("Builder already consumed"))
750 }
751 }
752
753 #[pyo3(name = "with_timeout_connection")]
754 fn py_with_timeout_connection(&self, timeout_secs: u64) -> PyResult<Self> {
755 let mut inner_ref = self.inner.borrow_mut();
756 if let Some(builder) = inner_ref.take() {
757 *inner_ref = Some(builder.with_timeout_connection(timeout_secs));
758 Ok(Self {
759 inner: self.inner.clone(),
760 })
761 } else {
762 Err(to_pyruntime_err("Builder already consumed"))
763 }
764 }
765
766 #[pyo3(name = "with_timeout_reconciliation")]
767 fn py_with_timeout_reconciliation(&self, timeout_secs: u64) -> PyResult<Self> {
768 let mut inner_ref = self.inner.borrow_mut();
769 if let Some(builder) = inner_ref.take() {
770 *inner_ref = Some(builder.with_timeout_reconciliation(timeout_secs));
771 Ok(Self {
772 inner: self.inner.clone(),
773 })
774 } else {
775 Err(to_pyruntime_err("Builder already consumed"))
776 }
777 }
778
779 #[pyo3(name = "with_timeout_portfolio")]
780 fn py_with_timeout_portfolio(&self, timeout_secs: u64) -> PyResult<Self> {
781 let mut inner_ref = self.inner.borrow_mut();
782 if let Some(builder) = inner_ref.take() {
783 *inner_ref = Some(builder.with_timeout_portfolio(timeout_secs));
784 Ok(Self {
785 inner: self.inner.clone(),
786 })
787 } else {
788 Err(to_pyruntime_err("Builder already consumed"))
789 }
790 }
791
792 #[pyo3(name = "with_timeout_disconnection_secs")]
793 fn py_with_timeout_disconnection_secs(&self, timeout_secs: u64) -> PyResult<Self> {
794 let mut inner_ref = self.inner.borrow_mut();
795 if let Some(builder) = inner_ref.take() {
796 *inner_ref = Some(builder.with_timeout_disconnection_secs(timeout_secs));
797 Ok(Self {
798 inner: self.inner.clone(),
799 })
800 } else {
801 Err(to_pyruntime_err("Builder already consumed"))
802 }
803 }
804
805 #[pyo3(name = "with_delay_post_stop_secs")]
806 fn py_with_delay_post_stop_secs(&self, delay_secs: u64) -> PyResult<Self> {
807 let mut inner_ref = self.inner.borrow_mut();
808 if let Some(builder) = inner_ref.take() {
809 *inner_ref = Some(builder.with_delay_post_stop_secs(delay_secs));
810 Ok(Self {
811 inner: self.inner.clone(),
812 })
813 } else {
814 Err(to_pyruntime_err("Builder already consumed"))
815 }
816 }
817
818 #[pyo3(name = "with_delay_shutdown_secs")]
819 fn py_with_delay_shutdown_secs(&self, delay_secs: u64) -> PyResult<Self> {
820 let mut inner_ref = self.inner.borrow_mut();
821 if let Some(builder) = inner_ref.take() {
822 *inner_ref = Some(builder.with_delay_shutdown_secs(delay_secs));
823 Ok(Self {
824 inner: self.inner.clone(),
825 })
826 } else {
827 Err(to_pyruntime_err("Builder already consumed"))
828 }
829 }
830
831 #[pyo3(name = "with_reconciliation")]
832 fn py_with_reconciliation(&self, reconciliation: bool) -> PyResult<Self> {
833 let mut inner_ref = self.inner.borrow_mut();
834 if let Some(builder) = inner_ref.take() {
835 *inner_ref = Some(builder.with_reconciliation(reconciliation));
836 Ok(Self {
837 inner: self.inner.clone(),
838 })
839 } else {
840 Err(to_pyruntime_err("Builder already consumed"))
841 }
842 }
843
844 #[pyo3(name = "with_reconciliation_lookback_mins")]
845 fn py_with_reconciliation_lookback_mins(&self, mins: u32) -> PyResult<Self> {
846 let mut inner_ref = self.inner.borrow_mut();
847 if let Some(builder) = inner_ref.take() {
848 *inner_ref = Some(builder.with_reconciliation_lookback_mins(mins));
849 Ok(Self {
850 inner: self.inner.clone(),
851 })
852 } else {
853 Err(to_pyruntime_err("Builder already consumed"))
854 }
855 }
856
857 #[pyo3(name = "with_cache_config")]
858 fn py_with_cache_config(&self, config: CacheConfig) -> PyResult<Self> {
859 let mut inner_ref = self.inner.borrow_mut();
860 if let Some(builder) = inner_ref.take() {
861 *inner_ref = Some(builder.with_cache_config(config));
862 Ok(Self {
863 inner: self.inner.clone(),
864 })
865 } else {
866 Err(to_pyruntime_err("Builder already consumed"))
867 }
868 }
869
870 #[pyo3(name = "with_portfolio_config")]
871 fn py_with_portfolio_config(&self, config: PortfolioConfig) -> PyResult<Self> {
872 let mut inner_ref = self.inner.borrow_mut();
873 if let Some(builder) = inner_ref.take() {
874 *inner_ref = Some(builder.with_portfolio_config(config));
875 Ok(Self {
876 inner: self.inner.clone(),
877 })
878 } else {
879 Err(to_pyruntime_err("Builder already consumed"))
880 }
881 }
882
883 #[pyo3(name = "with_data_engine_config")]
884 fn py_with_data_engine_config(&self, config: LiveDataEngineConfig) -> PyResult<Self> {
885 let mut inner_ref = self.inner.borrow_mut();
886 if let Some(builder) = inner_ref.take() {
887 *inner_ref = Some(builder.with_data_engine_config(config));
888 Ok(Self {
889 inner: self.inner.clone(),
890 })
891 } else {
892 Err(to_pyruntime_err("Builder already consumed"))
893 }
894 }
895
896 #[pyo3(name = "with_risk_engine_config")]
897 fn py_with_risk_engine_config(&self, config: LiveRiskEngineConfig) -> PyResult<Self> {
898 let mut inner_ref = self.inner.borrow_mut();
899 if let Some(builder) = inner_ref.take() {
900 *inner_ref = Some(builder.with_risk_engine_config(config));
901 Ok(Self {
902 inner: self.inner.clone(),
903 })
904 } else {
905 Err(to_pyruntime_err("Builder already consumed"))
906 }
907 }
908
909 #[pyo3(name = "with_exec_engine_config")]
910 fn py_with_exec_engine_config(&self, config: LiveExecEngineConfig) -> PyResult<Self> {
911 let mut inner_ref = self.inner.borrow_mut();
912 if let Some(builder) = inner_ref.take() {
913 *inner_ref = Some(builder.with_exec_engine_config(config));
914 Ok(Self {
915 inner: self.inner.clone(),
916 })
917 } else {
918 Err(to_pyruntime_err("Builder already consumed"))
919 }
920 }
921
922 #[pyo3(name = "with_logging")]
923 fn py_with_logging(&self, logging: LoggerConfig) -> PyResult<Self> {
924 let mut inner_ref = self.inner.borrow_mut();
925 if let Some(builder) = inner_ref.take() {
926 *inner_ref = Some(builder.with_logging(logging));
927 Ok(Self {
928 inner: self.inner.clone(),
929 })
930 } else {
931 Err(to_pyruntime_err("Builder already consumed"))
932 }
933 }
934
935 #[pyo3(name = "add_data_client")]
936 #[expect(clippy::needless_pass_by_value)]
937 fn py_add_data_client(
938 &self,
939 name: Option<String>,
940 factory: Py<PyAny>,
941 config: Py<PyAny>,
942 ) -> PyResult<Self> {
943 let mut inner_ref = self.inner.borrow_mut();
944 if let Some(builder) = inner_ref.take() {
945 Python::attach(|py| -> PyResult<Self> {
946 let registry = get_global_pyo3_registry();
948
949 let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
950 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
951
952 let factory_name = factory
954 .getattr(py, "name")?
955 .call0(py)?
956 .extract::<String>(py)?;
957 let client_name = name.unwrap_or(factory_name);
958
959 match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
961 Ok(updated_builder) => {
962 *inner_ref = Some(updated_builder);
963 Ok(Self {
964 inner: self.inner.clone(),
965 })
966 }
967 Err(e) => Err(to_pyruntime_err(format!("Failed to add data client: {e}"))),
968 }
969 })
970 } else {
971 Err(to_pyruntime_err("Builder already consumed"))
972 }
973 }
974
975 #[pyo3(name = "add_exec_client")]
976 #[expect(clippy::needless_pass_by_value)]
977 fn py_add_exec_client(
978 &self,
979 name: Option<String>,
980 factory: Py<PyAny>,
981 config: Py<PyAny>,
982 ) -> PyResult<Self> {
983 let mut inner_ref = self.inner.borrow_mut();
984 if let Some(builder) = inner_ref.take() {
985 Python::attach(|py| -> PyResult<Self> {
986 let registry = get_global_pyo3_registry();
987
988 let boxed_factory = registry.extract_exec_factory(py, factory.clone_ref(py))?;
989 let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
990
991 let factory_name = factory
992 .getattr(py, "name")?
993 .call0(py)?
994 .extract::<String>(py)?;
995 let client_name = name.unwrap_or(factory_name);
996
997 match builder.add_exec_client(Some(client_name), boxed_factory, boxed_config) {
998 Ok(updated_builder) => {
999 *inner_ref = Some(updated_builder);
1000 Ok(Self {
1001 inner: self.inner.clone(),
1002 })
1003 }
1004 Err(e) => Err(to_pyruntime_err(format!("Failed to add exec client: {e}"))),
1005 }
1006 })
1007 } else {
1008 Err(to_pyruntime_err("Builder already consumed"))
1009 }
1010 }
1011
1012 #[pyo3(name = "build")]
1013 fn py_build(&self) -> PyResult<LiveNode> {
1014 let mut inner_ref = self.inner.borrow_mut();
1015 if let Some(builder) = inner_ref.take() {
1016 match builder.build() {
1017 Ok(node) => Ok(node),
1018 Err(e) => Err(to_pyruntime_err(e)),
1019 }
1020 } else {
1021 Err(to_pyruntime_err("Builder already consumed"))
1022 }
1023 }
1024
1025 fn __repr__(&self) -> String {
1026 format!("{self:?}")
1027 }
1028}
1029
1030fn create_config_instance<'py>(
1039 py: Python<'py>,
1040 config_path: &str,
1041 config: &HashMap<String, serde_json::Value>,
1042) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
1043 if config_path.is_empty() && config.is_empty() {
1044 log::debug!("No config_path or empty config, using None");
1045 return Ok(None);
1046 }
1047
1048 let config_parts: Vec<&str> = config_path.split(':').collect();
1049 if config_parts.len() != 2 {
1050 anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
1051 }
1052 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
1053
1054 log::debug!(
1055 "Importing config class from module: {config_module_name} class: {config_class_name}"
1056 );
1057
1058 let config_module = py
1059 .import(config_module_name)
1060 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
1061 let config_class = config_module
1062 .getattr(config_class_name)
1063 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
1064
1065 let py_dict = PyDict::new(py);
1067
1068 for (key, value) in config {
1069 let json_str = serde_json::to_string(value)
1070 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
1071 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
1072 py_dict.set_item(key, py_value)?;
1073 }
1074
1075 log::debug!("Created config dict: {py_dict:?}");
1076
1077 let config_instance = match config_class.call((), Some(&py_dict)) {
1079 Ok(instance) => {
1080 log::debug!("Created config instance with kwargs");
1081 instance
1082 }
1083 Err(kwargs_err) => {
1084 log::debug!("Failed to create config with kwargs: {kwargs_err}");
1085
1086 match config_class.call0() {
1087 Ok(instance) => {
1088 log::debug!("Created default config instance, setting attributes");
1089 for (key, value) in config {
1090 let json_str = serde_json::to_string(value).map_err(|e| {
1091 anyhow::anyhow!("Failed to serialize config value: {e}")
1092 })?;
1093 let py_value = PyModule::import(py, "json")?.call_method(
1094 "loads",
1095 (json_str,),
1096 None,
1097 )?;
1098
1099 if let Err(setattr_err) = instance.setattr(key, py_value) {
1100 log::warn!("Failed to set attribute {key}: {setattr_err}");
1101 }
1102 }
1103
1104 if instance.hasattr("__post_init__")? {
1107 instance.call_method0("__post_init__")?;
1108 }
1109
1110 instance
1111 }
1112 Err(default_err) => {
1113 anyhow::bail!(
1114 "Failed to create config instance. \
1115 Tried kwargs: {kwargs_err}, default: {default_err}"
1116 );
1117 }
1118 }
1119 }
1120 };
1121
1122 log::debug!("Created config instance: {config_instance:?}");
1123
1124 Ok(Some(config_instance))
1125}
1126
1127fn extract_bool_config_attr(config_obj: &Bound<'_, PyAny>, attr: &str) -> Option<bool> {
1132 config_obj
1133 .getattr(attr)
1134 .ok()
1135 .and_then(|val| val.extract::<bool>().ok())
1136}
1137
#[cfg(all(test, feature = "python"))]
mod tests {
    use std::{
        any::Any,
        cell::RefCell,
        collections::HashMap,
        ffi::CString,
        fmt::Debug,
        rc::Rc,
        sync::{
            Arc,
            atomic::{AtomicBool, AtomicUsize, Ordering},
        },
        time::Duration,
    };

    use async_trait::async_trait;
    use nautilus_common::{
        cache::Cache,
        clients::DataClient,
        clock::Clock,
        enums::Environment,
        factories::{ClientConfig, DataClientFactory},
        live::runner::get_data_event_sender,
        messages::{
            DataEvent, DataResponse,
            data::{BarsResponse, RequestBars},
        },
        msgbus::get_message_bus,
    };
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        data::{Bar, BarType},
        identifiers::{ClientId, TraderId, Venue},
        types::{Price, Quantity},
    };
    use nautilus_trading::{ImportableStrategyConfig, python::strategy::PyStrategy};
    use pyo3::{
        Python,
        types::{PyAnyMethods, PyDict, PyModule, PyModuleMethods},
    };

    use super::LiveNode;

    /// Field-less client configuration passed to the test data client factory.
    #[derive(Debug, Default)]
    struct TestDataClientConfig;

    impl ClientConfig for TestDataClientConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    /// Factory that builds [`TestHistoricalBarsDataClient`] instances sharing
    /// the counters owned by the test body.
    #[derive(Debug)]
    struct TestHistoricalBarsDataClientFactory {
        request_count: Arc<AtomicUsize>,
        response_sent_count: Arc<AtomicUsize>,
        handler_visible_count: Arc<AtomicUsize>,
    }

    impl TestHistoricalBarsDataClientFactory {
        /// Creates a factory holding the three shared counters.
        fn new(
            request_count: Arc<AtomicUsize>,
            response_sent_count: Arc<AtomicUsize>,
            handler_visible_count: Arc<AtomicUsize>,
        ) -> Self {
            Self {
                request_count,
                response_sent_count,
                handler_visible_count,
            }
        }
    }

    impl DataClientFactory for TestHistoricalBarsDataClientFactory {
        /// Builds a client named `name` for venue `SIM`; the config, cache and
        /// clock arguments are ignored.
        fn create(
            &self,
            name: &str,
            _config: &dyn ClientConfig,
            _cache: Rc<RefCell<Cache>>,
            _clock: Rc<RefCell<dyn Clock>>,
        ) -> anyhow::Result<Box<dyn DataClient>> {
            let client = TestHistoricalBarsDataClient::new(
                ClientId::from(name),
                Venue::from("SIM"),
                Arc::clone(&self.request_count),
                Arc::clone(&self.response_sent_count),
                Arc::clone(&self.handler_visible_count),
            );
            Ok(Box::new(client))
        }

        fn name(&self) -> &'static str {
            "TEST_DATA"
        }

        fn config_type(&self) -> &'static str {
            "TestDataClientConfig"
        }
    }

    /// Data client stub that answers every bars request with one canned bar
    /// and records what happened in shared counters.
    #[derive(Debug)]
    struct TestHistoricalBarsDataClient {
        client_id: ClientId,
        venue: Venue,
        connected: Arc<AtomicBool>,
        // Incremented once per `request_bars` call.
        request_count: Arc<AtomicUsize>,
        // Incremented by the spawned task just before the response is sent.
        response_sent_count: Arc<AtomicUsize>,
        // Incremented when a response handler for the request id is found on
        // the message bus at the time `request_bars` runs.
        handler_visible_count: Arc<AtomicUsize>,
    }

    impl TestHistoricalBarsDataClient {
        /// Creates a client that starts out disconnected.
        fn new(
            client_id: ClientId,
            venue: Venue,
            request_count: Arc<AtomicUsize>,
            response_sent_count: Arc<AtomicUsize>,
            handler_visible_count: Arc<AtomicUsize>,
        ) -> Self {
            Self {
                client_id,
                venue,
                connected: Arc::new(AtomicBool::new(false)),
                request_count,
                response_sent_count,
                handler_visible_count,
            }
        }

        /// Builds the single fixed bar returned for any request of `bar_type`.
        fn make_bar(bar_type: BarType) -> Bar {
            let open = Price::from("1.0000");
            let high = Price::from("1.1000");
            let low = Price::from("0.9000");
            let close = Price::from("1.0500");
            let volume = Quantity::from("1000");
            let ts_event = UnixNanos::from(1_700_000_000_000_000_000u64);
            let ts_init = UnixNanos::from(1_700_000_000_000_000_001u64);

            Bar::new(bar_type, open, high, low, close, volume, ts_event, ts_init)
        }
    }

    #[async_trait(?Send)]
    impl DataClient for TestHistoricalBarsDataClient {
        fn client_id(&self) -> ClientId {
            self.client_id
        }

        fn venue(&self) -> Option<Venue> {
            Some(self.venue)
        }

        fn start(&mut self) -> anyhow::Result<()> {
            Ok(())
        }

        fn stop(&mut self) -> anyhow::Result<()> {
            Ok(())
        }

        fn reset(&mut self) -> anyhow::Result<()> {
            Ok(())
        }

        fn dispose(&mut self) -> anyhow::Result<()> {
            Ok(())
        }

        fn is_connected(&self) -> bool {
            self.connected.load(Ordering::Relaxed)
        }

        fn is_disconnected(&self) -> bool {
            !self.connected.load(Ordering::Relaxed)
        }

        async fn connect(&mut self) -> anyhow::Result<()> {
            self.connected.store(true, Ordering::Relaxed);
            Ok(())
        }

        async fn disconnect(&mut self) -> anyhow::Result<()> {
            self.connected.store(false, Ordering::Relaxed);
            Ok(())
        }

        /// Records the request, notes whether its response handler is already
        /// registered on the message bus, and schedules a one-bar response to
        /// be sent 10 ms later through the data event sender.
        fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
            self.request_count.fetch_add(1, Ordering::Relaxed);

            let handler_registered = get_message_bus()
                .borrow()
                .get_response_handler(&request.request_id)
                .is_some();
            if handler_registered {
                self.handler_visible_count.fetch_add(1, Ordering::Relaxed);
            }

            let sender = get_data_event_sender();
            let sent_counter = Arc::clone(&self.response_sent_count);
            let bar_type = request.bar_type;
            let response = BarsResponse::new(
                request.request_id,
                self.client_id,
                bar_type,
                vec![Self::make_bar(bar_type)],
                None,
                None,
                UnixNanos::from(1_700_000_000_000_000_002u64),
                None,
            );

            // The response is delivered from a separate task after a short
            // delay rather than synchronously inside the request call.
            tokio::spawn(async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                sent_counter.fetch_add(1, Ordering::Relaxed);
                sender
                    .send(DataEvent::Response(DataResponse::Bars(response)))
                    .expect("test bars response should send");
            });

            Ok(())
        }
    }

    /// Creates a Python module named `module_name`, defines a
    /// `HistoricalBarsStrategy` in it that tallies callbacks into a
    /// module-level `RESULTS` dict, and registers it in `sys.modules`.
    fn install_tracking_strategy_module(py: Python<'_>, module_name: &str) {
        let module = PyModule::new(py, module_name).expect("test module should create");

        // Names the embedded script below refers to.
        module
            .setattr("Strategy", py.get_type::<PyStrategy>())
            .expect("Strategy type should bind");
        module
            .setattr("BarType", py.get_type::<BarType>())
            .expect("BarType type should bind");
        module
            .setattr("RESULTS", PyDict::new(py))
            .expect("RESULTS should bind");

        let script = CString::new(
            r#"
RESULTS["on_start"] = 0
RESULTS["on_historical_bars"] = 0
RESULTS["historical_bar_count"] = 0
RESULTS["last_request_id"] = ""

class HistoricalBarsStrategy(Strategy):
    def __init__(self):
        super().__init__()
        self.bar_type = BarType.from_str("AUDUSD.SIM-1-MINUTE-LAST-EXTERNAL")

    def on_start(self):
        RESULTS["on_start"] += 1
        RESULTS["last_request_id"] = self.request_bars(self.bar_type)

    def on_stop(self):
        pass

    def on_historical_bars(self, bars):
        RESULTS["on_historical_bars"] += 1
        RESULTS["historical_bar_count"] += len(bars)
"#,
        )
        .expect("python test code should be valid CString");

        // Execute the script with the module's own dict as its globals.
        py.run(script.as_c_str(), Some(&module.dict()), None)
            .expect("test strategy code should execute");

        // Register the module so it can be imported by name afterwards.
        py.import("sys")
            .expect("sys should import")
            .getattr("modules")
            .expect("sys.modules should exist")
            .set_item(module_name, module)
            .expect("test strategy module should register");
    }

    /// Reads the tallies recorded by the Python strategy in `module_name`.
    ///
    /// Returns `(on_start, on_historical_bars, historical_bar_count)`.
    fn get_results(py: Python<'_>, module_name: &str) -> (usize, usize, usize) {
        let results_obj = py
            .import(module_name)
            .expect("test strategy module should import")
            .getattr("RESULTS")
            .expect("RESULTS should exist");
        let results = results_obj
            .cast::<PyDict>()
            .expect("RESULTS should be a dict");

        // Looks up one counter; the panic text mirrors what `expect` emits.
        let read_counter = |key: &str| -> usize {
            results
                .get_item(key)
                .unwrap_or_else(|err| panic!("{key} key should exist: {err:?}"))
                .extract::<usize>()
                .unwrap_or_else(|err| panic!("{key} should extract: {err:?}"))
        };

        (
            read_counter("on_start"),
            read_counter("on_historical_bars"),
            read_counter("historical_bar_count"),
        )
    }

    /// Runs a live node with a Python strategy that requests bars on start and
    /// checks that exactly one request, one response and one
    /// `on_historical_bars` callback carrying one bar were observed.
    #[tokio::test(flavor = "current_thread")]
    async fn test_live_node_pystrategy_request_bars_dispatches_on_historical_bars() {
        Python::initialize();

        let module_name = "test_live_node_historical_bars_strategy";
        Python::attach(|py| install_tracking_strategy_module(py, module_name));

        let request_count = Arc::new(AtomicUsize::new(0));
        let response_sent_count = Arc::new(AtomicUsize::new(0));
        let handler_visible_count = Arc::new(AtomicUsize::new(0));
        let factory = TestHistoricalBarsDataClientFactory::new(
            Arc::clone(&request_count),
            Arc::clone(&response_sent_count),
            Arc::clone(&handler_visible_count),
        );

        let mut node = LiveNode::builder(TraderId::from("TESTER-001"), Environment::Sandbox)
            .unwrap()
            .with_reconciliation(false)
            .with_delay_post_stop_secs(0)
            .with_timeout_connection(1)
            .add_data_client(
                Some("TEST_DATA".to_string()),
                Box::new(factory),
                Box::new(TestDataClientConfig),
            )
            .unwrap()
            .build()
            .unwrap();

        let importable = ImportableStrategyConfig {
            strategy_path: format!("{module_name}:HistoricalBarsStrategy"),
            config_path: String::new(),
            config: HashMap::new(),
        };

        Python::attach(|py| {
            node.py_add_strategy_from_config(py, importable)
                .expect("strategy should register");
        });

        // `handle` stays in scope until the test ends; its clone is moved
        // into the task that stops the node.
        let handle = node.handle();
        let stop_handle = handle.clone();
        let sent_for_stop = Arc::clone(&response_sent_count);

        tokio::spawn(async move {
            let deadline = tokio::time::Instant::now() + Duration::from_secs(5);

            // Poll until the response has been sent or five seconds elapse.
            while sent_for_stop.load(Ordering::Relaxed) != 1
                && tokio::time::Instant::now() < deadline
            {
                tokio::time::sleep(Duration::from_millis(20)).await;
            }

            // Extra wait before stopping; presumably this leaves time for the
            // response to reach the strategy callback.
            tokio::time::sleep(Duration::from_millis(250)).await;
            stop_handle.stop();
        });

        node.run().await.expect("node should run cleanly");

        let (on_start, on_historical_bars, historical_bar_count) =
            Python::attach(|py| get_results(py, module_name));

        assert_eq!(request_count.load(Ordering::Relaxed), 1);
        assert_eq!(handler_visible_count.load(Ordering::Relaxed), 1);
        assert_eq!(response_sent_count.load(Ordering::Relaxed), 1);
        assert_eq!(on_start, 1);
        assert_eq!(on_historical_bars, 1);
        assert_eq!(historical_bar_count, 1);
    }
}