Skip to main content

nautilus_architect_ax/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the Ax REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc, LazyLock, RwLock,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use chrono::{DateTime, Utc};
29use nautilus_core::{
30    AtomicMap, AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
31    time::get_atomic_clock_realtime,
32};
33use nautilus_model::{
34    data::{Bar, BookOrder, FundingRateUpdate, TradeTick},
35    enums::{BookType, OrderSide, OrderType, TimeInForce},
36    events::AccountState,
37    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
38    instruments::{Instrument, any::InstrumentAny},
39    orderbook::OrderBook,
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{Price, Quantity},
42};
43use nautilus_network::{
44    http::HttpClient,
45    ratelimiter::quota::Quota,
46    retry::{RetryConfig, RetryManager},
47};
48use reqwest::{Method, header::USER_AGENT};
49use rust_decimal::Decimal;
50use serde::{Serialize, de::DeserializeOwned};
51use tokio_util::sync::CancellationToken;
52use ustr::Ustr;
53
54use super::{
55    error::AxHttpError,
56    models::{
57        AuthenticateApiKeyRequest, AxAuthenticateResponse, AxBalancesResponse, AxBookResponse,
58        AxCancelAllOrdersResponse, AxCancelOrderResponse, AxCandle, AxCandleResponse,
59        AxCandlesResponse, AxFillsResponse, AxFundingRatesResponse,
60        AxInitialMarginRequirementResponse, AxInstrument, AxInstrumentsResponse,
61        AxOpenOrdersResponse, AxOrderStatusQueryResponse, AxOrdersResponse, AxPlaceOrderResponse,
62        AxPositionsResponse, AxPreviewAggressiveLimitOrderResponse, AxReplaceOrderResponse,
63        AxRiskSnapshotResponse, AxTicker, AxTickersResponse, AxTradesResponse,
64        AxTransactionsResponse, AxWhoAmI, CancelAllOrdersRequest, CancelOrderRequest,
65        PlaceOrderRequest, PreviewAggressiveLimitOrderRequest, ReplaceOrderRequest,
66    },
67    parse::{
68        parse_account_state, parse_bar, parse_fill_report, parse_funding_rate,
69        parse_order_status_report, parse_perp_instrument, parse_position_status_report,
70        parse_trade_tick,
71    },
72    query::{
73        GetBookParams, GetCandleParams, GetCandlesParams, GetFundingRatesParams,
74        GetInstrumentParams, GetOrderStatusParams, GetOrdersParams, GetTickerParams,
75        GetTradesParams, GetTransactionsParams,
76    },
77};
78use crate::common::{
79    consts::{AX_HTTP_URL, AX_ORDERS_URL},
80    credential::Credential,
81    enums::{AxCandleWidth, AxInstrumentState},
82    parse::{cid_to_client_order_id, client_order_id_to_cid},
83};
84
85/// Default Ax REST API rate limit.
86///
87/// Conservative default of 10 requests per second.
88pub static AX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
89    Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
90});
91
92const AX_GLOBAL_RATE_KEY: &str = "architect:global";
93
94/// Raw HTTP client for low-level AX Exchange API operations.
95///
96/// This client handles request/response operations with the AX Exchange API,
97/// returning venue-specific response types. It does not parse to Nautilus domain types.
98pub struct AxRawHttpClient {
99    base_url: String,
100    orders_base_url: String,
101    client: HttpClient,
102    credential: Option<Credential>,
103    session_token: RwLock<Option<String>>,
104    retry_manager: RetryManager<AxHttpError>,
105    cancellation_token: RwLock<CancellationToken>,
106}
107
108impl Default for AxRawHttpClient {
109    fn default() -> Self {
110        Self::new(None, None, 60, 3, 1000, 10_000, None)
111            .expect("Failed to create default AxRawHttpClient")
112    }
113}
114
115impl Debug for AxRawHttpClient {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        let has_session_token = self.session_token.read().is_ok_and(|guard| guard.is_some());
118        f.debug_struct(stringify!(AxRawHttpClient))
119            .field("base_url", &self.base_url)
120            .field("orders_base_url", &self.orders_base_url)
121            .field("has_credentials", &self.credential.is_some())
122            .field("has_session_token", &has_session_token)
123            .finish()
124    }
125}
126
127impl AxRawHttpClient {
128    /// Returns the base URL for this client.
129    #[must_use]
130    pub fn base_url(&self) -> &str {
131        &self.base_url
132    }
133
134    /// Returns a masked version of the API key for logging purposes.
135    #[must_use]
136    pub fn api_key_masked(&self) -> String {
137        self.credential
138            .as_ref()
139            .map_or_else(|| "None".to_string(), |c| c.masked_api_key())
140    }
141
142    /// Cancel all pending HTTP requests.
143    ///
144    /// # Panics
145    ///
146    /// Panics if the cancellation token lock is poisoned.
147    pub fn cancel_all_requests(&self) {
148        self.cancellation_token
149            .read()
150            .expect("Lock poisoned")
151            .cancel();
152    }
153
154    /// Replaces the cancelled token so new requests can proceed after reconnect.
155    ///
156    /// # Panics
157    ///
158    /// Panics if the cancellation token lock is poisoned.
159    pub fn reset_cancellation_token(&self) {
160        *self.cancellation_token.write().expect("Lock poisoned") = CancellationToken::new();
161    }
162
163    /// Get a clone of the current cancellation token.
164    ///
165    /// # Panics
166    ///
167    /// Panics if the cancellation token lock is poisoned.
168    pub fn cancellation_token(&self) -> CancellationToken {
169        self.cancellation_token
170            .read()
171            .expect("Lock poisoned")
172            .clone()
173    }
174
175    /// Creates a new [`AxRawHttpClient`] using the default Ax HTTP URL.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the retry manager cannot be created.
180    pub fn new(
181        base_url: Option<String>,
182        orders_base_url: Option<String>,
183        timeout_secs: u64,
184        max_retries: u32,
185        retry_delay_ms: u64,
186        retry_delay_max_ms: u64,
187        proxy_url: Option<String>,
188    ) -> Result<Self, AxHttpError> {
189        let retry_config = RetryConfig {
190            max_retries,
191            initial_delay_ms: retry_delay_ms,
192            max_delay_ms: retry_delay_max_ms,
193            backoff_factor: 2.0,
194            jitter_ms: 1000,
195            operation_timeout_ms: Some(60_000),
196            immediate_first: false,
197            max_elapsed_ms: Some(180_000),
198        };
199
200        let retry_manager = RetryManager::new(retry_config);
201
202        Ok(Self {
203            base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
204            orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
205            client: HttpClient::new(
206                Self::default_headers(),
207                vec![],
208                Self::rate_limiter_quotas(),
209                Some(*AX_REST_QUOTA),
210                Some(timeout_secs),
211                proxy_url,
212            )
213            .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
214            credential: None,
215            session_token: RwLock::new(None),
216            retry_manager,
217            cancellation_token: RwLock::new(CancellationToken::new()),
218        })
219    }
220
221    /// Creates a new [`AxRawHttpClient`] configured with credentials.
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the HTTP client cannot be created.
226    #[expect(clippy::too_many_arguments)]
227    pub fn with_credentials(
228        api_key: String,
229        api_secret: String,
230        base_url: Option<String>,
231        orders_base_url: Option<String>,
232        timeout_secs: u64,
233        max_retries: u32,
234        retry_delay_ms: u64,
235        retry_delay_max_ms: u64,
236        proxy_url: Option<String>,
237    ) -> Result<Self, AxHttpError> {
238        let retry_config = RetryConfig {
239            max_retries,
240            initial_delay_ms: retry_delay_ms,
241            max_delay_ms: retry_delay_max_ms,
242            backoff_factor: 2.0,
243            jitter_ms: 1000,
244            operation_timeout_ms: Some(60_000),
245            immediate_first: false,
246            max_elapsed_ms: Some(180_000),
247        };
248
249        let retry_manager = RetryManager::new(retry_config);
250
251        Ok(Self {
252            base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
253            orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
254            client: HttpClient::new(
255                Self::default_headers(),
256                vec![],
257                Self::rate_limiter_quotas(),
258                Some(*AX_REST_QUOTA),
259                Some(timeout_secs),
260                proxy_url,
261            )
262            .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
263            credential: Some(Credential::new(api_key, api_secret)),
264            session_token: RwLock::new(None),
265            retry_manager,
266            cancellation_token: RwLock::new(CancellationToken::new()),
267        })
268    }
269
270    /// Sets the session token for authenticated requests.
271    ///
272    /// The session token is obtained through the login flow and used for bearer token authentication.
273    ///
274    /// # Panics
275    ///
276    /// Panics if the internal lock is poisoned (indicates a panic in another thread).
277    pub fn set_session_token(&self, token: String) {
278        // Lock poisoning indicates a panic in another thread, which is fatal
279        *self.session_token.write().expect("Lock poisoned") = Some(token);
280    }
281
282    fn default_headers() -> HashMap<String, String> {
283        HashMap::from([
284            (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
285            ("Accept".to_string(), "application/json".to_string()),
286        ])
287    }
288
289    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
290        vec![(AX_GLOBAL_RATE_KEY.to_string(), *AX_REST_QUOTA)]
291    }
292
293    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
294        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
295        let route = format!("architect:{normalized}");
296
297        vec![AX_GLOBAL_RATE_KEY.to_string(), route]
298    }
299
300    fn auth_headers(&self) -> Result<HashMap<String, String>, AxHttpError> {
301        // Lock poisoning indicates a panic in another thread, which is fatal
302        let guard = self.session_token.read().expect("Lock poisoned");
303        let session_token = guard.as_ref().ok_or(AxHttpError::MissingSessionToken)?;
304
305        let mut headers = HashMap::new();
306        headers.insert(
307            "Authorization".to_string(),
308            format!("Bearer {session_token}"),
309        );
310
311        Ok(headers)
312    }
313
314    async fn send_request<T: DeserializeOwned, P: Serialize>(
315        &self,
316        method: Method,
317        endpoint: &str,
318        params: Option<&P>,
319        body: Option<Vec<u8>>,
320        authenticate: bool,
321    ) -> Result<T, AxHttpError> {
322        self.send_request_to_url(&self.base_url, method, endpoint, params, body, authenticate)
323            .await
324    }
325
326    async fn send_request_to_url<T: DeserializeOwned, P: Serialize>(
327        &self,
328        base_url: &str,
329        method: Method,
330        endpoint: &str,
331        params: Option<&P>,
332        body: Option<Vec<u8>>,
333        authenticate: bool,
334    ) -> Result<T, AxHttpError> {
335        let endpoint = endpoint.to_string();
336        let url = format!("{base_url}{endpoint}");
337
338        let params_str = if method == Method::GET || method == Method::DELETE {
339            params
340                .map(serde_urlencoded::to_string)
341                .transpose()
342                .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize params: {e}")))?
343        } else {
344            None
345        };
346
347        let operation = || {
348            let url = url.clone();
349            let method = method.clone();
350            let endpoint = endpoint.clone();
351            let params_str = params_str.clone();
352            let body = body.clone();
353
354            async move {
355                let mut headers = Self::default_headers();
356
357                if authenticate {
358                    let auth_headers = self.auth_headers()?;
359                    headers.extend(auth_headers);
360                }
361
362                if body.is_some() {
363                    headers.insert("Content-Type".to_string(), "application/json".to_string());
364                }
365
366                let full_url = if let Some(ref query) = params_str {
367                    if query.is_empty() {
368                        url
369                    } else {
370                        format!("{url}?{query}")
371                    }
372                } else {
373                    url
374                };
375
376                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
377
378                let response = self
379                    .client
380                    .request(
381                        method,
382                        full_url,
383                        None,
384                        Some(headers),
385                        body,
386                        None,
387                        Some(rate_limit_keys),
388                    )
389                    .await?;
390
391                let status = response.status;
392                let response_body = String::from_utf8_lossy(&response.body).to_string();
393
394                if !status.is_success() {
395                    return Err(AxHttpError::UnexpectedStatus {
396                        status: status.as_u16(),
397                        body: response_body,
398                    });
399                }
400
401                serde_json::from_str(&response_body).map_err(|e| {
402                    AxHttpError::JsonError(format!(
403                        "Failed to deserialize response: {e}\nBody: {response_body}"
404                    ))
405                })
406            }
407        };
408
409        // Only retry idempotent methods to avoid duplicate orders/cancels
410        let is_idempotent = matches!(method, Method::GET | Method::HEAD | Method::OPTIONS);
411        let should_retry = |error: &AxHttpError| -> bool { is_idempotent && error.is_retryable() };
412
413        let create_error = |msg: String| -> AxHttpError {
414            if msg == "canceled" {
415                AxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
416            } else {
417                AxHttpError::NetworkError(msg)
418            }
419        };
420
421        let cancel_token = self
422            .cancellation_token
423            .read()
424            .expect("Lock poisoned")
425            .clone();
426
427        self.retry_manager
428            .execute_with_retry_with_cancel(
429                endpoint.as_str(),
430                operation,
431                should_retry,
432                create_error,
433                &cancel_token,
434            )
435            .await
436    }
437
438    /// Fetches the current authenticated user information.
439    ///
440    /// # Endpoint
441    /// `GET /whoami`
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if the request fails or the response cannot be parsed.
446    pub async fn get_whoami(&self) -> Result<AxWhoAmI, AxHttpError> {
447        self.send_request::<AxWhoAmI, ()>(Method::GET, "/whoami", None, None, true)
448            .await
449    }
450
451    /// Fetches all available instruments.
452    ///
453    /// # Endpoint
454    /// `GET /instruments`
455    ///
456    /// # Errors
457    ///
458    /// Returns an error if the request fails or the response cannot be parsed.
459    pub async fn get_instruments(&self) -> Result<AxInstrumentsResponse, AxHttpError> {
460        self.send_request::<AxInstrumentsResponse, ()>(
461            Method::GET,
462            "/instruments",
463            None,
464            None,
465            false,
466        )
467        .await
468    }
469
470    /// Fetches all account balances for the authenticated user.
471    ///
472    /// # Endpoint
473    /// `GET /balances`
474    ///
475    /// # Errors
476    ///
477    /// Returns an error if the request fails or the response cannot be parsed.
478    pub async fn get_balances(&self) -> Result<AxBalancesResponse, AxHttpError> {
479        self.send_request::<AxBalancesResponse, ()>(Method::GET, "/balances", None, None, true)
480            .await
481    }
482
483    /// Fetches all open positions for the authenticated user.
484    ///
485    /// # Endpoint
486    /// `GET /positions`
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the request fails or the response cannot be parsed.
491    pub async fn get_positions(&self) -> Result<AxPositionsResponse, AxHttpError> {
492        self.send_request::<AxPositionsResponse, ()>(Method::GET, "/positions", None, None, true)
493            .await
494    }
495
496    /// Fetches all tickers.
497    ///
498    /// # Endpoint
499    /// `GET /tickers`
500    ///
501    /// # Errors
502    ///
503    /// Returns an error if the request fails or the response cannot be parsed.
504    pub async fn get_tickers(&self) -> Result<AxTickersResponse, AxHttpError> {
505        self.send_request::<AxTickersResponse, ()>(Method::GET, "/tickers", None, None, true)
506            .await
507    }
508
509    /// Fetches a single ticker by symbol.
510    ///
511    /// # Endpoint
512    /// `GET /ticker?symbol=<symbol>`
513    ///
514    /// # Errors
515    ///
516    /// Returns an error if the request fails or the response cannot be parsed.
517    pub async fn get_ticker(&self, symbol: Ustr) -> Result<AxTicker, AxHttpError> {
518        let params = GetTickerParams::new(symbol);
519        self.send_request::<AxTicker, _>(Method::GET, "/ticker", Some(&params), None, true)
520            .await
521    }
522
523    /// Fetches a single instrument by symbol.
524    ///
525    /// # Endpoint
526    /// `GET /instrument?symbol=<symbol>`
527    ///
528    /// # Errors
529    ///
530    /// Returns an error if the request fails or the response cannot be parsed.
531    pub async fn get_instrument(&self, symbol: Ustr) -> Result<AxInstrument, AxHttpError> {
532        let params = GetInstrumentParams::new(symbol);
533        self.send_request::<AxInstrument, _>(Method::GET, "/instrument", Some(&params), None, false)
534            .await
535    }
536
537    /// Authenticates using API key and secret to obtain a session token.
538    ///
539    /// # Endpoint
540    /// `POST /authenticate`
541    ///
542    /// # Errors
543    ///
544    /// Returns an error if the request fails or the response cannot be parsed.
545    pub async fn authenticate(
546        &self,
547        api_key: &str,
548        api_secret: &str,
549        expiration_seconds: i32,
550    ) -> Result<AxAuthenticateResponse, AxHttpError> {
551        let request = AuthenticateApiKeyRequest::new(api_key, api_secret, expiration_seconds);
552
553        let body = serde_json::to_vec(&request)
554            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
555
556        self.send_request::<AxAuthenticateResponse, ()>(
557            Method::POST,
558            "/authenticate",
559            None,
560            Some(body),
561            false,
562        )
563        .await
564    }
565
566    /// Authenticates using stored credentials or environment variables.
567    ///
568    /// # Credential Resolution
569    ///
570    /// Credentials are resolved in the following order:
571    /// 1. Stored credentials (from `with_credentials` constructor)
572    /// 2. Environment variables (`AX_API_KEY` and `AX_API_SECRET`)
573    ///
574    /// # Errors
575    ///
576    /// Returns an error if:
577    /// - No credentials are available from either source
578    /// - The HTTP request fails
579    /// - The credentials are invalid
580    pub async fn authenticate_auto(
581        &self,
582        expiration_seconds: i32,
583    ) -> Result<AxAuthenticateResponse, AxHttpError> {
584        let (api_key, api_secret) = self
585            .resolve_credentials()
586            .ok_or(AxHttpError::MissingCredentials)?;
587
588        self.authenticate(&api_key, &api_secret, expiration_seconds)
589            .await
590    }
591
592    fn resolve_credentials(&self) -> Option<(String, String)> {
593        if let Some(cred) = &self.credential {
594            return Some((cred.api_key().to_string(), cred.api_secret().to_string()));
595        }
596
597        let cred = Credential::resolve(None, None)?;
598        Some((cred.api_key().to_string(), cred.api_secret().to_string()))
599    }
600
601    /// Places a new order.
602    ///
603    /// # Endpoint
604    /// `POST /place_order` (orders base URL)
605    ///
606    /// # Errors
607    ///
608    /// Returns an error if the request fails or the response cannot be parsed.
609    pub async fn place_order(
610        &self,
611        request: &PlaceOrderRequest,
612    ) -> Result<AxPlaceOrderResponse, AxHttpError> {
613        let body = serde_json::to_vec(request)
614            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
615        self.send_request_to_url::<AxPlaceOrderResponse, ()>(
616            &self.orders_base_url,
617            Method::POST,
618            "/place_order",
619            None,
620            Some(body),
621            true,
622        )
623        .await
624    }
625
626    /// Cancels an existing order.
627    ///
628    /// # Endpoint
629    /// `POST /cancel_order` (orders base URL)
630    ///
631    /// # Errors
632    ///
633    /// Returns an error if the request fails or the response cannot be parsed.
634    pub async fn cancel_order(&self, order_id: &str) -> Result<AxCancelOrderResponse, AxHttpError> {
635        let request = CancelOrderRequest::new(order_id);
636        let body = serde_json::to_vec(&request)
637            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
638        self.send_request_to_url::<AxCancelOrderResponse, ()>(
639            &self.orders_base_url,
640            Method::POST,
641            "/cancel_order",
642            None,
643            Some(body),
644            true,
645        )
646        .await
647    }
648
649    /// Replaces (amends) an existing order.
650    ///
651    /// The exchange cancels the original order and creates a new one with the
652    /// updated fields. Unspecified optional fields inherit from the original.
653    ///
654    /// # Endpoint
655    /// `POST /replace_order` (orders base URL)
656    ///
657    /// # Errors
658    ///
659    /// Returns an error if the request fails or the response cannot be parsed.
660    pub async fn replace_order(
661        &self,
662        request: &ReplaceOrderRequest,
663    ) -> Result<AxReplaceOrderResponse, AxHttpError> {
664        let body = serde_json::to_vec(request)
665            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
666        self.send_request_to_url::<AxReplaceOrderResponse, ()>(
667            &self.orders_base_url,
668            Method::POST,
669            "/replace_order",
670            None,
671            Some(body),
672            true,
673        )
674        .await
675    }
676
677    /// Cancels all open orders, optionally filtered by symbol or venue.
678    ///
679    /// # Endpoint
680    /// `POST /cancel_all_orders` (orders base URL)
681    ///
682    /// # Errors
683    ///
684    /// Returns an error if the request fails or the response cannot be parsed.
685    pub async fn cancel_all_orders(
686        &self,
687        request: &CancelAllOrdersRequest,
688    ) -> Result<AxCancelAllOrdersResponse, AxHttpError> {
689        let body = serde_json::to_vec(request)
690            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
691        self.send_request_to_url::<AxCancelAllOrdersResponse, ()>(
692            &self.orders_base_url,
693            Method::POST,
694            "/cancel_all_orders",
695            None,
696            Some(body),
697            true,
698        )
699        .await
700    }
701
702    /// Fetches all open orders.
703    ///
704    /// # Endpoint
705    /// `GET /open_orders` (orders base URL)
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if the request fails or the response cannot be parsed.
710    pub async fn get_open_orders(&self) -> Result<AxOpenOrdersResponse, AxHttpError> {
711        self.send_request_to_url::<AxOpenOrdersResponse, ()>(
712            &self.orders_base_url,
713            Method::GET,
714            "/open_orders",
715            None,
716            None,
717            true,
718        )
719        .await
720    }
721
722    /// Fetches all fills/trades.
723    ///
724    /// # Endpoint
725    /// `GET /fills`
726    ///
727    /// # Errors
728    ///
729    /// Returns an error if the request fails or the response cannot be parsed.
730    pub async fn get_fills(&self) -> Result<AxFillsResponse, AxHttpError> {
731        self.send_request::<AxFillsResponse, ()>(Method::GET, "/fills", None, None, true)
732            .await
733    }
734
735    /// Fetches historical candles.
736    ///
737    /// # Endpoint
738    /// `GET /candles`
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the request fails or the response cannot be parsed.
743    pub async fn get_candles(
744        &self,
745        symbol: Ustr,
746        start_timestamp_ns: i64,
747        end_timestamp_ns: i64,
748        candle_width: AxCandleWidth,
749    ) -> Result<AxCandlesResponse, AxHttpError> {
750        let params =
751            GetCandlesParams::new(symbol, start_timestamp_ns, end_timestamp_ns, candle_width);
752        self.send_request::<AxCandlesResponse, _>(
753            Method::GET,
754            "/candles",
755            Some(&params),
756            None,
757            true,
758        )
759        .await
760    }
761
762    /// Fetches the current (incomplete) candle.
763    ///
764    /// # Endpoint
765    /// `GET /candles/current`
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the request fails or the response cannot be parsed.
770    pub async fn get_current_candle(
771        &self,
772        symbol: Ustr,
773        candle_width: AxCandleWidth,
774    ) -> Result<AxCandle, AxHttpError> {
775        let params = GetCandleParams::new(symbol, candle_width);
776        let response = self
777            .send_request::<AxCandleResponse, _>(
778                Method::GET,
779                "/candles/current",
780                Some(&params),
781                None,
782                true,
783            )
784            .await?;
785        Ok(response.candle)
786    }
787
788    /// Fetches the last completed candle.
789    ///
790    /// # Endpoint
791    /// `GET /candles/last`
792    ///
793    /// # Errors
794    ///
795    /// Returns an error if the request fails or the response cannot be parsed.
796    pub async fn get_last_candle(
797        &self,
798        symbol: Ustr,
799        candle_width: AxCandleWidth,
800    ) -> Result<AxCandle, AxHttpError> {
801        let params = GetCandleParams::new(symbol, candle_width);
802        let response = self
803            .send_request::<AxCandleResponse, _>(
804                Method::GET,
805                "/candles/last",
806                Some(&params),
807                None,
808                true,
809            )
810            .await?;
811        Ok(response.candle)
812    }
813
814    /// Fetches funding rates for a symbol.
815    ///
816    /// # Endpoint
817    /// `GET /funding-rates`
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the request fails or the response cannot be parsed.
822    pub async fn get_funding_rates(
823        &self,
824        symbol: Ustr,
825        start_timestamp_ns: i64,
826        end_timestamp_ns: i64,
827    ) -> Result<AxFundingRatesResponse, AxHttpError> {
828        let params = GetFundingRatesParams::new(symbol, start_timestamp_ns, end_timestamp_ns);
829        self.send_request::<AxFundingRatesResponse, _>(
830            Method::GET,
831            "/funding-rates",
832            Some(&params),
833            None,
834            true,
835        )
836        .await
837    }
838
839    /// Fetches the current risk snapshot.
840    ///
841    /// # Endpoint
842    /// `GET /risk-snapshot`
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if the request fails or the response cannot be parsed.
847    pub async fn get_risk_snapshot(&self) -> Result<AxRiskSnapshotResponse, AxHttpError> {
848        self.send_request::<AxRiskSnapshotResponse, ()>(
849            Method::GET,
850            "/risk-snapshot",
851            None,
852            None,
853            true,
854        )
855        .await
856    }
857
858    /// Previews an aggressive limit order to get the "take through" price.
859    ///
860    /// This endpoint calculates the price needed to sweep the order book for a given
861    /// quantity, which is used to simulate market orders on AX (which only supports
862    /// limit orders natively).
863    ///
864    /// # Endpoint
865    /// `POST /preview-aggressive-limit-order`
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the request fails or the response cannot be parsed.
870    pub async fn preview_aggressive_limit_order(
871        &self,
872        request: &PreviewAggressiveLimitOrderRequest,
873    ) -> Result<AxPreviewAggressiveLimitOrderResponse, AxHttpError> {
874        let body = serde_json::to_vec(request)
875            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
876        self.send_request::<AxPreviewAggressiveLimitOrderResponse, ()>(
877            Method::POST,
878            "/preview-aggressive-limit-order",
879            None,
880            Some(body),
881            true,
882        )
883        .await
884    }
885
886    /// Fetches transactions filtered by type.
887    ///
888    /// # Endpoint
889    /// `GET /transactions`
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if the request fails or the response cannot be parsed.
894    pub async fn get_transactions(
895        &self,
896        transaction_types: Vec<String>,
897    ) -> Result<AxTransactionsResponse, AxHttpError> {
898        let params = GetTransactionsParams::new(transaction_types);
899        self.send_request::<AxTransactionsResponse, _>(
900            Method::GET,
901            "/transactions",
902            Some(&params),
903            None,
904            true,
905        )
906        .await
907    }
908
909    /// Fetches recent trades for a symbol.
910    ///
911    /// # Endpoint
912    /// `GET /trades`
913    ///
914    /// # Errors
915    ///
916    /// Returns an error if the request fails or the response cannot be parsed.
917    pub async fn get_trades(
918        &self,
919        symbol: Ustr,
920        limit: Option<i32>,
921    ) -> Result<AxTradesResponse, AxHttpError> {
922        let params = GetTradesParams::new(symbol, limit);
923        self.send_request::<AxTradesResponse, _>(Method::GET, "/trades", Some(&params), None, true)
924            .await
925    }
926
927    /// Fetches an order book snapshot for a symbol.
928    ///
929    /// # Endpoint
930    /// `GET /book`
931    ///
932    /// # Errors
933    ///
934    /// Returns an error if the request fails or the response cannot be parsed.
935    pub async fn get_book(
936        &self,
937        symbol: Ustr,
938        level: Option<i32>,
939    ) -> Result<AxBookResponse, AxHttpError> {
940        let params = GetBookParams::new(symbol, level);
941        self.send_request::<AxBookResponse, _>(Method::GET, "/book", Some(&params), None, false)
942            .await
943    }
944
945    /// Fetches the status of a single order by order ID.
946    ///
947    /// # Endpoint
948    /// `GET /order-status` (orders base URL)
949    ///
950    /// # Errors
951    ///
952    /// Returns an error if the request fails or the response cannot be parsed.
953    pub async fn get_order_status_by_id(
954        &self,
955        order_id: &str,
956    ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
957        let params = GetOrderStatusParams::by_order_id(order_id);
958        self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
959            &self.orders_base_url,
960            Method::GET,
961            "/order-status",
962            Some(&params),
963            None,
964            true,
965        )
966        .await
967    }
968
969    /// Fetches the status of a single order by client order ID.
970    ///
971    /// # Endpoint
972    /// `GET /order-status` (orders base URL)
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the request fails or the response cannot be parsed.
977    pub async fn get_order_status_by_cid(
978        &self,
979        client_order_id: u64,
980    ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
981        let params = GetOrderStatusParams::by_client_order_id(client_order_id);
982        self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
983            &self.orders_base_url,
984            Method::GET,
985            "/order-status",
986            Some(&params),
987            None,
988            true,
989        )
990        .await
991    }
992
993    /// Fetches historical orders with optional filters.
994    ///
995    /// # Endpoint
996    /// `GET /orders` (orders base URL)
997    ///
998    /// # Errors
999    ///
1000    /// Returns an error if the request fails or the response cannot be parsed.
1001    pub async fn get_orders(
1002        &self,
1003        params: &GetOrdersParams,
1004    ) -> Result<AxOrdersResponse, AxHttpError> {
1005        self.send_request_to_url::<AxOrdersResponse, _>(
1006            &self.orders_base_url,
1007            Method::GET,
1008            "/orders",
1009            Some(params),
1010            None,
1011            true,
1012        )
1013        .await
1014    }
1015
1016    /// Checks the initial margin requirement for a proposed order.
1017    ///
1018    /// # Endpoint
1019    /// `POST /initial-margin-requirement` (orders base URL)
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns an error if the request fails or the response cannot be parsed.
1024    pub async fn check_initial_margin(
1025        &self,
1026        request: &PlaceOrderRequest,
1027    ) -> Result<AxInitialMarginRequirementResponse, AxHttpError> {
1028        let body = serde_json::to_vec(request)
1029            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
1030        self.send_request_to_url::<AxInitialMarginRequirementResponse, ()>(
1031            &self.orders_base_url,
1032            Method::POST,
1033            "/initial-margin-requirement",
1034            None,
1035            Some(body),
1036            true,
1037        )
1038        .await
1039    }
1040}
1041
1042/// High-level HTTP client for the Ax REST API.
1043///
1044/// This client wraps the underlying [`AxRawHttpClient`] to provide a convenient
1045/// interface for Python bindings and instrument caching.
1046#[derive(Debug)]
1047#[cfg_attr(
1048    feature = "python",
1049    pyo3::pyclass(
1050        module = "nautilus_trader.core.nautilus_pyo3.architect",
1051        from_py_object
1052    )
1053)]
1054#[cfg_attr(
1055    feature = "python",
1056    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.architect_ax")
1057)]
1058pub struct AxHttpClient {
1059    pub(crate) inner: Arc<AxRawHttpClient>,
1060    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
1061    clock: &'static AtomicTime,
1062    cache_initialized: Arc<AtomicBool>,
1063}
1064
1065impl Clone for AxHttpClient {
1066    fn clone(&self) -> Self {
1067        Self {
1068            inner: self.inner.clone(),
1069            instruments_cache: self.instruments_cache.clone(),
1070            cache_initialized: self.cache_initialized.clone(),
1071            clock: self.clock,
1072        }
1073    }
1074}
1075
1076impl Default for AxHttpClient {
1077    fn default() -> Self {
1078        Self::new(None, None, 60, 3, 1000, 10_000, None)
1079            .expect("Failed to create default AxHttpClient")
1080    }
1081}
1082
1083impl AxHttpClient {
1084    /// Creates a new [`AxHttpClient`] using the default Ax HTTP URL.
1085    ///
1086    /// # Errors
1087    ///
1088    /// Returns an error if the retry manager cannot be created.
1089    pub fn new(
1090        base_url: Option<String>,
1091        orders_base_url: Option<String>,
1092        timeout_secs: u64,
1093        max_retries: u32,
1094        retry_delay_ms: u64,
1095        retry_delay_max_ms: u64,
1096        proxy_url: Option<String>,
1097    ) -> Result<Self, AxHttpError> {
1098        Ok(Self {
1099            inner: Arc::new(AxRawHttpClient::new(
1100                base_url,
1101                orders_base_url,
1102                timeout_secs,
1103                max_retries,
1104                retry_delay_ms,
1105                retry_delay_max_ms,
1106                proxy_url,
1107            )?),
1108            instruments_cache: Arc::new(AtomicMap::new()),
1109            cache_initialized: Arc::new(AtomicBool::new(false)),
1110            clock: get_atomic_clock_realtime(),
1111        })
1112    }
1113
1114    /// Creates a new [`AxHttpClient`] configured with credentials.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns an error if the HTTP client cannot be created.
1119    #[expect(clippy::too_many_arguments)]
1120    pub fn with_credentials(
1121        api_key: String,
1122        api_secret: String,
1123        base_url: Option<String>,
1124        orders_base_url: Option<String>,
1125        timeout_secs: u64,
1126        max_retries: u32,
1127        retry_delay_ms: u64,
1128        retry_delay_max_ms: u64,
1129        proxy_url: Option<String>,
1130    ) -> Result<Self, AxHttpError> {
1131        Ok(Self {
1132            inner: Arc::new(AxRawHttpClient::with_credentials(
1133                api_key,
1134                api_secret,
1135                base_url,
1136                orders_base_url,
1137                timeout_secs,
1138                max_retries,
1139                retry_delay_ms,
1140                retry_delay_max_ms,
1141                proxy_url,
1142            )?),
1143            instruments_cache: Arc::new(AtomicMap::new()),
1144            cache_initialized: Arc::new(AtomicBool::new(false)),
1145            clock: get_atomic_clock_realtime(),
1146        })
1147    }
1148
1149    /// Returns the base URL for this client.
1150    #[must_use]
1151    pub fn base_url(&self) -> &str {
1152        self.inner.base_url()
1153    }
1154
1155    /// Returns a masked version of the API key for logging purposes.
1156    #[must_use]
1157    pub fn api_key_masked(&self) -> String {
1158        self.inner.api_key_masked()
1159    }
1160
1161    /// Cancel all pending HTTP requests.
1162    pub fn cancel_all_requests(&self) {
1163        self.inner.cancel_all_requests();
1164    }
1165
1166    /// Replaces the cancelled token so new requests can proceed after reconnect.
1167    pub fn reset_cancellation_token(&self) {
1168        self.inner.reset_cancellation_token();
1169    }
1170
1171    /// Sets the session token for authenticated requests.
1172    ///
1173    /// The session token is obtained through the login flow and used for bearer token authentication.
1174    pub fn set_session_token(&self, token: String) {
1175        self.inner.set_session_token(token);
1176    }
1177
1178    /// Generates a timestamp for initialization.
1179    fn generate_ts_init(&self) -> UnixNanos {
1180        self.clock.get_time_ns()
1181    }
1182
1183    /// Checks if the client is initialized.
1184    ///
1185    /// The client is considered initialized if any instruments have been cached from the venue.
1186    #[must_use]
1187    pub fn is_initialized(&self) -> bool {
1188        self.cache_initialized.load(Ordering::Acquire)
1189    }
1190
1191    /// Returns a snapshot of all instrument symbols currently held in the internal cache.
1192    #[must_use]
1193    pub fn get_cached_symbols(&self) -> Vec<String> {
1194        self.instruments_cache
1195            .load()
1196            .keys()
1197            .map(|k| k.to_string())
1198            .collect()
1199    }
1200
1201    /// Caches multiple instruments.
1202    ///
1203    /// Any existing instruments with the same symbols will be replaced.
1204    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1205        self.instruments_cache.rcu(|m| {
1206            for inst in instruments {
1207                m.insert(inst.raw_symbol().inner(), inst.clone());
1208            }
1209        });
1210        self.cache_initialized.store(true, Ordering::Release);
1211    }
1212
1213    /// Caches a single instrument.
1214    ///
1215    /// Any existing instrument with the same symbol will be replaced.
1216    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1217        self.instruments_cache
1218            .insert(instrument.raw_symbol().inner(), instrument);
1219        self.cache_initialized.store(true, Ordering::Release);
1220    }
1221
1222    /// Authenticates with Ax using API credentials.
1223    ///
1224    /// On success, the session token is automatically stored for subsequent authenticated requests.
1225    ///
1226    /// # Errors
1227    ///
1228    /// Returns an error if the HTTP request fails or credentials are invalid.
1229    pub async fn authenticate(
1230        &self,
1231        api_key: &str,
1232        api_secret: &str,
1233        expiration_seconds: i32,
1234    ) -> Result<String, AxHttpError> {
1235        let resp = self
1236            .inner
1237            .authenticate(api_key, api_secret, expiration_seconds)
1238            .await?;
1239        self.inner.set_session_token(resp.token.clone());
1240        Ok(resp.token)
1241    }
1242
1243    /// Authenticates using stored credentials or environment variables.
1244    ///
1245    /// # Credential Resolution
1246    ///
1247    /// Credentials are resolved in the following order:
1248    /// 1. Stored credentials (from `with_credentials` constructor)
1249    /// 2. Environment variables (`AX_API_KEY` and `AX_API_SECRET`)
1250    ///
1251    /// On success, the session token is automatically stored for subsequent authenticated requests.
1252    ///
1253    /// # Errors
1254    ///
1255    /// Returns an error if:
1256    /// - No credentials are available from either source
1257    /// - The HTTP request fails
1258    /// - The credentials are invalid
1259    pub async fn authenticate_auto(&self, expiration_seconds: i32) -> Result<String, AxHttpError> {
1260        let resp = self.inner.authenticate_auto(expiration_seconds).await?;
1261        self.inner.set_session_token(resp.token.clone());
1262        Ok(resp.token)
1263    }
1264
1265    /// Gets an instrument from the cache by symbol.
1266    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1267        self.instruments_cache.get_cloned(symbol)
1268    }
1269
1270    /// Requests all instruments from Ax.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the HTTP request fails or instrument parsing fails.
1275    pub async fn request_instruments(
1276        &self,
1277        maker_fee: Option<Decimal>,
1278        taker_fee: Option<Decimal>,
1279    ) -> anyhow::Result<Vec<InstrumentAny>> {
1280        let resp = self
1281            .inner
1282            .get_instruments()
1283            .await
1284            .map_err(|e| anyhow::anyhow!(e))?;
1285
1286        let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1287        let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1288        let ts_init = self.generate_ts_init();
1289
1290        let mut instruments: Vec<InstrumentAny> = Vec::new();
1291        for inst in &resp.instruments {
1292            if inst.state == AxInstrumentState::Delisted {
1293                log::debug!("Skipping delisted instrument: {}", inst.symbol);
1294                continue;
1295            }
1296
1297            // Skip test instruments (not real tradable products)
1298            if inst.symbol.as_str().starts_with("TEST") {
1299                log::debug!("Skipping test instrument: {}", inst.symbol);
1300                continue;
1301            }
1302
1303            match parse_perp_instrument(inst, maker_fee, taker_fee, ts_init, ts_init) {
1304                Ok(instrument) => instruments.push(instrument),
1305                Err(e) => {
1306                    log::warn!("Failed to parse instrument {}: {e}", inst.symbol);
1307                }
1308            }
1309        }
1310
1311        Ok(instruments)
1312    }
1313
1314    /// Requests a single instrument from Ax by symbol.
1315    ///
1316    /// # Errors
1317    ///
1318    /// Returns an error if the HTTP request fails or instrument parsing fails.
1319    pub async fn request_instrument(
1320        &self,
1321        symbol: Ustr,
1322        maker_fee: Option<Decimal>,
1323        taker_fee: Option<Decimal>,
1324    ) -> anyhow::Result<InstrumentAny> {
1325        let resp = self
1326            .inner
1327            .get_instrument(symbol)
1328            .await
1329            .map_err(|e| anyhow::anyhow!(e))?;
1330
1331        let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1332        let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1333        let ts_init = self.generate_ts_init();
1334
1335        parse_perp_instrument(&resp, maker_fee, taker_fee, ts_init, ts_init)
1336    }
1337
1338    /// Requests an order book snapshot from Ax and builds a Nautilus [`OrderBook`].
1339    ///
1340    /// Requires the instrument to be cached.
1341    ///
1342    /// # Errors
1343    ///
1344    /// Returns an error if:
1345    /// - The instrument is not found in the cache.
1346    /// - The HTTP request fails.
1347    pub async fn request_book_snapshot(
1348        &self,
1349        symbol: Ustr,
1350        depth: Option<usize>,
1351    ) -> anyhow::Result<OrderBook> {
1352        let instrument = self
1353            .get_instrument(&symbol)
1354            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1355
1356        let resp = self
1357            .inner
1358            .get_book(symbol, Some(2))
1359            .await
1360            .map_err(|e| anyhow::anyhow!(e))?;
1361
1362        let instrument_id = instrument.id();
1363        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1364
1365        let price_precision = instrument.price_precision();
1366        let size_precision = instrument.size_precision();
1367        let ts_event = UnixNanos::from(resp.book.ts as u64 * 1_000_000_000 + resp.book.tn as u64);
1368
1369        for (i, level) in resp.book.b.iter().enumerate() {
1370            if depth.is_some_and(|d| i >= d) {
1371                break;
1372            }
1373            let price = Price::from_decimal_dp(level.p, price_precision)
1374                .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1375            let size = Quantity::new(level.q as f64, size_precision);
1376            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1377            book.add(order, 0, i as u64, ts_event);
1378        }
1379
1380        let bids_len = resp.book.b.len();
1381        for (i, level) in resp.book.a.iter().enumerate() {
1382            if depth.is_some_and(|d| i >= d) {
1383                break;
1384            }
1385            let price = Price::from_decimal_dp(level.p, price_precision)
1386                .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1387            let size = Quantity::new(level.q as f64, size_precision);
1388            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1389            book.add(order, 0, (bids_len + i) as u64, ts_event);
1390        }
1391
1392        Ok(book)
1393    }
1394
1395    /// Requests recent trades from Ax and parses them to Nautilus [`TradeTick`].
1396    ///
1397    /// The AX trades endpoint does not accept time range parameters, so
1398    /// `start` and `end` are applied as client-side filters after fetching.
1399    ///
1400    /// Requires the instrument to be cached.
1401    ///
1402    /// # Errors
1403    ///
1404    /// Returns an error if:
1405    /// - The instrument is not found in the cache.
1406    /// - The HTTP request fails.
1407    /// - Trade parsing fails.
1408    pub async fn request_trade_ticks(
1409        &self,
1410        symbol: Ustr,
1411        limit: Option<i32>,
1412        start: Option<UnixNanos>,
1413        end: Option<UnixNanos>,
1414    ) -> anyhow::Result<Vec<TradeTick>> {
1415        let instrument = self
1416            .get_instrument(&symbol)
1417            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1418
1419        let resp = self
1420            .inner
1421            .get_trades(symbol, limit)
1422            .await
1423            .map_err(|e| anyhow::anyhow!(e))?;
1424
1425        let ts_init = self.generate_ts_init();
1426        let mut ticks = Vec::with_capacity(resp.trades.len());
1427
1428        for trade in &resp.trades {
1429            match parse_trade_tick(trade, &instrument, ts_init) {
1430                Ok(tick) => {
1431                    if start.is_some_and(|s| tick.ts_event < s) {
1432                        continue;
1433                    }
1434
1435                    if end.is_some_and(|e| tick.ts_event > e) {
1436                        continue;
1437                    }
1438                    ticks.push(tick);
1439                }
1440                Err(e) => {
1441                    log::warn!("Failed to parse trade for {symbol}: {e}");
1442                }
1443            }
1444        }
1445
1446        Ok(ticks)
1447    }
1448
1449    /// Requests historical bars from Ax and parses them to Nautilus Bar types.
1450    ///
1451    /// Requires the instrument to be cached (call `request_instruments` first).
1452    ///
1453    /// # Errors
1454    ///
1455    /// Returns an error if:
1456    /// - The instrument is not found in the cache.
1457    /// - The HTTP request fails.
1458    /// - Bar parsing fails.
1459    pub async fn request_bars(
1460        &self,
1461        symbol: Ustr,
1462        start: Option<DateTime<Utc>>,
1463        end: Option<DateTime<Utc>>,
1464        width: AxCandleWidth,
1465    ) -> anyhow::Result<Vec<Bar>> {
1466        let instrument = self
1467            .get_instrument(&symbol)
1468            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1469
1470        let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1471        let end_ns = end
1472            .and_then(|dt| dt.timestamp_nanos_opt())
1473            .unwrap_or_else(|| self.generate_ts_init().as_i64());
1474        let resp = self
1475            .inner
1476            .get_candles(symbol, start_ns, end_ns, width)
1477            .await
1478            .map_err(|e| anyhow::anyhow!(e))?;
1479
1480        let ts_init = self.generate_ts_init();
1481        let mut bars = Vec::with_capacity(resp.candles.len());
1482
1483        for candle in &resp.candles {
1484            match parse_bar(candle, &instrument, ts_init) {
1485                Ok(bar) => bars.push(bar),
1486                Err(e) => {
1487                    log::warn!("Failed to parse bar for {symbol}: {e}");
1488                }
1489            }
1490        }
1491
1492        Ok(bars)
1493    }
1494
1495    /// Requests funding rates from Ax and parses them to Nautilus types.
1496    ///
1497    /// # Errors
1498    ///
1499    /// Returns an error if the HTTP request fails.
1500    pub async fn request_funding_rates(
1501        &self,
1502        instrument_id: InstrumentId,
1503        start: Option<DateTime<Utc>>,
1504        end: Option<DateTime<Utc>>,
1505    ) -> Result<Vec<FundingRateUpdate>, AxHttpError> {
1506        let symbol = instrument_id.symbol.inner();
1507        let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1508        let end_ns = end
1509            .and_then(|dt| dt.timestamp_nanos_opt())
1510            .unwrap_or_else(|| self.generate_ts_init().as_i64());
1511        let response = self
1512            .inner
1513            .get_funding_rates(symbol, start_ns, end_ns)
1514            .await?;
1515
1516        let ts_init = self.generate_ts_init();
1517        let funding_rates = response
1518            .funding_rates
1519            .iter()
1520            .map(|r| parse_funding_rate(r, instrument_id, ts_init))
1521            .collect::<anyhow::Result<Vec<_>>>()
1522            .map_err(|e| AxHttpError::from(e.to_string()))?;
1523
1524        Ok(funding_rates)
1525    }
1526
1527    /// Requests account state from Ax and parses to a Nautilus [`AccountState`].
1528    ///
1529    /// # Errors
1530    ///
1531    /// Returns an error if the HTTP request fails or parsing fails.
1532    pub async fn request_account_state(
1533        &self,
1534        account_id: AccountId,
1535    ) -> anyhow::Result<AccountState> {
1536        let response = self
1537            .inner
1538            .get_balances()
1539            .await
1540            .map_err(|e| anyhow::anyhow!(e))?;
1541
1542        let ts_init = self.generate_ts_init();
1543        parse_account_state(&response, account_id, ts_init, ts_init)
1544    }
1545
1546    /// Checks the initial margin requirement for a proposed order.
1547    ///
1548    /// # Errors
1549    ///
1550    /// Returns an error if the HTTP request fails.
1551    pub async fn check_initial_margin(
1552        &self,
1553        request: &PlaceOrderRequest,
1554    ) -> anyhow::Result<Decimal> {
1555        let resp = self
1556            .inner
1557            .check_initial_margin(request)
1558            .await
1559            .map_err(|e| anyhow::anyhow!(e))?;
1560        Ok(resp.im)
1561    }
1562
1563    /// Queries a single order by venue order ID or client order ID using the
1564    /// dedicated `/order-status` endpoint, which works for any order state.
1565    ///
1566    /// The caller must supply `order_side`, `order_type`, and `time_in_force`
1567    /// because the endpoint does not return these fields.
1568    ///
1569    /// # Errors
1570    ///
1571    /// Returns an error if:
1572    /// - Neither `venue_order_id` nor `client_order_id` is provided.
1573    /// - The HTTP request fails.
1574    #[expect(clippy::too_many_arguments)]
1575    pub async fn request_order_status(
1576        &self,
1577        account_id: AccountId,
1578        instrument_id: InstrumentId,
1579        client_order_id: Option<ClientOrderId>,
1580        venue_order_id: Option<VenueOrderId>,
1581        order_side: OrderSide,
1582        order_type: OrderType,
1583        time_in_force: TimeInForce,
1584    ) -> anyhow::Result<OrderStatusReport> {
1585        let resp = if let Some(ref voi) = venue_order_id {
1586            self.inner.get_order_status_by_id(voi.as_str()).await
1587        } else if let Some(ref coid) = client_order_id {
1588            let cid = client_order_id_to_cid(coid);
1589            self.inner.get_order_status_by_cid(cid).await
1590        } else {
1591            anyhow::bail!("Either venue_order_id or client_order_id must be provided")
1592        }
1593        .map_err(|e| anyhow::anyhow!(e))?;
1594
1595        let detail = resp.status;
1596        let size_precision = self
1597            .get_instrument(&detail.symbol)
1598            .map_or(0, |i| i.size_precision());
1599
1600        let voi = VenueOrderId::new(&detail.order_id);
1601        let order_status = detail.state.into();
1602        let filled = detail.filled_quantity.unwrap_or(0);
1603        let remaining = detail.remaining_quantity.unwrap_or(0);
1604        let quantity = Quantity::new((filled + remaining) as f64, size_precision);
1605        let filled_qty = Quantity::new(filled as f64, size_precision);
1606        let ts_init = self.generate_ts_init();
1607
1608        let resolved_coid = client_order_id.or_else(|| detail.clord_id.map(cid_to_client_order_id));
1609
1610        Ok(OrderStatusReport::new(
1611            account_id,
1612            instrument_id,
1613            resolved_coid,
1614            voi,
1615            order_side,
1616            order_type,
1617            time_in_force,
1618            order_status,
1619            quantity,
1620            filled_qty,
1621            ts_init,
1622            ts_init,
1623            ts_init,
1624            Some(UUID4::new()),
1625        ))
1626    }
1627
1628    /// Requests open orders from Ax and parses them to Nautilus [`OrderStatusReport`].
1629    ///
1630    /// Requires instruments to be cached for parsing order details.
1631    ///
1632    /// The `cid_resolver` parameter is an optional function that resolves a `cid` (u64)
1633    /// to a `ClientOrderId`. This is needed for correlating orders submitted via WebSocket.
1634    ///
1635    /// # Errors
1636    ///
1637    /// Returns an error if:
1638    /// - The HTTP request fails.
1639    /// - An order's instrument is not found in the cache.
1640    /// - Order parsing fails.
1641    pub async fn request_order_status_reports<F>(
1642        &self,
1643        account_id: AccountId,
1644        cid_resolver: Option<F>,
1645    ) -> anyhow::Result<Vec<OrderStatusReport>>
1646    where
1647        F: Fn(u64) -> Option<ClientOrderId>,
1648    {
1649        let response = self
1650            .inner
1651            .get_open_orders()
1652            .await
1653            .map_err(|e| anyhow::anyhow!(e))?;
1654
1655        let ts_init = self.generate_ts_init();
1656        let mut reports = Vec::with_capacity(response.orders.len());
1657
1658        for order in &response.orders {
1659            let instrument = self
1660                .get_instrument(&order.s)
1661                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", order.s))?;
1662
1663            match parse_order_status_report(
1664                order,
1665                account_id,
1666                &instrument,
1667                ts_init,
1668                cid_resolver.as_ref(),
1669            ) {
1670                Ok(report) => reports.push(report),
1671                Err(e) => {
1672                    log::warn!("Failed to parse order {}: {e}", order.oid);
1673                }
1674            }
1675        }
1676
1677        Ok(reports)
1678    }
1679
1680    /// Requests fills from Ax and parses them to Nautilus [`FillReport`].
1681    ///
1682    /// Requires instruments to be cached for parsing fill details.
1683    ///
1684    /// # Errors
1685    ///
1686    /// Returns an error if:
1687    /// - The HTTP request fails.
1688    /// - A fill's instrument is not found in the cache.
1689    /// - Fill parsing fails.
1690    pub async fn request_fill_reports(
1691        &self,
1692        account_id: AccountId,
1693    ) -> anyhow::Result<Vec<FillReport>> {
1694        let response = self
1695            .inner
1696            .get_fills()
1697            .await
1698            .map_err(|e| anyhow::anyhow!(e))?;
1699
1700        let ts_init = self.generate_ts_init();
1701        let mut reports = Vec::with_capacity(response.fills.len());
1702
1703        for fill in &response.fills {
1704            let instrument = self
1705                .get_instrument(&fill.symbol)
1706                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", fill.symbol))?;
1707
1708            match parse_fill_report(fill, account_id, &instrument, ts_init) {
1709                Ok(report) => reports.push(report),
1710                Err(e) => {
1711                    log::warn!("Failed to parse fill {}: {e}", fill.trade_id);
1712                }
1713            }
1714        }
1715
1716        Ok(reports)
1717    }
1718
1719    /// Requests positions from Ax and parses them to Nautilus [`PositionStatusReport`].
1720    ///
1721    /// Requires instruments to be cached for parsing position details.
1722    ///
1723    /// # Errors
1724    ///
1725    /// Returns an error if:
1726    /// - The HTTP request fails.
1727    /// - A position's instrument is not found in the cache.
1728    /// - Position parsing fails.
1729    pub async fn request_position_reports(
1730        &self,
1731        account_id: AccountId,
1732    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1733        let response = self
1734            .inner
1735            .get_positions()
1736            .await
1737            .map_err(|e| anyhow::anyhow!(e))?;
1738
1739        let ts_init = self.generate_ts_init();
1740        let mut reports = Vec::with_capacity(response.positions.len());
1741
1742        for position in &response.positions {
1743            // Skip flat positions (zero quantity)
1744            if position.signed_quantity == 0 {
1745                continue;
1746            }
1747
1748            let instrument = self.get_instrument(&position.symbol).ok_or_else(|| {
1749                anyhow::anyhow!("Instrument {} not found in cache", position.symbol)
1750            })?;
1751
1752            match parse_position_status_report(position, account_id, &instrument, ts_init) {
1753                Ok(report) => reports.push(report),
1754                Err(e) => {
1755                    log::warn!("Failed to parse position for {}: {e}", position.symbol);
1756                }
1757            }
1758        }
1759
1760        Ok(reports)
1761    }
1762
1763    /// Cancels all open orders for an instrument.
1764    ///
1765    /// # Errors
1766    ///
1767    /// Returns an error if the request fails.
1768    pub async fn cancel_all_orders(&self, instrument_id: InstrumentId) -> Result<(), AxHttpError> {
1769        let request = CancelAllOrdersRequest::new().with_symbol(instrument_id.symbol.inner());
1770        self.inner.cancel_all_orders(&request).await?;
1771        Ok(())
1772    }
1773}