Skip to main content

nautilus_hyperliquid/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Hyperliquid.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28    data::BarType,
29    identifiers::{AccountId, ClientOrderId},
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33    RECONNECTED,
34    retry::{RetryManager, create_websocket_retry_manager},
35    websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41    client::AssetContextDataType,
42    enums::HyperliquidWsChannel,
43    error::HyperliquidWsError,
44    messages::{
45        CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46        SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47    },
48    parse::{
49        parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50        parse_ws_order_book_depth10, parse_ws_order_status_report, parse_ws_quote_tick,
51        parse_ws_trade_tick,
52    },
53};
54
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received on the handler's command channel and applied one at a
/// time inside `FeedHandler::next`.
#[derive(Debug)]
#[expect(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
// NOTE(review): presumably required because some payload types (e.g. `SubscriptionRequest`,
// `AssetContextDataType`) are less visible than this `pub` enum - confirm
#[allow(private_interfaces)]
pub enum HandlerCommand {
    /// Set the WebSocketClient for the handler to use.
    ///
    /// Until this command is processed, outbound sends fail with
    /// "No WebSocket client available".
    SetClient(WebSocketClient),
    /// Disconnect the WebSocket connection.
    ///
    /// The handler disconnects the client (if one is set), raises the stop
    /// signal and ends its message loop.
    Disconnect,
    /// Subscribe to the given subscriptions.
    ///
    /// Each subscription is marked in the subscription state before its request
    /// is sent, and marked as failed if serialization or sending fails.
    Subscribe {
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Unsubscribe from the given subscriptions.
    ///
    /// Serialization and send failures are logged only.
    Unsubscribe {
        subscriptions: Vec<SubscriptionRequest>,
    },
    /// Initialize the instruments cache with the given instruments.
    ///
    /// Instruments are keyed by their raw symbol; existing entries with the same
    /// key are overwritten.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Update a single instrument in the cache.
    UpdateInstrument(InstrumentAny),
    /// Add a bar type mapping for candle parsing.
    ///
    /// Candles are looked up with a key of the form `candle:{symbol}:{interval}`,
    /// so `key` must use the same format to be found.
    AddBarType { key: String, bar_type: BarType },
    /// Remove a bar type mapping.
    ///
    /// Also drops the candle cached under the same key.
    RemoveBarType { key: String },
    /// Update asset context subscriptions for a coin.
    ///
    /// An empty `data_types` set removes the coin's entry; otherwise the set
    /// replaces any previous entry.
    UpdateAssetContextSubs {
        coin: Ustr,
        data_types: AHashSet<AssetContextDataType>,
    },
    /// Cache spot fill coin mappings for instrument lookup.
    ///
    /// Currently accepted but ignored by the handler.
    CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
    /// Flag whether the `l2Book` stream for `coin` should also be emitted
    /// as [`NautilusWsMessage::Depth10`] snapshots.
    SetDepth10Sub { coin: Ustr, subscribed: bool },
}
94
/// Inner message handler that applies [`HandlerCommand`]s and turns raw
/// WebSocket messages into [`NautilusWsMessage`]s.
pub(super) struct FeedHandler {
    /// Clock used to stamp `ts_init` on parsed messages.
    clock: &'static AtomicTime,
    /// Stop flag: read by `is_stopped` and set when a disconnect command is handled.
    signal: Arc<AtomicBool>,
    /// Client used for outbound sends; `None` until a `SetClient` command arrives.
    client: Option<WebSocketClient>,
    /// Incoming commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket messages.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Output channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Account attached to execution reports; order updates and fills are not
    /// processed while this is `None`.
    account_id: Option<AccountId>,
    /// Subscription bookkeeping, updated as subscribe/unsubscribe commands are handled.
    subscriptions: SubscriptionState,
    /// Retry manager applied to outbound sends.
    retry_manager: RetryManager<HyperliquidWsError>,
    /// Messages parsed from a frame that have not yet been returned by `next`.
    message_buffer: Vec<NautilusWsMessage>,
    /// Instruments keyed by raw symbol (matched against the venue `coin` field).
    instruments: AHashMap<Ustr, InstrumentAny>,
    /// Shared map from venue `cloid` to the real client order ID, used to
    /// rewrite `client_order_id` on order and fill reports.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Bar types keyed by candle key (`candle:{symbol}:{interval}`).
    bar_types_cache: AHashMap<String, BarType>,
    /// Most recent candle received per candle key, used to detect a closed bar period.
    bar_cache: AHashMap<String, CandleData>,
    /// Asset context data types subscribed per coin.
    asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
    /// Coins whose `l2Book` updates are also emitted as depth-10 snapshots.
    depth10_subs: AHashSet<Ustr>,
    /// Trade IDs of fills already reported, used to skip duplicates.
    processed_trade_ids: FifoCache<u64, 10_000>,
    /// Cached mark price per coin, compared against incoming values to detect changes.
    mark_price_cache: AHashMap<Ustr, String>,
    /// Cached oracle (index) price per coin, compared against incoming values to detect changes.
    index_price_cache: AHashMap<Ustr, String>,
    /// Cached funding rate per coin, compared against incoming values to detect changes.
    funding_rate_cache: AHashMap<Ustr, String>,
}
117
118impl FeedHandler {
119    /// Creates a new [`FeedHandler`] instance.
120    pub(super) fn new(
121        signal: Arc<AtomicBool>,
122        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
123        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
124        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
125        account_id: Option<AccountId>,
126        subscriptions: SubscriptionState,
127        cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
128    ) -> Self {
129        Self {
130            clock: get_atomic_clock_realtime(),
131            signal,
132            client: None,
133            cmd_rx,
134            raw_rx,
135            out_tx,
136            account_id,
137            subscriptions,
138            retry_manager: create_websocket_retry_manager(),
139            message_buffer: Vec::new(),
140            instruments: AHashMap::new(),
141            cloid_cache,
142            bar_types_cache: AHashMap::new(),
143            bar_cache: AHashMap::new(),
144            asset_context_subs: AHashMap::new(),
145            depth10_subs: AHashSet::new(),
146            processed_trade_ids: FifoCache::new(),
147            mark_price_cache: AHashMap::new(),
148            index_price_cache: AHashMap::new(),
149            funding_rate_cache: AHashMap::new(),
150        }
151    }
152
153    /// Send a message to the output channel.
154    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
155        self.out_tx
156            .send(msg)
157            .map_err(|e| format!("Failed to send message: {e}"))
158    }
159
160    /// Check if the handler has received a stop signal.
161    pub(super) fn is_stopped(&self) -> bool {
162        self.signal.load(Ordering::Relaxed)
163    }
164
165    async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
166        if let Some(client) = &self.client {
167            self.retry_manager
168                .execute_with_retry(
169                    "websocket_send",
170                    || {
171                        let payload = payload.clone();
172                        async move {
173                            client.send_text(payload, None).await.map_err(|e| {
174                                HyperliquidWsError::ClientError(format!("Send failed: {e}"))
175                            })
176                        }
177                    },
178                    should_retry_hyperliquid_error,
179                    create_hyperliquid_timeout_error,
180                )
181                .await
182                .map_err(|e| anyhow::anyhow!("{e}"))
183        } else {
184            Err(anyhow::anyhow!("No WebSocket client available"))
185        }
186    }
187
188    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
189        if !self.message_buffer.is_empty() {
190            return Some(self.message_buffer.remove(0));
191        }
192
193        loop {
194            tokio::select! {
195                Some(cmd) = self.cmd_rx.recv() => {
196                    match cmd {
197                        HandlerCommand::SetClient(client) => {
198                            log::debug!("Setting WebSocket client in handler");
199                            self.client = Some(client);
200                        }
201                        HandlerCommand::Disconnect => {
202                            log::debug!("Handler received disconnect command");
203
204                            if let Some(ref client) = self.client {
205                                client.disconnect().await;
206                            }
207                            self.signal.store(true, Ordering::SeqCst);
208                            return None;
209                        }
210                        HandlerCommand::Subscribe { subscriptions } => {
211                            for subscription in subscriptions {
212                                let key = subscription_to_key(&subscription);
213                                self.subscriptions.mark_subscribe(&key);
214
215                                let request = HyperliquidWsRequest::Subscribe { subscription };
216                                match serde_json::to_string(&request) {
217                                    Ok(payload) => {
218                                        log::debug!("Sending subscribe payload: {payload}");
219                                        if let Err(e) = self.send_with_retry(payload).await {
220                                            log::error!("Error subscribing to {key}: {e}");
221                                            self.subscriptions.mark_failure(&key);
222                                        }
223                                    }
224                                    Err(e) => {
225                                        log::error!("Error serializing subscription for {key}: {e}");
226                                        self.subscriptions.mark_failure(&key);
227                                    }
228                                }
229                            }
230                        }
231                        HandlerCommand::Unsubscribe { subscriptions } => {
232                            for subscription in subscriptions {
233                                let key = subscription_to_key(&subscription);
234                                self.subscriptions.mark_unsubscribe(&key);
235
236                                let request = HyperliquidWsRequest::Unsubscribe { subscription };
237                                match serde_json::to_string(&request) {
238                                    Ok(payload) => {
239                                        log::debug!("Sending unsubscribe payload: {payload}");
240                                        if let Err(e) = self.send_with_retry(payload).await {
241                                            log::error!("Error unsubscribing from {key}: {e}");
242                                        }
243                                    }
244                                    Err(e) => {
245                                        log::error!("Error serializing unsubscription for {key}: {e}");
246                                    }
247                                }
248                            }
249                        }
250                        HandlerCommand::InitializeInstruments(instruments) => {
251                            for inst in instruments {
252                                let coin = inst.raw_symbol().inner();
253                                self.instruments.insert(coin, inst);
254                            }
255                        }
256                        HandlerCommand::UpdateInstrument(inst) => {
257                            let coin = inst.raw_symbol().inner();
258                            self.instruments.insert(coin, inst);
259                        }
260                        HandlerCommand::AddBarType { key, bar_type } => {
261                            self.bar_types_cache.insert(key, bar_type);
262                        }
263                        HandlerCommand::RemoveBarType { key } => {
264                            self.bar_types_cache.remove(&key);
265                            self.bar_cache.remove(&key);
266                        }
267                        HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
268                            if data_types.is_empty() {
269                                self.asset_context_subs.remove(&coin);
270                            } else {
271                                self.asset_context_subs.insert(coin, data_types);
272                            }
273                        }
274                        HandlerCommand::CacheSpotFillCoins(_) => {
275                            // No longer needed - raw_symbol now contains the proper format
276                        }
277                        HandlerCommand::SetDepth10Sub { coin, subscribed } => {
278                            if subscribed {
279                                self.depth10_subs.insert(coin);
280                            } else {
281                                self.depth10_subs.remove(&coin);
282                            }
283                        }
284                    }
285                }
286
287                Some(raw_msg) = self.raw_rx.recv() => {
288                    match raw_msg {
289                        Message::Text(text) => {
290                            if text == RECONNECTED {
291                                log::info!("Received RECONNECTED sentinel");
292                                return Some(NautilusWsMessage::Reconnected);
293                            }
294
295                            match serde_json::from_str::<HyperliquidWsMessage>(&text) {
296                                Ok(msg) => {
297                                    let ts_init = self.clock.get_time_ns();
298
299                                    let nautilus_msgs = Self::parse_to_nautilus_messages(
300                                        msg,
301                                        &self.instruments,
302                                        &self.cloid_cache,
303                                        &self.bar_types_cache,
304                                        self.account_id,
305                                        ts_init,
306                                        &self.asset_context_subs,
307                                        &self.depth10_subs,
308                                        &mut self.processed_trade_ids,
309                                        &mut self.mark_price_cache,
310                                        &mut self.index_price_cache,
311                                        &mut self.funding_rate_cache,
312                                        &mut self.bar_cache,
313                                    );
314
315                                    if !nautilus_msgs.is_empty() {
316                                        let mut iter = nautilus_msgs.into_iter();
317                                        let first = iter.next().unwrap();
318                                        self.message_buffer.extend(iter);
319                                        return Some(first);
320                                    }
321                                }
322                                Err(e) => {
323                                    log::error!("Error parsing WebSocket message: {e}, text: {text}");
324                                }
325                            }
326                        }
327                        Message::Ping(data) => {
328                            if let Some(ref client) = self.client
329                                && let Err(e) = client.send_pong(data.to_vec()).await {
330                                log::error!("Error sending pong: {e}");
331                            }
332                        }
333                        Message::Close(_) => {
334                            log::info!("Received WebSocket close frame");
335                            return None;
336                        }
337                        _ => {}
338                    }
339                }
340
341                else => {
342                    log::debug!("Handler shutting down: stream ended or command channel closed");
343                    return None;
344                }
345            }
346        }
347    }
348
349    #[expect(clippy::too_many_arguments)]
350    fn parse_to_nautilus_messages(
351        msg: HyperliquidWsMessage,
352        instruments: &AHashMap<Ustr, InstrumentAny>,
353        cloid_cache: &DashMap<Ustr, ClientOrderId>,
354        bar_types: &AHashMap<String, BarType>,
355        account_id: Option<AccountId>,
356        ts_init: UnixNanos,
357        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
358        depth10_subs: &AHashSet<Ustr>,
359        processed_trade_ids: &mut FifoCache<u64, 10_000>,
360        mark_price_cache: &mut AHashMap<Ustr, String>,
361        index_price_cache: &mut AHashMap<Ustr, String>,
362        funding_rate_cache: &mut AHashMap<Ustr, String>,
363        bar_cache: &mut AHashMap<String, CandleData>,
364    ) -> Vec<NautilusWsMessage> {
365        let mut result = Vec::new();
366
367        match msg {
368            HyperliquidWsMessage::OrderUpdates { data } => {
369                if let Some(account_id) = account_id
370                    && let Some(msg) = Self::handle_order_updates(
371                        &data,
372                        instruments,
373                        cloid_cache,
374                        account_id,
375                        ts_init,
376                    )
377                {
378                    result.push(msg);
379                }
380            }
381            HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
382                // Process fills from userEvents channel (userFills channel is redundant)
383                match data {
384                    WsUserEventData::Fills { fills } => {
385                        log::debug!("Received {} fill(s) from userEvents channel", fills.len());
386                        for fill in &fills {
387                            log::debug!(
388                                "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
389                                fill.oid,
390                                fill.coin,
391                                fill.side,
392                                fill.sz,
393                                fill.px
394                            );
395                        }
396
397                        if let Some(account_id) = account_id {
398                            log::debug!("Processing fills with account_id={account_id}");
399
400                            if let Some(msg) = Self::handle_user_fills(
401                                &fills,
402                                instruments,
403                                cloid_cache,
404                                account_id,
405                                ts_init,
406                                processed_trade_ids,
407                            ) {
408                                log::debug!("Successfully created fill message");
409                                result.push(msg);
410                            } else {
411                                log::debug!("handle_user_fills returned None (no new fills)");
412                            }
413                        } else {
414                            log::warn!("Cannot process fills: account_id is None");
415                        }
416                    }
417                    WsUserEventData::Liquidation { liquidation } => {
418                        log::warn!(
419                            "Liquidation event: lid={}, liquidator={}, liquidated_user={}, ntl_pos={}, account_value={}",
420                            liquidation.lid,
421                            liquidation.liquidator,
422                            liquidation.liquidated_user,
423                            liquidation.liquidated_ntl_pos,
424                            liquidation.liquidated_account_value,
425                        );
426                    }
427                    _ => {
428                        log::debug!("Received non-fill user event: {data:?}");
429                    }
430                }
431            }
432            HyperliquidWsMessage::UserFills { data } => {
433                // UserFills channel is redundant with userEvents, but handle it for
434                // backwards compatibility if explicitly subscribed
435                if let Some(account_id) = account_id
436                    && let Some(msg) = Self::handle_user_fills(
437                        &data.fills,
438                        instruments,
439                        cloid_cache,
440                        account_id,
441                        ts_init,
442                        processed_trade_ids,
443                    )
444                {
445                    result.push(msg);
446                }
447            }
448            HyperliquidWsMessage::Trades { data } => {
449                if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
450                    result.push(msg);
451                }
452            }
453            HyperliquidWsMessage::Bbo { data } => {
454                if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
455                    result.push(msg);
456                }
457            }
458            HyperliquidWsMessage::L2Book { data } => {
459                result.extend(Self::handle_l2_book(
460                    &data,
461                    instruments,
462                    depth10_subs,
463                    ts_init,
464                ));
465            }
466            HyperliquidWsMessage::Candle { data } => {
467                if let Some(msg) =
468                    Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
469                {
470                    result.push(msg);
471                }
472            }
473            HyperliquidWsMessage::ActiveAssetCtx { data }
474            | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
475                result.extend(Self::handle_asset_context(
476                    &data,
477                    instruments,
478                    asset_context_subs,
479                    mark_price_cache,
480                    index_price_cache,
481                    funding_rate_cache,
482                    ts_init,
483                ));
484            }
485            HyperliquidWsMessage::Error { data } => {
486                log::warn!("Received error from Hyperliquid WebSocket: {data}");
487            }
488            // Ignore other message types (subscription confirmations, etc)
489            _ => {}
490        }
491
492        result
493    }
494
495    fn handle_order_updates(
496        data: &[super::messages::WsOrderData],
497        instruments: &AHashMap<Ustr, InstrumentAny>,
498        cloid_cache: &DashMap<Ustr, ClientOrderId>,
499        account_id: AccountId,
500        ts_init: UnixNanos,
501    ) -> Option<NautilusWsMessage> {
502        let mut exec_reports = Vec::new();
503
504        for order_update in data {
505            let instrument = instruments.get(&order_update.order.coin);
506
507            if let Some(instrument) = instrument {
508                match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
509                    Ok(mut report) => {
510                        // Resolve cloid to real client_order_id if cached
511                        if let Some(cloid) = &order_update.order.cloid {
512                            let cloid_ustr = Ustr::from(cloid.as_str());
513                            if let Some(entry) = cloid_cache.get(&cloid_ustr) {
514                                let real_client_order_id = *entry.value();
515                                log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
516                                report.client_order_id = Some(real_client_order_id);
517                            }
518                        }
519                        exec_reports.push(ExecutionReport::Order(report));
520                    }
521                    Err(e) => {
522                        log::error!("Error parsing order update: {e}");
523                    }
524                }
525            } else {
526                log::debug!("No instrument found for coin: {}", order_update.order.coin);
527            }
528        }
529
530        if exec_reports.is_empty() {
531            None
532        } else {
533            Some(NautilusWsMessage::ExecutionReports(exec_reports))
534        }
535    }
536
537    fn handle_user_fills(
538        fills: &[super::messages::WsFillData],
539        instruments: &AHashMap<Ustr, InstrumentAny>,
540        cloid_cache: &DashMap<Ustr, ClientOrderId>,
541        account_id: AccountId,
542        ts_init: UnixNanos,
543        processed_trade_ids: &mut FifoCache<u64, 10_000>,
544    ) -> Option<NautilusWsMessage> {
545        let mut exec_reports = Vec::new();
546
547        for fill in fills {
548            if processed_trade_ids.contains(&fill.tid) {
549                log::debug!("Skipping duplicate fill: tid={}", fill.tid);
550                continue;
551            }
552
553            let instrument = instruments.get(&fill.coin);
554
555            if let Some(instrument) = instrument {
556                log::debug!("Found instrument for fill coin={}", fill.coin);
557                match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
558                    Ok(mut report) => {
559                        // Mark processed only after successful parse
560                        processed_trade_ids.add(fill.tid);
561
562                        if let Some(cloid) = &fill.cloid {
563                            let cloid_ustr = Ustr::from(cloid.as_str());
564                            if let Some(entry) = cloid_cache.get(&cloid_ustr) {
565                                let real_client_order_id = *entry.value();
566                                log::debug!(
567                                    "Resolved fill cloid {cloid} -> {real_client_order_id}"
568                                );
569                                report.client_order_id = Some(real_client_order_id);
570                            }
571                        }
572                        log::debug!(
573                            "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
574                            report.venue_order_id,
575                            report.trade_id
576                        );
577                        exec_reports.push(ExecutionReport::Fill(report));
578                    }
579                    Err(e) => {
580                        log::error!("Error parsing fill: {e}");
581                    }
582                }
583            } else {
584                // Not marked as processed so fill is retried if instrument loads later
585                log::warn!(
586                    "No instrument found for fill coin={}. Keys: {:?}",
587                    fill.coin,
588                    instruments.keys().collect::<Vec<_>>()
589                );
590            }
591        }
592
593        if exec_reports.is_empty() {
594            None
595        } else {
596            Some(NautilusWsMessage::ExecutionReports(exec_reports))
597        }
598    }
599
600    fn handle_trades(
601        data: &[super::messages::WsTradeData],
602        instruments: &AHashMap<Ustr, InstrumentAny>,
603        ts_init: UnixNanos,
604    ) -> Option<NautilusWsMessage> {
605        let mut trade_ticks = Vec::new();
606
607        for trade in data {
608            if let Some(instrument) = instruments.get(&trade.coin) {
609                match parse_ws_trade_tick(trade, instrument, ts_init) {
610                    Ok(tick) => trade_ticks.push(tick),
611                    Err(e) => {
612                        log::error!("Error parsing trade tick: {e}");
613                    }
614                }
615            } else {
616                log::debug!("No instrument found for coin: {}", trade.coin);
617            }
618        }
619
620        if trade_ticks.is_empty() {
621            None
622        } else {
623            Some(NautilusWsMessage::Trades(trade_ticks))
624        }
625    }
626
627    fn handle_bbo(
628        data: &super::messages::WsBboData,
629        instruments: &AHashMap<Ustr, InstrumentAny>,
630        ts_init: UnixNanos,
631    ) -> Option<NautilusWsMessage> {
632        if let Some(instrument) = instruments.get(&data.coin) {
633            match parse_ws_quote_tick(data, instrument, ts_init) {
634                Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
635                Err(e) => {
636                    log::error!("Error parsing quote tick: {e}");
637                    None
638                }
639            }
640        } else {
641            log::debug!("No instrument found for coin: {}", data.coin);
642            None
643        }
644    }
645
646    fn handle_l2_book(
647        data: &super::messages::WsBookData,
648        instruments: &AHashMap<Ustr, InstrumentAny>,
649        depth10_subs: &AHashSet<Ustr>,
650        ts_init: UnixNanos,
651    ) -> Vec<NautilusWsMessage> {
652        let mut out = Vec::new();
653
654        let Some(instrument) = instruments.get(&data.coin) else {
655            log::debug!("No instrument found for coin: {}", data.coin);
656            return out;
657        };
658
659        match parse_ws_order_book_deltas(data, instrument, ts_init) {
660            Ok(deltas) => out.push(NautilusWsMessage::Deltas(deltas)),
661            Err(e) => log::error!("Error parsing order book deltas: {e}"),
662        }
663
664        if depth10_subs.contains(&data.coin) {
665            match parse_ws_order_book_depth10(data, instrument, ts_init) {
666                Ok(depth) => out.push(NautilusWsMessage::Depth10(Box::new(depth))),
667                Err(e) => log::error!("Error parsing order book depth10: {e}"),
668            }
669        }
670
671        out
672    }
673
674    fn handle_candle(
675        data: &CandleData,
676        instruments: &AHashMap<Ustr, InstrumentAny>,
677        bar_types: &AHashMap<String, BarType>,
678        bar_cache: &mut AHashMap<String, CandleData>,
679        ts_init: UnixNanos,
680    ) -> Option<NautilusWsMessage> {
681        let key = format!("candle:{}:{}", data.s, data.i);
682
683        let mut closed_bar = None;
684
685        if let Some(cached) = bar_cache.get(&key) {
686            // Emit cached bar when close_time changes, indicating the previous period closed
687            if cached.close_time != data.close_time {
688                log::debug!(
689                    "Bar period changed for {}: prev_close_time={}, new_close_time={}",
690                    data.s,
691                    cached.close_time,
692                    data.close_time
693                );
694                closed_bar = Some(cached.clone());
695            }
696        }
697
698        bar_cache.insert(key.clone(), data.clone());
699
700        if let Some(closed_data) = closed_bar {
701            if let Some(bar_type) = bar_types.get(&key) {
702                if let Some(instrument) = instruments.get(&data.s) {
703                    match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
704                        Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
705                        Err(e) => {
706                            log::error!("Error parsing closed candle: {e}");
707                        }
708                    }
709                } else {
710                    log::debug!("No instrument found for coin: {}", data.s);
711                }
712            } else {
713                log::debug!("No bar type found for key: {key}");
714            }
715        }
716
717        None
718    }
719
720    fn handle_asset_context(
721        data: &WsActiveAssetCtxData,
722        instruments: &AHashMap<Ustr, InstrumentAny>,
723        asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
724        mark_price_cache: &mut AHashMap<Ustr, String>,
725        index_price_cache: &mut AHashMap<Ustr, String>,
726        funding_rate_cache: &mut AHashMap<Ustr, String>,
727        ts_init: UnixNanos,
728    ) -> Vec<NautilusWsMessage> {
729        let mut result = Vec::new();
730
731        let coin = match data {
732            WsActiveAssetCtxData::Perp { coin, .. } => coin,
733            WsActiveAssetCtxData::Spot { coin, .. } => coin,
734        };
735
736        if let Some(instrument) = instruments.get(coin) {
737            let (mark_px, oracle_px, funding) = match data {
738                WsActiveAssetCtxData::Perp { ctx, .. } => (
739                    &ctx.shared.mark_px,
740                    Some(&ctx.oracle_px),
741                    Some(&ctx.funding),
742                ),
743                WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
744            };
745
746            let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
747            let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
748            let funding_changed =
749                funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
750
751            let subscribed_types = asset_context_subs.get(coin);
752
753            if mark_changed || index_changed || funding_changed {
754                match parse_ws_asset_context(data, instrument, ts_init) {
755                    Ok((mark_price, index_price, funding_rate)) => {
756                        if mark_changed
757                            && subscribed_types
758                                .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
759                        {
760                            mark_price_cache.insert(*coin, mark_px.clone());
761                            result.push(NautilusWsMessage::MarkPrice(mark_price));
762                        }
763
764                        if index_changed
765                            && subscribed_types
766                                .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
767                        {
768                            if let Some(px) = oracle_px {
769                                index_price_cache.insert(*coin, px.clone());
770                            }
771
772                            if let Some(index) = index_price {
773                                result.push(NautilusWsMessage::IndexPrice(index));
774                            }
775                        }
776
777                        if funding_changed
778                            && subscribed_types
779                                .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
780                        {
781                            if let Some(rate) = funding {
782                                funding_rate_cache.insert(*coin, rate.clone());
783                            }
784
785                            if let Some(funding) = funding_rate {
786                                result.push(NautilusWsMessage::FundingRate(funding));
787                            }
788                        }
789                    }
790                    Err(e) => {
791                        log::error!("Error parsing asset context: {e}");
792                    }
793                }
794            }
795        } else {
796            log::debug!("No instrument found for coin: {coin}");
797        }
798
799        result
800    }
801}
802
803pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
804    match sub {
805        SubscriptionRequest::AllMids { dex } => {
806            if let Some(dex_name) = dex {
807                format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
808            } else {
809                HyperliquidWsChannel::AllMids.as_str().to_string()
810            }
811        }
812        SubscriptionRequest::Notification { user } => {
813            format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
814        }
815        SubscriptionRequest::WebData2 { user } => {
816            format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
817        }
818        SubscriptionRequest::Candle { coin, interval } => {
819            format!(
820                "{}:{coin}:{}",
821                HyperliquidWsChannel::Candle.as_str(),
822                interval.as_str()
823            )
824        }
825        SubscriptionRequest::L2Book { coin, .. } => {
826            format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
827        }
828        SubscriptionRequest::Trades { coin } => {
829            format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
830        }
831        SubscriptionRequest::OrderUpdates { user } => {
832            format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
833        }
834        SubscriptionRequest::UserEvents { user } => {
835            format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
836        }
837        SubscriptionRequest::UserFills { user, .. } => {
838            format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
839        }
840        SubscriptionRequest::UserFundings { user } => {
841            format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
842        }
843        SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
844            format!(
845                "{}:{user}",
846                HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
847            )
848        }
849        SubscriptionRequest::ActiveAssetCtx { coin } => {
850            format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
851        }
852        SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
853            format!(
854                "{}:{coin}",
855                HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
856            )
857        }
858        SubscriptionRequest::ActiveAssetData { user, coin } => {
859            format!(
860                "{}:{user}:{coin}",
861                HyperliquidWsChannel::ActiveAssetData.as_str()
862            )
863        }
864        SubscriptionRequest::UserTwapSliceFills { user } => {
865            format!(
866                "{}:{user}",
867                HyperliquidWsChannel::UserTwapSliceFills.as_str()
868            )
869        }
870        SubscriptionRequest::UserTwapHistory { user } => {
871            format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
872        }
873        SubscriptionRequest::Bbo { coin } => {
874            format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
875        }
876    }
877}
878
879/// Determines whether a Hyperliquid WebSocket error should trigger a retry.
880pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
881    match error {
882        HyperliquidWsError::TungsteniteError(_) => true,
883        HyperliquidWsError::ClientError(msg) => {
884            let msg_lower = msg.to_lowercase();
885            msg_lower.contains("timeout")
886                || msg_lower.contains("timed out")
887                || msg_lower.contains("connection")
888                || msg_lower.contains("network")
889        }
890        _ => false,
891    }
892}
893
894/// Creates a timeout error for Hyperliquid retry logic.
895pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
896    HyperliquidWsError::ClientError(msg)
897}
898
#[cfg(test)]
mod tests {
    use ahash::{AHashMap, AHashSet};
    use nautilus_core::nanos::UnixNanos;
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::{CryptoPerpetual, InstrumentAny},
        types::{Currency, Price, Quantity},
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::{
        super::messages::{NautilusWsMessage, WsBookData, WsLevelData},
        FeedHandler,
    };

