Skip to main content

nautilus_kraken/execution/
spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Spot execution client implementation.
17
18use std::{
19    future::Future,
20    sync::{Arc, Mutex},
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use futures_util::StreamExt;
28use nautilus_common::{
29    clients::ExecutionClient,
30    live::{get_runtime, runner::get_exec_event_sender},
31    messages::execution::{
32        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
34        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
35    },
36};
37use nautilus_core::{
38    AtomicMap, MUTEX_POISONED, UnixNanos,
39    time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
42use nautilus_model::{
43    accounts::AccountAny,
44    enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce, TrailingOffsetType},
45    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue},
46    instruments::{Instrument, InstrumentAny},
47    orders::{Order, OrderAny},
48    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49    types::{AccountBalance, MarginBalance},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::{
56        consts::{KRAKEN_SPOT_POST_ONLY_ERROR, KRAKEN_VENUE},
57        parse::truncate_cl_ord_id,
58    },
59    config::KrakenExecClientConfig,
60    http::{KrakenSpotHttpClient, spot::client::KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND},
61    websocket::{
62        dispatch::{self, OrderIdentity, WsDispatchState},
63        spot_v2::{client::KrakenSpotWebSocketClient, messages::KrakenSpotWsMessage},
64    },
65};
66
67/// Kraken Spot execution client.
68///
69/// Provides order management and account operations for Kraken Spot markets.
70#[allow(dead_code)]
71#[derive(Debug)]
72pub struct KrakenSpotExecutionClient {
73    core: ExecutionClientCore,
74    clock: &'static AtomicTime,
75    config: KrakenExecClientConfig,
76    emitter: ExecutionEventEmitter,
77    http: KrakenSpotHttpClient,
78    ws: KrakenSpotWebSocketClient,
79    cancellation_token: CancellationToken,
80    ws_stream_handle: Option<JoinHandle<()>>,
81    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
82    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
83    order_qty_cache: Arc<AtomicMap<String, f64>>,
84    truncated_id_map: Arc<AtomicMap<String, ClientOrderId>>,
85    ws_dispatch_state: Arc<WsDispatchState>,
86}
87
88impl KrakenSpotExecutionClient {
89    /// Creates a new [`KrakenSpotExecutionClient`].
90    pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
91        let clock = get_atomic_clock_realtime();
92        let emitter = ExecutionEventEmitter::new(
93            clock,
94            core.trader_id,
95            core.account_id,
96            AccountType::Cash,
97            None,
98        );
99
100        let cancellation_token = CancellationToken::new();
101
102        let http = KrakenSpotHttpClient::with_credentials(
103            config.api_key.clone(),
104            config.api_secret.clone(),
105            config.environment,
106            config.base_url.clone(),
107            config.timeout_secs,
108            None,
109            None,
110            None,
111            config.proxy_url.clone(),
112            config
113                .max_requests_per_second
114                .unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND),
115        )?;
116
117        let data_config = crate::config::KrakenDataClientConfig {
118            api_key: Some(config.api_key.clone()),
119            api_secret: Some(config.api_secret.clone()),
120            product_type: config.product_type,
121            environment: config.environment,
122            base_url: config.base_url.clone(),
123            ws_public_url: None,
124            ws_private_url: Some(config.ws_url()),
125            proxy_url: config.proxy_url.clone(),
126            timeout_secs: config.timeout_secs,
127            heartbeat_interval_secs: config.heartbeat_interval_secs,
128            max_requests_per_second: config.max_requests_per_second,
129            transport_backend: config.transport_backend,
130        };
131        let ws = KrakenSpotWebSocketClient::new(
132            data_config,
133            cancellation_token.clone(),
134            config.proxy_url.clone(),
135        );
136
137        Ok(Self {
138            core,
139            clock,
140            config,
141            emitter,
142            http,
143            ws,
144            cancellation_token,
145            ws_stream_handle: None,
146            pending_tasks: Mutex::new(Vec::new()),
147            instruments: Arc::new(AtomicMap::new()),
148            order_qty_cache: Arc::new(AtomicMap::new()),
149            truncated_id_map: Arc::new(AtomicMap::new()),
150            ws_dispatch_state: Arc::new(WsDispatchState::new()),
151        })
152    }
153
154    fn register_order_identity(&self, order: &OrderAny) {
155        // Quote-quantity orders submit a quote amount (e.g. 100 USD), but the
156        // venue reports fills in base units (e.g. 0.001 BTC). Registering the
157        // raw `order.quantity()` would make the cumulative-fill comparison in
158        // the fill-side dispatch mismatch base against quote, leaving the
159        // order "open" forever. These orders instead flow through the
160        // untracked path and the engine reconciles them from status reports.
161        if order.is_quote_quantity() {
162            return;
163        }
164        self.ws_dispatch_state.register_identity(
165            order.client_order_id(),
166            OrderIdentity {
167                strategy_id: order.strategy_id(),
168                instrument_id: order.instrument_id(),
169                order_side: order.order_side(),
170                order_type: order.order_type(),
171                quantity: order.quantity(),
172            },
173        );
174    }
175
176    /// Returns a reference to the clock.
177    #[must_use]
178    pub fn clock(&self) -> &'static AtomicTime {
179        self.clock
180    }
181
182    /// Returns a reference to the event emitter.
183    #[must_use]
184    pub fn emitter(&self) -> &ExecutionEventEmitter {
185        &self.emitter
186    }
187
188    fn spawn_task<F>(&self, description: &'static str, fut: F)
189    where
190        F: Future<Output = anyhow::Result<()>> + Send + 'static,
191    {
192        let runtime = get_runtime();
193        let handle = runtime.spawn(async move {
194            if let Err(e) = fut.await {
195                log::warn!("{description} failed: {e:?}");
196            }
197        });
198
199        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
200        tasks.retain(|handle| !handle.is_finished());
201        tasks.push(handle);
202    }
203
204    fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) {
205        if order.is_closed() {
206            log::warn!(
207                "Cannot submit closed order: client_order_id={}",
208                order.client_order_id()
209            );
210            return;
211        }
212
213        let order_type = order.order_type();
214        let time_in_force = order.time_in_force();
215
216        // FOK only supported for plain limit orders on Kraken Spot
217        if time_in_force == TimeInForce::Fok && order_type != OrderType::Limit {
218            self.emitter.emit_order_denied(
219                order,
220                "FOK time in force only supported for LIMIT orders on Kraken Spot",
221            );
222            return;
223        }
224
225        // Kraken only supports price-based trailing offsets
226        if matches!(
227            order_type,
228            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
229        ) && let Some(offset_type) = order.trailing_offset_type()
230            && offset_type != TrailingOffsetType::Price
231        {
232            self.emitter.emit_order_denied(
233                order,
234                &format!(
235                    "Kraken Spot only supports Price trailing offset type: received {offset_type:?}"
236                ),
237            );
238            return;
239        }
240
241        let account_id = self.core.account_id;
242        let client_order_id = order.client_order_id();
243        let strategy_id = order.strategy_id();
244        let instrument_id = order.instrument_id();
245        let order_side = order.order_side();
246        let quantity = order.quantity();
247        let expire_time = order.expire_time();
248        let price = order.price();
249        let trigger_price = order.trigger_price();
250        let trigger_type = order.trigger_type();
251        let trailing_offset = order.trailing_offset();
252        let limit_offset = order.limit_offset();
253        let is_reduce_only = order.is_reduce_only();
254        let is_post_only = order.is_post_only();
255        let is_quote_quantity = order.is_quote_quantity();
256        let display_qty = order.display_qty();
257
258        log::debug!("OrderSubmitted: client_order_id={client_order_id}");
259        self.register_order_identity(order);
260        self.emitter.emit_order_submitted(order);
261
262        let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
263
264        // Only cache base-denominated quantities; quote quantities
265        // are not valid for the WS order report fallback path
266        if !is_quote_quantity {
267            self.order_qty_cache
268                .insert(kraken_cl_ord_id.clone(), quantity.as_f64());
269        }
270
271        if kraken_cl_ord_id != client_order_id.as_str() {
272            self.truncated_id_map
273                .insert(kraken_cl_ord_id, client_order_id);
274        }
275
276        let http = self.http.clone();
277        let emitter = self.emitter.clone();
278        let clock = self.clock;
279        let dispatch_state = self.ws_dispatch_state.clone();
280
281        self.spawn_task(task_name, async move {
282            let result = http
283                .submit_order(
284                    account_id,
285                    instrument_id,
286                    client_order_id,
287                    order_side,
288                    order_type,
289                    quantity,
290                    time_in_force,
291                    expire_time,
292                    price,
293                    trigger_price,
294                    trigger_type,
295                    trailing_offset,
296                    limit_offset,
297                    is_reduce_only,
298                    is_post_only,
299                    is_quote_quantity,
300                    display_qty,
301                )
302                .await;
303
304            if let Err(e) = result {
305                let ts_event = clock.get_time_ns();
306                let error_msg = format!("{task_name} error: {e}");
307                let due_post_only = error_msg.contains("POST_ONLY_REJECTED")
308                    || error_msg.contains(KRAKEN_SPOT_POST_ONLY_ERROR);
309                // The order will never appear on the wire, so its dispatch
310                // identity has to be cleaned up here.
311                dispatch_state.cleanup_terminal(&client_order_id);
312                emitter.emit_order_rejected_event(
313                    strategy_id,
314                    instrument_id,
315                    client_order_id,
316                    &error_msg,
317                    ts_event,
318                    due_post_only,
319                );
320                return Ok(());
321            }
322
323            Ok(())
324        });
325    }
326
327    fn cancel_single_order(&self, cmd: &CancelOrder) {
328        let account_id = self.core.account_id;
329        let client_order_id = cmd.client_order_id;
330        let venue_order_id = cmd.venue_order_id;
331        let strategy_id = cmd.strategy_id;
332        let instrument_id = cmd.instrument_id;
333
334        log::info!(
335            "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
336        );
337
338        let http = self.http.clone();
339        let emitter = self.emitter.clone();
340        let clock = self.clock;
341
342        self.spawn_task("cancel_order", async move {
343            if let Err(e) = http
344                .cancel_order(
345                    account_id,
346                    instrument_id,
347                    Some(client_order_id),
348                    venue_order_id,
349                )
350                .await
351            {
352                let ts_event = clock.get_time_ns();
353                emitter.emit_order_cancel_rejected_event(
354                    strategy_id,
355                    instrument_id,
356                    client_order_id,
357                    venue_order_id,
358                    &format!("cancel-order error: {e}"),
359                    ts_event,
360                );
361                anyhow::bail!("Cancel order failed: {e}");
362            }
363            Ok(())
364        });
365    }
366
367    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
368        let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
369        let emitter = self.emitter.clone();
370        let instruments = self.instruments.clone();
371        let order_qty_cache = self.order_qty_cache.clone();
372        let truncated_id_map = self.truncated_id_map.clone();
373        let dispatch_state = self.ws_dispatch_state.clone();
374        let account_id = self.core.account_id;
375        let clock = self.clock;
376        let cancellation_token = self.cancellation_token.clone();
377
378        let handle = get_runtime().spawn(async move {
379            tokio::pin!(stream);
380
381            loop {
382                tokio::select! {
383                    () = cancellation_token.cancelled() => {
384                        log::debug!("Spot execution message handler cancelled");
385                        break;
386                    }
387                    msg = stream.next() => {
388                        match msg {
389                            Some(ws_msg) => {
390                                Self::handle_ws_message(
391                                    ws_msg,
392                                    &emitter,
393                                    &dispatch_state,
394                                    &instruments,
395                                    &order_qty_cache,
396                                    &truncated_id_map,
397                                    account_id,
398                                    clock,
399                                );
400                            }
401                            None => {
402                                log::debug!("Spot execution WebSocket stream ended");
403                                break;
404                            }
405                        }
406                    }
407                }
408            }
409        });
410
411        self.ws_stream_handle = Some(handle);
412        Ok(())
413    }
414
415    fn modify_single_order(&self, cmd: &ModifyOrder) {
416        let client_order_id = cmd.client_order_id;
417        let venue_order_id = cmd.venue_order_id;
418        let strategy_id = cmd.strategy_id;
419        let instrument_id = cmd.instrument_id;
420        let quantity = cmd.quantity;
421        let price = cmd.price;
422
423        log::info!(
424            "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
425        );
426
427        let http = self.http.clone();
428        let emitter = self.emitter.clone();
429        let clock = self.clock;
430
431        self.spawn_task("modify_order", async move {
432            if let Err(e) = http
433                .modify_order(
434                    instrument_id,
435                    Some(client_order_id),
436                    venue_order_id,
437                    quantity,
438                    price,
439                    None,
440                )
441                .await
442            {
443                let ts_event = clock.get_time_ns();
444                emitter.emit_order_modify_rejected_event(
445                    strategy_id,
446                    instrument_id,
447                    client_order_id,
448                    venue_order_id,
449                    &format!("modify-order error: {e}"),
450                    ts_event,
451                );
452                anyhow::bail!("Modify order failed: {e}");
453            }
454            Ok(())
455        });
456    }
457
458    /// Polls the cache until the account is registered or timeout is reached.
459    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
460        let account_id = self.core.account_id;
461
462        if self.core.cache().account(&account_id).is_some() {
463            log::info!("Account {account_id} registered");
464            return Ok(());
465        }
466
467        let start = Instant::now();
468        let timeout = Duration::from_secs_f64(timeout_secs);
469        let interval = Duration::from_millis(10);
470
471        loop {
472            tokio::time::sleep(interval).await;
473
474            if self.core.cache().account(&account_id).is_some() {
475                log::info!("Account {account_id} registered");
476                return Ok(());
477            }
478
479            if start.elapsed() >= timeout {
480                anyhow::bail!(
481                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
482                );
483            }
484        }
485    }
486
487    #[expect(clippy::too_many_arguments)]
488    fn handle_ws_message(
489        msg: KrakenSpotWsMessage,
490        emitter: &ExecutionEventEmitter,
491        dispatch_state: &Arc<WsDispatchState>,
492        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
493        order_qty_cache: &Arc<AtomicMap<String, f64>>,
494        truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
495        account_id: AccountId,
496        clock: &'static AtomicTime,
497    ) {
498        match msg {
499            KrakenSpotWsMessage::Execution(executions) => {
500                let ts_init = clock.get_time_ns();
501
502                for exec in &executions {
503                    dispatch::spot::execution(
504                        exec,
505                        dispatch_state,
506                        emitter,
507                        instruments,
508                        truncated_id_map,
509                        order_qty_cache,
510                        account_id,
511                        ts_init,
512                    );
513                }
514            }
515            KrakenSpotWsMessage::Reconnected => {
516                log::info!("Spot execution WebSocket reconnected");
517            }
518            KrakenSpotWsMessage::Ticker(_)
519            | KrakenSpotWsMessage::Trade(_)
520            | KrakenSpotWsMessage::Book { .. }
521            | KrakenSpotWsMessage::Ohlc(_) => {}
522        }
523    }
524}
525
526#[async_trait(?Send)]
527impl ExecutionClient for KrakenSpotExecutionClient {
528    fn is_connected(&self) -> bool {
529        self.core.is_connected()
530    }
531
532    fn client_id(&self) -> ClientId {
533        self.core.client_id
534    }
535
536    fn account_id(&self) -> AccountId {
537        self.core.account_id
538    }
539
540    fn venue(&self) -> Venue {
541        *KRAKEN_VENUE
542    }
543
544    fn oms_type(&self) -> OmsType {
545        self.core.oms_type
546    }
547
548    fn get_account(&self) -> Option<AccountAny> {
549        self.core.cache().account(&self.core.account_id).cloned()
550    }
551
552    fn generate_account_state(
553        &self,
554        balances: Vec<AccountBalance>,
555        margins: Vec<MarginBalance>,
556        reported: bool,
557        ts_event: UnixNanos,
558    ) -> anyhow::Result<()> {
559        self.emitter
560            .emit_account_state(balances, margins, reported, ts_event);
561        Ok(())
562    }
563
564    fn start(&mut self) -> anyhow::Result<()> {
565        if self.core.is_started() {
566            return Ok(());
567        }
568
569        self.emitter.set_sender(get_exec_event_sender());
570        self.core.set_started();
571
572        log::info!(
573            "Started: client_id={}, account_id={}, product_type=Spot, environment={:?}",
574            self.core.client_id,
575            self.core.account_id,
576            self.config.environment
577        );
578        Ok(())
579    }
580
581    fn stop(&mut self) -> anyhow::Result<()> {
582        if self.core.is_stopped() {
583            return Ok(());
584        }
585
586        self.cancellation_token.cancel();
587        self.core.set_stopped();
588        self.core.set_disconnected();
589        log::info!("Stopped: client_id={}", self.core.client_id);
590        Ok(())
591    }
592
593    async fn connect(&mut self) -> anyhow::Result<()> {
594        if self.core.is_connected() {
595            return Ok(());
596        }
597
598        if !self.core.instruments_initialized() {
599            let instruments = self
600                .http
601                .request_instruments(None)
602                .await
603                .context("Failed to load Kraken spot instruments")?;
604            log::info!("Loaded {} Spot instruments", instruments.len());
605            self.http.cache_instruments(&instruments);
606            self.core.set_instruments_initialized();
607        }
608
609        self.ws
610            .connect()
611            .await
612            .context("Failed to connect spot WebSocket")?;
613        self.ws
614            .wait_until_active(10.0)
615            .await
616            .context("Spot WebSocket failed to become active")?;
617
618        self.ws
619            .authenticate()
620            .await
621            .context("Failed to authenticate spot WebSocket")?;
622
623        // Request initial account state and await registration before spawning
624        // the message handler. Report events from execution snapshots conflict
625        // with ExecEngine borrows during startup, so account registration must
626        // complete first.
627        let account_state = self
628            .http
629            .request_account_state(self.core.account_id)
630            .await
631            .context("Failed to request Kraken account state")?;
632
633        if !account_state.balances.is_empty() {
634            log::info!(
635                "Received account state with {} balance(s)",
636                account_state.balances.len()
637            );
638        }
639
640        self.emitter.send_account_state(account_state);
641        self.await_account_registered(30.0).await?;
642
643        self.spawn_message_handler()?;
644
645        self.instruments.rcu(|m| {
646            for instrument in self.http.instruments_cache.load().values() {
647                m.insert(instrument.id(), instrument.clone());
648            }
649        });
650
651        self.ws
652            .subscribe_executions(false, false)
653            .await
654            .context("Failed to subscribe to executions")?;
655
656        log::info!("Spot WebSocket authenticated and subscribed to executions");
657
658        self.core.set_connected();
659        log::info!("Connected: client_id={}", self.core.client_id);
660        Ok(())
661    }
662
663    async fn disconnect(&mut self) -> anyhow::Result<()> {
664        if self.core.is_disconnected() {
665            return Ok(());
666        }
667
668        self.cancellation_token.cancel();
669
670        if let Some(handle) = self.ws_stream_handle.take() {
671            handle.abort();
672        }
673
674        let _ = self.ws.close().await;
675
676        self.cancellation_token = CancellationToken::new();
677        self.core.set_disconnected();
678        log::info!("Disconnected: client_id={}", self.core.client_id);
679        Ok(())
680    }
681
682    async fn generate_order_status_report(
683        &self,
684        cmd: &GenerateOrderStatusReport,
685    ) -> anyhow::Result<Option<OrderStatusReport>> {
686        log::debug!(
687            "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
688            cmd.venue_order_id,
689            cmd.client_order_id
690        );
691
692        let account_id = self.core.account_id;
693        let reports = self
694            .http
695            .request_order_status_reports(account_id, None, None, None, false)
696            .await?;
697
698        // Match by venue_order_id or client_order_id (comparing truncated form
699        // since Kraken stores the truncated cl_ord_id for long IDs)
700        Ok(reports.into_iter().find(|r| {
701            cmd.venue_order_id
702                .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
703                || cmd.client_order_id.is_some_and(|id| {
704                    r.client_order_id
705                        .as_ref()
706                        .is_some_and(|r_id| r_id.as_str() == truncate_cl_ord_id(&id))
707                })
708        }))
709    }
710
711    async fn generate_order_status_reports(
712        &self,
713        cmd: &GenerateOrderStatusReports,
714    ) -> anyhow::Result<Vec<OrderStatusReport>> {
715        log::debug!(
716            "Generating order status reports: instrument_id={:?}, open_only={}",
717            cmd.instrument_id,
718            cmd.open_only
719        );
720
721        let account_id = self.core.account_id;
722        let start = cmd.start.map(DateTime::<Utc>::from);
723        let end = cmd.end.map(DateTime::<Utc>::from);
724        self.http
725            .request_order_status_reports(account_id, cmd.instrument_id, start, end, cmd.open_only)
726            .await
727    }
728
729    async fn generate_fill_reports(
730        &self,
731        cmd: GenerateFillReports,
732    ) -> anyhow::Result<Vec<FillReport>> {
733        log::debug!(
734            "Generating fill reports: instrument_id={:?}",
735            cmd.instrument_id
736        );
737
738        let account_id = self.core.account_id;
739        let start = cmd.start.map(DateTime::<Utc>::from);
740        let end = cmd.end.map(DateTime::<Utc>::from);
741        self.http
742            .request_fill_reports(account_id, cmd.instrument_id, start, end)
743            .await
744    }
745
746    async fn generate_position_status_reports(
747        &self,
748        cmd: &GeneratePositionStatusReports,
749    ) -> anyhow::Result<Vec<PositionStatusReport>> {
750        log::debug!(
751            "Generating position status reports: instrument_id={:?}",
752            cmd.instrument_id
753        );
754
755        let account_id = self.core.account_id;
756        self.http
757            .request_position_status_reports(account_id, cmd.instrument_id)
758            .await
759    }
760
761    async fn generate_mass_status(
762        &self,
763        lookback_mins: Option<u64>,
764    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
765        log::debug!("Generating mass status: lookback_mins={lookback_mins:?}");
766
767        let start = lookback_mins.map(|mins| Utc::now() - Duration::from_secs(mins * 60));
768
769        let account_id = self.core.account_id;
770        let order_reports = self
771            .http
772            .request_order_status_reports(account_id, None, start, None, true)
773            .await?;
774        let fill_reports = self
775            .http
776            .request_fill_reports(account_id, None, start, None)
777            .await?;
778        let position_reports = self
779            .http
780            .request_position_status_reports(account_id, None)
781            .await?;
782
783        let mut mass_status = ExecutionMassStatus::new(
784            self.core.client_id,
785            self.core.account_id,
786            *KRAKEN_VENUE,
787            self.clock.get_time_ns(),
788            None,
789        );
790        mass_status.add_order_reports(order_reports);
791        mass_status.add_fill_reports(fill_reports);
792        mass_status.add_position_reports(position_reports);
793
794        Ok(Some(mass_status))
795    }
796
797    fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
798        log::debug!("Querying account: {cmd:?}");
799
800        let account_id = self.core.account_id;
801        let http = self.http.clone();
802        let emitter = self.emitter.clone();
803
804        self.spawn_task("query_account", async move {
805            let account_state = http.request_account_state(account_id).await?;
806            emitter.emit_account_state(
807                account_state.balances.clone(),
808                account_state.margins.clone(),
809                account_state.is_reported,
810                account_state.ts_event,
811            );
812            Ok(())
813        });
814
815        Ok(())
816    }
817
818    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
819        log::debug!("Querying order: {cmd:?}");
820
821        let venue_order_id = cmd
822            .venue_order_id
823            .context("venue_order_id required for query_order")?;
824        let account_id = self.core.account_id;
825        let http = self.http.clone();
826        let emitter = self.emitter.clone();
827
828        self.spawn_task("query_order", async move {
829            let reports = http
830                .request_order_status_reports(account_id, None, None, None, true)
831                .await
832                .context("Failed to query order")?;
833
834            if let Some(report) = reports
835                .into_iter()
836                .find(|r| r.venue_order_id == venue_order_id)
837            {
838                emitter.send_order_status_report(report);
839            }
840            Ok(())
841        });
842
843        Ok(())
844    }
845
846    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
847        let order = self
848            .core
849            .cache()
850            .order(&cmd.client_order_id)
851            .cloned()
852            .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
853        self.submit_single_order(&order, "submit_order");
854        Ok(())
855    }
856
857    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
858        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
859
860        log::info!(
861            "Submitting order list: order_list_id={}, count={}",
862            cmd.order_list.id,
863            orders.len()
864        );
865
866        let mut order_tuples = Vec::with_capacity(orders.len());
867        let mut order_meta = Vec::with_capacity(orders.len());
868
869        for order in &orders {
870            if order.is_closed() {
871                log::warn!(
872                    "Cannot submit closed order: client_order_id={}",
873                    order.client_order_id()
874                );
875                continue;
876            }
877
878            if order.time_in_force() == TimeInForce::Fok && order.order_type() != OrderType::Limit {
879                self.emitter.emit_order_denied(
880                    order,
881                    "FOK time in force only supported for LIMIT orders on Kraken Spot",
882                );
883                continue;
884            }
885
886            if matches!(
887                order.order_type(),
888                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
889            ) && let Some(offset_type) = order.trailing_offset_type()
890                && offset_type != TrailingOffsetType::Price
891            {
892                self.emitter.emit_order_denied(
893                    order,
894                    &format!(
895                        "Kraken Spot only supports Price trailing offset type: received {offset_type:?}"
896                    ),
897                );
898                continue;
899            }
900
901            let client_order_id = order.client_order_id();
902            let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
903
904            self.register_order_identity(order);
905            self.emitter.emit_order_submitted(order);
906
907            if !order.is_quote_quantity() {
908                self.order_qty_cache
909                    .insert(kraken_cl_ord_id.clone(), order.quantity().as_f64());
910            }
911
912            if kraken_cl_ord_id != client_order_id.as_str() {
913                self.truncated_id_map
914                    .insert(kraken_cl_ord_id, client_order_id);
915            }
916
917            order_tuples.push((
918                order.instrument_id(),
919                client_order_id,
920                order.order_side(),
921                order.order_type(),
922                order.quantity(),
923                order.time_in_force(),
924                order.expire_time(),
925                order.price(),
926                order.trigger_price(),
927                order.trigger_type(),
928                order.trailing_offset(),
929                order.limit_offset(),
930                order.is_reduce_only(),
931                order.is_post_only(),
932                order.is_quote_quantity(),
933                order.display_qty(),
934            ));
935
936            order_meta.push((order.strategy_id(), order.instrument_id(), client_order_id));
937        }
938
939        if order_tuples.is_empty() {
940            return Ok(());
941        }
942
943        let http = self.http.clone();
944        let emitter = self.emitter.clone();
945        let clock = self.clock;
946        let dispatch_state = self.ws_dispatch_state.clone();
947
948        self.spawn_task("submit_order_list", async move {
949            match http.submit_orders_batch(order_tuples).await {
950                Ok(statuses) => {
951                    // The HTTP helper returns one status per input tuple, including validation failures
952                    for (i, status) in statuses.iter().enumerate() {
953                        if status != "placed"
954                            && let Some((strategy_id, instrument_id, client_order_id)) =
955                                order_meta.get(i)
956                        {
957                            let ts_event = clock.get_time_ns();
958                            let due_post_only = status.contains("POST_ONLY_REJECTED")
959                                || status.contains(KRAKEN_SPOT_POST_ONLY_ERROR);
960                            dispatch_state.cleanup_terminal(client_order_id);
961                            emitter.emit_order_rejected_event(
962                                *strategy_id,
963                                *instrument_id,
964                                *client_order_id,
965                                &format!("submit_order_list batch item rejected: {status}"),
966                                ts_event,
967                                due_post_only,
968                            );
969                        }
970                    }
971                    Ok(())
972                }
973                Err(e) => {
974                    let ts_event = clock.get_time_ns();
975                    let error_msg = format!("submit_order_list batch error: {e}");
976
977                    for (strategy_id, instrument_id, client_order_id) in &order_meta {
978                        dispatch_state.cleanup_terminal(client_order_id);
979                        emitter.emit_order_rejected_event(
980                            *strategy_id,
981                            *instrument_id,
982                            *client_order_id,
983                            &error_msg,
984                            ts_event,
985                            false,
986                        );
987                    }
988                    Ok(())
989                }
990            }
991        });
992
993        Ok(())
994    }
995
996    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
997        self.modify_single_order(&cmd);
998        Ok(())
999    }
1000
1001    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1002        self.cancel_single_order(&cmd);
1003        Ok(())
1004    }
1005
1006    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1007        let instrument_id = cmd.instrument_id;
1008
1009        if cmd.order_side == OrderSide::NoOrderSide {
1010            log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
1011
1012            let http = self.http.clone();
1013
1014            self.spawn_task("cancel_all_orders", async move {
1015                if let Err(e) = http.inner.cancel_all_orders().await {
1016                    anyhow::bail!("Cancel all orders failed: {e}");
1017                }
1018                Ok(())
1019            });
1020
1021            return Ok(());
1022        }
1023
1024        log::info!(
1025            "Canceling all orders: instrument_id={instrument_id}, side={:?}",
1026            cmd.order_side
1027        );
1028
1029        let orders_to_cancel: Vec<_> = {
1030            let cache = self.core.cache();
1031            let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
1032
1033            open_orders
1034                .into_iter()
1035                .filter(|order| order.order_side() == cmd.order_side)
1036                .filter_map(|order| {
1037                    Some((
1038                        order.venue_order_id()?,
1039                        order.client_order_id(),
1040                        order.instrument_id(),
1041                        order.strategy_id(),
1042                    ))
1043                })
1044                .collect()
1045        };
1046
1047        let account_id = self.core.account_id;
1048
1049        for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
1050        {
1051            let http = self.http.clone();
1052            let emitter = self.emitter.clone();
1053            let clock = self.clock;
1054
1055            self.spawn_task("cancel_order_by_side", async move {
1056                if let Err(e) = http
1057                    .cancel_order(
1058                        account_id,
1059                        order_instrument_id,
1060                        Some(client_order_id),
1061                        Some(venue_order_id),
1062                    )
1063                    .await
1064                {
1065                    log::error!("Cancel order failed: {e}");
1066                    let ts_event = clock.get_time_ns();
1067                    emitter.emit_order_cancel_rejected_event(
1068                        strategy_id,
1069                        order_instrument_id,
1070                        client_order_id,
1071                        Some(venue_order_id),
1072                        &format!("cancel-order error: {e}"),
1073                        ts_event,
1074                    );
1075                }
1076                Ok(())
1077            });
1078        }
1079
1080        Ok(())
1081    }
1082
1083    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1084        log::info!(
1085            "Batch canceling orders: instrument_id={}, count={}",
1086            cmd.instrument_id,
1087            cmd.cancels.len()
1088        );
1089
1090        for cancel in &cmd.cancels {
1091            self.cancel_single_order(cancel);
1092        }
1093
1094        Ok(())
1095    }
1096}