1use std::{
19 future::Future,
20 sync::{Arc, Mutex},
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use nautilus_common::{
28 clients::ExecutionClient,
29 live::{get_runtime, runner::get_exec_event_sender},
30 messages::execution::{
31 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 AtomicMap, MUTEX_POISONED, UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
44 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue},
45 instruments::{Instrument, InstrumentAny},
46 orders::{Order, OrderAny},
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, MarginBalance, Quantity},
49};
50use rust_decimal::Decimal;
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 common::{consts::KRAKEN_VENUE, credential::KrakenCredential, parse::truncate_cl_ord_id},
56 config::KrakenExecClientConfig,
57 http::{
58 KrakenFuturesHttpClient, futures::client::KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
59 },
60 websocket::{
61 dispatch::{self, OrderIdentity, WsDispatchState},
62 futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
63 },
64};
65
/// Live execution client for Kraken Futures.
///
/// Combines an HTTP client (order entry, cancels, modifies and report queries) with a
/// WebSocket client whose messages are dispatched into execution events.
// NOTE(review): the reason `dead_code` is suppressed is not visible in this file — confirm it is still needed.
#[allow(dead_code)]
#[derive(Debug)]
pub struct KrakenFuturesExecutionClient {
    /// Shared execution client core (identifiers, cache access, lifecycle flags).
    core: ExecutionClientCore,
    /// Realtime atomic clock used to timestamp emitted events.
    clock: &'static AtomicTime,
    /// Configuration supplied at construction.
    config: KrakenExecClientConfig,
    /// Emitter for order/account events and reports.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for requests and commands.
    http: KrakenFuturesHttpClient,
    /// WebSocket client providing the execution message stream.
    ws: KrakenFuturesWebSocketClient,
    /// Token that signals the WebSocket message handler task to stop.
    cancellation_token: CancellationToken,
    /// Handle of the message handler task, set by `spawn_message_handler`.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks started via `spawn_task`; finished handles are pruned on each spawn.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Instruments keyed by ID, populated from the HTTP client's instrument cache on connect.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    /// Truncated Kraken client order ID -> full client order ID, recorded on submit
    /// whenever truncation alters the ID.
    truncated_id_map: Arc<AtomicMap<String, ClientOrderId>>,
    /// Passed to the WebSocket dispatch functions.
    // NOTE(review): presumably keyed by venue order ID — confirm in the dispatch module.
    order_instrument_map: Arc<AtomicMap<String, InstrumentId>>,
    /// Passed to the WebSocket dispatch functions.
    // NOTE(review): presumably venue order ID -> client order ID — confirm in the dispatch module.
    venue_client_map: Arc<AtomicMap<String, ClientOrderId>>,
    /// Passed to the WebSocket dispatch functions.
    // NOTE(review): presumably venue order ID -> order quantity — confirm in the dispatch module.
    venue_order_qty: Arc<AtomicMap<String, Quantity>>,
    /// Dispatch state holding the order identities registered on submit.
    ws_dispatch_state: Arc<WsDispatchState>,
}
89
90impl KrakenFuturesExecutionClient {
91 pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
93 let clock = get_atomic_clock_realtime();
94 let emitter = ExecutionEventEmitter::new(
95 clock,
96 core.trader_id,
97 core.account_id,
98 AccountType::Margin,
99 None,
100 );
101
102 let cancellation_token = CancellationToken::new();
103
104 let http = KrakenFuturesHttpClient::with_credentials(
105 config.api_key.clone(),
106 config.api_secret.clone(),
107 config.environment,
108 config.base_url.clone(),
109 config.timeout_secs,
110 None,
111 None,
112 None,
113 config.proxy_url.clone(),
114 config
115 .max_requests_per_second
116 .unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND),
117 )?;
118
119 let credential = KrakenCredential::new(config.api_key.clone(), config.api_secret.clone());
120 let ws = KrakenFuturesWebSocketClient::with_credentials(
121 config.ws_url(),
122 config.heartbeat_interval_secs,
123 Some(credential),
124 config.transport_backend,
125 config.proxy_url.clone(),
126 );
127
128 Ok(Self {
129 core,
130 clock,
131 config,
132 emitter,
133 http,
134 ws,
135 cancellation_token,
136 ws_stream_handle: None,
137 pending_tasks: Mutex::new(Vec::new()),
138 instruments: Arc::new(AtomicMap::new()),
139 truncated_id_map: Arc::new(AtomicMap::new()),
140 order_instrument_map: Arc::new(AtomicMap::new()),
141 venue_client_map: Arc::new(AtomicMap::new()),
142 venue_order_qty: Arc::new(AtomicMap::new()),
143 ws_dispatch_state: Arc::new(WsDispatchState::new()),
144 })
145 }
146
147 fn register_order_identity(&self, order: &OrderAny) {
148 self.ws_dispatch_state.register_identity(
149 order.client_order_id(),
150 OrderIdentity {
151 strategy_id: order.strategy_id(),
152 instrument_id: order.instrument_id(),
153 order_side: order.order_side(),
154 order_type: order.order_type(),
155 quantity: order.quantity(),
156 },
157 );
158 }
159
/// Returns the atomic clock this client uses for event timestamps.
#[must_use]
pub fn clock(&self) -> &'static AtomicTime {
    self.clock
}
165
/// Returns a reference to the execution event emitter owned by this client.
#[must_use]
pub fn emitter(&self) -> &ExecutionEventEmitter {
    &self.emitter
}
171
172 fn spawn_task<F>(&self, description: &'static str, fut: F)
173 where
174 F: Future<Output = anyhow::Result<()>> + Send + 'static,
175 {
176 let runtime = get_runtime();
177 let handle = runtime.spawn(async move {
178 if let Err(e) = fut.await {
179 log::warn!("{description} failed: {e:?}");
180 }
181 });
182
183 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
184 tasks.retain(|handle| !handle.is_finished());
185 tasks.push(handle);
186 }
187
188 fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) {
189 if order.is_closed() {
190 log::warn!(
191 "Cannot submit closed order: client_order_id={}",
192 order.client_order_id()
193 );
194 return;
195 }
196
197 let account_id = self.core.account_id;
198 let client_order_id = order.client_order_id();
199 let strategy_id = order.strategy_id();
200 let instrument_id = order.instrument_id();
201 let order_side = order.order_side();
202 let order_type = order.order_type();
203 let quantity = order.quantity();
204 let time_in_force = order.time_in_force();
205 let price = order.price();
206 let trigger_price = order.trigger_price();
207 let trigger_type = order.trigger_type();
208 let is_reduce_only = order.is_reduce_only();
209 let is_post_only = order.is_post_only();
210
211 log::debug!("OrderSubmitted: client_order_id={client_order_id}");
212 self.register_order_identity(order);
213 self.emitter.emit_order_submitted(order);
214
215 let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
216
217 if kraken_cl_ord_id != client_order_id.as_str() {
218 self.truncated_id_map
219 .insert(kraken_cl_ord_id, client_order_id);
220 }
221
222 let http = self.http.clone();
223 let emitter = self.emitter.clone();
224 let clock = self.clock;
225 let dispatch_state = self.ws_dispatch_state.clone();
226
227 self.spawn_task(task_name, async move {
228 let result = http
229 .submit_order(
230 account_id,
231 instrument_id,
232 client_order_id,
233 order_side,
234 order_type,
235 quantity,
236 time_in_force,
237 price,
238 trigger_price,
239 trigger_type,
240 is_reduce_only,
241 is_post_only,
242 )
243 .await;
244
245 match result {
246 Ok(_report) => Ok(()),
247 Err(e) => {
248 let ts_event = clock.get_time_ns();
249 let error_msg = format!("{task_name} error: {e}");
250 let due_post_only = error_msg.contains("POST_ONLY_REJECTED");
251 dispatch_state.cleanup_terminal(&client_order_id);
254 emitter.emit_order_rejected_event(
255 strategy_id,
256 instrument_id,
257 client_order_id,
258 &error_msg,
259 ts_event,
260 due_post_only,
261 );
262 Ok(())
263 }
264 }
265 });
266 }
267
268 fn cancel_single_order(&self, cmd: &CancelOrder) {
269 let account_id = self.core.account_id;
270 let client_order_id = cmd.client_order_id;
271 let venue_order_id = cmd.venue_order_id;
272 let strategy_id = cmd.strategy_id;
273 let instrument_id = cmd.instrument_id;
274
275 log::info!(
276 "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
277 );
278
279 let http = self.http.clone();
280 let emitter = self.emitter.clone();
281 let clock = self.clock;
282
283 self.spawn_task("cancel_order", async move {
284 if let Err(e) = http
285 .cancel_order(
286 account_id,
287 instrument_id,
288 Some(client_order_id),
289 venue_order_id,
290 )
291 .await
292 {
293 let ts_event = clock.get_time_ns();
294 emitter.emit_order_cancel_rejected_event(
295 strategy_id,
296 instrument_id,
297 client_order_id,
298 venue_order_id,
299 &format!("cancel-order error: {e}"),
300 ts_event,
301 );
302 anyhow::bail!("Cancel order failed: {e}");
303 }
304 Ok(())
305 });
306 }
307
308 fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
309 let mut rx = self
310 .ws
311 .take_output_rx()
312 .context("Failed to take futures WebSocket output receiver")?;
313 let emitter = self.emitter.clone();
314 let instruments = self.instruments.clone();
315 let truncated_id_map = self.truncated_id_map.clone();
316 let order_instrument_map = self.order_instrument_map.clone();
317 let venue_client_map = self.venue_client_map.clone();
318 let venue_order_qty = self.venue_order_qty.clone();
319 let dispatch_state = self.ws_dispatch_state.clone();
320 let account_id = self.core.account_id;
321 let clock = self.clock;
322 let cancellation_token = self.cancellation_token.clone();
323
324 let handle = get_runtime().spawn(async move {
325 loop {
326 tokio::select! {
327 () = cancellation_token.cancelled() => {
328 log::debug!("Futures execution message handler cancelled");
329 break;
330 }
331 msg = rx.recv() => {
332 match msg {
333 Some(ws_msg) => {
334 Self::handle_ws_message(
335 ws_msg,
336 &emitter,
337 &dispatch_state,
338 &instruments,
339 &truncated_id_map,
340 &order_instrument_map,
341 &venue_client_map,
342 &venue_order_qty,
343 account_id,
344 clock,
345 );
346 }
347 None => {
348 log::debug!("Futures execution WebSocket stream ended");
349 break;
350 }
351 }
352 }
353 }
354 }
355 });
356
357 self.ws_stream_handle = Some(handle);
358 Ok(())
359 }
360
/// Routes one WebSocket message to the matching futures dispatch function.
///
/// Open-order deltas, open-order cancels and fill deltas are forwarded to
/// `dispatch::futures`; challenge and reconnect messages are only logged;
/// market data messages (ticker, trade, book) are ignored.
#[expect(clippy::too_many_arguments)]
fn handle_ws_message(
    msg: KrakenFuturesWsMessage,
    emitter: &ExecutionEventEmitter,
    dispatch_state: &Arc<WsDispatchState>,
    instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
    order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
    venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
    venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
    account_id: AccountId,
    clock: &'static AtomicTime,
) {
    // One timestamp per message, taken before dispatch
    let ts_init = clock.get_time_ns();

    match msg {
        KrakenFuturesWsMessage::OpenOrdersDelta(delta) => {
            dispatch::futures::open_orders_delta(
                &delta,
                dispatch_state,
                emitter,
                instruments,
                truncated_id_map,
                order_instrument_map,
                venue_client_map,
                venue_order_qty,
                account_id,
                ts_init,
            );
        }
        KrakenFuturesWsMessage::OpenOrdersCancel(cancel) => {
            dispatch::futures::open_orders_cancel(
                &cancel,
                dispatch_state,
                emitter,
                truncated_id_map,
                order_instrument_map,
                venue_client_map,
                venue_order_qty,
                account_id,
                ts_init,
            );
        }
        KrakenFuturesWsMessage::FillsDelta(fills_delta) => {
            dispatch::futures::fills_delta(
                &fills_delta,
                dispatch_state,
                emitter,
                instruments,
                truncated_id_map,
                venue_client_map,
                account_id,
                ts_init,
            );
        }
        KrakenFuturesWsMessage::Challenge(challenge) => {
            // Only the length is logged, not the challenge value itself
            log::debug!("Received challenge: length={}", challenge.len());
        }
        KrakenFuturesWsMessage::Reconnected => {
            log::info!("Futures execution WebSocket reconnected");
        }
        // Market data messages are not relevant to execution
        KrakenFuturesWsMessage::Ticker(_)
        | KrakenFuturesWsMessage::Trade(_)
        | KrakenFuturesWsMessage::BookSnapshot(_)
        | KrakenFuturesWsMessage::BookDelta(_) => {}
    }
}
428
429 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
430 let account_id = self.core.account_id;
431
432 if self.core.cache().account(&account_id).is_some() {
433 log::info!("Account {account_id} registered");
434 return Ok(());
435 }
436
437 let start = Instant::now();
438 let timeout = Duration::from_secs_f64(timeout_secs);
439 let interval = Duration::from_millis(10);
440
441 loop {
442 tokio::time::sleep(interval).await;
443
444 if self.core.cache().account(&account_id).is_some() {
445 log::info!("Account {account_id} registered");
446 return Ok(());
447 }
448
449 if start.elapsed() >= timeout {
450 anyhow::bail!(
451 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
452 );
453 }
454 }
455 }
456
457 fn modify_single_order(&self, cmd: &ModifyOrder) {
458 let client_order_id = cmd.client_order_id;
459 let venue_order_id = cmd.venue_order_id;
460 let strategy_id = cmd.strategy_id;
461 let instrument_id = cmd.instrument_id;
462 let quantity = cmd.quantity;
463 let price = cmd.price;
464
465 log::info!(
466 "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
467 );
468
469 let http = self.http.clone();
470 let emitter = self.emitter.clone();
471 let clock = self.clock;
472
473 self.spawn_task("modify_order", async move {
474 if let Err(e) = http
475 .modify_order(
476 instrument_id,
477 Some(client_order_id),
478 venue_order_id,
479 quantity,
480 price,
481 None,
482 )
483 .await
484 {
485 let ts_event = clock.get_time_ns();
486 emitter.emit_order_modify_rejected_event(
487 strategy_id,
488 instrument_id,
489 client_order_id,
490 venue_order_id,
491 &format!("modify-order error: {e}"),
492 ts_event,
493 );
494 anyhow::bail!("Modify order failed: {e}");
495 }
496 Ok(())
497 });
498 }
499}
500
501#[async_trait(?Send)]
502impl ExecutionClient for KrakenFuturesExecutionClient {
/// Returns whether the client core is flagged as connected.
fn is_connected(&self) -> bool {
    self.core.is_connected()
}
506
/// Returns the client ID held by the client core.
fn client_id(&self) -> ClientId {
    self.core.client_id
}
510
/// Returns the account ID held by the client core.
fn account_id(&self) -> AccountId {
    self.core.account_id
}
514
/// Returns the Kraken venue constant.
fn venue(&self) -> Venue {
    *KRAKEN_VENUE
}
518
/// Returns the OMS type held by the client core.
fn oms_type(&self) -> OmsType {
    self.core.oms_type
}
522
/// Returns a clone of this client's account from the cache, or `None` if it
/// is not in the cache.
fn get_account(&self) -> Option<AccountAny> {
    self.core.cache().account(&self.core.account_id).cloned()
}
526
/// Emits an account state event built from the given balances and margins.
///
/// Always returns `Ok(())` after handing the values to the emitter.
fn generate_account_state(
    &self,
    balances: Vec<AccountBalance>,
    margins: Vec<MarginBalance>,
    reported: bool,
    ts_event: UnixNanos,
) -> anyhow::Result<()> {
    self.emitter
        .emit_account_state(balances, margins, reported, ts_event);
    Ok(())
}
538
539 fn start(&mut self) -> anyhow::Result<()> {
540 if self.core.is_started() {
541 return Ok(());
542 }
543
544 self.emitter.set_sender(get_exec_event_sender());
545 self.core.set_started();
546
547 log::info!(
548 "Started: client_id={}, account_id={}, product_type=Futures, environment={:?}",
549 self.core.client_id,
550 self.core.account_id,
551 self.config.environment
552 );
553 Ok(())
554 }
555
556 fn stop(&mut self) -> anyhow::Result<()> {
557 if self.core.is_stopped() {
558 return Ok(());
559 }
560
561 self.cancellation_token.cancel();
562 self.core.set_stopped();
563 self.core.set_disconnected();
564 log::info!("Stopped: client_id={}", self.core.client_id);
565 Ok(())
566 }
567
/// Connects the client to Kraken Futures.
///
/// Steps, in order: load and cache instruments (first connect only), copy the
/// HTTP instrument cache into the local instrument map, connect and
/// authenticate the WebSocket, request and publish the account state, wait for
/// the account to appear in the cache, start the message handler, and
/// subscribe to executions. Does nothing if already connected.
///
/// # Errors
///
/// Returns an error if any of the steps above fails, including a 30 second
/// timeout while waiting for the account to be registered.
async fn connect(&mut self) -> anyhow::Result<()> {
    if self.core.is_connected() {
        return Ok(());
    }

    // Instruments are only requested once per client lifetime
    if !self.core.instruments_initialized() {
        let instruments = self
            .http
            .request_instruments()
            .await
            .context("Failed to load Kraken futures instruments")?;
        log::info!("Loaded {} Futures instruments", instruments.len());
        self.http.cache_instruments(&instruments);
        self.core.set_instruments_initialized();
    }

    // Mirror the HTTP client's instrument cache into the map used by WS dispatch
    self.instruments.rcu(|m| {
        for instrument in self.http.instruments_cache.load().values() {
            m.insert(instrument.id(), instrument.clone());
        }
    });

    self.ws
        .connect()
        .await
        .context("Failed to connect futures WebSocket")?;
    self.ws
        .wait_until_active(10.0)
        .await
        .context("Futures WebSocket failed to become active")?;

    self.ws
        .authenticate()
        .await
        .context("Failed to authenticate futures WebSocket")?;

    let account_state = self
        .http
        .request_account_state(self.core.account_id)
        .await
        .context("Failed to request Kraken futures account state")?;

    if !account_state.balances.is_empty() {
        log::info!(
            "Received account state with {} balance(s)",
            account_state.balances.len()
        );
    }
    self.emitter.send_account_state(account_state);
    self.await_account_registered(30.0).await?;

    // The handler is started before subscribing to executions
    self.spawn_message_handler()?;

    self.ws
        .subscribe_executions()
        .await
        .context("Failed to subscribe to executions")?;

    log::info!("Futures WebSocket authenticated and subscribed to executions");

    self.core.set_connected();
    log::info!("Connected: client_id={}", self.core.client_id);
    Ok(())
}
633
634 async fn disconnect(&mut self) -> anyhow::Result<()> {
635 if self.core.is_disconnected() {
636 return Ok(());
637 }
638
639 self.cancellation_token.cancel();
640
641 if let Some(handle) = self.ws_stream_handle.take() {
642 handle.abort();
643 }
644
645 let _ = self.ws.close().await;
646
647 self.cancellation_token = CancellationToken::new();
648 self.core.set_disconnected();
649 log::info!("Disconnected: client_id={}", self.core.client_id);
650 Ok(())
651 }
652
653 async fn generate_order_status_report(
654 &self,
655 cmd: &GenerateOrderStatusReport,
656 ) -> anyhow::Result<Option<OrderStatusReport>> {
657 log::debug!(
658 "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
659 cmd.venue_order_id,
660 cmd.client_order_id
661 );
662
663 let account_id = self.core.account_id;
664 let reports = self
665 .http
666 .request_order_status_reports(account_id, None, None, None, false)
667 .await?;
668
669 let matched = reports.into_iter().find(|r| {
672 cmd.venue_order_id
673 .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
674 || cmd.client_order_id.is_some_and(|id| {
675 r.client_order_id
676 .as_ref()
677 .is_some_and(|r_id| r_id.as_str() == truncate_cl_ord_id(&id))
678 })
679 });
680
681 if matched.is_some() {
682 return Ok(matched);
683 }
684
685 let Some(order) = self.get_cached_order_for_status_command(cmd) else {
686 return Ok(None);
687 };
688
689 let now = Utc::now();
690 let start = now - Duration::from_secs(5 * 60);
691 let fills = self
692 .http
693 .request_fill_reports(
694 account_id,
695 Some(order.instrument_id()),
696 Some(start),
697 Some(now),
698 )
699 .await?;
700
701 Ok(synthesize_filled_order_status_report(cmd, &order, &fills))
702 }
703
704 async fn generate_order_status_reports(
705 &self,
706 cmd: &GenerateOrderStatusReports,
707 ) -> anyhow::Result<Vec<OrderStatusReport>> {
708 log::debug!(
709 "Generating order status reports: instrument_id={:?}, open_only={}",
710 cmd.instrument_id,
711 cmd.open_only
712 );
713
714 let account_id = self.core.account_id;
715 let start = cmd.start.map(DateTime::<Utc>::from);
716 let end = cmd.end.map(DateTime::<Utc>::from);
717 self.http
718 .request_order_status_reports(account_id, cmd.instrument_id, start, end, cmd.open_only)
719 .await
720 }
721
722 async fn generate_fill_reports(
723 &self,
724 cmd: GenerateFillReports,
725 ) -> anyhow::Result<Vec<FillReport>> {
726 log::debug!(
727 "Generating fill reports: instrument_id={:?}",
728 cmd.instrument_id
729 );
730
731 let account_id = self.core.account_id;
732 let start = cmd.start.map(DateTime::<Utc>::from);
733 let end = cmd.end.map(DateTime::<Utc>::from);
734 let mut reports = self
735 .http
736 .request_fill_reports(account_id, cmd.instrument_id, start, end)
737 .await?;
738
739 if let Some(venue_order_id) = cmd.venue_order_id {
740 reports.retain(|report| report.venue_order_id == venue_order_id);
741 }
742
743 Ok(reports)
744 }
745
746 async fn generate_position_status_reports(
747 &self,
748 cmd: &GeneratePositionStatusReports,
749 ) -> anyhow::Result<Vec<PositionStatusReport>> {
750 log::debug!(
751 "Generating position status reports: instrument_id={:?}",
752 cmd.instrument_id
753 );
754
755 let account_id = self.core.account_id;
756 self.http
757 .request_position_status_reports(account_id, cmd.instrument_id)
758 .await
759 }
760
761 async fn generate_mass_status(
762 &self,
763 lookback_mins: Option<u64>,
764 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
765 log::debug!("Generating mass status: lookback_mins={lookback_mins:?}");
766
767 let start = lookback_mins.map(|mins| Utc::now() - Duration::from_secs(mins * 60));
768
769 let account_id = self.core.account_id;
770 let order_reports = self
771 .http
772 .request_order_status_reports(account_id, None, start, None, true)
773 .await?;
774 let fill_reports = self
775 .http
776 .request_fill_reports(account_id, None, start, None)
777 .await?;
778 let position_reports = self
779 .http
780 .request_position_status_reports(account_id, None)
781 .await?;
782
783 let mut mass_status = ExecutionMassStatus::new(
784 self.core.client_id,
785 self.core.account_id,
786 *KRAKEN_VENUE,
787 self.clock.get_time_ns(),
788 None,
789 );
790 mass_status.add_order_reports(order_reports);
791 mass_status.add_fill_reports(fill_reports);
792 mass_status.add_position_reports(position_reports);
793
794 Ok(Some(mass_status))
795 }
796
797 fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
798 log::debug!("Querying account: {cmd:?}");
799
800 let account_id = self.core.account_id;
801 let http = self.http.clone();
802 let emitter = self.emitter.clone();
803
804 self.spawn_task("query_account", async move {
805 let account_state = http.request_account_state(account_id).await?;
806 emitter.emit_account_state(
807 account_state.balances.clone(),
808 account_state.margins.clone(),
809 account_state.is_reported,
810 account_state.ts_event,
811 );
812 Ok(())
813 });
814
815 Ok(())
816 }
817
818 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
819 log::debug!("Querying order: {cmd:?}");
820
821 let venue_order_id = cmd
822 .venue_order_id
823 .context("venue_order_id required for query_order")?;
824 let account_id = self.core.account_id;
825 let http = self.http.clone();
826 let emitter = self.emitter.clone();
827
828 self.spawn_task("query_order", async move {
829 let reports = http
830 .request_order_status_reports(account_id, None, None, None, true)
831 .await
832 .context("Failed to query order")?;
833
834 if let Some(report) = reports
835 .into_iter()
836 .find(|r| r.venue_order_id == venue_order_id)
837 {
838 emitter.send_order_status_report(report);
839 }
840 Ok(())
841 });
842
843 Ok(())
844 }
845
846 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
847 let order = self
848 .core
849 .cache()
850 .order(&cmd.client_order_id)
851 .cloned()
852 .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
853 self.submit_single_order(&order, "submit_order");
854 Ok(())
855 }
856
857 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
858 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
859
860 log::info!(
861 "Submitting order list: order_list_id={}, count={}",
862 cmd.order_list.id,
863 orders.len()
864 );
865
866 let mut order_tuples = Vec::with_capacity(orders.len());
867 let mut order_meta = Vec::with_capacity(orders.len());
868
869 for order in &orders {
870 if order.is_closed() {
871 log::warn!(
872 "Cannot submit closed order: client_order_id={}",
873 order.client_order_id()
874 );
875 continue;
876 }
877
878 if order.order_type() == OrderType::Market {
881 self.submit_single_order(order, "submit_order_list");
882 continue;
883 }
884
885 let client_order_id = order.client_order_id();
886 let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
887
888 if kraken_cl_ord_id != client_order_id.as_str() {
889 self.truncated_id_map
890 .insert(kraken_cl_ord_id, client_order_id);
891 }
892
893 self.register_order_identity(order);
894 self.emitter.emit_order_submitted(order);
895
896 order_tuples.push((
897 order.instrument_id(),
898 client_order_id,
899 order.order_side(),
900 order.order_type(),
901 order.quantity(),
902 order.time_in_force(),
903 order.price(),
904 order.trigger_price(),
905 order.trigger_type(),
906 order.is_reduce_only(),
907 order.is_post_only(),
908 ));
909
910 order_meta.push((order.strategy_id(), order.instrument_id(), client_order_id));
911 }
912
913 if order_tuples.is_empty() {
914 return Ok(());
915 }
916
917 let http = self.http.clone();
918 let emitter = self.emitter.clone();
919 let clock = self.clock;
920 let dispatch_state = self.ws_dispatch_state.clone();
921
922 self.spawn_task("submit_order_list", async move {
923 match http.submit_orders_batch(order_tuples).await {
924 Ok(statuses) => {
925 for (i, status) in statuses.iter().enumerate() {
926 if status.status != "placed"
927 && status.status != "filled"
928 && let Some((strategy_id, instrument_id, client_order_id)) =
929 order_meta.get(i)
930 {
931 let ts_event = clock.get_time_ns();
932 let error_msg = format!(
933 "submit_order_list batch item rejected: {}",
934 status.status,
935 );
936 dispatch_state.cleanup_terminal(client_order_id);
937 emitter.emit_order_rejected_event(
938 *strategy_id,
939 *instrument_id,
940 *client_order_id,
941 &error_msg,
942 ts_event,
943 status.status == "postWouldExecute",
944 );
945 }
946 }
947 Ok(())
948 }
949 Err(e) => {
950 let ts_event = clock.get_time_ns();
951
952 for (strategy_id, instrument_id, client_order_id) in &order_meta {
953 let error_msg = format!("submit_order_list batch error: {e}");
954 dispatch_state.cleanup_terminal(client_order_id);
955 emitter.emit_order_rejected_event(
956 *strategy_id,
957 *instrument_id,
958 *client_order_id,
959 &error_msg,
960 ts_event,
961 false,
962 );
963 }
964 Ok(())
965 }
966 }
967 });
968
969 Ok(())
970 }
971
/// Modifies an order by delegating to `modify_single_order`; always returns `Ok(())`.
fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
    self.modify_single_order(&cmd);
    Ok(())
}
976
/// Cancels an order by delegating to `cancel_single_order`; always returns `Ok(())`.
fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
    self.cancel_single_order(&cmd);
    Ok(())
}
981
982 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
983 let instrument_id = cmd.instrument_id;
984
985 if cmd.order_side == OrderSide::NoOrderSide {
986 log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
987
988 let http = self.http.clone();
989 let symbol = instrument_id.symbol.to_string();
990
991 self.spawn_task("cancel_all_orders", async move {
992 if let Err(e) = http.inner.cancel_all_orders(Some(symbol)).await {
993 anyhow::bail!("Cancel all orders failed: {e}");
994 }
995 Ok(())
996 });
997
998 return Ok(());
999 }
1000
1001 log::info!(
1002 "Canceling all orders: instrument_id={instrument_id}, side={:?}",
1003 cmd.order_side
1004 );
1005
1006 let orders_to_cancel: Vec<_> = {
1007 let cache = self.core.cache();
1008 let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
1009
1010 open_orders
1011 .into_iter()
1012 .filter(|order| order.order_side() == cmd.order_side)
1013 .filter_map(|order| {
1014 Some((
1015 order.venue_order_id()?,
1016 order.client_order_id(),
1017 order.instrument_id(),
1018 order.strategy_id(),
1019 ))
1020 })
1021 .collect()
1022 };
1023
1024 let account_id = self.core.account_id;
1025
1026 for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
1027 {
1028 let http = self.http.clone();
1029 let emitter = self.emitter.clone();
1030 let clock = self.clock;
1031
1032 self.spawn_task("cancel_order_by_side", async move {
1033 if let Err(e) = http
1034 .cancel_order(
1035 account_id,
1036 order_instrument_id,
1037 Some(client_order_id),
1038 Some(venue_order_id),
1039 )
1040 .await
1041 {
1042 log::error!("Cancel order failed: {e}");
1043 let ts_event = clock.get_time_ns();
1044 emitter.emit_order_cancel_rejected_event(
1045 strategy_id,
1046 order_instrument_id,
1047 client_order_id,
1048 Some(venue_order_id),
1049 &format!("cancel-order error: {e}"),
1050 ts_event,
1051 );
1052 }
1053 Ok(())
1054 });
1055 }
1056
1057 Ok(())
1058 }
1059
1060 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1061 log::info!(
1062 "Batch canceling orders: instrument_id={}, count={}",
1063 cmd.instrument_id,
1064 cmd.cancels.len()
1065 );
1066
1067 for cancel in &cmd.cancels {
1068 self.cancel_single_order(cancel);
1069 }
1070
1071 Ok(())
1072 }
1073}
1074
1075impl KrakenFuturesExecutionClient {
1076 fn get_cached_order_for_status_command(
1077 &self,
1078 cmd: &GenerateOrderStatusReport,
1079 ) -> Option<OrderAny> {
1080 let cache = self.core.cache();
1081
1082 if let Some(client_order_id) = cmd.client_order_id {
1083 return cache.order(&client_order_id).cloned();
1084 }
1085
1086 let venue_order_id = cmd.venue_order_id?;
1087 let client_order_id = *cache.client_order_id(&venue_order_id)?;
1088 cache.order(&client_order_id).cloned()
1089 }
1090}
1091
1092fn synthesize_filled_order_status_report(
1093 cmd: &GenerateOrderStatusReport,
1094 order: &OrderAny,
1095 fills: &[FillReport],
1096) -> Option<OrderStatusReport> {
1097 let venue_order_id = cmd.venue_order_id.or(order.venue_order_id());
1098 let truncated_client_order_id = truncate_cl_ord_id(&order.client_order_id());
1099
1100 let mut matched: Vec<&FillReport> = if let Some(venue_order_id) = venue_order_id {
1101 fills
1102 .iter()
1103 .filter(|fill| fill.venue_order_id == venue_order_id)
1104 .collect()
1105 } else {
1106 Vec::new()
1107 };
1108
1109 if matched.is_empty() {
1110 matched = fills
1111 .iter()
1112 .filter(|fill| {
1113 fill.client_order_id == Some(order.client_order_id())
1114 || fill
1115 .client_order_id
1116 .as_ref()
1117 .is_some_and(|fill_client_order_id| {
1118 fill_client_order_id.as_str() == truncated_client_order_id
1119 })
1120 })
1121 .collect();
1122 }
1123
1124 if matched.is_empty() {
1125 return None;
1126 }
1127
1128 matched.sort_by_key(|fill| fill.ts_event);
1129 let first_fill = *matched.first()?;
1130 let last_fill = *matched.last()?;
1131
1132 let total_filled = matched
1133 .iter()
1134 .fold(Decimal::ZERO, |acc, fill| acc + fill.last_qty.as_decimal());
1135 if total_filled < order.quantity().as_decimal() {
1136 return None;
1137 }
1138
1139 let total_notional = matched.iter().fold(Decimal::ZERO, |acc, fill| {
1140 acc + fill.last_qty.as_decimal() * fill.last_px.as_decimal()
1141 });
1142 let avg_px = if total_filled.is_zero() {
1143 None
1144 } else {
1145 Some(total_notional / total_filled)
1146 };
1147 let venue_order_id = venue_order_id.unwrap_or(first_fill.venue_order_id);
1148
1149 let mut report = OrderStatusReport::new(
1150 first_fill.account_id,
1151 order.instrument_id(),
1152 Some(order.client_order_id()),
1153 venue_order_id,
1154 order.order_side(),
1155 order.order_type(),
1156 order.time_in_force(),
1157 OrderStatus::Filled,
1158 order.quantity(),
1159 order.quantity(),
1160 first_fill.ts_event,
1161 last_fill.ts_event,
1162 last_fill.ts_init,
1163 None,
1164 );
1165 report.order_list_id = order.order_list_id();
1166 report.venue_position_id = matched.iter().rev().find_map(|fill| fill.venue_position_id);
1167 report.linked_order_ids = order
1168 .linked_order_ids()
1169 .map(|linked_order_ids| linked_order_ids.to_vec());
1170 report.parent_order_id = order.parent_order_id();
1171 report.expire_time = order.expire_time();
1172 report.price = order.price();
1173 report.trigger_price = order.trigger_price();
1174 report.trigger_type = order.trigger_type();
1175 report.avg_px = avg_px;
1176 report.display_qty = order.display_qty();
1177 report.post_only = order.is_post_only();
1178 report.reduce_only = order.is_reduce_only();
1179 Some(report)
1180}
1181
#[cfg(test)]
mod tests {
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::{
        enums::{LiquiditySide, OrderSide, OrderType, TimeInForce},
        identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
        orders::OrderTestBuilder,
        reports::FillReport,
        types::{Currency, Money, Price, Quantity},
    };
    use rstest::rstest;

