1use std::{
19 sync::{Arc, Mutex},
20 time::{Duration, Instant},
21};
22
23use anyhow::Context;
24use async_trait::async_trait;
25use nautilus_common::{
26 cache::fifo::FifoCache,
27 clients::ExecutionClient,
28 live::{runner::get_exec_event_sender, runtime::get_runtime},
29 messages::execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
32 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
33 },
34};
35use nautilus_core::{
36 MUTEX_POISONED, Params, UUID4, UnixNanos,
37 time::{AtomicTime, get_atomic_clock_realtime},
38};
39use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
40use nautilus_model::{
41 accounts::AccountAny,
42 enums::{AccountType, OmsType, OrderSide, OrderStatus, OrderType},
43 identifiers::{
44 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
45 },
46 orders::{Order, any::OrderAny},
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51use ustr::Ustr;
52
53use crate::{
54 common::{
55 consts::{HYPERLIQUID_VENUE, NAUTILUS_BUILDER_ADDRESS},
56 credential::Secrets,
57 parse::{
58 clamp_price_to_precision, client_order_id_to_cancel_request_with_asset,
59 derive_limit_from_trigger, derive_market_order_price, extract_error_message,
60 extract_inner_error, extract_inner_errors, normalize_price,
61 order_to_hyperliquid_request_with_asset, parse_combined_account_balances_and_margins,
62 round_to_sig_figs,
63 },
64 },
65 config::HyperliquidExecClientConfig,
66 http::{
67 client::HyperliquidHttpClient,
68 models::{
69 ClearinghouseState, Cloid, HyperliquidExecAction, HyperliquidExecBuilderFee,
70 HyperliquidExecGrouping, HyperliquidExecModifyOrderRequest, HyperliquidExecOrderKind,
71 SpotClearinghouseState,
72 },
73 },
74 websocket::{
75 ExecutionReport, NautilusWsMessage,
76 client::HyperliquidWebSocketClient,
77 dispatch::{
78 DispatchOutcome, OrderIdentity, WsDispatchState, dispatch_fill_report,
79 dispatch_order_status_report,
80 },
81 },
82};
83
/// Execution client for the Hyperliquid venue.
///
/// Bundles the shared [`ExecutionClientCore`], the HTTP and WebSocket clients, the
/// event emitter, and the bookkeeping for background tasks spawned by this client.
#[derive(Debug)]
pub struct HyperliquidExecutionClient {
    /// Shared client state: identifiers, cache access and lifecycle flags.
    core: ExecutionClientCore,
    /// Clock used to timestamp the events this client emits.
    clock: &'static AtomicTime,
    /// Configuration supplied at construction time.
    config: HyperliquidExecClientConfig,
    /// Emits order and account events on behalf of this client.
    emitter: ExecutionEventEmitter,
    /// HTTP client used for info requests and for posting exchange actions.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; also holds the cloid -> client order id mappings.
    ws_client: HyperliquidWebSocketClient,
    /// Handles of the tasks started through `spawn_task`; aborted on `stop`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Handle aborted on `stop`.
    // NOTE(review): presumably the task consuming the WebSocket stream — confirm where it is set.
    ws_stream_handle: Mutex<Option<JoinHandle<()>>>,
    /// Dispatch state shared with the background tasks spawned by this client.
    ws_dispatch_state: Arc<WsDispatchState>,
}
96
97impl HyperliquidExecutionClient {
    /// Returns a reference to the configuration this client was built with.
    pub fn config(&self) -> &HyperliquidExecClientConfig {
        &self.config
    }
102
    /// Returns the shared WebSocket dispatch state held by this client.
    #[must_use]
    pub fn ws_dispatch_state(&self) -> &Arc<WsDispatchState> {
        &self.ws_dispatch_state
    }
113
114 #[allow(
122 clippy::missing_panics_doc,
123 reason = "pending_tasks mutex poisoning is not expected"
124 )]
125 #[must_use]
126 pub fn pending_tasks_all_finished(&self) -> bool {
127 let tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
128 tasks.iter().all(|h| h.is_finished())
129 }
130
131 fn resolve_slippage_bps(&self, params: Option<&Params>) -> u32 {
132 params
133 .and_then(|p| p.get_u64("market_order_slippage_bps"))
134 .map_or(self.config.market_order_slippage_bps, |v| v as u32)
135 }
136
137 fn validate_order_submission(&self, order: &OrderAny) -> anyhow::Result<()> {
138 let instrument_id = order.instrument_id();
141 let symbol = instrument_id.symbol.as_str();
142 if !symbol.ends_with("-PERP") && !symbol.ends_with("-SPOT") {
143 anyhow::bail!(
144 "Unsupported instrument symbol format for Hyperliquid: {symbol} (expected -PERP or -SPOT suffix)"
145 );
146 }
147
148 match order.order_type() {
150 OrderType::Market
151 | OrderType::Limit
152 | OrderType::StopMarket
153 | OrderType::StopLimit
154 | OrderType::MarketIfTouched
155 | OrderType::LimitIfTouched => {}
156 _ => anyhow::bail!(
157 "Unsupported order type for Hyperliquid: {:?}",
158 order.order_type()
159 ),
160 }
161
162 if matches!(
164 order.order_type(),
165 OrderType::StopMarket
166 | OrderType::StopLimit
167 | OrderType::MarketIfTouched
168 | OrderType::LimitIfTouched
169 ) && order.trigger_price().is_none()
170 {
171 anyhow::bail!(
172 "Conditional orders require a trigger price for Hyperliquid: {:?}",
173 order.order_type()
174 );
175 }
176
177 if matches!(
179 order.order_type(),
180 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
181 ) && order.price().is_none()
182 {
183 anyhow::bail!(
184 "Limit orders require a limit price for Hyperliquid: {:?}",
185 order.order_type()
186 );
187 }
188
189 Ok(())
190 }
191
192 pub fn new(
198 core: ExecutionClientCore,
199 config: HyperliquidExecClientConfig,
200 ) -> anyhow::Result<Self> {
201 let secrets = Secrets::resolve(
202 config.private_key.as_deref(),
203 config.vault_address.as_deref(),
204 config.environment,
205 )
206 .context("Hyperliquid execution client requires private key")?;
207
208 let mut http_client = HyperliquidHttpClient::with_secrets(
209 &secrets,
210 config.http_timeout_secs,
211 config.proxy_url.clone(),
212 )
213 .context("failed to create Hyperliquid HTTP client")?;
214
215 http_client.set_account_id(core.account_id);
216 http_client.set_account_address(config.account_address.clone());
217 http_client.set_normalize_prices(config.normalize_prices);
218 http_client.set_market_order_slippage_bps(config.market_order_slippage_bps);
219
220 if let Some(url) = &config.base_url_http {
222 http_client.set_base_info_url(url.clone());
223 }
224
225 if let Some(url) = &config.base_url_exchange {
226 http_client.set_base_exchange_url(url.clone());
227 }
228
229 let ws_url = config.base_url_ws.clone();
230 let ws_client = HyperliquidWebSocketClient::new(
231 ws_url,
232 config.environment,
233 Some(core.account_id),
234 config.transport_backend,
235 config.proxy_url.clone(),
236 );
237
238 let clock = get_atomic_clock_realtime();
239 let emitter = ExecutionEventEmitter::new(
240 clock,
241 core.trader_id,
242 core.account_id,
243 AccountType::Margin,
244 None,
245 );
246
247 Ok(Self {
248 core,
249 clock,
250 config,
251 emitter,
252 http_client,
253 ws_client,
254 pending_tasks: Mutex::new(Vec::new()),
255 ws_stream_handle: Mutex::new(None),
256 ws_dispatch_state: Arc::new(WsDispatchState::new()),
257 })
258 }
259
    /// Registers `order` in this client's WebSocket dispatch state by delegating to
    /// `register_order_identity_into`.
    fn register_order_identity(&self, order: &OrderAny) {
        register_order_identity_into(&self.ws_dispatch_state, order);
    }
263
264 async fn ensure_instruments_initialized_async(&self) -> anyhow::Result<()> {
265 if self.core.instruments_initialized() {
266 return Ok(());
267 }
268
269 let instruments = self
270 .http_client
271 .request_instruments()
272 .await
273 .context("failed to request Hyperliquid instruments")?;
274
275 if instruments.is_empty() {
276 log::warn!(
277 "Instrument bootstrap yielded no instruments; WebSocket submissions may fail"
278 );
279 } else {
280 log::info!("Initialized {} instruments", instruments.len());
281
282 for instrument in &instruments {
283 self.http_client.cache_instrument(instrument);
284 }
285 }
286
287 self.core.set_instruments_initialized();
288 Ok(())
289 }
290
291 async fn refresh_account_state(&self) -> anyhow::Result<()> {
292 let account_address = self.get_account_address()?;
293
294 let (perp_state, spot_state) = self
295 .fetch_combined_clearinghouse_state(&account_address)
296 .await?;
297
298 log::debug!(
299 "Received clearinghouse state: cross_margin_summary={:?}, asset_positions={}, spot_balances={}",
300 perp_state.cross_margin_summary,
301 perp_state.asset_positions.len(),
302 spot_state.balances.len(),
303 );
304
305 let (balances, margins) =
306 parse_combined_account_balances_and_margins(&perp_state, &spot_state)
307 .context("failed to parse combined account balances and margins")?;
308
309 let ts_event = self.clock.get_time_ns();
312 self.emitter
313 .emit_account_state(balances, margins, true, ts_event);
314
315 log::info!("Account state updated successfully");
316 Ok(())
317 }
318
319 async fn fetch_combined_clearinghouse_state(
320 &self,
321 account_address: &str,
322 ) -> anyhow::Result<(ClearinghouseState, SpotClearinghouseState)> {
323 let perp_json = self
324 .http_client
325 .info_clearinghouse_state(account_address)
326 .await
327 .context("failed to fetch clearinghouse state")?;
328 let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
329 .context("failed to deserialize clearinghouse state")?;
330
331 let spot_json = self
332 .http_client
333 .info_spot_clearinghouse_state(account_address)
334 .await
335 .context("failed to fetch spot clearinghouse state")?;
336 let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
337 .context("failed to deserialize spot clearinghouse state")?;
338
339 Ok((perp_state, spot_state))
340 }
341
342 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
343 let account_id = self.core.account_id;
344
345 if self.core.cache().account(&account_id).is_some() {
346 log::info!("Account {account_id} registered");
347 return Ok(());
348 }
349
350 let start = Instant::now();
351 let timeout = Duration::from_secs_f64(timeout_secs);
352 let interval = Duration::from_millis(10);
353
354 loop {
355 tokio::time::sleep(interval).await;
356
357 if self.core.cache().account(&account_id).is_some() {
358 log::info!("Account {account_id} registered");
359 return Ok(());
360 }
361
362 if start.elapsed() >= timeout {
363 anyhow::bail!(
364 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
365 );
366 }
367 }
368 }
369
    /// Returns the user address reported by the HTTP client.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot provide a user address.
    fn get_user_address(&self) -> anyhow::Result<String> {
        self.http_client
            .get_user_address()
            .context("failed to get user address from HTTP client")
    }
