1use std::collections::{BTreeMap, HashMap, HashSet};
19
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::{
23 QuoteTick,
24 option_chain::{OptionChainSlice, OptionGreeks, OptionStrikeData, StrikeRange},
25 },
26 enums::OptionKind,
27 identifiers::{InstrumentId, OptionSeriesId},
28 types::Price,
29};
30
31use super::{
32 AtmTracker,
33 constants::{DEFAULT_REBALANCE_COOLDOWN_NS, DEFAULT_REBALANCE_HYSTERESIS},
34};
35
/// Aggregates quotes and greeks for one option series into per-strike call
/// and put buffers, from which chain snapshots are produced.
///
/// Two instrument sets are kept: the full catalog (`instruments`) and the
/// currently active subset (`active_ids`). Updates for instruments outside
/// the active subset are ignored.
#[derive(Debug)]
pub struct OptionChainAggregator {
    /// Series this aggregator covers; `is_expired` compares against its `expiration_ns`.
    series_id: OptionSeriesId,
    /// Rule resolved against the ATM price and the catalog strikes to pick active strikes.
    strike_range: StrikeRange,
    /// Source of the current ATM price.
    atm_tracker: AtmTracker,
    /// Full catalog: instrument ID mapped to its `(strike, kind)`.
    instruments: HashMap<InstrumentId, (Price, OptionKind)>,
    /// Subset of the catalog whose quotes and greeks are currently accepted.
    active_ids: HashSet<InstrumentId>,
    /// Catalog strike closest to the ATM price when the active set was last computed.
    last_atm_strike: Option<Price>,
    /// Fraction of the gap to the adjacent strike the ATM price must cover
    /// before a rebalance is proposed; values `<= 0.0` disable the guard.
    hysteresis: f64,
    /// Minimum nanoseconds between rebalances; `0` disables the guard.
    cooldown_ns: u64,
    /// Timestamp passed to the most recent `apply_rebalance` call, if any.
    last_rebalance_ns: Option<UnixNanos>,
    /// Largest `ts_event` among accepted quotes; used as the snapshot `ts_event`.
    max_ts_event: UnixNanos,
    /// Greeks received before any quote created a buffer entry for their strike.
    pending_greeks: HashMap<InstrumentId, OptionGreeks>,
    /// Latest call data keyed by strike.
    call_buffer: BTreeMap<Price, OptionStrikeData>,
    /// Latest put data keyed by strike.
    put_buffer: BTreeMap<Price, OptionStrikeData>,
}
70
71impl OptionChainAggregator {
72 pub fn new(
79 series_id: OptionSeriesId,
80 strike_range: StrikeRange,
81 atm_tracker: AtmTracker,
82 instruments: HashMap<InstrumentId, (Price, OptionKind)>,
83 ) -> Self {
84 let all_strikes = Self::sorted_strikes(&instruments);
85 let atm_price = atm_tracker.atm_price();
86 let active_strikes: HashSet<Price> = strike_range
87 .resolve(atm_price, &all_strikes)
88 .into_iter()
89 .collect();
90 let active_ids: HashSet<InstrumentId> = instruments
91 .iter()
92 .filter(|(_, (strike, _))| active_strikes.contains(strike))
93 .map(|(id, _)| *id)
94 .collect();
95 let last_atm_strike =
96 atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
97
98 Self {
99 series_id,
100 strike_range,
101 atm_tracker,
102 instruments,
103 active_ids,
104 last_atm_strike,
105 hysteresis: DEFAULT_REBALANCE_HYSTERESIS,
106 cooldown_ns: DEFAULT_REBALANCE_COOLDOWN_NS,
107 last_rebalance_ns: None,
108 max_ts_event: UnixNanos::default(),
109 pending_greeks: HashMap::new(),
110 call_buffer: BTreeMap::new(),
111 put_buffer: BTreeMap::new(),
112 }
113 }
114
    /// Returns a mutable reference to the ATM tracker so callers can feed it price updates.
    pub fn atm_tracker_mut(&mut self) -> &mut AtmTracker {
        &mut self.atm_tracker
    }
119
120 #[must_use]
122 pub fn instrument_ids(&self) -> Vec<InstrumentId> {
123 self.active_ids.iter().copied().collect()
124 }
125
    /// Returns the set of currently active instrument IDs.
    #[must_use]
    pub fn active_ids(&self) -> &HashSet<InstrumentId> {
        &self.active_ids
    }
131
    /// Returns the option series ID this aggregator was created for.
    #[must_use]
    pub fn series_id(&self) -> OptionSeriesId {
        self.series_id
    }
137
    /// Returns `true` when `now_ns` is at or past the series expiration timestamp.
    #[must_use]
    pub fn is_expired(&self, now_ns: UnixNanos) -> bool {
        // Inclusive comparison: a timestamp exactly at expiry counts as expired.
        now_ns >= self.series_id.expiration_ns
    }
143
    /// Returns the full instrument catalog (active and inactive) with each
    /// instrument's `(strike, kind)`.
    #[must_use]
    pub fn instruments(&self) -> &HashMap<InstrumentId, (Price, OptionKind)> {
        &self.instruments
    }
149
150 #[must_use]
152 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
153 self.instruments.keys().copied().collect()
154 }
155
    /// Returns `true` when the catalog holds no instruments.
    #[must_use]
    pub fn is_catalog_empty(&self) -> bool {
        self.instruments.is_empty()
    }
