Skip to main content

nautilus_data/option_chains/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Per-series option chain manager.
17//!
18//! Each [`OptionChainManager`] instance is self-contained: it owns its aggregator,
19//! msgbus handlers, and timer for a single option series. The `DataEngine` holds
20//! one manager per active series in
21//! `AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>`.
22
23use std::{cell::RefCell, collections::HashMap, rc::Rc};
24
25use nautilus_common::{
26    cache::Cache,
27    clock::Clock,
28    messages::data::{
29        SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
30        SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
31        UnsubscribeQuotes,
32    },
33    msgbus::{self, MStr, Topic, TypedHandler, switchboard},
34    timer::{TimeEvent, TimeEventCallback},
35};
36use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
37use nautilus_model::{
38    data::{QuoteTick, option_chain::OptionGreeks},
39    enums::OptionKind,
40    identifiers::{InstrumentId, OptionSeriesId, Venue},
41    instruments::Instrument,
42    types::Price,
43};
44use ustr::Ustr;
45
46use super::{
47    AtmTracker, OptionChainAggregator,
48    handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
49};
50use crate::{
51    client::DataClientAdapter,
52    engine::{DeferredCommand, DeferredCommandQueue},
53};
54
55/// Per-series option chain manager.
56///
57/// Each instance manages a single option series: its aggregator,
58/// handlers, timer, and lifecycle. The `DataEngine` holds one
59/// manager per active series.
60#[derive(Debug)]
61pub struct OptionChainManager {
62    aggregator: OptionChainAggregator,
63    topic: MStr<Topic>,
64    quote_handlers: Vec<TypedHandler<QuoteTick>>,
65    greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
66    timer_name: Option<Ustr>,
67    msgbus_priority: u8,
68    /// Whether the first ATM price has been received and the active set bootstrapped.
69    bootstrapped: bool,
70    /// Shared deferred command queue — the `DataEngine` drains this on each data tick.
71    deferred_cmd_queue: DeferredCommandQueue,
72    /// Clock reference for constructing command timestamps.
73    clock: Rc<RefCell<dyn Clock>>,
74    /// When `true`, every quote/greeks update for an active instrument immediately publishes a snapshot.
75    raw_mode: bool,
76}
77
78impl OptionChainManager {
79    /// Factory method that creates a per-series manager, registers all msgbus
80    /// handlers, forwards subscribe commands to the data client, and sets up
81    /// the snapshot timer.
82    ///
83    /// Returns the manager wrapped in `Rc<RefCell<>>` (needed for `WeakCell`
84    /// handler pattern).
85    #[expect(clippy::too_many_arguments)]
86    pub(crate) fn create_and_setup(
87        series_id: OptionSeriesId,
88        cache: &Rc<RefCell<Cache>>,
89        cmd: &SubscribeOptionChain,
90        clock: &Rc<RefCell<dyn Clock>>,
91        msgbus_priority: u8,
92        client: Option<&mut DataClientAdapter>,
93        initial_atm_price: Option<Price>,
94        deferred_cmd_queue: DeferredCommandQueue,
95    ) -> Rc<RefCell<Self>> {
96        let topic = switchboard::get_option_chain_topic(series_id);
97        let instruments = Self::resolve_instruments(cache, &series_id);
98
99        let mut tracker = AtmTracker::new();
100
101        // Derive forward price precision from instrument strike prices
102        if let Some((strike, _)) = instruments.values().next() {
103            tracker.set_forward_precision(strike.precision);
104        }
105
106        if let Some(price) = initial_atm_price {
107            tracker.set_initial_price(price);
108            log::info!("Pre-populated ATM with forward price: {price}");
109        }
110        let aggregator =
111            OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
112
113        // Initial active set for msgbus handlers (subset of all instruments).
114        // When ATM is unknown (ATM-based ranges), this is empty — deferred until bootstrap.
115        let active_instrument_ids = aggregator.instrument_ids();
116        let all_instrument_ids = aggregator.all_instrument_ids();
117        // If active set is already populated (Fixed range or ATM provided), we're bootstrapped
118        let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
119
120        let raw_mode = cmd.snapshot_interval_ms.is_none();
121
122        let manager = Self {
123            aggregator,
124            topic,
125            quote_handlers: Vec::new(),
126            greeks_handlers: Vec::new(),
127            timer_name: None,
128            msgbus_priority,
129            bootstrapped,
130            deferred_cmd_queue,
131            clock: clock.clone(),
132            raw_mode,
133        };
134        let manager_rc = Rc::new(RefCell::new(manager));
135
136        // Register msgbus handlers for initial active set only
137        let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
138            &manager_rc,
139            &active_instrument_ids,
140            series_id,
141            msgbus_priority,
142        );
143        let greeks_handlers = Self::register_greeks_handlers(
144            &manager_rc,
145            &active_instrument_ids,
146            series_id,
147            msgbus_priority,
148        );
149
150        // Forward wire-level subscriptions for the active set.
151        // When ATM is unknown, active set is empty — deferred until bootstrap.
152        Self::forward_client_subscriptions(
153            client,
154            &active_instrument_ids,
155            cmd,
156            series_id.venue,
157            clock,
158        );
159
160        let timer_name = cmd
161            .snapshot_interval_ms
162            .map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
163
164        {
165            let mut mgr = manager_rc.borrow_mut();
166            mgr.quote_handlers = quote_handlers;
167            mgr.greeks_handlers = greeks_handlers;
168            mgr.timer_name = timer_name;
169        }
170
171        let mode_str = match cmd.snapshot_interval_ms {
172            Some(ms) => format!("interval={ms}ms"),
173            None => "mode=raw".to_string(),
174        };
175        log::info!(
176            "Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
177            active_instrument_ids.len(),
178            all_instrument_ids.len(),
179        );
180
181        manager_rc
182    }
183
184    /// Registers quote handlers on the msgbus for each instrument.
185    ///
186    /// Always stores the handler prototype as the first element so that
187    /// `register_handlers_for_instrument` can clone it during deferred bootstrap.
188    fn register_quote_handlers(
189        manager_rc: &Rc<RefCell<Self>>,
190        instrument_ids: &[InstrumentId],
191        series_id: OptionSeriesId,
192        priority: u8,
193    ) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
194        let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
195        // Always store prototype as first element for bootstrap cloning
196        let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
197        handlers.push(quote_handler.clone());
198
199        for instrument_id in instrument_ids {
200            let topic = switchboard::get_quotes_topic(*instrument_id);
201            msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
202            handlers.push(quote_handler.clone());
203        }
204        (handlers, quote_handler)
205    }
206
207    /// Registers greeks handlers on the msgbus for each instrument.
208    ///
209    /// Always stores the handler prototype as the first element so that
210    /// `register_handlers_for_instrument` can clone it during deferred bootstrap.
211    fn register_greeks_handlers(
212        manager_rc: &Rc<RefCell<Self>>,
213        instrument_ids: &[InstrumentId],
214        series_id: OptionSeriesId,
215        priority: u8,
216    ) -> Vec<TypedHandler<OptionGreeks>> {
217        let greeks_handler =
218            TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
219        // Always store prototype as first element for bootstrap cloning
220        let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
221        handlers.push(greeks_handler.clone());
222
223        for instrument_id in instrument_ids {
224            let topic = switchboard::get_option_greeks_topic(*instrument_id);
225            msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
226            handlers.push(greeks_handler.clone());
227        }
228        handlers
229    }
230
231    /// Forwards subscribe commands to the data client for all instruments.
232    fn forward_client_subscriptions(
233        client: Option<&mut DataClientAdapter>,
234        instrument_ids: &[InstrumentId],
235        cmd: &SubscribeOptionChain,
236        venue: Venue,
237        clock: &Rc<RefCell<dyn Clock>>,
238    ) {
239        let ts_init = clock.borrow().timestamp_ns();
240
241        let Some(client) = client else {
242            log::error!(
243                "Cannot forward option chain subscriptions: no client found for venue={venue}",
244            );
245            return;
246        };
247
248        for instrument_id in instrument_ids {
249            client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
250                instrument_id: *instrument_id,
251                client_id: cmd.client_id,
252                venue: Some(venue),
253                command_id: UUID4::new(),
254                ts_init,
255                correlation_id: None,
256                params: None,
257            }));
258            client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
259                instrument_id: *instrument_id,
260                client_id: cmd.client_id,
261                venue: Some(venue),
262                command_id: UUID4::new(),
263                ts_init,
264                correlation_id: None,
265                params: None,
266            }));
267            client.execute_subscribe(SubscribeCommand::InstrumentStatus(
268                SubscribeInstrumentStatus {
269                    instrument_id: *instrument_id,
270                    client_id: cmd.client_id,
271                    venue: Some(venue),
272                    command_id: UUID4::new(),
273                    ts_init,
274                    correlation_id: None,
275                    params: None,
276                },
277            ));
278        }
279
280        log::info!(
281            "Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
282            instrument_ids.len(),
283        );
284    }
285
286    /// Sets up the snapshot timer for periodic publishing.
287    fn setup_timer(
288        manager_rc: &Rc<RefCell<Self>>,
289        series_id: OptionSeriesId,
290        interval_ms: u64,
291        clock: &Rc<RefCell<dyn Clock>>,
292    ) -> Ustr {
293        let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
294        let publisher = OptionChainSlicePublisher::new(manager_rc);
295        let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
296
297        let now_ns = clock.borrow().timestamp_ns().as_u64();
298        let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
299
300        let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
301        let callback = TimeEventCallback::from(callback_fn);
302
303        clock
304            .borrow_mut()
305            .set_timer_ns(
306                &timer_name,
307                interval_ns,
308                Some(start_time_ns.into()),
309                None,
310                Some(callback),
311                None,
312                None,
313            )
314            .expect(FAILED);
315
316        timer_name
317    }
318
319    /// Returns all instrument IDs in the full catalog (not just the active set).
320    #[must_use]
321    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
322        self.aggregator.all_instrument_ids()
323    }
324
325    /// Returns the venue for this option chain.
326    #[must_use]
327    pub fn venue(&self) -> Venue {
328        self.aggregator.series_id().venue
329    }
330
331    /// Tears down this manager: unregisters all msgbus handlers and cancels the timer.
332    pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
333        // Unsubscribe from all currently active instruments
334        let instrument_ids = self.aggregator.instrument_ids();
335
336        // Unregister quote handlers
337        if let Some(handler) = self.quote_handlers.first() {
338            for instrument_id in &instrument_ids {
339                let topic = switchboard::get_quotes_topic(*instrument_id);
340                msgbus::unsubscribe_quotes(topic.into(), handler);
341            }
342        }
343
344        // Unregister greeks handlers
345        if let Some(handler) = self.greeks_handlers.first() {
346            for instrument_id in &instrument_ids {
347                let topic = switchboard::get_option_greeks_topic(*instrument_id);
348                msgbus::unsubscribe_option_greeks(topic.into(), handler);
349            }
350        }
351
352        // Cancel timer
353        if let Some(timer_name) = self.timer_name.take() {
354            let mut clk = clock.borrow_mut();
355            if clk.timer_exists(&timer_name) {
356                clk.cancel_timer(&timer_name);
357            }
358        }
359
360        self.quote_handlers.clear();
361        self.greeks_handlers.clear();
362    }
363
364    /// Routes incoming greeks to the aggregator.
365    ///
366    /// Also updates the ATM tracker from the forward price if `ForwardPrice` source is active,
367    /// and triggers deferred bootstrap on the first arrival.
368    pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
369        // Update ATM tracker from forward price (ForwardPrice source only)
370        self.aggregator
371            .atm_tracker_mut()
372            .update_from_option_greeks(greeks);
373        // Route greeks to aggregator for storage
374        self.aggregator.update_greeks(greeks);
375        // Check if first ATM arrival triggers deferred bootstrap
376        self.maybe_bootstrap();
377
378        if self.raw_mode
379            && self.bootstrapped
380            && self.aggregator.active_ids().contains(&greeks.instrument_id)
381        {
382            self.publish_slice(greeks.ts_event);
383        }
384    }
385
386    /// Handles an expired/settled instrument by removing it from the aggregator,
387    /// unregistering msgbus handlers, and pushing deferred wire unsubscribes.
388    ///
389    /// Returns `true` if the aggregator catalog is now empty (all instruments expired),
390    /// signaling the engine to tear down this entire manager.
391    pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
392        let was_active = self.aggregator.active_ids().contains(instrument_id);
393
394        if !self.aggregator.remove_instrument(instrument_id) {
395            return self.aggregator.is_catalog_empty();
396        }
397
398        if was_active {
399            // Unregister msgbus handlers for this instrument
400            if let Some(qh) = self.quote_handlers.first() {
401                let topic = switchboard::get_quotes_topic(*instrument_id);
402                msgbus::unsubscribe_quotes(topic.into(), qh);
403            }
404
405            if let Some(gh) = self.greeks_handlers.first() {
406                let topic = switchboard::get_option_greeks_topic(*instrument_id);
407                msgbus::unsubscribe_option_greeks(topic.into(), gh);
408            }
409
410            // Push deferred wire unsubscribes
411            self.push_unsubscribe_commands(*instrument_id);
412        }
413
414        log::info!(
415            "Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
416            self.aggregator.series_id(),
417            self.aggregator.instruments().len(),
418        );
419
420        self.aggregator.is_catalog_empty()
421    }
422
423    /// Routes an incoming quote tick to the aggregator, then bootstraps if ready.
424    ///
425    /// This handles both option instrument quotes (aggregator) and ATM source quotes
426    /// (the aggregator's ATM tracker handles filtering internally).
427    pub fn handle_quote(&mut self, quote: &QuoteTick) {
428        self.aggregator.update_quote(quote);
429        self.maybe_bootstrap();
430
431        if self.raw_mode
432            && self.bootstrapped
433            && self.aggregator.active_ids().contains(&quote.instrument_id)
434        {
435            self.publish_slice(quote.ts_event);
436        }
437    }
438
439    /// Bootstraps the active instrument set on the first ATM price arrival.
440    ///
441    /// Computes active strikes, registers msgbus handlers for those instruments,
442    /// and pushes deferred wire subscriptions into the shared command queue.
443    fn maybe_bootstrap(&mut self) {
444        if self.bootstrapped {
445            return;
446        }
447
448        if self.aggregator.atm_tracker().atm_price().is_none() {
449            return;
450        }
451
452        // First ATM received — compute active set and register handlers
453        let active_ids = self.aggregator.recompute_active_set();
454        self.register_handlers_for_instruments_bulk(&active_ids);
455
456        for &id in &active_ids {
457            self.push_subscribe_commands(id);
458        }
459
460        self.bootstrapped = true;
461
462        log::info!(
463            "Bootstrapped option chain for {} ({} active instruments)",
464            self.aggregator.series_id(),
465            active_ids.len(),
466        );
467    }
468
469    /// Registers msgbus handlers for a batch of instruments.
470    fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
471        for &id in instrument_ids {
472            self.register_handlers_for_instrument(id);
473        }
474    }
475
476    /// Adds a dynamically discovered instrument to this option chain.
477    ///
478    /// Registers msgbus handlers when the instrument falls in the active
479    /// range and forwards wire-level subscriptions via `client`.
480    /// Returns `true` if the instrument was newly inserted.
481    pub fn add_instrument(
482        &mut self,
483        instrument_id: InstrumentId,
484        strike: Price,
485        kind: OptionKind,
486        client: Option<&mut DataClientAdapter>,
487        clock: &Rc<RefCell<dyn Clock>>,
488    ) -> bool {
489        if !self.aggregator.add_instrument(instrument_id, strike, kind) {
490            return false;
491        }
492
493        if self.aggregator.active_ids().contains(&instrument_id) {
494            self.register_handlers_for_instrument(instrument_id);
495        }
496
497        let venue = self.aggregator.series_id().venue;
498        Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
499
500        log::info!(
501            "Added instrument {instrument_id} to option chain {} (active={})",
502            self.aggregator.series_id(),
503            self.aggregator.active_ids().contains(&instrument_id),
504        );
505
506        true
507    }
508
509    fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
510        if let Some(qh) = self.quote_handlers.first().cloned() {
511            let topic = switchboard::get_quotes_topic(instrument_id);
512            msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
513        }
514
515        if let Some(gh) = self.greeks_handlers.first().cloned() {
516            let topic = switchboard::get_option_greeks_topic(instrument_id);
517            msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
518        }
519    }
520
521    /// Pushes deferred subscribe commands (quotes, greeks, instrument status) for a single instrument.
522    fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
523        let venue = self.aggregator.series_id().venue;
524        let ts_init = self.clock.borrow().timestamp_ns();
525        let mut queue = self.deferred_cmd_queue.borrow_mut();
526        queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
527            SubscribeQuotes {
528                instrument_id,
529                client_id: None,
530                venue: Some(venue),
531                command_id: UUID4::new(),
532                ts_init,
533                correlation_id: None,
534                params: None,
535            },
536        )));
537        queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
538            SubscribeOptionGreeks {
539                instrument_id,
540                client_id: None,
541                venue: Some(venue),
542                command_id: UUID4::new(),
543                ts_init,
544                correlation_id: None,
545                params: None,
546            },
547        )));
548        queue.push_back(DeferredCommand::Subscribe(
549            SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
550                instrument_id,
551                client_id: None,
552                venue: Some(venue),
553                command_id: UUID4::new(),
554                ts_init,
555                correlation_id: None,
556                params: None,
557            }),
558        ));
559    }
560
561    /// Pushes deferred unsubscribe commands (quotes, greeks, instrument status) for a single instrument.
562    fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
563        let venue = self.aggregator.series_id().venue;
564        let ts_init = self.clock.borrow().timestamp_ns();
565        let mut queue = self.deferred_cmd_queue.borrow_mut();
566        queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
567            UnsubscribeQuotes {
568                instrument_id,
569                client_id: None,
570                venue: Some(venue),
571                command_id: UUID4::new(),
572                ts_init,
573                correlation_id: None,
574                params: None,
575            },
576        )));
577        queue.push_back(DeferredCommand::Unsubscribe(
578            UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
579                instrument_id,
580                client_id: None,
581                venue: Some(venue),
582                command_id: UUID4::new(),
583                ts_init,
584                correlation_id: None,
585                params: None,
586            }),
587        ));
588        queue.push_back(DeferredCommand::Unsubscribe(
589            UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
590                instrument_id,
591                client_id: None,
592                venue: Some(venue),
593                command_id: UUID4::new(),
594                ts_init,
595                correlation_id: None,
596                params: None,
597            }),
598        ));
599    }
600
601    /// Forwards quote, greeks, and instrument status subscriptions for a single instrument.
602    fn forward_instrument_subscriptions(
603        client: Option<&mut DataClientAdapter>,
604        instrument_id: InstrumentId,
605        venue: Venue,
606        clock: &Rc<RefCell<dyn Clock>>,
607    ) {
608        let Some(client) = client else {
609            log::error!(
610                "Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
611            );
612            return;
613        };
614
615        let ts_init = clock.borrow().timestamp_ns();
616
617        client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
618            instrument_id,
619            client_id: None,
620            venue: Some(venue),
621            command_id: UUID4::new(),
622            ts_init,
623            correlation_id: None,
624            params: None,
625        }));
626        client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
627            instrument_id,
628            client_id: None,
629            venue: Some(venue),
630            command_id: UUID4::new(),
631            ts_init,
632            correlation_id: None,
633            params: None,
634        }));
635        client.execute_subscribe(SubscribeCommand::InstrumentStatus(
636            SubscribeInstrumentStatus {
637                instrument_id,
638                client_id: None,
639                venue: Some(venue),
640                command_id: UUID4::new(),
641                ts_init,
642                correlation_id: None,
643                params: None,
644            },
645        ));
646    }
647
648    /// Checks if ATM has shifted and rebalances msgbus subscriptions if needed.
649    fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
650        let Some(action) = self.aggregator.check_rebalance(now_ns) else {
651            return;
652        };
653
654        // Unsubscribe removed instruments from msgbus
655        if let Some(qh) = self.quote_handlers.first() {
656            for id in &action.remove {
657                msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
658            }
659        }
660
661        if let Some(gh) = self.greeks_handlers.first() {
662            for id in &action.remove {
663                msgbus::unsubscribe_option_greeks(
664                    switchboard::get_option_greeks_topic(*id).into(),
665                    gh,
666                );
667            }
668        }
669
670        // Subscribe new instruments on msgbus
671        if let Some(qh) = self.quote_handlers.first().cloned() {
672            for id in &action.add {
673                msgbus::subscribe_quotes(
674                    switchboard::get_quotes_topic(*id).into(),
675                    qh.clone(),
676                    Some(self.msgbus_priority),
677                );
678            }
679        }
680
681        if let Some(gh) = self.greeks_handlers.first().cloned() {
682            for id in &action.add {
683                msgbus::subscribe_option_greeks(
684                    switchboard::get_option_greeks_topic(*id).into(),
685                    gh.clone(),
686                    Some(self.msgbus_priority),
687                );
688            }
689        }
690
691        // Push deferred wire-level changes into the shared command queue
692        for &id in &action.add {
693            self.push_subscribe_commands(id);
694        }
695
696        for &id in &action.remove {
697            self.push_unsubscribe_commands(id);
698        }
699
700        if !action.add.is_empty() || !action.remove.is_empty() {
701            log::info!(
702                "Rebalanced option chain for {}: +{} -{} instruments",
703                self.aggregator.series_id(),
704                action.add.len(),
705                action.remove.len(),
706            );
707        }
708
709        // Apply state changes to aggregator
710        self.aggregator.apply_rebalance(&action, now_ns);
711    }
712
713    /// Takes the accumulated snapshot and publishes it to the msgbus.
714    pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
715        // Proactive expiry safeguard
716        if self.aggregator.is_expired(ts) {
717            self.deferred_cmd_queue
718                .borrow_mut()
719                .push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
720            return;
721        }
722
723        self.maybe_rebalance(ts);
724
725        let series_id = self.aggregator.series_id();
726        let slice = self.aggregator.snapshot(ts);
727
728        if slice.is_empty() {
729            log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
730            return;
731        }
732
733        log::debug!(
734            "Publishing OptionChainSlice for {} (calls={}, puts={})",
735            series_id,
736            slice.call_count(),
737            slice.put_count(),
738        );
739        msgbus::publish_option_chain(self.topic, &slice);
740    }
741
742    /// Resolves instruments from cache that match the given option series.
743    fn resolve_instruments(
744        cache: &Rc<RefCell<Cache>>,
745        series_id: &OptionSeriesId,
746    ) -> HashMap<InstrumentId, (Price, OptionKind)> {
747        let cache = cache.borrow();
748        let mut map = HashMap::new();
749
750        for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
751            let Some(expiration) = instrument.expiration_ns() else {
752                continue;
753            };
754
755            if expiration != series_id.expiration_ns {
756                continue;
757            }
758
759            if instrument.settlement_currency().code != series_id.settlement_currency {
760                continue;
761            }
762
763            let Some(strike) = instrument.strike_price() else {
764                continue;
765            };
766
767            let Some(kind) = instrument.option_kind() else {
768                continue;
769            };
770
771            map.insert(instrument.id(), (strike, kind));
772        }
773
774        map
775    }
776}
777
778#[cfg(test)]
779mod tests {
780    use std::collections::VecDeque;
781
782    use nautilus_common::clock::TestClock;
783    use nautilus_core::UnixNanos;
784    use nautilus_model::{data::option_chain::StrikeRange, identifiers::Venue, types::Quantity};
785    use rstest::*;
786
787    use super::*;
788
789    fn make_series_id() -> OptionSeriesId {
790        OptionSeriesId::new(
791            Venue::new("DERIBIT"),
792            ustr::Ustr::from("BTC"),
793            ustr::Ustr::from("BTC"),
794            UnixNanos::from(1_700_000_000_000_000_000u64),
795        )
796    }
797
798    fn make_test_queue() -> DeferredCommandQueue {
799        Rc::new(RefCell::new(VecDeque::new()))
800    }
801
802    fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
803        let series_id = make_series_id();
804        let topic = switchboard::get_option_chain_topic(series_id);
805        let tracker = AtmTracker::new();
806        let aggregator = OptionChainAggregator::new(
807            series_id,
808            StrikeRange::Fixed(vec![]),
809            tracker,
810            HashMap::new(),
811        );
812        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
813        let queue = make_test_queue();
814
815        let manager = OptionChainManager {
816            aggregator,
817            topic,
818            quote_handlers: Vec::new(),
819            greeks_handlers: Vec::new(),
820            timer_name: None,
821            msgbus_priority: 0,
822            bootstrapped: true,
823            deferred_cmd_queue: queue.clone(),
824            clock,
825            raw_mode: false,
826        };
827        (manager, queue)
828    }
829
830    #[rstest]
831    fn test_manager_handle_quote_no_instrument() {
832        let (mut manager, _queue) = make_manager();
833
834        // Should not panic — quote for unknown instrument
835        let quote = QuoteTick::new(
836            InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
837            Price::from("100.00"),
838            Price::from("101.00"),
839            Quantity::from("1.0"),
840            Quantity::from("1.0"),
841            UnixNanos::from(1u64),
842            UnixNanos::from(1u64),
843        );
844        manager.handle_quote(&quote);
845    }
846
847    #[rstest]
848    fn test_manager_publish_slice_empty() {
849        let (mut manager, _queue) = make_manager();
850        // Should not panic — empty slice skips publish
851        manager.publish_slice(UnixNanos::from(100u64));
852    }
853
854    #[rstest]
855    fn test_manager_teardown_no_handlers() {
856        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
857        let (mut manager, _queue) = make_manager();
858        // Should not panic — no handlers to unregister
859        manager.teardown(&clock);
860        assert!(manager.quote_handlers.is_empty());
861    }
862
863    fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
864        let series_id = make_series_id();
865        let topic = switchboard::get_option_chain_topic(series_id);
866
867        let strikes = [45000, 47500, 50000, 52500, 55000];
868        let mut instruments = HashMap::new();
869
870        for s in &strikes {
871            let strike = Price::from(&s.to_string());
872            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
873            let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
874            instruments.insert(call_id, (strike, OptionKind::Call));
875            instruments.insert(put_id, (strike, OptionKind::Put));
876        }
877
878        let tracker = AtmTracker::new();
879        let aggregator = OptionChainAggregator::new(
880            series_id,
881            StrikeRange::AtmRelative {
882                strikes_above: 1,
883                strikes_below: 1,
884            },
885            tracker,
886            instruments,
887        );
888        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
889        let queue = make_test_queue();
890
891        let manager = OptionChainManager {
892            aggregator,
893            topic,
894            quote_handlers: Vec::new(),
895            greeks_handlers: Vec::new(),
896            timer_name: None,
897            msgbus_priority: 0,
898            bootstrapped: false,
899            deferred_cmd_queue: queue.clone(),
900            clock,
901            raw_mode: false,
902        };
903        (manager, queue)
904    }
905
906    fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
907        use nautilus_model::data::option_chain::OptionGreeks;
908        let greeks = OptionGreeks {
909            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
910            underlying_price: Some(50000.0),
911            ..Default::default()
912        };
913        manager.handle_greeks(&greeks);
914    }
915
916    #[rstest]
917    fn test_manager_publish_slice_triggers_rebalance() {
918        let (mut manager, queue) = make_option_chain_manager();
919        // Initially no instruments active (ATM unknown, deferred)
920        assert_eq!(manager.aggregator.instrument_ids().len(), 0);
921
922        // Feed ATM near 50000 via greeks — bootstrap computes active set (3 strikes × 2 = 6)
923        bootstrap_via_greeks(&mut manager);
924        assert!(manager.bootstrapped);
925        assert_eq!(manager.aggregator.instrument_ids().len(), 6); // 3 strikes × 2
926
927        // Deferred queue should contain subscribe commands (6 instruments × 3 = 18 commands)
928        assert_eq!(queue.borrow().len(), 18);
929
930        // publish_slice should still work normally after bootstrap
931        manager.publish_slice(UnixNanos::from(100u64));
932        assert!(manager.aggregator.last_atm_strike().is_some());
933    }
934
935    #[rstest]
936    fn test_manager_add_instrument_new() {
937        let (mut manager, _queue) = make_option_chain_manager();
938        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
939        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
940        let strike = Price::from("57500");
941        let count_before = manager.aggregator.instruments().len();
942
943        let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);
944
945        assert!(result);
946        assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
947    }
948
949    #[rstest]
950    fn test_manager_add_instrument_already_known() {
951        let (mut manager, _queue) = make_option_chain_manager();
952        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
953        let existing_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
954        let strike = Price::from("50000");
955        let count_before = manager.aggregator.instruments().len();
956
957        let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);
958
959        assert!(!result);
960        assert_eq!(manager.aggregator.instruments().len(), count_before);
961    }
962
963    #[rstest]
964    fn test_manager_deferred_bootstrap_on_first_atm() {
965        let (mut manager, queue) = make_option_chain_manager();
966        // Initially not bootstrapped, no active instruments
967        assert!(!manager.bootstrapped);
968        assert_eq!(manager.aggregator.instrument_ids().len(), 0);
969        assert!(queue.borrow().is_empty());
970
971        // Feed ATM via greeks → triggers bootstrap
972        bootstrap_via_greeks(&mut manager);
973
974        assert!(manager.bootstrapped);
975        assert_eq!(manager.aggregator.instrument_ids().len(), 6); // 3 strikes × 2
976        // 6 instruments × 3 commands each (quotes + greeks + instrument_status) = 18 deferred commands
977        assert_eq!(queue.borrow().len(), 18);
978
979        // All commands should be Subscribe variants
980        assert!(
981            queue
982                .borrow()
983                .iter()
984                .all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
985        );
986    }
987
988    #[rstest]
989    fn test_manager_bootstrap_idempotent() {
990        use nautilus_model::data::option_chain::OptionGreeks;
991
992        let (mut manager, _queue) = make_option_chain_manager();
993        bootstrap_via_greeks(&mut manager);
994        assert!(manager.bootstrapped);
995        let count = manager.aggregator.instrument_ids().len();
996
997        // Feed another ATM update — bootstrap should not fire again
998        let greeks2 = OptionGreeks {
999            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1000            underlying_price: Some(50200.0),
1001            ..Default::default()
1002        };
1003        manager.handle_greeks(&greeks2);
1004        assert_eq!(manager.aggregator.instrument_ids().len(), count);
1005    }
1006
1007    #[rstest]
1008    fn test_manager_fixed_range_bootstrapped_immediately() {
1009        // Fixed range manager is bootstrapped at creation (no ATM needed)
1010        let (manager, queue) = make_manager();
1011        assert!(manager.bootstrapped);
1012        assert!(queue.borrow().is_empty());
1013    }
1014
1015    #[rstest]
1016    fn test_manager_forward_price_bootstrap_from_greeks() {
1017        use nautilus_model::data::option_chain::OptionGreeks;
1018
1019        let (mut manager, _queue) = make_option_chain_manager();
1020        assert!(!manager.bootstrapped);
1021
1022        // First greeks with underlying_price → updates ATM tracker and triggers bootstrap
1023        let greeks = OptionGreeks {
1024            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1025            underlying_price: Some(50000.0),
1026            ..Default::default()
1027        };
1028        manager.handle_greeks(&greeks);
1029        assert!(manager.bootstrapped);
1030        // 3 strikes × 2 sides = 6 active instruments
1031        assert_eq!(manager.aggregator.instrument_ids().len(), 6);
1032    }
1033
1034    #[rstest]
1035    fn test_manager_forward_price_no_bootstrap_without_underlying() {
1036        use nautilus_model::data::option_chain::OptionGreeks;
1037
1038        let (mut manager, _queue) = make_option_chain_manager();
1039        assert!(!manager.bootstrapped);
1040
1041        // Greeks with no underlying_price → should not bootstrap
1042        let greeks = OptionGreeks {
1043            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1044            underlying_price: None,
1045            ..Default::default()
1046        };
1047        manager.handle_greeks(&greeks);
1048        assert!(!manager.bootstrapped);
1049    }
1050
1051    #[rstest]
1052    fn test_handle_instrument_expired_removes_from_aggregator() {
1053        let (mut manager, queue) = make_option_chain_manager();
1054        // Bootstrap so instruments are active
1055        bootstrap_via_greeks(&mut manager);
1056        assert!(manager.bootstrapped);
1057        let initial_count = manager.aggregator.instruments().len();
1058        queue.borrow_mut().clear(); // clear bootstrap commands
1059
1060        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1061        let is_empty = manager.handle_instrument_expired(&expired_id);
1062
1063        assert!(!is_empty);
1064        assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
1065        assert!(!manager.aggregator.active_ids().contains(&expired_id));
1066    }
1067
1068    #[rstest]
1069    fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
1070        let (mut manager, queue) = make_option_chain_manager();
1071        bootstrap_via_greeks(&mut manager);
1072        queue.borrow_mut().clear();
1073
1074        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1075        manager.handle_instrument_expired(&expired_id);
1076
1077        // Should push 3 unsubscribe commands (quotes + greeks + instrument_status)
1078        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1079        assert_eq!(cmds.len(), 3);
1080        assert!(
1081            cmds.iter()
1082                .all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
1083        );
1084    }
1085
1086    #[rstest]
1087    fn test_handle_instrument_expired_returns_true_when_last() {
1088        let series_id = make_series_id();
1089        let topic = switchboard::get_option_chain_topic(series_id);
1090        let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1091        let strike = Price::from("50000");
1092        let mut instruments = HashMap::new();
1093        instruments.insert(call_id, (strike, OptionKind::Call));
1094        let tracker = AtmTracker::new();
1095        let aggregator = OptionChainAggregator::new(
1096            series_id,
1097            StrikeRange::Fixed(vec![strike]),
1098            tracker,
1099            instruments,
1100        );
1101        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1102        let queue = make_test_queue();
1103
1104        let mut manager = OptionChainManager {
1105            aggregator,
1106            topic,
1107            quote_handlers: Vec::new(),
1108            greeks_handlers: Vec::new(),
1109            timer_name: None,
1110            msgbus_priority: 0,
1111            bootstrapped: true,
1112            deferred_cmd_queue: queue,
1113            clock,
1114            raw_mode: false,
1115        };
1116
1117        let is_empty = manager.handle_instrument_expired(&call_id);
1118        assert!(is_empty);
1119        assert!(manager.aggregator.is_catalog_empty());
1120    }
1121
1122    #[rstest]
1123    fn test_handle_instrument_expired_unknown_noop() {
1124        let (mut manager, queue) = make_manager();
1125        queue.borrow_mut().clear();
1126
1127        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1128        let is_empty = manager.handle_instrument_expired(&unknown);
1129
1130        // Empty manager returns true (catalog was already empty)
1131        assert!(is_empty);
1132        assert!(queue.borrow().is_empty()); // no deferred commands pushed
1133    }
1134
1135    #[rstest]
1136    fn test_publish_slice_pushes_expire_series_when_expired() {
1137        let (mut manager, queue) = make_option_chain_manager();
1138        bootstrap_via_greeks(&mut manager);
1139        queue.borrow_mut().clear();
1140
1141        // Publish at the expiration timestamp — should push ExpireSeries, not publish
1142        let expiry_ns = manager.aggregator.series_id().expiration_ns;
1143        manager.publish_slice(expiry_ns);
1144
1145        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1146        assert_eq!(cmds.len(), 1);
1147        assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
1148    }
1149
1150    #[rstest]
1151    fn test_expired_instrument_unsubscribes_include_instrument_status() {
1152        let (mut manager, queue) = make_option_chain_manager();
1153        bootstrap_via_greeks(&mut manager);
1154        queue.borrow_mut().clear();
1155
1156        let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1157        manager.handle_instrument_expired(&expired_id);
1158
1159        let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1160        // Should have exactly one InstrumentStatus unsubscribe among the 3
1161        let status_unsubs = cmds
1162            .iter()
1163            .filter(|c| {
1164                matches!(
1165                    c,
1166                    DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
1167                )
1168            })
1169            .count();
1170        assert_eq!(status_unsubs, 1);
1171    }
1172}