1use std::{
26 collections::HashMap,
27 num::NonZeroU32,
28 sync::{
29 Arc, LazyLock, RwLock,
30 atomic::{AtomicBool, Ordering},
31 },
32};
33
34use chrono::{DateTime, Utc};
35use dashmap::DashMap;
36use nautilus_core::{
37 AtomicMap, AtomicTime, UUID4, UnixNanos,
38 consts::{NAUTILUS_TRADER, NAUTILUS_USER_AGENT},
39 env::get_or_env_var_opt,
40 time::get_atomic_clock_realtime,
41};
42use nautilus_model::{
43 data::{Bar, BarType, TradeTick},
44 enums::{
45 AccountType, AggregationSource, BarAggregation, ContingencyType, OrderSide, OrderType,
46 PriceType, TimeInForce, TrailingOffsetType, TriggerType,
47 },
48 events::AccountState,
49 identifiers::{AccountId, ClientOrderId, InstrumentId, OrderListId, VenueOrderId},
50 instruments::{Instrument as InstrumentTrait, InstrumentAny},
51 reports::{FillReport, OrderStatusReport, PositionStatusReport},
52 types::{MarginBalance, Money, Price, Quantity},
53};
54use nautilus_network::{
55 http::{HttpClient, Method, StatusCode, USER_AGENT},
56 ratelimiter::quota::Quota,
57 retry::{RetryConfig, RetryManager},
58};
59use rust_decimal::Decimal;
60use serde::{Deserialize, Serialize, de::DeserializeOwned};
61use serde_json::Value;
62use tokio_util::sync::CancellationToken;
63use ustr::Ustr;
64
65use super::{
66 error::{BitmexErrorResponse, BitmexHttpError},
67 models::{
68 BitmexApiInfo, BitmexExecution, BitmexInstrument, BitmexMargin, BitmexOrder,
69 BitmexPosition, BitmexTrade, BitmexTradeBin, BitmexWallet,
70 },
71 query::{
72 DeleteAllOrdersParams, DeleteOrderParams, GetExecutionParams, GetExecutionParamsBuilder,
73 GetOrderParams, GetPositionParams, GetPositionParamsBuilder, GetTradeBucketedParams,
74 GetTradeBucketedParamsBuilder, GetTradeParams, GetTradeParamsBuilder,
75 PostCancelAllAfterParams, PostOrderParams, PostPositionLeverageParams, PutOrderParams,
76 },
77};
78use crate::{
79 common::{
80 consts::{BITMEX_HTTP_TESTNET_URL, BITMEX_HTTP_URL},
81 credential::{Credential, credential_env_vars},
82 enums::{
83 BitmexContingencyType, BitmexEnvironment, BitmexExecInstruction, BitmexOrderStatus,
84 BitmexOrderType, BitmexPegPriceType, BitmexSide, BitmexTimeInForce,
85 },
86 parse::{bitmex_currency_divisor, parse_account_balance, quantity_to_u32},
87 },
88 http::{
89 parse::{
90 InstrumentParseResult, parse_fill_report, parse_instrument_any,
91 parse_order_status_report, parse_position_report, parse_trade, parse_trade_bin,
92 },
93 query::{DeleteAllOrdersParamsBuilder, GetOrderParamsBuilder, PutOrderParamsBuilder},
94 },
95 websocket::messages::BitmexMarginMsg,
96};
97
/// Default per-second request quota; also the fallback burst size used by
/// `default_quota` when the caller passes `0`.
const BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 10;
/// Per-minute request quota for authenticated use; also the fallback used by
/// `rate_limiter_quotas` when the caller passes `0`.
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED: u32 = 120;
/// Per-minute request quota used by the `Default` client constructors (no credentials).
const BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED: u32 = 30;
106
/// Rate limiter key under which the per-second quota is registered.
const BITMEX_GLOBAL_RATE_KEY: &str = "bitmex:global";
/// Rate limiter key under which the per-minute quota is registered.
const BITMEX_MINUTE_RATE_KEY: &str = "bitmex:minute";
109
110static RATE_LIMIT_KEYS: LazyLock<Vec<Ustr>> = LazyLock::new(|| {
111 vec![
112 Ustr::from(BITMEX_GLOBAL_RATE_KEY),
113 Ustr::from(BITMEX_MINUTE_RATE_KEY),
114 ]
115});
116
/// Generic response envelope carrying a list of `T` items under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct BitmexResponse<T> {
    /// The items contained in the response.
    pub data: Vec<T>,
}
123
/// Low-level BitMEX HTTP client: builds URLs, signs authenticated requests, applies rate
/// limiting and retries, and deserializes responses into the venue's raw models.
#[derive(Debug, Clone)]
pub struct BitmexRawHttpClient {
    /// Base URL that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client, configured with the rate limiter quotas.
    client: HttpClient,
    /// API credential used for signing; `None` for an unauthenticated client.
    credential: Option<Credential>,
    /// Validity window (milliseconds) added to "now" to compute the `api-expires` header.
    recv_window_ms: u64,
    /// Retry policy applied to every request.
    retry_manager: RetryManager<BitmexHttpError>,
    /// Token used to cancel in-flight requests; replaceable via `reset_cancellation_token`.
    cancellation_token: Arc<RwLock<CancellationToken>>,
}
152
153impl Default for BitmexRawHttpClient {
154 fn default() -> Self {
155 Self::new(
156 None,
157 60,
158 3,
159 1000,
160 10_000,
161 10_000,
162 BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND,
163 BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED,
164 None,
165 )
166 .expect("Failed to create default BitmexHttpInnerClient")
167 }
168}
169
170impl BitmexRawHttpClient {
171 #[expect(clippy::too_many_arguments)]
181 pub fn new(
182 base_url: Option<String>,
183 timeout_secs: u64,
184 max_retries: u32,
185 retry_delay_ms: u64,
186 retry_delay_max_ms: u64,
187 recv_window_ms: u64,
188 max_requests_per_second: u32,
189 max_requests_per_minute: u32,
190 proxy_url: Option<String>,
191 ) -> Result<Self, BitmexHttpError> {
192 let retry_config = RetryConfig {
193 max_retries,
194 initial_delay_ms: retry_delay_ms,
195 max_delay_ms: retry_delay_max_ms,
196 backoff_factor: 2.0,
197 jitter_ms: 1000,
198 operation_timeout_ms: Some(60_000),
199 immediate_first: false,
200 max_elapsed_ms: Some(180_000),
201 };
202
203 let retry_manager = RetryManager::new(retry_config);
204
205 Ok(Self {
206 base_url: base_url.unwrap_or(BITMEX_HTTP_URL.to_string()),
207 client: HttpClient::new(
208 Self::default_headers(),
209 vec![],
210 Self::rate_limiter_quotas(max_requests_per_second, max_requests_per_minute)?,
211 Some(Self::default_quota(max_requests_per_second)?),
212 Some(timeout_secs),
213 proxy_url,
214 )
215 .map_err(|e| {
216 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
217 })?,
218 credential: None,
219 recv_window_ms,
220 retry_manager,
221 cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
222 })
223 }
224
225 #[expect(clippy::too_many_arguments)]
232 pub fn with_credentials(
233 api_key: String,
234 api_secret: String,
235 base_url: String,
236 timeout_secs: u64,
237 max_retries: u32,
238 retry_delay_ms: u64,
239 retry_delay_max_ms: u64,
240 recv_window_ms: u64,
241 max_requests_per_second: u32,
242 max_requests_per_minute: u32,
243 proxy_url: Option<String>,
244 ) -> Result<Self, BitmexHttpError> {
245 let retry_config = RetryConfig {
246 max_retries,
247 initial_delay_ms: retry_delay_ms,
248 max_delay_ms: retry_delay_max_ms,
249 backoff_factor: 2.0,
250 jitter_ms: 1000,
251 operation_timeout_ms: Some(60_000),
252 immediate_first: false,
253 max_elapsed_ms: Some(180_000),
254 };
255
256 let retry_manager = RetryManager::new(retry_config);
257
258 Ok(Self {
259 base_url,
260 client: HttpClient::new(
261 Self::default_headers(),
262 vec![],
263 Self::rate_limiter_quotas(max_requests_per_second, max_requests_per_minute)?,
264 Some(Self::default_quota(max_requests_per_second)?),
265 Some(timeout_secs),
266 proxy_url,
267 )
268 .map_err(|e| {
269 BitmexHttpError::NetworkError(format!("Failed to create HTTP client: {e}"))
270 })?,
271 credential: Some(Credential::new(api_key, api_secret)),
272 recv_window_ms,
273 retry_manager,
274 cancellation_token: Arc::new(RwLock::new(CancellationToken::new())),
275 })
276 }
277
278 fn default_headers() -> HashMap<String, String> {
279 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
280 }
281
282 fn default_quota(max_requests_per_second: u32) -> Result<Quota, BitmexHttpError> {
283 let burst = NonZeroU32::new(max_requests_per_second)
284 .unwrap_or(NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"));
285 Quota::per_second(burst).ok_or_else(|| {
286 BitmexHttpError::ValidationError(format!(
287 "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
288 ))
289 })
290 }
291
292 fn rate_limiter_quotas(
293 max_requests_per_second: u32,
294 max_requests_per_minute: u32,
295 ) -> Result<Vec<(String, Quota)>, BitmexHttpError> {
296 let per_sec_quota = Self::default_quota(max_requests_per_second)?;
297 let per_min_quota =
298 Quota::per_minute(NonZeroU32::new(max_requests_per_minute).unwrap_or_else(|| {
299 NonZeroU32::new(BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_AUTHENTICATED)
300 .expect("non-zero")
301 }));
302
303 Ok(vec![
304 (BITMEX_GLOBAL_RATE_KEY.to_string(), per_sec_quota),
305 (BITMEX_MINUTE_RATE_KEY.to_string(), per_min_quota),
306 ])
307 }
308
309 fn rate_limit_keys() -> Vec<Ustr> {
310 RATE_LIMIT_KEYS.clone()
311 }
312
313 pub fn cancel_all_requests(&self) {
319 self.cancellation_token
320 .read()
321 .expect("cancellation token lock poisoned")
322 .cancel();
323 }
324
325 pub fn reset_cancellation_token(&self) {
331 *self
332 .cancellation_token
333 .write()
334 .expect("cancellation token lock poisoned") = CancellationToken::new();
335 }
336
337 pub fn cancellation_token(&self) -> CancellationToken {
343 self.cancellation_token
344 .read()
345 .expect("cancellation token lock poisoned")
346 .clone()
347 }
348
349 fn sign_request(
350 &self,
351 method: &Method,
352 endpoint: &str,
353 body: Option<&[u8]>,
354 ) -> Result<HashMap<String, String>, BitmexHttpError> {
355 let credential = self
356 .credential
357 .as_ref()
358 .ok_or(BitmexHttpError::MissingCredentials)?;
359
360 let expires = Utc::now().timestamp() + (self.recv_window_ms / 1000) as i64;
361 let body_str = body.and_then(|b| std::str::from_utf8(b).ok()).unwrap_or("");
362
363 let full_path = if endpoint.starts_with("/api/v1") {
364 endpoint.to_string()
365 } else {
366 format!("/api/v1{endpoint}")
367 };
368
369 let signature = credential.sign(method.as_str(), &full_path, expires, body_str);
370
371 let mut headers = HashMap::new();
372 headers.insert("api-expires".to_string(), expires.to_string());
373 headers.insert("api-key".to_string(), credential.api_key().to_string());
374 headers.insert("api-signature".to_string(), signature);
375
376 if body.is_some()
378 && (*method == Method::POST || *method == Method::PUT || *method == Method::DELETE)
379 {
380 headers.insert(
381 "Content-Type".to_string(),
382 "application/x-www-form-urlencoded".to_string(),
383 );
384 }
385
386 Ok(headers)
387 }
388
389 async fn send_request<T: DeserializeOwned, P: Serialize>(
390 &self,
391 method: Method,
392 endpoint: &str,
393 params: Option<&P>,
394 body: Option<Vec<u8>>,
395 authenticate: bool,
396 ) -> Result<T, BitmexHttpError> {
397 let endpoint = endpoint.to_string();
398 let method_clone = method.clone();
399 let body_clone = body.clone();
400
401 let params_str = if method == Method::GET || method == Method::DELETE {
404 params
405 .map(serde_urlencoded::to_string)
406 .transpose()
407 .map_err(|e| {
408 BitmexHttpError::JsonError(format!("Failed to serialize params: {e}"))
409 })?
410 } else {
411 None
412 };
413
414 let full_endpoint = match params_str {
415 Some(ref query) if !query.is_empty() => format!("{endpoint}?{query}"),
416 _ => endpoint.clone(),
417 };
418
419 let url = format!("{}{}", self.base_url, full_endpoint);
420
421 let operation = || {
422 let url = url.clone();
423 let method = method_clone.clone();
424 let body = body_clone.clone();
425 let full_endpoint = full_endpoint.clone();
426
427 async move {
428 let headers = if authenticate {
429 Some(self.sign_request(&method, &full_endpoint, body.as_deref())?)
430 } else {
431 None
432 };
433
434 let rate_keys = Self::rate_limit_keys();
435 let resp = self
436 .client
437 .request_with_ustr_keys(method, url, None, headers, body, None, Some(rate_keys))
438 .await?;
439
440 if resp.status.is_success() {
441 serde_json::from_slice(&resp.body).map_err(Into::into)
442 } else if let Ok(error_resp) =
443 serde_json::from_slice::<BitmexErrorResponse>(&resp.body)
444 {
445 Err(error_resp.into())
446 } else {
447 Err(BitmexHttpError::UnexpectedStatus {
448 status: StatusCode::from_u16(resp.status.as_u16())
449 .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
450 body: String::from_utf8_lossy(&resp.body).to_string(),
451 })
452 }
453 }
454 };
455
456 let should_retry = |error: &BitmexHttpError| -> bool {
473 match error {
474 BitmexHttpError::NetworkError(_) => true,
475 BitmexHttpError::UnexpectedStatus { status, .. } => {
476 status.as_u16() >= 500 || status.as_u16() == 429
477 }
478 BitmexHttpError::BitmexError {
479 error_name,
480 message,
481 } => {
482 error_name == "RateLimitError"
483 || (error_name == "HTTPError"
484 && message.to_lowercase().contains("rate limit"))
485 }
486 _ => false,
487 }
488 };
489
490 let create_error = |msg: String| -> BitmexHttpError {
491 if msg == "canceled" {
492 BitmexHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
493 } else {
494 BitmexHttpError::NetworkError(msg)
495 }
496 };
497
498 let cancel_token = self.cancellation_token();
499
500 self.retry_manager
501 .execute_with_retry_with_cancel(
502 endpoint.as_str(),
503 operation,
504 should_retry,
505 create_error,
506 &cancel_token,
507 )
508 .await
509 }
510
511 pub async fn get_instruments(
517 &self,
518 active_only: bool,
519 ) -> Result<Vec<BitmexInstrument>, BitmexHttpError> {
520 let path = if active_only {
521 "/instrument/active"
522 } else {
523 "/instrument"
524 };
525 self.send_request::<_, ()>(Method::GET, path, None, None, false)
526 .await
527 }
528
529 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
539 let response: BitmexApiInfo = self
540 .send_request::<_, ()>(Method::GET, "", None, None, false)
541 .await?;
542 Ok(response.timestamp)
543 }
544
545 pub async fn get_instrument(
556 &self,
557 symbol: &str,
558 ) -> Result<Option<BitmexInstrument>, BitmexHttpError> {
559 let path = &format!("/instrument?symbol={symbol}");
560 let instruments: Vec<BitmexInstrument> = self
561 .send_request::<_, ()>(Method::GET, path, None, None, false)
562 .await?;
563
564 Ok(instruments.into_iter().next())
565 }
566
567 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
573 let endpoint = "/user/wallet";
574 self.send_request::<_, ()>(Method::GET, endpoint, None, None, true)
575 .await
576 }
577
578 pub async fn get_margin(&self, currency: &str) -> Result<BitmexMargin, BitmexHttpError> {
584 let path = format!("/user/margin?currency={currency}");
585 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
586 .await
587 }
588
589 pub async fn get_all_margins(&self) -> Result<Vec<BitmexMargin>, BitmexHttpError> {
595 self.send_request::<_, ()>(Method::GET, "/user/margin?currency=all", None, None, true)
596 .await
597 }
598
599 pub async fn get_trades(
605 &self,
606 params: GetTradeParams,
607 ) -> Result<Vec<BitmexTrade>, BitmexHttpError> {
608 self.send_request(Method::GET, "/trade", Some(¶ms), None, true)
609 .await
610 }
611
612 pub async fn get_trade_bucketed(
618 &self,
619 params: GetTradeBucketedParams,
620 ) -> Result<Vec<BitmexTradeBin>, BitmexHttpError> {
621 self.send_request(Method::GET, "/trade/bucketed", Some(¶ms), None, true)
622 .await
623 }
624
625 pub async fn get_orders(
631 &self,
632 params: GetOrderParams,
633 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
634 self.send_request(Method::GET, "/order", Some(¶ms), None, true)
635 .await
636 }
637
638 pub async fn place_order(&self, params: PostOrderParams) -> Result<Value, BitmexHttpError> {
644 let body = serde_urlencoded::to_string(¶ms)
646 .map_err(|e| {
647 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
648 })?
649 .into_bytes();
650 let path = "/order";
651 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
652 .await
653 }
654
655 pub async fn cancel_orders(&self, params: DeleteOrderParams) -> Result<Value, BitmexHttpError> {
661 let body = serde_urlencoded::to_string(¶ms)
663 .map_err(|e| {
664 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
665 })?
666 .into_bytes();
667 let path = "/order";
668 self.send_request::<_, ()>(Method::DELETE, path, None, Some(body), true)
669 .await
670 }
671
672 pub async fn amend_order(&self, params: PutOrderParams) -> Result<Value, BitmexHttpError> {
678 let body = serde_urlencoded::to_string(¶ms)
680 .map_err(|e| {
681 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
682 })?
683 .into_bytes();
684 let path = "/order";
685 self.send_request::<_, ()>(Method::PUT, path, None, Some(body), true)
686 .await
687 }
688
689 pub async fn cancel_all_orders(
699 &self,
700 params: DeleteAllOrdersParams,
701 ) -> Result<Value, BitmexHttpError> {
702 self.send_request(Method::DELETE, "/order/all", Some(¶ms), None, true)
703 .await
704 }
705
706 pub async fn cancel_all_after(
718 &self,
719 params: PostCancelAllAfterParams,
720 ) -> Result<Value, BitmexHttpError> {
721 let body = serde_urlencoded::to_string(¶ms)
722 .map_err(|e| {
723 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
724 })?
725 .into_bytes();
726 self.send_request::<_, ()>(
727 Method::POST,
728 "/order/cancelAllAfter",
729 None,
730 Some(body),
731 true,
732 )
733 .await
734 }
735
736 pub async fn get_executions(
742 &self,
743 params: GetExecutionParams,
744 ) -> Result<Vec<BitmexExecution>, BitmexHttpError> {
745 let query = serde_urlencoded::to_string(¶ms).map_err(|e| {
746 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
747 })?;
748 let path = format!("/execution/tradeHistory?{query}");
749 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
750 .await
751 }
752
753 pub async fn get_positions(
759 &self,
760 params: GetPositionParams,
761 ) -> Result<Vec<BitmexPosition>, BitmexHttpError> {
762 self.send_request(Method::GET, "/position", Some(¶ms), None, true)
763 .await
764 }
765
766 pub async fn update_position_leverage(
772 &self,
773 params: PostPositionLeverageParams,
774 ) -> Result<BitmexPosition, BitmexHttpError> {
775 let body = serde_urlencoded::to_string(¶ms)
777 .map_err(|e| {
778 BitmexHttpError::ValidationError(format!("Failed to serialize parameters: {e}"))
779 })?
780 .into_bytes();
781 let path = "/position/leverage";
782 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
783 .await
784 }
785}
786
/// Higher-level BitMEX HTTP client that wraps a shared [`BitmexRawHttpClient`] and keeps
/// instrument and order-type caches alongside it.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.bitmex")
)]
pub struct BitmexHttpClient {
    /// Instruments keyed by their raw symbol; shared between clones of this client.
    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID; shared between clones of this client.
    pub(crate) order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Clock used to stamp `ts_init` on parsed objects.
    clock: &'static AtomicTime,
    /// The raw client that performs the actual HTTP requests.
    inner: Arc<BitmexRawHttpClient>,
    /// Set once at least one instrument has been cached through this client.
    cache_initialized: AtomicBool,
}
807
808impl Clone for BitmexHttpClient {
809 fn clone(&self) -> Self {
810 let cache_initialized = AtomicBool::new(false);
811
812 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
813 if is_initialized {
814 cache_initialized.store(true, Ordering::Release);
815 }
816
817 Self {
818 inner: self.inner.clone(),
819 instruments_cache: self.instruments_cache.clone(),
820 order_type_cache: self.order_type_cache.clone(),
821 cache_initialized,
822 clock: self.clock,
823 }
824 }
825}
826
827impl Default for BitmexHttpClient {
828 fn default() -> Self {
829 Self::new(
830 None,
831 None,
832 None,
833 BitmexEnvironment::Mainnet,
834 60,
835 3,
836 1_000,
837 10_000,
838 10_000,
839 BITMEX_DEFAULT_RATE_LIMIT_PER_SECOND,
840 BITMEX_DEFAULT_RATE_LIMIT_PER_MINUTE_UNAUTHENTICATED,
841 None,
842 )
843 .expect("Failed to create default BitmexHttpClient")
844 }
845}
846
847impl BitmexHttpClient {
848 #[expect(clippy::too_many_arguments)]
854 pub fn new(
855 base_url: Option<String>,
856 api_key: Option<String>,
857 api_secret: Option<String>,
858 environment: BitmexEnvironment,
859 timeout_secs: u64,
860 max_retries: u32,
861 retry_delay_ms: u64,
862 retry_delay_max_ms: u64,
863 recv_window_ms: u64,
864 max_requests_per_second: u32,
865 max_requests_per_minute: u32,
866 proxy_url: Option<String>,
867 ) -> Result<Self, BitmexHttpError> {
868 let url = base_url.unwrap_or_else(|| match environment {
870 BitmexEnvironment::Testnet => BITMEX_HTTP_TESTNET_URL.to_string(),
871 BitmexEnvironment::Mainnet => BITMEX_HTTP_URL.to_string(),
872 });
873
874 let (key_var, secret_var) = credential_env_vars(environment);
875 let api_key = get_or_env_var_opt(api_key, key_var);
876 let api_secret = get_or_env_var_opt(api_secret, secret_var);
877
878 let inner = match (api_key, api_secret) {
879 (Some(key), Some(secret)) => BitmexRawHttpClient::with_credentials(
880 key,
881 secret,
882 url,
883 timeout_secs,
884 max_retries,
885 retry_delay_ms,
886 retry_delay_max_ms,
887 recv_window_ms,
888 max_requests_per_second,
889 max_requests_per_minute,
890 proxy_url,
891 )?,
892 (Some(_), None) | (None, Some(_)) => {
893 return Err(BitmexHttpError::ValidationError(
894 "Both api_key and api_secret must be provided, or neither".to_string(),
895 ));
896 }
897 (None, None) => BitmexRawHttpClient::new(
898 Some(url),
899 timeout_secs,
900 max_retries,
901 retry_delay_ms,
902 retry_delay_max_ms,
903 recv_window_ms,
904 max_requests_per_second,
905 max_requests_per_minute,
906 proxy_url,
907 )?,
908 };
909
910 Ok(Self {
911 inner: Arc::new(inner),
912 instruments_cache: Arc::new(AtomicMap::new()),
913 order_type_cache: Arc::new(DashMap::new()),
914 cache_initialized: AtomicBool::new(false),
915 clock: get_atomic_clock_realtime(),
916 })
917 }
918
919 pub fn from_env() -> anyhow::Result<Self> {
926 Self::with_credentials(
927 None, None, None, 60, 3, 1_000, 10_000, 10_000, 10, 120, None,
928 )
929 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
930 }
931
932 #[expect(clippy::too_many_arguments)]
942 pub fn with_credentials(
943 api_key: Option<String>,
944 api_secret: Option<String>,
945 base_url: Option<String>,
946 timeout_secs: u64,
947 max_retries: u32,
948 retry_delay_ms: u64,
949 retry_delay_max_ms: u64,
950 recv_window_ms: u64,
951 max_requests_per_second: u32,
952 max_requests_per_minute: u32,
953 proxy_url: Option<String>,
954 ) -> anyhow::Result<Self> {
955 let environment = if base_url.as_ref().is_some_and(|url| url.contains("testnet")) {
957 BitmexEnvironment::Testnet
958 } else {
959 BitmexEnvironment::Mainnet
960 };
961
962 let (key_var, secret_var) = credential_env_vars(environment);
963
964 let api_key = get_or_env_var_opt(api_key, key_var);
965 let api_secret = get_or_env_var_opt(api_secret, secret_var);
966
967 if api_key.is_some() && api_secret.is_none() {
969 anyhow::bail!("{secret_var} is required when {key_var} is provided");
970 }
971
972 if api_key.is_none() && api_secret.is_some() {
973 anyhow::bail!("{key_var} is required when {secret_var} is provided");
974 }
975
976 Self::new(
977 base_url,
978 api_key,
979 api_secret,
980 environment,
981 timeout_secs,
982 max_retries,
983 retry_delay_ms,
984 retry_delay_max_ms,
985 recv_window_ms,
986 max_requests_per_second,
987 max_requests_per_minute,
988 proxy_url,
989 )
990 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))
991 }
992
993 #[must_use]
995 pub fn base_url(&self) -> &str {
996 self.inner.base_url.as_str()
997 }
998
999 #[must_use]
1001 pub fn api_key(&self) -> Option<&str> {
1002 self.inner.credential.as_ref().map(|c| c.api_key())
1003 }
1004
1005 #[must_use]
1007 pub fn api_key_masked(&self) -> Option<String> {
1008 self.inner.credential.as_ref().map(|c| c.api_key_masked())
1009 }
1010
1011 pub async fn get_server_time(&self) -> Result<u64, BitmexHttpError> {
1019 self.inner.get_server_time().await
1020 }
1021
1022 pub async fn cancel_all_after(&self, timeout_ms: u64) -> anyhow::Result<()> {
1030 let params = PostCancelAllAfterParams {
1031 timeout: timeout_ms,
1032 };
1033 self.inner.cancel_all_after(params).await?;
1034 Ok(())
1035 }
1036
1037 fn generate_ts_init(&self) -> UnixNanos {
1039 self.clock.get_time_ns()
1040 }
1041
1042 fn is_contingent_order(contingency_type: ContingencyType) -> bool {
1044 matches!(
1045 contingency_type,
1046 ContingencyType::Oco | ContingencyType::Oto | ContingencyType::Ouo
1047 )
1048 }
1049
1050 fn is_parent_contingency(contingency_type: ContingencyType) -> bool {
1052 matches!(
1053 contingency_type,
1054 ContingencyType::Oco | ContingencyType::Oto
1055 )
1056 }
1057
1058 fn populate_linked_order_ids(reports: &mut [OrderStatusReport]) {
1060 let mut order_list_groups: HashMap<OrderListId, Vec<ClientOrderId>> = HashMap::new();
1061 let mut order_list_parents: HashMap<OrderListId, ClientOrderId> = HashMap::new();
1062 let mut prefix_groups: HashMap<String, Vec<ClientOrderId>> = HashMap::new();
1063 let mut prefix_parents: HashMap<String, ClientOrderId> = HashMap::new();
1064
1065 for report in reports.iter() {
1066 let Some(client_order_id) = report.client_order_id else {
1067 continue;
1068 };
1069
1070 if let Some(order_list_id) = report.order_list_id {
1071 order_list_groups
1072 .entry(order_list_id)
1073 .or_default()
1074 .push(client_order_id);
1075
1076 if Self::is_parent_contingency(report.contingency_type) {
1077 order_list_parents
1078 .entry(order_list_id)
1079 .or_insert(client_order_id);
1080 }
1081 }
1082
1083 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1084 && Self::is_contingent_order(report.contingency_type)
1085 {
1086 prefix_groups
1087 .entry(base.to_owned())
1088 .or_default()
1089 .push(client_order_id);
1090
1091 if Self::is_parent_contingency(report.contingency_type) {
1092 prefix_parents
1093 .entry(base.to_owned())
1094 .or_insert(client_order_id);
1095 }
1096 }
1097 }
1098
1099 for report in reports.iter_mut() {
1100 let Some(client_order_id) = report.client_order_id else {
1101 continue;
1102 };
1103
1104 if report.linked_order_ids.is_some() {
1105 continue;
1106 }
1107
1108 if !Self::is_contingent_order(report.contingency_type) {
1110 continue;
1111 }
1112
1113 if let Some(order_list_id) = report.order_list_id
1114 && let Some(group) = order_list_groups.get(&order_list_id)
1115 {
1116 let mut linked: Vec<ClientOrderId> = group
1117 .iter()
1118 .copied()
1119 .filter(|candidate| candidate != &client_order_id)
1120 .collect();
1121
1122 if !linked.is_empty() {
1123 if let Some(parent_id) = order_list_parents.get(&order_list_id) {
1124 if client_order_id == *parent_id {
1125 report.parent_order_id = None;
1126 } else {
1127 linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1128 report.parent_order_id = Some(*parent_id);
1129 }
1130 } else {
1131 report.parent_order_id = None;
1132 }
1133
1134 log::trace!(
1135 "BitMEX linked ids sourced from order list id: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, linked_order_ids={:?}",
1136 client_order_id,
1137 order_list_id,
1138 report.contingency_type,
1139 linked,
1140 );
1141 report.linked_order_ids = Some(linked);
1142 continue;
1143 }
1144
1145 log::trace!(
1146 "BitMEX order list id group had no peers: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}, order_list_group={:?}",
1147 client_order_id,
1148 order_list_id,
1149 report.contingency_type,
1150 group,
1151 );
1152 report.parent_order_id = None;
1153 } else if report.order_list_id.is_none() {
1154 report.parent_order_id = None;
1155 }
1156
1157 if let Some((base, _)) = client_order_id.as_str().rsplit_once('-')
1158 && let Some(group) = prefix_groups.get(base)
1159 {
1160 let mut linked: Vec<ClientOrderId> = group
1161 .iter()
1162 .copied()
1163 .filter(|candidate| candidate != &client_order_id)
1164 .collect();
1165
1166 if !linked.is_empty() {
1167 if let Some(parent_id) = prefix_parents.get(base) {
1168 if client_order_id == *parent_id {
1169 report.parent_order_id = None;
1170 } else {
1171 linked.sort_by_key(|candidate| i32::from(candidate != parent_id));
1172 report.parent_order_id = Some(*parent_id);
1173 }
1174 } else {
1175 report.parent_order_id = None;
1176 }
1177
1178 log::trace!(
1179 "BitMEX linked ids constructed from client order id prefix: client_order_id={:?}, contingency_type={:?}, base={}, linked_order_ids={:?}",
1180 client_order_id,
1181 report.contingency_type,
1182 base,
1183 linked,
1184 );
1185 report.linked_order_ids = Some(linked);
1186 continue;
1187 }
1188
1189 log::trace!(
1190 "BitMEX client order id prefix group had no peers: client_order_id={:?}, contingency_type={:?}, base={}, prefix_group={:?}",
1191 client_order_id,
1192 report.contingency_type,
1193 base,
1194 group,
1195 );
1196 report.parent_order_id = None;
1197 } else if client_order_id.as_str().contains('-') {
1198 report.parent_order_id = None;
1199 }
1200
1201 if Self::is_contingent_order(report.contingency_type) {
1202 log::warn!(
1203 "BitMEX order status report missing linked ids after grouping: client_order_id={:?}, order_list_id={:?}, contingency_type={:?}",
1204 report.client_order_id,
1205 report.order_list_id,
1206 report.contingency_type,
1207 );
1208 report.contingency_type = ContingencyType::NoContingency;
1209 report.parent_order_id = None;
1210 }
1211
1212 report.linked_order_ids = None;
1213 }
1214 }
1215
1216 pub fn cancel_all_requests(&self) {
1218 self.inner.cancel_all_requests();
1219 }
1220
1221 pub fn reset_cancellation_token(&self) {
1223 self.inner.reset_cancellation_token();
1224 }
1225
1226 pub fn cancellation_token(&self) -> CancellationToken {
1228 self.inner.cancellation_token()
1229 }
1230
1231 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1235 self.instruments_cache
1236 .insert(instrument.raw_symbol().inner(), instrument);
1237 self.cache_initialized.store(true, Ordering::Release);
1238 }
1239
1240 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1244 self.instruments_cache.rcu(|m| {
1245 for inst in instruments {
1246 m.insert(inst.raw_symbol().inner(), inst.clone());
1247 }
1248 });
1249 self.cache_initialized.store(true, Ordering::Release);
1250 }
1251
1252 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1254 self.instruments_cache.get_cloned(symbol)
1255 }
1256
1257 pub async fn request_instrument(
1265 &self,
1266 instrument_id: InstrumentId,
1267 ) -> anyhow::Result<Option<InstrumentAny>> {
1268 let response = self
1269 .inner
1270 .get_instrument(instrument_id.symbol.as_str())
1271 .await?;
1272
1273 let instrument = match response {
1274 Some(instrument) => instrument,
1275 None => return Ok(None),
1276 };
1277
1278 let ts_init = self.generate_ts_init();
1279
1280 match parse_instrument_any(&instrument, ts_init) {
1281 InstrumentParseResult::Ok(inst) => Ok(Some(*inst)),
1282 InstrumentParseResult::Unsupported {
1283 symbol,
1284 instrument_type,
1285 } => {
1286 log::debug!(
1287 "Instrument {symbol} has unsupported type {instrument_type:?}, returning None"
1288 );
1289 Ok(None)
1290 }
1291 InstrumentParseResult::Inactive { symbol, state } => {
1292 log::debug!("Instrument {symbol} is inactive (state={state}), returning None");
1293 Ok(None)
1294 }
1295 InstrumentParseResult::Failed {
1296 symbol,
1297 instrument_type,
1298 error,
1299 } => {
1300 log::error!(
1301 "Failed to parse instrument {symbol} (type={instrument_type:?}): {error}"
1302 );
1303 Ok(None)
1304 }
1305 }
1306 }
1307
1308 pub async fn request_instruments(
1314 &self,
1315 active_only: bool,
1316 ) -> anyhow::Result<Vec<InstrumentAny>> {
1317 let instruments = self.inner.get_instruments(active_only).await?;
1318 let ts_init = self.generate_ts_init();
1319
1320 let mut parsed_instruments = Vec::new();
1321 let mut skipped_count = 0;
1322 let mut inactive_count = 0;
1323 let mut failed_count = 0;
1324 let total_count = instruments.len();
1325
1326 for inst in instruments {
1327 match parse_instrument_any(&inst, ts_init) {
1328 InstrumentParseResult::Ok(instrument_any) => {
1329 parsed_instruments.push(*instrument_any);
1330 }
1331 InstrumentParseResult::Unsupported {
1332 symbol,
1333 instrument_type,
1334 } => {
1335 skipped_count += 1;
1336 log::debug!(
1337 "Skipping unsupported instrument type: symbol={symbol}, type={instrument_type:?}"
1338 );
1339 }
1340 InstrumentParseResult::Inactive { symbol, state } => {
1341 inactive_count += 1;
1342 log::debug!("Skipping inactive instrument: symbol={symbol}, state={state}");
1343 }
1344 InstrumentParseResult::Failed {
1345 symbol,
1346 instrument_type,
1347 error,
1348 } => {
1349 failed_count += 1;
1350 log::error!(
1351 "Failed to parse instrument: symbol={symbol}, type={instrument_type:?}, error={error}"
1352 );
1353 }
1354 }
1355 }
1356
1357 if skipped_count > 0 {
1358 log::info!(
1359 "Skipped {skipped_count} unsupported instrument type(s) out of {total_count} total"
1360 );
1361 }
1362
1363 if inactive_count > 0 {
1364 log::info!(
1365 "Skipped {inactive_count} inactive instrument(s) out of {total_count} total"
1366 );
1367 }
1368
1369 if failed_count > 0 {
1370 log::error!(
1371 "Instrument parse failures: {failed_count} failed out of {total_count} total ({} successfully parsed)",
1372 parsed_instruments.len()
1373 );
1374 }
1375
1376 Ok(parsed_instruments)
1377 }
1378
1379 pub async fn get_wallet(&self) -> Result<BitmexWallet, BitmexHttpError> {
1385 let inner = self.inner.clone();
1386 inner.get_wallet().await
1387 }
1388
1389 pub async fn get_orders(
1395 &self,
1396 params: GetOrderParams,
1397 ) -> Result<Vec<BitmexOrder>, BitmexHttpError> {
1398 let inner = self.inner.clone();
1399 inner.get_orders(params).await
1400 }
1401
1402 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
1408 self.get_instrument(&symbol).ok_or_else(|| {
1409 anyhow::anyhow!(
1410 "Instrument {symbol} not found in cache, ensure instruments loaded first"
1411 )
1412 })
1413 }
1414
1415 pub fn get_price_precision(&self, symbol: Ustr) -> anyhow::Result<u8> {
1422 self.instrument_from_cache(symbol)
1423 .map(|instrument| instrument.price_precision())
1424 }
1425
1426 pub async fn get_margin(&self, currency: &str) -> anyhow::Result<BitmexMargin> {
1432 self.inner
1433 .get_margin(currency)
1434 .await
1435 .map_err(|e| anyhow::anyhow!(e))
1436 }
1437
1438 pub async fn get_all_margins(&self) -> anyhow::Result<Vec<BitmexMargin>> {
1444 self.inner
1445 .get_all_margins()
1446 .await
1447 .map_err(|e| anyhow::anyhow!(e))
1448 }
1449
1450 pub async fn request_account_state(
1456 &self,
1457 account_id: AccountId,
1458 ) -> anyhow::Result<AccountState> {
1459 let margins = self
1460 .inner
1461 .get_all_margins()
1462 .await
1463 .map_err(|e| anyhow::anyhow!(e))?;
1464
1465 let ts_init =
1466 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default() as u64);
1467
1468 let mut balances = Vec::with_capacity(margins.len());
1469 let mut margins_vec = Vec::new();
1470 let mut latest_timestamp: Option<chrono::DateTime<chrono::Utc>> = None;
1471
1472 for margin in margins {
1473 if let Some(ts) = margin.timestamp {
1474 latest_timestamp = Some(latest_timestamp.map_or(ts, |prev| prev.max(ts)));
1475 }
1476
1477 let margin_msg = BitmexMarginMsg {
1478 account: margin.account,
1479 currency: margin.currency,
1480 risk_limit: margin.risk_limit,
1481 amount: margin.amount,
1482 prev_realised_pnl: margin.prev_realised_pnl,
1483 gross_comm: margin.gross_comm,
1484 gross_open_cost: margin.gross_open_cost,
1485 gross_open_premium: margin.gross_open_premium,
1486 gross_exec_cost: margin.gross_exec_cost,
1487 gross_mark_value: margin.gross_mark_value,
1488 risk_value: margin.risk_value,
1489 init_margin: margin.init_margin,
1490 maint_margin: margin.maint_margin,
1491 target_excess_margin: margin.target_excess_margin,
1492 realised_pnl: margin.realised_pnl,
1493 unrealised_pnl: margin.unrealised_pnl,
1494 wallet_balance: margin.wallet_balance,
1495 margin_balance: margin.margin_balance,
1496 margin_leverage: margin.margin_leverage,
1497 margin_used_pcnt: margin.margin_used_pcnt,
1498 excess_margin: margin.excess_margin,
1499 available_margin: margin.available_margin,
1500 withdrawable_margin: margin.withdrawable_margin,
1501 maker_fee_discount: None,
1502 taker_fee_discount: None,
1503 timestamp: margin.timestamp.unwrap_or_else(chrono::Utc::now),
1504 foreign_margin_balance: None,
1505 foreign_requirement: None,
1506 };
1507
1508 let balance = parse_account_balance(&margin_msg);
1509
1510 let divisor = bitmex_currency_divisor(margin_msg.currency.as_str());
1511 let initial_dec = Decimal::from(margin_msg.init_margin.unwrap_or(0).max(0)) / divisor;
1512 let maintenance_dec =
1513 Decimal::from(margin_msg.maint_margin.unwrap_or(0).max(0)) / divisor;
1514
1515 if !initial_dec.is_zero() || !maintenance_dec.is_zero() {
1516 let currency = balance.total.currency;
1517 margins_vec.push(MarginBalance::new(
1519 Money::from_decimal(initial_dec, currency)
1520 .unwrap_or_else(|_| Money::zero(currency)),
1521 Money::from_decimal(maintenance_dec, currency)
1522 .unwrap_or_else(|_| Money::zero(currency)),
1523 None,
1524 ));
1525 }
1526
1527 balances.push(balance);
1528 }
1529
1530 if balances.is_empty() {
1531 anyhow::bail!("No margin data returned from BitMEX");
1532 }
1533
1534 let account_type = AccountType::Margin;
1535 let is_reported = true;
1536 let event_id = UUID4::new();
1537
1538 let ts_event = latest_timestamp.map_or(ts_init, |ts| {
1540 UnixNanos::from(ts.timestamp_nanos_opt().unwrap_or_default() as u64)
1541 });
1542
1543 Ok(AccountState::new(
1544 account_id,
1545 account_type,
1546 balances,
1547 margins_vec,
1548 is_reported,
1549 event_id,
1550 ts_event,
1551 ts_init,
1552 None,
1553 ))
1554 }
1555
1556 #[expect(clippy::too_many_arguments)]
1563 pub async fn submit_order(
1564 &self,
1565 instrument_id: InstrumentId,
1566 client_order_id: ClientOrderId,
1567 order_side: OrderSide,
1568 order_type: OrderType,
1569 quantity: Quantity,
1570 time_in_force: TimeInForce,
1571 price: Option<Price>,
1572 trigger_price: Option<Price>,
1573 trigger_type: Option<TriggerType>,
1574 trailing_offset: Option<f64>,
1575 trailing_offset_type: Option<TrailingOffsetType>,
1576 display_qty: Option<Quantity>,
1577 post_only: bool,
1578 reduce_only: bool,
1579 order_list_id: Option<OrderListId>,
1580 contingency_type: Option<ContingencyType>,
1581 peg_price_type: Option<BitmexPegPriceType>,
1582 peg_offset_value: Option<f64>,
1583 ) -> anyhow::Result<OrderStatusReport> {
1584 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1585
1586 let mut params = super::query::PostOrderParamsBuilder::default();
1587 params.text(NAUTILUS_TRADER);
1588 params.symbol(instrument_id.symbol.as_str());
1589 params.cl_ord_id(client_order_id.as_str());
1590
1591 if order_side == OrderSide::NoOrderSide {
1592 anyhow::bail!("Order side must be Buy or Sell");
1593 }
1594 let side = BitmexSide::from(order_side.as_specified());
1595 params.side(side);
1596
1597 let ord_type = BitmexOrderType::try_from_order_type(order_type)?;
1598 params.ord_type(ord_type);
1599
1600 params.order_qty(quantity_to_u32(&quantity, &instrument));
1601
1602 let tif = BitmexTimeInForce::try_from_time_in_force(time_in_force)?;
1603 params.time_in_force(tif);
1604
1605 if let Some(price) = price {
1606 params.price(price.as_f64());
1607 }
1608
1609 if let Some(trigger_price) = trigger_price {
1610 params.stop_px(trigger_price.as_f64());
1611 }
1612
1613 if let Some(display_qty) = display_qty {
1614 params.display_qty(quantity_to_u32(&display_qty, &instrument));
1615 }
1616
1617 if let Some(order_list_id) = order_list_id {
1618 params.cl_ord_link_id(order_list_id.as_str());
1619 }
1620
1621 let is_trailing_stop = matches!(
1622 order_type,
1623 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
1624 );
1625
1626 if is_trailing_stop && let Some(offset) = trailing_offset {
1627 if let Some(offset_type) = trailing_offset_type
1628 && offset_type != TrailingOffsetType::Price
1629 {
1630 anyhow::bail!(
1631 "BitMEX only supports PRICE trailing offset type, was {offset_type:?}"
1632 );
1633 }
1634
1635 params.peg_price_type(BitmexPegPriceType::TrailingStopPeg);
1636
1637 let signed_offset = match order_side {
1639 OrderSide::Sell => -offset.abs(),
1640 OrderSide::Buy => offset.abs(),
1641 _ => offset,
1642 };
1643 params.peg_offset_value(signed_offset);
1644 }
1645
1646 if peg_price_type.is_none() && peg_offset_value.is_some() {
1648 anyhow::bail!("`peg_offset_value` requires `peg_price_type`");
1649 }
1650
1651 if let Some(peg_type) = peg_price_type {
1652 if order_type != OrderType::Limit {
1653 anyhow::bail!(
1654 "Pegged orders only supported for LIMIT order type, was {order_type:?}"
1655 );
1656 }
1657 params.ord_type(BitmexOrderType::Pegged);
1658 params.peg_price_type(peg_type);
1659
1660 if let Some(offset) = peg_offset_value {
1661 params.peg_offset_value(offset);
1662 }
1663 }
1664
1665 let mut exec_inst = Vec::new();
1666
1667 if post_only {
1668 exec_inst.push(BitmexExecInstruction::ParticipateDoNotInitiate);
1669 }
1670
1671 if reduce_only {
1672 exec_inst.push(BitmexExecInstruction::ReduceOnly);
1673 }
1674
1675 if (trigger_price.is_some() || is_trailing_stop)
1677 && let Some(trigger_type) = trigger_type
1678 {
1679 match trigger_type {
1680 TriggerType::LastPrice => exec_inst.push(BitmexExecInstruction::LastPrice),
1681 TriggerType::MarkPrice => exec_inst.push(BitmexExecInstruction::MarkPrice),
1682 TriggerType::IndexPrice => exec_inst.push(BitmexExecInstruction::IndexPrice),
1683 _ => {} }
1685 }
1686
1687 if !exec_inst.is_empty() {
1688 params.exec_inst(exec_inst);
1689 }
1690
1691 if let Some(contingency_type) = contingency_type {
1692 let bitmex_contingency = BitmexContingencyType::try_from(contingency_type)?;
1693 params.contingency_type(bitmex_contingency);
1694 }
1695
1696 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1697
1698 let response = self.inner.place_order(params).await?;
1699
1700 let order: BitmexOrder = serde_json::from_value(response)?;
1701
1702 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1703 let reason = order
1704 .ord_rej_reason
1705 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1706 anyhow::bail!("Order rejected: {reason}");
1707 }
1708
1709 self.order_type_cache.insert(client_order_id, order_type);
1711
1712 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1713 let ts_init = self.generate_ts_init();
1714
1715 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1716 }
1717
1718 pub async fn cancel_order(
1728 &self,
1729 instrument_id: InstrumentId,
1730 client_order_id: Option<ClientOrderId>,
1731 venue_order_id: Option<VenueOrderId>,
1732 ) -> anyhow::Result<OrderStatusReport> {
1733 let mut params = super::query::DeleteOrderParamsBuilder::default();
1734 params.text(NAUTILUS_TRADER);
1735
1736 if let Some(venue_order_id) = venue_order_id {
1737 params.order_id(vec![venue_order_id.as_str().to_string()]);
1738 } else if let Some(client_order_id) = client_order_id {
1739 params.cl_ord_id(vec![client_order_id.as_str().to_string()]);
1740 } else {
1741 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1742 }
1743
1744 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1745
1746 let response = self.inner.cancel_orders(params).await?;
1747
1748 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1749 let order = orders
1750 .into_iter()
1751 .next()
1752 .ok_or_else(|| anyhow::anyhow!("No order returned in cancel response"))?;
1753
1754 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1755 let ts_init = self.generate_ts_init();
1756
1757 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1758 }
1759
1760 pub async fn cancel_orders(
1770 &self,
1771 instrument_id: InstrumentId,
1772 client_order_ids: Option<Vec<ClientOrderId>>,
1773 venue_order_ids: Option<Vec<VenueOrderId>>,
1774 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1775 let mut params = super::query::DeleteOrderParamsBuilder::default();
1776 params.text(NAUTILUS_TRADER);
1777
1778 if let Some(venue_order_ids) = venue_order_ids {
1781 if venue_order_ids.is_empty() {
1782 anyhow::bail!("venue_order_ids cannot be empty");
1783 }
1784 params.order_id(
1785 venue_order_ids
1786 .iter()
1787 .map(|id| id.to_string())
1788 .collect::<Vec<_>>(),
1789 );
1790 } else if let Some(client_order_ids) = client_order_ids {
1791 if client_order_ids.is_empty() {
1792 anyhow::bail!("client_order_ids cannot be empty");
1793 }
1794 params.cl_ord_id(
1795 client_order_ids
1796 .iter()
1797 .map(|id| id.to_string())
1798 .collect::<Vec<_>>(),
1799 );
1800 } else {
1801 anyhow::bail!("Either client_order_ids or venue_order_ids must be provided");
1802 }
1803
1804 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1805
1806 let response = self.inner.cancel_orders(params).await?;
1807
1808 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1809
1810 let ts_init = self.generate_ts_init();
1811 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1812
1813 let mut reports = Vec::new();
1814
1815 for order in orders {
1816 reports.push(parse_order_status_report(
1817 &order,
1818 &instrument,
1819 &self.order_type_cache,
1820 ts_init,
1821 )?);
1822 }
1823
1824 Self::populate_linked_order_ids(&mut reports);
1825
1826 Ok(reports)
1827 }
1828
1829 pub async fn cancel_all_orders(
1839 &self,
1840 instrument_id: InstrumentId,
1841 order_side: Option<OrderSide>,
1842 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1843 let mut params = DeleteAllOrdersParamsBuilder::default();
1844 params.text(NAUTILUS_TRADER);
1845 params.symbol(instrument_id.symbol.as_str());
1846
1847 if let Some(side) = order_side {
1848 if side == OrderSide::NoOrderSide {
1849 log::debug!("Ignoring NoOrderSide filter for cancel_all_orders on {instrument_id}",);
1850 } else {
1851 let side = BitmexSide::from(side.as_specified());
1852 params.filter(serde_json::json!({
1853 "side": side
1854 }));
1855 }
1856 }
1857
1858 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1859
1860 let response = self.inner.cancel_all_orders(params).await?;
1861
1862 let orders: Vec<BitmexOrder> = serde_json::from_value(response)?;
1863
1864 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1865 let ts_init = self.generate_ts_init();
1866
1867 let mut reports = Vec::new();
1868
1869 for order in orders {
1870 reports.push(parse_order_status_report(
1871 &order,
1872 &instrument,
1873 &self.order_type_cache,
1874 ts_init,
1875 )?);
1876 }
1877
1878 Self::populate_linked_order_ids(&mut reports);
1879
1880 Ok(reports)
1881 }
1882
1883 pub async fn modify_order(
1894 &self,
1895 instrument_id: InstrumentId,
1896 client_order_id: Option<ClientOrderId>,
1897 venue_order_id: Option<VenueOrderId>,
1898 quantity: Option<Quantity>,
1899 price: Option<Price>,
1900 trigger_price: Option<Price>,
1901 ) -> anyhow::Result<OrderStatusReport> {
1902 let mut params = PutOrderParamsBuilder::default();
1903 params.text(NAUTILUS_TRADER);
1904
1905 if let Some(venue_order_id) = venue_order_id {
1907 params.order_id(venue_order_id.as_str());
1908 } else if let Some(client_order_id) = client_order_id {
1909 params.orig_cl_ord_id(client_order_id.as_str());
1910 } else {
1911 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1912 }
1913
1914 if let Some(quantity) = quantity {
1915 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1916 params.order_qty(quantity_to_u32(&quantity, &instrument));
1917 }
1918
1919 if let Some(price) = price {
1920 params.price(price.as_f64());
1921 }
1922
1923 if let Some(trigger_price) = trigger_price {
1924 params.stop_px(trigger_price.as_f64());
1925 }
1926
1927 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1928
1929 let response = self.inner.amend_order(params).await?;
1930
1931 let order: BitmexOrder = serde_json::from_value(response)?;
1932
1933 if order.ord_status == Some(BitmexOrderStatus::Rejected) {
1934 let reason = order
1935 .ord_rej_reason
1936 .map_or_else(|| "No reason provided".to_string(), |r| r.to_string());
1937 anyhow::bail!("Order modification rejected: {reason}");
1938 }
1939
1940 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1941 let ts_init = self.generate_ts_init();
1942
1943 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
1944 }
1945
1946 pub async fn query_order(
1955 &self,
1956 instrument_id: InstrumentId,
1957 client_order_id: Option<ClientOrderId>,
1958 venue_order_id: Option<VenueOrderId>,
1959 ) -> anyhow::Result<Option<OrderStatusReport>> {
1960 let mut params = GetOrderParamsBuilder::default();
1961
1962 let filter_json = if let Some(client_order_id) = client_order_id {
1963 serde_json::json!({
1964 "clOrdID": client_order_id.to_string()
1965 })
1966 } else if let Some(venue_order_id) = venue_order_id {
1967 serde_json::json!({
1968 "orderID": venue_order_id.to_string()
1969 })
1970 } else {
1971 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1972 };
1973
1974 params.filter(filter_json);
1975 params.count(1); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1978
1979 let response = self.inner.get_orders(params).await?;
1980
1981 if response.is_empty() {
1982 return Ok(None);
1983 }
1984
1985 let order = &response[0];
1986
1987 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
1988 let ts_init = self.generate_ts_init();
1989
1990 let report =
1991 parse_order_status_report(order, &instrument, &self.order_type_cache, ts_init)?;
1992
1993 Ok(Some(report))
1994 }
1995
1996 pub async fn request_order_status_report(
2005 &self,
2006 instrument_id: InstrumentId,
2007 client_order_id: Option<ClientOrderId>,
2008 venue_order_id: Option<VenueOrderId>,
2009 ) -> anyhow::Result<OrderStatusReport> {
2010 if venue_order_id.is_none() && client_order_id.is_none() {
2011 anyhow::bail!("Either venue_order_id or client_order_id must be provided");
2012 }
2013
2014 let mut params = GetOrderParamsBuilder::default();
2015 params.symbol(instrument_id.symbol.as_str());
2016
2017 if let Some(venue_order_id) = venue_order_id {
2018 params.filter(serde_json::json!({
2019 "orderID": venue_order_id.as_str()
2020 }));
2021 } else if let Some(client_order_id) = client_order_id {
2022 params.filter(serde_json::json!({
2023 "clOrdID": client_order_id.as_str()
2024 }));
2025 }
2026
2027 params.count(1i32);
2028 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2029
2030 let response = self.inner.get_orders(params).await?;
2031
2032 let order = response
2033 .into_iter()
2034 .next()
2035 .ok_or_else(|| anyhow::anyhow!("Order not found"))?;
2036
2037 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2038 let ts_init = self.generate_ts_init();
2039
2040 parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init)
2041 }
2042
2043 pub async fn request_order_status_reports(
2052 &self,
2053 instrument_id: Option<InstrumentId>,
2054 open_only: bool,
2055 start: Option<DateTime<Utc>>,
2056 end: Option<DateTime<Utc>>,
2057 limit: Option<u32>,
2058 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2059 if let (Some(start), Some(end)) = (start, end) {
2060 anyhow::ensure!(
2061 start < end,
2062 "Invalid time range: start={start:?} end={end:?}",
2063 );
2064 }
2065
2066 let mut params = GetOrderParamsBuilder::default();
2067
2068 if let Some(instrument_id) = &instrument_id {
2069 params.symbol(instrument_id.symbol.as_str());
2070 }
2071
2072 if open_only {
2073 params.filter(serde_json::json!({
2074 "open": true
2075 }));
2076 }
2077
2078 if let Some(start) = start {
2079 params.start_time(start);
2080 }
2081
2082 if let Some(end) = end {
2083 params.end_time(end);
2084 }
2085
2086 if let Some(limit) = limit {
2087 params.count(limit as i32);
2088 } else {
2089 params.count(500); }
2091
2092 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2095
2096 let response = self.inner.get_orders(params).await?;
2097
2098 let ts_init = self.generate_ts_init();
2099
2100 let mut reports = Vec::new();
2101
2102 for order in response {
2103 if let Some(start) = start {
2104 match order.timestamp {
2105 Some(timestamp) if timestamp < start => continue,
2106 Some(_) => {}
2107 None => {
2108 log::debug!("Skipping order report without timestamp for bounded query");
2109 continue;
2110 }
2111 }
2112 }
2113
2114 if let Some(end) = end {
2115 match order.timestamp {
2116 Some(timestamp) if timestamp > end => continue,
2117 Some(_) => {}
2118 None => {
2119 log::debug!("Skipping order report without timestamp for bounded query");
2120 continue;
2121 }
2122 }
2123 }
2124
2125 let Some(symbol) = order.symbol else {
2127 log::warn!("Order response missing symbol, skipping");
2128 continue;
2129 };
2130
2131 let Ok(instrument) = self.instrument_from_cache(symbol) else {
2132 log::debug!("Skipping order report for instrument not in cache: symbol={symbol}");
2133 continue;
2134 };
2135
2136 match parse_order_status_report(&order, &instrument, &self.order_type_cache, ts_init) {
2137 Ok(report) => reports.push(report),
2138 Err(e) => log::error!("Failed to parse order status report: {e}"),
2139 }
2140 }
2141
2142 Self::populate_linked_order_ids(&mut reports);
2143
2144 Ok(reports)
2145 }
2146
2147 pub async fn request_trades(
2153 &self,
2154 instrument_id: InstrumentId,
2155 start: Option<DateTime<Utc>>,
2156 end: Option<DateTime<Utc>>,
2157 limit: Option<u32>,
2158 ) -> anyhow::Result<Vec<TradeTick>> {
2159 let mut params = GetTradeParamsBuilder::default();
2160 params.symbol(instrument_id.symbol.as_str());
2161
2162 if let Some(start) = start {
2163 params.start_time(start);
2164 }
2165
2166 if let Some(end) = end {
2167 params.end_time(end);
2168 }
2169
2170 if let (Some(start), Some(end)) = (start, end) {
2171 anyhow::ensure!(
2172 start < end,
2173 "Invalid time range: start={start:?} end={end:?}",
2174 );
2175 }
2176
2177 if let Some(limit) = limit {
2178 let clamped_limit = limit.min(1000);
2179 if limit > 1000 {
2180 log::warn!(
2181 "BitMEX trade request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2182 );
2183 }
2184 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2185 }
2186 params.reverse(false);
2187 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2188
2189 let response = self.inner.get_trades(params).await?;
2190
2191 let ts_init = self.generate_ts_init();
2192
2193 let mut parsed_trades = Vec::new();
2194
2195 for trade in response {
2196 if let Some(start) = start
2197 && trade.timestamp < start
2198 {
2199 continue;
2200 }
2201
2202 if let Some(end) = end
2203 && trade.timestamp > end
2204 {
2205 continue;
2206 }
2207
2208 let Some(instrument) = self.get_instrument(&trade.symbol) else {
2209 log::error!(
2210 "Instrument {} not found in cache, skipping trade",
2211 trade.symbol
2212 );
2213 continue;
2214 };
2215
2216 match parse_trade(&trade, &instrument, ts_init) {
2217 Ok(trade) => parsed_trades.push(trade),
2218 Err(e) => log::error!("Failed to parse trade: {e}"),
2219 }
2220 }
2221
2222 Ok(parsed_trades)
2223 }
2224
2225 pub async fn request_bars(
2232 &self,
2233 mut bar_type: BarType,
2234 start: Option<DateTime<Utc>>,
2235 end: Option<DateTime<Utc>>,
2236 limit: Option<u32>,
2237 partial: bool,
2238 ) -> anyhow::Result<Vec<Bar>> {
2239 bar_type = bar_type.standard();
2240
2241 anyhow::ensure!(
2242 bar_type.aggregation_source() == AggregationSource::External,
2243 "Only EXTERNAL aggregation bars are supported"
2244 );
2245 anyhow::ensure!(
2246 bar_type.spec().price_type == PriceType::Last,
2247 "Only LAST price type bars are supported"
2248 );
2249
2250 if let (Some(start), Some(end)) = (start, end) {
2251 anyhow::ensure!(
2252 start < end,
2253 "Invalid time range: start={start:?} end={end:?}"
2254 );
2255 }
2256
2257 let spec = bar_type.spec();
2258 let bin_size = match (spec.aggregation, spec.step.get()) {
2259 (BarAggregation::Minute, 1) => "1m",
2260 (BarAggregation::Minute, 5) => "5m",
2261 (BarAggregation::Hour, 1) => "1h",
2262 (BarAggregation::Day, 1) => "1d",
2263 _ => anyhow::bail!(
2264 "BitMEX does not support {}-{:?}-{:?} bars",
2265 spec.step.get(),
2266 spec.aggregation,
2267 spec.price_type,
2268 ),
2269 };
2270
2271 let instrument_id = bar_type.instrument_id();
2272 let instrument = self.instrument_from_cache(instrument_id.symbol.inner())?;
2273
2274 let mut params = GetTradeBucketedParamsBuilder::default();
2275 params.symbol(instrument_id.symbol.as_str());
2276 params.bin_size(bin_size);
2277
2278 if partial {
2279 params.partial(true);
2280 }
2281
2282 if let Some(start) = start {
2283 params.start_time(start);
2284 }
2285
2286 if let Some(end) = end {
2287 params.end_time(end);
2288 }
2289
2290 if let Some(limit) = limit {
2291 let clamped_limit = limit.min(1000);
2292 if limit > 1000 {
2293 log::warn!(
2294 "BitMEX bar request limit exceeds venue maximum; clamping: limit={limit}, clamped_limit={clamped_limit}",
2295 );
2296 }
2297 params.count(i32::try_from(clamped_limit).unwrap_or(1000));
2298 }
2299 params.reverse(false);
2300 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2301
2302 let response = self.inner.get_trade_bucketed(params).await?;
2303 let ts_init = self.generate_ts_init();
2304 let mut bars = Vec::new();
2305
2306 for bin in response {
2307 if let Some(start) = start
2308 && bin.timestamp < start
2309 {
2310 continue;
2311 }
2312
2313 if let Some(end) = end
2314 && bin.timestamp > end
2315 {
2316 continue;
2317 }
2318
2319 if bin.symbol != instrument_id.symbol.inner() {
2320 log::warn!(
2321 "Skipping trade bin for unexpected symbol: symbol={}, expected={}",
2322 bin.symbol,
2323 instrument_id.symbol,
2324 );
2325 continue;
2326 }
2327
2328 match parse_trade_bin(&bin, &instrument, &bar_type, ts_init) {
2329 Ok(bar) => bars.push(bar),
2330 Err(e) => log::warn!("Failed to parse trade bin: {e}"),
2331 }
2332 }
2333
2334 Ok(bars)
2335 }
2336
2337 pub async fn request_fill_reports(
2343 &self,
2344 instrument_id: Option<InstrumentId>,
2345 start: Option<DateTime<Utc>>,
2346 end: Option<DateTime<Utc>>,
2347 limit: Option<u32>,
2348 ) -> anyhow::Result<Vec<FillReport>> {
2349 if let (Some(start), Some(end)) = (start, end) {
2350 anyhow::ensure!(
2351 start < end,
2352 "Invalid time range: start={start:?} end={end:?}",
2353 );
2354 }
2355
2356 let mut params = GetExecutionParamsBuilder::default();
2357
2358 if let Some(instrument_id) = instrument_id {
2359 params.symbol(instrument_id.symbol.as_str());
2360 }
2361
2362 if let Some(start) = start {
2363 params.start_time(start);
2364 }
2365
2366 if let Some(end) = end {
2367 params.end_time(end);
2368 }
2369
2370 if let Some(limit) = limit {
2371 params.count(limit as i32);
2372 } else {
2373 params.count(500); }
2375 params.reverse(true); let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
2378
2379 let response = self.inner.get_executions(params).await?;
2380
2381 let ts_init = self.generate_ts_init();
2382
2383 let mut reports = Vec::new();
2384
2385 for exec in response {
2386 if let Some(start) = start {
2387 match exec.transact_time {
2388 Some(timestamp) if timestamp < start => continue,
2389 Some(_) => {}
2390 None => {
2391 log::debug!("Skipping fill report without transact_time for bounded query");
2392 continue;
2393 }
2394 }
2395 }
2396
2397 if let Some(end) = end {
2398 match exec.transact_time {
2399 Some(timestamp) if timestamp > end => continue,
2400 Some(_) => {}
2401 None => {
2402 log::debug!("Skipping fill report without transact_time for bounded query");
2403 continue;
2404 }
2405 }
2406 }
2407
2408 let Some(symbol) = exec.symbol else {
2410 log::debug!("Skipping execution without symbol: {:?}", exec.exec_type);
2411 continue;
2412 };
2413 let symbol_str = symbol.to_string();
2414
2415 let instrument = match self.instrument_from_cache(symbol) {
2416 Ok(instrument) => instrument,
2417 Err(e) => {
2418 log::error!(
2419 "Instrument not found in cache for execution parsing: symbol={symbol_str}, {e}"
2420 );
2421 continue;
2422 }
2423 };
2424
2425 match parse_fill_report(&exec, &instrument, ts_init) {
2426 Ok(report) => reports.push(report),
2427 Err(e) => {
2428 let error_msg = e.to_string();
2430 if error_msg.starts_with("Skipping non-trade execution")
2431 || error_msg.starts_with("Skipping execution without order_id")
2432 {
2433 log::debug!("{e}");
2434 } else {
2435 log::error!("Failed to parse fill report: {e}");
2436 }
2437 }
2438 }
2439 }
2440
2441 Ok(reports)
2442 }
2443
2444 pub async fn request_position_status_reports(
2450 &self,
2451 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2452 let params = GetPositionParamsBuilder::default()
2453 .count(500) .build()
2455 .map_err(|e| anyhow::anyhow!(e))?;
2456
2457 let response = self.inner.get_positions(params).await?;
2458
2459 let ts_init = self.generate_ts_init();
2460
2461 let mut reports = Vec::new();
2462
2463 for pos in response {
2464 let symbol = pos.symbol;
2465 let instrument = match self.instrument_from_cache(symbol) {
2466 Ok(instrument) => instrument,
2467 Err(e) => {
2468 log::error!(
2469 "Instrument not found in cache for position parsing: symbol={}, {e}",
2470 pos.symbol.as_str(),
2471 );
2472 continue;
2473 }
2474 };
2475
2476 match parse_position_report(&pos, &instrument, ts_init) {
2477 Ok(report) => reports.push(report),
2478 Err(e) => log::error!("Failed to parse position report: {e}"),
2479 }
2480 }
2481
2482 Ok(reports)
2483 }
2484
2485 pub async fn update_position_leverage(
2493 &self,
2494 symbol: &str,
2495 leverage: f64,
2496 ) -> anyhow::Result<PositionStatusReport> {
2497 let params = PostPositionLeverageParams {
2498 symbol: symbol.to_string(),
2499 leverage,
2500 target_account_id: None,
2501 };
2502
2503 let response = self.inner.update_position_leverage(params).await?;
2504
2505 let instrument = self.instrument_from_cache(Ustr::from(symbol))?;
2506 let ts_init = self.generate_ts_init();
2507
2508 parse_position_report(&response, &instrument, ts_init)
2509 }
2510}
2511
2512#[cfg(test)]
2513mod tests {
2514 use nautilus_core::UUID4;
2515 use nautilus_model::enums::OrderStatus;
2516 use rstest::rstest;
2517 use serde_json::json;
2518
2519 use super::*;
2520
2521 fn build_report(
2522 client_order_id: &str,
2523 venue_order_id: &str,
2524 contingency_type: ContingencyType,
2525 order_list_id: Option<&str>,
2526 ) -> OrderStatusReport {
2527 let mut report = OrderStatusReport::new(
2528 AccountId::from("BITMEX-1"),
2529 InstrumentId::from("XBTUSD.BITMEX"),
2530 Some(ClientOrderId::from(client_order_id)),
2531 VenueOrderId::from(venue_order_id),
2532 OrderSide::Buy,
2533 OrderType::Limit,
2534 TimeInForce::Gtc,
2535 OrderStatus::Accepted,
2536 Quantity::new(100.0, 0),
2537 Quantity::default(),
2538 UnixNanos::from(1_u64),
2539 UnixNanos::from(1_u64),
2540 UnixNanos::from(1_u64),
2541 Some(UUID4::new()),
2542 );
2543
2544 if let Some(id) = order_list_id {
2545 report = report.with_order_list_id(OrderListId::from(id));
2546 }
2547
2548 report.with_contingency_type(contingency_type)
2549 }
2550
2551 #[rstest]
2552 fn test_sign_request_generates_correct_headers() {
2553 let client = BitmexRawHttpClient::with_credentials(
2554 "test_api_key".to_string(),
2555 "test_api_secret".to_string(),
2556 "http://localhost:8080".to_string(),
2557 60,
2558 3,
2559 1_000,
2560 10_000,
2561 10_000,
2562 10,
2563 120,
2564 None,
2565 )
2566 .expect("Failed to create test client");
2567
2568 let headers = client
2569 .sign_request(&Method::GET, "/api/v1/order", None)
2570 .unwrap();
2571
2572 assert!(headers.contains_key("api-key"));
2573 assert!(headers.contains_key("api-signature"));
2574 assert!(headers.contains_key("api-expires"));
2575 assert_eq!(headers.get("api-key").unwrap(), "test_api_key");
2576 }
2577
2578 #[rstest]
2579 fn test_sign_request_with_body() {
2580 let client = BitmexRawHttpClient::with_credentials(
2581 "test_api_key".to_string(),
2582 "test_api_secret".to_string(),
2583 "http://localhost:8080".to_string(),
2584 60,
2585 3,
2586 1_000,
2587 10_000,
2588 10_000,
2589 10,
2590 120,
2591 None,
2592 )
2593 .expect("Failed to create test client");
2594
2595 let body = json!({"symbol": "XBTUSD", "orderQty": 100});
2596 let body_bytes = serde_json::to_vec(&body).unwrap();
2597
2598 let headers_without_body = client
2599 .sign_request(&Method::POST, "/api/v1/order", None)
2600 .unwrap();
2601 let headers_with_body = client
2602 .sign_request(&Method::POST, "/api/v1/order", Some(&body_bytes))
2603 .unwrap();
2604
2605 assert_ne!(
2607 headers_without_body.get("api-signature").unwrap(),
2608 headers_with_body.get("api-signature").unwrap()
2609 );
2610 }
2611
2612 #[rstest]
2613 fn test_sign_request_uses_custom_recv_window() {
2614 let client_default = BitmexRawHttpClient::with_credentials(
2615 "test_api_key".to_string(),
2616 "test_api_secret".to_string(),
2617 "http://localhost:8080".to_string(),
2618 60,
2619 3,
2620 1_000,
2621 10_000,
2622 10_000, 10,
2624 120,
2625 None,
2626 )
2627 .expect("Failed to create test client");
2628
2629 let client_custom = BitmexRawHttpClient::with_credentials(
2630 "test_api_key".to_string(),
2631 "test_api_secret".to_string(),
2632 "http://localhost:8080".to_string(),
2633 60,
2634 3,
2635 1_000,
2636 10_000,
2637 30_000, 10,
2639 120,
2640 None,
2641 )
2642 .expect("Failed to create test client");
2643
2644 let headers_default = client_default
2645 .sign_request(&Method::GET, "/api/v1/order", None)
2646 .unwrap();
2647 let headers_custom = client_custom
2648 .sign_request(&Method::GET, "/api/v1/order", None)
2649 .unwrap();
2650
2651 let expires_default: i64 = headers_default.get("api-expires").unwrap().parse().unwrap();
2653 let expires_custom: i64 = headers_custom.get("api-expires").unwrap().parse().unwrap();
2654
2655 let now = Utc::now().timestamp();
2657 assert!(expires_default > now);
2658 assert!(expires_custom > now);
2659
2660 assert!(expires_custom > expires_default);
2662
2663 let diff = expires_custom - expires_default;
2666 assert!((18..=25).contains(&diff));
2667 }
2668
2669 #[rstest]
2670 fn test_populate_linked_order_ids_from_order_list() {
2671 let base = "O-20250922-002219-001-000";
2672 let entry = format!("{base}-1");
2673 let stop = format!("{base}-2");
2674 let take = format!("{base}-3");
2675
2676 let mut reports = vec![
2677 build_report(&entry, "V-1", ContingencyType::Oto, Some("OL-1")),
2678 build_report(&stop, "V-2", ContingencyType::Ouo, Some("OL-1")),
2679 build_report(&take, "V-3", ContingencyType::Ouo, Some("OL-1")),
2680 ];
2681
2682 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2683
2684 assert_eq!(
2685 reports[0].linked_order_ids,
2686 Some(vec![
2687 ClientOrderId::from(stop.as_str()),
2688 ClientOrderId::from(take.as_str()),
2689 ]),
2690 );
2691 assert_eq!(
2692 reports[1].linked_order_ids,
2693 Some(vec![
2694 ClientOrderId::from(entry.as_str()),
2695 ClientOrderId::from(take.as_str()),
2696 ]),
2697 );
2698 assert_eq!(
2699 reports[2].linked_order_ids,
2700 Some(vec![
2701 ClientOrderId::from(entry.as_str()),
2702 ClientOrderId::from(stop.as_str()),
2703 ]),
2704 );
2705 }
2706
2707 #[rstest]
2708 fn test_populate_linked_order_ids_from_id_prefix() {
2709 let base = "O-20250922-002220-001-000";
2710 let entry = format!("{base}-1");
2711 let stop = format!("{base}-2");
2712 let take = format!("{base}-3");
2713
2714 let mut reports = vec![
2715 build_report(&entry, "V-1", ContingencyType::Oto, None),
2716 build_report(&stop, "V-2", ContingencyType::Ouo, None),
2717 build_report(&take, "V-3", ContingencyType::Ouo, None),
2718 ];
2719
2720 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2721
2722 assert_eq!(
2723 reports[0].linked_order_ids,
2724 Some(vec![
2725 ClientOrderId::from(stop.as_str()),
2726 ClientOrderId::from(take.as_str()),
2727 ]),
2728 );
2729 assert_eq!(
2730 reports[1].linked_order_ids,
2731 Some(vec![
2732 ClientOrderId::from(entry.as_str()),
2733 ClientOrderId::from(take.as_str()),
2734 ]),
2735 );
2736 assert_eq!(
2737 reports[2].linked_order_ids,
2738 Some(vec![
2739 ClientOrderId::from(entry.as_str()),
2740 ClientOrderId::from(stop.as_str()),
2741 ]),
2742 );
2743 }
2744
2745 #[rstest]
2746 fn test_populate_linked_order_ids_respects_non_contingent_orders() {
2747 let base = "O-20250922-002221-001-000";
2748 let entry = format!("{base}-1");
2749 let passive = format!("{base}-2");
2750
2751 let mut reports = vec![
2752 build_report(&entry, "V-1", ContingencyType::NoContingency, None),
2753 build_report(&passive, "V-2", ContingencyType::Ouo, None),
2754 ];
2755
2756 BitmexHttpClient::populate_linked_order_ids(&mut reports);
2757
2758 assert!(reports[0].linked_order_ids.is_none());
2760
2761 assert!(reports[1].linked_order_ids.is_none());
2763 assert_eq!(reports[1].contingency_type, ContingencyType::NoContingency);
2764 }
2765}