161
162 #[must_use]
168 pub fn remove_instrument(&mut self, instrument_id: &InstrumentId) -> bool {
169 let Some((strike, kind)) = self.instruments.remove(instrument_id) else {
170 return false;
171 };
172
173 self.active_ids.remove(instrument_id);
174 self.pending_greeks.remove(instrument_id);
175
176 let has_sibling = self
178 .instruments
179 .values()
180 .any(|(s, k)| *s == strike && *k == kind);
181
182 if !has_sibling {
183 let buffer = match kind {
184 OptionKind::Call => &mut self.call_buffer,
185 OptionKind::Put => &mut self.put_buffer,
186 };
187 buffer.remove(&strike);
188 }
189
190 true
191 }
192
    /// Returns a shared reference to the ATM tracker.
    #[must_use]
    pub fn atm_tracker(&self) -> &AtmTracker {
        &self.atm_tracker
    }
198
199 pub fn recompute_active_set(&mut self) -> Vec<InstrumentId> {
204 let atm_price = self.atm_tracker.atm_price();
205 let all_strikes = Self::sorted_strikes(&self.instruments);
206 let active_strikes: HashSet<Price> = self
207 .strike_range
208 .resolve(atm_price, &all_strikes)
209 .into_iter()
210 .collect();
211 self.active_ids = self
212 .instruments
213 .iter()
214 .filter(|(_, (strike, _))| active_strikes.contains(strike))
215 .map(|(id, _)| *id)
216 .collect();
217 self.last_atm_strike =
218 atm_price.and_then(|atm| Self::find_closest_strike(&all_strikes, atm));
219 self.active_ids.iter().copied().collect()
220 }
221
222 #[must_use]
229 pub fn add_instrument(
230 &mut self,
231 instrument_id: InstrumentId,
232 strike: Price,
233 kind: OptionKind,
234 ) -> bool {
235 if self.instruments.contains_key(&instrument_id) {
236 return false;
237 }
238
239 self.instruments.insert(instrument_id, (strike, kind));
240
241 let all_strikes = Self::sorted_strikes(&self.instruments);
243 let atm_price = self.atm_tracker.atm_price();
244 let active_strikes: HashSet<Price> = self
245 .strike_range
246 .resolve(atm_price, &all_strikes)
247 .into_iter()
248 .collect();
249
250 if active_strikes.contains(&strike) {
251 self.active_ids.insert(instrument_id);
252 }
253
254 true
255 }
256
257 fn sorted_strikes(instruments: &HashMap<InstrumentId, (Price, OptionKind)>) -> Vec<Price> {
259 let mut strikes: Vec<Price> = instruments.values().map(|(s, _)| *s).collect();
260 strikes.sort();
261 strikes.dedup();
262 strikes
263 }
264
265 fn find_closest_strike(all_strikes: &[Price], atm: Price) -> Option<Price> {
267 all_strikes
268 .iter()
269 .min_by(|a, b| {
270 let da = (a.as_f64() - atm.as_f64()).abs();
271 let db = (b.as_f64() - atm.as_f64()).abs();
272 da.partial_cmp(&db).unwrap_or(std::cmp::Ordering::Equal)
273 })
274 .copied()
275 }
276
277 pub fn update_quote(&mut self, quote: &QuoteTick) {
279 if self.is_expired(quote.ts_event) {
280 log::warn!(
281 "Dropping quote for {}, series {} expired at {}",
282 quote.instrument_id,
283 self.series_id,
284 self.series_id.expiration_ns,
285 );
286 return;
287 }
288
289 if !self.active_ids.contains("e.instrument_id) {
290 return;
291 }
292
293 if let Some(&(strike, kind)) = self.instruments.get("e.instrument_id) {
294 if quote.ts_event > self.max_ts_event {
296 self.max_ts_event = quote.ts_event;
297 }
298
299 let buffer = match kind {
300 OptionKind::Call => &mut self.call_buffer,
301 OptionKind::Put => &mut self.put_buffer,
302 };
303
304 match buffer.get_mut(&strike) {
305 Some(data) => data.quote = *quote,
306 None => {
307 let greeks = self.pending_greeks.remove("e.instrument_id);
309 buffer.insert(
310 strike,
311 OptionStrikeData {
312 quote: *quote,
313 greeks,
314 },
315 );
316 }
317 }
318 }
319 }
320
321 pub fn update_greeks(&mut self, greeks: &OptionGreeks) {
327 if self.is_expired(greeks.ts_event) {
328 log::warn!(
329 "Dropping greeks for {}, series {} expired at {}",
330 greeks.instrument_id,
331 self.series_id,
332 self.series_id.expiration_ns,
333 );
334 return;
335 }
336
337 if !self.active_ids.contains(&greeks.instrument_id) {
338 return;
339 }
340
341 if let Some(&(strike, kind)) = self.instruments.get(&greeks.instrument_id) {
342 let buffer = match kind {
343 OptionKind::Call => &mut self.call_buffer,
344 OptionKind::Put => &mut self.put_buffer,
345 };
346
347 match buffer.get_mut(&strike) {
348 Some(data) => data.greeks = Some(*greeks),
349 None => {
350 self.pending_greeks.insert(greeks.instrument_id, *greeks);
352 }
353 }
354 }
355 }
356
357 pub fn snapshot(&self, ts_init: UnixNanos) -> OptionChainSlice {
366 let atm_price = self.atm_tracker.atm_price();
367
368 let catalog_strikes = Self::sorted_strikes(&self.instruments);
370 let atm_strike = atm_price.and_then(|atm| Self::find_closest_strike(&catalog_strikes, atm));
371
372 let active_strikes: HashSet<Price> = self
376 .active_ids
377 .iter()
378 .filter_map(|id| self.instruments.get(id).map(|(s, _)| *s))
379 .collect();
380
381 let mut calls = BTreeMap::new();
383
384 for (strike, data) in &self.call_buffer {
385 if active_strikes.contains(strike) {
386 calls.insert(*strike, data.clone());
387 }
388 }
389 let mut puts = BTreeMap::new();
390
391 for (strike, data) in &self.put_buffer {
392 if active_strikes.contains(strike) {
393 puts.insert(*strike, data.clone());
394 }
395 }
396
397 let ts_event = if self.max_ts_event == UnixNanos::default() {
399 ts_init
400 } else {
401 self.max_ts_event
402 };
403
404 OptionChainSlice {
405 series_id: self.series_id,
406 atm_strike,
407 calls,
408 puts,
409 ts_event,
410 ts_init,
411 }
412 }
413
    /// Returns `true` when neither the call buffer nor the put buffer holds any data.
    #[must_use]
    pub fn is_buffer_empty(&self) -> bool {
        self.call_buffer.is_empty() && self.put_buffer.is_empty()
    }
419
    /// Checks whether the active set should be rebalanced around the current ATM price.
    ///
    /// This method does not mutate the aggregator; pass the returned action to
    /// `apply_rebalance` to commit it.
    ///
    /// Returns `None` when any of the following holds:
    /// - the strike range is `StrikeRange::Fixed`,
    /// - no ATM price is available, or the catalog has no strikes,
    /// - the catalog strike closest to the ATM price equals `last_atm_strike`,
    /// - hysteresis is enabled and the ATM price has not yet covered the
    ///   required fraction of the gap to the adjacent strike,
    /// - cooldown is enabled and fewer than `cooldown_ns` nanoseconds have
    ///   passed since the last applied rebalance.
    ///
    /// Otherwise returns the instruments to add to and remove from the active set.
    #[must_use]
    pub fn check_rebalance(&self, now_ns: UnixNanos) -> Option<RebalanceAction> {
        // A fixed strike list never follows the ATM price.
        if matches!(self.strike_range, StrikeRange::Fixed(_)) {
            return None;
        }

        let atm_price = self.atm_tracker.atm_price()?;
        let all_strikes = Self::sorted_strikes(&self.instruments);
        let current_atm_strike = Self::find_closest_strike(&all_strikes, atm_price)?;

        // Closest strike unchanged since the last computation: nothing to do.
        if self.last_atm_strike == Some(current_atm_strike) {
            return None;
        }

        // Hysteresis guard: only applies once a previous ATM strike is known.
        if let Some(last_strike) = self.last_atm_strike
            && self.hysteresis > 0.0
        {
            let last_f = last_strike.as_f64();
            let atm_f = atm_price.as_f64();
            let direction = atm_f - last_f;

            // Adjacent catalog strike in the direction the ATM price moved
            // (`all_strikes` is sorted ascending).
            let next_strike = if direction > 0.0 {
                all_strikes.iter().find(|s| s.as_f64() > last_f)
            } else {
                all_strikes.iter().rev().find(|s| s.as_f64() < last_f)
            };

            if let Some(next) = next_strike {
                let gap = (next.as_f64() - last_f).abs();
                // Price level the ATM must reach: `hysteresis` of the way to the next strike.
                let threshold = last_f + direction.signum() * self.hysteresis * gap;
                if direction > 0.0 && atm_f < threshold {
                    return None;
                }

                if direction < 0.0 && atm_f > threshold {
                    return None;
                }
            }
        }

        // Cooldown guard; `saturating_sub` treats a `now_ns` earlier than the
        // last rebalance as zero elapsed time.
        if self.cooldown_ns > 0
            && let Some(last_ts) = self.last_rebalance_ns
            && now_ns.as_u64().saturating_sub(last_ts.as_u64()) < self.cooldown_ns
        {
            return None;
        }

        let new_active_strikes: HashSet<Price> = self
            .strike_range
            .resolve(Some(atm_price), &all_strikes)
            .into_iter()
            .collect();
        let new_active: HashSet<InstrumentId> = self
            .instruments
            .iter()
            .filter(|(_, (s, _))| new_active_strikes.contains(s))
            .map(|(id, _)| *id)
            .collect();

        // Diff the proposed active set against the current one.
        let add = new_active.difference(&self.active_ids).copied().collect();
        let remove = self.active_ids.difference(&new_active).copied().collect();

        Some(RebalanceAction { add, remove })
    }
497
498 pub fn apply_rebalance(&mut self, action: &RebalanceAction, now_ns: UnixNanos) {
501 for id in &action.add {
502 self.active_ids.insert(*id);
503 }
504
505 for id in &action.remove {
506 self.active_ids.remove(id);
507 }
508
509 let active_strikes: HashSet<Price> = self
511 .active_ids
512 .iter()
513 .filter_map(|id| self.instruments.get(id))
514 .map(|(s, _)| *s)
515 .collect();
516 self.call_buffer
517 .retain(|strike, _| active_strikes.contains(strike));
518 self.put_buffer
519 .retain(|strike, _| active_strikes.contains(strike));
520 self.pending_greeks
521 .retain(|id, _| self.active_ids.contains(id));
522
523 if let Some(atm) = self.atm_tracker.atm_price() {
525 let all_strikes = Self::sorted_strikes(&self.instruments);
526 self.last_atm_strike = Self::find_closest_strike(&all_strikes, atm);
527 }
528 self.last_rebalance_ns = Some(now_ns);
529 }
530}
531
/// Change to the active instrument set proposed by
/// `OptionChainAggregator::check_rebalance`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RebalanceAction {
    /// Instruments in the newly resolved active set that are not currently active.
    pub add: Vec<InstrumentId>,
    /// Currently active instruments that are absent from the newly resolved set.
    pub remove: Vec<InstrumentId>,
}
540
// Test-only accessors and setters for otherwise private aggregator state.
#[cfg(test)]
impl OptionChainAggregator {
    /// Number of strikes currently held in the call buffer.
    fn call_buffer_len(&self) -> usize {
        self.call_buffer.len()
    }