    use super::*;

    const TEST_INSTRUMENT_ID: &str = "PF_XBTUSD.KRAKEN";

    /// Builds a taker buy fill on the test instrument; `ts_event` also seeds the trade ID.
    fn make_fill(
        venue_order_id: &str,
        client_order_id: Option<&str>,
        quantity: &str,
        price: &str,
        ts_event: u64,
    ) -> FillReport {
        let trade_id = format!("T-{ts_event}");
        let ts = UnixNanos::from(ts_event);

        FillReport::new(
            AccountId::from("KRAKEN-001"),
            InstrumentId::from(TEST_INSTRUMENT_ID),
            VenueOrderId::from(venue_order_id),
            TradeId::from(trade_id.as_str()),
            OrderSide::Buy,
            Quantity::from(quantity),
            Price::from(price),
            Money::new(0.0, Currency::USD()),
            LiquiditySide::Taker,
            client_order_id.map(ClientOrderId::from),
            None,
            ts,
            ts,
            None,
        )
    }

    /// Builds a status report command for the test instrument with the given IDs.
    fn make_cmd(
        client_order_id: Option<&str>,
        venue_order_id: Option<&str>,
    ) -> GenerateOrderStatusReport {
        GenerateOrderStatusReport::new(
            UUID4::new(),
            UnixNanos::default(),
            Some(InstrumentId::from(TEST_INSTRUMENT_ID)),
            client_order_id.map(ClientOrderId::from),
            venue_order_id.map(VenueOrderId::from),
            None,
            None,
        )
    }

