1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc, LazyLock, RwLock,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use chrono::{DateTime, Utc};
29use nautilus_core::{
30 AtomicMap, AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
31 time::get_atomic_clock_realtime,
32};
33use nautilus_model::{
34 data::{Bar, BookOrder, FundingRateUpdate, TradeTick},
35 enums::{BookType, OrderSide, OrderType, TimeInForce},
36 events::AccountState,
37 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
38 instruments::{Instrument, any::InstrumentAny},
39 orderbook::OrderBook,
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{Price, Quantity},
42};
43use nautilus_network::{
44 http::HttpClient,
45 ratelimiter::quota::Quota,
46 retry::{RetryConfig, RetryManager},
47};
48use reqwest::{Method, header::USER_AGENT};
49use rust_decimal::Decimal;
50use serde::{Serialize, de::DeserializeOwned};
51use tokio_util::sync::CancellationToken;
52use ustr::Ustr;
53
54use super::{
55 error::AxHttpError,
56 models::{
57 AuthenticateApiKeyRequest, AxAuthenticateResponse, AxBalancesResponse, AxBookResponse,
58 AxCancelAllOrdersResponse, AxCancelOrderResponse, AxCandle, AxCandleResponse,
59 AxCandlesResponse, AxFillsResponse, AxFundingRatesResponse,
60 AxInitialMarginRequirementResponse, AxInstrument, AxInstrumentsResponse,
61 AxOpenOrdersResponse, AxOrderStatusQueryResponse, AxOrdersResponse, AxPlaceOrderResponse,
62 AxPositionsResponse, AxPreviewAggressiveLimitOrderResponse, AxReplaceOrderResponse,
63 AxRiskSnapshotResponse, AxTicker, AxTickersResponse, AxTradesResponse,
64 AxTransactionsResponse, AxWhoAmI, CancelAllOrdersRequest, CancelOrderRequest,
65 PlaceOrderRequest, PreviewAggressiveLimitOrderRequest, ReplaceOrderRequest,
66 },
67 parse::{
68 parse_account_state, parse_bar, parse_fill_report, parse_funding_rate,
69 parse_order_status_report, parse_perp_instrument, parse_position_status_report,
70 parse_trade_tick,
71 },
72 query::{
73 GetBookParams, GetCandleParams, GetCandlesParams, GetFundingRatesParams,
74 GetInstrumentParams, GetOrderStatusParams, GetOrdersParams, GetTickerParams,
75 GetTradesParams, GetTransactionsParams,
76 },
77};
78use crate::common::{
79 consts::{AX_HTTP_URL, AX_ORDERS_URL},
80 credential::Credential,
81 enums::{AxCandleWidth, AxInstrumentState},
82 parse::{cid_to_client_order_id, client_order_id_to_cid},
83};
84
85pub static AX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
89 Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
90});
91
92const AX_GLOBAL_RATE_KEY: &str = "architect:global";
93
94pub struct AxRawHttpClient {
99 base_url: String,
100 orders_base_url: String,
101 client: HttpClient,
102 credential: Option<Credential>,
103 session_token: RwLock<Option<String>>,
104 retry_manager: RetryManager<AxHttpError>,
105 cancellation_token: RwLock<CancellationToken>,
106}
107
108impl Default for AxRawHttpClient {
109 fn default() -> Self {
110 Self::new(None, None, 60, 3, 1000, 10_000, None)
111 .expect("Failed to create default AxRawHttpClient")
112 }
113}
114
115impl Debug for AxRawHttpClient {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 let has_session_token = self.session_token.read().is_ok_and(|guard| guard.is_some());
118 f.debug_struct(stringify!(AxRawHttpClient))
119 .field("base_url", &self.base_url)
120 .field("orders_base_url", &self.orders_base_url)
121 .field("has_credentials", &self.credential.is_some())
122 .field("has_session_token", &has_session_token)
123 .finish()
124 }
125}
126
127impl AxRawHttpClient {
128 #[must_use]
130 pub fn base_url(&self) -> &str {
131 &self.base_url
132 }
133
134 #[must_use]
136 pub fn api_key_masked(&self) -> String {
137 self.credential
138 .as_ref()
139 .map_or_else(|| "None".to_string(), |c| c.masked_api_key())
140 }
141
142 pub fn cancel_all_requests(&self) {
148 self.cancellation_token
149 .read()
150 .expect("Lock poisoned")
151 .cancel();
152 }
153
154 pub fn reset_cancellation_token(&self) {
160 *self.cancellation_token.write().expect("Lock poisoned") = CancellationToken::new();
161 }
162
163 pub fn cancellation_token(&self) -> CancellationToken {
169 self.cancellation_token
170 .read()
171 .expect("Lock poisoned")
172 .clone()
173 }
174
175 pub fn new(
181 base_url: Option<String>,
182 orders_base_url: Option<String>,
183 timeout_secs: u64,
184 max_retries: u32,
185 retry_delay_ms: u64,
186 retry_delay_max_ms: u64,
187 proxy_url: Option<String>,
188 ) -> Result<Self, AxHttpError> {
189 let retry_config = RetryConfig {
190 max_retries,
191 initial_delay_ms: retry_delay_ms,
192 max_delay_ms: retry_delay_max_ms,
193 backoff_factor: 2.0,
194 jitter_ms: 1000,
195 operation_timeout_ms: Some(60_000),
196 immediate_first: false,
197 max_elapsed_ms: Some(180_000),
198 };
199
200 let retry_manager = RetryManager::new(retry_config);
201
202 Ok(Self {
203 base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
204 orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
205 client: HttpClient::new(
206 Self::default_headers(),
207 vec![],
208 Self::rate_limiter_quotas(),
209 Some(*AX_REST_QUOTA),
210 Some(timeout_secs),
211 proxy_url,
212 )
213 .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
214 credential: None,
215 session_token: RwLock::new(None),
216 retry_manager,
217 cancellation_token: RwLock::new(CancellationToken::new()),
218 })
219 }
220
221 #[expect(clippy::too_many_arguments)]
227 pub fn with_credentials(
228 api_key: String,
229 api_secret: String,
230 base_url: Option<String>,
231 orders_base_url: Option<String>,
232 timeout_secs: u64,
233 max_retries: u32,
234 retry_delay_ms: u64,
235 retry_delay_max_ms: u64,
236 proxy_url: Option<String>,
237 ) -> Result<Self, AxHttpError> {
238 let retry_config = RetryConfig {
239 max_retries,
240 initial_delay_ms: retry_delay_ms,
241 max_delay_ms: retry_delay_max_ms,
242 backoff_factor: 2.0,
243 jitter_ms: 1000,
244 operation_timeout_ms: Some(60_000),
245 immediate_first: false,
246 max_elapsed_ms: Some(180_000),
247 };
248
249 let retry_manager = RetryManager::new(retry_config);
250
251 Ok(Self {
252 base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
253 orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
254 client: HttpClient::new(
255 Self::default_headers(),
256 vec![],
257 Self::rate_limiter_quotas(),
258 Some(*AX_REST_QUOTA),
259 Some(timeout_secs),
260 proxy_url,
261 )
262 .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
263 credential: Some(Credential::new(api_key, api_secret)),
264 session_token: RwLock::new(None),
265 retry_manager,
266 cancellation_token: RwLock::new(CancellationToken::new()),
267 })
268 }
269
270 pub fn set_session_token(&self, token: String) {
278 *self.session_token.write().expect("Lock poisoned") = Some(token);
280 }
281
282 fn default_headers() -> HashMap<String, String> {
283 HashMap::from([
284 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
285 ("Accept".to_string(), "application/json".to_string()),
286 ])
287 }
288
289 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
290 vec![(AX_GLOBAL_RATE_KEY.to_string(), *AX_REST_QUOTA)]
291 }
292
293 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
294 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
295 let route = format!("architect:{normalized}");
296
297 vec![AX_GLOBAL_RATE_KEY.to_string(), route]
298 }
299
300 fn auth_headers(&self) -> Result<HashMap<String, String>, AxHttpError> {
301 let guard = self.session_token.read().expect("Lock poisoned");
303 let session_token = guard.as_ref().ok_or(AxHttpError::MissingSessionToken)?;
304
305 let mut headers = HashMap::new();
306 headers.insert(
307 "Authorization".to_string(),
308 format!("Bearer {session_token}"),
309 );
310
311 Ok(headers)
312 }
313
314 async fn send_request<T: DeserializeOwned, P: Serialize>(
315 &self,
316 method: Method,
317 endpoint: &str,
318 params: Option<&P>,
319 body: Option<Vec<u8>>,
320 authenticate: bool,
321 ) -> Result<T, AxHttpError> {
322 self.send_request_to_url(&self.base_url, method, endpoint, params, body, authenticate)
323 .await
324 }
325
326 async fn send_request_to_url<T: DeserializeOwned, P: Serialize>(
327 &self,
328 base_url: &str,
329 method: Method,
330 endpoint: &str,
331 params: Option<&P>,
332 body: Option<Vec<u8>>,
333 authenticate: bool,
334 ) -> Result<T, AxHttpError> {
335 let endpoint = endpoint.to_string();
336 let url = format!("{base_url}{endpoint}");
337
338 let params_str = if method == Method::GET || method == Method::DELETE {
339 params
340 .map(serde_urlencoded::to_string)
341 .transpose()
342 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize params: {e}")))?
343 } else {
344 None
345 };
346
347 let operation = || {
348 let url = url.clone();
349 let method = method.clone();
350 let endpoint = endpoint.clone();
351 let params_str = params_str.clone();
352 let body = body.clone();
353
354 async move {
355 let mut headers = Self::default_headers();
356
357 if authenticate {
358 let auth_headers = self.auth_headers()?;
359 headers.extend(auth_headers);
360 }
361
362 if body.is_some() {
363 headers.insert("Content-Type".to_string(), "application/json".to_string());
364 }
365
366 let full_url = if let Some(ref query) = params_str {
367 if query.is_empty() {
368 url
369 } else {
370 format!("{url}?{query}")
371 }
372 } else {
373 url
374 };
375
376 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
377
378 let response = self
379 .client
380 .request(
381 method,
382 full_url,
383 None,
384 Some(headers),
385 body,
386 None,
387 Some(rate_limit_keys),
388 )
389 .await?;
390
391 let status = response.status;
392 let response_body = String::from_utf8_lossy(&response.body).to_string();
393
394 if !status.is_success() {
395 return Err(AxHttpError::UnexpectedStatus {
396 status: status.as_u16(),
397 body: response_body,
398 });
399 }
400
401 serde_json::from_str(&response_body).map_err(|e| {
402 AxHttpError::JsonError(format!(
403 "Failed to deserialize response: {e}\nBody: {response_body}"
404 ))
405 })
406 }
407 };
408
409 let is_idempotent = matches!(method, Method::GET | Method::HEAD | Method::OPTIONS);
411 let should_retry = |error: &AxHttpError| -> bool { is_idempotent && error.is_retryable() };
412
413 let create_error = |msg: String| -> AxHttpError {
414 if msg == "canceled" {
415 AxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
416 } else {
417 AxHttpError::NetworkError(msg)
418 }
419 };
420
421 let cancel_token = self
422 .cancellation_token
423 .read()
424 .expect("Lock poisoned")
425 .clone();
426
427 self.retry_manager
428 .execute_with_retry_with_cancel(
429 endpoint.as_str(),
430 operation,
431 should_retry,
432 create_error,
433 &cancel_token,
434 )
435 .await
436 }
437
438 pub async fn get_whoami(&self) -> Result<AxWhoAmI, AxHttpError> {
447 self.send_request::<AxWhoAmI, ()>(Method::GET, "/whoami", None, None, true)
448 .await
449 }
450
451 pub async fn get_instruments(&self) -> Result<AxInstrumentsResponse, AxHttpError> {
460 self.send_request::<AxInstrumentsResponse, ()>(
461 Method::GET,
462 "/instruments",
463 None,
464 None,
465 false,
466 )
467 .await
468 }
469
470 pub async fn get_balances(&self) -> Result<AxBalancesResponse, AxHttpError> {
479 self.send_request::<AxBalancesResponse, ()>(Method::GET, "/balances", None, None, true)
480 .await
481 }
482
483 pub async fn get_positions(&self) -> Result<AxPositionsResponse, AxHttpError> {
492 self.send_request::<AxPositionsResponse, ()>(Method::GET, "/positions", None, None, true)
493 .await
494 }
495
496 pub async fn get_tickers(&self) -> Result<AxTickersResponse, AxHttpError> {
505 self.send_request::<AxTickersResponse, ()>(Method::GET, "/tickers", None, None, true)
506 .await
507 }
508
509 pub async fn get_ticker(&self, symbol: Ustr) -> Result<AxTicker, AxHttpError> {
518 let params = GetTickerParams::new(symbol);
519 self.send_request::<AxTicker, _>(Method::GET, "/ticker", Some(¶ms), None, true)
520 .await
521 }
522
523 pub async fn get_instrument(&self, symbol: Ustr) -> Result<AxInstrument, AxHttpError> {
532 let params = GetInstrumentParams::new(symbol);
533 self.send_request::<AxInstrument, _>(Method::GET, "/instrument", Some(¶ms), None, false)
534 .await
535 }
536
537 pub async fn authenticate(
546 &self,
547 api_key: &str,
548 api_secret: &str,
549 expiration_seconds: i32,
550 ) -> Result<AxAuthenticateResponse, AxHttpError> {
551 let request = AuthenticateApiKeyRequest::new(api_key, api_secret, expiration_seconds);
552
553 let body = serde_json::to_vec(&request)
554 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
555
556 self.send_request::<AxAuthenticateResponse, ()>(
557 Method::POST,
558 "/authenticate",
559 None,
560 Some(body),
561 false,
562 )
563 .await
564 }
565
566 pub async fn authenticate_auto(
581 &self,
582 expiration_seconds: i32,
583 ) -> Result<AxAuthenticateResponse, AxHttpError> {
584 let (api_key, api_secret) = self
585 .resolve_credentials()
586 .ok_or(AxHttpError::MissingCredentials)?;
587
588 self.authenticate(&api_key, &api_secret, expiration_seconds)
589 .await
590 }
591
592 fn resolve_credentials(&self) -> Option<(String, String)> {
593 if let Some(cred) = &self.credential {
594 return Some((cred.api_key().to_string(), cred.api_secret().to_string()));
595 }
596
597 let cred = Credential::resolve(None, None)?;
598 Some((cred.api_key().to_string(), cred.api_secret().to_string()))
599 }
600
601 pub async fn place_order(
610 &self,
611 request: &PlaceOrderRequest,
612 ) -> Result<AxPlaceOrderResponse, AxHttpError> {
613 let body = serde_json::to_vec(request)
614 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
615 self.send_request_to_url::<AxPlaceOrderResponse, ()>(
616 &self.orders_base_url,
617 Method::POST,
618 "/place_order",
619 None,
620 Some(body),
621 true,
622 )
623 .await
624 }
625
626 pub async fn cancel_order(&self, order_id: &str) -> Result<AxCancelOrderResponse, AxHttpError> {
635 let request = CancelOrderRequest::new(order_id);
636 let body = serde_json::to_vec(&request)
637 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
638 self.send_request_to_url::<AxCancelOrderResponse, ()>(
639 &self.orders_base_url,
640 Method::POST,
641 "/cancel_order",
642 None,
643 Some(body),
644 true,
645 )
646 .await
647 }
648
649 pub async fn replace_order(
661 &self,
662 request: &ReplaceOrderRequest,
663 ) -> Result<AxReplaceOrderResponse, AxHttpError> {
664 let body = serde_json::to_vec(request)
665 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
666 self.send_request_to_url::<AxReplaceOrderResponse, ()>(
667 &self.orders_base_url,
668 Method::POST,
669 "/replace_order",
670 None,
671 Some(body),
672 true,
673 )
674 .await
675 }
676
677 pub async fn cancel_all_orders(
686 &self,
687 request: &CancelAllOrdersRequest,
688 ) -> Result<AxCancelAllOrdersResponse, AxHttpError> {
689 let body = serde_json::to_vec(request)
690 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
691 self.send_request_to_url::<AxCancelAllOrdersResponse, ()>(
692 &self.orders_base_url,
693 Method::POST,
694 "/cancel_all_orders",
695 None,
696 Some(body),
697 true,
698 )
699 .await
700 }
701
702 pub async fn get_open_orders(&self) -> Result<AxOpenOrdersResponse, AxHttpError> {
711 self.send_request_to_url::<AxOpenOrdersResponse, ()>(
712 &self.orders_base_url,
713 Method::GET,
714 "/open_orders",
715 None,
716 None,
717 true,
718 )
719 .await
720 }
721
722 pub async fn get_fills(&self) -> Result<AxFillsResponse, AxHttpError> {
731 self.send_request::<AxFillsResponse, ()>(Method::GET, "/fills", None, None, true)
732 .await
733 }
734
735 pub async fn get_candles(
744 &self,
745 symbol: Ustr,
746 start_timestamp_ns: i64,
747 end_timestamp_ns: i64,
748 candle_width: AxCandleWidth,
749 ) -> Result<AxCandlesResponse, AxHttpError> {
750 let params =
751 GetCandlesParams::new(symbol, start_timestamp_ns, end_timestamp_ns, candle_width);
752 self.send_request::<AxCandlesResponse, _>(
753 Method::GET,
754 "/candles",
755 Some(¶ms),
756 None,
757 true,
758 )
759 .await
760 }
761
762 pub async fn get_current_candle(
771 &self,
772 symbol: Ustr,
773 candle_width: AxCandleWidth,
774 ) -> Result<AxCandle, AxHttpError> {
775 let params = GetCandleParams::new(symbol, candle_width);
776 let response = self
777 .send_request::<AxCandleResponse, _>(
778 Method::GET,
779 "/candles/current",
780 Some(¶ms),
781 None,
782 true,
783 )
784 .await?;
785 Ok(response.candle)
786 }
787
788 pub async fn get_last_candle(
797 &self,
798 symbol: Ustr,
799 candle_width: AxCandleWidth,
800 ) -> Result<AxCandle, AxHttpError> {
801 let params = GetCandleParams::new(symbol, candle_width);
802 let response = self
803 .send_request::<AxCandleResponse, _>(
804 Method::GET,
805 "/candles/last",
806 Some(¶ms),
807 None,
808 true,
809 )
810 .await?;
811 Ok(response.candle)
812 }
813
814 pub async fn get_funding_rates(
823 &self,
824 symbol: Ustr,
825 start_timestamp_ns: i64,
826 end_timestamp_ns: i64,
827 ) -> Result<AxFundingRatesResponse, AxHttpError> {
828 let params = GetFundingRatesParams::new(symbol, start_timestamp_ns, end_timestamp_ns);
829 self.send_request::<AxFundingRatesResponse, _>(
830 Method::GET,
831 "/funding-rates",
832 Some(¶ms),
833 None,
834 true,
835 )
836 .await
837 }
838
839 pub async fn get_risk_snapshot(&self) -> Result<AxRiskSnapshotResponse, AxHttpError> {
848 self.send_request::<AxRiskSnapshotResponse, ()>(
849 Method::GET,
850 "/risk-snapshot",
851 None,
852 None,
853 true,
854 )
855 .await
856 }
857
858 pub async fn preview_aggressive_limit_order(
871 &self,
872 request: &PreviewAggressiveLimitOrderRequest,
873 ) -> Result<AxPreviewAggressiveLimitOrderResponse, AxHttpError> {
874 let body = serde_json::to_vec(request)
875 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
876 self.send_request::<AxPreviewAggressiveLimitOrderResponse, ()>(
877 Method::POST,
878 "/preview-aggressive-limit-order",
879 None,
880 Some(body),
881 true,
882 )
883 .await
884 }
885
886 pub async fn get_transactions(
895 &self,
896 transaction_types: Vec<String>,
897 ) -> Result<AxTransactionsResponse, AxHttpError> {
898 let params = GetTransactionsParams::new(transaction_types);
899 self.send_request::<AxTransactionsResponse, _>(
900 Method::GET,
901 "/transactions",
902 Some(¶ms),
903 None,
904 true,
905 )
906 .await
907 }
908
909 pub async fn get_trades(
918 &self,
919 symbol: Ustr,
920 limit: Option<i32>,
921 ) -> Result<AxTradesResponse, AxHttpError> {
922 let params = GetTradesParams::new(symbol, limit);
923 self.send_request::<AxTradesResponse, _>(Method::GET, "/trades", Some(¶ms), None, true)
924 .await
925 }
926
927 pub async fn get_book(
936 &self,
937 symbol: Ustr,
938 level: Option<i32>,
939 ) -> Result<AxBookResponse, AxHttpError> {
940 let params = GetBookParams::new(symbol, level);
941 self.send_request::<AxBookResponse, _>(Method::GET, "/book", Some(¶ms), None, false)
942 .await
943 }
944
945 pub async fn get_order_status_by_id(
954 &self,
955 order_id: &str,
956 ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
957 let params = GetOrderStatusParams::by_order_id(order_id);
958 self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
959 &self.orders_base_url,
960 Method::GET,
961 "/order-status",
962 Some(¶ms),
963 None,
964 true,
965 )
966 .await
967 }
968
969 pub async fn get_order_status_by_cid(
978 &self,
979 client_order_id: u64,
980 ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
981 let params = GetOrderStatusParams::by_client_order_id(client_order_id);
982 self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
983 &self.orders_base_url,
984 Method::GET,
985 "/order-status",
986 Some(¶ms),
987 None,
988 true,
989 )
990 .await
991 }
992
993 pub async fn get_orders(
1002 &self,
1003 params: &GetOrdersParams,
1004 ) -> Result<AxOrdersResponse, AxHttpError> {
1005 self.send_request_to_url::<AxOrdersResponse, _>(
1006 &self.orders_base_url,
1007 Method::GET,
1008 "/orders",
1009 Some(params),
1010 None,
1011 true,
1012 )
1013 .await
1014 }
1015
1016 pub async fn check_initial_margin(
1025 &self,
1026 request: &PlaceOrderRequest,
1027 ) -> Result<AxInitialMarginRequirementResponse, AxHttpError> {
1028 let body = serde_json::to_vec(request)
1029 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
1030 self.send_request_to_url::<AxInitialMarginRequirementResponse, ()>(
1031 &self.orders_base_url,
1032 Method::POST,
1033 "/initial-margin-requirement",
1034 None,
1035 Some(body),
1036 true,
1037 )
1038 .await
1039 }
1040}
1041
1042#[derive(Debug)]
1047#[cfg_attr(
1048 feature = "python",
1049 pyo3::pyclass(
1050 module = "nautilus_trader.core.nautilus_pyo3.architect",
1051 from_py_object
1052 )
1053)]
1054#[cfg_attr(
1055 feature = "python",
1056 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.architect_ax")
1057)]
1058pub struct AxHttpClient {
1059 pub(crate) inner: Arc<AxRawHttpClient>,
1060 pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
1061 clock: &'static AtomicTime,
1062 cache_initialized: Arc<AtomicBool>,
1063}
1064
1065impl Clone for AxHttpClient {
1066 fn clone(&self) -> Self {
1067 Self {
1068 inner: self.inner.clone(),
1069 instruments_cache: self.instruments_cache.clone(),
1070 cache_initialized: self.cache_initialized.clone(),
1071 clock: self.clock,
1072 }
1073 }
1074}
1075
1076impl Default for AxHttpClient {
1077 fn default() -> Self {
1078 Self::new(None, None, 60, 3, 1000, 10_000, None)
1079 .expect("Failed to create default AxHttpClient")
1080 }
1081}
1082
1083impl AxHttpClient {
1084 pub fn new(
1090 base_url: Option<String>,
1091 orders_base_url: Option<String>,
1092 timeout_secs: u64,
1093 max_retries: u32,
1094 retry_delay_ms: u64,
1095 retry_delay_max_ms: u64,
1096 proxy_url: Option<String>,
1097 ) -> Result<Self, AxHttpError> {
1098 Ok(Self {
1099 inner: Arc::new(AxRawHttpClient::new(
1100 base_url,
1101 orders_base_url,
1102 timeout_secs,
1103 max_retries,
1104 retry_delay_ms,
1105 retry_delay_max_ms,
1106 proxy_url,
1107 )?),
1108 instruments_cache: Arc::new(AtomicMap::new()),
1109 cache_initialized: Arc::new(AtomicBool::new(false)),
1110 clock: get_atomic_clock_realtime(),
1111 })
1112 }
1113
1114 #[expect(clippy::too_many_arguments)]
1120 pub fn with_credentials(
1121 api_key: String,
1122 api_secret: String,
1123 base_url: Option<String>,
1124 orders_base_url: Option<String>,
1125 timeout_secs: u64,
1126 max_retries: u32,
1127 retry_delay_ms: u64,
1128 retry_delay_max_ms: u64,
1129 proxy_url: Option<String>,
1130 ) -> Result<Self, AxHttpError> {
1131 Ok(Self {
1132 inner: Arc::new(AxRawHttpClient::with_credentials(
1133 api_key,
1134 api_secret,
1135 base_url,
1136 orders_base_url,
1137 timeout_secs,
1138 max_retries,
1139 retry_delay_ms,
1140 retry_delay_max_ms,
1141 proxy_url,
1142 )?),
1143 instruments_cache: Arc::new(AtomicMap::new()),
1144 cache_initialized: Arc::new(AtomicBool::new(false)),
1145 clock: get_atomic_clock_realtime(),
1146 })
1147 }
1148
1149 #[must_use]
1151 pub fn base_url(&self) -> &str {
1152 self.inner.base_url()
1153 }
1154
1155 #[must_use]
1157 pub fn api_key_masked(&self) -> String {
1158 self.inner.api_key_masked()
1159 }
1160
1161 pub fn cancel_all_requests(&self) {
1163 self.inner.cancel_all_requests();
1164 }
1165
1166 pub fn reset_cancellation_token(&self) {
1168 self.inner.reset_cancellation_token();
1169 }
1170
1171 pub fn set_session_token(&self, token: String) {
1175 self.inner.set_session_token(token);
1176 }
1177
1178 fn generate_ts_init(&self) -> UnixNanos {
1180 self.clock.get_time_ns()
1181 }
1182
1183 #[must_use]
1187 pub fn is_initialized(&self) -> bool {
1188 self.cache_initialized.load(Ordering::Acquire)
1189 }
1190
1191 #[must_use]
1193 pub fn get_cached_symbols(&self) -> Vec<String> {
1194 self.instruments_cache
1195 .load()
1196 .keys()
1197 .map(|k| k.to_string())
1198 .collect()
1199 }
1200
1201 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1205 self.instruments_cache.rcu(|m| {
1206 for inst in instruments {
1207 m.insert(inst.raw_symbol().inner(), inst.clone());
1208 }
1209 });
1210 self.cache_initialized.store(true, Ordering::Release);
1211 }
1212
1213 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1217 self.instruments_cache
1218 .insert(instrument.raw_symbol().inner(), instrument);
1219 self.cache_initialized.store(true, Ordering::Release);
1220 }
1221
1222 pub async fn authenticate(
1230 &self,
1231 api_key: &str,
1232 api_secret: &str,
1233 expiration_seconds: i32,
1234 ) -> Result<String, AxHttpError> {
1235 let resp = self
1236 .inner
1237 .authenticate(api_key, api_secret, expiration_seconds)
1238 .await?;
1239 self.inner.set_session_token(resp.token.clone());
1240 Ok(resp.token)
1241 }
1242
1243 pub async fn authenticate_auto(&self, expiration_seconds: i32) -> Result<String, AxHttpError> {
1260 let resp = self.inner.authenticate_auto(expiration_seconds).await?;
1261 self.inner.set_session_token(resp.token.clone());
1262 Ok(resp.token)
1263 }
1264
1265 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1267 self.instruments_cache.get_cloned(symbol)
1268 }
1269
1270 pub async fn request_instruments(
1276 &self,
1277 maker_fee: Option<Decimal>,
1278 taker_fee: Option<Decimal>,
1279 ) -> anyhow::Result<Vec<InstrumentAny>> {
1280 let resp = self
1281 .inner
1282 .get_instruments()
1283 .await
1284 .map_err(|e| anyhow::anyhow!(e))?;
1285
1286 let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1287 let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1288 let ts_init = self.generate_ts_init();
1289
1290 let mut instruments: Vec<InstrumentAny> = Vec::new();
1291 for inst in &resp.instruments {
1292 if inst.state == AxInstrumentState::Delisted {
1293 log::debug!("Skipping delisted instrument: {}", inst.symbol);
1294 continue;
1295 }
1296
1297 if inst.symbol.as_str().starts_with("TEST") {
1299 log::debug!("Skipping test instrument: {}", inst.symbol);
1300 continue;
1301 }
1302
1303 match parse_perp_instrument(inst, maker_fee, taker_fee, ts_init, ts_init) {
1304 Ok(instrument) => instruments.push(instrument),
1305 Err(e) => {
1306 log::warn!("Failed to parse instrument {}: {e}", inst.symbol);
1307 }
1308 }
1309 }
1310
1311 Ok(instruments)
1312 }
1313
1314 pub async fn request_instrument(
1320 &self,
1321 symbol: Ustr,
1322 maker_fee: Option<Decimal>,
1323 taker_fee: Option<Decimal>,
1324 ) -> anyhow::Result<InstrumentAny> {
1325 let resp = self
1326 .inner
1327 .get_instrument(symbol)
1328 .await
1329 .map_err(|e| anyhow::anyhow!(e))?;
1330
1331 let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1332 let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1333 let ts_init = self.generate_ts_init();
1334
1335 parse_perp_instrument(&resp, maker_fee, taker_fee, ts_init, ts_init)
1336 }
1337
1338 pub async fn request_book_snapshot(
1348 &self,
1349 symbol: Ustr,
1350 depth: Option<usize>,
1351 ) -> anyhow::Result<OrderBook> {
1352 let instrument = self
1353 .get_instrument(&symbol)
1354 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1355
1356 let resp = self
1357 .inner
1358 .get_book(symbol, Some(2))
1359 .await
1360 .map_err(|e| anyhow::anyhow!(e))?;
1361
1362 let instrument_id = instrument.id();
1363 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1364
1365 let price_precision = instrument.price_precision();
1366 let size_precision = instrument.size_precision();
1367 let ts_event = UnixNanos::from(resp.book.ts as u64 * 1_000_000_000 + resp.book.tn as u64);
1368
1369 for (i, level) in resp.book.b.iter().enumerate() {
1370 if depth.is_some_and(|d| i >= d) {
1371 break;
1372 }
1373 let price = Price::from_decimal_dp(level.p, price_precision)
1374 .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1375 let size = Quantity::new(level.q as f64, size_precision);
1376 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1377 book.add(order, 0, i as u64, ts_event);
1378 }
1379
1380 let bids_len = resp.book.b.len();
1381 for (i, level) in resp.book.a.iter().enumerate() {
1382 if depth.is_some_and(|d| i >= d) {
1383 break;
1384 }
1385 let price = Price::from_decimal_dp(level.p, price_precision)
1386 .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1387 let size = Quantity::new(level.q as f64, size_precision);
1388 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1389 book.add(order, 0, (bids_len + i) as u64, ts_event);
1390 }
1391
1392 Ok(book)
1393 }
1394
1395 pub async fn request_trade_ticks(
1409 &self,
1410 symbol: Ustr,
1411 limit: Option<i32>,
1412 start: Option<UnixNanos>,
1413 end: Option<UnixNanos>,
1414 ) -> anyhow::Result<Vec<TradeTick>> {
1415 let instrument = self
1416 .get_instrument(&symbol)
1417 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1418
1419 let resp = self
1420 .inner
1421 .get_trades(symbol, limit)
1422 .await
1423 .map_err(|e| anyhow::anyhow!(e))?;
1424
1425 let ts_init = self.generate_ts_init();
1426 let mut ticks = Vec::with_capacity(resp.trades.len());
1427
1428 for trade in &resp.trades {
1429 match parse_trade_tick(trade, &instrument, ts_init) {
1430 Ok(tick) => {
1431 if start.is_some_and(|s| tick.ts_event < s) {
1432 continue;
1433 }
1434
1435 if end.is_some_and(|e| tick.ts_event > e) {
1436 continue;
1437 }
1438 ticks.push(tick);
1439 }
1440 Err(e) => {
1441 log::warn!("Failed to parse trade for {symbol}: {e}");
1442 }
1443 }
1444 }
1445
1446 Ok(ticks)
1447 }
1448
1449 pub async fn request_bars(
1460 &self,
1461 symbol: Ustr,
1462 start: Option<DateTime<Utc>>,
1463 end: Option<DateTime<Utc>>,
1464 width: AxCandleWidth,
1465 ) -> anyhow::Result<Vec<Bar>> {
1466 let instrument = self
1467 .get_instrument(&symbol)
1468 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1469
1470 let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1471 let end_ns = end
1472 .and_then(|dt| dt.timestamp_nanos_opt())
1473 .unwrap_or_else(|| self.generate_ts_init().as_i64());
1474 let resp = self
1475 .inner
1476 .get_candles(symbol, start_ns, end_ns, width)
1477 .await
1478 .map_err(|e| anyhow::anyhow!(e))?;
1479
1480 let ts_init = self.generate_ts_init();
1481 let mut bars = Vec::with_capacity(resp.candles.len());
1482
1483 for candle in &resp.candles {
1484 match parse_bar(candle, &instrument, ts_init) {
1485 Ok(bar) => bars.push(bar),
1486 Err(e) => {
1487 log::warn!("Failed to parse bar for {symbol}: {e}");
1488 }
1489 }
1490 }
1491
1492 Ok(bars)
1493 }
1494
1495 pub async fn request_funding_rates(
1501 &self,
1502 instrument_id: InstrumentId,
1503 start: Option<DateTime<Utc>>,
1504 end: Option<DateTime<Utc>>,
1505 ) -> Result<Vec<FundingRateUpdate>, AxHttpError> {
1506 let symbol = instrument_id.symbol.inner();
1507 let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1508 let end_ns = end
1509 .and_then(|dt| dt.timestamp_nanos_opt())
1510 .unwrap_or_else(|| self.generate_ts_init().as_i64());
1511 let response = self
1512 .inner
1513 .get_funding_rates(symbol, start_ns, end_ns)
1514 .await?;
1515
1516 let ts_init = self.generate_ts_init();
1517 let funding_rates = response
1518 .funding_rates
1519 .iter()
1520 .map(|r| parse_funding_rate(r, instrument_id, ts_init))
1521 .collect::<anyhow::Result<Vec<_>>>()
1522 .map_err(|e| AxHttpError::from(e.to_string()))?;
1523
1524 Ok(funding_rates)
1525 }
1526
1527 pub async fn request_account_state(
1533 &self,
1534 account_id: AccountId,
1535 ) -> anyhow::Result<AccountState> {
1536 let response = self
1537 .inner
1538 .get_balances()
1539 .await
1540 .map_err(|e| anyhow::anyhow!(e))?;
1541
1542 let ts_init = self.generate_ts_init();
1543 parse_account_state(&response, account_id, ts_init, ts_init)
1544 }
1545
1546 pub async fn check_initial_margin(
1552 &self,
1553 request: &PlaceOrderRequest,
1554 ) -> anyhow::Result<Decimal> {
1555 let resp = self
1556 .inner
1557 .check_initial_margin(request)
1558 .await
1559 .map_err(|e| anyhow::anyhow!(e))?;
1560 Ok(resp.im)
1561 }
1562
1563 #[expect(clippy::too_many_arguments)]
1575 pub async fn request_order_status(
1576 &self,
1577 account_id: AccountId,
1578 instrument_id: InstrumentId,
1579 client_order_id: Option<ClientOrderId>,
1580 venue_order_id: Option<VenueOrderId>,
1581 order_side: OrderSide,
1582 order_type: OrderType,
1583 time_in_force: TimeInForce,
1584 ) -> anyhow::Result<OrderStatusReport> {
1585 let resp = if let Some(ref voi) = venue_order_id {
1586 self.inner.get_order_status_by_id(voi.as_str()).await
1587 } else if let Some(ref coid) = client_order_id {
1588 let cid = client_order_id_to_cid(coid);
1589 self.inner.get_order_status_by_cid(cid).await
1590 } else {
1591 anyhow::bail!("Either venue_order_id or client_order_id must be provided")
1592 }
1593 .map_err(|e| anyhow::anyhow!(e))?;
1594
1595 let detail = resp.status;
1596 let size_precision = self
1597 .get_instrument(&detail.symbol)
1598 .map_or(0, |i| i.size_precision());
1599
1600 let voi = VenueOrderId::new(&detail.order_id);
1601 let order_status = detail.state.into();
1602 let filled = detail.filled_quantity.unwrap_or(0);
1603 let remaining = detail.remaining_quantity.unwrap_or(0);
1604 let quantity = Quantity::new((filled + remaining) as f64, size_precision);
1605 let filled_qty = Quantity::new(filled as f64, size_precision);
1606 let ts_init = self.generate_ts_init();
1607
1608 let resolved_coid = client_order_id.or_else(|| detail.clord_id.map(cid_to_client_order_id));
1609
1610 Ok(OrderStatusReport::new(
1611 account_id,
1612 instrument_id,
1613 resolved_coid,
1614 voi,
1615 order_side,
1616 order_type,
1617 time_in_force,
1618 order_status,
1619 quantity,
1620 filled_qty,
1621 ts_init,
1622 ts_init,
1623 ts_init,
1624 Some(UUID4::new()),
1625 ))
1626 }
1627
1628 pub async fn request_order_status_reports<F>(
1642 &self,
1643 account_id: AccountId,
1644 cid_resolver: Option<F>,
1645 ) -> anyhow::Result<Vec<OrderStatusReport>>
1646 where
1647 F: Fn(u64) -> Option<ClientOrderId>,
1648 {
1649 let response = self
1650 .inner
1651 .get_open_orders()
1652 .await
1653 .map_err(|e| anyhow::anyhow!(e))?;
1654
1655 let ts_init = self.generate_ts_init();
1656 let mut reports = Vec::with_capacity(response.orders.len());
1657
1658 for order in &response.orders {
1659 let instrument = self
1660 .get_instrument(&order.s)
1661 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", order.s))?;
1662
1663 match parse_order_status_report(
1664 order,
1665 account_id,
1666 &instrument,
1667 ts_init,
1668 cid_resolver.as_ref(),
1669 ) {
1670 Ok(report) => reports.push(report),
1671 Err(e) => {
1672 log::warn!("Failed to parse order {}: {e}", order.oid);
1673 }
1674 }
1675 }
1676
1677 Ok(reports)
1678 }
1679
1680 pub async fn request_fill_reports(
1691 &self,
1692 account_id: AccountId,
1693 ) -> anyhow::Result<Vec<FillReport>> {
1694 let response = self
1695 .inner
1696 .get_fills()
1697 .await
1698 .map_err(|e| anyhow::anyhow!(e))?;
1699
1700 let ts_init = self.generate_ts_init();
1701 let mut reports = Vec::with_capacity(response.fills.len());
1702
1703 for fill in &response.fills {
1704 let instrument = self
1705 .get_instrument(&fill.symbol)
1706 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", fill.symbol))?;
1707
1708 match parse_fill_report(fill, account_id, &instrument, ts_init) {
1709 Ok(report) => reports.push(report),
1710 Err(e) => {
1711 log::warn!("Failed to parse fill {}: {e}", fill.trade_id);
1712 }
1713 }
1714 }
1715
1716 Ok(reports)
1717 }
1718
1719 pub async fn request_position_reports(
1730 &self,
1731 account_id: AccountId,
1732 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1733 let response = self
1734 .inner
1735 .get_positions()
1736 .await
1737 .map_err(|e| anyhow::anyhow!(e))?;
1738
1739 let ts_init = self.generate_ts_init();
1740 let mut reports = Vec::with_capacity(response.positions.len());
1741
1742 for position in &response.positions {
1743 if position.signed_quantity == 0 {
1745 continue;
1746 }
1747
1748 let instrument = self.get_instrument(&position.symbol).ok_or_else(|| {
1749 anyhow::anyhow!("Instrument {} not found in cache", position.symbol)
1750 })?;
1751
1752 match parse_position_status_report(position, account_id, &instrument, ts_init) {
1753 Ok(report) => reports.push(report),
1754 Err(e) => {
1755 log::warn!("Failed to parse position for {}: {e}", position.symbol);
1756 }
1757 }
1758 }
1759
1760 Ok(reports)
1761 }
1762
1763 pub async fn cancel_all_orders(&self, instrument_id: InstrumentId) -> Result<(), AxHttpError> {
1769 let request = CancelAllOrdersRequest::new().with_symbol(instrument_id.symbol.inner());
1770 self.inner.cancel_all_orders(&request).await?;
1771 Ok(())
1772 }
1773}