    /// Number of strikes currently held in the put buffer.
    fn put_buffer_len(&self) -> usize {
        self.put_buffer.len()
    }

    /// Greeks stored in the call buffer for `strike`, if an entry exists and has greeks.
    fn get_call_greeks_from_buffer(&self, strike: &Price) -> Option<&OptionGreeks> {
        self.call_buffer.get(strike).and_then(|d| d.greeks.as_ref())
    }

    /// Strike recorded as closest to ATM at the last active-set computation.
    pub(crate) fn last_atm_strike(&self) -> Option<Price> {
        self.last_atm_strike
    }

    /// Overrides the rebalance hysteresis fraction.
    fn set_hysteresis(&mut self, h: f64) {
        self.hysteresis = h;
    }

    /// Overrides the rebalance cooldown in nanoseconds.
    fn set_cooldown_ns(&mut self, ns: u64) {
        self.cooldown_ns = ns;
    }

    /// Number of greeks parked while waiting for a first quote.
    fn pending_greeks_count(&self) -> usize {
        self.pending_greeks.len()
    }
}
571
#[cfg(test)]
mod tests {
    use nautilus_model::{data::greeks::OptionGreekValues, identifiers::Venue, types::Quantity};
    use rstest::*;

    use super::*;

    /// Series shared by every test aggregator; expires at 1_700_000_000_000_000_000 ns.
    fn make_series_id() -> OptionSeriesId {
        OptionSeriesId::new(
            Venue::new("DERIBIT"),
            ustr::Ustr::from("BTC"),
            ustr::Ustr::from("BTC"),
            UnixNanos::from(1_700_000_000_000_000_000u64),
        )
    }

