Skip to main content

nautilus_bybit/common/
status.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Instrument status mapping and polling for the Bybit adapter.
17
18use ahash::{AHashMap, AHashSet};
19use nautilus_common::messages::DataEvent;
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::InstrumentStatus, enums::MarketStatusAction, identifiers::InstrumentId,
23};
24
25use super::enums::BybitInstrumentStatus;
26
27impl From<BybitInstrumentStatus> for MarketStatusAction {
28    fn from(status: BybitInstrumentStatus) -> Self {
29        match status {
30            BybitInstrumentStatus::PreLaunch => Self::PreOpen,
31            BybitInstrumentStatus::Trading => Self::Trading,
32            BybitInstrumentStatus::Delivering => Self::PreClose,
33            BybitInstrumentStatus::Closed => Self::Close,
34            BybitInstrumentStatus::Other => Self::NotAvailableForTrading,
35        }
36    }
37}
38
39/// Compares new status snapshot against cached state, emitting [`InstrumentStatus`]
40/// events for changes and removals.
41///
42/// The cache is always updated to reflect the full API state. Emissions are gated
43/// by `subscriptions`: only instruments present in the subscription set produce
44/// events. Pass `None` to emit for all changes unconditionally.
45///
46/// Symbols present in the cache but absent from the new snapshot are treated as
47/// removed and emit `NotAvailableForTrading` (if subscribed).
48pub fn diff_and_emit_statuses(
49    new_statuses: &AHashMap<InstrumentId, MarketStatusAction>,
50    cached_statuses: &mut AHashMap<InstrumentId, MarketStatusAction>,
51    subscriptions: Option<&AHashSet<InstrumentId>>,
52    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
53    ts_event: UnixNanos,
54    ts_init: UnixNanos,
55) {
56    let is_subscribed = |id: &InstrumentId| subscriptions.is_none_or(|subs| subs.contains(id));
57
58    for (instrument_id, &new_action) in new_statuses {
59        let changed = cached_statuses
60            .get(instrument_id)
61            .is_none_or(|&prev| prev != new_action);
62
63        if changed {
64            cached_statuses.insert(*instrument_id, new_action);
65            if is_subscribed(instrument_id) {
66                emit_status(sender, *instrument_id, new_action, ts_event, ts_init);
67            }
68        }
69    }
70
71    // Detect symbols removed from the exchange info snapshot
72    let removed: Vec<InstrumentId> = cached_statuses
73        .keys()
74        .filter(|id| !new_statuses.contains_key(id))
75        .copied()
76        .collect();
77
78    for instrument_id in removed {
79        cached_statuses.remove(&instrument_id);
80        if is_subscribed(&instrument_id) {
81            emit_status(
82                sender,
83                instrument_id,
84                MarketStatusAction::NotAvailableForTrading,
85                ts_event,
86                ts_init,
87            );
88        }
89    }
90}
91
92fn emit_status(
93    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
94    instrument_id: InstrumentId,
95    action: MarketStatusAction,
96    ts_event: UnixNanos,
97    ts_init: UnixNanos,
98) {
99    let is_trading = Some(matches!(action, MarketStatusAction::Trading));
100    let status = InstrumentStatus::new(
101        instrument_id,
102        action,
103        ts_event,
104        ts_init,
105        None,
106        None,
107        is_trading,
108        None,
109        None,
110    );
111
112    if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
113        log::error!("Failed to emit instrument status event: {e}");
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use nautilus_model::identifiers::InstrumentId;
120    use rstest::rstest;
121
122    use super::*;
123
124    #[rstest]
125    #[case(BybitInstrumentStatus::Trading, MarketStatusAction::Trading)]
126    #[case(BybitInstrumentStatus::PreLaunch, MarketStatusAction::PreOpen)]
127    #[case(BybitInstrumentStatus::Delivering, MarketStatusAction::PreClose)]
128    #[case(BybitInstrumentStatus::Closed, MarketStatusAction::Close)]
129    #[case(
130        BybitInstrumentStatus::Other,
131        MarketStatusAction::NotAvailableForTrading
132    )]
133    fn test_bybit_instrument_status_to_market_action(
134        #[case] input: BybitInstrumentStatus,
135        #[case] expected: MarketStatusAction,
136    ) {
137        assert_eq!(MarketStatusAction::from(input), expected);
138    }
139
140    #[rstest]
141    fn test_diff_emits_on_change() {
142        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
143        let id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
144
145        let mut cached = AHashMap::new();
146        cached.insert(id, MarketStatusAction::Trading);
147
148        let mut new_statuses = AHashMap::new();
149        new_statuses.insert(id, MarketStatusAction::Halt);
150
151        diff_and_emit_statuses(
152            &new_statuses,
153            &mut cached,
154            None,
155            &tx,
156            UnixNanos::default(),
157            UnixNanos::default(),
158        );
159
160        let event = rx.try_recv().expect("expected status event");
161        match event {
162            DataEvent::InstrumentStatus(status) => {
163                assert_eq!(status.instrument_id, id);
164                assert_eq!(status.action, MarketStatusAction::Halt);
165                assert_eq!(status.is_trading, Some(false));
166            }
167            _ => panic!("expected InstrumentStatus event"),
168        }
169
170        assert_eq!(cached.get(&id), Some(&MarketStatusAction::Halt));
171    }
172
173    #[rstest]
174    fn test_diff_no_emit_when_unchanged() {
175        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
176        let id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
177
178        let mut cached = AHashMap::new();
179        cached.insert(id, MarketStatusAction::Trading);
180
181        let mut new_statuses = AHashMap::new();
182        new_statuses.insert(id, MarketStatusAction::Trading);
183
184        diff_and_emit_statuses(
185            &new_statuses,
186            &mut cached,
187            None,
188            &tx,
189            UnixNanos::default(),
190            UnixNanos::default(),
191        );
192
193        assert!(rx.try_recv().is_err());
194    }
195
196    #[rstest]
197    fn test_diff_emits_for_new_symbol() {
198        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
199        let id = InstrumentId::from("ETHUSDT-LINEAR.BYBIT");
200
201        let mut cached = AHashMap::new();
202        let mut new_statuses = AHashMap::new();
203        new_statuses.insert(id, MarketStatusAction::Trading);
204
205        diff_and_emit_statuses(
206            &new_statuses,
207            &mut cached,
208            None,
209            &tx,
210            UnixNanos::default(),
211            UnixNanos::default(),
212        );
213
214        let event = rx.try_recv().expect("expected status event for new symbol");
215        match event {
216            DataEvent::InstrumentStatus(status) => {
217                assert_eq!(status.instrument_id, id);
218                assert_eq!(status.action, MarketStatusAction::Trading);
219                assert_eq!(status.is_trading, Some(true));
220            }
221            _ => panic!("expected InstrumentStatus event"),
222        }
223    }
224
225    #[rstest]
226    fn test_diff_emits_not_available_for_removed_symbol() {
227        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
228        let id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
229
230        let mut cached = AHashMap::new();
231        cached.insert(id, MarketStatusAction::Trading);
232
233        let new_statuses = AHashMap::new(); // Symbol disappeared
234
235        diff_and_emit_statuses(
236            &new_statuses,
237            &mut cached,
238            None,
239            &tx,
240            UnixNanos::default(),
241            UnixNanos::default(),
242        );
243
244        let event = rx
245            .try_recv()
246            .expect("expected status event for removed symbol");
247        match event {
248            DataEvent::InstrumentStatus(status) => {
249                assert_eq!(status.instrument_id, id);
250                assert_eq!(status.action, MarketStatusAction::NotAvailableForTrading);
251                assert_eq!(status.is_trading, Some(false));
252            }
253            _ => panic!("expected InstrumentStatus event"),
254        }
255
256        assert!(!cached.contains_key(&id));
257    }
258
259    #[rstest]
260    fn test_diff_subscription_gating_only_emits_for_subscribed() {
261        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
262        let subscribed_id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
263        let unsubscribed_id = InstrumentId::from("ETHUSDT-LINEAR.BYBIT");
264
265        let mut subs = AHashSet::new();
266        subs.insert(subscribed_id);
267
268        let mut cached = AHashMap::new();
269        cached.insert(subscribed_id, MarketStatusAction::Trading);
270        cached.insert(unsubscribed_id, MarketStatusAction::Trading);
271
272        // Both change status
273        let mut new_statuses = AHashMap::new();
274        new_statuses.insert(subscribed_id, MarketStatusAction::Halt);
275        new_statuses.insert(unsubscribed_id, MarketStatusAction::Halt);
276
277        diff_and_emit_statuses(
278            &new_statuses,
279            &mut cached,
280            Some(&subs),
281            &tx,
282            UnixNanos::default(),
283            UnixNanos::default(),
284        );
285
286        // Only subscribed instrument emits
287        let event = rx.try_recv().expect("expected status event");
288        match event {
289            DataEvent::InstrumentStatus(status) => {
290                assert_eq!(status.instrument_id, subscribed_id);
291                assert_eq!(status.action, MarketStatusAction::Halt);
292            }
293            _ => panic!("expected InstrumentStatus event"),
294        }
295        assert!(rx.try_recv().is_err(), "should not emit for unsubscribed");
296
297        // But cache is updated for both
298        assert_eq!(cached.get(&subscribed_id), Some(&MarketStatusAction::Halt));
299        assert_eq!(
300            cached.get(&unsubscribed_id),
301            Some(&MarketStatusAction::Halt)
302        );
303    }
304
305    #[rstest]
306    fn test_diff_removal_only_emits_for_subscribed() {
307        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
308        let subscribed_id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
309        let unsubscribed_id = InstrumentId::from("ETHUSDT-LINEAR.BYBIT");
310
311        let mut subs = AHashSet::new();
312        subs.insert(subscribed_id);
313
314        let mut cached = AHashMap::new();
315        cached.insert(subscribed_id, MarketStatusAction::Trading);
316        cached.insert(unsubscribed_id, MarketStatusAction::Trading);
317
318        let new_statuses = AHashMap::new(); // Both removed from API
319
320        diff_and_emit_statuses(
321            &new_statuses,
322            &mut cached,
323            Some(&subs),
324            &tx,
325            UnixNanos::default(),
326            UnixNanos::default(),
327        );
328
329        // Only subscribed instrument emits NotAvailableForTrading
330        let event = rx.try_recv().expect("expected removal event");
331        match event {
332            DataEvent::InstrumentStatus(status) => {
333                assert_eq!(status.instrument_id, subscribed_id);
334                assert_eq!(status.action, MarketStatusAction::NotAvailableForTrading);
335            }
336            _ => panic!("expected InstrumentStatus event"),
337        }
338        assert!(rx.try_recv().is_err(), "should not emit for unsubscribed");
339
340        // Both removed from cache
341        assert!(cached.is_empty());
342    }
343}