1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc, RwLock,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use ahash::AHashMap;
29use chrono::{DateTime, Utc};
30use indexmap::IndexMap;
31use nautilus_core::{
32 AtomicMap, AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, datetime::NANOSECONDS_IN_SECOND,
33 nanos::UnixNanos, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 data::{Bar, BarType, BookOrder, TradeTick},
37 enums::{
38 AccountType, BookType, CurrencyType, MarketStatusAction, OrderSide, OrderType,
39 PositionSideSpecified, TimeInForce, TriggerType,
40 },
41 events::AccountState,
42 identifiers::{AccountId, ClientOrderId, InstrumentId, Symbol, VenueOrderId},
43 instruments::{Instrument, InstrumentAny},
44 orderbook::OrderBook,
45 reports::{FillReport, OrderStatusReport, PositionStatusReport},
46 types::{AccountBalance, Currency, Price, Quantity},
47};
48use nautilus_network::{
49 http::{HttpClient, Method, USER_AGENT},
50 ratelimiter::quota::Quota,
51 retry::{RetryConfig, RetryManager},
52};
53use rust_decimal::Decimal;
54use serde::de::DeserializeOwned;
55use tokio_util::sync::CancellationToken;
56use ustr::Ustr;
57
58use super::{models::*, query::*};
59use crate::{
60 common::{
61 consts::{
62 KRAKEN_OFLAG_POST_ONLY, KRAKEN_OFLAG_QUOTE_QUANTITY, KRAKEN_VENUE,
63 NAUTILUS_KRAKEN_BROKER_ID,
64 },
65 credential::KrakenCredential,
66 enums::{
67 KrakenAssetClass, KrakenEnvironment, KrakenOrderSide, KrakenOrderType,
68 KrakenProductType,
69 },
70 parse::{
71 bar_type_to_spot_interval, normalize_currency_code, normalize_spot_symbol, parse_bar,
72 parse_fill_report, parse_order_status_report, parse_spot_instrument,
73 parse_tokenized_instrument, parse_trade_tick_from_array, truncate_cl_ord_id,
74 },
75 urls::get_kraken_http_base_url,
76 },
77 http::error::KrakenHttpError,
78};
79
/// Default request rate (requests per second) for the Kraken spot HTTP client.
///
/// Used by the `Default` impls in this file and as the fallback burst size
/// when a caller supplies `0` for `max_requests_per_second`.
pub const KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;

/// Rate-limiter key attached to every request sent by the spot client.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:spot:global";

// NOTE(review): presumably the maximum number of orders per batch-cancel
// request; its use is not visible in this part of the file — confirm.
const BATCH_CANCEL_LIMIT: usize = 50;