    /// Builds a quote with unit sizes and `ts_event == ts_init == 1`.
    fn make_quote(instrument_id: InstrumentId, bid: &str, ask: &str) -> QuoteTick {
        QuoteTick::new(
            instrument_id,
            Price::from(bid),
            Price::from(ask),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(1u64),
            UnixNanos::from(1u64),
        )
    }

    /// Fixed "current time" used by the rebalance tests (before the series expiry).
    fn now() -> UnixNanos {
        UnixNanos::from(1_000_000_000_000_000_000u64)
    }

    /// Sets the aggregator's ATM price by feeding greeks with an underlying price to its tracker.
    fn set_atm_via_greeks(agg: &mut OptionChainAggregator, price: f64) {
        let greeks = OptionGreeks {
            instrument_id: InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            underlying_price: Some(price),
            ..Default::default()
        };
        agg.atm_tracker_mut().update_from_option_greeks(&greeks);
    }

    /// Aggregator with one call and one put at strike 50000 and a fixed strike range.
    fn make_aggregator() -> (OptionChainAggregator, InstrumentId, InstrumentId) {
        let call_id = InstrumentId::from("BTC-20240101-50000-C.DERIBIT");
        let put_id = InstrumentId::from("BTC-20240101-50000-P.DERIBIT");
        let strike = Price::from("50000");

        let mut instrument_map = HashMap::new();
        instrument_map.insert(call_id, (strike, OptionKind::Call));
        instrument_map.insert(put_id, (strike, OptionKind::Put));

        let tracker = AtmTracker::new();
        let agg = OptionChainAggregator::new(
            make_series_id(),
            StrikeRange::Fixed(vec![strike]),
            tracker,
            instrument_map,
        );

        (agg, call_id, put_id)
    }

