Skip to main content

nautilus_bitmex/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [BitMEX](https://bitmex.com) REST API.
17//!
18//! This module defines and implements a [`BitmexHttpClient`] for
19//! sending requests to various BitMEX endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`BitmexHttpError`].
22//!
23//! BitMEX API reference <https://www.bitmex.com/api/explorer/#/default>.
24
25use std::{
26    collections::HashMap,
27    num::NonZeroU32,
28    sync::{
29        Arc, LazyLock, RwLock,
30        atomic::{AtomicBool, Ordering},
31    },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37    AtomicMap, AtomicTime, UUID4, UnixNanos,
38    consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39    env::get_or_env_var_opt,
40    time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43    data::{Bar, BarType, TradeTick},
44    enums::{
45        AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
46        PriceType, TimeInForce, TrailingOffsetType, TriggerType,
47    },
48    events::AccountState,
49    identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50    instruments::{Instrument as InstrumentTrait, InstrumentAny},
51    reports::{FillReport, OrderStatusReport, PositionStatusReport},
52    types::{MarginBalance, Money, Price, Quantity},
53};
54use nautilus_network::{
55    http::{HttpClient, Method, StatusCode, USER_AGENT},
56    ratelimiter::quota::Quota,
57    retry::{RetryConfig, RetryManager},
58};
59use rust_decimal::Decimal;
60use serde::{Deserialize, Serialize, de::DeserializeOwned};
61use serde_json::Value;
62use tokio_util::sync::CancellationToken;
63use ustr::Ustr;
64
65use super::{
66    error::{BitmexErrorResponse, BitmexHttpError},
67    models::{
68        BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
69        BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
70    },
71    query::{
72        DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
73        GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
74        GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder,
75        PostCancelAllAfterParams, PostOrderParams, PostPositionLeverageParams, PutOrderParams,
76    },
77};
78use crate::{
79    common::{
80        consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
81        credential::{Credential, credential_env_vars},
82        enums::{
83            BitmexContingencyType, BitmexEnvironment, BitmexExecInstruction, BitmexOrderStatus,
84            BitmexOrderType, BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
85        },
86        parse::{bitmex_currency_divisor, parse_account_balance, quantity_to_u32},
87    },
88    http::{
89        parse::{
90            InstrumentParseResult, parse_fill_report, parse_instrument_any,
91            parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
92        },
93        query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
94    },
95    websocket::messages::BitmexMarginMsg,
96};
97
/// Default BitMEX REST API rate limits.
///
/// BitMEX implements a dual-layer rate limiting system:
/// - Primary limit: 120 requests per minute for authenticated users (30 for unauthenticated).
/// - Secondary limit: 10 requests per second burst limit for specific endpoints.
///
/// This constant is the per-second burst default; it is also the fallback used
/// when a caller supplies `0` for the per-second limit.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Default per-minute limit for authenticated users; also the fallback used
/// when a caller supplies `0` for the per-minute limit.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Default per-minute limit for unauthenticated users (used by the `Default` clients).
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;

/// Rate limiter key registered with the per-second quota.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key registered with the per-minute quota.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";

/// Both rate limiter keys as `Ustr` values, built once on first access.
/// A clone of this list is attached to every outgoing request.
static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
    vec![
        Ustr::from(BITMEX_GLOBAL_RATE_KEY),
        Ustr::from(BITMEX_MINUTE_RATE_KEY),
    ]
});
116
/// Represents a BitMEX HTTP response.
///
/// A generic envelope that (de)serializes a list of typed items under a
/// single `data` field.
// NOTE(review): no request helper visible in this part of the file returns this
// type; confirm where it is still used.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The typed data returned by the BitMEX endpoint.
    pub data: Vec<T>,
}
123
/// Provides a lower-level HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This client wraps the underlying [`HttpClient`] to handle functionality
/// specific to BitMEX, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
///
/// # Connection Management
///
/// The client uses HTTP keep-alive for connection pooling with a 90-second idle timeout,
/// which matches BitMEX's server-side keep-alive timeout. Connections are automatically
/// reused for subsequent requests to minimize latency.
///
/// # Rate Limiting
///
/// BitMEX enforces the following rate limits:
/// - 120 requests per minute for authenticated users (30 for unauthenticated).
/// - 10 requests per second burst limit for certain endpoints (order management).
///
/// The client automatically respects these limits through the configured quota.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Base URL that every request endpoint is appended to.
    base_url: String,
    /// Underlying HTTP client, built with the default headers and rate limit quotas.
    client: HttpClient,
    /// API credential used to sign requests; `None` for clients created without credentials.
    credential: Option<Credential>,
    /// Receive window in milliseconds; converted to whole seconds and added to the
    /// current time to produce the `api-expires` value of signed requests.
    recv_window_ms: u64,
    /// Retry manager that every request is executed through.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Current cancellation token; it can be cancelled, cloned out, or swapped for a fresh one.
    cancellation_token: Arc<RwLock<CancellationToken>>,
}
152
153impl Default for BitmexRawHttpClient {
154    fn default() -> Self {
155        Self::new(
156            None,
157            60,
158            3,
159            1000,
160            10_000,
161            10_000,
162            BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND,
163            BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED,
164            None,
165        )
166        .expect("Failed to create default BitmexHttpInnerClient")
167    }
168}
169
170impl BitmexRawHttpClient {
171    /// Creates a new [`BitmexRawHttpClient`] using the default BitMEX HTTP URL,
172    /// optionally overridden with a custom base URL.
173    ///
174    /// This version of the client has **no credentials**, so it can only
175    /// call publicly accessible endpoints.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the retry manager cannot be created.
180    #[expect(clippy::too_many_arguments)]
181    pub fn new(
182        base_url: Option<String>,
183        timeout_secs: u64,
184        max_retries: u32,
185        retry_delay_ms: u64,
186        retry_delay_max_ms: u64,
187        recv_window_ms: u64,
188        max_requests_per_second: u32,
189        max_requests_per_minute: u32,
190        proxy_url: Option<String>,
191    ) -> Result<Self, BitmexHttpError> {
192        let retry_config = RetryConfig {
193            max_retries,
194            initial_delay_ms: retry_delay_ms,
195            max_delay_ms: retry_delay_max_ms,
196            backoff_factor: 2.0,
197            jitter_ms: 1000,
198            operation_timeout_ms: Some(60_000),
199            immediate_first: false,
200            max_elapsed_ms: Some(180_000),
201        };
202
203        let retry_manager = RetryManager::new(retry_config);
204
205        Ok(Self {
206            base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
207            client: HttpClient::new(
208                Self::default_headers(),
209                vec![],
210                Self::rate_limiter_quotas(max_requests_per_second, max_requests_per_minute)?,
211                Some(Self::default_quota(max_requests_per_second)?),
212                Some(timeout_secs),
213                proxy_url,
214            )
215            .map_err(|e| {
216                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
217            })?,
218            credential: None,
219            recv_window_ms,
220            retry_manager,
221            cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
222        })
223    }
224
225    /// Creates a new [`BitmexRawHttpClient`] configured with credentials
226    /// for authenticated requests, optionally using a custom base URL.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the retry manager cannot be created.
231    #[expect(clippy::too_many_arguments)]
232    pub fn with_credentials(
233        api_key: String,
234        api_secret: String,
235        base_url: String,
236        timeout_secs: u64,
237        max_retries: u32,
238        retry_delay_ms: u64,
239        retry_delay_max_ms: u64,
240        recv_window_ms: u64,
241        max_requests_per_second: u32,
242        max_requests_per_minute: u32,
243        proxy_url: Option<String>,
244    ) -> Result<Self, BitmexHttpError> {
245        let retry_config = RetryConfig {
246            max_retries,
247            initial_delay_ms: retry_delay_ms,
248            max_delay_ms: retry_delay_max_ms,
249            backoff_factor: 2.0,
250            jitter_ms: 1000,
251            operation_timeout_ms: Some(60_000),
252            immediate_first: false,
253            max_elapsed_ms: Some(180_000),
254        };
255
256        let retry_manager = RetryManager::new(retry_config);
257
258        Ok(Self {
259            base_url,
260            client: HttpClient::new(
261                Self::default_headers(),
262                vec![],
263                Self::rate_limiter_quotas(max_requests_per_second, max_requests_per_minute)?,
264                Some(Self::default_quota(max_requests_per_second)?),
265                Some(timeout_secs),
266                proxy_url,
267            )
268            .map_err(|e| {
269                BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
270            })?,
271            credential: Some(Credential::new(api_key, api_secret)),
272            recv_window_ms,
273            retry_manager,
274            cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
275        })
276    }
277
278    fn default_headers() -> HashMap<String, String> {
279        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
280    }
281
282    fn default_quota(max_requests_per_second: u32) -> Result<Quota, BitmexHttpError> {
283        let burst = NonZeroU32::new(max_requests_per_second)
284            .unwrap_or(NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"));
285        Quota::per_second(burst).ok_or_else(|| {
286            BitmexHttpError::ValidationError(format!(
287                "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
288            ))
289        })
290    }
291
292    fn rate_limiter_quotas(
293        max_requests_per_second: u32,
294        max_requests_per_minute: u32,
295    ) -> Result<Vec<(String, Quota)>, BitmexHttpError> {
296        let per_sec_quota = Self::default_quota(max_requests_per_second)?;
297        let per_min_quota =
298            Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
299                NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED)
300                    .expect("non-zero")
301            }));
302
303        Ok(vec![
304            (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
305            (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
306        ])
307    }
308
309    fn rate_limit_keys() -> Vec<Ustr> {
310        RATE_LIMIT_KEYS.clone()
311    }
312
313    /// Cancel all pending HTTP requests.
314    ///
315    /// # Panics
316    ///
317    /// Panics if the cancellation token lock is poisoned.
318    pub fn cancel_all_requests(&self) {
319        self.cancellation_token
320            .read()
321            .expect("cancellation token lock poisoned")
322            .cancel();
323    }
324
325    /// Replace the cancellation token so new requests can proceed.
326    ///
327    /// # Panics
328    ///
329    /// Panics if the cancellation token lock is poisoned.
330    pub fn reset_cancellation_token(&self) {
331        *self
332            .cancellation_token
333            .write()
334            .expect("cancellation token lock poisoned") = CancellationToken::new();
335    }
336
337    /// Get a clone of the cancellation token for this client.
338    ///
339    /// # Panics
340    ///
341    /// Panics if the cancellation token lock is poisoned.
342    pub fn cancellation_token(&self) -> CancellationToken {
343        self.cancellation_token
344            .read()
345            .expect("cancellation token lock poisoned")
346            .clone()
347    }
348
349    fn sign_request(
350        &self,
351        method: &Method,
352        endpoint: &str,
353        body: Option<&[u8]>,
354    ) -> Result<HashMap<String, String>, BitmexHttpError> {
355        let credential = self
356            .credential
357            .as_ref()
358            .ok_or(BitmexHttpError::MissingCredentials)?;
359
360        let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
361        let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
362
363        let full_path = if endpoint.starts_with("/api/v1") {
364            endpoint.to_string()
365        } else {
366            format!("/api/v1{endpoint}")
367        };
368
369        let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
370
371        let mut headers = HashMap::new();
372        headers.insert("api-expires".to_string(), expires.to_string());
373        headers.insert("api-key".to_string(), credential.api_key().to_string());
374        headers.insert("api-signature".to_string(), signature);
375
376        // Add Content-Type header for form-encoded body
377        if body.is_some()
378            && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
379        {
380            headers.insert(
381                "Content-Type".to_string(),
382                "application/x-www-form-urlencoded".to_string(),
383            );
384        }
385
386        Ok(headers)
387    }
388
    /// Sends a request to `endpoint` and deserializes the response body into `T`.
    ///
    /// - For `GET` and `DELETE`, `params` is URL-encoded and appended to the endpoint
    ///   as a query string; for every other method `params` is ignored and only
    ///   `body` is sent.
    /// - When `authenticate` is `true`, signing headers are produced by `sign_request`
    ///   inside the operation closure, over the endpoint including its query string.
    /// - The keys from `rate_limit_keys` are attached to every request.
    /// - A successful status is parsed as JSON into `T`. Otherwise the body is parsed
    ///   as a [`BitmexErrorResponse`], falling back to
    ///   [`BitmexHttpError::UnexpectedStatus`] when that fails as well.
    ///
    /// The operation is executed through the retry manager together with a clone of
    /// the client's current cancellation token.
    ///
    /// # Errors
    ///
    /// Returns an error if `params` cannot be URL-encoded, if signing fails, if the
    /// request itself fails, if the response cannot be deserialized, or if BitMEX
    /// responds with a non-success status.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        endpoint: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<T, BitmexHttpError> {
        let endpoint = endpoint.to_string();
        let method_clone = method.clone();
        let body_clone = body.clone();

        // Serialize params before closure to avoid reference lifetime issues
        // Query params are used with GET and DELETE methods
        let params_str = if method == Method::GET || method == Method::DELETE {
            params
                .map(serde_urlencoded::to_string)
                .transpose()
                .map_err(|e| {
                    BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
                })?
        } else {
            None
        };

        // An empty query string leaves the endpoint untouched (no dangling `?`)
        let full_endpoint = match params_str {
            Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
            _ => endpoint.clone(),
        };

        let url = format!("{}{}", self.base_url, full_endpoint);

        // Each call of this closure clones its inputs and builds a fresh future;
        // presumably the retry manager calls it once per attempt (confirm against
        // `RetryManager`), in which case every attempt is signed anew.
        let operation = || {
            let url = url.clone();
            let method = method_clone.clone();
            let body = body_clone.clone();
            let full_endpoint = full_endpoint.clone();

            async move {
                let headers = if authenticate {
                    Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
                } else {
                    None
                };

                let rate_keys = Self::rate_limit_keys();
                let resp = self
                    .client
                    .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
                    .await?;

                if resp.status.is_success() {
                    serde_json::from_slice(&resp.body).map_err(Into::into)
                } else if let Ok(error_resp) =
                    serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
                {
                    // Structured BitMEX error payload
                    Err(error_resp.into())
                } else {
                    // Body is not a BitMEX error payload: report status and raw body
                    Err(BitmexHttpError::UnexpectedStatus {
                        status: StatusCode::from_u16(resp.status.as_u16())
                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
                        body: String::from_utf8_lossy(&resp.body).to_string(),
                    })
                }
            }
        };

        // Retry strategy based on BitMEX error responses and HTTP status codes:
        //
        // 1. Network errors: always retry (transient connection issues).
        // 2. HTTP 5xx/429: server errors and rate limiting should be retried.
        // 3. BitMEX JSON errors with specific handling:
        //    - "RateLimitError": explicit rate limit error from BitMEX.
        //    - "HTTPError": generic error name used by BitMEX for various issues
        //      Only retry if message contains "rate limit" to avoid retrying
        //      non-transient errors like authentication failures, validation errors,
        //      insufficient balance, etc. which also return as "HTTPError".
        //
        // Note: BitMEX returns many permanent errors as "HTTPError" (e.g., "Invalid orderQty",
        // "Account has insufficient Available Balance", "Invalid API Key") which should NOT
        // be retried. We only retry when the message explicitly mentions rate limiting.
        //
        // See tests in tests/http.rs for retry behavior validation.
        let should_retry = |error: &BitmexHttpError| -> bool {
            match error {
                BitmexHttpError::NetworkError(_) => true,
                BitmexHttpError::UnexpectedStatus { status, .. } => {
                    status.as_u16() >= 500 || status.as_u16() == 429
                }
                BitmexHttpError::BitmexError {
                    error_name,
                    message,
                } => {
                    error_name == "RateLimitError"
                        || (error_name == "HTTPError"
                            && message.to_lowercase().contains("rate limit"))
                }
                _ => false,
            }
        };

        // Maps a message from the retry manager to an error: the exact text "canceled"
        // becomes `Canceled`, anything else a `NetworkError`.
        // NOTE(review): presumably "canceled" is what the retry manager reports when the
        // token fires; confirm against `RetryManager`.
        let create_error = |msg: String| -> BitmexHttpError {
            if msg == "canceled" {
                BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
            } else {
                BitmexHttpError::NetworkError(msg)
            }
        };

        let cancel_token = self.cancellation_token();

        // The bare endpoint (without query string) is passed as the operation name
        self.retry_manager
            .execute_with_retry_with_cancel(
                endpoint.as_str(),
                operation,
                should_retry,
                create_error,
                &cancel_token,
            )
            .await
    }