// NOTE(review): presumably the maximum number of orders per batch-submit
// request; its use is not visible in this part of the file — confirm.
const BATCH_SUBMIT_LIMIT: usize = 15;
90
91fn compute_time_in_force(
96 is_limit_order: bool,
97 time_in_force: TimeInForce,
98 expire_time: Option<UnixNanos>,
99) -> anyhow::Result<(Option<String>, Option<String>)> {
100 if is_limit_order {
101 match time_in_force {
102 TimeInForce::Gtc => Ok((None, None)), TimeInForce::Ioc => Ok((Some("IOC".to_string()), None)),
104 TimeInForce::Fok => Ok((Some("FOK".to_string()), None)),
105 TimeInForce::Gtd => {
106 let expire = expire_time.ok_or_else(|| {
107 anyhow::anyhow!("GTD time in force requires expire_time parameter")
108 })?;
109 let expire_secs = expire.as_u64() / NANOSECONDS_IN_SECOND;
111 Ok((Some("GTD".to_string()), Some(expire_secs.to_string())))
112 }
113 _ => anyhow::bail!("Unsupported time in force: {time_in_force:?}"),
114 }
115 } else {
116 Ok((None, None))
118 }
119}
120
/// Low-level HTTP client for the Kraken spot REST API.
///
/// Wraps an [`HttpClient`] with request signing, rate-limit keys, retries and
/// cancellation, and exposes one method per REST endpoint.
pub struct KrakenSpotRawHttpClient {
    /// Base URL that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client (configured with the rate-limiter quotas).
    client: HttpClient,
    /// API credentials; `None` for a public-data-only client.
    credential: Option<KrakenCredential>,
    /// Retry policy applied by `send_request`.
    retry_manager: RetryManager<KrakenHttpError>,
    /// Token passed to the retry manager; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Clock used to generate request nonces.
    clock: &'static AtomicTime,
    /// Held while an authenticated request is in flight, so authenticated
    /// requests from this client are issued one at a time.
    auth_mutex: tokio::sync::Mutex<()>,
}
135
136impl Default for KrakenSpotRawHttpClient {
137 fn default() -> Self {
138 Self::new(
139 KrakenEnvironment::Mainnet,
140 None,
141 60,
142 None,
143 None,
144 None,
145 None,
146 KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
147 )
148 .expect("Failed to create default KrakenSpotRawHttpClient")
149 }
150}
151
152impl Debug for KrakenSpotRawHttpClient {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct(stringify!(KrakenSpotRawHttpClient))
155 .field("base_url", &self.base_url)
156 .field("has_credentials", &self.credential.is_some())
157 .finish()
158 }
159}
160
161impl KrakenSpotRawHttpClient {
162 #[expect(clippy::too_many_arguments)]
164 pub fn new(
165 environment: KrakenEnvironment,
166 base_url_override: Option<String>,
167 timeout_secs: u64,
168 max_retries: Option<u32>,
169 retry_delay_ms: Option<u64>,
170 retry_delay_max_ms: Option<u64>,
171 proxy_url: Option<String>,
172 max_requests_per_second: u32,
173 ) -> anyhow::Result<Self> {
174 let retry_config = RetryConfig {
175 max_retries: max_retries.unwrap_or(3),
176 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
177 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
178 backoff_factor: 2.0,
179 jitter_ms: 1000,
180 operation_timeout_ms: Some(60_000),
181 immediate_first: false,
182 max_elapsed_ms: Some(180_000),
183 };
184
185 let retry_manager = RetryManager::new(retry_config);
186 let base_url = base_url_override.unwrap_or_else(|| {
187 get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
188 });
189
190 Ok(Self {
191 base_url,
192 client: HttpClient::new(
193 Self::default_headers(),
194 vec![],
195 Self::rate_limiter_quotas(max_requests_per_second)?,
196 Some(Self::default_quota(max_requests_per_second)?),
197 Some(timeout_secs),
198 proxy_url,
199 )
200 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
201 credential: None,
202 retry_manager,
203 cancellation_token: CancellationToken::new(),
204 clock: get_atomic_clock_realtime(),
205 auth_mutex: tokio::sync::Mutex::new(()),
206 })
207 }
208
209 #[expect(clippy::too_many_arguments)]
211 pub fn with_credentials(
212 api_key: String,
213 api_secret: String,
214 environment: KrakenEnvironment,
215 base_url_override: Option<String>,
216 timeout_secs: u64,
217 max_retries: Option<u32>,
218 retry_delay_ms: Option<u64>,
219 retry_delay_max_ms: Option<u64>,
220 proxy_url: Option<String>,
221 max_requests_per_second: u32,
222 ) -> anyhow::Result<Self> {
223 let retry_config = RetryConfig {
224 max_retries: max_retries.unwrap_or(3),
225 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
226 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
227 backoff_factor: 2.0,
228 jitter_ms: 1000,
229 operation_timeout_ms: Some(60_000),
230 immediate_first: false,
231 max_elapsed_ms: Some(180_000),
232 };
233
234 let retry_manager = RetryManager::new(retry_config);
235 let base_url = base_url_override.unwrap_or_else(|| {
236 get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
237 });
238
239 Ok(Self {
240 base_url,
241 client: HttpClient::new(
242 Self::default_headers(),
243 vec![],
244 Self::rate_limiter_quotas(max_requests_per_second)?,
245 Some(Self::default_quota(max_requests_per_second)?),
246 Some(timeout_secs),
247 proxy_url,
248 )
249 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
250 credential: Some(KrakenCredential::new(api_key, api_secret)),
251 retry_manager,
252 cancellation_token: CancellationToken::new(),
253 clock: get_atomic_clock_realtime(),
254 auth_mutex: tokio::sync::Mutex::new(()),
255 })
256 }
257
258 fn generate_nonce(&self) -> u64 {
263 self.clock.get_time_ns().as_u64()
264 }
265
266 pub fn base_url(&self) -> &str {
268 &self.base_url
269 }
270
/// Returns the configured credential, or `None` for an unauthenticated client.
pub fn credential(&self) -> Option<&KrakenCredential> {
    self.credential.as_ref()
}
275
/// Cancels the client's cancellation token, which `send_request` passes to
/// the retry manager for every request.
pub fn cancel_all_requests(&self) {
    self.cancellation_token.cancel();
}
280
/// Returns the cancellation token shared by this client's requests.
pub fn cancellation_token(&self) -> &CancellationToken {
    &self.cancellation_token
}
285
286 fn default_headers() -> HashMap<String, String> {
287 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
288 }
289
290 fn default_quota(max_requests_per_second: u32) -> anyhow::Result<Quota> {
291 let burst = NonZeroU32::new(max_requests_per_second).unwrap_or(
292 NonZeroU32::new(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND).expect("non-zero"),
293 );
294 Quota::per_second(burst).ok_or_else(|| {
295 anyhow::anyhow!(
296 "Invalid max_requests_per_second: {max_requests_per_second} exceeds maximum"
297 )
298 })
299 }
300
301 fn rate_limiter_quotas(max_requests_per_second: u32) -> anyhow::Result<Vec<(String, Quota)>> {
302 Ok(vec![(
303 KRAKEN_GLOBAL_RATE_KEY.to_string(),
304 Self::default_quota(max_requests_per_second)?,
305 )])
306 }
307
308 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
309 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
310 let route = format!("kraken:spot:{normalized}");
311 vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
312 }
313
314 fn sign_spot(
315 &self,
316 path: &str,
317 nonce: u64,
318 params: &HashMap<String, String>,
319 ) -> anyhow::Result<(HashMap<String, String>, String)> {
320 let credential = self
321 .credential
322 .as_ref()
323 .ok_or_else(|| anyhow::anyhow!("Missing credentials"))?;
324
325 let (signature, post_data) = credential.sign_spot(path, nonce, params)?;
326
327 let mut headers = HashMap::new();
328 headers.insert("API-Key".to_string(), credential.api_key().to_string());
329 headers.insert("API-Sign".to_string(), signature);
330
331 Ok((headers, post_data))
332 }
333
334 async fn send_request<T: DeserializeOwned>(
335 &self,
336 method: Method,
337 endpoint: &str,
338 body: Option<Vec<u8>>,
339 authenticate: bool,
340 ) -> anyhow::Result<KrakenResponse<T>, KrakenHttpError> {
341 let _guard = if authenticate {
345 Some(self.auth_mutex.lock().await)
346 } else {
347 None
348 };
349
350 let endpoint = endpoint.to_string();
351 let url = format!("{}{endpoint}", self.base_url);
352 let method_clone = method.clone();
353 let body_clone = body.clone();
354
355 let operation = || {
356 let url = url.clone();
357 let method = method_clone.clone();
358 let body = body_clone.clone();
359 let endpoint = endpoint.clone();
360
361 async move {
362 let mut headers = Self::default_headers();
363
364 let final_body = if authenticate {
365 let nonce = self.generate_nonce();
366 log::debug!("Generated nonce {nonce} for {endpoint}");
367
368 let params: HashMap<String, String> = if let Some(ref body_bytes) = body {
369 let body_str = std::str::from_utf8(body_bytes).map_err(|e| {
370 KrakenHttpError::ParseError(format!(
371 "Invalid UTF-8 in request body: {e}"
372 ))
373 })?;
374 serde_urlencoded::from_str(body_str).map_err(|e| {
375 KrakenHttpError::ParseError(format!(
376 "Failed to parse request params: {e}"
377 ))
378 })?
379 } else {
380 HashMap::new()
381 };
382
383 let (auth_headers, post_data) = self
384 .sign_spot(&endpoint, nonce, ¶ms)
385 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
386 headers.extend(auth_headers);
387 Some(post_data.into_bytes())
388 } else {
389 body
390 };
391
392 if method == Method::POST {
393 headers.insert(
394 "Content-Type".to_string(),
395 "application/x-www-form-urlencoded".to_string(),
396 );
397 }
398
399 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
400
401 let response = self
402 .client
403 .request(
404 method,
405 url,
406 None,
407 Some(headers),
408 final_body,
409 None,
410 Some(rate_limit_keys),
411 )
412 .await
413 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
414
415 let status = response.status.as_u16();
416 if status >= 400 {
417 let body = String::from_utf8_lossy(&response.body).to_string();
418 if status == 401 || status == 403 {
420 return Err(KrakenHttpError::AuthenticationError(format!(
421 "HTTP error {status}: {body}"
422 )));
423 }
424 return Err(KrakenHttpError::NetworkError(format!(
425 "HTTP error {status}: {body}"
426 )));
427 }
428
429 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
430 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
431 })?;
432
433 let kraken_response: KrakenResponse<T> = serde_json::from_str(&response_text)
434 .map_err(|e| {
435 KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
436 })?;
437
438 if !kraken_response.error.is_empty() {
439 return Err(KrakenHttpError::ApiError(kraken_response.error));
440 }
441
442 Ok(kraken_response)
443 }
444 };
445
446 let should_retry =
447 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
448 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
449
450 self.retry_manager
451 .execute_with_retry_with_cancel(
452 &endpoint,
453 operation,
454 should_retry,
455 create_error,
456 &self.cancellation_token,
457 )
458 .await
459 }
460
461 pub async fn get_server_time(&self) -> anyhow::Result<ServerTime, KrakenHttpError> {
463 let response: KrakenResponse<ServerTime> = self
464 .send_request(Method::GET, "/0/public/Time", None, false)
465 .await?;
466
467 response.result.ok_or_else(|| {
468 KrakenHttpError::ParseError("Missing result in server time response".to_string())
469 })
470 }
471
472 pub async fn get_system_status(&self) -> anyhow::Result<SystemStatus, KrakenHttpError> {
474 let response: KrakenResponse<SystemStatus> = self
475 .send_request(Method::GET, "/0/public/SystemStatus", None, false)
476 .await?;
477
478 response.result.ok_or_else(|| {
479 KrakenHttpError::ParseError("Missing result in system status response".to_string())
480 })
481 }
482
483 pub async fn get_asset_pairs(
488 &self,
489 pairs: Option<Vec<String>>,
490 aclass_base: Option<&str>,
491 ) -> anyhow::Result<AssetPairsResponse, KrakenHttpError> {
492 let mut params = Vec::new();
493
494 if let Some(pairs) = pairs {
495 params.push(format!("pair={}", pairs.join(",")));
496 }
497
498 if let Some(aclass) = aclass_base {
499 params.push(format!("aclass_base={aclass}"));
500 }
501
502 let endpoint = if params.is_empty() {
503 "/0/public/AssetPairs".to_string()
504 } else {
505 format!("/0/public/AssetPairs?{}", params.join("&"))
506 };
507
508 let response: KrakenResponse<AssetPairsResponse> = self
509 .send_request(Method::GET, &endpoint, None, false)
510 .await?;
511
512 response.result.ok_or_else(|| {
513 KrakenHttpError::ParseError("Missing result in asset pairs response".to_string())
514 })
515 }
516
517 pub async fn get_ticker(
519 &self,
520 pairs: Vec<String>,
521 asset_class: Option<KrakenAssetClass>,
522 ) -> anyhow::Result<TickerResponse, KrakenHttpError> {
523 let mut endpoint = format!("/0/public/Ticker?pair={}", pairs.join(","));
524
525 if let Some(aclass) = asset_class {
526 endpoint.push_str(&format!("&asset_class={aclass}"));
527 }
528
529 let response: KrakenResponse<TickerResponse> = self
530 .send_request(Method::GET, &endpoint, None, false)
531 .await?;
532
533 response.result.ok_or_else(|| {
534 KrakenHttpError::ParseError("Missing result in ticker response".to_string())
535 })
536 }
537
538 pub async fn get_ohlc(
540 &self,
541 pair: &str,
542 interval: Option<u32>,
543 since: Option<i64>,
544 asset_class: Option<KrakenAssetClass>,
545 ) -> anyhow::Result<OhlcResponse, KrakenHttpError> {
546 let mut endpoint = format!("/0/public/OHLC?pair={pair}");
547
548 if let Some(aclass) = asset_class {
549 endpoint.push_str(&format!("&asset_class={aclass}"));
550 }
551
552 if let Some(interval) = interval {
553 endpoint.push_str(&format!("&interval={interval}"));
554 }
555
556 if let Some(since) = since {
557 endpoint.push_str(&format!("&since={since}"));
558 }
559
560 let response: KrakenResponse<OhlcResponse> = self
561 .send_request(Method::GET, &endpoint, None, false)
562 .await?;
563
564 response.result.ok_or_else(|| {
565 KrakenHttpError::ParseError("Missing result in OHLC response".to_string())
566 })
567 }
568
569 pub async fn get_book_depth(
571 &self,
572 pair: &str,
573 count: Option<u32>,
574 asset_class: Option<KrakenAssetClass>,
575 ) -> anyhow::Result<OrderBookResponse, KrakenHttpError> {
576 let mut endpoint = format!("/0/public/Depth?pair={pair}");
577
578 if let Some(aclass) = asset_class {
579 endpoint.push_str(&format!("&asset_class={aclass}"));
580 }
581
582 if let Some(count) = count {
583 endpoint.push_str(&format!("&count={count}"));
584 }
585
586 let response: KrakenResponse<OrderBookResponse> = self
587 .send_request(Method::GET, &endpoint, None, false)
588 .await?;
589
590 response.result.ok_or_else(|| {
591 KrakenHttpError::ParseError("Missing result in book depth response".to_string())
592 })
593 }
594
595 pub async fn get_trades(
597 &self,
598 pair: &str,
599 since: Option<String>,
600 asset_class: Option<KrakenAssetClass>,
601 ) -> anyhow::Result<TradesResponse, KrakenHttpError> {
602 let mut endpoint = format!("/0/public/Trades?pair={pair}");
603
604 if let Some(aclass) = asset_class {
605 endpoint.push_str(&format!("&asset_class={aclass}"));
606 }
607
608 if let Some(since) = since {
609 endpoint.push_str(&format!("&since={since}"));
610 }
611
612 let response: KrakenResponse<TradesResponse> = self
613 .send_request(Method::GET, &endpoint, None, false)
614 .await?;
615
616 response.result.ok_or_else(|| {
617 KrakenHttpError::ParseError("Missing result in trades response".to_string())
618 })
619 }
620
621 pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
623 if self.credential.is_none() {
624 return Err(KrakenHttpError::AuthenticationError(
625 "API credentials required for GetWebSocketsToken".to_string(),
626 ));
627 }
628
629 let response: KrakenResponse<WebSocketToken> = self
630 .send_request(Method::POST, "/0/private/GetWebSocketsToken", None, true)
631 .await?;
632
633 response.result.ok_or_else(|| {
634 KrakenHttpError::ParseError("Missing result in websockets token response".to_string())
635 })
636 }
637
638 pub async fn get_open_orders(
640 &self,
641 trades: Option<bool>,
642 userref: Option<i64>,
643 ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
644 if self.credential.is_none() {
645 return Err(KrakenHttpError::AuthenticationError(
646 "API credentials required for OpenOrders".to_string(),
647 ));
648 }
649
650 let mut params = vec![];
651
652 if let Some(trades_flag) = trades {
653 params.push(format!("trades={trades_flag}"));
654 }
655
656 if let Some(userref_val) = userref {
657 params.push(format!("userref={userref_val}"));
658 }
659
660 let body = if params.is_empty() {
661 None
662 } else {
663 Some(params.join("&").into_bytes())
664 };
665
666 let response: KrakenResponse<SpotOpenOrdersResult> = self
667 .send_request(Method::POST, "/0/private/OpenOrders", body, true)
668 .await?;
669
670 let result = response.result.ok_or_else(|| {
671 KrakenHttpError::ParseError("Missing result in open orders response".to_string())
672 })?;
673
674 Ok(result.open)
675 }
676
677 pub async fn get_closed_orders(
679 &self,
680 trades: Option<bool>,
681 userref: Option<i64>,
682 start: Option<i64>,
683 end: Option<i64>,
684 ofs: Option<i32>,
685 closetime: Option<String>,
686 ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
687 if self.credential.is_none() {
688 return Err(KrakenHttpError::AuthenticationError(
689 "API credentials required for ClosedOrders".to_string(),
690 ));
691 }
692
693 let mut params = vec![];
694
695 if let Some(trades_flag) = trades {
696 params.push(format!("trades={trades_flag}"));
697 }
698
699 if let Some(userref_val) = userref {
700 params.push(format!("userref={userref_val}"));
701 }
702
703 if let Some(start_val) = start {
704 params.push(format!("start={start_val}"));
705 }
706
707 if let Some(end_val) = end {
708 params.push(format!("end={end_val}"));
709 }
710
711 if let Some(ofs_val) = ofs {
712 params.push(format!("ofs={ofs_val}"));
713 }
714
715 if let Some(closetime_val) = closetime {
716 params.push(format!("closetime={closetime_val}"));
717 }
718
719 let body = if params.is_empty() {
720 None
721 } else {
722 Some(params.join("&").into_bytes())
723 };
724
725 let response: KrakenResponse<SpotClosedOrdersResult> = self
726 .send_request(Method::POST, "/0/private/ClosedOrders", body, true)
727 .await?;
728
729 let result = response.result.ok_or_else(|| {
730 KrakenHttpError::ParseError("Missing result in closed orders response".to_string())
731 })?;
732
733 Ok(result.closed)
734 }
735
736 pub async fn get_trades_history(
738 &self,
739 trade_type: Option<String>,
740 trades: Option<bool>,
741 start: Option<i64>,
742 end: Option<i64>,
743 ofs: Option<i32>,
744 ) -> anyhow::Result<IndexMap<String, SpotTrade>, KrakenHttpError> {
745 if self.credential.is_none() {
746 return Err(KrakenHttpError::AuthenticationError(
747 "API credentials required for TradesHistory".to_string(),
748 ));
749 }
750
751 let mut params = vec![];
752
753 if let Some(type_val) = trade_type {
754 params.push(format!("type={type_val}"));
755 }
756
757 if let Some(trades_flag) = trades {
758 params.push(format!("trades={trades_flag}"));
759 }
760
761 if let Some(start_val) = start {
762 params.push(format!("start={start_val}"));
763 }
764
765 if let Some(end_val) = end {
766 params.push(format!("end={end_val}"));
767 }
768
769 if let Some(ofs_val) = ofs {
770 params.push(format!("ofs={ofs_val}"));
771 }
772
773 let body = if params.is_empty() {
774 None
775 } else {
776 Some(params.join("&").into_bytes())
777 };
778
779 let response: KrakenResponse<SpotTradesHistoryResult> = self
780 .send_request(Method::POST, "/0/private/TradesHistory", body, true)
781 .await?;
782
783 let result = response.result.ok_or_else(|| {
784 KrakenHttpError::ParseError("Missing result in trades history response".to_string())
785 })?;
786
787 Ok(result.trades)
788 }
789
790 pub async fn add_order(
792 &self,
793 params: &KrakenSpotAddOrderParams,
794 ) -> anyhow::Result<SpotAddOrderResponse, KrakenHttpError> {
795 if self.credential.is_none() {
796 return Err(KrakenHttpError::AuthenticationError(
797 "API credentials required for adding orders".to_string(),
798 ));
799 }
800
801 let param_string = serde_urlencoded::to_string(params)
802 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
803 let body = Some(param_string.into_bytes());
804
805 let response: KrakenResponse<SpotAddOrderResponse> = self
806 .send_request(Method::POST, "/0/private/AddOrder", body, true)
807 .await?;
808
809 response
810 .result
811 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
812 }
813
814 pub async fn add_order_batch(
816 &self,
817 params: &KrakenSpotAddOrderBatchParams,
818 ) -> anyhow::Result<SpotAddOrderBatchResponse, KrakenHttpError> {
819 let credential = self.credential.as_ref().ok_or_else(|| {
820 KrakenHttpError::AuthenticationError(
821 "API credentials required for adding orders".to_string(),
822 )
823 })?;
824
825 let _guard = self.auth_mutex.lock().await;
826
827 let endpoint = "/0/private/AddOrderBatch";
828 let nonce = self.generate_nonce();
829
830 let mut json_body = serde_json::json!({
831 "nonce": nonce.to_string(),
832 "pair": params.pair,
833 "orders": params.orders,
834 });
835
836 if let Some(aclass) = ¶ms.asset_class {
837 json_body["asset_class"] = serde_json::json!(aclass);
838 }
839 let json_str = serde_json::to_string(&json_body)
840 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize: {e}")))?;
841
842 let signature = credential
843 .sign_spot_json(endpoint, nonce, &json_str)
844 .map_err(|e| KrakenHttpError::AuthenticationError(format!("Failed to sign: {e}")))?;
845
846 let mut headers = Self::default_headers();
847 headers.insert("API-Key".to_string(), credential.api_key().to_string());
848 headers.insert("API-Sign".to_string(), signature);
849 headers.insert("Content-Type".to_string(), "application/json".to_string());
850
851 let url = format!("{}{endpoint}", self.base_url);
852 let rate_limit_keys = Self::rate_limit_keys(endpoint);
853
854 let response = self
855 .client
856 .request(
857 Method::POST,
858 url,
859 None,
860 Some(headers),
861 Some(json_str.into_bytes()),
862 None,
863 Some(rate_limit_keys),
864 )
865 .await
866 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
867
868 if !response.status.is_success() {
869 return Err(KrakenHttpError::NetworkError(format!(
870 "HTTP {:?} for {}",
871 response.status, endpoint
872 )));
873 }
874
875 let parsed: KrakenResponse<SpotAddOrderBatchResponse> =
876 serde_json::from_slice(&response.body).map_err(|e| {
877 KrakenHttpError::ParseError(format!("Failed to parse JSON response: {e}"))
878 })?;
879
880 if !parsed.error.is_empty() {
881 return Err(KrakenHttpError::ApiError(parsed.error));
882 }
883
884 parsed
885 .result
886 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
887 }
888
889 pub async fn cancel_order(
891 &self,
892 params: &KrakenSpotCancelOrderParams,
893 ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
894 if self.credential.is_none() {
895 return Err(KrakenHttpError::AuthenticationError(
896 "API credentials required for canceling orders".to_string(),
897 ));
898 }
899
900 let param_string = serde_urlencoded::to_string(params)
901 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
902
903 let body = Some(param_string.into_bytes());
904
905 let response: KrakenResponse<SpotCancelOrderResponse> = self
906 .send_request(Method::POST, "/0/private/CancelOrder", body, true)
907 .await?;
908
909 response
910 .result
911 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
912 }
913
914 pub async fn cancel_order_batch(
916 &self,
917 params: &KrakenSpotCancelOrderBatchParams,
918 ) -> anyhow::Result<SpotCancelOrderBatchResponse, KrakenHttpError> {
919 let credential = self.credential.as_ref().ok_or_else(|| {
920 KrakenHttpError::AuthenticationError(
921 "API credentials required for canceling orders".to_string(),
922 )
923 })?;
924
925 let _guard = self.auth_mutex.lock().await;
927
928 let endpoint = "/0/private/CancelOrderBatch";
929 let nonce = self.generate_nonce();
930
931 let json_body = serde_json::json!({
933 "nonce": nonce.to_string(),
934 "orders": params.orders
935 });
936 let json_str = serde_json::to_string(&json_body)
937 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize: {e}")))?;
938
939 let signature = credential
940 .sign_spot_json(endpoint, nonce, &json_str)
941 .map_err(|e| KrakenHttpError::AuthenticationError(format!("Failed to sign: {e}")))?;
942
943 let mut headers = Self::default_headers();
944 headers.insert("API-Key".to_string(), credential.api_key().to_string());
945 headers.insert("API-Sign".to_string(), signature);
946 headers.insert("Content-Type".to_string(), "application/json".to_string());
947
948 let url = format!("{}{endpoint}", self.base_url);
949 let rate_limit_keys = Self::rate_limit_keys(endpoint);
950
951 let response = self
952 .client
953 .request(
954 Method::POST,
955 url,
956 None,
957 Some(headers),
958 Some(json_str.into_bytes()),
959 None,
960 Some(rate_limit_keys),
961 )
962 .await
963 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
964
965 if response.status.as_u16() >= 400 {
966 let status = response.status.as_u16();
967 let body = String::from_utf8_lossy(&response.body).to_string();
968
969 if status == 401 || status == 403 {
970 return Err(KrakenHttpError::AuthenticationError(format!(
971 "HTTP error {status}: {body}"
972 )));
973 }
974 return Err(KrakenHttpError::NetworkError(format!(
975 "HTTP error {status}: {body}"
976 )));
977 }
978
979 let response_text = String::from_utf8(response.body.to_vec())
980 .map_err(|e| KrakenHttpError::ParseError(format!("Invalid UTF-8: {e}")))?;
981
982 let kraken_response: KrakenResponse<SpotCancelOrderBatchResponse> =
983 serde_json::from_str(&response_text).map_err(|e| {
984 KrakenHttpError::ParseError(format!("Failed to parse response: {e}"))
985 })?;
986
987 if !kraken_response.error.is_empty() {
988 return Err(KrakenHttpError::ApiError(kraken_response.error));
989 }
990
991 kraken_response
992 .result
993 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
994 }
995
996 pub async fn cancel_all_orders(
998 &self,
999 ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
1000 if self.credential.is_none() {
1001 return Err(KrakenHttpError::AuthenticationError(
1002 "API credentials required for canceling orders".to_string(),
1003 ));
1004 }
1005
1006 let response: KrakenResponse<SpotCancelOrderResponse> = self
1007 .send_request(Method::POST, "/0/private/CancelAll", None, true)
1008 .await?;
1009
1010 response
1011 .result
1012 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
1013 }
1014
1015 pub async fn edit_order(
1017 &self,
1018 params: &KrakenSpotEditOrderParams,
1019 ) -> anyhow::Result<SpotEditOrderResponse, KrakenHttpError> {
1020 if self.credential.is_none() {
1021 return Err(KrakenHttpError::AuthenticationError(
1022 "API credentials required for editing orders".to_string(),
1023 ));
1024 }
1025
1026 let param_string = serde_urlencoded::to_string(params)
1027 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
1028
1029 let body = Some(param_string.into_bytes());
1030
1031 let response: KrakenResponse<SpotEditOrderResponse> = self
1032 .send_request(Method::POST, "/0/private/EditOrder", body, true)
1033 .await?;
1034
1035 response
1036 .result
1037 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
1038 }
1039
1040 pub async fn amend_order(
1042 &self,
1043 params: &KrakenSpotAmendOrderParams,
1044 ) -> anyhow::Result<SpotAmendOrderResponse, KrakenHttpError> {
1045 if self.credential.is_none() {
1046 return Err(KrakenHttpError::AuthenticationError(
1047 "API credentials required for amending orders".to_string(),
1048 ));
1049 }
1050
1051 let param_string = serde_urlencoded::to_string(params)
1052 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
1053
1054 let body = Some(param_string.into_bytes());
1055
1056 let response: KrakenResponse<SpotAmendOrderResponse> = self
1057 .send_request(Method::POST, "/0/private/AmendOrder", body, true)
1058 .await?;
1059
1060 response
1061 .result
1062 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
1063 }
1064
1065 pub async fn get_balance(&self) -> anyhow::Result<BalanceResponse, KrakenHttpError> {
1067 if self.credential.is_none() {
1068 return Err(KrakenHttpError::AuthenticationError(
1069 "API credentials required for Balance".to_string(),
1070 ));
1071 }
1072
1073 let response: KrakenResponse<BalanceResponse> = self
1074 .send_request(Method::POST, "/0/private/Balance", None, true)
1075 .await?;
1076
1077 response.result.ok_or_else(|| {
1078 KrakenHttpError::ParseError("Missing result in balance response".to_string())
1079 })
1080 }
1081}
1082
/// Higher-level Kraken spot HTTP client wrapping a shared
/// [`KrakenSpotRawHttpClient`].
///
/// Every piece of state is behind an `Arc` (or is a `'static` reference), so
/// clones share the same raw client, instrument cache and flags. Exposed to
/// Python when the `python` feature is enabled.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.kraken")
)]
pub struct KrakenSpotHttpClient {
    /// Shared raw client that performs the HTTP requests.
    pub(crate) inner: Arc<KrakenSpotRawHttpClient>,
    /// Shared instrument cache keyed by `Ustr`.
    // NOTE(review): presumably keyed by instrument symbol — confirm against the code that fills it.
    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Clock obtained from `get_atomic_clock_realtime()` at construction.
    clock: &'static AtomicTime,
    /// Starts as `false`.
    // NOTE(review): presumably set once the instrument cache has been populated — confirm.
    cache_initialized: Arc<AtomicBool>,
    /// Starts as `false`.
    // NOTE(review): presumably toggles position reports for spot balances — confirm.
    use_spot_position_reports: Arc<AtomicBool>,
    /// Starts as `"USDT"`.
    // NOTE(review): presumably the quote currency used for spot position reports — confirm.
    spot_positions_quote_currency: Arc<RwLock<Ustr>>,
}
1104
1105impl Clone for KrakenSpotHttpClient {
1106 fn clone(&self) -> Self {
1107 Self {
1108 inner: self.inner.clone(),
1109 instruments_cache: self.instruments_cache.clone(),
1110 cache_initialized: self.cache_initialized.clone(),
1111 use_spot_position_reports: self.use_spot_position_reports.clone(),
1112 spot_positions_quote_currency: self.spot_positions_quote_currency.clone(),
1113 clock: self.clock,
1114 }
1115 }
1116}
1117
1118impl Default for KrakenSpotHttpClient {
1119 fn default() -> Self {
1120 Self::new(
1121 KrakenEnvironment::Mainnet,
1122 None,
1123 60,
1124 None,
1125 None,
1126 None,
1127 None,
1128 KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
1129 )
1130 .expect("Failed to create default KrakenSpotHttpClient")
1131 }
1132}
1133
1134impl Debug for KrakenSpotHttpClient {
1135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1136 f.debug_struct(stringify!(KrakenSpotHttpClient))
1137 .field("inner", &self.inner)
1138 .finish()
1139 }
1140}
1141
1142impl KrakenSpotHttpClient {
1143 #[expect(clippy::too_many_arguments)]
1145 pub fn new(
1146 environment: KrakenEnvironment,
1147 base_url_override: Option<String>,
1148 timeout_secs: u64,
1149 max_retries: Option<u32>,
1150 retry_delay_ms: Option<u64>,
1151 retry_delay_max_ms: Option<u64>,
1152 proxy_url: Option<String>,
1153 max_requests_per_second: u32,
1154 ) -> anyhow::Result<Self> {
1155 Ok(Self {
1156 inner: Arc::new(KrakenSpotRawHttpClient::new(
1157 environment,
1158 base_url_override,
1159 timeout_secs,
1160 max_retries,
1161 retry_delay_ms,
1162 retry_delay_max_ms,
1163 proxy_url,
1164 max_requests_per_second,
1165 )?),
1166 instruments_cache: Arc::new(AtomicMap::new()),
1167 cache_initialized: Arc::new(AtomicBool::new(false)),
1168 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1169 spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1170 clock: get_atomic_clock_realtime(),
1171 })
1172 }
1173
1174 #[expect(clippy::too_many_arguments)]
1176 pub fn with_credentials(
1177 api_key: String,
1178 api_secret: String,
1179 environment: KrakenEnvironment,
1180 base_url_override: Option<String>,
1181 timeout_secs: u64,
1182 max_retries: Option<u32>,
1183 retry_delay_ms: Option<u64>,
1184 retry_delay_max_ms: Option<u64>,
1185 proxy_url: Option<String>,
1186 max_requests_per_second: u32,
1187 ) -> anyhow::Result<Self> {
1188 Ok(Self {
1189 inner: Arc::new(KrakenSpotRawHttpClient::with_credentials(
1190 api_key,
1191 api_secret,
1192 environment,
1193 base_url_override,
1194 timeout_secs,
1195 max_retries,
1196 retry_delay_ms,
1197 retry_delay_max_ms,
1198 proxy_url,
1199 max_requests_per_second,
1200 )?),
1201 instruments_cache: Arc::new(AtomicMap::new()),
1202 cache_initialized: Arc::new(AtomicBool::new(false)),
1203 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1204 spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1205 clock: get_atomic_clock_realtime(),
1206 })
1207 }
1208
1209 #[expect(clippy::too_many_arguments)]
1217 pub fn from_env(
1218 environment: KrakenEnvironment,
1219 base_url_override: Option<String>,
1220 timeout_secs: u64,
1221 max_retries: Option<u32>,
1222 retry_delay_ms: Option<u64>,
1223 retry_delay_max_ms: Option<u64>,
1224 proxy_url: Option<String>,
1225 max_requests_per_second: u32,
1226 ) -> anyhow::Result<Self> {
1227 if let Some(credential) = KrakenCredential::from_env_spot() {
1228 let (api_key, api_secret) = credential.into_parts();
1229 Self::with_credentials(
1230 api_key,
1231 api_secret,
1232 environment,
1233 base_url_override,
1234 timeout_secs,
1235 max_retries,
1236 retry_delay_ms,
1237 retry_delay_max_ms,
1238 proxy_url,
1239 max_requests_per_second,
1240 )
1241 } else {
1242 Self::new(
1243 environment,
1244 base_url_override,
1245 timeout_secs,
1246 max_retries,
1247 retry_delay_ms,
1248 retry_delay_max_ms,
1249 proxy_url,
1250 max_requests_per_second,
1251 )
1252 }
1253 }
1254
    /// Cancels all requests by delegating to the underlying raw client.
    pub fn cancel_all_requests(&self) {
        self.inner.cancel_all_requests();
    }
1259
    /// Returns the cancellation token exposed by the underlying raw client.
    pub fn cancellation_token(&self) -> &CancellationToken {
        self.inner.cancellation_token()
    }
1264
1265 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1267 self.instruments_cache
1268 .insert(instrument.symbol().inner(), instrument);
1269 self.cache_initialized.store(true, Ordering::Release);
1270 }
1271
1272 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1274 self.instruments_cache.rcu(|m| {
1275 for instrument in instruments {
1276 m.insert(instrument.symbol().inner(), instrument.clone());
1277 }
1278 });
1279 self.cache_initialized.store(true, Ordering::Release);
1280 }
1281
    /// Returns a clone of the cached instrument stored under `symbol`, or
    /// `None` when no such entry exists.
    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
        self.instruments_cache.get_cloned(symbol)
    }
