Skip to main content

nautilus_kraken/http/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! HTTP client for the Kraken Futures REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use ahash::AHashMap;
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31    AtomicMap, AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
32    time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35    data::{Bar, BarType, BookOrder, FundingRateUpdate, TradeTick},
36    enums::{
37        AccountType, BookType, CurrencyType, MarketStatusAction, OrderSide, OrderType, TimeInForce,
38        TriggerType,
39    },
40    events::AccountState,
41    identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
42    instruments::{Instrument, InstrumentAny},
43    orderbook::OrderBook,
44    reports::{FillReport, OrderStatusReport, PositionStatusReport},
45    types::{AccountBalance, Currency, MarginBalance, Money, Price, Quantity},
46};
47use nautilus_network::{
48    http::{HttpClient, Method, USER_AGENT},
49    ratelimiter::quota::Quota,
50    retry::{RetryConfig, RetryManager},
51};
52use rust_decimal::{Decimal, prelude::FromPrimitive};
53use serde::de::DeserializeOwned;
54use tokio_util::sync::CancellationToken;
55use ustr::Ustr;
56
57use super::{models::*, query::*};
58use crate::{
59    common::{
60        consts::{KRAKEN_VENUE, NAUTILUS_KRAKEN_BROKER_ID},
61        credential::KrakenCredential,
62        enums::{
63            KrakenApiResult, KrakenEnvironment, KrakenFuturesOrderType, KrakenOrderSide,
64            KrakenProductType, KrakenSendStatus, KrakenTriggerSignal,
65        },
66        parse::{
67            bar_type_to_futures_resolution, parse_bar, parse_futures_fill_report,
68            parse_futures_instrument, parse_futures_order_event_status_report,
69            parse_futures_order_status_report, parse_futures_position_status_report,
70            parse_futures_public_execution, truncate_cl_ord_id,
71        },
72        urls::get_kraken_http_base_url,
73    },
74    http::{error::KrakenHttpError, models::OhlcData},
75};
76
77/// Default Kraken Futures REST API rate limit (requests per second).
78pub const KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;
79
80const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:futures:global";
81
82/// Maximum orders per batch cancel request for Kraken Futures API.
83const BATCH_CANCEL_LIMIT: usize = 50;
84
85/// Maximum operations per batch order request for Kraken Futures API.
86const BATCH_ORDER_LIMIT: usize = 10;
87
88/// Raw HTTP client for low-level Kraken Futures API operations.
89///
90/// This client handles request/response operations with the Kraken Futures API,
91/// returning venue-specific response types. It does not parse to Nautilus domain types.
92pub struct KrakenFuturesRawHttpClient {
93    base_url: String,
94    client: HttpClient,
95    credential: Option<KrakenCredential>,
96    retry_manager: RetryManager<KrakenHttpError>,
97    cancellation_token: CancellationToken,
98    clock: &'static AtomicTime,
99    /// Mutex to serialize authenticated requests, ensuring nonces arrive at Kraken in order
100    auth_mutex: tokio::sync::Mutex<()>,
101}
102
103impl Default for KrakenFuturesRawHttpClient {
104    fn default() -> Self {
105        Self::new(
106            KrakenEnvironment::Mainnet,
107            None,
108            60,
109            None,
110            None,
111            None,
112            None,
113            KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
114        )
115        .expect("Failed to create default KrakenFuturesRawHttpClient")
116    }
117}
118
119impl Debug for KrakenFuturesRawHttpClient {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        f.debug_struct(stringify!(KrakenFuturesRawHttpClient))
122            .field("base_url", &self.base_url)
123            .field("has_credentials", &self.credential.is_some())
124            .finish()
125    }
126}
127
128impl KrakenFuturesRawHttpClient {
129    /// Creates a new [`KrakenFuturesRawHttpClient`].
130    #[expect(clippy::too_many_arguments)]
131    pub fn new(
132        environment: KrakenEnvironment,
133        base_url_override: Option<String>,
134        timeout_secs: u64,
135        max_retries: Option<u32>,
136        retry_delay_ms: Option<u64>,
137        retry_delay_max_ms: Option<u64>,
138        proxy_url: Option<String>,
139        max_requests_per_second: u32,
140    ) -> anyhow::Result<Self> {
141        let retry_config = RetryConfig {
142            max_retries: max_retries.unwrap_or(3),
143            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
144            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
145            backoff_factor: 2.0,
146            jitter_ms: 1000,
147            operation_timeout_ms: Some(60_000),
148            immediate_first: false,
149            max_elapsed_ms: Some(180_000),
150        };
151
152        let retry_manager = RetryManager::new(retry_config);
153        let base_url = base_url_override.unwrap_or_else(|| {
154            get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
155        });
156
157        Ok(Self {
158            base_url,
159            client: HttpClient::new(
160                Self::default_headers(),
161                vec![],
162                Self::rate_limiter_quotas(max_requests_per_second)?,
163                Some(Self::default_quota(max_requests_per_second)?),
164                Some(timeout_secs),
165                proxy_url,
166            )
167            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
168            credential: None,
169            retry_manager,
170            cancellation_token: CancellationToken::new(),
171            clock: get_atomic_clock_realtime(),
172            auth_mutex: tokio::sync::Mutex::new(()),
173        })
174    }
175
176    /// Creates a new [`KrakenFuturesRawHttpClient`] with credentials.
177    #[expect(clippy::too_many_arguments)]
178    pub fn with_credentials(
179        api_key: String,
180        api_secret: String,
181        environment: KrakenEnvironment,
182        base_url_override: Option<String>,
183        timeout_secs: u64,
184        max_retries: Option<u32>,
185        retry_delay_ms: Option<u64>,
186        retry_delay_max_ms: Option<u64>,
187        proxy_url: Option<String>,
188        max_requests_per_second: u32,
189    ) -> anyhow::Result<Self> {
190        let retry_config = RetryConfig {
191            max_retries: max_retries.unwrap_or(3),
192            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
193            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
194            backoff_factor: 2.0,
195            jitter_ms: 1000,
196            operation_timeout_ms: Some(60_000),
197            immediate_first: false,
198            max_elapsed_ms: Some(180_000),
199        };
200
201        let retry_manager = RetryManager::new(retry_config);
202        let base_url = base_url_override.unwrap_or_else(|| {
203            get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
204        });
205
206        Ok(Self {
207            base_url,
208            client: HttpClient::new(
209                Self::default_headers(),
210                vec![],
211                Self::rate_limiter_quotas(max_requests_per_second)?,
212                Some(Self::default_quota(max_requests_per_second)?),
213                Some(timeout_secs),
214                proxy_url,
215            )
216            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
217            credential: Some(KrakenCredential::new(api_key, api_secret)),
218            retry_manager,
219            cancellation_token: CancellationToken::new(),
220            clock: get_atomic_clock_realtime(),
221            auth_mutex: tokio::sync::Mutex::new(()),
222        })
223    }
224
225    /// Generates a unique nonce for Kraken Futures API requests.
226    ///
227    /// Uses `AtomicTime` for strict monotonicity. The nanosecond timestamp
228    /// guarantees uniqueness even for rapid consecutive calls.
229    fn generate_nonce(&self) -> u64 {
230        self.clock.get_time_ns().as_u64()
231    }
232
233    /// Returns the base URL for this client.
234    pub fn base_url(&self) -> &str {
235        &self.base_url
236    }
237
238    /// Returns the credential for this client, if set.
239    pub fn credential(&self) -> Option<&KrakenCredential> {
240        self.credential.as_ref()
241    }
242
243    /// Cancels all pending HTTP requests.
244    pub fn cancel_all_requests(&self) {
245        self.cancellation_token.cancel();
246    }
247
248    /// Returns the cancellation token for this client.
249    pub fn cancellation_token(&self) -> &CancellationToken {
250        &self.cancellation_token
251    }
252
253    fn default_headers() -> HashMap<String, String> {
254        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
255    }
256
257    fn default_quota(max_requests_per_second: u32) -> anyhow::Result<Quota> {
258        let burst = NonZeroU32::new(max_requests_per_second).unwrap_or(
259            NonZeroU32::new(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"),
260        );
261        Quota::per_second(burst).ok_or_else(|| {
262            anyhow::anyhow!(
263                "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
264            )
265        })
266    }
267
268    fn rate_limiter_quotas(max_requests_per_second: u32) -> anyhow::Result<Vec<(String, Quota)>> {
269        Ok(vec![(
270            KRAKEN_GLOBAL_RATE_KEY.to_string(),
271            Self::default_quota(max_requests_per_second)?,
272        )])
273    }
274
275    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
276        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
277        let route = format!("kraken:futures:{normalized}");
278        vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
279    }
280
281    async fn send_request<T: DeserializeOwned>(
282        &self,
283        method: Method,
284        endpoint: &str,
285        url: String,
286        authenticate: bool,
287    ) -> anyhow::Result<T, KrakenHttpError> {
288        // Serialize authenticated requests to ensure nonces arrive at Kraken in order.
289        // Without this, concurrent requests can race through the network and arrive
290        // out-of-order, causing "Invalid nonce" errors.
291        let _guard = if authenticate {
292            Some(self.auth_mutex.lock().await)
293        } else {
294            None
295        };
296
297        let endpoint = endpoint.to_string();
298        let method_clone = method.clone();
299        let url_clone = url.clone();
300        let credential = self.credential.clone();
301
302        let operation = || {
303            let url = url_clone.clone();
304            let method = method_clone.clone();
305            let endpoint = endpoint.clone();
306            let credential = credential.clone();
307
308            async move {
309                let mut headers = Self::default_headers();
310
311                if authenticate {
312                    let cred = credential.as_ref().ok_or_else(|| {
313                        KrakenHttpError::AuthenticationError(
314                            "Missing credentials for authenticated request".to_string(),
315                        )
316                    })?;
317
318                    let nonce = self.generate_nonce();
319
320                    let signature = cred.sign_futures(&endpoint, "", nonce).map_err(|e| {
321                        KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
322                    })?;
323
324                    let base_url = &self.base_url;
325                    log::debug!(
326                        "Kraken Futures auth: endpoint={endpoint}, nonce={nonce}, base_url={base_url}"
327                    );
328
329                    headers.insert("APIKey".to_string(), cred.api_key().to_string());
330                    headers.insert("Authent".to_string(), signature);
331                    headers.insert("Nonce".to_string(), nonce.to_string());
332                }
333
334                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
335
336                let response = self
337                    .client
338                    .request(
339                        method,
340                        url,
341                        None,
342                        Some(headers),
343                        None,
344                        None,
345                        Some(rate_limit_keys),
346                    )
347                    .await
348                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
349
350                let status = response.status.as_u16();
351                if status >= 400 {
352                    let body = String::from_utf8_lossy(&response.body).to_string();
353                    // Don't retry authentication errors
354                    if status == 401 || status == 403 {
355                        return Err(KrakenHttpError::AuthenticationError(format!(
356                            "HTTP error {status}: {body}"
357                        )));
358                    }
359                    return Err(KrakenHttpError::NetworkError(format!(
360                        "HTTP error {status}: {body}"
361                    )));
362                }
363
364                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
365                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
366                })?;
367
368                serde_json::from_str(&response_text).map_err(|e| {
369                    KrakenHttpError::ParseError(format!(
370                        "Failed to deserialize futures response: {e}"
371                    ))
372                })
373            }
374        };
375
376        let should_retry =
377            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
378        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
379
380        self.retry_manager
381            .execute_with_retry_with_cancel(
382                &endpoint,
383                operation,
384                should_retry,
385                create_error,
386                &self.cancellation_token,
387            )
388            .await
389    }
390
391    /// Sends authenticated GET request with query parameters included in signature.
392    ///
393    /// For Kraken Futures, GET requests with query params must include them in postData
394    /// for signing: message = postData + nonce + endpoint
395    async fn send_get_with_query<T: DeserializeOwned>(
396        &self,
397        endpoint: &str,
398        url: String,
399        query_string: &str,
400    ) -> anyhow::Result<T, KrakenHttpError> {
401        let _guard = self.auth_mutex.lock().await;
402
403        if self.cancellation_token.is_cancelled() {
404            return Err(KrakenHttpError::NetworkError(
405                "Request cancelled".to_string(),
406            ));
407        }
408
409        let credential = self.credential.as_ref().ok_or_else(|| {
410            KrakenHttpError::AuthenticationError("Missing credentials".to_string())
411        })?;
412
413        let nonce = self.generate_nonce();
414
415        // Query params go in postData for signing (not in endpoint)
416        let signature = credential
417            .sign_futures(endpoint, query_string, nonce)
418            .map_err(|e| {
419                KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
420            })?;
421
422        log::debug!(
423            "Kraken Futures GET with query: endpoint={endpoint}, query={query_string}, nonce={nonce}"
424        );
425
426        let mut headers = Self::default_headers();
427        headers.insert("APIKey".to_string(), credential.api_key().to_string());
428        headers.insert("Authent".to_string(), signature);
429        headers.insert("Nonce".to_string(), nonce.to_string());
430
431        let rate_limit_keys = Self::rate_limit_keys(endpoint);
432
433        let response = self
434            .client
435            .request(
436                Method::GET,
437                url,
438                None,
439                Some(headers),
440                None,
441                None,
442                Some(rate_limit_keys),
443            )
444            .await
445            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
446
447        let status = response.status.as_u16();
448        if status >= 400 {
449            let body = String::from_utf8_lossy(&response.body).to_string();
450
451            if status == 401 || status == 403 {
452                return Err(KrakenHttpError::AuthenticationError(format!(
453                    "HTTP error {status}: {body}"
454                )));
455            }
456            return Err(KrakenHttpError::NetworkError(format!(
457                "HTTP error {status}: {body}"
458            )));
459        }
460
461        let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
462            KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
463        })?;
464
465        serde_json::from_str(&response_text).map_err(|e| {
466            KrakenHttpError::ParseError(format!("Failed to deserialize futures response: {e}"))
467        })
468    }
469
470    async fn send_request_with_body<T: DeserializeOwned>(
471        &self,
472        endpoint: &str,
473        params: HashMap<String, String>,
474    ) -> anyhow::Result<T, KrakenHttpError> {
475        let post_data = serde_urlencoded::to_string(&params)
476            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
477        self.send_authenticated_post(endpoint, post_data).await
478    }
479
480    /// Sends a request with typed parameters (serializable struct).
481    async fn send_request_with_params<P: serde::Serialize, T: DeserializeOwned>(
482        &self,
483        endpoint: &str,
484        params: &P,
485    ) -> anyhow::Result<T, KrakenHttpError> {
486        let post_data = serde_urlencoded::to_string(params)
487            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
488        self.send_authenticated_post(endpoint, post_data).await
489    }
490
491    /// Core authenticated POST request - takes raw post_data string.
492    async fn send_authenticated_post<T: DeserializeOwned>(
493        &self,
494        endpoint: &str,
495        post_data: String,
496    ) -> anyhow::Result<T, KrakenHttpError> {
497        if self.cancellation_token.is_cancelled() {
498            return Err(KrakenHttpError::NetworkError(
499                "Request cancelled".to_string(),
500            ));
501        }
502
503        // Serialize authenticated requests to ensure nonces arrive at Kraken in order
504        let _guard = self.auth_mutex.lock().await;
505
506        if self.cancellation_token.is_cancelled() {
507            return Err(KrakenHttpError::NetworkError(
508                "Request cancelled".to_string(),
509            ));
510        }
511
512        let credential = self.credential.as_ref().ok_or_else(|| {
513            KrakenHttpError::AuthenticationError("Missing credentials".to_string())
514        })?;
515
516        let nonce = self.generate_nonce();
517        log::debug!("Generated nonce {nonce} for {endpoint}");
518
519        let signature = credential
520            .sign_futures(endpoint, &post_data, nonce)
521            .map_err(|e| {
522                KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
523            })?;
524
525        let url = format!("{}{endpoint}", self.base_url);
526        let mut headers = Self::default_headers();
527        headers.insert(
528            "Content-Type".to_string(),
529            "application/x-www-form-urlencoded".to_string(),
530        );
531        headers.insert("APIKey".to_string(), credential.api_key().to_string());
532        headers.insert("Authent".to_string(), signature);
533        headers.insert("Nonce".to_string(), nonce.to_string());
534
535        let rate_limit_keys = Self::rate_limit_keys(endpoint);
536
537        let response = self
538            .client
539            .request(
540                Method::POST,
541                url,
542                None,
543                Some(headers),
544                Some(post_data.into_bytes()),
545                None,
546                Some(rate_limit_keys),
547            )
548            .await
549            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
550
551        if response.status.as_u16() >= 400 {
552            let status = response.status.as_u16();
553            let body = String::from_utf8_lossy(&response.body).to_string();
554            return Err(KrakenHttpError::NetworkError(format!(
555                "HTTP error {status}: {body}"
556            )));
557        }
558
559        let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
560            KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
561        })?;
562
563        serde_json::from_str(&response_text).map_err(|e| {
564            log::error!("Failed to parse response from {endpoint}: {response_text}");
565            KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
566        })
567    }
568
569    /// Requests tradable instruments from Kraken Futures.
570    pub async fn get_instruments(
571        &self,
572    ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
573        let endpoint = "/derivatives/api/v3/instruments";
574        let url = format!("{}{endpoint}", self.base_url);
575
576        self.send_request(Method::GET, endpoint, url, false).await
577    }
578
579    /// Requests ticker information for all futures instruments.
580    pub async fn get_tickers(&self) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
581        let endpoint = "/derivatives/api/v3/tickers";
582        let url = format!("{}{endpoint}", self.base_url);
583
584        self.send_request(Method::GET, endpoint, url, false).await
585    }
586
587    /// Requests order book depth for a futures symbol.
588    pub async fn get_orderbook(
589        &self,
590        symbol: &str,
591    ) -> anyhow::Result<FuturesOrderBookResponse, KrakenHttpError> {
592        let endpoint = format!("/derivatives/api/v3/orderbook?symbol={symbol}");
593        let url = format!("{}{endpoint}", self.base_url);
594
595        self.send_request(Method::GET, &endpoint, url, false).await
596    }
597
598    /// Requests historical funding rates for a futures symbol.
599    pub async fn get_historical_funding_rates(
600        &self,
601        symbol: &str,
602    ) -> anyhow::Result<FuturesHistoricalFundingRatesResponse, KrakenHttpError> {
603        let endpoint = format!("/derivatives/api/v4/historicalfundingrates?symbol={symbol}");
604        let url = format!("{}{endpoint}", self.base_url);
605
606        self.send_request(Method::GET, &endpoint, url, false).await
607    }
608
609    /// Requests OHLC candlestick data for a futures symbol.
610    pub async fn get_ohlc(
611        &self,
612        tick_type: &str,
613        symbol: &str,
614        resolution: &str,
615        from: Option<i64>,
616        to: Option<i64>,
617    ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
618        let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
619
620        let mut url = format!("{}{endpoint}", self.base_url);
621
622        let mut query_params = Vec::new();
623
624        if let Some(from_ts) = from {
625            query_params.push(format!("from={from_ts}"));
626        }
627
628        if let Some(to_ts) = to {
629            query_params.push(format!("to={to_ts}"));
630        }
631
632        if !query_params.is_empty() {
633            url.push('?');
634            url.push_str(&query_params.join("&"));
635        }
636
637        self.send_request(Method::GET, &endpoint, url, false).await
638    }
639
640    /// Gets public execution events (trades) for a futures symbol.
641    pub async fn get_public_executions(
642        &self,
643        symbol: &str,
644        since: Option<i64>,
645        before: Option<i64>,
646        sort: Option<&str>,
647        continuation_token: Option<&str>,
648    ) -> anyhow::Result<FuturesPublicExecutionsResponse, KrakenHttpError> {
649        let endpoint = format!("/api/history/v3/market/{symbol}/executions");
650
651        let mut url = format!("{}{endpoint}", self.base_url);
652
653        let mut query_params = Vec::new();
654
655        if let Some(since_ts) = since {
656            query_params.push(format!("since={since_ts}"));
657        }
658
659        if let Some(before_ts) = before {
660            query_params.push(format!("before={before_ts}"));
661        }
662
663        if let Some(sort_order) = sort {
664            query_params.push(format!("sort={sort_order}"));
665        }
666
667        if let Some(token) = continuation_token {
668            query_params.push(format!("continuationToken={token}"));
669        }
670
671        if !query_params.is_empty() {
672            url.push('?');
673            url.push_str(&query_params.join("&"));
674        }
675
676        self.send_request(Method::GET, &endpoint, url, false).await
677    }
678
679    /// Requests all open orders (requires authentication).
680    pub async fn get_open_orders(
681        &self,
682    ) -> anyhow::Result<FuturesOpenOrdersResponse, KrakenHttpError> {
683        if self.credential.is_none() {
684            return Err(KrakenHttpError::AuthenticationError(
685                "API credentials required for futures open orders".to_string(),
686            ));
687        }
688
689        let endpoint = "/derivatives/api/v3/openorders";
690        let url = format!("{}{endpoint}", self.base_url);
691
692        self.send_request(Method::GET, endpoint, url, true).await
693    }
694
695    /// Requests historical order events (requires authentication).
696    pub async fn get_order_events(
697        &self,
698        before: Option<i64>,
699        since: Option<i64>,
700        continuation_token: Option<&str>,
701    ) -> anyhow::Result<FuturesOrderEventsResponse, KrakenHttpError> {
702        if self.credential.is_none() {
703            return Err(KrakenHttpError::AuthenticationError(
704                "API credentials required for futures order events".to_string(),
705            ));
706        }
707
708        let endpoint = "/api/history/v2/orders";
709        let mut query_params = Vec::new();
710
711        if let Some(before_ts) = before {
712            query_params.push(format!("before={before_ts}"));
713        }
714
715        if let Some(since_ts) = since {
716            query_params.push(format!("since={since_ts}"));
717        }
718
719        if let Some(token) = continuation_token {
720            query_params.push(format!("continuation_token={token}"));
721        }
722
723        // Build URL with query params
724        let query_string = query_params.join("&");
725        let url = if query_string.is_empty() {
726            format!("{}{endpoint}", self.base_url)
727        } else {
728            format!("{}{endpoint}?{query_string}", self.base_url)
729        };
730
731        // For signing: query params go in postData, not endpoint
732        // Kraken: message = postData + nonce + endpoint
733        self.send_get_with_query(endpoint, url, &query_string).await
734    }
735
736    /// Requests fill/trade history (requires authentication).
737    pub async fn get_fills(
738        &self,
739        last_fill_time: Option<&str>,
740    ) -> anyhow::Result<FuturesFillsResponse, KrakenHttpError> {
741        if self.credential.is_none() {
742            return Err(KrakenHttpError::AuthenticationError(
743                "API credentials required for futures fills".to_string(),
744            ));
745        }
746
747        let endpoint = "/derivatives/api/v3/fills";
748        let query_string = last_fill_time
749            .map(|t| format!("lastFillTime={t}"))
750            .unwrap_or_default();
751
752        let url = if query_string.is_empty() {
753            format!("{}{endpoint}", self.base_url)
754        } else {
755            format!("{}{endpoint}?{query_string}", self.base_url)
756        };
757
758        // Query params go in postData for signing
759        self.send_get_with_query(endpoint, url, &query_string).await
760    }
761
762    /// Requests open positions (requires authentication).
763    pub async fn get_open_positions(
764        &self,
765    ) -> anyhow::Result<FuturesOpenPositionsResponse, KrakenHttpError> {
766        if self.credential.is_none() {
767            return Err(KrakenHttpError::AuthenticationError(
768                "API credentials required for futures open positions".to_string(),
769            ));
770        }
771
772        let endpoint = "/derivatives/api/v3/openpositions";
773        let url = format!("{}{endpoint}", self.base_url);
774
775        self.send_request(Method::GET, endpoint, url, true).await
776    }
777
778    /// Requests all accounts (cash and margin) with balances and margin info.
779    pub async fn get_accounts(&self) -> anyhow::Result<FuturesAccountsResponse, KrakenHttpError> {
780        if self.credential.is_none() {
781            return Err(KrakenHttpError::AuthenticationError(
782                "API credentials required for futures accounts".to_string(),
783            ));
784        }
785
786        let endpoint = "/derivatives/api/v3/accounts";
787        let url = format!("{}{endpoint}", self.base_url);
788
789        self.send_request(Method::GET, endpoint, url, true).await
790    }
791
792    /// Submits a new order (requires authentication).
793    pub async fn send_order(
794        &self,
795        params: HashMap<String, String>,
796    ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
797        if self.credential.is_none() {
798            return Err(KrakenHttpError::AuthenticationError(
799                "API credentials required for sending orders".to_string(),
800            ));
801        }
802
803        let endpoint = "/derivatives/api/v3/sendorder";
804        self.send_request_with_body(endpoint, params).await
805    }
806
807    /// Submits a new order using typed parameters (requires authentication).
808    pub async fn send_order_params(
809        &self,
810        params: &KrakenFuturesSendOrderParams,
811    ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
812        if self.credential.is_none() {
813            return Err(KrakenHttpError::AuthenticationError(
814                "API credentials required for sending orders".to_string(),
815            ));
816        }
817
818        let endpoint = "/derivatives/api/v3/sendorder";
819        self.send_request_with_params(endpoint, params).await
820    }
821
822    /// Cancels an open order (requires authentication).
823    pub async fn cancel_order(
824        &self,
825        order_id: Option<String>,
826        cli_ord_id: Option<String>,
827    ) -> anyhow::Result<FuturesCancelOrderResponse, KrakenHttpError> {
828        if self.credential.is_none() {
829            return Err(KrakenHttpError::AuthenticationError(
830                "API credentials required for canceling orders".to_string(),
831            ));
832        }
833
834        let mut params = HashMap::new();
835
836        if let Some(id) = order_id {
837            params.insert("order_id".to_string(), id);
838        }
839
840        if let Some(id) = cli_ord_id {
841            params.insert("cliOrdId".to_string(), id);
842        }
843
844        let endpoint = "/derivatives/api/v3/cancelorder";
845        self.send_request_with_body(endpoint, params).await
846    }
847
848    /// Edits an existing order (requires authentication).
849    pub async fn edit_order(
850        &self,
851        params: &KrakenFuturesEditOrderParams,
852    ) -> anyhow::Result<FuturesEditOrderResponse, KrakenHttpError> {
853        if self.credential.is_none() {
854            return Err(KrakenHttpError::AuthenticationError(
855                "API credentials required for editing orders".to_string(),
856            ));
857        }
858
859        let endpoint = "/derivatives/api/v3/editorder";
860        self.send_request_with_params(endpoint, params).await
861    }
862
863    /// Submits multiple orders in a single batch request (requires authentication).
864    pub async fn batch_order(
865        &self,
866        params: HashMap<String, String>,
867    ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
868        if self.credential.is_none() {
869            return Err(KrakenHttpError::AuthenticationError(
870                "API credentials required for batch orders".to_string(),
871            ));
872        }
873
874        let endpoint = "/derivatives/api/v3/batchorder";
875        self.send_request_with_body(endpoint, params).await
876    }
877
878    /// Cancels multiple orders in a single batch request (requires authentication).
879    pub async fn cancel_orders_batch(
880        &self,
881        order_ids: Vec<String>,
882    ) -> anyhow::Result<FuturesBatchCancelResponse, KrakenHttpError> {
883        if self.credential.is_none() {
884            return Err(KrakenHttpError::AuthenticationError(
885                "API credentials required for batch orders".to_string(),
886            ));
887        }
888
889        let batch_items: Vec<KrakenFuturesBatchCancelItem> = order_ids
890            .into_iter()
891            .map(KrakenFuturesBatchCancelItem::from_order_id)
892            .collect();
893
894        let params = KrakenFuturesBatchOrderParams::new(batch_items);
895        let post_data = params
896            .to_body()
897            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
898
899        let endpoint = "/derivatives/api/v3/batchorder";
900        self.send_authenticated_post(endpoint, post_data).await
901    }
902
903    /// Submits multiple orders in a single batch request (requires authentication).
904    pub async fn submit_orders_batch(
905        &self,
906        items: Vec<KrakenFuturesBatchSendItem>,
907    ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
908        if self.credential.is_none() {
909            return Err(KrakenHttpError::AuthenticationError(
910                "API credentials required for batch orders".to_string(),
911            ));
912        }
913
914        let params = KrakenFuturesBatchOrderParams::new(items);
915        let post_data = params
916            .to_body()
917            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
918
919        let endpoint = "/derivatives/api/v3/batchorder";
920        self.send_authenticated_post(endpoint, post_data).await
921    }
922
923    /// Edits multiple orders in a single batch request (requires authentication).
924    pub async fn edit_orders_batch(
925        &self,
926        items: Vec<KrakenFuturesBatchEditItem>,
927    ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
928        if self.credential.is_none() {
929            return Err(KrakenHttpError::AuthenticationError(
930                "API credentials required for batch orders".to_string(),
931            ));
932        }
933
934        let params = KrakenFuturesBatchOrderParams::new(items);
935        let post_data = params
936            .to_body()
937            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
938
939        let endpoint = "/derivatives/api/v3/batchorder";
940        self.send_authenticated_post(endpoint, post_data).await
941    }
942
943    /// Cancels all open orders, optionally filtered by symbol (requires authentication).
944    pub async fn cancel_all_orders(
945        &self,
946        symbol: Option<String>,
947    ) -> anyhow::Result<FuturesCancelAllOrdersResponse, KrakenHttpError> {
948        if self.credential.is_none() {
949            return Err(KrakenHttpError::AuthenticationError(
950                "API credentials required for canceling orders".to_string(),
951            ));
952        }
953
954        let mut params = HashMap::new();
955
956        if let Some(sym) = symbol {
957            params.insert("symbol".to_string(), sym);
958        }
959
960        let endpoint = "/derivatives/api/v3/cancelallorders";
961        self.send_request_with_body(endpoint, params).await
962    }
963}
964
965/// High-level HTTP client for the Kraken Futures REST API.
966///
967/// This client wraps the raw client and provides Nautilus domain types.
968/// It maintains an instrument cache and uses it to parse venue responses
969/// into Nautilus domain objects.
970#[cfg_attr(
971    feature = "python",
972    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken", from_py_object)
973)]
974#[cfg_attr(
975    feature = "python",
976    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.kraken")
977)]
978pub struct KrakenFuturesHttpClient {
979    pub(crate) inner: Arc<KrakenFuturesRawHttpClient>,
980    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
981    clock: &'static AtomicTime,
982    cache_initialized: Arc<AtomicBool>,
983}
984
985impl Clone for KrakenFuturesHttpClient {
986    fn clone(&self) -> Self {
987        Self {
988            inner: self.inner.clone(),
989            instruments_cache: self.instruments_cache.clone(),
990            cache_initialized: self.cache_initialized.clone(),
991            clock: self.clock,
992        }
993    }
994}
995
996impl Default for KrakenFuturesHttpClient {
997    fn default() -> Self {
998        Self::new(
999            KrakenEnvironment::Mainnet,
1000            None,
1001            60,
1002            None,
1003            None,
1004            None,
1005            None,
1006            KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
1007        )
1008        .expect("Failed to create default KrakenFuturesHttpClient")
1009    }
1010}
1011
1012impl Debug for KrakenFuturesHttpClient {
1013    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1014        f.debug_struct(stringify!(KrakenFuturesHttpClient))
1015            .field("inner", &self.inner)
1016            .finish()
1017    }
1018}
1019
1020impl KrakenFuturesHttpClient {
1021    /// Creates a new [`KrakenFuturesHttpClient`].
1022    #[expect(clippy::too_many_arguments)]
1023    pub fn new(
1024        environment: KrakenEnvironment,
1025        base_url_override: Option<String>,
1026        timeout_secs: u64,
1027        max_retries: Option<u32>,
1028        retry_delay_ms: Option<u64>,
1029        retry_delay_max_ms: Option<u64>,
1030        proxy_url: Option<String>,
1031        max_requests_per_second: u32,
1032    ) -> anyhow::Result<Self> {
1033        Ok(Self {
1034            inner: Arc::new(KrakenFuturesRawHttpClient::new(
1035                environment,
1036                base_url_override,
1037                timeout_secs,
1038                max_retries,
1039                retry_delay_ms,
1040                retry_delay_max_ms,
1041                proxy_url,
1042                max_requests_per_second,
1043            )?),
1044            instruments_cache: Arc::new(AtomicMap::new()),
1045            cache_initialized: Arc::new(AtomicBool::new(false)),
1046            clock: get_atomic_clock_realtime(),
1047        })
1048    }
1049
1050    /// Creates a new [`KrakenFuturesHttpClient`] with credentials.
1051    #[expect(clippy::too_many_arguments)]
1052    pub fn with_credentials(
1053        api_key: String,
1054        api_secret: String,
1055        environment: KrakenEnvironment,
1056        base_url_override: Option<String>,
1057        timeout_secs: u64,
1058        max_retries: Option<u32>,
1059        retry_delay_ms: Option<u64>,
1060        retry_delay_max_ms: Option<u64>,
1061        proxy_url: Option<String>,
1062        max_requests_per_second: u32,
1063    ) -> anyhow::Result<Self> {
1064        Ok(Self {
1065            inner: Arc::new(KrakenFuturesRawHttpClient::with_credentials(
1066                api_key,
1067                api_secret,
1068                environment,
1069                base_url_override,
1070                timeout_secs,
1071                max_retries,
1072                retry_delay_ms,
1073                retry_delay_max_ms,
1074                proxy_url,
1075                max_requests_per_second,
1076            )?),
1077            instruments_cache: Arc::new(AtomicMap::new()),
1078            cache_initialized: Arc::new(AtomicBool::new(false)),
1079            clock: get_atomic_clock_realtime(),
1080        })
1081    }
1082
1083    /// Creates a new [`KrakenFuturesHttpClient`] loading credentials from environment variables.
1084    ///
1085    /// Looks for `KRAKEN_FUTURES_API_KEY` and `KRAKEN_FUTURES_API_SECRET` (mainnet)
1086    /// or `KRAKEN_FUTURES_DEMO_API_KEY` and `KRAKEN_FUTURES_DEMO_API_SECRET` (demo).
1087    ///
1088    /// Falls back to unauthenticated client if credentials are not set.
1089    #[expect(clippy::too_many_arguments)]
1090    pub fn from_env(
1091        environment: KrakenEnvironment,
1092        base_url_override: Option<String>,
1093        timeout_secs: u64,
1094        max_retries: Option<u32>,
1095        retry_delay_ms: Option<u64>,
1096        retry_delay_max_ms: Option<u64>,
1097        proxy_url: Option<String>,
1098        max_requests_per_second: u32,
1099    ) -> anyhow::Result<Self> {
1100        let demo = environment == KrakenEnvironment::Demo;
1101
1102        if let Some(credential) = KrakenCredential::from_env_futures(demo) {
1103            let (api_key, api_secret) = credential.into_parts();
1104            Self::with_credentials(
1105                api_key,
1106                api_secret,
1107                environment,
1108                base_url_override,
1109                timeout_secs,
1110                max_retries,
1111                retry_delay_ms,
1112                retry_delay_max_ms,
1113                proxy_url,
1114                max_requests_per_second,
1115            )
1116        } else {
1117            Self::new(
1118                environment,
1119                base_url_override,
1120                timeout_secs,
1121                max_retries,
1122                retry_delay_ms,
1123                retry_delay_max_ms,
1124                proxy_url,
1125                max_requests_per_second,
1126            )
1127        }
1128    }
1129
1130    /// Cancels all pending HTTP requests.
1131    pub fn cancel_all_requests(&self) {
1132        self.inner.cancel_all_requests();
1133    }
1134
1135    /// Returns the cancellation token for this client.
1136    pub fn cancellation_token(&self) -> &CancellationToken {
1137        self.inner.cancellation_token()
1138    }
1139
1140    /// Caches an instrument for symbol lookup.
1141    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1142        self.instruments_cache
1143            .insert(instrument.symbol().inner(), instrument);
1144        self.cache_initialized.store(true, Ordering::Release);
1145    }
1146
1147    /// Caches multiple instruments for symbol lookup.
1148    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1149        self.instruments_cache.rcu(|m| {
1150            for instrument in instruments {
1151                m.insert(instrument.symbol().inner(), instrument.clone());
1152            }
1153        });
1154        self.cache_initialized.store(true, Ordering::Release);
1155    }
1156
1157    /// Gets an instrument from the cache by symbol.
1158    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1159        self.instruments_cache.get_cloned(symbol)
1160    }
1161
1162    fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1163        self.instruments_cache
1164            .load()
1165            .values()
1166            .find(|inst| inst.raw_symbol().as_str() == raw_symbol)
1167            .cloned()
1168    }
1169
1170    fn generate_ts_init(&self) -> UnixNanos {
1171        self.clock.get_time_ns()
1172    }
1173
1174    /// Requests tradable instruments from Kraken Futures.
1175    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1176        let ts_init = self.generate_ts_init();
1177        let response = self.inner.get_instruments().await?;
1178
1179        let instruments: Vec<InstrumentAny> = response
1180            .instruments
1181            .iter()
1182            .filter_map(|fut_instrument| {
1183                match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
1184                    Ok(instrument) => Some(instrument),
1185                    Err(e) => {
1186                        let symbol = &fut_instrument.symbol;
1187                        log::warn!("Failed to parse futures instrument {symbol}: {e}");
1188                        None
1189                    }
1190                }
1191            })
1192            .collect();
1193
1194        Ok(instruments)
1195    }
1196
1197    /// Requests the current market status for Kraken Futures instruments.
1198    pub async fn request_instrument_statuses(
1199        &self,
1200    ) -> anyhow::Result<AHashMap<InstrumentId, MarketStatusAction>, KrakenHttpError> {
1201        let response = self.inner.get_instruments().await?;
1202
1203        Ok(response
1204            .instruments
1205            .iter()
1206            .map(|instrument| {
1207                let instrument_id =
1208                    InstrumentId::new(Symbol::new(&instrument.symbol), *KRAKEN_VENUE);
1209                let action = if instrument.tradeable {
1210                    MarketStatusAction::Trading
1211                } else {
1212                    MarketStatusAction::NotAvailableForTrading
1213                };
1214
1215                (instrument_id, action)
1216            })
1217            .collect())
1218    }
1219
1220    /// Requests the mark price for an instrument.
1221    pub async fn request_mark_price(
1222        &self,
1223        instrument_id: InstrumentId,
1224    ) -> anyhow::Result<f64, KrakenHttpError> {
1225        let instrument = self
1226            .get_cached_instrument(&instrument_id.symbol.inner())
1227            .ok_or_else(|| {
1228                KrakenHttpError::ParseError(format!(
1229                    "Instrument not found in cache: {instrument_id}"
1230                ))
1231            })?;
1232
1233        let raw_symbol = instrument.raw_symbol().to_string();
1234        let tickers = self.inner.get_tickers().await?;
1235
1236        tickers
1237            .tickers
1238            .iter()
1239            .find(|t| t.symbol == raw_symbol)
1240            .ok_or_else(|| {
1241                KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1242            })
1243            .and_then(|t| {
1244                t.mark_price.ok_or_else(|| {
1245                    KrakenHttpError::ParseError(format!(
1246                        "Mark price not available for {raw_symbol} (may not be available in testnet)"
1247                    ))
1248                })
1249            })
1250    }
1251
1252    pub async fn request_index_price(
1253        &self,
1254        instrument_id: InstrumentId,
1255    ) -> anyhow::Result<f64, KrakenHttpError> {
1256        let instrument = self
1257            .get_cached_instrument(&instrument_id.symbol.inner())
1258            .ok_or_else(|| {
1259                KrakenHttpError::ParseError(format!(
1260                    "Instrument not found in cache: {instrument_id}"
1261                ))
1262            })?;
1263
1264        let raw_symbol = instrument.raw_symbol().to_string();
1265        let tickers = self.inner.get_tickers().await?;
1266
1267        tickers
1268            .tickers
1269            .iter()
1270            .find(|t| t.symbol == raw_symbol)
1271            .ok_or_else(|| {
1272                KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1273            })
1274            .and_then(|t| {
1275                t.index_price.ok_or_else(|| {
1276                    KrakenHttpError::ParseError(format!(
1277                        "Index price not available for {raw_symbol} (may not be available in testnet)"
1278                    ))
1279                })
1280            })
1281    }
1282
1283    pub async fn request_trades(
1284        &self,
1285        instrument_id: InstrumentId,
1286        start: Option<DateTime<Utc>>,
1287        end: Option<DateTime<Utc>>,
1288        limit: Option<u64>,
1289    ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1290        let instrument = self
1291            .get_cached_instrument(&instrument_id.symbol.inner())
1292            .ok_or_else(|| {
1293                KrakenHttpError::ParseError(format!(
1294                    "Instrument not found in cache: {instrument_id}"
1295                ))
1296            })?;
1297
1298        let raw_symbol = instrument.raw_symbol().to_string();
1299        let ts_init = self.generate_ts_init();
1300
1301        let since = start.map(|dt| dt.timestamp_millis());
1302        let before = end.map(|dt| dt.timestamp_millis());
1303
1304        let response = self
1305            .inner
1306            .get_public_executions(&raw_symbol, since, before, Some("asc"), None)
1307            .await?;
1308
1309        let mut trades = Vec::new();
1310
1311        for element in &response.elements {
1312            let execution = &element.event.execution.execution;
1313            match parse_futures_public_execution(execution, &instrument, ts_init) {
1314                Ok(trade_tick) => {
1315                    trades.push(trade_tick);
1316
1317                    if let Some(limit_count) = limit
1318                        && trades.len() >= limit_count as usize
1319                    {
1320                        return Ok(trades);
1321                    }
1322                }
1323                Err(e) => {
1324                    log::warn!("Failed to parse futures trade tick: {e}");
1325                }
1326            }
1327        }
1328
1329        Ok(trades)
1330    }
1331
1332    pub async fn request_bars(
1333        &self,
1334        bar_type: BarType,
1335        start: Option<DateTime<Utc>>,
1336        end: Option<DateTime<Utc>>,
1337        limit: Option<u64>,
1338    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1339        let instrument_id = bar_type.instrument_id();
1340        let instrument = self
1341            .get_cached_instrument(&instrument_id.symbol.inner())
1342            .ok_or_else(|| {
1343                KrakenHttpError::ParseError(format!(
1344                    "Instrument not found in cache: {instrument_id}"
1345                ))
1346            })?;
1347
1348        let raw_symbol = instrument.raw_symbol().to_string();
1349        let ts_init = self.generate_ts_init();
1350        let tick_type = "trade";
1351        let resolution = bar_type_to_futures_resolution(bar_type)
1352            .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
1353
1354        // Kraken Futures OHLC API expects Unix timestamp in seconds
1355        let from = start.map(|dt| dt.timestamp());
1356        let to = end.map(|dt| dt.timestamp());
1357        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1358
1359        let response = self
1360            .inner
1361            .get_ohlc(tick_type, &raw_symbol, resolution, from, to)
1362            .await?;
1363
1364        let mut bars = Vec::new();
1365
1366        for candle in response.candles {
1367            let ohlc = OhlcData {
1368                time: candle.time / 1000,
1369                open: candle.open,
1370                high: candle.high,
1371                low: candle.low,
1372                close: candle.close,
1373                vwap: "0".to_string(),
1374                volume: candle.volume,
1375                count: 0,
1376            };
1377
1378            match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1379                Ok(bar) => {
1380                    if let Some(end_nanos) = end_ns
1381                        && bar.ts_event.as_u64() > end_nanos
1382                    {
1383                        continue;
1384                    }
1385                    bars.push(bar);
1386
1387                    if let Some(limit_count) = limit
1388                        && bars.len() >= limit_count as usize
1389                    {
1390                        return Ok(bars);
1391                    }
1392                }
1393                Err(e) => {
1394                    log::warn!("Failed to parse futures bar: {e}");
1395                }
1396            }
1397        }
1398
1399        Ok(bars)
1400    }
1401
1402    /// Requests an order book snapshot for a futures instrument.
1403    pub async fn request_book_snapshot(
1404        &self,
1405        instrument_id: InstrumentId,
1406        depth: Option<u32>,
1407    ) -> anyhow::Result<OrderBook, KrakenHttpError> {
1408        let instrument = self
1409            .get_cached_instrument(&instrument_id.symbol.inner())
1410            .ok_or_else(|| {
1411                KrakenHttpError::ParseError(format!(
1412                    "Instrument not found in cache: {instrument_id}"
1413                ))
1414            })?;
1415
1416        let raw_symbol = instrument.raw_symbol().to_string();
1417        let price_precision = instrument.price_precision();
1418        let size_precision = instrument.size_precision();
1419        let ts_event = self.generate_ts_init();
1420
1421        let response = self.inner.get_orderbook(&raw_symbol).await?;
1422        let book_data = &response.order_book;
1423
1424        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1425
1426        let bid_limit = depth.map_or(book_data.bids.len(), |d| {
1427            (d as usize).min(book_data.bids.len())
1428        });
1429        let ask_limit = depth.map_or(book_data.asks.len(), |d| {
1430            (d as usize).min(book_data.asks.len())
1431        });
1432
1433        // Pass sequence=0 so the snapshot does not advance the book's high-water sequence,
1434        // the WS subscription owns sequencing once it starts streaming deltas.
1435        for (i, level) in book_data.bids.iter().take(bid_limit).enumerate() {
1436            let price = Price::new(level.price, price_precision);
1437            let size = Quantity::new(level.qty, size_precision);
1438            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1439            book.add(order, 0, 0, ts_event);
1440        }
1441
1442        for (i, level) in book_data.asks.iter().take(ask_limit).enumerate() {
1443            let price = Price::new(level.price, price_precision);
1444            let size = Quantity::new(level.qty, size_precision);
1445            let order = BookOrder::new(OrderSide::Sell, price, size, (bid_limit + i) as u64);
1446            book.add(order, 0, 0, ts_event);
1447        }
1448
1449        Ok(book)
1450    }
1451
1452    /// Requests historical funding rates for a futures instrument.
1453    ///
1454    /// Kraken returns all available rates; client-side filtering applies
1455    /// the `start`, `end`, and `limit` constraints from the caller.
1456    pub async fn request_funding_rates(
1457        &self,
1458        instrument_id: InstrumentId,
1459        start: Option<DateTime<Utc>>,
1460        end: Option<DateTime<Utc>>,
1461        limit: Option<usize>,
1462    ) -> anyhow::Result<Vec<FundingRateUpdate>, KrakenHttpError> {
1463        let instrument = self
1464            .get_cached_instrument(&instrument_id.symbol.inner())
1465            .ok_or_else(|| {
1466                KrakenHttpError::ParseError(format!(
1467                    "Instrument not found in cache: {instrument_id}"
1468                ))
1469            })?;
1470
1471        let raw_symbol = instrument.raw_symbol().to_string();
1472        let ts_init = self.generate_ts_init();
1473        let start_ns = start.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1474        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1475
1476        let response = self.inner.get_historical_funding_rates(&raw_symbol).await?;
1477
1478        let mut rates = Vec::new();
1479
1480        for entry in &response.rates {
1481            let ts_event = entry
1482                .timestamp
1483                .parse::<DateTime<Utc>>()
1484                .map_or(ts_init, |dt| {
1485                    UnixNanos::from(dt.timestamp_nanos_opt().unwrap_or(0) as u64)
1486                });
1487
1488            if let Some(s) = start_ns
1489                && ts_event.as_u64() < s
1490            {
1491                continue;
1492            }
1493
1494            if let Some(e) = end_ns
1495                && ts_event.as_u64() > e
1496            {
1497                continue;
1498            }
1499
1500            let Some(rate) = Decimal::from_f64(entry.relative_funding_rate) else {
1501                continue;
1502            };
1503
1504            rates.push(FundingRateUpdate::new(
1505                instrument_id,
1506                rate,
1507                None,
1508                None,
1509                ts_event,
1510                ts_init,
1511            ));
1512
1513            if let Some(lim) = limit
1514                && rates.len() >= lim
1515            {
1516                break;
1517            }
1518        }
1519
1520        // Kraken returns newest-first; reverse to ascending chronological order
1521        rates.reverse();
1522
1523        Ok(rates)
1524    }
1525
1526    /// Requests account state from the Kraken Futures exchange.
1527    ///
1528    /// This queries the accounts endpoint and converts the response into a
1529    /// Nautilus `AccountState` event containing balances and margin info.
1530    ///
1531    /// # Errors
1532    ///
1533    /// Returns an error if:
1534    /// - Credentials are missing.
1535    /// - The request fails.
1536    /// - Response parsing fails.
1537    pub async fn request_account_state(
1538        &self,
1539        account_id: AccountId,
1540    ) -> anyhow::Result<AccountState> {
1541        let accounts_response = self.inner.get_accounts().await?;
1542
1543        if accounts_response.result != KrakenApiResult::Success {
1544            let error_msg = accounts_response
1545                .error
1546                .unwrap_or_else(|| "Unknown error".to_string());
1547            anyhow::bail!("Failed to get futures accounts: {error_msg}");
1548        }
1549
1550        let ts_init = self.generate_ts_init();
1551
1552        let mut balances: Vec<AccountBalance> = Vec::new();
1553        let mut margins: Vec<MarginBalance> = Vec::new();
1554
1555        for account in accounts_response.accounts.values() {
1556            match account.account_type {
1557                KrakenFuturesAccountType::MultiCollateralMarginAccount => {
1558                    parse_multi_collateral_balances(account, &mut balances);
1559                    parse_multi_collateral_margins(account, &mut margins);
1560                }
1561                KrakenFuturesAccountType::MarginAccount => {
1562                    parse_margin_account_balances(account, &mut balances);
1563                    parse_margin_account_margins(account, &mut margins);
1564                }
1565                KrakenFuturesAccountType::CashAccount => {
1566                    parse_cash_account_balances(account, &mut balances);
1567                }
1568                KrakenFuturesAccountType::Unknown => {
1569                    log::debug!("Unknown account type: {:?}", account.account_type);
1570                }
1571            }
1572        }
1573
1574        Ok(AccountState::new(
1575            account_id,
1576            AccountType::Margin,
1577            balances,
1578            margins,
1579            true,
1580            UUID4::new(),
1581            ts_init,
1582            ts_init,
1583            None,
1584        ))
1585    }
1586
1587    pub async fn request_order_status_reports(
1588        &self,
1589        account_id: AccountId,
1590        instrument_id: Option<InstrumentId>,
1591        start: Option<DateTime<Utc>>,
1592        end: Option<DateTime<Utc>>,
1593        open_only: bool,
1594    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1595        let ts_init = self.generate_ts_init();
1596        let mut all_reports = Vec::new();
1597
1598        let response = self
1599            .inner
1600            .get_open_orders()
1601            .await
1602            .map_err(|e| anyhow::anyhow!("get_open_orders failed: {e}"))?;
1603
1604        if response.result != KrakenApiResult::Success {
1605            let error_msg = response
1606                .error
1607                .unwrap_or_else(|| "Unknown error".to_string());
1608            anyhow::bail!("Failed to get open orders: {error_msg}");
1609        }
1610
1611        for order in &response.open_orders {
1612            if let Some(ref target_id) = instrument_id {
1613                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1614                if let Some(inst) = instrument
1615                    && inst.raw_symbol().as_str() != order.symbol
1616                {
1617                    continue;
1618                }
1619            }
1620
1621            if let Some(instrument) = self.get_instrument_by_raw_symbol(&order.symbol) {
1622                match parse_futures_order_status_report(order, &instrument, account_id, ts_init) {
1623                    Ok(report) => all_reports.push(report),
1624                    Err(e) => {
1625                        let order_id = &order.order_id;
1626                        log::warn!("Failed to parse futures order {order_id}: {e}");
1627                    }
1628                }
1629            }
1630        }
1631
1632        if !open_only {
1633            // Kraken Futures order events API expects Unix timestamp in milliseconds
1634            let start_ms = start.map(|dt| dt.timestamp_millis());
1635            let end_ms = end.map(|dt| dt.timestamp_millis());
1636            let response = self
1637                .inner
1638                .get_order_events(end_ms, start_ms, None)
1639                .await
1640                .map_err(|e| anyhow::anyhow!("get_order_events failed: {e}"))?;
1641
1642            for event_wrapper in response.order_events {
1643                let event = &event_wrapper.order;
1644
1645                if let Some(ref target_id) = instrument_id {
1646                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1647                    if let Some(inst) = instrument
1648                        && inst.raw_symbol().as_str() != event.symbol
1649                    {
1650                        continue;
1651                    }
1652                }
1653
1654                if let Some(instrument) = self.get_instrument_by_raw_symbol(&event.symbol) {
1655                    match parse_futures_order_event_status_report(
1656                        event,
1657                        Some(event_wrapper.event_type),
1658                        &instrument,
1659                        account_id,
1660                        ts_init,
1661                    ) {
1662                        Ok(report) => all_reports.push(report),
1663                        Err(e) => {
1664                            let order_id = &event.order_id;
1665                            log::warn!("Failed to parse futures order event {order_id}: {e}");
1666                        }
1667                    }
1668                }
1669            }
1670        }
1671
1672        Ok(all_reports)
1673    }
1674
1675    pub async fn request_fill_reports(
1676        &self,
1677        account_id: AccountId,
1678        instrument_id: Option<InstrumentId>,
1679        start: Option<DateTime<Utc>>,
1680        end: Option<DateTime<Utc>>,
1681    ) -> anyhow::Result<Vec<FillReport>> {
1682        let ts_init = self.generate_ts_init();
1683        let mut all_reports = Vec::new();
1684
1685        let response = self.inner.get_fills(None).await?;
1686        if response.result != KrakenApiResult::Success {
1687            let error_msg = response
1688                .error
1689                .unwrap_or_else(|| "Unknown error".to_string());
1690            anyhow::bail!("Failed to get fills: {error_msg}");
1691        }
1692
1693        let start_ms = start.map(|dt| dt.timestamp_millis());
1694        let end_ms = end.map(|dt| dt.timestamp_millis());
1695
1696        for fill in response.fills {
1697            if let Some(start_threshold) = start_ms
1698                && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1699            {
1700                let fill_ms = fill_ts.timestamp_millis();
1701                if fill_ms < start_threshold {
1702                    continue;
1703                }
1704            }
1705
1706            if let Some(end_threshold) = end_ms
1707                && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1708            {
1709                let fill_ms = fill_ts.timestamp_millis();
1710                if fill_ms > end_threshold {
1711                    continue;
1712                }
1713            }
1714
1715            if let Some(ref target_id) = instrument_id {
1716                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1717                if let Some(inst) = instrument
1718                    && inst.raw_symbol().as_str() != fill.symbol
1719                {
1720                    continue;
1721                }
1722            }
1723
1724            if let Some(instrument) = self.get_instrument_by_raw_symbol(&fill.symbol) {
1725                match parse_futures_fill_report(&fill, &instrument, account_id, ts_init) {
1726                    Ok(report) => all_reports.push(report),
1727                    Err(e) => {
1728                        let fill_id = &fill.fill_id;
1729                        log::warn!("Failed to parse futures fill {fill_id}: {e}");
1730                    }
1731                }
1732            }
1733        }
1734
1735        Ok(all_reports)
1736    }
1737
1738    pub async fn request_position_status_reports(
1739        &self,
1740        account_id: AccountId,
1741        instrument_id: Option<InstrumentId>,
1742    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1743        let ts_init = self.generate_ts_init();
1744        let mut all_reports = Vec::new();
1745
1746        let response = self.inner.get_open_positions().await?;
1747        if response.result != KrakenApiResult::Success {
1748            let error_msg = response
1749                .error
1750                .unwrap_or_else(|| "Unknown error".to_string());
1751            anyhow::bail!("Failed to get open positions: {error_msg}");
1752        }
1753
1754        for position in response.open_positions {
1755            if let Some(ref target_id) = instrument_id {
1756                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1757                if let Some(inst) = instrument
1758                    && inst.raw_symbol().as_str() != position.symbol
1759                {
1760                    continue;
1761                }
1762            }
1763
1764            if let Some(instrument) = self.get_instrument_by_raw_symbol(&position.symbol) {
1765                match parse_futures_position_status_report(
1766                    &position,
1767                    &instrument,
1768                    account_id,
1769                    ts_init,
1770                ) {
1771                    Ok(report) => all_reports.push(report),
1772                    Err(e) => {
1773                        let symbol = &position.symbol;
1774                        log::warn!("Failed to parse futures position {symbol}: {e}");
1775                    }
1776                }
1777            }
1778        }
1779
1780        Ok(all_reports)
1781    }
1782
1783    #[expect(clippy::too_many_arguments)]
1784    fn build_send_order_params(
1785        &self,
1786        instrument_id: InstrumentId,
1787        client_order_id: ClientOrderId,
1788        order_side: OrderSide,
1789        order_type: OrderType,
1790        quantity: Quantity,
1791        time_in_force: TimeInForce,
1792        price: Option<Price>,
1793        trigger_price: Option<Price>,
1794        trigger_type: Option<TriggerType>,
1795        reduce_only: bool,
1796        post_only: bool,
1797    ) -> anyhow::Result<KrakenFuturesSendOrderParams> {
1798        let instrument = self
1799            .get_cached_instrument(&instrument_id.symbol.inner())
1800            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1801
1802        let raw_symbol = instrument.raw_symbol().inner();
1803
1804        // Map order type and time-in-force to Kraken order type
1805        // Kraken Futures encodes TIF in the orderType field:
1806        // - lmt = limit (GTC)
1807        // - ioc = immediate-or-cancel
1808        // - post = post-only (maker only)
1809        // - mkt = market
1810        let kraken_order_type = match order_type {
1811            OrderType::Market => KrakenFuturesOrderType::Market,
1812            OrderType::Limit => {
1813                if post_only {
1814                    KrakenFuturesOrderType::Post
1815                } else {
1816                    match time_in_force {
1817                        TimeInForce::Ioc => KrakenFuturesOrderType::Ioc,
1818                        TimeInForce::Fok => {
1819                            anyhow::bail!("FOK not supported by Kraken Futures, use IOC instead")
1820                        }
1821                        TimeInForce::Gtd => {
1822                            anyhow::bail!("GTD not supported by Kraken Futures, use GTC instead")
1823                        }
1824                        _ => KrakenFuturesOrderType::Limit, // GTC is default
1825                    }
1826                }
1827            }
1828            OrderType::StopMarket | OrderType::StopLimit => KrakenFuturesOrderType::Stop,
1829            OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
1830                KrakenFuturesOrderType::TakeProfit
1831            }
1832            _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1833        };
1834
1835        let kraken_side: KrakenOrderSide = order_side
1836            .try_into()
1837            .map_err(|e| anyhow::anyhow!("Invalid order side: {e}"))?;
1838
1839        let mut builder = KrakenFuturesSendOrderParamsBuilder::default();
1840        builder
1841            .cli_ord_id(truncate_cl_ord_id(&client_order_id))
1842            .broker(NAUTILUS_KRAKEN_BROKER_ID)
1843            .symbol(raw_symbol)
1844            .side(kraken_side)
1845            .size(quantity.to_string())
1846            .order_type(kraken_order_type);
1847
1848        if matches!(
1849            order_type,
1850            OrderType::StopMarket
1851                | OrderType::StopLimit
1852                | OrderType::MarketIfTouched
1853                | OrderType::LimitIfTouched
1854        ) && let Some(signal) = map_futures_trigger_signal(trigger_type)?
1855        {
1856            builder.trigger_signal(signal);
1857        }
1858
1859        match order_type {
1860            OrderType::StopMarket => {
1861                if let Some(trigger) = trigger_price {
1862                    builder.stop_price(trigger.to_string());
1863                }
1864            }
1865            OrderType::StopLimit => {
1866                if let Some(trigger) = trigger_price {
1867                    builder.stop_price(trigger.to_string());
1868                }
1869
1870                if let Some(limit) = price {
1871                    builder.limit_price(limit.to_string());
1872                }
1873            }
1874            OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
1875                if let Some(trigger) = trigger_price {
1876                    builder.stop_price(trigger.to_string());
1877                }
1878
1879                if let Some(limit) = price {
1880                    builder.limit_price(limit.to_string());
1881                }
1882            }
1883            _ => {
1884                if let Some(limit) = price {
1885                    builder.limit_price(limit.to_string());
1886                }
1887            }
1888        }
1889
1890        if reduce_only {
1891            builder.reduce_only(true);
1892        }
1893
1894        builder
1895            .build()
1896            .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))
1897    }
1898
1899    /// Submits a new order to the Kraken Futures exchange.
1900    ///
1901    /// # Errors
1902    ///
1903    /// Returns an error if:
1904    /// - Credentials are missing.
1905    /// - The instrument is not found in cache.
1906    /// - The order type or time in force is not supported.
1907    /// - The request fails.
1908    /// - The order is rejected.
1909    #[expect(clippy::too_many_arguments)]
1910    pub async fn submit_order(
1911        &self,
1912        account_id: AccountId,
1913        instrument_id: InstrumentId,
1914        client_order_id: ClientOrderId,
1915        order_side: OrderSide,
1916        order_type: OrderType,
1917        quantity: Quantity,
1918        time_in_force: TimeInForce,
1919        price: Option<Price>,
1920        trigger_price: Option<Price>,
1921        trigger_type: Option<TriggerType>,
1922        reduce_only: bool,
1923        post_only: bool,
1924    ) -> anyhow::Result<OrderStatusReport> {
1925        let instrument = self
1926            .get_cached_instrument(&instrument_id.symbol.inner())
1927            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1928
1929        let params = self.build_send_order_params(
1930            instrument_id,
1931            client_order_id,
1932            order_side,
1933            order_type,
1934            quantity,
1935            time_in_force,
1936            price,
1937            trigger_price,
1938            trigger_type,
1939            reduce_only,
1940            post_only,
1941        )?;
1942
1943        let response = self.inner.send_order_params(&params).await?;
1944
1945        if response.result != KrakenApiResult::Success {
1946            let error_msg = response
1947                .error
1948                .unwrap_or_else(|| "Unknown error".to_string());
1949            anyhow::bail!("Order submission failed: {error_msg}");
1950        }
1951
1952        let send_status = response
1953            .send_status
1954            .ok_or_else(|| anyhow::anyhow!("No send_status in successful response"))?;
1955
1956        let status = &send_status.status;
1957
1958        // Check for post-only rejection (Kraken returns status="postWouldExecute")
1959        if status == "postWouldExecute" {
1960            let reason = send_status
1961                .order_events
1962                .as_ref()
1963                .and_then(|events| events.first())
1964                .and_then(|e| e.reason.clone())
1965                .unwrap_or_else(|| "Post-only order would have crossed".to_string());
1966            anyhow::bail!("POST_ONLY_REJECTED: {reason}");
1967        }
1968
1969        let venue_order_id = send_status
1970            .order_id
1971            .ok_or_else(|| anyhow::anyhow!("No order_id in send_status: {status}"))?;
1972
1973        let ts_init = self.generate_ts_init();
1974
1975        let open_orders_response = self.inner.get_open_orders().await?;
1976        if let Some(order) = open_orders_response
1977            .open_orders
1978            .iter()
1979            .find(|o| o.order_id == venue_order_id)
1980        {
1981            return parse_futures_order_status_report(order, &instrument, account_id, ts_init);
1982        }
1983
1984        // Order not in open orders - may have filled immediately (market order or aggressive limit)
1985        // Try to use order_events from send_status first
1986        if let Some(order_events) = &send_status.order_events
1987            && let Some(send_event) = order_events.first()
1988        {
1989            // Handle regular orders, trigger orders, and execution events
1990            let event = if let Some(order_data) = &send_event.order {
1991                FuturesOrderEvent {
1992                    order_id: order_data.order_id.clone(),
1993                    cli_ord_id: order_data.cli_ord_id.clone(),
1994                    order_type: order_data.order_type,
1995                    symbol: order_data.symbol.clone(),
1996                    side: order_data.side,
1997                    quantity: order_data.quantity,
1998                    filled: order_data.filled,
1999                    limit_price: order_data.limit_price,
2000                    stop_price: order_data.stop_price,
2001                    timestamp: order_data.timestamp.clone(),
2002                    last_update_timestamp: order_data.last_update_timestamp.clone(),
2003                    reduce_only: order_data.reduce_only,
2004                }
2005            } else if let Some(trigger_data) = &send_event.order_trigger {
2006                FuturesOrderEvent {
2007                    order_id: trigger_data.uid.clone(),
2008                    cli_ord_id: trigger_data.client_id.clone(),
2009                    order_type: trigger_data.order_type,
2010                    symbol: trigger_data.symbol.clone(),
2011                    side: trigger_data.side,
2012                    quantity: trigger_data.quantity,
2013                    filled: 0.0,
2014                    limit_price: trigger_data.limit_price,
2015                    stop_price: Some(trigger_data.trigger_price),
2016                    timestamp: trigger_data.timestamp.clone(),
2017                    last_update_timestamp: trigger_data.last_update_timestamp.clone(),
2018                    reduce_only: trigger_data.reduce_only,
2019                }
2020            } else if let Some(prior_exec) = &send_event.order_prior_execution {
2021                // EXECUTION event - use orderPriorExecution data
2022                FuturesOrderEvent {
2023                    order_id: prior_exec.order_id.clone(),
2024                    cli_ord_id: prior_exec.cli_ord_id.clone(),
2025                    order_type: prior_exec.order_type,
2026                    symbol: prior_exec.symbol.clone(),
2027                    side: prior_exec.side,
2028                    quantity: prior_exec.quantity,
2029                    filled: send_event.amount.unwrap_or(prior_exec.quantity), // Use execution amount
2030                    limit_price: prior_exec.limit_price,
2031                    stop_price: prior_exec.stop_price,
2032                    timestamp: prior_exec.timestamp.clone(),
2033                    last_update_timestamp: prior_exec.last_update_timestamp.clone(),
2034                    reduce_only: prior_exec.reduce_only,
2035                }
2036            } else {
2037                anyhow::bail!("No order, orderTrigger, or orderPriorExecution data in event");
2038            };
2039            return parse_futures_order_event_status_report(
2040                &event,
2041                Some(send_event.event_type),
2042                &instrument,
2043                account_id,
2044                ts_init,
2045            );
2046        }
2047
2048        // Fall back to querying order events
2049        let events_response = self.inner.get_order_events(None, None, None).await?;
2050        let event_wrapper = events_response
2051            .order_events
2052            .iter()
2053            .find(|e| e.order.order_id == venue_order_id)
2054            .ok_or_else(|| {
2055                anyhow::anyhow!("Order not found in open orders or events: {venue_order_id}")
2056            })?;
2057
2058        parse_futures_order_event_status_report(
2059            &event_wrapper.order,
2060            Some(event_wrapper.event_type),
2061            &instrument,
2062            account_id,
2063            ts_init,
2064        )
2065    }
2066
2067    /// Modifies an existing order on the Kraken Futures exchange.
2068    ///
2069    /// Returns the new venue order ID assigned to the modified order.
2070    ///
2071    /// # Errors
2072    ///
2073    /// Returns an error if:
2074    /// - Neither `client_order_id` nor `venue_order_id` is provided.
2075    /// - The instrument is not found in cache.
2076    /// - The request fails.
2077    /// - The edit fails on the exchange.
2078    pub async fn modify_order(
2079        &self,
2080        instrument_id: InstrumentId,
2081        client_order_id: Option<ClientOrderId>,
2082        venue_order_id: Option<VenueOrderId>,
2083        quantity: Option<Quantity>,
2084        price: Option<Price>,
2085        trigger_price: Option<Price>,
2086    ) -> anyhow::Result<VenueOrderId> {
2087        let params = self.build_edit_order_params(
2088            instrument_id,
2089            client_order_id,
2090            venue_order_id,
2091            quantity,
2092            price,
2093            trigger_price,
2094        )?;
2095        let original_order_id = params.order_id.clone();
2096
2097        let response = self.inner.edit_order(&params).await?;
2098
2099        if response.result != KrakenApiResult::Success {
2100            let status = &response.edit_status.status;
2101            anyhow::bail!("Order modification failed: {status}");
2102        }
2103
2104        // Return the new order_id from the response, or fall back to the original
2105        let new_venue_order_id = response
2106            .edit_status
2107            .order_id
2108            .or(original_order_id)
2109            .ok_or_else(|| anyhow::anyhow!("No order ID in edit order response"))?;
2110
2111        Ok(VenueOrderId::new(&new_venue_order_id))
2112    }
2113
2114    /// Cancels an order on the Kraken Futures exchange.
2115    ///
2116    /// # Errors
2117    ///
2118    /// Returns an error if:
2119    /// - Credentials are missing.
2120    /// - Neither client_order_id nor venue_order_id is provided.
2121    /// - The request fails.
2122    /// - The order cancellation is rejected.
2123    pub async fn cancel_order(
2124        &self,
2125        _account_id: AccountId,
2126        instrument_id: InstrumentId,
2127        client_order_id: Option<ClientOrderId>,
2128        venue_order_id: Option<VenueOrderId>,
2129    ) -> anyhow::Result<()> {
2130        let _ = self
2131            .get_cached_instrument(&instrument_id.symbol.inner())
2132            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2133
2134        let order_id = venue_order_id.as_ref().map(|id| id.to_string());
2135        let cli_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2136
2137        if order_id.is_none() && cli_ord_id.is_none() {
2138            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2139        }
2140
2141        let response = self.inner.cancel_order(order_id, cli_ord_id).await?;
2142
2143        if response.result != KrakenApiResult::Success {
2144            let status = &response.cancel_status.status;
2145            anyhow::bail!("Order cancellation failed: {status}");
2146        }
2147
2148        Ok(())
2149    }
2150
2151    /// Cancels multiple orders on the Kraken Futures exchange.
2152    ///
2153    /// Automatically chunks requests into batches of 50 orders.
2154    ///
2155    /// # Parameters
2156    /// - `venue_order_ids` - List of venue order IDs to cancel.
2157    ///
2158    /// # Returns
2159    /// The total number of successfully cancelled orders.
2160    pub async fn cancel_orders_batch(
2161        &self,
2162        venue_order_ids: Vec<VenueOrderId>,
2163    ) -> anyhow::Result<usize> {
2164        if venue_order_ids.is_empty() {
2165            return Ok(0);
2166        }
2167
2168        let mut total_cancelled = 0;
2169
2170        for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
2171            let order_ids: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
2172            let response = self.inner.cancel_orders_batch(order_ids).await?;
2173
2174            if response.result != KrakenApiResult::Success {
2175                let error_msg = response.error.as_deref().unwrap_or("Unknown error");
2176                anyhow::bail!("Batch cancel failed: {error_msg}");
2177            }
2178
2179            let success_count = response
2180                .batch_status
2181                .iter()
2182                .filter(|s| {
2183                    s.status == Some(KrakenSendStatus::Cancelled)
2184                        || s.cancel_status
2185                            .as_ref()
2186                            .is_some_and(|cs| cs.status == KrakenSendStatus::Cancelled)
2187                })
2188                .count();
2189
2190            total_cancelled += success_count;
2191        }
2192
2193        Ok(total_cancelled)
2194    }
2195
2196    /// Submits multiple orders in a single batch request.
2197    ///
2198    /// Builds batch send items from order parameters, chunks at the batch limit,
2199    /// and returns per-item send statuses.
2200    ///
2201    /// # Errors
2202    ///
2203    /// Returns an error if the batch request fails at the API level.
2204    #[expect(clippy::type_complexity)]
2205    pub async fn submit_orders_batch(
2206        &self,
2207        orders: Vec<(
2208            InstrumentId,
2209            ClientOrderId,
2210            OrderSide,
2211            OrderType,
2212            Quantity,
2213            TimeInForce,
2214            Option<Price>,
2215            Option<Price>,
2216            Option<TriggerType>,
2217            bool,
2218            bool,
2219        )>,
2220    ) -> anyhow::Result<Vec<FuturesSendStatus>> {
2221        let count = orders.len();
2222        if count == 0 {
2223            return Ok(Vec::new());
2224        }
2225
2226        // Build params per-item, collecting validation errors individually
2227        // so one invalid order does not block the valid ones
2228        let mut all_statuses: Vec<Option<FuturesSendStatus>> = vec![None; count];
2229        let mut valid_items = Vec::with_capacity(count);
2230        let mut valid_indices = Vec::with_capacity(count);
2231
2232        for (
2233            idx,
2234            (
2235                instrument_id,
2236                client_order_id,
2237                order_side,
2238                order_type,
2239                quantity,
2240                time_in_force,
2241                price,
2242                trigger_price,
2243                trigger_type,
2244                reduce_only,
2245                post_only,
2246            ),
2247        ) in orders.into_iter().enumerate()
2248        {
2249            match self.build_send_order_params(
2250                instrument_id,
2251                client_order_id,
2252                order_side,
2253                order_type,
2254                quantity,
2255                time_in_force,
2256                price,
2257                trigger_price,
2258                trigger_type,
2259                reduce_only,
2260                post_only,
2261            ) {
2262                Ok(params) => {
2263                    valid_items.push(KrakenFuturesBatchSendItem::from_params(
2264                        params,
2265                        idx.to_string(),
2266                    ));
2267                    valid_indices.push(idx);
2268                }
2269                Err(e) => {
2270                    all_statuses[idx] = Some(FuturesSendStatus {
2271                        order_id: None,
2272                        status: format!("validation_error: {e}"),
2273                        order_events: None,
2274                        cli_ord_id: None,
2275                        received_time: None,
2276                    });
2277                }
2278            }
2279        }
2280
2281        if valid_items.is_empty() {
2282            return Ok(all_statuses.into_iter().flatten().collect());
2283        }
2284
2285        let mut batch_statuses: Vec<FuturesSendStatus> = Vec::with_capacity(valid_items.len());
2286
2287        for chunk in valid_items.chunks(BATCH_ORDER_LIMIT) {
2288            match self.inner.submit_orders_batch(chunk.to_vec()).await {
2289                Ok(response) => {
2290                    if response.result == KrakenApiResult::Success {
2291                        batch_statuses.extend(response.batch_status);
2292                    } else {
2293                        let error_msg = response
2294                            .batch_status
2295                            .first()
2296                            .map_or("Unknown error", |s| s.status.as_str());
2297
2298                        for _ in 0..chunk.len() {
2299                            batch_statuses.push(FuturesSendStatus {
2300                                order_id: None,
2301                                status: format!("api_error: {error_msg}"),
2302                                order_events: None,
2303                                cli_ord_id: None,
2304                                received_time: None,
2305                            });
2306                        }
2307                    }
2308                }
2309                Err(e) => {
2310                    // Fill remaining valid items with error statuses
2311                    let remaining = valid_items.len() - batch_statuses.len();
2312                    for _ in 0..remaining {
2313                        batch_statuses.push(FuturesSendStatus {
2314                            order_id: None,
2315                            status: format!("batch_error: {e}"),
2316                            order_events: None,
2317                            cli_ord_id: None,
2318                            received_time: None,
2319                        });
2320                    }
2321                    break;
2322                }
2323            }
2324        }
2325
2326        // Map batch statuses back to original order positions
2327        for (batch_idx, &original_idx) in valid_indices.iter().enumerate() {
2328            if let Some(status) = batch_statuses.get(batch_idx) {
2329                all_statuses[original_idx] = Some(status.clone());
2330            }
2331        }
2332
2333        Ok(all_statuses.into_iter().flatten().collect())
2334    }
2335
2336    /// Modifies multiple orders in a single batch request.
2337    #[expect(clippy::type_complexity)]
2338    pub async fn edit_orders_batch(
2339        &self,
2340        orders: Vec<(
2341            InstrumentId,
2342            Option<ClientOrderId>,
2343            Option<VenueOrderId>,
2344            Option<Quantity>,
2345            Option<Price>,
2346            Option<Price>,
2347        )>,
2348    ) -> anyhow::Result<Vec<String>> {
2349        let count = orders.len();
2350        if count == 0 {
2351            return Ok(Vec::new());
2352        }
2353
2354        let mut all_statuses: Vec<Option<String>> = vec![None; count];
2355        let mut valid_items = Vec::with_capacity(count);
2356        let mut valid_indices = Vec::with_capacity(count);
2357
2358        for (
2359            idx,
2360            (instrument_id, client_order_id, venue_order_id, quantity, price, trigger_price),
2361        ) in orders.into_iter().enumerate()
2362        {
2363            match self.build_edit_order_params(
2364                instrument_id,
2365                client_order_id,
2366                venue_order_id,
2367                quantity,
2368                price,
2369                trigger_price,
2370            ) {
2371                Ok(params) => {
2372                    valid_items.push(KrakenFuturesBatchEditItem::from_params(
2373                        params,
2374                        idx.to_string(),
2375                    ));
2376                    valid_indices.push(idx);
2377                }
2378                Err(e) => {
2379                    all_statuses[idx] = Some(format!("validation_error: {e}"));
2380                }
2381            }
2382        }
2383
2384        if valid_items.is_empty() {
2385            return Ok(all_statuses.into_iter().flatten().collect());
2386        }
2387
2388        let mut batch_statuses: Vec<String> = Vec::with_capacity(valid_items.len());
2389
2390        for chunk in valid_items.chunks(BATCH_ORDER_LIMIT) {
2391            match self.inner.edit_orders_batch(chunk.to_vec()).await {
2392                Ok(response) => {
2393                    if response.result == KrakenApiResult::Success {
2394                        batch_statuses.extend(response.batch_status.into_iter().map(|s| s.status));
2395                    } else {
2396                        let error_msg = response
2397                            .batch_status
2398                            .first()
2399                            .map_or("Unknown error", |s| s.status.as_str());
2400
2401                        for _ in 0..chunk.len() {
2402                            batch_statuses.push(format!("api_error: {error_msg}"));
2403                        }
2404                    }
2405                }
2406                Err(e) => {
2407                    let remaining = valid_items.len() - batch_statuses.len();
2408                    for _ in 0..remaining {
2409                        batch_statuses.push(format!("batch_error: {e}"));
2410                    }
2411                    break;
2412                }
2413            }
2414        }
2415
2416        for (batch_idx, &original_idx) in valid_indices.iter().enumerate() {
2417            if let Some(status) = batch_statuses.get(batch_idx) {
2418                all_statuses[original_idx] = Some(status.clone());
2419            }
2420        }
2421
2422        Ok(all_statuses.into_iter().flatten().collect())
2423    }
2424
2425    fn build_edit_order_params(
2426        &self,
2427        instrument_id: InstrumentId,
2428        client_order_id: Option<ClientOrderId>,
2429        venue_order_id: Option<VenueOrderId>,
2430        quantity: Option<Quantity>,
2431        price: Option<Price>,
2432        trigger_price: Option<Price>,
2433    ) -> anyhow::Result<KrakenFuturesEditOrderParams> {
2434        let _ = self
2435            .get_cached_instrument(&instrument_id.symbol.inner())
2436            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2437
2438        let order_id = venue_order_id.as_ref().map(|id| id.to_string());
2439        let cli_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2440
2441        if order_id.is_none() && cli_ord_id.is_none() {
2442            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2443        }
2444
2445        let mut builder = KrakenFuturesEditOrderParamsBuilder::default();
2446
2447        if let Some(ref id) = order_id {
2448            builder.order_id(id.clone());
2449        }
2450
2451        if let Some(ref id) = cli_ord_id {
2452            builder.cli_ord_id(id.clone());
2453        }
2454
2455        if let Some(qty) = quantity {
2456            builder.size(qty.to_string());
2457        }
2458
2459        if let Some(p) = price {
2460            builder.limit_price(p.to_string());
2461        }
2462
2463        if let Some(tp) = trigger_price {
2464            builder.stop_price(tp.to_string());
2465        }
2466
2467        builder
2468            .build()
2469            .map_err(|e| anyhow::anyhow!("Failed to build edit order params: {e}"))
2470    }
2471}
2472
2473fn map_futures_trigger_signal(
2474    trigger_type: Option<TriggerType>,
2475) -> anyhow::Result<Option<KrakenTriggerSignal>> {
2476    match trigger_type {
2477        None => Ok(None),
2478        Some(TriggerType::Default | TriggerType::LastPrice) => Ok(Some(KrakenTriggerSignal::Last)),
2479        Some(TriggerType::MarkPrice) => Ok(Some(KrakenTriggerSignal::Mark)),
2480        Some(TriggerType::IndexPrice) => Ok(Some(KrakenTriggerSignal::Index)),
2481        Some(other) => anyhow::bail!(
2482            "Unsupported trigger type for Kraken Futures: {other:?} (only LastPrice, MarkPrice, and IndexPrice supported)"
2483        ),
2484    }
2485}
2486
2487fn parse_multi_collateral_balances(account: &FuturesAccount, balances: &mut Vec<AccountBalance>) {
2488    for (currency_code, currency_info) in &account.currencies {
2489        if currency_info.quantity == 0.0 {
2490            continue;
2491        }
2492
2493        let currency = Currency::new(
2494            currency_code.as_str(),
2495            8,
2496            0,
2497            currency_code.as_str(),
2498            CurrencyType::Crypto,
2499        );
2500
2501        let total_amount = currency_info.quantity;
2502        let available_amount = currency_info.available.unwrap_or(total_amount);
2503        let locked_amount = total_amount - available_amount;
2504
2505        push_balance_from_f64(
2506            balances,
2507            total_amount,
2508            locked_amount,
2509            currency,
2510            currency_code,
2511        );
2512    }
2513
2514    // Multi-collateral accounts track margin in USD even though the
2515    // actual collateral is held in various crypto currencies.
2516    if let Some(portfolio_value) = account.portfolio_value
2517        && portfolio_value > 0.0
2518    {
2519        let usd_currency = Currency::USD();
2520        let available_usd = account.available_margin.unwrap_or(portfolio_value);
2521        let locked_usd = portfolio_value - available_usd;
2522
2523        push_balance_from_f64(balances, portfolio_value, locked_usd, usd_currency, "USD");
2524    }
2525}
2526
2527// Kraken Futures serves balances as JSON numbers, which serde already parsed to
2528// f64. Converting to Decimal here just moves the value into the fixed-point
2529// constructor; it does not recover any precision lost at the wire parse.
2530fn push_balance_from_f64(
2531    balances: &mut Vec<AccountBalance>,
2532    total: f64,
2533    locked: f64,
2534    currency: Currency,
2535    ccy_label: &str,
2536) {
2537    let Some(total_dec) = Decimal::from_f64(total) else {
2538        log::warn!("Skipping {ccy_label} balance: non-finite total {total}");
2539        return;
2540    };
2541    let Some(locked_dec) = Decimal::from_f64(locked) else {
2542        log::warn!("Skipping {ccy_label} balance: non-finite locked {locked}");
2543        return;
2544    };
2545
2546    match AccountBalance::from_total_and_locked(total_dec, locked_dec, currency) {
2547        Ok(balance) => balances.push(balance),
2548        Err(e) => log::warn!("Skipping {ccy_label} balance: {e}"),
2549    }
2550}
2551
2552fn parse_multi_collateral_margins(account: &FuturesAccount, margins: &mut Vec<MarginBalance>) {
2553    if let Some(initial_margin) = account.initial_margin
2554        && initial_margin > 0.0
2555    {
2556        let usd_currency = Currency::USD();
2557        let maintenance = account
2558            .margin_requirements
2559            .as_ref()
2560            .and_then(|mr| mr.mm)
2561            .unwrap_or(0.0);
2562        // Kraken Futures reports cross-margin aggregates in USD; emit as an
2563        // account-wide entry keyed by USD.
2564        margins.push(MarginBalance::new(
2565            Money::new(initial_margin, usd_currency),
2566            Money::new(maintenance, usd_currency),
2567            None,
2568        ));
2569    }
2570}
2571
2572fn parse_margin_account_balances(account: &FuturesAccount, balances: &mut Vec<AccountBalance>) {
2573    for (currency_code, &amount) in &account.balances {
2574        if amount == 0.0 {
2575            continue;
2576        }
2577
2578        let currency = Currency::new(
2579            currency_code.as_str(),
2580            8,
2581            0,
2582            currency_code.as_str(),
2583            CurrencyType::Crypto,
2584        );
2585
2586        let available = account
2587            .auxiliary
2588            .as_ref()
2589            .and_then(|aux| aux.af)
2590            .unwrap_or(amount);
2591        let locked = amount - available;
2592
2593        push_balance_from_f64(balances, amount, locked, currency, currency_code);
2594    }
2595}
2596
2597fn parse_margin_account_margins(account: &FuturesAccount, margins: &mut Vec<MarginBalance>) {
2598    if let Some(ref mr) = account.margin_requirements {
2599        let im = mr.im.unwrap_or(0.0);
2600        let mm = mr.mm.unwrap_or(0.0);
2601        if im > 0.0 || mm > 0.0 {
2602            let usd_currency = Currency::USD();
2603            margins.push(MarginBalance::new(
2604                Money::new(im, usd_currency),
2605                Money::new(mm, usd_currency),
2606                None,
2607            ));
2608        }
2609    }
2610}
2611
2612fn parse_cash_account_balances(account: &FuturesAccount, balances: &mut Vec<AccountBalance>) {
2613    for (currency_code, &amount) in &account.balances {
2614        if amount == 0.0 {
2615            continue;
2616        }
2617
2618        let currency = Currency::new(
2619            currency_code.as_str(),
2620            8,
2621            0,
2622            currency_code.as_str(),
2623            CurrencyType::Crypto,
2624        );
2625
2626        push_balance_from_f64(balances, amount, 0.0, currency, currency_code);
2627    }
2628}
2629
2630#[cfg(test)]
2631mod tests {
2632    use ahash::AHashMap;
2633    use nautilus_model::instruments::CryptoPerpetual;
2634    use rstest::rstest;
2635
2636    use super::*;
2637
2638    #[rstest]
2639    fn test_raw_client_creation() {
2640        let client = KrakenFuturesRawHttpClient::default();
2641        assert!(client.credential.is_none());
2642        assert!(client.base_url().contains("futures"));
2643    }
2644
2645    #[rstest]
2646    fn test_raw_client_with_credentials() {
2647        let client = KrakenFuturesRawHttpClient::with_credentials(
2648            "test_key".to_string(),
2649            "test_secret".to_string(),
2650            KrakenEnvironment::Mainnet,
2651            None,
2652            60,
2653            None,
2654            None,
2655            None,
2656            None,
2657            KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
2658        )
2659        .unwrap();
2660        assert!(client.credential.is_some());
2661    }
2662
2663    #[rstest]
2664    fn test_client_creation() {
2665        let client = KrakenFuturesHttpClient::default();
2666        assert!(client.instruments_cache.is_empty());
2667    }
2668
2669    #[rstest]
2670    fn test_client_with_credentials() {
2671        let client = KrakenFuturesHttpClient::with_credentials(
2672            "test_key".to_string(),
2673            "test_secret".to_string(),
2674            KrakenEnvironment::Mainnet,
2675            None,
2676            60,
2677            None,
2678            None,
2679            None,
2680            None,
2681            KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
2682        )
2683        .unwrap();
2684        assert!(client.instruments_cache.is_empty());
2685    }
2686
2687    #[rstest]
2688    fn test_parse_multi_collateral_margins() {
2689        let account = FuturesAccount {
2690            account_type: KrakenFuturesAccountType::MultiCollateralMarginAccount,
2691            balances: AHashMap::new(),
2692            currencies: AHashMap::new(),
2693            auxiliary: None,
2694            margin_requirements: Some(FuturesMarginRequirements {
2695                im: Some(500.0),
2696                mm: Some(250.0),
2697                lt: None,
2698                tt: None,
2699            }),
2700            portfolio_value: Some(10000.0),
2701            available_margin: Some(9500.0),
2702            initial_margin: Some(500.0),
2703            pnl: None,
2704        };
2705
2706        let mut margins = Vec::new();
2707        parse_multi_collateral_margins(&account, &mut margins);
2708
2709        assert_eq!(margins.len(), 1);
2710        let margin = &margins[0];
2711        assert!(margin.instrument_id.is_none());
2712        assert_eq!(margin.currency.code.as_str(), "USD");
2713        assert_eq!(margin.initial.as_f64(), 500.0);
2714        assert_eq!(margin.maintenance.as_f64(), 250.0);
2715    }
2716
2717    #[rstest]
2718    fn test_parse_multi_collateral_margins_zero_skipped() {
2719        let account = FuturesAccount {
2720            account_type: KrakenFuturesAccountType::MultiCollateralMarginAccount,
2721            balances: AHashMap::new(),
2722            currencies: AHashMap::new(),
2723            auxiliary: None,
2724            margin_requirements: None,
2725            portfolio_value: None,
2726            available_margin: None,
2727            initial_margin: Some(0.0),
2728            pnl: None,
2729        };
2730
2731        let mut margins = Vec::new();
2732        parse_multi_collateral_margins(&account, &mut margins);
2733
2734        assert_eq!(margins.len(), 0);
2735    }
2736
2737    #[rstest]
2738    fn test_parse_margin_account_margins() {
2739        let account = FuturesAccount {
2740            account_type: KrakenFuturesAccountType::MarginAccount,
2741            balances: AHashMap::new(),
2742            currencies: AHashMap::new(),
2743            auxiliary: None,
2744            margin_requirements: Some(FuturesMarginRequirements {
2745                im: Some(100.0),
2746                mm: Some(50.0),
2747                lt: None,
2748                tt: None,
2749            }),
2750            portfolio_value: None,
2751            available_margin: None,
2752            initial_margin: None,
2753            pnl: None,
2754        };
2755
2756        let mut margins = Vec::new();
2757        parse_margin_account_margins(&account, &mut margins);
2758
2759        assert_eq!(margins.len(), 1);
2760        let margin = &margins[0];
2761        assert_eq!(margin.initial.as_f64(), 100.0);
2762        assert_eq!(margin.maintenance.as_f64(), 50.0);
2763    }
2764
2765    #[rstest]
2766    fn test_parse_margin_account_margins_no_requirements() {
2767        let account = FuturesAccount {
2768            account_type: KrakenFuturesAccountType::MarginAccount,
2769            balances: AHashMap::new(),
2770            currencies: AHashMap::new(),
2771            auxiliary: None,
2772            margin_requirements: None,
2773            portfolio_value: None,
2774            available_margin: None,
2775            initial_margin: None,
2776            pnl: None,
2777        };
2778
2779        let mut margins = Vec::new();
2780        parse_margin_account_margins(&account, &mut margins);
2781
2782        assert_eq!(margins.len(), 0);
2783    }
2784
2785    #[rstest]
2786    fn test_parse_multi_collateral_balances() {
2787        let mut currencies = AHashMap::new();
2788        currencies.insert(
2789            "BTC".to_string(),
2790            FuturesFlexCurrency {
2791                quantity: 1.5,
2792                value: None,
2793                collateral: None,
2794                available: Some(1.2),
2795            },
2796        );
2797
2798        let account = FuturesAccount {
2799            account_type: KrakenFuturesAccountType::MultiCollateralMarginAccount,
2800            balances: AHashMap::new(),
2801            currencies,
2802            auxiliary: None,
2803            margin_requirements: None,
2804            portfolio_value: Some(50000.0),
2805            available_margin: Some(45000.0),
2806            initial_margin: None,
2807            pnl: None,
2808        };
2809
2810        let mut balances = Vec::new();
2811        parse_multi_collateral_balances(&account, &mut balances);
2812
2813        // BTC balance + USD portfolio balance
2814        assert_eq!(balances.len(), 2);
2815    }
2816
2817    #[rstest]
2818    fn test_parse_margin_account_balances_free_is_derived_from_total_minus_locked() {
2819        // Regression: `free` must be derived via Money fixed-point subtraction so
2820        // the `AccountBalance` invariant `total == locked + free` holds exactly,
2821        // rather than using the raw Kraken `af` (available funds) value which
2822        // can drift at the currency precision and violate the invariant in
2823        // `AccountBalance::new_checked`.
2824        let mut bals = AHashMap::new();
2825        // Values chosen so that Kraken's raw `af` rounds independently from
2826        // `amount - af` at currency precision 8, producing a drifted sum when
2827        // `free` is set directly from `af` instead of derived from `total - locked`.
2828        // With these f64 values (constructed via arithmetic to hit precise bit
2829        // patterns): round(amount * 1e8) = 1_000_000_003, round(af * 1e8) = 4,
2830        // and round((amount - af) * 1e8) = 1_000_000_000, so 4 + 1_000_000_000
2831        // != 1_000_000_003 and the old parse path violates the invariant.
2832        let af_f = 35.0_f64 * 1e-9;
2833        let amount_f = 10.0_f64 + af_f;
2834        bals.insert("XBT".to_string(), amount_f);
2835
2836        let account = FuturesAccount {
2837            account_type: KrakenFuturesAccountType::MarginAccount,
2838            balances: bals,
2839            currencies: AHashMap::new(),
2840            auxiliary: Some(FuturesAuxiliary {
2841                usd: None,
2842                pv: None,
2843                pnl: None,
2844                af: Some(af_f),
2845                funding: None,
2846            }),
2847            margin_requirements: None,
2848            portfolio_value: None,
2849            available_margin: None,
2850            initial_margin: None,
2851            pnl: None,
2852        };
2853
2854        let mut balances = Vec::new();
2855        parse_margin_account_balances(&account, &mut balances);
2856
2857        assert_eq!(balances.len(), 1);
2858        let balance = &balances[0];
2859        // Invariant: total == locked + free (enforced by AccountBalance::new_checked,
2860        // but assert here to pin the derivation property at the parse site).
2861        assert_eq!(balance.total, balance.locked + balance.free);
2862        // Free is the derived side (total - locked), not the raw `af` value.
2863        assert_eq!(balance.free, balance.total - balance.locked);
2864    }
2865
2866    #[rstest]
2867    #[case::nan_total(f64::NAN, 0.0)]
2868    #[case::infinity_total(f64::INFINITY, 0.0)]
2869    #[case::neg_infinity_total(f64::NEG_INFINITY, 0.0)]
2870    #[case::nan_locked(1.0, f64::NAN)]
2871    #[case::infinity_locked(1.0, f64::INFINITY)]
2872    fn test_push_balance_from_f64_skips_non_finite(#[case] total: f64, #[case] locked: f64) {
2873        let currency = Currency::new("BTC", 8, 0, "BTC", CurrencyType::Crypto);
2874        let mut balances = Vec::new();
2875
2876        push_balance_from_f64(&mut balances, total, locked, currency, "BTC");
2877
2878        assert!(balances.is_empty());
2879    }
2880
2881    #[rstest]
2882    fn test_parse_cash_account_balances() {
2883        let mut bals = AHashMap::new();
2884        bals.insert("ETH".to_string(), 10.0);
2885        bals.insert("BTC".to_string(), 0.0); // zero, should be skipped
2886
2887        let account = FuturesAccount {
2888            account_type: KrakenFuturesAccountType::CashAccount,
2889            balances: bals,
2890            currencies: AHashMap::new(),
2891            auxiliary: None,
2892            margin_requirements: None,
2893            portfolio_value: None,
2894            available_margin: None,
2895            initial_margin: None,
2896            pnl: None,
2897        };
2898
2899        let mut balances = Vec::new();
2900        parse_cash_account_balances(&account, &mut balances);
2901
2902        assert_eq!(balances.len(), 1);
2903        let balance = &balances[0];
2904        assert_eq!(balance.total.as_f64(), 10.0);
2905        assert_eq!(balance.locked.as_f64(), 0.0);
2906    }
2907
2908    #[rstest]
2909    #[case(None, None)]
2910    #[case(Some(TriggerType::Default), Some(KrakenTriggerSignal::Last))]
2911    #[case(Some(TriggerType::LastPrice), Some(KrakenTriggerSignal::Last))]
2912    #[case(Some(TriggerType::MarkPrice), Some(KrakenTriggerSignal::Mark))]
2913    #[case(Some(TriggerType::IndexPrice), Some(KrakenTriggerSignal::Index))]
2914    fn test_build_send_order_params_maps_supported_trigger_signals(
2915        #[case] trigger_type: Option<TriggerType>,
2916        #[case] expected_signal: Option<KrakenTriggerSignal>,
2917    ) {
2918        let client = KrakenFuturesHttpClient::default();
2919        let instrument_id = cache_test_futures_instrument(&client);
2920
2921        let params = client
2922            .build_send_order_params(
2923                instrument_id,
2924                ClientOrderId::new("futures-trigger"),
2925                OrderSide::Buy,
2926                OrderType::StopMarket,
2927                Quantity::from("1"),
2928                TimeInForce::Gtc,
2929                None,
2930                Some(Price::from("45000")),
2931                trigger_type,
2932                false,
2933                false,
2934            )
2935            .unwrap();
2936
2937        assert_eq!(params.trigger_signal, expected_signal);
2938    }
2939
2940    #[rstest]
2941    fn test_build_send_order_params_rejects_unsupported_trigger_signal() {
2942        let client = KrakenFuturesHttpClient::default();
2943        let instrument_id = cache_test_futures_instrument(&client);
2944
2945        let error = client
2946            .build_send_order_params(
2947                instrument_id,
2948                ClientOrderId::new("futures-trigger-invalid"),
2949                OrderSide::Buy,
2950                OrderType::StopMarket,
2951                Quantity::from("1"),
2952                TimeInForce::Gtc,
2953                None,
2954                Some(Price::from("45000")),
2955                Some(TriggerType::BidAsk),
2956                false,
2957                false,
2958            )
2959            .unwrap_err();
2960
2961        assert!(
2962            error
2963                .to_string()
2964                .contains("Unsupported trigger type for Kraken Futures")
2965        );
2966    }
2967
2968    fn cache_test_futures_instrument(client: &KrakenFuturesHttpClient) -> InstrumentId {
2969        let instrument_id = InstrumentId::from("PF_XBTUSD.KRAKEN");
2970
2971        client.cache_instrument(InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2972            instrument_id,
2973            Symbol::new("PF_XBTUSD"),
2974            Currency::BTC(),
2975            Currency::USD(),
2976            Currency::USD(),
2977            false,
2978            0,
2979            4,
2980            Price::from("1"),
2981            Quantity::from("0.0001"),
2982            None,
2983            None,
2984            None,
2985            None,
2986            None,
2987            None,
2988            None,
2989            None,
2990            None,
2991            None,
2992            None,
2993            None,
2994            None,
2995            0.into(),
2996            0.into(),
2997        )));
2998
2999        instrument_id
3000    }
3001}