1use std::{
37 collections::HashMap,
38 fmt::Debug,
39 num::NonZeroU32,
40 str::FromStr,
41 sync::{
42 Arc, LazyLock,
43 atomic::{AtomicBool, Ordering},
44 },
45};
46
47use ahash::{AHashMap, AHashSet};
48use anyhow::Context;
49use chrono::{DateTime, Utc};
50use nautilus_core::{
51 AtomicMap, AtomicTime, UnixNanos, consts::NAUTILUS_USER_AGENT,
52 datetime::NANOSECONDS_IN_MILLISECOND, env::get_or_env_var, string::secret::REDACTED,
53 time::get_atomic_clock_realtime,
54};
55use nautilus_model::{
56 data::{
57 Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
58 OrderBookDelta, OrderBookDeltas, TradeTick, forward::ForwardPrice,
59 },
60 enums::{
61 AggregationSource, BarAggregation, BookAction, BookType, OrderSide, OrderType,
62 PositionSide, RecordFlag, TimeInForce, TriggerType,
63 },
64 events::AccountState,
65 identifiers::{AccountId, ClientOrderId, InstrumentId},
66 instruments::{Instrument, InstrumentAny},
67 orderbook::OrderBook,
68 reports::{FillReport, OrderStatusReport, PositionStatusReport},
69 types::{Price, Quantity},
70};
71use nautilus_network::{
72 http::{HttpClient, Method, StatusCode, USER_AGENT},
73 ratelimiter::quota::Quota,
74 retry::{RetryConfig, RetryManager},
75};
76use rust_decimal::Decimal;
77use serde::{Deserialize, Serialize, de::DeserializeOwned};
78use tokio_util::sync::CancellationToken;
79use ustr::Ustr;
80
81use super::{
82 error::OKXHttpError,
83 models::{
84 OKXAccount, OKXAmendAlgoOrderRequest, OKXAmendAlgoOrderResponse, OKXAttachAlgoOrdRequest,
85 OKXCancelAlgoOrderRequest, OKXCancelAlgoOrderResponse, OKXFeeRate, OKXFundingRateHistory,
86 OKXIndexTicker, OKXMarkPrice, OKXOptionSummary, OKXOrderAlgo, OKXOrderBookSnapshot,
87 OKXOrderHistory, OKXPlaceAlgoOrderRequest, OKXPlaceAlgoOrderResponse, OKXPlaceOrderRequest,
88 OKXPlaceOrderResponse, OKXPosition, OKXPositionHistory, OKXPositionTier, OKXServerTime,
89 OKXTransactionDetail,
90 },
91 query::{
92 GetAlgoOrdersParams, GetAlgoOrdersParamsBuilder, GetCandlesticksParams,
93 GetCandlesticksParamsBuilder, GetFundingRateHistoryParams, GetIndexTickerParams,
94 GetIndexTickerParamsBuilder, GetInstrumentsParams, GetInstrumentsParamsBuilder,
95 GetMarkPriceParams, GetMarkPriceParamsBuilder, GetOptionSummaryParams, GetOrderBookParams,
96 GetOrderHistoryParams, GetOrderHistoryParamsBuilder, GetOrderListParams,
97 GetOrderListParamsBuilder, GetPositionTiersParams, GetPositionsHistoryParams,
98 GetPositionsParams, GetPositionsParamsBuilder, GetTradeFeeParams, GetTradesParams,
99 GetTradesParamsBuilder, GetTransactionDetailsParams, GetTransactionDetailsParamsBuilder,
100 SetPositionModeParams, SetPositionModeParamsBuilder,
101 },
102};
103use crate::{
104 common::{
105 consts::{
106 OKX_FIELD_SCODE, OKX_FIELD_SMSG, OKX_HTTP_URL, OKX_NAUTILUS_BROKER_ID,
107 OKX_SUPPORTED_ORDER_TYPES, OKX_SUPPORTED_TIME_IN_FORCE, should_retry_error_code,
108 },
109 credential::Credential,
110 enums::{
111 OKXAlgoOrderType, OKXContractType, OKXEnvironment, OKXInstrumentStatus,
112 OKXInstrumentType, OKXOrderStatus, OKXOrderType, OKXPositionMode, OKXPositionSide,
113 OKXSide, OKXTargetCurrency, OKXTradeMode, OKXTriggerType,
114 conditional_order_to_algo_type,
115 },
116 models::OKXInstrument,
117 parse::{
118 extract_inst_family, okx_instrument_type, okx_instrument_type_from_symbol,
119 parse_account_state, parse_base_quote_from_symbol, parse_candlestick,
120 parse_fill_report, parse_funding_rate, parse_index_price_update, parse_instrument_any,
121 parse_instrument_id, parse_mark_price_update, parse_order_status_report,
122 parse_position_status_report, parse_price, parse_quantity,
123 parse_spot_margin_position_from_balance, parse_trade_tick,
124 },
125 },
126 http::{
127 models::{OKXCandlestick, OKXTrade},
128 query::GetOrderParams,
129 },
130 websocket::{messages::OKXAlgoOrderMsg, parse::parse_algo_order_status_report},
131};
132
133const OKX_SUCCESS_CODE: &str = "0";
134
135fn spot_quote_priority(symbol: &str) -> u8 {
140 symbol.rsplit_once('-').map_or(4, |(_, quote)| match quote {
141 "USDT" => 0,
142 "USDC" => 1,
143 "USD" => 2,
144 _ => 3,
145 })
146}
147
148fn resolve_okx_error_message(response_body: &[u8], top_level_msg: &str) -> String {
149 let message = top_level_msg.trim();
150 let is_generic_top_level = message.eq_ignore_ascii_case("All operations failed");
151 if !message.is_empty() && !is_generic_top_level {
152 return message.to_string();
153 }
154
155 if let Ok(payload) = serde_json::from_slice::<serde_json::Value>(response_body)
156 && let Some(first_item) = payload
157 .get("data")
158 .and_then(serde_json::Value::as_array)
159 .and_then(|items| items.first())
160 {
161 if let Some(s_msg) = first_item
162 .get(OKX_FIELD_SMSG)
163 .and_then(serde_json::Value::as_str)
164 {
165 let s_msg = s_msg.trim();
166 if !s_msg.is_empty() {
167 return s_msg.to_string();
168 }
169 }
170
171 if let Some(s_code) = first_item
172 .get(OKX_FIELD_SCODE)
173 .and_then(serde_json::Value::as_str)
174 {
175 let s_code = s_code.trim();
176 if !s_code.is_empty() {
177 return s_code.to_string();
178 }
179 }
180 }
181
182 String::new()
183}
184
185#[cfg(test)]
186mod tests {
187 use rstest::rstest;
188
189 use super::resolve_okx_error_message;
190
191 #[rstest]
192 fn test_resolve_okx_error_message_prefers_detailed_s_msg_over_generic_top_level() {
193 let body = br#"{
194 "code": "1",
195 "msg": "All operations failed",
196 "data": [
197 {
198 "sCode": "51046",
199 "sMsg": "Test detailed failure"
200 }
201 ]
202 }"#;
203
204 assert_eq!(
205 resolve_okx_error_message(body, "All operations failed"),
206 "Test detailed failure",
207 );
208 }
209
210 #[rstest]
211 #[case("BTC-USD")]
212 #[case("BTC-USD-241217")]
213 #[case("BTC-USD-241217-92000")]
214 fn test_option_summary_expiry_key_rejects_short_symbol(#[case] symbol: &str) {
215 let result = super::OKXHttpClient::option_summary_expiry_key(symbol);
216 assert!(result.is_err());
217 let err = result.unwrap_err().to_string();
218 assert!(
219 err.contains("Expected OKX option symbol with expiry"),
220 "unexpected error: {err}"
221 );
222 }
223
224 #[rstest]
225 fn test_option_summary_expiry_key_extracts_base_quote_expiry() {
226 let result =
227 super::OKXHttpClient::option_summary_expiry_key("BTC-USD-241217-92000-C").unwrap();
228 assert_eq!(result, "BTC-USD-241217");
229 }
230
231 #[rstest]
232 #[case("BTC-USD")]
233 #[case("BTC-USD-241217")]
234 #[case("BTC-USD-241217-92000")]
235 fn test_option_summary_exp_time_rejects_short_symbol(#[case] symbol: &str) {
236 let result = super::OKXHttpClient::option_summary_exp_time(symbol);
237 assert!(result.is_err());
238 let err = result.unwrap_err().to_string();
239 assert!(
240 err.contains("Expected OKX option symbol with expiry"),
241 "unexpected error: {err}"
242 );
243 }
244
245 #[rstest]
246 fn test_option_summary_exp_time_extracts_expiry() {
247 let result =
248 super::OKXHttpClient::option_summary_exp_time("BTC-USD-241217-92000-C").unwrap();
249 assert_eq!(result, Some("241217".to_string()));
250 }
251}
252
253pub static OKX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
262 Quota::per_second(NonZeroU32::new(250).expect("non-zero")).expect("valid constant")
263});
264
265const OKX_GLOBAL_RATE_KEY: &str = "okx:global";
266
267const OKX_PAGE_SIZE: usize = 100;
269
270const MAX_RECONCILIATION_PAGES: usize = 50;
272
273#[derive(Debug, Serialize, Deserialize)]
275pub struct OKXResponse<T> {
276 pub code: String,
278 pub msg: String,
280 pub data: Vec<T>,
282}
283
284pub struct OKXRawHttpClient {
290 base_url: String,
291 client: HttpClient,
292 credential: Option<Credential>,
293 retry_manager: RetryManager<OKXHttpError>,
294 cancellation_token: CancellationToken,
295 environment: OKXEnvironment,
296}
297
298impl Default for OKXRawHttpClient {
299 fn default() -> Self {
300 Self::new(None, 60, 3, 1000, 10_000, OKXEnvironment::Live, None)
301 .expect("Failed to create default OKXRawHttpClient")
302 }
303}
304
305impl Debug for OKXRawHttpClient {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 let credential = self.credential.as_ref().map(|_| REDACTED);
308 f.debug_struct(stringify!(OKXRawHttpClient))
309 .field("base_url", &self.base_url)
310 .field("credential", &credential)
311 .finish_non_exhaustive()
312 }
313}
314
315impl OKXRawHttpClient {
316 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
317 vec![
318 (OKX_GLOBAL_RATE_KEY.to_string(), *OKX_REST_QUOTA),
319 (
320 "okx:/api/v5/account/balance".to_string(),
321 Quota::per_second(NonZeroU32::new(5).expect("non-zero")).expect("valid constant"),
322 ),
323 (
324 "okx:/api/v5/public/instruments".to_string(),
325 Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
326 ),
327 (
328 "okx:/api/v5/market/candles".to_string(),
329 Quota::per_second(NonZeroU32::new(50).expect("non-zero")).expect("valid constant"),
330 ),
331 (
332 "okx:/api/v5/market/history-candles".to_string(),
333 Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant"),
334 ),
335 (
336 "okx:/api/v5/market/history-trades".to_string(),
337 Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant"),
338 ),
339 (
340 "okx:/api/v5/trade/order".to_string(),
341 Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant"), ),
343 (
344 "okx:/api/v5/trade/orders-pending".to_string(),
345 Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant"),
346 ),
347 (
348 "okx:/api/v5/trade/orders-history".to_string(),
349 Quota::per_second(NonZeroU32::new(20).expect("non-zero")).expect("valid constant"),
350 ),
351 (
352 "okx:/api/v5/trade/fills".to_string(),
353 Quota::per_second(NonZeroU32::new(30).expect("non-zero")).expect("valid constant"),
354 ),
355 (
356 "okx:/api/v5/trade/order-algo".to_string(),
357 Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
358 ),
359 (
360 "okx:/api/v5/trade/cancel-algos".to_string(),
361 Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
362 ),
363 (
364 "okx:/api/v5/trade/amend-algos".to_string(),
365 Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant"),
366 ),
367 ]
368 }
369
370 fn rate_limit_keys(endpoint: &str) -> Vec<Ustr> {
371 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
372 let route = format!("okx:{normalized}");
373
374 vec![Ustr::from(OKX_GLOBAL_RATE_KEY), Ustr::from(route.as_str())]
375 }
376
377 pub fn cancel_all_requests(&self) {
379 self.cancellation_token.cancel();
380 }
381
382 pub fn cancellation_token(&self) -> &CancellationToken {
384 &self.cancellation_token
385 }
386
387 pub fn new(
397 base_url: Option<String>,
398 timeout_secs: u64,
399 max_retries: u32,
400 retry_delay_ms: u64,
401 retry_delay_max_ms: u64,
402 environment: OKXEnvironment,
403 proxy_url: Option<String>,
404 ) -> Result<Self, OKXHttpError> {
405 let retry_config = RetryConfig {
406 max_retries,
407 initial_delay_ms: retry_delay_ms,
408 max_delay_ms: retry_delay_max_ms,
409 backoff_factor: 2.0,
410 jitter_ms: 1000,
411 operation_timeout_ms: Some(60_000),
412 immediate_first: false,
413 max_elapsed_ms: Some(180_000),
414 };
415
416 let retry_manager = RetryManager::new(retry_config);
417
418 Ok(Self {
419 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
420 client: HttpClient::new(
421 Self::default_headers(environment),
422 vec![],
423 Self::rate_limiter_quotas(),
424 Some(*OKX_REST_QUOTA),
425 Some(timeout_secs),
426 proxy_url,
427 )
428 .map_err(|e| {
429 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
430 })?,
431 credential: None,
432 retry_manager,
433 cancellation_token: CancellationToken::new(),
434 environment,
435 })
436 }
437
438 #[expect(clippy::too_many_arguments)]
445 pub fn with_credentials(
446 api_key: String,
447 api_secret: String,
448 api_passphrase: String,
449 base_url: String,
450 timeout_secs: u64,
451 max_retries: u32,
452 retry_delay_ms: u64,
453 retry_delay_max_ms: u64,
454 environment: OKXEnvironment,
455 proxy_url: Option<String>,
456 ) -> Result<Self, OKXHttpError> {
457 let retry_config = RetryConfig {
458 max_retries,
459 initial_delay_ms: retry_delay_ms,
460 max_delay_ms: retry_delay_max_ms,
461 backoff_factor: 2.0,
462 jitter_ms: 1000,
463 operation_timeout_ms: Some(60_000),
464 immediate_first: false,
465 max_elapsed_ms: Some(180_000),
466 };
467
468 let retry_manager = RetryManager::new(retry_config);
469
470 Ok(Self {
471 base_url,
472 client: HttpClient::new(
473 Self::default_headers(environment),
474 vec![],
475 Self::rate_limiter_quotas(),
476 Some(*OKX_REST_QUOTA),
477 Some(timeout_secs),
478 proxy_url,
479 )
480 .map_err(|e| {
481 OKXHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
482 })?,
483 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
484 retry_manager,
485 cancellation_token: CancellationToken::new(),
486 environment,
487 })
488 }
489
490 fn default_headers(environment: OKXEnvironment) -> HashMap<String, String> {
492 let mut headers =
493 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())]);
494
495 if environment == OKXEnvironment::Demo {
496 headers.insert("x-simulated-trading".to_string(), "1".to_string());
497 }
498
499 headers
500 }
501
502 fn sign_request(
509 &self,
510 method: &Method,
511 path: &str,
512 body: Option<&[u8]>,
513 ) -> Result<HashMap<String, String>, OKXHttpError> {
514 let credential = match self.credential.as_ref() {
515 Some(c) => c,
516 None => return Err(OKXHttpError::MissingCredentials),
517 };
518
519 let api_key = credential.api_key().to_string();
520 let api_passphrase = credential.api_passphrase().to_string();
521
522 let now = Utc::now();
524 let millis = now.timestamp_subsec_millis();
525 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string() + &format!(".{millis:03}Z");
526 let signature = credential.sign_bytes(×tamp, method.as_str(), path, body);
527
528 let mut headers = HashMap::new();
529 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
530 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
531 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
532 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
533
534 Ok(headers)
535 }
536
537 async fn send_request<T: DeserializeOwned, P: Serialize>(
553 &self,
554 method: Method,
555 path: &str,
556 params: Option<&P>,
557 body: Option<Vec<u8>>,
558 authenticate: bool,
559 ) -> Result<Vec<T>, OKXHttpError> {
560 let url = format!("{}{path}", self.base_url);
561
562 let rate_keys: Vec<String> = Self::rate_limit_keys(path)
564 .into_iter()
565 .map(|k| k.to_string())
566 .collect();
567
568 let operation = || {
569 let url = url.clone();
570 let method = method.clone();
571 let body = body.clone();
572 let rate_keys = rate_keys.clone();
573
574 async move {
575 let query_string = if let Some(p) = params {
577 serde_urlencoded::to_string(p).map_err(|e| {
578 OKXHttpError::JsonError(format!("Failed to serialize params: {e}"))
579 })?
580 } else {
581 String::new()
582 };
583
584 let full_path = if query_string.is_empty() {
586 path.to_string()
587 } else {
588 format!("{path}?{query_string}")
589 };
590
591 let mut headers = if authenticate {
592 self.sign_request(&method, &full_path, body.as_deref())?
593 } else {
594 HashMap::new()
595 };
596
597 if body.is_some() {
599 headers.insert("Content-Type".to_string(), "application/json".to_string());
600 }
601
602 let resp = self
603 .client
604 .request_with_params(
605 method.clone(),
606 url,
607 params,
608 Some(headers),
609 body,
610 None,
611 Some(rate_keys),
612 )
613 .await?;
614
615 log::trace!("Response: {resp:?}");
616
617 if resp.status.is_success() {
618 let okx_response: OKXResponse<T> =
619 serde_json::from_slice(&resp.body).map_err(|e| {
620 log::error!("Failed to deserialize OKXResponse: {e}");
621 OKXHttpError::JsonError(e.to_string())
622 })?;
623
624 if okx_response.code != OKX_SUCCESS_CODE {
625 return Err(OKXHttpError::OkxError {
626 error_code: okx_response.code,
627 message: resolve_okx_error_message(&resp.body, &okx_response.msg),
628 });
629 }
630
631 Ok(okx_response.data)
632 } else {
633 let error_body = String::from_utf8_lossy(&resp.body);
634 if resp.status.as_u16() == StatusCode::NOT_FOUND.as_u16() {
635 log::debug!("HTTP 404 with body: {error_body}");
636 } else {
637 log::error!(
638 "HTTP error {} with body: {error_body}",
639 resp.status.as_str()
640 );
641 }
642
643 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
644 return Err(OKXHttpError::OkxError {
645 error_code: parsed_error.code,
646 message: resolve_okx_error_message(&resp.body, &parsed_error.msg),
647 });
648 }
649
650 Err(OKXHttpError::UnexpectedStatus {
651 status: StatusCode::from_u16(resp.status.as_u16())
654 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
655 body: error_body.to_string(),
656 })
657 }
658 }
659 };
660
661 let should_retry = |error: &OKXHttpError| -> bool {
670 match error {
671 OKXHttpError::HttpClientError(_) => true,
672 OKXHttpError::UnexpectedStatus { status, .. } => {
673 status.as_u16() >= 500 || status.as_u16() == 429
674 }
675 OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
676 _ => false,
677 }
678 };
679
680 let create_error = |msg: String| -> OKXHttpError {
681 if msg == "canceled" {
682 OKXHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
683 } else {
684 OKXHttpError::ValidationError(msg)
685 }
686 };
687
688 self.retry_manager
689 .execute_with_retry_with_cancel(
690 path,
691 operation,
692 should_retry,
693 create_error,
694 &self.cancellation_token,
695 )
696 .await
697 }
698
699 pub async fn set_position_mode(
710 &self,
711 params: SetPositionModeParams,
712 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
713 let path = "/api/v5/account/set-position-mode";
714 let body = serde_json::to_vec(¶ms)?;
715 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
716 .await
717 }
718
719 pub async fn get_position_tiers(
730 &self,
731 params: GetPositionTiersParams,
732 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
733 self.send_request(
734 Method::GET,
735 "/api/v5/public/position-tiers",
736 Some(¶ms),
737 None,
738 false,
739 )
740 .await
741 }
742
743 pub async fn get_instruments(
754 &self,
755 params: GetInstrumentsParams,
756 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
757 self.send_request(
758 Method::GET,
759 "/api/v5/public/instruments",
760 Some(¶ms),
761 None,
762 false,
763 )
764 .await
765 }
766
767 pub async fn get_option_summary(
777 &self,
778 params: GetOptionSummaryParams,
779 ) -> Result<Vec<OKXOptionSummary>, OKXHttpError> {
780 self.send_request(
781 Method::GET,
782 "/api/v5/public/opt-summary",
783 Some(¶ms),
784 None,
785 false,
786 )
787 .await
788 }
789
790 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
804 let response: Vec<OKXServerTime> = self
805 .send_request::<_, ()>(Method::GET, "/api/v5/public/time", None, None, false)
806 .await?;
807 response
808 .first()
809 .map(|t| t.ts)
810 .ok_or_else(|| OKXHttpError::JsonError("Empty server time response".to_string()))
811 }
812
813 pub async fn get_mark_price(
827 &self,
828 params: GetMarkPriceParams,
829 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
830 self.send_request(
831 Method::GET,
832 "/api/v5/public/mark-price",
833 Some(¶ms),
834 None,
835 false,
836 )
837 .await
838 }
839
840 pub async fn get_index_tickers(
850 &self,
851 params: GetIndexTickerParams,
852 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
853 self.send_request(
854 Method::GET,
855 "/api/v5/market/index-tickers",
856 Some(¶ms),
857 None,
858 false,
859 )
860 .await
861 }
862
863 pub async fn get_history_trades(
873 &self,
874 params: GetTradesParams,
875 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
876 self.send_request(
877 Method::GET,
878 "/api/v5/market/history-trades",
879 Some(¶ms),
880 None,
881 false,
882 )
883 .await
884 }
885
886 pub async fn get_order_book(
896 &self,
897 params: GetOrderBookParams,
898 ) -> Result<Vec<OKXOrderBookSnapshot>, OKXHttpError> {
899 self.send_request(
900 Method::GET,
901 "/api/v5/market/books",
902 Some(¶ms),
903 None,
904 false,
905 )
906 .await
907 }
908
909 pub async fn get_funding_rate_history(
919 &self,
920 params: GetFundingRateHistoryParams,
921 ) -> Result<Vec<OKXFundingRateHistory>, OKXHttpError> {
922 self.send_request(
923 Method::GET,
924 "/api/v5/public/funding-rate-history",
925 Some(¶ms),
926 None,
927 false,
928 )
929 .await
930 }
931
932 pub async fn get_candles(
942 &self,
943 params: GetCandlesticksParams,
944 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
945 self.send_request(
946 Method::GET,
947 "/api/v5/market/candles",
948 Some(¶ms),
949 None,
950 false,
951 )
952 .await
953 }
954
955 pub async fn get_history_candles(
965 &self,
966 params: GetCandlesticksParams,
967 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
968 self.send_request(
969 Method::GET,
970 "/api/v5/market/history-candles",
971 Some(¶ms),
972 None,
973 false,
974 )
975 .await
976 }
977
978 pub async fn get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
989 let path = "/api/v5/account/balance";
990 self.send_request::<_, ()>(Method::GET, path, None, None, true)
991 .await
992 }
993
994 pub async fn get_trade_fee(
1006 &self,
1007 params: GetTradeFeeParams,
1008 ) -> Result<Vec<OKXFeeRate>, OKXHttpError> {
1009 self.send_request(
1010 Method::GET,
1011 "/api/v5/account/trade-fee",
1012 Some(¶ms),
1013 None,
1014 true,
1015 )
1016 .await
1017 }
1018
1019 pub async fn get_order(
1029 &self,
1030 params: GetOrderParams,
1031 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
1032 self.send_request(
1033 Method::GET,
1034 "/api/v5/trade/order",
1035 Some(¶ms),
1036 None,
1037 true,
1038 )
1039 .await
1040 }
1041
1042 pub async fn get_orders_pending(
1052 &self,
1053 params: GetOrderListParams,
1054 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
1055 self.send_request(
1056 Method::GET,
1057 "/api/v5/trade/orders-pending",
1058 Some(¶ms),
1059 None,
1060 true,
1061 )
1062 .await
1063 }
1064
1065 pub async fn get_orders_history(
1075 &self,
1076 params: GetOrderHistoryParams,
1077 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
1078 self.send_request(
1079 Method::GET,
1080 "/api/v5/trade/orders-history",
1081 Some(¶ms),
1082 None,
1083 true,
1084 )
1085 .await
1086 }
1087
1088 pub async fn get_order_algo_pending(
1094 &self,
1095 params: GetAlgoOrdersParams,
1096 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
1097 self.send_request(
1098 Method::GET,
1099 "/api/v5/trade/orders-algo-pending",
1100 Some(¶ms),
1101 None,
1102 true,
1103 )
1104 .await
1105 }
1106
1107 pub async fn get_order_algo_history(
1113 &self,
1114 params: GetAlgoOrdersParams,
1115 ) -> Result<Vec<OKXOrderAlgo>, OKXHttpError> {
1116 self.send_request(
1117 Method::GET,
1118 "/api/v5/trade/orders-algo-history",
1119 Some(¶ms),
1120 None,
1121 true,
1122 )
1123 .await
1124 }
1125
1126 pub async fn get_fills(
1136 &self,
1137 params: GetTransactionDetailsParams,
1138 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
1139 self.send_request(
1140 Method::GET,
1141 "/api/v5/trade/fills",
1142 Some(¶ms),
1143 None,
1144 true,
1145 )
1146 .await
1147 }
1148
1149 pub async fn get_positions(
1161 &self,
1162 params: GetPositionsParams,
1163 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
1164 self.send_request(
1165 Method::GET,
1166 "/api/v5/account/positions",
1167 Some(¶ms),
1168 None,
1169 true,
1170 )
1171 .await
1172 }
1173
1174 pub async fn get_positions_history(
1184 &self,
1185 params: GetPositionsHistoryParams,
1186 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
1187 self.send_request(
1188 Method::GET,
1189 "/api/v5/account/positions-history",
1190 Some(¶ms),
1191 None,
1192 true,
1193 )
1194 .await
1195 }
1196}
1197
1198#[derive(Debug)]
1203#[cfg_attr(
1204 feature = "python",
1205 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.okx", from_py_object)
1206)]
1207#[cfg_attr(
1208 feature = "python",
1209 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.okx")
1210)]
1211pub struct OKXHttpClient {
1212 pub(crate) inner: Arc<OKXRawHttpClient>,
1213 pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
1214 clock: &'static AtomicTime,
1215 cache_initialized: AtomicBool,
1216}
1217
1218impl Clone for OKXHttpClient {
1219 fn clone(&self) -> Self {
1220 let cache_initialized = AtomicBool::new(false);
1221
1222 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
1223 if is_initialized {
1224 cache_initialized.store(true, Ordering::Release);
1225 }
1226
1227 Self {
1228 inner: self.inner.clone(),
1229 instruments_cache: self.instruments_cache.clone(),
1230 cache_initialized,
1231 clock: self.clock,
1232 }
1233 }
1234}
1235
1236impl Default for OKXHttpClient {
1237 fn default() -> Self {
1238 Self::new(None, 60, 3, 1000, 10_000, OKXEnvironment::Live, None)
1239 .expect("Failed to create default OKXHttpClient")
1240 }
1241}
1242
1243impl OKXHttpClient {
1244 pub fn new(
1254 base_url: Option<String>,
1255 timeout_secs: u64,
1256 max_retries: u32,
1257 retry_delay_ms: u64,
1258 retry_delay_max_ms: u64,
1259 environment: OKXEnvironment,
1260 proxy_url: Option<String>,
1261 ) -> anyhow::Result<Self> {
1262 Ok(Self {
1263 inner: Arc::new(OKXRawHttpClient::new(
1264 base_url,
1265 timeout_secs,
1266 max_retries,
1267 retry_delay_ms,
1268 retry_delay_max_ms,
1269 environment,
1270 proxy_url,
1271 )?),
1272 instruments_cache: Arc::new(AtomicMap::new()),
1273 cache_initialized: AtomicBool::new(false),
1274 clock: get_atomic_clock_realtime(),
1275 })
1276 }
1277
1278 fn generate_ts_init(&self) -> UnixNanos {
1280 self.clock.get_time_ns()
1281 }
1282
1283 pub fn from_env() -> anyhow::Result<Self> {
1290 Self::with_credentials(
1291 None,
1292 None,
1293 None,
1294 None,
1295 60,
1296 3,
1297 1000,
1298 10_000,
1299 OKXEnvironment::Live,
1300 None,
1301 )
1302 }
1303
1304 #[expect(clippy::too_many_arguments)]
1311 pub fn with_credentials(
1312 api_key: Option<String>,
1313 api_secret: Option<String>,
1314 api_passphrase: Option<String>,
1315 base_url: Option<String>,
1316 timeout_secs: u64,
1317 max_retries: u32,
1318 retry_delay_ms: u64,
1319 retry_delay_max_ms: u64,
1320 environment: OKXEnvironment,
1321 proxy_url: Option<String>,
1322 ) -> anyhow::Result<Self> {
1323 let api_key = get_or_env_var(api_key, "OKX_API_KEY")?;
1324 let api_secret = get_or_env_var(api_secret, "OKX_API_SECRET")?;
1325 let api_passphrase = get_or_env_var(api_passphrase, "OKX_API_PASSPHRASE")?;
1326 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
1327
1328 Ok(Self {
1329 inner: Arc::new(OKXRawHttpClient::with_credentials(
1330 api_key,
1331 api_secret,
1332 api_passphrase,
1333 base_url,
1334 timeout_secs,
1335 max_retries,
1336 retry_delay_ms,
1337 retry_delay_max_ms,
1338 environment,
1339 proxy_url,
1340 )?),
1341 instruments_cache: Arc::new(AtomicMap::new()),
1342 cache_initialized: AtomicBool::new(false),
1343 clock: get_atomic_clock_realtime(),
1344 })
1345 }
1346
1347 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1353 self.instruments_cache
1354 .get_cloned(&symbol)
1355 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
1356 }
1357
1358 pub fn cancel_all_requests(&self) {
1360 self.inner.cancel_all_requests();
1361 }
1362
1363 pub fn cancellation_token(&self) -> &CancellationToken {
1365 self.inner.cancellation_token()
1366 }
1367
1368 pub fn base_url(&self) -> &str {
1370 self.inner.base_url.as_str()
1371 }
1372
1373 pub fn api_key(&self) -> Option<&str> {
1375 self.inner.credential.as_ref().map(|c| c.api_key())
1376 }
1377
1378 #[must_use]
1380 pub fn api_key_masked(&self) -> Option<String> {
1381 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1382 }
1383
1384 #[must_use]
1386 pub fn is_demo(&self) -> bool {
1387 self.inner.environment == OKXEnvironment::Demo
1388 }
1389
1390 pub async fn get_server_time(&self) -> Result<u64, OKXHttpError> {
1398 self.inner.get_server_time().await
1399 }
1400
1401 #[must_use]
1405 pub fn is_initialized(&self) -> bool {
1406 self.cache_initialized.load(Ordering::Acquire)
1407 }
1408
1409 #[must_use]
1412 pub fn get_cached_symbols(&self) -> Vec<String> {
1413 self.instruments_cache
1414 .load()
1415 .keys()
1416 .map(|k| k.to_string())
1417 .collect()
1418 }
1419
1420 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1424 self.instruments_cache.rcu(|m| {
1425 for inst in instruments {
1426 m.insert(inst.raw_symbol().inner(), inst.clone());
1427 }
1428 });
1429 self.cache_initialized.store(true, Ordering::Release);
1430 }
1431
1432 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1436 self.instruments_cache
1437 .insert(instrument.raw_symbol().inner(), instrument);
1438 self.cache_initialized.store(true, Ordering::Release);
1439 }
1440
1441 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1443 self.instruments_cache.get_cloned(symbol)
1444 }
1445
1446 pub async fn request_account_state(
1452 &self,
1453 account_id: AccountId,
1454 ) -> anyhow::Result<AccountState> {
1455 let resp = self
1456 .inner
1457 .get_balance()
1458 .await
1459 .map_err(|e| anyhow::anyhow!(e))?;
1460
1461 let ts_init = self.generate_ts_init();
1462 let raw = resp
1463 .first()
1464 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
1465 let account_state = parse_account_state(raw, account_id, ts_init)?;
1466
1467 Ok(account_state)
1468 }
1469
1470 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
1483 let mut params = SetPositionModeParamsBuilder::default();
1484 params.pos_mode(position_mode);
1485 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1486
1487 match self.inner.set_position_mode(params).await {
1488 Ok(_) => Ok(()),
1489 Err(e) => {
1490 if let OKXHttpError::OkxError {
1491 error_code,
1492 message,
1493 } = &e
1494 && error_code == "50115"
1495 {
1496 log::warn!(
1497 "Account does not support position mode setting (derivatives trading not enabled): {message}"
1498 );
1499 return Ok(()); }
1501 anyhow::bail!(e)
1502 }
1503 }
1504 }
1505
1506 pub async fn request_instruments(
1518 &self,
1519 instrument_type: OKXInstrumentType,
1520 instrument_family: Option<String>,
1521 ) -> anyhow::Result<(Vec<InstrumentAny>, Vec<(Ustr, u64)>)> {
1522 let mut params = GetInstrumentsParamsBuilder::default();
1523 params.inst_type(instrument_type);
1524
1525 if let Some(family) = instrument_family.clone() {
1526 params.inst_family(family);
1527 }
1528
1529 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1530
1531 let resp = self
1532 .inner
1533 .get_instruments(params)
1534 .await
1535 .map_err(|e| anyhow::anyhow!(e))?;
1536
1537 let fee_rate_opt = {
1538 let fee_params = GetTradeFeeParams {
1539 inst_type: instrument_type,
1540 uly: None,
1541 inst_family: instrument_family,
1542 };
1543
1544 match self.inner.get_trade_fee(fee_params).await {
1545 Ok(rates) => rates.into_iter().next(),
1546 Err(OKXHttpError::MissingCredentials) => {
1547 log::debug!("Missing credentials for fee rates, using None");
1548 None
1549 }
1550 Err(e) => {
1551 log::warn!("Failed to fetch fee rates for {instrument_type}: {e}");
1552 None
1553 }
1554 }
1555 };
1556
1557 let ts_init = self.generate_ts_init();
1558
1559 let mut instruments: Vec<InstrumentAny> = Vec::new();
1560 let mut inst_id_codes: Vec<(Ustr, u64)> = Vec::new();
1561
1562 for inst in &resp {
1563 if let Some(code) = inst.inst_id_code {
1565 inst_id_codes.push((inst.inst_id, code));
1566 }
1567 if inst.state == OKXInstrumentStatus::Preopen {
1570 continue;
1571 }
1572
1573 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1578 let is_usdt_margined = inst.ct_type == OKXContractType::Linear;
1579 let (maker_str, taker_str) = if is_usdt_margined {
1580 (&fee_rate.maker_u, &fee_rate.taker_u)
1581 } else {
1582 (&fee_rate.maker, &fee_rate.taker)
1583 };
1584
1585 let maker = if maker_str.is_empty() {
1586 None
1587 } else {
1588 Decimal::from_str(maker_str).ok().map(|v| -v)
1589 };
1590 let taker = if taker_str.is_empty() {
1591 None
1592 } else {
1593 Decimal::from_str(taker_str).ok().map(|v| -v)
1594 };
1595
1596 (maker, taker)
1597 } else {
1598 (None, None)
1599 };
1600
1601 match parse_instrument_any(inst, None, None, maker_fee, taker_fee, ts_init) {
1602 Ok(Some(instrument_any)) => {
1603 instruments.push(instrument_any);
1604 }
1605 Ok(None) => {
1606 }
1608 Err(e) => {
1609 log::warn!("Failed to parse instrument {}: {e}", inst.inst_id);
1610 }
1611 }
1612 }
1613
1614 Ok((instruments, inst_id_codes))
1615 }
1616
1617 pub async fn request_instrument(
1628 &self,
1629 instrument_id: InstrumentId,
1630 ) -> anyhow::Result<InstrumentAny> {
1631 let symbol = instrument_id.symbol.as_str();
1632 let instrument_type = okx_instrument_type_from_symbol(symbol);
1633
1634 let mut params = GetInstrumentsParamsBuilder::default();
1635 params.inst_type(instrument_type);
1636 params.inst_id(symbol);
1637
1638 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1639
1640 let resp = self
1641 .inner
1642 .get_instruments(params)
1643 .await
1644 .map_err(|e| anyhow::anyhow!(e))?;
1645
1646 let raw_inst = resp
1647 .first()
1648 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found"))?;
1649
1650 if raw_inst.state == OKXInstrumentStatus::Preopen {
1652 anyhow::bail!("Instrument {symbol} is in pre-open state");
1653 }
1654
1655 let fee_rate_opt = {
1656 let fee_params = GetTradeFeeParams {
1657 inst_type: instrument_type,
1658 uly: None,
1659 inst_family: None,
1660 };
1661
1662 match self.inner.get_trade_fee(fee_params).await {
1663 Ok(rates) => rates.into_iter().next(),
1664 Err(OKXHttpError::MissingCredentials) => {
1665 log::debug!("Missing credentials for fee rates, using None");
1666 None
1667 }
1668 Err(e) => {
1669 log::warn!("Failed to fetch fee rates for {symbol}: {e}");
1670 None
1671 }
1672 }
1673 };
1674
1675 let (maker_fee, taker_fee) = if let Some(ref fee_rate) = fee_rate_opt {
1679 let is_usdt_margined = raw_inst.ct_type == OKXContractType::Linear;
1680 let (maker_str, taker_str) = if is_usdt_margined {
1681 (&fee_rate.maker_u, &fee_rate.taker_u)
1682 } else {
1683 (&fee_rate.maker, &fee_rate.taker)
1684 };
1685
1686 let maker = if maker_str.is_empty() {
1687 None
1688 } else {
1689 Decimal::from_str(maker_str).ok().map(|v| -v)
1690 };
1691 let taker = if taker_str.is_empty() {
1692 None
1693 } else {
1694 Decimal::from_str(taker_str).ok().map(|v| -v)
1695 };
1696
1697 (maker, taker)
1698 } else {
1699 (None, None)
1700 };
1701
1702 let ts_init = self.generate_ts_init();
1703 let instrument = parse_instrument_any(raw_inst, None, None, maker_fee, taker_fee, ts_init)?
1704 .ok_or_else(|| anyhow::anyhow!("Unsupported instrument type for {symbol}"))?;
1705
1706 self.cache_instrument(instrument.clone());
1707
1708 Ok(instrument)
1709 }
1710
1711 pub async fn request_forward_prices(
1717 &self,
1718 underlying: &str,
1719 instrument_id: Option<InstrumentId>,
1720 ) -> anyhow::Result<Vec<ForwardPrice>> {
1721 let requests = self.resolve_forward_price_requests(underlying, instrument_id.as_ref())?;
1722 let requested_symbol = instrument_id.as_ref().map(|id| id.symbol.inner());
1723 let requested_instrument_id = instrument_id.as_ref();
1724 let ts_init = self.generate_ts_init();
1725 let mut forward_prices = Vec::new();
1726 let mut seen_expiries = AHashSet::new();
1727
1728 for (inst_family, exp_time) in requests {
1729 let summaries = self
1730 .inner
1731 .get_option_summary(GetOptionSummaryParams {
1732 inst_family,
1733 exp_time,
1734 })
1735 .await
1736 .map_err(|e| anyhow::anyhow!(e))?;
1737
1738 for summary in summaries {
1739 if summary.inst_type != OKXInstrumentType::Option {
1740 continue;
1741 }
1742
1743 if let Some(symbol) = requested_symbol
1744 && summary.inst_id != symbol
1745 {
1746 continue;
1747 }
1748
1749 let forward_price = match Decimal::from_str(&summary.fwd_px) {
1750 Ok(price) if !price.is_zero() => price,
1751 Ok(_) => continue,
1752 Err(e) => {
1753 log::warn!(
1754 "Skipping invalid OKX forward price for {}: {e}",
1755 summary.inst_id
1756 );
1757 continue;
1758 }
1759 };
1760
1761 if requested_symbol.is_none() {
1762 let expiry_key = Self::option_summary_expiry_key(summary.inst_id.as_str())?;
1763 if !seen_expiries.insert(expiry_key) {
1764 continue;
1765 }
1766 }
1767
1768 let ts_event =
1769 UnixNanos::from(summary.ts.saturating_mul(NANOSECONDS_IN_MILLISECOND));
1770 let instrument_id = if let Some(inst_id) = requested_instrument_id {
1771 *inst_id
1772 } else {
1773 parse_instrument_id(summary.inst_id)
1774 };
1775
1776 forward_prices.push(ForwardPrice::new(
1777 instrument_id,
1778 forward_price,
1779 Some(summary.uly.to_string()),
1780 ts_event,
1781 ts_init,
1782 ));
1783 }
1784 }
1785
1786 Ok(forward_prices)
1787 }
1788
1789 pub async fn request_mark_price(
1795 &self,
1796 instrument_id: InstrumentId,
1797 ) -> anyhow::Result<MarkPriceUpdate> {
1798 let mut params = GetMarkPriceParamsBuilder::default();
1799 params.inst_id(instrument_id.symbol.inner());
1800 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1801
1802 let resp = self
1803 .inner
1804 .get_mark_price(params)
1805 .await
1806 .map_err(|e| anyhow::anyhow!(e))?;
1807
1808 let raw = resp
1809 .first()
1810 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1811 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1812 let ts_init = self.generate_ts_init();
1813
1814 let mark_price =
1815 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1816 .map_err(|e| anyhow::anyhow!(e))?;
1817 Ok(mark_price)
1818 }
1819
1820 fn resolve_forward_price_requests(
1821 &self,
1822 underlying: &str,
1823 instrument_id: Option<&InstrumentId>,
1824 ) -> anyhow::Result<Vec<(String, Option<String>)>> {
1825 if let Some(inst_id) = instrument_id {
1826 let symbol = inst_id.symbol.inner().as_str();
1827 let inst_family = extract_inst_family(symbol)?.to_string();
1828 let exp_time = Self::option_summary_exp_time(symbol)?;
1829 return Ok(vec![(inst_family, exp_time)]);
1830 }
1831
1832 let underlying = Ustr::from(underlying);
1833 let mut families = AHashSet::new();
1834
1835 for instrument in self.instruments_cache.load().values() {
1836 let InstrumentAny::CryptoOption(option) = instrument else {
1837 continue;
1838 };
1839
1840 if option.underlying.code != underlying {
1841 continue;
1842 }
1843
1844 let inst_family = extract_inst_family(option.id.symbol.inner().as_str())?;
1845 families.insert(inst_family.to_string());
1846 }
1847
1848 let mut families: Vec<String> = families.into_iter().collect();
1849 families.sort_unstable();
1850
1851 anyhow::ensure!(
1852 !families.is_empty(),
1853 "No cached OKX option families for underlying {underlying}; provide a sample instrument or pre-load option instruments"
1854 );
1855
1856 Ok(families.into_iter().map(|family| (family, None)).collect())
1857 }
1858
1859 fn option_summary_expiry_key(symbol: &str) -> anyhow::Result<String> {
1860 let parts: Vec<&str> = symbol.split('-').collect();
1861 anyhow::ensure!(
1862 parts.len() >= 5,
1863 "Expected OKX option symbol with expiry, received {symbol}"
1864 );
1865 Ok(format!("{}-{}-{}", parts[0], parts[1], parts[2]))
1866 }
1867
1868 fn option_summary_exp_time(symbol: &str) -> anyhow::Result<Option<String>> {
1869 let parts: Vec<&str> = symbol.split('-').collect();
1870 anyhow::ensure!(
1871 parts.len() >= 5,
1872 "Expected OKX option symbol with expiry, received {symbol}"
1873 );
1874 Ok(Some(parts[2].to_string()))
1875 }
1876
1877 pub async fn request_index_price(
1883 &self,
1884 instrument_id: InstrumentId,
1885 ) -> anyhow::Result<IndexPriceUpdate> {
1886 let symbol = instrument_id.symbol.inner();
1888 let (base, quote) = parse_base_quote_from_symbol(symbol.as_str())?;
1889 let inst_id = format!("{base}-{quote}");
1890
1891 let mut params = GetIndexTickerParamsBuilder::default();
1892 params.inst_id(Ustr::from(&inst_id));
1893 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1894
1895 let resp = self
1896 .inner
1897 .get_index_tickers(params)
1898 .await
1899 .map_err(|e| anyhow::anyhow!(e))?;
1900
1901 let raw = resp
1902 .first()
1903 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1904 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1905 let ts_init = self.generate_ts_init();
1906
1907 let index_price =
1908 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1909 .map_err(|e| anyhow::anyhow!(e))?;
1910 Ok(index_price)
1911 }
1912
1913 pub async fn request_book_snapshot(
1919 &self,
1920 instrument_id: InstrumentId,
1921 depth: Option<u32>,
1922 ) -> anyhow::Result<OrderBook> {
1923 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1924 let price_precision = inst.price_precision();
1925 let size_precision = inst.size_precision();
1926
1927 let params = GetOrderBookParams {
1928 inst_id: instrument_id.symbol.to_string(),
1929 sz: depth,
1930 };
1931
1932 let resp = self
1933 .inner
1934 .get_order_book(params)
1935 .await
1936 .map_err(|e| anyhow::anyhow!(e))?;
1937
1938 let snapshot = resp
1939 .first()
1940 .ok_or_else(|| anyhow::anyhow!("No order book returned from OKX"))?;
1941
1942 let ts_event = UnixNanos::from(snapshot.ts * NANOSECONDS_IN_MILLISECOND);
1943 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1944
1945 for (i, level) in snapshot.bids.iter().enumerate() {
1946 let price = parse_price(&level.0, price_precision)?;
1947 let size = parse_quantity(&level.1, size_precision)?;
1948 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1949 book.add(order, 0, i as u64, ts_event);
1950 }
1951
1952 let bids_len = snapshot.bids.len();
1953
1954 for (i, level) in snapshot.asks.iter().enumerate() {
1955 let price = parse_price(&level.0, price_precision)?;
1956 let size = parse_quantity(&level.1, size_precision)?;
1957 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1958 book.add(order, 0, (bids_len + i) as u64, ts_event);
1959 }
1960
1961 log::info!(
1962 "Fetched order book for {} with {} bids and {} asks",
1963 instrument_id,
1964 snapshot.bids.len(),
1965 snapshot.asks.len(),
1966 );
1967
1968 Ok(book)
1969 }
1970
1971 pub async fn request_orderbook_snapshot(
1977 &self,
1978 instrument_id: InstrumentId,
1979 depth: Option<u32>,
1980 ) -> anyhow::Result<OrderBookDeltas> {
1981 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
1982 let price_precision = inst.price_precision();
1983 let size_precision = inst.size_precision();
1984
1985 let params = GetOrderBookParams {
1986 inst_id: instrument_id.symbol.to_string(),
1987 sz: depth,
1988 };
1989
1990 let resp = self
1991 .inner
1992 .get_order_book(params)
1993 .await
1994 .map_err(|e| anyhow::anyhow!(e))?;
1995
1996 let snapshot = resp
1997 .first()
1998 .ok_or_else(|| anyhow::anyhow!("No order book returned from OKX"))?;
1999
2000 let ts_event = UnixNanos::from(snapshot.ts * NANOSECONDS_IN_MILLISECOND);
2001 let total_levels = snapshot.bids.len() + snapshot.asks.len();
2002 let mut deltas = Vec::with_capacity(total_levels + 1);
2003
2004 let mut clear = OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event);
2005
2006 if total_levels == 0 {
2007 clear.flags |= RecordFlag::F_LAST as u8;
2008 }
2009 deltas.push(clear);
2010
2011 let mut processed = 0_usize;
2012
2013 for (i, level) in snapshot.bids.iter().enumerate() {
2014 let price = parse_price(&level.0, price_precision)?;
2015 let size = parse_quantity(&level.1, size_precision)?;
2016 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
2017 processed += 1;
2018 let mut flags = RecordFlag::F_SNAPSHOT as u8;
2019
2020 if processed == total_levels {
2021 flags |= RecordFlag::F_LAST as u8;
2022 }
2023 deltas.push(OrderBookDelta::new(
2024 instrument_id,
2025 BookAction::Add,
2026 order,
2027 flags,
2028 0,
2029 ts_event,
2030 ts_event,
2031 ));
2032 }
2033
2034 let bids_len = snapshot.bids.len();
2035
2036 for (i, level) in snapshot.asks.iter().enumerate() {
2037 let price = parse_price(&level.0, price_precision)?;
2038 let size = parse_quantity(&level.1, size_precision)?;
2039 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
2040 processed += 1;
2041 let mut flags = RecordFlag::F_SNAPSHOT as u8;
2042
2043 if processed == total_levels {
2044 flags |= RecordFlag::F_LAST as u8;
2045 }
2046 deltas.push(OrderBookDelta::new(
2047 instrument_id,
2048 BookAction::Add,
2049 order,
2050 flags,
2051 0,
2052 ts_event,
2053 ts_event,
2054 ));
2055 }
2056
2057 log::info!(
2058 "Fetched order book snapshot for {} with {} bids and {} asks",
2059 instrument_id,
2060 snapshot.bids.len(),
2061 snapshot.asks.len(),
2062 );
2063
2064 OrderBookDeltas::new_checked(instrument_id, deltas)
2065 .context("failed to assemble OrderBookDeltas from OKX snapshot")
2066 }
2067
2068 pub async fn request_funding_rates(
2074 &self,
2075 instrument_id: InstrumentId,
2076 start: Option<DateTime<Utc>>,
2077 end: Option<DateTime<Utc>>,
2078 limit: Option<u32>,
2079 ) -> anyhow::Result<Vec<FundingRateUpdate>> {
2080 let mut params = GetFundingRateHistoryParams {
2081 inst_id: instrument_id.symbol.to_string(),
2082 ..Default::default()
2083 };
2084
2085 if let Some(start) = start {
2087 params.before = Some(start.timestamp_millis().to_string());
2088 }
2089
2090 if let Some(end) = end {
2091 params.after = Some(end.timestamp_millis().to_string());
2092 }
2093
2094 params.limit = limit;
2095
2096 let resp = self
2097 .inner
2098 .get_funding_rate_history(params)
2099 .await
2100 .map_err(|e| anyhow::anyhow!(e))?;
2101
2102 let mut rates = Vec::with_capacity(resp.len());
2103
2104 for window in resp.windows(2) {
2105 let raw = &window[0];
2106 let interval_millis = raw
2107 .funding_time
2108 .checked_sub(window[1].funding_time)
2109 .context("funding interval negative, funding rates out of order")?;
2110 let rate = parse_funding_rate(raw, instrument_id, Some(interval_millis))?;
2111 rates.push(rate);
2112 }
2113
2114 if let Some(last_raw) = resp.last() {
2115 let rate = parse_funding_rate(last_raw, instrument_id, None)?;
2117 rates.push(rate);
2118 }
2119
2120 rates.reverse();
2123
2124 log::info!(
2125 "Fetched {} funding rates for {}",
2126 rates.len(),
2127 instrument_id,
2128 );
2129
2130 Ok(rates)
2131 }
2132
2133 pub async fn request_trades(
2139 &self,
2140 instrument_id: InstrumentId,
2141 start: Option<DateTime<Utc>>,
2142 end: Option<DateTime<Utc>>,
2143 limit: Option<u32>,
2144 ) -> anyhow::Result<Vec<TradeTick>> {
2145 const OKX_TRADES_MAX_LIMIT: u32 = 100;
2146 const MAX_PAGES: usize = 500;
2147 const MAX_CONSECUTIVE_EMPTY: usize = 3;
2148
2149 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
2150 enum Mode {
2151 Latest,
2152 Backward,
2153 Range,
2154 }
2155
2156 let limit = if limit == Some(0) { None } else { limit };
2157
2158 if let (Some(s), Some(e)) = (start, end) {
2159 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
2160 }
2161
2162 let now = Utc::now();
2163
2164 if let Some(s) = start
2165 && s > now
2166 {
2167 return Ok(Vec::new());
2168 }
2169
2170 let end = if let Some(e) = end
2171 && e > now
2172 {
2173 Some(now)
2174 } else {
2175 end
2176 };
2177
2178 let mode = match (start, end) {
2179 (None, None) => Mode::Latest,
2180 (Some(_), None) => Mode::Backward,
2181 (None, Some(_)) => Mode::Backward,
2182 (Some(_), Some(_)) => Mode::Range,
2183 };
2184
2185 let start_ms = start.map(|s| s.timestamp_millis());
2186 let end_ms = end.map(|e| e.timestamp_millis());
2187
2188 let ts_init = self.generate_ts_init();
2189 let inst = self.instrument_from_cache(instrument_id.symbol.inner())?;
2190
2191 if matches!(mode, Mode::Backward | Mode::Range) {
2194 let mut before_trade_id: Option<String> = None;
2195 let mut pages = 0usize;
2196 let mut page_results: Vec<Vec<TradeTick>> = Vec::new();
2197 let mut seen_trades: AHashSet<(String, i64)> = AHashSet::new();
2198 let mut unique_count = 0usize;
2199 let mut consecutive_empty_pages = 0usize;
2200
2201 let effective_limit = if start.is_some() {
2204 limit.unwrap_or(u32::MAX)
2205 } else {
2206 limit.unwrap_or(OKX_TRADES_MAX_LIMIT)
2207 };
2208
2209 log::debug!(
2210 "Starting trades pagination: mode={mode:?}, start={start:?}, end={end:?}, limit={limit:?}, effective_limit={effective_limit}"
2211 );
2212
2213 loop {
2214 if pages >= MAX_PAGES {
2215 log::warn!("Hit MAX_PAGES limit of {MAX_PAGES}");
2216 break;
2217 }
2218
2219 if effective_limit < u32::MAX && unique_count >= effective_limit as usize {
2220 log::debug!("Reached effective limit: unique_count={unique_count}");
2221 break;
2222 }
2223
2224 let remaining = (effective_limit as usize).saturating_sub(unique_count);
2225 let page_cap = remaining.min(OKX_TRADES_MAX_LIMIT as usize) as u32;
2226
2227 log::debug!(
2228 "Requesting page {}: before_id={:?}, page_cap={}, unique_count={}",
2229 pages + 1,
2230 before_trade_id,
2231 page_cap,
2232 unique_count
2233 );
2234
2235 let mut params_builder = GetTradesParamsBuilder::default();
2236 params_builder
2237 .inst_id(instrument_id.symbol.inner())
2238 .limit(page_cap)
2239 .pagination_type(1);
2240
2241 if let Some(ref before_id) = before_trade_id {
2243 params_builder.after(before_id.clone());
2244 }
2245
2246 let params = params_builder.build().map_err(anyhow::Error::new)?;
2247 let raw = self
2248 .inner
2249 .get_history_trades(params)
2250 .await
2251 .map_err(anyhow::Error::new)?;
2252
2253 log::debug!("Received {} raw trades from API", raw.len());
2254
2255 if let (Some(first), Some(last)) = (raw.first(), raw.last()) {
2256 log::debug!(
2257 "Raw response trade ID range: first={} (newest), last={} (oldest)",
2258 first.trade_id,
2259 last.trade_id,
2260 );
2261 }
2262
2263 if raw.is_empty() {
2264 log::debug!("API returned empty page, stopping pagination");
2265 break;
2266 }
2267
2268 pages += 1;
2269
2270 let mut page_trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
2271 let mut hit_start_boundary = false;
2272 let mut filtered_out = 0usize;
2273 let mut duplicates = 0usize;
2274
2275 for r in &raw {
2276 match parse_trade_tick(
2277 r,
2278 instrument_id,
2279 inst.price_precision(),
2280 inst.size_precision(),
2281 ts_init,
2282 ) {
2283 Ok(trade) => {
2284 let ts_ms = trade.ts_event.as_i64() / 1_000_000;
2285
2286 if let Some(e_ms) = end_ms
2287 && ts_ms > e_ms
2288 {
2289 filtered_out += 1;
2290 continue;
2291 }
2292
2293 if let Some(s_ms) = start_ms
2294 && ts_ms < s_ms
2295 {
2296 hit_start_boundary = true;
2297 filtered_out += 1;
2298 break;
2299 }
2300
2301 let trade_key = (trade.trade_id.to_string(), trade.ts_event.as_i64());
2302 if seen_trades.insert(trade_key) {
2303 unique_count += 1;
2304 page_trades.push(trade);
2305 } else {
2306 duplicates += 1;
2307 }
2308 }
2309 Err(e) => log::error!("{e}"),
2310 }
2311 }
2312
2313 log::debug!(
2314 "Page {} processed: {} trades kept, {} filtered out, {} duplicates, hit_start_boundary={}",
2315 pages,
2316 page_trades.len(),
2317 filtered_out,
2318 duplicates,
2319 hit_start_boundary
2320 );
2321
2322 let oldest_trade_id = if page_trades.is_empty() {
2324 if unique_count > 0 {
2327 consecutive_empty_pages += 1;
2328 if consecutive_empty_pages >= MAX_CONSECUTIVE_EMPTY {
2329 log::debug!(
2330 "Stopping: {consecutive_empty_pages} consecutive pages with no trades in range after collecting {unique_count} trades"
2331 );
2332 break;
2333 }
2334 }
2335 raw.last().map(|t| {
2337 let id = t.trade_id.to_string();
2338 log::debug!(
2339 "Setting cursor from raw response (no unique trades): oldest_id={id}"
2340 );
2341 id
2342 })
2343 } else {
2344 let oldest_id = page_trades.last().map(|t| {
2346 let id = t.trade_id.to_string();
2347 log::debug!(
2348 "Setting cursor from deduplicated trades: oldest_id={}, ts_event={}",
2349 id,
2350 t.ts_event.as_i64()
2351 );
2352 id
2353 });
2354 page_trades.reverse();
2355 page_results.push(page_trades);
2356 consecutive_empty_pages = 0;
2357 oldest_id
2358 };
2359
2360 if let Some(ref old_id) = before_trade_id
2361 && oldest_trade_id.as_ref() == Some(old_id)
2362 {
2363 break;
2364 }
2365
2366 if oldest_trade_id.is_none() {
2367 break;
2368 }
2369
2370 before_trade_id = oldest_trade_id;
2371
2372 if hit_start_boundary {
2373 break;
2374 }
2375
2376 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
2377 }
2378
2379 log::debug!(
2380 "Pagination complete: {pages} pages, {unique_count} unique trades collected"
2381 );
2382
2383 let mut out: Vec<TradeTick> = Vec::new();
2384
2385 for page in page_results.into_iter().rev() {
2386 out.extend(page);
2387 }
2388
2389 let mut dedup_keys = AHashSet::new();
2391 let pre_dedup_len = out.len();
2392 out.retain(|trade| {
2393 dedup_keys.insert((trade.trade_id.to_string(), trade.ts_event.as_i64()))
2394 });
2395
2396 if out.len() < pre_dedup_len {
2397 log::debug!(
2398 "Removed {} duplicate trades during final dedup",
2399 pre_dedup_len - out.len()
2400 );
2401 }
2402
2403 if let Some(lim) = limit
2404 && lim > 0
2405 && out.len() > lim as usize
2406 {
2407 let excess = out.len() - lim as usize;
2408 log::debug!("Trimming {excess} oldest trades to respect limit={lim}");
2409 out.drain(0..excess);
2410 }
2411
2412 log::debug!("Returning {} trades", out.len());
2413 return Ok(out);
2414 }
2415
2416 let req_limit = limit
2417 .unwrap_or(OKX_TRADES_MAX_LIMIT)
2418 .min(OKX_TRADES_MAX_LIMIT);
2419 let params = GetTradesParamsBuilder::default()
2420 .inst_id(instrument_id.symbol.inner())
2421 .limit(req_limit)
2422 .build()
2423 .map_err(anyhow::Error::new)?;
2424
2425 let raw = self
2426 .inner
2427 .get_history_trades(params)
2428 .await
2429 .map_err(anyhow::Error::new)?;
2430
2431 let mut trades: Vec<TradeTick> = Vec::with_capacity(raw.len());
2432
2433 for r in &raw {
2434 match parse_trade_tick(
2435 r,
2436 instrument_id,
2437 inst.price_precision(),
2438 inst.size_precision(),
2439 ts_init,
2440 ) {
2441 Ok(trade) => trades.push(trade),
2442 Err(e) => log::error!("{e}"),
2443 }
2444 }
2445
2446 trades.reverse();
2448
2449 if let Some(lim) = limit
2450 && lim > 0
2451 && trades.len() > lim as usize
2452 {
2453 trades.drain(0..trades.len() - lim as usize);
2454 }
2455
2456 Ok(trades)
2457 }
2458
2459 pub async fn request_bars(
2500 &self,
2501 bar_type: BarType,
2502 start: Option<DateTime<Utc>>,
2503 mut end: Option<DateTime<Utc>>,
2504 limit: Option<u32>,
2505 ) -> anyhow::Result<Vec<Bar>> {
2506 const HISTORY_SPLIT_DAYS: i64 = 100;
2507 const MAX_PAGES_SOFT: usize = 500;
2508
2509 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
2510 enum Mode {
2511 Latest,
2512 Backward,
2513 Range,
2514 }
2515
2516 let limit = if limit == Some(0) { None } else { limit };
2517
2518 anyhow::ensure!(
2519 bar_type.aggregation_source() == AggregationSource::External,
2520 "Only EXTERNAL aggregation is supported"
2521 );
2522
2523 if let (Some(s), Some(e)) = (start, end) {
2524 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
2525 }
2526
2527 let now = Utc::now();
2528
2529 if let Some(s) = start
2530 && s > now
2531 {
2532 return Ok(Vec::new());
2533 }
2534
2535 if let Some(e) = end
2536 && e > now
2537 {
2538 end = Some(now);
2539 }
2540
2541 let spec = bar_type.spec();
2542 let step = spec.step.get();
2543 let bar_param = match spec.aggregation {
2544 BarAggregation::Second => format!("{step}s"),
2545 BarAggregation::Minute => format!("{step}m"),
2546 BarAggregation::Hour => format!("{step}H"),
2547 BarAggregation::Day => format!("{step}D"),
2548 BarAggregation::Week => format!("{step}W"),
2549 BarAggregation::Month => format!("{step}M"),
2550 a => anyhow::bail!("OKX does not support {a:?} aggregation"),
2551 };
2552
2553 let slot_ms: i64 = match spec.aggregation {
2554 BarAggregation::Second => (step as i64) * 1_000,
2555 BarAggregation::Minute => (step as i64) * 60_000,
2556 BarAggregation::Hour => (step as i64) * 3_600_000,
2557 BarAggregation::Day => (step as i64) * 86_400_000,
2558 BarAggregation::Week => (step as i64) * 7 * 86_400_000,
2559 BarAggregation::Month => (step as i64) * 30 * 86_400_000,
2560 _ => unreachable!("Unsupported aggregation should have been caught above"),
2561 };
2562 let slot_ns: i64 = slot_ms * 1_000_000;
2563
2564 let mode = match (start, end) {
2565 (None, None) => Mode::Latest,
2566 (Some(_), None) => Mode::Backward, (None, Some(_)) => Mode::Backward,
2568 (Some(_), Some(_)) => Mode::Range,
2569 };
2570
2571 let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
2572 let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
2573
2574 let start_ms = start.map(|s| {
2576 let ms = s.timestamp_millis();
2577
2578 if slot_ms > 0 {
2579 (ms / slot_ms) * slot_ms } else {
2581 ms
2582 }
2583 });
2584 let end_ms = end.map(|e| {
2585 let ms = e.timestamp_millis();
2586
2587 if slot_ms > 0 {
2588 ((ms + slot_ms - 1) / slot_ms) * slot_ms } else {
2590 ms
2591 }
2592 });
2593 let now_ms = now.timestamp_millis();
2594
2595 let symbol = bar_type.instrument_id().symbol;
2596 let inst = self.instrument_from_cache(symbol.inner())?;
2597
2598 let mut out: Vec<Bar> = Vec::new();
2599 let mut pages = 0usize;
2600
2601 let mut after_ms: Option<i64> = match mode {
2606 Mode::Range => end_ms.or(Some(now_ms)), _ => None,
2608 };
2609 let mut before_ms: Option<i64> = match mode {
2610 Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
2611 Mode::Range => start_ms, Mode::Latest => None,
2613 };
2614
2615 let mut forward_prepend_mode = matches!(mode, Mode::Range);
2617
2618 if matches!(mode, Mode::Backward | Mode::Range)
2622 && let Some(b) = before_ms
2623 {
2624 let buffer_ms = slot_ms.max(60_000); if b >= now_ms.saturating_sub(buffer_ms) {
2630 before_ms = Some(now_ms.saturating_sub(buffer_ms));
2631 }
2632 }
2633
2634 let mut have_latest_first_page = false;
2635 let mut progressless_loops = 0u8;
2636
2637 loop {
2638 if let Some(lim) = limit
2639 && lim > 0
2640 && out.len() >= lim as usize
2641 {
2642 break;
2643 }
2644
2645 if pages >= MAX_PAGES_SOFT {
2646 break;
2647 }
2648
2649 let pivot_ms = if let Some(a) = after_ms {
2650 a
2651 } else if let Some(b) = before_ms {
2652 b
2653 } else {
2654 now_ms
2655 };
2656 let age_ms = now_ms.saturating_sub(pivot_ms);
2662 let age_hours = age_ms / (60 * 60 * 1000);
2663 let using_history = age_hours > 1; let page_ceiling = if using_history { 100 } else { 300 };
2666 let remaining = limit
2667 .filter(|&l| l > 0) .map_or(page_ceiling, |l| (l as usize).saturating_sub(out.len()));
2669 let page_cap = remaining.min(page_ceiling);
2670
2671 let mut p = GetCandlesticksParamsBuilder::default();
2672 p.inst_id(symbol.as_str())
2673 .bar(&bar_param)
2674 .limit(page_cap as u32);
2675
2676 let mut req_used_before = false;
2678
2679 match mode {
2680 Mode::Latest => {
2681 if have_latest_first_page && let Some(b) = before_ms {
2682 p.before_ms(b);
2683 req_used_before = true;
2684 }
2685 }
2686 Mode::Backward => {
2687 if let Some(b) = before_ms {
2689 p.after_ms(b);
2690 }
2691 }
2692 Mode::Range => {
2693 if let Some(a) = after_ms {
2696 p.after_ms(a);
2697 }
2698
2699 if let Some(b) = before_ms {
2700 p.before_ms(b);
2701 req_used_before = true;
2702 }
2703 }
2704 }
2705
2706 let params = p.build().map_err(anyhow::Error::new)?;
2707
2708 let mut raw = if using_history {
2709 self.inner
2710 .get_history_candles(params.clone())
2711 .await
2712 .map_err(anyhow::Error::new)?
2713 } else {
2714 self.inner
2715 .get_candles(params.clone())
2716 .await
2717 .map_err(anyhow::Error::new)?
2718 };
2719
2720 if raw.is_empty() {
2722 if matches!(mode, Mode::Latest)
2724 && have_latest_first_page
2725 && !using_history
2726 && let Some(b) = before_ms
2727 {
2728 let mut p2 = GetCandlesticksParamsBuilder::default();
2729 p2.inst_id(symbol.as_str())
2730 .bar(&bar_param)
2731 .limit(page_cap as u32);
2732 p2.before_ms(b);
2733 let params2 = p2.build().map_err(anyhow::Error::new)?;
2734 let raw2 = self
2735 .inner
2736 .get_history_candles(params2)
2737 .await
2738 .map_err(anyhow::Error::new)?;
2739
2740 if raw2.is_empty() {
2741 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2743 before_ms = Some(b.saturating_sub(jump));
2744 progressless_loops = progressless_loops.saturating_add(1);
2745 if progressless_loops >= 3 {
2746 break;
2747 }
2748 continue;
2749 } else {
2750 raw = raw2;
2751 }
2752 }
2753
2754 if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
2758 let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
2759 let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
2760
2761 let mut p2 = GetCandlesticksParamsBuilder::default();
2762 p2.inst_id(symbol.as_str())
2763 .bar(&bar_param)
2764 .limit(page_cap as u32)
2765 .before_ms(pivot_back);
2766 let params2 = p2.build().map_err(anyhow::Error::new)?;
2767 let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
2768 > HISTORY_SPLIT_DAYS
2769 {
2770 self.inner.get_history_candles(params2).await
2771 } else {
2772 self.inner.get_candles(params2).await
2773 }
2774 .map_err(anyhow::Error::new)?;
2775
2776 if raw2.is_empty() {
2777 break;
2778 } else {
2779 raw = raw2;
2780 forward_prepend_mode = true;
2781 req_used_before = true;
2782 }
2783 }
2784
2785 if raw.is_empty()
2787 && matches!(mode, Mode::Latest)
2788 && !have_latest_first_page
2789 && !using_history
2790 {
2791 let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
2792 before_ms = Some(now_ms.saturating_sub(jump_days_ms));
2793 have_latest_first_page = true;
2794 continue;
2795 }
2796
2797 if raw.is_empty() {
2799 break;
2800 }
2801 }
2802 pages += 1;
2805
2806 let ts_init = self.generate_ts_init();
2808 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
2809
2810 for r in &raw {
2811 page.push(parse_candlestick(
2812 r,
2813 bar_type,
2814 inst.price_precision(),
2815 inst.size_precision(),
2816 ts_init,
2817 )?);
2818 }
2819 page.reverse();
2820
2821 let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
2822 let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
2823
2824 let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
2828 && out.is_empty()
2829 && pages < 2
2830 {
2831 let tolerance_ns = slot_ns * 2; if let (Some(first), Some(last)) = (page.first(), page.last()) {
2838 log::debug!(
2839 "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
2840 page.len(),
2841 first.ts_event.as_i64() / 1_000_000,
2842 last.ts_event.as_i64() / 1_000_000,
2843 start_ms,
2844 end_ms,
2845 );
2846 }
2847
2848 let result: Vec<Bar> = page
2849 .clone()
2850 .into_iter()
2851 .filter(|b| {
2852 let ts = b.ts_event.as_i64();
2853 let ok_after =
2855 start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
2856 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2857 ok_after && ok_before
2858 })
2859 .collect();
2860
2861 result
2862 } else {
2863 page.clone()
2865 .into_iter()
2866 .filter(|b| {
2867 let ts = b.ts_event.as_i64();
2868 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
2869 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
2870 ok_after && ok_before
2871 })
2872 .collect()
2873 };
2874
2875 if !page.is_empty() && filtered.is_empty() {
2876 if matches!(mode, Mode::Range)
2878 && !forward_prepend_mode
2879 && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
2880 && newest_ms < start_ms.saturating_sub(slot_ms * 2)
2881 {
2882 break;
2884 }
2885 }
2886
2887 let contribution;
2889
2890 if out.is_empty() {
2891 contribution = filtered.len();
2892 out = filtered;
2893 } else {
2894 match mode {
2895 Mode::Backward | Mode::Latest => {
2896 if let Some(first) = out.first() {
2897 filtered.retain(|b| b.ts_event < first.ts_event);
2898 }
2899 contribution = filtered.len();
2900 if contribution != 0 {
2901 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2902 new_out.extend_from_slice(&filtered);
2903 new_out.extend_from_slice(&out);
2904 out = new_out;
2905 }
2906 }
2907 Mode::Range => {
2908 if forward_prepend_mode || req_used_before {
2909 if let Some(first) = out.first() {
2911 filtered.retain(|b| b.ts_event < first.ts_event);
2912 }
2913 contribution = filtered.len();
2914 if contribution != 0 {
2915 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
2916 new_out.extend_from_slice(&filtered);
2917 new_out.extend_from_slice(&out);
2918 out = new_out;
2919 }
2920 } else {
2921 if let Some(last) = out.last() {
2923 filtered.retain(|b| b.ts_event > last.ts_event);
2924 }
2925 contribution = filtered.len();
2926 out.extend(filtered);
2927 }
2928 }
2929 }
2930 }
2931
2932 if contribution == 0
2934 && matches!(mode, Mode::Latest | Mode::Backward | Mode::Range)
2935 && let Some(b) = before_ms
2936 {
2937 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
2938 let new_b = b.saturating_sub(jump);
2939 if new_b != b {
2940 before_ms = Some(new_b);
2941 }
2942 }
2943
2944 if contribution == 0 {
2945 progressless_loops = progressless_loops.saturating_add(1);
2946 if progressless_loops >= 3 {
2947 break;
2948 }
2949 } else {
2950 progressless_loops = 0;
2951
2952 match mode {
2954 Mode::Latest | Mode::Backward => {
2955 if let Some(oldest) = page_oldest_ms {
2956 before_ms = Some(oldest.saturating_sub(1));
2957 have_latest_first_page = true;
2958 } else {
2959 break;
2960 }
2961 }
2962 Mode::Range => {
2963 if forward_prepend_mode || req_used_before {
2964 if let Some(oldest) = page_oldest_ms {
2965 let jump_back = slot_ms.max(60_000); before_ms = Some(oldest.saturating_sub(jump_back));
2968 after_ms = None;
2969 } else {
2970 break;
2971 }
2972 } else if let Some(newest) = page_newest_ms {
2973 after_ms = Some(newest.saturating_add(1));
2974 before_ms = None;
2975 } else {
2976 break;
2977 }
2978 }
2979 }
2980 }
2981
2982 if let Some(lim) = limit
2984 && lim > 0
2985 && out.len() >= lim as usize
2986 {
2987 break;
2988 }
2989
2990 if let Some(ens) = end_ns
2991 && let Some(last) = out.last()
2992 && last.ts_event.as_i64() >= ens
2993 {
2994 break;
2995 }
2996
2997 if let Some(sns) = start_ns
2998 && let Some(first) = out.first()
2999 && (matches!(mode, Mode::Backward) || forward_prepend_mode)
3000 && first.ts_event.as_i64() <= sns
3001 {
3002 if matches!(mode, Mode::Range) {
3004 if let Some(ens) = end_ns
3006 && let Some(last) = out.last()
3007 {
3008 let last_ts = last.ts_event.as_i64();
3009 if last_ts < ens {
3010 forward_prepend_mode = false;
3013 after_ms = Some((last_ts / 1_000_000).saturating_add(1));
3014 before_ms = None;
3015 continue;
3016 }
3017 }
3018 }
3019 break;
3020 }
3021
3022 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
3023 }
3024
3025 if out.is_empty() && matches!(mode, Mode::Range) {
3027 let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
3028 let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
3029 let mut p = GetCandlesticksParamsBuilder::default();
3030 p.inst_id(symbol.as_str())
3031 .bar(&bar_param)
3032 .limit(300)
3033 .before_ms(pivot);
3034 let params = p.build().map_err(anyhow::Error::new)?;
3035 let raw = if hist {
3036 self.inner.get_history_candles(params).await
3037 } else {
3038 self.inner.get_candles(params).await
3039 }
3040 .map_err(anyhow::Error::new)?;
3041
3042 if !raw.is_empty() {
3043 let ts_init = self.generate_ts_init();
3044 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
3045
3046 for r in &raw {
3047 page.push(parse_candlestick(
3048 r,
3049 bar_type,
3050 inst.price_precision(),
3051 inst.size_precision(),
3052 ts_init,
3053 )?);
3054 }
3055 page.reverse();
3056 out = page
3057 .into_iter()
3058 .filter(|b| {
3059 let ts = b.ts_event.as_i64();
3060 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
3061 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
3062 ok_after && ok_before
3063 })
3064 .collect();
3065 }
3066 }
3067
3068 if let Some(ens) = end_ns {
3070 while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
3071 out.pop();
3072 }
3073 }
3074
3075 if matches!(mode, Mode::Range)
3077 && !forward_prepend_mode
3078 && let Some(sns) = start_ns
3079 {
3080 let lower = sns.saturating_sub(slot_ns);
3081 while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
3082 out.remove(0);
3083 }
3084 }
3085
3086 if let Some(lim) = limit
3088 && lim > 0
3089 && out.len() > lim as usize
3090 {
3091 let start = out.len() - lim as usize;
3092 out.drain(..start);
3093 }
3094
3095 Ok(out)
3096 }
3097
3098 #[expect(clippy::too_many_arguments)]
3109 pub async fn request_order_status_reports(
3110 &self,
3111 account_id: AccountId,
3112 instrument_type: Option<OKXInstrumentType>,
3113 instrument_id: Option<InstrumentId>,
3114 start: Option<DateTime<Utc>>,
3115 end: Option<DateTime<Utc>>,
3116 open_only: bool,
3117 limit: Option<u32>,
3118 ) -> anyhow::Result<Vec<OrderStatusReport>> {
3119 let instrument_type = if let Some(instrument_type) = instrument_type {
3120 instrument_type
3121 } else {
3122 let instrument_id = instrument_id.ok_or_else(|| {
3123 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
3124 })?;
3125 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3126 okx_instrument_type(&instrument)?
3127 };
3128
3129 let mut history_base = GetOrderHistoryParamsBuilder::default();
3130 history_base.inst_type(instrument_type);
3131
3132 if let Some(instrument_id) = instrument_id.as_ref() {
3133 history_base.inst_id(instrument_id.symbol.inner().to_string());
3134 }
3135 let history_base = history_base.build().map_err(|e| anyhow::anyhow!(e))?;
3136
3137 let mut pending_base = GetOrderListParamsBuilder::default();
3138 pending_base.inst_type(instrument_type);
3139
3140 if let Some(instrument_id) = instrument_id.as_ref() {
3141 pending_base.inst_id(instrument_id.symbol.inner().to_string());
3142 }
3143 let pending_base = pending_base.build().map_err(|e| anyhow::anyhow!(e))?;
3144
3145 let combined_resp = if open_only {
3146 self.paginate_orders_pending(&pending_base, limit).await?
3147 } else {
3148 let (history, pending) = tokio::try_join!(
3149 self.paginate_orders_history(&history_base, limit),
3150 self.paginate_orders_pending(&pending_base, limit),
3151 )?;
3152 let mut combined_resp = history;
3153 combined_resp.extend(pending);
3154 combined_resp
3155 };
3156
3157 let start_ns = start.map(UnixNanos::from);
3159 let end_ns = end.map(UnixNanos::from);
3160
3161 let ts_init = self.generate_ts_init();
3162 let mut reports = Vec::with_capacity(combined_resp.len());
3163
3164 let mut seen: AHashSet<String> = AHashSet::new();
3166
3167 for order in combined_resp {
3168 let seen_key = if !order.cl_ord_id.is_empty() {
3169 order.cl_ord_id.as_str().to_string()
3170 } else if let Some(algo_cl_ord_id) = order
3171 .algo_cl_ord_id
3172 .as_ref()
3173 .filter(|value| !value.as_str().is_empty())
3174 {
3175 algo_cl_ord_id.as_str().to_string()
3176 } else if let Some(algo_id) = order
3177 .algo_id
3178 .as_ref()
3179 .filter(|value| !value.as_str().is_empty())
3180 {
3181 algo_id.as_str().to_string()
3182 } else {
3183 order.ord_id.as_str().to_string()
3184 };
3185
3186 if !seen.insert(seen_key) {
3187 continue; }
3189
3190 let Ok(inst) = self.instrument_from_cache(order.inst_id) else {
3191 log::debug!(
3192 "Skipping order report for instrument not in cache: symbol={}",
3193 order.inst_id,
3194 );
3195 continue;
3196 };
3197
3198 let report = match parse_order_status_report(
3199 &order,
3200 account_id,
3201 inst.id(),
3202 inst.price_precision(),
3203 inst.size_precision(),
3204 ts_init,
3205 ) {
3206 Ok(report) => report,
3207 Err(e) => {
3208 log::error!("Failed to parse order status report: {e}");
3209 continue;
3210 }
3211 };
3212
3213 if let Some(start_ns) = start_ns
3214 && report.ts_last < start_ns
3215 {
3216 continue;
3217 }
3218
3219 if let Some(end_ns) = end_ns
3220 && report.ts_last > end_ns
3221 {
3222 continue;
3223 }
3224
3225 reports.push(report);
3226 }
3227
3228 Ok(reports)
3229 }
3230
3231 async fn paginate_orders_history(
3233 &self,
3234 base: &GetOrderHistoryParams,
3235 limit: Option<u32>,
3236 ) -> anyhow::Result<Vec<OKXOrderHistory>> {
3237 let mut all = Vec::new();
3238 let mut cursor: Option<String> = None;
3239 let mut exhausted = true;
3240
3241 for _ in 0..MAX_RECONCILIATION_PAGES {
3242 let mut params = base.clone();
3243 params.after = cursor.take();
3244
3245 let page = self
3246 .inner
3247 .get_orders_history(params)
3248 .await
3249 .map_err(|e| anyhow::anyhow!(e))?;
3250
3251 let page_len = page.len();
3252 cursor = page.last().map(|o| o.ord_id.to_string());
3253 all.extend(page);
3254
3255 if page_len < OKX_PAGE_SIZE {
3256 exhausted = false;
3257 break;
3258 }
3259
3260 if let Some(lim) = limit
3261 && all.len() >= lim as usize
3262 {
3263 exhausted = false;
3264 break;
3265 }
3266 }
3267
3268 if exhausted && !all.is_empty() {
3269 log::warn!(
3270 "Order history pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3271 results may be truncated ({} records)",
3272 all.len()
3273 );
3274 }
3275
3276 if let Some(lim) = limit {
3277 all.truncate(lim as usize);
3278 }
3279
3280 Ok(all)
3281 }
3282
3283 async fn paginate_orders_pending(
3285 &self,
3286 base: &GetOrderListParams,
3287 limit: Option<u32>,
3288 ) -> anyhow::Result<Vec<OKXOrderHistory>> {
3289 let mut all = Vec::new();
3290 let mut cursor: Option<String> = None;
3291 let mut exhausted = true;
3292
3293 for _ in 0..MAX_RECONCILIATION_PAGES {
3294 let mut params = base.clone();
3295 params.after = cursor.take();
3296
3297 let page = self
3298 .inner
3299 .get_orders_pending(params)
3300 .await
3301 .map_err(|e| anyhow::anyhow!(e))?;
3302
3303 let page_len = page.len();
3304 cursor = page.last().map(|o| o.ord_id.to_string());
3305 all.extend(page);
3306
3307 if page_len < OKX_PAGE_SIZE {
3308 exhausted = false;
3309 break;
3310 }
3311
3312 if let Some(lim) = limit
3313 && all.len() >= lim as usize
3314 {
3315 exhausted = false;
3316 break;
3317 }
3318 }
3319
3320 if exhausted && !all.is_empty() {
3321 log::warn!(
3322 "Pending orders pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3323 results may be truncated ({} records)",
3324 all.len()
3325 );
3326 }
3327
3328 if let Some(lim) = limit {
3329 all.truncate(lim as usize);
3330 }
3331
3332 Ok(all)
3333 }
3334
3335 async fn paginate_fills(
3337 &self,
3338 base: &GetTransactionDetailsParams,
3339 limit: Option<u32>,
3340 ) -> anyhow::Result<Vec<OKXTransactionDetail>> {
3341 let mut all = Vec::new();
3342 let mut cursor: Option<String> = None;
3343 let mut exhausted = true;
3344
3345 for _ in 0..MAX_RECONCILIATION_PAGES {
3346 let mut params = base.clone();
3347 params.after = cursor.take();
3348
3349 let page = self
3350 .inner
3351 .get_fills(params)
3352 .await
3353 .map_err(|e| anyhow::anyhow!(e))?;
3354
3355 let page_len = page.len();
3356 cursor = page.last().map(|o| o.bill_id.to_string());
3357 all.extend(page);
3358
3359 if page_len < OKX_PAGE_SIZE {
3360 exhausted = false;
3361 break;
3362 }
3363
3364 if let Some(lim) = limit
3365 && all.len() >= lim as usize
3366 {
3367 exhausted = false;
3368 break;
3369 }
3370 }
3371
3372 if exhausted && !all.is_empty() {
3373 log::warn!(
3374 "Fill pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3375 results may be truncated ({} records)",
3376 all.len()
3377 );
3378 }
3379
3380 if let Some(lim) = limit {
3381 all.truncate(lim as usize);
3382 }
3383
3384 Ok(all)
3385 }
3386
3387 async fn paginate_algo_pending(
3389 &self,
3390 base: &GetAlgoOrdersParams,
3391 limit: Option<usize>,
3392 ) -> anyhow::Result<Vec<OKXOrderAlgo>> {
3393 let mut all = Vec::new();
3394 let mut cursor: Option<String> = None;
3395 let mut exhausted = true;
3396
3397 for _ in 0..MAX_RECONCILIATION_PAGES {
3398 let mut params = base.clone();
3399 params.after = cursor.take();
3400
3401 let page = match self.inner.get_order_algo_pending(params).await {
3402 Ok(result) => result,
3403 Err(OKXHttpError::UnexpectedStatus { status, .. })
3404 if status == StatusCode::NOT_FOUND =>
3405 {
3406 exhausted = false;
3407 break;
3408 }
3409 Err(e) => return Err(e.into()),
3410 };
3411
3412 let page_len = page.len();
3413 cursor = page.last().map(|o| o.algo_id.clone());
3414 all.extend(page);
3415
3416 if page_len < OKX_PAGE_SIZE {
3417 exhausted = false;
3418 break;
3419 }
3420
3421 if let Some(lim) = limit
3422 && all.len() >= lim
3423 {
3424 exhausted = false;
3425 break;
3426 }
3427 }
3428
3429 if exhausted && !all.is_empty() {
3430 log::warn!(
3431 "Algo pending pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3432 results may be truncated ({} records)",
3433 all.len()
3434 );
3435 }
3436
3437 Ok(all)
3438 }
3439
3440 async fn paginate_algo_history(
3442 &self,
3443 base: &GetAlgoOrdersParams,
3444 limit: Option<usize>,
3445 ) -> anyhow::Result<Vec<OKXOrderAlgo>> {
3446 let mut all = Vec::new();
3447 let mut cursor: Option<String> = None;
3448 let mut exhausted = true;
3449
3450 for _ in 0..MAX_RECONCILIATION_PAGES {
3451 let mut params = base.clone();
3452 params.after = cursor.take();
3453
3454 let page = match self.inner.get_order_algo_history(params).await {
3455 Ok(result) => result,
3456 Err(OKXHttpError::UnexpectedStatus { status, .. })
3457 if status == StatusCode::NOT_FOUND =>
3458 {
3459 exhausted = false;
3460 break;
3461 }
3462 Err(e) => return Err(e.into()),
3463 };
3464
3465 let page_len = page.len();
3466 cursor = page.last().map(|o| o.algo_id.clone());
3467 all.extend(page);
3468
3469 if page_len < OKX_PAGE_SIZE {
3470 exhausted = false;
3471 break;
3472 }
3473
3474 if let Some(lim) = limit
3475 && all.len() >= lim
3476 {
3477 exhausted = false;
3478 break;
3479 }
3480 }
3481
3482 if exhausted && !all.is_empty() {
3483 log::warn!(
3484 "Algo history pagination hit {MAX_RECONCILIATION_PAGES} page cap, \
3485 results may be truncated ({} records)",
3486 all.len()
3487 );
3488 }
3489
3490 Ok(all)
3491 }
3492
3493 pub async fn request_fill_reports(
3503 &self,
3504 account_id: AccountId,
3505 instrument_type: Option<OKXInstrumentType>,
3506 instrument_id: Option<InstrumentId>,
3507 start: Option<DateTime<Utc>>,
3508 end: Option<DateTime<Utc>>,
3509 limit: Option<u32>,
3510 ) -> anyhow::Result<Vec<FillReport>> {
3511 let mut params = GetTransactionDetailsParamsBuilder::default();
3512
3513 let instrument_type = if let Some(instrument_type) = instrument_type {
3514 instrument_type
3515 } else {
3516 let instrument_id = instrument_id.ok_or_else(|| {
3517 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
3518 })?;
3519 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3520 okx_instrument_type(&instrument)?
3521 };
3522
3523 params.inst_type(instrument_type);
3524
3525 if let Some(instrument_id) = instrument_id {
3526 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3527 let instrument_type = okx_instrument_type(&instrument)?;
3528 params.inst_type(instrument_type);
3529 params.inst_id(instrument_id.symbol.inner().to_string());
3530 }
3531
3532 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
3533
3534 let resp = self.paginate_fills(¶ms, limit).await?;
3535
3536 let start_ns = start.map(UnixNanos::from);
3538 let end_ns = end.map(UnixNanos::from);
3539
3540 let ts_init = self.generate_ts_init();
3541 let mut reports = Vec::with_capacity(resp.len());
3542
3543 for detail in resp {
3544 if detail.fill_sz.is_empty() {
3546 continue;
3547 }
3548
3549 if let Ok(qty) = detail.fill_sz.parse::<f64>() {
3550 if qty <= 0.0 {
3551 continue;
3552 }
3553 } else {
3554 continue;
3556 }
3557
3558 let Ok(inst) = self.instrument_from_cache(detail.inst_id) else {
3559 log::debug!(
3560 "Skipping fill report for instrument not in cache: symbol={}",
3561 detail.inst_id,
3562 );
3563 continue;
3564 };
3565
3566 let report = match parse_fill_report(
3567 &detail,
3568 account_id,
3569 inst.id(),
3570 inst.price_precision(),
3571 inst.size_precision(),
3572 ts_init,
3573 ) {
3574 Ok(report) => report,
3575 Err(e) => {
3576 log::error!("Failed to parse fill report: {e}");
3577 continue;
3578 }
3579 };
3580
3581 if let Some(start_ns) = start_ns
3582 && report.ts_event < start_ns
3583 {
3584 continue;
3585 }
3586
3587 if let Some(end_ns) = end_ns
3588 && report.ts_event > end_ns
3589 {
3590 continue;
3591 }
3592
3593 reports.push(report);
3594 }
3595
3596 Ok(reports)
3597 }
3598
3599 pub async fn request_position_status_reports(
3626 &self,
3627 account_id: AccountId,
3628 instrument_type: Option<OKXInstrumentType>,
3629 instrument_id: Option<InstrumentId>,
3630 ) -> anyhow::Result<Vec<PositionStatusReport>> {
3631 let mut params = GetPositionsParamsBuilder::default();
3632
3633 let instrument_type = if let Some(instrument_type) = instrument_type {
3634 instrument_type
3635 } else {
3636 let instrument_id = instrument_id.ok_or_else(|| {
3637 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
3638 })?;
3639 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
3640 okx_instrument_type(&instrument)?
3641 };
3642
3643 params.inst_type(instrument_type);
3644
3645 instrument_id
3646 .as_ref()
3647 .map(|i| params.inst_id(i.symbol.inner()));
3648
3649 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
3650
3651 let resp = self
3652 .inner
3653 .get_positions(params)
3654 .await
3655 .map_err(|e| anyhow::anyhow!(e))?;
3656
3657 let ts_init = self.generate_ts_init();
3658 let mut reports = Vec::with_capacity(resp.len());
3659
3660 for position in resp {
3661 let Ok(inst) = self.instrument_from_cache(position.inst_id) else {
3662 log::debug!(
3663 "Skipping position report for instrument not in cache: symbol={}",
3664 position.inst_id,
3665 );
3666 continue;
3667 };
3668
3669 match parse_position_status_report(
3670 &position,
3671 account_id,
3672 inst.id(),
3673 inst.size_precision(),
3674 ts_init,
3675 ) {
3676 Ok(report) => reports.push(report),
3677 Err(e) => {
3678 log::error!("Failed to parse position status report: {e}");
3679 }
3680 }
3681 }
3682
3683 Ok(reports)
3684 }
3685
3686 pub async fn request_spot_margin_position_reports(
3701 &self,
3702 account_id: AccountId,
3703 ) -> anyhow::Result<Vec<PositionStatusReport>> {
3704 let accounts = self
3705 .inner
3706 .get_balance()
3707 .await
3708 .map_err(|e| anyhow::anyhow!(e))?;
3709
3710 let ts_init = self.generate_ts_init();
3711 let mut reports = Vec::new();
3712
3713 let cache_snapshot = self.instruments_cache.load();
3727 let mut candidates: Vec<&InstrumentAny> = cache_snapshot
3728 .values()
3729 .filter(|inst| matches!(inst, InstrumentAny::CurrencyPair(_)))
3730 .collect();
3731 candidates.sort_by(|a, b| {
3732 let a_sym = a.id().symbol.as_str().to_string();
3733 let b_sym = b.id().symbol.as_str().to_string();
3734 spot_quote_priority(&a_sym)
3735 .cmp(&spot_quote_priority(&b_sym))
3736 .then_with(|| a_sym.cmp(&b_sym))
3737 });
3738
3739 let mut by_base: AHashMap<Ustr, (InstrumentId, u8)> = AHashMap::new();
3740
3741 for inst in candidates {
3742 if let Some(base) = inst.base_currency() {
3743 let base_code = Ustr::from(base.code.as_str());
3744 by_base
3745 .entry(base_code)
3746 .or_insert_with(|| (inst.id(), inst.size_precision()));
3747 }
3748 }
3749
3750 for account in accounts {
3751 for balance in account.details {
3752 let ccy_str = balance.ccy.as_str();
3753
3754 let Some((instrument_id, size_precision)) =
3755 by_base.get(&Ustr::from(ccy_str)).copied()
3756 else {
3757 log::debug!("Skipping balance for {ccy_str} - no matching instrument in cache");
3758 continue;
3759 };
3760
3761 match parse_spot_margin_position_from_balance(
3762 &balance,
3763 account_id,
3764 instrument_id,
3765 size_precision,
3766 ts_init,
3767 ) {
3768 Ok(Some(report)) => reports.push(report),
3769 Ok(None) => {} Err(e) => {
3771 log::error!(
3772 "Failed to parse spot margin position from balance for {ccy_str}: {e}"
3773 );
3774 }
3775 }
3776 }
3777 }
3778
3779 Ok(reports)
3780 }
3781
3782 pub async fn place_order(
3792 &self,
3793 request: OKXPlaceOrderRequest,
3794 ) -> Result<OKXPlaceOrderResponse, OKXHttpError> {
3795 let body =
3796 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3797
3798 let resp: Vec<OKXPlaceOrderResponse> = self
3799 .inner
3800 .send_request::<_, ()>(Method::POST, "/api/v5/trade/order", None, Some(body), true)
3801 .await?;
3802
3803 resp.into_iter()
3804 .next()
3805 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
3806 }
3807
3808 pub async fn place_algo_order(
3818 &self,
3819 request: OKXPlaceAlgoOrderRequest,
3820 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
3821 let body =
3822 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3823
3824 let resp: Vec<OKXPlaceAlgoOrderResponse> = self
3825 .inner
3826 .send_request::<_, ()>(
3827 Method::POST,
3828 "/api/v5/trade/order-algo",
3829 None,
3830 Some(body),
3831 true,
3832 )
3833 .await?;
3834
3835 let item = resp
3836 .into_iter()
3837 .next()
3838 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))?;
3839
3840 if let Some(ref code) = item.s_code
3841 && code != "0"
3842 {
3843 let msg = item.s_msg.clone().unwrap_or_default();
3844 return Err(OKXHttpError::OkxError {
3845 error_code: code.clone(),
3846 message: msg,
3847 });
3848 }
3849
3850 Ok(item)
3851 }
3852
3853 pub async fn cancel_algo_order(
3863 &self,
3864 request: OKXCancelAlgoOrderRequest,
3865 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
3866 let body =
3869 serde_json::to_vec(&[request]).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3870
3871 let resp: Vec<OKXCancelAlgoOrderResponse> = self
3872 .inner
3873 .send_request::<_, ()>(
3874 Method::POST,
3875 "/api/v5/trade/cancel-algos",
3876 None,
3877 Some(body),
3878 true,
3879 )
3880 .await?;
3881
3882 let item = resp
3883 .into_iter()
3884 .next()
3885 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))?;
3886
3887 if let Some(ref code) = item.s_code
3888 && code != "0"
3889 {
3890 let msg = item.s_msg.clone().unwrap_or_default();
3891 return Err(OKXHttpError::OkxError {
3892 error_code: code.clone(),
3893 message: msg,
3894 });
3895 }
3896
3897 Ok(item)
3898 }
3899
3900 pub async fn cancel_algo_orders(
3913 &self,
3914 requests: Vec<OKXCancelAlgoOrderRequest>,
3915 ) -> Result<Vec<OKXCancelAlgoOrderResponse>, OKXHttpError> {
3916 if requests.is_empty() {
3917 return Ok(Vec::new());
3918 }
3919
3920 let body =
3921 serde_json::to_vec(&requests).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3922
3923 let resp: Vec<OKXCancelAlgoOrderResponse> = self
3924 .inner
3925 .send_request::<_, ()>(
3926 Method::POST,
3927 "/api/v5/trade/cancel-algos",
3928 None,
3929 Some(body),
3930 true,
3931 )
3932 .await?;
3933
3934 for item in &resp {
3935 if let Some(ref code) = item.s_code
3936 && code != "0"
3937 {
3938 let msg = item.s_msg.as_deref().unwrap_or("");
3939 log::warn!(
3940 "Algo cancel rejected: algo_id={} sCode={code} sMsg={msg}",
3941 item.algo_id
3942 );
3943 }
3944 }
3945
3946 Ok(resp)
3947 }
3948
3949 pub async fn cancel_advance_algo_orders(
3962 &self,
3963 requests: Vec<OKXCancelAlgoOrderRequest>,
3964 ) -> Result<Vec<OKXCancelAlgoOrderResponse>, OKXHttpError> {
3965 if requests.is_empty() {
3966 return Ok(Vec::new());
3967 }
3968
3969 let body =
3970 serde_json::to_vec(&requests).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
3971
3972 let resp: Vec<OKXCancelAlgoOrderResponse> = self
3973 .inner
3974 .send_request::<_, ()>(
3975 Method::POST,
3976 "/api/v5/trade/cancel-advance-algos",
3977 None,
3978 Some(body),
3979 true,
3980 )
3981 .await?;
3982
3983 for item in &resp {
3984 if let Some(ref code) = item.s_code
3985 && code != "0"
3986 {
3987 let msg = item.s_msg.as_deref().unwrap_or("");
3988 log::warn!(
3989 "Advance algo cancel rejected: algo_id={} sCode={code} sMsg={msg}",
3990 item.algo_id
3991 );
3992 }
3993 }
3994
3995 Ok(resp)
3996 }
3997
3998 pub async fn amend_algo_order(
4008 &self,
4009 request: OKXAmendAlgoOrderRequest,
4010 ) -> Result<OKXAmendAlgoOrderResponse, OKXHttpError> {
4011 let body =
4012 serde_json::to_vec(&request).map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
4013
4014 let resp: Vec<OKXAmendAlgoOrderResponse> = self
4015 .inner
4016 .send_request::<_, ()>(
4017 Method::POST,
4018 "/api/v5/trade/amend-algos",
4019 None,
4020 Some(body),
4021 true,
4022 )
4023 .await?;
4024
4025 resp.into_iter()
4026 .next()
4027 .ok_or_else(|| OKXHttpError::ValidationError("Empty response".to_string()))
4028 }
4029
4030 #[expect(clippy::too_many_arguments)]
4039 pub async fn amend_algo_order_with_domain_types(
4040 &self,
4041 instrument_id: InstrumentId,
4042 algo_id: String,
4043 new_trigger_price: Option<Price>,
4044 new_limit_price: Option<Price>,
4045 new_quantity: Option<Quantity>,
4046 new_callback_ratio: Option<String>,
4047 new_callback_spread: Option<String>,
4048 new_activation_price: Option<Price>,
4049 ) -> Result<OKXAmendAlgoOrderResponse, OKXHttpError> {
4050 let request = OKXAmendAlgoOrderRequest {
4051 inst_id: instrument_id.symbol.as_str().to_string(),
4052 algo_id,
4053 algo_cl_ord_id: None,
4054 new_sz: new_quantity.map(|q| q.to_string()),
4055 new_trigger_px: new_trigger_price.map(|p| p.to_string()),
4056 new_order_px: new_limit_price.map(|p| p.to_string()),
4057 new_callback_ratio,
4058 new_callback_spread,
4059 new_active_px: new_activation_price.map(|p| p.to_string()),
4060 };
4061
4062 self.amend_algo_order(request).await
4063 }
4064
4065 #[expect(clippy::too_many_arguments)]
4074 pub async fn place_order_with_domain_types(
4075 &self,
4076 instrument_id: InstrumentId,
4077 td_mode: OKXTradeMode,
4078 client_order_id: ClientOrderId,
4079 order_side: OrderSide,
4080 order_type: OrderType,
4081 quantity: Quantity,
4082 time_in_force: Option<TimeInForce>,
4083 price: Option<Price>,
4084 post_only: Option<bool>,
4085 reduce_only: Option<bool>,
4086 quote_quantity: Option<bool>,
4087 position_side: Option<PositionSide>,
4088 attach_algo_ords: Option<Vec<OKXAttachAlgoOrdRequest>>,
4089 px_usd: Option<String>,
4090 px_vol: Option<String>,
4091 ) -> Result<OKXPlaceOrderResponse, OKXHttpError> {
4092 if !OKX_SUPPORTED_ORDER_TYPES.contains(&order_type) {
4093 return Err(OKXHttpError::ValidationError(format!(
4094 "Unsupported order type: {order_type:?}",
4095 )));
4096 }
4097
4098 if matches!(
4099 order_type,
4100 OrderType::StopMarket
4101 | OrderType::StopLimit
4102 | OrderType::MarketIfTouched
4103 | OrderType::LimitIfTouched
4104 | OrderType::TrailingStopMarket
4105 ) {
4106 return Err(OKXHttpError::ValidationError(
4107 "Conditional order types must use OKX algo order placement".to_string(),
4108 ));
4109 }
4110
4111 if let Some(tif) = time_in_force
4112 && !OKX_SUPPORTED_TIME_IN_FORCE.contains(&tif)
4113 {
4114 return Err(OKXHttpError::ValidationError(format!(
4115 "Unsupported time in force: {tif:?}",
4116 )));
4117 }
4118
4119 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
4120 return Err(OKXHttpError::ValidationError(
4121 "Invalid order side".to_string(),
4122 ));
4123 }
4124
4125 let instrument = self
4126 .instrument_from_cache(instrument_id.symbol.inner())
4127 .map_err(|e| OKXHttpError::ValidationError(e.to_string()))?;
4128 let instrument_type = okx_instrument_type(&instrument)
4129 .map_err(|e| OKXHttpError::ValidationError(e.to_string()))?;
4130
4131 if instrument_type == OKXInstrumentType::Option
4133 && matches!(order_type, OrderType::Market | OrderType::MarketToLimit)
4134 {
4135 return Err(OKXHttpError::ValidationError(
4136 "Market orders are not supported for OKX options, use Limit orders instead"
4137 .to_string(),
4138 ));
4139 }
4140
4141 let side = OKXSide::from(order_side.as_specified());
4142 let pos_side = position_side.map(Into::into).or({
4143 if matches!(
4144 instrument_type,
4145 OKXInstrumentType::Swap | OKXInstrumentType::Futures | OKXInstrumentType::Option
4146 ) {
4147 Some(OKXPositionSide::Net)
4148 } else {
4149 None
4150 }
4151 });
4152
4153 let tgt_ccy = if instrument_type == OKXInstrumentType::Spot
4154 && order_type == OrderType::Market
4155 && td_mode == OKXTradeMode::Cash
4156 {
4157 match quote_quantity {
4158 Some(true) => Some(OKXTargetCurrency::QuoteCcy),
4159 Some(false) if order_side == OrderSide::Buy => Some(OKXTargetCurrency::BaseCcy),
4160 _ => None,
4161 }
4162 } else {
4163 None
4164 };
4165
4166 let (ord_type, px) = if post_only.unwrap_or(false) {
4167 (OKXOrderType::PostOnly, price)
4168 } else if let Some(tif) = time_in_force {
4169 match (order_type, tif) {
4170 (OrderType::Market, TimeInForce::Fok) => {
4171 return Err(OKXHttpError::ValidationError(
4172 "Market orders with FOK time-in-force are not supported by OKX. Use Limit order with FOK instead.".to_string(),
4173 ));
4174 }
4175 (OrderType::Market, TimeInForce::Ioc) => {
4176 if matches!(
4178 instrument_type,
4179 OKXInstrumentType::Spot | OKXInstrumentType::Option
4180 ) {
4181 (OKXOrderType::Market, price)
4182 } else {
4183 (OKXOrderType::OptimalLimitIoc, price)
4184 }
4185 }
4186 (OrderType::Limit, TimeInForce::Fok) => {
4187 if instrument_type == OKXInstrumentType::Option {
4189 (OKXOrderType::OpFok, price)
4190 } else {
4191 (OKXOrderType::Fok, price)
4192 }
4193 }
4194 (OrderType::Limit, TimeInForce::Ioc) => (OKXOrderType::Ioc, price),
4195 _ => (OKXOrderType::from(order_type), price),
4196 }
4197 } else {
4198 (OKXOrderType::from(order_type), price)
4199 };
4200
4201 let reduce_only = if instrument_type == OKXInstrumentType::Option {
4203 None
4204 } else {
4205 reduce_only
4206 };
4207
4208 let (px, px_usd, px_vol) = if px_usd.is_some() {
4210 (None, px_usd, None)
4211 } else if px_vol.is_some() {
4212 (None, None, px_vol)
4213 } else {
4214 (px.map(|p| p.to_string()), None, None)
4215 };
4216
4217 let request = OKXPlaceOrderRequest {
4218 inst_id: instrument_id.symbol.as_str().to_string(),
4219 td_mode,
4220 ccy: None,
4221 cl_ord_id: Some(client_order_id.as_str().to_string()),
4222 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
4223 side,
4224 pos_side,
4225 ord_type,
4226 sz: quantity.to_string(),
4227 px,
4228 px_usd,
4229 px_vol,
4230 reduce_only,
4231 tgt_ccy,
4232 attach_algo_ords,
4233 };
4234
4235 self.place_order(request).await
4236 }
4237
4238 #[expect(clippy::too_many_arguments)]
4247 pub async fn place_algo_order_with_domain_types(
4248 &self,
4249 instrument_id: InstrumentId,
4250 td_mode: OKXTradeMode,
4251 client_order_id: ClientOrderId,
4252 order_side: OrderSide,
4253 order_type: OrderType,
4254 quantity: Quantity,
4255 trigger_price: Option<Price>,
4256 trigger_type: Option<TriggerType>,
4257 limit_price: Option<Price>,
4258 reduce_only: Option<bool>,
4259 close_fraction: Option<String>,
4260 callback_ratio: Option<String>,
4261 callback_spread: Option<String>,
4262 activation_price: Option<Price>,
4263 ) -> Result<OKXPlaceAlgoOrderResponse, OKXHttpError> {
4264 if !matches!(order_side, OrderSide::Buy | OrderSide::Sell) {
4265 return Err(OKXHttpError::ValidationError(
4266 "Invalid order side".to_string(),
4267 ));
4268 }
4269
4270 let okx_side = OKXSide::from(order_side.as_specified());
4271
4272 let trigger_px_type_enum = trigger_type.map_or(OKXTriggerType::Last, Into::into);
4274
4275 let uses_close_fraction = close_fraction.is_some();
4276 let (
4277 algo_type,
4278 sz,
4279 trigger_px,
4280 order_px,
4281 trigger_px_type,
4282 sl_trigger_px,
4283 sl_ord_px,
4284 sl_trigger_px_type,
4285 tp_trigger_px,
4286 tp_ord_px,
4287 tp_trigger_px_type,
4288 pos_side,
4289 reduce_only,
4290 ) = if uses_close_fraction {
4291 if order_type == OrderType::TrailingStopMarket {
4292 return Err(OKXHttpError::ValidationError(
4293 "OKX close_fraction does not support TrailingStopMarket".to_string(),
4294 ));
4295 }
4296
4297 let trigger_px = trigger_price.map(|p| p.to_string()).ok_or_else(|| {
4298 OKXHttpError::ValidationError(
4299 "OKX close_fraction orders require trigger_price".to_string(),
4300 )
4301 })?;
4302
4303 let close_order_px =
4304 if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched) {
4305 limit_price.map(|p| p.to_string()).ok_or_else(|| {
4306 OKXHttpError::ValidationError(format!(
4307 "OKX {order_type:?} close_fraction orders require limit_price"
4308 ))
4309 })?
4310 } else {
4311 "-1".to_string()
4312 };
4313
4314 let (
4315 sl_trigger_px,
4316 sl_ord_px,
4317 sl_trigger_px_type,
4318 tp_trigger_px,
4319 tp_ord_px,
4320 tp_trigger_px_type,
4321 ) = match order_type {
4322 OrderType::StopMarket | OrderType::StopLimit => (
4323 Some(trigger_px),
4324 Some(close_order_px),
4325 Some(trigger_px_type_enum),
4326 None,
4327 None,
4328 None,
4329 ),
4330 OrderType::MarketIfTouched | OrderType::LimitIfTouched => (
4331 None,
4332 None,
4333 None,
4334 Some(trigger_px),
4335 Some(close_order_px),
4336 Some(trigger_px_type_enum),
4337 ),
4338 _ => {
4339 return Err(OKXHttpError::ValidationError(format!(
4340 "OKX close_fraction is only supported for stop/touched conditional orders, received {order_type:?}"
4341 )));
4342 }
4343 };
4344
4345 (
4346 OKXAlgoOrderType::Conditional,
4347 None,
4348 None,
4349 None,
4350 None,
4351 sl_trigger_px,
4352 sl_ord_px,
4353 sl_trigger_px_type,
4354 tp_trigger_px,
4355 tp_ord_px,
4356 tp_trigger_px_type,
4357 Some(OKXPositionSide::Net),
4358 Some(true),
4359 )
4360 } else {
4361 let algo_type = conditional_order_to_algo_type(order_type)
4362 .map_err(|e| OKXHttpError::ValidationError(e.to_string()))?;
4363
4364 let order_px = if matches!(order_type, OrderType::StopLimit | OrderType::LimitIfTouched)
4365 {
4366 limit_price.map(|p| p.to_string())
4367 } else if order_type == OrderType::TrailingStopMarket {
4368 None
4369 } else {
4370 Some("-1".to_string())
4371 };
4372
4373 (
4374 algo_type,
4375 Some(quantity.to_string()),
4376 trigger_price.map(|p| p.to_string()),
4377 order_px,
4378 Some(trigger_px_type_enum),
4379 None,
4380 None,
4381 None,
4382 None,
4383 None,
4384 None,
4385 None,
4386 reduce_only,
4387 )
4388 };
4389
4390 let request = OKXPlaceAlgoOrderRequest {
4391 inst_id: instrument_id.symbol.as_str().to_string(),
4392 inst_id_code: None,
4393 td_mode,
4394 side: okx_side,
4395 ord_type: algo_type,
4396 sz,
4397 algo_cl_ord_id: Some(client_order_id.as_str().to_string()),
4398 trigger_px,
4399 order_px,
4400 trigger_px_type,
4401 sl_trigger_px,
4402 sl_ord_px,
4403 sl_trigger_px_type,
4404 tp_trigger_px,
4405 tp_ord_px,
4406 tp_trigger_px_type,
4407 tgt_ccy: None,
4408 pos_side,
4409 close_position: None,
4410 tag: Some(OKX_NAUTILUS_BROKER_ID.to_string()),
4411 reduce_only,
4412 close_fraction,
4413 callback_ratio,
4414 callback_spread,
4415 active_px: activation_price.map(|p| p.to_string()),
4416 };
4417
4418 self.place_algo_order(request).await
4419 }
4420
4421 pub async fn cancel_algo_order_with_domain_types(
4430 &self,
4431 instrument_id: InstrumentId,
4432 algo_id: String,
4433 ) -> Result<OKXCancelAlgoOrderResponse, OKXHttpError> {
4434 let request = OKXCancelAlgoOrderRequest {
4435 inst_id: instrument_id.symbol.to_string(),
4436 inst_id_code: None,
4437 algo_id: Some(algo_id),
4438 algo_cl_ord_id: None,
4439 };
4440
4441 self.cancel_algo_order(request).await
4442 }
4443
4444 #[expect(clippy::too_many_arguments)]
4450 pub async fn request_algo_order_status_reports(
4451 &self,
4452 account_id: AccountId,
4453 instrument_type: Option<OKXInstrumentType>,
4454 instrument_id: Option<InstrumentId>,
4455 algo_id: Option<String>,
4456 algo_client_order_id: Option<ClientOrderId>,
4457 state: Option<OKXOrderStatus>,
4458 limit: Option<u32>,
4459 ) -> anyhow::Result<Vec<OrderStatusReport>> {
4460 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
4461 let has_specific_lookup = algo_id.is_some() || algo_client_order_id.is_some();
4462
4463 let inst_type = if let Some(inst_type) = instrument_type {
4464 inst_type
4465 } else if let Some(inst_id) = instrument_id {
4466 let instrument = self.instrument_from_cache(inst_id.symbol.inner())?;
4467 let inst_type = okx_instrument_type(&instrument)?;
4468 instruments_cache.insert(inst_id.symbol.inner(), instrument);
4469 inst_type
4470 } else {
4471 anyhow::bail!("instrument_type or instrument_id required for algo order query")
4472 };
4473
4474 let ts_init = self.generate_ts_init();
4475 let mut reports = Vec::new();
4476 let mut seen: AHashSet<(String, String)> = AHashSet::new();
4477
4478 for ord_type in [
4479 OKXAlgoOrderType::Oco,
4480 OKXAlgoOrderType::Conditional,
4481 OKXAlgoOrderType::Trigger,
4482 OKXAlgoOrderType::MoveOrderStop,
4483 ] {
4484 let mut params_builder = GetAlgoOrdersParamsBuilder::default();
4485 params_builder.inst_type(inst_type);
4486 params_builder.ord_type(ord_type);
4487
4488 if let Some(inst_id) = instrument_id {
4489 params_builder.inst_id(inst_id.symbol.inner().to_string());
4490 }
4491
4492 if let Some(algo_id) = algo_id.as_ref() {
4493 params_builder.algo_id(algo_id.clone());
4494 }
4495
4496 if let Some(client_order_id) = algo_client_order_id.as_ref() {
4497 params_builder.algo_cl_ord_id(client_order_id.as_str().to_string());
4498 }
4499
4500 if let Some(state) = state {
4501 params_builder.state(state);
4502 }
4503
4504 let params = params_builder
4505 .build()
4506 .map_err(|e| anyhow::anyhow!(format!("Failed to build algo order params: {e}")))?;
4507
4508 let remaining = limit.map(|l| (l as usize).saturating_sub(reports.len()));
4509 let pending = self.paginate_algo_pending(¶ms, remaining).await?;
4510 self.collect_algo_reports(
4511 account_id,
4512 &pending,
4513 &mut instruments_cache,
4514 ts_init,
4515 &mut seen,
4516 &mut reports,
4517 )
4518 .await?;
4519
4520 if has_specific_lookup && !reports.is_empty() {
4521 return Ok(reports);
4522 }
4523
4524 if let Some(lim) = limit
4525 && reports.len() >= lim as usize
4526 {
4527 reports.truncate(lim as usize);
4528 return Ok(reports);
4529 }
4530
4531 if state.is_some() || has_specific_lookup {
4540 let remaining = limit.map(|l| (l as usize).saturating_sub(reports.len()));
4541 let history = self.paginate_algo_history(¶ms, remaining).await?;
4542 self.collect_algo_reports(
4543 account_id,
4544 &history,
4545 &mut instruments_cache,
4546 ts_init,
4547 &mut seen,
4548 &mut reports,
4549 )
4550 .await?;
4551
4552 if has_specific_lookup && !reports.is_empty() {
4553 return Ok(reports);
4554 }
4555
4556 if let Some(lim) = limit
4557 && reports.len() >= lim as usize
4558 {
4559 reports.truncate(lim as usize);
4560 return Ok(reports);
4561 }
4562 }
4563 }
4564
4565 Ok(reports)
4566 }
4567
4568 pub async fn request_algo_order_status_report(
4574 &self,
4575 account_id: AccountId,
4576 instrument_id: InstrumentId,
4577 algo_client_order_id: ClientOrderId,
4578 ) -> anyhow::Result<Option<OrderStatusReport>> {
4579 let reports = self
4580 .request_algo_order_status_reports(
4581 account_id,
4582 None,
4583 Some(instrument_id),
4584 None,
4585 Some(algo_client_order_id),
4586 None,
4587 Some(50_u32),
4588 )
4589 .await?;
4590
4591 Ok(reports.into_iter().next())
4592 }
4593
4594 pub fn raw_client(&self) -> &Arc<OKXRawHttpClient> {
4596 &self.inner
4597 }
4598
4599 async fn collect_algo_reports(
4600 &self,
4601 account_id: AccountId,
4602 orders: &[OKXOrderAlgo],
4603 instruments_cache: &mut AHashMap<Ustr, InstrumentAny>,
4604 ts_init: UnixNanos,
4605 seen: &mut AHashSet<(String, String)>,
4606 reports: &mut Vec<OrderStatusReport>,
4607 ) -> anyhow::Result<()> {
4608 for order in orders {
4609 let key = (order.algo_id.clone(), order.algo_cl_ord_id.clone());
4610 if !seen.insert(key) {
4611 continue;
4612 }
4613
4614 let instrument = if let Some(instrument) = instruments_cache.get(&order.inst_id) {
4615 instrument.clone()
4616 } else {
4617 let Ok(instrument) = self.instrument_from_cache(order.inst_id) else {
4618 log::debug!(
4619 "Skipping algo order report for instrument not in cache: symbol={}",
4620 order.inst_id,
4621 );
4622 continue;
4623 };
4624 instruments_cache.insert(order.inst_id, instrument.clone());
4625 instrument
4626 };
4627
4628 match parse_http_algo_order(order, account_id, &instrument, ts_init) {
4629 Ok(report) => reports.push(report),
4630 Err(e) => {
4631 log::error!("Failed to parse algo order report: {e}");
4632 }
4633 }
4634 }
4635
4636 Ok(())
4637 }
4638}
4639
4640fn parse_http_algo_order(
4641 order: &OKXOrderAlgo,
4642 account_id: AccountId,
4643 instrument: &InstrumentAny,
4644 ts_init: UnixNanos,
4645) -> anyhow::Result<OrderStatusReport> {
4646 let ord_px = if order.ord_px.is_empty() {
4647 "-1".to_string()
4648 } else {
4649 order.ord_px.clone()
4650 };
4651
4652 let reduce_only = if order.reduce_only.is_empty() {
4653 "false".to_string()
4654 } else {
4655 order.reduce_only.clone()
4656 };
4657
4658 let msg = OKXAlgoOrderMsg {
4659 algo_id: order.algo_id.clone(),
4660 algo_cl_ord_id: order.algo_cl_ord_id.clone(),
4661 cl_ord_id: order.cl_ord_id.clone(),
4662 ord_id: order.ord_id.clone(),
4663 inst_id: order.inst_id,
4664 inst_type: order.inst_type,
4665 ord_type: order.ord_type,
4666 state: order.state,
4667 side: order.side,
4668 pos_side: order.pos_side,
4669 sz: order.sz.clone(),
4670 trigger_px: order.trigger_px.clone(),
4671 trigger_px_type: order.trigger_px_type.unwrap_or(OKXTriggerType::None),
4672 sl_trigger_px: order.sl_trigger_px.clone(),
4673 sl_ord_px: order.sl_ord_px.clone(),
4674 sl_trigger_px_type: order.sl_trigger_px_type.unwrap_or(OKXTriggerType::None),
4675 tp_trigger_px: order.tp_trigger_px.clone(),
4676 tp_ord_px: order.tp_ord_px.clone(),
4677 tp_trigger_px_type: order.tp_trigger_px_type.unwrap_or(OKXTriggerType::None),
4678 ord_px,
4679 td_mode: order.td_mode,
4680 lever: order.lever.clone(),
4681 reduce_only,
4682 close_fraction: order.close_fraction.clone(),
4683 actual_px: order.actual_px.clone(),
4684 actual_sz: order.actual_sz.clone(),
4685 notional_usd: order.notional_usd.clone(),
4686 c_time: order.c_time,
4687 u_time: order.u_time,
4688 trigger_time: order.trigger_time.clone(),
4689 tag: order.tag.clone(),
4690 callback_ratio: order.callback_ratio.clone(),
4691 callback_spread: order.callback_spread.clone(),
4692 active_px: order.active_px.clone(),
4693 ccy: None,
4694 tgt_ccy: None,
4695 fee: None,
4696 fee_ccy: None,
4697 advance_ord_type: None,
4698 };
4699
4700 parse_algo_order_status_report(&msg, instrument, account_id, ts_init)
4701}