Skip to main content

nautilus_common/actor/
data_actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell, RefMut},
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroUsize,
22    ops::{Deref, DerefMut},
23    rc::Rc,
24    sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33    Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36    data::{
37        Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38        MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39        close::InstrumentClose,
40        option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41    },
42    enums::BookType,
43    events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44    identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45    instruments::{InstrumentAny, SyntheticInstrument},
46    orderbook::OrderBook,
47};
48use serde::{Deserialize, Serialize};
49use ustr::Ustr;
50
51#[cfg(feature = "indicators")]
52use super::indicators::Indicators;
53use super::{
54    Actor,
55    registry::{get_actor_unchecked, try_get_actor_unchecked},
56};
57#[cfg(feature = "defi")]
58use crate::defi;
59#[cfg(feature = "defi")]
60#[allow(unused_imports)]
61use crate::defi::data_actor as _; // Brings DeFi impl blocks into scope
62use crate::{
63    cache::Cache,
64    clock::Clock,
65    component::Component,
66    enums::{ComponentState, ComponentTrigger},
67    logging::{CMD, RECV, REQ, SEND},
68    messages::{
69        data::{
70            BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
71            InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
72            RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
73            RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
74            SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
75            SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
76            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
77            SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
78            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
79            UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
80            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
81            UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
82            UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
83            UnsubscribeQuotes, UnsubscribeTrades,
84        },
85        system::ShutdownSystem,
86    },
87    msgbus::{
88        self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
89        switchboard::{
90            MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
91            get_custom_topic, get_funding_rate_topic, get_index_price_topic,
92            get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
93            get_instruments_pattern, get_mark_price_topic, get_option_chain_topic,
94            get_option_greeks_topic, get_order_cancels_topic, get_order_fills_topic,
95            get_quotes_topic, get_signal_pattern, get_trades_topic,
96        },
97    },
98    signal::Signal,
99    timer::{TimeEvent, TimeEventCallback},
100};
101
/// Common configuration for [`DataActor`] based components.
///
/// Because of `#[serde(default)]`, fields missing from serialized input take the
/// values produced by the [`Default`] implementation, while `deny_unknown_fields`
/// turns unrecognized keys into a deserialization error.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.common",
        subclass,
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct DataActorConfig {
    /// The custom identifier for the Actor (defaults to `None`).
    pub actor_id: Option<ActorId>,
    /// If events should be logged (defaults to `true`).
    pub log_events: bool,
    /// If commands should be logged (defaults to `true`).
    pub log_commands: bool,
}
125
impl Default for DataActorConfig {
    /// Creates a configuration with no custom actor ID and with both event and
    /// command logging enabled.
    fn default() -> Self {
        Self {
            actor_id: None,
            log_events: true,
            log_commands: true,
        }
    }
}
135
/// Configuration for creating actors from importable paths.
///
/// There is no `serde(default)` here, so all three fields must be present when
/// deserializing, and `deny_unknown_fields` rejects unrecognized keys.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct ImportableActorConfig {
    /// The fully qualified name of the Actor class.
    pub actor_path: String,
    /// The fully qualified name of the Actor config class.
    pub config_path: String,
    /// The actor configuration as a dictionary of string keys to JSON values.
    pub config: HashMap<String, serde_json::Value>,
}
155
/// Shared, thread-safe (`Send + Sync`) callback that receives a [`UUID4`].
// NOTE(review): the argument is presumably the ID of the related request — confirm against callers.
type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
157
158pub trait DataActor:
159    Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
160{
    /// Actions to be performed when the actor state is saved.
    ///
    /// Returns the state as an ordered map of names to raw bytes. The default
    /// implementation returns an empty map.
    ///
    /// # Errors
    ///
    /// Returns an error if saving the actor state fails.
    fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
        Ok(IndexMap::new())
    }
169
    /// Actions to be performed when the actor state is loaded.
    ///
    /// The default implementation ignores `state` and returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if loading the actor state fails.
    #[allow(unused_variables)]
    fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
        Ok(())
    }
179
    /// Actions to be performed on start.
    ///
    /// The default implementation only logs a warning that the handler was not
    /// overridden and returns `Ok(())`. Override it to perform start-up work such
    /// as subscribing to or requesting data.
    ///
    /// # Errors
    ///
    /// Returns an error if starting the actor fails.
    fn on_start(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_start` handler was called when not overridden, \
            it's expected that any actions required when starting the actor \
            occur here, such as subscribing/requesting data"
        );
        Ok(())
    }
193
    /// Actions to be performed on stop.
    ///
    /// The default implementation only logs a warning that the handler was not
    /// overridden and returns `Ok(())`. Override it to perform shutdown work such
    /// as unsubscribing from data.
    ///
    /// # Errors
    ///
    /// Returns an error if stopping the actor fails.
    fn on_stop(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_stop` handler was called when not overridden, \
            it's expected that any actions required when stopping the actor \
            occur here, such as unsubscribing from data",
        );
        Ok(())
    }
207
    /// Actions to be performed on resume.
    ///
    /// The default implementation only logs a warning that the handler was not
    /// overridden and returns `Ok(())`. Override it to perform whatever is needed
    /// when the actor resumes following a stop.
    ///
    /// # Errors
    ///
    /// Returns an error if resuming the actor fails.
    fn on_resume(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_resume` handler was called when not overridden, \
            it's expected that any actions required when resuming the actor \
            following a stop occur here"
        );
        Ok(())
    }
221
    /// Actions to be performed on reset.
    ///
    /// The default implementation only logs a warning that the handler was not
    /// overridden and returns `Ok(())`. Override it to reset indicators and any
    /// other actor state.
    ///
    /// # Errors
    ///
    /// Returns an error if resetting the actor fails.
    fn on_reset(&mut self) -> anyhow::Result<()> {
        log::warn!(
            "The `on_reset` handler was called when not overridden, \
            it's expected that any actions required when resetting the actor \
            occur here, such as resetting indicators and other state"
        );
        Ok(())
    }
235
    /// Actions to be performed on dispose.
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if disposing the actor fails.
    fn on_dispose(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
244
    /// Actions to be performed on degrade.
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if degrading the actor fails.
    fn on_degrade(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
253
    /// Actions to be performed on fault.
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if faulting the actor fails.
    fn on_fault(&mut self) -> anyhow::Result<()> {
        Ok(())
    }
262
    /// Actions to be performed when receiving a time event.
    ///
    /// Invoked by `handle_time_event` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the time event fails.
    #[allow(unused_variables)]
    fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
        Ok(())
    }
272
    /// Actions to be performed when receiving custom data.
    ///
    /// Invoked by `handle_data` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the data fails.
    #[allow(unused_variables)]
    fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
        Ok(())
    }
282
    /// Actions to be performed when receiving a signal.
    ///
    /// Invoked by `handle_signal` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the signal fails.
    #[allow(unused_variables)]
    fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
        Ok(())
    }
292
    /// Actions to be performed when receiving an instrument.
    ///
    /// Invoked by `handle_instrument` unless `not_running()` is true, and also by
    /// `handle_instrument_response` / `handle_instruments_response`, which do not
    /// check the running state. The default implementation is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the instrument fails.
    #[allow(unused_variables)]
    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
        Ok(())
    }
302
    /// Actions to be performed when receiving order book deltas.
    ///
    /// Invoked by `handle_book_deltas` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the book deltas fails.
    #[allow(unused_variables)]
    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
        Ok(())
    }
312
    /// Actions to be performed when receiving an order book.
    ///
    /// Invoked by `handle_book` unless `not_running()` is true, and also by
    /// `handle_book_response`, which does not check the running state. The
    /// default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the book fails.
    #[allow(unused_variables)]
    fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
        Ok(())
    }
322
    /// Actions to be performed when receiving a quote.
    ///
    /// Invoked by `handle_quote` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the quote fails.
    #[allow(unused_variables)]
    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
        Ok(())
    }
332
    /// Actions to be performed when receiving a trade.
    ///
    /// Invoked by `handle_trade` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the trade fails.
    #[allow(unused_variables)]
    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
        Ok(())
    }
342
    /// Actions to be performed when receiving a bar.
    ///
    /// Invoked by `handle_bar` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the bar fails.
    #[allow(unused_variables)]
    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
        Ok(())
    }
352
    /// Actions to be performed when receiving a mark price update.
    ///
    /// Invoked by `handle_mark_price` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the mark price update fails.
    #[allow(unused_variables)]
    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
        Ok(())
    }
362
    /// Actions to be performed when receiving an index price update.
    ///
    /// Invoked by `handle_index_price` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the index price update fails.
    #[allow(unused_variables)]
    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
        Ok(())
    }
372
    /// Actions to be performed when receiving a funding rate update.
    ///
    /// Invoked by `handle_funding_rate` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the funding rate update fails.
    #[allow(unused_variables)]
    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
        Ok(())
    }
382
    /// Actions to be performed when receiving exchange-provided option greeks.
    ///
    /// Invoked by `handle_option_greeks` unless `not_running()` is true. The
    /// default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the option greeks fails.
    #[allow(unused_variables)]
    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
        Ok(())
    }
392
    /// Actions to be performed when receiving an option chain slice snapshot.
    ///
    /// Invoked by `handle_option_chain` unless `not_running()` is true. The
    /// default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the option chain slice fails.
    #[allow(unused_variables)]
    fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
        Ok(())
    }
402
    /// Actions to be performed when receiving an instrument status update.
    ///
    /// Invoked by `handle_instrument_status` unless `not_running()` is true. The
    /// default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the instrument status update fails.
    #[allow(unused_variables)]
    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
        Ok(())
    }
412
    /// Actions to be performed when receiving an instrument close update.
    ///
    /// Invoked by `handle_instrument_close` unless `not_running()` is true. The
    /// default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the instrument close update fails.
    #[allow(unused_variables)]
    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
        Ok(())
    }
422
    /// Actions to be performed when receiving an order filled event.
    ///
    /// Invoked by `handle_order_filled`, which skips events whose `strategy_id`
    /// matches this actor's ID and events received while `not_running()` is true.
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the order filled event fails.
    #[allow(unused_variables)]
    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
        Ok(())
    }
432
    /// Actions to be performed when receiving an order canceled event.
    ///
    /// Invoked by `handle_order_canceled`, which skips events whose `strategy_id`
    /// matches this actor's ID and events received while `not_running()` is true.
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the order canceled event fails.
    #[allow(unused_variables)]
    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
        Ok(())
    }
442
    #[cfg(feature = "defi")]
    /// Actions to be performed when receiving a block.
    ///
    /// Only compiled with the `defi` feature. Invoked by `handle_block` unless
    /// `not_running()` is true. The default implementation is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the block fails.
    #[allow(unused_variables)]
    fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
        Ok(())
    }
453
    #[cfg(feature = "defi")]
    /// Actions to be performed when receiving a pool.
    ///
    /// Only compiled with the `defi` feature. Invoked by `handle_pool` unless
    /// `not_running()` is true. The default implementation is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the pool fails.
    #[allow(unused_variables)]
    fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
        Ok(())
    }
464
    #[cfg(feature = "defi")]
    /// Actions to be performed when receiving a pool swap.
    ///
    /// Only compiled with the `defi` feature. Invoked by `handle_pool_swap` unless
    /// `not_running()` is true. The default implementation is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the pool swap fails.
    #[allow(unused_variables)]
    fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
        Ok(())
    }
475
    #[cfg(feature = "defi")]
    /// Actions to be performed when receiving a pool liquidity update.
    ///
    /// Only compiled with the `defi` feature. Invoked by
    /// `handle_pool_liquidity_update` unless `not_running()` is true. The default
    /// implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the pool liquidity update fails.
    #[allow(unused_variables)]
    fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
        Ok(())
    }
486
    #[cfg(feature = "defi")]
    /// Actions to be performed when receiving a pool fee collect event.
    ///
    /// Only compiled with the `defi` feature. Invoked by `handle_pool_fee_collect`
    /// unless `not_running()` is true. The default implementation is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the pool fee collect fails.
    #[allow(unused_variables)]
    fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
        Ok(())
    }
497
    #[cfg(feature = "defi")]
    /// Actions to be performed when receiving a pool flash event.
    ///
    /// Only compiled with the `defi` feature. Invoked by `handle_pool_flash`
    /// unless `not_running()` is true. The default implementation is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the pool flash fails.
    #[allow(unused_variables)]
    fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
        Ok(())
    }
508
    /// Actions to be performed when receiving historical data.
    ///
    /// Invoked by `handle_historical_data` and `handle_data_response`, neither of
    /// which checks the running state. The payload is type-erased, so overrides
    /// need to downcast it. The default implementation is a no-op that returns
    /// `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical data fails.
    #[allow(unused_variables)]
    fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
        Ok(())
    }
518
    /// Actions to be performed when receiving historical quotes.
    ///
    /// Invoked by `handle_quotes_response`, which does not check the running
    /// state. The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical quotes fails.
    #[allow(unused_variables)]
    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
        Ok(())
    }
528
    /// Actions to be performed when receiving historical trades.
    ///
    /// Invoked by `handle_trades_response`, which does not check the running
    /// state. The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical trades fails.
    #[allow(unused_variables)]
    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
        Ok(())
    }
538
    /// Actions to be performed when receiving historical funding rates.
    ///
    /// Invoked by `handle_funding_rates_response`, which does not check the
    /// running state. The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical funding rates fails.
    #[allow(unused_variables)]
    fn on_historical_funding_rates(
        &mut self,
        funding_rates: &[FundingRateUpdate],
    ) -> anyhow::Result<()> {
        Ok(())
    }
551
    /// Actions to be performed when receiving historical bars.
    ///
    /// Invoked by `handle_bars_response`, which does not check the running state.
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical bars fails.
    #[allow(unused_variables)]
    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
        Ok(())
    }
561
    /// Actions to be performed when receiving historical mark prices.
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical mark prices fails.
    #[allow(unused_variables)]
    fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
        Ok(())
    }
571
    /// Actions to be performed when receiving historical index prices.
    ///
    /// The default implementation is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error if handling the historical index prices fails.
    #[allow(unused_variables)]
    fn on_historical_index_prices(
        &mut self,
        index_prices: &[IndexPriceUpdate],
    ) -> anyhow::Result<()> {
        Ok(())
    }