    /// Aggregator with calls and puts at five strikes, an ATM-relative range of
    /// one strike each side, and both rebalance guards disabled.
    fn make_multi_strike_aggregator() -> OptionChainAggregator {
        let strikes = [45000, 47500, 50000, 52500, 55000];
        let mut instruments = HashMap::new();

        for s in &strikes {
            let strike = Price::from(&s.to_string());
            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
            let put_id = InstrumentId::from(&format!("BTC-20240101-{s}-P.DERIBIT"));
            instruments.insert(call_id, (strike, OptionKind::Call));
            instruments.insert(put_id, (strike, OptionKind::Put));
        }

        let tracker = AtmTracker::new();
        let mut agg = OptionChainAggregator::new(
            make_series_id(),
            StrikeRange::AtmRelative {
                strikes_above: 1,
                strikes_below: 1,
            },
            tracker,
            instruments,
        );
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(0);
        agg
    }

    /// Call-only aggregator over strikes 47500/50000/52500 with an ATM-relative
    /// range of one strike each side, hysteresis 0.6 and no cooldown.
    fn make_hysteresis_aggregator() -> OptionChainAggregator {
        let strikes = [47500, 50000, 52500];
        let mut instruments = HashMap::new();

        for s in &strikes {
            let strike = Price::from(&s.to_string());
            let call_id = InstrumentId::from(&format!("BTC-20240101-{s}-C.DERIBIT"));
            instruments.insert(call_id, (strike, OptionKind::Call));
        }
        let tracker = AtmTracker::new();
        let mut agg = OptionChainAggregator::new(
            make_series_id(),
            StrikeRange::AtmRelative {
                strikes_above: 1,
                strikes_below: 1,
            },
            tracker,
            instruments,
        );
        agg.set_hysteresis(0.6);
        agg.set_cooldown_ns(0);
        agg
    }

