1use std::{
19 future::Future,
20 sync::{Arc, Mutex},
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use futures_util::StreamExt;
28use nautilus_common::{
29 clients::ExecutionClient,
30 live::{get_runtime, runner::get_exec_event_sender},
31 messages::execution::{
32 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
33 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
34 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
35 },
36};
37use nautilus_core::{
38 AtomicMap, MUTEX_POISONED, UnixNanos,
39 time::{AtomicTime, get_atomic_clock_realtime},
40};
41use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
42use nautilus_model::{
43 accounts::AccountAny,
44 enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce, TrailingOffsetType},
45 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue},
46 instruments::{Instrument, InstrumentAny},
47 orders::{Order, OrderAny},
48 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
49 types::{AccountBalance, MarginBalance},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 common::{
56 consts::{KRAKEN_SPOT_POST_ONLY_ERROR, KRAKEN_VENUE},
57 parse::truncate_cl_ord_id,
58 },
59 config::KrakenExecClientConfig,
60 http::{KrakenSpotHttpClient, spot::client::KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND},
61 websocket::{
62 dispatch::{self, OrderIdentity, WsDispatchState},
63 spot_v2::{client::KrakenSpotWebSocketClient, messages::KrakenSpotWsMessage},
64 },
65};
66
67#[allow(dead_code)]
71#[derive(Debug)]
72pub struct KrakenSpotExecutionClient {
73 core: ExecutionClientCore,
74 clock: &'static AtomicTime,
75 config: KrakenExecClientConfig,
76 emitter: ExecutionEventEmitter,
77 http: KrakenSpotHttpClient,
78 ws: KrakenSpotWebSocketClient,
79 cancellation_token: CancellationToken,
80 ws_stream_handle: Option<JoinHandle<()>>,
81 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
82 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
83 order_qty_cache: Arc<AtomicMap<String, f64>>,
84 truncated_id_map: Arc<AtomicMap<String, ClientOrderId>>,
85 ws_dispatch_state: Arc<WsDispatchState>,
86}
87
88impl KrakenSpotExecutionClient {
89 pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
91 let clock = get_atomic_clock_realtime();
92 let emitter = ExecutionEventEmitter::new(
93 clock,
94 core.trader_id,
95 core.account_id,
96 AccountType::Cash,
97 None,
98 );
99
100 let cancellation_token = CancellationToken::new();
101
102 let http = KrakenSpotHttpClient::with_credentials(
103 config.api_key.clone(),
104 config.api_secret.clone(),
105 config.environment,
106 config.base_url.clone(),
107 config.timeout_secs,
108 None,
109 None,
110 None,
111 config.proxy_url.clone(),
112 config
113 .max_requests_per_second
114 .unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND),
115 )?;
116
117 let data_config = crate::config::KrakenDataClientConfig {
118 api_key: Some(config.api_key.clone()),
119 api_secret: Some(config.api_secret.clone()),
120 product_type: config.product_type,
121 environment: config.environment,
122 base_url: config.base_url.clone(),
123 ws_public_url: None,
124 ws_private_url: Some(config.ws_url()),
125 proxy_url: config.proxy_url.clone(),
126 timeout_secs: config.timeout_secs,
127 heartbeat_interval_secs: config.heartbeat_interval_secs,
128 max_requests_per_second: config.max_requests_per_second,
129 transport_backend: config.transport_backend,
130 };
131 let ws = KrakenSpotWebSocketClient::new(
132 data_config,
133 cancellation_token.clone(),
134 config.proxy_url.clone(),
135 );
136
137 Ok(Self {
138 core,
139 clock,
140 config,
141 emitter,
142 http,
143 ws,
144 cancellation_token,
145 ws_stream_handle: None,
146 pending_tasks: Mutex::new(Vec::new()),
147 instruments: Arc::new(AtomicMap::new()),
148 order_qty_cache: Arc::new(AtomicMap::new()),
149 truncated_id_map: Arc::new(AtomicMap::new()),
150 ws_dispatch_state: Arc::new(WsDispatchState::new()),
151 })
152 }
153
154 fn register_order_identity(&self, order: &OrderAny) {
155 if order.is_quote_quantity() {
162 return;
163 }
164 self.ws_dispatch_state.register_identity(
165 order.client_order_id(),
166 OrderIdentity {
167 strategy_id: order.strategy_id(),
168 instrument_id: order.instrument_id(),
169 order_side: order.order_side(),
170 order_type: order.order_type(),
171 quantity: order.quantity(),
172 },
173 );
174 }
175
176 #[must_use]
178 pub fn clock(&self) -> &'static AtomicTime {
179 self.clock
180 }
181
182 #[must_use]
184 pub fn emitter(&self) -> &ExecutionEventEmitter {
185 &self.emitter
186 }
187
188 fn spawn_task<F>(&self, description: &'static str, fut: F)
189 where
190 F: Future<Output = anyhow::Result<()>> + Send + 'static,
191 {
192 let runtime = get_runtime();
193 let handle = runtime.spawn(async move {
194 if let Err(e) = fut.await {
195 log::warn!("{description} failed: {e:?}");
196 }
197 });
198
199 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
200 tasks.retain(|handle| !handle.is_finished());
201 tasks.push(handle);
202 }
203
204 fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) {
205 if order.is_closed() {
206 log::warn!(
207 "Cannot submit closed order: client_order_id={}",
208 order.client_order_id()
209 );
210 return;
211 }
212
213 let order_type = order.order_type();
214 let time_in_force = order.time_in_force();
215
216 if time_in_force == TimeInForce::Fok && order_type != OrderType::Limit {
218 self.emitter.emit_order_denied(
219 order,
220 "FOK time in force only supported for LIMIT orders on Kraken Spot",
221 );
222 return;
223 }
224
225 if matches!(
227 order_type,
228 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
229 ) && let Some(offset_type) = order.trailing_offset_type()
230 && offset_type != TrailingOffsetType::Price
231 {
232 self.emitter.emit_order_denied(
233 order,
234 &format!(
235 "Kraken Spot only supports Price trailing offset type: received {offset_type:?}"
236 ),
237 );
238 return;
239 }
240
241 let account_id = self.core.account_id;
242 let client_order_id = order.client_order_id();
243 let strategy_id = order.strategy_id();
244 let instrument_id = order.instrument_id();
245 let order_side = order.order_side();
246 let quantity = order.quantity();
247 let expire_time = order.expire_time();
248 let price = order.price();
249 let trigger_price = order.trigger_price();
250 let trigger_type = order.trigger_type();
251 let trailing_offset = order.trailing_offset();
252 let limit_offset = order.limit_offset();
253 let is_reduce_only = order.is_reduce_only();
254 let is_post_only = order.is_post_only();
255 let is_quote_quantity = order.is_quote_quantity();
256 let display_qty = order.display_qty();
257
258 log::debug!("OrderSubmitted: client_order_id={client_order_id}");
259 self.register_order_identity(order);
260 self.emitter.emit_order_submitted(order);
261
262 let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
263
264 if !is_quote_quantity {
267 self.order_qty_cache
268 .insert(kraken_cl_ord_id.clone(), quantity.as_f64());
269 }
270
271 if kraken_cl_ord_id != client_order_id.as_str() {
272 self.truncated_id_map
273 .insert(kraken_cl_ord_id, client_order_id);
274 }
275
276 let http = self.http.clone();
277 let emitter = self.emitter.clone();
278 let clock = self.clock;
279 let dispatch_state = self.ws_dispatch_state.clone();
280
281 self.spawn_task(task_name, async move {
282 let result = http
283 .submit_order(
284 account_id,
285 instrument_id,
286 client_order_id,
287 order_side,
288 order_type,
289 quantity,
290 time_in_force,
291 expire_time,
292 price,
293 trigger_price,
294 trigger_type,
295 trailing_offset,
296 limit_offset,
297 is_reduce_only,
298 is_post_only,
299 is_quote_quantity,
300 display_qty,
301 )
302 .await;
303
304 if let Err(e) = result {
305 let ts_event = clock.get_time_ns();
306 let error_msg = format!("{task_name} error: {e}");
307 let due_post_only = error_msg.contains("POST_ONLY_REJECTED")
308 || error_msg.contains(KRAKEN_SPOT_POST_ONLY_ERROR);
309 dispatch_state.cleanup_terminal(&client_order_id);
312 emitter.emit_order_rejected_event(
313 strategy_id,
314 instrument_id,
315 client_order_id,
316 &error_msg,
317 ts_event,
318 due_post_only,
319 );
320 return Ok(());
321 }
322
323 Ok(())
324 });
325 }
326
327 fn cancel_single_order(&self, cmd: &CancelOrder) {
328 let account_id = self.core.account_id;
329 let client_order_id = cmd.client_order_id;
330 let venue_order_id = cmd.venue_order_id;
331 let strategy_id = cmd.strategy_id;
332 let instrument_id = cmd.instrument_id;
333
334 log::info!(
335 "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
336 );
337
338 let http = self.http.clone();
339 let emitter = self.emitter.clone();
340 let clock = self.clock;
341
342 self.spawn_task("cancel_order", async move {
343 if let Err(e) = http
344 .cancel_order(
345 account_id,
346 instrument_id,
347 Some(client_order_id),
348 venue_order_id,
349 )
350 .await
351 {
352 let ts_event = clock.get_time_ns();
353 emitter.emit_order_cancel_rejected_event(
354 strategy_id,
355 instrument_id,
356 client_order_id,
357 venue_order_id,
358 &format!("cancel-order error: {e}"),
359 ts_event,
360 );
361 anyhow::bail!("Cancel order failed: {e}");
362 }
363 Ok(())
364 });
365 }
366
367 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
368 let stream = self.ws.stream().map_err(|e| anyhow::anyhow!("{e}"))?;
369 let emitter = self.emitter.clone();
370 let instruments = self.instruments.clone();
371 let order_qty_cache = self.order_qty_cache.clone();
372 let truncated_id_map = self.truncated_id_map.clone();
373 let dispatch_state = self.ws_dispatch_state.clone();
374 let account_id = self.core.account_id;
375 let clock = self.clock;
376 let cancellation_token = self.cancellation_token.clone();
377
378 let handle = get_runtime().spawn(async move {
379 tokio::pin!(stream);
380
381 loop {
382 tokio::select! {
383 () = cancellation_token.cancelled() => {
384 log::debug!("Spot execution message handler cancelled");
385 break;
386 }
387 msg = stream.next() => {
388 match msg {
389 Some(ws_msg) => {
390 Self::handle_ws_message(
391 ws_msg,
392 &emitter,
393 &dispatch_state,
394 &instruments,
395 &order_qty_cache,
396 &truncated_id_map,
397 account_id,
398 clock,
399 );
400 }
401 None => {
402 log::debug!("Spot execution WebSocket stream ended");
403 break;
404 }
405 }
406 }
407 }
408 }
409 });
410
411 self.ws_stream_handle = Some(handle);
412 Ok(())
413 }
414
415 fn modify_single_order(&self, cmd: &ModifyOrder) {
416 let client_order_id = cmd.client_order_id;
417 let venue_order_id = cmd.venue_order_id;
418 let strategy_id = cmd.strategy_id;
419 let instrument_id = cmd.instrument_id;
420 let quantity = cmd.quantity;
421 let price = cmd.price;
422
423 log::info!(
424 "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
425 );
426
427 let http = self.http.clone();
428 let emitter = self.emitter.clone();
429 let clock = self.clock;
430
431 self.spawn_task("modify_order", async move {
432 if let Err(e) = http
433 .modify_order(
434 instrument_id,
435 Some(client_order_id),
436 venue_order_id,
437 quantity,
438 price,
439 None,
440 )
441 .await
442 {
443 let ts_event = clock.get_time_ns();
444 emitter.emit_order_modify_rejected_event(
445 strategy_id,
446 instrument_id,
447 client_order_id,
448 venue_order_id,
449 &format!("modify-order error: {e}"),
450 ts_event,
451 );
452 anyhow::bail!("Modify order failed: {e}");
453 }
454 Ok(())
455 });
456 }
457
458 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
460 let account_id = self.core.account_id;
461
462 if self.core.cache().account(&account_id).is_some() {
463 log::info!("Account {account_id} registered");
464 return Ok(());
465 }
466
467 let start = Instant::now();
468 let timeout = Duration::from_secs_f64(timeout_secs);
469 let interval = Duration::from_millis(10);
470
471 loop {
472 tokio::time::sleep(interval).await;
473
474 if self.core.cache().account(&account_id).is_some() {
475 log::info!("Account {account_id} registered");
476 return Ok(());
477 }
478
479 if start.elapsed() >= timeout {
480 anyhow::bail!(
481 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
482 );
483 }
484 }
485 }
486
487 #[expect(clippy::too_many_arguments)]
488 fn handle_ws_message(
489 msg: KrakenSpotWsMessage,
490 emitter: &ExecutionEventEmitter,
491 dispatch_state: &Arc<WsDispatchState>,
492 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
493 order_qty_cache: &Arc<AtomicMap<String, f64>>,
494 truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
495 account_id: AccountId,
496 clock: &'static AtomicTime,
497 ) {
498 match msg {
499 KrakenSpotWsMessage::Execution(executions) => {
500 let ts_init = clock.get_time_ns();
501
502 for exec in &executions {
503 dispatch::spot::execution(
504 exec,
505 dispatch_state,
506 emitter,
507 instruments,
508 truncated_id_map,
509 order_qty_cache,
510 account_id,
511 ts_init,
512 );
513 }
514 }
515 KrakenSpotWsMessage::Reconnected => {
516 log::info!("Spot execution WebSocket reconnected");
517 }
518 KrakenSpotWsMessage::Ticker(_)
519 | KrakenSpotWsMessage::Trade(_)
520 | KrakenSpotWsMessage::Book { .. }
521 | KrakenSpotWsMessage::Ohlc(_) => {}
522 }
523 }
524}
525
526#[async_trait(?Send)]
527impl ExecutionClient for KrakenSpotExecutionClient {
528 fn is_connected(&self) -> bool {
529 self.core.is_connected()
530 }
531
532 fn client_id(&self) -> ClientId {
533 self.core.client_id
534 }
535
536 fn account_id(&self) -> AccountId {
537 self.core.account_id
538 }
539
540 fn venue(&self) -> Venue {
541 *KRAKEN_VENUE
542 }
543
544 fn oms_type(&self) -> OmsType {
545 self.core.oms_type
546 }
547
548 fn get_account(&self) -> Option<AccountAny> {
549 self.core.cache().account(&self.core.account_id).cloned()
550 }
551
552 fn generate_account_state(
553 &self,
554 balances: Vec<AccountBalance>,
555 margins: Vec<MarginBalance>,
556 reported: bool,
557 ts_event: UnixNanos,
558 ) -> anyhow::Result<()> {
559 self.emitter
560 .emit_account_state(balances, margins, reported, ts_event);
561 Ok(())
562 }
563
564 fn start(&mut self) -> anyhow::Result<()> {
565 if self.core.is_started() {
566 return Ok(());
567 }
568
569 self.emitter.set_sender(get_exec_event_sender());
570 self.core.set_started();
571
572 log::info!(
573 "Started: client_id={}, account_id={}, product_type=Spot, environment={:?}",
574 self.core.client_id,
575 self.core.account_id,
576 self.config.environment
577 );
578 Ok(())
579 }
580
581 fn stop(&mut self) -> anyhow::Result<()> {
582 if self.core.is_stopped() {
583 return Ok(());
584 }
585
586 self.cancellation_token.cancel();
587 self.core.set_stopped();
588 self.core.set_disconnected();
589 log::info!("Stopped: client_id={}", self.core.client_id);
590 Ok(())
591 }
592
593 async fn connect(&mut self) -> anyhow::Result<()> {
594 if self.core.is_connected() {
595 return Ok(());
596 }
597
598 if !self.core.instruments_initialized() {
599 let instruments = self
600 .http
601 .request_instruments(None)
602 .await
603 .context("Failed to load Kraken spot instruments")?;
604 log::info!("Loaded {} Spot instruments", instruments.len());
605 self.http.cache_instruments(&instruments);
606 self.core.set_instruments_initialized();
607 }
608
609 self.ws
610 .connect()
611 .await
612 .context("Failed to connect spot WebSocket")?;
613 self.ws
614 .wait_until_active(10.0)
615 .await
616 .context("Spot WebSocket failed to become active")?;
617
618 self.ws
619 .authenticate()
620 .await
621 .context("Failed to authenticate spot WebSocket")?;
622
623 let account_state = self
628 .http
629 .request_account_state(self.core.account_id)
630 .await
631 .context("Failed to request Kraken account state")?;
632
633 if !account_state.balances.is_empty() {
634 log::info!(
635 "Received account state with {} balance(s)",
636 account_state.balances.len()
637 );
638 }
639
640 self.emitter.send_account_state(account_state);
641 self.await_account_registered(30.0).await?;
642
643 self.spawn_message_handler()?;
644
645 self.instruments.rcu(|m| {
646 for instrument in self.http.instruments_cache.load().values() {
647 m.insert(instrument.id(), instrument.clone());
648 }
649 });
650
651 self.ws
652 .subscribe_executions(false, false)
653 .await
654 .context("Failed to subscribe to executions")?;
655
656 log::info!("Spot WebSocket authenticated and subscribed to executions");
657
658 self.core.set_connected();
659 log::info!("Connected: client_id={}", self.core.client_id);
660 Ok(())
661 }
662
663 async fn disconnect(&mut self) -> anyhow::Result<()> {
664 if self.core.is_disconnected() {
665 return Ok(());
666 }
667
668 self.cancellation_token.cancel();
669
670 if let Some(handle) = self.ws_stream_handle.take() {
671 handle.abort();
672 }
673
674 let _ = self.ws.close().await;
675
676 self.cancellation_token = CancellationToken::new();
677 self.core.set_disconnected();
678 log::info!("Disconnected: client_id={}", self.core.client_id);
679 Ok(())
680 }
681
682 async fn generate_order_status_report(
683 &self,
684 cmd: &GenerateOrderStatusReport,
685 ) -> anyhow::Result<Option<OrderStatusReport>> {
686 log::debug!(
687 "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
688 cmd.venue_order_id,
689 cmd.client_order_id
690 );
691
692 let account_id = self.core.account_id;
693 let reports = self
694 .http
695 .request_order_status_reports(account_id, None, None, None, false)
696 .await?;
697
698 Ok(reports.into_iter().find(|r| {
701 cmd.venue_order_id
702 .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
703 || cmd.client_order_id.is_some_and(|id| {
704 r.client_order_id
705 .as_ref()
706 .is_some_and(|r_id| r_id.as_str() == truncate_cl_ord_id(&id))
707 })
708 }))
709 }
710
711 async fn generate_order_status_reports(
712 &self,
713 cmd: &GenerateOrderStatusReports,
714 ) -> anyhow::Result<Vec<OrderStatusReport>> {
715 log::debug!(
716 "Generating order status reports: instrument_id={:?}, open_only={}",
717 cmd.instrument_id,
718 cmd.open_only
719 );
720
721 let account_id = self.core.account_id;
722 let start = cmd.start.map(DateTime::<Utc>::from);
723 let end = cmd.end.map(DateTime::<Utc>::from);
724 self.http
725 .request_order_status_reports(account_id, cmd.instrument_id, start, end, cmd.open_only)
726 .await
727 }
728
729 async fn generate_fill_reports(
730 &self,
731 cmd: GenerateFillReports,
732 ) -> anyhow::Result<Vec<FillReport>> {
733 log::debug!(
734 "Generating fill reports: instrument_id={:?}",
735 cmd.instrument_id
736 );
737
738 let account_id = self.core.account_id;
739 let start = cmd.start.map(DateTime::<Utc>::from);
740 let end = cmd.end.map(DateTime::<Utc>::from);
741 self.http
742 .request_fill_reports(account_id, cmd.instrument_id, start, end)
743 .await
744 }
745
746 async fn generate_position_status_reports(
747 &self,
748 cmd: &GeneratePositionStatusReports,
749 ) -> anyhow::Result<Vec<PositionStatusReport>> {
750 log::debug!(
751 "Generating position status reports: instrument_id={:?}",
752 cmd.instrument_id
753 );
754
755 let account_id = self.core.account_id;
756 self.http
757 .request_position_status_reports(account_id, cmd.instrument_id)
758 .await
759 }
760
761 async fn generate_mass_status(
762 &self,
763 lookback_mins: Option<u64>,
764 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
765 log::debug!("Generating mass status: lookback_mins={lookback_mins:?}");
766
767 let start = lookback_mins.map(|mins| Utc::now() - Duration::from_secs(mins * 60));
768
769 let account_id = self.core.account_id;
770 let order_reports = self
771 .http
772 .request_order_status_reports(account_id, None, start, None, true)
773 .await?;
774 let fill_reports = self
775 .http
776 .request_fill_reports(account_id, None, start, None)
777 .await?;
778 let position_reports = self
779 .http
780 .request_position_status_reports(account_id, None)
781 .await?;
782
783 let mut mass_status = ExecutionMassStatus::new(
784 self.core.client_id,
785 self.core.account_id,
786 *KRAKEN_VENUE,
787 self.clock.get_time_ns(),
788 None,
789 );
790 mass_status.add_order_reports(order_reports);
791 mass_status.add_fill_reports(fill_reports);
792 mass_status.add_position_reports(position_reports);
793
794 Ok(Some(mass_status))
795 }
796
797 fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
798 log::debug!("Querying account: {cmd:?}");
799
800 let account_id = self.core.account_id;
801 let http = self.http.clone();
802 let emitter = self.emitter.clone();
803
804 self.spawn_task("query_account", async move {
805 let account_state = http.request_account_state(account_id).await?;
806 emitter.emit_account_state(
807 account_state.balances.clone(),
808 account_state.margins.clone(),
809 account_state.is_reported,
810 account_state.ts_event,
811 );
812 Ok(())
813 });
814
815 Ok(())
816 }
817
818 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
819 log::debug!("Querying order: {cmd:?}");
820
821 let venue_order_id = cmd
822 .venue_order_id
823 .context("venue_order_id required for query_order")?;
824 let account_id = self.core.account_id;
825 let http = self.http.clone();
826 let emitter = self.emitter.clone();
827
828 self.spawn_task("query_order", async move {
829 let reports = http
830 .request_order_status_reports(account_id, None, None, None, true)
831 .await
832 .context("Failed to query order")?;
833
834 if let Some(report) = reports
835 .into_iter()
836 .find(|r| r.venue_order_id == venue_order_id)
837 {
838 emitter.send_order_status_report(report);
839 }
840 Ok(())
841 });
842
843 Ok(())
844 }
845
846 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
847 let order = self
848 .core
849 .cache()
850 .order(&cmd.client_order_id)
851 .cloned()
852 .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
853 self.submit_single_order(&order, "submit_order");
854 Ok(())
855 }
856
857 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
858 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
859
860 log::info!(
861 "Submitting order list: order_list_id={}, count={}",
862 cmd.order_list.id,
863 orders.len()
864 );
865
866 let mut order_tuples = Vec::with_capacity(orders.len());
867 let mut order_meta = Vec::with_capacity(orders.len());
868
869 for order in &orders {
870 if order.is_closed() {
871 log::warn!(
872 "Cannot submit closed order: client_order_id={}",
873 order.client_order_id()
874 );
875 continue;
876 }
877
878 if order.time_in_force() == TimeInForce::Fok && order.order_type() != OrderType::Limit {
879 self.emitter.emit_order_denied(
880 order,
881 "FOK time in force only supported for LIMIT orders on Kraken Spot",
882 );
883 continue;
884 }
885
886 if matches!(
887 order.order_type(),
888 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
889 ) && let Some(offset_type) = order.trailing_offset_type()
890 && offset_type != TrailingOffsetType::Price
891 {
892 self.emitter.emit_order_denied(
893 order,
894 &format!(
895 "Kraken Spot only supports Price trailing offset type: received {offset_type:?}"
896 ),
897 );
898 continue;
899 }
900
901 let client_order_id = order.client_order_id();
902 let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
903
904 self.register_order_identity(order);
905 self.emitter.emit_order_submitted(order);
906
907 if !order.is_quote_quantity() {
908 self.order_qty_cache
909 .insert(kraken_cl_ord_id.clone(), order.quantity().as_f64());
910 }
911
912 if kraken_cl_ord_id != client_order_id.as_str() {
913 self.truncated_id_map
914 .insert(kraken_cl_ord_id, client_order_id);
915 }
916
917 order_tuples.push((
918 order.instrument_id(),
919 client_order_id,
920 order.order_side(),
921 order.order_type(),
922 order.quantity(),
923 order.time_in_force(),
924 order.expire_time(),
925 order.price(),
926 order.trigger_price(),
927 order.trigger_type(),
928 order.trailing_offset(),
929 order.limit_offset(),
930 order.is_reduce_only(),
931 order.is_post_only(),
932 order.is_quote_quantity(),
933 order.display_qty(),
934 ));
935
936 order_meta.push((order.strategy_id(), order.instrument_id(), client_order_id));
937 }
938
939 if order_tuples.is_empty() {
940 return Ok(());
941 }
942
943 let http = self.http.clone();
944 let emitter = self.emitter.clone();
945 let clock = self.clock;
946 let dispatch_state = self.ws_dispatch_state.clone();
947
948 self.spawn_task("submit_order_list", async move {
949 match http.submit_orders_batch(order_tuples).await {
950 Ok(statuses) => {
951 for (i, status) in statuses.iter().enumerate() {
953 if status != "placed"
954 && let Some((strategy_id, instrument_id, client_order_id)) =
955 order_meta.get(i)
956 {
957 let ts_event = clock.get_time_ns();
958 let due_post_only = status.contains("POST_ONLY_REJECTED")
959 || status.contains(KRAKEN_SPOT_POST_ONLY_ERROR);
960 dispatch_state.cleanup_terminal(client_order_id);
961 emitter.emit_order_rejected_event(
962 *strategy_id,
963 *instrument_id,
964 *client_order_id,
965 &format!("submit_order_list batch item rejected: {status}"),
966 ts_event,
967 due_post_only,
968 );
969 }
970 }
971 Ok(())
972 }
973 Err(e) => {
974 let ts_event = clock.get_time_ns();
975 let error_msg = format!("submit_order_list batch error: {e}");
976
977 for (strategy_id, instrument_id, client_order_id) in &order_meta {
978 dispatch_state.cleanup_terminal(client_order_id);
979 emitter.emit_order_rejected_event(
980 *strategy_id,
981 *instrument_id,
982 *client_order_id,
983 &error_msg,
984 ts_event,
985 false,
986 );
987 }
988 Ok(())
989 }
990 }
991 });
992
993 Ok(())
994 }
995
996 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
997 self.modify_single_order(&cmd);
998 Ok(())
999 }
1000
1001 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1002 self.cancel_single_order(&cmd);
1003 Ok(())
1004 }
1005
1006 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1007 let instrument_id = cmd.instrument_id;
1008
1009 if cmd.order_side == OrderSide::NoOrderSide {
1010 log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
1011
1012 let http = self.http.clone();
1013
1014 self.spawn_task("cancel_all_orders", async move {
1015 if let Err(e) = http.inner.cancel_all_orders().await {
1016 anyhow::bail!("Cancel all orders failed: {e}");
1017 }
1018 Ok(())
1019 });
1020
1021 return Ok(());
1022 }
1023
1024 log::info!(
1025 "Canceling all orders: instrument_id={instrument_id}, side={:?}",
1026 cmd.order_side
1027 );
1028
1029 let orders_to_cancel: Vec<_> = {
1030 let cache = self.core.cache();
1031 let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
1032
1033 open_orders
1034 .into_iter()
1035 .filter(|order| order.order_side() == cmd.order_side)
1036 .filter_map(|order| {
1037 Some((
1038 order.venue_order_id()?,
1039 order.client_order_id(),
1040 order.instrument_id(),
1041 order.strategy_id(),
1042 ))
1043 })
1044 .collect()
1045 };
1046
1047 let account_id = self.core.account_id;
1048
1049 for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
1050 {
1051 let http = self.http.clone();
1052 let emitter = self.emitter.clone();
1053 let clock = self.clock;
1054
1055 self.spawn_task("cancel_order_by_side", async move {
1056 if let Err(e) = http
1057 .cancel_order(
1058 account_id,
1059 order_instrument_id,
1060 Some(client_order_id),
1061 Some(venue_order_id),
1062 )
1063 .await
1064 {
1065 log::error!("Cancel order failed: {e}");
1066 let ts_event = clock.get_time_ns();
1067 emitter.emit_order_cancel_rejected_event(
1068 strategy_id,
1069 order_instrument_id,
1070 client_order_id,
1071 Some(venue_order_id),
1072 &format!("cancel-order error: {e}"),
1073 ts_event,
1074 );
1075 }
1076 Ok(())
1077 });
1078 }
1079
1080 Ok(())
1081 }
1082
1083 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1084 log::info!(
1085 "Batch canceling orders: instrument_id={}, count={}",
1086 cmd.instrument_id,
1087 cmd.cancels.len()
1088 );
1089
1090 for cancel in &cmd.cancels {
1091 self.cancel_single_order(cancel);
1092 }
1093
1094 Ok(())
1095 }
1096}