Skip to main content

nautilus_hyperliquid/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::sync::{
17    Arc,
18    atomic::{AtomicBool, Ordering},
19};
20
21use ahash::AHashMap;
22use anyhow::Context;
23use chrono::{DateTime, Utc};
24use nautilus_common::{
25    clients::DataClient,
26    live::{runner::get_data_event_sender, runtime::get_runtime},
27    messages::{
28        DataEvent,
29        data::{
30            BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
31            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
32            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
33            SubscribeBookDeltas, SubscribeBookDepth10, SubscribeFundingRates, SubscribeIndexPrices,
34            SubscribeInstrument, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
35            UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeBookDepth10,
36            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeMarkPrices,
37            UnsubscribeQuotes, UnsubscribeTrades,
38        },
39    },
40};
41use nautilus_core::{
42    AtomicMap, Params, UnixNanos,
43    datetime::datetime_to_unix_nanos,
44    time::{AtomicTime, get_atomic_clock_realtime},
45};
46use nautilus_model::{
47    data::{Bar, BarType, BookOrder, Data, FundingRateUpdate, OrderBookDeltas_API},
48    enums::{BarAggregation, BookType, OrderSide},
49    identifiers::{ClientId, InstrumentId, Venue},
50    instruments::{Instrument, InstrumentAny},
51    orderbook::OrderBook,
52    types::{Price, Quantity},
53};
54use rust_decimal::Decimal;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57use ustr::Ustr;
58
59use crate::{
60    common::{
61        consts::HYPERLIQUID_VENUE,
62        credential::{Secrets, credential_env_vars},
63        parse::bar_type_to_interval,
64    },
65    config::HyperliquidDataClientConfig,
66    http::{
67        client::HyperliquidHttpClient,
68        models::{HyperliquidCandle, HyperliquidFundingHistoryEntry},
69    },
70    websocket::{
71        client::HyperliquidWebSocketClient,
72        messages::{HyperliquidWsMessage, NautilusWsMessage},
73        parse::{
74            parse_ws_candle, parse_ws_order_book_deltas, parse_ws_quote_tick, parse_ws_trade_tick,
75        },
76    },
77};
78
/// Live market data client for the Hyperliquid venue.
///
/// Combines an HTTP client (used to fetch instrument definitions) with a
/// WebSocket client (used for streaming subscriptions) and forwards the
/// resulting events over `data_sender`.
#[derive(Debug)]
pub struct HyperliquidDataClient {
    /// Identifier reported through `DataClient::client_id`.
    client_id: ClientId,
    /// Configuration this client was constructed with.
    #[allow(dead_code)]
    config: HyperliquidDataClientConfig,
    /// HTTP client used to request instruments during bootstrap.
    http_client: HyperliquidHttpClient,
    /// WebSocket client; cloned into spawned tasks for (un)subscribe requests.
    ws_client: HyperliquidWebSocketClient,
    /// Connection flag, set by `connect` and cleared by `stop`/`reset`/`disconnect`.
    is_connected: AtomicBool,
    /// Token observed by the WebSocket consumption loop to know when to exit.
    cancellation_token: CancellationToken,
    /// Join handles of spawned background tasks, awaited in `disconnect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data and instrument events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, filled during bootstrap.
    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
    // Maps coin symbols (e.g., "BTC") to instrument IDs (e.g., "BTC-PERP")
    coin_to_instrument_id: Arc<AtomicMap<Ustr, InstrumentId>>,
    /// Clock used to stamp `ts_init` on parsed WebSocket data.
    clock: &'static AtomicTime,
    /// Initialised to `false` in `new`; not read anywhere in the visible part of this file.
    #[allow(dead_code)]
    instrument_refresh_active: bool,
}
97
98impl HyperliquidDataClient {
99    /// Creates a new [`HyperliquidDataClient`] instance.
100    ///
101    /// # Errors
102    ///
103    /// Returns an error if the HTTP client fails to initialize.
104    pub fn new(client_id: ClientId, config: HyperliquidDataClientConfig) -> anyhow::Result<Self> {
105        let clock = get_atomic_clock_realtime();
106        let data_sender = get_data_event_sender();
107
108        // Only fall back to unauthenticated when credentials are absent,
109        // not when they're invalid (fail fast on malformed keys)
110        let (pk_var, _) = credential_env_vars(config.environment);
111        let has_credentials = config.has_credentials() || std::env::var(pk_var).is_ok();
112
113        let mut http_client = if has_credentials {
114            let secrets =
115                Secrets::resolve(config.private_key.as_deref(), None, config.environment)?;
116            HyperliquidHttpClient::with_secrets(
117                &secrets,
118                config.http_timeout_secs,
119                config.proxy_url.clone(),
120            )?
121        } else {
122            HyperliquidHttpClient::new(
123                config.environment,
124                config.http_timeout_secs,
125                config.proxy_url.clone(),
126            )?
127        };
128
129        // Apply URL overrides from config (used for testing with mock servers)
130        if let Some(url) = &config.base_url_http {
131            http_client.set_base_info_url(url.clone());
132        }
133
134        let ws_url = config.base_url_ws.clone();
135        let ws_client = HyperliquidWebSocketClient::new(
136            ws_url,
137            config.environment,
138            None,
139            config.transport_backend,
140            config.proxy_url.clone(),
141        );
142
143        Ok(Self {
144            client_id,
145            config,
146            http_client,
147            ws_client,
148            is_connected: AtomicBool::new(false),
149            cancellation_token: CancellationToken::new(),
150            tasks: Vec::new(),
151            data_sender,
152            instruments: Arc::new(AtomicMap::new()),
153            coin_to_instrument_id: Arc::new(AtomicMap::new()),
154            clock,
155            instrument_refresh_active: false,
156        })
157    }
158
    /// Returns the Hyperliquid venue constant (`HYPERLIQUID_VENUE`).
    fn venue(&self) -> Venue {
        *HYPERLIQUID_VENUE
    }
162
163    async fn bootstrap_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
164        let instruments = self
165            .http_client
166            .request_instruments()
167            .await
168            .context("failed to fetch instruments during bootstrap")?;
169
170        self.instruments.rcu(|m| {
171            for instrument in &instruments {
172                m.insert(instrument.id(), instrument.clone());
173            }
174        });
175
176        self.coin_to_instrument_id.rcu(|m| {
177            for instrument in &instruments {
178                m.insert(instrument.raw_symbol().inner(), instrument.id());
179            }
180        });
181
182        for instrument in &instruments {
183            self.ws_client.cache_instrument(instrument.clone());
184        }
185
186        log::info!(
187            "Bootstrapped {} instruments with {} coin mappings",
188            self.instruments.len(),
189            self.coin_to_instrument_id.len()
190        );
191        Ok(instruments)
192    }
193
194    async fn spawn_ws(&mut self) -> anyhow::Result<()> {
195        // Clone client before connecting so the clone can have out_rx set
196        let mut ws_client = self.ws_client.clone();
197
198        ws_client
199            .connect()
200            .await
201            .context("failed to connect to Hyperliquid WebSocket")?;
202
203        // Transfer task handle to original so disconnect() can await it
204        if let Some(handle) = ws_client.take_task_handle() {
205            self.ws_client.set_task_handle(handle);
206        }
207
208        let data_sender = self.data_sender.clone();
209        let cancellation_token = self.cancellation_token.clone();
210
211        let task = get_runtime().spawn(async move {
212            log::info!("Hyperliquid WebSocket consumption loop started");
213
214            loop {
215                tokio::select! {
216                    () = cancellation_token.cancelled() => {
217                        log::info!("WebSocket consumption loop cancelled");
218                        break;
219                    }
220                    msg_opt = ws_client.next_event() => {
221                        if let Some(msg) = msg_opt {
222                            match msg {
223                                NautilusWsMessage::Trades(trades) => {
224                                    for trade in trades {
225                                        if let Err(e) = data_sender
226                                            .send(DataEvent::Data(Data::Trade(trade)))
227                                        {
228                                            log::error!("Failed to send trade tick: {e}");
229                                        }
230                                    }
231                                }
232                                NautilusWsMessage::Quote(quote) => {
233                                    if let Err(e) = data_sender
234                                        .send(DataEvent::Data(Data::Quote(quote)))
235                                    {
236                                        log::error!("Failed to send quote tick: {e}");
237                                    }
238                                }
239                                NautilusWsMessage::Deltas(deltas) => {
240                                    if let Err(e) = data_sender
241                                        .send(DataEvent::Data(Data::Deltas(
242                                            OrderBookDeltas_API::new(deltas),
243                                        )))
244                                    {
245                                        log::error!("Failed to send order book deltas: {e}");
246                                    }
247                                }
248                                NautilusWsMessage::Depth10(depth) => {
249                                    if let Err(e) =
250                                        data_sender.send(DataEvent::Data(Data::Depth10(depth)))
251                                    {
252                                        log::error!("Failed to send order book depth10: {e}");
253                                    }
254                                }
255                                NautilusWsMessage::Candle(bar) => {
256                                    if let Err(e) = data_sender
257                                        .send(DataEvent::Data(Data::Bar(bar)))
258                                    {
259                                        log::error!("Failed to send bar: {e}");
260                                    }
261                                }
262                                NautilusWsMessage::MarkPrice(update) => {
263                                    if let Err(e) = data_sender
264                                        .send(DataEvent::Data(Data::MarkPriceUpdate(update)))
265                                    {
266                                        log::error!("Failed to send mark price update: {e}");
267                                    }
268                                }
269                                NautilusWsMessage::IndexPrice(update) => {
270                                    if let Err(e) = data_sender
271                                        .send(DataEvent::Data(Data::IndexPriceUpdate(update)))
272                                    {
273                                        log::error!("Failed to send index price update: {e}");
274                                    }
275                                }
276                                NautilusWsMessage::FundingRate(update) => {
277                                    if let Err(e) = data_sender
278                                        .send(DataEvent::FundingRate(update))
279                                    {
280                                        log::error!("Failed to send funding rate update: {e}");
281                                    }
282                                }
283                                NautilusWsMessage::Reconnected => {
284                                    log::info!("WebSocket reconnected");
285                                }
286                                NautilusWsMessage::Error(e) => {
287                                    log::error!("WebSocket error: {e}");
288                                }
289                                NautilusWsMessage::ExecutionReports(_) => {
290                                    // Handled by execution client
291                                }
292                            }
293                        } else {
294                            // Connection closed or error
295                            log::debug!("WebSocket next_event returned None, stream closed");
296                            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
297                        }
298                    }
299                }
300            }
301
302            log::info!("Hyperliquid WebSocket consumption loop finished");
303        });
304
305        self.tasks.push(task);
306        log::info!("WebSocket consumption task spawned");
307
308        Ok(())
309    }
310
311    #[allow(dead_code)]
312    fn handle_ws_message(
313        msg: HyperliquidWsMessage,
314        ws_client: &HyperliquidWebSocketClient,
315        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
316        instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
317        coin_to_instrument_id: &Arc<AtomicMap<Ustr, InstrumentId>>,
318        _venue: Venue,
319        clock: &'static AtomicTime,
320    ) {
321        match msg {
322            HyperliquidWsMessage::Bbo { data } => {
323                let coin = data.coin;
324                log::debug!("Received BBO message for coin: {coin}");
325
326                let coin_map = coin_to_instrument_id.load();
327                let instrument_id = coin_map.get(&data.coin);
328
329                if let Some(&instrument_id) = instrument_id {
330                    let instruments_map = instruments.load();
331                    if let Some(instrument) = instruments_map.get(&instrument_id) {
332                        let ts_init = clock.get_time_ns();
333
334                        match parse_ws_quote_tick(&data, instrument, ts_init) {
335                            Ok(quote_tick) => {
336                                log::debug!(
337                                    "Parsed quote tick for {}: bid={}, ask={}",
338                                    data.coin,
339                                    quote_tick.bid_price,
340                                    quote_tick.ask_price
341                                );
342
343                                if let Err(e) =
344                                    data_sender.send(DataEvent::Data(Data::Quote(quote_tick)))
345                                {
346                                    log::error!("Failed to send quote tick: {e}");
347                                }
348                            }
349                            Err(e) => {
350                                log::error!("Failed to parse quote tick for {}: {e}", data.coin);
351                            }
352                        }
353                    }
354                } else {
355                    log::warn!(
356                        "Received BBO for unknown coin: {} (no matching instrument found)",
357                        data.coin
358                    );
359                }
360            }
361            HyperliquidWsMessage::Trades { data } => {
362                let count = data.len();
363                log::debug!("Received {count} trade(s)");
364
365                for trade_data in data {
366                    let coin = trade_data.coin;
367                    let coin_map = coin_to_instrument_id.load();
368
369                    if let Some(&instrument_id) = coin_map.get(&coin) {
370                        let instruments_map = instruments.load();
371                        if let Some(instrument) = instruments_map.get(&instrument_id) {
372                            let ts_init = clock.get_time_ns();
373
374                            match parse_ws_trade_tick(&trade_data, instrument, ts_init) {
375                                Ok(trade_tick) => {
376                                    if let Err(e) =
377                                        data_sender.send(DataEvent::Data(Data::Trade(trade_tick)))
378                                    {
379                                        log::error!("Failed to send trade tick: {e}");
380                                    }
381                                }
382                                Err(e) => {
383                                    log::error!("Failed to parse trade tick for {coin}: {e}");
384                                }
385                            }
386                        }
387                    } else {
388                        log::warn!("Received trade for unknown coin: {coin}");
389                    }
390                }
391            }
392            HyperliquidWsMessage::L2Book { data } => {
393                let coin = data.coin;
394                log::debug!("Received L2 book update for coin: {coin}");
395
396                let coin_map = coin_to_instrument_id.load();
397                if let Some(&instrument_id) = coin_map.get(&data.coin) {
398                    let instruments_map = instruments.load();
399                    if let Some(instrument) = instruments_map.get(&instrument_id) {
400                        let ts_init = clock.get_time_ns();
401
402                        match parse_ws_order_book_deltas(&data, instrument, ts_init) {
403                            Ok(deltas) => {
404                                if let Err(e) = data_sender.send(DataEvent::Data(Data::Deltas(
405                                    OrderBookDeltas_API::new(deltas),
406                                ))) {
407                                    log::error!("Failed to send order book deltas: {e}");
408                                }
409                            }
410                            Err(e) => {
411                                log::error!(
412                                    "Failed to parse order book deltas for {}: {e}",
413                                    data.coin
414                                );
415                            }
416                        }
417                    }
418                } else {
419                    log::warn!("Received L2 book for unknown coin: {coin}");
420                }
421            }
422            HyperliquidWsMessage::Candle { data } => {
423                let coin = &data.s;
424                let interval = &data.i;
425                log::debug!("Received candle for {coin}:{interval}");
426
427                if let Some(bar_type) = ws_client.get_bar_type(&data.s, &data.i) {
428                    let coin = Ustr::from(&data.s);
429                    let coin_map = coin_to_instrument_id.load();
430
431                    if let Some(&instrument_id) = coin_map.get(&coin) {
432                        let instruments_map = instruments.load();
433                        if let Some(instrument) = instruments_map.get(&instrument_id) {
434                            let ts_init = clock.get_time_ns();
435
436                            match parse_ws_candle(&data, instrument, &bar_type, ts_init) {
437                                Ok(bar) => {
438                                    if let Err(e) =
439                                        data_sender.send(DataEvent::Data(Data::Bar(bar)))
440                                    {
441                                        log::error!("Failed to send bar data: {e}");
442                                    }
443                                }
444                                Err(e) => {
445                                    log::error!("Failed to parse candle for {coin}: {e}");
446                                }
447                            }
448                        }
449                    } else {
450                        log::warn!("Received candle for unknown coin: {coin}");
451                    }
452                } else {
453                    log::debug!("Received candle for {coin}:{interval} but no BarType tracked");
454                }
455            }
456            _ => {
457                log::trace!("Received unhandled WebSocket message: {msg:?}");
458            }
459        }
460    }
461}
462
463impl HyperliquidDataClient {
464    #[allow(dead_code)]
465    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
466        if let Err(e) = sender.send(DataEvent::Data(data)) {
467            log::error!("Failed to emit data event: {e}");
468        }
469    }
470}
471
472#[async_trait::async_trait(?Send)]
473impl DataClient for HyperliquidDataClient {
    /// Returns the identifier this client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
477
478    fn venue(&self) -> Option<Venue> {
479        Some(self.venue())
480    }
481
482    fn start(&mut self) -> anyhow::Result<()> {
483        log::info!(
484            "Starting Hyperliquid data client: client_id={}, environment={:?}, proxy_url={:?}",
485            self.client_id,
486            self.config.environment,
487            self.config.proxy_url,
488        );
489        Ok(())
490    }
491
492    fn stop(&mut self) -> anyhow::Result<()> {
493        log::info!("Stopping Hyperliquid data client {}", self.client_id);
494        self.cancellation_token.cancel();
495        self.is_connected.store(false, Ordering::Relaxed);
496        Ok(())
497    }
498
499    fn reset(&mut self) -> anyhow::Result<()> {
500        log::debug!("Resetting Hyperliquid data client {}", self.client_id);
501        self.is_connected.store(false, Ordering::Relaxed);
502        self.cancellation_token = CancellationToken::new();
503        self.tasks.clear();
504        Ok(())
505    }
506
    /// Disposes of the client by delegating to `stop`.
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::debug!("Disposing Hyperliquid data client {}", self.client_id);
        self.stop()
    }
