Skip to main content

nautilus_trading/examples/actors/imbalance/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Order book imbalance actor implementation.
17
18use std::{collections::BTreeMap, fmt::Debug};
19
20use ahash::AHashMap;
21use nautilus_common::{
22    actor::{DataActor, DataActorConfig, DataActorCore},
23    nautilus_actor,
24};
25use nautilus_model::{
26    data::OrderBookDeltas,
27    enums::{BookType, OrderSide},
28    identifiers::{ActorId, InstrumentId},
29};
30
/// Per-instrument imbalance tracking state.
///
/// Holds running totals only; the imbalance ratio itself is derived on demand
/// from the two volume totals rather than stored.
///
/// `Clone` and `PartialEq` are derived so callers can snapshot a state and
/// compare it against a later one.
#[derive(Debug, Clone, PartialEq)]
pub struct ImbalanceState {
    /// Total number of book delta batches processed.
    pub update_count: u64,
    /// Cumulative bid-side volume across all updates.
    pub bid_volume_total: f64,
    /// Cumulative ask-side volume across all updates.
    pub ask_volume_total: f64,
}
41
42impl ImbalanceState {
43    /// Creates a new [`ImbalanceState`] with zero counts.
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            update_count: 0,
48            bid_volume_total: 0.0,
49            ask_volume_total: 0.0,
50        }
51    }
52
53    /// Returns the cumulative quoted volume imbalance, or 0.0 if no volume observed.
54    #[must_use]
55    pub fn imbalance(&self) -> f64 {
56        let total = self.bid_volume_total + self.ask_volume_total;
57        if total > 0.0 {
58            (self.bid_volume_total - self.ask_volume_total) / total
59        } else {
60            0.0
61        }
62    }
63}
64
65impl Default for ImbalanceState {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
/// Actor that tracks bid/ask quoted volume imbalance from order book deltas.
///
/// On start, subscribes to [`OrderBookDeltas`] for each configured instrument.
/// On each update, sums the resting size at each updated level per side and
/// accumulates running totals. Logs the cumulative imbalance at a configurable
/// interval. On stop, prints a per-instrument summary.
pub struct BookImbalanceActor {
    /// Actor core, built from a `DataActorConfig` carrying the actor id.
    core: DataActorCore,
    /// Instruments whose book deltas are subscribed to in `on_start`.
    instrument_ids: Vec<InstrumentId>,
    /// Number of updates between progress lines; `0` disables periodic logging.
    log_interval: u64,
    /// Running totals per instrument; an entry is created on the first batch
    /// of deltas received for that instrument.
    states: AHashMap<InstrumentId, ImbalanceState>,
}
83
84impl BookImbalanceActor {
85    /// Creates a new [`BookImbalanceActor`] from config.
86    #[must_use]
87    pub fn from_config(config: super::config::BookImbalanceActorConfig) -> Self {
88        Self::new(config.instrument_ids, config.log_interval, config.actor_id)
89    }
90
91    /// Creates a new [`BookImbalanceActor`].
92    ///
93    /// `actor_id` sets the actor identifier. Pass `None` for the default
94    /// `"BOOK_IMBALANCE-001"`.
95    ///
96    /// `log_interval` controls how often (in update count) a progress line
97    /// is printed. Set to 0 to disable periodic logging.
98    #[must_use]
99    pub fn new(
100        instrument_ids: Vec<InstrumentId>,
101        log_interval: u64,
102        actor_id: Option<ActorId>,
103    ) -> Self {
104        let config = DataActorConfig {
105            actor_id: Some(actor_id.unwrap_or(ActorId::from("BOOK_IMBALANCE-001"))),
106            ..Default::default()
107        };
108        Self {
109            core: DataActorCore::new(config),
110            instrument_ids,
111            log_interval,
112            states: AHashMap::new(),
113        }
114    }
115
116    /// Returns the per-instrument imbalance states.
117    #[must_use]
118    pub fn states(&self) -> &AHashMap<InstrumentId, ImbalanceState> {
119        &self.states
120    }
121
122    /// Prints a summary of all tracked instruments to stdout.
123    pub fn print_summary(&self) {
124        println!("\n--- Book imbalance summary ---");
125        let sorted: BTreeMap<String, &ImbalanceState> = self
126            .states
127            .iter()
128            .map(|(id, state)| (id.to_string(), state))
129            .collect();
130
131        for (id, state) in &sorted {
132            println!(
133                "  {id}  updates: {}  bid_vol: {:.2}  ask_vol: {:.2}  imbalance: {:.4}",
134                state.update_count,
135                state.bid_volume_total,
136                state.ask_volume_total,
137                state.imbalance(),
138            );
139        }
140    }
141}
142
// NOTE(review): `nautilus_actor!` is imported from `nautilus_common`; presumably it
// generates the boilerplate that exposes this type's `core` field to the actor
// framework — confirm against the macro definition.
nautilus_actor!(BookImbalanceActor);
144
145impl Debug for BookImbalanceActor {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        f.debug_struct(stringify!(BookImbalanceActor))
148            .field("instrument_ids", &self.instrument_ids)
149            .field("log_interval", &self.log_interval)
150            .finish()
151    }
152}
153
154impl DataActor for BookImbalanceActor {
155    fn on_start(&mut self) -> anyhow::Result<()> {
156        let ids = self.instrument_ids.clone();
157        for instrument_id in ids {
158            self.subscribe_book_deltas(
159                instrument_id,
160                BookType::L2_MBP,
161                None,  // depth
162                None,  // client_id
163                false, // managed
164                None,  // params
165            );
166        }
167        Ok(())
168    }
169
170    fn on_stop(&mut self) -> anyhow::Result<()> {
171        self.print_summary();
172        Ok(())
173    }
174
175    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
176        let mut bid_volume = 0.0;
177        let mut ask_volume = 0.0;
178
179        for delta in &deltas.deltas {
180            let size = delta.order.size.as_f64();
181            match delta.order.side {
182                OrderSide::Buy => bid_volume += size,
183                OrderSide::Sell => ask_volume += size,
184                _ => {}
185            }
186        }
187
188        let state = self.states.entry(deltas.instrument_id).or_default();
189
190        state.update_count += 1;
191        state.bid_volume_total += bid_volume;
192        state.ask_volume_total += ask_volume;
193
194        if self.log_interval > 0 && state.update_count.is_multiple_of(self.log_interval) {
195            println!(
196                "[{}] update #{}: batch bid={:.2} ask={:.2}  cumulative imbalance={:.4}",
197                deltas.instrument_id,
198                state.update_count,
199                bid_volume,
200                ask_volume,
201                state.imbalance(),
202            );
203        }
204
205        Ok(())
206    }
207}