510
511    /// Get all instruments.
512    ///
513    /// # Errors
514    ///
515    /// Returns an error if the request fails, the response cannot be parsed, or the API returns an error.
516    pub async fn get_instruments(
517        &self,
518        active_only: bool,
519    ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
520        let path = if active_only {
521            "/instrument/active"
522        } else {
523            "/instrument"
524        };
525        self.send_request::<_, ()>(Method::GET, path, None, None, false)
526            .await
527    }
528
529    /// Requests the current server time from BitMEX.
530    ///
531    /// Retrieves the BitMEX API info including the system time in Unix timestamp (milliseconds).
532    /// This is useful for synchronizing local clocks with the exchange server and logging time drift.
533    ///
534    /// # Errors
535    ///
536    /// Returns an error if the HTTP request fails or if the response body
537    /// cannot be parsed into [`BitmexApiInfo`].
538    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
539        let response: BitmexApiInfo = self
540            .send_request::<_, ()>(Method::GET, "", None, None, false)
541            .await?;
542        Ok(response.timestamp)
543    }
544
545    /// Get the instrument definition for the specified symbol.
546    ///
547    /// BitMEX responds to `/instrument?symbol=...` with an array, even when
548    /// a single symbol is requested. This helper returns the first element of
549    /// that array and yields `Ok(None)` when the venue returns an empty list
550    /// (e.g. unknown symbol).
551    ///
552    /// # Errors
553    ///
554    /// Returns an error if the request fails or the payload cannot be deserialized.
555    pub async fn get_instrument(
556        &self,
557        symbol: &str,
558    ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
559        let path = &format!("/instrument?symbol={symbol}");
560        let instruments: Vec<BitmexInstrument> = self
561            .send_request::<_, ()>(Method::GET, path, None, None, false)
562            .await?;
563
564        Ok(instruments.into_iter().next())
565    }
566
567    /// Get user wallet information.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
572    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
573        let endpoint = "/user/wallet";
574        self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
575            .await
576    }
577
578    /// Get user margin information for a specific currency.
579    ///
580    /// # Errors
581    ///
582    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
583    pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
584        let path = format!("/user/margin?currency={currency}");
585        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
586            .await
587    }
588
589    /// Get user margin information for all currencies.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
594    pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
595        self.send_request::<_, ()>(Method::GET, "/user/margin?currency=all", None, None, true)
596            .await
597    }
598
599    /// Get historical trades.
600    ///
601    /// # Errors
602    ///
603    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
604    pub async fn get_trades(
605        &self,
606        params: GetTradeParams,
607    ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
608        self.send_request(Method::GET, "/trade", Some(&params), None, true)
609            .await
610    }
611
612    /// Get bucketed (aggregated) trade data.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
617    pub async fn get_trade_bucketed(
618        &self,
619        params: GetTradeBucketedParams,
620    ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
621        self.send_request(Method::GET, "/trade/bucketed", Some(&params), None, true)
622            .await
623    }
624
625    /// Get user orders.
626    ///
627    /// # Errors
628    ///
629    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
630    pub async fn get_orders(
631        &self,
632        params: GetOrderParams,
633    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
634        self.send_request(Method::GET, "/order", Some(&params), None, true)
635            .await
636    }
637
638    /// Place a new order.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if credentials are missing, the request fails, order validation fails, or the API returns an error.
643    pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
644        // BitMEX spec requires form-encoded body for POST /order
645        let body = serde_urlencoded::to_string(&params)
646            .map_err(|e| {
647                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
648            })?
649            .into_bytes();
650        let path = "/order";
651        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
652            .await
653    }
654
655    /// Cancel user orders.
656    ///
657    /// # Errors
658    ///
659    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
660    pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
661        // BitMEX spec requires form-encoded body for DELETE /order
662        let body = serde_urlencoded::to_string(&params)
663            .map_err(|e| {
664                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
665            })?
666            .into_bytes();
667        let path = "/order";
668        self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
669            .await
670    }
671
672    /// Amend an existing order.
673    ///
674    /// # Errors
675    ///
676    /// Returns an error if credentials are missing, the request fails, the order doesn't exist, or the API returns an error.
677    pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
678        // BitMEX spec requires form-encoded body for PUT /order
679        let body = serde_urlencoded::to_string(&params)
680            .map_err(|e| {
681                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
682            })?
683            .into_bytes();
684        let path = "/order";
685        self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
686            .await
687    }
688
689    /// Cancel all orders.
690    ///
691    /// # Errors
692    ///
693    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
694    ///
695    /// # References
696    ///
697    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAll>
698    pub async fn cancel_all_orders(
699        &self,
700        params: DeleteAllOrdersParams,
701    ) -> Result<Value, BitmexHttpError> {
702        self.send_request(Method::DELETE, "/order/all", Some(&params), None, true)
703            .await
704    }
705
706    /// Set a dead man's switch (cancel all orders after timeout).
707    ///
708    /// Calling with `timeout=0` disarms the switch.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
713    ///
714    /// # References
715    ///
716    /// <https://www.bitmex.com/api/explorer/#!/Order/Order_cancelAllAfter>
717    pub async fn cancel_all_after(
718        &self,
719        params: PostCancelAllAfterParams,
720    ) -> Result<Value, BitmexHttpError> {
721        let body = serde_urlencoded::to_string(&params)
722            .map_err(|e| {
723                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
724            })?
725            .into_bytes();
726        self.send_request::<_, ()>(
727            Method::POST,
728            "/order/cancelAllAfter",
729            None,
730            Some(body),
731            true,
732        )
733        .await
734    }
735
736    /// Get user executions.
737    ///
738    /// # Errors
739    ///
740    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
741    pub async fn get_executions(
742        &self,
743        params: GetExecutionParams,
744    ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
745        let query = serde_urlencoded::to_string(&params).map_err(|e| {
746            BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
747        })?;
748        let path = format!("/execution/tradeHistory?{query}");
749        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
750            .await
751    }
752
753    /// Get user positions.
754    ///
755    /// # Errors
756    ///
757    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
758    pub async fn get_positions(
759        &self,
760        params: GetPositionParams,
761    ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
762        self.send_request(Method::GET, "/position", Some(&params), None, true)
763            .await
764    }
765
766    /// Update position leverage.
767    ///
768    /// # Errors
769    ///
770    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
771    pub async fn update_position_leverage(
772        &self,
773        params: PostPositionLeverageParams,
774    ) -> Result<BitmexPosition, BitmexHttpError> {
775        // BitMEX spec requires form-encoded body for POST endpoints
776        let body = serde_urlencoded::to_string(&params)
777            .map_err(|e| {
778                BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
779            })?
780            .into_bytes();
781        let path = "/position/leverage";
782        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
783            .await
784    }
785}
786
/// Provides a HTTP client for connecting to the [BitMEX](https://bitmex.com) REST API.
///
/// This is the high-level client that wraps the inner client and provides
/// Nautilus-specific functionality for trading operations.
///
/// Cloning shares the inner client and both caches (they are reference-counted),
/// while the `cache_initialized` flag is copied by value at clone time.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
pub struct BitmexHttpClient {
    /// Cached instruments, shared between clones.
    // NOTE(review): the `Ustr` key is presumably the instrument symbol; confirm
    // against the code that fills the cache.
    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID, shared between clones.
    pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Reference to a static atomic clock.
    clock: &'static AtomicTime,
    /// The raw HTTP client, shared between clones.
    inner: Arc<BitmexRawHttpClient>,
    /// Cache initialization flag; each clone gets its own copy of the current value.
    cache_initialized: AtomicBool,
}
807
808impl Clone for BitmexHttpClient {
809    fn clone(&self) -> Self {
810        let cache_initialized = AtomicBool::new(false);
811
812        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
813        if is_initialized {
814            cache_initialized.store(true, Ordering::Release);
815        }
816
817        Self {
818            inner: self.inner.clone(),
819            instruments_cache: self.instruments_cache.clone(),
820            order_type_cache: self.order_type_cache.clone(),
821            cache_initialized,
822            clock: self.clock,
823        }
824    }
825}
826
827impl Default for BitmexHttpClient {
828    fn default() -> Self {
829        Self::new(
830            None,
831            None,
832            None,
833            BitmexEnvironment::Mainnet,
834            60,
835            3,
836            1_000,
837            10_000,
838            10_000,
839            BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND,
840            BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED,
841            None,
842        )
843        .expect("Failed to create default BitmexHttpClient")
844    }
845}
846
847impl BitmexHttpClient {
848    /// Creates a new [`BitmexHttpClient`] instance.
849    ///
850    /// # Errors
851    ///
852    /// Returns an error if the HTTP client cannot be created.
853    #[expect(clippy::too_many_arguments)]
854    pub fn new(
855        base_url: Option<String>,
856        api_key: Option<String>,
857        api_secret: Option<String>,
858        environment: BitmexEnvironment,
859        timeout_secs: u64,
860        max_retries: u32,
861        retry_delay_ms: u64,
862        retry_delay_max_ms: u64,
863        recv_window_ms: u64,
864        max_requests_per_second: u32,
865        max_requests_per_minute: u32,
866        proxy_url: Option<String>,
867    ) -> Result<Self, BitmexHttpError> {
868        // Determine the base URL
869        let url = base_url.unwrap_or_else(|| match environment {
870            BitmexEnvironment::Testnet => BITMEX_HTTP_TESTNET_URL.to_string(),
871            BitmexEnvironment::Mainnet => BITMEX_HTTP_URL.to_string(),
872        });
873
874        let (key_var, secret_var) = credential_env_vars(environment);
875        let api_key = get_or_env_var_opt(api_key, key_var);
876        let api_secret = get_or_env_var_opt(api_secret, secret_var);
877
878        let inner = match (api_key, api_secret) {
879            (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
880                key,
881                secret,
882                url,
883                timeout_secs,
884                max_retries,
885                retry_delay_ms,
886                retry_delay_max_ms,
887                recv_window_ms,
888                max_requests_per_second,
889                max_requests_per_minute,
890                proxy_url,
891            )?,
892            (Some(_), None) | (None, Some(_)) => {
893                return Err(BitmexHttpError::ValidationError(
894                    "Both api_key and api_secret must be provided, or neither".to_string(),
895                ));
896            }
897            (None, None) => BitmexRawHttpClient::new(
898                Some(url),
899                timeout_secs,
900                max_retries,
901                retry_delay_ms,
902                retry_delay_max_ms,
903                recv_window_ms,
904                max_requests_per_second,
905                max_requests_per_minute,
906                proxy_url,
907            )?,
908        };
909
910        Ok(Self {
911            inner: Arc::new(inner),
912            instruments_cache: Arc::new(AtomicMap::new()),
913            order_type_cache: Arc::new(DashMap::new()),
914            cache_initialized: AtomicBool::new(false),
915            clock: get_atomic_clock_realtime(),
916        })
917    }
918
919    /// Creates a new [`BitmexHttpClient`] instance using environment variables and
920    /// the default BitMEX HTTP base URL.
921    ///
922    /// # Errors
923    ///
924    /// Returns an error if required environment variables are not set or invalid.
925    pub fn from_env() -> anyhow::Result<Self> {
926        Self::with_credentials(
927            None, None, None, 60, 3, 1_000, 10_000, 10_000, 10, 120, None,
928        )
929        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
930    }
931
932    /// Creates a new [`BitmexHttpClient`] configured with credentials
933    /// for authenticated requests.
934    ///
935    /// If `api_key` or `api_secret` are `None`, they will be sourced from the
936    /// `BITMEX_API_KEY` and `BITMEX_API_SECRET` environment variables.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if one credential is provided without the other.
941    #[expect(clippy::too_many_arguments)]
942    pub fn with_credentials(
943        api_key: Option<String>,
944        api_secret: Option<String>,
945        base_url: Option<String>,
946        timeout_secs: u64,
947        max_retries: u32,
948        retry_delay_ms: u64,
949        retry_delay_max_ms: u64,
950        recv_window_ms: u64,
951        max_requests_per_second: u32,
952        max_requests_per_minute: u32,
953        proxy_url: Option<String>,
954    ) -> anyhow::Result<Self> {
955        // Determine environment from URL to select correct environment variables
956        let environment = if base_url.as_ref().is_some_and(|url| url.contains("testnet")) {
957            BitmexEnvironment::Testnet
958        } else {
959            BitmexEnvironment::Mainnet
960        };
961
962        let (key_var, secret_var) = credential_env_vars(environment);
963
964        let api_key = get_or_env_var_opt(api_key, key_var);
965        let api_secret = get_or_env_var_opt(api_secret, secret_var);
966
967        // If we're trying to create an authenticated client, we need both key and secret
968        if api_key.is_some() && api_secret.is_none() {
969            anyhow::bail!("{secret_var} is required when {key_var} is provided");
970        }
971
972        if api_key.is_none() && api_secret.is_some() {
973            anyhow::bail!("{key_var} is required when {secret_var} is provided");
974        }
975
976        Self::new(
977            base_url,
978            api_key,
979            api_secret,
980            environment,
981            timeout_secs,
982            max_retries,
983            retry_delay_ms,
984            retry_delay_max_ms,
985            recv_window_ms,
986            max_requests_per_second,
987            max_requests_per_minute,
988            proxy_url,
989        )
990        .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
991    }
992
993    /// Returns the base url being used by the client.
994    #[must_use]
995    pub fn base_url(&self) -> &str {
996        self.inner.base_url.as_str()
997    }
998
999    /// Returns the public API key being used by the client.
1000    #[must_use]
1001    pub fn api_key(&self) -> Option<&str> {
1002        self.inner.credential.as_ref().map(|c| c.api_key())
1003    }
1004
1005    /// Returns a masked version of the API key for logging purposes.
1006    #[must_use]
1007    pub fn api_key_masked(&self) -> Option<String> {
1008        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1009    }
1010
1011    /// Requests the current server time from BitMEX.
1012    ///
1013    /// Returns the BitMEX system time as a Unix timestamp in milliseconds.
1014    ///
1015    /// # Errors
1016    ///
1017    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1018    pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
1019        self.inner.get_server_time().await
1020    }
1021
1022    /// Sets the dead man's switch (cancel all orders after timeout).
1023    ///
1024    /// Calling with `timeout_ms=0` disarms the switch.
1025    ///
1026    /// # Errors
1027    ///
1028    /// Returns an error if the HTTP request fails.
1029    pub async fn cancel_all_after(&self, timeout_ms: u64) -> anyhow::Result<()> {
1030        let params = PostCancelAllAfterParams {
1031            timeout: timeout_ms,
1032        };
1033        self.inner.cancel_all_after(params).await?;
1034        Ok(())
1035    }
1036
1037    /// Generates a timestamp for initialization.
1038    fn generate_ts_init(&self) -> UnixNanos {
1039        self.clock.get_time_ns()
1040    }
1041
1042    /// Check if the order has a contingency type that requires linking.
1043    fn is_contingent_order(contingency_type: ContingencyType) -> bool {
1044        matches!(
1045            contingency_type,
1046            ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
1047        )
1048    }
1049
1050    /// Check if the order is a parent in contingency relationships.
1051    fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
1052        matches!(
1053            contingency_type,
1054            ContingencyType::Oco | ContingencyType::Oto
1055        )
1056    }
1057
    /// Populate missing `linked_order_ids` for contingency orders.
    ///
    /// Works in two passes over `reports`:
    ///
    /// 1. Groups client order IDs by `order_list_id` and, for contingent orders only, by
    ///    the client order ID prefix (everything before the last `-`). The first report in
    ///    each group whose contingency type is a parent type is recorded as the parent.
    /// 2. For every contingent report that has a client order ID and no linked IDs yet,
    ///    links it to its peers from the order list group, falling back to the prefix
    ///    group. Non-parent reports get `parent_order_id` set and the parent sorted first.
    ///
    /// A contingent report for which neither grouping yields a peer is downgraded to
    /// `ContingencyType::NoContingency` with its parent and linked IDs cleared.
    fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
        let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
        let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
        let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
        let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();

        // Pass 1: collect group membership and the parent of each group
        for report in reports.iter() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            if let Some(order_list_id) = report.order_list_id {
                order_list_groups
                    .entry(order_list_id)
                    .or_default()
                    .push(client_order_id);

                // `or_insert` keeps the first parent-type order seen for the list
                if Self::is_parent_contingency(report.contingency_type) {
                    order_list_parents
                        .entry(order_list_id)
                        .or_insert(client_order_id);
                }
            }

            // Prefix grouping only considers contingent orders whose ID contains a `-`
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && Self::is_contingent_order(report.contingency_type)
            {
                prefix_groups
                    .entry(base.to_owned())
                    .or_default()
                    .push(client_order_id);

                if Self::is_parent_contingency(report.contingency_type) {
                    prefix_parents
                        .entry(base.to_owned())
                        .or_insert(client_order_id);
                }
            }
        }

        // Pass 2: fill in linked IDs and parent IDs from the collected groups
        for report in reports.iter_mut() {
            let Some(client_order_id) = report.client_order_id else {
                continue;
            };

            // Linked IDs that are already present are left untouched
            if report.linked_order_ids.is_some() {
                continue;
            }

            // Only process contingent orders
            if !Self::is_contingent_order(report.contingency_type) {
                continue;
            }

            // Preferred source: peers sharing the same order list ID
            if let Some(order_list_id) = report.order_list_id
                && let Some(group) = order_list_groups.get(&order_list_id)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = order_list_parents.get(&order_list_id) {
                        if client_order_id == *parent_id {
                            report.parent_order_id = None;
                        } else {
                            // Stable sort on a 0/1 key moves the parent to the front and
                            // keeps the remaining peers in their original order
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    log::trace!(
                        "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
                        client_order_id,
                        order_list_id,
                        report.contingency_type,
                        linked,
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                // The order list contained only this order; fall through to the prefix lookup
                log::trace!(
                    "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
                    client_order_id,
                    order_list_id,
                    report.contingency_type,
                    group,
                );
                report.parent_order_id = None;
            } else if report.order_list_id.is_none() {
                report.parent_order_id = None;
            }

            // Fallback source: peers sharing the same client order ID prefix
            if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
                && let Some(group) = prefix_groups.get(base)
            {
                let mut linked: Vec<ClientOrderId> = group
                    .iter()
                    .copied()
                    .filter(|candidate| candidate != &client_order_id)
                    .collect();

                if !linked.is_empty() {
                    if let Some(parent_id) = prefix_parents.get(base) {
                        if client_order_id == *parent_id {
                            report.parent_order_id = None;
                        } else {
                            // Same parent-first ordering as for order list groups
                            linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
                            report.parent_order_id = Some(*parent_id);
                        }
                    } else {
                        report.parent_order_id = None;
                    }

                    log::trace!(
                        "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
                        client_order_id,
                        report.contingency_type,
                        base,
                        linked,
                    );
                    report.linked_order_ids = Some(linked);
                    continue;
                }

                log::trace!(
                    "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
                    client_order_id,
                    report.contingency_type,
                    base,
                    group,
                );
                report.parent_order_id = None;
            } else if client_order_id.as_str().contains('-') {
                report.parent_order_id = None;
            }

            // No peers found by either grouping. Non-contingent reports were skipped at the
            // top of the loop and the type has not been modified since, so this condition
            // holds for every report that reaches it.
            if Self::is_contingent_order(report.contingency_type) {
                log::warn!(
                    "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
                    report.client_order_id,
                    report.order_list_id,
                    report.contingency_type,
                );
                report.contingency_type = ContingencyType::NoContingency;
                report.parent_order_id = None;
            }

            report.linked_order_ids = None;
        }
    }
1215
1216    /// Cancel all pending HTTP requests.
1217    pub fn cancel_all_requests(&self) {
1218        self.inner.cancel_all_requests();
1219    }
1220
1221    /// Replace the cancellation token so new requests can proceed.
1222    pub fn reset_cancellation_token(&self) {
1223        self.inner.reset_cancellation_token();
1224    }
1225
1226    /// Get a clone of the cancellation token for this client.
1227    pub fn cancellation_token(&self) -> CancellationToken {
1228        self.inner.cancellation_token()
1229    }
1230
1231    /// Caches a single instrument.
1232    ///
1233    /// Any existing instrument with the same symbol will be replaced.
1234    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1235        self.instruments_cache
1236            .insert(instrument.raw_symbol().inner(), instrument);
1237        self.cache_initialized.store(true, Ordering::Release);
1238    }
1239
1240    /// Caches multiple instruments.
1241    ///
1242    /// Any existing instruments with the same symbols will be replaced.
1243    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1244        self.instruments_cache.rcu(|m| {
1245            for inst in instruments {
1246                m.insert(inst.raw_symbol().inner(), inst.clone());
1247            }
1248        });
1249        self.cache_initialized.store(true, Ordering::Release);
1250    }
1251
1252    /// Gets an instrument from the cache by symbol.
1253    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1254        self.instruments_cache.get_cloned(symbol)
1255    }
1256
1257    /// Request a single instrument and parse it into a Nautilus type.
1258    ///
1259    /// # Errors
1260    ///
1261    /// Returns `Ok(Some(..))` when the venue returns a definition that parses
1262    /// successfully, `Ok(None)` when the instrument is unknown, unsupported, or the payload
1263    /// cannot be converted into a Nautilus `Instrument`.
1264    pub async fn request_instrument(
1265        &self,
1266        instrument_id: InstrumentId,
1267    ) -> anyhow::Result<Option<InstrumentAny>> {
1268        let response = self
1269            .inner
1270            .get_instrument(instrument_id.symbol.as_str())
1271            .await?;
1272
1273        let instrument = match response {
1274            Some(instrument) => instrument,
1275            None => return Ok(None),
1276        };
1277
1278        let ts_init = self.generate_ts_init();
1279
1280        match parse_instrument_any(&instrument, ts_init) {
1281            InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1282            InstrumentParseResult::Unsupported {
1283                symbol,
1284                instrument_type,
1285            } => {
1286                log::debug!(
1287                    "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1288                );
1289                Ok(None)
1290            }
1291            InstrumentParseResult::Inactive { symbol, state } => {
1292                log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1293                Ok(None)
1294            }
1295            InstrumentParseResult::Failed {
1296                symbol,
1297                instrument_type,
1298                error,
1299            } => {
1300                log::error!(
1301                    "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1302                );
1303                Ok(None)
1304            }
1305        }
1306    }
1307
1308    /// Request all available instruments and parse them into Nautilus types.
1309    ///
1310    /// # Errors
1311    ///
1312    /// Returns an error if the HTTP request fails or parsing fails.
1313    pub async fn request_instruments(
1314        &self,
1315        active_only: bool,
1316    ) -> anyhow::Result<Vec<InstrumentAny>> {
1317        let instruments = self.inner.get_instruments(active_only).await?;
1318        let ts_init = self.generate_ts_init();
1319
1320        let mut parsed_instruments = Vec::new();
1321        let mut skipped_count = 0;
1322        let mut inactive_count = 0;
1323        let mut failed_count = 0;
1324        let total_count = instruments.len();
1325
1326        for inst in instruments {
1327            match parse_instrument_any(&inst, ts_init) {
1328                InstrumentParseResult::Ok(instrument_any) => {
1329                    parsed_instruments.push(*instrument_any);
1330                }
1331                InstrumentParseResult::Unsupported {
1332                    symbol,
1333                    instrument_type,
1334                } => {
1335                    skipped_count += 1;
1336                    log::debug!(
1337                        "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1338                    );
1339                }
1340                InstrumentParseResult::Inactive { symbol, state } => {
1341                    inactive_count += 1;
1342                    log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1343                }
1344                InstrumentParseResult::Failed {
1345                    symbol,
1346                    instrument_type,
1347                    error,
1348                } => {
1349                    failed_count += 1;
1350                    log::error!(
1351                        "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1352                    );
1353                }
1354            }
1355        }
1356
1357        if skipped_count > 0 {
1358            log::info!(
1359                "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1360            );
1361        }
1362
1363        if inactive_count > 0 {
1364            log::info!(
1365                "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1366            );
1367        }
1368
1369        if failed_count > 0 {
1370            log::error!(
1371                "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1372                parsed_instruments.len()
1373            );
1374        }
1375
1376        Ok(parsed_instruments)
1377    }
1378
1379    /// Get user wallet information.
1380    ///
1381    /// # Errors
1382    ///
1383    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1384    pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1385        let inner = self.inner.clone();
1386        inner.get_wallet().await
1387    }
1388
1389    /// Get user orders.
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1394    pub async fn get_orders(
1395        &self,
1396        params: GetOrderParams,
1397    ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1398        let inner = self.inner.clone();
1399        inner.get_orders(params).await
1400    }
1401
1402    /// Get instrument from the instruments cache (if found).
1403    ///
1404    /// # Errors
1405    ///
1406    /// Returns an error if the instrument is not found in the cache.
1407    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1408        self.get_instrument(&symbol).ok_or_else(|| {
1409            anyhow::anyhow!(
1410                "Instrument {symbol} not found in cache, ensure instruments loaded first"
1411            )
1412        })
1413    }
1414
1415    /// Returns the cached price precision for the given symbol.
1416    ///
1417    /// # Errors
1418    ///
1419    /// Returns an error if the instrument was never cached (for example, if
1420    /// instruments were not loaded prior to use).
1421    pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1422        self.instrument_from_cache(symbol)
1423            .map(|instrument| instrument.price_precision())
1424    }
1425
1426    /// Get user margin information for a specific currency.
1427    ///
1428    /// # Errors
1429    ///
1430    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1431    pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1432        self.inner
1433            .get_margin(currency)
1434            .await
1435            .map_err(|e| anyhow::anyhow!(e))
1436    }
1437
1438    /// Get user margin information for all currencies.
1439    ///
1440    /// # Errors
1441    ///
1442    /// Returns an error if credentials are missing, the request fails, or the API returns an error.
1443    pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
1444        self.inner
1445            .get_all_margins()
1446            .await
1447            .map_err(|e| anyhow::anyhow!(e))
1448    }
1449
1450    /// Request account state for the given account.
1451    ///
1452    /// # Errors
1453    ///
1454    /// Returns an error if the HTTP request fails or no account state is returned.
1455    pub async fn request_account_state(
1456        &self,
1457        account_id: AccountId,
1458    ) -> anyhow::Result<AccountState> {
1459        let margins = self
1460            .inner
1461            .get_all_margins()
1462            .await
1463            .map_err(|e| anyhow::anyhow!(e))?;
1464
1465        let ts_init =
1466            UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1467
1468        let mut balances = Vec::with_capacity(margins.len());
1469        let mut margins_vec = Vec::new();
1470        let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
1471
1472        for margin in margins {
1473            if let Some(ts) = margin.timestamp {
1474                latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
1475            }
1476
1477            let margin_msg = BitmexMarginMsg {
1478                account: margin.account,
1479                currency: margin.currency,
1480                risk_limit: margin.risk_limit,
1481                amount: margin.amount,
1482                prev_realised_pnl: margin.prev_realised_pnl,
1483                gross_comm: margin.gross_comm,
1484                gross_open_cost: margin.gross_open_cost,
1485                gross_open_premium: margin.gross_open_premium,
1486                gross_exec_cost: margin.gross_exec_cost,
1487                gross_mark_value: margin.gross_mark_value,
1488                risk_value: margin.risk_value,
1489                init_margin: margin.init_margin,
1490                maint_margin: margin.maint_margin,
1491                target_excess_margin: margin.target_excess_margin,
1492                realised_pnl: margin.realised_pnl,
1493                unrealised_pnl: margin.unrealised_pnl,
1494                wallet_balance: margin.wallet_balance,
1495                margin_balance: margin.margin_balance,
1496                margin_leverage: margin.margin_leverage,
1497                margin_used_pcnt: margin.margin_used_pcnt,
1498                excess_margin: margin.excess_margin,
1499                available_margin: margin.available_margin,
1500                withdrawable_margin: margin.withdrawable_margin,
1501                maker_fee_discount: None,
1502                taker_fee_discount: None,
1503                timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1504                foreign_margin_balance: None,
1505                foreign_requirement: None,
1506            };
1507
1508            let balance = parse_account_balance(&margin_msg);
1509
1510            let divisor = bitmex_currency_divisor(margin_msg.currency.as_str());
1511            let initial_dec = Decimal::from(margin_msg.init_margin.unwrap_or(0).max(0)) / divisor;
1512            let maintenance_dec =
1513                Decimal::from(margin_msg.maint_margin.unwrap_or(0).max(0)) / divisor;
1514
1515            if !initial_dec.is_zero() || !maintenance_dec.is_zero() {
1516                let currency = balance.total.currency;
1517                // BitMEX reports cross-margin aggregates per collateral currency.
1518                margins_vec.push(MarginBalance::new(
1519                    Money::from_decimal(initial_dec, currency)
1520                        .unwrap_or_else(|_| Money::zero(currency)),
1521                    Money::from_decimal(maintenance_dec, currency)
1522                        .unwrap_or_else(|_| Money::zero(currency)),
1523                    None,
1524                ));
1525            }
1526
1527            balances.push(balance);
1528        }
1529
1530        if balances.is_empty() {
1531            anyhow::bail!("No margin data returned from BitMEX");
1532        }
1533
1534        let account_type = AccountType::Margin;
1535        let is_reported = true;
1536        let event_id = UUID4::new();
1537
1538        // Use server timestamp if available, otherwise fall back to local time
1539        let ts_event = latest_timestamp.map_or(ts_init, |ts| {
1540            UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
1541        });
1542
1543        Ok(AccountState::new(
1544            account_id,
1545            account_type,
1546            balances,
1547            margins_vec,
1548            is_reported,
1549            event_id,
1550            ts_event,
1551            ts_init,
1552            None,
1553        ))
1554    }
1555
1556    /// Submit a new order.
1557    ///
1558    /// # Errors
1559    ///
1560    /// Returns an error if credentials are missing, the request fails, order validation fails,
1561    /// the order is rejected, or the API returns an error.
1562    #[expect(clippy::too_many_arguments)]
1563    pub async fn submit_order(
1564        &self,
1565        instrument_id: InstrumentId,
1566        client_order_id: ClientOrderId,
1567        order_side: OrderSide,
1568        order_type: OrderType,
1569        quantity: Quantity,
1570        time_in_force: TimeInForce,
1571        price: Option<Price>,
1572        trigger_price: Option<Price>,
1573        trigger_type: Option<TriggerType>,
1574        trailing_offset: Option<f64>,
1575        trailing_offset_type: Option<TrailingOffsetType>,
1576        display_qty: Option<Quantity>,
1577        post_only: bool,
1578        reduce_only: bool,
1579        order_list_id: Option<OrderListId>,
1580        contingency_type: Option<ContingencyType>,
1581        peg_price_type: Option<BitmexPegPriceType>,
1582        peg_offset_value: Option<f64>,
1583    ) -> anyhow::Result<OrderStatusReport> {
1584        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1585
1586        let mut params = super::query::PostOrderParamsBuilder::default();
1587        params.text(NAUTILUS_TRADER);
1588        params.symbol(instrument_id.symbol.as_str());
1589        params.cl_ord_id(client_order_id.as_str());
1590
1591        if order_side == OrderSide::NoOrderSide {
1592            anyhow::bail!("Order side must be Buy or Sell");
1593        }
1594        let side = BitmexSide::from(order_side.as_specified());
1595        params.side(side);
1596
1597        let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1598        params.ord_type(ord_type);
1599
1600        params.order_qty(quantity_to_u32(&quantity, &instrument));
1601
1602        let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1603        params.time_in_force(tif);
1604
1605        if let Some(price) = price {
1606            params.price(price.as_f64());
1607        }
1608
1609        if let Some(trigger_price) = trigger_price {
1610            params.stop_px(trigger_price.as_f64());
1611        }
1612
1613        if let Some(display_qty) = display_qty {
1614            params.display_qty(quantity_to_u32(&display_qty, &instrument));
1615        }
1616
1617        if let Some(order_list_id) = order_list_id {
1618            params.cl_ord_link_id(order_list_id.as_str());
1619        }
1620
1621        let is_trailing_stop = matches!(
1622            order_type,
1623            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
1624        );
1625
1626        if is_trailing_stop && let Some(offset) = trailing_offset {
1627            if let Some(offset_type) = trailing_offset_type
1628                && offset_type != TrailingOffsetType::Price
1629            {
1630                anyhow::bail!(
1631                    "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
1632                );
1633            }
1634
1635            params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);
1636
1637            // BitMEX requires negative offset for stop-sell orders
1638            let signed_offset = match order_side {
1639                OrderSide::Sell => -offset.abs(),
1640                OrderSide::Buy => offset.abs(),
1641                _ => offset,
1642            };
1643            params.peg_offset_value(signed_offset);
1644        }
1645
1646        // Pegged orders (BBO) via params override
1647        if peg_price_type.is_none() && peg_offset_value.is_some() {
1648            anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
1649        }
1650
1651        if let Some(peg_type) = peg_price_type {
1652            if order_type != OrderType::Limit {
1653                anyhow::bail!(
1654                    "Pegged orders only supported for LIMIT order type, was {order_type:?}"
1655                );
1656            }
1657            params.ord_type(BitmexOrderType::Pegged);
1658            params.peg_price_type(peg_type);
1659
1660            if let Some(offset) = peg_offset_value {
1661                params.peg_offset_value(offset);
1662            }
1663        }
1664
1665        let mut exec_inst = Vec::new();
1666
1667        if post_only {
1668            exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1669        }
1670
1671        if reduce_only {
1672            exec_inst.push(BitmexExecInstruction::ReduceOnly);
1673        }
1674
1675        // For trailing stops, trigger_type specifies which price to track (Mark, Last, Index)
1676        if (trigger_price.is_some() || is_trailing_stop)
1677            && let Some(trigger_type) = trigger_type
1678        {
1679            match trigger_type {
1680                TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1681                TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1682                TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1683                _ => {} // Use BitMEX default (LastPrice) for other trigger types
1684            }
1685        }
1686
1687        if !exec_inst.is_empty() {
1688            params.exec_inst(exec_inst);
1689        }
1690
1691        if let Some(contingency_type) = contingency_type {
1692            let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1693            params.contingency_type(bitmex_contingency);
1694        }
1695
1696        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1697
1698        let response = self.inner.place_order(params).await?;
1699
1700        let order: BitmexOrder = serde_json::from_value(response)?;
1701
1702        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1703            let reason = order
1704                .ord_rej_reason
1705                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1706            anyhow::bail!("Order rejected: {reason}");
1707        }
1708
1709        // Cache order type for future lookups (e.g., cancel responses missing ord_type)
1710        self.order_type_cache.insert(client_order_id, order_type);
1711
1712        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1713        let ts_init = self.generate_ts_init();
1714
1715        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1716    }
1717
1718    /// Cancel an order.
1719    ///
1720    /// # Errors
1721    ///
1722    /// Returns an error if:
1723    /// - Credentials are missing.
1724    /// - The request fails.
1725    /// - The order doesn't exist.
1726    /// - The API returns an error.
1727    pub async fn cancel_order(
1728        &self,
1729        instrument_id: InstrumentId,
1730        client_order_id: Option<ClientOrderId>,
1731        venue_order_id: Option<VenueOrderId>,
1732    ) -> anyhow::Result<OrderStatusReport> {
1733        let mut params = super::query::DeleteOrderParamsBuilder::default();
1734        params.text(NAUTILUS_TRADER);
1735
1736        if let Some(venue_order_id) = venue_order_id {
1737            params.order_id(vec![venue_order_id.as_str().to_string()]);
1738        } else if let Some(client_order_id) = client_order_id {
1739            params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1740        } else {
1741            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1742        }
1743
1744        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1745
1746        let response = self.inner.cancel_orders(params).await?;
1747
1748        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1749        let order = orders
1750            .into_iter()
1751            .next()
1752            .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1753
1754        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1755        let ts_init = self.generate_ts_init();
1756
1757        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1758    }
1759
1760    /// Cancel multiple orders.
1761    ///
1762    /// # Errors
1763    ///
1764    /// Returns an error if:
1765    /// - Credentials are missing.
1766    /// - The request fails.
1767    /// - The order doesn't exist.
1768    /// - The API returns an error.
1769    pub async fn cancel_orders(
1770        &self,
1771        instrument_id: InstrumentId,
1772        client_order_ids: Option<Vec<ClientOrderId>>,
1773        venue_order_ids: Option<Vec<VenueOrderId>>,
1774    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1775        let mut params = super::query::DeleteOrderParamsBuilder::default();
1776        params.text(NAUTILUS_TRADER);
1777
1778        // BitMEX API requires either client order IDs or venue order IDs, not both
1779        // Prioritize venue order IDs if both are provided
1780        if let Some(venue_order_ids) = venue_order_ids {
1781            if venue_order_ids.is_empty() {
1782                anyhow::bail!("venue_order_ids cannot be empty");
1783            }
1784            params.order_id(
1785                venue_order_ids
1786                    .iter()
1787                    .map(|id| id.to_string())
1788                    .collect::<Vec<_>>(),
1789            );
1790        } else if let Some(client_order_ids) = client_order_ids {
1791            if client_order_ids.is_empty() {
1792                anyhow::bail!("client_order_ids cannot be empty");
1793            }
1794            params.cl_ord_id(
1795                client_order_ids
1796                    .iter()
1797                    .map(|id| id.to_string())
1798                    .collect::<Vec<_>>(),
1799            );
1800        } else {
1801            anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1802        }
1803
1804        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1805
1806        let response = self.inner.cancel_orders(params).await?;
1807
1808        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1809
1810        let ts_init = self.generate_ts_init();
1811        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1812
1813        let mut reports = Vec::new();
1814
1815        for order in orders {
1816            reports.push(parse_order_status_report(
1817                &order,
1818                &instrument,
1819                &self.order_type_cache,
1820                ts_init,
1821            )?);
1822        }
1823
1824        Self::populate_linked_order_ids(&mut reports);
1825
1826        Ok(reports)
1827    }
1828
1829    /// Cancel all orders for an instrument and optionally an order side.
1830    ///
1831    /// # Errors
1832    ///
1833    /// Returns an error if:
1834    /// - Credentials are missing.
1835    /// - The request fails.
1836    /// - The order doesn't exist.
1837    /// - The API returns an error.
1838    pub async fn cancel_all_orders(
1839        &self,
1840        instrument_id: InstrumentId,
1841        order_side: Option<OrderSide>,
1842    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1843        let mut params = DeleteAllOrdersParamsBuilder::default();
1844        params.text(NAUTILUS_TRADER);
1845        params.symbol(instrument_id.symbol.as_str());
1846
1847        if let Some(side) = order_side {
1848            if side == OrderSide::NoOrderSide {
1849                log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
1850            } else {
1851                let side = BitmexSide::from(side.as_specified());
1852                params.filter(serde_json::json!({
1853                    "side": side
1854                }));
1855            }
1856        }
1857
1858        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1859
1860        let response = self.inner.cancel_all_orders(params).await?;
1861
1862        let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1863
1864        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1865        let ts_init = self.generate_ts_init();
1866
1867        let mut reports = Vec::new();
1868
1869        for order in orders {
1870            reports.push(parse_order_status_report(
1871                &order,
1872                &instrument,
1873                &self.order_type_cache,
1874                ts_init,
1875            )?);
1876        }
1877
1878        Self::populate_linked_order_ids(&mut reports);
1879
1880        Ok(reports)
1881    }
1882
1883    /// Modify an existing order.
1884    ///
1885    /// # Errors
1886    ///
1887    /// Returns an error if:
1888    /// - Credentials are missing.
1889    /// - The request fails.
1890    /// - The order doesn't exist.
1891    /// - The order is already closed.
1892    /// - The API returns an error.
1893    pub async fn modify_order(
1894        &self,
1895        instrument_id: InstrumentId,
1896        client_order_id: Option<ClientOrderId>,
1897        venue_order_id: Option<VenueOrderId>,
1898        quantity: Option<Quantity>,
1899        price: Option<Price>,
1900        trigger_price: Option<Price>,
1901    ) -> anyhow::Result<OrderStatusReport> {
1902        let mut params = PutOrderParamsBuilder::default();
1903        params.text(NAUTILUS_TRADER);
1904
1905        // Set order ID - prefer venue_order_id if available
1906        if let Some(venue_order_id) = venue_order_id {
1907            params.order_id(venue_order_id.as_str());
1908        } else if let Some(client_order_id) = client_order_id {
1909            params.orig_cl_ord_id(client_order_id.as_str());
1910        } else {
1911            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1912        }
1913
1914        if let Some(quantity) = quantity {
1915            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1916            params.order_qty(quantity_to_u32(&quantity, &instrument));
1917        }
1918
1919        if let Some(price) = price {
1920            params.price(price.as_f64());
1921        }
1922
1923        if let Some(trigger_price) = trigger_price {
1924            params.stop_px(trigger_price.as_f64());
1925        }
1926
1927        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1928
1929        let response = self.inner.amend_order(params).await?;
1930
1931        let order: BitmexOrder = serde_json::from_value(response)?;
1932
1933        if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1934            let reason = order
1935                .ord_rej_reason
1936                .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1937            anyhow::bail!("Order modification rejected: {reason}");
1938        }
1939
1940        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1941        let ts_init = self.generate_ts_init();
1942
1943        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1944    }
1945
1946    /// Query a single order by client order ID or venue order ID.
1947    ///
1948    /// # Errors
1949    ///
1950    /// Returns an error if:
1951    /// - Credentials are missing.
1952    /// - The request fails.
1953    /// - The API returns an error.
1954    pub async fn query_order(
1955        &self,
1956        instrument_id: InstrumentId,
1957        client_order_id: Option<ClientOrderId>,
1958        venue_order_id: Option<VenueOrderId>,
1959    ) -> anyhow::Result<Option<OrderStatusReport>> {
1960        let mut params = GetOrderParamsBuilder::default();
1961
1962        let filter_json = if let Some(client_order_id) = client_order_id {
1963            serde_json::json!({
1964                "clOrdID": client_order_id.to_string()
1965            })
1966        } else if let Some(venue_order_id) = venue_order_id {
1967            serde_json::json!({
1968                "orderID": venue_order_id.to_string()
1969            })
1970        } else {
1971            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1972        };
1973
1974        params.filter(filter_json);
1975        params.count(1); // Only need one order
1976
1977        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1978
1979        let response = self.inner.get_orders(params).await?;
1980
1981        if response.is_empty() {
1982            return Ok(None);
1983        }
1984
1985        let order = &response[0];
1986
1987        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1988        let ts_init = self.generate_ts_init();
1989
1990        let report =
1991            parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
1992
1993        Ok(Some(report))
1994    }
1995
1996    /// Request a single order status report.
1997    ///
1998    /// # Errors
1999    ///
2000    /// Returns an error if:
2001    /// - Credentials are missing.
2002    /// - The request fails.
2003    /// - The API returns an error.
2004    pub async fn request_order_status_report(
2005        &self,
2006        instrument_id: InstrumentId,
2007        client_order_id: Option<ClientOrderId>,
2008        venue_order_id: Option<VenueOrderId>,
2009    ) -> anyhow::Result<OrderStatusReport> {
2010        if venue_order_id.is_none() && client_order_id.is_none() {
2011            anyhow::bail!("Either venue_order_id or client_order_id must be provided");
2012        }
2013
2014        let mut params = GetOrderParamsBuilder::default();
2015        params.symbol(instrument_id.symbol.as_str());
2016
2017        if let Some(venue_order_id) = venue_order_id {
2018            params.filter(serde_json::json!({
2019                "orderID": venue_order_id.as_str()
2020            }));
2021        } else if let Some(client_order_id) = client_order_id {
2022            params.filter(serde_json::json!({
2023                "clOrdID": client_order_id.as_str()
2024            }));
2025        }
2026
2027        params.count(1i32);
2028        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2029
2030        let response = self.inner.get_orders(params).await?;
2031
2032        let order = response
2033            .into_iter()
2034            .next()
2035            .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
2036
2037        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2038        let ts_init = self.generate_ts_init();
2039
2040        parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
2041    }
2042
2043    /// Request multiple order status reports.
2044    ///
2045    /// # Errors
2046    ///
2047    /// Returns an error if:
2048    /// - Credentials are missing.
2049    /// - The request fails.
2050    /// - The API returns an error.
2051    pub async fn request_order_status_reports(
2052        &self,
2053        instrument_id: Option<InstrumentId>,
2054        open_only: bool,
2055        start: Option<DateTime<Utc>>,
2056        end: Option<DateTime<Utc>>,
2057        limit: Option<u32>,
2058    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2059        if let (Some(start), Some(end)) = (start, end) {
2060            anyhow::ensure!(
2061                start < end,
2062                "Invalid time range: start={start:?} end={end:?}",
2063            );
2064        }
2065
2066        let mut params = GetOrderParamsBuilder::default();
2067
2068        if let Some(instrument_id) = &instrument_id {
2069            params.symbol(instrument_id.symbol.as_str());
2070        }
2071
2072        if open_only {
2073            params.filter(serde_json::json!({
2074                "open": true
2075            }));
2076        }
2077
2078        if let Some(start) = start {
2079            params.start_time(start);
2080        }
2081
2082        if let Some(end) = end {
2083            params.end_time(end);
2084        }
2085
2086        if let Some(limit) = limit {
2087            params.count(limit as i32);
2088        } else {
2089            params.count(500); // Default count to avoid empty query
2090        }
2091
2092        params.reverse(true); // Get newest orders first
2093
2094        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2095
2096        let response = self.inner.get_orders(params).await?;
2097
2098        let ts_init = self.generate_ts_init();
2099
2100        let mut reports = Vec::new();
2101
2102        for order in response {
2103            if let Some(start) = start {
2104                match order.timestamp {
2105                    Some(timestamp) if timestamp < start => continue,
2106                    Some(_) => {}
2107                    None => {
2108                        log::debug!("Skipping order report without timestamp for bounded query");
2109                        continue;
2110                    }
2111                }
2112            }
2113
2114            if let Some(end) = end {
2115                match order.timestamp {
2116                    Some(timestamp) if timestamp > end => continue,
2117                    Some(_) => {}
2118                    None => {
2119                        log::debug!("Skipping order report without timestamp for bounded query");
2120                        continue;
2121                    }
2122                }
2123            }
2124
2125            // Skip orders without symbol (can happen with query responses)
2126            let Some(symbol) = order.symbol else {
2127                log::warn!("Order response missing symbol, skipping");
2128                continue;
2129            };
2130
2131            let Ok(instrument) = self.instrument_from_cache(symbol) else {
2132                log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
2133                continue;
2134            };
2135
2136            match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
2137                Ok(report) => reports.push(report),
2138                Err(e) => log::error!("Failed to parse order status report: {e}"),
2139            }
2140        }
2141
2142        Self::populate_linked_order_ids(&mut reports);
2143
2144        Ok(reports)
2145    }
2146
2147    /// Request trades for the given instrument.
2148    ///
2149    /// # Errors
2150    ///
2151    /// Returns an error if the HTTP request fails or parsing fails.
2152    pub async fn request_trades(
2153        &self,
2154        instrument_id: InstrumentId,
2155        start: Option<DateTime<Utc>>,
2156        end: Option<DateTime<Utc>>,
2157        limit: Option<u32>,
2158    ) -> anyhow::Result<Vec<TradeTick>> {
2159        let mut params = GetTradeParamsBuilder::default();
2160        params.symbol(instrument_id.symbol.as_str());
2161
2162        if let Some(start) = start {
2163            params.start_time(start);
2164        }
2165
2166        if let Some(end) = end {
2167            params.end_time(end);
2168        }
2169
2170        if let (Some(start), Some(end)) = (start, end) {
2171            anyhow::ensure!(
2172                start < end,
2173                "Invalid time range: start={start:?} end={end:?}",
2174            );
2175        }
2176
2177        if let Some(limit) = limit {
2178            let clamped_limit = limit.min(1000);
2179            if limit > 1000 {
2180                log::warn!(
2181                    "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2182                );
2183            }
2184            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2185        }
2186        params.reverse(false);
2187        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2188
2189        let response = self.inner.get_trades(params).await?;
2190
2191        let ts_init = self.generate_ts_init();
2192
2193        let mut parsed_trades = Vec::new();
2194
2195        for trade in response {
2196            if let Some(start) = start
2197                && trade.timestamp < start
2198            {
2199                continue;
2200            }
2201
2202            if let Some(end) = end
2203                && trade.timestamp > end
2204            {
2205                continue;
2206            }
2207
2208            let Some(instrument) = self.get_instrument(&trade.symbol) else {
2209                log::error!(
2210                    "Instrument {} not found in cache, skipping trade",
2211                    trade.symbol
2212                );
2213                continue;
2214            };
2215
2216            match parse_trade(&trade, &instrument, ts_init) {
2217                Ok(trade) => parsed_trades.push(trade),
2218                Err(e) => log::error!("Failed to parse trade: {e}"),
2219            }
2220        }
2221
2222        Ok(parsed_trades)
2223    }
2224
2225    /// Request bars for the given bar type.
2226    ///
2227    /// # Errors
2228    ///
2229    /// Returns an error if the HTTP request fails, parsing fails, or the bar specification is
2230    /// unsupported by BitMEX.
2231    pub async fn request_bars(
2232        &self,
2233        mut bar_type: BarType,
2234        start: Option<DateTime<Utc>>,
2235        end: Option<DateTime<Utc>>,
2236        limit: Option<u32>,
2237        partial: bool,
2238    ) -> anyhow::Result<Vec<Bar>> {
2239        bar_type = bar_type.standard();
2240
2241        anyhow::ensure!(
2242            bar_type.aggregation_source() == AggregationSource::External,
2243            "Only EXTERNAL aggregation bars are supported"
2244        );
2245        anyhow::ensure!(
2246            bar_type.spec().price_type == PriceType::Last,
2247            "Only LAST price type bars are supported"
2248        );
2249
2250        if let (Some(start), Some(end)) = (start, end) {
2251            anyhow::ensure!(
2252                start < end,
2253                "Invalid time range: start={start:?} end={end:?}"
2254            );
2255        }
2256
2257        let spec = bar_type.spec();
2258        let bin_size = match (spec.aggregation, spec.step.get()) {
2259            (BarAggregation::Minute, 1) => "1m",
2260            (BarAggregation::Minute, 5) => "5m",
2261            (BarAggregation::Hour, 1) => "1h",
2262            (BarAggregation::Day, 1) => "1d",
2263            _ => anyhow::bail!(
2264                "BitMEX does not support {}-{:?}-{:?} bars",
2265                spec.step.get(),
2266                spec.aggregation,
2267                spec.price_type,
2268            ),
2269        };
2270
2271        let instrument_id = bar_type.instrument_id();
2272        let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2273
2274        let mut params = GetTradeBucketedParamsBuilder::default();
2275        params.symbol(instrument_id.symbol.as_str());
2276        params.bin_size(bin_size);
2277
2278        if partial {
2279            params.partial(true);
2280        }
2281
2282        if let Some(start) = start {
2283            params.start_time(start);
2284        }
2285
2286        if let Some(end) = end {
2287            params.end_time(end);
2288        }
2289
2290        if let Some(limit) = limit {
2291            let clamped_limit = limit.min(1000);
2292            if limit > 1000 {
2293                log::warn!(
2294                    "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2295                );
2296            }
2297            params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2298        }
2299        params.reverse(false);
2300        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2301
2302        let response = self.inner.get_trade_bucketed(params).await?;
2303        let ts_init = self.generate_ts_init();
2304        let mut bars = Vec::new();
2305
2306        for bin in response {
2307            if let Some(start) = start
2308                && bin.timestamp < start
2309            {
2310                continue;
2311            }
2312
2313            if let Some(end) = end
2314                && bin.timestamp > end
2315            {
2316                continue;
2317            }
2318
2319            if bin.symbol != instrument_id.symbol.inner() {
2320                log::warn!(
2321                    "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2322                    bin.symbol,
2323                    instrument_id.symbol,
2324                );
2325                continue;
2326            }
2327
2328            match parse_trade_bin(&bin, &instrument, &bar_type, ts_init) {
2329                Ok(bar) => bars.push(bar),
2330                Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2331            }
2332        }
2333
2334        Ok(bars)
2335    }
2336
2337    /// Request fill reports for the given instrument.
2338    ///
2339    /// # Errors
2340    ///
2341    /// Returns an error if the HTTP request fails or parsing fails.
2342    pub async fn request_fill_reports(
2343        &self,
2344        instrument_id: Option<InstrumentId>,
2345        start: Option<DateTime<Utc>>,
2346        end: Option<DateTime<Utc>>,
2347        limit: Option<u32>,
2348    ) -> anyhow::Result<Vec<FillReport>> {
2349        if let (Some(start), Some(end)) = (start, end) {
2350            anyhow::ensure!(
2351                start < end,
2352                "Invalid time range: start={start:?} end={end:?}",
2353            );
2354        }
2355
2356        let mut params = GetExecutionParamsBuilder::default();
2357
2358        if let Some(instrument_id) = instrument_id {
2359            params.symbol(instrument_id.symbol.as_str());
2360        }
2361
2362        if let Some(start) = start {
2363            params.start_time(start);
2364        }
2365
2366        if let Some(end) = end {
2367            params.end_time(end);
2368        }
2369
2370        if let Some(limit) = limit {
2371            params.count(limit as i32);
2372        } else {
2373            params.count(500); // Default count
2374        }
2375        params.reverse(true); // Get newest fills first
2376
2377        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2378
2379        let response = self.inner.get_executions(params).await?;
2380
2381        let ts_init = self.generate_ts_init();
2382
2383        let mut reports = Vec::new();
2384
2385        for exec in response {
2386            if let Some(start) = start {
2387                match exec.transact_time {
2388                    Some(timestamp) if timestamp < start => continue,
2389                    Some(_) => {}
2390                    None => {
2391                        log::debug!("Skipping fill report without transact_time for bounded query");
2392                        continue;
2393                    }
2394                }
2395            }
2396
2397            if let Some(end) = end {
2398                match exec.transact_time {
2399                    Some(timestamp) if timestamp > end => continue,
2400                    Some(_) => {}
2401                    None => {
2402                        log::debug!("Skipping fill report without transact_time for bounded query");
2403                        continue;
2404                    }
2405                }
2406            }
2407
2408            // Skip executions without symbol (e.g., CancelReject)
2409            let Some(symbol) = exec.symbol else {
2410                log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2411                continue;
2412            };
2413            let symbol_str = symbol.to_string();
2414
2415            let instrument = match self.instrument_from_cache(symbol) {
2416                Ok(instrument) => instrument,
2417                Err(e) => {
2418                    log::error!(
2419                        "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2420                    );
2421                    continue;
2422                }
2423            };
2424
2425            match parse_fill_report(&exec, &instrument, ts_init) {
2426                Ok(report) => reports.push(report),
2427                Err(e) => {
2428                    // Log at debug level for expected skip cases
2429                    let error_msg = e.to_string();
2430                    if error_msg.starts_with("Skipping non-trade execution")
2431                        || error_msg.starts_with("Skipping execution without order_id")
2432                    {
2433                        log::debug!("{e}");
2434                    } else {
2435                        log::error!("Failed to parse fill report: {e}");
2436                    }
2437                }
2438            }
2439        }
2440
2441        Ok(reports)
2442    }
2443
2444    /// Request position reports.
2445    ///
2446    /// # Errors
2447    ///
2448    /// Returns an error if the HTTP request fails or parsing fails.
2449    pub async fn request_position_status_reports(
2450        &self,
2451    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2452        let params = GetPositionParamsBuilder::default()
2453            .count(500) // Default count
2454            .build()
2455            .map_err(|e| anyhow::anyhow!(e))?;
2456
2457        let response = self.inner.get_positions(params).await?;
2458
2459        let ts_init = self.generate_ts_init();
2460
2461        let mut reports = Vec::new();
2462
2463        for pos in response {
2464            let symbol = pos.symbol;
2465            let instrument = match self.instrument_from_cache(symbol) {
2466                Ok(instrument) => instrument,
2467                Err(e) => {
2468                    log::error!(
2469                        "Instrument not found in cache for position parsing: symbol={}, {e}",
2470                        pos.symbol.as_str(),
2471                    );
2472                    continue;
2473                }
2474            };
2475
2476            match parse_position_report(&pos, &instrument, ts_init) {
2477                Ok(report) => reports.push(report),
2478                Err(e) => log::error!("Failed to parse position report: {e}"),
2479            }
2480        }
2481
2482        Ok(reports)
2483    }
2484
2485    /// Update position leverage.
2486    ///
2487    /// # Errors
2488    ///
2489    /// - Credentials are missing.
2490    /// - The request fails.
2491    /// - The API returns an error.
2492    pub async fn update_position_leverage(
2493        &self,
2494        symbol: &str,
2495        leverage: f64,
2496    ) -> anyhow::Result<PositionStatusReport> {
2497        let params = PostPositionLeverageParams {
2498            symbol: symbol.to_string(),
2499            leverage,
2500            target_account_id: None,
2501        };
2502
2503        let response = self.inner.update_position_leverage(params).await?;
2504
2505        let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2506        let ts_init = self.generate_ts_init();
2507
2508        parse_position_report(&response, &instrument, ts_init)
2509    }
2510}
2511
2512#[cfg(test)]
2513mod tests {
2514    use nautilus_core::UUID4;
2515    use nautilus_model::enums::OrderStatus;
2516    use rstest::rstest;
2517    use serde_json::json;
2518
2519    use super::*;
2520
2521    fn build_report(
2522        client_order_id: &str,
2523        venue_order_id: &str,
2524        contingency_type: ContingencyType,
2525        order_list_id: Option<&str>,
2526    ) -> OrderStatusReport {
2527        let mut report = OrderStatusReport::new(
2528            AccountId::from("BITMEX-1"),
2529            InstrumentId::from("XBTUSD.BITMEX"),
2530            Some(ClientOrderId::from(client_order_id)),
2531            VenueOrderId::from(venue_order_id),
2532            OrderSide::Buy,
2533            OrderType::Limit,
2534            TimeInForce::Gtc,
2535            OrderStatus::Accepted,
2536            Quantity::new(100.0, 0),
2537            Quantity::default(),
2538            UnixNanos::from(1_u64),
2539            UnixNanos::from(1_u64),
2540            UnixNanos::from(1_u64),
2541            Some(UUID4::new()),
2542        );
2543
2544        if let Some(id) = order_list_id {
2545            report = report.with_order_list_id(OrderListId::from(id));
2546        }
2547
2548        report.with_contingency_type(contingency_type)
2549    }
2550
2551    #[rstest]
2552    fn test_sign_request_generates_correct_headers() {
2553        let client = BitmexRawHttpClient::with_credentials(
2554            "test_api_key".to_string(),
2555            "test_api_secret".to_string(),
2556            "http://localhost:8080".to_string(),
2557            60,
2558            3,
2559            1_000,
2560            10_000,
2561            10_000,
2562            10,
2563            120,
2564            None,
2565        )
2566        .expect("Failed to create test client");
2567
2568        let headers = client
2569            .sign_request(&Method::GET, "/api/v1/order", None)
2570            .unwrap();
2571
2572        assert!(headers.contains_key("api-key"));
2573        assert!(headers.contains_key("api-signature"));
2574        assert!(headers.contains_key("api-expires"));
2575        assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2576    }
2577
2578    #[rstest]
2579    fn test_sign_request_with_body() {
2580        let client = BitmexRawHttpClient::with_credentials(
2581            "test_api_key".to_string(),
2582            "test_api_secret".to_string(),
2583            "http://localhost:8080".to_string(),
2584            60,
2585            3,
2586            1_000,
2587            10_000,
2588            10_000,
2589            10,
2590            120,
2591            None,
2592        )
2593        .expect("Failed to create test client");
2594
2595        let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2596        let body_bytes = serde_json::to_vec(&body).unwrap();
2597
2598        let headers_without_body = client
2599            .sign_request(&Method::POST, "/api/v1/order", None)
2600            .unwrap();
2601        let headers_with_body = client
2602            .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2603            .unwrap();
2604
2605        // Signatures should be different when body is included
2606        assert_ne!(
2607            headers_without_body.get("api-signature").unwrap(),
2608            headers_with_body.get("api-signature").unwrap()
2609        );
2610    }
2611
2612    #[rstest]
2613    fn test_sign_request_uses_custom_recv_window() {
2614        let client_default = BitmexRawHttpClient::with_credentials(
2615            "test_api_key".to_string(),
2616            "test_api_secret".to_string(),
2617            "http://localhost:8080".to_string(),
2618            60,
2619            3,
2620            1_000,
2621            10_000,
2622            10_000, // default recv_window_ms (10000ms = 10s)
2623            10,
2624            120,
2625            None,
2626        )
2627        .expect("Failed to create test client");
2628
2629        let client_custom = BitmexRawHttpClient::with_credentials(
2630            "test_api_key".to_string(),
2631            "test_api_secret".to_string(),
2632            "http://localhost:8080".to_string(),
2633            60,
2634            3,
2635            1_000,
2636            10_000,
2637            30_000, // 30 seconds
2638            10,
2639            120,
2640            None,
2641        )
2642        .expect("Failed to create test client");
2643
2644        let headers_default = client_default
2645            .sign_request(&Method::GET, "/api/v1/order", None)
2646            .unwrap();
2647        let headers_custom = client_custom
2648            .sign_request(&Method::GET, "/api/v1/order", None)
2649            .unwrap();
2650
2651        // Parse expires timestamps
2652        let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2653        let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2654
2655        // Verify both are valid future timestamps
2656        let now = Utc::now().timestamp();
2657        assert!(expires_default > now);
2658        assert!(expires_custom > now);
2659
2660        // Custom window should be greater than default
2661        assert!(expires_custom > expires_default);
2662
2663        // The difference should be approximately 20 seconds (30s - 10s)
2664        // Allow wider tolerance for delays between calls on slow CI runners
2665        let diff = expires_custom - expires_default;
2666        assert!((18..=25).contains(&diff));
2667    }
2668
2669    #[rstest]
2670    fn test_populate_linked_order_ids_from_order_list() {
2671        let base = "O-20250922-002219-001-000";
2672        let entry = format!("{base}-1");
2673        let stop = format!("{base}-2");
2674        let take = format!("{base}-3");
2675
2676        let mut reports = vec![
2677            build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2678            build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2679            build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2680        ];
2681
2682        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2683
2684        assert_eq!(
2685            reports[0].linked_order_ids,
2686            Some(vec![
2687                ClientOrderId::from(stop.as_str()),
2688                ClientOrderId::from(take.as_str()),
2689            ]),
2690        );
2691        assert_eq!(
2692            reports[1].linked_order_ids,
2693            Some(vec![
2694                ClientOrderId::from(entry.as_str()),
2695                ClientOrderId::from(take.as_str()),
2696            ]),
2697        );
2698        assert_eq!(
2699            reports[2].linked_order_ids,
2700            Some(vec![
2701                ClientOrderId::from(entry.as_str()),
2702                ClientOrderId::from(stop.as_str()),
2703            ]),
2704        );
2705    }
2706
2707    #[rstest]
2708    fn test_populate_linked_order_ids_from_id_prefix() {
2709        let base = "O-20250922-002220-001-000";
2710        let entry = format!("{base}-1");
2711        let stop = format!("{base}-2");
2712        let take = format!("{base}-3");
2713
2714        let mut reports = vec![
2715            build_report(&entry, "V-1", ContingencyType::Oto, None),
2716            build_report(&stop, "V-2", ContingencyType::Ouo, None),
2717            build_report(&take, "V-3", ContingencyType::Ouo, None),
2718        ];
2719
2720        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2721
2722        assert_eq!(
2723            reports[0].linked_order_ids,
2724            Some(vec![
2725                ClientOrderId::from(stop.as_str()),
2726                ClientOrderId::from(take.as_str()),
2727            ]),
2728        );
2729        assert_eq!(
2730            reports[1].linked_order_ids,
2731            Some(vec![
2732                ClientOrderId::from(entry.as_str()),
2733                ClientOrderId::from(take.as_str()),
2734            ]),
2735        );
2736        assert_eq!(
2737            reports[2].linked_order_ids,
2738            Some(vec![
2739                ClientOrderId::from(entry.as_str()),
2740                ClientOrderId::from(stop.as_str()),
2741            ]),
2742        );
2743    }
2744
2745    #[rstest]
2746    fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2747        let base = "O-20250922-002221-001-000";
2748        let entry = format!("{base}-1");
2749        let passive = format!("{base}-2");
2750
2751        let mut reports = vec![
2752            build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2753            build_report(&passive, "V-2", ContingencyType::Ouo, None),
2754        ];
2755
2756        BitmexHttpClient::populate_linked_order_ids(&mut reports);
2757
2758        // Non-contingent orders should not be linked
2759        assert!(reports[0].linked_order_ids.is_none());
2760
2761        // A contingent order with no other contingent peers should have contingency reset
2762        assert!(reports[1].linked_order_ids.is_none());
2763        assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2764    }
2765}