511
    /// Returns the current value of the connection flag.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
515
516    fn is_disconnected(&self) -> bool {
517        !self.is_connected()
518    }
519
520    async fn connect(&mut self) -> anyhow::Result<()> {
521        if self.is_connected() {
522            return Ok(());
523        }
524
525        let instruments = self
526            .bootstrap_instruments()
527            .await
528            .context("failed to bootstrap instruments")?;
529
530        for instrument in instruments {
531            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
532                log::warn!("Failed to send instrument: {e}");
533            }
534        }
535
536        self.spawn_ws()
537            .await
538            .context("failed to spawn WebSocket client")?;
539
540        self.is_connected.store(true, Ordering::Relaxed);
541        log::info!("Connected: client_id={}", self.client_id);
542
543        Ok(())
544    }
545
546    async fn disconnect(&mut self) -> anyhow::Result<()> {
547        if !self.is_connected() {
548            return Ok(());
549        }
550
551        self.cancellation_token.cancel();
552
553        for task in self.tasks.drain(..) {
554            if let Err(e) = task.await {
555                log::error!("Error waiting for task to complete: {e}");
556            }
557        }
558
559        if let Err(e) = self.ws_client.disconnect().await {
560            log::error!("Error disconnecting WebSocket client: {e}");
561        }
562
563        self.instruments.store(AHashMap::new());
564
565        self.is_connected.store(false, Ordering::Relaxed);
566        log::info!("Disconnected: client_id={}", self.client_id);
567
568        Ok(())
569    }
570
571    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
572        let instruments = self.instruments.load();
573        if let Some(instrument) = instruments.get(&cmd.instrument_id) {
574            if let Err(e) = self
575                .data_sender
576                .send(DataEvent::Instrument(instrument.clone()))
577            {
578                log::error!("Failed to send instrument {}: {e}", cmd.instrument_id);
579            }
580        } else {
581            log::warn!("Instrument {} not found in cache", cmd.instrument_id);
582        }
583        Ok(())
584    }
585
586    fn subscribe_book_deltas(&mut self, subscription: SubscribeBookDeltas) -> anyhow::Result<()> {
587        log::debug!("Subscribing to book deltas: {}", subscription.instrument_id);
588
589        if subscription.book_type != BookType::L2_MBP {
590            anyhow::bail!("Hyperliquid only supports L2_MBP order book deltas");
591        }
592
593        let ws = self.ws_client.clone();
594        let instrument_id = subscription.instrument_id;
595        let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
596
597        get_runtime().spawn(async move {
598            if let Err(e) = ws
599                .subscribe_book_with_options(instrument_id, n_sig_figs, mantissa)
600                .await
601            {
602                log::error!("Failed to subscribe to book deltas: {e:?}");
603            }
604        });
605
606        Ok(())
607    }
608
609    fn subscribe_book_depth10(&mut self, subscription: SubscribeBookDepth10) -> anyhow::Result<()> {
610        log::debug!(
611            "Subscribing to book depth10: {}",
612            subscription.instrument_id
613        );
614
615        if subscription.book_type != BookType::L2_MBP {
616            anyhow::bail!("Hyperliquid only supports L2_MBP order book depth10");
617        }
618
619        let ws = self.ws_client.clone();
620        let instrument_id = subscription.instrument_id;
621        let (n_sig_figs, mantissa) = parse_book_precision_params(subscription.params.as_ref())?;
622
623        get_runtime().spawn(async move {
624            if let Err(e) = ws
625                .subscribe_book_depth10_with_options(instrument_id, n_sig_figs, mantissa)
626                .await
627            {
628                log::error!("Failed to subscribe to book depth10: {e:?}");
629            }
630        });
631
632        Ok(())
633    }
634
635    fn subscribe_quotes(&mut self, subscription: SubscribeQuotes) -> anyhow::Result<()> {
636        log::debug!("Subscribing to quotes: {}", subscription.instrument_id);
637
638        let ws = self.ws_client.clone();
639        let instrument_id = subscription.instrument_id;
640
641        get_runtime().spawn(async move {
642            if let Err(e) = ws.subscribe_quotes(instrument_id).await {
643                log::error!("Failed to subscribe to quotes: {e:?}");
644            }
645        });
646
647        Ok(())
648    }
649
650    fn subscribe_trades(&mut self, subscription: SubscribeTrades) -> anyhow::Result<()> {
651        log::debug!("Subscribing to trades: {}", subscription.instrument_id);
652
653        let ws = self.ws_client.clone();
654        let instrument_id = subscription.instrument_id;
655
656        get_runtime().spawn(async move {
657            if let Err(e) = ws.subscribe_trades(instrument_id).await {
658                log::error!("Failed to subscribe to trades: {e:?}");
659            }
660        });
661
662        Ok(())
663    }
664
665    fn subscribe_mark_prices(&mut self, cmd: SubscribeMarkPrices) -> anyhow::Result<()> {
666        let ws = self.ws_client.clone();
667        let instrument_id = cmd.instrument_id;
668
669        get_runtime().spawn(async move {
670            if let Err(e) = ws.subscribe_mark_prices(instrument_id).await {
671                log::error!("Failed to subscribe to mark prices: {e:?}");
672            }
673        });
674
675        Ok(())
676    }
677
678    fn subscribe_index_prices(&mut self, cmd: SubscribeIndexPrices) -> anyhow::Result<()> {
679        let ws = self.ws_client.clone();
680        let instrument_id = cmd.instrument_id;
681
682        get_runtime().spawn(async move {
683            if let Err(e) = ws.subscribe_index_prices(instrument_id).await {
684                log::error!("Failed to subscribe to index prices: {e:?}");
685            }
686        });
687
688        Ok(())
689    }
690
691    fn subscribe_funding_rates(&mut self, cmd: SubscribeFundingRates) -> anyhow::Result<()> {
692        let ws = self.ws_client.clone();
693        let instrument_id = cmd.instrument_id;
694
695        get_runtime().spawn(async move {
696            if let Err(e) = ws.subscribe_funding_rates(instrument_id).await {
697                log::error!("Failed to subscribe to funding rates: {e:?}");
698            }
699        });
700
701        Ok(())
702    }
703
704    fn subscribe_bars(&mut self, subscription: SubscribeBars) -> anyhow::Result<()> {
705        log::debug!("Subscribing to bars: {}", subscription.bar_type);
706
707        let instrument_id = subscription.bar_type.instrument_id();
708        if !self.instruments.contains_key(&instrument_id) {
709            anyhow::bail!("Instrument {instrument_id} not found");
710        }
711
712        let bar_type = subscription.bar_type;
713        let ws = self.ws_client.clone();
714
715        get_runtime().spawn(async move {
716            if let Err(e) = ws.subscribe_bars(bar_type).await {
717                log::error!("Failed to subscribe to bars: {e:?}");
718            }
719        });
720
721        Ok(())
722    }
723
724    fn unsubscribe_book_deltas(
725        &mut self,
726        unsubscription: &UnsubscribeBookDeltas,
727    ) -> anyhow::Result<()> {
728        log::debug!(
729            "Unsubscribing from book deltas: {}",
730            unsubscription.instrument_id
731        );
732
733        let ws = self.ws_client.clone();
734        let instrument_id = unsubscription.instrument_id;
735
736        get_runtime().spawn(async move {
737            if let Err(e) = ws.unsubscribe_book(instrument_id).await {
738                log::error!("Failed to unsubscribe from book deltas: {e:?}");
739            }
740        });
741
742        Ok(())
743    }
744
745    fn unsubscribe_book_depth10(
746        &mut self,
747        unsubscription: &UnsubscribeBookDepth10,
748    ) -> anyhow::Result<()> {
749        log::debug!(
750            "Unsubscribing from book depth10: {}",
751            unsubscription.instrument_id
752        );
753
754        let ws = self.ws_client.clone();
755        let instrument_id = unsubscription.instrument_id;
756
757        get_runtime().spawn(async move {
758            if let Err(e) = ws.unsubscribe_book_depth10(instrument_id).await {
759                log::error!("Failed to unsubscribe from book depth10: {e:?}");
760            }
761        });
762
763        Ok(())
764    }
765
766    fn unsubscribe_quotes(&mut self, unsubscription: &UnsubscribeQuotes) -> anyhow::Result<()> {
767        log::debug!(
768            "Unsubscribing from quotes: {}",
769            unsubscription.instrument_id
770        );
771
772        let ws = self.ws_client.clone();
773        let instrument_id = unsubscription.instrument_id;
774
775        get_runtime().spawn(async move {
776            if let Err(e) = ws.unsubscribe_quotes(instrument_id).await {
777                log::error!("Failed to unsubscribe from quotes: {e:?}");
778            }
779        });
780
781        Ok(())
782    }
783
784    fn unsubscribe_trades(&mut self, unsubscription: &UnsubscribeTrades) -> anyhow::Result<()> {
785        log::debug!(
786            "Unsubscribing from trades: {}",
787            unsubscription.instrument_id
788        );
789
790        let ws = self.ws_client.clone();
791        let instrument_id = unsubscription.instrument_id;
792
793        get_runtime().spawn(async move {
794            if let Err(e) = ws.unsubscribe_trades(instrument_id).await {
795                log::error!("Failed to unsubscribe from trades: {e:?}");
796            }
797        });
798
799        Ok(())
800    }
801
802    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
803        let ws = self.ws_client.clone();
804        let instrument_id = cmd.instrument_id;
805
806        get_runtime().spawn(async move {
807            if let Err(e) = ws.unsubscribe_mark_prices(instrument_id).await {
808                log::error!("Failed to unsubscribe from mark prices: {e:?}");
809            }
810        });
811
812        Ok(())
813    }
814
815    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
816        let ws = self.ws_client.clone();
817        let instrument_id = cmd.instrument_id;
818
819        get_runtime().spawn(async move {
820            if let Err(e) = ws.unsubscribe_index_prices(instrument_id).await {
821                log::error!("Failed to unsubscribe from index prices: {e:?}");
822            }
823        });
824
825        Ok(())
826    }
827
828    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
829        let ws = self.ws_client.clone();
830        let instrument_id = cmd.instrument_id;
831
832        get_runtime().spawn(async move {
833            if let Err(e) = ws.unsubscribe_funding_rates(instrument_id).await {
834                log::error!("Failed to unsubscribe from funding rates: {e:?}");
835            }
836        });
837
838        Ok(())
839    }
840
841    fn unsubscribe_bars(&mut self, unsubscription: &UnsubscribeBars) -> anyhow::Result<()> {
842        log::debug!("Unsubscribing from bars: {}", unsubscription.bar_type);
843
844        let bar_type = unsubscription.bar_type;
845        let ws = self.ws_client.clone();
846
847        get_runtime().spawn(async move {
848            if let Err(e) = ws.unsubscribe_bars(bar_type).await {
849                log::error!("Failed to unsubscribe from bars: {e:?}");
850            }
851        });
852
853        Ok(())
854    }
855
856    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
857        log::debug!("Requesting all instruments");
858
859        let http = self.http_client.clone();
860        let sender = self.data_sender.clone();
861        let instruments_cache = self.instruments.clone();
862        let coin_map = self.coin_to_instrument_id.clone();
863        let ws_instruments = self.ws_client.instruments_cache();
864        let request_id = request.request_id;
865        let client_id = request.client_id.unwrap_or(self.client_id);
866        let venue = self.venue();
867        let start_nanos = datetime_to_unix_nanos(request.start);
868        let end_nanos = datetime_to_unix_nanos(request.end);
869        let params = request.params;
870        let clock = self.clock;
871
872        get_runtime().spawn(async move {
873            match http.request_instruments().await {
874                Ok(instruments) => {
875                    instruments_cache.rcu(|instruments_map| {
876                        coin_map.rcu(|coin_to_id| {
877                            for instrument in &instruments {
878                                let instrument_id = instrument.id();
879                                instruments_map.insert(instrument_id, instrument.clone());
880                                let coin = instrument.raw_symbol().inner();
881                                coin_to_id.insert(coin, instrument_id);
882                                ws_instruments.insert(coin, instrument.clone());
883                            }
884                        });
885                    });
886
887                    let response = DataResponse::Instruments(InstrumentsResponse::new(
888                        request_id,
889                        client_id,
890                        venue,
891                        instruments,
892                        start_nanos,
893                        end_nanos,
894                        clock.get_time_ns(),
895                        params,
896                    ));
897
898                    if let Err(e) = sender.send(DataEvent::Response(response)) {
899                        log::error!("Failed to send instruments response: {e}");
900                    }
901                }
902                Err(e) => {
903                    log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
904                }
905            }
906        });
907
908        Ok(())
909    }
910
911    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
912        log::debug!("Requesting instrument: {}", request.instrument_id);
913
914        let http = self.http_client.clone();
915        let sender = self.data_sender.clone();
916        let instruments_cache = self.instruments.clone();
917        let coin_map = self.coin_to_instrument_id.clone();
918        let ws_instruments = self.ws_client.instruments_cache();
919        let instrument_id = request.instrument_id;
920        let request_id = request.request_id;
921        let client_id = request.client_id.unwrap_or(self.client_id);
922        let start_nanos = datetime_to_unix_nanos(request.start);
923        let end_nanos = datetime_to_unix_nanos(request.end);
924        let params = request.params;
925        let clock = self.clock;
926
927        get_runtime().spawn(async move {
928            match http.request_instruments().await {
929                Ok(all_instruments) => {
930                    instruments_cache.rcu(|instruments_map| {
931                        coin_map.rcu(|coin_to_id| {
932                            for instrument in &all_instruments {
933                                let id = instrument.id();
934                                instruments_map.insert(id, instrument.clone());
935                                let coin = instrument.raw_symbol().inner();
936                                coin_to_id.insert(coin, id);
937                                ws_instruments.insert(coin, instrument.clone());
938                            }
939                        });
940                    });
941
942                    if let Some(instrument) = all_instruments
943                        .into_iter()
944                        .find(|i| i.id() == instrument_id)
945                    {
946                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
947                            request_id,
948                            client_id,
949                            instrument.id(),
950                            instrument,
951                            start_nanos,
952                            end_nanos,
953                            clock.get_time_ns(),
954                            params,
955                        )));
956
957                        if let Err(e) = sender.send(DataEvent::Response(response)) {
958                            log::error!("Failed to send instrument response: {e}");
959                        }
960                    } else {
961                        log::error!("Instrument not found: {instrument_id}");
962                    }
963                }
964                Err(e) => {
965                    log::error!("Failed to fetch instruments from Hyperliquid: {e:?}");
966                }
967            }
968        });
969
970        Ok(())
971    }
972
973    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
974        log::debug!("Requesting bars for {}", request.bar_type);
975
976        let http = self.http_client.clone();
977        let sender = self.data_sender.clone();
978        let bar_type = request.bar_type;
979        let start = request.start;
980        let end = request.end;
981        let limit = request.limit.map(|n| n.get() as u32);
982        let request_id = request.request_id;
983        let client_id = request.client_id.unwrap_or(self.client_id);
984        let params = request.params;
985        let clock = self.clock;
986        let start_nanos = datetime_to_unix_nanos(start);
987        let end_nanos = datetime_to_unix_nanos(end);
988        let instruments = Arc::clone(&self.instruments);
989
990        get_runtime().spawn(async move {
991            match request_bars_from_http(http, bar_type, start, end, limit, instruments).await {
992                Ok(bars) => {
993                    let response = DataResponse::Bars(BarsResponse::new(
994                        request_id,
995                        client_id,
996                        bar_type,
997                        bars,
998                        start_nanos,
999                        end_nanos,
1000                        clock.get_time_ns(),
1001                        params,
1002                    ));
1003
1004                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1005                        log::error!("Failed to send bars response: {e}");
1006                    }
1007                }
1008                Err(e) => log::error!("Bar request failed: {e:?}"),
1009            }
1010        });
1011
1012        Ok(())
1013    }
1014
1015    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1016        // Hyperliquid has no public trade-tape REST endpoint; real-time
1017        // trades are available via the `trades` WebSocket channel and
1018        // account-scoped fills via `userFills`/`userFillsByTime`, but
1019        // market-wide trade history cannot be served.
1020        anyhow::bail!(
1021            "Historical trade requests are not supported by Hyperliquid for {}; \
1022             subscribe to trades via WebSocket for live trade ticks",
1023            request.instrument_id,
1024        )
1025    }
1026
1027    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1028        let instrument_id = request.instrument_id;
1029        log::debug!("Requesting funding rates for {instrument_id}");
1030
1031        let instruments = self.instruments.load();
1032        let instrument = instruments
1033            .get(&instrument_id)
1034            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1035
1036        if !matches!(instrument, InstrumentAny::CryptoPerpetual(_)) {
1037            anyhow::bail!("Funding rates are only available for perpetual instruments");
1038        }
1039
1040        let coin = instrument.raw_symbol().to_string();
1041        let http = self.http_client.clone();
1042        let sender = self.data_sender.clone();
1043        let client_id = request.client_id.unwrap_or(self.client_id);
1044        let request_id = request.request_id;
1045        let params = request.params;
1046        let clock = self.clock;
1047        let limit = request.limit.map(|n| n.get());
1048        let start_dt = request.start;
1049        let end_dt = request.end;
1050        let start_nanos = datetime_to_unix_nanos(start_dt);
1051        let end_nanos = datetime_to_unix_nanos(end_dt);
1052
1053        let now_ms = Utc::now().timestamp_millis() as u64;
1054
1055        // Hyperliquid requires a startTime; default to a 7-day lookback when none given
1056        let default_lookback_ms: u64 = 7 * 86_400_000;
1057        let start_ms = match start_dt {
1058            Some(dt) => dt.timestamp_millis().max(0) as u64,
1059            None => now_ms.saturating_sub(default_lookback_ms),
1060        };
1061        let end_ms = end_dt.map(|dt| dt.timestamp_millis().max(0) as u64);
1062
1063        get_runtime().spawn(async move {
1064            match http.info_funding_history(&coin, start_ms, end_ms).await {
1065                Ok(entries) => {
1066                    let mut funding_rates: Vec<FundingRateUpdate> = entries
1067                        .iter()
1068                        .filter_map(
1069                            |entry| match funding_entry_to_update(entry, instrument_id) {
1070                                Ok(update) => Some(update),
1071                                Err(e) => {
1072                                    log::warn!(
1073                                        "Skipping funding history entry for {instrument_id}: {e}",
1074                                    );
1075                                    None
1076                                }
1077                            },
1078                        )
1079                        .collect();
1080
1081                    if let Some(limit) = limit
1082                        && funding_rates.len() > limit
1083                    {
1084                        funding_rates.truncate(limit);
1085                    }
1086
1087                    log::debug!(
1088                        "Fetched {} funding rates for {instrument_id}",
1089                        funding_rates.len(),
1090                    );
1091
1092                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
1093                        request_id,
1094                        client_id,
1095                        instrument_id,
1096                        funding_rates,
1097                        start_nanos,
1098                        end_nanos,
1099                        clock.get_time_ns(),
1100                        params,
1101                    ));
1102
1103                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1104                        log::error!("Failed to send funding rates response: {e}");
1105                    }
1106                }
1107                Err(e) => log::error!("Funding rates request failed for {instrument_id}: {e:?}"),
1108            }
1109        });
1110
1111        Ok(())
1112    }
1113
1114    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1115        let instrument_id = request.instrument_id;
1116        let instruments = self.instruments.load();
1117        let instrument = instruments
1118            .get(&instrument_id)
1119            .ok_or_else(|| anyhow::anyhow!("Instrument {instrument_id} not found"))?;
1120
1121        let raw_symbol = instrument.raw_symbol().to_string();
1122        let price_precision = instrument.price_precision();
1123        let size_precision = instrument.size_precision();
1124        let depth = request.depth.map(|d| d.get());
1125
1126        let http = self.http_client.clone();
1127        let sender = self.data_sender.clone();
1128        let client_id = request.client_id.unwrap_or(self.client_id);
1129        let request_id = request.request_id;
1130        let params = request.params;
1131        let clock = self.clock;
1132
1133        get_runtime().spawn(async move {
1134            match http.info_l2_book(&raw_symbol).await {
1135                Ok(l2_book) => {
1136                    let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1137                    let ts_event = UnixNanos::from(l2_book.time * 1_000_000);
1138
1139                    let all_bids = l2_book
1140                        .levels
1141                        .first()
1142                        .map_or([].as_slice(), |v| v.as_slice());
1143                    let all_asks = l2_book
1144                        .levels
1145                        .get(1)
1146                        .map_or([].as_slice(), |v| v.as_slice());
1147
1148                    let bids = match depth {
1149                        Some(d) if d < all_bids.len() => &all_bids[..d],
1150                        _ => all_bids,
1151                    };
1152                    let asks = match depth {
1153                        Some(d) if d < all_asks.len() => &all_asks[..d],
1154                        _ => all_asks,
1155                    };
1156
1157                    for (i, level) in bids.iter().enumerate() {
1158                        let px: f64 = match level.px.parse() {
1159                            Ok(v) => v,
1160                            Err(_) => continue,
1161                        };
1162                        let sz: f64 = match level.sz.parse() {
1163                            Ok(v) => v,
1164                            Err(_) => continue,
1165                        };
1166
1167                        if sz > 0.0 {
1168                            let price = Price::new(px, price_precision);
1169                            let size = Quantity::new(sz, size_precision);
1170                            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1171                            book.add(order, 0, i as u64, ts_event);
1172                        }
1173                    }
1174
1175                    let bids_len = bids.len();
1176
1177                    for (i, level) in asks.iter().enumerate() {
1178                        let px: f64 = match level.px.parse() {
1179                            Ok(v) => v,
1180                            Err(_) => continue,
1181                        };
1182                        let sz: f64 = match level.sz.parse() {
1183                            Ok(v) => v,
1184                            Err(_) => continue,
1185                        };
1186
1187                        if sz > 0.0 {
1188                            let price = Price::new(px, price_precision);
1189                            let size = Quantity::new(sz, size_precision);
1190                            let order =
1191                                BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1192                            book.add(order, 0, (bids_len + i) as u64, ts_event);
1193                        }
1194                    }
1195
1196                    log::info!(
1197                        "Fetched order book for {instrument_id} with {} bids and {} asks",
1198                        bids.len(),
1199                        asks.len(),
1200                    );
1201
1202                    let response = DataResponse::Book(BookResponse::new(
1203                        request_id,
1204                        client_id,
1205                        instrument_id,
1206                        book,
1207                        None,
1208                        None,
1209                        clock.get_time_ns(),
1210                        params,
1211                    ));
1212
1213                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1214                        log::error!("Failed to send book snapshot response: {e}");
1215                    }
1216                }
1217                Err(e) => log::error!("Book snapshot request failed for {instrument_id}: {e:?}"),
1218            }
1219        });
1220
1221        Ok(())
1222    }
1223}
1224
1225// Reads optional `nSigFigs` / `mantissa` L2 precision controls from
1226// `subscribe_params`; bails on non-positive integer values.
1227pub(crate) fn parse_book_precision_params(
1228    params: Option<&Params>,
1229) -> anyhow::Result<(Option<u32>, Option<u32>)> {
1230    let Some(params) = params else {
1231        return Ok((None, None));
1232    };
1233
1234    let read_u32 = |key: &str| -> anyhow::Result<Option<u32>> {
1235        match params.get(key) {
1236            None => Ok(None),
1237            Some(v) => v
1238                .as_u64()
1239                .and_then(|n| u32::try_from(n).ok())
1240                .ok_or_else(|| anyhow::anyhow!("`{key}` must be a positive u32"))
1241                .map(Some),
1242        }
1243    };
1244
1245    Ok((read_u32("n_sig_figs")?, read_u32("mantissa")?))
1246}
1247
1248// Hyperliquid funds perpetuals hourly, so `interval` is fixed at 60 mins;
1249// `time` from the venue marks the end of the funding interval in ms.
1250pub(crate) fn funding_entry_to_update(
1251    entry: &HyperliquidFundingHistoryEntry,
1252    instrument_id: InstrumentId,
1253) -> anyhow::Result<FundingRateUpdate> {
1254    let rate: Decimal = entry
1255        .funding_rate
1256        .parse()
1257        .with_context(|| format!("invalid fundingRate '{}'", entry.funding_rate))?;
1258    let ts = UnixNanos::from(entry.time * 1_000_000);
1259    Ok(FundingRateUpdate::new(
1260        instrument_id,
1261        rate,
1262        Some(60),
1263        None,
1264        ts,
1265        ts,
1266    ))
1267}
1268
1269pub(crate) fn candle_to_bar(
1270    candle: &HyperliquidCandle,
1271    bar_type: BarType,
1272    price_precision: u8,
1273    size_precision: u8,
1274) -> anyhow::Result<Bar> {
1275    let ts_init = UnixNanos::from(candle.timestamp * 1_000_000);
1276    let ts_event = ts_init;
1277
1278    let open = candle.open.parse::<f64>().context("parse open price")?;
1279    let high = candle.high.parse::<f64>().context("parse high price")?;
1280    let low = candle.low.parse::<f64>().context("parse low price")?;
1281    let close = candle.close.parse::<f64>().context("parse close price")?;
1282    let volume = candle.volume.parse::<f64>().context("parse volume")?;
1283
1284    Ok(Bar::new(
1285        bar_type,
1286        Price::new(open, price_precision),
1287        Price::new(high, price_precision),
1288        Price::new(low, price_precision),
1289        Price::new(close, price_precision),
1290        Quantity::new(volume, size_precision),
1291        ts_event,
1292        ts_init,
1293    ))
1294}
1295
1296/// Request bars from HTTP API.
1297async fn request_bars_from_http(
1298    http_client: HyperliquidHttpClient,
1299    bar_type: BarType,
1300    start: Option<DateTime<Utc>>,
1301    end: Option<DateTime<Utc>>,
1302    limit: Option<u32>,
1303    instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>>,
1304) -> anyhow::Result<Vec<Bar>> {
1305    // Get instrument details for precision
1306    let instrument_id = bar_type.instrument_id();
1307    let instrument = instruments
1308        .load()
1309        .get(&instrument_id)
1310        .cloned()
1311        .context("instrument not found in cache")?;
1312
1313    let price_precision = instrument.price_precision();
1314    let size_precision = instrument.size_precision();
1315    let raw_symbol = instrument.raw_symbol();
1316    let coin = raw_symbol.as_str();
1317
1318    let interval = bar_type_to_interval(&bar_type)?;
1319
1320    // Hyperliquid uses millisecond timestamps
1321    let now = Utc::now();
1322    let end_time = end.unwrap_or(now).timestamp_millis() as u64;
1323    let start_time = if let Some(start) = start {
1324        start.timestamp_millis() as u64
1325    } else {
1326        // Default to 1000 bars before end_time
1327        let spec = bar_type.spec();
1328        let step_ms = match spec.aggregation {
1329            BarAggregation::Minute => spec.step.get() as u64 * 60_000,
1330            BarAggregation::Hour => spec.step.get() as u64 * 3_600_000,
1331            BarAggregation::Day => spec.step.get() as u64 * 86_400_000,
1332            _ => 60_000,
1333        };
1334        end_time.saturating_sub(1000 * step_ms)
1335    };
1336
1337    let candles = http_client
1338        .info_candle_snapshot(coin, interval, start_time, end_time)
1339        .await
1340        .context("failed to fetch candle snapshot from Hyperliquid")?;
1341
1342    let mut bars: Vec<Bar> = candles
1343        .iter()
1344        .filter_map(|candle| {
1345            candle_to_bar(candle, bar_type, price_precision, size_precision)
1346                .map_err(|e| {
1347                    log::warn!("Failed to convert candle to bar: {e}");
1348                    e
1349                })
1350                .ok()
1351        })
1352        .collect();
1353
1354    if let Some(limit) = limit
1355        && bars.len() > limit as usize
1356    {
1357        bars = bars.into_iter().take(limit as usize).collect();
1358    }
1359
1360    log::debug!("Fetched {} bars for {}", bars.len(), bar_type);
1361    Ok(bars)
1362}
1363
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use rust_decimal_macros::dec;
    use ustr::Ustr;

    use super::*;
    use crate::common::testing::load_test_data;

    // Instrument id shared by the funding rate tests
    fn btc_perp_id() -> InstrumentId {
        InstrumentId::from("BTC-PERP.HYPERLIQUID")
    }

    // Builds `Params` from a JSON object literal
    fn make_params(json: serde_json::Value) -> Params {
        serde_json::from_value(json).expect("valid params payload")
    }

    #[rstest]
    fn test_funding_entry_to_update_parses_positive_rate() {
        let instrument_id = btc_perp_id();
        let entry = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: "0.0000125".to_string(),
            premium: Some("0.00029005".to_string()),
            time: 1769908800000,
        };

        let update = funding_entry_to_update(&entry, instrument_id).unwrap();

        assert_eq!(update.instrument_id, instrument_id);
        assert_eq!(update.rate, dec!(0.0000125));
        assert_eq!(update.interval, Some(60));
        assert!(update.next_funding_ns.is_none());
        // Venue milliseconds are scaled to nanoseconds
        assert_eq!(update.ts_event, UnixNanos::from(1769908800000 * 1_000_000));
        assert_eq!(update.ts_init, update.ts_event);
    }

    #[rstest]
    fn test_funding_entry_to_update_handles_negative_rate() {
        let entry = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: "-0.0000081".to_string(),
            premium: None,
            time: 1769912400000,
        };

        let update = funding_entry_to_update(&entry, btc_perp_id()).unwrap();

        assert_eq!(update.rate, dec!(-0.0000081));
    }

    #[rstest]
    fn test_funding_entry_to_update_rejects_invalid_rate() {
        let entry = HyperliquidFundingHistoryEntry {
            coin: Ustr::from("BTC"),
            funding_rate: "not-a-number".to_string(),
            premium: None,
            time: 1769912400000,
        };

        assert!(funding_entry_to_update(&entry, btc_perp_id()).is_err());
    }

    #[rstest]
    fn test_parse_book_precision_params_none() {
        let parsed = parse_book_precision_params(None).unwrap();
        assert_eq!(parsed, (None, None));
    }

    #[rstest]
    fn test_parse_book_precision_params_only_n_sig_figs() {
        let params = make_params(serde_json::json!({"n_sig_figs": 4}));
        let parsed = parse_book_precision_params(Some(&params)).unwrap();
        assert_eq!(parsed, (Some(4), None));
    }

    #[rstest]
    fn test_parse_book_precision_params_both() {
        let params = make_params(serde_json::json!({"n_sig_figs": 5, "mantissa": 2}));
        let parsed = parse_book_precision_params(Some(&params)).unwrap();
        assert_eq!(parsed, (Some(5), Some(2)));
    }

    #[rstest]
    fn test_parse_book_precision_params_rejects_negative() {
        let params = make_params(serde_json::json!({"n_sig_figs": -1}));
        let message = parse_book_precision_params(Some(&params))
            .unwrap_err()
            .to_string();
        // The error names the offending key
        assert!(message.contains("n_sig_figs"));
    }

    #[rstest]
    fn test_funding_history_fixture_parses() {
        let entries: Vec<HyperliquidFundingHistoryEntry> =
            load_test_data("http_funding_history.json");
        assert_eq!(entries.len(), 3);

        let first = &entries[0];
        assert_eq!(first.coin.as_str(), "BTC");
        assert_eq!(first.funding_rate, "0.0000125");
        assert_eq!(first.premium.as_deref(), Some("0.00029005"));
        assert!(entries[2].premium.is_none());

        // Every fixture entry must convert, in order, to the expected rate
        let rates: Vec<_> = entries
            .iter()
            .map(|entry| funding_entry_to_update(entry, btc_perp_id()).unwrap().rate)
            .collect();
        assert_eq!(
            rates,
            vec![dec!(0.0000125), dec!(-0.0000081), dec!(0.0000033)]
        );
    }
}