Skip to main content

nautilus_okx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides an ergonomic wrapper around the **OKX v5 REST API** –
17//! <https://www.okx.com/docs-v5/en/>.
18//!
19//! The core type exported by this module is [`OKXHttpClient`].  It offers an
20//! interface to all exchange endpoints currently required by NautilusTrader.
21//!
22//! Key responsibilities handled internally:
23//! • Request signing and header composition for private routes (HMAC-SHA256).
24//! • Rate-limiting based on the public OKX specification.
25//! • Deserialization of JSON payloads into domain models.
26//! • Conversion of raw exchange errors into the rich [`OKXHttpError`] enum.
27//!
28//! # Official Documentation
29//!
30//! | Endpoint                             | Reference                                              |
31//! |--------------------------------------|--------------------------------------------------------|
32//! | Market data                          | <https://www.okx.com/docs-v5/en/#rest-api-market-data> |
33//! | Account & positions                  | <https://www.okx.com/docs-v5/en/#rest-api-account>     |
34//! | Funding & asset balances             | <https://www.okx.com/docs-v5/en/#rest-api-funding>     |
35
36use std::{
37    collections::HashMap,
38    fmt::Debug,
39    num::NonZeroU32,
40    str::FromStr,
41    sync::{
42        Arc, LazyLock,
43        atomic::{AtomicBool, Ordering},
44    },
45};
46
47use ahash::{AHashMap, AHashSet};
48use anyhow::Context;
49use chrono::{DateTime, Utc};
50use nautilus_core::{
51    AtomicMap, AtomicTime, UnixNanos, consts::NAUTILUS_USER_AGENT,
52    datetime::NANOSECONDS_IN_MILLISECOND, env::get_or_env_var, string::secret::REDACTED,
53    time::get_atomic_clock_realtime,
54};
55use nautilus_model::{
56    data::{
57        Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
58        OrderBookDelta, OrderBookDeltas, TradeTick, forward::ForwardPrice,
59    },
60    enums::{
61        AggregationSource, BarAggregation, BookAction, BookType, OrderSide, OrderType,
62        PositionSide, RecordFlag, TimeInForce, TriggerType,
63    },
64    events::AccountState,
65    identifiers::{AccountId, ClientOrderId, InstrumentId},
66    instruments::{Instrument, InstrumentAny},
67    orderbook::OrderBook,
68    reports::{FillReport, OrderStatusReport, PositionStatusReport},
69    types::{Price, Quantity},
70};
71use nautilus_network::{
72    http::{HttpClient, Method, StatusCode, USER_AGENT},
73    ratelimiter::quota::Quota,
74    retry::{RetryConfig, RetryManager},
75};
76use rust_decimal::Decimal;
77use serde::{Deserialize, Serialize, de::DeserializeOwned};
78use tokio_util::sync::CancellationToken;
79use ustr::Ustr;
80
81use super::{
82    error::OKXHttpError,
83    models::{
84        OKXAccount, OKXAmendAlgoOrderRequest, OKXAmendAlgoOrderResponse, OKXAttachAlgoOrdRequest,
85        OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate, OKXFundingRateHistory,
86        OKXIndexTicker, OKXMarkPrice, OKXOptionSummary, OKXOrderAlgo, OKXOrderBookSnapshot,
87        OKXOrderHistory, OKXPlaceAlgoOrderRequest, OKXPlaceAlgoOrderResponse, OKXPlaceOrderRequest,
88        OKXPlaceOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
89        OKXTransactionDetail,
90    },
91    query::{
92        GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
93        GetCandlesticksParamsBuilder, GetFundingRateHistoryParams, GetIndexTickerParams,
94        GetIndexTickerParamsBuilder, GetInstrumentsParams, GetInstrumentsParamsBuilder,
95        GetMarkPriceParams, GetMarkPriceParamsBuilder, GetOptionSummaryParams, GetOrderBookParams,
96        GetOrderHistoryParams, GetOrderHistoryParamsBuilder, GetOrderListParams,
97        GetOrderListParamsBuilder, GetPositionTiersParams, GetPositionsHistoryParams,
98        GetPositionsParams, GetPositionsParamsBuilder, GetTradeFeeParams, GetTradesParams,
99        GetTradesParamsBuilder, GetTransactionDetailsParams, GetTransactionDetailsParamsBuilder,
100        SetPositionModeParams, SetPositionModeParamsBuilder,
101    },
102};
103use crate::{
104    common::{
105        consts::{
106            OKX_FIELD_SCODE, OKX_FIELD_SMSG, OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID,
107            OKX_SUPPORTED_ORDER_TYPES, OKX_SUPPORTED_TIME_IN_FORCE, should_retry_error_code,
108        },
109        credential::Credential,
110        enums::{
111            OKXAlgoOrderType, OKXContractType, OKXEnvironment, OKXInstrumentStatus,
112            OKXInstrumentType, OKXOrderStatus, OKXOrderType, OKXPositionMode, OKXPositionSide,
113            OKXSide, OKXTargetCurrency, OKXTradeMode, OKXTriggerType,
114            conditional_order_to_algo_type,
115        },
116        models::OKXInstrument,
117        parse::{
118            extract_inst_family, okx_instrument_type, okx_instrument_type_from_symbol,
119            parse_account_state, parse_base_quote_from_symbol, parse_candlestick,
120            parse_fill_report, parse_funding_rate, parse_index_price_update, parse_instrument_any,
121            parse_instrument_id, parse_mark_price_update, parse_order_status_report,
122            parse_position_status_report, parse_price, parse_quantity,
123            parse_spot_margin_position_from_balance, parse_trade_tick,
124        },
125    },
126    http::{
127        models::{OKXCandlestick, OKXTrade},
128        query::GetOrderParams,
129    },
130    websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
131};
132
133const OKX_SUCCESS_CODE: &str = "0";
134
135/// Ranks a spot instrument's quote currency for deterministic tie-breaking
136/// when multiple pairs share the same base. Matches OKX's dominant-quote
137/// ordering so spot-margin position reports stay on a stable instrument id
138/// across restarts.
139fn spot_quote_priority(symbol: &str) -> u8 {
140    symbol.rsplit_once('-').map_or(4, |(_, quote)| match quote {
141        "USDT" => 0,
142        "USDC" => 1,
143        "USD" => 2,
144        _ => 3,
145    })
146}
147
148fn resolve_okx_error_message(response_body: &[u8], top_level_msg: &str) -> String {
149    let message = top_level_msg.trim();
150    let is_generic_top_level = message.eq_ignore_ascii_case("All operations failed");
151    if !message.is_empty() && !is_generic_top_level {
152        return message.to_string();
153    }
154
155    if let Ok(payload) = serde_json::from_slice::<serde_json::Value>(response_body)
156        && let Some(first_item) = payload
157            .get("data")
158            .and_then(serde_json::Value::as_array)
159            .and_then(|items| items.first())
160    {
161        if let Some(s_msg) = first_item
162            .get(OKX_FIELD_SMSG)
163            .and_then(serde_json::Value::as_str)
164        {
165            let s_msg = s_msg.trim();
166            if !s_msg.is_empty() {
167                return s_msg.to_string();
168            }
169        }
170
171        if let Some(s_code) = first_item
172            .get(OKX_FIELD_SCODE)
173            .and_then(serde_json::Value::as_str)
174        {
175            let s_code = s_code.trim();
176            if !s_code.is_empty() {
177                return s_code.to_string();
178            }
179        }
180    }
181
182    String::new()
183}
184
185#[cfg(test)]
186mod tests {
187    use rstest::rstest;
188
189    use super::resolve_okx_error_message;
190
191    #[rstest]
192    fn test_resolve_okx_error_message_prefers_detailed_s_msg_over_generic_top_level() {
193        let body = br#"{
194            "code": "1",
195            "msg": "All operations failed",
196            "data": [
197                {
198                    "sCode": "51046",
199                    "sMsg": "Test detailed failure"
200                }
201            ]
202        }"#;
203
204        assert_eq!(
205            resolve_okx_error_message(body, "All operations failed"),
206            "Test detailed failure",
207        );
208    }
209
210    #[rstest]
211    #[case("BTC-USD")]
212    #[case("BTC-USD-241217")]
213    #[case("BTC-USD-241217-92000")]
214    fn test_option_summary_expiry_key_rejects_short_symbol(#[case] symbol: &str) {
215        let result = super::OKXHttpClient::option_summary_expiry_key(symbol);
216        assert!(result.is_err());
217        let err = result.unwrap_err().to_string();
218        assert!(
219            err.contains("Expected OKX option symbol with expiry"),
220            "unexpected error: {err}"
221        );
222    }
223
224    #[rstest]
225    fn test_option_summary_expiry_key_extracts_base_quote_expiry() {
226        let result =
227            super::OKXHttpClient::option_summary_expiry_key("BTC-USD-241217-92000-C").unwrap();
228        assert_eq!(result, "BTC-USD-241217");
229    }
230
231    #[rstest]
232    #[case("BTC-USD")]
233    #[case("BTC-USD-241217")]
234    #[case("BTC-USD-241217-92000")]
235    fn test_option_summary_exp_time_rejects_short_symbol(#[case] symbol: &str) {
236        let result = super::OKXHttpClient::option_summary_exp_time(symbol);
237        assert!(result.is_err());
238        let err = result.unwrap_err().to_string();
239        assert!(
240            err.contains("Expected OKX option symbol with expiry"),
241            "unexpected error: {err}"
242        );
243    }
244
245    #[rstest]
246    fn test_option_summary_exp_time_extracts_expiry() {
247        let result =
248            super::OKXHttpClient::option_summary_exp_time("BTC-USD-241217-92000-C").unwrap();
249        assert_eq!(result, Some("241217".to_string()));
250    }
251}
252
253/// Default OKX REST API rate limit: 500 requests per 2 seconds.
254///
255/// - Sub-account order limit: 1000 requests per 2 seconds.
256/// - Account balance: 10 requests per 2 seconds.
257/// - Account instruments: 20 requests per 2 seconds.
258///
259/// We use a conservative 250 requests per second (500 per 2 seconds) as a general limit
260/// that should accommodate most use cases while respecting OKX's documented limits.
261pub static OKX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
262    Quota::per_second(NonZeroU32::new(250).expect("non-zero")).expect("valid constant")
263});
264
265const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
266
267// OKX returns at most 100 records per page for order, fill, and algo endpoints
268const OKX_PAGE_SIZE: usize = 100;
269
270// Safety cap on paginated reconciliation fetches to avoid unbounded loops
271const MAX_RECONCILIATION_PAGES: usize = 50;
272
273/// Represents an OKX HTTP response.
274#[derive(Debug, Serialize, Deserialize)]
275pub struct OKXResponse<T> {
276    /// The OKX response code, which is `"0"` for success.
277    pub code: String,
278    /// A message string which can be informational or describe an error cause.
279    pub msg: String,
280    /// The typed data returned by the OKX endpoint.
281    pub data: Vec<T>,
282}
283
284/// Provides a raw HTTP client for interacting with the [OKX](https://okx.com) REST API.
285///
286/// This client wraps the underlying [`HttpClient`] to handle functionality
287/// specific to OKX, such as request signing (for authenticated endpoints),
288/// forming request URLs, and deserializing responses into OKX specific data models.
289pub struct OKXRawHttpClient {
290    base_url: String,
291    client: HttpClient,
292    credential: Option<Credential>,
293    retry_manager: RetryManager<OKXHttpError>,
294    cancellation_token: CancellationToken,
295    environment: OKXEnvironment,
296}
297
298impl Default for OKXRawHttpClient {
299    fn default() -> Self {
300        Self::new(None, 60, 3, 1000, 10_000, OKXEnvironment::Live, None)
301            .expect("Failed to create default OKXRawHttpClient")
302    }
303}
304
305impl Debug for OKXRawHttpClient {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        let credential = self.credential.as_ref().map(|_| REDACTED);
308        f.debug_struct(stringify!(OKXRawHttpClient))
309            .field("base_url", &self.base_url)
310            .field("credential", &credential)
311            .finish_non_exhaustive()
312    }
313}
314
315impl OKXRawHttpClient {
316    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
317        vec![
318            (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
319            (
320                "okx:/api/v5/account/balance".to_string(),
321                Quota::per_second(NonZeroU32::new(5).expect("non-zero")).expect("valid constant"),
322            ),
323            (
324                "okx:/api/v5/public/instruments".to_string(),
325                Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
326            ),
327            (
328                "okx:/api/v5/market/candles".to_string(),
329                Quota::per_second(NonZeroU32::new(50).expect("non-zero")).expect("valid constant"),
330            ),
331            (
332                "okx:/api/v5/market/history-candles".to_string(),
333                Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant"),
334            ),
335            (
336                "okx:/api/v5/market/history-trades".to_string(),
337                Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant"),
338            ),
339            (
340                "okx:/api/v5/trade/order".to_string(),
341                Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant"), // 60 requests / 2 seconds (per instrument)
342            ),
343            (
344                "okx:/api/v5/trade/orders-pending".to_string(),
345                Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant"),
346            ),
347            (
348                "okx:/api/v5/trade/orders-history".to_string(),
349                Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant"),
350            ),
351            (
352                "okx:/api/v5/trade/fills".to_string(),
353                Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant"),
354            ),
355            (
356                "okx:/api/v5/trade/order-algo".to_string(),
357                Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
358            ),
359            (
360                "okx:/api/v5/trade/cancel-algos".to_string(),
361                Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
362            ),
363            (
364                "okx:/api/v5/trade/amend-algos".to_string(),
365                Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
366            ),
367        ]
368    }
369
370    fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
371        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
372        let route = format!("okx:{normalized}");
373
374        vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
375    }
376
377    /// Cancel all pending HTTP requests.
378    pub fn cancel_all_requests(&self) {
379        self.cancellation_token.cancel();
380    }
381
382    /// Get the cancellation token for this client.
383    pub fn cancellation_token(&self) -> &CancellationToken {
384        &self.cancellation_token
385    }
386
387    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
388    /// optionally overridden with a custom base URL.
389    ///
390    /// This version of the client has **no credentials**, so it can only
391    /// call publicly accessible endpoints.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the retry manager cannot be created.
396    pub fn new(
397        base_url: Option<String>,
398        timeout_secs: u64,
399        max_retries: u32,
400        retry_delay_ms: u64,
401        retry_delay_max_ms: u64,
402        environment: OKXEnvironment,
403        proxy_url: Option<String>,
404    ) -> Result<Self, OKXHttpError> {
405        let retry_config = RetryConfig {
406            max_retries,
407            initial_delay_ms: retry_delay_ms,
408            max_delay_ms: retry_delay_max_ms,
409            backoff_factor: 2.0,
410            jitter_ms: 1000,
411            operation_timeout_ms: Some(60_000),
412            immediate_first: false,
413            max_elapsed_ms: Some(180_000),
414        };
415
416        let retry_manager = RetryManager::new(retry_config);
417
418        Ok(Self {
419            base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
420            client: HttpClient::new(
421                Self::default_headers(environment),
422                vec![],
423                Self::rate_limiter_quotas(),
424                Some(*OKX_REST_QUOTA),
425                Some(timeout_secs),
426                proxy_url,
427            )
428            .map_err(|e| {
429                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
430            })?,
431            credential: None,
432            retry_manager,
433            cancellation_token: CancellationToken::new(),
434            environment,
435        })
436    }
437
438    /// Creates a new [`OKXHttpClient`] configured with credentials
439    /// for authenticated requests, optionally using a custom base URL.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if the retry manager cannot be created.
444    #[expect(clippy::too_many_arguments)]
445    pub fn with_credentials(
446        api_key: String,
447        api_secret: String,
448        api_passphrase: String,
449        base_url: String,
450        timeout_secs: u64,
451        max_retries: u32,
452        retry_delay_ms: u64,
453        retry_delay_max_ms: u64,
454        environment: OKXEnvironment,
455        proxy_url: Option<String>,
456    ) -> Result<Self, OKXHttpError> {
457        let retry_config = RetryConfig {
458            max_retries,
459            initial_delay_ms: retry_delay_ms,
460            max_delay_ms: retry_delay_max_ms,
461            backoff_factor: 2.0,
462            jitter_ms: 1000,
463            operation_timeout_ms: Some(60_000),
464            immediate_first: false,
465            max_elapsed_ms: Some(180_000),
466        };
467
468        let retry_manager = RetryManager::new(retry_config);
469
470        Ok(Self {
471            base_url,
472            client: HttpClient::new(
473                Self::default_headers(environment),
474                vec![],
475                Self::rate_limiter_quotas(),
476                Some(*OKX_REST_QUOTA),
477                Some(timeout_secs),
478                proxy_url,
479            )
480            .map_err(|e| {
481                OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
482            })?,
483            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
484            retry_manager,
485            cancellation_token: CancellationToken::new(),
486            environment,
487        })
488    }
489
490    /// Builds the default headers to include with each request (e.g., `User-Agent`).
491    fn default_headers(environment: OKXEnvironment) -> HashMap<String, String> {
492        let mut headers =
493            HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
494
495        if environment == OKXEnvironment::Demo {
496            headers.insert("x-simulated-trading".to_string(), "1".to_string());
497        }
498
499        headers
500    }
501
502    /// Signs an OKX request with timestamp, API key, passphrase, and signature.
503    ///
504    /// # Errors
505    ///
506    /// Returns [`OKXHttpError::MissingCredentials`] if no credentials are set
507    /// but the request requires authentication.
508    fn sign_request(
509        &self,
510        method: &Method,
511        path: &str,
512        body: Option<&[u8]>,
513    ) -> Result<HashMap<String, String>, OKXHttpError> {
514        let credential = match self.credential.as_ref() {
515            Some(c) => c,
516            None => return Err(OKXHttpError::MissingCredentials),
517        };
518
519        let api_key = credential.api_key().to_string();
520        let api_passphrase = credential.api_passphrase().to_string();
521
522        // OKX requires milliseconds in the timestamp (ISO 8601 with milliseconds)
523        let now = Utc::now();
524        let millis = now.timestamp_subsec_millis();
525        let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
526        let signature = credential.sign_bytes(&timestamp, method.as_str(), path, body);
527
528        let mut headers = HashMap::new();
529        headers.insert("OK-ACCESS-KEY".to_string(), api_key);
530        headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
531        headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
532        headers.insert("OK-ACCESS-SIGN".to_string(), signature);
533
534        Ok(headers)
535    }
536
537    /// Sends an HTTP request to OKX and parses the response into `Vec<T>`.
538    ///
539    /// Internally, this method handles:
540    /// - Building the URL from `base_url` + `path`.
541    /// - Optionally signing the request.
542    /// - Deserializing JSON responses into typed models, or returning a [`OKXHttpError`].
543    /// - Retrying with exponential backoff on transient errors.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if:
548    /// - The HTTP request fails.
549    /// - Authentication is required but credentials are missing.
550    /// - The response cannot be deserialized into the expected type.
551    /// - The OKX API returns an error response.
552    async fn send_request<T: DeserializeOwned, P: Serialize>(
553        &self,
554        method: Method,
555        path: &str,
556        params: Option<&P>,
557        body: Option<Vec<u8>>,
558        authenticate: bool,
559    ) -> Result<Vec<T>, OKXHttpError> {
560        let url = format!("{}{path}", self.base_url);
561
562        // Pre-compute rate limit keys once outside the retry closure
563        let rate_keys: Vec<String> = Self::rate_limit_keys(path)
564            .into_iter()
565            .map(|k| k.to_string())
566            .collect();
567
568        let operation = || {
569            let url = url.clone();
570            let method = method.clone();
571            let body = body.clone();
572            let rate_keys = rate_keys.clone();
573
574            async move {
575                // Serialize params to query string for signing (if needed)
576                let query_string = if let Some(p) = params {
577                    serde_urlencoded::to_string(p).map_err(|e| {
578                        OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
579                    })?
580                } else {
581                    String::new()
582                };
583
584                // Build full path with query string for signing
585                let full_path = if query_string.is_empty() {
586                    path.to_string()
587                } else {
588                    format!("{path}?{query_string}")
589                };
590
591                let mut headers = if authenticate {
592                    self.sign_request(&method, &full_path, body.as_deref())?
593                } else {
594                    HashMap::new()
595                };
596
597                // Always set Content-Type header when body is present
598                if body.is_some() {
599                    headers.insert("Content-Type".to_string(), "application/json".to_string());
600                }
601
602                let resp = self
603                    .client
604                    .request_with_params(
605                        method.clone(),
606                        url,
607                        params,
608                        Some(headers),
609                        body,
610                        None,
611                        Some(rate_keys),
612                    )
613                    .await?;
614
615                log::trace!("Response: {resp:?}");
616
617                if resp.status.is_success() {
618                    let okx_response: OKXResponse<T> =
619                        serde_json::from_slice(&resp.body).map_err(|e| {
620                            log::error!("Failed to deserialize OKXResponse: {e}");
621                            OKXHttpError::JsonError(e.to_string())
622                        })?;
623
624                    if okx_response.code != OKX_SUCCESS_CODE {
625                        return Err(OKXHttpError::OkxError {
626                            error_code: okx_response.code,
627                            message: resolve_okx_error_message(&resp.body, &okx_response.msg),
628                        });
629                    }
630
631                    Ok(okx_response.data)
632                } else {
633                    let error_body = String::from_utf8_lossy(&resp.body);
634                    if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
635                        log::debug!("HTTP 404 with body: {error_body}");
636                    } else {
637                        log::error!(
638                            "HTTP error {} with body: {error_body}",
639                            resp.status.as_str()
640                        );
641                    }
642
643                    if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
644                        return Err(OKXHttpError::OkxError {
645                            error_code: parsed_error.code,
646                            message: resolve_okx_error_message(&resp.body, &parsed_error.msg),
647                        });
648                    }
649
650                    Err(OKXHttpError::UnexpectedStatus {
651                        // Fall back to 500 if the venue returns a non-standard
652                        // code so we never panic in the error path.
653                        status: StatusCode::from_u16(resp.status.as_u16())
654                            .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
655                        body: error_body.to_string(),
656                    })
657                }
658            }
659        };
660
661        // Retry strategy based on OKX error responses and HTTP status codes:
662        //
663        // 1. Network errors: always retry (transient connection issues)
664        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
665        // 3. OKX specific retryable error codes (defined in common::consts)
666        //
667        // Note: OKX returns many permanent errors which should NOT be retried
668        // (e.g., "Invalid instrument", "Insufficient balance", "Invalid API Key")
669        let should_retry = |error: &OKXHttpError| -> bool {
670            match error {
671                OKXHttpError::HttpClientError(_) => true,
672                OKXHttpError::UnexpectedStatus { status, .. } => {
673                    status.as_u16() >= 500 || status.as_u16() == 429
674                }
675                OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
676                _ => false,
677            }
678        };
679
680        let create_error = |msg: String| -> OKXHttpError {
681            if msg == "canceled" {
682                OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
683            } else {
684                OKXHttpError::ValidationError(msg)
685            }
686        };
687
688        self.retry_manager
689            .execute_with_retry_with_cancel(
690                path,
691                operation,
692                should_retry,
693                create_error,
694                &self.cancellation_token,
695            )
696            .await
697    }
698
699    /// Sets the position mode for an account.
700    ///
701    /// # Errors
702    ///
703    /// Returns an error if JSON serialization of `params` fails, if the HTTP
704    /// request fails, or if the response body cannot be deserialized.
705    ///
706    /// # References
707    ///
708    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-set-position-mode>
709    pub async fn set_position_mode(
710        &self,
711        params: SetPositionModeParams,
712    ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
713        let path = "/api/v5/account/set-position-mode";
714        let body = serde_json::to_vec(&params)?;
715        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
716            .await
717    }
718
719    /// Requests position tiers information, maximum leverage depends on your borrowings and margin ratio.
720    ///
721    /// # Errors
722    ///
723    /// Returns an error if the HTTP request fails, authentication is rejected
724    /// or the response cannot be deserialized.
725    ///
726    /// # References
727    ///
728    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-position-tiers>
729    pub async fn get_position_tiers(
730        &self,
731        params: GetPositionTiersParams,
732    ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
733        self.send_request(
734            Method::GET,
735            "/api/v5/public/position-tiers",
736            Some(&params),
737            None,
738            false,
739        )
740        .await
741    }
742
743    /// Requests a list of instruments with open contracts.
744    ///
745    /// # Errors
746    ///
747    /// Returns an error if JSON serialization of `params` fails, if the HTTP
748    /// request fails, or if the response body cannot be deserialized.
749    ///
750    /// # References
751    ///
752    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-instruments>
753    pub async fn get_instruments(
754        &self,
755        params: GetInstrumentsParams,
756    ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
757        self.send_request(
758            Method::GET,
759            "/api/v5/public/instruments",
760            Some(&params),
761            None,
762            false,
763        )
764        .await
765    }
766
767    /// Requests option market data for an instrument family.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the HTTP request fails or the response cannot be deserialized.
772    ///
773    /// # References
774    ///
775    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-option-market-data>
776    pub async fn get_option_summary(
777        &self,
778        params: GetOptionSummaryParams,
779    ) -> Result<Vec<OKXOptionSummary>, OKXHttpError> {
780        self.send_request(
781            Method::GET,
782            "/api/v5/public/opt-summary",
783            Some(&params),
784            None,
785            false,
786        )
787        .await
788    }
789
790    /// Requests the current server time from OKX.
791    ///
792    /// Retrieves the OKX system time in Unix timestamp (milliseconds). This is useful for
793    /// synchronizing local clocks with the exchange server and logging time drift.
794    ///
795    /// # Errors
796    ///
797    /// Returns an error if the HTTP request fails or if the response body
798    /// cannot be parsed into [`OKXServerTime`].
799    ///
800    /// # References
801    ///
802    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-system-time>
803    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
804        let response: Vec<OKXServerTime> = self
805            .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
806            .await?;
807        response
808            .first()
809            .map(|t| t.ts)
810            .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
811    }
812
813    /// Requests a mark price.
814    ///
815    /// We set the mark price based on the SPOT index and at a reasonable basis to prevent individual
816    /// users from manipulating the market and causing the contract price to fluctuate.
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the HTTP request fails or if the response body
821    /// cannot be parsed into [`OKXMarkPrice`].
822    ///
823    /// # References
824    ///
825    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-mark-price>
826    pub async fn get_mark_price(
827        &self,
828        params: GetMarkPriceParams,
829    ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
830        self.send_request(
831            Method::GET,
832            "/api/v5/public/mark-price",
833            Some(&params),
834            None,
835            false,
836        )
837        .await
838    }
839
840    /// Requests the latest index price.
841    ///
842    /// # Errors
843    ///
844    /// Returns an error if the operation fails.
845    ///
846    /// # References
847    ///
848    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-index-tickers>
849    pub async fn get_index_tickers(
850        &self,
851        params: GetIndexTickerParams,
852    ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
853        self.send_request(
854            Method::GET,
855            "/api/v5/market/index-tickers",
856            Some(&params),
857            None,
858            false,
859        )
860        .await
861    }
862
863    /// Requests trades history.
864    ///
865    /// # Errors
866    ///
867    /// Returns an error if the operation fails.
868    ///
869    /// # References
870    ///
871    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-trades-history>
872    pub async fn get_history_trades(
873        &self,
874        params: GetTradesParams,
875    ) -> Result<Vec<OKXTrade>, OKXHttpError> {
876        self.send_request(
877            Method::GET,
878            "/api/v5/market/history-trades",
879            Some(&params),
880            None,
881            false,
882        )
883        .await
884    }
885
886    /// Requests order book snapshot.
887    ///
888    /// # Errors
889    ///
890    /// Returns an error if the operation fails.
891    ///
892    /// # References
893    ///
894    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-order-book>
895    pub async fn get_order_book(
896        &self,
897        params: GetOrderBookParams,
898    ) -> Result<Vec<OKXOrderBookSnapshot>, OKXHttpError> {
899        self.send_request(
900            Method::GET,
901            "/api/v5/market/books",
902            Some(&params),
903            None,
904            false,
905        )
906        .await
907    }
908
909    /// Requests funding rate history.
910    ///
911    /// # Errors
912    ///
913    /// Returns an error if the operation fails.
914    ///
915    /// # References
916    ///
917    /// <https://www.okx.com/docs-v5/en/#public-data-rest-api-get-funding-rate-history>
918    pub async fn get_funding_rate_history(
919        &self,
920        params: GetFundingRateHistoryParams,
921    ) -> Result<Vec<OKXFundingRateHistory>, OKXHttpError> {
922        self.send_request(
923            Method::GET,
924            "/api/v5/public/funding-rate-history",
925            Some(&params),
926            None,
927            false,
928        )
929        .await
930    }
931
932    /// Requests recent candlestick data.
933    ///
934    /// # Errors
935    ///
936    /// Returns an error if the operation fails.
937    ///
938    /// # References
939    ///
940    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
941    pub async fn get_candles(
942        &self,
943        params: GetCandlesticksParams,
944    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
945        self.send_request(
946            Method::GET,
947            "/api/v5/market/candles",
948            Some(&params),
949            None,
950            false,
951        )
952        .await
953    }
954
955    /// Requests historical candlestick data.
956    ///
957    /// # Errors
958    ///
959    /// Returns an error if the operation fails.
960    ///
961    /// # References
962    ///
963    /// <https://www.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
964    pub async fn get_history_candles(
965        &self,
966        params: GetCandlesticksParams,
967    ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
968        self.send_request(
969            Method::GET,
970            "/api/v5/market/history-candles",
971            Some(&params),
972            None,
973            false,
974        )
975        .await
976    }
977
978    /// Requests a list of assets (with non-zero balance), remaining balance, and available amount
979    /// in the trading account.
980    ///
981    /// # Errors
982    ///
983    /// Returns an error if the operation fails.
984    ///
985    /// # References
986    ///
987    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
988    pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
989        let path = "/api/v5/account/balance";
990        self.send_request::<_, ()>(Method::GET, path, None, None, true)
991            .await
992    }
993
994    /// Requests fee rates for the account.
995    ///
996    /// Returns fee rates for the specified instrument type and the user's VIP level.
997    ///
998    /// # Errors
999    ///
1000    /// Returns an error if the operation fails.
1001    ///
1002    /// # References
1003    ///
1004    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-fee-rates>
1005    pub async fn get_trade_fee(
1006        &self,
1007        params: GetTradeFeeParams,
1008    ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
1009        self.send_request(
1010            Method::GET,
1011            "/api/v5/account/trade-fee",
1012            Some(&params),
1013            None,
1014            true,
1015        )
1016        .await
1017    }
1018
1019    /// Retrieves a single order’s details.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the operation fails.
1024    ///
1025    /// # References
1026    ///
1027    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order>
1028    pub async fn get_order(
1029        &self,
1030        params: GetOrderParams,
1031    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
1032        self.send_request(
1033            Method::GET,
1034            "/api/v5/trade/order",
1035            Some(&params),
1036            None,
1037            true,
1038        )
1039        .await
1040    }
1041
1042    /// Requests order list (pending orders).
1043    ///
1044    /// # Errors
1045    ///
1046    /// Returns an error if the operation fails.
1047    ///
1048    /// # References
1049    ///
1050    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-list>
1051    pub async fn get_orders_pending(
1052        &self,
1053        params: GetOrderListParams,
1054    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
1055        self.send_request(
1056            Method::GET,
1057            "/api/v5/trade/orders-pending",
1058            Some(&params),
1059            None,
1060            true,
1061        )
1062        .await
1063    }
1064
1065    /// Requests historical order records.
1066    ///
1067    /// # Errors
1068    ///
1069    /// Returns an error if the operation fails.
1070    ///
1071    /// # References
1072    ///
1073    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-orders-history>
1074    pub async fn get_orders_history(
1075        &self,
1076        params: GetOrderHistoryParams,
1077    ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
1078        self.send_request(
1079            Method::GET,
1080            "/api/v5/trade/orders-history",
1081            Some(&params),
1082            None,
1083            true,
1084        )
1085        .await
1086    }
1087
1088    /// Requests pending algo orders.
1089    ///
1090    /// # Errors
1091    ///
1092    /// Returns an error if the operation fails.
1093    pub async fn get_order_algo_pending(
1094        &self,
1095        params: GetAlgoOrdersParams,
1096    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
1097        self.send_request(
1098            Method::GET,
1099            "/api/v5/trade/orders-algo-pending",
1100            Some(&params),
1101            None,
1102            true,
1103        )
1104        .await
1105    }
1106
1107    /// Requests historical algo orders.
1108    ///
1109    /// # Errors
1110    ///
1111    /// Returns an error if the operation fails.
1112    pub async fn get_order_algo_history(
1113        &self,
1114        params: GetAlgoOrdersParams,
1115    ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
1116        self.send_request(
1117            Method::GET,
1118            "/api/v5/trade/orders-algo-history",
1119            Some(&params),
1120            None,
1121            true,
1122        )
1123        .await
1124    }
1125
1126    /// Requests transaction details (fills) for the given parameters.
1127    ///
1128    /// # Errors
1129    ///
1130    /// Returns an error if the operation fails.
1131    ///
1132    /// # References
1133    ///
1134    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>
1135    pub async fn get_fills(
1136        &self,
1137        params: GetTransactionDetailsParams,
1138    ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
1139        self.send_request(
1140            Method::GET,
1141            "/api/v5/trade/fills",
1142            Some(&params),
1143            None,
1144            true,
1145        )
1146        .await
1147    }
1148
1149    /// Requests information on your positions. When the account is in net mode, net positions will
1150    /// be displayed, and when the account is in long/short mode, long or short positions will be
1151    /// displayed. Returns in reverse chronological order using ctime.
1152    ///
1153    /// # Errors
1154    ///
1155    /// Returns an error if the operation fails.
1156    ///
1157    /// # References
1158    ///
1159    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
1160    pub async fn get_positions(
1161        &self,
1162        params: GetPositionsParams,
1163    ) -> Result<Vec<OKXPosition>, OKXHttpError> {
1164        self.send_request(
1165            Method::GET,
1166            "/api/v5/account/positions",
1167            Some(&params),
1168            None,
1169            true,
1170        )
1171        .await
1172    }
1173
1174    /// Requests closed or historical position data.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns an error if the operation fails.
1179    ///
1180    /// # References
1181    ///
1182    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions-history>
1183    pub async fn get_positions_history(
1184        &self,
1185        params: GetPositionsHistoryParams,
1186    ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
1187        self.send_request(
1188            Method::GET,
1189            "/api/v5/account/positions-history",
1190            Some(&params),
1191            None,
1192            true,
1193        )
1194        .await
1195    }
1196}
1197
1198/// Provides a higher-level HTTP client for the [OKX](https://okx.com) REST API.
1199///
1200/// This client wraps the underlying `OKXHttpInnerClient` to handle conversions
1201/// into the Nautilus domain model.
1202#[derive(Debug)]
1203#[cfg_attr(
1204    feature = "python",
1205    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.okx", from_py_object)
1206)]
1207#[cfg_attr(
1208    feature = "python",
1209    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.okx")
1210)]
1211pub struct OKXHttpClient {
1212    pub(crate) inner: Arc<OKXRawHttpClient>,
1213    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
1214    clock: &'static AtomicTime,
1215    cache_initialized: AtomicBool,
1216}
1217
1218impl Clone for OKXHttpClient {
1219    fn clone(&self) -> Self {
1220        let cache_initialized = AtomicBool::new(false);
1221
1222        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
1223        if is_initialized {
1224            cache_initialized.store(true, Ordering::Release);
1225        }
1226
1227        Self {
1228            inner: self.inner.clone(),
1229            instruments_cache: self.instruments_cache.clone(),
1230            cache_initialized,
1231            clock: self.clock,
1232        }
1233    }
1234}
1235
1236impl Default for OKXHttpClient {
1237    fn default() -> Self {
1238        Self::new(None, 60, 3, 1000, 10_000, OKXEnvironment::Live, None)
1239            .expect("Failed to create default OKXHttpClient")
1240    }
1241}
1242
1243impl OKXHttpClient {
1244    /// Creates a new [`OKXHttpClient`] using the default OKX HTTP URL,
1245    /// optionally overridden with a custom base url.
1246    ///
1247    /// This version of the client has **no credentials**, so it can only
1248    /// call publicly accessible endpoints.
1249    ///
1250    /// # Errors
1251    ///
1252    /// Returns an error if the retry manager cannot be created.
1253    pub fn new(
1254        base_url: Option<String>,
1255        timeout_secs: u64,
1256        max_retries: u32,
1257        retry_delay_ms: u64,
1258        retry_delay_max_ms: u64,
1259        environment: OKXEnvironment,
1260        proxy_url: Option<String>,
1261    ) -> anyhow::Result<Self> {
1262        Ok(Self {
1263            inner: Arc::new(OKXRawHttpClient::new(
1264                base_url,
1265                timeout_secs,
1266                max_retries,
1267                retry_delay_ms,
1268                retry_delay_max_ms,
1269                environment,
1270                proxy_url,
1271            )?),
1272            instruments_cache: Arc::new(AtomicMap::new()),
1273            cache_initialized: AtomicBool::new(false),
1274            clock: get_atomic_clock_realtime(),
1275        })
1276    }
1277
1278    /// Generates a timestamp for initialization.
1279    fn generate_ts_init(&self) -> UnixNanos {
1280        self.clock.get_time_ns()
1281    }
1282
1283    /// Creates a new authenticated [`OKXHttpClient`] using environment variables and
1284    /// the default OKX HTTP base url.
1285    ///
1286    /// # Errors
1287    ///
1288    /// Returns an error if the operation fails.
1289    pub fn from_env() -> anyhow::Result<Self> {
1290        Self::with_credentials(
1291            None,
1292            None,
1293            None,
1294            None,
1295            60,
1296            3,
1297            1000,
1298            10_000,
1299            OKXEnvironment::Live,
1300            None,
1301        )
1302    }
1303
1304    /// Creates a new [`OKXHttpClient`] configured with credentials
1305    /// for authenticated requests, optionally using a custom base url.
1306    ///
1307    /// # Errors
1308    ///
1309    /// Returns an error if the operation fails.
1310    #[expect(clippy::too_many_arguments)]
1311    pub fn with_credentials(
1312        api_key: Option<String>,
1313        api_secret: Option<String>,
1314        api_passphrase: Option<String>,
1315        base_url: Option<String>,
1316        timeout_secs: u64,
1317        max_retries: u32,
1318        retry_delay_ms: u64,
1319        retry_delay_max_ms: u64,
1320        environment: OKXEnvironment,
1321        proxy_url: Option<String>,
1322    ) -> anyhow::Result<Self> {
1323        let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1324        let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1325        let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1326        let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1327
1328        Ok(Self {
1329            inner: Arc::new(OKXRawHttpClient::with_credentials(
1330                api_key,
1331                api_secret,
1332                api_passphrase,
1333                base_url,
1334                timeout_secs,
1335                max_retries,
1336                retry_delay_ms,
1337                retry_delay_max_ms,
1338                environment,
1339                proxy_url,
1340            )?),
1341            instruments_cache: Arc::new(AtomicMap::new()),
1342            cache_initialized: AtomicBool::new(false),
1343            clock: get_atomic_clock_realtime(),
1344        })
1345    }
1346
1347    /// Retrieves an instrument from the cache.
1348    ///
1349    /// # Errors
1350    ///
1351    /// Returns an error if the instrument is not found in the cache.
1352    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1353        self.instruments_cache
1354            .get_cloned(&symbol)
1355            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1356    }
1357
1358    /// Cancel all pending HTTP requests.
1359    pub fn cancel_all_requests(&self) {
1360        self.inner.cancel_all_requests();
1361    }
1362
1363    /// Get the cancellation token for this client.
1364    pub fn cancellation_token(&self) -> &CancellationToken {
1365        self.inner.cancellation_token()
1366    }
1367
1368    /// Returns the base url being used by the client.
1369    pub fn base_url(&self) -> &str {
1370        self.inner.base_url.as_str()
1371    }
1372
1373    /// Returns the public API key being used by the client.
1374    pub fn api_key(&self) -> Option<&str> {
1375        self.inner.credential.as_ref().map(|c| c.api_key())
1376    }
1377
1378    /// Returns a masked version of the API key for logging purposes.
1379    #[must_use]
1380    pub fn api_key_masked(&self) -> Option<String> {
1381        self.inner.credential.as_ref().map(|c| c.api_key_masked())
1382    }
1383
1384    /// Returns whether the client is configured for demo trading.
1385    #[must_use]
1386    pub fn is_demo(&self) -> bool {
1387        self.inner.environment == OKXEnvironment::Demo
1388    }
1389
1390    /// Requests the current server time from OKX.
1391    ///
1392    /// Returns the OKX system time as a Unix timestamp in milliseconds.
1393    ///
1394    /// # Errors
1395    ///
1396    /// Returns an error if the HTTP request fails or if the response cannot be parsed.
1397    pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1398        self.inner.get_server_time().await
1399    }
1400
1401    /// Checks if the client is initialized.
1402    ///
1403    /// The client is considered initialized if any instruments have been cached from the venue.
1404    #[must_use]
1405    pub fn is_initialized(&self) -> bool {
1406        self.cache_initialized.load(Ordering::Acquire)
1407    }
1408
1409    /// Returns a snapshot of all instrument symbols currently held in the
1410    /// internal cache.
1411    #[must_use]
1412    pub fn get_cached_symbols(&self) -> Vec<String> {
1413        self.instruments_cache
1414            .load()
1415            .keys()
1416            .map(|k| k.to_string())
1417            .collect()
1418    }
1419
1420    /// Caches multiple instruments.
1421    ///
1422    /// Any existing instruments with the same symbols will be replaced.
1423    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1424        self.instruments_cache.rcu(|m| {
1425            for inst in instruments {
1426                m.insert(inst.raw_symbol().inner(), inst.clone());
1427            }
1428        });
1429        self.cache_initialized.store(true, Ordering::Release);
1430    }
1431
1432    /// Caches a single instrument.
1433    ///
1434    /// Any existing instrument with the same symbol will be replaced.
1435    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1436        self.instruments_cache
1437            .insert(instrument.raw_symbol().inner(), instrument);
1438        self.cache_initialized.store(true, Ordering::Release);
1439    }
1440
1441    /// Gets an instrument from the cache by symbol.
1442    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1443        self.instruments_cache.get_cloned(symbol)
1444    }
1445
1446    /// Requests the account state for the `account_id` from OKX.
1447    ///
1448    /// # Errors
1449    ///
1450    /// Returns an error if the HTTP request fails or no account state is returned.
1451    pub async fn request_account_state(
1452        &self,
1453        account_id: AccountId,
1454    ) -> anyhow::Result<AccountState> {
1455        let resp = self
1456            .inner
1457            .get_balance()
1458            .await
1459            .map_err(|e| anyhow::anyhow!(e))?;
1460
1461        let ts_init = self.generate_ts_init();
1462        let raw = resp
1463            .first()
1464            .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1465        let account_state = parse_account_state(raw, account_id, ts_init)?;
1466
1467        Ok(account_state)
1468    }
1469
1470    /// Sets the position mode for the account.
1471    ///
1472    /// Defaults to NetMode if no position mode is provided.
1473    ///
1474    /// # Errors
1475    ///
1476    /// Returns an error if the HTTP request fails or the position mode cannot be set.
1477    ///
1478    /// # Note
1479    ///
1480    /// This endpoint only works for accounts with derivatives trading enabled.
1481    /// If the account only has spot trading, this will return an error.
1482    pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1483        let mut params = SetPositionModeParamsBuilder::default();
1484        params.pos_mode(position_mode);
1485        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1486
1487        match self.inner.set_position_mode(params).await {
1488            Ok(_) => Ok(()),
1489            Err(e) => {
1490                if let OKXHttpError::OkxError {
1491                    error_code,
1492                    message,
1493                } = &e
1494                    && error_code == "50115"
1495                {
1496                    log::warn!(
1497                        "Account does not support position mode setting (derivatives trading not enabled): {message}"
1498                    );
1499                    return Ok(()); // Gracefully handle this case
1500                }
1501                anyhow::bail!(e)
1502            }
1503        }
1504    }
1505
1506    /// Requests all instruments for the `instrument_type` from OKX.
1507    ///
1508    /// # Errors
1509    ///
1510    /// Returns an error if the HTTP request fails or instrument parsing fails.
1511    ///
1512    /// # Returns
1513    ///
1514    /// A tuple containing:
1515    /// - `Vec<InstrumentAny>`: The parsed instruments
1516    /// - `Vec<(Ustr, u64)>`: Mappings of inst_id to inst_id_code for WebSocket order operations
1517    pub async fn request_instruments(
1518        &self,
1519        instrument_type: OKXInstrumentType,
1520        instrument_family: Option<String>,
1521    ) -> anyhow::Result<(Vec<InstrumentAny>, Vec<(Ustr, u64)>)> {
1522        let mut params = GetInstrumentsParamsBuilder::default();
1523        params.inst_type(instrument_type);
1524
1525        if let Some(family) = instrument_family.clone() {
1526            params.inst_family(family);
1527        }
1528
1529        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1530
1531        let resp = self
1532            .inner
1533            .get_instruments(params)
1534            .await
1535            .map_err(|e| anyhow::anyhow!(e))?;
1536
1537        let fee_rate_opt = {
1538            let fee_params = GetTradeFeeParams {
1539                inst_type: instrument_type,
1540                uly: None,
1541                inst_family: instrument_family,
1542            };
1543
1544            match self.inner.get_trade_fee(fee_params).await {
1545                Ok(rates) => rates.into_iter().next(),
1546                Err(OKXHttpError::MissingCredentials) => {
1547                    log::debug!("Missing credentials for fee rates, using None");
1548                    None
1549                }
1550                Err(e) => {
1551                    log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1552                    None
1553                }
1554            }
1555        };
1556
1557        let ts_init = self.generate_ts_init();
1558
1559        let mut instruments: Vec<InstrumentAny> = Vec::new();
1560        let mut inst_id_codes: Vec<(Ustr, u64)> = Vec::new();
1561
1562        for inst in &resp {
1563            // Collect inst_id_code mappings for WebSocket order operations
1564            if let Some(code) = inst.inst_id_code {
1565                inst_id_codes.push((inst.inst_id, code));
1566            }
1567            // Skip pre-open instruments which have incomplete/empty field values
1568            // Keep suspended instruments as they have valid metadata and may return to live
1569            if inst.state == OKXInstrumentStatus::Preopen {
1570                continue;
1571            }
1572
1573            // Determine which fee fields to use based on contract type
1574            // OKX fee rate convention: positive = rebate, negative = commission
1575            // Nautilus convention: negative = rebate, positive = commission
1576            // Negate to convert between conventions
1577            let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1578                let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1579                let (maker_str, taker_str) = if is_usdt_margined {
1580                    (&fee_rate.maker_u, &fee_rate.taker_u)
1581                } else {
1582                    (&fee_rate.maker, &fee_rate.taker)
1583                };
1584
1585                let maker = if maker_str.is_empty() {
1586                    None
1587                } else {
1588                    Decimal::from_str(maker_str).ok().map(|v| -v)
1589                };
1590                let taker = if taker_str.is_empty() {
1591                    None
1592                } else {
1593                    Decimal::from_str(taker_str).ok().map(|v| -v)
1594                };
1595
1596                (maker, taker)
1597            } else {
1598                (None, None)
1599            };
1600
1601            match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1602                Ok(Some(instrument_any)) => {
1603                    instruments.push(instrument_any);
1604                }
1605                Ok(None) => {
1606                    // Unsupported instrument type, skip silently
1607                }
1608                Err(e) => {
1609                    log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1610                }
1611            }
1612        }
1613
1614        Ok((instruments, inst_id_codes))
1615    }
1616
1617    /// Requests a single instrument by `instrument_id` from OKX.
1618    ///
1619    /// Fetches the instrument from the API, caches it, and returns it.
1620    ///
1621    /// # Errors
1622    ///
1623    /// This function will return an error if:
1624    /// - The API request fails.
1625    /// - The instrument is not found.
1626    /// - Failed to parse instrument data.
1627    pub async fn request_instrument(
1628        &self,
1629        instrument_id: InstrumentId,
1630    ) -> anyhow::Result<InstrumentAny> {
1631        let symbol = instrument_id.symbol.as_str();
1632        let instrument_type = okx_instrument_type_from_symbol(symbol);
1633
1634        let mut params = GetInstrumentsParamsBuilder::default();
1635        params.inst_type(instrument_type);
1636        params.inst_id(symbol);
1637
1638        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1639
1640        let resp = self
1641            .inner
1642            .get_instruments(params)
1643            .await
1644            .map_err(|e| anyhow::anyhow!(e))?;
1645
1646        let raw_inst = resp
1647            .first()
1648            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1649
1650        // Skip pre-open instruments which have incomplete/empty field values
1651        if raw_inst.state == OKXInstrumentStatus::Preopen {
1652            anyhow::bail!("Instrument {symbol} is in pre-open state");
1653        }
1654
1655        let fee_rate_opt = {
1656            let fee_params = GetTradeFeeParams {
1657                inst_type: instrument_type,
1658                uly: None,
1659                inst_family: None,
1660            };
1661
1662            match self.inner.get_trade_fee(fee_params).await {
1663                Ok(rates) => rates.into_iter().next(),
1664                Err(OKXHttpError::MissingCredentials) => {
1665                    log::debug!("Missing credentials for fee rates, using None");
1666                    None
1667                }
1668                Err(e) => {
1669                    log::warn!("Failed to fetch fee rates for {symbol}: {e}");
1670                    None
1671                }
1672            }
1673        };
1674
1675        // OKX fee rate convention: positive = rebate, negative = commission
1676        // Nautilus convention: negative = rebate, positive = commission
1677        // Negate to convert between conventions
1678        let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1679            let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1680            let (maker_str, taker_str) = if is_usdt_margined {
1681                (&fee_rate.maker_u, &fee_rate.taker_u)
1682            } else {
1683                (&fee_rate.maker, &fee_rate.taker)
1684            };
1685
1686            let maker = if maker_str.is_empty() {
1687                None
1688            } else {
1689                Decimal::from_str(maker_str).ok().map(|v| -v)
1690            };
1691            let taker = if taker_str.is_empty() {
1692                None
1693            } else {
1694                Decimal::from_str(taker_str).ok().map(|v| -v)
1695            };
1696
1697            (maker, taker)
1698        } else {
1699            (None, None)
1700        };
1701
1702        let ts_init = self.generate_ts_init();
1703        let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1704            .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1705
1706        self.cache_instrument(instrument.clone());
1707
1708        Ok(instrument)
1709    }
1710
1711    /// Requests forward prices for OKX options using the option summary endpoint.
1712    ///
1713    /// # Errors
1714    ///
1715    /// Returns an error if the HTTP request fails or no usable instrument family can be resolved.
1716    pub async fn request_forward_prices(
1717        &self,
1718        underlying: &str,
1719        instrument_id: Option<InstrumentId>,
1720    ) -> anyhow::Result<Vec<ForwardPrice>> {
1721        let requests = self.resolve_forward_price_requests(underlying, instrument_id.as_ref())?;
1722        let requested_symbol = instrument_id.as_ref().map(|id| id.symbol.inner());
1723        let requested_instrument_id = instrument_id.as_ref();
1724        let ts_init = self.generate_ts_init();
1725        let mut forward_prices = Vec::new();
1726        let mut seen_expiries = AHashSet::new();
1727
1728        for (inst_family, exp_time) in requests {
1729            let summaries = self
1730                .inner
1731                .get_option_summary(GetOptionSummaryParams {
1732                    inst_family,
1733                    exp_time,
1734                })
1735                .await
1736                .map_err(|e| anyhow::anyhow!(e))?;
1737
1738            for summary in summaries {
1739                if summary.inst_type != OKXInstrumentType::Option {
1740                    continue;
1741                }
1742
1743                if let Some(symbol) = requested_symbol
1744                    && summary.inst_id != symbol
1745                {
1746                    continue;
1747                }
1748
1749                let forward_price = match Decimal::from_str(&summary.fwd_px) {
1750                    Ok(price) if !price.is_zero() => price,
1751                    Ok(_) => continue,
1752                    Err(e) => {
1753                        log::warn!(
1754                            "Skipping invalid OKX forward price for {}: {e}",
1755                            summary.inst_id
1756                        );
1757                        continue;
1758                    }
1759                };
1760
1761                if requested_symbol.is_none() {
1762                    let expiry_key = Self::option_summary_expiry_key(summary.inst_id.as_str())?;
1763                    if !seen_expiries.insert(expiry_key) {
1764                        continue;
1765                    }
1766                }
1767
1768                let ts_event =
1769                    UnixNanos::from(summary.ts.saturating_mul(NANOSECONDS_IN_MILLISECOND));
1770                let instrument_id = if let Some(inst_id) = requested_instrument_id {
1771                    *inst_id
1772                } else {
1773                    parse_instrument_id(summary.inst_id)
1774                };
1775
1776                forward_prices.push(ForwardPrice::new(
1777                    instrument_id,
1778                    forward_price,
1779                    Some(summary.uly.to_string()),
1780                    ts_event,
1781                    ts_init,
1782                ));
1783            }
1784        }
1785
1786        Ok(forward_prices)
1787    }
1788
1789    /// Requests the latest mark price for the `instrument_type` from OKX.
1790    ///
1791    /// # Errors
1792    ///
1793    /// Returns an error if the HTTP request fails or no mark price is returned.
1794    pub async fn request_mark_price(
1795        &self,
1796        instrument_id: InstrumentId,
1797    ) -> anyhow::Result<MarkPriceUpdate> {
1798        let mut params = GetMarkPriceParamsBuilder::default();
1799        params.inst_id(instrument_id.symbol.inner());
1800        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1801
1802        let resp = self
1803            .inner
1804            .get_mark_price(params)
1805            .await
1806            .map_err(|e| anyhow::anyhow!(e))?;
1807
1808        let raw = resp
1809            .first()
1810            .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1811        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1812        let ts_init = self.generate_ts_init();
1813
1814        let mark_price =
1815            parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1816                .map_err(|e| anyhow::anyhow!(e))?;
1817        Ok(mark_price)
1818    }
1819
1820    fn resolve_forward_price_requests(
1821        &self,
1822        underlying: &str,
1823        instrument_id: Option<&InstrumentId>,
1824    ) -> anyhow::Result<Vec<(String, Option<String>)>> {
1825        if let Some(inst_id) = instrument_id {
1826            let symbol = inst_id.symbol.inner().as_str();
1827            let inst_family = extract_inst_family(symbol)?.to_string();
1828            let exp_time = Self::option_summary_exp_time(symbol)?;
1829            return Ok(vec![(inst_family, exp_time)]);
1830        }
1831
1832        let underlying = Ustr::from(underlying);
1833        let mut families = AHashSet::new();
1834
1835        for instrument in self.instruments_cache.load().values() {
1836            let InstrumentAny::CryptoOption(option) = instrument else {
1837                continue;
1838            };
1839
1840            if option.underlying.code != underlying {
1841                continue;
1842            }
1843
1844            let inst_family = extract_inst_family(option.id.symbol.inner().as_str())?;
1845            families.insert(inst_family.to_string());
1846        }
1847
1848        let mut families: Vec<String> = families.into_iter().collect();
1849        families.sort_unstable();
1850
1851        anyhow::ensure!(
1852            !families.is_empty(),
1853            "No cached OKX option families for underlying {underlying}; provide a sample instrument or pre-load option instruments"
1854        );
1855
1856        Ok(families.into_iter().map(|family| (family, None)).collect())
1857    }
1858
1859    fn option_summary_expiry_key(symbol: &str) -> anyhow::Result<String> {
1860        let parts: Vec<&str> = symbol.split('-').collect();
1861        anyhow::ensure!(
1862            parts.len() >= 5,
1863            "Expected OKX option symbol with expiry, received {symbol}"
1864        );
1865        Ok(format!("{}-{}-{}", parts[0], parts[1], parts[2]))
1866    }
1867
1868    fn option_summary_exp_time(symbol: &str) -> anyhow::Result<Option<String>> {
1869        let parts: Vec<&str> = symbol.split('-').collect();
1870        anyhow::ensure!(
1871            parts.len() >= 5,
1872            "Expected OKX option symbol with expiry, received {symbol}"
1873        );
1874        Ok(Some(parts[2].to_string()))
1875    }
1876
1877    /// Requests the latest index price for the `instrument_id` from OKX.
1878    ///
1879    /// # Errors
1880    ///
1881    /// Returns an error if the HTTP request fails or no index price is returned.
1882    pub async fn request_index_price(
1883        &self,
1884        instrument_id: InstrumentId,
1885    ) -> anyhow::Result<IndexPriceUpdate> {
1886        // Index tickers endpoint requires base pair format (e.g., BTC-USDT)
1887        let symbol = instrument_id.symbol.inner();
1888        let (base, quote) = parse_base_quote_from_symbol(symbol.as_str())?;
1889        let inst_id = format!("{base}-{quote}");
1890
1891        let mut params = GetIndexTickerParamsBuilder::default();
1892        params.inst_id(Ustr::from(&inst_id));
1893        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1894
1895        let resp = self
1896            .inner
1897            .get_index_tickers(params)
1898            .await
1899            .map_err(|e| anyhow::anyhow!(e))?;
1900
1901        let raw = resp
1902            .first()
1903            .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1904        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1905        let ts_init = self.generate_ts_init();
1906
1907        let index_price =
1908            parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1909                .map_err(|e| anyhow::anyhow!(e))?;
1910        Ok(index_price)
1911    }
1912
1913    /// Requests an order book snapshot for the `instrument_id`.
1914    ///
1915    /// # Errors
1916    ///
1917    /// Returns an error if the HTTP request fails or book parsing fails.
1918    pub async fn request_book_snapshot(
1919        &self,
1920        instrument_id: InstrumentId,
1921        depth: Option<u32>,
1922    ) -> anyhow::Result<OrderBook> {
1923        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1924        let price_precision = inst.price_precision();
1925        let size_precision = inst.size_precision();
1926
1927        let params = GetOrderBookParams {
1928            inst_id: instrument_id.symbol.to_string(),
1929            sz: depth,
1930        };
1931
1932        let resp = self
1933            .inner
1934            .get_order_book(params)
1935            .await
1936            .map_err(|e| anyhow::anyhow!(e))?;
1937
1938        let snapshot = resp
1939            .first()
1940            .ok_or_else(|| anyhow::anyhow!("No order book returned from OKX"))?;
1941
1942        let ts_event = UnixNanos::from(snapshot.ts * NANOSECONDS_IN_MILLISECOND);
1943        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1944
1945        for (i, level) in snapshot.bids.iter().enumerate() {
1946            let price = parse_price(&level.0, price_precision)?;
1947            let size = parse_quantity(&level.1, size_precision)?;
1948            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1949            book.add(order, 0, i as u64, ts_event);
1950        }
1951
1952        let bids_len = snapshot.bids.len();
1953
1954        for (i, level) in snapshot.asks.iter().enumerate() {
1955            let price = parse_price(&level.0, price_precision)?;
1956            let size = parse_quantity(&level.1, size_precision)?;
1957            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1958            book.add(order, 0, (bids_len + i) as u64, ts_event);
1959        }
1960
1961        log::info!(
1962            "Fetched order book for {} with {} bids and {} asks",
1963            instrument_id,
1964            snapshot.bids.len(),
1965            snapshot.asks.len(),
1966        );
1967
1968        Ok(book)
1969    }
1970
1971    /// Requests an order book snapshot as `OrderBookDeltas` for the `instrument_id`.
1972    ///
1973    /// # Errors
1974    ///
1975    /// Returns an error if the HTTP request fails or parsing fails.
1976    pub async fn request_orderbook_snapshot(
1977        &self,
1978        instrument_id: InstrumentId,
1979        depth: Option<u32>,
1980    ) -> anyhow::Result<OrderBookDeltas> {
1981        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1982        let price_precision = inst.price_precision();
1983        let size_precision = inst.size_precision();
1984
1985        let params = GetOrderBookParams {
1986            inst_id: instrument_id.symbol.to_string(),
1987            sz: depth,
1988        };
1989
1990        let resp = self
1991            .inner
1992            .get_order_book(params)
1993            .await
1994            .map_err(|e| anyhow::anyhow!(e))?;
1995
1996        let snapshot = resp
1997            .first()
1998            .ok_or_else(|| anyhow::anyhow!("No order book returned from OKX"))?;
1999
2000        let ts_event = UnixNanos::from(snapshot.ts * NANOSECONDS_IN_MILLISECOND);
2001        let total_levels = snapshot.bids.len() + snapshot.asks.len();
2002        let mut deltas = Vec::with_capacity(total_levels + 1);
2003
2004        let mut clear = OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event);
2005
2006        if total_levels == 0 {
2007            clear.flags |= RecordFlag::F_LAST as u8;
2008        }
2009        deltas.push(clear);
2010
2011        let mut processed = 0_usize;
2012
2013        for (i, level) in snapshot.bids.iter().enumerate() {
2014            let price = parse_price(&level.0, price_precision)?;
2015            let size = parse_quantity(&level.1, size_precision)?;
2016            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
2017            processed += 1;
2018            let mut flags = RecordFlag::F_SNAPSHOT as u8;
2019
2020            if processed == total_levels {
2021                flags |= RecordFlag::F_LAST as u8;
2022            }
2023            deltas.push(OrderBookDelta::new(
2024                instrument_id,
2025                BookAction::Add,
2026                order,
2027                flags,
2028                0,
2029                ts_event,
2030                ts_event,
2031            ));
2032        }
2033
2034        let bids_len = snapshot.bids.len();
2035
2036        for (i, level) in snapshot.asks.iter().enumerate() {
2037            let price = parse_price(&level.0, price_precision)?;
2038            let size = parse_quantity(&level.1, size_precision)?;
2039            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
2040            processed += 1;
2041            let mut flags = RecordFlag::F_SNAPSHOT as u8;
2042
2043            if processed == total_levels {
2044                flags |= RecordFlag::F_LAST as u8;
2045            }
2046            deltas.push(OrderBookDelta::new(
2047                instrument_id,
2048                BookAction::Add,
2049                order,
2050                flags,
2051                0,
2052                ts_event,
2053                ts_event,
2054            ));
2055        }
2056
2057        log::info!(
2058            "Fetched order book snapshot for {} with {} bids and {} asks",
2059            instrument_id,
2060            snapshot.bids.len(),
2061            snapshot.asks.len(),
2062        );
2063
2064        OrderBookDeltas::new_checked(instrument_id, deltas)
2065            .context("failed to assemble OrderBookDeltas from OKX snapshot")
2066    }
2067
2068    /// Requests historical funding rates for the `instrument_id`.
2069    ///
2070    /// # Errors
2071    ///
2072    /// Returns an error if the HTTP request fails or parsing fails.
2073    pub async fn request_funding_rates(
2074        &self,
2075        instrument_id: InstrumentId,
2076        start: Option<DateTime<Utc>>,
2077        end: Option<DateTime<Utc>>,
2078        limit: Option<u32>,
2079    ) -> anyhow::Result<Vec<FundingRateUpdate>> {
2080        let mut params = GetFundingRateHistoryParams {
2081            inst_id: instrument_id.symbol.to_string(),
2082            ..Default::default()
2083        };
2084
2085        // OKX uses "before" for newer-than and "after" for older-than
2086        if let Some(start) = start {
2087            params.before = Some(start.timestamp_millis().to_string());
2088        }
2089
2090        if let Some(end) = end {
2091            params.after = Some(end.timestamp_millis().to_string());
2092        }
2093
2094        params.limit = limit;
2095
2096        let resp = self
2097            .inner
2098            .get_funding_rate_history(params)
2099            .await
2100            .map_err(|e| anyhow::anyhow!(e))?;
2101
2102        let mut rates = Vec::with_capacity(resp.len());
2103
2104        for window in resp.windows(2) {
2105            let raw = &window[0];
2106            let interval_millis = raw
2107                .funding_time
2108                .checked_sub(window[1].funding_time)
2109                .context("funding interval negative, funding rates out of order")?;
2110            let rate = parse_funding_rate(raw, instrument_id, Some(interval_millis))?;
2111            rates.push(rate);
2112        }
2113
2114        if let Some(last_raw) = resp.last() {
2115            // oldest funding update has no previous one to compute interval
2116            let rate = parse_funding_rate(last_raw, instrument_id, None)?;
2117            rates.push(rate);
2118        }
2119
2120        // OKX returns newest-first; reverse to chronological order so that
2121        // cache.add_funding_rates (which push_fronts) leaves the newest at front
2122        rates.reverse();
2123
2124        log::info!(
2125            "Fetched {} funding rates for {}",
2126            rates.len(),
2127            instrument_id,
2128        );
2129
2130        Ok(rates)
2131    }
2132
2133    /// Requests trades for the `instrument_id` and `start` -> `end` time range.
2134    ///
2135    /// # Errors
2136    ///
2137    /// Returns an error if the HTTP request fails or trade parsing fails.
2138    pub async fn request_trades(
2139        &self,
2140        instrument_id: InstrumentId,
2141        start: Option<DateTime<Utc>>,
2142        end: Option<DateTime<Utc>>,
2143        limit: Option<u32>,
2144    ) -> anyhow::Result<Vec<TradeTick>> {
2145        const OKX_TRADES_MAX_LIMIT: u32 = 100;
2146        const MAX_PAGES: usize = 500;
2147        const MAX_CONSECUTIVE_EMPTY: usize = 3;
2148
2149        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
2150        enum Mode {
2151            Latest,
2152            Backward,
2153            Range,
2154        }
2155
2156        let limit = if limit == Some(0) { None } else { limit };
2157
2158        if let (Some(s), Some(e)) = (start, end) {
2159            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
2160        }
2161
2162        let now = Utc::now();
2163
2164        if let Some(s) = start
2165            && s > now
2166        {
2167            return Ok(Vec::new());
2168        }
2169
2170        let end = if let Some(e) = end
2171            && e > now
2172        {
2173            Some(now)
2174        } else {
2175            end
2176        };
2177
2178        let mode = match (start, end) {
2179            (None, None) => Mode::Latest,
2180            (Some(_), None) => Mode::Backward,
2181            (None, Some(_)) => Mode::Backward,
2182            (Some(_), Some(_)) => Mode::Range,
2183        };
2184
2185        let start_ms = start.map(|s| s.timestamp_millis());
2186        let end_ms = end.map(|e| e.timestamp_millis());
2187
2188        let ts_init = self.generate_ts_init();
2189        let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
2190
2191        // Historical pagination walks backwards using trade IDs, OKX does not honour timestamps for
2192        // standalone `before` requests (type=2)
2193        if matches!(mode, Mode::Backward | Mode::Range) {
2194            let mut before_trade_id: Option<String> = None;
2195            let mut pages = 0usize;
2196            let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
2197            let mut seen_trades: AHashSet<(String, i64)> = AHashSet::new();
2198            let mut unique_count = 0usize;
2199            let mut consecutive_empty_pages = 0usize;
2200
2201            // Only apply default limit when there's no start boundary
2202            // (start provides a natural stopping point, end alone allows infinite backward pagination)
2203            let effective_limit = if start.is_some() {
2204                limit.unwrap_or(u32::MAX)
2205            } else {
2206                limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
2207            };
2208
2209            log::debug!(
2210                "Starting trades pagination: mode={mode:?}, start={start:?}, end={end:?}, limit={limit:?}, effective_limit={effective_limit}"
2211            );
2212
2213            loop {
2214                if pages >= MAX_PAGES {
2215                    log::warn!("Hit MAX_PAGES limit of {MAX_PAGES}");
2216                    break;
2217                }
2218
2219                if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
2220                    log::debug!("Reached effective limit: unique_count={unique_count}");
2221                    break;
2222                }
2223
2224                let remaining = (effective_limit as usize).saturating_sub(unique_count);
2225                let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
2226
2227                log::debug!(
2228                    "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
2229                    pages + 1,
2230                    before_trade_id,
2231                    page_cap,
2232                    unique_count
2233                );
2234
2235                let mut params_builder = GetTradesParamsBuilder::default();
2236                params_builder
2237                    .inst_id(instrument_id.symbol.inner())
2238                    .limit(page_cap)
2239                    .pagination_type(1);
2240
2241                // Use 'after' to get older trades (OKX API: after=cursor means < cursor)
2242                if let Some(ref before_id) = before_trade_id {
2243                    params_builder.after(before_id.clone());
2244                }
2245
2246                let params = params_builder.build().map_err(anyhow::Error::new)?;
2247                let raw = self
2248                    .inner
2249                    .get_history_trades(params)
2250                    .await
2251                    .map_err(anyhow::Error::new)?;
2252
2253                log::debug!("Received {} raw trades from API", raw.len());
2254
2255                if let (Some(first), Some(last)) = (raw.first(), raw.last()) {
2256                    log::debug!(
2257                        "Raw response trade ID range: first={} (newest), last={} (oldest)",
2258                        first.trade_id,
2259                        last.trade_id,
2260                    );
2261                }
2262
2263                if raw.is_empty() {
2264                    log::debug!("API returned empty page, stopping pagination");
2265                    break;
2266                }
2267
2268                pages += 1;
2269
2270                let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
2271                let mut hit_start_boundary = false;
2272                let mut filtered_out = 0usize;
2273                let mut duplicates = 0usize;
2274
2275                for r in &raw {
2276                    match parse_trade_tick(
2277                        r,
2278                        instrument_id,
2279                        inst.price_precision(),
2280                        inst.size_precision(),
2281                        ts_init,
2282                    ) {
2283                        Ok(trade) => {
2284                            let ts_ms = trade.ts_event.as_i64() / 1_000_000;
2285
2286                            if let Some(e_ms) = end_ms
2287                                && ts_ms > e_ms
2288                            {
2289                                filtered_out += 1;
2290                                continue;
2291                            }
2292
2293                            if let Some(s_ms) = start_ms
2294                                && ts_ms < s_ms
2295                            {
2296                                hit_start_boundary = true;
2297                                filtered_out += 1;
2298                                break;
2299                            }
2300
2301                            let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
2302                            if seen_trades.insert(trade_key) {
2303                                unique_count += 1;
2304                                page_trades.push(trade);
2305                            } else {
2306                                duplicates += 1;
2307                            }
2308                        }
2309                        Err(e) => log::error!("{e}"),
2310                    }
2311                }
2312
2313                log::debug!(
2314                    "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
2315                    pages,
2316                    page_trades.len(),
2317                    filtered_out,
2318                    duplicates,
2319                    hit_start_boundary
2320                );
2321
2322                // Extract oldest unique trade ID for next page cursor
2323                let oldest_trade_id = if page_trades.is_empty() {
2324                    // Only apply consecutive empty guard if we've already collected some trades
2325                    // This allows historical backfills to paginate through empty prelude
2326                    if unique_count > 0 {
2327                        consecutive_empty_pages += 1;
2328                        if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
2329                            log::debug!(
2330                                "Stopping: {consecutive_empty_pages} consecutive pages with no trades in range after collecting {unique_count} trades"
2331                            );
2332                            break;
2333                        }
2334                    }
2335                    // No unique trades on page, use raw response for cursor
2336                    raw.last().map(|t| {
2337                        let id = t.trade_id.to_string();
2338                        log::debug!(
2339                            "Setting cursor from raw response (no unique trades): oldest_id={id}"
2340                        );
2341                        id
2342                    })
2343                } else {
2344                    // Use oldest deduplicated trade ID before reversing
2345                    let oldest_id = page_trades.last().map(|t| {
2346                        let id = t.trade_id.to_string();
2347                        log::debug!(
2348                            "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
2349                            id,
2350                            t.ts_event.as_i64()
2351                        );
2352                        id
2353                    });
2354                    page_trades.reverse();
2355                    page_results.push(page_trades);
2356                    consecutive_empty_pages = 0;
2357                    oldest_id
2358                };
2359
2360                if let Some(ref old_id) = before_trade_id
2361                    && oldest_trade_id.as_ref() == Some(old_id)
2362                {
2363                    break;
2364                }
2365
2366                if oldest_trade_id.is_none() {
2367                    break;
2368                }
2369
2370                before_trade_id = oldest_trade_id;
2371
2372                if hit_start_boundary {
2373                    break;
2374                }
2375
2376                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2377            }
2378
2379            log::debug!(
2380                "Pagination complete: {pages} pages, {unique_count} unique trades collected"
2381            );
2382
2383            let mut out: Vec<TradeTick> = Vec::new();
2384
2385            for page in page_results.into_iter().rev() {
2386                out.extend(page);
2387            }
2388
2389            // Deduplicate by (trade_id, ts_event) composite key
2390            let mut dedup_keys = AHashSet::new();
2391            let pre_dedup_len = out.len();
2392            out.retain(|trade| {
2393                dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
2394            });
2395
2396            if out.len() < pre_dedup_len {
2397                log::debug!(
2398                    "Removed {} duplicate trades during final dedup",
2399                    pre_dedup_len - out.len()
2400                );
2401            }
2402
2403            if let Some(lim) = limit
2404                && lim > 0
2405                && out.len() > lim as usize
2406            {
2407                let excess = out.len() - lim as usize;
2408                log::debug!("Trimming {excess} oldest trades to respect limit={lim}");
2409                out.drain(0..excess);
2410            }
2411
2412            log::debug!("Returning {} trades", out.len());
2413            return Ok(out);
2414        }
2415
2416        let req_limit = limit
2417            .unwrap_or(OKX_TRADES_MAX_LIMIT)
2418            .min(OKX_TRADES_MAX_LIMIT);
2419        let params = GetTradesParamsBuilder::default()
2420            .inst_id(instrument_id.symbol.inner())
2421            .limit(req_limit)
2422            .build()
2423            .map_err(anyhow::Error::new)?;
2424
2425        let raw = self
2426            .inner
2427            .get_history_trades(params)
2428            .await
2429            .map_err(anyhow::Error::new)?;
2430
2431        let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
2432
2433        for r in &raw {
2434            match parse_trade_tick(
2435                r,
2436                instrument_id,
2437                inst.price_precision(),
2438                inst.size_precision(),
2439                ts_init,
2440            ) {
2441                Ok(trade) => trades.push(trade),
2442                Err(e) => log::error!("{e}"),
2443            }
2444        }
2445
2446        // OKX returns newest-first, reverse to oldest-first
2447        trades.reverse();
2448
2449        if let Some(lim) = limit
2450            && lim > 0
2451            && trades.len() > lim as usize
2452        {
2453            trades.drain(0..trades.len() - lim as usize);
2454        }
2455
2456        Ok(trades)
2457    }
2458
2459    /// Requests historical bars for the given bar type and time range.
2460    ///
2461    /// The aggregation source must be `EXTERNAL`. Time range validation ensures start < end.
2462    /// Returns bars sorted oldest to newest.
2463    ///
2464    /// # Errors
2465    ///
2466    /// Returns an error if the request fails.
2467    ///
2468    /// # Endpoint Selection
2469    ///
2470    /// The OKX API has different endpoints with different limits:
2471    /// - Regular endpoint (`/api/v5/market/candles`): ≤ 300 rows/call, ≤ 40 req/2s
2472    ///   - Used when: start is None OR age ≤ 100 days
2473    /// - History endpoint (`/api/v5/market/history-candles`): ≤ 100 rows/call, ≤ 20 req/2s
2474    ///   - Used when: start is Some AND age > 100 days
2475    ///
2476    /// Age is calculated as `Utc::now() - start` at the time of the first request.
2477    ///
2478    /// # Supported Aggregations
2479    ///
2480    /// Maps to OKX bar query parameter:
2481    /// - `Second` → `{n}s`
2482    /// - `Minute` → `{n}m`
2483    /// - `Hour` → `{n}H`
2484    /// - `Day` → `{n}D`
2485    /// - `Week` → `{n}W`
2486    /// - `Month` → `{n}M`
2487    ///
2488    /// # Pagination
2489    ///
2490    /// - Uses `before` parameter for backwards pagination
2491    /// - Pages backwards from end time (or now) to start time
2492    /// - Stops when: limit reached, time window covered, or API returns empty
2493    /// - Rate limit safety: ≥ 50ms between requests
2494    ///
2495    /// # References
2496    ///
2497    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks>
2498    /// - <https://tr.okx.com/docs-v5/en/#order-book-trading-market-data-get-candlesticks-history>
2499    pub async fn request_bars(
2500        &self,
2501        bar_type: BarType,
2502        start: Option<DateTime<Utc>>,
2503        mut end: Option<DateTime<Utc>>,
2504        limit: Option<u32>,
2505    ) -> anyhow::Result<Vec<Bar>> {
2506        const HISTORY_SPLIT_DAYS: i64 = 100;
2507        const MAX_PAGES_SOFT: usize = 500;
2508
2509        #[derive(Clone, Copy, Debug, PartialEq, Eq)]
2510        enum Mode {
2511            Latest,
2512            Backward,
2513            Range,
2514        }
2515
2516        let limit = if limit == Some(0) { None } else { limit };
2517
2518        anyhow::ensure!(
2519            bar_type.aggregation_source() == AggregationSource::External,
2520            "Only EXTERNAL aggregation is supported"
2521        );
2522
2523        if let (Some(s), Some(e)) = (start, end) {
2524            anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
2525        }
2526
2527        let now = Utc::now();
2528
2529        if let Some(s) = start
2530            && s > now
2531        {
2532            return Ok(Vec::new());
2533        }
2534
2535        if let Some(e) = end
2536            && e > now
2537        {
2538            end = Some(now);
2539        }
2540
2541        let spec = bar_type.spec();
2542        let step = spec.step.get();
2543        let bar_param = match spec.aggregation {
2544            BarAggregation::Second => format!("{step}s"),
2545            BarAggregation::Minute => format!("{step}m"),
2546            BarAggregation::Hour => format!("{step}H"),
2547            BarAggregation::Day => format!("{step}D"),
2548            BarAggregation::Week => format!("{step}W"),
2549            BarAggregation::Month => format!("{step}M"),
2550            a => anyhow::bail!("OKX does not support {a:?} aggregation"),
2551        };
2552
2553        let slot_ms: i64 = match spec.aggregation {
2554            BarAggregation::Second => (step as i64) * 1_000,
2555            BarAggregation::Minute => (step as i64) * 60_000,
2556            BarAggregation::Hour => (step as i64) * 3_600_000,
2557            BarAggregation::Day => (step as i64) * 86_400_000,
2558            BarAggregation::Week => (step as i64) * 7 * 86_400_000,
2559            BarAggregation::Month => (step as i64) * 30 * 86_400_000,
2560            _ => unreachable!("Unsupported aggregation should have been caught above"),
2561        };
2562        let slot_ns: i64 = slot_ms * 1_000_000;
2563
2564        let mode = match (start, end) {
2565            (None, None) => Mode::Latest,
2566            (Some(_), None) => Mode::Backward, // Changed: when only start is provided, work backward from now
2567            (None, Some(_)) => Mode::Backward,
2568            (Some(_), Some(_)) => Mode::Range,
2569        };
2570
2571        let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
2572        let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
2573
2574        // Floor start and ceiling end to bar boundaries for cleaner API requests
2575        let start_ms = start.map(|s| {
2576            let ms = s.timestamp_millis();
2577
2578            if slot_ms > 0 {
2579                (ms / slot_ms) * slot_ms // Floor to nearest bar boundary
2580            } else {
2581                ms
2582            }
2583        });
2584        let end_ms = end.map(|e| {
2585            let ms = e.timestamp_millis();
2586
2587            if slot_ms > 0 {
2588                ((ms + slot_ms - 1) / slot_ms) * slot_ms // Ceiling to nearest bar boundary
2589            } else {
2590                ms
2591            }
2592        });
2593        let now_ms = now.timestamp_millis();
2594
2595        let symbol = bar_type.instrument_id().symbol;
2596        let inst = self.instrument_from_cache(symbol.inner())?;
2597
2598        let mut out: Vec<Bar> = Vec::new();
2599        let mut pages = 0usize;
2600
2601        // IMPORTANT: OKX API has COUNTER-INTUITIVE semantics (same for bars and trades):
2602        // - after=X returns records with timestamp < X (upper bound, despite the name!)
2603        // - before=X returns records with timestamp > X (lower bound, despite the name!)
2604        // For Range [start, end], use: before=start (lower bound), after=end (upper bound)
2605        let mut after_ms: Option<i64> = match mode {
2606            Mode::Range => end_ms.or(Some(now_ms)), // Upper bound: bars < end
2607            _ => None,
2608        };
2609        let mut before_ms: Option<i64> = match mode {
2610            Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2611            Mode::Range => start_ms, // Lower bound: bars > start
2612            Mode::Latest => None,
2613        };
2614
2615        // For Range mode, we'll paginate backwards like Backward mode
2616        let mut forward_prepend_mode = matches!(mode, Mode::Range);
2617
2618        // Adjust before_ms to ensure we get data from the API
2619        // OKX API might not have bars for the very recent past
2620        // This handles both explicit end=now and the actor layer setting end=now when it's None
2621        if matches!(mode, Mode::Backward | Mode::Range)
2622            && let Some(b) = before_ms
2623        {
2624            // OKX endpoints have different data availability windows:
2625            // - Regular endpoint: has most recent data but limited depth
2626            // - History endpoint: has deep history but lags behind current time
2627            // Use a small buffer to avoid the "dead zone"
2628            let buffer_ms = slot_ms.max(60_000); // At least 1 minute or 1 bar
2629            if b >= now_ms.saturating_sub(buffer_ms) {
2630                before_ms = Some(now_ms.saturating_sub(buffer_ms));
2631            }
2632        }
2633
2634        let mut have_latest_first_page = false;
2635        let mut progressless_loops = 0u8;
2636
2637        loop {
2638            if let Some(lim) = limit
2639                && lim > 0
2640                && out.len() >= lim as usize
2641            {
2642                break;
2643            }
2644
2645            if pages >= MAX_PAGES_SOFT {
2646                break;
2647            }
2648
2649            let pivot_ms = if let Some(a) = after_ms {
2650                a
2651            } else if let Some(b) = before_ms {
2652                b
2653            } else {
2654                now_ms
2655            };
2656            // Choose endpoint based on how old the data is:
2657            // - Use regular endpoint for recent data (< 1 hour old)
2658            // - Use history endpoint for older data (> 1 hour old)
2659            // This avoids the "gap" where history endpoint has no recent data
2660            // and regular endpoint has limited depth
2661            let age_ms = now_ms.saturating_sub(pivot_ms);
2662            let age_hours = age_ms / (60 * 60 * 1000);
2663            let using_history = age_hours > 1; // Use history if data is > 1 hour old
2664
2665            let page_ceiling = if using_history { 100 } else { 300 };
2666            let remaining = limit
2667                .filter(|&l| l > 0) // Treat limit=0 as no limit
2668                .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2669            let page_cap = remaining.min(page_ceiling);
2670
2671            let mut p = GetCandlesticksParamsBuilder::default();
2672            p.inst_id(symbol.as_str())
2673                .bar(&bar_param)
2674                .limit(page_cap as u32);
2675
2676            // Track whether this planned request uses BEFORE or AFTER.
2677            let mut req_used_before = false;
2678
2679            match mode {
2680                Mode::Latest => {
2681                    if have_latest_first_page && let Some(b) = before_ms {
2682                        p.before_ms(b);
2683                        req_used_before = true;
2684                    }
2685                }
2686                Mode::Backward => {
2687                    // Use 'after' to get older bars (OKX API: after=cursor means < cursor)
2688                    if let Some(b) = before_ms {
2689                        p.after_ms(b);
2690                    }
2691                }
2692                Mode::Range => {
2693                    // For Range mode, use both after and before to specify the full range
2694                    // This is much more efficient than pagination
2695                    if let Some(a) = after_ms {
2696                        p.after_ms(a);
2697                    }
2698
2699                    if let Some(b) = before_ms {
2700                        p.before_ms(b);
2701                        req_used_before = true;
2702                    }
2703                }
2704            }
2705
2706            let params = p.build().map_err(anyhow::Error::new)?;
2707
2708            let mut raw = if using_history {
2709                self.inner
2710                    .get_history_candles(params.clone())
2711                    .await
2712                    .map_err(anyhow::Error::new)?
2713            } else {
2714                self.inner
2715                    .get_candles(params.clone())
2716                    .await
2717                    .map_err(anyhow::Error::new)?
2718            };
2719
2720            // --- Fallbacks on empty page ---
2721            if raw.is_empty() {
2722                // LATEST: retry same cursor via history, then step back a page-interval before giving up
2723                if matches!(mode, Mode::Latest)
2724                    && have_latest_first_page
2725                    && !using_history
2726                    && let Some(b) = before_ms
2727                {
2728                    let mut p2 = GetCandlesticksParamsBuilder::default();
2729                    p2.inst_id(symbol.as_str())
2730                        .bar(&bar_param)
2731                        .limit(page_cap as u32);
2732                    p2.before_ms(b);
2733                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2734                    let raw2 = self
2735                        .inner
2736                        .get_history_candles(params2)
2737                        .await
2738                        .map_err(anyhow::Error::new)?;
2739
2740                    if raw2.is_empty() {
2741                        // Step back one page interval and retry loop
2742                        let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2743                        before_ms = Some(b.saturating_sub(jump));
2744                        progressless_loops = progressless_loops.saturating_add(1);
2745                        if progressless_loops >= 3 {
2746                            break;
2747                        }
2748                        continue;
2749                    } else {
2750                        raw = raw2;
2751                    }
2752                }
2753
2754                // Range mode doesn't need special bootstrap - it uses the normal flow with before_ms set
2755
2756                // If still empty: for Range after first page, try a single backstep window using BEFORE
2757                if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2758                    let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2759                    let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2760
2761                    let mut p2 = GetCandlesticksParamsBuilder::default();
2762                    p2.inst_id(symbol.as_str())
2763                        .bar(&bar_param)
2764                        .limit(page_cap as u32)
2765                        .before_ms(pivot_back);
2766                    let params2 = p2.build().map_err(anyhow::Error::new)?;
2767                    let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2768                        > HISTORY_SPLIT_DAYS
2769                    {
2770                        self.inner.get_history_candles(params2).await
2771                    } else {
2772                        self.inner.get_candles(params2).await
2773                    }
2774                    .map_err(anyhow::Error::new)?;
2775
2776                    if raw2.is_empty() {
2777                        break;
2778                    } else {
2779                        raw = raw2;
2780                        forward_prepend_mode = true;
2781                        req_used_before = true;
2782                    }
2783                }
2784
2785                // First LATEST page empty: jump back >100d to force history, then continue loop
2786                if raw.is_empty()
2787                    && matches!(mode, Mode::Latest)
2788                    && !have_latest_first_page
2789                    && !using_history
2790                {
2791                    let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2792                    before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2793                    have_latest_first_page = true;
2794                    continue;
2795                }
2796
2797                // Still empty for any other case? Just break.
2798                if raw.is_empty() {
2799                    break;
2800                }
2801            }
2802            // --- end fallbacks ---
2803
2804            pages += 1;
2805
2806            // Parse, oldest → newest
2807            let ts_init = self.generate_ts_init();
2808            let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2809
2810            for r in &raw {
2811                page.push(parse_candlestick(
2812                    r,
2813                    bar_type,
2814                    inst.price_precision(),
2815                    inst.size_precision(),
2816                    ts_init,
2817                )?);
2818            }
2819            page.reverse();
2820
2821            let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2822            let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2823
2824            // Range filter (inclusive)
2825            // For Range mode, if we have no bars yet and this is an early page,
2826            // be more tolerant with the start boundary to handle gaps in data
2827            let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2828                && out.is_empty()
2829                && pages < 2
2830            {
2831                // On first pages of Range mode with no data yet, include the most recent bar
2832                // even if it's slightly before our start time (within 2 bar periods)
2833                // BUT we want ALL bars in the page that are within our range
2834                let tolerance_ns = slot_ns * 2; // Allow up to 2 bar periods before start
2835
2836                // Debug: log the page range
2837                if let (Some(first), Some(last)) = (page.first(), page.last()) {
2838                    log::debug!(
2839                        "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2840                        page.len(),
2841                        first.ts_event.as_i64() / 1_000_000,
2842                        last.ts_event.as_i64() / 1_000_000,
2843                        start_ms,
2844                        end_ms,
2845                    );
2846                }
2847
2848                let result: Vec<Bar> = page
2849                    .clone()
2850                    .into_iter()
2851                    .filter(|b| {
2852                        let ts = b.ts_event.as_i64();
2853                        // Accept bars from (start - tolerance) to end
2854                        let ok_after =
2855                            start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2856                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2857                        ok_after && ok_before
2858                    })
2859                    .collect();
2860
2861                result
2862            } else {
2863                // Normal filtering
2864                page.clone()
2865                    .into_iter()
2866                    .filter(|b| {
2867                        let ts = b.ts_event.as_i64();
2868                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2869                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2870                        ok_after && ok_before
2871                    })
2872                    .collect()
2873            };
2874
2875            if !page.is_empty() && filtered.is_empty() {
2876                // For Range mode, if all bars are before our start time, there's no point continuing
2877                if matches!(mode, Mode::Range)
2878                    && !forward_prepend_mode
2879                    && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2880                    && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2881                {
2882                    // Bars are too old (more than 2 bar periods before start), stop
2883                    break;
2884                }
2885            }
2886
2887            // Track contribution for progress guard
2888            let contribution;
2889
2890            if out.is_empty() {
2891                contribution = filtered.len();
2892                out = filtered;
2893            } else {
2894                match mode {
2895                    Mode::Backward | Mode::Latest => {
2896                        if let Some(first) = out.first() {
2897                            filtered.retain(|b| b.ts_event < first.ts_event);
2898                        }
2899                        contribution = filtered.len();
2900                        if contribution != 0 {
2901                            let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2902                            new_out.extend_from_slice(&filtered);
2903                            new_out.extend_from_slice(&out);
2904                            out = new_out;
2905                        }
2906                    }
2907                    Mode::Range => {
2908                        if forward_prepend_mode || req_used_before {
2909                            // We are backfilling older pages: prepend them.
2910                            if let Some(first) = out.first() {
2911                                filtered.retain(|b| b.ts_event < first.ts_event);
2912                            }
2913                            contribution = filtered.len();
2914                            if contribution != 0 {
2915                                let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2916                                new_out.extend_from_slice(&filtered);
2917                                new_out.extend_from_slice(&out);
2918                                out = new_out;
2919                            }
2920                        } else {
2921                            // Normal forward: append newer pages.
2922                            if let Some(last) = out.last() {
2923                                filtered.retain(|b| b.ts_event > last.ts_event);
2924                            }
2925                            contribution = filtered.len();
2926                            out.extend(filtered);
2927                        }
2928                    }
2929                }
2930            }
2931
2932            // Duplicate-window mitigation for Latest/Backward/Range
2933            if contribution == 0
2934                && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2935                && let Some(b) = before_ms
2936            {
2937                let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2938                let new_b = b.saturating_sub(jump);
2939                if new_b != b {
2940                    before_ms = Some(new_b);
2941                }
2942            }
2943
2944            if contribution == 0 {
2945                progressless_loops = progressless_loops.saturating_add(1);
2946                if progressless_loops >= 3 {
2947                    break;
2948                }
2949            } else {
2950                progressless_loops = 0;
2951
2952                // Advance cursors only when we made progress
2953                match mode {
2954                    Mode::Latest | Mode::Backward => {
2955                        if let Some(oldest) = page_oldest_ms {
2956                            before_ms = Some(oldest.saturating_sub(1));
2957                            have_latest_first_page = true;
2958                        } else {
2959                            break;
2960                        }
2961                    }
2962                    Mode::Range => {
2963                        if forward_prepend_mode || req_used_before {
2964                            if let Some(oldest) = page_oldest_ms {
2965                                // Move back by at least one bar period to avoid getting the same data
2966                                let jump_back = slot_ms.max(60_000); // At least 1 minute
2967                                before_ms = Some(oldest.saturating_sub(jump_back));
2968                                after_ms = None;
2969                            } else {
2970                                break;
2971                            }
2972                        } else if let Some(newest) = page_newest_ms {
2973                            after_ms = Some(newest.saturating_add(1));
2974                            before_ms = None;
2975                        } else {
2976                            break;
2977                        }
2978                    }
2979                }
2980            }
2981
2982            // Stop conditions
2983            if let Some(lim) = limit
2984                && lim > 0
2985                && out.len() >= lim as usize
2986            {
2987                break;
2988            }
2989
2990            if let Some(ens) = end_ns
2991                && let Some(last) = out.last()
2992                && last.ts_event.as_i64() >= ens
2993            {
2994                break;
2995            }
2996
2997            if let Some(sns) = start_ns
2998                && let Some(first) = out.first()
2999                && (matches!(mode, Mode::Backward) || forward_prepend_mode)
3000                && first.ts_event.as_i64() <= sns
3001            {
3002                // For Range mode, check if we have all bars up to the end time
3003                if matches!(mode, Mode::Range) {
3004                    // Don't stop if we haven't reached the end time yet
3005                    if let Some(ens) = end_ns
3006                        && let Some(last) = out.last()
3007                    {
3008                        let last_ts = last.ts_event.as_i64();
3009                        if last_ts < ens {
3010                            // We have bars before start but haven't reached end, need to continue forward
3011                            // Switch from backward to forward pagination
3012                            forward_prepend_mode = false;
3013                            after_ms = Some((last_ts / 1_000_000).saturating_add(1));
3014                            before_ms = None;
3015                            continue;
3016                        }
3017                    }
3018                }
3019                break;
3020            }
3021
3022            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3023        }
3024
3025        // Final rescue for FORWARD/RANGE when nothing gathered
3026        if out.is_empty() && matches!(mode, Mode::Range) {
3027            let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
3028            let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
3029            let mut p = GetCandlesticksParamsBuilder::default();
3030            p.inst_id(symbol.as_str())
3031                .bar(&bar_param)
3032                .limit(300)
3033                .before_ms(pivot);
3034            let params = p.build().map_err(anyhow::Error::new)?;
3035            let raw = if hist {
3036                self.inner.get_history_candles(params).await
3037            } else {
3038                self.inner.get_candles(params).await
3039            }
3040            .map_err(anyhow::Error::new)?;
3041
3042            if !raw.is_empty() {
3043                let ts_init = self.generate_ts_init();
3044                let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
3045
3046                for r in &raw {
3047                    page.push(parse_candlestick(
3048                        r,
3049                        bar_type,
3050                        inst.price_precision(),
3051                        inst.size_precision(),
3052                        ts_init,
3053                    )?);
3054                }
3055                page.reverse();
3056                out = page
3057                    .into_iter()
3058                    .filter(|b| {
3059                        let ts = b.ts_event.as_i64();
3060                        let ok_after = start_ns.is_none_or(|sns| ts >= sns);
3061                        let ok_before = end_ns.is_none_or(|ens| ts <= ens);
3062                        ok_after && ok_before
3063                    })
3064                    .collect();
3065            }
3066        }
3067
3068        // Trim against end bound if needed (keep ≤ end)
3069        if let Some(ens) = end_ns {
3070            while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
3071                out.pop();
3072            }
3073        }
3074
3075        // Clamp first bar for Range when using forward pagination
3076        if matches!(mode, Mode::Range)
3077            && !forward_prepend_mode
3078            && let Some(sns) = start_ns
3079        {
3080            let lower = sns.saturating_sub(slot_ns);
3081            while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
3082                out.remove(0);
3083            }
3084        }
3085
3086        // Keep the most recent N bars when limit is specified
3087        if let Some(lim) = limit
3088            && lim > 0
3089            && out.len() > lim as usize
3090        {
3091            let start = out.len() - lim as usize;
3092            out.drain(..start);
3093        }
3094
3095        Ok(out)
3096    }
3097
3098    /// Requests historical order status reports for the given parameters.
3099    ///
3100    /// # Errors
3101    ///
3102    /// Returns an error if the request fails.
3103    ///
3104    /// # References
3105    ///
3106    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-7-days>.
3107    /// - <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-order-history-last-3-months>.
3108    #[expect(clippy::too_many_arguments)]
3109    pub async fn request_order_status_reports(
3110        &self,
3111        account_id: AccountId,
3112        instrument_type: Option<OKXInstrumentType>,
3113        instrument_id: Option<InstrumentId>,
3114        start: Option<DateTime<Utc>>,
3115        end: Option<DateTime<Utc>>,
3116        open_only: bool,
3117        limit: Option<u32>,
3118    ) -> anyhow::Result<Vec<OrderStatusReport>> {
3119        let instrument_type = if let Some(instrument_type) = instrument_type {
3120            instrument_type
3121        } else {
3122            let instrument_id = instrument_id.ok_or_else(|| {
3123                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
3124            })?;
3125            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3126            okx_instrument_type(&instrument)?
3127        };
3128
3129        let mut history_base = GetOrderHistoryParamsBuilder::default();
3130        history_base.inst_type(instrument_type);
3131
3132        if let Some(instrument_id) = instrument_id.as_ref() {
3133            history_base.inst_id(instrument_id.symbol.inner().to_string());
3134        }
3135        let history_base = history_base.build().map_err(|e| anyhow::anyhow!(e))?;
3136
3137        let mut pending_base = GetOrderListParamsBuilder::default();
3138        pending_base.inst_type(instrument_type);
3139
3140        if let Some(instrument_id) = instrument_id.as_ref() {
3141            pending_base.inst_id(instrument_id.symbol.inner().to_string());
3142        }
3143        let pending_base = pending_base.build().map_err(|e| anyhow::anyhow!(e))?;
3144
3145        let combined_resp = if open_only {
3146            self.paginate_orders_pending(&pending_base, limit).await?
3147        } else {
3148            let (history, pending) = tokio::try_join!(
3149                self.paginate_orders_history(&history_base, limit),
3150                self.paginate_orders_pending(&pending_base, limit),
3151            )?;
3152            let mut combined_resp = history;
3153            combined_resp.extend(pending);
3154            combined_resp
3155        };
3156
3157        // Prepare time range filter
3158        let start_ns = start.map(UnixNanos::from);
3159        let end_ns = end.map(UnixNanos::from);
3160
3161        let ts_init = self.generate_ts_init();
3162        let mut reports = Vec::with_capacity(combined_resp.len());
3163
3164        // Use a seen filter in case pending orders are within the histories "2hr reserve window"
3165        let mut seen: AHashSet<String> = AHashSet::new();
3166
3167        for order in combined_resp {
3168            let seen_key = if !order.cl_ord_id.is_empty() {
3169                order.cl_ord_id.as_str().to_string()
3170            } else if let Some(algo_cl_ord_id) = order
3171                .algo_cl_ord_id
3172                .as_ref()
3173                .filter(|value| !value.as_str().is_empty())
3174            {
3175                algo_cl_ord_id.as_str().to_string()
3176            } else if let Some(algo_id) = order
3177                .algo_id
3178                .as_ref()
3179                .filter(|value| !value.as_str().is_empty())
3180            {
3181                algo_id.as_str().to_string()
3182            } else {
3183                order.ord_id.as_str().to_string()
3184            };
3185
3186            if !seen.insert(seen_key) {
3187                continue; // Reserved pending already reported
3188            }
3189
3190            let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
3191                log::debug!(
3192                    "Skipping order report for instrument not in cache: symbol={}",
3193                    order.inst_id,
3194                );
3195                continue;
3196            };
3197
3198            let report = match parse_order_status_report(
3199                &order,
3200                account_id,
3201                inst.id(),
3202                inst.price_precision(),
3203                inst.size_precision(),
3204                ts_init,
3205            ) {
3206                Ok(report) => report,
3207                Err(e) => {
3208                    log::error!("Failed to parse order status report: {e}");
3209                    continue;
3210                }
3211            };
3212
3213            if let Some(start_ns) = start_ns
3214                && report.ts_last < start_ns
3215            {
3216                continue;
3217            }
3218
3219            if let Some(end_ns) = end_ns
3220                && report.ts_last > end_ns
3221            {
3222                continue;
3223            }
3224
3225            reports.push(report);
3226        }
3227
3228        Ok(reports)
3229    }
3230
3231    // Paginates through order history using `ord_id` as the cursor
3232    async fn paginate_orders_history(
3233        &self,
3234        base: &GetOrderHistoryParams,
3235        limit: Option<u32>,
3236    ) -> anyhow::Result<Vec<OKXOrderHistory>> {
3237        let mut all = Vec::new();
3238        let mut cursor: Option<String> = None;
3239        let mut exhausted = true;
3240
3241        for _ in 0..MAX_RECONCILIATION_PAGES {
3242            let mut params = base.clone();
3243            params.after = cursor.take();
3244
3245            let page = self
3246                .inner
3247                .get_orders_history(params)
3248                .await
3249                .map_err(|e| anyhow::anyhow!(e))?;
3250
3251            let page_len = page.len();
3252            cursor = page.last().map(|o| o.ord_id.to_string());
3253            all.extend(page);
3254
3255            if page_len < OKX_PAGE_SIZE {
3256                exhausted = false;
3257                break;
3258            }
3259
3260            if let Some(lim) = limit
3261                && all.len() >= lim as usize
3262            {
3263                exhausted = false;
3264                break;
3265            }
3266        }
3267
3268        if exhausted && !all.is_empty() {
3269            log::warn!(
3270                "Order history pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3271                 results may be truncated ({} records)",
3272                all.len()
3273            );
3274        }
3275
3276        if let Some(lim) = limit {
3277            all.truncate(lim as usize);
3278        }
3279
3280        Ok(all)
3281    }
3282
3283    // Paginates through pending orders using `ord_id` as the cursor
3284    async fn paginate_orders_pending(
3285        &self,
3286        base: &GetOrderListParams,
3287        limit: Option<u32>,
3288    ) -> anyhow::Result<Vec<OKXOrderHistory>> {
3289        let mut all = Vec::new();
3290        let mut cursor: Option<String> = None;
3291        let mut exhausted = true;
3292
3293        for _ in 0..MAX_RECONCILIATION_PAGES {
3294            let mut params = base.clone();
3295            params.after = cursor.take();
3296
3297            let page = self
3298                .inner
3299                .get_orders_pending(params)
3300                .await
3301                .map_err(|e| anyhow::anyhow!(e))?;
3302
3303            let page_len = page.len();
3304            cursor = page.last().map(|o| o.ord_id.to_string());
3305            all.extend(page);
3306
3307            if page_len < OKX_PAGE_SIZE {
3308                exhausted = false;
3309                break;
3310            }
3311
3312            if let Some(lim) = limit
3313                && all.len() >= lim as usize
3314            {
3315                exhausted = false;
3316                break;
3317            }
3318        }
3319
3320        if exhausted && !all.is_empty() {
3321            log::warn!(
3322                "Pending orders pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3323                 results may be truncated ({} records)",
3324                all.len()
3325            );
3326        }
3327
3328        if let Some(lim) = limit {
3329            all.truncate(lim as usize);
3330        }
3331
3332        Ok(all)
3333    }
3334
3335    // Paginates through transaction details (fills) using `bill_id` as the cursor
3336    async fn paginate_fills(
3337        &self,
3338        base: &GetTransactionDetailsParams,
3339        limit: Option<u32>,
3340    ) -> anyhow::Result<Vec<OKXTransactionDetail>> {
3341        let mut all = Vec::new();
3342        let mut cursor: Option<String> = None;
3343        let mut exhausted = true;
3344
3345        for _ in 0..MAX_RECONCILIATION_PAGES {
3346            let mut params = base.clone();
3347            params.after = cursor.take();
3348
3349            let page = self
3350                .inner
3351                .get_fills(params)
3352                .await
3353                .map_err(|e| anyhow::anyhow!(e))?;
3354
3355            let page_len = page.len();
3356            cursor = page.last().map(|o| o.bill_id.to_string());
3357            all.extend(page);
3358
3359            if page_len < OKX_PAGE_SIZE {
3360                exhausted = false;
3361                break;
3362            }
3363
3364            if let Some(lim) = limit
3365                && all.len() >= lim as usize
3366            {
3367                exhausted = false;
3368                break;
3369            }
3370        }
3371
3372        if exhausted && !all.is_empty() {
3373            log::warn!(
3374                "Fill pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3375                 results may be truncated ({} records)",
3376                all.len()
3377            );
3378        }
3379
3380        if let Some(lim) = limit {
3381            all.truncate(lim as usize);
3382        }
3383
3384        Ok(all)
3385    }
3386
3387    // Paginates through pending algo orders using `algo_id` as the cursor
3388    async fn paginate_algo_pending(
3389        &self,
3390        base: &GetAlgoOrdersParams,
3391        limit: Option<usize>,
3392    ) -> anyhow::Result<Vec<OKXOrderAlgo>> {
3393        let mut all = Vec::new();
3394        let mut cursor: Option<String> = None;
3395        let mut exhausted = true;
3396
3397        for _ in 0..MAX_RECONCILIATION_PAGES {
3398            let mut params = base.clone();
3399            params.after = cursor.take();
3400
3401            let page = match self.inner.get_order_algo_pending(params).await {
3402                Ok(result) => result,
3403                Err(OKXHttpError::UnexpectedStatus { status, .. })
3404                    if status == StatusCode::NOT_FOUND =>
3405                {
3406                    exhausted = false;
3407                    break;
3408                }
3409                Err(e) => return Err(e.into()),
3410            };
3411
3412            let page_len = page.len();
3413            cursor = page.last().map(|o| o.algo_id.clone());
3414            all.extend(page);
3415
3416            if page_len < OKX_PAGE_SIZE {
3417                exhausted = false;
3418                break;
3419            }
3420
3421            if let Some(lim) = limit
3422                && all.len() >= lim
3423            {
3424                exhausted = false;
3425                break;
3426            }
3427        }
3428
3429        if exhausted && !all.is_empty() {
3430            log::warn!(
3431                "Algo pending pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3432                 results may be truncated ({} records)",
3433                all.len()
3434            );
3435        }
3436
3437        Ok(all)
3438    }
3439
3440    // Paginates through historical algo orders using `algo_id` as the cursor
3441    async fn paginate_algo_history(
3442        &self,
3443        base: &GetAlgoOrdersParams,
3444        limit: Option<usize>,
3445    ) -> anyhow::Result<Vec<OKXOrderAlgo>> {
3446        let mut all = Vec::new();
3447        let mut cursor: Option<String> = None;
3448        let mut exhausted = true;
3449
3450        for _ in 0..MAX_RECONCILIATION_PAGES {
3451            let mut params = base.clone();
3452            params.after = cursor.take();
3453
3454            let page = match self.inner.get_order_algo_history(params).await {
3455                Ok(result) => result,
3456                Err(OKXHttpError::UnexpectedStatus { status, .. })
3457                    if status == StatusCode::NOT_FOUND =>
3458                {
3459                    exhausted = false;
3460                    break;
3461                }
3462                Err(e) => return Err(e.into()),
3463            };
3464
3465            let page_len = page.len();
3466            cursor = page.last().map(|o| o.algo_id.clone());
3467            all.extend(page);
3468
3469            if page_len < OKX_PAGE_SIZE {
3470                exhausted = false;
3471                break;
3472            }
3473
3474            if let Some(lim) = limit
3475                && all.len() >= lim
3476            {
3477                exhausted = false;
3478                break;
3479            }
3480        }
3481
3482        if exhausted && !all.is_empty() {
3483            log::warn!(
3484                "Algo history pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3485                 results may be truncated ({} records)",
3486                all.len()
3487            );
3488        }
3489
3490        Ok(all)
3491    }
3492
3493    /// Requests fill reports (transaction details) for the given parameters.
3494    ///
3495    /// # Errors
3496    ///
3497    /// Returns an error if the request fails.
3498    ///
3499    /// # References
3500    ///
3501    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-get-transaction-details-last-3-days>.
3502    pub async fn request_fill_reports(
3503        &self,
3504        account_id: AccountId,
3505        instrument_type: Option<OKXInstrumentType>,
3506        instrument_id: Option<InstrumentId>,
3507        start: Option<DateTime<Utc>>,
3508        end: Option<DateTime<Utc>>,
3509        limit: Option<u32>,
3510    ) -> anyhow::Result<Vec<FillReport>> {
3511        let mut params = GetTransactionDetailsParamsBuilder::default();
3512
3513        let instrument_type = if let Some(instrument_type) = instrument_type {
3514            instrument_type
3515        } else {
3516            let instrument_id = instrument_id.ok_or_else(|| {
3517                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
3518            })?;
3519            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3520            okx_instrument_type(&instrument)?
3521        };
3522
3523        params.inst_type(instrument_type);
3524
3525        if let Some(instrument_id) = instrument_id {
3526            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3527            let instrument_type = okx_instrument_type(&instrument)?;
3528            params.inst_type(instrument_type);
3529            params.inst_id(instrument_id.symbol.inner().to_string());
3530        }
3531
3532        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
3533
3534        let resp = self.paginate_fills(&params, limit).await?;
3535
3536        // Prepare time range filter
3537        let start_ns = start.map(UnixNanos::from);
3538        let end_ns = end.map(UnixNanos::from);
3539
3540        let ts_init = self.generate_ts_init();
3541        let mut reports = Vec::with_capacity(resp.len());
3542
3543        for detail in resp {
3544            // Skip fills with zero or negative quantity (cancelled orders, etc)
3545            if detail.fill_sz.is_empty() {
3546                continue;
3547            }
3548
3549            if let Ok(qty) = detail.fill_sz.parse::<f64>() {
3550                if qty <= 0.0 {
3551                    continue;
3552                }
3553            } else {
3554                // Skip unparsable quantities
3555                continue;
3556            }
3557
3558            let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
3559                log::debug!(
3560                    "Skipping fill report for instrument not in cache: symbol={}",
3561                    detail.inst_id,
3562                );
3563                continue;
3564            };
3565
3566            let report = match parse_fill_report(
3567                &detail,
3568                account_id,
3569                inst.id(),
3570                inst.price_precision(),
3571                inst.size_precision(),
3572                ts_init,
3573            ) {
3574                Ok(report) => report,
3575                Err(e) => {
3576                    log::error!("Failed to parse fill report: {e}");
3577                    continue;
3578                }
3579            };
3580
3581            if let Some(start_ns) = start_ns
3582                && report.ts_event < start_ns
3583            {
3584                continue;
3585            }
3586
3587            if let Some(end_ns) = end_ns
3588                && report.ts_event > end_ns
3589            {
3590                continue;
3591            }
3592
3593            reports.push(report);
3594        }
3595
3596        Ok(reports)
3597    }
3598
3599    /// Requests current position status reports for the given parameters.
3600    ///
3601    /// # Position Modes
3602    ///
3603    /// OKX supports two position modes, which affects how position data is returned:
3604    ///
3605    /// ## Net Mode (One-way)
3606    /// - `posSide` field will be `"net"`
3607    /// - `pos` field uses **signed quantities**:
3608    ///   - Positive value = Long position
3609    ///   - Negative value = Short position
3610    ///   - Zero = Flat/no position
3611    ///
3612    /// ## Long/Short Mode (Hedge/Dual-side)
3613    /// - `posSide` field will be `"long"` or `"short"`
3614    /// - `pos` field is **always positive** (use `posSide` to determine actual side)
3615    /// - Allows holding simultaneous long and short positions on the same instrument
3616    /// - Position IDs are suffixed with `-LONG` or `-SHORT` for uniqueness
3617    ///
3618    /// # Errors
3619    ///
3620    /// Returns an error if the request fails.
3621    ///
3622    /// # References
3623    ///
3624    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-positions>
3625    pub async fn request_position_status_reports(
3626        &self,
3627        account_id: AccountId,
3628        instrument_type: Option<OKXInstrumentType>,
3629        instrument_id: Option<InstrumentId>,
3630    ) -> anyhow::Result<Vec<PositionStatusReport>> {
3631        let mut params = GetPositionsParamsBuilder::default();
3632
3633        let instrument_type = if let Some(instrument_type) = instrument_type {
3634            instrument_type
3635        } else {
3636            let instrument_id = instrument_id.ok_or_else(|| {
3637                anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
3638            })?;
3639            let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3640            okx_instrument_type(&instrument)?
3641        };
3642
3643        params.inst_type(instrument_type);
3644
3645        instrument_id
3646            .as_ref()
3647            .map(|i| params.inst_id(i.symbol.inner()));
3648
3649        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
3650
3651        let resp = self
3652            .inner
3653            .get_positions(params)
3654            .await
3655            .map_err(|e| anyhow::anyhow!(e))?;
3656
3657        let ts_init = self.generate_ts_init();
3658        let mut reports = Vec::with_capacity(resp.len());
3659
3660        for position in resp {
3661            let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
3662                log::debug!(
3663                    "Skipping position report for instrument not in cache: symbol={}",
3664                    position.inst_id,
3665                );
3666                continue;
3667            };
3668
3669            match parse_position_status_report(
3670                &position,
3671                account_id,
3672                inst.id(),
3673                inst.size_precision(),
3674                ts_init,
3675            ) {
3676                Ok(report) => reports.push(report),
3677                Err(e) => {
3678                    log::error!("Failed to parse position status report: {e}");
3679                }
3680            }
3681        }
3682
3683        Ok(reports)
3684    }
3685
3686    /// Requests spot margin position status reports from account balance.
3687    ///
3688    /// Spot margin positions appear in `/api/v5/account/balance` as balance sheet items
3689    /// with non-zero `liab` (liability) or `spotInUseAmt` fields, rather than in the
3690    /// positions endpoint. This method fetches the balance and converts any margin
3691    /// positions into position status reports.
3692    ///
3693    /// # Errors
3694    ///
3695    /// Returns an error if the request fails or parsing fails.
3696    ///
3697    /// # References
3698    ///
3699    /// <https://www.okx.com/docs-v5/en/#trading-account-rest-api-get-balance>
3700    pub async fn request_spot_margin_position_reports(
3701        &self,
3702        account_id: AccountId,
3703    ) -> anyhow::Result<Vec<PositionStatusReport>> {
3704        let accounts = self
3705            .inner
3706            .get_balance()
3707            .await
3708            .map_err(|e| anyhow::anyhow!(e))?;
3709
3710        let ts_init = self.generate_ts_init();
3711        let mut reports = Vec::new();
3712
3713        // Build a base-currency lookup over the cached spot pairs once per
3714        // call. Restricting to `CurrencyPair` (spot) ensures a derivative
3715        // sharing the same base (e.g. `BTC-USDT-SWAP`) is never reported as
3716        // a spot margin position with the wrong instrument id or size
3717        // precision.
3718        //
3719        // When multiple spot pairs share the same base currency, prefer the
3720        // dominant OKX quote (USDT, then USDC, then USD) so a live
3721        // `BTC-USDT` margin position stays reported under `BTC-USDT.OKX`
3722        // rather than being redirected to `BTC-USD.OKX` or any other
3723        // lexically-earlier pair. Unknown quotes fall back to a stable
3724        // lexical order by symbol, matching OKX's own listing precedence
3725        // and keeping the selection deterministic across runs.
3726        let cache_snapshot = self.instruments_cache.load();
3727        let mut candidates: Vec<&InstrumentAny> = cache_snapshot
3728            .values()
3729            .filter(|inst| matches!(inst, InstrumentAny::CurrencyPair(_)))
3730            .collect();
3731        candidates.sort_by(|a, b| {
3732            let a_sym = a.id().symbol.as_str().to_string();
3733            let b_sym = b.id().symbol.as_str().to_string();
3734            spot_quote_priority(&a_sym)
3735                .cmp(&spot_quote_priority(&b_sym))
3736                .then_with(|| a_sym.cmp(&b_sym))
3737        });
3738
3739        let mut by_base: AHashMap<Ustr, (InstrumentId, u8)> = AHashMap::new();
3740
3741        for inst in candidates {
3742            if let Some(base) = inst.base_currency() {
3743                let base_code = Ustr::from(base.code.as_str());
3744                by_base
3745                    .entry(base_code)
3746                    .or_insert_with(|| (inst.id(), inst.size_precision()));
3747            }
3748        }
3749
3750        for account in accounts {
3751            for balance in account.details {
3752                let ccy_str = balance.ccy.as_str();
3753
3754                let Some((instrument_id, size_precision)) =
3755                    by_base.get(&Ustr::from(ccy_str)).copied()
3756                else {
3757                    log::debug!("Skipping balance for {ccy_str} - no matching instrument in cache");
3758                    continue;
3759                };
3760
3761                match parse_spot_margin_position_from_balance(
3762                    &balance,
3763                    account_id,
3764                    instrument_id,
3765                    size_precision,
3766                    ts_init,
3767                ) {
3768                    Ok(Some(report)) => reports.push(report),
3769                    Ok(None) => {} // No margin position for this currency
3770                    Err(e) => {
3771                        log::error!(
3772                            "Failed to parse spot margin position from balance for {ccy_str}: {e}"
3773                        );
3774                    }
3775                }
3776            }
3777        }
3778
3779        Ok(reports)
3780    }
3781
3782    /// Places a regular order via HTTP.
3783    ///
3784    /// # Errors
3785    ///
3786    /// Returns an error if the request fails.
3787    ///
3788    /// # References
3789    ///
3790    /// <https://www.okx.com/docs-v5/en/#order-book-trading-trade-post-place-order>
3791    pub async fn place_order(
3792        &self,
3793        request: OKXPlaceOrderRequest,
3794    ) -> Result<OKXPlaceOrderResponse, OKXHttpError> {
3795        let body =
3796            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3797
3798        let resp: Vec<OKXPlaceOrderResponse> = self
3799            .inner
3800            .send_request::<_, ()>(Method::POST, "/api/v5/trade/order", None, Some(body), true)
3801            .await?;
3802
3803        resp.into_iter()
3804            .next()
3805            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
3806    }
3807
3808    /// Places an algo order via HTTP.
3809    ///
3810    /// # Errors
3811    ///
3812    /// Returns an error if the request fails.
3813    ///
3814    /// # References
3815    ///
3816    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-place-algo-order>
3817    pub async fn place_algo_order(
3818        &self,
3819        request: OKXPlaceAlgoOrderRequest,
3820    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3821        let body =
3822            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3823
3824        let resp: Vec<OKXPlaceAlgoOrderResponse> = self
3825            .inner
3826            .send_request::<_, ()>(
3827                Method::POST,
3828                "/api/v5/trade/order-algo",
3829                None,
3830                Some(body),
3831                true,
3832            )
3833            .await?;
3834
3835        let item = resp
3836            .into_iter()
3837            .next()
3838            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))?;
3839
3840        if let Some(ref code) = item.s_code
3841            && code != "0"
3842        {
3843            let msg = item.s_msg.clone().unwrap_or_default();
3844            return Err(OKXHttpError::OkxError {
3845                error_code: code.clone(),
3846                message: msg,
3847            });
3848        }
3849
3850        Ok(item)
3851    }
3852
3853    /// Cancels an algo order via HTTP.
3854    ///
3855    /// # Errors
3856    ///
3857    /// Returns an error if the request fails.
3858    ///
3859    /// # References
3860    ///
3861    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
3862    pub async fn cancel_algo_order(
3863        &self,
3864        request: OKXCancelAlgoOrderRequest,
3865    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3866        // OKX expects an array for cancel-algos endpoint
3867        // Serialize once to bytes to keep signing and sending identical
3868        let body =
3869            serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3870
3871        let resp: Vec<OKXCancelAlgoOrderResponse> = self
3872            .inner
3873            .send_request::<_, ()>(
3874                Method::POST,
3875                "/api/v5/trade/cancel-algos",
3876                None,
3877                Some(body),
3878                true,
3879            )
3880            .await?;
3881
3882        let item = resp
3883            .into_iter()
3884            .next()
3885            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))?;
3886
3887        if let Some(ref code) = item.s_code
3888            && code != "0"
3889        {
3890            let msg = item.s_msg.clone().unwrap_or_default();
3891            return Err(OKXHttpError::OkxError {
3892                error_code: code.clone(),
3893                message: msg,
3894            });
3895        }
3896
3897        Ok(item)
3898    }
3899
3900    /// Cancels multiple algo orders via HTTP in a single request.
3901    ///
3902    /// Items with non-zero `sCode` are logged as warnings but do not
3903    /// fail the entire batch.
3904    ///
3905    /// # Errors
3906    ///
3907    /// Returns an error if the request fails.
3908    ///
3909    /// # References
3910    ///
3911    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-algo-order>
3912    pub async fn cancel_algo_orders(
3913        &self,
3914        requests: Vec<OKXCancelAlgoOrderRequest>,
3915    ) -> Result<Vec<OKXCancelAlgoOrderResponse>, OKXHttpError> {
3916        if requests.is_empty() {
3917            return Ok(Vec::new());
3918        }
3919
3920        let body =
3921            serde_json::to_vec(&requests).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3922
3923        let resp: Vec<OKXCancelAlgoOrderResponse> = self
3924            .inner
3925            .send_request::<_, ()>(
3926                Method::POST,
3927                "/api/v5/trade/cancel-algos",
3928                None,
3929                Some(body),
3930                true,
3931            )
3932            .await?;
3933
3934        for item in &resp {
3935            if let Some(ref code) = item.s_code
3936                && code != "0"
3937            {
3938                let msg = item.s_msg.as_deref().unwrap_or("");
3939                log::warn!(
3940                    "Algo cancel rejected: algo_id={} sCode={code} sMsg={msg}",
3941                    item.algo_id
3942                );
3943            }
3944        }
3945
3946        Ok(resp)
3947    }
3948
3949    /// Cancels advance algo orders (trailing stop, iceberg, TWAP) via HTTP.
3950    ///
3951    /// These order types cannot use the standard `cancel-algos` endpoint.
3952    /// Items with non-zero `sCode` are logged as warnings.
3953    ///
3954    /// # Errors
3955    ///
3956    /// Returns an error if the request fails.
3957    ///
3958    /// # References
3959    ///
3960    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-cancel-advance-algo-order>
3961    pub async fn cancel_advance_algo_orders(
3962        &self,
3963        requests: Vec<OKXCancelAlgoOrderRequest>,
3964    ) -> Result<Vec<OKXCancelAlgoOrderResponse>, OKXHttpError> {
3965        if requests.is_empty() {
3966            return Ok(Vec::new());
3967        }
3968
3969        let body =
3970            serde_json::to_vec(&requests).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3971
3972        let resp: Vec<OKXCancelAlgoOrderResponse> = self
3973            .inner
3974            .send_request::<_, ()>(
3975                Method::POST,
3976                "/api/v5/trade/cancel-advance-algos",
3977                None,
3978                Some(body),
3979                true,
3980            )
3981            .await?;
3982
3983        for item in &resp {
3984            if let Some(ref code) = item.s_code
3985                && code != "0"
3986            {
3987                let msg = item.s_msg.as_deref().unwrap_or("");
3988                log::warn!(
3989                    "Advance algo cancel rejected: algo_id={} sCode={code} sMsg={msg}",
3990                    item.algo_id
3991                );
3992            }
3993        }
3994
3995        Ok(resp)
3996    }
3997
3998    /// Amends an algo order via HTTP.
3999    ///
4000    /// # Errors
4001    ///
4002    /// Returns an error if the request fails.
4003    ///
4004    /// # References
4005    ///
4006    /// <https://www.okx.com/docs-v5/en/#order-book-trading-algo-trading-post-amend-algo-order>
4007    pub async fn amend_algo_order(
4008        &self,
4009        request: OKXAmendAlgoOrderRequest,
4010    ) -> Result<OKXAmendAlgoOrderResponse, OKXHttpError> {
4011        let body =
4012            serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
4013
4014        let resp: Vec<OKXAmendAlgoOrderResponse> = self
4015            .inner
4016            .send_request::<_, ()>(
4017                Method::POST,
4018                "/api/v5/trade/amend-algos",
4019                None,
4020                Some(body),
4021                true,
4022            )
4023            .await?;
4024
4025        resp.into_iter()
4026            .next()
4027            .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
4028    }
4029
4030    /// Amends an algo order using domain types.
4031    ///
4032    /// This is a convenience method that accepts Nautilus domain types
4033    /// and builds the appropriate OKX request structure internally.
4034    ///
4035    /// # Errors
4036    ///
4037    /// Returns an error if the request fails.
4038    #[expect(clippy::too_many_arguments)]
4039    pub async fn amend_algo_order_with_domain_types(
4040        &self,
4041        instrument_id: InstrumentId,
4042        algo_id: String,
4043        new_trigger_price: Option<Price>,
4044        new_limit_price: Option<Price>,
4045        new_quantity: Option<Quantity>,
4046        new_callback_ratio: Option<String>,
4047        new_callback_spread: Option<String>,
4048        new_activation_price: Option<Price>,
4049    ) -> Result<OKXAmendAlgoOrderResponse, OKXHttpError> {
4050        let request = OKXAmendAlgoOrderRequest {
4051            inst_id: instrument_id.symbol.as_str().to_string(),
4052            algo_id,
4053            algo_cl_ord_id: None,
4054            new_sz: new_quantity.map(|q| q.to_string()),
4055            new_trigger_px: new_trigger_price.map(|p| p.to_string()),
4056            new_order_px: new_limit_price.map(|p| p.to_string()),
4057            new_callback_ratio,
4058            new_callback_spread,
4059            new_active_px: new_activation_price.map(|p| p.to_string()),
4060        };
4061
4062        self.amend_algo_order(request).await
4063    }
4064
4065    /// Places an algo order using domain types.
4066    ///
4067    /// This is a convenience method that accepts Nautilus domain types
4068    /// and builds the appropriate OKX request structure internally.
4069    ///
4070    /// # Errors
4071    ///
4072    /// Returns an error if the request fails.
4073    #[expect(clippy::too_many_arguments)]
4074    pub async fn place_order_with_domain_types(
4075        &self,
4076        instrument_id: InstrumentId,
4077        td_mode: OKXTradeMode,
4078        client_order_id: ClientOrderId,
4079        order_side: OrderSide,
4080        order_type: OrderType,
4081        quantity: Quantity,
4082        time_in_force: Option<TimeInForce>,
4083        price: Option<Price>,
4084        post_only: Option<bool>,
4085        reduce_only: Option<bool>,
4086        quote_quantity: Option<bool>,
4087        position_side: Option<PositionSide>,
4088        attach_algo_ords: Option<Vec<OKXAttachAlgoOrdRequest>>,
4089        px_usd: Option<String>,
4090        px_vol: Option<String>,
4091    ) -> Result<OKXPlaceOrderResponse, OKXHttpError> {
4092        if !OKX_SUPPORTED_ORDER_TYPES.contains(&order_type) {
4093            return Err(OKXHttpError::ValidationError(format!(
4094                "Unsupported order type: {order_type:?}",
4095            )));
4096        }
4097
4098        if matches!(
4099            order_type,
4100            OrderType::StopMarket
4101                | OrderType::StopLimit
4102                | OrderType::MarketIfTouched
4103                | OrderType::LimitIfTouched
4104                | OrderType::TrailingStopMarket
4105        ) {
4106            return Err(OKXHttpError::ValidationError(
4107                "Conditional order types must use OKX algo order placement".to_string(),
4108            ));
4109        }
4110
4111        if let Some(tif) = time_in_force
4112            && !OKX_SUPPORTED_TIME_IN_FORCE.contains(&tif)
4113        {
4114            return Err(OKXHttpError::ValidationError(format!(
4115                "Unsupported time in force: {tif:?}",
4116            )));
4117        }
4118
4119        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
4120            return Err(OKXHttpError::ValidationError(
4121                "Invalid order side".to_string(),
4122            ));
4123        }
4124
4125        let instrument = self
4126            .instrument_from_cache(instrument_id.symbol.inner())
4127            .map_err(|e| OKXHttpError::ValidationError(e.to_string()))?;
4128        let instrument_type = okx_instrument_type(&instrument)
4129            .map_err(|e| OKXHttpError::ValidationError(e.to_string()))?;
4130
4131        // OKX options only support limit-style orders
4132        if instrument_type == OKXInstrumentType::Option
4133            && matches!(order_type, OrderType::Market | OrderType::MarketToLimit)
4134        {
4135            return Err(OKXHttpError::ValidationError(
4136                "Market orders are not supported for OKX options, use Limit orders instead"
4137                    .to_string(),
4138            ));
4139        }
4140
4141        let side = OKXSide::from(order_side.as_specified());
4142        let pos_side = position_side.map(Into::into).or({
4143            if matches!(
4144                instrument_type,
4145                OKXInstrumentType::Swap | OKXInstrumentType::Futures | OKXInstrumentType::Option
4146            ) {
4147                Some(OKXPositionSide::Net)
4148            } else {
4149                None
4150            }
4151        });
4152
4153        let tgt_ccy = if instrument_type == OKXInstrumentType::Spot
4154            && order_type == OrderType::Market
4155            && td_mode == OKXTradeMode::Cash
4156        {
4157            match quote_quantity {
4158                Some(true) => Some(OKXTargetCurrency::QuoteCcy),
4159                Some(false) if order_side == OrderSide::Buy => Some(OKXTargetCurrency::BaseCcy),
4160                _ => None,
4161            }
4162        } else {
4163            None
4164        };
4165
4166        let (ord_type, px) = if post_only.unwrap_or(false) {
4167            (OKXOrderType::PostOnly, price)
4168        } else if let Some(tif) = time_in_force {
4169            match (order_type, tif) {
4170                (OrderType::Market, TimeInForce::Fok) => {
4171                    return Err(OKXHttpError::ValidationError(
4172                        "Market orders with FOK time-in-force are not supported by OKX. Use Limit order with FOK instead.".to_string(),
4173                    ));
4174                }
4175                (OrderType::Market, TimeInForce::Ioc) => {
4176                    // optimal_limit_ioc only works for SWAP/FUTURES
4177                    if matches!(
4178                        instrument_type,
4179                        OKXInstrumentType::Spot | OKXInstrumentType::Option
4180                    ) {
4181                        (OKXOrderType::Market, price)
4182                    } else {
4183                        (OKXOrderType::OptimalLimitIoc, price)
4184                    }
4185                }
4186                (OrderType::Limit, TimeInForce::Fok) => {
4187                    // OKX uses op_fok for options FOK orders
4188                    if instrument_type == OKXInstrumentType::Option {
4189                        (OKXOrderType::OpFok, price)
4190                    } else {
4191                        (OKXOrderType::Fok, price)
4192                    }
4193                }
4194                (OrderType::Limit, TimeInForce::Ioc) => (OKXOrderType::Ioc, price),
4195                _ => (OKXOrderType::from(order_type), price),
4196            }
4197        } else {
4198            (OKXOrderType::from(order_type), price)
4199        };
4200
4201        // reduceOnly is not applicable to options per OKX docs
4202        let reduce_only = if instrument_type == OKXInstrumentType::Option {
4203            None
4204        } else {
4205            reduce_only
4206        };
4207
4208        // For options: pxUsd/pxVol are mutually exclusive with px
4209        let (px, px_usd, px_vol) = if px_usd.is_some() {
4210            (None, px_usd, None)
4211        } else if px_vol.is_some() {
4212            (None, None, px_vol)
4213        } else {
4214            (px.map(|p| p.to_string()), None, None)
4215        };
4216
4217        let request = OKXPlaceOrderRequest {
4218            inst_id: instrument_id.symbol.as_str().to_string(),
4219            td_mode,
4220            ccy: None,
4221            cl_ord_id: Some(client_order_id.as_str().to_string()),
4222            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
4223            side,
4224            pos_side,
4225            ord_type,
4226            sz: quantity.to_string(),
4227            px,
4228            px_usd,
4229            px_vol,
4230            reduce_only,
4231            tgt_ccy,
4232            attach_algo_ords,
4233        };
4234
4235        self.place_order(request).await
4236    }
4237
4238    /// Places an algo order using domain types.
4239    ///
4240    /// This is a convenience method that accepts Nautilus domain types
4241    /// and builds the appropriate OKX request structure internally.
4242    ///
4243    /// # Errors
4244    ///
4245    /// Returns an error if the request fails.
4246    #[expect(clippy::too_many_arguments)]
4247    pub async fn place_algo_order_with_domain_types(
4248        &self,
4249        instrument_id: InstrumentId,
4250        td_mode: OKXTradeMode,
4251        client_order_id: ClientOrderId,
4252        order_side: OrderSide,
4253        order_type: OrderType,
4254        quantity: Quantity,
4255        trigger_price: Option<Price>,
4256        trigger_type: Option<TriggerType>,
4257        limit_price: Option<Price>,
4258        reduce_only: Option<bool>,
4259        close_fraction: Option<String>,
4260        callback_ratio: Option<String>,
4261        callback_spread: Option<String>,
4262        activation_price: Option<Price>,
4263    ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
4264        if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
4265            return Err(OKXHttpError::ValidationError(
4266                "Invalid order side".to_string(),
4267            ));
4268        }
4269
4270        let okx_side = OKXSide::from(order_side.as_specified());
4271
4272        // Map trigger type to OKX format
4273        let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
4274
4275        let uses_close_fraction = close_fraction.is_some();
4276        let (
4277            algo_type,
4278            sz,
4279            trigger_px,
4280            order_px,
4281            trigger_px_type,
4282            sl_trigger_px,
4283            sl_ord_px,
4284            sl_trigger_px_type,
4285            tp_trigger_px,
4286            tp_ord_px,
4287            tp_trigger_px_type,
4288            pos_side,
4289            reduce_only,
4290        ) = if uses_close_fraction {
4291            if order_type == OrderType::TrailingStopMarket {
4292                return Err(OKXHttpError::ValidationError(
4293                    "OKX close_fraction does not support TrailingStopMarket".to_string(),
4294                ));
4295            }
4296
4297            let trigger_px = trigger_price.map(|p| p.to_string()).ok_or_else(|| {
4298                OKXHttpError::ValidationError(
4299                    "OKX close_fraction orders require trigger_price".to_string(),
4300                )
4301            })?;
4302
4303            let close_order_px =
4304                if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
4305                    limit_price.map(|p| p.to_string()).ok_or_else(|| {
4306                        OKXHttpError::ValidationError(format!(
4307                            "OKX {order_type:?} close_fraction orders require limit_price"
4308                        ))
4309                    })?
4310                } else {
4311                    "-1".to_string()
4312                };
4313
4314            let (
4315                sl_trigger_px,
4316                sl_ord_px,
4317                sl_trigger_px_type,
4318                tp_trigger_px,
4319                tp_ord_px,
4320                tp_trigger_px_type,
4321            ) = match order_type {
4322                OrderType::StopMarket | OrderType::StopLimit => (
4323                    Some(trigger_px),
4324                    Some(close_order_px),
4325                    Some(trigger_px_type_enum),
4326                    None,
4327                    None,
4328                    None,
4329                ),
4330                OrderType::MarketIfTouched | OrderType::LimitIfTouched => (
4331                    None,
4332                    None,
4333                    None,
4334                    Some(trigger_px),
4335                    Some(close_order_px),
4336                    Some(trigger_px_type_enum),
4337                ),
4338                _ => {
4339                    return Err(OKXHttpError::ValidationError(format!(
4340                        "OKX close_fraction is only supported for stop/touched conditional orders, received {order_type:?}"
4341                    )));
4342                }
4343            };
4344
4345            (
4346                OKXAlgoOrderType::Conditional,
4347                None,
4348                None,
4349                None,
4350                None,
4351                sl_trigger_px,
4352                sl_ord_px,
4353                sl_trigger_px_type,
4354                tp_trigger_px,
4355                tp_ord_px,
4356                tp_trigger_px_type,
4357                Some(OKXPositionSide::Net),
4358                Some(true),
4359            )
4360        } else {
4361            let algo_type = conditional_order_to_algo_type(order_type)
4362                .map_err(|e| OKXHttpError::ValidationError(e.to_string()))?;
4363
4364            let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched)
4365            {
4366                limit_price.map(|p| p.to_string())
4367            } else if order_type == OrderType::TrailingStopMarket {
4368                None
4369            } else {
4370                Some("-1".to_string())
4371            };
4372
4373            (
4374                algo_type,
4375                Some(quantity.to_string()),
4376                trigger_price.map(|p| p.to_string()),
4377                order_px,
4378                Some(trigger_px_type_enum),
4379                None,
4380                None,
4381                None,
4382                None,
4383                None,
4384                None,
4385                None,
4386                reduce_only,
4387            )
4388        };
4389
4390        let request = OKXPlaceAlgoOrderRequest {
4391            inst_id: instrument_id.symbol.as_str().to_string(),
4392            inst_id_code: None,
4393            td_mode,
4394            side: okx_side,
4395            ord_type: algo_type,
4396            sz,
4397            algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
4398            trigger_px,
4399            order_px,
4400            trigger_px_type,
4401            sl_trigger_px,
4402            sl_ord_px,
4403            sl_trigger_px_type,
4404            tp_trigger_px,
4405            tp_ord_px,
4406            tp_trigger_px_type,
4407            tgt_ccy: None,
4408            pos_side,
4409            close_position: None,
4410            tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
4411            reduce_only,
4412            close_fraction,
4413            callback_ratio,
4414            callback_spread,
4415            active_px: activation_price.map(|p| p.to_string()),
4416        };
4417
4418        self.place_algo_order(request).await
4419    }
4420
4421    /// Cancels an algo order using domain types.
4422    ///
4423    /// This is a convenience method that accepts Nautilus domain types
4424    /// and builds the appropriate OKX request structure internally.
4425    ///
4426    /// # Errors
4427    ///
4428    /// Returns an error if the request fails.
4429    pub async fn cancel_algo_order_with_domain_types(
4430        &self,
4431        instrument_id: InstrumentId,
4432        algo_id: String,
4433    ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
4434        let request = OKXCancelAlgoOrderRequest {
4435            inst_id: instrument_id.symbol.to_string(),
4436            inst_id_code: None,
4437            algo_id: Some(algo_id),
4438            algo_cl_ord_id: None,
4439        };
4440
4441        self.cancel_algo_order(request).await
4442    }
4443
4444    /// Requests algo order status reports.
4445    ///
4446    /// # Errors
4447    ///
4448    /// Returns an error if the request fails.
4449    #[expect(clippy::too_many_arguments)]
4450    pub async fn request_algo_order_status_reports(
4451        &self,
4452        account_id: AccountId,
4453        instrument_type: Option<OKXInstrumentType>,
4454        instrument_id: Option<InstrumentId>,
4455        algo_id: Option<String>,
4456        algo_client_order_id: Option<ClientOrderId>,
4457        state: Option<OKXOrderStatus>,
4458        limit: Option<u32>,
4459    ) -> anyhow::Result<Vec<OrderStatusReport>> {
4460        let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
4461        let has_specific_lookup = algo_id.is_some() || algo_client_order_id.is_some();
4462
4463        let inst_type = if let Some(inst_type) = instrument_type {
4464            inst_type
4465        } else if let Some(inst_id) = instrument_id {
4466            let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
4467            let inst_type = okx_instrument_type(&instrument)?;
4468            instruments_cache.insert(inst_id.symbol.inner(), instrument);
4469            inst_type
4470        } else {
4471            anyhow::bail!("instrument_type or instrument_id required for algo order query")
4472        };
4473
4474        let ts_init = self.generate_ts_init();
4475        let mut reports = Vec::new();
4476        let mut seen: AHashSet<(String, String)> = AHashSet::new();
4477
4478        for ord_type in [
4479            OKXAlgoOrderType::Oco,
4480            OKXAlgoOrderType::Conditional,
4481            OKXAlgoOrderType::Trigger,
4482            OKXAlgoOrderType::MoveOrderStop,
4483        ] {
4484            let mut params_builder = GetAlgoOrdersParamsBuilder::default();
4485            params_builder.inst_type(inst_type);
4486            params_builder.ord_type(ord_type);
4487
4488            if let Some(inst_id) = instrument_id {
4489                params_builder.inst_id(inst_id.symbol.inner().to_string());
4490            }
4491
4492            if let Some(algo_id) = algo_id.as_ref() {
4493                params_builder.algo_id(algo_id.clone());
4494            }
4495
4496            if let Some(client_order_id) = algo_client_order_id.as_ref() {
4497                params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
4498            }
4499
4500            if let Some(state) = state {
4501                params_builder.state(state);
4502            }
4503
4504            let params = params_builder
4505                .build()
4506                .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
4507
4508            let remaining = limit.map(|l| (l as usize).saturating_sub(reports.len()));
4509            let pending = self.paginate_algo_pending(&params, remaining).await?;
4510            self.collect_algo_reports(
4511                account_id,
4512                &pending,
4513                &mut instruments_cache,
4514                ts_init,
4515                &mut seen,
4516                &mut reports,
4517            )
4518            .await?;
4519
4520            if has_specific_lookup && !reports.is_empty() {
4521                return Ok(reports);
4522            }
4523
4524            if let Some(lim) = limit
4525                && reports.len() >= lim as usize
4526            {
4527                reports.truncate(lim as usize);
4528                return Ok(reports);
4529            }
4530
4531            // OKX's `/orders-algo-history` endpoint rejects calls that
4532            // carry neither a `state` nor an `algoId` / `algoClOrdId`
4533            // narrowing with code 50015. The reconciliation path wants
4534            // only currently-live algo orders (those already appear in
4535            // the pending response above), so skip the history leg when
4536            // the caller supplied no narrowing. Specific-lookup callers
4537            // still hit history because `has_specific_lookup` implies
4538            // `algoId` or `algoClOrdId`, which the endpoint accepts.
4539            if state.is_some() || has_specific_lookup {
4540                let remaining = limit.map(|l| (l as usize).saturating_sub(reports.len()));
4541                let history = self.paginate_algo_history(&params, remaining).await?;
4542                self.collect_algo_reports(
4543                    account_id,
4544                    &history,
4545                    &mut instruments_cache,
4546                    ts_init,
4547                    &mut seen,
4548                    &mut reports,
4549                )
4550                .await?;
4551
4552                if has_specific_lookup && !reports.is_empty() {
4553                    return Ok(reports);
4554                }
4555
4556                if let Some(lim) = limit
4557                    && reports.len() >= lim as usize
4558                {
4559                    reports.truncate(lim as usize);
4560                    return Ok(reports);
4561                }
4562            }
4563        }
4564
4565        Ok(reports)
4566    }
4567
4568    /// Requests an algo order status report by client order identifier.
4569    ///
4570    /// # Errors
4571    ///
4572    /// Returns an error if the request fails.
4573    pub async fn request_algo_order_status_report(
4574        &self,
4575        account_id: AccountId,
4576        instrument_id: InstrumentId,
4577        algo_client_order_id: ClientOrderId,
4578    ) -> anyhow::Result<Option<OrderStatusReport>> {
4579        let reports = self
4580            .request_algo_order_status_reports(
4581                account_id,
4582                None,
4583                Some(instrument_id),
4584                None,
4585                Some(algo_client_order_id),
4586                None,
4587                Some(50_u32),
4588            )
4589            .await?;
4590
4591        Ok(reports.into_iter().next())
4592    }
4593
4594    /// Exposes raw HTTP client for testing purposes
4595    pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
4596        &self.inner
4597    }
4598
4599    async fn collect_algo_reports(
4600        &self,
4601        account_id: AccountId,
4602        orders: &[OKXOrderAlgo],
4603        instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
4604        ts_init: UnixNanos,
4605        seen: &mut AHashSet<(String, String)>,
4606        reports: &mut Vec<OrderStatusReport>,
4607    ) -> anyhow::Result<()> {
4608        for order in orders {
4609            let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
4610            if !seen.insert(key) {
4611                continue;
4612            }
4613
4614            let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
4615                instrument.clone()
4616            } else {
4617                let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
4618                    log::debug!(
4619                        "Skipping algo order report for instrument not in cache: symbol={}",
4620                        order.inst_id,
4621                    );
4622                    continue;
4623                };
4624                instruments_cache.insert(order.inst_id, instrument.clone());
4625                instrument
4626            };
4627
4628            match parse_http_algo_order(order, account_id, &instrument, ts_init) {
4629                Ok(report) => reports.push(report),
4630                Err(e) => {
4631                    log::error!("Failed to parse algo order report: {e}");
4632                }
4633            }
4634        }
4635
4636        Ok(())
4637    }
4638}
4639
4640fn parse_http_algo_order(
4641    order: &OKXOrderAlgo,
4642    account_id: AccountId,
4643    instrument: &InstrumentAny,
4644    ts_init: UnixNanos,
4645) -> anyhow::Result<OrderStatusReport> {
4646    let ord_px = if order.ord_px.is_empty() {
4647        "-1".to_string()
4648    } else {
4649        order.ord_px.clone()
4650    };
4651
4652    let reduce_only = if order.reduce_only.is_empty() {
4653        "false".to_string()
4654    } else {
4655        order.reduce_only.clone()
4656    };
4657
4658    let msg = OKXAlgoOrderMsg {
4659        algo_id: order.algo_id.clone(),
4660        algo_cl_ord_id: order.algo_cl_ord_id.clone(),
4661        cl_ord_id: order.cl_ord_id.clone(),
4662        ord_id: order.ord_id.clone(),
4663        inst_id: order.inst_id,
4664        inst_type: order.inst_type,
4665        ord_type: order.ord_type,
4666        state: order.state,
4667        side: order.side,
4668        pos_side: order.pos_side,
4669        sz: order.sz.clone(),
4670        trigger_px: order.trigger_px.clone(),
4671        trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
4672        sl_trigger_px: order.sl_trigger_px.clone(),
4673        sl_ord_px: order.sl_ord_px.clone(),
4674        sl_trigger_px_type: order.sl_trigger_px_type.unwrap_or(OKXTriggerType::None),
4675        tp_trigger_px: order.tp_trigger_px.clone(),
4676        tp_ord_px: order.tp_ord_px.clone(),
4677        tp_trigger_px_type: order.tp_trigger_px_type.unwrap_or(OKXTriggerType::None),
4678        ord_px,
4679        td_mode: order.td_mode,
4680        lever: order.lever.clone(),
4681        reduce_only,
4682        close_fraction: order.close_fraction.clone(),
4683        actual_px: order.actual_px.clone(),
4684        actual_sz: order.actual_sz.clone(),
4685        notional_usd: order.notional_usd.clone(),
4686        c_time: order.c_time,
4687        u_time: order.u_time,
4688        trigger_time: order.trigger_time.clone(),
4689        tag: order.tag.clone(),
4690        callback_ratio: order.callback_ratio.clone(),
4691        callback_spread: order.callback_spread.clone(),
4692        active_px: order.active_px.clone(),
4693        ccy: None,
4694        tgt_ccy: None,
4695        fee: None,
4696        fee_ccy: None,
4697        advance_ord_type: None,
4698    };
4699
4700    parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
4701}