Skip to main content

nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod bar;
32pub mod book;
33mod commands;
34pub mod config;
35mod handlers;
36
37#[cfg(feature = "defi")]
38pub mod pool;
39
40use std::{
41    any::{Any, type_name},
42    cell::{Ref, RefCell},
43    collections::VecDeque,
44    fmt::{Debug, Display},
45    num::NonZeroUsize,
46    rc::Rc,
47};
48
49use ahash::{AHashMap, AHashSet};
50pub use bar::BarAggregatorSubscription;
51use book::{
52    BookSnapshotInfo, BookSnapshotInfos, BookSnapshotKey, BookSnapshotUnsubscribeResult,
53    BookSnapshotter, BookUpdater,
54};
55pub(crate) use commands::{DeferredCommand, DeferredCommandQueue};
56use config::DataEngineConfig;
57use futures::future::join_all;
58use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
59use indexmap::IndexMap;
60use nautilus_common::{
61    cache::Cache,
62    clock::Clock,
63    logging::{RECV, RES},
64    messages::data::{
65        DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
66        SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
67        SubscribeCommand, SubscribeOptionChain, UnsubscribeBars, UnsubscribeBookDeltas,
68        UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
69        UnsubscribeInstrumentStatus, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
70        UnsubscribeQuotes,
71    },
72    msgbus::{
73        self, ShareableMessageHandler, TypedHandler, TypedIntoHandler,
74        switchboard::{self, MessagingSwitchboard},
75    },
76    runner::get_data_cmd_sender,
77    timer::{TimeEvent, TimeEventCallback},
78};
79use nautilus_core::{
80    UUID4, WeakCell,
81    correctness::{
82        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
83    },
84    datetime::millis_to_nanos_unchecked,
85};
86#[cfg(feature = "defi")]
87use nautilus_model::defi::DefiData;
88use nautilus_model::{
89    data::{
90        Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
91        InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
92        OrderBookDepth10, QuoteTick, TradeTick,
93        option_chain::{OptionGreeks, StrikeRange},
94    },
95    enums::{
96        AggregationSource, BarAggregation, BookType, MarketStatusAction, PriceType, RecordFlag,
97    },
98    identifiers::{ClientId, InstrumentId, OptionSeriesId, Venue},
99    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
100    orderbook::OrderBook,
101    types::Price,
102};
103#[cfg(feature = "streaming")]
104use nautilus_persistence::backend::catalog::ParquetDataCatalog;
105use ustr::Ustr;
106
107#[cfg(feature = "defi")]
108#[allow(unused_imports)] // Brings DeFi impl blocks into scope
109use crate::defi::engine as _;
110#[cfg(feature = "defi")]
111use crate::engine::pool::PoolUpdater;
112use crate::{
113    aggregation::{
114        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
115        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
116        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
117        VolumeRunsBarAggregator,
118    },
119    client::DataClientAdapter,
120    option_chains::OptionChainManager,
121};
122
123/// Provides a high-performance `DataEngine` for all environments.
124#[derive(Debug)]
125pub struct DataEngine {
126    pub(crate) clock: Rc<RefCell<dyn Clock>>,
127    pub(crate) cache: Rc<RefCell<Cache>>,
128    pub(crate) external_clients: AHashSet<ClientId>,
129    clients: IndexMap<ClientId, DataClientAdapter>,
130    default_client: Option<DataClientAdapter>,
131    #[cfg(feature = "streaming")]
132    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
133    routing_map: IndexMap<Venue, ClientId>,
134    book_intervals: AHashMap<NonZeroUsize, BookSnapshotInfos>,
135    book_snapshot_counts: IndexMap<BookSnapshotKey, usize>,
136    book_deltas_subs: AHashSet<InstrumentId>,
137    book_depth10_subs: AHashSet<InstrumentId>,
138    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
139    book_snapshotters: AHashMap<NonZeroUsize, Rc<BookSnapshotter>>,
140    bar_aggregators: IndexMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
141    bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
142    option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
143    option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
144    deferred_cmd_queue: DeferredCommandQueue,
145    pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
146    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
147    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
148    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
149    command_count: u64,
150    data_count: u64,
151    request_count: u64,
152    response_count: u64,
153    pub(crate) msgbus_priority: u8,
154    pub(crate) config: DataEngineConfig,
155    #[cfg(feature = "defi")]
156    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
157    #[cfg(feature = "defi")]
158    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
159    #[cfg(feature = "defi")]
160    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
161    #[cfg(feature = "defi")]
162    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
163}
164
165impl DataEngine {
166    /// Creates a new [`DataEngine`] instance.
167    #[must_use]
168    pub fn new(
169        clock: Rc<RefCell<dyn Clock>>,
170        cache: Rc<RefCell<Cache>>,
171        config: Option<DataEngineConfig>,
172    ) -> Self {
173        let config = config.unwrap_or_default();
174
175        let external_clients: AHashSet<ClientId> = config
176            .external_clients
177            .clone()
178            .unwrap_or_default()
179            .into_iter()
180            .collect();
181
182        Self {
183            clock,
184            cache,
185            external_clients,
186            clients: IndexMap::new(),
187            default_client: None,
188            #[cfg(feature = "streaming")]
189            catalogs: AHashMap::new(),
190            routing_map: IndexMap::new(),
191            book_intervals: AHashMap::new(),
192            book_snapshot_counts: IndexMap::new(),
193            book_deltas_subs: AHashSet::new(),
194            book_depth10_subs: AHashSet::new(),
195            book_updaters: AHashMap::new(),
196            book_snapshotters: AHashMap::new(),
197            bar_aggregators: IndexMap::new(),
198            bar_aggregator_handlers: AHashMap::new(),
199            option_chain_managers: AHashMap::new(),
200            option_chain_instrument_index: AHashMap::new(),
201            deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
202            pending_option_chain_requests: AHashMap::new(),
203            _synthetic_quote_feeds: AHashMap::new(),
204            _synthetic_trade_feeds: AHashMap::new(),
205            buffered_deltas_map: AHashMap::new(),
206            command_count: 0,
207            data_count: 0,
208            request_count: 0,
209            response_count: 0,
210            msgbus_priority: 10, // High-priority for built-in component
211            config,
212            #[cfg(feature = "defi")]
213            pool_updaters: AHashMap::new(),
214            #[cfg(feature = "defi")]
215            pool_updaters_pending: AHashSet::new(),
216            #[cfg(feature = "defi")]
217            pool_snapshot_pending: AHashSet::new(),
218            #[cfg(feature = "defi")]
219            pool_event_buffers: AHashMap::new(),
220        }
221    }
222
223    /// Registers all message bus handlers for the data engine.
224    pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
225        let weak = WeakCell::from(Rc::downgrade(engine));
226
227        let weak1 = weak.clone();
228        msgbus::register_data_command_endpoint(
229            MessagingSwitchboard::data_engine_execute(),
230            TypedIntoHandler::from(move |cmd: DataCommand| {
231                if let Some(rc) = weak1.upgrade() {
232                    rc.borrow_mut().execute(cmd);
233                }
234            }),
235        );
236
237        msgbus::register_data_command_endpoint(
238            MessagingSwitchboard::data_engine_queue_execute(),
239            TypedIntoHandler::from(move |cmd: DataCommand| {
240                get_data_cmd_sender().clone().execute(cmd);
241            }),
242        );
243
244        // Register process handler (polymorphic - uses Any)
245        let weak2 = weak.clone();
246        msgbus::register_any(
247            MessagingSwitchboard::data_engine_process(),
248            ShareableMessageHandler::from_any(move |data: &dyn Any| {
249                if let Some(rc) = weak2.upgrade() {
250                    rc.borrow_mut().process(data);
251                }
252            }),
253        );
254
255        // Register process_data handler (typed - takes ownership)
256        let weak3 = weak.clone();
257        msgbus::register_data_endpoint(
258            MessagingSwitchboard::data_engine_process_data(),
259            TypedIntoHandler::from(move |data: Data| {
260                if let Some(rc) = weak3.upgrade() {
261                    rc.borrow_mut().process_data(data);
262                }
263            }),
264        );
265
266        // Register process_defi_data handler (typed - takes ownership)
267        #[cfg(feature = "defi")]
268        {
269            let weak4 = weak.clone();
270            msgbus::register_defi_data_endpoint(
271                MessagingSwitchboard::data_engine_process_defi_data(),
272                TypedIntoHandler::from(move |data: DefiData| {
273                    if let Some(rc) = weak4.upgrade() {
274                        rc.borrow_mut().process_defi_data(data);
275                    }
276                }),
277            );
278        }
279
280        let weak5 = weak;
281        msgbus::register_data_response_endpoint(
282            MessagingSwitchboard::data_engine_response(),
283            TypedIntoHandler::from(move |resp: DataResponse| {
284                if let Some(rc) = weak5.upgrade() {
285                    rc.borrow_mut().response(resp);
286                }
287            }),
288        );
289    }
290
291    /// Returns the total count of data commands received by the engine.
292    #[must_use]
293    pub const fn command_count(&self) -> u64 {
294        self.command_count
295    }
296
297    /// Returns the total count of data stream objects received by the engine.
298    #[must_use]
299    pub const fn data_count(&self) -> u64 {
300        self.data_count
301    }
302
303    #[cfg(feature = "defi")]
304    pub(crate) const fn increment_data_count(&mut self) {
305        self.data_count += 1;
306    }
307
308    /// Returns the total count of data requests received by the engine.
309    #[must_use]
310    pub const fn request_count(&self) -> u64 {
311        self.request_count
312    }
313
314    /// Returns the total count of data responses received by the engine.
315    #[must_use]
316    pub const fn response_count(&self) -> u64 {
317        self.response_count
318    }
319
320    /// Returns whether an `OptionChainManager` exists for the given series.
321    #[must_use]
322    pub fn has_option_chain_manager(&self, series_id: &OptionSeriesId) -> bool {
323        self.option_chain_managers.contains_key(series_id)
324    }
325
326    /// Returns the count of pending option-chain bootstrap requests.
327    #[must_use]
328    pub fn pending_option_chain_request_count(&self) -> usize {
329        self.pending_option_chain_requests.len()
330    }
331
332    /// Returns a read-only reference to the engines clock.
333    #[must_use]
334    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
335        self.clock.borrow()
336    }
337
338    /// Returns a read-only reference to the engines cache.
339    #[must_use]
340    pub fn get_cache(&self) -> Ref<'_, Cache> {
341        self.cache.borrow()
342    }
343
344    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
345    #[must_use]
346    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
347        Rc::clone(&self.cache)
348    }
349
350    /// Registers the `catalog` with the engine with an optional specific `name`.
351    ///
352    /// # Panics
353    ///
354    /// Panics if a catalog with the same `name` has already been registered.
355    #[cfg(feature = "streaming")]
356    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
357        let name = Ustr::from(name.unwrap_or("catalog_0"));
358
359        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
360
361        self.catalogs.insert(name, catalog);
362        log::info!("Registered catalog <{name}>");
363    }
364
365    /// Registers the `client` with the engine with an optional venue `routing`.
366    ///
367    ///
368    /// # Panics
369    ///
370    /// Panics if a client with the same client ID has already been registered.
371    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
372        let client_id = client.client_id();
373
374        if let Some(default_client) = &self.default_client {
375            check_predicate_false(
376                default_client.client_id() == client.client_id(),
377                "client_id already registered as default client",
378            )
379            .expect(FAILED);
380        }
381
382        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
383
384        if let Some(routing) = routing {
385            self.routing_map.insert(routing, client_id);
386            log::debug!("Set client {client_id} routing for {routing}");
387        }
388
389        if client.venue.is_none() && self.default_client.is_none() {
390            self.default_client = Some(client);
391            log::debug!("Registered client {client_id} for default routing");
392        } else {
393            self.clients.insert(client_id, client);
394            log::debug!("Registered client {client_id}");
395        }
396    }
397
398    /// Deregisters the client for the `client_id`.
399    ///
400    /// # Panics
401    ///
402    /// Panics if the client ID has not been registered.
403    pub fn deregister_client(&mut self, client_id: &ClientId) {
404        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
405
406        self.clients.shift_remove(client_id);
407        log::info!("Deregistered client {client_id}");
408    }
409
410    /// Registers the data `client` with the engine as the default routing client.
411    ///
412    /// When a specific venue routing cannot be found, this client will receive messages.
413    ///
414    /// # Warnings
415    ///
416    /// Any existing default routing client will be overwritten.
417    ///
418    /// # Panics
419    ///
420    /// Panics if a default client has already been registered.
421    pub fn register_default_client(&mut self, client: DataClientAdapter) {
422        check_predicate_true(
423            self.default_client.is_none(),
424            "default client already registered",
425        )
426        .expect(FAILED);
427
428        let client_id = client.client_id();
429
430        self.default_client = Some(client);
431        log::debug!("Registered default client {client_id}");
432    }
433
434    /// Starts all registered data clients and re-arms bar aggregator timers.
435    pub fn start(&mut self) {
436        for client in self.get_clients_mut() {
437            if let Err(e) = client.start() {
438                log::error!("{e}");
439            }
440        }
441
442        for aggregator in self.bar_aggregators.values() {
443            if aggregator.borrow().bar_type().spec().is_time_aggregated() {
444                aggregator
445                    .borrow_mut()
446                    .start_timer(Some(aggregator.clone()));
447            }
448        }
449    }
450
451    /// Stops all registered data clients and bar aggregator timers.
452    pub fn stop(&mut self) {
453        for client in self.get_clients_mut() {
454            if let Err(e) = client.stop() {
455                log::error!("{e}");
456            }
457        }
458
459        for aggregator in self.bar_aggregators.values() {
460            aggregator.borrow_mut().stop();
461        }
462    }
463
464    /// Resets all registered data clients and clears bar aggregator state.
465    pub fn reset(&mut self) {
466        for client in self.get_clients_mut() {
467            if let Err(e) = client.reset() {
468                log::error!("{e}");
469            }
470        }
471
472        let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
473        for bar_type in bar_types {
474            if let Err(e) = self.stop_bar_aggregator(bar_type) {
475                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
476            }
477        }
478
479        self.command_count = 0;
480        self.data_count = 0;
481        self.request_count = 0;
482        self.response_count = 0;
483    }
484
485    /// Disposes the engine, stopping all clients and canceling any timers.
486    pub fn dispose(&mut self) {
487        for client in self.get_clients_mut() {
488            if let Err(e) = client.dispose() {
489                log::error!("{e}");
490            }
491        }
492
493        self.clock.borrow_mut().cancel_timers();
494    }
495
496    /// Connects all registered data clients concurrently.
497    ///
498    /// Connection failures are logged but do not prevent the node from running.
499    pub async fn connect(&mut self) {
500        let futures: Vec<_> = self
501            .get_clients_mut()
502            .into_iter()
503            .map(|client| client.connect())
504            .collect();
505
506        let results = join_all(futures).await;
507
508        for error in results.into_iter().filter_map(Result::err) {
509            log::error!("Failed to connect data client: {error}");
510        }
511    }
512
513    /// Disconnects all registered data clients concurrently.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if any client fails to disconnect.
518    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
519        let futures: Vec<_> = self
520            .get_clients_mut()
521            .into_iter()
522            .map(|client| client.disconnect())
523            .collect();
524
525        let results = join_all(futures).await;
526        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
527
528        if errors.is_empty() {
529            Ok(())
530        } else {
531            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
532            anyhow::bail!(
533                "Failed to disconnect data clients: {}",
534                error_msgs.join("; ")
535            )
536        }
537    }
538
539    /// Returns `true` if all registered data clients are currently connected.
540    #[must_use]
541    pub fn check_connected(&self) -> bool {
542        self.get_clients()
543            .iter()
544            .all(|client| client.is_connected())
545    }
546
547    /// Returns `true` if all registered data clients are currently disconnected.
548    #[must_use]
549    pub fn check_disconnected(&self) -> bool {
550        self.get_clients()
551            .iter()
552            .all(|client| !client.is_connected())
553    }
554
555    /// Returns connection status for each registered client.
556    #[must_use]
557    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
558        self.get_clients()
559            .into_iter()
560            .map(|client| (client.client_id(), client.is_connected()))
561            .collect()
562    }
563
564    /// Returns a list of all registered client IDs, including the default client if set.
565    #[must_use]
566    pub fn registered_clients(&self) -> Vec<ClientId> {
567        self.get_clients()
568            .into_iter()
569            .map(|client| client.client_id())
570            .collect()
571    }
572
573    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
574
575    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
576    where
577        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
578        T: Clone,
579    {
580        self.get_clients()
581            .into_iter()
582            .flat_map(get_subs)
583            .cloned()
584            .collect()
585    }
586
587    #[must_use]
588    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
589        let (default_opt, clients_map) = (&self.default_client, &self.clients);
590        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
591
592        if let Some(default) = default_opt {
593            clients.push(default);
594        }
595
596        clients
597    }
598
599    #[must_use]
600    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
601        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
602        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
603
604        if let Some(default) = default_opt {
605            clients.push(default);
606        }
607
608        clients
609    }
610
611    pub fn get_client(
612        &mut self,
613        client_id: Option<&ClientId>,
614        venue: Option<&Venue>,
615    ) -> Option<&mut DataClientAdapter> {
616        if let Some(client_id) = client_id {
617            // Explicit ID: first look in registered clients
618            if let Some(client) = self.clients.get_mut(client_id) {
619                return Some(client);
620            }
621
622            // Then check if it matches the default client
623            if let Some(default) = self.default_client.as_mut()
624                && default.client_id() == *client_id
625            {
626                return Some(default);
627            }
628
629            // Unknown explicit client
630            return None;
631        }
632
633        if let Some(v) = venue {
634            // Route by venue if mapped client still registered
635            if let Some(client_id) = self.routing_map.get(v) {
636                return self.clients.get_mut(client_id);
637            }
638        }
639
640        // Fallback to default client
641        self.get_default_client()
642    }
643
644    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
645        self.default_client.as_mut()
646    }
647
648    /// Returns all custom data types currently subscribed across all clients.
649    #[must_use]
650    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
651        self.collect_subscriptions(|client| &client.subscriptions_custom)
652    }
653
654    /// Returns all instrument IDs currently subscribed across all clients.
655    #[must_use]
656    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
657        self.collect_subscriptions(|client| &client.subscriptions_instrument)
658    }
659
660    /// Returns all instrument IDs for which book delta subscriptions exist.
661    #[must_use]
662    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
663        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
664    }
665
666    /// Returns all instrument IDs for which book depth10 subscriptions exist.
667    #[must_use]
668    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
669        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
670    }
671
672    /// Returns all instrument IDs for which book snapshot subscriptions exist.
673    #[must_use]
674    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
675        self.book_snapshot_counts
676            .keys()
677            .map(|(instrument_id, _)| *instrument_id)
678            .collect()
679    }
680
681    /// Returns all instrument IDs for which quote subscriptions exist.
682    #[must_use]
683    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
684        self.collect_subscriptions(|client| &client.subscriptions_quotes)
685    }
686
687    /// Returns all instrument IDs for which trade subscriptions exist.
688    #[must_use]
689    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
690        self.collect_subscriptions(|client| &client.subscriptions_trades)
691    }
692
693    /// Returns all bar types currently subscribed across all clients.
694    #[must_use]
695    pub fn subscribed_bars(&self) -> Vec<BarType> {
696        self.collect_subscriptions(|client| &client.subscriptions_bars)
697    }
698
699    /// Returns all instrument IDs for which mark price subscriptions exist.
700    #[must_use]
701    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
702        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
703    }
704
705    /// Returns all instrument IDs for which index price subscriptions exist.
706    #[must_use]
707    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
708        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
709    }
710
711    /// Returns all instrument IDs for which funding rate subscriptions exist.
712    #[must_use]
713    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
714        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
715    }
716
717    /// Returns all instrument IDs for which status subscriptions exist.
718    #[must_use]
719    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
720        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
721    }
722
723    /// Returns all instrument IDs for which instrument close subscriptions exist.
724    #[must_use]
725    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
726        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
727    }
728
729    // -- COMMANDS --------------------------------------------------------------------------------
730
731    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
732    ///
733    /// Errors during execution are logged.
734    pub fn execute(&mut self, cmd: DataCommand) {
735        match &cmd {
736            DataCommand::Subscribe(_) | DataCommand::Unsubscribe(_) => self.command_count += 1,
737            DataCommand::Request(_) => self.request_count += 1,
738            #[cfg(feature = "defi")]
739            DataCommand::DefiRequest(_) => self.request_count += 1,
740            #[cfg(feature = "defi")]
741            DataCommand::DefiSubscribe(_) | DataCommand::DefiUnsubscribe(_) => {
742                self.command_count += 1;
743            }
744            _ => {}
745        }
746
747        if let Err(e) = match cmd {
748            DataCommand::Subscribe(c) => self.execute_subscribe(c),
749            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
750            DataCommand::Request(c) => self.execute_request(c),
751            #[cfg(feature = "defi")]
752            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
753            #[cfg(feature = "defi")]
754            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
755            #[cfg(feature = "defi")]
756            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
757            _ => {
758                log::warn!("Unhandled DataCommand variant");
759                Ok(())
760            }
761        } {
762            log::error!("{e}");
763        }
764    }
765
766    /// Handles a subscribe command, updating internal state and forwarding to the client.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
771    /// or if the underlying client operation fails.
772    pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
773        // Update internal engine state
774        match &cmd {
775            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
776            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
777            SubscribeCommand::BookSnapshots(cmd) => {
778                // Handles client forwarding internally (forwards as BookDeltas)
779                return self.subscribe_book_snapshots(cmd);
780            }
781            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
782            SubscribeCommand::OptionChain(cmd) => {
783                self.subscribe_option_chain(cmd);
784                return Ok(());
785            }
786            SubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
787                anyhow::bail!("Cannot subscribe for synthetic instrument `Instrument` data");
788            }
789            SubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
790                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentStatus` data");
791            }
792            SubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
793                anyhow::bail!("Cannot subscribe for synthetic instrument `InstrumentClose` data");
794            }
795            SubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
796                anyhow::bail!("Cannot subscribe for synthetic instrument `OptionGreeks` data");
797            }
798            _ => {} // Do nothing else
799        }
800
801        if let Some(client_id) = cmd.client_id()
802            && self.external_clients.contains(client_id)
803        {
804            if self.config.debug {
805                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
806            }
807            return Ok(());
808        }
809
810        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
811            client.execute_subscribe(cmd);
812        } else {
813            log::error!(
814                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
815                cmd.client_id(),
816                cmd.venue(),
817            );
818        }
819
820        Ok(())
821    }
822
823    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the underlying client operation fails.
828    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
829        match &cmd {
830            UnsubscribeCommand::BookDeltas(cmd) if !self.unsubscribe_book_deltas(cmd) => {
831                return Ok(());
832            }
833            UnsubscribeCommand::BookDepth10(cmd) if !self.unsubscribe_book_depth10(cmd) => {
834                return Ok(());
835            }
836            UnsubscribeCommand::BookSnapshots(cmd) => {
837                // Handles client forwarding internally (forwards as BookDeltas)
838                self.unsubscribe_book_snapshots(cmd);
839                return Ok(());
840            }
841            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
842            UnsubscribeCommand::OptionChain(cmd) => {
843                self.unsubscribe_option_chain(cmd);
844                return Ok(());
845            }
846            UnsubscribeCommand::Instrument(cmd) if cmd.instrument_id.is_synthetic() => {
847                anyhow::bail!("Cannot unsubscribe from synthetic instrument `Instrument` data");
848            }
849            UnsubscribeCommand::InstrumentStatus(cmd) if cmd.instrument_id.is_synthetic() => {
850                anyhow::bail!(
851                    "Cannot unsubscribe from synthetic instrument `InstrumentStatus` data"
852                );
853            }
854            UnsubscribeCommand::InstrumentClose(cmd) if cmd.instrument_id.is_synthetic() => {
855                anyhow::bail!(
856                    "Cannot unsubscribe from synthetic instrument `InstrumentClose` data"
857                );
858            }
859            UnsubscribeCommand::OptionGreeks(cmd) if cmd.instrument_id.is_synthetic() => {
860                anyhow::bail!("Cannot unsubscribe from synthetic instrument `OptionGreeks` data");
861            }
862            _ => {} // Do nothing else
863        }
864
865        if let Some(client_id) = cmd.client_id()
866            && self.external_clients.contains(client_id)
867        {
868            if self.config.debug {
869                log::debug!(
870                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
871                );
872            }
873            return Ok(());
874        }
875
876        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
877            client.execute_unsubscribe(cmd);
878        } else {
879            log::error!(
880                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
881                cmd.client_id(),
882                cmd.venue(),
883            );
884        }
885
886        Ok(())
887    }
888
889    /// Sends a [`RequestCommand`] to a suitable data client implementation.
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if no client is found for the given client ID or venue,
894    /// or if the client fails to process the request.
895    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
896        // Skip requests for external clients
897        if let Some(cid) = req.client_id()
898            && self.external_clients.contains(cid)
899        {
900            if self.config.debug {
901                log::debug!("Skipping data request for external client {cid}: {req:?}");
902            }
903            return Ok(());
904        }
905
906        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
907            match req {
908                RequestCommand::Data(req) => client.request_data(req),
909                RequestCommand::Instrument(req) => client.request_instrument(req),
910                RequestCommand::Instruments(req) => client.request_instruments(req),
911                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
912                RequestCommand::BookDepth(req) => client.request_book_depth(req),
913                RequestCommand::Quotes(req) => client.request_quotes(req),
914                RequestCommand::Trades(req) => client.request_trades(req),
915                RequestCommand::FundingRates(req) => client.request_funding_rates(req),
916                RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
917                RequestCommand::Bars(req) => client.request_bars(req),
918            }
919        } else {
920            anyhow::bail!(
921                "Cannot handle request: no client found for {:?} {:?}",
922                req.client_id(),
923                req.venue()
924            );
925        }
926    }
927
928    /// Processes a dynamically-typed data message.
929    ///
930    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
931    pub fn process(&mut self, data: &dyn Any) {
932        self.data_count += 1;
933        // TODO: Eventually these can be added to the `Data` enum (C/Cython blocking), process here for now
934        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
935            self.handle_instrument(instrument);
936        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
937            self.handle_funding_rate(*funding_rate);
938        } else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
939            self.handle_instrument_status(*status);
940        } else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
941            self.cache.borrow_mut().add_option_greeks(*option_greeks);
942            let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
943            msgbus::publish_option_greeks(topic, option_greeks);
944            self.drain_deferred_commands();
945        } else {
946            log::error!("Cannot process data {data:?}, type is unrecognized");
947        }
948
949        // TODO: Add custom data handling here
950    }
951
952    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
953    pub fn process_data(&mut self, data: Data) {
954        self.data_count += 1;
955
956        match data {
957            Data::Delta(delta) => self.handle_delta(delta),
958            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
959            Data::Depth10(depth) => self.handle_depth10(*depth),
960            Data::Quote(quote) => {
961                self.handle_quote(quote);
962                self.drain_deferred_commands();
963            }
964            Data::Trade(trade) => self.handle_trade(trade),
965            Data::Bar(bar) => self.handle_bar(bar),
966            Data::MarkPriceUpdate(mark_price) => {
967                self.handle_mark_price(mark_price);
968                self.drain_deferred_commands();
969            }
970            Data::IndexPriceUpdate(index_price) => {
971                self.handle_index_price(index_price);
972                self.drain_deferred_commands();
973            }
974            Data::InstrumentStatus(status) => {
975                self.handle_instrument_status(status);
976                self.drain_deferred_commands();
977            }
978            Data::InstrumentClose(close) => self.handle_instrument_close(close),
979            Data::Custom(custom) => self.handle_custom_data(&custom),
980        }
981    }
982
983    /// Processes a `DataResponse`, handling and publishing the response message.
984    #[expect(clippy::needless_pass_by_value)] // Required by message bus dispatch
985    pub fn response(&mut self, resp: DataResponse) {
986        log::debug!("{RECV}{RES} {resp:?}");
987
988        self.response_count += 1;
989        let correlation_id = *resp.correlation_id();
990
991        match &resp {
992            DataResponse::Instrument(r) => {
993                self.handle_instrument_response(r.data.clone());
994            }
995            DataResponse::Instruments(r) => {
996                self.handle_instruments(&r.data);
997            }
998            DataResponse::Quotes(r) => {
999                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1000                    self.handle_quotes(&r.data);
1001                }
1002            }
1003            DataResponse::Trades(r) => {
1004                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1005                    self.handle_trades(&r.data);
1006                }
1007            }
1008            DataResponse::FundingRates(r) => {
1009                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
1010                    self.handle_funding_rates(&r.data);
1011                }
1012            }
1013            DataResponse::Bars(r) => {
1014                if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
1015                    self.handle_bars(&r.data);
1016                }
1017            }
1018            DataResponse::Book(r) => self.handle_book_response(&r.data),
1019            DataResponse::ForwardPrices(r) => {
1020                return self.handle_forward_prices_response(&correlation_id, r);
1021            }
1022            _ => todo!("Handle other response types"),
1023        }
1024
1025        msgbus::send_response(&correlation_id, &resp);
1026    }
1027
1028    // -- DATA HANDLERS ---------------------------------------------------------------------------
1029
1030    fn handle_instrument(&mut self, instrument: &InstrumentAny) {
1031        log::debug!("Handling instrument: {}", instrument.id());
1032
1033        if let Err(e) = self
1034            .cache
1035            .as_ref()
1036            .borrow_mut()
1037            .add_instrument(instrument.clone())
1038        {
1039            log_error_on_cache_insert(&e);
1040        }
1041
1042        let topic = switchboard::get_instrument_topic(instrument.id());
1043        log::debug!("Publishing instrument to topic: {topic}");
1044        msgbus::publish_any(topic, instrument);
1045
1046        self.update_option_chains(instrument);
1047    }
1048
1049    fn update_option_chains(&mut self, instrument: &InstrumentAny) {
1050        let Some(underlying) = instrument.underlying() else {
1051            return;
1052        };
1053        let Some(expiration_ns) = instrument.expiration_ns() else {
1054            return;
1055        };
1056        let Some(strike) = instrument.strike_price() else {
1057            return;
1058        };
1059        let Some(kind) = instrument.option_kind() else {
1060            return;
1061        };
1062
1063        let venue = instrument.id().venue;
1064        let settlement = instrument.settlement_currency().code;
1065        let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
1066
1067        // Clone Rc to release borrow on self.option_chain_managers before accessing self.clients
1068        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1069            return;
1070        };
1071
1072        let clock = self.clock.clone();
1073        let client = self.get_client(None, Some(&venue));
1074
1075        if manager_rc
1076            .borrow_mut()
1077            .add_instrument(instrument.id(), strike, kind, client, &clock)
1078        {
1079            self.option_chain_instrument_index
1080                .insert(instrument.id(), series_id);
1081        }
1082    }
1083
1084    fn handle_delta(&mut self, delta: OrderBookDelta) {
1085        let deltas = if self.config.buffer_deltas {
1086            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
1087                buffered_deltas.deltas.push(delta);
1088                buffered_deltas.flags = delta.flags;
1089                buffered_deltas.sequence = delta.sequence;
1090                buffered_deltas.ts_event = delta.ts_event;
1091                buffered_deltas.ts_init = delta.ts_init;
1092            } else {
1093                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
1094                self.buffered_deltas_map
1095                    .insert(delta.instrument_id, buffered_deltas);
1096            }
1097
1098            if !RecordFlag::F_LAST.matches(delta.flags) {
1099                return; // Not the last delta for event
1100            }
1101
1102            self.buffered_deltas_map
1103                .remove(&delta.instrument_id)
1104                .expect("buffered deltas exist")
1105        } else {
1106            OrderBookDeltas::new(delta.instrument_id, vec![delta])
1107        };
1108
1109        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1110        msgbus::publish_deltas(topic, &deltas);
1111    }
1112
1113    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
1114        if self.config.buffer_deltas {
1115            let instrument_id = deltas.instrument_id;
1116
1117            for delta in deltas.deltas {
1118                if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
1119                    buffered_deltas.deltas.push(delta);
1120                    buffered_deltas.flags = delta.flags;
1121                    buffered_deltas.sequence = delta.sequence;
1122                    buffered_deltas.ts_event = delta.ts_event;
1123                    buffered_deltas.ts_init = delta.ts_init;
1124                } else {
1125                    let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1126                    self.buffered_deltas_map
1127                        .insert(instrument_id, buffered_deltas);
1128                }
1129
1130                if RecordFlag::F_LAST.matches(delta.flags) {
1131                    let deltas_to_publish = self
1132                        .buffered_deltas_map
1133                        .remove(&instrument_id)
1134                        .expect("buffered deltas exist");
1135                    let topic = switchboard::get_book_deltas_topic(instrument_id);
1136                    msgbus::publish_deltas(topic, &deltas_to_publish);
1137                }
1138            }
1139        } else {
1140            let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
1141            msgbus::publish_deltas(topic, &deltas);
1142        }
1143    }
1144
1145    fn handle_depth10(&self, depth: OrderBookDepth10) {
1146        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
1147        msgbus::publish_depth10(topic, &depth);
1148    }
1149
1150    fn handle_quote(&self, quote: QuoteTick) {
1151        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
1152            log_error_on_cache_insert(&e);
1153        }
1154
1155        // TODO: Handle synthetics
1156
1157        let topic = switchboard::get_quotes_topic(quote.instrument_id);
1158        msgbus::publish_quote(topic, &quote);
1159    }
1160
1161    fn handle_trade(&self, trade: TradeTick) {
1162        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1163            log_error_on_cache_insert(&e);
1164        }
1165
1166        // TODO: Handle synthetics
1167
1168        let topic = switchboard::get_trades_topic(trade.instrument_id);
1169        msgbus::publish_trade(topic, &trade);
1170    }
1171
1172    fn handle_bar(&self, bar: Bar) {
1173        // TODO: Handle additional bar logic
1174        if self.config.validate_data_sequence
1175            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1176        {
1177            if bar.ts_event < last_bar.ts_event {
1178                log::warn!(
1179                    "Bar {bar} was prior to last bar `ts_event` {}",
1180                    last_bar.ts_event
1181                );
1182                return; // Bar is out of sequence
1183            }
1184
1185            if bar.ts_init < last_bar.ts_init {
1186                log::warn!(
1187                    "Bar {bar} was prior to last bar `ts_init` {}",
1188                    last_bar.ts_init
1189                );
1190                return; // Bar is out of sequence
1191            }
1192            // TODO: Implement `bar.is_revision` logic
1193        }
1194
1195        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1196            log_error_on_cache_insert(&e);
1197        }
1198
1199        let topic = switchboard::get_bars_topic(bar.bar_type);
1200        msgbus::publish_bar(topic, &bar);
1201    }
1202
1203    fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
1204        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1205            log_error_on_cache_insert(&e);
1206        }
1207
1208        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1209        msgbus::publish_mark_price(topic, &mark_price);
1210    }
1211
1212    fn handle_index_price(&self, index_price: IndexPriceUpdate) {
1213        if let Err(e) = self
1214            .cache
1215            .as_ref()
1216            .borrow_mut()
1217            .add_index_price(index_price)
1218        {
1219            log_error_on_cache_insert(&e);
1220        }
1221
1222        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1223        msgbus::publish_index_price(topic, &index_price);
1224    }
1225
1226    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
1227    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1228        if let Err(e) = self
1229            .cache
1230            .as_ref()
1231            .borrow_mut()
1232            .add_funding_rate(funding_rate)
1233        {
1234            log_error_on_cache_insert(&e);
1235        }
1236
1237        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1238        msgbus::publish_funding_rate(topic, &funding_rate);
1239    }
1240
1241    fn handle_instrument_status(&mut self, status: InstrumentStatus) {
1242        if let Err(e) = self
1243            .cache
1244            .as_ref()
1245            .borrow_mut()
1246            .add_instrument_status(status)
1247        {
1248            log_error_on_cache_insert(&e);
1249        }
1250
1251        let topic = switchboard::get_instrument_status_topic(status.instrument_id);
1252        msgbus::publish_any(topic, &status);
1253
1254        // Check if this instrument belongs to an option chain before expiring
1255        if self
1256            .option_chain_instrument_index
1257            .contains_key(&status.instrument_id)
1258            && matches!(
1259                status.action,
1260                MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
1261            )
1262        {
1263            self.expire_option_chain_instrument(status.instrument_id);
1264        }
1265    }
1266
1267    /// Removes a settled/expired instrument from its option chain manager.
1268    ///
1269    /// Looks up the owning series via the reverse index, delegates removal to
1270    /// the manager (which unregisters msgbus handlers and pushes deferred wire
1271    /// unsubscribes), then drains those commands. When the series catalog
1272    /// becomes empty, the entire manager is torn down.
1273    fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
1274        let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
1275            return;
1276        };
1277
1278        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1279            return;
1280        };
1281
1282        let series_empty = manager_rc
1283            .borrow_mut()
1284            .handle_instrument_expired(&instrument_id);
1285
1286        // Drain deferred unsubscribe commands pushed by the manager
1287        self.drain_deferred_commands();
1288
1289        log::info!(
1290            "Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
1291        );
1292
1293        if series_empty {
1294            manager_rc.borrow_mut().teardown(&self.clock);
1295            self.option_chain_managers.remove(&series_id);
1296
1297            log::info!("Torn down empty option chain manager for {series_id}");
1298        }
1299    }
1300
1301    fn handle_instrument_close(&self, close: InstrumentClose) {
1302        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1303        msgbus::publish_any(topic, &close);
1304    }
1305
1306    fn handle_custom_data(&self, custom: &CustomData) {
1307        log::debug!("Processing custom data: {}", custom.data.type_name());
1308        let topic = switchboard::get_custom_topic(&custom.data_type);
1309        msgbus::publish_any(topic, custom);
1310    }
1311
1312    /// Drains deferred subscribe/unsubscribe commands pushed by option chain
1313    /// managers (or any other component) and executes them against the appropriate
1314    /// data client.
1315    fn drain_deferred_commands(&mut self) {
1316        // Loop because expire_series pushes Unsubscribe commands; converges in <= 3 iterations
1317        loop {
1318            let commands: VecDeque<DeferredCommand> =
1319                std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
1320
1321            if commands.is_empty() {
1322                break;
1323            }
1324
1325            for cmd in commands {
1326                match cmd {
1327                    DeferredCommand::Subscribe(sub) => {
1328                        let client = self.get_client(sub.client_id(), sub.venue());
1329                        if let Some(client) = client {
1330                            client.execute_subscribe(sub);
1331                        }
1332                    }
1333                    DeferredCommand::Unsubscribe(unsub) => {
1334                        let client = self.get_client(unsub.client_id(), unsub.venue());
1335                        if let Some(client) = client {
1336                            client.execute_unsubscribe(&unsub);
1337                        }
1338                    }
1339                    DeferredCommand::ExpireSeries(series_id) => {
1340                        self.expire_series(series_id);
1341                    }
1342                }
1343            }
1344        }
1345    }
1346
1347    /// Proactively expires all instruments for a series and tears down the manager.
1348    ///
1349    /// `handle_instrument_expired` removes each instrument from the aggregator and pushes
1350    /// deferred unsubscribe commands. `teardown` then cancels the snapshot timer and clears
1351    /// the handler lists (the aggregator is already empty at that point).
1352    fn expire_series(&mut self, series_id: OptionSeriesId) {
1353        let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
1354            return;
1355        };
1356
1357        let instrument_ids: Vec<InstrumentId> = self
1358            .option_chain_instrument_index
1359            .iter()
1360            .filter(|(_, sid)| **sid == series_id)
1361            .map(|(id, _)| *id)
1362            .collect();
1363
1364        for id in &instrument_ids {
1365            self.option_chain_instrument_index.remove(id);
1366            manager_rc.borrow_mut().handle_instrument_expired(id);
1367        }
1368
1369        manager_rc.borrow_mut().teardown(&self.clock);
1370        self.option_chain_managers.remove(&series_id);
1371
1372        log::info!("Proactively torn down expired option chain {series_id}");
1373    }
1374
1375    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
1376
1377    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1378        if cmd.instrument_id.is_synthetic() {
1379            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1380        }
1381
1382        self.book_deltas_subs.insert(cmd.instrument_id);
1383        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1384
1385        Ok(())
1386    }
1387
1388    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1389        if cmd.instrument_id.is_synthetic() {
1390            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1391        }
1392
1393        self.book_depth10_subs.insert(cmd.instrument_id);
1394        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1395
1396        Ok(())
1397    }
1398
1399    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1400        if cmd.instrument_id.is_synthetic() {
1401            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1402        }
1403
1404        let had_snapshots = self.has_book_snapshot_subscriptions(&cmd.instrument_id);
1405        let inserted = self.increment_book_snapshot_subscription(cmd);
1406
1407        if inserted && !had_snapshots && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1408            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1409        }
1410
1411        if had_snapshots || self.book_deltas_subs.contains(&cmd.instrument_id) {
1412            return Ok(());
1413        }
1414
1415        if let Some(client_id) = cmd.client_id.as_ref()
1416            && self.external_clients.contains(client_id)
1417        {
1418            if self.config.debug {
1419                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1420            }
1421            return Ok(());
1422        }
1423
1424        log::debug!(
1425            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1426            cmd.instrument_id,
1427            cmd.client_id,
1428            cmd.venue,
1429        );
1430
1431        if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1432            let deltas_cmd = SubscribeBookDeltas::new(
1433                cmd.instrument_id,
1434                cmd.book_type,
1435                cmd.client_id,
1436                cmd.venue,
1437                UUID4::new(),
1438                cmd.ts_init,
1439                cmd.depth,
1440                true, // managed
1441                Some(cmd.command_id),
1442                cmd.params.clone(),
1443            );
1444            log::debug!(
1445                "Calling client.execute_subscribe for BookDeltas: {}",
1446                cmd.instrument_id
1447            );
1448            client.execute_subscribe(SubscribeCommand::BookDeltas(deltas_cmd));
1449        } else {
1450            log::error!(
1451                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1452                cmd.client_id,
1453                cmd.venue,
1454            );
1455        }
1456
1457        Ok(())
1458    }
1459
1460    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1461        match cmd.bar_type.aggregation_source() {
1462            AggregationSource::Internal => {
1463                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1464                    self.start_bar_aggregator(cmd.bar_type)?;
1465                }
1466            }
1467            AggregationSource::External => {
1468                if cmd.bar_type.instrument_id().is_synthetic() {
1469                    anyhow::bail!(
1470                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1471                    );
1472                }
1473            }
1474        }
1475
1476        Ok(())
1477    }
1478
1479    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> bool {
1480        if !self.book_deltas_subs.contains(&cmd.instrument_id) {
1481            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1482            return false;
1483        }
1484
1485        self.book_deltas_subs.remove(&cmd.instrument_id);
1486        self.maintain_book_updater(&cmd.instrument_id);
1487
1488        // Snapshot subscriptions reuse the deltas feed.
1489        // Keep the client subscribed until the last snapshot consumer is gone.
1490        !self.has_book_snapshot_subscriptions(&cmd.instrument_id)
1491    }
1492
1493    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> bool {
1494        if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1495            log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1496            return false;
1497        }
1498
1499        self.book_depth10_subs.remove(&cmd.instrument_id);
1500        self.maintain_book_updater(&cmd.instrument_id);
1501
1502        true
1503    }
1504
1505    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
1506        match self.decrement_book_snapshot_subscription(cmd.instrument_id, cmd.interval_ms) {
1507            BookSnapshotUnsubscribeResult::NotSubscribed => {
1508                log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1509                return;
1510            }
1511            BookSnapshotUnsubscribeResult::Decremented => return,
1512            BookSnapshotUnsubscribeResult::Removed => {}
1513        }
1514
1515        if self.has_book_snapshot_subscriptions(&cmd.instrument_id) {
1516            return;
1517        }
1518
1519        self.maintain_book_updater(&cmd.instrument_id);
1520
1521        if self.book_deltas_subs.contains(&cmd.instrument_id) {
1522            return;
1523        }
1524
1525        if let Some(client_id) = cmd.client_id.as_ref()
1526            && self.external_clients.contains(client_id)
1527        {
1528            return;
1529        }
1530
1531        if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1532            let deltas_cmd = UnsubscribeBookDeltas::new(
1533                cmd.instrument_id,
1534                cmd.client_id,
1535                cmd.venue,
1536                UUID4::new(),
1537                cmd.ts_init,
1538                Some(cmd.command_id),
1539                cmd.params.clone(),
1540            );
1541            client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1542        }
1543    }
1544
1545    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
1546        let bar_type = cmd.bar_type;
1547
1548        // Don't remove aggregator if other exact-topic subscribers still exist
1549        let topic = switchboard::get_bars_topic(bar_type.standard());
1550        if msgbus::exact_subscriber_count_bars(topic) > 0 {
1551            return;
1552        }
1553
1554        if self.bar_aggregators.contains_key(&bar_type.standard())
1555            && let Err(e) = self.stop_bar_aggregator(bar_type)
1556        {
1557            log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1558        }
1559
1560        // After stopping a composite, check if the source aggregator is now orphaned
1561        if bar_type.is_composite() {
1562            let source_type = bar_type.composite();
1563            let source_topic = switchboard::get_bars_topic(source_type);
1564            if msgbus::exact_subscriber_count_bars(source_topic) == 0
1565                && self.bar_aggregators.contains_key(&source_type)
1566                && let Err(e) = self.stop_bar_aggregator(source_type)
1567            {
1568                log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1569            }
1570        }
1571    }
1572
1573    fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
1574        let series_id = cmd.series_id;
1575
1576        // Handle edits to existing subscriptions by tearing down and re-setting up the OptionChainManager.
1577        if let Some(old) = self.option_chain_managers.remove(&series_id) {
1578            log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
1579            let all_ids = old.borrow().all_instrument_ids();
1580            let old_venue = old.borrow().venue();
1581            old.borrow_mut().teardown(&self.clock);
1582            self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
1583        }
1584
1585        // Drain any stale pending forward price requests for this series
1586        self.pending_option_chain_requests
1587            .retain(|_, pending_cmd| pending_cmd.series_id != series_id);
1588
1589        // For ATM-based strike ranges, request forward prices from the adapter
1590        // to enable instant bootstrap without waiting for the first WebSocket tick.
1591        if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
1592            // Extract client_id first to avoid borrow conflicts
1593            let resolved_client_id = self
1594                .get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
1595                .map(|c| c.client_id);
1596
1597            if let Some(client_id) = resolved_client_id {
1598                let request_id = UUID4::new();
1599                let ts_init = self.clock.borrow().timestamp_ns();
1600
1601                // Pick any one option instrument at this expiry from cache
1602                // to enable single-instrument forward price fetch (1 HTTP call)
1603                let sample_instrument_id = {
1604                    let cache = self.cache.borrow();
1605                    cache
1606                        .instruments(&series_id.venue, Some(&series_id.underlying))
1607                        .iter()
1608                        .find(|i| {
1609                            i.expiration_ns() == Some(series_id.expiration_ns)
1610                                && i.settlement_currency().code == series_id.settlement_currency
1611                        })
1612                        .map(|i| i.id())
1613                };
1614
1615                let request = RequestForwardPrices::new(
1616                    series_id.venue,
1617                    series_id.underlying,
1618                    sample_instrument_id,
1619                    Some(client_id),
1620                    request_id,
1621                    ts_init,
1622                    None,
1623                );
1624
1625                self.pending_option_chain_requests
1626                    .insert(request_id, cmd.clone());
1627
1628                let req_cmd = RequestCommand::ForwardPrices(request);
1629                if let Err(e) = self.execute_request(req_cmd) {
1630                    log::warn!("Failed to request forward prices for {series_id}: {e}");
1631                    let cmd = self
1632                        .pending_option_chain_requests
1633                        .remove(&request_id)
1634                        .expect("just inserted");
1635                    self.create_option_chain_manager(&cmd, None);
1636                }
1637
1638                return;
1639            }
1640        }
1641
1642        self.create_option_chain_manager(cmd, None);
1643    }
1644
1645    /// Creates and stores an `OptionChainManager` for the given subscription.
1646    fn create_option_chain_manager(
1647        &mut self,
1648        cmd: &SubscribeOptionChain,
1649        initial_atm_price: Option<Price>,
1650    ) {
1651        let series_id = cmd.series_id;
1652        let cache = self.cache.clone();
1653        let clock = self.clock.clone();
1654        let priority = self.msgbus_priority;
1655        let deferred_cmd_queue = self.deferred_cmd_queue.clone();
1656
1657        let manager_rc = {
1658            let client = self.get_client(cmd.client_id.as_ref(), Some(&series_id.venue));
1659            OptionChainManager::create_and_setup(
1660                series_id,
1661                &cache,
1662                cmd,
1663                &clock,
1664                priority,
1665                client,
1666                initial_atm_price,
1667                deferred_cmd_queue,
1668            )
1669        };
1670
1671        // Index all instruments for reverse lookup
1672        for id in manager_rc.borrow().all_instrument_ids() {
1673            self.option_chain_instrument_index.insert(id, series_id);
1674        }
1675
1676        self.option_chain_managers.insert(series_id, manager_rc);
1677    }
1678
1679    fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
1680        let series_id = cmd.series_id;
1681
1682        let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
1683            log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
1684            return;
1685        };
1686
1687        // Extract info before teardown
1688        let all_ids = manager_rc.borrow().all_instrument_ids();
1689        let venue = manager_rc.borrow().venue();
1690
1691        // Remove all instruments from reverse index
1692        for id in &all_ids {
1693            self.option_chain_instrument_index.remove(id);
1694        }
1695
1696        manager_rc.borrow_mut().teardown(&self.clock);
1697
1698        // Forward wire-level unsubscribes to the data client
1699        self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
1700
1701        log::info!("Unsubscribed option chain for {series_id}");
1702    }
1703
1704    /// Forwards wire-level unsubscribe commands for all option chain instruments.
1705    fn forward_option_chain_unsubscribes(
1706        &mut self,
1707        instrument_ids: &[InstrumentId],
1708        venue: Venue,
1709        client_id: Option<ClientId>,
1710    ) {
1711        let ts_init = self.clock.borrow().timestamp_ns();
1712
1713        let Some(client) = self.get_client(client_id.as_ref(), Some(&venue)) else {
1714            log::error!(
1715                "Cannot forward option chain unsubscribes: no client found for venue={venue}",
1716            );
1717            return;
1718        };
1719
1720        for instrument_id in instrument_ids {
1721            client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
1722                *instrument_id,
1723                client_id,
1724                Some(venue),
1725                UUID4::new(),
1726                ts_init,
1727                None,
1728                None,
1729            )));
1730            client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
1731                UnsubscribeOptionGreeks::new(
1732                    *instrument_id,
1733                    client_id,
1734                    Some(venue),
1735                    UUID4::new(),
1736                    ts_init,
1737                    None,
1738                    None,
1739                ),
1740            ));
1741            client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
1742                UnsubscribeInstrumentStatus::new(
1743                    *instrument_id,
1744                    client_id,
1745                    Some(venue),
1746                    UUID4::new(),
1747                    ts_init,
1748                    None,
1749                    None,
1750                ),
1751            ));
1752        }
1753    }
1754
1755    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId) {
1756        let Some(updater) = self.book_updaters.get(instrument_id) else {
1757            return;
1758        };
1759
1760        let has_deltas = self.book_deltas_subs.contains(instrument_id);
1761        let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1762        let has_snapshots = self.has_book_snapshot_subscriptions(instrument_id);
1763
1764        let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1765        let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1766        let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1767        let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1768
1769        if !has_deltas && !has_snapshots {
1770            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1771        }
1772
1773        if !has_depth10 {
1774            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1775        }
1776
1777        if !has_deltas && !has_depth10 && !has_snapshots {
1778            self.book_updaters.remove(instrument_id);
1779            log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1780        }
1781    }
1782
1783    fn has_book_snapshot_subscriptions(&self, instrument_id: &InstrumentId) -> bool {
1784        self.book_snapshot_counts
1785            .keys()
1786            .any(|(id, _)| id == instrument_id)
1787    }
1788
1789    fn increment_book_snapshot_subscription(&mut self, cmd: &SubscribeBookSnapshots) -> bool {
1790        let key = (cmd.instrument_id, cmd.interval_ms);
1791
1792        if let Some(count) = self.book_snapshot_counts.get_mut(&key) {
1793            *count += 1;
1794            return false;
1795        }
1796
1797        self.book_snapshot_counts.insert(key, 1);
1798
1799        let snapshot_infos = if let Some(snapshot_infos) = self.book_intervals.get(&cmd.interval_ms)
1800        {
1801            snapshot_infos.clone()
1802        } else {
1803            let snapshot_infos = Rc::new(RefCell::new(IndexMap::new()));
1804            self.book_intervals
1805                .insert(cmd.interval_ms, snapshot_infos.clone());
1806            self.schedule_book_snapshotter(cmd.interval_ms, snapshot_infos.clone());
1807            snapshot_infos
1808        };
1809
1810        let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1811        let snap_info = BookSnapshotInfo {
1812            instrument_id: cmd.instrument_id,
1813            venue: cmd.instrument_id.venue,
1814            is_composite: cmd.instrument_id.symbol.is_composite(),
1815            root: Ustr::from(cmd.instrument_id.symbol.root()),
1816            topic,
1817            interval_ms: cmd.interval_ms,
1818        };
1819
1820        snapshot_infos
1821            .borrow_mut()
1822            .insert(cmd.instrument_id, snap_info);
1823
1824        true
1825    }
1826
1827    fn decrement_book_snapshot_subscription(
1828        &mut self,
1829        instrument_id: InstrumentId,
1830        interval_ms: NonZeroUsize,
1831    ) -> BookSnapshotUnsubscribeResult {
1832        let key = (instrument_id, interval_ms);
1833
1834        let Some(count) = self.book_snapshot_counts.get_mut(&key) else {
1835            return BookSnapshotUnsubscribeResult::NotSubscribed;
1836        };
1837
1838        if *count > 1 {
1839            *count -= 1;
1840            return BookSnapshotUnsubscribeResult::Decremented;
1841        }
1842
1843        self.book_snapshot_counts.shift_remove(&key);
1844
1845        let remove_interval = if let Some(snapshot_infos) = self.book_intervals.get(&interval_ms) {
1846            let mut snapshot_infos = snapshot_infos.borrow_mut();
1847            snapshot_infos.shift_remove(&instrument_id);
1848            snapshot_infos.is_empty()
1849        } else {
1850            false
1851        };
1852
1853        if remove_interval {
1854            self.book_intervals.remove(&interval_ms);
1855
1856            if let Some(snapshotter) = self.book_snapshotters.remove(&interval_ms) {
1857                let timer_name = snapshotter.timer_name;
1858                let mut clock = self.clock.borrow_mut();
1859                if clock.timer_exists(&timer_name) {
1860                    clock.cancel_timer(&timer_name);
1861                }
1862            }
1863        }
1864
1865        BookSnapshotUnsubscribeResult::Removed
1866    }
1867
1868    fn schedule_book_snapshotter(
1869        &mut self,
1870        interval_ms: NonZeroUsize,
1871        snapshot_infos: BookSnapshotInfos,
1872    ) {
1873        let interval_ns = millis_to_nanos_unchecked(interval_ms.get() as f64);
1874        let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1875        let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1876
1877        let snapshotter = Rc::new(BookSnapshotter::new(
1878            interval_ms,
1879            snapshot_infos,
1880            self.cache.clone(),
1881        ));
1882        let timer_name = snapshotter.timer_name;
1883        let snapshotter_callback = snapshotter.clone();
1884        let callback_fn: Rc<dyn Fn(TimeEvent)> =
1885            Rc::new(move |event| snapshotter_callback.snapshot(event));
1886        let callback = TimeEventCallback::from(callback_fn);
1887
1888        self.clock
1889            .borrow_mut()
1890            .set_timer_ns(
1891                &timer_name,
1892                interval_ns,
1893                Some(start_time_ns.into()),
1894                None,
1895                Some(callback),
1896                None,
1897                None,
1898            )
1899            .expect(FAILED);
1900
1901        self.book_snapshotters.insert(interval_ms, snapshotter);
1902    }
1903
1904    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1905
1906    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1907        let mut cache = self.cache.as_ref().borrow_mut();
1908        if let Err(e) = cache.add_instrument(instrument) {
1909            log_error_on_cache_insert(&e);
1910        }
1911    }
1912
1913    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1914        // TODO: Improve by adding bulk update methods to cache and database
1915        let mut cache = self.cache.as_ref().borrow_mut();
1916        for instrument in instruments {
1917            if let Err(e) = cache.add_instrument(instrument.clone()) {
1918                log_error_on_cache_insert(&e);
1919            }
1920        }
1921    }
1922
1923    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1924        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1925            log_error_on_cache_insert(&e);
1926        }
1927    }
1928
1929    fn handle_trades(&self, trades: &[TradeTick]) {
1930        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1931            log_error_on_cache_insert(&e);
1932        }
1933    }
1934
1935    fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1936        if let Err(e) = self
1937            .cache
1938            .as_ref()
1939            .borrow_mut()
1940            .add_funding_rates(funding_rates)
1941        {
1942            log_error_on_cache_insert(&e);
1943        }
1944    }
1945
1946    fn handle_bars(&self, bars: &[Bar]) {
1947        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1948            log_error_on_cache_insert(&e);
1949        }
1950    }
1951
1952    fn handle_book_response(&self, book: &OrderBook) {
1953        log::debug!("Adding order book {} to cache", book.instrument_id);
1954
1955        if let Err(e) = self
1956            .cache
1957            .as_ref()
1958            .borrow_mut()
1959            .add_order_book(book.clone())
1960        {
1961            log_error_on_cache_insert(&e);
1962        }
1963    }
1964
1965    /// Handles a `ForwardPricesResponse` by extracting the forward price
1966    /// for the pending option chain and creating the manager with instant bootstrap.
1967    fn handle_forward_prices_response(
1968        &mut self,
1969        correlation_id: &UUID4,
1970        resp: &ForwardPricesResponse,
1971    ) {
1972        let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
1973            log::debug!(
1974                "No pending option chain request for correlation_id={correlation_id}, ignoring"
1975            );
1976            return;
1977        };
1978
1979        let series_id = cmd.series_id;
1980
1981        // Find a forward price that matches an instrument in this series.
1982        // We look up each forward price instrument in the cache to match by expiry and currency.
1983        let cache = self.cache.borrow();
1984        let mut best_price: Option<Price> = None;
1985
1986        for fp in &resp.data {
1987            // Check if any cached instrument with this id belongs to our series
1988            if let Some(instrument) = cache.instrument(&fp.instrument_id)
1989                && let Some(expiration) = instrument.expiration_ns()
1990                && expiration == series_id.expiration_ns
1991                && instrument.settlement_currency().code == series_id.settlement_currency
1992            {
1993                match Price::from_decimal(fp.forward_price) {
1994                    Ok(price) => best_price = Some(price),
1995                    Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
1996                }
1997                break;
1998            }
1999        }
2000        drop(cache);
2001
2002        if let Some(price) = best_price {
2003            log::info!("Forward price for {series_id}: {price} (instant bootstrap)",);
2004        } else {
2005            log::info!(
2006                "No matching forward price found for {series_id}, will bootstrap from live data",
2007            );
2008        }
2009
2010        self.create_option_chain_manager(&cmd, best_price);
2011    }
2012
2013    // -- INTERNAL --------------------------------------------------------------------------------
2014
2015    fn setup_book_updater(
2016        &mut self,
2017        instrument_id: &InstrumentId,
2018        book_type: BookType,
2019        only_deltas: bool,
2020        managed: bool,
2021    ) -> anyhow::Result<()> {
2022        let mut cache = self.cache.borrow_mut();
2023        if managed && !cache.has_order_book(instrument_id) {
2024            let book = OrderBook::new(*instrument_id, book_type);
2025            log::debug!("Created {book}");
2026            cache.add_order_book(book)?;
2027        }
2028
2029        // Reuse existing BookUpdater or create a new one
2030        let updater = self
2031            .book_updaters
2032            .entry(*instrument_id)
2033            .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
2034            .clone();
2035
2036        // Subscribe to deltas (typed router handles duplicates)
2037        let topic = switchboard::get_book_deltas_topic(*instrument_id);
2038        let deltas_handler = TypedHandler::new(updater.clone());
2039        msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
2040
2041        // Subscribe to depth10 if not only_deltas
2042        if !only_deltas {
2043            let topic = switchboard::get_book_depth10_topic(*instrument_id);
2044            let depth_handler = TypedHandler::new(updater);
2045            msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
2046        }
2047
2048        Ok(())
2049    }
2050
2051    fn create_bar_aggregator(
2052        &self,
2053        instrument: &InstrumentAny,
2054        bar_type: BarType,
2055    ) -> Box<dyn BarAggregator> {
2056        let cache = self.cache.clone();
2057
2058        let handler = move |bar: Bar| {
2059            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2060                log_error_on_cache_insert(&e);
2061            }
2062
2063            let topic = switchboard::get_bars_topic(bar.bar_type);
2064            msgbus::publish_bar(topic, &bar);
2065        };
2066
2067        let clock = self.clock.clone();
2068        let config = self.config.clone();
2069
2070        let price_precision = instrument.price_precision();
2071        let size_precision = instrument.size_precision();
2072
2073        if bar_type.spec().is_time_aggregated() {
2074            // Get time_bars_origin_offset from config
2075            let time_bars_origin_offset = config
2076                .time_bars_origins
2077                .get(&bar_type.spec().aggregation)
2078                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
2079
2080            Box::new(TimeBarAggregator::new(
2081                bar_type,
2082                price_precision,
2083                size_precision,
2084                clock,
2085                handler,
2086                config.time_bars_build_with_no_updates,
2087                config.time_bars_timestamp_on_close,
2088                config.time_bars_interval_type,
2089                time_bars_origin_offset,
2090                config.time_bars_build_delay,
2091                config.time_bars_skip_first_non_full_bar,
2092            ))
2093        } else {
2094            match bar_type.spec().aggregation {
2095                BarAggregation::Tick => Box::new(TickBarAggregator::new(
2096                    bar_type,
2097                    price_precision,
2098                    size_precision,
2099                    handler,
2100                )) as Box<dyn BarAggregator>,
2101                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
2102                    bar_type,
2103                    price_precision,
2104                    size_precision,
2105                    handler,
2106                )) as Box<dyn BarAggregator>,
2107                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
2108                    bar_type,
2109                    price_precision,
2110                    size_precision,
2111                    handler,
2112                )) as Box<dyn BarAggregator>,
2113                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
2114                    bar_type,
2115                    price_precision,
2116                    size_precision,
2117                    handler,
2118                )) as Box<dyn BarAggregator>,
2119                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
2120                    bar_type,
2121                    price_precision,
2122                    size_precision,
2123                    handler,
2124                )) as Box<dyn BarAggregator>,
2125                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
2126                    bar_type,
2127                    price_precision,
2128                    size_precision,
2129                    handler,
2130                )) as Box<dyn BarAggregator>,
2131                BarAggregation::Value => Box::new(ValueBarAggregator::new(
2132                    bar_type,
2133                    price_precision,
2134                    size_precision,
2135                    handler,
2136                )) as Box<dyn BarAggregator>,
2137                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
2138                    bar_type,
2139                    price_precision,
2140                    size_precision,
2141                    handler,
2142                )) as Box<dyn BarAggregator>,
2143                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
2144                    bar_type,
2145                    price_precision,
2146                    size_precision,
2147                    handler,
2148                )) as Box<dyn BarAggregator>,
2149                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
2150                    bar_type,
2151                    price_precision,
2152                    size_precision,
2153                    instrument.price_increment(),
2154                    handler,
2155                )) as Box<dyn BarAggregator>,
2156                other => unreachable!(
2157                    "Unsupported internal bar aggregation dispatch for {other:?}; update `create_bar_aggregator`"
2158                ),
2159            }
2160        }
2161    }
2162
2163    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2164        // Get the instrument for this bar type
2165        let instrument = {
2166            let cache = self.cache.borrow();
2167            cache
2168                .instrument(&bar_type.instrument_id())
2169                .ok_or_else(|| {
2170                    anyhow::anyhow!(
2171                        "Cannot start bar aggregation: no instrument found for {}",
2172                        bar_type.instrument_id(),
2173                    )
2174                })?
2175                .clone()
2176        };
2177
2178        // Use standard form of bar type as key
2179        let bar_key = bar_type.standard();
2180
2181        // Create or retrieve aggregator in Rc<RefCell>
2182        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
2183            rc.clone()
2184        } else {
2185            let agg = self.create_bar_aggregator(&instrument, bar_type);
2186            let rc = Rc::new(RefCell::new(agg));
2187            self.bar_aggregators.insert(bar_key, rc.clone());
2188            rc
2189        };
2190
2191        // Subscribe to underlying data topics
2192        let mut subscriptions = Vec::new();
2193
2194        if bar_type.is_composite() {
2195            let topic = switchboard::get_bars_topic(bar_type.composite());
2196            let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_key));
2197            msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
2198            subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
2199        } else if bar_type.spec().price_type == PriceType::Last {
2200            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
2201            let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_key));
2202            msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
2203            subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
2204        } else {
2205            // Warn if imbalance/runs aggregation is wired to quotes (needs aggressor_side from trades)
2206            if matches!(
2207                bar_type.spec().aggregation,
2208                BarAggregation::TickImbalance
2209                    | BarAggregation::VolumeImbalance
2210                    | BarAggregation::ValueImbalance
2211                    | BarAggregation::TickRuns
2212                    | BarAggregation::VolumeRuns
2213                    | BarAggregation::ValueRuns
2214            ) {
2215                log::warn!(
2216                    "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
2217                     data with `aggressor_side`, but `price_type` is not LAST so it will receive \
2218                     quote data: bars will not emit correctly",
2219                );
2220            }
2221
2222            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
2223            let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_key));
2224            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
2225            subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
2226        }
2227
2228        self.bar_aggregator_handlers.insert(bar_key, subscriptions);
2229
2230        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
2231        self.setup_bar_aggregator(bar_type, false)?;
2232
2233        aggregator.borrow_mut().set_is_running(true);
2234
2235        Ok(())
2236    }
2237
2238    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
2239    ///
2240    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
2241    fn setup_bar_aggregator(&self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
2242        let bar_key = bar_type.standard();
2243        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
2244            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
2245        })?;
2246
2247        // Set historical mode and handler
2248        let handler: Box<dyn FnMut(Bar)> = if historical {
2249            // Historical handler - process_historical equivalent
2250            let cache = self.cache.clone();
2251            Box::new(move |bar: Bar| {
2252                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2253                    log_error_on_cache_insert(&e);
2254                }
2255                // In historical mode, bars are processed but not published to message bus
2256            })
2257        } else {
2258            // Regular handler - process equivalent
2259            let cache = self.cache.clone();
2260            Box::new(move |bar: Bar| {
2261                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
2262                    log_error_on_cache_insert(&e);
2263                }
2264                let topic = switchboard::get_bars_topic(bar.bar_type);
2265                msgbus::publish_bar(topic, &bar);
2266            })
2267        };
2268
2269        aggregator
2270            .borrow_mut()
2271            .set_historical_mode(historical, handler);
2272
2273        // For TimeBarAggregator, set clock and start timer
2274        if bar_type.spec().is_time_aggregated() {
2275            use nautilus_common::clock::TestClock;
2276
2277            if historical {
2278                // Each aggregator gets its own independent clock
2279                let test_clock = Rc::new(RefCell::new(TestClock::new()));
2280                aggregator.borrow_mut().set_clock(test_clock);
2281                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
2282                // Store weak reference so start_timer can use it when called later
2283                let aggregator_weak = Rc::downgrade(aggregator);
2284                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
2285            } else {
2286                aggregator.borrow_mut().set_clock(self.clock.clone());
2287                aggregator
2288                    .borrow_mut()
2289                    .start_timer(Some(aggregator.clone()));
2290            }
2291        }
2292
2293        Ok(())
2294    }
2295
2296    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
2297        let aggregator = self
2298            .bar_aggregators
2299            .shift_remove(&bar_type.standard())
2300            .ok_or_else(|| {
2301                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
2302            })?;
2303
2304        aggregator.borrow_mut().stop();
2305
2306        // Unsubscribe any registered message handlers
2307        let bar_key = bar_type.standard();
2308        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
2309            for sub in subs {
2310                match sub {
2311                    BarAggregatorSubscription::Bar { topic, handler } => {
2312                        msgbus::unsubscribe_bars(topic.into(), &handler);
2313                    }
2314                    BarAggregatorSubscription::Trade { topic, handler } => {
2315                        msgbus::unsubscribe_trades(topic.into(), &handler);
2316                    }
2317                    BarAggregatorSubscription::Quote { topic, handler } => {
2318                        msgbus::unsubscribe_quotes(topic.into(), &handler);
2319                    }
2320                }
2321            }
2322        }
2323
2324        Ok(())
2325    }
2326}
2327
2328#[inline(always)]
2329fn log_error_on_cache_insert<T: Display>(e: &T) {
2330    log::error!("Error on cache insert: {e}");
2331}
2332
2333#[inline(always)]
2334fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
2335    if data.is_empty() {
2336        let name = type_name::<T>();
2337        let short_name = name.rsplit("::").next().unwrap_or(name);
2338        log::warn!("Received empty {short_name} response for {id} {correlation_id}");
2339        return true;
2340    }
2341    false
2342}