1use std::{future::Future, sync::Mutex};
19
20use anyhow::Context;
21use async_trait::async_trait;
22use futures_util::{StreamExt, pin_mut};
23use nautilus_common::{
24 clients::ExecutionClient,
25 live::{get_runtime, runner::get_exec_event_sender},
26 messages::execution::{
27 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
28 GenerateFillReportsBuilder, GenerateOrderStatusReport, GenerateOrderStatusReports,
29 GenerateOrderStatusReportsBuilder, GeneratePositionStatusReports,
30 GeneratePositionStatusReportsBuilder, ModifyOrder, QueryAccount, QueryOrder, SubmitOrder,
31 SubmitOrderList,
32 },
33};
34use nautilus_core::{
35 MUTEX_POISONED, UnixNanos,
36 datetime::NANOSECONDS_IN_SECOND,
37 time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce},
43 events::OrderEventAny,
44 identifiers::{AccountId, ClientId, Venue},
45 orders::{Order, OrderAny},
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52 common::{
53 consts::{DERIBIT_VENUE, DERIBIT_WS_HEARTBEAT_SECS},
54 enums::resolve_trigger_type,
55 },
56 config::DeribitExecClientConfig,
57 http::{client::DeribitHttpClient, models::DeribitCurrency, query::GetOrderStateParams},
58 websocket::{
59 auth::DERIBIT_EXECUTION_SESSION_NAME,
60 client::DeribitWebSocketClient,
61 messages::{DeribitOrderParams, NautilusWsMessage},
62 parse::parse_user_order_msg,
63 },
64};
65
66#[derive(Debug)]
68pub struct DeribitExecutionClient {
69 core: ExecutionClientCore,
70 clock: &'static AtomicTime,
71 config: DeribitExecClientConfig,
72 emitter: ExecutionEventEmitter,
73 http_client: DeribitHttpClient,
74 ws_client: DeribitWebSocketClient,
75 ws_stream_handle: Option<JoinHandle<()>>,
76 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl DeribitExecutionClient {
80 pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
86 let http_client = if config.has_api_credentials() {
87 DeribitHttpClient::new_with_env(
88 config.api_key.clone(),
89 config.api_secret.clone(),
90 config.base_url_http.clone(),
91 config.environment,
92 config.http_timeout_secs,
93 config.max_retries,
94 config.retry_delay_initial_ms,
95 config.retry_delay_max_ms,
96 config.proxy_url.clone(),
97 )?
98 } else {
99 DeribitHttpClient::new(
100 config.base_url_http.clone(),
101 config.environment,
102 config.http_timeout_secs,
103 config.max_retries,
104 config.retry_delay_initial_ms,
105 config.retry_delay_max_ms,
106 config.proxy_url.clone(),
107 )?
108 };
109
110 let mut ws_client = DeribitWebSocketClient::new(
111 config.base_url_ws.clone(),
112 config.api_key.clone(),
113 config.api_secret.clone(),
114 DERIBIT_WS_HEARTBEAT_SECS,
115 config.environment,
116 config.transport_backend,
117 config.proxy_url.clone(),
118 )
119 .context("failed to create WebSocket client for execution")?;
120 ws_client.set_account_id(core.account_id);
122
123 let clock = get_atomic_clock_realtime();
124 let emitter = ExecutionEventEmitter::new(
125 clock,
126 core.trader_id,
127 core.account_id,
128 AccountType::Margin,
129 None,
130 );
131
132 Ok(Self {
133 core,
134 clock,
135 config,
136 emitter,
137 http_client,
138 ws_client,
139 ws_stream_handle: None,
140 pending_tasks: Mutex::new(Vec::new()),
141 })
142 }
143
144 fn spawn_task<F>(&self, description: &'static str, fut: F)
146 where
147 F: Future<Output = anyhow::Result<()>> + Send + 'static,
148 {
149 let runtime = get_runtime();
150 let handle = runtime.spawn(async move {
151 if let Err(e) = fut.await {
152 log::warn!("{description} failed: {e:?}");
153 }
154 });
155
156 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
157 tasks.retain(|handle| !handle.is_finished());
158 tasks.push(handle);
159 }
160
161 fn abort_pending_tasks(&self) {
163 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
164 for handle in tasks.drain(..) {
165 handle.abort();
166 }
167 }
168
169 fn build_order_params(order: &dyn Order) -> anyhow::Result<DeribitOrderParams> {
171 let order_type = match order.order_type() {
172 OrderType::Limit => "limit",
173 OrderType::Market => "market",
174 OrderType::StopLimit => "stop_limit",
175 OrderType::StopMarket => "stop_market",
176 OrderType::LimitIfTouched => "take_limit",
177 OrderType::MarketIfTouched => "take_market",
178 other => {
179 anyhow::bail!("Unsupported order type {other:?} for Deribit");
180 }
181 }
182 .to_string();
183
184 let time_in_force = Some(
185 match order.time_in_force() {
186 TimeInForce::Gtc => "good_til_cancelled",
187 TimeInForce::Ioc => "immediate_or_cancel",
188 TimeInForce::Fok => "fill_or_kill",
189 TimeInForce::Gtd => {
190 if order.expire_time().is_some() {
191 log::warn!(
192 "Deribit GTD orders expire at 8:00 UTC only - custom expire_time is ignored. \
193 For custom expiry times, use managed GTD with emulation_trigger"
194 );
195 }
196 "good_til_day"
197 }
198 other => {
199 anyhow::bail!("Unsupported time_in_force {other:?} for Deribit");
200 }
201 }
202 .to_string(),
203 );
204
205 let valid_until = None;
208
209 let trigger = resolve_trigger_type(order.trigger_type());
210
211 Ok(DeribitOrderParams {
212 instrument_name: order.instrument_id().symbol.to_string(),
213 amount: order.quantity().as_decimal(),
214 order_type,
215 label: Some(order.client_order_id().to_string()),
216 price: order.price().map(|p| p.as_decimal()),
217 time_in_force,
218 post_only: if order.is_post_only() {
219 Some(true)
220 } else {
221 None
222 },
223 reject_post_only: if order.is_post_only() {
224 Some(true)
225 } else {
226 None
227 },
228 reduce_only: if order.is_reduce_only() {
229 Some(true)
230 } else {
231 None
232 },
233 trigger_price: order.trigger_price().map(|p| p.as_decimal()),
234 trigger,
235 max_show: None,
236 valid_until,
237 })
238 }
239
240 fn submit_single_order(&self, order: &OrderAny, task_name: &'static str) {
244 if order.is_closed() {
245 log::warn!("Cannot submit closed order {}", order.client_order_id());
246 return;
247 }
248
249 let params = match Self::build_order_params(order) {
250 Ok(params) => params,
251 Err(e) => {
252 let ts_event = self.clock.get_time_ns();
253 self.emitter.emit_order_rejected_event(
254 order.strategy_id(),
255 order.instrument_id(),
256 order.client_order_id(),
257 &format!("{e}"),
258 ts_event,
259 false,
260 );
261 return;
262 }
263 };
264 let client_order_id = order.client_order_id();
265 let trader_id = order.trader_id();
266 let strategy_id = order.strategy_id();
267 let instrument_id = order.instrument_id();
268 let order_side = order.order_side();
269
270 log::debug!("OrderSubmitted client_order_id={client_order_id}");
271 self.emitter.emit_order_submitted(order);
272
273 let ws_client = self.ws_client.clone();
274 let emitter = self.emitter.clone();
275 let clock = self.clock;
276
277 self.spawn_task(task_name, async move {
278 let result = ws_client
279 .submit_order(
280 order_side,
281 params,
282 client_order_id,
283 trader_id,
284 strategy_id,
285 instrument_id,
286 )
287 .await;
288
289 if let Err(e) = result {
290 let ts_event = clock.get_time_ns();
291 emitter.emit_order_rejected_event(
292 strategy_id,
293 instrument_id,
294 client_order_id,
295 &format!("{task_name}-error: {e}"),
296 ts_event,
297 false,
298 );
299 return Err(e.into());
300 }
301
302 Ok(())
303 });
304 }
305
306 fn spawn_stream_handler(
308 &mut self,
309 stream: impl futures_util::Stream<Item = NautilusWsMessage> + Send + 'static,
310 ) {
311 if self.ws_stream_handle.is_some() {
312 return;
313 }
314
315 let emitter = self.emitter.clone();
316
317 let handle = get_runtime().spawn(async move {
318 pin_mut!(stream);
319 while let Some(message) = stream.next().await {
320 dispatch_ws_message(message, &emitter);
321 }
322 });
323
324 self.ws_stream_handle = Some(handle);
325 log::info!("WebSocket stream handler started");
326 }
327}
328
329#[async_trait(?Send)]
330impl ExecutionClient for DeribitExecutionClient {
331 fn is_connected(&self) -> bool {
332 self.core.is_connected()
333 }
334
335 fn client_id(&self) -> ClientId {
336 self.core.client_id
337 }
338
339 fn account_id(&self) -> AccountId {
340 self.core.account_id
341 }
342
343 fn venue(&self) -> Venue {
344 *DERIBIT_VENUE
345 }
346
347 fn oms_type(&self) -> OmsType {
348 self.core.oms_type
349 }
350
351 fn get_account(&self) -> Option<AccountAny> {
352 self.core.cache().account(&self.core.account_id).cloned()
353 }
354
355 fn generate_account_state(
356 &self,
357 balances: Vec<AccountBalance>,
358 margins: Vec<MarginBalance>,
359 reported: bool,
360 ts_event: UnixNanos,
361 ) -> anyhow::Result<()> {
362 self.emitter
363 .emit_account_state(balances, margins, reported, ts_event);
364 Ok(())
365 }
366
367 fn start(&mut self) -> anyhow::Result<()> {
368 if self.core.is_started() {
369 return Ok(());
370 }
371
372 let sender = get_exec_event_sender();
373 self.emitter.set_sender(sender);
374 self.core.set_started();
375
376 log::info!(
377 "Started: client_id={}, account_id={}, account_type={:?}, product_types={:?}, environment={}",
378 self.core.client_id,
379 self.core.account_id,
380 self.core.account_type,
381 self.config.product_types,
382 self.config.environment
383 );
384 Ok(())
385 }
386
387 fn stop(&mut self) -> anyhow::Result<()> {
388 if self.core.is_stopped() {
389 return Ok(());
390 }
391
392 self.core.set_stopped();
393 self.core.set_disconnected();
394 self.abort_pending_tasks();
395 log::info!("Stopped: client_id={}", self.core.client_id);
396 Ok(())
397 }
398
399 async fn connect(&mut self) -> anyhow::Result<()> {
400 if self.core.is_connected() {
401 return Ok(());
402 }
403
404 if !self.config.has_api_credentials() {
406 anyhow::bail!("Missing API credentials; set Deribit environment variables");
407 }
408
409 self.ws_client.set_account_id(self.core.account_id);
411
412 if !self.core.instruments_initialized() {
414 for product_type in &self.config.product_types {
415 let instruments = self
416 .http_client
417 .request_instruments(DeribitCurrency::ANY, Some(*product_type))
418 .await
419 .with_context(|| {
420 format!("failed to request instruments for {product_type:?}")
421 })?;
422
423 if instruments.is_empty() {
424 log::warn!("No instruments returned for {product_type:?}");
425 continue;
426 }
427
428 log::info!("Fetched {} {product_type:?} instruments", instruments.len());
429 self.ws_client.cache_instruments(&instruments);
430 self.http_client.cache_instruments(&instruments);
431 }
432 self.core.set_instruments_initialized();
433 }
434
435 let account_state = self
437 .http_client
438 .request_account_state(self.core.account_id)
439 .await
440 .context("failed to request account state")?;
441
442 self.emitter.send_account_state(account_state);
443
444 self.ws_client
445 .connect()
446 .await
447 .context("failed to connect WebSocket client for execution")?;
448
449 self.ws_client
450 .authenticate_session(DERIBIT_EXECUTION_SESSION_NAME)
451 .await
452 .map_err(|e| anyhow::anyhow!("failed to authenticate WebSocket session: {e}"))?;
453
454 log::info!("WebSocket client authenticated for execution");
455
456 self.ws_client
458 .subscribe_user_orders()
459 .await
460 .map_err(|e| anyhow::anyhow!("failed to subscribe to user orders: {e}"))?;
461 self.ws_client
462 .subscribe_user_trades()
463 .await
464 .map_err(|e| anyhow::anyhow!("failed to subscribe to user trades: {e}"))?;
465 self.ws_client
466 .subscribe_user_portfolio()
467 .await
468 .map_err(|e| anyhow::anyhow!("failed to subscribe to user portfolio: {e}"))?;
469
470 if let Err(e) = self.ws_client.wait_for_subscriptions_confirmed(30.0).await {
471 let _ = self.ws_client.unsubscribe_user_orders().await;
473 let _ = self.ws_client.unsubscribe_user_trades().await;
474 let _ = self.ws_client.unsubscribe_user_portfolio().await;
475 anyhow::bail!("subscription confirmation failed: {e}");
476 }
477
478 log::info!("Subscribed to user order, trade, and portfolio updates");
479
480 let stream = self.ws_client.stream()?;
482 self.spawn_stream_handler(stream);
483
484 self.core.set_connected();
485 log::info!("Connected: client_id={}", self.core.client_id);
486 Ok(())
487 }
488
489 async fn disconnect(&mut self) -> anyhow::Result<()> {
490 if self.core.is_disconnected() {
491 return Ok(());
492 }
493
494 self.abort_pending_tasks();
495
496 if let Some(handle) = self.ws_stream_handle.take() {
498 handle.abort();
499 }
500
501 if let Err(e) = self.ws_client.close().await {
503 log::warn!("Error closing WebSocket client: {e}");
504 }
505
506 self.core.set_disconnected();
507 log::info!("Disconnected: client_id={}", self.core.client_id);
508 Ok(())
509 }
510
511 async fn generate_order_status_report(
512 &self,
513 cmd: &GenerateOrderStatusReport,
514 ) -> anyhow::Result<Option<OrderStatusReport>> {
515 if let Some(venue_order_id) = &cmd.venue_order_id {
517 let params = GetOrderStateParams {
518 order_id: venue_order_id.to_string(),
519 };
520 let ts_init = self.clock.get_time_ns();
521
522 match self.http_client.inner.get_order_state(params).await {
523 Ok(response) => {
524 if let Some(order) = response.result {
525 let symbol = ustr::Ustr::from(&order.instrument_name);
526 if let Some(instrument) = self.http_client.get_instrument(&symbol) {
527 let report = parse_user_order_msg(
528 &order,
529 &instrument,
530 self.core.account_id,
531 ts_init,
532 )?;
533 return Ok(Some(report));
534 } else {
535 log::warn!(
536 "Instrument {} not in cache for order {}",
537 order.instrument_name,
538 order.order_id
539 );
540 }
541 }
542 }
543 Err(e) => {
544 log::warn!("Failed to get order state: {e}");
545 }
546 }
547 return Ok(None);
548 }
549
550 if let Some(client_order_id) = &cmd.client_order_id {
552 let reports = self
553 .http_client
554 .request_order_status_reports(
555 self.core.account_id,
556 cmd.instrument_id,
557 None,
558 None,
559 false, )
561 .await?;
562
563 for report in reports {
565 if report.client_order_id == Some(*client_order_id) {
566 return Ok(Some(report));
567 }
568 }
569 }
570
571 Ok(None)
572 }
573
574 async fn generate_order_status_reports(
575 &self,
576 cmd: &GenerateOrderStatusReports,
577 ) -> anyhow::Result<Vec<OrderStatusReport>> {
578 self.http_client
579 .request_order_status_reports(
580 self.core.account_id,
581 cmd.instrument_id,
582 cmd.start,
583 cmd.end,
584 cmd.open_only,
585 )
586 .await
587 }
588
589 async fn generate_fill_reports(
590 &self,
591 cmd: GenerateFillReports,
592 ) -> anyhow::Result<Vec<FillReport>> {
593 let mut reports = self
594 .http_client
595 .request_fill_reports(self.core.account_id, cmd.instrument_id, cmd.start, cmd.end)
596 .await?;
597
598 if let Some(venue_order_id) = &cmd.venue_order_id {
600 reports.retain(|r| r.venue_order_id == *venue_order_id);
601 }
602
603 Ok(reports)
604 }
605
606 async fn generate_position_status_reports(
607 &self,
608 cmd: &GeneratePositionStatusReports,
609 ) -> anyhow::Result<Vec<PositionStatusReport>> {
610 self.http_client
611 .request_position_status_reports(self.core.account_id, cmd.instrument_id)
612 .await
613 }
614
615 async fn generate_mass_status(
616 &self,
617 lookback_mins: Option<u64>,
618 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
619 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
620 let ts_now = self.clock.get_time_ns();
621 let start = lookback_mins.map(|mins| {
622 let lookback_ns = mins
623 .saturating_mul(60)
624 .saturating_mul(NANOSECONDS_IN_SECOND);
625 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
626 });
627
628 let order_cmd = GenerateOrderStatusReportsBuilder::default()
629 .ts_init(ts_now)
630 .open_only(false) .start(start)
632 .build()
633 .context("Failed to build GenerateOrderStatusReports")?;
634
635 let fill_cmd = GenerateFillReportsBuilder::default()
636 .ts_init(ts_now)
637 .start(start)
638 .build()
639 .context("Failed to build GenerateFillReports")?;
640
641 let position_cmd = GeneratePositionStatusReportsBuilder::default()
642 .ts_init(ts_now)
643 .start(start)
644 .build()
645 .context("Failed to build GeneratePositionStatusReports")?;
646
647 let (order_reports, fill_reports, position_reports) = tokio::try_join!(
648 self.generate_order_status_reports(&order_cmd),
649 self.generate_fill_reports(fill_cmd),
650 self.generate_position_status_reports(&position_cmd),
651 )?;
652
653 log::info!("Received {} OrderStatusReports", order_reports.len());
654 log::info!("Received {} FillReports", fill_reports.len());
655 log::info!("Received {} PositionReports", position_reports.len());
656
657 let mut mass_status = ExecutionMassStatus::new(
658 self.core.client_id,
659 self.core.account_id,
660 *DERIBIT_VENUE,
661 ts_now,
662 None,
663 );
664
665 mass_status.add_order_reports(order_reports);
666 mass_status.add_fill_reports(fill_reports);
667 mass_status.add_position_reports(position_reports);
668
669 Ok(Some(mass_status))
670 }
671
672 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
673 let http_client = self.http_client.clone();
674 let account_id = self.core.account_id;
675 let emitter = self.emitter.clone();
676
677 self.spawn_task("query_account", async move {
678 let account_state = http_client
679 .request_account_state(account_id)
680 .await
681 .context("failed to query account state (check API credentials are valid)")?;
682
683 emitter.send_account_state(account_state);
684 Ok(())
685 });
686
687 Ok(())
688 }
689
690 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
691 let ws_client = self.ws_client.clone();
692
693 let order_id = cmd
695 .venue_order_id
696 .as_ref()
697 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for query_order"))?
698 .to_string();
699
700 let client_order_id = cmd.client_order_id;
701 let trader_id = cmd.trader_id;
702 let strategy_id = cmd.strategy_id;
703 let instrument_id = cmd.instrument_id;
704
705 log::info!("Querying order state: order_id={order_id}, client_order_id={client_order_id}");
706
707 self.spawn_task("query_order", async move {
710 ws_client
711 .query_order(
712 &order_id,
713 client_order_id,
714 trader_id,
715 strategy_id,
716 instrument_id,
717 )
718 .await
719 .map_err(|e| anyhow::anyhow!("Query order state failed: {e}"))?;
720 Ok(())
721 });
722
723 Ok(())
724 }
725
726 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
727 let order = self
728 .core
729 .cache()
730 .order(&cmd.client_order_id)
731 .cloned()
732 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
733 self.submit_single_order(&order, "submit_order");
734 Ok(())
735 }
736
737 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
738 if cmd.order_list.client_order_ids.is_empty() {
739 log::debug!("submit_order_list called with empty order list");
740 return Ok(());
741 }
742
743 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
744
745 log::info!(
746 "Submitting order list {} with {} orders for instrument={}",
747 cmd.order_list.id,
748 orders.len(),
749 cmd.instrument_id
750 );
751
752 for order in &orders {
755 self.submit_single_order(order, "submit_order_list_item");
756 }
757
758 Ok(())
759 }
760
761 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
762 let ws_client = self.ws_client.clone();
763
764 let order_id = cmd
766 .venue_order_id
767 .as_ref()
768 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify_order"))?
769 .to_string();
770
771 let quantity = if let Some(qty) = cmd.quantity {
773 qty
774 } else {
775 let cache = self.core.cache();
777 let order = cache
778 .order(&cmd.client_order_id)
779 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
780 order.quantity()
781 };
782
783 let price = cmd
784 .price
785 .ok_or_else(|| anyhow::anyhow!("price required for modify_order"))?;
786
787 let client_order_id = cmd.client_order_id;
788 let trader_id = cmd.trader_id;
789 let strategy_id = cmd.strategy_id;
790 let instrument_id = cmd.instrument_id;
791 let venue_order_id = cmd.venue_order_id;
792 let emitter = self.emitter.clone();
793 let clock = self.clock;
794
795 log::info!(
796 "Modifying order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
797 );
798
799 self.spawn_task("modify_order", async move {
801 if let Err(e) = ws_client
802 .modify_order(
803 &order_id,
804 quantity,
805 price,
806 client_order_id,
807 trader_id,
808 strategy_id,
809 instrument_id,
810 )
811 .await
812 {
813 log::error!(
814 "Modify order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
815 );
816
817 let ts_event = clock.get_time_ns();
818 emitter.emit_order_modify_rejected_event(
819 strategy_id,
820 instrument_id,
821 client_order_id,
822 venue_order_id,
823 &format!("modify-order-error: {e}"),
824 ts_event,
825 );
826
827 anyhow::bail!("Modify order failed: {e}");
828 }
829 Ok(())
830 });
831
832 Ok(())
833 }
834
835 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
836 let ws_client = self.ws_client.clone();
837
838 let order_id = cmd
840 .venue_order_id
841 .as_ref()
842 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for cancel_order"))?
843 .to_string();
844
845 let client_order_id = cmd.client_order_id;
846 let trader_id = cmd.trader_id;
847 let strategy_id = cmd.strategy_id;
848 let instrument_id = cmd.instrument_id;
849 let venue_order_id = cmd.venue_order_id;
850 let emitter = self.emitter.clone();
851 let clock = self.clock;
852
853 log::info!("Canceling order: order_id={order_id}, client_order_id={client_order_id}");
854
855 self.spawn_task("cancel_order", async move {
857 if let Err(e) = ws_client
858 .cancel_order(
859 &order_id,
860 client_order_id,
861 trader_id,
862 strategy_id,
863 instrument_id,
864 )
865 .await
866 {
867 log::error!(
868 "Cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
869 );
870
871 let ts_event = clock.get_time_ns();
872 emitter.emit_order_cancel_rejected_event(
873 strategy_id,
874 instrument_id,
875 client_order_id,
876 venue_order_id,
877 &format!("cancel-order-error: {e}"),
878 ts_event,
879 );
880
881 anyhow::bail!("Cancel order failed: {e}");
882 }
883 Ok(())
884 });
885
886 Ok(())
887 }
888
889 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
890 let instrument_id = cmd.instrument_id;
891
892 if cmd.order_side == OrderSide::NoOrderSide {
894 log::info!(
895 "Cancelling all orders: instrument={instrument_id}, order_side=NoOrderSide (bulk)"
896 );
897
898 let ws_client = self.ws_client.clone();
899 self.spawn_task("cancel_all_orders", async move {
900 if let Err(e) = ws_client.cancel_all_orders(instrument_id, None).await {
901 log::error!("Cancel all orders failed for instrument {instrument_id}: {e}");
902 anyhow::bail!("Cancel all orders failed: {e}");
903 }
904 Ok(())
905 });
906
907 return Ok(());
908 }
909
910 log::info!(
913 "Cancelling orders by side: instrument={}, order_side={}",
914 instrument_id,
915 cmd.order_side
916 );
917
918 let orders_to_cancel: Vec<_> = {
919 let cache = self.core.cache();
920 let open_orders = cache.orders_open(None, Some(&instrument_id), None, None, None);
921
922 open_orders
923 .into_iter()
924 .filter(|order| order.order_side() == cmd.order_side)
925 .filter_map(|order| {
926 let venue_order_id = order.venue_order_id()?;
927 Some((
928 venue_order_id.to_string(),
929 order.client_order_id(),
930 order.instrument_id(),
931 Some(venue_order_id),
932 ))
933 })
934 .collect()
935 };
936
937 if orders_to_cancel.is_empty() {
938 log::debug!(
939 "No open {} orders to cancel for {}",
940 cmd.order_side,
941 instrument_id
942 );
943 return Ok(());
944 }
945
946 log::info!(
947 "Cancelling {} {} orders for {}",
948 orders_to_cancel.len(),
949 cmd.order_side,
950 instrument_id
951 );
952
953 for (venue_order_id_str, client_order_id, order_instrument_id, venue_order_id) in
955 orders_to_cancel
956 {
957 let ws_client = self.ws_client.clone();
958 let trader_id = cmd.trader_id;
959 let strategy_id = cmd.strategy_id;
960 let emitter = self.emitter.clone();
961 let clock = self.clock;
962
963 self.spawn_task("cancel_order_by_side", async move {
964 if let Err(e) = ws_client
965 .cancel_order(
966 &venue_order_id_str,
967 client_order_id,
968 trader_id,
969 strategy_id,
970 order_instrument_id,
971 )
972 .await
973 {
974 log::error!(
975 "Cancel order failed: order_id={venue_order_id_str}, client_order_id={client_order_id}, error={e}"
976 );
977
978 let ts_event = clock.get_time_ns();
979 emitter.emit_order_cancel_rejected_event(
980 strategy_id,
981 order_instrument_id,
982 client_order_id,
983 venue_order_id,
984 &format!("cancel-order-error: {e}"),
985 ts_event,
986 );
987 }
988 Ok(())
989 });
990 }
991
992 Ok(())
993 }
994
995 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
996 if cmd.cancels.is_empty() {
997 log::debug!("batch_cancel_orders called with empty cancels list");
998 return Ok(());
999 }
1000
1001 log::info!(
1002 "Batch cancelling {} orders for instrument={}",
1003 cmd.cancels.len(),
1004 cmd.instrument_id
1005 );
1006
1007 for cancel in &cmd.cancels {
1010 let order_id = match &cancel.venue_order_id {
1011 Some(id) => id.to_string(),
1012 None => {
1013 log::warn!(
1014 "Cannot cancel order {} - no venue_order_id",
1015 cancel.client_order_id
1016 );
1017
1018 let ts_event = self.clock.get_time_ns();
1020 self.emitter.emit_order_cancel_rejected_event(
1021 cancel.strategy_id,
1022 cancel.instrument_id,
1023 cancel.client_order_id,
1024 None,
1025 "venue_order_id required for cancel",
1026 ts_event,
1027 );
1028 continue;
1029 }
1030 };
1031
1032 let ws_client = self.ws_client.clone();
1033 let emitter = self.emitter.clone();
1034 let clock = self.clock;
1035 let client_order_id = cancel.client_order_id;
1036 let trader_id = cancel.trader_id;
1037 let strategy_id = cancel.strategy_id;
1038 let instrument_id = cancel.instrument_id;
1039
1040 self.spawn_task("batch_cancel_order", async move {
1041 if let Err(e) = ws_client
1042 .cancel_order(
1043 &order_id,
1044 client_order_id,
1045 trader_id,
1046 strategy_id,
1047 instrument_id,
1048 )
1049 .await
1050 {
1051 log::error!(
1052 "Batch cancel order failed: order_id={order_id}, client_order_id={client_order_id}, error={e}"
1053 );
1054
1055 let ts_event = clock.get_time_ns();
1056 emitter.emit_order_cancel_rejected_event(
1057 strategy_id,
1058 instrument_id,
1059 client_order_id,
1060 None,
1061 &format!("batch-cancel-error: {e}"),
1062 ts_event,
1063 );
1064
1065 anyhow::bail!("Batch cancel order failed: {e}");
1066 }
1067 Ok(())
1068 });
1069 }
1070
1071 Ok(())
1072 }
1073}
1074
1075fn dispatch_ws_message(message: NautilusWsMessage, emitter: &ExecutionEventEmitter) {
1077 match message {
1078 NautilusWsMessage::AccountState(state) => {
1079 emitter.send_account_state(state);
1080 }
1081 NautilusWsMessage::OrderStatusReports(reports) => {
1082 log::debug!("Processing {} order status report(s)", reports.len());
1083 for report in reports {
1084 emitter.send_order_status_report(report);
1085 }
1086 }
1087 NautilusWsMessage::FillReports(reports) => {
1088 log::debug!("Processing {} fill report(s)", reports.len());
1089 for report in reports {
1090 emitter.send_fill_report(report);
1091 }
1092 }
1093 NautilusWsMessage::OrderRejected(event) => {
1094 emitter.send_order_event(OrderEventAny::Rejected(event));
1095 }
1096 NautilusWsMessage::OrderAccepted(event) => {
1097 emitter.send_order_event(OrderEventAny::Accepted(event));
1098 }
1099 NautilusWsMessage::OrderCanceled(event) => {
1100 emitter.send_order_event(OrderEventAny::Canceled(event));
1101 }
1102 NautilusWsMessage::OrderExpired(event) => {
1103 emitter.send_order_event(OrderEventAny::Expired(event));
1104 }
1105 NautilusWsMessage::OrderUpdated(event) => {
1106 emitter.send_order_event(OrderEventAny::Updated(event));
1107 }
1108 NautilusWsMessage::OrderCancelRejected(event) => {
1109 emitter.send_order_event(OrderEventAny::CancelRejected(event));
1110 }
1111 NautilusWsMessage::OrderModifyRejected(event) => {
1112 emitter.send_order_event(OrderEventAny::ModifyRejected(event));
1113 }
1114 NautilusWsMessage::Error(e) => {
1115 log::warn!("WebSocket error: {e}");
1116 }
1117 NautilusWsMessage::Reconnected => {
1118 log::info!("WebSocket reconnected");
1119 }
1120 NautilusWsMessage::Authenticated(auth) => {
1121 log::debug!("WebSocket authenticated: scope={}", auth.scope);
1122 }
1123 NautilusWsMessage::AuthenticationFailed(reason) => {
1124 log::error!("Authentication failed in execution client: {reason}");
1125 }
1126 NautilusWsMessage::Data(_)
1127 | NautilusWsMessage::Deltas(_)
1128 | NautilusWsMessage::Instrument(_)
1129 | NautilusWsMessage::InstrumentStatus(_)
1130 | NautilusWsMessage::FundingRates(_)
1131 | NautilusWsMessage::OptionGreeks(_)
1132 | NautilusWsMessage::Raw(_) => {
1133 log::trace!("Ignoring data message in execution client");
1135 }
1136 }
1137}