Skip to main content

nautilus_dydx/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the dYdX adapter.
17
18use std::{
19    str::FromStr,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use anyhow::Context;
27use dashmap::DashMap;
28use futures_util::{Stream, StreamExt, pin_mut};
29use nautilus_common::{
30    clients::DataClient,
31    live::{runner::get_data_event_sender, runtime::get_runtime},
32    messages::{
33        DataEvent, DataResponse,
34        data::{
35            BarsResponse, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
36            RequestBars, RequestFundingRates, RequestInstrument, RequestInstruments, RequestTrades,
37            SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
38            SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
39            SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
40            UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
41            UnsubscribeInstrument, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
42            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
43        },
44    },
45};
46use nautilus_core::{
47    AtomicMap, AtomicSet,
48    datetime::datetime_to_unix_nanos,
49    time::{AtomicTime, get_atomic_clock_realtime},
50};
51use nautilus_model::{
52    data::{
53        Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, FundingRateUpdate,
54        IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
55        OrderBookDeltas_API, QuoteTick,
56    },
57    enums::{BookAction, BookType, MarketStatusAction, OrderSide, RecordFlag},
58    identifiers::{ClientId, InstrumentId, Symbol, Venue},
59    instruments::{Instrument, InstrumentAny},
60    orderbook::OrderBook,
61    types::Quantity,
62};
63use rust_decimal::Decimal;
64use tokio::{task::JoinHandle, time::Duration};
65use tokio_util::sync::CancellationToken;
66use ustr::Ustr;
67
68use crate::{
69    common::{
70        consts::DYDX_VENUE,
71        enums::DydxCandleResolution,
72        instrument_cache::InstrumentCache,
73        parse::{extract_raw_symbol, parse_price},
74    },
75    config::DydxDataClientConfig,
76    http::client::DydxHttpClient,
77    websocket::{client::DydxWebSocketClient, enums::DydxWsOutputMessage, parse as ws_parse},
78};
79
/// State handed to the WebSocket message-processing task.
///
/// An instance is assembled in `connect` and moved into the task spawned by
/// `spawn_ws_stream_handler`, which passes it by reference to
/// `handle_ws_message` for every incoming message. Unless noted otherwise each
/// field is a clone of the identically named field on `DydxDataClient`, so the
/// `Arc`-wrapped collections are shared with the client.
struct WsMessageContext {
    /// Clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Channel on which data events are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache shared with the HTTP client.
    instrument_cache: Arc<InstrumentCache>,
    /// Order books keyed by instrument.
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    /// Most recent quote per instrument (presumably used to suppress duplicates — confirm in handler).
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    /// Clone of the client's WebSocket handle.
    ws_client: DydxWebSocketClient,
    /// Clone of the client's HTTP handle.
    http_client: DydxHttpClient,
    /// Instruments with an active quote subscription.
    active_quote_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active book-delta subscription.
    active_delta_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active trade subscription.
    active_trade_subs: Arc<AtomicSet<InstrumentId>>,
    /// Active bar subscriptions keyed by `(instrument_id, resolution)`.
    active_bar_subs: Arc<AtomicMap<(InstrumentId, String), BarType>>,
    /// Bars keyed by bar type (presumably bars still being built — confirm in handler).
    incomplete_bars: Arc<DashMap<BarType, Bar>>,
    /// Bar types keyed by the `"{ticker}/{resolution}"` topic string.
    bar_type_mappings: Arc<AtomicMap<String, BarType>>,
    /// Instruments with an active mark-price subscription.
    active_mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active index-price subscription.
    active_index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active funding-rate subscription.
    active_funding_rate_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active instrument-status subscription.
    active_instrument_status_subs: Arc<AtomicSet<InstrumentId>>,
    /// Last known status per instrument; the client replays from this map on subscribe.
    last_instrument_statuses: Arc<DashMap<InstrumentId, InstrumentStatus>>,
    /// Taken from `ws_client.bars_timestamp_on_close()` when the context is built.
    bars_timestamp_on_close: bool,
    /// Created empty for each connection; not shared with the client.
    /// NOTE(review): presumably keyed by candle topic — confirm in handler.
    pending_bars: Arc<DashMap<String, Bar>>,
    /// Seeded in `connect` with the raw ticker of every cached instrument.
    seen_tickers: Arc<AtomicSet<Ustr>>,
}
103
/// dYdX data client for live market data streaming and historical data requests.
///
/// This client integrates with the Nautilus DataEngine to provide:
/// - Real-time market data via WebSocket subscriptions
/// - Historical data via REST API requests
/// - Automatic instrument discovery and caching
/// - Connection lifecycle management
#[derive(Debug)]
pub struct DydxDataClient {
    /// Realtime atomic clock obtained at construction.
    clock: &'static AtomicTime,
    /// Identifier reported through `DataClient::client_id`.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: DydxDataClientConfig,
    /// REST client used for instrument bootstrap and historical requests.
    http_client: DydxHttpClient,
    /// WebSocket client used for streaming subscriptions.
    ws_client: DydxWebSocketClient,
    /// Set once `connect` completes; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Cancelled by `stop` and `disconnect`; replaced with a fresh token by `reset`.
    cancellation_token: CancellationToken,
    /// Handles of the spawned stream-handler tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and responses are published.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache shared with `http_client`.
    instrument_cache: Arc<InstrumentCache>,
    /// Order books keyed by instrument.
    order_books: Arc<DashMap<InstrumentId, OrderBook>>,
    /// Most recent quote per instrument.
    last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
    /// Bars keyed by bar type (presumably bars still being built — confirm in handler).
    incomplete_bars: Arc<DashMap<BarType, Bar>>,
    /// Bar types keyed by the `"{ticker}/{resolution}"` topic string.
    bar_type_mappings: Arc<AtomicMap<String, BarType>>,
    /// Instruments with an active quote subscription.
    active_quote_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active book-delta subscription.
    active_delta_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active trade subscription.
    active_trade_subs: Arc<AtomicSet<InstrumentId>>,
    /// Active bar subscriptions keyed by `(instrument_id, resolution)`.
    active_bar_subs: Arc<AtomicMap<(InstrumentId, String), BarType>>,
    /// Instruments with an active mark-price subscription.
    active_mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active index-price subscription.
    active_index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active funding-rate subscription.
    active_funding_rate_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with an active instrument-status subscription.
    active_instrument_status_subs: Arc<AtomicSet<InstrumentId>>,
    /// Last known status per instrument; replayed on subscribe, cleared on disconnect.
    last_instrument_statuses: Arc<DashMap<InstrumentId, InstrumentStatus>>,
}
137
138impl DydxDataClient {
139    fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
140        let resolution: &'static str = DydxCandleResolution::from_bar_spec(spec)?.into();
141        Ok(resolution)
142    }
143
144    /// Creates a new [`DydxDataClient`] instance.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the client fails to initialize.
149    pub fn new(
150        client_id: ClientId,
151        config: DydxDataClientConfig,
152        http_client: DydxHttpClient,
153        ws_client: DydxWebSocketClient,
154    ) -> anyhow::Result<Self> {
155        let clock = get_atomic_clock_realtime();
156        let data_sender = get_data_event_sender();
157
158        let instrument_cache = Arc::clone(http_client.instrument_cache());
159
160        Ok(Self {
161            clock,
162            client_id,
163            config,
164            http_client,
165            ws_client,
166            is_connected: AtomicBool::new(false),
167            cancellation_token: CancellationToken::new(),
168            tasks: Vec::new(),
169            data_sender,
170            instrument_cache,
171            order_books: Arc::new(DashMap::new()),
172            last_quotes: Arc::new(DashMap::new()),
173            incomplete_bars: Arc::new(DashMap::new()),
174            bar_type_mappings: Arc::new(AtomicMap::new()),
175            active_quote_subs: Arc::new(AtomicSet::new()),
176            active_delta_subs: Arc::new(AtomicSet::new()),
177            active_trade_subs: Arc::new(AtomicSet::new()),
178            active_bar_subs: Arc::new(AtomicMap::new()),
179            active_mark_price_subs: Arc::new(AtomicSet::new()),
180            active_index_price_subs: Arc::new(AtomicSet::new()),
181            active_funding_rate_subs: Arc::new(AtomicSet::new()),
182            active_instrument_status_subs: Arc::new(AtomicSet::new()),
183            last_instrument_statuses: Arc::new(DashMap::new()),
184        })
185    }
186
187    /// Returns the venue for this data client.
188    #[must_use]
189    pub fn venue(&self) -> Venue {
190        *DYDX_VENUE
191    }
192
193    /// Returns a reference to the client configuration.
194    #[must_use]
195    pub fn config(&self) -> &DydxDataClientConfig {
196        &self.config
197    }
198
199    /// Returns `true` when the client is connected.
200    #[must_use]
201    pub fn is_connected(&self) -> bool {
202        self.is_connected.load(Ordering::Relaxed)
203    }
204
205    fn spawn_ws<F>(&self, fut: F, context: &'static str)
206    where
207        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
208    {
209        get_runtime().spawn(async move {
210            if let Err(e) = fut.await {
211                log::error!("{context}: {e:?}");
212            }
213        });
214    }
215
216    fn spawn_ws_stream_handler(
217        &mut self,
218        stream: impl Stream<Item = DydxWsOutputMessage> + Send + 'static,
219        ctx: WsMessageContext,
220    ) {
221        let cancellation = self.cancellation_token.clone();
222
223        let handle = get_runtime().spawn(async move {
224            log::debug!("Message processing task started");
225            pin_mut!(stream);
226
227            loop {
228                tokio::select! {
229                    maybe_msg = stream.next() => {
230                        match maybe_msg {
231                            Some(msg) => Self::handle_ws_message(msg, &ctx),
232                            None => {
233                                log::debug!("WebSocket message channel closed");
234                                break;
235                            }
236                        }
237                    }
238                    () = cancellation.cancelled() => {
239                        log::debug!("WebSocket message task cancelled");
240                        break;
241                    }
242                }
243            }
244            log::debug!("WebSocket stream handler ended");
245        });
246
247        self.tasks.push(handle);
248    }
249
250    async fn await_tasks_with_timeout(&mut self, timeout: Duration) {
251        for handle in self.tasks.drain(..) {
252            let _ = tokio::time::timeout(timeout, handle).await;
253        }
254    }
255
256    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
257        self.http_client
258            .fetch_and_cache_instruments()
259            .await
260            .context("failed to load instruments from dYdX")?;
261
262        let instruments: Vec<InstrumentAny> = self.http_client.all_instruments();
263
264        if instruments.is_empty() {
265            log::warn!("No instruments were loaded");
266            return Ok(instruments);
267        }
268
269        log::info!("Loaded {} instruments into shared cache", instruments.len());
270
271        self.ws_client.cache_instruments(instruments.clone());
272
273        for instrument in &instruments {
274            if let Err(e) = self
275                .data_sender
276                .send(DataEvent::Instrument(instrument.clone()))
277            {
278                log::warn!("Failed to publish instrument {}: {e}", instrument.id());
279            }
280        }
281        log::debug!("Published {} instruments to data engine", instruments.len());
282
283        Ok(instruments)
284    }
285}
286
287#[async_trait::async_trait(?Send)]
288impl DataClient for DydxDataClient {
289    fn client_id(&self) -> ClientId {
290        self.client_id
291    }
292
293    fn venue(&self) -> Option<Venue> {
294        Some(*DYDX_VENUE)
295    }
296
297    fn start(&mut self) -> anyhow::Result<()> {
298        log::info!(
299            "Starting: client_id={}, is_testnet={}",
300            self.client_id,
301            self.http_client.is_testnet()
302        );
303        Ok(())
304    }
305
306    fn stop(&mut self) -> anyhow::Result<()> {
307        log::info!("Stopping {}", self.client_id);
308        self.cancellation_token.cancel();
309        self.is_connected.store(false, Ordering::Relaxed);
310        Ok(())
311    }
312
313    fn reset(&mut self) -> anyhow::Result<()> {
314        log::debug!("Resetting {}", self.client_id);
315        self.is_connected.store(false, Ordering::Relaxed);
316        self.cancellation_token = CancellationToken::new();
317        // Abort remaining tasks instead of just dropping handles to prevent resource leaks
318        for handle in self.tasks.drain(..) {
319            handle.abort();
320        }
321        Ok(())
322    }
323
324    fn dispose(&mut self) -> anyhow::Result<()> {
325        log::debug!("Disposing {}", self.client_id);
326        self.stop()
327    }
328
329    async fn connect(&mut self) -> anyhow::Result<()> {
330        if self.is_connected() {
331            return Ok(());
332        }
333
334        log::info!("Connecting");
335
336        self.bootstrap_instruments().await?;
337
338        self.ws_client
339            .connect()
340            .await
341            .context("failed to connect dYdX websocket")?;
342
343        self.ws_client
344            .subscribe_markets()
345            .await
346            .context("failed to subscribe to markets channel")?;
347
348        let seen_tickers: Arc<AtomicSet<Ustr>> = Arc::new(AtomicSet::new());
349
350        for instrument in self.instrument_cache.all_instruments() {
351            let id = instrument.id();
352            let ticker = extract_raw_symbol(id.symbol.as_str());
353            seen_tickers.insert(Ustr::from(ticker));
354        }
355
356        let ctx = WsMessageContext {
357            clock: self.clock,
358            data_sender: self.data_sender.clone(),
359            instrument_cache: self.instrument_cache.clone(),
360            order_books: self.order_books.clone(),
361            last_quotes: self.last_quotes.clone(),
362            ws_client: self.ws_client.clone(),
363            http_client: self.http_client.clone(),
364            active_quote_subs: self.active_quote_subs.clone(),
365            active_delta_subs: self.active_delta_subs.clone(),
366            active_trade_subs: self.active_trade_subs.clone(),
367            active_bar_subs: self.active_bar_subs.clone(),
368            incomplete_bars: self.incomplete_bars.clone(),
369            bar_type_mappings: self.bar_type_mappings.clone(),
370            active_mark_price_subs: self.active_mark_price_subs.clone(),
371            active_index_price_subs: self.active_index_price_subs.clone(),
372            active_funding_rate_subs: self.active_funding_rate_subs.clone(),
373            active_instrument_status_subs: self.active_instrument_status_subs.clone(),
374            last_instrument_statuses: self.last_instrument_statuses.clone(),
375            bars_timestamp_on_close: self.ws_client.bars_timestamp_on_close(),
376            pending_bars: Arc::new(DashMap::new()),
377            seen_tickers,
378        };
379
380        let stream = self.ws_client.stream();
381        self.spawn_ws_stream_handler(stream, ctx);
382
383        self.is_connected.store(true, Ordering::Relaxed);
384        log::info!("Connected");
385
386        Ok(())
387    }
388
389    async fn disconnect(&mut self) -> anyhow::Result<()> {
390        if !self.is_connected() {
391            return Ok(());
392        }
393
394        log::info!("Disconnecting");
395
396        self.cancellation_token.cancel();
397
398        self.await_tasks_with_timeout(Duration::from_secs(5)).await;
399
400        self.ws_client
401            .disconnect()
402            .await
403            .context("failed to disconnect dYdX websocket")?;
404
405        self.last_instrument_statuses.clear();
406        self.is_connected.store(false, Ordering::Relaxed);
407        log::info!("Disconnected dYdX data client");
408
409        Ok(())
410    }
411
412    fn is_connected(&self) -> bool {
413        self.is_connected.load(Ordering::Relaxed)
414    }
415
416    fn is_disconnected(&self) -> bool {
417        !self.is_connected()
418    }
419
420    fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
421        log::debug!(
422            "subscribe_instruments: dYdX instruments discovered via global v4_markets channel"
423        );
424        Ok(())
425    }
426
427    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
428        if let Some(instrument) = self.instrument_cache.get(&cmd.instrument_id) {
429            log::debug!("Sending cached instrument for {}", cmd.instrument_id);
430            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
431                log::warn!("Failed to send instrument {}: {e}", cmd.instrument_id);
432            }
433        } else {
434            log::warn!(
435                "Instrument {} not found in cache (available: {})",
436                cmd.instrument_id,
437                self.instrument_cache.len()
438            );
439        }
440        Ok(())
441    }
442
443    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
444        if cmd.book_type != BookType::L2_MBP {
445            anyhow::bail!(
446                "dYdX only supports L2_MBP order book deltas, received {:?}",
447                cmd.book_type
448            );
449        }
450
451        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
452        self.active_delta_subs.insert(cmd.instrument_id);
453
454        let ws = self.ws_client.clone();
455        let instrument_id = cmd.instrument_id;
456
457        self.spawn_ws(
458            async move {
459                ws.subscribe_orderbook(instrument_id)
460                    .await
461                    .context("orderbook subscription")
462            },
463            "dYdX orderbook subscription",
464        );
465
466        Ok(())
467    }
468
469    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
470        log::debug!(
471            "Subscribe_quotes for {}: subscribing to orderbook WS channel for quote synthesis",
472            cmd.instrument_id
473        );
474
475        self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
476        self.active_quote_subs.insert(cmd.instrument_id);
477        let ws = self.ws_client.clone();
478        let instrument_id = cmd.instrument_id;
479
480        self.spawn_ws(
481            async move {
482                ws.subscribe_orderbook(instrument_id)
483                    .await
484                    .context("orderbook subscription (for quotes)")
485            },
486            "dYdX orderbook subscription (quotes)",
487        );
488
489        Ok(())
490    }
491
492    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
493        let ws = self.ws_client.clone();
494        let instrument_id = cmd.instrument_id;
495
496        self.active_trade_subs.insert(instrument_id);
497
498        self.spawn_ws(
499            async move {
500                ws.subscribe_trades(instrument_id)
501                    .await
502                    .context("trade subscription")
503            },
504            "dYdX trade subscription",
505        );
506
507        Ok(())
508    }
509
510    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
511        let instrument_id = cmd.instrument_id;
512        self.active_mark_price_subs.insert(instrument_id);
513        log::info!("Subscribed to mark prices for {instrument_id} (via v4_markets channel)");
514        Ok(())
515    }
516
517    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
518        let instrument_id = cmd.instrument_id;
519        self.active_index_price_subs.insert(instrument_id);
520        log::info!("Subscribed to index prices for {instrument_id} (via v4_markets channel)");
521        Ok(())
522    }
523
524    fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
525        let ws = self.ws_client.clone();
526        let instrument_id = cmd.bar_type.instrument_id();
527        let spec = cmd.bar_type.spec();
528
529        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
530        let bar_type = cmd.bar_type;
531        self.active_bar_subs
532            .insert((instrument_id, resolution.to_string()), bar_type);
533
534        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
535        let topic = format!("{ticker}/{resolution}");
536        self.bar_type_mappings.insert(topic, bar_type);
537
538        self.spawn_ws(
539            async move {
540                ws.subscribe_candles(instrument_id, resolution)
541                    .await
542                    .context("candles subscription")
543            },
544            "dYdX candles subscription",
545        );
546
547        Ok(())
548    }
549
550    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
551        let instrument_id = cmd.instrument_id;
552        self.active_funding_rate_subs.insert(instrument_id);
553        log::info!("Subscribed to funding rates for {instrument_id} (via v4_markets channel)");
554        Ok(())
555    }
556
557    fn subscribe_instrument_status(
558        &mut self,
559        cmd: SubscribeInstrumentStatus,
560    ) -> anyhow::Result<()> {
561        let instrument_id = cmd.instrument_id;
562        self.active_instrument_status_subs.insert(instrument_id);
563        log::info!("Subscribed to instrument status for {instrument_id} (via v4_markets channel)");
564
565        // Replay last known status (initial snapshot arrives before subscription)
566        if let Some(status) = self.last_instrument_statuses.get(&instrument_id)
567            && let Err(e) = self.data_sender.send(DataEvent::InstrumentStatus(*status))
568        {
569            log::error!("Failed to replay instrument status for {instrument_id}: {e}");
570        }
571
572        Ok(())
573    }
574
575    fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
576        log::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
577        Ok(())
578    }
579
580    fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
581        log::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
582        Ok(())
583    }
584
585    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
586        self.active_delta_subs.remove(&cmd.instrument_id);
587
588        let ws = self.ws_client.clone();
589        let instrument_id = cmd.instrument_id;
590
591        self.spawn_ws(
592            async move {
593                ws.unsubscribe_orderbook(instrument_id)
594                    .await
595                    .context("orderbook unsubscription")
596            },
597            "dYdX orderbook unsubscription",
598        );
599
600        Ok(())
601    }
602
603    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
604        log::debug!(
605            "unsubscribe_quotes for {}: removing quote subscription",
606            cmd.instrument_id
607        );
608
609        self.active_quote_subs.remove(&cmd.instrument_id);
610
611        let ws = self.ws_client.clone();
612        let instrument_id = cmd.instrument_id;
613
614        self.spawn_ws(
615            async move {
616                ws.unsubscribe_orderbook(instrument_id)
617                    .await
618                    .context("orderbook unsubscription (for quotes)")
619            },
620            "dYdX orderbook unsubscription (quotes)",
621        );
622
623        Ok(())
624    }
625
626    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
627        self.active_trade_subs.remove(&cmd.instrument_id);
628
629        let ws = self.ws_client.clone();
630        let instrument_id = cmd.instrument_id;
631
632        self.spawn_ws(
633            async move {
634                ws.unsubscribe_trades(instrument_id)
635                    .await
636                    .context("trade unsubscription")
637            },
638            "dYdX trade unsubscription",
639        );
640
641        Ok(())
642    }
643
644    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
645        self.active_mark_price_subs.remove(&cmd.instrument_id);
646        log::info!("Unsubscribed from mark prices for {}", cmd.instrument_id);
647        Ok(())
648    }
649
650    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
651        self.active_index_price_subs.remove(&cmd.instrument_id);
652        log::info!("Unsubscribed from index prices for {}", cmd.instrument_id);
653        Ok(())
654    }
655
656    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
657        let ws = self.ws_client.clone();
658        let instrument_id = cmd.bar_type.instrument_id();
659        let spec = cmd.bar_type.spec();
660
661        let resolution = Self::map_bar_spec_to_resolution(&spec)?;
662
663        self.active_bar_subs
664            .remove(&(instrument_id, resolution.to_string()));
665
666        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
667        let topic = format!("{ticker}/{resolution}");
668        self.bar_type_mappings.remove(&topic);
669
670        self.spawn_ws(
671            async move {
672                ws.unsubscribe_candles(instrument_id, resolution)
673                    .await
674                    .context("candles unsubscription")
675            },
676            "dYdX candles unsubscription",
677        );
678
679        Ok(())
680    }
681
682    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
683        self.active_funding_rate_subs.remove(&cmd.instrument_id);
684        log::info!("Unsubscribed from funding rates for {}", cmd.instrument_id);
685        Ok(())
686    }
687
688    fn unsubscribe_instrument_status(
689        &mut self,
690        cmd: &UnsubscribeInstrumentStatus,
691    ) -> anyhow::Result<()> {
692        self.active_instrument_status_subs
693            .remove(&cmd.instrument_id);
694        log::info!(
695            "Unsubscribed from instrument status for {}",
696            cmd.instrument_id
697        );
698        Ok(())
699    }
700
701    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
702        if request.start.is_some() {
703            log::warn!(
704                "Requesting instrument {} with specified `start` which has no effect",
705                request.instrument_id
706            );
707        }
708
709        if request.end.is_some() {
710            log::warn!(
711                "Requesting instrument {} with specified `end` which has no effect",
712                request.instrument_id
713            );
714        }
715
716        let instrument_cache = self.instrument_cache.clone();
717        let sender = self.data_sender.clone();
718        let http = self.http_client.clone();
719        let instrument_id = request.instrument_id;
720        let request_id = request.request_id;
721        let client_id = request.client_id.unwrap_or(self.client_id);
722        let start = request.start;
723        let end = request.end;
724        let params = request.params;
725        let clock = self.clock;
726        let start_nanos = datetime_to_unix_nanos(start);
727        let end_nanos = datetime_to_unix_nanos(end);
728
729        get_runtime().spawn(async move {
730            let instrument = match http.request_instruments(None, None, None).await {
731                Ok(instruments) => {
732                    for inst in &instruments {
733                        instrument_cache.insert_instrument_only(inst.clone());
734                    }
735                    instruments.into_iter().find(|i| i.id() == instrument_id)
736                }
737                Err(e) => {
738                    log::error!("Failed to fetch instruments from dYdX: {e:?}");
739                    None
740                }
741            };
742
743            if let Some(inst) = instrument {
744                let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
745                    request_id,
746                    client_id,
747                    instrument_id,
748                    inst,
749                    start_nanos,
750                    end_nanos,
751                    clock.get_time_ns(),
752                    params,
753                )));
754
755                if let Err(e) = sender.send(DataEvent::Response(response)) {
756                    log::error!("Failed to send instrument response: {e}");
757                }
758            } else {
759                log::error!("Instrument {instrument_id} not found");
760            }
761        });
762
763        Ok(())
764    }
765
766    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
767        let http = self.http_client.clone();
768        let sender = self.data_sender.clone();
769        let instrument_cache = self.instrument_cache.clone();
770        let request_id = request.request_id;
771        let client_id = request.client_id.unwrap_or(self.client_id);
772        let venue = self.venue();
773        let start = request.start;
774        let end = request.end;
775        let params = request.params;
776        let clock = self.clock;
777        let start_nanos = datetime_to_unix_nanos(start);
778        let end_nanos = datetime_to_unix_nanos(end);
779
780        get_runtime().spawn(async move {
781            match http.request_instruments(None, None, None).await {
782                Ok(instruments) => {
783                    log::info!("Fetched {} instruments from dYdX", instruments.len());
784
785                    for instrument in &instruments {
786                        instrument_cache.insert_instrument_only(instrument.clone());
787                    }
788
789                    let response = DataResponse::Instruments(InstrumentsResponse::new(
790                        request_id,
791                        client_id,
792                        venue,
793                        instruments,
794                        start_nanos,
795                        end_nanos,
796                        clock.get_time_ns(),
797                        params,
798                    ));
799
800                    if let Err(e) = sender.send(DataEvent::Response(response)) {
801                        log::error!("Failed to send instruments response: {e}");
802                    }
803                }
804                Err(e) => {
805                    log::error!("Failed to fetch instruments from dYdX: {e:?}");
806
807                    let response = DataResponse::Instruments(InstrumentsResponse::new(
808                        request_id,
809                        client_id,
810                        venue,
811                        Vec::new(),
812                        start_nanos,
813                        end_nanos,
814                        clock.get_time_ns(),
815                        params,
816                    ));
817
818                    if let Err(e) = sender.send(DataEvent::Response(response)) {
819                        log::error!("Failed to send empty instruments response: {e}");
820                    }
821                }
822            }
823        });
824
825        Ok(())
826    }
827
828    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
829        let http_client = self.http_client.clone();
830        let sender = self.data_sender.clone();
831        let instrument_id = request.instrument_id;
832        let start = request.start;
833        let end = request.end;
834        let limit = request.limit.map(|n| n.get() as u32);
835        let request_id = request.request_id;
836        let client_id = request.client_id.unwrap_or(self.client_id);
837        let params = request.params;
838        let clock = self.clock;
839        let start_nanos = datetime_to_unix_nanos(start);
840        let end_nanos = datetime_to_unix_nanos(end);
841
842        get_runtime().spawn(async move {
843            match http_client
844                .request_trade_ticks(instrument_id, start, end, limit)
845                .await
846                .context("failed to request trades from dYdX")
847            {
848                Ok(trades) => {
849                    let response = DataResponse::Trades(TradesResponse::new(
850                        request_id,
851                        client_id,
852                        instrument_id,
853                        trades,
854                        start_nanos,
855                        end_nanos,
856                        clock.get_time_ns(),
857                        params,
858                    ));
859
860                    if let Err(e) = sender.send(DataEvent::Response(response)) {
861                        log::error!("Failed to send trades response: {e}");
862                    }
863                }
864                Err(e) => log::error!("Trade request failed for {instrument_id}: {e:?}"),
865            }
866        });
867
868        Ok(())
869    }
870
871    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
872        let http_client = self.http_client.clone();
873        let sender = self.data_sender.clone();
874        let bar_type = request.bar_type;
875        let start = request.start;
876        let end = request.end;
877        let limit = request.limit.map(|n| n.get() as u32);
878        let request_id = request.request_id;
879        let client_id = request.client_id.unwrap_or(self.client_id);
880        let params = request.params;
881        let clock = self.clock;
882        let start_nanos = datetime_to_unix_nanos(start);
883        let end_nanos = datetime_to_unix_nanos(end);
884
885        get_runtime().spawn(async move {
886            match http_client
887                .request_bars(bar_type, start, end, limit, true)
888                .await
889                .context("failed to request bars from dYdX")
890            {
891                Ok(bars) => {
892                    let response = DataResponse::Bars(BarsResponse::new(
893                        request_id,
894                        client_id,
895                        bar_type,
896                        bars,
897                        start_nanos,
898                        end_nanos,
899                        clock.get_time_ns(),
900                        params,
901                    ));
902
903                    if let Err(e) = sender.send(DataEvent::Response(response)) {
904                        log::error!("Failed to send bars response: {e}");
905                    }
906                }
907                Err(e) => log::error!("Bar request failed for {bar_type}: {e:?}"),
908            }
909        });
910
911        Ok(())
912    }
913
914    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
915        let http_client = self.http_client.clone();
916        let sender = self.data_sender.clone();
917        let instrument_id = request.instrument_id;
918        let start = request.start;
919        let end = request.end;
920        let limit = request.limit.map(|n| n.get() as u32);
921        let request_id = request.request_id;
922        let client_id = request.client_id.unwrap_or(self.client_id);
923        let params = request.params;
924        let clock = self.clock;
925        let start_nanos = datetime_to_unix_nanos(start);
926        let end_nanos = datetime_to_unix_nanos(end);
927
928        get_runtime().spawn(async move {
929            match http_client
930                .request_funding_rates(instrument_id, start, end, limit)
931                .await
932                .context("failed to request funding rates from dYdX")
933            {
934                Ok(funding_rates) => {
935                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
936                        request_id,
937                        client_id,
938                        instrument_id,
939                        funding_rates,
940                        start_nanos,
941                        end_nanos,
942                        clock.get_time_ns(),
943                        params,
944                    ));
945
946                    if let Err(e) = sender.send(DataEvent::Response(response)) {
947                        log::error!("Failed to send funding rates response: {e}");
948                    }
949                }
950                Err(e) => {
951                    log::error!("Funding rates request failed for {instrument_id}: {e:?}");
952
953                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
954                        request_id,
955                        client_id,
956                        instrument_id,
957                        Vec::new(),
958                        start_nanos,
959                        end_nanos,
960                        clock.get_time_ns(),
961                        params,
962                    ));
963
964                    if let Err(e) = sender.send(DataEvent::Response(response)) {
965                        log::error!("Failed to send empty funding rates response: {e}");
966                    }
967                }
968            }
969        });
970
971        Ok(())
972    }
973}
974
975impl DydxDataClient {
976    /// Returns a cached instrument by InstrumentId.
977    #[must_use]
978    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
979        self.instrument_cache.get(instrument_id)
980    }
981
982    /// Returns all cached instruments.
983    #[must_use]
984    pub fn get_instruments(&self) -> Vec<InstrumentAny> {
985        self.instrument_cache.all_instruments()
986    }
987
988    /// Caches a single instrument.
989    pub fn cache_instrument(&self, instrument: InstrumentAny) {
990        self.instrument_cache.insert_instrument_only(instrument);
991    }
992
993    /// Caches multiple instruments.
994    ///
995    /// Clears the existing cache first, then adds all provided instruments.
996    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
997        self.instrument_cache.clear();
998        self.instrument_cache.insert_instruments_only(instruments);
999    }
1000
1001    fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1002        self.order_books
1003            .entry(instrument_id)
1004            .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1005    }
1006
1007    /// Returns the BarType for a given WebSocket candle topic.
1008    #[must_use]
1009    pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1010        self.bar_type_mappings.load().get(topic).copied()
1011    }
1012
1013    /// Returns all registered bar topics.
1014    #[must_use]
1015    pub fn get_bar_topics(&self) -> Vec<String> {
1016        self.bar_type_mappings.load().keys().cloned().collect()
1017    }
1018
    /// Dispatches one WebSocket output message to the matching handler.
    ///
    /// A single `ts_init` timestamp is read from `ctx.clock` up front and passed
    /// to every parser invoked for this message. Missing instruments and parse
    /// failures are logged and the message is dropped; nothing is returned to the
    /// caller.
    fn handle_ws_message(message: DydxWsOutputMessage, ctx: &WsMessageContext) {
        let ts_init = ctx.clock.get_time_ns();

        match message {
            // Trades: resolve the instrument for the market, parse the ticks and
            // forward them through the generic data path.
            DydxWsOutputMessage::Trades { id, contents } => {
                let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
                    log::warn!("No instrument cached for market {id}");
                    return;
                };
                let instrument_id = instrument.id();

                match ws_parse::parse_trade_ticks(instrument_id, &instrument, &contents, ts_init) {
                    Ok(data) => {
                        Self::handle_data_message(
                            data,
                            &ctx.data_sender,
                            &ctx.incomplete_bars,
                            ctx.clock,
                        );
                    }
                    Err(e) => log::error!("Failed to parse trade ticks for {id}: {e}"),
                }
            }
            // Order book snapshot: parsed with the instrument's precisions and handed
            // to the shared deltas handler.
            DydxWsOutputMessage::OrderbookSnapshot { id, contents } => {
                let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
                    log::warn!("No instrument cached for market {id}");
                    return;
                };
                let instrument_id = instrument.id();

                match ws_parse::parse_orderbook_snapshot(
                    &instrument_id,
                    &contents,
                    instrument.price_precision(),
                    instrument.size_precision(),
                    ts_init,
                ) {
                    Ok(deltas) => {
                        Self::handle_deltas_message(
                            deltas,
                            &ctx.data_sender,
                            &ctx.order_books,
                            &ctx.last_quotes,
                            &ctx.instrument_cache,
                            &ctx.active_quote_subs,
                            &ctx.active_delta_subs,
                        );
                    }
                    Err(e) => log::error!("Failed to parse orderbook snapshot for {id}: {e}"),
                }
            }
            // Incremental order book update: same flow as the snapshot, using the
            // deltas parser.
            DydxWsOutputMessage::OrderbookUpdate { id, contents } => {
                let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
                    log::warn!("No instrument cached for market {id}");
                    return;
                };
                let instrument_id = instrument.id();

                match ws_parse::parse_orderbook_deltas(
                    &instrument_id,
                    &contents,
                    instrument.price_precision(),
                    instrument.size_precision(),
                    ts_init,
                ) {
                    Ok(deltas) => {
                        Self::handle_deltas_message(
                            deltas,
                            &ctx.data_sender,
                            &ctx.order_books,
                            &ctx.last_quotes,
                            &ctx.instrument_cache,
                            &ctx.active_quote_subs,
                            &ctx.active_delta_subs,
                        );
                    }
                    Err(e) => log::error!("Failed to parse orderbook deltas for {id}: {e}"),
                }
            }
            // Batched order book updates: every update is parsed and the resulting
            // deltas are merged into one `OrderBookDeltas` before being handled.
            DydxWsOutputMessage::OrderbookBatch { id, updates } => {
                let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
                    log::warn!("No instrument cached for market {id}");
                    return;
                };
                let instrument_id = instrument.id();
                let price_precision = instrument.price_precision();
                let size_precision = instrument.size_precision();

                let mut all_deltas = Vec::new();
                let last_idx = updates.len().saturating_sub(1);

                for (i, update) in updates.iter().enumerate() {
                    // Only the final update goes through the regular parser; earlier
                    // ones use the `_with_flag` variant with `false`.
                    // NOTE(review): the flag presumably marks the end of the batch
                    // on the last delta — confirm against `ws_parse`.
                    let is_last = i == last_idx;
                    let result = if is_last {
                        ws_parse::parse_orderbook_deltas(
                            &instrument_id,
                            update,
                            price_precision,
                            size_precision,
                            ts_init,
                        )
                        .map(|d| d.deltas)
                    } else {
                        ws_parse::parse_orderbook_deltas_with_flag(
                            &instrument_id,
                            update,
                            price_precision,
                            size_precision,
                            ts_init,
                            false,
                        )
                    };

                    match result {
                        Ok(deltas) => all_deltas.extend(deltas),
                        Err(e) => {
                            // A single bad update discards the whole batch.
                            log::error!("Failed to parse orderbook batch delta {i} for {id}: {e}");
                            return;
                        }
                    }
                }

                // Nothing to apply (empty batch or updates without deltas).
                if all_deltas.is_empty() {
                    return;
                }
                let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
                Self::handle_deltas_message(
                    deltas,
                    &ctx.data_sender,
                    &ctx.order_books,
                    &ctx.last_quotes,
                    &ctx.instrument_cache,
                    &ctx.active_quote_subs,
                    &ctx.active_delta_subs,
                );
            }
            // Candles: the topic id must contain a '/'; the part before it is used
            // as the market ticker (the remainder is presumably the resolution).
            DydxWsOutputMessage::Candles { id, contents } => {
                let parts: Vec<&str> = id.splitn(2, '/').collect();
                if parts.len() != 2 {
                    log::warn!("Unexpected candle topic format: {id}");
                    return;
                }
                let ticker = parts[0];

                // The bar type is looked up by the full topic id, not the ticker.
                let Some(bar_type) = ctx.bar_type_mappings.load().get(&id).copied() else {
                    log::debug!("No bar type mapping for candle topic {id}");
                    return;
                };

                let Some(instrument) = ctx.instrument_cache.get_by_market(ticker) else {
                    log::warn!("No instrument cached for market {ticker}");
                    return;
                };

                match ws_parse::parse_candle_bar(
                    bar_type,
                    &instrument,
                    &contents,
                    ctx.bars_timestamp_on_close,
                    ts_init,
                ) {
                    Ok(bar) => {
                        // The newest candle per topic is held in `pending_bars`. When
                        // a candle with a different `ts_event` arrives, the held one
                        // is handed to `emit_bar_guarded` before being replaced.
                        // The copy out of the map entry happens before the insert
                        // below, so no map reference is held across it.
                        let prev = ctx.pending_bars.get(&id).map(|r| *r);
                        if let Some(prev_bar) = prev
                            && bar.ts_event != prev_bar.ts_event
                        {
                            Self::emit_bar_guarded(prev_bar, ctx);
                        }
                        ctx.pending_bars.insert(id, bar);
                    }
                    Err(e) => log::error!("Failed to parse candle bar for {id}: {e}"),
                }
            }
            DydxWsOutputMessage::Markets(contents) => {
                Self::handle_markets_message(&contents, ctx, ts_init);
            }
            // Account-level and block-height messages are not used by the data client.
            DydxWsOutputMessage::SubaccountSubscribed(_) => {
                log::debug!("Ignoring subaccount subscribed on data client");
            }
            DydxWsOutputMessage::SubaccountsChannelData(_) => {
                log::debug!("Ignoring subaccounts channel data on data client");
            }
            DydxWsOutputMessage::BlockHeight { .. } => {
                log::debug!("Ignoring block height on data client");
            }
            DydxWsOutputMessage::Error(err) => {
                log::error!("dYdX WS error: {err}");
            }
            // Reconnect: drop held candles and re-issue every tracked subscription.
            DydxWsOutputMessage::Reconnected => {
                log::info!("dYdX WS reconnected, re-subscribing to active subscriptions");
                ctx.pending_bars.clear();

                let total_subs = ctx.active_quote_subs.len()
                    + ctx.active_delta_subs.len()
                    + ctx.active_trade_subs.len()
                    + ctx.active_bar_subs.len();

                if total_subs == 0 {
                    log::debug!("No active subscriptions to restore");
                    return;
                }

                log::info!(
                    "Restoring {} subscriptions (quotes={}, deltas={}, trades={}, bars={})",
                    total_subs,
                    ctx.active_quote_subs.len(),
                    ctx.active_delta_subs.len(),
                    ctx.active_trade_subs.len(),
                    ctx.active_bar_subs.len()
                );

                // Each re-subscription runs on its own spawned task and is not
                // awaited here. Quote and delta subscriptions both map to the
                // order book channel.
                for instrument_id in ctx.active_quote_subs.load().iter().copied() {
                    let ws_clone = ctx.ws_client.clone();
                    get_runtime().spawn(async move {
                        if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
                            log::error!(
                                "Failed to re-subscribe to orderbook (quotes) for {instrument_id}: {e:?}"
                            );
                        } else {
                            log::debug!("Re-subscribed to orderbook (quotes) for {instrument_id}");
                        }
                    });
                }

                for instrument_id in ctx.active_delta_subs.load().iter().copied() {
                    let ws_clone = ctx.ws_client.clone();
                    get_runtime().spawn(async move {
                        if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
                            log::error!(
                                "Failed to re-subscribe to orderbook (deltas) for {instrument_id}: {e:?}"
                            );
                        } else {
                            log::debug!("Re-subscribed to orderbook (deltas) for {instrument_id}");
                        }
                    });
                }

                for instrument_id in ctx.active_trade_subs.load().iter().copied() {
                    let ws_clone = ctx.ws_client.clone();
                    get_runtime().spawn(async move {
                        if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
                            log::error!(
                                "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
                            );
                        } else {
                            log::debug!("Re-subscribed to trades for {instrument_id}");
                        }
                    });
                }

                for ((instrument_id, resolution), _) in ctx.active_bar_subs.load().iter() {
                    let instrument_id = *instrument_id;
                    let resolution = resolution.clone();
                    let ws_clone = ctx.ws_client.clone();

                    get_runtime().spawn(async move {
                        if let Err(e) =
                            ws_clone.subscribe_candles(instrument_id, &resolution).await
                        {
                            log::error!(
                                "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
                            );
                        } else {
                            log::debug!(
                                "Re-subscribed to candles for {instrument_id} ({resolution})"
                            );
                        }
                    });
                }

                // Logged once all tasks are spawned; the subscriptions themselves
                // may still be in flight at this point.
                log::info!("Completed re-subscription requests after reconnection");
            }
        }
    }
