Skip to main content

nautilus_dydx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the dYdX adapter.
17//!
18//! This module provides the execution client for submitting orders, cancellations,
19//! and managing positions on dYdX v4.
20//!
21//! # Order Types
22//!
23//! dYdX supports the following order types:
24//!
25//! - **Market**: Execute immediately at best available price.
26//! - **Limit**: Execute at specified price or better.
27//! - **Stop Market**: Triggered when price crosses stop price, then executes as market order.
28//! - **Stop Limit**: Triggered when price crosses stop price, then places limit order.
29//! - **Take Profit Market**: Close position at profit target, executes as market order.
30//! - **Take Profit Limit**: Close position at profit target, places limit order.
31//!
32//! See <https://docs.dydx.xyz/concepts/trading/orders#types> for details.
33//!
34//! # Order Lifetimes
35//!
36//! Orders can be short-term (expire by block height) or long-term/stateful (expire by timestamp).
37//! Conditional orders (Stop/TakeProfit) are always stateful.
38//!
39//! See <https://docs.dydx.xyz/concepts/trading/orders#short-term-vs-long-term> for details.
40
41use std::{
42    sync::{Arc, Mutex},
43    time::{Duration, Instant},
44};
45
46use ahash::AHashMap;
47use anyhow::Context;
48use async_trait::async_trait;
49use dashmap::DashMap;
50use futures_util::{Stream, StreamExt, pin_mut};
51use nautilus_common::{
52    clients::ExecutionClient,
53    live::{get_runtime, runner::get_exec_event_sender},
54    messages::execution::{
55        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
56        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
57        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
58    },
59};
60use nautilus_core::{
61    MUTEX_POISONED, UUID4, UnixNanos,
62    time::{AtomicTime, get_atomic_clock_realtime},
63};
64use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
65use nautilus_model::{
66    accounts::AccountAny,
67    enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType, TimeInForce},
68    events::{AccountState, OrderAccepted, OrderCanceled, OrderEventAny},
69    identifiers::{
70        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Symbol, Venue, VenueOrderId,
71    },
72    instruments::{Instrument, InstrumentAny},
73    orders::Order,
74    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
75    types::{AccountBalance, Currency, MarginBalance, Money},
76};
77use nautilus_network::retry::RetryConfig;
78use rust_decimal::Decimal;
79use tokio::task::JoinHandle;
80
81use crate::{
82    common::{
83        consts::DYDX_VENUE,
84        credential::{DydxCredential, credential_env_vars},
85        instrument_cache::InstrumentCache,
86        parse::nanos_to_secs_i64,
87    },
88    config::DydxAdapterConfig,
89    execution::{
90        broadcaster::TxBroadcaster,
91        encoder::ClientOrderIdEncoder,
92        order_builder::OrderMessageBuilder,
93        tx_manager::TransactionManager,
94        types::{LimitOrderParams, OrderContext},
95    },
96    grpc::{DydxGrpcClient, SHORT_TERM_ORDER_MAXIMUM_LIFETIME, types::ChainId},
97    http::{
98        client::DydxHttpClient,
99        parse::{
100            parse_account_state, parse_fill_report, parse_order_status_report,
101            parse_position_status_report,
102        },
103    },
104    websocket::{
105        DydxWsDispatchState, OrderIdentity,
106        client::DydxWebSocketClient,
107        enums::DydxWsOutputMessage,
108        fill_report_to_order_filled,
109        parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
110    },
111};
112
113pub mod block_time;
114pub mod broadcaster;
115pub mod encoder;
116pub mod order_builder;
117pub mod submitter;
118pub mod tx_manager;
119pub mod types;
120pub mod wallet;
121
122use block_time::BlockTimeMonitor;
123
124fn apply_avg_px_from_fills(order_reports: &mut [OrderStatusReport], fill_reports: &[FillReport]) {
125    let mut totals: AHashMap<VenueOrderId, (Decimal, Decimal)> = AHashMap::new();
126    for fill in fill_reports {
127        let entry = totals.entry(fill.venue_order_id).or_default();
128        let qty = fill.last_qty.as_decimal();
129        entry.0 += fill.last_px.as_decimal() * qty;
130        entry.1 += qty;
131    }
132
133    for report in order_reports {
134        if let Some((notional, total_qty)) = totals.get(&report.venue_order_id)
135            && !total_qty.is_zero()
136        {
137            report.avg_px = Some(notional / total_qty);
138        }
139    }
140}
141
/// Live execution client for the dYdX v4 exchange adapter.
///
/// Supports Market, Limit, Stop Market, Stop Limit, Take Profit Market (MarketIfTouched),
/// and Take Profit Limit (LimitIfTouched) orders via gRPC. Trailing stops are NOT supported
/// by the dYdX v4 protocol. dYdX requires u32 client IDs - strings are hashed to fit.
///
/// # Architecture
///
/// The client follows a two-layer execution model:
/// 1. **Synchronous validation** - Immediate checks and event generation.
/// 2. **Async submission** - Non-blocking gRPC calls via `TransactionManager`, `TxBroadcaster`, and `OrderMessageBuilder`.
///
/// This matches the pattern used in OKX and other exchange adapters, ensuring
/// consistent behavior across the Nautilus ecosystem.
#[derive(Debug)]
pub struct DydxExecutionClient {
    /// Shared execution client core; supplies the trader and account identifiers.
    core: ExecutionClientCore,
    /// Realtime atomic clock used to timestamp events and reports.
    clock: &'static AtomicTime,
    /// Adapter configuration (URLs, network, retry and credential settings).
    config: DydxAdapterConfig,
    /// Emits account states, order events and reports produced by this client.
    emitter: ExecutionEventEmitter,
    /// HTTP client; its instrument cache is the one stored in `instrument_cache`.
    http_client: DydxHttpClient,
    /// Private WebSocket client constructed with the resolved credential.
    ws_client: DydxWebSocketClient,
    /// gRPC client slot; `None` at construction.
    /// NOTE(review): presumably populated when the client connects - confirm.
    grpc_client: Arc<tokio::sync::RwLock<Option<DydxGrpcClient>>>,
    /// Instrument cache shared between the HTTP and WebSocket clients.
    instrument_cache: Arc<InstrumentCache>,
    /// Block time monitor.
    /// NOTE(review): usage is not visible in this part of the file - confirm.
    block_time_monitor: Arc<BlockTimeMonitor>,
    /// Latest oracle price per instrument, updated from WebSocket market messages
    /// and read when parsing account state.
    oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
    /// Encoder passed to the WebSocket order/fill parsers; entries are removed
    /// once an order reaches a terminal status.
    encoder: Arc<ClientOrderIdEncoder>,
    /// Tracks order identities and which orders already had Accepted/Filled events emitted.
    dispatch_state: Arc<DydxWsDispatchState>,
    /// Order contexts keyed by the u32 client ID.
    order_contexts: Arc<DashMap<u32, OrderContext>>,
    /// Maps a venue order ID string to its `(client_id, client_metadata)` pair.
    order_id_map: Arc<DashMap<String, (u32, u32)>>,
    /// Wallet address supplied at construction.
    wallet_address: String,
    /// Subaccount number supplied at construction.
    subaccount_number: u32,
    /// Transaction manager; `None` at construction.
    tx_manager: Option<Arc<TransactionManager>>,
    /// Transaction broadcaster; `None` at construction.
    broadcaster: Option<Arc<TxBroadcaster>>,
    /// Order message builder; `None` at construction.
    order_builder: Option<Arc<OrderMessageBuilder>>,
    /// Handle of the spawned WebSocket message-processing task, if one is running.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of spawned tasks.
    /// NOTE(review): presumably in-flight submissions awaited or aborted on shutdown - confirm.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
180
181impl DydxExecutionClient {
182    /// Creates a new [`DydxExecutionClient`].
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if credentials are not found or client fails to construct.
187    pub fn new(
188        core: ExecutionClientCore,
189        config: DydxAdapterConfig,
190        wallet_address: String,
191        subaccount_number: u32,
192    ) -> anyhow::Result<Self> {
193        let trader_id = core.trader_id;
194        let account_id = core.account_id;
195        let clock = get_atomic_clock_realtime();
196        let emitter =
197            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
198
199        let retry_config = RetryConfig {
200            max_retries: config.max_retries,
201            initial_delay_ms: config.retry_delay_initial_ms,
202            max_delay_ms: config.retry_delay_max_ms,
203            ..Default::default()
204        };
205        let http_client = DydxHttpClient::new(
206            Some(config.base_url.clone()),
207            config.timeout_secs,
208            config.proxy_url.clone(),
209            config.network,
210            Some(retry_config),
211        )?;
212
213        // Share the HTTP client's instrument cache with WebSocket client
214        let instrument_cache = http_client.instrument_cache().clone();
215
216        // Use private WebSocket client for authenticated subaccount subscriptions
217        let credential = DydxCredential::resolve(
218            config.private_key.as_deref(),
219            config.network,
220            config.authenticator_ids.clone(),
221        )?
222        .ok_or_else(|| anyhow::anyhow!("Credentials required for execution client"))?;
223
224        // Create WS client with shared instrument cache
225        let ws_client = DydxWebSocketClient::new_private_with_cache(
226            config.ws_url.clone(),
227            credential,
228            core.account_id,
229            instrument_cache.clone(),
230            Some(20),
231            config.transport_backend,
232            config.proxy_url.clone(),
233        );
234
235        let grpc_client = Arc::new(tokio::sync::RwLock::new(None));
236
237        Ok(Self {
238            core,
239            clock,
240            config,
241            emitter,
242            http_client,
243            ws_client,
244            grpc_client,
245            instrument_cache,
246            block_time_monitor: Arc::new(BlockTimeMonitor::new()),
247            oracle_prices: Arc::new(DashMap::new()),
248            encoder: Arc::new(ClientOrderIdEncoder::new()),
249            dispatch_state: Arc::new(DydxWsDispatchState::default()),
250            order_contexts: Arc::new(DashMap::new()),
251            order_id_map: Arc::new(DashMap::new()),
252            wallet_address,
253            subaccount_number,
254            tx_manager: None,
255            broadcaster: None,
256            order_builder: None,
257            ws_stream_handle: None,
258            pending_tasks: Mutex::new(Vec::new()),
259        })
260    }
261
262    fn resolve_private_key(config: &DydxAdapterConfig) -> anyhow::Result<String> {
263        let (private_key_env, _) = credential_env_vars(config.network);
264
265        // 1. Try private key from config
266        if let Some(ref pk) = config.private_key
267            && !pk.trim().is_empty()
268        {
269            return Ok(pk.clone());
270        }
271
272        // 2. Try private key from env var
273        if let Some(pk) = std::env::var(private_key_env)
274            .ok()
275            .filter(|s| !s.trim().is_empty())
276        {
277            return Ok(pk);
278        }
279
280        anyhow::bail!("{private_key_env} not found in config or environment")
281    }
282
283    fn register_order_context(&self, client_id_u32: u32, context: OrderContext) {
284        self.order_contexts.insert(client_id_u32, context);
285    }
286
287    fn get_order_context(&self, client_id_u32: u32) -> Option<OrderContext> {
288        self.order_contexts
289            .get(&client_id_u32)
290            .map(|r| r.value().clone())
291    }
292
293    fn get_chain_id(&self) -> ChainId {
294        self.config.get_chain_id()
295    }
296
297    fn spawn_ws_stream_handler(
298        &mut self,
299        stream: impl Stream<Item = DydxWsOutputMessage> + Send + 'static,
300    ) {
301        if self.ws_stream_handle.is_some() {
302            return;
303        }
304
305        log::debug!("Starting execution WebSocket message processing task");
306
307        // Clone data needed for account state parsing in spawned task
308        let trader_id = self.core.trader_id;
309        let account_id = self.core.account_id;
310        let instrument_cache = self.instrument_cache.clone();
311        let oracle_prices = self.oracle_prices.clone();
312        let encoder = self.encoder.clone();
313        let order_contexts = self.order_contexts.clone();
314        let order_id_map = self.order_id_map.clone();
315        let dispatch_state = self.dispatch_state.clone();
316        let block_time_monitor = self.block_time_monitor.clone();
317        let emitter = self.emitter.clone();
318        let clock = self.clock;
319
320        let handle = get_runtime().spawn(async move {
321            log::debug!("Execution WebSocket message loop started");
322
323            // Cumulative fill totals per untracked order for avg_px computation
324            let mut cum_fill_totals: AHashMap<VenueOrderId, (Decimal, Decimal)> =
325                AHashMap::new();
326
327            pin_mut!(stream);
328            while let Some(msg) = stream.next().await {
329                match msg {
330                    DydxWsOutputMessage::SubaccountSubscribed(msg) => {
331                        log::debug!("Parsing subaccount subscription with full context");
332
333                        let inst_map = instrument_cache.to_instrument_id_map();
334
335                        let oracle_map: std::collections::HashMap<_, _> = oracle_prices
336                            .iter()
337                            .map(|entry| (*entry.key(), *entry.value()))
338                            .collect();
339
340                        let ts_init = clock.get_time_ns();
341                        let ts_event = ts_init;
342
343                        if let Some(ref subaccount) = msg.contents.subaccount {
344                        match parse_account_state(
345                            subaccount,
346                            account_id,
347                            &inst_map,
348                            &oracle_map,
349                            ts_event,
350                            ts_init,
351                        ) {
352                            Ok(account_state) => {
353                                log::debug!(
354                                    "Parsed account state: {} balance(s), {} margin(s)",
355                                    account_state.balances.len(),
356                                    account_state.margins.len()
357                                );
358                                emitter.send_account_state(account_state);
359                            }
360                            Err(e) => {
361                                log::error!("Failed to parse account state: {e}");
362                            }
363                        }
364
365                        if let Some(ref positions) =
366                            subaccount.open_perpetual_positions
367                        {
368                            log::debug!(
369                                "Parsing {} position(s) from subscription",
370                                positions.len()
371                            );
372
373                            for (market, ws_position) in positions {
374                                match parse_ws_position_report(
375                                    ws_position,
376                                    &instrument_cache,
377                                    account_id,
378                                    ts_init,
379                                ) {
380                                    Ok(report) => {
381                                        log::debug!(
382                                            "Parsed position report: {} {} {} {}",
383                                            report.instrument_id,
384                                            report.position_side,
385                                            report.quantity,
386                                            market
387                                        );
388                                        emitter.send_position_report(report);
389                                    }
390                                    Err(e) => {
391                                        log::error!(
392                                            "Failed to parse WebSocket position for {market}: {e}"
393                                        );
394                                    }
395                                }
396                            }
397                        }
398                        } else {
399                            log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
400
401                            let currency = Currency::get_or_create_crypto_with_context("USDC", None);
402                            let zero = Money::zero(currency);
403                            let balance = AccountBalance::new_checked(zero, zero, zero)
404                                .expect("zero balance should always be valid");
405                            let account_state = AccountState::new(
406                                account_id,
407                                AccountType::Margin,
408                                vec![balance],
409                                vec![],
410                                true,
411                                UUID4::new(),
412                                ts_init,
413                                ts_init,
414                                None,
415                            );
416                            emitter.send_account_state(account_state);
417                        }
418                    }
419                    DydxWsOutputMessage::SubaccountsChannelData(data) => {
420                        log::debug!(
421                            "Processing subaccounts channel data (orders={:?}, fills={:?})",
422                            data.contents.orders.as_ref().map(|o| o.len()),
423                            data.contents.fills.as_ref().map(|f| f.len())
424                        );
425                        let ts_init = clock.get_time_ns();
426
427                        let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
428                        let mut pending_order_reports = Vec::new();
429
430                        // Phase 1: Parse orders and build order_id_map
431                        if let Some(ref orders) = data.contents.orders {
432                            for ws_order in orders {
433                                log::debug!(
434                                    "Parsing WS order: clob_pair_id={}, status={:?}, client_id={}",
435                                    ws_order.clob_pair_id,
436                                    ws_order.status,
437                                    ws_order.client_id
438                                );
439
440                                if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
441                                    let client_meta = ws_order.client_metadata
442                                        .as_ref()
443                                        .and_then(|s| s.parse::<u32>().ok())
444                                        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
445                                    order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
446                                }
447
448                                match parse_ws_order_report(
449                                    ws_order,
450                                    &instrument_cache,
451                                    &order_contexts,
452                                    &encoder,
453                                    account_id,
454                                    ts_init,
455                                ) {
456                                    Ok(report) => {
457                                        if !report.order_status.is_open()
458                                            && let Ok(cid) = ws_order.client_id.parse::<u32>()
459                                        {
460                                            let meta = ws_order.client_metadata
461                                                .as_ref()
462                                                .and_then(|s| s.parse::<u32>().ok())
463                                                .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
464                                            terminal_orders.push((cid, meta, ws_order.id.clone()));
465                                        }
466                                        log::debug!(
467                                            "Parsed order report: {} {} {:?} qty={} client_order_id={:?}",
468                                            report.instrument_id,
469                                            report.order_side,
470                                            report.order_status,
471                                            report.quantity,
472                                            report.client_order_id
473                                        );
474                                        pending_order_reports.push(report);
475                                    }
476                                    Err(e) => {
477                                        log::error!("Failed to parse WebSocket order: {e}");
478                                    }
479                                }
480                            }
481                        }
482
483                        // Phase 2: Process fills (sent before order status for correct reconciliation)
484                        if let Some(ref fills) = data.contents.fills {
485                            for ws_fill in fills {
486                                match parse_ws_fill_report(
487                                    ws_fill,
488                                    &instrument_cache,
489                                    &order_id_map,
490                                    &order_contexts,
491                                    &encoder,
492                                    account_id,
493                                    ts_init,
494                                ) {
495                                    Ok(report) => {
496                                        log::debug!(
497                                            "Parsed fill report: {} {} {} @ {} client_order_id={:?}",
498                                            report.instrument_id,
499                                            report.venue_order_id,
500                                            report.last_qty,
501                                            report.last_px,
502                                            report.client_order_id
503                                        );
504
505                                        let identity = report.client_order_id.and_then(|cid| {
506                                            dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
507                                        });
508
509                                        if let Some((cid, ident)) = identity {
510                                            // Tracked: synthesize OrderAccepted if not yet emitted
511                                            if !dispatch_state.emitted_accepted.contains(&cid) {
512                                                dispatch_state.insert_accepted(cid);
513                                                let accepted = OrderAccepted::new(
514                                                    trader_id,
515                                                    ident.strategy_id,
516                                                    ident.instrument_id,
517                                                    cid,
518                                                    report.venue_order_id,
519                                                    account_id,
520                                                    UUID4::new(),
521                                                    ts_init,
522                                                    ts_init,
523                                                    false,
524                                                );
525                                                emitter.send_order_event(OrderEventAny::Accepted(accepted));
526                                            }
527
528                                            dispatch_state.insert_filled(cid);
529                                            let instrument = instrument_cache.get(&report.instrument_id);
530                                            let quote_currency = instrument
531                                                .map_or_else(Currency::USD, |i: InstrumentAny| i.quote_currency());
532                                            let filled = fill_report_to_order_filled(
533                                                &report, trader_id, &ident, quote_currency,
534                                            );
535                                            emitter.send_order_event(OrderEventAny::Filled(filled));
536                                        } else {
537                                            // Untracked: track avg_px and emit report
538                                            let entry = cum_fill_totals
539                                                .entry(report.venue_order_id)
540                                                .or_default();
541                                            let qty = report.last_qty.as_decimal();
542                                            entry.0 += report.last_px.as_decimal() * qty;
543                                            entry.1 += qty;
544                                            emitter.send_fill_report(report);
545                                        }
546                                    }
547                                    Err(e) => {
548                                        log::error!("Failed to parse WebSocket fill: {e}");
549                                    }
550                                }
551                            }
552                        }
553
554                        // Phase 3: Process order status updates
555                        // Enrich untracked reports with avg_px from cumulative fills
556                        for report in &mut pending_order_reports {
557                            if let Some((notional, total_qty)) =
558                                cum_fill_totals.get(&report.venue_order_id)
559                                && !total_qty.is_zero()
560                            {
561                                report.avg_px = Some(notional / total_qty);
562                            }
563                        }
564
565                        for report in pending_order_reports {
566                            let identity = report.client_order_id.and_then(|cid| {
567                                dispatch_state.order_identities.get(&cid).map(|r| (cid, r.clone()))
568                            });
569
570                            if let Some((cid, ident)) = identity {
571                                // Tracked order: emit proper lifecycle events
572                                match report.order_status {
573                                    OrderStatus::Accepted => {
574                                        if dispatch_state.emitted_accepted.contains(&cid)
575                                            || dispatch_state.filled_orders.contains(&cid)
576                                        {
577                                            log::debug!("Skipping duplicate Accepted for {cid}");
578                                            continue;
579                                        }
580                                        dispatch_state.insert_accepted(cid);
581                                        let accepted = OrderAccepted::new(
582                                            trader_id,
583                                            ident.strategy_id,
584                                            ident.instrument_id,
585                                            cid,
586                                            report.venue_order_id,
587                                            account_id,
588                                            UUID4::new(),
589                                            report.ts_last,
590                                            ts_init,
591                                            false,
592                                        );
593                                        emitter.send_order_event(OrderEventAny::Accepted(accepted));
594                                    }
595                                    OrderStatus::Canceled => {
596                                        // Synthesize Accepted if not yet emitted
597                                        if !dispatch_state.emitted_accepted.contains(&cid) {
598                                            dispatch_state.insert_accepted(cid);
599                                            let accepted = OrderAccepted::new(
600                                                trader_id,
601                                                ident.strategy_id,
602                                                ident.instrument_id,
603                                                cid,
604                                                report.venue_order_id,
605                                                account_id,
606                                                UUID4::new(),
607                                                ts_init,
608                                                ts_init,
609                                                false,
610                                            );
611                                            emitter.send_order_event(OrderEventAny::Accepted(accepted));
612                                        }
613                                        let canceled = OrderCanceled::new(
614                                            trader_id,
615                                            ident.strategy_id,
616                                            ident.instrument_id,
617                                            cid,
618                                            UUID4::new(),
619                                            report.ts_last,
620                                            ts_init,
621                                            false,
622                                            Some(report.venue_order_id),
623                                            Some(account_id),
624                                        );
625                                        emitter.send_order_event(OrderEventAny::Canceled(canceled));
626                                        dispatch_state.cleanup_terminal(&cid);
627                                    }
628                                    OrderStatus::Filled => {
629                                        // Fills already emitted as OrderFilled in Phase 2
630                                        dispatch_state.cleanup_terminal(&cid);
631                                    }
632                                    _ => {
633                                        // PendingUpdate, PartiallyFilled, etc.
634                                        emitter.send_order_status_report(report);
635                                    }
636                                }
637                            } else {
638                                // Untracked order: emit report for reconciliation
639                                emitter.send_order_status_report(report);
640                            }
641                        }
642
643                        // Phase 4: Cleanup terminal order tracking state
644                        for (client_id, client_metadata, order_id) in terminal_orders {
645                            order_contexts.remove(&client_id);
646                            encoder.remove(client_id, client_metadata);
647                            order_id_map.remove(&order_id);
648                            cum_fill_totals.remove(&VenueOrderId::new(&order_id));
649                        }
650                    }
651                    DydxWsOutputMessage::Markets(contents) => {
652                        if let Some(ref oracle_map) = contents.oracle_prices {
653                            for (symbol_str, oracle_market) in oracle_map {
654                                let instrument_id = {
655                                    let symbol = format!("{symbol_str}-PERP");
656                                    InstrumentId::new(
657                                        Symbol::new(&symbol),
658                                        *crate::common::consts::DYDX_VENUE,
659                                    )
660                                };
661
662                                if instrument_cache.get(&instrument_id).is_some()
663                                    && let Ok(price_dec) = oracle_market.oracle_price.parse::<Decimal>()
664                                {
665                                    oracle_prices.insert(instrument_id, price_dec);
666                                    log::trace!("Updated oracle price for {instrument_id}: {price_dec}");
667                                }
668                            }
669                        }
670
671                        if let Some(ref markets) = contents.markets {
672                            for (symbol_str, market_data) in markets {
673                                if let Some(oracle_price_str) = &market_data.oracle_price {
674                                    let instrument_id = {
675                                        let symbol = format!("{symbol_str}-PERP");
676                                        InstrumentId::new(
677                                            Symbol::new(&symbol),
678                                            *crate::common::consts::DYDX_VENUE,
679                                        )
680                                    };
681
682                                    if instrument_cache.get(&instrument_id).is_some()
683                                        && let Ok(price_dec) = oracle_price_str.parse::<Decimal>()
684                                    {
685                                        oracle_prices.insert(instrument_id, price_dec);
686                                    }
687                                }
688                            }
689                        }
690                    }
691                    DydxWsOutputMessage::BlockHeight { height, time } => {
692                        log::debug!("Block height update: {height} at {time}");
693                        block_time_monitor.record_block(height, time);
694                    }
695                    DydxWsOutputMessage::Error(err) => {
696                        log::error!("WebSocket error: {err:?}");
697                    }
698                    DydxWsOutputMessage::Reconnected => {
699                        log::info!("WebSocket reconnected");
700                    }
701                    _ => {}
702                }
703            }
704            log::debug!("WebSocket message processing task ended");
705        });
706
707        self.ws_stream_handle = Some(handle);
708        log::info!("WebSocket stream handler started");
709    }
710
711    /// Marks instruments as initialized after HTTP client has fetched them.
712    ///
713    /// The instruments are stored in the shared `InstrumentCache` which is automatically
714    /// populated by the HTTP client during `fetch_and_cache_instruments()`.
715    fn mark_instruments_initialized(&self) {
716        let count = self.instrument_cache.len();
717        self.core.set_instruments_initialized();
718        log::debug!("Instruments initialized: {count} instruments in shared cache");
719    }
720
721    fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
722        self.instrument_cache.get_by_market(market)
723    }
724
725    fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
726        let instrument = self.instrument_cache.get_by_clob_id(clob_pair_id);
727
728        if instrument.is_none() {
729            self.instrument_cache.log_missing_clob_pair_id(clob_pair_id);
730        }
731
732        instrument
733    }
734
735    /// Gets the execution components, returning an error if not initialized.
736    ///
737    /// This should only be called after `connect()` has completed.
738    fn get_execution_components(
739        &self,
740    ) -> anyhow::Result<(
741        Arc<TransactionManager>,
742        Arc<TxBroadcaster>,
743        Arc<OrderMessageBuilder>,
744    )> {
745        let tx_manager = self
746            .tx_manager
747            .as_ref()
748            .ok_or_else(|| {
749                anyhow::anyhow!("TransactionManager not initialized - call connect() first")
750            })?
751            .clone();
752        let broadcaster = self
753            .broadcaster
754            .as_ref()
755            .ok_or_else(|| anyhow::anyhow!("TxBroadcaster not initialized - call connect() first"))?
756            .clone();
757        let order_builder = self
758            .order_builder
759            .as_ref()
760            .ok_or_else(|| {
761                anyhow::anyhow!("OrderMessageBuilder not initialized - call connect() first")
762            })?
763            .clone();
764        Ok((tx_manager, broadcaster, order_builder))
765    }
766
767    fn spawn_task<F>(&self, label: &'static str, fut: F)
768    where
769        F: Future<Output = anyhow::Result<()>> + Send + 'static,
770    {
771        let handle = get_runtime().spawn(async move {
772            if let Err(e) = fut.await {
773                log::error!("{label}: {e:?}");
774            }
775        });
776
777        self.pending_tasks
778            .lock()
779            .expect(MUTEX_POISONED)
780            .push(handle);
781    }
782
783    /// Spawns an order submission task with error handling and rejection generation.
784    ///
785    /// If the submission fails, generates an `OrderRejected` event with the error details.
786    fn spawn_order_task<F>(
787        &self,
788        label: &'static str,
789        strategy_id: StrategyId,
790        instrument_id: InstrumentId,
791        client_order_id: ClientOrderId,
792        fut: F,
793    ) where
794        F: Future<Output = anyhow::Result<()>> + Send + 'static,
795    {
796        let emitter = self.emitter.clone();
797        let clock = self.clock;
798
799        let handle = get_runtime().spawn(async move {
800            if let Err(e) = fut.await {
801                let error_msg = format!("{label} failed: {e:?}");
802                log::error!("{error_msg}");
803
804                let ts_event = clock.get_time_ns();
805                emitter.emit_order_rejected_event(
806                    strategy_id,
807                    instrument_id,
808                    client_order_id,
809                    &error_msg,
810                    ts_event,
811                    false,
812                );
813            }
814        });
815
816        self.pending_tasks
817            .lock()
818            .expect(MUTEX_POISONED)
819            .push(handle);
820    }
821
822    fn abort_pending_tasks(&self) {
823        let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
824        for handle in guard.drain(..) {
825            handle.abort();
826        }
827    }
828
829    /// Sends an OrderModifyRejected event.
830    fn send_modify_rejected(
831        &self,
832        strategy_id: StrategyId,
833        instrument_id: InstrumentId,
834        client_order_id: ClientOrderId,
835        venue_order_id: Option<VenueOrderId>,
836        reason: &str,
837    ) {
838        let ts_event = self.clock.get_time_ns();
839        self.emitter.emit_order_modify_rejected_event(
840            strategy_id,
841            instrument_id,
842            client_order_id,
843            venue_order_id,
844            reason,
845            ts_event,
846        );
847    }
848
849    /// Waits for the account to be registered in the cache.
850    ///
851    /// This method polls the cache until the account is registered, ensuring that
852    /// execution state reconciliation can process fills correctly (fills require
853    /// the account to be registered for portfolio updates).
854    ///
855    /// # Errors
856    ///
857    /// Returns an error if the account is not registered within the timeout period.
858    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
859        let account_id = self.core.account_id;
860
861        if self.core.cache().account(&account_id).is_some() {
862            log::info!("Account {account_id} registered");
863            return Ok(());
864        }
865
866        let start = Instant::now();
867        let timeout = Duration::from_secs_f64(timeout_secs);
868        let interval = Duration::from_millis(10);
869
870        loop {
871            tokio::time::sleep(interval).await;
872
873            if self.core.cache().account(&account_id).is_some() {
874                log::info!("Account {account_id} registered");
875                return Ok(());
876            }
877
878            if start.elapsed() >= timeout {
879                anyhow::bail!(
880                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
881                );
882            }
883        }
884    }
885}
886
887/// Broadcasts cancel orders with optimal partitioned strategy.
888///
889/// Partitions orders into short-term and long-term/conditional groups:
890/// - Short-term → single `MsgBatchCancel` via `broadcast_short_term()`
891/// - Long-term/conditional → batched `MsgCancelOrder` via `broadcast_with_retry()`
892///
893/// At most 2 gRPC calls regardless of order count or mix.
894async fn broadcast_partitioned_cancels(
895    orders: Vec<(InstrumentId, u32, u32)>,
896    block_height: u32,
897    tx_manager: Arc<TransactionManager>,
898    broadcaster: Arc<TxBroadcaster>,
899    order_builder: Arc<OrderMessageBuilder>,
900) -> anyhow::Result<()> {
901    if orders.is_empty() {
902        return Ok(());
903    }
904
905    let (short_term_orders, long_term_orders): (Vec<_>, Vec<_>) = orders
906        .into_iter()
907        .partition(|(_, _, flags)| *flags == types::ORDER_FLAG_SHORT_TERM);
908
909    // Cancel short-term orders with MsgBatchCancel (single gRPC call)
910    if !short_term_orders.is_empty() {
911        let st_pairs: Vec<_> = short_term_orders
912            .iter()
913            .map(|(inst_id, client_id, _)| (*inst_id, *client_id))
914            .collect();
915
916        log::debug!(
917            "Batch cancelling {} short-term orders with MsgBatchCancel",
918            st_pairs.len()
919        );
920
921        match order_builder.build_batch_cancel_short_term(&st_pairs, block_height) {
922            Ok(msg) => {
923                let operation = format!("BatchCancel {} short-term orders", st_pairs.len());
924                match broadcaster
925                    .broadcast_short_term(&tx_manager, vec![msg], &operation)
926                    .await
927                {
928                    Ok(tx_hash) => {
929                        log::debug!(
930                            "Successfully batch cancelled {} short-term orders, tx_hash: {}",
931                            st_pairs.len(),
932                            tx_hash
933                        );
934                    }
935                    Err(e) => {
936                        log::error!("Short-term batch cancel failed: {e:?}");
937                    }
938                }
939            }
940            Err(e) => {
941                log::error!("Failed to build MsgBatchCancel: {e:?}");
942            }
943        }
944    }
945
946    // Cancel long-term/conditional orders with batched MsgCancelOrder (single gRPC call)
947    if !long_term_orders.is_empty() {
948        log::debug!(
949            "Batch cancelling {} long-term orders",
950            long_term_orders.len(),
951        );
952
953        match order_builder.build_cancel_orders_batch_with_flags(&long_term_orders, block_height) {
954            Ok(cancel_msgs) => {
955                let operation = format!("BatchCancel {} long-term orders", long_term_orders.len());
956                match broadcaster
957                    .broadcast_with_retry(&tx_manager, cancel_msgs, &operation)
958                    .await
959                {
960                    Ok(tx_hash) => {
961                        log::debug!(
962                            "Successfully batch cancelled {} long-term orders, tx_hash: {}",
963                            long_term_orders.len(),
964                            tx_hash
965                        );
966                    }
967                    Err(e) => {
968                        log::error!("Long-term batch cancel failed: {e:?}");
969                    }
970                }
971            }
972            Err(e) => {
973                log::error!("Failed to build long-term cancel messages: {e:?}");
974            }
975        }
976    }
977
978    Ok(())
979}
980
981#[async_trait(?Send)]
982impl ExecutionClient for DydxExecutionClient {
983    fn is_connected(&self) -> bool {
984        self.core.is_connected()
985    }
986
987    fn client_id(&self) -> ClientId {
988        self.core.client_id
989    }
990
991    fn account_id(&self) -> AccountId {
992        self.core.account_id
993    }
994
995    fn venue(&self) -> Venue {
996        *DYDX_VENUE
997    }
998
999    fn oms_type(&self) -> OmsType {
1000        self.core.oms_type
1001    }
1002
1003    fn get_account(&self) -> Option<AccountAny> {
1004        self.core.cache().account(&self.core.account_id).cloned()
1005    }
1006
1007    fn generate_account_state(
1008        &self,
1009        balances: Vec<AccountBalance>,
1010        margins: Vec<MarginBalance>,
1011        reported: bool,
1012        ts_event: UnixNanos,
1013    ) -> anyhow::Result<()> {
1014        self.emitter
1015            .emit_account_state(balances, margins, reported, ts_event);
1016        Ok(())
1017    }
1018
1019    fn start(&mut self) -> anyhow::Result<()> {
1020        if self.core.is_started() {
1021            log::warn!("dYdX execution client already started");
1022            return Ok(());
1023        }
1024
1025        let sender = get_exec_event_sender();
1026        self.emitter.set_sender(sender);
1027        log::info!("Starting dYdX execution client");
1028        self.core.set_started();
1029        Ok(())
1030    }
1031
1032    fn stop(&mut self) -> anyhow::Result<()> {
1033        if self.core.is_stopped() {
1034            log::warn!("dYdX execution client not started");
1035            return Ok(());
1036        }
1037
1038        log::info!("Stopping dYdX execution client");
1039        self.abort_pending_tasks();
1040        self.core.set_stopped();
1041        self.core.set_disconnected();
1042        Ok(())
1043    }
1044
1045    /// Submits an order to dYdX via gRPC.
1046    ///
1047    /// dYdX requires u32 client IDs - Nautilus ClientOrderId strings are hashed to fit.
1048    ///
1049    /// Supported order types:
1050    /// - Market orders (short-term, IOC).
1051    /// - Limit orders (short-term or long-term based on TIF).
1052    /// - Stop Market orders (conditional, triggered at stop price).
1053    /// - Stop Limit orders (conditional, triggered at stop price, executed at limit).
1054    /// - Take Profit Market (MarketIfTouched - triggered at take profit price).
1055    /// - Take Profit Limit (LimitIfTouched - triggered at take profit price, executed at limit).
1056    ///
1057    /// Trailing stop orders are NOT supported by dYdX v4 protocol.
1058    ///
1059    /// Validates synchronously, generates OrderSubmitted event, then spawns async task for
1060    /// gRPC submission to avoid blocking. Unsupported order types generate OrderRejected.
1061    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
1062        // Check connection status first (doesn't need order)
1063        if !self.is_connected() {
1064            let reason = "Cannot submit order: execution client not connected";
1065            log::error!("{reason}");
1066            anyhow::bail!(reason);
1067        }
1068
1069        // Check block height is available for short-term orders
1070        let current_block = self.block_time_monitor.current_block_height();
1071        let order = self
1072            .core
1073            .cache()
1074            .order(&cmd.client_order_id)
1075            .cloned()
1076            .ok_or_else(|| {
1077                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
1078            })?;
1079
1080        let client_order_id = order.client_order_id();
1081        let instrument_id = order.instrument_id();
1082        let strategy_id = order.strategy_id();
1083
1084        if current_block == 0 {
1085            let reason = "Block height not initialized";
1086            log::warn!("Cannot submit order {client_order_id}: {reason}");
1087            let ts_event = self.clock.get_time_ns();
1088            self.emitter.emit_order_rejected_event(
1089                strategy_id,
1090                instrument_id,
1091                client_order_id,
1092                reason,
1093                ts_event,
1094                false,
1095            );
1096            return Ok(());
1097        }
1098
1099        // Check if order is already closed
1100        if order.is_closed() {
1101            log::warn!("Cannot submit closed order {client_order_id}");
1102            return Ok(());
1103        }
1104
1105        // Reject unsupported order types
1106        match order.order_type() {
1107            OrderType::Market
1108            | OrderType::Limit
1109            | OrderType::StopMarket
1110            | OrderType::StopLimit
1111            | OrderType::MarketIfTouched
1112            | OrderType::LimitIfTouched => {}
1113            // Trailing stops not supported by dYdX v4 protocol
1114            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
1115                let reason = "Trailing stop orders not supported by dYdX v4 protocol";
1116                log::error!("{reason}");
1117                let ts_event = self.clock.get_time_ns();
1118                self.emitter.emit_order_rejected_event(
1119                    strategy_id,
1120                    instrument_id,
1121                    client_order_id,
1122                    reason,
1123                    ts_event,
1124                    false,
1125                );
1126                return Ok(());
1127            }
1128            order_type => {
1129                let reason = format!("Order type {order_type:?} not supported by dYdX");
1130                log::error!("{reason}");
1131                let ts_event = self.clock.get_time_ns();
1132                self.emitter.emit_order_rejected_event(
1133                    strategy_id,
1134                    instrument_id,
1135                    client_order_id,
1136                    &reason,
1137                    ts_event,
1138                    false,
1139                );
1140                return Ok(());
1141            }
1142        }
1143
1144        self.emitter.emit_order_submitted(&order);
1145
1146        // Get execution components (must be initialized after connect())
1147        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1148            Ok(components) => components,
1149            Err(e) => {
1150                log::error!("Failed to get execution components: {e}");
1151                let ts_event = self.clock.get_time_ns();
1152                self.emitter.emit_order_rejected_event(
1153                    strategy_id,
1154                    instrument_id,
1155                    client_order_id,
1156                    &e.to_string(),
1157                    ts_event,
1158                    false,
1159                );
1160                return Ok(());
1161            }
1162        };
1163
1164        let block_height = self.block_time_monitor.current_block_height() as u32;
1165
1166        // Generate client_order_id as (u32, u32) pair before async block (dYdX requires u32 client IDs)
1167        let encoded = match self.encoder.encode(client_order_id) {
1168            Ok(enc) => enc,
1169            Err(e) => {
1170                log::error!("Failed to generate client order ID: {e}");
1171                let ts_event = self.clock.get_time_ns();
1172                self.emitter.emit_order_rejected_event(
1173                    strategy_id,
1174                    instrument_id,
1175                    client_order_id,
1176                    &e.to_string(),
1177                    ts_event,
1178                    false,
1179                );
1180                return Ok(());
1181            }
1182        };
1183        let client_id_u32 = encoded.client_id;
1184        let client_metadata = encoded.client_metadata;
1185
1186        log::info!(
1187            "[SUBMIT_ORDER] Nautilus '{}' -> dYdX u32={} meta={:#x} | instrument={} side={:?} qty={} type={:?}",
1188            client_order_id,
1189            client_id_u32,
1190            client_metadata,
1191            instrument_id,
1192            order.order_side(),
1193            order.quantity(),
1194            order.order_type()
1195        );
1196
1197        // Convert expire_time from nanoseconds to seconds if present
1198        let expire_time = order.expire_time().map(nanos_to_secs_i64);
1199
1200        // Determine order_flags based on order type for later cancellation
1201        let order_flags = match order.order_type() {
1202            // Conditional orders always use ORDER_FLAG_CONDITIONAL
1203            OrderType::StopMarket
1204            | OrderType::StopLimit
1205            | OrderType::MarketIfTouched
1206            | OrderType::LimitIfTouched => types::ORDER_FLAG_CONDITIONAL,
1207            // Market orders are always short-term
1208            OrderType::Market => types::ORDER_FLAG_SHORT_TERM,
1209            // Limit orders depend on time_in_force and expire_time
1210            OrderType::Limit => {
1211                let lifetime = types::OrderLifetime::from_time_in_force(
1212                    order.time_in_force(),
1213                    expire_time,
1214                    false,
1215                    order_builder.max_short_term_secs(),
1216                );
1217                lifetime.order_flags()
1218            }
1219            // Default to long-term for unknown types
1220            _ => types::ORDER_FLAG_LONG_TERM,
1221        };
1222
1223        // Register order context for WebSocket correlation and cancellation
1224        let ts_submitted = self.clock.get_time_ns();
1225        let trader_id = order.trader_id();
1226        self.register_order_context(
1227            client_id_u32,
1228            OrderContext {
1229                client_order_id,
1230                trader_id,
1231                strategy_id,
1232                instrument_id,
1233                submitted_at: ts_submitted,
1234                order_flags,
1235            },
1236        );
1237
1238        // Register dispatch identity so the WS handler emits proper order
1239        // events (OrderAccepted, OrderFilled, OrderCanceled) instead of reports
1240        self.dispatch_state.order_identities.insert(
1241            client_order_id,
1242            OrderIdentity {
1243                instrument_id,
1244                strategy_id,
1245                order_side: order.order_side(),
1246                order_type: order.order_type(),
1247            },
1248        );
1249
1250        self.spawn_order_task(
1251            "submit_order",
1252            strategy_id,
1253            instrument_id,
1254            client_order_id,
1255            async move {
1256                // Build the order message based on order type
1257                let (msg, order_type_str) = match order.order_type() {
1258                    OrderType::Market => {
1259                        let msg = order_builder.build_market_order(
1260                            instrument_id,
1261                            client_id_u32,
1262                            client_metadata,
1263                            order.order_side(),
1264                            order.quantity(),
1265                            block_height,
1266                        )?;
1267                        (msg, "market")
1268                    }
1269                    OrderType::Limit => {
1270                        // Use pre-computed expire_time (with default_short_term_expiry applied)
1271                        let msg = order_builder.build_limit_order(
1272                            instrument_id,
1273                            client_id_u32,
1274                            client_metadata,
1275                            order.order_side(),
1276                            order
1277                                .price()
1278                                .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
1279                            order.quantity(),
1280                            order.time_in_force(),
1281                            order.is_post_only(),
1282                            order.is_reduce_only(),
1283                            block_height,
1284                            expire_time, // Uses default_short_term_expiry if configured
1285                        )?;
1286                        (msg, "limit")
1287                    }
1288                    // Conditional orders use their own expiration logic (not affected by default_short_term_expiry)
1289                    // They are always stored on-chain with long-term semantics
1290                    OrderType::StopMarket => {
1291                        let trigger_price = order.trigger_price().ok_or_else(|| {
1292                            anyhow::anyhow!("Stop market order missing trigger_price")
1293                        })?;
1294                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1295                        let msg = order_builder.build_stop_market_order(
1296                            instrument_id,
1297                            client_id_u32,
1298                            client_metadata,
1299                            order.order_side(),
1300                            trigger_price,
1301                            order.quantity(),
1302                            order.is_reduce_only(),
1303                            cond_expire,
1304                        )?;
1305                        (msg, "stop_market")
1306                    }
1307                    OrderType::StopLimit => {
1308                        let trigger_price = order.trigger_price().ok_or_else(|| {
1309                            anyhow::anyhow!("Stop limit order missing trigger_price")
1310                        })?;
1311                        let limit_price = order.price().ok_or_else(|| {
1312                            anyhow::anyhow!("Stop limit order missing limit price")
1313                        })?;
1314                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1315                        let msg = order_builder.build_stop_limit_order(
1316                            instrument_id,
1317                            client_id_u32,
1318                            client_metadata,
1319                            order.order_side(),
1320                            trigger_price,
1321                            limit_price,
1322                            order.quantity(),
1323                            order.time_in_force(),
1324                            order.is_post_only(),
1325                            order.is_reduce_only(),
1326                            cond_expire,
1327                        )?;
1328                        (msg, "stop_limit")
1329                    }
1330                    // dYdX TakeProfitMarket maps to Nautilus MarketIfTouched
1331                    OrderType::MarketIfTouched => {
1332                        let trigger_price = order.trigger_price().ok_or_else(|| {
1333                            anyhow::anyhow!("Take profit market order missing trigger_price")
1334                        })?;
1335                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1336                        let msg = order_builder.build_take_profit_market_order(
1337                            instrument_id,
1338                            client_id_u32,
1339                            client_metadata,
1340                            order.order_side(),
1341                            trigger_price,
1342                            order.quantity(),
1343                            order.is_reduce_only(),
1344                            cond_expire,
1345                        )?;
1346                        (msg, "take_profit_market")
1347                    }
1348                    // dYdX TakeProfitLimit maps to Nautilus LimitIfTouched
1349                    OrderType::LimitIfTouched => {
1350                        let trigger_price = order.trigger_price().ok_or_else(|| {
1351                            anyhow::anyhow!("Take profit limit order missing trigger_price")
1352                        })?;
1353                        let limit_price = order.price().ok_or_else(|| {
1354                            anyhow::anyhow!("Take profit limit order missing limit price")
1355                        })?;
1356                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1357                        let msg = order_builder.build_take_profit_limit_order(
1358                            instrument_id,
1359                            client_id_u32,
1360                            client_metadata,
1361                            order.order_side(),
1362                            trigger_price,
1363                            limit_price,
1364                            order.quantity(),
1365                            order.time_in_force(),
1366                            order.is_post_only(),
1367                            order.is_reduce_only(),
1368                            cond_expire,
1369                        )?;
1370                        (msg, "take_profit_limit")
1371                    }
1372                    _ => unreachable!("Order type already validated"),
1373                };
1374
1375                // Broadcast: short-term orders use cached sequence (no increment),
1376                // stateful orders use broadcast_with_retry (proper sequence management)
1377                let operation = format!("Submit {order_type_str} order {client_order_id}");
1378
1379                if order_flags == types::ORDER_FLAG_SHORT_TERM {
1380                    broadcaster
1381                        .broadcast_short_term(&tx_manager, vec![msg], &operation)
1382                        .await?;
1383                } else {
1384                    broadcaster
1385                        .broadcast_with_retry(&tx_manager, vec![msg], &operation)
1386                        .await?;
1387                }
1388                log::debug!("Successfully submitted {order_type_str} order: {client_order_id}");
1389
1390                Ok(())
1391            },
1392        );
1393
1394        Ok(())
1395    }
1396
1397    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
1398        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1399        let order_count = orders.len();
1400
1401        // Check connection status
1402        if !self.is_connected() {
1403            let reason = "Cannot submit order list: execution client not connected";
1404            log::error!("{reason}");
1405            anyhow::bail!(reason);
1406        }
1407
1408        // Check block height is available
1409        let current_block = self.block_time_monitor.current_block_height();
1410        if current_block == 0 {
1411            let reason = "Block height not initialized";
1412            log::warn!("Cannot submit order list: {reason}");
1413            // Reject all orders in the list
1414            let ts_event = self.clock.get_time_ns();
1415
1416            for order in &orders {
1417                self.emitter.emit_order_rejected_event(
1418                    order.strategy_id(),
1419                    order.instrument_id(),
1420                    order.client_order_id(),
1421                    reason,
1422                    ts_event,
1423                    false,
1424                );
1425            }
1426            return Ok(());
1427        }
1428
1429        // Get execution components early so we can register order contexts
1430        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1431            Ok(components) => components,
1432            Err(e) => {
1433                log::error!("Failed to get execution components for batch: {e}");
1434                // Reject all orders in the list
1435                let ts_event = self.clock.get_time_ns();
1436
1437                for order in &orders {
1438                    self.emitter.emit_order_rejected_event(
1439                        order.strategy_id(),
1440                        order.instrument_id(),
1441                        order.client_order_id(),
1442                        &e.to_string(),
1443                        ts_event,
1444                        false,
1445                    );
1446                }
1447                return Ok(());
1448            }
1449        };
1450
1451        // Collect limit order parameters for batch submission
1452        let mut order_params: Vec<LimitOrderParams> = Vec::with_capacity(order_count);
1453        let mut order_info: Vec<(ClientOrderId, InstrumentId, StrategyId)> =
1454            Vec::with_capacity(order_count);
1455
1456        for order in &orders {
1457            // Only limit orders can be batched
1458            if order.order_type() != OrderType::Limit {
1459                log::warn!(
1460                    "Order {} has type {:?}, falling back to individual submission",
1461                    order.client_order_id(),
1462                    order.order_type()
1463                );
1464                // Fall back to individual submission for non-limit orders
1465                let submit_cmd = SubmitOrder::new(
1466                    cmd.trader_id,
1467                    cmd.client_id,
1468                    cmd.strategy_id,
1469                    order.instrument_id(),
1470                    order.client_order_id(),
1471                    order.init_event().clone(),
1472                    cmd.exec_algorithm_id,
1473                    cmd.position_id,
1474                    cmd.params.clone(),
1475                    UUID4::new(),
1476                    cmd.ts_init,
1477                );
1478
1479                if let Err(e) = self.submit_order(submit_cmd) {
1480                    log::error!(
1481                        "Failed to submit order {} from order list: {e}",
1482                        order.client_order_id()
1483                    );
1484                }
1485                continue;
1486            }
1487
1488            // Get price (required for limit orders)
1489            let Some(price) = order.price() else {
1490                let ts_event = self.clock.get_time_ns();
1491                self.emitter.emit_order_rejected_event(
1492                    order.strategy_id(),
1493                    order.instrument_id(),
1494                    order.client_order_id(),
1495                    "Limit order missing price",
1496                    ts_event,
1497                    false,
1498                );
1499                continue;
1500            };
1501
1502            // Generate client order ID as (u32, u32) pair
1503            let encoded = match self.encoder.encode(order.client_order_id()) {
1504                Ok(enc) => enc,
1505                Err(e) => {
1506                    log::error!("Failed to generate client order ID: {e}");
1507                    let ts_event = self.clock.get_time_ns();
1508                    self.emitter.emit_order_rejected_event(
1509                        order.strategy_id(),
1510                        order.instrument_id(),
1511                        order.client_order_id(),
1512                        &e.to_string(),
1513                        ts_event,
1514                        false,
1515                    );
1516                    continue;
1517                }
1518            };
1519            let client_id_u32 = encoded.client_id;
1520            let client_metadata = encoded.client_metadata;
1521
1522            // Send OrderSubmitted event
1523            self.emitter.emit_order_submitted(order);
1524
1525            // Determine order_flags for limit orders
1526            let expire_time_secs = order.expire_time().map(nanos_to_secs_i64);
1527            let lifetime = types::OrderLifetime::from_time_in_force(
1528                order.time_in_force(),
1529                expire_time_secs,
1530                false,
1531                order_builder.max_short_term_secs(),
1532            );
1533
1534            // Register order context for WebSocket correlation and cancellation
1535            let ts_submitted = self.clock.get_time_ns();
1536            self.register_order_context(
1537                client_id_u32,
1538                OrderContext {
1539                    client_order_id: order.client_order_id(),
1540                    trader_id: order.trader_id(),
1541                    strategy_id: order.strategy_id(),
1542                    instrument_id: order.instrument_id(),
1543                    submitted_at: ts_submitted,
1544                    order_flags: lifetime.order_flags(),
1545                },
1546            );
1547
1548            // Register dispatch identity for tracked order event emission
1549            self.dispatch_state.order_identities.insert(
1550                order.client_order_id(),
1551                OrderIdentity {
1552                    instrument_id: order.instrument_id(),
1553                    strategy_id: order.strategy_id(),
1554                    order_side: order.order_side(),
1555                    order_type: order.order_type(),
1556                },
1557            );
1558
1559            // Collect order parameters (builder will apply default_short_term_expiry if needed)
1560            order_params.push(LimitOrderParams {
1561                instrument_id: order.instrument_id(),
1562                client_order_id: client_id_u32,
1563                client_metadata,
1564                side: order.order_side(),
1565                price,
1566                quantity: order.quantity(),
1567                time_in_force: order.time_in_force(),
1568                post_only: order.is_post_only(),
1569                reduce_only: order.is_reduce_only(),
1570                expire_time_ns: order.expire_time(),
1571            });
1572            order_info.push((
1573                order.client_order_id(),
1574                order.instrument_id(),
1575                order.strategy_id(),
1576            ));
1577        }
1578
1579        // If no limit orders to batch, we're done
1580        if order_params.is_empty() {
1581            return Ok(());
1582        }
1583
1584        // Check if any orders are short-term
1585        // dYdX protocol restriction: short-term orders CANNOT be batched
1586        // Each short-term order must be in its own transaction
1587        let has_short_term = order_params
1588            .iter()
1589            .any(|params| order_builder.is_short_term_order(params));
1590
1591        let block_height = current_block as u32;
1592        let emitter = self.emitter.clone();
1593        let clock = self.clock;
1594
1595        if has_short_term {
1596            // Submit each order individually (short-term orders cannot be batched).
1597            log::debug!(
1598                "Submitting {} short-term limit orders concurrently (sequence not consumed)",
1599                order_params.len()
1600            );
1601
1602            let order_count = order_params.len();
1603
1604            let handle = get_runtime().spawn(async move {
1605                // Build and broadcast all orders concurrently -- no sequence coordination needed.
1606                // Short-term orders use cached sequence (not incremented) via broadcast_short_term.
1607                let mut handles = Vec::with_capacity(order_count);
1608
1609                for (params, (client_order_id, instrument_id, strategy_id)) in
1610                    order_params.into_iter().zip(order_info)
1611                {
1612                    let tx_manager = tx_manager.clone();
1613                    let broadcaster = broadcaster.clone();
1614                    let order_builder = order_builder.clone();
1615                    let emitter = emitter.clone();
1616
1617                    let handle = get_runtime().spawn(async move {
1618                        // Build order message
1619                        let msg = match order_builder
1620                            .build_limit_order_from_params(&params, block_height)
1621                        {
1622                            Ok(m) => m,
1623                            Err(e) => {
1624                                let error_msg = format!("Failed to build order message: {e:?}");
1625                                log::error!("{error_msg}");
1626                                let ts_event = clock.get_time_ns();
1627                                emitter.emit_order_rejected_event(
1628                                    strategy_id,
1629                                    instrument_id,
1630                                    client_order_id,
1631                                    &error_msg,
1632                                    ts_event,
1633                                    false,
1634                                );
1635                                return;
1636                            }
1637                        };
1638
1639                        // Broadcast with cached sequence (short-term orders don't consume sequences)
1640                        let operation = format!("Submit short-term order {client_order_id}");
1641
1642                        if let Err(e) = broadcaster
1643                            .broadcast_short_term(&tx_manager, vec![msg], &operation)
1644                            .await
1645                        {
1646                            let error_msg = format!("Order submission failed: {e:?}");
1647                            log::error!("{error_msg}");
1648                            let ts_event = clock.get_time_ns();
1649                            emitter.emit_order_rejected_event(
1650                                strategy_id,
1651                                instrument_id,
1652                                client_order_id,
1653                                &error_msg,
1654                                ts_event,
1655                                false,
1656                            );
1657                        }
1658                    });
1659
1660                    handles.push(handle);
1661                }
1662
1663                // Wait for all orders to be submitted
1664                for handle in handles {
1665                    let _ = handle.await;
1666                }
1667            });
1668
1669            // Track the task
1670            self.pending_tasks
1671                .lock()
1672                .expect(MUTEX_POISONED)
1673                .push(handle);
1674        } else {
1675            // All orders are long-term - can batch in single transaction
1676            log::info!(
1677                "Batch submitting {} long-term limit orders in single transaction",
1678                order_params.len()
1679            );
1680
1681            let handle = get_runtime().spawn(async move {
1682                // Build all order messages
1683                let msgs: Result<Vec<_>, _> = order_params
1684                    .iter()
1685                    .map(|params| order_builder.build_limit_order_from_params(params, block_height))
1686                    .collect();
1687
1688                let msgs = match msgs {
1689                    Ok(m) => m,
1690                    Err(e) => {
1691                        let error_msg = format!("Failed to build batch order messages: {e:?}");
1692                        log::error!("{error_msg}");
1693                        // Send OrderRejected for all orders
1694                        let ts_event = clock.get_time_ns();
1695
1696                        for (client_order_id, instrument_id, strategy_id) in order_info {
1697                            emitter.emit_order_rejected_event(
1698                                strategy_id,
1699                                instrument_id,
1700                                client_order_id,
1701                                &error_msg,
1702                                ts_event,
1703                                false,
1704                            );
1705                        }
1706                        return;
1707                    }
1708                };
1709
1710                // Broadcast batch with retry
1711                let operation = format!("Submit batch of {} limit orders", msgs.len());
1712
1713                if let Err(e) = broadcaster
1714                    .broadcast_with_retry(&tx_manager, msgs, &operation)
1715                    .await
1716                {
1717                    let error_msg = format!("Batch order submission failed: {e:?}");
1718                    log::error!("{error_msg}");
1719
1720                    // Send OrderRejected for all orders in the batch
1721                    let ts_event = clock.get_time_ns();
1722
1723                    for (client_order_id, instrument_id, strategy_id) in order_info {
1724                        emitter.emit_order_rejected_event(
1725                            strategy_id,
1726                            instrument_id,
1727                            client_order_id,
1728                            &error_msg,
1729                            ts_event,
1730                            false,
1731                        );
1732                    }
1733                }
1734            });
1735
1736            // Track the task
1737            self.pending_tasks
1738                .lock()
1739                .expect(MUTEX_POISONED)
1740                .push(handle);
1741        }
1742
1743        Ok(())
1744    }
1745
1746    /// dYdX does not support native order modification.
1747    ///
1748    /// Strategies should handle `OrderModifyRejected` by canceling and resubmitting.
1749    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
1750        let reason = "dYdX does not support order modification. Use cancel and resubmit instead.";
1751        log::error!("{reason}");
1752
1753        self.send_modify_rejected(
1754            cmd.strategy_id,
1755            cmd.instrument_id,
1756            cmd.client_order_id,
1757            cmd.venue_order_id,
1758            reason,
1759        );
1760        Ok(())
1761    }
1762
1763    /// Cancels an order on dYdX exchange.
1764    ///
1765    /// Validates the order state and retrieves instrument details before
1766    /// spawning an async task to cancel via gRPC.
1767    ///
1768    /// # Validation
1769    ///
1770    /// - Checks order exists in cache.
1771    /// - Validates order is not already closed.
1772    /// - Retrieves instrument from cache for order builder.
1773    ///
1774    /// The `cmd` contains client/venue order IDs. Returns `Ok(())` if cancel request is
1775    /// spawned successfully or validation fails gracefully. Returns `Err` if not connected.
1776    ///
1777    /// # Events
1778    ///
1779    /// - `OrderCanceled` - Generated when WebSocket confirms cancellation.
1780    /// - `OrderCancelRejected` - Generated if exchange rejects cancellation.
1781    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
1782        if !self.is_connected() {
1783            anyhow::bail!("Cannot cancel order: not connected");
1784        }
1785
1786        let client_order_id = cmd.client_order_id;
1787        let instrument_id = cmd.instrument_id;
1788        let strategy_id = cmd.strategy_id;
1789        let venue_order_id = cmd.venue_order_id;
1790
1791        let (order_time_in_force, order_expire_time) = {
1792            let cache = self.core.cache();
1793
1794            let order = match cache.order(&client_order_id) {
1795                Some(order) => order,
1796                None => {
1797                    log::error!("Cannot cancel order {client_order_id}: not found in cache");
1798                    return Ok(()); // Not an error - order may have been filled/canceled already
1799                }
1800            };
1801
1802            // Validate order is not already closed
1803            if order.is_closed() {
1804                log::warn!(
1805                    "CancelOrder command for {} when order already {} (will not send to exchange)",
1806                    client_order_id,
1807                    order.status()
1808                );
1809                return Ok(());
1810            }
1811
1812            // Verify instrument exists (no need to hold reference)
1813            if cache.instrument(&instrument_id).is_none() {
1814                log::error!(
1815                    "Cannot cancel order {client_order_id}: instrument {instrument_id} not found in cache"
1816                );
1817                return Ok(()); // Not an error - missing instrument is a cache issue
1818            }
1819
1820            // Extract data needed for order_flags fallback
1821            (
1822                order.time_in_force(),
1823                order.expire_time().map(nanos_to_secs_i64),
1824            )
1825        }; // Cache borrow released here
1826
1827        log::debug!("Cancelling order {client_order_id} for instrument {instrument_id}");
1828
1829        // Get execution components (no cache borrow held)
1830        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1831            Ok(components) => components,
1832            Err(e) => {
1833                log::error!("Failed to get execution components for cancel: {e}");
1834                return Ok(());
1835            }
1836        };
1837
1838        let block_height = self.block_time_monitor.current_block_height() as u32;
1839
1840        // Convert client_order_id to (u32, u32) pair before async block
1841        let encoded = match self.encoder.get(&client_order_id) {
1842            Some(enc) => enc,
1843            None => {
1844                log::error!("Client order ID {client_order_id} not found in cache");
1845                anyhow::bail!("Client order ID not found in cache")
1846            }
1847        };
1848        let client_id_u32 = encoded.client_id;
1849
1850        log::info!(
1851            "[CANCEL_ORDER] Nautilus '{client_order_id}' -> dYdX u32={client_id_u32} | instrument={instrument_id}"
1852        );
1853
1854        // Get stored order_flags from order context (set at submission time)
1855        // This ensures we use the correct flags even if the order has expired
1856        let order_flags = self.get_order_context(client_id_u32).map_or_else(
1857            || {
1858                // Fallback: derive from order parameters if context not found
1859                log::warn!(
1860                    "Order context not found for {client_order_id}, deriving flags from order"
1861                );
1862                types::OrderLifetime::from_time_in_force(
1863                    order_time_in_force, // Using extracted value
1864                    order_expire_time,   // Using extracted value
1865                    false,
1866                    order_builder.max_short_term_secs(),
1867                )
1868                .order_flags()
1869            },
1870            |ctx| ctx.order_flags,
1871        );
1872
1873        let clock = self.clock;
1874        let emitter = self.emitter.clone();
1875
1876        self.spawn_task("cancel_order", async move {
1877            // Build cancel message using stored order_flags
1878            let cancel_msg = match order_builder.build_cancel_order_with_flags(
1879                instrument_id,
1880                client_id_u32,
1881                order_flags,
1882                block_height,
1883            ) {
1884                Ok(msg) => msg,
1885                Err(e) => {
1886                    log::error!("Failed to build cancel message for {client_order_id}: {e:?}");
1887                    let ts_event = clock.get_time_ns();
1888                    emitter.emit_order_cancel_rejected_event(
1889                        strategy_id,
1890                        instrument_id,
1891                        client_order_id,
1892                        venue_order_id,
1893                        &format!("Cancel build failed: {e:?}"),
1894                        ts_event,
1895                    );
1896                    return Ok(());
1897                }
1898            };
1899
1900            // Broadcast cancel: short-term uses cached sequence, stateful uses retry
1901            let cancel_op = format!("Cancel order {client_order_id}");
1902            let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
1903                broadcaster
1904                    .broadcast_short_term(&tx_manager, vec![cancel_msg], &cancel_op)
1905                    .await
1906            } else {
1907                broadcaster
1908                    .broadcast_with_retry(&tx_manager, vec![cancel_msg], &cancel_op)
1909                    .await
1910            };
1911
1912            match result {
1913                Ok(_) => {
1914                    log::debug!("Successfully cancelled order: {client_order_id}");
1915                }
1916                Err(e) => {
1917                    log::error!("Failed to cancel order {client_order_id}: {e:?}");
1918
1919                    let ts_event = clock.get_time_ns();
1920                    emitter.emit_order_cancel_rejected_event(
1921                        strategy_id,
1922                        instrument_id,
1923                        client_order_id,
1924                        venue_order_id,
1925                        &format!("Cancel order failed: {e:?}"),
1926                        ts_event,
1927                    );
1928                }
1929            }
1930
1931            Ok(())
1932        });
1933
1934        Ok(())
1935    }
1936
1937    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1938        if !self.is_connected() {
1939            anyhow::bail!("Cannot cancel orders: not connected");
1940        }
1941
1942        let instrument_id = cmd.instrument_id;
1943        let order_side_filter = cmd.order_side;
1944
1945        // Extract order data from cache with short-lived borrow
1946        // Collect (client_order_id, time_in_force, expire_time) for each matching order
1947        let order_data: Vec<(ClientOrderId, TimeInForce, Option<UnixNanos>)> = {
1948            let cache = self.core.cache();
1949            cache
1950                .orders_open(None, None, None, None, None)
1951                .into_iter()
1952                .filter(|order| order.instrument_id() == instrument_id)
1953                .filter(|order| {
1954                    order_side_filter == OrderSide::NoOrderSide
1955                        || order.order_side() == order_side_filter
1956                })
1957                .map(|order| {
1958                    (
1959                        order.client_order_id(),
1960                        order.time_in_force(),
1961                        order.expire_time(),
1962                    )
1963                })
1964                .collect()
1965        }; // Cache borrow released here
1966
1967        // Count short-term vs long-term for logging
1968        let short_term_count = order_data
1969            .iter()
1970            .filter(|(_, tif, _)| matches!(tif, TimeInForce::Ioc | TimeInForce::Fok))
1971            .count();
1972        let long_term_count = order_data.len() - short_term_count;
1973
1974        log::debug!(
1975            "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={instrument_id}, order_side={order_side_filter:?}",
1976            order_data.len(),
1977            short_term_count,
1978            long_term_count
1979        );
1980
1981        // Get execution components (no cache borrow held)
1982        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1983            Ok(components) => components,
1984            Err(e) => {
1985                log::error!("Failed to get execution components for cancel_all: {e}");
1986                return Ok(());
1987            }
1988        };
1989
1990        let block_height = self.block_time_monitor.current_block_height() as u32;
1991
1992        // Collect (instrument_id, client_id, order_flags) tuples for cancel
1993        // Use stored order_flags from order context to ensure correct cancellation
1994        let mut orders_to_cancel = Vec::new();
1995
1996        for (client_order_id, _time_in_force, _expire_time) in &order_data {
1997            let Some(encoded) = self.encoder.get(client_order_id) else {
1998                log::warn!("Cannot cancel order {client_order_id}: not found in encoder");
1999                continue;
2000            };
2001            let client_id_u32 = encoded.client_id;
2002
2003            // Skip if context already cleaned up (terminal WS event received)
2004            let Some(ctx) = self.get_order_context(client_id_u32) else {
2005                log::debug!(
2006                    "Skipping cancel for {client_order_id}: order context already cleaned up (terminal)"
2007                );
2008                continue;
2009            };
2010            orders_to_cancel.push((instrument_id, client_id_u32, ctx.order_flags));
2011        }
2012
2013        if orders_to_cancel.is_empty() {
2014            return Ok(());
2015        }
2016
2017        log::debug!(
2018            "Cancel all: {} orders (short_term={}, long_term={}), instrument_id={instrument_id}, order_side={order_side_filter:?}",
2019            orders_to_cancel.len(),
2020            orders_to_cancel
2021                .iter()
2022                .filter(|(_, _, f)| *f == types::ORDER_FLAG_SHORT_TERM)
2023                .count(),
2024            orders_to_cancel
2025                .iter()
2026                .filter(|(_, _, f)| *f != types::ORDER_FLAG_SHORT_TERM)
2027                .count(),
2028        );
2029
2030        self.spawn_task("cancel_all_orders", async move {
2031            broadcast_partitioned_cancels(
2032                orders_to_cancel,
2033                block_height,
2034                tx_manager,
2035                broadcaster,
2036                order_builder,
2037            )
2038            .await
2039        });
2040
2041        Ok(())
2042    }
2043
2044    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
2045        if cmd.cancels.is_empty() {
2046            return Ok(());
2047        }
2048
2049        if !self.is_connected() {
2050            anyhow::bail!("Cannot cancel orders: not connected");
2051        }
2052
2053        // Get execution components for broadcasting
2054        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
2055            Ok(components) => components,
2056            Err(e) => {
2057                log::error!("Failed to get execution components for batch cancel: {e}");
2058                return Ok(());
2059            }
2060        };
2061
2062        // Convert ClientOrderIds to u32 and get order_flags
2063        let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
2064        for cancel in &cmd.cancels {
2065            let client_order_id = cancel.client_order_id;
2066            let encoded = match self.encoder.get(&client_order_id) {
2067                Some(enc) => enc,
2068                None => {
2069                    log::warn!(
2070                        "No u32 mapping found for client_order_id={client_order_id}, skipping cancel"
2071                    );
2072                    continue;
2073                }
2074            };
2075            let client_id_u32 = encoded.client_id;
2076
2077            // Skip if context already cleaned up (terminal WS event received)
2078            let Some(ctx) = self.get_order_context(client_id_u32) else {
2079                log::debug!(
2080                    "Skipping cancel for {client_order_id}: order context already cleaned up (terminal)"
2081                );
2082                continue;
2083            };
2084
2085            orders_to_cancel.push((cancel.instrument_id, client_id_u32, ctx.order_flags));
2086        }
2087
2088        if orders_to_cancel.is_empty() {
2089            log::warn!("No valid orders to cancel in batch");
2090            return Ok(());
2091        }
2092
2093        let block_height = self.block_time_monitor.current_block_height() as u32;
2094
2095        log::debug!(
2096            "Batch cancelling {} orders via partitioned strategy",
2097            orders_to_cancel.len(),
2098        );
2099
2100        self.spawn_task("batch_cancel_orders", async move {
2101            broadcast_partitioned_cancels(
2102                orders_to_cancel,
2103                block_height,
2104                tx_manager,
2105                broadcaster,
2106                order_builder,
2107            )
2108            .await
2109        });
2110
2111        Ok(())
2112    }
2113
2114    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
2115        let http_client = self.http_client.clone();
2116        let wallet_address = self.wallet_address.clone();
2117        let subaccount_number = self.subaccount_number;
2118        let account_id = self.core.account_id;
2119        let emitter = self.emitter.clone();
2120
2121        self.spawn_task("query_account", async move {
2122            let account_state = http_client
2123                .request_account_state(&wallet_address, subaccount_number, account_id)
2124                .await
2125                .context("failed to query account state")?;
2126
2127            emitter.emit_account_state(
2128                account_state.balances.clone(),
2129                account_state.margins.clone(),
2130                account_state.is_reported,
2131                account_state.ts_event,
2132            );
2133            Ok(())
2134        });
2135
2136        Ok(())
2137    }
2138
2139    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
2140        log::debug!("Querying order: client_order_id={}", cmd.client_order_id);
2141
2142        let http_client = self.http_client.clone();
2143        let wallet_address = self.wallet_address.clone();
2144        let subaccount_number = self.subaccount_number;
2145        let account_id = self.core.account_id;
2146        let emitter = self.emitter.clone();
2147        let client_order_id = cmd.client_order_id;
2148        let venue_order_id = cmd.venue_order_id;
2149        let instrument_id = cmd.instrument_id;
2150
2151        self.spawn_task("query_order", async move {
2152            let reports = http_client
2153                .request_order_status_reports(
2154                    &wallet_address,
2155                    subaccount_number,
2156                    account_id,
2157                    Some(instrument_id),
2158                )
2159                .await
2160                .context("failed to query order status")?;
2161
2162            // Find matching report by client_order_id or venue_order_id
2163            let report = reports.into_iter().find(|r| {
2164                if venue_order_id.is_some_and(|vid| r.venue_order_id == vid) {
2165                    return true;
2166                }
2167                r.client_order_id.is_some_and(|cid| cid == client_order_id)
2168            });
2169
2170            if let Some(report) = report {
2171                emitter.send_order_status_report(report);
2172            } else {
2173                log::warn!(
2174                    "No order found for client_order_id={client_order_id}, venue_order_id={venue_order_id:?}"
2175                );
2176            }
2177
2178            Ok(())
2179        });
2180
2181        Ok(())
2182    }
2183
2184    async fn connect(&mut self) -> anyhow::Result<()> {
2185        if self.core.is_connected() {
2186            log::warn!("dYdX execution client already connected");
2187            return Ok(());
2188        }
2189
2190        log::info!("Connecting to dYdX");
2191
2192        log::debug!("Loading instruments from HTTP API");
2193        self.http_client.fetch_and_cache_instruments().await?;
2194        log::debug!(
2195            "Loaded {} instruments from HTTP into shared cache",
2196            self.http_client.cached_instruments_count()
2197        );
2198        self.mark_instruments_initialized();
2199
2200        // Initialize gRPC client (deferred from constructor to avoid blocking)
2201        let grpc_urls = self.config.get_grpc_urls();
2202        let mut grpc_client = DydxGrpcClient::new_with_fallback(&grpc_urls)
2203            .await
2204            .context("failed to construct dYdX gRPC client")?;
2205        log::debug!("gRPC client initialized");
2206
2207        // Fetch initial block height synchronously so orders can be submitted immediately after connect()
2208        let initial_height = grpc_client
2209            .latest_block_height()
2210            .await
2211            .context("failed to fetch initial block height")?;
2212        // Use current time as approximation; actual timestamps will come from WebSocket updates
2213        self.block_time_monitor
2214            .record_block(initial_height.0 as u64, chrono::Utc::now());
2215        log::info!("Initial block height: {}", initial_height.0);
2216
2217        *self.grpc_client.write().await = Some(grpc_client.clone());
2218
2219        // Resolve private key and create TransactionManager (owns wallet and sequence management)
2220        let private_key =
2221            Self::resolve_private_key(&self.config).context("failed to resolve private key")?;
2222        let tx_manager = Arc::new(
2223            TransactionManager::new(
2224                grpc_client.clone(),
2225                &private_key,
2226                self.wallet_address.clone(),
2227                self.get_chain_id(),
2228            )
2229            .context("failed to create TransactionManager")?,
2230        );
2231
2232        tx_manager
2233            .resolve_authenticators()
2234            .await
2235            .context("failed to resolve authenticators")?;
2236
2237        // Proactively initialize sequence from chain so orders can be submitted
2238        // immediately after connect() without first-transaction latency penalty.
2239        tx_manager
2240            .initialize_sequence()
2241            .await
2242            .context("failed to initialize sequence")?;
2243
2244        self.tx_manager = Some(tx_manager);
2245        self.broadcaster = Some(Arc::new(TxBroadcaster::new(
2246            grpc_client,
2247            self.config.grpc_quota(),
2248        )));
2249        self.order_builder = Some(Arc::new(OrderMessageBuilder::new(
2250            self.http_client.clone(),
2251            self.wallet_address.clone(),
2252            self.subaccount_number,
2253            self.block_time_monitor.clone(),
2254        )));
2255        log::debug!(
2256            "OrderMessageBuilder initialized (block_time_monitor ready: {}, max_short_term: {:.1}s)",
2257            self.block_time_monitor.is_ready(),
2258            SHORT_TERM_ORDER_MAXIMUM_LIFETIME as f64
2259                * self.block_time_monitor.seconds_per_block_or_default()
2260        );
2261
2262        // Connect WebSocket
2263        self.ws_client.connect().await?;
2264        log::debug!("WebSocket connected");
2265
2266        // Subscribe to block height updates
2267        self.ws_client.subscribe_block_height().await?;
2268        log::debug!("Subscribed to block height updates");
2269
2270        // Subscribe to markets for instrument data
2271        self.ws_client.subscribe_markets().await?;
2272        log::debug!("Subscribed to markets");
2273
2274        // Subscribe to subaccount updates (wallet is always initialized for execution client)
2275        log::info!(
2276            "Using wallet address for queries: {} (subaccount {})",
2277            self.wallet_address,
2278            self.subaccount_number
2279        );
2280        self.ws_client
2281            .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
2282            .await?;
2283        log::debug!(
2284            "Subscribed to subaccount updates: {}/{}",
2285            self.wallet_address,
2286            self.subaccount_number
2287        );
2288
2289        let stream = self.ws_client.stream();
2290        self.spawn_ws_stream_handler(stream);
2291
2292        // Wait for account to be registered in cache before continuing.
2293        // This ensures execution state reconciliation can process fills correctly
2294        // (fills require the account to be registered for portfolio updates).
2295        self.await_account_registered(30.0).await?;
2296
2297        self.core.set_connected();
2298        log::info!("Connected: client_id={}", self.core.client_id);
2299        Ok(())
2300    }
2301
2302    async fn disconnect(&mut self) -> anyhow::Result<()> {
2303        if self.core.is_disconnected() {
2304            log::warn!("dYdX execution client not connected");
2305            return Ok(());
2306        }
2307
2308        log::info!("Disconnecting from dYdX");
2309
2310        // Unsubscribe from subaccount (execution client always has credentials)
2311        let _ = self
2312            .ws_client
2313            .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
2314            .await
2315            .map_err(|e| log::warn!("Failed to unsubscribe from subaccount: {e}"));
2316
2317        // Unsubscribe from markets
2318        let _ = self
2319            .ws_client
2320            .unsubscribe_markets()
2321            .await
2322            .map_err(|e| log::warn!("Failed to unsubscribe from markets: {e}"));
2323
2324        // Unsubscribe from block height
2325        let _ = self
2326            .ws_client
2327            .unsubscribe_block_height()
2328            .await
2329            .map_err(|e| log::warn!("Failed to unsubscribe from block height: {e}"));
2330
2331        // Disconnect WebSocket
2332        self.ws_client.disconnect().await?;
2333
2334        // Abort WebSocket message processing task
2335        if let Some(handle) = self.ws_stream_handle.take() {
2336            handle.abort();
2337            log::debug!("Aborted WebSocket message processing task");
2338        }
2339
2340        // Abort any pending tasks
2341        self.abort_pending_tasks();
2342
2343        self.core.set_disconnected();
2344        log::info!("Disconnected: client_id={}", self.core.client_id);
2345        Ok(())
2346    }
2347
2348    async fn generate_order_status_report(
2349        &self,
2350        cmd: &GenerateOrderStatusReport,
2351    ) -> anyhow::Result<Option<OrderStatusReport>> {
2352        // dYdX Indexer `/v4/orders` caps at `limit` and has no offset cursor, so we
2353        // request the maximum page (1000) to maximise the chance of finding a match
2354        // on active subaccounts. Callers looking for older orders should prefer
2355        // `generate_mass_status` or narrow via `instrument_id`.
2356        const ORDER_LOOKUP_LIMIT: u32 = 1_000;
2357
2358        // Fetch orders, narrowing by market when an instrument filter is provided
2359        let market = cmd
2360            .instrument_id
2361            .map(|id| id.symbol.as_str().trim_end_matches("-PERP").to_string());
2362
2363        let response = self
2364            .http_client
2365            .inner
2366            .get_orders(
2367                &self.wallet_address,
2368                self.subaccount_number,
2369                market.as_deref(),
2370                Some(ORDER_LOOKUP_LIMIT),
2371            )
2372            .await
2373            .context("failed to fetch order from dYdX API")?;
2374
2375        if response.is_empty() {
2376            log::debug!(
2377                "No orders returned for {}/subaccount={} (market_filter={:?})",
2378                self.wallet_address,
2379                self.subaccount_number,
2380                market,
2381            );
2382            return Ok(None);
2383        }
2384
2385        let ts_init = UnixNanos::default();
2386        let scanned_count = response.len();
2387
2388        let report = find_matching_order_report(
2389            &response,
2390            cmd.instrument_id,
2391            cmd.client_order_id,
2392            cmd.venue_order_id,
2393            |clob_pair_id| self.get_instrument_by_clob_pair_id(clob_pair_id),
2394            &self.encoder,
2395            self.core.account_id,
2396            ts_init,
2397        )?;
2398
2399        if report.is_none() {
2400            // The target order was not in the fetched page. Surface the scope so
2401            // callers can tell whether the order is older than the page or the
2402            // filters simply didn't match any returned order.
2403            let page_full = scanned_count == ORDER_LOOKUP_LIMIT as usize;
2404            log::debug!(
2405                "No order matched filters for {}/subaccount={} \
2406                 (client_order_id={:?}, venue_order_id={:?}, instrument_id={:?}, \
2407                 scanned={scanned_count}, page_full={page_full}, limit={ORDER_LOOKUP_LIMIT})",
2408                self.wallet_address,
2409                self.subaccount_number,
2410                cmd.client_order_id,
2411                cmd.venue_order_id,
2412                cmd.instrument_id,
2413            );
2414        }
2415
2416        Ok(report)
2417    }
2418
2419    async fn generate_order_status_reports(
2420        &self,
2421        cmd: &GenerateOrderStatusReports,
2422    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2423        // Query orders from dYdX API
2424        let response = self
2425            .http_client
2426            .inner
2427            .get_orders(
2428                &self.wallet_address,
2429                self.subaccount_number,
2430                None, // market filter
2431                None, // limit
2432            )
2433            .await
2434            .context("failed to fetch orders from dYdX API")?;
2435
2436        let mut reports = Vec::new();
2437        let ts_init = UnixNanos::default();
2438
2439        for order in response {
2440            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2441                Some(inst) => inst,
2442                None => continue,
2443            };
2444
2445            if let Some(filter_id) = cmd.instrument_id
2446                && instrument.id() != filter_id
2447            {
2448                continue;
2449            }
2450
2451            match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init) {
2452                Ok(mut r) => {
2453                    if !order.client_id.is_empty()
2454                        && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2455                    {
2456                        self.encoder.register_known_client_id(client_id_u32);
2457
2458                        if let Some(decoded) = self
2459                            .encoder
2460                            .decode_if_known(client_id_u32, order.client_metadata)
2461                        {
2462                            log::debug!(
2463                                "Decoded order: dYdX client_id={} meta={:#x} -> '{}'",
2464                                client_id_u32,
2465                                order.client_metadata,
2466                                decoded,
2467                            );
2468                            r.client_order_id = Some(decoded);
2469                        }
2470                    }
2471                    reports.push(r);
2472                }
2473                Err(e) => {
2474                    log::warn!("Failed to parse order status report: {e}");
2475                }
2476            }
2477        }
2478
2479        // Filter by open_only if specified
2480        if cmd.open_only {
2481            reports.retain(|r| r.order_status.is_open());
2482        }
2483
2484        // Filter by time range if specified
2485        if let Some(start) = cmd.start {
2486            reports.retain(|r| r.ts_last >= start);
2487        }
2488
2489        if let Some(end) = cmd.end {
2490            reports.retain(|r| r.ts_last <= end);
2491        }
2492
2493        Ok(reports)
2494    }
2495
2496    async fn generate_fill_reports(
2497        &self,
2498        cmd: GenerateFillReports,
2499    ) -> anyhow::Result<Vec<FillReport>> {
2500        let response = self
2501            .http_client
2502            .inner
2503            .get_fills(
2504                &self.wallet_address,
2505                self.subaccount_number,
2506                None, // market filter
2507                None, // limit
2508            )
2509            .await
2510            .context("failed to fetch fills from dYdX API")?;
2511
2512        let mut reports = Vec::new();
2513        let ts_init = UnixNanos::default();
2514
2515        for fill in response.fills {
2516            let instrument = match self.get_instrument_by_market(&fill.market) {
2517                Some(inst) => inst,
2518                None => {
2519                    log::warn!("Unknown market in fill: {}", fill.market);
2520                    continue;
2521                }
2522            };
2523
2524            if let Some(filter_id) = cmd.instrument_id
2525                && instrument.id() != filter_id
2526            {
2527                continue;
2528            }
2529
2530            let report = match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init)
2531            {
2532                Ok(r) => r,
2533                Err(e) => {
2534                    log::warn!("Failed to parse fill report: {e}");
2535                    continue;
2536                }
2537            };
2538
2539            reports.push(report);
2540        }
2541
2542        if let Some(venue_order_id) = cmd.venue_order_id {
2543            reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
2544        }
2545
2546        Ok(reports)
2547    }
2548
2549    async fn generate_position_status_reports(
2550        &self,
2551        cmd: &GeneratePositionStatusReports,
2552    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2553        // Query subaccount positions from dYdX API
2554        let response = self
2555            .http_client
2556            .inner
2557            .get_subaccount(&self.wallet_address, self.subaccount_number)
2558            .await
2559            .context("failed to fetch subaccount from dYdX API")?;
2560
2561        let mut reports = Vec::new();
2562        let ts_init = UnixNanos::default();
2563
2564        for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
2565            let instrument = match self.get_instrument_by_market(market_ticker) {
2566                Some(inst) => inst,
2567                None => {
2568                    log::warn!("Unknown market in position: {market_ticker}");
2569                    continue;
2570                }
2571            };
2572
2573            if let Some(filter_id) = cmd.instrument_id
2574                && instrument.id() != filter_id
2575            {
2576                continue;
2577            }
2578
2579            let report = match parse_position_status_report(
2580                perp_position,
2581                &instrument,
2582                self.core.account_id,
2583                ts_init,
2584            ) {
2585                Ok(r) => r,
2586                Err(e) => {
2587                    log::warn!("Failed to parse position status report: {e}");
2588                    continue;
2589                }
2590            };
2591
2592            reports.push(report);
2593        }
2594
2595        Ok(reports)
2596    }
2597
2598    async fn generate_mass_status(
2599        &self,
2600        lookback_mins: Option<u64>,
2601    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
2602        let ts_init = UnixNanos::default();
2603
2604        // Query orders
2605        let orders_response = self
2606            .http_client
2607            .inner
2608            .get_orders(&self.wallet_address, self.subaccount_number, None, None)
2609            .await
2610            .context("failed to fetch orders for mass status")?;
2611
2612        // Query subaccount for positions
2613        let subaccount_response = self
2614            .http_client
2615            .inner
2616            .get_subaccount(&self.wallet_address, self.subaccount_number)
2617            .await
2618            .context("failed to fetch subaccount for mass status")?;
2619
2620        // Query fills
2621        let fills_response = self
2622            .http_client
2623            .inner
2624            .get_fills(&self.wallet_address, self.subaccount_number, None, None)
2625            .await
2626            .context("failed to fetch fills for mass status")?;
2627
2628        // Parse order reports
2629        let mut order_reports = Vec::new();
2630        let mut orders_filtered = 0usize;
2631
2632        for order in orders_response {
2633            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2634                Some(inst) => inst,
2635                None => {
2636                    orders_filtered += 1;
2637                    continue;
2638                }
2639            };
2640
2641            match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init) {
2642                Ok(mut r) => {
2643                    if !order.client_id.is_empty()
2644                        && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2645                    {
2646                        self.encoder.register_known_client_id(client_id_u32);
2647
2648                        if let Some(decoded) = self
2649                            .encoder
2650                            .decode_if_known(client_id_u32, order.client_metadata)
2651                        {
2652                            log::debug!(
2653                                "Decoded reconciliation order: dYdX client_id={} meta={:#x} -> '{}'",
2654                                client_id_u32,
2655                                order.client_metadata,
2656                                decoded,
2657                            );
2658                            r.client_order_id = Some(decoded);
2659                        }
2660                    }
2661                    order_reports.push(r);
2662                }
2663                Err(e) => {
2664                    log::warn!("Failed to parse order status report: {e}");
2665                    orders_filtered += 1;
2666                }
2667            }
2668        }
2669
2670        // Parse position reports
2671        let mut position_reports = Vec::new();
2672
2673        for (market_ticker, perp_position) in
2674            &subaccount_response.subaccount.open_perpetual_positions
2675        {
2676            let instrument = match self.get_instrument_by_market(market_ticker) {
2677                Some(inst) => inst,
2678                None => continue,
2679            };
2680
2681            match parse_position_status_report(
2682                perp_position,
2683                &instrument,
2684                self.core.account_id,
2685                ts_init,
2686            ) {
2687                Ok(r) => position_reports.push(r),
2688                Err(e) => {
2689                    log::warn!("Failed to parse position status report: {e}");
2690                }
2691            }
2692        }
2693
2694        // Parse fill reports
2695        let mut fill_reports = Vec::new();
2696        let mut fills_filtered = 0usize;
2697
2698        for fill in fills_response.fills {
2699            let instrument = match self.get_instrument_by_market(&fill.market) {
2700                Some(inst) => inst,
2701                None => {
2702                    fills_filtered += 1;
2703                    continue;
2704                }
2705            };
2706
2707            match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init) {
2708                Ok(r) => fill_reports.push(r),
2709                Err(e) => {
2710                    log::warn!("Failed to parse fill report: {e}");
2711                    fills_filtered += 1;
2712                }
2713            }
2714        }
2715
2716        apply_avg_px_from_fills(&mut order_reports, &fill_reports);
2717
2718        // Apply lookback filter to orders and fills (positions are always current state)
2719        if let Some(mins) = lookback_mins {
2720            let now_ns = self.clock.get_time_ns();
2721            let cutoff_ns = now_ns.as_u64().saturating_sub(mins * 60 * 1_000_000_000);
2722            let cutoff = UnixNanos::from(cutoff_ns);
2723
2724            let orders_before = order_reports.len();
2725            order_reports.retain(|r| r.ts_last >= cutoff);
2726            let orders_removed = orders_before - order_reports.len();
2727
2728            let fills_before = fill_reports.len();
2729            fill_reports.retain(|r| r.ts_event >= cutoff);
2730            let fills_removed = fills_before - fill_reports.len();
2731
2732            log::info!(
2733                "Lookback filter ({}min): orders {}->{} (removed {}), fills {}->{} (removed {}), positions {} (unfiltered)",
2734                mins,
2735                orders_before,
2736                order_reports.len(),
2737                orders_removed,
2738                fills_before,
2739                fill_reports.len(),
2740                fills_removed,
2741                position_reports.len(),
2742            );
2743        } else {
2744            log::debug!(
2745                "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
2746                order_reports.len(),
2747                orders_filtered,
2748                position_reports.len(),
2749                fill_reports.len(),
2750                fills_filtered,
2751            );
2752        }
2753
2754        // Create mass status and add reports
2755        let mut mass_status = ExecutionMassStatus::new(
2756            self.core.client_id,
2757            self.core.account_id,
2758            self.core.venue,
2759            ts_init,
2760            None, // report_id will be auto-generated
2761        );
2762
2763        mass_status.add_order_reports(order_reports);
2764        mass_status.add_position_reports(position_reports);
2765        mass_status.add_fill_reports(fill_reports);
2766
2767        Ok(Some(mass_status))
2768    }
2769}
2770
2771/// Iterates `orders` and returns the first report whose parsed fields match every active
2772/// filter. Extracted from `generate_order_status_report` so the matching loop can be
2773/// exercised in isolation.
2774#[allow(clippy::too_many_arguments)]
2775fn find_matching_order_report<F>(
2776    orders: &[crate::http::models::Order],
2777    instrument_filter: Option<InstrumentId>,
2778    client_order_id_filter: Option<ClientOrderId>,
2779    venue_order_id_filter: Option<VenueOrderId>,
2780    lookup_instrument: F,
2781    encoder: &ClientOrderIdEncoder,
2782    account_id: AccountId,
2783    ts_init: UnixNanos,
2784) -> anyhow::Result<Option<OrderStatusReport>>
2785where
2786    F: Fn(u32) -> Option<InstrumentAny>,
2787{
2788    for order in orders {
2789        let instrument = match lookup_instrument(order.clob_pair_id) {
2790            Some(inst) => inst,
2791            None => continue,
2792        };
2793
2794        if let Some(filter_id) = instrument_filter
2795            && instrument.id() != filter_id
2796        {
2797            continue;
2798        }
2799
2800        let mut report = parse_order_status_report(order, &instrument, account_id, ts_init)
2801            .context("failed to parse order status report")?;
2802
2803        if !order.client_id.is_empty()
2804            && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2805        {
2806            encoder.register_known_client_id(client_id_u32);
2807
2808            if let Some(decoded) = encoder.decode_if_known(client_id_u32, order.client_metadata) {
2809                log::debug!(
2810                    "Decoded order: dYdX client_id={} meta={:#x} -> '{}'",
2811                    client_id_u32,
2812                    order.client_metadata,
2813                    decoded,
2814                );
2815                report.client_order_id = Some(decoded);
2816            }
2817        }
2818
2819        if let Some(client_order_id) = client_order_id_filter
2820            && report.client_order_id != Some(client_order_id)
2821        {
2822            continue;
2823        }
2824
2825        if let Some(venue_order_id) = venue_order_id_filter
2826            && report.venue_order_id.as_str() != venue_order_id.as_str()
2827        {
2828            continue;
2829        }
2830
2831        return Ok(Some(report));
2832    }
2833
2834    Ok(None)
2835}
2836
2837#[cfg(test)]
2838mod tests {
2839    use nautilus_model::{
2840        enums::OrderSide as NautilusOrderSide,
2841        identifiers::Symbol,
2842        instruments::{CryptoPerpetual, InstrumentAny},
2843        types::{Currency, Price, Quantity},
2844    };
2845    use rstest::rstest;
2846    use rust_decimal_macros::dec;
2847
2848    use super::*;
2849    use crate::{
2850        common::enums::{DydxOrderStatus, DydxOrderType, DydxTimeInForce},
2851        http::models::Order,
2852    };
2853
2854    fn test_instrument(symbol: &str, venue: &str) -> InstrumentAny {
2855        let instrument_id = InstrumentId::new(Symbol::new(symbol), Venue::new(venue));
2856        InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
2857            instrument_id,
2858            instrument_id.symbol,
2859            Currency::BTC(),
2860            Currency::USD(),
2861            Currency::USD(),
2862            false,
2863            2,
2864            3,
2865            Price::new(0.01, 2),
2866            Quantity::new(0.001, 3),
2867            None,
2868            None,
2869            None,
2870            None,
2871            None,
2872            None,
2873            None,
2874            None,
2875            None,
2876            None,
2877            None,
2878            None,
2879            None,
2880            UnixNanos::default(),
2881            UnixNanos::default(),
2882        ))
2883    }
2884
2885    fn test_order(id: &str, clob_pair_id: u32, client_id: &str) -> Order {
2886        Order {
2887            id: id.to_string(),
2888            subaccount_id: "sub-1".to_string(),
2889            client_id: client_id.to_string(),
2890            clob_pair_id,
2891            side: NautilusOrderSide::Buy,
2892            size: dec!(1.0),
2893            total_filled: dec!(0),
2894            price: dec!(50000),
2895            status: DydxOrderStatus::Open,
2896            order_type: DydxOrderType::Limit,
2897            time_in_force: DydxTimeInForce::Gtt,
2898            reduce_only: false,
2899            post_only: false,
2900            order_flags: 64,
2901            good_til_block: None,
2902            good_til_block_time: None,
2903            created_at_height: Some(100),
2904            client_metadata: 4,
2905            trigger_price: None,
2906            condition_type: None,
2907            conditional_order_trigger_subticks: None,
2908            execution: None,
2909            updated_at: None,
2910            updated_at_height: None,
2911            ticker: None,
2912            subaccount_number: 0,
2913            order_router_address: None,
2914        }
2915    }
2916
2917    #[rstest]
2918    fn test_find_matching_order_report_returns_later_match() {
2919        // Regression guard: earlier implementation fetched limit=1 and returned None
2920        // when the first order didn't match the filter. The fixed iteration logic must
2921        // scan the whole response and return the matching entry.
2922        let btc_inst = test_instrument("BTC-USD-PERP", "DYDX");
2923        let eth_inst = test_instrument("ETH-USD-PERP", "DYDX");
2924
2925        // Response ordered so the non-matching order comes first.
2926        let orders = vec![
2927            test_order("order-eth", 1, "22222"),
2928            test_order("order-btc", 0, "11111"),
2929        ];
2930
2931        let encoder = ClientOrderIdEncoder::new();
2932        let report = find_matching_order_report(
2933            &orders,
2934            Some(btc_inst.id()),
2935            None,
2936            None,
2937            |clob_pair_id| match clob_pair_id {
2938                0 => Some(btc_inst.clone()),
2939                1 => Some(eth_inst.clone()),
2940                _ => None,
2941            },
2942            &encoder,
2943            AccountId::new("DYDX-001"),
2944            UnixNanos::default(),
2945        )
2946        .expect("lookup should succeed");
2947
2948        let report = report.expect("matching order should be found");
2949        assert_eq!(report.instrument_id, btc_inst.id());
2950        assert_eq!(report.venue_order_id.as_str(), "order-btc");
2951    }
2952
2953    #[rstest]
2954    fn test_find_matching_order_report_returns_none_when_no_match() {
2955        let btc_inst = test_instrument("BTC-USD-PERP", "DYDX");
2956        let eth_inst = test_instrument("ETH-USD-PERP", "DYDX");
2957
2958        let orders = vec![
2959            test_order("order-eth-1", 1, "22222"),
2960            test_order("order-eth-2", 1, "33333"),
2961        ];
2962
2963        let encoder = ClientOrderIdEncoder::new();
2964        let report = find_matching_order_report(
2965            &orders,
2966            Some(btc_inst.id()),
2967            None,
2968            None,
2969            |clob_pair_id| match clob_pair_id {
2970                0 => Some(btc_inst.clone()),
2971                1 => Some(eth_inst.clone()),
2972                _ => None,
2973            },
2974            &encoder,
2975            AccountId::new("DYDX-001"),
2976            UnixNanos::default(),
2977        )
2978        .expect("lookup should succeed");
2979
2980        assert!(report.is_none());
2981    }
2982
2983    #[rstest]
2984    fn test_find_matching_order_report_filters_by_venue_order_id() {
2985        let btc_inst = test_instrument("BTC-USD-PERP", "DYDX");
2986
2987        let orders = vec![
2988            test_order("order-a", 0, "11111"),
2989            test_order("order-b", 0, "22222"),
2990            test_order("order-c", 0, "33333"),
2991        ];
2992
2993        let encoder = ClientOrderIdEncoder::new();
2994        let target = VenueOrderId::new("order-b");
2995        let report = find_matching_order_report(
2996            &orders,
2997            None,
2998            None,
2999            Some(target),
3000            |_| Some(btc_inst.clone()),
3001            &encoder,
3002            AccountId::new("DYDX-001"),
3003            UnixNanos::default(),
3004        )
3005        .expect("lookup should succeed")
3006        .expect("matching order should be found");
3007
3008        assert_eq!(report.venue_order_id.as_str(), "order-b");
3009    }
3010
3011    #[rstest]
3012    fn test_find_matching_order_report_skips_orders_without_cached_instrument() {
3013        let btc_inst = test_instrument("BTC-USD-PERP", "DYDX");
3014
3015        let orders = vec![
3016            // First order's clob_pair_id does not resolve -- must be skipped.
3017            test_order("order-unknown", 99, "11111"),
3018            test_order("order-btc", 0, "22222"),
3019        ];
3020
3021        let encoder = ClientOrderIdEncoder::new();
3022        let report = find_matching_order_report(
3023            &orders,
3024            Some(btc_inst.id()),
3025            None,
3026            None,
3027            |clob_pair_id| (clob_pair_id == 0).then(|| btc_inst.clone()),
3028            &encoder,
3029            AccountId::new("DYDX-001"),
3030            UnixNanos::default(),
3031        )
3032        .expect("lookup should succeed")
3033        .expect("matching order should be found");
3034
3035        assert_eq!(report.venue_order_id.as_str(), "order-btc");
3036    }
3037}