1use ahash::{AHashMap, AHashSet};
19use nautilus_common::messages::DataEvent;
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::InstrumentStatus, enums::MarketStatusAction, identifiers::InstrumentId,
23};
24
25use super::enums::BybitInstrumentStatus;
26
27impl From<BybitInstrumentStatus> for MarketStatusAction {
28 fn from(status: BybitInstrumentStatus) -> Self {
29 match status {
30 BybitInstrumentStatus::PreLaunch => Self::PreOpen,
31 BybitInstrumentStatus::Trading => Self::Trading,
32 BybitInstrumentStatus::Delivering => Self::PreClose,
33 BybitInstrumentStatus::Closed => Self::Close,
34 BybitInstrumentStatus::Other => Self::NotAvailableForTrading,
35 }
36 }
37}
38
39pub fn diff_and_emit_statuses(
49 new_statuses: &AHashMap<InstrumentId, MarketStatusAction>,
50 cached_statuses: &mut AHashMap<InstrumentId, MarketStatusAction>,
51 subscriptions: Option<&AHashSet<InstrumentId>>,
52 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
53 ts_event: UnixNanos,
54 ts_init: UnixNanos,
55) {
56 let is_subscribed = |id: &InstrumentId| subscriptions.is_none_or(|subs| subs.contains(id));
57
58 for (instrument_id, &new_action) in new_statuses {
59 let changed = cached_statuses
60 .get(instrument_id)
61 .is_none_or(|&prev| prev != new_action);
62
63 if changed {
64 cached_statuses.insert(*instrument_id, new_action);
65 if is_subscribed(instrument_id) {
66 emit_status(sender, *instrument_id, new_action, ts_event, ts_init);
67 }
68 }
69 }
70
71 let removed: Vec<InstrumentId> = cached_statuses
73 .keys()
74 .filter(|id| !new_statuses.contains_key(id))
75 .copied()
76 .collect();
77
78 for instrument_id in removed {
79 cached_statuses.remove(&instrument_id);
80 if is_subscribed(&instrument_id) {
81 emit_status(
82 sender,
83 instrument_id,
84 MarketStatusAction::NotAvailableForTrading,
85 ts_event,
86 ts_init,
87 );
88 }
89 }
90}
91
92fn emit_status(
93 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
94 instrument_id: InstrumentId,
95 action: MarketStatusAction,
96 ts_event: UnixNanos,
97 ts_init: UnixNanos,
98) {
99 let is_trading = Some(matches!(action, MarketStatusAction::Trading));
100 let status = InstrumentStatus::new(
101 instrument_id,
102 action,
103 ts_event,
104 ts_init,
105 None,
106 None,
107 is_trading,
108 None,
109 None,
110 );
111
112 if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
113 log::error!("Failed to emit instrument status event: {e}");
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use nautilus_model::identifiers::InstrumentId;
120 use rstest::rstest;
121
122 use super::*;
123
124 #[rstest]
125 #[case(BybitInstrumentStatus::Trading, MarketStatusAction::Trading)]
126 #[case(BybitInstrumentStatus::PreLaunch, MarketStatusAction::PreOpen)]
127 #[case(BybitInstrumentStatus::Delivering, MarketStatusAction::PreClose)]
128 #[case(BybitInstrumentStatus::Closed, MarketStatusAction::Close)]
129 #[case(
130 BybitInstrumentStatus::Other,
131 MarketStatusAction::NotAvailableForTrading
132 )]
133 fn test_bybit_instrument_status_to_market_action(
134 #[case] input: BybitInstrumentStatus,
135 #[case] expected: MarketStatusAction,
136 ) {
137 assert_eq!(MarketStatusAction::from(input), expected);
138 }
139
140 #[rstest]
141 fn test_diff_emits_on_change() {
142 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
143 let id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
144
145 let mut cached = AHashMap::new();
146 cached.insert(id, MarketStatusAction::Trading);
147
148 let mut new_statuses = AHashMap::new();
149 new_statuses.insert(id, MarketStatusAction::Halt);
150
151 diff_and_emit_statuses(
152 &new_statuses,
153 &mut cached,
154 None,
155 &tx,
156 UnixNanos::default(),
157 UnixNanos::default(),
158 );
159
160 let event = rx.try_recv().expect("expected status event");
161 match event {
162 DataEvent::InstrumentStatus(status) => {
163 assert_eq!(status.instrument_id, id);
164 assert_eq!(status.action, MarketStatusAction::Halt);
165 assert_eq!(status.is_trading, Some(false));
166 }
167 _ => panic!("expected InstrumentStatus event"),
168 }
169
170 assert_eq!(cached.get(&id), Some(&MarketStatusAction::Halt));
171 }
172
173 #[rstest]
174 fn test_diff_no_emit_when_unchanged() {
175 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
176 let id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
177
178 let mut cached = AHashMap::new();
179 cached.insert(id, MarketStatusAction::Trading);
180
181 let mut new_statuses = AHashMap::new();
182 new_statuses.insert(id, MarketStatusAction::Trading);
183
184 diff_and_emit_statuses(
185 &new_statuses,
186 &mut cached,
187 None,
188 &tx,
189 UnixNanos::default(),
190 UnixNanos::default(),
191 );
192
193 assert!(rx.try_recv().is_err());
194 }
195
196 #[rstest]
197 fn test_diff_emits_for_new_symbol() {
198 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
199 let id = InstrumentId::from("ETHUSDT-LINEAR.BYBIT");
200
201 let mut cached = AHashMap::new();
202 let mut new_statuses = AHashMap::new();
203 new_statuses.insert(id, MarketStatusAction::Trading);
204
205 diff_and_emit_statuses(
206 &new_statuses,
207 &mut cached,
208 None,
209 &tx,
210 UnixNanos::default(),
211 UnixNanos::default(),
212 );
213
214 let event = rx.try_recv().expect("expected status event for new symbol");
215 match event {
216 DataEvent::InstrumentStatus(status) => {
217 assert_eq!(status.instrument_id, id);
218 assert_eq!(status.action, MarketStatusAction::Trading);
219 assert_eq!(status.is_trading, Some(true));
220 }
221 _ => panic!("expected InstrumentStatus event"),
222 }
223 }
224
225 #[rstest]
226 fn test_diff_emits_not_available_for_removed_symbol() {
227 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
228 let id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
229
230 let mut cached = AHashMap::new();
231 cached.insert(id, MarketStatusAction::Trading);
232
233 let new_statuses = AHashMap::new(); diff_and_emit_statuses(
236 &new_statuses,
237 &mut cached,
238 None,
239 &tx,
240 UnixNanos::default(),
241 UnixNanos::default(),
242 );
243
244 let event = rx
245 .try_recv()
246 .expect("expected status event for removed symbol");
247 match event {
248 DataEvent::InstrumentStatus(status) => {
249 assert_eq!(status.instrument_id, id);
250 assert_eq!(status.action, MarketStatusAction::NotAvailableForTrading);
251 assert_eq!(status.is_trading, Some(false));
252 }
253 _ => panic!("expected InstrumentStatus event"),
254 }
255
256 assert!(!cached.contains_key(&id));
257 }
258
259 #[rstest]
260 fn test_diff_subscription_gating_only_emits_for_subscribed() {
261 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
262 let subscribed_id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
263 let unsubscribed_id = InstrumentId::from("ETHUSDT-LINEAR.BYBIT");
264
265 let mut subs = AHashSet::new();
266 subs.insert(subscribed_id);
267
268 let mut cached = AHashMap::new();
269 cached.insert(subscribed_id, MarketStatusAction::Trading);
270 cached.insert(unsubscribed_id, MarketStatusAction::Trading);
271
272 let mut new_statuses = AHashMap::new();
274 new_statuses.insert(subscribed_id, MarketStatusAction::Halt);
275 new_statuses.insert(unsubscribed_id, MarketStatusAction::Halt);
276
277 diff_and_emit_statuses(
278 &new_statuses,
279 &mut cached,
280 Some(&subs),
281 &tx,
282 UnixNanos::default(),
283 UnixNanos::default(),
284 );
285
286 let event = rx.try_recv().expect("expected status event");
288 match event {
289 DataEvent::InstrumentStatus(status) => {
290 assert_eq!(status.instrument_id, subscribed_id);
291 assert_eq!(status.action, MarketStatusAction::Halt);
292 }
293 _ => panic!("expected InstrumentStatus event"),
294 }
295 assert!(rx.try_recv().is_err(), "should not emit for unsubscribed");
296
297 assert_eq!(cached.get(&subscribed_id), Some(&MarketStatusAction::Halt));
299 assert_eq!(
300 cached.get(&unsubscribed_id),
301 Some(&MarketStatusAction::Halt)
302 );
303 }
304
305 #[rstest]
306 fn test_diff_removal_only_emits_for_subscribed() {
307 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
308 let subscribed_id = InstrumentId::from("BTCUSDT-LINEAR.BYBIT");
309 let unsubscribed_id = InstrumentId::from("ETHUSDT-LINEAR.BYBIT");
310
311 let mut subs = AHashSet::new();
312 subs.insert(subscribed_id);
313
314 let mut cached = AHashMap::new();
315 cached.insert(subscribed_id, MarketStatusAction::Trading);
316 cached.insert(unsubscribed_id, MarketStatusAction::Trading);
317
318 let new_statuses = AHashMap::new(); diff_and_emit_statuses(
321 &new_statuses,
322 &mut cached,
323 Some(&subs),
324 &tx,
325 UnixNanos::default(),
326 UnixNanos::default(),
327 );
328
329 let event = rx.try_recv().expect("expected removal event");
331 match event {
332 DataEvent::InstrumentStatus(status) => {
333 assert_eq!(status.instrument_id, subscribed_id);
334 assert_eq!(status.action, MarketStatusAction::NotAvailableForTrading);
335 }
336 _ => panic!("expected InstrumentStatus event"),
337 }
338 assert!(rx.try_recv().is_err(), "should not emit for unsubscribed");
339
340 assert!(cached.is_empty());
342 }
343}