1use std::{
17 any::Any,
18 cell::{RefCell, UnsafeCell},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24};
25
26use nautilus_core::{
27 from_pydict,
28 nanos::UnixNanos,
29 python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick,
39 close::InstrumentClose,
40 option_chain::{OptionChainSlice, OptionGreeks},
41 },
42 enums::BookType,
43 identifiers::{ActorId, ClientId, InstrumentId, OptionSeriesId, TraderId, Venue},
44 instruments::{InstrumentAny, SyntheticInstrument},
45 orderbook::OrderBook,
46 python::{data::option_chain::PyStrikeRange, instruments::instrument_any_to_pyobject},
47};
48use pyo3::{prelude::*, types::PyDict};
49
50use crate::{
51 actor::{
52 Actor, DataActor,
53 data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
54 registry::{get_actor_registry, try_get_actor_unchecked},
55 },
56 cache::Cache,
57 clock::Clock,
58 component::{Component, get_component_registry},
59 enums::ComponentState,
60 python::{cache::PyCache, clock::PyClock, logging::PyLogger},
61 signal::Signal,
62 timer::{TimeEvent, TimeEventCallback},
63};
64
65#[pyo3::pymethods]
66#[pyo3_stub_gen::derive::gen_stub_pymethods]
67impl DataActorConfig {
68 #[new]
70 #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
71 fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
72 Self {
73 actor_id,
74 log_events,
75 log_commands,
76 }
77 }
78}
79
80#[pyo3::pymethods]
81#[pyo3_stub_gen::derive::gen_stub_pymethods]
82impl ImportableActorConfig {
83 #[new]
85 #[expect(clippy::needless_pass_by_value)]
86 fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
87 let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
88 let kwargs = PyDict::new(py);
89 kwargs.set_item("default", py.eval(pyo3::ffi::c_str!("str"), None, None)?)?;
90 let json_str: String = PyModule::import(py, "json")?
91 .call_method("dumps", (config.bind(py),), Some(&kwargs))?
92 .extract()?;
93
94 let json_value: serde_json::Value =
95 serde_json::from_str(&json_str).map_err(to_pyvalue_err)?;
96
97 if let serde_json::Value::Object(map) = json_value {
98 Ok(map.into_iter().collect())
99 } else {
100 Err(to_pyvalue_err("Config must be a dictionary"))
101 }
102 })?;
103
104 Ok(Self {
105 actor_path,
106 config_path,
107 config: json_config,
108 })
109 }
110
111 #[getter]
112 fn actor_path(&self) -> &String {
113 &self.actor_path
114 }
115
116 #[getter]
117 fn config_path(&self) -> &String {
118 &self.config_path
119 }
120
121 #[getter]
122 fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
123 let py_dict = PyDict::new(py);
125
126 for (key, value) in &self.config {
127 let json_str = serde_json::to_string(value).map_err(to_pyvalue_err)?;
129 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
130 py_dict.set_item(key, py_value)?;
131 }
132 Ok(py_dict.unbind())
133 }
134}
135
136pub struct PyDataActorInner {
142 core: DataActorCore,
143 py_self: Option<Py<PyAny>>,
144 clock: PyClock,
145 logger: PyLogger,
146}
147
148impl Debug for PyDataActorInner {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct(stringify!(PyDataActorInner))
151 .field("core", &self.core)
152 .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
153 .field("clock", &self.clock)
154 .field("logger", &self.logger)
155 .finish()
156 }
157}
158
159impl Deref for PyDataActorInner {
160 type Target = DataActorCore;
161
162 fn deref(&self) -> &Self::Target {
163 &self.core
164 }
165}
166
167impl DerefMut for PyDataActorInner {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 &mut self.core
170 }
171}
172
173#[expect(clippy::needless_pass_by_ref_mut)]
174impl PyDataActorInner {
175 fn dispatch_on_start(&self) -> PyResult<()> {
176 if let Some(ref py_self) = self.py_self {
177 Python::attach(|py| py_self.call_method0(py, "on_start"))?;
178 }
179 Ok(())
180 }
181
182 fn dispatch_on_stop(&mut self) -> PyResult<()> {
183 if let Some(ref py_self) = self.py_self {
184 Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
185 }
186 Ok(())
187 }
188
189 fn dispatch_on_resume(&mut self) -> PyResult<()> {
190 if let Some(ref py_self) = self.py_self {
191 Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
192 }
193 Ok(())
194 }
195
196 fn dispatch_on_reset(&mut self) -> PyResult<()> {
197 if let Some(ref py_self) = self.py_self {
198 Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
199 }
200 Ok(())
201 }
202
203 fn dispatch_on_dispose(&mut self) -> PyResult<()> {
204 if let Some(ref py_self) = self.py_self {
205 Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
206 }
207 Ok(())
208 }
209
210 fn dispatch_on_degrade(&mut self) -> PyResult<()> {
211 if let Some(ref py_self) = self.py_self {
212 Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
213 }
214 Ok(())
215 }
216
217 fn dispatch_on_fault(&mut self) -> PyResult<()> {
218 if let Some(ref py_self) = self.py_self {
219 Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
220 }
221 Ok(())
222 }
223
224 fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
225 if let Some(ref py_self) = self.py_self {
226 Python::attach(|py| {
227 py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
228 })?;
229 }
230 Ok(())
231 }
232
233 fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
234 if let Some(ref py_self) = self.py_self {
235 Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
236 }
237 Ok(())
238 }
239
240 fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
241 if let Some(ref py_self) = self.py_self {
242 Python::attach(|py| {
243 py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
244 })?;
245 }
246 Ok(())
247 }
248
249 fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
250 if let Some(ref py_self) = self.py_self {
251 Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
252 }
253 Ok(())
254 }
255
256 fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
257 if let Some(ref py_self) = self.py_self {
258 Python::attach(|py| {
259 py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
260 })?;
261 }
262 Ok(())
263 }
264
265 fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
266 if let Some(ref py_self) = self.py_self {
267 Python::attach(|py| {
268 py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
269 })?;
270 }
271 Ok(())
272 }
273
274 fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
275 if let Some(ref py_self) = self.py_self {
276 Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
277 }
278 Ok(())
279 }
280
281 fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
282 if let Some(ref py_self) = self.py_self {
283 Python::attach(|py| {
284 py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
285 })?;
286 }
287 Ok(())
288 }
289
290 fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
291 if let Some(ref py_self) = self.py_self {
292 Python::attach(|py| {
293 py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
294 })?;
295 }
296 Ok(())
297 }
298
299 fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
300 if let Some(ref py_self) = self.py_self {
301 Python::attach(|py| {
302 py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
303 })?;
304 }
305 Ok(())
306 }
307
308 fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
309 if let Some(ref py_self) = self.py_self {
310 Python::attach(|py| {
311 py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
312 })?;
313 }
314 Ok(())
315 }
316
317 fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
318 if let Some(ref py_self) = self.py_self {
319 Python::attach(|py| {
320 py_self.call_method1(
321 py,
322 "on_funding_rate",
323 (funding_rate.into_py_any_unwrap(py),),
324 )
325 })?;
326 }
327 Ok(())
328 }
329
330 fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
331 if let Some(ref py_self) = self.py_self {
332 Python::attach(|py| {
333 py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
334 })?;
335 }
336 Ok(())
337 }
338
339 fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
340 if let Some(ref py_self) = self.py_self {
341 Python::attach(|py| {
342 py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
343 })?;
344 }
345 Ok(())
346 }
347
348 fn dispatch_on_option_greeks(&mut self, greeks: OptionGreeks) -> PyResult<()> {
349 if let Some(ref py_self) = self.py_self {
350 Python::attach(|py| {
351 py_self.call_method1(py, "on_option_greeks", (greeks.into_py_any_unwrap(py),))
352 })?;
353 }
354 Ok(())
355 }
356
357 fn dispatch_on_option_chain(&mut self, slice: OptionChainSlice) -> PyResult<()> {
358 if let Some(ref py_self) = self.py_self {
359 Python::attach(|py| {
360 py_self.call_method1(py, "on_option_chain", (slice.into_py_any_unwrap(py),))
361 })?;
362 }
363 Ok(())
364 }
365
366 fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
367 if let Some(ref py_self) = self.py_self {
368 Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
369 }
370 Ok(())
371 }
372
373 fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
374 if let Some(ref py_self) = self.py_self {
375 Python::attach(|py| {
376 let py_quotes: Vec<_> = quotes
377 .into_iter()
378 .map(|q| q.into_py_any_unwrap(py))
379 .collect();
380 py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
381 })?;
382 }
383 Ok(())
384 }
385
386 fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
387 if let Some(ref py_self) = self.py_self {
388 Python::attach(|py| {
389 let py_trades: Vec<_> = trades
390 .into_iter()
391 .map(|t| t.into_py_any_unwrap(py))
392 .collect();
393 py_self.call_method1(py, "on_historical_trades", (py_trades,))
394 })?;
395 }
396 Ok(())
397 }
398
399 fn dispatch_on_historical_funding_rates(
400 &mut self,
401 funding_rates: Vec<FundingRateUpdate>,
402 ) -> PyResult<()> {
403 if let Some(ref py_self) = self.py_self {
404 Python::attach(|py| {
405 let py_rates: Vec<_> = funding_rates
406 .into_iter()
407 .map(|r| r.into_py_any_unwrap(py))
408 .collect();
409 py_self.call_method1(py, "on_historical_funding_rates", (py_rates,))
410 })?;
411 }
412 Ok(())
413 }
414
415 fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
416 if let Some(ref py_self) = self.py_self {
417 Python::attach(|py| {
418 let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
419 py_self.call_method1(py, "on_historical_bars", (py_bars,))
420 })?;
421 }
422 Ok(())
423 }
424
425 fn dispatch_on_historical_mark_prices(
426 &mut self,
427 mark_prices: Vec<MarkPriceUpdate>,
428 ) -> PyResult<()> {
429 if let Some(ref py_self) = self.py_self {
430 Python::attach(|py| {
431 let py_prices: Vec<_> = mark_prices
432 .into_iter()
433 .map(|p| p.into_py_any_unwrap(py))
434 .collect();
435 py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
436 })?;
437 }
438 Ok(())
439 }
440
441 fn dispatch_on_historical_index_prices(
442 &mut self,
443 index_prices: Vec<IndexPriceUpdate>,
444 ) -> PyResult<()> {
445 if let Some(ref py_self) = self.py_self {
446 Python::attach(|py| {
447 let py_prices: Vec<_> = index_prices
448 .into_iter()
449 .map(|p| p.into_py_any_unwrap(py))
450 .collect();
451 py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
452 })?;
453 }
454 Ok(())
455 }
456
457 #[cfg(feature = "defi")]
458 fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
459 if let Some(ref py_self) = self.py_self {
460 Python::attach(|py| {
461 py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
462 })?;
463 }
464 Ok(())
465 }
466
467 #[cfg(feature = "defi")]
468 fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
469 if let Some(ref py_self) = self.py_self {
470 Python::attach(|py| {
471 py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
472 })?;
473 }
474 Ok(())
475 }
476
477 #[cfg(feature = "defi")]
478 fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
479 if let Some(ref py_self) = self.py_self {
480 Python::attach(|py| {
481 py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
482 })?;
483 }
484 Ok(())
485 }
486
487 #[cfg(feature = "defi")]
488 fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
489 if let Some(ref py_self) = self.py_self {
490 Python::attach(|py| {
491 py_self.call_method1(
492 py,
493 "on_pool_liquidity_update",
494 (update.into_py_any_unwrap(py),),
495 )
496 })?;
497 }
498 Ok(())
499 }
500
501 #[cfg(feature = "defi")]
502 fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
503 if let Some(ref py_self) = self.py_self {
504 Python::attach(|py| {
505 py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
506 })?;
507 }
508 Ok(())
509 }
510
511 #[cfg(feature = "defi")]
512 fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
513 if let Some(ref py_self) = self.py_self {
514 Python::attach(|py| {
515 py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
516 })?;
517 }
518 Ok(())
519 }
520}
521
522fn dict_to_params(
523 py: Python<'_>,
524 params: Option<Py<PyDict>>,
525) -> PyResult<Option<nautilus_core::Params>> {
526 match params {
527 Some(dict) => from_pydict(py, dict),
528 None => Ok(None),
529 }
530}
531
532#[allow(non_camel_case_types)]
538#[pyo3::pyclass(
539 module = "nautilus_trader.common",
540 name = "DataActor",
541 unsendable,
542 subclass
543)]
544#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")]
545pub struct PyDataActor {
546 inner: Rc<UnsafeCell<PyDataActorInner>>,
547}
548
549impl Debug for PyDataActor {
550 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551 f.debug_struct(stringify!(PyDataActor))
552 .field("inner", &self.inner())
553 .finish()
554 }
555}
556
557impl PyDataActor {
558 #[inline]
565 #[allow(unsafe_code)]
566 pub(crate) fn inner(&self) -> &PyDataActorInner {
567 unsafe { &*self.inner.get() }
568 }
569
570 #[inline]
577 #[allow(unsafe_code, clippy::mut_from_ref)]
578 pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
579 unsafe { &mut *self.inner.get() }
580 }
581}
582
583impl Deref for PyDataActor {
584 type Target = DataActorCore;
585
586 fn deref(&self) -> &Self::Target {
587 &self.inner().core
588 }
589}
590
591impl DerefMut for PyDataActor {
592 fn deref_mut(&mut self) -> &mut Self::Target {
593 &mut self.inner_mut().core
594 }
595}
596
597impl PyDataActor {
598 pub fn new(config: Option<DataActorConfig>) -> Self {
600 let config = config.unwrap_or_default();
601 let core = DataActorCore::new(config);
602 let clock = PyClock::new_test(); let logger = PyLogger::new(core.actor_id().as_str());
604
605 let inner = PyDataActorInner {
606 core,
607 py_self: None,
608 clock,
609 logger,
610 };
611
612 Self {
613 inner: Rc::new(UnsafeCell::new(inner)),
614 }
615 }
616
617 pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
624 self.inner_mut().py_self = Some(py_obj);
625 }
626
627 pub fn set_actor_id(&mut self, actor_id: ActorId) {
634 let inner = self.inner_mut();
635 inner.core.config.actor_id = Some(actor_id);
636 inner.core.actor_id = actor_id;
637 }
638
639 pub fn set_log_events(&mut self, log_events: bool) {
641 self.inner_mut().core.config.log_events = log_events;
642 }
643
644 pub fn set_log_commands(&mut self, log_commands: bool) {
646 self.inner_mut().core.config.log_commands = log_commands;
647 }
648
649 pub fn mem_address(&self) -> String {
651 self.inner().core.mem_address()
652 }
653
654 pub fn is_registered(&self) -> bool {
656 self.inner().core.is_registered()
657 }
658
659 pub fn register(
665 &mut self,
666 trader_id: TraderId,
667 clock: Rc<RefCell<dyn Clock>>,
668 cache: Rc<RefCell<Cache>>,
669 ) -> anyhow::Result<()> {
670 let inner = self.inner_mut();
671 inner.core.register(trader_id, clock, cache)?;
672
673 inner.clock = PyClock::from_rc(inner.core.clock_rc());
674
675 let actor_id = inner.actor_id().inner();
677 let callback = TimeEventCallback::from(move |event: TimeEvent| {
678 if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
679 if let Err(e) = actor.on_time_event(&event) {
680 log::error!("Python time event handler failed for actor {actor_id}: {e}");
681 }
682 } else {
683 log::error!("Actor {actor_id} not found for time event handling");
684 }
685 });
686
687 inner.clock.inner_mut().register_default_handler(callback);
688
689 inner.initialize()
690 }
691
692 pub fn register_in_global_registries(&self) {
697 let inner = self.inner();
698 let component_id = inner.component_id().inner();
699 let actor_id = Actor::id(inner);
700
701 let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
702
703 let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
704 get_component_registry().insert(component_id, component_trait_ref);
705
706 let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
707 get_actor_registry().insert(actor_id, actor_trait_ref);
708 }
709}
710
711impl DataActor for PyDataActorInner {
712 fn on_start(&mut self) -> anyhow::Result<()> {
713 self.dispatch_on_start()
714 .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
715 }
716
717 fn on_stop(&mut self) -> anyhow::Result<()> {
718 self.dispatch_on_stop()
719 .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
720 }
721
722 fn on_resume(&mut self) -> anyhow::Result<()> {
723 self.dispatch_on_resume()
724 .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
725 }
726
727 fn on_reset(&mut self) -> anyhow::Result<()> {
728 self.dispatch_on_reset()
729 .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
730 }
731
732 fn on_dispose(&mut self) -> anyhow::Result<()> {
733 self.dispatch_on_dispose()
734 .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
735 }
736
737 fn on_degrade(&mut self) -> anyhow::Result<()> {
738 self.dispatch_on_degrade()
739 .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
740 }
741
742 fn on_fault(&mut self) -> anyhow::Result<()> {
743 self.dispatch_on_fault()
744 .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
745 }
746
747 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
748 self.dispatch_on_time_event(event.clone())
749 .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
750 }
751
752 #[allow(unused_variables)]
753 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
754 Python::attach(|py| {
755 let py_data: Py<PyAny> = Py::new(py, data.clone())?.into_any();
756 self.dispatch_on_data(py_data)
757 .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
758 })
759 }
760
761 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
762 self.dispatch_on_signal(signal)
763 .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
764 }
765
766 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
767 Python::attach(|py| {
768 let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
769 .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
770 self.dispatch_on_instrument(py_instrument)
771 .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
772 })
773 }
774
775 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
776 self.dispatch_on_quote(*quote)
777 .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
778 }
779
780 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
781 self.dispatch_on_trade(*tick)
782 .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
783 }
784
785 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
786 self.dispatch_on_bar(*bar)
787 .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
788 }
789
790 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
791 self.dispatch_on_book_deltas(deltas.clone())
792 .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
793 }
794
795 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
796 self.dispatch_on_book(order_book)
797 .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
798 }
799
800 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
801 self.dispatch_on_mark_price(*mark_price)
802 .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
803 }
804
805 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
806 self.dispatch_on_index_price(*index_price)
807 .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
808 }
809
810 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
811 self.dispatch_on_funding_rate(*funding_rate)
812 .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
813 }
814
815 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
816 self.dispatch_on_instrument_status(*data)
817 .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
818 }
819
820 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
821 self.dispatch_on_instrument_close(*update)
822 .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
823 }
824
825 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
826 self.dispatch_on_option_greeks(*greeks)
827 .map_err(|e| anyhow::anyhow!("Python on_option_greeks failed: {e}"))
828 }
829
830 fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
831 self.dispatch_on_option_chain(slice.clone())
832 .map_err(|e| anyhow::anyhow!("Python on_option_chain failed: {e}"))
833 }
834
835 #[cfg(feature = "defi")]
836 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
837 self.dispatch_on_block(block.clone())
838 .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
839 }
840
841 #[cfg(feature = "defi")]
842 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
843 self.dispatch_on_pool(pool.clone())
844 .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
845 }
846
847 #[cfg(feature = "defi")]
848 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
849 self.dispatch_on_pool_swap(swap.clone())
850 .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
851 }
852
853 #[cfg(feature = "defi")]
854 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
855 self.dispatch_on_pool_liquidity_update(update.clone())
856 .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
857 }
858
859 #[cfg(feature = "defi")]
860 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
861 self.dispatch_on_pool_fee_collect(collect.clone())
862 .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
863 }
864
865 #[cfg(feature = "defi")]
866 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
867 self.dispatch_on_pool_flash(flash.clone())
868 .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
869 }
870
871 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
872 Python::attach(|py| {
873 let py_data: Py<PyAny> = if let Some(custom_data) = data.downcast_ref::<CustomData>() {
874 Py::new(py, custom_data.clone())?.into_any()
875 } else {
876 anyhow::bail!("Failed to convert historical data to Python: unsupported type");
877 };
878 self.dispatch_on_historical_data(py_data)
879 .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
880 })
881 }
882
883 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
884 self.dispatch_on_historical_quotes(quotes.to_vec())
885 .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
886 }
887
888 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
889 self.dispatch_on_historical_trades(trades.to_vec())
890 .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
891 }
892
893 fn on_historical_funding_rates(
894 &mut self,
895 funding_rates: &[FundingRateUpdate],
896 ) -> anyhow::Result<()> {
897 self.dispatch_on_historical_funding_rates(funding_rates.to_vec())
898 .map_err(|e| anyhow::anyhow!("Python on_historical_funding_rates failed: {e}"))
899 }
900
901 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
902 self.dispatch_on_historical_bars(bars.to_vec())
903 .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
904 }
905
906 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
907 self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
908 .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
909 }
910
911 fn on_historical_index_prices(
912 &mut self,
913 index_prices: &[IndexPriceUpdate],
914 ) -> anyhow::Result<()> {
915 self.dispatch_on_historical_index_prices(index_prices.to_vec())
916 .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
917 }
918}
919
920#[pymethods]
921#[pyo3_stub_gen::derive::gen_stub_pymethods]
922impl PyDataActor {
923 #[new]
924 #[pyo3(signature = (config=None))]
925 fn py_new(config: Option<DataActorConfig>) -> Self {
926 Self::new(config)
927 }
928
929 #[pyo3(signature = (config=None))]
930 #[allow(unused_variables, clippy::needless_pass_by_value)]
931 fn __init__(slf: &Bound<'_, Self>, config: Option<DataActorConfig>) {
932 let py_self: Py<PyAny> = slf.clone().unbind().into_any();
933 slf.borrow_mut().set_python_instance(py_self);
934 }
935
936 #[getter]
937 #[pyo3(name = "clock")]
938 fn py_clock(&self) -> PyResult<PyClock> {
939 let inner = self.inner();
940 if inner.core.is_registered() {
941 Ok(inner.clock.clone())
942 } else {
943 Err(to_pyruntime_err(
944 "Actor must be registered with a trader before accessing clock",
945 ))
946 }
947 }
948
949 #[getter]
950 #[pyo3(name = "cache")]
951 fn py_cache(&self) -> PyResult<PyCache> {
952 let inner = self.inner();
953 if inner.core.is_registered() {
954 Ok(PyCache::from_rc(inner.core.cache_rc()))
955 } else {
956 Err(to_pyruntime_err(
957 "Actor must be registered with a trader before accessing cache",
958 ))
959 }
960 }
961
962 #[getter]
963 #[pyo3(name = "log")]
964 fn py_log(&self) -> PyLogger {
965 self.inner().logger.clone()
966 }
967
968 #[getter]
969 #[pyo3(name = "actor_id")]
970 fn py_actor_id(&self) -> ActorId {
971 self.inner().core.actor_id
972 }
973
974 #[getter]
975 #[pyo3(name = "trader_id")]
976 fn py_trader_id(&self) -> Option<TraderId> {
977 self.inner().core.trader_id()
978 }
979
980 #[pyo3(name = "state")]
981 fn py_state(&self) -> ComponentState {
982 Component::state(self.inner())
983 }
984
985 #[pyo3(name = "is_ready")]
986 fn py_is_ready(&self) -> bool {
987 Component::is_ready(self.inner())
988 }
989
990 #[pyo3(name = "is_running")]
991 fn py_is_running(&self) -> bool {
992 Component::is_running(self.inner())
993 }
994
995 #[pyo3(name = "is_stopped")]
996 fn py_is_stopped(&self) -> bool {
997 Component::is_stopped(self.inner())
998 }
999
1000 #[pyo3(name = "is_degraded")]
1001 fn py_is_degraded(&self) -> bool {
1002 Component::is_degraded(self.inner())
1003 }
1004
1005 #[pyo3(name = "is_faulted")]
1006 fn py_is_faulted(&self) -> bool {
1007 Component::is_faulted(self.inner())
1008 }
1009
1010 #[pyo3(name = "is_disposed")]
1011 fn py_is_disposed(&self) -> bool {
1012 Component::is_disposed(self.inner())
1013 }
1014
1015 #[pyo3(name = "start")]
1016 fn py_start(&mut self) -> PyResult<()> {
1017 Component::start(self.inner_mut()).map_err(to_pyruntime_err)
1018 }
1019
1020 #[pyo3(name = "stop")]
1021 fn py_stop(&mut self) -> PyResult<()> {
1022 Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
1023 }
1024
1025 #[pyo3(name = "resume")]
1026 fn py_resume(&mut self) -> PyResult<()> {
1027 Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
1028 }
1029
1030 #[pyo3(name = "reset")]
1031 fn py_reset(&mut self) -> PyResult<()> {
1032 Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
1033 }
1034
1035 #[pyo3(name = "dispose")]
1036 fn py_dispose(&mut self) -> PyResult<()> {
1037 Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
1038 }
1039
1040 #[pyo3(name = "degrade")]
1041 fn py_degrade(&mut self) -> PyResult<()> {
1042 Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
1043 }
1044
1045 #[pyo3(name = "fault")]
1046 fn py_fault(&mut self) -> PyResult<()> {
1047 Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
1048 }
1049
1050 #[pyo3(name = "shutdown_system")]
1051 #[pyo3(signature = (reason=None))]
1052 fn py_shutdown_system(&self, reason: Option<String>) {
1053 self.inner().core.shutdown_system(reason);
1054 }
1055
1056 #[pyo3(name = "publish_data")]
1057 fn py_publish_data(&self, data_type: &DataType, data: &CustomData) {
1058 self.inner().core.publish_data(data_type, data);
1059 }
1060
1061 #[pyo3(name = "publish_signal")]
1062 #[pyo3(signature = (name, value, ts_event=0))]
1063 #[allow(clippy::needless_pass_by_value)]
1064 fn py_publish_signal(
1065 &self,
1066 py: Python<'_>,
1067 name: &str,
1068 value: Py<PyAny>,
1069 ts_event: u64,
1070 ) -> PyResult<()> {
1071 let value_str: String = value.bind(py).str()?.extract()?;
1073 self.inner()
1074 .core
1075 .publish_signal(name, value_str, UnixNanos::from(ts_event));
1076 Ok(())
1077 }
1078
1079 #[pyo3(name = "add_synthetic")]
1080 fn py_add_synthetic(&self, synthetic: SyntheticInstrument) -> PyResult<()> {
1081 self.inner()
1082 .core
1083 .add_synthetic(synthetic)
1084 .map_err(to_pyvalue_err)
1085 }
1086
1087 #[pyo3(name = "update_synthetic")]
1088 fn py_update_synthetic(&self, synthetic: SyntheticInstrument) -> PyResult<()> {
1089 self.inner()
1090 .core
1091 .update_synthetic(synthetic)
1092 .map_err(to_pyvalue_err)
1093 }
1094
1095 #[pyo3(name = "on_start")]
1096 fn py_on_start(&self) {}
1097
1098 #[pyo3(name = "on_stop")]
1099 fn py_on_stop(&mut self) {}
1100
1101 #[pyo3(name = "on_resume")]
1102 fn py_on_resume(&mut self) {}
1103
1104 #[pyo3(name = "on_reset")]
1105 fn py_on_reset(&mut self) {}
1106
1107 #[pyo3(name = "on_dispose")]
1108 fn py_on_dispose(&mut self) {}
1109
1110 #[pyo3(name = "on_degrade")]
1111 fn py_on_degrade(&mut self) {}
1112
1113 #[pyo3(name = "on_fault")]
1114 fn py_on_fault(&mut self) {}
1115
1116 #[allow(unused_variables, clippy::needless_pass_by_value)]
1117 #[pyo3(name = "on_time_event")]
1118 fn py_on_time_event(&mut self, event: TimeEvent) {}
1119
1120 #[allow(unused_variables, clippy::needless_pass_by_value)]
1121 #[pyo3(name = "on_data")]
1122 fn py_on_data(&mut self, data: Py<PyAny>) {}
1123
1124 #[allow(unused_variables)]
1125 #[pyo3(name = "on_signal")]
1126 fn py_on_signal(&mut self, signal: &Signal) {}
1127
1128 #[allow(unused_variables, clippy::needless_pass_by_value)]
1129 #[pyo3(name = "on_instrument")]
1130 fn py_on_instrument(&mut self, instrument: Py<PyAny>) {}
1131
1132 #[allow(unused_variables)]
1133 #[pyo3(name = "on_quote")]
1134 fn py_on_quote(&mut self, quote: QuoteTick) {}
1135
1136 #[allow(unused_variables)]
1137 #[pyo3(name = "on_trade")]
1138 fn py_on_trade(&mut self, trade: TradeTick) {}
1139
1140 #[allow(unused_variables)]
1141 #[pyo3(name = "on_bar")]
1142 fn py_on_bar(&mut self, bar: Bar) {}
1143
1144 #[allow(unused_variables, clippy::needless_pass_by_value)]
1145 #[pyo3(name = "on_book_deltas")]
1146 fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) {}
1147
1148 #[allow(unused_variables)]
1149 #[pyo3(name = "on_book")]
1150 fn py_on_book(&mut self, book: &OrderBook) {}
1151
1152 #[allow(unused_variables)]
1153 #[pyo3(name = "on_mark_price")]
1154 fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) {}
1155
1156 #[allow(unused_variables)]
1157 #[pyo3(name = "on_index_price")]
1158 fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) {}
1159
1160 #[allow(unused_variables)]
1161 #[pyo3(name = "on_funding_rate")]
1162 fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) {}
1163
1164 #[allow(unused_variables)]
1165 #[pyo3(name = "on_instrument_status")]
1166 fn py_on_instrument_status(&mut self, status: InstrumentStatus) {}
1167
1168 #[allow(unused_variables)]
1169 #[pyo3(name = "on_instrument_close")]
1170 fn py_on_instrument_close(&mut self, close: InstrumentClose) {}
1171
1172 #[allow(unused_variables)]
1173 #[pyo3(name = "on_option_greeks")]
1174 fn py_on_option_greeks(&mut self, greeks: OptionGreeks) {}
1175
1176 #[allow(unused_variables, clippy::needless_pass_by_value)]
1177 #[pyo3(name = "on_option_chain")]
1178 fn py_on_option_chain(&mut self, slice: OptionChainSlice) {}
1179
1180 #[pyo3(name = "subscribe_data")]
1181 #[pyo3(signature = (data_type, client_id=None, params=None))]
1182 fn py_subscribe_data(
1183 &mut self,
1184 py: Python<'_>,
1185 data_type: DataType,
1186 client_id: Option<ClientId>,
1187 params: Option<Py<PyDict>>,
1188 ) -> PyResult<()> {
1189 let params = dict_to_params(py, params)?;
1190 DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1191 Ok(())
1192 }
1193
1194 #[pyo3(name = "subscribe_signal")]
1195 #[pyo3(signature = (name=""))]
1196 fn py_subscribe_signal(&mut self, name: &str) {
1197 DataActor::subscribe_signal(self.inner_mut(), name);
1198 }
1199
1200 #[pyo3(name = "subscribe_instruments")]
1201 #[pyo3(signature = (venue, client_id=None, params=None))]
1202 fn py_subscribe_instruments(
1203 &mut self,
1204 py: Python<'_>,
1205 venue: Venue,
1206 client_id: Option<ClientId>,
1207 params: Option<Py<PyDict>>,
1208 ) -> PyResult<()> {
1209 let params = dict_to_params(py, params)?;
1210 DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1211 Ok(())
1212 }
1213
1214 #[pyo3(name = "subscribe_instrument")]
1215 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1216 fn py_subscribe_instrument(
1217 &mut self,
1218 py: Python<'_>,
1219 instrument_id: InstrumentId,
1220 client_id: Option<ClientId>,
1221 params: Option<Py<PyDict>>,
1222 ) -> PyResult<()> {
1223 let params = dict_to_params(py, params)?;
1224 DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1225 Ok(())
1226 }
1227
1228 #[pyo3(name = "subscribe_book_deltas")]
1229 #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1230 #[expect(clippy::too_many_arguments)]
1231 fn py_subscribe_book_deltas(
1232 &mut self,
1233 py: Python<'_>,
1234 instrument_id: InstrumentId,
1235 book_type: BookType,
1236 depth: Option<usize>,
1237 client_id: Option<ClientId>,
1238 managed: bool,
1239 params: Option<Py<PyDict>>,
1240 ) -> PyResult<()> {
1241 let params = dict_to_params(py, params)?;
1242 let depth = depth.and_then(NonZeroUsize::new);
1243 DataActor::subscribe_book_deltas(
1244 self.inner_mut(),
1245 instrument_id,
1246 book_type,
1247 depth,
1248 client_id,
1249 managed,
1250 params,
1251 );
1252 Ok(())
1253 }
1254
1255 #[pyo3(name = "subscribe_book_at_interval")]
1256 #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1257 #[expect(clippy::too_many_arguments)]
1258 fn py_subscribe_book_at_interval(
1259 &mut self,
1260 py: Python<'_>,
1261 instrument_id: InstrumentId,
1262 book_type: BookType,
1263 interval_ms: usize,
1264 depth: Option<usize>,
1265 client_id: Option<ClientId>,
1266 params: Option<Py<PyDict>>,
1267 ) -> PyResult<()> {
1268 let params = dict_to_params(py, params)?;
1269 let depth = depth.and_then(NonZeroUsize::new);
1270 let interval_ms = NonZeroUsize::new(interval_ms)
1271 .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1272
1273 DataActor::subscribe_book_at_interval(
1274 self.inner_mut(),
1275 instrument_id,
1276 book_type,
1277 depth,
1278 interval_ms,
1279 client_id,
1280 params,
1281 );
1282 Ok(())
1283 }
1284
1285 #[pyo3(name = "subscribe_quotes")]
1286 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1287 fn py_subscribe_quotes(
1288 &mut self,
1289 py: Python<'_>,
1290 instrument_id: InstrumentId,
1291 client_id: Option<ClientId>,
1292 params: Option<Py<PyDict>>,
1293 ) -> PyResult<()> {
1294 let params = dict_to_params(py, params)?;
1295 DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1296 Ok(())
1297 }
1298
1299 #[pyo3(name = "subscribe_trades")]
1300 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1301 fn py_subscribe_trades(
1302 &mut self,
1303 py: Python<'_>,
1304 instrument_id: InstrumentId,
1305 client_id: Option<ClientId>,
1306 params: Option<Py<PyDict>>,
1307 ) -> PyResult<()> {
1308 let params = dict_to_params(py, params)?;
1309 DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1310 Ok(())
1311 }
1312
1313 #[pyo3(name = "subscribe_bars")]
1314 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1315 fn py_subscribe_bars(
1316 &mut self,
1317 py: Python<'_>,
1318 bar_type: BarType,
1319 client_id: Option<ClientId>,
1320 params: Option<Py<PyDict>>,
1321 ) -> PyResult<()> {
1322 let params = dict_to_params(py, params)?;
1323 DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1324 Ok(())
1325 }
1326
1327 #[pyo3(name = "subscribe_mark_prices")]
1328 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1329 fn py_subscribe_mark_prices(
1330 &mut self,
1331 py: Python<'_>,
1332 instrument_id: InstrumentId,
1333 client_id: Option<ClientId>,
1334 params: Option<Py<PyDict>>,
1335 ) -> PyResult<()> {
1336 let params = dict_to_params(py, params)?;
1337 DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1338 Ok(())
1339 }
1340
1341 #[pyo3(name = "subscribe_index_prices")]
1342 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1343 fn py_subscribe_index_prices(
1344 &mut self,
1345 py: Python<'_>,
1346 instrument_id: InstrumentId,
1347 client_id: Option<ClientId>,
1348 params: Option<Py<PyDict>>,
1349 ) -> PyResult<()> {
1350 let params = dict_to_params(py, params)?;
1351 DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1352 Ok(())
1353 }
1354
1355 #[pyo3(name = "subscribe_funding_rates")]
1356 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1357 fn py_subscribe_funding_rates(
1358 &mut self,
1359 py: Python<'_>,
1360 instrument_id: InstrumentId,
1361 client_id: Option<ClientId>,
1362 params: Option<Py<PyDict>>,
1363 ) -> PyResult<()> {
1364 let params = dict_to_params(py, params)?;
1365 DataActor::subscribe_funding_rates(self.inner_mut(), instrument_id, client_id, params);
1366 Ok(())
1367 }
1368
1369 #[pyo3(name = "subscribe_option_greeks")]
1370 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1371 fn py_subscribe_option_greeks(
1372 &mut self,
1373 py: Python<'_>,
1374 instrument_id: InstrumentId,
1375 client_id: Option<ClientId>,
1376 params: Option<Py<PyDict>>,
1377 ) -> PyResult<()> {
1378 let params = dict_to_params(py, params)?;
1379 DataActor::subscribe_option_greeks(self.inner_mut(), instrument_id, client_id, params);
1380 Ok(())
1381 }
1382
1383 #[pyo3(name = "subscribe_instrument_status")]
1384 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1385 fn py_subscribe_instrument_status(
1386 &mut self,
1387 py: Python<'_>,
1388 instrument_id: InstrumentId,
1389 client_id: Option<ClientId>,
1390 params: Option<Py<PyDict>>,
1391 ) -> PyResult<()> {
1392 let params = dict_to_params(py, params)?;
1393 DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1394 Ok(())
1395 }
1396
1397 #[pyo3(name = "subscribe_instrument_close")]
1398 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1399 fn py_subscribe_instrument_close(
1400 &mut self,
1401 py: Python<'_>,
1402 instrument_id: InstrumentId,
1403 client_id: Option<ClientId>,
1404 params: Option<Py<PyDict>>,
1405 ) -> PyResult<()> {
1406 let params = dict_to_params(py, params)?;
1407 DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1408 Ok(())
1409 }
1410
1411 #[pyo3(name = "subscribe_option_chain")]
1412 #[pyo3(signature = (series_id, strike_range, snapshot_interval_ms=None, client_id=None, params=None))]
1413 fn py_subscribe_option_chain(
1414 &mut self,
1415 py: Python<'_>,
1416 series_id: OptionSeriesId,
1417 strike_range: PyStrikeRange,
1418 snapshot_interval_ms: Option<u64>,
1419 client_id: Option<ClientId>,
1420 params: Option<Py<PyDict>>,
1421 ) -> PyResult<()> {
1422 let params = dict_to_params(py, params)?;
1423 DataActor::subscribe_option_chain(
1424 self.inner_mut(),
1425 series_id,
1426 strike_range.inner,
1427 snapshot_interval_ms,
1428 client_id,
1429 params,
1430 );
1431 Ok(())
1432 }
1433
1434 #[pyo3(name = "subscribe_order_fills")]
1435 #[pyo3(signature = (instrument_id))]
1436 fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) {
1437 DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1438 }
1439
1440 #[pyo3(name = "subscribe_order_cancels")]
1441 #[pyo3(signature = (instrument_id))]
1442 fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
1443 DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1444 }
1445
1446 #[pyo3(name = "unsubscribe_data")]
1447 #[pyo3(signature = (data_type, client_id=None, params=None))]
1448 fn py_unsubscribe_data(
1449 &mut self,
1450 py: Python<'_>,
1451 data_type: DataType,
1452 client_id: Option<ClientId>,
1453 params: Option<Py<PyDict>>,
1454 ) -> PyResult<()> {
1455 let params = dict_to_params(py, params)?;
1456 DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1457 Ok(())
1458 }
1459
1460 #[pyo3(name = "unsubscribe_signal")]
1461 #[pyo3(signature = (name=""))]
1462 fn py_unsubscribe_signal(&mut self, name: &str) {
1463 DataActor::unsubscribe_signal(self.inner_mut(), name);
1464 }
1465
1466 #[pyo3(name = "unsubscribe_instruments")]
1467 #[pyo3(signature = (venue, client_id=None, params=None))]
1468 fn py_unsubscribe_instruments(
1469 &mut self,
1470 py: Python<'_>,
1471 venue: Venue,
1472 client_id: Option<ClientId>,
1473 params: Option<Py<PyDict>>,
1474 ) -> PyResult<()> {
1475 let params = dict_to_params(py, params)?;
1476 DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1477 Ok(())
1478 }
1479
1480 #[pyo3(name = "unsubscribe_instrument")]
1481 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1482 fn py_unsubscribe_instrument(
1483 &mut self,
1484 py: Python<'_>,
1485 instrument_id: InstrumentId,
1486 client_id: Option<ClientId>,
1487 params: Option<Py<PyDict>>,
1488 ) -> PyResult<()> {
1489 let params = dict_to_params(py, params)?;
1490 DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1491 Ok(())
1492 }
1493
1494 #[pyo3(name = "unsubscribe_book_deltas")]
1495 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1496 fn py_unsubscribe_book_deltas(
1497 &mut self,
1498 py: Python<'_>,
1499 instrument_id: InstrumentId,
1500 client_id: Option<ClientId>,
1501 params: Option<Py<PyDict>>,
1502 ) -> PyResult<()> {
1503 let params = dict_to_params(py, params)?;
1504 DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1505 Ok(())
1506 }
1507
1508 #[pyo3(name = "unsubscribe_book_at_interval")]
1509 #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1510 fn py_unsubscribe_book_at_interval(
1511 &mut self,
1512 py: Python<'_>,
1513 instrument_id: InstrumentId,
1514 interval_ms: usize,
1515 client_id: Option<ClientId>,
1516 params: Option<Py<PyDict>>,
1517 ) -> PyResult<()> {
1518 let params = dict_to_params(py, params)?;
1519 let interval_ms = NonZeroUsize::new(interval_ms)
1520 .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1521
1522 DataActor::unsubscribe_book_at_interval(
1523 self.inner_mut(),
1524 instrument_id,
1525 interval_ms,
1526 client_id,
1527 params,
1528 );
1529 Ok(())
1530 }
1531
1532 #[pyo3(name = "unsubscribe_quotes")]
1533 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1534 fn py_unsubscribe_quotes(
1535 &mut self,
1536 py: Python<'_>,
1537 instrument_id: InstrumentId,
1538 client_id: Option<ClientId>,
1539 params: Option<Py<PyDict>>,
1540 ) -> PyResult<()> {
1541 let params = dict_to_params(py, params)?;
1542 DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1543 Ok(())
1544 }
1545
1546 #[pyo3(name = "unsubscribe_trades")]
1547 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1548 fn py_unsubscribe_trades(
1549 &mut self,
1550 py: Python<'_>,
1551 instrument_id: InstrumentId,
1552 client_id: Option<ClientId>,
1553 params: Option<Py<PyDict>>,
1554 ) -> PyResult<()> {
1555 let params = dict_to_params(py, params)?;
1556 DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1557 Ok(())
1558 }
1559
1560 #[pyo3(name = "unsubscribe_bars")]
1561 #[pyo3(signature = (bar_type, client_id=None, params=None))]
1562 fn py_unsubscribe_bars(
1563 &mut self,
1564 py: Python<'_>,
1565 bar_type: BarType,
1566 client_id: Option<ClientId>,
1567 params: Option<Py<PyDict>>,
1568 ) -> PyResult<()> {
1569 let params = dict_to_params(py, params)?;
1570 DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1571 Ok(())
1572 }
1573
1574 #[pyo3(name = "unsubscribe_mark_prices")]
1575 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1576 fn py_unsubscribe_mark_prices(
1577 &mut self,
1578 py: Python<'_>,
1579 instrument_id: InstrumentId,
1580 client_id: Option<ClientId>,
1581 params: Option<Py<PyDict>>,
1582 ) -> PyResult<()> {
1583 let params = dict_to_params(py, params)?;
1584 DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1585 Ok(())
1586 }
1587
1588 #[pyo3(name = "unsubscribe_index_prices")]
1589 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1590 fn py_unsubscribe_index_prices(
1591 &mut self,
1592 py: Python<'_>,
1593 instrument_id: InstrumentId,
1594 client_id: Option<ClientId>,
1595 params: Option<Py<PyDict>>,
1596 ) -> PyResult<()> {
1597 let params = dict_to_params(py, params)?;
1598 DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1599 Ok(())
1600 }
1601
1602 #[pyo3(name = "unsubscribe_funding_rates")]
1603 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1604 fn py_unsubscribe_funding_rates(
1605 &mut self,
1606 py: Python<'_>,
1607 instrument_id: InstrumentId,
1608 client_id: Option<ClientId>,
1609 params: Option<Py<PyDict>>,
1610 ) -> PyResult<()> {
1611 let params = dict_to_params(py, params)?;
1612 DataActor::unsubscribe_funding_rates(self.inner_mut(), instrument_id, client_id, params);
1613 Ok(())
1614 }
1615
1616 #[pyo3(name = "unsubscribe_option_greeks")]
1617 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1618 fn py_unsubscribe_option_greeks(
1619 &mut self,
1620 py: Python<'_>,
1621 instrument_id: InstrumentId,
1622 client_id: Option<ClientId>,
1623 params: Option<Py<PyDict>>,
1624 ) -> PyResult<()> {
1625 let params = dict_to_params(py, params)?;
1626 DataActor::unsubscribe_option_greeks(self.inner_mut(), instrument_id, client_id, params);
1627 Ok(())
1628 }
1629
1630 #[pyo3(name = "unsubscribe_instrument_status")]
1631 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1632 fn py_unsubscribe_instrument_status(
1633 &mut self,
1634 py: Python<'_>,
1635 instrument_id: InstrumentId,
1636 client_id: Option<ClientId>,
1637 params: Option<Py<PyDict>>,
1638 ) -> PyResult<()> {
1639 let params = dict_to_params(py, params)?;
1640 DataActor::unsubscribe_instrument_status(
1641 self.inner_mut(),
1642 instrument_id,
1643 client_id,
1644 params,
1645 );
1646 Ok(())
1647 }
1648
1649 #[pyo3(name = "unsubscribe_instrument_close")]
1650 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1651 fn py_unsubscribe_instrument_close(
1652 &mut self,
1653 py: Python<'_>,
1654 instrument_id: InstrumentId,
1655 client_id: Option<ClientId>,
1656 params: Option<Py<PyDict>>,
1657 ) -> PyResult<()> {
1658 let params = dict_to_params(py, params)?;
1659 DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1660 Ok(())
1661 }
1662
1663 #[pyo3(name = "unsubscribe_option_chain")]
1664 #[pyo3(signature = (series_id, client_id=None))]
1665 fn py_unsubscribe_option_chain(
1666 &mut self,
1667 series_id: OptionSeriesId,
1668 client_id: Option<ClientId>,
1669 ) {
1670 DataActor::unsubscribe_option_chain(self.inner_mut(), series_id, client_id);
1671 }
1672
1673 #[pyo3(name = "unsubscribe_order_fills")]
1674 #[pyo3(signature = (instrument_id))]
1675 fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
1676 DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1677 }
1678
1679 #[pyo3(name = "unsubscribe_order_cancels")]
1680 #[pyo3(signature = (instrument_id))]
1681 fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
1682 DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1683 }
1684
1685 #[pyo3(name = "request_data")]
1686 #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1687 #[expect(clippy::too_many_arguments)]
1688 fn py_request_data(
1689 &mut self,
1690 py: Python<'_>,
1691 data_type: DataType,
1692 client_id: ClientId,
1693 start: Option<u64>,
1694 end: Option<u64>,
1695 limit: Option<usize>,
1696 params: Option<Py<PyDict>>,
1697 ) -> PyResult<String> {
1698 let params = dict_to_params(py, params)?;
1699 let limit = limit.and_then(NonZeroUsize::new);
1700 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1701 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1702
1703 let request_id = DataActor::request_data(
1704 self.inner_mut(),
1705 data_type,
1706 client_id,
1707 start,
1708 end,
1709 limit,
1710 params,
1711 )
1712 .map_err(to_pyvalue_err)?;
1713 Ok(request_id.to_string())
1714 }
1715
1716 #[pyo3(name = "request_instrument")]
1717 #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1718 fn py_request_instrument(
1719 &mut self,
1720 py: Python<'_>,
1721 instrument_id: InstrumentId,
1722 start: Option<u64>,
1723 end: Option<u64>,
1724 client_id: Option<ClientId>,
1725 params: Option<Py<PyDict>>,
1726 ) -> PyResult<String> {
1727 let params = dict_to_params(py, params)?;
1728 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1729 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1730
1731 let request_id = DataActor::request_instrument(
1732 self.inner_mut(),
1733 instrument_id,
1734 start,
1735 end,
1736 client_id,
1737 params,
1738 )
1739 .map_err(to_pyvalue_err)?;
1740 Ok(request_id.to_string())
1741 }
1742
1743 #[pyo3(name = "request_instruments")]
1744 #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1745 fn py_request_instruments(
1746 &mut self,
1747 py: Python<'_>,
1748 venue: Option<Venue>,
1749 start: Option<u64>,
1750 end: Option<u64>,
1751 client_id: Option<ClientId>,
1752 params: Option<Py<PyDict>>,
1753 ) -> PyResult<String> {
1754 let params = dict_to_params(py, params)?;
1755 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1756 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1757
1758 let request_id =
1759 DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1760 .map_err(to_pyvalue_err)?;
1761 Ok(request_id.to_string())
1762 }
1763
1764 #[pyo3(name = "request_book_snapshot")]
1765 #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1766 fn py_request_book_snapshot(
1767 &mut self,
1768 py: Python<'_>,
1769 instrument_id: InstrumentId,
1770 depth: Option<usize>,
1771 client_id: Option<ClientId>,
1772 params: Option<Py<PyDict>>,
1773 ) -> PyResult<String> {
1774 let params = dict_to_params(py, params)?;
1775 let depth = depth.and_then(NonZeroUsize::new);
1776
1777 let request_id = DataActor::request_book_snapshot(
1778 self.inner_mut(),
1779 instrument_id,
1780 depth,
1781 client_id,
1782 params,
1783 )
1784 .map_err(to_pyvalue_err)?;
1785 Ok(request_id.to_string())
1786 }
1787
1788 #[pyo3(name = "request_quotes")]
1789 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1790 #[expect(clippy::too_many_arguments)]
1791 fn py_request_quotes(
1792 &mut self,
1793 py: Python<'_>,
1794 instrument_id: InstrumentId,
1795 start: Option<u64>,
1796 end: Option<u64>,
1797 limit: Option<usize>,
1798 client_id: Option<ClientId>,
1799 params: Option<Py<PyDict>>,
1800 ) -> PyResult<String> {
1801 let params = dict_to_params(py, params)?;
1802 let limit = limit.and_then(NonZeroUsize::new);
1803 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1804 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1805
1806 let request_id = DataActor::request_quotes(
1807 self.inner_mut(),
1808 instrument_id,
1809 start,
1810 end,
1811 limit,
1812 client_id,
1813 params,
1814 )
1815 .map_err(to_pyvalue_err)?;
1816 Ok(request_id.to_string())
1817 }
1818
1819 #[pyo3(name = "request_trades")]
1820 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1821 #[expect(clippy::too_many_arguments)]
1822 fn py_request_trades(
1823 &mut self,
1824 py: Python<'_>,
1825 instrument_id: InstrumentId,
1826 start: Option<u64>,
1827 end: Option<u64>,
1828 limit: Option<usize>,
1829 client_id: Option<ClientId>,
1830 params: Option<Py<PyDict>>,
1831 ) -> PyResult<String> {
1832 let params = dict_to_params(py, params)?;
1833 let limit = limit.and_then(NonZeroUsize::new);
1834 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1835 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1836
1837 let request_id = DataActor::request_trades(
1838 self.inner_mut(),
1839 instrument_id,
1840 start,
1841 end,
1842 limit,
1843 client_id,
1844 params,
1845 )
1846 .map_err(to_pyvalue_err)?;
1847 Ok(request_id.to_string())
1848 }
1849
1850 #[pyo3(name = "request_funding_rates")]
1851 #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1852 #[expect(clippy::too_many_arguments)]
1853 fn py_request_funding_rates(
1854 &mut self,
1855 py: Python<'_>,
1856 instrument_id: InstrumentId,
1857 start: Option<u64>,
1858 end: Option<u64>,
1859 limit: Option<usize>,
1860 client_id: Option<ClientId>,
1861 params: Option<Py<PyDict>>,
1862 ) -> PyResult<String> {
1863 let params = dict_to_params(py, params)?;
1864 let limit = limit.and_then(NonZeroUsize::new);
1865 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1866 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1867
1868 let request_id = DataActor::request_funding_rates(
1869 self.inner_mut(),
1870 instrument_id,
1871 start,
1872 end,
1873 limit,
1874 client_id,
1875 params,
1876 )
1877 .map_err(to_pyvalue_err)?;
1878 Ok(request_id.to_string())
1879 }
1880
1881 #[pyo3(name = "request_bars")]
1882 #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1883 #[expect(clippy::too_many_arguments)]
1884 fn py_request_bars(
1885 &mut self,
1886 py: Python<'_>,
1887 bar_type: BarType,
1888 start: Option<u64>,
1889 end: Option<u64>,
1890 limit: Option<usize>,
1891 client_id: Option<ClientId>,
1892 params: Option<Py<PyDict>>,
1893 ) -> PyResult<String> {
1894 let params = dict_to_params(py, params)?;
1895 let limit = limit.and_then(NonZeroUsize::new);
1896 let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1897 let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1898
1899 let request_id = DataActor::request_bars(
1900 self.inner_mut(),
1901 bar_type,
1902 start,
1903 end,
1904 limit,
1905 client_id,
1906 params,
1907 )
1908 .map_err(to_pyvalue_err)?;
1909 Ok(request_id.to_string())
1910 }
1911
1912 #[allow(unused_variables, clippy::needless_pass_by_value)]
1913 #[pyo3(name = "on_historical_data")]
1914 fn py_on_historical_data(&mut self, data: Py<PyAny>) {
1915 }
1917
1918 #[allow(unused_variables, clippy::needless_pass_by_value)]
1919 #[pyo3(name = "on_historical_quotes")]
1920 fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) {
1921 }
1923
1924 #[allow(unused_variables, clippy::needless_pass_by_value)]
1925 #[pyo3(name = "on_historical_trades")]
1926 fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) {
1927 }
1929
1930 #[allow(unused_variables, clippy::needless_pass_by_value)]
1931 #[pyo3(name = "on_historical_funding_rates")]
1932 fn py_on_historical_funding_rates(&mut self, funding_rates: Vec<FundingRateUpdate>) {
1933 }
1935
1936 #[allow(unused_variables, clippy::needless_pass_by_value)]
1937 #[pyo3(name = "on_historical_bars")]
1938 fn py_on_historical_bars(&mut self, bars: Vec<Bar>) {
1939 }
1941
1942 #[allow(unused_variables, clippy::needless_pass_by_value)]
1943 #[pyo3(name = "on_historical_mark_prices")]
1944 fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) {
1945 }
1947
1948 #[allow(unused_variables, clippy::needless_pass_by_value)]
1949 #[pyo3(name = "on_historical_index_prices")]
1950 fn py_on_historical_index_prices(&mut self, index_prices: Vec<IndexPriceUpdate>) {
1951 }
1953}
1954
1955#[cfg(feature = "defi")]
1956#[pymethods]
1957#[pyo3_stub_gen::derive::gen_stub_pymethods]
1958impl PyDataActor {
1959 #[pyo3(name = "on_block")]
1960 #[allow(unused_variables, clippy::needless_pass_by_value)]
1961 fn py_on_block(&mut self, block: Block) {}
1962
1963 #[pyo3(name = "on_pool")]
1964 #[allow(unused_variables, clippy::needless_pass_by_value)]
1965 fn py_on_pool(&mut self, pool: Pool) {}
1966
1967 #[pyo3(name = "on_pool_swap")]
1968 #[allow(unused_variables, clippy::needless_pass_by_value)]
1969 fn py_on_pool_swap(&mut self, swap: PoolSwap) {}
1970
1971 #[pyo3(name = "on_pool_liquidity_update")]
1972 #[allow(unused_variables, clippy::needless_pass_by_value)]
1973 fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) {}
1974
1975 #[pyo3(name = "on_pool_fee_collect")]
1976 #[allow(unused_variables, clippy::needless_pass_by_value)]
1977 fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) {}
1978
1979 #[pyo3(name = "on_pool_flash")]
1980 #[allow(unused_variables, clippy::needless_pass_by_value)]
1981 fn py_on_pool_flash(&mut self, flash: PoolFlash) {}
1982
1983 #[pyo3(name = "subscribe_blocks")]
1984 #[pyo3(signature = (chain, client_id=None, params=None))]
1985 fn py_subscribe_blocks(
1986 &mut self,
1987 py: Python<'_>,
1988 chain: Blockchain,
1989 client_id: Option<ClientId>,
1990 params: Option<Py<PyDict>>,
1991 ) -> PyResult<()> {
1992 let params = dict_to_params(py, params)?;
1993 DataActor::subscribe_blocks(self.inner_mut(), chain, client_id, params);
1994 Ok(())
1995 }
1996
1997 #[pyo3(name = "subscribe_pool")]
1998 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1999 fn py_subscribe_pool(
2000 &mut self,
2001 py: Python<'_>,
2002 instrument_id: InstrumentId,
2003 client_id: Option<ClientId>,
2004 params: Option<Py<PyDict>>,
2005 ) -> PyResult<()> {
2006 let params = dict_to_params(py, params)?;
2007 DataActor::subscribe_pool(self.inner_mut(), instrument_id, client_id, params);
2008 Ok(())
2009 }
2010
2011 #[pyo3(name = "subscribe_pool_swaps")]
2012 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2013 fn py_subscribe_pool_swaps(
2014 &mut self,
2015 py: Python<'_>,
2016 instrument_id: InstrumentId,
2017 client_id: Option<ClientId>,
2018 params: Option<Py<PyDict>>,
2019 ) -> PyResult<()> {
2020 let params = dict_to_params(py, params)?;
2021 DataActor::subscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
2022 Ok(())
2023 }
2024
2025 #[pyo3(name = "subscribe_pool_liquidity_updates")]
2026 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2027 fn py_subscribe_pool_liquidity_updates(
2028 &mut self,
2029 py: Python<'_>,
2030 instrument_id: InstrumentId,
2031 client_id: Option<ClientId>,
2032 params: Option<Py<PyDict>>,
2033 ) -> PyResult<()> {
2034 let params = dict_to_params(py, params)?;
2035 DataActor::subscribe_pool_liquidity_updates(
2036 self.inner_mut(),
2037 instrument_id,
2038 client_id,
2039 params,
2040 );
2041 Ok(())
2042 }
2043
2044 #[pyo3(name = "subscribe_pool_fee_collects")]
2045 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2046 fn py_subscribe_pool_fee_collects(
2047 &mut self,
2048 py: Python<'_>,
2049 instrument_id: InstrumentId,
2050 client_id: Option<ClientId>,
2051 params: Option<Py<PyDict>>,
2052 ) -> PyResult<()> {
2053 let params = dict_to_params(py, params)?;
2054 DataActor::subscribe_pool_fee_collects(self.inner_mut(), instrument_id, client_id, params);
2055 Ok(())
2056 }
2057
2058 #[pyo3(name = "subscribe_pool_flash_events")]
2059 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2060 fn py_subscribe_pool_flash_events(
2061 &mut self,
2062 py: Python<'_>,
2063 instrument_id: InstrumentId,
2064 client_id: Option<ClientId>,
2065 params: Option<Py<PyDict>>,
2066 ) -> PyResult<()> {
2067 let params = dict_to_params(py, params)?;
2068 DataActor::subscribe_pool_flash_events(self.inner_mut(), instrument_id, client_id, params);
2069 Ok(())
2070 }
2071
2072 #[pyo3(name = "unsubscribe_blocks")]
2073 #[pyo3(signature = (chain, client_id=None, params=None))]
2074 fn py_unsubscribe_blocks(
2075 &mut self,
2076 py: Python<'_>,
2077 chain: Blockchain,
2078 client_id: Option<ClientId>,
2079 params: Option<Py<PyDict>>,
2080 ) -> PyResult<()> {
2081 let params = dict_to_params(py, params)?;
2082 DataActor::unsubscribe_blocks(self.inner_mut(), chain, client_id, params);
2083 Ok(())
2084 }
2085
2086 #[pyo3(name = "unsubscribe_pool")]
2087 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2088 fn py_unsubscribe_pool(
2089 &mut self,
2090 py: Python<'_>,
2091 instrument_id: InstrumentId,
2092 client_id: Option<ClientId>,
2093 params: Option<Py<PyDict>>,
2094 ) -> PyResult<()> {
2095 let params = dict_to_params(py, params)?;
2096 DataActor::unsubscribe_pool(self.inner_mut(), instrument_id, client_id, params);
2097 Ok(())
2098 }
2099
2100 #[pyo3(name = "unsubscribe_pool_swaps")]
2101 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2102 fn py_unsubscribe_pool_swaps(
2103 &mut self,
2104 py: Python<'_>,
2105 instrument_id: InstrumentId,
2106 client_id: Option<ClientId>,
2107 params: Option<Py<PyDict>>,
2108 ) -> PyResult<()> {
2109 let params = dict_to_params(py, params)?;
2110 DataActor::unsubscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
2111 Ok(())
2112 }
2113
2114 #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
2115 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2116 fn py_unsubscribe_pool_liquidity_updates(
2117 &mut self,
2118 py: Python<'_>,
2119 instrument_id: InstrumentId,
2120 client_id: Option<ClientId>,
2121 params: Option<Py<PyDict>>,
2122 ) -> PyResult<()> {
2123 let params = dict_to_params(py, params)?;
2124 DataActor::unsubscribe_pool_liquidity_updates(
2125 self.inner_mut(),
2126 instrument_id,
2127 client_id,
2128 params,
2129 );
2130 Ok(())
2131 }
2132
2133 #[pyo3(name = "unsubscribe_pool_fee_collects")]
2134 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2135 fn py_unsubscribe_pool_fee_collects(
2136 &mut self,
2137 py: Python<'_>,
2138 instrument_id: InstrumentId,
2139 client_id: Option<ClientId>,
2140 params: Option<Py<PyDict>>,
2141 ) -> PyResult<()> {
2142 let params = dict_to_params(py, params)?;
2143 DataActor::unsubscribe_pool_fee_collects(
2144 self.inner_mut(),
2145 instrument_id,
2146 client_id,
2147 params,
2148 );
2149 Ok(())
2150 }
2151
2152 #[pyo3(name = "unsubscribe_pool_flash_events")]
2153 #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2154 fn py_unsubscribe_pool_flash_events(
2155 &mut self,
2156 py: Python<'_>,
2157 instrument_id: InstrumentId,
2158 client_id: Option<ClientId>,
2159 params: Option<Py<PyDict>>,
2160 ) -> PyResult<()> {
2161 let params = dict_to_params(py, params)?;
2162 DataActor::unsubscribe_pool_flash_events(
2163 self.inner_mut(),
2164 instrument_id,
2165 client_id,
2166 params,
2167 );
2168 Ok(())
2169 }
2170}
2171
2172#[cfg(test)]
2173mod tests {
2174 use std::{cell::RefCell, rc::Rc, str::FromStr, sync::Arc};
2175
2176 #[cfg(feature = "defi")]
2177 use alloy_primitives::{I256, U160, U256};
2178 use nautilus_core::{UUID4, UnixNanos, python::IntoPyObjectNautilusExt};
2179 #[cfg(feature = "defi")]
2180 use nautilus_model::defi::{
2181 AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolFeeCollect, PoolFlash,
2182 PoolIdentifier, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap, Token,
2183 };
2184 use nautilus_model::{
2185 data::{
2186 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate,
2187 InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, QuoteTick,
2188 TradeTick,
2189 close::InstrumentClose,
2190 greeks::OptionGreekValues,
2191 option_chain::{OptionChainSlice, OptionGreeks},
2192 stubs::stub_custom_data,
2193 },
2194 enums::{
2195 AggressorSide, BookType, GreeksConvention, InstrumentCloseType, MarketStatusAction,
2196 },
2197 identifiers::{ClientId, OptionSeriesId, TradeId, TraderId, Venue},
2198 instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
2199 orderbook::OrderBook,
2200 types::{Price, Quantity},
2201 };
2202 use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
2203 use rstest::{fixture, rstest};
2204 use ustr::Ustr;
2205
2206 use super::PyDataActor;
2207 use crate::{
2208 actor::DataActor,
2209 cache::Cache,
2210 clock::TestClock,
2211 enums::ComponentState,
2212 runner::{SyncDataCommandSender, set_data_cmd_sender},
2213 signal::Signal,
2214 timer::TimeEvent,
2215 };
2216
2217 #[fixture]
2218 fn clock() -> Rc<RefCell<TestClock>> {
2219 Rc::new(RefCell::new(TestClock::new()))
2220 }
2221
2222 #[fixture]
2223 fn cache() -> Rc<RefCell<Cache>> {
2224 Rc::new(RefCell::new(Cache::new(None, None)))
2225 }
2226
2227 #[fixture]
2228 fn trader_id() -> TraderId {
2229 TraderId::from("TRADER-001")
2230 }
2231
2232 #[fixture]
2233 fn client_id() -> ClientId {
2234 ClientId::new("TestClient")
2235 }
2236
2237 #[fixture]
2238 fn venue() -> Venue {
2239 Venue::from("SIM")
2240 }
2241
2242 #[fixture]
2243 fn data_type() -> DataType {
2244 DataType::new("TestData", None, None)
2245 }
2246
2247 #[fixture]
2248 fn bar_type(audusd_sim: CurrencyPair) -> BarType {
2249 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
2250 }
2251
2252 fn create_unregistered_actor() -> PyDataActor {
2253 PyDataActor::new(None)
2254 }
2255
2256 fn create_registered_actor(
2257 clock: Rc<RefCell<TestClock>>,
2258 cache: Rc<RefCell<Cache>>,
2259 trader_id: TraderId,
2260 ) -> PyDataActor {
2261 let sender = SyncDataCommandSender;
2263 set_data_cmd_sender(Arc::new(sender));
2264
2265 let mut actor = PyDataActor::new(None);
2266 actor.register(trader_id, clock, cache).unwrap();
2267 actor
2268 }
2269
2270 #[rstest]
2271 fn test_new_actor_creation() {
2272 let actor = PyDataActor::new(None);
2273 assert!(actor.trader_id().is_none());
2274 }
2275
2276 #[rstest]
2277 fn test_clock_access_before_registration_raises_error() {
2278 let actor = PyDataActor::new(None);
2279
2280 let result = actor.py_clock();
2282 assert!(result.is_err());
2283
2284 let error = result.unwrap_err();
2285 pyo3::Python::initialize();
2286 pyo3::Python::attach(|py| {
2287 assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
2288 });
2289
2290 let error_msg = error.to_string();
2291 assert!(
2292 error_msg.contains("Actor must be registered with a trader before accessing clock")
2293 );
2294 }
2295
2296 #[rstest]
2297 fn test_unregistered_actor_methods_work() {
2298 let actor = create_unregistered_actor();
2299
2300 assert!(!actor.py_is_ready());
2301 assert!(!actor.py_is_running());
2302 assert!(!actor.py_is_stopped());
2303 assert!(!actor.py_is_disposed());
2304 assert!(!actor.py_is_degraded());
2305 assert!(!actor.py_is_faulted());
2306
2307 assert_eq!(actor.trader_id(), None);
2309 }
2310
2311 #[rstest]
2312 fn test_registration_success(
2313 clock: Rc<RefCell<TestClock>>,
2314 cache: Rc<RefCell<Cache>>,
2315 trader_id: TraderId,
2316 ) {
2317 let mut actor = create_unregistered_actor();
2318 actor.register(trader_id, clock, cache).unwrap();
2319 assert!(actor.trader_id().is_some());
2320 assert_eq!(actor.trader_id().unwrap(), trader_id);
2321 }
2322
2323 #[rstest]
2324 fn test_registered_actor_basic_properties(
2325 clock: Rc<RefCell<TestClock>>,
2326 cache: Rc<RefCell<Cache>>,
2327 trader_id: TraderId,
2328 ) {
2329 let actor = create_registered_actor(clock, cache, trader_id);
2330
2331 assert_eq!(actor.state(), ComponentState::Ready);
2332 assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2333 assert!(actor.py_is_ready());
2334 assert!(!actor.py_is_running());
2335 assert!(!actor.py_is_stopped());
2336 assert!(!actor.py_is_disposed());
2337 assert!(!actor.py_is_degraded());
2338 assert!(!actor.py_is_faulted());
2339 }
2340
2341 #[rstest]
2342 fn test_basic_subscription_methods_compile(
2343 clock: Rc<RefCell<TestClock>>,
2344 cache: Rc<RefCell<Cache>>,
2345 trader_id: TraderId,
2346 data_type: DataType,
2347 client_id: ClientId,
2348 audusd_sim: CurrencyPair,
2349 ) {
2350 let mut actor = create_registered_actor(clock, cache, trader_id);
2351
2352 pyo3::Python::initialize();
2353 pyo3::Python::attach(|py| {
2354 assert!(
2355 actor
2356 .py_subscribe_data(py, data_type.clone(), Some(client_id), None)
2357 .is_ok()
2358 );
2359 assert!(
2360 actor
2361 .py_subscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2362 .is_ok()
2363 );
2364 assert!(
2365 actor
2366 .py_unsubscribe_data(py, data_type, Some(client_id), None)
2367 .is_ok()
2368 );
2369 assert!(
2370 actor
2371 .py_unsubscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2372 .is_ok()
2373 );
2374 });
2375 }
2376
2377 #[rstest]
2378 fn test_shutdown_system_passes_through(
2379 clock: Rc<RefCell<TestClock>>,
2380 cache: Rc<RefCell<Cache>>,
2381 trader_id: TraderId,
2382 ) {
2383 let actor = create_registered_actor(clock, cache, trader_id);
2384
2385 actor.py_shutdown_system(Some("Test shutdown".to_string()));
2386 actor.py_shutdown_system(None);
2387 }
2388
2389 #[rstest]
2390 fn test_publish_data_delivers_to_any_subscriber(
2391 clock: Rc<RefCell<TestClock>>,
2392 cache: Rc<RefCell<Cache>>,
2393 trader_id: TraderId,
2394 ) {
2395 use crate::msgbus::{
2396 self, MessageBus, get_message_bus, switchboard::get_custom_topic,
2397 typed_handler::ShareableMessageHandler,
2398 };
2399
2400 *get_message_bus().borrow_mut() = MessageBus::default();
2402
2403 let actor = create_registered_actor(clock, cache, trader_id);
2404 let data = stub_custom_data(1, 42, None, None);
2405 let topic = get_custom_topic(&data.data_type);
2406
2407 let received: Rc<RefCell<Vec<CustomData>>> = Rc::new(RefCell::new(Vec::new()));
2408 let received_clone = received.clone();
2409 let handler = ShareableMessageHandler::from_typed(move |d: &CustomData| {
2410 received_clone.borrow_mut().push(d.clone());
2411 });
2412 msgbus::subscribe_any(topic.into(), handler, None);
2413
2414 actor.py_publish_data(&data.data_type, &data);
2415
2416 let received = received.borrow();
2417 assert_eq!(received.len(), 1);
2418 assert_eq!(received[0].data_type, data.data_type);
2419 }
2420
2421 #[rstest]
2422 fn test_publish_signal_delivers_to_customdata_subscriber(
2423 clock: Rc<RefCell<TestClock>>,
2424 cache: Rc<RefCell<Cache>>,
2425 trader_id: TraderId,
2426 ) {
2427 use crate::{
2428 msgbus::{
2429 self, MessageBus, Pattern, get_message_bus, typed_handler::ShareableMessageHandler,
2430 },
2431 signal::Signal,
2432 };
2433
2434 *get_message_bus().borrow_mut() = MessageBus::default();
2435
2436 let actor = create_registered_actor(clock, cache, trader_id);
2437
2438 let received: Rc<RefCell<Vec<Signal>>> = Rc::new(RefCell::new(Vec::new()));
2441 let received_clone = received.clone();
2442 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
2443 if let Some(sig) = data.data.as_any().downcast_ref::<Signal>() {
2444 received_clone.borrow_mut().push(sig.clone());
2445 }
2446 });
2447 let pattern: crate::msgbus::MStr<Pattern> = "data.Signal*".to_string().into();
2448 msgbus::subscribe_any(pattern, handler, None);
2449
2450 pyo3::Python::initialize();
2451 Python::attach(|py| {
2452 let val1: Py<PyAny> = 1.0_f64.into_py_any_unwrap(py);
2453 let val2: Py<PyAny> = "HIGH".into_py_any_unwrap(py);
2454 actor.py_publish_signal(py, "example", val1, 0).unwrap();
2455 actor
2456 .py_publish_signal(py, "risk", val2, 1_700_000_000_000_000_000)
2457 .unwrap();
2458 });
2459
2460 let received = received.borrow();
2461 assert_eq!(received.len(), 2);
2462 assert_eq!(received[0].name.as_str(), "example");
2463 assert_eq!(received[0].value, "1.0");
2464 assert_eq!(received[1].name.as_str(), "risk");
2465 assert_eq!(received[1].value, "HIGH");
2466 assert_eq!(
2467 received[1].ts_event,
2468 UnixNanos::from(1_700_000_000_000_000_000_u64)
2469 );
2470 }
2471
2472 #[rstest]
2473 fn test_publish_signal_accepts_numeric_py_values(
2474 clock: Rc<RefCell<TestClock>>,
2475 cache: Rc<RefCell<Cache>>,
2476 trader_id: TraderId,
2477 ) {
2478 use crate::{
2479 msgbus::{
2480 self, MessageBus, Pattern, get_message_bus, typed_handler::ShareableMessageHandler,
2481 },
2482 signal::Signal,
2483 };
2484
2485 *get_message_bus().borrow_mut() = MessageBus::default();
2486
2487 let actor = create_registered_actor(clock, cache, trader_id);
2488
2489 let received: Rc<RefCell<Vec<Signal>>> = Rc::new(RefCell::new(Vec::new()));
2490 let received_clone = received.clone();
2491 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
2492 if let Some(sig) = data.data.as_any().downcast_ref::<Signal>() {
2493 received_clone.borrow_mut().push(sig.clone());
2494 }
2495 });
2496 let pattern: crate::msgbus::MStr<Pattern> = "data.Signal*".to_string().into();
2497 msgbus::subscribe_any(pattern, handler, None);
2498
2499 pyo3::Python::initialize();
2500 Python::attach(|py| {
2501 let int_value: Py<PyAny> = 42_i64.into_py_any_unwrap(py);
2502 let float_value: Py<PyAny> = 3.5_f64.into_py_any_unwrap(py);
2503 let bool_value: Py<PyAny> = true.into_py_any_unwrap(py);
2504 actor.py_publish_signal(py, "count", int_value, 0).unwrap();
2505 actor
2506 .py_publish_signal(py, "ratio", float_value, 0)
2507 .unwrap();
2508 actor
2509 .py_publish_signal(py, "active", bool_value, 0)
2510 .unwrap();
2511 });
2512
2513 let received = received.borrow();
2514 assert_eq!(received.len(), 3);
2515 assert_eq!(received[0].value, "42");
2516 assert_eq!(received[1].value, "3.5");
2517 assert_eq!(received[2].value, "True");
2518 }
2519
2520 #[rstest]
2521 fn test_subscribe_and_unsubscribe_signal_compile(
2522 clock: Rc<RefCell<TestClock>>,
2523 cache: Rc<RefCell<Cache>>,
2524 trader_id: TraderId,
2525 ) {
2526 use crate::msgbus::{MessageBus, get_message_bus};
2527
2528 *get_message_bus().borrow_mut() = MessageBus::default();
2529
2530 let mut actor = create_registered_actor(clock, cache, trader_id);
2531 actor.py_subscribe_signal("example");
2532 actor.py_unsubscribe_signal("example");
2533 actor.py_subscribe_signal("");
2534 actor.py_unsubscribe_signal("");
2535 }
2536
2537 #[rstest]
2538 fn test_publish_data_dispatches_to_python_on_data(
2539 clock: Rc<RefCell<TestClock>>,
2540 cache: Rc<RefCell<Cache>>,
2541 trader_id: TraderId,
2542 ) {
2543 use crate::msgbus::{MessageBus, get_message_bus};
2544
2545 *get_message_bus().borrow_mut() = MessageBus::default();
2546
2547 pyo3::Python::initialize();
2548 Python::attach(|py| {
2549 let py_actor = create_tracking_python_actor(py).unwrap();
2550
2551 let mut rust_actor = PyDataActor::new(None);
2552 rust_actor.set_python_instance(py_actor.clone_ref(py));
2553 rust_actor.register(trader_id, clock, cache).unwrap();
2554 rust_actor.register_in_global_registries();
2555 rust_actor.py_start().unwrap();
2556
2557 let data = stub_custom_data(1, 42, None, None);
2558 rust_actor
2559 .py_subscribe_data(py, data.data_type.clone(), None, None)
2560 .unwrap();
2561
2562 rust_actor.py_publish_data(&data.data_type, &data);
2563 rust_actor.py_publish_data(&data.data_type, &data);
2564
2565 assert!(python_method_was_called(&py_actor, py, "on_data"));
2566 assert_eq!(python_method_call_count(&py_actor, py, "on_data"), 2);
2567 });
2568 }
2569
2570 #[rstest]
2571 fn test_publish_signal_dispatches_to_python_on_signal(
2572 clock: Rc<RefCell<TestClock>>,
2573 cache: Rc<RefCell<Cache>>,
2574 trader_id: TraderId,
2575 ) {
2576 use crate::msgbus::{MessageBus, get_message_bus};
2577
2578 *get_message_bus().borrow_mut() = MessageBus::default();
2579
2580 pyo3::Python::initialize();
2581 Python::attach(|py| {
2582 let py_actor = create_tracking_python_actor(py).unwrap();
2583
2584 let mut rust_actor = PyDataActor::new(None);
2585 rust_actor.set_python_instance(py_actor.clone_ref(py));
2586 rust_actor.register(trader_id, clock, cache).unwrap();
2587 rust_actor.register_in_global_registries();
2588 rust_actor.py_start().unwrap();
2589
2590 rust_actor.py_subscribe_signal("example");
2591 let val1: Py<PyAny> = "1.5".into_py_any_unwrap(py);
2592 let val2: Py<PyAny> = 2.0_f64.into_py_any_unwrap(py);
2593 rust_actor
2594 .py_publish_signal(py, "example", val1, 0)
2595 .unwrap();
2596 rust_actor
2597 .py_publish_signal(py, "example", val2, 1_700_000_000_000_000_000)
2598 .unwrap();
2599
2600 assert!(python_method_was_called(&py_actor, py, "on_signal"));
2601 assert_eq!(python_method_call_count(&py_actor, py, "on_signal"), 2);
2602 });
2603 }
2604
2605 #[rstest]
2606 fn test_unsubscribe_signal_stops_python_dispatch(
2607 clock: Rc<RefCell<TestClock>>,
2608 cache: Rc<RefCell<Cache>>,
2609 trader_id: TraderId,
2610 ) {
2611 use crate::msgbus::{MessageBus, get_message_bus};
2612
2613 *get_message_bus().borrow_mut() = MessageBus::default();
2614
2615 pyo3::Python::initialize();
2616 Python::attach(|py| {
2617 let py_actor = create_tracking_python_actor(py).unwrap();
2618
2619 let mut rust_actor = PyDataActor::new(None);
2620 rust_actor.set_python_instance(py_actor.clone_ref(py));
2621 rust_actor.register(trader_id, clock, cache).unwrap();
2622 rust_actor.register_in_global_registries();
2623 rust_actor.py_start().unwrap();
2624
2625 rust_actor.py_subscribe_signal("example");
2626 let val1: Py<PyAny> = "1".into_py_any_unwrap(py);
2627 let val2: Py<PyAny> = "2".into_py_any_unwrap(py);
2628 rust_actor
2629 .py_publish_signal(py, "example", val1, 0)
2630 .unwrap();
2631
2632 rust_actor.py_unsubscribe_signal("example");
2633 rust_actor
2634 .py_publish_signal(py, "example", val2, 0)
2635 .unwrap();
2636
2637 assert_eq!(python_method_call_count(&py_actor, py, "on_signal"), 1);
2638 });
2639 }
2640
2641 #[rstest]
2642 fn test_subscribe_signal_wildcard_dispatches_all_names_to_python(
2643 clock: Rc<RefCell<TestClock>>,
2644 cache: Rc<RefCell<Cache>>,
2645 trader_id: TraderId,
2646 ) {
2647 use crate::msgbus::{MessageBus, get_message_bus};
2648
2649 *get_message_bus().borrow_mut() = MessageBus::default();
2650
2651 pyo3::Python::initialize();
2652 Python::attach(|py| {
2653 let py_actor = create_tracking_python_actor(py).unwrap();
2654
2655 let mut rust_actor = PyDataActor::new(None);
2656 rust_actor.set_python_instance(py_actor.clone_ref(py));
2657 rust_actor.register(trader_id, clock, cache).unwrap();
2658 rust_actor.register_in_global_registries();
2659 rust_actor.py_start().unwrap();
2660
2661 rust_actor.py_subscribe_signal("");
2662 let val1: Py<PyAny> = "1".into_py_any_unwrap(py);
2663 let val2: Py<PyAny> = "2".into_py_any_unwrap(py);
2664 let val3: Py<PyAny> = "3".into_py_any_unwrap(py);
2665 rust_actor.py_publish_signal(py, "alpha", val1, 0).unwrap();
2666 rust_actor.py_publish_signal(py, "beta", val2, 0).unwrap();
2667 rust_actor.py_publish_signal(py, "gamma", val3, 0).unwrap();
2668
2669 assert_eq!(python_method_call_count(&py_actor, py, "on_signal"), 3);
2670 });
2671 }
2672
2673 #[rstest]
2674 fn test_signal_customdata_unwraps_to_python_signal(
2675 clock: Rc<RefCell<TestClock>>,
2676 cache: Rc<RefCell<Cache>>,
2677 trader_id: TraderId,
2678 ) {
2679 use crate::msgbus::{MessageBus, get_message_bus};
2685
2686 *get_message_bus().borrow_mut() = MessageBus::default();
2687
2688 pyo3::Python::initialize();
2689 Python::attach(|py| {
2690 let capture_code = c_str!(
2691 r#"
2692class CapturingActor:
2693 def __init__(self):
2694 self.captured = []
2695
2696 def on_start(self): pass
2697 def on_stop(self): pass
2698 def on_resume(self): pass
2699 def on_reset(self): pass
2700 def on_dispose(self): pass
2701 def on_degrade(self): pass
2702 def on_fault(self): pass
2703 def on_signal(self, signal): pass
2704
2705 def on_data(self, custom):
2706 # Exercise the CustomData.data getter: raises TypeError if the
2707 # inner payload cannot be converted back to a Python object.
2708 inner = custom.data
2709 self.captured.append((type(inner).__name__, inner.name, inner.value))
2710"#
2711 );
2712 py.run(capture_code, None, None).unwrap();
2713 let cls = py.eval(c_str!("CapturingActor"), None, None).unwrap();
2714 let py_actor: Py<PyAny> = cls.call0().unwrap().unbind();
2715
2716 let mut rust_actor = PyDataActor::new(None);
2717 rust_actor.set_python_instance(py_actor.clone_ref(py));
2718 rust_actor.register(trader_id, clock, cache).unwrap();
2719 rust_actor.register_in_global_registries();
2720 rust_actor.py_start().unwrap();
2721
2722 let data_type = DataType::new("SignalExample", None, None);
2725 rust_actor
2726 .py_subscribe_data(py, data_type, None, None)
2727 .unwrap();
2728
2729 let val: Py<PyAny> = "1.5".into_py_any_unwrap(py);
2730 rust_actor.py_publish_signal(py, "example", val, 0).unwrap();
2731
2732 let captured = py_actor
2733 .bind(py)
2734 .getattr("captured")
2735 .unwrap()
2736 .extract::<Vec<(String, String, String)>>()
2737 .unwrap();
2738 assert_eq!(captured.len(), 1);
2739 assert_eq!(captured[0].0, "Signal");
2740 assert_eq!(captured[0].1, "example");
2741 assert_eq!(captured[0].2, "1.5");
2742 });
2743 }
2744
2745 #[rstest]
2746 fn test_add_and_update_synthetic_via_pyo3(
2747 clock: Rc<RefCell<TestClock>>,
2748 cache: Rc<RefCell<Cache>>,
2749 trader_id: TraderId,
2750 ) {
2751 use nautilus_model::{
2752 identifiers::{InstrumentId, Symbol},
2753 instruments::SyntheticInstrument,
2754 };
2755
2756 let actor = create_registered_actor(clock, cache.clone(), trader_id);
2757
2758 let comp1 = InstrumentId::from_str("BTC-USD.VENUE").unwrap();
2759 let comp2 = InstrumentId::from_str("ETH-USD.VENUE").unwrap();
2760 let formula = format!("({comp1} + {comp2}) / 2.0");
2761 let synthetic = SyntheticInstrument::new(
2762 Symbol::from("SYN"),
2763 2,
2764 vec![comp1, comp2],
2765 &formula,
2766 UnixNanos::default(),
2767 UnixNanos::default(),
2768 );
2769 let synthetic_id = synthetic.id;
2770
2771 actor.py_add_synthetic(synthetic.clone()).unwrap();
2772 assert!(cache.borrow().synthetic(&synthetic_id).is_some());
2773
2774 assert!(actor.py_add_synthetic(synthetic).is_err());
2776
2777 let new_formula = format!("{comp1} + {comp2}");
2778 let updated = SyntheticInstrument::new(
2779 Symbol::from("SYN"),
2780 2,
2781 vec![comp1, comp2],
2782 &new_formula,
2783 UnixNanos::default(),
2784 UnixNanos::default(),
2785 );
2786 actor.py_update_synthetic(updated).unwrap();
2787 assert_eq!(
2788 cache.borrow().synthetic(&synthetic_id).unwrap().formula,
2789 new_formula
2790 );
2791
2792 let missing = SyntheticInstrument::new(
2794 Symbol::from("GONE"),
2795 2,
2796 vec![comp1, comp2],
2797 &formula,
2798 UnixNanos::default(),
2799 UnixNanos::default(),
2800 );
2801 assert!(actor.py_update_synthetic(missing).is_err());
2802 }
2803
2804 #[rstest]
2805 fn test_book_at_interval_invalid_interval_ms(
2806 clock: Rc<RefCell<TestClock>>,
2807 cache: Rc<RefCell<Cache>>,
2808 trader_id: TraderId,
2809 audusd_sim: CurrencyPair,
2810 ) {
2811 pyo3::Python::initialize();
2812 let mut actor = create_registered_actor(clock, cache, trader_id);
2813
2814 pyo3::Python::attach(|py| {
2815 let result = actor.py_subscribe_book_at_interval(
2816 py,
2817 audusd_sim.id,
2818 BookType::L2_MBP,
2819 0,
2820 None,
2821 None,
2822 None,
2823 );
2824 assert!(result.is_err());
2825 assert_eq!(
2826 result.unwrap_err().to_string(),
2827 "ValueError: interval_ms must be > 0"
2828 );
2829
2830 let result = actor.py_unsubscribe_book_at_interval(py, audusd_sim.id, 0, None, None);
2831 assert!(result.is_err());
2832 assert_eq!(
2833 result.unwrap_err().to_string(),
2834 "ValueError: interval_ms must be > 0"
2835 );
2836 });
2837 }
2838
2839 #[rstest]
2840 fn test_request_methods_signatures_exist() {
2841 let actor = create_unregistered_actor();
2842 assert!(actor.trader_id().is_none());
2843 }
2844
2845 #[rstest]
2846 fn test_data_actor_trait_implementation(
2847 clock: Rc<RefCell<TestClock>>,
2848 cache: Rc<RefCell<Cache>>,
2849 trader_id: TraderId,
2850 ) {
2851 let actor = create_registered_actor(clock, cache, trader_id);
2852 let state = actor.state();
2853 assert_eq!(state, ComponentState::Ready);
2854 }
2855
2856 fn sample_instrument() -> CurrencyPair {
2857 audusd_sim()
2858 }
2859
2860 fn sample_data() -> CustomData {
2861 stub_custom_data(1, 42, None, None)
2862 }
2863
2864 fn sample_time_event() -> TimeEvent {
2865 TimeEvent::new(
2866 Ustr::from("test_timer"),
2867 UUID4::new(),
2868 UnixNanos::default(),
2869 UnixNanos::default(),
2870 )
2871 }
2872
2873 fn sample_signal() -> Signal {
2874 Signal::new(
2875 Ustr::from("test_signal"),
2876 "1.0".to_string(),
2877 UnixNanos::default(),
2878 UnixNanos::default(),
2879 )
2880 }
2881
2882 fn sample_quote() -> QuoteTick {
2883 let instrument = sample_instrument();
2884 QuoteTick::new(
2885 instrument.id,
2886 Price::from("1.00000"),
2887 Price::from("1.00001"),
2888 Quantity::from(100_000),
2889 Quantity::from(100_000),
2890 UnixNanos::default(),
2891 UnixNanos::default(),
2892 )
2893 }
2894
2895 fn sample_trade() -> TradeTick {
2896 let instrument = sample_instrument();
2897 TradeTick::new(
2898 instrument.id,
2899 Price::from("1.00000"),
2900 Quantity::from(100_000),
2901 AggressorSide::Buyer,
2902 TradeId::new("123456"),
2903 UnixNanos::default(),
2904 UnixNanos::default(),
2905 )
2906 }
2907
2908 fn sample_bar() -> Bar {
2909 let instrument = sample_instrument();
2910 let bar_type =
2911 BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", instrument.id)).unwrap();
2912 Bar::new(
2913 bar_type,
2914 Price::from("1.00000"),
2915 Price::from("1.00010"),
2916 Price::from("0.99990"),
2917 Price::from("1.00005"),
2918 Quantity::from(100_000),
2919 UnixNanos::default(),
2920 UnixNanos::default(),
2921 )
2922 }
2923
2924 fn sample_book() -> OrderBook {
2925 OrderBook::new(sample_instrument().id, BookType::L2_MBP)
2926 }
2927
2928 fn sample_book_deltas() -> OrderBookDeltas {
2929 let instrument = sample_instrument();
2930 let delta =
2931 OrderBookDelta::clear(instrument.id, 0, UnixNanos::default(), UnixNanos::default());
2932 OrderBookDeltas::new(instrument.id, vec![delta])
2933 }
2934
2935 fn sample_mark_price() -> MarkPriceUpdate {
2936 MarkPriceUpdate::new(
2937 sample_instrument().id,
2938 Price::from("1.00000"),
2939 UnixNanos::default(),
2940 UnixNanos::default(),
2941 )
2942 }
2943
2944 fn sample_index_price() -> IndexPriceUpdate {
2945 IndexPriceUpdate::new(
2946 sample_instrument().id,
2947 Price::from("1.00000"),
2948 UnixNanos::default(),
2949 UnixNanos::default(),
2950 )
2951 }
2952
2953 fn sample_funding_rate() -> FundingRateUpdate {
2954 FundingRateUpdate::new(
2955 sample_instrument().id,
2956 "0.0001".parse().unwrap(),
2957 None,
2958 None,
2959 UnixNanos::default(),
2960 UnixNanos::default(),
2961 )
2962 }
2963
2964 fn sample_instrument_status() -> InstrumentStatus {
2965 InstrumentStatus::new(
2966 sample_instrument().id,
2967 MarketStatusAction::Trading,
2968 UnixNanos::default(),
2969 UnixNanos::default(),
2970 None,
2971 None,
2972 None,
2973 None,
2974 None,
2975 )
2976 }
2977
2978 fn sample_instrument_close() -> InstrumentClose {
2979 InstrumentClose::new(
2980 sample_instrument().id,
2981 Price::from("1.00000"),
2982 InstrumentCloseType::EndOfSession,
2983 UnixNanos::default(),
2984 UnixNanos::default(),
2985 )
2986 }
2987
2988 fn sample_option_greeks() -> OptionGreeks {
2989 OptionGreeks {
2990 instrument_id: sample_instrument().id,
2991 convention: GreeksConvention::BlackScholes,
2992 greeks: OptionGreekValues {
2993 delta: 0.55,
2994 gamma: 0.03,
2995 vega: 0.12,
2996 theta: -0.05,
2997 rho: 0.01,
2998 },
2999 mark_iv: Some(0.25),
3000 bid_iv: None,
3001 ask_iv: None,
3002 underlying_price: None,
3003 open_interest: None,
3004 ts_event: UnixNanos::default(),
3005 ts_init: UnixNanos::default(),
3006 }
3007 }
3008
3009 fn sample_option_chain() -> OptionChainSlice {
3010 OptionChainSlice {
3011 series_id: OptionSeriesId::new(
3012 Venue::from("SIM"),
3013 Ustr::from("AUD"),
3014 Ustr::from("USD"),
3015 UnixNanos::from(1_711_036_800_000_000_000),
3016 ),
3017 atm_strike: None,
3018 calls: Default::default(),
3019 puts: Default::default(),
3020 ts_event: UnixNanos::default(),
3021 ts_init: UnixNanos::default(),
3022 }
3023 }
3024
3025 #[cfg(feature = "defi")]
3026 fn sample_block() -> Block {
3027 Block::new(
3028 "0x1234567890abcdef".to_string(),
3029 "0xabcdef1234567890".to_string(),
3030 12345,
3031 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
3032 21000,
3033 20000,
3034 UnixNanos::default(),
3035 Some(Blockchain::Ethereum),
3036 )
3037 }
3038
3039 #[cfg(feature = "defi")]
3040 fn sample_pool_components() -> (Arc<Chain>, Arc<Dex>, Pool) {
3041 let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
3042 let dex = Arc::new(Dex::new(
3043 Chain::new(Blockchain::Ethereum, 1),
3044 DexType::UniswapV3,
3045 "0x1F98431c8aD98523631AE4a59f267346ea31F984",
3046 0,
3047 AmmType::CLAMM,
3048 "PoolCreated",
3049 "Swap",
3050 "Mint",
3051 "Burn",
3052 "Collect",
3053 ));
3054 let token0 = Token::new(
3055 chain.clone(),
3056 "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
3057 .parse()
3058 .unwrap(),
3059 "USDC".into(),
3060 "USD Coin".into(),
3061 6,
3062 );
3063 let token1 = Token::new(
3064 chain.clone(),
3065 "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
3066 .parse()
3067 .unwrap(),
3068 "WETH".into(),
3069 "Wrapped Ether".into(),
3070 18,
3071 );
3072 let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
3073 .parse()
3074 .unwrap();
3075 let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
3076 .parse()
3077 .unwrap();
3078 let pool = Pool::new(
3079 chain.clone(),
3080 dex.clone(),
3081 pool_address,
3082 pool_identifier,
3083 12345,
3084 token0,
3085 token1,
3086 Some(500),
3087 Some(10),
3088 UnixNanos::default(),
3089 );
3090
3091 (chain, dex, pool)
3092 }
3093
3094 #[cfg(feature = "defi")]
3095 fn sample_pool_swap() -> PoolSwap {
3096 let (chain, dex, pool) = sample_pool_components();
3097 PoolSwap::new(
3098 chain,
3099 dex,
3100 pool.instrument_id,
3101 pool.pool_identifier,
3102 12345,
3103 "0xabc123".to_string(),
3104 0,
3105 0,
3106 None,
3107 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3108 .parse()
3109 .unwrap(),
3110 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3111 .parse()
3112 .unwrap(),
3113 I256::from_str("1000000000000000000").unwrap(),
3114 I256::from_str("400000000000000").unwrap(),
3115 U160::from(59000000000000u128),
3116 1000000,
3117 100,
3118 )
3119 }
3120
3121 #[cfg(feature = "defi")]
3122 fn sample_pool_liquidity_update() -> PoolLiquidityUpdate {
3123 let (chain, dex, pool) = sample_pool_components();
3124 PoolLiquidityUpdate::new(
3125 chain,
3126 dex,
3127 pool.instrument_id,
3128 pool.pool_identifier,
3129 PoolLiquidityUpdateType::Mint,
3130 12345,
3131 "0xabc123".to_string(),
3132 0,
3133 0,
3134 Some(
3135 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3136 .parse()
3137 .unwrap(),
3138 ),
3139 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3140 .parse()
3141 .unwrap(),
3142 1000,
3143 U256::from(1_000u64),
3144 U256::from(2_000u64),
3145 -10,
3146 10,
3147 Some(UnixNanos::default()),
3148 )
3149 }
3150
3151 #[cfg(feature = "defi")]
3152 fn sample_pool_fee_collect() -> PoolFeeCollect {
3153 let (chain, dex, pool) = sample_pool_components();
3154 PoolFeeCollect::new(
3155 chain,
3156 dex,
3157 pool.instrument_id,
3158 pool.pool_identifier,
3159 12345,
3160 "0xabc123".to_string(),
3161 0,
3162 0,
3163 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3164 .parse()
3165 .unwrap(),
3166 100,
3167 200,
3168 -10,
3169 10,
3170 Some(UnixNanos::default()),
3171 )
3172 }
3173
3174 #[cfg(feature = "defi")]
3175 fn sample_pool_flash() -> PoolFlash {
3176 let (chain, dex, pool) = sample_pool_components();
3177 PoolFlash::new(
3178 chain,
3179 dex,
3180 pool.instrument_id,
3181 pool.pool_identifier,
3182 12345,
3183 "0xabc123".to_string(),
3184 0,
3185 0,
3186 Some(UnixNanos::default()),
3187 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3188 .parse()
3189 .unwrap(),
3190 "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3191 .parse()
3192 .unwrap(),
3193 U256::from(100u64),
3194 U256::from(200u64),
3195 U256::from(101u64),
3196 U256::from(201u64),
3197 )
3198 }
3199
3200 const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
3201 r#"
3202class TrackingActor:
3203 """A mock Python actor that tracks all method calls."""
3204
3205 TRACKED_METHODS = {
3206 "on_start",
3207 "on_stop",
3208 "on_resume",
3209 "on_reset",
3210 "on_dispose",
3211 "on_degrade",
3212 "on_fault",
3213 "on_time_event",
3214 "on_data",
3215 "on_signal",
3216 "on_instrument",
3217 "on_quote",
3218 "on_trade",
3219 "on_bar",
3220 "on_book",
3221 "on_book_deltas",
3222 "on_mark_price",
3223 "on_index_price",
3224 "on_funding_rate",
3225 "on_instrument_status",
3226 "on_instrument_close",
3227 "on_option_greeks",
3228 "on_option_chain",
3229 "on_historical_data",
3230 "on_historical_quotes",
3231 "on_historical_trades",
3232 "on_historical_funding_rates",
3233 "on_historical_bars",
3234 "on_historical_mark_prices",
3235 "on_historical_index_prices",
3236 "on_block",
3237 "on_pool",
3238 "on_pool_swap",
3239 "on_pool_liquidity_update",
3240 "on_pool_fee_collect",
3241 "on_pool_flash",
3242 }
3243
3244 def __init__(self):
3245 self.calls = []
3246
3247 def _record(self, method_name, *args):
3248 self.calls.append((method_name, args))
3249
3250 def was_called(self, method_name):
3251 return any(call[0] == method_name for call in self.calls)
3252
3253 def call_count(self, method_name):
3254 return sum(1 for call in self.calls if call[0] == method_name)
3255
3256 def __getattr__(self, name):
3257 if name in self.TRACKED_METHODS:
3258 return lambda *args: self._record(name, *args)
3259 raise AttributeError(name)
3260"#
3261 );
3262
3263 fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
3264 py.run(TRACKING_ACTOR_CODE, None, None)?;
3265 let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
3266 let instance = tracking_actor_class.call0()?;
3267 Ok(instance.unbind())
3268 }
3269
3270 fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
3271 py_actor
3272 .call_method1(py, "was_called", (method_name,))
3273 .and_then(|r| r.extract::<bool>(py))
3274 .unwrap_or(false)
3275 }
3276
3277 fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
3278 py_actor
3279 .call_method1(py, "call_count", (method_name,))
3280 .and_then(|r| r.extract::<i32>(py))
3281 .unwrap_or(0)
3282 }
3283
3284 fn assert_python_dispatch<F>(
3285 py: Python<'_>,
3286 clock: Rc<RefCell<TestClock>>,
3287 cache: Rc<RefCell<Cache>>,
3288 trader_id: TraderId,
3289 method_name: &str,
3290 invoke: F,
3291 ) where
3292 F: FnOnce(&mut PyDataActor) -> anyhow::Result<()>,
3293 {
3294 let py_actor = create_tracking_python_actor(py).unwrap();
3295
3296 let mut rust_actor = PyDataActor::new(None);
3297 rust_actor.set_python_instance(py_actor.clone_ref(py));
3298 rust_actor.register(trader_id, clock, cache).unwrap();
3299
3300 let result = invoke(&mut rust_actor);
3301
3302 assert!(result.is_ok());
3303 assert!(python_method_was_called(&py_actor, py, method_name));
3304 assert_eq!(python_method_call_count(&py_actor, py, method_name), 1);
3305 }
3306
3307 #[rstest]
3308 #[case("on_start")]
3309 #[case("on_stop")]
3310 #[case("on_resume")]
3311 #[case("on_reset")]
3312 #[case("on_dispose")]
3313 #[case("on_degrade")]
3314 #[case("on_fault")]
3315 fn test_python_dispatch_lifecycle_matrix(
3316 clock: Rc<RefCell<TestClock>>,
3317 cache: Rc<RefCell<Cache>>,
3318 trader_id: TraderId,
3319 #[case] method_name: &str,
3320 ) {
3321 pyo3::Python::initialize();
3322 Python::attach(|py| {
3323 assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3324 match method_name {
3325 "on_start" => DataActor::on_start(rust_actor.inner_mut()),
3326 "on_stop" => DataActor::on_stop(rust_actor.inner_mut()),
3327 "on_resume" => DataActor::on_resume(rust_actor.inner_mut()),
3328 "on_reset" => DataActor::on_reset(rust_actor.inner_mut()),
3329 "on_dispose" => DataActor::on_dispose(rust_actor.inner_mut()),
3330 "on_degrade" => DataActor::on_degrade(rust_actor.inner_mut()),
3331 "on_fault" => DataActor::on_fault(rust_actor.inner_mut()),
3332 _ => unreachable!("unhandled lifecycle case: {method_name}"),
3333 }
3334 });
3335 });
3336 }
3337
3338 #[rstest]
3339 #[case("on_time_event")]
3340 #[case("on_data")]
3341 #[case("on_signal")]
3342 #[case("on_instrument")]
3343 #[case("on_quote")]
3344 #[case("on_trade")]
3345 #[case("on_bar")]
3346 #[case("on_book")]
3347 #[case("on_book_deltas")]
3348 #[case("on_mark_price")]
3349 #[case("on_index_price")]
3350 #[case("on_funding_rate")]
3351 #[case("on_instrument_status")]
3352 #[case("on_instrument_close")]
3353 #[case("on_option_greeks")]
3354 #[case("on_option_chain")]
3355 fn test_python_dispatch_typed_callback_matrix(
3356 clock: Rc<RefCell<TestClock>>,
3357 cache: Rc<RefCell<Cache>>,
3358 trader_id: TraderId,
3359 #[case] method_name: &str,
3360 ) {
3361 pyo3::Python::initialize();
3362 Python::attach(|py| {
3363 assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3364 match method_name {
3365 "on_time_event" => {
3366 let event = sample_time_event();
3367 rust_actor.inner_mut().on_time_event(&event)
3368 }
3369 "on_data" => {
3370 let data = sample_data();
3371 rust_actor.inner_mut().on_data(&data)
3372 }
3373 "on_signal" => {
3374 let signal = sample_signal();
3375 rust_actor.inner_mut().on_signal(&signal)
3376 }
3377 "on_instrument" => {
3378 let instrument = InstrumentAny::CurrencyPair(sample_instrument());
3379 rust_actor.inner_mut().on_instrument(&instrument)
3380 }
3381 "on_quote" => {
3382 let quote = sample_quote();
3383 rust_actor.inner_mut().on_quote("e)
3384 }
3385 "on_trade" => {
3386 let trade = sample_trade();
3387 rust_actor.inner_mut().on_trade(&trade)
3388 }
3389 "on_bar" => {
3390 let bar = sample_bar();
3391 rust_actor.inner_mut().on_bar(&bar)
3392 }
3393 "on_book" => {
3394 let book = sample_book();
3395 rust_actor.inner_mut().on_book(&book)
3396 }
3397 "on_book_deltas" => {
3398 let deltas = sample_book_deltas();
3399 rust_actor.inner_mut().on_book_deltas(&deltas)
3400 }
3401 "on_mark_price" => {
3402 let update = sample_mark_price();
3403 rust_actor.inner_mut().on_mark_price(&update)
3404 }
3405 "on_index_price" => {
3406 let update = sample_index_price();
3407 rust_actor.inner_mut().on_index_price(&update)
3408 }
3409 "on_funding_rate" => {
3410 let update = sample_funding_rate();
3411 rust_actor.inner_mut().on_funding_rate(&update)
3412 }
3413 "on_instrument_status" => {
3414 let status = sample_instrument_status();
3415 rust_actor.inner_mut().on_instrument_status(&status)
3416 }
3417 "on_instrument_close" => {
3418 let close = sample_instrument_close();
3419 rust_actor.inner_mut().on_instrument_close(&close)
3420 }
3421 "on_option_greeks" => {
3422 let greeks = sample_option_greeks();
3423 rust_actor.inner_mut().on_option_greeks(&greeks)
3424 }
3425 "on_option_chain" => {
3426 let chain = sample_option_chain();
3427 rust_actor.inner_mut().on_option_chain(&chain)
3428 }
3429 _ => unreachable!("unhandled typed callback case: {method_name}"),
3430 }
3431 });
3432 });
3433 }
3434
3435 #[rstest]
3436 #[case("on_historical_data")]
3437 #[case("on_historical_quotes")]
3438 #[case("on_historical_trades")]
3439 #[case("on_historical_funding_rates")]
3440 #[case("on_historical_bars")]
3441 #[case("on_historical_mark_prices")]
3442 #[case("on_historical_index_prices")]
3443 fn test_python_dispatch_historical_callback_matrix(
3444 clock: Rc<RefCell<TestClock>>,
3445 cache: Rc<RefCell<Cache>>,
3446 trader_id: TraderId,
3447 #[case] method_name: &str,
3448 ) {
3449 pyo3::Python::initialize();
3450 Python::attach(|py| {
3451 assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3452 match method_name {
3453 "on_historical_data" => {
3454 let data = sample_data();
3455 rust_actor.inner_mut().on_historical_data(&data)
3456 }
3457 "on_historical_quotes" => {
3458 let quotes = vec![sample_quote()];
3459 rust_actor.inner_mut().on_historical_quotes("es)
3460 }
3461 "on_historical_trades" => {
3462 let trades = vec![sample_trade()];
3463 rust_actor.inner_mut().on_historical_trades(&trades)
3464 }
3465 "on_historical_funding_rates" => {
3466 let funding_rates = vec![sample_funding_rate()];
3467 rust_actor
3468 .inner_mut()
3469 .on_historical_funding_rates(&funding_rates)
3470 }
3471 "on_historical_bars" => {
3472 let bars = vec![sample_bar()];
3473 rust_actor.inner_mut().on_historical_bars(&bars)
3474 }
3475 "on_historical_mark_prices" => {
3476 let mark_prices = vec![sample_mark_price()];
3477 rust_actor
3478 .inner_mut()
3479 .on_historical_mark_prices(&mark_prices)
3480 }
3481 "on_historical_index_prices" => {
3482 let index_prices = vec![sample_index_price()];
3483 rust_actor
3484 .inner_mut()
3485 .on_historical_index_prices(&index_prices)
3486 }
3487 _ => unreachable!("unhandled historical callback case: {method_name}"),
3488 }
3489 });
3490 });
3491 }
3492
3493 #[cfg(feature = "defi")]
3494 #[rstest]
3495 #[case("on_block")]
3496 #[case("on_pool")]
3497 #[case("on_pool_swap")]
3498 #[case("on_pool_liquidity_update")]
3499 #[case("on_pool_fee_collect")]
3500 #[case("on_pool_flash")]
3501 fn test_python_dispatch_defi_callback_matrix(
3502 clock: Rc<RefCell<TestClock>>,
3503 cache: Rc<RefCell<Cache>>,
3504 trader_id: TraderId,
3505 #[case] method_name: &str,
3506 ) {
3507 pyo3::Python::initialize();
3508 Python::attach(|py| {
3509 assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3510 match method_name {
3511 "on_block" => {
3512 let block = sample_block();
3513 rust_actor.inner_mut().on_block(&block)
3514 }
3515 "on_pool" => {
3516 let (_chain, _dex, pool) = sample_pool_components();
3517 rust_actor.inner_mut().on_pool(&pool)
3518 }
3519 "on_pool_swap" => {
3520 let swap = sample_pool_swap();
3521 rust_actor.inner_mut().on_pool_swap(&swap)
3522 }
3523 "on_pool_liquidity_update" => {
3524 let update = sample_pool_liquidity_update();
3525 rust_actor.inner_mut().on_pool_liquidity_update(&update)
3526 }
3527 "on_pool_fee_collect" => {
3528 let collect = sample_pool_fee_collect();
3529 rust_actor.inner_mut().on_pool_fee_collect(&collect)
3530 }
3531 "on_pool_flash" => {
3532 let flash = sample_pool_flash();
3533 rust_actor.inner_mut().on_pool_flash(&flash)
3534 }
3535 _ => unreachable!("unhandled defi callback case: {method_name}"),
3536 }
3537 });
3538 });
3539 }
3540
3541 #[rstest]
3542 fn test_python_dispatch_multiple_calls_tracked(
3543 clock: Rc<RefCell<TestClock>>,
3544 cache: Rc<RefCell<Cache>>,
3545 trader_id: TraderId,
3546 audusd_sim: CurrencyPair,
3547 ) {
3548 pyo3::Python::initialize();
3549 Python::attach(|py| {
3550 let py_actor = create_tracking_python_actor(py).unwrap();
3551
3552 let mut rust_actor = PyDataActor::new(None);
3553 rust_actor.set_python_instance(py_actor.clone_ref(py));
3554 rust_actor.register(trader_id, clock, cache).unwrap();
3555
3556 let quote = QuoteTick::new(
3557 audusd_sim.id,
3558 Price::from("1.00000"),
3559 Price::from("1.00001"),
3560 Quantity::from(100_000),
3561 Quantity::from(100_000),
3562 UnixNanos::default(),
3563 UnixNanos::default(),
3564 );
3565
3566 rust_actor.inner_mut().on_quote("e).unwrap();
3567 rust_actor.inner_mut().on_quote("e).unwrap();
3568 rust_actor.inner_mut().on_quote("e).unwrap();
3569
3570 assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3571 });
3572 }
3573
3574 #[rstest]
3575 fn test_python_dispatch_no_call_when_py_self_not_set(
3576 clock: Rc<RefCell<TestClock>>,
3577 cache: Rc<RefCell<Cache>>,
3578 trader_id: TraderId,
3579 ) {
3580 pyo3::Python::initialize();
3581 Python::attach(|_py| {
3582 let mut rust_actor = PyDataActor::new(None);
3583 rust_actor.register(trader_id, clock, cache).unwrap();
3584
3585 let result = DataActor::on_start(rust_actor.inner_mut());
3587 assert!(result.is_ok());
3588 });
3589 }
3590
3591 #[rstest]
3592 fn test_python_on_historical_data_rejects_non_custom_data(
3593 clock: Rc<RefCell<TestClock>>,
3594 cache: Rc<RefCell<Cache>>,
3595 trader_id: TraderId,
3596 ) {
3597 pyo3::Python::initialize();
3598 let mut rust_actor = PyDataActor::new(None);
3599 rust_actor.register(trader_id, clock, cache).unwrap();
3600
3601 let non_custom: String = "not CustomData".to_string();
3602 let result = rust_actor.inner_mut().on_historical_data(&non_custom);
3603
3604 assert!(result.is_err());
3605 assert!(result.unwrap_err().to_string().contains("unsupported type"));
3606 }
3607}