Skip to main content

nautilus_common/python/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
5//  You may not use this file except in compliance with the License.
6//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
7//  Unless required by applicable law or agreed to in writing, software
8//  distributed under the License is distributed on an "AS IS" BASIS,
9//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10//  See the License for the specific language governing permissions and
11//  limitations under the License.
12// -------------------------------------------------------------------------------------------------
13
14//! Python bindings for DataActor with complete command and event handler forwarding.
15
16use std::{
17    any::Any,
18    cell::{RefCell, UnsafeCell},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24};
25
26use nautilus_core::{
27    from_pydict,
28    nanos::UnixNanos,
29    python::{IntoPyObjectNautilusExt, to_pyruntime_err, to_pyvalue_err},
30};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolFeeCollect, PoolFlash, PoolLiquidityUpdate, PoolSwap,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick,
39        close::InstrumentClose,
40        option_chain::{OptionChainSlice, OptionGreeks},
41    },
42    enums::BookType,
43    identifiers::{ActorId, ClientId, InstrumentId, OptionSeriesId, TraderId, Venue},
44    instruments::{InstrumentAny, SyntheticInstrument},
45    orderbook::OrderBook,
46    python::{data::option_chain::PyStrikeRange, instruments::instrument_any_to_pyobject},
47};
48use pyo3::{prelude::*, types::PyDict};
49
50use crate::{
51    actor::{
52        Actor, DataActor,
53        data_actor::{DataActorConfig, DataActorCore, ImportableActorConfig},
54        registry::{get_actor_registry, try_get_actor_unchecked},
55    },
56    cache::Cache,
57    clock::Clock,
58    component::{Component, get_component_registry},
59    enums::ComponentState,
60    python::{cache::PyCache, clock::PyClock, logging::PyLogger},
61    signal::Signal,
62    timer::{TimeEvent, TimeEventCallback},
63};
64
65#[pyo3::pymethods]
66#[pyo3_stub_gen::derive::gen_stub_pymethods]
67impl DataActorConfig {
68    /// Common configuration for `DataActor` based components.
69    #[new]
70    #[pyo3(signature = (actor_id=None, log_events=true, log_commands=true))]
71    fn py_new(actor_id: Option<ActorId>, log_events: bool, log_commands: bool) -> Self {
72        Self {
73            actor_id,
74            log_events,
75            log_commands,
76        }
77    }
78}
79
80#[pyo3::pymethods]
81#[pyo3_stub_gen::derive::gen_stub_pymethods]
82impl ImportableActorConfig {
83    /// Configuration for creating actors from importable paths.
84    #[new]
85    #[expect(clippy::needless_pass_by_value)]
86    fn py_new(actor_path: String, config_path: String, config: Py<PyDict>) -> PyResult<Self> {
87        let json_config = Python::attach(|py| -> PyResult<HashMap<String, serde_json::Value>> {
88            let kwargs = PyDict::new(py);
89            kwargs.set_item("default", py.eval(pyo3::ffi::c_str!("str"), None, None)?)?;
90            let json_str: String = PyModule::import(py, "json")?
91                .call_method("dumps", (config.bind(py),), Some(&kwargs))?
92                .extract()?;
93
94            let json_value: serde_json::Value =
95                serde_json::from_str(&json_str).map_err(to_pyvalue_err)?;
96
97            if let serde_json::Value::Object(map) = json_value {
98                Ok(map.into_iter().collect())
99            } else {
100                Err(to_pyvalue_err("Config must be a dictionary"))
101            }
102        })?;
103
104        Ok(Self {
105            actor_path,
106            config_path,
107            config: json_config,
108        })
109    }
110
111    #[getter]
112    fn actor_path(&self) -> &String {
113        &self.actor_path
114    }
115
116    #[getter]
117    fn config_path(&self) -> &String {
118        &self.config_path
119    }
120
121    #[getter]
122    fn config(&self, py: Python<'_>) -> PyResult<Py<PyDict>> {
123        // Convert HashMap<String, serde_json::Value> back to Python dict
124        let py_dict = PyDict::new(py);
125
126        for (key, value) in &self.config {
127            // Convert serde_json::Value back to Python object via JSON
128            let json_str = serde_json::to_string(value).map_err(to_pyvalue_err)?;
129            let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
130            py_dict.set_item(key, py_value)?;
131        }
132        Ok(py_dict.unbind())
133    }
134}
135
136/// Inner state of PyDataActor, shared between Python wrapper and Rust registries.
137///
138/// This type holds the actual actor state and implements all the actor traits.
139/// It is wrapped in `Rc<UnsafeCell<>>` to allow shared ownership between Python
140/// and the global registries without copying.
141pub struct PyDataActorInner {
142    core: DataActorCore,
143    py_self: Option<Py<PyAny>>,
144    clock: PyClock,
145    logger: PyLogger,
146}
147
148impl Debug for PyDataActorInner {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct(stringify!(PyDataActorInner))
151            .field("core", &self.core)
152            .field("py_self", &self.py_self.as_ref().map(|_| "<Py<PyAny>>"))
153            .field("clock", &self.clock)
154            .field("logger", &self.logger)
155            .finish()
156    }
157}
158
159impl Deref for PyDataActorInner {
160    type Target = DataActorCore;
161
162    fn deref(&self) -> &Self::Target {
163        &self.core
164    }
165}
166
167impl DerefMut for PyDataActorInner {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        &mut self.core
170    }
171}
172
173#[expect(clippy::needless_pass_by_ref_mut)]
174impl PyDataActorInner {
175    fn dispatch_on_start(&self) -> PyResult<()> {
176        if let Some(ref py_self) = self.py_self {
177            Python::attach(|py| py_self.call_method0(py, "on_start"))?;
178        }
179        Ok(())
180    }
181
182    fn dispatch_on_stop(&mut self) -> PyResult<()> {
183        if let Some(ref py_self) = self.py_self {
184            Python::attach(|py| py_self.call_method0(py, "on_stop"))?;
185        }
186        Ok(())
187    }
188
189    fn dispatch_on_resume(&mut self) -> PyResult<()> {
190        if let Some(ref py_self) = self.py_self {
191            Python::attach(|py| py_self.call_method0(py, "on_resume"))?;
192        }
193        Ok(())
194    }
195
196    fn dispatch_on_reset(&mut self) -> PyResult<()> {
197        if let Some(ref py_self) = self.py_self {
198            Python::attach(|py| py_self.call_method0(py, "on_reset"))?;
199        }
200        Ok(())
201    }
202
203    fn dispatch_on_dispose(&mut self) -> PyResult<()> {
204        if let Some(ref py_self) = self.py_self {
205            Python::attach(|py| py_self.call_method0(py, "on_dispose"))?;
206        }
207        Ok(())
208    }
209
210    fn dispatch_on_degrade(&mut self) -> PyResult<()> {
211        if let Some(ref py_self) = self.py_self {
212            Python::attach(|py| py_self.call_method0(py, "on_degrade"))?;
213        }
214        Ok(())
215    }
216
217    fn dispatch_on_fault(&mut self) -> PyResult<()> {
218        if let Some(ref py_self) = self.py_self {
219            Python::attach(|py| py_self.call_method0(py, "on_fault"))?;
220        }
221        Ok(())
222    }
223
224    fn dispatch_on_time_event(&mut self, event: TimeEvent) -> PyResult<()> {
225        if let Some(ref py_self) = self.py_self {
226            Python::attach(|py| {
227                py_self.call_method1(py, "on_time_event", (event.into_py_any_unwrap(py),))
228            })?;
229        }
230        Ok(())
231    }
232
233    fn dispatch_on_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
234        if let Some(ref py_self) = self.py_self {
235            Python::attach(|py| py_self.call_method1(py, "on_data", (data,)))?;
236        }
237        Ok(())
238    }
239
240    fn dispatch_on_signal(&mut self, signal: &Signal) -> PyResult<()> {
241        if let Some(ref py_self) = self.py_self {
242            Python::attach(|py| {
243                py_self.call_method1(py, "on_signal", (signal.clone().into_py_any_unwrap(py),))
244            })?;
245        }
246        Ok(())
247    }
248
249    fn dispatch_on_instrument(&mut self, instrument: Py<PyAny>) -> PyResult<()> {
250        if let Some(ref py_self) = self.py_self {
251            Python::attach(|py| py_self.call_method1(py, "on_instrument", (instrument,)))?;
252        }
253        Ok(())
254    }
255
256    fn dispatch_on_quote(&mut self, quote: QuoteTick) -> PyResult<()> {
257        if let Some(ref py_self) = self.py_self {
258            Python::attach(|py| {
259                py_self.call_method1(py, "on_quote", (quote.into_py_any_unwrap(py),))
260            })?;
261        }
262        Ok(())
263    }
264
265    fn dispatch_on_trade(&mut self, trade: TradeTick) -> PyResult<()> {
266        if let Some(ref py_self) = self.py_self {
267            Python::attach(|py| {
268                py_self.call_method1(py, "on_trade", (trade.into_py_any_unwrap(py),))
269            })?;
270        }
271        Ok(())
272    }
273
274    fn dispatch_on_bar(&mut self, bar: Bar) -> PyResult<()> {
275        if let Some(ref py_self) = self.py_self {
276            Python::attach(|py| py_self.call_method1(py, "on_bar", (bar.into_py_any_unwrap(py),)))?;
277        }
278        Ok(())
279    }
280
281    fn dispatch_on_book_deltas(&mut self, deltas: OrderBookDeltas) -> PyResult<()> {
282        if let Some(ref py_self) = self.py_self {
283            Python::attach(|py| {
284                py_self.call_method1(py, "on_book_deltas", (deltas.into_py_any_unwrap(py),))
285            })?;
286        }
287        Ok(())
288    }
289
290    fn dispatch_on_book(&mut self, book: &OrderBook) -> PyResult<()> {
291        if let Some(ref py_self) = self.py_self {
292            Python::attach(|py| {
293                py_self.call_method1(py, "on_book", (book.clone().into_py_any_unwrap(py),))
294            })?;
295        }
296        Ok(())
297    }
298
299    fn dispatch_on_mark_price(&mut self, mark_price: MarkPriceUpdate) -> PyResult<()> {
300        if let Some(ref py_self) = self.py_self {
301            Python::attach(|py| {
302                py_self.call_method1(py, "on_mark_price", (mark_price.into_py_any_unwrap(py),))
303            })?;
304        }
305        Ok(())
306    }
307
308    fn dispatch_on_index_price(&mut self, index_price: IndexPriceUpdate) -> PyResult<()> {
309        if let Some(ref py_self) = self.py_self {
310            Python::attach(|py| {
311                py_self.call_method1(py, "on_index_price", (index_price.into_py_any_unwrap(py),))
312            })?;
313        }
314        Ok(())
315    }
316
317    fn dispatch_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) -> PyResult<()> {
318        if let Some(ref py_self) = self.py_self {
319            Python::attach(|py| {
320                py_self.call_method1(
321                    py,
322                    "on_funding_rate",
323                    (funding_rate.into_py_any_unwrap(py),),
324                )
325            })?;
326        }
327        Ok(())
328    }
329
330    fn dispatch_on_instrument_status(&mut self, data: InstrumentStatus) -> PyResult<()> {
331        if let Some(ref py_self) = self.py_self {
332            Python::attach(|py| {
333                py_self.call_method1(py, "on_instrument_status", (data.into_py_any_unwrap(py),))
334            })?;
335        }
336        Ok(())
337    }
338
339    fn dispatch_on_instrument_close(&mut self, update: InstrumentClose) -> PyResult<()> {
340        if let Some(ref py_self) = self.py_self {
341            Python::attach(|py| {
342                py_self.call_method1(py, "on_instrument_close", (update.into_py_any_unwrap(py),))
343            })?;
344        }
345        Ok(())
346    }
347
348    fn dispatch_on_option_greeks(&mut self, greeks: OptionGreeks) -> PyResult<()> {
349        if let Some(ref py_self) = self.py_self {
350            Python::attach(|py| {
351                py_self.call_method1(py, "on_option_greeks", (greeks.into_py_any_unwrap(py),))
352            })?;
353        }
354        Ok(())
355    }
356
357    fn dispatch_on_option_chain(&mut self, slice: OptionChainSlice) -> PyResult<()> {
358        if let Some(ref py_self) = self.py_self {
359            Python::attach(|py| {
360                py_self.call_method1(py, "on_option_chain", (slice.into_py_any_unwrap(py),))
361            })?;
362        }
363        Ok(())
364    }
365
366    fn dispatch_on_historical_data(&mut self, data: Py<PyAny>) -> PyResult<()> {
367        if let Some(ref py_self) = self.py_self {
368            Python::attach(|py| py_self.call_method1(py, "on_historical_data", (data,)))?;
369        }
370        Ok(())
371    }
372
373    fn dispatch_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) -> PyResult<()> {
374        if let Some(ref py_self) = self.py_self {
375            Python::attach(|py| {
376                let py_quotes: Vec<_> = quotes
377                    .into_iter()
378                    .map(|q| q.into_py_any_unwrap(py))
379                    .collect();
380                py_self.call_method1(py, "on_historical_quotes", (py_quotes,))
381            })?;
382        }
383        Ok(())
384    }
385
386    fn dispatch_on_historical_trades(&mut self, trades: Vec<TradeTick>) -> PyResult<()> {
387        if let Some(ref py_self) = self.py_self {
388            Python::attach(|py| {
389                let py_trades: Vec<_> = trades
390                    .into_iter()
391                    .map(|t| t.into_py_any_unwrap(py))
392                    .collect();
393                py_self.call_method1(py, "on_historical_trades", (py_trades,))
394            })?;
395        }
396        Ok(())
397    }
398
399    fn dispatch_on_historical_funding_rates(
400        &mut self,
401        funding_rates: Vec<FundingRateUpdate>,
402    ) -> PyResult<()> {
403        if let Some(ref py_self) = self.py_self {
404            Python::attach(|py| {
405                let py_rates: Vec<_> = funding_rates
406                    .into_iter()
407                    .map(|r| r.into_py_any_unwrap(py))
408                    .collect();
409                py_self.call_method1(py, "on_historical_funding_rates", (py_rates,))
410            })?;
411        }
412        Ok(())
413    }
414
415    fn dispatch_on_historical_bars(&mut self, bars: Vec<Bar>) -> PyResult<()> {
416        if let Some(ref py_self) = self.py_self {
417            Python::attach(|py| {
418                let py_bars: Vec<_> = bars.into_iter().map(|b| b.into_py_any_unwrap(py)).collect();
419                py_self.call_method1(py, "on_historical_bars", (py_bars,))
420            })?;
421        }
422        Ok(())
423    }
424
425    fn dispatch_on_historical_mark_prices(
426        &mut self,
427        mark_prices: Vec<MarkPriceUpdate>,
428    ) -> PyResult<()> {
429        if let Some(ref py_self) = self.py_self {
430            Python::attach(|py| {
431                let py_prices: Vec<_> = mark_prices
432                    .into_iter()
433                    .map(|p| p.into_py_any_unwrap(py))
434                    .collect();
435                py_self.call_method1(py, "on_historical_mark_prices", (py_prices,))
436            })?;
437        }
438        Ok(())
439    }
440
441    fn dispatch_on_historical_index_prices(
442        &mut self,
443        index_prices: Vec<IndexPriceUpdate>,
444    ) -> PyResult<()> {
445        if let Some(ref py_self) = self.py_self {
446            Python::attach(|py| {
447                let py_prices: Vec<_> = index_prices
448                    .into_iter()
449                    .map(|p| p.into_py_any_unwrap(py))
450                    .collect();
451                py_self.call_method1(py, "on_historical_index_prices", (py_prices,))
452            })?;
453        }
454        Ok(())
455    }
456
457    #[cfg(feature = "defi")]
458    fn dispatch_on_block(&mut self, block: Block) -> PyResult<()> {
459        if let Some(ref py_self) = self.py_self {
460            Python::attach(|py| {
461                py_self.call_method1(py, "on_block", (block.into_py_any_unwrap(py),))
462            })?;
463        }
464        Ok(())
465    }
466
467    #[cfg(feature = "defi")]
468    fn dispatch_on_pool(&mut self, pool: Pool) -> PyResult<()> {
469        if let Some(ref py_self) = self.py_self {
470            Python::attach(|py| {
471                py_self.call_method1(py, "on_pool", (pool.into_py_any_unwrap(py),))
472            })?;
473        }
474        Ok(())
475    }
476
477    #[cfg(feature = "defi")]
478    fn dispatch_on_pool_swap(&mut self, swap: PoolSwap) -> PyResult<()> {
479        if let Some(ref py_self) = self.py_self {
480            Python::attach(|py| {
481                py_self.call_method1(py, "on_pool_swap", (swap.into_py_any_unwrap(py),))
482            })?;
483        }
484        Ok(())
485    }
486
487    #[cfg(feature = "defi")]
488    fn dispatch_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) -> PyResult<()> {
489        if let Some(ref py_self) = self.py_self {
490            Python::attach(|py| {
491                py_self.call_method1(
492                    py,
493                    "on_pool_liquidity_update",
494                    (update.into_py_any_unwrap(py),),
495                )
496            })?;
497        }
498        Ok(())
499    }
500
501    #[cfg(feature = "defi")]
502    fn dispatch_on_pool_fee_collect(&mut self, collect: PoolFeeCollect) -> PyResult<()> {
503        if let Some(ref py_self) = self.py_self {
504            Python::attach(|py| {
505                py_self.call_method1(py, "on_pool_fee_collect", (collect.into_py_any_unwrap(py),))
506            })?;
507        }
508        Ok(())
509    }
510
511    #[cfg(feature = "defi")]
512    fn dispatch_on_pool_flash(&mut self, flash: PoolFlash) -> PyResult<()> {
513        if let Some(ref py_self) = self.py_self {
514            Python::attach(|py| {
515                py_self.call_method1(py, "on_pool_flash", (flash.into_py_any_unwrap(py),))
516            })?;
517        }
518        Ok(())
519    }
520}
521
522fn dict_to_params(
523    py: Python<'_>,
524    params: Option<Py<PyDict>>,
525) -> PyResult<Option<nautilus_core::Params>> {
526    match params {
527        Some(dict) => from_pydict(py, dict),
528        None => Ok(None),
529    }
530}
531
532/// Python-facing wrapper for DataActor.
533///
534/// This wrapper holds shared ownership of `PyDataActorInner` via `Rc<UnsafeCell<>>`.
535/// Both Python (through this wrapper) and the global registries share the same
536/// underlying actor instance, ensuring mutations are visible from both sides.
537#[allow(non_camel_case_types)]
538#[pyo3::pyclass(
539    module = "nautilus_trader.common",
540    name = "DataActor",
541    unsendable,
542    subclass
543)]
544#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")]
545pub struct PyDataActor {
546    inner: Rc<UnsafeCell<PyDataActorInner>>,
547}
548
549impl Debug for PyDataActor {
550    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551        f.debug_struct(stringify!(PyDataActor))
552            .field("inner", &self.inner())
553            .finish()
554    }
555}
556
557impl PyDataActor {
558    /// Returns a reference to the inner actor state.
559    ///
560    /// # Safety
561    ///
562    /// This is safe for single-threaded use. The `UnsafeCell` allows interior
563    /// mutability which is required for the registries to mutate the actor.
564    #[inline]
565    #[allow(unsafe_code)]
566    pub(crate) fn inner(&self) -> &PyDataActorInner {
567        unsafe { &*self.inner.get() }
568    }
569
570    /// Returns a mutable reference to the inner actor state.
571    ///
572    /// # Safety
573    ///
574    /// This is safe for single-threaded use. Callers must ensure no aliasing
575    /// mutable references exist.
576    #[inline]
577    #[allow(unsafe_code, clippy::mut_from_ref)]
578    pub(crate) fn inner_mut(&self) -> &mut PyDataActorInner {
579        unsafe { &mut *self.inner.get() }
580    }
581}
582
583impl Deref for PyDataActor {
584    type Target = DataActorCore;
585
586    fn deref(&self) -> &Self::Target {
587        &self.inner().core
588    }
589}
590
591impl DerefMut for PyDataActor {
592    fn deref_mut(&mut self) -> &mut Self::Target {
593        &mut self.inner_mut().core
594    }
595}
596
597impl PyDataActor {
598    // Rust constructor for tests and direct Rust usage
599    pub fn new(config: Option<DataActorConfig>) -> Self {
600        let config = config.unwrap_or_default();
601        let core = DataActorCore::new(config);
602        let clock = PyClock::new_test(); // Temporary clock, will be updated on registration
603        let logger = PyLogger::new(core.actor_id().as_str());
604
605        let inner = PyDataActorInner {
606            core,
607            py_self: None,
608            clock,
609            logger,
610        };
611
612        Self {
613            inner: Rc::new(UnsafeCell::new(inner)),
614        }
615    }
616
617    /// Sets the Python instance reference for method dispatch.
618    ///
619    /// This enables the PyDataActor to forward method calls (like `on_start`, `on_stop`)
620    /// to the original Python instance that contains this PyDataActor. This is essential
621    /// for Python inheritance to work correctly, allowing Python subclasses to override
622    /// DataActor methods and have them called by the Rust system.
623    pub fn set_python_instance(&mut self, py_obj: Py<PyAny>) {
624        self.inner_mut().py_self = Some(py_obj);
625    }
626
627    /// Updates the actor_id in both the core config and the actor_id field.
628    ///
629    /// This method is only exposed for the Python actor to assist with configuration and should
630    /// **never** be called post registration. Calling this after registration will cause
631    /// inconsistent state where the actor is registered under one ID but its internal actor_id
632    /// field contains another, breaking message routing and lifecycle management.
633    pub fn set_actor_id(&mut self, actor_id: ActorId) {
634        let inner = self.inner_mut();
635        inner.core.config.actor_id = Some(actor_id);
636        inner.core.actor_id = actor_id;
637    }
638
639    /// Updates the log_events setting in the core config.
640    pub fn set_log_events(&mut self, log_events: bool) {
641        self.inner_mut().core.config.log_events = log_events;
642    }
643
644    /// Updates the log_commands setting in the core config.
645    pub fn set_log_commands(&mut self, log_commands: bool) {
646        self.inner_mut().core.config.log_commands = log_commands;
647    }
648
649    /// Returns the memory address of this instance as a hexadecimal string.
650    pub fn mem_address(&self) -> String {
651        self.inner().core.mem_address()
652    }
653
654    /// Returns a value indicating whether the actor has been registered with a trader.
655    pub fn is_registered(&self) -> bool {
656        self.inner().core.is_registered()
657    }
658
659    /// Register the actor with a trader.
660    ///
661    /// # Errors
662    ///
663    /// Returns an error if the actor is already registered or if the registration process fails.
664    pub fn register(
665        &mut self,
666        trader_id: TraderId,
667        clock: Rc<RefCell<dyn Clock>>,
668        cache: Rc<RefCell<Cache>>,
669    ) -> anyhow::Result<()> {
670        let inner = self.inner_mut();
671        inner.core.register(trader_id, clock, cache)?;
672
673        inner.clock = PyClock::from_rc(inner.core.clock_rc());
674
675        // Register default time event handler for this actor
676        let actor_id = inner.actor_id().inner();
677        let callback = TimeEventCallback::from(move |event: TimeEvent| {
678            if let Some(mut actor) = try_get_actor_unchecked::<PyDataActorInner>(&actor_id) {
679                if let Err(e) = actor.on_time_event(&event) {
680                    log::error!("Python time event handler failed for actor {actor_id}: {e}");
681                }
682            } else {
683                log::error!("Actor {actor_id} not found for time event handling");
684            }
685        });
686
687        inner.clock.inner_mut().register_default_handler(callback);
688
689        inner.initialize()
690    }
691
692    /// Registers this actor in the global component and actor registries.
693    ///
694    /// Clones the internal `Rc` and inserts into both registries. This ensures
695    /// Python and the registries share the exact same actor instance.
696    pub fn register_in_global_registries(&self) {
697        let inner = self.inner();
698        let component_id = inner.component_id().inner();
699        let actor_id = Actor::id(inner);
700
701        let inner_ref: Rc<UnsafeCell<PyDataActorInner>> = self.inner.clone();
702
703        let component_trait_ref: Rc<UnsafeCell<dyn Component>> = inner_ref.clone();
704        get_component_registry().insert(component_id, component_trait_ref);
705
706        let actor_trait_ref: Rc<UnsafeCell<dyn Actor>> = inner_ref;
707        get_actor_registry().insert(actor_id, actor_trait_ref);
708    }
709}
710
711impl DataActor for PyDataActorInner {
712    fn on_start(&mut self) -> anyhow::Result<()> {
713        self.dispatch_on_start()
714            .map_err(|e| anyhow::anyhow!("Python on_start failed: {e}"))
715    }
716
717    fn on_stop(&mut self) -> anyhow::Result<()> {
718        self.dispatch_on_stop()
719            .map_err(|e| anyhow::anyhow!("Python on_stop failed: {e}"))
720    }
721
722    fn on_resume(&mut self) -> anyhow::Result<()> {
723        self.dispatch_on_resume()
724            .map_err(|e| anyhow::anyhow!("Python on_resume failed: {e}"))
725    }
726
727    fn on_reset(&mut self) -> anyhow::Result<()> {
728        self.dispatch_on_reset()
729            .map_err(|e| anyhow::anyhow!("Python on_reset failed: {e}"))
730    }
731
732    fn on_dispose(&mut self) -> anyhow::Result<()> {
733        self.dispatch_on_dispose()
734            .map_err(|e| anyhow::anyhow!("Python on_dispose failed: {e}"))
735    }
736
737    fn on_degrade(&mut self) -> anyhow::Result<()> {
738        self.dispatch_on_degrade()
739            .map_err(|e| anyhow::anyhow!("Python on_degrade failed: {e}"))
740    }
741
742    fn on_fault(&mut self) -> anyhow::Result<()> {
743        self.dispatch_on_fault()
744            .map_err(|e| anyhow::anyhow!("Python on_fault failed: {e}"))
745    }
746
747    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
748        self.dispatch_on_time_event(event.clone())
749            .map_err(|e| anyhow::anyhow!("Python on_time_event failed: {e}"))
750    }
751
752    #[allow(unused_variables)]
753    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
754        Python::attach(|py| {
755            let py_data: Py<PyAny> = Py::new(py, data.clone())?.into_any();
756            self.dispatch_on_data(py_data)
757                .map_err(|e| anyhow::anyhow!("Python on_data failed: {e}"))
758        })
759    }
760
761    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
762        self.dispatch_on_signal(signal)
763            .map_err(|e| anyhow::anyhow!("Python on_signal failed: {e}"))
764    }
765
766    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
767        Python::attach(|py| {
768            let py_instrument = instrument_any_to_pyobject(py, instrument.clone())
769                .map_err(|e| anyhow::anyhow!("Failed to convert InstrumentAny to Python: {e}"))?;
770            self.dispatch_on_instrument(py_instrument)
771                .map_err(|e| anyhow::anyhow!("Python on_instrument failed: {e}"))
772        })
773    }
774
775    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
776        self.dispatch_on_quote(*quote)
777            .map_err(|e| anyhow::anyhow!("Python on_quote failed: {e}"))
778    }
779
780    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
781        self.dispatch_on_trade(*tick)
782            .map_err(|e| anyhow::anyhow!("Python on_trade failed: {e}"))
783    }
784
785    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
786        self.dispatch_on_bar(*bar)
787            .map_err(|e| anyhow::anyhow!("Python on_bar failed: {e}"))
788    }
789
790    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
791        self.dispatch_on_book_deltas(deltas.clone())
792            .map_err(|e| anyhow::anyhow!("Python on_book_deltas failed: {e}"))
793    }
794
795    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
796        self.dispatch_on_book(order_book)
797            .map_err(|e| anyhow::anyhow!("Python on_book failed: {e}"))
798    }
799
800    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
801        self.dispatch_on_mark_price(*mark_price)
802            .map_err(|e| anyhow::anyhow!("Python on_mark_price failed: {e}"))
803    }
804
805    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
806        self.dispatch_on_index_price(*index_price)
807            .map_err(|e| anyhow::anyhow!("Python on_index_price failed: {e}"))
808    }
809
810    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
811        self.dispatch_on_funding_rate(*funding_rate)
812            .map_err(|e| anyhow::anyhow!("Python on_funding_rate failed: {e}"))
813    }
814
815    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
816        self.dispatch_on_instrument_status(*data)
817            .map_err(|e| anyhow::anyhow!("Python on_instrument_status failed: {e}"))
818    }
819
820    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
821        self.dispatch_on_instrument_close(*update)
822            .map_err(|e| anyhow::anyhow!("Python on_instrument_close failed: {e}"))
823    }
824
825    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
826        self.dispatch_on_option_greeks(*greeks)
827            .map_err(|e| anyhow::anyhow!("Python on_option_greeks failed: {e}"))
828    }
829
830    fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
831        self.dispatch_on_option_chain(slice.clone())
832            .map_err(|e| anyhow::anyhow!("Python on_option_chain failed: {e}"))
833    }
834
835    #[cfg(feature = "defi")]
836    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
837        self.dispatch_on_block(block.clone())
838            .map_err(|e| anyhow::anyhow!("Python on_block failed: {e}"))
839    }
840
841    #[cfg(feature = "defi")]
842    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
843        self.dispatch_on_pool(pool.clone())
844            .map_err(|e| anyhow::anyhow!("Python on_pool failed: {e}"))
845    }
846
847    #[cfg(feature = "defi")]
848    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
849        self.dispatch_on_pool_swap(swap.clone())
850            .map_err(|e| anyhow::anyhow!("Python on_pool_swap failed: {e}"))
851    }
852
853    #[cfg(feature = "defi")]
854    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
855        self.dispatch_on_pool_liquidity_update(update.clone())
856            .map_err(|e| anyhow::anyhow!("Python on_pool_liquidity_update failed: {e}"))
857    }
858
859    #[cfg(feature = "defi")]
860    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
861        self.dispatch_on_pool_fee_collect(collect.clone())
862            .map_err(|e| anyhow::anyhow!("Python on_pool_fee_collect failed: {e}"))
863    }
864
865    #[cfg(feature = "defi")]
866    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
867        self.dispatch_on_pool_flash(flash.clone())
868            .map_err(|e| anyhow::anyhow!("Python on_pool_flash failed: {e}"))
869    }
870
871    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
872        Python::attach(|py| {
873            let py_data: Py<PyAny> = if let Some(custom_data) = data.downcast_ref::<CustomData>() {
874                Py::new(py, custom_data.clone())?.into_any()
875            } else {
876                anyhow::bail!("Failed to convert historical data to Python: unsupported type");
877            };
878            self.dispatch_on_historical_data(py_data)
879                .map_err(|e| anyhow::anyhow!("Python on_historical_data failed: {e}"))
880        })
881    }
882
883    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
884        self.dispatch_on_historical_quotes(quotes.to_vec())
885            .map_err(|e| anyhow::anyhow!("Python on_historical_quotes failed: {e}"))
886    }
887
888    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
889        self.dispatch_on_historical_trades(trades.to_vec())
890            .map_err(|e| anyhow::anyhow!("Python on_historical_trades failed: {e}"))
891    }
892
893    fn on_historical_funding_rates(
894        &mut self,
895        funding_rates: &[FundingRateUpdate],
896    ) -> anyhow::Result<()> {
897        self.dispatch_on_historical_funding_rates(funding_rates.to_vec())
898            .map_err(|e| anyhow::anyhow!("Python on_historical_funding_rates failed: {e}"))
899    }
900
901    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
902        self.dispatch_on_historical_bars(bars.to_vec())
903            .map_err(|e| anyhow::anyhow!("Python on_historical_bars failed: {e}"))
904    }
905
906    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
907        self.dispatch_on_historical_mark_prices(mark_prices.to_vec())
908            .map_err(|e| anyhow::anyhow!("Python on_historical_mark_prices failed: {e}"))
909    }
910
911    fn on_historical_index_prices(
912        &mut self,
913        index_prices: &[IndexPriceUpdate],
914    ) -> anyhow::Result<()> {
915        self.dispatch_on_historical_index_prices(index_prices.to_vec())
916            .map_err(|e| anyhow::anyhow!("Python on_historical_index_prices failed: {e}"))
917    }
918}
919
920#[pymethods]
921#[pyo3_stub_gen::derive::gen_stub_pymethods]
922impl PyDataActor {
923    #[new]
924    #[pyo3(signature = (config=None))]
925    fn py_new(config: Option<DataActorConfig>) -> Self {
926        Self::new(config)
927    }
928
929    #[pyo3(signature = (config=None))]
930    #[allow(unused_variables, clippy::needless_pass_by_value)]
931    fn __init__(slf: &Bound<'_, Self>, config: Option<DataActorConfig>) {
932        let py_self: Py<PyAny> = slf.clone().unbind().into_any();
933        slf.borrow_mut().set_python_instance(py_self);
934    }
935
936    #[getter]
937    #[pyo3(name = "clock")]
938    fn py_clock(&self) -> PyResult<PyClock> {
939        let inner = self.inner();
940        if inner.core.is_registered() {
941            Ok(inner.clock.clone())
942        } else {
943            Err(to_pyruntime_err(
944                "Actor must be registered with a trader before accessing clock",
945            ))
946        }
947    }
948
949    #[getter]
950    #[pyo3(name = "cache")]
951    fn py_cache(&self) -> PyResult<PyCache> {
952        let inner = self.inner();
953        if inner.core.is_registered() {
954            Ok(PyCache::from_rc(inner.core.cache_rc()))
955        } else {
956            Err(to_pyruntime_err(
957                "Actor must be registered with a trader before accessing cache",
958            ))
959        }
960    }
961
962    #[getter]
963    #[pyo3(name = "log")]
964    fn py_log(&self) -> PyLogger {
965        self.inner().logger.clone()
966    }
967
968    #[getter]
969    #[pyo3(name = "actor_id")]
970    fn py_actor_id(&self) -> ActorId {
971        self.inner().core.actor_id
972    }
973
974    #[getter]
975    #[pyo3(name = "trader_id")]
976    fn py_trader_id(&self) -> Option<TraderId> {
977        self.inner().core.trader_id()
978    }
979
980    #[pyo3(name = "state")]
981    fn py_state(&self) -> ComponentState {
982        Component::state(self.inner())
983    }
984
985    #[pyo3(name = "is_ready")]
986    fn py_is_ready(&self) -> bool {
987        Component::is_ready(self.inner())
988    }
989
990    #[pyo3(name = "is_running")]
991    fn py_is_running(&self) -> bool {
992        Component::is_running(self.inner())
993    }
994
995    #[pyo3(name = "is_stopped")]
996    fn py_is_stopped(&self) -> bool {
997        Component::is_stopped(self.inner())
998    }
999
1000    #[pyo3(name = "is_degraded")]
1001    fn py_is_degraded(&self) -> bool {
1002        Component::is_degraded(self.inner())
1003    }
1004
1005    #[pyo3(name = "is_faulted")]
1006    fn py_is_faulted(&self) -> bool {
1007        Component::is_faulted(self.inner())
1008    }
1009
1010    #[pyo3(name = "is_disposed")]
1011    fn py_is_disposed(&self) -> bool {
1012        Component::is_disposed(self.inner())
1013    }
1014
1015    #[pyo3(name = "start")]
1016    fn py_start(&mut self) -> PyResult<()> {
1017        Component::start(self.inner_mut()).map_err(to_pyruntime_err)
1018    }
1019
1020    #[pyo3(name = "stop")]
1021    fn py_stop(&mut self) -> PyResult<()> {
1022        Component::stop(self.inner_mut()).map_err(to_pyruntime_err)
1023    }
1024
1025    #[pyo3(name = "resume")]
1026    fn py_resume(&mut self) -> PyResult<()> {
1027        Component::resume(self.inner_mut()).map_err(to_pyruntime_err)
1028    }
1029
1030    #[pyo3(name = "reset")]
1031    fn py_reset(&mut self) -> PyResult<()> {
1032        Component::reset(self.inner_mut()).map_err(to_pyruntime_err)
1033    }
1034
1035    #[pyo3(name = "dispose")]
1036    fn py_dispose(&mut self) -> PyResult<()> {
1037        Component::dispose(self.inner_mut()).map_err(to_pyruntime_err)
1038    }
1039
1040    #[pyo3(name = "degrade")]
1041    fn py_degrade(&mut self) -> PyResult<()> {
1042        Component::degrade(self.inner_mut()).map_err(to_pyruntime_err)
1043    }
1044
1045    #[pyo3(name = "fault")]
1046    fn py_fault(&mut self) -> PyResult<()> {
1047        Component::fault(self.inner_mut()).map_err(to_pyruntime_err)
1048    }
1049
1050    #[pyo3(name = "shutdown_system")]
1051    #[pyo3(signature = (reason=None))]
1052    fn py_shutdown_system(&self, reason: Option<String>) {
1053        self.inner().core.shutdown_system(reason);
1054    }
1055
1056    #[pyo3(name = "publish_data")]
1057    fn py_publish_data(&self, data_type: &DataType, data: &CustomData) {
1058        self.inner().core.publish_data(data_type, data);
1059    }
1060
1061    #[pyo3(name = "publish_signal")]
1062    #[pyo3(signature = (name, value, ts_event=0))]
1063    #[allow(clippy::needless_pass_by_value)]
1064    fn py_publish_signal(
1065        &self,
1066        py: Python<'_>,
1067        name: &str,
1068        value: Py<PyAny>,
1069        ts_event: u64,
1070    ) -> PyResult<()> {
1071        // Accept any int / float / str / bool — match v1 behaviour by coercing with `str(value)`.
1072        let value_str: String = value.bind(py).str()?.extract()?;
1073        self.inner()
1074            .core
1075            .publish_signal(name, value_str, UnixNanos::from(ts_event));
1076        Ok(())
1077    }
1078
1079    #[pyo3(name = "add_synthetic")]
1080    fn py_add_synthetic(&self, synthetic: SyntheticInstrument) -> PyResult<()> {
1081        self.inner()
1082            .core
1083            .add_synthetic(synthetic)
1084            .map_err(to_pyvalue_err)
1085    }
1086
1087    #[pyo3(name = "update_synthetic")]
1088    fn py_update_synthetic(&self, synthetic: SyntheticInstrument) -> PyResult<()> {
1089        self.inner()
1090            .core
1091            .update_synthetic(synthetic)
1092            .map_err(to_pyvalue_err)
1093    }
1094
1095    #[pyo3(name = "on_start")]
1096    fn py_on_start(&self) {}
1097
1098    #[pyo3(name = "on_stop")]
1099    fn py_on_stop(&mut self) {}
1100
1101    #[pyo3(name = "on_resume")]
1102    fn py_on_resume(&mut self) {}
1103
1104    #[pyo3(name = "on_reset")]
1105    fn py_on_reset(&mut self) {}
1106
1107    #[pyo3(name = "on_dispose")]
1108    fn py_on_dispose(&mut self) {}
1109
1110    #[pyo3(name = "on_degrade")]
1111    fn py_on_degrade(&mut self) {}
1112
1113    #[pyo3(name = "on_fault")]
1114    fn py_on_fault(&mut self) {}
1115
1116    #[allow(unused_variables, clippy::needless_pass_by_value)]
1117    #[pyo3(name = "on_time_event")]
1118    fn py_on_time_event(&mut self, event: TimeEvent) {}
1119
1120    #[allow(unused_variables, clippy::needless_pass_by_value)]
1121    #[pyo3(name = "on_data")]
1122    fn py_on_data(&mut self, data: Py<PyAny>) {}
1123
1124    #[allow(unused_variables)]
1125    #[pyo3(name = "on_signal")]
1126    fn py_on_signal(&mut self, signal: &Signal) {}
1127
1128    #[allow(unused_variables, clippy::needless_pass_by_value)]
1129    #[pyo3(name = "on_instrument")]
1130    fn py_on_instrument(&mut self, instrument: Py<PyAny>) {}
1131
1132    #[allow(unused_variables)]
1133    #[pyo3(name = "on_quote")]
1134    fn py_on_quote(&mut self, quote: QuoteTick) {}
1135
1136    #[allow(unused_variables)]
1137    #[pyo3(name = "on_trade")]
1138    fn py_on_trade(&mut self, trade: TradeTick) {}
1139
1140    #[allow(unused_variables)]
1141    #[pyo3(name = "on_bar")]
1142    fn py_on_bar(&mut self, bar: Bar) {}
1143
1144    #[allow(unused_variables, clippy::needless_pass_by_value)]
1145    #[pyo3(name = "on_book_deltas")]
1146    fn py_on_book_deltas(&mut self, deltas: OrderBookDeltas) {}
1147
1148    #[allow(unused_variables)]
1149    #[pyo3(name = "on_book")]
1150    fn py_on_book(&mut self, book: &OrderBook) {}
1151
1152    #[allow(unused_variables)]
1153    #[pyo3(name = "on_mark_price")]
1154    fn py_on_mark_price(&mut self, mark_price: MarkPriceUpdate) {}
1155
1156    #[allow(unused_variables)]
1157    #[pyo3(name = "on_index_price")]
1158    fn py_on_index_price(&mut self, index_price: IndexPriceUpdate) {}
1159
1160    #[allow(unused_variables)]
1161    #[pyo3(name = "on_funding_rate")]
1162    fn py_on_funding_rate(&mut self, funding_rate: FundingRateUpdate) {}
1163
1164    #[allow(unused_variables)]
1165    #[pyo3(name = "on_instrument_status")]
1166    fn py_on_instrument_status(&mut self, status: InstrumentStatus) {}
1167
1168    #[allow(unused_variables)]
1169    #[pyo3(name = "on_instrument_close")]
1170    fn py_on_instrument_close(&mut self, close: InstrumentClose) {}
1171
1172    #[allow(unused_variables)]
1173    #[pyo3(name = "on_option_greeks")]
1174    fn py_on_option_greeks(&mut self, greeks: OptionGreeks) {}
1175
1176    #[allow(unused_variables, clippy::needless_pass_by_value)]
1177    #[pyo3(name = "on_option_chain")]
1178    fn py_on_option_chain(&mut self, slice: OptionChainSlice) {}
1179
1180    #[pyo3(name = "subscribe_data")]
1181    #[pyo3(signature = (data_type, client_id=None, params=None))]
1182    fn py_subscribe_data(
1183        &mut self,
1184        py: Python<'_>,
1185        data_type: DataType,
1186        client_id: Option<ClientId>,
1187        params: Option<Py<PyDict>>,
1188    ) -> PyResult<()> {
1189        let params = dict_to_params(py, params)?;
1190        DataActor::subscribe_data(self.inner_mut(), data_type, client_id, params);
1191        Ok(())
1192    }
1193
1194    #[pyo3(name = "subscribe_signal")]
1195    #[pyo3(signature = (name=""))]
1196    fn py_subscribe_signal(&mut self, name: &str) {
1197        DataActor::subscribe_signal(self.inner_mut(), name);
1198    }
1199
1200    #[pyo3(name = "subscribe_instruments")]
1201    #[pyo3(signature = (venue, client_id=None, params=None))]
1202    fn py_subscribe_instruments(
1203        &mut self,
1204        py: Python<'_>,
1205        venue: Venue,
1206        client_id: Option<ClientId>,
1207        params: Option<Py<PyDict>>,
1208    ) -> PyResult<()> {
1209        let params = dict_to_params(py, params)?;
1210        DataActor::subscribe_instruments(self.inner_mut(), venue, client_id, params);
1211        Ok(())
1212    }
1213
1214    #[pyo3(name = "subscribe_instrument")]
1215    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1216    fn py_subscribe_instrument(
1217        &mut self,
1218        py: Python<'_>,
1219        instrument_id: InstrumentId,
1220        client_id: Option<ClientId>,
1221        params: Option<Py<PyDict>>,
1222    ) -> PyResult<()> {
1223        let params = dict_to_params(py, params)?;
1224        DataActor::subscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1225        Ok(())
1226    }
1227
1228    #[pyo3(name = "subscribe_book_deltas")]
1229    #[pyo3(signature = (instrument_id, book_type, depth=None, client_id=None, managed=false, params=None))]
1230    #[expect(clippy::too_many_arguments)]
1231    fn py_subscribe_book_deltas(
1232        &mut self,
1233        py: Python<'_>,
1234        instrument_id: InstrumentId,
1235        book_type: BookType,
1236        depth: Option<usize>,
1237        client_id: Option<ClientId>,
1238        managed: bool,
1239        params: Option<Py<PyDict>>,
1240    ) -> PyResult<()> {
1241        let params = dict_to_params(py, params)?;
1242        let depth = depth.and_then(NonZeroUsize::new);
1243        DataActor::subscribe_book_deltas(
1244            self.inner_mut(),
1245            instrument_id,
1246            book_type,
1247            depth,
1248            client_id,
1249            managed,
1250            params,
1251        );
1252        Ok(())
1253    }
1254
1255    #[pyo3(name = "subscribe_book_at_interval")]
1256    #[pyo3(signature = (instrument_id, book_type, interval_ms, depth=None, client_id=None, params=None))]
1257    #[expect(clippy::too_many_arguments)]
1258    fn py_subscribe_book_at_interval(
1259        &mut self,
1260        py: Python<'_>,
1261        instrument_id: InstrumentId,
1262        book_type: BookType,
1263        interval_ms: usize,
1264        depth: Option<usize>,
1265        client_id: Option<ClientId>,
1266        params: Option<Py<PyDict>>,
1267    ) -> PyResult<()> {
1268        let params = dict_to_params(py, params)?;
1269        let depth = depth.and_then(NonZeroUsize::new);
1270        let interval_ms = NonZeroUsize::new(interval_ms)
1271            .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1272
1273        DataActor::subscribe_book_at_interval(
1274            self.inner_mut(),
1275            instrument_id,
1276            book_type,
1277            depth,
1278            interval_ms,
1279            client_id,
1280            params,
1281        );
1282        Ok(())
1283    }
1284
1285    #[pyo3(name = "subscribe_quotes")]
1286    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1287    fn py_subscribe_quotes(
1288        &mut self,
1289        py: Python<'_>,
1290        instrument_id: InstrumentId,
1291        client_id: Option<ClientId>,
1292        params: Option<Py<PyDict>>,
1293    ) -> PyResult<()> {
1294        let params = dict_to_params(py, params)?;
1295        DataActor::subscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1296        Ok(())
1297    }
1298
1299    #[pyo3(name = "subscribe_trades")]
1300    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1301    fn py_subscribe_trades(
1302        &mut self,
1303        py: Python<'_>,
1304        instrument_id: InstrumentId,
1305        client_id: Option<ClientId>,
1306        params: Option<Py<PyDict>>,
1307    ) -> PyResult<()> {
1308        let params = dict_to_params(py, params)?;
1309        DataActor::subscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1310        Ok(())
1311    }
1312
1313    #[pyo3(name = "subscribe_bars")]
1314    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1315    fn py_subscribe_bars(
1316        &mut self,
1317        py: Python<'_>,
1318        bar_type: BarType,
1319        client_id: Option<ClientId>,
1320        params: Option<Py<PyDict>>,
1321    ) -> PyResult<()> {
1322        let params = dict_to_params(py, params)?;
1323        DataActor::subscribe_bars(self.inner_mut(), bar_type, client_id, params);
1324        Ok(())
1325    }
1326
1327    #[pyo3(name = "subscribe_mark_prices")]
1328    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1329    fn py_subscribe_mark_prices(
1330        &mut self,
1331        py: Python<'_>,
1332        instrument_id: InstrumentId,
1333        client_id: Option<ClientId>,
1334        params: Option<Py<PyDict>>,
1335    ) -> PyResult<()> {
1336        let params = dict_to_params(py, params)?;
1337        DataActor::subscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1338        Ok(())
1339    }
1340
1341    #[pyo3(name = "subscribe_index_prices")]
1342    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1343    fn py_subscribe_index_prices(
1344        &mut self,
1345        py: Python<'_>,
1346        instrument_id: InstrumentId,
1347        client_id: Option<ClientId>,
1348        params: Option<Py<PyDict>>,
1349    ) -> PyResult<()> {
1350        let params = dict_to_params(py, params)?;
1351        DataActor::subscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1352        Ok(())
1353    }
1354
1355    #[pyo3(name = "subscribe_funding_rates")]
1356    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1357    fn py_subscribe_funding_rates(
1358        &mut self,
1359        py: Python<'_>,
1360        instrument_id: InstrumentId,
1361        client_id: Option<ClientId>,
1362        params: Option<Py<PyDict>>,
1363    ) -> PyResult<()> {
1364        let params = dict_to_params(py, params)?;
1365        DataActor::subscribe_funding_rates(self.inner_mut(), instrument_id, client_id, params);
1366        Ok(())
1367    }
1368
1369    #[pyo3(name = "subscribe_option_greeks")]
1370    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1371    fn py_subscribe_option_greeks(
1372        &mut self,
1373        py: Python<'_>,
1374        instrument_id: InstrumentId,
1375        client_id: Option<ClientId>,
1376        params: Option<Py<PyDict>>,
1377    ) -> PyResult<()> {
1378        let params = dict_to_params(py, params)?;
1379        DataActor::subscribe_option_greeks(self.inner_mut(), instrument_id, client_id, params);
1380        Ok(())
1381    }
1382
1383    #[pyo3(name = "subscribe_instrument_status")]
1384    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1385    fn py_subscribe_instrument_status(
1386        &mut self,
1387        py: Python<'_>,
1388        instrument_id: InstrumentId,
1389        client_id: Option<ClientId>,
1390        params: Option<Py<PyDict>>,
1391    ) -> PyResult<()> {
1392        let params = dict_to_params(py, params)?;
1393        DataActor::subscribe_instrument_status(self.inner_mut(), instrument_id, client_id, params);
1394        Ok(())
1395    }
1396
1397    #[pyo3(name = "subscribe_instrument_close")]
1398    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1399    fn py_subscribe_instrument_close(
1400        &mut self,
1401        py: Python<'_>,
1402        instrument_id: InstrumentId,
1403        client_id: Option<ClientId>,
1404        params: Option<Py<PyDict>>,
1405    ) -> PyResult<()> {
1406        let params = dict_to_params(py, params)?;
1407        DataActor::subscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1408        Ok(())
1409    }
1410
1411    #[pyo3(name = "subscribe_option_chain")]
1412    #[pyo3(signature = (series_id, strike_range, snapshot_interval_ms=None, client_id=None, params=None))]
1413    fn py_subscribe_option_chain(
1414        &mut self,
1415        py: Python<'_>,
1416        series_id: OptionSeriesId,
1417        strike_range: PyStrikeRange,
1418        snapshot_interval_ms: Option<u64>,
1419        client_id: Option<ClientId>,
1420        params: Option<Py<PyDict>>,
1421    ) -> PyResult<()> {
1422        let params = dict_to_params(py, params)?;
1423        DataActor::subscribe_option_chain(
1424            self.inner_mut(),
1425            series_id,
1426            strike_range.inner,
1427            snapshot_interval_ms,
1428            client_id,
1429            params,
1430        );
1431        Ok(())
1432    }
1433
1434    #[pyo3(name = "subscribe_order_fills")]
1435    #[pyo3(signature = (instrument_id))]
1436    fn py_subscribe_order_fills(&mut self, instrument_id: InstrumentId) {
1437        DataActor::subscribe_order_fills(self.inner_mut(), instrument_id);
1438    }
1439
1440    #[pyo3(name = "subscribe_order_cancels")]
1441    #[pyo3(signature = (instrument_id))]
1442    fn py_subscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
1443        DataActor::subscribe_order_cancels(self.inner_mut(), instrument_id);
1444    }
1445
1446    #[pyo3(name = "unsubscribe_data")]
1447    #[pyo3(signature = (data_type, client_id=None, params=None))]
1448    fn py_unsubscribe_data(
1449        &mut self,
1450        py: Python<'_>,
1451        data_type: DataType,
1452        client_id: Option<ClientId>,
1453        params: Option<Py<PyDict>>,
1454    ) -> PyResult<()> {
1455        let params = dict_to_params(py, params)?;
1456        DataActor::unsubscribe_data(self.inner_mut(), data_type, client_id, params);
1457        Ok(())
1458    }
1459
1460    #[pyo3(name = "unsubscribe_signal")]
1461    #[pyo3(signature = (name=""))]
1462    fn py_unsubscribe_signal(&mut self, name: &str) {
1463        DataActor::unsubscribe_signal(self.inner_mut(), name);
1464    }
1465
1466    #[pyo3(name = "unsubscribe_instruments")]
1467    #[pyo3(signature = (venue, client_id=None, params=None))]
1468    fn py_unsubscribe_instruments(
1469        &mut self,
1470        py: Python<'_>,
1471        venue: Venue,
1472        client_id: Option<ClientId>,
1473        params: Option<Py<PyDict>>,
1474    ) -> PyResult<()> {
1475        let params = dict_to_params(py, params)?;
1476        DataActor::unsubscribe_instruments(self.inner_mut(), venue, client_id, params);
1477        Ok(())
1478    }
1479
1480    #[pyo3(name = "unsubscribe_instrument")]
1481    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1482    fn py_unsubscribe_instrument(
1483        &mut self,
1484        py: Python<'_>,
1485        instrument_id: InstrumentId,
1486        client_id: Option<ClientId>,
1487        params: Option<Py<PyDict>>,
1488    ) -> PyResult<()> {
1489        let params = dict_to_params(py, params)?;
1490        DataActor::unsubscribe_instrument(self.inner_mut(), instrument_id, client_id, params);
1491        Ok(())
1492    }
1493
1494    #[pyo3(name = "unsubscribe_book_deltas")]
1495    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1496    fn py_unsubscribe_book_deltas(
1497        &mut self,
1498        py: Python<'_>,
1499        instrument_id: InstrumentId,
1500        client_id: Option<ClientId>,
1501        params: Option<Py<PyDict>>,
1502    ) -> PyResult<()> {
1503        let params = dict_to_params(py, params)?;
1504        DataActor::unsubscribe_book_deltas(self.inner_mut(), instrument_id, client_id, params);
1505        Ok(())
1506    }
1507
1508    #[pyo3(name = "unsubscribe_book_at_interval")]
1509    #[pyo3(signature = (instrument_id, interval_ms, client_id=None, params=None))]
1510    fn py_unsubscribe_book_at_interval(
1511        &mut self,
1512        py: Python<'_>,
1513        instrument_id: InstrumentId,
1514        interval_ms: usize,
1515        client_id: Option<ClientId>,
1516        params: Option<Py<PyDict>>,
1517    ) -> PyResult<()> {
1518        let params = dict_to_params(py, params)?;
1519        let interval_ms = NonZeroUsize::new(interval_ms)
1520            .ok_or_else(|| to_pyvalue_err("interval_ms must be > 0"))?;
1521
1522        DataActor::unsubscribe_book_at_interval(
1523            self.inner_mut(),
1524            instrument_id,
1525            interval_ms,
1526            client_id,
1527            params,
1528        );
1529        Ok(())
1530    }
1531
1532    #[pyo3(name = "unsubscribe_quotes")]
1533    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1534    fn py_unsubscribe_quotes(
1535        &mut self,
1536        py: Python<'_>,
1537        instrument_id: InstrumentId,
1538        client_id: Option<ClientId>,
1539        params: Option<Py<PyDict>>,
1540    ) -> PyResult<()> {
1541        let params = dict_to_params(py, params)?;
1542        DataActor::unsubscribe_quotes(self.inner_mut(), instrument_id, client_id, params);
1543        Ok(())
1544    }
1545
1546    #[pyo3(name = "unsubscribe_trades")]
1547    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1548    fn py_unsubscribe_trades(
1549        &mut self,
1550        py: Python<'_>,
1551        instrument_id: InstrumentId,
1552        client_id: Option<ClientId>,
1553        params: Option<Py<PyDict>>,
1554    ) -> PyResult<()> {
1555        let params = dict_to_params(py, params)?;
1556        DataActor::unsubscribe_trades(self.inner_mut(), instrument_id, client_id, params);
1557        Ok(())
1558    }
1559
1560    #[pyo3(name = "unsubscribe_bars")]
1561    #[pyo3(signature = (bar_type, client_id=None, params=None))]
1562    fn py_unsubscribe_bars(
1563        &mut self,
1564        py: Python<'_>,
1565        bar_type: BarType,
1566        client_id: Option<ClientId>,
1567        params: Option<Py<PyDict>>,
1568    ) -> PyResult<()> {
1569        let params = dict_to_params(py, params)?;
1570        DataActor::unsubscribe_bars(self.inner_mut(), bar_type, client_id, params);
1571        Ok(())
1572    }
1573
1574    #[pyo3(name = "unsubscribe_mark_prices")]
1575    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1576    fn py_unsubscribe_mark_prices(
1577        &mut self,
1578        py: Python<'_>,
1579        instrument_id: InstrumentId,
1580        client_id: Option<ClientId>,
1581        params: Option<Py<PyDict>>,
1582    ) -> PyResult<()> {
1583        let params = dict_to_params(py, params)?;
1584        DataActor::unsubscribe_mark_prices(self.inner_mut(), instrument_id, client_id, params);
1585        Ok(())
1586    }
1587
1588    #[pyo3(name = "unsubscribe_index_prices")]
1589    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1590    fn py_unsubscribe_index_prices(
1591        &mut self,
1592        py: Python<'_>,
1593        instrument_id: InstrumentId,
1594        client_id: Option<ClientId>,
1595        params: Option<Py<PyDict>>,
1596    ) -> PyResult<()> {
1597        let params = dict_to_params(py, params)?;
1598        DataActor::unsubscribe_index_prices(self.inner_mut(), instrument_id, client_id, params);
1599        Ok(())
1600    }
1601
1602    #[pyo3(name = "unsubscribe_funding_rates")]
1603    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1604    fn py_unsubscribe_funding_rates(
1605        &mut self,
1606        py: Python<'_>,
1607        instrument_id: InstrumentId,
1608        client_id: Option<ClientId>,
1609        params: Option<Py<PyDict>>,
1610    ) -> PyResult<()> {
1611        let params = dict_to_params(py, params)?;
1612        DataActor::unsubscribe_funding_rates(self.inner_mut(), instrument_id, client_id, params);
1613        Ok(())
1614    }
1615
1616    #[pyo3(name = "unsubscribe_option_greeks")]
1617    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1618    fn py_unsubscribe_option_greeks(
1619        &mut self,
1620        py: Python<'_>,
1621        instrument_id: InstrumentId,
1622        client_id: Option<ClientId>,
1623        params: Option<Py<PyDict>>,
1624    ) -> PyResult<()> {
1625        let params = dict_to_params(py, params)?;
1626        DataActor::unsubscribe_option_greeks(self.inner_mut(), instrument_id, client_id, params);
1627        Ok(())
1628    }
1629
1630    #[pyo3(name = "unsubscribe_instrument_status")]
1631    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1632    fn py_unsubscribe_instrument_status(
1633        &mut self,
1634        py: Python<'_>,
1635        instrument_id: InstrumentId,
1636        client_id: Option<ClientId>,
1637        params: Option<Py<PyDict>>,
1638    ) -> PyResult<()> {
1639        let params = dict_to_params(py, params)?;
1640        DataActor::unsubscribe_instrument_status(
1641            self.inner_mut(),
1642            instrument_id,
1643            client_id,
1644            params,
1645        );
1646        Ok(())
1647    }
1648
1649    #[pyo3(name = "unsubscribe_instrument_close")]
1650    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1651    fn py_unsubscribe_instrument_close(
1652        &mut self,
1653        py: Python<'_>,
1654        instrument_id: InstrumentId,
1655        client_id: Option<ClientId>,
1656        params: Option<Py<PyDict>>,
1657    ) -> PyResult<()> {
1658        let params = dict_to_params(py, params)?;
1659        DataActor::unsubscribe_instrument_close(self.inner_mut(), instrument_id, client_id, params);
1660        Ok(())
1661    }
1662
1663    #[pyo3(name = "unsubscribe_option_chain")]
1664    #[pyo3(signature = (series_id, client_id=None))]
1665    fn py_unsubscribe_option_chain(
1666        &mut self,
1667        series_id: OptionSeriesId,
1668        client_id: Option<ClientId>,
1669    ) {
1670        DataActor::unsubscribe_option_chain(self.inner_mut(), series_id, client_id);
1671    }
1672
1673    #[pyo3(name = "unsubscribe_order_fills")]
1674    #[pyo3(signature = (instrument_id))]
1675    fn py_unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
1676        DataActor::unsubscribe_order_fills(self.inner_mut(), instrument_id);
1677    }
1678
1679    #[pyo3(name = "unsubscribe_order_cancels")]
1680    #[pyo3(signature = (instrument_id))]
1681    fn py_unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
1682        DataActor::unsubscribe_order_cancels(self.inner_mut(), instrument_id);
1683    }
1684
1685    #[pyo3(name = "request_data")]
1686    #[pyo3(signature = (data_type, client_id, start=None, end=None, limit=None, params=None))]
1687    #[expect(clippy::too_many_arguments)]
1688    fn py_request_data(
1689        &mut self,
1690        py: Python<'_>,
1691        data_type: DataType,
1692        client_id: ClientId,
1693        start: Option<u64>,
1694        end: Option<u64>,
1695        limit: Option<usize>,
1696        params: Option<Py<PyDict>>,
1697    ) -> PyResult<String> {
1698        let params = dict_to_params(py, params)?;
1699        let limit = limit.and_then(NonZeroUsize::new);
1700        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1701        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1702
1703        let request_id = DataActor::request_data(
1704            self.inner_mut(),
1705            data_type,
1706            client_id,
1707            start,
1708            end,
1709            limit,
1710            params,
1711        )
1712        .map_err(to_pyvalue_err)?;
1713        Ok(request_id.to_string())
1714    }
1715
1716    #[pyo3(name = "request_instrument")]
1717    #[pyo3(signature = (instrument_id, start=None, end=None, client_id=None, params=None))]
1718    fn py_request_instrument(
1719        &mut self,
1720        py: Python<'_>,
1721        instrument_id: InstrumentId,
1722        start: Option<u64>,
1723        end: Option<u64>,
1724        client_id: Option<ClientId>,
1725        params: Option<Py<PyDict>>,
1726    ) -> PyResult<String> {
1727        let params = dict_to_params(py, params)?;
1728        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1729        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1730
1731        let request_id = DataActor::request_instrument(
1732            self.inner_mut(),
1733            instrument_id,
1734            start,
1735            end,
1736            client_id,
1737            params,
1738        )
1739        .map_err(to_pyvalue_err)?;
1740        Ok(request_id.to_string())
1741    }
1742
1743    #[pyo3(name = "request_instruments")]
1744    #[pyo3(signature = (venue=None, start=None, end=None, client_id=None, params=None))]
1745    fn py_request_instruments(
1746        &mut self,
1747        py: Python<'_>,
1748        venue: Option<Venue>,
1749        start: Option<u64>,
1750        end: Option<u64>,
1751        client_id: Option<ClientId>,
1752        params: Option<Py<PyDict>>,
1753    ) -> PyResult<String> {
1754        let params = dict_to_params(py, params)?;
1755        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1756        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1757
1758        let request_id =
1759            DataActor::request_instruments(self.inner_mut(), venue, start, end, client_id, params)
1760                .map_err(to_pyvalue_err)?;
1761        Ok(request_id.to_string())
1762    }
1763
1764    #[pyo3(name = "request_book_snapshot")]
1765    #[pyo3(signature = (instrument_id, depth=None, client_id=None, params=None))]
1766    fn py_request_book_snapshot(
1767        &mut self,
1768        py: Python<'_>,
1769        instrument_id: InstrumentId,
1770        depth: Option<usize>,
1771        client_id: Option<ClientId>,
1772        params: Option<Py<PyDict>>,
1773    ) -> PyResult<String> {
1774        let params = dict_to_params(py, params)?;
1775        let depth = depth.and_then(NonZeroUsize::new);
1776
1777        let request_id = DataActor::request_book_snapshot(
1778            self.inner_mut(),
1779            instrument_id,
1780            depth,
1781            client_id,
1782            params,
1783        )
1784        .map_err(to_pyvalue_err)?;
1785        Ok(request_id.to_string())
1786    }
1787
1788    #[pyo3(name = "request_quotes")]
1789    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1790    #[expect(clippy::too_many_arguments)]
1791    fn py_request_quotes(
1792        &mut self,
1793        py: Python<'_>,
1794        instrument_id: InstrumentId,
1795        start: Option<u64>,
1796        end: Option<u64>,
1797        limit: Option<usize>,
1798        client_id: Option<ClientId>,
1799        params: Option<Py<PyDict>>,
1800    ) -> PyResult<String> {
1801        let params = dict_to_params(py, params)?;
1802        let limit = limit.and_then(NonZeroUsize::new);
1803        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1804        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1805
1806        let request_id = DataActor::request_quotes(
1807            self.inner_mut(),
1808            instrument_id,
1809            start,
1810            end,
1811            limit,
1812            client_id,
1813            params,
1814        )
1815        .map_err(to_pyvalue_err)?;
1816        Ok(request_id.to_string())
1817    }
1818
1819    #[pyo3(name = "request_trades")]
1820    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1821    #[expect(clippy::too_many_arguments)]
1822    fn py_request_trades(
1823        &mut self,
1824        py: Python<'_>,
1825        instrument_id: InstrumentId,
1826        start: Option<u64>,
1827        end: Option<u64>,
1828        limit: Option<usize>,
1829        client_id: Option<ClientId>,
1830        params: Option<Py<PyDict>>,
1831    ) -> PyResult<String> {
1832        let params = dict_to_params(py, params)?;
1833        let limit = limit.and_then(NonZeroUsize::new);
1834        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1835        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1836
1837        let request_id = DataActor::request_trades(
1838            self.inner_mut(),
1839            instrument_id,
1840            start,
1841            end,
1842            limit,
1843            client_id,
1844            params,
1845        )
1846        .map_err(to_pyvalue_err)?;
1847        Ok(request_id.to_string())
1848    }
1849
1850    #[pyo3(name = "request_funding_rates")]
1851    #[pyo3(signature = (instrument_id, start=None, end=None, limit=None, client_id=None, params=None))]
1852    #[expect(clippy::too_many_arguments)]
1853    fn py_request_funding_rates(
1854        &mut self,
1855        py: Python<'_>,
1856        instrument_id: InstrumentId,
1857        start: Option<u64>,
1858        end: Option<u64>,
1859        limit: Option<usize>,
1860        client_id: Option<ClientId>,
1861        params: Option<Py<PyDict>>,
1862    ) -> PyResult<String> {
1863        let params = dict_to_params(py, params)?;
1864        let limit = limit.and_then(NonZeroUsize::new);
1865        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1866        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1867
1868        let request_id = DataActor::request_funding_rates(
1869            self.inner_mut(),
1870            instrument_id,
1871            start,
1872            end,
1873            limit,
1874            client_id,
1875            params,
1876        )
1877        .map_err(to_pyvalue_err)?;
1878        Ok(request_id.to_string())
1879    }
1880
1881    #[pyo3(name = "request_bars")]
1882    #[pyo3(signature = (bar_type, start=None, end=None, limit=None, client_id=None, params=None))]
1883    #[expect(clippy::too_many_arguments)]
1884    fn py_request_bars(
1885        &mut self,
1886        py: Python<'_>,
1887        bar_type: BarType,
1888        start: Option<u64>,
1889        end: Option<u64>,
1890        limit: Option<usize>,
1891        client_id: Option<ClientId>,
1892        params: Option<Py<PyDict>>,
1893    ) -> PyResult<String> {
1894        let params = dict_to_params(py, params)?;
1895        let limit = limit.and_then(NonZeroUsize::new);
1896        let start = start.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1897        let end = end.map(|ts| UnixNanos::from(ts).to_datetime_utc());
1898
1899        let request_id = DataActor::request_bars(
1900            self.inner_mut(),
1901            bar_type,
1902            start,
1903            end,
1904            limit,
1905            client_id,
1906            params,
1907        )
1908        .map_err(to_pyvalue_err)?;
1909        Ok(request_id.to_string())
1910    }
1911
1912    #[allow(unused_variables, clippy::needless_pass_by_value)]
1913    #[pyo3(name = "on_historical_data")]
1914    fn py_on_historical_data(&mut self, data: Py<PyAny>) {
1915        // Default implementation - can be overridden in Python subclasses
1916    }
1917
1918    #[allow(unused_variables, clippy::needless_pass_by_value)]
1919    #[pyo3(name = "on_historical_quotes")]
1920    fn py_on_historical_quotes(&mut self, quotes: Vec<QuoteTick>) {
1921        // Default implementation - can be overridden in Python subclasses
1922    }
1923
1924    #[allow(unused_variables, clippy::needless_pass_by_value)]
1925    #[pyo3(name = "on_historical_trades")]
1926    fn py_on_historical_trades(&mut self, trades: Vec<TradeTick>) {
1927        // Default implementation - can be overridden in Python subclasses
1928    }
1929
1930    #[allow(unused_variables, clippy::needless_pass_by_value)]
1931    #[pyo3(name = "on_historical_funding_rates")]
1932    fn py_on_historical_funding_rates(&mut self, funding_rates: Vec<FundingRateUpdate>) {
1933        // Default implementation - can be overridden in Python subclasses
1934    }
1935
1936    #[allow(unused_variables, clippy::needless_pass_by_value)]
1937    #[pyo3(name = "on_historical_bars")]
1938    fn py_on_historical_bars(&mut self, bars: Vec<Bar>) {
1939        // Default implementation - can be overridden in Python subclasses
1940    }
1941
1942    #[allow(unused_variables, clippy::needless_pass_by_value)]
1943    #[pyo3(name = "on_historical_mark_prices")]
1944    fn py_on_historical_mark_prices(&mut self, mark_prices: Vec<MarkPriceUpdate>) {
1945        // Default implementation - can be overridden in Python subclasses
1946    }
1947
1948    #[allow(unused_variables, clippy::needless_pass_by_value)]
1949    #[pyo3(name = "on_historical_index_prices")]
1950    fn py_on_historical_index_prices(&mut self, index_prices: Vec<IndexPriceUpdate>) {
1951        // Default implementation - can be overridden in Python subclasses
1952    }
1953}
1954
1955#[cfg(feature = "defi")]
1956#[pymethods]
1957#[pyo3_stub_gen::derive::gen_stub_pymethods]
1958impl PyDataActor {
1959    #[pyo3(name = "on_block")]
1960    #[allow(unused_variables, clippy::needless_pass_by_value)]
1961    fn py_on_block(&mut self, block: Block) {}
1962
1963    #[pyo3(name = "on_pool")]
1964    #[allow(unused_variables, clippy::needless_pass_by_value)]
1965    fn py_on_pool(&mut self, pool: Pool) {}
1966
1967    #[pyo3(name = "on_pool_swap")]
1968    #[allow(unused_variables, clippy::needless_pass_by_value)]
1969    fn py_on_pool_swap(&mut self, swap: PoolSwap) {}
1970
1971    #[pyo3(name = "on_pool_liquidity_update")]
1972    #[allow(unused_variables, clippy::needless_pass_by_value)]
1973    fn py_on_pool_liquidity_update(&mut self, update: PoolLiquidityUpdate) {}
1974
1975    #[pyo3(name = "on_pool_fee_collect")]
1976    #[allow(unused_variables, clippy::needless_pass_by_value)]
1977    fn py_on_pool_fee_collect(&mut self, update: PoolFeeCollect) {}
1978
1979    #[pyo3(name = "on_pool_flash")]
1980    #[allow(unused_variables, clippy::needless_pass_by_value)]
1981    fn py_on_pool_flash(&mut self, flash: PoolFlash) {}
1982
1983    #[pyo3(name = "subscribe_blocks")]
1984    #[pyo3(signature = (chain, client_id=None, params=None))]
1985    fn py_subscribe_blocks(
1986        &mut self,
1987        py: Python<'_>,
1988        chain: Blockchain,
1989        client_id: Option<ClientId>,
1990        params: Option<Py<PyDict>>,
1991    ) -> PyResult<()> {
1992        let params = dict_to_params(py, params)?;
1993        DataActor::subscribe_blocks(self.inner_mut(), chain, client_id, params);
1994        Ok(())
1995    }
1996
1997    #[pyo3(name = "subscribe_pool")]
1998    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
1999    fn py_subscribe_pool(
2000        &mut self,
2001        py: Python<'_>,
2002        instrument_id: InstrumentId,
2003        client_id: Option<ClientId>,
2004        params: Option<Py<PyDict>>,
2005    ) -> PyResult<()> {
2006        let params = dict_to_params(py, params)?;
2007        DataActor::subscribe_pool(self.inner_mut(), instrument_id, client_id, params);
2008        Ok(())
2009    }
2010
2011    #[pyo3(name = "subscribe_pool_swaps")]
2012    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2013    fn py_subscribe_pool_swaps(
2014        &mut self,
2015        py: Python<'_>,
2016        instrument_id: InstrumentId,
2017        client_id: Option<ClientId>,
2018        params: Option<Py<PyDict>>,
2019    ) -> PyResult<()> {
2020        let params = dict_to_params(py, params)?;
2021        DataActor::subscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
2022        Ok(())
2023    }
2024
2025    #[pyo3(name = "subscribe_pool_liquidity_updates")]
2026    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2027    fn py_subscribe_pool_liquidity_updates(
2028        &mut self,
2029        py: Python<'_>,
2030        instrument_id: InstrumentId,
2031        client_id: Option<ClientId>,
2032        params: Option<Py<PyDict>>,
2033    ) -> PyResult<()> {
2034        let params = dict_to_params(py, params)?;
2035        DataActor::subscribe_pool_liquidity_updates(
2036            self.inner_mut(),
2037            instrument_id,
2038            client_id,
2039            params,
2040        );
2041        Ok(())
2042    }
2043
2044    #[pyo3(name = "subscribe_pool_fee_collects")]
2045    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2046    fn py_subscribe_pool_fee_collects(
2047        &mut self,
2048        py: Python<'_>,
2049        instrument_id: InstrumentId,
2050        client_id: Option<ClientId>,
2051        params: Option<Py<PyDict>>,
2052    ) -> PyResult<()> {
2053        let params = dict_to_params(py, params)?;
2054        DataActor::subscribe_pool_fee_collects(self.inner_mut(), instrument_id, client_id, params);
2055        Ok(())
2056    }
2057
2058    #[pyo3(name = "subscribe_pool_flash_events")]
2059    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2060    fn py_subscribe_pool_flash_events(
2061        &mut self,
2062        py: Python<'_>,
2063        instrument_id: InstrumentId,
2064        client_id: Option<ClientId>,
2065        params: Option<Py<PyDict>>,
2066    ) -> PyResult<()> {
2067        let params = dict_to_params(py, params)?;
2068        DataActor::subscribe_pool_flash_events(self.inner_mut(), instrument_id, client_id, params);
2069        Ok(())
2070    }
2071
2072    #[pyo3(name = "unsubscribe_blocks")]
2073    #[pyo3(signature = (chain, client_id=None, params=None))]
2074    fn py_unsubscribe_blocks(
2075        &mut self,
2076        py: Python<'_>,
2077        chain: Blockchain,
2078        client_id: Option<ClientId>,
2079        params: Option<Py<PyDict>>,
2080    ) -> PyResult<()> {
2081        let params = dict_to_params(py, params)?;
2082        DataActor::unsubscribe_blocks(self.inner_mut(), chain, client_id, params);
2083        Ok(())
2084    }
2085
2086    #[pyo3(name = "unsubscribe_pool")]
2087    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2088    fn py_unsubscribe_pool(
2089        &mut self,
2090        py: Python<'_>,
2091        instrument_id: InstrumentId,
2092        client_id: Option<ClientId>,
2093        params: Option<Py<PyDict>>,
2094    ) -> PyResult<()> {
2095        let params = dict_to_params(py, params)?;
2096        DataActor::unsubscribe_pool(self.inner_mut(), instrument_id, client_id, params);
2097        Ok(())
2098    }
2099
2100    #[pyo3(name = "unsubscribe_pool_swaps")]
2101    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2102    fn py_unsubscribe_pool_swaps(
2103        &mut self,
2104        py: Python<'_>,
2105        instrument_id: InstrumentId,
2106        client_id: Option<ClientId>,
2107        params: Option<Py<PyDict>>,
2108    ) -> PyResult<()> {
2109        let params = dict_to_params(py, params)?;
2110        DataActor::unsubscribe_pool_swaps(self.inner_mut(), instrument_id, client_id, params);
2111        Ok(())
2112    }
2113
2114    #[pyo3(name = "unsubscribe_pool_liquidity_updates")]
2115    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2116    fn py_unsubscribe_pool_liquidity_updates(
2117        &mut self,
2118        py: Python<'_>,
2119        instrument_id: InstrumentId,
2120        client_id: Option<ClientId>,
2121        params: Option<Py<PyDict>>,
2122    ) -> PyResult<()> {
2123        let params = dict_to_params(py, params)?;
2124        DataActor::unsubscribe_pool_liquidity_updates(
2125            self.inner_mut(),
2126            instrument_id,
2127            client_id,
2128            params,
2129        );
2130        Ok(())
2131    }
2132
2133    #[pyo3(name = "unsubscribe_pool_fee_collects")]
2134    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2135    fn py_unsubscribe_pool_fee_collects(
2136        &mut self,
2137        py: Python<'_>,
2138        instrument_id: InstrumentId,
2139        client_id: Option<ClientId>,
2140        params: Option<Py<PyDict>>,
2141    ) -> PyResult<()> {
2142        let params = dict_to_params(py, params)?;
2143        DataActor::unsubscribe_pool_fee_collects(
2144            self.inner_mut(),
2145            instrument_id,
2146            client_id,
2147            params,
2148        );
2149        Ok(())
2150    }
2151
2152    #[pyo3(name = "unsubscribe_pool_flash_events")]
2153    #[pyo3(signature = (instrument_id, client_id=None, params=None))]
2154    fn py_unsubscribe_pool_flash_events(
2155        &mut self,
2156        py: Python<'_>,
2157        instrument_id: InstrumentId,
2158        client_id: Option<ClientId>,
2159        params: Option<Py<PyDict>>,
2160    ) -> PyResult<()> {
2161        let params = dict_to_params(py, params)?;
2162        DataActor::unsubscribe_pool_flash_events(
2163            self.inner_mut(),
2164            instrument_id,
2165            client_id,
2166            params,
2167        );
2168        Ok(())
2169    }
2170}
2171
2172#[cfg(test)]
2173mod tests {
2174    use std::{cell::RefCell, rc::Rc, str::FromStr, sync::Arc};
2175
2176    #[cfg(feature = "defi")]
2177    use alloy_primitives::{I256, U160, U256};
2178    use nautilus_core::{UUID4, UnixNanos, python::IntoPyObjectNautilusExt};
2179    #[cfg(feature = "defi")]
2180    use nautilus_model::defi::{
2181        AmmType, Block, Blockchain, Chain, Dex, DexType, Pool, PoolFeeCollect, PoolFlash,
2182        PoolIdentifier, PoolLiquidityUpdate, PoolLiquidityUpdateType, PoolSwap, Token,
2183    };
2184    use nautilus_model::{
2185        data::{
2186            Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate,
2187            InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, QuoteTick,
2188            TradeTick,
2189            close::InstrumentClose,
2190            greeks::OptionGreekValues,
2191            option_chain::{OptionChainSlice, OptionGreeks},
2192            stubs::stub_custom_data,
2193        },
2194        enums::{
2195            AggressorSide, BookType, GreeksConvention, InstrumentCloseType, MarketStatusAction,
2196        },
2197        identifiers::{ClientId, OptionSeriesId, TradeId, TraderId, Venue},
2198        instruments::{CurrencyPair, InstrumentAny, stubs::audusd_sim},
2199        orderbook::OrderBook,
2200        types::{Price, Quantity},
2201    };
2202    use pyo3::{Py, PyAny, PyResult, Python, ffi::c_str, types::PyAnyMethods};
2203    use rstest::{fixture, rstest};
2204    use ustr::Ustr;
2205
2206    use super::PyDataActor;
2207    use crate::{
2208        actor::DataActor,
2209        cache::Cache,
2210        clock::TestClock,
2211        enums::ComponentState,
2212        runner::{SyncDataCommandSender, set_data_cmd_sender},
2213        signal::Signal,
2214        timer::TimeEvent,
2215    };
2216
2217    #[fixture]
2218    fn clock() -> Rc<RefCell<TestClock>> {
2219        Rc::new(RefCell::new(TestClock::new()))
2220    }
2221
2222    #[fixture]
2223    fn cache() -> Rc<RefCell<Cache>> {
2224        Rc::new(RefCell::new(Cache::new(None, None)))
2225    }
2226
2227    #[fixture]
2228    fn trader_id() -> TraderId {
2229        TraderId::from("TRADER-001")
2230    }
2231
2232    #[fixture]
2233    fn client_id() -> ClientId {
2234        ClientId::new("TestClient")
2235    }
2236
2237    #[fixture]
2238    fn venue() -> Venue {
2239        Venue::from("SIM")
2240    }
2241
2242    #[fixture]
2243    fn data_type() -> DataType {
2244        DataType::new("TestData", None, None)
2245    }
2246
2247    #[fixture]
2248    fn bar_type(audusd_sim: CurrencyPair) -> BarType {
2249        BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", audusd_sim.id)).unwrap()
2250    }
2251
2252    fn create_unregistered_actor() -> PyDataActor {
2253        PyDataActor::new(None)
2254    }
2255
2256    fn create_registered_actor(
2257        clock: Rc<RefCell<TestClock>>,
2258        cache: Rc<RefCell<Cache>>,
2259        trader_id: TraderId,
2260    ) -> PyDataActor {
2261        // Set up sync data command sender for tests
2262        let sender = SyncDataCommandSender;
2263        set_data_cmd_sender(Arc::new(sender));
2264
2265        let mut actor = PyDataActor::new(None);
2266        actor.register(trader_id, clock, cache).unwrap();
2267        actor
2268    }
2269
2270    #[rstest]
2271    fn test_new_actor_creation() {
2272        let actor = PyDataActor::new(None);
2273        assert!(actor.trader_id().is_none());
2274    }
2275
2276    #[rstest]
2277    fn test_clock_access_before_registration_raises_error() {
2278        let actor = PyDataActor::new(None);
2279
2280        // Accessing clock before registration should raise PyRuntimeError
2281        let result = actor.py_clock();
2282        assert!(result.is_err());
2283
2284        let error = result.unwrap_err();
2285        pyo3::Python::initialize();
2286        pyo3::Python::attach(|py| {
2287            assert!(error.is_instance_of::<pyo3::exceptions::PyRuntimeError>(py));
2288        });
2289
2290        let error_msg = error.to_string();
2291        assert!(
2292            error_msg.contains("Actor must be registered with a trader before accessing clock")
2293        );
2294    }
2295
2296    #[rstest]
2297    fn test_unregistered_actor_methods_work() {
2298        let actor = create_unregistered_actor();
2299
2300        assert!(!actor.py_is_ready());
2301        assert!(!actor.py_is_running());
2302        assert!(!actor.py_is_stopped());
2303        assert!(!actor.py_is_disposed());
2304        assert!(!actor.py_is_degraded());
2305        assert!(!actor.py_is_faulted());
2306
2307        // Verify unregistered state
2308        assert_eq!(actor.trader_id(), None);
2309    }
2310
2311    #[rstest]
2312    fn test_registration_success(
2313        clock: Rc<RefCell<TestClock>>,
2314        cache: Rc<RefCell<Cache>>,
2315        trader_id: TraderId,
2316    ) {
2317        let mut actor = create_unregistered_actor();
2318        actor.register(trader_id, clock, cache).unwrap();
2319        assert!(actor.trader_id().is_some());
2320        assert_eq!(actor.trader_id().unwrap(), trader_id);
2321    }
2322
2323    #[rstest]
2324    fn test_registered_actor_basic_properties(
2325        clock: Rc<RefCell<TestClock>>,
2326        cache: Rc<RefCell<Cache>>,
2327        trader_id: TraderId,
2328    ) {
2329        let actor = create_registered_actor(clock, cache, trader_id);
2330
2331        assert_eq!(actor.state(), ComponentState::Ready);
2332        assert_eq!(actor.trader_id(), Some(TraderId::from("TRADER-001")));
2333        assert!(actor.py_is_ready());
2334        assert!(!actor.py_is_running());
2335        assert!(!actor.py_is_stopped());
2336        assert!(!actor.py_is_disposed());
2337        assert!(!actor.py_is_degraded());
2338        assert!(!actor.py_is_faulted());
2339    }
2340
2341    #[rstest]
2342    fn test_basic_subscription_methods_compile(
2343        clock: Rc<RefCell<TestClock>>,
2344        cache: Rc<RefCell<Cache>>,
2345        trader_id: TraderId,
2346        data_type: DataType,
2347        client_id: ClientId,
2348        audusd_sim: CurrencyPair,
2349    ) {
2350        let mut actor = create_registered_actor(clock, cache, trader_id);
2351
2352        pyo3::Python::initialize();
2353        pyo3::Python::attach(|py| {
2354            assert!(
2355                actor
2356                    .py_subscribe_data(py, data_type.clone(), Some(client_id), None)
2357                    .is_ok()
2358            );
2359            assert!(
2360                actor
2361                    .py_subscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2362                    .is_ok()
2363            );
2364            assert!(
2365                actor
2366                    .py_unsubscribe_data(py, data_type, Some(client_id), None)
2367                    .is_ok()
2368            );
2369            assert!(
2370                actor
2371                    .py_unsubscribe_quotes(py, audusd_sim.id, Some(client_id), None)
2372                    .is_ok()
2373            );
2374        });
2375    }
2376
2377    #[rstest]
2378    fn test_shutdown_system_passes_through(
2379        clock: Rc<RefCell<TestClock>>,
2380        cache: Rc<RefCell<Cache>>,
2381        trader_id: TraderId,
2382    ) {
2383        let actor = create_registered_actor(clock, cache, trader_id);
2384
2385        actor.py_shutdown_system(Some("Test shutdown".to_string()));
2386        actor.py_shutdown_system(None);
2387    }
2388
2389    #[rstest]
2390    fn test_publish_data_delivers_to_any_subscriber(
2391        clock: Rc<RefCell<TestClock>>,
2392        cache: Rc<RefCell<Cache>>,
2393        trader_id: TraderId,
2394    ) {
2395        use crate::msgbus::{
2396            self, MessageBus, get_message_bus, switchboard::get_custom_topic,
2397            typed_handler::ShareableMessageHandler,
2398        };
2399
2400        // Ensure clean msgbus for this test
2401        *get_message_bus().borrow_mut() = MessageBus::default();
2402
2403        let actor = create_registered_actor(clock, cache, trader_id);
2404        let data = stub_custom_data(1, 42, None, None);
2405        let topic = get_custom_topic(&data.data_type);
2406
2407        let received: Rc<RefCell<Vec<CustomData>>> = Rc::new(RefCell::new(Vec::new()));
2408        let received_clone = received.clone();
2409        let handler = ShareableMessageHandler::from_typed(move |d: &CustomData| {
2410            received_clone.borrow_mut().push(d.clone());
2411        });
2412        msgbus::subscribe_any(topic.into(), handler, None);
2413
2414        actor.py_publish_data(&data.data_type, &data);
2415
2416        let received = received.borrow();
2417        assert_eq!(received.len(), 1);
2418        assert_eq!(received[0].data_type, data.data_type);
2419    }
2420
2421    #[rstest]
2422    fn test_publish_signal_delivers_to_customdata_subscriber(
2423        clock: Rc<RefCell<TestClock>>,
2424        cache: Rc<RefCell<Cache>>,
2425        trader_id: TraderId,
2426    ) {
2427        use crate::{
2428            msgbus::{
2429                self, MessageBus, Pattern, get_message_bus, typed_handler::ShareableMessageHandler,
2430            },
2431            signal::Signal,
2432        };
2433
2434        *get_message_bus().borrow_mut() = MessageBus::default();
2435
2436        let actor = create_registered_actor(clock, cache, trader_id);
2437
2438        // Signals travel as `CustomData` on the bus so persistence and other
2439        // `CustomData`-aware subscribers pick them up. Downcast inside the handler.
2440        let received: Rc<RefCell<Vec<Signal>>> = Rc::new(RefCell::new(Vec::new()));
2441        let received_clone = received.clone();
2442        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
2443            if let Some(sig) = data.data.as_any().downcast_ref::<Signal>() {
2444                received_clone.borrow_mut().push(sig.clone());
2445            }
2446        });
2447        let pattern: crate::msgbus::MStr<Pattern> = "data.Signal*".to_string().into();
2448        msgbus::subscribe_any(pattern, handler, None);
2449
2450        pyo3::Python::initialize();
2451        Python::attach(|py| {
2452            let val1: Py<PyAny> = 1.0_f64.into_py_any_unwrap(py);
2453            let val2: Py<PyAny> = "HIGH".into_py_any_unwrap(py);
2454            actor.py_publish_signal(py, "example", val1, 0).unwrap();
2455            actor
2456                .py_publish_signal(py, "risk", val2, 1_700_000_000_000_000_000)
2457                .unwrap();
2458        });
2459
2460        let received = received.borrow();
2461        assert_eq!(received.len(), 2);
2462        assert_eq!(received[0].name.as_str(), "example");
2463        assert_eq!(received[0].value, "1.0");
2464        assert_eq!(received[1].name.as_str(), "risk");
2465        assert_eq!(received[1].value, "HIGH");
2466        assert_eq!(
2467            received[1].ts_event,
2468            UnixNanos::from(1_700_000_000_000_000_000_u64)
2469        );
2470    }
2471
2472    #[rstest]
2473    fn test_publish_signal_accepts_numeric_py_values(
2474        clock: Rc<RefCell<TestClock>>,
2475        cache: Rc<RefCell<Cache>>,
2476        trader_id: TraderId,
2477    ) {
2478        use crate::{
2479            msgbus::{
2480                self, MessageBus, Pattern, get_message_bus, typed_handler::ShareableMessageHandler,
2481            },
2482            signal::Signal,
2483        };
2484
2485        *get_message_bus().borrow_mut() = MessageBus::default();
2486
2487        let actor = create_registered_actor(clock, cache, trader_id);
2488
2489        let received: Rc<RefCell<Vec<Signal>>> = Rc::new(RefCell::new(Vec::new()));
2490        let received_clone = received.clone();
2491        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
2492            if let Some(sig) = data.data.as_any().downcast_ref::<Signal>() {
2493                received_clone.borrow_mut().push(sig.clone());
2494            }
2495        });
2496        let pattern: crate::msgbus::MStr<Pattern> = "data.Signal*".to_string().into();
2497        msgbus::subscribe_any(pattern, handler, None);
2498
2499        pyo3::Python::initialize();
2500        Python::attach(|py| {
2501            let int_value: Py<PyAny> = 42_i64.into_py_any_unwrap(py);
2502            let float_value: Py<PyAny> = 3.5_f64.into_py_any_unwrap(py);
2503            let bool_value: Py<PyAny> = true.into_py_any_unwrap(py);
2504            actor.py_publish_signal(py, "count", int_value, 0).unwrap();
2505            actor
2506                .py_publish_signal(py, "ratio", float_value, 0)
2507                .unwrap();
2508            actor
2509                .py_publish_signal(py, "active", bool_value, 0)
2510                .unwrap();
2511        });
2512
2513        let received = received.borrow();
2514        assert_eq!(received.len(), 3);
2515        assert_eq!(received[0].value, "42");
2516        assert_eq!(received[1].value, "3.5");
2517        assert_eq!(received[2].value, "True");
2518    }
2519
2520    #[rstest]
2521    fn test_subscribe_and_unsubscribe_signal_compile(
2522        clock: Rc<RefCell<TestClock>>,
2523        cache: Rc<RefCell<Cache>>,
2524        trader_id: TraderId,
2525    ) {
2526        use crate::msgbus::{MessageBus, get_message_bus};
2527
2528        *get_message_bus().borrow_mut() = MessageBus::default();
2529
2530        let mut actor = create_registered_actor(clock, cache, trader_id);
2531        actor.py_subscribe_signal("example");
2532        actor.py_unsubscribe_signal("example");
2533        actor.py_subscribe_signal("");
2534        actor.py_unsubscribe_signal("");
2535    }
2536
2537    #[rstest]
2538    fn test_publish_data_dispatches_to_python_on_data(
2539        clock: Rc<RefCell<TestClock>>,
2540        cache: Rc<RefCell<Cache>>,
2541        trader_id: TraderId,
2542    ) {
2543        use crate::msgbus::{MessageBus, get_message_bus};
2544
2545        *get_message_bus().borrow_mut() = MessageBus::default();
2546
2547        pyo3::Python::initialize();
2548        Python::attach(|py| {
2549            let py_actor = create_tracking_python_actor(py).unwrap();
2550
2551            let mut rust_actor = PyDataActor::new(None);
2552            rust_actor.set_python_instance(py_actor.clone_ref(py));
2553            rust_actor.register(trader_id, clock, cache).unwrap();
2554            rust_actor.register_in_global_registries();
2555            rust_actor.py_start().unwrap();
2556
2557            let data = stub_custom_data(1, 42, None, None);
2558            rust_actor
2559                .py_subscribe_data(py, data.data_type.clone(), None, None)
2560                .unwrap();
2561
2562            rust_actor.py_publish_data(&data.data_type, &data);
2563            rust_actor.py_publish_data(&data.data_type, &data);
2564
2565            assert!(python_method_was_called(&py_actor, py, "on_data"));
2566            assert_eq!(python_method_call_count(&py_actor, py, "on_data"), 2);
2567        });
2568    }
2569
2570    #[rstest]
2571    fn test_publish_signal_dispatches_to_python_on_signal(
2572        clock: Rc<RefCell<TestClock>>,
2573        cache: Rc<RefCell<Cache>>,
2574        trader_id: TraderId,
2575    ) {
2576        use crate::msgbus::{MessageBus, get_message_bus};
2577
2578        *get_message_bus().borrow_mut() = MessageBus::default();
2579
2580        pyo3::Python::initialize();
2581        Python::attach(|py| {
2582            let py_actor = create_tracking_python_actor(py).unwrap();
2583
2584            let mut rust_actor = PyDataActor::new(None);
2585            rust_actor.set_python_instance(py_actor.clone_ref(py));
2586            rust_actor.register(trader_id, clock, cache).unwrap();
2587            rust_actor.register_in_global_registries();
2588            rust_actor.py_start().unwrap();
2589
2590            rust_actor.py_subscribe_signal("example");
2591            let val1: Py<PyAny> = "1.5".into_py_any_unwrap(py);
2592            let val2: Py<PyAny> = 2.0_f64.into_py_any_unwrap(py);
2593            rust_actor
2594                .py_publish_signal(py, "example", val1, 0)
2595                .unwrap();
2596            rust_actor
2597                .py_publish_signal(py, "example", val2, 1_700_000_000_000_000_000)
2598                .unwrap();
2599
2600            assert!(python_method_was_called(&py_actor, py, "on_signal"));
2601            assert_eq!(python_method_call_count(&py_actor, py, "on_signal"), 2);
2602        });
2603    }
2604
2605    #[rstest]
2606    fn test_unsubscribe_signal_stops_python_dispatch(
2607        clock: Rc<RefCell<TestClock>>,
2608        cache: Rc<RefCell<Cache>>,
2609        trader_id: TraderId,
2610    ) {
2611        use crate::msgbus::{MessageBus, get_message_bus};
2612
2613        *get_message_bus().borrow_mut() = MessageBus::default();
2614
2615        pyo3::Python::initialize();
2616        Python::attach(|py| {
2617            let py_actor = create_tracking_python_actor(py).unwrap();
2618
2619            let mut rust_actor = PyDataActor::new(None);
2620            rust_actor.set_python_instance(py_actor.clone_ref(py));
2621            rust_actor.register(trader_id, clock, cache).unwrap();
2622            rust_actor.register_in_global_registries();
2623            rust_actor.py_start().unwrap();
2624
2625            rust_actor.py_subscribe_signal("example");
2626            let val1: Py<PyAny> = "1".into_py_any_unwrap(py);
2627            let val2: Py<PyAny> = "2".into_py_any_unwrap(py);
2628            rust_actor
2629                .py_publish_signal(py, "example", val1, 0)
2630                .unwrap();
2631
2632            rust_actor.py_unsubscribe_signal("example");
2633            rust_actor
2634                .py_publish_signal(py, "example", val2, 0)
2635                .unwrap();
2636
2637            assert_eq!(python_method_call_count(&py_actor, py, "on_signal"), 1);
2638        });
2639    }
2640
2641    #[rstest]
2642    fn test_subscribe_signal_wildcard_dispatches_all_names_to_python(
2643        clock: Rc<RefCell<TestClock>>,
2644        cache: Rc<RefCell<Cache>>,
2645        trader_id: TraderId,
2646    ) {
2647        use crate::msgbus::{MessageBus, get_message_bus};
2648
2649        *get_message_bus().borrow_mut() = MessageBus::default();
2650
2651        pyo3::Python::initialize();
2652        Python::attach(|py| {
2653            let py_actor = create_tracking_python_actor(py).unwrap();
2654
2655            let mut rust_actor = PyDataActor::new(None);
2656            rust_actor.set_python_instance(py_actor.clone_ref(py));
2657            rust_actor.register(trader_id, clock, cache).unwrap();
2658            rust_actor.register_in_global_registries();
2659            rust_actor.py_start().unwrap();
2660
2661            rust_actor.py_subscribe_signal("");
2662            let val1: Py<PyAny> = "1".into_py_any_unwrap(py);
2663            let val2: Py<PyAny> = "2".into_py_any_unwrap(py);
2664            let val3: Py<PyAny> = "3".into_py_any_unwrap(py);
2665            rust_actor.py_publish_signal(py, "alpha", val1, 0).unwrap();
2666            rust_actor.py_publish_signal(py, "beta", val2, 0).unwrap();
2667            rust_actor.py_publish_signal(py, "gamma", val3, 0).unwrap();
2668
2669            assert_eq!(python_method_call_count(&py_actor, py, "on_signal"), 3);
2670        });
2671    }
2672
2673    #[rstest]
2674    fn test_signal_customdata_unwraps_to_python_signal(
2675        clock: Rc<RefCell<TestClock>>,
2676        cache: Rc<RefCell<Cache>>,
2677        trader_id: TraderId,
2678    ) {
2679        // Exercises the `Signal::to_pyobject` path: a `CustomData` wrapping a
2680        // `Signal` reaches Python `on_data`, and the PyO3 `.data` getter must
2681        // successfully unwrap the inner `Arc<dyn CustomDataTrait>` into a
2682        // Python `Signal`. Without `Signal::to_pyobject`, the getter raises
2683        // `TypeError` and this assertion fails.
2684        use crate::msgbus::{MessageBus, get_message_bus};
2685
2686        *get_message_bus().borrow_mut() = MessageBus::default();
2687
2688        pyo3::Python::initialize();
2689        Python::attach(|py| {
2690            let capture_code = c_str!(
2691                r#"
2692class CapturingActor:
2693    def __init__(self):
2694        self.captured = []
2695
2696    def on_start(self): pass
2697    def on_stop(self): pass
2698    def on_resume(self): pass
2699    def on_reset(self): pass
2700    def on_dispose(self): pass
2701    def on_degrade(self): pass
2702    def on_fault(self): pass
2703    def on_signal(self, signal): pass
2704
2705    def on_data(self, custom):
2706        # Exercise the CustomData.data getter: raises TypeError if the
2707        # inner payload cannot be converted back to a Python object.
2708        inner = custom.data
2709        self.captured.append((type(inner).__name__, inner.name, inner.value))
2710"#
2711            );
2712            py.run(capture_code, None, None).unwrap();
2713            let cls = py.eval(c_str!("CapturingActor"), None, None).unwrap();
2714            let py_actor: Py<PyAny> = cls.call0().unwrap().unbind();
2715
2716            let mut rust_actor = PyDataActor::new(None);
2717            rust_actor.set_python_instance(py_actor.clone_ref(py));
2718            rust_actor.register(trader_id, clock, cache).unwrap();
2719            rust_actor.register_in_global_registries();
2720            rust_actor.py_start().unwrap();
2721
2722            // Subscribe as custom-data for the signal's advertised DataType
2723            // (`data.SignalExample`) so `on_data` fires with the wrapping CustomData.
2724            let data_type = DataType::new("SignalExample", None, None);
2725            rust_actor
2726                .py_subscribe_data(py, data_type, None, None)
2727                .unwrap();
2728
2729            let val: Py<PyAny> = "1.5".into_py_any_unwrap(py);
2730            rust_actor.py_publish_signal(py, "example", val, 0).unwrap();
2731
2732            let captured = py_actor
2733                .bind(py)
2734                .getattr("captured")
2735                .unwrap()
2736                .extract::<Vec<(String, String, String)>>()
2737                .unwrap();
2738            assert_eq!(captured.len(), 1);
2739            assert_eq!(captured[0].0, "Signal");
2740            assert_eq!(captured[0].1, "example");
2741            assert_eq!(captured[0].2, "1.5");
2742        });
2743    }
2744
2745    #[rstest]
2746    fn test_add_and_update_synthetic_via_pyo3(
2747        clock: Rc<RefCell<TestClock>>,
2748        cache: Rc<RefCell<Cache>>,
2749        trader_id: TraderId,
2750    ) {
2751        use nautilus_model::{
2752            identifiers::{InstrumentId, Symbol},
2753            instruments::SyntheticInstrument,
2754        };
2755
2756        let actor = create_registered_actor(clock, cache.clone(), trader_id);
2757
2758        let comp1 = InstrumentId::from_str("BTC-USD.VENUE").unwrap();
2759        let comp2 = InstrumentId::from_str("ETH-USD.VENUE").unwrap();
2760        let formula = format!("({comp1} + {comp2}) / 2.0");
2761        let synthetic = SyntheticInstrument::new(
2762            Symbol::from("SYN"),
2763            2,
2764            vec![comp1, comp2],
2765            &formula,
2766            UnixNanos::default(),
2767            UnixNanos::default(),
2768        );
2769        let synthetic_id = synthetic.id;
2770
2771        actor.py_add_synthetic(synthetic.clone()).unwrap();
2772        assert!(cache.borrow().synthetic(&synthetic_id).is_some());
2773
2774        // Adding again raises
2775        assert!(actor.py_add_synthetic(synthetic).is_err());
2776
2777        let new_formula = format!("{comp1} + {comp2}");
2778        let updated = SyntheticInstrument::new(
2779            Symbol::from("SYN"),
2780            2,
2781            vec![comp1, comp2],
2782            &new_formula,
2783            UnixNanos::default(),
2784            UnixNanos::default(),
2785        );
2786        actor.py_update_synthetic(updated).unwrap();
2787        assert_eq!(
2788            cache.borrow().synthetic(&synthetic_id).unwrap().formula,
2789            new_formula
2790        );
2791
2792        // Updating a non-existent raises
2793        let missing = SyntheticInstrument::new(
2794            Symbol::from("GONE"),
2795            2,
2796            vec![comp1, comp2],
2797            &formula,
2798            UnixNanos::default(),
2799            UnixNanos::default(),
2800        );
2801        assert!(actor.py_update_synthetic(missing).is_err());
2802    }
2803
2804    #[rstest]
2805    fn test_book_at_interval_invalid_interval_ms(
2806        clock: Rc<RefCell<TestClock>>,
2807        cache: Rc<RefCell<Cache>>,
2808        trader_id: TraderId,
2809        audusd_sim: CurrencyPair,
2810    ) {
2811        pyo3::Python::initialize();
2812        let mut actor = create_registered_actor(clock, cache, trader_id);
2813
2814        pyo3::Python::attach(|py| {
2815            let result = actor.py_subscribe_book_at_interval(
2816                py,
2817                audusd_sim.id,
2818                BookType::L2_MBP,
2819                0,
2820                None,
2821                None,
2822                None,
2823            );
2824            assert!(result.is_err());
2825            assert_eq!(
2826                result.unwrap_err().to_string(),
2827                "ValueError: interval_ms must be > 0"
2828            );
2829
2830            let result = actor.py_unsubscribe_book_at_interval(py, audusd_sim.id, 0, None, None);
2831            assert!(result.is_err());
2832            assert_eq!(
2833                result.unwrap_err().to_string(),
2834                "ValueError: interval_ms must be > 0"
2835            );
2836        });
2837    }
2838
2839    #[rstest]
2840    fn test_request_methods_signatures_exist() {
2841        let actor = create_unregistered_actor();
2842        assert!(actor.trader_id().is_none());
2843    }
2844
2845    #[rstest]
2846    fn test_data_actor_trait_implementation(
2847        clock: Rc<RefCell<TestClock>>,
2848        cache: Rc<RefCell<Cache>>,
2849        trader_id: TraderId,
2850    ) {
2851        let actor = create_registered_actor(clock, cache, trader_id);
2852        let state = actor.state();
2853        assert_eq!(state, ComponentState::Ready);
2854    }
2855
2856    fn sample_instrument() -> CurrencyPair {
2857        audusd_sim()
2858    }
2859
2860    fn sample_data() -> CustomData {
2861        stub_custom_data(1, 42, None, None)
2862    }
2863
2864    fn sample_time_event() -> TimeEvent {
2865        TimeEvent::new(
2866            Ustr::from("test_timer"),
2867            UUID4::new(),
2868            UnixNanos::default(),
2869            UnixNanos::default(),
2870        )
2871    }
2872
2873    fn sample_signal() -> Signal {
2874        Signal::new(
2875            Ustr::from("test_signal"),
2876            "1.0".to_string(),
2877            UnixNanos::default(),
2878            UnixNanos::default(),
2879        )
2880    }
2881
2882    fn sample_quote() -> QuoteTick {
2883        let instrument = sample_instrument();
2884        QuoteTick::new(
2885            instrument.id,
2886            Price::from("1.00000"),
2887            Price::from("1.00001"),
2888            Quantity::from(100_000),
2889            Quantity::from(100_000),
2890            UnixNanos::default(),
2891            UnixNanos::default(),
2892        )
2893    }
2894
2895    fn sample_trade() -> TradeTick {
2896        let instrument = sample_instrument();
2897        TradeTick::new(
2898            instrument.id,
2899            Price::from("1.00000"),
2900            Quantity::from(100_000),
2901            AggressorSide::Buyer,
2902            TradeId::new("123456"),
2903            UnixNanos::default(),
2904            UnixNanos::default(),
2905        )
2906    }
2907
2908    fn sample_bar() -> Bar {
2909        let instrument = sample_instrument();
2910        let bar_type =
2911            BarType::from_str(&format!("{}-1-MINUTE-LAST-INTERNAL", instrument.id)).unwrap();
2912        Bar::new(
2913            bar_type,
2914            Price::from("1.00000"),
2915            Price::from("1.00010"),
2916            Price::from("0.99990"),
2917            Price::from("1.00005"),
2918            Quantity::from(100_000),
2919            UnixNanos::default(),
2920            UnixNanos::default(),
2921        )
2922    }
2923
2924    fn sample_book() -> OrderBook {
2925        OrderBook::new(sample_instrument().id, BookType::L2_MBP)
2926    }
2927
2928    fn sample_book_deltas() -> OrderBookDeltas {
2929        let instrument = sample_instrument();
2930        let delta =
2931            OrderBookDelta::clear(instrument.id, 0, UnixNanos::default(), UnixNanos::default());
2932        OrderBookDeltas::new(instrument.id, vec![delta])
2933    }
2934
2935    fn sample_mark_price() -> MarkPriceUpdate {
2936        MarkPriceUpdate::new(
2937            sample_instrument().id,
2938            Price::from("1.00000"),
2939            UnixNanos::default(),
2940            UnixNanos::default(),
2941        )
2942    }
2943
2944    fn sample_index_price() -> IndexPriceUpdate {
2945        IndexPriceUpdate::new(
2946            sample_instrument().id,
2947            Price::from("1.00000"),
2948            UnixNanos::default(),
2949            UnixNanos::default(),
2950        )
2951    }
2952
2953    fn sample_funding_rate() -> FundingRateUpdate {
2954        FundingRateUpdate::new(
2955            sample_instrument().id,
2956            "0.0001".parse().unwrap(),
2957            None,
2958            None,
2959            UnixNanos::default(),
2960            UnixNanos::default(),
2961        )
2962    }
2963
2964    fn sample_instrument_status() -> InstrumentStatus {
2965        InstrumentStatus::new(
2966            sample_instrument().id,
2967            MarketStatusAction::Trading,
2968            UnixNanos::default(),
2969            UnixNanos::default(),
2970            None,
2971            None,
2972            None,
2973            None,
2974            None,
2975        )
2976    }
2977
2978    fn sample_instrument_close() -> InstrumentClose {
2979        InstrumentClose::new(
2980            sample_instrument().id,
2981            Price::from("1.00000"),
2982            InstrumentCloseType::EndOfSession,
2983            UnixNanos::default(),
2984            UnixNanos::default(),
2985        )
2986    }
2987
2988    fn sample_option_greeks() -> OptionGreeks {
2989        OptionGreeks {
2990            instrument_id: sample_instrument().id,
2991            convention: GreeksConvention::BlackScholes,
2992            greeks: OptionGreekValues {
2993                delta: 0.55,
2994                gamma: 0.03,
2995                vega: 0.12,
2996                theta: -0.05,
2997                rho: 0.01,
2998            },
2999            mark_iv: Some(0.25),
3000            bid_iv: None,
3001            ask_iv: None,
3002            underlying_price: None,
3003            open_interest: None,
3004            ts_event: UnixNanos::default(),
3005            ts_init: UnixNanos::default(),
3006        }
3007    }
3008
3009    fn sample_option_chain() -> OptionChainSlice {
3010        OptionChainSlice {
3011            series_id: OptionSeriesId::new(
3012                Venue::from("SIM"),
3013                Ustr::from("AUD"),
3014                Ustr::from("USD"),
3015                UnixNanos::from(1_711_036_800_000_000_000),
3016            ),
3017            atm_strike: None,
3018            calls: Default::default(),
3019            puts: Default::default(),
3020            ts_event: UnixNanos::default(),
3021            ts_init: UnixNanos::default(),
3022        }
3023    }
3024
3025    #[cfg(feature = "defi")]
3026    fn sample_block() -> Block {
3027        Block::new(
3028            "0x1234567890abcdef".to_string(),
3029            "0xabcdef1234567890".to_string(),
3030            12345,
3031            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0".into(),
3032            21000,
3033            20000,
3034            UnixNanos::default(),
3035            Some(Blockchain::Ethereum),
3036        )
3037    }
3038
3039    #[cfg(feature = "defi")]
3040    fn sample_pool_components() -> (Arc<Chain>, Arc<Dex>, Pool) {
3041        let chain = Arc::new(Chain::new(Blockchain::Ethereum, 1));
3042        let dex = Arc::new(Dex::new(
3043            Chain::new(Blockchain::Ethereum, 1),
3044            DexType::UniswapV3,
3045            "0x1F98431c8aD98523631AE4a59f267346ea31F984",
3046            0,
3047            AmmType::CLAMM,
3048            "PoolCreated",
3049            "Swap",
3050            "Mint",
3051            "Burn",
3052            "Collect",
3053        ));
3054        let token0 = Token::new(
3055            chain.clone(),
3056            "0xa0b86a33e6441c8c06dd7b111a8c4e82e2b2a5e1"
3057                .parse()
3058                .unwrap(),
3059            "USDC".into(),
3060            "USD Coin".into(),
3061            6,
3062        );
3063        let token1 = Token::new(
3064            chain.clone(),
3065            "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
3066                .parse()
3067                .unwrap(),
3068            "WETH".into(),
3069            "Wrapped Ether".into(),
3070            18,
3071        );
3072        let pool_address = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
3073            .parse()
3074            .unwrap();
3075        let pool_identifier: PoolIdentifier = "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8"
3076            .parse()
3077            .unwrap();
3078        let pool = Pool::new(
3079            chain.clone(),
3080            dex.clone(),
3081            pool_address,
3082            pool_identifier,
3083            12345,
3084            token0,
3085            token1,
3086            Some(500),
3087            Some(10),
3088            UnixNanos::default(),
3089        );
3090
3091        (chain, dex, pool)
3092    }
3093
3094    #[cfg(feature = "defi")]
3095    fn sample_pool_swap() -> PoolSwap {
3096        let (chain, dex, pool) = sample_pool_components();
3097        PoolSwap::new(
3098            chain,
3099            dex,
3100            pool.instrument_id,
3101            pool.pool_identifier,
3102            12345,
3103            "0xabc123".to_string(),
3104            0,
3105            0,
3106            None,
3107            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3108                .parse()
3109                .unwrap(),
3110            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3111                .parse()
3112                .unwrap(),
3113            I256::from_str("1000000000000000000").unwrap(),
3114            I256::from_str("400000000000000").unwrap(),
3115            U160::from(59000000000000u128),
3116            1000000,
3117            100,
3118        )
3119    }
3120
3121    #[cfg(feature = "defi")]
3122    fn sample_pool_liquidity_update() -> PoolLiquidityUpdate {
3123        let (chain, dex, pool) = sample_pool_components();
3124        PoolLiquidityUpdate::new(
3125            chain,
3126            dex,
3127            pool.instrument_id,
3128            pool.pool_identifier,
3129            PoolLiquidityUpdateType::Mint,
3130            12345,
3131            "0xabc123".to_string(),
3132            0,
3133            0,
3134            Some(
3135                "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3136                    .parse()
3137                    .unwrap(),
3138            ),
3139            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3140                .parse()
3141                .unwrap(),
3142            1000,
3143            U256::from(1_000u64),
3144            U256::from(2_000u64),
3145            -10,
3146            10,
3147            Some(UnixNanos::default()),
3148        )
3149    }
3150
3151    #[cfg(feature = "defi")]
3152    fn sample_pool_fee_collect() -> PoolFeeCollect {
3153        let (chain, dex, pool) = sample_pool_components();
3154        PoolFeeCollect::new(
3155            chain,
3156            dex,
3157            pool.instrument_id,
3158            pool.pool_identifier,
3159            12345,
3160            "0xabc123".to_string(),
3161            0,
3162            0,
3163            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3164                .parse()
3165                .unwrap(),
3166            100,
3167            200,
3168            -10,
3169            10,
3170            Some(UnixNanos::default()),
3171        )
3172    }
3173
3174    #[cfg(feature = "defi")]
3175    fn sample_pool_flash() -> PoolFlash {
3176        let (chain, dex, pool) = sample_pool_components();
3177        PoolFlash::new(
3178            chain,
3179            dex,
3180            pool.instrument_id,
3181            pool.pool_identifier,
3182            12345,
3183            "0xabc123".to_string(),
3184            0,
3185            0,
3186            Some(UnixNanos::default()),
3187            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3188                .parse()
3189                .unwrap(),
3190            "0x742E4422b21FB8B4dF463F28689AC98bD56c39e0"
3191                .parse()
3192                .unwrap(),
3193            U256::from(100u64),
3194            U256::from(200u64),
3195            U256::from(101u64),
3196            U256::from(201u64),
3197        )
3198    }
3199
3200    const TRACKING_ACTOR_CODE: &std::ffi::CStr = c_str!(
3201        r#"
3202class TrackingActor:
3203    """A mock Python actor that tracks all method calls."""
3204
3205    TRACKED_METHODS = {
3206        "on_start",
3207        "on_stop",
3208        "on_resume",
3209        "on_reset",
3210        "on_dispose",
3211        "on_degrade",
3212        "on_fault",
3213        "on_time_event",
3214        "on_data",
3215        "on_signal",
3216        "on_instrument",
3217        "on_quote",
3218        "on_trade",
3219        "on_bar",
3220        "on_book",
3221        "on_book_deltas",
3222        "on_mark_price",
3223        "on_index_price",
3224        "on_funding_rate",
3225        "on_instrument_status",
3226        "on_instrument_close",
3227        "on_option_greeks",
3228        "on_option_chain",
3229        "on_historical_data",
3230        "on_historical_quotes",
3231        "on_historical_trades",
3232        "on_historical_funding_rates",
3233        "on_historical_bars",
3234        "on_historical_mark_prices",
3235        "on_historical_index_prices",
3236        "on_block",
3237        "on_pool",
3238        "on_pool_swap",
3239        "on_pool_liquidity_update",
3240        "on_pool_fee_collect",
3241        "on_pool_flash",
3242    }
3243
3244    def __init__(self):
3245        self.calls = []
3246
3247    def _record(self, method_name, *args):
3248        self.calls.append((method_name, args))
3249
3250    def was_called(self, method_name):
3251        return any(call[0] == method_name for call in self.calls)
3252
3253    def call_count(self, method_name):
3254        return sum(1 for call in self.calls if call[0] == method_name)
3255
3256    def __getattr__(self, name):
3257        if name in self.TRACKED_METHODS:
3258            return lambda *args: self._record(name, *args)
3259        raise AttributeError(name)
3260"#
3261    );
3262
3263    fn create_tracking_python_actor(py: Python<'_>) -> PyResult<Py<PyAny>> {
3264        py.run(TRACKING_ACTOR_CODE, None, None)?;
3265        let tracking_actor_class = py.eval(c_str!("TrackingActor"), None, None)?;
3266        let instance = tracking_actor_class.call0()?;
3267        Ok(instance.unbind())
3268    }
3269
3270    fn python_method_was_called(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> bool {
3271        py_actor
3272            .call_method1(py, "was_called", (method_name,))
3273            .and_then(|r| r.extract::<bool>(py))
3274            .unwrap_or(false)
3275    }
3276
3277    fn python_method_call_count(py_actor: &Py<PyAny>, py: Python<'_>, method_name: &str) -> i32 {
3278        py_actor
3279            .call_method1(py, "call_count", (method_name,))
3280            .and_then(|r| r.extract::<i32>(py))
3281            .unwrap_or(0)
3282    }
3283
3284    fn assert_python_dispatch<F>(
3285        py: Python<'_>,
3286        clock: Rc<RefCell<TestClock>>,
3287        cache: Rc<RefCell<Cache>>,
3288        trader_id: TraderId,
3289        method_name: &str,
3290        invoke: F,
3291    ) where
3292        F: FnOnce(&mut PyDataActor) -> anyhow::Result<()>,
3293    {
3294        let py_actor = create_tracking_python_actor(py).unwrap();
3295
3296        let mut rust_actor = PyDataActor::new(None);
3297        rust_actor.set_python_instance(py_actor.clone_ref(py));
3298        rust_actor.register(trader_id, clock, cache).unwrap();
3299
3300        let result = invoke(&mut rust_actor);
3301
3302        assert!(result.is_ok());
3303        assert!(python_method_was_called(&py_actor, py, method_name));
3304        assert_eq!(python_method_call_count(&py_actor, py, method_name), 1);
3305    }
3306
3307    #[rstest]
3308    #[case("on_start")]
3309    #[case("on_stop")]
3310    #[case("on_resume")]
3311    #[case("on_reset")]
3312    #[case("on_dispose")]
3313    #[case("on_degrade")]
3314    #[case("on_fault")]
3315    fn test_python_dispatch_lifecycle_matrix(
3316        clock: Rc<RefCell<TestClock>>,
3317        cache: Rc<RefCell<Cache>>,
3318        trader_id: TraderId,
3319        #[case] method_name: &str,
3320    ) {
3321        pyo3::Python::initialize();
3322        Python::attach(|py| {
3323            assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3324                match method_name {
3325                    "on_start" => DataActor::on_start(rust_actor.inner_mut()),
3326                    "on_stop" => DataActor::on_stop(rust_actor.inner_mut()),
3327                    "on_resume" => DataActor::on_resume(rust_actor.inner_mut()),
3328                    "on_reset" => DataActor::on_reset(rust_actor.inner_mut()),
3329                    "on_dispose" => DataActor::on_dispose(rust_actor.inner_mut()),
3330                    "on_degrade" => DataActor::on_degrade(rust_actor.inner_mut()),
3331                    "on_fault" => DataActor::on_fault(rust_actor.inner_mut()),
3332                    _ => unreachable!("unhandled lifecycle case: {method_name}"),
3333                }
3334            });
3335        });
3336    }
3337
3338    #[rstest]
3339    #[case("on_time_event")]
3340    #[case("on_data")]
3341    #[case("on_signal")]
3342    #[case("on_instrument")]
3343    #[case("on_quote")]
3344    #[case("on_trade")]
3345    #[case("on_bar")]
3346    #[case("on_book")]
3347    #[case("on_book_deltas")]
3348    #[case("on_mark_price")]
3349    #[case("on_index_price")]
3350    #[case("on_funding_rate")]
3351    #[case("on_instrument_status")]
3352    #[case("on_instrument_close")]
3353    #[case("on_option_greeks")]
3354    #[case("on_option_chain")]
3355    fn test_python_dispatch_typed_callback_matrix(
3356        clock: Rc<RefCell<TestClock>>,
3357        cache: Rc<RefCell<Cache>>,
3358        trader_id: TraderId,
3359        #[case] method_name: &str,
3360    ) {
3361        pyo3::Python::initialize();
3362        Python::attach(|py| {
3363            assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3364                match method_name {
3365                    "on_time_event" => {
3366                        let event = sample_time_event();
3367                        rust_actor.inner_mut().on_time_event(&event)
3368                    }
3369                    "on_data" => {
3370                        let data = sample_data();
3371                        rust_actor.inner_mut().on_data(&data)
3372                    }
3373                    "on_signal" => {
3374                        let signal = sample_signal();
3375                        rust_actor.inner_mut().on_signal(&signal)
3376                    }
3377                    "on_instrument" => {
3378                        let instrument = InstrumentAny::CurrencyPair(sample_instrument());
3379                        rust_actor.inner_mut().on_instrument(&instrument)
3380                    }
3381                    "on_quote" => {
3382                        let quote = sample_quote();
3383                        rust_actor.inner_mut().on_quote(&quote)
3384                    }
3385                    "on_trade" => {
3386                        let trade = sample_trade();
3387                        rust_actor.inner_mut().on_trade(&trade)
3388                    }
3389                    "on_bar" => {
3390                        let bar = sample_bar();
3391                        rust_actor.inner_mut().on_bar(&bar)
3392                    }
3393                    "on_book" => {
3394                        let book = sample_book();
3395                        rust_actor.inner_mut().on_book(&book)
3396                    }
3397                    "on_book_deltas" => {
3398                        let deltas = sample_book_deltas();
3399                        rust_actor.inner_mut().on_book_deltas(&deltas)
3400                    }
3401                    "on_mark_price" => {
3402                        let update = sample_mark_price();
3403                        rust_actor.inner_mut().on_mark_price(&update)
3404                    }
3405                    "on_index_price" => {
3406                        let update = sample_index_price();
3407                        rust_actor.inner_mut().on_index_price(&update)
3408                    }
3409                    "on_funding_rate" => {
3410                        let update = sample_funding_rate();
3411                        rust_actor.inner_mut().on_funding_rate(&update)
3412                    }
3413                    "on_instrument_status" => {
3414                        let status = sample_instrument_status();
3415                        rust_actor.inner_mut().on_instrument_status(&status)
3416                    }
3417                    "on_instrument_close" => {
3418                        let close = sample_instrument_close();
3419                        rust_actor.inner_mut().on_instrument_close(&close)
3420                    }
3421                    "on_option_greeks" => {
3422                        let greeks = sample_option_greeks();
3423                        rust_actor.inner_mut().on_option_greeks(&greeks)
3424                    }
3425                    "on_option_chain" => {
3426                        let chain = sample_option_chain();
3427                        rust_actor.inner_mut().on_option_chain(&chain)
3428                    }
3429                    _ => unreachable!("unhandled typed callback case: {method_name}"),
3430                }
3431            });
3432        });
3433    }
3434
3435    #[rstest]
3436    #[case("on_historical_data")]
3437    #[case("on_historical_quotes")]
3438    #[case("on_historical_trades")]
3439    #[case("on_historical_funding_rates")]
3440    #[case("on_historical_bars")]
3441    #[case("on_historical_mark_prices")]
3442    #[case("on_historical_index_prices")]
3443    fn test_python_dispatch_historical_callback_matrix(
3444        clock: Rc<RefCell<TestClock>>,
3445        cache: Rc<RefCell<Cache>>,
3446        trader_id: TraderId,
3447        #[case] method_name: &str,
3448    ) {
3449        pyo3::Python::initialize();
3450        Python::attach(|py| {
3451            assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3452                match method_name {
3453                    "on_historical_data" => {
3454                        let data = sample_data();
3455                        rust_actor.inner_mut().on_historical_data(&data)
3456                    }
3457                    "on_historical_quotes" => {
3458                        let quotes = vec![sample_quote()];
3459                        rust_actor.inner_mut().on_historical_quotes(&quotes)
3460                    }
3461                    "on_historical_trades" => {
3462                        let trades = vec![sample_trade()];
3463                        rust_actor.inner_mut().on_historical_trades(&trades)
3464                    }
3465                    "on_historical_funding_rates" => {
3466                        let funding_rates = vec![sample_funding_rate()];
3467                        rust_actor
3468                            .inner_mut()
3469                            .on_historical_funding_rates(&funding_rates)
3470                    }
3471                    "on_historical_bars" => {
3472                        let bars = vec![sample_bar()];
3473                        rust_actor.inner_mut().on_historical_bars(&bars)
3474                    }
3475                    "on_historical_mark_prices" => {
3476                        let mark_prices = vec![sample_mark_price()];
3477                        rust_actor
3478                            .inner_mut()
3479                            .on_historical_mark_prices(&mark_prices)
3480                    }
3481                    "on_historical_index_prices" => {
3482                        let index_prices = vec![sample_index_price()];
3483                        rust_actor
3484                            .inner_mut()
3485                            .on_historical_index_prices(&index_prices)
3486                    }
3487                    _ => unreachable!("unhandled historical callback case: {method_name}"),
3488                }
3489            });
3490        });
3491    }
3492
3493    #[cfg(feature = "defi")]
3494    #[rstest]
3495    #[case("on_block")]
3496    #[case("on_pool")]
3497    #[case("on_pool_swap")]
3498    #[case("on_pool_liquidity_update")]
3499    #[case("on_pool_fee_collect")]
3500    #[case("on_pool_flash")]
3501    fn test_python_dispatch_defi_callback_matrix(
3502        clock: Rc<RefCell<TestClock>>,
3503        cache: Rc<RefCell<Cache>>,
3504        trader_id: TraderId,
3505        #[case] method_name: &str,
3506    ) {
3507        pyo3::Python::initialize();
3508        Python::attach(|py| {
3509            assert_python_dispatch(py, clock, cache, trader_id, method_name, |rust_actor| {
3510                match method_name {
3511                    "on_block" => {
3512                        let block = sample_block();
3513                        rust_actor.inner_mut().on_block(&block)
3514                    }
3515                    "on_pool" => {
3516                        let (_chain, _dex, pool) = sample_pool_components();
3517                        rust_actor.inner_mut().on_pool(&pool)
3518                    }
3519                    "on_pool_swap" => {
3520                        let swap = sample_pool_swap();
3521                        rust_actor.inner_mut().on_pool_swap(&swap)
3522                    }
3523                    "on_pool_liquidity_update" => {
3524                        let update = sample_pool_liquidity_update();
3525                        rust_actor.inner_mut().on_pool_liquidity_update(&update)
3526                    }
3527                    "on_pool_fee_collect" => {
3528                        let collect = sample_pool_fee_collect();
3529                        rust_actor.inner_mut().on_pool_fee_collect(&collect)
3530                    }
3531                    "on_pool_flash" => {
3532                        let flash = sample_pool_flash();
3533                        rust_actor.inner_mut().on_pool_flash(&flash)
3534                    }
3535                    _ => unreachable!("unhandled defi callback case: {method_name}"),
3536                }
3537            });
3538        });
3539    }
3540
3541    #[rstest]
3542    fn test_python_dispatch_multiple_calls_tracked(
3543        clock: Rc<RefCell<TestClock>>,
3544        cache: Rc<RefCell<Cache>>,
3545        trader_id: TraderId,
3546        audusd_sim: CurrencyPair,
3547    ) {
3548        pyo3::Python::initialize();
3549        Python::attach(|py| {
3550            let py_actor = create_tracking_python_actor(py).unwrap();
3551
3552            let mut rust_actor = PyDataActor::new(None);
3553            rust_actor.set_python_instance(py_actor.clone_ref(py));
3554            rust_actor.register(trader_id, clock, cache).unwrap();
3555
3556            let quote = QuoteTick::new(
3557                audusd_sim.id,
3558                Price::from("1.00000"),
3559                Price::from("1.00001"),
3560                Quantity::from(100_000),
3561                Quantity::from(100_000),
3562                UnixNanos::default(),
3563                UnixNanos::default(),
3564            );
3565
3566            rust_actor.inner_mut().on_quote(&quote).unwrap();
3567            rust_actor.inner_mut().on_quote(&quote).unwrap();
3568            rust_actor.inner_mut().on_quote(&quote).unwrap();
3569
3570            assert_eq!(python_method_call_count(&py_actor, py, "on_quote"), 3);
3571        });
3572    }
3573
3574    #[rstest]
3575    fn test_python_dispatch_no_call_when_py_self_not_set(
3576        clock: Rc<RefCell<TestClock>>,
3577        cache: Rc<RefCell<Cache>>,
3578        trader_id: TraderId,
3579    ) {
3580        pyo3::Python::initialize();
3581        Python::attach(|_py| {
3582            let mut rust_actor = PyDataActor::new(None);
3583            rust_actor.register(trader_id, clock, cache).unwrap();
3584
3585            // When py_self is None, the dispatch returns Ok(()) without calling Python
3586            let result = DataActor::on_start(rust_actor.inner_mut());
3587            assert!(result.is_ok());
3588        });
3589    }
3590
3591    #[rstest]
3592    fn test_python_on_historical_data_rejects_non_custom_data(
3593        clock: Rc<RefCell<TestClock>>,
3594        cache: Rc<RefCell<Cache>>,
3595        trader_id: TraderId,
3596    ) {
3597        pyo3::Python::initialize();
3598        let mut rust_actor = PyDataActor::new(None);
3599        rust_actor.register(trader_id, clock, cache).unwrap();
3600
3601        let non_custom: String = "not CustomData".to_string();
3602        let result = rust_actor.inner_mut().on_historical_data(&non_custom);
3603
3604        assert!(result.is_err());
3605        assert!(result.unwrap_err().to_string().contains("unsupported type"));
3606    }
3607}