1pub mod bar;
32pub mod book;
33mod commands;
34pub mod config;
35mod handlers;
36
37#[cfg(feature = "defi")]
38pub mod pool;
39
40use std::{
41 any::{Any, type_name},
42 cell::{Ref, RefCell},
43 collections::VecDeque,
44 fmt::{Debug, Display},
45 num::NonZeroUsize,
46 rc::Rc,
47};
48
49use ahash::{AHashMap, AHashSet};
50pub use bar::BarAggregatorSubscription;
51use book::{
52 BookSnapshotInfo, BookSnapshotInfos, BookSnapshotKey, BookSnapshotUnsubscribeResult,
53 BookSnapshotter, BookUpdater,
54};
55pub(crate) use commands::{DeferredCommand, DeferredCommandQueue};
56use config::DataEngineConfig;
57use futures::future::join_all;
58use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
59use indexmap::IndexMap;
60use nautilus_common::{
61 cache::Cache,
62 clock::Clock,
63 logging::{RECV, RES},
64 messages::data::{
65 DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
66 SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
67 SubscribeCommand, SubscribeOptionChain, UnsubscribeBars, UnsubscribeBookDeltas,
68 UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
69 UnsubscribeInstrumentStatus, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
70 UnsubscribeQuotes,
71 },
72 msgbus::{
73 self, ShareableMessageHandler, TypedHandler, TypedIntoHandler,
74 switchboard::{self, MessagingSwitchboard},
75 },
76 runner::get_data_cmd_sender,
77 timer::{TimeEvent, TimeEventCallback},
78};
79use nautilus_core::{
80 UUID4, WeakCell,
81 correctness::{
82 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
83 },
84 datetime::millis_to_nanos_unchecked,
85};
86#[cfg(feature = "defi")]
87use nautilus_model::defi::DefiData;
88use nautilus_model::{
89 data::{
90 Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
91 InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
92 OrderBookDepth10, QuoteTick, TradeTick,
93 option_chain::{OptionGreeks, StrikeRange},
94 },
95 enums::{
96 AggregationSource, BarAggregation, BookType, MarketStatusAction, PriceType, RecordFlag,
97 },
98 identifiers::{ClientId, InstrumentId, OptionSeriesId, Venue},
99 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
100 orderbook::OrderBook,
101 types::Price,
102};
103#[cfg(feature = "streaming")]
104use nautilus_persistence::backend::catalog::ParquetDataCatalog;
105use ustr::Ustr;
106
107#[cfg(feature = "defi")]
108#[allow(unused_imports)] use crate::defi::engine as _;
110#[cfg(feature = "defi")]
111use crate::engine::pool::PoolUpdater;
112use crate::{
113 aggregation::{
114 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
115 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
116 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
117 VolumeRunsBarAggregator,
118 },
119 client::DataClientAdapter,
120 option_chains::OptionChainManager,
121};
122
123#[derive(Debug)]
125pub struct DataEngine {
126 pub(crate) clock: Rc<RefCell<dyn Clock>>,
127 pub(crate) cache: Rc<RefCell<Cache>>,
128 pub(crate) external_clients: AHashSet<ClientId>,
129 clients: IndexMap<ClientId, DataClientAdapter>,
130 default_client: Option<DataClientAdapter>,
131 #[cfg(feature = "streaming")]
132 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
133 routing_map: IndexMap<Venue, ClientId>,
134 book_intervals: AHashMap<NonZeroUsize, BookSnapshotInfos>,
135 book_snapshot_counts: IndexMap<BookSnapshotKey, usize>,
136 book_deltas_subs: AHashSet<InstrumentId>,
137 book_depth10_subs: AHashSet<InstrumentId>,
138 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
139 book_snapshotters: AHashMap<NonZeroUsize, Rc<BookSnapshotter>>,
140 bar_aggregators: IndexMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
141 bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
142 option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
143 option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
144 deferred_cmd_queue: DeferredCommandQueue,
145 pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
146 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
147 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
148 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
149 command_count: u64,
150 data_count: u64,
151 request_count: u64,
152 response_count: u64,
153 pub(crate) msgbus_priority: u8,
154 pub(crate) config: DataEngineConfig,
155 #[cfg(feature = "defi")]
156 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
157 #[cfg(feature = "defi")]
158 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
159 #[cfg(feature = "defi")]
160 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
161 #[cfg(feature = "defi")]
162 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
163}
164
165impl DataEngine {
166 #[must_use]
168 pub fn new(
169 clock: Rc<RefCell<dyn Clock>>,
170 cache: Rc<RefCell<Cache>>,
171 config: Option<DataEngineConfig>,
172 ) -> Self {
173 let config = config.unwrap_or_default();
174
175 let external_clients: AHashSet<ClientId> = config
176 .external_clients
177 .clone()
178 .unwrap_or_default()
179 .into_iter()
180 .collect();
181
182 Self {
183 clock,
184 cache,
185 external_clients,
186 clients: IndexMap::new(),
187 default_client: None,
188 #[cfg(feature = "streaming")]
189 catalogs: AHashMap::new(),
190 routing_map: IndexMap::new(),
191 book_intervals: AHashMap::new(),
192 book_snapshot_counts: IndexMap::new(),
193 book_deltas_subs: AHashSet::new(),
194 book_depth10_subs: AHashSet::new(),
195 book_updaters: AHashMap::new(),
196 book_snapshotters: AHashMap::new(),
197 bar_aggregators: IndexMap::new(),
198 bar_aggregator_handlers: AHashMap::new(),
199 option_chain_managers: AHashMap::new(),
200 option_chain_instrument_index: AHashMap::new(),
201 deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
202 pending_option_chain_requests: AHashMap::new(),
203 _synthetic_quote_feeds: AHashMap::new(),
204 _synthetic_trade_feeds: AHashMap::new(),
205 buffered_deltas_map: AHashMap::new(),
206 command_count: 0,
207 data_count: 0,
208 request_count: 0,
209 response_count: 0,
210 msgbus_priority: 10, config,
212 #[cfg(feature = "defi")]
213 pool_updaters: AHashMap::new(),
214 #[cfg(feature = "defi")]
215 pool_updaters_pending: AHashSet::new(),
216 #[cfg(feature = "defi")]
217 pool_snapshot_pending: AHashSet::new(),
218 #[cfg(feature = "defi")]
219 pool_event_buffers: AHashMap::new(),
220 }
221 }
222
223 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
225 let weak = WeakCell::from(Rc::downgrade(engine));
226
227 let weak1 = weak.clone();
228 msgbus::register_data_command_endpoint(
229 MessagingSwitchboard::data_engine_execute(),
230 TypedIntoHandler::from(move |cmd: DataCommand| {
231 if let Some(rc) = weak1.upgrade() {
232 rc.borrow_mut().execute(cmd);
233 }
234 }),
235 );
236
237 msgbus::register_data_command_endpoint(
238 MessagingSwitchboard::data_engine_queue_execute(),
239 TypedIntoHandler::from(move |cmd: DataCommand| {
240 get_data_cmd_sender().clone().execute(cmd);
241 }),
242 );
243
244 let weak2 = weak.clone();
246 msgbus::register_any(
247 MessagingSwitchboard::data_engine_process(),
248 ShareableMessageHandler::from_any(move |data: &dyn Any| {
249 if let Some(rc) = weak2.upgrade() {
250 rc.borrow_mut().process(data);
251 }
252 }),
253 );
254
255 let weak3 = weak.clone();
257 msgbus::register_data_endpoint(
258 MessagingSwitchboard::data_engine_process_data(),
259 TypedIntoHandler::from(move |data: Data| {
260 if let Some(rc) = weak3.upgrade() {
261 rc.borrow_mut().process_data(data);
262 }
263 }),
264 );
265
266 #[cfg(feature = "defi")]
268 {
269 let weak4 = weak.clone();
270 msgbus::register_defi_data_endpoint(
271 MessagingSwitchboard::data_engine_process_defi_data(),
272 TypedIntoHandler::from(move |data: DefiData| {
273 if let Some(rc) = weak4.upgrade() {
274 rc.borrow_mut().process_defi_data(data);
275 }
276 }),
277 );
278 }
279
280 let weak5 = weak;
281 msgbus::register_data_response_endpoint(
282 MessagingSwitchboard::data_engine_response(),
283 TypedIntoHandler::from(move |resp: DataResponse| {
284 if let Some(rc) = weak5.upgrade() {
285 rc.borrow_mut().response(resp);
286 }
287 }),
288 );
289 }
290
291 #[must_use]
293 pub const fn command_count(&self) -> u64 {
294 self.command_count
295 }
296
297 #[must_use]
299 pub const fn data_count(&self) -> u64 {
300 self.data_count
301 }
302
303 #[cfg(feature = "defi")]
304 pub(crate) const fn increment_data_count(&mut self) {
305 self.data_count += 1;
306 }
307
308 #[must_use]
310 pub const fn request_count(&self) -> u64 {
311 self.request_count
312 }
313
314 #[must_use]
316 pub const fn response_count(&self) -> u64 {
317 self.response_count
318 }
319
320 #[must_use]
322 pub fn has_option_chain_manager(&self, series_id: &OptionSeriesId) -> bool {
323 self.option_chain_managers.contains_key(series_id)
324 }
325
326 #[must_use]
328 pub fn pending_option_chain_request_count(&self) -> usize {
329 self.pending_option_chain_requests.len()
330 }
331
332 #[must_use]
334 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
335 self.clock.borrow()
336 }
337
338 #[must_use]
340 pub fn get_cache(&self) -> Ref<'_, Cache> {
341 self.cache.borrow()
342 }
343
344 #[must_use]
346 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
347 Rc::clone(&self.cache)
348 }
349
350 #[cfg(feature = "streaming")]
356 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
357 let name = Ustr::from(name.unwrap_or("catalog_0"));
358
359 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
360
361 self.catalogs.insert(name, catalog);
362 log::info!("Registered catalog <{name}>");
363 }
364
365 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
372 let client_id = client.client_id();
373
374 if let Some(default_client) = &self.default_client {
375 check_predicate_false(
376 default_client.client_id() == client.client_id(),
377 "client_id already registered as default client",
378 )
379 .expect(FAILED);
380 }
381
382 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
383
384 if let Some(routing) = routing {
385 self.routing_map.insert(routing, client_id);
386 log::debug!("Set client {client_id} routing for {routing}");
387 }
388
389 if client.venue.is_none() && self.default_client.is_none() {
390 self.default_client = Some(client);
391 log::debug!("Registered client {client_id} for default routing");
392 } else {
393 self.clients.insert(client_id, client);
394 log::debug!("Registered client {client_id}");
395 }
396 }
397
398 pub fn deregister_client(&mut self, client_id: &ClientId) {
404 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
405
406 self.clients.shift_remove(client_id);
407 log::info!("Deregistered client {client_id}");
408 }
409
410 pub fn register_default_client(&mut self, client: DataClientAdapter) {
422 check_predicate_true(
423 self.default_client.is_none(),
424 "default client already registered",
425 )
426 .expect(FAILED);
427
428 let client_id = client.client_id();
429
430 self.default_client = Some(client);
431 log::debug!("Registered default client {client_id}");
432 }
433
434 pub fn start(&mut self) {
436 for client in self.get_clients_mut() {
437 if let Err(e) = client.start() {
438 log::error!("{e}");
439 }
440 }
441
442 for aggregator in self.bar_aggregators.values() {
443 if aggregator.borrow().bar_type().spec().is_time_aggregated() {
444 aggregator
445 .borrow_mut()
446 .start_timer(Some(aggregator.clone()));
447 }
448 }
449 }
450
451 pub fn stop(&mut self) {
453 for client in self.get_clients_mut() {
454 if let Err(e) = client.stop() {
455 log::error!("{e}");
456 }
457 }
458
459 for aggregator in self.bar_aggregators.values() {
460 aggregator.borrow_mut().stop();
461 }
462 }
463
464 pub fn reset(&mut self) {
466 for client in self.get_clients_mut() {
467 if let Err(e) = client.reset() {
468 log::error!("{e}");
469 }
470 }
471
472 let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
473 for bar_type in bar_types {
474 if let Err(e) = self.stop_bar_aggregator(bar_type) {
475 log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
476 }
477 }
478
479 self.command_count = 0;
480 self.data_count = 0;
481 self.request_count = 0;
482 self.response_count = 0;
483 }
484
485 pub fn dispose(&mut self) {
487 for client in self.get_clients_mut() {
488 if let Err(e) = client.dispose() {
489 log::error!("{e}");
490 }
491 }
492
493 self.clock.borrow_mut().cancel_timers();
494 }
495
496 pub async fn connect(&mut self) {
500 let futures: Vec<_> = self
501 .get_clients_mut()
502 .into_iter()
503 .map(|client| client.connect())
504 .collect();
505
506 let results = join_all(futures).await;
507
508 for error in results.into_iter().filter_map(Result::err) {
509 log::error!("Failed to connect data client: {error}");
510 }
511 }
512
513 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
519 let futures: Vec<_> = self
520 .get_clients_mut()
521 .into_iter()
522 .map(|client| client.disconnect())
523 .collect();
524
525 let results = join_all(futures).await;
526 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
527
528 if errors.is_empty() {
529 Ok(())
530 } else {
531 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
532 anyhow::bail!(
533 "Failed to disconnect data clients: {}",
534 error_msgs.join("; ")
535 )
536 }
537 }
538
539 #[must_use]
541 pub fn check_connected(&self) -> bool {
542 self.get_clients()
543 .iter()
544 .all(|client| client.is_connected())
545 }
546
547 #[must_use]
549 pub fn check_disconnected(&self) -> bool {
550 self.get_clients()
551 .iter()
552 .all(|client| !client.is_connected())
553 }
554
555 #[must_use]
557 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
558 self.get_clients()
559 .into_iter()
560 .map(|client| (client.client_id(), client.is_connected()))
561 .collect()
562 }
563
564 #[must_use]
566 pub fn registered_clients(&self) -> Vec<ClientId> {
567 self.get_clients()
568 .into_iter()
569 .map(|client| client.client_id())
570 .collect()
571 }
572
573 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
576 where
577 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
578 T: Clone,
579 {
580 self.get_clients()
581 .into_iter()
582 .flat_map(get_subs)
583 .cloned()
584 .collect()
585 }
586
587 #[must_use]
588 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
589 let (default_opt, clients_map) = (&self.default_client, &self.clients);
590 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
591
592 if let Some(default) = default_opt {
593 clients.push(default);
594 }
595
596 clients
597 }
598
599 #[must_use]
600 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
601 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
602 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
603
604 if let Some(default) = default_opt {
605 clients.push(default);
606 }
607
608 clients
609 }
610
611 pub fn get_client(
612 &mut self,
613 client_id: Option<&ClientId>,
614 venue: Option<&Venue>,
615 ) -> Option<&mut DataClientAdapter> {
616 if let Some(client_id) = client_id {
617 if let Some(client) = self.clients.get_mut(client_id) {
619 return Some(client);
620 }
621
622 if let Some(default) = self.default_client.as_mut()
624 && default.client_id() == *client_id
625 {
626 return Some(default);
627 }
628
629 return None;
631 }
632
633 if let Some(v) = venue {
634 if let Some(client_id) = self.routing_map.get(v) {
636 return self.clients.get_mut(client_id);
637 }
638 }
639
640 self.get_default_client()
642 }
643
644 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
645 self.default_client.as_mut()
646 }
647
648 #[must_use]
650 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
651 self.collect_subscriptions(|client| &client.subscriptions_custom)
652 }
653
654 #[must_use]
656 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
657 self.collect_subscriptions(|client| &client.subscriptions_instrument)
658 }
659
660 #[must_use]
662 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
663 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
664 }
665
666 #[must_use]
668 pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
669 self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
670 }
671
672 #[must_use]
674 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
675 self.book_snapshot_counts
676 .keys()
677 .map(|(instrument_id, _)| *instrument_id)
678 .collect()
679 }
680
681 #[must_use]
683 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
684 self.collect_subscriptions(|client| &client.subscriptions_quotes)
685 }
686
687 #[must_use]
689 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
690 self.collect_subscriptions(|client| &client.subscriptions_trades)
691 }
692
693 #[must_use]
695 pub fn subscribed_bars(&self) -> Vec<BarType> {
696 self.collect_subscriptions(|client| &client.subscriptions_bars)
697 }
698
699 #[must_use]
701 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
702 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
703 }
704
705 #[must_use]
707 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
708 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
709 }
710
711 #[must_use]
713 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
714 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
715 }
716
717 #[must_use]
719 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
720 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
721 }
722
723 #[must_use]
725 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
726 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
727 }
728
729 pub fn execute(&mut self, cmd: DataCommand) {
735 match &cmd {
736 DataCommand::Subscribe(_) | DataCommand::Unsubscribe(_) => self.command_count += 1,
737 DataCommand::Request(_) => self.request_count += 1,
738 #[cfg(feature = "defi")]
739 DataCommand::DefiRequest(_) => self.request_count += 1,
740 #[cfg(feature = "defi")]
741 DataCommand::DefiSubscribe(_) | DataCommand::DefiUnsubscribe(_) => {
742 self.command_count += 1;
743 }
744 _ => {}
745 }
746
747 if let Err(e) = match cmd {
748 DataCommand::Subscribe(c) => self.execute_subscribe(c),
749 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
750 DataCommand::Request(c) => self.execute_request(c),
751 #[cfg(feature = "defi")]
752 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
753 #[cfg(feature = "defi")]
754 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
755 #[cfg(feature = "defi")]
756 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
757 _ => {
758 log::warn!("Unhandled DataCommand variant");
759 Ok(())
760 }
761 } {
762 log::error!("{e}");
763 }
764 }
765
766 pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
773 match &cmd {
775 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
776 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
777 SubscribeCommand::BookSnapshots(cmd) => {
778 return self.subscribe_book_snapshots(cmd);
780 }
781 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
782 SubscribeCommand::OptionChain(cmd) => {
783 self.subscribe_option_chain(cmd);
784 return Ok(());
785 }
786 SubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
787 anyhow::bail!("Cannot subscribe for synthetic instrument `Instrument` data");
788 }
789 SubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
790 anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentStatus` data");
791 }
792 SubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
793 anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentClose` data");
794 }
795 SubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
796 anyhow::bail!("Cannot subscribe for synthetic instrument `OptionGreeks` data");
797 }
798 _ => {} }
800
801 if let Some(client_id) = cmd.client_id()
802 && self.external_clients.contains(client_id)
803 {
804 if self.config.debug {
805 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
806 }
807 return Ok(());
808 }
809
810 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
811 client.execute_subscribe(cmd);
812 } else {
813 log::error!(
814 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
815 cmd.client_id(),
816 cmd.venue(),
817 );
818 }
819
820 Ok(())
821 }
822
823 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
829 match &cmd {
830 UnsubscribeCommand::BookDeltas(cmd) if !self.unsubscribe_book_deltas(cmd) => {
831 return Ok(());
832 }
833 UnsubscribeCommand::BookDepth10(cmd) if !self.unsubscribe_book_depth10(cmd) => {
834 return Ok(());
835 }
836 UnsubscribeCommand::BookSnapshots(cmd) => {
837 self.unsubscribe_book_snapshots(cmd);
839 return Ok(());
840 }
841 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
842 UnsubscribeCommand::OptionChain(cmd) => {
843 self.unsubscribe_option_chain(cmd);
844 return Ok(());
845 }
846 UnsubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
847 anyhow::bail!("Cannot unsubscribe from synthetic instrument `Instrument` data");
848 }
849 UnsubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
850 anyhow::bail!(
851 "Cannot unsubscribe from synthetic instrument `InstrumentStatus` data"
852 );
853 }
854 UnsubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
855 anyhow::bail!(
856 "Cannot unsubscribe from synthetic instrument `InstrumentClose` data"
857 );
858 }
859 UnsubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
860 anyhow::bail!("Cannot unsubscribe from synthetic instrument `OptionGreeks` data");
861 }
862 _ => {} }
864
865 if let Some(client_id) = cmd.client_id()
866 && self.external_clients.contains(client_id)
867 {
868 if self.config.debug {
869 log::debug!(
870 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
871 );
872 }
873 return Ok(());
874 }
875
876 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
877 client.execute_unsubscribe(cmd);
878 } else {
879 log::error!(
880 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
881 cmd.client_id(),
882 cmd.venue(),
883 );
884 }
885
886 Ok(())
887 }
888
889 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
896 if let Some(cid) = req.client_id()
898 && self.external_clients.contains(cid)
899 {
900 if self.config.debug {
901 log::debug!("Skipping data request for external client {cid}: {req:?}");
902 }
903 return Ok(());
904 }
905
906 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
907 match req {
908 RequestCommand::Data(req) => client.request_data(req),
909 RequestCommand::Instrument(req) => client.request_instrument(req),
910 RequestCommand::Instruments(req) => client.request_instruments(req),
911 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
912 RequestCommand::BookDepth(req) => client.request_book_depth(req),
913 RequestCommand::Quotes(req) => client.request_quotes(req),
914 RequestCommand::Trades(req) => client.request_trades(req),
915 RequestCommand::FundingRates(req) => client.request_funding_rates(req),
916 RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
917 RequestCommand::Bars(req) => client.request_bars(req),
918 }
919 } else {
920 anyhow::bail!(
921 "Cannot handle request: no client found for {:?} {:?}",
922 req.client_id(),
923 req.venue()
924 );
925 }
926 }
927
928 pub fn process(&mut self, data: &dyn Any) {
932 self.data_count += 1;
933 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
935 self.handle_instrument(instrument);
936 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
937 self.handle_funding_rate(*funding_rate);
938 } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
939 self.handle_instrument_status(*status);
940 } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
941 self.cache.borrow_mut().add_option_greeks(*option_greeks);
942 let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
943 msgbus::publish_option_greeks(topic, option_greeks);
944 self.drain_deferred_commands();
945 } else {
946 log::error!("Cannot process data {data:?}, type is unrecognized");
947 }
948
949 }
951
952 pub fn process_data(&mut self, data: Data) {
954 self.data_count += 1;
955
956 match data {
957 Data::Delta(delta) => self.handle_delta(delta),
958 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
959 Data::Depth10(depth) => self.handle_depth10(*depth),
960 Data::Quote(quote) => {
961 self.handle_quote(quote);
962 self.drain_deferred_commands();
963 }
964 Data::Trade(trade) => self.handle_trade(trade),
965 Data::Bar(bar) => self.handle_bar(bar),
966 Data::MarkPriceUpdate(mark_price) => {
967 self.handle_mark_price(mark_price);
968 self.drain_deferred_commands();
969 }
970 Data::IndexPriceUpdate(index_price) => {
971 self.handle_index_price(index_price);
972 self.drain_deferred_commands();
973 }
974 Data::InstrumentStatus(status) => {
975 self.handle_instrument_status(status);
976 self.drain_deferred_commands();
977 }
978 Data::InstrumentClose(close) => self.handle_instrument_close(close),
979 Data::Custom(custom) => self.handle_custom_data(&custom),
980 }
981 }
982
983 #[expect(clippy::needless_pass_by_value)] pub fn response(&mut self, resp: DataResponse) {
986 log::debug!("{RECV}{RES} {resp:?}");
987
988 self.response_count += 1;
989 let correlation_id = *resp.correlation_id();
990
991 match &resp {
992 DataResponse::Instrument(r) => {
993 self.handle_instrument_response(r.data.clone());
994 }
995 DataResponse::Instruments(r) => {
996 self.handle_instruments(&r.data);
997 }
998 DataResponse::Quotes(r) => {
999 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1000 self.handle_quotes(&r.data);
1001 }
1002 }
1003 DataResponse::Trades(r) => {
1004 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1005 self.handle_trades(&r.data);
1006 }
1007 }
1008 DataResponse::FundingRates(r) => {
1009 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1010 self.handle_funding_rates(&r.data);
1011 }
1012 }
1013 DataResponse::Bars(r) => {
1014 if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
1015 self.handle_bars(&r.data);
1016 }
1017 }
1018 DataResponse::Book(r) => self.handle_book_response(&r.data),
1019 DataResponse::ForwardPrices(r) => {
1020 return self.handle_forward_prices_response(&correlation_id, r);
1021 }
1022 _ => todo!("Handle other response types"),
1023 }
1024
1025 msgbus::send_response(&correlation_id, &resp);
1026 }
1027
1028 fn handle_instrument(&mut self, instrument: &InstrumentAny) {
1031 log::debug!("Handling instrument: {}", instrument.id());
1032
1033 if let Err(e) = self
1034 .cache
1035 .as_ref()
1036 .borrow_mut()
1037 .add_instrument(instrument.clone())
1038 {
1039 log_error_on_cache_insert(&e);
1040 }
1041
1042 let topic = switchboard::get_instrument_topic(instrument.id());
1043 log::debug!("Publishing instrument to topic: {topic}");
1044 msgbus::publish_any(topic, instrument);
1045
1046 self.update_option_chains(instrument);
1047 }
1048
1049 fn update_option_chains(&mut self, instrument: &InstrumentAny) {
1050 let Some(underlying) = instrument.underlying() else {
1051 return;
1052 };
1053 let Some(expiration_ns) = instrument.expiration_ns() else {
1054 return;
1055 };
1056 let Some(strike) = instrument.strike_price() else {
1057 return;
1058 };
1059 let Some(kind) = instrument.option_kind() else {
1060 return;
1061 };
1062
1063 let venue = instrument.id().venue;
1064 let settlement = instrument.settlement_currency().code;
1065 let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
1066
1067 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1069 return;
1070 };
1071
1072 let clock = self.clock.clone();
1073 let client = self.get_client(None, Some(&venue));
1074
1075 if manager_rc
1076 .borrow_mut()
1077 .add_instrument(instrument.id(), strike, kind, client, &clock)
1078 {
1079 self.option_chain_instrument_index
1080 .insert(instrument.id(), series_id);
1081 }
1082 }
1083
1084 fn handle_delta(&mut self, delta: OrderBookDelta) {
1085 let deltas = if self.config.buffer_deltas {
1086 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
1087 buffered_deltas.deltas.push(delta);
1088 buffered_deltas.flags = delta.flags;
1089 buffered_deltas.sequence = delta.sequence;
1090 buffered_deltas.ts_event = delta.ts_event;
1091 buffered_deltas.ts_init = delta.ts_init;
1092 } else {
1093 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1094 self.buffered_deltas_map
1095 .insert(delta.instrument_id, buffered_deltas);
1096 }
1097
1098 if !RecordFlag::F_LAST.matches(delta.flags) {
1099 return; }
1101
1102 self.buffered_deltas_map
1103 .remove(&delta.instrument_id)
1104 .expect("buffered deltas exist")
1105 } else {
1106 OrderBookDeltas::new(delta.instrument_id, vec![delta])
1107 };
1108
1109 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1110 msgbus::publish_deltas(topic, &deltas);
1111 }
1112
1113 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
1114 if self.config.buffer_deltas {
1115 let instrument_id = deltas.instrument_id;
1116
1117 for delta in deltas.deltas {
1118 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
1119 buffered_deltas.deltas.push(delta);
1120 buffered_deltas.flags = delta.flags;
1121 buffered_deltas.sequence = delta.sequence;
1122 buffered_deltas.ts_event = delta.ts_event;
1123 buffered_deltas.ts_init = delta.ts_init;
1124 } else {
1125 let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1126 self.buffered_deltas_map
1127 .insert(instrument_id, buffered_deltas);
1128 }
1129
1130 if RecordFlag::F_LAST.matches(delta.flags) {
1131 let deltas_to_publish = self
1132 .buffered_deltas_map
1133 .remove(&instrument_id)
1134 .expect("buffered deltas exist");
1135 let topic = switchboard::get_book_deltas_topic(instrument_id);
1136 msgbus::publish_deltas(topic, &deltas_to_publish);
1137 }
1138 }
1139 } else {
1140 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1141 msgbus::publish_deltas(topic, &deltas);
1142 }
1143 }
1144
1145 fn handle_depth10(&self, depth: OrderBookDepth10) {
1146 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
1147 msgbus::publish_depth10(topic, &depth);
1148 }
1149
1150 fn handle_quote(&self, quote: QuoteTick) {
1151 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
1152 log_error_on_cache_insert(&e);
1153 }
1154
1155 let topic = switchboard::get_quotes_topic(quote.instrument_id);
1158 msgbus::publish_quote(topic, "e);
1159 }
1160
1161 fn handle_trade(&self, trade: TradeTick) {
1162 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1163 log_error_on_cache_insert(&e);
1164 }
1165
1166 let topic = switchboard::get_trades_topic(trade.instrument_id);
1169 msgbus::publish_trade(topic, &trade);
1170 }
1171
1172 fn handle_bar(&self, bar: Bar) {
1173 if self.config.validate_data_sequence
1175 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1176 {
1177 if bar.ts_event < last_bar.ts_event {
1178 log::warn!(
1179 "Bar {bar} was prior to last bar `ts_event` {}",
1180 last_bar.ts_event
1181 );
1182 return; }
1184
1185 if bar.ts_init < last_bar.ts_init {
1186 log::warn!(
1187 "Bar {bar} was prior to last bar `ts_init` {}",
1188 last_bar.ts_init
1189 );
1190 return; }
1192 }
1194
1195 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1196 log_error_on_cache_insert(&e);
1197 }
1198
1199 let topic = switchboard::get_bars_topic(bar.bar_type);
1200 msgbus::publish_bar(topic, &bar);
1201 }
1202
1203 fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
1204 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1205 log_error_on_cache_insert(&e);
1206 }
1207
1208 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1209 msgbus::publish_mark_price(topic, &mark_price);
1210 }
1211
1212 fn handle_index_price(&self, index_price: IndexPriceUpdate) {
1213 if let Err(e) = self
1214 .cache
1215 .as_ref()
1216 .borrow_mut()
1217 .add_index_price(index_price)
1218 {
1219 log_error_on_cache_insert(&e);
1220 }
1221
1222 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1223 msgbus::publish_index_price(topic, &index_price);
1224 }
1225
1226 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1228 if let Err(e) = self
1229 .cache
1230 .as_ref()
1231 .borrow_mut()
1232 .add_funding_rate(funding_rate)
1233 {
1234 log_error_on_cache_insert(&e);
1235 }
1236
1237 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1238 msgbus::publish_funding_rate(topic, &funding_rate);
1239 }
1240
1241 fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1242 if let Err(e) = self
1243 .cache
1244 .as_ref()
1245 .borrow_mut()
1246 .add_instrument_status(status)
1247 {
1248 log_error_on_cache_insert(&e);
1249 }
1250
1251 let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1252 msgbus::publish_any(topic, &status);
1253
1254 if self
1256 .option_chain_instrument_index
1257 .contains_key(&status.instrument_id)
1258 && matches!(
1259 status.action,
1260 MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
1261 )
1262 {
1263 self.expire_option_chain_instrument(status.instrument_id);
1264 }
1265 }
1266
1267 fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
1274 let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
1275 return;
1276 };
1277
1278 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1279 return;
1280 };
1281
1282 let series_empty = manager_rc
1283 .borrow_mut()
1284 .handle_instrument_expired(&instrument_id);
1285
1286 self.drain_deferred_commands();
1288
1289 log::info!(
1290 "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
1291 );
1292
1293 if series_empty {
1294 manager_rc.borrow_mut().teardown(&self.clock);
1295 self.option_chain_managers.remove(&series_id);
1296
1297 log::info!("Torn down empty option chain manager for {series_id}");
1298 }
1299 }
1300
1301 fn handle_instrument_close(&self, close: InstrumentClose) {
1302 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1303 msgbus::publish_any(topic, &close);
1304 }
1305
1306 fn handle_custom_data(&self, custom: &CustomData) {
1307 log::debug!("Processing custom data: {}", custom.data.type_name());
1308 let topic = switchboard::get_custom_topic(&custom.data_type);
1309 msgbus::publish_any(topic, custom);
1310 }
1311
1312 fn drain_deferred_commands(&mut self) {
1316 loop {
1318 let commands: VecDeque<DeferredCommand> =
1319 std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
1320
1321 if commands.is_empty() {
1322 break;
1323 }
1324
1325 for cmd in commands {
1326 match cmd {
1327 DeferredCommand::Subscribe(sub) => {
1328 let client = self.get_client(sub.client_id(), sub.venue());
1329 if let Some(client) = client {
1330 client.execute_subscribe(sub);
1331 }
1332 }
1333 DeferredCommand::Unsubscribe(unsub) => {
1334 let client = self.get_client(unsub.client_id(), unsub.venue());
1335 if let Some(client) = client {
1336 client.execute_unsubscribe(&unsub);
1337 }
1338 }
1339 DeferredCommand::ExpireSeries(series_id) => {
1340 self.expire_series(series_id);
1341 }
1342 }
1343 }
1344 }
1345 }
1346
1347 fn expire_series(&mut self, series_id: OptionSeriesId) {
1353 let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1354 return;
1355 };
1356
1357 let instrument_ids: Vec<InstrumentId> = self
1358 .option_chain_instrument_index
1359 .iter()
1360 .filter(|(_, sid)| **sid == series_id)
1361 .map(|(id, _)| *id)
1362 .collect();
1363
1364 for id in &instrument_ids {
1365 self.option_chain_instrument_index.remove(id);
1366 manager_rc.borrow_mut().handle_instrument_expired(id);
1367 }
1368
1369 manager_rc.borrow_mut().teardown(&self.clock);
1370 self.option_chain_managers.remove(&series_id);
1371
1372 log::info!("Proactively torn down expired option chain {series_id}");
1373 }
1374
1375 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1378 if cmd.instrument_id.is_synthetic() {
1379 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1380 }
1381
1382 self.book_deltas_subs.insert(cmd.instrument_id);
1383 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1384
1385 Ok(())
1386 }
1387
1388 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1389 if cmd.instrument_id.is_synthetic() {
1390 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1391 }
1392
1393 self.book_depth10_subs.insert(cmd.instrument_id);
1394 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1395
1396 Ok(())
1397 }
1398
1399 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1400 if cmd.instrument_id.is_synthetic() {
1401 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1402 }
1403
1404 let had_snapshots = self.has_book_snapshot_subscriptions(&cmd.instrument_id);
1405 let inserted = self.increment_book_snapshot_subscription(cmd);
1406
1407 if inserted && !had_snapshots && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1408 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1409 }
1410
1411 if had_snapshots || self.book_deltas_subs.contains(&cmd.instrument_id) {
1412 return Ok(());
1413 }
1414
1415 if let Some(client_id) = cmd.client_id.as_ref()
1416 && self.external_clients.contains(client_id)
1417 {
1418 if self.config.debug {
1419 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1420 }
1421 return Ok(());
1422 }
1423
1424 log::debug!(
1425 "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1426 cmd.instrument_id,
1427 cmd.client_id,
1428 cmd.venue,
1429 );
1430
1431 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1432 let deltas_cmd = SubscribeBookDeltas::new(
1433 cmd.instrument_id,
1434 cmd.book_type,
1435 cmd.client_id,
1436 cmd.venue,
1437 UUID4::new(),
1438 cmd.ts_init,
1439 cmd.depth,
1440 true, Some(cmd.command_id),
1442 cmd.params.clone(),
1443 );
1444 log::debug!(
1445 "Calling client.execute_subscribe for BookDeltas: {}",
1446 cmd.instrument_id
1447 );
1448 client.execute_subscribe(SubscribeCommand::BookDeltas(deltas_cmd));
1449 } else {
1450 log::error!(
1451 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1452 cmd.client_id,
1453 cmd.venue,
1454 );
1455 }
1456
1457 Ok(())
1458 }
1459
1460 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1461 match cmd.bar_type.aggregation_source() {
1462 AggregationSource::Internal => {
1463 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1464 self.start_bar_aggregator(cmd.bar_type)?;
1465 }
1466 }
1467 AggregationSource::External => {
1468 if cmd.bar_type.instrument_id().is_synthetic() {
1469 anyhow::bail!(
1470 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1471 );
1472 }
1473 }
1474 }
1475
1476 Ok(())
1477 }
1478
1479 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> bool {
1480 if !self.book_deltas_subs.contains(&cmd.instrument_id) {
1481 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1482 return false;
1483 }
1484
1485 self.book_deltas_subs.remove(&cmd.instrument_id);
1486 self.maintain_book_updater(&cmd.instrument_id);
1487
1488 !self.has_book_snapshot_subscriptions(&cmd.instrument_id)
1491 }
1492
1493 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> bool {
1494 if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1495 log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1496 return false;
1497 }
1498
1499 self.book_depth10_subs.remove(&cmd.instrument_id);
1500 self.maintain_book_updater(&cmd.instrument_id);
1501
1502 true
1503 }
1504
1505 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
1506 match self.decrement_book_snapshot_subscription(cmd.instrument_id, cmd.interval_ms) {
1507 BookSnapshotUnsubscribeResult::NotSubscribed => {
1508 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1509 return;
1510 }
1511 BookSnapshotUnsubscribeResult::Decremented => return,
1512 BookSnapshotUnsubscribeResult::Removed => {}
1513 }
1514
1515 if self.has_book_snapshot_subscriptions(&cmd.instrument_id) {
1516 return;
1517 }
1518
1519 self.maintain_book_updater(&cmd.instrument_id);
1520
1521 if self.book_deltas_subs.contains(&cmd.instrument_id) {
1522 return;
1523 }
1524
1525 if let Some(client_id) = cmd.client_id.as_ref()
1526 && self.external_clients.contains(client_id)
1527 {
1528 return;
1529 }
1530
1531 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1532 let deltas_cmd = UnsubscribeBookDeltas::new(
1533 cmd.instrument_id,
1534 cmd.client_id,
1535 cmd.venue,
1536 UUID4::new(),
1537 cmd.ts_init,
1538 Some(cmd.command_id),
1539 cmd.params.clone(),
1540 );
1541 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1542 }
1543 }
1544
1545 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
1546 let bar_type = cmd.bar_type;
1547
1548 let topic = switchboard::get_bars_topic(bar_type.standard());
1550 if msgbus::exact_subscriber_count_bars(topic) > 0 {
1551 return;
1552 }
1553
1554 if self.bar_aggregators.contains_key(&bar_type.standard())
1555 && let Err(e) = self.stop_bar_aggregator(bar_type)
1556 {
1557 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1558 }
1559
1560 if bar_type.is_composite() {
1562 let source_type = bar_type.composite();
1563 let source_topic = switchboard::get_bars_topic(source_type);
1564 if msgbus::exact_subscriber_count_bars(source_topic) == 0
1565 && self.bar_aggregators.contains_key(&source_type)
1566 && let Err(e) = self.stop_bar_aggregator(source_type)
1567 {
1568 log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1569 }
1570 }
1571 }
1572
1573 fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
1574 let series_id = cmd.series_id;
1575
1576 if let Some(old) = self.option_chain_managers.remove(&series_id) {
1578 log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
1579 let all_ids = old.borrow().all_instrument_ids();
1580 let old_venue = old.borrow().venue();
1581 old.borrow_mut().teardown(&self.clock);
1582 self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
1583 }
1584
1585 self.pending_option_chain_requests
1587 .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
1588
1589 if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
1592 let resolved_client_id = self
1594 .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
1595 .map(|c| c.client_id);
1596
1597 if let Some(client_id) = resolved_client_id {
1598 let request_id = UUID4::new();
1599 let ts_init = self.clock.borrow().timestamp_ns();
1600
1601 let sample_instrument_id = {
1604 let cache = self.cache.borrow();
1605 cache
1606 .instruments(&series_id.venue, Some(&series_id.underlying))
1607 .iter()
1608 .find(|i| {
1609 i.expiration_ns() == Some(series_id.expiration_ns)
1610 && i.settlement_currency().code == series_id.settlement_currency
1611 })
1612 .map(|i| i.id())
1613 };
1614
1615 let request = RequestForwardPrices::new(
1616 series_id.venue,
1617 series_id.underlying,
1618 sample_instrument_id,
1619 Some(client_id),
1620 request_id,
1621 ts_init,
1622 None,
1623 );
1624
1625 self.pending_option_chain_requests
1626 .insert(request_id, cmd.clone());
1627
1628 let req_cmd = RequestCommand::ForwardPrices(request);
1629 if let Err(e) = self.execute_request(req_cmd) {
1630 log::warn!("Failed to request forward prices for {series_id}: {e}");
1631 let cmd = self
1632 .pending_option_chain_requests
1633 .remove(&request_id)
1634 .expect("just inserted");
1635 self.create_option_chain_manager(&cmd, None);
1636 }
1637
1638 return;
1639 }
1640 }
1641
1642 self.create_option_chain_manager(cmd, None);
1643 }
1644
1645 fn create_option_chain_manager(
1647 &mut self,
1648 cmd: &SubscribeOptionChain,
1649 initial_atm_price: Option<Price>,
1650 ) {
1651 let series_id = cmd.series_id;
1652 let cache = self.cache.clone();
1653 let clock = self.clock.clone();
1654 let priority = self.msgbus_priority;
1655 let deferred_cmd_queue = self.deferred_cmd_queue.clone();
1656
1657 let manager_rc = {
1658 let client = self.get_client(cmd.client_id.as_ref(), Some(&series_id.venue));
1659 OptionChainManager::create_and_setup(
1660 series_id,
1661 &cache,
1662 cmd,
1663 &clock,
1664 priority,
1665 client,
1666 initial_atm_price,
1667 deferred_cmd_queue,
1668 )
1669 };
1670
1671 for id in manager_rc.borrow().all_instrument_ids() {
1673 self.option_chain_instrument_index.insert(id, series_id);
1674 }
1675
1676 self.option_chain_managers.insert(series_id, manager_rc);
1677 }
1678
1679 fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
1680 let series_id = cmd.series_id;
1681
1682 let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
1683 log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
1684 return;
1685 };
1686
1687 let all_ids = manager_rc.borrow().all_instrument_ids();
1689 let venue = manager_rc.borrow().venue();
1690
1691 for id in &all_ids {
1693 self.option_chain_instrument_index.remove(id);
1694 }
1695
1696 manager_rc.borrow_mut().teardown(&self.clock);
1697
1698 self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
1700
1701 log::info!("Unsubscribed option chain for {series_id}");
1702 }
1703
1704 fn forward_option_chain_unsubscribes(
1706 &mut self,
1707 instrument_ids: &[InstrumentId],
1708 venue: Venue,
1709 client_id: Option<ClientId>,
1710 ) {
1711 let ts_init = self.clock.borrow().timestamp_ns();
1712
1713 let Some(client) = self.get_client(client_id.as_ref(), Some(&venue)) else {
1714 log::error!(
1715 "Cannot forward option chain unsubscribes: no client found for venue={venue}",
1716 );
1717 return;
1718 };
1719
1720 for instrument_id in instrument_ids {
1721 client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
1722 *instrument_id,
1723 client_id,
1724 Some(venue),
1725 UUID4::new(),
1726 ts_init,
1727 None,
1728 None,
1729 )));
1730 client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
1731 UnsubscribeOptionGreeks::new(
1732 *instrument_id,
1733 client_id,
1734 Some(venue),
1735 UUID4::new(),
1736 ts_init,
1737 None,
1738 None,
1739 ),
1740 ));
1741 client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
1742 UnsubscribeInstrumentStatus::new(
1743 *instrument_id,
1744 client_id,
1745 Some(venue),
1746 UUID4::new(),
1747 ts_init,
1748 None,
1749 None,
1750 ),
1751 ));
1752 }
1753 }
1754
1755 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId) {
1756 let Some(updater) = self.book_updaters.get(instrument_id) else {
1757 return;
1758 };
1759
1760 let has_deltas = self.book_deltas_subs.contains(instrument_id);
1761 let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1762 let has_snapshots = self.has_book_snapshot_subscriptions(instrument_id);
1763
1764 let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1765 let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1766 let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1767 let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1768
1769 if !has_deltas && !has_snapshots {
1770 msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1771 }
1772
1773 if !has_depth10 {
1774 msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1775 }
1776
1777 if !has_deltas && !has_depth10 && !has_snapshots {
1778 self.book_updaters.remove(instrument_id);
1779 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1780 }
1781 }
1782
1783 fn has_book_snapshot_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
1784 self.book_snapshot_counts
1785 .keys()
1786 .any(|(id, _)| id == instrument_id)
1787 }
1788
1789 fn increment_book_snapshot_subscription(&mut self, cmd: &SubscribeBookSnapshots) -> bool {
1790 let key = (cmd.instrument_id, cmd.interval_ms);
1791
1792 if let Some(count) = self.book_snapshot_counts.get_mut(&key) {
1793 *count += 1;
1794 return false;
1795 }
1796
1797 self.book_snapshot_counts.insert(key, 1);
1798
1799 let snapshot_infos = if let Some(snapshot_infos) = self.book_intervals.get(&cmd.interval_ms)
1800 {
1801 snapshot_infos.clone()
1802 } else {
1803 let snapshot_infos = Rc::new(RefCell::new(IndexMap::new()));
1804 self.book_intervals
1805 .insert(cmd.interval_ms, snapshot_infos.clone());
1806 self.schedule_book_snapshotter(cmd.interval_ms, snapshot_infos.clone());
1807 snapshot_infos
1808 };
1809
1810 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1811 let snap_info = BookSnapshotInfo {
1812 instrument_id: cmd.instrument_id,
1813 venue: cmd.instrument_id.venue,
1814 is_composite: cmd.instrument_id.symbol.is_composite(),
1815 root: Ustr::from(cmd.instrument_id.symbol.root()),
1816 topic,
1817 interval_ms: cmd.interval_ms,
1818 };
1819
1820 snapshot_infos
1821 .borrow_mut()
1822 .insert(cmd.instrument_id, snap_info);
1823
1824 true
1825 }
1826
1827 fn decrement_book_snapshot_subscription(
1828 &mut self,
1829 instrument_id: InstrumentId,
1830 interval_ms: NonZeroUsize,
1831 ) -> BookSnapshotUnsubscribeResult {
1832 let key = (instrument_id, interval_ms);
1833
1834 let Some(count) = self.book_snapshot_counts.get_mut(&key) else {
1835 return BookSnapshotUnsubscribeResult::NotSubscribed;
1836 };
1837
1838 if *count > 1 {
1839 *count -= 1;
1840 return BookSnapshotUnsubscribeResult::Decremented;
1841 }
1842
1843 self.book_snapshot_counts.shift_remove(&key);
1844
1845 let remove_interval = if let Some(snapshot_infos) = self.book_intervals.get(&interval_ms) {
1846 let mut snapshot_infos = snapshot_infos.borrow_mut();
1847 snapshot_infos.shift_remove(&instrument_id);
1848 snapshot_infos.is_empty()
1849 } else {
1850 false
1851 };
1852
1853 if remove_interval {
1854 self.book_intervals.remove(&interval_ms);
1855
1856 if let Some(snapshotter) = self.book_snapshotters.remove(&interval_ms) {
1857 let timer_name = snapshotter.timer_name;
1858 let mut clock = self.clock.borrow_mut();
1859 if clock.timer_exists(&timer_name) {
1860 clock.cancel_timer(&timer_name);
1861 }
1862 }
1863 }
1864
1865 BookSnapshotUnsubscribeResult::Removed
1866 }
1867
1868 fn schedule_book_snapshotter(
1869 &mut self,
1870 interval_ms: NonZeroUsize,
1871 snapshot_infos: BookSnapshotInfos,
1872 ) {
1873 let interval_ns = millis_to_nanos_unchecked(interval_ms.get() as f64);
1874 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1875 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1876
1877 let snapshotter = Rc::new(BookSnapshotter::new(
1878 interval_ms,
1879 snapshot_infos,
1880 self.cache.clone(),
1881 ));
1882 let timer_name = snapshotter.timer_name;
1883 let snapshotter_callback = snapshotter.clone();
1884 let callback_fn: Rc<dyn Fn(TimeEvent)> =
1885 Rc::new(move |event| snapshotter_callback.snapshot(event));
1886 let callback = TimeEventCallback::from(callback_fn);
1887
1888 self.clock
1889 .borrow_mut()
1890 .set_timer_ns(
1891 &timer_name,
1892 interval_ns,
1893 Some(start_time_ns.into()),
1894 None,
1895 Some(callback),
1896 None,
1897 None,
1898 )
1899 .expect(FAILED);
1900
1901 self.book_snapshotters.insert(interval_ms, snapshotter);
1902 }
1903
1904 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1907 let mut cache = self.cache.as_ref().borrow_mut();
1908 if let Err(e) = cache.add_instrument(instrument) {
1909 log_error_on_cache_insert(&e);
1910 }
1911 }
1912
1913 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1914 let mut cache = self.cache.as_ref().borrow_mut();
1916 for instrument in instruments {
1917 if let Err(e) = cache.add_instrument(instrument.clone()) {
1918 log_error_on_cache_insert(&e);
1919 }
1920 }
1921 }
1922
1923 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1924 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1925 log_error_on_cache_insert(&e);
1926 }
1927 }
1928
1929 fn handle_trades(&self, trades: &[TradeTick]) {
1930 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1931 log_error_on_cache_insert(&e);
1932 }
1933 }
1934
1935 fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1936 if let Err(e) = self
1937 .cache
1938 .as_ref()
1939 .borrow_mut()
1940 .add_funding_rates(funding_rates)
1941 {
1942 log_error_on_cache_insert(&e);
1943 }
1944 }
1945
1946 fn handle_bars(&self, bars: &[Bar]) {
1947 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1948 log_error_on_cache_insert(&e);
1949 }
1950 }
1951
1952 fn handle_book_response(&self, book: &OrderBook) {
1953 log::debug!("Adding order book {} to cache", book.instrument_id);
1954
1955 if let Err(e) = self
1956 .cache
1957 .as_ref()
1958 .borrow_mut()
1959 .add_order_book(book.clone())
1960 {
1961 log_error_on_cache_insert(&e);
1962 }
1963 }
1964
1965 fn handle_forward_prices_response(
1968 &mut self,
1969 correlation_id: &UUID4,
1970 resp: &ForwardPricesResponse,
1971 ) {
1972 let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
1973 log::debug!(
1974 "No pending option chain request for correlation_id={correlation_id}, ignoring"
1975 );
1976 return;
1977 };
1978
1979 let series_id = cmd.series_id;
1980
1981 let cache = self.cache.borrow();
1984 let mut best_price: Option<Price> = None;
1985
1986 for fp in &resp.data {
1987 if let Some(instrument) = cache.instrument(&fp.instrument_id)
1989 && let Some(expiration) = instrument.expiration_ns()
1990 && expiration == series_id.expiration_ns
1991 && instrument.settlement_currency().code == series_id.settlement_currency
1992 {
1993 match Price::from_decimal(fp.forward_price) {
1994 Ok(price) => best_price = Some(price),
1995 Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
1996 }
1997 break;
1998 }
1999 }
2000 drop(cache);
2001
2002 if let Some(price) = best_price {
2003 log::info!("Forward price for {series_id}: {price} (instant bootstrap)",);
2004 } else {
2005 log::info!(
2006 "No matching forward price found for {series_id}, will bootstrap from live data",
2007 );
2008 }
2009
2010 self.create_option_chain_manager(&cmd, best_price);
2011 }
2012
2013 fn setup_book_updater(
2016 &mut self,
2017 instrument_id: &InstrumentId,
2018 book_type: BookType,
2019 only_deltas: bool,
2020 managed: bool,
2021 ) -> anyhow::Result<()> {
2022 let mut cache = self.cache.borrow_mut();
2023 if managed && !cache.has_order_book(instrument_id) {
2024 let book = OrderBook::new(*instrument_id, book_type);
2025 log::debug!("Created {book}");
2026 cache.add_order_book(book)?;
2027 }
2028
2029 let updater = self
2031 .book_updaters
2032 .entry(*instrument_id)
2033 .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
2034 .clone();
2035
2036 let topic = switchboard::get_book_deltas_topic(*instrument_id);
2038 let deltas_handler = TypedHandler::new(updater.clone());
2039 msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
2040
2041 if !only_deltas {
2043 let topic = switchboard::get_book_depth10_topic(*instrument_id);
2044 let depth_handler = TypedHandler::new(updater);
2045 msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
2046 }
2047
2048 Ok(())
2049 }
2050
2051 fn create_bar_aggregator(
2052 &self,
2053 instrument: &InstrumentAny,
2054 bar_type: BarType,
2055 ) -> Box<dyn BarAggregator> {
2056 let cache = self.cache.clone();
2057
2058 let handler = move |bar: Bar| {
2059 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2060 log_error_on_cache_insert(&e);
2061 }
2062
2063 let topic = switchboard::get_bars_topic(bar.bar_type);
2064 msgbus::publish_bar(topic, &bar);
2065 };
2066
2067 let clock = self.clock.clone();
2068 let config = self.config.clone();
2069
2070 let price_precision = instrument.price_precision();
2071 let size_precision = instrument.size_precision();
2072
2073 if bar_type.spec().is_time_aggregated() {
2074 let time_bars_origin_offset = config
2076 .time_bars_origins
2077 .get(&bar_type.spec().aggregation)
2078 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
2079
2080 Box::new(TimeBarAggregator::new(
2081 bar_type,
2082 price_precision,
2083 size_precision,
2084 clock,
2085 handler,
2086 config.time_bars_build_with_no_updates,
2087 config.time_bars_timestamp_on_close,
2088 config.time_bars_interval_type,
2089 time_bars_origin_offset,
2090 config.time_bars_build_delay,
2091 config.time_bars_skip_first_non_full_bar,
2092 ))
2093 } else {
2094 match bar_type.spec().aggregation {
2095 BarAggregation::Tick => Box::new(TickBarAggregator::new(
2096 bar_type,
2097 price_precision,
2098 size_precision,
2099 handler,
2100 )) as Box<dyn BarAggregator>,
2101 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
2102 bar_type,
2103 price_precision,
2104 size_precision,
2105 handler,
2106 )) as Box<dyn BarAggregator>,
2107 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
2108 bar_type,
2109 price_precision,
2110 size_precision,
2111 handler,
2112 )) as Box<dyn BarAggregator>,
2113 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
2114 bar_type,
2115 price_precision,
2116 size_precision,
2117 handler,
2118 )) as Box<dyn BarAggregator>,
2119 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
2120 bar_type,
2121 price_precision,
2122 size_precision,
2123 handler,
2124 )) as Box<dyn BarAggregator>,
2125 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
2126 bar_type,
2127 price_precision,
2128 size_precision,
2129 handler,
2130 )) as Box<dyn BarAggregator>,
2131 BarAggregation::Value => Box::new(ValueBarAggregator::new(
2132 bar_type,
2133 price_precision,
2134 size_precision,
2135 handler,
2136 )) as Box<dyn BarAggregator>,
2137 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
2138 bar_type,
2139 price_precision,
2140 size_precision,
2141 handler,
2142 )) as Box<dyn BarAggregator>,
2143 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
2144 bar_type,
2145 price_precision,
2146 size_precision,
2147 handler,
2148 )) as Box<dyn BarAggregator>,
2149 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
2150 bar_type,
2151 price_precision,
2152 size_precision,
2153 instrument.price_increment(),
2154 handler,
2155 )) as Box<dyn BarAggregator>,
2156 other => unreachable!(
2157 "Unsupported internal bar aggregation dispatch for {other:?}; update `create_bar_aggregator`"
2158 ),
2159 }
2160 }
2161 }
2162
2163 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2164 let instrument = {
2166 let cache = self.cache.borrow();
2167 cache
2168 .instrument(&bar_type.instrument_id())
2169 .ok_or_else(|| {
2170 anyhow::anyhow!(
2171 "Cannot start bar aggregation: no instrument found for {}",
2172 bar_type.instrument_id(),
2173 )
2174 })?
2175 .clone()
2176 };
2177
2178 let bar_key = bar_type.standard();
2180
2181 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
2183 rc.clone()
2184 } else {
2185 let agg = self.create_bar_aggregator(&instrument, bar_type);
2186 let rc = Rc::new(RefCell::new(agg));
2187 self.bar_aggregators.insert(bar_key, rc.clone());
2188 rc
2189 };
2190
2191 let mut subscriptions = Vec::new();
2193
2194 if bar_type.is_composite() {
2195 let topic = switchboard::get_bars_topic(bar_type.composite());
2196 let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_key));
2197 msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
2198 subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
2199 } else if bar_type.spec().price_type == PriceType::Last {
2200 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
2201 let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_key));
2202 msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
2203 subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
2204 } else {
2205 if matches!(
2207 bar_type.spec().aggregation,
2208 BarAggregation::TickImbalance
2209 | BarAggregation::VolumeImbalance
2210 | BarAggregation::ValueImbalance
2211 | BarAggregation::TickRuns
2212 | BarAggregation::VolumeRuns
2213 | BarAggregation::ValueRuns
2214 ) {
2215 log::warn!(
2216 "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
2217 data with `aggressor_side`, but `price_type` is not LAST so it will receive \
2218 quote data: bars will not emit correctly",
2219 );
2220 }
2221
2222 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
2223 let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_key));
2224 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
2225 subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
2226 }
2227
2228 self.bar_aggregator_handlers.insert(bar_key, subscriptions);
2229
2230 self.setup_bar_aggregator(bar_type, false)?;
2232
2233 aggregator.borrow_mut().set_is_running(true);
2234
2235 Ok(())
2236 }
2237
2238 fn setup_bar_aggregator(&self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
2242 let bar_key = bar_type.standard();
2243 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
2244 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
2245 })?;
2246
2247 let handler: Box<dyn FnMut(Bar)> = if historical {
2249 let cache = self.cache.clone();
2251 Box::new(move |bar: Bar| {
2252 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2253 log_error_on_cache_insert(&e);
2254 }
2255 })
2257 } else {
2258 let cache = self.cache.clone();
2260 Box::new(move |bar: Bar| {
2261 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2262 log_error_on_cache_insert(&e);
2263 }
2264 let topic = switchboard::get_bars_topic(bar.bar_type);
2265 msgbus::publish_bar(topic, &bar);
2266 })
2267 };
2268
2269 aggregator
2270 .borrow_mut()
2271 .set_historical_mode(historical, handler);
2272
2273 if bar_type.spec().is_time_aggregated() {
2275 use nautilus_common::clock::TestClock;
2276
2277 if historical {
2278 let test_clock = Rc::new(RefCell::new(TestClock::new()));
2280 aggregator.borrow_mut().set_clock(test_clock);
2281 let aggregator_weak = Rc::downgrade(aggregator);
2284 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
2285 } else {
2286 aggregator.borrow_mut().set_clock(self.clock.clone());
2287 aggregator
2288 .borrow_mut()
2289 .start_timer(Some(aggregator.clone()));
2290 }
2291 }
2292
2293 Ok(())
2294 }
2295
2296 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2297 let aggregator = self
2298 .bar_aggregators
2299 .shift_remove(&bar_type.standard())
2300 .ok_or_else(|| {
2301 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
2302 })?;
2303
2304 aggregator.borrow_mut().stop();
2305
2306 let bar_key = bar_type.standard();
2308 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
2309 for sub in subs {
2310 match sub {
2311 BarAggregatorSubscription::Bar { topic, handler } => {
2312 msgbus::unsubscribe_bars(topic.into(), &handler);
2313 }
2314 BarAggregatorSubscription::Trade { topic, handler } => {
2315 msgbus::unsubscribe_trades(topic.into(), &handler);
2316 }
2317 BarAggregatorSubscription::Quote { topic, handler } => {
2318 msgbus::unsubscribe_quotes(topic.into(), &handler);
2319 }
2320 }
2321 }
2322 }
2323
2324 Ok(())
2325 }
2326}
2327
2328#[inline(always)]
2329fn log_error_on_cache_insert<T: Display>(e: &T) {
2330 log::error!("Error on cache insert: {e}");
2331}
2332
2333#[inline(always)]
2334fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
2335 if data.is_empty() {
2336 let name = type_name::<T>();
2337 let short_name = name.rsplit("::").next().unwrap_or(name);
2338 log::warn!("Received empty {short_name} response for {id} {correlation_id}");
2339 return true;
2340 }
2341 false
2342}