1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use ahash::AHashMap;
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31 AtomicMap, AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
32 time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35 data::{Bar, BarType, BookOrder, FundingRateUpdate, TradeTick},
36 enums::{
37 AccountType, BookType, CurrencyType, MarketStatusAction, OrderSide, OrderType, TimeInForce,
38 TriggerType,
39 },
40 events::AccountState,
41 identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
42 instruments::{Instrument, InstrumentAny},
43 orderbook::OrderBook,
44 reports::{FillReport, OrderStatusReport, PositionStatusReport},
45 types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
46};
47use nautilus_network::{
48 http::{HttpClient, Method, USER_AGENT},
49 ratelimiter::quota::Quota,
50 retry::{RetryConfig, RetryManager},
51};
52use rust_decimal::{Decimal, prelude::FromPrimitive};
53use serde::de::DeserializeOwned;
54use tokio_util::sync::CancellationToken;
55use ustr::Ustr;
56
57use super::{models::*, query::*};
58use crate::{
59 common::{
60 consts::{KRAKEN_VENUE, NAUTILUS_KRAKEN_BROKER_ID},
61 credential::KrakenCredential,
62 enums::{
63 KrakenApiResult, KrakenEnvironment, KrakenFuturesOrderType, KrakenOrderSide,
64 KrakenProductType, KrakenSendStatus, KrakenTriggerSignal,
65 },
66 parse::{
67 bar_type_to_futures_resolution, parse_bar, parse_futures_fill_report,
68 parse_futures_instrument, parse_futures_order_event_status_report,
69 parse_futures_order_status_report, parse_futures_position_status_report,
70 parse_futures_public_execution, truncate_cl_ord_id,
71 },
72 urls::get_kraken_http_base_url,
73 },
74 http::{error::KrakenHttpError, models::OhlcData},
75};
76
77pub const KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;
79
80const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:futures:global";
81
82const BATCH_CANCEL_LIMIT: usize = 50;
84
85const BATCH_ORDER_LIMIT: usize = 10;
87
88pub struct KrakenFuturesRawHttpClient {
93 base_url: String,
94 client: HttpClient,
95 credential: Option<KrakenCredential>,
96 retry_manager: RetryManager<KrakenHttpError>,
97 cancellation_token: CancellationToken,
98 clock: &'static AtomicTime,
99 auth_mutex: tokio::sync::Mutex<()>,
101}
102
103impl Default for KrakenFuturesRawHttpClient {
104 fn default() -> Self {
105 Self::new(
106 KrakenEnvironment::Mainnet,
107 None,
108 60,
109 None,
110 None,
111 None,
112 None,
113 KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
114 )
115 .expect("Failed to create default KrakenFuturesRawHttpClient")
116 }
117}
118
119impl Debug for KrakenFuturesRawHttpClient {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 f.debug_struct(stringify!(KrakenFuturesRawHttpClient))
122 .field("base_url", &self.base_url)
123 .field("has_credentials", &self.credential.is_some())
124 .finish()
125 }
126}
127
128impl KrakenFuturesRawHttpClient {
129 #[expect(clippy::too_many_arguments)]
131 pub fn new(
132 environment: KrakenEnvironment,
133 base_url_override: Option<String>,
134 timeout_secs: u64,
135 max_retries: Option<u32>,
136 retry_delay_ms: Option<u64>,
137 retry_delay_max_ms: Option<u64>,
138 proxy_url: Option<String>,
139 max_requests_per_second: u32,
140 ) -> anyhow::Result<Self> {
141 let retry_config = RetryConfig {
142 max_retries: max_retries.unwrap_or(3),
143 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
144 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
145 backoff_factor: 2.0,
146 jitter_ms: 1000,
147 operation_timeout_ms: Some(60_000),
148 immediate_first: false,
149 max_elapsed_ms: Some(180_000),
150 };
151
152 let retry_manager = RetryManager::new(retry_config);
153 let base_url = base_url_override.unwrap_or_else(|| {
154 get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
155 });
156
157 Ok(Self {
158 base_url,
159 client: HttpClient::new(
160 Self::default_headers(),
161 vec![],
162 Self::rate_limiter_quotas(max_requests_per_second)?,
163 Some(Self::default_quota(max_requests_per_second)?),
164 Some(timeout_secs),
165 proxy_url,
166 )
167 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
168 credential: None,
169 retry_manager,
170 cancellation_token: CancellationToken::new(),
171 clock: get_atomic_clock_realtime(),
172 auth_mutex: tokio::sync::Mutex::new(()),
173 })
174 }
175
176 #[expect(clippy::too_many_arguments)]
178 pub fn with_credentials(
179 api_key: String,
180 api_secret: String,
181 environment: KrakenEnvironment,
182 base_url_override: Option<String>,
183 timeout_secs: u64,
184 max_retries: Option<u32>,
185 retry_delay_ms: Option<u64>,
186 retry_delay_max_ms: Option<u64>,
187 proxy_url: Option<String>,
188 max_requests_per_second: u32,
189 ) -> anyhow::Result<Self> {
190 let retry_config = RetryConfig {
191 max_retries: max_retries.unwrap_or(3),
192 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
193 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
194 backoff_factor: 2.0,
195 jitter_ms: 1000,
196 operation_timeout_ms: Some(60_000),
197 immediate_first: false,
198 max_elapsed_ms: Some(180_000),
199 };
200
201 let retry_manager = RetryManager::new(retry_config);
202 let base_url = base_url_override.unwrap_or_else(|| {
203 get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
204 });
205
206 Ok(Self {
207 base_url,
208 client: HttpClient::new(
209 Self::default_headers(),
210 vec![],
211 Self::rate_limiter_quotas(max_requests_per_second)?,
212 Some(Self::default_quota(max_requests_per_second)?),
213 Some(timeout_secs),
214 proxy_url,
215 )
216 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
217 credential: Some(KrakenCredential::new(api_key, api_secret)),
218 retry_manager,
219 cancellation_token: CancellationToken::new(),
220 clock: get_atomic_clock_realtime(),
221 auth_mutex: tokio::sync::Mutex::new(()),
222 })
223 }
224
225 fn generate_nonce(&self) -> u64 {
230 self.clock.get_time_ns().as_u64()
231 }
232
233 pub fn base_url(&self) -> &str {
235 &self.base_url
236 }
237
238 pub fn credential(&self) -> Option<&KrakenCredential> {
240 self.credential.as_ref()
241 }
242
243 pub fn cancel_all_requests(&self) {
245 self.cancellation_token.cancel();
246 }
247
248 pub fn cancellation_token(&self) -> &CancellationToken {
250 &self.cancellation_token
251 }
252
253 fn default_headers() -> HashMap<String, String> {
254 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
255 }
256
257 fn default_quota(max_requests_per_second: u32) -> anyhow::Result<Quota> {
258 let burst = NonZeroU32::new(max_requests_per_second).unwrap_or(
259 NonZeroU32::new(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"),
260 );
261 Quota::per_second(burst).ok_or_else(|| {
262 anyhow::anyhow!(
263 "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
264 )
265 })
266 }
267
268 fn rate_limiter_quotas(max_requests_per_second: u32) -> anyhow::Result<Vec<(String, Quota)>> {
269 Ok(vec![(
270 KRAKEN_GLOBAL_RATE_KEY.to_string(),
271 Self::default_quota(max_requests_per_second)?,
272 )])
273 }
274
275 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
276 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
277 let route = format!("kraken:futures:{normalized}");
278 vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
279 }
280
281 async fn send_request<T: DeserializeOwned>(
282 &self,
283 method: Method,
284 endpoint: &str,
285 url: String,
286 authenticate: bool,
287 ) -> anyhow::Result<T, KrakenHttpError> {
288 let _guard = if authenticate {
292 Some(self.auth_mutex.lock().await)
293 } else {
294 None
295 };
296
297 let endpoint = endpoint.to_string();
298 let method_clone = method.clone();
299 let url_clone = url.clone();
300 let credential = self.credential.clone();
301
302 let operation = || {
303 let url = url_clone.clone();
304 let method = method_clone.clone();
305 let endpoint = endpoint.clone();
306 let credential = credential.clone();
307
308 async move {
309 let mut headers = Self::default_headers();
310
311 if authenticate {
312 let cred = credential.as_ref().ok_or_else(|| {
313 KrakenHttpError::AuthenticationError(
314 "Missing credentials for authenticated request".to_string(),
315 )
316 })?;
317
318 let nonce = self.generate_nonce();
319
320 let signature = cred.sign_futures(&endpoint, "", nonce).map_err(|e| {
321 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
322 })?;
323
324 let base_url = &self.base_url;
325 log::debug!(
326 "Kraken Futures auth: endpoint={endpoint}, nonce={nonce}, base_url={base_url}"
327 );
328
329 headers.insert("APIKey".to_string(), cred.api_key().to_string());
330 headers.insert("Authent".to_string(), signature);
331 headers.insert("Nonce".to_string(), nonce.to_string());
332 }
333
334 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
335
336 let response = self
337 .client
338 .request(
339 method,
340 url,
341 None,
342 Some(headers),
343 None,
344 None,
345 Some(rate_limit_keys),
346 )
347 .await
348 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
349
350 let status = response.status.as_u16();
351 if status >= 400 {
352 let body = String::from_utf8_lossy(&response.body).to_string();
353 if status == 401 || status == 403 {
355 return Err(KrakenHttpError::AuthenticationError(format!(
356 "HTTP error {status}: {body}"
357 )));
358 }
359 return Err(KrakenHttpError::NetworkError(format!(
360 "HTTP error {status}: {body}"
361 )));
362 }
363
364 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
365 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
366 })?;
367
368 serde_json::from_str(&response_text).map_err(|e| {
369 KrakenHttpError::ParseError(format!(
370 "Failed to deserialize futures response: {e}"
371 ))
372 })
373 }
374 };
375
376 let should_retry =
377 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
378 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
379
380 self.retry_manager
381 .execute_with_retry_with_cancel(
382 &endpoint,
383 operation,
384 should_retry,
385 create_error,
386 &self.cancellation_token,
387 )
388 .await
389 }
390
391 async fn send_get_with_query<T: DeserializeOwned>(
396 &self,
397 endpoint: &str,
398 url: String,
399 query_string: &str,
400 ) -> anyhow::Result<T, KrakenHttpError> {
401 let _guard = self.auth_mutex.lock().await;
402
403 if self.cancellation_token.is_cancelled() {
404 return Err(KrakenHttpError::NetworkError(
405 "Request cancelled".to_string(),
406 ));
407 }
408
409 let credential = self.credential.as_ref().ok_or_else(|| {
410 KrakenHttpError::AuthenticationError("Missing credentials".to_string())
411 })?;
412
413 let nonce = self.generate_nonce();
414
415 let signature = credential
417 .sign_futures(endpoint, query_string, nonce)
418 .map_err(|e| {
419 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
420 })?;
421
422 log::debug!(
423 "Kraken Futures GET with query: endpoint={endpoint}, query={query_string}, nonce={nonce}"
424 );
425
426 let mut headers = Self::default_headers();
427 headers.insert("APIKey".to_string(), credential.api_key().to_string());
428 headers.insert("Authent".to_string(), signature);
429 headers.insert("Nonce".to_string(), nonce.to_string());
430
431 let rate_limit_keys = Self::rate_limit_keys(endpoint);
432
433 let response = self
434 .client
435 .request(
436 Method::GET,
437 url,
438 None,
439 Some(headers),
440 None,
441 None,
442 Some(rate_limit_keys),
443 )
444 .await
445 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
446
447 let status = response.status.as_u16();
448 if status >= 400 {
449 let body = String::from_utf8_lossy(&response.body).to_string();
450
451 if status == 401 || status == 403 {
452 return Err(KrakenHttpError::AuthenticationError(format!(
453 "HTTP error {status}: {body}"
454 )));
455 }
456 return Err(KrakenHttpError::NetworkError(format!(
457 "HTTP error {status}: {body}"
458 )));
459 }
460
461 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
462 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
463 })?;
464
465 serde_json::from_str(&response_text).map_err(|e| {
466 KrakenHttpError::ParseError(format!("Failed to deserialize futures response: {e}"))
467 })
468 }
469
470 async fn send_request_with_body<T: DeserializeOwned>(
471 &self,
472 endpoint: &str,
473 params: HashMap<String, String>,
474 ) -> anyhow::Result<T, KrakenHttpError> {
475 let post_data = serde_urlencoded::to_string(¶ms)
476 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
477 self.send_authenticated_post(endpoint, post_data).await
478 }
479
480 async fn send_request_with_params<P: serde::Serialize, T: DeserializeOwned>(
482 &self,
483 endpoint: &str,
484 params: &P,
485 ) -> anyhow::Result<T, KrakenHttpError> {
486 let post_data = serde_urlencoded::to_string(params)
487 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
488 self.send_authenticated_post(endpoint, post_data).await
489 }
490
491 async fn send_authenticated_post<T: DeserializeOwned>(
493 &self,
494 endpoint: &str,
495 post_data: String,
496 ) -> anyhow::Result<T, KrakenHttpError> {
497 if self.cancellation_token.is_cancelled() {
498 return Err(KrakenHttpError::NetworkError(
499 "Request cancelled".to_string(),
500 ));
501 }
502
503 let _guard = self.auth_mutex.lock().await;
505
506 if self.cancellation_token.is_cancelled() {
507 return Err(KrakenHttpError::NetworkError(
508 "Request cancelled".to_string(),
509 ));
510 }
511
512 let credential = self.credential.as_ref().ok_or_else(|| {
513 KrakenHttpError::AuthenticationError("Missing credentials".to_string())
514 })?;
515
516 let nonce = self.generate_nonce();
517 log::debug!("Generated nonce {nonce} for {endpoint}");
518
519 let signature = credential
520 .sign_futures(endpoint, &post_data, nonce)
521 .map_err(|e| {
522 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
523 })?;
524
525 let url = format!("{}{endpoint}", self.base_url);
526 let mut headers = Self::default_headers();
527 headers.insert(
528 "Content-Type".to_string(),
529 "application/x-www-form-urlencoded".to_string(),
530 );
531 headers.insert("APIKey".to_string(), credential.api_key().to_string());
532 headers.insert("Authent".to_string(), signature);
533 headers.insert("Nonce".to_string(), nonce.to_string());
534
535 let rate_limit_keys = Self::rate_limit_keys(endpoint);
536
537 let response = self
538 .client
539 .request(
540 Method::POST,
541 url,
542 None,
543 Some(headers),
544 Some(post_data.into_bytes()),
545 None,
546 Some(rate_limit_keys),
547 )
548 .await
549 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
550
551 if response.status.as_u16() >= 400 {
552 let status = response.status.as_u16();
553 let body = String::from_utf8_lossy(&response.body).to_string();
554 return Err(KrakenHttpError::NetworkError(format!(
555 "HTTP error {status}: {body}"
556 )));
557 }
558
559 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
560 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
561 })?;
562
563 serde_json::from_str(&response_text).map_err(|e| {
564 log::error!("Failed to parse response from {endpoint}: {response_text}");
565 KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
566 })
567 }
568
569 pub async fn get_instruments(
571 &self,
572 ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
573 let endpoint = "/derivatives/api/v3/instruments";
574 let url = format!("{}{endpoint}", self.base_url);
575
576 self.send_request(Method::GET, endpoint, url, false).await
577 }
578
579 pub async fn get_tickers(&self) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
581 let endpoint = "/derivatives/api/v3/tickers";
582 let url = format!("{}{endpoint}", self.base_url);
583
584 self.send_request(Method::GET, endpoint, url, false).await
585 }
586
587 pub async fn get_orderbook(
589 &self,
590 symbol: &str,
591 ) -> anyhow::Result<FuturesOrderBookResponse, KrakenHttpError> {
592 let endpoint = format!("/derivatives/api/v3/orderbook?symbol={symbol}");
593 let url = format!("{}{endpoint}", self.base_url);
594
595 self.send_request(Method::GET, &endpoint, url, false).await
596 }
597
598 pub async fn get_historical_funding_rates(
600 &self,
601 symbol: &str,
602 ) -> anyhow::Result<FuturesHistoricalFundingRatesResponse, KrakenHttpError> {
603 let endpoint = format!("/derivatives/api/v4/historicalfundingrates?symbol={symbol}");
604 let url = format!("{}{endpoint}", self.base_url);
605
606 self.send_request(Method::GET, &endpoint, url, false).await
607 }
608
609 pub async fn get_ohlc(
611 &self,
612 tick_type: &str,
613 symbol: &str,
614 resolution: &str,
615 from: Option<i64>,
616 to: Option<i64>,
617 ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
618 let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
619
620 let mut url = format!("{}{endpoint}", self.base_url);
621
622 let mut query_params = Vec::new();
623
624 if let Some(from_ts) = from {
625 query_params.push(format!("from={from_ts}"));
626 }
627
628 if let Some(to_ts) = to {
629 query_params.push(format!("to={to_ts}"));
630 }
631
632 if !query_params.is_empty() {
633 url.push('?');
634 url.push_str(&query_params.join("&"));
635 }
636
637 self.send_request(Method::GET, &endpoint, url, false).await
638 }
639
640 pub async fn get_public_executions(
642 &self,
643 symbol: &str,
644 since: Option<i64>,
645 before: Option<i64>,
646 sort: Option<&str>,
647 continuation_token: Option<&str>,
648 ) -> anyhow::Result<FuturesPublicExecutionsResponse, KrakenHttpError> {
649 let endpoint = format!("/api/history/v3/market/{symbol}/executions");
650
651 let mut url = format!("{}{endpoint}", self.base_url);
652
653 let mut query_params = Vec::new();
654
655 if let Some(since_ts) = since {
656 query_params.push(format!("since={since_ts}"));
657 }
658
659 if let Some(before_ts) = before {
660 query_params.push(format!("before={before_ts}"));
661 }
662
663 if let Some(sort_order) = sort {
664 query_params.push(format!("sort={sort_order}"));
665 }
666
667 if let Some(token) = continuation_token {
668 query_params.push(format!("continuationToken={token}"));
669 }
670
671 if !query_params.is_empty() {
672 url.push('?');
673 url.push_str(&query_params.join("&"));
674 }
675
676 self.send_request(Method::GET, &endpoint, url, false).await
677 }
678
679 pub async fn get_open_orders(
681 &self,
682 ) -> anyhow::Result<FuturesOpenOrdersResponse, KrakenHttpError> {
683 if self.credential.is_none() {
684 return Err(KrakenHttpError::AuthenticationError(
685 "API credentials required for futures open orders".to_string(),
686 ));
687 }
688
689 let endpoint = "/derivatives/api/v3/openorders";
690 let url = format!("{}{endpoint}", self.base_url);
691
692 self.send_request(Method::GET, endpoint, url, true).await
693 }
694
695 pub async fn get_order_events(
697 &self,
698 before: Option<i64>,
699 since: Option<i64>,
700 continuation_token: Option<&str>,
701 ) -> anyhow::Result<FuturesOrderEventsResponse, KrakenHttpError> {
702 if self.credential.is_none() {
703 return Err(KrakenHttpError::AuthenticationError(
704 "API credentials required for futures order events".to_string(),
705 ));
706 }
707
708 let endpoint = "/api/history/v2/orders";
709 let mut query_params = Vec::new();
710
711 if let Some(before_ts) = before {
712 query_params.push(format!("before={before_ts}"));
713 }
714
715 if let Some(since_ts) = since {
716 query_params.push(format!("since={since_ts}"));
717 }
718
719 if let Some(token) = continuation_token {
720 query_params.push(format!("continuation_token={token}"));
721 }
722
723 let query_string = query_params.join("&");
725 let url = if query_string.is_empty() {
726 format!("{}{endpoint}", self.base_url)
727 } else {
728 format!("{}{endpoint}?{query_string}", self.base_url)
729 };
730
731 self.send_get_with_query(endpoint, url, &query_string).await
734 }
735
736 pub async fn get_fills(
738 &self,
739 last_fill_time: Option<&str>,
740 ) -> anyhow::Result<FuturesFillsResponse, KrakenHttpError> {
741 if self.credential.is_none() {
742 return Err(KrakenHttpError::AuthenticationError(
743 "API credentials required for futures fills".to_string(),
744 ));
745 }
746
747 let endpoint = "/derivatives/api/v3/fills";
748 let query_string = last_fill_time
749 .map(|t| format!("lastFillTime={t}"))
750 .unwrap_or_default();
751
752 let url = if query_string.is_empty() {
753 format!("{}{endpoint}", self.base_url)
754 } else {
755 format!("{}{endpoint}?{query_string}", self.base_url)
756 };
757
758 self.send_get_with_query(endpoint, url, &query_string).await
760 }
761
762 pub async fn get_open_positions(
764 &self,
765 ) -> anyhow::Result<FuturesOpenPositionsResponse, KrakenHttpError> {
766 if self.credential.is_none() {
767 return Err(KrakenHttpError::AuthenticationError(
768 "API credentials required for futures open positions".to_string(),
769 ));
770 }
771
772 let endpoint = "/derivatives/api/v3/openpositions";
773 let url = format!("{}{endpoint}", self.base_url);
774
775 self.send_request(Method::GET, endpoint, url, true).await
776 }
777
778 pub async fn get_accounts(&self) -> anyhow::Result<FuturesAccountsResponse, KrakenHttpError> {
780 if self.credential.is_none() {
781 return Err(KrakenHttpError::AuthenticationError(
782 "API credentials required for futures accounts".to_string(),
783 ));
784 }
785
786 let endpoint = "/derivatives/api/v3/accounts";
787 let url = format!("{}{endpoint}", self.base_url);
788
789 self.send_request(Method::GET, endpoint, url, true).await
790 }
791
792 pub async fn send_order(
794 &self,
795 params: HashMap<String, String>,
796 ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
797 if self.credential.is_none() {
798 return Err(KrakenHttpError::AuthenticationError(
799 "API credentials required for sending orders".to_string(),
800 ));
801 }
802
803 let endpoint = "/derivatives/api/v3/sendorder";
804 self.send_request_with_body(endpoint, params).await
805 }
806
807 pub async fn send_order_params(
809 &self,
810 params: &KrakenFuturesSendOrderParams,
811 ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
812 if self.credential.is_none() {
813 return Err(KrakenHttpError::AuthenticationError(
814 "API credentials required for sending orders".to_string(),
815 ));
816 }
817
818 let endpoint = "/derivatives/api/v3/sendorder";
819 self.send_request_with_params(endpoint, params).await
820 }
821
822 pub async fn cancel_order(
824 &self,
825 order_id: Option<String>,
826 cli_ord_id: Option<String>,
827 ) -> anyhow::Result<FuturesCancelOrderResponse, KrakenHttpError> {
828 if self.credential.is_none() {
829 return Err(KrakenHttpError::AuthenticationError(
830 "API credentials required for canceling orders".to_string(),
831 ));
832 }
833
834 let mut params = HashMap::new();
835
836 if let Some(id) = order_id {
837 params.insert("order_id".to_string(), id);
838 }
839
840 if let Some(id) = cli_ord_id {
841 params.insert("cliOrdId".to_string(), id);
842 }
843
844 let endpoint = "/derivatives/api/v3/cancelorder";
845 self.send_request_with_body(endpoint, params).await
846 }
847
848 pub async fn edit_order(
850 &self,
851 params: &KrakenFuturesEditOrderParams,
852 ) -> anyhow::Result<FuturesEditOrderResponse, KrakenHttpError> {
853 if self.credential.is_none() {
854 return Err(KrakenHttpError::AuthenticationError(
855 "API credentials required for editing orders".to_string(),
856 ));
857 }
858
859 let endpoint = "/derivatives/api/v3/editorder";
860 self.send_request_with_params(endpoint, params).await
861 }
862
863 pub async fn batch_order(
865 &self,
866 params: HashMap<String, String>,
867 ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
868 if self.credential.is_none() {
869 return Err(KrakenHttpError::AuthenticationError(
870 "API credentials required for batch orders".to_string(),
871 ));
872 }
873
874 let endpoint = "/derivatives/api/v3/batchorder";
875 self.send_request_with_body(endpoint, params).await
876 }
877
878 pub async fn cancel_orders_batch(
880 &self,
881 order_ids: Vec<String>,
882 ) -> anyhow::Result<FuturesBatchCancelResponse, KrakenHttpError> {
883 if self.credential.is_none() {
884 return Err(KrakenHttpError::AuthenticationError(
885 "API credentials required for batch orders".to_string(),
886 ));
887 }
888
889 let batch_items: Vec<KrakenFuturesBatchCancelItem> = order_ids
890 .into_iter()
891 .map(KrakenFuturesBatchCancelItem::from_order_id)
892 .collect();
893
894 let params = KrakenFuturesBatchOrderParams::new(batch_items);
895 let post_data = params
896 .to_body()
897 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
898
899 let endpoint = "/derivatives/api/v3/batchorder";
900 self.send_authenticated_post(endpoint, post_data).await
901 }
902
903 pub async fn submit_orders_batch(
905 &self,
906 items: Vec<KrakenFuturesBatchSendItem>,
907 ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
908 if self.credential.is_none() {
909 return Err(KrakenHttpError::AuthenticationError(
910 "API credentials required for batch orders".to_string(),
911 ));
912 }
913
914 let params = KrakenFuturesBatchOrderParams::new(items);
915 let post_data = params
916 .to_body()
917 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
918
919 let endpoint = "/derivatives/api/v3/batchorder";
920 self.send_authenticated_post(endpoint, post_data).await
921 }
922
923 pub async fn edit_orders_batch(
925 &self,
926 items: Vec<KrakenFuturesBatchEditItem>,
927 ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
928 if self.credential.is_none() {
929 return Err(KrakenHttpError::AuthenticationError(
930 "API credentials required for batch orders".to_string(),
931 ));
932 }
933
934 let params = KrakenFuturesBatchOrderParams::new(items);
935 let post_data = params
936 .to_body()
937 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
938
939 let endpoint = "/derivatives/api/v3/batchorder";
940 self.send_authenticated_post(endpoint, post_data).await
941 }
942
943 pub async fn cancel_all_orders(
945 &self,
946 symbol: Option<String>,
947 ) -> anyhow::Result<FuturesCancelAllOrdersResponse, KrakenHttpError> {
948 if self.credential.is_none() {
949 return Err(KrakenHttpError::AuthenticationError(
950 "API credentials required for canceling orders".to_string(),
951 ));
952 }
953
954 let mut params = HashMap::new();
955
956 if let Some(sym) = symbol {
957 params.insert("symbol".to_string(), sym);
958 }
959
960 let endpoint = "/derivatives/api/v3/cancelallorders";
961 self.send_request_with_body(endpoint, params).await
962 }
963}
964
965#[cfg_attr(
971 feature = "python",
972 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken", from_py_object)
973)]
974#[cfg_attr(
975 feature = "python",
976 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.kraken")
977)]
978pub struct KrakenFuturesHttpClient {
979 pub(crate) inner: Arc<KrakenFuturesRawHttpClient>,
980 pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
981 clock: &'static AtomicTime,
982 cache_initialized: Arc<AtomicBool>,
983}
984
985impl Clone for KrakenFuturesHttpClient {
986 fn clone(&self) -> Self {
987 Self {
988 inner: self.inner.clone(),
989 instruments_cache: self.instruments_cache.clone(),
990 cache_initialized: self.cache_initialized.clone(),
991 clock: self.clock,
992 }
993 }
994}
995
996impl Default for KrakenFuturesHttpClient {
997 fn default() -> Self {
998 Self::new(
999 KrakenEnvironment::Mainnet,
1000 None,
1001 60,
1002 None,
1003 None,
1004 None,
1005 None,
1006 KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
1007 )
1008 .expect("Failed to create default KrakenFuturesHttpClient")
1009 }
1010}
1011
1012impl Debug for KrakenFuturesHttpClient {
1013 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1014 f.debug_struct(stringify!(KrakenFuturesHttpClient))
1015 .field("inner", &self.inner)
1016 .finish()
1017 }
1018}
1019
1020impl KrakenFuturesHttpClient {
1021 #[expect(clippy::too_many_arguments)]
1023 pub fn new(
1024 environment: KrakenEnvironment,
1025 base_url_override: Option<String>,
1026 timeout_secs: u64,
1027 max_retries: Option<u32>,
1028 retry_delay_ms: Option<u64>,
1029 retry_delay_max_ms: Option<u64>,
1030 proxy_url: Option<String>,
1031 max_requests_per_second: u32,
1032 ) -> anyhow::Result<Self> {
1033 Ok(Self {
1034 inner: Arc::new(KrakenFuturesRawHttpClient::new(
1035 environment,
1036 base_url_override,
1037 timeout_secs,
1038 max_retries,
1039 retry_delay_ms,
1040 retry_delay_max_ms,
1041 proxy_url,
1042 max_requests_per_second,
1043 )?),
1044 instruments_cache: Arc::new(AtomicMap::new()),
1045 cache_initialized: Arc::new(AtomicBool::new(false)),
1046 clock: get_atomic_clock_realtime(),
1047 })
1048 }
1049
1050 #[expect(clippy::too_many_arguments)]
1052 pub fn with_credentials(
1053 api_key: String,
1054 api_secret: String,
1055 environment: KrakenEnvironment,
1056 base_url_override: Option<String>,
1057 timeout_secs: u64,
1058 max_retries: Option<u32>,
1059 retry_delay_ms: Option<u64>,
1060 retry_delay_max_ms: Option<u64>,
1061 proxy_url: Option<String>,
1062 max_requests_per_second: u32,
1063 ) -> anyhow::Result<Self> {
1064 Ok(Self {
1065 inner: Arc::new(KrakenFuturesRawHttpClient::with_credentials(
1066 api_key,
1067 api_secret,
1068 environment,
1069 base_url_override,
1070 timeout_secs,
1071 max_retries,
1072 retry_delay_ms,
1073 retry_delay_max_ms,
1074 proxy_url,
1075 max_requests_per_second,
1076 )?),
1077 instruments_cache: Arc::new(AtomicMap::new()),
1078 cache_initialized: Arc::new(AtomicBool::new(false)),
1079 clock: get_atomic_clock_realtime(),
1080 })
1081 }
1082
1083 #[expect(clippy::too_many_arguments)]
1090 pub fn from_env(
1091 environment: KrakenEnvironment,
1092 base_url_override: Option<String>,
1093 timeout_secs: u64,
1094 max_retries: Option<u32>,
1095 retry_delay_ms: Option<u64>,
1096 retry_delay_max_ms: Option<u64>,
1097 proxy_url: Option<String>,
1098 max_requests_per_second: u32,
1099 ) -> anyhow::Result<Self> {
1100 let demo = environment == KrakenEnvironment::Demo;
1101
1102 if let Some(credential) = KrakenCredential::from_env_futures(demo) {
1103 let (api_key, api_secret) = credential.into_parts();
1104 Self::with_credentials(
1105 api_key,
1106 api_secret,
1107 environment,
1108 base_url_override,
1109 timeout_secs,
1110 max_retries,
1111 retry_delay_ms,
1112 retry_delay_max_ms,
1113 proxy_url,
1114 max_requests_per_second,
1115 )
1116 } else {
1117 Self::new(
1118 environment,
1119 base_url_override,
1120 timeout_secs,
1121 max_retries,
1122 retry_delay_ms,
1123 retry_delay_max_ms,
1124 proxy_url,
1125 max_requests_per_second,
1126 )
1127 }
1128 }
1129
1130 pub fn cancel_all_requests(&self) {
1132 self.inner.cancel_all_requests();
1133 }
1134
1135 pub fn cancellation_token(&self) -> &CancellationToken {
1137 self.inner.cancellation_token()
1138 }
1139
1140 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1142 self.instruments_cache
1143 .insert(instrument.symbol().inner(), instrument);
1144 self.cache_initialized.store(true, Ordering::Release);
1145 }
1146
1147 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1149 self.instruments_cache.rcu(|m| {
1150 for instrument in instruments {
1151 m.insert(instrument.symbol().inner(), instrument.clone());
1152 }
1153 });
1154 self.cache_initialized.store(true, Ordering::Release);
1155 }
1156
1157 pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1159 self.instruments_cache.get_cloned(symbol)
1160 }
1161
1162 fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1163 self.instruments_cache
1164 .load()
1165 .values()
1166 .find(|inst| inst.raw_symbol().as_str() == raw_symbol)
1167 .cloned()
1168 }
1169
1170 fn generate_ts_init(&self) -> UnixNanos {
1171 self.clock.get_time_ns()
1172 }
1173
1174 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1176 let ts_init = self.generate_ts_init();
1177 let response = self.inner.get_instruments().await?;
1178
1179 let instruments: Vec<InstrumentAny> = response
1180 .instruments
1181 .iter()
1182 .filter_map(|fut_instrument| {
1183 match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
1184 Ok(instrument) => Some(instrument),
1185 Err(e) => {
1186 let symbol = &fut_instrument.symbol;
1187 log::warn!("Failed to parse futures instrument {symbol}: {e}");
1188 None
1189 }
1190 }
1191 })
1192 .collect();
1193
1194 Ok(instruments)
1195 }
1196
1197 pub async fn request_instrument_statuses(
1199 &self,
1200 ) -> anyhow::Result<AHashMap<InstrumentId, MarketStatusAction>, KrakenHttpError> {
1201 let response = self.inner.get_instruments().await?;
1202
1203 Ok(response
1204 .instruments
1205 .iter()
1206 .map(|instrument| {
1207 let instrument_id =
1208 InstrumentId::new(Symbol::new(&instrument.symbol), *KRAKEN_VENUE);
1209 let action = if instrument.tradeable {
1210 MarketStatusAction::Trading
1211 } else {
1212 MarketStatusAction::NotAvailableForTrading
1213 };
1214
1215 (instrument_id, action)
1216 })
1217 .collect())
1218 }
1219
1220 pub async fn request_mark_price(
1222 &self,
1223 instrument_id: InstrumentId,
1224 ) -> anyhow::Result<f64, KrakenHttpError> {
1225 let instrument = self
1226 .get_cached_instrument(&instrument_id.symbol.inner())
1227 .ok_or_else(|| {
1228 KrakenHttpError::ParseError(format!(
1229 "Instrument not found in cache: {instrument_id}"
1230 ))
1231 })?;
1232
1233 let raw_symbol = instrument.raw_symbol().to_string();
1234 let tickers = self.inner.get_tickers().await?;
1235
1236 tickers
1237 .tickers
1238 .iter()
1239 .find(|t| t.symbol == raw_symbol)
1240 .ok_or_else(|| {
1241 KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1242 })
1243 .and_then(|t| {
1244 t.mark_price.ok_or_else(|| {
1245 KrakenHttpError::ParseError(format!(
1246 "Mark price not available for {raw_symbol} (may not be available in testnet)"
1247 ))
1248 })
1249 })
1250 }
1251
1252 pub async fn request_index_price(
1253 &self,
1254 instrument_id: InstrumentId,
1255 ) -> anyhow::Result<f64, KrakenHttpError> {
1256 let instrument = self
1257 .get_cached_instrument(&instrument_id.symbol.inner())
1258 .ok_or_else(|| {
1259 KrakenHttpError::ParseError(format!(
1260 "Instrument not found in cache: {instrument_id}"
1261 ))
1262 })?;
1263
1264 let raw_symbol = instrument.raw_symbol().to_string();
1265 let tickers = self.inner.get_tickers().await?;
1266
1267 tickers
1268 .tickers
1269 .iter()
1270 .find(|t| t.symbol == raw_symbol)
1271 .ok_or_else(|| {
1272 KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1273 })
1274 .and_then(|t| {
1275 t.index_price.ok_or_else(|| {
1276 KrakenHttpError::ParseError(format!(
1277 "Index price not available for {raw_symbol} (may not be available in testnet)"
1278 ))
1279 })
1280 })
1281 }
1282
1283 pub async fn request_trades(
1284 &self,
1285 instrument_id: InstrumentId,
1286 start: Option<DateTime<Utc>>,
1287 end: Option<DateTime<Utc>>,
1288 limit: Option<u64>,
1289 ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1290 let instrument = self
1291 .get_cached_instrument(&instrument_id.symbol.inner())
1292 .ok_or_else(|| {
1293 KrakenHttpError::ParseError(format!(
1294 "Instrument not found in cache: {instrument_id}"
1295 ))
1296 })?;
1297
1298 let raw_symbol = instrument.raw_symbol().to_string();
1299 let ts_init = self.generate_ts_init();
1300
1301 let since = start.map(|dt| dt.timestamp_millis());
1302 let before = end.map(|dt| dt.timestamp_millis());
1303
1304 let response = self
1305 .inner
1306 .get_public_executions(&raw_symbol, since, before, Some("asc"), None)
1307 .await?;
1308
1309 let mut trades = Vec::new();
1310
1311 for element in &response.elements {
1312 let execution = &element.event.execution.execution;
1313 match parse_futures_public_execution(execution, &instrument, ts_init) {
1314 Ok(trade_tick) => {
1315 trades.push(trade_tick);
1316
1317 if let Some(limit_count) = limit
1318 && trades.len() >= limit_count as usize
1319 {
1320 return Ok(trades);
1321 }
1322 }
1323 Err(e) => {
1324 log::warn!("Failed to parse futures trade tick: {e}");
1325 }
1326 }
1327 }
1328
1329 Ok(trades)
1330 }
1331
1332 pub async fn request_bars(
1333 &self,
1334 bar_type: BarType,
1335 start: Option<DateTime<Utc>>,
1336 end: Option<DateTime<Utc>>,
1337 limit: Option<u64>,
1338 ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1339 let instrument_id = bar_type.instrument_id();
1340 let instrument = self
1341 .get_cached_instrument(&instrument_id.symbol.inner())
1342 .ok_or_else(|| {
1343 KrakenHttpError::ParseError(format!(
1344 "Instrument not found in cache: {instrument_id}"
1345 ))
1346 })?;
1347
1348 let raw_symbol = instrument.raw_symbol().to_string();
1349 let ts_init = self.generate_ts_init();
1350 let tick_type = "trade";
1351 let resolution = bar_type_to_futures_resolution(bar_type)
1352 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
1353
1354 let from = start.map(|dt| dt.timestamp());
1356 let to = end.map(|dt| dt.timestamp());
1357 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1358
1359 let response = self
1360 .inner
1361 .get_ohlc(tick_type, &raw_symbol, resolution, from, to)
1362 .await?;
1363
1364 let mut bars = Vec::new();
1365
1366 for candle in response.candles {
1367 let ohlc = OhlcData {
1368 time: candle.time / 1000,
1369 open: candle.open,
1370 high: candle.high,
1371 low: candle.low,
1372 close: candle.close,
1373 vwap: "0".to_string(),
1374 volume: candle.volume,
1375 count: 0,
1376 };
1377
1378 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1379 Ok(bar) => {
1380 if let Some(end_nanos) = end_ns
1381 && bar.ts_event.as_u64() > end_nanos
1382 {
1383 continue;
1384 }
1385 bars.push(bar);
1386
1387 if let Some(limit_count) = limit
1388 && bars.len() >= limit_count as usize
1389 {
1390 return Ok(bars);
1391 }
1392 }
1393 Err(e) => {
1394 log::warn!("Failed to parse futures bar: {e}");
1395 }
1396 }
1397 }
1398
1399 Ok(bars)
1400 }
1401
1402 pub async fn request_book_snapshot(
1404 &self,
1405 instrument_id: InstrumentId,
1406 depth: Option<u32>,
1407 ) -> anyhow::Result<OrderBook, KrakenHttpError> {
1408 let instrument = self
1409 .get_cached_instrument(&instrument_id.symbol.inner())
1410 .ok_or_else(|| {
1411 KrakenHttpError::ParseError(format!(
1412 "Instrument not found in cache: {instrument_id}"
1413 ))
1414 })?;
1415
1416 let raw_symbol = instrument.raw_symbol().to_string();
1417 let price_precision = instrument.price_precision();
1418 let size_precision = instrument.size_precision();
1419 let ts_event = self.generate_ts_init();
1420
1421 let response = self.inner.get_orderbook(&raw_symbol).await?;
1422 let book_data = &response.order_book;
1423
1424 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1425
1426 let bid_limit = depth.map_or(book_data.bids.len(), |d| {
1427 (d as usize).min(book_data.bids.len())
1428 });
1429 let ask_limit = depth.map_or(book_data.asks.len(), |d| {
1430 (d as usize).min(book_data.asks.len())
1431 });
1432
1433 for (i, level) in book_data.bids.iter().take(bid_limit).enumerate() {
1436 let price = Price::new(level.price, price_precision);
1437 let size = Quantity::new(level.qty, size_precision);
1438 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1439 book.add(order, 0, 0, ts_event);
1440 }
1441
1442 for (i, level) in book_data.asks.iter().take(ask_limit).enumerate() {
1443 let price = Price::new(level.price, price_precision);
1444 let size = Quantity::new(level.qty, size_precision);
1445 let order = BookOrder::new(OrderSide::Sell, price, size, (bid_limit + i) as u64);
1446 book.add(order, 0, 0, ts_event);
1447 }
1448
1449 Ok(book)
1450 }
1451
1452 pub async fn request_funding_rates(
1457 &self,
1458 instrument_id: InstrumentId,
1459 start: Option<DateTime<Utc>>,
1460 end: Option<DateTime<Utc>>,
1461 limit: Option<usize>,
1462 ) -> anyhow::Result<Vec<FundingRateUpdate>, KrakenHttpError> {
1463 let instrument = self
1464 .get_cached_instrument(&instrument_id.symbol.inner())
1465 .ok_or_else(|| {
1466 KrakenHttpError::ParseError(format!(
1467 "Instrument not found in cache: {instrument_id}"
1468 ))
1469 })?;
1470
1471 let raw_symbol = instrument.raw_symbol().to_string();
1472 let ts_init = self.generate_ts_init();
1473 let start_ns = start.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1474 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1475
1476 let response = self.inner.get_historical_funding_rates(&raw_symbol).await?;
1477
1478 let mut rates = Vec::new();
1479
1480 for entry in &response.rates {
1481 let ts_event = entry
1482 .timestamp
1483 .parse::<DateTime<Utc>>()
1484 .map_or(ts_init, |dt| {
1485 UnixNanos::from(dt.timestamp_nanos_opt().unwrap_or(0) as u64)
1486 });
1487
1488 if let Some(s) = start_ns
1489 && ts_event.as_u64() < s
1490 {
1491 continue;
1492 }
1493
1494 if let Some(e) = end_ns
1495 && ts_event.as_u64() > e
1496 {
1497 continue;
1498 }
1499
1500 let Some(rate) = Decimal::from_f64(entry.relative_funding_rate) else {
1501 continue;
1502 };
1503
1504 rates.push(FundingRateUpdate::new(
1505 instrument_id,
1506 rate,
1507 None,
1508 None,
1509 ts_event,
1510 ts_init,
1511 ));
1512
1513 if let Some(lim) = limit
1514 && rates.len() >= lim
1515 {
1516 break;
1517 }
1518 }
1519
1520 rates.reverse();
1522
1523 Ok(rates)
1524 }
1525
1526 pub async fn request_account_state(
1538 &self,
1539 account_id: AccountId,
1540 ) -> anyhow::Result<AccountState> {
1541 let accounts_response = self.inner.get_accounts().await?;
1542
1543 if accounts_response.result != KrakenApiResult::Success {
1544 let error_msg = accounts_response
1545 .error
1546 .unwrap_or_else(|| "Unknown error".to_string());
1547 anyhow::bail!("Failed to get futures accounts: {error_msg}");
1548 }
1549
1550 let ts_init = self.generate_ts_init();
1551
1552 let mut balances: Vec<AccountBalance> = Vec::new();
1553 let mut margins: Vec<MarginBalance> = Vec::new();
1554
1555 for account in accounts_response.accounts.values() {
1556 match account.account_type {
1557 KrakenFuturesAccountType::MultiCollateralMarginAccount => {
1558 parse_multi_collateral_balances(account, &mut balances);
1559 parse_multi_collateral_margins(account, &mut margins);
1560 }
1561 KrakenFuturesAccountType::MarginAccount => {
1562 parse_margin_account_balances(account, &mut balances);
1563 parse_margin_account_margins(account, &mut margins);
1564 }
1565 KrakenFuturesAccountType::CashAccount => {
1566 parse_cash_account_balances(account, &mut balances);
1567 }
1568 KrakenFuturesAccountType::Unknown => {
1569 log::debug!("Unknown account type: {:?}", account.account_type);
1570 }
1571 }
1572 }
1573
1574 Ok(AccountState::new(
1575 account_id,
1576 AccountType::Margin,
1577 balances,
1578 margins,
1579 true,
1580 UUID4::new(),
1581 ts_init,
1582 ts_init,
1583 None,
1584 ))
1585 }
1586
1587 pub async fn request_order_status_reports(
1588 &self,
1589 account_id: AccountId,
1590 instrument_id: Option<InstrumentId>,
1591 start: Option<DateTime<Utc>>,
1592 end: Option<DateTime<Utc>>,
1593 open_only: bool,
1594 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1595 let ts_init = self.generate_ts_init();
1596 let mut all_reports = Vec::new();
1597
1598 let response = self
1599 .inner
1600 .get_open_orders()
1601 .await
1602 .map_err(|e| anyhow::anyhow!("get_open_orders failed: {e}"))?;
1603
1604 if response.result != KrakenApiResult::Success {
1605 let error_msg = response
1606 .error
1607 .unwrap_or_else(|| "Unknown error".to_string());
1608 anyhow::bail!("Failed to get open orders: {error_msg}");
1609 }
1610
1611 for order in &response.open_orders {
1612 if let Some(ref target_id) = instrument_id {
1613 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1614 if let Some(inst) = instrument
1615 && inst.raw_symbol().as_str() != order.symbol
1616 {
1617 continue;
1618 }
1619 }
1620
1621 if let Some(instrument) = self.get_instrument_by_raw_symbol(&order.symbol) {
1622 match parse_futures_order_status_report(order, &instrument, account_id, ts_init) {
1623 Ok(report) => all_reports.push(report),
1624 Err(e) => {
1625 let order_id = &order.order_id;
1626 log::warn!("Failed to parse futures order {order_id}: {e}");
1627 }
1628 }
1629 }
1630 }
1631
1632 if !open_only {
1633 let start_ms = start.map(|dt| dt.timestamp_millis());
1635 let end_ms = end.map(|dt| dt.timestamp_millis());
1636 let response = self
1637 .inner
1638 .get_order_events(end_ms, start_ms, None)
1639 .await
1640 .map_err(|e| anyhow::anyhow!("get_order_events failed: {e}"))?;
1641
1642 for event_wrapper in response.order_events {
1643 let event = &event_wrapper.order;
1644
1645 if let Some(ref target_id) = instrument_id {
1646 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1647 if let Some(inst) = instrument
1648 && inst.raw_symbol().as_str() != event.symbol
1649 {
1650 continue;
1651 }
1652 }
1653
1654 if let Some(instrument) = self.get_instrument_by_raw_symbol(&event.symbol) {
1655 match parse_futures_order_event_status_report(
1656 event,
1657 Some(event_wrapper.event_type),
1658 &instrument,
1659 account_id,
1660 ts_init,
1661 ) {
1662 Ok(report) => all_reports.push(report),
1663 Err(e) => {
1664 let order_id = &event.order_id;
1665 log::warn!("Failed to parse futures order event {order_id}: {e}");
1666 }
1667 }
1668 }
1669 }
1670 }
1671
1672 Ok(all_reports)
1673 }
1674
1675 pub async fn request_fill_reports(
1676 &self,
1677 account_id: AccountId,
1678 instrument_id: Option<InstrumentId>,
1679 start: Option<DateTime<Utc>>,
1680 end: Option<DateTime<Utc>>,
1681 ) -> anyhow::Result<Vec<FillReport>> {
1682 let ts_init = self.generate_ts_init();
1683 let mut all_reports = Vec::new();
1684
1685 let response = self.inner.get_fills(None).await?;
1686 if response.result != KrakenApiResult::Success {
1687 let error_msg = response
1688 .error
1689 .unwrap_or_else(|| "Unknown error".to_string());
1690 anyhow::bail!("Failed to get fills: {error_msg}");
1691 }
1692
1693 let start_ms = start.map(|dt| dt.timestamp_millis());
1694 let end_ms = end.map(|dt| dt.timestamp_millis());
1695
1696 for fill in response.fills {
1697 if let Some(start_threshold) = start_ms
1698 && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1699 {
1700 let fill_ms = fill_ts.timestamp_millis();
1701 if fill_ms < start_threshold {
1702 continue;
1703 }
1704 }
1705
1706 if let Some(end_threshold) = end_ms
1707 && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1708 {
1709 let fill_ms = fill_ts.timestamp_millis();
1710 if fill_ms > end_threshold {
1711 continue;
1712 }
1713 }
1714
1715 if let Some(ref target_id) = instrument_id {
1716 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1717 if let Some(inst) = instrument
1718 && inst.raw_symbol().as_str() != fill.symbol
1719 {
1720 continue;
1721 }
1722 }
1723
1724 if let Some(instrument) = self.get_instrument_by_raw_symbol(&fill.symbol) {
1725 match parse_futures_fill_report(&fill, &instrument, account_id, ts_init) {
1726 Ok(report) => all_reports.push(report),
1727 Err(e) => {
1728 let fill_id = &fill.fill_id;
1729 log::warn!("Failed to parse futures fill {fill_id}: {e}");
1730 }
1731 }
1732 }
1733 }
1734
1735 Ok(all_reports)
1736 }
1737
1738 pub async fn request_position_status_reports(
1739 &self,
1740 account_id: AccountId,
1741 instrument_id: Option<InstrumentId>,
1742 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1743 let ts_init = self.generate_ts_init();
1744 let mut all_reports = Vec::new();
1745
1746 let response = self.inner.get_open_positions().await?;
1747 if response.result != KrakenApiResult::Success {
1748 let error_msg = response
1749 .error
1750 .unwrap_or_else(|| "Unknown error".to_string());
1751 anyhow::bail!("Failed to get open positions: {error_msg}");
1752 }
1753
1754 for position in response.open_positions {
1755 if let Some(ref target_id) = instrument_id {
1756 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1757 if let Some(inst) = instrument
1758 && inst.raw_symbol().as_str() != position.symbol
1759 {
1760 continue;
1761 }
1762 }
1763
1764 if let Some(instrument) = self.get_instrument_by_raw_symbol(&position.symbol) {
1765 match parse_futures_position_status_report(
1766 &position,
1767 &instrument,
1768 account_id,
1769 ts_init,
1770 ) {
1771 Ok(report) => all_reports.push(report),
1772 Err(e) => {
1773 let symbol = &position.symbol;
1774 log::warn!("Failed to parse futures position {symbol}: {e}");
1775 }
1776 }
1777 }
1778 }
1779
1780 Ok(all_reports)
1781 }
1782
1783 #[expect(clippy::too_many_arguments)]
1784 fn build_send_order_params(
1785 &self,
1786 instrument_id: InstrumentId,
1787 client_order_id: ClientOrderId,
1788 order_side: OrderSide,
1789 order_type: OrderType,
1790 quantity: Quantity,
1791 time_in_force: TimeInForce,
1792 price: Option<Price>,
1793 trigger_price: Option<Price>,
1794 trigger_type: Option<TriggerType>,
1795 reduce_only: bool,
1796 post_only: bool,
1797 ) -> anyhow::Result<KrakenFuturesSendOrderParams> {
1798 let instrument = self
1799 .get_cached_instrument(&instrument_id.symbol.inner())
1800 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1801
1802 let raw_symbol = instrument.raw_symbol().inner();
1803
1804 let kraken_order_type = match order_type {
1811 OrderType::Market => KrakenFuturesOrderType::Market,
1812 OrderType::Limit => {
1813 if post_only {
1814 KrakenFuturesOrderType::Post
1815 } else {
1816 match time_in_force {
1817 TimeInForce::Ioc => KrakenFuturesOrderType::Ioc,
1818 TimeInForce::Fok => {
1819 anyhow::bail!("FOK not supported by Kraken Futures, use IOC instead")
1820 }
1821 TimeInForce::Gtd => {
1822 anyhow::bail!("GTD not supported by Kraken Futures, use GTC instead")
1823 }
1824 _ => KrakenFuturesOrderType::Limit, }
1826 }
1827 }
1828 OrderType::StopMarket | OrderType::StopLimit => KrakenFuturesOrderType::Stop,
1829 OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
1830 KrakenFuturesOrderType::TakeProfit
1831 }
1832 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1833 };
1834
1835 let kraken_side: KrakenOrderSide = order_side
1836 .try_into()
1837 .map_err(|e| anyhow::anyhow!("Invalid order side: {e}"))?;
1838
1839 let mut builder = KrakenFuturesSendOrderParamsBuilder::default();
1840 builder
1841 .cli_ord_id(truncate_cl_ord_id(&client_order_id))
1842 .broker(NAUTILUS_KRAKEN_BROKER_ID)
1843 .symbol(raw_symbol)
1844 .side(kraken_side)
1845 .size(quantity.to_string())
1846 .order_type(kraken_order_type);
1847
1848 if matches!(
1849 order_type,
1850 OrderType::StopMarket
1851 | OrderType::StopLimit
1852 | OrderType::MarketIfTouched
1853 | OrderType::LimitIfTouched
1854 ) && let Some(signal) = map_futures_trigger_signal(trigger_type)?
1855 {
1856 builder.trigger_signal(signal);
1857 }
1858
1859 match order_type {
1860 OrderType::StopMarket => {
1861 if let Some(trigger) = trigger_price {
1862 builder.stop_price(trigger.to_string());
1863 }
1864 }
1865 OrderType::StopLimit => {
1866 if let Some(trigger) = trigger_price {
1867 builder.stop_price(trigger.to_string());
1868 }
1869
1870 if let Some(limit) = price {
1871 builder.limit_price(limit.to_string());
1872 }
1873 }
1874 OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
1875 if let Some(trigger) = trigger_price {
1876 builder.stop_price(trigger.to_string());
1877 }
1878
1879 if let Some(limit) = price {
1880 builder.limit_price(limit.to_string());
1881 }
1882 }
1883 _ => {
1884 if let Some(limit) = price {
1885 builder.limit_price(limit.to_string());
1886 }
1887 }
1888 }
1889
1890 if reduce_only {
1891 builder.reduce_only(true);
1892 }
1893
1894 builder
1895 .build()
1896 .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))
1897 }
1898
1899 #[expect(clippy::too_many_arguments)]
1910 pub async fn submit_order(
1911 &self,
1912 account_id: AccountId,
1913 instrument_id: InstrumentId,
1914 client_order_id: ClientOrderId,
1915 order_side: OrderSide,
1916 order_type: OrderType,
1917 quantity: Quantity,
1918 time_in_force: TimeInForce,
1919 price: Option<Price>,
1920 trigger_price: Option<Price>,
1921 trigger_type: Option<TriggerType>,
1922 reduce_only: bool,
1923 post_only: bool,
1924 ) -> anyhow::Result<OrderStatusReport> {
1925 let instrument = self
1926 .get_cached_instrument(&instrument_id.symbol.inner())
1927 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1928
1929 let params = self.build_send_order_params(
1930 instrument_id,
1931 client_order_id,
1932 order_side,
1933 order_type,
1934 quantity,
1935 time_in_force,
1936 price,
1937 trigger_price,
1938 trigger_type,
1939 reduce_only,
1940 post_only,
1941 )?;
1942
1943 let response = self.inner.send_order_params(¶ms).await?;
1944
1945 if response.result != KrakenApiResult::Success {
1946 let error_msg = response
1947 .error
1948 .unwrap_or_else(|| "Unknown error".to_string());
1949 anyhow::bail!("Order submission failed: {error_msg}");
1950 }
1951
1952 let send_status = response
1953 .send_status
1954 .ok_or_else(|| anyhow::anyhow!("No send_status in successful response"))?;
1955
1956 let status = &send_status.status;
1957
1958 if status == "postWouldExecute" {
1960 let reason = send_status
1961 .order_events
1962 .as_ref()
1963 .and_then(|events| events.first())
1964 .and_then(|e| e.reason.clone())
1965 .unwrap_or_else(|| "Post-only order would have crossed".to_string());
1966 anyhow::bail!("POST_ONLY_REJECTED: {reason}");
1967 }
1968
1969 let venue_order_id = send_status
1970 .order_id
1971 .ok_or_else(|| anyhow::anyhow!("No order_id in send_status: {status}"))?;
1972
1973 let ts_init = self.generate_ts_init();
1974
1975 let open_orders_response = self.inner.get_open_orders().await?;
1976 if let Some(order) = open_orders_response
1977 .open_orders
1978 .iter()
1979 .find(|o| o.order_id == venue_order_id)
1980 {
1981 return parse_futures_order_status_report(order, &instrument, account_id, ts_init);
1982 }
1983
1984 if let Some(order_events) = &send_status.order_events
1987 && let Some(send_event) = order_events.first()
1988 {
1989 let event = if let Some(order_data) = &send_event.order {
1991 FuturesOrderEvent {
1992 order_id: order_data.order_id.clone(),
1993 cli_ord_id: order_data.cli_ord_id.clone(),
1994 order_type: order_data.order_type,
1995 symbol: order_data.symbol.clone(),
1996 side: order_data.side,
1997 quantity: order_data.quantity,
1998 filled: order_data.filled,
1999 limit_price: order_data.limit_price,
2000 stop_price: order_data.stop_price,
2001 timestamp: order_data.timestamp.clone(),
2002 last_update_timestamp: order_data.last_update_timestamp.clone(),
2003 reduce_only: order_data.reduce_only,
2004 }
2005 } else if let Some(trigger_data) = &send_event.order_trigger {
2006 FuturesOrderEvent {
2007 order_id: trigger_data.uid.clone(),
2008 cli_ord_id: trigger_data.client_id.clone(),
2009 order_type: trigger_data.order_type,
2010 symbol: trigger_data.symbol.clone(),
2011 side: trigger_data.side,
2012 quantity: trigger_data.quantity,
2013 filled: 0.0,
2014 limit_price: trigger_data.limit_price,
2015 stop_price: Some(trigger_data.trigger_price),
2016 timestamp: trigger_data.timestamp.clone(),
2017 last_update_timestamp: trigger_data.last_update_timestamp.clone(),
2018 reduce_only: trigger_data.reduce_only,
2019 }
2020 } else if let Some(prior_exec) = &send_event.order_prior_execution {
2021 FuturesOrderEvent {
2023 order_id: prior_exec.order_id.clone(),
2024 cli_ord_id: prior_exec.cli_ord_id.clone(),
2025 order_type: prior_exec.order_type,
2026 symbol: prior_exec.symbol.clone(),
2027 side: prior_exec.side,
2028 quantity: prior_exec.quantity,
2029 filled: send_event.amount.unwrap_or(prior_exec.quantity), limit_price: prior_exec.limit_price,
2031 stop_price: prior_exec.stop_price,
2032 timestamp: prior_exec.timestamp.clone(),
2033 last_update_timestamp: prior_exec.last_update_timestamp.clone(),
2034 reduce_only: prior_exec.reduce_only,
2035 }
2036 } else {
2037 anyhow::bail!("No order, orderTrigger, or orderPriorExecution data in event");
2038 };
2039 return parse_futures_order_event_status_report(
2040 &event,
2041 Some(send_event.event_type),
2042 &instrument,
2043 account_id,
2044 ts_init,
2045 );
2046 }
2047
2048 let events_response = self.inner.get_order_events(None, None, None).await?;
2050 let event_wrapper = events_response
2051 .order_events
2052 .iter()
2053 .find(|e| e.order.order_id == venue_order_id)
2054 .ok_or_else(|| {
2055 anyhow::anyhow!("Order not found in open orders or events: {venue_order_id}")
2056 })?;
2057
2058 parse_futures_order_event_status_report(
2059 &event_wrapper.order,
2060 Some(event_wrapper.event_type),
2061 &instrument,
2062 account_id,
2063 ts_init,
2064 )
2065 }
2066
2067 pub async fn modify_order(
2079 &self,
2080 instrument_id: InstrumentId,
2081 client_order_id: Option<ClientOrderId>,
2082 venue_order_id: Option<VenueOrderId>,
2083 quantity: Option<Quantity>,
2084 price: Option<Price>,
2085 trigger_price: Option<Price>,
2086 ) -> anyhow::Result<VenueOrderId> {
2087 let params = self.build_edit_order_params(
2088 instrument_id,
2089 client_order_id,
2090 venue_order_id,
2091 quantity,
2092 price,
2093 trigger_price,
2094 )?;
2095 let original_order_id = params.order_id.clone();
2096
2097 let response = self.inner.edit_order(¶ms).await?;
2098
2099 if response.result != KrakenApiResult::Success {
2100 let status = &response.edit_status.status;
2101 anyhow::bail!("Order modification failed: {status}");
2102 }
2103
2104 let new_venue_order_id = response
2106 .edit_status
2107 .order_id
2108 .or(original_order_id)
2109 .ok_or_else(|| anyhow::anyhow!("No order ID in edit order response"))?;
2110
2111 Ok(VenueOrderId::new(&new_venue_order_id))
2112 }
2113
2114 pub async fn cancel_order(
2124 &self,
2125 _account_id: AccountId,
2126 instrument_id: InstrumentId,
2127 client_order_id: Option<ClientOrderId>,
2128 venue_order_id: Option<VenueOrderId>,
2129 ) -> anyhow::Result<()> {
2130 let _ = self
2131 .get_cached_instrument(&instrument_id.symbol.inner())
2132 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2133
2134 let order_id = venue_order_id.as_ref().map(|id| id.to_string());
2135 let cli_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2136
2137 if order_id.is_none() && cli_ord_id.is_none() {
2138 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2139 }
2140
2141 let response = self.inner.cancel_order(order_id, cli_ord_id).await?;
2142
2143 if response.result != KrakenApiResult::Success {
2144 let status = &response.cancel_status.status;
2145 anyhow::bail!("Order cancellation failed: {status}");
2146 }
2147
2148 Ok(())
2149 }
2150
2151 pub async fn cancel_orders_batch(
2161 &self,
2162 venue_order_ids: Vec<VenueOrderId>,
2163 ) -> anyhow::Result<usize> {
2164 if venue_order_ids.is_empty() {
2165 return Ok(0);
2166 }
2167
2168 let mut total_cancelled = 0;
2169
2170 for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
2171 let order_ids: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
2172 let response = self.inner.cancel_orders_batch(order_ids).await?;
2173
2174 if response.result != KrakenApiResult::Success {
2175 let error_msg = response.error.as_deref().unwrap_or("Unknown error");
2176 anyhow::bail!("Batch cancel failed: {error_msg}");
2177 }
2178
2179 let success_count = response
2180 .batch_status
2181 .iter()
2182 .filter(|s| {
2183 s.status == Some(KrakenSendStatus::Cancelled)
2184 || s.cancel_status
2185 .as_ref()
2186 .is_some_and(|cs| cs.status == KrakenSendStatus::Cancelled)
2187 })
2188 .count();
2189
2190 total_cancelled += success_count;
2191 }
2192
2193 Ok(total_cancelled)
2194 }
2195
2196 #[expect(clippy::type_complexity)]
2205 pub async fn submit_orders_batch(
2206 &self,
2207 orders: Vec<(
2208 InstrumentId,
2209 ClientOrderId,
2210 OrderSide,
2211 OrderType,
2212 Quantity,
2213 TimeInForce,
2214 Option<Price>,
2215 Option<Price>,
2216 Option<TriggerType>,
2217 bool,
2218 bool,
2219 )>,
2220 ) -> anyhow::Result<Vec<FuturesSendStatus>> {
2221 let count = orders.len();
2222 if count == 0 {
2223 return Ok(Vec::new());
2224 }
2225
2226 let mut all_statuses: Vec<Option<FuturesSendStatus>> = vec![None; count];
2229 let mut valid_items = Vec::with_capacity(count);
2230 let mut valid_indices = Vec::with_capacity(count);
2231
2232 for (
2233 idx,
2234 (
2235 instrument_id,
2236 client_order_id,
2237 order_side,
2238 order_type,
2239 quantity,
2240 time_in_force,
2241 price,
2242 trigger_price,
2243 trigger_type,
2244 reduce_only,
2245 post_only,
2246 ),
2247 ) in orders.into_iter().enumerate()
2248 {
2249 match self.build_send_order_params(
2250 instrument_id,
2251 client_order_id,
2252 order_side,
2253 order_type,
2254 quantity,
2255 time_in_force,
2256 price,
2257 trigger_price,
2258 trigger_type,
2259 reduce_only,
2260 post_only,
2261 ) {
2262 Ok(params) => {
2263 valid_items.push(KrakenFuturesBatchSendItem::from_params(
2264 params,
2265 idx.to_string(),
2266 ));
2267 valid_indices.push(idx);
2268 }
2269 Err(e) => {
2270 all_statuses[idx] = Some(FuturesSendStatus {
2271 order_id: None,
2272 status: format!("validation_error: {e}"),
2273 order_events: None,
2274 cli_ord_id: None,
2275 received_time: None,
2276 });
2277 }
2278 }
2279 }
2280
2281 if valid_items.is_empty() {
2282 return Ok(all_statuses.into_iter().flatten().collect());
2283 }
2284
2285 let mut batch_statuses: Vec<FuturesSendStatus> = Vec::with_capacity(valid_items.len());
2286
2287 for chunk in valid_items.chunks(BATCH_ORDER_LIMIT) {
2288 match self.inner.submit_orders_batch(chunk.to_vec()).await {
2289 Ok(response) => {
2290 if response.result == KrakenApiResult::Success {
2291 batch_statuses.extend(response.batch_status);
2292 } else {
2293 let error_msg = response
2294 .batch_status
2295 .first()
2296 .map_or("Unknown error", |s| s.status.as_str());
2297
2298 for _ in 0..chunk.len() {
2299 batch_statuses.push(FuturesSendStatus {
2300 order_id: None,
2301 status: format!("api_error: {error_msg}"),
2302 order_events: None,
2303 cli_ord_id: None,
2304 received_time: None,
2305 });
2306 }
2307 }
2308 }
2309 Err(e) => {
2310 let remaining = valid_items.len() - batch_statuses.len();
2312 for _ in 0..remaining {
2313 batch_statuses.push(FuturesSendStatus {
2314 order_id: None,
2315 status: format!("batch_error: {e}"),
2316 order_events: None,
2317 cli_ord_id: None,
2318 received_time: None,
2319 });
2320 }
2321 break;
2322 }
2323 }
2324 }
2325
2326 for (batch_idx, &original_idx) in valid_indices.iter().enumerate() {
2328 if let Some(status) = batch_statuses.get(batch_idx) {
2329 all_statuses[original_idx] = Some(status.clone());
2330 }
2331 }
2332
2333 Ok(all_statuses.into_iter().flatten().collect())
2334 }
2335
2336 #[expect(clippy::type_complexity)]
2338 pub async fn edit_orders_batch(
2339 &self,
2340 orders: Vec<(
2341 InstrumentId,
2342 Option<ClientOrderId>,
2343 Option<VenueOrderId>,
2344 Option<Quantity>,
2345 Option<Price>,
2346 Option<Price>,
2347 )>,
2348 ) -> anyhow::Result<Vec<String>> {
2349 let count = orders.len();
2350 if count == 0 {
2351 return Ok(Vec::new());
2352 }
2353
2354 let mut all_statuses: Vec<Option<String>> = vec![None; count];
2355 let mut valid_items = Vec::with_capacity(count);
2356 let mut valid_indices = Vec::with_capacity(count);
2357
2358 for (
2359 idx,
2360 (instrument_id, client_order_id, venue_order_id, quantity, price, trigger_price),
2361 ) in orders.into_iter().enumerate()
2362 {
2363 match self.build_edit_order_params(
2364 instrument_id,
2365 client_order_id,
2366 venue_order_id,
2367 quantity,
2368 price,
2369 trigger_price,
2370 ) {
2371 Ok(params) => {
2372 valid_items.push(KrakenFuturesBatchEditItem::from_params(
2373 params,
2374 idx.to_string(),
2375 ));
2376 valid_indices.push(idx);
2377 }
2378 Err(e) => {
2379 all_statuses[idx] = Some(format!("validation_error: {e}"));
2380 }
2381 }
2382 }
2383
2384 if valid_items.is_empty() {
2385 return Ok(all_statuses.into_iter().flatten().collect());
2386 }
2387
2388 let mut batch_statuses: Vec<String> = Vec::with_capacity(valid_items.len());
2389
2390 for chunk in valid_items.chunks(BATCH_ORDER_LIMIT) {
2391 match self.inner.edit_orders_batch(chunk.to_vec()).await {
2392 Ok(response) => {
2393 if response.result == KrakenApiResult::Success {
2394 batch_statuses.extend(response.batch_status.into_iter().map(|s| s.status));
2395 } else {
2396 let error_msg = response
2397 .batch_status
2398 .first()
2399 .map_or("Unknown error", |s| s.status.as_str());
2400
2401 for _ in 0..chunk.len() {
2402 batch_statuses.push(format!("api_error: {error_msg}"));
2403 }
2404 }
2405 }
2406 Err(e) => {
2407 let remaining = valid_items.len() - batch_statuses.len();
2408 for _ in 0..remaining {
2409 batch_statuses.push(format!("batch_error: {e}"));
2410 }
2411 break;
2412 }
2413 }
2414 }
2415
2416 for (batch_idx, &original_idx) in valid_indices.iter().enumerate() {
2417 if let Some(status) = batch_statuses.get(batch_idx) {
2418 all_statuses[original_idx] = Some(status.clone());
2419 }
2420 }
2421
2422 Ok(all_statuses.into_iter().flatten().collect())
2423 }
2424
2425 fn build_edit_order_params(
2426 &self,
2427 instrument_id: InstrumentId,
2428 client_order_id: Option<ClientOrderId>,
2429 venue_order_id: Option<VenueOrderId>,
2430 quantity: Option<Quantity>,
2431 price: Option<Price>,
2432 trigger_price: Option<Price>,
2433 ) -> anyhow::Result<KrakenFuturesEditOrderParams> {
2434 let _ = self
2435 .get_cached_instrument(&instrument_id.symbol.inner())
2436 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2437
2438 let order_id = venue_order_id.as_ref().map(|id| id.to_string());
2439 let cli_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2440
2441 if order_id.is_none() && cli_ord_id.is_none() {
2442 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2443 }
2444
2445 let mut builder = KrakenFuturesEditOrderParamsBuilder::default();
2446
2447 if let Some(ref id) = order_id {
2448 builder.order_id(id.clone());
2449 }
2450
2451 if let Some(ref id) = cli_ord_id {
2452 builder.cli_ord_id(id.clone());
2453 }
2454
2455 if let Some(qty) = quantity {
2456 builder.size(qty.to_string());
2457 }
2458
2459 if let Some(p) = price {
2460 builder.limit_price(p.to_string());
2461 }
2462
2463 if let Some(tp) = trigger_price {
2464 builder.stop_price(tp.to_string());
2465 }
2466
2467 builder
2468 .build()
2469 .map_err(|e| anyhow::anyhow!("Failed to build edit order params: {e}"))
2470 }
2471}
2472
2473fn map_futures_trigger_signal(
2474 trigger_type: Option<TriggerType>,
2475) -> anyhow::Result<Option<KrakenTriggerSignal>> {
2476 match trigger_type {
2477 None => Ok(None),
2478 Some(TriggerType::Default | TriggerType::LastPrice) => Ok(Some(KrakenTriggerSignal::Last)),
2479 Some(TriggerType::MarkPrice) => Ok(Some(KrakenTriggerSignal::Mark)),
2480 Some(TriggerType::IndexPrice) => Ok(Some(KrakenTriggerSignal::Index)),
2481 Some(other) => anyhow::bail!(
2482 "Unsupported trigger type for Kraken Futures: {other:?} (only LastPrice, MarkPrice, and IndexPrice supported)"
2483 ),
2484 }
2485}
2486
2487fn parse_multi_collateral_balances(account: &FuturesAccount, balances: &mut Vec<AccountBalance>) {
2488 for (currency_code, currency_info) in &account.currencies {
2489 if currency_info.quantity == 0.0 {
2490 continue;
2491 }
2492
2493 let currency = Currency::new(
2494 currency_code.as_str(),
2495 8,
2496 0,
2497 currency_code.as_str(),
2498 CurrencyType::Crypto,
2499 );
2500
2501 let total_amount = currency_info.quantity;
2502 let available_amount = currency_info.available.unwrap_or(total_amount);
2503 let locked_amount = total_amount - available_amount;
2504
2505 push_balance_from_f64(
2506 balances,
2507 total_amount,
2508 locked_amount,
2509 currency,
2510 currency_code,
2511 );
2512 }
2513
2514 if let Some(portfolio_value) = account.portfolio_value
2517 && portfolio_value > 0.0
2518 {
2519 let usd_currency = Currency::USD();
2520 let available_usd = account.available_margin.unwrap_or(portfolio_value);
2521 let locked_usd = portfolio_value - available_usd;
2522
2523 push_balance_from_f64(balances, portfolio_value, locked_usd, usd_currency, "USD");
2524 }
2525}
2526
2527fn push_balance_from_f64(
2531 balances: &mut Vec<AccountBalance>,
2532 total: f64,
2533 locked: f64,
2534 currency: Currency,
2535 ccy_label: &str,
2536) {
2537 let Some(total_dec) = Decimal::from_f64(total) else {
2538 log::warn!("Skipping {ccy_label} balance: non-finite total {total}");
2539 return;
2540 };
2541 let Some(locked_dec) = Decimal::from_f64(locked) else {
2542 log::warn!("Skipping {ccy_label} balance: non-finite locked {locked}");
2543 return;
2544 };
2545
2546 match AccountBalance::from_total_and_locked(total_dec, locked_dec, currency) {
2547 Ok(balance) => balances.push(balance),
2548 Err(e) => log::warn!("Skipping {ccy_label} balance: {e}"),
2549 }
2550}
2551
2552fn parse_multi_collateral_margins(account: &FuturesAccount, margins: &mut Vec<MarginBalance>) {
2553 if let Some(initial_margin) = account.initial_margin
2554 && initial_margin > 0.0
2555 {
2556 let usd_currency = Currency::USD();
2557 let maintenance = account
2558 .margin_requirements
2559 .as_ref()
2560 .and_then(|mr| mr.mm)
2561 .unwrap_or(0.0);
2562 margins.push(MarginBalance::new(
2565 Money::new(initial_margin, usd_currency),
2566 Money::new(maintenance, usd_currency),
2567 None,
2568 ));
2569 }
2570}
2571
2572fn parse_margin_account_balances(account: &FuturesAccount, balances: &mut Vec<AccountBalance>) {
2573 for (currency_code, &amount) in &account.balances {
2574 if amount == 0.0 {
2575 continue;
2576 }
2577
2578 let currency = Currency::new(
2579 currency_code.as_str(),
2580 8,
2581 0,
2582 currency_code.as_str(),
2583 CurrencyType::Crypto,
2584 );
2585
2586 let available = account
2587 .auxiliary
2588 .as_ref()
2589 .and_then(|aux| aux.af)
2590 .unwrap_or(amount);
2591 let locked = amount - available;
2592
2593 push_balance_from_f64(balances, amount, locked, currency, currency_code);
2594 }
2595}
2596
2597fn parse_margin_account_margins(account: &FuturesAccount, margins: &mut Vec<MarginBalance>) {
2598 if let Some(ref mr) = account.margin_requirements {
2599 let im = mr.im.unwrap_or(0.0);
2600 let mm = mr.mm.unwrap_or(0.0);
2601 if im > 0.0 || mm > 0.0 {
2602 let usd_currency = Currency::USD();
2603 margins.push(MarginBalance::new(
2604 Money::new(im, usd_currency),
2605 Money::new(mm, usd_currency),
2606 None,
2607 ));
2608 }
2609 }
2610}
2611
2612fn parse_cash_account_balances(account: &FuturesAccount, balances: &mut Vec<AccountBalance>) {
2613 for (currency_code, &amount) in &account.balances {
2614 if amount == 0.0 {
2615 continue;
2616 }
2617
2618 let currency = Currency::new(
2619 currency_code.as_str(),
2620 8,
2621 0,
2622 currency_code.as_str(),
2623 CurrencyType::Crypto,
2624 );
2625
2626 push_balance_from_f64(balances, amount, 0.0, currency, currency_code);
2627 }
2628}
2629
2630#[cfg(test)]
2631mod tests {
2632 use ahash::AHashMap;
2633 use nautilus_model::instruments::CryptoPerpetual;
2634 use rstest::rstest;
2635
2636 use super::*;
2637
2638 #[rstest]
2639 fn test_raw_client_creation() {
2640 let client = KrakenFuturesRawHttpClient::default();
2641 assert!(client.credential.is_none());
2642 assert!(client.base_url().contains("futures"));
2643 }
2644
2645 #[rstest]
2646 fn test_raw_client_with_credentials() {
2647 let client = KrakenFuturesRawHttpClient::with_credentials(
2648 "test_key".to_string(),
2649 "test_secret".to_string(),
2650 KrakenEnvironment::Mainnet,
2651 None,
2652 60,
2653 None,
2654 None,
2655 None,
2656 None,
2657 KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
2658 )
2659 .unwrap();
2660 assert!(client.credential.is_some());
2661 }
2662
2663 #[rstest]
2664 fn test_client_creation() {
2665 let client = KrakenFuturesHttpClient::default();
2666 assert!(client.instruments_cache.is_empty());
2667 }
2668
2669 #[rstest]
2670 fn test_client_with_credentials() {
2671 let client = KrakenFuturesHttpClient::with_credentials(
2672 "test_key".to_string(),
2673 "test_secret".to_string(),
2674 KrakenEnvironment::Mainnet,
2675 None,
2676 60,
2677 None,
2678 None,
2679 None,
2680 None,
2681 KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
2682 )
2683 .unwrap();
2684 assert!(client.instruments_cache.is_empty());
2685 }
2686
2687 #[rstest]
2688 fn test_parse_multi_collateral_margins() {
2689 let account = FuturesAccount {
2690 account_type: KrakenFuturesAccountType::MultiCollateralMarginAccount,
2691 balances: AHashMap::new(),
2692 currencies: AHashMap::new(),
2693 auxiliary: None,
2694 margin_requirements: Some(FuturesMarginRequirements {
2695 im: Some(500.0),
2696 mm: Some(250.0),
2697 lt: None,
2698 tt: None,
2699 }),
2700 portfolio_value: Some(10000.0),
2701 available_margin: Some(9500.0),
2702 initial_margin: Some(500.0),
2703 pnl: None,
2704 };
2705
2706 let mut margins = Vec::new();
2707 parse_multi_collateral_margins(&account, &mut margins);
2708
2709 assert_eq!(margins.len(), 1);
2710 let margin = &margins[0];
2711 assert!(margin.instrument_id.is_none());
2712 assert_eq!(margin.currency.code.as_str(), "USD");
2713 assert_eq!(margin.initial.as_f64(), 500.0);
2714 assert_eq!(margin.maintenance.as_f64(), 250.0);
2715 }
2716
2717 #[rstest]
2718 fn test_parse_multi_collateral_margins_zero_skipped() {
2719 let account = FuturesAccount {
2720 account_type: KrakenFuturesAccountType::MultiCollateralMarginAccount,
2721 balances: AHashMap::new(),
2722 currencies: AHashMap::new(),
2723 auxiliary: None,
2724 margin_requirements: None,
2725 portfolio_value: None,
2726 available_margin: None,
2727 initial_margin: Some(0.0),
2728 pnl: None,
2729 };
2730
2731 let mut margins = Vec::new();
2732 parse_multi_collateral_margins(&account, &mut margins);
2733
2734 assert_eq!(margins.len(), 0);
2735 }
2736
2737 #[rstest]
2738 fn test_parse_margin_account_margins() {
2739 let account = FuturesAccount {
2740 account_type: KrakenFuturesAccountType::MarginAccount,
2741 balances: AHashMap::new(),
2742 currencies: AHashMap::new(),
2743 auxiliary: None,
2744 margin_requirements: Some(FuturesMarginRequirements {
2745 im: Some(100.0),
2746 mm: Some(50.0),
2747 lt: None,
2748 tt: None,
2749 }),
2750 portfolio_value: None,
2751 available_margin: None,
2752 initial_margin: None,
2753 pnl: None,
2754 };
2755
2756 let mut margins = Vec::new();
2757 parse_margin_account_margins(&account, &mut margins);
2758
2759 assert_eq!(margins.len(), 1);
2760 let margin = &margins[0];
2761 assert_eq!(margin.initial.as_f64(), 100.0);
2762 assert_eq!(margin.maintenance.as_f64(), 50.0);
2763 }
2764
2765 #[rstest]
2766 fn test_parse_margin_account_margins_no_requirements() {
2767 let account = FuturesAccount {
2768 account_type: KrakenFuturesAccountType::MarginAccount,
2769 balances: AHashMap::new(),
2770 currencies: AHashMap::new(),
2771 auxiliary: None,
2772 margin_requirements: None,
2773 portfolio_value: None,
2774 available_margin: None,
2775 initial_margin: None,
2776 pnl: None,
2777 };
2778
2779 let mut margins = Vec::new();
2780 parse_margin_account_margins(&account, &mut margins);
2781
2782 assert_eq!(margins.len(), 0);
2783 }
2784
2785 #[rstest]
2786 fn test_parse_multi_collateral_balances() {
2787 let mut currencies = AHashMap::new();
2788 currencies.insert(
2789 "BTC".to_string(),
2790 FuturesFlexCurrency {
2791 quantity: 1.5,
2792 value: None,
2793 collateral: None,
2794 available: Some(1.2),
2795 },
2796 );
2797
2798 let account = FuturesAccount {
2799 account_type: KrakenFuturesAccountType::MultiCollateralMarginAccount,
2800 balances: AHashMap::new(),
2801 currencies,
2802 auxiliary: None,
2803 margin_requirements: None,
2804 portfolio_value: Some(50000.0),
2805 available_margin: Some(45000.0),
2806 initial_margin: None,
2807 pnl: None,
2808 };
2809
2810 let mut balances = Vec::new();
2811 parse_multi_collateral_balances(&account, &mut balances);
2812
2813 assert_eq!(balances.len(), 2);
2815 }
2816
2817 #[rstest]
2818 fn test_parse_margin_account_balances_free_is_derived_from_total_minus_locked() {
2819 let mut bals = AHashMap::new();
2825 let af_f = 35.0_f64 * 1e-9;
2833 let amount_f = 10.0_f64 + af_f;
2834 bals.insert("XBT".to_string(), amount_f);
2835
2836 let account = FuturesAccount {
2837 account_type: KrakenFuturesAccountType::MarginAccount,
2838 balances: bals,
2839 currencies: AHashMap::new(),
2840 auxiliary: Some(FuturesAuxiliary {
2841 usd: None,
2842 pv: None,
2843 pnl: None,
2844 af: Some(af_f),
2845 funding: None,
2846 }),
2847 margin_requirements: None,
2848 portfolio_value: None,
2849 available_margin: None,
2850 initial_margin: None,
2851 pnl: None,
2852 };
2853
2854 let mut balances = Vec::new();
2855 parse_margin_account_balances(&account, &mut balances);
2856
2857 assert_eq!(balances.len(), 1);
2858 let balance = &balances[0];
2859 assert_eq!(balance.total, balance.locked + balance.free);
2862 assert_eq!(balance.free, balance.total - balance.locked);
2864 }
2865
2866 #[rstest]
2867 #[case::nan_total(f64::NAN, 0.0)]
2868 #[case::infinity_total(f64::INFINITY, 0.0)]
2869 #[case::neg_infinity_total(f64::NEG_INFINITY, 0.0)]
2870 #[case::nan_locked(1.0, f64::NAN)]
2871 #[case::infinity_locked(1.0, f64::INFINITY)]
2872 fn test_push_balance_from_f64_skips_non_finite(#[case] total: f64, #[case] locked: f64) {
2873 let currency = Currency::new("BTC", 8, 0, "BTC", CurrencyType::Crypto);
2874 let mut balances = Vec::new();
2875
2876 push_balance_from_f64(&mut balances, total, locked, currency, "BTC");
2877
2878 assert!(balances.is_empty());
2879 }
2880
2881 #[rstest]
2882 fn test_parse_cash_account_balances() {
2883 let mut bals = AHashMap::new();
2884 bals.insert("ETH".to_string(), 10.0);
2885 bals.insert("BTC".to_string(), 0.0); let account = FuturesAccount {
2888 account_type: KrakenFuturesAccountType::CashAccount,
2889 balances: bals,
2890 currencies: AHashMap::new(),
2891 auxiliary: None,
2892 margin_requirements: None,
2893 portfolio_value: None,
2894 available_margin: None,
2895 initial_margin: None,
2896 pnl: None,
2897 };
2898
2899 let mut balances = Vec::new();
2900 parse_cash_account_balances(&account, &mut balances);
2901
2902 assert_eq!(balances.len(), 1);
2903 let balance = &balances[0];
2904 assert_eq!(balance.total.as_f64(), 10.0);
2905 assert_eq!(balance.locked.as_f64(), 0.0);
2906 }
2907
2908 #[rstest]
2909 #[case(None, None)]
2910 #[case(Some(TriggerType::Default), Some(KrakenTriggerSignal::Last))]
2911 #[case(Some(TriggerType::LastPrice), Some(KrakenTriggerSignal::Last))]
2912 #[case(Some(TriggerType::MarkPrice), Some(KrakenTriggerSignal::Mark))]
2913 #[case(Some(TriggerType::IndexPrice), Some(KrakenTriggerSignal::Index))]
2914 fn test_build_send_order_params_maps_supported_trigger_signals(
2915 #[case] trigger_type: Option<TriggerType>,
2916 #[case] expected_signal: Option<KrakenTriggerSignal>,
2917 ) {
2918 let client = KrakenFuturesHttpClient::default();
2919 let instrument_id = cache_test_futures_instrument(&client);
2920
2921 let params = client
2922 .build_send_order_params(
2923 instrument_id,
2924 ClientOrderId::new("futures-trigger"),
2925 OrderSide::Buy,
2926 OrderType::StopMarket,
2927 Quantity::from("1"),
2928 TimeInForce::Gtc,
2929 None,
2930 Some(Price::from("45000")),
2931 trigger_type,
2932 false,
2933 false,
2934 )
2935 .unwrap();
2936
2937 assert_eq!(params.trigger_signal, expected_signal);
2938 }
2939
2940 #[rstest]
2941 fn test_build_send_order_params_rejects_unsupported_trigger_signal() {
2942 let client = KrakenFuturesHttpClient::default();
2943 let instrument_id = cache_test_futures_instrument(&client);
2944
2945 let error = client
2946 .build_send_order_params(
2947 instrument_id,
2948 ClientOrderId::new("futures-trigger-invalid"),
2949 OrderSide::Buy,
2950 OrderType::StopMarket,
2951 Quantity::from("1"),
2952 TimeInForce::Gtc,
2953 None,
2954 Some(Price::from("45000")),
2955 Some(TriggerType::BidAsk),
2956 false,
2957 false,
2958 )
2959 .unwrap_err();
2960
2961 assert!(
2962 error
2963 .to_string()
2964 .contains("Unsupported trigger type for Kraken Futures")
2965 );
2966 }
2967
2968 fn cache_test_futures_instrument(client: &KrakenFuturesHttpClient) -> InstrumentId {
2969 let instrument_id = InstrumentId::from("PF_XBTUSD.KRAKEN");
2970
2971 client.cache_instrument(InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2972 instrument_id,
2973 Symbol::new("PF_XBTUSD"),
2974 Currency::BTC(),
2975 Currency::USD(),
2976 Currency::USD(),
2977 false,
2978 0,
2979 4,
2980 Price::from("1"),
2981 Quantity::from("0.0001"),
2982 None,
2983 None,
2984 None,
2985 None,
2986 None,
2987 None,
2988 None,
2989 None,
2990 None,
2991 None,
2992 None,
2993 None,
2994 None,
2995 0.into(),
2996 0.into(),
2997 )));
2998
2999 instrument_id
3000 }
3001}