Skip to main content

nautilus_trading/examples/strategies/hurst_vpin_directional/
strategy.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Hurst/VPIN directional strategy implementation.
17
18use std::{collections::VecDeque, fmt::Debug};
19
20use ahash::AHashSet;
21use nautilus_common::actor::DataActor;
22use nautilus_model::{
23    data::{Bar, QuoteTick, TradeTick},
24    enums::{AggressorSide, OrderSide, PositionSide, TimeInForce},
25    events::{
26        OrderCanceled, OrderDenied, OrderExpired, OrderFilled, OrderRejected, PositionClosed,
27        PositionOpened,
28    },
29    identifiers::{ClientOrderId, PositionId, StrategyId},
30    orders::{Order, OrderCore},
31    types::Quantity,
32};
33
34use super::config::HurstVpinDirectionalConfig;
35use crate::{
36    nautilus_strategy,
37    strategy::{Strategy, StrategyCore},
38};
39
40/// Directional strategy combining a Hurst-exponent regime filter on dollar bars
41/// with a VPIN (Volume-synchronized Probability of Informed Trading) signal
42/// derived from trade aggressor flow, with entry timing gated by the live
43/// quote stream.
44///
45/// The strategy is sampled on information-driven (value) bars rather than
46/// clock time, following Lopez de Prado (*Advances in Financial Machine
47/// Learning*, Chapter 2). The Hurst exponent is estimated by rescaled range
48/// over the window of dollar bar log returns. VPIN is averaged over completed
49/// volume buckets, with a signed variant carrying the net informed direction.
50pub struct HurstVpinDirectional {
51    pub(super) core: StrategyCore,
52    pub(super) config: HurstVpinDirectionalConfig,
53    pub(super) returns: VecDeque<f64>,
54    pub(super) abs_imbalances: VecDeque<f64>,
55    pub(super) signed_imbalances: VecDeque<f64>,
56    pub(super) last_close: Option<f64>,
57    pub(super) bucket_buy_volume: f64,
58    pub(super) bucket_sell_volume: f64,
59    pub(super) hurst: Option<f64>,
60    pub(super) vpin: Option<f64>,
61    pub(super) signed_vpin: Option<f64>,
62    pub(super) position_opened_ns: Option<u64>,
63    pub(super) exit_cooldown: bool,
64    pub(super) entry_order_id: Option<ClientOrderId>,
65    pub(super) exit_order_ids: AHashSet<ClientOrderId>,
66}
67
68impl HurstVpinDirectional {
69    /// Creates a new [`HurstVpinDirectional`] instance from config.
70    #[must_use]
71    pub fn new(config: HurstVpinDirectionalConfig) -> Self {
72        let hurst_window = config.hurst_window;
73        let vpin_window = config.vpin_window;
74        Self {
75            core: StrategyCore::new(config.base.clone()),
76            config,
77            returns: VecDeque::with_capacity(hurst_window),
78            abs_imbalances: VecDeque::with_capacity(vpin_window),
79            signed_imbalances: VecDeque::with_capacity(vpin_window),
80            last_close: None,
81            bucket_buy_volume: 0.0,
82            bucket_sell_volume: 0.0,
83            hurst: None,
84            vpin: None,
85            signed_vpin: None,
86            position_opened_ns: None,
87            exit_cooldown: false,
88            entry_order_id: None,
89            exit_order_ids: AHashSet::new(),
90        }
91    }
92
93    pub(super) fn signals_ready(&self) -> bool {
94        self.hurst.is_some()
95            && self.vpin.is_some()
96            && self.signed_vpin.is_some()
97            && self.returns.len() == self.config.hurst_window
98            && self.abs_imbalances.len() == self.config.vpin_window
99    }
100
101    pub(super) fn push_bounded(values: &mut VecDeque<f64>, capacity: usize, value: f64) {
102        if values.len() == capacity {
103            values.pop_front();
104        }
105        values.push_back(value);
106    }
107
108    pub(super) fn rolling_mean(values: &VecDeque<f64>) -> Option<f64> {
109        if values.is_empty() {
110            return None;
111        }
112        Some(values.iter().copied().sum::<f64>() / values.len() as f64)
113    }
114
115    #[allow(
116        clippy::cognitive_complexity,
117        reason = "R/S regression is inherently nested"
118    )]
119    pub(super) fn estimate_hurst(&self) -> Option<f64> {
120        if self.returns.len() < self.config.hurst_window {
121            return None;
122        }
123
124        let returns: Vec<f64> = self.returns.iter().copied().collect();
125        let mut log_lags: Vec<f64> = Vec::new();
126        let mut log_rs: Vec<f64> = Vec::new();
127
128        for &lag in &self.config.hurst_lags {
129            if lag < 2 || lag > returns.len() {
130                continue;
131            }
132
133            let mut rs_values: Vec<f64> = Vec::new();
134
135            for start in (0..=returns.len().saturating_sub(lag)).step_by(lag) {
136                let chunk = &returns[start..start + lag];
137                let mean = chunk.iter().sum::<f64>() / lag as f64;
138
139                let mut running = 0.0f64;
140                let mut cum_min = 0.0f64;
141                let mut cum_max = 0.0f64;
142                let mut var_sum = 0.0f64;
143
144                for value in chunk {
145                    let deviation = value - mean;
146                    running += deviation;
147                    if running < cum_min {
148                        cum_min = running;
149                    }
150
151                    if running > cum_max {
152                        cum_max = running;
153                    }
154                    var_sum += deviation * deviation;
155                }
156                let r_range = cum_max - cum_min;
157                let stdev = (var_sum / lag as f64).sqrt();
158                if r_range > 0.0 && stdev > 0.0 {
159                    rs_values.push(r_range / stdev);
160                }
161            }
162
163            if !rs_values.is_empty() {
164                let avg_rs = rs_values.iter().copied().sum::<f64>() / rs_values.len() as f64;
165                log_lags.push((lag as f64).ln());
166                log_rs.push(avg_rs.ln());
167            }
168        }
169
170        if log_lags.len() < 2 {
171            return None;
172        }
173
174        let n = log_lags.len() as f64;
175        let sx: f64 = log_lags.iter().sum();
176        let sy: f64 = log_rs.iter().sum();
177        let sxx: f64 = log_lags.iter().map(|x| x * x).sum();
178        let sxy: f64 = log_lags.iter().zip(log_rs.iter()).map(|(x, y)| x * y).sum();
179        let denom = n * sxx - sx * sx;
180        if denom == 0.0 {
181            return None;
182        }
183        Some((n * sxy - sx * sy) / denom)
184    }
185
186    pub(super) fn try_open_position(&mut self) -> anyhow::Result<()> {
187        let (hurst, vpin, signed_vpin) = match (self.hurst, self.vpin, self.signed_vpin) {
188            (Some(h), Some(v), Some(s)) => (h, v, s),
189            _ => return Ok(()),
190        };
191
192        if hurst < self.config.hurst_enter || vpin < self.config.vpin_threshold {
193            return Ok(());
194        }
195
196        if signed_vpin > 0.0 {
197            self.submit_entry(OrderSide::Buy)?;
198        } else if signed_vpin < 0.0 {
199            self.submit_entry(OrderSide::Sell)?;
200        }
201
202        Ok(())
203    }
204
205    pub(super) fn check_regime_exit(&mut self) -> anyhow::Result<()> {
206        if !self.exit_order_ids.is_empty() {
207            return Ok(());
208        }
209        let hurst = match self.hurst {
210            Some(h) => h,
211            None => return Ok(()),
212        };
213
214        if hurst >= self.config.hurst_exit {
215            return Ok(());
216        }
217
218        let has_open_position = self.has_open_position();
219        if !has_open_position {
220            return Ok(());
221        }
222
223        log::info!("Regime decay (Hurst={hurst:.3}); closing position");
224        self.submit_close()
225    }
226
227    pub(super) fn check_holding_timeout(&mut self, tick: &QuoteTick) -> anyhow::Result<()> {
228        if !self.exit_order_ids.is_empty() {
229            return Ok(());
230        }
231        let opened_ns = match self.position_opened_ns {
232            Some(ns) => ns,
233            None => return Ok(()),
234        };
235        let held_ns = tick.ts_event.as_u64().saturating_sub(opened_ns);
236        if held_ns < self.config.max_holding_secs * 1_000_000_000 {
237            return Ok(());
238        }
239
240        log::info!("Holding timeout reached; closing position");
241        self.submit_close()
242    }
243
244    fn submit_entry(&mut self, side: OrderSide) -> anyhow::Result<()> {
245        let order = self.core.order_factory().market(
246            self.config.instrument_id,
247            side,
248            self.config.trade_size,
249            Some(TimeInForce::Ioc),
250            None, // reduce_only
251            None, // quote_quantity
252            None, // exec_algorithm_id
253            None, // exec_algorithm_params
254            None, // tags
255            None, // client_order_id
256        );
257        self.entry_order_id = Some(order.client_order_id());
258        self.submit_order(order, None, None)
259    }
260
261    fn submit_close(&mut self) -> anyhow::Result<()> {
262        let instrument_id = self.config.instrument_id;
263        let strategy_id = StrategyId::from(self.actor_id.inner().as_str());
264
265        let positions: Vec<(PositionId, Quantity, PositionSide)> = self
266            .cache()
267            .positions_open(None, Some(&instrument_id), Some(&strategy_id), None, None)
268            .iter()
269            .map(|p| (p.id, p.quantity, p.side))
270            .collect();
271
272        if positions.is_empty() {
273            return Ok(());
274        }
275
276        self.exit_cooldown = true;
277
278        for (position_id, quantity, side) in positions {
279            let closing_side = OrderCore::closing_side(side);
280            let close_order = self.core.order_factory().market(
281                instrument_id,
282                closing_side,
283                quantity,
284                Some(TimeInForce::Ioc),
285                Some(true), // reduce_only
286                None,
287                None,
288                None,
289                None,
290                None,
291            );
292            self.exit_order_ids.insert(close_order.client_order_id());
293            self.submit_order(close_order, Some(position_id), None)?;
294        }
295
296        Ok(())
297    }
298
299    fn has_open_position(&self) -> bool {
300        let instrument_id = self.config.instrument_id;
301        let strategy_id = StrategyId::from(self.actor_id.inner().as_str());
302        !self
303            .cache()
304            .positions_open(None, Some(&instrument_id), Some(&strategy_id), None, None)
305            .is_empty()
306    }
307
308    fn clear_latch_for(&mut self, client_order_id: &ClientOrderId) {
309        if self.entry_order_id.as_ref() == Some(client_order_id) {
310            self.entry_order_id = None;
311        }
312        self.exit_order_ids.remove(client_order_id);
313    }
314}
315
316nautilus_strategy!(HurstVpinDirectional, {
317    fn on_position_opened(&mut self, event: PositionOpened) {
318        if event.instrument_id == self.config.instrument_id {
319            self.position_opened_ns = Some(event.ts_event.as_u64());
320        }
321    }
322
323    fn on_position_closed(&mut self, event: PositionClosed) {
324        if event.instrument_id == self.config.instrument_id {
325            self.position_opened_ns = None;
326        }
327    }
328
329    fn on_order_rejected(&mut self, event: OrderRejected) {
330        if event.instrument_id == self.config.instrument_id {
331            self.clear_latch_for(&event.client_order_id);
332        }
333    }
334
335    fn on_order_expired(&mut self, event: OrderExpired) {
336        if event.instrument_id == self.config.instrument_id {
337            self.clear_latch_for(&event.client_order_id);
338        }
339    }
340
341    fn on_order_denied(&mut self, event: OrderDenied) {
342        if event.instrument_id == self.config.instrument_id {
343            self.clear_latch_for(&event.client_order_id);
344        }
345    }
346});
347
348impl Debug for HurstVpinDirectional {
349    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        f.debug_struct(stringify!(HurstVpinDirectional))
351            .field("config", &self.config)
352            .field("hurst", &self.hurst)
353            .field("vpin", &self.vpin)
354            .field("signed_vpin", &self.signed_vpin)
355            .finish()
356    }
357}
358
359impl DataActor for HurstVpinDirectional {
360    fn on_start(&mut self) -> anyhow::Result<()> {
361        let instrument_id = self.config.instrument_id;
362        let bar_instrument_id = self.config.bar_type.instrument_id();
363        if bar_instrument_id != instrument_id {
364            anyhow::bail!(
365                "bar_type instrument {bar_instrument_id} does not match traded instrument {instrument_id}"
366            );
367        }
368        {
369            let cache = self.cache();
370            if cache.instrument(&instrument_id).is_none() {
371                anyhow::bail!("Instrument {instrument_id} not found in cache");
372            }
373        }
374
375        self.subscribe_bars(self.config.bar_type, None, None);
376        self.subscribe_quotes(instrument_id, None, None);
377        self.subscribe_trades(instrument_id, None, None);
378        Ok(())
379    }
380
381    fn on_stop(&mut self) -> anyhow::Result<()> {
382        let instrument_id = self.config.instrument_id;
383        self.cancel_all_orders(instrument_id, None, None)?;
384        self.close_all_positions(instrument_id, None, None, None, None, None, None)?;
385        self.unsubscribe_bars(self.config.bar_type, None, None);
386        self.unsubscribe_quotes(instrument_id, None, None);
387        self.unsubscribe_trades(instrument_id, None, None);
388        Ok(())
389    }
390
391    fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
392        let size = tick.size.as_f64();
393        match tick.aggressor_side {
394            AggressorSide::Buyer => self.bucket_buy_volume += size,
395            AggressorSide::Seller => self.bucket_sell_volume += size,
396            _ => {}
397        }
398        Ok(())
399    }
400
401    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
402        let close = bar.close.as_f64();
403
404        if let Some(prev) = self.last_close
405            && prev > 0.0
406            && close > 0.0
407        {
408            let window = self.config.hurst_window;
409            Self::push_bounded(&mut self.returns, window, (close / prev).ln());
410        }
411        self.last_close = Some(close);
412
413        let total = self.bucket_buy_volume + self.bucket_sell_volume;
414        if total > 0.0 {
415            let imbalance = (self.bucket_buy_volume - self.bucket_sell_volume) / total;
416            let vpin_window = self.config.vpin_window;
417            Self::push_bounded(&mut self.abs_imbalances, vpin_window, imbalance.abs());
418            Self::push_bounded(&mut self.signed_imbalances, vpin_window, imbalance);
419        }
420        self.bucket_buy_volume = 0.0;
421        self.bucket_sell_volume = 0.0;
422
423        self.hurst = self.estimate_hurst();
424        self.vpin = Self::rolling_mean(&self.abs_imbalances);
425        self.signed_vpin = Self::rolling_mean(&self.signed_imbalances);
426
427        if let Some(h) = self.hurst {
428            log::info!(
429                "Hurst={h:.3} VPIN={:.3} signed={:+.3} bar_close={close:.2}",
430                self.vpin.unwrap_or(0.0),
431                self.signed_vpin.unwrap_or(0.0),
432            );
433        }
434
435        self.exit_cooldown = false;
436        self.check_regime_exit()
437    }
438
439    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
440        if !self.signals_ready() {
441            return Ok(());
442        }
443
444        if self.has_open_position() {
445            return self.check_holding_timeout(quote);
446        }
447
448        if self.exit_cooldown {
449            return Ok(());
450        }
451
452        if self.entry_order_id.is_some() || !self.exit_order_ids.is_empty() {
453            return Ok(());
454        }
455
456        let strategy_id = StrategyId::from(self.actor_id.inner().as_str());
457        let has_working = {
458            let cache = self.cache();
459            !cache
460                .orders_open(
461                    None,
462                    Some(&self.config.instrument_id),
463                    Some(&strategy_id),
464                    None,
465                    None,
466                )
467                .is_empty()
468                || !cache
469                    .orders_inflight(
470                        None,
471                        Some(&self.config.instrument_id),
472                        Some(&strategy_id),
473                        None,
474                        None,
475                    )
476                    .is_empty()
477        };
478
479        if has_working {
480            return Ok(());
481        }
482
483        self.try_open_position()
484    }
485
486    fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
487        if event.instrument_id != self.config.instrument_id {
488            return Ok(());
489        }
490
491        let closed = self
492            .cache()
493            .order(&event.client_order_id)
494            .is_some_and(|o| o.is_closed());
495        if closed {
496            self.clear_latch_for(&event.client_order_id);
497        }
498        Ok(())
499    }
500
501    fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
502        if event.instrument_id != self.config.instrument_id {
503            return Ok(());
504        }
505        self.clear_latch_for(&event.client_order_id);
506        Ok(())
507    }
508
509    fn on_reset(&mut self) -> anyhow::Result<()> {
510        self.returns.clear();
511        self.abs_imbalances.clear();
512        self.signed_imbalances.clear();
513        self.last_close = None;
514        self.bucket_buy_volume = 0.0;
515        self.bucket_sell_volume = 0.0;
516        self.hurst = None;
517        self.vpin = None;
518        self.signed_vpin = None;
519        self.position_opened_ns = None;
520        self.exit_cooldown = false;
521        self.entry_order_id = None;
522        self.exit_order_ids.clear();
523        Ok(())
524    }
525}