Skip to main content

nautilus_dydx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **dYdX v4 Indexer REST API** –
17//! <https://docs.dydx.xyz/api_integration-indexer/indexer_api>.
18//!
19//! This module exports two complementary HTTP clients following the standardized
20//! two-layer architecture pattern established in OKX, Bybit, and BitMEX adapters:
21//!
22//! - [`DydxRawHttpClient`]: Low-level HTTP methods matching dYdX Indexer API endpoints.
23//! - [`DydxHttpClient`]: High-level methods using Nautilus domain types with instrument caching.
24//!
25//! ## Two-Layer Architecture
26//!
27//! The raw client handles HTTP communication, rate limiting, retries, and basic response parsing.
28//! The domain client wraps the raw client in an `Arc`, maintains an instrument cache using `DashMap`,
29//! and provides high-level methods that work with Nautilus domain types.
30//!
31//! ## Responsibilities
32//!
33//! - Rate-limiting based on the public dYdX specification.
34//! - Zero-copy deserialization of large JSON payloads into domain models.
35//! - Conversion of raw exchange errors into the rich [`DydxHttpError`] enum.
36//! - Instrument caching with standard methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
37//!
38//! # Important Note
39//!
40//! The dYdX v4 Indexer REST API does **NOT** require authentication or request signing.
41//! All endpoints are publicly accessible using only wallet addresses and subaccount numbers
42//! as query parameters. Order submission and trading operations use gRPC with blockchain
43//! transaction signing, not REST API.
44//!
45//! # Official Documentation
46//!
47//! | Endpoint          | Reference                                                                 |
48//! |-------------------|---------------------------------------------------------------------------|
49//! | Market data       | <https://docs.dydx.xyz/api_integration-indexer/indexer_api#markets>  |
50//! | Account data      | <https://docs.dydx.xyz/api_integration-indexer/indexer_api#accounts> |
51//! | Utility endpoints | <https://docs.dydx.xyz/api_integration-indexer/indexer_api#utility>  |
52
53use std::{
54    collections::HashMap,
55    fmt::Debug,
56    num::NonZeroU32,
57    sync::{Arc, LazyLock},
58};
59
60use chrono::{DateTime, Utc};
61use nautilus_core::{
62    UnixNanos,
63    consts::NAUTILUS_USER_AGENT,
64    string::urlencoding,
65    time::{AtomicTime, get_atomic_clock_realtime},
66};
67use nautilus_model::{
68    data::{
69        Bar, BarType, BookOrder, FundingRateUpdate, OrderBookDelta, OrderBookDeltas, TradeTick,
70    },
71    enums::{
72        AggregationSource, BarAggregation, BookAction, OrderSide as NautilusOrderSide, PriceType,
73        RecordFlag,
74    },
75    events::AccountState,
76    identifiers::{AccountId, InstrumentId},
77    instruments::{Instrument, InstrumentAny},
78    reports::{FillReport, OrderStatusReport, PositionStatusReport},
79    types::{Price, Quantity},
80};
81use nautilus_network::{
82    http::{HttpClient, Method, USER_AGENT},
83    ratelimiter::quota::Quota,
84    retry::{RetryConfig, RetryManager},
85};
86use rust_decimal::Decimal;
87use serde::{Deserialize, Serialize, de::DeserializeOwned};
88use tokio_util::sync::CancellationToken;
89
90use super::error::DydxHttpError;
91use crate::{
92    common::{
93        consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
94        enums::{DydxCandleResolution, DydxNetwork},
95        instrument_cache::InstrumentCache,
96        parse::extract_raw_symbol,
97    },
98    http::parse::{parse_account_state_from_http, parse_instrument_any},
99};
100
101/// Maximum number of candles returned per dYdX API request.
102const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
103
104/// Perpetual markets endpoint (shared between `get_markets` and `get_market`).
105const ENDPOINT_PERPETUAL_MARKETS: &str = "/v4/perpetualMarkets";
106
107fn bar_type_to_resolution(bar_type: &BarType) -> anyhow::Result<DydxCandleResolution> {
108    if bar_type.aggregation_source() != AggregationSource::External {
109        anyhow::bail!(
110            "dYdX only supports EXTERNAL aggregation, was {:?}",
111            bar_type.aggregation_source()
112        );
113    }
114
115    let spec = bar_type.spec();
116    if spec.price_type != PriceType::Last {
117        anyhow::bail!(
118            "dYdX only supports LAST price type, was {:?}",
119            spec.price_type
120        );
121    }
122
123    DydxCandleResolution::from_bar_spec(&spec)
124}
125
126/// Default dYdX Indexer REST API rate limit.
127///
128/// The dYdX Indexer API rate limits are generous for read-only operations:
129/// - General: 100 requests per 10 seconds per IP
130/// - We use a conservative 10 requests per second as the default quota.
131pub static DYDX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
132    Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
133});
134
135/// Represents a dYdX HTTP response wrapper.
136///
137/// Most dYdX Indexer API endpoints return data directly without a wrapper,
138/// but some endpoints may use this structure for consistency.
139#[derive(Debug, Serialize, Deserialize)]
140pub struct DydxResponse<T> {
141    /// The typed data returned by the dYdX endpoint.
142    pub data: T,
143}
144
145/// Provides a raw HTTP client for interacting with the [dYdX v4](https://dydx.exchange) Indexer REST API.
146///
147/// This client wraps the underlying [`HttpClient`] to handle functionality
148/// specific to dYdX Indexer API, such as rate-limiting, forming request URLs,
149/// and deserializing responses into dYdX specific data models.
150///
151/// **Note**: Unlike traditional centralized exchanges, the dYdX v4 Indexer REST API
152/// does NOT require authentication, API keys, or request signing. All endpoints are
153/// publicly accessible.
154pub struct DydxRawHttpClient {
155    base_url: String,
156    client: HttpClient,
157    retry_manager: RetryManager<DydxHttpError>,
158    cancellation_token: CancellationToken,
159    network: DydxNetwork,
160}
161
162impl Default for DydxRawHttpClient {
163    fn default() -> Self {
164        Self::new(None, 60, None, DydxNetwork::Mainnet, None)
165            .expect("Failed to create default DydxRawHttpClient")
166    }
167}
168
169impl Debug for DydxRawHttpClient {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct(stringify!(DydxRawHttpClient))
172            .field("base_url", &self.base_url)
173            .field("network", &self.network)
174            .finish_non_exhaustive()
175    }
176}
177
178impl DydxRawHttpClient {
179    /// Cancels all pending HTTP requests.
180    pub fn cancel_all_requests(&self) {
181        self.cancellation_token.cancel();
182    }
183
184    /// Returns the cancellation token for this client.
185    pub fn cancellation_token(&self) -> &CancellationToken {
186        &self.cancellation_token
187    }
188
189    /// Creates a new [`DydxRawHttpClient`] using the default dYdX Indexer HTTP URL,
190    /// optionally overridden with a custom base URL.
191    ///
192    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if the retry manager cannot be created.
197    pub fn new(
198        base_url: Option<String>,
199        timeout_secs: u64,
200        proxy_url: Option<String>,
201        network: DydxNetwork,
202        retry_config: Option<RetryConfig>,
203    ) -> anyhow::Result<Self> {
204        let base_url = match network {
205            DydxNetwork::Testnet => base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string()),
206            DydxNetwork::Mainnet => base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string()),
207        };
208
209        let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
210
211        let mut headers = HashMap::new();
212        headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
213
214        let client = HttpClient::new(
215            headers,
216            vec![], // No specific headers to extract from responses
217            vec![], // No keyed quotas (we use a single global quota)
218            Some(*DYDX_REST_QUOTA),
219            Some(timeout_secs),
220            proxy_url,
221        )
222        .map_err(|e| {
223            DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
224        })?;
225
226        Ok(Self {
227            base_url,
228            client,
229            retry_manager,
230            cancellation_token: CancellationToken::new(),
231            network,
232        })
233    }
234
235    /// Returns `true` if this client is configured for testnet.
236    #[must_use]
237    pub const fn is_testnet(&self) -> bool {
238        matches!(self.network, DydxNetwork::Testnet)
239    }
240
241    /// Returns the base URL used by this client.
242    #[must_use]
243    pub fn base_url(&self) -> &str {
244        &self.base_url
245    }
246
247    /// Sends a request to a dYdX Indexer API endpoint.
248    ///
249    /// **Note**: dYdX Indexer API does not require authentication headers.
250    ///
251    /// # Errors
252    ///
253    /// Returns an error if:
254    /// - The HTTP request fails.
255    /// - The response has a non-success HTTP status code.
256    /// - The response body cannot be deserialized to type `T`.
257    /// - The request is canceled.
258    pub async fn send_request<T>(
259        &self,
260        method: Method,
261        endpoint: &str,
262        query_params: Option<&str>,
263    ) -> Result<T, DydxHttpError>
264    where
265        T: DeserializeOwned,
266    {
267        let url = if let Some(params) = query_params {
268            format!("{}{endpoint}?{params}", self.base_url)
269        } else {
270            format!("{}{endpoint}", self.base_url)
271        };
272
273        let operation = || async {
274            let request = self
275                .client
276                .request_with_ustr_keys(
277                    method.clone(),
278                    url.clone(),
279                    None, // No params
280                    None, // No additional headers
281                    None, // No body for GET requests
282                    None, // Use default timeout
283                    None, // No specific rate limit keys (using global quota)
284                )
285                .await
286                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
287
288            if !request.status.is_success() {
289                return Err(DydxHttpError::HttpStatus {
290                    status: request.status.as_u16(),
291                    message: String::from_utf8_lossy(&request.body).to_string(),
292                });
293            }
294
295            Ok(request)
296        };
297
298        // Retry strategy for dYdX Indexer API:
299        // 1. Network errors: always retry (transient connection issues)
300        // 2. HTTP 429/5xx: rate limiting and server errors should be retried
301        // 3. Client errors (4xx except 429): should NOT be retried
302        let should_retry = |error: &DydxHttpError| -> bool {
303            match error {
304                DydxHttpError::HttpClientError(_) => true,
305                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
306                _ => false,
307            }
308        };
309
310        let create_error = |msg: String| -> DydxHttpError {
311            if msg == "canceled" {
312                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
313            } else if msg.contains("Timed out") {
314                // Timeouts are transient -- map to HttpClientError so they are retried
315                DydxHttpError::HttpClientError(msg)
316            } else {
317                DydxHttpError::ValidationError(msg)
318            }
319        };
320
321        let response = self
322            .retry_manager
323            .execute_with_retry_with_cancel(
324                endpoint,
325                operation,
326                should_retry,
327                create_error,
328                &self.cancellation_token,
329            )
330            .await?;
331
332        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
333            error: e.to_string(),
334            body: String::from_utf8_lossy(&response.body).to_string(),
335        })
336    }
337
338    /// Sends a POST request to a dYdX Indexer API endpoint.
339    ///
340    /// Note: Most dYdX Indexer endpoints are GET-based. POST is rarely used.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if:
345    /// - The request body cannot be serialized to JSON.
346    /// - The HTTP request fails.
347    /// - The response has a non-success HTTP status code.
348    /// - The response body cannot be deserialized to type `T`.
349    /// - The request is canceled.
350    pub async fn send_post_request<T, B>(
351        &self,
352        endpoint: &str,
353        body: &B,
354    ) -> Result<T, DydxHttpError>
355    where
356        T: DeserializeOwned,
357        B: Serialize,
358    {
359        let url = format!("{}{endpoint}", self.base_url);
360
361        let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
362            error: e.to_string(),
363        })?;
364
365        let operation = || async {
366            let request = self
367                .client
368                .request_with_ustr_keys(
369                    Method::POST,
370                    url.clone(),
371                    None, // No params
372                    None, // No additional headers (content-type handled by body)
373                    Some(body_bytes.clone()),
374                    None, // Use default timeout
375                    None, // No specific rate limit keys (using global quota)
376                )
377                .await
378                .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
379
380            if !request.status.is_success() {
381                return Err(DydxHttpError::HttpStatus {
382                    status: request.status.as_u16(),
383                    message: String::from_utf8_lossy(&request.body).to_string(),
384                });
385            }
386
387            Ok(request)
388        };
389
390        // Retry strategy (same as GET requests)
391        let should_retry = |error: &DydxHttpError| -> bool {
392            match error {
393                DydxHttpError::HttpClientError(_) => true,
394                DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
395                _ => false,
396            }
397        };
398
399        let create_error = |msg: String| -> DydxHttpError {
400            if msg == "canceled" {
401                DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
402            } else if msg.contains("Timed out") {
403                // Timeouts are transient -- map to HttpClientError so they are retried
404                DydxHttpError::HttpClientError(msg)
405            } else {
406                DydxHttpError::ValidationError(msg)
407            }
408        };
409
410        let response = self
411            .retry_manager
412            .execute_with_retry_with_cancel(
413                endpoint,
414                operation,
415                should_retry,
416                create_error,
417                &self.cancellation_token,
418            )
419            .await?;
420
421        serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
422            error: e.to_string(),
423            body: String::from_utf8_lossy(&response.body).to_string(),
424        })
425    }
426
427    /// Fetch all perpetual markets from dYdX.
428    ///
429    /// # Errors
430    ///
431    /// Returns an error if the HTTP request fails or response parsing fails.
432    pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
433        self.send_request(Method::GET, ENDPOINT_PERPETUAL_MARKETS, None)
434            .await
435    }
436
437    /// Fetch a single perpetual market by ticker.
438    ///
439    /// Uses the `market` query parameter for efficient single-market fetch.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if the HTTP request fails or response parsing fails.
444    pub async fn get_market(
445        &self,
446        ticker: &str,
447    ) -> Result<super::models::MarketsResponse, DydxHttpError> {
448        let query = format!("ticker={ticker}");
449        self.send_request(Method::GET, ENDPOINT_PERPETUAL_MARKETS, Some(&query))
450            .await
451    }
452
453    /// Fetch orderbook for a specific market.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if the HTTP request fails or response parsing fails.
458    pub async fn get_orderbook(
459        &self,
460        ticker: &str,
461    ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
462        let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
463        self.send_request(Method::GET, &endpoint, None).await
464    }
465
466    /// Fetch recent trades for a market.
467    ///
468    /// # Errors
469    ///
470    /// Returns an error if the HTTP request fails or response parsing fails.
471    pub async fn get_trades(
472        &self,
473        ticker: &str,
474        limit: Option<u32>,
475        starting_before_or_at_height: Option<u64>,
476    ) -> Result<super::models::TradesResponse, DydxHttpError> {
477        let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
478        let mut query_parts = Vec::new();
479
480        if let Some(l) = limit {
481            query_parts.push(format!("limit={l}"));
482        }
483
484        if let Some(height) = starting_before_or_at_height {
485            query_parts.push(format!("createdBeforeOrAtHeight={height}"));
486        }
487        let query = if query_parts.is_empty() {
488            None
489        } else {
490            Some(query_parts.join("&"))
491        };
492        self.send_request(Method::GET, &endpoint, query.as_deref())
493            .await
494    }
495
496    /// Fetch candles/klines for a market.
497    ///
498    /// # Errors
499    ///
500    /// Returns an error if the HTTP request fails or response parsing fails.
501    pub async fn get_candles(
502        &self,
503        ticker: &str,
504        resolution: DydxCandleResolution,
505        limit: Option<u32>,
506        from_iso: Option<DateTime<Utc>>,
507        to_iso: Option<DateTime<Utc>>,
508    ) -> Result<super::models::CandlesResponse, DydxHttpError> {
509        let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
510        let mut query_parts = vec![format!("resolution={resolution}")];
511
512        if let Some(l) = limit {
513            query_parts.push(format!("limit={l}"));
514        }
515
516        if let Some(from) = from_iso {
517            let from_str = from.to_rfc3339();
518            query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
519        }
520
521        if let Some(to) = to_iso {
522            let to_str = to.to_rfc3339();
523            query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
524        }
525        let query = query_parts.join("&");
526        self.send_request(Method::GET, &endpoint, Some(&query))
527            .await
528    }
529
530    /// Fetch subaccount information.
531    ///
532    /// # Errors
533    ///
534    /// Returns an error if the HTTP request fails or response parsing fails.
535    pub async fn get_subaccount(
536        &self,
537        address: &str,
538        subaccount_number: u32,
539    ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
540        let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
541        self.send_request(Method::GET, &endpoint, None).await
542    }
543
544    /// Fetch fills for a subaccount.
545    ///
546    /// # Errors
547    ///
548    /// Returns an error if the HTTP request fails or response parsing fails.
549    pub async fn get_fills(
550        &self,
551        address: &str,
552        subaccount_number: u32,
553        market: Option<&str>,
554        limit: Option<u32>,
555    ) -> Result<super::models::FillsResponse, DydxHttpError> {
556        let endpoint = "/v4/fills";
557        let mut query_parts = vec![
558            format!("address={address}"),
559            format!("subaccountNumber={subaccount_number}"),
560        ];
561
562        if let Some(m) = market {
563            query_parts.push(format!("market={m}"));
564            query_parts.push("marketType=PERPETUAL".to_string());
565        }
566
567        if let Some(l) = limit {
568            query_parts.push(format!("limit={l}"));
569        }
570        let query = query_parts.join("&");
571        self.send_request(Method::GET, endpoint, Some(&query)).await
572    }
573
574    /// Fetch orders for a subaccount.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if the HTTP request fails or response parsing fails.
579    pub async fn get_orders(
580        &self,
581        address: &str,
582        subaccount_number: u32,
583        market: Option<&str>,
584        limit: Option<u32>,
585    ) -> Result<super::models::OrdersResponse, DydxHttpError> {
586        let endpoint = "/v4/orders";
587        let mut query_parts = vec![
588            format!("address={address}"),
589            format!("subaccountNumber={subaccount_number}"),
590        ];
591
592        if let Some(m) = market {
593            query_parts.push(format!("market={m}"));
594            query_parts.push("marketType=PERPETUAL".to_string());
595        }
596
597        if let Some(l) = limit {
598            query_parts.push(format!("limit={l}"));
599        }
600        let query = query_parts.join("&");
601        self.send_request(Method::GET, endpoint, Some(&query)).await
602    }
603
604    /// Fetch transfers for a subaccount.
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if the HTTP request fails or response parsing fails.
609    pub async fn get_transfers(
610        &self,
611        address: &str,
612        subaccount_number: u32,
613        limit: Option<u32>,
614    ) -> Result<super::models::TransfersResponse, DydxHttpError> {
615        let endpoint = "/v4/transfers";
616        let mut query_parts = vec![
617            format!("address={address}"),
618            format!("subaccountNumber={subaccount_number}"),
619        ];
620
621        if let Some(l) = limit {
622            query_parts.push(format!("limit={l}"));
623        }
624        let query = query_parts.join("&");
625        self.send_request(Method::GET, endpoint, Some(&query)).await
626    }
627
628    /// Fetch historical funding rates for a market.
629    ///
630    /// # Errors
631    ///
632    /// Returns an error if the HTTP request fails or response parsing fails.
633    pub async fn get_historical_funding(
634        &self,
635        ticker: &str,
636        limit: Option<u32>,
637        effective_before_or_at_height: Option<u64>,
638        effective_before_or_at: Option<DateTime<Utc>>,
639    ) -> Result<super::models::HistoricalFundingResponse, DydxHttpError> {
640        let endpoint = format!("/v4/historicalFunding/{ticker}");
641        let mut query_parts = Vec::new();
642
643        if let Some(l) = limit {
644            query_parts.push(format!("limit={l}"));
645        }
646
647        if let Some(height) = effective_before_or_at_height {
648            query_parts.push(format!("effectiveBeforeOrAtHeight={height}"));
649        }
650
651        if let Some(before) = effective_before_or_at {
652            let before_str = before.to_rfc3339();
653            query_parts.push(format!(
654                "effectiveBeforeOrAt={}",
655                urlencoding::encode(&before_str)
656            ));
657        }
658
659        let query = if query_parts.is_empty() {
660            None
661        } else {
662            Some(query_parts.join("&"))
663        };
664        self.send_request(Method::GET, &endpoint, query.as_deref())
665            .await
666    }
667
668    /// Returns the current server time.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if the HTTP request fails or response parsing fails.
673    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
674        self.send_request(Method::GET, "/v4/time", None).await
675    }
676
677    /// Returns the current blockchain height.
678    ///
679    /// # Errors
680    ///
681    /// Returns an error if the HTTP request fails or response parsing fails.
682    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
683        self.send_request(Method::GET, "/v4/height", None).await
684    }
685}
686
687/// Provides a higher-level HTTP client for the [dYdX v4](https://dydx.exchange) Indexer REST API.
688///
689/// This client wraps the underlying `DydxRawHttpClient` to handle conversions
690/// into the Nautilus domain model, following the two-layer pattern established
691/// in OKX, Bybit, and BitMEX adapters.
692///
693/// **Architecture:**
694/// - **Raw client** (`DydxRawHttpClient`): Low-level HTTP methods matching dYdX Indexer API endpoints.
695/// - **Domain client** (`DydxHttpClient`): High-level methods using Nautilus domain types.
696///
697/// The domain client:
698/// - Wraps the raw client in an `Arc` for efficient cloning (required for Python bindings).
699/// - Maintains an instrument cache using `DashMap` for thread-safe concurrent access.
700/// - Provides standard cache methods: `cache_instruments()`, `cache_instrument()`, `get_instrument()`.
701/// - Tracks cache initialization state for optimizations.
702#[derive(Debug)]
703#[cfg_attr(
704    feature = "python",
705    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx", from_py_object)
706)]
707#[cfg_attr(
708    feature = "python",
709    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.dydx")
710)]
711pub struct DydxHttpClient {
712    /// Raw HTTP client wrapped in Arc for efficient cloning.
713    pub(crate) inner: Arc<DydxRawHttpClient>,
714    /// Shared instrument cache with multiple lookup indices.
715    ///
716    /// This cache is shared across HTTP client, WebSocket client, and execution client.
717    /// It provides O(1) lookups by symbol, market ticker, or clob_pair_id.
718    pub(crate) instrument_cache: Arc<InstrumentCache>,
719    clock: &'static AtomicTime,
720}
721
722impl Clone for DydxHttpClient {
723    fn clone(&self) -> Self {
724        Self {
725            inner: self.inner.clone(),
726            instrument_cache: Arc::clone(&self.instrument_cache),
727            clock: self.clock,
728        }
729    }
730}
731
732impl Default for DydxHttpClient {
733    fn default() -> Self {
734        Self::new(None, 60, None, DydxNetwork::Mainnet, None)
735            .expect("Failed to create default DydxHttpClient")
736    }
737}
738
739impl DydxHttpClient {
740    /// Creates a new [`DydxHttpClient`] using the default dYdX Indexer HTTP URL,
741    /// optionally overridden with a custom base URL.
742    ///
743    /// This constructor creates its own internal instrument cache. For shared caching
744    /// across multiple clients, use [`new_with_cache`](Self::new_with_cache) instead.
745    ///
746    /// **Note**: No credentials are required as the dYdX Indexer API is publicly accessible.
747    /// Order submission and trading operations use gRPC with blockchain transaction signing.
748    ///
749    /// # Errors
750    ///
751    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
752    pub fn new(
753        base_url: Option<String>,
754        timeout_secs: u64,
755        proxy_url: Option<String>,
756        network: DydxNetwork,
757        retry_config: Option<RetryConfig>,
758    ) -> anyhow::Result<Self> {
759        Self::new_with_cache(
760            base_url,
761            timeout_secs,
762            proxy_url,
763            network,
764            retry_config,
765            Arc::new(InstrumentCache::new()),
766        )
767    }
768
769    /// Creates a new [`DydxHttpClient`] with a shared instrument cache.
770    ///
771    /// Use this constructor when sharing instrument data between HTTP client,
772    /// WebSocket client, and execution client.
773    ///
774    /// # Arguments
775    ///
776    /// * `instrument_cache` - Shared instrument cache for lookups by symbol, ticker, or clob_pair_id
777    ///
778    /// # Errors
779    ///
780    /// Returns an error if the underlying HTTP client or retry manager cannot be created.
781    pub fn new_with_cache(
782        base_url: Option<String>,
783        timeout_secs: u64,
784        proxy_url: Option<String>,
785        network: DydxNetwork,
786        retry_config: Option<RetryConfig>,
787        instrument_cache: Arc<InstrumentCache>,
788    ) -> anyhow::Result<Self> {
789        Ok(Self {
790            inner: Arc::new(DydxRawHttpClient::new(
791                base_url,
792                timeout_secs,
793                proxy_url,
794                network,
795                retry_config,
796            )?),
797            instrument_cache,
798            clock: get_atomic_clock_realtime(),
799        })
800    }
801
802    /// Requests instruments from the dYdX Indexer API and returns Nautilus domain types.
803    ///
804    /// This method does NOT automatically cache results. Use `fetch_and_cache_instruments()`
805    /// for automatic caching, or call `cache_instruments()` manually with the results.
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the HTTP request or parsing fails.
810    /// Individual instrument parsing errors are logged as warnings.
811    pub async fn request_instruments(
812        &self,
813        symbol: Option<String>,
814        maker_fee: Option<Decimal>,
815        taker_fee: Option<Decimal>,
816    ) -> anyhow::Result<Vec<InstrumentAny>> {
817        let markets_response = self.inner.get_markets().await?;
818        let ts_init = self.generate_ts_init();
819
820        let mut instruments = Vec::new();
821        let mut skipped_inactive = 0;
822
823        for (ticker, market) in markets_response.markets {
824            // Filter by symbol if specified
825            if let Some(ref sym) = symbol
826                && ticker != *sym
827            {
828                continue;
829            }
830
831            if !super::parse::is_market_active(&market.status) {
832                log::debug!(
833                    "Skipping inactive market {ticker} (status: {:?})",
834                    market.status
835                );
836                skipped_inactive += 1;
837                continue;
838            }
839
840            match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
841                Ok(instrument) => {
842                    instruments.push(instrument);
843                }
844                Err(e) => {
845                    log::error!("Failed to parse instrument {ticker}: {e}");
846                }
847            }
848        }
849
850        if skipped_inactive > 0 {
851            log::info!(
852                "Parsed {} instruments, skipped {} inactive",
853                instruments.len(),
854                skipped_inactive
855            );
856        } else {
857            log::debug!("Parsed {} instruments", instruments.len());
858        }
859
860        Ok(instruments)
861    }
862
863    /// Fetches instruments from the API and caches them.
864    ///
865    /// This is a convenience method that fetches instruments and populates both
866    /// the symbol-based and CLOB pair ID-based caches.
867    ///
868    /// On success, existing caches are cleared and repopulated atomically.
869    /// On failure, existing caches are preserved (no partial updates).
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if the HTTP request fails.
874    pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
875        // Fetch first - preserve existing cache on network failure
876        let markets_response = self.inner.get_markets().await?;
877        let ts_init = self.generate_ts_init();
878
879        let mut parsed_instruments = Vec::new();
880        let mut parsed_markets = Vec::new();
881        let mut skipped_inactive = 0;
882
883        for (ticker, market) in markets_response.markets {
884            if !super::parse::is_market_active(&market.status) {
885                log::debug!(
886                    "Skipping inactive market {ticker} (status: {:?})",
887                    market.status
888                );
889                skipped_inactive += 1;
890                continue;
891            }
892
893            match super::parse::parse_instrument_any(&market, None, None, ts_init) {
894                Ok(instrument) => {
895                    parsed_instruments.push(instrument);
896                    parsed_markets.push(market);
897                }
898                Err(e) => {
899                    log::error!("Failed to parse instrument {ticker}: {e}");
900                }
901            }
902        }
903
904        // Only clear and repopulate cache after successful fetch and parse
905        self.instrument_cache.clear();
906
907        // Zip instruments with their market data for bulk insert
908        let items: Vec<_> = parsed_instruments.into_iter().zip(parsed_markets).collect();
909
910        if !items.is_empty() {
911            self.instrument_cache.insert_many(items.clone());
912        }
913
914        let count = items.len();
915
916        if skipped_inactive > 0 {
917            log::info!("Cached {count} instruments, skipped {skipped_inactive} inactive");
918        } else {
919            log::info!("Cached {count} instruments");
920        }
921
922        Ok(())
923    }
924
925    /// Fetches a single instrument by ticker and caches it.
926    ///
927    /// # Errors
928    ///
929    /// Returns an error if the HTTP request fails.
930    pub async fn fetch_and_cache_single_instrument(
931        &self,
932        ticker: &str,
933    ) -> anyhow::Result<Option<InstrumentAny>> {
934        let markets_response = self.inner.get_market(ticker).await?;
935        let ts_init = self.generate_ts_init();
936
937        // The API returns all markets if ticker not found, so check specifically
938        if let Some(market) = markets_response.markets.get(ticker) {
939            if !super::parse::is_market_active(&market.status) {
940                log::debug!(
941                    "Skipping inactive market {ticker} (status: {:?})",
942                    market.status
943                );
944                return Ok(None);
945            }
946
947            let instrument = parse_instrument_any(market, None, None, ts_init)?;
948            self.instrument_cache
949                .insert(instrument.clone(), market.clone());
950
951            log::info!("Fetched and cached new instrument: {ticker}");
952            return Ok(Some(instrument));
953        }
954
955        Ok(None)
956    }
957
958    /// Caches multiple instruments (symbol lookup only).
959    ///
960    /// Use `fetch_and_cache_instruments()` for full caching with market params.
961    /// Any existing instruments with the same symbols will be replaced.
962    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
963        self.instrument_cache.insert_instruments_only(instruments);
964    }
965
966    /// Caches a single instrument (symbol lookup only).
967    ///
968    /// Use `fetch_and_cache_instruments()` for full caching with market params.
969    /// Any existing instrument with the same symbol will be replaced.
970    pub fn cache_instrument(&self, instrument: InstrumentAny) {
971        self.instrument_cache.insert_instrument_only(instrument);
972    }
973
974    /// Gets an instrument from the cache by InstrumentId.
975    #[must_use]
976    pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
977        self.instrument_cache.get(instrument_id)
978    }
979
980    /// Gets an instrument by CLOB pair ID.
981    ///
982    /// Only works for instruments cached via `fetch_and_cache_instruments()`.
983    #[must_use]
984    pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
985        self.instrument_cache.get_by_clob_id(clob_pair_id)
986    }
987
988    /// Gets an instrument by market ticker (e.g., "BTC-USD").
989    ///
990    /// Only works for instruments cached via `fetch_and_cache_instruments()`.
991    #[must_use]
992    pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
993        self.instrument_cache.get_by_market(ticker)
994    }
995
996    /// Gets market parameters for order submission from the cached market data.
997    ///
998    /// Returns the quantization parameters needed by OrderBuilder to construct
999    /// properly formatted orders for the dYdX v4 protocol.
1000    ///
1001    /// # Errors
1002    ///
1003    /// Returns None if the instrument is not found in the market params cache.
1004    #[must_use]
1005    pub fn get_market_params(
1006        &self,
1007        instrument_id: &InstrumentId,
1008    ) -> Option<super::models::PerpetualMarket> {
1009        self.instrument_cache.get_market_params(instrument_id)
1010    }
1011
1012    /// Requests historical trades for a symbol.
1013    ///
1014    /// Fetches trade data from the dYdX Indexer API's `/v4/trades/perpetualMarket/:ticker` endpoint.
1015    /// Results are ordered by creation time descending (newest first).
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the HTTP request fails or response cannot be parsed.
1020    pub async fn request_trades(
1021        &self,
1022        symbol: &str,
1023        limit: Option<u32>,
1024        starting_before_or_at_height: Option<u64>,
1025    ) -> anyhow::Result<super::models::TradesResponse> {
1026        self.inner
1027            .get_trades(symbol, limit, starting_before_or_at_height)
1028            .await
1029            .map_err(Into::into)
1030    }
1031
1032    /// Requests historical candles for a symbol.
1033    ///
1034    /// Fetches candle data from the dYdX Indexer API's `/v4/candles/perpetualMarkets/:ticker` endpoint.
1035    /// Results are ordered by start time ascending (oldest first).
1036    ///
1037    /// # Errors
1038    ///
1039    /// Returns an error if the HTTP request fails or response cannot be parsed.
1040    pub async fn request_candles(
1041        &self,
1042        symbol: &str,
1043        resolution: DydxCandleResolution,
1044        limit: Option<u32>,
1045        from_iso: Option<DateTime<Utc>>,
1046        to_iso: Option<DateTime<Utc>>,
1047    ) -> anyhow::Result<super::models::CandlesResponse> {
1048        self.inner
1049            .get_candles(symbol, resolution, limit, from_iso, to_iso)
1050            .await
1051            .map_err(Into::into)
1052    }
1053
1054    /// Requests historical bars for an instrument with optional pagination.
1055    ///
1056    /// Fetches candle data from the dYdX Indexer API and converts to Nautilus
1057    /// `Bar` objects. Supports time-chunked pagination for large date ranges.
1058    ///
1059    /// The resolution is derived internally from `bar_type` (no need to pass
1060    /// `DydxCandleResolution`). Incomplete bars (where `ts_event >= now`) are
1061    /// filtered out.
1062    ///
1063    /// Results are returned in chronological order (oldest first).
1064    ///
1065    /// # Errors
1066    ///
1067    /// Returns an error if:
1068    /// - The bar type uses unsupported aggregation/price type.
1069    /// - The HTTP request fails or response cannot be parsed.
1070    /// - The instrument is not found in the cache.
1071    pub async fn request_bars(
1072        &self,
1073        bar_type: BarType,
1074        start: Option<DateTime<Utc>>,
1075        end: Option<DateTime<Utc>>,
1076        limit: Option<u32>,
1077        timestamp_on_close: bool,
1078    ) -> anyhow::Result<Vec<Bar>> {
1079        let resolution = bar_type_to_resolution(&bar_type)?;
1080        let instrument_id = bar_type.instrument_id();
1081
1082        let instrument = self
1083            .get_instrument(&instrument_id)
1084            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1085
1086        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1087        let price_precision = instrument.price_precision();
1088        let size_precision = instrument.size_precision();
1089        let ts_init = self.generate_ts_init();
1090
1091        let mut all_bars: Vec<Bar> = Vec::new();
1092
1093        // Determine bar duration in seconds for pagination chunking
1094        let spec = bar_type.spec();
1095        let bar_secs: i64 = match spec.aggregation {
1096            BarAggregation::Minute => spec.step.get() as i64 * 60,
1097            BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1098            BarAggregation::Day => spec.step.get() as i64 * 86_400,
1099            _ => anyhow::bail!("Unsupported aggregation: {:?}", spec.aggregation),
1100        };
1101
1102        match (start, end) {
1103            // Time-chunked pagination for date ranges
1104            (Some(range_start), Some(range_end)) if range_end > range_start => {
1105                let overall_limit = limit.unwrap_or(u32::MAX);
1106                let mut remaining = overall_limit;
1107                let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1108                let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1109                let mut chunk_start = range_start;
1110
1111                while chunk_start < range_end && remaining > 0 {
1112                    let chunk_end = (chunk_start + chunk_duration).min(range_end);
1113                    let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1114
1115                    let response = self
1116                        .inner
1117                        .get_candles(
1118                            ticker,
1119                            resolution,
1120                            Some(per_call_limit),
1121                            Some(chunk_start),
1122                            Some(chunk_end),
1123                        )
1124                        .await?;
1125
1126                    let count = response.candles.len() as u32;
1127                    if count == 0 {
1128                        break;
1129                    }
1130
1131                    for candle in &response.candles {
1132                        match super::parse::parse_bar(
1133                            candle,
1134                            bar_type,
1135                            price_precision,
1136                            size_precision,
1137                            timestamp_on_close,
1138                            ts_init,
1139                        ) {
1140                            Ok(bar) => all_bars.push(bar),
1141                            Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1142                        }
1143                    }
1144
1145                    if remaining <= count {
1146                        break;
1147                    }
1148                    remaining -= count;
1149                    chunk_start += chunk_duration;
1150                }
1151            }
1152            // Single request (no date range or invalid range)
1153            _ => {
1154                let req_limit = limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1155                let response = self
1156                    .inner
1157                    .get_candles(ticker, resolution, Some(req_limit), None, None)
1158                    .await?;
1159
1160                for candle in &response.candles {
1161                    match super::parse::parse_bar(
1162                        candle,
1163                        bar_type,
1164                        price_precision,
1165                        size_precision,
1166                        timestamp_on_close,
1167                        ts_init,
1168                    ) {
1169                        Ok(bar) => all_bars.push(bar),
1170                        Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1171                    }
1172                }
1173            }
1174        }
1175
1176        // Filter incomplete bars (ts_event >= current time)
1177        let current_time_ns = self.generate_ts_init();
1178        all_bars.retain(|bar| bar.ts_event < current_time_ns);
1179
1180        Ok(all_bars)
1181    }
1182
1183    /// Requests historical trade ticks for an instrument with optional pagination.
1184    ///
1185    /// Fetches trade data from the dYdX Indexer API and converts them to Nautilus
1186    /// `TradeTick` objects. Supports cursor-based pagination using block height
1187    /// and client-side time filtering (the dYdX API has no timestamp filter).
1188    ///
1189    /// Results are returned in chronological order (oldest first).
1190    ///
1191    /// # Errors
1192    ///
1193    /// Returns an error if the HTTP request fails, response cannot be parsed,
1194    /// or the instrument is not found in the cache.
1195    ///
1196    /// # Panics
1197    ///
1198    /// This function will panic if the API returns a non-empty trades response
1199    /// but `last()` on the trades vector returns `None` (should never happen).
1200    pub async fn request_trade_ticks(
1201        &self,
1202        instrument_id: InstrumentId,
1203        start: Option<DateTime<Utc>>,
1204        end: Option<DateTime<Utc>>,
1205        limit: Option<u32>,
1206    ) -> anyhow::Result<Vec<TradeTick>> {
1207        const DYDX_MAX_TRADES_PER_REQUEST: u32 = 1_000;
1208
1209        // Validation
1210        if let (Some(s), Some(e)) = (start, end) {
1211            anyhow::ensure!(s < e, "start ({s}) must be before end ({e})");
1212        }
1213
1214        let instrument = self
1215            .get_instrument(&instrument_id)
1216            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1217
1218        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1219        let price_precision = instrument.price_precision();
1220        let size_precision = instrument.size_precision();
1221        let ts_init = self.generate_ts_init();
1222
1223        // We always start pagination from the chain head (cursor = None). An earlier
1224        // version used `DEFAULT_BLOCK_TIME_SECS` with `get_height()` to skip directly
1225        // to an estimated target block, but any hardcoded block-time estimate that
1226        // underestimates the true average lands the cursor BEFORE the real `end`
1227        // block and silently drops the trades in the skipped window. Walking back
1228        // from head costs a few extra round-trips for stale `end` times but is
1229        // always correct. Per-call trades above `end` are filtered inside the loop.
1230        let overall_limit = limit.unwrap_or(u32::MAX);
1231        let mut remaining = overall_limit;
1232        let mut cursor_height: Option<u64> = None;
1233        let mut all_trades = Vec::new();
1234        // Global trade-id dedup across pages. Using a set prevents non-adjacent duplicates
1235        // from slipping past the legacy Vec::dedup_by adjacency check.
1236        let mut seen_trade_ids: ahash::AHashSet<String> = ahash::AHashSet::new();
1237
1238        loop {
1239            let page_limit = remaining.min(DYDX_MAX_TRADES_PER_REQUEST);
1240            let response = self
1241                .inner
1242                .get_trades(ticker, Some(page_limit), cursor_height)
1243                .await?;
1244
1245            let page_count = response.trades.len() as u32;
1246            if page_count == 0 {
1247                break;
1248            }
1249
1250            // Trades come newest-first; oldest is last
1251            let oldest_trade = response.trades.last().unwrap();
1252            let oldest_height = oldest_trade.created_at_height;
1253            let oldest_created_at = oldest_trade.created_at;
1254
1255            // Count how many unique (unseen) trades this page contributed
1256            let mut new_trades_this_page: usize = 0;
1257            let mut page_before_start = false;
1258
1259            for trade in &response.trades {
1260                if !seen_trade_ids.insert(trade.id.clone()) {
1261                    // Already emitted; skip
1262                    continue;
1263                }
1264
1265                if start.is_some_and(|s| trade.created_at < s) {
1266                    page_before_start = true;
1267                    continue;
1268                }
1269
1270                if end.is_some_and(|e| trade.created_at > e) {
1271                    continue;
1272                }
1273
1274                all_trades.push(super::parse::parse_trade_tick(
1275                    trade,
1276                    instrument_id,
1277                    price_precision,
1278                    size_precision,
1279                    ts_init,
1280                )?);
1281                new_trades_this_page += 1;
1282            }
1283
1284            // If the oldest trade is before the start boundary we're done
1285            if let Some(s) = start
1286                && oldest_created_at < s
1287            {
1288                let _ = page_before_start;
1289                break;
1290            }
1291
1292            // Advance the cursor by one block. `createdBeforeOrAtHeight` is an inclusive
1293            // upper bound, and the endpoint has no `after`/offset cursor, so keeping the
1294            // same height would re-request the same page. Any same-block trades that
1295            // overflowed the previous page are lost here; the dYdX venue tops out well
1296            // below `DYDX_MAX_TRADES_PER_REQUEST` trades per block in practice. The
1297            // `saturating_sub(1)` bottoms out at 0, which the `page_count == 0` guard at
1298            // the top of the loop handles.
1299            let next_cursor = Some(oldest_height.saturating_sub(1));
1300
1301            // Terminal guard: if we're already at block 0 and this page produced nothing
1302            // new, there is nowhere further back to paginate.
1303            if oldest_height == 0 && new_trades_this_page == 0 {
1304                break;
1305            }
1306            cursor_height = next_cursor;
1307
1308            remaining = remaining.saturating_sub(new_trades_this_page as u32);
1309
1310            // Break on partial page (no more data) or limit reached
1311            if page_count < page_limit || remaining == 0 {
1312                break;
1313            }
1314        }
1315
1316        // Reverse to chronological order (oldest first)
1317        all_trades.reverse();
1318
1319        // Truncate to requested limit
1320        if let Some(lim) = limit {
1321            all_trades.truncate(lim as usize);
1322        }
1323
1324        Ok(all_trades)
1325    }
1326
1327    /// Requests historical funding rates for an instrument.
1328    ///
1329    /// Fetches funding rate data from the dYdX Indexer API's
1330    /// `/v4/historicalFunding/:ticker` endpoint and converts them to Nautilus
1331    /// `FundingRateUpdate` objects.
1332    ///
1333    /// Results are returned in chronological order (oldest first).
1334    ///
1335    /// # Errors
1336    ///
1337    /// Returns an error if the HTTP request fails or response cannot be parsed.
1338    pub async fn request_funding_rates(
1339        &self,
1340        instrument_id: InstrumentId,
1341        start: Option<DateTime<Utc>>,
1342        end: Option<DateTime<Utc>>,
1343        limit: Option<u32>,
1344    ) -> anyhow::Result<Vec<FundingRateUpdate>> {
1345        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1346        let ts_init = self.generate_ts_init();
1347
1348        let response = self
1349            .inner
1350            .get_historical_funding(ticker, limit, None, end)
1351            .await?;
1352
1353        let mut rates = Vec::with_capacity(response.historical_funding.len());
1354
1355        for entry in &response.historical_funding {
1356            // Filter by start time if specified
1357            if start.is_some_and(|s| entry.effective_at < s) {
1358                continue;
1359            }
1360
1361            let ts_event =
1362                UnixNanos::from(entry.effective_at.timestamp_nanos_opt().ok_or_else(|| {
1363                    anyhow::anyhow!("Timestamp overflow for {}", entry.effective_at)
1364                })? as u64);
1365
1366            rates.push(FundingRateUpdate::new(
1367                instrument_id,
1368                entry.rate,
1369                Some(60),
1370                None,
1371                ts_event,
1372                ts_init,
1373            ));
1374        }
1375
1376        // dYdX returns newest first; reverse to chronological order
1377        rates.reverse();
1378
1379        log::info!("Fetched {} funding rates for {instrument_id}", rates.len(),);
1380
1381        Ok(rates)
1382    }
1383
1384    /// Requests an order book snapshot for a symbol.
1385    ///
1386    /// Fetches order book data from the dYdX Indexer API and converts it to Nautilus
1387    /// `OrderBookDeltas`. The snapshot is represented as a sequence of deltas starting
1388    /// with a CLEAR action followed by ADD actions for each level.
1389    ///
1390    /// # Errors
1391    ///
1392    /// Returns an error if the HTTP request fails, response cannot be parsed,
1393    /// or the instrument is not found in the cache.
1394    pub async fn request_orderbook_snapshot(
1395        &self,
1396        instrument_id: InstrumentId,
1397    ) -> anyhow::Result<OrderBookDeltas> {
1398        let instrument = self
1399            .get_instrument(&instrument_id)
1400            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1401
1402        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1403        let response = self.inner.get_orderbook(ticker).await?;
1404
1405        let ts_init = self.generate_ts_init();
1406        let snapshot_flag = RecordFlag::F_SNAPSHOT as u8;
1407
1408        let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1409
1410        // Empty book snapshot: Clear alone must carry F_SNAPSHOT | F_LAST
1411        if response.bids.is_empty() && response.asks.is_empty() {
1412            let mut clear_delta = OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init);
1413            clear_delta.flags = snapshot_flag | RecordFlag::F_LAST as u8;
1414            deltas.push(clear_delta);
1415            return Ok(OrderBookDeltas::new(instrument_id, deltas));
1416        }
1417
1418        let mut clear_delta = OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init);
1419        clear_delta.flags = snapshot_flag;
1420        deltas.push(clear_delta);
1421
1422        for (i, level) in response.bids.iter().enumerate() {
1423            let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1424            let flags = if is_last {
1425                snapshot_flag | RecordFlag::F_LAST as u8
1426            } else {
1427                snapshot_flag
1428            };
1429
1430            let order = BookOrder::new(
1431                NautilusOrderSide::Buy,
1432                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1433                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1434                0,
1435            );
1436
1437            deltas.push(OrderBookDelta::new(
1438                instrument_id,
1439                BookAction::Add,
1440                order,
1441                flags,
1442                0,
1443                ts_init,
1444                ts_init,
1445            ));
1446        }
1447
1448        for (i, level) in response.asks.iter().enumerate() {
1449            let is_last = i == response.asks.len() - 1;
1450            let flags = if is_last {
1451                snapshot_flag | RecordFlag::F_LAST as u8
1452            } else {
1453                snapshot_flag
1454            };
1455
1456            let order = BookOrder::new(
1457                NautilusOrderSide::Sell,
1458                Price::from_decimal_dp(level.price, instrument.price_precision())?,
1459                Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1460                0,
1461            );
1462
1463            deltas.push(OrderBookDelta::new(
1464                instrument_id,
1465                BookAction::Add,
1466                order,
1467                flags,
1468                0,
1469                ts_init,
1470                ts_init,
1471            ));
1472        }
1473
1474        Ok(OrderBookDeltas::new(instrument_id, deltas))
1475    }
1476
1477    /// Exposes raw HTTP client for testing and advanced use cases.
1478    ///
1479    /// This provides access to the underlying [`DydxRawHttpClient`] for cases
1480    /// where low-level API access is needed. Most users should use the domain
1481    /// client methods instead.
1482    #[must_use]
1483    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1484        &self.inner
1485    }
1486
1487    /// Returns `true` if this client is configured for testnet.
1488    #[must_use]
1489    pub fn is_testnet(&self) -> bool {
1490        self.inner.is_testnet()
1491    }
1492
1493    /// Returns the base URL used by this client.
1494    #[must_use]
1495    pub fn base_url(&self) -> &str {
1496        self.inner.base_url()
1497    }
1498
1499    /// Returns `true` if the instrument cache has been initialized.
1500    #[must_use]
1501    pub fn is_cache_initialized(&self) -> bool {
1502        self.instrument_cache.is_initialized()
1503    }
1504
1505    /// Returns the number of instruments currently cached.
1506    #[must_use]
1507    pub fn cached_instruments_count(&self) -> usize {
1508        self.instrument_cache.len()
1509    }
1510
1511    /// Returns a reference to the shared instrument cache.
1512    ///
1513    /// The cache provides lookups by symbol, market ticker, and clob_pair_id.
1514    #[must_use]
1515    pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
1516        &self.instrument_cache
1517    }
1518
1519    /// Returns all cached instruments.
1520    ///
1521    /// This is a convenience method that collects all instruments into a Vec.
1522    #[must_use]
1523    pub fn all_instruments(&self) -> Vec<InstrumentAny> {
1524        self.instrument_cache.all_instruments()
1525    }
1526
1527    /// Returns all cached instrument IDs.
1528    #[must_use]
1529    pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
1530        self.instrument_cache.all_instrument_ids()
1531    }
1532
1533    fn generate_ts_init(&self) -> UnixNanos {
1534        self.clock.get_time_ns()
1535    }
1536
1537    /// Requests order status reports for a subaccount.
1538    ///
1539    /// Fetches orders from the dYdX Indexer API and converts them to Nautilus
1540    /// `OrderStatusReport` objects.
1541    ///
1542    /// # Errors
1543    ///
1544    /// Returns an error if the HTTP request fails or parsing fails.
1545    pub async fn request_order_status_reports(
1546        &self,
1547        address: &str,
1548        subaccount_number: u32,
1549        account_id: AccountId,
1550        instrument_id: Option<InstrumentId>,
1551    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1552        let ts_init = self.generate_ts_init();
1553
1554        // Convert instrument_id to market filter
1555        let market = instrument_id.map(|id| {
1556            let symbol = id.symbol.to_string();
1557            // Remove -PERP suffix if present to get the dYdX market format (e.g., ETH-USD)
1558            symbol.trim_end_matches("-PERP").to_string()
1559        });
1560
1561        let orders = self
1562            .inner
1563            .get_orders(address, subaccount_number, market.as_deref(), None)
1564            .await?;
1565
1566        let mut reports = Vec::new();
1567
1568        for order in orders {
1569            // Get instrument by clob_pair_id
1570            let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1571                Some(inst) => inst,
1572                None => {
1573                    log::warn!(
1574                        "Skipping order {}: no cached instrument for clob_pair_id {}",
1575                        order.id,
1576                        order.clob_pair_id
1577                    );
1578                    continue;
1579                }
1580            };
1581
1582            // Filter by instrument_id if specified
1583            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1584                continue;
1585            }
1586
1587            match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1588            {
1589                Ok(report) => reports.push(report),
1590                Err(e) => {
1591                    log::warn!("Failed to parse order {}: {e}", order.id);
1592                }
1593            }
1594        }
1595
1596        Ok(reports)
1597    }
1598
1599    /// Requests fill reports for a subaccount.
1600    ///
1601    /// Fetches fills from the dYdX Indexer API and converts them to Nautilus
1602    /// `FillReport` objects.
1603    ///
1604    /// # Errors
1605    ///
1606    /// Returns an error if the HTTP request fails or parsing fails.
1607    pub async fn request_fill_reports(
1608        &self,
1609        address: &str,
1610        subaccount_number: u32,
1611        account_id: AccountId,
1612        instrument_id: Option<InstrumentId>,
1613    ) -> anyhow::Result<Vec<FillReport>> {
1614        let ts_init = self.generate_ts_init();
1615
1616        // Convert instrument_id to market filter
1617        let market = instrument_id.map(|id| {
1618            let symbol = id.symbol.to_string();
1619            symbol.trim_end_matches("-PERP").to_string()
1620        });
1621
1622        let fills_response = self
1623            .inner
1624            .get_fills(address, subaccount_number, market.as_deref(), None)
1625            .await?;
1626
1627        let mut reports = Vec::new();
1628
1629        for fill in fills_response.fills {
1630            // Get instrument by market ticker (e.g., "BTC-USD")
1631            let instrument = match self.get_instrument_by_market(&fill.market) {
1632                Some(inst) => inst,
1633                None => {
1634                    log::warn!(
1635                        "Skipping fill {}: no cached instrument for market {}",
1636                        fill.id,
1637                        fill.market
1638                    );
1639                    continue;
1640                }
1641            };
1642
1643            // Filter by instrument_id if specified
1644            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1645                continue;
1646            }
1647
1648            match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1649                Ok(report) => reports.push(report),
1650                Err(e) => {
1651                    log::warn!("Failed to parse fill {}: {e}", fill.id);
1652                }
1653            }
1654        }
1655
1656        Ok(reports)
1657    }
1658
1659    /// Requests position status reports for a subaccount.
1660    ///
1661    /// Fetches positions from the dYdX Indexer API and converts them to Nautilus
1662    /// `PositionStatusReport` objects.
1663    ///
1664    /// # Errors
1665    ///
1666    /// Returns an error if the HTTP request fails or parsing fails.
1667    pub async fn request_position_status_reports(
1668        &self,
1669        address: &str,
1670        subaccount_number: u32,
1671        account_id: AccountId,
1672        instrument_id: Option<InstrumentId>,
1673    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1674        let ts_init = self.generate_ts_init();
1675
1676        let subaccount_response = self
1677            .inner
1678            .get_subaccount(address, subaccount_number)
1679            .await?;
1680
1681        let mut reports = Vec::new();
1682
1683        for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1684            // Get instrument by market ticker (e.g., "BTC-USD")
1685            let instrument = match self.get_instrument_by_market(&market) {
1686                Some(inst) => inst,
1687                None => {
1688                    log::warn!("Skipping position: no cached instrument for market {market}");
1689                    continue;
1690                }
1691            };
1692
1693            // Filter by instrument_id if specified
1694            if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1695                continue;
1696            }
1697
1698            match super::parse::parse_position_status_report(
1699                &position,
1700                &instrument,
1701                account_id,
1702                ts_init,
1703            ) {
1704                Ok(report) => reports.push(report),
1705                Err(e) => {
1706                    log::warn!("Failed to parse position for {market}: {e}");
1707                }
1708            }
1709        }
1710
1711        Ok(reports)
1712    }
1713
1714    /// Requests account state for a subaccount.
1715    ///
1716    /// Fetches the subaccount from the dYdX Indexer API and converts it to a Nautilus
1717    /// `AccountState` with balances and margin calculations.
1718    ///
1719    /// # Errors
1720    ///
1721    /// Returns an error if the HTTP request fails or parsing fails.
1722    pub async fn request_account_state(
1723        &self,
1724        address: &str,
1725        subaccount_number: u32,
1726        account_id: AccountId,
1727    ) -> anyhow::Result<AccountState> {
1728        let ts_init = self.generate_ts_init();
1729        let subaccount_response = self
1730            .inner
1731            .get_subaccount(address, subaccount_number)
1732            .await?;
1733
1734        // Build instruments map from cache
1735        let instruments: HashMap<InstrumentId, InstrumentAny> = self
1736            .instrument_cache
1737            .all_instruments()
1738            .into_iter()
1739            .map(|inst| (inst.id(), inst))
1740            .collect();
1741
1742        // Use current oracle prices from instrument cache (updated via WS)
1743        let oracle_prices = self.instrument_cache.to_oracle_prices_map();
1744
1745        parse_account_state_from_http(
1746            &subaccount_response.subaccount,
1747            account_id,
1748            &instruments,
1749            &oracle_prices,
1750            ts_init,
1751            ts_init,
1752        )
1753    }
1754}
1755
1756#[cfg(test)]
1757mod tests {
1758    use axum::{Router, routing::get};
1759    use nautilus_model::identifiers::{Symbol, Venue};
1760    use rstest::rstest;
1761
1762    use super::*;
1763    use crate::http::error;
1764
1765    #[tokio::test]
1766    async fn test_raw_client_creation() {
1767        let client = DydxRawHttpClient::new(None, 30, None, DydxNetwork::Mainnet, None);
1768        assert!(client.is_ok());
1769
1770        let client = client.unwrap();
1771        assert!(!client.is_testnet());
1772        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1773    }
1774
1775    #[tokio::test]
1776    async fn test_raw_client_testnet() {
1777        let client = DydxRawHttpClient::new(None, 30, None, DydxNetwork::Testnet, None);
1778        assert!(client.is_ok());
1779
1780        let client = client.unwrap();
1781        assert!(client.is_testnet());
1782        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1783    }
1784
1785    #[tokio::test]
1786    async fn test_domain_client_creation() {
1787        let client = DydxHttpClient::new(None, 30, None, DydxNetwork::Mainnet, None);
1788        assert!(client.is_ok());
1789
1790        let client = client.unwrap();
1791        assert!(!client.is_testnet());
1792        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1793        assert!(!client.is_cache_initialized());
1794        assert_eq!(client.cached_instruments_count(), 0);
1795    }
1796
1797    #[tokio::test]
1798    async fn test_domain_client_testnet() {
1799        let client = DydxHttpClient::new(None, 30, None, DydxNetwork::Testnet, None);
1800        assert!(client.is_ok());
1801
1802        let client = client.unwrap();
1803        assert!(client.is_testnet());
1804        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1805    }
1806
1807    #[tokio::test]
1808    async fn test_domain_client_default() {
1809        let client = DydxHttpClient::default();
1810        assert!(!client.is_testnet());
1811        assert_eq!(client.base_url(), DYDX_HTTP_URL);
1812        assert!(!client.is_cache_initialized());
1813    }
1814
1815    #[tokio::test]
1816    async fn test_domain_client_clone() {
1817        let client = DydxHttpClient::new(None, 30, None, DydxNetwork::Mainnet, None).unwrap();
1818
1819        // Clone before initialization
1820        let cloned = client.clone();
1821        assert!(!cloned.is_cache_initialized());
1822
1823        client.instrument_cache.insert_instruments_only(vec![]);
1824
1825        // Clone after initialization
1826        #[expect(clippy::redundant_clone)]
1827        let cloned_after = client.clone();
1828        assert!(cloned_after.is_cache_initialized());
1829    }
1830
1831    #[rstest]
1832    fn test_domain_client_get_instrument_not_found() {
1833        let client = DydxHttpClient::default();
1834        let instrument_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
1835        let result = client.get_instrument(&instrument_id);
1836        assert!(result.is_none());
1837    }
1838
1839    #[tokio::test]
1840    async fn test_http_timeout_respects_configuration_and_does_not_block() {
1841        use tokio::net::TcpListener;
1842
1843        async fn slow_handler() -> &'static str {
1844            // Sleep longer than the configured HTTP timeout.
1845            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1846            "ok"
1847        }
1848
1849        let router = Router::new().route("/v4/slow", get(slow_handler));
1850
1851        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1852        let addr = listener.local_addr().unwrap();
1853
1854        tokio::spawn(async move {
1855            axum::serve(listener, router.into_make_service())
1856                .await
1857                .unwrap();
1858        });
1859
1860        let base_url = format!("http://{addr}");
1861
1862        // Configure a small operation timeout and no retries so the request
1863        // fails quickly even though the handler sleeps for 5 seconds.
1864        let retry_config = RetryConfig {
1865            max_retries: 0,
1866            initial_delay_ms: 0,
1867            max_delay_ms: 0,
1868            backoff_factor: 1.0,
1869            jitter_ms: 0,
1870            operation_timeout_ms: Some(500),
1871            immediate_first: true,
1872            max_elapsed_ms: Some(1_000),
1873        };
1874
1875        // Keep HTTP client timeout at a typical value; rely on RetryManager
1876        // operation timeout to enforce non-blocking behavior.
1877        let client = DydxRawHttpClient::new(
1878            Some(base_url),
1879            60,
1880            None,
1881            DydxNetwork::Mainnet,
1882            Some(retry_config),
1883        )
1884        .unwrap();
1885
1886        let start = std::time::Instant::now();
1887        let result: Result<serde_json::Value, error::DydxHttpError> =
1888            client.send_request(Method::GET, "/v4/slow", None).await;
1889        let elapsed = start.elapsed();
1890
1891        // Request should fail (timeout or client error), but without blocking the thread
1892        // for the full handler duration.
1893        assert!(result.is_err());
1894        assert!(elapsed < std::time::Duration::from_secs(3));
1895    }
1896}