1use std::fmt::Debug;
19
20use nautilus_common::{actor::DataActor, timer::TimeEvent};
21use nautilus_core::params::Params;
22use nautilus_model::{
23 data::{QuoteTick, option_chain::OptionGreeks},
24 enums::{OptionKind, OrderSide},
25 events::{OrderCanceled, OrderFilled},
26 identifiers::InstrumentId,
27 instruments::Instrument,
28 orders::Order,
29 types::{Price, Quantity},
30};
31use serde_json::json;
32use ustr::Ustr;
33
34use super::config::DeltaNeutralVolConfig;
35use crate::{
36 nautilus_strategy,
37 strategy::{Strategy, StrategyCore},
38};
39
/// Name of the clock timer registered in `on_start`, matched in `on_time_event`,
/// and cancelled in `on_stop`; each tick triggers a re-hedge check.
const REHEDGE_TIMER: &str = "delta_rehedge";
41
/// Strategy state for a call/put option pair (a strangle) whose combined delta is
/// kept near zero by trading a separate hedge instrument.
///
/// Deltas and mark IVs come from option-Greeks updates; positions are seeded from
/// the cache on start and then adjusted by fill events.
pub struct DeltaNeutralVol {
    /// Strategy core built from `config.base`; used here for the order factory.
    pub(super) core: StrategyCore,
    /// Strategy configuration (hedge instrument, thresholds, entry settings, ...).
    pub(super) config: DeltaNeutralVolConfig,
    /// Call leg chosen in `on_start`; `None` until a leg has been selected.
    pub(super) call_instrument_id: Option<InstrumentId>,
    /// Put leg chosen in `on_start`; `None` until a leg has been selected.
    pub(super) put_instrument_id: Option<InstrumentId>,
    /// Instruments with an option-Greeks subscription; drained in `on_stop`.
    pub(super) subscribed_greeks: Vec<InstrumentId>,
    /// Most recent delta received for the call leg (0.0 before the first update).
    pub(super) call_delta: f64,
    /// Most recent delta received for the put leg (0.0 before the first update).
    pub(super) put_delta: f64,
    /// Most recent mark IV received for the call leg, if any update carried one.
    pub(super) call_mark_iv: Option<f64>,
    /// Most recent mark IV received for the put leg, if any update carried one.
    pub(super) put_mark_iv: Option<f64>,
    /// Set once the first Greeks update for the call leg has been processed.
    pub(super) call_delta_ready: bool,
    /// Set once the first Greeks update for the put leg has been processed.
    pub(super) put_delta_ready: bool,
    /// Signed call position: buys add, sells subtract.
    pub(super) call_position: f64,
    /// Signed put position: buys add, sells subtract.
    pub(super) put_position: f64,
    /// Signed hedge-instrument position: buys add, sells subtract.
    pub(super) hedge_position: f64,
    /// True while a submitted hedge order is outstanding; cleared when that order
    /// closes on a fill, when a hedge order is canceled, or when the strategy stops.
    pub(super) hedge_pending: bool,
}
65
66impl DeltaNeutralVol {
67 #[must_use]
69 pub fn new(config: DeltaNeutralVolConfig) -> Self {
70 Self {
71 core: StrategyCore::new(config.base.clone()),
72 call_instrument_id: None,
73 put_instrument_id: None,
74 subscribed_greeks: Vec::new(),
75 call_delta: 0.0,
76 put_delta: 0.0,
77 call_mark_iv: None,
78 put_mark_iv: None,
79 call_delta_ready: false,
80 put_delta_ready: false,
81 call_position: 0.0,
82 put_position: 0.0,
83 hedge_position: 0.0,
84 hedge_pending: false,
85 config,
86 }
87 }
88
89 #[must_use]
91 pub fn portfolio_delta(&self) -> f64 {
92 self.call_delta * self.call_position
93 + self.put_delta * self.put_position
94 + self.hedge_position
95 }
96
97 #[must_use]
99 pub fn greeks_initialized(&self) -> bool {
100 self.call_instrument_id.is_some()
101 && self.put_instrument_id.is_some()
102 && self.call_delta_ready
103 && self.put_delta_ready
104 }
105
106 #[must_use]
108 pub fn should_rehedge(&self) -> bool {
109 self.greeks_initialized()
110 && self.portfolio_delta().abs() > self.config.rehedge_delta_threshold
111 }
112
113 #[must_use]
116 pub fn should_enter_strangle(&self) -> bool {
117 self.config.enter_strangle
118 && self.greeks_initialized()
119 && self.call_mark_iv.is_some()
120 && self.put_mark_iv.is_some()
121 && self.call_position == 0.0
122 && self.put_position == 0.0
123 && !self.has_working_entry_orders()
124 }
125
126 #[must_use]
128 pub fn has_working_entry_orders(&self) -> bool {
129 let cache = self.cache();
130
131 for id in [self.call_instrument_id, self.put_instrument_id]
132 .into_iter()
133 .flatten()
134 {
135 let open = cache.orders_open(None, Some(&id), None, None, None);
136 let inflight = cache.orders_inflight(None, Some(&id), None, None, None);
137
138 if !open.is_empty() || !inflight.is_empty() {
139 return true;
140 }
141 }
142 false
143 }
144
145 fn enter_strangle(&mut self) -> anyhow::Result<()> {
146 if !self.should_enter_strangle() {
147 return Ok(());
148 }
149
150 let call_id = self.call_instrument_id.unwrap();
151 let put_id = self.put_instrument_id.unwrap();
152 let call_iv = self.call_mark_iv.unwrap();
153 let put_iv = self.put_mark_iv.unwrap();
154 let offset = self.config.entry_iv_offset;
155
156 let call_entry_iv = call_iv - offset;
157 let put_entry_iv = put_iv - offset;
158
159 log::info!(
160 "Entering strangle: SELL {} x {call_id} @ iv={call_entry_iv:.4} \
161 + SELL {} x {put_id} @ iv={put_entry_iv:.4} (offset={offset})",
162 self.config.contracts,
163 self.config.contracts,
164 );
165
166 let contracts = self.config.contracts;
167 let tif = self.config.entry_time_in_force;
168 let client_id = self.config.client_id;
169
170 let call_order = self.core.order_factory().limit(
171 call_id,
172 OrderSide::Sell,
173 Quantity::new(contracts as f64, 0),
174 Price::new(call_entry_iv, 4),
175 Some(tif),
176 None,
177 None,
178 None,
179 None,
180 None,
181 None,
182 None,
183 None,
184 None,
185 None,
186 None,
187 );
188
189 let mut call_params = Params::new();
190 call_params.insert(
191 self.config.iv_param_key.clone(),
192 json!(call_entry_iv.to_string()),
193 );
194
195 self.submit_order_with_params(call_order, None, Some(client_id), call_params)?;
196
197 let put_order = self.core.order_factory().limit(
198 put_id,
199 OrderSide::Sell,
200 Quantity::new(contracts as f64, 0),
201 Price::new(put_entry_iv, 4),
202 Some(tif),
203 None,
204 None,
205 None,
206 None,
207 None,
208 None,
209 None,
210 None,
211 None,
212 None,
213 None,
214 );
215
216 let mut put_params = Params::new();
217 put_params.insert(
218 self.config.iv_param_key.clone(),
219 json!(put_entry_iv.to_string()),
220 );
221
222 self.submit_order_with_params(put_order, None, Some(client_id), put_params)?;
223
224 Ok(())
225 }
226
227 fn check_rehedge(&mut self) -> anyhow::Result<()> {
228 let delta = self.portfolio_delta();
229
230 if !self.should_rehedge() {
231 return Ok(());
232 }
233
234 if self.hedge_pending {
235 log::info!("Hedge order already pending, skipping rehedge");
236 return Ok(());
237 }
238
239 let hedge_qty = delta.abs();
240 let side = if delta > 0.0 {
241 OrderSide::Sell
242 } else {
243 OrderSide::Buy
244 };
245
246 log::info!(
247 "Rehedging: portfolio_delta={delta:.4}, submitting {side:?} {hedge_qty:.4} on {}",
248 self.config.hedge_instrument_id,
249 );
250
251 let hedge_id = self.config.hedge_instrument_id;
252 let size_precision = {
253 let cache = self.cache();
254 cache
255 .instrument(&hedge_id)
256 .map_or(2, |i| i.size_precision())
257 };
258
259 let order = self.core.order_factory().market(
260 hedge_id,
261 side,
262 Quantity::new(hedge_qty, size_precision),
263 None,
264 None,
265 None,
266 None,
267 None,
268 None,
269 None,
270 );
271
272 self.hedge_pending = true;
273
274 if let Err(e) = self.submit_order(order, None, Some(self.config.client_id)) {
275 self.hedge_pending = false;
276 return Err(e);
277 }
278
279 Ok(())
280 }
281}
282
// NOTE(review): `nautilus_strategy!` is a crate-local macro; presumably it generates
// the strategy boilerplate for `DeltaNeutralVol` around its `core` field — confirm
// against the macro definition.
nautilus_strategy!(DeltaNeutralVol);
284
285impl Debug for DeltaNeutralVol {
286 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
287 f.debug_struct(stringify!(DeltaNeutralVol))
288 .field("config", &self.config)
289 .field("call_instrument_id", &self.call_instrument_id)
290 .field("put_instrument_id", &self.put_instrument_id)
291 .field("call_delta", &self.call_delta)
292 .field("put_delta", &self.put_delta)
293 .field("portfolio_delta", &self.portfolio_delta())
294 .finish()
295 }
296}
297
298impl DataActor for DeltaNeutralVol {
299 fn on_start(&mut self) -> anyhow::Result<()> {
300 let venue = self.config.hedge_instrument_id.venue;
301 let underlying = Ustr::from(&self.config.option_family);
302 let now_ns = self.timestamp_ns().as_u64();
303
304 let mut calls: Vec<(InstrumentId, f64, u64)> = Vec::new();
305 let mut puts: Vec<(InstrumentId, f64, u64)> = Vec::new();
306
307 {
308 let cache = self.cache();
309 let instruments = cache.instruments(&venue, Some(&underlying));
310
311 for inst in &instruments {
312 let Some(expiry_ns) = inst.expiration_ns() else {
313 continue;
314 };
315
316 if expiry_ns.as_u64() <= now_ns {
317 continue;
318 }
319
320 if let Some(ref filter) = self.config.expiry_filter {
321 let symbol = inst.symbol().inner();
322 if !symbol.as_str().contains(filter.as_str()) {
323 continue;
324 }
325 }
326
327 let strike = match inst.strike_price() {
328 Some(p) => p.as_f64(),
329 None => continue,
330 };
331
332 match inst.option_kind() {
333 Some(OptionKind::Call) => {
334 calls.push((inst.id(), strike, expiry_ns.as_u64()));
335 }
336 Some(OptionKind::Put) => {
337 puts.push((inst.id(), strike, expiry_ns.as_u64()));
338 }
339 None => {}
340 }
341 }
342 }
343
344 if calls.is_empty() || puts.is_empty() {
345 log::warn!(
346 "Insufficient options found for family '{}': {} calls, {} puts",
347 self.config.option_family,
348 calls.len(),
349 puts.len(),
350 );
351 return Ok(());
352 }
353
354 if self.config.expiry_filter.is_none() {
355 let nearest = calls
356 .iter()
357 .chain(puts.iter())
358 .map(|(_, _, exp)| *exp)
359 .min()
360 .unwrap();
361 calls.retain(|(_, _, exp)| *exp == nearest);
362 puts.retain(|(_, _, exp)| *exp == nearest);
363 }
364
365 if calls.is_empty() || puts.is_empty() {
366 log::warn!(
367 "Nearest expiry has incomplete chain: {} calls, {} puts",
368 calls.len(),
369 puts.len(),
370 );
371 return Ok(());
372 }
373
374 log::info!(
375 "Found {} calls and {} puts for family '{}'",
376 calls.len(),
377 puts.len(),
378 self.config.option_family,
379 );
380
381 calls.sort_by(|(_, s1, _), (_, s2, _)| s1.partial_cmp(s2).unwrap());
386 puts.sort_by(|(_, s1, _), (_, s2, _)| s1.partial_cmp(s2).unwrap());
387
388 let call_idx = ((1.0 - self.config.target_call_delta) * calls.len() as f64) as usize;
390 let call_idx = call_idx.min(calls.len() - 1);
391 let (call_id, call_strike, _) = calls[call_idx];
392
393 let put_idx = (self.config.target_put_delta.abs() * puts.len() as f64) as usize;
395 let put_idx = put_idx.min(puts.len() - 1);
396 let (put_id, put_strike, _) = puts[put_idx];
397
398 self.call_instrument_id = Some(call_id);
399 self.put_instrument_id = Some(put_id);
400
401 log::info!("Selected call: {call_id} (strike={call_strike})");
402 log::info!("Selected put: {put_id} (strike={put_strike})");
403 log::info!(
404 "Strangle: {} contracts per leg, hedge on {}",
405 self.config.contracts,
406 self.config.hedge_instrument_id,
407 );
408
409 let (cached_call_pos, cached_put_pos, cached_hedge_pos) = {
410 let cache = self.cache();
411 let hedge_id = self.config.hedge_instrument_id;
412
413 let call_pos: f64 = cache
414 .positions_open(None, Some(&call_id), None, None, None)
415 .iter()
416 .map(|p| p.signed_qty)
417 .sum();
418
419 let put_pos: f64 = cache
420 .positions_open(None, Some(&put_id), None, None, None)
421 .iter()
422 .map(|p| p.signed_qty)
423 .sum();
424
425 let hedge_pos: f64 = cache
426 .positions_open(None, Some(&hedge_id), None, None, None)
427 .iter()
428 .map(|p| p.signed_qty)
429 .sum();
430
431 (call_pos, put_pos, hedge_pos)
432 };
433
434 self.call_position = cached_call_pos;
435 self.put_position = cached_put_pos;
436 self.hedge_position = cached_hedge_pos;
437
438 if self.call_position != 0.0 || self.put_position != 0.0 || self.hedge_position != 0.0 {
439 log::info!(
440 "Hydrated positions: call={}, put={}, hedge={}",
441 self.call_position,
442 self.put_position,
443 self.hedge_position,
444 );
445 }
446
447 let client_id = self.config.client_id;
448
449 self.subscribe_option_greeks(call_id, Some(client_id), None);
450 self.subscribed_greeks.push(call_id);
451
452 self.subscribe_option_greeks(put_id, Some(client_id), None);
453 self.subscribed_greeks.push(put_id);
454
455 self.subscribe_quotes(self.config.hedge_instrument_id, None, None);
456
457 let interval_ns = self.config.rehedge_interval_secs * 1_000_000_000;
458 self.clock()
459 .set_timer_ns(REHEDGE_TIMER, interval_ns, None, None, None, None, None)?;
460
461 log::info!(
462 "Rehedge timer set: every {}s, threshold={}",
463 self.config.rehedge_interval_secs,
464 self.config.rehedge_delta_threshold,
465 );
466
467 if self.config.enter_strangle {
468 log::info!(
469 "Strangle entry enabled: SELL {} x {call_id} (call) + SELL {} x {put_id} (put) \
470 once Greeks arrive (iv_offset={})",
471 self.config.contracts,
472 self.config.contracts,
473 self.config.entry_iv_offset,
474 );
475 } else {
476 log::info!(
477 "Strangle entry disabled: hedging externally-held positions only. \
478 Monitoring {call_id} (call) + {put_id} (put)",
479 );
480 }
481
482 Ok(())
483 }
484
485 fn on_stop(&mut self) -> anyhow::Result<()> {
486 self.clock().cancel_timer(REHEDGE_TIMER);
487
488 let ids: Vec<InstrumentId> = self.subscribed_greeks.drain(..).collect();
489 let client_id = self.config.client_id;
490
491 for instrument_id in ids {
492 self.unsubscribe_option_greeks(instrument_id, Some(client_id), None);
493 }
494
495 if let Some(call_id) = self.call_instrument_id {
496 self.cancel_all_orders(call_id, None, None)?;
497 }
498
499 if let Some(put_id) = self.put_instrument_id {
500 self.cancel_all_orders(put_id, None, None)?;
501 }
502
503 let hedge_id = self.config.hedge_instrument_id;
504 self.unsubscribe_quotes(hedge_id, None, None);
505 self.cancel_all_orders(hedge_id, None, None)?;
506 self.hedge_pending = false;
507
508 log::info!("Delta-neutral vol strategy stopped, positions left unchanged");
509
510 Ok(())
511 }
512
513 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
514 if Some(greeks.instrument_id) == self.call_instrument_id {
515 self.call_delta = greeks.greeks.delta;
516 self.call_delta_ready = true;
517
518 if let Some(iv) = greeks.mark_iv {
519 self.call_mark_iv = Some(iv);
520 }
521 } else if Some(greeks.instrument_id) == self.put_instrument_id {
522 self.put_delta = greeks.greeks.delta;
523 self.put_delta_ready = true;
524
525 if let Some(iv) = greeks.mark_iv {
526 self.put_mark_iv = Some(iv);
527 }
528 }
529
530 let portfolio_delta = self.portfolio_delta();
531
532 log::info!(
533 "Greeks update: {} delta={:.4} | portfolio_delta={portfolio_delta:.4} \
534 (call={:.4}*{}, put={:.4}*{}, hedge={})",
535 greeks.instrument_id,
536 greeks.greeks.delta,
537 self.call_delta,
538 self.call_position,
539 self.put_delta,
540 self.put_position,
541 self.hedge_position,
542 );
543
544 self.enter_strangle()?;
545 self.check_rehedge()?;
546
547 Ok(())
548 }
549
550 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
551 if quote.instrument_id == self.config.hedge_instrument_id {
552 log::debug!(
553 "Hedge quote: bid={} ask={} on {}",
554 quote.bid_price,
555 quote.ask_price,
556 quote.instrument_id,
557 );
558 }
559
560 Ok(())
561 }
562
563 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
564 let qty = event.last_qty.as_f64();
565 let signed_qty = match event.order_side {
566 OrderSide::Buy => qty,
567 OrderSide::Sell => -qty,
568 _ => 0.0,
569 };
570
571 if event.instrument_id == self.config.hedge_instrument_id {
572 self.hedge_position += signed_qty;
573
574 let is_closed = self
575 .cache()
576 .order(&event.client_order_id)
577 .is_some_and(|o| o.is_closed());
578
579 if is_closed {
580 self.hedge_pending = false;
581 }
582 } else if Some(event.instrument_id) == self.call_instrument_id {
583 self.call_position += signed_qty;
584 } else if Some(event.instrument_id) == self.put_instrument_id {
585 self.put_position += signed_qty;
586 }
587
588 log::info!(
589 "Fill: {} {:.4} {} | positions: call={}, put={}, hedge={}",
590 event.order_side,
591 event.last_qty,
592 event.instrument_id,
593 self.call_position,
594 self.put_position,
595 self.hedge_position,
596 );
597
598 Ok(())
599 }
600
601 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
602 let instrument_id = self
603 .cache()
604 .order(&event.client_order_id)
605 .map(|o| o.instrument_id());
606
607 if instrument_id == Some(self.config.hedge_instrument_id) {
608 self.hedge_pending = false;
609 }
610
611 Ok(())
612 }
613
614 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
615 if event.name.as_str() == REHEDGE_TIMER {
616 self.check_rehedge()?;
617 }
618
619 Ok(())
620 }
621}