1use std::{collections::VecDeque, fmt::Debug};
19
20use ahash::AHashSet;
21use nautilus_common::actor::DataActor;
22use nautilus_model::{
23 data::{Bar, QuoteTick, TradeTick},
24 enums::{AggressorSide, OrderSide, PositionSide, TimeInForce},
25 events::{
26 OrderCanceled, OrderDenied, OrderExpired, OrderFilled, OrderRejected, PositionClosed,
27 PositionOpened,
28 },
29 identifiers::{ClientOrderId, PositionId, StrategyId},
30 orders::{Order, OrderCore},
31 types::Quantity,
32};
33
34use super::config::HurstVpinDirectionalConfig;
35use crate::{
36 nautilus_strategy,
37 strategy::{Strategy, StrategyCore},
38};
39
40pub struct HurstVpinDirectional {
51 pub(super) core: StrategyCore,
52 pub(super) config: HurstVpinDirectionalConfig,
53 pub(super) returns: VecDeque<f64>,
54 pub(super) abs_imbalances: VecDeque<f64>,
55 pub(super) signed_imbalances: VecDeque<f64>,
56 pub(super) last_close: Option<f64>,
57 pub(super) bucket_buy_volume: f64,
58 pub(super) bucket_sell_volume: f64,
59 pub(super) hurst: Option<f64>,
60 pub(super) vpin: Option<f64>,
61 pub(super) signed_vpin: Option<f64>,
62 pub(super) position_opened_ns: Option<u64>,
63 pub(super) exit_cooldown: bool,
64 pub(super) entry_order_id: Option<ClientOrderId>,
65 pub(super) exit_order_ids: AHashSet<ClientOrderId>,
66}
67
68impl HurstVpinDirectional {
69 #[must_use]
71 pub fn new(config: HurstVpinDirectionalConfig) -> Self {
72 let hurst_window = config.hurst_window;
73 let vpin_window = config.vpin_window;
74 Self {
75 core: StrategyCore::new(config.base.clone()),
76 config,
77 returns: VecDeque::with_capacity(hurst_window),
78 abs_imbalances: VecDeque::with_capacity(vpin_window),
79 signed_imbalances: VecDeque::with_capacity(vpin_window),
80 last_close: None,
81 bucket_buy_volume: 0.0,
82 bucket_sell_volume: 0.0,
83 hurst: None,
84 vpin: None,
85 signed_vpin: None,
86 position_opened_ns: None,
87 exit_cooldown: false,
88 entry_order_id: None,
89 exit_order_ids: AHashSet::new(),
90 }
91 }
92
93 pub(super) fn signals_ready(&self) -> bool {
94 self.hurst.is_some()
95 && self.vpin.is_some()
96 && self.signed_vpin.is_some()
97 && self.returns.len() == self.config.hurst_window
98 && self.abs_imbalances.len() == self.config.vpin_window
99 }
100
101 pub(super) fn push_bounded(values: &mut VecDeque<f64>, capacity: usize, value: f64) {
102 if values.len() == capacity {
103 values.pop_front();
104 }
105 values.push_back(value);
106 }
107
108 pub(super) fn rolling_mean(values: &VecDeque<f64>) -> Option<f64> {
109 if values.is_empty() {
110 return None;
111 }
112 Some(values.iter().copied().sum::<f64>() / values.len() as f64)
113 }
114
115 #[allow(
116 clippy::cognitive_complexity,
117 reason = "R/S regression is inherently nested"
118 )]
119 pub(super) fn estimate_hurst(&self) -> Option<f64> {
120 if self.returns.len() < self.config.hurst_window {
121 return None;
122 }
123
124 let returns: Vec<f64> = self.returns.iter().copied().collect();
125 let mut log_lags: Vec<f64> = Vec::new();
126 let mut log_rs: Vec<f64> = Vec::new();
127
128 for &lag in &self.config.hurst_lags {
129 if lag < 2 || lag > returns.len() {
130 continue;
131 }
132
133 let mut rs_values: Vec<f64> = Vec::new();
134
135 for start in (0..=returns.len().saturating_sub(lag)).step_by(lag) {
136 let chunk = &returns[start..start + lag];
137 let mean = chunk.iter().sum::<f64>() / lag as f64;
138
139 let mut running = 0.0f64;
140 let mut cum_min = 0.0f64;
141 let mut cum_max = 0.0f64;
142 let mut var_sum = 0.0f64;
143
144 for value in chunk {
145 let deviation = value - mean;
146 running += deviation;
147 if running < cum_min {
148 cum_min = running;
149 }
150
151 if running > cum_max {
152 cum_max = running;
153 }
154 var_sum += deviation * deviation;
155 }
156 let r_range = cum_max - cum_min;
157 let stdev = (var_sum / lag as f64).sqrt();
158 if r_range > 0.0 && stdev > 0.0 {
159 rs_values.push(r_range / stdev);
160 }
161 }
162
163 if !rs_values.is_empty() {
164 let avg_rs = rs_values.iter().copied().sum::<f64>() / rs_values.len() as f64;
165 log_lags.push((lag as f64).ln());
166 log_rs.push(avg_rs.ln());
167 }
168 }
169
170 if log_lags.len() < 2 {
171 return None;
172 }
173
174 let n = log_lags.len() as f64;
175 let sx: f64 = log_lags.iter().sum();
176 let sy: f64 = log_rs.iter().sum();
177 let sxx: f64 = log_lags.iter().map(|x| x * x).sum();
178 let sxy: f64 = log_lags.iter().zip(log_rs.iter()).map(|(x, y)| x * y).sum();
179 let denom = n * sxx - sx * sx;
180 if denom == 0.0 {
181 return None;
182 }
183 Some((n * sxy - sx * sy) / denom)
184 }
185
186 pub(super) fn try_open_position(&mut self) -> anyhow::Result<()> {
187 let (hurst, vpin, signed_vpin) = match (self.hurst, self.vpin, self.signed_vpin) {
188 (Some(h), Some(v), Some(s)) => (h, v, s),
189 _ => return Ok(()),
190 };
191
192 if hurst < self.config.hurst_enter || vpin < self.config.vpin_threshold {
193 return Ok(());
194 }
195
196 if signed_vpin > 0.0 {
197 self.submit_entry(OrderSide::Buy)?;
198 } else if signed_vpin < 0.0 {
199 self.submit_entry(OrderSide::Sell)?;
200 }
201
202 Ok(())
203 }
204
205 pub(super) fn check_regime_exit(&mut self) -> anyhow::Result<()> {
206 if !self.exit_order_ids.is_empty() {
207 return Ok(());
208 }
209 let hurst = match self.hurst {
210 Some(h) => h,
211 None => return Ok(()),
212 };
213
214 if hurst >= self.config.hurst_exit {
215 return Ok(());
216 }
217
218 let has_open_position = self.has_open_position();
219 if !has_open_position {
220 return Ok(());
221 }
222
223 log::info!("Regime decay (Hurst={hurst:.3}); closing position");
224 self.submit_close()
225 }
226
227 pub(super) fn check_holding_timeout(&mut self, tick: &QuoteTick) -> anyhow::Result<()> {
228 if !self.exit_order_ids.is_empty() {
229 return Ok(());
230 }
231 let opened_ns = match self.position_opened_ns {
232 Some(ns) => ns,
233 None => return Ok(()),
234 };
235 let held_ns = tick.ts_event.as_u64().saturating_sub(opened_ns);
236 if held_ns < self.config.max_holding_secs * 1_000_000_000 {
237 return Ok(());
238 }
239
240 log::info!("Holding timeout reached; closing position");
241 self.submit_close()
242 }
243
244 fn submit_entry(&mut self, side: OrderSide) -> anyhow::Result<()> {
245 let order = self.core.order_factory().market(
246 self.config.instrument_id,
247 side,
248 self.config.trade_size,
249 Some(TimeInForce::Ioc),
250 None, None, None, None, None, None, );
257 self.entry_order_id = Some(order.client_order_id());
258 self.submit_order(order, None, None)
259 }
260
261 fn submit_close(&mut self) -> anyhow::Result<()> {
262 let instrument_id = self.config.instrument_id;
263 let strategy_id = StrategyId::from(self.actor_id.inner().as_str());
264
265 let positions: Vec<(PositionId, Quantity, PositionSide)> = self
266 .cache()
267 .positions_open(None, Some(&instrument_id), Some(&strategy_id), None, None)
268 .iter()
269 .map(|p| (p.id, p.quantity, p.side))
270 .collect();
271
272 if positions.is_empty() {
273 return Ok(());
274 }
275
276 self.exit_cooldown = true;
277
278 for (position_id, quantity, side) in positions {
279 let closing_side = OrderCore::closing_side(side);
280 let close_order = self.core.order_factory().market(
281 instrument_id,
282 closing_side,
283 quantity,
284 Some(TimeInForce::Ioc),
285 Some(true), None,
287 None,
288 None,
289 None,
290 None,
291 );
292 self.exit_order_ids.insert(close_order.client_order_id());
293 self.submit_order(close_order, Some(position_id), None)?;
294 }
295
296 Ok(())
297 }
298
299 fn has_open_position(&self) -> bool {
300 let instrument_id = self.config.instrument_id;
301 let strategy_id = StrategyId::from(self.actor_id.inner().as_str());
302 !self
303 .cache()
304 .positions_open(None, Some(&instrument_id), Some(&strategy_id), None, None)
305 .is_empty()
306 }
307
308 fn clear_latch_for(&mut self, client_order_id: &ClientOrderId) {
309 if self.entry_order_id.as_ref() == Some(client_order_id) {
310 self.entry_order_id = None;
311 }
312 self.exit_order_ids.remove(client_order_id);
313 }
314}
315
316nautilus_strategy!(HurstVpinDirectional, {
317 fn on_position_opened(&mut self, event: PositionOpened) {
318 if event.instrument_id == self.config.instrument_id {
319 self.position_opened_ns = Some(event.ts_event.as_u64());
320 }
321 }
322
323 fn on_position_closed(&mut self, event: PositionClosed) {
324 if event.instrument_id == self.config.instrument_id {
325 self.position_opened_ns = None;
326 }
327 }
328
329 fn on_order_rejected(&mut self, event: OrderRejected) {
330 if event.instrument_id == self.config.instrument_id {
331 self.clear_latch_for(&event.client_order_id);
332 }
333 }
334
335 fn on_order_expired(&mut self, event: OrderExpired) {
336 if event.instrument_id == self.config.instrument_id {
337 self.clear_latch_for(&event.client_order_id);
338 }
339 }
340
341 fn on_order_denied(&mut self, event: OrderDenied) {
342 if event.instrument_id == self.config.instrument_id {
343 self.clear_latch_for(&event.client_order_id);
344 }
345 }
346});
347
348impl Debug for HurstVpinDirectional {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 f.debug_struct(stringify!(HurstVpinDirectional))
351 .field("config", &self.config)
352 .field("hurst", &self.hurst)
353 .field("vpin", &self.vpin)
354 .field("signed_vpin", &self.signed_vpin)
355 .finish()
356 }
357}
358
359impl DataActor for HurstVpinDirectional {
360 fn on_start(&mut self) -> anyhow::Result<()> {
361 let instrument_id = self.config.instrument_id;
362 let bar_instrument_id = self.config.bar_type.instrument_id();
363 if bar_instrument_id != instrument_id {
364 anyhow::bail!(
365 "bar_type instrument {bar_instrument_id} does not match traded instrument {instrument_id}"
366 );
367 }
368 {
369 let cache = self.cache();
370 if cache.instrument(&instrument_id).is_none() {
371 anyhow::bail!("Instrument {instrument_id} not found in cache");
372 }
373 }
374
375 self.subscribe_bars(self.config.bar_type, None, None);
376 self.subscribe_quotes(instrument_id, None, None);
377 self.subscribe_trades(instrument_id, None, None);
378 Ok(())
379 }
380
381 fn on_stop(&mut self) -> anyhow::Result<()> {
382 let instrument_id = self.config.instrument_id;
383 self.cancel_all_orders(instrument_id, None, None)?;
384 self.close_all_positions(instrument_id, None, None, None, None, None, None)?;
385 self.unsubscribe_bars(self.config.bar_type, None, None);
386 self.unsubscribe_quotes(instrument_id, None, None);
387 self.unsubscribe_trades(instrument_id, None, None);
388 Ok(())
389 }
390
391 fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
392 let size = tick.size.as_f64();
393 match tick.aggressor_side {
394 AggressorSide::Buyer => self.bucket_buy_volume += size,
395 AggressorSide::Seller => self.bucket_sell_volume += size,
396 _ => {}
397 }
398 Ok(())
399 }
400
401 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
402 let close = bar.close.as_f64();
403
404 if let Some(prev) = self.last_close
405 && prev > 0.0
406 && close > 0.0
407 {
408 let window = self.config.hurst_window;
409 Self::push_bounded(&mut self.returns, window, (close / prev).ln());
410 }
411 self.last_close = Some(close);
412
413 let total = self.bucket_buy_volume + self.bucket_sell_volume;
414 if total > 0.0 {
415 let imbalance = (self.bucket_buy_volume - self.bucket_sell_volume) / total;
416 let vpin_window = self.config.vpin_window;
417 Self::push_bounded(&mut self.abs_imbalances, vpin_window, imbalance.abs());
418 Self::push_bounded(&mut self.signed_imbalances, vpin_window, imbalance);
419 }
420 self.bucket_buy_volume = 0.0;
421 self.bucket_sell_volume = 0.0;
422
423 self.hurst = self.estimate_hurst();
424 self.vpin = Self::rolling_mean(&self.abs_imbalances);
425 self.signed_vpin = Self::rolling_mean(&self.signed_imbalances);
426
427 if let Some(h) = self.hurst {
428 log::info!(
429 "Hurst={h:.3} VPIN={:.3} signed={:+.3} bar_close={close:.2}",
430 self.vpin.unwrap_or(0.0),
431 self.signed_vpin.unwrap_or(0.0),
432 );
433 }
434
435 self.exit_cooldown = false;
436 self.check_regime_exit()
437 }
438
439 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
440 if !self.signals_ready() {
441 return Ok(());
442 }
443
444 if self.has_open_position() {
445 return self.check_holding_timeout(quote);
446 }
447
448 if self.exit_cooldown {
449 return Ok(());
450 }
451
452 if self.entry_order_id.is_some() || !self.exit_order_ids.is_empty() {
453 return Ok(());
454 }
455
456 let strategy_id = StrategyId::from(self.actor_id.inner().as_str());
457 let has_working = {
458 let cache = self.cache();
459 !cache
460 .orders_open(
461 None,
462 Some(&self.config.instrument_id),
463 Some(&strategy_id),
464 None,
465 None,
466 )
467 .is_empty()
468 || !cache
469 .orders_inflight(
470 None,
471 Some(&self.config.instrument_id),
472 Some(&strategy_id),
473 None,
474 None,
475 )
476 .is_empty()
477 };
478
479 if has_working {
480 return Ok(());
481 }
482
483 self.try_open_position()
484 }
485
486 fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
487 if event.instrument_id != self.config.instrument_id {
488 return Ok(());
489 }
490
491 let closed = self
492 .cache()
493 .order(&event.client_order_id)
494 .is_some_and(|o| o.is_closed());
495 if closed {
496 self.clear_latch_for(&event.client_order_id);
497 }
498 Ok(())
499 }
500
501 fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
502 if event.instrument_id != self.config.instrument_id {
503 return Ok(());
504 }
505 self.clear_latch_for(&event.client_order_id);
506 Ok(())
507 }
508
509 fn on_reset(&mut self) -> anyhow::Result<()> {
510 self.returns.clear();
511 self.abs_imbalances.clear();
512 self.signed_imbalances.clear();
513 self.last_close = None;
514 self.bucket_buy_volume = 0.0;
515 self.bucket_sell_volume = 0.0;
516 self.hurst = None;
517 self.vpin = None;
518 self.signed_vpin = None;
519 self.position_opened_ns = None;
520 self.exit_cooldown = false;
521 self.entry_order_id = None;
522 self.exit_order_ids.clear();
523 Ok(())
524 }
525}