Skip to main content

nautilus_deribit/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Deribit adapter.
17
18use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::{StreamExt, pin_mut};
23use nautilus_common::{
24    clients::ExecutionClient,
25    live::{get_runtime, runner::get_exec_event_sender},
26    messages::execution::{
27        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28        GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
29        GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
30        GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
31        SubmitOrderList,
32    },
33};
34use nautilus_core::{
35    MUTEX_POISONED, UnixNanos,
36    datetime::NANOSECONDS_IN_SECOND,
37    time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce},
43    events::OrderEventAny,
44    identifiers::{AccountId, ClientId, Venue},
45    orders::{Order, OrderAny},
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52    common::{
53        consts::{DERIBIT_VENUE, DERIBIT_WS_HEARTBEAT_SECS},
54        enums::resolve_trigger_type,
55    },
56    config::DeribitExecClientConfig,
57    http::{client::DeribitHttpClient, models::DeribitCurrency, query::GetOrderStateParams},
58    websocket::{
59        auth::DERIBIT_EXECUTION_SESSION_NAME,
60        client::DeribitWebSocketClient,
61        messages::{DeribitOrderParams, NautilusWsMessage},
62        parse::parse_user_order_msg,
63    },
64};
65
66/// Deribit live execution client.
67#[derive(Debug)]
68pub struct DeribitExecutionClient {
69    core: ExecutionClientCore,
70    clock: &'static AtomicTime,
71    config: DeribitExecClientConfig,
72    emitter: ExecutionEventEmitter,
73    http_client: DeribitHttpClient,
74    ws_client: DeribitWebSocketClient,
75    ws_stream_handle: Option<JoinHandle<()>>,
76    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl DeribitExecutionClient {
80    /// Creates a new [`DeribitExecutionClient`].
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the client fails to initialize.
85    pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
86        let http_client = if config.has_api_credentials() {
87            DeribitHttpClient::new_with_env(
88                config.api_key.clone(),
89                config.api_secret.clone(),
90                config.base_url_http.clone(),
91                config.environment,
92                config.http_timeout_secs,
93                config.max_retries,
94                config.retry_delay_initial_ms,
95                config.retry_delay_max_ms,
96                config.proxy_url.clone(),
97            )?
98        } else {
99            DeribitHttpClient::new(
100                config.base_url_http.clone(),
101                config.environment,
102                config.http_timeout_secs,
103                config.max_retries,
104                config.retry_delay_initial_ms,
105                config.retry_delay_max_ms,
106                config.proxy_url.clone(),
107            )?
108        };
109
110        let mut ws_client = DeribitWebSocketClient::new(
111            config.base_url_ws.clone(),
112            config.api_key.clone(),
113            config.api_secret.clone(),
114            DERIBIT_WS_HEARTBEAT_SECS,
115            config.environment,
116            config.transport_backend,
117            config.proxy_url.clone(),
118        )
119        .context("failed to create WebSocket client for execution")?;
120        // Set account ID for order/fill reports
121        ws_client.set_account_id(core.account_id);
122
123        let clock = get_atomic_clock_realtime();
124        let emitter = ExecutionEventEmitter::new(
125            clock,
126            core.trader_id,
127            core.account_id,
128            AccountType::Margin,
129            None,
130        );
131
132        Ok(Self {
133            core,
134            clock,
135            config,
136            emitter,
137            http_client,
138            ws_client,
139            ws_stream_handle: None,
140            pending_tasks: Mutex::new(Vec::new()),
141        })
142    }
143
144    /// Spawns an async task for execution operations.
145    fn spawn_task<F>(&self, description: &'static str, fut: F)
146    where
147        F: Future<Output = anyhow::Result<()>> + Send + 'static,
148    {
149        let runtime = get_runtime();
150        let handle = runtime.spawn(async move {
151            if let Err(e) = fut.await {
152                log::warn!("{description} failed: {e:?}");
153            }
154        });
155
156        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
157        tasks.retain(|handle| !handle.is_finished());
158        tasks.push(handle);
159    }
160
161    /// Aborts all pending async tasks.
162    fn abort_pending_tasks(&self) {
163        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
164        for handle in tasks.drain(..) {
165            handle.abort();
166        }
167    }
168
169    // Rejects unsupported order types and time-in-force values
170    fn build_order_params(order: &dyn Order) -> anyhow::Result<DeribitOrderParams> {
171        let order_type = match order.order_type() {
172            OrderType::Limit => "limit",
173            OrderType::Market => "market",
174            OrderType::StopLimit => "stop_limit",
175            OrderType::StopMarket => "stop_market",
176            OrderType::LimitIfTouched => "take_limit",
177            OrderType::MarketIfTouched => "take_market",
178            other => {
179                anyhow::bail!("Unsupported order type {other:?} for Deribit");
180            }
181        }
182        .to_string();
183
184        let time_in_force = Some(
185            match order.time_in_force() {
186                TimeInForce::Gtc => "good_til_cancelled",
187                TimeInForce::Ioc => "immediate_or_cancel",
188                TimeInForce::Fok => "fill_or_kill",
189                TimeInForce::Gtd => {
190                    if order.expire_time().is_some() {
191                        log::warn!(
192                            "Deribit GTD orders expire at 8:00 UTC only - custom expire_time is ignored. \
193                            For custom expiry times, use managed GTD with emulation_trigger"
194                        );
195                    }
196                    "good_til_day"
197                }
198                other => {
199                    anyhow::bail!("Unsupported time_in_force {other:?} for Deribit");
200                }
201            }
202            .to_string(),
203        );
204
205        // Deribit's `valid_until` is a REQUEST timeout, not order expiry.
206        // Deribit's `good_til_day` expires at end of trading session (8 UTC).
207        let valid_until = None;
208
209        let trigger = resolve_trigger_type(order.trigger_type());
210
211        Ok(DeribitOrderParams {
212            instrument_name: order.instrument_id().symbol.to_string(),
213            amount: order.quantity().as_decimal(),
214            order_type,
215            label: Some(order.client_order_id().to_string()),
216            price: order.price().map(|p| p.as_decimal()),
217            time_in_force,
218            post_only: if order.is_post_only() {
219                Some(true)
220            } else {
221                None
222            },
223            reject_post_only: if order.is_post_only() {
224                Some(true)
225            } else {
226                None
227            },
228            reduce_only: if order.is_reduce_only() {
229                Some(true)
230            } else {
231                None
232            },
233            trigger_price: order.trigger_price().map(|p| p.as_decimal()),
234            trigger,
235            max_show: None,
236            valid_until,
237        })
238    }
239
240    /// Submits a single order to Deribit.
241    ///
242    /// This is the core submission logic shared by `submit_order` and `submit_order_list`.
243    fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) {
244        if order.is_closed() {
245            log::warn!("Cannot submit closed order {}", order.client_order_id());
246            return;
247        }
248
249        let params = match Self::build_order_params(order) {
250            Ok(params) => params,
251            Err(e) => {
252                let ts_event = self.clock.get_time_ns();
253                self.emitter.emit_order_rejected_event(
254                    order.strategy_id(),
255                    order.instrument_id(),
256                    order.client_order_id(),
257                    &format!("{e}"),
258                    ts_event,
259                    false,
260                );
261                return;
262            }
263        };
264        let client_order_id = order.client_order_id();
265        let trader_id = order.trader_id();
266        let strategy_id = order.strategy_id();
267        let instrument_id = order.instrument_id();
268        let order_side = order.order_side();
269
270        log::debug!("OrderSubmitted client_order_id={client_order_id}");
271        self.emitter.emit_order_submitted(order);
272
273        let ws_client = self.ws_client.clone();
274        let emitter = self.emitter.clone();
275        let clock = self.clock;
276
277        self.spawn_task(task_name, async move {
278            let result = ws_client
279                .submit_order(
280                    order_side,
281                    params,
282                    client_order_id,
283                    trader_id,
284                    strategy_id,
285                    instrument_id,
286                )
287                .await;
288
289            if let Err(e) = result {
290                let ts_event = clock.get_time_ns();
291                emitter.emit_order_rejected_event(
292                    strategy_id,
293                    instrument_id,
294                    client_order_id,
295                    &format!("{task_name}-error: {e}"),
296                    ts_event,
297                    false,
298                );
299                return Err(e.into());
300            }
301
302            Ok(())
303        });
304    }
305
306    /// Spawns a stream handler to dispatch WebSocket messages to the execution engine.
307    fn spawn_stream_handler(
308        &mut self,
309        stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
310    ) {
311        if self.ws_stream_handle.is_some() {
312            return;
313        }
314
315        let emitter = self.emitter.clone();
316
317        let handle = get_runtime().spawn(async move {
318            pin_mut!(stream);
319            while let Some(message) = stream.next().await {
320                dispatch_ws_message(message, &emitter);
321            }
322        });
323
324        self.ws_stream_handle = Some(handle);
325        log::info!("WebSocket stream handler started");
326    }
327}
328
329#[async_trait(?Send)]
330impl ExecutionClient for DeribitExecutionClient {
331    fn is_connected(&self) -> bool {
332        self.core.is_connected()
333    }
334
335    fn client_id(&self) -> ClientId {
336        self.core.client_id
337    }
338
339    fn account_id(&self) -> AccountId {
340        self.core.account_id
341    }
342
343    fn venue(&self) -> Venue {
344        *DERIBIT_VENUE
345    }
346
347    fn oms_type(&self) -> OmsType {
348        self.core.oms_type
349    }
350
351    fn get_account(&self) -> Option<AccountAny> {
352        self.core.cache().account(&self.core.account_id).cloned()
353    }
354
355    fn generate_account_state(
356        &self,
357        balances: Vec<AccountBalance>,
358        margins: Vec<MarginBalance>,
359        reported: bool,
360        ts_event: UnixNanos,
361    ) -> anyhow::Result<()> {
362        self.emitter
363            .emit_account_state(balances, margins, reported, ts_event);
364        Ok(())
365    }
366
367    fn start(&mut self) -> anyhow::Result<()> {
368        if self.core.is_started() {
369            return Ok(());
370        }
371
372        let sender = get_exec_event_sender();
373        self.emitter.set_sender(sender);
374        self.core.set_started();
375
376        log::info!(
377            "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, environment={}",
378            self.core.client_id,
379            self.core.account_id,
380            self.core.account_type,
381            self.config.product_types,
382            self.config.environment
383        );
384        Ok(())
385    }
386
387    fn stop(&mut self) -> anyhow::Result<()> {
388        if self.core.is_stopped() {
389            return Ok(());
390        }
391
392        self.core.set_stopped();
393        self.core.set_disconnected();
394        self.abort_pending_tasks();
395        log::info!("Stopped: client_id={}", self.core.client_id);
396        Ok(())
397    }
398
399    async fn connect(&mut self) -> anyhow::Result<()> {
400        if self.core.is_connected() {
401            return Ok(());
402        }
403
404        // Check if credentials are available before requesting account state
405        if !self.config.has_api_credentials() {
406            anyhow::bail!("Missing API credentials; set Deribit environment variables");
407        }
408
409        // Set account ID for order/fill reports
410        self.ws_client.set_account_id(self.core.account_id);
411
412        // Fetch and cache instruments in both HTTP client and WebSocket client
413        if !self.core.instruments_initialized() {
414            for product_type in &self.config.product_types {
415                let instruments = self
416                    .http_client
417                    .request_instruments(DeribitCurrency::ANY, Some(*product_type))
418                    .await
419                    .with_context(|| {
420                        format!("failed to request instruments for {product_type:?}")
421                    })?;
422
423                if instruments.is_empty() {
424                    log::warn!("No instruments returned for {product_type:?}");
425                    continue;
426                }
427
428                log::info!("Fetched {} {product_type:?} instruments", instruments.len());
429                self.ws_client.cache_instruments(&instruments);
430                self.http_client.cache_instruments(&instruments);
431            }
432            self.core.set_instruments_initialized();
433        }
434
435        // Fetch initial account state
436        let account_state = self
437            .http_client
438            .request_account_state(self.core.account_id)
439            .await
440            .context("failed to request account state")?;
441
442        self.emitter.send_account_state(account_state);
443
444        self.ws_client
445            .connect()
446            .await
447            .context("failed to connect WebSocket client for execution")?;
448
449        self.ws_client
450            .authenticate_session(DERIBIT_EXECUTION_SESSION_NAME)
451            .await
452            .map_err(|e| anyhow::anyhow!("failed to authenticate WebSocket session: {e}"))?;
453
454        log::info!("WebSocket client authenticated for execution");
455
456        // Subscribe to user order and trade updates for all instruments
457        self.ws_client
458            .subscribe_user_orders()
459            .await
460            .map_err(|e| anyhow::anyhow!("failed to subscribe to user orders: {e}"))?;
461        self.ws_client
462            .subscribe_user_trades()
463            .await
464            .map_err(|e| anyhow::anyhow!("failed to subscribe to user trades: {e}"))?;
465        self.ws_client
466            .subscribe_user_portfolio()
467            .await
468            .map_err(|e| anyhow::anyhow!("failed to subscribe to user portfolio: {e}"))?;
469
470        if let Err(e) = self.ws_client.wait_for_subscriptions_confirmed(30.0).await {
471            // Roll back subscription state so a retry re-sends subscribe requests
472            let _ = self.ws_client.unsubscribe_user_orders().await;
473            let _ = self.ws_client.unsubscribe_user_trades().await;
474            let _ = self.ws_client.unsubscribe_user_portfolio().await;
475            anyhow::bail!("subscription confirmation failed: {e}");
476        }
477
478        log::info!("Subscribed to user order, trade, and portfolio updates");
479
480        // Spawn stream handler to dispatch WebSocket messages to the execution engine
481        let stream = self.ws_client.stream()?;
482        self.spawn_stream_handler(stream);
483
484        self.core.set_connected();
485        log::info!("Connected: client_id={}", self.core.client_id);
486        Ok(())
487    }
488
489    async fn disconnect(&mut self) -> anyhow::Result<()> {
490        if self.core.is_disconnected() {
491            return Ok(());
492        }
493
494        self.abort_pending_tasks();
495
496        // Abort stream handler
497        if let Some(handle) = self.ws_stream_handle.take() {
498            handle.abort();
499        }
500
501        // Close WebSocket client
502        if let Err(e) = self.ws_client.close().await {
503            log::warn!("Error closing WebSocket client: {e}");
504        }
505
506        self.core.set_disconnected();
507        log::info!("Disconnected: client_id={}", self.core.client_id);
508        Ok(())
509    }
510
511    async fn generate_order_status_report(
512        &self,
513        cmd: &GenerateOrderStatusReport,
514    ) -> anyhow::Result<Option<OrderStatusReport>> {
515        // If venue_order_id is provided, fetch the specific order by ID
516        if let Some(venue_order_id) = &cmd.venue_order_id {
517            let params = GetOrderStateParams {
518                order_id: venue_order_id.to_string(),
519            };
520            let ts_init = self.clock.get_time_ns();
521
522            match self.http_client.inner.get_order_state(params).await {
523                Ok(response) => {
524                    if let Some(order) = response.result {
525                        let symbol = ustr::Ustr::from(&order.instrument_name);
526                        if let Some(instrument) = self.http_client.get_instrument(&symbol) {
527                            let report = parse_user_order_msg(
528                                &order,
529                                &instrument,
530                                self.core.account_id,
531                                ts_init,
532                            )?;
533                            return Ok(Some(report));
534                        } else {
535                            log::warn!(
536                                "Instrument {} not in cache for order {}",
537                                order.instrument_name,
538                                order.order_id
539                            );
540                        }
541                    }
542                }
543                Err(e) => {
544                    log::warn!("Failed to get order state: {e}");
545                }
546            }
547            return Ok(None);
548        }
549
550        // If client_order_id is provided, search open then closed orders
551        if let Some(client_order_id) = &cmd.client_order_id {
552            let reports = self
553                .http_client
554                .request_order_status_reports(
555                    self.core.account_id,
556                    cmd.instrument_id,
557                    None,
558                    None,
559                    false, // search all orders, not just open
560                )
561                .await?;
562
563            // Filter by client_order_id
564            for report in reports {
565                if report.client_order_id == Some(*client_order_id) {
566                    return Ok(Some(report));
567                }
568            }
569        }
570
571        Ok(None)
572    }
573
574    async fn generate_order_status_reports(
575        &self,
576        cmd: &GenerateOrderStatusReports,
577    ) -> anyhow::Result<Vec<OrderStatusReport>> {
578        self.http_client
579            .request_order_status_reports(
580                self.core.account_id,
581                cmd.instrument_id,
582                cmd.start,
583                cmd.end,
584                cmd.open_only,
585            )
586            .await
587    }
588
589    async fn generate_fill_reports(
590        &self,
591        cmd: GenerateFillReports,
592    ) -> anyhow::Result<Vec<FillReport>> {
593        let mut reports = self
594            .http_client
595            .request_fill_reports(self.core.account_id, cmd.instrument_id, cmd.start, cmd.end)
596            .await?;
597
598        // Filter by venue_order_id if provided
599        if let Some(venue_order_id) = &cmd.venue_order_id {
600            reports.retain(|r| r.venue_order_id == *venue_order_id);
601        }
602
603        Ok(reports)
604    }
605
606    async fn generate_position_status_reports(
607        &self,
608        cmd: &GeneratePositionStatusReports,
609    ) -> anyhow::Result<Vec<PositionStatusReport>> {
610        self.http_client
611            .request_position_status_reports(self.core.account_id, cmd.instrument_id)
612            .await
613    }
614
615    async fn generate_mass_status(
616        &self,
617        lookback_mins: Option<u64>,
618    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
619        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
620        let ts_now = self.clock.get_time_ns();
621        let start = lookback_mins.map(|mins| {
622            let lookback_ns = mins
623                .saturating_mul(60)
624                .saturating_mul(NANOSECONDS_IN_SECOND);
625            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
626        });
627
628        let order_cmd = GenerateOrderStatusReportsBuilder::default()
629            .ts_init(ts_now)
630            .open_only(false) // get all orders for mass status
631            .start(start)
632            .build()
633            .context("Failed to build GenerateOrderStatusReports")?;
634
635        let fill_cmd = GenerateFillReportsBuilder::default()
636            .ts_init(ts_now)
637            .start(start)
638            .build()
639            .context("Failed to build GenerateFillReports")?;
640
641        let position_cmd = GeneratePositionStatusReportsBuilder::default()
642            .ts_init(ts_now)
643            .start(start)
644            .build()
645            .context("Failed to build GeneratePositionStatusReports")?;
646
647        let (order_reports, fill_reports, position_reports) = tokio::try_join!(
648            self.generate_order_status_reports(&order_cmd),
649            self.generate_fill_reports(fill_cmd),
650            self.generate_position_status_reports(&position_cmd),
651        )?;
652
653        log::info!("Received {} OrderStatusReports", order_reports.len());
654        log::info!("Received {} FillReports", fill_reports.len());
655        log::info!("Received {} PositionReports", position_reports.len());
656
657        let mut mass_status = ExecutionMassStatus::new(
658            self.core.client_id,
659            self.core.account_id,
660            *DERIBIT_VENUE,
661            ts_now,
662            None,
663        );
664
665        mass_status.add_order_reports(order_reports);
666        mass_status.add_fill_reports(fill_reports);
667        mass_status.add_position_reports(position_reports);
668
669        Ok(Some(mass_status))
670    }
671
672    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
673        let http_client = self.http_client.clone();
674        let account_id = self.core.account_id;
675        let emitter = self.emitter.clone();
676
677        self.spawn_task("query_account", async move {
678            let account_state = http_client
679                .request_account_state(account_id)
680                .await
681                .context("failed to query account state (check API credentials are valid)")?;
682
683            emitter.send_account_state(account_state);
684            Ok(())
685        });
686
687        Ok(())
688    }
689
690    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
691        let ws_client = self.ws_client.clone();
692
693        // Extract venue order ID (Deribit's order_id)
694        let order_id = cmd
695            .venue_order_id
696            .as_ref()
697            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for query_order"))?
698            .to_string();
699
700        let client_order_id = cmd.client_order_id;
701        let trader_id = cmd.trader_id;
702        let strategy_id = cmd.strategy_id;
703        let instrument_id = cmd.instrument_id;
704
705        log::info!("Querying order state: order_id={order_id}, client_order_id={client_order_id}");
706
707        // Spawn async task to query order state via WebSocket
708        // Response will be dispatched through the WebSocket stream handler as OrderStatusReport
709        self.spawn_task("query_order", async move {
710            ws_client
711                .query_order(
712                    &order_id,
713                    client_order_id,
714                    trader_id,
715                    strategy_id,
716                    instrument_id,
717                )
718                .await
719                .map_err(|e| anyhow::anyhow!("Query order state failed: {e}"))?;
720            Ok(())
721        });
722
723        Ok(())
724    }
725
726    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
727        let order = self
728            .core
729            .cache()
730            .order(&cmd.client_order_id)
731            .cloned()
732            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
733        self.submit_single_order(&order, "submit_order");
734        Ok(())
735    }
736
737    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
738        if cmd.order_list.client_order_ids.is_empty() {
739            log::debug!("submit_order_list called with empty order list");
740            return Ok(());
741        }
742
743        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
744
745        log::info!(
746            "Submitting order list {} with {} orders for instrument={}",
747            cmd.order_list.id,
748            orders.len(),
749            cmd.instrument_id
750        );
751
752        // Deribit doesn't have native batch order submission
753        // Loop through and submit each order individually using shared helper
754        for order in &orders {
755            self.submit_single_order(order, "submit_order_list_item");
756        }
757
758        Ok(())
759    }
760
761    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
762        let ws_client = self.ws_client.clone();
763
764        // Extract venue order ID (Deribit's order_id)
765        let order_id = cmd
766            .venue_order_id
767            .as_ref()
768            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify_order"))?
769            .to_string();
770
771        // Extract quantity - if not provided, get from order in cache
772        let quantity = if let Some(qty) = cmd.quantity {
773            qty
774        } else {
775            // Get order from cache to use its current quantity
776            let cache = self.core.cache();
777            let order = cache
778                .order(&cmd.client_order_id)
779                .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
780            order.quantity()
781        };
782
783        let price = cmd
784            .price
785            .ok_or_else(|| anyhow::anyhow!("price required for modify_order"))?;
786
787        let client_order_id = cmd.client_order_id;
788        let trader_id = cmd.trader_id;
789        let strategy_id = cmd.strategy_id;
790        let instrument_id = cmd.instrument_id;
791        let venue_order_id = cmd.venue_order_id;
792        let emitter = self.emitter.clone();
793        let clock = self.clock;
794
795        log::info!(
796            "Modifying order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
797        );
798
799        // Spawn async task to send modify via WebSocket
800        self.spawn_task("modify_order", async move {
801            if let Err(e) = ws_client
802                .modify_order(
803                    &order_id,
804                    quantity,
805                    price,
806                    client_order_id,
807                    trader_id,
808                    strategy_id,
809                    instrument_id,
810                )
811                .await
812            {
813                log::error!(
814                    "Modify order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
815                );
816
817                let ts_event = clock.get_time_ns();
818                emitter.emit_order_modify_rejected_event(
819                    strategy_id,
820                    instrument_id,
821                    client_order_id,
822                    venue_order_id,
823                    &format!("modify-order-error: {e}"),
824                    ts_event,
825                );
826
827                anyhow::bail!("Modify order failed: {e}");
828            }
829            Ok(())
830        });
831
832        Ok(())
833    }
834
835    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
836        let ws_client = self.ws_client.clone();
837
838        // Extract venue order ID (Deribit's order_id)
839        let order_id = cmd
840            .venue_order_id
841            .as_ref()
842            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for cancel_order"))?
843            .to_string();
844
845        let client_order_id = cmd.client_order_id;
846        let trader_id = cmd.trader_id;
847        let strategy_id = cmd.strategy_id;
848        let instrument_id = cmd.instrument_id;
849        let venue_order_id = cmd.venue_order_id;
850        let emitter = self.emitter.clone();
851        let clock = self.clock;
852
853        log::info!("Canceling order: order_id={order_id}, client_order_id={client_order_id}");
854
855        // Spawn async task to send cancel via WebSocket
856        self.spawn_task("cancel_order", async move {
857            if let Err(e) = ws_client
858                .cancel_order(
859                    &order_id,
860                    client_order_id,
861                    trader_id,
862                    strategy_id,
863                    instrument_id,
864                )
865                .await
866            {
867                log::error!(
868                    "Cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
869                );
870
871                let ts_event = clock.get_time_ns();
872                emitter.emit_order_cancel_rejected_event(
873                    strategy_id,
874                    instrument_id,
875                    client_order_id,
876                    venue_order_id,
877                    &format!("cancel-order-error: {e}"),
878                    ts_event,
879                );
880
881                anyhow::bail!("Cancel order failed: {e}");
882            }
883            Ok(())
884        });
885
886        Ok(())
887    }
888
889    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
890        let instrument_id = cmd.instrument_id;
891
892        // If NoOrderSide, use efficient bulk cancel via Deribit API
893        if cmd.order_side == OrderSide::NoOrderSide {
894            log::info!(
895                "Cancelling all orders: instrument={instrument_id}, order_side=NoOrderSide (bulk)"
896            );
897
898            let ws_client = self.ws_client.clone();
899            self.spawn_task("cancel_all_orders", async move {
900                if let Err(e) = ws_client.cancel_all_orders(instrument_id, None).await {
901                    log::error!("Cancel all orders failed for instrument {instrument_id}: {e}");
902                    anyhow::bail!("Cancel all orders failed: {e}");
903                }
904                Ok(())
905            });
906
907            return Ok(());
908        }
909
910        // For specific side (Buy/Sell), filter from cache and cancel individually
911        // Deribit API doesn't support side filtering, so we implement it locally
912        log::info!(
913            "Cancelling orders by side: instrument={}, order_side={}",
914            instrument_id,
915            cmd.order_side
916        );
917
918        let orders_to_cancel: Vec<_> = {
919            let cache = self.core.cache();
920            let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
921
922            open_orders
923                .into_iter()
924                .filter(|order| order.order_side() == cmd.order_side)
925                .filter_map(|order| {
926                    let venue_order_id = order.venue_order_id()?;
927                    Some((
928                        venue_order_id.to_string(),
929                        order.client_order_id(),
930                        order.instrument_id(),
931                        Some(venue_order_id),
932                    ))
933                })
934                .collect()
935        };
936
937        if orders_to_cancel.is_empty() {
938            log::debug!(
939                "No open {} orders to cancel for {}",
940                cmd.order_side,
941                instrument_id
942            );
943            return Ok(());
944        }
945
946        log::info!(
947            "Cancelling {} {} orders for {}",
948            orders_to_cancel.len(),
949            cmd.order_side,
950            instrument_id
951        );
952
953        // Cancel each matching order individually
954        for (venue_order_id_str, client_order_id, order_instrument_id, venue_order_id) in
955            orders_to_cancel
956        {
957            let ws_client = self.ws_client.clone();
958            let trader_id = cmd.trader_id;
959            let strategy_id = cmd.strategy_id;
960            let emitter = self.emitter.clone();
961            let clock = self.clock;
962
963            self.spawn_task("cancel_order_by_side", async move {
964                if let Err(e) = ws_client
965                    .cancel_order(
966                        &venue_order_id_str,
967                        client_order_id,
968                        trader_id,
969                        strategy_id,
970                        order_instrument_id,
971                    )
972                    .await
973                {
974                    log::error!(
975                        "Cancel order failed: order_id={venue_order_id_str}, client_order_id={client_order_id}, error={e}"
976                    );
977
978                    let ts_event = clock.get_time_ns();
979                    emitter.emit_order_cancel_rejected_event(
980                        strategy_id,
981                        order_instrument_id,
982                        client_order_id,
983                        venue_order_id,
984                        &format!("cancel-order-error: {e}"),
985                        ts_event,
986                    );
987                }
988                Ok(())
989            });
990        }
991
992        Ok(())
993    }
994
995    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
996        if cmd.cancels.is_empty() {
997            log::debug!("batch_cancel_orders called with empty cancels list");
998            return Ok(());
999        }
1000
1001        log::info!(
1002            "Batch cancelling {} orders for instrument={}",
1003            cmd.cancels.len(),
1004            cmd.instrument_id
1005        );
1006
1007        // Deribit doesn't have native batch cancel by order ID
1008        // Loop through and cancel each order individually
1009        for cancel in &cmd.cancels {
1010            let order_id = match &cancel.venue_order_id {
1011                Some(id) => id.to_string(),
1012                None => {
1013                    log::warn!(
1014                        "Cannot cancel order {} - no venue_order_id",
1015                        cancel.client_order_id
1016                    );
1017
1018                    // Emit OrderCancelRejected event for missing venue_order_id
1019                    let ts_event = self.clock.get_time_ns();
1020                    self.emitter.emit_order_cancel_rejected_event(
1021                        cancel.strategy_id,
1022                        cancel.instrument_id,
1023                        cancel.client_order_id,
1024                        None,
1025                        "venue_order_id required for cancel",
1026                        ts_event,
1027                    );
1028                    continue;
1029                }
1030            };
1031
1032            let ws_client = self.ws_client.clone();
1033            let emitter = self.emitter.clone();
1034            let clock = self.clock;
1035            let client_order_id = cancel.client_order_id;
1036            let trader_id = cancel.trader_id;
1037            let strategy_id = cancel.strategy_id;
1038            let instrument_id = cancel.instrument_id;
1039
1040            self.spawn_task("batch_cancel_order", async move {
1041                if let Err(e) = ws_client
1042                    .cancel_order(
1043                        &order_id,
1044                        client_order_id,
1045                        trader_id,
1046                        strategy_id,
1047                        instrument_id,
1048                    )
1049                    .await
1050                {
1051                    log::error!(
1052                        "Batch cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
1053                    );
1054
1055                    let ts_event = clock.get_time_ns();
1056                    emitter.emit_order_cancel_rejected_event(
1057                        strategy_id,
1058                        instrument_id,
1059                        client_order_id,
1060                        None,
1061                        &format!("batch-cancel-error: {e}"),
1062                        ts_event,
1063                    );
1064
1065                    anyhow::bail!("Batch cancel order failed: {e}");
1066                }
1067                Ok(())
1068            });
1069        }
1070
1071        Ok(())
1072    }
1073}
1074
1075/// Dispatches a WebSocket message using the event emitter.
1076fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1077    match message {
1078        NautilusWsMessage::AccountState(state) => {
1079            emitter.send_account_state(state);
1080        }
1081        NautilusWsMessage::OrderStatusReports(reports) => {
1082            log::debug!("Processing {} order status report(s)", reports.len());
1083            for report in reports {
1084                emitter.send_order_status_report(report);
1085            }
1086        }
1087        NautilusWsMessage::FillReports(reports) => {
1088            log::debug!("Processing {} fill report(s)", reports.len());
1089            for report in reports {
1090                emitter.send_fill_report(report);
1091            }
1092        }
1093        NautilusWsMessage::OrderRejected(event) => {
1094            emitter.send_order_event(OrderEventAny::Rejected(event));
1095        }
1096        NautilusWsMessage::OrderAccepted(event) => {
1097            emitter.send_order_event(OrderEventAny::Accepted(event));
1098        }
1099        NautilusWsMessage::OrderCanceled(event) => {
1100            emitter.send_order_event(OrderEventAny::Canceled(event));
1101        }
1102        NautilusWsMessage::OrderExpired(event) => {
1103            emitter.send_order_event(OrderEventAny::Expired(event));
1104        }
1105        NautilusWsMessage::OrderUpdated(event) => {
1106            emitter.send_order_event(OrderEventAny::Updated(event));
1107        }
1108        NautilusWsMessage::OrderCancelRejected(event) => {
1109            emitter.send_order_event(OrderEventAny::CancelRejected(event));
1110        }
1111        NautilusWsMessage::OrderModifyRejected(event) => {
1112            emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1113        }
1114        NautilusWsMessage::Error(e) => {
1115            log::warn!("WebSocket error: {e}");
1116        }
1117        NautilusWsMessage::Reconnected => {
1118            log::info!("WebSocket reconnected");
1119        }
1120        NautilusWsMessage::Authenticated(auth) => {
1121            log::debug!("WebSocket authenticated: scope={}", auth.scope);
1122        }
1123        NautilusWsMessage::AuthenticationFailed(reason) => {
1124            log::error!("Authentication failed in execution client: {reason}");
1125        }
1126        NautilusWsMessage::Data(_)
1127        | NautilusWsMessage::Deltas(_)
1128        | NautilusWsMessage::Instrument(_)
1129        | NautilusWsMessage::InstrumentStatus(_)
1130        | NautilusWsMessage::FundingRates(_)
1131        | NautilusWsMessage::OptionGreeks(_)
1132        | NautilusWsMessage::Raw(_) => {
1133            // Data messages are handled by the data client, not execution
1134            log::trace!("Ignoring data message in execution client");
1135        }
1136    }
1137}