nautilus_data/engine/
book.rs1use std::{
17 cell::{Ref, RefCell},
18 num::NonZeroUsize,
19 rc::Rc,
20};
21
22use indexmap::IndexMap;
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, Handler, MStr, Topic},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::{OrderBookDeltas, OrderBookDepth10},
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: MStr<Topic>,
43 pub interval_ms: NonZeroUsize,
44}
45
46pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
51
52pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
54
55pub(crate) enum BookSnapshotUnsubscribeResult {
57 NotSubscribed,
59 Decremented,
61 Removed,
63}
64
65#[derive(Debug)]
71pub struct BookUpdater {
72 pub id: Ustr,
73 pub instrument_id: InstrumentId,
74 pub cache: Rc<RefCell<Cache>>,
75}
76
77impl BookUpdater {
78 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
80 Self {
81 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
82 instrument_id: *instrument_id,
83 cache,
84 }
85 }
86}
87
88impl Handler<OrderBookDeltas> for BookUpdater {
89 fn id(&self) -> Ustr {
90 self.id
91 }
92
93 fn handle(&self, deltas: &OrderBookDeltas) {
94 if let Some(book) = self
95 .cache
96 .borrow_mut()
97 .order_book_mut(&deltas.instrument_id)
98 && let Err(e) = book.apply_deltas(deltas)
99 {
100 log::error!("Failed to apply deltas: {e}");
101 }
102 }
103}
104
105impl Handler<OrderBookDepth10> for BookUpdater {
106 fn id(&self) -> Ustr {
107 self.id
108 }
109
110 fn handle(&self, depth: &OrderBookDepth10) {
111 if let Some(book) = self.cache.borrow_mut().order_book_mut(&depth.instrument_id)
112 && let Err(e) = book.apply_depth(depth)
113 {
114 log::error!("Failed to apply depth: {e}");
115 }
116 }
117}
118
119#[derive(Debug)]
125pub struct BookSnapshotter {
126 pub timer_name: Ustr,
127 pub interval_ms: NonZeroUsize,
128 pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
129 pub cache: Rc<RefCell<Cache>>,
130}
131
132impl BookSnapshotter {
133 pub fn new(
135 interval_ms: NonZeroUsize,
136 snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
137 cache: Rc<RefCell<Cache>>,
138 ) -> Self {
139 let timer_name = format!("OrderBookSnapshots|{interval_ms}");
140
141 Self {
142 timer_name: Ustr::from(&timer_name),
143 interval_ms,
144 snapshot_infos,
145 cache,
146 }
147 }
148
149 pub fn snapshot(&self, _event: TimeEvent) {
150 let snapshot_infos: Vec<BookSnapshotInfo> =
151 self.snapshot_infos.borrow().values().cloned().collect();
152
153 log::debug!(
154 "BookSnapshotter.snapshot called for {} subscriptions at {}ms",
155 snapshot_infos.len(),
156 self.interval_ms,
157 );
158
159 let cache = self.cache.borrow();
160
161 for snap_info in snapshot_infos {
162 self.publish_snapshot(&snap_info, &cache);
163 }
164 }
165
166 fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
167 if snap_info.is_composite {
168 let topic = snap_info.topic;
169 let underlying = snap_info.root;
170 for instrument in cache.instruments(&snap_info.venue, Some(&underlying)) {
171 self.publish_order_book(&instrument.id(), topic, cache);
172 }
173 } else {
174 self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache);
175 }
176 }
177
178 fn publish_order_book(
179 &self,
180 instrument_id: &InstrumentId,
181 topic: MStr<Topic>,
182 cache: &Ref<Cache>,
183 ) {
184 let book = cache
185 .order_book(instrument_id)
186 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
187
188 if book.update_count == 0 {
189 log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
190 return;
191 }
192 log::debug!(
193 "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
194 book.update_count
195 );
196
197 msgbus::publish_book(topic, book);
198 }
199}