Skip to main content

nautilus_kraken/http/spot/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! HTTP client for the Kraken Spot REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc, RwLock,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use ahash::AHashMap;
29use chrono::{DateTime, Utc};
30use indexmap::IndexMap;
31use nautilus_core::{
32    AtomicMap, AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, datetime::NANOSECONDS_IN_SECOND,
33    nanos::UnixNanos, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36    data::{Bar, BarType, BookOrder, TradeTick},
37    enums::{
38        AccountType, BookType, CurrencyType, MarketStatusAction, OrderSide, OrderType,
39        PositionSideSpecified, TimeInForce, TriggerType,
40    },
41    events::AccountState,
42    identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
43    instruments::{Instrument, InstrumentAny},
44    orderbook::OrderBook,
45    reports::{FillReport, OrderStatusReport, PositionStatusReport},
46    types::{AccountBalance, Currency, Price, Quantity},
47};
48use nautilus_network::{
49    http::{HttpClient, Method, USER_AGENT},
50    ratelimiter::quota::Quota,
51    retry::{RetryConfig, RetryManager},
52};
53use rust_decimal::Decimal;
54use serde::de::DeserializeOwned;
55use tokio_util::sync::CancellationToken;
56use ustr::Ustr;
57
58use super::{models::*, query::*};
59use crate::{
60    common::{
61        consts::{
62            KRAKEN_OFLAG_POST_ONLY, KRAKEN_OFLAG_QUOTE_QUANTITY, KRAKEN_VENUE,
63            NAUTILUS_KRAKEN_BROKER_ID,
64        },
65        credential::KrakenCredential,
66        enums::{
67            KrakenAssetClass, KrakenEnvironment, KrakenOrderSide, KrakenOrderType,
68            KrakenProductType,
69        },
70        parse::{
71            bar_type_to_spot_interval, normalize_currency_code, normalize_spot_symbol, parse_bar,
72            parse_fill_report, parse_order_status_report, parse_spot_instrument,
73            parse_tokenized_instrument, parse_trade_tick_from_array, truncate_cl_ord_id,
74        },
75        urls::get_kraken_http_base_url,
76    },
77    http::error::KrakenHttpError,
78};
79
/// Default Kraken Spot REST API rate limit (requests per second).
pub const KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;

/// Rate limiter key for the quota shared by every Kraken Spot request sent by this client.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:spot:global";

/// Maximum orders per batch cancel request for Kraken Spot API.
const BATCH_CANCEL_LIMIT: usize = 50;