1286
1287 fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1288 self.instruments_cache
1289 .load()
1290 .values()
1291 .find(|inst| inst.raw_symbol().as_str() == raw_symbol)
1292 .cloned()
1293 }
1294
    /// Returns the current time of the client's clock in nanoseconds, used as
    /// the `ts_init` of objects created by this client.
    fn generate_ts_init(&self) -> UnixNanos {
        self.clock.get_time_ns()
    }
1298
1299 fn asset_class_for(instrument: &InstrumentAny) -> Option<KrakenAssetClass> {
1301 if matches!(instrument, InstrumentAny::TokenizedAsset(_)) {
1302 Some(KrakenAssetClass::TokenizedAsset)
1303 } else {
1304 None
1305 }
1306 }
1307
    /// Enables or disables wallet-derived position status reports.
    ///
    /// The flag is read by `request_position_status_reports`.
    pub fn set_use_spot_position_reports(&self, value: bool) {
        self.use_spot_position_reports
            .store(value, Ordering::Relaxed);
    }
1313
1314 pub fn set_spot_positions_quote_currency(&self, currency: &str) {
1316 let mut guard = self.spot_positions_quote_currency.write().expect("lock");
1317 *guard = Ustr::from(currency);
1318 }
1319
    /// Requests a WebSocket token by delegating to the underlying raw client.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the raw client's request.
    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
        self.inner.get_websockets_token().await
    }
