nautilus_trading/examples/actors/imbalance/
actor.rs1use std::{collections::BTreeMap, fmt::Debug};
19
20use ahash::AHashMap;
21use nautilus_common::{
22 actor::{DataActor, DataActorConfig, DataActorCore},
23 nautilus_actor,
24};
25use nautilus_model::{
26 data::OrderBookDeltas,
27 enums::{BookType, OrderSide},
28 identifiers::{ActorId, InstrumentId},
29};
30
31#[derive(Debug)]
33pub struct ImbalanceState {
34 pub update_count: u64,
36 pub bid_volume_total: f64,
38 pub ask_volume_total: f64,
40}
41
42impl ImbalanceState {
43 #[must_use]
45 pub fn new() -> Self {
46 Self {
47 update_count: 0,
48 bid_volume_total: 0.0,
49 ask_volume_total: 0.0,
50 }
51 }
52
53 #[must_use]
55 pub fn imbalance(&self) -> f64 {
56 let total = self.bid_volume_total + self.ask_volume_total;
57 if total > 0.0 {
58 (self.bid_volume_total - self.ask_volume_total) / total
59 } else {
60 0.0
61 }
62 }
63}
64
65impl Default for ImbalanceState {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71pub struct BookImbalanceActor {
78 core: DataActorCore,
79 instrument_ids: Vec<InstrumentId>,
80 log_interval: u64,
81 states: AHashMap<InstrumentId, ImbalanceState>,
82}
83
84impl BookImbalanceActor {
85 #[must_use]
87 pub fn from_config(config: super::config::BookImbalanceActorConfig) -> Self {
88 Self::new(config.instrument_ids, config.log_interval, config.actor_id)
89 }
90
91 #[must_use]
99 pub fn new(
100 instrument_ids: Vec<InstrumentId>,
101 log_interval: u64,
102 actor_id: Option<ActorId>,
103 ) -> Self {
104 let config = DataActorConfig {
105 actor_id: Some(actor_id.unwrap_or(ActorId::from("BOOK_IMBALANCE-001"))),
106 ..Default::default()
107 };
108 Self {
109 core: DataActorCore::new(config),
110 instrument_ids,
111 log_interval,
112 states: AHashMap::new(),
113 }
114 }
115
116 #[must_use]
118 pub fn states(&self) -> &AHashMap<InstrumentId, ImbalanceState> {
119 &self.states
120 }
121
122 pub fn print_summary(&self) {
124 println!("\n--- Book imbalance summary ---");
125 let sorted: BTreeMap<String, &ImbalanceState> = self
126 .states
127 .iter()
128 .map(|(id, state)| (id.to_string(), state))
129 .collect();
130
131 for (id, state) in &sorted {
132 println!(
133 " {id} updates: {} bid_vol: {:.2} ask_vol: {:.2} imbalance: {:.4}",
134 state.update_count,
135 state.bid_volume_total,
136 state.ask_volume_total,
137 state.imbalance(),
138 );
139 }
140 }
141}
142
143nautilus_actor!(BookImbalanceActor);
144
145impl Debug for BookImbalanceActor {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 f.debug_struct(stringify!(BookImbalanceActor))
148 .field("instrument_ids", &self.instrument_ids)
149 .field("log_interval", &self.log_interval)
150 .finish()
151 }
152}
153
154impl DataActor for BookImbalanceActor {
155 fn on_start(&mut self) -> anyhow::Result<()> {
156 let ids = self.instrument_ids.clone();
157 for instrument_id in ids {
158 self.subscribe_book_deltas(
159 instrument_id,
160 BookType::L2_MBP,
161 None, None, false, None, );
166 }
167 Ok(())
168 }
169
170 fn on_stop(&mut self) -> anyhow::Result<()> {
171 self.print_summary();
172 Ok(())
173 }
174
175 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
176 let mut bid_volume = 0.0;
177 let mut ask_volume = 0.0;
178
179 for delta in &deltas.deltas {
180 let size = delta.order.size.as_f64();
181 match delta.order.side {
182 OrderSide::Buy => bid_volume += size,
183 OrderSide::Sell => ask_volume += size,
184 _ => {}
185 }
186 }
187
188 let state = self.states.entry(deltas.instrument_id).or_default();
189
190 state.update_count += 1;
191 state.bid_volume_total += bid_volume;
192 state.ask_volume_total += ask_volume;
193
194 if self.log_interval > 0 && state.update_count.is_multiple_of(self.log_interval) {
195 println!(
196 "[{}] update #{}: batch bid={:.2} ask={:.2} cumulative imbalance={:.4}",
197 deltas.instrument_id,
198 state.update_count,
199 bid_volume,
200 ask_volume,
201 state.imbalance(),
202 );
203 }
204
205 Ok(())
206 }
207}