1use std::{
19 future::Future,
20 sync::{
21 Arc, Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::Duration,
25};
26
27use ahash::{AHashMap, AHashSet};
28use anyhow::Context;
29use async_trait::async_trait;
30use chrono::{DateTime, Duration as ChronoDuration, Utc};
31use futures_util::StreamExt;
32use nautilus_common::{
33 clients::DataClient,
34 live::{runner::get_data_event_sender, runtime::get_runtime},
35 messages::{
36 DataEvent, DataResponse,
37 data::{
38 BarsResponse, BookResponse, FundingRatesResponse, InstrumentResponse,
39 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
40 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
41 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
42 SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
43 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
44 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
45 UnsubscribeInstrument, UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus,
46 UnsubscribeInstruments, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
47 },
48 },
49};
50use nautilus_core::{
51 AtomicMap, MUTEX_POISONED,
52 datetime::datetime_to_unix_nanos,
53 nanos::UnixNanos,
54 time::{AtomicTime, get_atomic_clock_realtime},
55};
56use nautilus_model::{
57 data::{Data, FundingRateUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDeltas_API},
58 enums::{BookType, MarketStatusAction},
59 identifiers::{ClientId, InstrumentId, Venue},
60 instruments::{Instrument, InstrumentAny},
61 types::Price,
62};
63use tokio::task::JoinHandle;
64use tokio_util::sync::CancellationToken;
65use ustr::Ustr;
66
67use crate::{
68 common::{
69 consts::{AX_AUTH_TOKEN_TTL_DATA_SECS, AX_FUNDING_RATE_LOOKBACK_DAYS, AX_VENUE},
70 credential::Credential,
71 enums::{AxCandleWidth, AxInstrumentState, AxMarketDataLevel},
72 parse::{ax_timestamp_stn_to_unix_nanos, map_bar_spec_to_candle_width},
73 },
74 config::AxDataClientConfig,
75 http::client::AxHttpClient,
76 websocket::{
77 data::{
78 client::{AxMdWebSocketClient, AxWsClientError, SymbolDataTypes},
79 parse::{
80 parse_book_l1_quote, parse_book_l2_deltas, parse_book_l3_deltas, parse_candle_bar,
81 parse_trade_tick,
82 },
83 },
84 messages::{AxDataWsMessage, AxMdCandle, AxMdMessage},
85 },
86};
87
/// Live data client for the AX venue.
///
/// Bundles an HTTP client, a market-data WebSocket client, the background
/// tasks spawned on their behalf, and the state those tasks share.
#[derive(Debug)]
pub struct AxDataClient {
    /// Identifier reported by `client_id()` and used as the default client ID on responses.
    client_id: ClientId,
    /// Client configuration: API credentials plus the instrument-refresh and
    /// funding-rate polling intervals.
    config: AxDataClientConfig,
    /// HTTP client used for authentication and for all request/polling calls.
    http_client: AxHttpClient,
    /// WebSocket client used for market-data subscriptions.
    ws_client: AxMdWebSocketClient,
    /// Connection flag; shared with the message-handler task, which clears it
    /// when the WebSocket stream ends.
    is_connected: Arc<AtomicBool>,
    /// Token cloned into background tasks so they can be asked to stop.
    cancellation_token: CancellationToken,
    /// Handles of spawned background tasks (message handler, instrument
    /// refresh, one-off WebSocket operations).
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by symbol; taken from the HTTP client's
    /// `instruments_cache` at construction.
    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Clock used to timestamp responses and parsed market data.
    clock: &'static AtomicTime,
    /// One HTTP polling task per instrument subscribed to funding rates.
    funding_rate_tasks: AHashMap<InstrumentId, JoinHandle<()>>,
    /// Last funding rate emitted per instrument; used to suppress duplicates.
    funding_rate_cache: Arc<Mutex<AHashMap<InstrumentId, FundingRateUpdate>>>,
}
120
121impl AxDataClient {
122 pub fn new(
128 client_id: ClientId,
129 config: AxDataClientConfig,
130 http_client: AxHttpClient,
131 ws_client: AxMdWebSocketClient,
132 ) -> anyhow::Result<Self> {
133 let clock = get_atomic_clock_realtime();
134 let data_sender = get_data_event_sender();
135
136 let instruments = http_client.instruments_cache.clone();
138
139 Ok(Self {
140 client_id,
141 config,
142 http_client,
143 ws_client,
144 is_connected: Arc::new(AtomicBool::new(false)),
145 cancellation_token: CancellationToken::new(),
146 tasks: Vec::new(),
147 data_sender,
148 instruments,
149 clock,
150 funding_rate_tasks: AHashMap::new(),
151 funding_rate_cache: Arc::new(Mutex::new(AHashMap::new())),
152 })
153 }
154
155 #[must_use]
157 pub fn venue(&self) -> Venue {
158 *AX_VENUE
159 }
160
161 fn map_book_type_to_market_data_level(book_type: BookType) -> AxMarketDataLevel {
162 match book_type {
163 BookType::L3_MBO => AxMarketDataLevel::Level3,
164 BookType::L1_MBP | BookType::L2_MBP => AxMarketDataLevel::Level2,
165 }
166 }
167
168 #[must_use]
170 pub fn instruments(&self) -> &Arc<AtomicMap<Ustr, InstrumentAny>> {
171 &self.instruments
172 }
173
174 fn spawn_message_handler(&mut self) {
176 let stream = self.ws_client.stream();
177 let data_sender = self.data_sender.clone();
178 let cancellation_token = self.cancellation_token.clone();
179 let is_connected = Arc::clone(&self.is_connected);
180 let instruments = Arc::clone(&self.instruments);
181 let symbol_data_types = self.ws_client.symbol_data_types();
182 let status_invalidations = self.ws_client.status_invalidations();
183 let clock = self.clock;
184
185 let handle = get_runtime().spawn(async move {
186 tokio::pin!(stream);
187
188 let mut book_sequences: AHashMap<Ustr, u64> = AHashMap::new();
189 let mut candle_cache: AHashMap<(Ustr, AxCandleWidth), AxMdCandle> = AHashMap::new();
190 let mut instrument_states: AHashMap<Ustr, AxInstrumentState> = AHashMap::new();
191
192 loop {
193 tokio::select! {
194 () = cancellation_token.cancelled() => {
195 log::debug!("Message handler cancelled");
196 break;
197 }
198 msg = stream.next() => {
199 match msg {
200 Some(ws_msg) => {
201 drain_status_invalidations(
202 &status_invalidations,
203 &mut instrument_states,
204 );
205
206 handle_ws_message(
207 ws_msg,
208 &data_sender,
209 &instruments,
210 &symbol_data_types,
211 &mut book_sequences,
212 &mut candle_cache,
213 &mut instrument_states,
214 clock,
215 );
216 }
217 None => {
218 log::debug!("WebSocket stream ended");
219 is_connected.store(false, Ordering::Release);
220 break;
221 }
222 }
223 }
224 }
225 }
226 });
227
228 self.tasks.push(handle);
229 }
230
231 fn spawn_instrument_refresh(&mut self) {
232 let minutes = self.config.update_instruments_interval_mins;
233 if minutes == 0 {
234 return;
235 }
236
237 let interval = Duration::from_secs(minutes.saturating_mul(60));
238 let cancellation = self.cancellation_token.clone();
239 let instruments_cache = Arc::clone(&self.instruments);
240 let http_client = self.http_client.clone();
241 let data_sender = self.data_sender.clone();
242 let client_id = self.client_id;
243
244 let handle = get_runtime().spawn(async move {
245 loop {
246 let sleep = tokio::time::sleep(interval);
247 tokio::pin!(sleep);
248 tokio::select! {
249 () = cancellation.cancelled() => {
250 log::debug!("Instrument refresh task cancelled");
251 break;
252 }
253 () = &mut sleep => {
254 match http_client.request_instruments(None, None).await {
255 Ok(instruments) => {
256 for inst in &instruments {
257 instruments_cache.insert(inst.symbol().inner(), inst.clone());
258
259 if let Err(e) = data_sender
260 .send(DataEvent::Instrument(inst.clone()))
261 {
262 log::warn!("Failed to send refreshed instrument: {e}");
263 }
264 }
265 http_client.cache_instruments(&instruments);
266 log::debug!(
267 "Instruments refreshed: client_id={client_id}, count={}",
268 instruments.len(),
269 );
270 }
271 Err(e) => {
272 log::warn!("Failed to refresh instruments: client_id={client_id}, error={e:?}");
273 }
274 }
275 }
276 }
277 }
278 });
279
280 self.tasks.push(handle);
281 }
282
283 #[expect(
284 clippy::unnecessary_wraps,
285 reason = "callers forward Result to trait methods"
286 )]
287 fn ws_symbol_op<F, Fut>(
288 &mut self,
289 instrument_id: InstrumentId,
290 op: F,
291 context: &'static str,
292 ) -> anyhow::Result<()>
293 where
294 F: FnOnce(AxMdWebSocketClient, String) -> Fut + Send + 'static,
295 Fut: Future<Output = Result<(), AxWsClientError>> + Send,
296 {
297 let symbol = instrument_id.symbol.to_string();
298 log::debug!("{context} for {symbol}");
299
300 let ws = self.ws_client.clone();
301 self.spawn_ws(
302 async move { op(ws, symbol).await.map_err(|e| anyhow::anyhow!(e)) },
303 context,
304 );
305
306 Ok(())
307 }
308
309 fn spawn_ws<F>(&mut self, fut: F, context: &'static str)
310 where
311 F: Future<Output = anyhow::Result<()>> + Send + 'static,
312 {
313 let handle = get_runtime().spawn(async move {
314 if let Err(e) = fut.await {
315 log::error!("{context}: {e:?}");
316 }
317 });
318
319 self.tasks.retain(|h| !h.is_finished());
320 self.tasks.push(handle);
321 }
322
323 fn abort_all_tasks(&mut self) {
324 self.cancellation_token.cancel();
325
326 for task in self.tasks.drain(..) {
327 task.abort();
328 }
329
330 for (_, task) in self.funding_rate_tasks.drain() {
331 task.abort();
332 }
333 }
334}
335
336#[async_trait(?Send)]
337impl DataClient for AxDataClient {
338 fn client_id(&self) -> ClientId {
339 self.client_id
340 }
341
342 fn venue(&self) -> Option<Venue> {
343 Some(*AX_VENUE)
344 }
345
346 fn start(&mut self) -> anyhow::Result<()> {
347 log::debug!("Starting {}", self.client_id);
348 Ok(())
349 }
350
351 fn stop(&mut self) -> anyhow::Result<()> {
352 log::debug!("Stopping {}", self.client_id);
353 self.abort_all_tasks();
354 self.is_connected.store(false, Ordering::Release);
355 Ok(())
356 }
357
358 fn reset(&mut self) -> anyhow::Result<()> {
359 log::debug!("Resetting {}", self.client_id);
360 self.abort_all_tasks();
361 self.funding_rate_cache
362 .lock()
363 .expect(MUTEX_POISONED)
364 .clear();
365 self.cancellation_token = CancellationToken::new();
366 Ok(())
367 }
368
369 fn dispose(&mut self) -> anyhow::Result<()> {
370 log::debug!("Disposing {}", self.client_id);
371 self.abort_all_tasks();
372 self.is_connected.store(false, Ordering::Release);
373 Ok(())
374 }
375
376 fn is_connected(&self) -> bool {
377 self.is_connected.load(Ordering::Acquire)
378 }
379
380 fn is_disconnected(&self) -> bool {
381 !self.is_connected()
382 }
383
384 async fn connect(&mut self) -> anyhow::Result<()> {
385 if self.is_connected() {
386 log::debug!("Already connected {}", self.client_id);
387 return Ok(());
388 }
389
390 log::info!("Connecting {}", self.client_id);
391
392 self.cancellation_token = CancellationToken::new();
394
395 if self.config.has_api_credentials() {
396 let credential =
397 Credential::resolve(self.config.api_key.clone(), self.config.api_secret.clone())
398 .context("API credentials not configured")?;
399
400 let token = self
401 .http_client
402 .authenticate(
403 credential.api_key(),
404 credential.api_secret(),
405 AX_AUTH_TOKEN_TTL_DATA_SECS,
406 )
407 .await
408 .context("Failed to authenticate with Ax")?;
409 log::info!("Authenticated with Ax");
410 self.ws_client.set_auth_token(token);
411 }
412
413 let instruments = self
414 .http_client
415 .request_instruments(None, None)
416 .await
417 .context("Failed to fetch instruments")?;
418
419 for instrument in &instruments {
420 self.instruments
421 .insert(instrument.symbol().inner(), instrument.clone());
422
423 if let Err(e) = self
424 .data_sender
425 .send(DataEvent::Instrument(instrument.clone()))
426 {
427 log::warn!("Failed to send instrument: {e}");
428 }
429 }
430 self.http_client.cache_instruments(&instruments);
431 log::info!(
432 "Cached {} instruments",
433 self.http_client.get_cached_symbols().len()
434 );
435
436 self.ws_client
437 .connect()
438 .await
439 .context("Failed to connect WebSocket")?;
440 log::info!("WebSocket connected");
441 self.spawn_message_handler();
442 self.spawn_instrument_refresh();
443
444 self.is_connected.store(true, Ordering::Release);
445 log::info!("Connected {}", self.client_id);
446
447 Ok(())
448 }
449
450 async fn disconnect(&mut self) -> anyhow::Result<()> {
451 log::info!("Disconnecting {}", self.client_id);
452 self.ws_client.close().await;
453 self.abort_all_tasks();
454 self.funding_rate_cache
455 .lock()
456 .expect(MUTEX_POISONED)
457 .clear();
458
459 self.is_connected.store(false, Ordering::Release);
460 log::info!("Disconnected {}", self.client_id);
461
462 Ok(())
463 }
464
465 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
466 log::debug!("Instruments subscription not applicable for AX (use request_instruments)");
468 Ok(())
469 }
470
471 fn subscribe_instrument(&mut self, _cmd: SubscribeInstrument) -> anyhow::Result<()> {
472 log::debug!("Instrument subscription not applicable for AX (use request_instrument)");
474 Ok(())
475 }
476
477 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
478 let symbol = cmd.instrument_id.symbol.to_string();
479 let level = Self::map_book_type_to_market_data_level(cmd.book_type);
480 if cmd.book_type == BookType::L1_MBP {
481 log::warn!(
482 "Book type L1_MBP not supported by AX for deltas, downgrading {symbol} to LEVEL_2"
483 );
484 }
485 log::debug!("Subscribing to book deltas for {symbol} at {level:?}");
486
487 let ws = self.ws_client.clone();
488 self.spawn_ws(
489 async move {
490 ws.subscribe_book_deltas(&symbol, level)
491 .await
492 .map_err(|e| anyhow::anyhow!(e))
493 },
494 "subscribe book deltas",
495 );
496
497 Ok(())
498 }
499
500 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
501 self.ws_symbol_op(
502 cmd.instrument_id,
503 |ws, s| async move { ws.subscribe_quotes(&s).await },
504 "Subscribing to quotes",
505 )
506 }
507
508 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
509 self.ws_symbol_op(
510 cmd.instrument_id,
511 |ws, s| async move { ws.subscribe_trades(&s).await },
512 "Subscribing to trades",
513 )
514 }
515
516 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
517 self.ws_symbol_op(
518 cmd.instrument_id,
519 |ws, s| async move { ws.subscribe_mark_prices(&s).await },
520 "Subscribing to mark prices",
521 )
522 }
523
524 fn subscribe_index_prices(&mut self, _cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
525 log::warn!("Index prices not supported by AX Exchange");
526 Ok(())
527 }
528
529 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
530 let bar_type = cmd.bar_type;
531 let symbol = bar_type.instrument_id().symbol.to_string();
532 let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
533 log::debug!("Subscribing to bars for {bar_type} (width: {width:?})");
534
535 let ws = self.ws_client.clone();
536 self.spawn_ws(
537 async move {
538 ws.subscribe_candles(&symbol, width)
539 .await
540 .map_err(|e| anyhow::anyhow!(e))
541 },
542 "subscribe bars",
543 );
544
545 Ok(())
546 }
547
548 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
549 let poll_interval_mins = self.config.funding_rate_poll_interval_mins.max(1);
550
551 let lookback = ChronoDuration::days(AX_FUNDING_RATE_LOOKBACK_DAYS);
553
554 let instrument_id = cmd.instrument_id;
555
556 if self.funding_rate_tasks.contains_key(&instrument_id) {
557 log::debug!("Already subscribed to funding rates for {instrument_id}");
558 return Ok(());
559 }
560
561 log::debug!("Subscribing to funding rates for {instrument_id} (HTTP polling)");
562
563 let http = self.http_client.clone();
564 let sender = self.data_sender.clone();
565 let symbol = instrument_id.symbol.inner();
566 let cancel = self.cancellation_token.clone();
567 let cache = Arc::clone(&self.funding_rate_cache);
568 let clock = self.clock;
569
570 let handle = get_runtime().spawn(async move {
571 let mut interval = tokio::time::interval(Duration::from_mins(poll_interval_mins));
573
574 loop {
575 tokio::select! {
576 () = cancel.cancelled() => {
577 log::debug!("Funding rate polling cancelled for {symbol}");
578 break;
579 }
580 _ = interval.tick() => {
581 let now: DateTime<Utc> = clock.get_time_ns().into();
582 let start = now - lookback;
583
584 match http.request_funding_rates(instrument_id, Some(start), Some(now)).await {
585 Ok(funding_rates) => {
586 if funding_rates.is_empty() {
587 log::warn!(
588 "No funding rates returned for {symbol}"
589 );
590 } else if let Some(update) = funding_rates.last() {
591 let should_emit = cache.lock().expect(MUTEX_POISONED)
593 .get(&instrument_id) != Some(update);
594
595 if should_emit {
596 log::info!(
597 "Funding rate for {symbol}: {}",
598 update.rate,
599 );
600 let update = *update;
601 cache.lock().expect(MUTEX_POISONED)
602 .insert(instrument_id, update);
603
604 if let Err(e) = sender.send(
605 DataEvent::FundingRate(update),
606 ) {
607 log::error!(
608 "Failed to send funding rate for {symbol}: {e}"
609 );
610 }
611 }
612 }
613 }
614 Err(e) => {
615 log::error!(
616 "Failed to poll funding rates for {symbol}: {e}"
617 );
618 }
619 }
620 }
621 }
622 }
623 });
624
625 self.funding_rate_tasks.insert(instrument_id, handle);
626 Ok(())
627 }
628
629 fn subscribe_instrument_status(
630 &mut self,
631 cmd: SubscribeInstrumentStatus,
632 ) -> anyhow::Result<()> {
633 self.ws_symbol_op(
634 cmd.instrument_id,
635 |ws, s| async move { ws.subscribe_instrument_status(&s).await },
636 "Subscribing to instrument status",
637 )
638 }
639
640 fn subscribe_instrument_close(&mut self, _cmd: SubscribeInstrumentClose) -> anyhow::Result<()> {
641 log::warn!("Instrument close not supported by AX Exchange");
642 Ok(())
643 }
644
645 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
646 Ok(())
647 }
648
649 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
650 Ok(())
651 }
652
653 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
654 self.ws_symbol_op(
655 cmd.instrument_id,
656 |ws, s| async move { ws.unsubscribe_book_deltas(&s).await },
657 "Unsubscribing from book deltas",
658 )
659 }
660
661 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
662 self.ws_symbol_op(
663 cmd.instrument_id,
664 |ws, s| async move { ws.unsubscribe_quotes(&s).await },
665 "Unsubscribing from quotes",
666 )
667 }
668
669 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
670 self.ws_symbol_op(
671 cmd.instrument_id,
672 |ws, s| async move { ws.unsubscribe_trades(&s).await },
673 "Unsubscribing from trades",
674 )
675 }
676
677 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
678 self.ws_symbol_op(
679 cmd.instrument_id,
680 |ws, s| async move { ws.unsubscribe_mark_prices(&s).await },
681 "Unsubscribing from mark prices",
682 )
683 }
684
685 fn unsubscribe_index_prices(&mut self, _cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
686 Ok(())
687 }
688
689 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
690 let bar_type = cmd.bar_type;
691 let symbol = bar_type.instrument_id().symbol.to_string();
692 let width = map_bar_spec_to_candle_width(&bar_type.spec())?;
693 log::debug!("Unsubscribing from bars for {bar_type}");
694
695 let ws = self.ws_client.clone();
696 self.spawn_ws(
697 async move {
698 ws.unsubscribe_candles(&symbol, width)
699 .await
700 .map_err(|e| anyhow::anyhow!(e))
701 },
702 "unsubscribe bars",
703 );
704
705 Ok(())
706 }
707
708 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
709 let instrument_id = cmd.instrument_id;
710
711 if let Some(task) = self.funding_rate_tasks.remove(&instrument_id) {
712 log::debug!("Unsubscribing from funding rates for {instrument_id}");
713 task.abort();
714 self.funding_rate_cache
715 .lock()
716 .expect(MUTEX_POISONED)
717 .remove(&instrument_id);
718 } else {
719 log::debug!("Not subscribed to funding rates for {instrument_id}");
720 }
721
722 Ok(())
723 }
724
725 fn unsubscribe_instrument_status(
726 &mut self,
727 cmd: &UnsubscribeInstrumentStatus,
728 ) -> anyhow::Result<()> {
729 self.ws_symbol_op(
730 cmd.instrument_id,
731 |ws, s| async move { ws.unsubscribe_instrument_status(&s).await },
732 "Unsubscribing from instrument status",
733 )
734 }
735
736 fn unsubscribe_instrument_close(
737 &mut self,
738 _cmd: &UnsubscribeInstrumentClose,
739 ) -> anyhow::Result<()> {
740 Ok(())
741 }
742
743 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
744 let http = self.http_client.clone();
745 let instruments_cache = Arc::clone(&self.instruments);
746 let sender = self.data_sender.clone();
747 let cancel = self.cancellation_token.clone();
748 let request_id = request.request_id;
749 let client_id = request.client_id.unwrap_or(self.client_id);
750 let venue = *AX_VENUE;
751 let start_nanos = datetime_to_unix_nanos(request.start);
752 let end_nanos = datetime_to_unix_nanos(request.end);
753 let params = request.params;
754 let clock = self.clock;
755
756 get_runtime().spawn(async move {
757 match http.request_instruments(None, None).await {
758 Ok(instruments) => {
759 if cancel.is_cancelled() {
760 return;
761 }
762 log::info!("Fetched {} instruments from Ax", instruments.len());
763 for inst in &instruments {
764 instruments_cache.insert(inst.symbol().inner(), inst.clone());
765 }
766 http.cache_instruments(&instruments);
767
768 let response = DataResponse::Instruments(InstrumentsResponse::new(
769 request_id,
770 client_id,
771 venue,
772 instruments,
773 start_nanos,
774 end_nanos,
775 clock.get_time_ns(),
776 params,
777 ));
778
779 if let Err(e) = sender.send(DataEvent::Response(response)) {
780 log::error!("Failed to send instruments response: {e}");
781 }
782 }
783 Err(e) => {
784 log::error!("Failed to request instruments: {e}");
785 }
786 }
787 });
788
789 Ok(())
790 }
791
792 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
793 let http = self.http_client.clone();
794 let instruments_cache = Arc::clone(&self.instruments);
795 let sender = self.data_sender.clone();
796 let cancel = self.cancellation_token.clone();
797 let request_id = request.request_id;
798 let client_id = request.client_id.unwrap_or(self.client_id);
799 let instrument_id = request.instrument_id;
800 let symbol = instrument_id.symbol.inner();
801 let start_nanos = datetime_to_unix_nanos(request.start);
802 let end_nanos = datetime_to_unix_nanos(request.end);
803 let params = request.params;
804 let clock = self.clock;
805
806 get_runtime().spawn(async move {
807 match http.request_instrument(symbol, None, None).await {
808 Ok(instrument) => {
809 if cancel.is_cancelled() {
810 return;
811 }
812 log::debug!("Fetched instrument {symbol} from Ax");
813 instruments_cache.insert(symbol, instrument.clone());
814 http.cache_instrument(instrument.clone());
815
816 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
817 request_id,
818 client_id,
819 instrument_id,
820 instrument,
821 start_nanos,
822 end_nanos,
823 clock.get_time_ns(),
824 params,
825 )));
826
827 if let Err(e) = sender.send(DataEvent::Response(response)) {
828 log::error!("Failed to send instrument response: {e}");
829 }
830 }
831 Err(e) => {
832 log::error!("Failed to request instrument {symbol}: {e}");
833 }
834 }
835 });
836
837 Ok(())
838 }
839
840 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
841 let http = self.http_client.clone();
842 let sender = self.data_sender.clone();
843 let cancel = self.cancellation_token.clone();
844 let request_id = request.request_id;
845 let client_id = request.client_id.unwrap_or(self.client_id);
846 let instrument_id = request.instrument_id;
847 let symbol = instrument_id.symbol.inner();
848 let depth = request.depth.map(|n| n.get());
849 let params = request.params;
850 let clock = self.clock;
851
852 get_runtime().spawn(async move {
853 match http.request_book_snapshot(symbol, depth).await {
854 Ok(book) => {
855 if cancel.is_cancelled() {
856 return;
857 }
858 log::debug!(
859 "Fetched book snapshot for {symbol} ({} bids, {} asks)",
860 book.bids(None).count(),
861 book.asks(None).count(),
862 );
863
864 let response = DataResponse::Book(BookResponse::new(
865 request_id,
866 client_id,
867 instrument_id,
868 book,
869 None,
870 None,
871 clock.get_time_ns(),
872 params,
873 ));
874
875 if let Err(e) = sender.send(DataEvent::Response(response)) {
876 log::error!("Failed to send book snapshot response: {e}");
877 }
878 }
879 Err(e) => {
880 log::error!("Failed to request book snapshot for {symbol}: {e}");
881 }
882 }
883 });
884
885 Ok(())
886 }
887
888 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
889 let http = self.http_client.clone();
890 let sender = self.data_sender.clone();
891 let cancel = self.cancellation_token.clone();
892 let request_id = request.request_id;
893 let client_id = request.client_id.unwrap_or(self.client_id);
894 let instrument_id = request.instrument_id;
895 let symbol = instrument_id.symbol.inner();
896 let limit = request.limit.map(|n| n.get() as i32);
897 let start_nanos = datetime_to_unix_nanos(request.start);
898 let end_nanos = datetime_to_unix_nanos(request.end);
899 let params = request.params;
900 let clock = self.clock;
901
902 get_runtime().spawn(async move {
903 match http
904 .request_trade_ticks(symbol, limit, start_nanos, end_nanos)
905 .await
906 {
907 Ok(ticks) => {
908 if cancel.is_cancelled() {
909 return;
910 }
911 log::debug!("Fetched {} trades for {symbol}", ticks.len());
912
913 let response = DataResponse::Trades(TradesResponse::new(
914 request_id,
915 client_id,
916 instrument_id,
917 ticks,
918 start_nanos,
919 end_nanos,
920 clock.get_time_ns(),
921 params,
922 ));
923
924 if let Err(e) = sender.send(DataEvent::Response(response)) {
925 log::error!("Failed to send trades response: {e}");
926 }
927 }
928 Err(e) => {
929 log::error!("Failed to request trades for {symbol}: {e}");
930 }
931 }
932 });
933
934 Ok(())
935 }
936
937 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
938 let http = self.http_client.clone();
939 let sender = self.data_sender.clone();
940 let request_id = request.request_id;
941 let client_id = request.client_id.unwrap_or(self.client_id);
942 let bar_type = request.bar_type;
943 let symbol = bar_type.instrument_id().symbol.inner();
944 let start = request.start;
945 let end = request.end;
946 let start_nanos = datetime_to_unix_nanos(start);
947 let end_nanos = datetime_to_unix_nanos(end);
948 let params = request.params;
949 let clock = self.clock;
950 let width = match map_bar_spec_to_candle_width(&bar_type.spec()) {
951 Ok(w) => w,
952 Err(e) => {
953 log::error!("Failed to map bar type {bar_type}: {e}");
954 return Err(e);
955 }
956 };
957
958 let cancel = self.cancellation_token.clone();
959
960 get_runtime().spawn(async move {
961 match http.request_bars(symbol, start, end, width).await {
962 Ok(bars) => {
963 if cancel.is_cancelled() {
964 return;
965 }
966 log::debug!("Fetched {} bars for {symbol}", bars.len());
967
968 let response = DataResponse::Bars(BarsResponse::new(
969 request_id,
970 client_id,
971 bar_type,
972 bars,
973 start_nanos,
974 end_nanos,
975 clock.get_time_ns(),
976 params,
977 ));
978
979 if let Err(e) = sender.send(DataEvent::Response(response)) {
980 log::error!("Failed to send bars response: {e}");
981 }
982 }
983 Err(e) => {
984 log::error!("Failed to request bars for {symbol}: {e}");
985 }
986 }
987 });
988
989 Ok(())
990 }
991
992 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
993 let http = self.http_client.clone();
994 let sender = self.data_sender.clone();
995 let cancel = self.cancellation_token.clone();
996 let request_id = request.request_id;
997 let client_id = request.client_id.unwrap_or(self.client_id);
998 let instrument_id = request.instrument_id;
999 let symbol = instrument_id.symbol.inner();
1000 let start = request.start;
1001 let end = request.end;
1002 let start_nanos = datetime_to_unix_nanos(start);
1003 let end_nanos = datetime_to_unix_nanos(end);
1004 let params = request.params;
1005 let clock = self.clock;
1006
1007 get_runtime().spawn(async move {
1008 match http.request_funding_rates(instrument_id, start, end).await {
1009 Ok(funding_rates) => {
1010 if cancel.is_cancelled() {
1011 return;
1012 }
1013 log::debug!("Fetched {} funding rates for {symbol}", funding_rates.len());
1014
1015 let ts_init = clock.get_time_ns();
1016 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1017 request_id,
1018 client_id,
1019 instrument_id,
1020 funding_rates,
1021 start_nanos,
1022 end_nanos,
1023 ts_init,
1024 params,
1025 ));
1026
1027 if let Err(e) = sender.send(DataEvent::Response(response)) {
1028 log::error!("Failed to send funding rates response: {e}");
1029 }
1030 }
1031 Err(e) => {
1032 log::error!("Failed to request funding rates for {symbol}: {e}");
1033 }
1034 }
1035 });
1036
1037 Ok(())
1038 }
1039}
1040
1041fn drain_status_invalidations(
1042 invalidations: &Arc<Mutex<AHashSet<Ustr>>>,
1043 instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1044) {
1045 if let Ok(mut set) = invalidations.lock() {
1046 for symbol in set.drain() {
1047 instrument_states.remove(&symbol);
1048 }
1049 }
1050}
1051
1052#[expect(clippy::too_many_arguments)]
1053fn handle_ws_message(
1054 msg: AxDataWsMessage,
1055 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1056 instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
1057 symbol_data_types: &Arc<AtomicMap<String, SymbolDataTypes>>,
1058 book_sequences: &mut AHashMap<Ustr, u64>,
1059 candle_cache: &mut AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
1060 instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1061 clock: &'static AtomicTime,
1062) {
1063 match msg {
1064 AxDataWsMessage::Reconnected => {
1065 candle_cache.clear();
1066 instrument_states.clear();
1067 log::info!("WebSocket reconnected");
1068 }
1069 AxDataWsMessage::CandleUnsubscribed { symbol, width } => {
1070 candle_cache.remove(&(symbol, width));
1071 }
1072 AxDataWsMessage::MdMessage(md_msg) => {
1073 handle_md_message(
1074 md_msg,
1075 sender,
1076 instruments,
1077 symbol_data_types,
1078 book_sequences,
1079 candle_cache,
1080 instrument_states,
1081 clock,
1082 );
1083 }
1084 }
1085}
1086
1087#[expect(clippy::too_many_arguments)]
1088fn handle_md_message(
1089 message: AxMdMessage,
1090 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1091 instruments: &Arc<AtomicMap<Ustr, InstrumentAny>>,
1092 symbol_data_types: &Arc<AtomicMap<String, SymbolDataTypes>>,
1093 book_sequences: &mut AHashMap<Ustr, u64>,
1094 candle_cache: &mut AHashMap<(Ustr, AxCandleWidth), AxMdCandle>,
1095 instrument_states: &mut AHashMap<Ustr, AxInstrumentState>,
1096 clock: &'static AtomicTime,
1097) {
1098 let ts_init = || -> UnixNanos { clock.get_time_ns() };
1099
1100 let instruments_snap = instruments.load();
1101 let sdt_snap = symbol_data_types.load();
1102
1103 match message {
1104 AxMdMessage::BookL1(book) => {
1105 let l1_subscribed = sdt_snap
1106 .get(book.s.as_str())
1107 .is_some_and(|e| e.quotes || e.book_level == Some(AxMarketDataLevel::Level1));
1108
1109 if !l1_subscribed {
1110 return;
1111 }
1112
1113 let Some(instrument) = instruments_snap.get(&book.s) else {
1114 log::error!(
1115 "No instrument cached for symbol '{}' - cannot parse L1 book",
1116 book.s
1117 );
1118 return;
1119 };
1120
1121 match parse_book_l1_quote(&book, instrument, ts_init()) {
1122 Ok(quote) => {
1123 let _ = sender.send(DataEvent::Data(Data::Quote(quote)));
1124 }
1125 Err(e) => log::error!("Failed to parse L1 to QuoteTick: {e}"),
1126 }
1127 }
1128 AxMdMessage::BookL2(book) => {
1129 let symbol = book.s;
1130 let seq = book_sequences.entry(symbol).or_insert(0);
1131 *seq += 1;
1132 let sequence = *seq;
1133
1134 let Some(instrument) = instruments_snap.get(&symbol) else {
1135 log::error!("No instrument cached for symbol '{symbol}' - cannot parse L2 book");
1136 return;
1137 };
1138
1139 match parse_book_l2_deltas(&book, instrument, sequence, ts_init()) {
1140 Ok(deltas) => {
1141 let api_deltas = OrderBookDeltas_API::new(deltas);
1142 let _ = sender.send(DataEvent::Data(Data::Deltas(api_deltas)));
1143 }
1144 Err(e) => log::error!("Failed to parse L2 to OrderBookDeltas: {e}"),
1145 }
1146 }
1147 AxMdMessage::BookL3(book) => {
1148 let symbol = book.s;
1149 let seq = book_sequences.entry(symbol).or_insert(0);
1150 *seq += 1;
1151 let sequence = *seq;
1152
1153 let Some(instrument) = instruments_snap.get(&symbol) else {
1154 log::error!("No instrument cached for symbol '{symbol}' - cannot parse L3 book");
1155 return;
1156 };
1157
1158 match parse_book_l3_deltas(&book, instrument, sequence, ts_init()) {
1159 Ok(deltas) => {
1160 let api_deltas = OrderBookDeltas_API::new(deltas);
1161 let _ = sender.send(DataEvent::Data(Data::Deltas(api_deltas)));
1162 }
1163 Err(e) => log::error!("Failed to parse L3 to OrderBookDeltas: {e}"),
1164 }
1165 }
1166 AxMdMessage::Ticker(ticker) => {
1167 let Some(instrument) = instruments_snap.get(&ticker.s) else {
1168 log::debug!("No instrument cached for ticker symbol '{}'", ticker.s);
1169 return;
1170 };
1171
1172 let instrument_id = instrument.id();
1173 let price_precision = instrument.price_precision();
1174 let ts_event =
1175 ax_timestamp_stn_to_unix_nanos(ticker.ts, ticker.tn).unwrap_or_else(|_| ts_init());
1176 let ts_init = ts_init();
1177
1178 let mark_prices_subscribed = sdt_snap
1179 .get(ticker.s.as_str())
1180 .is_some_and(|e| e.mark_prices);
1181 if mark_prices_subscribed && let Some(mark_price) = ticker.m {
1182 match Price::from_decimal_dp(mark_price, price_precision) {
1183 Ok(price) => {
1184 let update = MarkPriceUpdate::new(instrument_id, price, ts_event, ts_init);
1185 let _ = sender.send(DataEvent::Data(Data::MarkPriceUpdate(update)));
1186 }
1187 Err(e) => {
1188 log::error!("Failed to parse mark price for {}: {e}", ticker.s);
1189 }
1190 }
1191 }
1192
1193 if let Some(state) = ticker.i {
1194 let status_subscribed = sdt_snap
1195 .get(ticker.s.as_str())
1196 .is_some_and(|e| e.instrument_status);
1197 if status_subscribed {
1198 let prev = instrument_states.insert(ticker.s, state);
1199 if prev != Some(state) {
1200 let action = MarketStatusAction::from(state);
1201 let status = InstrumentStatus::new(
1202 instrument_id,
1203 action,
1204 ts_event,
1205 ts_init,
1206 None,
1207 None,
1208 Some(state == AxInstrumentState::Open),
1209 None,
1210 None,
1211 );
1212 let _ = sender.send(DataEvent::InstrumentStatus(status));
1213 }
1214 }
1215 }
1216 }
1217 AxMdMessage::Trade(trade) => {
1218 let trades_subscribed = sdt_snap.get(trade.s.as_str()).is_some_and(|e| e.trades);
1219
1220 if !trades_subscribed {
1221 return;
1222 }
1223
1224 let Some(instrument) = instruments_snap.get(&trade.s) else {
1225 log::error!(
1226 "No instrument cached for symbol '{}' - cannot parse trade",
1227 trade.s
1228 );
1229 return;
1230 };
1231
1232 match parse_trade_tick(&trade, instrument, ts_init()) {
1233 Ok(tick) => {
1234 let _ = sender.send(DataEvent::Data(Data::Trade(tick)));
1235 }
1236 Err(e) => log::error!("Failed to parse trade to TradeTick: {e}"),
1237 }
1238 }
1239 AxMdMessage::Candle(candle) => {
1240 let cache_key = (candle.symbol, candle.width);
1241
1242 let closed_candle = if let Some(cached) = candle_cache.get(&cache_key) {
1243 if cached.ts == candle.ts {
1244 None
1245 } else {
1246 Some(cached.clone())
1247 }
1248 } else {
1249 None
1250 };
1251
1252 candle_cache.insert(cache_key, candle);
1253
1254 if let Some(closed) = closed_candle {
1255 let Some(instrument) = instruments_snap.get(&closed.symbol) else {
1256 log::error!(
1257 "No instrument cached for symbol '{}' - cannot parse candle",
1258 closed.symbol
1259 );
1260 return;
1261 };
1262
1263 match parse_candle_bar(&closed, instrument, ts_init()) {
1264 Ok(bar) => {
1265 let _ = sender.send(DataEvent::Data(Data::Bar(bar)));
1266 }
1267 Err(e) => log::error!("Failed to parse candle to Bar: {e}"),
1268 }
1269 }
1270 }
1271 AxMdMessage::Heartbeat(_) => {
1272 log::trace!("Received heartbeat");
1273 }
1274 AxMdMessage::SubscriptionResponse(_) => {}
1275 AxMdMessage::Error(error) => {
1276 log::error!("WebSocket error: {}", error.message);
1277 }
1278 }
1279}
1280
1281#[cfg(test)]
1282mod tests {
1283 use std::sync::{Arc, Mutex};
1284
1285 use ahash::{AHashMap, AHashSet};
1286 use nautilus_model::{
1287 data::InstrumentStatus,
1288 enums::AssetClass,
1289 identifiers::{InstrumentId, Symbol},
1290 instruments::PerpetualContract,
1291 types::{Currency, Price, Quantity},
1292 };
1293 use rstest::rstest;
1294 use rust_decimal::Decimal;
1295 use ustr::Ustr;
1296
1297 use super::*;
1298 use crate::websocket::{
1299 data::client::SymbolDataTypes,
1300 messages::{AxMdMessage, AxMdTicker},
1301 };
1302
1303 #[rstest]
1304 fn test_drain_status_invalidations_removes_cached_state() {
1305 let invalidations = Arc::new(Mutex::new(AHashSet::new()));
1306 let mut states = AHashMap::new();
1307 let sym = Ustr::from("EURUSD-PERP");
1308
1309 states.insert(sym, AxInstrumentState::Open);
1310 invalidations.lock().unwrap().insert(sym);
1311
1312 drain_status_invalidations(&invalidations, &mut states);
1313
1314 assert!(!states.contains_key(&sym));
1315 assert!(invalidations.lock().unwrap().is_empty());
1316 }
1317
1318 #[rstest]
1319 fn test_drain_status_invalidations_no_op_when_empty() {
1320 let invalidations = Arc::new(Mutex::new(AHashSet::new()));
1321 let mut states = AHashMap::new();
1322 let sym = Ustr::from("EURUSD-PERP");
1323 states.insert(sym, AxInstrumentState::Open);
1324
1325 drain_status_invalidations(&invalidations, &mut states);
1326
1327 assert!(states.contains_key(&sym));
1328 }
1329
1330 fn ticker_test_instrument() -> InstrumentAny {
1331 let symbol = Symbol::new("EURUSD-PERP");
1332 let instrument = PerpetualContract::new(
1333 InstrumentId::new(symbol, *crate::common::consts::AX_VENUE),
1334 symbol,
1335 Ustr::from("EURUSD"),
1336 AssetClass::FX,
1337 None,
1338 Currency::USD(),
1339 Currency::USD(),
1340 false,
1341 4,
1342 0,
1343 Price::from("0.0001"),
1344 Quantity::from("1"),
1345 None,
1346 None,
1347 None,
1348 None,
1349 None,
1350 None,
1351 None,
1352 None,
1353 Some(Decimal::new(1, 2)),
1354 Some(Decimal::new(5, 3)),
1355 Some(Decimal::new(2, 4)),
1356 Some(Decimal::new(5, 4)),
1357 None,
1358 UnixNanos::default(),
1359 UnixNanos::default(),
1360 );
1361 InstrumentAny::PerpetualContract(instrument)
1362 }
1363
1364 fn ticker_message(state: AxInstrumentState) -> AxMdTicker {
1365 AxMdTicker {
1366 ts: 1_700_000_000,
1367 tn: 0,
1368 s: Ustr::from("EURUSD-PERP"),
1369 p: rust_decimal::Decimal::ZERO,
1370 q: 0,
1371 o: rust_decimal::Decimal::ZERO,
1372 l: rust_decimal::Decimal::ZERO,
1373 h: rust_decimal::Decimal::ZERO,
1374 v: 0,
1375 oi: None,
1376 m: None,
1377 i: Some(state),
1378 pl: None,
1379 pu: None,
1380 lsp: None,
1381 }
1382 }
1383
1384 fn collect_instrument_statuses(
1385 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
1386 ) -> Vec<InstrumentStatus> {
1387 let mut statuses = Vec::new();
1388
1389 while let Ok(event) = rx.try_recv() {
1390 if let DataEvent::InstrumentStatus(status) = event {
1391 statuses.push(status);
1392 }
1393 }
1394 statuses
1395 }
1396
1397 #[rstest]
1398 fn test_ticker_instrument_status_emitted_once_when_state_unchanged() {
1399 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1400 let instruments = Arc::new(AtomicMap::new());
1401 instruments.insert(Ustr::from("EURUSD-PERP"), ticker_test_instrument());
1402
1403 let sdt = Arc::new(AtomicMap::new());
1404 sdt.insert(
1405 "EURUSD-PERP".to_string(),
1406 SymbolDataTypes {
1407 quotes: false,
1408 trades: false,
1409 mark_prices: false,
1410 instrument_status: true,
1411 book_level: None,
1412 },
1413 );
1414
1415 let mut book_sequences = AHashMap::new();
1416 let mut candle_cache = AHashMap::new();
1417 let mut instrument_states = AHashMap::new();
1418 let clock = get_atomic_clock_realtime();
1419
1420 let msg = AxMdMessage::Ticker(ticker_message(AxInstrumentState::Open));
1421 handle_md_message(
1422 msg.clone(),
1423 &tx,
1424 &instruments,
1425 &sdt,
1426 &mut book_sequences,
1427 &mut candle_cache,
1428 &mut instrument_states,
1429 clock,
1430 );
1431
1432 handle_md_message(
1434 msg,
1435 &tx,
1436 &instruments,
1437 &sdt,
1438 &mut book_sequences,
1439 &mut candle_cache,
1440 &mut instrument_states,
1441 clock,
1442 );
1443
1444 let statuses = collect_instrument_statuses(&mut rx);
1445 assert_eq!(
1446 statuses.len(),
1447 1,
1448 "expected a single emission, found {statuses:?}"
1449 );
1450 assert_eq!(statuses[0].is_trading, Some(true));
1451 }
1452
1453 #[rstest]
1454 fn test_ticker_instrument_status_emitted_on_transition() {
1455 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1456 let instruments = Arc::new(AtomicMap::new());
1457 instruments.insert(Ustr::from("EURUSD-PERP"), ticker_test_instrument());
1458
1459 let sdt = Arc::new(AtomicMap::new());
1460 sdt.insert(
1461 "EURUSD-PERP".to_string(),
1462 SymbolDataTypes {
1463 quotes: false,
1464 trades: false,
1465 mark_prices: false,
1466 instrument_status: true,
1467 book_level: None,
1468 },
1469 );
1470
1471 let mut book_sequences = AHashMap::new();
1472 let mut candle_cache = AHashMap::new();
1473 let mut instrument_states = AHashMap::new();
1474 let clock = get_atomic_clock_realtime();
1475
1476 handle_md_message(
1477 AxMdMessage::Ticker(ticker_message(AxInstrumentState::Open)),
1478 &tx,
1479 &instruments,
1480 &sdt,
1481 &mut book_sequences,
1482 &mut candle_cache,
1483 &mut instrument_states,
1484 clock,
1485 );
1486 handle_md_message(
1487 AxMdMessage::Ticker(ticker_message(AxInstrumentState::Closed)),
1488 &tx,
1489 &instruments,
1490 &sdt,
1491 &mut book_sequences,
1492 &mut candle_cache,
1493 &mut instrument_states,
1494 clock,
1495 );
1496
1497 let statuses = collect_instrument_statuses(&mut rx);
1498 assert_eq!(statuses.len(), 2, "expected one emission per transition");
1499 assert_eq!(statuses[0].is_trading, Some(true));
1500 assert_eq!(statuses[1].is_trading, Some(false));
1501 }
1502
1503 #[rstest]
1504 fn test_ticker_instrument_status_skipped_when_not_subscribed() {
1505 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1506 let instruments = Arc::new(AtomicMap::new());
1507 instruments.insert(Ustr::from("EURUSD-PERP"), ticker_test_instrument());
1508
1509 let sdt = Arc::new(AtomicMap::new());
1510 sdt.insert(
1511 "EURUSD-PERP".to_string(),
1512 SymbolDataTypes {
1513 quotes: false,
1514 trades: false,
1515 mark_prices: false,
1516 instrument_status: false,
1517 book_level: None,
1518 },
1519 );
1520
1521 let mut book_sequences = AHashMap::new();
1522 let mut candle_cache = AHashMap::new();
1523 let mut instrument_states = AHashMap::new();
1524 let clock = get_atomic_clock_realtime();
1525
1526 handle_md_message(
1527 AxMdMessage::Ticker(ticker_message(AxInstrumentState::Open)),
1528 &tx,
1529 &instruments,
1530 &sdt,
1531 &mut book_sequences,
1532 &mut candle_cache,
1533 &mut instrument_states,
1534 clock,
1535 );
1536
1537 let statuses = collect_instrument_statuses(&mut rx);
1538 assert!(statuses.is_empty());
1539 }
1540}