1324
1325 pub async fn request_instruments(
1330 &self,
1331 pairs: Option<Vec<String>>,
1332 ) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1333 let ts_init = self.generate_ts_init();
1334 let asset_pairs = self.inner.get_asset_pairs(pairs.clone(), None).await?;
1335
1336 let mut instruments: Vec<InstrumentAny> = asset_pairs
1337 .iter()
1338 .filter_map(|(pair_name, definition)| {
1339 match parse_spot_instrument(pair_name, definition, ts_init, ts_init) {
1340 Ok(instrument) => Some(instrument),
1341 Err(e) => {
1342 log::warn!("Failed to parse instrument {pair_name}: {e}");
1343 None
1344 }
1345 }
1346 })
1347 .collect();
1348
1349 {
1353 match self
1354 .inner
1355 .get_asset_pairs(pairs, Some("tokenized_asset"))
1356 .await
1357 {
1358 Ok(tokenized_pairs) => {
1359 if !tokenized_pairs.is_empty() {
1360 log::info!("Fetched {} tokenized asset pairs", tokenized_pairs.len());
1361 }
1362 let tokenized_instruments: Vec<InstrumentAny> =
1363 tokenized_pairs
1364 .iter()
1365 .filter_map(|(pair_name, definition)| match parse_tokenized_instrument(
1366 pair_name, definition, ts_init, ts_init,
1367 ) {
1368 Ok(instrument) => Some(instrument),
1369 Err(e) => {
1370 log::warn!(
1371 "Failed to parse tokenized instrument {pair_name}: {e}"
1372 );
1373 None
1374 }
1375 })
1376 .collect();
1377 instruments.extend(tokenized_instruments);
1378 }
1379 Err(e) => {
1380 log::warn!("Failed to fetch tokenized asset pairs: {e}");
1381 }
1382 }
1383 }
1384
1385 Ok(instruments)
1386 }
1387
1388 pub async fn request_instrument_statuses(
1394 &self,
1395 pairs: Option<Vec<String>>,
1396 ) -> anyhow::Result<AHashMap<InstrumentId, MarketStatusAction>, KrakenHttpError> {
1397 let asset_pairs = self.inner.get_asset_pairs(pairs.clone(), None).await?;
1398 let mut statuses = collect_spot_statuses(&asset_pairs);
1399
1400 let tokenized_pairs = self
1401 .inner
1402 .get_asset_pairs(pairs, Some("tokenized_asset"))
1403 .await?;
1404 statuses.extend(collect_spot_statuses(&tokenized_pairs));
1405
1406 Ok(statuses)
1407 }
1408
1409 pub async fn request_trades(
1411 &self,
1412 instrument_id: InstrumentId,
1413 start: Option<DateTime<Utc>>,
1414 end: Option<DateTime<Utc>>,
1415 limit: Option<u64>,
1416 ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1417 let instrument = self
1418 .get_cached_instrument(&instrument_id.symbol.inner())
1419 .ok_or_else(|| {
1420 KrakenHttpError::ParseError(format!(
1421 "Instrument not found in cache: {instrument_id}",
1422 ))
1423 })?;
1424
1425 let raw_symbol = instrument.raw_symbol().to_string();
1426 let asset_class = Self::asset_class_for(&instrument);
1427 let ts_init = self.generate_ts_init();
1428
1429 let since = start.map(|dt| (dt.timestamp_nanos_opt().unwrap_or(0) as u64).to_string());
1431 let response = self
1432 .inner
1433 .get_trades(&raw_symbol, since, asset_class)
1434 .await?;
1435
1436 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1437 let mut trades = Vec::new();
1438
1439 for (_pair_name, trade_arrays) in &response.data {
1440 for trade_array in trade_arrays {
1441 match parse_trade_tick_from_array(trade_array, &instrument, ts_init) {
1442 Ok(trade_tick) => {
1443 if let Some(end_nanos) = end_ns
1444 && trade_tick.ts_event.as_u64() > end_nanos
1445 {
1446 continue;
1447 }
1448 trades.push(trade_tick);
1449
1450 if let Some(limit_count) = limit
1451 && trades.len() >= limit_count as usize
1452 {
1453 return Ok(trades);
1454 }
1455 }
1456 Err(e) => {
1457 log::warn!("Failed to parse trade tick: {e}");
1458 }
1459 }
1460 }
1461 }
1462
1463 Ok(trades)
1464 }
1465
1466 pub async fn request_bars(
1468 &self,
1469 bar_type: BarType,
1470 start: Option<DateTime<Utc>>,
1471 end: Option<DateTime<Utc>>,
1472 limit: Option<u64>,
1473 ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1474 let instrument_id = bar_type.instrument_id();
1475 let instrument = self
1476 .get_cached_instrument(&instrument_id.symbol.inner())
1477 .ok_or_else(|| {
1478 KrakenHttpError::ParseError(format!(
1479 "Instrument not found in cache: {instrument_id}"
1480 ))
1481 })?;
1482
1483 let raw_symbol = instrument.raw_symbol().to_string();
1484 let asset_class = Self::asset_class_for(&instrument);
1485 let ts_init = self.generate_ts_init();
1486
1487 let interval = Some(
1488 bar_type_to_spot_interval(bar_type)
1489 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?,
1490 );
1491
1492 let since = start.map(|dt| dt.timestamp());
1494 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1495 let response = self
1496 .inner
1497 .get_ohlc(&raw_symbol, interval, since, asset_class)
1498 .await?;
1499
1500 let mut bars = Vec::new();
1501
1502 for (_pair_name, ohlc_arrays) in &response.data {
1503 for ohlc_array in ohlc_arrays {
1504 if ohlc_array.len() < 8 {
1505 let len = ohlc_array.len();
1506 log::warn!("OHLC array too short: {len}");
1507 continue;
1508 }
1509
1510 let ohlc = OhlcData {
1511 time: ohlc_array[0].as_i64().unwrap_or(0),
1512 open: ohlc_array[1].as_str().unwrap_or("0").to_string(),
1513 high: ohlc_array[2].as_str().unwrap_or("0").to_string(),
1514 low: ohlc_array[3].as_str().unwrap_or("0").to_string(),
1515 close: ohlc_array[4].as_str().unwrap_or("0").to_string(),
1516 vwap: ohlc_array[5].as_str().unwrap_or("0").to_string(),
1517 volume: ohlc_array[6].as_str().unwrap_or("0").to_string(),
1518 count: ohlc_array[7].as_i64().unwrap_or(0),
1519 };
1520
1521 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1522 Ok(bar) => {
1523 if let Some(end_nanos) = end_ns
1524 && bar.ts_event.as_u64() > end_nanos
1525 {
1526 continue;
1527 }
1528 bars.push(bar);
1529
1530 if let Some(limit_count) = limit
1531 && bars.len() >= limit_count as usize
1532 {
1533 return Ok(bars);
1534 }
1535 }
1536 Err(e) => {
1537 log::warn!("Failed to parse bar: {e}");
1538 }
1539 }
1540 }
1541 }
1542
1543 Ok(bars)
1544 }
1545
1546 pub async fn request_book_snapshot(
1548 &self,
1549 instrument_id: InstrumentId,
1550 depth: Option<u32>,
1551 ) -> anyhow::Result<OrderBook, KrakenHttpError> {
1552 let instrument = self
1553 .get_cached_instrument(&instrument_id.symbol.inner())
1554 .ok_or_else(|| {
1555 KrakenHttpError::ParseError(format!(
1556 "Instrument not found in cache: {instrument_id}"
1557 ))
1558 })?;
1559
1560 let raw_symbol = instrument.raw_symbol().to_string();
1561 let asset_class = Self::asset_class_for(&instrument);
1562 let price_precision = instrument.price_precision();
1563 let size_precision = instrument.size_precision();
1564 let ts_event = self.generate_ts_init();
1565
1566 let response = self
1567 .inner
1568 .get_book_depth(&raw_symbol, depth, asset_class)
1569 .await?;
1570
1571 let book_data = response.values().next().ok_or_else(|| {
1572 KrakenHttpError::ParseError(format!("No book data returned for {instrument_id}"))
1573 })?;
1574
1575 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1576
1577 for (i, level) in book_data.bids.iter().enumerate() {
1580 let price_str = level.first().and_then(|v| v.as_str()).unwrap_or("0");
1581 let size_str = level.get(1).and_then(|v| v.as_str()).unwrap_or("0");
1582 let price = Price::new(price_str.parse::<f64>().unwrap_or(0.0), price_precision);
1583 let size = Quantity::new(size_str.parse::<f64>().unwrap_or(0.0), size_precision);
1584 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1585 book.add(order, 0, 0, ts_event);
1586 }
1587
1588 let bids_len = book_data.bids.len();
1589
1590 for (i, level) in book_data.asks.iter().enumerate() {
1591 let price_str = level.first().and_then(|v| v.as_str()).unwrap_or("0");
1592 let size_str = level.get(1).and_then(|v| v.as_str()).unwrap_or("0");
1593 let price = Price::new(price_str.parse::<f64>().unwrap_or(0.0), price_precision);
1594 let size = Quantity::new(size_str.parse::<f64>().unwrap_or(0.0), size_precision);
1595 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1596 book.add(order, 0, 0, ts_event);
1597 }
1598
1599 Ok(book)
1600 }
1601
1602 pub async fn request_account_state(
1606 &self,
1607 account_id: AccountId,
1608 ) -> anyhow::Result<AccountState> {
1609 let balances_raw = self.inner.get_balance().await?;
1610 let ts_init = self.generate_ts_init();
1611
1612 let balances: Vec<AccountBalance> = balances_raw
1613 .iter()
1614 .filter_map(|(currency_code, amount_str)| {
1615 let amount = Decimal::from_str_exact(amount_str).ok()?;
1616 if amount.is_zero() {
1617 return None;
1618 }
1619
1620 let normalized_code = currency_code
1622 .strip_prefix("X")
1623 .or_else(|| currency_code.strip_prefix("Z"))
1624 .unwrap_or(currency_code);
1625
1626 let currency = Currency::new(
1627 normalized_code,
1628 8, 0,
1630 "0",
1631 CurrencyType::Crypto,
1632 );
1633
1634 AccountBalance::from_total_and_locked(amount, Decimal::ZERO, currency).ok()
1636 })
1637 .collect();
1638
1639 Ok(AccountState::new(
1640 account_id,
1641 AccountType::Cash,
1642 balances,
1643 vec![], true, UUID4::new(),
1646 ts_init,
1647 ts_init,
1648 None,
1649 ))
1650 }
1651
1652 pub async fn request_order_status_reports(
1654 &self,
1655 account_id: AccountId,
1656 instrument_id: Option<InstrumentId>,
1657 start: Option<DateTime<Utc>>,
1658 end: Option<DateTime<Utc>>,
1659 open_only: bool,
1660 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1661 const PAGE_SIZE: i32 = 50;
1662
1663 let ts_init = self.generate_ts_init();
1664 let mut all_reports = Vec::new();
1665
1666 let open_orders = self.inner.get_open_orders(Some(true), None).await?;
1667
1668 for (order_id, order) in &open_orders {
1669 if let Some(ref target_id) = instrument_id {
1670 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1671 if let Some(inst) = instrument
1672 && inst.raw_symbol().as_str() != order.descr.pair
1673 {
1674 continue;
1675 }
1676 }
1677
1678 if let Some(instrument) = self.get_instrument_by_raw_symbol(order.descr.pair.as_str()) {
1679 match parse_order_status_report(order_id, order, &instrument, account_id, ts_init) {
1680 Ok(report) => all_reports.push(report),
1681 Err(e) => {
1682 log::warn!("Failed to parse order {order_id}: {e}");
1683 }
1684 }
1685 }
1686 }
1687
1688 if open_only {
1689 return Ok(all_reports);
1690 }
1691
1692 let start_ts = start.map(|dt| dt.timestamp());
1694 let end_ts = end.map(|dt| dt.timestamp());
1695
1696 let mut offset = 0;
1697
1698 loop {
1699 let closed_orders = self
1700 .inner
1701 .get_closed_orders(Some(true), None, start_ts, end_ts, Some(offset), None)
1702 .await?;
1703
1704 if closed_orders.is_empty() {
1705 break;
1706 }
1707
1708 for (order_id, order) in &closed_orders {
1709 if let Some(ref target_id) = instrument_id {
1710 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1711 if let Some(inst) = instrument
1712 && inst.raw_symbol().as_str() != order.descr.pair
1713 {
1714 continue;
1715 }
1716 }
1717
1718 if let Some(instrument) =
1719 self.get_instrument_by_raw_symbol(order.descr.pair.as_str())
1720 {
1721 match parse_order_status_report(
1722 order_id,
1723 order,
1724 &instrument,
1725 account_id,
1726 ts_init,
1727 ) {
1728 Ok(report) => all_reports.push(report),
1729 Err(e) => {
1730 log::warn!("Failed to parse order {order_id}: {e}");
1731 }
1732 }
1733 }
1734 }
1735
1736 offset += PAGE_SIZE;
1737 }
1738
1739 Ok(all_reports)
1740 }
1741
1742 pub async fn request_fill_reports(
1744 &self,
1745 account_id: AccountId,
1746 instrument_id: Option<InstrumentId>,
1747 start: Option<DateTime<Utc>>,
1748 end: Option<DateTime<Utc>>,
1749 ) -> anyhow::Result<Vec<FillReport>> {
1750 const PAGE_SIZE: i32 = 50;
1751
1752 let ts_init = self.generate_ts_init();
1753 let mut all_reports = Vec::new();
1754
1755 let start_ts = start.map(|dt| dt.timestamp());
1757 let end_ts = end.map(|dt| dt.timestamp());
1758
1759 let mut offset = 0;
1760
1761 loop {
1762 let trades = self
1763 .inner
1764 .get_trades_history(None, Some(true), start_ts, end_ts, Some(offset))
1765 .await?;
1766
1767 if trades.is_empty() {
1768 break;
1769 }
1770
1771 for (trade_id, trade) in &trades {
1772 if let Some(ref target_id) = instrument_id {
1773 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1774 if let Some(inst) = instrument
1775 && inst.raw_symbol().as_str() != trade.pair
1776 {
1777 continue;
1778 }
1779 }
1780
1781 if let Some(instrument) = self.get_instrument_by_raw_symbol(trade.pair.as_str()) {
1782 match parse_fill_report(trade_id, trade, &instrument, account_id, ts_init) {
1783 Ok(report) => all_reports.push(report),
1784 Err(e) => {
1785 log::warn!("Failed to parse trade {trade_id}: {e}");
1786 }
1787 }
1788 }
1789 }
1790
1791 offset += PAGE_SIZE;
1792 }
1793
1794 Ok(all_reports)
1795 }
1796
1797 pub async fn request_position_status_reports(
1802 &self,
1803 account_id: AccountId,
1804 instrument_id: Option<InstrumentId>,
1805 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1806 if self.use_spot_position_reports.load(Ordering::Relaxed) {
1807 self.generate_spot_position_reports_from_wallet(account_id, instrument_id)
1808 .await
1809 } else {
1810 Ok(Vec::new())
1811 }
1812 }
1813
1814 async fn generate_spot_position_reports_from_wallet(
1820 &self,
1821 account_id: AccountId,
1822 instrument_id: Option<InstrumentId>,
1823 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1824 let balances_raw = self.inner.get_balance().await?;
1825 let ts_init = self.generate_ts_init();
1826 let mut wallet_by_coin: HashMap<Ustr, f64> = HashMap::new();
1827
1828 for (currency_code, amount_str) in &balances_raw {
1829 let balance = match amount_str.parse::<f64>() {
1830 Ok(b) => b,
1831 Err(_) => continue,
1832 };
1833
1834 if balance == 0.0 {
1835 continue;
1836 }
1837
1838 wallet_by_coin.insert(Ustr::from(normalize_currency_code(currency_code)), balance);
1839 }
1840
1841 let mut reports = Vec::new();
1842
1843 if let Some(instrument_id) = instrument_id {
1844 if let Some(instrument) = self.get_cached_instrument(&instrument_id.symbol.inner()) {
1845 let base_currency = match instrument.base_currency() {
1846 Some(currency) => currency,
1847 None => return Ok(reports),
1848 };
1849
1850 let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1851 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1852
1853 let side = if wallet_balance > 0.0 {
1854 PositionSideSpecified::Long
1855 } else {
1856 PositionSideSpecified::Flat
1857 };
1858
1859 let abs_balance = wallet_balance.abs();
1860 let quantity = Quantity::new(abs_balance, instrument.size_precision());
1861
1862 let report = PositionStatusReport::new(
1863 account_id,
1864 instrument_id,
1865 side,
1866 quantity,
1867 ts_init,
1868 ts_init,
1869 None,
1870 None,
1871 None,
1872 );
1873
1874 reports.push(report);
1875 }
1876 } else {
1877 let quote_filter = *self.spot_positions_quote_currency.read().expect("lock");
1878
1879 let instruments_guard = self.instruments_cache.load();
1880 for instrument in instruments_guard.values() {
1881 let quote_currency = match instrument.quote_currency() {
1882 currency if currency.code == quote_filter => currency,
1883 _ => continue,
1884 };
1885
1886 let base_currency = match instrument.base_currency() {
1887 Some(currency) => currency,
1888 None => continue,
1889 };
1890
1891 let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1892 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1893
1894 if wallet_balance == 0.0 {
1895 continue;
1896 }
1897
1898 let side = PositionSideSpecified::Long;
1899 let quantity = Quantity::new(wallet_balance, instrument.size_precision());
1900
1901 if quantity.is_zero() {
1902 continue;
1903 }
1904
1905 log::debug!(
1906 "Spot position: {} {} (quote: {})",
1907 quantity,
1908 base_currency.code,
1909 quote_currency.code
1910 );
1911
1912 let report = PositionStatusReport::new(
1913 account_id,
1914 instrument.id(),
1915 side,
1916 quantity,
1917 ts_init,
1918 ts_init,
1919 None,
1920 None,
1921 None,
1922 );
1923
1924 reports.push(report);
1925 }
1926 }
1927
1928 Ok(reports)
1929 }
1930
1931 #[expect(clippy::too_many_arguments)]
1944 pub async fn submit_order(
1945 &self,
1946 _account_id: AccountId,
1947 instrument_id: InstrumentId,
1948 client_order_id: ClientOrderId,
1949 order_side: OrderSide,
1950 order_type: OrderType,
1951 quantity: Quantity,
1952 time_in_force: TimeInForce,
1953 expire_time: Option<UnixNanos>,
1954 price: Option<Price>,
1955 trigger_price: Option<Price>,
1956 trigger_type: Option<TriggerType>,
1957 trailing_offset: Option<Decimal>,
1958 limit_offset: Option<Decimal>,
1959 reduce_only: bool,
1960 post_only: bool,
1961 quote_quantity: bool,
1962 display_qty: Option<Quantity>,
1963 ) -> anyhow::Result<VenueOrderId> {
1964 let params = self.build_add_order_params(
1965 instrument_id,
1966 client_order_id,
1967 order_side,
1968 order_type,
1969 quantity,
1970 time_in_force,
1971 expire_time,
1972 price,
1973 trigger_price,
1974 trigger_type,
1975 trailing_offset,
1976 limit_offset,
1977 reduce_only,
1978 post_only,
1979 quote_quantity,
1980 display_qty,
1981 )?;
1982 let response = self.inner.add_order(¶ms).await?;
1983
1984 let venue_order_id = response
1985 .txid
1986 .first()
1987 .ok_or_else(|| anyhow::anyhow!("No transaction ID in order response"))?;
1988
1989 Ok(VenueOrderId::new(venue_order_id))
1990 }
1991
1992 #[expect(clippy::type_complexity)]
1997 pub async fn submit_orders_batch(
1998 &self,
1999 orders: Vec<(
2000 InstrumentId,
2001 ClientOrderId,
2002 OrderSide,
2003 OrderType,
2004 Quantity,
2005 TimeInForce,
2006 Option<UnixNanos>,
2007 Option<Price>,
2008 Option<Price>,
2009 Option<TriggerType>,
2010 Option<Decimal>,
2011 Option<Decimal>,
2012 bool,
2013 bool,
2014 bool,
2015 Option<Quantity>,
2016 )>,
2017 ) -> anyhow::Result<Vec<String>> {
2018 let count = orders.len();
2019 if count == 0 {
2020 return Ok(Vec::new());
2021 }
2022
2023 let mut all_statuses: Vec<Option<String>> = vec![None; count];
2024 let mut grouped: AHashMap<Ustr, Vec<(usize, KrakenSpotAddOrderParams)>> = AHashMap::new();
2025
2026 for (
2027 idx,
2028 (
2029 instrument_id,
2030 client_order_id,
2031 order_side,
2032 order_type,
2033 quantity,
2034 time_in_force,
2035 expire_time,
2036 price,
2037 trigger_price,
2038 trigger_type,
2039 trailing_offset,
2040 limit_offset,
2041 reduce_only,
2042 post_only,
2043 quote_quantity,
2044 display_qty,
2045 ),
2046 ) in orders.into_iter().enumerate()
2047 {
2048 match self.build_add_order_params(
2049 instrument_id,
2050 client_order_id,
2051 order_side,
2052 order_type,
2053 quantity,
2054 time_in_force,
2055 expire_time,
2056 price,
2057 trigger_price,
2058 trigger_type,
2059 trailing_offset,
2060 limit_offset,
2061 reduce_only,
2062 post_only,
2063 quote_quantity,
2064 display_qty,
2065 ) {
2066 Ok(params) => {
2067 grouped.entry(params.pair).or_default().push((idx, params));
2068 }
2069 Err(e) => {
2070 all_statuses[idx] = Some(format!("validation_error: {e}"));
2071 }
2072 }
2073 }
2074
2075 let mut grouped_batches: Vec<_> = grouped.into_values().collect();
2076 grouped_batches.sort_by_key(|group| group.first().map_or(usize::MAX, |(idx, _)| *idx));
2077
2078 for grouped_orders in grouped_batches {
2079 for chunk in grouped_orders.chunks(BATCH_SUBMIT_LIMIT) {
2080 if chunk.len() == 1 {
2081 let (idx, params) = &chunk[0];
2082 match self.inner.add_order(params).await {
2083 Ok(response) => {
2084 let status = if response.txid.is_empty() {
2085 "Unknown error".to_string()
2086 } else {
2087 "placed".to_string()
2088 };
2089 all_statuses[*idx] = Some(status);
2090 }
2091 Err(e) => {
2092 all_statuses[*idx] = Some(format!("batch_error: {e}"));
2093 }
2094 }
2095 continue;
2096 }
2097
2098 let batch_params = KrakenSpotAddOrderBatchParams {
2099 pair: chunk[0].1.pair,
2100 orders: chunk
2101 .iter()
2102 .map(|(_, params)| params.clone().into())
2103 .collect(),
2104 asset_class: chunk[0].1.asset_class,
2105 };
2106
2107 match self.inner.add_order_batch(&batch_params).await {
2108 Ok(response) => {
2109 for (offset, (idx, _)) in chunk.iter().enumerate() {
2110 let status = response.orders.get(offset).map_or_else(
2111 || "Unknown error".to_string(),
2112 |order| {
2113 if order.txid.is_some() {
2114 "placed".to_string()
2115 } else {
2116 order
2117 .error
2118 .clone()
2119 .unwrap_or_else(|| "Unknown error".to_string())
2120 }
2121 },
2122 );
2123 all_statuses[*idx] = Some(status);
2124 }
2125 }
2126 Err(e) => {
2127 for (idx, _) in chunk {
2128 all_statuses[*idx] = Some(format!("batch_error: {e}"));
2129 }
2130 }
2131 }
2132 }
2133 }
2134
2135 Ok(all_statuses
2136 .into_iter()
2137 .map(|status| status.unwrap_or_else(|| "Unknown error".to_string()))
2138 .collect())
2139 }
2140
2141 pub async fn modify_order(
2153 &self,
2154 instrument_id: InstrumentId,
2155 client_order_id: Option<ClientOrderId>,
2156 venue_order_id: Option<VenueOrderId>,
2157 quantity: Option<Quantity>,
2158 price: Option<Price>,
2159 trigger_price: Option<Price>,
2160 ) -> anyhow::Result<VenueOrderId> {
2161 let _ = self
2162 .get_cached_instrument(&instrument_id.symbol.inner())
2163 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2164
2165 let txid = venue_order_id.as_ref().map(|id| id.to_string());
2166 let cl_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2167
2168 if txid.is_none() && cl_ord_id.is_none() {
2169 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2170 }
2171
2172 let mut builder = KrakenSpotAmendOrderParamsBuilder::default();
2173
2174 if let Some(ref id) = txid {
2176 builder.txid(id.clone());
2177 } else if let Some(ref id) = cl_ord_id {
2178 builder.cl_ord_id(id.clone());
2179 }
2180
2181 if let Some(qty) = quantity {
2182 builder.order_qty(qty.to_string());
2183 }
2184
2185 if let Some(p) = price {
2186 builder.limit_price(p.to_string());
2187 }
2188
2189 if let Some(tp) = trigger_price {
2190 builder.trigger_price(tp.to_string());
2191 }
2192
2193 let params = builder
2194 .build()
2195 .map_err(|e| anyhow::anyhow!("Failed to build amend order params: {e}"))?;
2196
2197 let _response = self.inner.amend_order(¶ms).await?;
2198
2199 let order_id = venue_order_id
2201 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for amend response"))?;
2202
2203 Ok(order_id)
2204 }
2205
2206 pub async fn cancel_order(
2216 &self,
2217 _account_id: AccountId,
2218 instrument_id: InstrumentId,
2219 client_order_id: Option<ClientOrderId>,
2220 venue_order_id: Option<VenueOrderId>,
2221 ) -> anyhow::Result<()> {
2222 let _ = self
2223 .get_cached_instrument(&instrument_id.symbol.inner())
2224 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2225
2226 let txid = venue_order_id.as_ref().map(|id| id.to_string());
2227 let cl_ord_id = client_order_id.as_ref().map(truncate_cl_ord_id);
2228
2229 if txid.is_none() && cl_ord_id.is_none() {
2230 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
2231 }
2232
2233 let mut builder = KrakenSpotCancelOrderParamsBuilder::default();
2236
2237 if let Some(ref id) = txid {
2238 builder.txid(id.clone());
2239 } else if let Some(ref id) = cl_ord_id {
2240 builder.cl_ord_id(id.clone());
2241 }
2242 let params = builder
2243 .build()
2244 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
2245
2246 self.inner.cancel_order(¶ms).await?;
2247
2248 Ok(())
2249 }
2250
2251 pub async fn cancel_orders_batch(
2253 &self,
2254 venue_order_ids: Vec<VenueOrderId>,
2255 ) -> anyhow::Result<i32> {
2256 if venue_order_ids.is_empty() {
2257 return Ok(0);
2258 }
2259
2260 let mut total_cancelled = 0;
2261
2262 for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
2263 let orders: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
2264 let params = KrakenSpotCancelOrderBatchParams { orders };
2265
2266 let response = self.inner.cancel_order_batch(¶ms).await?;
2267 total_cancelled += response.count;
2268 }
2269
2270 Ok(total_cancelled)
2271 }
2272
2273 #[expect(clippy::too_many_arguments)]
2274 fn build_add_order_params(
2275 &self,
2276 instrument_id: InstrumentId,
2277 client_order_id: ClientOrderId,
2278 order_side: OrderSide,
2279 order_type: OrderType,
2280 quantity: Quantity,
2281 time_in_force: TimeInForce,
2282 expire_time: Option<UnixNanos>,
2283 price: Option<Price>,
2284 trigger_price: Option<Price>,
2285 trigger_type: Option<TriggerType>,
2286 trailing_offset: Option<Decimal>,
2287 limit_offset: Option<Decimal>,
2288 reduce_only: bool,
2289 post_only: bool,
2290 quote_quantity: bool,
2291 display_qty: Option<Quantity>,
2292 ) -> anyhow::Result<KrakenSpotAddOrderParams> {
2293 let instrument = self
2294 .get_cached_instrument(&instrument_id.symbol.inner())
2295 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
2296
2297 let raw_symbol = instrument.raw_symbol().inner();
2298 let asset_class = Self::asset_class_for(&instrument);
2299
2300 let kraken_side = match order_side {
2301 OrderSide::Buy => KrakenOrderSide::Buy,
2302 OrderSide::Sell => KrakenOrderSide::Sell,
2303 _ => anyhow::bail!("Invalid order side: {order_side:?}"),
2304 };
2305
2306 let kraken_order_type = match order_type {
2307 OrderType::Market => KrakenOrderType::Market,
2308 OrderType::Limit => KrakenOrderType::Limit,
2309 OrderType::StopMarket => KrakenOrderType::StopLoss,
2310 OrderType::StopLimit => KrakenOrderType::StopLossLimit,
2311 OrderType::MarketIfTouched => KrakenOrderType::TakeProfit,
2312 OrderType::LimitIfTouched => KrakenOrderType::TakeProfitLimit,
2313 OrderType::TrailingStopMarket => KrakenOrderType::TrailingStop,
2314 OrderType::TrailingStopLimit => KrakenOrderType::TrailingStopLimit,
2315 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
2316 };
2317
2318 let mut oflags = Vec::new();
2319 let is_limit_order = matches!(
2320 order_type,
2321 OrderType::Limit
2322 | OrderType::StopLimit
2323 | OrderType::LimitIfTouched
2324 | OrderType::TrailingStopLimit
2325 );
2326
2327 if time_in_force == TimeInForce::Fok && order_type != OrderType::Limit {
2328 anyhow::bail!("FOK time in force only supported for LIMIT orders on Kraken Spot");
2329 }
2330
2331 let (timeinforce, expiretm) =
2332 compute_time_in_force(is_limit_order, time_in_force, expire_time)?;
2333
2334 if post_only {
2335 oflags.push(KRAKEN_OFLAG_POST_ONLY);
2336 }
2337
2338 if reduce_only {
2339 log::warn!("reduce_only is not supported by Kraken Spot API, ignoring");
2340 }
2341
2342 if quote_quantity {
2343 oflags.push(KRAKEN_OFLAG_QUOTE_QUANTITY);
2344 }
2345
2346 let mut builder = KrakenSpotAddOrderParamsBuilder::default();
2347 builder
2348 .cl_ord_id(truncate_cl_ord_id(&client_order_id))
2349 .broker(NAUTILUS_KRAKEN_BROKER_ID)
2350 .pair(raw_symbol)
2351 .side(kraken_side)
2352 .volume(quantity.to_string())
2353 .order_type(kraken_order_type);
2354
2355 let is_conditional = matches!(
2356 order_type,
2357 OrderType::StopMarket
2358 | OrderType::StopLimit
2359 | OrderType::MarketIfTouched
2360 | OrderType::LimitIfTouched
2361 | OrderType::TrailingStopMarket
2362 | OrderType::TrailingStopLimit
2363 );
2364
2365 let is_trailing = matches!(
2366 order_type,
2367 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
2368 );
2369
2370 if is_trailing {
2371 if trigger_price.is_some() {
2372 anyhow::bail!(
2373 "Kraken Spot trailing stops do not support activation trigger prices"
2374 );
2375 }
2376
2377 if let Some(offset) = trailing_offset {
2378 builder.price(offset.to_string());
2379 }
2380
2381 if let Some(offset) = limit_offset {
2382 builder.price2(offset.to_string());
2383 }
2384 } else if is_conditional {
2385 if let Some(trigger) = trigger_price {
2386 builder.price(trigger.to_string());
2387 }
2388
2389 if let Some(limit) = price {
2390 builder.price2(limit.to_string());
2391 }
2392 } else if let Some(limit) = price {
2393 builder.price(limit.to_string());
2394 }
2395
2396 if is_conditional {
2397 match trigger_type {
2398 Some(TriggerType::IndexPrice) => {
2399 builder.trigger("index".to_string());
2400 }
2401 Some(TriggerType::LastPrice | TriggerType::Default) | None => {}
2402 Some(other) => {
2403 anyhow::bail!(
2404 "Unsupported trigger type for Kraken Spot: {other:?} (only LastPrice and IndexPrice supported)"
2405 );
2406 }
2407 }
2408 }
2409
2410 if !oflags.is_empty() {
2411 builder.oflags(oflags.join(","));
2412 }
2413
2414 if let Some(tif) = timeinforce {
2415 builder.timeinforce(tif);
2416 }
2417
2418 if let Some(expire) = expiretm {
2419 builder.expiretm(expire);
2420 }
2421
2422 if let Some(dq) = display_qty {
2423 builder.displayvol(dq.to_string());
2424 }
2425
2426 if let Some(ac) = asset_class {
2427 builder.asset_class(ac);
2428 }
2429
2430 builder
2431 .build()
2432 .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))
2433 }
2434}
2435
2436fn collect_spot_statuses(
2437 asset_pairs: &AssetPairsResponse,
2438) -> AHashMap<InstrumentId, MarketStatusAction> {
2439 asset_pairs
2440 .iter()
2441 .map(|(_, definition)| {
2442 let symbol_str = definition.wsname.as_ref().unwrap_or(&definition.altname);
2443 let normalized_symbol = normalize_spot_symbol(symbol_str.as_str());
2444 let instrument_id = InstrumentId::new(Symbol::new(&normalized_symbol), *KRAKEN_VENUE);
2445 let action = definition
2446 .status
2447 .map_or(MarketStatusAction::Trading, MarketStatusAction::from);
2448
2449 (instrument_id, action)
2450 })
2451 .collect()
2452}
2453
#[cfg(test)]
mod tests {
    use nautilus_model::instruments::CurrencyPair;
    use rstest::rstest;

