Skip to main content

nautilus_kraken/execution/
futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kraken Futures execution client implementation.
17
18use std::{
19    future::Future,
20    sync::{Arc, Mutex},
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use chrono::{DateTime, Utc};
27use nautilus_common::{
28    clients::ExecutionClient,
29    live::{get_runtime, runner::get_exec_event_sender},
30    messages::execution::{
31        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
32        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
33        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    AtomicMap, MUTEX_POISONED, UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
44    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, Venue},
45    instruments::{Instrument, InstrumentAny},
46    orders::{Order, OrderAny},
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, MarginBalance, Quantity},
49};
50use rust_decimal::Decimal;
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::{consts::KRAKEN_VENUE, credential::KrakenCredential, parse::truncate_cl_ord_id},
56    config::KrakenExecClientConfig,
57    http::{
58        KrakenFuturesHttpClient, futures::client::KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND,
59    },
60    websocket::{
61        dispatch::{self, OrderIdentity, WsDispatchState},
62        futures::{client::KrakenFuturesWebSocketClient, messages::KrakenFuturesWsMessage},
63    },
64};
65
66/// Kraken Futures execution client.
67///
68/// Provides order management, account operations, and position management
69/// for Kraken Futures markets.
70#[allow(dead_code)]
71#[derive(Debug)]
72pub struct KrakenFuturesExecutionClient {
73    core: ExecutionClientCore,
74    clock: &'static AtomicTime,
75    config: KrakenExecClientConfig,
76    emitter: ExecutionEventEmitter,
77    http: KrakenFuturesHttpClient,
78    ws: KrakenFuturesWebSocketClient,
79    cancellation_token: CancellationToken,
80    ws_stream_handle: Option<JoinHandle<()>>,
81    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
82    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
83    truncated_id_map: Arc<AtomicMap<String, ClientOrderId>>,
84    order_instrument_map: Arc<AtomicMap<String, InstrumentId>>,
85    venue_client_map: Arc<AtomicMap<String, ClientOrderId>>,
86    venue_order_qty: Arc<AtomicMap<String, Quantity>>,
87    ws_dispatch_state: Arc<WsDispatchState>,
88}
89
90impl KrakenFuturesExecutionClient {
91    /// Creates a new [`KrakenFuturesExecutionClient`].
92    pub fn new(core: ExecutionClientCore, config: KrakenExecClientConfig) -> anyhow::Result<Self> {
93        let clock = get_atomic_clock_realtime();
94        let emitter = ExecutionEventEmitter::new(
95            clock,
96            core.trader_id,
97            core.account_id,
98            AccountType::Margin,
99            None,
100        );
101
102        let cancellation_token = CancellationToken::new();
103
104        let http = KrakenFuturesHttpClient::with_credentials(
105            config.api_key.clone(),
106            config.api_secret.clone(),
107            config.environment,
108            config.base_url.clone(),
109            config.timeout_secs,
110            None,
111            None,
112            None,
113            config.proxy_url.clone(),
114            config
115                .max_requests_per_second
116                .unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND),
117        )?;
118
119        let credential = KrakenCredential::new(config.api_key.clone(), config.api_secret.clone());
120        let ws = KrakenFuturesWebSocketClient::with_credentials(
121            config.ws_url(),
122            config.heartbeat_interval_secs,
123            Some(credential),
124            config.transport_backend,
125            config.proxy_url.clone(),
126        );
127
128        Ok(Self {
129            core,
130            clock,
131            config,
132            emitter,
133            http,
134            ws,
135            cancellation_token,
136            ws_stream_handle: None,
137            pending_tasks: Mutex::new(Vec::new()),
138            instruments: Arc::new(AtomicMap::new()),
139            truncated_id_map: Arc::new(AtomicMap::new()),
140            order_instrument_map: Arc::new(AtomicMap::new()),
141            venue_client_map: Arc::new(AtomicMap::new()),
142            venue_order_qty: Arc::new(AtomicMap::new()),
143            ws_dispatch_state: Arc::new(WsDispatchState::new()),
144        })
145    }
146
147    fn register_order_identity(&self, order: &OrderAny) {
148        self.ws_dispatch_state.register_identity(
149            order.client_order_id(),
150            OrderIdentity {
151                strategy_id: order.strategy_id(),
152                instrument_id: order.instrument_id(),
153                order_side: order.order_side(),
154                order_type: order.order_type(),
155                quantity: order.quantity(),
156            },
157        );
158    }
159
160    /// Returns a reference to the clock.
161    #[must_use]
162    pub fn clock(&self) -> &'static AtomicTime {
163        self.clock
164    }
165
166    /// Returns a reference to the event emitter.
167    #[must_use]
168    pub fn emitter(&self) -> &ExecutionEventEmitter {
169        &self.emitter
170    }
171
172    fn spawn_task<F>(&self, description: &'static str, fut: F)
173    where
174        F: Future<Output = anyhow::Result<()>> + Send + 'static,
175    {
176        let runtime = get_runtime();
177        let handle = runtime.spawn(async move {
178            if let Err(e) = fut.await {
179                log::warn!("{description} failed: {e:?}");
180            }
181        });
182
183        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
184        tasks.retain(|handle| !handle.is_finished());
185        tasks.push(handle);
186    }
187
188    fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) {
189        if order.is_closed() {
190            log::warn!(
191                "Cannot submit closed order: client_order_id={}",
192                order.client_order_id()
193            );
194            return;
195        }
196
197        let account_id = self.core.account_id;
198        let client_order_id = order.client_order_id();
199        let strategy_id = order.strategy_id();
200        let instrument_id = order.instrument_id();
201        let order_side = order.order_side();
202        let order_type = order.order_type();
203        let quantity = order.quantity();
204        let time_in_force = order.time_in_force();
205        let price = order.price();
206        let trigger_price = order.trigger_price();
207        let trigger_type = order.trigger_type();
208        let is_reduce_only = order.is_reduce_only();
209        let is_post_only = order.is_post_only();
210
211        log::debug!("OrderSubmitted: client_order_id={client_order_id}");
212        self.register_order_identity(order);
213        self.emitter.emit_order_submitted(order);
214
215        let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
216
217        if kraken_cl_ord_id != client_order_id.as_str() {
218            self.truncated_id_map
219                .insert(kraken_cl_ord_id, client_order_id);
220        }
221
222        let http = self.http.clone();
223        let emitter = self.emitter.clone();
224        let clock = self.clock;
225        let dispatch_state = self.ws_dispatch_state.clone();
226
227        self.spawn_task(task_name, async move {
228            let result = http
229                .submit_order(
230                    account_id,
231                    instrument_id,
232                    client_order_id,
233                    order_side,
234                    order_type,
235                    quantity,
236                    time_in_force,
237                    price,
238                    trigger_price,
239                    trigger_type,
240                    is_reduce_only,
241                    is_post_only,
242                )
243                .await;
244
245            match result {
246                Ok(_report) => Ok(()),
247                Err(e) => {
248                    let ts_event = clock.get_time_ns();
249                    let error_msg = format!("{task_name} error: {e}");
250                    let due_post_only = error_msg.contains("POST_ONLY_REJECTED");
251                    // The order will never appear on the wire, so its
252                    // dispatch identity has to be cleaned up here.
253                    dispatch_state.cleanup_terminal(&client_order_id);
254                    emitter.emit_order_rejected_event(
255                        strategy_id,
256                        instrument_id,
257                        client_order_id,
258                        &error_msg,
259                        ts_event,
260                        due_post_only,
261                    );
262                    Ok(())
263                }
264            }
265        });
266    }
267
268    fn cancel_single_order(&self, cmd: &CancelOrder) {
269        let account_id = self.core.account_id;
270        let client_order_id = cmd.client_order_id;
271        let venue_order_id = cmd.venue_order_id;
272        let strategy_id = cmd.strategy_id;
273        let instrument_id = cmd.instrument_id;
274
275        log::info!(
276            "Canceling order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
277        );
278
279        let http = self.http.clone();
280        let emitter = self.emitter.clone();
281        let clock = self.clock;
282
283        self.spawn_task("cancel_order", async move {
284            if let Err(e) = http
285                .cancel_order(
286                    account_id,
287                    instrument_id,
288                    Some(client_order_id),
289                    venue_order_id,
290                )
291                .await
292            {
293                let ts_event = clock.get_time_ns();
294                emitter.emit_order_cancel_rejected_event(
295                    strategy_id,
296                    instrument_id,
297                    client_order_id,
298                    venue_order_id,
299                    &format!("cancel-order error: {e}"),
300                    ts_event,
301                );
302                anyhow::bail!("Cancel order failed: {e}");
303            }
304            Ok(())
305        });
306    }
307
308    fn spawn_message_handler(&mut self) -> anyhow::Result<()> {
309        let mut rx = self
310            .ws
311            .take_output_rx()
312            .context("Failed to take futures WebSocket output receiver")?;
313        let emitter = self.emitter.clone();
314        let instruments = self.instruments.clone();
315        let truncated_id_map = self.truncated_id_map.clone();
316        let order_instrument_map = self.order_instrument_map.clone();
317        let venue_client_map = self.venue_client_map.clone();
318        let venue_order_qty = self.venue_order_qty.clone();
319        let dispatch_state = self.ws_dispatch_state.clone();
320        let account_id = self.core.account_id;
321        let clock = self.clock;
322        let cancellation_token = self.cancellation_token.clone();
323
324        let handle = get_runtime().spawn(async move {
325            loop {
326                tokio::select! {
327                    () = cancellation_token.cancelled() => {
328                        log::debug!("Futures execution message handler cancelled");
329                        break;
330                    }
331                    msg = rx.recv() => {
332                        match msg {
333                            Some(ws_msg) => {
334                                Self::handle_ws_message(
335                                    ws_msg,
336                                    &emitter,
337                                    &dispatch_state,
338                                    &instruments,
339                                    &truncated_id_map,
340                                    &order_instrument_map,
341                                    &venue_client_map,
342                                    &venue_order_qty,
343                                    account_id,
344                                    clock,
345                                );
346                            }
347                            None => {
348                                log::debug!("Futures execution WebSocket stream ended");
349                                break;
350                            }
351                        }
352                    }
353                }
354            }
355        });
356
357        self.ws_stream_handle = Some(handle);
358        Ok(())
359    }
360
361    #[expect(clippy::too_many_arguments)]
362    fn handle_ws_message(
363        msg: KrakenFuturesWsMessage,
364        emitter: &ExecutionEventEmitter,
365        dispatch_state: &Arc<WsDispatchState>,
366        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
367        truncated_id_map: &Arc<AtomicMap<String, ClientOrderId>>,
368        order_instrument_map: &Arc<AtomicMap<String, InstrumentId>>,
369        venue_client_map: &Arc<AtomicMap<String, ClientOrderId>>,
370        venue_order_qty: &Arc<AtomicMap<String, Quantity>>,
371        account_id: AccountId,
372        clock: &'static AtomicTime,
373    ) {
374        let ts_init = clock.get_time_ns();
375
376        match msg {
377            KrakenFuturesWsMessage::OpenOrdersDelta(delta) => {
378                dispatch::futures::open_orders_delta(
379                    &delta,
380                    dispatch_state,
381                    emitter,
382                    instruments,
383                    truncated_id_map,
384                    order_instrument_map,
385                    venue_client_map,
386                    venue_order_qty,
387                    account_id,
388                    ts_init,
389                );
390            }
391            KrakenFuturesWsMessage::OpenOrdersCancel(cancel) => {
392                dispatch::futures::open_orders_cancel(
393                    &cancel,
394                    dispatch_state,
395                    emitter,
396                    truncated_id_map,
397                    order_instrument_map,
398                    venue_client_map,
399                    venue_order_qty,
400                    account_id,
401                    ts_init,
402                );
403            }
404            KrakenFuturesWsMessage::FillsDelta(fills_delta) => {
405                dispatch::futures::fills_delta(
406                    &fills_delta,
407                    dispatch_state,
408                    emitter,
409                    instruments,
410                    truncated_id_map,
411                    venue_client_map,
412                    account_id,
413                    ts_init,
414                );
415            }
416            KrakenFuturesWsMessage::Challenge(challenge) => {
417                log::debug!("Received challenge: length={}", challenge.len());
418            }
419            KrakenFuturesWsMessage::Reconnected => {
420                log::info!("Futures execution WebSocket reconnected");
421            }
422            KrakenFuturesWsMessage::Ticker(_)
423            | KrakenFuturesWsMessage::Trade(_)
424            | KrakenFuturesWsMessage::BookSnapshot(_)
425            | KrakenFuturesWsMessage::BookDelta(_) => {}
426        }
427    }
428
429    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
430        let account_id = self.core.account_id;
431
432        if self.core.cache().account(&account_id).is_some() {
433            log::info!("Account {account_id} registered");
434            return Ok(());
435        }
436
437        let start = Instant::now();
438        let timeout = Duration::from_secs_f64(timeout_secs);
439        let interval = Duration::from_millis(10);
440
441        loop {
442            tokio::time::sleep(interval).await;
443
444            if self.core.cache().account(&account_id).is_some() {
445                log::info!("Account {account_id} registered");
446                return Ok(());
447            }
448
449            if start.elapsed() >= timeout {
450                anyhow::bail!(
451                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
452                );
453            }
454        }
455    }
456
457    fn modify_single_order(&self, cmd: &ModifyOrder) {
458        let client_order_id = cmd.client_order_id;
459        let venue_order_id = cmd.venue_order_id;
460        let strategy_id = cmd.strategy_id;
461        let instrument_id = cmd.instrument_id;
462        let quantity = cmd.quantity;
463        let price = cmd.price;
464
465        log::info!(
466            "Modifying order: venue_order_id={venue_order_id:?}, client_order_id={client_order_id}"
467        );
468
469        let http = self.http.clone();
470        let emitter = self.emitter.clone();
471        let clock = self.clock;
472
473        self.spawn_task("modify_order", async move {
474            if let Err(e) = http
475                .modify_order(
476                    instrument_id,
477                    Some(client_order_id),
478                    venue_order_id,
479                    quantity,
480                    price,
481                    None,
482                )
483                .await
484            {
485                let ts_event = clock.get_time_ns();
486                emitter.emit_order_modify_rejected_event(
487                    strategy_id,
488                    instrument_id,
489                    client_order_id,
490                    venue_order_id,
491                    &format!("modify-order error: {e}"),
492                    ts_event,
493                );
494                anyhow::bail!("Modify order failed: {e}");
495            }
496            Ok(())
497        });
498    }
499}
500
501#[async_trait(?Send)]
502impl ExecutionClient for KrakenFuturesExecutionClient {
503    fn is_connected(&self) -> bool {
504        self.core.is_connected()
505    }
506
507    fn client_id(&self) -> ClientId {
508        self.core.client_id
509    }
510
511    fn account_id(&self) -> AccountId {
512        self.core.account_id
513    }
514
515    fn venue(&self) -> Venue {
516        *KRAKEN_VENUE
517    }
518
519    fn oms_type(&self) -> OmsType {
520        self.core.oms_type
521    }
522
523    fn get_account(&self) -> Option<AccountAny> {
524        self.core.cache().account(&self.core.account_id).cloned()
525    }
526
527    fn generate_account_state(
528        &self,
529        balances: Vec<AccountBalance>,
530        margins: Vec<MarginBalance>,
531        reported: bool,
532        ts_event: UnixNanos,
533    ) -> anyhow::Result<()> {
534        self.emitter
535            .emit_account_state(balances, margins, reported, ts_event);
536        Ok(())
537    }
538
539    fn start(&mut self) -> anyhow::Result<()> {
540        if self.core.is_started() {
541            return Ok(());
542        }
543
544        self.emitter.set_sender(get_exec_event_sender());
545        self.core.set_started();
546
547        log::info!(
548            "Started: client_id={}, account_id={}, product_type=Futures, environment={:?}",
549            self.core.client_id,
550            self.core.account_id,
551            self.config.environment
552        );
553        Ok(())
554    }
555
556    fn stop(&mut self) -> anyhow::Result<()> {
557        if self.core.is_stopped() {
558            return Ok(());
559        }
560
561        self.cancellation_token.cancel();
562        self.core.set_stopped();
563        self.core.set_disconnected();
564        log::info!("Stopped: client_id={}", self.core.client_id);
565        Ok(())
566    }
567
568    async fn connect(&mut self) -> anyhow::Result<()> {
569        if self.core.is_connected() {
570            return Ok(());
571        }
572
573        if !self.core.instruments_initialized() {
574            let instruments = self
575                .http
576                .request_instruments()
577                .await
578                .context("Failed to load Kraken futures instruments")?;
579            log::info!("Loaded {} Futures instruments", instruments.len());
580            self.http.cache_instruments(&instruments);
581            self.core.set_instruments_initialized();
582        }
583
584        self.instruments.rcu(|m| {
585            for instrument in self.http.instruments_cache.load().values() {
586                m.insert(instrument.id(), instrument.clone());
587            }
588        });
589
590        self.ws
591            .connect()
592            .await
593            .context("Failed to connect futures WebSocket")?;
594        self.ws
595            .wait_until_active(10.0)
596            .await
597            .context("Futures WebSocket failed to become active")?;
598
599        self.ws
600            .authenticate()
601            .await
602            .context("Failed to authenticate futures WebSocket")?;
603
604        // Request and register account state before message handler
605        let account_state = self
606            .http
607            .request_account_state(self.core.account_id)
608            .await
609            .context("Failed to request Kraken futures account state")?;
610
611        if !account_state.balances.is_empty() {
612            log::info!(
613                "Received account state with {} balance(s)",
614                account_state.balances.len()
615            );
616        }
617        self.emitter.send_account_state(account_state);
618        self.await_account_registered(30.0).await?;
619
620        self.spawn_message_handler()?;
621
622        self.ws
623            .subscribe_executions()
624            .await
625            .context("Failed to subscribe to executions")?;
626
627        log::info!("Futures WebSocket authenticated and subscribed to executions");
628
629        self.core.set_connected();
630        log::info!("Connected: client_id={}", self.core.client_id);
631        Ok(())
632    }
633
634    async fn disconnect(&mut self) -> anyhow::Result<()> {
635        if self.core.is_disconnected() {
636            return Ok(());
637        }
638
639        self.cancellation_token.cancel();
640
641        if let Some(handle) = self.ws_stream_handle.take() {
642            handle.abort();
643        }
644
645        let _ = self.ws.close().await;
646
647        self.cancellation_token = CancellationToken::new();
648        self.core.set_disconnected();
649        log::info!("Disconnected: client_id={}", self.core.client_id);
650        Ok(())
651    }
652
653    async fn generate_order_status_report(
654        &self,
655        cmd: &GenerateOrderStatusReport,
656    ) -> anyhow::Result<Option<OrderStatusReport>> {
657        log::debug!(
658            "Generating order status report: venue_order_id={:?}, client_order_id={:?}",
659            cmd.venue_order_id,
660            cmd.client_order_id
661        );
662
663        let account_id = self.core.account_id;
664        let reports = self
665            .http
666            .request_order_status_reports(account_id, None, None, None, false)
667            .await?;
668
669        // Match by venue_order_id or client_order_id (comparing truncated form
670        // since Kraken stores the truncated cl_ord_id for long IDs)
671        let matched = reports.into_iter().find(|r| {
672            cmd.venue_order_id
673                .is_some_and(|id| r.venue_order_id.as_str() == id.as_str())
674                || cmd.client_order_id.is_some_and(|id| {
675                    r.client_order_id
676                        .as_ref()
677                        .is_some_and(|r_id| r_id.as_str() == truncate_cl_ord_id(&id))
678                })
679        });
680
681        if matched.is_some() {
682            return Ok(matched);
683        }
684
685        let Some(order) = self.get_cached_order_for_status_command(cmd) else {
686            return Ok(None);
687        };
688
689        let now = Utc::now();
690        let start = now - Duration::from_secs(5 * 60);
691        let fills = self
692            .http
693            .request_fill_reports(
694                account_id,
695                Some(order.instrument_id()),
696                Some(start),
697                Some(now),
698            )
699            .await?;
700
701        Ok(synthesize_filled_order_status_report(cmd, &order, &fills))
702    }
703
704    async fn generate_order_status_reports(
705        &self,
706        cmd: &GenerateOrderStatusReports,
707    ) -> anyhow::Result<Vec<OrderStatusReport>> {
708        log::debug!(
709            "Generating order status reports: instrument_id={:?}, open_only={}",
710            cmd.instrument_id,
711            cmd.open_only
712        );
713
714        let account_id = self.core.account_id;
715        let start = cmd.start.map(DateTime::<Utc>::from);
716        let end = cmd.end.map(DateTime::<Utc>::from);
717        self.http
718            .request_order_status_reports(account_id, cmd.instrument_id, start, end, cmd.open_only)
719            .await
720    }
721
722    async fn generate_fill_reports(
723        &self,
724        cmd: GenerateFillReports,
725    ) -> anyhow::Result<Vec<FillReport>> {
726        log::debug!(
727            "Generating fill reports: instrument_id={:?}",
728            cmd.instrument_id
729        );
730
731        let account_id = self.core.account_id;
732        let start = cmd.start.map(DateTime::<Utc>::from);
733        let end = cmd.end.map(DateTime::<Utc>::from);
734        let mut reports = self
735            .http
736            .request_fill_reports(account_id, cmd.instrument_id, start, end)
737            .await?;
738
739        if let Some(venue_order_id) = cmd.venue_order_id {
740            reports.retain(|report| report.venue_order_id == venue_order_id);
741        }
742
743        Ok(reports)
744    }
745
746    async fn generate_position_status_reports(
747        &self,
748        cmd: &GeneratePositionStatusReports,
749    ) -> anyhow::Result<Vec<PositionStatusReport>> {
750        log::debug!(
751            "Generating position status reports: instrument_id={:?}",
752            cmd.instrument_id
753        );
754
755        let account_id = self.core.account_id;
756        self.http
757            .request_position_status_reports(account_id, cmd.instrument_id)
758            .await
759    }
760
761    async fn generate_mass_status(
762        &self,
763        lookback_mins: Option<u64>,
764    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
765        log::debug!("Generating mass status: lookback_mins={lookback_mins:?}");
766
767        let start = lookback_mins.map(|mins| Utc::now() - Duration::from_secs(mins * 60));
768
769        let account_id = self.core.account_id;
770        let order_reports = self
771            .http
772            .request_order_status_reports(account_id, None, start, None, true)
773            .await?;
774        let fill_reports = self
775            .http
776            .request_fill_reports(account_id, None, start, None)
777            .await?;
778        let position_reports = self
779            .http
780            .request_position_status_reports(account_id, None)
781            .await?;
782
783        let mut mass_status = ExecutionMassStatus::new(
784            self.core.client_id,
785            self.core.account_id,
786            *KRAKEN_VENUE,
787            self.clock.get_time_ns(),
788            None,
789        );
790        mass_status.add_order_reports(order_reports);
791        mass_status.add_fill_reports(fill_reports);
792        mass_status.add_position_reports(position_reports);
793
794        Ok(Some(mass_status))
795    }
796
797    fn query_account(&self, cmd: QueryAccount) -> anyhow::Result<()> {
798        log::debug!("Querying account: {cmd:?}");
799
800        let account_id = self.core.account_id;
801        let http = self.http.clone();
802        let emitter = self.emitter.clone();
803
804        self.spawn_task("query_account", async move {
805            let account_state = http.request_account_state(account_id).await?;
806            emitter.emit_account_state(
807                account_state.balances.clone(),
808                account_state.margins.clone(),
809                account_state.is_reported,
810                account_state.ts_event,
811            );
812            Ok(())
813        });
814
815        Ok(())
816    }
817
818    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
819        log::debug!("Querying order: {cmd:?}");
820
821        let venue_order_id = cmd
822            .venue_order_id
823            .context("venue_order_id required for query_order")?;
824        let account_id = self.core.account_id;
825        let http = self.http.clone();
826        let emitter = self.emitter.clone();
827
828        self.spawn_task("query_order", async move {
829            let reports = http
830                .request_order_status_reports(account_id, None, None, None, true)
831                .await
832                .context("Failed to query order")?;
833
834            if let Some(report) = reports
835                .into_iter()
836                .find(|r| r.venue_order_id == venue_order_id)
837            {
838                emitter.send_order_status_report(report);
839            }
840            Ok(())
841        });
842
843        Ok(())
844    }
845
846    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
847        let order = self
848            .core
849            .cache()
850            .order(&cmd.client_order_id)
851            .cloned()
852            .ok_or_else(|| anyhow::anyhow!("Order not found in cache: {}", cmd.client_order_id))?;
853        self.submit_single_order(&order, "submit_order");
854        Ok(())
855    }
856
857    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
858        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
859
860        log::info!(
861            "Submitting order list: order_list_id={}, count={}",
862            cmd.order_list.id,
863            orders.len()
864        );
865
866        let mut order_tuples = Vec::with_capacity(orders.len());
867        let mut order_meta = Vec::with_capacity(orders.len());
868
869        for order in &orders {
870            if order.is_closed() {
871                log::warn!(
872                    "Cannot submit closed order: client_order_id={}",
873                    order.client_order_id()
874                );
875                continue;
876            }
877
878            // Kraken batch endpoint only supports limit and stop orders,
879            // submit market orders individually
880            if order.order_type() == OrderType::Market {
881                self.submit_single_order(order, "submit_order_list");
882                continue;
883            }
884
885            let client_order_id = order.client_order_id();
886            let kraken_cl_ord_id = truncate_cl_ord_id(&client_order_id);
887
888            if kraken_cl_ord_id != client_order_id.as_str() {
889                self.truncated_id_map
890                    .insert(kraken_cl_ord_id, client_order_id);
891            }
892
893            self.register_order_identity(order);
894            self.emitter.emit_order_submitted(order);
895
896            order_tuples.push((
897                order.instrument_id(),
898                client_order_id,
899                order.order_side(),
900                order.order_type(),
901                order.quantity(),
902                order.time_in_force(),
903                order.price(),
904                order.trigger_price(),
905                order.trigger_type(),
906                order.is_reduce_only(),
907                order.is_post_only(),
908            ));
909
910            order_meta.push((order.strategy_id(), order.instrument_id(), client_order_id));
911        }
912
913        if order_tuples.is_empty() {
914            return Ok(());
915        }
916
917        let http = self.http.clone();
918        let emitter = self.emitter.clone();
919        let clock = self.clock;
920        let dispatch_state = self.ws_dispatch_state.clone();
921
922        self.spawn_task("submit_order_list", async move {
923            match http.submit_orders_batch(order_tuples).await {
924                Ok(statuses) => {
925                    for (i, status) in statuses.iter().enumerate() {
926                        if status.status != "placed"
927                            && status.status != "filled"
928                            && let Some((strategy_id, instrument_id, client_order_id)) =
929                                order_meta.get(i)
930                        {
931                            let ts_event = clock.get_time_ns();
932                            let error_msg = format!(
933                                "submit_order_list batch item rejected: {}",
934                                status.status,
935                            );
936                            dispatch_state.cleanup_terminal(client_order_id);
937                            emitter.emit_order_rejected_event(
938                                *strategy_id,
939                                *instrument_id,
940                                *client_order_id,
941                                &error_msg,
942                                ts_event,
943                                status.status == "postWouldExecute",
944                            );
945                        }
946                    }
947                    Ok(())
948                }
949                Err(e) => {
950                    let ts_event = clock.get_time_ns();
951
952                    for (strategy_id, instrument_id, client_order_id) in &order_meta {
953                        let error_msg = format!("submit_order_list batch error: {e}");
954                        dispatch_state.cleanup_terminal(client_order_id);
955                        emitter.emit_order_rejected_event(
956                            *strategy_id,
957                            *instrument_id,
958                            *client_order_id,
959                            &error_msg,
960                            ts_event,
961                            false,
962                        );
963                    }
964                    Ok(())
965                }
966            }
967        });
968
969        Ok(())
970    }
971
972    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
973        self.modify_single_order(&cmd);
974        Ok(())
975    }
976
977    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
978        self.cancel_single_order(&cmd);
979        Ok(())
980    }
981
982    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
983        let instrument_id = cmd.instrument_id;
984
985        if cmd.order_side == OrderSide::NoOrderSide {
986            log::info!("Canceling all orders: instrument_id={instrument_id} (bulk)");
987
988            let http = self.http.clone();
989            let symbol = instrument_id.symbol.to_string();
990
991            self.spawn_task("cancel_all_orders", async move {
992                if let Err(e) = http.inner.cancel_all_orders(Some(symbol)).await {
993                    anyhow::bail!("Cancel all orders failed: {e}");
994                }
995                Ok(())
996            });
997
998            return Ok(());
999        }
1000
1001        log::info!(
1002            "Canceling all orders: instrument_id={instrument_id}, side={:?}",
1003            cmd.order_side
1004        );
1005
1006        let orders_to_cancel: Vec<_> = {
1007            let cache = self.core.cache();
1008            let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
1009
1010            open_orders
1011                .into_iter()
1012                .filter(|order| order.order_side() == cmd.order_side)
1013                .filter_map(|order| {
1014                    Some((
1015                        order.venue_order_id()?,
1016                        order.client_order_id(),
1017                        order.instrument_id(),
1018                        order.strategy_id(),
1019                    ))
1020                })
1021                .collect()
1022        };
1023
1024        let account_id = self.core.account_id;
1025
1026        for (venue_order_id, client_order_id, order_instrument_id, strategy_id) in orders_to_cancel
1027        {
1028            let http = self.http.clone();
1029            let emitter = self.emitter.clone();
1030            let clock = self.clock;
1031
1032            self.spawn_task("cancel_order_by_side", async move {
1033                if let Err(e) = http
1034                    .cancel_order(
1035                        account_id,
1036                        order_instrument_id,
1037                        Some(client_order_id),
1038                        Some(venue_order_id),
1039                    )
1040                    .await
1041                {
1042                    log::error!("Cancel order failed: {e}");
1043                    let ts_event = clock.get_time_ns();
1044                    emitter.emit_order_cancel_rejected_event(
1045                        strategy_id,
1046                        order_instrument_id,
1047                        client_order_id,
1048                        Some(venue_order_id),
1049                        &format!("cancel-order error: {e}"),
1050                        ts_event,
1051                    );
1052                }
1053                Ok(())
1054            });
1055        }
1056
1057        Ok(())
1058    }
1059
1060    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1061        log::info!(
1062            "Batch canceling orders: instrument_id={}, count={}",
1063            cmd.instrument_id,
1064            cmd.cancels.len()
1065        );
1066
1067        for cancel in &cmd.cancels {
1068            self.cancel_single_order(cancel);
1069        }
1070
1071        Ok(())
1072    }
1073}
1074
1075impl KrakenFuturesExecutionClient {
1076    fn get_cached_order_for_status_command(
1077        &self,
1078        cmd: &GenerateOrderStatusReport,
1079    ) -> Option<OrderAny> {
1080        let cache = self.core.cache();
1081
1082        if let Some(client_order_id) = cmd.client_order_id {
1083            return cache.order(&client_order_id).cloned();
1084        }
1085
1086        let venue_order_id = cmd.venue_order_id?;
1087        let client_order_id = *cache.client_order_id(&venue_order_id)?;
1088        cache.order(&client_order_id).cloned()
1089    }
1090}
1091
1092fn synthesize_filled_order_status_report(
1093    cmd: &GenerateOrderStatusReport,
1094    order: &OrderAny,
1095    fills: &[FillReport],
1096) -> Option<OrderStatusReport> {
1097    let venue_order_id = cmd.venue_order_id.or(order.venue_order_id());
1098    let truncated_client_order_id = truncate_cl_ord_id(&order.client_order_id());
1099
1100    let mut matched: Vec<&FillReport> = if let Some(venue_order_id) = venue_order_id {
1101        fills
1102            .iter()
1103            .filter(|fill| fill.venue_order_id == venue_order_id)
1104            .collect()
1105    } else {
1106        Vec::new()
1107    };
1108
1109    if matched.is_empty() {
1110        matched = fills
1111            .iter()
1112            .filter(|fill| {
1113                fill.client_order_id == Some(order.client_order_id())
1114                    || fill
1115                        .client_order_id
1116                        .as_ref()
1117                        .is_some_and(|fill_client_order_id| {
1118                            fill_client_order_id.as_str() == truncated_client_order_id
1119                        })
1120            })
1121            .collect();
1122    }
1123
1124    if matched.is_empty() {
1125        return None;
1126    }
1127
1128    matched.sort_by_key(|fill| fill.ts_event);
1129    let first_fill = *matched.first()?;
1130    let last_fill = *matched.last()?;
1131
1132    let total_filled = matched
1133        .iter()
1134        .fold(Decimal::ZERO, |acc, fill| acc + fill.last_qty.as_decimal());
1135    if total_filled < order.quantity().as_decimal() {
1136        return None;
1137    }
1138
1139    let total_notional = matched.iter().fold(Decimal::ZERO, |acc, fill| {
1140        acc + fill.last_qty.as_decimal() * fill.last_px.as_decimal()
1141    });
1142    let avg_px = if total_filled.is_zero() {
1143        None
1144    } else {
1145        Some(total_notional / total_filled)
1146    };
1147    let venue_order_id = venue_order_id.unwrap_or(first_fill.venue_order_id);
1148
1149    let mut report = OrderStatusReport::new(
1150        first_fill.account_id,
1151        order.instrument_id(),
1152        Some(order.client_order_id()),
1153        venue_order_id,
1154        order.order_side(),
1155        order.order_type(),
1156        order.time_in_force(),
1157        OrderStatus::Filled,
1158        order.quantity(),
1159        order.quantity(),
1160        first_fill.ts_event,
1161        last_fill.ts_event,
1162        last_fill.ts_init,
1163        None,
1164    );
1165    report.order_list_id = order.order_list_id();
1166    report.venue_position_id = matched.iter().rev().find_map(|fill| fill.venue_position_id);
1167    report.linked_order_ids = order
1168        .linked_order_ids()
1169        .map(|linked_order_ids| linked_order_ids.to_vec());
1170    report.parent_order_id = order.parent_order_id();
1171    report.expire_time = order.expire_time();
1172    report.price = order.price();
1173    report.trigger_price = order.trigger_price();
1174    report.trigger_type = order.trigger_type();
1175    report.avg_px = avg_px;
1176    report.display_qty = order.display_qty();
1177    report.post_only = order.is_post_only();
1178    report.reduce_only = order.is_reduce_only();
1179    Some(report)
1180}
1181
1182#[cfg(test)]
1183mod tests {
1184    use nautilus_core::{UUID4, UnixNanos};
1185    use nautilus_model::{
1186        enums::{LiquiditySide, OrderSide, OrderType, TimeInForce},
1187        identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
1188        orders::OrderTestBuilder,
1189        reports::FillReport,
1190        types::{Currency, Money, Price, Quantity},
1191    };
1192    use rstest::rstest;
1193
1194    use super::*;
1195
1196    const TEST_INSTRUMENT_ID: &str = "PF_XBTUSD.KRAKEN";
1197
1198    fn make_fill(
1199        venue_order_id: &str,
1200        client_order_id: Option<&str>,
1201        quantity: &str,
1202        price: &str,
1203        ts_event: u64,
1204    ) -> FillReport {
1205        FillReport::new(
1206            AccountId::from("KRAKEN-001"),
1207            InstrumentId::from(TEST_INSTRUMENT_ID),
1208            VenueOrderId::from(venue_order_id),
1209            TradeId::from(format!("T-{ts_event}").as_str()),
1210            OrderSide::Buy,
1211            Quantity::from(quantity),
1212            Price::from(price),
1213            Money::new(0.0, Currency::USD()),
1214            LiquiditySide::Taker,
1215            client_order_id.map(ClientOrderId::from),
1216            None,
1217            UnixNanos::from(ts_event),
1218            UnixNanos::from(ts_event),
1219            None,
1220        )
1221    }
1222
1223    fn make_cmd(
1224        client_order_id: Option<&str>,
1225        venue_order_id: Option<&str>,
1226    ) -> GenerateOrderStatusReport {
1227        GenerateOrderStatusReport::new(
1228            UUID4::new(),
1229            UnixNanos::default(),
1230            Some(InstrumentId::from(TEST_INSTRUMENT_ID)),
1231            client_order_id.map(ClientOrderId::from),
1232            venue_order_id.map(VenueOrderId::from),
1233            None,
1234            None,
1235        )
1236    }
1237
1238    fn make_order(client_order_id: &str) -> OrderAny {
1239        OrderTestBuilder::new(OrderType::Market)
1240            .instrument_id(InstrumentId::from(TEST_INSTRUMENT_ID))
1241            .client_order_id(ClientOrderId::from(client_order_id))
1242            .side(OrderSide::Buy)
1243            .quantity(Quantity::from("100"))
1244            .time_in_force(TimeInForce::Ioc)
1245            .build()
1246    }
1247
1248    #[rstest]
1249    fn test_synthesize_filled_order_status_report_matches_full_fill_by_venue_order_id() {
1250        let order = make_order("O-123456");
1251        let cmd = make_cmd(Some("O-123456"), Some("KRAKEN-789"));
1252        let fills = vec![
1253            make_fill("KRAKEN-789", Some("O-123456"), "40", "50000.0", 1),
1254            make_fill("KRAKEN-789", Some("O-123456"), "60", "50010.0", 2),
1255            make_fill("KRAKEN-OTHER", Some("O-123456"), "999", "1.0", 3),
1256        ];
1257
1258        let report = synthesize_filled_order_status_report(&cmd, &order, &fills)
1259            .expect("expected a filled report");
1260
1261        assert_eq!(report.venue_order_id, VenueOrderId::from("KRAKEN-789"));
1262        assert_eq!(
1263            report.client_order_id,
1264            Some(ClientOrderId::from("O-123456"))
1265        );
1266        assert_eq!(report.order_status, OrderStatus::Filled);
1267        assert_eq!(report.order_type, OrderType::Market);
1268        assert_eq!(report.time_in_force, TimeInForce::Ioc);
1269        assert_eq!(report.quantity, Quantity::from("100"));
1270        assert_eq!(report.filled_qty, Quantity::from("100"));
1271        assert_eq!(
1272            report.avg_px,
1273            Some(Decimal::from_str_exact("50006.0").unwrap())
1274        );
1275    }
1276
1277    #[rstest]
1278    fn test_synthesize_filled_order_status_report_requires_full_fill_size() {
1279        let order = make_order("O-123457");
1280        let cmd = make_cmd(Some("O-123457"), Some("KRAKEN-790"));
1281        let fills = vec![make_fill(
1282            "KRAKEN-790",
1283            Some("O-123457"),
1284            "40",
1285            "50000.0",
1286            1,
1287        )];
1288
1289        assert!(synthesize_filled_order_status_report(&cmd, &order, &fills).is_none());
1290    }
1291
1292    #[rstest]
1293    fn test_synthesize_filled_order_status_report_matches_truncated_client_order_id() {
1294        let long_client_order_id = "O202602270023210040011";
1295        let order = make_order(long_client_order_id);
1296        let cmd = make_cmd(Some(long_client_order_id), None);
1297        let fills = vec![make_fill(
1298            "KRAKEN-791",
1299            Some(truncate_cl_ord_id(&ClientOrderId::from(long_client_order_id)).as_str()),
1300            "100",
1301            "50000.0",
1302            1,
1303        )];
1304
1305        let report = synthesize_filled_order_status_report(&cmd, &order, &fills)
1306            .expect("expected a filled report");
1307
1308        assert_eq!(
1309            report.client_order_id,
1310            Some(ClientOrderId::from(long_client_order_id))
1311        );
1312        assert_eq!(report.venue_order_id, VenueOrderId::from("KRAKEN-791"));
1313        assert_eq!(report.order_status, OrderStatus::Filled);
1314    }
1315}