375
376 fn get_account_address(&self) -> anyhow::Result<String> {
377 if let Some(addr) = &self.config.account_address {
378 return Ok(addr.clone());
379 }
380
381 match &self.config.vault_address {
382 Some(vault) => Ok(vault.clone()),
383 None => self.get_user_address(),
384 }
385 }
386
387 fn spawn_task<F>(&self, description: &'static str, fut: F)
388 where
389 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
390 {
391 let runtime = get_runtime();
392 let handle = runtime.spawn(async move {
393 if let Err(e) = fut.await {
394 log::warn!("{description} failed: {e:?}");
395 }
396 });
397
398 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
399 tasks.retain(|handle| !handle.is_finished());
400 tasks.push(handle);
401 }
402
403 fn abort_pending_tasks(&self) {
404 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
405 for handle in tasks.drain(..) {
406 handle.abort();
407 }
408 }
409}
410
411#[async_trait(?Send)]
412impl ExecutionClient for HyperliquidExecutionClient {
    /// Returns the connection flag tracked by the core.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
416
    /// Returns the client id held by the core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
420
    /// Returns the account id held by the core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
424
    /// Returns the Hyperliquid venue constant.
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
428
    /// Returns the OMS type held by the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
432
    /// Returns a clone of this client's account from the cache, or `None` if the
    /// cache has no account for the core's account id.
    fn get_account(&self) -> Option<AccountAny> {
        self.core.cache().account(&self.core.account_id).cloned()
    }
436
    /// Emits an account state built from the given balances and margins.
    ///
    /// All arguments are forwarded unchanged to the emitter; this method always
    /// returns `Ok(())`.
    fn generate_account_state(
        &self,
        balances: Vec<AccountBalance>,
        margins: Vec<MarginBalance>,
        reported: bool,
        ts_event: UnixNanos,
    ) -> anyhow::Result<()> {
        self.emitter
            .emit_account_state(balances, margins, reported, ts_event);
        Ok(())
    }
448
449 fn start(&mut self) -> anyhow::Result<()> {
450 if self.core.is_started() {
451 return Ok(());
452 }
453
454 let sender = get_exec_event_sender();
455 self.emitter.set_sender(sender);
456 self.core.set_started();
457
458 log::info!(
459 "Started: client_id={}, account_id={}, environment={:?}, vault_address={:?}, proxy_url={:?}",
460 self.core.client_id,
461 self.core.account_id,
462 self.config.environment,
463 self.config.vault_address,
464 self.config.proxy_url,
465 );
466
467 Ok(())
468 }
469
470 fn stop(&mut self) -> anyhow::Result<()> {
471 if self.core.is_stopped() {
472 return Ok(());
473 }
474
475 log::info!("Stopping Hyperliquid execution client");
476
477 if let Some(handle) = self.ws_stream_handle.lock().expect(MUTEX_POISONED).take() {
478 handle.abort();
479 }
480
481 self.abort_pending_tasks();
482 self.ws_client.abort();
483
484 self.core.set_disconnected();
485 self.core.set_stopped();
486
487 log::info!("Hyperliquid execution client stopped");
488 Ok(())
489 }
490
491 fn submit_order(&self, cmd: SubmitOrder) -> anyhow::Result<()> {
492 let order = self
493 .core
494 .cache()
495 .order(&cmd.client_order_id)
496 .cloned()
497 .ok_or_else(|| {
498 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
499 })?;
500
501 if order.is_closed() {
502 log::warn!("Cannot submit closed order {}", order.client_order_id());
503 return Ok(());
504 }
505
506 if let Err(e) = self.validate_order_submission(&order) {
507 self.emitter
508 .emit_order_denied(&order, &format!("Validation failed: {e}"));
509 return Err(e);
510 }
511
512 let http_client = self.http_client.clone();
513 let symbol = order.instrument_id().symbol.to_string();
514
515 let asset = match http_client.get_asset_index(&symbol) {
517 Some(a) => a,
518 None => {
519 self.emitter
520 .emit_order_denied(&order, &format!("Asset index not found for {symbol}"));
521 return Ok(());
522 }
523 };
524
525 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
527 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
528 let mut hyperliquid_order = match order_to_hyperliquid_request_with_asset(
529 &order,
530 asset,
531 price_decimals,
532 self.config.normalize_prices,
533 slippage_bps,
534 ) {
535 Ok(req) => req,
536 Err(e) => {
537 self.emitter
538 .emit_order_denied(&order, &format!("Order conversion failed: {e}"));
539 return Ok(());
540 }
541 };
542
543 if order.order_type() == OrderType::Market {
545 let instrument_id = order.instrument_id();
546 let cache = self.core.cache();
547 match cache.quote(&instrument_id) {
548 Some(quote) => {
549 let is_buy = order.order_side() == OrderSide::Buy;
550 hyperliquid_order.price =
551 derive_market_order_price(quote, is_buy, price_decimals, slippage_bps);
552 }
553 None => {
554 self.emitter.emit_order_denied(
555 &order,
556 &format!(
557 "No cached quote for {instrument_id}: \
558 subscribe to quote data before submitting market orders"
559 ),
560 );
561 return Ok(());
562 }
563 }
564 }
565
566 log::info!(
567 "Submitting order: id={}, type={:?}, side={:?}, price={}, size={}, kind={:?}",
568 order.client_order_id(),
569 order.order_type(),
570 order.order_side(),
571 hyperliquid_order.price,
572 hyperliquid_order.size,
573 hyperliquid_order.kind,
574 );
575
576 let cloid = Cloid::from_client_order_id(order.client_order_id());
579 self.ws_client
580 .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
581
582 self.register_order_identity(&order);
583
584 self.emitter.emit_order_submitted(&order);
585
586 let emitter = self.emitter.clone();
587 let clock = self.clock;
588 let ws_client = self.ws_client.clone();
589 let cloid_hex = Ustr::from(&cloid.to_hex());
590 let dispatch_state = self.ws_dispatch_state.clone();
591 let client_order_id = order.client_order_id();
592
593 let builder = if self.http_client.has_vault_address() {
596 None
597 } else {
598 Some(HyperliquidExecBuilderFee {
599 address: NAUTILUS_BUILDER_ADDRESS.to_string(),
600 fee_tenths_bp: 0,
601 })
602 };
603
604 self.spawn_task("submit_order", async move {
605 let action = HyperliquidExecAction::Order {
606 orders: vec![hyperliquid_order],
607 grouping: HyperliquidExecGrouping::Na,
608 builder,
609 };
610
611 match http_client.post_action_exec(&action).await {
612 Ok(response) => {
613 if response.is_ok() {
614 if let Some(inner_error) = extract_inner_error(&response) {
615 log::warn!("Order submission rejected by exchange: {inner_error}");
616 let ts = clock.get_time_ns();
617 emitter.emit_order_rejected(&order, &inner_error, ts, false);
618 ws_client.remove_cloid_mapping(&cloid_hex);
619 dispatch_state.cleanup_terminal(&client_order_id);
620 } else {
621 log::info!("Order submitted successfully: {response:?}");
622 }
623 } else {
624 let error_msg = extract_error_message(&response);
625 log::warn!("Order submission rejected by exchange: {error_msg}");
626 let ts = clock.get_time_ns();
627 emitter.emit_order_rejected(&order, &error_msg, ts, false);
628 ws_client.remove_cloid_mapping(&cloid_hex);
629 dispatch_state.cleanup_terminal(&client_order_id);
630 }
631 }
632 Err(e) => {
633 log::error!("Order submission HTTP request failed: {e}");
637 }
638 }
639
640 Ok(())
641 });
642
643 Ok(())
644 }
645
    /// Submits every order of `cmd.order_list` to the exchange as one batched order action.
    ///
    /// Orders that cannot be converted (unknown asset index or failed conversion) are
    /// denied individually and left out of the batch. The remaining orders are
    /// registered, reported as submitted, and posted from a background task.
    ///
    /// # Errors
    ///
    /// Returns an error only if the orders of the list cannot be obtained from the core.
    fn submit_order_list(&self, cmd: SubmitOrderList) -> anyhow::Result<()> {
        log::debug!(
            "Submitting order list with {} orders",
            cmd.order_list.client_order_ids.len()
        );

        let http_client = self.http_client.clone();
        let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());

        let orders = self.core.get_orders_for_list(&cmd.order_list)?;

        // Parallel vectors: `hyperliquid_orders[i]` is the request built from `valid_orders[i]`.
        let mut valid_orders = Vec::new();
        let mut hyperliquid_orders = Vec::new();

        for order in &orders {
            let symbol = order.instrument_id().symbol.to_string();
            let asset = match http_client.get_asset_index(&symbol) {
                Some(a) => a,
                None => {
                    self.emitter
                        .emit_order_denied(order, &format!("Asset index not found for {symbol}"));
                    continue;
                }
            };

            let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);

            match order_to_hyperliquid_request_with_asset(
                order,
                asset,
                price_decimals,
                self.config.normalize_prices,
                slippage_bps,
            ) {
                Ok(req) => {
                    hyperliquid_orders.push(req);
                    valid_orders.push(order.clone());
                }
                Err(e) => {
                    self.emitter
                        .emit_order_denied(order, &format!("Order conversion failed: {e}"));
                }
            }
        }

        if valid_orders.is_empty() {
            log::warn!("No valid orders to submit in order list");
            return Ok(());
        }

        let grouping = determine_order_list_grouping(&valid_orders);
        log::info!("Order list grouping: {grouping:?}");

        // Register bookkeeping and report each order as submitted before the request is sent.
        for order in &valid_orders {
            let cloid = Cloid::from_client_order_id(order.client_order_id());
            self.ws_client
                .cache_cloid_mapping(Ustr::from(&cloid.to_hex()), order.client_order_id());
            self.register_order_identity(order);
            self.emitter.emit_order_submitted(order);
        }

        let emitter = self.emitter.clone();
        let clock = self.clock;
        let ws_client = self.ws_client.clone();
        let dispatch_state = self.ws_dispatch_state.clone();
        // Index-aligned with `valid_orders`, so a rejection can be cleaned up by position.
        let cloid_hexes: Vec<Ustr> = valid_orders
            .iter()
            .map(|o| Ustr::from(&Cloid::from_client_order_id(o.client_order_id()).to_hex()))
            .collect();
        let client_order_ids: Vec<ClientOrderId> =
            valid_orders.iter().map(|o| o.client_order_id()).collect();

        // No builder fee is attached when the HTTP client has a vault address.
        let builder = if self.http_client.has_vault_address() {
            None
        } else {
            Some(HyperliquidExecBuilderFee {
                address: NAUTILUS_BUILDER_ADDRESS.to_string(),
                fee_tenths_bp: 0,
            })
        };

        self.spawn_task("submit_order_list", async move {
            let action = HyperliquidExecAction::Order {
                orders: hyperliquid_orders,
                grouping,
                builder,
            };

            match http_client.post_action_exec(&action).await {
                Ok(response) => {
                    if response.is_ok() {
                        let inner_errors = extract_inner_errors(&response);

                        if inner_errors.len() < valid_orders.len() {
                            // Fewer per-order results than orders: errors cannot be matched
                            // by position, so the first error found rejects the whole batch.
                            if let Some(error_msg) = inner_errors.iter().find_map(|e| e.as_ref()) {
                                let ts = clock.get_time_ns();

                                for ((order, cloid_hex), cid) in valid_orders
                                    .iter()
                                    .zip(cloid_hexes.iter())
                                    .zip(client_order_ids.iter())
                                {
                                    log::warn!(
                                        "Order {} rejected by exchange: {error_msg}",
                                        order.client_order_id(),
                                    );
                                    emitter.emit_order_rejected(order, error_msg, ts, false);
                                    ws_client.remove_cloid_mapping(cloid_hex);
                                    dispatch_state.cleanup_terminal(cid);
                                }
                            } else {
                                log::info!("Order list submitted successfully: {response:?}");
                            }
                        } else if inner_errors.iter().any(|e| e.is_some()) {
                            // At least one result per order: reject only the orders whose
                            // result at the same index carries an error.
                            let ts = clock.get_time_ns();

                            for (i, error) in inner_errors.iter().enumerate() {
                                if let Some(error_msg) = error {
                                    if let Some(order) = valid_orders.get(i) {
                                        log::warn!(
                                            "Order {} rejected by exchange: {error_msg}",
                                            order.client_order_id(),
                                        );
                                        emitter.emit_order_rejected(order, error_msg, ts, false);
                                    }

                                    if let Some(cloid_hex) = cloid_hexes.get(i) {
                                        ws_client.remove_cloid_mapping(cloid_hex);
                                    }

                                    if let Some(cid) = client_order_ids.get(i) {
                                        dispatch_state.cleanup_terminal(cid);
                                    }
                                }
                            }
                        } else {
                            log::info!("Order list submitted successfully: {response:?}");
                        }
                    } else {
                        // The whole action was refused: reject every order with the same message.
                        let error_msg = extract_error_message(&response);
                        log::warn!("Order list submission rejected by exchange: {error_msg}");
                        let ts = clock.get_time_ns();
                        for order in &valid_orders {
                            emitter.emit_order_rejected(order, &error_msg, ts, false);
                        }

                        for cloid_hex in &cloid_hexes {
                            ws_client.remove_cloid_mapping(cloid_hex);
                        }

                        for cid in &client_order_ids {
                            dispatch_state.cleanup_terminal(cid);
                        }
                    }
                }
                Err(e) => {
                    // Transport failure: only logged; no rejection is emitted and the
                    // bookkeeping stays in place.
                    // NOTE(review): presumably because the request may still have reached
                    // the venue — confirm.
                    log::error!("Order list submission HTTP request failed: {e}");
                }
            }

            Ok(())
        });

        Ok(())
    }
