1use std::{cell::RefCell, collections::HashMap, rc::Rc};
24
25use nautilus_common::{
26 cache::Cache,
27 clock::Clock,
28 messages::data::{
29 SubscribeCommand, SubscribeInstrumentStatus, SubscribeOptionChain, SubscribeOptionGreeks,
30 SubscribeQuotes, UnsubscribeCommand, UnsubscribeInstrumentStatus, UnsubscribeOptionGreeks,
31 UnsubscribeQuotes,
32 },
33 msgbus::{self, MStr, Topic, TypedHandler, switchboard},
34 timer::{TimeEvent, TimeEventCallback},
35};
36use nautilus_core::{UUID4, correctness::FAILED, datetime::millis_to_nanos_unchecked};
37use nautilus_model::{
38 data::{QuoteTick, option_chain::OptionGreeks},
39 enums::OptionKind,
40 identifiers::{InstrumentId, OptionSeriesId, Venue},
41 instruments::Instrument,
42 types::Price,
43};
44use ustr::Ustr;
45
46use super::{
47 AtmTracker, OptionChainAggregator,
48 handlers::{OptionChainGreeksHandler, OptionChainQuoteHandler, OptionChainSlicePublisher},
49};
50use crate::{
51 client::DataClientAdapter,
52 engine::{DeferredCommand, DeferredCommandQueue},
53};
54
/// Manages one option chain subscription: it owns the aggregator for the
/// series, the message bus handlers registered for its active instruments,
/// the optional snapshot timer, and the queue used to emit deferred commands.
#[derive(Debug)]
pub struct OptionChainManager {
    /// Aggregator holding the instrument catalog, the active set and the ATM tracker.
    aggregator: OptionChainAggregator,
    /// Message bus topic on which option chain slices are published.
    topic: MStr<Topic>,
    /// Quote handlers registered on the message bus; the first entry is the
    /// one reused for every later subscribe/unsubscribe call.
    quote_handlers: Vec<TypedHandler<QuoteTick>>,
    /// Greeks handlers registered on the message bus; the first entry is the
    /// one reused for every later subscribe/unsubscribe call.
    greeks_handlers: Vec<TypedHandler<OptionGreeks>>,
    /// Name of the snapshot timer; `None` when no snapshot interval was requested.
    timer_name: Option<Ustr>,
    /// Priority passed to the message bus when subscribing handlers.
    msgbus_priority: u8,
    /// Whether the active instrument set has been established yet.
    bootstrapped: bool,
    /// Queue receiving subscribe, unsubscribe and expire-series commands
    /// (presumably drained later by the owning engine — confirm against caller).
    deferred_cmd_queue: DeferredCommandQueue,
    /// Clock used to timestamp the deferred commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// `true` when no snapshot interval is configured; slices are then
    /// published on each quote/greeks update for an active instrument.
    raw_mode: bool,
}
77
78impl OptionChainManager {
79 #[expect(clippy::too_many_arguments)]
86 pub(crate) fn create_and_setup(
87 series_id: OptionSeriesId,
88 cache: &Rc<RefCell<Cache>>,
89 cmd: &SubscribeOptionChain,
90 clock: &Rc<RefCell<dyn Clock>>,
91 msgbus_priority: u8,
92 client: Option<&mut DataClientAdapter>,
93 initial_atm_price: Option<Price>,
94 deferred_cmd_queue: DeferredCommandQueue,
95 ) -> Rc<RefCell<Self>> {
96 let topic = switchboard::get_option_chain_topic(series_id);
97 let instruments = Self::resolve_instruments(cache, &series_id);
98
99 let mut tracker = AtmTracker::new();
100
101 if let Some((strike, _)) = instruments.values().next() {
103 tracker.set_forward_precision(strike.precision);
104 }
105
106 if let Some(price) = initial_atm_price {
107 tracker.set_initial_price(price);
108 log::info!("Pre-populated ATM with forward price: {price}");
109 }
110 let aggregator =
111 OptionChainAggregator::new(series_id, cmd.strike_range.clone(), tracker, instruments);
112
113 let active_instrument_ids = aggregator.instrument_ids();
116 let all_instrument_ids = aggregator.all_instrument_ids();
117 let bootstrapped = !active_instrument_ids.is_empty() || all_instrument_ids.is_empty();
119
120 let raw_mode = cmd.snapshot_interval_ms.is_none();
121
122 let manager = Self {
123 aggregator,
124 topic,
125 quote_handlers: Vec::new(),
126 greeks_handlers: Vec::new(),
127 timer_name: None,
128 msgbus_priority,
129 bootstrapped,
130 deferred_cmd_queue,
131 clock: clock.clone(),
132 raw_mode,
133 };
134 let manager_rc = Rc::new(RefCell::new(manager));
135
136 let (quote_handlers, _quote_handler) = Self::register_quote_handlers(
138 &manager_rc,
139 &active_instrument_ids,
140 series_id,
141 msgbus_priority,
142 );
143 let greeks_handlers = Self::register_greeks_handlers(
144 &manager_rc,
145 &active_instrument_ids,
146 series_id,
147 msgbus_priority,
148 );
149
150 Self::forward_client_subscriptions(
153 client,
154 &active_instrument_ids,
155 cmd,
156 series_id.venue,
157 clock,
158 );
159
160 let timer_name = cmd
161 .snapshot_interval_ms
162 .map(|ms| Self::setup_timer(&manager_rc, series_id, ms, clock));
163
164 {
165 let mut mgr = manager_rc.borrow_mut();
166 mgr.quote_handlers = quote_handlers;
167 mgr.greeks_handlers = greeks_handlers;
168 mgr.timer_name = timer_name;
169 }
170
171 let mode_str = match cmd.snapshot_interval_ms {
172 Some(ms) => format!("interval={ms}ms"),
173 None => "mode=raw".to_string(),
174 };
175 log::info!(
176 "Subscribed option chain for {series_id} ({} active/{} total instruments, {mode_str})",
177 active_instrument_ids.len(),
178 all_instrument_ids.len(),
179 );
180
181 manager_rc
182 }
183
184 fn register_quote_handlers(
189 manager_rc: &Rc<RefCell<Self>>,
190 instrument_ids: &[InstrumentId],
191 series_id: OptionSeriesId,
192 priority: u8,
193 ) -> (Vec<TypedHandler<QuoteTick>>, TypedHandler<QuoteTick>) {
194 let quote_handler = TypedHandler::new(OptionChainQuoteHandler::new(manager_rc, series_id));
195 let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
197 handlers.push(quote_handler.clone());
198
199 for instrument_id in instrument_ids {
200 let topic = switchboard::get_quotes_topic(*instrument_id);
201 msgbus::subscribe_quotes(topic.into(), quote_handler.clone(), Some(priority));
202 handlers.push(quote_handler.clone());
203 }
204 (handlers, quote_handler)
205 }
206
207 fn register_greeks_handlers(
212 manager_rc: &Rc<RefCell<Self>>,
213 instrument_ids: &[InstrumentId],
214 series_id: OptionSeriesId,
215 priority: u8,
216 ) -> Vec<TypedHandler<OptionGreeks>> {
217 let greeks_handler =
218 TypedHandler::new(OptionChainGreeksHandler::new(manager_rc, series_id));
219 let mut handlers = Vec::with_capacity(instrument_ids.len() + 1);
221 handlers.push(greeks_handler.clone());
222
223 for instrument_id in instrument_ids {
224 let topic = switchboard::get_option_greeks_topic(*instrument_id);
225 msgbus::subscribe_option_greeks(topic.into(), greeks_handler.clone(), Some(priority));
226 handlers.push(greeks_handler.clone());
227 }
228 handlers
229 }
230
231 fn forward_client_subscriptions(
233 client: Option<&mut DataClientAdapter>,
234 instrument_ids: &[InstrumentId],
235 cmd: &SubscribeOptionChain,
236 venue: Venue,
237 clock: &Rc<RefCell<dyn Clock>>,
238 ) {
239 let ts_init = clock.borrow().timestamp_ns();
240
241 let Some(client) = client else {
242 log::error!(
243 "Cannot forward option chain subscriptions: no client found for venue={venue}",
244 );
245 return;
246 };
247
248 for instrument_id in instrument_ids {
249 client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
250 instrument_id: *instrument_id,
251 client_id: cmd.client_id,
252 venue: Some(venue),
253 command_id: UUID4::new(),
254 ts_init,
255 correlation_id: None,
256 params: None,
257 }));
258 client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
259 instrument_id: *instrument_id,
260 client_id: cmd.client_id,
261 venue: Some(venue),
262 command_id: UUID4::new(),
263 ts_init,
264 correlation_id: None,
265 params: None,
266 }));
267 client.execute_subscribe(SubscribeCommand::InstrumentStatus(
268 SubscribeInstrumentStatus {
269 instrument_id: *instrument_id,
270 client_id: cmd.client_id,
271 venue: Some(venue),
272 command_id: UUID4::new(),
273 ts_init,
274 correlation_id: None,
275 params: None,
276 },
277 ));
278 }
279
280 log::info!(
281 "Forwarded {} quote + greeks + instrument status subscriptions to DataClient",
282 instrument_ids.len(),
283 );
284 }
285
286 fn setup_timer(
288 manager_rc: &Rc<RefCell<Self>>,
289 series_id: OptionSeriesId,
290 interval_ms: u64,
291 clock: &Rc<RefCell<dyn Clock>>,
292 ) -> Ustr {
293 let interval_ns = millis_to_nanos_unchecked(interval_ms as f64);
294 let publisher = OptionChainSlicePublisher::new(manager_rc);
295 let timer_name = Ustr::from(&format!("OptionChain|{series_id}|{interval_ms}"));
296
297 let now_ns = clock.borrow().timestamp_ns().as_u64();
298 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
299
300 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event| publisher.publish(&event));
301 let callback = TimeEventCallback::from(callback_fn);
302
303 clock
304 .borrow_mut()
305 .set_timer_ns(
306 &timer_name,
307 interval_ns,
308 Some(start_time_ns.into()),
309 None,
310 Some(callback),
311 None,
312 None,
313 )
314 .expect(FAILED);
315
316 timer_name
317 }
318
319 #[must_use]
321 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
322 self.aggregator.all_instrument_ids()
323 }
324
325 #[must_use]
327 pub fn venue(&self) -> Venue {
328 self.aggregator.series_id().venue
329 }
330
331 pub fn teardown(&mut self, clock: &Rc<RefCell<dyn Clock>>) {
333 let instrument_ids = self.aggregator.instrument_ids();
335
336 if let Some(handler) = self.quote_handlers.first() {
338 for instrument_id in &instrument_ids {
339 let topic = switchboard::get_quotes_topic(*instrument_id);
340 msgbus::unsubscribe_quotes(topic.into(), handler);
341 }
342 }
343
344 if let Some(handler) = self.greeks_handlers.first() {
346 for instrument_id in &instrument_ids {
347 let topic = switchboard::get_option_greeks_topic(*instrument_id);
348 msgbus::unsubscribe_option_greeks(topic.into(), handler);
349 }
350 }
351
352 if let Some(timer_name) = self.timer_name.take() {
354 let mut clk = clock.borrow_mut();
355 if clk.timer_exists(&timer_name) {
356 clk.cancel_timer(&timer_name);
357 }
358 }
359
360 self.quote_handlers.clear();
361 self.greeks_handlers.clear();
362 }
363
364 pub fn handle_greeks(&mut self, greeks: &OptionGreeks) {
369 self.aggregator
371 .atm_tracker_mut()
372 .update_from_option_greeks(greeks);
373 self.aggregator.update_greeks(greeks);
375 self.maybe_bootstrap();
377
378 if self.raw_mode
379 && self.bootstrapped
380 && self.aggregator.active_ids().contains(&greeks.instrument_id)
381 {
382 self.publish_slice(greeks.ts_event);
383 }
384 }
385
386 pub fn handle_instrument_expired(&mut self, instrument_id: &InstrumentId) -> bool {
392 let was_active = self.aggregator.active_ids().contains(instrument_id);
393
394 if !self.aggregator.remove_instrument(instrument_id) {
395 return self.aggregator.is_catalog_empty();
396 }
397
398 if was_active {
399 if let Some(qh) = self.quote_handlers.first() {
401 let topic = switchboard::get_quotes_topic(*instrument_id);
402 msgbus::unsubscribe_quotes(topic.into(), qh);
403 }
404
405 if let Some(gh) = self.greeks_handlers.first() {
406 let topic = switchboard::get_option_greeks_topic(*instrument_id);
407 msgbus::unsubscribe_option_greeks(topic.into(), gh);
408 }
409
410 self.push_unsubscribe_commands(*instrument_id);
412 }
413
414 log::info!(
415 "Removed expired instrument {instrument_id} from option chain {} (was_active={was_active}, remaining={})",
416 self.aggregator.series_id(),
417 self.aggregator.instruments().len(),
418 );
419
420 self.aggregator.is_catalog_empty()
421 }
422
423 pub fn handle_quote(&mut self, quote: &QuoteTick) {
428 self.aggregator.update_quote(quote);
429 self.maybe_bootstrap();
430
431 if self.raw_mode
432 && self.bootstrapped
433 && self.aggregator.active_ids().contains("e.instrument_id)
434 {
435 self.publish_slice(quote.ts_event);
436 }
437 }
438
439 fn maybe_bootstrap(&mut self) {
444 if self.bootstrapped {
445 return;
446 }
447
448 if self.aggregator.atm_tracker().atm_price().is_none() {
449 return;
450 }
451
452 let active_ids = self.aggregator.recompute_active_set();
454 self.register_handlers_for_instruments_bulk(&active_ids);
455
456 for &id in &active_ids {
457 self.push_subscribe_commands(id);
458 }
459
460 self.bootstrapped = true;
461
462 log::info!(
463 "Bootstrapped option chain for {} ({} active instruments)",
464 self.aggregator.series_id(),
465 active_ids.len(),
466 );
467 }
468
469 fn register_handlers_for_instruments_bulk(&self, instrument_ids: &[InstrumentId]) {
471 for &id in instrument_ids {
472 self.register_handlers_for_instrument(id);
473 }
474 }
475
476 pub fn add_instrument(
482 &mut self,
483 instrument_id: InstrumentId,
484 strike: Price,
485 kind: OptionKind,
486 client: Option<&mut DataClientAdapter>,
487 clock: &Rc<RefCell<dyn Clock>>,
488 ) -> bool {
489 if !self.aggregator.add_instrument(instrument_id, strike, kind) {
490 return false;
491 }
492
493 if self.aggregator.active_ids().contains(&instrument_id) {
494 self.register_handlers_for_instrument(instrument_id);
495 }
496
497 let venue = self.aggregator.series_id().venue;
498 Self::forward_instrument_subscriptions(client, instrument_id, venue, clock);
499
500 log::info!(
501 "Added instrument {instrument_id} to option chain {} (active={})",
502 self.aggregator.series_id(),
503 self.aggregator.active_ids().contains(&instrument_id),
504 );
505
506 true
507 }
508
509 fn register_handlers_for_instrument(&self, instrument_id: InstrumentId) {
510 if let Some(qh) = self.quote_handlers.first().cloned() {
511 let topic = switchboard::get_quotes_topic(instrument_id);
512 msgbus::subscribe_quotes(topic.into(), qh, Some(self.msgbus_priority));
513 }
514
515 if let Some(gh) = self.greeks_handlers.first().cloned() {
516 let topic = switchboard::get_option_greeks_topic(instrument_id);
517 msgbus::subscribe_option_greeks(topic.into(), gh, Some(self.msgbus_priority));
518 }
519 }
520
521 fn push_subscribe_commands(&self, instrument_id: InstrumentId) {
523 let venue = self.aggregator.series_id().venue;
524 let ts_init = self.clock.borrow().timestamp_ns();
525 let mut queue = self.deferred_cmd_queue.borrow_mut();
526 queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::Quotes(
527 SubscribeQuotes {
528 instrument_id,
529 client_id: None,
530 venue: Some(venue),
531 command_id: UUID4::new(),
532 ts_init,
533 correlation_id: None,
534 params: None,
535 },
536 )));
537 queue.push_back(DeferredCommand::Subscribe(SubscribeCommand::OptionGreeks(
538 SubscribeOptionGreeks {
539 instrument_id,
540 client_id: None,
541 venue: Some(venue),
542 command_id: UUID4::new(),
543 ts_init,
544 correlation_id: None,
545 params: None,
546 },
547 )));
548 queue.push_back(DeferredCommand::Subscribe(
549 SubscribeCommand::InstrumentStatus(SubscribeInstrumentStatus {
550 instrument_id,
551 client_id: None,
552 venue: Some(venue),
553 command_id: UUID4::new(),
554 ts_init,
555 correlation_id: None,
556 params: None,
557 }),
558 ));
559 }
560
561 fn push_unsubscribe_commands(&self, instrument_id: InstrumentId) {
563 let venue = self.aggregator.series_id().venue;
564 let ts_init = self.clock.borrow().timestamp_ns();
565 let mut queue = self.deferred_cmd_queue.borrow_mut();
566 queue.push_back(DeferredCommand::Unsubscribe(UnsubscribeCommand::Quotes(
567 UnsubscribeQuotes {
568 instrument_id,
569 client_id: None,
570 venue: Some(venue),
571 command_id: UUID4::new(),
572 ts_init,
573 correlation_id: None,
574 params: None,
575 },
576 )));
577 queue.push_back(DeferredCommand::Unsubscribe(
578 UnsubscribeCommand::OptionGreeks(UnsubscribeOptionGreeks {
579 instrument_id,
580 client_id: None,
581 venue: Some(venue),
582 command_id: UUID4::new(),
583 ts_init,
584 correlation_id: None,
585 params: None,
586 }),
587 ));
588 queue.push_back(DeferredCommand::Unsubscribe(
589 UnsubscribeCommand::InstrumentStatus(UnsubscribeInstrumentStatus {
590 instrument_id,
591 client_id: None,
592 venue: Some(venue),
593 command_id: UUID4::new(),
594 ts_init,
595 correlation_id: None,
596 params: None,
597 }),
598 ));
599 }
600
601 fn forward_instrument_subscriptions(
603 client: Option<&mut DataClientAdapter>,
604 instrument_id: InstrumentId,
605 venue: Venue,
606 clock: &Rc<RefCell<dyn Clock>>,
607 ) {
608 let Some(client) = client else {
609 log::error!(
610 "Cannot forward subscriptions for {instrument_id}: no client for venue={venue}",
611 );
612 return;
613 };
614
615 let ts_init = clock.borrow().timestamp_ns();
616
617 client.execute_subscribe(SubscribeCommand::Quotes(SubscribeQuotes {
618 instrument_id,
619 client_id: None,
620 venue: Some(venue),
621 command_id: UUID4::new(),
622 ts_init,
623 correlation_id: None,
624 params: None,
625 }));
626 client.execute_subscribe(SubscribeCommand::OptionGreeks(SubscribeOptionGreeks {
627 instrument_id,
628 client_id: None,
629 venue: Some(venue),
630 command_id: UUID4::new(),
631 ts_init,
632 correlation_id: None,
633 params: None,
634 }));
635 client.execute_subscribe(SubscribeCommand::InstrumentStatus(
636 SubscribeInstrumentStatus {
637 instrument_id,
638 client_id: None,
639 venue: Some(venue),
640 command_id: UUID4::new(),
641 ts_init,
642 correlation_id: None,
643 params: None,
644 },
645 ));
646 }
647
648 fn maybe_rebalance(&mut self, now_ns: nautilus_core::UnixNanos) {
650 let Some(action) = self.aggregator.check_rebalance(now_ns) else {
651 return;
652 };
653
654 if let Some(qh) = self.quote_handlers.first() {
656 for id in &action.remove {
657 msgbus::unsubscribe_quotes(switchboard::get_quotes_topic(*id).into(), qh);
658 }
659 }
660
661 if let Some(gh) = self.greeks_handlers.first() {
662 for id in &action.remove {
663 msgbus::unsubscribe_option_greeks(
664 switchboard::get_option_greeks_topic(*id).into(),
665 gh,
666 );
667 }
668 }
669
670 if let Some(qh) = self.quote_handlers.first().cloned() {
672 for id in &action.add {
673 msgbus::subscribe_quotes(
674 switchboard::get_quotes_topic(*id).into(),
675 qh.clone(),
676 Some(self.msgbus_priority),
677 );
678 }
679 }
680
681 if let Some(gh) = self.greeks_handlers.first().cloned() {
682 for id in &action.add {
683 msgbus::subscribe_option_greeks(
684 switchboard::get_option_greeks_topic(*id).into(),
685 gh.clone(),
686 Some(self.msgbus_priority),
687 );
688 }
689 }
690
691 for &id in &action.add {
693 self.push_subscribe_commands(id);
694 }
695
696 for &id in &action.remove {
697 self.push_unsubscribe_commands(id);
698 }
699
700 if !action.add.is_empty() || !action.remove.is_empty() {
701 log::info!(
702 "Rebalanced option chain for {}: +{} -{} instruments",
703 self.aggregator.series_id(),
704 action.add.len(),
705 action.remove.len(),
706 );
707 }
708
709 self.aggregator.apply_rebalance(&action, now_ns);
711 }
712
713 pub fn publish_slice(&mut self, ts: nautilus_core::UnixNanos) {
715 if self.aggregator.is_expired(ts) {
717 self.deferred_cmd_queue
718 .borrow_mut()
719 .push_back(DeferredCommand::ExpireSeries(self.aggregator.series_id()));
720 return;
721 }
722
723 self.maybe_rebalance(ts);
724
725 let series_id = self.aggregator.series_id();
726 let slice = self.aggregator.snapshot(ts);
727
728 if slice.is_empty() {
729 log::debug!("OptionChainSlice empty for {series_id}, skipping publish");
730 return;
731 }
732
733 log::debug!(
734 "Publishing OptionChainSlice for {} (calls={}, puts={})",
735 series_id,
736 slice.call_count(),
737 slice.put_count(),
738 );
739 msgbus::publish_option_chain(self.topic, &slice);
740 }
741
742 fn resolve_instruments(
744 cache: &Rc<RefCell<Cache>>,
745 series_id: &OptionSeriesId,
746 ) -> HashMap<InstrumentId, (Price, OptionKind)> {
747 let cache = cache.borrow();
748 let mut map = HashMap::new();
749
750 for instrument in cache.instruments(&series_id.venue, Some(&series_id.underlying)) {
751 let Some(expiration) = instrument.expiration_ns() else {
752 continue;
753 };
754
755 if expiration != series_id.expiration_ns {
756 continue;
757 }
758
759 if instrument.settlement_currency().code != series_id.settlement_currency {
760 continue;
761 }
762
763 let Some(strike) = instrument.strike_price() else {
764 continue;
765 };
766
767 let Some(kind) = instrument.option_kind() else {
768 continue;
769 };
770
771 map.insert(instrument.id(), (strike, kind));
772 }
773
774 map
775 }
776}
777
778#[cfg(test)]
779mod tests {
780 use std::collections::VecDeque;
781
782 use nautilus_common::clock::TestClock;
783 use nautilus_core::UnixNanos;
784 use nautilus_model::{data::option_chain::StrikeRange, identifiers::Venue, types::Quantity};
785 use rstest::*;
786
787 use super::*;
788
789 fn make_series_id() -> OptionSeriesId {
790 OptionSeriesId::new(
791 Venue::new("DERIBIT"),
792 ustr::Ustr::from("BTC"),
793 ustr::Ustr::from("BTC"),
794 UnixNanos::from(1_700_000_000_000_000_000u64),
795 )
796 }
797
798 fn make_test_queue() -> DeferredCommandQueue {
799 Rc::new(RefCell::new(VecDeque::new()))
800 }
801
802 fn make_manager() -> (OptionChainManager, DeferredCommandQueue) {
803 let series_id = make_series_id();
804 let topic = switchboard::get_option_chain_topic(series_id);
805 let tracker = AtmTracker::new();
806 let aggregator = OptionChainAggregator::new(
807 series_id,
808 StrikeRange::Fixed(vec![]),
809 tracker,
810 HashMap::new(),
811 );
812 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
813 let queue = make_test_queue();
814
815 let manager = OptionChainManager {
816 aggregator,
817 topic,
818 quote_handlers: Vec::new(),
819 greeks_handlers: Vec::new(),
820 timer_name: None,
821 msgbus_priority: 0,
822 bootstrapped: true,
823 deferred_cmd_queue: queue.clone(),
824 clock,
825 raw_mode: false,
826 };
827 (manager, queue)
828 }
829
830 #[rstest]
831 fn test_manager_handle_quote_no_instrument() {
832 let (mut manager, _queue) = make_manager();
833
834 let quote = QuoteTick::new(
836 InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
837 Price::from("100.00"),
838 Price::from("101.00"),
839 Quantity::from("1.0"),
840 Quantity::from("1.0"),
841 UnixNanos::from(1u64),
842 UnixNanos::from(1u64),
843 );
844 manager.handle_quote("e);
845 }
846
847 #[rstest]
848 fn test_manager_publish_slice_empty() {
849 let (mut manager, _queue) = make_manager();
850 manager.publish_slice(UnixNanos::from(100u64));
852 }
853
854 #[rstest]
855 fn test_manager_teardown_no_handlers() {
856 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
857 let (mut manager, _queue) = make_manager();
858 manager.teardown(&clock);
860 assert!(manager.quote_handlers.is_empty());
861 }
862
863 fn make_option_chain_manager() -> (OptionChainManager, DeferredCommandQueue) {
864 let series_id = make_series_id();
865 let topic = switchboard::get_option_chain_topic(series_id);
866
867 let strikes = [45000, 47500, 50000, 52500, 55000];
868 let mut instruments = HashMap::new();
869
870 for s in &strikes {
871 let strike = Price::from(&s.to_string());
872 let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
873 let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
874 instruments.insert(call_id, (strike, OptionKind::Call));
875 instruments.insert(put_id, (strike, OptionKind::Put));
876 }
877
878 let tracker = AtmTracker::new();
879 let aggregator = OptionChainAggregator::new(
880 series_id,
881 StrikeRange::AtmRelative {
882 strikes_above: 1,
883 strikes_below: 1,
884 },
885 tracker,
886 instruments,
887 );
888 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
889 let queue = make_test_queue();
890
891 let manager = OptionChainManager {
892 aggregator,
893 topic,
894 quote_handlers: Vec::new(),
895 greeks_handlers: Vec::new(),
896 timer_name: None,
897 msgbus_priority: 0,
898 bootstrapped: false,
899 deferred_cmd_queue: queue.clone(),
900 clock,
901 raw_mode: false,
902 };
903 (manager, queue)
904 }
905
906 fn bootstrap_via_greeks(manager: &mut OptionChainManager) {
907 use nautilus_model::data::option_chain::OptionGreeks;
908 let greeks = OptionGreeks {
909 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
910 underlying_price: Some(50000.0),
911 ..Default::default()
912 };
913 manager.handle_greeks(&greeks);
914 }
915
916 #[rstest]
917 fn test_manager_publish_slice_triggers_rebalance() {
918 let (mut manager, queue) = make_option_chain_manager();
919 assert_eq!(manager.aggregator.instrument_ids().len(), 0);
921
922 bootstrap_via_greeks(&mut manager);
924 assert!(manager.bootstrapped);
925 assert_eq!(manager.aggregator.instrument_ids().len(), 6); assert_eq!(queue.borrow().len(), 18);
929
930 manager.publish_slice(UnixNanos::from(100u64));
932 assert!(manager.aggregator.last_atm_strike().is_some());
933 }
934
935 #[rstest]
936 fn test_manager_add_instrument_new() {
937 let (mut manager, _queue) = make_option_chain_manager();
938 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
939 let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
940 let strike = Price::from("57500");
941 let count_before = manager.aggregator.instruments().len();
942
943 let result = manager.add_instrument(new_id, strike, OptionKind::Call, None, &clock);
944
945 assert!(result);
946 assert_eq!(manager.aggregator.instruments().len(), count_before + 1);
947 }
948
949 #[rstest]
950 fn test_manager_add_instrument_already_known() {
951 let (mut manager, _queue) = make_option_chain_manager();
952 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
953 let existing_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
954 let strike = Price::from("50000");
955 let count_before = manager.aggregator.instruments().len();
956
957 let result = manager.add_instrument(existing_id, strike, OptionKind::Call, None, &clock);
958
959 assert!(!result);
960 assert_eq!(manager.aggregator.instruments().len(), count_before);
961 }
962
963 #[rstest]
964 fn test_manager_deferred_bootstrap_on_first_atm() {
965 let (mut manager, queue) = make_option_chain_manager();
966 assert!(!manager.bootstrapped);
968 assert_eq!(manager.aggregator.instrument_ids().len(), 0);
969 assert!(queue.borrow().is_empty());
970
971 bootstrap_via_greeks(&mut manager);
973
974 assert!(manager.bootstrapped);
975 assert_eq!(manager.aggregator.instrument_ids().len(), 6); assert_eq!(queue.borrow().len(), 18);
978
979 assert!(
981 queue
982 .borrow()
983 .iter()
984 .all(|cmd| matches!(cmd, DeferredCommand::Subscribe(_)))
985 );
986 }
987
988 #[rstest]
989 fn test_manager_bootstrap_idempotent() {
990 use nautilus_model::data::option_chain::OptionGreeks;
991
992 let (mut manager, _queue) = make_option_chain_manager();
993 bootstrap_via_greeks(&mut manager);
994 assert!(manager.bootstrapped);
995 let count = manager.aggregator.instrument_ids().len();
996
997 let greeks2 = OptionGreeks {
999 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1000 underlying_price: Some(50200.0),
1001 ..Default::default()
1002 };
1003 manager.handle_greeks(&greeks2);
1004 assert_eq!(manager.aggregator.instrument_ids().len(), count);
1005 }
1006
1007 #[rstest]
1008 fn test_manager_fixed_range_bootstrapped_immediately() {
1009 let (manager, queue) = make_manager();
1011 assert!(manager.bootstrapped);
1012 assert!(queue.borrow().is_empty());
1013 }
1014
1015 #[rstest]
1016 fn test_manager_forward_price_bootstrap_from_greeks() {
1017 use nautilus_model::data::option_chain::OptionGreeks;
1018
1019 let (mut manager, _queue) = make_option_chain_manager();
1020 assert!(!manager.bootstrapped);
1021
1022 let greeks = OptionGreeks {
1024 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1025 underlying_price: Some(50000.0),
1026 ..Default::default()
1027 };
1028 manager.handle_greeks(&greeks);
1029 assert!(manager.bootstrapped);
1030 assert_eq!(manager.aggregator.instrument_ids().len(), 6);
1032 }
1033
1034 #[rstest]
1035 fn test_manager_forward_price_no_bootstrap_without_underlying() {
1036 use nautilus_model::data::option_chain::OptionGreeks;
1037
1038 let (mut manager, _queue) = make_option_chain_manager();
1039 assert!(!manager.bootstrapped);
1040
1041 let greeks = OptionGreeks {
1043 instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
1044 underlying_price: None,
1045 ..Default::default()
1046 };
1047 manager.handle_greeks(&greeks);
1048 assert!(!manager.bootstrapped);
1049 }
1050
1051 #[rstest]
1052 fn test_handle_instrument_expired_removes_from_aggregator() {
1053 let (mut manager, queue) = make_option_chain_manager();
1054 bootstrap_via_greeks(&mut manager);
1056 assert!(manager.bootstrapped);
1057 let initial_count = manager.aggregator.instruments().len();
1058 queue.borrow_mut().clear(); let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1061 let is_empty = manager.handle_instrument_expired(&expired_id);
1062
1063 assert!(!is_empty);
1064 assert_eq!(manager.aggregator.instruments().len(), initial_count - 1);
1065 assert!(!manager.aggregator.active_ids().contains(&expired_id));
1066 }
1067
1068 #[rstest]
1069 fn test_handle_instrument_expired_pushes_deferred_unsubscribes() {
1070 let (mut manager, queue) = make_option_chain_manager();
1071 bootstrap_via_greeks(&mut manager);
1072 queue.borrow_mut().clear();
1073
1074 let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1075 manager.handle_instrument_expired(&expired_id);
1076
1077 let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1079 assert_eq!(cmds.len(), 3);
1080 assert!(
1081 cmds.iter()
1082 .all(|c| matches!(c, DeferredCommand::Unsubscribe(_)))
1083 );
1084 }
1085
1086 #[rstest]
1087 fn test_handle_instrument_expired_returns_true_when_last() {
1088 let series_id = make_series_id();
1089 let topic = switchboard::get_option_chain_topic(series_id);
1090 let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1091 let strike = Price::from("50000");
1092 let mut instruments = HashMap::new();
1093 instruments.insert(call_id, (strike, OptionKind::Call));
1094 let tracker = AtmTracker::new();
1095 let aggregator = OptionChainAggregator::new(
1096 series_id,
1097 StrikeRange::Fixed(vec![strike]),
1098 tracker,
1099 instruments,
1100 );
1101 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1102 let queue = make_test_queue();
1103
1104 let mut manager = OptionChainManager {
1105 aggregator,
1106 topic,
1107 quote_handlers: Vec::new(),
1108 greeks_handlers: Vec::new(),
1109 timer_name: None,
1110 msgbus_priority: 0,
1111 bootstrapped: true,
1112 deferred_cmd_queue: queue,
1113 clock,
1114 raw_mode: false,
1115 };
1116
1117 let is_empty = manager.handle_instrument_expired(&call_id);
1118 assert!(is_empty);
1119 assert!(manager.aggregator.is_catalog_empty());
1120 }
1121
1122 #[rstest]
1123 fn test_handle_instrument_expired_unknown_noop() {
1124 let (mut manager, queue) = make_manager();
1125 queue.borrow_mut().clear();
1126
1127 let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
1128 let is_empty = manager.handle_instrument_expired(&unknown);
1129
1130 assert!(is_empty);
1132 assert!(queue.borrow().is_empty()); }
1134
1135 #[rstest]
1136 fn test_publish_slice_pushes_expire_series_when_expired() {
1137 let (mut manager, queue) = make_option_chain_manager();
1138 bootstrap_via_greeks(&mut manager);
1139 queue.borrow_mut().clear();
1140
1141 let expiry_ns = manager.aggregator.series_id().expiration_ns;
1143 manager.publish_slice(expiry_ns);
1144
1145 let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1146 assert_eq!(cmds.len(), 1);
1147 assert!(matches!(cmds[0], DeferredCommand::ExpireSeries(_)));
1148 }
1149
1150 #[rstest]
1151 fn test_expired_instrument_unsubscribes_include_instrument_status() {
1152 let (mut manager, queue) = make_option_chain_manager();
1153 bootstrap_via_greeks(&mut manager);
1154 queue.borrow_mut().clear();
1155
1156 let expired_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
1157 manager.handle_instrument_expired(&expired_id);
1158
1159 let cmds: Vec<_> = queue.borrow().iter().cloned().collect();
1160 let status_unsubs = cmds
1162 .iter()
1163 .filter(|c| {
1164 matches!(
1165 c,
1166 DeferredCommand::Unsubscribe(UnsubscribeCommand::InstrumentStatus(_))
1167 )
1168 })
1169 .count();
1170 assert_eq!(status_unsubs, 1);
1171 }
1172}