/// Maximum orders per batch submit request for Kraken Spot API.
const BATCH_SUBMIT_LIMIT: usize = 15;
90
91/// Computes the time-in-force and expiration time parameters for Kraken Spot orders.
92///
93/// Returns a tuple of (timeinforce, expiretm) for use in order requests.
94/// For limit orders, handles GTC, IOC, and GTD. Market orders return (None, None).
95fn compute_time_in_force(
96    is_limit_order: bool,
97    time_in_force: TimeInForce,
98    expire_time: Option<UnixNanos>,
99) -> anyhow::Result<(Option<String>, Option<String>)> {
100    if is_limit_order {
101        match time_in_force {
102            TimeInForce::Gtc => Ok((None, None)), // Default, no parameter needed
103            TimeInForce::Ioc => Ok((Some("IOC".to_string()), None)),
104            TimeInForce::Fok => Ok((Some("FOK".to_string()), None)),
105            TimeInForce::Gtd => {
106                let expire = expire_time.ok_or_else(|| {
107                    anyhow::anyhow!("GTD time in force requires expire_time parameter")
108                })?;
109                // Convert nanoseconds to seconds for Kraken API
110                let expire_secs = expire.as_u64() / NANOSECONDS_IN_SECOND;
111                Ok((Some("GTD".to_string()), Some(expire_secs.to_string())))
112            }
113            _ => anyhow::bail!("Unsupported time in force: {time_in_force:?}"),
114        }
115    } else {
116        // Market orders are inherently immediate, timeinforce not applicable
117        Ok((None, None))
118    }
119}
120
/// Raw HTTP client for low-level Kraken Spot API operations.
///
/// This client handles request/response operations with the Kraken Spot API,
/// returning venue-specific response types. It does not parse to Nautilus domain types.
pub struct KrakenSpotRawHttpClient {
    /// Base URL prepended to every endpoint path.
    base_url: String,
    /// Underlying HTTP client, constructed with the rate limit quotas.
    client: HttpClient,
    /// API credential used to sign private requests; `None` for public-only clients.
    credential: Option<KrakenCredential>,
    /// Retry policy applied to requests sent through `send_request`.
    retry_manager: RetryManager<KrakenHttpError>,
    /// Token used to cancel pending requests.
    cancellation_token: CancellationToken,
    /// Clock used to generate request nonces.
    clock: &'static AtomicTime,
    /// Mutex to serialize authenticated requests, ensuring nonces arrive at Kraken in order
    auth_mutex: tokio::sync::Mutex<()>,
}
135
136impl Default for KrakenSpotRawHttpClient {
137    fn default() -> Self {
138        Self::new(
139            KrakenEnvironment::Mainnet,
140            None,
141            60,
142            None,
143            None,
144            None,
145            None,
146            KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
147        )
148        .expect("Failed to create default KrakenSpotRawHttpClient")
149    }
150}
151
152impl Debug for KrakenSpotRawHttpClient {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct(stringify!(KrakenSpotRawHttpClient))
155            .field("base_url", &self.base_url)
156            .field("has_credentials", &self.credential.is_some())
157            .finish()
158    }
159}
160
161impl KrakenSpotRawHttpClient {
162    /// Creates a new [`KrakenSpotRawHttpClient`].
163    #[expect(clippy::too_many_arguments)]
164    pub fn new(
165        environment: KrakenEnvironment,
166        base_url_override: Option<String>,
167        timeout_secs: u64,
168        max_retries: Option<u32>,
169        retry_delay_ms: Option<u64>,
170        retry_delay_max_ms: Option<u64>,
171        proxy_url: Option<String>,
172        max_requests_per_second: u32,
173    ) -> anyhow::Result<Self> {
174        let retry_config = RetryConfig {
175            max_retries: max_retries.unwrap_or(3),
176            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
177            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
178            backoff_factor: 2.0,
179            jitter_ms: 1000,
180            operation_timeout_ms: Some(60_000),
181            immediate_first: false,
182            max_elapsed_ms: Some(180_000),
183        };
184
185        let retry_manager = RetryManager::new(retry_config);
186        let base_url = base_url_override.unwrap_or_else(|| {
187            get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
188        });
189
190        Ok(Self {
191            base_url,
192            client: HttpClient::new(
193                Self::default_headers(),
194                vec![],
195                Self::rate_limiter_quotas(max_requests_per_second)?,
196                Some(Self::default_quota(max_requests_per_second)?),
197                Some(timeout_secs),
198                proxy_url,
199            )
200            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
201            credential: None,
202            retry_manager,
203            cancellation_token: CancellationToken::new(),
204            clock: get_atomic_clock_realtime(),
205            auth_mutex: tokio::sync::Mutex::new(()),
206        })
207    }
208
209    /// Creates a new [`KrakenSpotRawHttpClient`] with credentials.
210    #[expect(clippy::too_many_arguments)]
211    pub fn with_credentials(
212        api_key: String,
213        api_secret: String,
214        environment: KrakenEnvironment,
215        base_url_override: Option<String>,
216        timeout_secs: u64,
217        max_retries: Option<u32>,
218        retry_delay_ms: Option<u64>,
219        retry_delay_max_ms: Option<u64>,
220        proxy_url: Option<String>,
221        max_requests_per_second: u32,
222    ) -> anyhow::Result<Self> {
223        let retry_config = RetryConfig {
224            max_retries: max_retries.unwrap_or(3),
225            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
226            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
227            backoff_factor: 2.0,
228            jitter_ms: 1000,
229            operation_timeout_ms: Some(60_000),
230            immediate_first: false,
231            max_elapsed_ms: Some(180_000),
232        };
233
234        let retry_manager = RetryManager::new(retry_config);
235        let base_url = base_url_override.unwrap_or_else(|| {
236            get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
237        });
238
239        Ok(Self {
240            base_url,
241            client: HttpClient::new(
242                Self::default_headers(),
243                vec![],
244                Self::rate_limiter_quotas(max_requests_per_second)?,
245                Some(Self::default_quota(max_requests_per_second)?),
246                Some(timeout_secs),
247                proxy_url,
248            )
249            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
250            credential: Some(KrakenCredential::new(api_key, api_secret)),
251            retry_manager,
252            cancellation_token: CancellationToken::new(),
253            clock: get_atomic_clock_realtime(),
254            auth_mutex: tokio::sync::Mutex::new(()),
255        })
256    }
257
258    /// Generates a unique nonce for Kraken Spot API requests.
259    ///
260    /// Uses `AtomicTime` for strict monotonicity. The nanosecond timestamp
261    /// guarantees uniqueness even for rapid consecutive calls.
262    fn generate_nonce(&self) -> u64 {
263        self.clock.get_time_ns().as_u64()
264    }
265
266    /// Returns the base URL for this client.
267    pub fn base_url(&self) -> &str {
268        &self.base_url
269    }
270
271    /// Returns the credential for this client, if set.
272    pub fn credential(&self) -> Option<&KrakenCredential> {
273        self.credential.as_ref()
274    }
275
276    /// Cancels all pending HTTP requests.
277    pub fn cancel_all_requests(&self) {
278        self.cancellation_token.cancel();
279    }
280
281    /// Returns the cancellation token for this client.
282    pub fn cancellation_token(&self) -> &CancellationToken {
283        &self.cancellation_token
284    }
285
286    fn default_headers() -> HashMap<String, String> {
287        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
288    }
289
290    fn default_quota(max_requests_per_second: u32) -> anyhow::Result<Quota> {
291        let burst = NonZeroU32::new(max_requests_per_second).unwrap_or(
292            NonZeroU32::new(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"),
293        );
294        Quota::per_second(burst).ok_or_else(|| {
295            anyhow::anyhow!(
296                "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
297            )
298        })
299    }
300
301    fn rate_limiter_quotas(max_requests_per_second: u32) -> anyhow::Result<Vec<(String, Quota)>> {
302        Ok(vec![(
303            KRAKEN_GLOBAL_RATE_KEY.to_string(),
304            Self::default_quota(max_requests_per_second)?,
305        )])
306    }
307
308    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
309        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
310        let route = format!("kraken:spot:{normalized}");
311        vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
312    }
313
314    fn sign_spot(
315        &self,
316        path: &str,
317        nonce: u64,
318        params: &HashMap<String, String>,
319    ) -> anyhow::Result<(HashMap<String, String>, String)> {
320        let credential = self
321            .credential
322            .as_ref()
323            .ok_or_else(|| anyhow::anyhow!("Missing credentials"))?;
324
325        let (signature, post_data) = credential.sign_spot(path, nonce, params)?;
326
327        let mut headers = HashMap::new();
328        headers.insert("API-Key".to_string(), credential.api_key().to_string());
329        headers.insert("API-Sign".to_string(), signature);
330
331        Ok((headers, post_data))
332    }
333
334    async fn send_request<T: DeserializeOwned>(
335        &self,
336        method: Method,
337        endpoint: &str,
338        body: Option<Vec<u8>>,
339        authenticate: bool,
340    ) -> anyhow::Result<KrakenResponse<T>, KrakenHttpError> {
341        // Serialize authenticated requests to ensure nonces arrive at Kraken in order.
342        // Without this, concurrent requests can race through the network and arrive
343        // out-of-order, causing "Invalid nonce" errors.
344        let _guard = if authenticate {
345            Some(self.auth_mutex.lock().await)
346        } else {
347            None
348        };
349
350        let endpoint = endpoint.to_string();
351        let url = format!("{}{endpoint}", self.base_url);
352        let method_clone = method.clone();
353        let body_clone = body.clone();
354
355        let operation = || {
356            let url = url.clone();
357            let method = method_clone.clone();
358            let body = body_clone.clone();
359            let endpoint = endpoint.clone();
360
361            async move {
362                let mut headers = Self::default_headers();
363
364                let final_body = if authenticate {
365                    let nonce = self.generate_nonce();
366                    log::debug!("Generated nonce {nonce} for {endpoint}");
367
368                    let params: HashMap<String, String> = if let Some(ref body_bytes) = body {
369                        let body_str = std::str::from_utf8(body_bytes).map_err(|e| {
370                            KrakenHttpError::ParseError(format!(
371                                "Invalid UTF-8 in request body: {e}"
372                            ))
373                        })?;
374                        serde_urlencoded::from_str(body_str).map_err(|e| {
375                            KrakenHttpError::ParseError(format!(
376                                "Failed to parse request params: {e}"
377                            ))
378                        })?
379                    } else {
380                        HashMap::new()
381                    };
382
383                    let (auth_headers, post_data) = self
384                        .sign_spot(&endpoint, nonce, &params)
385                        .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
386                    headers.extend(auth_headers);
387                    Some(post_data.into_bytes())
388                } else {
389                    body
390                };
391
392                if method == Method::POST {
393                    headers.insert(
394                        "Content-Type".to_string(),
395                        "application/x-www-form-urlencoded".to_string(),
396                    );
397                }
398
399                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
400
401                let response = self
402                    .client
403                    .request(
404                        method,
405                        url,
406                        None,
407                        Some(headers),
408                        final_body,
409                        None,
410                        Some(rate_limit_keys),
411                    )
412                    .await
413                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
414
415                let status = response.status.as_u16();
416                if status >= 400 {
417                    let body = String::from_utf8_lossy(&response.body).to_string();
418                    // Don't retry authentication errors
419                    if status == 401 || status == 403 {
420                        return Err(KrakenHttpError::AuthenticationError(format!(
421                            "HTTP error {status}: {body}"
422                        )));
423                    }
424                    return Err(KrakenHttpError::NetworkError(format!(
425                        "HTTP error {status}: {body}"
426                    )));
427                }
428
429                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
430                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
431                })?;
432
433                let kraken_response: KrakenResponse<T> = serde_json::from_str(&response_text)
434                    .map_err(|e| {
435                        KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
436                    })?;
437
438                if !kraken_response.error.is_empty() {
439                    return Err(KrakenHttpError::ApiError(kraken_response.error));
440                }
441
442                Ok(kraken_response)
443            }
444        };
445
446        let should_retry =
447            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
448        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
449
450        self.retry_manager
451            .execute_with_retry_with_cancel(
452                &endpoint,
453                operation,
454                should_retry,
455                create_error,
456                &self.cancellation_token,
457            )
458            .await
459    }
460
461    /// Requests the server time from Kraken.
462    pub async fn get_server_time(&self) -> anyhow::Result<ServerTime, KrakenHttpError> {
463        let response: KrakenResponse<ServerTime> = self
464            .send_request(Method::GET, "/0/public/Time", None, false)
465            .await?;
466
467        response.result.ok_or_else(|| {
468            KrakenHttpError::ParseError("Missing result in server time response".to_string())
469        })
470    }
471
472    /// Requests the system status from Kraken.
473    pub async fn get_system_status(&self) -> anyhow::Result<SystemStatus, KrakenHttpError> {
474        let response: KrakenResponse<SystemStatus> = self
475            .send_request(Method::GET, "/0/public/SystemStatus", None, false)
476            .await?;
477
478        response.result.ok_or_else(|| {
479            KrakenHttpError::ParseError("Missing result in system status response".to_string())
480        })
481    }
482
483    /// Requests tradable asset pairs from Kraken.
484    ///
485    /// When `aclass_base` is `None`, the Kraken API defaults to `"currency"` (crypto pairs).
486    /// Pass `"tokenized_asset"` to fetch tokenized equities (xStocks).
487    pub async fn get_asset_pairs(
488        &self,
489        pairs: Option<Vec<String>>,
490        aclass_base: Option<&str>,
491    ) -> anyhow::Result<AssetPairsResponse, KrakenHttpError> {
492        let mut params = Vec::new();
493
494        if let Some(pairs) = pairs {
495            params.push(format!("pair={}", pairs.join(",")));
496        }
497
498        if let Some(aclass) = aclass_base {
499            params.push(format!("aclass_base={aclass}"));
500        }
501
502        let endpoint = if params.is_empty() {
503            "/0/public/AssetPairs".to_string()
504        } else {
505            format!("/0/public/AssetPairs?{}", params.join("&"))
506        };
507
508        let response: KrakenResponse<AssetPairsResponse> = self
509            .send_request(Method::GET, &endpoint, None, false)
510            .await?;
511
512        response.result.ok_or_else(|| {
513            KrakenHttpError::ParseError("Missing result in asset pairs response".to_string())
514        })
515    }
516
517    /// Requests ticker information for asset pairs.
518    pub async fn get_ticker(
519        &self,
520        pairs: Vec<String>,
521        asset_class: Option<KrakenAssetClass>,
522    ) -> anyhow::Result<TickerResponse, KrakenHttpError> {
523        let mut endpoint = format!("/0/public/Ticker?pair={}", pairs.join(","));
524
525        if let Some(aclass) = asset_class {
526            endpoint.push_str(&format!("&asset_class={aclass}"));
527        }
528
529        let response: KrakenResponse<TickerResponse> = self
530            .send_request(Method::GET, &endpoint, None, false)
531            .await?;
532
533        response.result.ok_or_else(|| {
534            KrakenHttpError::ParseError("Missing result in ticker response".to_string())
535        })
536    }
537
538    /// Requests OHLC candlestick data for an asset pair.
539    pub async fn get_ohlc(
540        &self,
541        pair: &str,
542        interval: Option<u32>,
543        since: Option<i64>,
544        asset_class: Option<KrakenAssetClass>,
545    ) -> anyhow::Result<OhlcResponse, KrakenHttpError> {
546        let mut endpoint = format!("/0/public/OHLC?pair={pair}");
547
548        if let Some(aclass) = asset_class {
549            endpoint.push_str(&format!("&asset_class={aclass}"));
550        }
551
552        if let Some(interval) = interval {
553            endpoint.push_str(&format!("&interval={interval}"));
554        }
555
556        if let Some(since) = since {
557            endpoint.push_str(&format!("&since={since}"));
558        }
559
560        let response: KrakenResponse<OhlcResponse> = self
561            .send_request(Method::GET, &endpoint, None, false)
562            .await?;
563
564        response.result.ok_or_else(|| {
565            KrakenHttpError::ParseError("Missing result in OHLC response".to_string())
566        })
567    }
568
569    /// Requests order book depth for an asset pair.
570    pub async fn get_book_depth(
571        &self,
572        pair: &str,
573        count: Option<u32>,
574        asset_class: Option<KrakenAssetClass>,
575    ) -> anyhow::Result<OrderBookResponse, KrakenHttpError> {
576        let mut endpoint = format!("/0/public/Depth?pair={pair}");
577
578        if let Some(aclass) = asset_class {
579            endpoint.push_str(&format!("&asset_class={aclass}"));
580        }
581
582        if let Some(count) = count {
583            endpoint.push_str(&format!("&count={count}"));
584        }
585
586        let response: KrakenResponse<OrderBookResponse> = self
587            .send_request(Method::GET, &endpoint, None, false)
588            .await?;
589
590        response.result.ok_or_else(|| {
591            KrakenHttpError::ParseError("Missing result in book depth response".to_string())
592        })
593    }
594
595    /// Requests recent trades for an asset pair.
596    pub async fn get_trades(
597        &self,
598        pair: &str,
599        since: Option<String>,
600        asset_class: Option<KrakenAssetClass>,
601    ) -> anyhow::Result<TradesResponse, KrakenHttpError> {
602        let mut endpoint = format!("/0/public/Trades?pair={pair}");
603
604        if let Some(aclass) = asset_class {
605            endpoint.push_str(&format!("&asset_class={aclass}"));
606        }
607
608        if let Some(since) = since {
609            endpoint.push_str(&format!("&since={since}"));
610        }
611
612        let response: KrakenResponse<TradesResponse> = self
613            .send_request(Method::GET, &endpoint, None, false)
614            .await?;
615
616        response.result.ok_or_else(|| {
617            KrakenHttpError::ParseError("Missing result in trades response".to_string())
618        })
619    }
620
621    /// Requests an authentication token for WebSocket connections.
622    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
623        if self.credential.is_none() {
624            return Err(KrakenHttpError::AuthenticationError(
625                "API credentials required for GetWebSocketsToken".to_string(),
626            ));
627        }
628
629        let response: KrakenResponse<WebSocketToken> = self
630            .send_request(Method::POST, "/0/private/GetWebSocketsToken", None, true)
631            .await?;
632
633        response.result.ok_or_else(|| {
634            KrakenHttpError::ParseError("Missing result in websockets token response".to_string())
635        })
636    }
637
638    /// Requests all open orders (requires authentication).
639    pub async fn get_open_orders(
640        &self,
641        trades: Option<bool>,
642        userref: Option<i64>,
643    ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
644        if self.credential.is_none() {
645            return Err(KrakenHttpError::AuthenticationError(
646                "API credentials required for OpenOrders".to_string(),
647            ));
648        }
649
650        let mut params = vec![];
651
652        if let Some(trades_flag) = trades {
653            params.push(format!("trades={trades_flag}"));
654        }
655
656        if let Some(userref_val) = userref {
657            params.push(format!("userref={userref_val}"));
658        }
659
660        let body = if params.is_empty() {
661            None
662        } else {
663            Some(params.join("&").into_bytes())
664        };
665
666        let response: KrakenResponse<SpotOpenOrdersResult> = self
667            .send_request(Method::POST, "/0/private/OpenOrders", body, true)
668            .await?;
669
670        let result = response.result.ok_or_else(|| {
671            KrakenHttpError::ParseError("Missing result in open orders response".to_string())
672        })?;
673
674        Ok(result.open)
675    }
676
677    /// Requests closed orders history (requires authentication).
678    pub async fn get_closed_orders(
679        &self,
680        trades: Option<bool>,
681        userref: Option<i64>,
682        start: Option<i64>,
683        end: Option<i64>,
684        ofs: Option<i32>,
685        closetime: Option<String>,
686    ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
687        if self.credential.is_none() {
688            return Err(KrakenHttpError::AuthenticationError(
689                "API credentials required for ClosedOrders".to_string(),
690            ));
691        }
692
693        let mut params = vec![];
694
695        if let Some(trades_flag) = trades {
696            params.push(format!("trades={trades_flag}"));
697        }
698
699        if let Some(userref_val) = userref {
700            params.push(format!("userref={userref_val}"));
701        }
702
703        if let Some(start_val) = start {
704            params.push(format!("start={start_val}"));
705        }
706
707        if let Some(end_val) = end {
708            params.push(format!("end={end_val}"));
709        }
710
711        if let Some(ofs_val) = ofs {
712            params.push(format!("ofs={ofs_val}"));
713        }
714
715        if let Some(closetime_val) = closetime {
716            params.push(format!("closetime={closetime_val}"));
717        }
718
719        let body = if params.is_empty() {
720            None
721        } else {
722            Some(params.join("&").into_bytes())
723        };
724
725        let response: KrakenResponse<SpotClosedOrdersResult> = self
726            .send_request(Method::POST, "/0/private/ClosedOrders", body, true)
727            .await?;
728
729        let result = response.result.ok_or_else(|| {
730            KrakenHttpError::ParseError("Missing result in closed orders response".to_string())
731        })?;
732
733        Ok(result.closed)
734    }
735
736    /// Requests trades history (requires authentication).
737    pub async fn get_trades_history(
738        &self,
739        trade_type: Option<String>,
740        trades: Option<bool>,
741        start: Option<i64>,
742        end: Option<i64>,
743        ofs: Option<i32>,
744    ) -> anyhow::Result<IndexMap<String, SpotTrade>, KrakenHttpError> {
745        if self.credential.is_none() {
746            return Err(KrakenHttpError::AuthenticationError(
747                "API credentials required for TradesHistory".to_string(),
748            ));
749        }
750
751        let mut params = vec![];
752
753        if let Some(type_val) = trade_type {
754            params.push(format!("type={type_val}"));
755        }
756
757        if let Some(trades_flag) = trades {
758            params.push(format!("trades={trades_flag}"));
759        }
760
761        if let Some(start_val) = start {
762            params.push(format!("start={start_val}"));
763        }
764
765        if let Some(end_val) = end {
766            params.push(format!("end={end_val}"));
767        }
768
769        if let Some(ofs_val) = ofs {
770            params.push(format!("ofs={ofs_val}"));
771        }
772
773        let body = if params.is_empty() {
774            None
775        } else {
776            Some(params.join("&").into_bytes())
777        };
778
779        let response: KrakenResponse<SpotTradesHistoryResult> = self
780            .send_request(Method::POST, "/0/private/TradesHistory", body, true)
781            .await?;
782
783        let result = response.result.ok_or_else(|| {
784            KrakenHttpError::ParseError("Missing result in trades history response".to_string())
785        })?;
786
787        Ok(result.trades)
788    }
789
790    /// Submits a new order (requires authentication).
791    pub async fn add_order(
792        &self,
793        params: &KrakenSpotAddOrderParams,
794    ) -> anyhow::Result<SpotAddOrderResponse, KrakenHttpError> {
795        if self.credential.is_none() {
796            return Err(KrakenHttpError::AuthenticationError(
797                "API credentials required for adding orders".to_string(),
798            ));
799        }
800
801        let param_string = serde_urlencoded::to_string(params)
802            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
803        let body = Some(param_string.into_bytes());
804
805        let response: KrakenResponse<SpotAddOrderResponse> = self
806            .send_request(Method::POST, "/0/private/AddOrder", body, true)
807            .await?;
808
809        response
810            .result
811            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
812    }
813
814    /// Submits multiple orders in a single batch request (requires authentication).
815    pub async fn add_order_batch(
816        &self,
817        params: &KrakenSpotAddOrderBatchParams,
818    ) -> anyhow::Result<SpotAddOrderBatchResponse, KrakenHttpError> {
819        let credential = self.credential.as_ref().ok_or_else(|| {
820            KrakenHttpError::AuthenticationError(
821                "API credentials required for adding orders".to_string(),
822            )
823        })?;
824
825        let _guard = self.auth_mutex.lock().await;
826
827        let endpoint = "/0/private/AddOrderBatch";
828        let nonce = self.generate_nonce();
829
830        let mut json_body = serde_json::json!({
831            "nonce": nonce.to_string(),
832            "pair": params.pair,
833            "orders": params.orders,
834        });
835
836        if let Some(aclass) = &params.asset_class {
837            json_body["asset_class"] = serde_json::json!(aclass);
838        }
839        let json_str = serde_json::to_string(&json_body)
840            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize: {e}")))?;
841
842        let signature = credential
843            .sign_spot_json(endpoint, nonce, &json_str)
844            .map_err(|e| KrakenHttpError::AuthenticationError(format!("Failed to sign: {e}")))?;
845
846        let mut headers = Self::default_headers();
847        headers.insert("API-Key".to_string(), credential.api_key().to_string());
848        headers.insert("API-Sign".to_string(), signature);
849        headers.insert("Content-Type".to_string(), "application/json".to_string());
850
851        let url = format!("{}{endpoint}", self.base_url);
852        let rate_limit_keys = Self::rate_limit_keys(endpoint);
853
854        let response = self
855            .client
856            .request(
857                Method::POST,
858                url,
859                None,
860                Some(headers),
861                Some(json_str.into_bytes()),
862                None,
863                Some(rate_limit_keys),
864            )
865            .await
866            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
867
868        if !response.status.is_success() {
869            return Err(KrakenHttpError::NetworkError(format!(
870                "HTTP {:?} for {}",
871                response.status, endpoint
872            )));
873        }
874
875        let parsed: KrakenResponse<SpotAddOrderBatchResponse> =
876            serde_json::from_slice(&response.body).map_err(|e| {
877                KrakenHttpError::ParseError(format!("Failed to parse JSON response: {e}"))
878            })?;
879
880        if !parsed.error.is_empty() {
881            return Err(KrakenHttpError::ApiError(parsed.error));
882        }
883
884        parsed
885            .result
886            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
887    }
888
889    /// Cancels an open order (requires authentication).
890    pub async fn cancel_order(
891        &self,
892        params: &KrakenSpotCancelOrderParams,
893    ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
894        if self.credential.is_none() {
895            return Err(KrakenHttpError::AuthenticationError(
896                "API credentials required for canceling orders".to_string(),
897            ));
898        }
899
900        let param_string = serde_urlencoded::to_string(params)
901            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
902
903        let body = Some(param_string.into_bytes());
904
905        let response: KrakenResponse<SpotCancelOrderResponse> = self
906            .send_request(Method::POST, "/0/private/CancelOrder", body, true)
907            .await?;
908
909        response
910            .result
911            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
912    }
913
914    /// Cancels multiple orders in a single batch request (max 50 orders).
915    pub async fn cancel_order_batch(
916        &self,
917        params: &KrakenSpotCancelOrderBatchParams,
918    ) -> anyhow::Result<SpotCancelOrderBatchResponse, KrakenHttpError> {
919        let credential = self.credential.as_ref().ok_or_else(|| {
920            KrakenHttpError::AuthenticationError(
921                "API credentials required for canceling orders".to_string(),
922            )
923        })?;
924
925        // Serialize authenticated requests to ensure nonces arrive at Kraken in order
926        let _guard = self.auth_mutex.lock().await;
927
928        let endpoint = "/0/private/CancelOrderBatch";
929        let nonce = self.generate_nonce();
930
931        // CancelOrderBatch uses JSON body with nonce included
932        let json_body = serde_json::json!({
933            "nonce": nonce.to_string(),
934            "orders": params.orders
935        });
936        let json_str = serde_json::to_string(&json_body)
937            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize: {e}")))?;
938
939        let signature = credential
940            .sign_spot_json(endpoint, nonce, &json_str)
941            .map_err(|e| KrakenHttpError::AuthenticationError(format!("Failed to sign: {e}")))?;
942
943        let mut headers = Self::default_headers();
944        headers.insert("API-Key".to_string(), credential.api_key().to_string());
945        headers.insert("API-Sign".to_string(), signature);
946        headers.insert("Content-Type".to_string(), "application/json".to_string());
947
948        let url = format!("{}{endpoint}", self.base_url);
949        let rate_limit_keys = Self::rate_limit_keys(endpoint);
950
951        let response = self
952            .client
953            .request(
954                Method::POST,
955                url,
956                None,
957                Some(headers),
958                Some(json_str.into_bytes()),
959                None,
960                Some(rate_limit_keys),
961            )
962            .await
963            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
964
965        if response.status.as_u16() >= 400 {
966            let status = response.status.as_u16();
967            let body = String::from_utf8_lossy(&response.body).to_string();
968
969            if status == 401 || status == 403 {
970                return Err(KrakenHttpError::AuthenticationError(format!(
971                    "HTTP error {status}: {body}"
972                )));
973            }
974            return Err(KrakenHttpError::NetworkError(format!(
975                "HTTP error {status}: {body}"
976            )));
977        }
978
979        let response_text = String::from_utf8(response.body.to_vec())
980            .map_err(|e| KrakenHttpError::ParseError(format!("Invalid UTF-8: {e}")))?;
981
982        let kraken_response: KrakenResponse<SpotCancelOrderBatchResponse> =
983            serde_json::from_str(&response_text).map_err(|e| {
984                KrakenHttpError::ParseError(format!("Failed to parse response: {e}"))
985            })?;
986
987        if !kraken_response.error.is_empty() {
988            return Err(KrakenHttpError::ApiError(kraken_response.error));
989        }
990
991        kraken_response
992            .result
993            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
994    }
995
996    /// Cancels all open orders (requires authentication).
997    pub async fn cancel_all_orders(
998        &self,
999    ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
1000        if self.credential.is_none() {
1001            return Err(KrakenHttpError::AuthenticationError(
1002                "API credentials required for canceling orders".to_string(),
1003            ));
1004        }
1005
1006        let response: KrakenResponse<SpotCancelOrderResponse> = self
1007            .send_request(Method::POST, "/0/private/CancelAll", None, true)
1008            .await?;
1009
1010        response
1011            .result
1012            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
1013    }
1014
1015    /// Edits an existing order (cancel and replace).
1016    pub async fn edit_order(
1017        &self,
1018        params: &KrakenSpotEditOrderParams,
1019    ) -> anyhow::Result<SpotEditOrderResponse, KrakenHttpError> {
1020        if self.credential.is_none() {
1021            return Err(KrakenHttpError::AuthenticationError(
1022                "API credentials required for editing orders".to_string(),
1023            ));
1024        }
1025
1026        let param_string = serde_urlencoded::to_string(params)
1027            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
1028
1029        let body = Some(param_string.into_bytes());
1030
1031        let response: KrakenResponse<SpotEditOrderResponse> = self
1032            .send_request(Method::POST, "/0/private/EditOrder", body, true)
1033            .await?;
1034
1035        response
1036            .result
1037            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
1038    }
1039
1040    /// Amends an existing order in-place (no cancel/replace).
1041    pub async fn amend_order(
1042        &self,
1043        params: &KrakenSpotAmendOrderParams,
1044    ) -> anyhow::Result<SpotAmendOrderResponse, KrakenHttpError> {
1045        if self.credential.is_none() {
1046            return Err(KrakenHttpError::AuthenticationError(
1047                "API credentials required for amending orders".to_string(),
1048            ));
1049        }
1050
1051        let param_string = serde_urlencoded::to_string(params)
1052            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
1053
1054        let body = Some(param_string.into_bytes());
1055
1056        let response: KrakenResponse<SpotAmendOrderResponse> = self
1057            .send_request(Method::POST, "/0/private/AmendOrder", body, true)
1058            .await?;
1059
1060        response
1061            .result
1062            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
1063    }
1064
1065    /// Requests account balances (requires authentication).
1066    pub async fn get_balance(&self) -> anyhow::Result<BalanceResponse, KrakenHttpError> {
1067        if self.credential.is_none() {
1068            return Err(KrakenHttpError::AuthenticationError(
1069                "API credentials required for Balance".to_string(),
1070            ));
1071        }
1072
1073        let response: KrakenResponse<BalanceResponse> = self
1074            .send_request(Method::POST, "/0/private/Balance", None, true)
1075            .await?;
1076
1077        response.result.ok_or_else(|| {
1078            KrakenHttpError::ParseError("Missing result in balance response".to_string())
1079        })
1080    }
1081}
1082
1083/// High-level HTTP client for the Kraken Spot REST API.
1084///
1085/// This client wraps the raw client and provides Nautilus domain types.
1086/// It maintains an instrument cache and uses it to parse venue responses
1087/// into Nautilus domain objects.
1088#[cfg_attr(
1089    feature = "python",
1090    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken", from_py_object)
1091)]
1092#[cfg_attr(
1093    feature = "python",
1094    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.kraken")
1095)]
1096pub struct KrakenSpotHttpClient {
1097    pub(crate) inner: Arc<KrakenSpotRawHttpClient>,
1098    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
1099    clock: &'static AtomicTime,
1100    cache_initialized: Arc<AtomicBool>,
1101    use_spot_position_reports: Arc<AtomicBool>,
1102    spot_positions_quote_currency: Arc<RwLock<Ustr>>,
1103}
1104
1105impl Clone for KrakenSpotHttpClient {
1106    fn clone(&self) -> Self {
1107        Self {
1108            inner: self.inner.clone(),
1109            instruments_cache: self.instruments_cache.clone(),
1110            cache_initialized: self.cache_initialized.clone(),
1111            use_spot_position_reports: self.use_spot_position_reports.clone(),
1112            spot_positions_quote_currency: self.spot_positions_quote_currency.clone(),
1113            clock: self.clock,
1114        }
1115    }
1116}
1117
1118impl Default for KrakenSpotHttpClient {
1119    fn default() -> Self {
1120        Self::new(
1121            KrakenEnvironment::Mainnet,
1122            None,
1123            60,
1124            None,
1125            None,
1126            None,
1127            None,
1128            KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
1129        )
1130        .expect("Failed to create default KrakenSpotHttpClient")
1131    }
1132}
1133
1134impl Debug for KrakenSpotHttpClient {
1135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1136        f.debug_struct(stringify!(KrakenSpotHttpClient))
1137            .field("inner", &self.inner)
1138            .finish()
1139    }
1140}
1141
1142impl KrakenSpotHttpClient {
1143    /// Creates a new [`KrakenSpotHttpClient`].
1144    #[expect(clippy::too_many_arguments)]
1145    pub fn new(
1146        environment: KrakenEnvironment,
1147        base_url_override: Option<String>,
1148        timeout_secs: u64,
1149        max_retries: Option<u32>,
1150        retry_delay_ms: Option<u64>,
1151        retry_delay_max_ms: Option<u64>,
1152        proxy_url: Option<String>,
1153        max_requests_per_second: u32,
1154    ) -> anyhow::Result<Self> {
1155        Ok(Self {
1156            inner: Arc::new(KrakenSpotRawHttpClient::new(
1157                environment,
1158                base_url_override,
1159                timeout_secs,
1160                max_retries,
1161                retry_delay_ms,
1162                retry_delay_max_ms,
1163                proxy_url,
1164                max_requests_per_second,
1165            )?),
1166            instruments_cache: Arc::new(AtomicMap::new()),
1167            cache_initialized: Arc::new(AtomicBool::new(false)),
1168            use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1169            spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1170            clock: get_atomic_clock_realtime(),
1171        })
1172    }
1173
1174    /// Creates a new [`KrakenSpotHttpClient`] with credentials.
1175    #[expect(clippy::too_many_arguments)]
1176    pub fn with_credentials(
1177        api_key: String,
1178        api_secret: String,
1179        environment: KrakenEnvironment,
1180        base_url_override: Option<String>,
1181        timeout_secs: u64,
1182        max_retries: Option<u32>,
1183        retry_delay_ms: Option<u64>,
1184        retry_delay_max_ms: Option<u64>,
1185        proxy_url: Option<String>,
1186        max_requests_per_second: u32,
1187    ) -> anyhow::Result<Self> {
1188        Ok(Self {
1189            inner: Arc::new(KrakenSpotRawHttpClient::with_credentials(
1190                api_key,
1191                api_secret,
1192                environment,
1193                base_url_override,
1194                timeout_secs,
1195                max_retries,
1196                retry_delay_ms,
1197                retry_delay_max_ms,
1198                proxy_url,
1199                max_requests_per_second,
1200            )?),
1201            instruments_cache: Arc::new(AtomicMap::new()),
1202            cache_initialized: Arc::new(AtomicBool::new(false)),
1203            use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1204            spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1205            clock: get_atomic_clock_realtime(),
1206        })
1207    }
1208
1209    /// Creates a new [`KrakenSpotHttpClient`] loading credentials from environment variables.
1210    ///
1211    /// Looks for `KRAKEN_SPOT_API_KEY` and `KRAKEN_SPOT_API_SECRET`.
1212    ///
1213    /// Note: Kraken Spot does not have a testnet/demo environment.
1214    ///
1215    /// Falls back to unauthenticated client if credentials are not set.
1216    #[expect(clippy::too_many_arguments)]
1217    pub fn from_env(
1218        environment: KrakenEnvironment,
1219        base_url_override: Option<String>,
1220        timeout_secs: u64,
1221        max_retries: Option<u32>,
1222        retry_delay_ms: Option<u64>,
1223        retry_delay_max_ms: Option<u64>,
1224        proxy_url: Option<String>,
1225        max_requests_per_second: u32,
1226    ) -> anyhow::Result<Self> {
1227        if let Some(credential) = KrakenCredential::from_env_spot() {
1228            let (api_key, api_secret) = credential.into_parts();
1229            Self::with_credentials(
1230                api_key,
1231                api_secret,
1232                environment,
1233                base_url_override,
1234                timeout_secs,
1235                max_retries,
1236                retry_delay_ms,
1237                retry_delay_max_ms,
1238                proxy_url,
1239                max_requests_per_second,
1240            )
1241        } else {
1242            Self::new(
1243                environment,
1244                base_url_override,
1245                timeout_secs,
1246                max_retries,
1247                retry_delay_ms,
1248                retry_delay_max_ms,
1249                proxy_url,
1250                max_requests_per_second,
1251            )
1252        }
1253    }
1254
1255    /// Cancels all pending HTTP requests.
1256    pub fn cancel_all_requests(&self) {
1257        self.inner.cancel_all_requests();
1258    }
1259
1260    /// Returns the cancellation token for this client.
1261    pub fn cancellation_token(&self) -> &CancellationToken {
1262        self.inner.cancellation_token()
1263    }
1264
1265    /// Caches an instrument for symbol lookup.
1266    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1267        self.instruments_cache
1268            .insert(instrument.symbol().inner(), instrument);
1269        self.cache_initialized.store(true, Ordering::Release);
1270    }
1271
1272    /// Caches multiple instruments for symbol lookup.
1273    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1274        self.instruments_cache.rcu(|m| {
1275            for instrument in instruments {
1276                m.insert(instrument.symbol().inner(), instrument.clone());
1277            }
1278        });
1279        self.cache_initialized.store(true, Ordering::Release);
1280    }
1281
1282    /// Gets an instrument from the cache by symbol.
1283    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1284        self.instruments_cache.get_cloned(symbol)
1285    }
1286
1287    fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1288        self.instruments_cache
1289            .load()
1290            .values()
1291            .find(|inst| inst.raw_symbol().as_str() == raw_symbol)
1292            .cloned()
1293    }
1294
1295    fn generate_ts_init(&self) -> UnixNanos {
1296        self.clock.get_time_ns()
1297    }
1298
1299    // Kraken requires `asset_class=tokenized_asset` on every request that references a tokenized pair.
1300    fn asset_class_for(instrument: &InstrumentAny) -> Option<KrakenAssetClass> {
1301        if matches!(instrument, InstrumentAny::TokenizedAsset(_)) {
1302            Some(KrakenAssetClass::TokenizedAsset)
1303        } else {
1304            None
1305        }
1306    }
1307
1308    /// Sets whether to generate position reports from wallet balances for SPOT instruments.
1309    pub fn set_use_spot_position_reports(&self, value: bool) {
1310        self.use_spot_position_reports
1311            .store(value, Ordering::Relaxed);
1312    }
1313
1314    /// Sets the quote currency filter for spot position reports.
1315    pub fn set_spot_positions_quote_currency(&self, currency: &str) {
1316        let mut guard = self.spot_positions_quote_currency.write().expect("lock");
1317        *guard = Ustr::from(currency);
1318    }
1319
1320    /// Requests an authentication token for WebSocket connections.
1321    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
1322        self.inner.get_websockets_token().await
1323    }
1324
1325    /// Requests tradable instruments from Kraken.
1326    ///
1327    /// When `pairs` is `None` (loading all), also fetches tokenized asset pairs
1328    /// (xStocks) and merges them with the default currency pairs.
1329    pub async fn request_instruments(
1330        &self,
1331        pairs: Option<Vec<String>>,
1332    ) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1333        let ts_init = self.generate_ts_init();
1334        let asset_pairs = self.inner.get_asset_pairs(pairs.clone(), None).await?;
1335
1336        let mut instruments: Vec<InstrumentAny> = asset_pairs
1337            .iter()
1338            .filter_map(|(pair_name, definition)| {
1339                match parse_spot_instrument(pair_name, definition, ts_init, ts_init) {
1340                    Ok(instrument) => Some(instrument),
1341                    Err(e) => {
1342                        log::warn!("Failed to parse instrument {pair_name}: {e}");
1343                        None
1344                    }
1345                }
1346            })
1347            .collect();
1348
1349        // Also fetch tokenized asset pairs (xStocks). When loading all pairs this
1350        // picks up tokenized equities; when loading specific pairs it covers the
1351        // case where the requested symbols are tokenized assets.
1352        {
1353            match self
1354                .inner
1355                .get_asset_pairs(pairs, Some("tokenized_asset"))
1356                .await
1357            {
1358                Ok(tokenized_pairs) => {
1359                    if !tokenized_pairs.is_empty() {
1360                        log::info!("Fetched {} tokenized asset pairs", tokenized_pairs.len());
1361                    }
1362                    let tokenized_instruments: Vec<InstrumentAny> =
1363                        tokenized_pairs
1364                            .iter()
1365                            .filter_map(|(pair_name, definition)| match parse_tokenized_instrument(
1366                                pair_name, definition, ts_init, ts_init,
1367                            ) {
1368                                Ok(instrument) => Some(instrument),
1369                                Err(e) => {
1370                                    log::warn!(
1371                                        "Failed to parse tokenized instrument {pair_name}: {e}"
1372                                    );
1373                                    None
1374                                }
1375                            })
1376                            .collect();
1377                    instruments.extend(tokenized_instruments);
1378                }
1379                Err(e) => {
1380                    log::warn!("Failed to fetch tokenized asset pairs: {e}");
1381                }
1382            }
1383        }
1384
1385        Ok(instruments)
1386    }
1387
1388    /// Requests the current market status for Kraken Spot instruments.
1389    ///
1390    /// Fetches both regular and tokenized asset pairs. The call returns an error if
1391    /// either fetch fails so callers can avoid emitting partial snapshots that would
1392    /// otherwise cause the missing tokenized symbols to be diffed as removed.
1393    pub async fn request_instrument_statuses(
1394        &self,
1395        pairs: Option<Vec<String>>,
1396    ) -> anyhow::Result<AHashMap<InstrumentId, MarketStatusAction>, KrakenHttpError> {
1397        let asset_pairs = self.inner.get_asset_pairs(pairs.clone(), None).await?;
1398        let mut statuses = collect_spot_statuses(&asset_pairs);
1399
1400        let tokenized_pairs = self
1401            .inner
1402            .get_asset_pairs(pairs, Some("tokenized_asset"))
1403            .await?;
1404        statuses.extend(collect_spot_statuses(&tokenized_pairs));
1405
1406        Ok(statuses)
1407    }
1408
1409    /// Requests historical trades for an instrument.
1410    pub async fn request_trades(
1411        &self,
1412        instrument_id: InstrumentId,
1413        start: Option<DateTime<Utc>>,
1414        end: Option<DateTime<Utc>>,
1415        limit: Option<u64>,
1416    ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1417        let instrument = self
1418            .get_cached_instrument(&instrument_id.symbol.inner())
1419            .ok_or_else(|| {
1420                KrakenHttpError::ParseError(format!(
1421                    "Instrument not found in cache: {instrument_id}",
1422                ))
1423            })?;
1424
1425        let raw_symbol = instrument.raw_symbol().to_string();
1426        let asset_class = Self::asset_class_for(&instrument);
1427        let ts_init = self.generate_ts_init();
1428
1429        // Kraken trades API expects nanoseconds since epoch as string
1430        let since = start.map(|dt| (dt.timestamp_nanos_opt().unwrap_or(0) as u64).to_string());
1431        let response = self
1432            .inner
1433            .get_trades(&raw_symbol, since, asset_class)
1434            .await?;
1435
1436        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1437        let mut trades = Vec::new();
1438
1439        for (_pair_name, trade_arrays) in &response.data {
1440            for trade_array in trade_arrays {
1441                match parse_trade_tick_from_array(trade_array, &instrument, ts_init) {
1442                    Ok(trade_tick) => {
1443                        if let Some(end_nanos) = end_ns
1444                            && trade_tick.ts_event.as_u64() > end_nanos
1445                        {
1446                            continue;
1447                        }
1448                        trades.push(trade_tick);
1449
1450                        if let Some(limit_count) = limit
1451                            && trades.len() >= limit_count as usize
1452                        {
1453                            return Ok(trades);
1454                        }
1455                    }
1456                    Err(e) => {
1457                        log::warn!("Failed to parse trade tick: {e}");
1458                    }
1459                }
1460            }
1461        }
1462
1463        Ok(trades)
1464    }
1465
1466    /// Requests historical bars/OHLC data for an instrument.
1467    pub async fn request_bars(
1468        &self,
1469        bar_type: BarType,
1470        start: Option<DateTime<Utc>>,
1471        end: Option<DateTime<Utc>>,
1472        limit: Option<u64>,
1473    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1474        let instrument_id = bar_type.instrument_id();
1475        let instrument = self
1476            .get_cached_instrument(&instrument_id.symbol.inner())
1477            .ok_or_else(|| {
1478                KrakenHttpError::ParseError(format!(
1479                    "Instrument not found in cache: {instrument_id}"
1480                ))
1481            })?;
1482
1483        let raw_symbol = instrument.raw_symbol().to_string();
1484        let asset_class = Self::asset_class_for(&instrument);
1485        let ts_init = self.generate_ts_init();
1486
1487        let interval = Some(
1488            bar_type_to_spot_interval(bar_type)
1489                .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?,
1490        );
1491
1492        // Kraken OHLC API expects Unix timestamp in seconds
1493        let since = start.map(|dt| dt.timestamp());
1494        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1495        let response = self
1496            .inner
1497            .get_ohlc(&raw_symbol, interval, since, asset_class)
1498            .await?;
1499
1500        let mut bars = Vec::new();
1501
1502        for (_pair_name, ohlc_arrays) in &response.data {
1503            for ohlc_array in ohlc_arrays {
1504                if ohlc_array.len() < 8 {
1505                    let len = ohlc_array.len();
1506                    log::warn!("OHLC array too short: {len}");
1507                    continue;
1508                }
1509
1510                let ohlc = OhlcData {
1511                    time: ohlc_array[0].as_i64().unwrap_or(0),
1512                    open: ohlc_array[1].as_str().unwrap_or("0").to_string(),
1513                    high: ohlc_array[2].as_str().unwrap_or("0").to_string(),
1514                    low: ohlc_array[3].as_str().unwrap_or("0").to_string(),
1515                    close: ohlc_array[4].as_str().unwrap_or("0").to_string(),
1516                    vwap: ohlc_array[5].as_str().unwrap_or("0").to_string(),
1517                    volume: ohlc_array[6].as_str().unwrap_or("0").to_string(),
1518                    count: ohlc_array[7].as_i64().unwrap_or(0),
1519                };
1520
1521                match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1522                    Ok(bar) => {
1523                        if let Some(end_nanos) = end_ns
1524                            && bar.ts_event.as_u64() > end_nanos
1525                        {
1526                            continue;
1527                        }
1528                        bars.push(bar);
1529
1530                        if let Some(limit_count) = limit
1531                            && bars.len() >= limit_count as usize
1532                        {
1533                            return Ok(bars);
1534                        }
1535                    }
1536                    Err(e) => {
1537                        log::warn!("Failed to parse bar: {e}");
1538                    }
1539                }
1540            }
1541        }
1542
1543        Ok(bars)
1544    }
1545
1546    /// Requests an order book snapshot for an instrument.
1547    pub async fn request_book_snapshot(
1548        &self,
1549        instrument_id: InstrumentId,
1550        depth: Option<u32>,
1551    ) -> anyhow::Result<OrderBook, KrakenHttpError> {
1552        let instrument = self
1553            .get_cached_instrument(&instrument_id.symbol.inner())
1554            .ok_or_else(|| {
1555                KrakenHttpError::ParseError(format!(
1556                    "Instrument not found in cache: {instrument_id}"
1557                ))
1558            })?;
1559
1560        let raw_symbol = instrument.raw_symbol().to_string();
1561        let asset_class = Self::asset_class_for(&instrument);
1562        let price_precision = instrument.price_precision();
1563        let size_precision = instrument.size_precision();
1564        let ts_event = self.generate_ts_init();
1565
1566        let response = self
1567            .inner
1568            .get_book_depth(&raw_symbol, depth, asset_class)
1569            .await?;
1570
1571        let book_data = response.values().next().ok_or_else(|| {
1572            KrakenHttpError::ParseError(format!("No book data returned for {instrument_id}"))
1573        })?;
1574
1575        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1576
1577        // Pass sequence=0 so the snapshot does not advance the book's high-water sequence,
1578        // the WS subscription owns sequencing once it starts streaming deltas.
1579        for (i, level) in book_data.bids.iter().enumerate() {
1580            let price_str = level.first().and_then(|v| v.as_str()).unwrap_or("0");
1581            let size_str = level.get(1).and_then(|v| v.as_str()).unwrap_or("0");
1582            let price = Price::new(price_str.parse::<f64>().unwrap_or(0.0), price_precision);
1583            let size = Quantity::new(size_str.parse::<f64>().unwrap_or(0.0), size_precision);
1584            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1585            book.add(order, 0, 0, ts_event);
1586        }
1587
1588        let bids_len = book_data.bids.len();
1589
1590        for (i, level) in book_data.asks.iter().enumerate() {
1591            let price_str = level.first().and_then(|v| v.as_str()).unwrap_or("0");
1592            let size_str = level.get(1).and_then(|v| v.as_str()).unwrap_or("0");
1593            let price = Price::new(price_str.parse::<f64>().unwrap_or(0.0), price_precision);
1594            let size = Quantity::new(size_str.parse::<f64>().unwrap_or(0.0), size_precision);
1595            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1596            book.add(order, 0, 0, ts_event);
1597        }
1598
1599        Ok(book)
1600    }
1601
1602    /// Requests account state (balances) from Kraken.
1603    ///
1604    /// Returns an `AccountState` containing all currency balances.
1605    pub async fn request_account_state(
1606        &self,
1607        account_id: AccountId,
1608    ) -> anyhow::Result<AccountState> {
1609        let balances_raw = self.inner.get_balance().await?;
1610        let ts_init = self.generate_ts_init();
1611
1612        let balances: Vec<AccountBalance> = balances_raw
1613            .iter()
1614            .filter_map(|(currency_code, amount_str)| {
1615                let amount = Decimal::from_str_exact(amount_str).ok()?;
1616                if amount.is_zero() {
1617                    return None;
1618                }
1619
1620                // Kraken uses X-prefixed names for some currencies (e.g., XXBT for BTC)
1621                let normalized_code = currency_code
1622                    .strip_prefix("X")
1623                    .or_else(|| currency_code.strip_prefix("Z"))
1624                    .unwrap_or(currency_code);
1625
1626                let currency = Currency::new(
1627                    normalized_code,
1628                    8, // Default precision
1629                    0,
1630                    "0",
1631                    CurrencyType::Crypto,
1632                );
1633
1634                // Balance endpoint returns total only, so free = total (no locked info)
1635                AccountBalance::from_total_and_locked(amount, Decimal::ZERO, currency).ok()
1636            })
1637            .collect();
1638
1639        Ok(AccountState::new(
1640            account_id,
1641            AccountType::Cash,
1642            balances,
1643            vec![], // No margins for spot
1644            true,   // reported
1645            UUID4::new(),
1646            ts_init,
1647            ts_init,
1648            None,
1649        ))
1650    }
1651
1652    /// Requests order status reports from Kraken.
1653    pub async fn request_order_status_reports(
1654        &self,
1655        account_id: AccountId,
1656        instrument_id: Option<InstrumentId>,
1657        start: Option<DateTime<Utc>>,
1658        end: Option<DateTime<Utc>>,
1659        open_only: bool,
1660    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1661        const PAGE_SIZE: i32 = 50;
1662
1663        let ts_init = self.generate_ts_init();
1664        let mut all_reports = Vec::new();
1665
1666        let open_orders = self.inner.get_open_orders(Some(true), None).await?;
1667
1668        for (order_id, order) in &open_orders {
1669            if let Some(ref target_id) = instrument_id {
1670                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1671                if let Some(inst) = instrument
1672                    && inst.raw_symbol().as_str() != order.descr.pair
1673                {
1674                    continue;
1675                }
1676            }
1677
1678            if let Some(instrument) = self.get_instrument_by_raw_symbol(order.descr.pair.as_str()) {
1679                match parse_order_status_report(order_id, order, &instrument, account_id, ts_init) {
1680                    Ok(report) => all_reports.push(report),
1681                    Err(e) => {
1682                        log::warn!("Failed to parse order {order_id}: {e}");
1683                    }
1684                }
1685            }
1686        }
1687
1688        if open_only {
1689            return Ok(all_reports);
1690        }
1691
1692        // Kraken API expects Unix timestamps in seconds
1693        let start_ts = start.map(|dt| dt.timestamp());
1694        let end_ts = end.map(|dt| dt.timestamp());
1695
1696        let mut offset = 0;
1697
1698        loop {
1699            let closed_orders = self
1700                .inner
1701                .get_closed_orders(Some(true), None, start_ts, end_ts, Some(offset), None)
1702                .await?;
1703
1704            if closed_orders.is_empty() {
1705                break;
1706            }
1707
1708            for (order_id, order) in &closed_orders {
1709                if let Some(ref target_id) = instrument_id {
1710                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1711                    if let Some(inst) = instrument
1712                        && inst.raw_symbol().as_str() != order.descr.pair
1713                    {
1714                        continue;
1715                    }
1716                }
1717
1718                if let Some(instrument) =
1719                    self.get_instrument_by_raw_symbol(order.descr.pair.as_str())
1720                {
1721                    match parse_order_status_report(
1722                        order_id,
1723                        order,
1724                        &instrument,
1725                        account_id,
1726                        ts_init,
1727                    ) {
1728                        Ok(report) => all_reports.push(report),
1729                        Err(e) => {
1730                            log::warn!("Failed to parse order {order_id}: {e}");
1731                        }
1732                    }
1733                }
1734            }
1735
1736            offset += PAGE_SIZE;
1737        }
1738
1739        Ok(all_reports)
1740    }
1741
1742    /// Requests fill/trade reports from Kraken.
1743    pub async fn request_fill_reports(
1744        &self,
1745        account_id: AccountId,
1746        instrument_id: Option<InstrumentId>,
1747        start: Option<DateTime<Utc>>,
1748        end: Option<DateTime<Utc>>,
1749    ) -> anyhow::Result<Vec<FillReport>> {
1750        const PAGE_SIZE: i32 = 50;
1751
1752        let ts_init = self.generate_ts_init();
1753        let mut all_reports = Vec::new();
1754
1755        // Kraken API expects Unix timestamps in seconds
1756        let start_ts = start.map(|dt| dt.timestamp());
1757        let end_ts = end.map(|dt| dt.timestamp());
1758
1759        let mut offset = 0;
1760
1761        loop {
1762            let trades = self
1763                .inner
1764                .get_trades_history(None, Some(true), start_ts, end_ts, Some(offset))
1765                .await?;
1766
1767            if trades.is_empty() {
1768                break;
1769            }
1770
1771            for (trade_id, trade) in &trades {
1772                if let Some(ref target_id) = instrument_id {
1773                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1774                    if let Some(inst) = instrument
1775                        && inst.raw_symbol().as_str() != trade.pair
1776                    {
1777                        continue;
1778                    }
1779                }
1780
1781                if let Some(instrument) = self.get_instrument_by_raw_symbol(trade.pair.as_str()) {
1782                    match parse_fill_report(trade_id, trade, &instrument, account_id, ts_init) {
1783                        Ok(report) => all_reports.push(report),
1784                        Err(e) => {
1785                            log::warn!("Failed to parse trade {trade_id}: {e}");
1786                        }
1787                    }
1788                }
1789            }
1790
1791            offset += PAGE_SIZE;
1792        }
1793
1794        Ok(all_reports)
1795    }
1796
1797    /// Requests position status reports for SPOT instruments.
1798    ///
1799    /// Returns wallet balances as position reports if `use_spot_position_reports` is enabled.
1800    /// Otherwise returns an empty vector (spot traditionally has no "positions").
1801    pub async fn request_position_status_reports(
1802        &self,
1803        account_id: AccountId,
1804        instrument_id: Option<InstrumentId>,
1805    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1806        if self.use_spot_position_reports.load(Ordering::Relaxed) {
1807            self.generate_spot_position_reports_from_wallet(account_id, instrument_id)
1808                .await
1809        } else {
1810            Ok(Vec::new())
1811        }
1812    }
1813
1814    /// Generates SPOT position reports from wallet balances.
1815    ///
1816    /// Kraken spot balances are simple totals (no borrowing concept).
1817    /// Positive balances are reported as LONG positions.
1818    /// Zero balances are reported as FLAT.
1819    async fn generate_spot_position_reports_from_wallet(
1820        &self,
1821        account_id: AccountId,
1822        instrument_id: Option<InstrumentId>,
1823    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1824        let balances_raw = self.inner.get_balance().await?;
1825        let ts_init = self.generate_ts_init();
1826        let mut wallet_by_coin: HashMap<Ustr, f64> = HashMap::new();
1827
1828        for (currency_code, amount_str) in &balances_raw {
1829            let balance = match amount_str.parse::<f64>() {
1830                Ok(b) => b,
1831                Err(_) => continue,
1832            };
1833
1834            if balance == 0.0 {
1835                continue;
1836            }
1837
1838            wallet_by_coin.insert(Ustr::from(normalize_currency_code(currency_code)), balance);
1839        }
1840
1841        let mut reports = Vec::new();
1842
1843        if let Some(instrument_id) = instrument_id {
1844            if let Some(instrument) = self.get_cached_instrument(&instrument_id.symbol.inner()) {
1845                let base_currency = match instrument.base_currency() {
1846                    Some(currency) => currency,
1847                    None => return Ok(reports),
1848                };
1849
1850                let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1851                let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1852
1853                let side = if wallet_balance > 0.0 {
1854                    PositionSideSpecified::Long
1855                } else {
1856                    PositionSideSpecified::Flat
1857                };
1858
1859                let abs_balance = wallet_balance.abs();
1860                let quantity = Quantity::new(abs_balance, instrument.size_precision());
1861
1862                let report = PositionStatusReport::new(
1863                    account_id,
1864                    instrument_id,
1865                    side,
1866                    quantity,
1867                    ts_init,
1868                    ts_init,
1869                    None,
1870                    None,
1871                    None,
1872                );
1873
1874                reports.push(report);
1875            }
1876        } else {
1877            let quote_filter = *self.spot_positions_quote_currency.read().expect("lock");
1878
1879            let instruments_guard = self.instruments_cache.load();
1880            for instrument in instruments_guard.values() {
1881                let quote_currency = match instrument.quote_currency() {
1882                    currency if currency.code == quote_filter => currency,
1883                    _ => continue,
1884                };
1885
1886                let base_currency = match instrument.base_currency() {
1887                    Some(currency) => currency,
1888                    None => continue,
1889                };
1890
1891                let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1892                let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1893
1894                if wallet_balance == 0.0 {
1895                    continue;
1896                }
1897
1898                let side = PositionSideSpecified::Long;
1899                let quantity = Quantity::new(wallet_balance, instrument.size_precision());
1900
1901                if quantity.is_zero() {
1902                    continue;
1903                }
1904
1905                log::debug!(
1906                    "Spot position: {} {} (quote: {})",
1907                    quantity,
1908                    base_currency.code,
1909                    quote_currency.code
1910                );
1911
1912                let report = PositionStatusReport::new(
1913                    account_id,
1914                    instrument.id(),
1915                    side,
1916                    quantity,
1917                    ts_init,
1918                    ts_init,
1919                    None,
1920                    None,
1921                    None,
1922                );
1923
1924                reports.push(report);
1925            }
1926        }
1927
1928        Ok(reports)
1929    }
1930
1931    /// Submits a new order to the Kraken Spot exchange.
1932    ///
1933    /// Returns the venue order ID on success. WebSocket handles all execution events.
1934    ///
1935    /// # Errors
1936    ///
1937    /// Returns an error if:
1938    /// - Credentials are missing.
1939    /// - The instrument is not found in cache.
1940    /// - The order type or time in force is not supported.
1941    /// - The request fails.
1942    /// - The order is rejected.
1943    #[expect(clippy::too_many_arguments)]
1944    pub async fn submit_order(
1945        &self,
1946        _account_id: AccountId,
1947        instrument_id: InstrumentId,
1948        client_order_id: ClientOrderId,
1949        order_side: OrderSide,
1950        order_type: OrderType,
1951        quantity: Quantity,
1952        time_in_force: TimeInForce,
1953        expire_time: Option<UnixNanos>,
1954        price: Option<Price>,
1955        trigger_price: Option<Price>,
1956        trigger_type: Option<TriggerType>,
1957        trailing_offset: Option<Decimal>,
1958        limit_offset: Option<Decimal>,
1959        reduce_only: bool,
1960        post_only: bool,
1961        quote_quantity: bool,
1962        display_qty: Option<Quantity>,
1963    ) -> anyhow::Result<VenueOrderId> {
1964        let params = self.build_add_order_params(
1965            instrument_id,
1966            client_order_id,
1967            order_side,
1968            order_type,
1969            quantity,
1970            time_in_force,
1971            expire_time,
1972            price,
1973            trigger_price,
1974            trigger_type,
1975            trailing_offset,
1976            limit_offset,
1977            reduce_only,
1978            post_only,
1979            quote_quantity,
1980            display_qty,
1981        )?;
1982        let response = self.inner.add_order(&params).await?;
1983
1984        let venue_order_id = response
1985            .txid
1986            .first()
1987            .ok_or_else(|| anyhow::anyhow!("No transaction ID in order response"))?;
1988
1989        Ok(VenueOrderId::new(venue_order_id))
1990    }
1991
1992    /// Submits multiple orders to the Kraken Spot exchange.
1993    ///
1994    /// Automatically groups orders by pair and chunks batch requests at the venue
1995    /// limit. Single-order groups fall back to `AddOrder`.
1996    #[expect(clippy::type_complexity)]
1997    pub async fn submit_orders_batch(
1998        &self,
1999        orders: Vec<(
2000            InstrumentId,
2001            ClientOrderId,
2002            OrderSide,
2003            OrderType,
2004            Quantity,
2005            TimeInForce,
2006            Option<UnixNanos>,
2007            Option<Price>,
2008            Option<Price>,
2009            Option<TriggerType>,
2010            Option<Decimal>,
2011            Option<Decimal>,
2012            bool,
2013            bool,
2014            bool,
2015            Option<Quantity>,
2016        )>,
2017    ) -> anyhow::Result<Vec<String>> {
2018        let count = orders.len();
2019        if count == 0 {
2020            return Ok(Vec::new());
2021        }
2022
2023        let mut all_statuses: Vec<Option<String>> = vec![None; count];
2024        let mut grouped: AHashMap<Ustr, Vec<(usize, KrakenSpotAddOrderParams)>> = AHashMap::new();
2025
2026        for (
2027            idx,
2028            (
2029                instrument_id,
2030                client_order_id,
2031                order_side,
2032                order_type,
2033                quantity,
2034                time_in_force,
2035                expire_time,
2036                price,
2037                trigger_price,
2038                trigger_type,
2039                trailing_offset,
2040                limit_offset,
2041                reduce_only,
2042                post_only,
2043                quote_quantity,
2044                display_qty,
2045            ),
2046        ) in orders.into_iter().enumerate()
2047        {
2048            match self.build_add_order_params(
2049                instrument_id,
2050                client_order_id,
2051                order_side,
2052                order_type,
2053                quantity,
2054                time_in_force,
2055                expire_time,
2056                price,
2057                trigger_price,
2058                trigger_type,
2059                trailing_offset,
2060                limit_offset,
2061                reduce_only,
2062                post_only,
2063                quote_quantity,
2064                display_qty,
2065            ) {
2066                Ok(params) => {
2067                    grouped.entry(params.pair).or_default().push((idx, params));
2068                }
2069                Err(e) => {
2070                    all_statuses[idx] = Some(format!("validation_error: {e}"));
2071                }
2072            }
2073        }
2074
2075        let mut grouped_batches: Vec<_> = grouped.into_values().collect();
2076        grouped_batches.sort_by_key(|group| group.first().map_or(usize::MAX, |(idx, _)| *idx));
2077
2078        for grouped_orders in grouped_batches {
2079            for chunk in grouped_orders.chunks(BATCH_SUBMIT_LIMIT) {
2080                if chunk.len() == 1 {
2081                    let (idx, params) = &chunk[0];
2082                    match self.inner.add_order(params).await {
2083                        Ok(response) => {
2084                            let status = if response.txid.is_empty() {
2085                                "Unknown error".to_string()
2086                            } else {
2087                                "placed".to_string()
2088                            };
2089                            all_statuses[*idx] = Some(status);
2090                        }
2091                        Err(e) => {
2092                            all_statuses[*idx] = Some(format!("batch_error: {e}"));
2093                        }
2094                    }
2095                    continue;
2096                }
2097
2098                let batch_params = KrakenSpotAddOrderBatchParams {
2099                    pair: chunk[0].1.pair,
2100                    orders: chunk
2101                        .iter()
2102                        .map(|(_, params)| params.clone().into())
2103                        .collect(),
2104                    asset_class: chunk[0].1.asset_class,
2105                };
2106
2107                match self.inner.add_order_batch(&batch_params).await {
2108                    Ok(response) => {
2109                        for (offset, (idx, _)) in chunk.iter().enumerate() {
2110                            let status = response.orders.get(offset).map_or_else(
2111                                || "Unknown error".to_string(),
2112                                |order| {
2113                                    if order.txid.is_some() {
2114                                        "placed".to_string()
2115                                    } else {
2116                                        order
2117                                            .error
2118                                            .clone()
2119                                            .unwrap_or_else(|| "Unknown error".to_string())
2120                                    }
2121                                },
2122                            );
2123                            all_statuses[*idx] = Some(status);
2124                        }
2125                    }
2126                    Err(e) => {
2127                        for (idx, _) in chunk {
2128                            all_statuses[*idx] = Some(format!("batch_error: {e}"));
2129                        }
2130                    }
2131                }
2132            }
2133        }
2134
2135        Ok(all_statuses
2136            .into_iter()
2137            .map(|status| status.unwrap_or_else(|| "Unknown error".to_string()))
2138            .collect())
2139    }
2140
2141    /// Modifies an existing order on the Kraken Spot exchange using atomic amend.
2142    ///
2143    /// Uses the AmendOrder endpoint which modifies the order in-place,
2144    /// keeping the same order ID and queue position.
2145    ///
2146    /// # Errors
2147    ///
2148    /// Returns an error if:
2149    /// - Neither `client_order_id` nor `venue_order_id` is provided.
2150    /// - The instrument is not found in cache.
2151    /// - The request fails.
2152    pub async fn modify_order(
2153        &self,
2154        instrument_id: InstrumentId,
2155        client_order_id: Option<ClientOrderId>,
2156        venue_order_id: Option<VenueOrderId>,
2157        quantity: Option<Quantity>,
2158        price: Option<Price>,
2159        trigger_price: Option<Price>,
2160    ) -> anyhow::Result<VenueOrderId> {
2161        let _ = self
2162            .get_cached_instrument(&instrument_id.symbol.inner())
2163            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2164
2165        let txid = venue_order_id.as_ref().map(|id| id.to_string());
2166        let cl_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2167
2168        if txid.is_none() && cl_ord_id.is_none() {
2169            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2170        }
2171
2172        let mut builder = KrakenSpotAmendOrderParamsBuilder::default();
2173
2174        // Prefer txid (venue_order_id) over cl_ord_id
2175        if let Some(ref id) = txid {
2176            builder.txid(id.clone());
2177        } else if let Some(ref id) = cl_ord_id {
2178            builder.cl_ord_id(id.clone());
2179        }
2180
2181        if let Some(qty) = quantity {
2182            builder.order_qty(qty.to_string());
2183        }
2184
2185        if let Some(p) = price {
2186            builder.limit_price(p.to_string());
2187        }
2188
2189        if let Some(tp) = trigger_price {
2190            builder.trigger_price(tp.to_string());
2191        }
2192
2193        let params = builder
2194            .build()
2195            .map_err(|e| anyhow::anyhow!("Failed to build amend order params: {e}"))?;
2196
2197        let _response = self.inner.amend_order(&params).await?;
2198
2199        // AmendOrder modifies in-place, so the order keeps its original ID
2200        let order_id = venue_order_id
2201            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for amend response"))?;
2202
2203        Ok(order_id)
2204    }
2205
2206    /// Cancels an order on the Kraken Spot exchange.
2207    ///
2208    /// # Errors
2209    ///
2210    /// Returns an error if:
2211    /// - Credentials are missing.
2212    /// - Neither client_order_id nor venue_order_id is provided.
2213    /// - The request fails.
2214    /// - The order cancellation is rejected.
2215    pub async fn cancel_order(
2216        &self,
2217        _account_id: AccountId,
2218        instrument_id: InstrumentId,
2219        client_order_id: Option<ClientOrderId>,
2220        venue_order_id: Option<VenueOrderId>,
2221    ) -> anyhow::Result<()> {
2222        let _ = self
2223            .get_cached_instrument(&instrument_id.symbol.inner())
2224            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2225
2226        let txid = venue_order_id.as_ref().map(|id| id.to_string());
2227        let cl_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2228
2229        if txid.is_none() && cl_ord_id.is_none() {
2230            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2231        }
2232
2233        // Prefer txid (venue identifier) since Kraken always knows it.
2234        // cl_ord_id may not be known to Kraken for reconciled orders.
2235        let mut builder = KrakenSpotCancelOrderParamsBuilder::default();
2236
2237        if let Some(ref id) = txid {
2238            builder.txid(id.clone());
2239        } else if let Some(ref id) = cl_ord_id {
2240            builder.cl_ord_id(id.clone());
2241        }
2242        let params = builder
2243            .build()
2244            .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2245
2246        self.inner.cancel_order(&params).await?;
2247
2248        Ok(())
2249    }
2250
2251    /// Cancels multiple orders on the Kraken Spot exchange (batched, max 50 per request).
2252    pub async fn cancel_orders_batch(
2253        &self,
2254        venue_order_ids: Vec<VenueOrderId>,
2255    ) -> anyhow::Result<i32> {
2256        if venue_order_ids.is_empty() {
2257            return Ok(0);
2258        }
2259
2260        let mut total_cancelled = 0;
2261
2262        for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
2263            let orders: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
2264            let params = KrakenSpotCancelOrderBatchParams { orders };
2265
2266            let response = self.inner.cancel_order_batch(&params).await?;
2267            total_cancelled += response.count;
2268        }
2269
2270        Ok(total_cancelled)
2271    }
2272
2273    #[expect(clippy::too_many_arguments)]
2274    fn build_add_order_params(
2275        &self,
2276        instrument_id: InstrumentId,
2277        client_order_id: ClientOrderId,
2278        order_side: OrderSide,
2279        order_type: OrderType,
2280        quantity: Quantity,
2281        time_in_force: TimeInForce,
2282        expire_time: Option<UnixNanos>,
2283        price: Option<Price>,
2284        trigger_price: Option<Price>,
2285        trigger_type: Option<TriggerType>,
2286        trailing_offset: Option<Decimal>,
2287        limit_offset: Option<Decimal>,
2288        reduce_only: bool,
2289        post_only: bool,
2290        quote_quantity: bool,
2291        display_qty: Option<Quantity>,
2292    ) -> anyhow::Result<KrakenSpotAddOrderParams> {
2293        let instrument = self
2294            .get_cached_instrument(&instrument_id.symbol.inner())
2295            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2296
2297        let raw_symbol = instrument.raw_symbol().inner();
2298        let asset_class = Self::asset_class_for(&instrument);
2299
2300        let kraken_side = match order_side {
2301            OrderSide::Buy => KrakenOrderSide::Buy,
2302            OrderSide::Sell => KrakenOrderSide::Sell,
2303            _ => anyhow::bail!("Invalid order side: {order_side:?}"),
2304        };
2305
2306        let kraken_order_type = match order_type {
2307            OrderType::Market => KrakenOrderType::Market,
2308            OrderType::Limit => KrakenOrderType::Limit,
2309            OrderType::StopMarket => KrakenOrderType::StopLoss,
2310            OrderType::StopLimit => KrakenOrderType::StopLossLimit,
2311            OrderType::MarketIfTouched => KrakenOrderType::TakeProfit,
2312            OrderType::LimitIfTouched => KrakenOrderType::TakeProfitLimit,
2313            OrderType::TrailingStopMarket => KrakenOrderType::TrailingStop,
2314            OrderType::TrailingStopLimit => KrakenOrderType::TrailingStopLimit,
2315            _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
2316        };
2317
2318        let mut oflags = Vec::new();
2319        let is_limit_order = matches!(
2320            order_type,
2321            OrderType::Limit
2322                | OrderType::StopLimit
2323                | OrderType::LimitIfTouched
2324                | OrderType::TrailingStopLimit
2325        );
2326
2327        if time_in_force == TimeInForce::Fok && order_type != OrderType::Limit {
2328            anyhow::bail!("FOK time in force only supported for LIMIT orders on Kraken Spot");
2329        }
2330
2331        let (timeinforce, expiretm) =
2332            compute_time_in_force(is_limit_order, time_in_force, expire_time)?;
2333
2334        if post_only {
2335            oflags.push(KRAKEN_OFLAG_POST_ONLY);
2336        }
2337
2338        if reduce_only {
2339            log::warn!("reduce_only is not supported by Kraken Spot API, ignoring");
2340        }
2341
2342        if quote_quantity {
2343            oflags.push(KRAKEN_OFLAG_QUOTE_QUANTITY);
2344        }
2345
2346        let mut builder = KrakenSpotAddOrderParamsBuilder::default();
2347        builder
2348            .cl_ord_id(truncate_cl_ord_id(&client_order_id))
2349            .broker(NAUTILUS_KRAKEN_BROKER_ID)
2350            .pair(raw_symbol)
2351            .side(kraken_side)
2352            .volume(quantity.to_string())
2353            .order_type(kraken_order_type);
2354
2355        let is_conditional = matches!(
2356            order_type,
2357            OrderType::StopMarket
2358                | OrderType::StopLimit
2359                | OrderType::MarketIfTouched
2360                | OrderType::LimitIfTouched
2361                | OrderType::TrailingStopMarket
2362                | OrderType::TrailingStopLimit
2363        );
2364
2365        let is_trailing = matches!(
2366            order_type,
2367            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
2368        );
2369
2370        if is_trailing {
2371            if trigger_price.is_some() {
2372                anyhow::bail!(
2373                    "Kraken Spot trailing stops do not support activation trigger prices"
2374                );
2375            }
2376
2377            if let Some(offset) = trailing_offset {
2378                builder.price(offset.to_string());
2379            }
2380
2381            if let Some(offset) = limit_offset {
2382                builder.price2(offset.to_string());
2383            }
2384        } else if is_conditional {
2385            if let Some(trigger) = trigger_price {
2386                builder.price(trigger.to_string());
2387            }
2388
2389            if let Some(limit) = price {
2390                builder.price2(limit.to_string());
2391            }
2392        } else if let Some(limit) = price {
2393            builder.price(limit.to_string());
2394        }
2395
2396        if is_conditional {
2397            match trigger_type {
2398                Some(TriggerType::IndexPrice) => {
2399                    builder.trigger("index".to_string());
2400                }
2401                Some(TriggerType::LastPrice | TriggerType::Default) | None => {}
2402                Some(other) => {
2403                    anyhow::bail!(
2404                        "Unsupported trigger type for Kraken Spot: {other:?} (only LastPrice and IndexPrice supported)"
2405                    );
2406                }
2407            }
2408        }
2409
2410        if !oflags.is_empty() {
2411            builder.oflags(oflags.join(","));
2412        }
2413
2414        if let Some(tif) = timeinforce {
2415            builder.timeinforce(tif);
2416        }
2417
2418        if let Some(expire) = expiretm {
2419            builder.expiretm(expire);
2420        }
2421
2422        if let Some(dq) = display_qty {
2423            builder.displayvol(dq.to_string());
2424        }
2425
2426        if let Some(ac) = asset_class {
2427            builder.asset_class(ac);
2428        }
2429
2430        builder
2431            .build()
2432            .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))
2433    }
2434}
2435
2436fn collect_spot_statuses(
2437    asset_pairs: &AssetPairsResponse,
2438) -> AHashMap<InstrumentId, MarketStatusAction> {
2439    asset_pairs
2440        .iter()
2441        .map(|(_, definition)| {
2442            let symbol_str = definition.wsname.as_ref().unwrap_or(&definition.altname);
2443            let normalized_symbol = normalize_spot_symbol(symbol_str.as_str());
2444            let instrument_id = InstrumentId::new(Symbol::new(&normalized_symbol), *KRAKEN_VENUE);
2445            let action = definition
2446                .status
2447                .map_or(MarketStatusAction::Trading, MarketStatusAction::from);
2448
2449            (instrument_id, action)
2450        })
2451        .collect()
2452}
2453
#[cfg(test)]
mod tests {
    use nautilus_model::instruments::CurrencyPair;
    use rstest::rstest;

