1use std::{
17 str::FromStr,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_core::AtomicMap;
30use nautilus_model::{
31 data::BarType,
32 identifiers::{AccountId, ClientOrderId, InstrumentId},
33 instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36 mode::ConnectionMode,
37 websocket::{
38 AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
39 channel_message_handler,
40 },
41};
42use ustr::Ustr;
43
44use crate::{
45 common::{
46 consts::ws_url,
47 enums::{HyperliquidBarInterval, HyperliquidEnvironment},
48 parse::bar_type_to_interval,
49 },
50 websocket::{
51 enums::HyperliquidWsChannel,
52 handler::{FeedHandler, HandlerCommand},
53 messages::{NautilusWsMessage, SubscriptionRequest},
54 },
55};
56
/// JSON payload used as the periodic heartbeat message; `connect` passes it as the
/// `heartbeat_msg` of the WebSocket configuration.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
58
/// Kind of asset-context data a caller has asked for on a given coin.
///
/// The client keeps a set of these per coin (`asset_context_subs`) so that a single
/// `ActiveAssetCtx` subscription can serve several logical subscriptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices`.
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
66
/// WebSocket client for Hyperliquid.
///
/// The client owns the command channel to a `FeedHandler` task (spawned by `connect`) and
/// the receiving end of the handler's output channel. Clones share all `Arc`-backed state
/// but never the output receiver or the task handle.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
        from_py_object
    )
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint handed to the connection configuration in `connect`.
    url: String,
    /// Connection-mode atomic; swapped for the live client's atomic on `connect` and read by
    /// `is_active`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag shared with the feed handler; raised by `abort` and `disconnect`.
    signal: Arc<AtomicBool>,
    /// Sender for commands to the feed handler; replaced with a fresh channel on `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; consumed by `next_event`.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker created in `new`.
    // NOTE(review): not used by any method visible in this file section — confirm usage elsewhere.
    auth_tracker: AuthTracker,
    /// Subscription state shared with the feed handler; its topics drive resubscription after
    /// a reconnect.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by the instrument's raw symbol (the Hyperliquid coin).
    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<AtomicMap<String, BarType>>,
    /// Per-coin set of asset-context data types currently requested.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Mapping from a cloid to the corresponding `ClientOrderId`; shared with the feed handler.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Join handle of the task spawned by `connect`, if this instance still owns it.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Optional account identifier forwarded to the feed handler.
    account_id: Option<AccountId>,
    /// Transport backend forwarded to the WebSocket configuration.
    transport_backend: TransportBackend,
    /// Optional proxy URL forwarded to the WebSocket configuration.
    proxy_url: Option<String>,
}
100
101impl Clone for HyperliquidWebSocketClient {
102 fn clone(&self) -> Self {
103 Self {
104 url: self.url.clone(),
105 connection_mode: Arc::clone(&self.connection_mode),
106 signal: Arc::clone(&self.signal),
107 cmd_tx: Arc::clone(&self.cmd_tx),
108 out_rx: None,
109 auth_tracker: self.auth_tracker.clone(),
110 subscriptions: self.subscriptions.clone(),
111 instruments: Arc::clone(&self.instruments),
112 bar_types: Arc::clone(&self.bar_types),
113 asset_context_subs: Arc::clone(&self.asset_context_subs),
114 cloid_cache: Arc::clone(&self.cloid_cache),
115 task_handle: None,
116 account_id: self.account_id,
117 transport_backend: self.transport_backend,
118 proxy_url: self.proxy_url.clone(),
119 }
120 }
121}
122
123impl HyperliquidWebSocketClient {
124 pub fn new(
132 url: Option<String>,
133 environment: HyperliquidEnvironment,
134 account_id: Option<AccountId>,
135 transport_backend: TransportBackend,
136 proxy_url: Option<String>,
137 ) -> Self {
138 let url = url.unwrap_or_else(|| ws_url(environment).to_string());
139 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
140 ConnectionMode::Closed as u8,
141 ))));
142 Self {
143 url,
144 connection_mode,
145 signal: Arc::new(AtomicBool::new(false)),
146 auth_tracker: AuthTracker::new(),
147 subscriptions: SubscriptionState::new(':'),
148 instruments: Arc::new(AtomicMap::new()),
149 bar_types: Arc::new(AtomicMap::new()),
150 asset_context_subs: Arc::new(DashMap::new()),
151 cloid_cache: Arc::new(DashMap::new()),
152 cmd_tx: {
153 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
155 Arc::new(tokio::sync::RwLock::new(tx))
156 },
157 out_rx: None,
158 task_handle: None,
159 account_id,
160 transport_backend,
161 proxy_url,
162 }
163 }
164
165 pub async fn connect(&mut self) -> anyhow::Result<()> {
167 if self.is_active() {
168 log::warn!("WebSocket already connected");
169 return Ok(());
170 }
171 let (message_handler, raw_rx) = channel_message_handler();
172 let cfg = WebSocketConfig {
173 url: self.url.clone(),
174 headers: vec![],
175 heartbeat: Some(30),
176 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
177 reconnect_timeout_ms: Some(15_000),
178 reconnect_delay_initial_ms: Some(250),
179 reconnect_delay_max_ms: Some(5_000),
180 reconnect_backoff_factor: Some(2.0),
181 reconnect_jitter_ms: Some(200),
182 reconnect_max_attempts: None,
183 idle_timeout_ms: None,
184 backend: self.transport_backend,
185 proxy_url: self.proxy_url.clone(),
186 };
187 let client =
188 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
189
190 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
192 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
193
194 *self.cmd_tx.write().await = cmd_tx.clone();
197 self.out_rx = Some(out_rx);
198
199 self.connection_mode.store(client.connection_mode_atomic());
200 log::info!("Hyperliquid WebSocket connected: {}", self.url);
201
202 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
204 anyhow::bail!("Failed to send SetClient command: {e}");
205 }
206
207 let instruments_vec: Vec<InstrumentAny> =
209 self.instruments.load().values().cloned().collect();
210
211 if !instruments_vec.is_empty()
212 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
213 {
214 log::error!("Failed to send InitializeInstruments: {e}");
215 }
216
217 let signal = Arc::clone(&self.signal);
219 let account_id = self.account_id;
220 let subscriptions = self.subscriptions.clone();
221 let cmd_tx_for_reconnect = cmd_tx.clone();
222 let cloid_cache = Arc::clone(&self.cloid_cache);
223
224 let stream_handle = get_runtime().spawn(async move {
225 let mut handler = FeedHandler::new(
226 signal,
227 cmd_rx,
228 raw_rx,
229 out_tx,
230 account_id,
231 subscriptions.clone(),
232 cloid_cache,
233 );
234
235 let resubscribe_all = || {
236 let topics = subscriptions.all_topics();
237 if topics.is_empty() {
238 log::debug!("No active subscriptions to restore after reconnection");
239 return;
240 }
241
242 log::info!(
243 "Resubscribing to {} active subscriptions after reconnection",
244 topics.len()
245 );
246
247 for topic in topics {
248 match subscription_from_topic(&topic) {
249 Ok(subscription) => {
250 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
251 subscriptions: vec![subscription],
252 }) {
253 log::error!("Failed to send resubscribe command: {e}");
254 }
255 }
256 Err(e) => {
257 log::error!(
258 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
259 );
260 }
261 }
262 }
263 };
264
265 loop {
266 match handler.next().await {
267 Some(NautilusWsMessage::Reconnected) => {
268 log::info!("WebSocket reconnected");
269 resubscribe_all();
270 }
271 Some(msg) => {
272 if handler.send(msg).is_err() {
273 log::error!("Failed to send message (receiver dropped)");
274 break;
275 }
276 }
277 None => {
278 if handler.is_stopped() {
279 log::debug!("Stop signal received, ending message processing");
280 break;
281 }
282 log::warn!("WebSocket stream ended unexpectedly");
283 break;
284 }
285 }
286 }
287 log::debug!("Handler task completed");
288 });
289 self.task_handle = Some(stream_handle);
290 Ok(())
291 }
292
293 pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
296 self.task_handle.take()
297 }
298
299 pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
300 self.task_handle = Some(handle);
301 }
302
303 pub(crate) fn abort(&mut self) {
306 self.signal.store(true, Ordering::Relaxed);
307 self.connection_mode
308 .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
309
310 if let Some(handle) = self.task_handle.take() {
311 handle.abort();
312 }
313 }
314
315 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
317 log::info!("Disconnecting Hyperliquid WebSocket");
318 self.signal.store(true, Ordering::Relaxed);
319
320 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
321 log::debug!(
322 "Failed to send disconnect command (handler may already be shut down): {e}"
323 );
324 }
325
326 if let Some(handle) = self.task_handle.take() {
327 log::debug!("Waiting for task handle to complete");
328 let abort_handle = handle.abort_handle();
329 tokio::select! {
330 result = handle => {
331 match result {
332 Ok(()) => log::debug!("Task handle completed successfully"),
333 Err(e) if e.is_cancelled() => {
334 log::debug!("Task was cancelled");
335 }
336 Err(e) => log::error!("Task handle encountered an error: {e:?}"),
337 }
338 }
339 () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
340 log::warn!("Timeout waiting for task handle, aborting task");
341 abort_handle.abort();
342 }
343 }
344 } else {
345 log::debug!("No task handle to await");
346 }
347 log::debug!("Disconnected");
348 Ok(())
349 }
350
351 pub fn is_active(&self) -> bool {
353 let mode = self.connection_mode.load();
354 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
355 }
356
357 pub fn url(&self) -> &str {
359 &self.url
360 }
361
362 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
369 let mut map = AHashMap::new();
370
371 for inst in instruments {
372 let coin = inst.raw_symbol().inner();
373 map.insert(coin, inst);
374 }
375 let count = map.len();
376 self.instruments.store(map);
377 log::info!("Hyperliquid instrument cache initialized with {count} instruments");
378 }
379
380 pub fn cache_instrument(&self, instrument: InstrumentAny) {
384 let coin = instrument.raw_symbol().inner();
385 self.instruments.insert(coin, instrument.clone());
386
387 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
390 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
391 }
392 }
393
394 #[must_use]
396 pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
397 self.instruments.clone()
398 }
399
400 pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
406 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
407 let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
408 }
409 }
410
411 pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
420 log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
421 self.cloid_cache.insert(cloid, client_order_id);
422 }
423
424 pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
429 if self.cloid_cache.remove(cloid).is_some() {
430 log::debug!("Removed cloid mapping: {cloid}");
431 }
432 }
433
434 pub fn clear_cloid_cache(&self) {
438 let count = self.cloid_cache.len();
439 self.cloid_cache.clear();
440
441 if count > 0 {
442 log::debug!("Cleared {count} cloid mappings from cache");
443 }
444 }
445
446 #[must_use]
448 pub fn cloid_cache_len(&self) -> usize {
449 self.cloid_cache.len()
450 }
451
452 #[must_use]
456 pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
457 self.cloid_cache.get(cloid).map(|entry| *entry.value())
458 }
459
460 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
464 self.instruments
465 .load()
466 .values()
467 .find(|inst| inst.id() == *id)
468 .cloned()
469 }
470
471 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
473 self.instruments.get_cloned(symbol)
474 }
475
476 pub fn subscription_count(&self) -> usize {
478 self.subscriptions.len()
479 }
480
481 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
485 let key = format!("candle:{coin}:{interval}");
487 self.bar_types.load().get(&key).copied()
488 }
489
490 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
492 self.subscribe_book_with_options(instrument_id, None, None)
493 .await
494 }
495
496 pub async fn subscribe_book_with_options(
499 &self,
500 instrument_id: InstrumentId,
501 n_sig_figs: Option<u32>,
502 mantissa: Option<u32>,
503 ) -> anyhow::Result<()> {
504 let instrument = self
505 .get_instrument(&instrument_id)
506 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
507 let coin = instrument.raw_symbol().inner();
508
509 let cmd_tx = self.cmd_tx.read().await;
510
511 cmd_tx
513 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
514 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
515
516 let subscription = SubscriptionRequest::L2Book {
517 coin,
518 mantissa,
519 n_sig_figs,
520 };
521
522 cmd_tx
523 .send(HandlerCommand::Subscribe {
524 subscriptions: vec![subscription],
525 })
526 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
527 Ok(())
528 }
529
530 pub async fn subscribe_book_depth10(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
536 self.subscribe_book_depth10_with_options(instrument_id, None, None)
537 .await
538 }
539
540 pub async fn subscribe_book_depth10_with_options(
543 &self,
544 instrument_id: InstrumentId,
545 n_sig_figs: Option<u32>,
546 mantissa: Option<u32>,
547 ) -> anyhow::Result<()> {
548 let instrument = self
549 .get_instrument(&instrument_id)
550 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
551 let coin = instrument.raw_symbol().inner();
552
553 let cmd_tx = self.cmd_tx.read().await;
554
555 cmd_tx
556 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
557 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
558
559 cmd_tx
560 .send(HandlerCommand::SetDepth10Sub {
561 coin,
562 subscribed: true,
563 })
564 .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
565
566 let subscription = SubscriptionRequest::L2Book {
567 coin,
568 mantissa,
569 n_sig_figs,
570 };
571
572 cmd_tx
573 .send(HandlerCommand::Subscribe {
574 subscriptions: vec![subscription],
575 })
576 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
577 Ok(())
578 }
579
580 pub async fn unsubscribe_book_depth10(
587 &self,
588 instrument_id: InstrumentId,
589 ) -> anyhow::Result<()> {
590 let instrument = self
591 .get_instrument(&instrument_id)
592 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
593 let coin = instrument.raw_symbol().inner();
594
595 self.cmd_tx
596 .read()
597 .await
598 .send(HandlerCommand::SetDepth10Sub {
599 coin,
600 subscribed: false,
601 })
602 .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
603 Ok(())
604 }
605
606 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
608 let instrument = self
609 .get_instrument(&instrument_id)
610 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
611 let coin = instrument.raw_symbol().inner();
612
613 let cmd_tx = self.cmd_tx.read().await;
614
615 cmd_tx
617 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
618 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
619
620 let subscription = SubscriptionRequest::Bbo { coin };
621
622 cmd_tx
623 .send(HandlerCommand::Subscribe {
624 subscriptions: vec![subscription],
625 })
626 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
627 Ok(())
628 }
629
630 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
632 let instrument = self
633 .get_instrument(&instrument_id)
634 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
635 let coin = instrument.raw_symbol().inner();
636
637 let cmd_tx = self.cmd_tx.read().await;
638
639 cmd_tx
641 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
642 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
643
644 let subscription = SubscriptionRequest::Trades { coin };
645
646 cmd_tx
647 .send(HandlerCommand::Subscribe {
648 subscriptions: vec![subscription],
649 })
650 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
651 Ok(())
652 }
653
654 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
656 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
657 .await
658 }
659
660 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
662 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
663 .await
664 }
665
666 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
668 let instrument = self
670 .get_instrument(&bar_type.instrument_id())
671 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
672 let coin = instrument.raw_symbol().inner();
673 let interval = bar_type_to_interval(&bar_type)?;
674 let subscription = SubscriptionRequest::Candle { coin, interval };
675
676 let key = format!("candle:{coin}:{interval}");
678 self.bar_types.insert(key.clone(), bar_type);
679
680 let cmd_tx = self.cmd_tx.read().await;
681
682 cmd_tx
683 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
684 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
685
686 cmd_tx
687 .send(HandlerCommand::AddBarType { key, bar_type })
688 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
689
690 cmd_tx
691 .send(HandlerCommand::Subscribe {
692 subscriptions: vec![subscription],
693 })
694 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
695 Ok(())
696 }
697
698 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
700 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
701 .await
702 }
703
704 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
706 let subscription = SubscriptionRequest::OrderUpdates {
707 user: user.to_string(),
708 };
709 self.cmd_tx
710 .read()
711 .await
712 .send(HandlerCommand::Subscribe {
713 subscriptions: vec![subscription],
714 })
715 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
716 Ok(())
717 }
718
719 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
721 let subscription = SubscriptionRequest::UserEvents {
722 user: user.to_string(),
723 };
724 self.cmd_tx
725 .read()
726 .await
727 .send(HandlerCommand::Subscribe {
728 subscriptions: vec![subscription],
729 })
730 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
731 Ok(())
732 }
733
734 pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
739 let subscription = SubscriptionRequest::UserFills {
740 user: user.to_string(),
741 aggregate_by_time: None,
742 };
743 self.cmd_tx
744 .read()
745 .await
746 .send(HandlerCommand::Subscribe {
747 subscriptions: vec![subscription],
748 })
749 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
750 Ok(())
751 }
752
753 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
758 self.subscribe_order_updates(user).await?;
759 self.subscribe_user_events(user).await?;
760 Ok(())
761 }
762
763 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
765 let instrument = self
766 .get_instrument(&instrument_id)
767 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
768 let coin = instrument.raw_symbol().inner();
769
770 let subscription = SubscriptionRequest::L2Book {
771 coin,
772 mantissa: None,
773 n_sig_figs: None,
774 };
775
776 self.cmd_tx
777 .read()
778 .await
779 .send(HandlerCommand::Unsubscribe {
780 subscriptions: vec![subscription],
781 })
782 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
783 Ok(())
784 }
785
786 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
788 let instrument = self
789 .get_instrument(&instrument_id)
790 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
791 let coin = instrument.raw_symbol().inner();
792
793 let subscription = SubscriptionRequest::Bbo { coin };
794
795 self.cmd_tx
796 .read()
797 .await
798 .send(HandlerCommand::Unsubscribe {
799 subscriptions: vec![subscription],
800 })
801 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
802 Ok(())
803 }
804
805 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
807 let instrument = self
808 .get_instrument(&instrument_id)
809 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
810 let coin = instrument.raw_symbol().inner();
811
812 let subscription = SubscriptionRequest::Trades { coin };
813
814 self.cmd_tx
815 .read()
816 .await
817 .send(HandlerCommand::Unsubscribe {
818 subscriptions: vec![subscription],
819 })
820 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
821 Ok(())
822 }
823
824 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
826 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
827 .await
828 }
829
830 pub async fn unsubscribe_index_prices(
832 &self,
833 instrument_id: InstrumentId,
834 ) -> anyhow::Result<()> {
835 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
836 .await
837 }
838
839 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
841 let instrument = self
843 .get_instrument(&bar_type.instrument_id())
844 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
845 let coin = instrument.raw_symbol().inner();
846 let interval = bar_type_to_interval(&bar_type)?;
847 let subscription = SubscriptionRequest::Candle { coin, interval };
848
849 let key = format!("candle:{coin}:{interval}");
850 self.bar_types.remove(&key);
851
852 let cmd_tx = self.cmd_tx.read().await;
853
854 cmd_tx
855 .send(HandlerCommand::RemoveBarType { key })
856 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
857
858 cmd_tx
859 .send(HandlerCommand::Unsubscribe {
860 subscriptions: vec![subscription],
861 })
862 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
863 Ok(())
864 }
865
866 pub async fn unsubscribe_funding_rates(
868 &self,
869 instrument_id: InstrumentId,
870 ) -> anyhow::Result<()> {
871 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
872 .await
873 }
874
875 async fn subscribe_asset_context_data(
876 &self,
877 instrument_id: InstrumentId,
878 data_type: AssetContextDataType,
879 ) -> anyhow::Result<()> {
880 let instrument = self
881 .get_instrument(&instrument_id)
882 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
883 let coin = instrument.raw_symbol().inner();
884
885 let mut entry = self.asset_context_subs.entry(coin).or_default();
886 let is_first_subscription = entry.is_empty();
887 entry.insert(data_type);
888 let data_types = entry.clone();
889 drop(entry);
890
891 let cmd_tx = self.cmd_tx.read().await;
892
893 cmd_tx
894 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
895 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
896
897 if is_first_subscription {
898 log::debug!(
899 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
900 );
901 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
902
903 cmd_tx
904 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
905 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
906
907 cmd_tx
908 .send(HandlerCommand::Subscribe {
909 subscriptions: vec![subscription],
910 })
911 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
912 } else {
913 log::debug!(
914 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
915 );
916 }
917
918 Ok(())
919 }
920
921 async fn unsubscribe_asset_context_data(
922 &self,
923 instrument_id: InstrumentId,
924 data_type: AssetContextDataType,
925 ) -> anyhow::Result<()> {
926 let instrument = self
927 .get_instrument(&instrument_id)
928 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
929 let coin = instrument.raw_symbol().inner();
930
931 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
932 entry.remove(&data_type);
933 let should_unsubscribe = entry.is_empty();
934 let data_types = entry.clone();
935 drop(entry);
936
937 let cmd_tx = self.cmd_tx.read().await;
938
939 if should_unsubscribe {
940 self.asset_context_subs.remove(&coin);
941
942 log::debug!(
943 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
944 );
945 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
946
947 cmd_tx
948 .send(HandlerCommand::UpdateAssetContextSubs {
949 coin,
950 data_types: AHashSet::new(),
951 })
952 .map_err(|e| {
953 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
954 })?;
955
956 cmd_tx
957 .send(HandlerCommand::Unsubscribe {
958 subscriptions: vec![subscription],
959 })
960 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
961 } else {
962 log::debug!(
963 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
964 );
965
966 cmd_tx
967 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
968 .map_err(|e| {
969 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
970 })?;
971 }
972 }
973
974 Ok(())
975 }
976
977 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
981 if let Some(ref mut rx) = self.out_rx {
982 rx.recv().await
983 } else {
984 None
985 }
986 }
987}
988
989fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
992 let (kind, rest) = topic
993 .split_once(':')
994 .map_or((topic, None), |(k, r)| (k, Some(r)));
995
996 let channel = HyperliquidWsChannel::from_wire_str(kind)
997 .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
998
999 match channel {
1000 HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
1001 dex: rest.map(|s| s.to_string()),
1002 }),
1003 HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
1004 user: rest.context("Missing user")?.to_string(),
1005 }),
1006 HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
1007 user: rest.context("Missing user")?.to_string(),
1008 }),
1009 HyperliquidWsChannel::Candle => {
1010 let rest = rest.context("Missing candle params")?;
1012 let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
1013 let interval = HyperliquidBarInterval::from_str(interval_str)?;
1014 Ok(SubscriptionRequest::Candle {
1015 coin: Ustr::from(coin),
1016 interval,
1017 })
1018 }
1019 HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
1020 coin: Ustr::from(rest.context("Missing coin")?),
1021 mantissa: None,
1022 n_sig_figs: None,
1023 }),
1024 HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
1025 coin: Ustr::from(rest.context("Missing coin")?),
1026 }),
1027 HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
1028 user: rest.context("Missing user")?.to_string(),
1029 }),
1030 HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
1031 user: rest.context("Missing user")?.to_string(),
1032 }),
1033 HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
1034 user: rest.context("Missing user")?.to_string(),
1035 aggregate_by_time: None,
1036 }),
1037 HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
1038 user: rest.context("Missing user")?.to_string(),
1039 }),
1040 HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
1041 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
1042 user: rest.context("Missing user")?.to_string(),
1043 })
1044 }
1045 HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
1046 coin: Ustr::from(rest.context("Missing coin")?),
1047 }),
1048 HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
1049 coin: Ustr::from(rest.context("Missing coin")?),
1050 }),
1051 HyperliquidWsChannel::ActiveAssetData => {
1052 let rest = rest.context("Missing params")?;
1054 let (user, coin) = rest.split_once(':').context("Missing coin")?;
1055 Ok(SubscriptionRequest::ActiveAssetData {
1056 user: user.to_string(),
1057 coin: coin.to_string(),
1058 })
1059 }
1060 HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
1061 user: rest.context("Missing user")?.to_string(),
1062 }),
1063 HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
1064 user: rest.context("Missing user")?.to_string(),
1065 }),
1066 HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
1067 coin: Ustr::from(rest.context("Missing coin")?),
1068 }),
1069
1070 HyperliquidWsChannel::SubscriptionResponse
1072 | HyperliquidWsChannel::User
1073 | HyperliquidWsChannel::Post
1074 | HyperliquidWsChannel::Pong
1075 | HyperliquidWsChannel::Error => {
1076 anyhow::bail!("Not a subscription channel: {kind}")
1077 }
1078 }
1079}
1080
// Unit tests for topic-key generation and for rebuilding subscriptions from topic keys.
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};

    /// Thin wrapper so the tests read in terms of "topics"; delegates to `subscription_to_key`.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        subscription_to_key(sub)
    }

    /// Each subscription must map to the expected `kind:param` topic string.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        assert_eq!(subscription_topic(&subscription), expected_topic);
    }

    /// Different channels for the same coin must not share a topic.
    #[rstest]
    fn test_subscription_topics_unique() {
        let sub1 = SubscriptionRequest::Trades { coin: "BTC".into() };
        let sub2 = SubscriptionRequest::Bbo { coin: "BTC".into() };

        let topic1 = subscription_topic(&sub1);
        let topic2 = subscription_topic(&sub2);

        assert_ne!(topic1, topic2);
    }

    /// Round trip: topic -> subscription -> topic must be stable, including for coins that
    /// themselves contain a ':' separator.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let topic = subscription_topic(&subscription);
        let reconstructed = subscription_from_topic(&topic).expect("Failed to reconstruct");
        assert_eq!(subscription_topic(&reconstructed), topic);
    }

    /// Candle topics carry both the coin and the interval.
    #[rstest]
    fn test_subscription_topic_candle() {
        let sub = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        let topic = subscription_topic(&sub);
        assert_eq!(topic, "candle:BTC:1h");
    }
}