    /// Builds the BTC perpetual instrument shared by the tests in this module.
    fn btc_perp() -> InstrumentAny {
        let perp = CryptoPerpetual::new(
            InstrumentId::new(Symbol::new("BTC-PERP"), Venue::new("HYPERLIQUID")),
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false,
            2,
            3,
            Price::from("0.01"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perp)
    }

    /// Builds a book level of size `1.0` from a single order at the given price.
    fn unit_level(px: &str) -> WsLevelData {
        WsLevelData {
            px: px.to_string(),
            sz: "1.0".to_string(),
            n: 1,
        }
    }

    /// Builds a BTC book with one level in each of its two level arrays.
    fn one_level_book() -> WsBookData {
        WsBookData {
            coin: Ustr::from("BTC"),
            levels: [vec![unit_level("100.00")], vec![unit_level("100.01")]],
            time: 1_700_000_000_000,
        }
    }

    /// Builds an instrument map that knows only the `BTC` coin.
    fn btc_instruments() -> AHashMap<Ustr, InstrumentAny> {
        let mut instruments = AHashMap::new();
        instruments.insert(Ustr::from("BTC"), btc_perp());
        instruments
    }

    #[rstest]
    fn handle_l2_book_emits_deltas_only_when_not_in_depth10_subs() {
        let no_depth10_subs = AHashSet::<Ustr>::new();

        let msgs = FeedHandler::handle_l2_book(
            &one_level_book(),
            &btc_instruments(),
            &no_depth10_subs,
            UnixNanos::default(),
        );

        assert!(
            matches!(&msgs[..], [NautilusWsMessage::Deltas(_)]),
            "expected exactly one Deltas message, got {} message(s)",
            msgs.len()
        );
    }

    #[rstest]
    fn handle_l2_book_emits_deltas_and_depth10_when_coin_in_subs() {
        let mut depth10_subs = AHashSet::<Ustr>::new();
        depth10_subs.insert(Ustr::from("BTC"));

        let msgs = FeedHandler::handle_l2_book(
            &one_level_book(),
            &btc_instruments(),
            &depth10_subs,
            UnixNanos::default(),
        );

        assert!(
            matches!(
                &msgs[..],
                [NautilusWsMessage::Deltas(_), NautilusWsMessage::Depth10(_)]
            ),
            "expected Deltas followed by Depth10, got {} message(s)",
            msgs.len()
        );
    }

    #[rstest]
    fn handle_l2_book_returns_empty_when_instrument_unknown() {
        let no_instruments = AHashMap::<Ustr, InstrumentAny>::new();
        let no_depth10_subs = AHashSet::<Ustr>::new();

        let msgs = FeedHandler::handle_l2_book(
            &one_level_book(),
            &no_instruments,
            &no_depth10_subs,
            UnixNanos::default(),
        );

        assert!(msgs.is_empty());
    }
}