1use std::{
19 str::FromStr,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use anyhow::Context;
27use dashmap::DashMap;
28use futures_util::{Stream, StreamExt, pin_mut};
29use nautilus_common::{
30 clients::DataClient,
31 live::{runner::get_data_event_sender, runtime::get_runtime},
32 messages::{
33 DataEvent, DataResponse,
34 data::{
35 BarsResponse, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
36 RequestBars, RequestFundingRates, RequestInstrument, RequestInstruments, RequestTrades,
37 SubscribeBars, SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices,
38 SubscribeInstrument, SubscribeInstrumentStatus, SubscribeInstruments,
39 SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades, TradesResponse, UnsubscribeBars,
40 UnsubscribeBookDeltas, UnsubscribeFundingRates, UnsubscribeIndexPrices,
41 UnsubscribeInstrument, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
42 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
43 },
44 },
45};
46use nautilus_core::{
47 AtomicMap, AtomicSet,
48 datetime::datetime_to_unix_nanos,
49 time::{AtomicTime, get_atomic_clock_realtime},
50};
51use nautilus_model::{
52 data::{
53 Bar, BarSpecification, BarType, BookOrder, Data as NautilusData, FundingRateUpdate,
54 IndexPriceUpdate, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
55 OrderBookDeltas_API, QuoteTick,
56 },
57 enums::{BookAction, BookType, MarketStatusAction, OrderSide, RecordFlag},
58 identifiers::{ClientId, InstrumentId, Symbol, Venue},
59 instruments::{Instrument, InstrumentAny},
60 orderbook::OrderBook,
61 types::Quantity,
62};
63use rust_decimal::Decimal;
64use tokio::{task::JoinHandle, time::Duration};
65use tokio_util::sync::CancellationToken;
66use ustr::Ustr;
67
68use crate::{
69 common::{
70 consts::DYDX_VENUE,
71 enums::DydxCandleResolution,
72 instrument_cache::InstrumentCache,
73 parse::{extract_raw_symbol, parse_price},
74 },
75 config::DydxDataClientConfig,
76 http::client::DydxHttpClient,
77 websocket::{client::DydxWebSocketClient, enums::DydxWsOutputMessage, parse as ws_parse},
78};
79
80struct WsMessageContext {
81 clock: &'static AtomicTime,
82 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
83 instrument_cache: Arc<InstrumentCache>,
84 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
85 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
86 ws_client: DydxWebSocketClient,
87 http_client: DydxHttpClient,
88 active_quote_subs: Arc<AtomicSet<InstrumentId>>,
89 active_delta_subs: Arc<AtomicSet<InstrumentId>>,
90 active_trade_subs: Arc<AtomicSet<InstrumentId>>,
91 active_bar_subs: Arc<AtomicMap<(InstrumentId, String), BarType>>,
92 incomplete_bars: Arc<DashMap<BarType, Bar>>,
93 bar_type_mappings: Arc<AtomicMap<String, BarType>>,
94 active_mark_price_subs: Arc<AtomicSet<InstrumentId>>,
95 active_index_price_subs: Arc<AtomicSet<InstrumentId>>,
96 active_funding_rate_subs: Arc<AtomicSet<InstrumentId>>,
97 active_instrument_status_subs: Arc<AtomicSet<InstrumentId>>,
98 last_instrument_statuses: Arc<DashMap<InstrumentId, InstrumentStatus>>,
99 bars_timestamp_on_close: bool,
100 pending_bars: Arc<DashMap<String, Bar>>,
101 seen_tickers: Arc<AtomicSet<Ustr>>,
102}
103
104#[derive(Debug)]
112pub struct DydxDataClient {
113 clock: &'static AtomicTime,
114 client_id: ClientId,
115 config: DydxDataClientConfig,
116 http_client: DydxHttpClient,
117 ws_client: DydxWebSocketClient,
118 is_connected: AtomicBool,
119 cancellation_token: CancellationToken,
120 tasks: Vec<JoinHandle<()>>,
121 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
122 instrument_cache: Arc<InstrumentCache>,
123 order_books: Arc<DashMap<InstrumentId, OrderBook>>,
124 last_quotes: Arc<DashMap<InstrumentId, QuoteTick>>,
125 incomplete_bars: Arc<DashMap<BarType, Bar>>,
126 bar_type_mappings: Arc<AtomicMap<String, BarType>>,
127 active_quote_subs: Arc<AtomicSet<InstrumentId>>,
128 active_delta_subs: Arc<AtomicSet<InstrumentId>>,
129 active_trade_subs: Arc<AtomicSet<InstrumentId>>,
130 active_bar_subs: Arc<AtomicMap<(InstrumentId, String), BarType>>,
131 active_mark_price_subs: Arc<AtomicSet<InstrumentId>>,
132 active_index_price_subs: Arc<AtomicSet<InstrumentId>>,
133 active_funding_rate_subs: Arc<AtomicSet<InstrumentId>>,
134 active_instrument_status_subs: Arc<AtomicSet<InstrumentId>>,
135 last_instrument_statuses: Arc<DashMap<InstrumentId, InstrumentStatus>>,
136}
137
138impl DydxDataClient {
139 fn map_bar_spec_to_resolution(spec: &BarSpecification) -> anyhow::Result<&'static str> {
140 let resolution: &'static str = DydxCandleResolution::from_bar_spec(spec)?.into();
141 Ok(resolution)
142 }
143
144 pub fn new(
150 client_id: ClientId,
151 config: DydxDataClientConfig,
152 http_client: DydxHttpClient,
153 ws_client: DydxWebSocketClient,
154 ) -> anyhow::Result<Self> {
155 let clock = get_atomic_clock_realtime();
156 let data_sender = get_data_event_sender();
157
158 let instrument_cache = Arc::clone(http_client.instrument_cache());
159
160 Ok(Self {
161 clock,
162 client_id,
163 config,
164 http_client,
165 ws_client,
166 is_connected: AtomicBool::new(false),
167 cancellation_token: CancellationToken::new(),
168 tasks: Vec::new(),
169 data_sender,
170 instrument_cache,
171 order_books: Arc::new(DashMap::new()),
172 last_quotes: Arc::new(DashMap::new()),
173 incomplete_bars: Arc::new(DashMap::new()),
174 bar_type_mappings: Arc::new(AtomicMap::new()),
175 active_quote_subs: Arc::new(AtomicSet::new()),
176 active_delta_subs: Arc::new(AtomicSet::new()),
177 active_trade_subs: Arc::new(AtomicSet::new()),
178 active_bar_subs: Arc::new(AtomicMap::new()),
179 active_mark_price_subs: Arc::new(AtomicSet::new()),
180 active_index_price_subs: Arc::new(AtomicSet::new()),
181 active_funding_rate_subs: Arc::new(AtomicSet::new()),
182 active_instrument_status_subs: Arc::new(AtomicSet::new()),
183 last_instrument_statuses: Arc::new(DashMap::new()),
184 })
185 }
186
187 #[must_use]
189 pub fn venue(&self) -> Venue {
190 *DYDX_VENUE
191 }
192
193 #[must_use]
195 pub fn config(&self) -> &DydxDataClientConfig {
196 &self.config
197 }
198
199 #[must_use]
201 pub fn is_connected(&self) -> bool {
202 self.is_connected.load(Ordering::Relaxed)
203 }
204
205 fn spawn_ws<F>(&self, fut: F, context: &'static str)
206 where
207 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
208 {
209 get_runtime().spawn(async move {
210 if let Err(e) = fut.await {
211 log::error!("{context}: {e:?}");
212 }
213 });
214 }
215
216 fn spawn_ws_stream_handler(
217 &mut self,
218 stream: impl Stream<Item = DydxWsOutputMessage> + Send + 'static,
219 ctx: WsMessageContext,
220 ) {
221 let cancellation = self.cancellation_token.clone();
222
223 let handle = get_runtime().spawn(async move {
224 log::debug!("Message processing task started");
225 pin_mut!(stream);
226
227 loop {
228 tokio::select! {
229 maybe_msg = stream.next() => {
230 match maybe_msg {
231 Some(msg) => Self::handle_ws_message(msg, &ctx),
232 None => {
233 log::debug!("WebSocket message channel closed");
234 break;
235 }
236 }
237 }
238 () = cancellation.cancelled() => {
239 log::debug!("WebSocket message task cancelled");
240 break;
241 }
242 }
243 }
244 log::debug!("WebSocket stream handler ended");
245 });
246
247 self.tasks.push(handle);
248 }
249
250 async fn await_tasks_with_timeout(&mut self, timeout: Duration) {
251 for handle in self.tasks.drain(..) {
252 let _ = tokio::time::timeout(timeout, handle).await;
253 }
254 }
255
256 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
257 self.http_client
258 .fetch_and_cache_instruments()
259 .await
260 .context("failed to load instruments from dYdX")?;
261
262 let instruments: Vec<InstrumentAny> = self.http_client.all_instruments();
263
264 if instruments.is_empty() {
265 log::warn!("No instruments were loaded");
266 return Ok(instruments);
267 }
268
269 log::info!("Loaded {} instruments into shared cache", instruments.len());
270
271 self.ws_client.cache_instruments(instruments.clone());
272
273 for instrument in &instruments {
274 if let Err(e) = self
275 .data_sender
276 .send(DataEvent::Instrument(instrument.clone()))
277 {
278 log::warn!("Failed to publish instrument {}: {e}", instrument.id());
279 }
280 }
281 log::debug!("Published {} instruments to data engine", instruments.len());
282
283 Ok(instruments)
284 }
285}
286
287#[async_trait::async_trait(?Send)]
288impl DataClient for DydxDataClient {
289 fn client_id(&self) -> ClientId {
290 self.client_id
291 }
292
293 fn venue(&self) -> Option<Venue> {
294 Some(*DYDX_VENUE)
295 }
296
297 fn start(&mut self) -> anyhow::Result<()> {
298 log::info!(
299 "Starting: client_id={}, is_testnet={}",
300 self.client_id,
301 self.http_client.is_testnet()
302 );
303 Ok(())
304 }
305
306 fn stop(&mut self) -> anyhow::Result<()> {
307 log::info!("Stopping {}", self.client_id);
308 self.cancellation_token.cancel();
309 self.is_connected.store(false, Ordering::Relaxed);
310 Ok(())
311 }
312
313 fn reset(&mut self) -> anyhow::Result<()> {
314 log::debug!("Resetting {}", self.client_id);
315 self.is_connected.store(false, Ordering::Relaxed);
316 self.cancellation_token = CancellationToken::new();
317 for handle in self.tasks.drain(..) {
319 handle.abort();
320 }
321 Ok(())
322 }
323
324 fn dispose(&mut self) -> anyhow::Result<()> {
325 log::debug!("Disposing {}", self.client_id);
326 self.stop()
327 }
328
329 async fn connect(&mut self) -> anyhow::Result<()> {
330 if self.is_connected() {
331 return Ok(());
332 }
333
334 log::info!("Connecting");
335
336 self.bootstrap_instruments().await?;
337
338 self.ws_client
339 .connect()
340 .await
341 .context("failed to connect dYdX websocket")?;
342
343 self.ws_client
344 .subscribe_markets()
345 .await
346 .context("failed to subscribe to markets channel")?;
347
348 let seen_tickers: Arc<AtomicSet<Ustr>> = Arc::new(AtomicSet::new());
349
350 for instrument in self.instrument_cache.all_instruments() {
351 let id = instrument.id();
352 let ticker = extract_raw_symbol(id.symbol.as_str());
353 seen_tickers.insert(Ustr::from(ticker));
354 }
355
356 let ctx = WsMessageContext {
357 clock: self.clock,
358 data_sender: self.data_sender.clone(),
359 instrument_cache: self.instrument_cache.clone(),
360 order_books: self.order_books.clone(),
361 last_quotes: self.last_quotes.clone(),
362 ws_client: self.ws_client.clone(),
363 http_client: self.http_client.clone(),
364 active_quote_subs: self.active_quote_subs.clone(),
365 active_delta_subs: self.active_delta_subs.clone(),
366 active_trade_subs: self.active_trade_subs.clone(),
367 active_bar_subs: self.active_bar_subs.clone(),
368 incomplete_bars: self.incomplete_bars.clone(),
369 bar_type_mappings: self.bar_type_mappings.clone(),
370 active_mark_price_subs: self.active_mark_price_subs.clone(),
371 active_index_price_subs: self.active_index_price_subs.clone(),
372 active_funding_rate_subs: self.active_funding_rate_subs.clone(),
373 active_instrument_status_subs: self.active_instrument_status_subs.clone(),
374 last_instrument_statuses: self.last_instrument_statuses.clone(),
375 bars_timestamp_on_close: self.ws_client.bars_timestamp_on_close(),
376 pending_bars: Arc::new(DashMap::new()),
377 seen_tickers,
378 };
379
380 let stream = self.ws_client.stream();
381 self.spawn_ws_stream_handler(stream, ctx);
382
383 self.is_connected.store(true, Ordering::Relaxed);
384 log::info!("Connected");
385
386 Ok(())
387 }
388
389 async fn disconnect(&mut self) -> anyhow::Result<()> {
390 if !self.is_connected() {
391 return Ok(());
392 }
393
394 log::info!("Disconnecting");
395
396 self.cancellation_token.cancel();
397
398 self.await_tasks_with_timeout(Duration::from_secs(5)).await;
399
400 self.ws_client
401 .disconnect()
402 .await
403 .context("failed to disconnect dYdX websocket")?;
404
405 self.last_instrument_statuses.clear();
406 self.is_connected.store(false, Ordering::Relaxed);
407 log::info!("Disconnected dYdX data client");
408
409 Ok(())
410 }
411
412 fn is_connected(&self) -> bool {
413 self.is_connected.load(Ordering::Relaxed)
414 }
415
416 fn is_disconnected(&self) -> bool {
417 !self.is_connected()
418 }
419
420 fn subscribe_instruments(&mut self, _cmd: SubscribeInstruments) -> anyhow::Result<()> {
421 log::debug!(
422 "subscribe_instruments: dYdX instruments discovered via global v4_markets channel"
423 );
424 Ok(())
425 }
426
427 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
428 if let Some(instrument) = self.instrument_cache.get(&cmd.instrument_id) {
429 log::debug!("Sending cached instrument for {}", cmd.instrument_id);
430 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
431 log::warn!("Failed to send instrument {}: {e}", cmd.instrument_id);
432 }
433 } else {
434 log::warn!(
435 "Instrument {} not found in cache (available: {})",
436 cmd.instrument_id,
437 self.instrument_cache.len()
438 );
439 }
440 Ok(())
441 }
442
443 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
444 if cmd.book_type != BookType::L2_MBP {
445 anyhow::bail!(
446 "dYdX only supports L2_MBP order book deltas, received {:?}",
447 cmd.book_type
448 );
449 }
450
451 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
452 self.active_delta_subs.insert(cmd.instrument_id);
453
454 let ws = self.ws_client.clone();
455 let instrument_id = cmd.instrument_id;
456
457 self.spawn_ws(
458 async move {
459 ws.subscribe_orderbook(instrument_id)
460 .await
461 .context("orderbook subscription")
462 },
463 "dYdX orderbook subscription",
464 );
465
466 Ok(())
467 }
468
469 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
470 log::debug!(
471 "Subscribe_quotes for {}: subscribing to orderbook WS channel for quote synthesis",
472 cmd.instrument_id
473 );
474
475 self.ensure_order_book(cmd.instrument_id, BookType::L2_MBP);
476 self.active_quote_subs.insert(cmd.instrument_id);
477 let ws = self.ws_client.clone();
478 let instrument_id = cmd.instrument_id;
479
480 self.spawn_ws(
481 async move {
482 ws.subscribe_orderbook(instrument_id)
483 .await
484 .context("orderbook subscription (for quotes)")
485 },
486 "dYdX orderbook subscription (quotes)",
487 );
488
489 Ok(())
490 }
491
492 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
493 let ws = self.ws_client.clone();
494 let instrument_id = cmd.instrument_id;
495
496 self.active_trade_subs.insert(instrument_id);
497
498 self.spawn_ws(
499 async move {
500 ws.subscribe_trades(instrument_id)
501 .await
502 .context("trade subscription")
503 },
504 "dYdX trade subscription",
505 );
506
507 Ok(())
508 }
509
510 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
511 let instrument_id = cmd.instrument_id;
512 self.active_mark_price_subs.insert(instrument_id);
513 log::info!("Subscribed to mark prices for {instrument_id} (via v4_markets channel)");
514 Ok(())
515 }
516
517 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
518 let instrument_id = cmd.instrument_id;
519 self.active_index_price_subs.insert(instrument_id);
520 log::info!("Subscribed to index prices for {instrument_id} (via v4_markets channel)");
521 Ok(())
522 }
523
524 fn subscribe_bars(&mut self, cmd: SubscribeBars) -> anyhow::Result<()> {
525 let ws = self.ws_client.clone();
526 let instrument_id = cmd.bar_type.instrument_id();
527 let spec = cmd.bar_type.spec();
528
529 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
530 let bar_type = cmd.bar_type;
531 self.active_bar_subs
532 .insert((instrument_id, resolution.to_string()), bar_type);
533
534 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
535 let topic = format!("{ticker}/{resolution}");
536 self.bar_type_mappings.insert(topic, bar_type);
537
538 self.spawn_ws(
539 async move {
540 ws.subscribe_candles(instrument_id, resolution)
541 .await
542 .context("candles subscription")
543 },
544 "dYdX candles subscription",
545 );
546
547 Ok(())
548 }
549
550 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
551 let instrument_id = cmd.instrument_id;
552 self.active_funding_rate_subs.insert(instrument_id);
553 log::info!("Subscribed to funding rates for {instrument_id} (via v4_markets channel)");
554 Ok(())
555 }
556
557 fn subscribe_instrument_status(
558 &mut self,
559 cmd: SubscribeInstrumentStatus,
560 ) -> anyhow::Result<()> {
561 let instrument_id = cmd.instrument_id;
562 self.active_instrument_status_subs.insert(instrument_id);
563 log::info!("Subscribed to instrument status for {instrument_id} (via v4_markets channel)");
564
565 if let Some(status) = self.last_instrument_statuses.get(&instrument_id)
567 && let Err(e) = self.data_sender.send(DataEvent::InstrumentStatus(*status))
568 {
569 log::error!("Failed to replay instrument status for {instrument_id}: {e}");
570 }
571
572 Ok(())
573 }
574
575 fn unsubscribe_instruments(&mut self, _cmd: &UnsubscribeInstruments) -> anyhow::Result<()> {
576 log::debug!("unsubscribe_instruments: dYdX markets channel is global; no-op");
577 Ok(())
578 }
579
580 fn unsubscribe_instrument(&mut self, _cmd: &UnsubscribeInstrument) -> anyhow::Result<()> {
581 log::debug!("unsubscribe_instrument: dYdX markets channel is global; no-op");
582 Ok(())
583 }
584
585 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
586 self.active_delta_subs.remove(&cmd.instrument_id);
587
588 let ws = self.ws_client.clone();
589 let instrument_id = cmd.instrument_id;
590
591 self.spawn_ws(
592 async move {
593 ws.unsubscribe_orderbook(instrument_id)
594 .await
595 .context("orderbook unsubscription")
596 },
597 "dYdX orderbook unsubscription",
598 );
599
600 Ok(())
601 }
602
603 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
604 log::debug!(
605 "unsubscribe_quotes for {}: removing quote subscription",
606 cmd.instrument_id
607 );
608
609 self.active_quote_subs.remove(&cmd.instrument_id);
610
611 let ws = self.ws_client.clone();
612 let instrument_id = cmd.instrument_id;
613
614 self.spawn_ws(
615 async move {
616 ws.unsubscribe_orderbook(instrument_id)
617 .await
618 .context("orderbook unsubscription (for quotes)")
619 },
620 "dYdX orderbook unsubscription (quotes)",
621 );
622
623 Ok(())
624 }
625
626 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
627 self.active_trade_subs.remove(&cmd.instrument_id);
628
629 let ws = self.ws_client.clone();
630 let instrument_id = cmd.instrument_id;
631
632 self.spawn_ws(
633 async move {
634 ws.unsubscribe_trades(instrument_id)
635 .await
636 .context("trade unsubscription")
637 },
638 "dYdX trade unsubscription",
639 );
640
641 Ok(())
642 }
643
644 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
645 self.active_mark_price_subs.remove(&cmd.instrument_id);
646 log::info!("Unsubscribed from mark prices for {}", cmd.instrument_id);
647 Ok(())
648 }
649
650 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
651 self.active_index_price_subs.remove(&cmd.instrument_id);
652 log::info!("Unsubscribed from index prices for {}", cmd.instrument_id);
653 Ok(())
654 }
655
656 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
657 let ws = self.ws_client.clone();
658 let instrument_id = cmd.bar_type.instrument_id();
659 let spec = cmd.bar_type.spec();
660
661 let resolution = Self::map_bar_spec_to_resolution(&spec)?;
662
663 self.active_bar_subs
664 .remove(&(instrument_id, resolution.to_string()));
665
666 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
667 let topic = format!("{ticker}/{resolution}");
668 self.bar_type_mappings.remove(&topic);
669
670 self.spawn_ws(
671 async move {
672 ws.unsubscribe_candles(instrument_id, resolution)
673 .await
674 .context("candles unsubscription")
675 },
676 "dYdX candles unsubscription",
677 );
678
679 Ok(())
680 }
681
682 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
683 self.active_funding_rate_subs.remove(&cmd.instrument_id);
684 log::info!("Unsubscribed from funding rates for {}", cmd.instrument_id);
685 Ok(())
686 }
687
688 fn unsubscribe_instrument_status(
689 &mut self,
690 cmd: &UnsubscribeInstrumentStatus,
691 ) -> anyhow::Result<()> {
692 self.active_instrument_status_subs
693 .remove(&cmd.instrument_id);
694 log::info!(
695 "Unsubscribed from instrument status for {}",
696 cmd.instrument_id
697 );
698 Ok(())
699 }
700
701 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
702 if request.start.is_some() {
703 log::warn!(
704 "Requesting instrument {} with specified `start` which has no effect",
705 request.instrument_id
706 );
707 }
708
709 if request.end.is_some() {
710 log::warn!(
711 "Requesting instrument {} with specified `end` which has no effect",
712 request.instrument_id
713 );
714 }
715
716 let instrument_cache = self.instrument_cache.clone();
717 let sender = self.data_sender.clone();
718 let http = self.http_client.clone();
719 let instrument_id = request.instrument_id;
720 let request_id = request.request_id;
721 let client_id = request.client_id.unwrap_or(self.client_id);
722 let start = request.start;
723 let end = request.end;
724 let params = request.params;
725 let clock = self.clock;
726 let start_nanos = datetime_to_unix_nanos(start);
727 let end_nanos = datetime_to_unix_nanos(end);
728
729 get_runtime().spawn(async move {
730 let instrument = match http.request_instruments(None, None, None).await {
731 Ok(instruments) => {
732 for inst in &instruments {
733 instrument_cache.insert_instrument_only(inst.clone());
734 }
735 instruments.into_iter().find(|i| i.id() == instrument_id)
736 }
737 Err(e) => {
738 log::error!("Failed to fetch instruments from dYdX: {e:?}");
739 None
740 }
741 };
742
743 if let Some(inst) = instrument {
744 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
745 request_id,
746 client_id,
747 instrument_id,
748 inst,
749 start_nanos,
750 end_nanos,
751 clock.get_time_ns(),
752 params,
753 )));
754
755 if let Err(e) = sender.send(DataEvent::Response(response)) {
756 log::error!("Failed to send instrument response: {e}");
757 }
758 } else {
759 log::error!("Instrument {instrument_id} not found");
760 }
761 });
762
763 Ok(())
764 }
765
766 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
767 let http = self.http_client.clone();
768 let sender = self.data_sender.clone();
769 let instrument_cache = self.instrument_cache.clone();
770 let request_id = request.request_id;
771 let client_id = request.client_id.unwrap_or(self.client_id);
772 let venue = self.venue();
773 let start = request.start;
774 let end = request.end;
775 let params = request.params;
776 let clock = self.clock;
777 let start_nanos = datetime_to_unix_nanos(start);
778 let end_nanos = datetime_to_unix_nanos(end);
779
780 get_runtime().spawn(async move {
781 match http.request_instruments(None, None, None).await {
782 Ok(instruments) => {
783 log::info!("Fetched {} instruments from dYdX", instruments.len());
784
785 for instrument in &instruments {
786 instrument_cache.insert_instrument_only(instrument.clone());
787 }
788
789 let response = DataResponse::Instruments(InstrumentsResponse::new(
790 request_id,
791 client_id,
792 venue,
793 instruments,
794 start_nanos,
795 end_nanos,
796 clock.get_time_ns(),
797 params,
798 ));
799
800 if let Err(e) = sender.send(DataEvent::Response(response)) {
801 log::error!("Failed to send instruments response: {e}");
802 }
803 }
804 Err(e) => {
805 log::error!("Failed to fetch instruments from dYdX: {e:?}");
806
807 let response = DataResponse::Instruments(InstrumentsResponse::new(
808 request_id,
809 client_id,
810 venue,
811 Vec::new(),
812 start_nanos,
813 end_nanos,
814 clock.get_time_ns(),
815 params,
816 ));
817
818 if let Err(e) = sender.send(DataEvent::Response(response)) {
819 log::error!("Failed to send empty instruments response: {e}");
820 }
821 }
822 }
823 });
824
825 Ok(())
826 }
827
828 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
829 let http_client = self.http_client.clone();
830 let sender = self.data_sender.clone();
831 let instrument_id = request.instrument_id;
832 let start = request.start;
833 let end = request.end;
834 let limit = request.limit.map(|n| n.get() as u32);
835 let request_id = request.request_id;
836 let client_id = request.client_id.unwrap_or(self.client_id);
837 let params = request.params;
838 let clock = self.clock;
839 let start_nanos = datetime_to_unix_nanos(start);
840 let end_nanos = datetime_to_unix_nanos(end);
841
842 get_runtime().spawn(async move {
843 match http_client
844 .request_trade_ticks(instrument_id, start, end, limit)
845 .await
846 .context("failed to request trades from dYdX")
847 {
848 Ok(trades) => {
849 let response = DataResponse::Trades(TradesResponse::new(
850 request_id,
851 client_id,
852 instrument_id,
853 trades,
854 start_nanos,
855 end_nanos,
856 clock.get_time_ns(),
857 params,
858 ));
859
860 if let Err(e) = sender.send(DataEvent::Response(response)) {
861 log::error!("Failed to send trades response: {e}");
862 }
863 }
864 Err(e) => log::error!("Trade request failed for {instrument_id}: {e:?}"),
865 }
866 });
867
868 Ok(())
869 }
870
871 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
872 let http_client = self.http_client.clone();
873 let sender = self.data_sender.clone();
874 let bar_type = request.bar_type;
875 let start = request.start;
876 let end = request.end;
877 let limit = request.limit.map(|n| n.get() as u32);
878 let request_id = request.request_id;
879 let client_id = request.client_id.unwrap_or(self.client_id);
880 let params = request.params;
881 let clock = self.clock;
882 let start_nanos = datetime_to_unix_nanos(start);
883 let end_nanos = datetime_to_unix_nanos(end);
884
885 get_runtime().spawn(async move {
886 match http_client
887 .request_bars(bar_type, start, end, limit, true)
888 .await
889 .context("failed to request bars from dYdX")
890 {
891 Ok(bars) => {
892 let response = DataResponse::Bars(BarsResponse::new(
893 request_id,
894 client_id,
895 bar_type,
896 bars,
897 start_nanos,
898 end_nanos,
899 clock.get_time_ns(),
900 params,
901 ));
902
903 if let Err(e) = sender.send(DataEvent::Response(response)) {
904 log::error!("Failed to send bars response: {e}");
905 }
906 }
907 Err(e) => log::error!("Bar request failed for {bar_type}: {e:?}"),
908 }
909 });
910
911 Ok(())
912 }
913
914 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
915 let http_client = self.http_client.clone();
916 let sender = self.data_sender.clone();
917 let instrument_id = request.instrument_id;
918 let start = request.start;
919 let end = request.end;
920 let limit = request.limit.map(|n| n.get() as u32);
921 let request_id = request.request_id;
922 let client_id = request.client_id.unwrap_or(self.client_id);
923 let params = request.params;
924 let clock = self.clock;
925 let start_nanos = datetime_to_unix_nanos(start);
926 let end_nanos = datetime_to_unix_nanos(end);
927
928 get_runtime().spawn(async move {
929 match http_client
930 .request_funding_rates(instrument_id, start, end, limit)
931 .await
932 .context("failed to request funding rates from dYdX")
933 {
934 Ok(funding_rates) => {
935 let response = DataResponse::FundingRates(FundingRatesResponse::new(
936 request_id,
937 client_id,
938 instrument_id,
939 funding_rates,
940 start_nanos,
941 end_nanos,
942 clock.get_time_ns(),
943 params,
944 ));
945
946 if let Err(e) = sender.send(DataEvent::Response(response)) {
947 log::error!("Failed to send funding rates response: {e}");
948 }
949 }
950 Err(e) => {
951 log::error!("Funding rates request failed for {instrument_id}: {e:?}");
952
953 let response = DataResponse::FundingRates(FundingRatesResponse::new(
954 request_id,
955 client_id,
956 instrument_id,
957 Vec::new(),
958 start_nanos,
959 end_nanos,
960 clock.get_time_ns(),
961 params,
962 ));
963
964 if let Err(e) = sender.send(DataEvent::Response(response)) {
965 log::error!("Failed to send empty funding rates response: {e}");
966 }
967 }
968 }
969 });
970
971 Ok(())
972 }
973}
974
975impl DydxDataClient {
976 #[must_use]
978 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
979 self.instrument_cache.get(instrument_id)
980 }
981
982 #[must_use]
984 pub fn get_instruments(&self) -> Vec<InstrumentAny> {
985 self.instrument_cache.all_instruments()
986 }
987
988 pub fn cache_instrument(&self, instrument: InstrumentAny) {
990 self.instrument_cache.insert_instrument_only(instrument);
991 }
992
993 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
997 self.instrument_cache.clear();
998 self.instrument_cache.insert_instruments_only(instruments);
999 }
1000
1001 fn ensure_order_book(&self, instrument_id: InstrumentId, book_type: BookType) {
1002 self.order_books
1003 .entry(instrument_id)
1004 .or_insert_with(|| OrderBook::new(instrument_id, book_type));
1005 }
1006
1007 #[must_use]
1009 pub fn get_bar_type_for_topic(&self, topic: &str) -> Option<BarType> {
1010 self.bar_type_mappings.load().get(topic).copied()
1011 }
1012
1013 #[must_use]
1015 pub fn get_bar_topics(&self) -> Vec<String> {
1016 self.bar_type_mappings.load().keys().cloned().collect()
1017 }
1018
1019 fn handle_ws_message(message: DydxWsOutputMessage, ctx: &WsMessageContext) {
1020 let ts_init = ctx.clock.get_time_ns();
1021
1022 match message {
1023 DydxWsOutputMessage::Trades { id, contents } => {
1024 let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
1025 log::warn!("No instrument cached for market {id}");
1026 return;
1027 };
1028 let instrument_id = instrument.id();
1029
1030 match ws_parse::parse_trade_ticks(instrument_id, &instrument, &contents, ts_init) {
1031 Ok(data) => {
1032 Self::handle_data_message(
1033 data,
1034 &ctx.data_sender,
1035 &ctx.incomplete_bars,
1036 ctx.clock,
1037 );
1038 }
1039 Err(e) => log::error!("Failed to parse trade ticks for {id}: {e}"),
1040 }
1041 }
1042 DydxWsOutputMessage::OrderbookSnapshot { id, contents } => {
1043 let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
1044 log::warn!("No instrument cached for market {id}");
1045 return;
1046 };
1047 let instrument_id = instrument.id();
1048
1049 match ws_parse::parse_orderbook_snapshot(
1050 &instrument_id,
1051 &contents,
1052 instrument.price_precision(),
1053 instrument.size_precision(),
1054 ts_init,
1055 ) {
1056 Ok(deltas) => {
1057 Self::handle_deltas_message(
1058 deltas,
1059 &ctx.data_sender,
1060 &ctx.order_books,
1061 &ctx.last_quotes,
1062 &ctx.instrument_cache,
1063 &ctx.active_quote_subs,
1064 &ctx.active_delta_subs,
1065 );
1066 }
1067 Err(e) => log::error!("Failed to parse orderbook snapshot for {id}: {e}"),
1068 }
1069 }
1070 DydxWsOutputMessage::OrderbookUpdate { id, contents } => {
1071 let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
1072 log::warn!("No instrument cached for market {id}");
1073 return;
1074 };
1075 let instrument_id = instrument.id();
1076
1077 match ws_parse::parse_orderbook_deltas(
1078 &instrument_id,
1079 &contents,
1080 instrument.price_precision(),
1081 instrument.size_precision(),
1082 ts_init,
1083 ) {
1084 Ok(deltas) => {
1085 Self::handle_deltas_message(
1086 deltas,
1087 &ctx.data_sender,
1088 &ctx.order_books,
1089 &ctx.last_quotes,
1090 &ctx.instrument_cache,
1091 &ctx.active_quote_subs,
1092 &ctx.active_delta_subs,
1093 );
1094 }
1095 Err(e) => log::error!("Failed to parse orderbook deltas for {id}: {e}"),
1096 }
1097 }
1098 DydxWsOutputMessage::OrderbookBatch { id, updates } => {
1099 let Some(instrument) = ctx.instrument_cache.get_by_market(&id) else {
1100 log::warn!("No instrument cached for market {id}");
1101 return;
1102 };
1103 let instrument_id = instrument.id();
1104 let price_precision = instrument.price_precision();
1105 let size_precision = instrument.size_precision();
1106
1107 let mut all_deltas = Vec::new();
1108 let last_idx = updates.len().saturating_sub(1);
1109
1110 for (i, update) in updates.iter().enumerate() {
1111 let is_last = i == last_idx;
1112 let result = if is_last {
1113 ws_parse::parse_orderbook_deltas(
1114 &instrument_id,
1115 update,
1116 price_precision,
1117 size_precision,
1118 ts_init,
1119 )
1120 .map(|d| d.deltas)
1121 } else {
1122 ws_parse::parse_orderbook_deltas_with_flag(
1123 &instrument_id,
1124 update,
1125 price_precision,
1126 size_precision,
1127 ts_init,
1128 false,
1129 )
1130 };
1131
1132 match result {
1133 Ok(deltas) => all_deltas.extend(deltas),
1134 Err(e) => {
1135 log::error!("Failed to parse orderbook batch delta {i} for {id}: {e}");
1136 return;
1137 }
1138 }
1139 }
1140
1141 if all_deltas.is_empty() {
1142 return;
1143 }
1144 let deltas = OrderBookDeltas::new(instrument_id, all_deltas);
1145 Self::handle_deltas_message(
1146 deltas,
1147 &ctx.data_sender,
1148 &ctx.order_books,
1149 &ctx.last_quotes,
1150 &ctx.instrument_cache,
1151 &ctx.active_quote_subs,
1152 &ctx.active_delta_subs,
1153 );
1154 }
1155 DydxWsOutputMessage::Candles { id, contents } => {
1156 let parts: Vec<&str> = id.splitn(2, '/').collect();
1157 if parts.len() != 2 {
1158 log::warn!("Unexpected candle topic format: {id}");
1159 return;
1160 }
1161 let ticker = parts[0];
1162
1163 let Some(bar_type) = ctx.bar_type_mappings.load().get(&id).copied() else {
1164 log::debug!("No bar type mapping for candle topic {id}");
1165 return;
1166 };
1167
1168 let Some(instrument) = ctx.instrument_cache.get_by_market(ticker) else {
1169 log::warn!("No instrument cached for market {ticker}");
1170 return;
1171 };
1172
1173 match ws_parse::parse_candle_bar(
1174 bar_type,
1175 &instrument,
1176 &contents,
1177 ctx.bars_timestamp_on_close,
1178 ts_init,
1179 ) {
1180 Ok(bar) => {
1181 let prev = ctx.pending_bars.get(&id).map(|r| *r);
1182 if let Some(prev_bar) = prev
1183 && bar.ts_event != prev_bar.ts_event
1184 {
1185 Self::emit_bar_guarded(prev_bar, ctx);
1186 }
1187 ctx.pending_bars.insert(id, bar);
1188 }
1189 Err(e) => log::error!("Failed to parse candle bar for {id}: {e}"),
1190 }
1191 }
1192 DydxWsOutputMessage::Markets(contents) => {
1193 Self::handle_markets_message(&contents, ctx, ts_init);
1194 }
1195 DydxWsOutputMessage::SubaccountSubscribed(_) => {
1196 log::debug!("Ignoring subaccount subscribed on data client");
1197 }
1198 DydxWsOutputMessage::SubaccountsChannelData(_) => {
1199 log::debug!("Ignoring subaccounts channel data on data client");
1200 }
1201 DydxWsOutputMessage::BlockHeight { .. } => {
1202 log::debug!("Ignoring block height on data client");
1203 }
1204 DydxWsOutputMessage::Error(err) => {
1205 log::error!("dYdX WS error: {err}");
1206 }
1207 DydxWsOutputMessage::Reconnected => {
1208 log::info!("dYdX WS reconnected, re-subscribing to active subscriptions");
1209 ctx.pending_bars.clear();
1210
1211 let total_subs = ctx.active_quote_subs.len()
1212 + ctx.active_delta_subs.len()
1213 + ctx.active_trade_subs.len()
1214 + ctx.active_bar_subs.len();
1215
1216 if total_subs == 0 {
1217 log::debug!("No active subscriptions to restore");
1218 return;
1219 }
1220
1221 log::info!(
1222 "Restoring {} subscriptions (quotes={}, deltas={}, trades={}, bars={})",
1223 total_subs,
1224 ctx.active_quote_subs.len(),
1225 ctx.active_delta_subs.len(),
1226 ctx.active_trade_subs.len(),
1227 ctx.active_bar_subs.len()
1228 );
1229
1230 for instrument_id in ctx.active_quote_subs.load().iter().copied() {
1231 let ws_clone = ctx.ws_client.clone();
1232 get_runtime().spawn(async move {
1233 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1234 log::error!(
1235 "Failed to re-subscribe to orderbook (quotes) for {instrument_id}: {e:?}"
1236 );
1237 } else {
1238 log::debug!("Re-subscribed to orderbook (quotes) for {instrument_id}");
1239 }
1240 });
1241 }
1242
1243 for instrument_id in ctx.active_delta_subs.load().iter().copied() {
1244 let ws_clone = ctx.ws_client.clone();
1245 get_runtime().spawn(async move {
1246 if let Err(e) = ws_clone.subscribe_orderbook(instrument_id).await {
1247 log::error!(
1248 "Failed to re-subscribe to orderbook (deltas) for {instrument_id}: {e:?}"
1249 );
1250 } else {
1251 log::debug!("Re-subscribed to orderbook (deltas) for {instrument_id}");
1252 }
1253 });
1254 }
1255
1256 for instrument_id in ctx.active_trade_subs.load().iter().copied() {
1257 let ws_clone = ctx.ws_client.clone();
1258 get_runtime().spawn(async move {
1259 if let Err(e) = ws_clone.subscribe_trades(instrument_id).await {
1260 log::error!(
1261 "Failed to re-subscribe to trades for {instrument_id}: {e:?}"
1262 );
1263 } else {
1264 log::debug!("Re-subscribed to trades for {instrument_id}");
1265 }
1266 });
1267 }
1268
1269 for ((instrument_id, resolution), _) in ctx.active_bar_subs.load().iter() {
1270 let instrument_id = *instrument_id;
1271 let resolution = resolution.clone();
1272 let ws_clone = ctx.ws_client.clone();
1273
1274 get_runtime().spawn(async move {
1275 if let Err(e) =
1276 ws_clone.subscribe_candles(instrument_id, &resolution).await
1277 {
1278 log::error!(
1279 "Failed to re-subscribe to candles for {instrument_id} ({resolution}): {e:?}"
1280 );
1281 } else {
1282 log::debug!(
1283 "Re-subscribed to candles for {instrument_id} ({resolution})"
1284 );
1285 }
1286 });
1287 }
1288
1289 log::info!("Completed re-subscription requests after reconnection");
1290 }
1291 }
1292 }
1293
1294 fn instrument_id_from_ticker(ticker: &str) -> InstrumentId {
1295 let symbol = format!("{ticker}-PERP");
1296 InstrumentId::new(Symbol::new(&symbol), *DYDX_VENUE)
1297 }
1298
1299 fn handle_markets_message(
1300 contents: &crate::websocket::messages::DydxMarketsContents,
1301 ctx: &WsMessageContext,
1302 ts_init: nautilus_core::UnixNanos,
1303 ) {
1304 if let Some(ref oracle_prices) = contents.oracle_prices {
1305 for (ticker, oracle_data) in oracle_prices {
1306 let instrument_id = Self::instrument_id_from_ticker(ticker);
1307
1308 let Ok(price) = parse_price(&oracle_data.oracle_price, "oracle_price") else {
1309 log::warn!("Failed to parse oracle price for {ticker}");
1310 continue;
1311 };
1312
1313 if ctx.active_mark_price_subs.contains(&instrument_id) {
1314 let mark_price = MarkPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1315 let data = NautilusData::MarkPriceUpdate(mark_price);
1316 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1317 log::error!("Failed to emit mark price for {instrument_id}: {e}");
1318 }
1319 }
1320
1321 if ctx.active_index_price_subs.contains(&instrument_id) {
1322 let index_price = IndexPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1323 let data = NautilusData::IndexPriceUpdate(index_price);
1324 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1325 log::error!("Failed to emit index price for {instrument_id}: {e}");
1326 }
1327 }
1328 }
1329 }
1330
1331 Self::handle_markets_trading_data(contents.trading.as_ref(), ctx, ts_init, false);
1332 Self::handle_markets_trading_data(contents.markets.as_ref(), ctx, ts_init, true);
1333 }
1334
1335 fn handle_markets_trading_data(
1336 trading: Option<
1337 &std::collections::HashMap<String, crate::websocket::messages::DydxMarketTradingUpdate>,
1338 >,
1339 ctx: &WsMessageContext,
1340 ts_init: nautilus_core::UnixNanos,
1341 is_snapshot: bool,
1342 ) {
1343 let Some(trading_map) = trading else {
1344 return;
1345 };
1346
1347 for (ticker, update) in trading_map {
1348 let instrument_id = Self::instrument_id_from_ticker(ticker);
1349
1350 if let Some(status) = &update.status {
1351 let action = MarketStatusAction::from(*status);
1352 let is_trading = matches!(status, crate::common::enums::DydxMarketStatus::Active);
1353
1354 let instrument_status = InstrumentStatus::new(
1355 instrument_id,
1356 action,
1357 ts_init,
1358 ts_init,
1359 None,
1360 None,
1361 Some(is_trading),
1362 None,
1363 None,
1364 );
1365
1366 ctx.last_instrument_statuses
1367 .insert(instrument_id, instrument_status);
1368
1369 if ctx.active_instrument_status_subs.contains(&instrument_id)
1370 && let Err(e) = ctx
1371 .data_sender
1372 .send(DataEvent::InstrumentStatus(instrument_status))
1373 {
1374 log::error!("Failed to emit instrument status for {instrument_id}: {e}");
1375 }
1376 }
1377
1378 let ticker_ustr = Ustr::from(ticker.as_str());
1379 if !ctx.seen_tickers.contains(&ticker_ustr) {
1380 let is_active = update
1381 .status
1382 .as_ref()
1383 .is_none_or(|s| matches!(s, crate::common::enums::DydxMarketStatus::Active));
1384 if ctx.instrument_cache.get_by_market(ticker).is_some() {
1385 ctx.seen_tickers.insert(ticker_ustr);
1386 } else if is_active {
1387 ctx.seen_tickers.insert(ticker_ustr);
1388 Self::handle_new_instrument_discovered(ticker, ctx);
1389 }
1390 }
1391
1392 if let Some(ref rate_str) = update.next_funding_rate {
1393 if let Ok(rate) = Decimal::from_str(rate_str) {
1394 if ctx.active_funding_rate_subs.contains(&instrument_id) {
1395 let funding_rate = FundingRateUpdate {
1396 instrument_id,
1397 rate,
1398 interval: Some(60),
1399 next_funding_ns: None,
1400 ts_event: ts_init,
1401 ts_init,
1402 };
1403
1404 if let Err(e) = ctx.data_sender.send(DataEvent::FundingRate(funding_rate)) {
1405 log::error!("Failed to emit funding rate for {instrument_id}: {e}");
1406 }
1407 }
1408 } else {
1409 log::warn!("Failed to parse next_funding_rate for {ticker}: {rate_str}");
1410 }
1411 }
1412
1413 if is_snapshot
1414 && let Some(ref oracle_price_str) = update.oracle_price
1415 && let Ok(price) = parse_price(oracle_price_str, "oracle_price")
1416 {
1417 if ctx.active_mark_price_subs.contains(&instrument_id) {
1418 let mark_price = MarkPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1419 let data = NautilusData::MarkPriceUpdate(mark_price);
1420
1421 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1422 log::error!("Failed to emit mark price for {instrument_id}: {e}");
1423 }
1424 }
1425
1426 if ctx.active_index_price_subs.contains(&instrument_id) {
1427 let index_price = IndexPriceUpdate::new(instrument_id, price, ts_init, ts_init);
1428 let data = NautilusData::IndexPriceUpdate(index_price);
1429
1430 if let Err(e) = ctx.data_sender.send(DataEvent::Data(data)) {
1431 log::error!("Failed to emit index price for {instrument_id}: {e}");
1432 }
1433 }
1434 }
1435 }
1436 }
1437
1438 fn emit_bar_guarded(bar: Bar, ctx: &WsMessageContext) {
1439 let current_time_ns = ctx.clock.get_time_ns();
1440 if bar.ts_event <= current_time_ns {
1441 ctx.incomplete_bars.remove(&bar.bar_type);
1442 if let Err(e) = ctx
1443 .data_sender
1444 .send(DataEvent::Data(NautilusData::Bar(bar)))
1445 {
1446 log::error!("Failed to emit completed bar: {e}");
1447 }
1448 } else {
1449 ctx.incomplete_bars.insert(bar.bar_type, bar);
1450 }
1451 }
1452
1453 fn handle_new_instrument_discovered(ticker: &str, ctx: &WsMessageContext) {
1454 log::info!("New instrument discovered via WebSocket: {ticker}");
1455
1456 let http_client = ctx.http_client.clone();
1457 let ws_client = ctx.ws_client.clone();
1458 let data_sender = ctx.data_sender.clone();
1459 let ticker = ticker.to_string();
1460
1461 get_runtime().spawn(async move {
1462 match http_client.fetch_and_cache_single_instrument(&ticker).await {
1463 Ok(Some(instrument)) => {
1464 ws_client.cache_instrument(instrument.clone());
1465 if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
1466 log::error!("Failed to emit new instrument: {e}");
1467 }
1468 log::info!("Fetched and cached new instrument: {ticker}");
1469 }
1470 Ok(None) => {
1471 log::warn!("New instrument {ticker} not found or inactive");
1472 }
1473 Err(e) => {
1474 log::error!("Failed to fetch new instrument {ticker}: {e}");
1475 }
1476 }
1477 });
1478 }
1479
1480 fn handle_data_message(
1481 payloads: Vec<NautilusData>,
1482 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1483 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1484 clock: &'static AtomicTime,
1485 ) {
1486 for data in payloads {
1487 if let NautilusData::Bar(bar) = data {
1489 Self::handle_bar_message(bar, data_sender, incomplete_bars, clock);
1490 } else if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1491 log::error!("Failed to emit data event: {e}");
1492 }
1493 }
1494 }
1495
1496 fn handle_bar_message(
1497 bar: Bar,
1498 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1499 incomplete_bars: &Arc<DashMap<BarType, Bar>>,
1500 clock: &'static AtomicTime,
1501 ) {
1502 let current_time_ns = clock.get_time_ns();
1503 let bar_type = bar.bar_type;
1504
1505 if bar.ts_event <= current_time_ns {
1506 incomplete_bars.remove(&bar_type);
1508
1509 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Bar(bar))) {
1510 log::error!("Failed to emit completed bar: {e}");
1511 }
1512 } else {
1513 log::trace!(
1515 "Caching incomplete bar for {} (ts_event={}, current={})",
1516 bar_type,
1517 bar.ts_event,
1518 current_time_ns
1519 );
1520 incomplete_bars.insert(bar_type, bar);
1521 }
1522 }
1523
1524 fn resolve_crossed_order_book(
1525 book: &mut OrderBook,
1526 venue_deltas: &OrderBookDeltas,
1527 instrument: &InstrumentAny,
1528 ) -> anyhow::Result<OrderBookDeltas> {
1529 let instrument_id = venue_deltas.instrument_id;
1530 let ts_init = venue_deltas.ts_init;
1531 let mut all_deltas = venue_deltas.deltas.clone();
1532
1533 let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
1537 let is_snapshot_batch = venue_deltas
1538 .deltas
1539 .iter()
1540 .any(|d| d.flags & snapshot_flag != 0);
1541 let synthetic_flags = if is_snapshot_batch { snapshot_flag } else { 0 };
1542
1543 book.apply_deltas(venue_deltas)?;
1545
1546 let mut is_crossed = if let (Some(bid_price), Some(ask_price)) =
1548 (book.best_bid_price(), book.best_ask_price())
1549 {
1550 bid_price >= ask_price
1551 } else {
1552 false
1553 };
1554
1555 while is_crossed {
1557 log::debug!(
1558 "Resolving crossed order book for {}: bid={:?} >= ask={:?}",
1559 instrument_id,
1560 book.best_bid_price(),
1561 book.best_ask_price()
1562 );
1563
1564 let bid_price = match book.best_bid_price() {
1565 Some(p) => p,
1566 None => break,
1567 };
1568 let ask_price = match book.best_ask_price() {
1569 Some(p) => p,
1570 None => break,
1571 };
1572 let bid_size = match book.best_bid_size() {
1573 Some(s) => s,
1574 None => break,
1575 };
1576 let ask_size = match book.best_ask_size() {
1577 Some(s) => s,
1578 None => break,
1579 };
1580
1581 let mut temp_deltas = Vec::new();
1582
1583 if bid_size > ask_size {
1584 let new_bid_size = Quantity::from_decimal_dp(
1586 bid_size.as_decimal() - ask_size.as_decimal(),
1587 instrument.size_precision(),
1588 )?;
1589 temp_deltas.push(OrderBookDelta::new(
1590 instrument_id,
1591 BookAction::Update,
1592 BookOrder::new(OrderSide::Buy, bid_price, new_bid_size, 0),
1593 synthetic_flags,
1594 0,
1595 ts_init,
1596 ts_init,
1597 ));
1598 temp_deltas.push(OrderBookDelta::new(
1599 instrument_id,
1600 BookAction::Delete,
1601 BookOrder::new(
1602 OrderSide::Sell,
1603 ask_price,
1604 Quantity::zero(instrument.size_precision()),
1605 0,
1606 ),
1607 synthetic_flags,
1608 0,
1609 ts_init,
1610 ts_init,
1611 ));
1612 } else if bid_size < ask_size {
1613 let new_ask_size = Quantity::from_decimal_dp(
1615 ask_size.as_decimal() - bid_size.as_decimal(),
1616 instrument.size_precision(),
1617 )?;
1618 temp_deltas.push(OrderBookDelta::new(
1619 instrument_id,
1620 BookAction::Update,
1621 BookOrder::new(OrderSide::Sell, ask_price, new_ask_size, 0),
1622 synthetic_flags,
1623 0,
1624 ts_init,
1625 ts_init,
1626 ));
1627 temp_deltas.push(OrderBookDelta::new(
1628 instrument_id,
1629 BookAction::Delete,
1630 BookOrder::new(
1631 OrderSide::Buy,
1632 bid_price,
1633 Quantity::zero(instrument.size_precision()),
1634 0,
1635 ),
1636 synthetic_flags,
1637 0,
1638 ts_init,
1639 ts_init,
1640 ));
1641 } else {
1642 temp_deltas.push(OrderBookDelta::new(
1644 instrument_id,
1645 BookAction::Delete,
1646 BookOrder::new(
1647 OrderSide::Buy,
1648 bid_price,
1649 Quantity::zero(instrument.size_precision()),
1650 0,
1651 ),
1652 synthetic_flags,
1653 0,
1654 ts_init,
1655 ts_init,
1656 ));
1657 temp_deltas.push(OrderBookDelta::new(
1658 instrument_id,
1659 BookAction::Delete,
1660 BookOrder::new(
1661 OrderSide::Sell,
1662 ask_price,
1663 Quantity::zero(instrument.size_precision()),
1664 0,
1665 ),
1666 synthetic_flags,
1667 0,
1668 ts_init,
1669 ts_init,
1670 ));
1671 }
1672
1673 let temp_deltas_obj = OrderBookDeltas::new(instrument_id, temp_deltas.clone());
1675 book.apply_deltas(&temp_deltas_obj)?;
1676 all_deltas.extend(temp_deltas);
1677
1678 is_crossed = if let (Some(bid_price), Some(ask_price)) =
1680 (book.best_bid_price(), book.best_ask_price())
1681 {
1682 bid_price >= ask_price
1683 } else {
1684 false
1685 };
1686 }
1687
1688 if let Some(last_delta) = all_deltas.last_mut() {
1691 last_delta.flags = synthetic_flags | RecordFlag::F_LAST as u8;
1692 }
1693
1694 Ok(OrderBookDeltas::new(instrument_id, all_deltas))
1695 }
1696
1697 fn handle_deltas_message(
1698 deltas: OrderBookDeltas,
1699 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1700 order_books: &Arc<DashMap<InstrumentId, OrderBook>>,
1701 last_quotes: &Arc<DashMap<InstrumentId, QuoteTick>>,
1702 instrument_cache: &Arc<InstrumentCache>,
1703 active_quote_subs: &Arc<AtomicSet<InstrumentId>>,
1704 active_delta_subs: &Arc<AtomicSet<InstrumentId>>,
1705 ) {
1706 let instrument_id = deltas.instrument_id;
1707
1708 let instrument = match instrument_cache.get(&instrument_id) {
1710 Some(inst) => inst,
1711 None => {
1712 log::error!("Cannot resolve crossed order book: no instrument for {instrument_id}");
1713 if active_delta_subs.contains(&instrument_id)
1715 && let Err(e) = data_sender.send(DataEvent::Data(NautilusData::from(
1716 OrderBookDeltas_API::new(deltas),
1717 )))
1718 {
1719 log::error!("Failed to emit order book deltas: {e}");
1720 }
1721 return;
1722 }
1723 };
1724
1725 let mut book = order_books
1727 .entry(instrument_id)
1728 .or_insert_with(|| OrderBook::new(instrument_id, BookType::L2_MBP));
1729
1730 let resolved_deltas =
1732 match Self::resolve_crossed_order_book(&mut book, &deltas, &instrument) {
1733 Ok(d) => d,
1734 Err(e) => {
1735 log::error!("Failed to resolve crossed order book for {instrument_id}: {e}");
1736 return;
1737 }
1738 };
1739
1740 if active_quote_subs.contains(&instrument_id) {
1742 let quote_opt = if let (Some(bid_price), Some(ask_price)) =
1745 (book.best_bid_price(), book.best_ask_price())
1746 && let (Some(bid_size), Some(ask_size)) =
1747 (book.best_bid_size(), book.best_ask_size())
1748 {
1749 Some(QuoteTick::new(
1750 instrument_id,
1751 bid_price,
1752 ask_price,
1753 bid_size,
1754 ask_size,
1755 resolved_deltas.ts_event,
1756 resolved_deltas.ts_init,
1757 ))
1758 } else {
1759 if book.best_bid_price().is_none() && book.best_ask_price().is_none() {
1761 log::debug!(
1762 "Empty orderbook for {instrument_id} after applying deltas, using last quote"
1763 );
1764 last_quotes.get(&instrument_id).map(|q| *q)
1765 } else {
1766 None
1767 }
1768 };
1769
1770 if let Some(quote) = quote_opt {
1771 let emit_quote = !matches!(
1773 last_quotes.get(&instrument_id),
1774 Some(existing) if *existing == quote
1775 );
1776
1777 if emit_quote {
1778 last_quotes.insert(instrument_id, quote);
1779 if let Err(e) = data_sender.send(DataEvent::Data(NautilusData::Quote(quote))) {
1780 log::error!("Failed to emit quote tick: {e}");
1781 }
1782 }
1783 } else if book.best_bid_price().is_some() || book.best_ask_price().is_some() {
1784 log::debug!(
1786 "Incomplete top-of-book for {instrument_id} (bid={:?}, ask={:?})",
1787 book.best_bid_price(),
1788 book.best_ask_price()
1789 );
1790 }
1791 }
1792
1793 if active_delta_subs.contains(&instrument_id) {
1795 let data: NautilusData = OrderBookDeltas_API::new(resolved_deltas).into();
1796 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
1797 log::error!("Failed to emit order book deltas event: {e}");
1798 }
1799 }
1800 }
1801}
1802
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        data::{BookOrder, OrderBookDelta, OrderBookDeltas},
        enums::{BookAction, BookType, OrderSide, RecordFlag},
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::{CryptoPerpetual, InstrumentAny},
        orderbook::OrderBook,
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use rust_decimal_macros::dec;

    use super::*;

    /// Builds the `BTC-USD-PERP.DYDX` perpetual used by every test in this module.
    fn test_instrument() -> InstrumentAny {
        let instrument_id = InstrumentId::new(Symbol::new("BTC-USD-PERP"), Venue::new("DYDX"));
        let perpetual = CryptoPerpetual::new(
            instrument_id,
            instrument_id.symbol,
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            2,
            8,
            Price::new(0.01, 2),
            Quantity::new(0.00000001, 8),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Builds a single `Add` delta with a 2-dp price and an 8-dp size.
    fn add_level_delta(
        instrument_id: InstrumentId,
        side: OrderSide,
        price: f64,
        size: f64,
        flags: u8,
    ) -> OrderBookDelta {
        let ts = UnixNanos::default();
        OrderBookDelta::new(
            instrument_id,
            BookAction::Add,
            BookOrder::new(side, Price::new(price, 2), Quantity::new(size, 8), 0),
            flags,
            0,
            ts,
            ts,
        )
    }

    /// Returns an L2 book seeded with a clear followed by the given
    /// `(price, size)` bid and ask levels; the final delta carries `F_LAST`.
    fn seed_book_with_levels(
        instrument_id: InstrumentId,
        bids: &[(f64, f64)],
        asks: &[(f64, f64)],
    ) -> OrderBook {
        let ts = UnixNanos::default();

        let buys = bids
            .iter()
            .map(|&(price, size)| add_level_delta(instrument_id, OrderSide::Buy, price, size, 0));
        let sells = asks
            .iter()
            .map(|&(price, size)| add_level_delta(instrument_id, OrderSide::Sell, price, size, 0));

        let mut deltas: Vec<OrderBookDelta> =
            std::iter::once(OrderBookDelta::clear(instrument_id, 0, ts, ts))
                .chain(buys)
                .chain(sells)
                .collect();

        if let Some(terminator) = deltas.last_mut() {
            terminator.flags = RecordFlag::F_LAST as u8;
        }

        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
        book.apply_deltas(&OrderBookDeltas::new(instrument_id, deltas))
            .expect("failed to apply seed deltas");
        book
    }

    /// A one-delta incremental batch adding a bid, flagged `F_LAST`.
    fn crossing_bid_deltas(
        instrument_id: InstrumentId,
        bid_price: f64,
        bid_size: f64,
    ) -> OrderBookDeltas {
        let delta = add_level_delta(
            instrument_id,
            OrderSide::Buy,
            bid_price,
            bid_size,
            RecordFlag::F_LAST as u8,
        );
        OrderBookDeltas::new(instrument_id, vec![delta])
    }

    /// A one-delta snapshot batch adding a bid, flagged `F_SNAPSHOT | F_LAST`.
    fn crossing_snapshot_batch(
        instrument_id: InstrumentId,
        bid_price: f64,
        bid_size: f64,
    ) -> OrderBookDeltas {
        let flags = RecordFlag::F_SNAPSHOT as u8 | RecordFlag::F_LAST as u8;
        let delta = add_level_delta(instrument_id, OrderSide::Buy, bid_price, bid_size, flags);
        OrderBookDeltas::new(instrument_id, vec![delta])
    }

    /// A bid of 1.00000001 crossing an ask of 0.5 must leave exactly 0.50000001
    /// on the bid, with the batch terminated by a plain `F_LAST` delta.
    #[rstest]
    fn test_resolve_crossed_order_book_preserves_decimal_precision() {
        let instrument = test_instrument();
        let instrument_id = instrument.id();
        let mut book = seed_book_with_levels(
            instrument_id,
            &[(99.00, 1.00000000)],
            &[(100.05, 0.50000000)],
        );
        let venue_deltas = crossing_bid_deltas(instrument_id, 100.10, 1.00000001);

        let resolved =
            DydxDataClient::resolve_crossed_order_book(&mut book, &venue_deltas, &instrument)
                .expect("resolution should succeed");

        let is_netted_bid = |d: &&OrderBookDelta| {
            d.action == BookAction::Update
                && d.order.side == OrderSide::Buy
                && d.order.price.as_decimal() == dec!(100.10)
        };
        let update = resolved
            .deltas
            .iter()
            .find(is_netted_bid)
            .expect("expected a Buy Update delta from crossed-book resolution");
        assert_eq!(update.order.size.as_decimal(), dec!(0.50000001));

        assert_eq!(
            resolved.deltas.last().unwrap().flags,
            RecordFlag::F_LAST as u8,
        );

        // Only meaningful when both sides still have a level.
        if let (Some(bid), Some(ask)) = (book.best_bid_price(), book.best_ask_price()) {
            assert!(bid < ask, "book still crossed: bid={bid:?} ask={ask:?}");
        }
    }

    /// Every delta of a resolved snapshot batch keeps `F_SNAPSHOT`, and the
    /// final one is `F_SNAPSHOT | F_LAST`.
    #[rstest]
    fn test_resolve_crossed_order_book_preserves_snapshot_flags() {
        let instrument = test_instrument();
        let instrument_id = instrument.id();
        let mut book = seed_book_with_levels(
            instrument_id,
            &[(99.00, 1.00000000)],
            &[(100.05, 0.50000000)],
        );
        let venue_deltas = crossing_snapshot_batch(instrument_id, 100.10, 1.00000001);

        let resolved =
            DydxDataClient::resolve_crossed_order_book(&mut book, &venue_deltas, &instrument)
                .expect("resolution should succeed");

        let snapshot = RecordFlag::F_SNAPSHOT as u8;
        let last = RecordFlag::F_LAST as u8;

        for (idx, delta) in resolved.deltas.iter().enumerate() {
            assert!(
                delta.flags & snapshot != 0,
                "delta at index {idx} lost F_SNAPSHOT: flags={:#010b}",
                delta.flags,
            );
        }

        let terminator = resolved.deltas.last().unwrap();
        assert_eq!(
            terminator.flags,
            snapshot | last,
            "snapshot terminator must be F_SNAPSHOT | F_LAST",
        );
    }

    /// Equal bid and ask sizes at the crossed levels produce two deletes, one
    /// per crossed price.
    #[rstest]
    fn test_resolve_crossed_order_book_equal_sizes_removes_both_levels() {
        let instrument = test_instrument();
        let instrument_id = instrument.id();
        let mut book = seed_book_with_levels(
            instrument_id,
            &[(99.00, 1.00000000)],
            &[(100.05, 1.00000000)],
        );
        let venue_deltas = crossing_bid_deltas(instrument_id, 100.10, 1.00000000);

        let resolved =
            DydxDataClient::resolve_crossed_order_book(&mut book, &venue_deltas, &instrument)
                .expect("resolution should succeed");

        let crossed_prices = [dec!(100.10), dec!(100.05)];
        let deletes_count = resolved
            .deltas
            .iter()
            .filter(|d| d.action == BookAction::Delete)
            .filter(|d| crossed_prices.contains(&d.order.price.as_decimal()))
            .count();
        assert_eq!(deletes_count, 2);
    }
}