    #[rstest]
    fn test_aggregator_instrument_ids() {
        let (agg, call_id, put_id) = make_aggregator();
        let ids = agg.instrument_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&call_id));
        assert!(ids.contains(&put_id));
    }

    #[rstest]
    fn test_aggregator_update_quote() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");

        agg.update_quote(&quote);

        assert_eq!(agg.call_buffer_len(), 1);
        assert_eq!(agg.put_buffer_len(), 0);
    }

    #[rstest]
    fn test_aggregator_update_greeks() {
        let (mut agg, call_id, _) = make_aggregator();
        // A quote first, so the greeks land directly in the buffer entry.
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);

        let greeks = OptionGreeks {
            instrument_id: call_id,
            greeks: OptionGreekValues {
                delta: 0.55,
                ..Default::default()
            },
            ..Default::default()
        };
        agg.update_greeks(&greeks);

        let strike = Price::from("50000");
        let data = agg.get_call_greeks_from_buffer(&strike);
        assert!(data.is_some());
        assert_eq!(data.unwrap().delta, 0.55);
    }

    #[rstest]
    fn test_aggregator_snapshot_preserves_state() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);

        let slice = agg.snapshot(UnixNanos::from(100u64));
        assert_eq!(slice.call_count(), 1);
        assert_eq!(slice.ts_init, UnixNanos::from(100u64));

        // Taking a snapshot must not drain the buffers.
        assert!(!agg.is_buffer_empty());

        let slice2 = agg.snapshot(UnixNanos::from(200u64));
        assert_eq!(slice2.call_count(), 1);
        assert_eq!(slice2.ts_init, UnixNanos::from(200u64));
    }

    #[rstest]
    fn test_aggregator_ignores_unknown_instrument() {
        let (mut agg, _, _) = make_aggregator();
        let unknown_id = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
        let quote = make_quote(unknown_id, "100.00", "101.00");

        agg.update_quote(&quote);

        assert!(agg.is_buffer_empty());
    }

    #[rstest]
    fn test_check_rebalance_returns_none() {
        let (agg, _, _) = make_aggregator();
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_fixed_always_none() {
        let (mut agg, _, _) = make_aggregator();
        // Even with an ATM price, a fixed range never rebalances.
        set_atm_via_greeks(&mut agg, 50000.0);
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_no_atm_returns_none() {
        let agg = make_multi_strike_aggregator();
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_atm_unchanged_returns_none() {
        let mut agg = make_multi_strike_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());

        // 50200 is still closest to the 50000 strike.
        set_atm_via_greeks(&mut agg, 50200.0);
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_check_rebalance_detects_atm_shift() {
        let mut agg = make_multi_strike_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());
        assert_eq!(agg.instrument_ids().len(), 6);

        set_atm_via_greeks(&mut agg, 55000.0);
        let action2 = agg.check_rebalance(now()).unwrap();
        assert!(!action2.add.is_empty() || !action2.remove.is_empty());
    }

    #[rstest]
    fn test_apply_rebalance_updates_instrument_map() {
        let mut agg = make_multi_strike_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());

        let active_ids = agg.instrument_ids();
        assert_eq!(active_ids.len(), 6);

        set_atm_via_greeks(&mut agg, 55000.0);
        let action2 = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action2, now());

        let active_ids2 = agg.instrument_ids();
        assert_eq!(active_ids2.len(), 4);
    }

    #[rstest]
    fn test_apply_rebalance_cleans_buffers() {
        let mut agg = make_multi_strike_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());

        let call_47500 = InstrumentId::from("BTC-20240101-47500-C.DERIBIT");
        let quote = make_quote(call_47500, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        set_atm_via_greeks(&mut agg, 55000.0);
        let action2 = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action2, now());

        // The 47500 entry must be gone once that strike is no longer active.
        assert_eq!(agg.call_buffer_len(), 0);
    }

    #[rstest]
    fn test_initial_active_set_empty_when_no_atm() {
        let agg = make_multi_strike_aggregator();
        assert_eq!(agg.instrument_ids().len(), 0);
        assert_eq!(agg.all_instrument_ids().len(), 10);
    }

    #[rstest]
    fn test_catalog_vs_active_separation() {
        let mut agg = make_multi_strike_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());

        assert_eq!(agg.instruments().len(), 10);
        assert_eq!(agg.instrument_ids().len(), 6);
    }

    #[rstest]
    fn test_add_instrument_already_known() {
        let (mut agg, call_id, _) = make_aggregator();
        let strike = Price::from("50000");
        let count_before = agg.instruments().len();

        let result = agg.add_instrument(call_id, strike, OptionKind::Call);

        assert!(!result);
        assert_eq!(agg.instruments().len(), count_before);
    }

    #[rstest]
    fn test_add_instrument_new_in_active_range() {
        let (mut agg, _, _) = make_aggregator();
        let new_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
        let strike = Price::from("50000");

        let result = agg.add_instrument(new_id, strike, OptionKind::Call);

        assert!(result);
        assert_eq!(agg.instruments().len(), 3);
        assert!(agg.active_ids().contains(&new_id));
    }

    #[rstest]
    fn test_add_instrument_new_out_of_range() {
        let (mut agg, _, _) = make_aggregator();
        let new_id = InstrumentId::from("BTC-20240101-60000-C.DERIBIT");
        let strike = Price::from("60000");

        let result = agg.add_instrument(new_id, strike, OptionKind::Call);

        assert!(result);
        assert_eq!(agg.instruments().len(), 3);
        assert!(!agg.active_ids().contains(&new_id));
    }

    #[rstest]
    fn test_add_instrument_available_for_rebalance() {
        let mut agg = make_multi_strike_aggregator();
        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());
        assert_eq!(agg.instrument_ids().len(), 6);

        let new_id = InstrumentId::from("BTC-20240101-57500-C.DERIBIT");
        let strike = Price::from("57500");
        let result = agg.add_instrument(new_id, strike, OptionKind::Call);
        assert!(result);
        assert!(!agg.active_ids().contains(&new_id));

        // Moving ATM onto the new strike must activate it on rebalance.
        set_atm_via_greeks(&mut agg, 57500.0);
        let action2 = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action2, now());

        assert!(agg.active_ids().contains(&new_id));
    }

    #[rstest]
    fn test_hysteresis_blocks_small_movement() {
        let mut agg = make_hysteresis_aggregator();

        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());
        assert_eq!(agg.last_atm_strike(), Some(Price::from("50000")));

        set_atm_via_greeks(&mut agg, 51000.0);
        assert!(agg.check_rebalance(now()).is_none());
    }

    #[rstest]
    fn test_hysteresis_allows_large_movement() {
        let mut agg = make_hysteresis_aggregator();

        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());

        // 52000 is past the 0.6 threshold (51500) towards the 52500 strike.
        set_atm_via_greeks(&mut agg, 52000.0);
        assert!(agg.check_rebalance(now()).is_some());
    }

    #[rstest]
    fn test_zero_hysteresis_disables_guard() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(0);

        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());

        set_atm_via_greeks(&mut agg, 52500.0);
        assert!(agg.check_rebalance(now()).is_some());
    }

    #[rstest]
    fn test_cooldown_blocks_rapid_rebalance() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(5_000_000_000);

        set_atm_via_greeks(&mut agg, 50000.0);
        let t0 = now();
        let action = agg.check_rebalance(t0).unwrap();
        agg.apply_rebalance(&action, t0);

        set_atm_via_greeks(&mut agg, 55000.0);
        // 1 s elapsed, shorter than the 5 s cooldown.
        let t1 = UnixNanos::from(t0.as_u64() + 1_000_000_000);
        assert!(agg.check_rebalance(t1).is_none());
    }

    #[rstest]
    fn test_cooldown_allows_after_elapsed() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(5_000_000_000);

        set_atm_via_greeks(&mut agg, 50000.0);
        let t0 = now();
        let action = agg.check_rebalance(t0).unwrap();
        agg.apply_rebalance(&action, t0);

        set_atm_via_greeks(&mut agg, 55000.0);
        // 6 s elapsed, longer than the 5 s cooldown.
        let t1 = UnixNanos::from(t0.as_u64() + 6_000_000_000);
        assert!(agg.check_rebalance(t1).is_some());
    }

    #[rstest]
    fn test_zero_cooldown_disables_guard() {
        let mut agg = make_multi_strike_aggregator();
        agg.set_hysteresis(0.0);
        agg.set_cooldown_ns(0);

        set_atm_via_greeks(&mut agg, 50000.0);
        let t0 = now();
        let action = agg.check_rebalance(t0).unwrap();
        agg.apply_rebalance(&action, t0);

        set_atm_via_greeks(&mut agg, 55000.0);
        assert!(agg.check_rebalance(t0).is_some());
    }

    #[rstest]
    fn test_pending_greeks_consumed_on_first_quote() {
        let (mut agg, call_id, _) = make_aggregator();

        // Greeks arrive before any quote, so they are parked.
        let greeks = OptionGreeks {
            instrument_id: call_id,
            greeks: OptionGreekValues {
                delta: 0.55,
                ..Default::default()
            },
            ..Default::default()
        };
        agg.update_greeks(&greeks);
        assert_eq!(agg.pending_greeks_count(), 1);

        // The first quote creates the entry and consumes the parked greeks.
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.pending_greeks_count(), 0);

        let strike = Price::from("50000");
        let data = agg.get_call_greeks_from_buffer(&strike);
        assert!(data.is_some());
        assert_eq!(data.unwrap().delta, 0.55);
    }

    #[rstest]
    fn test_snapshot_ts_event_reflects_max_quote_timestamp() {
        let (mut agg, call_id, put_id) = make_aggregator();

        let quote1 = QuoteTick::new(
            call_id,
            Price::from("100.00"),
            Price::from("101.00"),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(500u64),
            UnixNanos::from(500u64),
        );
        agg.update_quote(&quote1);

        let quote2 = QuoteTick::new(
            put_id,
            Price::from("50.00"),
            Price::from("51.00"),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(800u64),
            UnixNanos::from(800u64),
        );
        agg.update_quote(&quote2);

        let slice = agg.snapshot(UnixNanos::from(1000u64));
        assert_eq!(slice.ts_event, UnixNanos::from(800u64));
        assert_eq!(slice.ts_init, UnixNanos::from(1000u64));
    }

    #[rstest]
    fn test_snapshot_ts_event_fallback_when_no_quotes() {
        let (agg, _, _) = make_aggregator();
        let slice = agg.snapshot(UnixNanos::from(1000u64));
        // No quote has been accepted, so `ts_event` falls back to `ts_init`.
        assert_eq!(slice.ts_event, UnixNanos::from(1000u64));
    }

    #[rstest]
    fn test_snapshot_retains_buffered_data_during_hysteresis_window() {
        let mut agg = make_hysteresis_aggregator();

        set_atm_via_greeks(&mut agg, 50000.0);
        let action = agg.check_rebalance(now()).unwrap();
        agg.apply_rebalance(&action, now());
        assert_eq!(agg.instrument_ids().len(), 3);

        let q1 = make_quote(
            InstrumentId::from("BTC-20240101-47500-C.DERIBIT"),
            "3000.00",
            "3100.00",
        );
        let q2 = make_quote(
            InstrumentId::from("BTC-20240101-50000-C.DERIBIT"),
            "1500.00",
            "1600.00",
        );
        let q3 = make_quote(
            InstrumentId::from("BTC-20240101-52500-C.DERIBIT"),
            "500.00",
            "600.00",
        );
        agg.update_quote(&q1);
        agg.update_quote(&q2);
        agg.update_quote(&q3);
        assert_eq!(agg.call_buffer_len(), 3);

        // ATM moves but no rebalance is proposed; the snapshot keeps all strikes.
        set_atm_via_greeks(&mut agg, 51000.0);
        assert!(agg.check_rebalance(now()).is_none());

        let slice = agg.snapshot(UnixNanos::from(100u64));
        assert_eq!(slice.call_count(), 3);
    }

    #[rstest]
    fn test_remove_instrument_from_catalog() {
        let (mut agg, call_id, put_id) = make_aggregator();
        assert_eq!(agg.instruments().len(), 2);

        let removed = agg.remove_instrument(&call_id);
        assert!(removed);
        assert_eq!(agg.instruments().len(), 1);
        assert!(!agg.active_ids().contains(&call_id));
        assert!(agg.instruments().contains_key(&put_id));
    }

    #[rstest]
    fn test_remove_instrument_cleans_buffer() {
        let (mut agg, call_id, _) = make_aggregator();
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        let _ = agg.remove_instrument(&call_id);
        assert_eq!(agg.call_buffer_len(), 0);
    }

    #[rstest]
    fn test_remove_instrument_preserves_sibling_buffer() {
        let (mut agg, call_id, _) = make_aggregator();
        let sibling_id = InstrumentId::from("BTC-20240101-50000-C2.DERIBIT");
        let strike = Price::from("50000");
        let _ = agg.add_instrument(sibling_id, strike, OptionKind::Call);

        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        // A second call at the same strike keeps the buffered entry alive.
        let _ = agg.remove_instrument(&call_id);
        assert_eq!(agg.call_buffer_len(), 1);
        assert!(agg.instruments().contains_key(&sibling_id));
    }

    #[rstest]
    fn test_remove_instrument_unknown_noop() {
        let (mut agg, _, _) = make_aggregator();
        let unknown = InstrumentId::from("ETH-20240101-3000-C.DERIBIT");
        assert!(!agg.remove_instrument(&unknown));
        assert_eq!(agg.instruments().len(), 2);
    }

    #[rstest]
    fn test_remove_instrument_cleans_pending_greeks() {
        let (mut agg, call_id, _) = make_aggregator();
        let greeks = OptionGreeks {
            instrument_id: call_id,
            greeks: OptionGreekValues {
                delta: 0.55,
                ..Default::default()
            },
            ..Default::default()
        };
        agg.update_greeks(&greeks);
        assert_eq!(agg.pending_greeks_count(), 1);

        let _ = agg.remove_instrument(&call_id);
        assert_eq!(agg.pending_greeks_count(), 0);
    }

    #[rstest]
    fn test_is_catalog_empty_after_full_removal() {
        let (mut agg, call_id, put_id) = make_aggregator();
        assert!(!agg.is_catalog_empty());

        let _ = agg.remove_instrument(&call_id);
        assert!(!agg.is_catalog_empty());

        let _ = agg.remove_instrument(&put_id);
        assert!(agg.is_catalog_empty());
    }

    #[rstest]
    fn test_expired_quote_is_dropped() {
        let (mut agg, call_id, _) = make_aggregator();
        // `ts_event` equals the series expiration, which counts as expired.
        let expired_quote = QuoteTick::new(
            call_id,
            Price::from("100.00"),
            Price::from("101.00"),
            Quantity::from("1.0"),
            Quantity::from("1.0"),
            UnixNanos::from(1_700_000_000_000_000_000u64),
            UnixNanos::from(1_700_000_000_000_000_000u64),
        );
        agg.update_quote(&expired_quote);
        assert!(agg.is_buffer_empty());
    }

    #[rstest]
    fn test_expired_greeks_are_dropped() {
        let (mut agg, call_id, _) = make_aggregator();
        // A valid quote first, so a buffer entry exists for the strike.
        let quote = make_quote(call_id, "100.00", "101.00");
        agg.update_quote(&quote);
        assert_eq!(agg.call_buffer_len(), 1);

        // `ts_event` equals the series expiration, so the greeks are rejected.
        let greeks = OptionGreeks {
            instrument_id: call_id,
            ts_event: UnixNanos::from(1_700_000_000_000_000_000u64),
            greeks: OptionGreekValues {
                delta: 0.55,
                ..Default::default()
            },
            ..Default::default()
        };
        agg.update_greeks(&greeks);

        let strike = Price::from("50000");
        assert!(agg.get_call_greeks_from_buffer(&strike).is_none());
    }
}