Skip to main content

nautilus_deribit/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Deribit HTTP client implementation.
17
18use std::{
19    collections::HashMap,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, AtomicU64, Ordering},
23    },
24};
25
26use ahash::AHashSet;
27use chrono::{DateTime, Utc};
28use nautilus_core::{
29    AtomicMap, AtomicTime, datetime::nanos_to_millis, nanos::UnixNanos,
30    time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33    data::{Bar, BarType, TradeTick},
34    enums::{AggregationSource, BarAggregation},
35    events::AccountState,
36    identifiers::{AccountId, InstrumentId},
37    instruments::{Instrument, InstrumentAny},
38    orderbook::OrderBook,
39    reports::{FillReport, OrderStatusReport, PositionStatusReport},
40};
41use nautilus_network::{
42    http::{HttpClient, Method},
43    ratelimiter::quota::Quota,
44    retry::{RetryConfig, RetryManager},
45};
46use serde::{Serialize, de::DeserializeOwned};
47use strum::IntoEnumIterator;
48use tokio_util::sync::CancellationToken;
49use ustr::Ustr;
50
51use super::{
52    error::DeribitHttpError,
53    models::{
54        DeribitAccountSummariesResponse, DeribitBookSummary, DeribitCurrency, DeribitInstrument,
55        DeribitJsonRpcRequest, DeribitJsonRpcResponse, DeribitPosition, DeribitProductType,
56        DeribitTicker, DeribitUserTradesResponse,
57    },
58    query::{
59        GetAccountSummariesParams, GetBookSummaryByCurrencyParams, GetInstrumentParams,
60        GetInstrumentsParams, GetOpenOrdersByInstrumentParams, GetOpenOrdersParams,
61        GetOrderHistoryByCurrencyParams, GetOrderHistoryByInstrumentParams, GetOrderStateParams,
62        GetPositionsParams, GetTickerParams, GetUserTradesByCurrencyAndTimeParams,
63        GetUserTradesByInstrumentAndTimeParams,
64    },
65};
66use crate::{
67    common::{
68        consts::{
69            DERIBIT_ACCOUNT_RATE_KEY, DERIBIT_API_PATH, DERIBIT_GLOBAL_RATE_KEY,
70            DERIBIT_HTTP_ACCOUNT_QUOTA, DERIBIT_HTTP_ORDER_QUOTA, DERIBIT_HTTP_REST_QUOTA,
71            DERIBIT_ORDER_RATE_KEY, JSONRPC_VERSION, should_retry_error_code,
72        },
73        credential::{Credential, credential_env_vars},
74        enums::DeribitEnvironment,
75        parse::{
76            extract_server_timestamp, parse_account_state, parse_bars,
77            parse_deribit_instrument_any, parse_order_book, parse_trade_tick,
78        },
79        urls::get_http_base_url,
80    },
81    http::{
82        models::{DeribitOrderBook, DeribitTradesResponse, DeribitTradingViewChartData},
83        query::{
84            GetLastTradesByInstrumentAndTimeParams, GetOrderBookParams,
85            GetTradingViewChartDataParams,
86        },
87    },
88    websocket::{
89        messages::{DeribitOrderMsg, DeribitUserTradeMsg},
90        parse::{parse_position_status_report, parse_user_order_msg, parse_user_trade_msg},
91    },
92};
93
/// Maximum number of trades requested per call to Deribit's historical trades API.
///
/// Deribit defaults to 10 trades per request, which is insufficient for most
/// use cases; 1000 is the maximum the API allows.
pub const DERIBIT_HISTORICAL_TRADES_MAX_COUNT: u32 = 1_000;
98
// Dedup and cursor state for timestamp-based trade pagination.
//
// Deribit provides no offset cursor, so when multiple trades share
// one millisecond we use trade-ID dedup to avoid reprocessing.
// If an entire page contains only seen IDs we advance past that
// millisecond, which can skip trades when >1000 share one timestamp.
struct TradePaginator {
    // Trade IDs already returned as "new" since construction or the last reset.
    seen_ids: AHashSet<String>,
    // Start timestamp to use for the next page request.
    cursor: i64,
    // Upper bound of the requested range; see `is_exhausted`.
    end: i64,
}

impl TradePaginator {
    // Creates a paginator covering `start..=end` with an empty dedup set.
    fn new(start: i64, end: i64) -> Self {
        Self {
            seen_ids: AHashSet::new(),
            cursor: start,
            end,
        }
    }

    // Returns indices of new (unseen) items and advances the cursor.
    // Returns None when the page is empty (pagination should stop).
    //
    // `ids[i]` and `timestamps[i]` describe the same trade. The two slices are
    // expected to have equal length; if they do not, only the paired prefix is
    // processed instead of panicking on an out-of-bounds index.
    //
    // When `has_more` is false the cursor is left untouched because no further
    // page will be requested.
    fn advance(
        &mut self,
        ids: &[String],
        timestamps: &[i64],
        has_more: bool,
    ) -> Option<Vec<usize>> {
        if ids.is_empty() {
            return None;
        }

        let mut new_indices = Vec::new();
        // Falls back to the current cursor if no (id, timestamp) pair exists.
        let mut last_ts = self.cursor;

        for (i, (id, &ts)) in ids.iter().zip(timestamps).enumerate() {
            last_ts = ts;

            // `insert` returns true only for IDs not seen before.
            if self.seen_ids.insert(id.clone()) {
                new_indices.push(i);
            }
        }

        if !has_more {
            return Some(new_indices);
        }

        // A page of only-seen IDs means the current millisecond is drained:
        // step past it. Otherwise re-request from the last timestamp so trades
        // sharing that millisecond are not lost (dedup handles the overlap).
        self.cursor = if new_indices.is_empty() {
            last_ts + 1
        } else {
            last_ts
        };

        Some(new_indices)
    }

    // Strict greater-than so pages at exactly end_ms are still
    // fetched (Deribit treats start_timestamp as inclusive).
    fn is_exhausted(&self) -> bool {
        self.cursor > self.end
    }

    // Forgets all seen IDs and restarts pagination at `start`; `end` is kept.
    fn reset(&mut self, start: i64) {
        self.seen_ids.clear();
        self.cursor = start;
    }
}
169
/// Low-level Deribit HTTP client for raw API operations.
///
/// This client handles JSON-RPC 2.0 protocol, request signing, rate limiting,
/// and retry logic. It returns venue-specific response types.
#[derive(Debug)]
pub struct DeribitRawHttpClient {
    // Full endpoint URL every JSON-RPC request is POSTed to.
    base_url: String,
    // Underlying HTTP client, constructed with the rate limiter quotas.
    client: HttpClient,
    // API credential used to sign authenticated requests; `None` for public-only clients.
    credential: Option<Credential>,
    // Drives retry/backoff for each request.
    retry_manager: RetryManager<DeribitHttpError>,
    // Passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    // Monotonic counter supplying the JSON-RPC `id` of each request (starts at 1).
    request_id: AtomicU64,
}
183
184impl DeribitRawHttpClient {
185    /// Creates a new [`DeribitRawHttpClient`].
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the HTTP client cannot be created.
190    pub fn new(
191        base_url: Option<String>,
192        environment: DeribitEnvironment,
193        timeout_secs: u64,
194        max_retries: u32,
195        retry_delay_ms: u64,
196        retry_delay_max_ms: u64,
197        proxy_url: Option<String>,
198    ) -> Result<Self, DeribitHttpError> {
199        let base_url = base_url
200            .unwrap_or_else(|| format!("{}{}", get_http_base_url(environment), DERIBIT_API_PATH));
201        let retry_config = RetryConfig {
202            max_retries,
203            initial_delay_ms: retry_delay_ms,
204            max_delay_ms: retry_delay_max_ms,
205            backoff_factor: 2.0,
206            jitter_ms: 1000,
207            operation_timeout_ms: Some(60_000),
208            immediate_first: false,
209            max_elapsed_ms: Some(180_000),
210        };
211
212        let retry_manager = RetryManager::new(retry_config);
213
214        Ok(Self {
215            base_url,
216            client: HttpClient::new(
217                HashMap::new(),
218                Vec::new(),
219                Self::rate_limiter_quotas(),
220                Some(*DERIBIT_HTTP_REST_QUOTA),
221                Some(timeout_secs),
222                proxy_url,
223            )
224            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
225            credential: None,
226            retry_manager,
227            cancellation_token: CancellationToken::new(),
228            request_id: AtomicU64::new(1),
229        })
230    }
231
232    /// Get the cancellation token for this client.
233    pub fn cancellation_token(&self) -> &CancellationToken {
234        &self.cancellation_token
235    }
236
    /// Returns whether this client is connected to testnet.
    ///
    /// Detection is a plain substring check for `"test."` in the base URL.
    // NOTE(review): a custom `base_url` that merely contains "test." somewhere
    // would also be reported as testnet; confirm this heuristic is acceptable.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.base_url.contains("test.")
    }
