1use std::{
42 sync::{Arc, Mutex},
43 time::{Duration, Instant},
44};
45
46use ahash::AHashMap;
47use anyhow::Context;
48use async_trait::async_trait;
49use dashmap::DashMap;
50use futures_util::{Stream, StreamExt, pin_mut};
51use nautilus_common::{
52 clients::ExecutionClient,
53 live::{get_runtime, runner::get_exec_event_sender},
54 messages::execution::{
55 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
56 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
57 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
58 },
59};
60use nautilus_core::{
61 MUTEX_POISONED, UUID4, UnixNanos,
62 time::{AtomicTime, get_atomic_clock_realtime},
63};
64use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
65use nautilus_model::{
66 accounts::AccountAny,
67 enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType, TimeInForce},
68 events::{AccountState, OrderAccepted, OrderCanceled, OrderEventAny},
69 identifiers::{
70 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Symbol, Venue, VenueOrderId,
71 },
72 instruments::{Instrument, InstrumentAny},
73 orders::Order,
74 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
75 types::{AccountBalance, Currency, MarginBalance, Money},
76};
77use nautilus_network::retry::RetryConfig;
78use rust_decimal::Decimal;
79use tokio::task::JoinHandle;
80
81use crate::{
82 common::{
83 consts::DYDX_VENUE,
84 credential::{DydxCredential, credential_env_vars},
85 instrument_cache::InstrumentCache,
86 parse::nanos_to_secs_i64,
87 },
88 config::DydxAdapterConfig,
89 execution::{
90 broadcaster::TxBroadcaster,
91 encoder::ClientOrderIdEncoder,
92 order_builder::OrderMessageBuilder,
93 tx_manager::TransactionManager,
94 types::{LimitOrderParams, OrderContext},
95 },
96 grpc::{DydxGrpcClient, SHORT_TERM_ORDER_MAXIMUM_LIFETIME, types::ChainId},
97 http::{
98 client::DydxHttpClient,
99 parse::{
100 parse_account_state, parse_fill_report, parse_order_status_report,
101 parse_position_status_report,
102 },
103 },
104 websocket::{
105 DydxWsDispatchState, OrderIdentity,
106 client::DydxWebSocketClient,
107 enums::DydxWsOutputMessage,
108 fill_report_to_order_filled,
109 parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
110 },
111};
112
113pub mod block_time;
114pub mod broadcaster;
115pub mod encoder;
116pub mod order_builder;
117pub mod submitter;
118pub mod tx_manager;
119pub mod types;
120pub mod wallet;
121
122use block_time::BlockTimeMonitor;
123
124fn apply_avg_px_from_fills(order_reports: &mut [OrderStatusReport], fill_reports: &[FillReport]) {
125 let mut totals: AHashMap<VenueOrderId, (Decimal, Decimal)> = AHashMap::new();
126 for fill in fill_reports {
127 let entry = totals.entry(fill.venue_order_id).or_default();
128 let qty = fill.last_qty.as_decimal();
129 entry.0 += fill.last_px.as_decimal() * qty;
130 entry.1 += qty;
131 }
132
133 for report in order_reports {
134 if let Some((notional, total_qty)) = totals.get(&report.venue_order_id)
135 && !total_qty.is_zero()
136 {
137 report.avg_px = Some(notional / total_qty);
138 }
139 }
140}
141
/// Execution client for the dYdX venue.
///
/// Holds the HTTP and WebSocket clients, an optional gRPC client, the shared
/// instrument cache, and the bookkeeping maps that translate between Nautilus
/// client order ids and the `u32` client ids used on dYdX.
142#[derive(Debug)]
157pub struct DydxExecutionClient {
    /// Core execution client state (trader/account ids, cache access, lifecycle flags).
158 core: ExecutionClientCore,
    /// Real-time atomic clock used to timestamp emitted events.
159 clock: &'static AtomicTime,
    /// Adapter configuration supplied at construction.
160 config: DydxAdapterConfig,
    /// Emitter used to send account, order and report events.
161 emitter: ExecutionEventEmitter,
    /// HTTP client; its instrument cache is shared with `instrument_cache`.
162 http_client: DydxHttpClient,
    /// Private WebSocket client built with the resolved credential.
163 ws_client: DydxWebSocketClient,
    /// gRPC client slot; starts as `None` in `new`.
164 grpc_client: Arc<tokio::sync::RwLock<Option<DydxGrpcClient>>>,
    /// Instrument cache cloned from the HTTP client.
165 instrument_cache: Arc<InstrumentCache>,
    /// Receives block height/time updates from the WebSocket stream.
166 block_time_monitor: Arc<BlockTimeMonitor>,
    /// Latest oracle price per instrument, updated from WebSocket market messages.
167 oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
    /// Encodes Nautilus client order ids into dYdX (`client_id`, `client_metadata`) pairs.
168 encoder: Arc<ClientOrderIdEncoder>,
    /// Tracks order identities and which lifecycle events were already emitted.
169 dispatch_state: Arc<DydxWsDispatchState>,
    /// Order context keyed by the dYdX `u32` client id.
170 order_contexts: Arc<DashMap<u32, OrderContext>>,
    /// Venue order id string -> (`client_id`, `client_metadata`).
171 order_id_map: Arc<DashMap<String, (u32, u32)>>,
    /// Wallet address passed to `new`. NOTE(review): not read in the visible code; confirm usage.
172 wallet_address: String,
    /// Subaccount number passed to `new`. NOTE(review): not read in the visible code; confirm usage.
173 subaccount_number: u32,
    /// Transaction manager; `None` until initialized (error text points at `connect()`).
174 tx_manager: Option<Arc<TransactionManager>>,
    /// Transaction broadcaster; `None` until initialized (error text points at `connect()`).
175 broadcaster: Option<Arc<TxBroadcaster>>,
    /// Order message builder; `None` until initialized (error text points at `connect()`).
176 order_builder: Option<Arc<OrderMessageBuilder>>,
    /// Handle of the WebSocket message-processing task, if one was spawned.
177 ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of spawned background tasks; aborted by `abort_pending_tasks`.
178 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
179}
180
181impl DydxExecutionClient {
182 pub fn new(
188 core: ExecutionClientCore,
189 config: DydxAdapterConfig,
190 wallet_address: String,
191 subaccount_number: u32,
192 ) -> anyhow::Result<Self> {
193 let trader_id = core.trader_id;
194 let account_id = core.account_id;
195 let clock = get_atomic_clock_realtime();
196 let emitter =
197 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
198
199 let retry_config = RetryConfig {
200 max_retries: config.max_retries,
201 initial_delay_ms: config.retry_delay_initial_ms,
202 max_delay_ms: config.retry_delay_max_ms,
203 ..Default::default()
204 };
205 let http_client = DydxHttpClient::new(
206 Some(config.base_url.clone()),
207 config.timeout_secs,
208 config.proxy_url.clone(),
209 config.network,
210 Some(retry_config),
211 )?;
212
213 let instrument_cache = http_client.instrument_cache().clone();
215
216 let credential = DydxCredential::resolve(
218 config.private_key.as_deref(),
219 config.network,
220 config.authenticator_ids.clone(),
221 )?
222 .ok_or_else(|| anyhow::anyhow!("Credentials required for execution client"))?;
223
224 let ws_client = DydxWebSocketClient::new_private_with_cache(
226 config.ws_url.clone(),
227 credential,
228 core.account_id,
229 instrument_cache.clone(),
230 Some(20),
231 config.transport_backend,
232 config.proxy_url.clone(),
233 );
234
235 let grpc_client = Arc::new(tokio::sync::RwLock::new(None));
236
237 Ok(Self {
238 core,
239 clock,
240 config,
241 emitter,
242 http_client,
243 ws_client,
244 grpc_client,
245 instrument_cache,
246 block_time_monitor: Arc::new(BlockTimeMonitor::new()),
247 oracle_prices: Arc::new(DashMap::new()),
248 encoder: Arc::new(ClientOrderIdEncoder::new()),
249 dispatch_state: Arc::new(DydxWsDispatchState::default()),
250 order_contexts: Arc::new(DashMap::new()),
251 order_id_map: Arc::new(DashMap::new()),
252 wallet_address,
253 subaccount_number,
254 tx_manager: None,
255 broadcaster: None,
256 order_builder: None,
257 ws_stream_handle: None,
258 pending_tasks: Mutex::new(Vec::new()),
259 })
260 }
261
262 fn resolve_private_key(config: &DydxAdapterConfig) -> anyhow::Result<String> {
263 let (private_key_env, _) = credential_env_vars(config.network);
264
265 if let Some(ref pk) = config.private_key
267 && !pk.trim().is_empty()
268 {
269 return Ok(pk.clone());
270 }
271
272 if let Some(pk) = std::env::var(private_key_env)
274 .ok()
275 .filter(|s| !s.trim().is_empty())
276 {
277 return Ok(pk);
278 }
279
280 anyhow::bail!("{private_key_env} not found in config or environment")
281 }
282
/// Stores `context` in `order_contexts` under the dYdX `u32` client id.
283 fn register_order_context(&self, client_id_u32: u32, context: OrderContext) {
284 self.order_contexts.insert(client_id_u32, context);
285 }
286
287 fn get_order_context(&self, client_id_u32: u32) -> Option<OrderContext> {
288 self.order_contexts
289 .get(&client_id_u32)
290 .map(|r| r.value().clone())
291 }
292
/// Returns the chain id reported by the adapter configuration.
293 fn get_chain_id(&self) -> ChainId {
294 self.config.get_chain_id()
295 }
296
/// Spawns the task that consumes the execution WebSocket `stream`.
///
/// Does nothing if a handler task handle is already stored. Otherwise the task runs
/// until the stream ends and handles each message kind:
///
/// - `SubaccountSubscribed`: emits an account state and position reports parsed from
///   the initial snapshot (or a zero USDC margin state when no snapshot is present).
/// - `SubaccountsChannelData`: parses orders, then fills, then dispatches order
///   events/reports, then removes bookkeeping for orders that are no longer open.
/// - `Markets`: refreshes oracle prices for instruments present in the cache.
/// - `BlockHeight`: records the block in the block time monitor.
/// - `Error` / `Reconnected`: logged only. Any other message is ignored.
297 fn spawn_ws_stream_handler(
298 &mut self,
299 stream: impl Stream<Item = DydxWsOutputMessage> + Send + 'static,
300 ) {
    // A handler handle is already stored; do not spawn a second one.
301 if self.ws_stream_handle.is_some() {
302 return;
303 }
304
305 log::debug!("Starting execution WebSocket message processing task");
306
    // Clone/copy everything the task needs so the async block owns its state.
307 let trader_id = self.core.trader_id;
309 let account_id = self.core.account_id;
310 let instrument_cache = self.instrument_cache.clone();
311 let oracle_prices = self.oracle_prices.clone();
312 let encoder = self.encoder.clone();
313 let order_contexts = self.order_contexts.clone();
314 let order_id_map = self.order_id_map.clone();
315 let dispatch_state = self.dispatch_state.clone();
316 let block_time_monitor = self.block_time_monitor.clone();
317 let emitter = self.emitter.clone();
318 let clock = self.clock;
319
320 let handle = get_runtime().spawn(async move {
321 log::debug!("Execution WebSocket message loop started");
322
    // Per venue order: (sum of price * quantity, sum of quantity). Only fills whose
    // order has no registered identity are accumulated here (see the fills loop).
323 let mut cum_fill_totals: AHashMap<VenueOrderId, (Decimal, Decimal)> =
325 AHashMap::new();
326
327 pin_mut!(stream);
328 while let Some(msg) = stream.next().await {
329 match msg {
330 DydxWsOutputMessage::SubaccountSubscribed(msg) => {
331 log::debug!("Parsing subaccount subscription with full context");
332
333 let inst_map = instrument_cache.to_instrument_id_map();
334
    // Snapshot the concurrent oracle price map into a plain map for the parser.
335 let oracle_map: std::collections::HashMap<_, _> = oracle_prices
336 .iter()
337 .map(|entry| (*entry.key(), *entry.value()))
338 .collect();
339
    // The same local receive time is used for both event and init timestamps.
340 let ts_init = clock.get_time_ns();
341 let ts_event = ts_init;
342
343 if let Some(ref subaccount) = msg.contents.subaccount {
344 match parse_account_state(
345 subaccount,
346 account_id,
347 &inst_map,
348 &oracle_map,
349 ts_event,
350 ts_init,
351 ) {
352 Ok(account_state) => {
353 log::debug!(
354 "Parsed account state: {} balance(s), {} margin(s)",
355 account_state.balances.len(),
356 account_state.margins.len()
357 );
358 emitter.send_account_state(account_state);
359 }
360 Err(e) => {
361 log::error!("Failed to parse account state: {e}");
362 }
363 }
364
    // Positions are parsed even when the account state failed to parse above.
365 if let Some(ref positions) =
366 subaccount.open_perpetual_positions
367 {
368 log::debug!(
369 "Parsing {} position(s) from subscription",
370 positions.len()
371 );
372
373 for (market, ws_position) in positions {
374 match parse_ws_position_report(
375 ws_position,
376 &instrument_cache,
377 account_id,
378 ts_init,
379 ) {
380 Ok(report) => {
381 log::debug!(
382 "Parsed position report: {} {} {} {}",
383 report.instrument_id,
384 report.position_side,
385 report.quantity,
386 market
387 );
388 emitter.send_position_report(report);
389 }
390 Err(e) => {
391 log::error!(
392 "Failed to parse WebSocket position for {market}: {e}"
393 );
394 }
395 }
396 }
397 }
398 } else {
399 log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
400
    // No snapshot: emit a margin account state with a single zero USDC balance.
401 let currency = Currency::get_or_create_crypto_with_context("USDC", None);
402 let zero = Money::zero(currency);
403 let balance = AccountBalance::new_checked(zero, zero, zero)
404 .expect("zero balance should always be valid");
405 let account_state = AccountState::new(
406 account_id,
407 AccountType::Margin,
408 vec![balance],
409 vec![],
410 true,
411 UUID4::new(),
412 ts_init,
413 ts_init,
414 None,
415 );
416 emitter.send_account_state(account_state);
417 }
418 }
419 DydxWsOutputMessage::SubaccountsChannelData(data) => {
420 log::debug!(
421 "Processing subaccounts channel data (orders={:?}, fills={:?})",
422 data.contents.orders.as_ref().map(|o| o.len()),
423 data.contents.fills.as_ref().map(|f| f.len())
424 );
425 let ts_init = clock.get_time_ns();
426
    // (client_id, client_metadata, venue order id) of orders that are no longer
    // open; their bookkeeping is removed after all dispatching below.
427 let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
    // Order reports are buffered so fills in the same message are handled first.
428 let mut pending_order_reports = Vec::new();
429
430 if let Some(ref orders) = data.contents.orders {
432 for ws_order in orders {
433 log::debug!(
434 "Parsing WS order: clob_pair_id={}, status={:?}, client_id={}",
435 ws_order.clob_pair_id,
436 ws_order.status,
437 ws_order.client_id
438 );
439
    // Record venue order id -> (client_id, client_metadata) before parsing so
    // the fill parser can use the mapping. Missing or unparsable metadata
    // falls back to the default Rust client metadata.
440 if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
441 let client_meta = ws_order.client_metadata
442 .as_ref()
443 .and_then(|s| s.parse::<u32>().ok())
444 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
445 order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
446 }
447
448 match parse_ws_order_report(
449 ws_order,
450 &instrument_cache,
451 &order_contexts,
452 &encoder,
453 account_id,
454 ts_init,
455 ) {
456 Ok(report) => {
    // Not open any more: schedule cleanup of the per-order maps.
457 if !report.order_status.is_open()
458 && let Ok(cid) = ws_order.client_id.parse::<u32>()
459 {
460 let meta = ws_order.client_metadata
461 .as_ref()
462 .and_then(|s| s.parse::<u32>().ok())
463 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
464 terminal_orders.push((cid, meta, ws_order.id.clone()));
465 }
466 log::debug!(
467 "Parsed order report: {} {} {:?} qty={} client_order_id={:?}",
468 report.instrument_id,
469 report.order_side,
470 report.order_status,
471 report.quantity,
472 report.client_order_id
473 );
474 pending_order_reports.push(report);
475 }
476 Err(e) => {
477 log::error!("Failed to parse WebSocket order: {e}");
478 }
479 }
480 }
481 }
482
483 if let Some(ref fills) = data.contents.fills {
485 for ws_fill in fills {
486 match parse_ws_fill_report(
487 ws_fill,
488 &instrument_cache,
489 &order_id_map,
490 &order_contexts,
491 &encoder,
492 account_id,
493 ts_init,
494 ) {
495 Ok(report) => {
496 log::debug!(
497 "Parsed fill report: {} {} {} @ {} client_order_id={:?}",
498 report.instrument_id,
499 report.venue_order_id,
500 report.last_qty,
501 report.last_px,
502 report.client_order_id
503 );
504
    // An identity exists only for orders registered in
    // `dispatch_state.order_identities` (done at submit time).
505 let identity = report.client_order_id.and_then(|cid| {
506 dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
507 });
508
509 if let Some((cid, ident)) = identity {
    // A fill seen before any Accepted was emitted: emit Accepted first.
510 if !dispatch_state.emitted_accepted.contains(&cid) {
512 dispatch_state.insert_accepted(cid);
513 let accepted = OrderAccepted::new(
514 trader_id,
515 ident.strategy_id,
516 ident.instrument_id,
517 cid,
518 report.venue_order_id,
519 account_id,
520 UUID4::new(),
521 ts_init,
522 ts_init,
523 false,
524 );
525 emitter.send_order_event(OrderEventAny::Accepted(accepted));
526 }
527
528 dispatch_state.insert_filled(cid);
    // Quote currency falls back to USD when the instrument is not cached.
529 let instrument = instrument_cache.get(&report.instrument_id);
530 let quote_currency = instrument
531 .map_or_else(Currency::USD, |i: InstrumentAny| i.quote_currency());
532 let filled = fill_report_to_order_filled(
533 &report, trader_id, &ident, quote_currency,
534 );
535 emitter.send_order_event(OrderEventAny::Filled(filled));
536 } else {
    // No identity: accumulate totals for avg_px and forward the raw fill report.
537 let entry = cum_fill_totals
539 .entry(report.venue_order_id)
540 .or_default();
541 let qty = report.last_qty.as_decimal();
542 entry.0 += report.last_px.as_decimal() * qty;
543 entry.1 += qty;
544 emitter.send_fill_report(report);
545 }
546 }
547 Err(e) => {
548 log::error!("Failed to parse WebSocket fill: {e}");
549 }
550 }
551 }
552 }
553
    // Fill in avg_px on buffered order reports from the accumulated fill totals.
554 for report in &mut pending_order_reports {
557 if let Some((notional, total_qty)) =
558 cum_fill_totals.get(&report.venue_order_id)
559 && !total_qty.is_zero()
560 {
561 report.avg_px = Some(notional / total_qty);
562 }
563 }
564
    // Dispatch: orders with an identity become order events; others are sent as reports.
565 for report in pending_order_reports {
566 let identity = report.client_order_id.and_then(|cid| {
567 dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
568 });
569
570 if let Some((cid, ident)) = identity {
571 match report.order_status {
573 OrderStatus::Accepted => {
    // Skip when Accepted was already emitted or the order already has a fill.
574 if dispatch_state.emitted_accepted.contains(&cid)
575 || dispatch_state.filled_orders.contains(&cid)
576 {
577 log::debug!("Skipping duplicate Accepted for {cid}");
578 continue;
579 }
580 dispatch_state.insert_accepted(cid);
581 let accepted = OrderAccepted::new(
582 trader_id,
583 ident.strategy_id,
584 ident.instrument_id,
585 cid,
586 report.venue_order_id,
587 account_id,
588 UUID4::new(),
589 report.ts_last,
590 ts_init,
591 false,
592 );
593 emitter.send_order_event(OrderEventAny::Accepted(accepted));
594 }
595 OrderStatus::Canceled => {
    // Emit Accepted first if it was never emitted, then Canceled.
596 if !dispatch_state.emitted_accepted.contains(&cid) {
598 dispatch_state.insert_accepted(cid);
599 let accepted = OrderAccepted::new(
600 trader_id,
601 ident.strategy_id,
602 ident.instrument_id,
603 cid,
604 report.venue_order_id,
605 account_id,
606 UUID4::new(),
607 ts_init,
608 ts_init,
609 false,
610 );
611 emitter.send_order_event(OrderEventAny::Accepted(accepted));
612 }
613 let canceled = OrderCanceled::new(
614 trader_id,
615 ident.strategy_id,
616 ident.instrument_id,
617 cid,
618 UUID4::new(),
619 report.ts_last,
620 ts_init,
621 false,
622 Some(report.venue_order_id),
623 Some(account_id),
624 );
625 emitter.send_order_event(OrderEventAny::Canceled(canceled));
626 dispatch_state.cleanup_terminal(&cid);
627 }
628 OrderStatus::Filled => {
    // No event here: Filled events are emitted from the fills loop above.
629 dispatch_state.cleanup_terminal(&cid);
631 }
632 _ => {
633 emitter.send_order_status_report(report);
635 }
636 }
637 } else {
638 emitter.send_order_status_report(report);
640 }
641 }
642
    // Drop bookkeeping for orders that are no longer open.
643 for (client_id, client_metadata, order_id) in terminal_orders {
645 order_contexts.remove(&client_id);
646 encoder.remove(client_id, client_metadata);
647 order_id_map.remove(&order_id);
648 cum_fill_totals.remove(&VenueOrderId::new(&order_id));
649 }
650 }
651 DydxWsOutputMessage::Markets(contents) => {
    // Oracle prices can arrive in `oracle_prices` and/or per market in `markets`.
    // Both are keyed by a symbol that is suffixed with "-PERP" to form the instrument id.
652 if let Some(ref oracle_map) = contents.oracle_prices {
653 for (symbol_str, oracle_market) in oracle_map {
654 let instrument_id = {
655 let symbol = format!("{symbol_str}-PERP");
656 InstrumentId::new(
657 Symbol::new(&symbol),
658 *crate::common::consts::DYDX_VENUE,
659 )
660 };
661
    // Only cached instruments with a parsable price are stored.
662 if instrument_cache.get(&instrument_id).is_some()
663 && let Ok(price_dec) = oracle_market.oracle_price.parse::<Decimal>()
664 {
665 oracle_prices.insert(instrument_id, price_dec);
666 log::trace!("Updated oracle price for {instrument_id}: {price_dec}");
667 }
668 }
669 }
670
671 if let Some(ref markets) = contents.markets {
672 for (symbol_str, market_data) in markets {
673 if let Some(oracle_price_str) = &market_data.oracle_price {
674 let instrument_id = {
675 let symbol = format!("{symbol_str}-PERP");
676 InstrumentId::new(
677 Symbol::new(&symbol),
678 *crate::common::consts::DYDX_VENUE,
679 )
680 };
681
682 if instrument_cache.get(&instrument_id).is_some()
683 && let Ok(price_dec) = oracle_price_str.parse::<Decimal>()
684 {
685 oracle_prices.insert(instrument_id, price_dec);
686 }
687 }
688 }
689 }
690 }
691 DydxWsOutputMessage::BlockHeight { height, time } => {
692 log::debug!("Block height update: {height} at {time}");
693 block_time_monitor.record_block(height, time);
694 }
695 DydxWsOutputMessage::Error(err) => {
696 log::error!("WebSocket error: {err:?}");
697 }
698 DydxWsOutputMessage::Reconnected => {
699 log::info!("WebSocket reconnected");
700 }
701 _ => {}
702 }
703 }
704 log::debug!("WebSocket message processing task ended");
705 });
706
707 self.ws_stream_handle = Some(handle);
708 log::info!("WebSocket stream handler started");
709 }
710
711 fn mark_instruments_initialized(&self) {
716 let count = self.instrument_cache.len();
717 self.core.set_instruments_initialized();
718 log::debug!("Instruments initialized: {count} instruments in shared cache");
719 }
720
/// Looks up an instrument in the shared cache by its market string.
721 fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
722 self.instrument_cache.get_by_market(market)
723 }
724
725 fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
726 let instrument = self.instrument_cache.get_by_clob_id(clob_pair_id);
727
728 if instrument.is_none() {
729 self.instrument_cache.log_missing_clob_pair_id(clob_pair_id);
730 }
731
732 instrument
733 }
734
735 fn get_execution_components(
739 &self,
740 ) -> anyhow::Result<(
741 Arc<TransactionManager>,
742 Arc<TxBroadcaster>,
743 Arc<OrderMessageBuilder>,
744 )> {
745 let tx_manager = self
746 .tx_manager
747 .as_ref()
748 .ok_or_else(|| {
749 anyhow::anyhow!("TransactionManager not initialized - call connect() first")
750 })?
751 .clone();
752 let broadcaster = self
753 .broadcaster
754 .as_ref()
755 .ok_or_else(|| anyhow::anyhow!("TxBroadcaster not initialized - call connect() first"))?
756 .clone();
757 let order_builder = self
758 .order_builder
759 .as_ref()
760 .ok_or_else(|| {
761 anyhow::anyhow!("OrderMessageBuilder not initialized - call connect() first")
762 })?
763 .clone();
764 Ok((tx_manager, broadcaster, order_builder))
765 }
766
767 fn spawn_task<F>(&self, label: &'static str, fut: F)
768 where
769 F: Future<Output = anyhow::Result<()>> + Send + 'static,
770 {
771 let handle = get_runtime().spawn(async move {
772 if let Err(e) = fut.await {
773 log::error!("{label}: {e:?}");
774 }
775 });
776
777 self.pending_tasks
778 .lock()
779 .expect(MUTEX_POISONED)
780 .push(handle);
781 }
782
783 fn spawn_order_task<F>(
787 &self,
788 label: &'static str,
789 strategy_id: StrategyId,
790 instrument_id: InstrumentId,
791 client_order_id: ClientOrderId,
792 fut: F,
793 ) where
794 F: Future<Output = anyhow::Result<()>> + Send + 'static,
795 {
796 let emitter = self.emitter.clone();
797 let clock = self.clock;
798
799 let handle = get_runtime().spawn(async move {
800 if let Err(e) = fut.await {
801 let error_msg = format!("{label} failed: {e:?}");
802 log::error!("{error_msg}");
803
804 let ts_event = clock.get_time_ns();
805 emitter.emit_order_rejected_event(
806 strategy_id,
807 instrument_id,
808 client_order_id,
809 &error_msg,
810 ts_event,
811 false,
812 );
813 }
814 });
815
816 self.pending_tasks
817 .lock()
818 .expect(MUTEX_POISONED)
819 .push(handle);
820 }
821
822 fn abort_pending_tasks(&self) {
823 let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
824 for handle in guard.drain(..) {
825 handle.abort();
826 }
827 }
828
829 fn send_modify_rejected(
831 &self,
832 strategy_id: StrategyId,
833 instrument_id: InstrumentId,
834 client_order_id: ClientOrderId,
835 venue_order_id: Option<VenueOrderId>,
836 reason: &str,
837 ) {
838 let ts_event = self.clock.get_time_ns();
839 self.emitter.emit_order_modify_rejected_event(
840 strategy_id,
841 instrument_id,
842 client_order_id,
843 venue_order_id,
844 reason,
845 ts_event,
846 );
847 }
848
849 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
859 let account_id = self.core.account_id;
860
861 if self.core.cache().account(&account_id).is_some() {
862 log::info!("Account {account_id} registered");
863 return Ok(());
864 }
865
866 let start = Instant::now();
867 let timeout = Duration::from_secs_f64(timeout_secs);
868 let interval = Duration::from_millis(10);
869
870 loop {
871 tokio::time::sleep(interval).await;
872
873 if self.core.cache().account(&account_id).is_some() {
874 log::info!("Account {account_id} registered");
875 return Ok(());
876 }
877
878 if start.elapsed() >= timeout {
879 anyhow::bail!(
880 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
881 );
882 }
883 }
884 }
885}
886
887async fn broadcast_partitioned_cancels(
895 orders: Vec<(InstrumentId, u32, u32)>,
896 block_height: u32,
897 tx_manager: Arc<TransactionManager>,
898 broadcaster: Arc<TxBroadcaster>,
899 order_builder: Arc<OrderMessageBuilder>,
900) -> anyhow::Result<()> {
901 if orders.is_empty() {
902 return Ok(());
903 }
904
905 let (short_term_orders, long_term_orders): (Vec<_>, Vec<_>) = orders
906 .into_iter()
907 .partition(|(_, _, flags)| *flags == types::ORDER_FLAG_SHORT_TERM);
908
909 if !short_term_orders.is_empty() {
911 let st_pairs: Vec<_> = short_term_orders
912 .iter()
913 .map(|(inst_id, client_id, _)| (*inst_id, *client_id))
914 .collect();
915
916 log::debug!(
917 "Batch cancelling {} short-term orders with MsgBatchCancel",
918 st_pairs.len()
919 );
920
921 match order_builder.build_batch_cancel_short_term(&st_pairs, block_height) {
922 Ok(msg) => {
923 let operation = format!("BatchCancel {} short-term orders", st_pairs.len());
924 match broadcaster
925 .broadcast_short_term(&tx_manager, vec![msg], &operation)
926 .await
927 {
928 Ok(tx_hash) => {
929 log::debug!(
930 "Successfully batch cancelled {} short-term orders, tx_hash: {}",
931 st_pairs.len(),
932 tx_hash
933 );
934 }
935 Err(e) => {
936 log::error!("Short-term batch cancel failed: {e:?}");
937 }
938 }
939 }
940 Err(e) => {
941 log::error!("Failed to build MsgBatchCancel: {e:?}");
942 }
943 }
944 }
945
946 if !long_term_orders.is_empty() {
948 log::debug!(
949 "Batch cancelling {} long-term orders",
950 long_term_orders.len(),
951 );
952
953 match order_builder.build_cancel_orders_batch_with_flags(&long_term_orders, block_height) {
954 Ok(cancel_msgs) => {
955 let operation = format!("BatchCancel {} long-term orders", long_term_orders.len());
956 match broadcaster
957 .broadcast_with_retry(&tx_manager, cancel_msgs, &operation)
958 .await
959 {
960 Ok(tx_hash) => {
961 log::debug!(
962 "Successfully batch cancelled {} long-term orders, tx_hash: {}",
963 long_term_orders.len(),
964 tx_hash
965 );
966 }
967 Err(e) => {
968 log::error!("Long-term batch cancel failed: {e:?}");
969 }
970 }
971 }
972 Err(e) => {
973 log::error!("Failed to build long-term cancel messages: {e:?}");
974 }
975 }
976 }
977
978 Ok(())
979}
980
981#[async_trait(?Send)]
982impl ExecutionClient for DydxExecutionClient {
/// Returns the connection flag tracked by the execution client core.
983 fn is_connected(&self) -> bool {
984 self.core.is_connected()
985 }
986
/// Returns the client id held by the execution client core.
987 fn client_id(&self) -> ClientId {
988 self.core.client_id
989 }
990
/// Returns the account id held by the execution client core.
991 fn account_id(&self) -> AccountId {
992 self.core.account_id
993 }
994
/// Returns the `DYDX_VENUE` constant.
995 fn venue(&self) -> Venue {
996 *DYDX_VENUE
997 }
998
/// Returns the OMS type held by the execution client core.
999 fn oms_type(&self) -> OmsType {
1000 self.core.oms_type
1001 }
1002
/// Returns a clone of this client's account from the core cache, or `None` if it is not cached.
1003 fn get_account(&self) -> Option<AccountAny> {
1004 self.core.cache().account(&self.core.account_id).cloned()
1005 }
1006
/// Forwards the given balances and margins to the event emitter as an account state.
///
/// Always returns `Ok(())`; the emitter call has no result that is checked here.
1007 fn generate_account_state(
1008 &self,
1009 balances: Vec<AccountBalance>,
1010 margins: Vec<MarginBalance>,
1011 reported: bool,
1012 ts_event: UnixNanos,
1013 ) -> anyhow::Result<()> {
1014 self.emitter
1015 .emit_account_state(balances, margins, reported, ts_event);
1016 Ok(())
1017 }
1018
1019 fn start(&mut self) -> anyhow::Result<()> {
1020 if self.core.is_started() {
1021 log::warn!("dYdX execution client already started");
1022 return Ok(());
1023 }
1024
1025 let sender = get_exec_event_sender();
1026 self.emitter.set_sender(sender);
1027 log::info!("Starting dYdX execution client");
1028 self.core.set_started();
1029 Ok(())
1030 }
1031
1032 fn stop(&mut self) -> anyhow::Result<()> {
1033 if self.core.is_stopped() {
1034 log::warn!("dYdX execution client not started");
1035 return Ok(());
1036 }
1037
1038 log::info!("Stopping dYdX execution client");
1039 self.abort_pending_tasks();
1040 self.core.set_stopped();
1041 self.core.set_disconnected();
1042 Ok(())
1043 }
1044
1045 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1062 if !self.is_connected() {
1064 let reason = "Cannot submit order: execution client not connected";
1065 log::error!("{reason}");
1066 anyhow::bail!(reason);
1067 }
1068
1069 let current_block = self.block_time_monitor.current_block_height();
1071 let order = self
1072 .core
1073 .cache()
1074 .order(&cmd.client_order_id)
1075 .cloned()
1076 .ok_or_else(|| {
1077 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
1078 })?;
1079
1080 let client_order_id = order.client_order_id();
1081 let instrument_id = order.instrument_id();
1082 let strategy_id = order.strategy_id();
1083
1084 if current_block == 0 {
1085 let reason = "Block height not initialized";
1086 log::warn!("Cannot submit order {client_order_id}: {reason}");
1087 let ts_event = self.clock.get_time_ns();
1088 self.emitter.emit_order_rejected_event(
1089 strategy_id,
1090 instrument_id,
1091 client_order_id,
1092 reason,
1093 ts_event,
1094 false,
1095 );
1096 return Ok(());
1097 }
1098
1099 if order.is_closed() {
1101 log::warn!("Cannot submit closed order {client_order_id}");
1102 return Ok(());
1103 }
1104
1105 match order.order_type() {
1107 OrderType::Market
1108 | OrderType::Limit
1109 | OrderType::StopMarket
1110 | OrderType::StopLimit
1111 | OrderType::MarketIfTouched
1112 | OrderType::LimitIfTouched => {}
1113 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
1115 let reason = "Trailing stop orders not supported by dYdX v4 protocol";
1116 log::error!("{reason}");
1117 let ts_event = self.clock.get_time_ns();
1118 self.emitter.emit_order_rejected_event(
1119 strategy_id,
1120 instrument_id,
1121 client_order_id,
1122 reason,
1123 ts_event,
1124 false,
1125 );
1126 return Ok(());
1127 }
1128 order_type => {
1129 let reason = format!("Order type {order_type:?} not supported by dYdX");
1130 log::error!("{reason}");
1131 let ts_event = self.clock.get_time_ns();
1132 self.emitter.emit_order_rejected_event(
1133 strategy_id,
1134 instrument_id,
1135 client_order_id,
1136 &reason,
1137 ts_event,
1138 false,
1139 );
1140 return Ok(());
1141 }
1142 }
1143
1144 self.emitter.emit_order_submitted(&order);
1145
1146 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1148 Ok(components) => components,
1149 Err(e) => {
1150 log::error!("Failed to get execution components: {e}");
1151 let ts_event = self.clock.get_time_ns();
1152 self.emitter.emit_order_rejected_event(
1153 strategy_id,
1154 instrument_id,
1155 client_order_id,
1156 &e.to_string(),
1157 ts_event,
1158 false,
1159 );
1160 return Ok(());
1161 }
1162 };
1163
1164 let block_height = self.block_time_monitor.current_block_height() as u32;
1165
1166 let encoded = match self.encoder.encode(client_order_id) {
1168 Ok(enc) => enc,
1169 Err(e) => {
1170 log::error!("Failed to generate client order ID: {e}");
1171 let ts_event = self.clock.get_time_ns();
1172 self.emitter.emit_order_rejected_event(
1173 strategy_id,
1174 instrument_id,
1175 client_order_id,
1176 &e.to_string(),
1177 ts_event,
1178 false,
1179 );
1180 return Ok(());
1181 }
1182 };
1183 let client_id_u32 = encoded.client_id;
1184 let client_metadata = encoded.client_metadata;
1185
1186 log::info!(
1187 "[SUBMIT_ORDER] Nautilus '{}' -> dYdX u32={} meta={:#x} | instrument={} side={:?} qty={} type={:?}",
1188 client_order_id,
1189 client_id_u32,
1190 client_metadata,
1191 instrument_id,
1192 order.order_side(),
1193 order.quantity(),
1194 order.order_type()
1195 );
1196
1197 let expire_time = order.expire_time().map(nanos_to_secs_i64);
1199
1200 let order_flags = match order.order_type() {
1202 OrderType::StopMarket
1204 | OrderType::StopLimit
1205 | OrderType::MarketIfTouched
1206 | OrderType::LimitIfTouched => types::ORDER_FLAG_CONDITIONAL,
1207 OrderType::Market => types::ORDER_FLAG_SHORT_TERM,
1209 OrderType::Limit => {
1211 let lifetime = types::OrderLifetime::from_time_in_force(
1212 order.time_in_force(),
1213 expire_time,
1214 false,
1215 order_builder.max_short_term_secs(),
1216 );
1217 lifetime.order_flags()
1218 }
1219 _ => types::ORDER_FLAG_LONG_TERM,
1221 };
1222
1223 let ts_submitted = self.clock.get_time_ns();
1225 let trader_id = order.trader_id();
1226 self.register_order_context(
1227 client_id_u32,
1228 OrderContext {
1229 client_order_id,
1230 trader_id,
1231 strategy_id,
1232 instrument_id,
1233 submitted_at: ts_submitted,
1234 order_flags,
1235 },
1236 );
1237
1238 self.dispatch_state.order_identities.insert(
1241 client_order_id,
1242 OrderIdentity {
1243 instrument_id,
1244 strategy_id,
1245 order_side: order.order_side(),
1246 order_type: order.order_type(),
1247 },
1248 );
1249
1250 self.spawn_order_task(
1251 "submit_order",
1252 strategy_id,
1253 instrument_id,
1254 client_order_id,
1255 async move {
1256 let (msg, order_type_str) = match order.order_type() {
1258 OrderType::Market => {
1259 let msg = order_builder.build_market_order(
1260 instrument_id,
1261 client_id_u32,
1262 client_metadata,
1263 order.order_side(),
1264 order.quantity(),
1265 block_height,
1266 )?;
1267 (msg, "market")
1268 }
1269 OrderType::Limit => {
1270 let msg = order_builder.build_limit_order(
1272 instrument_id,
1273 client_id_u32,
1274 client_metadata,
1275 order.order_side(),
1276 order
1277 .price()
1278 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
1279 order.quantity(),
1280 order.time_in_force(),
1281 order.is_post_only(),
1282 order.is_reduce_only(),
1283 block_height,
1284 expire_time, )?;
1286 (msg, "limit")
1287 }
1288 OrderType::StopMarket => {
1291 let trigger_price = order.trigger_price().ok_or_else(|| {
1292 anyhow::anyhow!("Stop market order missing trigger_price")
1293 })?;
1294 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1295 let msg = order_builder.build_stop_market_order(
1296 instrument_id,
1297 client_id_u32,
1298 client_metadata,
1299 order.order_side(),
1300 trigger_price,
1301 order.quantity(),
1302 order.is_reduce_only(),
1303 cond_expire,
1304 )?;
1305 (msg, "stop_market")
1306 }
1307 OrderType::StopLimit => {
1308 let trigger_price = order.trigger_price().ok_or_else(|| {
1309 anyhow::anyhow!("Stop limit order missing trigger_price")
1310 })?;
1311 let limit_price = order.price().ok_or_else(|| {
1312 anyhow::anyhow!("Stop limit order missing limit price")
1313 })?;
1314 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1315 let msg = order_builder.build_stop_limit_order(
1316 instrument_id,
1317 client_id_u32,
1318 client_metadata,
1319 order.order_side(),
1320 trigger_price,
1321 limit_price,
1322 order.quantity(),
1323 order.time_in_force(),
1324 order.is_post_only(),
1325 order.is_reduce_only(),
1326 cond_expire,
1327 )?;
1328 (msg, "stop_limit")
1329 }
1330 OrderType::MarketIfTouched => {
1332 let trigger_price = order.trigger_price().ok_or_else(|| {
1333 anyhow::anyhow!("Take profit market order missing trigger_price")
1334 })?;
1335 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1336 let msg = order_builder.build_take_profit_market_order(
1337 instrument_id,
1338 client_id_u32,
1339 client_metadata,
1340 order.order_side(),
1341 trigger_price,
1342 order.quantity(),
1343 order.is_reduce_only(),
1344 cond_expire,
1345 )?;
1346 (msg, "take_profit_market")
1347 }
1348 OrderType::LimitIfTouched => {
1350 let trigger_price = order.trigger_price().ok_or_else(|| {
1351 anyhow::anyhow!("Take profit limit order missing trigger_price")
1352 })?;
1353 let limit_price = order.price().ok_or_else(|| {
1354 anyhow::anyhow!("Take profit limit order missing limit price")
1355 })?;
1356 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1357 let msg = order_builder.build_take_profit_limit_order(
1358 instrument_id,
1359 client_id_u32,
1360 client_metadata,
1361 order.order_side(),
1362 trigger_price,
1363 limit_price,
1364 order.quantity(),
1365 order.time_in_force(),
1366 order.is_post_only(),
1367 order.is_reduce_only(),
1368 cond_expire,
1369 )?;
1370 (msg, "take_profit_limit")
1371 }
1372 _ => unreachable!("Order type already validated"),
1373 };
1374
1375 let operation = format!("Submit {order_type_str} order {client_order_id}");
1378
1379 if order_flags == types::ORDER_FLAG_SHORT_TERM {
1380 broadcaster
1381 .broadcast_short_term(&tx_manager, vec![msg], &operation)
1382 .await?;
1383 } else {
1384 broadcaster
1385 .broadcast_with_retry(&tx_manager, vec![msg], &operation)
1386 .await?;
1387 }
1388 log::debug!("Successfully submitted {order_type_str} order: {client_order_id}");
1389
1390 Ok(())
1391 },
1392 );
1393
1394 Ok(())
1395 }
1396
1397 fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1398 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1399 let order_count = orders.len();
1400
1401 if !self.is_connected() {
1403 let reason = "Cannot submit order list: execution client not connected";
1404 log::error!("{reason}");
1405 anyhow::bail!(reason);
1406 }
1407
1408 let current_block = self.block_time_monitor.current_block_height();
1410 if current_block == 0 {
1411 let reason = "Block height not initialized";
1412 log::warn!("Cannot submit order list: {reason}");
1413 let ts_event = self.clock.get_time_ns();
1415
1416 for order in &orders {
1417 self.emitter.emit_order_rejected_event(
1418 order.strategy_id(),
1419 order.instrument_id(),
1420 order.client_order_id(),
1421 reason,
1422 ts_event,
1423 false,
1424 );
1425 }
1426 return Ok(());
1427 }
1428
1429 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1431 Ok(components) => components,
1432 Err(e) => {
1433 log::error!("Failed to get execution components for batch: {e}");
1434 let ts_event = self.clock.get_time_ns();
1436
1437 for order in &orders {
1438 self.emitter.emit_order_rejected_event(
1439 order.strategy_id(),
1440 order.instrument_id(),
1441 order.client_order_id(),
1442 &e.to_string(),
1443 ts_event,
1444 false,
1445 );
1446 }
1447 return Ok(());
1448 }
1449 };
1450
1451 let mut order_params: Vec<LimitOrderParams> = Vec::with_capacity(order_count);
1453 let mut order_info: Vec<(ClientOrderId, InstrumentId, StrategyId)> =
1454 Vec::with_capacity(order_count);
1455
1456 for order in &orders {
1457 if order.order_type() != OrderType::Limit {
1459 log::warn!(
1460 "Order {} has type {:?}, falling back to individual submission",
1461 order.client_order_id(),
1462 order.order_type()
1463 );
1464 let submit_cmd = SubmitOrder::new(
1466 cmd.trader_id,
1467 cmd.client_id,
1468 cmd.strategy_id,
1469 order.instrument_id(),
1470 order.client_order_id(),
1471 order.init_event().clone(),
1472 cmd.exec_algorithm_id,
1473 cmd.position_id,
1474 cmd.params.clone(),
1475 UUID4::new(),
1476 cmd.ts_init,
1477 );
1478
1479 if let Err(e) = self.submit_order(submit_cmd) {
1480 log::error!(
1481 "Failed to submit order {} from order list: {e}",
1482 order.client_order_id()
1483 );
1484 }
1485 continue;
1486 }
1487
1488 let Some(price) = order.price() else {
1490 let ts_event = self.clock.get_time_ns();
1491 self.emitter.emit_order_rejected_event(
1492 order.strategy_id(),
1493 order.instrument_id(),
1494 order.client_order_id(),
1495 "Limit order missing price",
1496 ts_event,
1497 false,
1498 );
1499 continue;
1500 };
1501
1502 let encoded = match self.encoder.encode(order.client_order_id()) {
1504 Ok(enc) => enc,
1505 Err(e) => {
1506 log::error!("Failed to generate client order ID: {e}");
1507 let ts_event = self.clock.get_time_ns();
1508 self.emitter.emit_order_rejected_event(
1509 order.strategy_id(),
1510 order.instrument_id(),
1511 order.client_order_id(),
1512 &e.to_string(),
1513 ts_event,
1514 false,
1515 );
1516 continue;
1517 }
1518 };
1519 let client_id_u32 = encoded.client_id;
1520 let client_metadata = encoded.client_metadata;
1521
1522 self.emitter.emit_order_submitted(order);
1524
1525 let expire_time_secs = order.expire_time().map(nanos_to_secs_i64);
1527 let lifetime = types::OrderLifetime::from_time_in_force(
1528 order.time_in_force(),
1529 expire_time_secs,
1530 false,
1531 order_builder.max_short_term_secs(),
1532 );
1533
1534 let ts_submitted = self.clock.get_time_ns();
1536 self.register_order_context(
1537 client_id_u32,
1538 OrderContext {
1539 client_order_id: order.client_order_id(),
1540 trader_id: order.trader_id(),
1541 strategy_id: order.strategy_id(),
1542 instrument_id: order.instrument_id(),
1543 submitted_at: ts_submitted,
1544 order_flags: lifetime.order_flags(),
1545 },
1546 );
1547
1548 self.dispatch_state.order_identities.insert(
1550 order.client_order_id(),
1551 OrderIdentity {
1552 instrument_id: order.instrument_id(),
1553 strategy_id: order.strategy_id(),
1554 order_side: order.order_side(),
1555 order_type: order.order_type(),
1556 },
1557 );
1558
1559 order_params.push(LimitOrderParams {
1561 instrument_id: order.instrument_id(),
1562 client_order_id: client_id_u32,
1563 client_metadata,
1564 side: order.order_side(),
1565 price,
1566 quantity: order.quantity(),
1567 time_in_force: order.time_in_force(),
1568 post_only: order.is_post_only(),
1569 reduce_only: order.is_reduce_only(),
1570 expire_time_ns: order.expire_time(),
1571 });
1572 order_info.push((
1573 order.client_order_id(),
1574 order.instrument_id(),
1575 order.strategy_id(),
1576 ));
1577 }
1578
1579 if order_params.is_empty() {
1581 return Ok(());
1582 }
1583
1584 let has_short_term = order_params
1588 .iter()
1589 .any(|params| order_builder.is_short_term_order(params));
1590
1591 let block_height = current_block as u32;
1592 let emitter = self.emitter.clone();
1593 let clock = self.clock;
1594
1595 if has_short_term {
1596 log::debug!(
1598 "Submitting {} short-term limit orders concurrently (sequence not consumed)",
1599 order_params.len()
1600 );
1601
1602 let order_count = order_params.len();
1603
1604 let handle = get_runtime().spawn(async move {
1605 let mut handles = Vec::with_capacity(order_count);
1608
1609 for (params, (client_order_id, instrument_id, strategy_id)) in
1610 order_params.into_iter().zip(order_info)
1611 {
1612 let tx_manager = tx_manager.clone();
1613 let broadcaster = broadcaster.clone();
1614 let order_builder = order_builder.clone();
1615 let emitter = emitter.clone();
1616
1617 let handle = get_runtime().spawn(async move {
1618 let msg = match order_builder
1620 .build_limit_order_from_params(¶ms, block_height)
1621 {
1622 Ok(m) => m,
1623 Err(e) => {
1624 let error_msg = format!("Failed to build order message: {e:?}");
1625 log::error!("{error_msg}");
1626 let ts_event = clock.get_time_ns();
1627 emitter.emit_order_rejected_event(
1628 strategy_id,
1629 instrument_id,
1630 client_order_id,
1631 &error_msg,
1632 ts_event,
1633 false,
1634 );
1635 return;
1636 }
1637 };
1638
1639 let operation = format!("Submit short-term order {client_order_id}");
1641
1642 if let Err(e) = broadcaster
1643 .broadcast_short_term(&tx_manager, vec![msg], &operation)
1644 .await
1645 {
1646 let error_msg = format!("Order submission failed: {e:?}");
1647 log::error!("{error_msg}");
1648 let ts_event = clock.get_time_ns();
1649 emitter.emit_order_rejected_event(
1650 strategy_id,
1651 instrument_id,
1652 client_order_id,
1653 &error_msg,
1654 ts_event,
1655 false,
1656 );
1657 }
1658 });
1659
1660 handles.push(handle);
1661 }
1662
1663 for handle in handles {
1665 let _ = handle.await;
1666 }
1667 });
1668
1669 self.pending_tasks
1671 .lock()
1672 .expect(MUTEX_POISONED)
1673 .push(handle);
1674 } else {
1675 log::info!(
1677 "Batch submitting {} long-term limit orders in single transaction",
1678 order_params.len()
1679 );
1680
1681 let handle = get_runtime().spawn(async move {
1682 let msgs: Result<Vec<_>, _> = order_params
1684 .iter()
1685 .map(|params| order_builder.build_limit_order_from_params(params, block_height))
1686 .collect();
1687
1688 let msgs = match msgs {
1689 Ok(m) => m,
1690 Err(e) => {
1691 let error_msg = format!("Failed to build batch order messages: {e:?}");
1692 log::error!("{error_msg}");
1693 let ts_event = clock.get_time_ns();
1695
1696 for (client_order_id, instrument_id, strategy_id) in order_info {
1697 emitter.emit_order_rejected_event(
1698 strategy_id,
1699 instrument_id,
1700 client_order_id,
1701 &error_msg,
1702 ts_event,
1703 false,
1704 );
1705 }
1706 return;
1707 }
1708 };
1709
1710 let operation = format!("Submit batch of {} limit orders", msgs.len());
1712
1713 if let Err(e) = broadcaster
1714 .broadcast_with_retry(&tx_manager, msgs, &operation)
1715 .await
1716 {
1717 let error_msg = format!("Batch order submission failed: {e:?}");
1718 log::error!("{error_msg}");
1719
1720 let ts_event = clock.get_time_ns();
1722
1723 for (client_order_id, instrument_id, strategy_id) in order_info {
1724 emitter.emit_order_rejected_event(
1725 strategy_id,
1726 instrument_id,
1727 client_order_id,
1728 &error_msg,
1729 ts_event,
1730 false,
1731 );
1732 }
1733 }
1734 });
1735
1736 self.pending_tasks
1738 .lock()
1739 .expect(MUTEX_POISONED)
1740 .push(handle);
1741 }
1742
1743 Ok(())
1744 }
1745
1746 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1750 let reason = "dYdX does not support order modification. Use cancel and resubmit instead.";
1751 log::error!("{reason}");
1752
1753 self.send_modify_rejected(
1754 cmd.strategy_id,
1755 cmd.instrument_id,
1756 cmd.client_order_id,
1757 cmd.venue_order_id,
1758 reason,
1759 );
1760 Ok(())
1761 }
1762
1763 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1782 if !self.is_connected() {
1783 anyhow::bail!("Cannot cancel order: not connected");
1784 }
1785
1786 let client_order_id = cmd.client_order_id;
1787 let instrument_id = cmd.instrument_id;
1788 let strategy_id = cmd.strategy_id;
1789 let venue_order_id = cmd.venue_order_id;
1790
1791 let (order_time_in_force, order_expire_time) = {
1792 let cache = self.core.cache();
1793
1794 let order = match cache.order(&client_order_id) {
1795 Some(order) => order,
1796 None => {
1797 log::error!("Cannot cancel order {client_order_id}: not found in cache");
1798 return Ok(()); }
1800 };
1801
1802 if order.is_closed() {
1804 log::warn!(
1805 "CancelOrder command for {} when order already {} (will not send to exchange)",
1806 client_order_id,
1807 order.status()
1808 );
1809 return Ok(());
1810 }
1811
1812 if cache.instrument(&instrument_id).is_none() {
1814 log::error!(
1815 "Cannot cancel order {client_order_id}: instrument {instrument_id} not found in cache"
1816 );
1817 return Ok(()); }
1819
1820 (
1822 order.time_in_force(),
1823 order.expire_time().map(nanos_to_secs_i64),
1824 )
1825 }; log::debug!("Cancelling order {client_order_id} for instrument {instrument_id}");
1828
1829 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1831 Ok(components) => components,
1832 Err(e) => {
1833 log::error!("Failed to get execution components for cancel: {e}");
1834 return Ok(());
1835 }
1836 };
1837
1838 let block_height = self.block_time_monitor.current_block_height() as u32;
1839
1840 let encoded = match self.encoder.get(&client_order_id) {
1842 Some(enc) => enc,
1843 None => {
1844 log::error!("Client order ID {client_order_id} not found in cache");
1845 anyhow::bail!("Client order ID not found in cache")
1846 }
1847 };
1848 let client_id_u32 = encoded.client_id;
1849
1850 log::info!(
1851 "[CANCEL_ORDER] Nautilus '{client_order_id}' -> dYdX u32={client_id_u32} | instrument={instrument_id}"
1852 );
1853
1854 let order_flags = self.get_order_context(client_id_u32).map_or_else(
1857 || {
1858 log::warn!(
1860 "Order context not found for {client_order_id}, deriving flags from order"
1861 );
1862 types::OrderLifetime::from_time_in_force(
1863 order_time_in_force, order_expire_time, false,
1866 order_builder.max_short_term_secs(),
1867 )
1868 .order_flags()
1869 },
1870 |ctx| ctx.order_flags,
1871 );
1872
1873 let clock = self.clock;
1874 let emitter = self.emitter.clone();
1875
1876 self.spawn_task("cancel_order", async move {
1877 let cancel_msg = match order_builder.build_cancel_order_with_flags(
1879 instrument_id,
1880 client_id_u32,
1881 order_flags,
1882 block_height,
1883 ) {
1884 Ok(msg) => msg,
1885 Err(e) => {
1886 log::error!("Failed to build cancel message for {client_order_id}: {e:?}");
1887 let ts_event = clock.get_time_ns();
1888 emitter.emit_order_cancel_rejected_event(
1889 strategy_id,
1890 instrument_id,
1891 client_order_id,
1892 venue_order_id,
1893 &format!("Cancel build failed: {e:?}"),
1894 ts_event,
1895 );
1896 return Ok(());
1897 }
1898 };
1899
1900 let cancel_op = format!("Cancel order {client_order_id}");
1902 let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
1903 broadcaster
1904 .broadcast_short_term(&tx_manager, vec![cancel_msg], &cancel_op)
1905 .await
1906 } else {
1907 broadcaster
1908 .broadcast_with_retry(&tx_manager, vec![cancel_msg], &cancel_op)
1909 .await
1910 };
1911
1912 match result {
1913 Ok(_) => {
1914 log::debug!("Successfully cancelled order: {client_order_id}");
1915 }
1916 Err(e) => {
1917 log::error!("Failed to cancel order {client_order_id}: {e:?}");
1918
1919 let ts_event = clock.get_time_ns();
1920 emitter.emit_order_cancel_rejected_event(
1921 strategy_id,
1922 instrument_id,
1923 client_order_id,
1924 venue_order_id,
1925 &format!("Cancel order failed: {e:?}"),
1926 ts_event,
1927 );
1928 }
1929 }
1930
1931 Ok(())
1932 });
1933
1934 Ok(())
1935 }
1936
1937 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1938 if !self.is_connected() {
1939 anyhow::bail!("Cannot cancel orders: not connected");
1940 }
1941
1942 let instrument_id = cmd.instrument_id;
1943 let order_side_filter = cmd.order_side;
1944
1945 let order_data: Vec<(ClientOrderId, TimeInForce, Option<UnixNanos>)> = {
1948 let cache = self.core.cache();
1949 cache
1950 .orders_open(None, None, None, None, None)
1951 .into_iter()
1952 .filter(|order| order.instrument_id() == instrument_id)
1953 .filter(|order| {
1954 order_side_filter == OrderSide::NoOrderSide
1955 || order.order_side() == order_side_filter
1956 })
1957 .map(|order| {
1958 (
1959 order.client_order_id(),
1960 order.time_in_force(),
1961 order.expire_time(),
1962 )
1963 })
1964 .collect()
1965 }; let short_term_count = order_data
1969 .iter()
1970 .filter(|(_, tif, _)| matches!(tif, TimeInForce::Ioc | TimeInForce::Fok))
1971 .count();
1972 let long_term_count = order_data.len() - short_term_count;
1973
1974 log::debug!(
1975 "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={instrument_id}, order_side={order_side_filter:?}",
1976 order_data.len(),
1977 short_term_count,
1978 long_term_count
1979 );
1980
1981 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1983 Ok(components) => components,
1984 Err(e) => {
1985 log::error!("Failed to get execution components for cancel_all: {e}");
1986 return Ok(());
1987 }
1988 };
1989
1990 let block_height = self.block_time_monitor.current_block_height() as u32;
1991
1992 let mut orders_to_cancel = Vec::new();
1995
1996 for (client_order_id, _time_in_force, _expire_time) in &order_data {
1997 let Some(encoded) = self.encoder.get(client_order_id) else {
1998 log::warn!("Cannot cancel order {client_order_id}: not found in encoder");
1999 continue;
2000 };
2001 let client_id_u32 = encoded.client_id;
2002
2003 let Some(ctx) = self.get_order_context(client_id_u32) else {
2005 log::debug!(
2006 "Skipping cancel for {client_order_id}: order context already cleaned up (terminal)"
2007 );
2008 continue;
2009 };
2010 orders_to_cancel.push((instrument_id, client_id_u32, ctx.order_flags));
2011 }
2012
2013 if orders_to_cancel.is_empty() {
2014 return Ok(());
2015 }
2016
2017 log::debug!(
2018 "Cancel all: {} orders (short_term={}, long_term={}), instrument_id={instrument_id}, order_side={order_side_filter:?}",
2019 orders_to_cancel.len(),
2020 orders_to_cancel
2021 .iter()
2022 .filter(|(_, _, f)| *f == types::ORDER_FLAG_SHORT_TERM)
2023 .count(),
2024 orders_to_cancel
2025 .iter()
2026 .filter(|(_, _, f)| *f != types::ORDER_FLAG_SHORT_TERM)
2027 .count(),
2028 );
2029
2030 self.spawn_task("cancel_all_orders", async move {
2031 broadcast_partitioned_cancels(
2032 orders_to_cancel,
2033 block_height,
2034 tx_manager,
2035 broadcaster,
2036 order_builder,
2037 )
2038 .await
2039 });
2040
2041 Ok(())
2042 }
2043
2044 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
2045 if cmd.cancels.is_empty() {
2046 return Ok(());
2047 }
2048
2049 if !self.is_connected() {
2050 anyhow::bail!("Cannot cancel orders: not connected");
2051 }
2052
2053 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
2055 Ok(components) => components,
2056 Err(e) => {
2057 log::error!("Failed to get execution components for batch cancel: {e}");
2058 return Ok(());
2059 }
2060 };
2061
2062 let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
2064 for cancel in &cmd.cancels {
2065 let client_order_id = cancel.client_order_id;
2066 let encoded = match self.encoder.get(&client_order_id) {
2067 Some(enc) => enc,
2068 None => {
2069 log::warn!(
2070 "No u32 mapping found for client_order_id={client_order_id}, skipping cancel"
2071 );
2072 continue;
2073 }
2074 };
2075 let client_id_u32 = encoded.client_id;
2076
2077 let Some(ctx) = self.get_order_context(client_id_u32) else {
2079 log::debug!(
2080 "Skipping cancel for {client_order_id}: order context already cleaned up (terminal)"
2081 );
2082 continue;
2083 };
2084
2085 orders_to_cancel.push((cancel.instrument_id, client_id_u32, ctx.order_flags));
2086 }
2087
2088 if orders_to_cancel.is_empty() {
2089 log::warn!("No valid orders to cancel in batch");
2090 return Ok(());
2091 }
2092
2093 let block_height = self.block_time_monitor.current_block_height() as u32;
2094
2095 log::debug!(
2096 "Batch cancelling {} orders via partitioned strategy",
2097 orders_to_cancel.len(),
2098 );
2099
2100 self.spawn_task("batch_cancel_orders", async move {
2101 broadcast_partitioned_cancels(
2102 orders_to_cancel,
2103 block_height,
2104 tx_manager,
2105 broadcaster,
2106 order_builder,
2107 )
2108 .await
2109 });
2110
2111 Ok(())
2112 }
2113
2114 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
2115 let http_client = self.http_client.clone();
2116 let wallet_address = self.wallet_address.clone();
2117 let subaccount_number = self.subaccount_number;
2118 let account_id = self.core.account_id;
2119 let emitter = self.emitter.clone();
2120
2121 self.spawn_task("query_account", async move {
2122 let account_state = http_client
2123 .request_account_state(&wallet_address, subaccount_number, account_id)
2124 .await
2125 .context("failed to query account state")?;
2126
2127 emitter.emit_account_state(
2128 account_state.balances.clone(),
2129 account_state.margins.clone(),
2130 account_state.is_reported,
2131 account_state.ts_event,
2132 );
2133 Ok(())
2134 });
2135
2136 Ok(())
2137 }
2138
2139 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
2140 log::debug!("Querying order: client_order_id={}", cmd.client_order_id);
2141
2142 let http_client = self.http_client.clone();
2143 let wallet_address = self.wallet_address.clone();
2144 let subaccount_number = self.subaccount_number;
2145 let account_id = self.core.account_id;
2146 let emitter = self.emitter.clone();
2147 let client_order_id = cmd.client_order_id;
2148 let venue_order_id = cmd.venue_order_id;
2149 let instrument_id = cmd.instrument_id;
2150
2151 self.spawn_task("query_order", async move {
2152 let reports = http_client
2153 .request_order_status_reports(
2154 &wallet_address,
2155 subaccount_number,
2156 account_id,
2157 Some(instrument_id),
2158 )
2159 .await
2160 .context("failed to query order status")?;
2161
2162 let report = reports.into_iter().find(|r| {
2164 if venue_order_id.is_some_and(|vid| r.venue_order_id == vid) {
2165 return true;
2166 }
2167 r.client_order_id.is_some_and(|cid| cid == client_order_id)
2168 });
2169
2170 if let Some(report) = report {
2171 emitter.send_order_status_report(report);
2172 } else {
2173 log::warn!(
2174 "No order found for client_order_id={client_order_id}, venue_order_id={venue_order_id:?}"
2175 );
2176 }
2177
2178 Ok(())
2179 });
2180
2181 Ok(())
2182 }
2183
2184 async fn connect(&mut self) -> anyhow::Result<()> {
2185 if self.core.is_connected() {
2186 log::warn!("dYdX execution client already connected");
2187 return Ok(());
2188 }
2189
2190 log::info!("Connecting to dYdX");
2191
2192 log::debug!("Loading instruments from HTTP API");
2193 self.http_client.fetch_and_cache_instruments().await?;
2194 log::debug!(
2195 "Loaded {} instruments from HTTP into shared cache",
2196 self.http_client.cached_instruments_count()
2197 );
2198 self.mark_instruments_initialized();
2199
2200 let grpc_urls = self.config.get_grpc_urls();
2202 let mut grpc_client = DydxGrpcClient::new_with_fallback(&grpc_urls)
2203 .await
2204 .context("failed to construct dYdX gRPC client")?;
2205 log::debug!("gRPC client initialized");
2206
2207 let initial_height = grpc_client
2209 .latest_block_height()
2210 .await
2211 .context("failed to fetch initial block height")?;
2212 self.block_time_monitor
2214 .record_block(initial_height.0 as u64, chrono::Utc::now());
2215 log::info!("Initial block height: {}", initial_height.0);
2216
2217 *self.grpc_client.write().await = Some(grpc_client.clone());
2218
2219 let private_key =
2221 Self::resolve_private_key(&self.config).context("failed to resolve private key")?;
2222 let tx_manager = Arc::new(
2223 TransactionManager::new(
2224 grpc_client.clone(),
2225 &private_key,
2226 self.wallet_address.clone(),
2227 self.get_chain_id(),
2228 )
2229 .context("failed to create TransactionManager")?,
2230 );
2231
2232 tx_manager
2233 .resolve_authenticators()
2234 .await
2235 .context("failed to resolve authenticators")?;
2236
2237 tx_manager
2240 .initialize_sequence()
2241 .await
2242 .context("failed to initialize sequence")?;
2243
2244 self.tx_manager = Some(tx_manager);
2245 self.broadcaster = Some(Arc::new(TxBroadcaster::new(
2246 grpc_client,
2247 self.config.grpc_quota(),
2248 )));
2249 self.order_builder = Some(Arc::new(OrderMessageBuilder::new(
2250 self.http_client.clone(),
2251 self.wallet_address.clone(),
2252 self.subaccount_number,
2253 self.block_time_monitor.clone(),
2254 )));
2255 log::debug!(
2256 "OrderMessageBuilder initialized (block_time_monitor ready: {}, max_short_term: {:.1}s)",
2257 self.block_time_monitor.is_ready(),
2258 SHORT_TERM_ORDER_MAXIMUM_LIFETIME as f64
2259 * self.block_time_monitor.seconds_per_block_or_default()
2260 );
2261
2262 self.ws_client.connect().await?;
2264 log::debug!("WebSocket connected");
2265
2266 self.ws_client.subscribe_block_height().await?;
2268 log::debug!("Subscribed to block height updates");
2269
2270 self.ws_client.subscribe_markets().await?;
2272 log::debug!("Subscribed to markets");
2273
2274 log::info!(
2276 "Using wallet address for queries: {} (subaccount {})",
2277 self.wallet_address,
2278 self.subaccount_number
2279 );
2280 self.ws_client
2281 .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
2282 .await?;
2283 log::debug!(
2284 "Subscribed to subaccount updates: {}/{}",
2285 self.wallet_address,
2286 self.subaccount_number
2287 );
2288
2289 let stream = self.ws_client.stream();
2290 self.spawn_ws_stream_handler(stream);
2291
2292 self.await_account_registered(30.0).await?;
2296
2297 self.core.set_connected();
2298 log::info!("Connected: client_id={}", self.core.client_id);
2299 Ok(())
2300 }
2301
2302 async fn disconnect(&mut self) -> anyhow::Result<()> {
2303 if self.core.is_disconnected() {
2304 log::warn!("dYdX execution client not connected");
2305 return Ok(());
2306 }
2307
2308 log::info!("Disconnecting from dYdX");
2309
2310 let _ = self
2312 .ws_client
2313 .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
2314 .await
2315 .map_err(|e| log::warn!("Failed to unsubscribe from subaccount: {e}"));
2316
2317 let _ = self
2319 .ws_client
2320 .unsubscribe_markets()
2321 .await
2322 .map_err(|e| log::warn!("Failed to unsubscribe from markets: {e}"));
2323
2324 let _ = self
2326 .ws_client
2327 .unsubscribe_block_height()
2328 .await
2329 .map_err(|e| log::warn!("Failed to unsubscribe from block height: {e}"));
2330
2331 self.ws_client.disconnect().await?;
2333
2334 if let Some(handle) = self.ws_stream_handle.take() {
2336 handle.abort();
2337 log::debug!("Aborted WebSocket message processing task");
2338 }
2339
2340 self.abort_pending_tasks();
2342
2343 self.core.set_disconnected();
2344 log::info!("Disconnected: client_id={}", self.core.client_id);
2345 Ok(())
2346 }
2347
2348 async fn generate_order_status_report(
2349 &self,
2350 cmd: &GenerateOrderStatusReport,
2351 ) -> anyhow::Result<Option<OrderStatusReport>> {
2352 const ORDER_LOOKUP_LIMIT: u32 = 1_000;
2357
2358 let market = cmd
2360 .instrument_id
2361 .map(|id| id.symbol.as_str().trim_end_matches("-PERP").to_string());
2362
2363 let response = self
2364 .http_client
2365 .inner
2366 .get_orders(
2367 &self.wallet_address,
2368 self.subaccount_number,
2369 market.as_deref(),
2370 Some(ORDER_LOOKUP_LIMIT),
2371 )
2372 .await
2373 .context("failed to fetch order from dYdX API")?;
2374
2375 if response.is_empty() {
2376 log::debug!(
2377 "No orders returned for {}/subaccount={} (market_filter={:?})",
2378 self.wallet_address,
2379 self.subaccount_number,
2380 market,
2381 );
2382 return Ok(None);
2383 }
2384
2385 let ts_init = UnixNanos::default();
2386 let scanned_count = response.len();
2387
2388 let report = find_matching_order_report(
2389 &response,
2390 cmd.instrument_id,
2391 cmd.client_order_id,
2392 cmd.venue_order_id,
2393 |clob_pair_id| self.get_instrument_by_clob_pair_id(clob_pair_id),
2394 &self.encoder,
2395 self.core.account_id,
2396 ts_init,
2397 )?;
2398
2399 if report.is_none() {
2400 let page_full = scanned_count == ORDER_LOOKUP_LIMIT as usize;
2404 log::debug!(
2405 "No order matched filters for {}/subaccount={} \
2406 (client_order_id={:?}, venue_order_id={:?}, instrument_id={:?}, \
2407 scanned={scanned_count}, page_full={page_full}, limit={ORDER_LOOKUP_LIMIT})",
2408 self.wallet_address,
2409 self.subaccount_number,
2410 cmd.client_order_id,
2411 cmd.venue_order_id,
2412 cmd.instrument_id,
2413 );
2414 }
2415
2416 Ok(report)
2417 }
2418
2419 async fn generate_order_status_reports(
2420 &self,
2421 cmd: &GenerateOrderStatusReports,
2422 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2423 let response = self
2425 .http_client
2426 .inner
2427 .get_orders(
2428 &self.wallet_address,
2429 self.subaccount_number,
2430 None, None, )
2433 .await
2434 .context("failed to fetch orders from dYdX API")?;
2435
2436 let mut reports = Vec::new();
2437 let ts_init = UnixNanos::default();
2438
2439 for order in response {
2440 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2441 Some(inst) => inst,
2442 None => continue,
2443 };
2444
2445 if let Some(filter_id) = cmd.instrument_id
2446 && instrument.id() != filter_id
2447 {
2448 continue;
2449 }
2450
2451 match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init) {
2452 Ok(mut r) => {
2453 if !order.client_id.is_empty()
2454 && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2455 {
2456 self.encoder.register_known_client_id(client_id_u32);
2457
2458 if let Some(decoded) = self
2459 .encoder
2460 .decode_if_known(client_id_u32, order.client_metadata)
2461 {
2462 log::debug!(
2463 "Decoded order: dYdX client_id={} meta={:#x} -> '{}'",
2464 client_id_u32,
2465 order.client_metadata,
2466 decoded,
2467 );
2468 r.client_order_id = Some(decoded);
2469 }
2470 }
2471 reports.push(r);
2472 }
2473 Err(e) => {
2474 log::warn!("Failed to parse order status report: {e}");
2475 }
2476 }
2477 }
2478
2479 if cmd.open_only {
2481 reports.retain(|r| r.order_status.is_open());
2482 }
2483
2484 if let Some(start) = cmd.start {
2486 reports.retain(|r| r.ts_last >= start);
2487 }
2488
2489 if let Some(end) = cmd.end {
2490 reports.retain(|r| r.ts_last <= end);
2491 }
2492
2493 Ok(reports)
2494 }
2495
2496 async fn generate_fill_reports(
2497 &self,
2498 cmd: GenerateFillReports,
2499 ) -> anyhow::Result<Vec<FillReport>> {
2500 let response = self
2501 .http_client
2502 .inner
2503 .get_fills(
2504 &self.wallet_address,
2505 self.subaccount_number,
2506 None, None, )
2509 .await
2510 .context("failed to fetch fills from dYdX API")?;
2511
2512 let mut reports = Vec::new();
2513 let ts_init = UnixNanos::default();
2514
2515 for fill in response.fills {
2516 let instrument = match self.get_instrument_by_market(&fill.market) {
2517 Some(inst) => inst,
2518 None => {
2519 log::warn!("Unknown market in fill: {}", fill.market);
2520 continue;
2521 }
2522 };
2523
2524 if let Some(filter_id) = cmd.instrument_id
2525 && instrument.id() != filter_id
2526 {
2527 continue;
2528 }
2529
2530 let report = match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init)
2531 {
2532 Ok(r) => r,
2533 Err(e) => {
2534 log::warn!("Failed to parse fill report: {e}");
2535 continue;
2536 }
2537 };
2538
2539 reports.push(report);
2540 }
2541
2542 if let Some(venue_order_id) = cmd.venue_order_id {
2543 reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
2544 }
2545
2546 Ok(reports)
2547 }
2548
2549 async fn generate_position_status_reports(
2550 &self,
2551 cmd: &GeneratePositionStatusReports,
2552 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2553 let response = self
2555 .http_client
2556 .inner
2557 .get_subaccount(&self.wallet_address, self.subaccount_number)
2558 .await
2559 .context("failed to fetch subaccount from dYdX API")?;
2560
2561 let mut reports = Vec::new();
2562 let ts_init = UnixNanos::default();
2563
2564 for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
2565 let instrument = match self.get_instrument_by_market(market_ticker) {
2566 Some(inst) => inst,
2567 None => {
2568 log::warn!("Unknown market in position: {market_ticker}");
2569 continue;
2570 }
2571 };
2572
2573 if let Some(filter_id) = cmd.instrument_id
2574 && instrument.id() != filter_id
2575 {
2576 continue;
2577 }
2578
2579 let report = match parse_position_status_report(
2580 perp_position,
2581 &instrument,
2582 self.core.account_id,
2583 ts_init,
2584 ) {
2585 Ok(r) => r,
2586 Err(e) => {
2587 log::warn!("Failed to parse position status report: {e}");
2588 continue;
2589 }
2590 };
2591
2592 reports.push(report);
2593 }
2594
2595 Ok(reports)
2596 }
2597
2598 async fn generate_mass_status(
2599 &self,
2600 lookback_mins: Option<u64>,
2601 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
2602 let ts_init = UnixNanos::default();
2603
2604 let orders_response = self
2606 .http_client
2607 .inner
2608 .get_orders(&self.wallet_address, self.subaccount_number, None, None)
2609 .await
2610 .context("failed to fetch orders for mass status")?;
2611
2612 let subaccount_response = self
2614 .http_client
2615 .inner
2616 .get_subaccount(&self.wallet_address, self.subaccount_number)
2617 .await
2618 .context("failed to fetch subaccount for mass status")?;
2619
2620 let fills_response = self
2622 .http_client
2623 .inner
2624 .get_fills(&self.wallet_address, self.subaccount_number, None, None)
2625 .await
2626 .context("failed to fetch fills for mass status")?;
2627
2628 let mut order_reports = Vec::new();
2630 let mut orders_filtered = 0usize;
2631
2632 for order in orders_response {
2633 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2634 Some(inst) => inst,
2635 None => {
2636 orders_filtered += 1;
2637 continue;
2638 }
2639 };
2640
2641 match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init) {
2642 Ok(mut r) => {
2643 if !order.client_id.is_empty()
2644 && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2645 {
2646 self.encoder.register_known_client_id(client_id_u32);
2647
2648 if let Some(decoded) = self
2649 .encoder
2650 .decode_if_known(client_id_u32, order.client_metadata)
2651 {
2652 log::debug!(
2653 "Decoded reconciliation order: dYdX client_id={} meta={:#x} -> '{}'",
2654 client_id_u32,
2655 order.client_metadata,
2656 decoded,
2657 );
2658 r.client_order_id = Some(decoded);
2659 }
2660 }
2661 order_reports.push(r);
2662 }
2663 Err(e) => {
2664 log::warn!("Failed to parse order status report: {e}");
2665 orders_filtered += 1;
2666 }
2667 }
2668 }
2669
2670 let mut position_reports = Vec::new();
2672
2673 for (market_ticker, perp_position) in
2674 &subaccount_response.subaccount.open_perpetual_positions
2675 {
2676 let instrument = match self.get_instrument_by_market(market_ticker) {
2677 Some(inst) => inst,
2678 None => continue,
2679 };
2680
2681 match parse_position_status_report(
2682 perp_position,
2683 &instrument,
2684 self.core.account_id,
2685 ts_init,
2686 ) {
2687 Ok(r) => position_reports.push(r),
2688 Err(e) => {
2689 log::warn!("Failed to parse position status report: {e}");
2690 }
2691 }
2692 }
2693
2694 let mut fill_reports = Vec::new();
2696 let mut fills_filtered = 0usize;
2697
2698 for fill in fills_response.fills {
2699 let instrument = match self.get_instrument_by_market(&fill.market) {
2700 Some(inst) => inst,
2701 None => {
2702 fills_filtered += 1;
2703 continue;
2704 }
2705 };
2706
2707 match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init) {
2708 Ok(r) => fill_reports.push(r),
2709 Err(e) => {
2710 log::warn!("Failed to parse fill report: {e}");
2711 fills_filtered += 1;
2712 }
2713 }
2714 }
2715
2716 apply_avg_px_from_fills(&mut order_reports, &fill_reports);
2717
2718 if let Some(mins) = lookback_mins {
2720 let now_ns = self.clock.get_time_ns();
2721 let cutoff_ns = now_ns.as_u64().saturating_sub(mins * 60 * 1_000_000_000);
2722 let cutoff = UnixNanos::from(cutoff_ns);
2723
2724 let orders_before = order_reports.len();
2725 order_reports.retain(|r| r.ts_last >= cutoff);
2726 let orders_removed = orders_before - order_reports.len();
2727
2728 let fills_before = fill_reports.len();
2729 fill_reports.retain(|r| r.ts_event >= cutoff);
2730 let fills_removed = fills_before - fill_reports.len();
2731
2732 log::info!(
2733 "Lookback filter ({}min): orders {}->{} (removed {}), fills {}->{} (removed {}), positions {} (unfiltered)",
2734 mins,
2735 orders_before,
2736 order_reports.len(),
2737 orders_removed,
2738 fills_before,
2739 fill_reports.len(),
2740 fills_removed,
2741 position_reports.len(),
2742 );
2743 } else {
2744 log::debug!(
2745 "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
2746 order_reports.len(),
2747 orders_filtered,
2748 position_reports.len(),
2749 fill_reports.len(),
2750 fills_filtered,
2751 );
2752 }
2753
2754 let mut mass_status = ExecutionMassStatus::new(
2756 self.core.client_id,
2757 self.core.account_id,
2758 self.core.venue,
2759 ts_init,
2760 None, );
2762
2763 mass_status.add_order_reports(order_reports);
2764 mass_status.add_position_reports(position_reports);
2765 mass_status.add_fill_reports(fill_reports);
2766
2767 Ok(Some(mass_status))
2768 }
2769}
2770
2771#[allow(clippy::too_many_arguments)]
2775fn find_matching_order_report<F>(
2776 orders: &[crate::http::models::Order],
2777 instrument_filter: Option<InstrumentId>,
2778 client_order_id_filter: Option<ClientOrderId>,
2779 venue_order_id_filter: Option<VenueOrderId>,
2780 lookup_instrument: F,
2781 encoder: &ClientOrderIdEncoder,
2782 account_id: AccountId,
2783 ts_init: UnixNanos,
2784) -> anyhow::Result<Option<OrderStatusReport>>
2785where
2786 F: Fn(u32) -> Option<InstrumentAny>,
2787{
2788 for order in orders {
2789 let instrument = match lookup_instrument(order.clob_pair_id) {
2790 Some(inst) => inst,
2791 None => continue,
2792 };
2793
2794 if let Some(filter_id) = instrument_filter
2795 && instrument.id() != filter_id
2796 {
2797 continue;
2798 }
2799
2800 let mut report = parse_order_status_report(order, &instrument, account_id, ts_init)
2801 .context("failed to parse order status report")?;
2802
2803 if !order.client_id.is_empty()
2804 && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2805 {
2806 encoder.register_known_client_id(client_id_u32);
2807
2808 if let Some(decoded) = encoder.decode_if_known(client_id_u32, order.client_metadata) {
2809 log::debug!(
2810 "Decoded order: dYdX client_id={} meta={:#x} -> '{}'",
2811 client_id_u32,
2812 order.client_metadata,
2813 decoded,
2814 );
2815 report.client_order_id = Some(decoded);
2816 }
2817 }
2818
2819 if let Some(client_order_id) = client_order_id_filter
2820 && report.client_order_id != Some(client_order_id)
2821 {
2822 continue;
2823 }
2824
2825 if let Some(venue_order_id) = venue_order_id_filter
2826 && report.venue_order_id.as_str() != venue_order_id.as_str()
2827 {
2828 continue;
2829 }
2830
2831 return Ok(Some(report));
2832 }
2833
2834 Ok(None)
2835}
2836
#[cfg(test)]
mod tests {
    use nautilus_model::{
        enums::OrderSide as NautilusOrderSide,
        identifiers::Symbol,
        instruments::{CryptoPerpetual, InstrumentAny},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use rust_decimal_macros::dec;

    use super::*;
    use crate::{
        common::enums::{DydxOrderStatus, DydxOrderType, DydxTimeInForce},
        http::models::Order,
    };

    /// Creates a minimal crypto perpetual instrument for the given symbol and venue.
    fn test_instrument(symbol: &str, venue: &str) -> InstrumentAny {
        let instrument_id = InstrumentId::new(Symbol::new(symbol), Venue::new(venue));
        let perpetual = CryptoPerpetual::new(
            instrument_id,
            instrument_id.symbol,
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            2,
            3,
            Price::new(0.01, 2),
            Quantity::new(0.001, 3),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Creates an open limit buy order that differs only in the supplied fields.
    fn test_order(id: &str, clob_pair_id: u32, client_id: &str) -> Order {
        Order {
            id: id.to_string(),
            subaccount_id: "sub-1".to_string(),
            client_id: client_id.to_string(),
            clob_pair_id,
            side: NautilusOrderSide::Buy,
            size: dec!(1.0),
            total_filled: dec!(0),
            price: dec!(50000),
            status: DydxOrderStatus::Open,
            order_type: DydxOrderType::Limit,
            time_in_force: DydxTimeInForce::Gtt,
            reduce_only: false,
            post_only: false,
            order_flags: 64,
            good_til_block: None,
            good_til_block_time: None,
            created_at_height: Some(100),
            client_metadata: 4,
            trigger_price: None,
            condition_type: None,
            conditional_order_trigger_subticks: None,
            execution: None,
            updated_at: None,
            updated_at_height: None,
            ticker: None,
            subaccount_number: 0,
            order_router_address: None,
        }
    }

    /// Runs `find_matching_order_report` with a fresh encoder, a fixed account and a
    /// default timestamp, and no client order ID filter; panics if the lookup errors.
    fn run_lookup<F>(
        orders: &[Order],
        instrument_filter: Option<InstrumentId>,
        venue_order_id_filter: Option<VenueOrderId>,
        lookup_instrument: F,
    ) -> Option<OrderStatusReport>
    where
        F: Fn(u32) -> Option<InstrumentAny>,
    {
        let encoder = ClientOrderIdEncoder::new();
        find_matching_order_report(
            orders,
            instrument_filter,
            None,
            venue_order_id_filter,
            lookup_instrument,
            &encoder,
            AccountId::new("DYDX-001"),
            UnixNanos::default(),
        )
        .expect("lookup should succeed")
    }

    #[rstest]
    fn test_find_matching_order_report_returns_later_match() {
        let btc = test_instrument("BTC-USD-PERP", "DYDX");
        let eth = test_instrument("ETH-USD-PERP", "DYDX");
        // Index in this table doubles as the clob pair id: 0 = BTC, 1 = ETH.
        let by_clob_pair = [btc.clone(), eth.clone()];

        // The ETH order comes first, so the BTC match is only found by scanning on.
        let orders = [
            test_order("order-eth", 1, "22222"),
            test_order("order-btc", 0, "11111"),
        ];

        let found = run_lookup(&orders, Some(btc.id()), None, |clob_pair_id| {
            by_clob_pair.get(clob_pair_id as usize).cloned()
        })
        .expect("matching order should be found");

        assert_eq!(found.instrument_id, btc.id());
        assert_eq!(found.venue_order_id.as_str(), "order-btc");
    }

    #[rstest]
    fn test_find_matching_order_report_returns_none_when_no_match() {
        let btc = test_instrument("BTC-USD-PERP", "DYDX");
        let eth = test_instrument("ETH-USD-PERP", "DYDX");
        let by_clob_pair = [btc.clone(), eth.clone()];

        let orders = [
            test_order("order-eth-1", 1, "22222"),
            test_order("order-eth-2", 1, "33333"),
        ];

        let found = run_lookup(&orders, Some(btc.id()), None, |clob_pair_id| {
            by_clob_pair.get(clob_pair_id as usize).cloned()
        });

        assert!(found.is_none());
    }

    #[rstest]
    fn test_find_matching_order_report_filters_by_venue_order_id() {
        let btc = test_instrument("BTC-USD-PERP", "DYDX");

        let orders = [
            test_order("order-a", 0, "11111"),
            test_order("order-b", 0, "22222"),
            test_order("order-c", 0, "33333"),
        ];

        let wanted = VenueOrderId::new("order-b");
        let found = run_lookup(&orders, None, Some(wanted), |_| Some(btc.clone()))
            .expect("matching order should be found");

        assert_eq!(found.venue_order_id.as_str(), "order-b");
    }

    #[rstest]
    fn test_find_matching_order_report_skips_orders_without_cached_instrument() {
        let btc = test_instrument("BTC-USD-PERP", "DYDX");

        // Clob pair 99 has no instrument, so the first order must be skipped.
        let orders = [
            test_order("order-unknown", 99, "11111"),
            test_order("order-btc", 0, "22222"),
        ];

        let found = run_lookup(
            &orders,
            Some(btc.id()),
            None,
            |clob_pair_id| match clob_pair_id {
                0 => Some(btc.clone()),
                _ => None,
            },
        )
        .expect("matching order should be found");

        assert_eq!(found.venue_order_id.as_str(), "order-btc");
    }
}