1use std::{
54 collections::HashMap,
55 fmt::Debug,
56 num::NonZeroU32,
57 sync::{Arc, LazyLock},
58};
59
60use chrono::{DateTime, Utc};
61use nautilus_core::{
62 UnixNanos,
63 consts::NAUTILUS_USER_AGENT,
64 string::urlencoding,
65 time::{AtomicTime, get_atomic_clock_realtime},
66};
67use nautilus_model::{
68 data::{
69 Bar, BarType, BookOrder, FundingRateUpdate, OrderBookDelta, OrderBookDeltas, TradeTick,
70 },
71 enums::{
72 AggregationSource, BarAggregation, BookAction, OrderSide as NautilusOrderSide, PriceType,
73 RecordFlag,
74 },
75 events::AccountState,
76 identifiers::{AccountId, InstrumentId},
77 instruments::{Instrument, InstrumentAny},
78 reports::{FillReport, OrderStatusReport, PositionStatusReport},
79 types::{Price, Quantity},
80};
81use nautilus_network::{
82 http::{HttpClient, Method, USER_AGENT},
83 ratelimiter::quota::Quota,
84 retry::{RetryConfig, RetryManager},
85};
86use rust_decimal::Decimal;
87use serde::{Deserialize, Serialize, de::DeserializeOwned};
88use tokio_util::sync::CancellationToken;
89
90use super::error::DydxHttpError;
91use crate::{
92 common::{
93 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
94 enums::{DydxCandleResolution, DydxNetwork},
95 instrument_cache::InstrumentCache,
96 parse::extract_raw_symbol,
97 },
98 http::parse::{parse_account_state_from_http, parse_instrument_any},
99};
100
/// Number of candles requested per REST call while paginating a bar range, and
/// the default request limit when the caller supplies none.
const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;