1293
1294    fn instrument_id_from_ticker(ticker: &str) -> InstrumentId {
1295        let symbol = format!("{ticker}-PERP");
1296        InstrumentId::new(Symbol::new(&symbol), *DYDX_VENUE)
1297    }
1298
1299    fn handle_markets_message(
1300        contents: &crate::websocket::messages::DydxMarketsContents,
1301        ctx: &WsMessageContext,
1302        ts_init: nautilus_core::UnixNanos,
1303    ) {
1304        if let Some(ref oracle_prices) = contents.oracle_prices {
1305            for (ticker, oracle_data) in oracle_prices {
1306                let instrument_id = Self::instrument_id_from_ticker(ticker);
1307
1308                let Ok(price) = parse_price(&oracle_data.oracle_price, "oracle_price") else {
1309                    log::warn!("Failed to parse oracle price for {ticker}");
1310                    continue;
1311                };
1312
1313                if ctx.active_mark_price_subs.contains(&instrument_id) {
1314                    let mark_price = MarkPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1315                    let data = NautilusData::MarkPriceUpdate(mark_price);
1316                    if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1317                        log::error!("Failed to emit mark price for {instrument_id}: {e}");
1318                    }
1319                }
1320
1321                if ctx.active_index_price_subs.contains(&instrument_id) {
1322                    let index_price = IndexPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1323                    let data = NautilusData::IndexPriceUpdate(index_price);
1324                    if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1325                        log::error!("Failed to emit index price for {instrument_id}: {e}");
1326                    }
1327                }
1328            }
1329        }
1330
1331        Self::handle_markets_trading_data(contents.trading.as_ref(), ctx, ts_init, false);
1332        Self::handle_markets_trading_data(contents.markets.as_ref(), ctx, ts_init, true);
1333    }
1334
1335    fn handle_markets_trading_data(
1336        trading: Option<
1337            &std::collections::HashMap<String, crate::websocket::messages::DydxMarketTradingUpdate>,
1338        >,
1339        ctx: &WsMessageContext,
1340        ts_init: nautilus_core::UnixNanos,
1341        is_snapshot: bool,
1342    ) {
1343        let Some(trading_map) = trading else {
1344            return;
1345        };
1346
1347        for (ticker, update) in trading_map {
1348            let instrument_id = Self::instrument_id_from_ticker(ticker);
1349
1350            if let Some(status) = &update.status {
1351                let action = MarketStatusAction::from(*status);
1352                let is_trading = matches!(status, crate::common::enums::DydxMarketStatus::Active);
1353
1354                let instrument_status = InstrumentStatus::new(
1355                    instrument_id,
1356                    action,
1357                    ts_init,
1358                    ts_init,
1359                    None,
1360                    None,
1361                    Some(is_trading),
1362                    None,
1363                    None,
1364                );
1365
1366                ctx.last_instrument_statuses
1367                    .insert(instrument_id, instrument_status);
1368
1369                if ctx.active_instrument_status_subs.contains(&instrument_id)
1370                    && let Err(e) = ctx
1371                        .data_sender
1372                        .send(DataEvent::InstrumentStatus(instrument_status))
1373                {
1374                    log::error!("Failed to emit instrument status for {instrument_id}: {e}");
1375                }
1376            }
1377
1378            let ticker_ustr = Ustr::from(ticker.as_str());
1379            if !ctx.seen_tickers.contains(&ticker_ustr) {
1380                let is_active = update
1381                    .status
1382                    .as_ref()
1383                    .is_none_or(|s| matches!(s, crate::common::enums::DydxMarketStatus::Active));
1384                if ctx.instrument_cache.get_by_market(ticker).is_some() {
1385                    ctx.seen_tickers.insert(ticker_ustr);
1386                } else if is_active {
1387                    ctx.seen_tickers.insert(ticker_ustr);
1388                    Self::handle_new_instrument_discovered(ticker, ctx);
1389                }
1390            }
1391
1392            if let Some(ref rate_str) = update.next_funding_rate {
1393                if let Ok(rate) = Decimal::from_str(rate_str) {
1394                    if ctx.active_funding_rate_subs.contains(&instrument_id) {
1395                        let funding_rate = FundingRateUpdate {
1396                            instrument_id,
1397                            rate,
1398                            interval: Some(60),
1399                            next_funding_ns: None,
1400                            ts_event: ts_init,
1401                            ts_init,
1402                        };
1403
1404                        if let Err(e) = ctx.data_sender.send(DataEvent::FundingRate(funding_rate)) {
1405                            log::error!("Failed to emit funding rate for {instrument_id}: {e}");
1406                        }
1407                    }
1408                } else {
1409                    log::warn!("Failed to parse next_funding_rate for {ticker}: {rate_str}");
1410                }
1411            }
1412
1413            if is_snapshot
1414                && let Some(ref oracle_price_str) = update.oracle_price
1415                && let Ok(price) = parse_price(oracle_price_str, "oracle_price")
1416            {
1417                if ctx.active_mark_price_subs.contains(&instrument_id) {
1418                    let mark_price = MarkPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1419                    let data = NautilusData::MarkPriceUpdate(mark_price);
1420
1421                    if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1422                        log::error!("Failed to emit mark price for {instrument_id}: {e}");
1423                    }
1424                }
1425
1426                if ctx.active_index_price_subs.contains(&instrument_id) {
1427                    let index_price = IndexPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1428                    let data = NautilusData::IndexPriceUpdate(index_price);
1429
1430                    if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1431                        log::error!("Failed to emit index price for {instrument_id}: {e}");
1432                    }
1433                }
1434            }
1435        }
1436    }
1437
1438    fn emit_bar_guarded(bar: Bar, ctx: &WsMessageContext) {
1439        let current_time_ns = ctx.clock.get_time_ns();
1440        if bar.ts_event <= current_time_ns {
1441            ctx.incomplete_bars.remove(&bar.bar_type);
1442            if let Err(e) = ctx
1443                .data_sender
1444                .send(DataEvent::Data(NautilusData::Bar(bar)))
1445            {
1446                log::error!("Failed to emit completed bar: {e}");
1447            }
1448        } else {
1449            ctx.incomplete_bars.insert(bar.bar_type, bar);
1450        }
1451    }
1452
1453    fn handle_new_instrument_discovered(ticker: &str, ctx: &WsMessageContext) {
1454        log::info!("New instrument discovered via WebSocket: {ticker}");
1455
1456        let http_client = ctx.http_client.clone();
1457        let ws_client = ctx.ws_client.clone();
1458        let data_sender = ctx.data_sender.clone();
1459        let ticker = ticker.to_string();
1460
1461        get_runtime().spawn(async move {
1462            match http_client.fetch_and_cache_single_instrument(&ticker).await {
1463                Ok(Some(instrument)) => {
1464                    ws_client.cache_instrument(instrument.clone());
1465                    if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
1466                        log::error!("Failed to emit new instrument: {e}");
1467                    }
1468                    log::info!("Fetched and cached new instrument: {ticker}");
1469                }
1470                Ok(None) => {
1471                    log::warn!("New instrument {ticker} not found or inactive");
1472                }
1473                Err(e) => {
1474                    log::error!("Failed to fetch new instrument {ticker}: {e}");
1475                }
1476            }
1477        });
1478    }
1479
1480    fn handle_data_message(
1481        payloads: Vec<NautilusData>,
1482        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1483        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1484        clock: &'static AtomicTime,
1485    ) {
1486        for data in payloads {
1487            // Filter bars through incomplete bars cache
1488            if let NautilusData::Bar(bar) = data {
1489                Self::handle_bar_message(bar, data_sender, incomplete_bars, clock);
1490            } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1491                log::error!("Failed to emit data event: {e}");
1492            }
1493        }
1494    }
1495
1496    fn handle_bar_message(
1497        bar: Bar,
1498        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1499        incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1500        clock: &'static AtomicTime,
1501    ) {
1502        let current_time_ns = clock.get_time_ns();
1503        let bar_type = bar.bar_type;
1504
1505        if bar.ts_event <= current_time_ns {
1506            // Bar is complete - emit it and remove from incomplete cache
1507            incomplete_bars.remove(&bar_type);
1508
1509            if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1510                log::error!("Failed to emit completed bar: {e}");
1511            }
1512        } else {
1513            // Bar is incomplete - cache it (updates existing entry)
1514            log::trace!(
1515                "Caching incomplete bar for {} (ts_event={}, current={})",
1516                bar_type,
1517                bar.ts_event,
1518                current_time_ns
1519            );
1520            incomplete_bars.insert(bar_type, bar);
1521        }
1522    }
1523
1524    fn resolve_crossed_order_book(
1525        book: &mut OrderBook,
1526        venue_deltas: &OrderBookDeltas,
1527        instrument: &InstrumentAny,
1528    ) -> anyhow::Result<OrderBookDeltas> {
1529        let instrument_id = venue_deltas.instrument_id;
1530        let ts_init = venue_deltas.ts_init;
1531        let mut all_deltas = venue_deltas.deltas.clone();
1532
1533        // If the input batch is a snapshot, every synthetic and terminator delta must
1534        // carry F_SNAPSHOT as well so consumers apply the whole batch as one
1535        // replacement image rather than a snapshot followed by standalone updates.
1536        let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
1537        let is_snapshot_batch = venue_deltas
1538            .deltas
1539            .iter()
1540            .any(|d| d.flags & snapshot_flag != 0);
1541        let synthetic_flags = if is_snapshot_batch { snapshot_flag } else { 0 };
1542
1543        // Apply the original venue deltas first
1544        book.apply_deltas(venue_deltas)?;
1545
1546        // Check if orderbook is crossed
1547        let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1548            (book.best_bid_price(), book.best_ask_price())
1549        {
1550            bid_price >= ask_price
1551        } else {
1552            false
1553        };
1554
1555        // Iteratively uncross the orderbook
1556        while is_crossed {
1557            log::debug!(
1558                "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1559                instrument_id,
1560                book.best_bid_price(),
1561                book.best_ask_price()
1562            );
1563
1564            let bid_price = match book.best_bid_price() {
1565                Some(p) => p,
1566                None => break,
1567            };
1568            let ask_price = match book.best_ask_price() {
1569                Some(p) => p,
1570                None => break,
1571            };
1572            let bid_size = match book.best_bid_size() {
1573                Some(s) => s,
1574                None => break,
1575            };
1576            let ask_size = match book.best_ask_size() {
1577                Some(s) => s,
1578                None => break,
1579            };
1580
1581            let mut temp_deltas = Vec::new();
1582
1583            if bid_size > ask_size {
1584                // Remove ask level, reduce bid level
1585                let new_bid_size = Quantity::from_decimal_dp(
1586                    bid_size.as_decimal() - ask_size.as_decimal(),
1587                    instrument.size_precision(),
1588                )?;
1589                temp_deltas.push(OrderBookDelta::new(
1590                    instrument_id,
1591                    BookAction::Update,
1592                    BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1593                    synthetic_flags,
1594                    0,
1595                    ts_init,
1596                    ts_init,
1597                ));
1598                temp_deltas.push(OrderBookDelta::new(
1599                    instrument_id,
1600                    BookAction::Delete,
1601                    BookOrder::new(
1602                        OrderSide::Sell,
1603                        ask_price,
1604                        Quantity::zero(instrument.size_precision()),
1605                        0,
1606                    ),
1607                    synthetic_flags,
1608                    0,
1609                    ts_init,
1610                    ts_init,
1611                ));
1612            } else if bid_size < ask_size {
1613                // Remove bid level, reduce ask level
1614                let new_ask_size = Quantity::from_decimal_dp(
1615                    ask_size.as_decimal() - bid_size.as_decimal(),
1616                    instrument.size_precision(),
1617                )?;
1618                temp_deltas.push(OrderBookDelta::new(
1619                    instrument_id,
1620                    BookAction::Update,
1621                    BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1622                    synthetic_flags,
1623                    0,
1624                    ts_init,
1625                    ts_init,
1626                ));
1627                temp_deltas.push(OrderBookDelta::new(
1628                    instrument_id,
1629                    BookAction::Delete,
1630                    BookOrder::new(
1631                        OrderSide::Buy,
1632                        bid_price,
1633                        Quantity::zero(instrument.size_precision()),
1634                        0,
1635                    ),
1636                    synthetic_flags,
1637                    0,
1638                    ts_init,
1639                    ts_init,
1640                ));
1641            } else {
1642                // Equal sizes: remove both levels
1643                temp_deltas.push(OrderBookDelta::new(
1644                    instrument_id,
1645                    BookAction::Delete,
1646                    BookOrder::new(
1647                        OrderSide::Buy,
1648                        bid_price,
1649                        Quantity::zero(instrument.size_precision()),
1650                        0,
1651                    ),
1652                    synthetic_flags,
1653                    0,
1654                    ts_init,
1655                    ts_init,
1656                ));
1657                temp_deltas.push(OrderBookDelta::new(
1658                    instrument_id,
1659                    BookAction::Delete,
1660                    BookOrder::new(
1661                        OrderSide::Sell,
1662                        ask_price,
1663                        Quantity::zero(instrument.size_precision()),
1664                        0,
1665                    ),
1666                    synthetic_flags,
1667                    0,
1668                    ts_init,
1669                    ts_init,
1670                ));
1671            }
1672
1673            // Apply temporary deltas to the book
1674            let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
1675            book.apply_deltas(&temp_deltas_obj)?;
1676            all_deltas.extend(temp_deltas);
1677
1678            // Check if still crossed
1679            is_crossed = if let (Some(bid_price), Some(ask_price)) =
1680                (book.best_bid_price(), book.best_ask_price())
1681            {
1682                bid_price >= ask_price
1683            } else {
1684                false
1685            };
1686        }
1687
1688        // Set F_LAST on the final delta, preserving F_SNAPSHOT when the batch is a
1689        // snapshot so consumers close the replacement image correctly.
1690        if let Some(last_delta) = all_deltas.last_mut() {
1691            last_delta.flags = synthetic_flags | RecordFlag::F_LAST as u8;
1692        }
1693
1694        Ok(OrderBookDeltas::new(instrument_id, all_deltas))
1695    }
1696
1697    fn handle_deltas_message(
1698        deltas: OrderBookDeltas,
1699        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1700        order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
1701        last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
1702        instrument_cache: &Arc<InstrumentCache>,
1703        active_quote_subs: &Arc<AtomicSet<InstrumentId>>,
1704        active_delta_subs: &Arc<AtomicSet<InstrumentId>>,
1705    ) {
1706        let instrument_id = deltas.instrument_id;
1707
1708        // Get instrument for crossed orderbook resolution
1709        let instrument = match instrument_cache.get(&instrument_id) {
1710            Some(inst) => inst,
1711            None => {
1712                log::error!("Cannot resolve crossed order book: no instrument for {instrument_id}");
1713                // Still emit the raw deltas if delta subscription is active
1714                if active_delta_subs.contains(&instrument_id)
1715                    && let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
1716                        OrderBookDeltas_API::new(deltas),
1717                    )))
1718                {
1719                    log::error!("Failed to emit order book deltas: {e}");
1720                }
1721                return;
1722            }
1723        };
1724
1725        // Always maintain local orderbook -- both subscription types need book state
1726        let mut book = order_books
1727            .entry(instrument_id)
1728            .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1729
1730        // Resolve crossed orderbook (applies deltas internally)
1731        let resolved_deltas =
1732            match Self::resolve_crossed_order_book(&mut book, &deltas, &instrument) {
1733                Ok(d) => d,
1734                Err(e) => {
1735                    log::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
1736                    return;
1737                }
1738            };
1739
1740        // Conditionally emit QuoteTick if instrument has quote subscription
1741        if active_quote_subs.contains(&instrument_id) {
1742            // Generate QuoteTick from updated top-of-book
1743            // Edge case: If orderbook is empty after deltas, fall back to last quote
1744            let quote_opt = if let (Some(bid_price), Some(ask_price)) =
1745                (book.best_bid_price(), book.best_ask_price())
1746                && let (Some(bid_size), Some(ask_size)) =
1747                    (book.best_bid_size(), book.best_ask_size())
1748            {
1749                Some(QuoteTick::new(
1750                    instrument_id,
1751                    bid_price,
1752                    ask_price,
1753                    bid_size,
1754                    ask_size,
1755                    resolved_deltas.ts_event,
1756                    resolved_deltas.ts_init,
1757                ))
1758            } else {
1759                // Edge case: Empty orderbook levels - use last quote as fallback
1760                if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
1761                    log::debug!(
1762                        "Empty orderbook for {instrument_id} after applying deltas, using last quote"
1763                    );
1764                    last_quotes.get(&instrument_id).map(|q| *q)
1765                } else {
1766                    None
1767                }
1768            };
1769
1770            if let Some(quote) = quote_opt {
1771                // Only emit when top-of-book changes
1772                let emit_quote = !matches!(
1773                    last_quotes.get(&instrument_id),
1774                    Some(existing) if *existing == quote
1775                );
1776
1777                if emit_quote {
1778                    last_quotes.insert(instrument_id, quote);
1779                    if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
1780                        log::error!("Failed to emit quote tick: {e}");
1781                    }
1782                }
1783            } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
1784                // Partial orderbook (only one side) - log but don't emit
1785                log::debug!(
1786                    "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
1787                    book.best_bid_price(),
1788                    book.best_ask_price()
1789                );
1790            }
1791        }
1792
1793        // Conditionally emit OrderBookDeltas if instrument has delta subscription
1794        if active_delta_subs.contains(&instrument_id) {
1795            let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
1796            if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1797                log::error!("Failed to emit order book deltas event: {e}");
1798            }
1799        }
1800    }
1801}
1802
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        data::{BookOrder, OrderBookDelta, OrderBookDeltas},
        enums::{BookAction, BookType, OrderSide, RecordFlag},
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::{CryptoPerpetual, InstrumentAny},
        orderbook::OrderBook,
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use rust_decimal_macros::dec;

    use super::*;

    /// Perpetual test instrument with 2 dp prices and 8 dp sizes.
    fn test_instrument() -> InstrumentAny {
        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
        let perpetual = CryptoPerpetual::new(
            instrument_id,
            instrument_id.symbol,
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            2,                   // price_precision
            8,                   // size_precision (wide enough to reveal f64 rounding)
            Price::new(0.01, 2), // price_increment
            Quantity::new(0.00000001, 8),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Builds an `L2_MBP` book holding the given `(price, size)` levels.
    ///
    /// The seed batch is a clear followed by one Add per bid and then one Add
    /// per ask; the final delta of the batch is flagged `F_LAST`.
    fn seed_book_with_levels(
        instrument_id: InstrumentId,
        bids: &[(f64, f64)],
        asks: &[(f64, f64)],
    ) -> OrderBook {
        let ts = UnixNanos::default();

        let add_level = |side: OrderSide, level: &(f64, f64)| {
            let (price, size) = *level;
            OrderBookDelta::new(
                instrument_id,
                BookAction::Add,
                BookOrder::new(side, Price::new(price, 2), Quantity::new(size, 8), 0),
                0,
                0,
                ts,
                ts,
            )
        };

        let mut deltas: Vec<OrderBookDelta> =
            std::iter::once(OrderBookDelta::clear(instrument_id, 0, ts, ts))
                .chain(bids.iter().map(|level| add_level(OrderSide::Buy, level)))
                .chain(asks.iter().map(|level| add_level(OrderSide::Sell, level)))
                .collect();

        if let Some(terminator) = deltas.last_mut() {
            terminator.flags = RecordFlag::F_LAST as u8;
        }

        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
        book.apply_deltas(&OrderBookDeltas::new(instrument_id, deltas))
            .expect("failed to apply seed deltas");
        book
    }

    /// Single-delta batch that adds one bid level carrying `flags`.
    fn single_bid_add(
        instrument_id: InstrumentId,
        bid_price: f64,
        bid_size: f64,
        flags: u8,
    ) -> OrderBookDeltas {
        let ts = UnixNanos::default();
        let order = BookOrder::new(
            OrderSide::Buy,
            Price::new(bid_price, 2),
            Quantity::new(bid_size, 8),
            0,
        );
        let delta = OrderBookDelta::new(instrument_id, BookAction::Add, order, flags, 0, ts, ts);
        OrderBookDeltas::new(instrument_id, vec![delta])
    }

    /// Incremental update: one bid Add flagged `F_LAST`.
    fn crossing_bid_deltas(
        instrument_id: InstrumentId,
        bid_price: f64,
        bid_size: f64,
    ) -> OrderBookDeltas {
        single_bid_add(
            instrument_id,
            bid_price,
            bid_size,
            RecordFlag::F_LAST as u8,
        )
    }

    /// Mimics an inbound snapshot: the single delta carries `F_SNAPSHOT` and,
    /// being the terminator, `F_LAST` as well.
    fn crossing_snapshot_batch(
        instrument_id: InstrumentId,
        bid_price: f64,
        bid_size: f64,
    ) -> OrderBookDeltas {
        let snapshot = RecordFlag::F_SNAPSHOT as u8;
        let last = RecordFlag::F_LAST as u8;
        single_bid_add(instrument_id, bid_price, bid_size, snapshot | last)
    }

    #[rstest]
    fn test_resolve_crossed_order_book_preserves_decimal_precision() {
        // Seeded uncrossed with bid 99.00 and ask 100.05 (size 0.50000000); the
        // venue delta then adds a crossing bid at 100.10 with size 1.00000001.
        // The surviving Buy level must be exactly 0.50000001 -- subtracting in
        // f64 (1.00000001 - 0.5) would round to 0.50000000 at 8 dp.
        let instrument = test_instrument();
        let instrument_id = instrument.id();
        let mut book = seed_book_with_levels(
            instrument_id,
            &[(99.00, 1.00000000)],
            &[(100.05, 0.50000000)],
        );
        let venue_deltas = crossing_bid_deltas(instrument_id, 100.10, 1.00000001);

        let resolved =
            DydxDataClient::resolve_crossed_order_book(&mut book, &venue_deltas, &instrument)
                .expect("resolution should succeed");

        // The Buy-side Update at the crossing price carries the Decimal remainder.
        let crossing_price = dec!(100.10);
        let update = resolved
            .deltas
            .iter()
            .find(|d| {
                d.action == BookAction::Update
                    && d.order.side == OrderSide::Buy
                    && d.order.price.as_decimal() == crossing_price
            })
            .expect("expected a Buy Update delta from crossed-book resolution");
        assert_eq!(update.order.size.as_decimal(), dec!(0.50000001));

        // Downstream buffering flushes on the terminal delta's F_LAST.
        let terminator = resolved.deltas.last().unwrap();
        assert_eq!(terminator.flags, RecordFlag::F_LAST as u8);

        // With both sides populated, the book must no longer be crossed.
        if let Some((bid, ask)) = book.best_bid_price().zip(book.best_ask_price()) {
            assert!(bid < ask, "book still crossed: bid={bid:?} ask={ask:?}");
        }
    }

    /// A crossed snapshot must leave `resolve_crossed_order_book` still marked
    /// as a snapshot: each delta retains `F_SNAPSHOT` and the terminator is
    /// `F_SNAPSHOT | F_LAST`, so consumers treat the batch as a complete
    /// replacement image.
    #[rstest]
    fn test_resolve_crossed_order_book_preserves_snapshot_flags() {
        let instrument = test_instrument();
        let instrument_id = instrument.id();
        let mut book = seed_book_with_levels(
            instrument_id,
            &[(99.00, 1.00000000)],
            &[(100.05, 0.50000000)],
        );
        let venue_deltas = crossing_snapshot_batch(instrument_id, 100.10, 1.00000001);

        let resolved =
            DydxDataClient::resolve_crossed_order_book(&mut book, &venue_deltas, &instrument)
                .expect("resolution should succeed");

        let snapshot = RecordFlag::F_SNAPSHOT as u8;
        let last = RecordFlag::F_LAST as u8;

        // F_SNAPSHOT on every delta ...
        resolved
            .deltas
            .iter()
            .enumerate()
            .for_each(|(idx, delta)| {
                assert!(
                    delta.flags & snapshot != 0,
                    "delta at index {idx} lost F_SNAPSHOT: flags={:#010b}",
                    delta.flags,
                );
            });

        // ... and both flags on the terminator.
        let terminator = resolved.deltas.last().unwrap();
        assert_eq!(
            terminator.flags,
            snapshot | last,
            "snapshot terminator must be F_SNAPSHOT | F_LAST",
        );
    }

    #[rstest]
    fn test_resolve_crossed_order_book_equal_sizes_removes_both_levels() {
        // Bid 99.00 and ask 100.05 (size 1.0) are seeded, then a bid at 100.10
        // with size 1.0 crosses the ask. The two top levels match in size, so
        // each must be deleted.
        let instrument = test_instrument();
        let instrument_id = instrument.id();
        let mut book = seed_book_with_levels(
            instrument_id,
            &[(99.00, 1.00000000)],
            &[(100.05, 1.00000000)],
        );
        let venue_deltas = crossing_bid_deltas(instrument_id, 100.10, 1.00000000);

        let resolved =
            DydxDataClient::resolve_crossed_order_book(&mut book, &venue_deltas, &instrument)
                .expect("resolution should succeed");

        // One Delete per side at the former top-of-book prices.
        let top_prices = [dec!(100.10), dec!(100.05)];
        let deletes_count = resolved
            .deltas
            .iter()
            .filter(|d| d.action == BookAction::Delete)
            .filter(|d| top_prices.contains(&d.order.price.as_decimal()))
            .count();
        assert_eq!(deletes_count, 2);
    }
}