Skip to main content

nautilus_binance/common/
status.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Instrument status mapping and polling for the Binance adapter.
17
18use ahash::AHashMap;
19use nautilus_common::messages::DataEvent;
20use nautilus_core::UnixNanos;
21use nautilus_model::{
22    data::InstrumentStatus, enums::MarketStatusAction, identifiers::InstrumentId,
23};
24
25use crate::spot::sbe::generated::symbol_status::SymbolStatus;
26
27impl From<SymbolStatus> for MarketStatusAction {
28    fn from(status: SymbolStatus) -> Self {
29        match status {
30            SymbolStatus::Trading => Self::Trading,
31            SymbolStatus::EndOfDay => Self::Close,
32            SymbolStatus::Halt => Self::Halt,
33            SymbolStatus::Break => Self::Pause,
34            SymbolStatus::NonRepresentable | SymbolStatus::NullVal => Self::NotAvailableForTrading,
35        }
36    }
37}
38
39/// Compares new status snapshot against cached state, emitting [`InstrumentStatus`]
40/// events for changes and removals.
41///
42/// Symbols present in the cache but absent from the new snapshot are treated as
43/// removed and emit `NotAvailableForTrading`.
44pub fn diff_and_emit_statuses(
45    new_statuses: &AHashMap<InstrumentId, MarketStatusAction>,
46    cached_statuses: &mut AHashMap<InstrumentId, MarketStatusAction>,
47    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
48    ts_event: UnixNanos,
49    ts_init: UnixNanos,
50) {
51    for (instrument_id, &new_action) in new_statuses {
52        let changed = cached_statuses
53            .get(instrument_id)
54            .is_none_or(|&prev| prev != new_action);
55
56        if changed {
57            cached_statuses.insert(*instrument_id, new_action);
58            emit_status(sender, *instrument_id, new_action, ts_event, ts_init);
59        }
60    }
61
62    // Detect symbols removed from the exchange info snapshot
63    let removed: Vec<InstrumentId> = cached_statuses
64        .keys()
65        .filter(|id| !new_statuses.contains_key(id))
66        .copied()
67        .collect();
68
69    for instrument_id in removed {
70        cached_statuses.remove(&instrument_id);
71        emit_status(
72            sender,
73            instrument_id,
74            MarketStatusAction::NotAvailableForTrading,
75            ts_event,
76            ts_init,
77        );
78    }
79}
80
81fn emit_status(
82    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
83    instrument_id: InstrumentId,
84    action: MarketStatusAction,
85    ts_event: UnixNanos,
86    ts_init: UnixNanos,
87) {
88    let is_trading = Some(matches!(action, MarketStatusAction::Trading));
89    let status = InstrumentStatus::new(
90        instrument_id,
91        action,
92        ts_event,
93        ts_init,
94        None,
95        None,
96        is_trading,
97        None,
98        None,
99    );
100
101    if let Err(e) = sender.send(DataEvent::InstrumentStatus(status)) {
102        log::error!("Failed to emit instrument status event: {e}");
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use nautilus_model::identifiers::InstrumentId;
109    use rstest::rstest;
110
111    use super::{
112        super::enums::{BinanceContractStatus, BinanceTradingStatus},
113        *,
114    };
115
116    #[rstest]
117    #[case(SymbolStatus::Trading, MarketStatusAction::Trading)]
118    #[case(SymbolStatus::EndOfDay, MarketStatusAction::Close)]
119    #[case(SymbolStatus::Halt, MarketStatusAction::Halt)]
120    #[case(SymbolStatus::Break, MarketStatusAction::Pause)]
121    #[case(
122        SymbolStatus::NonRepresentable,
123        MarketStatusAction::NotAvailableForTrading
124    )]
125    #[case(SymbolStatus::NullVal, MarketStatusAction::NotAvailableForTrading)]
126    fn test_symbol_status_to_market_action(
127        #[case] input: SymbolStatus,
128        #[case] expected: MarketStatusAction,
129    ) {
130        assert_eq!(MarketStatusAction::from(input), expected);
131    }
132
133    #[rstest]
134    #[case(BinanceTradingStatus::Trading, MarketStatusAction::Trading)]
135    #[case(BinanceTradingStatus::PendingTrading, MarketStatusAction::PreOpen)]
136    #[case(BinanceTradingStatus::PreTrading, MarketStatusAction::PreOpen)]
137    #[case(BinanceTradingStatus::PostTrading, MarketStatusAction::PostClose)]
138    #[case(BinanceTradingStatus::EndOfDay, MarketStatusAction::Close)]
139    #[case(BinanceTradingStatus::Halt, MarketStatusAction::Halt)]
140    #[case(BinanceTradingStatus::AuctionMatch, MarketStatusAction::Cross)]
141    #[case(BinanceTradingStatus::Break, MarketStatusAction::Pause)]
142    #[case(
143        BinanceTradingStatus::Unknown,
144        MarketStatusAction::NotAvailableForTrading
145    )]
146    fn test_trading_status_to_market_action(
147        #[case] input: BinanceTradingStatus,
148        #[case] expected: MarketStatusAction,
149    ) {
150        assert_eq!(MarketStatusAction::from(input), expected);
151    }
152
153    #[rstest]
154    #[case(BinanceContractStatus::Trading, MarketStatusAction::Trading)]
155    #[case(BinanceContractStatus::PendingTrading, MarketStatusAction::PreOpen)]
156    #[case(BinanceContractStatus::PreDelivering, MarketStatusAction::PreClose)]
157    #[case(BinanceContractStatus::Delivering, MarketStatusAction::Close)]
158    #[case(BinanceContractStatus::Delivered, MarketStatusAction::Close)]
159    #[case(BinanceContractStatus::PreDelisting, MarketStatusAction::PreClose)]
160    #[case(BinanceContractStatus::Delisting, MarketStatusAction::Suspend)]
161    #[case(
162        BinanceContractStatus::Down,
163        MarketStatusAction::NotAvailableForTrading
164    )]
165    #[case(
166        BinanceContractStatus::Unknown,
167        MarketStatusAction::NotAvailableForTrading
168    )]
169    fn test_contract_status_to_market_action(
170        #[case] input: BinanceContractStatus,
171        #[case] expected: MarketStatusAction,
172    ) {
173        assert_eq!(MarketStatusAction::from(input), expected);
174    }
175
176    #[rstest]
177    fn test_diff_emits_on_change() {
178        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
179        let id = InstrumentId::from("BTCUSDT.BINANCE");
180
181        let mut cached = AHashMap::new();
182        cached.insert(id, MarketStatusAction::Trading);
183
184        let mut new_statuses = AHashMap::new();
185        new_statuses.insert(id, MarketStatusAction::Halt);
186
187        diff_and_emit_statuses(
188            &new_statuses,
189            &mut cached,
190            &tx,
191            UnixNanos::default(),
192            UnixNanos::default(),
193        );
194
195        let event = rx.try_recv().expect("expected status event");
196        match event {
197            DataEvent::InstrumentStatus(status) => {
198                assert_eq!(status.instrument_id, id);
199                assert_eq!(status.action, MarketStatusAction::Halt);
200                assert_eq!(status.is_trading, Some(false));
201            }
202            _ => panic!("expected InstrumentStatus event"),
203        }
204
205        assert_eq!(cached.get(&id), Some(&MarketStatusAction::Halt));
206    }
207
208    #[rstest]
209    fn test_diff_no_emit_when_unchanged() {
210        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
211        let id = InstrumentId::from("BTCUSDT.BINANCE");
212
213        let mut cached = AHashMap::new();
214        cached.insert(id, MarketStatusAction::Trading);
215
216        let mut new_statuses = AHashMap::new();
217        new_statuses.insert(id, MarketStatusAction::Trading);
218
219        diff_and_emit_statuses(
220            &new_statuses,
221            &mut cached,
222            &tx,
223            UnixNanos::default(),
224            UnixNanos::default(),
225        );
226
227        assert!(rx.try_recv().is_err());
228    }
229
230    #[rstest]
231    fn test_diff_emits_for_new_symbol() {
232        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
233        let id = InstrumentId::from("ETHUSDT.BINANCE");
234
235        let mut cached = AHashMap::new();
236        let mut new_statuses = AHashMap::new();
237        new_statuses.insert(id, MarketStatusAction::Trading);
238
239        diff_and_emit_statuses(
240            &new_statuses,
241            &mut cached,
242            &tx,
243            UnixNanos::default(),
244            UnixNanos::default(),
245        );
246
247        let event = rx.try_recv().expect("expected status event for new symbol");
248        match event {
249            DataEvent::InstrumentStatus(status) => {
250                assert_eq!(status.instrument_id, id);
251                assert_eq!(status.action, MarketStatusAction::Trading);
252                assert_eq!(status.is_trading, Some(true));
253            }
254            _ => panic!("expected InstrumentStatus event"),
255        }
256    }
257
258    #[rstest]
259    fn test_diff_emits_not_available_for_removed_symbol() {
260        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
261        let id = InstrumentId::from("BTCUSDT.BINANCE");
262
263        let mut cached = AHashMap::new();
264        cached.insert(id, MarketStatusAction::Trading);
265
266        let new_statuses = AHashMap::new(); // Symbol disappeared
267
268        diff_and_emit_statuses(
269            &new_statuses,
270            &mut cached,
271            &tx,
272            UnixNanos::default(),
273            UnixNanos::default(),
274        );
275
276        let event = rx
277            .try_recv()
278            .expect("expected status event for removed symbol");
279        match event {
280            DataEvent::InstrumentStatus(status) => {
281                assert_eq!(status.instrument_id, id);
282                assert_eq!(status.action, MarketStatusAction::NotAvailableForTrading);
283                assert_eq!(status.is_trading, Some(false));
284            }
285            _ => panic!("expected InstrumentStatus event"),
286        }
287
288        assert!(!cached.contains_key(&id));
289    }
290}