nautilus_hyperliquid/http/
rate_limits.rs1use std::{
17 collections::hash_map::DefaultHasher,
18 hash::{Hash, Hasher},
19 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
20};
21
22use serde_json::Value;
23
24use crate::{
25 common::enums::HyperliquidInfoRequestType,
26 http::query::{ExchangeAction, ExchangeActionParams, InfoRequest},
27};
28
29#[derive(Debug)]
30pub struct WeightedLimiter {
31 capacity: f64, refill_per_sec: f64, state: tokio::sync::Mutex<State>,
34}
35
36#[derive(Debug)]
37struct State {
38 tokens: f64,
39 last_refill: Instant,
40}
41
42impl WeightedLimiter {
43 pub fn per_minute(capacity: u32) -> Self {
44 let cap = capacity as f64;
45 Self {
46 capacity: cap,
47 refill_per_sec: cap / 60.0,
48 state: tokio::sync::Mutex::new(State {
49 tokens: cap,
50 last_refill: Instant::now(),
51 }),
52 }
53 }
54
55 pub async fn acquire(&self, weight: u32) {
57 let need = weight as f64;
58
59 loop {
60 let mut st = self.state.lock().await;
61 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
62
63 if st.tokens >= need {
64 st.tokens -= need;
65 return;
66 }
67 let deficit = need - st.tokens;
68 let secs = deficit / self.refill_per_sec;
69 drop(st);
70 tokio::time::sleep(Duration::from_secs_f64(secs.max(0.01))).await;
71 }
72 }
73
74 pub async fn debit_extra(&self, extra: u32) {
76 if extra == 0 {
77 return;
78 }
79 let mut st = self.state.lock().await;
80 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
81 st.tokens = (st.tokens - extra as f64).max(0.0);
82 }
83
84 pub async fn snapshot(&self) -> RateLimitSnapshot {
85 let mut st = self.state.lock().await;
86 Self::refill_locked(&mut st, self.refill_per_sec, self.capacity);
87 RateLimitSnapshot {
88 capacity: self.capacity as u32,
89 tokens: st.tokens.max(0.0) as u32,
90 }
91 }
92
93 fn refill_locked(st: &mut State, per_sec: f64, cap: f64) {
94 let dt = Instant::now().duration_since(st.last_refill).as_secs_f64();
95 if dt > 0.0 {
96 st.tokens = (st.tokens + dt * per_sec).min(cap);
97 st.last_refill = Instant::now();
98 }
99 }
100}
101
102#[derive(Debug, Clone, Copy)]
103pub struct RateLimitSnapshot {
104 pub capacity: u32,
105 pub tokens: u32,
106}
107
108pub fn backoff_full_jitter(attempt: u32, base: Duration, cap: Duration) -> Duration {
109 let mut hasher = DefaultHasher::new();
110 attempt.hash(&mut hasher);
111 let nanos = SystemTime::now()
112 .duration_since(UNIX_EPOCH)
113 .unwrap_or_default()
114 .as_nanos();
115 nanos.hash(&mut hasher);
116 let hash = hasher.finish();
117
118 let max = (base.as_millis() as u64)
119 .saturating_mul(1u64 << attempt.min(16))
120 .min(cap.as_millis() as u64)
121 .max(base.as_millis() as u64);
122
123 Duration::from_millis((hash % max).max(1))
125}
126
127pub fn info_base_weight(req: &InfoRequest) -> u32 {
129 match req.request_type {
130 HyperliquidInfoRequestType::L2Book
131 | HyperliquidInfoRequestType::AllMids
132 | HyperliquidInfoRequestType::ClearinghouseState
133 | HyperliquidInfoRequestType::OrderStatus
134 | HyperliquidInfoRequestType::SpotClearinghouseState
135 | HyperliquidInfoRequestType::ExchangeStatus
136 | HyperliquidInfoRequestType::UserFees => 2,
137 HyperliquidInfoRequestType::UserRole => 60,
138 _ => 20,
139 }
140}
141
142pub fn info_extra_weight(req: &InfoRequest, json: &Value) -> u32 {
145 let items = match json {
146 Value::Array(a) => a.len(),
147 Value::Object(m) => m
148 .values()
149 .filter_map(|v| v.as_array().map(|a| a.len()))
150 .max()
151 .unwrap_or(0),
152 _ => 0,
153 };
154
155 let unit = match req.request_type {
156 HyperliquidInfoRequestType::CandleSnapshot => 60usize,
157 HyperliquidInfoRequestType::HistoricalOrders
158 | HyperliquidInfoRequestType::UserFills
159 | HyperliquidInfoRequestType::UserFillsByTime
160 | HyperliquidInfoRequestType::FundingHistory
161 | HyperliquidInfoRequestType::UserFunding
162 | HyperliquidInfoRequestType::NonUserFundingUpdates
163 | HyperliquidInfoRequestType::TwapHistory
164 | HyperliquidInfoRequestType::UserTwapSliceFills
165 | HyperliquidInfoRequestType::UserTwapSliceFillsByTime
166 | HyperliquidInfoRequestType::DelegatorHistory
167 | HyperliquidInfoRequestType::DelegatorRewards
168 | HyperliquidInfoRequestType::ValidatorStats => 20usize,
169 _ => return 0,
170 };
171 (items / unit) as u32
172}
173
174pub fn exchange_weight(action: &ExchangeAction) -> u32 {
176 let batch_size = match &action.params {
178 ExchangeActionParams::Order(params) => params.orders.len(),
179 ExchangeActionParams::Cancel(params) => params.cancels.len(),
180 ExchangeActionParams::Modify(_) => {
181 1
183 }
184 ExchangeActionParams::UpdateLeverage(_) | ExchangeActionParams::UpdateIsolatedMargin(_) => {
185 0
186 }
187 };
188 1 + (batch_size as u32 / 40)
189}
190
191#[cfg(test)]
192mod tests {
193 use rstest::rstest;
194 use rust_decimal::Decimal;
195
196 use super::{
197 super::models::{
198 Cloid, HyperliquidExecCancelByCloidRequest, HyperliquidExecGrouping,
199 HyperliquidExecLimitParams, HyperliquidExecOrderKind, HyperliquidExecPlaceOrderRequest,
200 HyperliquidExecTif,
201 },
202 *,
203 };
204 use crate::http::query::{
205 CancelParams, ExchangeAction, ExchangeActionParams, ExchangeActionType, OrderParams,
206 UpdateLeverageParams,
207 };
208
209 #[rstest]
210 #[case(1, 1)]
211 #[case(39, 1)]
212 #[case(40, 2)]
213 #[case(79, 2)]
214 #[case(80, 3)]
215 fn test_exchange_weight_order_steps_every_40(
216 #[case] array_len: usize,
217 #[case] expected_weight: u32,
218 ) {
219 let orders: Vec<HyperliquidExecPlaceOrderRequest> = (0..array_len)
220 .map(|_| HyperliquidExecPlaceOrderRequest {
221 asset: 0,
222 is_buy: true,
223 price: Decimal::new(50000, 0),
224 size: Decimal::new(1, 0),
225 reduce_only: false,
226 kind: HyperliquidExecOrderKind::Limit {
227 limit: HyperliquidExecLimitParams {
228 tif: HyperliquidExecTif::Gtc,
229 },
230 },
231 cloid: Some(Cloid::from_hex("0x00000000000000000000000000000000").unwrap()),
232 })
233 .collect();
234
235 let action = ExchangeAction {
236 action_type: ExchangeActionType::Order,
237 params: ExchangeActionParams::Order(OrderParams {
238 orders,
239 grouping: HyperliquidExecGrouping::Na,
240 builder: None,
241 }),
242 };
243 assert_eq!(exchange_weight(&action), expected_weight);
244 }
245
246 #[rstest]
247 fn test_exchange_weight_cancel() {
248 let cancels: Vec<HyperliquidExecCancelByCloidRequest> = (0..40)
249 .map(|_| HyperliquidExecCancelByCloidRequest {
250 asset: 0,
251 cloid: Cloid::from_hex("0x00000000000000000000000000000000").unwrap(),
252 })
253 .collect();
254
255 let action = ExchangeAction {
256 action_type: ExchangeActionType::Cancel,
257 params: ExchangeActionParams::Cancel(CancelParams { cancels }),
258 };
259 assert_eq!(exchange_weight(&action), 2);
260 }
261
262 #[rstest]
263 fn test_exchange_weight_non_batch_action() {
264 let update_leverage = ExchangeAction {
265 action_type: ExchangeActionType::UpdateLeverage,
266 params: ExchangeActionParams::UpdateLeverage(UpdateLeverageParams {
267 asset: 1,
268 is_cross: true,
269 leverage: 10,
270 }),
271 };
272 assert_eq!(exchange_weight(&update_leverage), 1);
273 }
274
275 #[tokio::test]
276 async fn test_limiter_roughly_caps_to_capacity() {
277 let limiter = WeightedLimiter::per_minute(1200);
278
279 for _ in 0..60 {
281 limiter.acquire(20).await; }
283
284 let t0 = std::time::Instant::now();
286 limiter.acquire(20).await;
287 let elapsed = t0.elapsed();
288
289 assert!(
291 elapsed.as_millis() >= 500,
292 "Expected significant delay, was {}ms",
293 elapsed.as_millis()
294 );
295 }
296
297 #[tokio::test]
298 async fn test_limiter_debit_extra_works() {
299 let limiter = WeightedLimiter::per_minute(100);
300
301 let snapshot = limiter.snapshot().await;
303 assert_eq!(snapshot.capacity, 100);
304 assert_eq!(snapshot.tokens, 100);
305
306 limiter.acquire(30).await;
308 let snapshot = limiter.snapshot().await;
309 assert_eq!(snapshot.tokens, 70);
310
311 limiter.debit_extra(20).await;
313 let snapshot = limiter.snapshot().await;
314 assert_eq!(snapshot.tokens, 50);
315
316 limiter.debit_extra(100).await;
318 let snapshot = limiter.snapshot().await;
319 assert_eq!(snapshot.tokens, 0);
320 }
321
322 #[rstest]
323 #[case(0, 100)]
324 #[case(1, 200)]
325 #[case(2, 400)]
326 fn test_backoff_full_jitter_increases(#[case] attempt: u32, #[case] max_expected_ms: u64) {
327 let base = Duration::from_millis(100);
328 let cap = Duration::from_secs(5);
329
330 let delay = backoff_full_jitter(attempt, base, cap);
331
332 assert!(delay.as_millis() >= 1);
333 assert!(delay.as_millis() <= max_expected_ms as u128);
334 }
335
336 #[rstest]
337 fn test_backoff_full_jitter_respects_cap() {
338 let base = Duration::from_millis(100);
339 let cap = Duration::from_secs(5);
340
341 let delay_high = backoff_full_jitter(10, base, cap);
342 assert!(delay_high.as_millis() <= cap.as_millis());
343 }
344}