Skip to main content

nautilus_hyperliquid/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Hyperliquid adapter.
17
18use std::{
19    sync::{Arc, Mutex},
20    time::{Duration, Instant},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use nautilus_common::{
26    cache::fifo::FifoCache,
27    clients::ExecutionClient,
28    live::{runner::get_exec_event_sender, runtime::get_runtime},
29    messages::execution::{
30        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
32        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
33    },
34};
35use nautilus_core::{
36    MUTEX_POISONED, Params, UUID4, UnixNanos,
37    time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41    accounts::AccountAny,
42    enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
43    identifiers::{
44        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
45    },
46    orders::{Order, any::OrderAny},
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51use ustr::Ustr;
52
53use crate::{
54    common::{
55        consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_ADDRESS},
56        credential::Secrets,
57        parse::{
58            clamp_price_to_precision, client_order_id_to_cancel_request_with_asset,
59            derive_limit_from_trigger, derive_market_order_price, extract_error_message,
60            extract_inner_error, extract_inner_errors, normalize_price,
61            order_to_hyperliquid_request_with_asset, parse_combined_account_balances_and_margins,
62            round_to_sig_figs,
63        },
64    },
65    config::HyperliquidExecClientConfig,
66    http::{
67        client::HyperliquidHttpClient,
68        models::{
69            ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
70            HyperliquidExecGrouping, HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind,
71            SpotClearinghouseState,
72        },
73    },
74    websocket::{
75        ExecutionReport, NautilusWsMessage,
76        client::HyperliquidWebSocketClient,
77        dispatch::{
78            DispatchOutcome, OrderIdentity, WsDispatchState, dispatch_fill_report,
79            dispatch_order_status_report,
80        },
81    },
82};
83
84#[derive(Debug)]
85pub struct HyperliquidExecutionClient {
86    core: ExecutionClientCore,
87    clock: &'static AtomicTime,
88    config: HyperliquidExecClientConfig,
89    emitter: ExecutionEventEmitter,
90    http_client: HyperliquidHttpClient,
91    ws_client: HyperliquidWebSocketClient,
92    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
93    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
94    ws_dispatch_state: Arc<WsDispatchState>,
95}
96
97impl HyperliquidExecutionClient {
98    /// Returns a reference to the configuration.
99    pub fn config(&self) -> &HyperliquidExecClientConfig {
100        &self.config
101    }
102
103    /// Returns a reference to the shared WebSocket dispatch state.
104    ///
105    /// Exposes the identity map, pending-modify markers, and cached venue
106    /// order ids used by the two-tier dispatch contract. The state is
107    /// read-write via an [`Arc`]; callers must not mutate it directly, but
108    /// it is useful for inspection in tests and for live debugging.
109    #[must_use]
110    pub fn ws_dispatch_state(&self) -> &Arc<WsDispatchState> {
111        &self.ws_dispatch_state
112    }
113
114    /// Returns `true` when every background task spawned via `spawn_task`
115    /// has completed.
116    ///
117    /// Used in tests to wait for submit / modify / cancel HTTP round-trips
118    /// that fire on the runtime to finish before asserting on dispatch
119    /// state, avoiding bare `sleep` calls when a negative condition needs
120    /// to be checked after the spawned work is done.
121    #[allow(
122        clippy::missing_panics_doc,
123        reason = "pending_tasks mutex poisoning is not expected"
124    )]
125    #[must_use]
126    pub fn pending_tasks_all_finished(&self) -> bool {
127        let tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
128        tasks.iter().all(|h| h.is_finished())
129    }
130
131    fn resolve_slippage_bps(&self, params: Option<&Params>) -> u32 {
132        params
133            .and_then(|p| p.get_u64("market_order_slippage_bps"))
134            .map_or(self.config.market_order_slippage_bps, |v| v as u32)
135    }
136
137    fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
138        // Check if instrument symbol is supported
139        // Hyperliquid instruments: {base}-USD-PERP or {base}-{quote}-SPOT
140        let instrument_id = order.instrument_id();
141        let symbol = instrument_id.symbol.as_str();
142        if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
143            anyhow::bail!(
144                "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
145            );
146        }
147
148        // Check if order type is supported
149        match order.order_type() {
150            OrderType::Market
151            | OrderType::Limit
152            | OrderType::StopMarket
153            | OrderType::StopLimit
154            | OrderType::MarketIfTouched
155            | OrderType::LimitIfTouched => {}
156            _ => anyhow::bail!(
157                "Unsupported order type for Hyperliquid: {:?}",
158                order.order_type()
159            ),
160        }
161
162        // Check if conditional orders have trigger price
163        if matches!(
164            order.order_type(),
165            OrderType::StopMarket
166                | OrderType::StopLimit
167                | OrderType::MarketIfTouched
168                | OrderType::LimitIfTouched
169        ) && order.trigger_price().is_none()
170        {
171            anyhow::bail!(
172                "Conditional orders require a trigger price for Hyperliquid: {:?}",
173                order.order_type()
174            );
175        }
176
177        // Check if limit-based orders have price
178        if matches!(
179            order.order_type(),
180            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
181        ) && order.price().is_none()
182        {
183            anyhow::bail!(
184                "Limit orders require a limit price for Hyperliquid: {:?}",
185                order.order_type()
186            );
187        }
188
189        Ok(())
190    }
191
192    /// Creates a new [`HyperliquidExecutionClient`].
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if either the HTTP or WebSocket client fail to construct.
197    pub fn new(
198        core: ExecutionClientCore,
199        config: HyperliquidExecClientConfig,
200    ) -> anyhow::Result<Self> {
201        let secrets = Secrets::resolve(
202            config.private_key.as_deref(),
203            config.vault_address.as_deref(),
204            config.environment,
205        )
206        .context("Hyperliquid execution client requires private key")?;
207
208        let mut http_client = HyperliquidHttpClient::with_secrets(
209            &secrets,
210            config.http_timeout_secs,
211            config.proxy_url.clone(),
212        )
213        .context("failed to create Hyperliquid HTTP client")?;
214
215        http_client.set_account_id(core.account_id);
216        http_client.set_account_address(config.account_address.clone());
217        http_client.set_normalize_prices(config.normalize_prices);
218        http_client.set_market_order_slippage_bps(config.market_order_slippage_bps);
219
220        // Apply URL overrides from config (used for testing with mock servers)
221        if let Some(url) = &config.base_url_http {
222            http_client.set_base_info_url(url.clone());
223        }
224
225        if let Some(url) = &config.base_url_exchange {
226            http_client.set_base_exchange_url(url.clone());
227        }
228
229        let ws_url = config.base_url_ws.clone();
230        let ws_client = HyperliquidWebSocketClient::new(
231            ws_url,
232            config.environment,
233            Some(core.account_id),
234            config.transport_backend,
235            config.proxy_url.clone(),
236        );
237
238        let clock = get_atomic_clock_realtime();
239        let emitter = ExecutionEventEmitter::new(
240            clock,
241            core.trader_id,
242            core.account_id,
243            AccountType::Margin,
244            None,
245        );
246
247        Ok(Self {
248            core,
249            clock,
250            config,
251            emitter,
252            http_client,
253            ws_client,
254            pending_tasks: Mutex::new(Vec::new()),
255            ws_stream_handle: Mutex::new(None),
256            ws_dispatch_state: Arc::new(WsDispatchState::new()),
257        })
258    }
259
260    fn register_order_identity(&self, order: &OrderAny) {
261        register_order_identity_into(&self.ws_dispatch_state, order);
262    }
263
264    async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
265        if self.core.instruments_initialized() {
266            return Ok(());
267        }
268
269        let instruments = self
270            .http_client
271            .request_instruments()
272            .await
273            .context("failed to request Hyperliquid instruments")?;
274
275        if instruments.is_empty() {
276            log::warn!(
277                "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
278            );
279        } else {
280            log::info!("Initialized {} instruments", instruments.len());
281
282            for instrument in &instruments {
283                self.http_client.cache_instrument(instrument);
284            }
285        }
286
287        self.core.set_instruments_initialized();
288        Ok(())
289    }
290
291    async fn refresh_account_state(&self) -> anyhow::Result<()> {
292        let account_address = self.get_account_address()?;
293
294        let (perp_state, spot_state) = self
295            .fetch_combined_clearinghouse_state(&account_address)
296            .await?;
297
298        log::debug!(
299            "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}, spot_balances={}",
300            perp_state.cross_margin_summary,
301            perp_state.asset_positions.len(),
302            spot_state.balances.len(),
303        );
304
305        let (balances, margins) =
306            parse_combined_account_balances_and_margins(&perp_state, &spot_state)
307                .context("failed to parse combined account balances and margins")?;
308
309        // Emit even when both sides are empty so the account registers for
310        // await_account_registered on unfunded wallets.
311        let ts_event = self.clock.get_time_ns();
312        self.emitter
313            .emit_account_state(balances, margins, true, ts_event);
314
315        log::info!("Account state updated successfully");
316        Ok(())
317    }
318
319    async fn fetch_combined_clearinghouse_state(
320        &self,
321        account_address: &str,
322    ) -> anyhow::Result<(ClearinghouseState, SpotClearinghouseState)> {
323        let perp_json = self
324            .http_client
325            .info_clearinghouse_state(account_address)
326            .await
327            .context("failed to fetch clearinghouse state")?;
328        let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
329            .context("failed to deserialize clearinghouse state")?;
330
331        let spot_json = self
332            .http_client
333            .info_spot_clearinghouse_state(account_address)
334            .await
335            .context("failed to fetch spot clearinghouse state")?;
336        let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
337            .context("failed to deserialize spot clearinghouse state")?;
338
339        Ok((perp_state, spot_state))
340    }
341
342    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
343        let account_id = self.core.account_id;
344
345        if self.core.cache().account(&account_id).is_some() {
346            log::info!("Account {account_id} registered");
347            return Ok(());
348        }
349
350        let start = Instant::now();
351        let timeout = Duration::from_secs_f64(timeout_secs);
352        let interval = Duration::from_millis(10);
353
354        loop {
355            tokio::time::sleep(interval).await;
356
357            if self.core.cache().account(&account_id).is_some() {
358                log::info!("Account {account_id} registered");
359                return Ok(());
360            }
361
362            if start.elapsed() >= timeout {
363                anyhow::bail!(
364                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
365                );
366            }
367        }
368    }
369
370    fn get_user_address(&self) -> anyhow::Result<String> {
371        self.http_client
372            .get_user_address()
373            .context("failed to get user address from HTTP client")
374    }
375
376    fn get_account_address(&self) -> anyhow::Result<String> {
377        if let Some(addr) = &self.config.account_address {
378            return Ok(addr.clone());
379        }
380
381        match &self.config.vault_address {
382            Some(vault) => Ok(vault.clone()),
383            None => self.get_user_address(),
384        }
385    }
386
387    fn spawn_task<F>(&self, description: &'static str, fut: F)
388    where
389        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
390    {
391        let runtime = get_runtime();
392        let handle = runtime.spawn(async move {
393            if let Err(e) = fut.await {
394                log::warn!("{description} failed: {e:?}");
395            }
396        });
397
398        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
399        tasks.retain(|handle| !handle.is_finished());
400        tasks.push(handle);
401    }
402
403    fn abort_pending_tasks(&self) {
404        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
405        for handle in tasks.drain(..) {
406            handle.abort();
407        }
408    }
409}
410
411#[async_trait(?Send)]
412impl ExecutionClient for HyperliquidExecutionClient {
413    fn is_connected(&self) -> bool {
414        self.core.is_connected()
415    }
416
417    fn client_id(&self) -> ClientId {
418        self.core.client_id
419    }
420
421    fn account_id(&self) -> AccountId {
422        self.core.account_id
423    }
424
425    fn venue(&self) -> Venue {
426        *HYPERLIQUID_VENUE
427    }
428
429    fn oms_type(&self) -> OmsType {
430        self.core.oms_type
431    }
432
433    fn get_account(&self) -> Option<AccountAny> {
434        self.core.cache().account(&self.core.account_id).cloned()
435    }
436
437    fn generate_account_state(
438        &self,
439        balances: Vec<AccountBalance>,
440        margins: Vec<MarginBalance>,
441        reported: bool,
442        ts_event: UnixNanos,
443    ) -> anyhow::Result<()> {
444        self.emitter
445            .emit_account_state(balances, margins, reported, ts_event);
446        Ok(())
447    }
448
449    fn start(&mut self) -> anyhow::Result<()> {
450        if self.core.is_started() {
451            return Ok(());
452        }
453
454        let sender = get_exec_event_sender();
455        self.emitter.set_sender(sender);
456        self.core.set_started();
457
458        log::info!(
459            "Started: client_id={}, account_id={}, environment={:?}, vault_address={:?}, proxy_url={:?}",
460            self.core.client_id,
461            self.core.account_id,
462            self.config.environment,
463            self.config.vault_address,
464            self.config.proxy_url,
465        );
466
467        Ok(())
468    }
469
470    fn stop(&mut self) -> anyhow::Result<()> {
471        if self.core.is_stopped() {
472            return Ok(());
473        }
474
475        log::info!("Stopping Hyperliquid execution client");
476
477        if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
478            handle.abort();
479        }
480
481        self.abort_pending_tasks();
482        self.ws_client.abort();
483
484        self.core.set_disconnected();
485        self.core.set_stopped();
486
487        log::info!("Hyperliquid execution client stopped");
488        Ok(())
489    }
490
491    fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
492        let order = self
493            .core
494            .cache()
495            .order(&cmd.client_order_id)
496            .cloned()
497            .ok_or_else(|| {
498                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
499            })?;
500
501        if order.is_closed() {
502            log::warn!("Cannot submit closed order {}", order.client_order_id());
503            return Ok(());
504        }
505
506        if let Err(e) = self.validate_order_submission(&order) {
507            self.emitter
508                .emit_order_denied(&order, &format!("Validation failed: {e}"));
509            return Err(e);
510        }
511
512        let http_client = self.http_client.clone();
513        let symbol = order.instrument_id().symbol.to_string();
514
515        // Validate asset index exists before marking as submitted
516        let asset = match http_client.get_asset_index(&symbol) {
517            Some(a) => a,
518            None => {
519                self.emitter
520                    .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
521                return Ok(());
522            }
523        };
524
525        // Validate order conversion before marking as submitted
526        let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
527        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
528        let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset(
529            &order,
530            asset,
531            price_decimals,
532            self.config.normalize_prices,
533            slippage_bps,
534        ) {
535            Ok(req) => req,
536            Err(e) => {
537                self.emitter
538                    .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
539                return Ok(());
540            }
541        };
542
543        // Market orders need a limit price derived from the cached quote
544        if order.order_type() == OrderType::Market {
545            let instrument_id = order.instrument_id();
546            let cache = self.core.cache();
547            match cache.quote(&instrument_id) {
548                Some(quote) => {
549                    let is_buy = order.order_side() == OrderSide::Buy;
550                    hyperliquid_order.price =
551                        derive_market_order_price(quote, is_buy, price_decimals, slippage_bps);
552                }
553                None => {
554                    self.emitter.emit_order_denied(
555                        &order,
556                        &format!(
557                            "No cached quote for {instrument_id}: \
558                             subscribe to quote data before submitting market orders"
559                        ),
560                    );
561                    return Ok(());
562                }
563            }
564        }
565
566        log::info!(
567            "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
568            order.client_order_id(),
569            order.order_type(),
570            order.order_side(),
571            hyperliquid_order.price,
572            hyperliquid_order.size,
573            hyperliquid_order.kind,
574        );
575
576        // Cache cloid mapping before emitting submitted so WS handler
577        // can resolve order/fill reports back to this client_order_id
578        let cloid = Cloid::from_client_order_id(order.client_order_id());
579        self.ws_client
580            .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
581
582        self.register_order_identity(&order);
583
584        self.emitter.emit_order_submitted(&order);
585
586        let emitter = self.emitter.clone();
587        let clock = self.clock;
588        let ws_client = self.ws_client.clone();
589        let cloid_hex = Ustr::from(&cloid.to_hex());
590        let dispatch_state = self.ws_dispatch_state.clone();
591        let client_order_id = order.client_order_id();
592
593        // Vaults cannot approve builder fees, so skip builder attribution
594        // for vault orders to avoid "Builder fee has not been approved" rejection
595        let builder = if self.http_client.has_vault_address() {
596            None
597        } else {
598            Some(HyperliquidExecBuilderFee {
599                address: NAUTILUS_BUILDER_ADDRESS.to_string(),
600                fee_tenths_bp: 0,
601            })
602        };
603
604        self.spawn_task("submit_order", async move {
605            let action = HyperliquidExecAction::Order {
606                orders: vec![hyperliquid_order],
607                grouping: HyperliquidExecGrouping::Na,
608                builder,
609            };
610
611            match http_client.post_action_exec(&action).await {
612                Ok(response) => {
613                    if response.is_ok() {
614                        if let Some(inner_error) = extract_inner_error(&response) {
615                            log::warn!("Order submission rejected by exchange: {inner_error}");
616                            let ts = clock.get_time_ns();
617                            emitter.emit_order_rejected(&order, &inner_error, ts, false);
618                            ws_client.remove_cloid_mapping(&cloid_hex);
619                            dispatch_state.cleanup_terminal(&client_order_id);
620                        } else {
621                            log::info!("Order submitted successfully: {response:?}");
622                        }
623                    } else {
624                        let error_msg = extract_error_message(&response);
625                        log::warn!("Order submission rejected by exchange: {error_msg}");
626                        let ts = clock.get_time_ns();
627                        emitter.emit_order_rejected(&order, &error_msg, ts, false);
628                        ws_client.remove_cloid_mapping(&cloid_hex);
629                        dispatch_state.cleanup_terminal(&client_order_id);
630                    }
631                }
632                Err(e) => {
633                    // Don't reject on transport errors: the order may have
634                    // landed and WS events will drive the lifecycle. If it
635                    // didn't land, reconciliation on reconnect resolves it.
636                    log::error!("Order submission HTTP request failed: {e}");
637                }
638            }
639
640            Ok(())
641        });
642
643        Ok(())
644    }
645
646    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
647        log::debug!(
648            "Submitting order list with {} orders",
649            cmd.order_list.client_order_ids.len()
650        );
651
652        let http_client = self.http_client.clone();
653        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
654
655        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
656
657        // Validate all orders synchronously and collect valid ones
658        let mut valid_orders = Vec::new();
659        let mut hyperliquid_orders = Vec::new();
660
661        for order in &orders {
662            let symbol = order.instrument_id().symbol.to_string();
663            let asset = match http_client.get_asset_index(&symbol) {
664                Some(a) => a,
665                None => {
666                    self.emitter
667                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
668                    continue;
669                }
670            };
671
672            let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
673
674            match order_to_hyperliquid_request_with_asset(
675                order,
676                asset,
677                price_decimals,
678                self.config.normalize_prices,
679                slippage_bps,
680            ) {
681                Ok(req) => {
682                    hyperliquid_orders.push(req);
683                    valid_orders.push(order.clone());
684                }
685                Err(e) => {
686                    self.emitter
687                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
688                }
689            }
690        }
691
692        if valid_orders.is_empty() {
693            log::warn!("No valid orders to submit in order list");
694            return Ok(());
695        }
696
697        let grouping = determine_order_list_grouping(&valid_orders);
698        log::info!("Order list grouping: {grouping:?}");
699
700        for order in &valid_orders {
701            let cloid = Cloid::from_client_order_id(order.client_order_id());
702            self.ws_client
703                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
704            self.register_order_identity(order);
705            self.emitter.emit_order_submitted(order);
706        }
707
708        let emitter = self.emitter.clone();
709        let clock = self.clock;
710        let ws_client = self.ws_client.clone();
711        let dispatch_state = self.ws_dispatch_state.clone();
712        let cloid_hexes: Vec<Ustr> = valid_orders
713            .iter()
714            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
715            .collect();
716        let client_order_ids: Vec<ClientOrderId> =
717            valid_orders.iter().map(|o| o.client_order_id()).collect();
718
719        let builder = if self.http_client.has_vault_address() {
720            None
721        } else {
722            Some(HyperliquidExecBuilderFee {
723                address: NAUTILUS_BUILDER_ADDRESS.to_string(),
724                fee_tenths_bp: 0,
725            })
726        };
727
728        self.spawn_task("submit_order_list", async move {
729            let action = HyperliquidExecAction::Order {
730                orders: hyperliquid_orders,
731                grouping,
732                builder,
733            };
734
735            match http_client.post_action_exec(&action).await {
736                Ok(response) => {
737                    if response.is_ok() {
738                        let inner_errors = extract_inner_errors(&response);
739
740                        // For grouped orders (NormalTpsl/PositionTpsl), the
741                        // exchange returns a single status for the whole group
742                        // rather than one per order. If fewer statuses than
743                        // orders are returned, broadcast the first error (if
744                        // any) to all orders, or treat all as successful.
745                        if inner_errors.len() < valid_orders.len() {
746                            if let Some(error_msg) = inner_errors.iter().find_map(|e| e.as_ref()) {
747                                let ts = clock.get_time_ns();
748
749                                for ((order, cloid_hex), cid) in valid_orders
750                                    .iter()
751                                    .zip(cloid_hexes.iter())
752                                    .zip(client_order_ids.iter())
753                                {
754                                    log::warn!(
755                                        "Order {} rejected by exchange: {error_msg}",
756                                        order.client_order_id(),
757                                    );
758                                    emitter.emit_order_rejected(order, error_msg, ts, false);
759                                    ws_client.remove_cloid_mapping(cloid_hex);
760                                    dispatch_state.cleanup_terminal(cid);
761                                }
762                            } else {
763                                log::info!("Order list submitted successfully: {response:?}");
764                            }
765                        } else if inner_errors.iter().any(|e| e.is_some()) {
766                            let ts = clock.get_time_ns();
767
768                            for (i, error) in inner_errors.iter().enumerate() {
769                                if let Some(error_msg) = error {
770                                    if let Some(order) = valid_orders.get(i) {
771                                        log::warn!(
772                                            "Order {} rejected by exchange: {error_msg}",
773                                            order.client_order_id(),
774                                        );
775                                        emitter.emit_order_rejected(order, error_msg, ts, false);
776                                    }
777
778                                    if let Some(cloid_hex) = cloid_hexes.get(i) {
779                                        ws_client.remove_cloid_mapping(cloid_hex);
780                                    }
781
782                                    if let Some(cid) = client_order_ids.get(i) {
783                                        dispatch_state.cleanup_terminal(cid);
784                                    }
785                                }
786                            }
787                        } else {
788                            log::info!("Order list submitted successfully: {response:?}");
789                        }
790                    } else {
791                        let error_msg = extract_error_message(&response);
792                        log::warn!("Order list submission rejected by exchange: {error_msg}");
793                        let ts = clock.get_time_ns();
794                        for order in &valid_orders {
795                            emitter.emit_order_rejected(order, &error_msg, ts, false);
796                        }
797
798                        for cloid_hex in &cloid_hexes {
799                            ws_client.remove_cloid_mapping(cloid_hex);
800                        }
801
802                        for cid in &client_order_ids {
803                            dispatch_state.cleanup_terminal(cid);
804                        }
805                    }
806                }
807                Err(e) => {
808                    // Don't reject on transport errors: orders may have
809                    // landed and WS events will drive the lifecycle. If they
810                    // didn't land, reconciliation on reconnect resolves it.
811                    log::error!("Order list submission HTTP request failed: {e}");
812                }
813            }
814
815            Ok(())
816        });
817
818        Ok(())
819    }
820
821    fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
822        log::debug!("Modifying order: {cmd:?}");
823
824        let venue_order_id = match cmd.venue_order_id {
825            Some(id) => id,
826            None => {
827                let reason = "venue_order_id is required for modify";
828                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
829                self.emitter.emit_order_modify_rejected_event(
830                    cmd.strategy_id,
831                    cmd.instrument_id,
832                    cmd.client_order_id,
833                    None,
834                    reason,
835                    self.clock.get_time_ns(),
836                );
837                return Ok(());
838            }
839        };
840
841        let oid: u64 = match venue_order_id.as_str().parse() {
842            Ok(id) => id,
843            Err(e) => {
844                let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
845                log::warn!("{reason}");
846                self.emitter.emit_order_modify_rejected_event(
847                    cmd.strategy_id,
848                    cmd.instrument_id,
849                    cmd.client_order_id,
850                    Some(venue_order_id),
851                    &reason,
852                    self.clock.get_time_ns(),
853                );
854                return Ok(());
855            }
856        };
857
858        // Look up cached order to get side, reduce_only, post_only, TIF
859        let order = match self.core.cache().order(&cmd.client_order_id).cloned() {
860            Some(o) => o,
861            None => {
862                let reason = "order not found in cache";
863                log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
864                self.emitter.emit_order_modify_rejected_event(
865                    cmd.strategy_id,
866                    cmd.instrument_id,
867                    cmd.client_order_id,
868                    Some(venue_order_id),
869                    reason,
870                    self.clock.get_time_ns(),
871                );
872                return Ok(());
873            }
874        };
875
876        let http_client = self.http_client.clone();
877        let symbol = cmd.instrument_id.symbol.to_string();
878        let should_normalize = self.config.normalize_prices;
879        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
880
881        let quantity = cmd.quantity.unwrap_or(order.leaves_qty());
882        let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
883        let asset = match http_client.get_asset_index(&symbol) {
884            Some(a) => a,
885            None => {
886                log::warn!(
887                    "Asset index not found for symbol {symbol}, ensure instruments are loaded",
888                );
889                return Ok(());
890            }
891        };
892
893        // Build base request from cached order (derives slippage-adjusted
894        // limit for trigger-market types like StopMarket/MarketIfTouched)
895        let hyperliquid_order = match order_to_hyperliquid_request_with_asset(
896            &order,
897            asset,
898            price_decimals,
899            should_normalize,
900            slippage_bps,
901        ) {
902            Ok(mut req) => {
903                // Only override price when explicitly provided
904                if let Some(p) = cmd.price.or(order.price()) {
905                    let price_dec = p.as_decimal();
906                    req.price = if should_normalize {
907                        normalize_price(price_dec, price_decimals).normalize()
908                    } else {
909                        price_dec.normalize()
910                    };
911                } else if let Some(tp) = cmd.trigger_price {
912                    // Trigger changed but no explicit price: re-derive the
913                    // slippage-adjusted limit from the new trigger
914                    let is_buy = order.order_side() == OrderSide::Buy;
915                    let base = tp.as_decimal().normalize();
916                    let derived = derive_limit_from_trigger(base, is_buy, slippage_bps);
917                    let sig_rounded = round_to_sig_figs(derived, 5);
918                    req.price =
919                        clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
920                }
921                // else: keep the derived price from order_to_hyperliquid_request
922
923                req.size = quantity.as_decimal().normalize();
924
925                // Update trigger_px if the command provides a new trigger
926                if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
927                    (cmd.trigger_price, &mut req.kind)
928                {
929                    let tp_dec = tp.as_decimal();
930                    trigger.trigger_px = if should_normalize {
931                        normalize_price(tp_dec, price_decimals).normalize()
932                    } else {
933                        tp_dec.normalize()
934                    };
935                }
936
937                req
938            }
939            Err(e) => {
940                log::warn!("Order conversion failed for modify: {e}");
941                return Ok(());
942            }
943        };
944
945        let dispatch_state = self.ws_dispatch_state.clone();
946        let client_order_id = cmd.client_order_id;
947        let old_venue_order_id = venue_order_id;
948
949        self.spawn_task("modify_order", async move {
950            let action = HyperliquidExecAction::Modify {
951                modify: HyperliquidExecModifyOrderRequest {
952                    oid,
953                    order: hyperliquid_order,
954                },
955            };
956
957            match http_client.post_action_exec(&action).await {
958                Ok(response) => {
959                    if response.is_ok() {
960                        if let Some(inner_error) = extract_inner_error(&response) {
961                            log::warn!("Order modification rejected by exchange: {inner_error}");
962                        } else {
963                            // Mark the old venue_order_id as in-flight only
964                            // after a confirmed HTTP success. A failed modify
965                            // never leaves stale race state behind, so the
966                            // cancel-before-accept branch never fires on a
967                            // cancel following an independent failed modify.
968                            dispatch_state.mark_pending_modify(client_order_id, old_venue_order_id);
969                            log::info!("Order modified successfully: {response:?}");
970                        }
971                    } else {
972                        let error_msg = extract_error_message(&response);
973                        log::warn!("Order modification rejected by exchange: {error_msg}");
974                    }
975                }
976                Err(e) => {
977                    log::warn!("Order modification HTTP request failed: {e}");
978                }
979            }
980
981            Ok(())
982        });
983
984        Ok(())
985    }
986
987    fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
988        log::debug!("Cancelling order: {cmd:?}");
989
990        let http_client = self.http_client.clone();
991        let emitter = self.emitter.clone();
992        let clock = self.clock;
993        let client_order_id = cmd.client_order_id;
994        let client_order_id_str = cmd.client_order_id.to_string();
995        let strategy_id = cmd.strategy_id;
996        let instrument_id = cmd.instrument_id;
997        let venue_order_id = cmd.venue_order_id;
998        let symbol = cmd.instrument_id.symbol.to_string();
999
1000        self.spawn_task("cancel_order", async move {
1001            let asset = match http_client.get_asset_index(&symbol) {
1002                Some(a) => a,
1003                None => {
1004                    emitter.emit_order_cancel_rejected_event(
1005                        strategy_id,
1006                        instrument_id,
1007                        client_order_id,
1008                        venue_order_id,
1009                        &format!("Asset index not found for symbol {symbol}"),
1010                        clock.get_time_ns(),
1011                    );
1012                    return Ok(());
1013                }
1014            };
1015
1016            let cancel_request =
1017                client_order_id_to_cancel_request_with_asset(&client_order_id_str, asset);
1018            let action = HyperliquidExecAction::CancelByCloid {
1019                cancels: vec![cancel_request],
1020            };
1021
1022            match http_client.post_action_exec(&action).await {
1023                Ok(response) => {
1024                    if response.is_ok() {
1025                        if let Some(inner_error) = extract_inner_error(&response) {
1026                            emitter.emit_order_cancel_rejected_event(
1027                                strategy_id,
1028                                instrument_id,
1029                                client_order_id,
1030                                venue_order_id,
1031                                &inner_error,
1032                                clock.get_time_ns(),
1033                            );
1034                        } else {
1035                            log::info!("Order cancelled successfully: {response:?}");
1036                        }
1037                    } else {
1038                        emitter.emit_order_cancel_rejected_event(
1039                            strategy_id,
1040                            instrument_id,
1041                            client_order_id,
1042                            venue_order_id,
1043                            &extract_error_message(&response),
1044                            clock.get_time_ns(),
1045                        );
1046                    }
1047                }
1048                Err(e) => {
1049                    emitter.emit_order_cancel_rejected_event(
1050                        strategy_id,
1051                        instrument_id,
1052                        client_order_id,
1053                        venue_order_id,
1054                        &format!("Cancel HTTP request failed: {e}"),
1055                        clock.get_time_ns(),
1056                    );
1057                }
1058            }
1059
1060            Ok(())
1061        });
1062
1063        Ok(())
1064    }
1065
1066    fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1067        log::debug!("Cancelling all orders: {cmd:?}");
1068
1069        let cache = self.core.cache();
1070        let open_orders = cache.orders_open(
1071            Some(&self.core.venue),
1072            Some(&cmd.instrument_id),
1073            None,
1074            None,
1075            Some(cmd.order_side),
1076        );
1077
1078        if open_orders.is_empty() {
1079            log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
1080            return Ok(());
1081        }
1082
1083        let symbol = cmd.instrument_id.symbol.to_string();
1084        let instrument_id = cmd.instrument_id;
1085        let strategy_id = cmd.strategy_id;
1086        let entries: Vec<CancelEntry> = open_orders
1087            .iter()
1088            .map(|o| CancelEntry {
1089                strategy_id,
1090                instrument_id,
1091                client_order_id: o.client_order_id(),
1092                venue_order_id: o.venue_order_id(),
1093                symbol: symbol.clone(),
1094            })
1095            .collect();
1096
1097        let http_client = self.http_client.clone();
1098        let emitter = self.emitter.clone();
1099        let clock = self.clock;
1100
1101        self.spawn_task("cancel_all_orders", async move {
1102            let asset = match http_client.get_asset_index(&symbol) {
1103                Some(a) => a,
1104                None => {
1105                    let reason = format!("Asset index not found for symbol {symbol}");
1106                    log::warn!("{reason}");
1107                    let ts = clock.get_time_ns();
1108
1109                    for entry in &entries {
1110                        emitter.emit_order_cancel_rejected_event(
1111                            entry.strategy_id,
1112                            entry.instrument_id,
1113                            entry.client_order_id,
1114                            entry.venue_order_id,
1115                            &reason,
1116                            ts,
1117                        );
1118                    }
1119                    return Ok(());
1120                }
1121            };
1122
1123            let cancel_requests: Vec<_> = entries
1124                .iter()
1125                .map(|e| {
1126                    client_order_id_to_cancel_request_with_asset(e.client_order_id.as_ref(), asset)
1127                })
1128                .collect();
1129
1130            if cancel_requests.is_empty() {
1131                return Ok(());
1132            }
1133
1134            let action = HyperliquidExecAction::CancelByCloid {
1135                cancels: cancel_requests,
1136            };
1137
1138            match http_client.post_action_exec(&action).await {
1139                Ok(response) => {
1140                    if response.is_ok() {
1141                        let inner_errors = extract_inner_errors(&response);
1142                        let ts = clock.get_time_ns();
1143
1144                        if inner_errors.is_empty() {
1145                            log::info!("Cancel-all submitted successfully: {response:?}");
1146                        } else {
1147                            for (i, entry) in entries.iter().enumerate() {
1148                                if let Some(Some(error_msg)) = inner_errors.get(i) {
1149                                    log::warn!(
1150                                        "Cancel for {} rejected by exchange: {error_msg}",
1151                                        entry.client_order_id,
1152                                    );
1153                                    emitter.emit_order_cancel_rejected_event(
1154                                        entry.strategy_id,
1155                                        entry.instrument_id,
1156                                        entry.client_order_id,
1157                                        entry.venue_order_id,
1158                                        error_msg,
1159                                        ts,
1160                                    );
1161                                }
1162                            }
1163                        }
1164                    } else {
1165                        let error_msg = extract_error_message(&response);
1166                        log::warn!("Cancel-all rejected by exchange: {error_msg}");
1167                        let ts = clock.get_time_ns();
1168
1169                        for entry in &entries {
1170                            emitter.emit_order_cancel_rejected_event(
1171                                entry.strategy_id,
1172                                entry.instrument_id,
1173                                entry.client_order_id,
1174                                entry.venue_order_id,
1175                                &error_msg,
1176                                ts,
1177                            );
1178                        }
1179                    }
1180                }
1181                Err(e) => {
1182                    let reason = format!("Cancel-all HTTP request failed: {e}");
1183                    log::warn!("{reason}");
1184                    let ts = clock.get_time_ns();
1185
1186                    for entry in &entries {
1187                        emitter.emit_order_cancel_rejected_event(
1188                            entry.strategy_id,
1189                            entry.instrument_id,
1190                            entry.client_order_id,
1191                            entry.venue_order_id,
1192                            &reason,
1193                            ts,
1194                        );
1195                    }
1196                }
1197            }
1198
1199            Ok(())
1200        });
1201
1202        Ok(())
1203    }
1204
1205    fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1206        log::debug!("Batch cancelling orders: {cmd:?}");
1207
1208        if cmd.cancels.is_empty() {
1209            log::debug!("No orders to cancel in batch");
1210            return Ok(());
1211        }
1212
1213        let entries: Vec<CancelEntry> = cmd
1214            .cancels
1215            .iter()
1216            .map(|c| CancelEntry {
1217                strategy_id: c.strategy_id,
1218                instrument_id: c.instrument_id,
1219                client_order_id: c.client_order_id,
1220                venue_order_id: c.venue_order_id,
1221                symbol: c.instrument_id.symbol.to_string(),
1222            })
1223            .collect();
1224
1225        let http_client = self.http_client.clone();
1226        let emitter = self.emitter.clone();
1227        let clock = self.clock;
1228
1229        self.spawn_task("batch_cancel_orders", async move {
1230            let mut cancel_requests = Vec::new();
1231            let mut sent_entries: Vec<&CancelEntry> = Vec::new();
1232
1233            for entry in &entries {
1234                let asset = match http_client.get_asset_index(&entry.symbol) {
1235                    Some(a) => a,
1236                    None => {
1237                        let reason = format!("Asset index not found for symbol {}", entry.symbol);
1238                        log::warn!("{reason}, skipping cancel for {}", entry.client_order_id);
1239                        emitter.emit_order_cancel_rejected_event(
1240                            entry.strategy_id,
1241                            entry.instrument_id,
1242                            entry.client_order_id,
1243                            entry.venue_order_id,
1244                            &reason,
1245                            clock.get_time_ns(),
1246                        );
1247                        continue;
1248                    }
1249                };
1250                cancel_requests.push(client_order_id_to_cancel_request_with_asset(
1251                    entry.client_order_id.as_ref(),
1252                    asset,
1253                ));
1254                sent_entries.push(entry);
1255            }
1256
1257            if cancel_requests.is_empty() {
1258                log::warn!("No valid cancel requests in batch");
1259                return Ok(());
1260            }
1261
1262            let action = HyperliquidExecAction::CancelByCloid {
1263                cancels: cancel_requests,
1264            };
1265
1266            match http_client.post_action_exec(&action).await {
1267                Ok(response) => {
1268                    if response.is_ok() {
1269                        let inner_errors = extract_inner_errors(&response);
1270                        let ts = clock.get_time_ns();
1271
1272                        if inner_errors.is_empty() {
1273                            log::info!("Batch cancel submitted successfully: {response:?}");
1274                        } else {
1275                            for (i, entry) in sent_entries.iter().enumerate() {
1276                                if let Some(Some(error_msg)) = inner_errors.get(i) {
1277                                    log::warn!(
1278                                        "Cancel for {} rejected by exchange: {error_msg}",
1279                                        entry.client_order_id,
1280                                    );
1281                                    emitter.emit_order_cancel_rejected_event(
1282                                        entry.strategy_id,
1283                                        entry.instrument_id,
1284                                        entry.client_order_id,
1285                                        entry.venue_order_id,
1286                                        error_msg,
1287                                        ts,
1288                                    );
1289                                }
1290                            }
1291                        }
1292                    } else {
1293                        let error_msg = extract_error_message(&response);
1294                        log::warn!("Batch cancel rejected by exchange: {error_msg}");
1295                        let ts = clock.get_time_ns();
1296
1297                        for entry in &sent_entries {
1298                            emitter.emit_order_cancel_rejected_event(
1299                                entry.strategy_id,
1300                                entry.instrument_id,
1301                                entry.client_order_id,
1302                                entry.venue_order_id,
1303                                &error_msg,
1304                                ts,
1305                            );
1306                        }
1307                    }
1308                }
1309                Err(e) => {
1310                    let reason = format!("Batch cancel HTTP request failed: {e}");
1311                    log::warn!("{reason}");
1312                    let ts = clock.get_time_ns();
1313
1314                    for entry in &sent_entries {
1315                        emitter.emit_order_cancel_rejected_event(
1316                            entry.strategy_id,
1317                            entry.instrument_id,
1318                            entry.client_order_id,
1319                            entry.venue_order_id,
1320                            &reason,
1321                            ts,
1322                        );
1323                    }
1324                }
1325            }
1326
1327            Ok(())
1328        });
1329
1330        Ok(())
1331    }
1332
1333    fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1334        let http_client = self.http_client.clone();
1335        let account_address = self.get_account_address()?;
1336        let emitter = self.emitter.clone();
1337        let clock = self.clock;
1338
1339        self.spawn_task("query_account", async move {
1340            let perp_json = http_client
1341                .info_clearinghouse_state(&account_address)
1342                .await
1343                .context("failed to fetch clearinghouse state")?;
1344
1345            let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
1346                .context("failed to deserialize clearinghouse state")?;
1347
1348            let spot_json = http_client
1349                .info_spot_clearinghouse_state(&account_address)
1350                .await
1351                .context("failed to fetch spot clearinghouse state")?;
1352            let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
1353                .context("failed to deserialize spot clearinghouse state")?;
1354
1355            let (balances, margins) =
1356                parse_combined_account_balances_and_margins(&perp_state, &spot_state)
1357                    .context("failed to parse combined account balances and margins")?;
1358            let ts_event = clock.get_time_ns();
1359            emitter.emit_account_state(balances, margins, true, ts_event);
1360
1361            Ok(())
1362        });
1363
1364        Ok(())
1365    }
1366
1367    fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1368        log::debug!("Querying order: {cmd:?}");
1369
1370        let client_order_id = cmd.client_order_id;
1371        let venue_order_id = match cmd.venue_order_id {
1372            Some(voi) => Some(voi),
1373            None => self.core.cache().venue_order_id(&client_order_id).copied(),
1374        };
1375
1376        let account_address = self.get_account_address()?;
1377        let http_client = self.http_client.clone();
1378        let emitter = self.emitter.clone();
1379
1380        self.spawn_task("query_order", async move {
1381            // Search open orders by cloid first so modify/cancel-replace
1382            // resolves to the live replacement rather than a stale cached oid.
1383            // Request errors here are logged and the oid fallback is still tried;
1384            // a transient frontendOpenOrders failure must not abort the whole query.
1385            match http_client
1386                .request_order_status_report_by_client_order_id(&account_address, &client_order_id)
1387                .await
1388            {
1389                Ok(Some(report)) => {
1390                    log::info!("Queried order status for {client_order_id}");
1391                    emitter.send_order_status_report(report);
1392                    return Ok(());
1393                }
1394                Ok(None) => {}
1395                Err(e) => {
1396                    log::warn!(
1397                        "Failed to query order status for {client_order_id}: {e}; falling back to oid lookup"
1398                    );
1399                }
1400            }
1401
1402            let Some(venue_order_id) = venue_order_id else {
1403                log::info!("No order status report found for {client_order_id}");
1404                return Ok(());
1405            };
1406
1407            let oid: u64 = match venue_order_id.as_str().parse() {
1408                Ok(oid) => oid,
1409                Err(e) => {
1410                    log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1411                    return Ok(());
1412                }
1413            };
1414
1415            match http_client
1416                .request_order_status_report(&account_address, oid)
1417                .await
1418            {
1419                Ok(Some(report)) => {
1420                    log::info!("Queried order status for oid {oid}");
1421                    emitter.send_order_status_report(report);
1422                }
1423                Ok(None) => {
1424                    log::info!("No order status report found for oid {oid}");
1425                }
1426                Err(e) => {
1427                    log::warn!("Failed to query order status for oid {oid}: {e}");
1428                }
1429            }
1430
1431            Ok(())
1432        });
1433
1434        Ok(())
1435    }
1436
1437    async fn connect(&mut self) -> anyhow::Result<()> {
1438        if self.core.is_connected() {
1439            return Ok(());
1440        }
1441
1442        log::info!("Connecting Hyperliquid execution client");
1443
1444        // Ensure instruments are initialized
1445        self.ensure_instruments_initialized_async().await?;
1446
1447        // Start WebSocket stream (connects and subscribes to user channels)
1448        self.start_ws_stream().await?;
1449
1450        // Post-WS setup: if any step fails, tear down WS before returning
1451        let post_ws = async {
1452            self.refresh_account_state().await?;
1453            self.await_account_registered(30.0).await?;
1454
1455            Ok::<(), anyhow::Error>(())
1456        };
1457
1458        if let Err(e) = post_ws.await {
1459            log::warn!("Connect failed after WS started, tearing down: {e}");
1460            let _ = self.ws_client.disconnect().await;
1461            self.abort_pending_tasks();
1462            return Err(e);
1463        }
1464
1465        self.core.set_connected();
1466
1467        log::info!("Connected: client_id={}", self.core.client_id);
1468        Ok(())
1469    }
1470
1471    async fn disconnect(&mut self) -> anyhow::Result<()> {
1472        if self.core.is_disconnected() {
1473            return Ok(());
1474        }
1475
1476        log::info!("Disconnecting Hyperliquid execution client");
1477
1478        // Disconnect WebSocket
1479        self.ws_client.disconnect().await?;
1480
1481        // Abort any pending tasks
1482        self.abort_pending_tasks();
1483
1484        self.core.set_disconnected();
1485
1486        log::info!("Disconnected: client_id={}", self.core.client_id);
1487        Ok(())
1488    }
1489
1490    async fn generate_order_status_report(
1491        &self,
1492        cmd: &GenerateOrderStatusReport,
1493    ) -> anyhow::Result<Option<OrderStatusReport>> {
1494        let account_address = self.get_account_address()?;
1495
1496        if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
1497            log::warn!(
1498                "Cannot generate order status report without venue_order_id or client_order_id"
1499            );
1500            return Ok(None);
1501        }
1502
1503        // Search open orders by cloid first when supplied. Hyperliquid modify
1504        // produces a new venue oid while preserving cloid, so a cached oid can
1505        // point at the canceled leg rather than the live replacement.
1506        if let Some(client_order_id) = &cmd.client_order_id
1507            && let Some(report) = self
1508                .http_client
1509                .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1510                .await
1511                .context("failed to generate order status report by client_order_id")?
1512        {
1513            log::info!("Generated order status report for {client_order_id}");
1514            return Ok(Some(report));
1515        }
1516
1517        let oid = match &cmd.venue_order_id {
1518            Some(venue_order_id) => venue_order_id
1519                .as_str()
1520                .parse::<u64>()
1521                .context("failed to parse venue_order_id as oid")?,
1522            None => match &cmd.client_order_id {
1523                Some(client_order_id) => {
1524                    let cached_oid: Option<u64> = self
1525                        .core
1526                        .cache()
1527                        .venue_order_id(client_order_id)
1528                        .and_then(|v| v.as_str().parse::<u64>().ok());
1529
1530                    match cached_oid {
1531                        Some(oid) => oid,
1532                        None => {
1533                            log::info!("No order status report found for {client_order_id}");
1534                            return Ok(None);
1535                        }
1536                    }
1537                }
1538                None => unreachable!("cmd must carry at least one identifier"),
1539            },
1540        };
1541
1542        let report = self
1543            .http_client
1544            .request_order_status_report(&account_address, oid)
1545            .await
1546            .context("failed to generate order status report")?;
1547
1548        if report.is_some() {
1549            log::info!("Generated order status report for oid {oid}");
1550        } else {
1551            log::info!("No order status report found for oid {oid}");
1552        }
1553        Ok(report)
1554    }
1555
1556    async fn generate_order_status_reports(
1557        &self,
1558        cmd: &GenerateOrderStatusReports,
1559    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1560        let account_address = self.get_account_address()?;
1561
1562        let reports = self
1563            .http_client
1564            .request_order_status_reports(&account_address, cmd.instrument_id)
1565            .await
1566            .context("failed to generate order status reports")?;
1567
1568        // Filter by open_only if specified
1569        let reports = if cmd.open_only {
1570            reports
1571                .into_iter()
1572                .filter(|r| r.order_status.is_open())
1573                .collect()
1574        } else {
1575            reports
1576        };
1577
1578        // Filter by time range if specified
1579        let reports = match (cmd.start, cmd.end) {
1580            (Some(start), Some(end)) => reports
1581                .into_iter()
1582                .filter(|r| r.ts_last >= start && r.ts_last <= end)
1583                .collect(),
1584            (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1585            (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1586            (None, None) => reports,
1587        };
1588
1589        log::info!("Generated {} order status reports", reports.len());
1590        Ok(reports)
1591    }
1592
1593    async fn generate_fill_reports(
1594        &self,
1595        cmd: GenerateFillReports,
1596    ) -> anyhow::Result<Vec<FillReport>> {
1597        let account_address = self.get_account_address()?;
1598
1599        let reports = self
1600            .http_client
1601            .request_fill_reports(&account_address, cmd.instrument_id)
1602            .await
1603            .context("failed to generate fill reports")?;
1604
1605        // Filter by time range if specified
1606        let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1607            reports
1608                .into_iter()
1609                .filter(|r| r.ts_event >= start && r.ts_event <= end)
1610                .collect()
1611        } else if let Some(start) = cmd.start {
1612            reports
1613                .into_iter()
1614                .filter(|r| r.ts_event >= start)
1615                .collect()
1616        } else if let Some(end) = cmd.end {
1617            reports.into_iter().filter(|r| r.ts_event <= end).collect()
1618        } else {
1619            reports
1620        };
1621
1622        log::info!("Generated {} fill reports", reports.len());
1623        Ok(reports)
1624    }
1625
1626    async fn generate_position_status_reports(
1627        &self,
1628        cmd: &GeneratePositionStatusReports,
1629    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1630        let account_address = self.get_account_address()?;
1631
1632        // request_position_status_reports already merges spot holdings
1633        let reports = self
1634            .http_client
1635            .request_position_status_reports(&account_address, cmd.instrument_id)
1636            .await
1637            .context("failed to generate position status reports")?;
1638
1639        log::info!("Generated {} position status reports", reports.len());
1640        Ok(reports)
1641    }
1642
1643    async fn generate_mass_status(
1644        &self,
1645        lookback_mins: Option<u64>,
1646    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1647        let ts_init = self.clock.get_time_ns();
1648
1649        let order_cmd = GenerateOrderStatusReports::new(
1650            UUID4::new(),
1651            ts_init,
1652            true, // open_only
1653            None,
1654            None,
1655            None,
1656            None,
1657            None,
1658        );
1659        let fill_cmd =
1660            GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1661        let position_cmd =
1662            GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1663
1664        let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1665        let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1666        let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1667
1668        // Apply lookback filter to fills only (positions are current state,
1669        // and open orders must always be included for correct reconciliation)
1670        if let Some(mins) = lookback_mins {
1671            let cutoff_ns = ts_init
1672                .as_u64()
1673                .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1674            let cutoff = UnixNanos::from(cutoff_ns);
1675
1676            fill_reports.retain(|r| r.ts_event >= cutoff);
1677        }
1678
1679        let mut mass_status = ExecutionMassStatus::new(
1680            self.core.client_id,
1681            self.core.account_id,
1682            self.core.venue,
1683            ts_init,
1684            None,
1685        );
1686        mass_status.add_order_reports(order_reports);
1687        mass_status.add_fill_reports(fill_reports);
1688        mass_status.add_position_reports(position_reports);
1689
1690        log::info!(
1691            "Generated mass status: {} orders, {} fills, {} positions",
1692            mass_status.order_reports().len(),
1693            mass_status.fill_reports().len(),
1694            mass_status.position_reports().len(),
1695        );
1696
1697        Ok(Some(mass_status))
1698    }
1699}
1700
1701impl HyperliquidExecutionClient {
1702    async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1703        {
1704            let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1705            if handle_guard.is_some() {
1706                return Ok(());
1707            }
1708        }
1709
1710        let user_address = self.get_user_address()?;
1711
1712        // Use account_address (agent wallet) or vault address for WS subscriptions,
1713        // otherwise order/fill updates will be missed
1714        let subscription_address = self
1715            .config
1716            .account_address
1717            .as_ref()
1718            .or(self.config.vault_address.as_ref())
1719            .unwrap_or(&user_address)
1720            .clone();
1721
1722        let mut ws_client = self.ws_client.clone();
1723
1724        let instruments = self
1725            .http_client
1726            .request_instruments()
1727            .await
1728            .unwrap_or_default();
1729
1730        for instrument in instruments {
1731            ws_client.cache_instrument(instrument);
1732        }
1733
1734        // Connect and subscribe before spawning the event loop
1735        ws_client.connect().await?;
1736        ws_client
1737            .subscribe_order_updates(&subscription_address)
1738            .await?;
1739        ws_client
1740            .subscribe_user_events(&subscription_address)
1741            .await?;
1742        log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1743
1744        // Transfer task handle to original so disconnect() can await it
1745        if let Some(handle) = ws_client.take_task_handle() {
1746            self.ws_client.set_task_handle(handle);
1747        }
1748
1749        let emitter = self.emitter.clone();
1750        let dispatch_state = self.ws_dispatch_state.clone();
1751        let clock = self.clock;
1752        let runtime = get_runtime();
1753        let handle = runtime.spawn(async move {
1754            // Cloids for external / untracked orders that reach a terminal
1755            // state: we evict their mapping immediately so long-running
1756            // sessions do not leak. Tracked orders clear their own cloid
1757            // mapping from the dispatch `cleanup_terminal` path below.
1758            //
1759            // For a tracked order that hits a status-only `FILLED` marker
1760            // without an accompanying fill, we defer the cloid cleanup until
1761            // the matching `FillReport` arrives so partial fills do not lose
1762            // their `client_order_id` link. The bounded FIFO cache keeps
1763            // orphaned entries from growing unbounded.
1764            let mut pending_filled_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1765
1766            loop {
1767                let event = ws_client.next_event().await;
1768
1769                match event {
1770                    Some(msg) => match msg {
1771                        NautilusWsMessage::ExecutionReports(reports) => {
1772                            for report in reports {
1773                                handle_execution_report(
1774                                    report,
1775                                    &dispatch_state,
1776                                    &emitter,
1777                                    &ws_client,
1778                                    &mut pending_filled_cloids,
1779                                    clock.get_time_ns(),
1780                                );
1781                            }
1782                        }
1783                        // Reconnected is handled by WS client internally
1784                        // (resubscribe_all) and never forwarded here
1785                        NautilusWsMessage::Reconnected => {}
1786                        NautilusWsMessage::Error(e) => {
1787                            log::error!("WebSocket error: {e}");
1788                        }
1789                        // Handled by data client
1790                        NautilusWsMessage::Trades(_)
1791                        | NautilusWsMessage::Quote(_)
1792                        | NautilusWsMessage::Deltas(_)
1793                        | NautilusWsMessage::Depth10(_)
1794                        | NautilusWsMessage::Candle(_)
1795                        | NautilusWsMessage::MarkPrice(_)
1796                        | NautilusWsMessage::IndexPrice(_)
1797                        | NautilusWsMessage::FundingRate(_) => {}
1798                    },
1799                    None => {
1800                        log::debug!("WebSocket next_event returned None, stream closed");
1801                        break;
1802                    }
1803                }
1804            }
1805        });
1806
1807        *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1808        log::info!("Hyperliquid WebSocket execution stream started");
1809        Ok(())
1810    }
1811}
1812
1813/// Registers an order's identity in the dispatch state so its subsequent
1814/// WebSocket lifecycle can route through the typed-event path.
1815///
1816/// Quote-quantity orders submit a quote amount (e.g. 100 USD) but the venue
1817struct CancelEntry {
1818    strategy_id: StrategyId,
1819    instrument_id: InstrumentId,
1820    client_order_id: ClientOrderId,
1821    venue_order_id: Option<VenueOrderId>,
1822    symbol: String,
1823}
1824
1825/// reports fills in base units. Comparing those two when deciding whether an
1826/// order is fully filled would leave the order stuck "open" forever, so they
1827/// flow through the untracked path and the engine reconciles them from
1828/// status reports instead.
1829fn register_order_identity_into(state: &WsDispatchState, order: &OrderAny) {
1830    if order.is_quote_quantity() {
1831        return;
1832    }
1833    state.register_identity(
1834        order.client_order_id(),
1835        OrderIdentity {
1836            strategy_id: order.strategy_id(),
1837            instrument_id: order.instrument_id(),
1838            order_side: order.order_side(),
1839            order_type: order.order_type(),
1840            quantity: order.quantity(),
1841            price: order.price(),
1842        },
1843    );
1844}
1845
1846/// Routes a single execution report through the two-tier dispatch.
1847///
1848/// For tracked orders this emits typed `OrderEventAny` events via the
1849/// dispatch module; external / untracked orders fall back to the raw report
1850/// so the engine can reconcile. Cloid-mapping cleanup is handled here so
1851/// long-running sessions do not leak mapping entries.
1852fn handle_execution_report(
1853    report: ExecutionReport,
1854    dispatch_state: &WsDispatchState,
1855    emitter: &ExecutionEventEmitter,
1856    ws_client: &HyperliquidWebSocketClient,
1857    pending_filled_cloids: &mut FifoCache<ClientOrderId, 10_000>,
1858    ts_init: UnixNanos,
1859) {
1860    match report {
1861        ExecutionReport::Order(order_report) => {
1862            let is_filled_marker = matches!(order_report.order_status, OrderStatus::Filled);
1863            let is_open = order_report.order_status.is_open();
1864            let client_order_id = order_report.client_order_id;
1865
1866            let outcome =
1867                dispatch_order_status_report(&order_report, dispatch_state, emitter, ts_init);
1868
1869            if outcome == DispatchOutcome::External {
1870                emitter.send_order_status_report(order_report);
1871            }
1872
1873            // Cloid cleanup:
1874            //
1875            // * `Skip` (stale cancel leg of a cancel-replace, cancel-before-accept
1876            //   race, or replay after terminal): leave the mapping intact. The
1877            //   still-open replacement order depends on it for subsequent events,
1878            //   and a genuinely terminal replay had its mapping evicted earlier.
1879            // * `Tracked` + status-only FILLED marker: defer the eviction until
1880            //   the matching `FillReport` lands so the partial fill preceding it
1881            //   keeps its client-order-id link.
1882            // * `Tracked` non-marker terminal and `External` terminal: evict now
1883            //   so long-running sessions do not leak cloid mappings.
1884            if let Some(id) = client_order_id
1885                && !is_open
1886            {
1887                match outcome {
1888                    DispatchOutcome::Skip => {}
1889                    DispatchOutcome::Tracked if is_filled_marker => {
1890                        pending_filled_cloids.add(id);
1891                    }
1892                    DispatchOutcome::Tracked | DispatchOutcome::External => {
1893                        let cloid = Cloid::from_client_order_id(id);
1894                        ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1895                    }
1896                }
1897            }
1898        }
1899        ExecutionReport::Fill(fill_report) => {
1900            let client_order_id = fill_report.client_order_id;
1901
1902            let outcome = dispatch_fill_report(&fill_report, dispatch_state, emitter, ts_init);
1903
1904            if outcome == DispatchOutcome::External {
1905                emitter.send_fill_report(fill_report);
1906            }
1907
1908            // If this fill matches a deferred FILLED marker, drop the cloid
1909            // mapping now that the fill has landed.
1910            if let Some(id) = client_order_id
1911                && pending_filled_cloids.contains(&id)
1912            {
1913                pending_filled_cloids.remove(&id);
1914                let cloid = Cloid::from_client_order_id(id);
1915                ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1916            }
1917        }
1918    }
1919}
1920
1921use crate::common::parse::determine_order_list_grouping;
1922
1923#[cfg(test)]
1924mod tests {
1925    use nautilus_common::messages::ExecutionEvent;
1926    use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
1927    use nautilus_live::ExecutionEventEmitter;
1928    use nautilus_model::{
1929        enums::{
1930            AccountType, ContingencyType, LiquiditySide, OrderSide, OrderStatus, OrderType,
1931            TimeInForce, TriggerType,
1932        },
1933        events::OrderEventAny,
1934        identifiers::{
1935            AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
1936        },
1937        orders::{OrderAny, limit::LimitOrder, stop_market::StopMarketOrder},
1938        reports::{FillReport, OrderStatusReport},
1939        types::{Currency, Money, Price, Quantity},
1940    };
1941    use nautilus_network::websocket::TransportBackend;
1942    use rstest::rstest;
1943    use ustr::Ustr;
1944
1945    use super::{
1946        Cloid, ExecutionReport, FifoCache, HyperliquidWebSocketClient, OrderIdentity,
1947        WsDispatchState, determine_order_list_grouping, handle_execution_report,
1948        register_order_identity_into,
1949    };
1950    use crate::{common::enums::HyperliquidEnvironment, http::models::HyperliquidExecGrouping};
1951
1952    const TEST_INSTRUMENT_ID: &str = "BTC-USD-PERP.HYPERLIQUID";
1953
1954    fn test_emitter() -> (
1955        ExecutionEventEmitter,
1956        tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1957    ) {
1958        let clock = get_atomic_clock_realtime();
1959        let mut emitter = ExecutionEventEmitter::new(
1960            clock,
1961            TraderId::from("TESTER-001"),
1962            AccountId::from("HYPERLIQUID-001"),
1963            AccountType::Margin,
1964            None,
1965        );
1966        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1967        emitter.set_sender(tx);
1968        (emitter, rx)
1969    }
1970
1971    fn drain_events(
1972        rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1973    ) -> Vec<ExecutionEvent> {
1974        let mut out = Vec::new();
1975        while let Ok(e) = rx.try_recv() {
1976            out.push(e);
1977        }
1978        out
1979    }
1980
1981    fn make_ws_client() -> HyperliquidWebSocketClient {
1982        // `HyperliquidWebSocketClient::new` does not connect, so this is a
1983        // cheap unit-test shim that still exercises the real `cloid_cache`
1984        // mapping APIs used by `handle_execution_report`.
1985        HyperliquidWebSocketClient::new(
1986            Some("wss://test.invalid".to_string()),
1987            HyperliquidEnvironment::Testnet,
1988            None,
1989            TransportBackend::default(),
1990            None,
1991        )
1992    }
1993
1994    fn test_identity() -> OrderIdentity {
1995        OrderIdentity {
1996            strategy_id: StrategyId::from("S-001"),
1997            instrument_id: InstrumentId::from(TEST_INSTRUMENT_ID),
1998            order_side: OrderSide::Buy,
1999            order_type: OrderType::Limit,
2000            quantity: Quantity::from("0.0001"),
2001            price: Some(Price::from("56730.0")),
2002        }
2003    }
2004
2005    fn make_status_report(
2006        client_order_id: Option<&str>,
2007        venue_order_id: &str,
2008        status: OrderStatus,
2009    ) -> OrderStatusReport {
2010        OrderStatusReport::new(
2011            AccountId::from("HYPERLIQUID-001"),
2012            InstrumentId::from(TEST_INSTRUMENT_ID),
2013            client_order_id.map(ClientOrderId::new),
2014            VenueOrderId::new(venue_order_id),
2015            OrderSide::Buy,
2016            OrderType::Limit,
2017            TimeInForce::Gtc,
2018            status,
2019            Quantity::from("0.0001"),
2020            Quantity::from("0"),
2021            UnixNanos::default(),
2022            UnixNanos::default(),
2023            UnixNanos::default(),
2024            Some(UUID4::new()),
2025        )
2026        .with_price(Price::from("56730.0"))
2027    }
2028
2029    fn make_fill_report(
2030        client_order_id: Option<&str>,
2031        venue_order_id: &str,
2032        trade_id: &str,
2033    ) -> FillReport {
2034        FillReport::new(
2035            AccountId::from("HYPERLIQUID-001"),
2036            InstrumentId::from(TEST_INSTRUMENT_ID),
2037            VenueOrderId::new(venue_order_id),
2038            TradeId::new(trade_id),
2039            OrderSide::Buy,
2040            Quantity::from("0.0001"),
2041            Price::from("56730.0"),
2042            Money::new(0.0, Currency::USD()),
2043            LiquiditySide::Taker,
2044            client_order_id.map(ClientOrderId::new),
2045            None,
2046            UnixNanos::default(),
2047            UnixNanos::default(),
2048            Some(UUID4::new()),
2049        )
2050    }
2051
2052    fn cloid_for(id: &str) -> Ustr {
2053        let cloid = Cloid::from_client_order_id(ClientOrderId::from(id));
2054        Ustr::from(&cloid.to_hex())
2055    }
2056
2057    fn limit_order(
2058        id: &str,
2059        reduce_only: bool,
2060        contingency: ContingencyType,
2061        linked_ids: Option<Vec<&str>>,
2062        parent_id: Option<&str>,
2063    ) -> OrderAny {
2064        OrderAny::Limit(LimitOrder::new(
2065            TraderId::from("TESTER-001"),
2066            StrategyId::from("S-001"),
2067            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2068            ClientOrderId::from(id),
2069            OrderSide::Buy,
2070            Quantity::from(1),
2071            Price::from("3000.00"),
2072            TimeInForce::Gtc,
2073            None,  // expire_time
2074            false, // post_only
2075            reduce_only,
2076            false, // quote_quantity
2077            None,  // display_qty
2078            None,  // emulation_trigger
2079            None,  // trigger_instrument_id
2080            Some(contingency),
2081            None, // order_list_id
2082            linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2083            parent_id.map(ClientOrderId::from),
2084            None, // exec_algorithm_id
2085            None, // exec_algorithm_params
2086            None, // exec_spawn_id
2087            None, // tags
2088            Default::default(),
2089            Default::default(),
2090        ))
2091    }
2092
2093    fn stop_order(
2094        id: &str,
2095        reduce_only: bool,
2096        contingency: ContingencyType,
2097        linked_ids: Option<Vec<&str>>,
2098        parent_id: Option<&str>,
2099    ) -> OrderAny {
2100        OrderAny::StopMarket(StopMarketOrder::new(
2101            TraderId::from("TESTER-001"),
2102            StrategyId::from("S-001"),
2103            InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2104            ClientOrderId::from(id),
2105            OrderSide::Sell,
2106            Quantity::from(1),
2107            Price::from("2800.00"),
2108            TriggerType::LastPrice,
2109            TimeInForce::Gtc,
2110            None, // expire_time
2111            reduce_only,
2112            false, // quote_quantity
2113            None,  // display_qty
2114            None,  // emulation_trigger
2115            None,  // trigger_instrument_id
2116            Some(contingency),
2117            None, // order_list_id
2118            linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2119            parent_id.map(ClientOrderId::from),
2120            None, // exec_algorithm_id
2121            None, // exec_algorithm_params
2122            None, // exec_spawn_id
2123            None, // tags
2124            Default::default(),
2125            Default::default(),
2126        ))
2127    }
2128
2129    #[rstest]
2130    #[case::independent_orders(
2131        vec![
2132            limit_order("O-001", false, ContingencyType::NoContingency, None, None),
2133            limit_order("O-002", false, ContingencyType::NoContingency, None, None),
2134        ],
2135        HyperliquidExecGrouping::Na,
2136    )]
2137    #[case::bracket_oto(
2138        vec![
2139            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2140            limit_order("O-002", true, ContingencyType::Oco, Some(vec!["O-003"]), Some("O-001")),
2141            stop_order("O-003", true, ContingencyType::Oco, Some(vec!["O-002"]), Some("O-001")),
2142        ],
2143        HyperliquidExecGrouping::NormalTpsl,
2144    )]
2145    #[case::oto_not_bracket_shaped(
2146        vec![
2147            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002"]), None),
2148            limit_order("O-002", false, ContingencyType::Oto, Some(vec!["O-001"]), None),
2149        ],
2150        HyperliquidExecGrouping::Na,
2151    )]
2152    #[case::oco_all_reduce_only(
2153        vec![
2154            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2155            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2156        ],
2157        HyperliquidExecGrouping::PositionTpsl,
2158    )]
2159    #[case::oco_not_all_reduce_only(
2160        vec![
2161            limit_order("O-001", false, ContingencyType::Oco, Some(vec!["O-002"]), None),
2162            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2163        ],
2164        HyperliquidExecGrouping::Na,
2165    )]
2166    #[case::oto_with_non_oco_children(
2167        vec![
2168            limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2169            limit_order("O-002", true, ContingencyType::NoContingency, None, None),
2170            stop_order("O-003", true, ContingencyType::NoContingency, None, None),
2171        ],
2172        HyperliquidExecGrouping::Na,
2173    )]
2174    #[case::mixed_oco_and_plain_reduce_only(
2175        vec![
2176            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2177            stop_order("O-002", true, ContingencyType::NoContingency, None, None),
2178        ],
2179        HyperliquidExecGrouping::Na,
2180    )]
2181    #[case::unlinked_oco_reduce_only(
2182        vec![
2183            limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-099"]), None),
2184            stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-098"]), None),
2185        ],
2186        HyperliquidExecGrouping::Na,
2187    )]
2188    #[case::single_order(
2189        vec![limit_order("O-001", false, ContingencyType::NoContingency, None, None)],
2190        HyperliquidExecGrouping::Na,
2191    )]
2192    fn test_determine_order_list_grouping(
2193        #[case] orders: Vec<OrderAny>,
2194        #[case] expected: HyperliquidExecGrouping,
2195    ) {
2196        let result = determine_order_list_grouping(&orders);
2197        assert_eq!(result, expected);
2198    }
2199
2200    fn limit_order_with_quote_quantity(id: &str, quote_quantity: bool) -> OrderAny {
2201        OrderAny::Limit(LimitOrder::new(
2202            TraderId::from("TESTER-001"),
2203            StrategyId::from("S-001"),
2204            InstrumentId::from(TEST_INSTRUMENT_ID),
2205            ClientOrderId::from(id),
2206            OrderSide::Buy,
2207            Quantity::from("0.0001"),
2208            Price::from("56730.0"),
2209            TimeInForce::Gtc,
2210            None,
2211            false,
2212            false,
2213            quote_quantity,
2214            None,
2215            None,
2216            None,
2217            Some(ContingencyType::NoContingency),
2218            None,
2219            None,
2220            None,
2221            None,
2222            None,
2223            None,
2224            None,
2225            Default::default(),
2226            Default::default(),
2227        ))
2228    }
2229
2230    #[rstest]
2231    fn test_register_order_identity_registers_regular_order() {
2232        let state = WsDispatchState::new();
2233        let order = limit_order_with_quote_quantity("O-REG-001", false);
2234
2235        register_order_identity_into(&state, &order);
2236
2237        let found = state
2238            .lookup_identity(&ClientOrderId::from("O-REG-001"))
2239            .expect("identity should be registered");
2240        assert_eq!(found.strategy_id, StrategyId::from("S-001"));
2241        assert_eq!(found.instrument_id, InstrumentId::from(TEST_INSTRUMENT_ID));
2242        assert_eq!(found.order_side, OrderSide::Buy);
2243        assert_eq!(found.order_type, OrderType::Limit);
2244        assert_eq!(found.quantity, Quantity::from("0.0001"));
2245        assert_eq!(found.price, Some(Price::from("56730.0")));
2246    }
2247
2248    #[rstest]
2249    fn test_register_order_identity_skips_quote_quantity_order() {
2250        let state = WsDispatchState::new();
2251        let order = limit_order_with_quote_quantity("O-QQ-001", true);
2252
2253        register_order_identity_into(&state, &order);
2254
2255        // Quote-quantity orders flow through the untracked path so the engine
2256        // reconciles them from status reports; registering would make the
2257        // cumulative-fill comparison mismatch base-unit fills against the
2258        // quote-unit tracked quantity and leave the order stuck "open".
2259        assert!(
2260            state
2261                .lookup_identity(&ClientOrderId::from("O-QQ-001"))
2262                .is_none()
2263        );
2264    }
2265
2266    #[rstest]
2267    fn test_handle_execution_report_skip_keeps_cloid_mapping() {
2268        // Regression guard for GH-3827: when the dispatch returns Skip (e.g.
2269        // the stale cancel leg of a cancel-replace), the cloid mapping must
2270        // stay in place so the still-open replacement order can still be
2271        // resolved by subsequent events.
2272        let ws_client = make_ws_client();
2273        let (emitter, mut rx) = test_emitter();
2274        let state = WsDispatchState::new();
2275        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2276
2277        let cid = ClientOrderId::from("O-HER-SKIP");
2278        state.register_identity(cid, test_identity());
2279        // Prime state so the later CANCELED(old_voi) is classified as stale.
2280        state.insert_accepted(cid);
2281        state.record_venue_order_id(cid, VenueOrderId::new("new-voi"));
2282
2283        ws_client.cache_cloid_mapping(cloid_for("O-HER-SKIP"), cid);
2284
2285        let stale_cancel = make_status_report(Some("O-HER-SKIP"), "old-voi", OrderStatus::Canceled);
2286        handle_execution_report(
2287            ExecutionReport::Order(stale_cancel),
2288            &state,
2289            &emitter,
2290            &ws_client,
2291            &mut pending_cloids,
2292            UnixNanos::default(),
2293        );
2294
2295        assert!(drain_events(&mut rx).is_empty());
2296        // Cloid mapping preserved; the replacement order still resolves.
2297        assert_eq!(
2298            ws_client.get_cloid_mapping(&cloid_for("O-HER-SKIP")),
2299            Some(cid)
2300        );
2301        // Identity is still tracked (the skip path did not clean up).
2302        assert!(state.lookup_identity(&cid).is_some());
2303    }
2304
2305    #[rstest]
2306    fn test_handle_execution_report_tracked_terminal_evicts_cloid() {
2307        // A tracked CANCELED that reaches a genuine terminal state should
2308        // emit OrderCanceled and evict the cloid mapping so long-running
2309        // sessions do not leak.
2310        let ws_client = make_ws_client();
2311        let (emitter, mut rx) = test_emitter();
2312        let state = WsDispatchState::new();
2313        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2314
2315        let cid = ClientOrderId::from("O-HER-CANCEL");
2316        state.register_identity(cid, test_identity());
2317        state.insert_accepted(cid);
2318        state.record_venue_order_id(cid, VenueOrderId::new("v-cancel"));
2319
2320        ws_client.cache_cloid_mapping(cloid_for("O-HER-CANCEL"), cid);
2321
2322        let report = make_status_report(Some("O-HER-CANCEL"), "v-cancel", OrderStatus::Canceled);
2323        handle_execution_report(
2324            ExecutionReport::Order(report),
2325            &state,
2326            &emitter,
2327            &ws_client,
2328            &mut pending_cloids,
2329            UnixNanos::default(),
2330        );
2331
2332        let events = drain_events(&mut rx);
2333        assert_eq!(events.len(), 1);
2334        assert!(matches!(
2335            events[0],
2336            ExecutionEvent::Order(OrderEventAny::Canceled(_))
2337        ));
2338        assert_eq!(
2339            ws_client.get_cloid_mapping(&cloid_for("O-HER-CANCEL")),
2340            None
2341        );
2342        assert!(state.filled_orders.contains(&cid));
2343    }
2344
2345    #[rstest]
2346    fn test_handle_execution_report_filled_marker_then_fill_evicts_on_fill() {
2347        // The status-only FILLED marker defers the cloid eviction to the
2348        // pending cache; the matching FillReport emits OrderFilled and then
2349        // evicts the cloid mapping as part of the deferred-cleanup path.
2350        let ws_client = make_ws_client();
2351        let (emitter, mut rx) = test_emitter();
2352        let state = WsDispatchState::new();
2353        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2354
2355        let cid = ClientOrderId::from("O-HER-FILL");
2356        state.register_identity(cid, test_identity());
2357        state.insert_accepted(cid);
2358        state.record_venue_order_id(cid, VenueOrderId::new("v-fill"));
2359
2360        ws_client.cache_cloid_mapping(cloid_for("O-HER-FILL"), cid);
2361
2362        let status_marker = make_status_report(Some("O-HER-FILL"), "v-fill", OrderStatus::Filled);
2363        handle_execution_report(
2364            ExecutionReport::Order(status_marker),
2365            &state,
2366            &emitter,
2367            &ws_client,
2368            &mut pending_cloids,
2369            UnixNanos::default(),
2370        );
2371
2372        // Marker arrived: no event, cloid cleanup deferred, mapping retained.
2373        assert!(drain_events(&mut rx).is_empty());
2374        assert_eq!(
2375            ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")),
2376            Some(cid)
2377        );
2378
2379        let fill = make_fill_report(Some("O-HER-FILL"), "v-fill", "trade-fill");
2380        handle_execution_report(
2381            ExecutionReport::Fill(fill),
2382            &state,
2383            &emitter,
2384            &ws_client,
2385            &mut pending_cloids,
2386            UnixNanos::default(),
2387        );
2388
2389        let events = drain_events(&mut rx);
2390        assert_eq!(events.len(), 1);
2391        assert!(matches!(
2392            events[0],
2393            ExecutionEvent::Order(OrderEventAny::Filled(_))
2394        ));
2395        // Deferred cleanup fires once the fill lands.
2396        assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")), None);
2397    }
2398
2399    #[rstest]
2400    fn test_handle_execution_report_external_terminal_evicts_cloid() {
2401        // External (untracked) terminal reports forward to the engine via
2402        // send_order_status_report and immediately evict the cloid mapping
2403        // so the client does not leak mappings for orders it does not own.
2404        let ws_client = make_ws_client();
2405        let (emitter, mut rx) = test_emitter();
2406        let state = WsDispatchState::new();
2407        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2408
2409        let cid = ClientOrderId::from("O-HER-EXT");
2410        ws_client.cache_cloid_mapping(cloid_for("O-HER-EXT"), cid);
2411
2412        let report = make_status_report(Some("O-HER-EXT"), "v-ext", OrderStatus::Canceled);
2413        handle_execution_report(
2414            ExecutionReport::Order(report),
2415            &state,
2416            &emitter,
2417            &ws_client,
2418            &mut pending_cloids,
2419            UnixNanos::default(),
2420        );
2421
2422        let events = drain_events(&mut rx);
2423        assert_eq!(events.len(), 1);
2424        assert!(
2425            matches!(events[0], ExecutionEvent::Report(_)),
2426            "external terminal report should forward to the engine as a report",
2427        );
2428        assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-EXT")), None);
2429    }
2430
2431    #[rstest]
2432    fn test_handle_execution_report_open_status_preserves_cloid() {
2433        // An open (non-terminal) status must never touch the cloid mapping.
2434        let ws_client = make_ws_client();
2435        let (emitter, _rx) = test_emitter();
2436        let state = WsDispatchState::new();
2437        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2438
2439        let cid = ClientOrderId::from("O-HER-OPEN");
2440        state.register_identity(cid, test_identity());
2441        ws_client.cache_cloid_mapping(cloid_for("O-HER-OPEN"), cid);
2442
2443        let report = make_status_report(Some("O-HER-OPEN"), "v-open", OrderStatus::Accepted);
2444        handle_execution_report(
2445            ExecutionReport::Order(report),
2446            &state,
2447            &emitter,
2448            &ws_client,
2449            &mut pending_cloids,
2450            UnixNanos::default(),
2451        );
2452
2453        // Accepted is open → no cloid eviction regardless of outcome.
2454        assert_eq!(
2455            ws_client.get_cloid_mapping(&cloid_for("O-HER-OPEN")),
2456            Some(cid)
2457        );
2458    }
2459
2460    #[rstest]
2461    fn test_handle_execution_report_tracked_accepted_emits_typed_event() {
2462        // A tracked open ACCEPTED must flow through the typed-event path,
2463        // NOT the raw report fallback. Catches a mutation that swaps the
2464        // branch polarity inside `handle_execution_report`.
2465        let ws_client = make_ws_client();
2466        let (emitter, mut rx) = test_emitter();
2467        let state = WsDispatchState::new();
2468        let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2469
2470        let cid = ClientOrderId::from("O-HER-ACC");
2471        state.register_identity(cid, test_identity());
2472        ws_client.cache_cloid_mapping(cloid_for("O-HER-ACC"), cid);
2473
2474        let report = make_status_report(Some("O-HER-ACC"), "v-acc", OrderStatus::Accepted);
2475        handle_execution_report(
2476            ExecutionReport::Order(report),
2477            &state,
2478            &emitter,
2479            &ws_client,
2480            &mut pending_cloids,
2481            UnixNanos::default(),
2482        );
2483
2484        let events = drain_events(&mut rx);
2485        assert_eq!(events.len(), 1);
2486        assert!(
2487            matches!(events[0], ExecutionEvent::Order(OrderEventAny::Accepted(_))),
2488            "tracked accepted should route through the typed-event path",
2489        );
2490        // Mapping is unchanged because the status is still open.
2491        assert_eq!(
2492            ws_client.get_cloid_mapping(&cloid_for("O-HER-ACC")),
2493            Some(cid)
2494        );
2495    }
2496}