242
243    /// Returns the rate limiter quotas for the HTTP client.
244    ///
245    /// Quotas are organized by:
246    /// - Global: Overall rate limit for all requests
247    /// - Orders: Matching engine operations (buy, sell, cancel, etc.)
248    /// - Account: Account information endpoints
249    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
250        vec![
251            (
252                DERIBIT_GLOBAL_RATE_KEY.to_string(),
253                *DERIBIT_HTTP_REST_QUOTA,
254            ),
255            (
256                DERIBIT_ORDER_RATE_KEY.to_string(),
257                *DERIBIT_HTTP_ORDER_QUOTA,
258            ),
259            (
260                DERIBIT_ACCOUNT_RATE_KEY.to_string(),
261                *DERIBIT_HTTP_ACCOUNT_QUOTA,
262            ),
263        ]
264    }
265
266    /// Returns rate limit keys for a given RPC method.
267    ///
268    /// Maps Deribit JSON-RPC methods to appropriate rate limit buckets.
269    fn rate_limit_keys(method: &str) -> Vec<String> {
270        let mut keys = vec![DERIBIT_GLOBAL_RATE_KEY.to_string()];
271
272        // Categorize by method type
273        if Self::is_order_method(method) {
274            keys.push(DERIBIT_ORDER_RATE_KEY.to_string());
275        } else if Self::is_account_method(method) {
276            keys.push(DERIBIT_ACCOUNT_RATE_KEY.to_string());
277        }
278
279        // Add method-specific key
280        keys.push(format!("deribit:{method}"));
281
282        keys
283    }
284
285    /// Returns true if the method is an order operation (matching engine).
286    fn is_order_method(method: &str) -> bool {
287        matches!(
288            method,
289            "private/buy"
290                | "private/sell"
291                | "private/edit"
292                | "private/cancel"
293                | "private/cancel_all"
294                | "private/cancel_all_by_currency"
295                | "private/cancel_all_by_instrument"
296                | "private/cancel_by_label"
297                | "private/close_position"
298        )
299    }
300
301    /// Returns true if the method accesses account information.
302    fn is_account_method(method: &str) -> bool {
303        matches!(
304            method,
305            "private/get_account_summaries"
306                | "private/get_account_summary"
307                | "private/get_positions"
308                | "private/get_position"
309                | "private/get_open_orders_by_currency"
310                | "private/get_open_orders_by_instrument"
311                | "private/get_order_state"
312                | "private/get_user_trades_by_currency"
313                | "private/get_user_trades_by_instrument"
314        )
315    }
316
317    /// Creates a new [`DeribitRawHttpClient`] with explicit credentials.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the HTTP client cannot be created.
322    #[expect(clippy::too_many_arguments)]
323    pub fn with_credentials(
324        api_key: String,
325        api_secret: String,
326        base_url: Option<String>,
327        environment: DeribitEnvironment,
328        timeout_secs: u64,
329        max_retries: u32,
330        retry_delay_ms: u64,
331        retry_delay_max_ms: u64,
332        proxy_url: Option<String>,
333    ) -> Result<Self, DeribitHttpError> {
334        let base_url = base_url
335            .unwrap_or_else(|| format!("{}{}", get_http_base_url(environment), DERIBIT_API_PATH));
336        let retry_config = RetryConfig {
337            max_retries,
338            initial_delay_ms: retry_delay_ms,
339            max_delay_ms: retry_delay_max_ms,
340            backoff_factor: 2.0,
341            jitter_ms: 1000,
342            operation_timeout_ms: Some(60_000),
343            immediate_first: false,
344            max_elapsed_ms: Some(180_000),
345        };
346
347        let retry_manager = RetryManager::new(retry_config);
348        let credential = Credential::new(api_key, api_secret);
349
350        Ok(Self {
351            base_url,
352            client: HttpClient::new(
353                HashMap::new(),
354                Vec::new(),
355                Self::rate_limiter_quotas(),
356                Some(*DERIBIT_HTTP_REST_QUOTA),
357                Some(timeout_secs),
358                proxy_url,
359            )
360            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
361            credential: Some(credential),
362            retry_manager,
363            cancellation_token: CancellationToken::new(),
364            request_id: AtomicU64::new(1),
365        })
366    }
367
368    /// Creates a new [`DeribitRawHttpClient`] with credentials from environment variables.
369    ///
370    /// If `api_key` or `api_secret` are not provided, they will be loaded from environment:
371    /// - Mainnet: `DERIBIT_API_KEY`, `DERIBIT_API_SECRET`
372    /// - Testnet: `DERIBIT_TESTNET_API_KEY`, `DERIBIT_TESTNET_API_SECRET`
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if:
377    /// - The HTTP client cannot be created
378    /// - Credentials are not provided and environment variables are not set
379    #[expect(clippy::too_many_arguments)]
380    pub fn new_with_env(
381        api_key: Option<String>,
382        api_secret: Option<String>,
383        base_url: Option<String>,
384        environment: DeribitEnvironment,
385        timeout_secs: u64,
386        max_retries: u32,
387        retry_delay_ms: u64,
388        retry_delay_max_ms: u64,
389        proxy_url: Option<String>,
390    ) -> Result<Self, DeribitHttpError> {
391        // Determine environment variable names based on environment
392        let (key_env, secret_env) = credential_env_vars(environment);
393
394        // Resolve credentials from explicit params or environment
395        let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
396        let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
397
398        // If credentials were resolved, create authenticated client
399        if let (Some(key), Some(secret)) = (api_key, api_secret) {
400            Self::with_credentials(
401                key,
402                secret,
403                base_url,
404                environment,
405                timeout_secs,
406                max_retries,
407                retry_delay_ms,
408                retry_delay_max_ms,
409                proxy_url,
410            )
411        } else {
412            // No credentials - create unauthenticated client
413            Self::new(
414                base_url,
415                environment,
416                timeout_secs,
417                max_retries,
418                retry_delay_ms,
419                retry_delay_max_ms,
420                proxy_url,
421            )
422        }
423    }
424
425    /// Sends a JSON-RPC 2.0 request to the Deribit API.
426    async fn send_request<T, P>(
427        &self,
428        method: &str,
429        params: P,
430        authenticate: bool,
431    ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
432    where
433        T: DeserializeOwned,
434        P: Serialize,
435    {
436        // Create operation identifier combining URL and RPC method
437        let operation_id = format!("{}#{}", self.base_url, method);
438        let params_clone = serde_json::to_value(&params)?;
439
440        let operation = || {
441            let method = method.to_string();
442            let params_clone = params_clone.clone();
443
444            async move {
445                // Build JSON-RPC request
446                let id = self.request_id.fetch_add(1, Ordering::SeqCst);
447                let request = DeribitJsonRpcRequest {
448                    jsonrpc: JSONRPC_VERSION,
449                    id,
450                    method: method.clone(),
451                    params: params_clone.clone(),
452                };
453
454                let body = serde_json::to_vec(&request)?;
455
456                // Build headers
457                let mut headers = HashMap::new();
458                headers.insert("Content-Type".to_string(), "application/json".to_string());
459
460                // Add authentication headers if required
461                if authenticate {
462                    let credentials = self
463                        .credential
464                        .as_ref()
465                        .ok_or(DeribitHttpError::MissingCredentials)?;
466                    let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
467                    headers.extend(auth_headers);
468                }
469
470                let rate_limit_keys = Self::rate_limit_keys(&method);
471                let resp = self
472                    .client
473                    .request(
474                        Method::POST,
475                        self.base_url.clone(),
476                        None,
477                        Some(headers),
478                        Some(body),
479                        None,
480                        Some(rate_limit_keys),
481                    )
482                    .await
483                    .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
484
485                // Parse JSON-RPC response
486                // Note: Deribit may return JSON-RPC errors with non-2xx HTTP status (e.g., 400)
487                // Always try to parse as JSON-RPC first, then fall back to HTTP error handling
488
489                // Try to parse as JSON first
490                let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
491                    Ok(json) => json,
492                    Err(_) => {
493                        // Not valid JSON - treat as HTTP error
494                        let error_body = String::from_utf8_lossy(&resp.body);
495                        log::error!(
496                            "Non-JSON response: method={method}, status={}, body={error_body}",
497                            resp.status.as_u16()
498                        );
499                        return Err(DeribitHttpError::UnexpectedStatus {
500                            status: resp.status.as_u16(),
501                            body: error_body.to_string(),
502                        });
503                    }
504                };
505
506                // Try to parse as JSON-RPC response
507                let json_rpc_response: DeribitJsonRpcResponse<T> =
508                    serde_json::from_value(json_value.clone()).map_err(|e| {
509                        log::error!(
510                            "Failed to deserialize Deribit JSON-RPC response: method={method}, status={}, error={e}",
511                            resp.status.as_u16()
512                        );
513                        log::debug!(
514                            "Response JSON (first 2000 chars): {}",
515                            &json_value
516                                .to_string()
517                                .chars()
518                                .take(2000)
519                                .collect::<String>()
520                        );
521                        DeribitHttpError::JsonError(e.to_string())
522                    })?;
523
524                // Check if it's a success or error result
525                if json_rpc_response.result.is_some() {
526                    Ok(json_rpc_response)
527                } else if let Some(error) = &json_rpc_response.error {
528                    // JSON-RPC error (may come with any HTTP status)
529                    log::warn!(
530                        "Deribit RPC error response: method={method}, http_status={}, error_code={}, error_message={}, error_data={:?}",
531                        resp.status.as_u16(),
532                        error.code,
533                        error.message,
534                        error.data
535                    );
536
537                    // Map JSON-RPC error to appropriate error variant
538                    Err(DeribitHttpError::from_jsonrpc_error(
539                        error.code,
540                        error.message.clone(),
541                        error.data.as_ref(),
542                    ))
543                } else {
544                    log::error!(
545                        "Response contains neither result nor error field: method={method}, status={}, request_id={:?}",
546                        resp.status.as_u16(),
547                        json_rpc_response.id
548                    );
549                    Err(DeribitHttpError::JsonError(
550                        "Response contains neither result nor error".to_string(),
551                    ))
552                }
553            }
554        };
555
556        // Retry strategy based on Deribit error responses and HTTP status codes:
557        //
558        // 1. Network errors: always retry (transient connection issues)
559        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
560        // 3. Deribit-specific retryable error codes (defined in common::consts)
561        //
562        // Note: Deribit returns many permanent errors which should NOT be retried
563        // (e.g., "invalid_credentials", "not_enough_funds", "order_not_found")
564        let should_retry = |error: &DeribitHttpError| -> bool {
565            match error {
566                DeribitHttpError::NetworkError(_) => true,
567                DeribitHttpError::UnexpectedStatus { status, .. } => {
568                    *status >= 500 || *status == 429
569                }
570                DeribitHttpError::DeribitError { error_code, .. } => {
571                    should_retry_error_code(*error_code)
572                }
573                _ => false,
574            }
575        };
576
577        let create_error = |msg: String| -> DeribitHttpError {
578            if msg == "canceled" {
579                DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
580            } else {
581                DeribitHttpError::NetworkError(msg)
582            }
583        };
584
585        self.retry_manager
586            .execute_with_retry_with_cancel(
587                &operation_id,
588                operation,
589                should_retry,
590                create_error,
591                &self.cancellation_token,
592            )
593            .await
594    }
595
596    /// Gets available trading instruments.
597    ///
598    /// # Errors
599    ///
600    /// Returns an error if the request fails or the response cannot be parsed.
601    pub async fn get_instruments(
602        &self,
603        params: GetInstrumentsParams,
604    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
605        self.send_request("public/get_instruments", params, false)
606            .await
607    }
608
609    /// Gets details for a specific trading instrument.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if the request fails or the response cannot be parsed.
614    pub async fn get_instrument(
615        &self,
616        params: GetInstrumentParams,
617    ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
618        self.send_request("public/get_instrument", params, false)
619            .await
620    }
621
622    /// Gets recent trades for an instrument within a time range.
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the request fails or the response cannot be parsed.
627    pub async fn get_last_trades_by_instrument_and_time(
628        &self,
629        params: GetLastTradesByInstrumentAndTimeParams,
630    ) -> Result<DeribitJsonRpcResponse<DeribitTradesResponse>, DeribitHttpError> {
631        self.send_request(
632            "public/get_last_trades_by_instrument_and_time",
633            params,
634            false,
635        )
636        .await
637    }
638
639    /// Gets TradingView chart data (OHLCV) for an instrument.
640    ///
641    /// # Errors
642    ///
643    /// Returns an error if the request fails or the response cannot be parsed.
644    pub async fn get_tradingview_chart_data(
645        &self,
646        params: GetTradingViewChartDataParams,
647    ) -> Result<DeribitJsonRpcResponse<DeribitTradingViewChartData>, DeribitHttpError> {
648        self.send_request("public/get_tradingview_chart_data", params, false)
649            .await
650    }
651
652    /// Gets account summaries for all currencies.
653    ///
654    /// # Errors
655    ///
656    /// Returns an error if:
657    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
658    /// - Authentication fails (invalid signature, expired timestamp)
659    /// - The request fails or the response cannot be parsed
660    pub async fn get_account_summaries(
661        &self,
662        params: GetAccountSummariesParams,
663    ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
664        self.send_request("private/get_account_summaries", params, true)
665            .await
666    }
667
668    /// Gets order book for an instrument.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if the request fails or the response cannot be parsed.
673    pub async fn get_order_book(
674        &self,
675        params: GetOrderBookParams,
676    ) -> Result<DeribitJsonRpcResponse<DeribitOrderBook>, DeribitHttpError> {
677        self.send_request("public/get_order_book", params, false)
678            .await
679    }
680
681    /// Gets a single order by its ID.
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if:
686    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
687    /// - Authentication fails (invalid signature, expired timestamp)
688    /// - The request fails or the response cannot be parsed
689    pub async fn get_order_state(
690        &self,
691        params: GetOrderStateParams,
692    ) -> Result<DeribitJsonRpcResponse<DeribitOrderMsg>, DeribitHttpError> {
693        self.send_request("private/get_order_state", params, true)
694            .await
695    }
696
697    /// Gets all open orders across all currencies and instruments.
698    ///
699    /// # Errors
700    ///
701    /// Returns an error if:
702    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
703    /// - Authentication fails (invalid signature, expired timestamp)
704    /// - The request fails or the response cannot be parsed
705    pub async fn get_open_orders(
706        &self,
707        params: GetOpenOrdersParams,
708    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
709        self.send_request("private/get_open_orders", params, true)
710            .await
711    }
712
713    /// Gets open orders for a specific instrument.
714    ///
715    /// # Errors
716    ///
717    /// Returns an error if:
718    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
719    /// - Authentication fails (invalid signature, expired timestamp)
720    /// - The request fails or the response cannot be parsed
721    pub async fn get_open_orders_by_instrument(
722        &self,
723        params: GetOpenOrdersByInstrumentParams,
724    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
725        self.send_request("private/get_open_orders_by_instrument", params, true)
726            .await
727    }
728
729    /// Gets historical orders for a specific instrument.
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if:
734    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
735    /// - Authentication fails (invalid signature, expired timestamp)
736    /// - The request fails or the response cannot be parsed
737    pub async fn get_order_history_by_instrument(
738        &self,
739        params: GetOrderHistoryByInstrumentParams,
740    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
741        self.send_request("private/get_order_history_by_instrument", params, true)
742            .await
743    }
744
745    /// Gets historical orders for a specific currency.
746    ///
747    /// # Errors
748    ///
749    /// Returns an error if:
750    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
751    /// - Authentication fails (invalid signature, expired timestamp)
752    /// - The request fails or the response cannot be parsed
753    pub async fn get_order_history_by_currency(
754        &self,
755        params: GetOrderHistoryByCurrencyParams,
756    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
757        self.send_request("private/get_order_history_by_currency", params, true)
758            .await
759    }
760
761    /// Gets user trades for a specific instrument within a time range.
762    ///
763    /// # Errors
764    ///
765    /// Returns an error if:
766    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
767    /// - Authentication fails (invalid signature, expired timestamp)
768    /// - The request fails or the response cannot be parsed
769    pub async fn get_user_trades_by_instrument_and_time(
770        &self,
771        params: GetUserTradesByInstrumentAndTimeParams,
772    ) -> Result<DeribitJsonRpcResponse<DeribitUserTradesResponse>, DeribitHttpError> {
773        self.send_request(
774            "private/get_user_trades_by_instrument_and_time",
775            params,
776            true,
777        )
778        .await
779    }
780
781    /// Gets user trades for a specific currency within a time range.
782    ///
783    /// # Errors
784    ///
785    /// Returns an error if:
786    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
787    /// - Authentication fails (invalid signature, expired timestamp)
788    /// - The request fails or the response cannot be parsed
789    pub async fn get_user_trades_by_currency_and_time(
790        &self,
791        params: GetUserTradesByCurrencyAndTimeParams,
792    ) -> Result<DeribitJsonRpcResponse<DeribitUserTradesResponse>, DeribitHttpError> {
793        self.send_request("private/get_user_trades_by_currency_and_time", params, true)
794            .await
795    }
796
797    /// Gets book summaries for all instruments of a given currency.
798    ///
799    /// # Errors
800    ///
801    /// Returns an error if the request fails or the response cannot be parsed.
802    pub async fn get_book_summary_by_currency(
803        &self,
804        params: GetBookSummaryByCurrencyParams,
805    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitBookSummary>>, DeribitHttpError> {
806        self.send_request("public/get_book_summary_by_currency", params, false)
807            .await
808    }
809
810    /// Gets ticker data for a single instrument.
811    ///
812    /// # Errors
813    ///
814    /// Returns an error if the request fails or the response cannot be parsed.
815    pub async fn get_ticker(
816        &self,
817        params: GetTickerParams,
818    ) -> Result<DeribitJsonRpcResponse<DeribitTicker>, DeribitHttpError> {
819        self.send_request("public/ticker", params, false).await
820    }
821
822    /// Gets positions for a specific currency.
823    ///
824    /// # Errors
825    ///
826    /// Returns an error if:
827    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
828    /// - Authentication fails (invalid signature, expired timestamp)
829    /// - The request fails or the response cannot be parsed
830    pub async fn get_positions(
831        &self,
832        params: GetPositionsParams,
833    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitPosition>>, DeribitHttpError> {
834        self.send_request("private/get_positions", params, true)
835            .await
836    }
837}
838
/// High-level Deribit HTTP client with domain-level abstractions.
///
/// This client wraps the raw HTTP client and provides methods that use Nautilus
/// domain types. It maintains an instrument cache for efficient lookups.
///
/// The raw client and the instrument cache are held behind [`Arc`], so clones of this
/// client share both.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.deribit")
)]
pub struct DeribitHttpClient {
    /// Raw client that issues the JSON-RPC requests.
    pub(crate) inner: Arc<DeribitRawHttpClient>,
    /// Instruments keyed by raw symbol, filled by `cache_instruments`.
    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Clock used to produce `ts_init` timestamps for parsed data.
    clock: &'static AtomicTime,
    /// Set to `true` once `cache_instruments` has been called.
    cache_initialized: AtomicBool,
}
858
859impl Clone for DeribitHttpClient {
860    fn clone(&self) -> Self {
861        let cache_initialized = AtomicBool::new(false);
862
863        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
864        if is_initialized {
865            cache_initialized.store(true, Ordering::Release);
866        }
867
868        Self {
869            inner: self.inner.clone(),
870            instruments_cache: self.instruments_cache.clone(),
871            cache_initialized,
872            clock: self.clock,
873        }
874    }
875}
876
877impl DeribitHttpClient {
878    /// Creates a new [`DeribitHttpClient`] with default configuration.
879    ///
880    /// # Parameters
881    /// - `base_url`: Optional custom base URL (for testing)
882    /// - `environment`: The Deribit environment to connect to
883    ///
884    /// # Errors
885    ///
886    /// Returns an error if the HTTP client cannot be created.
887    pub fn new(
888        base_url: Option<String>,
889        environment: DeribitEnvironment,
890        timeout_secs: u64,
891        max_retries: u32,
892        retry_delay_ms: u64,
893        retry_delay_max_ms: u64,
894        proxy_url: Option<String>,
895    ) -> anyhow::Result<Self> {
896        let raw_client = Arc::new(DeribitRawHttpClient::new(
897            base_url,
898            environment,
899            timeout_secs,
900            max_retries,
901            retry_delay_ms,
902            retry_delay_max_ms,
903            proxy_url,
904        )?);
905
906        Ok(Self {
907            inner: raw_client,
908            instruments_cache: Arc::new(AtomicMap::new()),
909            cache_initialized: AtomicBool::new(false),
910            clock: get_atomic_clock_realtime(),
911        })
912    }
913
914    /// Creates a new [`DeribitHttpClient`] with credentials from environment variables.
915    ///
916    /// If `api_key` or `api_secret` are not provided, they will be loaded from environment:
917    /// - Mainnet: `DERIBIT_API_KEY`, `DERIBIT_API_SECRET`
918    /// - Testnet: `DERIBIT_TESTNET_API_KEY`, `DERIBIT_TESTNET_API_SECRET`
919    ///
920    /// # Errors
921    ///
922    /// Returns an error if:
923    /// - The HTTP client cannot be created
924    /// - Credentials are not provided and environment variables are not set
925    #[expect(clippy::too_many_arguments)]
926    pub fn new_with_env(
927        api_key: Option<String>,
928        api_secret: Option<String>,
929        base_url: Option<String>,
930        environment: DeribitEnvironment,
931        timeout_secs: u64,
932        max_retries: u32,
933        retry_delay_ms: u64,
934        retry_delay_max_ms: u64,
935        proxy_url: Option<String>,
936    ) -> anyhow::Result<Self> {
937        let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
938            api_key,
939            api_secret,
940            base_url,
941            environment,
942            timeout_secs,
943            max_retries,
944            retry_delay_ms,
945            retry_delay_max_ms,
946            proxy_url,
947        )?);
948
949        Ok(Self {
950            inner: raw_client,
951            instruments_cache: Arc::new(AtomicMap::new()),
952            cache_initialized: AtomicBool::new(false),
953            clock: get_atomic_clock_realtime(),
954        })
955    }
956
957    /// Requests instruments for a specific currency.
958    ///
959    /// # Errors
960    ///
961    /// Returns an error if the request fails or instruments cannot be parsed.
962    pub async fn request_instruments(
963        &self,
964        currency: DeribitCurrency,
965        product_type: Option<DeribitProductType>,
966    ) -> anyhow::Result<Vec<InstrumentAny>> {
967        // Build parameters
968        let params = if let Some(pt) = product_type {
969            GetInstrumentsParams::with_kind(currency, pt)
970        } else {
971            GetInstrumentsParams::new(currency)
972        };
973
974        // Call raw client
975        let full_response = self.inner.get_instruments(params).await?;
976        let result = full_response
977            .result
978            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
979        let ts_event = extract_server_timestamp(full_response.us_out)?;
980        let ts_init = self.generate_ts_init();
981
982        // Parse each instrument
983        let mut instruments = Vec::new();
984        let mut skipped_count = 0;
985        let mut error_count = 0;
986
987        for raw_instrument in result {
988            match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
989                Ok(Some(instrument)) => {
990                    instruments.push(instrument);
991                }
992                Ok(None) => {
993                    // Unsupported instrument type (e.g., combos)
994                    skipped_count += 1;
995                    log::debug!(
996                        "Skipped unsupported instrument type: {} (kind: {:?})",
997                        raw_instrument.instrument_name,
998                        raw_instrument.kind
999                    );
1000                }
1001                Err(e) => {
1002                    error_count += 1;
1003                    log::warn!(
1004                        "Failed to parse instrument {}: {}",
1005                        raw_instrument.instrument_name,
1006                        e
1007                    );
1008                }
1009            }
1010        }
1011
1012        log::info!(
1013            "Parsed {} instruments ({} skipped, {} errors)",
1014            instruments.len(),
1015            skipped_count,
1016            error_count
1017        );
1018
1019        Ok(instruments)
1020    }
1021
1022    /// Requests a specific instrument by its Nautilus instrument ID.
1023    ///
1024    /// This is a high-level method that fetches the raw instrument data from Deribit
1025    /// and converts it to a Nautilus `InstrumentAny` type.
1026    ///
1027    /// # Errors
1028    ///
1029    /// Returns an error if:
1030    /// - The instrument name format is invalid (error code `-32602`)
1031    /// - The instrument doesn't exist (error code `13020`)
1032    /// - Network or API errors occur
1033    pub async fn request_instrument(
1034        &self,
1035        instrument_id: InstrumentId,
1036    ) -> anyhow::Result<InstrumentAny> {
1037        let params = GetInstrumentParams {
1038            instrument_name: instrument_id.symbol.to_string(),
1039        };
1040
1041        let full_response = self.inner.get_instrument(params).await?;
1042        let response = full_response
1043            .result
1044            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1045        let ts_event = extract_server_timestamp(full_response.us_out)?;
1046        let ts_init = self.generate_ts_init();
1047
1048        match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
1049            Some(instrument) => Ok(instrument),
1050            None => anyhow::bail!(
1051                "Unsupported instrument type: {} (kind: {:?})",
1052                response.instrument_name,
1053                response.kind
1054            ),
1055        }
1056    }
1057
1058    /// Requests historical trades for an instrument within a time range.
1059    ///
1060    /// Fetches trade ticks from Deribit and converts them to Nautilus [`TradeTick`] objects.
1061    ///
1062    /// # Arguments
1063    ///
1064    /// * `instrument_id` - The instrument to fetch trades for
1065    /// * `start` - Optional start time filter
1066    /// * `end` - Optional end time filter
1067    /// * `limit` - Optional limit on number of trades (max 1000)
1068    ///
1069    /// # Errors
1070    ///
1071    /// Returns an error if:
1072    /// - The request fails
1073    /// - Trade parsing fails
1074    ///
1075    /// # Pagination
1076    ///
1077    /// When `limit` is `None`, this function automatically paginates through all available
1078    /// trades in the time range using the `has_more` field from the API response.
1079    /// When `limit` is specified, pagination stops once that many trades are collected.
1080    pub async fn request_trades(
1081        &self,
1082        instrument_id: InstrumentId,
1083        start: Option<DateTime<Utc>>,
1084        end: Option<DateTime<Utc>>,
1085        limit: Option<u32>,
1086    ) -> anyhow::Result<Vec<TradeTick>> {
1087        // Get instrument from cache to determine precisions
1088        let (price_precision, size_precision) =
1089            if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1090                (instrument.price_precision(), instrument.size_precision())
1091            } else {
1092                log::warn!("Instrument {instrument_id} not in cache, skipping trades request");
1093                anyhow::bail!("Instrument {instrument_id} not in cache");
1094            };
1095
1096        // Convert timestamps to milliseconds
1097        let now = Utc::now();
1098        let end_dt = end.unwrap_or(now);
1099        let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
1100
1101        if let (Some(s), Some(e)) = (start, end) {
1102            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1103        }
1104
1105        let start_ms = start_dt.timestamp_millis();
1106        let end_ms = end_dt.timestamp_millis();
1107        let ts_init = self.generate_ts_init();
1108        let mut all_trades = Vec::new();
1109        let mut paginator = TradePaginator::new(start_ms, end_ms);
1110
1111        loop {
1112            let params = GetLastTradesByInstrumentAndTimeParams::new(
1113                instrument_id.symbol.to_string(),
1114                paginator.cursor,
1115                end_ms,
1116                Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1117                Some("asc".to_string()),
1118            );
1119
1120            let full_response = self
1121                .inner
1122                .get_last_trades_by_instrument_and_time(params)
1123                .await
1124                .map_err(|e| anyhow::anyhow!(e))?;
1125
1126            let response_data = full_response
1127                .result
1128                .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1129
1130            let ids: Vec<String> = response_data
1131                .trades
1132                .iter()
1133                .map(|t| t.trade_id.clone())
1134                .collect();
1135            let timestamps: Vec<i64> = response_data.trades.iter().map(|t| t.timestamp).collect();
1136
1137            let Some(new_indices) = paginator.advance(&ids, &timestamps, response_data.has_more)
1138            else {
1139                break;
1140            };
1141
1142            for i in &new_indices {
1143                let raw_trade = &response_data.trades[*i];
1144
1145                match parse_trade_tick(
1146                    raw_trade,
1147                    instrument_id,
1148                    price_precision,
1149                    size_precision,
1150                    ts_init,
1151                ) {
1152                    Ok(trade) => {
1153                        all_trades.push(trade);
1154
1155                        if let Some(max) = limit
1156                            && all_trades.len() >= max as usize
1157                        {
1158                            return Ok(all_trades);
1159                        }
1160                    }
1161                    Err(e) => {
1162                        log::warn!(
1163                            "Failed to parse trade {} for {}: {}",
1164                            raw_trade.trade_id,
1165                            instrument_id,
1166                            e
1167                        );
1168                    }
1169                }
1170            }
1171
1172            if !response_data.has_more || paginator.is_exhausted() {
1173                break;
1174            }
1175        }
1176
1177        log::info!(
1178            "Fetched {} historical trades for {} from {} to {}",
1179            all_trades.len(),
1180            instrument_id,
1181            start_dt,
1182            end_dt
1183        );
1184
1185        Ok(all_trades)
1186    }
1187
1188    /// Requests historical bars (OHLCV) for an instrument.
1189    ///
1190    /// Uses the `public/get_tradingview_chart_data` endpoint to fetch candlestick data.
1191    ///
1192    /// # Errors
1193    ///
1194    /// Returns an error if:
1195    /// - Aggregation source is not EXTERNAL
1196    /// - Bar aggregation type is not supported by Deribit
1197    /// - The request fails or response cannot be parsed
1198    ///
1199    /// # Supported Resolutions
1200    ///
1201    /// Deribit supports: 1, 3, 5, 10, 15, 30, 60, 120, 180, 360, 720 minutes, and 1D (daily)
1202    pub async fn request_bars(
1203        &self,
1204        bar_type: BarType,
1205        start: Option<DateTime<Utc>>,
1206        end: Option<DateTime<Utc>>,
1207        limit: Option<u32>,
1208    ) -> anyhow::Result<Vec<Bar>> {
1209        anyhow::ensure!(
1210            bar_type.aggregation_source() == AggregationSource::External,
1211            "Only EXTERNAL aggregation is supported"
1212        );
1213
1214        let now = Utc::now();
1215
1216        // Default to last hour if no start/end provided
1217        let end_dt = end.unwrap_or(now);
1218        let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
1219
1220        if let (Some(s), Some(e)) = (start, end) {
1221            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1222        }
1223
1224        // Convert BarType to Deribit resolution
1225        let spec = bar_type.spec();
1226        let step = spec.step.get();
1227        let resolution = match spec.aggregation {
1228            BarAggregation::Minute => format!("{step}"),
1229            BarAggregation::Hour => format!("{}", step * 60),
1230            BarAggregation::Day => "1D".to_string(),
1231            a => anyhow::bail!("Deribit does not support {a:?} aggregation"),
1232        };
1233
1234        // Validate resolution is supported by Deribit
1235        let supported_resolutions = [
1236            "1", "3", "5", "10", "15", "30", "60", "120", "180", "360", "720", "1D",
1237        ];
1238
1239        if !supported_resolutions.contains(&resolution.as_str()) {
1240            anyhow::bail!(
1241                "Deribit does not support resolution '{resolution}'. Supported: {supported_resolutions:?}"
1242            );
1243        }
1244
1245        let instrument_name = bar_type.instrument_id().symbol.to_string();
1246        let start_timestamp = start_dt.timestamp_millis();
1247        let end_timestamp = end_dt.timestamp_millis();
1248
1249        let params = GetTradingViewChartDataParams::new(
1250            instrument_name,
1251            start_timestamp,
1252            end_timestamp,
1253            resolution,
1254        );
1255
1256        let full_response = self.inner.get_tradingview_chart_data(params).await?;
1257        let chart_data = full_response
1258            .result
1259            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1260
1261        if chart_data.status == "no_data" {
1262            log::debug!("No bar data returned for {bar_type}");
1263            return Ok(Vec::new());
1264        }
1265
1266        // Get instrument from cache to determine precisions
1267        let instrument_id = bar_type.instrument_id();
1268        let (price_precision, size_precision) =
1269            if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1270                (instrument.price_precision(), instrument.size_precision())
1271            } else {
1272                log::warn!("Instrument {instrument_id} not in cache, skipping bars request");
1273                anyhow::bail!("Instrument {instrument_id} not in cache");
1274            };
1275
1276        let ts_init = self.generate_ts_init();
1277        let mut bars = parse_bars(
1278            &chart_data,
1279            bar_type,
1280            price_precision,
1281            size_precision,
1282            ts_init,
1283        )?;
1284
1285        if let Some(max) = limit {
1286            let max = max as usize;
1287            if bars.len() > max {
1288                bars.drain(..bars.len() - max);
1289            }
1290        }
1291
1292        log::info!("Parsed {} bars for {}", bars.len(), bar_type);
1293
1294        Ok(bars)
1295    }
1296
1297    /// Requests a snapshot of the order book for an instrument.
1298    ///
1299    /// Fetches the order book from Deribit and converts it to a Nautilus [`OrderBook`].
1300    ///
1301    /// # Arguments
1302    ///
1303    /// * `instrument_id` - The instrument to fetch the order book for
1304    /// * `depth` - Optional depth limit (valid values: 1, 5, 10, 20, 50, 100, 1000, 10000)
1305    ///
1306    /// # Errors
1307    ///
1308    /// Returns an error if:
1309    /// - The request fails
1310    /// - Order book parsing fails
1311    pub async fn request_book_snapshot(
1312        &self,
1313        instrument_id: InstrumentId,
1314        depth: Option<u32>,
1315    ) -> anyhow::Result<OrderBook> {
1316        let (price_precision, size_precision) =
1317            if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1318                (instrument.price_precision(), instrument.size_precision())
1319            } else {
1320                anyhow::bail!("Instrument {instrument_id} not in cache");
1321            };
1322
1323        let params = GetOrderBookParams::new(instrument_id.symbol.to_string(), depth);
1324        let full_response = self
1325            .inner
1326            .get_order_book(params)
1327            .await
1328            .map_err(|e| anyhow::anyhow!(e))?;
1329
1330        let order_book_data = full_response
1331            .result
1332            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1333
1334        let ts_init = self.generate_ts_init();
1335        let book = parse_order_book(
1336            &order_book_data,
1337            instrument_id,
1338            price_precision,
1339            size_precision,
1340            ts_init,
1341        )?;
1342
1343        log::info!(
1344            "Fetched order book for {} with {} bids and {} asks",
1345            instrument_id,
1346            order_book_data.bids.len(),
1347            order_book_data.asks.len()
1348        );
1349
1350        Ok(book)
1351    }
1352
1353    /// Requests account state for all currencies.
1354    ///
1355    /// Fetches account balance and margin information for all currencies from Deribit
1356    /// and converts it to Nautilus [`AccountState`] event.
1357    ///
1358    /// # Errors
1359    ///
1360    /// Returns an error if:
1361    /// - The request fails
1362    /// - Currency conversion fails
1363    pub async fn request_account_state(
1364        &self,
1365        account_id: AccountId,
1366    ) -> anyhow::Result<AccountState> {
1367        let params = GetAccountSummariesParams::default();
1368        let full_response = self
1369            .inner
1370            .get_account_summaries(params)
1371            .await
1372            .map_err(|e| anyhow::anyhow!(e))?;
1373        let response_data = full_response
1374            .result
1375            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1376        let ts_init = self.generate_ts_init();
1377        let ts_event = extract_server_timestamp(full_response.us_out)?;
1378
1379        parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
1380    }
1381
1382    /// Generates a timestamp for initialization.
1383    fn generate_ts_init(&self) -> UnixNanos {
1384        self.clock.get_time_ns()
1385    }
1386
1387    /// Caches instruments for later retrieval.
1388    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1389        self.instruments_cache.rcu(|m| {
1390            for inst in instruments {
1391                m.insert(inst.raw_symbol().inner(), inst.clone());
1392            }
1393        });
1394        self.cache_initialized.store(true, Ordering::Release);
1395    }
1396
1397    /// Retrieves a cached instrument by symbol.
1398    #[must_use]
1399    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1400        self.instruments_cache.get_cloned(symbol)
1401    }
1402
1403    /// Checks if the instrument cache has been initialized.
1404    #[must_use]
1405    pub fn is_cache_initialized(&self) -> bool {
1406        self.cache_initialized.load(Ordering::Acquire)
1407    }
1408
1409    /// Returns whether this client is connected to testnet.
1410    #[must_use]
1411    pub fn is_testnet(&self) -> bool {
1412        self.inner.is_testnet()
1413    }
1414
1415    /// Requests order status reports for reconciliation.
1416    ///
1417    /// Fetches order statuses from Deribit and converts them to Nautilus [`OrderStatusReport`].
1418    ///
1419    /// # Strategy
1420    /// - Uses `/private/get_open_orders` for all open orders (single efficient API call)
1421    /// - Uses `/private/get_open_orders_by_instrument` when specific instrument is provided
1422    /// - For historical orders (when `open_only=false`), iterates over currencies
1423    ///
1424    /// # Errors
1425    ///
1426    /// Returns an error if the request fails or parsing fails.
1427    pub async fn request_order_status_reports(
1428        &self,
1429        account_id: AccountId,
1430        instrument_id: Option<InstrumentId>,
1431        start: Option<UnixNanos>,
1432        end: Option<UnixNanos>,
1433        open_only: bool,
1434    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1435        let ts_init = self.generate_ts_init();
1436        let mut reports = Vec::new();
1437        let mut seen_order_ids = AHashSet::new();
1438
1439        let mut parse_and_add = |order: &DeribitOrderMsg| {
1440            let symbol = Ustr::from(&order.instrument_name);
1441            if let Some(instrument) = self.get_instrument(&symbol) {
1442                match parse_user_order_msg(order, &instrument, account_id, ts_init) {
1443                    Ok(report) => {
1444                        // Apply time range filter based on ts_last
1445                        let ts_last = report.ts_last;
1446                        let in_range = match (start, end) {
1447                            (Some(s), Some(e)) => ts_last >= s && ts_last <= e,
1448                            (Some(s), None) => ts_last >= s,
1449                            (None, Some(e)) => ts_last <= e,
1450                            (None, None) => true,
1451                        };
1452                        // Only deduplicate if in range (prevents dropping valid historical reports)
1453                        if in_range && seen_order_ids.insert(order.order_id.clone()) {
1454                            reports.push(report);
1455                        }
1456                    }
1457                    Err(e) => {
1458                        log::warn!(
1459                            "Failed to parse order {} for {}: {}",
1460                            order.order_id,
1461                            order.instrument_name,
1462                            e
1463                        );
1464                    }
1465                }
1466            } else {
1467                log::debug!(
1468                    "Skipping order {} - instrument {} not in cache",
1469                    order.order_id,
1470                    order.instrument_name
1471                );
1472            }
1473        };
1474
1475        if let Some(instrument_id) = instrument_id {
1476            // Use instrument-specific endpoint (efficient)
1477            let instrument_name = instrument_id.symbol.to_string();
1478
1479            // Get open orders for this instrument
1480            let open_params = GetOpenOrdersByInstrumentParams {
1481                instrument_name: instrument_name.clone(),
1482                r#type: None,
1483            };
1484
1485            if let Some(orders) = self
1486                .inner
1487                .get_open_orders_by_instrument(open_params)
1488                .await?
1489                .result
1490            {
1491                for order in &orders {
1492                    parse_and_add(order);
1493                }
1494            }
1495
1496            if !open_only {
1497                const PAGE_SIZE: u32 = 100;
1498                let mut offset: u32 = 0;
1499
1500                loop {
1501                    let history_params = GetOrderHistoryByInstrumentParams {
1502                        instrument_name: instrument_name.clone(),
1503                        count: Some(PAGE_SIZE),
1504                        offset: Some(offset),
1505                        include_old: Some(true),
1506                        include_unfilled: Some(true),
1507                    };
1508                    let orders = self
1509                        .inner
1510                        .get_order_history_by_instrument(history_params)
1511                        .await?
1512                        .result
1513                        .unwrap_or_default();
1514
1515                    let count = orders.len() as u32;
1516                    for order in &orders {
1517                        parse_and_add(order);
1518                    }
1519
1520                    if count < PAGE_SIZE {
1521                        break;
1522                    }
1523                    offset += count;
1524                }
1525            }
1526        } else {
1527            // Use get_open_orders for ALL open orders - single API call!
1528            let open_params = GetOpenOrdersParams::default();
1529            if let Some(orders) = self.inner.get_open_orders(open_params).await?.result {
1530                for order in &orders {
1531                    parse_and_add(order);
1532                }
1533            }
1534
1535            if !open_only {
1536                const PAGE_SIZE: u32 = 100;
1537
1538                for currency in DeribitCurrency::iter().filter(|c| *c != DeribitCurrency::ANY) {
1539                    let mut offset: u32 = 0;
1540
1541                    loop {
1542                        let history_params = GetOrderHistoryByCurrencyParams {
1543                            currency,
1544                            kind: None,
1545                            count: Some(PAGE_SIZE),
1546                            offset: Some(offset),
1547                            include_old: Some(true),
1548                            include_unfilled: Some(true),
1549                        };
1550                        let orders = self
1551                            .inner
1552                            .get_order_history_by_currency(history_params)
1553                            .await?
1554                            .result
1555                            .unwrap_or_default();
1556
1557                        let count = orders.len() as u32;
1558                        for order in &orders {
1559                            parse_and_add(order);
1560                        }
1561
1562                        if count < PAGE_SIZE {
1563                            break;
1564                        }
1565                        offset += count;
1566                    }
1567                }
1568            }
1569        }
1570
1571        log::debug!("Generated {} order status reports", reports.len());
1572        Ok(reports)
1573    }
1574
1575    /// Requests fill reports for reconciliation.
1576    ///
1577    /// Fetches user trades from Deribit and converts them to Nautilus [`FillReport`].
1578    /// Automatically paginates through all results using time-cursor advancement.
1579    ///
1580    /// # Strategy
1581    /// - Uses `/private/get_user_trades_by_instrument_and_time` when instrument is provided
1582    /// - Otherwise iterates over currencies using `/private/get_user_trades_by_currency_and_time`
1583    ///
1584    /// # Errors
1585    ///
1586    /// Returns an error if the request fails or parsing fails.
1587    pub async fn request_fill_reports(
1588        &self,
1589        account_id: AccountId,
1590        instrument_id: Option<InstrumentId>,
1591        start: Option<UnixNanos>,
1592        end: Option<UnixNanos>,
1593    ) -> anyhow::Result<Vec<FillReport>> {
1594        let ts_init = self.generate_ts_init();
1595        let now_ms = Utc::now().timestamp_millis();
1596
1597        // Convert UnixNanos to milliseconds for Deribit API
1598        let start_ms = start.map_or(0, |ns| nanos_to_millis(ns.as_u64()) as i64);
1599        let end_ms = end.map_or(now_ms, |ns| nanos_to_millis(ns.as_u64()) as i64);
1600        let mut reports = Vec::new();
1601
1602        // Helper closure to parse trade and add to reports
1603        let mut parse_and_add = |trade: &DeribitUserTradeMsg| {
1604            let symbol = Ustr::from(&trade.instrument_name);
1605            if let Some(instrument) = self.get_instrument(&symbol) {
1606                match parse_user_trade_msg(trade, &instrument, account_id, ts_init) {
1607                    Ok(report) => reports.push(report),
1608                    Err(e) => {
1609                        log::warn!(
1610                            "Failed to parse trade {} for {}: {}",
1611                            trade.trade_id,
1612                            trade.instrument_name,
1613                            e
1614                        );
1615                    }
1616                }
1617            } else {
1618                log::debug!(
1619                    "Skipping trade {} - instrument {} not in cache",
1620                    trade.trade_id,
1621                    trade.instrument_name
1622                );
1623            }
1624        };
1625
1626        let mut paginator = TradePaginator::new(start_ms, end_ms);
1627
1628        if let Some(instrument_id) = instrument_id {
1629            loop {
1630                let params = GetUserTradesByInstrumentAndTimeParams {
1631                    instrument_name: instrument_id.symbol.to_string(),
1632                    start_timestamp: paginator.cursor,
1633                    end_timestamp: end_ms,
1634                    count: Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1635                    sorting: Some("asc".to_string()),
1636                };
1637                let response = self
1638                    .inner
1639                    .get_user_trades_by_instrument_and_time(params)
1640                    .await?;
1641
1642                let Some(data) = response.result else { break };
1643
1644                let ids: Vec<String> = data.trades.iter().map(|t| t.trade_id.clone()).collect();
1645                let timestamps: Vec<i64> = data.trades.iter().map(|t| t.timestamp as i64).collect();
1646
1647                let Some(new_indices) = paginator.advance(&ids, &timestamps, data.has_more) else {
1648                    break;
1649                };
1650
1651                for i in &new_indices {
1652                    parse_and_add(&data.trades[*i]);
1653                }
1654
1655                if !data.has_more || paginator.is_exhausted() {
1656                    break;
1657                }
1658            }
1659        } else {
1660            for currency in DeribitCurrency::iter().filter(|c| *c != DeribitCurrency::ANY) {
1661                paginator.reset(start_ms);
1662
1663                loop {
1664                    let params = GetUserTradesByCurrencyAndTimeParams {
1665                        currency,
1666                        start_timestamp: paginator.cursor,
1667                        end_timestamp: end_ms,
1668                        kind: None,
1669                        count: Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1670                        sorting: Some("asc".to_string()),
1671                    };
1672                    let response = self
1673                        .inner
1674                        .get_user_trades_by_currency_and_time(params)
1675                        .await?;
1676
1677                    let Some(data) = response.result else { break };
1678
1679                    let ids: Vec<String> = data.trades.iter().map(|t| t.trade_id.clone()).collect();
1680                    let timestamps: Vec<i64> =
1681                        data.trades.iter().map(|t| t.timestamp as i64).collect();
1682
1683                    let Some(new_indices) = paginator.advance(&ids, &timestamps, data.has_more)
1684                    else {
1685                        break;
1686                    };
1687
1688                    for i in &new_indices {
1689                        parse_and_add(&data.trades[*i]);
1690                    }
1691
1692                    if !data.has_more || paginator.is_exhausted() {
1693                        break;
1694                    }
1695                }
1696            }
1697        }
1698
1699        log::debug!("Generated {} fill reports", reports.len());
1700        Ok(reports)
1701    }
1702
1703    /// Requests ticker data for a single instrument.
1704    ///
1705    /// Returns the `DeribitTicker` which includes `underlying_price` (forward price).
1706    ///
1707    /// # Errors
1708    ///
1709    /// Returns an error if the request fails.
1710    pub async fn request_ticker(&self, instrument_name: &str) -> anyhow::Result<DeribitTicker> {
1711        let params = GetTickerParams {
1712            instrument_name: instrument_name.to_string(),
1713        };
1714        let response = self
1715            .inner
1716            .get_ticker(params)
1717            .await
1718            .map_err(|e| anyhow::anyhow!(e))?;
1719        response
1720            .result
1721            .ok_or_else(|| anyhow::anyhow!("No result in ticker response"))
1722    }
1723
1724    /// Requests book summaries for options of a given currency.
1725    ///
1726    /// Returns raw `DeribitBookSummary` items which include `underlying_price`
1727    /// (the forward price) for each option instrument.
1728    ///
1729    /// # Errors
1730    ///
1731    /// Returns an error if the request fails.
1732    pub async fn request_book_summaries(
1733        &self,
1734        currency: &str,
1735    ) -> anyhow::Result<Vec<DeribitBookSummary>> {
1736        let params = GetBookSummaryByCurrencyParams::options(currency);
1737        let full_response = self
1738            .inner
1739            .get_book_summary_by_currency(params)
1740            .await
1741            .map_err(|e| anyhow::anyhow!(e))?;
1742        full_response
1743            .result
1744            .ok_or_else(|| anyhow::anyhow!("No result in book summary response"))
1745    }
1746
1747    /// Requests position status reports for reconciliation.
1748    ///
1749    /// Fetches positions from Deribit and converts them to Nautilus [`PositionStatusReport`].
1750    ///
1751    /// # Strategy
1752    /// - Uses `currency=any` to fetch all positions in one call
1753    /// - Filters by instrument_id if provided
1754    ///
1755    /// # Errors
1756    ///
1757    /// Returns an error if the request fails or parsing fails.
1758    pub async fn request_position_status_reports(
1759        &self,
1760        account_id: AccountId,
1761        instrument_id: Option<InstrumentId>,
1762    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1763        let ts_init = self.generate_ts_init();
1764        let mut reports = Vec::new();
1765
1766        // Use ANY to get all positions across all currencies in one call
1767        let params = GetPositionsParams {
1768            currency: DeribitCurrency::ANY,
1769            kind: None,
1770        };
1771
1772        if let Some(positions) = self.inner.get_positions(params).await?.result {
1773            for position in &positions {
1774                // Skip flat positions (size == 0)
1775                if position.size.is_zero() {
1776                    continue;
1777                }
1778
1779                let symbol = position.instrument_name;
1780                if let Some(instrument) = self.get_instrument(&symbol) {
1781                    let report =
1782                        parse_position_status_report(position, &instrument, account_id, ts_init);
1783                    reports.push(report);
1784                } else {
1785                    log::debug!(
1786                        "Skipping position - instrument {} not in cache",
1787                        position.instrument_name
1788                    );
1789                }
1790            }
1791        }
1792
1793        // Filter by instrument if provided
1794        if let Some(instrument_id) = instrument_id {
1795            reports.retain(|r| r.instrument_id == instrument_id);
1796        }
1797
1798        log::debug!("Generated {} position status reports", reports.len());
1799        Ok(reports)
1800    }
1801}
1802
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::{
        DERIBIT_ACCOUNT_RATE_KEY, DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ORDER_RATE_KEY,
    };

    /// Turns string literals into the owned trade IDs passed to `TradePaginator::advance`.
    fn trade_ids(raw: &[&str]) -> Vec<String> {
        raw.iter().map(|id| (*id).to_string()).collect()
    }

    #[rstest]
    #[case("private/buy", true, false)]
    #[case("private/cancel", true, false)]
    #[case("private/get_account_summaries", false, true)]
    #[case("private/get_positions", false, true)]
    #[case("public/get_instruments", false, false)]
    fn test_method_classification(
        #[case] method: &str,
        #[case] is_order: bool,
        #[case] is_account: bool,
    ) {
        let classified_as_order = DeribitRawHttpClient::is_order_method(method);
        assert_eq!(classified_as_order, is_order);

        let classified_as_account = DeribitRawHttpClient::is_account_method(method);
        assert_eq!(classified_as_account, is_account);
    }

    #[rstest]
    #[case("private/buy", vec![DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ORDER_RATE_KEY])]
    #[case("private/get_account_summaries", vec![DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ACCOUNT_RATE_KEY])]
    #[case("public/get_instruments", vec![DERIBIT_GLOBAL_RATE_KEY])]
    fn test_rate_limit_keys(#[case] method: &str, #[case] expected_keys: Vec<&str>) {
        let keys = DeribitRawHttpClient::rate_limit_keys(method);

        // Every bucket expected for this method must be present
        for expected in expected_keys.iter().copied() {
            assert!(keys.contains(&expected.to_string()));
        }

        // The per-method key is checked for every case
        let method_key = format!("deribit:{method}");
        assert!(keys.contains(&method_key));
    }

    #[rstest]
    fn test_paginator_empty_page_returns_none() {
        let mut paginator = TradePaginator::new(100, 200);

        let outcome = paginator.advance(&[], &[], true);

        assert!(outcome.is_none());
    }

    #[rstest]
    fn test_paginator_single_page_no_more() {
        let mut paginator = TradePaginator::new(100, 200);
        let ids = trade_ids(&["t1", "t2"]);
        let timestamps: Vec<i64> = vec![150, 160];

        let new_indices = paginator.advance(&ids, &timestamps, false);

        assert_eq!(new_indices, Some(vec![0, 1]));
    }

    #[rstest]
    fn test_paginator_dedup_across_pages() {
        let mut paginator = TradePaginator::new(100, 200);
        let same_ms: Vec<i64> = vec![150, 150];

        // Page one: both trades are unseen
        let first_page = trade_ids(&["t1", "t2"]);
        let first_new = paginator.advance(&first_page, &same_ms, true);
        assert_eq!(first_new, Some(vec![0, 1]));
        assert_eq!(paginator.cursor, 150);

        // Page two: t2 comes back, t3 is the only unseen trade
        let second_page = trade_ids(&["t2", "t3"]);
        let second_new = paginator.advance(&second_page, &same_ms, false);
        assert_eq!(second_new, Some(vec![1]));
    }

    #[rstest]
    fn test_paginator_all_duplicates_advances_past_timestamp() {
        let mut paginator = TradePaginator::new(100, 200);
        let ids = trade_ids(&["t1", "t2"]);
        let timestamps: Vec<i64> = vec![150, 150];

        // Initial page leaves the cursor on the last timestamp
        paginator.advance(&ids, &timestamps, true);
        assert_eq!(paginator.cursor, 150);

        // Replaying the identical page yields nothing new and bumps the cursor past 150
        let replayed = paginator.advance(&ids, &timestamps, true);
        assert_eq!(replayed, Some(vec![]));
        assert_eq!(paginator.cursor, 151);
    }

    #[rstest]
    fn test_paginator_is_exhausted_strict_greater_than() {
        let mut paginator = TradePaginator::new(100, 150);
        let ids = trade_ids(&["t1"]);
        let timestamps: Vec<i64> = vec![150];

        // Cursor equal to the end bound (150) does not count as exhausted
        paginator.advance(&ids, &timestamps, true);
        assert_eq!(paginator.cursor, 150);
        assert!(!paginator.is_exhausted());

        // A page of duplicates moves the cursor to 151, beyond the end bound
        paginator.advance(&ids, &timestamps, true);
        assert_eq!(paginator.cursor, 151);
        assert!(paginator.is_exhausted());
    }

    #[rstest]
    fn test_paginator_reset_clears_state() {
        let mut paginator = TradePaginator::new(100, 200);
        let ids = trade_ids(&["t1"]);
        let timestamps: Vec<i64> = vec![150];

        paginator.advance(&ids, &timestamps, true);
        assert_eq!(paginator.seen_ids.len(), 1);

        paginator.reset(100);
        assert_eq!(paginator.cursor, 100);
        assert!(paginator.seen_ids.is_empty());

        // After the reset the previously seen ID counts as new again
        let after_reset = paginator.advance(&ids, &timestamps, false);
        assert_eq!(after_reset, Some(vec![0]));
    }
}