/// Path (relative to the base URL) of the perpetual markets listing endpoint.
const ENDPOINT_PERPETUAL_MARKETS: &str = "/v4/perpetualMarkets";
106
107fn bar_type_to_resolution(bar_type: &BarType) -> anyhow::Result<DydxCandleResolution> {
108 if bar_type.aggregation_source() != AggregationSource::External {
109 anyhow::bail!(
110 "dYdX only supports EXTERNAL aggregation, was {:?}",
111 bar_type.aggregation_source()
112 );
113 }
114
115 let spec = bar_type.spec();
116 if spec.price_type != PriceType::Last {
117 anyhow::bail!(
118 "dYdX only supports LAST price type, was {:?}",
119 spec.price_type
120 );
121 }
122
123 DydxCandleResolution::from_bar_spec(&spec)
124}
125
126pub static DYDX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
132 Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
133});
134
135#[derive(Debug, Serialize, Deserialize)]
140pub struct DydxResponse<T> {
141 pub data: T,
143}
144
145pub struct DydxRawHttpClient {
155 base_url: String,
156 client: HttpClient,
157 retry_manager: RetryManager<DydxHttpError>,
158 cancellation_token: CancellationToken,
159 network: DydxNetwork,
160}
161
162impl Default for DydxRawHttpClient {
163 fn default() -> Self {
164 Self::new(None, 60, None, DydxNetwork::Mainnet, None)
165 .expect("Failed to create default DydxRawHttpClient")
166 }
167}
168
169impl Debug for DydxRawHttpClient {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct(stringify!(DydxRawHttpClient))
172 .field("base_url", &self.base_url)
173 .field("network", &self.network)
174 .finish_non_exhaustive()
175 }
176}
177
178impl DydxRawHttpClient {
179 pub fn cancel_all_requests(&self) {
181 self.cancellation_token.cancel();
182 }
183
184 pub fn cancellation_token(&self) -> &CancellationToken {
186 &self.cancellation_token
187 }
188
189 pub fn new(
198 base_url: Option<String>,
199 timeout_secs: u64,
200 proxy_url: Option<String>,
201 network: DydxNetwork,
202 retry_config: Option<RetryConfig>,
203 ) -> anyhow::Result<Self> {
204 let base_url = match network {
205 DydxNetwork::Testnet => base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string()),
206 DydxNetwork::Mainnet => base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string()),
207 };
208
209 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
210
211 let mut headers = HashMap::new();
212 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
213
214 let client = HttpClient::new(
215 headers,
216 vec![], vec![], Some(*DYDX_REST_QUOTA),
219 Some(timeout_secs),
220 proxy_url,
221 )
222 .map_err(|e| {
223 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
224 })?;
225
226 Ok(Self {
227 base_url,
228 client,
229 retry_manager,
230 cancellation_token: CancellationToken::new(),
231 network,
232 })
233 }
234
235 #[must_use]
237 pub const fn is_testnet(&self) -> bool {
238 matches!(self.network, DydxNetwork::Testnet)
239 }
240
241 #[must_use]
243 pub fn base_url(&self) -> &str {
244 &self.base_url
245 }
246
247 pub async fn send_request<T>(
259 &self,
260 method: Method,
261 endpoint: &str,
262 query_params: Option<&str>,
263 ) -> Result<T, DydxHttpError>
264 where
265 T: DeserializeOwned,
266 {
267 let url = if let Some(params) = query_params {
268 format!("{}{endpoint}?{params}", self.base_url)
269 } else {
270 format!("{}{endpoint}", self.base_url)
271 };
272
273 let operation = || async {
274 let request = self
275 .client
276 .request_with_ustr_keys(
277 method.clone(),
278 url.clone(),
279 None, None, None, None, None, )
285 .await
286 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
287
288 if !request.status.is_success() {
289 return Err(DydxHttpError::HttpStatus {
290 status: request.status.as_u16(),
291 message: String::from_utf8_lossy(&request.body).to_string(),
292 });
293 }
294
295 Ok(request)
296 };
297
298 let should_retry = |error: &DydxHttpError| -> bool {
303 match error {
304 DydxHttpError::HttpClientError(_) => true,
305 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
306 _ => false,
307 }
308 };
309
310 let create_error = |msg: String| -> DydxHttpError {
311 if msg == "canceled" {
312 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
313 } else if msg.contains("Timed out") {
314 DydxHttpError::HttpClientError(msg)
316 } else {
317 DydxHttpError::ValidationError(msg)
318 }
319 };
320
321 let response = self
322 .retry_manager
323 .execute_with_retry_with_cancel(
324 endpoint,
325 operation,
326 should_retry,
327 create_error,
328 &self.cancellation_token,
329 )
330 .await?;
331
332 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
333 error: e.to_string(),
334 body: String::from_utf8_lossy(&response.body).to_string(),
335 })
336 }
337
338 pub async fn send_post_request<T, B>(
351 &self,
352 endpoint: &str,
353 body: &B,
354 ) -> Result<T, DydxHttpError>
355 where
356 T: DeserializeOwned,
357 B: Serialize,
358 {
359 let url = format!("{}{endpoint}", self.base_url);
360
361 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
362 error: e.to_string(),
363 })?;
364
365 let operation = || async {
366 let request = self
367 .client
368 .request_with_ustr_keys(
369 Method::POST,
370 url.clone(),
371 None, None, Some(body_bytes.clone()),
374 None, None, )
377 .await
378 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
379
380 if !request.status.is_success() {
381 return Err(DydxHttpError::HttpStatus {
382 status: request.status.as_u16(),
383 message: String::from_utf8_lossy(&request.body).to_string(),
384 });
385 }
386
387 Ok(request)
388 };
389
390 let should_retry = |error: &DydxHttpError| -> bool {
392 match error {
393 DydxHttpError::HttpClientError(_) => true,
394 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
395 _ => false,
396 }
397 };
398
399 let create_error = |msg: String| -> DydxHttpError {
400 if msg == "canceled" {
401 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
402 } else if msg.contains("Timed out") {
403 DydxHttpError::HttpClientError(msg)
405 } else {
406 DydxHttpError::ValidationError(msg)
407 }
408 };
409
410 let response = self
411 .retry_manager
412 .execute_with_retry_with_cancel(
413 endpoint,
414 operation,
415 should_retry,
416 create_error,
417 &self.cancellation_token,
418 )
419 .await?;
420
421 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
422 error: e.to_string(),
423 body: String::from_utf8_lossy(&response.body).to_string(),
424 })
425 }
426
427 pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
433 self.send_request(Method::GET, ENDPOINT_PERPETUAL_MARKETS, None)
434 .await
435 }
436
437 pub async fn get_market(
445 &self,
446 ticker: &str,
447 ) -> Result<super::models::MarketsResponse, DydxHttpError> {
448 let query = format!("ticker={ticker}");
449 self.send_request(Method::GET, ENDPOINT_PERPETUAL_MARKETS, Some(&query))
450 .await
451 }
452
453 pub async fn get_orderbook(
459 &self,
460 ticker: &str,
461 ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
462 let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
463 self.send_request(Method::GET, &endpoint, None).await
464 }
465
466 pub async fn get_trades(
472 &self,
473 ticker: &str,
474 limit: Option<u32>,
475 starting_before_or_at_height: Option<u64>,
476 ) -> Result<super::models::TradesResponse, DydxHttpError> {
477 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
478 let mut query_parts = Vec::new();
479
480 if let Some(l) = limit {
481 query_parts.push(format!("limit={l}"));
482 }
483
484 if let Some(height) = starting_before_or_at_height {
485 query_parts.push(format!("createdBeforeOrAtHeight={height}"));
486 }
487 let query = if query_parts.is_empty() {
488 None
489 } else {
490 Some(query_parts.join("&"))
491 };
492 self.send_request(Method::GET, &endpoint, query.as_deref())
493 .await
494 }
495
496 pub async fn get_candles(
502 &self,
503 ticker: &str,
504 resolution: DydxCandleResolution,
505 limit: Option<u32>,
506 from_iso: Option<DateTime<Utc>>,
507 to_iso: Option<DateTime<Utc>>,
508 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
509 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
510 let mut query_parts = vec![format!("resolution={resolution}")];
511
512 if let Some(l) = limit {
513 query_parts.push(format!("limit={l}"));
514 }
515
516 if let Some(from) = from_iso {
517 let from_str = from.to_rfc3339();
518 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
519 }
520
521 if let Some(to) = to_iso {
522 let to_str = to.to_rfc3339();
523 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
524 }
525 let query = query_parts.join("&");
526 self.send_request(Method::GET, &endpoint, Some(&query))
527 .await
528 }
529
530 pub async fn get_subaccount(
536 &self,
537 address: &str,
538 subaccount_number: u32,
539 ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
540 let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
541 self.send_request(Method::GET, &endpoint, None).await
542 }
543
544 pub async fn get_fills(
550 &self,
551 address: &str,
552 subaccount_number: u32,
553 market: Option<&str>,
554 limit: Option<u32>,
555 ) -> Result<super::models::FillsResponse, DydxHttpError> {
556 let endpoint = "/v4/fills";
557 let mut query_parts = vec![
558 format!("address={address}"),
559 format!("subaccountNumber={subaccount_number}"),
560 ];
561
562 if let Some(m) = market {
563 query_parts.push(format!("market={m}"));
564 query_parts.push("marketType=PERPETUAL".to_string());
565 }
566
567 if let Some(l) = limit {
568 query_parts.push(format!("limit={l}"));
569 }
570 let query = query_parts.join("&");
571 self.send_request(Method::GET, endpoint, Some(&query)).await
572 }
573
574 pub async fn get_orders(
580 &self,
581 address: &str,
582 subaccount_number: u32,
583 market: Option<&str>,
584 limit: Option<u32>,
585 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
586 let endpoint = "/v4/orders";
587 let mut query_parts = vec![
588 format!("address={address}"),
589 format!("subaccountNumber={subaccount_number}"),
590 ];
591
592 if let Some(m) = market {
593 query_parts.push(format!("market={m}"));
594 query_parts.push("marketType=PERPETUAL".to_string());
595 }
596
597 if let Some(l) = limit {
598 query_parts.push(format!("limit={l}"));
599 }
600 let query = query_parts.join("&");
601 self.send_request(Method::GET, endpoint, Some(&query)).await
602 }
603
604 pub async fn get_transfers(
610 &self,
611 address: &str,
612 subaccount_number: u32,
613 limit: Option<u32>,
614 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
615 let endpoint = "/v4/transfers";
616 let mut query_parts = vec![
617 format!("address={address}"),
618 format!("subaccountNumber={subaccount_number}"),
619 ];
620
621 if let Some(l) = limit {
622 query_parts.push(format!("limit={l}"));
623 }
624 let query = query_parts.join("&");
625 self.send_request(Method::GET, endpoint, Some(&query)).await
626 }
627
628 pub async fn get_historical_funding(
634 &self,
635 ticker: &str,
636 limit: Option<u32>,
637 effective_before_or_at_height: Option<u64>,
638 effective_before_or_at: Option<DateTime<Utc>>,
639 ) -> Result<super::models::HistoricalFundingResponse, DydxHttpError> {
640 let endpoint = format!("/v4/historicalFunding/{ticker}");
641 let mut query_parts = Vec::new();
642
643 if let Some(l) = limit {
644 query_parts.push(format!("limit={l}"));
645 }
646
647 if let Some(height) = effective_before_or_at_height {
648 query_parts.push(format!("effectiveBeforeOrAtHeight={height}"));
649 }
650
651 if let Some(before) = effective_before_or_at {
652 let before_str = before.to_rfc3339();
653 query_parts.push(format!(
654 "effectiveBeforeOrAt={}",
655 urlencoding::encode(&before_str)
656 ));
657 }
658
659 let query = if query_parts.is_empty() {
660 None
661 } else {
662 Some(query_parts.join("&"))
663 };
664 self.send_request(Method::GET, &endpoint, query.as_deref())
665 .await
666 }
667
668 pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
674 self.send_request(Method::GET, "/v4/time", None).await
675 }
676
677 pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
683 self.send_request(Method::GET, "/v4/height", None).await
684 }
685}
686
687#[derive(Debug)]
703#[cfg_attr(
704 feature = "python",
705 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
706)]
707#[cfg_attr(
708 feature = "python",
709 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.dydx")
710)]
711pub struct DydxHttpClient {
712 pub(crate) inner: Arc<DydxRawHttpClient>,
714 pub(crate) instrument_cache: Arc<InstrumentCache>,
719 clock: &'static AtomicTime,
720}
721
722impl Clone for DydxHttpClient {
723 fn clone(&self) -> Self {
724 Self {
725 inner: self.inner.clone(),
726 instrument_cache: Arc::clone(&self.instrument_cache),
727 clock: self.clock,
728 }
729 }
730}
731
732impl Default for DydxHttpClient {
733 fn default() -> Self {
734 Self::new(None, 60, None, DydxNetwork::Mainnet, None)
735 .expect("Failed to create default DydxHttpClient")
736 }
737}
738
739impl DydxHttpClient {
740 pub fn new(
753 base_url: Option<String>,
754 timeout_secs: u64,
755 proxy_url: Option<String>,
756 network: DydxNetwork,
757 retry_config: Option<RetryConfig>,
758 ) -> anyhow::Result<Self> {
759 Self::new_with_cache(
760 base_url,
761 timeout_secs,
762 proxy_url,
763 network,
764 retry_config,
765 Arc::new(InstrumentCache::new()),
766 )
767 }
768
769 pub fn new_with_cache(
782 base_url: Option<String>,
783 timeout_secs: u64,
784 proxy_url: Option<String>,
785 network: DydxNetwork,
786 retry_config: Option<RetryConfig>,
787 instrument_cache: Arc<InstrumentCache>,
788 ) -> anyhow::Result<Self> {
789 Ok(Self {
790 inner: Arc::new(DydxRawHttpClient::new(
791 base_url,
792 timeout_secs,
793 proxy_url,
794 network,
795 retry_config,
796 )?),
797 instrument_cache,
798 clock: get_atomic_clock_realtime(),
799 })
800 }
801
802 pub async fn request_instruments(
812 &self,
813 symbol: Option<String>,
814 maker_fee: Option<Decimal>,
815 taker_fee: Option<Decimal>,
816 ) -> anyhow::Result<Vec<InstrumentAny>> {
817 let markets_response = self.inner.get_markets().await?;
818 let ts_init = self.generate_ts_init();
819
820 let mut instruments = Vec::new();
821 let mut skipped_inactive = 0;
822
823 for (ticker, market) in markets_response.markets {
824 if let Some(ref sym) = symbol
826 && ticker != *sym
827 {
828 continue;
829 }
830
831 if !super::parse::is_market_active(&market.status) {
832 log::debug!(
833 "Skipping inactive market {ticker} (status: {:?})",
834 market.status
835 );
836 skipped_inactive += 1;
837 continue;
838 }
839
840 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
841 Ok(instrument) => {
842 instruments.push(instrument);
843 }
844 Err(e) => {
845 log::error!("Failed to parse instrument {ticker}: {e}");
846 }
847 }
848 }
849
850 if skipped_inactive > 0 {
851 log::info!(
852 "Parsed {} instruments, skipped {} inactive",
853 instruments.len(),
854 skipped_inactive
855 );
856 } else {
857 log::debug!("Parsed {} instruments", instruments.len());
858 }
859
860 Ok(instruments)
861 }
862
863 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
875 let markets_response = self.inner.get_markets().await?;
877 let ts_init = self.generate_ts_init();
878
879 let mut parsed_instruments = Vec::new();
880 let mut parsed_markets = Vec::new();
881 let mut skipped_inactive = 0;
882
883 for (ticker, market) in markets_response.markets {
884 if !super::parse::is_market_active(&market.status) {
885 log::debug!(
886 "Skipping inactive market {ticker} (status: {:?})",
887 market.status
888 );
889 skipped_inactive += 1;
890 continue;
891 }
892
893 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
894 Ok(instrument) => {
895 parsed_instruments.push(instrument);
896 parsed_markets.push(market);
897 }
898 Err(e) => {
899 log::error!("Failed to parse instrument {ticker}: {e}");
900 }
901 }
902 }
903
904 self.instrument_cache.clear();
906
907 let items: Vec<_> = parsed_instruments.into_iter().zip(parsed_markets).collect();
909
910 if !items.is_empty() {
911 self.instrument_cache.insert_many(items.clone());
912 }
913
914 let count = items.len();
915
916 if skipped_inactive > 0 {
917 log::info!("Cached {count} instruments, skipped {skipped_inactive} inactive");
918 } else {
919 log::info!("Cached {count} instruments");
920 }
921
922 Ok(())
923 }
924
925 pub async fn fetch_and_cache_single_instrument(
931 &self,
932 ticker: &str,
933 ) -> anyhow::Result<Option<InstrumentAny>> {
934 let markets_response = self.inner.get_market(ticker).await?;
935 let ts_init = self.generate_ts_init();
936
937 if let Some(market) = markets_response.markets.get(ticker) {
939 if !super::parse::is_market_active(&market.status) {
940 log::debug!(
941 "Skipping inactive market {ticker} (status: {:?})",
942 market.status
943 );
944 return Ok(None);
945 }
946
947 let instrument = parse_instrument_any(market, None, None, ts_init)?;
948 self.instrument_cache
949 .insert(instrument.clone(), market.clone());
950
951 log::info!("Fetched and cached new instrument: {ticker}");
952 return Ok(Some(instrument));
953 }
954
955 Ok(None)
956 }
957
958 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
963 self.instrument_cache.insert_instruments_only(instruments);
964 }
965
966 pub fn cache_instrument(&self, instrument: InstrumentAny) {
971 self.instrument_cache.insert_instrument_only(instrument);
972 }
973
974 #[must_use]
976 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
977 self.instrument_cache.get(instrument_id)
978 }
979
980 #[must_use]
984 pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
985 self.instrument_cache.get_by_clob_id(clob_pair_id)
986 }
987
988 #[must_use]
992 pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
993 self.instrument_cache.get_by_market(ticker)
994 }
995
996 #[must_use]
1005 pub fn get_market_params(
1006 &self,
1007 instrument_id: &InstrumentId,
1008 ) -> Option<super::models::PerpetualMarket> {
1009 self.instrument_cache.get_market_params(instrument_id)
1010 }
1011
1012 pub async fn request_trades(
1021 &self,
1022 symbol: &str,
1023 limit: Option<u32>,
1024 starting_before_or_at_height: Option<u64>,
1025 ) -> anyhow::Result<super::models::TradesResponse> {
1026 self.inner
1027 .get_trades(symbol, limit, starting_before_or_at_height)
1028 .await
1029 .map_err(Into::into)
1030 }
1031
1032 pub async fn request_candles(
1041 &self,
1042 symbol: &str,
1043 resolution: DydxCandleResolution,
1044 limit: Option<u32>,
1045 from_iso: Option<DateTime<Utc>>,
1046 to_iso: Option<DateTime<Utc>>,
1047 ) -> anyhow::Result<super::models::CandlesResponse> {
1048 self.inner
1049 .get_candles(symbol, resolution, limit, from_iso, to_iso)
1050 .await
1051 .map_err(Into::into)
1052 }
1053
1054 pub async fn request_bars(
1072 &self,
1073 bar_type: BarType,
1074 start: Option<DateTime<Utc>>,
1075 end: Option<DateTime<Utc>>,
1076 limit: Option<u32>,
1077 timestamp_on_close: bool,
1078 ) -> anyhow::Result<Vec<Bar>> {
1079 let resolution = bar_type_to_resolution(&bar_type)?;
1080 let instrument_id = bar_type.instrument_id();
1081
1082 let instrument = self
1083 .get_instrument(&instrument_id)
1084 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1085
1086 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1087 let price_precision = instrument.price_precision();
1088 let size_precision = instrument.size_precision();
1089 let ts_init = self.generate_ts_init();
1090
1091 let mut all_bars: Vec<Bar> = Vec::new();
1092
1093 let spec = bar_type.spec();
1095 let bar_secs: i64 = match spec.aggregation {
1096 BarAggregation::Minute => spec.step.get() as i64 * 60,
1097 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1098 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1099 _ => anyhow::bail!("Unsupported aggregation: {:?}", spec.aggregation),
1100 };
1101
1102 match (start, end) {
1103 (Some(range_start), Some(range_end)) if range_end > range_start => {
1105 let overall_limit = limit.unwrap_or(u32::MAX);
1106 let mut remaining = overall_limit;
1107 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1108 let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1109 let mut chunk_start = range_start;
1110
1111 while chunk_start < range_end && remaining > 0 {
1112 let chunk_end = (chunk_start + chunk_duration).min(range_end);
1113 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1114
1115 let response = self
1116 .inner
1117 .get_candles(
1118 ticker,
1119 resolution,
1120 Some(per_call_limit),
1121 Some(chunk_start),
1122 Some(chunk_end),
1123 )
1124 .await?;
1125
1126 let count = response.candles.len() as u32;
1127 if count == 0 {
1128 break;
1129 }
1130
1131 for candle in &response.candles {
1132 match super::parse::parse_bar(
1133 candle,
1134 bar_type,
1135 price_precision,
1136 size_precision,
1137 timestamp_on_close,
1138 ts_init,
1139 ) {
1140 Ok(bar) => all_bars.push(bar),
1141 Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1142 }
1143 }
1144
1145 if remaining <= count {
1146 break;
1147 }
1148 remaining -= count;
1149 chunk_start += chunk_duration;
1150 }
1151 }
1152 _ => {
1154 let req_limit = limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1155 let response = self
1156 .inner
1157 .get_candles(ticker, resolution, Some(req_limit), None, None)
1158 .await?;
1159
1160 for candle in &response.candles {
1161 match super::parse::parse_bar(
1162 candle,
1163 bar_type,
1164 price_precision,
1165 size_precision,
1166 timestamp_on_close,
1167 ts_init,
1168 ) {
1169 Ok(bar) => all_bars.push(bar),
1170 Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1171 }
1172 }
1173 }
1174 }
1175
1176 let current_time_ns = self.generate_ts_init();
1178 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1179
1180 Ok(all_bars)
1181 }
1182
1183 pub async fn request_trade_ticks(
1201 &self,
1202 instrument_id: InstrumentId,
1203 start: Option<DateTime<Utc>>,
1204 end: Option<DateTime<Utc>>,
1205 limit: Option<u32>,
1206 ) -> anyhow::Result<Vec<TradeTick>> {
1207 const DYDX_MAX_TRADES_PER_REQUEST: u32 = 1_000;
1208
1209 if let (Some(s), Some(e)) = (start, end) {
1211 anyhow::ensure!(s < e, "start ({s}) must be before end ({e})");
1212 }
1213
1214 let instrument = self
1215 .get_instrument(&instrument_id)
1216 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1217
1218 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1219 let price_precision = instrument.price_precision();
1220 let size_precision = instrument.size_precision();
1221 let ts_init = self.generate_ts_init();
1222
1223 let overall_limit = limit.unwrap_or(u32::MAX);
1231 let mut remaining = overall_limit;
1232 let mut cursor_height: Option<u64> = None;
1233 let mut all_trades = Vec::new();
1234 let mut seen_trade_ids: ahash::AHashSet<String> = ahash::AHashSet::new();
1237
1238 loop {
1239 let page_limit = remaining.min(DYDX_MAX_TRADES_PER_REQUEST);
1240 let response = self
1241 .inner
1242 .get_trades(ticker, Some(page_limit), cursor_height)
1243 .await?;
1244
1245 let page_count = response.trades.len() as u32;
1246 if page_count == 0 {
1247 break;
1248 }
1249
1250 let oldest_trade = response.trades.last().unwrap();
1252 let oldest_height = oldest_trade.created_at_height;
1253 let oldest_created_at = oldest_trade.created_at;
1254
1255 let mut new_trades_this_page: usize = 0;
1257 let mut page_before_start = false;
1258
1259 for trade in &response.trades {
1260 if !seen_trade_ids.insert(trade.id.clone()) {
1261 continue;
1263 }
1264
1265 if start.is_some_and(|s| trade.created_at < s) {
1266 page_before_start = true;
1267 continue;
1268 }
1269
1270 if end.is_some_and(|e| trade.created_at > e) {
1271 continue;
1272 }
1273
1274 all_trades.push(super::parse::parse_trade_tick(
1275 trade,
1276 instrument_id,
1277 price_precision,
1278 size_precision,
1279 ts_init,
1280 )?);
1281 new_trades_this_page += 1;
1282 }
1283
1284 if let Some(s) = start
1286 && oldest_created_at < s
1287 {
1288 let _ = page_before_start;
1289 break;
1290 }
1291
1292 let next_cursor = Some(oldest_height.saturating_sub(1));
1300
1301 if oldest_height == 0 && new_trades_this_page == 0 {
1304 break;
1305 }
1306 cursor_height = next_cursor;
1307
1308 remaining = remaining.saturating_sub(new_trades_this_page as u32);
1309
1310 if page_count < page_limit || remaining == 0 {
1312 break;
1313 }
1314 }
1315
1316 all_trades.reverse();
1318
1319 if let Some(lim) = limit {
1321 all_trades.truncate(lim as usize);
1322 }
1323
1324 Ok(all_trades)
1325 }
1326
1327 pub async fn request_funding_rates(
1339 &self,
1340 instrument_id: InstrumentId,
1341 start: Option<DateTime<Utc>>,
1342 end: Option<DateTime<Utc>>,
1343 limit: Option<u32>,
1344 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
1345 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1346 let ts_init = self.generate_ts_init();
1347
1348 let response = self
1349 .inner
1350 .get_historical_funding(ticker, limit, None, end)
1351 .await?;
1352
1353 let mut rates = Vec::with_capacity(response.historical_funding.len());
1354
1355 for entry in &response.historical_funding {
1356 if start.is_some_and(|s| entry.effective_at < s) {
1358 continue;
1359 }
1360
1361 let ts_event =
1362 UnixNanos::from(entry.effective_at.timestamp_nanos_opt().ok_or_else(|| {
1363 anyhow::anyhow!("Timestamp overflow for {}", entry.effective_at)
1364 })? as u64);
1365
1366 rates.push(FundingRateUpdate::new(
1367 instrument_id,
1368 entry.rate,
1369 Some(60),
1370 None,
1371 ts_event,
1372 ts_init,
1373 ));
1374 }
1375
1376 rates.reverse();
1378
1379 log::info!("Fetched {} funding rates for {instrument_id}", rates.len(),);
1380
1381 Ok(rates)
1382 }
1383
1384 pub async fn request_orderbook_snapshot(
1395 &self,
1396 instrument_id: InstrumentId,
1397 ) -> anyhow::Result<OrderBookDeltas> {
1398 let instrument = self
1399 .get_instrument(&instrument_id)
1400 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1401
1402 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1403 let response = self.inner.get_orderbook(ticker).await?;
1404
1405 let ts_init = self.generate_ts_init();
1406 let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
1407
1408 let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1409
1410 if response.bids.is_empty() && response.asks.is_empty() {
1412 let mut clear_delta = OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init);
1413 clear_delta.flags = snapshot_flag | RecordFlag::F_LAST as u8;
1414 deltas.push(clear_delta);
1415 return Ok(OrderBookDeltas::new(instrument_id, deltas));
1416 }
1417
1418 let mut clear_delta = OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init);
1419 clear_delta.flags = snapshot_flag;
1420 deltas.push(clear_delta);
1421
1422 for (i, level) in response.bids.iter().enumerate() {
1423 let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1424 let flags = if is_last {
1425 snapshot_flag | RecordFlag::F_LAST as u8
1426 } else {
1427 snapshot_flag
1428 };
1429
1430 let order = BookOrder::new(
1431 NautilusOrderSide::Buy,
1432 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1433 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1434 0,
1435 );
1436
1437 deltas.push(OrderBookDelta::new(
1438 instrument_id,
1439 BookAction::Add,
1440 order,
1441 flags,
1442 0,
1443 ts_init,
1444 ts_init,
1445 ));
1446 }
1447
1448 for (i, level) in response.asks.iter().enumerate() {
1449 let is_last = i == response.asks.len() - 1;
1450 let flags = if is_last {
1451 snapshot_flag | RecordFlag::F_LAST as u8
1452 } else {
1453 snapshot_flag
1454 };
1455
1456 let order = BookOrder::new(
1457 NautilusOrderSide::Sell,
1458 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1459 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1460 0,
1461 );
1462
1463 deltas.push(OrderBookDelta::new(
1464 instrument_id,
1465 BookAction::Add,
1466 order,
1467 flags,
1468 0,
1469 ts_init,
1470 ts_init,
1471 ));
1472 }
1473
1474 Ok(OrderBookDeltas::new(instrument_id, deltas))
1475 }
1476
1477 #[must_use]
1483 pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1484 &self.inner
1485 }
1486
1487 #[must_use]
1489 pub fn is_testnet(&self) -> bool {
1490 self.inner.is_testnet()
1491 }
1492
1493 #[must_use]
1495 pub fn base_url(&self) -> &str {
1496 self.inner.base_url()
1497 }
1498
1499 #[must_use]
1501 pub fn is_cache_initialized(&self) -> bool {
1502 self.instrument_cache.is_initialized()
1503 }
1504
1505 #[must_use]
1507 pub fn cached_instruments_count(&self) -> usize {
1508 self.instrument_cache.len()
1509 }
1510
1511 #[must_use]
1515 pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
1516 &self.instrument_cache
1517 }
1518
1519 #[must_use]
1523 pub fn all_instruments(&self) -> Vec<InstrumentAny> {
1524 self.instrument_cache.all_instruments()
1525 }
1526
1527 #[must_use]
1529 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
1530 self.instrument_cache.all_instrument_ids()
1531 }
1532
1533 fn generate_ts_init(&self) -> UnixNanos {
1534 self.clock.get_time_ns()
1535 }
1536
1537 pub async fn request_order_status_reports(
1546 &self,
1547 address: &str,
1548 subaccount_number: u32,
1549 account_id: AccountId,
1550 instrument_id: Option<InstrumentId>,
1551 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1552 let ts_init = self.generate_ts_init();
1553
1554 let market = instrument_id.map(|id| {
1556 let symbol = id.symbol.to_string();
1557 symbol.trim_end_matches("-PERP").to_string()
1559 });
1560
1561 let orders = self
1562 .inner
1563 .get_orders(address, subaccount_number, market.as_deref(), None)
1564 .await?;
1565
1566 let mut reports = Vec::new();
1567
1568 for order in orders {
1569 let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1571 Some(inst) => inst,
1572 None => {
1573 log::warn!(
1574 "Skipping order {}: no cached instrument for clob_pair_id {}",
1575 order.id,
1576 order.clob_pair_id
1577 );
1578 continue;
1579 }
1580 };
1581
1582 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1584 continue;
1585 }
1586
1587 match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1588 {
1589 Ok(report) => reports.push(report),
1590 Err(e) => {
1591 log::warn!("Failed to parse order {}: {e}", order.id);
1592 }
1593 }
1594 }
1595
1596 Ok(reports)
1597 }
1598
1599 pub async fn request_fill_reports(
1608 &self,
1609 address: &str,
1610 subaccount_number: u32,
1611 account_id: AccountId,
1612 instrument_id: Option<InstrumentId>,
1613 ) -> anyhow::Result<Vec<FillReport>> {
1614 let ts_init = self.generate_ts_init();
1615
1616 let market = instrument_id.map(|id| {
1618 let symbol = id.symbol.to_string();
1619 symbol.trim_end_matches("-PERP").to_string()
1620 });
1621
1622 let fills_response = self
1623 .inner
1624 .get_fills(address, subaccount_number, market.as_deref(), None)
1625 .await?;
1626
1627 let mut reports = Vec::new();
1628
1629 for fill in fills_response.fills {
1630 let instrument = match self.get_instrument_by_market(&fill.market) {
1632 Some(inst) => inst,
1633 None => {
1634 log::warn!(
1635 "Skipping fill {}: no cached instrument for market {}",
1636 fill.id,
1637 fill.market
1638 );
1639 continue;
1640 }
1641 };
1642
1643 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1645 continue;
1646 }
1647
1648 match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1649 Ok(report) => reports.push(report),
1650 Err(e) => {
1651 log::warn!("Failed to parse fill {}: {e}", fill.id);
1652 }
1653 }
1654 }
1655
1656 Ok(reports)
1657 }
1658
1659 pub async fn request_position_status_reports(
1668 &self,
1669 address: &str,
1670 subaccount_number: u32,
1671 account_id: AccountId,
1672 instrument_id: Option<InstrumentId>,
1673 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1674 let ts_init = self.generate_ts_init();
1675
1676 let subaccount_response = self
1677 .inner
1678 .get_subaccount(address, subaccount_number)
1679 .await?;
1680
1681 let mut reports = Vec::new();
1682
1683 for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1684 let instrument = match self.get_instrument_by_market(&market) {
1686 Some(inst) => inst,
1687 None => {
1688 log::warn!("Skipping position: no cached instrument for market {market}");
1689 continue;
1690 }
1691 };
1692
1693 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1695 continue;
1696 }
1697
1698 match super::parse::parse_position_status_report(
1699 &position,
1700 &instrument,
1701 account_id,
1702 ts_init,
1703 ) {
1704 Ok(report) => reports.push(report),
1705 Err(e) => {
1706 log::warn!("Failed to parse position for {market}: {e}");
1707 }
1708 }
1709 }
1710
1711 Ok(reports)
1712 }
1713
1714 pub async fn request_account_state(
1723 &self,
1724 address: &str,
1725 subaccount_number: u32,
1726 account_id: AccountId,
1727 ) -> anyhow::Result<AccountState> {
1728 let ts_init = self.generate_ts_init();
1729 let subaccount_response = self
1730 .inner
1731 .get_subaccount(address, subaccount_number)
1732 .await?;
1733
1734 let instruments: HashMap<InstrumentId, InstrumentAny> = self
1736 .instrument_cache
1737 .all_instruments()
1738 .into_iter()
1739 .map(|inst| (inst.id(), inst))
1740 .collect();
1741
1742 let oracle_prices = self.instrument_cache.to_oracle_prices_map();
1744
1745 parse_account_state_from_http(
1746 &subaccount_response.subaccount,
1747 account_id,
1748 &instruments,
1749 &oracle_prices,
1750 ts_init,
1751 ts_init,
1752 )
1753 }
1754}
1755
// Unit tests for the raw and domain dYdX HTTP clients: construction, network selection,
// instrument-cache state across clones, and request timeout behavior.
#[cfg(test)]
mod tests {
    use axum::{Router, routing::get};
    use nautilus_model::identifiers::{Symbol, Venue};
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    // A raw client built for mainnet with no base URL override uses the mainnet URL.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let client = DydxRawHttpClient::new(None, 30, None, DydxNetwork::Mainnet, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
    }

    // A raw client built for testnet with no base URL override uses the testnet URL.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let client = DydxRawHttpClient::new(None, 30, None, DydxNetwork::Testnet, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    // A freshly built mainnet domain client starts with an empty, uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let client = DydxHttpClient::new(None, 30, None, DydxNetwork::Mainnet, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
        assert_eq!(client.cached_instruments_count(), 0);
    }

    // A domain client built for testnet reports testnet and the testnet URL.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let client = DydxHttpClient::new(None, 30, None, DydxNetwork::Testnet, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    // The `Default` domain client targets mainnet and has an uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_default() {
        let client = DydxHttpClient::default();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
    }

    // Clones observe the cache state of the client they were cloned from.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let client = DydxHttpClient::new(None, 30, None, DydxNetwork::Mainnet, None).unwrap();

        let cloned = client.clone();
        // Nothing has been inserted yet, so the clone sees an uninitialized cache.
        assert!(!cloned.is_cache_initialized());

        // Inserting (even an empty list) is what flips the cache to initialized here.
        client.instrument_cache.insert_instruments_only(vec![]);

        #[expect(clippy::redundant_clone)]
        // A clone taken after the insert reports an initialized cache.
        // NOTE(review): presumably clones share the same `Arc<InstrumentCache>` - confirm.
        let cloned_after = client.clone();
        assert!(cloned_after.is_cache_initialized());
    }

    // Looking up an instrument that was never cached yields `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let client = DydxHttpClient::default();
        let instrument_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
        let result = client.get_instrument(&instrument_id);
        assert!(result.is_none());
    }

    // A request against a slow endpoint must fail well before the endpoint would respond.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use tokio::net::TcpListener;

        async fn slow_handler() -> &'static str {
            // Sleep far longer than the 500 ms operation timeout configured below.
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            "ok"
        }

        let router = Router::new().route("/v4/slow", get(slow_handler));

        // Bind to port 0 so the OS picks a free port for the local test server.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, router.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{addr}");

        // No retries, a 500 ms per-operation timeout and a 1 s overall budget, so a single
        // timed-out attempt is all that can happen.
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The second argument (60) is deliberately much larger than the retry-level timeout.
        // NOTE(review): presumably this is the HTTP client timeout in seconds - confirm.
        let client = DydxRawHttpClient::new(
            Some(base_url),
            60,
            None,
            DydxNetwork::Mainnet,
            Some(retry_config),
        )
        .unwrap();

        let start = std::time::Instant::now();
        let result: Result<serde_json::Value, error::DydxHttpError> =
            client.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = start.elapsed();

        assert!(result.is_err());
        // The handler sleeps for 5 s; finishing in under 3 s shows the request was cut short.
        assert!(elapsed < std::time::Duration::from_secs(3));
    }
}