Skip to main content

nautilus_hyperliquid/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    str::FromStr,
18    sync::{
19        Arc,
20        atomic::{AtomicBool, AtomicU8, Ordering},
21    },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_core::AtomicMap;
30use nautilus_model::{
31    data::BarType,
32    identifiers::{AccountId, ClientOrderId, InstrumentId},
33    instruments::{Instrument, InstrumentAny},
34};
35use nautilus_network::{
36    mode::ConnectionMode,
37    websocket::{
38        AuthTracker, SubscriptionState, TransportBackend, WebSocketClient, WebSocketConfig,
39        channel_message_handler,
40    },
41};
42use ustr::Ustr;
43
44use crate::{
45    common::{
46        consts::ws_url,
47        enums::{HyperliquidBarInterval, HyperliquidEnvironment},
48        parse::bar_type_to_interval,
49    },
50    websocket::{
51        enums::HyperliquidWsChannel,
52        handler::{FeedHandler, HandlerCommand},
53        messages::{NautilusWsMessage, SubscriptionRequest},
54    },
55};
56
57const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
58
59/// Represents the different data types available from asset context subscriptions.
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
61pub(super) enum AssetContextDataType {
62    MarkPrice,
63    IndexPrice,
64    FundingRate,
65}
66
67/// Hyperliquid WebSocket client following the BitMEX pattern.
68///
69/// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
70/// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
71#[derive(Debug)]
72#[cfg_attr(
73    feature = "python",
74    pyo3::pyclass(
75        module = "nautilus_trader.core.nautilus_pyo3.hyperliquid",
76        from_py_object
77    )
78)]
79#[cfg_attr(
80    feature = "python",
81    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.hyperliquid")
82)]
83pub struct HyperliquidWebSocketClient {
84    url: String,
85    connection_mode: Arc<ArcSwap<AtomicU8>>,
86    signal: Arc<AtomicBool>,
87    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
88    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
89    auth_tracker: AuthTracker,
90    subscriptions: SubscriptionState,
91    instruments: Arc<AtomicMap<Ustr, InstrumentAny>>,
92    bar_types: Arc<AtomicMap<String, BarType>>,
93    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
94    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
95    task_handle: Option<tokio::task::JoinHandle<()>>,
96    account_id: Option<AccountId>,
97    transport_backend: TransportBackend,
98    proxy_url: Option<String>,
99}
100
101impl Clone for HyperliquidWebSocketClient {
102    fn clone(&self) -> Self {
103        Self {
104            url: self.url.clone(),
105            connection_mode: Arc::clone(&self.connection_mode),
106            signal: Arc::clone(&self.signal),
107            cmd_tx: Arc::clone(&self.cmd_tx),
108            out_rx: None,
109            auth_tracker: self.auth_tracker.clone(),
110            subscriptions: self.subscriptions.clone(),
111            instruments: Arc::clone(&self.instruments),
112            bar_types: Arc::clone(&self.bar_types),
113            asset_context_subs: Arc::clone(&self.asset_context_subs),
114            cloid_cache: Arc::clone(&self.cloid_cache),
115            task_handle: None,
116            account_id: self.account_id,
117            transport_backend: self.transport_backend,
118            proxy_url: self.proxy_url.clone(),
119        }
120    }
121}
122
123impl HyperliquidWebSocketClient {
124    /// Creates a new Hyperliquid WebSocket client without connecting.
125    ///
126    /// If `url` is `None`, the appropriate URL will be determined from the `environment`:
127    /// - `Mainnet`: `wss://api.hyperliquid.xyz/ws`
128    /// - `Testnet`: `wss://api.hyperliquid-testnet.xyz/ws`
129    ///
130    /// The connection will be established when `connect()` is called.
131    pub fn new(
132        url: Option<String>,
133        environment: HyperliquidEnvironment,
134        account_id: Option<AccountId>,
135        transport_backend: TransportBackend,
136        proxy_url: Option<String>,
137    ) -> Self {
138        let url = url.unwrap_or_else(|| ws_url(environment).to_string());
139        let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
140            ConnectionMode::Closed as u8,
141        ))));
142        Self {
143            url,
144            connection_mode,
145            signal: Arc::new(AtomicBool::new(false)),
146            auth_tracker: AuthTracker::new(),
147            subscriptions: SubscriptionState::new(':'),
148            instruments: Arc::new(AtomicMap::new()),
149            bar_types: Arc::new(AtomicMap::new()),
150            asset_context_subs: Arc::new(DashMap::new()),
151            cloid_cache: Arc::new(DashMap::new()),
152            cmd_tx: {
153                // Placeholder channel until connect() creates the real handler and replays queued instruments
154                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
155                Arc::new(tokio::sync::RwLock::new(tx))
156            },
157            out_rx: None,
158            task_handle: None,
159            account_id,
160            transport_backend,
161            proxy_url,
162        }
163    }
164
165    /// Establishes WebSocket connection and spawns the message handler.
166    pub async fn connect(&mut self) -> anyhow::Result<()> {
167        if self.is_active() {
168            log::warn!("WebSocket already connected");
169            return Ok(());
170        }
171        let (message_handler, raw_rx) = channel_message_handler();
172        let cfg = WebSocketConfig {
173            url: self.url.clone(),
174            headers: vec![],
175            heartbeat: Some(30),
176            heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
177            reconnect_timeout_ms: Some(15_000),
178            reconnect_delay_initial_ms: Some(250),
179            reconnect_delay_max_ms: Some(5_000),
180            reconnect_backoff_factor: Some(2.0),
181            reconnect_jitter_ms: Some(200),
182            reconnect_max_attempts: None,
183            idle_timeout_ms: None,
184            backend: self.transport_backend,
185            proxy_url: self.proxy_url.clone(),
186        };
187        let client =
188            WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
189
190        // Create channels for handler communication
191        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
192        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
193
194        // Update cmd_tx before connection_mode to avoid race where is_active() returns
195        // true but subscriptions still go to the old placeholder channel
196        *self.cmd_tx.write().await = cmd_tx.clone();
197        self.out_rx = Some(out_rx);
198
199        self.connection_mode.store(client.connection_mode_atomic());
200        log::info!("Hyperliquid WebSocket connected: {}", self.url);
201
202        // Send SetClient command immediately
203        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
204            anyhow::bail!("Failed to send SetClient command: {e}");
205        }
206
207        // Initialize handler with existing instruments
208        let instruments_vec: Vec<InstrumentAny> =
209            self.instruments.load().values().cloned().collect();
210
211        if !instruments_vec.is_empty()
212            && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
213        {
214            log::error!("Failed to send InitializeInstruments: {e}");
215        }
216
217        // Spawn handler task
218        let signal = Arc::clone(&self.signal);
219        let account_id = self.account_id;
220        let subscriptions = self.subscriptions.clone();
221        let cmd_tx_for_reconnect = cmd_tx.clone();
222        let cloid_cache = Arc::clone(&self.cloid_cache);
223
224        let stream_handle = get_runtime().spawn(async move {
225            let mut handler = FeedHandler::new(
226                signal,
227                cmd_rx,
228                raw_rx,
229                out_tx,
230                account_id,
231                subscriptions.clone(),
232                cloid_cache,
233            );
234
235            let resubscribe_all = || {
236                let topics = subscriptions.all_topics();
237                if topics.is_empty() {
238                    log::debug!("No active subscriptions to restore after reconnection");
239                    return;
240                }
241
242                log::info!(
243                    "Resubscribing to {} active subscriptions after reconnection",
244                    topics.len()
245                );
246
247                for topic in topics {
248                    match subscription_from_topic(&topic) {
249                        Ok(subscription) => {
250                            if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
251                                subscriptions: vec![subscription],
252                            }) {
253                                log::error!("Failed to send resubscribe command: {e}");
254                            }
255                        }
256                        Err(e) => {
257                            log::error!(
258                                "Failed to reconstruct subscription from topic: topic={topic}, {e}"
259                            );
260                        }
261                    }
262                }
263            };
264
265            loop {
266                match handler.next().await {
267                    Some(NautilusWsMessage::Reconnected) => {
268                        log::info!("WebSocket reconnected");
269                        resubscribe_all();
270                    }
271                    Some(msg) => {
272                        if handler.send(msg).is_err() {
273                            log::error!("Failed to send message (receiver dropped)");
274                            break;
275                        }
276                    }
277                    None => {
278                        if handler.is_stopped() {
279                            log::debug!("Stop signal received, ending message processing");
280                            break;
281                        }
282                        log::warn!("WebSocket stream ended unexpectedly");
283                        break;
284                    }
285                }
286            }
287            log::debug!("Handler task completed");
288        });
289        self.task_handle = Some(stream_handle);
290        Ok(())
291    }
292
293    /// Takes the handler task handle from this client so that another
294    /// instance (e.g., the non-clone original) can await it on disconnect.
295    pub fn take_task_handle(&mut self) -> Option<tokio::task::JoinHandle<()>> {
296        self.task_handle.take()
297    }
298
299    pub fn set_task_handle(&mut self, handle: tokio::task::JoinHandle<()>) {
300        self.task_handle = Some(handle);
301    }
302
303    /// Force-close fallback for the sync `stop()` path.
304    /// Prefer `disconnect()` for graceful shutdown.
305    pub(crate) fn abort(&mut self) {
306        self.signal.store(true, Ordering::Relaxed);
307        self.connection_mode
308            .store(Arc::new(AtomicU8::new(ConnectionMode::Closed as u8)));
309
310        if let Some(handle) = self.task_handle.take() {
311            handle.abort();
312        }
313    }
314
315    /// Disconnects the WebSocket connection.
316    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
317        log::info!("Disconnecting Hyperliquid WebSocket");
318        self.signal.store(true, Ordering::Relaxed);
319
320        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
321            log::debug!(
322                "Failed to send disconnect command (handler may already be shut down): {e}"
323            );
324        }
325
326        if let Some(handle) = self.task_handle.take() {
327            log::debug!("Waiting for task handle to complete");
328            let abort_handle = handle.abort_handle();
329            tokio::select! {
330                result = handle => {
331                    match result {
332                        Ok(()) => log::debug!("Task handle completed successfully"),
333                        Err(e) if e.is_cancelled() => {
334                            log::debug!("Task was cancelled");
335                        }
336                        Err(e) => log::error!("Task handle encountered an error: {e:?}"),
337                    }
338                }
339                () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
340                    log::warn!("Timeout waiting for task handle, aborting task");
341                    abort_handle.abort();
342                }
343            }
344        } else {
345            log::debug!("No task handle to await");
346        }
347        log::debug!("Disconnected");
348        Ok(())
349    }
350
351    /// Returns true if the WebSocket is actively connected.
352    pub fn is_active(&self) -> bool {
353        let mode = self.connection_mode.load();
354        mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
355    }
356
357    /// Returns the URL of this WebSocket client.
358    pub fn url(&self) -> &str {
359        &self.url
360    }
361
362    /// Caches multiple instruments.
363    ///
364    /// Clears the existing cache first, then adds all provided instruments.
365    /// Instruments are keyed by their raw_symbol which is unique per instrument:
366    /// - Perps use base currency (e.g., "BTC")
367    /// - Spot uses @{pair_index} format (e.g., "@107") or slash format for PURR
368    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
369        let mut map = AHashMap::new();
370
371        for inst in instruments {
372            let coin = inst.raw_symbol().inner();
373            map.insert(coin, inst);
374        }
375        let count = map.len();
376        self.instruments.store(map);
377        log::info!("Hyperliquid instrument cache initialized with {count} instruments");
378    }
379
380    /// Caches a single instrument.
381    ///
382    /// Any existing instrument with the same raw_symbol will be replaced.
383    pub fn cache_instrument(&self, instrument: InstrumentAny) {
384        let coin = instrument.raw_symbol().inner();
385        self.instruments.insert(coin, instrument.clone());
386
387        // Before connect() the handler isn't running; this send will fail and that's expected
388        // because connect() replays the instruments via InitializeInstruments
389        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
390            let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
391        }
392    }
393
394    /// Returns a shared reference to the instrument cache.
395    #[must_use]
396    pub fn instruments_cache(&self) -> Arc<AtomicMap<Ustr, InstrumentAny>> {
397        self.instruments.clone()
398    }
399
400    /// Caches spot fill coin mappings for instrument lookup.
401    ///
402    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
403    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
404    /// This mapping allows the handler to look up instruments from spot fills.
405    pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
406        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
407            let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
408        }
409    }
410
411    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
412    ///
413    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
414    /// This mapping allows WebSocket order status and fill reports to be resolved back to
415    /// the original client_order_id.
416    ///
417    /// This writes directly to a shared cache that the handler reads from, avoiding any
418    /// race conditions between caching and WebSocket message processing.
419    pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
420        log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
421        self.cloid_cache.insert(cloid, client_order_id);
422    }
423
424    /// Removes a cloid mapping from the cache.
425    ///
426    /// Should be called when an order reaches a terminal state (filled, canceled, expired)
427    /// to prevent unbounded memory growth in long-running sessions.
428    pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
429        if self.cloid_cache.remove(cloid).is_some() {
430            log::debug!("Removed cloid mapping: {cloid}");
431        }
432    }
433
434    /// Clears all cloid mappings from the cache.
435    ///
436    /// Useful for cleanup during reconnection or shutdown.
437    pub fn clear_cloid_cache(&self) {
438        let count = self.cloid_cache.len();
439        self.cloid_cache.clear();
440
441        if count > 0 {
442            log::debug!("Cleared {count} cloid mappings from cache");
443        }
444    }
445
446    /// Returns the number of cloid mappings in the cache.
447    #[must_use]
448    pub fn cloid_cache_len(&self) -> usize {
449        self.cloid_cache.len()
450    }
451
452    /// Looks up a client_order_id by its cloid hash.
453    ///
454    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
455    #[must_use]
456    pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
457        self.cloid_cache.get(cloid).map(|entry| *entry.value())
458    }
459
460    /// Gets an instrument from the cache by ID.
461    ///
462    /// Searches the cache for a matching instrument ID.
463    pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
464        self.instruments
465            .load()
466            .values()
467            .find(|inst| inst.id() == *id)
468            .cloned()
469    }
470
471    /// Gets an instrument from the cache by raw_symbol (coin).
472    pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
473        self.instruments.get_cloned(symbol)
474    }
475
476    /// Returns the count of confirmed subscriptions.
477    pub fn subscription_count(&self) -> usize {
478        self.subscriptions.len()
479    }
480
481    /// Gets a bar type from the cache by coin and interval.
482    ///
483    /// This looks up the subscription key created when subscribing to bars.
484    pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
485        // Use canonical key format matching subscribe_bars
486        let key = format!("candle:{coin}:{interval}");
487        self.bar_types.load().get(&key).copied()
488    }
489
490    /// Subscribe to L2 order book for an instrument.
491    pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
492        self.subscribe_book_with_options(instrument_id, None, None)
493            .await
494    }
495
496    /// Subscribe to L2 order book with optional `nSigFigs` / `mantissa`
497    /// precision controls passed through to the venue's `l2Book` stream.
498    pub async fn subscribe_book_with_options(
499        &self,
500        instrument_id: InstrumentId,
501        n_sig_figs: Option<u32>,
502        mantissa: Option<u32>,
503    ) -> anyhow::Result<()> {
504        let instrument = self
505            .get_instrument(&instrument_id)
506            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
507        let coin = instrument.raw_symbol().inner();
508
509        let cmd_tx = self.cmd_tx.read().await;
510
511        // Update the handler's coin→instrument mapping for this subscription
512        cmd_tx
513            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
514            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
515
516        let subscription = SubscriptionRequest::L2Book {
517            coin,
518            mantissa,
519            n_sig_figs,
520        };
521
522        cmd_tx
523            .send(HandlerCommand::Subscribe {
524                subscriptions: vec![subscription],
525            })
526            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
527        Ok(())
528    }
529
530    /// Subscribe to order book depth-10 snapshots.
531    ///
532    /// Reuses the same `l2Book` WebSocket subscription as
533    /// [`Self::subscribe_book`] and flags the handler to additionally emit
534    /// `NautilusWsMessage::Depth10` for this coin.
535    pub async fn subscribe_book_depth10(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
536        self.subscribe_book_depth10_with_options(instrument_id, None, None)
537            .await
538    }
539
540    /// Subscribe to depth-10 snapshots with optional `nSigFigs` /
541    /// `mantissa` precision controls.
542    pub async fn subscribe_book_depth10_with_options(
543        &self,
544        instrument_id: InstrumentId,
545        n_sig_figs: Option<u32>,
546        mantissa: Option<u32>,
547    ) -> anyhow::Result<()> {
548        let instrument = self
549            .get_instrument(&instrument_id)
550            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
551        let coin = instrument.raw_symbol().inner();
552
553        let cmd_tx = self.cmd_tx.read().await;
554
555        cmd_tx
556            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
557            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
558
559        cmd_tx
560            .send(HandlerCommand::SetDepth10Sub {
561                coin,
562                subscribed: true,
563            })
564            .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
565
566        let subscription = SubscriptionRequest::L2Book {
567            coin,
568            mantissa,
569            n_sig_figs,
570        };
571
572        cmd_tx
573            .send(HandlerCommand::Subscribe {
574                subscriptions: vec![subscription],
575            })
576            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
577        Ok(())
578    }
579
580    /// Unsubscribe from order book depth-10 snapshots.
581    ///
582    /// Clears the depth10 emission flag only; the underlying `l2Book`
583    /// stream stays open so active deltas subscribers keep receiving
584    /// updates. Call [`Self::unsubscribe_book`] separately to tear down
585    /// the stream entirely.
586    pub async fn unsubscribe_book_depth10(
587        &self,
588        instrument_id: InstrumentId,
589    ) -> anyhow::Result<()> {
590        let instrument = self
591            .get_instrument(&instrument_id)
592            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
593        let coin = instrument.raw_symbol().inner();
594
595        self.cmd_tx
596            .read()
597            .await
598            .send(HandlerCommand::SetDepth10Sub {
599                coin,
600                subscribed: false,
601            })
602            .map_err(|e| anyhow::anyhow!("Failed to send SetDepth10Sub command: {e}"))?;
603        Ok(())
604    }
605
606    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
607    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
608        let instrument = self
609            .get_instrument(&instrument_id)
610            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
611        let coin = instrument.raw_symbol().inner();
612
613        let cmd_tx = self.cmd_tx.read().await;
614
615        // Update the handler's coin→instrument mapping for this subscription
616        cmd_tx
617            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
618            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
619
620        let subscription = SubscriptionRequest::Bbo { coin };
621
622        cmd_tx
623            .send(HandlerCommand::Subscribe {
624                subscriptions: vec![subscription],
625            })
626            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
627        Ok(())
628    }
629
630    /// Subscribe to trades for an instrument.
631    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
632        let instrument = self
633            .get_instrument(&instrument_id)
634            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
635        let coin = instrument.raw_symbol().inner();
636
637        let cmd_tx = self.cmd_tx.read().await;
638
639        // Update the handler's coin→instrument mapping for this subscription
640        cmd_tx
641            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
642            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
643
644        let subscription = SubscriptionRequest::Trades { coin };
645
646        cmd_tx
647            .send(HandlerCommand::Subscribe {
648                subscriptions: vec![subscription],
649            })
650            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
651        Ok(())
652    }
653
654    /// Subscribe to mark price updates for an instrument.
655    pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
656        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
657            .await
658    }
659
660    /// Subscribe to index/oracle price updates for an instrument.
661    pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
662        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
663            .await
664    }
665
666    /// Subscribe to candle/bar data for a specific coin and interval.
667    pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
668        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
669        let instrument = self
670            .get_instrument(&bar_type.instrument_id())
671            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
672        let coin = instrument.raw_symbol().inner();
673        let interval = bar_type_to_interval(&bar_type)?;
674        let subscription = SubscriptionRequest::Candle { coin, interval };
675
676        // Cache the bar type for parsing using canonical key
677        let key = format!("candle:{coin}:{interval}");
678        self.bar_types.insert(key.clone(), bar_type);
679
680        let cmd_tx = self.cmd_tx.read().await;
681
682        cmd_tx
683            .send(HandlerCommand::UpdateInstrument(instrument.clone()))
684            .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
685
686        cmd_tx
687            .send(HandlerCommand::AddBarType { key, bar_type })
688            .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
689
690        cmd_tx
691            .send(HandlerCommand::Subscribe {
692                subscriptions: vec![subscription],
693            })
694            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
695        Ok(())
696    }
697
698    /// Subscribe to funding rate updates for an instrument.
699    pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
700        self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
701            .await
702    }
703
704    /// Subscribe to order updates for a specific user address.
705    pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
706        let subscription = SubscriptionRequest::OrderUpdates {
707            user: user.to_string(),
708        };
709        self.cmd_tx
710            .read()
711            .await
712            .send(HandlerCommand::Subscribe {
713                subscriptions: vec![subscription],
714            })
715            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
716        Ok(())
717    }
718
719    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
720    pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
721        let subscription = SubscriptionRequest::UserEvents {
722            user: user.to_string(),
723        };
724        self.cmd_tx
725            .read()
726            .await
727            .send(HandlerCommand::Subscribe {
728                subscriptions: vec![subscription],
729            })
730            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
731        Ok(())
732    }
733
734    /// Subscribe to user fills for a specific user address.
735    ///
736    /// Note: This channel is redundant with `userEvents` which already includes fills.
737    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
738    pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
739        let subscription = SubscriptionRequest::UserFills {
740            user: user.to_string(),
741            aggregate_by_time: None,
742        };
743        self.cmd_tx
744            .read()
745            .await
746            .send(HandlerCommand::Subscribe {
747                subscriptions: vec![subscription],
748            })
749            .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
750        Ok(())
751    }
752
753    /// Subscribe to all user channels (order updates + user events) for convenience.
754    ///
755    /// Note: `userEvents` already includes fills, so we don't subscribe to `userFills`
756    /// separately to avoid duplicate fill messages.
757    pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
758        self.subscribe_order_updates(user).await?;
759        self.subscribe_user_events(user).await?;
760        Ok(())
761    }
762
763    /// Unsubscribe from L2 order book for an instrument.
764    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
765        let instrument = self
766            .get_instrument(&instrument_id)
767            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
768        let coin = instrument.raw_symbol().inner();
769
770        let subscription = SubscriptionRequest::L2Book {
771            coin,
772            mantissa: None,
773            n_sig_figs: None,
774        };
775
776        self.cmd_tx
777            .read()
778            .await
779            .send(HandlerCommand::Unsubscribe {
780                subscriptions: vec![subscription],
781            })
782            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
783        Ok(())
784    }
785
786    /// Unsubscribe from quote ticks for an instrument.
787    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
788        let instrument = self
789            .get_instrument(&instrument_id)
790            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
791        let coin = instrument.raw_symbol().inner();
792
793        let subscription = SubscriptionRequest::Bbo { coin };
794
795        self.cmd_tx
796            .read()
797            .await
798            .send(HandlerCommand::Unsubscribe {
799                subscriptions: vec![subscription],
800            })
801            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
802        Ok(())
803    }
804
805    /// Unsubscribe from trades for an instrument.
806    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
807        let instrument = self
808            .get_instrument(&instrument_id)
809            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
810        let coin = instrument.raw_symbol().inner();
811
812        let subscription = SubscriptionRequest::Trades { coin };
813
814        self.cmd_tx
815            .read()
816            .await
817            .send(HandlerCommand::Unsubscribe {
818                subscriptions: vec![subscription],
819            })
820            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
821        Ok(())
822    }
823
824    /// Unsubscribe from mark price updates for an instrument.
825    pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
826        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
827            .await
828    }
829
830    /// Unsubscribe from index/oracle price updates for an instrument.
831    pub async fn unsubscribe_index_prices(
832        &self,
833        instrument_id: InstrumentId,
834    ) -> anyhow::Result<()> {
835        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
836            .await
837    }
838
839    /// Unsubscribe from candle/bar data.
840    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
841        // Get the instrument to extract the raw_symbol (Hyperliquid ticker)
842        let instrument = self
843            .get_instrument(&bar_type.instrument_id())
844            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
845        let coin = instrument.raw_symbol().inner();
846        let interval = bar_type_to_interval(&bar_type)?;
847        let subscription = SubscriptionRequest::Candle { coin, interval };
848
849        let key = format!("candle:{coin}:{interval}");
850        self.bar_types.remove(&key);
851
852        let cmd_tx = self.cmd_tx.read().await;
853
854        cmd_tx
855            .send(HandlerCommand::RemoveBarType { key })
856            .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
857
858        cmd_tx
859            .send(HandlerCommand::Unsubscribe {
860                subscriptions: vec![subscription],
861            })
862            .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
863        Ok(())
864    }
865
866    /// Unsubscribe from funding rate updates for an instrument.
867    pub async fn unsubscribe_funding_rates(
868        &self,
869        instrument_id: InstrumentId,
870    ) -> anyhow::Result<()> {
871        self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
872            .await
873    }
874
875    async fn subscribe_asset_context_data(
876        &self,
877        instrument_id: InstrumentId,
878        data_type: AssetContextDataType,
879    ) -> anyhow::Result<()> {
880        let instrument = self
881            .get_instrument(&instrument_id)
882            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
883        let coin = instrument.raw_symbol().inner();
884
885        let mut entry = self.asset_context_subs.entry(coin).or_default();
886        let is_first_subscription = entry.is_empty();
887        entry.insert(data_type);
888        let data_types = entry.clone();
889        drop(entry);
890
891        let cmd_tx = self.cmd_tx.read().await;
892
893        cmd_tx
894            .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
895            .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
896
897        if is_first_subscription {
898            log::debug!(
899                "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
900            );
901            let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
902
903            cmd_tx
904                .send(HandlerCommand::UpdateInstrument(instrument.clone()))
905                .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
906
907            cmd_tx
908                .send(HandlerCommand::Subscribe {
909                    subscriptions: vec![subscription],
910                })
911                .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
912        } else {
913            log::debug!(
914                "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
915            );
916        }
917
918        Ok(())
919    }
920
921    async fn unsubscribe_asset_context_data(
922        &self,
923        instrument_id: InstrumentId,
924        data_type: AssetContextDataType,
925    ) -> anyhow::Result<()> {
926        let instrument = self
927            .get_instrument(&instrument_id)
928            .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
929        let coin = instrument.raw_symbol().inner();
930
931        if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
932            entry.remove(&data_type);
933            let should_unsubscribe = entry.is_empty();
934            let data_types = entry.clone();
935            drop(entry);
936
937            let cmd_tx = self.cmd_tx.read().await;
938
939            if should_unsubscribe {
940                self.asset_context_subs.remove(&coin);
941
942                log::debug!(
943                    "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
944                );
945                let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
946
947                cmd_tx
948                    .send(HandlerCommand::UpdateAssetContextSubs {
949                        coin,
950                        data_types: AHashSet::new(),
951                    })
952                    .map_err(|e| {
953                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
954                    })?;
955
956                cmd_tx
957                    .send(HandlerCommand::Unsubscribe {
958                        subscriptions: vec![subscription],
959                    })
960                    .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
961            } else {
962                log::debug!(
963                    "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
964                );
965
966                cmd_tx
967                    .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
968                    .map_err(|e| {
969                        anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
970                    })?;
971            }
972        }
973
974        Ok(())
975    }
976
977    /// Receives the next message from the WebSocket handler.
978    ///
979    /// Returns `None` if the handler has disconnected or the receiver was already taken.
980    pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
981        if let Some(ref mut rx) = self.out_rx {
982            rx.recv().await
983        } else {
984            None
985        }
986    }
987}
988
989// Uses split_once/rsplit_once because coin names can contain colons
990// (e.g., vault tokens `vntls:vCURSOR`)
991fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
992    let (kind, rest) = topic
993        .split_once(':')
994        .map_or((topic, None), |(k, r)| (k, Some(r)));
995
996    let channel = HyperliquidWsChannel::from_wire_str(kind)
997        .ok_or_else(|| anyhow::anyhow!("Unknown subscription channel: {kind}"))?;
998
999    match channel {
1000        HyperliquidWsChannel::AllMids => Ok(SubscriptionRequest::AllMids {
1001            dex: rest.map(|s| s.to_string()),
1002        }),
1003        HyperliquidWsChannel::Notification => Ok(SubscriptionRequest::Notification {
1004            user: rest.context("Missing user")?.to_string(),
1005        }),
1006        HyperliquidWsChannel::WebData2 => Ok(SubscriptionRequest::WebData2 {
1007            user: rest.context("Missing user")?.to_string(),
1008        }),
1009        HyperliquidWsChannel::Candle => {
1010            // Format: candle:{coin}:{interval} - interval is last segment
1011            let rest = rest.context("Missing candle params")?;
1012            let (coin, interval_str) = rest.rsplit_once(':').context("Missing interval")?;
1013            let interval = HyperliquidBarInterval::from_str(interval_str)?;
1014            Ok(SubscriptionRequest::Candle {
1015                coin: Ustr::from(coin),
1016                interval,
1017            })
1018        }
1019        HyperliquidWsChannel::L2Book => Ok(SubscriptionRequest::L2Book {
1020            coin: Ustr::from(rest.context("Missing coin")?),
1021            mantissa: None,
1022            n_sig_figs: None,
1023        }),
1024        HyperliquidWsChannel::Trades => Ok(SubscriptionRequest::Trades {
1025            coin: Ustr::from(rest.context("Missing coin")?),
1026        }),
1027        HyperliquidWsChannel::OrderUpdates => Ok(SubscriptionRequest::OrderUpdates {
1028            user: rest.context("Missing user")?.to_string(),
1029        }),
1030        HyperliquidWsChannel::UserEvents => Ok(SubscriptionRequest::UserEvents {
1031            user: rest.context("Missing user")?.to_string(),
1032        }),
1033        HyperliquidWsChannel::UserFills => Ok(SubscriptionRequest::UserFills {
1034            user: rest.context("Missing user")?.to_string(),
1035            aggregate_by_time: None,
1036        }),
1037        HyperliquidWsChannel::UserFundings => Ok(SubscriptionRequest::UserFundings {
1038            user: rest.context("Missing user")?.to_string(),
1039        }),
1040        HyperliquidWsChannel::UserNonFundingLedgerUpdates => {
1041            Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
1042                user: rest.context("Missing user")?.to_string(),
1043            })
1044        }
1045        HyperliquidWsChannel::ActiveAssetCtx => Ok(SubscriptionRequest::ActiveAssetCtx {
1046            coin: Ustr::from(rest.context("Missing coin")?),
1047        }),
1048        HyperliquidWsChannel::ActiveSpotAssetCtx => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
1049            coin: Ustr::from(rest.context("Missing coin")?),
1050        }),
1051        HyperliquidWsChannel::ActiveAssetData => {
1052            // Format: activeAssetData:{user}:{coin} - user is eth addr (no colons)
1053            let rest = rest.context("Missing params")?;
1054            let (user, coin) = rest.split_once(':').context("Missing coin")?;
1055            Ok(SubscriptionRequest::ActiveAssetData {
1056                user: user.to_string(),
1057                coin: coin.to_string(),
1058            })
1059        }
1060        HyperliquidWsChannel::UserTwapSliceFills => Ok(SubscriptionRequest::UserTwapSliceFills {
1061            user: rest.context("Missing user")?.to_string(),
1062        }),
1063        HyperliquidWsChannel::UserTwapHistory => Ok(SubscriptionRequest::UserTwapHistory {
1064            user: rest.context("Missing user")?.to_string(),
1065        }),
1066        HyperliquidWsChannel::Bbo => Ok(SubscriptionRequest::Bbo {
1067            coin: Ustr::from(rest.context("Missing coin")?),
1068        }),
1069
1070        // Response-only channels are not valid subscription topics
1071        HyperliquidWsChannel::SubscriptionResponse
1072        | HyperliquidWsChannel::User
1073        | HyperliquidWsChannel::Post
1074        | HyperliquidWsChannel::Pong
1075        | HyperliquidWsChannel::Error => {
1076            anyhow::bail!("Not a subscription channel: {kind}")
1077        }
1078    }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083    use rstest::rstest;
1084
1085    use super::*;
1086    use crate::{common::enums::HyperliquidBarInterval, websocket::handler::subscription_to_key};
1087
1088    /// Generates a unique topic key for a subscription request.
1089    fn subscription_topic(sub: &SubscriptionRequest) -> String {
1090        subscription_to_key(sub)
1091    }
1092
1093    #[rstest]
1094    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
1095    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
1096    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
1097    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
1098    fn test_subscription_topic_generation(
1099        #[case] subscription: SubscriptionRequest,
1100        #[case] expected_topic: &str,
1101    ) {
1102        assert_eq!(subscription_topic(&subscription), expected_topic);
1103    }
1104
1105    #[rstest]
1106    fn test_subscription_topics_unique() {
1107        let sub1 = SubscriptionRequest::Trades { coin: "BTC".into() };
1108        let sub2 = SubscriptionRequest::Bbo { coin: "BTC".into() };
1109
1110        let topic1 = subscription_topic(&sub1);
1111        let topic2 = subscription_topic(&sub2);
1112
1113        assert_ne!(topic1, topic2);
1114    }
1115
1116    #[rstest]
1117    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
1118    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
1119    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
1120    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
1121    #[case(SubscriptionRequest::Trades { coin: "vntls:vCURSOR".into() })]
1122    #[case(SubscriptionRequest::L2Book { coin: "vntls:vCURSOR".into(), mantissa: None, n_sig_figs: None })]
1123    #[case(SubscriptionRequest::Candle { coin: "vntls:vCURSOR".into(), interval: HyperliquidBarInterval::OneHour })]
1124    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
1125        let topic = subscription_topic(&subscription);
1126        let reconstructed = subscription_from_topic(&topic).expect("Failed to reconstruct");
1127        assert_eq!(subscription_topic(&reconstructed), topic);
1128    }
1129
1130    #[rstest]
1131    fn test_subscription_topic_candle() {
1132        let sub = SubscriptionRequest::Candle {
1133            coin: "BTC".into(),
1134            interval: HyperliquidBarInterval::OneHour,
1135        };
1136
1137        let topic = subscription_topic(&sub);
1138        assert_eq!(topic, "candle:BTC:1h");
1139    }
1140}