584
585    /// Handles a received time event.
586    fn handle_time_event(&mut self, event: &TimeEvent) {
587        log_received(&event);
588
589        if self.not_running() {
590            log_not_running(&event);
591            return;
592        }
593
594        if let Err(e) = DataActor::on_time_event(self, event) {
595            log_error(&e);
596        }
597    }
598
599    /// Handles a received custom data point.
600    fn handle_data(&mut self, data: &CustomData) {
601        log_received(&data);
602
603        if self.not_running() {
604            log_not_running(&data);
605            return;
606        }
607
608        if let Err(e) = self.on_data(data) {
609            log_error(&e);
610        }
611    }
612
613    /// Handles a received signal.
614    fn handle_signal(&mut self, signal: &Signal) {
615        log_received(&signal);
616
617        if self.not_running() {
618            log_not_running(&signal);
619            return;
620        }
621
622        if let Err(e) = self.on_signal(signal) {
623            log_error(&e);
624        }
625    }
626
627    /// Handles a received instrument.
628    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
629        log_received(&instrument);
630
631        if self.not_running() {
632            log_not_running(&instrument);
633            return;
634        }
635
636        if let Err(e) = self.on_instrument(instrument) {
637            log_error(&e);
638        }
639    }
640
641    /// Handles received order book deltas.
642    fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
643        log_received(&deltas);
644
645        if self.not_running() {
646            log_not_running(&deltas);
647            return;
648        }
649
650        if let Err(e) = self.on_book_deltas(deltas) {
651            log_error(&e);
652        }
653    }
654
655    /// Handles a received order book reference.
656    fn handle_book(&mut self, book: &OrderBook) {
657        log_received(&book);
658
659        if self.not_running() {
660            log_not_running(&book);
661            return;
662        }
663
664        if let Err(e) = self.on_book(book) {
665            log_error(&e);
666        }
667    }
668
669    /// Handles a received quote.
670    fn handle_quote(&mut self, quote: &QuoteTick) {
671        log_received(&quote);
672
673        if self.not_running() {
674            log_not_running(&quote);
675            return;
676        }
677
678        if let Err(e) = self.on_quote(quote) {
679            log_error(&e);
680        }
681    }
682
683    /// Handles a received trade.
684    fn handle_trade(&mut self, trade: &TradeTick) {
685        log_received(&trade);
686
687        if self.not_running() {
688            log_not_running(&trade);
689            return;
690        }
691
692        if let Err(e) = self.on_trade(trade) {
693            log_error(&e);
694        }
695    }
696
697    /// Handles a receiving bar.
698    fn handle_bar(&mut self, bar: &Bar) {
699        log_received(&bar);
700
701        if self.not_running() {
702            log_not_running(&bar);
703            return;
704        }
705
706        if let Err(e) = self.on_bar(bar) {
707            log_error(&e);
708        }
709    }
710
711    /// Handles a received mark price update.
712    fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
713        log_received(&mark_price);
714
715        if self.not_running() {
716            log_not_running(&mark_price);
717            return;
718        }
719
720        if let Err(e) = self.on_mark_price(mark_price) {
721            log_error(&e);
722        }
723    }
724
725    /// Handles a received index price update.
726    fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
727        log_received(&index_price);
728
729        if self.not_running() {
730            log_not_running(&index_price);
731            return;
732        }
733
734        if let Err(e) = self.on_index_price(index_price) {
735            log_error(&e);
736        }
737    }
738
739    /// Handles a received funding rate update.
740    fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
741        log_received(&funding_rate);
742
743        if self.not_running() {
744            log_not_running(&funding_rate);
745            return;
746        }
747
748        if let Err(e) = self.on_funding_rate(funding_rate) {
749            log_error(&e);
750        }
751    }
752
753    /// Handles a received option greeks update.
754    fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
755        log_received(&greeks);
756
757        if self.not_running() {
758            log_not_running(&greeks);
759            return;
760        }
761
762        if let Err(e) = self.on_option_greeks(greeks) {
763            log_error(&e);
764        }
765    }
766
767    /// Handles a received option chain slice snapshot.
768    fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
769        log_received(&slice);
770
771        if self.not_running() {
772            log_not_running(&slice);
773            return;
774        }
775
776        if let Err(e) = self.on_option_chain(slice) {
777            log_error(&e);
778        }
779    }
780
781    /// Handles a received instrument status.
782    fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
783        log_received(&status);
784
785        if self.not_running() {
786            log_not_running(&status);
787            return;
788        }
789
790        if let Err(e) = self.on_instrument_status(status) {
791            log_error(&e);
792        }
793    }
794
795    /// Handles a received instrument close.
796    fn handle_instrument_close(&mut self, close: &InstrumentClose) {
797        log_received(&close);
798
799        if self.not_running() {
800            log_not_running(&close);
801            return;
802        }
803
804        if let Err(e) = self.on_instrument_close(close) {
805            log_error(&e);
806        }
807    }
808
809    /// Handles a received order filled event.
810    fn handle_order_filled(&mut self, event: &OrderFilled) {
811        log_received(&event);
812
813        // Check for double-handling: if the event's strategy_id matches this actor's id,
814        // it means a Strategy is receiving its own fill event through both automatic
815        // subscription and manual subscribe_order_fills, so skip the manual handler.
816        if event.strategy_id.inner() == self.actor_id().inner() {
817            return;
818        }
819
820        if self.not_running() {
821            log_not_running(&event);
822            return;
823        }
824
825        if let Err(e) = self.on_order_filled(event) {
826            log_error(&e);
827        }
828    }
829
830    /// Handles a received order canceled event.
831    fn handle_order_canceled(&mut self, event: &OrderCanceled) {
832        log_received(&event);
833
834        // Check for double-handling: if the event's strategy_id matches this actor's id,
835        // it means a Strategy is receiving its own cancel event through both automatic
836        // subscription and manual subscribe_order_cancels, so skip the manual handler.
837        if event.strategy_id.inner() == self.actor_id().inner() {
838            return;
839        }
840
841        if self.not_running() {
842            log_not_running(&event);
843            return;
844        }
845
846        if let Err(e) = self.on_order_canceled(event) {
847            log_error(&e);
848        }
849    }
850
851    #[cfg(feature = "defi")]
852    /// Handles a received block.
853    fn handle_block(&mut self, block: &Block) {
854        log_received(&block);
855
856        if self.not_running() {
857            log_not_running(&block);
858            return;
859        }
860
861        if let Err(e) = self.on_block(block) {
862            log_error(&e);
863        }
864    }
865
866    #[cfg(feature = "defi")]
867    /// Handles a received pool definition update.
868    fn handle_pool(&mut self, pool: &Pool) {
869        log_received(&pool);
870
871        if self.not_running() {
872            log_not_running(&pool);
873            return;
874        }
875
876        if let Err(e) = self.on_pool(pool) {
877            log_error(&e);
878        }
879    }
880
881    #[cfg(feature = "defi")]
882    /// Handles a received pool swap.
883    fn handle_pool_swap(&mut self, swap: &PoolSwap) {
884        log_received(&swap);
885
886        if self.not_running() {
887            log_not_running(&swap);
888            return;
889        }
890
891        if let Err(e) = self.on_pool_swap(swap) {
892            log_error(&e);
893        }
894    }
895
896    #[cfg(feature = "defi")]
897    /// Handles a received pool liquidity update.
898    fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
899        log_received(&update);
900
901        if self.not_running() {
902            log_not_running(&update);
903            return;
904        }
905
906        if let Err(e) = self.on_pool_liquidity_update(update) {
907            log_error(&e);
908        }
909    }
910
911    #[cfg(feature = "defi")]
912    /// Handles a received pool fee collect.
913    fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
914        log_received(&collect);
915
916        if self.not_running() {
917            log_not_running(&collect);
918            return;
919        }
920
921        if let Err(e) = self.on_pool_fee_collect(collect) {
922            log_error(&e);
923        }
924    }
925
926    #[cfg(feature = "defi")]
927    /// Handles a received pool flash event.
928    fn handle_pool_flash(&mut self, flash: &PoolFlash) {
929        log_received(&flash);
930
931        if self.not_running() {
932            log_not_running(&flash);
933            return;
934        }
935
936        if let Err(e) = self.on_pool_flash(flash) {
937            log_error(&e);
938        }
939    }
940
941    /// Handles received historical data.
942    fn handle_historical_data(&mut self, data: &dyn Any) {
943        log_received(&data);
944
945        if let Err(e) = self.on_historical_data(data) {
946            log_error(&e);
947        }
948    }
949
950    /// Handles a data response.
951    fn handle_data_response(&mut self, resp: &CustomDataResponse) {
952        log_received(&resp);
953
954        if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
955            log_error(&e);
956        }
957    }
958
959    /// Handles an instrument response.
960    fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
961        log_received(&resp);
962
963        if let Err(e) = self.on_instrument(&resp.data) {
964            log_error(&e);
965        }
966    }
967
968    /// Handles an instruments response.
969    fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
970        log_received(&resp);
971
972        for inst in &resp.data {
973            if let Err(e) = self.on_instrument(inst) {
974                log_error(&e);
975            }
976        }
977    }
978
979    /// Handles a book response.
980    fn handle_book_response(&mut self, resp: &BookResponse) {
981        log_received(&resp);
982
983        if let Err(e) = self.on_book(&resp.data) {
984            log_error(&e);
985        }
986    }
987
988    /// Handles a quotes response.
989    fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
990        log_received(&resp);
991
992        if let Err(e) = self.on_historical_quotes(&resp.data) {
993            log_error(&e);
994        }
995    }
996
997    /// Handles a trades response.
998    fn handle_trades_response(&mut self, resp: &TradesResponse) {
999        log_received(&resp);
1000
1001        if let Err(e) = self.on_historical_trades(&resp.data) {
1002            log_error(&e);
1003        }
1004    }
1005
1006    /// Handles a funding rates response.
1007    fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1008        log_received(&resp);
1009
1010        if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1011            log_error(&e);
1012        }
1013    }
1014
1015    /// Handles a bars response.
1016    fn handle_bars_response(&mut self, resp: &BarsResponse) {
1017        log_received(&resp);
1018
1019        if let Err(e) = self.on_historical_bars(&resp.data) {
1020            log_error(&e);
1021        }
1022    }
1023
1024    /// Subscribe to streaming `data_type` data.
1025    fn subscribe_data(
1026        &mut self,
1027        data_type: DataType,
1028        client_id: Option<ClientId>,
1029        params: Option<Params>,
1030    ) where
1031        Self: 'static + Debug + Sized,
1032    {
1033        let actor_id = self.actor_id().inner();
1034        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1035            get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1036        });
1037
1038        DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1039    }
1040
1041    /// Subscribe to [`Signal`] data by `name`.
1042    ///
1043    /// An empty `name` subscribes to every signal.
1044    fn subscribe_signal(&mut self, name: &str)
1045    where
1046        Self: 'static + Debug + Sized,
1047    {
1048        let actor_id = self.actor_id().inner();
1049        // Signals are published as `CustomData` wrapping a `Signal`; downcast
1050        // the inner value so subscribers receive the typed `Signal` in `on_signal`.
1051        let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1052            if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1053                if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1054                    actor.handle_signal(signal);
1055                } else {
1056                    log::error!("Actor {actor_id} not found for signal handling");
1057                }
1058            }
1059        });
1060
1061        DataActorCore::subscribe_signal(self, handler, name);
1062    }
1063
1064    /// Subscribe to streaming [`QuoteTick`] data for the `instrument_id`.
1065    fn subscribe_quotes(
1066        &mut self,
1067        instrument_id: InstrumentId,
1068        client_id: Option<ClientId>,
1069        params: Option<Params>,
1070    ) where
1071        Self: 'static + Debug + Sized,
1072    {
1073        let actor_id = self.actor_id().inner();
1074        let topic = get_quotes_topic(instrument_id);
1075
1076        let handler = TypedHandler::from(move |quote: &QuoteTick| {
1077            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1078                actor.handle_quote(quote);
1079            } else {
1080                log::error!("Actor {actor_id} not found for quote handling");
1081            }
1082        });
1083
1084        DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1085    }
1086
1087    /// Subscribe to streaming [`InstrumentAny`] data for the `venue`.
1088    fn subscribe_instruments(
1089        &mut self,
1090        venue: Venue,
1091        client_id: Option<ClientId>,
1092        params: Option<Params>,
1093    ) where
1094        Self: 'static + Debug + Sized,
1095    {
1096        let actor_id = self.actor_id().inner();
1097        let pattern = get_instruments_pattern(venue);
1098
1099        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1100            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1101                actor.handle_instrument(instrument);
1102            } else {
1103                log::error!("Actor {actor_id} not found for instruments handling");
1104            }
1105        });
1106
1107        DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1108    }
1109
1110    /// Subscribe to streaming [`InstrumentAny`] data for the `instrument_id`.
1111    fn subscribe_instrument(
1112        &mut self,
1113        instrument_id: InstrumentId,
1114        client_id: Option<ClientId>,
1115        params: Option<Params>,
1116    ) where
1117        Self: 'static + Debug + Sized,
1118    {
1119        let actor_id = self.actor_id().inner();
1120        let topic = get_instrument_topic(instrument_id);
1121
1122        let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1123            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1124                actor.handle_instrument(instrument);
1125            } else {
1126                log::error!("Actor {actor_id} not found for instrument handling");
1127            }
1128        });
1129
1130        DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1131    }
1132
1133    /// Subscribe to streaming [`OrderBookDeltas`] data for the `instrument_id`.
1134    fn subscribe_book_deltas(
1135        &mut self,
1136        instrument_id: InstrumentId,
1137        book_type: BookType,
1138        depth: Option<NonZeroUsize>,
1139        client_id: Option<ClientId>,
1140        managed: bool,
1141        params: Option<Params>,
1142    ) where
1143        Self: 'static + Debug + Sized,
1144    {
1145        let actor_id = self.actor_id().inner();
1146        let topic = get_book_deltas_topic(instrument_id);
1147
1148        let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1149            get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1150        });
1151
1152        DataActorCore::subscribe_book_deltas(
1153            self,
1154            topic,
1155            handler,
1156            instrument_id,
1157            book_type,
1158            depth,
1159            client_id,
1160            managed,
1161            params,
1162        );
1163    }
1164
1165    /// Subscribe to [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1166    fn subscribe_book_at_interval(
1167        &mut self,
1168        instrument_id: InstrumentId,
1169        book_type: BookType,
1170        depth: Option<NonZeroUsize>,
1171        interval_ms: NonZeroUsize,
1172        client_id: Option<ClientId>,
1173        params: Option<Params>,
1174    ) where
1175        Self: 'static + Debug + Sized,
1176    {
1177        let actor_id = self.actor_id().inner();
1178        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1179
1180        let handler = TypedHandler::from(move |book: &OrderBook| {
1181            get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1182        });
1183
1184        DataActorCore::subscribe_book_at_interval(
1185            self,
1186            topic,
1187            handler,
1188            instrument_id,
1189            book_type,
1190            depth,
1191            interval_ms,
1192            client_id,
1193            params,
1194        );
1195    }
1196
1197    /// Subscribe to streaming [`TradeTick`] data for the `instrument_id`.
1198    fn subscribe_trades(
1199        &mut self,
1200        instrument_id: InstrumentId,
1201        client_id: Option<ClientId>,
1202        params: Option<Params>,
1203    ) where
1204        Self: 'static + Debug + Sized,
1205    {
1206        let actor_id = self.actor_id().inner();
1207        let topic = get_trades_topic(instrument_id);
1208
1209        let handler = TypedHandler::from(move |trade: &TradeTick| {
1210            get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1211        });
1212
1213        DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1214    }
1215
1216    /// Subscribe to streaming [`Bar`] data for the `bar_type`.
1217    fn subscribe_bars(
1218        &mut self,
1219        bar_type: BarType,
1220        client_id: Option<ClientId>,
1221        params: Option<Params>,
1222    ) where
1223        Self: 'static + Debug + Sized,
1224    {
1225        let actor_id = self.actor_id().inner();
1226        let topic = get_bars_topic(bar_type);
1227
1228        let handler = TypedHandler::from(move |bar: &Bar| {
1229            get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1230        });
1231
1232        DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1233    }
1234
1235    /// Subscribe to streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1236    fn subscribe_mark_prices(
1237        &mut self,
1238        instrument_id: InstrumentId,
1239        client_id: Option<ClientId>,
1240        params: Option<Params>,
1241    ) where
1242        Self: 'static + Debug + Sized,
1243    {
1244        let actor_id = self.actor_id().inner();
1245        let topic = get_mark_price_topic(instrument_id);
1246
1247        let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1248            get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1249        });
1250
1251        DataActorCore::subscribe_mark_prices(
1252            self,
1253            topic,
1254            handler,
1255            instrument_id,
1256            client_id,
1257            params,
1258        );
1259    }
1260
1261    /// Subscribe to streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1262    fn subscribe_index_prices(
1263        &mut self,
1264        instrument_id: InstrumentId,
1265        client_id: Option<ClientId>,
1266        params: Option<Params>,
1267    ) where
1268        Self: 'static + Debug + Sized,
1269    {
1270        let actor_id = self.actor_id().inner();
1271        let topic = get_index_price_topic(instrument_id);
1272
1273        let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1274            get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1275        });
1276
1277        DataActorCore::subscribe_index_prices(
1278            self,
1279            topic,
1280            handler,
1281            instrument_id,
1282            client_id,
1283            params,
1284        );
1285    }
1286
1287    /// Subscribe to streaming [`FundingRateUpdate`] data for the `instrument_id`.
1288    fn subscribe_funding_rates(
1289        &mut self,
1290        instrument_id: InstrumentId,
1291        client_id: Option<ClientId>,
1292        params: Option<Params>,
1293    ) where
1294        Self: 'static + Debug + Sized,
1295    {
1296        let actor_id = self.actor_id().inner();
1297        let topic = get_funding_rate_topic(instrument_id);
1298
1299        let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1300            get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1301        });
1302
1303        DataActorCore::subscribe_funding_rates(
1304            self,
1305            topic,
1306            handler,
1307            instrument_id,
1308            client_id,
1309            params,
1310        );
1311    }
1312
1313    /// Subscribe to streaming [`OptionGreeks`] data for the `instrument_id`.
1314    fn subscribe_option_greeks(
1315        &mut self,
1316        instrument_id: InstrumentId,
1317        client_id: Option<ClientId>,
1318        params: Option<Params>,
1319    ) where
1320        Self: 'static + Debug + Sized,
1321    {
1322        let actor_id = self.actor_id().inner();
1323        let topic = get_option_greeks_topic(instrument_id);
1324
1325        let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1326            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1327                actor.handle_option_greeks(option_greeks);
1328            } else {
1329                log::error!("Actor {actor_id} not found for option greeks handling");
1330            }
1331        });
1332
1333        DataActorCore::subscribe_option_greeks(
1334            self,
1335            topic,
1336            handler,
1337            instrument_id,
1338            client_id,
1339            params,
1340        );
1341    }
1342
1343    /// Subscribe to streaming [`InstrumentStatus`] data for the `instrument_id`.
1344    fn subscribe_instrument_status(
1345        &mut self,
1346        instrument_id: InstrumentId,
1347        client_id: Option<ClientId>,
1348        params: Option<Params>,
1349    ) where
1350        Self: 'static + Debug + Sized,
1351    {
1352        let actor_id = self.actor_id().inner();
1353        let topic = get_instrument_status_topic(instrument_id);
1354
1355        let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1356            get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1357        });
1358
1359        DataActorCore::subscribe_instrument_status(
1360            self,
1361            topic,
1362            handler,
1363            instrument_id,
1364            client_id,
1365            params,
1366        );
1367    }
1368
1369    /// Subscribe to streaming [`InstrumentClose`] data for the `instrument_id`.
1370    fn subscribe_instrument_close(
1371        &mut self,
1372        instrument_id: InstrumentId,
1373        client_id: Option<ClientId>,
1374        params: Option<Params>,
1375    ) where
1376        Self: 'static + Debug + Sized,
1377    {
1378        let actor_id = self.actor_id().inner();
1379        let topic = get_instrument_close_topic(instrument_id);
1380
1381        let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1382            get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1383        });
1384
1385        DataActorCore::subscribe_instrument_close(
1386            self,
1387            topic,
1388            handler,
1389            instrument_id,
1390            client_id,
1391            params,
1392        );
1393    }
1394
1395    /// Subscribe to streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1396    ///
1397    /// The ATM price is always derived from the exchange-provided forward price
1398    /// embedded in each option greeks/ticker update.
1399    fn subscribe_option_chain(
1400        &mut self,
1401        series_id: OptionSeriesId,
1402        strike_range: StrikeRange,
1403        snapshot_interval_ms: Option<u64>,
1404        client_id: Option<ClientId>,
1405        params: Option<Params>,
1406    ) where
1407        Self: 'static + Debug + Sized,
1408    {
1409        let actor_id = self.actor_id().inner();
1410        let topic = get_option_chain_topic(series_id);
1411
1412        let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1413            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1414                actor.handle_option_chain(slice);
1415            } else {
1416                log::error!("Actor {actor_id} not found for option chain handling");
1417            }
1418        });
1419
1420        DataActorCore::subscribe_option_chain(
1421            self,
1422            topic,
1423            handler,
1424            series_id,
1425            strike_range,
1426            snapshot_interval_ms,
1427            client_id,
1428            params,
1429        );
1430    }
1431
1432    /// Subscribe to [`OrderFilled`] events for the `instrument_id`.
1433    fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1434    where
1435        Self: 'static + Debug + Sized,
1436    {
1437        let actor_id = self.actor_id().inner();
1438        let topic = get_order_fills_topic(instrument_id);
1439
1440        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1441            if let OrderEventAny::Filled(filled) = event {
1442                get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1443            }
1444        });
1445
1446        DataActorCore::subscribe_order_fills(self, topic, handler);
1447    }
1448
1449    /// Subscribe to [`OrderCanceled`] events for the `instrument_id`.
1450    fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1451    where
1452        Self: 'static + Debug + Sized,
1453    {
1454        let actor_id = self.actor_id().inner();
1455        let topic = get_order_cancels_topic(instrument_id);
1456
1457        let handler = TypedHandler::from(move |event: &OrderEventAny| {
1458            if let OrderEventAny::Canceled(canceled) = event {
1459                get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1460            }
1461        });
1462
1463        DataActorCore::subscribe_order_cancels(self, topic, handler);
1464    }
1465
1466    #[cfg(feature = "defi")]
1467    /// Subscribe to streaming [`Block`] data for the `chain`.
1468    fn subscribe_blocks(
1469        &mut self,
1470        chain: Blockchain,
1471        client_id: Option<ClientId>,
1472        params: Option<Params>,
1473    ) where
1474        Self: 'static + Debug + Sized,
1475    {
1476        let actor_id = self.actor_id().inner();
1477        let topic = defi::switchboard::get_defi_blocks_topic(chain);
1478
1479        let handler = TypedHandler::from(move |block: &Block| {
1480            get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1481        });
1482
1483        DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1484    }
1485
1486    #[cfg(feature = "defi")]
1487    /// Subscribe to streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1488    fn subscribe_pool(
1489        &mut self,
1490        instrument_id: InstrumentId,
1491        client_id: Option<ClientId>,
1492        params: Option<Params>,
1493    ) where
1494        Self: 'static + Debug + Sized,
1495    {
1496        let actor_id = self.actor_id().inner();
1497        let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1498
1499        let handler = TypedHandler::from(move |pool: &Pool| {
1500            get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1501        });
1502
1503        DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1504    }
1505
1506    #[cfg(feature = "defi")]
1507    /// Subscribe to streaming [`PoolSwap`] data for the `instrument_id`.
1508    fn subscribe_pool_swaps(
1509        &mut self,
1510        instrument_id: InstrumentId,
1511        client_id: Option<ClientId>,
1512        params: Option<Params>,
1513    ) where
1514        Self: 'static + Debug + Sized,
1515    {
1516        let actor_id = self.actor_id().inner();
1517        let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
1518
1519        let handler = TypedHandler::from(move |swap: &PoolSwap| {
1520            get_actor_unchecked::<Self>(&actor_id).handle_pool_swap(swap);
1521        });
1522
1523        DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
1524    }
1525
1526    #[cfg(feature = "defi")]
1527    /// Subscribe to streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1528    fn subscribe_pool_liquidity_updates(
1529        &mut self,
1530        instrument_id: InstrumentId,
1531        client_id: Option<ClientId>,
1532        params: Option<Params>,
1533    ) where
1534        Self: 'static + Debug + Sized,
1535    {
1536        let actor_id = self.actor_id().inner();
1537        let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
1538
1539        let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
1540            get_actor_unchecked::<Self>(&actor_id).handle_pool_liquidity_update(update);
1541        });
1542
1543        DataActorCore::subscribe_pool_liquidity_updates(
1544            self,
1545            topic,
1546            handler,
1547            instrument_id,
1548            client_id,
1549            params,
1550        );
1551    }
1552
1553    #[cfg(feature = "defi")]
1554    /// Subscribe to streaming [`PoolFeeCollect`] data for the `instrument_id`.
1555    fn subscribe_pool_fee_collects(
1556        &mut self,
1557        instrument_id: InstrumentId,
1558        client_id: Option<ClientId>,
1559        params: Option<Params>,
1560    ) where
1561        Self: 'static + Debug + Sized,
1562    {
1563        let actor_id = self.actor_id().inner();
1564        let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
1565
1566        let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
1567            get_actor_unchecked::<Self>(&actor_id).handle_pool_fee_collect(collect);
1568        });
1569
1570        DataActorCore::subscribe_pool_fee_collects(
1571            self,
1572            topic,
1573            handler,
1574            instrument_id,
1575            client_id,
1576            params,
1577        );
1578    }
1579
1580    #[cfg(feature = "defi")]
1581    /// Subscribe to streaming [`PoolFlash`] events for the given `instrument_id`.
1582    fn subscribe_pool_flash_events(
1583        &mut self,
1584        instrument_id: InstrumentId,
1585        client_id: Option<ClientId>,
1586        params: Option<Params>,
1587    ) where
1588        Self: 'static + Debug + Sized,
1589    {
1590        let actor_id = self.actor_id().inner();
1591        let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
1592
1593        let handler = TypedHandler::from(move |flash: &PoolFlash| {
1594            get_actor_unchecked::<Self>(&actor_id).handle_pool_flash(flash);
1595        });
1596
1597        DataActorCore::subscribe_pool_flash_events(
1598            self,
1599            topic,
1600            handler,
1601            instrument_id,
1602            client_id,
1603            params,
1604        );
1605    }
1606
1607    /// Unsubscribe from streaming `data_type` data.
1608    fn unsubscribe_data(
1609        &mut self,
1610        data_type: DataType,
1611        client_id: Option<ClientId>,
1612        params: Option<Params>,
1613    ) where
1614        Self: 'static + Debug + Sized,
1615    {
1616        DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1617    }
1618
1619    /// Unsubscribe from [`Signal`] data by `name`.
1620    fn unsubscribe_signal(&mut self, name: &str)
1621    where
1622        Self: 'static + Debug + Sized,
1623    {
1624        DataActorCore::unsubscribe_signal(self, name);
1625    }
1626
1627    /// Unsubscribe from streaming [`InstrumentAny`] data for the `venue`.
1628    fn unsubscribe_instruments(
1629        &mut self,
1630        venue: Venue,
1631        client_id: Option<ClientId>,
1632        params: Option<Params>,
1633    ) where
1634        Self: 'static + Debug + Sized,
1635    {
1636        DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1637    }
1638
1639    /// Unsubscribe from streaming [`InstrumentAny`] data for the `instrument_id`.
1640    fn unsubscribe_instrument(
1641        &mut self,
1642        instrument_id: InstrumentId,
1643        client_id: Option<ClientId>,
1644        params: Option<Params>,
1645    ) where
1646        Self: 'static + Debug + Sized,
1647    {
1648        DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1649    }
1650
1651    /// Unsubscribe from streaming [`OrderBookDeltas`] data for the `instrument_id`.
1652    fn unsubscribe_book_deltas(
1653        &mut self,
1654        instrument_id: InstrumentId,
1655        client_id: Option<ClientId>,
1656        params: Option<Params>,
1657    ) where
1658        Self: 'static + Debug + Sized,
1659    {
1660        DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1661    }
1662
1663    /// Unsubscribe from [`OrderBook`] snapshots at a specified interval for the `instrument_id`.
1664    fn unsubscribe_book_at_interval(
1665        &mut self,
1666        instrument_id: InstrumentId,
1667        interval_ms: NonZeroUsize,
1668        client_id: Option<ClientId>,
1669        params: Option<Params>,
1670    ) where
1671        Self: 'static + Debug + Sized,
1672    {
1673        DataActorCore::unsubscribe_book_at_interval(
1674            self,
1675            instrument_id,
1676            interval_ms,
1677            client_id,
1678            params,
1679        );
1680    }
1681
1682    /// Unsubscribe from streaming [`QuoteTick`] data for the `instrument_id`.
1683    fn unsubscribe_quotes(
1684        &mut self,
1685        instrument_id: InstrumentId,
1686        client_id: Option<ClientId>,
1687        params: Option<Params>,
1688    ) where
1689        Self: 'static + Debug + Sized,
1690    {
1691        DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1692    }
1693
1694    /// Unsubscribe from streaming [`TradeTick`] data for the `instrument_id`.
1695    fn unsubscribe_trades(
1696        &mut self,
1697        instrument_id: InstrumentId,
1698        client_id: Option<ClientId>,
1699        params: Option<Params>,
1700    ) where
1701        Self: 'static + Debug + Sized,
1702    {
1703        DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1704    }
1705
1706    /// Unsubscribe from streaming [`Bar`] data for the `bar_type`.
1707    fn unsubscribe_bars(
1708        &mut self,
1709        bar_type: BarType,
1710        client_id: Option<ClientId>,
1711        params: Option<Params>,
1712    ) where
1713        Self: 'static + Debug + Sized,
1714    {
1715        DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1716    }
1717
1718    /// Unsubscribe from streaming [`MarkPriceUpdate`] data for the `instrument_id`.
1719    fn unsubscribe_mark_prices(
1720        &mut self,
1721        instrument_id: InstrumentId,
1722        client_id: Option<ClientId>,
1723        params: Option<Params>,
1724    ) where
1725        Self: 'static + Debug + Sized,
1726    {
1727        DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1728    }
1729
1730    /// Unsubscribe from streaming [`IndexPriceUpdate`] data for the `instrument_id`.
1731    fn unsubscribe_index_prices(
1732        &mut self,
1733        instrument_id: InstrumentId,
1734        client_id: Option<ClientId>,
1735        params: Option<Params>,
1736    ) where
1737        Self: 'static + Debug + Sized,
1738    {
1739        DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1740    }
1741
1742    /// Unsubscribe from streaming [`FundingRateUpdate`] data for the `instrument_id`.
1743    fn unsubscribe_funding_rates(
1744        &mut self,
1745        instrument_id: InstrumentId,
1746        client_id: Option<ClientId>,
1747        params: Option<Params>,
1748    ) where
1749        Self: 'static + Debug + Sized,
1750    {
1751        DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1752    }
1753
1754    /// Unsubscribe from streaming [`OptionGreeks`] data for the `instrument_id`.
1755    fn unsubscribe_option_greeks(
1756        &mut self,
1757        instrument_id: InstrumentId,
1758        client_id: Option<ClientId>,
1759        params: Option<Params>,
1760    ) where
1761        Self: 'static + Debug + Sized,
1762    {
1763        DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1764    }
1765
1766    /// Unsubscribe from streaming [`InstrumentStatus`] data for the `instrument_id`.
1767    fn unsubscribe_instrument_status(
1768        &mut self,
1769        instrument_id: InstrumentId,
1770        client_id: Option<ClientId>,
1771        params: Option<Params>,
1772    ) where
1773        Self: 'static + Debug + Sized,
1774    {
1775        DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1776    }
1777
1778    /// Unsubscribe from streaming [`InstrumentClose`] data for the `instrument_id`.
1779    fn unsubscribe_instrument_close(
1780        &mut self,
1781        instrument_id: InstrumentId,
1782        client_id: Option<ClientId>,
1783        params: Option<Params>,
1784    ) where
1785        Self: 'static + Debug + Sized,
1786    {
1787        DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1788    }
1789
1790    /// Unsubscribe from streaming [`OptionChainSlice`] snapshots for the option `series_id`.
1791    fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1792    where
1793        Self: 'static + Debug + Sized,
1794    {
1795        DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1796    }
1797
1798    /// Unsubscribe from [`OrderFilled`] events for the `instrument_id`.
1799    fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1800    where
1801        Self: 'static + Debug + Sized,
1802    {
1803        DataActorCore::unsubscribe_order_fills(self, instrument_id);
1804    }
1805
1806    /// Unsubscribe from [`OrderCanceled`] events for the `instrument_id`.
1807    fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1808    where
1809        Self: 'static + Debug + Sized,
1810    {
1811        DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1812    }
1813
1814    #[cfg(feature = "defi")]
1815    /// Unsubscribe from streaming [`Block`] data for the `chain`.
1816    fn unsubscribe_blocks(
1817        &mut self,
1818        chain: Blockchain,
1819        client_id: Option<ClientId>,
1820        params: Option<Params>,
1821    ) where
1822        Self: 'static + Debug + Sized,
1823    {
1824        DataActorCore::unsubscribe_blocks(self, chain, client_id, params);
1825    }
1826
1827    #[cfg(feature = "defi")]
1828    /// Unsubscribe from streaming [`Pool`] definition updates for the AMM pool at the `instrument_id`.
1829    fn unsubscribe_pool(
1830        &mut self,
1831        instrument_id: InstrumentId,
1832        client_id: Option<ClientId>,
1833        params: Option<Params>,
1834    ) where
1835        Self: 'static + Debug + Sized,
1836    {
1837        DataActorCore::unsubscribe_pool(self, instrument_id, client_id, params);
1838    }
1839
1840    #[cfg(feature = "defi")]
1841    /// Unsubscribe from streaming [`PoolSwap`] data for the `instrument_id`.
1842    fn unsubscribe_pool_swaps(
1843        &mut self,
1844        instrument_id: InstrumentId,
1845        client_id: Option<ClientId>,
1846        params: Option<Params>,
1847    ) where
1848        Self: 'static + Debug + Sized,
1849    {
1850        DataActorCore::unsubscribe_pool_swaps(self, instrument_id, client_id, params);
1851    }
1852
1853    #[cfg(feature = "defi")]
1854    /// Unsubscribe from streaming [`PoolLiquidityUpdate`] data for the `instrument_id`.
1855    fn unsubscribe_pool_liquidity_updates(
1856        &mut self,
1857        instrument_id: InstrumentId,
1858        client_id: Option<ClientId>,
1859        params: Option<Params>,
1860    ) where
1861        Self: 'static + Debug + Sized,
1862    {
1863        DataActorCore::unsubscribe_pool_liquidity_updates(self, instrument_id, client_id, params);
1864    }
1865
1866    #[cfg(feature = "defi")]
1867    /// Unsubscribe from streaming [`PoolFeeCollect`] data for the `instrument_id`.
1868    fn unsubscribe_pool_fee_collects(
1869        &mut self,
1870        instrument_id: InstrumentId,
1871        client_id: Option<ClientId>,
1872        params: Option<Params>,
1873    ) where
1874        Self: 'static + Debug + Sized,
1875    {
1876        DataActorCore::unsubscribe_pool_fee_collects(self, instrument_id, client_id, params);
1877    }
1878
1879    #[cfg(feature = "defi")]
1880    /// Unsubscribe from streaming [`PoolFlash`] events for the given `instrument_id`.
1881    fn unsubscribe_pool_flash_events(
1882        &mut self,
1883        instrument_id: InstrumentId,
1884        client_id: Option<ClientId>,
1885        params: Option<Params>,
1886    ) where
1887        Self: 'static + Debug + Sized,
1888    {
1889        DataActorCore::unsubscribe_pool_flash_events(self, instrument_id, client_id, params);
1890    }
1891
1892    /// Request historical custom data of the given `data_type`.
1893    ///
1894    /// # Errors
1895    ///
1896    /// Returns an error if input parameters are invalid.
1897    fn request_data(
1898        &mut self,
1899        data_type: DataType,
1900        client_id: ClientId,
1901        start: Option<DateTime<Utc>>,
1902        end: Option<DateTime<Utc>>,
1903        limit: Option<NonZeroUsize>,
1904        params: Option<Params>,
1905    ) -> anyhow::Result<UUID4>
1906    where
1907        Self: 'static + Debug + Sized,
1908    {
1909        let actor_id = self.actor_id().inner();
1910        let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1911            get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1912        });
1913
1914        DataActorCore::request_data(
1915            self, data_type, client_id, start, end, limit, params, handler,
1916        )
1917    }
1918
1919    /// Request historical [`InstrumentResponse`] data for the given `instrument_id`.
1920    ///
1921    /// # Errors
1922    ///
1923    /// Returns an error if input parameters are invalid.
1924    fn request_instrument(
1925        &mut self,
1926        instrument_id: InstrumentId,
1927        start: Option<DateTime<Utc>>,
1928        end: Option<DateTime<Utc>>,
1929        client_id: Option<ClientId>,
1930        params: Option<Params>,
1931    ) -> anyhow::Result<UUID4>
1932    where
1933        Self: 'static + Debug + Sized,
1934    {
1935        let actor_id = self.actor_id().inner();
1936        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1937            get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1938        });
1939
1940        DataActorCore::request_instrument(
1941            self,
1942            instrument_id,
1943            start,
1944            end,
1945            client_id,
1946            params,
1947            handler,
1948        )
1949    }
1950
1951    /// Request historical [`InstrumentsResponse`] definitions for the optional `venue`.
1952    ///
1953    /// # Errors
1954    ///
1955    /// Returns an error if input parameters are invalid.
1956    fn request_instruments(
1957        &mut self,
1958        venue: Option<Venue>,
1959        start: Option<DateTime<Utc>>,
1960        end: Option<DateTime<Utc>>,
1961        client_id: Option<ClientId>,
1962        params: Option<Params>,
1963    ) -> anyhow::Result<UUID4>
1964    where
1965        Self: 'static + Debug + Sized,
1966    {
1967        let actor_id = self.actor_id().inner();
1968        let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1969            get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1970        });
1971
1972        DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1973    }
1974
1975    /// Request an [`OrderBook`] snapshot for the given `instrument_id`.
1976    ///
1977    /// # Errors
1978    ///
1979    /// Returns an error if input parameters are invalid.
1980    fn request_book_snapshot(
1981        &mut self,
1982        instrument_id: InstrumentId,
1983        depth: Option<NonZeroUsize>,
1984        client_id: Option<ClientId>,
1985        params: Option<Params>,
1986    ) -> anyhow::Result<UUID4>
1987    where
1988        Self: 'static + Debug + Sized,
1989    {
1990        let actor_id = self.actor_id().inner();
1991        let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
1992            get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1993        });
1994
1995        DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1996    }
1997
1998    /// Request historical [`QuoteTick`] data for the given `instrument_id`.
1999    ///
2000    /// # Errors
2001    ///
2002    /// Returns an error if input parameters are invalid.
2003    fn request_quotes(
2004        &mut self,
2005        instrument_id: InstrumentId,
2006        start: Option<DateTime<Utc>>,
2007        end: Option<DateTime<Utc>>,
2008        limit: Option<NonZeroUsize>,
2009        client_id: Option<ClientId>,
2010        params: Option<Params>,
2011    ) -> anyhow::Result<UUID4>
2012    where
2013        Self: 'static + Debug + Sized,
2014    {
2015        let actor_id = self.actor_id().inner();
2016        let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2017            get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2018        });
2019
2020        DataActorCore::request_quotes(
2021            self,
2022            instrument_id,
2023            start,
2024            end,
2025            limit,
2026            client_id,
2027            params,
2028            handler,
2029        )
2030    }
2031
2032    /// Request historical [`TradeTick`] data for the given `instrument_id`.
2033    ///
2034    /// # Errors
2035    ///
2036    /// Returns an error if input parameters are invalid.
2037    fn request_trades(
2038        &mut self,
2039        instrument_id: InstrumentId,
2040        start: Option<DateTime<Utc>>,
2041        end: Option<DateTime<Utc>>,
2042        limit: Option<NonZeroUsize>,
2043        client_id: Option<ClientId>,
2044        params: Option<Params>,
2045    ) -> anyhow::Result<UUID4>
2046    where
2047        Self: 'static + Debug + Sized,
2048    {
2049        let actor_id = self.actor_id().inner();
2050        let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2051            get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2052        });
2053
2054        DataActorCore::request_trades(
2055            self,
2056            instrument_id,
2057            start,
2058            end,
2059            limit,
2060            client_id,
2061            params,
2062            handler,
2063        )
2064    }
2065
2066    /// Request historical [`FundingRateUpdate`] data for the given `instrument_id`.
2067    ///
2068    /// # Errors
2069    ///
2070    /// Returns an error if input parameters are invalid.
2071    fn request_funding_rates(
2072        &mut self,
2073        instrument_id: InstrumentId,
2074        start: Option<DateTime<Utc>>,
2075        end: Option<DateTime<Utc>>,
2076        limit: Option<NonZeroUsize>,
2077        client_id: Option<ClientId>,
2078        params: Option<Params>,
2079    ) -> anyhow::Result<UUID4>
2080    where
2081        Self: 'static + Debug + Sized,
2082    {
2083        let actor_id = self.actor_id().inner();
2084        let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2085            get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2086        });
2087
2088        DataActorCore::request_funding_rates(
2089            self,
2090            instrument_id,
2091            start,
2092            end,
2093            limit,
2094            client_id,
2095            params,
2096            handler,
2097        )
2098    }
2099
2100    /// Request historical [`Bar`] data for the given `bar_type`.
2101    ///
2102    /// # Errors
2103    ///
2104    /// Returns an error if input parameters are invalid.
2105    fn request_bars(
2106        &mut self,
2107        bar_type: BarType,
2108        start: Option<DateTime<Utc>>,
2109        end: Option<DateTime<Utc>>,
2110        limit: Option<NonZeroUsize>,
2111        client_id: Option<ClientId>,
2112        params: Option<Params>,
2113    ) -> anyhow::Result<UUID4>
2114    where
2115        Self: 'static + Debug + Sized,
2116    {
2117        let actor_id = self.actor_id().inner();
2118        let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2119            get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2120        });
2121
2122        DataActorCore::request_bars(
2123            self, bar_type, start, end, limit, client_id, params, handler,
2124        )
2125    }
2126}
2127
2128// Blanket implementation: any DataActor automatically implements Actor
2129impl<T> Actor for T
2130where
2131    T: DataActor + Debug + 'static,
2132{
2133    fn id(&self) -> Ustr {
2134        self.actor_id.inner()
2135    }
2136
2137    #[allow(unused_variables)]
2138    fn handle(&mut self, msg: &dyn Any) {
2139        // Default empty implementation - concrete actors can override if needed
2140    }
2141
2142    fn as_any(&self) -> &dyn Any {
2143        self
2144    }
2145}
2146
2147// Blanket implementation: any DataActor automatically implements Component
2148impl<T> Component for T
2149where
2150    T: DataActor + Debug + 'static,
2151{
2152    fn component_id(&self) -> ComponentId {
2153        ComponentId::new(self.actor_id.inner().as_str())
2154    }
2155
2156    fn state(&self) -> ComponentState {
2157        self.state
2158    }
2159
2160    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2161        self.state = self.state.transition(&trigger)?;
2162        log::info!("{}", self.state.variant_name());
2163        Ok(())
2164    }
2165
2166    fn register(
2167        &mut self,
2168        trader_id: TraderId,
2169        clock: Rc<RefCell<dyn Clock>>,
2170        cache: Rc<RefCell<Cache>>,
2171    ) -> anyhow::Result<()> {
2172        DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2173
2174        // Register default time event handler for this actor
2175        let actor_id = self.actor_id().inner();
2176        let callback = TimeEventCallback::from(move |event: TimeEvent| {
2177            if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2178                actor.handle_time_event(&event);
2179            } else {
2180                log::error!("Actor {actor_id} not found for time event handling");
2181            }
2182        });
2183
2184        clock.borrow_mut().register_default_handler(callback);
2185
2186        self.initialize()
2187    }
2188
2189    fn on_start(&mut self) -> anyhow::Result<()> {
2190        DataActor::on_start(self)
2191    }
2192
2193    fn on_stop(&mut self) -> anyhow::Result<()> {
2194        DataActor::on_stop(self)
2195    }
2196
2197    fn on_resume(&mut self) -> anyhow::Result<()> {
2198        DataActor::on_resume(self)
2199    }
2200
2201    fn on_degrade(&mut self) -> anyhow::Result<()> {
2202        DataActor::on_degrade(self)
2203    }
2204
2205    fn on_fault(&mut self) -> anyhow::Result<()> {
2206        DataActor::on_fault(self)
2207    }
2208
2209    fn on_reset(&mut self) -> anyhow::Result<()> {
2210        DataActor::on_reset(self)
2211    }
2212
2213    fn on_dispose(&mut self) -> anyhow::Result<()> {
2214        DataActor::on_dispose(self)
2215    }
2216}
2217
/// Core functionality for all actors.
///
/// Holds the actor's identity and configuration, the clock and cache handles supplied at
/// registration, and the message bus handlers the actor has installed.
#[derive(Clone)]
#[allow(
    dead_code,
    reason = "TODO: Under development (pending_requests, signal_classes)"
)]
pub struct DataActorCore {
    /// The actor identifier.
    pub actor_id: ActorId,
    /// The actor's configuration.
    pub config: DataActorConfig,
    /// The trader ID; `None` until registered.
    trader_id: Option<TraderId>,
    clock: Option<Rc<RefCell<dyn Clock>>>, // Wired up on registration
    cache: Option<Rc<RefCell<Cache>>>,     // Wired up on registration
    /// The component state, updated through `Component::transition_state`.
    state: ComponentState,
    /// Untyped handlers keyed by pattern (generic topic, instrument and instrument close
    /// subscriptions).
    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
    // Typed message bus handlers keyed by topic, one map per data type. Each map is used to
    // detect duplicate subscriptions and to find the handler to unsubscribe.
    deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
    depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
    // DeFi handler maps, only compiled with the `defi` feature.
    #[cfg(feature = "defi")]
    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
    #[cfg(feature = "defi")]
    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
    #[cfg(feature = "defi")]
    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
    #[cfg(feature = "defi")]
    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
    #[cfg(feature = "defi")]
    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
    #[cfg(feature = "defi")]
    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
    warning_events: AHashSet<String>, // TODO: TBD
    // Under development; see the `dead_code` allowance on this struct.
    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
    // Under development; see the `dead_code` allowance on this struct.
    signal_classes: AHashMap<String, String>,
    // Only compiled with the `indicators` feature.
    #[cfg(feature = "indicators")]
    indicators: Indicators,
}
2264
2265impl Debug for DataActorCore {
2266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2267        f.debug_struct(stringify!(DataActorCore))
2268            .field("actor_id", &self.actor_id)
2269            .field("config", &self.config)
2270            .field("state", &self.state)
2271            .field("trader_id", &self.trader_id)
2272            .finish()
2273    }
2274}
2275
2276impl DataActorCore {
2277    /// Adds a subscription handler for the `topic`.
2278    ///
2279    //// Logs a warning if the actor is already subscribed to the topic.
2280    pub(crate) fn add_subscription_any(
2281        &mut self,
2282        topic: MStr<Topic>,
2283        handler: ShareableMessageHandler,
2284    ) {
2285        let pattern: MStr<Pattern> = topic.into();
2286        if self.topic_handlers.contains_key(&pattern) {
2287            log::warn!(
2288                "Actor {} attempted duplicate subscription to topic '{topic}'",
2289                self.actor_id,
2290            );
2291            return;
2292        }
2293
2294        self.topic_handlers.insert(pattern, handler.clone());
2295        msgbus::subscribe_any(pattern, handler, None);
2296    }
2297
2298    /// Removes a subscription handler for the `topic` if present.
2299    ///
2300    /// Logs a warning if the actor is not currently subscribed to the topic.
2301    pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2302        let pattern: MStr<Pattern> = topic.into();
2303        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2304            msgbus::unsubscribe_any(pattern, &handler);
2305        } else {
2306            log::warn!(
2307                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2308                self.actor_id,
2309            );
2310        }
2311    }
2312
2313    pub(crate) fn add_quote_subscription(
2314        &mut self,
2315        topic: MStr<Topic>,
2316        handler: TypedHandler<QuoteTick>,
2317    ) {
2318        if self.quote_handlers.contains_key(&topic) {
2319            log::warn!(
2320                "Actor {} attempted duplicate quote subscription to '{topic}'",
2321                self.actor_id
2322            );
2323            return;
2324        }
2325        self.quote_handlers.insert(topic, handler.clone());
2326        msgbus::subscribe_quotes(topic.into(), handler, None);
2327    }
2328
2329    #[allow(dead_code)]
2330    pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2331        if let Some(handler) = self.quote_handlers.remove(&topic) {
2332            msgbus::unsubscribe_quotes(topic.into(), &handler);
2333        }
2334    }
2335
2336    pub(crate) fn add_trade_subscription(
2337        &mut self,
2338        topic: MStr<Topic>,
2339        handler: TypedHandler<TradeTick>,
2340    ) {
2341        if self.trade_handlers.contains_key(&topic) {
2342            log::warn!(
2343                "Actor {} attempted duplicate trade subscription to '{topic}'",
2344                self.actor_id
2345            );
2346            return;
2347        }
2348        self.trade_handlers.insert(topic, handler.clone());
2349        msgbus::subscribe_trades(topic.into(), handler, None);
2350    }
2351
2352    #[allow(dead_code)]
2353    pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2354        if let Some(handler) = self.trade_handlers.remove(&topic) {
2355            msgbus::unsubscribe_trades(topic.into(), &handler);
2356        }
2357    }
2358
2359    pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2360        if self.bar_handlers.contains_key(&topic) {
2361            log::warn!(
2362                "Actor {} attempted duplicate bar subscription to '{topic}'",
2363                self.actor_id
2364            );
2365            return;
2366        }
2367        self.bar_handlers.insert(topic, handler.clone());
2368        msgbus::subscribe_bars(topic.into(), handler, None);
2369    }
2370
2371    #[allow(dead_code)]
2372    pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2373        if let Some(handler) = self.bar_handlers.remove(&topic) {
2374            msgbus::unsubscribe_bars(topic.into(), &handler);
2375        }
2376    }
2377
2378    pub(crate) fn add_order_event_subscription(
2379        &mut self,
2380        topic: MStr<Topic>,
2381        handler: TypedHandler<OrderEventAny>,
2382    ) {
2383        if self.order_event_handlers.contains_key(&topic) {
2384            log::warn!(
2385                "Actor {} attempted duplicate order event subscription to '{topic}'",
2386                self.actor_id
2387            );
2388            return;
2389        }
2390        self.order_event_handlers.insert(topic, handler.clone());
2391        msgbus::subscribe_order_events(topic.into(), handler, None);
2392    }
2393
2394    #[allow(dead_code)]
2395    pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2396        if let Some(handler) = self.order_event_handlers.remove(&topic) {
2397            msgbus::unsubscribe_order_events(topic.into(), &handler);
2398        }
2399    }
2400
2401    pub(crate) fn add_deltas_subscription(
2402        &mut self,
2403        topic: MStr<Topic>,
2404        handler: TypedHandler<OrderBookDeltas>,
2405    ) {
2406        if self.deltas_handlers.contains_key(&topic) {
2407            log::warn!(
2408                "Actor {} attempted duplicate deltas subscription to '{topic}'",
2409                self.actor_id
2410            );
2411            return;
2412        }
2413        self.deltas_handlers.insert(topic, handler.clone());
2414        msgbus::subscribe_book_deltas(topic.into(), handler, None);
2415    }
2416
2417    #[allow(dead_code)]
2418    pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
2419        if let Some(handler) = self.deltas_handlers.remove(&topic) {
2420            msgbus::unsubscribe_book_deltas(topic.into(), &handler);
2421        }
2422    }
2423
2424    #[allow(dead_code)]
2425    pub(crate) fn add_depth10_subscription(
2426        &mut self,
2427        topic: MStr<Topic>,
2428        handler: TypedHandler<OrderBookDepth10>,
2429    ) {
2430        if self.depth10_handlers.contains_key(&topic) {
2431            log::warn!(
2432                "Actor {} attempted duplicate depth10 subscription to '{topic}'",
2433                self.actor_id
2434            );
2435            return;
2436        }
2437        self.depth10_handlers.insert(topic, handler.clone());
2438        msgbus::subscribe_book_depth10(topic.into(), handler, None);
2439    }
2440
2441    #[allow(dead_code)]
2442    pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
2443        if let Some(handler) = self.depth10_handlers.remove(&topic) {
2444            msgbus::unsubscribe_book_depth10(topic.into(), &handler);
2445        }
2446    }
2447
2448    pub(crate) fn add_instrument_subscription(
2449        &mut self,
2450        pattern: MStr<Pattern>,
2451        handler: ShareableMessageHandler,
2452    ) {
2453        if self.topic_handlers.contains_key(&pattern) {
2454            log::warn!(
2455                "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2456                self.actor_id
2457            );
2458            return;
2459        }
2460        self.topic_handlers.insert(pattern, handler.clone());
2461        msgbus::subscribe_any(pattern, handler, None);
2462    }
2463
2464    #[allow(dead_code)]
2465    pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2466        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2467            msgbus::unsubscribe_any(pattern, &handler);
2468        }
2469    }
2470
2471    pub(crate) fn add_instrument_close_subscription(
2472        &mut self,
2473        topic: MStr<Topic>,
2474        handler: ShareableMessageHandler,
2475    ) {
2476        let pattern: MStr<Pattern> = topic.into();
2477        if self.topic_handlers.contains_key(&pattern) {
2478            log::warn!(
2479                "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2480                self.actor_id
2481            );
2482            return;
2483        }
2484        self.topic_handlers.insert(pattern, handler.clone());
2485        msgbus::subscribe_any(pattern, handler, None);
2486    }
2487
2488    #[allow(dead_code)]
2489    pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2490        let pattern: MStr<Pattern> = topic.into();
2491        if let Some(handler) = self.topic_handlers.remove(&pattern) {
2492            msgbus::unsubscribe_any(pattern, &handler);
2493        }
2494    }
2495
2496    pub(crate) fn add_book_snapshot_subscription(
2497        &mut self,
2498        topic: MStr<Topic>,
2499        handler: TypedHandler<OrderBook>,
2500    ) {
2501        if self.book_handlers.contains_key(&topic) {
2502            log::warn!(
2503                "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2504                self.actor_id
2505            );
2506            return;
2507        }
2508        self.book_handlers.insert(topic, handler.clone());
2509        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2510    }
2511
2512    #[allow(dead_code)]
2513    pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2514        if let Some(handler) = self.book_handlers.remove(&topic) {
2515            msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2516        }
2517    }
2518
2519    pub(crate) fn add_mark_price_subscription(
2520        &mut self,
2521        topic: MStr<Topic>,
2522        handler: TypedHandler<MarkPriceUpdate>,
2523    ) {
2524        if self.mark_price_handlers.contains_key(&topic) {
2525            log::warn!(
2526                "Actor {} attempted duplicate mark price subscription to '{topic}'",
2527                self.actor_id
2528            );
2529            return;
2530        }
2531        self.mark_price_handlers.insert(topic, handler.clone());
2532        msgbus::subscribe_mark_prices(topic.into(), handler, None);
2533    }
2534
2535    #[allow(dead_code)]
2536    pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2537        if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2538            msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2539        }
2540    }
2541
2542    pub(crate) fn add_index_price_subscription(
2543        &mut self,
2544        topic: MStr<Topic>,
2545        handler: TypedHandler<IndexPriceUpdate>,
2546    ) {
2547        if self.index_price_handlers.contains_key(&topic) {
2548            log::warn!(
2549                "Actor {} attempted duplicate index price subscription to '{topic}'",
2550                self.actor_id
2551            );
2552            return;
2553        }
2554        self.index_price_handlers.insert(topic, handler.clone());
2555        msgbus::subscribe_index_prices(topic.into(), handler, None);
2556    }
2557
2558    #[allow(dead_code)]
2559    pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2560        if let Some(handler) = self.index_price_handlers.remove(&topic) {
2561            msgbus::unsubscribe_index_prices(topic.into(), &handler);
2562        }
2563    }
2564
2565    pub(crate) fn add_funding_rate_subscription(
2566        &mut self,
2567        topic: MStr<Topic>,
2568        handler: TypedHandler<FundingRateUpdate>,
2569    ) {
2570        if self.funding_rate_handlers.contains_key(&topic) {
2571            log::warn!(
2572                "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2573                self.actor_id
2574            );
2575            return;
2576        }
2577        self.funding_rate_handlers.insert(topic, handler.clone());
2578        msgbus::subscribe_funding_rates(topic.into(), handler, None);
2579    }
2580
2581    #[allow(dead_code)]
2582    pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2583        if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2584            msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2585        }
2586    }
2587
2588    pub(crate) fn add_option_greeks_subscription(
2589        &mut self,
2590        topic: MStr<Topic>,
2591        handler: TypedHandler<OptionGreeks>,
2592    ) {
2593        if self.option_greeks_handlers.contains_key(&topic) {
2594            log::warn!(
2595                "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2596                self.actor_id
2597            );
2598            return;
2599        }
2600        self.option_greeks_handlers.insert(topic, handler.clone());
2601        msgbus::subscribe_option_greeks(topic.into(), handler, None);
2602    }
2603
2604    #[allow(dead_code)]
2605    pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2606        if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2607            msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2608        }
2609    }
2610
2611    pub(crate) fn add_option_chain_subscription(
2612        &mut self,
2613        topic: MStr<Topic>,
2614        handler: TypedHandler<OptionChainSlice>,
2615    ) {
2616        if self.option_chain_handlers.contains_key(&topic) {
2617            log::warn!(
2618                "Actor {} attempted duplicate option chain subscription to '{topic}'",
2619                self.actor_id
2620            );
2621            return;
2622        }
2623        self.option_chain_handlers.insert(topic, handler.clone());
2624        msgbus::subscribe_option_chain(topic.into(), handler, None);
2625    }
2626
2627    pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2628        if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2629            msgbus::unsubscribe_option_chain(topic.into(), &handler);
2630        }
2631    }
2632
2633    #[cfg(feature = "defi")]
2634    pub(crate) fn add_block_subscription(
2635        &mut self,
2636        topic: MStr<Topic>,
2637        handler: TypedHandler<Block>,
2638    ) {
2639        if self.block_handlers.contains_key(&topic) {
2640            log::warn!(
2641                "Actor {} attempted duplicate block subscription to '{topic}'",
2642                self.actor_id
2643            );
2644            return;
2645        }
2646        self.block_handlers.insert(topic, handler.clone());
2647        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
2648    }
2649
2650    #[cfg(feature = "defi")]
2651    #[allow(dead_code)]
2652    pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
2653        if let Some(handler) = self.block_handlers.remove(&topic) {
2654            msgbus::unsubscribe_defi_blocks(topic.into(), &handler);
2655        }
2656    }
2657
2658    #[cfg(feature = "defi")]
2659    pub(crate) fn add_pool_subscription(
2660        &mut self,
2661        topic: MStr<Topic>,
2662        handler: TypedHandler<Pool>,
2663    ) {
2664        if self.pool_handlers.contains_key(&topic) {
2665            log::warn!(
2666                "Actor {} attempted duplicate pool subscription to '{topic}'",
2667                self.actor_id
2668            );
2669            return;
2670        }
2671        self.pool_handlers.insert(topic, handler.clone());
2672        msgbus::subscribe_defi_pools(topic.into(), handler, None);
2673    }
2674
2675    #[cfg(feature = "defi")]
2676    #[allow(dead_code)]
2677    pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
2678        if let Some(handler) = self.pool_handlers.remove(&topic) {
2679            msgbus::unsubscribe_defi_pools(topic.into(), &handler);
2680        }
2681    }
2682
2683    #[cfg(feature = "defi")]
2684    pub(crate) fn add_pool_swap_subscription(
2685        &mut self,
2686        topic: MStr<Topic>,
2687        handler: TypedHandler<PoolSwap>,
2688    ) {
2689        if self.pool_swap_handlers.contains_key(&topic) {
2690            log::warn!(
2691                "Actor {} attempted duplicate pool swap subscription to '{topic}'",
2692                self.actor_id
2693            );
2694            return;
2695        }
2696        self.pool_swap_handlers.insert(topic, handler.clone());
2697        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
2698    }
2699
2700    #[cfg(feature = "defi")]
2701    #[allow(dead_code)]
2702    pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
2703        if let Some(handler) = self.pool_swap_handlers.remove(&topic) {
2704            msgbus::unsubscribe_defi_swaps(topic.into(), &handler);
2705        }
2706    }
2707
2708    #[cfg(feature = "defi")]
2709    pub(crate) fn add_pool_liquidity_subscription(
2710        &mut self,
2711        topic: MStr<Topic>,
2712        handler: TypedHandler<PoolLiquidityUpdate>,
2713    ) {
2714        if self.pool_liquidity_handlers.contains_key(&topic) {
2715            log::warn!(
2716                "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
2717                self.actor_id
2718            );
2719            return;
2720        }
2721        self.pool_liquidity_handlers.insert(topic, handler.clone());
2722        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
2723    }
2724
2725    #[cfg(feature = "defi")]
2726    #[allow(dead_code)]
2727    pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
2728        if let Some(handler) = self.pool_liquidity_handlers.remove(&topic) {
2729            msgbus::unsubscribe_defi_liquidity(topic.into(), &handler);
2730        }
2731    }
2732
2733    #[cfg(feature = "defi")]
2734    pub(crate) fn add_pool_collect_subscription(
2735        &mut self,
2736        topic: MStr<Topic>,
2737        handler: TypedHandler<PoolFeeCollect>,
2738    ) {
2739        if self.pool_collect_handlers.contains_key(&topic) {
2740            log::warn!(
2741                "Actor {} attempted duplicate pool collect subscription to '{topic}'",
2742                self.actor_id
2743            );
2744            return;
2745        }
2746        self.pool_collect_handlers.insert(topic, handler.clone());
2747        msgbus::subscribe_defi_collects(topic.into(), handler, None);
2748    }
2749
2750    #[cfg(feature = "defi")]
2751    #[allow(dead_code)]
2752    pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
2753        if let Some(handler) = self.pool_collect_handlers.remove(&topic) {
2754            msgbus::unsubscribe_defi_collects(topic.into(), &handler);
2755        }
2756    }
2757
2758    #[cfg(feature = "defi")]
2759    pub(crate) fn add_pool_flash_subscription(
2760        &mut self,
2761        topic: MStr<Topic>,
2762        handler: TypedHandler<PoolFlash>,
2763    ) {
2764        if self.pool_flash_handlers.contains_key(&topic) {
2765            log::warn!(
2766                "Actor {} attempted duplicate pool flash subscription to '{topic}'",
2767                self.actor_id
2768            );
2769            return;
2770        }
2771        self.pool_flash_handlers.insert(topic, handler.clone());
2772        msgbus::subscribe_defi_flash(topic.into(), handler, None);
2773    }
2774
2775    #[cfg(feature = "defi")]
2776    #[allow(dead_code)]
2777    pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
2778        if let Some(handler) = self.pool_flash_handlers.remove(&topic) {
2779            msgbus::unsubscribe_defi_flash(topic.into(), &handler);
2780        }
2781    }
2782
2783    /// Creates a new [`DataActorCore`] instance.
2784    pub fn new(config: DataActorConfig) -> Self {
2785        let actor_id = config
2786            .actor_id
2787            .unwrap_or_else(|| Self::default_actor_id(&config));
2788
2789        Self {
2790            actor_id,
2791            config,
2792            trader_id: None, // None until registered
2793            clock: None,     // None until registered
2794            cache: None,     // None until registered
2795            state: ComponentState::default(),
2796            topic_handlers: AHashMap::new(),
2797            deltas_handlers: AHashMap::new(),
2798            depth10_handlers: AHashMap::new(),
2799            book_handlers: AHashMap::new(),
2800            quote_handlers: AHashMap::new(),
2801            trade_handlers: AHashMap::new(),
2802            bar_handlers: AHashMap::new(),
2803            mark_price_handlers: AHashMap::new(),
2804            index_price_handlers: AHashMap::new(),
2805            funding_rate_handlers: AHashMap::new(),
2806            option_greeks_handlers: AHashMap::new(),
2807            option_chain_handlers: AHashMap::new(),
2808            order_event_handlers: AHashMap::new(),
2809            #[cfg(feature = "defi")]
2810            block_handlers: AHashMap::new(),
2811            #[cfg(feature = "defi")]
2812            pool_handlers: AHashMap::new(),
2813            #[cfg(feature = "defi")]
2814            pool_swap_handlers: AHashMap::new(),
2815            #[cfg(feature = "defi")]
2816            pool_liquidity_handlers: AHashMap::new(),
2817            #[cfg(feature = "defi")]
2818            pool_collect_handlers: AHashMap::new(),
2819            #[cfg(feature = "defi")]
2820            pool_flash_handlers: AHashMap::new(),
2821            warning_events: AHashSet::new(),
2822            pending_requests: AHashMap::new(),
2823            signal_classes: AHashMap::new(),
2824            #[cfg(feature = "indicators")]
2825            indicators: Indicators::default(),
2826        }
2827    }
2828
2829    /// Returns the memory address of this instance as a hexadecimal string.
2830    #[must_use]
2831    pub fn mem_address(&self) -> String {
2832        format!("{self:p}")
2833    }
2834
2835    /// Returns the actors state.
2836    pub fn state(&self) -> ComponentState {
2837        self.state
2838    }
2839
2840    /// Returns the trader ID this actor is registered to.
2841    pub fn trader_id(&self) -> Option<TraderId> {
2842        self.trader_id
2843    }
2844
2845    /// Returns the actors ID.
2846    pub fn actor_id(&self) -> ActorId {
2847        self.actor_id
2848    }
2849
2850    fn default_actor_id(config: &DataActorConfig) -> ActorId {
2851        let memory_address = std::ptr::from_ref(config) as usize;
2852        ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2853    }
2854
2855    /// Returns a UNIX nanoseconds timestamp from the actor's internal clock.
2856    pub fn timestamp_ns(&self) -> UnixNanos {
2857        self.clock_ref().timestamp_ns()
2858    }
2859
2860    /// Returns the clock for the actor (if registered).
2861    ///
2862    /// # Panics
2863    ///
2864    /// Panics if the actor has not been registered with a trader.
2865    pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2866        self.clock
2867            .as_ref()
2868            .unwrap_or_else(|| {
2869                panic!(
2870                    "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2871                    self.actor_id, self.trader_id
2872                )
2873            })
2874            .borrow_mut()
2875    }
2876
2877    /// Returns a clone of the reference-counted clock.
2878    ///
2879    /// # Panics
2880    ///
2881    /// Panics if the actor has not yet been registered (clock is `None`).
2882    pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2883        self.clock
2884            .as_ref()
2885            .expect("DataActor must be registered before accessing clock")
2886            .clone()
2887    }
2888
2889    fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2890        self.clock
2891            .as_ref()
2892            .unwrap_or_else(|| {
2893                panic!(
2894                    "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2895                    self.actor_id, self.trader_id
2896                )
2897            })
2898            .borrow()
2899    }
2900
2901    /// Returns a read-only reference to the cache.
2902    ///
2903    /// # Panics
2904    ///
2905    /// Panics if the actor has not yet been registered (cache is `None`).
2906    pub fn cache(&self) -> Ref<'_, Cache> {
2907        self.cache
2908            .as_ref()
2909            .expect("DataActor must be registered before accessing cache")
2910            .borrow()
2911    }
2912
2913    /// Returns a clone of the reference-counted cache.
2914    ///
2915    /// # Panics
2916    ///
2917    /// Panics if the actor has not yet been registered (cache is `None`).
2918    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2919        self.cache
2920            .as_ref()
2921            .expect("DataActor must be registered before accessing cache")
2922            .clone()
2923    }
2924
2925    /// Register the data actor with a trader.
2926    ///
2927    /// # Errors
2928    ///
2929    /// Returns an error if the actor has already been registered with a trader
2930    /// or if the provided dependencies are invalid.
2931    pub fn register(
2932        &mut self,
2933        trader_id: TraderId,
2934        clock: Rc<RefCell<dyn Clock>>,
2935        cache: Rc<RefCell<Cache>>,
2936    ) -> anyhow::Result<()> {
2937        if let Some(existing_trader_id) = self.trader_id {
2938            anyhow::bail!(
2939                "DataActor {} already registered with trader {existing_trader_id}",
2940                self.actor_id
2941            );
2942        }
2943
2944        // Validate clock by attempting to access it
2945        {
2946            let _timestamp = clock.borrow().timestamp_ns();
2947        }
2948
2949        // Validate cache by attempting to access it
2950        {
2951            let _cache_borrow = cache.borrow();
2952        }
2953
2954        self.trader_id = Some(trader_id);
2955        self.clock = Some(clock);
2956        self.cache = Some(cache);
2957
2958        // Verify complete registration
2959        if !self.is_properly_registered() {
2960            anyhow::bail!(
2961                "DataActor {} registration incomplete - validation failed",
2962                self.actor_id
2963            );
2964        }
2965
2966        log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2967        Ok(())
2968    }
2969
2970    /// Register an event type for warning log levels.
2971    pub fn register_warning_event(&mut self, event_type: &str) {
2972        self.warning_events.insert(event_type.to_string());
2973        log::debug!("Registered event type '{event_type}' for warning logs");
2974    }
2975
2976    /// Deregister an event type from warning log levels.
2977    pub fn deregister_warning_event(&mut self, event_type: &str) {
2978        self.warning_events.remove(event_type);
2979        log::debug!("Deregistered event type '{event_type}' from warning logs");
2980    }
2981
2982    pub fn is_registered(&self) -> bool {
2983        self.trader_id.is_some()
2984    }
2985
2986    pub(crate) fn check_registered(&self) {
2987        assert!(
2988            self.is_registered(),
2989            "Actor has not been registered with a Trader"
2990        );
2991    }
2992
2993    /// Validates registration state without panicking.
2994    fn is_properly_registered(&self) -> bool {
2995        self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2996    }
2997
2998    pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2999        if self.config.log_commands {
3000            log::info!("{CMD}{SEND} {command:?}");
3001        }
3002
3003        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3004        msgbus::send_data_command(endpoint, command);
3005    }
3006
3007    #[allow(dead_code)]
3008    fn send_data_req(&self, request: &RequestCommand) {
3009        if self.config.log_commands {
3010            log::info!("{REQ}{SEND} {request:?}");
3011        }
3012
3013        // For now, simplified approach - data requests without dynamic handlers
3014        // TODO: Implement proper dynamic dispatch for response handlers
3015        let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3016        msgbus::send_any(endpoint, request.as_any());
3017    }
3018
3019    /// Sends a shutdown command to the system with an optional reason.
3020    ///
3021    /// # Panics
3022    ///
3023    /// Panics if the actor is not registered or has no trader ID.
3024    pub fn shutdown_system(&self, reason: Option<String>) {
3025        self.check_registered();
3026
3027        // Checked registered before unwrapping trader ID
3028        let command = ShutdownSystem::new(
3029            self.trader_id().unwrap(),
3030            self.actor_id.inner(),
3031            reason,
3032            UUID4::new(),
3033            self.timestamp_ns(),
3034        );
3035
3036        let topic = MessagingSwitchboard::shutdown_system_topic();
3037        msgbus::publish_any(topic, command.as_any());
3038    }
3039
3040    /// Publishes `data` on the message bus under the topic derived from `data_type`.
3041    ///
3042    /// `data_type` is kept as an explicit parameter (rather than deriving it from
3043    /// `data.data_type`) to mirror the v1 Python `publish_data(data_type, data)` API and
3044    /// to allow callers to override the routing topic from the payload's intrinsic type.
3045    ///
3046    /// # Panics
3047    ///
3048    /// Panics if the actor is not registered with a trader.
3049    pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3050        self.check_registered();
3051
3052        let topic = get_custom_topic(data_type);
3053        msgbus::publish_any(topic, data);
3054    }
3055
3056    /// Publishes a [`Signal`] constructed from `name` and `value`, wrapped in [`CustomData`]
3057    /// so it is consumed by signal subscribers and by any `CustomData`-aware pipeline
3058    /// (for example the feather persistence writer).
3059    ///
3060    /// The topic mirrors the v1 Python scheme `data.Signal<TitleName>` so subscribers
3061    /// using either a specific name or the global wildcard are both notified.
3062    /// If `ts_event` is zero then the current clock timestamp is used.
3063    ///
3064    /// # Panics
3065    ///
3066    /// Panics if the actor is not registered with a trader.
3067    pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3068        self.check_registered();
3069
3070        let now = self.timestamp_ns();
3071        let ts_event = if ts_event.as_u64() == 0 {
3072            now
3073        } else {
3074            ts_event
3075        };
3076        let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3077
3078        let data_type = DataType::new(
3079            &format!(
3080                "Signal{}",
3081                nautilus_core::string::conversions::title_case(name)
3082            ),
3083            None,
3084            None,
3085        );
3086        let data = CustomData::new(Arc::new(signal), data_type);
3087        let topic = get_custom_topic(&data.data_type);
3088        msgbus::publish_any(topic, &data);
3089    }
3090
3091    /// Adds the `synthetic` instrument to the cache.
3092    ///
3093    /// # Errors
3094    ///
3095    /// Returns an error if a synthetic with the same ID already exists, or if the
3096    /// backing cache fails to persist it. Panics if the actor is not registered
3097    /// with a trader. // panics-doc-ok
3098    pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3099        self.check_registered();
3100
3101        let cache = self.cache_rc();
3102        if cache.borrow().synthetic(&synthetic.id).is_some() {
3103            anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3104        }
3105        cache.borrow_mut().add_synthetic(synthetic)
3106    }
3107
3108    /// Updates the `synthetic` instrument in the cache, replacing the existing entry.
3109    ///
3110    /// # Errors
3111    ///
3112    /// Returns an error if no synthetic with the same ID already exists, or if the
3113    /// backing cache fails to persist the replacement. Panics if the actor is not
3114    /// registered with a trader. // panics-doc-ok
3115    pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3116        self.check_registered();
3117
3118        let cache = self.cache_rc();
3119        if cache.borrow().synthetic(&synthetic.id).is_none() {
3120            anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3121        }
3122        cache.borrow_mut().add_synthetic(synthetic)
3123    }
3124
3125    /// Helper method for registering data subscriptions from the trait.
3126    ///
3127    /// # Panics
3128    ///
3129    /// Panics if the actor is not properly registered.
3130    pub fn subscribe_data(
3131        &mut self,
3132        handler: ShareableMessageHandler,
3133        data_type: DataType,
3134        client_id: Option<ClientId>,
3135        params: Option<Params>,
3136    ) {
3137        assert!(
3138            self.is_properly_registered(),
3139            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3140            self.actor_id,
3141            self.trader_id,
3142            self.clock.is_some(),
3143            self.cache.is_some()
3144        );
3145
3146        let topic = get_custom_topic(&data_type);
3147        self.add_subscription_any(topic, handler);
3148
3149        // If no client ID specified, just subscribe to the topic
3150        if client_id.is_none() {
3151            return;
3152        }
3153
3154        let command = SubscribeCommand::Data(SubscribeCustomData {
3155            data_type,
3156            client_id,
3157            venue: None,
3158            command_id: UUID4::new(),
3159            ts_init: self.timestamp_ns(),
3160            correlation_id: None,
3161            params,
3162        });
3163
3164        self.send_data_cmd(DataCommand::Subscribe(command));
3165    }
3166
3167    /// Helper method for registering signal subscriptions from the trait.
3168    ///
3169    /// An empty `name` subscribes to every signal via the `data.Signal*` wildcard pattern.
3170    ///
3171    /// # Panics
3172    ///
3173    /// Panics if the actor is not registered with a trader.
3174    pub fn subscribe_signal(&mut self, handler: ShareableMessageHandler, name: &str) {
3175        self.check_registered();
3176
3177        let pattern = get_signal_pattern(name);
3178        if self.topic_handlers.contains_key(&pattern) {
3179            log::warn!(
3180                "Actor {} attempted duplicate signal subscription to '{pattern}'",
3181                self.actor_id,
3182            );
3183            return;
3184        }
3185        self.topic_handlers.insert(pattern, handler.clone());
3186        msgbus::subscribe_any(pattern, handler, None);
3187    }
3188
3189    /// Helper method for registering quotes subscriptions from the trait.
3190    pub fn subscribe_quotes(
3191        &mut self,
3192        topic: MStr<Topic>,
3193        handler: TypedHandler<QuoteTick>,
3194        instrument_id: InstrumentId,
3195        client_id: Option<ClientId>,
3196        params: Option<Params>,
3197    ) {
3198        self.check_registered();
3199
3200        self.add_quote_subscription(topic, handler);
3201
3202        let command = SubscribeCommand::Quotes(SubscribeQuotes {
3203            instrument_id,
3204            client_id,
3205            venue: Some(instrument_id.venue),
3206            command_id: UUID4::new(),
3207            ts_init: self.timestamp_ns(),
3208            correlation_id: None,
3209            params,
3210        });
3211
3212        self.send_data_cmd(DataCommand::Subscribe(command));
3213    }
3214
3215    /// Helper method for registering instruments subscriptions from the trait.
3216    pub fn subscribe_instruments(
3217        &mut self,
3218        pattern: MStr<Pattern>,
3219        handler: ShareableMessageHandler,
3220        venue: Venue,
3221        client_id: Option<ClientId>,
3222        params: Option<Params>,
3223    ) {
3224        self.check_registered();
3225
3226        self.add_instrument_subscription(pattern, handler);
3227
3228        let command = SubscribeCommand::Instruments(SubscribeInstruments {
3229            client_id,
3230            venue,
3231            command_id: UUID4::new(),
3232            ts_init: self.timestamp_ns(),
3233            correlation_id: None,
3234            params,
3235        });
3236
3237        self.send_data_cmd(DataCommand::Subscribe(command));
3238    }
3239
3240    /// Helper method for registering instrument subscriptions from the trait.
3241    pub fn subscribe_instrument(
3242        &mut self,
3243        topic: MStr<Topic>,
3244        handler: ShareableMessageHandler,
3245        instrument_id: InstrumentId,
3246        client_id: Option<ClientId>,
3247        params: Option<Params>,
3248    ) {
3249        self.check_registered();
3250
3251        self.add_instrument_subscription(topic.into(), handler);
3252
3253        let command = SubscribeCommand::Instrument(SubscribeInstrument {
3254            instrument_id,
3255            client_id,
3256            venue: Some(instrument_id.venue),
3257            command_id: UUID4::new(),
3258            ts_init: self.timestamp_ns(),
3259            correlation_id: None,
3260            params,
3261        });
3262
3263        self.send_data_cmd(DataCommand::Subscribe(command));
3264    }
3265
3266    /// Helper method for registering book deltas subscriptions from the trait.
3267    #[expect(clippy::too_many_arguments)]
3268    pub fn subscribe_book_deltas(
3269        &mut self,
3270        topic: MStr<Topic>,
3271        handler: TypedHandler<OrderBookDeltas>,
3272        instrument_id: InstrumentId,
3273        book_type: BookType,
3274        depth: Option<NonZeroUsize>,
3275        client_id: Option<ClientId>,
3276        managed: bool,
3277        params: Option<Params>,
3278    ) {
3279        self.check_registered();
3280
3281        self.add_deltas_subscription(topic, handler);
3282
3283        let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3284            instrument_id,
3285            book_type,
3286            client_id,
3287            venue: Some(instrument_id.venue),
3288            command_id: UUID4::new(),
3289            ts_init: self.timestamp_ns(),
3290            depth,
3291            managed,
3292            correlation_id: None,
3293            params,
3294        });
3295
3296        self.send_data_cmd(DataCommand::Subscribe(command));
3297    }
3298
3299    /// Helper method for registering book snapshots subscriptions from the trait.
3300    #[expect(clippy::too_many_arguments)]
3301    pub fn subscribe_book_at_interval(
3302        &mut self,
3303        topic: MStr<Topic>,
3304        handler: TypedHandler<OrderBook>,
3305        instrument_id: InstrumentId,
3306        book_type: BookType,
3307        depth: Option<NonZeroUsize>,
3308        interval_ms: NonZeroUsize,
3309        client_id: Option<ClientId>,
3310        params: Option<Params>,
3311    ) {
3312        self.check_registered();
3313
3314        self.add_book_snapshot_subscription(topic, handler);
3315
3316        let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3317            instrument_id,
3318            book_type,
3319            client_id,
3320            venue: Some(instrument_id.venue),
3321            command_id: UUID4::new(),
3322            ts_init: self.timestamp_ns(),
3323            depth,
3324            interval_ms,
3325            correlation_id: None,
3326            params,
3327        });
3328
3329        self.send_data_cmd(DataCommand::Subscribe(command));
3330    }
3331
3332    /// Helper method for registering trades subscriptions from the trait.
3333    pub fn subscribe_trades(
3334        &mut self,
3335        topic: MStr<Topic>,
3336        handler: TypedHandler<TradeTick>,
3337        instrument_id: InstrumentId,
3338        client_id: Option<ClientId>,
3339        params: Option<Params>,
3340    ) {
3341        self.check_registered();
3342
3343        self.add_trade_subscription(topic, handler);
3344
3345        let command = SubscribeCommand::Trades(SubscribeTrades {
3346            instrument_id,
3347            client_id,
3348            venue: Some(instrument_id.venue),
3349            command_id: UUID4::new(),
3350            ts_init: self.timestamp_ns(),
3351            correlation_id: None,
3352            params,
3353        });
3354
3355        self.send_data_cmd(DataCommand::Subscribe(command));
3356    }
3357
3358    /// Helper method for registering bars subscriptions from the trait.
3359    pub fn subscribe_bars(
3360        &mut self,
3361        topic: MStr<Topic>,
3362        handler: TypedHandler<Bar>,
3363        bar_type: BarType,
3364        client_id: Option<ClientId>,
3365        params: Option<Params>,
3366    ) {
3367        self.check_registered();
3368
3369        self.add_bar_subscription(topic, handler);
3370
3371        let command = SubscribeCommand::Bars(SubscribeBars {
3372            bar_type,
3373            client_id,
3374            venue: Some(bar_type.instrument_id().venue),
3375            command_id: UUID4::new(),
3376            ts_init: self.timestamp_ns(),
3377            correlation_id: None,
3378            params,
3379        });
3380
3381        self.send_data_cmd(DataCommand::Subscribe(command));
3382    }
3383
3384    /// Helper method for registering mark prices subscriptions from the trait.
3385    pub fn subscribe_mark_prices(
3386        &mut self,
3387        topic: MStr<Topic>,
3388        handler: TypedHandler<MarkPriceUpdate>,
3389        instrument_id: InstrumentId,
3390        client_id: Option<ClientId>,
3391        params: Option<Params>,
3392    ) {
3393        self.check_registered();
3394
3395        self.add_mark_price_subscription(topic, handler);
3396
3397        let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3398            instrument_id,
3399            client_id,
3400            venue: Some(instrument_id.venue),
3401            command_id: UUID4::new(),
3402            ts_init: self.timestamp_ns(),
3403            correlation_id: None,
3404            params,
3405        });
3406
3407        self.send_data_cmd(DataCommand::Subscribe(command));
3408    }
3409
3410    /// Helper method for registering index prices subscriptions from the trait.
3411    pub fn subscribe_index_prices(
3412        &mut self,
3413        topic: MStr<Topic>,
3414        handler: TypedHandler<IndexPriceUpdate>,
3415        instrument_id: InstrumentId,
3416        client_id: Option<ClientId>,
3417        params: Option<Params>,
3418    ) {
3419        self.check_registered();
3420
3421        self.add_index_price_subscription(topic, handler);
3422
3423        let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3424            instrument_id,
3425            client_id,
3426            venue: Some(instrument_id.venue),
3427            command_id: UUID4::new(),
3428            ts_init: self.timestamp_ns(),
3429            correlation_id: None,
3430            params,
3431        });
3432
3433        self.send_data_cmd(DataCommand::Subscribe(command));
3434    }
3435
3436    /// Helper method for registering funding rates subscriptions from the trait.
3437    pub fn subscribe_funding_rates(
3438        &mut self,
3439        topic: MStr<Topic>,
3440        handler: TypedHandler<FundingRateUpdate>,
3441        instrument_id: InstrumentId,
3442        client_id: Option<ClientId>,
3443        params: Option<Params>,
3444    ) {
3445        self.check_registered();
3446
3447        self.add_funding_rate_subscription(topic, handler);
3448
3449        let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3450            instrument_id,
3451            client_id,
3452            venue: Some(instrument_id.venue),
3453            command_id: UUID4::new(),
3454            ts_init: self.timestamp_ns(),
3455            correlation_id: None,
3456            params,
3457        });
3458
3459        self.send_data_cmd(DataCommand::Subscribe(command));
3460    }
3461
3462    /// Helper method for registering option greeks subscriptions from the trait.
3463    pub fn subscribe_option_greeks(
3464        &mut self,
3465        topic: MStr<Topic>,
3466        handler: TypedHandler<OptionGreeks>,
3467        instrument_id: InstrumentId,
3468        client_id: Option<ClientId>,
3469        params: Option<Params>,
3470    ) {
3471        self.check_registered();
3472
3473        self.add_option_greeks_subscription(topic, handler);
3474
3475        let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3476            instrument_id,
3477            client_id,
3478            venue: Some(instrument_id.venue),
3479            command_id: UUID4::new(),
3480            ts_init: self.timestamp_ns(),
3481            correlation_id: None,
3482            params,
3483        });
3484
3485        self.send_data_cmd(DataCommand::Subscribe(command));
3486    }
3487
3488    /// Helper method for registering instrument status subscriptions from the trait.
3489    pub fn subscribe_instrument_status(
3490        &mut self,
3491        topic: MStr<Topic>,
3492        handler: ShareableMessageHandler,
3493        instrument_id: InstrumentId,
3494        client_id: Option<ClientId>,
3495        params: Option<Params>,
3496    ) {
3497        self.check_registered();
3498
3499        self.add_subscription_any(topic, handler);
3500
3501        let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3502            instrument_id,
3503            client_id,
3504            venue: Some(instrument_id.venue),
3505            command_id: UUID4::new(),
3506            ts_init: self.timestamp_ns(),
3507            correlation_id: None,
3508            params,
3509        });
3510
3511        self.send_data_cmd(DataCommand::Subscribe(command));
3512    }
3513
3514    /// Helper method for registering instrument close subscriptions from the trait.
3515    pub fn subscribe_instrument_close(
3516        &mut self,
3517        topic: MStr<Topic>,
3518        handler: ShareableMessageHandler,
3519        instrument_id: InstrumentId,
3520        client_id: Option<ClientId>,
3521        params: Option<Params>,
3522    ) {
3523        self.check_registered();
3524
3525        self.add_instrument_close_subscription(topic, handler);
3526
3527        let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3528            instrument_id,
3529            client_id,
3530            venue: Some(instrument_id.venue),
3531            command_id: UUID4::new(),
3532            ts_init: self.timestamp_ns(),
3533            correlation_id: None,
3534            params,
3535        });
3536
3537        self.send_data_cmd(DataCommand::Subscribe(command));
3538    }
3539
3540    /// Helper method for subscribing to option chain snapshots from the trait.
3541    #[allow(clippy::too_many_arguments)]
3542    pub fn subscribe_option_chain(
3543        &mut self,
3544        topic: MStr<Topic>,
3545        handler: TypedHandler<OptionChainSlice>,
3546        series_id: OptionSeriesId,
3547        strike_range: StrikeRange,
3548        snapshot_interval_ms: Option<u64>,
3549        client_id: Option<ClientId>,
3550        params: Option<Params>,
3551    ) {
3552        self.check_registered();
3553
3554        self.add_option_chain_subscription(topic, handler);
3555
3556        let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3557            series_id,
3558            strike_range,
3559            snapshot_interval_ms,
3560            UUID4::new(),
3561            self.timestamp_ns(),
3562            client_id,
3563            Some(series_id.venue),
3564            params,
3565        ));
3566
3567        self.send_data_cmd(DataCommand::Subscribe(command));
3568    }
3569
3570    /// Helper method for registering order fills subscriptions from the trait.
3571    pub fn subscribe_order_fills(
3572        &mut self,
3573        topic: MStr<Topic>,
3574        handler: TypedHandler<OrderEventAny>,
3575    ) {
3576        self.check_registered();
3577        self.add_order_event_subscription(topic, handler);
3578    }
3579
3580    /// Helper method for registering order cancels subscriptions from the trait.
3581    pub fn subscribe_order_cancels(
3582        &mut self,
3583        topic: MStr<Topic>,
3584        handler: TypedHandler<OrderEventAny>,
3585    ) {
3586        self.check_registered();
3587        self.add_order_event_subscription(topic, handler);
3588    }
3589
3590    /// Helper method for unsubscribing from data.
3591    pub fn unsubscribe_data(
3592        &mut self,
3593        data_type: DataType,
3594        client_id: Option<ClientId>,
3595        params: Option<Params>,
3596    ) {
3597        self.check_registered();
3598
3599        let topic = get_custom_topic(&data_type);
3600        self.remove_subscription_any(topic);
3601
3602        if client_id.is_none() {
3603            return;
3604        }
3605
3606        let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3607            data_type,
3608            client_id,
3609            venue: None,
3610            command_id: UUID4::new(),
3611            ts_init: self.timestamp_ns(),
3612            correlation_id: None,
3613            params,
3614        });
3615
3616        self.send_data_cmd(DataCommand::Unsubscribe(command));
3617    }
3618
3619    /// Helper method for unsubscribing from signals.
3620    ///
3621    /// # Panics
3622    ///
3623    /// Panics if the actor is not registered with a trader.
3624    pub fn unsubscribe_signal(&mut self, name: &str) {
3625        self.check_registered();
3626
3627        let pattern = get_signal_pattern(name);
3628        if let Some(handler) = self.topic_handlers.remove(&pattern) {
3629            msgbus::unsubscribe_any(pattern, &handler);
3630        } else {
3631            log::warn!(
3632                "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
3633                self.actor_id,
3634            );
3635        }
3636    }
3637
3638    /// Helper method for unsubscribing from instruments.
3639    pub fn unsubscribe_instruments(
3640        &mut self,
3641        venue: Venue,
3642        client_id: Option<ClientId>,
3643        params: Option<Params>,
3644    ) {
3645        self.check_registered();
3646
3647        let pattern = get_instruments_pattern(venue);
3648        self.remove_instrument_subscription(pattern);
3649
3650        let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3651            client_id,
3652            venue,
3653            command_id: UUID4::new(),
3654            ts_init: self.timestamp_ns(),
3655            correlation_id: None,
3656            params,
3657        });
3658
3659        self.send_data_cmd(DataCommand::Unsubscribe(command));
3660    }
3661
3662    /// Helper method for unsubscribing from instrument.
3663    pub fn unsubscribe_instrument(
3664        &mut self,
3665        instrument_id: InstrumentId,
3666        client_id: Option<ClientId>,
3667        params: Option<Params>,
3668    ) {
3669        self.check_registered();
3670
3671        let topic = get_instrument_topic(instrument_id);
3672        self.remove_instrument_subscription(topic.into());
3673
3674        let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3675            instrument_id,
3676            client_id,
3677            venue: Some(instrument_id.venue),
3678            command_id: UUID4::new(),
3679            ts_init: self.timestamp_ns(),
3680            correlation_id: None,
3681            params,
3682        });
3683
3684        self.send_data_cmd(DataCommand::Unsubscribe(command));
3685    }
3686
3687    /// Helper method for unsubscribing from book deltas.
3688    pub fn unsubscribe_book_deltas(
3689        &mut self,
3690        instrument_id: InstrumentId,
3691        client_id: Option<ClientId>,
3692        params: Option<Params>,
3693    ) {
3694        self.check_registered();
3695
3696        let topic = get_book_deltas_topic(instrument_id);
3697        self.remove_deltas_subscription(topic);
3698
3699        let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3700            instrument_id,
3701            client_id,
3702            venue: Some(instrument_id.venue),
3703            command_id: UUID4::new(),
3704            ts_init: self.timestamp_ns(),
3705            correlation_id: None,
3706            params,
3707        });
3708
3709        self.send_data_cmd(DataCommand::Unsubscribe(command));
3710    }
3711
3712    /// Helper method for unsubscribing from book snapshots at interval.
3713    pub fn unsubscribe_book_at_interval(
3714        &mut self,
3715        instrument_id: InstrumentId,
3716        interval_ms: NonZeroUsize,
3717        client_id: Option<ClientId>,
3718        params: Option<Params>,
3719    ) {
3720        self.check_registered();
3721
3722        let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3723        self.remove_book_snapshot_subscription(topic);
3724
3725        let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3726            instrument_id,
3727            interval_ms,
3728            client_id,
3729            venue: Some(instrument_id.venue),
3730            command_id: UUID4::new(),
3731            ts_init: self.timestamp_ns(),
3732            correlation_id: None,
3733            params,
3734        });
3735
3736        self.send_data_cmd(DataCommand::Unsubscribe(command));
3737    }
3738
3739    /// Helper method for unsubscribing from quotes.
3740    pub fn unsubscribe_quotes(
3741        &mut self,
3742        instrument_id: InstrumentId,
3743        client_id: Option<ClientId>,
3744        params: Option<Params>,
3745    ) {
3746        self.check_registered();
3747
3748        let topic = get_quotes_topic(instrument_id);
3749        self.remove_quote_subscription(topic);
3750
3751        let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3752            instrument_id,
3753            client_id,
3754            venue: Some(instrument_id.venue),
3755            command_id: UUID4::new(),
3756            ts_init: self.timestamp_ns(),
3757            correlation_id: None,
3758            params,
3759        });
3760
3761        self.send_data_cmd(DataCommand::Unsubscribe(command));
3762    }
3763
3764    /// Helper method for unsubscribing from trades.
3765    pub fn unsubscribe_trades(
3766        &mut self,
3767        instrument_id: InstrumentId,
3768        client_id: Option<ClientId>,
3769        params: Option<Params>,
3770    ) {
3771        self.check_registered();
3772
3773        let topic = get_trades_topic(instrument_id);
3774        self.remove_trade_subscription(topic);
3775
3776        let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3777            instrument_id,
3778            client_id,
3779            venue: Some(instrument_id.venue),
3780            command_id: UUID4::new(),
3781            ts_init: self.timestamp_ns(),
3782            correlation_id: None,
3783            params,
3784        });
3785
3786        self.send_data_cmd(DataCommand::Unsubscribe(command));
3787    }
3788
3789    /// Helper method for unsubscribing from bars.
3790    pub fn unsubscribe_bars(
3791        &mut self,
3792        bar_type: BarType,
3793        client_id: Option<ClientId>,
3794        params: Option<Params>,
3795    ) {
3796        self.check_registered();
3797
3798        let topic = get_bars_topic(bar_type);
3799        self.remove_bar_subscription(topic);
3800
3801        let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3802            bar_type,
3803            client_id,
3804            venue: Some(bar_type.instrument_id().venue),
3805            command_id: UUID4::new(),
3806            ts_init: self.timestamp_ns(),
3807            correlation_id: None,
3808            params,
3809        });
3810
3811        self.send_data_cmd(DataCommand::Unsubscribe(command));
3812    }
3813
3814    /// Helper method for unsubscribing from mark prices.
3815    pub fn unsubscribe_mark_prices(
3816        &mut self,
3817        instrument_id: InstrumentId,
3818        client_id: Option<ClientId>,
3819        params: Option<Params>,
3820    ) {
3821        self.check_registered();
3822
3823        let topic = get_mark_price_topic(instrument_id);
3824        self.remove_mark_price_subscription(topic);
3825
3826        let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3827            instrument_id,
3828            client_id,
3829            venue: Some(instrument_id.venue),
3830            command_id: UUID4::new(),
3831            ts_init: self.timestamp_ns(),
3832            correlation_id: None,
3833            params,
3834        });
3835
3836        self.send_data_cmd(DataCommand::Unsubscribe(command));
3837    }
3838
3839    /// Helper method for unsubscribing from index prices.
3840    pub fn unsubscribe_index_prices(
3841        &mut self,
3842        instrument_id: InstrumentId,
3843        client_id: Option<ClientId>,
3844        params: Option<Params>,
3845    ) {
3846        self.check_registered();
3847
3848        let topic = get_index_price_topic(instrument_id);
3849        self.remove_index_price_subscription(topic);
3850
3851        let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3852            instrument_id,
3853            client_id,
3854            venue: Some(instrument_id.venue),
3855            command_id: UUID4::new(),
3856            ts_init: self.timestamp_ns(),
3857            correlation_id: None,
3858            params,
3859        });
3860
3861        self.send_data_cmd(DataCommand::Unsubscribe(command));
3862    }
3863
3864    /// Helper method for unsubscribing from funding rates.
3865    pub fn unsubscribe_funding_rates(
3866        &mut self,
3867        instrument_id: InstrumentId,
3868        client_id: Option<ClientId>,
3869        params: Option<Params>,
3870    ) {
3871        self.check_registered();
3872
3873        let topic = get_funding_rate_topic(instrument_id);
3874        self.remove_funding_rate_subscription(topic);
3875
3876        let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3877            instrument_id,
3878            client_id,
3879            venue: Some(instrument_id.venue),
3880            command_id: UUID4::new(),
3881            ts_init: self.timestamp_ns(),
3882            correlation_id: None,
3883            params,
3884        });
3885
3886        self.send_data_cmd(DataCommand::Unsubscribe(command));
3887    }
3888
3889    /// Helper method for unsubscribing from option greeks.
3890    pub fn unsubscribe_option_greeks(
3891        &mut self,
3892        instrument_id: InstrumentId,
3893        client_id: Option<ClientId>,
3894        params: Option<Params>,
3895    ) {
3896        self.check_registered();
3897
3898        let topic = get_option_greeks_topic(instrument_id);
3899        self.remove_option_greeks_subscription(topic);
3900
3901        let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
3902            instrument_id,
3903            client_id,
3904            venue: Some(instrument_id.venue),
3905            command_id: UUID4::new(),
3906            ts_init: self.timestamp_ns(),
3907            correlation_id: None,
3908            params,
3909        });
3910
3911        self.send_data_cmd(DataCommand::Unsubscribe(command));
3912    }
3913
3914    /// Helper method for unsubscribing from instrument status.
3915    pub fn unsubscribe_instrument_status(
3916        &mut self,
3917        instrument_id: InstrumentId,
3918        client_id: Option<ClientId>,
3919        params: Option<Params>,
3920    ) {
3921        self.check_registered();
3922
3923        let topic = get_instrument_status_topic(instrument_id);
3924        self.remove_subscription_any(topic);
3925
3926        let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3927            instrument_id,
3928            client_id,
3929            venue: Some(instrument_id.venue),
3930            command_id: UUID4::new(),
3931            ts_init: self.timestamp_ns(),
3932            correlation_id: None,
3933            params,
3934        });
3935
3936        self.send_data_cmd(DataCommand::Unsubscribe(command));
3937    }
3938
3939    /// Helper method for unsubscribing from instrument close.
3940    pub fn unsubscribe_instrument_close(
3941        &mut self,
3942        instrument_id: InstrumentId,
3943        client_id: Option<ClientId>,
3944        params: Option<Params>,
3945    ) {
3946        self.check_registered();
3947
3948        let topic = get_instrument_close_topic(instrument_id);
3949        self.remove_instrument_close_subscription(topic);
3950
3951        let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3952            instrument_id,
3953            client_id,
3954            venue: Some(instrument_id.venue),
3955            command_id: UUID4::new(),
3956            ts_init: self.timestamp_ns(),
3957            correlation_id: None,
3958            params,
3959        });
3960
3961        self.send_data_cmd(DataCommand::Unsubscribe(command));
3962    }
3963
3964    /// Helper method for unsubscribing from option chain snapshots.
3965    pub fn unsubscribe_option_chain(
3966        &mut self,
3967        series_id: OptionSeriesId,
3968        client_id: Option<ClientId>,
3969    ) {
3970        self.check_registered();
3971
3972        let topic = get_option_chain_topic(series_id);
3973        self.remove_option_chain_subscription(topic);
3974
3975        let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
3976            series_id,
3977            UUID4::new(),
3978            self.timestamp_ns(),
3979            client_id,
3980            Some(series_id.venue),
3981        ));
3982
3983        self.send_data_cmd(DataCommand::Unsubscribe(command));
3984    }
3985
3986    /// Helper method for unsubscribing from order fills.
3987    pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3988        self.check_registered();
3989
3990        let topic = get_order_fills_topic(instrument_id);
3991        self.remove_order_event_subscription(topic);
3992    }
3993
3994    /// Helper method for unsubscribing from order cancels.
3995    pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3996        self.check_registered();
3997
3998        let topic = get_order_cancels_topic(instrument_id);
3999        self.remove_order_event_subscription(topic);
4000    }
4001
4002    /// Helper method for requesting data.
4003    ///
4004    /// # Errors
4005    ///
4006    /// Returns an error if input parameters are invalid.
4007    #[expect(clippy::too_many_arguments)]
4008    pub fn request_data(
4009        &self,
4010        data_type: DataType,
4011        client_id: ClientId,
4012        start: Option<DateTime<Utc>>,
4013        end: Option<DateTime<Utc>>,
4014        limit: Option<NonZeroUsize>,
4015        params: Option<Params>,
4016        handler: ShareableMessageHandler,
4017    ) -> anyhow::Result<UUID4> {
4018        self.check_registered();
4019
4020        let now = self.clock_ref().utc_now();
4021        check_timestamps(now, start, end)?;
4022
4023        let request_id = UUID4::new();
4024        let command = RequestCommand::Data(RequestCustomData {
4025            client_id,
4026            data_type,
4027            start,
4028            end,
4029            limit,
4030            request_id,
4031            ts_init: self.timestamp_ns(),
4032            params,
4033        });
4034
4035        get_message_bus()
4036            .borrow_mut()
4037            .register_response_handler(command.request_id(), handler)?;
4038
4039        self.send_data_cmd(DataCommand::Request(command));
4040
4041        Ok(request_id)
4042    }
4043
4044    /// Helper method for requesting instrument.
4045    ///
4046    /// # Errors
4047    ///
4048    /// Returns an error if input parameters are invalid.
4049    pub fn request_instrument(
4050        &self,
4051        instrument_id: InstrumentId,
4052        start: Option<DateTime<Utc>>,
4053        end: Option<DateTime<Utc>>,
4054        client_id: Option<ClientId>,
4055        params: Option<Params>,
4056        handler: ShareableMessageHandler,
4057    ) -> anyhow::Result<UUID4> {
4058        self.check_registered();
4059
4060        let now = self.clock_ref().utc_now();
4061        check_timestamps(now, start, end)?;
4062
4063        let request_id = UUID4::new();
4064        let command = RequestCommand::Instrument(RequestInstrument {
4065            instrument_id,
4066            start,
4067            end,
4068            client_id,
4069            request_id,
4070            ts_init: now.into(),
4071            params,
4072        });
4073
4074        get_message_bus()
4075            .borrow_mut()
4076            .register_response_handler(command.request_id(), handler)?;
4077
4078        self.send_data_cmd(DataCommand::Request(command));
4079
4080        Ok(request_id)
4081    }
4082
4083    /// Helper method for requesting instruments.
4084    ///
4085    /// # Errors
4086    ///
4087    /// Returns an error if input parameters are invalid.
4088    pub fn request_instruments(
4089        &self,
4090        venue: Option<Venue>,
4091        start: Option<DateTime<Utc>>,
4092        end: Option<DateTime<Utc>>,
4093        client_id: Option<ClientId>,
4094        params: Option<Params>,
4095        handler: ShareableMessageHandler,
4096    ) -> anyhow::Result<UUID4> {
4097        self.check_registered();
4098
4099        let now = self.clock_ref().utc_now();
4100        check_timestamps(now, start, end)?;
4101
4102        let request_id = UUID4::new();
4103        let command = RequestCommand::Instruments(RequestInstruments {
4104            venue,
4105            start,
4106            end,
4107            client_id,
4108            request_id,
4109            ts_init: now.into(),
4110            params,
4111        });
4112
4113        get_message_bus()
4114            .borrow_mut()
4115            .register_response_handler(command.request_id(), handler)?;
4116
4117        self.send_data_cmd(DataCommand::Request(command));
4118
4119        Ok(request_id)
4120    }
4121
4122    /// Helper method for requesting book snapshot.
4123    ///
4124    /// # Errors
4125    ///
4126    /// Returns an error if input parameters are invalid.
4127    pub fn request_book_snapshot(
4128        &self,
4129        instrument_id: InstrumentId,
4130        depth: Option<NonZeroUsize>,
4131        client_id: Option<ClientId>,
4132        params: Option<Params>,
4133        handler: ShareableMessageHandler,
4134    ) -> anyhow::Result<UUID4> {
4135        self.check_registered();
4136
4137        let request_id = UUID4::new();
4138        let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4139            instrument_id,
4140            depth,
4141            client_id,
4142            request_id,
4143            ts_init: self.timestamp_ns(),
4144            params,
4145        });
4146
4147        get_message_bus()
4148            .borrow_mut()
4149            .register_response_handler(command.request_id(), handler)?;
4150
4151        self.send_data_cmd(DataCommand::Request(command));
4152
4153        Ok(request_id)
4154    }
4155
4156    /// Helper method for requesting quotes.
4157    ///
4158    /// # Errors
4159    ///
4160    /// Returns an error if input parameters are invalid.
4161    #[expect(clippy::too_many_arguments)]
4162    pub fn request_quotes(
4163        &self,
4164        instrument_id: InstrumentId,
4165        start: Option<DateTime<Utc>>,
4166        end: Option<DateTime<Utc>>,
4167        limit: Option<NonZeroUsize>,
4168        client_id: Option<ClientId>,
4169        params: Option<Params>,
4170        handler: ShareableMessageHandler,
4171    ) -> anyhow::Result<UUID4> {
4172        self.check_registered();
4173
4174        let now = self.clock_ref().utc_now();
4175        check_timestamps(now, start, end)?;
4176
4177        let request_id = UUID4::new();
4178        let command = RequestCommand::Quotes(RequestQuotes {
4179            instrument_id,
4180            start,
4181            end,
4182            limit,
4183            client_id,
4184            request_id,
4185            ts_init: now.into(),
4186            params,
4187        });
4188
4189        get_message_bus()
4190            .borrow_mut()
4191            .register_response_handler(command.request_id(), handler)?;
4192
4193        self.send_data_cmd(DataCommand::Request(command));
4194
4195        Ok(request_id)
4196    }
4197
4198    /// Helper method for requesting trades.
4199    ///
4200    /// # Errors
4201    ///
4202    /// Returns an error if input parameters are invalid.
4203    #[expect(clippy::too_many_arguments)]
4204    pub fn request_trades(
4205        &self,
4206        instrument_id: InstrumentId,
4207        start: Option<DateTime<Utc>>,
4208        end: Option<DateTime<Utc>>,
4209        limit: Option<NonZeroUsize>,
4210        client_id: Option<ClientId>,
4211        params: Option<Params>,
4212        handler: ShareableMessageHandler,
4213    ) -> anyhow::Result<UUID4> {
4214        self.check_registered();
4215
4216        let now = self.clock_ref().utc_now();
4217        check_timestamps(now, start, end)?;
4218
4219        let request_id = UUID4::new();
4220        let command = RequestCommand::Trades(RequestTrades {
4221            instrument_id,
4222            start,
4223            end,
4224            limit,
4225            client_id,
4226            request_id,
4227            ts_init: now.into(),
4228            params,
4229        });
4230
4231        get_message_bus()
4232            .borrow_mut()
4233            .register_response_handler(command.request_id(), handler)?;
4234
4235        self.send_data_cmd(DataCommand::Request(command));
4236
4237        Ok(request_id)
4238    }
4239
4240    /// Helper method for requesting funding rates.
4241    ///
4242    /// # Errors
4243    ///
4244    /// Returns an error if input parameters are invalid.
4245    #[expect(clippy::too_many_arguments)]
4246    pub fn request_funding_rates(
4247        &self,
4248        instrument_id: InstrumentId,
4249        start: Option<DateTime<Utc>>,
4250        end: Option<DateTime<Utc>>,
4251        limit: Option<NonZeroUsize>,
4252        client_id: Option<ClientId>,
4253        params: Option<Params>,
4254        handler: ShareableMessageHandler,
4255    ) -> anyhow::Result<UUID4> {
4256        self.check_registered();
4257
4258        let now = self.clock_ref().utc_now();
4259        check_timestamps(now, start, end)?;
4260
4261        let request_id = UUID4::new();
4262        let command = RequestCommand::FundingRates(RequestFundingRates {
4263            instrument_id,
4264            start,
4265            end,
4266            limit,
4267            client_id,
4268            request_id,
4269            ts_init: now.into(),
4270            params,
4271        });
4272
4273        get_message_bus()
4274            .borrow_mut()
4275            .register_response_handler(command.request_id(), handler)?;
4276
4277        self.send_data_cmd(DataCommand::Request(command));
4278
4279        Ok(request_id)
4280    }
4281
4282    /// Helper method for requesting bars.
4283    ///
4284    /// # Errors
4285    ///
4286    /// Returns an error if input parameters are invalid.
4287    #[expect(clippy::too_many_arguments)]
4288    pub fn request_bars(
4289        &self,
4290        bar_type: BarType,
4291        start: Option<DateTime<Utc>>,
4292        end: Option<DateTime<Utc>>,
4293        limit: Option<NonZeroUsize>,
4294        client_id: Option<ClientId>,
4295        params: Option<Params>,
4296        handler: ShareableMessageHandler,
4297    ) -> anyhow::Result<UUID4> {
4298        self.check_registered();
4299
4300        let now = self.clock_ref().utc_now();
4301        check_timestamps(now, start, end)?;
4302
4303        let request_id = UUID4::new();
4304        let command = RequestCommand::Bars(RequestBars {
4305            bar_type,
4306            start,
4307            end,
4308            limit,
4309            client_id,
4310            request_id,
4311            ts_init: now.into(),
4312            params,
4313        });
4314
4315        get_message_bus()
4316            .borrow_mut()
4317            .register_response_handler(command.request_id(), handler)?;
4318
4319        self.send_data_cmd(DataCommand::Request(command));
4320
4321        Ok(request_id)
4322    }
4323
4324    #[cfg(test)]
4325    pub fn quote_handler_count(&self) -> usize {
4326        self.quote_handlers.len()
4327    }
4328
4329    #[cfg(test)]
4330    pub fn trade_handler_count(&self) -> usize {
4331        self.trade_handlers.len()
4332    }
4333
4334    #[cfg(test)]
4335    pub fn bar_handler_count(&self) -> usize {
4336        self.bar_handlers.len()
4337    }
4338
4339    #[cfg(test)]
4340    pub fn deltas_handler_count(&self) -> usize {
4341        self.deltas_handlers.len()
4342    }
4343
4344    #[cfg(test)]
4345    pub fn has_quote_handler(&self, topic: &str) -> bool {
4346        self.quote_handlers
4347            .contains_key(&MStr::<Topic>::from(topic))
4348    }
4349
4350    #[cfg(test)]
4351    pub fn has_trade_handler(&self, topic: &str) -> bool {
4352        self.trade_handlers
4353            .contains_key(&MStr::<Topic>::from(topic))
4354    }
4355
4356    #[cfg(test)]
4357    pub fn has_bar_handler(&self, topic: &str) -> bool {
4358        self.bar_handlers.contains_key(&MStr::<Topic>::from(topic))
4359    }
4360
4361    #[cfg(test)]
4362    pub fn has_deltas_handler(&self, topic: &str) -> bool {
4363        self.deltas_handlers
4364            .contains_key(&MStr::<Topic>::from(topic))
4365    }
4366}
4367
4368fn check_timestamps(
4369    now: DateTime<Utc>,
4370    start: Option<DateTime<Utc>>,
4371    end: Option<DateTime<Utc>>,
4372) -> anyhow::Result<()> {
4373    if let Some(start) = start {
4374        check_predicate_true(start <= now, "start was > now")?;
4375    }
4376
4377    if let Some(end) = end {
4378        check_predicate_true(end <= now, "end was > now")?;
4379    }
4380
4381    if let (Some(start), Some(end)) = (start, end) {
4382        check_predicate_true(start < end, "start was >= end")?;
4383    }
4384
4385    Ok(())
4386}
4387
4388fn log_error(e: &anyhow::Error) {
4389    log::error!("{e}");
4390}
4391
4392fn log_not_running<T>(msg: &T)
4393where
4394    T: Debug,
4395{
4396    log::trace!("Received message when not running - skipping {msg:?}");
4397}
4398
4399fn log_received<T>(msg: &T)
4400where
4401    T: Debug,
4402{
4403    log::debug!("{RECV} {msg:?}");
4404}