1use std::sync::{
17 Arc,
18 atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25 clients::DataClient,
26 live::{runner::get_data_event_sender, runtime::get_runtime},
27 messages::{
28 DataEvent,
29 data::{
30 BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
31 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
32 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
33 SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
34 SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
35 UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
36 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices,
37 UnsubscribeQuotes, UnsubscribeTrades,
38 },
39 },
40};
41use nautilus_core::{
42 AtomicMap, Params, UnixNanos,
43 datetime::datetime_to_unix_nanos,
44 time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47 data::{Bar, BarType, BookOrder, Data, FundingRateUpdate, OrderBookDeltas_API},
48 enums::{BarAggregation, BookType, OrderSide},
49 identifiers::{ClientId, InstrumentId, Venue},
50 instruments::{Instrument, InstrumentAny},
51 orderbook::OrderBook,
52 types::{Price, Quantity},
53};
54use rust_decimal::Decimal;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60 common::{
61 consts::HYPERLIQUID_VENUE,
62 credential::{Secrets, credential_env_vars},
63 parse::bar_type_to_interval,
64 },
65 config::HyperliquidDataClientConfig,
66 http::{
67 client::HyperliquidHttpClient,
68 models::{HyperliquidCandle, HyperliquidFundingHistoryEntry},
69 },
70 websocket::{
71 client::HyperliquidWebSocketClient,
72 messages::{HyperliquidWsMessage, NautilusWsMessage},
73 parse::{
74 parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
75 },
76 },
77};
78
79#[derive(Debug)]
80pub struct HyperliquidDataClient {
81 client_id: ClientId,
82 #[allow(dead_code)]
83 config: HyperliquidDataClientConfig,
84 http_client: HyperliquidHttpClient,
85 ws_client: HyperliquidWebSocketClient,
86 is_connected: AtomicBool,
87 cancellation_token: CancellationToken,
88 tasks: Vec<JoinHandle<()>>,
89 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
90 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
91 coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
93 clock: &'static AtomicTime,
94 #[allow(dead_code)]
95 instrument_refresh_active: bool,
96}
97
98impl HyperliquidDataClient {
99 pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
105 let clock = get_atomic_clock_realtime();
106 let data_sender = get_data_event_sender();
107
108 let (pk_var, _) = credential_env_vars(config.environment);
111 let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
112
113 let mut http_client = if has_credentials {
114 let secrets =
115 Secrets::resolve(config.private_key.as_deref(), None, config.environment)?;
116 HyperliquidHttpClient::with_secrets(
117 &secrets,
118 config.http_timeout_secs,
119 config.proxy_url.clone(),
120 )?
121 } else {
122 HyperliquidHttpClient::new(
123 config.environment,
124 config.http_timeout_secs,
125 config.proxy_url.clone(),
126 )?
127 };
128
129 if let Some(url) = &config.base_url_http {
131 http_client.set_base_info_url(url.clone());
132 }
133
134 let ws_url = config.base_url_ws.clone();
135 let ws_client = HyperliquidWebSocketClient::new(
136 ws_url,
137 config.environment,
138 None,
139 config.transport_backend,
140 config.proxy_url.clone(),
141 );
142
143 Ok(Self {
144 client_id,
145 config,
146 http_client,
147 ws_client,
148 is_connected: AtomicBool::new(false),
149 cancellation_token: CancellationToken::new(),
150 tasks: Vec::new(),
151 data_sender,
152 instruments: Arc::new(AtomicMap::new()),
153 coin_to_instrument_id: Arc::new(AtomicMap::new()),
154 clock,
155 instrument_refresh_active: false,
156 })
157 }
158
159 fn venue(&self) -> Venue {
160 *HYPERLIQUID_VENUE
161 }
162
163 async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
164 let instruments = self
165 .http_client
166 .request_instruments()
167 .await
168 .context("failed to fetch instruments during bootstrap")?;
169
170 self.instruments.rcu(|m| {
171 for instrument in &instruments {
172 m.insert(instrument.id(), instrument.clone());
173 }
174 });
175
176 self.coin_to_instrument_id.rcu(|m| {
177 for instrument in &instruments {
178 m.insert(instrument.raw_symbol().inner(), instrument.id());
179 }
180 });
181
182 for instrument in &instruments {
183 self.ws_client.cache_instrument(instrument.clone());
184 }
185
186 log::info!(
187 "Bootstrapped {} instruments with {} coin mappings",
188 self.instruments.len(),
189 self.coin_to_instrument_id.len()
190 );
191 Ok(instruments)
192 }
193
194 async fn spawn_ws(&mut self) -> anyhow::Result<()> {
195 let mut ws_client = self.ws_client.clone();
197
198 ws_client
199 .connect()
200 .await
201 .context("failed to connect to Hyperliquid WebSocket")?;
202
203 if let Some(handle) = ws_client.take_task_handle() {
205 self.ws_client.set_task_handle(handle);
206 }
207
208 let data_sender = self.data_sender.clone();
209 let cancellation_token = self.cancellation_token.clone();
210
211 let task = get_runtime().spawn(async move {
212 log::info!("Hyperliquid WebSocket consumption loop started");
213
214 loop {
215 tokio::select! {
216 () = cancellation_token.cancelled() => {
217 log::info!("WebSocket consumption loop cancelled");
218 break;
219 }
220 msg_opt = ws_client.next_event() => {
221 if let Some(msg) = msg_opt {
222 match msg {
223 NautilusWsMessage::Trades(trades) => {
224 for trade in trades {
225 if let Err(e) = data_sender
226 .send(DataEvent::Data(Data::Trade(trade)))
227 {
228 log::error!("Failed to send trade tick: {e}");
229 }
230 }
231 }
232 NautilusWsMessage::Quote(quote) => {
233 if let Err(e) = data_sender
234 .send(DataEvent::Data(Data::Quote(quote)))
235 {
236 log::error!("Failed to send quote tick: {e}");
237 }
238 }
239 NautilusWsMessage::Deltas(deltas) => {
240 if let Err(e) = data_sender
241 .send(DataEvent::Data(Data::Deltas(
242 OrderBookDeltas_API::new(deltas),
243 )))
244 {
245 log::error!("Failed to send order book deltas: {e}");
246 }
247 }
248 NautilusWsMessage::Depth10(depth) => {
249 if let Err(e) =
250 data_sender.send(DataEvent::Data(Data::Depth10(depth)))
251 {
252 log::error!("Failed to send order book depth10: {e}");
253 }
254 }
255 NautilusWsMessage::Candle(bar) => {
256 if let Err(e) = data_sender
257 .send(DataEvent::Data(Data::Bar(bar)))
258 {
259 log::error!("Failed to send bar: {e}");
260 }
261 }
262 NautilusWsMessage::MarkPrice(update) => {
263 if let Err(e) = data_sender
264 .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
265 {
266 log::error!("Failed to send mark price update: {e}");
267 }
268 }
269 NautilusWsMessage::IndexPrice(update) => {
270 if let Err(e) = data_sender
271 .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
272 {
273 log::error!("Failed to send index price update: {e}");
274 }
275 }
276 NautilusWsMessage::FundingRate(update) => {
277 if let Err(e) = data_sender
278 .send(DataEvent::FundingRate(update))
279 {
280 log::error!("Failed to send funding rate update: {e}");
281 }
282 }
283 NautilusWsMessage::Reconnected => {
284 log::info!("WebSocket reconnected");
285 }
286 NautilusWsMessage::Error(e) => {
287 log::error!("WebSocket error: {e}");
288 }
289 NautilusWsMessage::ExecutionReports(_) => {
290 }
292 }
293 } else {
294 log::debug!("WebSocket next_event returned None, stream closed");
296 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
297 }
298 }
299 }
300 }
301
302 log::info!("Hyperliquid WebSocket consumption loop finished");
303 });
304
305 self.tasks.push(task);
306 log::info!("WebSocket consumption task spawned");
307
308 Ok(())
309 }
310
311 #[allow(dead_code)]
312 fn handle_ws_message(
313 msg: HyperliquidWsMessage,
314 ws_client: &HyperliquidWebSocketClient,
315 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
316 instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
317 coin_to_instrument_id: &Arc<AtomicMap<Ustr, InstrumentId>>,
318 _venue: Venue,
319 clock: &'static AtomicTime,
320 ) {
321 match msg {
322 HyperliquidWsMessage::Bbo { data } => {
323 let coin = data.coin;
324 log::debug!("Received BBO message for coin: {coin}");
325
326 let coin_map = coin_to_instrument_id.load();
327 let instrument_id = coin_map.get(&data.coin);
328
329 if let Some(&instrument_id) = instrument_id {
330 let instruments_map = instruments.load();
331 if let Some(instrument) = instruments_map.get(&instrument_id) {
332 let ts_init = clock.get_time_ns();
333
334 match parse_ws_quote_tick(&data, instrument, ts_init) {
335 Ok(quote_tick) => {
336 log::debug!(
337 "Parsed quote tick for {}: bid={}, ask={}",
338 data.coin,
339 quote_tick.bid_price,
340 quote_tick.ask_price
341 );
342
343 if let Err(e) =
344 data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
345 {
346 log::error!("Failed to send quote tick: {e}");
347 }
348 }
349 Err(e) => {
350 log::error!("Failed to parse quote tick for {}: {e}", data.coin);
351 }
352 }
353 }
354 } else {
355 log::warn!(
356 "Received BBO for unknown coin: {} (no matching instrument found)",
357 data.coin
358 );
359 }
360 }
361 HyperliquidWsMessage::Trades { data } => {
362 let count = data.len();
363 log::debug!("Received {count} trade(s)");
364
365 for trade_data in data {
366 let coin = trade_data.coin;
367 let coin_map = coin_to_instrument_id.load();
368
369 if let Some(&instrument_id) = coin_map.get(&coin) {
370 let instruments_map = instruments.load();
371 if let Some(instrument) = instruments_map.get(&instrument_id) {
372 let ts_init = clock.get_time_ns();
373
374 match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
375 Ok(trade_tick) => {
376 if let Err(e) =
377 data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
378 {
379 log::error!("Failed to send trade tick: {e}");
380 }
381 }
382 Err(e) => {
383 log::error!("Failed to parse trade tick for {coin}: {e}");
384 }
385 }
386 }
387 } else {
388 log::warn!("Received trade for unknown coin: {coin}");
389 }
390 }
391 }
392 HyperliquidWsMessage::L2Book { data } => {
393 let coin = data.coin;
394 log::debug!("Received L2 book update for coin: {coin}");
395
396 let coin_map = coin_to_instrument_id.load();
397 if let Some(&instrument_id) = coin_map.get(&data.coin) {
398 let instruments_map = instruments.load();
399 if let Some(instrument) = instruments_map.get(&instrument_id) {
400 let ts_init = clock.get_time_ns();
401
402 match parse_ws_order_book_deltas(&data, instrument, ts_init) {
403 Ok(deltas) => {
404 if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
405 OrderBookDeltas_API::new(deltas),
406 ))) {
407 log::error!("Failed to send order book deltas: {e}");
408 }
409 }
410 Err(e) => {
411 log::error!(
412 "Failed to parse order book deltas for {}: {e}",
413 data.coin
414 );
415 }
416 }
417 }
418 } else {
419 log::warn!("Received L2 book for unknown coin: {coin}");
420 }
421 }
422 HyperliquidWsMessage::Candle { data } => {
423 let coin = &data.s;
424 let interval = &data.i;
425 log::debug!("Received candle for {coin}:{interval}");
426
427 if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
428 let coin = Ustr::from(&data.s);
429 let coin_map = coin_to_instrument_id.load();
430
431 if let Some(&instrument_id) = coin_map.get(&coin) {
432 let instruments_map = instruments.load();
433 if let Some(instrument) = instruments_map.get(&instrument_id) {
434 let ts_init = clock.get_time_ns();
435
436 match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
437 Ok(bar) => {
438 if let Err(e) =
439 data_sender.send(DataEvent::Data(Data::Bar(bar)))
440 {
441 log::error!("Failed to send bar data: {e}");
442 }
443 }
444 Err(e) => {
445 log::error!("Failed to parse candle for {coin}: {e}");
446 }
447 }
448 }
449 } else {
450 log::warn!("Received candle for unknown coin: {coin}");
451 }
452 } else {
453 log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
454 }
455 }
456 _ => {
457 log::trace!("Received unhandled WebSocket message: {msg:?}");
458 }
459 }
460 }
461}
462
463impl HyperliquidDataClient {
464 #[allow(dead_code)]
465 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
466 if let Err(e) = sender.send(DataEvent::Data(data)) {
467 log::error!("Failed to emit data event: {e}");
468 }
469 }
470}
471
472#[async_trait::async_trait(?Send)]
473impl DataClient for HyperliquidDataClient {
474 fn client_id(&self) -> ClientId {
475 self.client_id
476 }
477
478 fn venue(&self) -> Option<Venue> {
479 Some(self.venue())
480 }
481
482 fn start(&mut self) -> anyhow::Result<()> {
483 log::info!(
484 "Starting Hyperliquid data client: client_id={}, environment={:?}, proxy_url={:?}",
485 self.client_id,
486 self.config.environment,
487 self.config.proxy_url,
488 );
489 Ok(())
490 }
491
492 fn stop(&mut self) -> anyhow::Result<()> {
493 log::info!("Stopping Hyperliquid data client {}", self.client_id);
494 self.cancellation_token.cancel();
495 self.is_connected.store(false, Ordering::Relaxed);
496 Ok(())
497 }
498
499 fn reset(&mut self) -> anyhow::Result<()> {
500 log::debug!("Resetting Hyperliquid data client {}", self.client_id);
501 self.is_connected.store(false, Ordering::Relaxed);
502 self.cancellation_token = CancellationToken::new();
503 self.tasks.clear();
504 Ok(())
505 }
506
507 fn dispose(&mut self) -> anyhow::Result<()> {
508 log::debug!("Disposing Hyperliquid data client {}", self.client_id);
509 self.stop()
510 }
511
512 fn is_connected(&self) -> bool {
513 self.is_connected.load(Ordering::Acquire)
514 }
515
516 fn is_disconnected(&self) -> bool {
517 !self.is_connected()
518 }
519
520 async fn connect(&mut self) -> anyhow::Result<()> {
521 if self.is_connected() {
522 return Ok(());
523 }
524
525 let instruments = self
526 .bootstrap_instruments()
527 .await
528 .context("failed to bootstrap instruments")?;
529
530 for instrument in instruments {
531 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
532 log::warn!("Failed to send instrument: {e}");
533 }
534 }
535
536 self.spawn_ws()
537 .await
538 .context("failed to spawn WebSocket client")?;
539
540 self.is_connected.store(true, Ordering::Relaxed);
541 log::info!("Connected: client_id={}", self.client_id);
542
543 Ok(())
544 }
545
546 async fn disconnect(&mut self) -> anyhow::Result<()> {
547 if !self.is_connected() {
548 return Ok(());
549 }
550
551 self.cancellation_token.cancel();
552
553 for task in self.tasks.drain(..) {
554 if let Err(e) = task.await {
555 log::error!("Error waiting for task to complete: {e}");
556 }
557 }
558
559 if let Err(e) = self.ws_client.disconnect().await {
560 log::error!("Error disconnecting WebSocket client: {e}");
561 }
562
563 self.instruments.store(AHashMap::new());
564
565 self.is_connected.store(false, Ordering::Relaxed);
566 log::info!("Disconnected: client_id={}", self.client_id);
567
568 Ok(())
569 }
570
571 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
572 let instruments = self.instruments.load();
573 if let Some(instrument) = instruments.get(&cmd.instrument_id) {
574 if let Err(e) = self
575 .data_sender
576 .send(DataEvent::Instrument(instrument.clone()))
577 {
578 log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
579 }
580 } else {
581 log::warn!("Instrument {} not found in cache", cmd.instrument_id);
582 }
583 Ok(())
584 }
585
586 fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
587 log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
588
589 if subscription.book_type != BookType::L2_MBP {
590 anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
591 }
592
593 let ws = self.ws_client.clone();
594 let instrument_id = subscription.instrument_id;
595 let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
596
597 get_runtime().spawn(async move {
598 if let Err(e) = ws
599 .subscribe_book_with_options(instrument_id, n_sig_figs, mantissa)
600 .await
601 {
602 log::error!("Failed to subscribe to book deltas: {e:?}");
603 }
604 });
605
606 Ok(())
607 }
608
609 fn subscribe_book_depth10(&mut self, subscription: SubscribeBookDepth10) -> anyhow::Result<()> {
610 log::debug!(
611 "Subscribing to book depth10: {}",
612 subscription.instrument_id
613 );
614
615 if subscription.book_type != BookType::L2_MBP {
616 anyhow::bail!("Hyperliquid only supports L2_MBP order book depth10");
617 }
618
619 let ws = self.ws_client.clone();
620 let instrument_id = subscription.instrument_id;
621 let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
622
623 get_runtime().spawn(async move {
624 if let Err(e) = ws
625 .subscribe_book_depth10_with_options(instrument_id, n_sig_figs, mantissa)
626 .await
627 {
628 log::error!("Failed to subscribe to book depth10: {e:?}");
629 }
630 });
631
632 Ok(())
633 }
634
635 fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
636 log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
637
638 let ws = self.ws_client.clone();
639 let instrument_id = subscription.instrument_id;
640
641 get_runtime().spawn(async move {
642 if let Err(e) = ws.subscribe_quotes(instrument_id).await {
643 log::error!("Failed to subscribe to quotes: {e:?}");
644 }
645 });
646
647 Ok(())
648 }
649
650 fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
651 log::debug!("Subscribing to trades: {}", subscription.instrument_id);
652
653 let ws = self.ws_client.clone();
654 let instrument_id = subscription.instrument_id;
655
656 get_runtime().spawn(async move {
657 if let Err(e) = ws.subscribe_trades(instrument_id).await {
658 log::error!("Failed to subscribe to trades: {e:?}");
659 }
660 });
661
662 Ok(())
663 }
664
665 fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
666 let ws = self.ws_client.clone();
667 let instrument_id = cmd.instrument_id;
668
669 get_runtime().spawn(async move {
670 if let Err(e) = ws.subscribe_mark_prices(instrument_id).await {
671 log::error!("Failed to subscribe to mark prices: {e:?}");
672 }
673 });
674
675 Ok(())
676 }
677
678 fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
679 let ws = self.ws_client.clone();
680 let instrument_id = cmd.instrument_id;
681
682 get_runtime().spawn(async move {
683 if let Err(e) = ws.subscribe_index_prices(instrument_id).await {
684 log::error!("Failed to subscribe to index prices: {e:?}");
685 }
686 });
687
688 Ok(())
689 }
690
691 fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
692 let ws = self.ws_client.clone();
693 let instrument_id = cmd.instrument_id;
694
695 get_runtime().spawn(async move {
696 if let Err(e) = ws.subscribe_funding_rates(instrument_id).await {
697 log::error!("Failed to subscribe to funding rates: {e:?}");
698 }
699 });
700
701 Ok(())
702 }
703
704 fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
705 log::debug!("Subscribing to bars: {}", subscription.bar_type);
706
707 let instrument_id = subscription.bar_type.instrument_id();
708 if !self.instruments.contains_key(&instrument_id) {
709 anyhow::bail!("Instrument {instrument_id} not found");
710 }
711
712 let bar_type = subscription.bar_type;
713 let ws = self.ws_client.clone();
714
715 get_runtime().spawn(async move {
716 if let Err(e) = ws.subscribe_bars(bar_type).await {
717 log::error!("Failed to subscribe to bars: {e:?}");
718 }
719 });
720
721 Ok(())
722 }
723
724 fn unsubscribe_book_deltas(
725 &mut self,
726 unsubscription: &UnsubscribeBookDeltas,
727 ) -> anyhow::Result<()> {
728 log::debug!(
729 "Unsubscribing from book deltas: {}",
730 unsubscription.instrument_id
731 );
732
733 let ws = self.ws_client.clone();
734 let instrument_id = unsubscription.instrument_id;
735
736 get_runtime().spawn(async move {
737 if let Err(e) = ws.unsubscribe_book(instrument_id).await {
738 log::error!("Failed to unsubscribe from book deltas: {e:?}");
739 }
740 });
741
742 Ok(())
743 }
744
745 fn unsubscribe_book_depth10(
746 &mut self,
747 unsubscription: &UnsubscribeBookDepth10,
748 ) -> anyhow::Result<()> {
749 log::debug!(
750 "Unsubscribing from book depth10: {}",
751 unsubscription.instrument_id
752 );
753
754 let ws = self.ws_client.clone();
755 let instrument_id = unsubscription.instrument_id;
756
757 get_runtime().spawn(async move {
758 if let Err(e) = ws.unsubscribe_book_depth10(instrument_id).await {
759 log::error!("Failed to unsubscribe from book depth10: {e:?}");
760 }
761 });
762
763 Ok(())
764 }
765
766 fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
767 log::debug!(
768 "Unsubscribing from quotes: {}",
769 unsubscription.instrument_id
770 );
771
772 let ws = self.ws_client.clone();
773 let instrument_id = unsubscription.instrument_id;
774
775 get_runtime().spawn(async move {
776 if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
777 log::error!("Failed to unsubscribe from quotes: {e:?}");
778 }
779 });
780
781 Ok(())
782 }
783
784 fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
785 log::debug!(
786 "Unsubscribing from trades: {}",
787 unsubscription.instrument_id
788 );
789
790 let ws = self.ws_client.clone();
791 let instrument_id = unsubscription.instrument_id;
792
793 get_runtime().spawn(async move {
794 if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
795 log::error!("Failed to unsubscribe from trades: {e:?}");
796 }
797 });
798
799 Ok(())
800 }
801
802 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
803 let ws = self.ws_client.clone();
804 let instrument_id = cmd.instrument_id;
805
806 get_runtime().spawn(async move {
807 if let Err(e) = ws.unsubscribe_mark_prices(instrument_id).await {
808 log::error!("Failed to unsubscribe from mark prices: {e:?}");
809 }
810 });
811
812 Ok(())
813 }
814
815 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
816 let ws = self.ws_client.clone();
817 let instrument_id = cmd.instrument_id;
818
819 get_runtime().spawn(async move {
820 if let Err(e) = ws.unsubscribe_index_prices(instrument_id).await {
821 log::error!("Failed to unsubscribe from index prices: {e:?}");
822 }
823 });
824
825 Ok(())
826 }
827
828 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
829 let ws = self.ws_client.clone();
830 let instrument_id = cmd.instrument_id;
831
832 get_runtime().spawn(async move {
833 if let Err(e) = ws.unsubscribe_funding_rates(instrument_id).await {
834 log::error!("Failed to unsubscribe from funding rates: {e:?}");
835 }
836 });
837
838 Ok(())
839 }
840
841 fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
842 log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
843
844 let bar_type = unsubscription.bar_type;
845 let ws = self.ws_client.clone();
846
847 get_runtime().spawn(async move {
848 if let Err(e) = ws.unsubscribe_bars(bar_type).await {
849 log::error!("Failed to unsubscribe from bars: {e:?}");
850 }
851 });
852
853 Ok(())
854 }
855
856 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
857 log::debug!("Requesting all instruments");
858
859 let http = self.http_client.clone();
860 let sender = self.data_sender.clone();
861 let instruments_cache = self.instruments.clone();
862 let coin_map = self.coin_to_instrument_id.clone();
863 let ws_instruments = self.ws_client.instruments_cache();
864 let request_id = request.request_id;
865 let client_id = request.client_id.unwrap_or(self.client_id);
866 let venue = self.venue();
867 let start_nanos = datetime_to_unix_nanos(request.start);
868 let end_nanos = datetime_to_unix_nanos(request.end);
869 let params = request.params;
870 let clock = self.clock;
871
872 get_runtime().spawn(async move {
873 match http.request_instruments().await {
874 Ok(instruments) => {
875 instruments_cache.rcu(|instruments_map| {
876 coin_map.rcu(|coin_to_id| {
877 for instrument in &instruments {
878 let instrument_id = instrument.id();
879 instruments_map.insert(instrument_id, instrument.clone());
880 let coin = instrument.raw_symbol().inner();
881 coin_to_id.insert(coin, instrument_id);
882 ws_instruments.insert(coin, instrument.clone());
883 }
884 });
885 });
886
887 let response = DataResponse::Instruments(InstrumentsResponse::new(
888 request_id,
889 client_id,
890 venue,
891 instruments,
892 start_nanos,
893 end_nanos,
894 clock.get_time_ns(),
895 params,
896 ));
897
898 if let Err(e) = sender.send(DataEvent::Response(response)) {
899 log::error!("Failed to send instruments response: {e}");
900 }
901 }
902 Err(e) => {
903 log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
904 }
905 }
906 });
907
908 Ok(())
909 }
910
911 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
912 log::debug!("Requesting instrument: {}", request.instrument_id);
913
914 let http = self.http_client.clone();
915 let sender = self.data_sender.clone();
916 let instruments_cache = self.instruments.clone();
917 let coin_map = self.coin_to_instrument_id.clone();
918 let ws_instruments = self.ws_client.instruments_cache();
919 let instrument_id = request.instrument_id;
920 let request_id = request.request_id;
921 let client_id = request.client_id.unwrap_or(self.client_id);
922 let start_nanos = datetime_to_unix_nanos(request.start);
923 let end_nanos = datetime_to_unix_nanos(request.end);
924 let params = request.params;
925 let clock = self.clock;
926
927 get_runtime().spawn(async move {
928 match http.request_instruments().await {
929 Ok(all_instruments) => {
930 instruments_cache.rcu(|instruments_map| {
931 coin_map.rcu(|coin_to_id| {
932 for instrument in &all_instruments {
933 let id = instrument.id();
934 instruments_map.insert(id, instrument.clone());
935 let coin = instrument.raw_symbol().inner();
936 coin_to_id.insert(coin, id);
937 ws_instruments.insert(coin, instrument.clone());
938 }
939 });
940 });
941
942 if let Some(instrument) = all_instruments
943 .into_iter()
944 .find(|i| i.id() == instrument_id)
945 {
946 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
947 request_id,
948 client_id,
949 instrument.id(),
950 instrument,
951 start_nanos,
952 end_nanos,
953 clock.get_time_ns(),
954 params,
955 )));
956
957 if let Err(e) = sender.send(DataEvent::Response(response)) {
958 log::error!("Failed to send instrument response: {e}");
959 }
960 } else {
961 log::error!("Instrument not found: {instrument_id}");
962 }
963 }
964 Err(e) => {
965 log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
966 }
967 }
968 });
969
970 Ok(())
971 }
972
973 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
974 log::debug!("Requesting bars for {}", request.bar_type);
975
976 let http = self.http_client.clone();
977 let sender = self.data_sender.clone();
978 let bar_type = request.bar_type;
979 let start = request.start;
980 let end = request.end;
981 let limit = request.limit.map(|n| n.get() as u32);
982 let request_id = request.request_id;
983 let client_id = request.client_id.unwrap_or(self.client_id);
984 let params = request.params;
985 let clock = self.clock;
986 let start_nanos = datetime_to_unix_nanos(start);
987 let end_nanos = datetime_to_unix_nanos(end);
988 let instruments = Arc::clone(&self.instruments);
989
990 get_runtime().spawn(async move {
991 match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
992 Ok(bars) => {
993 let response = DataResponse::Bars(BarsResponse::new(
994 request_id,
995 client_id,
996 bar_type,
997 bars,
998 start_nanos,
999 end_nanos,
1000 clock.get_time_ns(),
1001 params,
1002 ));
1003
1004 if let Err(e) = sender.send(DataEvent::Response(response)) {
1005 log::error!("Failed to send bars response: {e}");
1006 }
1007 }
1008 Err(e) => log::error!("Bar request failed: {e:?}"),
1009 }
1010 });
1011
1012 Ok(())
1013 }
1014
1015 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1016 anyhow::bail!(
1021 "Historical trade requests are not supported by Hyperliquid for {}; \
1022 subscribe to trades via WebSocket for live trade ticks",
1023 request.instrument_id,
1024 )
1025 }
1026
1027 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1028 let instrument_id = request.instrument_id;
1029 log::debug!("Requesting funding rates for {instrument_id}");
1030
1031 let instruments = self.instruments.load();
1032 let instrument = instruments
1033 .get(&instrument_id)
1034 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1035
1036 if !matches!(instrument, InstrumentAny::CryptoPerpetual(_)) {
1037 anyhow::bail!("Funding rates are only available for perpetual instruments");
1038 }
1039
1040 let coin = instrument.raw_symbol().to_string();
1041 let http = self.http_client.clone();
1042 let sender = self.data_sender.clone();
1043 let client_id = request.client_id.unwrap_or(self.client_id);
1044 let request_id = request.request_id;
1045 let params = request.params;
1046 let clock = self.clock;
1047 let limit = request.limit.map(|n| n.get());
1048 let start_dt = request.start;
1049 let end_dt = request.end;
1050 let start_nanos = datetime_to_unix_nanos(start_dt);
1051 let end_nanos = datetime_to_unix_nanos(end_dt);
1052
1053 let now_ms = Utc::now().timestamp_millis() as u64;
1054
1055 let default_lookback_ms: u64 = 7 * 86_400_000;
1057 let start_ms = match start_dt {
1058 Some(dt) => dt.timestamp_millis().max(0) as u64,
1059 None => now_ms.saturating_sub(default_lookback_ms),
1060 };
1061 let end_ms = end_dt.map(|dt| dt.timestamp_millis().max(0) as u64);
1062
1063 get_runtime().spawn(async move {
1064 match http.info_funding_history(&coin, start_ms, end_ms).await {
1065 Ok(entries) => {
1066 let mut funding_rates: Vec<FundingRateUpdate> = entries
1067 .iter()
1068 .filter_map(
1069 |entry| match funding_entry_to_update(entry, instrument_id) {
1070 Ok(update) => Some(update),
1071 Err(e) => {
1072 log::warn!(
1073 "Skipping funding history entry for {instrument_id}: {e}",
1074 );
1075 None
1076 }
1077 },
1078 )
1079 .collect();
1080
1081 if let Some(limit) = limit
1082 && funding_rates.len() > limit
1083 {
1084 funding_rates.truncate(limit);
1085 }
1086
1087 log::debug!(
1088 "Fetched {} funding rates for {instrument_id}",
1089 funding_rates.len(),
1090 );
1091
1092 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1093 request_id,
1094 client_id,
1095 instrument_id,
1096 funding_rates,
1097 start_nanos,
1098 end_nanos,
1099 clock.get_time_ns(),
1100 params,
1101 ));
1102
1103 if let Err(e) = sender.send(DataEvent::Response(response)) {
1104 log::error!("Failed to send funding rates response: {e}");
1105 }
1106 }
1107 Err(e) => log::error!("Funding rates request failed for {instrument_id}: {e:?}"),
1108 }
1109 });
1110
1111 Ok(())
1112 }
1113
1114 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1115 let instrument_id = request.instrument_id;
1116 let instruments = self.instruments.load();
1117 let instrument = instruments
1118 .get(&instrument_id)
1119 .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1120
1121 let raw_symbol = instrument.raw_symbol().to_string();
1122 let price_precision = instrument.price_precision();
1123 let size_precision = instrument.size_precision();
1124 let depth = request.depth.map(|d| d.get());
1125
1126 let http = self.http_client.clone();
1127 let sender = self.data_sender.clone();
1128 let client_id = request.client_id.unwrap_or(self.client_id);
1129 let request_id = request.request_id;
1130 let params = request.params;
1131 let clock = self.clock;
1132
1133 get_runtime().spawn(async move {
1134 match http.info_l2_book(&raw_symbol).await {
1135 Ok(l2_book) => {
1136 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1137 let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
1138
1139 let all_bids = l2_book
1140 .levels
1141 .first()
1142 .map_or([].as_slice(), |v| v.as_slice());
1143 let all_asks = l2_book
1144 .levels
1145 .get(1)
1146 .map_or([].as_slice(), |v| v.as_slice());
1147
1148 let bids = match depth {
1149 Some(d) if d < all_bids.len() => &all_bids[..d],
1150 _ => all_bids,
1151 };
1152 let asks = match depth {
1153 Some(d) if d < all_asks.len() => &all_asks[..d],
1154 _ => all_asks,
1155 };
1156
1157 for (i, level) in bids.iter().enumerate() {
1158 let px: f64 = match level.px.parse() {
1159 Ok(v) => v,
1160 Err(_) => continue,
1161 };
1162 let sz: f64 = match level.sz.parse() {
1163 Ok(v) => v,
1164 Err(_) => continue,
1165 };
1166
1167 if sz > 0.0 {
1168 let price = Price::new(px, price_precision);
1169 let size = Quantity::new(sz, size_precision);
1170 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1171 book.add(order, 0, i as u64, ts_event);
1172 }
1173 }
1174
1175 let bids_len = bids.len();
1176
1177 for (i, level) in asks.iter().enumerate() {
1178 let px: f64 = match level.px.parse() {
1179 Ok(v) => v,
1180 Err(_) => continue,
1181 };
1182 let sz: f64 = match level.sz.parse() {
1183 Ok(v) => v,
1184 Err(_) => continue,
1185 };
1186
1187 if sz > 0.0 {
1188 let price = Price::new(px, price_precision);
1189 let size = Quantity::new(sz, size_precision);
1190 let order =
1191 BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1192 book.add(order, 0, (bids_len + i) as u64, ts_event);
1193 }
1194 }
1195
1196 log::info!(
1197 "Fetched order book for {instrument_id} with {} bids and {} asks",
1198 bids.len(),
1199 asks.len(),
1200 );
1201
1202 let response = DataResponse::Book(BookResponse::new(
1203 request_id,
1204 client_id,
1205 instrument_id,
1206 book,
1207 None,
1208 None,
1209 clock.get_time_ns(),
1210 params,
1211 ));
1212
1213 if let Err(e) = sender.send(DataEvent::Response(response)) {
1214 log::error!("Failed to send book snapshot response: {e}");
1215 }
1216 }
1217 Err(e) => log::error!("Book snapshot request failed for {instrument_id}: {e:?}"),
1218 }
1219 });
1220
1221 Ok(())
1222 }
1223}
1224
1225pub(crate) fn parse_book_precision_params(
1228 params: Option<&Params>,
1229) -> anyhow::Result<(Option<u32>, Option<u32>)> {
1230 let Some(params) = params else {
1231 return Ok((None, None));
1232 };
1233
1234 let read_u32 = |key: &str| -> anyhow::Result<Option<u32>> {
1235 match params.get(key) {
1236 None => Ok(None),
1237 Some(v) => v
1238 .as_u64()
1239 .and_then(|n| u32::try_from(n).ok())
1240 .ok_or_else(|| anyhow::anyhow!("`{key}` must be a positive u32"))
1241 .map(Some),
1242 }
1243 };
1244
1245 Ok((read_u32("n_sig_figs")?, read_u32("mantissa")?))
1246}
1247
1248pub(crate) fn funding_entry_to_update(
1251 entry: &HyperliquidFundingHistoryEntry,
1252 instrument_id: InstrumentId,
1253) -> anyhow::Result<FundingRateUpdate> {
1254 let rate: Decimal = entry
1255 .funding_rate
1256 .parse()
1257 .with_context(|| format!("invalid fundingRate '{}'", entry.funding_rate))?;
1258 let ts = UnixNanos::from(entry.time * 1_000_000);
1259 Ok(FundingRateUpdate::new(
1260 instrument_id,
1261 rate,
1262 Some(60),
1263 None,
1264 ts,
1265 ts,
1266 ))
1267}
1268
1269pub(crate) fn candle_to_bar(
1270 candle: &HyperliquidCandle,
1271 bar_type: BarType,
1272 price_precision: u8,
1273 size_precision: u8,
1274) -> anyhow::Result<Bar> {
1275 let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1276 let ts_event = ts_init;
1277
1278 let open = candle.open.parse::<f64>().context("parse open price")?;
1279 let high = candle.high.parse::<f64>().context("parse high price")?;
1280 let low = candle.low.parse::<f64>().context("parse low price")?;
1281 let close = candle.close.parse::<f64>().context("parse close price")?;
1282 let volume = candle.volume.parse::<f64>().context("parse volume")?;
1283
1284 Ok(Bar::new(
1285 bar_type,
1286 Price::new(open, price_precision),
1287 Price::new(high, price_precision),
1288 Price::new(low, price_precision),
1289 Price::new(close, price_precision),
1290 Quantity::new(volume, size_precision),
1291 ts_event,
1292 ts_init,
1293 ))
1294}
1295
1296async fn request_bars_from_http(
1298 http_client: HyperliquidHttpClient,
1299 bar_type: BarType,
1300 start: Option<DateTime<Utc>>,
1301 end: Option<DateTime<Utc>>,
1302 limit: Option<u32>,
1303 instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1304) -> anyhow::Result<Vec<Bar>> {
1305 let instrument_id = bar_type.instrument_id();
1307 let instrument = instruments
1308 .load()
1309 .get(&instrument_id)
1310 .cloned()
1311 .context("instrument not found in cache")?;
1312
1313 let price_precision = instrument.price_precision();
1314 let size_precision = instrument.size_precision();
1315 let raw_symbol = instrument.raw_symbol();
1316 let coin = raw_symbol.as_str();
1317
1318 let interval = bar_type_to_interval(&bar_type)?;
1319
1320 let now = Utc::now();
1322 let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1323 let start_time = if let Some(start) = start {
1324 start.timestamp_millis() as u64
1325 } else {
1326 let spec = bar_type.spec();
1328 let step_ms = match spec.aggregation {
1329 BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1330 BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1331 BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1332 _ => 60_000,
1333 };
1334 end_time.saturating_sub(1000 * step_ms)
1335 };
1336
1337 let candles = http_client
1338 .info_candle_snapshot(coin, interval, start_time, end_time)
1339 .await
1340 .context("failed to fetch candle snapshot from Hyperliquid")?;
1341
1342 let mut bars: Vec<Bar> = candles
1343 .iter()
1344 .filter_map(|candle| {
1345 candle_to_bar(candle, bar_type, price_precision, size_precision)
1346 .map_err(|e| {
1347 log::warn!("Failed to convert candle to bar: {e}");
1348 e
1349 })
1350 .ok()
1351 })
1352 .collect();
1353
1354 if let Some(limit) = limit
1355 && bars.len() > limit as usize
1356 {
1357 bars = bars.into_iter().take(limit as usize).collect();
1358 }
1359
1360 log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1361 Ok(bars)
1362}
1363
1364#[cfg(test)]
1365mod tests {
1366 use rstest::rstest;
1367 use rust_decimal_macros::dec;
1368 use ustr::Ustr;
1369
1370 use super::*;
1371 use crate::common::testing::load_test_data;
1372
1373 fn btc_perp_id() -> InstrumentId {
1374 InstrumentId::from("BTC-PERP.HYPERLIQUID")
1375 }
1376
1377 #[rstest]
1378 fn test_funding_entry_to_update_parses_positive_rate() {
1379 let entry = HyperliquidFundingHistoryEntry {
1380 coin: Ustr::from("BTC"),
1381 funding_rate: "0.0000125".to_string(),
1382 premium: Some("0.00029005".to_string()),
1383 time: 1769908800000,
1384 };
1385 let instrument_id = btc_perp_id();
1386
1387 let update = funding_entry_to_update(&entry, instrument_id).unwrap();
1388
1389 assert_eq!(update.instrument_id, instrument_id);
1390 assert_eq!(update.rate, dec!(0.0000125));
1391 assert_eq!(update.interval, Some(60));
1392 assert!(update.next_funding_ns.is_none());
1393 assert_eq!(update.ts_event, UnixNanos::from(1769908800000 * 1_000_000));
1394 assert_eq!(update.ts_init, update.ts_event);
1395 }
1396
1397 #[rstest]
1398 fn test_funding_entry_to_update_handles_negative_rate() {
1399 let entry = HyperliquidFundingHistoryEntry {
1400 coin: Ustr::from("BTC"),
1401 funding_rate: "-0.0000081".to_string(),
1402 premium: None,
1403 time: 1769912400000,
1404 };
1405 let update = funding_entry_to_update(&entry, btc_perp_id()).unwrap();
1406 assert_eq!(update.rate, dec!(-0.0000081));
1407 }
1408
1409 #[rstest]
1410 fn test_funding_entry_to_update_rejects_invalid_rate() {
1411 let entry = HyperliquidFundingHistoryEntry {
1412 coin: Ustr::from("BTC"),
1413 funding_rate: "not-a-number".to_string(),
1414 premium: None,
1415 time: 1769912400000,
1416 };
1417 let result = funding_entry_to_update(&entry, btc_perp_id());
1418 assert!(result.is_err());
1419 }
1420
1421 #[rstest]
1422 fn test_parse_book_precision_params_none() {
1423 let (n, m) = parse_book_precision_params(None).unwrap();
1424 assert_eq!(n, None);
1425 assert_eq!(m, None);
1426 }
1427
1428 fn make_params(json: serde_json::Value) -> Params {
1429 serde_json::from_value(json).expect("valid params payload")
1430 }
1431
1432 #[rstest]
1433 fn test_parse_book_precision_params_only_n_sig_figs() {
1434 let params = make_params(serde_json::json!({"n_sig_figs": 4}));
1435 let (n, m) = parse_book_precision_params(Some(¶ms)).unwrap();
1436 assert_eq!(n, Some(4));
1437 assert_eq!(m, None);
1438 }
1439
1440 #[rstest]
1441 fn test_parse_book_precision_params_both() {
1442 let params = make_params(serde_json::json!({"n_sig_figs": 5, "mantissa": 2}));
1443 let (n, m) = parse_book_precision_params(Some(¶ms)).unwrap();
1444 assert_eq!(n, Some(5));
1445 assert_eq!(m, Some(2));
1446 }
1447
1448 #[rstest]
1449 fn test_parse_book_precision_params_rejects_negative() {
1450 let params = make_params(serde_json::json!({"n_sig_figs": -1}));
1451 let err = parse_book_precision_params(Some(¶ms)).unwrap_err();
1452 assert!(err.to_string().contains("n_sig_figs"));
1453 }
1454
1455 #[rstest]
1456 fn test_funding_history_fixture_parses() {
1457 let entries: Vec<HyperliquidFundingHistoryEntry> =
1458 load_test_data("http_funding_history.json");
1459 assert_eq!(entries.len(), 3);
1460 assert_eq!(entries[0].coin.as_str(), "BTC");
1461 assert_eq!(entries[0].funding_rate, "0.0000125");
1462 assert_eq!(entries[0].premium.as_deref(), Some("0.00029005"));
1463 assert!(entries[2].premium.is_none());
1464
1465 let updates: Vec<FundingRateUpdate> = entries
1466 .iter()
1467 .map(|e| funding_entry_to_update(e, btc_perp_id()).unwrap())
1468 .collect();
1469 assert_eq!(updates.len(), 3);
1470 assert_eq!(updates[0].rate, dec!(0.0000125));
1471 assert_eq!(updates[1].rate, dec!(-0.0000081));
1472 assert_eq!(updates[2].rate, dec!(0.0000033));
1473 }
1474}