    use super::*;

    /// A default-constructed raw client holds no credential.
    #[rstest]
    fn test_raw_client_creation() {
        let raw_client = KrakenSpotRawHttpClient::default();

        assert!(raw_client.credential.is_none());
    }

    /// A raw client built with an API key and secret holds a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let construction = KrakenSpotRawHttpClient::with_credentials(
            String::from("test_key"),
            String::from("test_secret"),
            KrakenEnvironment::Mainnet,
            None,
            60,
            None,
            None,
            None,
            None,
            KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
        );
        let raw_client = construction.unwrap();

        assert!(raw_client.credential.is_some());
    }

    /// A default-constructed domain client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http_client = KrakenSpotHttpClient::default();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// Supplying credentials does not pre-populate the instruments cache.
    #[rstest]
    fn test_client_with_credentials() {
        let construction = KrakenSpotHttpClient::with_credentials(
            String::from("test_key"),
            String::from("test_secret"),
            KrakenEnvironment::Mainnet,
            None,
            60,
            None,
            None,
            None,
            None,
            KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
        );
        let http_client = construction.unwrap();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// Three consecutive nonces from one client are strictly increasing.
    #[rstest]
    fn test_nonce_generation_strictly_increasing() {
        let raw_client = KrakenSpotRawHttpClient::default();

        // Array elements are evaluated left to right, so this preserves call order
        let [nonce1, nonce2, nonce3] = [
            raw_client.generate_nonce(),
            raw_client.generate_nonce(),
            raw_client.generate_nonce(),
        ];

        assert!(
            nonce1 < nonce2,
            "nonce2 ({nonce2}) should be > nonce1 ({nonce1})"
        );
        assert!(
            nonce2 < nonce3,
            "nonce3 ({nonce3}) should be > nonce2 ({nonce2})"
        );
    }

    /// A freshly generated nonce has the magnitude of a nanosecond UNIX timestamp.
    #[rstest]
    fn test_nonce_is_nanosecond_timestamp() {
        let raw_client = KrakenSpotRawHttpClient::default();
        let generated = raw_client.generate_nonce();

        // A nanosecond timestamp is roughly 1.7e18 for Dec 2025; the lower bound
        // of 1.5e18 (~2017) is a loose sanity check on the unit
        assert!(
            generated > 1_500_000_000_000_000_000,
            "Nonce should be nanosecond timestamp"
        );
    }

    /// Checks the time-in-force and expiry strings produced for supported inputs.
    ///
    /// Per the cases below: GTC yields neither value, IOC/FOK/GTD on a limit
    /// order yield the matching time-in-force string, GTD also yields the expiry
    /// in whole seconds, and non-limit orders yield neither value.
    #[rstest]
    #[case::gtc_limit(true, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_limit(true, TimeInForce::Ioc, None, Some("IOC"), None)]
    #[case::fok_limit(true, TimeInForce::Fok, None, Some("FOK"), None)]
    #[case::gtd_limit_with_expire(
        true,
        TimeInForce::Gtd,
        Some(1_704_067_200_000_000_000u64),
        Some("GTD"),
        Some("1704067200")
    )]
    #[case::gtc_market(false, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_market(false, TimeInForce::Ioc, None, None, None)]
    fn test_compute_time_in_force_success(
        #[case] is_limit: bool,
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_tif: Option<&str>,
        #[case] expected_expire: Option<&str>,
    ) {
        let computed =
            compute_time_in_force(is_limit, tif, expire_nanos.map(UnixNanos::from)).unwrap();

        let wanted_tif: Option<String> = expected_tif.map(str::to_owned);
        let wanted_expire: Option<String> = expected_expire.map(str::to_owned);

        assert_eq!(computed.0, wanted_tif);
        assert_eq!(computed.1, wanted_expire);
    }

    /// Checks that invalid time-in-force input is rejected with a message that
    /// mentions the expected text.
    #[rstest]
    #[case::gtd_missing_expire(TimeInForce::Gtd, None, "expire_time")]
    fn test_compute_time_in_force_errors(
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_error: &str,
    ) {
        let outcome = compute_time_in_force(true, tif, expire_nanos.map(UnixNanos::from));

        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains(expected_error));
    }

    /// A stop-market order with an index-price trigger sets `trigger` to
    /// `"index"` and places the trigger price in `price`.
    #[rstest]
    fn test_build_add_order_params_sets_index_trigger_for_conditional_orders() {
        let http_client = KrakenSpotHttpClient::default();
        let instrument_id = cache_test_spot_instrument(&http_client);

        let build_result = http_client.build_add_order_params(
            instrument_id,
            ClientOrderId::new("spot-trigger-index"),
            OrderSide::Buy,
            OrderType::StopMarket,
            Quantity::from("0.01"),
            TimeInForce::Gtc,
            None,
            None,
            Some(Price::from("50000")),
            Some(TriggerType::IndexPrice),
            None,
            None,
            false,
            false,
            false,
            None,
        );
        let order_params = build_result.unwrap();

        assert_eq!(order_params.trigger.as_deref(), Some("index"));
        assert_eq!(order_params.price.as_deref(), Some("50000"));
    }

    /// A trailing-stop-limit order sends its two offsets as `price` and `price2`,
    /// leaves `trigger` unset for a last-price trigger, and forwards the display
    /// quantity as `displayvol`.
    #[rstest]
    fn test_build_add_order_params_sets_trailing_offsets() {
        let http_client = KrakenSpotHttpClient::default();
        let instrument_id = cache_test_spot_instrument(&http_client);

        let build_result = http_client.build_add_order_params(
            instrument_id,
            ClientOrderId::new("spot-trailing"),
            OrderSide::Sell,
            OrderType::TrailingStopLimit,
            Quantity::from("0.01"),
            TimeInForce::Gtc,
            None,
            Some(Price::from("49900")),
            None,
            Some(TriggerType::LastPrice),
            Some(Decimal::from(50)),
            Some(Decimal::from(25)),
            false,
            false,
            false,
            Some(Quantity::from("0.005")),
        );
        let order_params = build_result.unwrap();

        assert_eq!(order_params.price.as_deref(), Some("50"));
        assert_eq!(order_params.price2.as_deref(), Some("25"));
        assert!(order_params.trigger.is_none());
        assert_eq!(order_params.displayvol.as_deref(), Some("0.005"));
    }

    /// A conditional order with a mark-price trigger is rejected as unsupported.
    #[rstest]
    fn test_build_add_order_params_rejects_unsupported_trigger_type() {
        let http_client = KrakenSpotHttpClient::default();
        let instrument_id = cache_test_spot_instrument(&http_client);

        let build_result = http_client.build_add_order_params(
            instrument_id,
            ClientOrderId::new("spot-trigger-invalid"),
            OrderSide::Buy,
            OrderType::StopMarket,
            Quantity::from("0.01"),
            TimeInForce::Gtc,
            None,
            None,
            Some(Price::from("50000")),
            Some(TriggerType::MarkPrice),
            None,
            None,
            false,
            false,
            false,
            None,
        );
        let rejection = build_result.unwrap_err();

        let message = rejection.to_string();
        assert!(message.contains("Unsupported trigger type for Kraken Spot"));
    }

    /// Caches an `XBT/USD.KRAKEN` currency pair on `client` and returns its
    /// instrument ID, so order-building tests have an instrument to resolve.
    fn cache_test_spot_instrument(client: &KrakenSpotHttpClient) -> InstrumentId {
        let instrument_id = InstrumentId::from("XBT/USD.KRAKEN");

        // NOTE(review): `1` and `8` are presumably the price and size precisions
        // matching the increments that follow — confirm against `CurrencyPair::new`
        let pair = CurrencyPair::new(
            instrument_id,
            Symbol::new("XBTUSD"),
            Currency::BTC(),
            Currency::USD(),
            1,
            8,
            Price::from("0.1"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            0.into(),
            0.into(),
        );
        client.cache_instrument(InstrumentAny::CurrencyPair(pair));

        instrument_id
    }
}