1use std::{
17 any::Any,
18 cell::{Ref, RefCell, RefMut},
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroUsize,
22 ops::{Deref, DerefMut},
23 rc::Rc,
24 sync::Arc,
25};
26
27use ahash::{AHashMap, AHashSet};
28use chrono::{DateTime, Utc};
29use indexmap::IndexMap;
30use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
31#[cfg(feature = "defi")]
32use nautilus_model::defi::{
33 Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
34};
35use nautilus_model::{
36 data::{
37 Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
38 MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
39 close::InstrumentClose,
40 option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
41 },
42 enums::BookType,
43 events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
44 identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
45 instruments::{InstrumentAny, SyntheticInstrument},
46 orderbook::OrderBook,
47};
48use serde::{Deserialize, Serialize};
49use ustr::Ustr;
50
51#[cfg(feature = "indicators")]
52use super::indicators::Indicators;
53use super::{
54 Actor,
55 registry::{get_actor_unchecked, try_get_actor_unchecked},
56};
57#[cfg(feature = "defi")]
58use crate::defi;
59#[cfg(feature = "defi")]
60#[allow(unused_imports)]
61use crate::defi::data_actor as _; use crate::{
63 cache::Cache,
64 clock::Clock,
65 component::Component,
66 enums::{ComponentState, ComponentTrigger},
67 logging::{CMD, RECV, REQ, SEND},
68 messages::{
69 data::{
70 BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
71 InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
72 RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
73 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
74 SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
75 SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
76 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
77 SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
78 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
79 UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
80 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
81 UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
82 UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
83 UnsubscribeQuotes, UnsubscribeTrades,
84 },
85 system::ShutdownSystem,
86 },
87 msgbus::{
88 self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
89 switchboard::{
90 MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
91 get_custom_topic, get_funding_rate_topic, get_index_price_topic,
92 get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
93 get_instruments_pattern, get_mark_price_topic, get_option_chain_topic,
94 get_option_greeks_topic, get_order_cancels_topic, get_order_fills_topic,
95 get_quotes_topic, get_signal_pattern, get_trades_topic,
96 },
97 },
98 signal::Signal,
99 timer::{TimeEvent, TimeEventCallback},
100};
101
102#[derive(Debug, Clone, Deserialize, Serialize)]
104#[serde(default, deny_unknown_fields)]
105#[cfg_attr(
106 feature = "python",
107 pyo3::pyclass(
108 module = "nautilus_trader.core.nautilus_pyo3.common",
109 subclass,
110 from_py_object
111 )
112)]
113#[cfg_attr(
114 feature = "python",
115 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
116)]
117pub struct DataActorConfig {
118 pub actor_id: Option<ActorId>,
120 pub log_events: bool,
122 pub log_commands: bool,
124}
125
126impl Default for DataActorConfig {
127 fn default() -> Self {
128 Self {
129 actor_id: None,
130 log_events: true,
131 log_commands: true,
132 }
133 }
134}
135
136#[derive(Debug, Clone, Deserialize, Serialize)]
138#[serde(deny_unknown_fields)]
139#[cfg_attr(
140 feature = "python",
141 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
142)]
143#[cfg_attr(
144 feature = "python",
145 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
146)]
147pub struct ImportableActorConfig {
148 pub actor_path: String,
150 pub config_path: String,
152 pub config: HashMap<String, serde_json::Value>,
154}
155
156type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
157
158pub trait DataActor:
159 Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
160{
161 fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
167 Ok(IndexMap::new())
168 }
169
170 #[allow(unused_variables)]
176 fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
177 Ok(())
178 }
179
180 fn on_start(&mut self) -> anyhow::Result<()> {
186 log::warn!(
187 "The `on_start` handler was called when not overridden, \
188 it's expected that any actions required when starting the actor \
189 occur here, such as subscribing/requesting data"
190 );
191 Ok(())
192 }
193
194 fn on_stop(&mut self) -> anyhow::Result<()> {
200 log::warn!(
201 "The `on_stop` handler was called when not overridden, \
202 it's expected that any actions required when stopping the actor \
203 occur here, such as unsubscribing from data",
204 );
205 Ok(())
206 }
207
208 fn on_resume(&mut self) -> anyhow::Result<()> {
214 log::warn!(
215 "The `on_resume` handler was called when not overridden, \
216 it's expected that any actions required when resuming the actor \
217 following a stop occur here"
218 );
219 Ok(())
220 }
221
222 fn on_reset(&mut self) -> anyhow::Result<()> {
228 log::warn!(
229 "The `on_reset` handler was called when not overridden, \
230 it's expected that any actions required when resetting the actor \
231 occur here, such as resetting indicators and other state"
232 );
233 Ok(())
234 }
235
236 fn on_dispose(&mut self) -> anyhow::Result<()> {
242 Ok(())
243 }
244
245 fn on_degrade(&mut self) -> anyhow::Result<()> {
251 Ok(())
252 }
253
254 fn on_fault(&mut self) -> anyhow::Result<()> {
260 Ok(())
261 }
262
263 #[allow(unused_variables)]
269 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
270 Ok(())
271 }
272
273 #[allow(unused_variables)]
279 fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
280 Ok(())
281 }
282
283 #[allow(unused_variables)]
289 fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
290 Ok(())
291 }
292
293 #[allow(unused_variables)]
299 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
300 Ok(())
301 }
302
303 #[allow(unused_variables)]
309 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
310 Ok(())
311 }
312
313 #[allow(unused_variables)]
319 fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
320 Ok(())
321 }
322
323 #[allow(unused_variables)]
329 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
330 Ok(())
331 }
332
333 #[allow(unused_variables)]
339 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
340 Ok(())
341 }
342
343 #[allow(unused_variables)]
349 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
350 Ok(())
351 }
352
353 #[allow(unused_variables)]
359 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
360 Ok(())
361 }
362
363 #[allow(unused_variables)]
369 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
370 Ok(())
371 }
372
373 #[allow(unused_variables)]
379 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
380 Ok(())
381 }
382
383 #[allow(unused_variables)]
389 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
390 Ok(())
391 }
392
393 #[allow(unused_variables)]
399 fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
400 Ok(())
401 }
402
403 #[allow(unused_variables)]
409 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
410 Ok(())
411 }
412
413 #[allow(unused_variables)]
419 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
420 Ok(())
421 }
422
423 #[allow(unused_variables)]
429 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
430 Ok(())
431 }
432
433 #[allow(unused_variables)]
439 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
440 Ok(())
441 }
442
443 #[cfg(feature = "defi")]
444 #[allow(unused_variables)]
450 fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
451 Ok(())
452 }
453
454 #[cfg(feature = "defi")]
455 #[allow(unused_variables)]
461 fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
462 Ok(())
463 }
464
465 #[cfg(feature = "defi")]
466 #[allow(unused_variables)]
472 fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
473 Ok(())
474 }
475
476 #[cfg(feature = "defi")]
477 #[allow(unused_variables)]
483 fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
484 Ok(())
485 }
486
487 #[cfg(feature = "defi")]
488 #[allow(unused_variables)]
494 fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
495 Ok(())
496 }
497
498 #[cfg(feature = "defi")]
499 #[allow(unused_variables)]
505 fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
506 Ok(())
507 }
508
509 #[allow(unused_variables)]
515 fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
516 Ok(())
517 }
518
519 #[allow(unused_variables)]
525 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
526 Ok(())
527 }
528
529 #[allow(unused_variables)]
535 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
536 Ok(())
537 }
538
539 #[allow(unused_variables)]
545 fn on_historical_funding_rates(
546 &mut self,
547 funding_rates: &[FundingRateUpdate],
548 ) -> anyhow::Result<()> {
549 Ok(())
550 }
551
552 #[allow(unused_variables)]
558 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
559 Ok(())
560 }
561
562 #[allow(unused_variables)]
568 fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
569 Ok(())
570 }
571
572 #[allow(unused_variables)]
578 fn on_historical_index_prices(
579 &mut self,
580 index_prices: &[IndexPriceUpdate],
581 ) -> anyhow::Result<()> {
582 Ok(())
583 }
584
585 fn handle_time_event(&mut self, event: &TimeEvent) {
587 log_received(&event);
588
589 if self.not_running() {
590 log_not_running(&event);
591 return;
592 }
593
594 if let Err(e) = DataActor::on_time_event(self, event) {
595 log_error(&e);
596 }
597 }
598
599 fn handle_data(&mut self, data: &CustomData) {
601 log_received(&data);
602
603 if self.not_running() {
604 log_not_running(&data);
605 return;
606 }
607
608 if let Err(e) = self.on_data(data) {
609 log_error(&e);
610 }
611 }
612
613 fn handle_signal(&mut self, signal: &Signal) {
615 log_received(&signal);
616
617 if self.not_running() {
618 log_not_running(&signal);
619 return;
620 }
621
622 if let Err(e) = self.on_signal(signal) {
623 log_error(&e);
624 }
625 }
626
627 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
629 log_received(&instrument);
630
631 if self.not_running() {
632 log_not_running(&instrument);
633 return;
634 }
635
636 if let Err(e) = self.on_instrument(instrument) {
637 log_error(&e);
638 }
639 }
640
641 fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
643 log_received(&deltas);
644
645 if self.not_running() {
646 log_not_running(&deltas);
647 return;
648 }
649
650 if let Err(e) = self.on_book_deltas(deltas) {
651 log_error(&e);
652 }
653 }
654
655 fn handle_book(&mut self, book: &OrderBook) {
657 log_received(&book);
658
659 if self.not_running() {
660 log_not_running(&book);
661 return;
662 }
663
664 if let Err(e) = self.on_book(book) {
665 log_error(&e);
666 }
667 }
668
669 fn handle_quote(&mut self, quote: &QuoteTick) {
671 log_received("e);
672
673 if self.not_running() {
674 log_not_running("e);
675 return;
676 }
677
678 if let Err(e) = self.on_quote(quote) {
679 log_error(&e);
680 }
681 }
682
683 fn handle_trade(&mut self, trade: &TradeTick) {
685 log_received(&trade);
686
687 if self.not_running() {
688 log_not_running(&trade);
689 return;
690 }
691
692 if let Err(e) = self.on_trade(trade) {
693 log_error(&e);
694 }
695 }
696
697 fn handle_bar(&mut self, bar: &Bar) {
699 log_received(&bar);
700
701 if self.not_running() {
702 log_not_running(&bar);
703 return;
704 }
705
706 if let Err(e) = self.on_bar(bar) {
707 log_error(&e);
708 }
709 }
710
711 fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
713 log_received(&mark_price);
714
715 if self.not_running() {
716 log_not_running(&mark_price);
717 return;
718 }
719
720 if let Err(e) = self.on_mark_price(mark_price) {
721 log_error(&e);
722 }
723 }
724
725 fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
727 log_received(&index_price);
728
729 if self.not_running() {
730 log_not_running(&index_price);
731 return;
732 }
733
734 if let Err(e) = self.on_index_price(index_price) {
735 log_error(&e);
736 }
737 }
738
739 fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
741 log_received(&funding_rate);
742
743 if self.not_running() {
744 log_not_running(&funding_rate);
745 return;
746 }
747
748 if let Err(e) = self.on_funding_rate(funding_rate) {
749 log_error(&e);
750 }
751 }
752
753 fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
755 log_received(&greeks);
756
757 if self.not_running() {
758 log_not_running(&greeks);
759 return;
760 }
761
762 if let Err(e) = self.on_option_greeks(greeks) {
763 log_error(&e);
764 }
765 }
766
767 fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
769 log_received(&slice);
770
771 if self.not_running() {
772 log_not_running(&slice);
773 return;
774 }
775
776 if let Err(e) = self.on_option_chain(slice) {
777 log_error(&e);
778 }
779 }
780
781 fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
783 log_received(&status);
784
785 if self.not_running() {
786 log_not_running(&status);
787 return;
788 }
789
790 if let Err(e) = self.on_instrument_status(status) {
791 log_error(&e);
792 }
793 }
794
795 fn handle_instrument_close(&mut self, close: &InstrumentClose) {
797 log_received(&close);
798
799 if self.not_running() {
800 log_not_running(&close);
801 return;
802 }
803
804 if let Err(e) = self.on_instrument_close(close) {
805 log_error(&e);
806 }
807 }
808
809 fn handle_order_filled(&mut self, event: &OrderFilled) {
811 log_received(&event);
812
813 if event.strategy_id.inner() == self.actor_id().inner() {
817 return;
818 }
819
820 if self.not_running() {
821 log_not_running(&event);
822 return;
823 }
824
825 if let Err(e) = self.on_order_filled(event) {
826 log_error(&e);
827 }
828 }
829
830 fn handle_order_canceled(&mut self, event: &OrderCanceled) {
832 log_received(&event);
833
834 if event.strategy_id.inner() == self.actor_id().inner() {
838 return;
839 }
840
841 if self.not_running() {
842 log_not_running(&event);
843 return;
844 }
845
846 if let Err(e) = self.on_order_canceled(event) {
847 log_error(&e);
848 }
849 }
850
851 #[cfg(feature = "defi")]
852 fn handle_block(&mut self, block: &Block) {
854 log_received(&block);
855
856 if self.not_running() {
857 log_not_running(&block);
858 return;
859 }
860
861 if let Err(e) = self.on_block(block) {
862 log_error(&e);
863 }
864 }
865
866 #[cfg(feature = "defi")]
867 fn handle_pool(&mut self, pool: &Pool) {
869 log_received(&pool);
870
871 if self.not_running() {
872 log_not_running(&pool);
873 return;
874 }
875
876 if let Err(e) = self.on_pool(pool) {
877 log_error(&e);
878 }
879 }
880
881 #[cfg(feature = "defi")]
882 fn handle_pool_swap(&mut self, swap: &PoolSwap) {
884 log_received(&swap);
885
886 if self.not_running() {
887 log_not_running(&swap);
888 return;
889 }
890
891 if let Err(e) = self.on_pool_swap(swap) {
892 log_error(&e);
893 }
894 }
895
896 #[cfg(feature = "defi")]
897 fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
899 log_received(&update);
900
901 if self.not_running() {
902 log_not_running(&update);
903 return;
904 }
905
906 if let Err(e) = self.on_pool_liquidity_update(update) {
907 log_error(&e);
908 }
909 }
910
911 #[cfg(feature = "defi")]
912 fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
914 log_received(&collect);
915
916 if self.not_running() {
917 log_not_running(&collect);
918 return;
919 }
920
921 if let Err(e) = self.on_pool_fee_collect(collect) {
922 log_error(&e);
923 }
924 }
925
926 #[cfg(feature = "defi")]
927 fn handle_pool_flash(&mut self, flash: &PoolFlash) {
929 log_received(&flash);
930
931 if self.not_running() {
932 log_not_running(&flash);
933 return;
934 }
935
936 if let Err(e) = self.on_pool_flash(flash) {
937 log_error(&e);
938 }
939 }
940
941 fn handle_historical_data(&mut self, data: &dyn Any) {
943 log_received(&data);
944
945 if let Err(e) = self.on_historical_data(data) {
946 log_error(&e);
947 }
948 }
949
950 fn handle_data_response(&mut self, resp: &CustomDataResponse) {
952 log_received(&resp);
953
954 if let Err(e) = self.on_historical_data(resp.data.as_ref()) {
955 log_error(&e);
956 }
957 }
958
959 fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
961 log_received(&resp);
962
963 if let Err(e) = self.on_instrument(&resp.data) {
964 log_error(&e);
965 }
966 }
967
968 fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
970 log_received(&resp);
971
972 for inst in &resp.data {
973 if let Err(e) = self.on_instrument(inst) {
974 log_error(&e);
975 }
976 }
977 }
978
979 fn handle_book_response(&mut self, resp: &BookResponse) {
981 log_received(&resp);
982
983 if let Err(e) = self.on_book(&resp.data) {
984 log_error(&e);
985 }
986 }
987
988 fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
990 log_received(&resp);
991
992 if let Err(e) = self.on_historical_quotes(&resp.data) {
993 log_error(&e);
994 }
995 }
996
997 fn handle_trades_response(&mut self, resp: &TradesResponse) {
999 log_received(&resp);
1000
1001 if let Err(e) = self.on_historical_trades(&resp.data) {
1002 log_error(&e);
1003 }
1004 }
1005
1006 fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
1008 log_received(&resp);
1009
1010 if let Err(e) = self.on_historical_funding_rates(&resp.data) {
1011 log_error(&e);
1012 }
1013 }
1014
1015 fn handle_bars_response(&mut self, resp: &BarsResponse) {
1017 log_received(&resp);
1018
1019 if let Err(e) = self.on_historical_bars(&resp.data) {
1020 log_error(&e);
1021 }
1022 }
1023
1024 fn subscribe_data(
1026 &mut self,
1027 data_type: DataType,
1028 client_id: Option<ClientId>,
1029 params: Option<Params>,
1030 ) where
1031 Self: 'static + Debug + Sized,
1032 {
1033 let actor_id = self.actor_id().inner();
1034 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1035 get_actor_unchecked::<Self>(&actor_id).handle_data(data);
1036 });
1037
1038 DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
1039 }
1040
1041 fn subscribe_signal(&mut self, name: &str)
1045 where
1046 Self: 'static + Debug + Sized,
1047 {
1048 let actor_id = self.actor_id().inner();
1049 let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
1052 if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
1053 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1054 actor.handle_signal(signal);
1055 } else {
1056 log::error!("Actor {actor_id} not found for signal handling");
1057 }
1058 }
1059 });
1060
1061 DataActorCore::subscribe_signal(self, handler, name);
1062 }
1063
1064 fn subscribe_quotes(
1066 &mut self,
1067 instrument_id: InstrumentId,
1068 client_id: Option<ClientId>,
1069 params: Option<Params>,
1070 ) where
1071 Self: 'static + Debug + Sized,
1072 {
1073 let actor_id = self.actor_id().inner();
1074 let topic = get_quotes_topic(instrument_id);
1075
1076 let handler = TypedHandler::from(move |quote: &QuoteTick| {
1077 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1078 actor.handle_quote(quote);
1079 } else {
1080 log::error!("Actor {actor_id} not found for quote handling");
1081 }
1082 });
1083
1084 DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
1085 }
1086
1087 fn subscribe_instruments(
1089 &mut self,
1090 venue: Venue,
1091 client_id: Option<ClientId>,
1092 params: Option<Params>,
1093 ) where
1094 Self: 'static + Debug + Sized,
1095 {
1096 let actor_id = self.actor_id().inner();
1097 let pattern = get_instruments_pattern(venue);
1098
1099 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1100 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1101 actor.handle_instrument(instrument);
1102 } else {
1103 log::error!("Actor {actor_id} not found for instruments handling");
1104 }
1105 });
1106
1107 DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
1108 }
1109
1110 fn subscribe_instrument(
1112 &mut self,
1113 instrument_id: InstrumentId,
1114 client_id: Option<ClientId>,
1115 params: Option<Params>,
1116 ) where
1117 Self: 'static + Debug + Sized,
1118 {
1119 let actor_id = self.actor_id().inner();
1120 let topic = get_instrument_topic(instrument_id);
1121
1122 let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
1123 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1124 actor.handle_instrument(instrument);
1125 } else {
1126 log::error!("Actor {actor_id} not found for instrument handling");
1127 }
1128 });
1129
1130 DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
1131 }
1132
1133 fn subscribe_book_deltas(
1135 &mut self,
1136 instrument_id: InstrumentId,
1137 book_type: BookType,
1138 depth: Option<NonZeroUsize>,
1139 client_id: Option<ClientId>,
1140 managed: bool,
1141 params: Option<Params>,
1142 ) where
1143 Self: 'static + Debug + Sized,
1144 {
1145 let actor_id = self.actor_id().inner();
1146 let topic = get_book_deltas_topic(instrument_id);
1147
1148 let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
1149 get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
1150 });
1151
1152 DataActorCore::subscribe_book_deltas(
1153 self,
1154 topic,
1155 handler,
1156 instrument_id,
1157 book_type,
1158 depth,
1159 client_id,
1160 managed,
1161 params,
1162 );
1163 }
1164
1165 fn subscribe_book_at_interval(
1167 &mut self,
1168 instrument_id: InstrumentId,
1169 book_type: BookType,
1170 depth: Option<NonZeroUsize>,
1171 interval_ms: NonZeroUsize,
1172 client_id: Option<ClientId>,
1173 params: Option<Params>,
1174 ) where
1175 Self: 'static + Debug + Sized,
1176 {
1177 let actor_id = self.actor_id().inner();
1178 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
1179
1180 let handler = TypedHandler::from(move |book: &OrderBook| {
1181 get_actor_unchecked::<Self>(&actor_id).handle_book(book);
1182 });
1183
1184 DataActorCore::subscribe_book_at_interval(
1185 self,
1186 topic,
1187 handler,
1188 instrument_id,
1189 book_type,
1190 depth,
1191 interval_ms,
1192 client_id,
1193 params,
1194 );
1195 }
1196
1197 fn subscribe_trades(
1199 &mut self,
1200 instrument_id: InstrumentId,
1201 client_id: Option<ClientId>,
1202 params: Option<Params>,
1203 ) where
1204 Self: 'static + Debug + Sized,
1205 {
1206 let actor_id = self.actor_id().inner();
1207 let topic = get_trades_topic(instrument_id);
1208
1209 let handler = TypedHandler::from(move |trade: &TradeTick| {
1210 get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
1211 });
1212
1213 DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
1214 }
1215
1216 fn subscribe_bars(
1218 &mut self,
1219 bar_type: BarType,
1220 client_id: Option<ClientId>,
1221 params: Option<Params>,
1222 ) where
1223 Self: 'static + Debug + Sized,
1224 {
1225 let actor_id = self.actor_id().inner();
1226 let topic = get_bars_topic(bar_type);
1227
1228 let handler = TypedHandler::from(move |bar: &Bar| {
1229 get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
1230 });
1231
1232 DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
1233 }
1234
1235 fn subscribe_mark_prices(
1237 &mut self,
1238 instrument_id: InstrumentId,
1239 client_id: Option<ClientId>,
1240 params: Option<Params>,
1241 ) where
1242 Self: 'static + Debug + Sized,
1243 {
1244 let actor_id = self.actor_id().inner();
1245 let topic = get_mark_price_topic(instrument_id);
1246
1247 let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
1248 get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
1249 });
1250
1251 DataActorCore::subscribe_mark_prices(
1252 self,
1253 topic,
1254 handler,
1255 instrument_id,
1256 client_id,
1257 params,
1258 );
1259 }
1260
1261 fn subscribe_index_prices(
1263 &mut self,
1264 instrument_id: InstrumentId,
1265 client_id: Option<ClientId>,
1266 params: Option<Params>,
1267 ) where
1268 Self: 'static + Debug + Sized,
1269 {
1270 let actor_id = self.actor_id().inner();
1271 let topic = get_index_price_topic(instrument_id);
1272
1273 let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
1274 get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
1275 });
1276
1277 DataActorCore::subscribe_index_prices(
1278 self,
1279 topic,
1280 handler,
1281 instrument_id,
1282 client_id,
1283 params,
1284 );
1285 }
1286
1287 fn subscribe_funding_rates(
1289 &mut self,
1290 instrument_id: InstrumentId,
1291 client_id: Option<ClientId>,
1292 params: Option<Params>,
1293 ) where
1294 Self: 'static + Debug + Sized,
1295 {
1296 let actor_id = self.actor_id().inner();
1297 let topic = get_funding_rate_topic(instrument_id);
1298
1299 let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
1300 get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
1301 });
1302
1303 DataActorCore::subscribe_funding_rates(
1304 self,
1305 topic,
1306 handler,
1307 instrument_id,
1308 client_id,
1309 params,
1310 );
1311 }
1312
1313 fn subscribe_option_greeks(
1315 &mut self,
1316 instrument_id: InstrumentId,
1317 client_id: Option<ClientId>,
1318 params: Option<Params>,
1319 ) where
1320 Self: 'static + Debug + Sized,
1321 {
1322 let actor_id = self.actor_id().inner();
1323 let topic = get_option_greeks_topic(instrument_id);
1324
1325 let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
1326 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1327 actor.handle_option_greeks(option_greeks);
1328 } else {
1329 log::error!("Actor {actor_id} not found for option greeks handling");
1330 }
1331 });
1332
1333 DataActorCore::subscribe_option_greeks(
1334 self,
1335 topic,
1336 handler,
1337 instrument_id,
1338 client_id,
1339 params,
1340 );
1341 }
1342
1343 fn subscribe_instrument_status(
1345 &mut self,
1346 instrument_id: InstrumentId,
1347 client_id: Option<ClientId>,
1348 params: Option<Params>,
1349 ) where
1350 Self: 'static + Debug + Sized,
1351 {
1352 let actor_id = self.actor_id().inner();
1353 let topic = get_instrument_status_topic(instrument_id);
1354
1355 let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
1356 get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
1357 });
1358
1359 DataActorCore::subscribe_instrument_status(
1360 self,
1361 topic,
1362 handler,
1363 instrument_id,
1364 client_id,
1365 params,
1366 );
1367 }
1368
1369 fn subscribe_instrument_close(
1371 &mut self,
1372 instrument_id: InstrumentId,
1373 client_id: Option<ClientId>,
1374 params: Option<Params>,
1375 ) where
1376 Self: 'static + Debug + Sized,
1377 {
1378 let actor_id = self.actor_id().inner();
1379 let topic = get_instrument_close_topic(instrument_id);
1380
1381 let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
1382 get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
1383 });
1384
1385 DataActorCore::subscribe_instrument_close(
1386 self,
1387 topic,
1388 handler,
1389 instrument_id,
1390 client_id,
1391 params,
1392 );
1393 }
1394
1395 fn subscribe_option_chain(
1400 &mut self,
1401 series_id: OptionSeriesId,
1402 strike_range: StrikeRange,
1403 snapshot_interval_ms: Option<u64>,
1404 client_id: Option<ClientId>,
1405 params: Option<Params>,
1406 ) where
1407 Self: 'static + Debug + Sized,
1408 {
1409 let actor_id = self.actor_id().inner();
1410 let topic = get_option_chain_topic(series_id);
1411
1412 let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
1413 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
1414 actor.handle_option_chain(slice);
1415 } else {
1416 log::error!("Actor {actor_id} not found for option chain handling");
1417 }
1418 });
1419
1420 DataActorCore::subscribe_option_chain(
1421 self,
1422 topic,
1423 handler,
1424 series_id,
1425 strike_range,
1426 snapshot_interval_ms,
1427 client_id,
1428 params,
1429 );
1430 }
1431
1432 fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
1434 where
1435 Self: 'static + Debug + Sized,
1436 {
1437 let actor_id = self.actor_id().inner();
1438 let topic = get_order_fills_topic(instrument_id);
1439
1440 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1441 if let OrderEventAny::Filled(filled) = event {
1442 get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
1443 }
1444 });
1445
1446 DataActorCore::subscribe_order_fills(self, topic, handler);
1447 }
1448
1449 fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1451 where
1452 Self: 'static + Debug + Sized,
1453 {
1454 let actor_id = self.actor_id().inner();
1455 let topic = get_order_cancels_topic(instrument_id);
1456
1457 let handler = TypedHandler::from(move |event: &OrderEventAny| {
1458 if let OrderEventAny::Canceled(canceled) = event {
1459 get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
1460 }
1461 });
1462
1463 DataActorCore::subscribe_order_cancels(self, topic, handler);
1464 }
1465
1466 #[cfg(feature = "defi")]
1467 fn subscribe_blocks(
1469 &mut self,
1470 chain: Blockchain,
1471 client_id: Option<ClientId>,
1472 params: Option<Params>,
1473 ) where
1474 Self: 'static + Debug + Sized,
1475 {
1476 let actor_id = self.actor_id().inner();
1477 let topic = defi::switchboard::get_defi_blocks_topic(chain);
1478
1479 let handler = TypedHandler::from(move |block: &Block| {
1480 get_actor_unchecked::<Self>(&actor_id).handle_block(block);
1481 });
1482
1483 DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
1484 }
1485
1486 #[cfg(feature = "defi")]
1487 fn subscribe_pool(
1489 &mut self,
1490 instrument_id: InstrumentId,
1491 client_id: Option<ClientId>,
1492 params: Option<Params>,
1493 ) where
1494 Self: 'static + Debug + Sized,
1495 {
1496 let actor_id = self.actor_id().inner();
1497 let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
1498
1499 let handler = TypedHandler::from(move |pool: &Pool| {
1500 get_actor_unchecked::<Self>(&actor_id).handle_pool(pool);
1501 });
1502
1503 DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
1504 }
1505
/// Subscribes this actor to [`PoolSwap`] data for `instrument_id`.
///
/// Each received swap is dispatched to `handle_pool_swap` on the actor found in
/// the registry under this actor's ID; the subscription is delegated to
/// [`DataActorCore::subscribe_pool_swaps`].
#[cfg(feature = "defi")]
fn subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let swaps_topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_swap = TypedHandler::from(move |received: &PoolSwap| {
        get_actor_unchecked::<Self>(&id).handle_pool_swap(received);
    });

    let actor_core: &mut DataActorCore = self;
    actor_core.subscribe_pool_swaps(swaps_topic, on_swap, instrument_id, client_id, params);
}
1525
/// Subscribes this actor to [`PoolLiquidityUpdate`] data for `instrument_id`.
///
/// Each received update is dispatched to `handle_pool_liquidity_update` on the
/// actor found in the registry under this actor's ID; the subscription is
/// delegated to [`DataActorCore::subscribe_pool_liquidity_updates`].
#[cfg(feature = "defi")]
fn subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let liquidity_topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_update = TypedHandler::from(move |received: &PoolLiquidityUpdate| {
        get_actor_unchecked::<Self>(&id).handle_pool_liquidity_update(received);
    });

    let actor_core: &mut DataActorCore = self;
    actor_core.subscribe_pool_liquidity_updates(
        liquidity_topic,
        on_update,
        instrument_id,
        client_id,
        params,
    );
}
1552
/// Subscribes this actor to [`PoolFeeCollect`] data for `instrument_id`.
///
/// Each received event is dispatched to `handle_pool_fee_collect` on the actor
/// found in the registry under this actor's ID; the subscription is delegated
/// to [`DataActorCore::subscribe_pool_fee_collects`].
#[cfg(feature = "defi")]
fn subscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let collect_topic = defi::switchboard::get_defi_collect_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_collect = TypedHandler::from(move |received: &PoolFeeCollect| {
        get_actor_unchecked::<Self>(&id).handle_pool_fee_collect(received);
    });

    let actor_core: &mut DataActorCore = self;
    actor_core.subscribe_pool_fee_collects(
        collect_topic,
        on_collect,
        instrument_id,
        client_id,
        params,
    );
}
1579
/// Subscribes this actor to [`PoolFlash`] data for `instrument_id`.
///
/// Each received event is dispatched to `handle_pool_flash` on the actor found
/// in the registry under this actor's ID; the subscription is delegated to
/// [`DataActorCore::subscribe_pool_flash_events`].
#[cfg(feature = "defi")]
fn subscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let flash_topic = defi::switchboard::get_defi_flash_topic(instrument_id);
    let id = self.actor_id().inner();

    let on_flash = TypedHandler::from(move |received: &PoolFlash| {
        get_actor_unchecked::<Self>(&id).handle_pool_flash(received);
    });

    let actor_core: &mut DataActorCore = self;
    actor_core.subscribe_pool_flash_events(
        flash_topic,
        on_flash,
        instrument_id,
        client_id,
        params,
    );
}
1606
1607 fn unsubscribe_data(
1609 &mut self,
1610 data_type: DataType,
1611 client_id: Option<ClientId>,
1612 params: Option<Params>,
1613 ) where
1614 Self: 'static + Debug + Sized,
1615 {
1616 DataActorCore::unsubscribe_data(self, data_type, client_id, params);
1617 }
1618
1619 fn unsubscribe_signal(&mut self, name: &str)
1621 where
1622 Self: 'static + Debug + Sized,
1623 {
1624 DataActorCore::unsubscribe_signal(self, name);
1625 }
1626
1627 fn unsubscribe_instruments(
1629 &mut self,
1630 venue: Venue,
1631 client_id: Option<ClientId>,
1632 params: Option<Params>,
1633 ) where
1634 Self: 'static + Debug + Sized,
1635 {
1636 DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
1637 }
1638
1639 fn unsubscribe_instrument(
1641 &mut self,
1642 instrument_id: InstrumentId,
1643 client_id: Option<ClientId>,
1644 params: Option<Params>,
1645 ) where
1646 Self: 'static + Debug + Sized,
1647 {
1648 DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
1649 }
1650
1651 fn unsubscribe_book_deltas(
1653 &mut self,
1654 instrument_id: InstrumentId,
1655 client_id: Option<ClientId>,
1656 params: Option<Params>,
1657 ) where
1658 Self: 'static + Debug + Sized,
1659 {
1660 DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
1661 }
1662
1663 fn unsubscribe_book_at_interval(
1665 &mut self,
1666 instrument_id: InstrumentId,
1667 interval_ms: NonZeroUsize,
1668 client_id: Option<ClientId>,
1669 params: Option<Params>,
1670 ) where
1671 Self: 'static + Debug + Sized,
1672 {
1673 DataActorCore::unsubscribe_book_at_interval(
1674 self,
1675 instrument_id,
1676 interval_ms,
1677 client_id,
1678 params,
1679 );
1680 }
1681
1682 fn unsubscribe_quotes(
1684 &mut self,
1685 instrument_id: InstrumentId,
1686 client_id: Option<ClientId>,
1687 params: Option<Params>,
1688 ) where
1689 Self: 'static + Debug + Sized,
1690 {
1691 DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
1692 }
1693
1694 fn unsubscribe_trades(
1696 &mut self,
1697 instrument_id: InstrumentId,
1698 client_id: Option<ClientId>,
1699 params: Option<Params>,
1700 ) where
1701 Self: 'static + Debug + Sized,
1702 {
1703 DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
1704 }
1705
1706 fn unsubscribe_bars(
1708 &mut self,
1709 bar_type: BarType,
1710 client_id: Option<ClientId>,
1711 params: Option<Params>,
1712 ) where
1713 Self: 'static + Debug + Sized,
1714 {
1715 DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
1716 }
1717
1718 fn unsubscribe_mark_prices(
1720 &mut self,
1721 instrument_id: InstrumentId,
1722 client_id: Option<ClientId>,
1723 params: Option<Params>,
1724 ) where
1725 Self: 'static + Debug + Sized,
1726 {
1727 DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
1728 }
1729
1730 fn unsubscribe_index_prices(
1732 &mut self,
1733 instrument_id: InstrumentId,
1734 client_id: Option<ClientId>,
1735 params: Option<Params>,
1736 ) where
1737 Self: 'static + Debug + Sized,
1738 {
1739 DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
1740 }
1741
1742 fn unsubscribe_funding_rates(
1744 &mut self,
1745 instrument_id: InstrumentId,
1746 client_id: Option<ClientId>,
1747 params: Option<Params>,
1748 ) where
1749 Self: 'static + Debug + Sized,
1750 {
1751 DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
1752 }
1753
1754 fn unsubscribe_option_greeks(
1756 &mut self,
1757 instrument_id: InstrumentId,
1758 client_id: Option<ClientId>,
1759 params: Option<Params>,
1760 ) where
1761 Self: 'static + Debug + Sized,
1762 {
1763 DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
1764 }
1765
1766 fn unsubscribe_instrument_status(
1768 &mut self,
1769 instrument_id: InstrumentId,
1770 client_id: Option<ClientId>,
1771 params: Option<Params>,
1772 ) where
1773 Self: 'static + Debug + Sized,
1774 {
1775 DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
1776 }
1777
1778 fn unsubscribe_instrument_close(
1780 &mut self,
1781 instrument_id: InstrumentId,
1782 client_id: Option<ClientId>,
1783 params: Option<Params>,
1784 ) where
1785 Self: 'static + Debug + Sized,
1786 {
1787 DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
1788 }
1789
1790 fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
1792 where
1793 Self: 'static + Debug + Sized,
1794 {
1795 DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
1796 }
1797
1798 fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
1800 where
1801 Self: 'static + Debug + Sized,
1802 {
1803 DataActorCore::unsubscribe_order_fills(self, instrument_id);
1804 }
1805
1806 fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
1808 where
1809 Self: 'static + Debug + Sized,
1810 {
1811 DataActorCore::unsubscribe_order_cancels(self, instrument_id);
1812 }
1813
/// Unsubscribes from block data for the given `chain`.
///
/// Thin wrapper: forwards every argument to
/// [`DataActorCore::unsubscribe_blocks`].
#[cfg(feature = "defi")]
fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_core: &mut DataActorCore = self;
    actor_core.unsubscribe_blocks(chain, client_id, params);
}
1826
/// Unsubscribes from pool data for `instrument_id`.
///
/// Thin wrapper: forwards every argument to [`DataActorCore::unsubscribe_pool`].
#[cfg(feature = "defi")]
fn unsubscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_core: &mut DataActorCore = self;
    actor_core.unsubscribe_pool(instrument_id, client_id, params);
}
1839
/// Unsubscribes from pool swap data for `instrument_id`.
///
/// Thin wrapper: forwards every argument to
/// [`DataActorCore::unsubscribe_pool_swaps`].
#[cfg(feature = "defi")]
fn unsubscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_core: &mut DataActorCore = self;
    actor_core.unsubscribe_pool_swaps(instrument_id, client_id, params);
}
1852
/// Unsubscribes from pool liquidity updates for `instrument_id`.
///
/// Thin wrapper: forwards every argument to
/// [`DataActorCore::unsubscribe_pool_liquidity_updates`].
#[cfg(feature = "defi")]
fn unsubscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_core: &mut DataActorCore = self;
    actor_core.unsubscribe_pool_liquidity_updates(instrument_id, client_id, params);
}
1865
/// Unsubscribes from pool fee collect events for `instrument_id`.
///
/// Thin wrapper: forwards every argument to
/// [`DataActorCore::unsubscribe_pool_fee_collects`].
#[cfg(feature = "defi")]
fn unsubscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_core: &mut DataActorCore = self;
    actor_core.unsubscribe_pool_fee_collects(instrument_id, client_id, params);
}
1878
/// Unsubscribes from pool flash events for `instrument_id`.
///
/// Thin wrapper: forwards every argument to
/// [`DataActorCore::unsubscribe_pool_flash_events`].
#[cfg(feature = "defi")]
fn unsubscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_core: &mut DataActorCore = self;
    actor_core.unsubscribe_pool_flash_events(instrument_id, client_id, params);
}
1891
1892 fn request_data(
1898 &mut self,
1899 data_type: DataType,
1900 client_id: ClientId,
1901 start: Option<DateTime<Utc>>,
1902 end: Option<DateTime<Utc>>,
1903 limit: Option<NonZeroUsize>,
1904 params: Option<Params>,
1905 ) -> anyhow::Result<UUID4>
1906 where
1907 Self: 'static + Debug + Sized,
1908 {
1909 let actor_id = self.actor_id().inner();
1910 let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
1911 get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
1912 });
1913
1914 DataActorCore::request_data(
1915 self, data_type, client_id, start, end, limit, params, handler,
1916 )
1917 }
1918
1919 fn request_instrument(
1925 &mut self,
1926 instrument_id: InstrumentId,
1927 start: Option<DateTime<Utc>>,
1928 end: Option<DateTime<Utc>>,
1929 client_id: Option<ClientId>,
1930 params: Option<Params>,
1931 ) -> anyhow::Result<UUID4>
1932 where
1933 Self: 'static + Debug + Sized,
1934 {
1935 let actor_id = self.actor_id().inner();
1936 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
1937 get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
1938 });
1939
1940 DataActorCore::request_instrument(
1941 self,
1942 instrument_id,
1943 start,
1944 end,
1945 client_id,
1946 params,
1947 handler,
1948 )
1949 }
1950
1951 fn request_instruments(
1957 &mut self,
1958 venue: Option<Venue>,
1959 start: Option<DateTime<Utc>>,
1960 end: Option<DateTime<Utc>>,
1961 client_id: Option<ClientId>,
1962 params: Option<Params>,
1963 ) -> anyhow::Result<UUID4>
1964 where
1965 Self: 'static + Debug + Sized,
1966 {
1967 let actor_id = self.actor_id().inner();
1968 let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
1969 get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
1970 });
1971
1972 DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
1973 }
1974
1975 fn request_book_snapshot(
1981 &mut self,
1982 instrument_id: InstrumentId,
1983 depth: Option<NonZeroUsize>,
1984 client_id: Option<ClientId>,
1985 params: Option<Params>,
1986 ) -> anyhow::Result<UUID4>
1987 where
1988 Self: 'static + Debug + Sized,
1989 {
1990 let actor_id = self.actor_id().inner();
1991 let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
1992 get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
1993 });
1994
1995 DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
1996 }
1997
1998 fn request_quotes(
2004 &mut self,
2005 instrument_id: InstrumentId,
2006 start: Option<DateTime<Utc>>,
2007 end: Option<DateTime<Utc>>,
2008 limit: Option<NonZeroUsize>,
2009 client_id: Option<ClientId>,
2010 params: Option<Params>,
2011 ) -> anyhow::Result<UUID4>
2012 where
2013 Self: 'static + Debug + Sized,
2014 {
2015 let actor_id = self.actor_id().inner();
2016 let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
2017 get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
2018 });
2019
2020 DataActorCore::request_quotes(
2021 self,
2022 instrument_id,
2023 start,
2024 end,
2025 limit,
2026 client_id,
2027 params,
2028 handler,
2029 )
2030 }
2031
2032 fn request_trades(
2038 &mut self,
2039 instrument_id: InstrumentId,
2040 start: Option<DateTime<Utc>>,
2041 end: Option<DateTime<Utc>>,
2042 limit: Option<NonZeroUsize>,
2043 client_id: Option<ClientId>,
2044 params: Option<Params>,
2045 ) -> anyhow::Result<UUID4>
2046 where
2047 Self: 'static + Debug + Sized,
2048 {
2049 let actor_id = self.actor_id().inner();
2050 let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
2051 get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
2052 });
2053
2054 DataActorCore::request_trades(
2055 self,
2056 instrument_id,
2057 start,
2058 end,
2059 limit,
2060 client_id,
2061 params,
2062 handler,
2063 )
2064 }
2065
2066 fn request_funding_rates(
2072 &mut self,
2073 instrument_id: InstrumentId,
2074 start: Option<DateTime<Utc>>,
2075 end: Option<DateTime<Utc>>,
2076 limit: Option<NonZeroUsize>,
2077 client_id: Option<ClientId>,
2078 params: Option<Params>,
2079 ) -> anyhow::Result<UUID4>
2080 where
2081 Self: 'static + Debug + Sized,
2082 {
2083 let actor_id = self.actor_id().inner();
2084 let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
2085 get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
2086 });
2087
2088 DataActorCore::request_funding_rates(
2089 self,
2090 instrument_id,
2091 start,
2092 end,
2093 limit,
2094 client_id,
2095 params,
2096 handler,
2097 )
2098 }
2099
2100 fn request_bars(
2106 &mut self,
2107 bar_type: BarType,
2108 start: Option<DateTime<Utc>>,
2109 end: Option<DateTime<Utc>>,
2110 limit: Option<NonZeroUsize>,
2111 client_id: Option<ClientId>,
2112 params: Option<Params>,
2113 ) -> anyhow::Result<UUID4>
2114 where
2115 Self: 'static + Debug + Sized,
2116 {
2117 let actor_id = self.actor_id().inner();
2118 let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
2119 get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
2120 });
2121
2122 DataActorCore::request_bars(
2123 self, bar_type, start, end, limit, client_id, params, handler,
2124 )
2125 }
2126}
2127
2128impl<T> Actor for T
2130where
2131 T: DataActor + Debug + 'static,
2132{
2133 fn id(&self) -> Ustr {
2134 self.actor_id.inner()
2135 }
2136
2137 #[allow(unused_variables)]
2138 fn handle(&mut self, msg: &dyn Any) {
2139 }
2141
2142 fn as_any(&self) -> &dyn Any {
2143 self
2144 }
2145}
2146
2147impl<T> Component for T
2149where
2150 T: DataActor + Debug + 'static,
2151{
2152 fn component_id(&self) -> ComponentId {
2153 ComponentId::new(self.actor_id.inner().as_str())
2154 }
2155
2156 fn state(&self) -> ComponentState {
2157 self.state
2158 }
2159
2160 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
2161 self.state = self.state.transition(&trigger)?;
2162 log::info!("{}", self.state.variant_name());
2163 Ok(())
2164 }
2165
2166 fn register(
2167 &mut self,
2168 trader_id: TraderId,
2169 clock: Rc<RefCell<dyn Clock>>,
2170 cache: Rc<RefCell<Cache>>,
2171 ) -> anyhow::Result<()> {
2172 DataActorCore::register(self, trader_id, clock.clone(), cache)?;
2173
2174 let actor_id = self.actor_id().inner();
2176 let callback = TimeEventCallback::from(move |event: TimeEvent| {
2177 if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
2178 actor.handle_time_event(&event);
2179 } else {
2180 log::error!("Actor {actor_id} not found for time event handling");
2181 }
2182 });
2183
2184 clock.borrow_mut().register_default_handler(callback);
2185
2186 self.initialize()
2187 }
2188
2189 fn on_start(&mut self) -> anyhow::Result<()> {
2190 DataActor::on_start(self)
2191 }
2192
2193 fn on_stop(&mut self) -> anyhow::Result<()> {
2194 DataActor::on_stop(self)
2195 }
2196
2197 fn on_resume(&mut self) -> anyhow::Result<()> {
2198 DataActor::on_resume(self)
2199 }
2200
2201 fn on_degrade(&mut self) -> anyhow::Result<()> {
2202 DataActor::on_degrade(self)
2203 }
2204
2205 fn on_fault(&mut self) -> anyhow::Result<()> {
2206 DataActor::on_fault(self)
2207 }
2208
2209 fn on_reset(&mut self) -> anyhow::Result<()> {
2210 DataActor::on_reset(self)
2211 }
2212
2213 fn on_dispose(&mut self) -> anyhow::Result<()> {
2214 DataActor::on_dispose(self)
2215 }
2216}
2217
/// Core state of a data actor: identity, configuration, registration handles
/// and the handler maps kept for each kind of subscription.
#[derive(Clone)]
#[allow(
    dead_code,
    reason = "TODO: Under development (pending_requests, signal_classes)"
)]
pub struct DataActorCore {
    /// The actor's identifier.
    pub actor_id: ActorId,
    /// The actor's configuration.
    pub config: DataActorConfig,
    /// Trader ID, if any. NOTE(review): presumably set on registration — confirm.
    trader_id: Option<TraderId>,
    /// Shared clock handle, if any. NOTE(review): presumably set on registration — confirm.
    clock: Option<Rc<RefCell<dyn Clock>>>,
    /// Shared cache handle, if any. NOTE(review): presumably set on registration — confirm.
    cache: Option<Rc<RefCell<Cache>>>,
    /// Current component lifecycle state.
    state: ComponentState,
    /// Untyped handlers keyed by pattern (generic, instrument and instrument
    /// close subscriptions all share this map).
    topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
    /// Order book deltas handlers keyed by topic.
    deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
    /// Order book depth-10 handlers keyed by topic.
    depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
    /// Order book snapshot handlers keyed by topic.
    book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
    /// Quote handlers keyed by topic.
    quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
    /// Trade handlers keyed by topic.
    trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
    /// Bar handlers keyed by topic.
    bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
    /// Mark price handlers keyed by topic.
    mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
    /// Index price handlers keyed by topic.
    index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
    /// Funding rate handlers keyed by topic.
    funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
    /// Option greeks handlers keyed by topic.
    option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
    /// Option chain handlers keyed by topic.
    option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
    /// Order event handlers keyed by topic.
    order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
    /// Block handlers keyed by topic (`defi` feature only).
    #[cfg(feature = "defi")]
    block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
    /// Pool handlers keyed by topic (`defi` feature only).
    #[cfg(feature = "defi")]
    pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
    /// Pool swap handlers keyed by topic (`defi` feature only).
    #[cfg(feature = "defi")]
    pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
    /// Pool liquidity update handlers keyed by topic (`defi` feature only).
    #[cfg(feature = "defi")]
    pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
    /// Pool fee collect handlers keyed by topic (`defi` feature only).
    #[cfg(feature = "defi")]
    pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
    /// Pool flash handlers keyed by topic (`defi` feature only).
    #[cfg(feature = "defi")]
    pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
    /// Set of event names. NOTE(review): presumably the events to log at
    /// warning level — confirm against usage.
    warning_events: AHashSet<String>,
    /// Optional callbacks keyed by request ID (under development per the lint
    /// reason on this struct).
    pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
    /// String-to-string map for signals (under development per the lint reason
    /// on this struct). NOTE(review): key/value meaning not visible here — confirm.
    signal_classes: AHashMap<String, String>,
    /// Registered indicators (`indicators` feature only).
    #[cfg(feature = "indicators")]
    indicators: Indicators,
}
2264
2265impl Debug for DataActorCore {
2266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2267 f.debug_struct(stringify!(DataActorCore))
2268 .field("actor_id", &self.actor_id)
2269 .field("config", &self.config)
2270 .field("state", &self.state)
2271 .field("trader_id", &self.trader_id)
2272 .finish()
2273 }
2274}
2275
2276impl DataActorCore {
2277 pub(crate) fn add_subscription_any(
2281 &mut self,
2282 topic: MStr<Topic>,
2283 handler: ShareableMessageHandler,
2284 ) {
2285 let pattern: MStr<Pattern> = topic.into();
2286 if self.topic_handlers.contains_key(&pattern) {
2287 log::warn!(
2288 "Actor {} attempted duplicate subscription to topic '{topic}'",
2289 self.actor_id,
2290 );
2291 return;
2292 }
2293
2294 self.topic_handlers.insert(pattern, handler.clone());
2295 msgbus::subscribe_any(pattern, handler, None);
2296 }
2297
2298 pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
2302 let pattern: MStr<Pattern> = topic.into();
2303 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2304 msgbus::unsubscribe_any(pattern, &handler);
2305 } else {
2306 log::warn!(
2307 "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
2308 self.actor_id,
2309 );
2310 }
2311 }
2312
2313 pub(crate) fn add_quote_subscription(
2314 &mut self,
2315 topic: MStr<Topic>,
2316 handler: TypedHandler<QuoteTick>,
2317 ) {
2318 if self.quote_handlers.contains_key(&topic) {
2319 log::warn!(
2320 "Actor {} attempted duplicate quote subscription to '{topic}'",
2321 self.actor_id
2322 );
2323 return;
2324 }
2325 self.quote_handlers.insert(topic, handler.clone());
2326 msgbus::subscribe_quotes(topic.into(), handler, None);
2327 }
2328
2329 #[allow(dead_code)]
2330 pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
2331 if let Some(handler) = self.quote_handlers.remove(&topic) {
2332 msgbus::unsubscribe_quotes(topic.into(), &handler);
2333 }
2334 }
2335
2336 pub(crate) fn add_trade_subscription(
2337 &mut self,
2338 topic: MStr<Topic>,
2339 handler: TypedHandler<TradeTick>,
2340 ) {
2341 if self.trade_handlers.contains_key(&topic) {
2342 log::warn!(
2343 "Actor {} attempted duplicate trade subscription to '{topic}'",
2344 self.actor_id
2345 );
2346 return;
2347 }
2348 self.trade_handlers.insert(topic, handler.clone());
2349 msgbus::subscribe_trades(topic.into(), handler, None);
2350 }
2351
2352 #[allow(dead_code)]
2353 pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
2354 if let Some(handler) = self.trade_handlers.remove(&topic) {
2355 msgbus::unsubscribe_trades(topic.into(), &handler);
2356 }
2357 }
2358
2359 pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
2360 if self.bar_handlers.contains_key(&topic) {
2361 log::warn!(
2362 "Actor {} attempted duplicate bar subscription to '{topic}'",
2363 self.actor_id
2364 );
2365 return;
2366 }
2367 self.bar_handlers.insert(topic, handler.clone());
2368 msgbus::subscribe_bars(topic.into(), handler, None);
2369 }
2370
2371 #[allow(dead_code)]
2372 pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
2373 if let Some(handler) = self.bar_handlers.remove(&topic) {
2374 msgbus::unsubscribe_bars(topic.into(), &handler);
2375 }
2376 }
2377
2378 pub(crate) fn add_order_event_subscription(
2379 &mut self,
2380 topic: MStr<Topic>,
2381 handler: TypedHandler<OrderEventAny>,
2382 ) {
2383 if self.order_event_handlers.contains_key(&topic) {
2384 log::warn!(
2385 "Actor {} attempted duplicate order event subscription to '{topic}'",
2386 self.actor_id
2387 );
2388 return;
2389 }
2390 self.order_event_handlers.insert(topic, handler.clone());
2391 msgbus::subscribe_order_events(topic.into(), handler, None);
2392 }
2393
2394 #[allow(dead_code)]
2395 pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
2396 if let Some(handler) = self.order_event_handlers.remove(&topic) {
2397 msgbus::unsubscribe_order_events(topic.into(), &handler);
2398 }
2399 }
2400
2401 pub(crate) fn add_deltas_subscription(
2402 &mut self,
2403 topic: MStr<Topic>,
2404 handler: TypedHandler<OrderBookDeltas>,
2405 ) {
2406 if self.deltas_handlers.contains_key(&topic) {
2407 log::warn!(
2408 "Actor {} attempted duplicate deltas subscription to '{topic}'",
2409 self.actor_id
2410 );
2411 return;
2412 }
2413 self.deltas_handlers.insert(topic, handler.clone());
2414 msgbus::subscribe_book_deltas(topic.into(), handler, None);
2415 }
2416
2417 #[allow(dead_code)]
2418 pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
2419 if let Some(handler) = self.deltas_handlers.remove(&topic) {
2420 msgbus::unsubscribe_book_deltas(topic.into(), &handler);
2421 }
2422 }
2423
2424 #[allow(dead_code)]
2425 pub(crate) fn add_depth10_subscription(
2426 &mut self,
2427 topic: MStr<Topic>,
2428 handler: TypedHandler<OrderBookDepth10>,
2429 ) {
2430 if self.depth10_handlers.contains_key(&topic) {
2431 log::warn!(
2432 "Actor {} attempted duplicate depth10 subscription to '{topic}'",
2433 self.actor_id
2434 );
2435 return;
2436 }
2437 self.depth10_handlers.insert(topic, handler.clone());
2438 msgbus::subscribe_book_depth10(topic.into(), handler, None);
2439 }
2440
2441 #[allow(dead_code)]
2442 pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
2443 if let Some(handler) = self.depth10_handlers.remove(&topic) {
2444 msgbus::unsubscribe_book_depth10(topic.into(), &handler);
2445 }
2446 }
2447
2448 pub(crate) fn add_instrument_subscription(
2449 &mut self,
2450 pattern: MStr<Pattern>,
2451 handler: ShareableMessageHandler,
2452 ) {
2453 if self.topic_handlers.contains_key(&pattern) {
2454 log::warn!(
2455 "Actor {} attempted duplicate instrument subscription to '{pattern}'",
2456 self.actor_id
2457 );
2458 return;
2459 }
2460 self.topic_handlers.insert(pattern, handler.clone());
2461 msgbus::subscribe_any(pattern, handler, None);
2462 }
2463
2464 #[allow(dead_code)]
2465 pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
2466 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2467 msgbus::unsubscribe_any(pattern, &handler);
2468 }
2469 }
2470
2471 pub(crate) fn add_instrument_close_subscription(
2472 &mut self,
2473 topic: MStr<Topic>,
2474 handler: ShareableMessageHandler,
2475 ) {
2476 let pattern: MStr<Pattern> = topic.into();
2477 if self.topic_handlers.contains_key(&pattern) {
2478 log::warn!(
2479 "Actor {} attempted duplicate instrument close subscription to '{topic}'",
2480 self.actor_id
2481 );
2482 return;
2483 }
2484 self.topic_handlers.insert(pattern, handler.clone());
2485 msgbus::subscribe_any(pattern, handler, None);
2486 }
2487
2488 #[allow(dead_code)]
2489 pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
2490 let pattern: MStr<Pattern> = topic.into();
2491 if let Some(handler) = self.topic_handlers.remove(&pattern) {
2492 msgbus::unsubscribe_any(pattern, &handler);
2493 }
2494 }
2495
2496 pub(crate) fn add_book_snapshot_subscription(
2497 &mut self,
2498 topic: MStr<Topic>,
2499 handler: TypedHandler<OrderBook>,
2500 ) {
2501 if self.book_handlers.contains_key(&topic) {
2502 log::warn!(
2503 "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
2504 self.actor_id
2505 );
2506 return;
2507 }
2508 self.book_handlers.insert(topic, handler.clone());
2509 msgbus::subscribe_book_snapshots(topic.into(), handler, None);
2510 }
2511
2512 #[allow(dead_code)]
2513 pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
2514 if let Some(handler) = self.book_handlers.remove(&topic) {
2515 msgbus::unsubscribe_book_snapshots(topic.into(), &handler);
2516 }
2517 }
2518
2519 pub(crate) fn add_mark_price_subscription(
2520 &mut self,
2521 topic: MStr<Topic>,
2522 handler: TypedHandler<MarkPriceUpdate>,
2523 ) {
2524 if self.mark_price_handlers.contains_key(&topic) {
2525 log::warn!(
2526 "Actor {} attempted duplicate mark price subscription to '{topic}'",
2527 self.actor_id
2528 );
2529 return;
2530 }
2531 self.mark_price_handlers.insert(topic, handler.clone());
2532 msgbus::subscribe_mark_prices(topic.into(), handler, None);
2533 }
2534
2535 #[allow(dead_code)]
2536 pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
2537 if let Some(handler) = self.mark_price_handlers.remove(&topic) {
2538 msgbus::unsubscribe_mark_prices(topic.into(), &handler);
2539 }
2540 }
2541
2542 pub(crate) fn add_index_price_subscription(
2543 &mut self,
2544 topic: MStr<Topic>,
2545 handler: TypedHandler<IndexPriceUpdate>,
2546 ) {
2547 if self.index_price_handlers.contains_key(&topic) {
2548 log::warn!(
2549 "Actor {} attempted duplicate index price subscription to '{topic}'",
2550 self.actor_id
2551 );
2552 return;
2553 }
2554 self.index_price_handlers.insert(topic, handler.clone());
2555 msgbus::subscribe_index_prices(topic.into(), handler, None);
2556 }
2557
2558 #[allow(dead_code)]
2559 pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
2560 if let Some(handler) = self.index_price_handlers.remove(&topic) {
2561 msgbus::unsubscribe_index_prices(topic.into(), &handler);
2562 }
2563 }
2564
2565 pub(crate) fn add_funding_rate_subscription(
2566 &mut self,
2567 topic: MStr<Topic>,
2568 handler: TypedHandler<FundingRateUpdate>,
2569 ) {
2570 if self.funding_rate_handlers.contains_key(&topic) {
2571 log::warn!(
2572 "Actor {} attempted duplicate funding rate subscription to '{topic}'",
2573 self.actor_id
2574 );
2575 return;
2576 }
2577 self.funding_rate_handlers.insert(topic, handler.clone());
2578 msgbus::subscribe_funding_rates(topic.into(), handler, None);
2579 }
2580
2581 #[allow(dead_code)]
2582 pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
2583 if let Some(handler) = self.funding_rate_handlers.remove(&topic) {
2584 msgbus::unsubscribe_funding_rates(topic.into(), &handler);
2585 }
2586 }
2587
2588 pub(crate) fn add_option_greeks_subscription(
2589 &mut self,
2590 topic: MStr<Topic>,
2591 handler: TypedHandler<OptionGreeks>,
2592 ) {
2593 if self.option_greeks_handlers.contains_key(&topic) {
2594 log::warn!(
2595 "Actor {} attempted duplicate option greeks subscription to '{topic}'",
2596 self.actor_id
2597 );
2598 return;
2599 }
2600 self.option_greeks_handlers.insert(topic, handler.clone());
2601 msgbus::subscribe_option_greeks(topic.into(), handler, None);
2602 }
2603
2604 #[allow(dead_code)]
2605 pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
2606 if let Some(handler) = self.option_greeks_handlers.remove(&topic) {
2607 msgbus::unsubscribe_option_greeks(topic.into(), &handler);
2608 }
2609 }
2610
2611 pub(crate) fn add_option_chain_subscription(
2612 &mut self,
2613 topic: MStr<Topic>,
2614 handler: TypedHandler<OptionChainSlice>,
2615 ) {
2616 if self.option_chain_handlers.contains_key(&topic) {
2617 log::warn!(
2618 "Actor {} attempted duplicate option chain subscription to '{topic}'",
2619 self.actor_id
2620 );
2621 return;
2622 }
2623 self.option_chain_handlers.insert(topic, handler.clone());
2624 msgbus::subscribe_option_chain(topic.into(), handler, None);
2625 }
2626
2627 pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
2628 if let Some(handler) = self.option_chain_handlers.remove(&topic) {
2629 msgbus::unsubscribe_option_chain(topic.into(), &handler);
2630 }
2631 }
2632
/// Stores `handler` for `topic` in `block_handlers` and subscribes it via
/// `msgbus::subscribe_defi_blocks`.
///
/// A duplicate `topic` only produces a warning; the existing handler is kept.
#[cfg(feature = "defi")]
pub(crate) fn add_block_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Block>,
) {
    if self.block_handlers.contains_key(&topic) {
        log::warn!(
            "Actor {} attempted duplicate block subscription to '{topic}'",
            self.actor_id
        );
    } else {
        let stored = handler.clone();
        self.block_handlers.insert(topic, stored);
        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
    }
}
2649
/// Removes the block handler stored for `topic`, if any, and unsubscribes it
/// via `msgbus::unsubscribe_defi_blocks`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
    let Some(registered) = self.block_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_blocks(topic.into(), &registered);
}
2657
/// Stores `handler` for `topic` in `pool_handlers` and subscribes it via
/// `msgbus::subscribe_defi_pools`.
///
/// A duplicate `topic` only produces a warning; the existing handler is kept.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Pool>,
) {
    if self.pool_handlers.contains_key(&topic) {
        log::warn!(
            "Actor {} attempted duplicate pool subscription to '{topic}'",
            self.actor_id
        );
    } else {
        let stored = handler.clone();
        self.pool_handlers.insert(topic, stored);
        msgbus::subscribe_defi_pools(topic.into(), handler, None);
    }
}
2674
/// Removes the pool handler stored for `topic`, if any, and unsubscribes it via
/// `msgbus::unsubscribe_defi_pools`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
    let Some(registered) = self.pool_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_pools(topic.into(), &registered);
}
2682
/// Stores `handler` for `topic` in `pool_swap_handlers` and subscribes it via
/// `msgbus::subscribe_defi_swaps`.
///
/// A duplicate `topic` only produces a warning; the existing handler is kept.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_swap_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolSwap>,
) {
    if self.pool_swap_handlers.contains_key(&topic) {
        log::warn!(
            "Actor {} attempted duplicate pool swap subscription to '{topic}'",
            self.actor_id
        );
    } else {
        let stored = handler.clone();
        self.pool_swap_handlers.insert(topic, stored);
        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
    }
}
2699
/// Drops the pool swap handler stored for `topic` and unsubscribes it from the message bus.
///
/// Does nothing when no handler is stored under `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
    let Some(registered) = self.pool_swap_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_swaps(topic.into(), &registered);
}
2707
/// Stores `handler` under `topic` and registers it via `msgbus::subscribe_defi_liquidity`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_liquidity_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolLiquidityUpdate>,
) {
    if !self.pool_liquidity_handlers.contains_key(&topic) {
        // Keep a clone locally so the same handler can be unsubscribed later
        self.pool_liquidity_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
        return;
    }

    log::warn!(
        "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
        self.actor_id
    );
}
2724
/// Drops the pool liquidity handler stored for `topic` and unsubscribes it from the message bus.
///
/// Does nothing when no handler is stored under `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
    let Some(registered) = self.pool_liquidity_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_liquidity(topic.into(), &registered);
}
2732
/// Stores `handler` under `topic` and registers it via `msgbus::subscribe_defi_collects`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_collect_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFeeCollect>,
) {
    if !self.pool_collect_handlers.contains_key(&topic) {
        // Keep a clone locally so the same handler can be unsubscribed later
        self.pool_collect_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_collects(topic.into(), handler, None);
        return;
    }

    log::warn!(
        "Actor {} attempted duplicate pool collect subscription to '{topic}'",
        self.actor_id
    );
}
2749
/// Drops the pool fee-collect handler stored for `topic` and unsubscribes it from the message bus.
///
/// Does nothing when no handler is stored under `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
    let Some(registered) = self.pool_collect_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_collects(topic.into(), &registered);
}
2757
/// Stores `handler` under `topic` and registers it via `msgbus::subscribe_defi_flash`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing changes.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_flash_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFlash>,
) {
    if !self.pool_flash_handlers.contains_key(&topic) {
        // Keep a clone locally so the same handler can be unsubscribed later
        self.pool_flash_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_flash(topic.into(), handler, None);
        return;
    }

    log::warn!(
        "Actor {} attempted duplicate pool flash subscription to '{topic}'",
        self.actor_id
    );
}
2774
/// Drops the pool flash handler stored for `topic` and unsubscribes it from the message bus.
///
/// Does nothing when no handler is stored under `topic`.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
    let Some(registered) = self.pool_flash_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_flash(topic.into(), &registered);
}
2782
2783 pub fn new(config: DataActorConfig) -> Self {
2785 let actor_id = config
2786 .actor_id
2787 .unwrap_or_else(|| Self::default_actor_id(&config));
2788
2789 Self {
2790 actor_id,
2791 config,
2792 trader_id: None, clock: None, cache: None, state: ComponentState::default(),
2796 topic_handlers: AHashMap::new(),
2797 deltas_handlers: AHashMap::new(),
2798 depth10_handlers: AHashMap::new(),
2799 book_handlers: AHashMap::new(),
2800 quote_handlers: AHashMap::new(),
2801 trade_handlers: AHashMap::new(),
2802 bar_handlers: AHashMap::new(),
2803 mark_price_handlers: AHashMap::new(),
2804 index_price_handlers: AHashMap::new(),
2805 funding_rate_handlers: AHashMap::new(),
2806 option_greeks_handlers: AHashMap::new(),
2807 option_chain_handlers: AHashMap::new(),
2808 order_event_handlers: AHashMap::new(),
2809 #[cfg(feature = "defi")]
2810 block_handlers: AHashMap::new(),
2811 #[cfg(feature = "defi")]
2812 pool_handlers: AHashMap::new(),
2813 #[cfg(feature = "defi")]
2814 pool_swap_handlers: AHashMap::new(),
2815 #[cfg(feature = "defi")]
2816 pool_liquidity_handlers: AHashMap::new(),
2817 #[cfg(feature = "defi")]
2818 pool_collect_handlers: AHashMap::new(),
2819 #[cfg(feature = "defi")]
2820 pool_flash_handlers: AHashMap::new(),
2821 warning_events: AHashSet::new(),
2822 pending_requests: AHashMap::new(),
2823 signal_classes: AHashMap::new(),
2824 #[cfg(feature = "indicators")]
2825 indicators: Indicators::default(),
2826 }
2827 }
2828
2829 #[must_use]
2831 pub fn mem_address(&self) -> String {
2832 format!("{self:p}")
2833 }
2834
2835 pub fn state(&self) -> ComponentState {
2837 self.state
2838 }
2839
2840 pub fn trader_id(&self) -> Option<TraderId> {
2842 self.trader_id
2843 }
2844
2845 pub fn actor_id(&self) -> ActorId {
2847 self.actor_id
2848 }
2849
2850 fn default_actor_id(config: &DataActorConfig) -> ActorId {
2851 let memory_address = std::ptr::from_ref(config) as usize;
2852 ActorId::from(format!("{}-{memory_address}", stringify!(DataActor)))
2853 }
2854
2855 pub fn timestamp_ns(&self) -> UnixNanos {
2857 self.clock_ref().timestamp_ns()
2858 }
2859
2860 pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
2866 self.clock
2867 .as_ref()
2868 .unwrap_or_else(|| {
2869 panic!(
2870 "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
2871 self.actor_id, self.trader_id
2872 )
2873 })
2874 .borrow_mut()
2875 }
2876
2877 pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
2883 self.clock
2884 .as_ref()
2885 .expect("DataActor must be registered before accessing clock")
2886 .clone()
2887 }
2888
2889 fn clock_ref(&self) -> Ref<'_, dyn Clock> {
2890 self.clock
2891 .as_ref()
2892 .unwrap_or_else(|| {
2893 panic!(
2894 "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
2895 self.actor_id, self.trader_id
2896 )
2897 })
2898 .borrow()
2899 }
2900
2901 pub fn cache(&self) -> Ref<'_, Cache> {
2907 self.cache
2908 .as_ref()
2909 .expect("DataActor must be registered before accessing cache")
2910 .borrow()
2911 }
2912
2913 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
2919 self.cache
2920 .as_ref()
2921 .expect("DataActor must be registered before accessing cache")
2922 .clone()
2923 }
2924
2925 pub fn register(
2932 &mut self,
2933 trader_id: TraderId,
2934 clock: Rc<RefCell<dyn Clock>>,
2935 cache: Rc<RefCell<Cache>>,
2936 ) -> anyhow::Result<()> {
2937 if let Some(existing_trader_id) = self.trader_id {
2938 anyhow::bail!(
2939 "DataActor {} already registered with trader {existing_trader_id}",
2940 self.actor_id
2941 );
2942 }
2943
2944 {
2946 let _timestamp = clock.borrow().timestamp_ns();
2947 }
2948
2949 {
2951 let _cache_borrow = cache.borrow();
2952 }
2953
2954 self.trader_id = Some(trader_id);
2955 self.clock = Some(clock);
2956 self.cache = Some(cache);
2957
2958 if !self.is_properly_registered() {
2960 anyhow::bail!(
2961 "DataActor {} registration incomplete - validation failed",
2962 self.actor_id
2963 );
2964 }
2965
2966 log::debug!("Registered {} with trader {trader_id}", self.actor_id);
2967 Ok(())
2968 }
2969
2970 pub fn register_warning_event(&mut self, event_type: &str) {
2972 self.warning_events.insert(event_type.to_string());
2973 log::debug!("Registered event type '{event_type}' for warning logs");
2974 }
2975
2976 pub fn deregister_warning_event(&mut self, event_type: &str) {
2978 self.warning_events.remove(event_type);
2979 log::debug!("Deregistered event type '{event_type}' from warning logs");
2980 }
2981
2982 pub fn is_registered(&self) -> bool {
2983 self.trader_id.is_some()
2984 }
2985
2986 pub(crate) fn check_registered(&self) {
2987 assert!(
2988 self.is_registered(),
2989 "Actor has not been registered with a Trader"
2990 );
2991 }
2992
2993 fn is_properly_registered(&self) -> bool {
2995 self.trader_id.is_some() && self.clock.is_some() && self.cache.is_some()
2996 }
2997
2998 pub(crate) fn send_data_cmd(&self, command: DataCommand) {
2999 if self.config.log_commands {
3000 log::info!("{CMD}{SEND} {command:?}");
3001 }
3002
3003 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3004 msgbus::send_data_command(endpoint, command);
3005 }
3006
3007 #[allow(dead_code)]
3008 fn send_data_req(&self, request: &RequestCommand) {
3009 if self.config.log_commands {
3010 log::info!("{REQ}{SEND} {request:?}");
3011 }
3012
3013 let endpoint = MessagingSwitchboard::data_engine_queue_execute();
3016 msgbus::send_any(endpoint, request.as_any());
3017 }
3018
3019 pub fn shutdown_system(&self, reason: Option<String>) {
3025 self.check_registered();
3026
3027 let command = ShutdownSystem::new(
3029 self.trader_id().unwrap(),
3030 self.actor_id.inner(),
3031 reason,
3032 UUID4::new(),
3033 self.timestamp_ns(),
3034 );
3035
3036 let topic = MessagingSwitchboard::shutdown_system_topic();
3037 msgbus::publish_any(topic, command.as_any());
3038 }
3039
3040 pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
3050 self.check_registered();
3051
3052 let topic = get_custom_topic(data_type);
3053 msgbus::publish_any(topic, data);
3054 }
3055
3056 pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
3068 self.check_registered();
3069
3070 let now = self.timestamp_ns();
3071 let ts_event = if ts_event.as_u64() == 0 {
3072 now
3073 } else {
3074 ts_event
3075 };
3076 let signal = Signal::new(Ustr::from(name), value, ts_event, now);
3077
3078 let data_type = DataType::new(
3079 &format!(
3080 "Signal{}",
3081 nautilus_core::string::conversions::title_case(name)
3082 ),
3083 None,
3084 None,
3085 );
3086 let data = CustomData::new(Arc::new(signal), data_type);
3087 let topic = get_custom_topic(&data.data_type);
3088 msgbus::publish_any(topic, &data);
3089 }
3090
3091 pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3099 self.check_registered();
3100
3101 let cache = self.cache_rc();
3102 if cache.borrow().synthetic(&synthetic.id).is_some() {
3103 anyhow::bail!("`synthetic` {} already exists", synthetic.id);
3104 }
3105 cache.borrow_mut().add_synthetic(synthetic)
3106 }
3107
3108 pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
3116 self.check_registered();
3117
3118 let cache = self.cache_rc();
3119 if cache.borrow().synthetic(&synthetic.id).is_none() {
3120 anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
3121 }
3122 cache.borrow_mut().add_synthetic(synthetic)
3123 }
3124
3125 pub fn subscribe_data(
3131 &mut self,
3132 handler: ShareableMessageHandler,
3133 data_type: DataType,
3134 client_id: Option<ClientId>,
3135 params: Option<Params>,
3136 ) {
3137 assert!(
3138 self.is_properly_registered(),
3139 "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
3140 self.actor_id,
3141 self.trader_id,
3142 self.clock.is_some(),
3143 self.cache.is_some()
3144 );
3145
3146 let topic = get_custom_topic(&data_type);
3147 self.add_subscription_any(topic, handler);
3148
3149 if client_id.is_none() {
3151 return;
3152 }
3153
3154 let command = SubscribeCommand::Data(SubscribeCustomData {
3155 data_type,
3156 client_id,
3157 venue: None,
3158 command_id: UUID4::new(),
3159 ts_init: self.timestamp_ns(),
3160 correlation_id: None,
3161 params,
3162 });
3163
3164 self.send_data_cmd(DataCommand::Subscribe(command));
3165 }
3166
3167 pub fn subscribe_signal(&mut self, handler: ShareableMessageHandler, name: &str) {
3175 self.check_registered();
3176
3177 let pattern = get_signal_pattern(name);
3178 if self.topic_handlers.contains_key(&pattern) {
3179 log::warn!(
3180 "Actor {} attempted duplicate signal subscription to '{pattern}'",
3181 self.actor_id,
3182 );
3183 return;
3184 }
3185 self.topic_handlers.insert(pattern, handler.clone());
3186 msgbus::subscribe_any(pattern, handler, None);
3187 }
3188
3189 pub fn subscribe_quotes(
3191 &mut self,
3192 topic: MStr<Topic>,
3193 handler: TypedHandler<QuoteTick>,
3194 instrument_id: InstrumentId,
3195 client_id: Option<ClientId>,
3196 params: Option<Params>,
3197 ) {
3198 self.check_registered();
3199
3200 self.add_quote_subscription(topic, handler);
3201
3202 let command = SubscribeCommand::Quotes(SubscribeQuotes {
3203 instrument_id,
3204 client_id,
3205 venue: Some(instrument_id.venue),
3206 command_id: UUID4::new(),
3207 ts_init: self.timestamp_ns(),
3208 correlation_id: None,
3209 params,
3210 });
3211
3212 self.send_data_cmd(DataCommand::Subscribe(command));
3213 }
3214
3215 pub fn subscribe_instruments(
3217 &mut self,
3218 pattern: MStr<Pattern>,
3219 handler: ShareableMessageHandler,
3220 venue: Venue,
3221 client_id: Option<ClientId>,
3222 params: Option<Params>,
3223 ) {
3224 self.check_registered();
3225
3226 self.add_instrument_subscription(pattern, handler);
3227
3228 let command = SubscribeCommand::Instruments(SubscribeInstruments {
3229 client_id,
3230 venue,
3231 command_id: UUID4::new(),
3232 ts_init: self.timestamp_ns(),
3233 correlation_id: None,
3234 params,
3235 });
3236
3237 self.send_data_cmd(DataCommand::Subscribe(command));
3238 }
3239
3240 pub fn subscribe_instrument(
3242 &mut self,
3243 topic: MStr<Topic>,
3244 handler: ShareableMessageHandler,
3245 instrument_id: InstrumentId,
3246 client_id: Option<ClientId>,
3247 params: Option<Params>,
3248 ) {
3249 self.check_registered();
3250
3251 self.add_instrument_subscription(topic.into(), handler);
3252
3253 let command = SubscribeCommand::Instrument(SubscribeInstrument {
3254 instrument_id,
3255 client_id,
3256 venue: Some(instrument_id.venue),
3257 command_id: UUID4::new(),
3258 ts_init: self.timestamp_ns(),
3259 correlation_id: None,
3260 params,
3261 });
3262
3263 self.send_data_cmd(DataCommand::Subscribe(command));
3264 }
3265
3266 #[expect(clippy::too_many_arguments)]
3268 pub fn subscribe_book_deltas(
3269 &mut self,
3270 topic: MStr<Topic>,
3271 handler: TypedHandler<OrderBookDeltas>,
3272 instrument_id: InstrumentId,
3273 book_type: BookType,
3274 depth: Option<NonZeroUsize>,
3275 client_id: Option<ClientId>,
3276 managed: bool,
3277 params: Option<Params>,
3278 ) {
3279 self.check_registered();
3280
3281 self.add_deltas_subscription(topic, handler);
3282
3283 let command = SubscribeCommand::BookDeltas(SubscribeBookDeltas {
3284 instrument_id,
3285 book_type,
3286 client_id,
3287 venue: Some(instrument_id.venue),
3288 command_id: UUID4::new(),
3289 ts_init: self.timestamp_ns(),
3290 depth,
3291 managed,
3292 correlation_id: None,
3293 params,
3294 });
3295
3296 self.send_data_cmd(DataCommand::Subscribe(command));
3297 }
3298
3299 #[expect(clippy::too_many_arguments)]
3301 pub fn subscribe_book_at_interval(
3302 &mut self,
3303 topic: MStr<Topic>,
3304 handler: TypedHandler<OrderBook>,
3305 instrument_id: InstrumentId,
3306 book_type: BookType,
3307 depth: Option<NonZeroUsize>,
3308 interval_ms: NonZeroUsize,
3309 client_id: Option<ClientId>,
3310 params: Option<Params>,
3311 ) {
3312 self.check_registered();
3313
3314 self.add_book_snapshot_subscription(topic, handler);
3315
3316 let command = SubscribeCommand::BookSnapshots(SubscribeBookSnapshots {
3317 instrument_id,
3318 book_type,
3319 client_id,
3320 venue: Some(instrument_id.venue),
3321 command_id: UUID4::new(),
3322 ts_init: self.timestamp_ns(),
3323 depth,
3324 interval_ms,
3325 correlation_id: None,
3326 params,
3327 });
3328
3329 self.send_data_cmd(DataCommand::Subscribe(command));
3330 }
3331
3332 pub fn subscribe_trades(
3334 &mut self,
3335 topic: MStr<Topic>,
3336 handler: TypedHandler<TradeTick>,
3337 instrument_id: InstrumentId,
3338 client_id: Option<ClientId>,
3339 params: Option<Params>,
3340 ) {
3341 self.check_registered();
3342
3343 self.add_trade_subscription(topic, handler);
3344
3345 let command = SubscribeCommand::Trades(SubscribeTrades {
3346 instrument_id,
3347 client_id,
3348 venue: Some(instrument_id.venue),
3349 command_id: UUID4::new(),
3350 ts_init: self.timestamp_ns(),
3351 correlation_id: None,
3352 params,
3353 });
3354
3355 self.send_data_cmd(DataCommand::Subscribe(command));
3356 }
3357
3358 pub fn subscribe_bars(
3360 &mut self,
3361 topic: MStr<Topic>,
3362 handler: TypedHandler<Bar>,
3363 bar_type: BarType,
3364 client_id: Option<ClientId>,
3365 params: Option<Params>,
3366 ) {
3367 self.check_registered();
3368
3369 self.add_bar_subscription(topic, handler);
3370
3371 let command = SubscribeCommand::Bars(SubscribeBars {
3372 bar_type,
3373 client_id,
3374 venue: Some(bar_type.instrument_id().venue),
3375 command_id: UUID4::new(),
3376 ts_init: self.timestamp_ns(),
3377 correlation_id: None,
3378 params,
3379 });
3380
3381 self.send_data_cmd(DataCommand::Subscribe(command));
3382 }
3383
3384 pub fn subscribe_mark_prices(
3386 &mut self,
3387 topic: MStr<Topic>,
3388 handler: TypedHandler<MarkPriceUpdate>,
3389 instrument_id: InstrumentId,
3390 client_id: Option<ClientId>,
3391 params: Option<Params>,
3392 ) {
3393 self.check_registered();
3394
3395 self.add_mark_price_subscription(topic, handler);
3396
3397 let command = SubscribeCommand::MarkPrices(SubscribeMarkPrices {
3398 instrument_id,
3399 client_id,
3400 venue: Some(instrument_id.venue),
3401 command_id: UUID4::new(),
3402 ts_init: self.timestamp_ns(),
3403 correlation_id: None,
3404 params,
3405 });
3406
3407 self.send_data_cmd(DataCommand::Subscribe(command));
3408 }
3409
3410 pub fn subscribe_index_prices(
3412 &mut self,
3413 topic: MStr<Topic>,
3414 handler: TypedHandler<IndexPriceUpdate>,
3415 instrument_id: InstrumentId,
3416 client_id: Option<ClientId>,
3417 params: Option<Params>,
3418 ) {
3419 self.check_registered();
3420
3421 self.add_index_price_subscription(topic, handler);
3422
3423 let command = SubscribeCommand::IndexPrices(SubscribeIndexPrices {
3424 instrument_id,
3425 client_id,
3426 venue: Some(instrument_id.venue),
3427 command_id: UUID4::new(),
3428 ts_init: self.timestamp_ns(),
3429 correlation_id: None,
3430 params,
3431 });
3432
3433 self.send_data_cmd(DataCommand::Subscribe(command));
3434 }
3435
3436 pub fn subscribe_funding_rates(
3438 &mut self,
3439 topic: MStr<Topic>,
3440 handler: TypedHandler<FundingRateUpdate>,
3441 instrument_id: InstrumentId,
3442 client_id: Option<ClientId>,
3443 params: Option<Params>,
3444 ) {
3445 self.check_registered();
3446
3447 self.add_funding_rate_subscription(topic, handler);
3448
3449 let command = SubscribeCommand::FundingRates(SubscribeFundingRates {
3450 instrument_id,
3451 client_id,
3452 venue: Some(instrument_id.venue),
3453 command_id: UUID4::new(),
3454 ts_init: self.timestamp_ns(),
3455 correlation_id: None,
3456 params,
3457 });
3458
3459 self.send_data_cmd(DataCommand::Subscribe(command));
3460 }
3461
3462 pub fn subscribe_option_greeks(
3464 &mut self,
3465 topic: MStr<Topic>,
3466 handler: TypedHandler<OptionGreeks>,
3467 instrument_id: InstrumentId,
3468 client_id: Option<ClientId>,
3469 params: Option<Params>,
3470 ) {
3471 self.check_registered();
3472
3473 self.add_option_greeks_subscription(topic, handler);
3474
3475 let command = SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
3476 instrument_id,
3477 client_id,
3478 venue: Some(instrument_id.venue),
3479 command_id: UUID4::new(),
3480 ts_init: self.timestamp_ns(),
3481 correlation_id: None,
3482 params,
3483 });
3484
3485 self.send_data_cmd(DataCommand::Subscribe(command));
3486 }
3487
3488 pub fn subscribe_instrument_status(
3490 &mut self,
3491 topic: MStr<Topic>,
3492 handler: ShareableMessageHandler,
3493 instrument_id: InstrumentId,
3494 client_id: Option<ClientId>,
3495 params: Option<Params>,
3496 ) {
3497 self.check_registered();
3498
3499 self.add_subscription_any(topic, handler);
3500
3501 let command = SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
3502 instrument_id,
3503 client_id,
3504 venue: Some(instrument_id.venue),
3505 command_id: UUID4::new(),
3506 ts_init: self.timestamp_ns(),
3507 correlation_id: None,
3508 params,
3509 });
3510
3511 self.send_data_cmd(DataCommand::Subscribe(command));
3512 }
3513
3514 pub fn subscribe_instrument_close(
3516 &mut self,
3517 topic: MStr<Topic>,
3518 handler: ShareableMessageHandler,
3519 instrument_id: InstrumentId,
3520 client_id: Option<ClientId>,
3521 params: Option<Params>,
3522 ) {
3523 self.check_registered();
3524
3525 self.add_instrument_close_subscription(topic, handler);
3526
3527 let command = SubscribeCommand::InstrumentClose(SubscribeInstrumentClose {
3528 instrument_id,
3529 client_id,
3530 venue: Some(instrument_id.venue),
3531 command_id: UUID4::new(),
3532 ts_init: self.timestamp_ns(),
3533 correlation_id: None,
3534 params,
3535 });
3536
3537 self.send_data_cmd(DataCommand::Subscribe(command));
3538 }
3539
3540 #[allow(clippy::too_many_arguments)]
3542 pub fn subscribe_option_chain(
3543 &mut self,
3544 topic: MStr<Topic>,
3545 handler: TypedHandler<OptionChainSlice>,
3546 series_id: OptionSeriesId,
3547 strike_range: StrikeRange,
3548 snapshot_interval_ms: Option<u64>,
3549 client_id: Option<ClientId>,
3550 params: Option<Params>,
3551 ) {
3552 self.check_registered();
3553
3554 self.add_option_chain_subscription(topic, handler);
3555
3556 let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
3557 series_id,
3558 strike_range,
3559 snapshot_interval_ms,
3560 UUID4::new(),
3561 self.timestamp_ns(),
3562 client_id,
3563 Some(series_id.venue),
3564 params,
3565 ));
3566
3567 self.send_data_cmd(DataCommand::Subscribe(command));
3568 }
3569
3570 pub fn subscribe_order_fills(
3572 &mut self,
3573 topic: MStr<Topic>,
3574 handler: TypedHandler<OrderEventAny>,
3575 ) {
3576 self.check_registered();
3577 self.add_order_event_subscription(topic, handler);
3578 }
3579
3580 pub fn subscribe_order_cancels(
3582 &mut self,
3583 topic: MStr<Topic>,
3584 handler: TypedHandler<OrderEventAny>,
3585 ) {
3586 self.check_registered();
3587 self.add_order_event_subscription(topic, handler);
3588 }
3589
3590 pub fn unsubscribe_data(
3592 &mut self,
3593 data_type: DataType,
3594 client_id: Option<ClientId>,
3595 params: Option<Params>,
3596 ) {
3597 self.check_registered();
3598
3599 let topic = get_custom_topic(&data_type);
3600 self.remove_subscription_any(topic);
3601
3602 if client_id.is_none() {
3603 return;
3604 }
3605
3606 let command = UnsubscribeCommand::Data(UnsubscribeCustomData {
3607 data_type,
3608 client_id,
3609 venue: None,
3610 command_id: UUID4::new(),
3611 ts_init: self.timestamp_ns(),
3612 correlation_id: None,
3613 params,
3614 });
3615
3616 self.send_data_cmd(DataCommand::Unsubscribe(command));
3617 }
3618
3619 pub fn unsubscribe_signal(&mut self, name: &str) {
3625 self.check_registered();
3626
3627 let pattern = get_signal_pattern(name);
3628 if let Some(handler) = self.topic_handlers.remove(&pattern) {
3629 msgbus::unsubscribe_any(pattern, &handler);
3630 } else {
3631 log::warn!(
3632 "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
3633 self.actor_id,
3634 );
3635 }
3636 }
3637
3638 pub fn unsubscribe_instruments(
3640 &mut self,
3641 venue: Venue,
3642 client_id: Option<ClientId>,
3643 params: Option<Params>,
3644 ) {
3645 self.check_registered();
3646
3647 let pattern = get_instruments_pattern(venue);
3648 self.remove_instrument_subscription(pattern);
3649
3650 let command = UnsubscribeCommand::Instruments(UnsubscribeInstruments {
3651 client_id,
3652 venue,
3653 command_id: UUID4::new(),
3654 ts_init: self.timestamp_ns(),
3655 correlation_id: None,
3656 params,
3657 });
3658
3659 self.send_data_cmd(DataCommand::Unsubscribe(command));
3660 }
3661
3662 pub fn unsubscribe_instrument(
3664 &mut self,
3665 instrument_id: InstrumentId,
3666 client_id: Option<ClientId>,
3667 params: Option<Params>,
3668 ) {
3669 self.check_registered();
3670
3671 let topic = get_instrument_topic(instrument_id);
3672 self.remove_instrument_subscription(topic.into());
3673
3674 let command = UnsubscribeCommand::Instrument(UnsubscribeInstrument {
3675 instrument_id,
3676 client_id,
3677 venue: Some(instrument_id.venue),
3678 command_id: UUID4::new(),
3679 ts_init: self.timestamp_ns(),
3680 correlation_id: None,
3681 params,
3682 });
3683
3684 self.send_data_cmd(DataCommand::Unsubscribe(command));
3685 }
3686
3687 pub fn unsubscribe_book_deltas(
3689 &mut self,
3690 instrument_id: InstrumentId,
3691 client_id: Option<ClientId>,
3692 params: Option<Params>,
3693 ) {
3694 self.check_registered();
3695
3696 let topic = get_book_deltas_topic(instrument_id);
3697 self.remove_deltas_subscription(topic);
3698
3699 let command = UnsubscribeCommand::BookDeltas(UnsubscribeBookDeltas {
3700 instrument_id,
3701 client_id,
3702 venue: Some(instrument_id.venue),
3703 command_id: UUID4::new(),
3704 ts_init: self.timestamp_ns(),
3705 correlation_id: None,
3706 params,
3707 });
3708
3709 self.send_data_cmd(DataCommand::Unsubscribe(command));
3710 }
3711
3712 pub fn unsubscribe_book_at_interval(
3714 &mut self,
3715 instrument_id: InstrumentId,
3716 interval_ms: NonZeroUsize,
3717 client_id: Option<ClientId>,
3718 params: Option<Params>,
3719 ) {
3720 self.check_registered();
3721
3722 let topic = get_book_snapshots_topic(instrument_id, interval_ms);
3723 self.remove_book_snapshot_subscription(topic);
3724
3725 let command = UnsubscribeCommand::BookSnapshots(UnsubscribeBookSnapshots {
3726 instrument_id,
3727 interval_ms,
3728 client_id,
3729 venue: Some(instrument_id.venue),
3730 command_id: UUID4::new(),
3731 ts_init: self.timestamp_ns(),
3732 correlation_id: None,
3733 params,
3734 });
3735
3736 self.send_data_cmd(DataCommand::Unsubscribe(command));
3737 }
3738
3739 pub fn unsubscribe_quotes(
3741 &mut self,
3742 instrument_id: InstrumentId,
3743 client_id: Option<ClientId>,
3744 params: Option<Params>,
3745 ) {
3746 self.check_registered();
3747
3748 let topic = get_quotes_topic(instrument_id);
3749 self.remove_quote_subscription(topic);
3750
3751 let command = UnsubscribeCommand::Quotes(UnsubscribeQuotes {
3752 instrument_id,
3753 client_id,
3754 venue: Some(instrument_id.venue),
3755 command_id: UUID4::new(),
3756 ts_init: self.timestamp_ns(),
3757 correlation_id: None,
3758 params,
3759 });
3760
3761 self.send_data_cmd(DataCommand::Unsubscribe(command));
3762 }
3763
3764 pub fn unsubscribe_trades(
3766 &mut self,
3767 instrument_id: InstrumentId,
3768 client_id: Option<ClientId>,
3769 params: Option<Params>,
3770 ) {
3771 self.check_registered();
3772
3773 let topic = get_trades_topic(instrument_id);
3774 self.remove_trade_subscription(topic);
3775
3776 let command = UnsubscribeCommand::Trades(UnsubscribeTrades {
3777 instrument_id,
3778 client_id,
3779 venue: Some(instrument_id.venue),
3780 command_id: UUID4::new(),
3781 ts_init: self.timestamp_ns(),
3782 correlation_id: None,
3783 params,
3784 });
3785
3786 self.send_data_cmd(DataCommand::Unsubscribe(command));
3787 }
3788
3789 pub fn unsubscribe_bars(
3791 &mut self,
3792 bar_type: BarType,
3793 client_id: Option<ClientId>,
3794 params: Option<Params>,
3795 ) {
3796 self.check_registered();
3797
3798 let topic = get_bars_topic(bar_type);
3799 self.remove_bar_subscription(topic);
3800
3801 let command = UnsubscribeCommand::Bars(UnsubscribeBars {
3802 bar_type,
3803 client_id,
3804 venue: Some(bar_type.instrument_id().venue),
3805 command_id: UUID4::new(),
3806 ts_init: self.timestamp_ns(),
3807 correlation_id: None,
3808 params,
3809 });
3810
3811 self.send_data_cmd(DataCommand::Unsubscribe(command));
3812 }
3813
3814 pub fn unsubscribe_mark_prices(
3816 &mut self,
3817 instrument_id: InstrumentId,
3818 client_id: Option<ClientId>,
3819 params: Option<Params>,
3820 ) {
3821 self.check_registered();
3822
3823 let topic = get_mark_price_topic(instrument_id);
3824 self.remove_mark_price_subscription(topic);
3825
3826 let command = UnsubscribeCommand::MarkPrices(UnsubscribeMarkPrices {
3827 instrument_id,
3828 client_id,
3829 venue: Some(instrument_id.venue),
3830 command_id: UUID4::new(),
3831 ts_init: self.timestamp_ns(),
3832 correlation_id: None,
3833 params,
3834 });
3835
3836 self.send_data_cmd(DataCommand::Unsubscribe(command));
3837 }
3838
3839 pub fn unsubscribe_index_prices(
3841 &mut self,
3842 instrument_id: InstrumentId,
3843 client_id: Option<ClientId>,
3844 params: Option<Params>,
3845 ) {
3846 self.check_registered();
3847
3848 let topic = get_index_price_topic(instrument_id);
3849 self.remove_index_price_subscription(topic);
3850
3851 let command = UnsubscribeCommand::IndexPrices(UnsubscribeIndexPrices {
3852 instrument_id,
3853 client_id,
3854 venue: Some(instrument_id.venue),
3855 command_id: UUID4::new(),
3856 ts_init: self.timestamp_ns(),
3857 correlation_id: None,
3858 params,
3859 });
3860
3861 self.send_data_cmd(DataCommand::Unsubscribe(command));
3862 }
3863
3864 pub fn unsubscribe_funding_rates(
3866 &mut self,
3867 instrument_id: InstrumentId,
3868 client_id: Option<ClientId>,
3869 params: Option<Params>,
3870 ) {
3871 self.check_registered();
3872
3873 let topic = get_funding_rate_topic(instrument_id);
3874 self.remove_funding_rate_subscription(topic);
3875
3876 let command = UnsubscribeCommand::FundingRates(UnsubscribeFundingRates {
3877 instrument_id,
3878 client_id,
3879 venue: Some(instrument_id.venue),
3880 command_id: UUID4::new(),
3881 ts_init: self.timestamp_ns(),
3882 correlation_id: None,
3883 params,
3884 });
3885
3886 self.send_data_cmd(DataCommand::Unsubscribe(command));
3887 }
3888
3889 pub fn unsubscribe_option_greeks(
3891 &mut self,
3892 instrument_id: InstrumentId,
3893 client_id: Option<ClientId>,
3894 params: Option<Params>,
3895 ) {
3896 self.check_registered();
3897
3898 let topic = get_option_greeks_topic(instrument_id);
3899 self.remove_option_greeks_subscription(topic);
3900
3901 let command = UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
3902 instrument_id,
3903 client_id,
3904 venue: Some(instrument_id.venue),
3905 command_id: UUID4::new(),
3906 ts_init: self.timestamp_ns(),
3907 correlation_id: None,
3908 params,
3909 });
3910
3911 self.send_data_cmd(DataCommand::Unsubscribe(command));
3912 }
3913
3914 pub fn unsubscribe_instrument_status(
3916 &mut self,
3917 instrument_id: InstrumentId,
3918 client_id: Option<ClientId>,
3919 params: Option<Params>,
3920 ) {
3921 self.check_registered();
3922
3923 let topic = get_instrument_status_topic(instrument_id);
3924 self.remove_subscription_any(topic);
3925
3926 let command = UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
3927 instrument_id,
3928 client_id,
3929 venue: Some(instrument_id.venue),
3930 command_id: UUID4::new(),
3931 ts_init: self.timestamp_ns(),
3932 correlation_id: None,
3933 params,
3934 });
3935
3936 self.send_data_cmd(DataCommand::Unsubscribe(command));
3937 }
3938
3939 pub fn unsubscribe_instrument_close(
3941 &mut self,
3942 instrument_id: InstrumentId,
3943 client_id: Option<ClientId>,
3944 params: Option<Params>,
3945 ) {
3946 self.check_registered();
3947
3948 let topic = get_instrument_close_topic(instrument_id);
3949 self.remove_instrument_close_subscription(topic);
3950
3951 let command = UnsubscribeCommand::InstrumentClose(UnsubscribeInstrumentClose {
3952 instrument_id,
3953 client_id,
3954 venue: Some(instrument_id.venue),
3955 command_id: UUID4::new(),
3956 ts_init: self.timestamp_ns(),
3957 correlation_id: None,
3958 params,
3959 });
3960
3961 self.send_data_cmd(DataCommand::Unsubscribe(command));
3962 }
3963
3964 pub fn unsubscribe_option_chain(
3966 &mut self,
3967 series_id: OptionSeriesId,
3968 client_id: Option<ClientId>,
3969 ) {
3970 self.check_registered();
3971
3972 let topic = get_option_chain_topic(series_id);
3973 self.remove_option_chain_subscription(topic);
3974
3975 let command = UnsubscribeCommand::OptionChain(UnsubscribeOptionChain::new(
3976 series_id,
3977 UUID4::new(),
3978 self.timestamp_ns(),
3979 client_id,
3980 Some(series_id.venue),
3981 ));
3982
3983 self.send_data_cmd(DataCommand::Unsubscribe(command));
3984 }
3985
3986 pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
3988 self.check_registered();
3989
3990 let topic = get_order_fills_topic(instrument_id);
3991 self.remove_order_event_subscription(topic);
3992 }
3993
3994 pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
3996 self.check_registered();
3997
3998 let topic = get_order_cancels_topic(instrument_id);
3999 self.remove_order_event_subscription(topic);
4000 }
4001
4002 #[expect(clippy::too_many_arguments)]
4008 pub fn request_data(
4009 &self,
4010 data_type: DataType,
4011 client_id: ClientId,
4012 start: Option<DateTime<Utc>>,
4013 end: Option<DateTime<Utc>>,
4014 limit: Option<NonZeroUsize>,
4015 params: Option<Params>,
4016 handler: ShareableMessageHandler,
4017 ) -> anyhow::Result<UUID4> {
4018 self.check_registered();
4019
4020 let now = self.clock_ref().utc_now();
4021 check_timestamps(now, start, end)?;
4022
4023 let request_id = UUID4::new();
4024 let command = RequestCommand::Data(RequestCustomData {
4025 client_id,
4026 data_type,
4027 start,
4028 end,
4029 limit,
4030 request_id,
4031 ts_init: self.timestamp_ns(),
4032 params,
4033 });
4034
4035 get_message_bus()
4036 .borrow_mut()
4037 .register_response_handler(command.request_id(), handler)?;
4038
4039 self.send_data_cmd(DataCommand::Request(command));
4040
4041 Ok(request_id)
4042 }
4043
4044 pub fn request_instrument(
4050 &self,
4051 instrument_id: InstrumentId,
4052 start: Option<DateTime<Utc>>,
4053 end: Option<DateTime<Utc>>,
4054 client_id: Option<ClientId>,
4055 params: Option<Params>,
4056 handler: ShareableMessageHandler,
4057 ) -> anyhow::Result<UUID4> {
4058 self.check_registered();
4059
4060 let now = self.clock_ref().utc_now();
4061 check_timestamps(now, start, end)?;
4062
4063 let request_id = UUID4::new();
4064 let command = RequestCommand::Instrument(RequestInstrument {
4065 instrument_id,
4066 start,
4067 end,
4068 client_id,
4069 request_id,
4070 ts_init: now.into(),
4071 params,
4072 });
4073
4074 get_message_bus()
4075 .borrow_mut()
4076 .register_response_handler(command.request_id(), handler)?;
4077
4078 self.send_data_cmd(DataCommand::Request(command));
4079
4080 Ok(request_id)
4081 }
4082
4083 pub fn request_instruments(
4089 &self,
4090 venue: Option<Venue>,
4091 start: Option<DateTime<Utc>>,
4092 end: Option<DateTime<Utc>>,
4093 client_id: Option<ClientId>,
4094 params: Option<Params>,
4095 handler: ShareableMessageHandler,
4096 ) -> anyhow::Result<UUID4> {
4097 self.check_registered();
4098
4099 let now = self.clock_ref().utc_now();
4100 check_timestamps(now, start, end)?;
4101
4102 let request_id = UUID4::new();
4103 let command = RequestCommand::Instruments(RequestInstruments {
4104 venue,
4105 start,
4106 end,
4107 client_id,
4108 request_id,
4109 ts_init: now.into(),
4110 params,
4111 });
4112
4113 get_message_bus()
4114 .borrow_mut()
4115 .register_response_handler(command.request_id(), handler)?;
4116
4117 self.send_data_cmd(DataCommand::Request(command));
4118
4119 Ok(request_id)
4120 }
4121
4122 pub fn request_book_snapshot(
4128 &self,
4129 instrument_id: InstrumentId,
4130 depth: Option<NonZeroUsize>,
4131 client_id: Option<ClientId>,
4132 params: Option<Params>,
4133 handler: ShareableMessageHandler,
4134 ) -> anyhow::Result<UUID4> {
4135 self.check_registered();
4136
4137 let request_id = UUID4::new();
4138 let command = RequestCommand::BookSnapshot(RequestBookSnapshot {
4139 instrument_id,
4140 depth,
4141 client_id,
4142 request_id,
4143 ts_init: self.timestamp_ns(),
4144 params,
4145 });
4146
4147 get_message_bus()
4148 .borrow_mut()
4149 .register_response_handler(command.request_id(), handler)?;
4150
4151 self.send_data_cmd(DataCommand::Request(command));
4152
4153 Ok(request_id)
4154 }
4155
4156 #[expect(clippy::too_many_arguments)]
4162 pub fn request_quotes(
4163 &self,
4164 instrument_id: InstrumentId,
4165 start: Option<DateTime<Utc>>,
4166 end: Option<DateTime<Utc>>,
4167 limit: Option<NonZeroUsize>,
4168 client_id: Option<ClientId>,
4169 params: Option<Params>,
4170 handler: ShareableMessageHandler,
4171 ) -> anyhow::Result<UUID4> {
4172 self.check_registered();
4173
4174 let now = self.clock_ref().utc_now();
4175 check_timestamps(now, start, end)?;
4176
4177 let request_id = UUID4::new();
4178 let command = RequestCommand::Quotes(RequestQuotes {
4179 instrument_id,
4180 start,
4181 end,
4182 limit,
4183 client_id,
4184 request_id,
4185 ts_init: now.into(),
4186 params,
4187 });
4188
4189 get_message_bus()
4190 .borrow_mut()
4191 .register_response_handler(command.request_id(), handler)?;
4192
4193 self.send_data_cmd(DataCommand::Request(command));
4194
4195 Ok(request_id)
4196 }
4197
4198 #[expect(clippy::too_many_arguments)]
4204 pub fn request_trades(
4205 &self,
4206 instrument_id: InstrumentId,
4207 start: Option<DateTime<Utc>>,
4208 end: Option<DateTime<Utc>>,
4209 limit: Option<NonZeroUsize>,
4210 client_id: Option<ClientId>,
4211 params: Option<Params>,
4212 handler: ShareableMessageHandler,
4213 ) -> anyhow::Result<UUID4> {
4214 self.check_registered();
4215
4216 let now = self.clock_ref().utc_now();
4217 check_timestamps(now, start, end)?;
4218
4219 let request_id = UUID4::new();
4220 let command = RequestCommand::Trades(RequestTrades {
4221 instrument_id,
4222 start,
4223 end,
4224 limit,
4225 client_id,
4226 request_id,
4227 ts_init: now.into(),
4228 params,
4229 });
4230
4231 get_message_bus()
4232 .borrow_mut()
4233 .register_response_handler(command.request_id(), handler)?;
4234
4235 self.send_data_cmd(DataCommand::Request(command));
4236
4237 Ok(request_id)
4238 }
4239
4240 #[expect(clippy::too_many_arguments)]
4246 pub fn request_funding_rates(
4247 &self,
4248 instrument_id: InstrumentId,
4249 start: Option<DateTime<Utc>>,
4250 end: Option<DateTime<Utc>>,
4251 limit: Option<NonZeroUsize>,
4252 client_id: Option<ClientId>,
4253 params: Option<Params>,
4254 handler: ShareableMessageHandler,
4255 ) -> anyhow::Result<UUID4> {
4256 self.check_registered();
4257
4258 let now = self.clock_ref().utc_now();
4259 check_timestamps(now, start, end)?;
4260
4261 let request_id = UUID4::new();
4262 let command = RequestCommand::FundingRates(RequestFundingRates {
4263 instrument_id,
4264 start,
4265 end,
4266 limit,
4267 client_id,
4268 request_id,
4269 ts_init: now.into(),
4270 params,
4271 });
4272
4273 get_message_bus()
4274 .borrow_mut()
4275 .register_response_handler(command.request_id(), handler)?;
4276
4277 self.send_data_cmd(DataCommand::Request(command));
4278
4279 Ok(request_id)
4280 }
4281
4282 #[expect(clippy::too_many_arguments)]
4288 pub fn request_bars(
4289 &self,
4290 bar_type: BarType,
4291 start: Option<DateTime<Utc>>,
4292 end: Option<DateTime<Utc>>,
4293 limit: Option<NonZeroUsize>,
4294 client_id: Option<ClientId>,
4295 params: Option<Params>,
4296 handler: ShareableMessageHandler,
4297 ) -> anyhow::Result<UUID4> {
4298 self.check_registered();
4299
4300 let now = self.clock_ref().utc_now();
4301 check_timestamps(now, start, end)?;
4302
4303 let request_id = UUID4::new();
4304 let command = RequestCommand::Bars(RequestBars {
4305 bar_type,
4306 start,
4307 end,
4308 limit,
4309 client_id,
4310 request_id,
4311 ts_init: now.into(),
4312 params,
4313 });
4314
4315 get_message_bus()
4316 .borrow_mut()
4317 .register_response_handler(command.request_id(), handler)?;
4318
4319 self.send_data_cmd(DataCommand::Request(command));
4320
4321 Ok(request_id)
4322 }
4323
/// Returns the number of entries currently held in `quote_handlers` (test builds only).
#[cfg(test)]
pub fn quote_handler_count(&self) -> usize {
    let handlers = &self.quote_handlers;
    handlers.len()
}
4328
/// Returns the number of entries currently held in `trade_handlers` (test builds only).
#[cfg(test)]
pub fn trade_handler_count(&self) -> usize {
    let handlers = &self.trade_handlers;
    handlers.len()
}
4333
/// Returns the number of entries currently held in `bar_handlers` (test builds only).
#[cfg(test)]
pub fn bar_handler_count(&self) -> usize {
    let handlers = &self.bar_handlers;
    handlers.len()
}
4338
/// Returns the number of entries currently held in `deltas_handlers` (test builds only).
#[cfg(test)]
pub fn deltas_handler_count(&self) -> usize {
    let handlers = &self.deltas_handlers;
    handlers.len()
}
4343
/// Returns whether `quote_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_quote_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.quote_handlers.contains_key(&key)
}
4349
/// Returns whether `trade_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_trade_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.trade_handlers.contains_key(&key)
}
4355
/// Returns whether `bar_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_bar_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.bar_handlers.contains_key(&key)
}
4360
/// Returns whether `deltas_handlers` contains an entry keyed by `topic` (test builds only).
#[cfg(test)]
pub fn has_deltas_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.deltas_handlers.contains_key(&key)
}
4366}
4367
4368fn check_timestamps(
4369 now: DateTime<Utc>,
4370 start: Option<DateTime<Utc>>,
4371 end: Option<DateTime<Utc>>,
4372) -> anyhow::Result<()> {
4373 if let Some(start) = start {
4374 check_predicate_true(start <= now, "start was > now")?;
4375 }
4376
4377 if let Some(end) = end {
4378 check_predicate_true(end <= now, "end was > now")?;
4379 }
4380
4381 if let (Some(start), Some(end)) = (start, end) {
4382 check_predicate_true(start < end, "start was >= end")?;
4383 }
4384
4385 Ok(())
4386}
4387
4388fn log_error(e: &anyhow::Error) {
4389 log::error!("{e}");
4390}
4391
4392fn log_not_running<T>(msg: &T)
4393where
4394 T: Debug,
4395{
4396 log::trace!("Received message when not running - skipping {msg:?}");
4397}
4398
4399fn log_received<T>(msg: &T)
4400where
4401 T: Debug,
4402{
4403 log::debug!("{RECV} {msg:?}");
4404}