    /// Builds an IOC market buy order for quantity 100 on the test instrument.
    fn make_order(client_order_id: &str) -> OrderAny {
        OrderTestBuilder::new(OrderType::Market)
            .instrument_id(InstrumentId::from(TEST_INSTRUMENT_ID))
            .client_order_id(ClientOrderId::from(client_order_id))
            .side(OrderSide::Buy)
            .quantity(Quantity::from("100"))
            .time_in_force(TimeInForce::Ioc)
            .build()
    }

    #[rstest]
    fn test_synthesize_filled_order_status_report_matches_full_fill_by_venue_order_id() {
        let order = make_order("O-123456");
        let cmd = make_cmd(Some("O-123456"), Some("KRAKEN-789"));
        // The third fill belongs to another venue order and must be ignored
        let fills = [
            make_fill("KRAKEN-789", Some("O-123456"), "40", "50000.0", 1),
            make_fill("KRAKEN-789", Some("O-123456"), "60", "50010.0", 2),
            make_fill("KRAKEN-OTHER", Some("O-123456"), "999", "1.0", 3),
        ];

        let report = synthesize_filled_order_status_report(&cmd, &order, &fills)
            .expect("expected a filled report");

        let expected_avg_px = Decimal::from_str_exact("50006.0").unwrap();
        assert_eq!(report.venue_order_id, VenueOrderId::from("KRAKEN-789"));
        assert_eq!(
            report.client_order_id,
            Some(ClientOrderId::from("O-123456"))
        );
        assert_eq!(report.order_status, OrderStatus::Filled);
        assert_eq!(report.order_type, OrderType::Market);
        assert_eq!(report.time_in_force, TimeInForce::Ioc);
        assert_eq!(report.quantity, Quantity::from("100"));
        assert_eq!(report.filled_qty, Quantity::from("100"));
        assert_eq!(report.avg_px, Some(expected_avg_px));
    }

