1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::BarType,
29 identifiers::{AccountId, ClientOrderId},
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33 RECONNECTED,
34 retry::{RetryManager, create_websocket_retry_manager},
35 websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41 client::AssetContextDataType,
42 enums::HyperliquidWsChannel,
43 error::HyperliquidWsError,
44 messages::{
45 CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
46 SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
47 },
48 parse::{
49 parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
50 parse_ws_order_book_depth10, parse_ws_order_status_report, parse_ws_quote_tick,
51 parse_ws_trade_tick,
52 },
53};
54
55#[derive(Debug)]
57#[expect(
58 clippy::large_enum_variant,
59 reason = "Commands are ephemeral and immediately consumed"
60)]
61#[allow(private_interfaces)]
62pub enum HandlerCommand {
63 SetClient(WebSocketClient),
65 Disconnect,
67 Subscribe {
69 subscriptions: Vec<SubscriptionRequest>,
70 },
71 Unsubscribe {
73 subscriptions: Vec<SubscriptionRequest>,
74 },
75 InitializeInstruments(Vec<InstrumentAny>),
77 UpdateInstrument(InstrumentAny),
79 AddBarType { key: String, bar_type: BarType },
81 RemoveBarType { key: String },
83 UpdateAssetContextSubs {
85 coin: Ustr,
86 data_types: AHashSet<AssetContextDataType>,
87 },
88 CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
90 SetDepth10Sub { coin: Ustr, subscribed: bool },
93}
94
95pub(super) struct FeedHandler {
96 clock: &'static AtomicTime,
97 signal: Arc<AtomicBool>,
98 client: Option<WebSocketClient>,
99 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
100 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
101 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
102 account_id: Option<AccountId>,
103 subscriptions: SubscriptionState,
104 retry_manager: RetryManager<HyperliquidWsError>,
105 message_buffer: Vec<NautilusWsMessage>,
106 instruments: AHashMap<Ustr, InstrumentAny>,
107 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
108 bar_types_cache: AHashMap<String, BarType>,
109 bar_cache: AHashMap<String, CandleData>,
110 asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
111 depth10_subs: AHashSet<Ustr>,
112 processed_trade_ids: FifoCache<u64, 10_000>,
113 mark_price_cache: AHashMap<Ustr, String>,
114 index_price_cache: AHashMap<Ustr, String>,
115 funding_rate_cache: AHashMap<Ustr, String>,
116}
117
118impl FeedHandler {
119 pub(super) fn new(
121 signal: Arc<AtomicBool>,
122 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
123 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
124 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
125 account_id: Option<AccountId>,
126 subscriptions: SubscriptionState,
127 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
128 ) -> Self {
129 Self {
130 clock: get_atomic_clock_realtime(),
131 signal,
132 client: None,
133 cmd_rx,
134 raw_rx,
135 out_tx,
136 account_id,
137 subscriptions,
138 retry_manager: create_websocket_retry_manager(),
139 message_buffer: Vec::new(),
140 instruments: AHashMap::new(),
141 cloid_cache,
142 bar_types_cache: AHashMap::new(),
143 bar_cache: AHashMap::new(),
144 asset_context_subs: AHashMap::new(),
145 depth10_subs: AHashSet::new(),
146 processed_trade_ids: FifoCache::new(),
147 mark_price_cache: AHashMap::new(),
148 index_price_cache: AHashMap::new(),
149 funding_rate_cache: AHashMap::new(),
150 }
151 }
152
153 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
155 self.out_tx
156 .send(msg)
157 .map_err(|e| format!("Failed to send message: {e}"))
158 }
159
160 pub(super) fn is_stopped(&self) -> bool {
162 self.signal.load(Ordering::Relaxed)
163 }
164
165 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
166 if let Some(client) = &self.client {
167 self.retry_manager
168 .execute_with_retry(
169 "websocket_send",
170 || {
171 let payload = payload.clone();
172 async move {
173 client.send_text(payload, None).await.map_err(|e| {
174 HyperliquidWsError::ClientError(format!("Send failed: {e}"))
175 })
176 }
177 },
178 should_retry_hyperliquid_error,
179 create_hyperliquid_timeout_error,
180 )
181 .await
182 .map_err(|e| anyhow::anyhow!("{e}"))
183 } else {
184 Err(anyhow::anyhow!("No WebSocket client available"))
185 }
186 }
187
188 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
189 if !self.message_buffer.is_empty() {
190 return Some(self.message_buffer.remove(0));
191 }
192
193 loop {
194 tokio::select! {
195 Some(cmd) = self.cmd_rx.recv() => {
196 match cmd {
197 HandlerCommand::SetClient(client) => {
198 log::debug!("Setting WebSocket client in handler");
199 self.client = Some(client);
200 }
201 HandlerCommand::Disconnect => {
202 log::debug!("Handler received disconnect command");
203
204 if let Some(ref client) = self.client {
205 client.disconnect().await;
206 }
207 self.signal.store(true, Ordering::SeqCst);
208 return None;
209 }
210 HandlerCommand::Subscribe { subscriptions } => {
211 for subscription in subscriptions {
212 let key = subscription_to_key(&subscription);
213 self.subscriptions.mark_subscribe(&key);
214
215 let request = HyperliquidWsRequest::Subscribe { subscription };
216 match serde_json::to_string(&request) {
217 Ok(payload) => {
218 log::debug!("Sending subscribe payload: {payload}");
219 if let Err(e) = self.send_with_retry(payload).await {
220 log::error!("Error subscribing to {key}: {e}");
221 self.subscriptions.mark_failure(&key);
222 }
223 }
224 Err(e) => {
225 log::error!("Error serializing subscription for {key}: {e}");
226 self.subscriptions.mark_failure(&key);
227 }
228 }
229 }
230 }
231 HandlerCommand::Unsubscribe { subscriptions } => {
232 for subscription in subscriptions {
233 let key = subscription_to_key(&subscription);
234 self.subscriptions.mark_unsubscribe(&key);
235
236 let request = HyperliquidWsRequest::Unsubscribe { subscription };
237 match serde_json::to_string(&request) {
238 Ok(payload) => {
239 log::debug!("Sending unsubscribe payload: {payload}");
240 if let Err(e) = self.send_with_retry(payload).await {
241 log::error!("Error unsubscribing from {key}: {e}");
242 }
243 }
244 Err(e) => {
245 log::error!("Error serializing unsubscription for {key}: {e}");
246 }
247 }
248 }
249 }
250 HandlerCommand::InitializeInstruments(instruments) => {
251 for inst in instruments {
252 let coin = inst.raw_symbol().inner();
253 self.instruments.insert(coin, inst);
254 }
255 }
256 HandlerCommand::UpdateInstrument(inst) => {
257 let coin = inst.raw_symbol().inner();
258 self.instruments.insert(coin, inst);
259 }
260 HandlerCommand::AddBarType { key, bar_type } => {
261 self.bar_types_cache.insert(key, bar_type);
262 }
263 HandlerCommand::RemoveBarType { key } => {
264 self.bar_types_cache.remove(&key);
265 self.bar_cache.remove(&key);
266 }
267 HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
268 if data_types.is_empty() {
269 self.asset_context_subs.remove(&coin);
270 } else {
271 self.asset_context_subs.insert(coin, data_types);
272 }
273 }
274 HandlerCommand::CacheSpotFillCoins(_) => {
275 }
277 HandlerCommand::SetDepth10Sub { coin, subscribed } => {
278 if subscribed {
279 self.depth10_subs.insert(coin);
280 } else {
281 self.depth10_subs.remove(&coin);
282 }
283 }
284 }
285 }
286
287 Some(raw_msg) = self.raw_rx.recv() => {
288 match raw_msg {
289 Message::Text(text) => {
290 if text == RECONNECTED {
291 log::info!("Received RECONNECTED sentinel");
292 return Some(NautilusWsMessage::Reconnected);
293 }
294
295 match serde_json::from_str::<HyperliquidWsMessage>(&text) {
296 Ok(msg) => {
297 let ts_init = self.clock.get_time_ns();
298
299 let nautilus_msgs = Self::parse_to_nautilus_messages(
300 msg,
301 &self.instruments,
302 &self.cloid_cache,
303 &self.bar_types_cache,
304 self.account_id,
305 ts_init,
306 &self.asset_context_subs,
307 &self.depth10_subs,
308 &mut self.processed_trade_ids,
309 &mut self.mark_price_cache,
310 &mut self.index_price_cache,
311 &mut self.funding_rate_cache,
312 &mut self.bar_cache,
313 );
314
315 if !nautilus_msgs.is_empty() {
316 let mut iter = nautilus_msgs.into_iter();
317 let first = iter.next().unwrap();
318 self.message_buffer.extend(iter);
319 return Some(first);
320 }
321 }
322 Err(e) => {
323 log::error!("Error parsing WebSocket message: {e}, text: {text}");
324 }
325 }
326 }
327 Message::Ping(data) => {
328 if let Some(ref client) = self.client
329 && let Err(e) = client.send_pong(data.to_vec()).await {
330 log::error!("Error sending pong: {e}");
331 }
332 }
333 Message::Close(_) => {
334 log::info!("Received WebSocket close frame");
335 return None;
336 }
337 _ => {}
338 }
339 }
340
341 else => {
342 log::debug!("Handler shutting down: stream ended or command channel closed");
343 return None;
344 }
345 }
346 }
347 }
348
349 #[expect(clippy::too_many_arguments)]
350 fn parse_to_nautilus_messages(
351 msg: HyperliquidWsMessage,
352 instruments: &AHashMap<Ustr, InstrumentAny>,
353 cloid_cache: &DashMap<Ustr, ClientOrderId>,
354 bar_types: &AHashMap<String, BarType>,
355 account_id: Option<AccountId>,
356 ts_init: UnixNanos,
357 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
358 depth10_subs: &AHashSet<Ustr>,
359 processed_trade_ids: &mut FifoCache<u64, 10_000>,
360 mark_price_cache: &mut AHashMap<Ustr, String>,
361 index_price_cache: &mut AHashMap<Ustr, String>,
362 funding_rate_cache: &mut AHashMap<Ustr, String>,
363 bar_cache: &mut AHashMap<String, CandleData>,
364 ) -> Vec<NautilusWsMessage> {
365 let mut result = Vec::new();
366
367 match msg {
368 HyperliquidWsMessage::OrderUpdates { data } => {
369 if let Some(account_id) = account_id
370 && let Some(msg) = Self::handle_order_updates(
371 &data,
372 instruments,
373 cloid_cache,
374 account_id,
375 ts_init,
376 )
377 {
378 result.push(msg);
379 }
380 }
381 HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
382 match data {
384 WsUserEventData::Fills { fills } => {
385 log::debug!("Received {} fill(s) from userEvents channel", fills.len());
386 for fill in &fills {
387 log::debug!(
388 "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
389 fill.oid,
390 fill.coin,
391 fill.side,
392 fill.sz,
393 fill.px
394 );
395 }
396
397 if let Some(account_id) = account_id {
398 log::debug!("Processing fills with account_id={account_id}");
399
400 if let Some(msg) = Self::handle_user_fills(
401 &fills,
402 instruments,
403 cloid_cache,
404 account_id,
405 ts_init,
406 processed_trade_ids,
407 ) {
408 log::debug!("Successfully created fill message");
409 result.push(msg);
410 } else {
411 log::debug!("handle_user_fills returned None (no new fills)");
412 }
413 } else {
414 log::warn!("Cannot process fills: account_id is None");
415 }
416 }
417 WsUserEventData::Liquidation { liquidation } => {
418 log::warn!(
419 "Liquidation event: lid={}, liquidator={}, liquidated_user={}, ntl_pos={}, account_value={}",
420 liquidation.lid,
421 liquidation.liquidator,
422 liquidation.liquidated_user,
423 liquidation.liquidated_ntl_pos,
424 liquidation.liquidated_account_value,
425 );
426 }
427 _ => {
428 log::debug!("Received non-fill user event: {data:?}");
429 }
430 }
431 }
432 HyperliquidWsMessage::UserFills { data } => {
433 if let Some(account_id) = account_id
436 && let Some(msg) = Self::handle_user_fills(
437 &data.fills,
438 instruments,
439 cloid_cache,
440 account_id,
441 ts_init,
442 processed_trade_ids,
443 )
444 {
445 result.push(msg);
446 }
447 }
448 HyperliquidWsMessage::Trades { data } => {
449 if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
450 result.push(msg);
451 }
452 }
453 HyperliquidWsMessage::Bbo { data } => {
454 if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
455 result.push(msg);
456 }
457 }
458 HyperliquidWsMessage::L2Book { data } => {
459 result.extend(Self::handle_l2_book(
460 &data,
461 instruments,
462 depth10_subs,
463 ts_init,
464 ));
465 }
466 HyperliquidWsMessage::Candle { data } => {
467 if let Some(msg) =
468 Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
469 {
470 result.push(msg);
471 }
472 }
473 HyperliquidWsMessage::ActiveAssetCtx { data }
474 | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
475 result.extend(Self::handle_asset_context(
476 &data,
477 instruments,
478 asset_context_subs,
479 mark_price_cache,
480 index_price_cache,
481 funding_rate_cache,
482 ts_init,
483 ));
484 }
485 HyperliquidWsMessage::Error { data } => {
486 log::warn!("Received error from Hyperliquid WebSocket: {data}");
487 }
488 _ => {}
490 }
491
492 result
493 }
494
495 fn handle_order_updates(
496 data: &[super::messages::WsOrderData],
497 instruments: &AHashMap<Ustr, InstrumentAny>,
498 cloid_cache: &DashMap<Ustr, ClientOrderId>,
499 account_id: AccountId,
500 ts_init: UnixNanos,
501 ) -> Option<NautilusWsMessage> {
502 let mut exec_reports = Vec::new();
503
504 for order_update in data {
505 let instrument = instruments.get(&order_update.order.coin);
506
507 if let Some(instrument) = instrument {
508 match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
509 Ok(mut report) => {
510 if let Some(cloid) = &order_update.order.cloid {
512 let cloid_ustr = Ustr::from(cloid.as_str());
513 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
514 let real_client_order_id = *entry.value();
515 log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
516 report.client_order_id = Some(real_client_order_id);
517 }
518 }
519 exec_reports.push(ExecutionReport::Order(report));
520 }
521 Err(e) => {
522 log::error!("Error parsing order update: {e}");
523 }
524 }
525 } else {
526 log::debug!("No instrument found for coin: {}", order_update.order.coin);
527 }
528 }
529
530 if exec_reports.is_empty() {
531 None
532 } else {
533 Some(NautilusWsMessage::ExecutionReports(exec_reports))
534 }
535 }
536
537 fn handle_user_fills(
538 fills: &[super::messages::WsFillData],
539 instruments: &AHashMap<Ustr, InstrumentAny>,
540 cloid_cache: &DashMap<Ustr, ClientOrderId>,
541 account_id: AccountId,
542 ts_init: UnixNanos,
543 processed_trade_ids: &mut FifoCache<u64, 10_000>,
544 ) -> Option<NautilusWsMessage> {
545 let mut exec_reports = Vec::new();
546
547 for fill in fills {
548 if processed_trade_ids.contains(&fill.tid) {
549 log::debug!("Skipping duplicate fill: tid={}", fill.tid);
550 continue;
551 }
552
553 let instrument = instruments.get(&fill.coin);
554
555 if let Some(instrument) = instrument {
556 log::debug!("Found instrument for fill coin={}", fill.coin);
557 match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
558 Ok(mut report) => {
559 processed_trade_ids.add(fill.tid);
561
562 if let Some(cloid) = &fill.cloid {
563 let cloid_ustr = Ustr::from(cloid.as_str());
564 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
565 let real_client_order_id = *entry.value();
566 log::debug!(
567 "Resolved fill cloid {cloid} -> {real_client_order_id}"
568 );
569 report.client_order_id = Some(real_client_order_id);
570 }
571 }
572 log::debug!(
573 "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
574 report.venue_order_id,
575 report.trade_id
576 );
577 exec_reports.push(ExecutionReport::Fill(report));
578 }
579 Err(e) => {
580 log::error!("Error parsing fill: {e}");
581 }
582 }
583 } else {
584 log::warn!(
586 "No instrument found for fill coin={}. Keys: {:?}",
587 fill.coin,
588 instruments.keys().collect::<Vec<_>>()
589 );
590 }
591 }
592
593 if exec_reports.is_empty() {
594 None
595 } else {
596 Some(NautilusWsMessage::ExecutionReports(exec_reports))
597 }
598 }
599
600 fn handle_trades(
601 data: &[super::messages::WsTradeData],
602 instruments: &AHashMap<Ustr, InstrumentAny>,
603 ts_init: UnixNanos,
604 ) -> Option<NautilusWsMessage> {
605 let mut trade_ticks = Vec::new();
606
607 for trade in data {
608 if let Some(instrument) = instruments.get(&trade.coin) {
609 match parse_ws_trade_tick(trade, instrument, ts_init) {
610 Ok(tick) => trade_ticks.push(tick),
611 Err(e) => {
612 log::error!("Error parsing trade tick: {e}");
613 }
614 }
615 } else {
616 log::debug!("No instrument found for coin: {}", trade.coin);
617 }
618 }
619
620 if trade_ticks.is_empty() {
621 None
622 } else {
623 Some(NautilusWsMessage::Trades(trade_ticks))
624 }
625 }
626
627 fn handle_bbo(
628 data: &super::messages::WsBboData,
629 instruments: &AHashMap<Ustr, InstrumentAny>,
630 ts_init: UnixNanos,
631 ) -> Option<NautilusWsMessage> {
632 if let Some(instrument) = instruments.get(&data.coin) {
633 match parse_ws_quote_tick(data, instrument, ts_init) {
634 Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
635 Err(e) => {
636 log::error!("Error parsing quote tick: {e}");
637 None
638 }
639 }
640 } else {
641 log::debug!("No instrument found for coin: {}", data.coin);
642 None
643 }
644 }
645
646 fn handle_l2_book(
647 data: &super::messages::WsBookData,
648 instruments: &AHashMap<Ustr, InstrumentAny>,
649 depth10_subs: &AHashSet<Ustr>,
650 ts_init: UnixNanos,
651 ) -> Vec<NautilusWsMessage> {
652 let mut out = Vec::new();
653
654 let Some(instrument) = instruments.get(&data.coin) else {
655 log::debug!("No instrument found for coin: {}", data.coin);
656 return out;
657 };
658
659 match parse_ws_order_book_deltas(data, instrument, ts_init) {
660 Ok(deltas) => out.push(NautilusWsMessage::Deltas(deltas)),
661 Err(e) => log::error!("Error parsing order book deltas: {e}"),
662 }
663
664 if depth10_subs.contains(&data.coin) {
665 match parse_ws_order_book_depth10(data, instrument, ts_init) {
666 Ok(depth) => out.push(NautilusWsMessage::Depth10(Box::new(depth))),
667 Err(e) => log::error!("Error parsing order book depth10: {e}"),
668 }
669 }
670
671 out
672 }
673
674 fn handle_candle(
675 data: &CandleData,
676 instruments: &AHashMap<Ustr, InstrumentAny>,
677 bar_types: &AHashMap<String, BarType>,
678 bar_cache: &mut AHashMap<String, CandleData>,
679 ts_init: UnixNanos,
680 ) -> Option<NautilusWsMessage> {
681 let key = format!("candle:{}:{}", data.s, data.i);
682
683 let mut closed_bar = None;
684
685 if let Some(cached) = bar_cache.get(&key) {
686 if cached.close_time != data.close_time {
688 log::debug!(
689 "Bar period changed for {}: prev_close_time={}, new_close_time={}",
690 data.s,
691 cached.close_time,
692 data.close_time
693 );
694 closed_bar = Some(cached.clone());
695 }
696 }
697
698 bar_cache.insert(key.clone(), data.clone());
699
700 if let Some(closed_data) = closed_bar {
701 if let Some(bar_type) = bar_types.get(&key) {
702 if let Some(instrument) = instruments.get(&data.s) {
703 match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
704 Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
705 Err(e) => {
706 log::error!("Error parsing closed candle: {e}");
707 }
708 }
709 } else {
710 log::debug!("No instrument found for coin: {}", data.s);
711 }
712 } else {
713 log::debug!("No bar type found for key: {key}");
714 }
715 }
716
717 None
718 }
719
720 fn handle_asset_context(
721 data: &WsActiveAssetCtxData,
722 instruments: &AHashMap<Ustr, InstrumentAny>,
723 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
724 mark_price_cache: &mut AHashMap<Ustr, String>,
725 index_price_cache: &mut AHashMap<Ustr, String>,
726 funding_rate_cache: &mut AHashMap<Ustr, String>,
727 ts_init: UnixNanos,
728 ) -> Vec<NautilusWsMessage> {
729 let mut result = Vec::new();
730
731 let coin = match data {
732 WsActiveAssetCtxData::Perp { coin, .. } => coin,
733 WsActiveAssetCtxData::Spot { coin, .. } => coin,
734 };
735
736 if let Some(instrument) = instruments.get(coin) {
737 let (mark_px, oracle_px, funding) = match data {
738 WsActiveAssetCtxData::Perp { ctx, .. } => (
739 &ctx.shared.mark_px,
740 Some(&ctx.oracle_px),
741 Some(&ctx.funding),
742 ),
743 WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
744 };
745
746 let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
747 let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
748 let funding_changed =
749 funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
750
751 let subscribed_types = asset_context_subs.get(coin);
752
753 if mark_changed || index_changed || funding_changed {
754 match parse_ws_asset_context(data, instrument, ts_init) {
755 Ok((mark_price, index_price, funding_rate)) => {
756 if mark_changed
757 && subscribed_types
758 .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
759 {
760 mark_price_cache.insert(*coin, mark_px.clone());
761 result.push(NautilusWsMessage::MarkPrice(mark_price));
762 }
763
764 if index_changed
765 && subscribed_types
766 .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
767 {
768 if let Some(px) = oracle_px {
769 index_price_cache.insert(*coin, px.clone());
770 }
771
772 if let Some(index) = index_price {
773 result.push(NautilusWsMessage::IndexPrice(index));
774 }
775 }
776
777 if funding_changed
778 && subscribed_types
779 .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
780 {
781 if let Some(rate) = funding {
782 funding_rate_cache.insert(*coin, rate.clone());
783 }
784
785 if let Some(funding) = funding_rate {
786 result.push(NautilusWsMessage::FundingRate(funding));
787 }
788 }
789 }
790 Err(e) => {
791 log::error!("Error parsing asset context: {e}");
792 }
793 }
794 }
795 } else {
796 log::debug!("No instrument found for coin: {coin}");
797 }
798
799 result
800 }
801}
802
803pub(crate) fn subscription_to_key(sub: &SubscriptionRequest) -> String {
804 match sub {
805 SubscriptionRequest::AllMids { dex } => {
806 if let Some(dex_name) = dex {
807 format!("{}:{dex_name}", HyperliquidWsChannel::AllMids.as_str())
808 } else {
809 HyperliquidWsChannel::AllMids.as_str().to_string()
810 }
811 }
812 SubscriptionRequest::Notification { user } => {
813 format!("{}:{user}", HyperliquidWsChannel::Notification.as_str())
814 }
815 SubscriptionRequest::WebData2 { user } => {
816 format!("{}:{user}", HyperliquidWsChannel::WebData2.as_str())
817 }
818 SubscriptionRequest::Candle { coin, interval } => {
819 format!(
820 "{}:{coin}:{}",
821 HyperliquidWsChannel::Candle.as_str(),
822 interval.as_str()
823 )
824 }
825 SubscriptionRequest::L2Book { coin, .. } => {
826 format!("{}:{coin}", HyperliquidWsChannel::L2Book.as_str())
827 }
828 SubscriptionRequest::Trades { coin } => {
829 format!("{}:{coin}", HyperliquidWsChannel::Trades.as_str())
830 }
831 SubscriptionRequest::OrderUpdates { user } => {
832 format!("{}:{user}", HyperliquidWsChannel::OrderUpdates.as_str())
833 }
834 SubscriptionRequest::UserEvents { user } => {
835 format!("{}:{user}", HyperliquidWsChannel::UserEvents.as_str())
836 }
837 SubscriptionRequest::UserFills { user, .. } => {
838 format!("{}:{user}", HyperliquidWsChannel::UserFills.as_str())
839 }
840 SubscriptionRequest::UserFundings { user } => {
841 format!("{}:{user}", HyperliquidWsChannel::UserFundings.as_str())
842 }
843 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
844 format!(
845 "{}:{user}",
846 HyperliquidWsChannel::UserNonFundingLedgerUpdates.as_str()
847 )
848 }
849 SubscriptionRequest::ActiveAssetCtx { coin } => {
850 format!("{}:{coin}", HyperliquidWsChannel::ActiveAssetCtx.as_str())
851 }
852 SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
853 format!(
854 "{}:{coin}",
855 HyperliquidWsChannel::ActiveSpotAssetCtx.as_str()
856 )
857 }
858 SubscriptionRequest::ActiveAssetData { user, coin } => {
859 format!(
860 "{}:{user}:{coin}",
861 HyperliquidWsChannel::ActiveAssetData.as_str()
862 )
863 }
864 SubscriptionRequest::UserTwapSliceFills { user } => {
865 format!(
866 "{}:{user}",
867 HyperliquidWsChannel::UserTwapSliceFills.as_str()
868 )
869 }
870 SubscriptionRequest::UserTwapHistory { user } => {
871 format!("{}:{user}", HyperliquidWsChannel::UserTwapHistory.as_str())
872 }
873 SubscriptionRequest::Bbo { coin } => {
874 format!("{}:{coin}", HyperliquidWsChannel::Bbo.as_str())
875 }
876 }
877}
878
879pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
881 match error {
882 HyperliquidWsError::TungsteniteError(_) => true,
883 HyperliquidWsError::ClientError(msg) => {
884 let msg_lower = msg.to_lowercase();
885 msg_lower.contains("timeout")
886 || msg_lower.contains("timed out")
887 || msg_lower.contains("connection")
888 || msg_lower.contains("network")
889 }
890 _ => false,
891 }
892}
893
894pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
896 HyperliquidWsError::ClientError(msg)
897}
898
899#[cfg(test)]
900mod tests {
901 use ahash::{AHashMap, AHashSet};
902 use nautilus_core::nanos::UnixNanos;
903 use nautilus_model::{
904 identifiers::{InstrumentId, Symbol, Venue},
905 instruments::{CryptoPerpetual, InstrumentAny},
906 types::{Currency, Price, Quantity},
907 };
908 use rstest::rstest;
909 use ustr::Ustr;
910
911 use super::{
912 super::messages::{NautilusWsMessage, WsBookData, WsLevelData},
913 FeedHandler,
914 };
915
916 fn btc_perp() -> InstrumentAny {
917 InstrumentAny::CryptoPerpetual(CryptoPerpetual::new(
918 InstrumentId::new(Symbol::new("BTC-PERP"), Venue::new("HYPERLIQUID")),
919 Symbol::new("BTC-PERP"),
920 Currency::from("BTC"),
921 Currency::from("USDC"),
922 Currency::from("USDC"),
923 false,
924 2,
925 3,
926 Price::from("0.01"),
927 Quantity::from("0.001"),
928 None,
929 None,
930 None,
931 None,
932 None,
933 None,
934 None,
935 None,
936 None,
937 None,
938 None,
939 None,
940 None,
941 UnixNanos::default(),
942 UnixNanos::default(),
943 ))
944 }
945
946 fn one_level_book() -> WsBookData {
947 WsBookData {
948 coin: Ustr::from("BTC"),
949 levels: [
950 vec![WsLevelData {
951 px: "100.00".to_string(),
952 sz: "1.0".to_string(),
953 n: 1,
954 }],
955 vec![WsLevelData {
956 px: "100.01".to_string(),
957 sz: "1.0".to_string(),
958 n: 1,
959 }],
960 ],
961 time: 1_700_000_000_000,
962 }
963 }
964
965 #[rstest]
966 fn handle_l2_book_emits_deltas_only_when_not_in_depth10_subs() {
967 let mut instruments = AHashMap::new();
968 instruments.insert(Ustr::from("BTC"), btc_perp());
969 let depth10_subs = AHashSet::<Ustr>::new();
970
971 let msgs = FeedHandler::handle_l2_book(
972 &one_level_book(),
973 &instruments,
974 &depth10_subs,
975 UnixNanos::default(),
976 );
977
978 assert_eq!(msgs.len(), 1);
979 assert!(matches!(msgs[0], NautilusWsMessage::Deltas(_)));
980 }
981
982 #[rstest]
983 fn handle_l2_book_emits_deltas_and_depth10_when_coin_in_subs() {
984 let mut instruments = AHashMap::new();
985 instruments.insert(Ustr::from("BTC"), btc_perp());
986 let mut depth10_subs = AHashSet::<Ustr>::new();
987 depth10_subs.insert(Ustr::from("BTC"));
988
989 let msgs = FeedHandler::handle_l2_book(
990 &one_level_book(),
991 &instruments,
992 &depth10_subs,
993 UnixNanos::default(),
994 );
995
996 assert_eq!(msgs.len(), 2);
997 assert!(matches!(msgs[0], NautilusWsMessage::Deltas(_)));
998 assert!(matches!(msgs[1], NautilusWsMessage::Depth10(_)));
999 }
1000
1001 #[rstest]
1002 fn handle_l2_book_returns_empty_when_instrument_unknown() {
1003 let instruments = AHashMap::<Ustr, InstrumentAny>::new();
1004 let depth10_subs = AHashSet::<Ustr>::new();
1005
1006 let msgs = FeedHandler::handle_l2_book(
1007 &one_level_book(),
1008 &instruments,
1009 &depth10_subs,
1010 UnixNanos::default(),
1011 );
1012
1013 assert!(msgs.is_empty());
1014 }
1015}