820
821 fn modify_order(&self, cmd: ModifyOrder) -> anyhow::Result<()> {
822 log::debug!("Modifying order: {cmd:?}");
823
824 let venue_order_id = match cmd.venue_order_id {
825 Some(id) => id,
826 None => {
827 let reason = "venue_order_id is required for modify";
828 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
829 self.emitter.emit_order_modify_rejected_event(
830 cmd.strategy_id,
831 cmd.instrument_id,
832 cmd.client_order_id,
833 None,
834 reason,
835 self.clock.get_time_ns(),
836 );
837 return Ok(());
838 }
839 };
840
841 let oid: u64 = match venue_order_id.as_str().parse() {
842 Ok(id) => id,
843 Err(e) => {
844 let reason = format!("Failed to parse venue_order_id '{venue_order_id}': {e}");
845 log::warn!("{reason}");
846 self.emitter.emit_order_modify_rejected_event(
847 cmd.strategy_id,
848 cmd.instrument_id,
849 cmd.client_order_id,
850 Some(venue_order_id),
851 &reason,
852 self.clock.get_time_ns(),
853 );
854 return Ok(());
855 }
856 };
857
858 let order = match self.core.cache().order(&cmd.client_order_id).cloned() {
860 Some(o) => o,
861 None => {
862 let reason = "order not found in cache";
863 log::warn!("Cannot modify order {}: {reason}", cmd.client_order_id);
864 self.emitter.emit_order_modify_rejected_event(
865 cmd.strategy_id,
866 cmd.instrument_id,
867 cmd.client_order_id,
868 Some(venue_order_id),
869 reason,
870 self.clock.get_time_ns(),
871 );
872 return Ok(());
873 }
874 };
875
876 let http_client = self.http_client.clone();
877 let symbol = cmd.instrument_id.symbol.to_string();
878 let should_normalize = self.config.normalize_prices;
879 let slippage_bps = self.resolve_slippage_bps(cmd.params.as_ref());
880
881 let quantity = cmd.quantity.unwrap_or(order.leaves_qty());
882 let price_decimals = http_client.get_price_precision(&symbol).unwrap_or(2);
883 let asset = match http_client.get_asset_index(&symbol) {
884 Some(a) => a,
885 None => {
886 log::warn!(
887 "Asset index not found for symbol {symbol}, ensure instruments are loaded",
888 );
889 return Ok(());
890 }
891 };
892
893 let hyperliquid_order = match order_to_hyperliquid_request_with_asset(
896 &order,
897 asset,
898 price_decimals,
899 should_normalize,
900 slippage_bps,
901 ) {
902 Ok(mut req) => {
903 if let Some(p) = cmd.price.or(order.price()) {
905 let price_dec = p.as_decimal();
906 req.price = if should_normalize {
907 normalize_price(price_dec, price_decimals).normalize()
908 } else {
909 price_dec.normalize()
910 };
911 } else if let Some(tp) = cmd.trigger_price {
912 let is_buy = order.order_side() == OrderSide::Buy;
915 let base = tp.as_decimal().normalize();
916 let derived = derive_limit_from_trigger(base, is_buy, slippage_bps);
917 let sig_rounded = round_to_sig_figs(derived, 5);
918 req.price =
919 clamp_price_to_precision(sig_rounded, price_decimals, is_buy).normalize();
920 }
921 req.size = quantity.as_decimal().normalize();
924
925 if let (Some(tp), HyperliquidExecOrderKind::Trigger { trigger }) =
927 (cmd.trigger_price, &mut req.kind)
928 {
929 let tp_dec = tp.as_decimal();
930 trigger.trigger_px = if should_normalize {
931 normalize_price(tp_dec, price_decimals).normalize()
932 } else {
933 tp_dec.normalize()
934 };
935 }
936
937 req
938 }
939 Err(e) => {
940 log::warn!("Order conversion failed for modify: {e}");
941 return Ok(());
942 }
943 };
944
945 let dispatch_state = self.ws_dispatch_state.clone();
946 let client_order_id = cmd.client_order_id;
947 let old_venue_order_id = venue_order_id;
948
949 self.spawn_task("modify_order", async move {
950 let action = HyperliquidExecAction::Modify {
951 modify: HyperliquidExecModifyOrderRequest {
952 oid,
953 order: hyperliquid_order,
954 },
955 };
956
957 match http_client.post_action_exec(&action).await {
958 Ok(response) => {
959 if response.is_ok() {
960 if let Some(inner_error) = extract_inner_error(&response) {
961 log::warn!("Order modification rejected by exchange: {inner_error}");
962 } else {
963 dispatch_state.mark_pending_modify(client_order_id, old_venue_order_id);
969 log::info!("Order modified successfully: {response:?}");
970 }
971 } else {
972 let error_msg = extract_error_message(&response);
973 log::warn!("Order modification rejected by exchange: {error_msg}");
974 }
975 }
976 Err(e) => {
977 log::warn!("Order modification HTTP request failed: {e}");
978 }
979 }
980
981 Ok(())
982 });
983
984 Ok(())
985 }
986
987 fn cancel_order(&self, cmd: CancelOrder) -> anyhow::Result<()> {
988 log::debug!("Cancelling order: {cmd:?}");
989
990 let http_client = self.http_client.clone();
991 let emitter = self.emitter.clone();
992 let clock = self.clock;
993 let client_order_id = cmd.client_order_id;
994 let client_order_id_str = cmd.client_order_id.to_string();
995 let strategy_id = cmd.strategy_id;
996 let instrument_id = cmd.instrument_id;
997 let venue_order_id = cmd.venue_order_id;
998 let symbol = cmd.instrument_id.symbol.to_string();
999
1000 self.spawn_task("cancel_order", async move {
1001 let asset = match http_client.get_asset_index(&symbol) {
1002 Some(a) => a,
1003 None => {
1004 emitter.emit_order_cancel_rejected_event(
1005 strategy_id,
1006 instrument_id,
1007 client_order_id,
1008 venue_order_id,
1009 &format!("Asset index not found for symbol {symbol}"),
1010 clock.get_time_ns(),
1011 );
1012 return Ok(());
1013 }
1014 };
1015
1016 let cancel_request =
1017 client_order_id_to_cancel_request_with_asset(&client_order_id_str, asset);
1018 let action = HyperliquidExecAction::CancelByCloid {
1019 cancels: vec![cancel_request],
1020 };
1021
1022 match http_client.post_action_exec(&action).await {
1023 Ok(response) => {
1024 if response.is_ok() {
1025 if let Some(inner_error) = extract_inner_error(&response) {
1026 emitter.emit_order_cancel_rejected_event(
1027 strategy_id,
1028 instrument_id,
1029 client_order_id,
1030 venue_order_id,
1031 &inner_error,
1032 clock.get_time_ns(),
1033 );
1034 } else {
1035 log::info!("Order cancelled successfully: {response:?}");
1036 }
1037 } else {
1038 emitter.emit_order_cancel_rejected_event(
1039 strategy_id,
1040 instrument_id,
1041 client_order_id,
1042 venue_order_id,
1043 &extract_error_message(&response),
1044 clock.get_time_ns(),
1045 );
1046 }
1047 }
1048 Err(e) => {
1049 emitter.emit_order_cancel_rejected_event(
1050 strategy_id,
1051 instrument_id,
1052 client_order_id,
1053 venue_order_id,
1054 &format!("Cancel HTTP request failed: {e}"),
1055 clock.get_time_ns(),
1056 );
1057 }
1058 }
1059
1060 Ok(())
1061 });
1062
1063 Ok(())
1064 }
1065
1066 fn cancel_all_orders(&self, cmd: CancelAllOrders) -> anyhow::Result<()> {
1067 log::debug!("Cancelling all orders: {cmd:?}");
1068
1069 let cache = self.core.cache();
1070 let open_orders = cache.orders_open(
1071 Some(&self.core.venue),
1072 Some(&cmd.instrument_id),
1073 None,
1074 None,
1075 Some(cmd.order_side),
1076 );
1077
1078 if open_orders.is_empty() {
1079 log::debug!("No open orders to cancel for {:?}", cmd.instrument_id);
1080 return Ok(());
1081 }
1082
1083 let symbol = cmd.instrument_id.symbol.to_string();
1084 let instrument_id = cmd.instrument_id;
1085 let strategy_id = cmd.strategy_id;
1086 let entries: Vec<CancelEntry> = open_orders
1087 .iter()
1088 .map(|o| CancelEntry {
1089 strategy_id,
1090 instrument_id,
1091 client_order_id: o.client_order_id(),
1092 venue_order_id: o.venue_order_id(),
1093 symbol: symbol.clone(),
1094 })
1095 .collect();
1096
1097 let http_client = self.http_client.clone();
1098 let emitter = self.emitter.clone();
1099 let clock = self.clock;
1100
1101 self.spawn_task("cancel_all_orders", async move {
1102 let asset = match http_client.get_asset_index(&symbol) {
1103 Some(a) => a,
1104 None => {
1105 let reason = format!("Asset index not found for symbol {symbol}");
1106 log::warn!("{reason}");
1107 let ts = clock.get_time_ns();
1108
1109 for entry in &entries {
1110 emitter.emit_order_cancel_rejected_event(
1111 entry.strategy_id,
1112 entry.instrument_id,
1113 entry.client_order_id,
1114 entry.venue_order_id,
1115 &reason,
1116 ts,
1117 );
1118 }
1119 return Ok(());
1120 }
1121 };
1122
1123 let cancel_requests: Vec<_> = entries
1124 .iter()
1125 .map(|e| {
1126 client_order_id_to_cancel_request_with_asset(e.client_order_id.as_ref(), asset)
1127 })
1128 .collect();
1129
1130 if cancel_requests.is_empty() {
1131 return Ok(());
1132 }
1133
1134 let action = HyperliquidExecAction::CancelByCloid {
1135 cancels: cancel_requests,
1136 };
1137
1138 match http_client.post_action_exec(&action).await {
1139 Ok(response) => {
1140 if response.is_ok() {
1141 let inner_errors = extract_inner_errors(&response);
1142 let ts = clock.get_time_ns();
1143
1144 if inner_errors.is_empty() {
1145 log::info!("Cancel-all submitted successfully: {response:?}");
1146 } else {
1147 for (i, entry) in entries.iter().enumerate() {
1148 if let Some(Some(error_msg)) = inner_errors.get(i) {
1149 log::warn!(
1150 "Cancel for {} rejected by exchange: {error_msg}",
1151 entry.client_order_id,
1152 );
1153 emitter.emit_order_cancel_rejected_event(
1154 entry.strategy_id,
1155 entry.instrument_id,
1156 entry.client_order_id,
1157 entry.venue_order_id,
1158 error_msg,
1159 ts,
1160 );
1161 }
1162 }
1163 }
1164 } else {
1165 let error_msg = extract_error_message(&response);
1166 log::warn!("Cancel-all rejected by exchange: {error_msg}");
1167 let ts = clock.get_time_ns();
1168
1169 for entry in &entries {
1170 emitter.emit_order_cancel_rejected_event(
1171 entry.strategy_id,
1172 entry.instrument_id,
1173 entry.client_order_id,
1174 entry.venue_order_id,
1175 &error_msg,
1176 ts,
1177 );
1178 }
1179 }
1180 }
1181 Err(e) => {
1182 let reason = format!("Cancel-all HTTP request failed: {e}");
1183 log::warn!("{reason}");
1184 let ts = clock.get_time_ns();
1185
1186 for entry in &entries {
1187 emitter.emit_order_cancel_rejected_event(
1188 entry.strategy_id,
1189 entry.instrument_id,
1190 entry.client_order_id,
1191 entry.venue_order_id,
1192 &reason,
1193 ts,
1194 );
1195 }
1196 }
1197 }
1198
1199 Ok(())
1200 });
1201
1202 Ok(())
1203 }
1204
1205 fn batch_cancel_orders(&self, cmd: BatchCancelOrders) -> anyhow::Result<()> {
1206 log::debug!("Batch cancelling orders: {cmd:?}");
1207
1208 if cmd.cancels.is_empty() {
1209 log::debug!("No orders to cancel in batch");
1210 return Ok(());
1211 }
1212
1213 let entries: Vec<CancelEntry> = cmd
1214 .cancels
1215 .iter()
1216 .map(|c| CancelEntry {
1217 strategy_id: c.strategy_id,
1218 instrument_id: c.instrument_id,
1219 client_order_id: c.client_order_id,
1220 venue_order_id: c.venue_order_id,
1221 symbol: c.instrument_id.symbol.to_string(),
1222 })
1223 .collect();
1224
1225 let http_client = self.http_client.clone();
1226 let emitter = self.emitter.clone();
1227 let clock = self.clock;
1228
1229 self.spawn_task("batch_cancel_orders", async move {
1230 let mut cancel_requests = Vec::new();
1231 let mut sent_entries: Vec<&CancelEntry> = Vec::new();
1232
1233 for entry in &entries {
1234 let asset = match http_client.get_asset_index(&entry.symbol) {
1235 Some(a) => a,
1236 None => {
1237 let reason = format!("Asset index not found for symbol {}", entry.symbol);
1238 log::warn!("{reason}, skipping cancel for {}", entry.client_order_id);
1239 emitter.emit_order_cancel_rejected_event(
1240 entry.strategy_id,
1241 entry.instrument_id,
1242 entry.client_order_id,
1243 entry.venue_order_id,
1244 &reason,
1245 clock.get_time_ns(),
1246 );
1247 continue;
1248 }
1249 };
1250 cancel_requests.push(client_order_id_to_cancel_request_with_asset(
1251 entry.client_order_id.as_ref(),
1252 asset,
1253 ));
1254 sent_entries.push(entry);
1255 }
1256
1257 if cancel_requests.is_empty() {
1258 log::warn!("No valid cancel requests in batch");
1259 return Ok(());
1260 }
1261
1262 let action = HyperliquidExecAction::CancelByCloid {
1263 cancels: cancel_requests,
1264 };
1265
1266 match http_client.post_action_exec(&action).await {
1267 Ok(response) => {
1268 if response.is_ok() {
1269 let inner_errors = extract_inner_errors(&response);
1270 let ts = clock.get_time_ns();
1271
1272 if inner_errors.is_empty() {
1273 log::info!("Batch cancel submitted successfully: {response:?}");
1274 } else {
1275 for (i, entry) in sent_entries.iter().enumerate() {
1276 if let Some(Some(error_msg)) = inner_errors.get(i) {
1277 log::warn!(
1278 "Cancel for {} rejected by exchange: {error_msg}",
1279 entry.client_order_id,
1280 );
1281 emitter.emit_order_cancel_rejected_event(
1282 entry.strategy_id,
1283 entry.instrument_id,
1284 entry.client_order_id,
1285 entry.venue_order_id,
1286 error_msg,
1287 ts,
1288 );
1289 }
1290 }
1291 }
1292 } else {
1293 let error_msg = extract_error_message(&response);
1294 log::warn!("Batch cancel rejected by exchange: {error_msg}");
1295 let ts = clock.get_time_ns();
1296
1297 for entry in &sent_entries {
1298 emitter.emit_order_cancel_rejected_event(
1299 entry.strategy_id,
1300 entry.instrument_id,
1301 entry.client_order_id,
1302 entry.venue_order_id,
1303 &error_msg,
1304 ts,
1305 );
1306 }
1307 }
1308 }
1309 Err(e) => {
1310 let reason = format!("Batch cancel HTTP request failed: {e}");
1311 log::warn!("{reason}");
1312 let ts = clock.get_time_ns();
1313
1314 for entry in &sent_entries {
1315 emitter.emit_order_cancel_rejected_event(
1316 entry.strategy_id,
1317 entry.instrument_id,
1318 entry.client_order_id,
1319 entry.venue_order_id,
1320 &reason,
1321 ts,
1322 );
1323 }
1324 }
1325 }
1326
1327 Ok(())
1328 });
1329
1330 Ok(())
1331 }
1332
1333 fn query_account(&self, _cmd: QueryAccount) -> anyhow::Result<()> {
1334 let http_client = self.http_client.clone();
1335 let account_address = self.get_account_address()?;
1336 let emitter = self.emitter.clone();
1337 let clock = self.clock;
1338
1339 self.spawn_task("query_account", async move {
1340 let perp_json = http_client
1341 .info_clearinghouse_state(&account_address)
1342 .await
1343 .context("failed to fetch clearinghouse state")?;
1344
1345 let perp_state: ClearinghouseState = serde_json::from_value(perp_json)
1346 .context("failed to deserialize clearinghouse state")?;
1347
1348 let spot_json = http_client
1349 .info_spot_clearinghouse_state(&account_address)
1350 .await
1351 .context("failed to fetch spot clearinghouse state")?;
1352 let spot_state: SpotClearinghouseState = serde_json::from_value(spot_json)
1353 .context("failed to deserialize spot clearinghouse state")?;
1354
1355 let (balances, margins) =
1356 parse_combined_account_balances_and_margins(&perp_state, &spot_state)
1357 .context("failed to parse combined account balances and margins")?;
1358 let ts_event = clock.get_time_ns();
1359 emitter.emit_account_state(balances, margins, true, ts_event);
1360
1361 Ok(())
1362 });
1363
1364 Ok(())
1365 }
1366
1367 fn query_order(&self, cmd: QueryOrder) -> anyhow::Result<()> {
1368 log::debug!("Querying order: {cmd:?}");
1369
1370 let client_order_id = cmd.client_order_id;
1371 let venue_order_id = match cmd.venue_order_id {
1372 Some(voi) => Some(voi),
1373 None => self.core.cache().venue_order_id(&client_order_id).copied(),
1374 };
1375
1376 let account_address = self.get_account_address()?;
1377 let http_client = self.http_client.clone();
1378 let emitter = self.emitter.clone();
1379
1380 self.spawn_task("query_order", async move {
1381 match http_client
1386 .request_order_status_report_by_client_order_id(&account_address, &client_order_id)
1387 .await
1388 {
1389 Ok(Some(report)) => {
1390 log::info!("Queried order status for {client_order_id}");
1391 emitter.send_order_status_report(report);
1392 return Ok(());
1393 }
1394 Ok(None) => {}
1395 Err(e) => {
1396 log::warn!(
1397 "Failed to query order status for {client_order_id}: {e}; falling back to oid lookup"
1398 );
1399 }
1400 }
1401
1402 let Some(venue_order_id) = venue_order_id else {
1403 log::info!("No order status report found for {client_order_id}");
1404 return Ok(());
1405 };
1406
1407 let oid: u64 = match venue_order_id.as_str().parse() {
1408 Ok(oid) => oid,
1409 Err(e) => {
1410 log::warn!("Failed to parse venue order ID {venue_order_id}: {e}");
1411 return Ok(());
1412 }
1413 };
1414
1415 match http_client
1416 .request_order_status_report(&account_address, oid)
1417 .await
1418 {
1419 Ok(Some(report)) => {
1420 log::info!("Queried order status for oid {oid}");
1421 emitter.send_order_status_report(report);
1422 }
1423 Ok(None) => {
1424 log::info!("No order status report found for oid {oid}");
1425 }
1426 Err(e) => {
1427 log::warn!("Failed to query order status for oid {oid}: {e}");
1428 }
1429 }
1430
1431 Ok(())
1432 });
1433
1434 Ok(())
1435 }
1436
1437 async fn connect(&mut self) -> anyhow::Result<()> {
1438 if self.core.is_connected() {
1439 return Ok(());
1440 }
1441
1442 log::info!("Connecting Hyperliquid execution client");
1443
1444 self.ensure_instruments_initialized_async().await?;
1446
1447 self.start_ws_stream().await?;
1449
1450 let post_ws = async {
1452 self.refresh_account_state().await?;
1453 self.await_account_registered(30.0).await?;
1454
1455 Ok::<(), anyhow::Error>(())
1456 };
1457
1458 if let Err(e) = post_ws.await {
1459 log::warn!("Connect failed after WS started, tearing down: {e}");
1460 let _ = self.ws_client.disconnect().await;
1461 self.abort_pending_tasks();
1462 return Err(e);
1463 }
1464
1465 self.core.set_connected();
1466
1467 log::info!("Connected: client_id={}", self.core.client_id);
1468 Ok(())
1469 }
1470
1471 async fn disconnect(&mut self) -> anyhow::Result<()> {
1472 if self.core.is_disconnected() {
1473 return Ok(());
1474 }
1475
1476 log::info!("Disconnecting Hyperliquid execution client");
1477
1478 self.ws_client.disconnect().await?;
1480
1481 self.abort_pending_tasks();
1483
1484 self.core.set_disconnected();
1485
1486 log::info!("Disconnected: client_id={}", self.core.client_id);
1487 Ok(())
1488 }
1489
1490 async fn generate_order_status_report(
1491 &self,
1492 cmd: &GenerateOrderStatusReport,
1493 ) -> anyhow::Result<Option<OrderStatusReport>> {
1494 let account_address = self.get_account_address()?;
1495
1496 if cmd.venue_order_id.is_none() && cmd.client_order_id.is_none() {
1497 log::warn!(
1498 "Cannot generate order status report without venue_order_id or client_order_id"
1499 );
1500 return Ok(None);
1501 }
1502
1503 if let Some(client_order_id) = &cmd.client_order_id
1507 && let Some(report) = self
1508 .http_client
1509 .request_order_status_report_by_client_order_id(&account_address, client_order_id)
1510 .await
1511 .context("failed to generate order status report by client_order_id")?
1512 {
1513 log::info!("Generated order status report for {client_order_id}");
1514 return Ok(Some(report));
1515 }
1516
1517 let oid = match &cmd.venue_order_id {
1518 Some(venue_order_id) => venue_order_id
1519 .as_str()
1520 .parse::<u64>()
1521 .context("failed to parse venue_order_id as oid")?,
1522 None => match &cmd.client_order_id {
1523 Some(client_order_id) => {
1524 let cached_oid: Option<u64> = self
1525 .core
1526 .cache()
1527 .venue_order_id(client_order_id)
1528 .and_then(|v| v.as_str().parse::<u64>().ok());
1529
1530 match cached_oid {
1531 Some(oid) => oid,
1532 None => {
1533 log::info!("No order status report found for {client_order_id}");
1534 return Ok(None);
1535 }
1536 }
1537 }
1538 None => unreachable!("cmd must carry at least one identifier"),
1539 },
1540 };
1541
1542 let report = self
1543 .http_client
1544 .request_order_status_report(&account_address, oid)
1545 .await
1546 .context("failed to generate order status report")?;
1547
1548 if report.is_some() {
1549 log::info!("Generated order status report for oid {oid}");
1550 } else {
1551 log::info!("No order status report found for oid {oid}");
1552 }
1553 Ok(report)
1554 }
1555
1556 async fn generate_order_status_reports(
1557 &self,
1558 cmd: &GenerateOrderStatusReports,
1559 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1560 let account_address = self.get_account_address()?;
1561
1562 let reports = self
1563 .http_client
1564 .request_order_status_reports(&account_address, cmd.instrument_id)
1565 .await
1566 .context("failed to generate order status reports")?;
1567
1568 let reports = if cmd.open_only {
1570 reports
1571 .into_iter()
1572 .filter(|r| r.order_status.is_open())
1573 .collect()
1574 } else {
1575 reports
1576 };
1577
1578 let reports = match (cmd.start, cmd.end) {
1580 (Some(start), Some(end)) => reports
1581 .into_iter()
1582 .filter(|r| r.ts_last >= start && r.ts_last <= end)
1583 .collect(),
1584 (Some(start), None) => reports.into_iter().filter(|r| r.ts_last >= start).collect(),
1585 (None, Some(end)) => reports.into_iter().filter(|r| r.ts_last <= end).collect(),
1586 (None, None) => reports,
1587 };
1588
1589 log::info!("Generated {} order status reports", reports.len());
1590 Ok(reports)
1591 }
1592
1593 async fn generate_fill_reports(
1594 &self,
1595 cmd: GenerateFillReports,
1596 ) -> anyhow::Result<Vec<FillReport>> {
1597 let account_address = self.get_account_address()?;
1598
1599 let reports = self
1600 .http_client
1601 .request_fill_reports(&account_address, cmd.instrument_id)
1602 .await
1603 .context("failed to generate fill reports")?;
1604
1605 let reports = if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1607 reports
1608 .into_iter()
1609 .filter(|r| r.ts_event >= start && r.ts_event <= end)
1610 .collect()
1611 } else if let Some(start) = cmd.start {
1612 reports
1613 .into_iter()
1614 .filter(|r| r.ts_event >= start)
1615 .collect()
1616 } else if let Some(end) = cmd.end {
1617 reports.into_iter().filter(|r| r.ts_event <= end).collect()
1618 } else {
1619 reports
1620 };
1621
1622 log::info!("Generated {} fill reports", reports.len());
1623 Ok(reports)
1624 }
1625
1626 async fn generate_position_status_reports(
1627 &self,
1628 cmd: &GeneratePositionStatusReports,
1629 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1630 let account_address = self.get_account_address()?;
1631
1632 let reports = self
1634 .http_client
1635 .request_position_status_reports(&account_address, cmd.instrument_id)
1636 .await
1637 .context("failed to generate position status reports")?;
1638
1639 log::info!("Generated {} position status reports", reports.len());
1640 Ok(reports)
1641 }
1642
1643 async fn generate_mass_status(
1644 &self,
1645 lookback_mins: Option<u64>,
1646 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1647 let ts_init = self.clock.get_time_ns();
1648
1649 let order_cmd = GenerateOrderStatusReports::new(
1650 UUID4::new(),
1651 ts_init,
1652 true, None,
1654 None,
1655 None,
1656 None,
1657 None,
1658 );
1659 let fill_cmd =
1660 GenerateFillReports::new(UUID4::new(), ts_init, None, None, None, None, None, None);
1661 let position_cmd =
1662 GeneratePositionStatusReports::new(UUID4::new(), ts_init, None, None, None, None, None);
1663
1664 let order_reports = self.generate_order_status_reports(&order_cmd).await?;
1665 let mut fill_reports = self.generate_fill_reports(fill_cmd).await?;
1666 let position_reports = self.generate_position_status_reports(&position_cmd).await?;
1667
1668 if let Some(mins) = lookback_mins {
1671 let cutoff_ns = ts_init
1672 .as_u64()
1673 .saturating_sub(mins.saturating_mul(60).saturating_mul(1_000_000_000));
1674 let cutoff = UnixNanos::from(cutoff_ns);
1675
1676 fill_reports.retain(|r| r.ts_event >= cutoff);
1677 }
1678
1679 let mut mass_status = ExecutionMassStatus::new(
1680 self.core.client_id,
1681 self.core.account_id,
1682 self.core.venue,
1683 ts_init,
1684 None,
1685 );
1686 mass_status.add_order_reports(order_reports);
1687 mass_status.add_fill_reports(fill_reports);
1688 mass_status.add_position_reports(position_reports);
1689
1690 log::info!(
1691 "Generated mass status: {} orders, {} fills, {} positions",
1692 mass_status.order_reports().len(),
1693 mass_status.fill_reports().len(),
1694 mass_status.position_reports().len(),
1695 );
1696
1697 Ok(Some(mass_status))
1698 }
1699}
1700
1701impl HyperliquidExecutionClient {
1702 async fn start_ws_stream(&mut self) -> anyhow::Result<()> {
1703 {
1704 let handle_guard = self.ws_stream_handle.lock().expect(MUTEX_POISONED);
1705 if handle_guard.is_some() {
1706 return Ok(());
1707 }
1708 }
1709
1710 let user_address = self.get_user_address()?;
1711
1712 let subscription_address = self
1715 .config
1716 .account_address
1717 .as_ref()
1718 .or(self.config.vault_address.as_ref())
1719 .unwrap_or(&user_address)
1720 .clone();
1721
1722 let mut ws_client = self.ws_client.clone();
1723
1724 let instruments = self
1725 .http_client
1726 .request_instruments()
1727 .await
1728 .unwrap_or_default();
1729
1730 for instrument in instruments {
1731 ws_client.cache_instrument(instrument);
1732 }
1733
1734 ws_client.connect().await?;
1736 ws_client
1737 .subscribe_order_updates(&subscription_address)
1738 .await?;
1739 ws_client
1740 .subscribe_user_events(&subscription_address)
1741 .await?;
1742 log::info!("Subscribed to Hyperliquid execution updates for {subscription_address}");
1743
1744 if let Some(handle) = ws_client.take_task_handle() {
1746 self.ws_client.set_task_handle(handle);
1747 }
1748
1749 let emitter = self.emitter.clone();
1750 let dispatch_state = self.ws_dispatch_state.clone();
1751 let clock = self.clock;
1752 let runtime = get_runtime();
1753 let handle = runtime.spawn(async move {
1754 let mut pending_filled_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
1765
1766 loop {
1767 let event = ws_client.next_event().await;
1768
1769 match event {
1770 Some(msg) => match msg {
1771 NautilusWsMessage::ExecutionReports(reports) => {
1772 for report in reports {
1773 handle_execution_report(
1774 report,
1775 &dispatch_state,
1776 &emitter,
1777 &ws_client,
1778 &mut pending_filled_cloids,
1779 clock.get_time_ns(),
1780 );
1781 }
1782 }
1783 NautilusWsMessage::Reconnected => {}
1786 NautilusWsMessage::Error(e) => {
1787 log::error!("WebSocket error: {e}");
1788 }
1789 NautilusWsMessage::Trades(_)
1791 | NautilusWsMessage::Quote(_)
1792 | NautilusWsMessage::Deltas(_)
1793 | NautilusWsMessage::Depth10(_)
1794 | NautilusWsMessage::Candle(_)
1795 | NautilusWsMessage::MarkPrice(_)
1796 | NautilusWsMessage::IndexPrice(_)
1797 | NautilusWsMessage::FundingRate(_) => {}
1798 },
1799 None => {
1800 log::debug!("WebSocket next_event returned None, stream closed");
1801 break;
1802 }
1803 }
1804 }
1805 });
1806
1807 *self.ws_stream_handle.lock().expect(MUTEX_POISONED) = Some(handle);
1808 log::info!("Hyperliquid WebSocket execution stream started");
1809 Ok(())
1810 }
1811}
1812
/// Owned snapshot of one cancel request, built in `batch_cancel_orders` from
/// each entry of the command's `cancels` so the spawned task does not borrow
/// the command.
struct CancelEntry {
    /// Strategy that owns the order; echoed in cancel-rejected events.
    strategy_id: StrategyId,
    /// Instrument of the order; echoed in cancel-rejected events.
    instrument_id: InstrumentId,
    /// Client order ID; used to build the cancel request and echoed in events.
    client_order_id: ClientOrderId,
    /// Venue order ID if the cancel carried one; echoed in cancel-rejected events.
    venue_order_id: Option<VenueOrderId>,
    /// String form of the instrument's symbol, used to look up the asset index.
    symbol: String,
}
1824
1825fn register_order_identity_into(state: &WsDispatchState, order: &OrderAny) {
1830 if order.is_quote_quantity() {
1831 return;
1832 }
1833 state.register_identity(
1834 order.client_order_id(),
1835 OrderIdentity {
1836 strategy_id: order.strategy_id(),
1837 instrument_id: order.instrument_id(),
1838 order_side: order.order_side(),
1839 order_type: order.order_type(),
1840 quantity: order.quantity(),
1841 price: order.price(),
1842 },
1843 );
1844}
1845
1846fn handle_execution_report(
1853 report: ExecutionReport,
1854 dispatch_state: &WsDispatchState,
1855 emitter: &ExecutionEventEmitter,
1856 ws_client: &HyperliquidWebSocketClient,
1857 pending_filled_cloids: &mut FifoCache<ClientOrderId, 10_000>,
1858 ts_init: UnixNanos,
1859) {
1860 match report {
1861 ExecutionReport::Order(order_report) => {
1862 let is_filled_marker = matches!(order_report.order_status, OrderStatus::Filled);
1863 let is_open = order_report.order_status.is_open();
1864 let client_order_id = order_report.client_order_id;
1865
1866 let outcome =
1867 dispatch_order_status_report(&order_report, dispatch_state, emitter, ts_init);
1868
1869 if outcome == DispatchOutcome::External {
1870 emitter.send_order_status_report(order_report);
1871 }
1872
1873 if let Some(id) = client_order_id
1885 && !is_open
1886 {
1887 match outcome {
1888 DispatchOutcome::Skip => {}
1889 DispatchOutcome::Tracked if is_filled_marker => {
1890 pending_filled_cloids.add(id);
1891 }
1892 DispatchOutcome::Tracked | DispatchOutcome::External => {
1893 let cloid = Cloid::from_client_order_id(id);
1894 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1895 }
1896 }
1897 }
1898 }
1899 ExecutionReport::Fill(fill_report) => {
1900 let client_order_id = fill_report.client_order_id;
1901
1902 let outcome = dispatch_fill_report(&fill_report, dispatch_state, emitter, ts_init);
1903
1904 if outcome == DispatchOutcome::External {
1905 emitter.send_fill_report(fill_report);
1906 }
1907
1908 if let Some(id) = client_order_id
1911 && pending_filled_cloids.contains(&id)
1912 {
1913 pending_filled_cloids.remove(&id);
1914 let cloid = Cloid::from_client_order_id(id);
1915 ws_client.remove_cloid_mapping(&Ustr::from(&cloid.to_hex()));
1916 }
1917 }
1918 }
1919}
1920
1921use crate::common::parse::determine_order_list_grouping;
1922
1923#[cfg(test)]
1924mod tests {
1925 use nautilus_common::messages::ExecutionEvent;
1926 use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
1927 use nautilus_live::ExecutionEventEmitter;
1928 use nautilus_model::{
1929 enums::{
1930 AccountType, ContingencyType, LiquiditySide, OrderSide, OrderStatus, OrderType,
1931 TimeInForce, TriggerType,
1932 },
1933 events::OrderEventAny,
1934 identifiers::{
1935 AccountId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, VenueOrderId,
1936 },
1937 orders::{OrderAny, limit::LimitOrder, stop_market::StopMarketOrder},
1938 reports::{FillReport, OrderStatusReport},
1939 types::{Currency, Money, Price, Quantity},
1940 };
1941 use nautilus_network::websocket::TransportBackend;
1942 use rstest::rstest;
1943 use ustr::Ustr;
1944
1945 use super::{
1946 Cloid, ExecutionReport, FifoCache, HyperliquidWebSocketClient, OrderIdentity,
1947 WsDispatchState, determine_order_list_grouping, handle_execution_report,
1948 register_order_identity_into,
1949 };
1950 use crate::{common::enums::HyperliquidEnvironment, http::models::HyperliquidExecGrouping};
1951
1952 const TEST_INSTRUMENT_ID: &str = "BTC-USD-PERP.HYPERLIQUID";
1953
1954 fn test_emitter() -> (
1955 ExecutionEventEmitter,
1956 tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1957 ) {
1958 let clock = get_atomic_clock_realtime();
1959 let mut emitter = ExecutionEventEmitter::new(
1960 clock,
1961 TraderId::from("TESTER-001"),
1962 AccountId::from("HYPERLIQUID-001"),
1963 AccountType::Margin,
1964 None,
1965 );
1966 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1967 emitter.set_sender(tx);
1968 (emitter, rx)
1969 }
1970
1971 fn drain_events(
1972 rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
1973 ) -> Vec<ExecutionEvent> {
1974 let mut out = Vec::new();
1975 while let Ok(e) = rx.try_recv() {
1976 out.push(e);
1977 }
1978 out
1979 }
1980
1981 fn make_ws_client() -> HyperliquidWebSocketClient {
1982 HyperliquidWebSocketClient::new(
1986 Some("wss://test.invalid".to_string()),
1987 HyperliquidEnvironment::Testnet,
1988 None,
1989 TransportBackend::default(),
1990 None,
1991 )
1992 }
1993
1994 fn test_identity() -> OrderIdentity {
1995 OrderIdentity {
1996 strategy_id: StrategyId::from("S-001"),
1997 instrument_id: InstrumentId::from(TEST_INSTRUMENT_ID),
1998 order_side: OrderSide::Buy,
1999 order_type: OrderType::Limit,
2000 quantity: Quantity::from("0.0001"),
2001 price: Some(Price::from("56730.0")),
2002 }
2003 }
2004
2005 fn make_status_report(
2006 client_order_id: Option<&str>,
2007 venue_order_id: &str,
2008 status: OrderStatus,
2009 ) -> OrderStatusReport {
2010 OrderStatusReport::new(
2011 AccountId::from("HYPERLIQUID-001"),
2012 InstrumentId::from(TEST_INSTRUMENT_ID),
2013 client_order_id.map(ClientOrderId::new),
2014 VenueOrderId::new(venue_order_id),
2015 OrderSide::Buy,
2016 OrderType::Limit,
2017 TimeInForce::Gtc,
2018 status,
2019 Quantity::from("0.0001"),
2020 Quantity::from("0"),
2021 UnixNanos::default(),
2022 UnixNanos::default(),
2023 UnixNanos::default(),
2024 Some(UUID4::new()),
2025 )
2026 .with_price(Price::from("56730.0"))
2027 }
2028
2029 fn make_fill_report(
2030 client_order_id: Option<&str>,
2031 venue_order_id: &str,
2032 trade_id: &str,
2033 ) -> FillReport {
2034 FillReport::new(
2035 AccountId::from("HYPERLIQUID-001"),
2036 InstrumentId::from(TEST_INSTRUMENT_ID),
2037 VenueOrderId::new(venue_order_id),
2038 TradeId::new(trade_id),
2039 OrderSide::Buy,
2040 Quantity::from("0.0001"),
2041 Price::from("56730.0"),
2042 Money::new(0.0, Currency::USD()),
2043 LiquiditySide::Taker,
2044 client_order_id.map(ClientOrderId::new),
2045 None,
2046 UnixNanos::default(),
2047 UnixNanos::default(),
2048 Some(UUID4::new()),
2049 )
2050 }
2051
2052 fn cloid_for(id: &str) -> Ustr {
2053 let cloid = Cloid::from_client_order_id(ClientOrderId::from(id));
2054 Ustr::from(&cloid.to_hex())
2055 }
2056
2057 fn limit_order(
2058 id: &str,
2059 reduce_only: bool,
2060 contingency: ContingencyType,
2061 linked_ids: Option<Vec<&str>>,
2062 parent_id: Option<&str>,
2063 ) -> OrderAny {
2064 OrderAny::Limit(LimitOrder::new(
2065 TraderId::from("TESTER-001"),
2066 StrategyId::from("S-001"),
2067 InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2068 ClientOrderId::from(id),
2069 OrderSide::Buy,
2070 Quantity::from(1),
2071 Price::from("3000.00"),
2072 TimeInForce::Gtc,
2073 None, false, reduce_only,
2076 false, None, None, None, Some(contingency),
2081 None, linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2083 parent_id.map(ClientOrderId::from),
2084 None, None, None, None, Default::default(),
2089 Default::default(),
2090 ))
2091 }
2092
2093 fn stop_order(
2094 id: &str,
2095 reduce_only: bool,
2096 contingency: ContingencyType,
2097 linked_ids: Option<Vec<&str>>,
2098 parent_id: Option<&str>,
2099 ) -> OrderAny {
2100 OrderAny::StopMarket(StopMarketOrder::new(
2101 TraderId::from("TESTER-001"),
2102 StrategyId::from("S-001"),
2103 InstrumentId::from("ETH-USD-PERP.HYPERLIQUID"),
2104 ClientOrderId::from(id),
2105 OrderSide::Sell,
2106 Quantity::from(1),
2107 Price::from("2800.00"),
2108 TriggerType::LastPrice,
2109 TimeInForce::Gtc,
2110 None, reduce_only,
2112 false, None, None, None, Some(contingency),
2117 None, linked_ids.map(|ids| ids.into_iter().map(ClientOrderId::from).collect()),
2119 parent_id.map(ClientOrderId::from),
2120 None, None, None, None, Default::default(),
2125 Default::default(),
2126 ))
2127 }
2128
2129 #[rstest]
2130 #[case::independent_orders(
2131 vec![
2132 limit_order("O-001", false, ContingencyType::NoContingency, None, None),
2133 limit_order("O-002", false, ContingencyType::NoContingency, None, None),
2134 ],
2135 HyperliquidExecGrouping::Na,
2136 )]
2137 #[case::bracket_oto(
2138 vec![
2139 limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2140 limit_order("O-002", true, ContingencyType::Oco, Some(vec!["O-003"]), Some("O-001")),
2141 stop_order("O-003", true, ContingencyType::Oco, Some(vec!["O-002"]), Some("O-001")),
2142 ],
2143 HyperliquidExecGrouping::NormalTpsl,
2144 )]
2145 #[case::oto_not_bracket_shaped(
2146 vec![
2147 limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002"]), None),
2148 limit_order("O-002", false, ContingencyType::Oto, Some(vec!["O-001"]), None),
2149 ],
2150 HyperliquidExecGrouping::Na,
2151 )]
2152 #[case::oco_all_reduce_only(
2153 vec![
2154 limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2155 stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2156 ],
2157 HyperliquidExecGrouping::PositionTpsl,
2158 )]
2159 #[case::oco_not_all_reduce_only(
2160 vec![
2161 limit_order("O-001", false, ContingencyType::Oco, Some(vec!["O-002"]), None),
2162 stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-001"]), None),
2163 ],
2164 HyperliquidExecGrouping::Na,
2165 )]
2166 #[case::oto_with_non_oco_children(
2167 vec![
2168 limit_order("O-001", false, ContingencyType::Oto, Some(vec!["O-002", "O-003"]), None),
2169 limit_order("O-002", true, ContingencyType::NoContingency, None, None),
2170 stop_order("O-003", true, ContingencyType::NoContingency, None, None),
2171 ],
2172 HyperliquidExecGrouping::Na,
2173 )]
2174 #[case::mixed_oco_and_plain_reduce_only(
2175 vec![
2176 limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-002"]), None),
2177 stop_order("O-002", true, ContingencyType::NoContingency, None, None),
2178 ],
2179 HyperliquidExecGrouping::Na,
2180 )]
2181 #[case::unlinked_oco_reduce_only(
2182 vec![
2183 limit_order("O-001", true, ContingencyType::Oco, Some(vec!["O-099"]), None),
2184 stop_order("O-002", true, ContingencyType::Oco, Some(vec!["O-098"]), None),
2185 ],
2186 HyperliquidExecGrouping::Na,
2187 )]
2188 #[case::single_order(
2189 vec![limit_order("O-001", false, ContingencyType::NoContingency, None, None)],
2190 HyperliquidExecGrouping::Na,
2191 )]
2192 fn test_determine_order_list_grouping(
2193 #[case] orders: Vec<OrderAny>,
2194 #[case] expected: HyperliquidExecGrouping,
2195 ) {
2196 let result = determine_order_list_grouping(&orders);
2197 assert_eq!(result, expected);
2198 }
2199
2200 fn limit_order_with_quote_quantity(id: &str, quote_quantity: bool) -> OrderAny {
2201 OrderAny::Limit(LimitOrder::new(
2202 TraderId::from("TESTER-001"),
2203 StrategyId::from("S-001"),
2204 InstrumentId::from(TEST_INSTRUMENT_ID),
2205 ClientOrderId::from(id),
2206 OrderSide::Buy,
2207 Quantity::from("0.0001"),
2208 Price::from("56730.0"),
2209 TimeInForce::Gtc,
2210 None,
2211 false,
2212 false,
2213 quote_quantity,
2214 None,
2215 None,
2216 None,
2217 Some(ContingencyType::NoContingency),
2218 None,
2219 None,
2220 None,
2221 None,
2222 None,
2223 None,
2224 None,
2225 Default::default(),
2226 Default::default(),
2227 ))
2228 }
2229
2230 #[rstest]
2231 fn test_register_order_identity_registers_regular_order() {
2232 let state = WsDispatchState::new();
2233 let order = limit_order_with_quote_quantity("O-REG-001", false);
2234
2235 register_order_identity_into(&state, &order);
2236
2237 let found = state
2238 .lookup_identity(&ClientOrderId::from("O-REG-001"))
2239 .expect("identity should be registered");
2240 assert_eq!(found.strategy_id, StrategyId::from("S-001"));
2241 assert_eq!(found.instrument_id, InstrumentId::from(TEST_INSTRUMENT_ID));
2242 assert_eq!(found.order_side, OrderSide::Buy);
2243 assert_eq!(found.order_type, OrderType::Limit);
2244 assert_eq!(found.quantity, Quantity::from("0.0001"));
2245 assert_eq!(found.price, Some(Price::from("56730.0")));
2246 }
2247
2248 #[rstest]
2249 fn test_register_order_identity_skips_quote_quantity_order() {
2250 let state = WsDispatchState::new();
2251 let order = limit_order_with_quote_quantity("O-QQ-001", true);
2252
2253 register_order_identity_into(&state, &order);
2254
2255 assert!(
2260 state
2261 .lookup_identity(&ClientOrderId::from("O-QQ-001"))
2262 .is_none()
2263 );
2264 }
2265
2266 #[rstest]
2267 fn test_handle_execution_report_skip_keeps_cloid_mapping() {
2268 let ws_client = make_ws_client();
2273 let (emitter, mut rx) = test_emitter();
2274 let state = WsDispatchState::new();
2275 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2276
2277 let cid = ClientOrderId::from("O-HER-SKIP");
2278 state.register_identity(cid, test_identity());
2279 state.insert_accepted(cid);
2281 state.record_venue_order_id(cid, VenueOrderId::new("new-voi"));
2282
2283 ws_client.cache_cloid_mapping(cloid_for("O-HER-SKIP"), cid);
2284
2285 let stale_cancel = make_status_report(Some("O-HER-SKIP"), "old-voi", OrderStatus::Canceled);
2286 handle_execution_report(
2287 ExecutionReport::Order(stale_cancel),
2288 &state,
2289 &emitter,
2290 &ws_client,
2291 &mut pending_cloids,
2292 UnixNanos::default(),
2293 );
2294
2295 assert!(drain_events(&mut rx).is_empty());
2296 assert_eq!(
2298 ws_client.get_cloid_mapping(&cloid_for("O-HER-SKIP")),
2299 Some(cid)
2300 );
2301 assert!(state.lookup_identity(&cid).is_some());
2303 }
2304
2305 #[rstest]
2306 fn test_handle_execution_report_tracked_terminal_evicts_cloid() {
2307 let ws_client = make_ws_client();
2311 let (emitter, mut rx) = test_emitter();
2312 let state = WsDispatchState::new();
2313 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2314
2315 let cid = ClientOrderId::from("O-HER-CANCEL");
2316 state.register_identity(cid, test_identity());
2317 state.insert_accepted(cid);
2318 state.record_venue_order_id(cid, VenueOrderId::new("v-cancel"));
2319
2320 ws_client.cache_cloid_mapping(cloid_for("O-HER-CANCEL"), cid);
2321
2322 let report = make_status_report(Some("O-HER-CANCEL"), "v-cancel", OrderStatus::Canceled);
2323 handle_execution_report(
2324 ExecutionReport::Order(report),
2325 &state,
2326 &emitter,
2327 &ws_client,
2328 &mut pending_cloids,
2329 UnixNanos::default(),
2330 );
2331
2332 let events = drain_events(&mut rx);
2333 assert_eq!(events.len(), 1);
2334 assert!(matches!(
2335 events[0],
2336 ExecutionEvent::Order(OrderEventAny::Canceled(_))
2337 ));
2338 assert_eq!(
2339 ws_client.get_cloid_mapping(&cloid_for("O-HER-CANCEL")),
2340 None
2341 );
2342 assert!(state.filled_orders.contains(&cid));
2343 }
2344
2345 #[rstest]
2346 fn test_handle_execution_report_filled_marker_then_fill_evicts_on_fill() {
2347 let ws_client = make_ws_client();
2351 let (emitter, mut rx) = test_emitter();
2352 let state = WsDispatchState::new();
2353 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2354
2355 let cid = ClientOrderId::from("O-HER-FILL");
2356 state.register_identity(cid, test_identity());
2357 state.insert_accepted(cid);
2358 state.record_venue_order_id(cid, VenueOrderId::new("v-fill"));
2359
2360 ws_client.cache_cloid_mapping(cloid_for("O-HER-FILL"), cid);
2361
2362 let status_marker = make_status_report(Some("O-HER-FILL"), "v-fill", OrderStatus::Filled);
2363 handle_execution_report(
2364 ExecutionReport::Order(status_marker),
2365 &state,
2366 &emitter,
2367 &ws_client,
2368 &mut pending_cloids,
2369 UnixNanos::default(),
2370 );
2371
2372 assert!(drain_events(&mut rx).is_empty());
2374 assert_eq!(
2375 ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")),
2376 Some(cid)
2377 );
2378
2379 let fill = make_fill_report(Some("O-HER-FILL"), "v-fill", "trade-fill");
2380 handle_execution_report(
2381 ExecutionReport::Fill(fill),
2382 &state,
2383 &emitter,
2384 &ws_client,
2385 &mut pending_cloids,
2386 UnixNanos::default(),
2387 );
2388
2389 let events = drain_events(&mut rx);
2390 assert_eq!(events.len(), 1);
2391 assert!(matches!(
2392 events[0],
2393 ExecutionEvent::Order(OrderEventAny::Filled(_))
2394 ));
2395 assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-FILL")), None);
2397 }
2398
2399 #[rstest]
2400 fn test_handle_execution_report_external_terminal_evicts_cloid() {
2401 let ws_client = make_ws_client();
2405 let (emitter, mut rx) = test_emitter();
2406 let state = WsDispatchState::new();
2407 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2408
2409 let cid = ClientOrderId::from("O-HER-EXT");
2410 ws_client.cache_cloid_mapping(cloid_for("O-HER-EXT"), cid);
2411
2412 let report = make_status_report(Some("O-HER-EXT"), "v-ext", OrderStatus::Canceled);
2413 handle_execution_report(
2414 ExecutionReport::Order(report),
2415 &state,
2416 &emitter,
2417 &ws_client,
2418 &mut pending_cloids,
2419 UnixNanos::default(),
2420 );
2421
2422 let events = drain_events(&mut rx);
2423 assert_eq!(events.len(), 1);
2424 assert!(
2425 matches!(events[0], ExecutionEvent::Report(_)),
2426 "external terminal report should forward to the engine as a report",
2427 );
2428 assert_eq!(ws_client.get_cloid_mapping(&cloid_for("O-HER-EXT")), None);
2429 }
2430
2431 #[rstest]
2432 fn test_handle_execution_report_open_status_preserves_cloid() {
2433 let ws_client = make_ws_client();
2435 let (emitter, _rx) = test_emitter();
2436 let state = WsDispatchState::new();
2437 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2438
2439 let cid = ClientOrderId::from("O-HER-OPEN");
2440 state.register_identity(cid, test_identity());
2441 ws_client.cache_cloid_mapping(cloid_for("O-HER-OPEN"), cid);
2442
2443 let report = make_status_report(Some("O-HER-OPEN"), "v-open", OrderStatus::Accepted);
2444 handle_execution_report(
2445 ExecutionReport::Order(report),
2446 &state,
2447 &emitter,
2448 &ws_client,
2449 &mut pending_cloids,
2450 UnixNanos::default(),
2451 );
2452
2453 assert_eq!(
2455 ws_client.get_cloid_mapping(&cloid_for("O-HER-OPEN")),
2456 Some(cid)
2457 );
2458 }
2459
2460 #[rstest]
2461 fn test_handle_execution_report_tracked_accepted_emits_typed_event() {
2462 let ws_client = make_ws_client();
2466 let (emitter, mut rx) = test_emitter();
2467 let state = WsDispatchState::new();
2468 let mut pending_cloids: FifoCache<ClientOrderId, 10_000> = FifoCache::new();
2469
2470 let cid = ClientOrderId::from("O-HER-ACC");
2471 state.register_identity(cid, test_identity());
2472 ws_client.cache_cloid_mapping(cloid_for("O-HER-ACC"), cid);
2473
2474 let report = make_status_report(Some("O-HER-ACC"), "v-acc", OrderStatus::Accepted);
2475 handle_execution_report(
2476 ExecutionReport::Order(report),
2477 &state,
2478 &emitter,
2479 &ws_client,
2480 &mut pending_cloids,
2481 UnixNanos::default(),
2482 );
2483
2484 let events = drain_events(&mut rx);
2485 assert_eq!(events.len(), 1);
2486 assert!(
2487 matches!(events[0], ExecutionEvent::Order(OrderEventAny::Accepted(_))),
2488 "tracked accepted should route through the typed-event path",
2489 );
2490 assert_eq!(
2492 ws_client.get_cloid_mapping(&cloid_for("O-HER-ACC")),
2493 Some(cid)
2494 );
2495 }
2496}