    use super::*;

    /// Caches an `XBT/USD.KRAKEN` currency pair (raw symbol `XBTUSD`) on `client` and
    /// returns its instrument ID for use by the order-parameter tests.
    fn cache_test_spot_instrument(client: &KrakenSpotHttpClient) -> InstrumentId {
        let instrument_id = InstrumentId::from("XBT/USD.KRAKEN");
        let pair = CurrencyPair::new(
            instrument_id,
            Symbol::new("XBTUSD"),
            Currency::BTC(),
            Currency::USD(),
            1,
            8,
            Price::from("0.1"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            0.into(),
            0.into(),
        );

        client.cache_instrument(InstrumentAny::CurrencyPair(pair));
        instrument_id
    }

    /// A default raw client is created without a credential.
    #[rstest]
    fn test_raw_client_creation() {
        let raw_client = KrakenSpotRawHttpClient::default();

        assert!(raw_client.credential.is_none());
    }

    /// A raw client built with a key and secret holds a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let api_key = String::from("test_key");
        let api_secret = String::from("test_secret");

        let raw_client = KrakenSpotRawHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            60,
            None,
            None,
            None,
            None,
            KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
        )
        .unwrap();

        assert!(raw_client.credential.is_some());
    }

    /// A default domain client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http_client = KrakenSpotHttpClient::default();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// Building a domain client with credentials does not populate the instruments cache.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = String::from("test_key");
        let api_secret = String::from("test_secret");

        let http_client = KrakenSpotHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            60,
            None,
            None,
            None,
            None,
            KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND,
        )
        .unwrap();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// Three consecutive nonces must each be strictly greater than the previous one.
    #[rstest]
    fn test_nonce_generation_strictly_increasing() {
        let client = KrakenSpotRawHttpClient::default();

        // Array elements are evaluated left to right, so the nonces are generated in order.
        let [nonce1, nonce2, nonce3] = [
            client.generate_nonce(),
            client.generate_nonce(),
            client.generate_nonce(),
        ];

        assert!(
            nonce1 < nonce2,
            "nonce2 ({nonce2}) should be > nonce1 ({nonce1})"
        );
        assert!(
            nonce2 < nonce3,
            "nonce3 ({nonce3}) should be > nonce2 ({nonce2})"
        );
    }

    /// The nonce must exceed a lower bound that only a nanosecond-scale timestamp reaches.
    #[rstest]
    fn test_nonce_is_nanosecond_timestamp() {
        let client = KrakenSpotRawHttpClient::default();
        // 1.5e18 ns after the Unix epoch falls in mid-2017.
        let lower_bound = 1_500_000_000_000_000_000;

        let nonce = client.generate_nonce();

        assert!(lower_bound < nonce, "Nonce should be nanosecond timestamp");
    }

    /// Checks the `(timeinforce, expiretm)` pair returned for each supported combination:
    /// GTC yields neither field, IOC/FOK/GTD on limit orders yield their label, GTD also
    /// yields the expiry in whole seconds, and the non-limit cases yield neither field.
    #[rstest]
    #[case::gtc_limit(true, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_limit(true, TimeInForce::Ioc, None, Some("IOC"), None)]
    #[case::fok_limit(true, TimeInForce::Fok, None, Some("FOK"), None)]
    #[case::gtd_limit_with_expire(
        true,
        TimeInForce::Gtd,
        Some(1_704_067_200_000_000_000u64),
        Some("GTD"),
        Some("1704067200")
    )]
    #[case::gtc_market(false, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_market(false, TimeInForce::Ioc, None, None, None)]
    fn test_compute_time_in_force_success(
        #[case] is_limit: bool,
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_tif: Option<&str>,
        #[case] expected_expire: Option<&str>,
    ) {
        let (timeinforce, expiretm) =
            compute_time_in_force(is_limit, tif, expire_nanos.map(UnixNanos::from)).unwrap();

        assert_eq!(timeinforce.as_deref(), expected_tif);
        assert_eq!(expiretm.as_deref(), expected_expire);
    }

    /// GTD on a limit order without an expiry must fail with an error naming `expire_time`.
    #[rstest]
    #[case::gtd_missing_expire(TimeInForce::Gtd, None, "expire_time")]
    fn test_compute_time_in_force_errors(
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_error: &str,
    ) {
        let outcome = compute_time_in_force(true, tif, expire_nanos.map(UnixNanos::from));

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains(expected_error));
    }

    /// A stop-market order with an index-price trigger sets `trigger` to `"index"` and
    /// sends the trigger price as `price`.
    #[rstest]
    fn test_build_add_order_params_sets_index_trigger_for_conditional_orders() {
        let client = KrakenSpotHttpClient::default();
        let instrument_id = cache_test_spot_instrument(&client);
        let trigger_price = Some(Price::from("50000"));

        let params = client
            .build_add_order_params(
                instrument_id,
                ClientOrderId::new("spot-trigger-index"),
                OrderSide::Buy,
                OrderType::StopMarket,
                Quantity::from("0.01"),
                TimeInForce::Gtc,
                None,
                None,
                trigger_price,
                Some(TriggerType::IndexPrice),
                None,
                None,
                false,
                false,
                false,
                None,
            )
            .unwrap();

        assert_eq!(params.trigger.as_deref(), Some("index"));
        assert_eq!(params.price.as_deref(), Some("50000"));
    }

    /// A trailing-stop-limit order sends the trailing offset as `price` and the limit
    /// offset as `price2`, leaves `trigger` unset for a last-price trigger, and forwards
    /// the display quantity as `displayvol`.
    #[rstest]
    fn test_build_add_order_params_sets_trailing_offsets() {
        let client = KrakenSpotHttpClient::default();
        let instrument_id = cache_test_spot_instrument(&client);
        let trailing_offset = Some(Decimal::from(50));
        let limit_offset = Some(Decimal::from(25));
        let display_qty = Some(Quantity::from("0.005"));

        let params = client
            .build_add_order_params(
                instrument_id,
                ClientOrderId::new("spot-trailing"),
                OrderSide::Sell,
                OrderType::TrailingStopLimit,
                Quantity::from("0.01"),
                TimeInForce::Gtc,
                None,
                Some(Price::from("49900")),
                None,
                Some(TriggerType::LastPrice),
                trailing_offset,
                limit_offset,
                false,
                false,
                false,
                display_qty,
            )
            .unwrap();

        assert_eq!(params.price.as_deref(), Some("50"));
        assert_eq!(params.price2.as_deref(), Some("25"));
        assert_eq!(params.trigger.as_deref(), None);
        assert_eq!(params.displayvol.as_deref(), Some("0.005"));
    }

    /// A mark-price trigger on a stop-market order is rejected with an
    /// "Unsupported trigger type" error.
    #[rstest]
    fn test_build_add_order_params_rejects_unsupported_trigger_type() {
        let client = KrakenSpotHttpClient::default();
        let instrument_id = cache_test_spot_instrument(&client);
        let trigger_price = Some(Price::from("50000"));

        let message = client
            .build_add_order_params(
                instrument_id,
                ClientOrderId::new("spot-trigger-invalid"),
                OrderSide::Buy,
                OrderType::StopMarket,
                Quantity::from("0.01"),
                TimeInForce::Gtc,
                None,
                None,
                trigger_price,
                Some(TriggerType::MarkPrice),
                None,
                None,
                false,
                false,
                false,
                None,
            )
            .unwrap_err()
            .to_string();

        assert!(message.contains("Unsupported trigger type for Kraken Spot"));
    }
}