    #[rstest]
    fn test_synthesize_filled_order_status_report_requires_full_fill_size() {
        let order = make_order("O-123457");
        let cmd = make_cmd(Some("O-123457"), Some("KRAKEN-790"));
        // 40 of 100 filled is not enough to synthesize a report
        let partial_fills = [make_fill(
            "KRAKEN-790",
            Some("O-123457"),
            "40",
            "50000.0",
            1,
        )];

        let report = synthesize_filled_order_status_report(&cmd, &order, &partial_fills);

        assert!(report.is_none());
    }

    #[rstest]
    fn test_synthesize_filled_order_status_report_matches_truncated_client_order_id() {
        let long_client_order_id = "O202602270023210040011";
        let order = make_order(long_client_order_id);
        let cmd = make_cmd(Some(long_client_order_id), None);
        let truncated = truncate_cl_ord_id(&ClientOrderId::from(long_client_order_id));
        let fills = [make_fill(
            "KRAKEN-791",
            Some(truncated.as_str()),
            "100",
            "50000.0",
            1,
        )];

        let report = synthesize_filled_order_status_report(&cmd, &order, &fills)
            .expect("expected a filled report");

        assert_eq!(
            report.client_order_id,
            Some(ClientOrderId::from(long_client_order_id))
        );
        assert_eq!(report.venue_order_id, VenueOrderId::from("KRAKEN-791"));
        assert_eq!(report.order_status, OrderStatus::Filled);
    }
}