1use ahash::AHashMap;
19use nautilus_common::messages::DataEvent;
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22 data::InstrumentStatus, enums::MarketStatusAction, identifiers::InstrumentId,
23};
24
25use crate::spot::sbe::generated::symbol_status::SymbolStatus;
26
27impl From<SymbolStatus> for MarketStatusAction {
28 fn from(status: SymbolStatus) -> Self {
29 match status {
30 SymbolStatus::Trading => Self::Trading,
31 SymbolStatus::EndOfDay => Self::Close,
32 SymbolStatus::Halt => Self::Halt,
33 SymbolStatus::Break => Self::Pause,
34 SymbolStatus::NonRepresentable | SymbolStatus::NullVal => Self::NotAvailableForTrading,
35 }
36 }
37}
38
39pub fn diff_and_emit_statuses(
45 new_statuses: &AHashMap<InstrumentId, MarketStatusAction>,
46 cached_statuses: &mut AHashMap<InstrumentId, MarketStatusAction>,
47 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
48 ts_event: UnixNanos,
49 ts_init: UnixNanos,
50) {
51 for (instrument_id, &new_action) in new_statuses {
52 let changed = cached_statuses
53 .get(instrument_id)
54 .is_none_or(|&prev| prev != new_action);
55
56 if changed {
57 cached_statuses.insert(*instrument_id, new_action);
58 emit_status(sender, *instrument_id, new_action, ts_event, ts_init);
59 }
60 }
61
62 let removed: Vec<InstrumentId> = cached_statuses
64 .keys()
65 .filter(|id| !new_statuses.contains_key(id))
66 .copied()
67 .collect();
68
69 for instrument_id in removed {
70 cached_statuses.remove(&instrument_id);
71 emit_status(
72 sender,
73 instrument_id,
74 MarketStatusAction::NotAvailableForTrading,
75 ts_event,
76 ts_init,
77 );
78 }
79}
80
81fn emit_status(
82 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
83 instrument_id: InstrumentId,
84 action: MarketStatusAction,
85 ts_event: UnixNanos,
86 ts_init: UnixNanos,
87) {
88 let is_trading = Some(matches!(action, MarketStatusAction::Trading));
89 let status = InstrumentStatus::new(
90 instrument_id,
91 action,
92 ts_event,
93 ts_init,
94 None,
95 None,
96 is_trading,
97 None,
98 None,
99 );
100
101 if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
102 log::error!("Failed to emit instrument status event: {e}");
103 }
104}
105
#[cfg(test)]
mod tests {
    use nautilus_model::identifiers::InstrumentId;
    use rstest::rstest;

    use super::{
        super::enums::{BinanceContractStatus, BinanceTradingStatus},
        *,
    };

    /// Shorthand for the map type consumed by `diff_and_emit_statuses`.
    type StatusMap = AHashMap<InstrumentId, MarketStatusAction>;

    /// Builds a status map from `(instrument, action)` pairs.
    fn status_map(entries: &[(InstrumentId, MarketStatusAction)]) -> StatusMap {
        let mut map = AHashMap::new();
        for &(instrument_id, action) in entries {
            map.insert(instrument_id, action);
        }
        map
    }

    /// Runs the diff with default timestamps for both `ts_event` and `ts_init`.
    fn run_diff(
        new_statuses: &StatusMap,
        cached: &mut StatusMap,
        tx: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
    ) {
        let ts = UnixNanos::default();
        diff_and_emit_statuses(new_statuses, cached, tx, ts, ts);
    }

    /// Pops the next queued event and unwraps it as an `InstrumentStatus`.
    ///
    /// Panics with `missing_msg` when the channel holds no event, and with a fixed
    /// message when the event is of a different kind.
    fn expect_status(
        rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
        missing_msg: &str,
    ) -> InstrumentStatus {
        match rx.try_recv().expect(missing_msg) {
            DataEvent::InstrumentStatus(status) => status,
            _ => panic!("expected InstrumentStatus event"),
        }
    }

