Skip to main content

nautilus_data/engine/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::{Ref, RefCell},
18    num::NonZeroUsize,
19    rc::Rc,
20};
21
22use indexmap::IndexMap;
23use nautilus_common::{
24    cache::Cache,
25    msgbus::{self, Handler, MStr, Topic},
26    timer::TimeEvent,
27};
28use nautilus_model::{
29    data::{OrderBookDeltas, OrderBookDepth10},
30    identifiers::{InstrumentId, Venue},
31    instruments::Instrument,
32};
33use ustr::Ustr;
34
35/// Contains information for creating snapshots of specific order books.
36#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38    pub instrument_id: InstrumentId,
39    pub venue: Venue,
40    pub is_composite: bool,
41    pub root: Ustr,
42    pub topic: MStr<Topic>,
43    pub interval_ms: NonZeroUsize,
44}
45
46/// Reference-counted map of per-instrument book snapshot descriptors.
47///
48/// Shared between the engine (which populates it on subscribe) and the
49/// [`BookSnapshotter`] timer callback (which iterates it on each tick).
50pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
51
52/// Reference count key for a book snapshot subscription.
53pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
54
55/// Outcome of decrementing a book snapshot subscription.
56pub(crate) enum BookSnapshotUnsubscribeResult {
57    /// No matching subscription was found.
58    NotSubscribed,
59    /// The reference count was decremented but other consumers remain.
60    Decremented,
61    /// The last consumer was removed; tear down associated state.
62    Removed,
63}
64
65/// Handles order book updates and delta processing for a specific instrument.
66///
67/// The `BookUpdater` processes incoming order book deltas and maintains
68/// the current state of an order book. It can handle both incremental
69/// updates and full snapshots for the instrument it's assigned to.
70#[derive(Debug)]
71pub struct BookUpdater {
72    pub id: Ustr,
73    pub instrument_id: InstrumentId,
74    pub cache: Rc<RefCell<Cache>>,
75}
76
77impl BookUpdater {
78    /// Creates a new [`BookUpdater`] instance.
79    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
80        Self {
81            id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
82            instrument_id: *instrument_id,
83            cache,
84        }
85    }
86}
87
88impl Handler<OrderBookDeltas> for BookUpdater {
89    fn id(&self) -> Ustr {
90        self.id
91    }
92
93    fn handle(&self, deltas: &OrderBookDeltas) {
94        if let Some(book) = self
95            .cache
96            .borrow_mut()
97            .order_book_mut(&deltas.instrument_id)
98            && let Err(e) = book.apply_deltas(deltas)
99        {
100            log::error!("Failed to apply deltas: {e}");
101        }
102    }
103}
104
105impl Handler<OrderBookDepth10> for BookUpdater {
106    fn id(&self) -> Ustr {
107        self.id
108    }
109
110    fn handle(&self, depth: &OrderBookDepth10) {
111        if let Some(book) = self.cache.borrow_mut().order_book_mut(&depth.instrument_id)
112            && let Err(e) = book.apply_depth(depth)
113        {
114            log::error!("Failed to apply depth: {e}");
115        }
116    }
117}
118
119/// Creates periodic snapshots of order books at configured intervals.
120///
121/// The `BookSnapshotter` generates order book snapshots on timer events,
122/// publishing them as market data. This is useful for providing periodic
123/// full order book state updates in addition to incremental delta updates.
124#[derive(Debug)]
125pub struct BookSnapshotter {
126    pub timer_name: Ustr,
127    pub interval_ms: NonZeroUsize,
128    pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
129    pub cache: Rc<RefCell<Cache>>,
130}
131
132impl BookSnapshotter {
133    /// Creates a new [`BookSnapshotter`] instance.
134    pub fn new(
135        interval_ms: NonZeroUsize,
136        snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
137        cache: Rc<RefCell<Cache>>,
138    ) -> Self {
139        let timer_name = format!("OrderBookSnapshots|{interval_ms}");
140
141        Self {
142            timer_name: Ustr::from(&timer_name),
143            interval_ms,
144            snapshot_infos,
145            cache,
146        }
147    }
148
149    pub fn snapshot(&self, _event: TimeEvent) {
150        let snapshot_infos: Vec<BookSnapshotInfo> =
151            self.snapshot_infos.borrow().values().cloned().collect();
152
153        log::debug!(
154            "BookSnapshotter.snapshot called for {} subscriptions at {}ms",
155            snapshot_infos.len(),
156            self.interval_ms,
157        );
158
159        let cache = self.cache.borrow();
160
161        for snap_info in snapshot_infos {
162            self.publish_snapshot(&snap_info, &cache);
163        }
164    }
165
166    fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
167        if snap_info.is_composite {
168            let topic = snap_info.topic;
169            let underlying = snap_info.root;
170            for instrument in cache.instruments(&snap_info.venue, Some(&underlying)) {
171                self.publish_order_book(&instrument.id(), topic, cache);
172            }
173        } else {
174            self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache);
175        }
176    }
177
178    fn publish_order_book(
179        &self,
180        instrument_id: &InstrumentId,
181        topic: MStr<Topic>,
182        cache: &Ref<Cache>,
183    ) {
184        let book = cache
185            .order_book(instrument_id)
186            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
187
188        if book.update_count == 0 {
189            log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
190            return;
191        }
192        log::debug!(
193            "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
194            book.update_count
195        );
196
197        msgbus::publish_book(topic, book);
198    }
199}