Skip to main content

nautilus_architect_ax/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the AX Exchange adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::Duration,
25};
26
27use ahash::{AHashMap, AHashSet};
28use anyhow::Context;
29use async_trait::async_trait;
30use chrono::{DateTime, Duration as ChronoDuration, Utc};
31use futures_util::StreamExt;
32use nautilus_common::{
33    clients::DataClient,
34    live::{runner::get_data_event_sender, runtime::get_runtime},
35    messages::{
36        DataEvent, DataResponse,
37        data::{
38            BarsResponse, BookResponse, FundingRatesResponse, InstrumentResponse,
39            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
40            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
41            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
42            SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
43            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
44            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
45            UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
46            UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
47        },
48    },
49};
50use nautilus_core::{
51    AtomicMap, MUTEX_POISONED,
52    datetime::datetime_to_unix_nanos,
53    nanos::UnixNanos,
54    time::{AtomicTime, get_atomic_clock_realtime},
55};
56use nautilus_model::{
57    data::{Data, FundingRateUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDeltas_API},
58    enums::{BookType, MarketStatusAction},
59    identifiers::{ClientId, InstrumentId, Venue},
60    instruments::{Instrument, InstrumentAny},
61    types::Price,
62};
63use tokio::task::JoinHandle;
64use tokio_util::sync::CancellationToken;
65use ustr::Ustr;
66
67use crate::{
68    common::{
69        consts::{AX_AUTH_TOKEN_TTL_DATA_SECS, AX_FUNDING_RATE_LOOKBACK_DAYS, AX_VENUE},
70        credential::Credential,
71        enums::{AxCandleWidth, AxInstrumentState, AxMarketDataLevel},
72        parse::{ax_timestamp_stn_to_unix_nanos, map_bar_spec_to_candle_width},
73    },
74    config::AxDataClientConfig,
75    http::client::AxHttpClient,
76    websocket::{
77        data::{
78            client::{AxMdWebSocketClient, AxWsClientError, SymbolDataTypes},
79            parse::{
80                parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
81                parse_trade_tick,
82            },
83        },
84        messages::{AxDataWsMessage, AxMdCandle, AxMdMessage},
85    },
86};
87
88/// AX Exchange data client for live market data streaming and historical data requests.
89///
90/// This client integrates with the Nautilus DataEngine to provide:
91/// - Real-time market data via WebSocket subscriptions
92/// - Historical data via REST API requests
93/// - Automatic instrument discovery and caching
94/// - Connection lifecycle management
95#[derive(Debug)]
96pub struct AxDataClient {
97    /// The client ID for this data client.
98    client_id: ClientId,
99    /// Configuration for the data client.
100    config: AxDataClientConfig,
101    /// HTTP client for REST API requests.
102    http_client: AxHttpClient,
103    /// WebSocket client for real-time data streaming.
104    ws_client: AxMdWebSocketClient,
105    /// Whether the client is currently connected.
106    is_connected: Arc<AtomicBool>,
107    /// Cancellation token for async operations.
108    cancellation_token: CancellationToken,
109    /// Background task handles.
110    tasks: Vec<JoinHandle<()>>,
111    /// Channel sender for emitting data events to the DataEngine.
112    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
113    /// Cached instruments by symbol (shared with HTTP client).
114    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
115    /// High-resolution clock for timestamps.
116    clock: &'static AtomicTime,
117    funding_rate_tasks: AHashMap<InstrumentId, JoinHandle<()>>,
118    funding_rate_cache: Arc<Mutex<AHashMap<InstrumentId, FundingRateUpdate>>>,
119}
120
121impl AxDataClient {
122    /// Creates a new [`AxDataClient`] instance.
123    ///
124    /// # Errors
125    ///
126    /// Returns an error if the data event sender cannot be obtained.
127    pub fn new(
128        client_id: ClientId,
129        config: AxDataClientConfig,
130        http_client: AxHttpClient,
131        ws_client: AxMdWebSocketClient,
132    ) -> anyhow::Result<Self> {
133        let clock = get_atomic_clock_realtime();
134        let data_sender = get_data_event_sender();
135
136        // Share instruments cache with HTTP client
137        let instruments = http_client.instruments_cache.clone();
138
139        Ok(Self {
140            client_id,
141            config,
142            http_client,
143            ws_client,
144            is_connected: Arc::new(AtomicBool::new(false)),
145            cancellation_token: CancellationToken::new(),
146            tasks: Vec::new(),
147            data_sender,
148            instruments,
149            clock,
150            funding_rate_tasks: AHashMap::new(),
151            funding_rate_cache: Arc::new(Mutex::new(AHashMap::new())),
152        })
153    }
154
155    /// Returns the venue for this data client.
156    #[must_use]
157    pub fn venue(&self) -> Venue {
158        *AX_VENUE
159    }
160
161    fn map_book_type_to_market_data_level(book_type: BookType) -> AxMarketDataLevel {
162        match book_type {
163            BookType::L3_MBO => AxMarketDataLevel::Level3,
164            BookType::L1_MBP | BookType::L2_MBP => AxMarketDataLevel::Level2,
165        }
166    }
167
168    /// Returns a reference to the instruments cache.
169    #[must_use]
170    pub fn instruments(&self) -> &Arc<AtomicMap<Ustr, InstrumentAny>> {
171        &self.instruments
172    }
173
174    /// Spawns a message handler task to forward WebSocket data to the DataEngine.
175    fn spawn_message_handler(&mut self) {
176        let stream = self.ws_client.stream();
177        let data_sender = self.data_sender.clone();
178        let cancellation_token = self.cancellation_token.clone();
179        let is_connected = Arc::clone(&self.is_connected);
180        let instruments = Arc::clone(&self.instruments);
181        let symbol_data_types = self.ws_client.symbol_data_types();
182        let status_invalidations = self.ws_client.status_invalidations();
183        let clock = self.clock;
184
185        let handle = get_runtime().spawn(async move {
186            tokio::pin!(stream);
187
188            let mut book_sequences: AHashMap<Ustr, u64> = AHashMap::new();
189            let mut candle_cache: AHashMap<(Ustr, AxCandleWidth), AxMdCandle> = AHashMap::new();
190            let mut instrument_states: AHashMap<Ustr, AxInstrumentState> = AHashMap::new();
191
192            loop {
193                tokio::select! {
194                    () = cancellation_token.cancelled() => {
195                        log::debug!("Message handler cancelled");
196                        break;
197                    }
198                    msg = stream.next() => {
199                        match msg {
200                            Some(ws_msg) => {
201                                drain_status_invalidations(
202                                    &status_invalidations,
203                                    &mut instrument_states,
204                                );
205
206                                handle_ws_message(
207                                    ws_msg,
208                                    &data_sender,
209                                    &instruments,
210                                    &symbol_data_types,
211                                    &mut book_sequences,
212                                    &mut candle_cache,
213                                    &mut instrument_states,
214                                    clock,
215                                );
216                            }
217                            None => {
218                                log::debug!("WebSocket stream ended");
219                                is_connected.store(false, Ordering::Release);
220                                break;
221                            }
222                        }
223                    }
224                }
225            }
226        });
227
228        self.tasks.push(handle);
229    }
230
231    fn spawn_instrument_refresh(&mut self) {
232        let minutes = self.config.update_instruments_interval_mins;
233        if minutes == 0 {
234            return;
235        }
236
237        let interval = Duration::from_secs(minutes.saturating_mul(60));
238        let cancellation = self.cancellation_token.clone();
239        let instruments_cache = Arc::clone(&self.instruments);
240        let http_client = self.http_client.clone();
241        let data_sender = self.data_sender.clone();
242        let client_id = self.client_id;
243
244        let handle = get_runtime().spawn(async move {
245            loop {
246                let sleep = tokio::time::sleep(interval);
247                tokio::pin!(sleep);
248                tokio::select! {
249                    () = cancellation.cancelled() => {
250                        log::debug!("Instrument refresh task cancelled");
251                        break;
252                    }
253                    () = &mut sleep => {
254                        match http_client.request_instruments(None, None).await {
255                            Ok(instruments) => {
256                                for inst in &instruments {
257                                    instruments_cache.insert(inst.symbol().inner(), inst.clone());
258
259                                    if let Err(e) = data_sender
260                                        .send(DataEvent::Instrument(inst.clone()))
261                                    {
262                                        log::warn!("Failed to send refreshed instrument: {e}");
263                                    }
264                                }
265                                http_client.cache_instruments(&instruments);
266                                log::debug!(
267                                    "Instruments refreshed: client_id={client_id}, count={}",
268                                    instruments.len(),
269                                );
270                            }
271                            Err(e) => {
272                                log::warn!("Failed to refresh instruments: client_id={client_id}, error={e:?}");
273                            }
274                        }
275                    }
276                }
277            }
278        });
279
280        self.tasks.push(handle);
281    }
282
283    #[expect(
284        clippy::unnecessary_wraps,
285        reason = "callers forward Result to trait methods"
286    )]
287    fn ws_symbol_op<F, Fut>(
288        &mut self,
289        instrument_id: InstrumentId,
290        op: F,
291        context: &'static str,
292    ) -> anyhow::Result<()>
293    where
294        F: FnOnce(AxMdWebSocketClient, String) -> Fut + Send + 'static,
295        Fut: Future<Output = Result<(), AxWsClientError>> + Send,
296    {
297        let symbol = instrument_id.symbol.to_string();
298        log::debug!("{context} for {symbol}");
299
300        let ws = self.ws_client.clone();
301        self.spawn_ws(
302            async move { op(ws, symbol).await.map_err(|e| anyhow::anyhow!(e)) },
303            context,
304        );
305
306        Ok(())
307    }
308
309    fn spawn_ws<F>(&mut self, fut: F, context: &'static str)
310    where
311        F: Future<Output = anyhow::Result<()>> + Send + 'static,
312    {
313        let handle = get_runtime().spawn(async move {
314            if let Err(e) = fut.await {
315                log::error!("{context}: {e:?}");
316            }
317        });
318
319        self.tasks.retain(|h| !h.is_finished());
320        self.tasks.push(handle);
321    }
322
323    fn abort_all_tasks(&mut self) {
324        self.cancellation_token.cancel();
325
326        for task in self.tasks.drain(..) {
327            task.abort();
328        }
329
330        for (_, task) in self.funding_rate_tasks.drain() {
331            task.abort();
332        }
333    }
334}
335
336#[async_trait(?Send)]
337impl DataClient for AxDataClient {
338    fn client_id(&self) -> ClientId {
339        self.client_id
340    }
341
342    fn venue(&self) -> Option<Venue> {
343        Some(*AX_VENUE)
344    }
345
346    fn start(&mut self) -> anyhow::Result<()> {
347        log::debug!("Starting {}", self.client_id);
348        Ok(())
349    }
350
351    fn stop(&mut self) -> anyhow::Result<()> {
352        log::debug!("Stopping {}", self.client_id);
353        self.abort_all_tasks();
354        self.is_connected.store(false, Ordering::Release);
355        Ok(())
356    }
357
358    fn reset(&mut self) -> anyhow::Result<()> {
359        log::debug!("Resetting {}", self.client_id);
360        self.abort_all_tasks();
361        self.funding_rate_cache
362            .lock()
363            .expect(MUTEX_POISONED)
364            .clear();
365        self.cancellation_token = CancellationToken::new();
366        Ok(())
367    }
368
369    fn dispose(&mut self) -> anyhow::Result<()> {
370        log::debug!("Disposing {}", self.client_id);
371        self.abort_all_tasks();
372        self.is_connected.store(false, Ordering::Release);
373        Ok(())
374    }
375
376    fn is_connected(&self) -> bool {
377        self.is_connected.load(Ordering::Acquire)
378    }
379
380    fn is_disconnected(&self) -> bool {
381        !self.is_connected()
382    }
383
384    async fn connect(&mut self) -> anyhow::Result<()> {
385        if self.is_connected() {
386            log::debug!("Already connected {}", self.client_id);
387            return Ok(());
388        }
389
390        log::info!("Connecting {}", self.client_id);
391
392        // Recreate token so a previous disconnect/stop doesn't block new operations
393        self.cancellation_token = CancellationToken::new();
394
395        if self.config.has_api_credentials() {
396            let credential =
397                Credential::resolve(self.config.api_key.clone(), self.config.api_secret.clone())
398                    .context("API credentials not configured")?;
399
400            let token = self
401                .http_client
402                .authenticate(
403                    credential.api_key(),
404                    credential.api_secret(),
405                    AX_AUTH_TOKEN_TTL_DATA_SECS,
406                )
407                .await
408                .context("Failed to authenticate with Ax")?;
409            log::info!("Authenticated with Ax");
410            self.ws_client.set_auth_token(token);
411        }
412
413        let instruments = self
414            .http_client
415            .request_instruments(None, None)
416            .await
417            .context("Failed to fetch instruments")?;
418
419        for instrument in &instruments {
420            self.instruments
421                .insert(instrument.symbol().inner(), instrument.clone());
422
423            if let Err(e) = self
424                .data_sender
425                .send(DataEvent::Instrument(instrument.clone()))
426            {
427                log::warn!("Failed to send instrument: {e}");
428            }
429        }
430        self.http_client.cache_instruments(&instruments);
431        log::info!(
432            "Cached {} instruments",
433            self.http_client.get_cached_symbols().len()
434        );
435
436        self.ws_client
437            .connect()
438            .await
439            .context("Failed to connect WebSocket")?;
440        log::info!("WebSocket connected");
441        self.spawn_message_handler();
442        self.spawn_instrument_refresh();
443
444        self.is_connected.store(true, Ordering::Release);
445        log::info!("Connected {}", self.client_id);
446
447        Ok(())
448    }
449
450    async fn disconnect(&mut self) -> anyhow::Result<()> {
451        log::info!("Disconnecting {}", self.client_id);
452        self.ws_client.close().await;
453        self.abort_all_tasks();
454        self.funding_rate_cache
455            .lock()
456            .expect(MUTEX_POISONED)
457            .clear();
458
459        self.is_connected.store(false, Ordering::Release);
460        log::info!("Disconnected {}", self.client_id);
461
462        Ok(())
463    }
464
465    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
466        // AX does not have a real-time instruments channel; instruments are fetched via HTTP
467        log::debug!("Instruments subscription not applicable for AX (use request_instruments)");
468        Ok(())
469    }
470
471    fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
472        // AX does not have a real-time instrument channel; instruments are fetched via HTTP
473        log::debug!("Instrument subscription not applicable for AX (use request_instrument)");
474        Ok(())
475    }
476
477    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
478        let symbol = cmd.instrument_id.symbol.to_string();
479        let level = Self::map_book_type_to_market_data_level(cmd.book_type);
480        if cmd.book_type == BookType::L1_MBP {
481            log::warn!(
482                "Book type L1_MBP not supported by AX for deltas, downgrading {symbol} to LEVEL_2"
483            );
484        }
485        log::debug!("Subscribing to book deltas for {symbol} at {level:?}");
486
487        let ws = self.ws_client.clone();
488        self.spawn_ws(
489            async move {
490                ws.subscribe_book_deltas(&symbol, level)
491                    .await
492                    .map_err(|e| anyhow::anyhow!(e))
493            },
494            "subscribe book deltas",
495        );
496
497        Ok(())
498    }
499
500    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
501        self.ws_symbol_op(
502            cmd.instrument_id,
503            |ws, s| async move { ws.subscribe_quotes(&s).await },
504            "Subscribing to quotes",
505        )
506    }
507
508    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
509        self.ws_symbol_op(
510            cmd.instrument_id,
511            |ws, s| async move { ws.subscribe_trades(&s).await },
512            "Subscribing to trades",
513        )
514    }
515
516    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
517        self.ws_symbol_op(
518            cmd.instrument_id,
519            |ws, s| async move { ws.subscribe_mark_prices(&s).await },
520            "Subscribing to mark prices",
521        )
522    }
523
524    fn subscribe_index_prices(&mut self, _cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
525        log::warn!("Index prices not supported by AX Exchange");
526        Ok(())
527    }
528
529    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
530        let bar_type = cmd.bar_type;
531        let symbol = bar_type.instrument_id().symbol.to_string();
532        let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
533        log::debug!("Subscribing to bars for {bar_type} (width: {width:?})");
534
535        let ws = self.ws_client.clone();
536        self.spawn_ws(
537            async move {
538                ws.subscribe_candles(&symbol, width)
539                    .await
540                    .map_err(|e| anyhow::anyhow!(e))
541            },
542            "subscribe bars",
543        );
544
545        Ok(())
546    }
547
548    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
549        let poll_interval_mins = self.config.funding_rate_poll_interval_mins.max(1);
550
551        // Use 7-day lookback to capture latest rate across weekends/holidays
552        let lookback = ChronoDuration::days(AX_FUNDING_RATE_LOOKBACK_DAYS);
553
554        let instrument_id = cmd.instrument_id;
555
556        if self.funding_rate_tasks.contains_key(&instrument_id) {
557            log::debug!("Already subscribed to funding rates for {instrument_id}");
558            return Ok(());
559        }
560
561        log::debug!("Subscribing to funding rates for {instrument_id} (HTTP polling)");
562
563        let http = self.http_client.clone();
564        let sender = self.data_sender.clone();
565        let symbol = instrument_id.symbol.inner();
566        let cancel = self.cancellation_token.clone();
567        let cache = Arc::clone(&self.funding_rate_cache);
568        let clock = self.clock;
569
570        let handle = get_runtime().spawn(async move {
571            // First tick fires immediately for initial emission
572            let mut interval = tokio::time::interval(Duration::from_mins(poll_interval_mins));
573
574            loop {
575                tokio::select! {
576                    () = cancel.cancelled() => {
577                        log::debug!("Funding rate polling cancelled for {symbol}");
578                        break;
579                    }
580                    _ = interval.tick() => {
581                        let now: DateTime<Utc> = clock.get_time_ns().into();
582                        let start = now - lookback;
583
584                        match http.request_funding_rates(instrument_id, Some(start), Some(now)).await {
585                            Ok(funding_rates) => {
586                                if funding_rates.is_empty() {
587                                    log::warn!(
588                                        "No funding rates returned for {symbol}"
589                                    );
590                                } else if let Some(update) = funding_rates.last() {
591                                    // Only emit if rate changed
592                                    let should_emit = cache.lock().expect(MUTEX_POISONED)
593                                        .get(&instrument_id) != Some(update);
594
595                                    if should_emit {
596                                        log::info!(
597                                            "Funding rate for {symbol}: {}",
598                                            update.rate,
599                                        );
600                                        let update = *update;
601                                        cache.lock().expect(MUTEX_POISONED)
602                                            .insert(instrument_id, update);
603
604                                        if let Err(e) = sender.send(
605                                            DataEvent::FundingRate(update),
606                                        ) {
607                                            log::error!(
608                                                "Failed to send funding rate for {symbol}: {e}"
609                                            );
610                                        }
611                                    }
612                                }
613                            }
614                            Err(e) => {
615                                log::error!(
616                                    "Failed to poll funding rates for {symbol}: {e}"
617                                );
618                            }
619                        }
620                    }
621                }
622            }
623        });
624
625        self.funding_rate_tasks.insert(instrument_id, handle);
626        Ok(())
627    }
628
629    fn subscribe_instrument_status(
630        &mut self,
631        cmd: SubscribeInstrumentStatus,
632    ) -> anyhow::Result<()> {
633        self.ws_symbol_op(
634            cmd.instrument_id,
635            |ws, s| async move { ws.subscribe_instrument_status(&s).await },
636            "Subscribing to instrument status",
637        )
638    }
639
640    fn subscribe_instrument_close(&mut self, _cmd: SubscribeInstrumentClose) -> anyhow::Result<()> {
641        log::warn!("Instrument close not supported by AX Exchange");
642        Ok(())
643    }
644
645    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
646        Ok(())
647    }
648
649    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
650        Ok(())
651    }
652
653    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
654        self.ws_symbol_op(
655            cmd.instrument_id,
656            |ws, s| async move { ws.unsubscribe_book_deltas(&s).await },
657            "Unsubscribing from book deltas",
658        )
659    }
660
661    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
662        self.ws_symbol_op(
663            cmd.instrument_id,
664            |ws, s| async move { ws.unsubscribe_quotes(&s).await },
665            "Unsubscribing from quotes",
666        )
667    }
668
669    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
670        self.ws_symbol_op(
671            cmd.instrument_id,
672            |ws, s| async move { ws.unsubscribe_trades(&s).await },
673            "Unsubscribing from trades",
674        )
675    }
676
677    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
678        self.ws_symbol_op(
679            cmd.instrument_id,
680            |ws, s| async move { ws.unsubscribe_mark_prices(&s).await },
681            "Unsubscribing from mark prices",
682        )
683    }
684
685    fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
686        Ok(())
687    }
688
689    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
690        let bar_type = cmd.bar_type;
691        let symbol = bar_type.instrument_id().symbol.to_string();
692        let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
693        log::debug!("Unsubscribing from bars for {bar_type}");
694
695        let ws = self.ws_client.clone();
696        self.spawn_ws(
697            async move {
698                ws.unsubscribe_candles(&symbol, width)
699                    .await
700                    .map_err(|e| anyhow::anyhow!(e))
701            },
702            "unsubscribe bars",
703        );
704
705        Ok(())
706    }
707
708    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
709        let instrument_id = cmd.instrument_id;
710
711        if let Some(task) = self.funding_rate_tasks.remove(&instrument_id) {
712            log::debug!("Unsubscribing from funding rates for {instrument_id}");
713            task.abort();
714            self.funding_rate_cache
715                .lock()
716                .expect(MUTEX_POISONED)
717                .remove(&instrument_id);
718        } else {
719            log::debug!("Not subscribed to funding rates for {instrument_id}");
720        }
721
722        Ok(())
723    }
724
725    fn unsubscribe_instrument_status(
726        &mut self,
727        cmd: &UnsubscribeInstrumentStatus,
728    ) -> anyhow::Result<()> {
729        self.ws_symbol_op(
730            cmd.instrument_id,
731            |ws, s| async move { ws.unsubscribe_instrument_status(&s).await },
732            "Unsubscribing from instrument status",
733        )
734    }
735
736    fn unsubscribe_instrument_close(
737        &mut self,
738        _cmd: &UnsubscribeInstrumentClose,
739    ) -> anyhow::Result<()> {
740        Ok(())
741    }
742
743    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
744        let http = self.http_client.clone();
745        let instruments_cache = Arc::clone(&self.instruments);
746        let sender = self.data_sender.clone();
747        let cancel = self.cancellation_token.clone();
748        let request_id = request.request_id;
749        let client_id = request.client_id.unwrap_or(self.client_id);
750        let venue = *AX_VENUE;
751        let start_nanos = datetime_to_unix_nanos(request.start);
752        let end_nanos = datetime_to_unix_nanos(request.end);
753        let params = request.params;
754        let clock = self.clock;
755
756        get_runtime().spawn(async move {
757            match http.request_instruments(None, None).await {
758                Ok(instruments) => {
759                    if cancel.is_cancelled() {
760                        return;
761                    }
762                    log::info!("Fetched {} instruments from Ax", instruments.len());
763                    for inst in &instruments {
764                        instruments_cache.insert(inst.symbol().inner(), inst.clone());
765                    }
766                    http.cache_instruments(&instruments);
767
768                    let response = DataResponse::Instruments(InstrumentsResponse::new(
769                        request_id,
770                        client_id,
771                        venue,
772                        instruments,
773                        start_nanos,
774                        end_nanos,
775                        clock.get_time_ns(),
776                        params,
777                    ));
778
779                    if let Err(e) = sender.send(DataEvent::Response(response)) {
780                        log::error!("Failed to send instruments response: {e}");
781                    }
782                }
783                Err(e) => {
784                    log::error!("Failed to request instruments: {e}");
785                }
786            }
787        });
788
789        Ok(())
790    }
791
792    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
793        let http = self.http_client.clone();
794        let instruments_cache = Arc::clone(&self.instruments);
795        let sender = self.data_sender.clone();
796        let cancel = self.cancellation_token.clone();
797        let request_id = request.request_id;
798        let client_id = request.client_id.unwrap_or(self.client_id);
799        let instrument_id = request.instrument_id;
800        let symbol = instrument_id.symbol.inner();
801        let start_nanos = datetime_to_unix_nanos(request.start);
802        let end_nanos = datetime_to_unix_nanos(request.end);
803        let params = request.params;
804        let clock = self.clock;
805
806        get_runtime().spawn(async move {
807            match http.request_instrument(symbol, None, None).await {
808                Ok(instrument) => {
809                    if cancel.is_cancelled() {
810                        return;
811                    }
812                    log::debug!("Fetched instrument {symbol} from Ax");
813                    instruments_cache.insert(symbol, instrument.clone());
814                    http.cache_instrument(instrument.clone());
815
816                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
817                        request_id,
818                        client_id,
819                        instrument_id,
820                        instrument,
821                        start_nanos,
822                        end_nanos,
823                        clock.get_time_ns(),
824                        params,
825                    )));
826
827                    if let Err(e) = sender.send(DataEvent::Response(response)) {
828                        log::error!("Failed to send instrument response: {e}");
829                    }
830                }
831                Err(e) => {
832                    log::error!("Failed to request instrument {symbol}: {e}");
833                }
834            }
835        });
836
837        Ok(())
838    }
839
840    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
841        let http = self.http_client.clone();
842        let sender = self.data_sender.clone();
843        let cancel = self.cancellation_token.clone();
844        let request_id = request.request_id;
845        let client_id = request.client_id.unwrap_or(self.client_id);
846        let instrument_id = request.instrument_id;
847        let symbol = instrument_id.symbol.inner();
848        let depth = request.depth.map(|n| n.get());
849        let params = request.params;
850        let clock = self.clock;
851
852        get_runtime().spawn(async move {
853            match http.request_book_snapshot(symbol, depth).await {
854                Ok(book) => {
855                    if cancel.is_cancelled() {
856                        return;
857                    }
858                    log::debug!(
859                        "Fetched book snapshot for {symbol} ({} bids, {} asks)",
860                        book.bids(None).count(),
861                        book.asks(None).count(),
862                    );
863
864                    let response = DataResponse::Book(BookResponse::new(
865                        request_id,
866                        client_id,
867                        instrument_id,
868                        book,
869                        None,
870                        None,
871                        clock.get_time_ns(),
872                        params,
873                    ));
874
875                    if let Err(e) = sender.send(DataEvent::Response(response)) {
876                        log::error!("Failed to send book snapshot response: {e}");
877                    }
878                }
879                Err(e) => {
880                    log::error!("Failed to request book snapshot for {symbol}: {e}");
881                }
882            }
883        });
884
885        Ok(())
886    }
887
888    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
889        let http = self.http_client.clone();
890        let sender = self.data_sender.clone();
891        let cancel = self.cancellation_token.clone();
892        let request_id = request.request_id;
893        let client_id = request.client_id.unwrap_or(self.client_id);
894        let instrument_id = request.instrument_id;
895        let symbol = instrument_id.symbol.inner();
896        let limit = request.limit.map(|n| n.get() as i32);
897        let start_nanos = datetime_to_unix_nanos(request.start);
898        let end_nanos = datetime_to_unix_nanos(request.end);
899        let params = request.params;
900        let clock = self.clock;
901
902        get_runtime().spawn(async move {
903            match http
904                .request_trade_ticks(symbol, limit, start_nanos, end_nanos)
905                .await
906            {
907                Ok(ticks) => {
908                    if cancel.is_cancelled() {
909                        return;
910                    }
911                    log::debug!("Fetched {} trades for {symbol}", ticks.len());
912
913                    let response = DataResponse::Trades(TradesResponse::new(
914                        request_id,
915                        client_id,
916                        instrument_id,
917                        ticks,
918                        start_nanos,
919                        end_nanos,
920                        clock.get_time_ns(),
921                        params,
922                    ));
923
924                    if let Err(e) = sender.send(DataEvent::Response(response)) {
925                        log::error!("Failed to send trades response: {e}");
926                    }
927                }
928                Err(e) => {
929                    log::error!("Failed to request trades for {symbol}: {e}");
930                }
931            }
932        });
933
934        Ok(())
935    }
936
937    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
938        let http = self.http_client.clone();
939        let sender = self.data_sender.clone();
940        let request_id = request.request_id;
941        let client_id = request.client_id.unwrap_or(self.client_id);
942        let bar_type = request.bar_type;
943        let symbol = bar_type.instrument_id().symbol.inner();
944        let start = request.start;
945        let end = request.end;
946        let start_nanos = datetime_to_unix_nanos(start);
947        let end_nanos = datetime_to_unix_nanos(end);
948        let params = request.params;
949        let clock = self.clock;
950        let width = match map_bar_spec_to_candle_width(&bar_type.spec()) {
951            Ok(w) => w,
952            Err(e) => {
953                log::error!("Failed to map bar type {bar_type}: {e}");
954                return Err(e);
955            }
956        };
957
958        let cancel = self.cancellation_token.clone();
959
960        get_runtime().spawn(async move {
961            match http.request_bars(symbol, start, end, width).await {
962                Ok(bars) => {
963                    if cancel.is_cancelled() {
964                        return;
965                    }
966                    log::debug!("Fetched {} bars for {symbol}", bars.len());
967
968                    let response = DataResponse::Bars(BarsResponse::new(
969                        request_id,
970                        client_id,
971                        bar_type,
972                        bars,
973                        start_nanos,
974                        end_nanos,
975                        clock.get_time_ns(),
976                        params,
977                    ));
978
979                    if let Err(e) = sender.send(DataEvent::Response(response)) {
980                        log::error!("Failed to send bars response: {e}");
981                    }
982                }
983                Err(e) => {
984                    log::error!("Failed to request bars for {symbol}: {e}");
985                }
986            }
987        });
988
989        Ok(())
990    }
991
992    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
993        let http = self.http_client.clone();
994        let sender = self.data_sender.clone();
995        let cancel = self.cancellation_token.clone();
996        let request_id = request.request_id;
997        let client_id = request.client_id.unwrap_or(self.client_id);
998        let instrument_id = request.instrument_id;
999        let symbol = instrument_id.symbol.inner();
1000        let start = request.start;
1001        let end = request.end;
1002        let start_nanos = datetime_to_unix_nanos(start);
1003        let end_nanos = datetime_to_unix_nanos(end);
1004        let params = request.params;
1005        let clock = self.clock;
1006
1007        get_runtime().spawn(async move {
1008            match http.request_funding_rates(instrument_id, start, end).await {
1009                Ok(funding_rates) => {
1010                    if cancel.is_cancelled() {
1011                        return;
1012                    }
1013                    log::debug!("Fetched {} funding rates for {symbol}", funding_rates.len());
1014
1015                    let ts_init = clock.get_time_ns();
1016                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
1017                        request_id,
1018                        client_id,
1019                        instrument_id,
1020                        funding_rates,
1021                        start_nanos,
1022                        end_nanos,
1023                        ts_init,
1024                        params,
1025                    ));
1026
1027                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1028                        log::error!("Failed to send funding rates response: {e}");
1029                    }
1030                }
1031                Err(e) => {
1032                    log::error!("Failed to request funding rates for {symbol}: {e}");
1033                }
1034            }
1035        });
1036
1037        Ok(())
1038    }
1039}
1040
1041fn drain_status_invalidations(
1042    invalidations: &Arc<Mutex<AHashSet<Ustr>>>,
1043    instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1044) {
1045    if let Ok(mut set) = invalidations.lock() {
1046        for symbol in set.drain() {
1047            instrument_states.remove(&symbol);
1048        }
1049    }
1050}
1051
1052#[expect(clippy::too_many_arguments)]
1053fn handle_ws_message(
1054    msg: AxDataWsMessage,
1055    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1056    instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
1057    symbol_data_types: &Arc<AtomicMap<String, SymbolDataTypes>>,
1058    book_sequences: &mut AHashMap<Ustr, u64>,
1059    candle_cache: &mut AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
1060    instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1061    clock: &'static AtomicTime,
1062) {
1063    match msg {
1064        AxDataWsMessage::Reconnected => {
1065            candle_cache.clear();
1066            instrument_states.clear();
1067            log::info!("WebSocket reconnected");
1068        }
1069        AxDataWsMessage::CandleUnsubscribed { symbol, width } => {
1070            candle_cache.remove(&(symbol, width));
1071        }
1072        AxDataWsMessage::MdMessage(md_msg) => {
1073            handle_md_message(
1074                md_msg,
1075                sender,
1076                instruments,
1077                symbol_data_types,
1078                book_sequences,
1079                candle_cache,
1080                instrument_states,
1081                clock,
1082            );
1083        }
1084    }
1085}
1086
1087#[expect(clippy::too_many_arguments)]
1088fn handle_md_message(
1089    message: AxMdMessage,
1090    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1091    instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
1092    symbol_data_types: &Arc<AtomicMap<String, SymbolDataTypes>>,
1093    book_sequences: &mut AHashMap<Ustr, u64>,
1094    candle_cache: &mut AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
1095    instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1096    clock: &'static AtomicTime,
1097) {
1098    let ts_init = || -> UnixNanos { clock.get_time_ns() };
1099
1100    let instruments_snap = instruments.load();
1101    let sdt_snap = symbol_data_types.load();
1102
1103    match message {
1104        AxMdMessage::BookL1(book) => {
1105            let l1_subscribed = sdt_snap
1106                .get(book.s.as_str())
1107                .is_some_and(|e| e.quotes || e.book_level == Some(AxMarketDataLevel::Level1));
1108
1109            if !l1_subscribed {
1110                return;
1111            }
1112
1113            let Some(instrument) = instruments_snap.get(&book.s) else {
1114                log::error!(
1115                    "No instrument cached for symbol '{}' - cannot parse L1 book",
1116                    book.s
1117                );
1118                return;
1119            };
1120
1121            match parse_book_l1_quote(&book, instrument, ts_init()) {
1122                Ok(quote) => {
1123                    let _ = sender.send(DataEvent::Data(Data::Quote(quote)));
1124                }
1125                Err(e) => log::error!("Failed to parse L1 to QuoteTick: {e}"),
1126            }
1127        }
1128        AxMdMessage::BookL2(book) => {
1129            let symbol = book.s;
1130            let seq = book_sequences.entry(symbol).or_insert(0);
1131            *seq += 1;
1132            let sequence = *seq;
1133
1134            let Some(instrument) = instruments_snap.get(&symbol) else {
1135                log::error!("No instrument cached for symbol '{symbol}' - cannot parse L2 book");
1136                return;
1137            };
1138
1139            match parse_book_l2_deltas(&book, instrument, sequence, ts_init()) {
1140                Ok(deltas) => {
1141                    let api_deltas = OrderBookDeltas_API::new(deltas);
1142                    let _ = sender.send(DataEvent::Data(Data::Deltas(api_deltas)));
1143                }
1144                Err(e) => log::error!("Failed to parse L2 to OrderBookDeltas: {e}"),
1145            }
1146        }
1147        AxMdMessage::BookL3(book) => {
1148            let symbol = book.s;
1149            let seq = book_sequences.entry(symbol).or_insert(0);
1150            *seq += 1;
1151            let sequence = *seq;
1152
1153            let Some(instrument) = instruments_snap.get(&symbol) else {
1154                log::error!("No instrument cached for symbol '{symbol}' - cannot parse L3 book");
1155                return;
1156            };
1157
1158            match parse_book_l3_deltas(&book, instrument, sequence, ts_init()) {
1159                Ok(deltas) => {
1160                    let api_deltas = OrderBookDeltas_API::new(deltas);
1161                    let _ = sender.send(DataEvent::Data(Data::Deltas(api_deltas)));
1162                }
1163                Err(e) => log::error!("Failed to parse L3 to OrderBookDeltas: {e}"),
1164            }
1165        }
1166        AxMdMessage::Ticker(ticker) => {
1167            let Some(instrument) = instruments_snap.get(&ticker.s) else {
1168                log::debug!("No instrument cached for ticker symbol '{}'", ticker.s);
1169                return;
1170            };
1171
1172            let instrument_id = instrument.id();
1173            let price_precision = instrument.price_precision();
1174            let ts_event =
1175                ax_timestamp_stn_to_unix_nanos(ticker.ts, ticker.tn).unwrap_or_else(|_| ts_init());
1176            let ts_init = ts_init();
1177
1178            let mark_prices_subscribed = sdt_snap
1179                .get(ticker.s.as_str())
1180                .is_some_and(|e| e.mark_prices);
1181            if mark_prices_subscribed && let Some(mark_price) = ticker.m {
1182                match Price::from_decimal_dp(mark_price, price_precision) {
1183                    Ok(price) => {
1184                        let update = MarkPriceUpdate::new(instrument_id, price, ts_event, ts_init);
1185                        let _ = sender.send(DataEvent::Data(Data::MarkPriceUpdate(update)));
1186                    }
1187                    Err(e) => {
1188                        log::error!("Failed to parse mark price for {}: {e}", ticker.s);
1189                    }
1190                }
1191            }
1192
1193            if let Some(state) = ticker.i {
1194                let status_subscribed = sdt_snap
1195                    .get(ticker.s.as_str())
1196                    .is_some_and(|e| e.instrument_status);
1197                if status_subscribed {
1198                    let prev = instrument_states.insert(ticker.s, state);
1199                    if prev != Some(state) {
1200                        let action = MarketStatusAction::from(state);
1201                        let status = InstrumentStatus::new(
1202                            instrument_id,
1203                            action,
1204                            ts_event,
1205                            ts_init,
1206                            None,
1207                            None,
1208                            Some(state == AxInstrumentState::Open),
1209                            None,
1210                            None,
1211                        );
1212                        let _ = sender.send(DataEvent::InstrumentStatus(status));
1213                    }
1214                }
1215            }
1216        }
1217        AxMdMessage::Trade(trade) => {
1218            let trades_subscribed = sdt_snap.get(trade.s.as_str()).is_some_and(|e| e.trades);
1219
1220            if !trades_subscribed {
1221                return;
1222            }
1223
1224            let Some(instrument) = instruments_snap.get(&trade.s) else {
1225                log::error!(
1226                    "No instrument cached for symbol '{}' - cannot parse trade",
1227                    trade.s
1228                );
1229                return;
1230            };
1231
1232            match parse_trade_tick(&trade, instrument, ts_init()) {
1233                Ok(tick) => {
1234                    let _ = sender.send(DataEvent::Data(Data::Trade(tick)));
1235                }
1236                Err(e) => log::error!("Failed to parse trade to TradeTick: {e}"),
1237            }
1238        }
1239        AxMdMessage::Candle(candle) => {
1240            let cache_key = (candle.symbol, candle.width);
1241
1242            let closed_candle = if let Some(cached) = candle_cache.get(&cache_key) {
1243                if cached.ts == candle.ts {
1244                    None
1245                } else {
1246                    Some(cached.clone())
1247                }
1248            } else {
1249                None
1250            };
1251
1252            candle_cache.insert(cache_key, candle);
1253
1254            if let Some(closed) = closed_candle {
1255                let Some(instrument) = instruments_snap.get(&closed.symbol) else {
1256                    log::error!(
1257                        "No instrument cached for symbol '{}' - cannot parse candle",
1258                        closed.symbol
1259                    );
1260                    return;
1261                };
1262
1263                match parse_candle_bar(&closed, instrument, ts_init()) {
1264                    Ok(bar) => {
1265                        let _ = sender.send(DataEvent::Data(Data::Bar(bar)));
1266                    }
1267                    Err(e) => log::error!("Failed to parse candle to Bar: {e}"),
1268                }
1269            }
1270        }
1271        AxMdMessage::Heartbeat(_) => {
1272            log::trace!("Received heartbeat");
1273        }
1274        AxMdMessage::SubscriptionResponse(_) => {}
1275        AxMdMessage::Error(error) => {
1276            log::error!("WebSocket error: {}", error.message);
1277        }
1278    }
1279}
1280
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use ahash::{AHashMap, AHashSet};
    use nautilus_model::{
        data::InstrumentStatus,
        enums::AssetClass,
        identifiers::{InstrumentId, Symbol},
        instruments::PerpetualContract,
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use rust_decimal::Decimal;
    use ustr::Ustr;

    use super::*;
    use crate::websocket::{
        data::client::SymbolDataTypes,
        messages::{AxMdMessage, AxMdTicker},
    };

    /// Symbol shared by every fixture in this module.
    const TEST_SYMBOL: &str = "EURUSD-PERP";

    #[rstest]
    fn test_drain_status_invalidations_removes_cached_state() {
        let invalidations = Arc::new(Mutex::new(AHashSet::new()));
        let mut states = AHashMap::new();
        let sym = Ustr::from(TEST_SYMBOL);

        states.insert(sym, AxInstrumentState::Open);
        invalidations.lock().unwrap().insert(sym);

        drain_status_invalidations(&invalidations, &mut states);

        assert!(!states.contains_key(&sym));
        assert!(invalidations.lock().unwrap().is_empty());
    }

    #[rstest]
    fn test_drain_status_invalidations_no_op_when_empty() {
        let invalidations = Arc::new(Mutex::new(AHashSet::new()));
        let mut states = AHashMap::new();
        let sym = Ustr::from(TEST_SYMBOL);
        states.insert(sym, AxInstrumentState::Open);

        drain_status_invalidations(&invalidations, &mut states);

        assert!(states.contains_key(&sym));
    }

    /// Builds the perpetual contract that the ticker tests register under `TEST_SYMBOL`.
    fn ticker_test_instrument() -> InstrumentAny {
        let symbol = Symbol::new(TEST_SYMBOL);
        let instrument = PerpetualContract::new(
            InstrumentId::new(symbol, *crate::common::consts::AX_VENUE),
            symbol,
            Ustr::from("EURUSD"),
            AssetClass::FX,
            None,
            Currency::USD(),
            Currency::USD(),
            false,
            4,
            0,
            Price::from("0.0001"),
            Quantity::from("1"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            Some(Decimal::new(1, 2)),
            Some(Decimal::new(5, 3)),
            Some(Decimal::new(2, 4)),
            Some(Decimal::new(5, 4)),
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        InstrumentAny::PerpetualContract(instrument)
    }

    /// Builds a ticker for `TEST_SYMBOL` that carries only the given instrument state.
    fn ticker_message(state: AxInstrumentState) -> AxMdTicker {
        AxMdTicker {
            ts: 1_700_000_000,
            tn: 0,
            s: Ustr::from(TEST_SYMBOL),
            p: Decimal::ZERO,
            q: 0,
            o: Decimal::ZERO,
            l: Decimal::ZERO,
            h: Decimal::ZERO,
            v: 0,
            oi: None,
            m: None,
            i: Some(state),
            pl: None,
            pu: None,
            lsp: None,
        }
    }

    /// Drains `rx` without blocking and returns the instrument status events it held.
    fn collect_instrument_statuses(
        rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    ) -> Vec<InstrumentStatus> {
        let mut statuses = Vec::new();

        while let Ok(event) = rx.try_recv() {
            if let DataEvent::InstrumentStatus(status) = event {
                statuses.push(status);
            }
        }
        statuses
    }

    /// Feeds one ticker per entry of `states` through `handle_md_message`, sharing the
    /// same caches across calls, and returns the instrument statuses that were emitted.
    ///
    /// `instrument_status` sets the subscription flag for `TEST_SYMBOL`; every other
    /// subscription flag is off.
    fn statuses_for_ticker_states(
        states: &[AxInstrumentState],
        instrument_status: bool,
    ) -> Vec<InstrumentStatus> {
        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();

        let instruments = Arc::new(AtomicMap::new());
        instruments.insert(Ustr::from(TEST_SYMBOL), ticker_test_instrument());

        let sdt = Arc::new(AtomicMap::new());
        sdt.insert(
            TEST_SYMBOL.to_string(),
            SymbolDataTypes {
                quotes: false,
                trades: false,
                mark_prices: false,
                instrument_status,
                book_level: None,
            },
        );

        let mut book_sequences = AHashMap::new();
        let mut candle_cache = AHashMap::new();
        let mut instrument_states = AHashMap::new();
        let clock = get_atomic_clock_realtime();

        for &state in states {
            handle_md_message(
                AxMdMessage::Ticker(ticker_message(state)),
                &tx,
                &instruments,
                &sdt,
                &mut book_sequences,
                &mut candle_cache,
                &mut instrument_states,
                clock,
            );
        }

        collect_instrument_statuses(&mut rx)
    }

    #[rstest]
    fn test_ticker_instrument_status_emitted_once_when_state_unchanged() {
        // Same state repeated: the second ticker should not emit a second InstrumentStatus
        let statuses = statuses_for_ticker_states(
            &[AxInstrumentState::Open, AxInstrumentState::Open],
            true,
        );

        assert_eq!(
            statuses.len(),
            1,
            "expected a single emission, found {statuses:?}"
        );
        assert_eq!(statuses[0].is_trading, Some(true));
    }

    #[rstest]
    fn test_ticker_instrument_status_emitted_on_transition() {
        let statuses = statuses_for_ticker_states(
            &[AxInstrumentState::Open, AxInstrumentState::Closed],
            true,
        );

        assert_eq!(statuses.len(), 2, "expected one emission per transition");
        assert_eq!(statuses[0].is_trading, Some(true));
        assert_eq!(statuses[1].is_trading, Some(false));
    }

    #[rstest]
    fn test_ticker_instrument_status_skipped_when_not_subscribed() {
        let statuses = statuses_for_ticker_states(&[AxInstrumentState::Open], false);

        assert!(statuses.is_empty());
    }
}