    #[rstest]
    #[case(SymbolStatus::Trading, MarketStatusAction::Trading)]
    #[case(SymbolStatus::EndOfDay, MarketStatusAction::Close)]
    #[case(SymbolStatus::Halt, MarketStatusAction::Halt)]
    #[case(SymbolStatus::Break, MarketStatusAction::Pause)]
    #[case(
        SymbolStatus::NonRepresentable,
        MarketStatusAction::NotAvailableForTrading
    )]
    #[case(SymbolStatus::NullVal, MarketStatusAction::NotAvailableForTrading)]
    fn test_symbol_status_to_market_action(
        #[case] input: SymbolStatus,
        #[case] expected: MarketStatusAction,
    ) {
        let actual: MarketStatusAction = input.into();
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case(BinanceTradingStatus::Trading, MarketStatusAction::Trading)]
    #[case(BinanceTradingStatus::PendingTrading, MarketStatusAction::PreOpen)]
    #[case(BinanceTradingStatus::PreTrading, MarketStatusAction::PreOpen)]
    #[case(BinanceTradingStatus::PostTrading, MarketStatusAction::PostClose)]
    #[case(BinanceTradingStatus::EndOfDay, MarketStatusAction::Close)]
    #[case(BinanceTradingStatus::Halt, MarketStatusAction::Halt)]
    #[case(BinanceTradingStatus::AuctionMatch, MarketStatusAction::Cross)]
    #[case(BinanceTradingStatus::Break, MarketStatusAction::Pause)]
    #[case(
        BinanceTradingStatus::Unknown,
        MarketStatusAction::NotAvailableForTrading
    )]
    fn test_trading_status_to_market_action(
        #[case] input: BinanceTradingStatus,
        #[case] expected: MarketStatusAction,
    ) {
        let actual: MarketStatusAction = input.into();
        assert_eq!(actual, expected);
    }

    #[rstest]
    #[case(BinanceContractStatus::Trading, MarketStatusAction::Trading)]
    #[case(BinanceContractStatus::PendingTrading, MarketStatusAction::PreOpen)]
    #[case(BinanceContractStatus::PreDelivering, MarketStatusAction::PreClose)]
    #[case(BinanceContractStatus::Delivering, MarketStatusAction::Close)]
    #[case(BinanceContractStatus::Delivered, MarketStatusAction::Close)]
    #[case(BinanceContractStatus::PreDelisting, MarketStatusAction::PreClose)]
    #[case(BinanceContractStatus::Delisting, MarketStatusAction::Suspend)]
    #[case(
        BinanceContractStatus::Down,
        MarketStatusAction::NotAvailableForTrading
    )]
    #[case(
        BinanceContractStatus::Unknown,
        MarketStatusAction::NotAvailableForTrading
    )]
    fn test_contract_status_to_market_action(
        #[case] input: BinanceContractStatus,
        #[case] expected: MarketStatusAction,
    ) {
        let actual: MarketStatusAction = input.into();
        assert_eq!(actual, expected);
    }

    #[rstest]
    fn test_diff_emits_on_change() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let id = InstrumentId::from("BTCUSDT.BINANCE");

        let mut cached = status_map(&[(id, MarketStatusAction::Trading)]);
        let new_statuses = status_map(&[(id, MarketStatusAction::Halt)]);

        run_diff(&new_statuses, &mut cached, &tx);

        let status = expect_status(&mut rx, "expected status event");
        assert_eq!(status.instrument_id, id);
        assert_eq!(status.action, MarketStatusAction::Halt);
        assert_eq!(status.is_trading, Some(false));

        // The cache must now hold the new action.
        assert_eq!(cached.get(&id), Some(&MarketStatusAction::Halt));
    }

    #[rstest]
    fn test_diff_no_emit_when_unchanged() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let id = InstrumentId::from("BTCUSDT.BINANCE");

        let mut cached = status_map(&[(id, MarketStatusAction::Trading)]);
        let new_statuses = status_map(&[(id, MarketStatusAction::Trading)]);

        run_diff(&new_statuses, &mut cached, &tx);

        // Identical snapshot: the channel must stay empty.
        assert!(rx.try_recv().is_err());
    }

    #[rstest]
    fn test_diff_emits_for_new_symbol() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let id = InstrumentId::from("ETHUSDT.BINANCE");

        let mut cached = status_map(&[]);
        let new_statuses = status_map(&[(id, MarketStatusAction::Trading)]);

        run_diff(&new_statuses, &mut cached, &tx);

        let status = expect_status(&mut rx, "expected status event for new symbol");
        assert_eq!(status.instrument_id, id);
        assert_eq!(status.action, MarketStatusAction::Trading);
        assert_eq!(status.is_trading, Some(true));
    }

    #[rstest]
    fn test_diff_emits_not_available_for_removed_symbol() {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
        let id = InstrumentId::from("BTCUSDT.BINANCE");

        let mut cached = status_map(&[(id, MarketStatusAction::Trading)]);
        // Empty snapshot: the cached symbol has disappeared.
        let new_statuses = status_map(&[]);

        run_diff(&new_statuses, &mut cached, &tx);

        let status = expect_status(&mut rx, "expected status event for removed symbol");
        assert_eq!(status.instrument_id, id);
        assert_eq!(status.action, MarketStatusAction::NotAvailableForTrading);
        assert_eq!(status.is_trading, Some(false));

        // The removed symbol must be gone from the cache.
        assert!(!cached.contains_key(&id));
    }
}