1use std::{
19 collections::HashMap,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24};
25
26use ahash::AHashSet;
27use chrono::{DateTime, Utc};
28use nautilus_core::{
29 AtomicMap, AtomicTime, datetime::nanos_to_millis, nanos::UnixNanos,
30 time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33 data::{Bar, BarType, TradeTick},
34 enums::{AggregationSource, BarAggregation},
35 events::AccountState,
36 identifiers::{AccountId, InstrumentId},
37 instruments::{Instrument, InstrumentAny},
38 orderbook::OrderBook,
39 reports::{FillReport, OrderStatusReport, PositionStatusReport},
40};
41use nautilus_network::{
42 http::{HttpClient, Method},
43 ratelimiter::quota::Quota,
44 retry::{RetryConfig, RetryManager},
45};
46use serde::{Serialize, de::DeserializeOwned};
47use strum::IntoEnumIterator;
48use tokio_util::sync::CancellationToken;
49use ustr::Ustr;
50
51use super::{
52 error::DeribitHttpError,
53 models::{
54 DeribitAccountSummariesResponse, DeribitBookSummary, DeribitCurrency, DeribitInstrument,
55 DeribitJsonRpcRequest, DeribitJsonRpcResponse, DeribitPosition, DeribitProductType,
56 DeribitTicker, DeribitUserTradesResponse,
57 },
58 query::{
59 GetAccountSummariesParams, GetBookSummaryByCurrencyParams, GetInstrumentParams,
60 GetInstrumentsParams, GetOpenOrdersByInstrumentParams, GetOpenOrdersParams,
61 GetOrderHistoryByCurrencyParams, GetOrderHistoryByInstrumentParams, GetOrderStateParams,
62 GetPositionsParams, GetTickerParams, GetUserTradesByCurrencyAndTimeParams,
63 GetUserTradesByInstrumentAndTimeParams,
64 },
65};
66use crate::{
67 common::{
68 consts::{
69 DERIBIT_ACCOUNT_RATE_KEY, DERIBIT_API_PATH, DERIBIT_GLOBAL_RATE_KEY,
70 DERIBIT_HTTP_ACCOUNT_QUOTA, DERIBIT_HTTP_ORDER_QUOTA, DERIBIT_HTTP_REST_QUOTA,
71 DERIBIT_ORDER_RATE_KEY, JSONRPC_VERSION, should_retry_error_code,
72 },
73 credential::{Credential, credential_env_vars},
74 enums::DeribitEnvironment,
75 parse::{
76 extract_server_timestamp, parse_account_state, parse_bars,
77 parse_deribit_instrument_any, parse_order_book, parse_trade_tick,
78 },
79 urls::get_http_base_url,
80 },
81 http::{
82 models::{DeribitOrderBook, DeribitTradesResponse, DeribitTradingViewChartData},
83 query::{
84 GetLastTradesByInstrumentAndTimeParams, GetOrderBookParams,
85 GetTradingViewChartDataParams,
86 },
87 },
88 websocket::{
89 messages::{DeribitOrderMsg, DeribitUserTradeMsg},
90 parse::{parse_position_status_report, parse_user_order_msg, parse_user_trade_msg},
91 },
92};
93
/// Number of trades requested per page when `DeribitHttpClient::request_trades`
/// paginates through historical trades.
///
/// NOTE(review): presumably this matches the largest `count` the Deribit
/// endpoint accepts - confirm against the API documentation.
pub const DERIBIT_HISTORICAL_TRADES_MAX_COUNT: u32 = 1000;
98
99struct TradePaginator {
105 seen_ids: AHashSet<String>,
106 cursor: i64,
107 end: i64,
108}
109
110impl TradePaginator {
111 fn new(start: i64, end: i64) -> Self {
112 Self {
113 seen_ids: AHashSet::new(),
114 cursor: start,
115 end,
116 }
117 }
118
119 fn advance(
122 &mut self,
123 ids: &[String],
124 timestamps: &[i64],
125 has_more: bool,
126 ) -> Option<Vec<usize>> {
127 if ids.is_empty() {
128 return None;
129 }
130
131 let prev_seen = self.seen_ids.len();
132 let mut new_indices = Vec::new();
133 let mut last_ts = self.cursor;
134
135 for (i, id) in ids.iter().enumerate() {
136 last_ts = timestamps[i];
137
138 if self.seen_ids.insert(id.clone()) {
139 new_indices.push(i);
140 }
141 }
142
143 if !has_more {
144 return Some(new_indices);
145 }
146
147 let new_count = self.seen_ids.len() - prev_seen;
148
149 if new_count == 0 {
150 self.cursor = last_ts + 1;
151 } else {
152 self.cursor = last_ts;
153 }
154
155 Some(new_indices)
156 }
157
158 fn is_exhausted(&self) -> bool {
161 self.cursor > self.end
162 }
163
164 fn reset(&mut self, start: i64) {
165 self.seen_ids.clear();
166 self.cursor = start;
167 }
168}
169
#[derive(Debug)]
/// Low-level Deribit HTTP client that issues JSON-RPC requests and returns
/// the venue's response models without further conversion.
pub struct DeribitRawHttpClient {
    /// URL every JSON-RPC request is POSTed to.
    base_url: String,
    /// Underlying HTTP client, configured with the rate limiter quotas.
    client: HttpClient,
    /// API credential used to sign authenticated requests (`None` for a
    /// public-only client).
    credential: Option<Credential>,
    /// Retry policy applied to every request.
    retry_manager: RetryManager<DeribitHttpError>,
    /// Token handed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Counter supplying the JSON-RPC `id` of each request attempt (starts at 1).
    request_id: AtomicU64,
}
183
184impl DeribitRawHttpClient {
185 pub fn new(
191 base_url: Option<String>,
192 environment: DeribitEnvironment,
193 timeout_secs: u64,
194 max_retries: u32,
195 retry_delay_ms: u64,
196 retry_delay_max_ms: u64,
197 proxy_url: Option<String>,
198 ) -> Result<Self, DeribitHttpError> {
199 let base_url = base_url
200 .unwrap_or_else(|| format!("{}{}", get_http_base_url(environment), DERIBIT_API_PATH));
201 let retry_config = RetryConfig {
202 max_retries,
203 initial_delay_ms: retry_delay_ms,
204 max_delay_ms: retry_delay_max_ms,
205 backoff_factor: 2.0,
206 jitter_ms: 1000,
207 operation_timeout_ms: Some(60_000),
208 immediate_first: false,
209 max_elapsed_ms: Some(180_000),
210 };
211
212 let retry_manager = RetryManager::new(retry_config);
213
214 Ok(Self {
215 base_url,
216 client: HttpClient::new(
217 HashMap::new(),
218 Vec::new(),
219 Self::rate_limiter_quotas(),
220 Some(*DERIBIT_HTTP_REST_QUOTA),
221 Some(timeout_secs),
222 proxy_url,
223 )
224 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
225 credential: None,
226 retry_manager,
227 cancellation_token: CancellationToken::new(),
228 request_id: AtomicU64::new(1),
229 })
230 }
231
    /// Returns a reference to the cancellation token that is passed to the
    /// retry manager for every request.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
236
    /// Returns `true` if the configured base URL contains the substring `test.`.
    ///
    /// This is a plain substring check on the URL, so a custom `base_url`
    /// that happens to contain `test.` is also reported as testnet.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.base_url.contains("test.")
    }
242
243 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
250 vec![
251 (
252 DERIBIT_GLOBAL_RATE_KEY.to_string(),
253 *DERIBIT_HTTP_REST_QUOTA,
254 ),
255 (
256 DERIBIT_ORDER_RATE_KEY.to_string(),
257 *DERIBIT_HTTP_ORDER_QUOTA,
258 ),
259 (
260 DERIBIT_ACCOUNT_RATE_KEY.to_string(),
261 *DERIBIT_HTTP_ACCOUNT_QUOTA,
262 ),
263 ]
264 }
265
266 fn rate_limit_keys(method: &str) -> Vec<String> {
270 let mut keys = vec![DERIBIT_GLOBAL_RATE_KEY.to_string()];
271
272 if Self::is_order_method(method) {
274 keys.push(DERIBIT_ORDER_RATE_KEY.to_string());
275 } else if Self::is_account_method(method) {
276 keys.push(DERIBIT_ACCOUNT_RATE_KEY.to_string());
277 }
278
279 keys.push(format!("deribit:{method}"));
281
282 keys
283 }
284
285 fn is_order_method(method: &str) -> bool {
287 matches!(
288 method,
289 "private/buy"
290 | "private/sell"
291 | "private/edit"
292 | "private/cancel"
293 | "private/cancel_all"
294 | "private/cancel_all_by_currency"
295 | "private/cancel_all_by_instrument"
296 | "private/cancel_by_label"
297 | "private/close_position"
298 )
299 }
300
301 fn is_account_method(method: &str) -> bool {
303 matches!(
304 method,
305 "private/get_account_summaries"
306 | "private/get_account_summary"
307 | "private/get_positions"
308 | "private/get_position"
309 | "private/get_open_orders_by_currency"
310 | "private/get_open_orders_by_instrument"
311 | "private/get_order_state"
312 | "private/get_user_trades_by_currency"
313 | "private/get_user_trades_by_instrument"
314 )
315 }
316
317 #[expect(clippy::too_many_arguments)]
323 pub fn with_credentials(
324 api_key: String,
325 api_secret: String,
326 base_url: Option<String>,
327 environment: DeribitEnvironment,
328 timeout_secs: u64,
329 max_retries: u32,
330 retry_delay_ms: u64,
331 retry_delay_max_ms: u64,
332 proxy_url: Option<String>,
333 ) -> Result<Self, DeribitHttpError> {
334 let base_url = base_url
335 .unwrap_or_else(|| format!("{}{}", get_http_base_url(environment), DERIBIT_API_PATH));
336 let retry_config = RetryConfig {
337 max_retries,
338 initial_delay_ms: retry_delay_ms,
339 max_delay_ms: retry_delay_max_ms,
340 backoff_factor: 2.0,
341 jitter_ms: 1000,
342 operation_timeout_ms: Some(60_000),
343 immediate_first: false,
344 max_elapsed_ms: Some(180_000),
345 };
346
347 let retry_manager = RetryManager::new(retry_config);
348 let credential = Credential::new(api_key, api_secret);
349
350 Ok(Self {
351 base_url,
352 client: HttpClient::new(
353 HashMap::new(),
354 Vec::new(),
355 Self::rate_limiter_quotas(),
356 Some(*DERIBIT_HTTP_REST_QUOTA),
357 Some(timeout_secs),
358 proxy_url,
359 )
360 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
361 credential: Some(credential),
362 retry_manager,
363 cancellation_token: CancellationToken::new(),
364 request_id: AtomicU64::new(1),
365 })
366 }
367
368 #[expect(clippy::too_many_arguments)]
380 pub fn new_with_env(
381 api_key: Option<String>,
382 api_secret: Option<String>,
383 base_url: Option<String>,
384 environment: DeribitEnvironment,
385 timeout_secs: u64,
386 max_retries: u32,
387 retry_delay_ms: u64,
388 retry_delay_max_ms: u64,
389 proxy_url: Option<String>,
390 ) -> Result<Self, DeribitHttpError> {
391 let (key_env, secret_env) = credential_env_vars(environment);
393
394 let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
396 let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
397
398 if let (Some(key), Some(secret)) = (api_key, api_secret) {
400 Self::with_credentials(
401 key,
402 secret,
403 base_url,
404 environment,
405 timeout_secs,
406 max_retries,
407 retry_delay_ms,
408 retry_delay_max_ms,
409 proxy_url,
410 )
411 } else {
412 Self::new(
414 base_url,
415 environment,
416 timeout_secs,
417 max_retries,
418 retry_delay_ms,
419 retry_delay_max_ms,
420 proxy_url,
421 )
422 }
423 }
424
425 async fn send_request<T, P>(
427 &self,
428 method: &str,
429 params: P,
430 authenticate: bool,
431 ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
432 where
433 T: DeserializeOwned,
434 P: Serialize,
435 {
436 let operation_id = format!("{}#{}", self.base_url, method);
438 let params_clone = serde_json::to_value(¶ms)?;
439
440 let operation = || {
441 let method = method.to_string();
442 let params_clone = params_clone.clone();
443
444 async move {
445 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
447 let request = DeribitJsonRpcRequest {
448 jsonrpc: JSONRPC_VERSION,
449 id,
450 method: method.clone(),
451 params: params_clone.clone(),
452 };
453
454 let body = serde_json::to_vec(&request)?;
455
456 let mut headers = HashMap::new();
458 headers.insert("Content-Type".to_string(), "application/json".to_string());
459
460 if authenticate {
462 let credentials = self
463 .credential
464 .as_ref()
465 .ok_or(DeribitHttpError::MissingCredentials)?;
466 let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
467 headers.extend(auth_headers);
468 }
469
470 let rate_limit_keys = Self::rate_limit_keys(&method);
471 let resp = self
472 .client
473 .request(
474 Method::POST,
475 self.base_url.clone(),
476 None,
477 Some(headers),
478 Some(body),
479 None,
480 Some(rate_limit_keys),
481 )
482 .await
483 .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
484
485 let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
491 Ok(json) => json,
492 Err(_) => {
493 let error_body = String::from_utf8_lossy(&resp.body);
495 log::error!(
496 "Non-JSON response: method={method}, status={}, body={error_body}",
497 resp.status.as_u16()
498 );
499 return Err(DeribitHttpError::UnexpectedStatus {
500 status: resp.status.as_u16(),
501 body: error_body.to_string(),
502 });
503 }
504 };
505
506 let json_rpc_response: DeribitJsonRpcResponse<T> =
508 serde_json::from_value(json_value.clone()).map_err(|e| {
509 log::error!(
510 "Failed to deserialize Deribit JSON-RPC response: method={method}, status={}, error={e}",
511 resp.status.as_u16()
512 );
513 log::debug!(
514 "Response JSON (first 2000 chars): {}",
515 &json_value
516 .to_string()
517 .chars()
518 .take(2000)
519 .collect::<String>()
520 );
521 DeribitHttpError::JsonError(e.to_string())
522 })?;
523
524 if json_rpc_response.result.is_some() {
526 Ok(json_rpc_response)
527 } else if let Some(error) = &json_rpc_response.error {
528 log::warn!(
530 "Deribit RPC error response: method={method}, http_status={}, error_code={}, error_message={}, error_data={:?}",
531 resp.status.as_u16(),
532 error.code,
533 error.message,
534 error.data
535 );
536
537 Err(DeribitHttpError::from_jsonrpc_error(
539 error.code,
540 error.message.clone(),
541 error.data.as_ref(),
542 ))
543 } else {
544 log::error!(
545 "Response contains neither result nor error field: method={method}, status={}, request_id={:?}",
546 resp.status.as_u16(),
547 json_rpc_response.id
548 );
549 Err(DeribitHttpError::JsonError(
550 "Response contains neither result nor error".to_string(),
551 ))
552 }
553 }
554 };
555
556 let should_retry = |error: &DeribitHttpError| -> bool {
565 match error {
566 DeribitHttpError::NetworkError(_) => true,
567 DeribitHttpError::UnexpectedStatus { status, .. } => {
568 *status >= 500 || *status == 429
569 }
570 DeribitHttpError::DeribitError { error_code, .. } => {
571 should_retry_error_code(*error_code)
572 }
573 _ => false,
574 }
575 };
576
577 let create_error = |msg: String| -> DeribitHttpError {
578 if msg == "canceled" {
579 DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
580 } else {
581 DeribitHttpError::NetworkError(msg)
582 }
583 };
584
585 self.retry_manager
586 .execute_with_retry_with_cancel(
587 &operation_id,
588 operation,
589 should_retry,
590 create_error,
591 &self.cancellation_token,
592 )
593 .await
594 }
595
596 pub async fn get_instruments(
602 &self,
603 params: GetInstrumentsParams,
604 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
605 self.send_request("public/get_instruments", params, false)
606 .await
607 }
608
609 pub async fn get_instrument(
615 &self,
616 params: GetInstrumentParams,
617 ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
618 self.send_request("public/get_instrument", params, false)
619 .await
620 }
621
622 pub async fn get_last_trades_by_instrument_and_time(
628 &self,
629 params: GetLastTradesByInstrumentAndTimeParams,
630 ) -> Result<DeribitJsonRpcResponse<DeribitTradesResponse>, DeribitHttpError> {
631 self.send_request(
632 "public/get_last_trades_by_instrument_and_time",
633 params,
634 false,
635 )
636 .await
637 }
638
639 pub async fn get_tradingview_chart_data(
645 &self,
646 params: GetTradingViewChartDataParams,
647 ) -> Result<DeribitJsonRpcResponse<DeribitTradingViewChartData>, DeribitHttpError> {
648 self.send_request("public/get_tradingview_chart_data", params, false)
649 .await
650 }
651
652 pub async fn get_account_summaries(
661 &self,
662 params: GetAccountSummariesParams,
663 ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
664 self.send_request("private/get_account_summaries", params, true)
665 .await
666 }
667
668 pub async fn get_order_book(
674 &self,
675 params: GetOrderBookParams,
676 ) -> Result<DeribitJsonRpcResponse<DeribitOrderBook>, DeribitHttpError> {
677 self.send_request("public/get_order_book", params, false)
678 .await
679 }
680
681 pub async fn get_order_state(
690 &self,
691 params: GetOrderStateParams,
692 ) -> Result<DeribitJsonRpcResponse<DeribitOrderMsg>, DeribitHttpError> {
693 self.send_request("private/get_order_state", params, true)
694 .await
695 }
696
697 pub async fn get_open_orders(
706 &self,
707 params: GetOpenOrdersParams,
708 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
709 self.send_request("private/get_open_orders", params, true)
710 .await
711 }
712
713 pub async fn get_open_orders_by_instrument(
722 &self,
723 params: GetOpenOrdersByInstrumentParams,
724 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
725 self.send_request("private/get_open_orders_by_instrument", params, true)
726 .await
727 }
728
729 pub async fn get_order_history_by_instrument(
738 &self,
739 params: GetOrderHistoryByInstrumentParams,
740 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
741 self.send_request("private/get_order_history_by_instrument", params, true)
742 .await
743 }
744
745 pub async fn get_order_history_by_currency(
754 &self,
755 params: GetOrderHistoryByCurrencyParams,
756 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
757 self.send_request("private/get_order_history_by_currency", params, true)
758 .await
759 }
760
761 pub async fn get_user_trades_by_instrument_and_time(
770 &self,
771 params: GetUserTradesByInstrumentAndTimeParams,
772 ) -> Result<DeribitJsonRpcResponse<DeribitUserTradesResponse>, DeribitHttpError> {
773 self.send_request(
774 "private/get_user_trades_by_instrument_and_time",
775 params,
776 true,
777 )
778 .await
779 }
780
781 pub async fn get_user_trades_by_currency_and_time(
790 &self,
791 params: GetUserTradesByCurrencyAndTimeParams,
792 ) -> Result<DeribitJsonRpcResponse<DeribitUserTradesResponse>, DeribitHttpError> {
793 self.send_request("private/get_user_trades_by_currency_and_time", params, true)
794 .await
795 }
796
797 pub async fn get_book_summary_by_currency(
803 &self,
804 params: GetBookSummaryByCurrencyParams,
805 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitBookSummary>>, DeribitHttpError> {
806 self.send_request("public/get_book_summary_by_currency", params, false)
807 .await
808 }
809
810 pub async fn get_ticker(
816 &self,
817 params: GetTickerParams,
818 ) -> Result<DeribitJsonRpcResponse<DeribitTicker>, DeribitHttpError> {
819 self.send_request("public/ticker", params, false).await
820 }
821
822 pub async fn get_positions(
831 &self,
832 params: GetPositionsParams,
833 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitPosition>>, DeribitHttpError> {
834 self.send_request("private/get_positions", params, true)
835 .await
836 }
837}
838
#[derive(Debug)]
/// High-level Deribit HTTP client that wraps [`DeribitRawHttpClient`] and
/// converts venue responses into Nautilus domain types, using a shared
/// instrument cache to look up price and size precisions.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.deribit")
)]
pub struct DeribitHttpClient {
    /// Raw client that performs the JSON-RPC requests (shared between clones).
    pub(crate) inner: Arc<DeribitRawHttpClient>,
    /// Instruments keyed by raw symbol (shared between clones).
    pub(crate) instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Clock used to generate `ts_init` timestamps.
    clock: &'static AtomicTime,
    /// Set once `cache_instruments` has been called on this instance.
    cache_initialized: AtomicBool,
}
858
859impl Clone for DeribitHttpClient {
860 fn clone(&self) -> Self {
861 let cache_initialized = AtomicBool::new(false);
862
863 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
864 if is_initialized {
865 cache_initialized.store(true, Ordering::Release);
866 }
867
868 Self {
869 inner: self.inner.clone(),
870 instruments_cache: self.instruments_cache.clone(),
871 cache_initialized,
872 clock: self.clock,
873 }
874 }
875}
876
877impl DeribitHttpClient {
878 pub fn new(
888 base_url: Option<String>,
889 environment: DeribitEnvironment,
890 timeout_secs: u64,
891 max_retries: u32,
892 retry_delay_ms: u64,
893 retry_delay_max_ms: u64,
894 proxy_url: Option<String>,
895 ) -> anyhow::Result<Self> {
896 let raw_client = Arc::new(DeribitRawHttpClient::new(
897 base_url,
898 environment,
899 timeout_secs,
900 max_retries,
901 retry_delay_ms,
902 retry_delay_max_ms,
903 proxy_url,
904 )?);
905
906 Ok(Self {
907 inner: raw_client,
908 instruments_cache: Arc::new(AtomicMap::new()),
909 cache_initialized: AtomicBool::new(false),
910 clock: get_atomic_clock_realtime(),
911 })
912 }
913
914 #[expect(clippy::too_many_arguments)]
926 pub fn new_with_env(
927 api_key: Option<String>,
928 api_secret: Option<String>,
929 base_url: Option<String>,
930 environment: DeribitEnvironment,
931 timeout_secs: u64,
932 max_retries: u32,
933 retry_delay_ms: u64,
934 retry_delay_max_ms: u64,
935 proxy_url: Option<String>,
936 ) -> anyhow::Result<Self> {
937 let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
938 api_key,
939 api_secret,
940 base_url,
941 environment,
942 timeout_secs,
943 max_retries,
944 retry_delay_ms,
945 retry_delay_max_ms,
946 proxy_url,
947 )?);
948
949 Ok(Self {
950 inner: raw_client,
951 instruments_cache: Arc::new(AtomicMap::new()),
952 cache_initialized: AtomicBool::new(false),
953 clock: get_atomic_clock_realtime(),
954 })
955 }
956
957 pub async fn request_instruments(
963 &self,
964 currency: DeribitCurrency,
965 product_type: Option<DeribitProductType>,
966 ) -> anyhow::Result<Vec<InstrumentAny>> {
967 let params = if let Some(pt) = product_type {
969 GetInstrumentsParams::with_kind(currency, pt)
970 } else {
971 GetInstrumentsParams::new(currency)
972 };
973
974 let full_response = self.inner.get_instruments(params).await?;
976 let result = full_response
977 .result
978 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
979 let ts_event = extract_server_timestamp(full_response.us_out)?;
980 let ts_init = self.generate_ts_init();
981
982 let mut instruments = Vec::new();
984 let mut skipped_count = 0;
985 let mut error_count = 0;
986
987 for raw_instrument in result {
988 match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
989 Ok(Some(instrument)) => {
990 instruments.push(instrument);
991 }
992 Ok(None) => {
993 skipped_count += 1;
995 log::debug!(
996 "Skipped unsupported instrument type: {} (kind: {:?})",
997 raw_instrument.instrument_name,
998 raw_instrument.kind
999 );
1000 }
1001 Err(e) => {
1002 error_count += 1;
1003 log::warn!(
1004 "Failed to parse instrument {}: {}",
1005 raw_instrument.instrument_name,
1006 e
1007 );
1008 }
1009 }
1010 }
1011
1012 log::info!(
1013 "Parsed {} instruments ({} skipped, {} errors)",
1014 instruments.len(),
1015 skipped_count,
1016 error_count
1017 );
1018
1019 Ok(instruments)
1020 }
1021
1022 pub async fn request_instrument(
1034 &self,
1035 instrument_id: InstrumentId,
1036 ) -> anyhow::Result<InstrumentAny> {
1037 let params = GetInstrumentParams {
1038 instrument_name: instrument_id.symbol.to_string(),
1039 };
1040
1041 let full_response = self.inner.get_instrument(params).await?;
1042 let response = full_response
1043 .result
1044 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1045 let ts_event = extract_server_timestamp(full_response.us_out)?;
1046 let ts_init = self.generate_ts_init();
1047
1048 match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
1049 Some(instrument) => Ok(instrument),
1050 None => anyhow::bail!(
1051 "Unsupported instrument type: {} (kind: {:?})",
1052 response.instrument_name,
1053 response.kind
1054 ),
1055 }
1056 }
1057
1058 pub async fn request_trades(
1081 &self,
1082 instrument_id: InstrumentId,
1083 start: Option<DateTime<Utc>>,
1084 end: Option<DateTime<Utc>>,
1085 limit: Option<u32>,
1086 ) -> anyhow::Result<Vec<TradeTick>> {
1087 let (price_precision, size_precision) =
1089 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1090 (instrument.price_precision(), instrument.size_precision())
1091 } else {
1092 log::warn!("Instrument {instrument_id} not in cache, skipping trades request");
1093 anyhow::bail!("Instrument {instrument_id} not in cache");
1094 };
1095
1096 let now = Utc::now();
1098 let end_dt = end.unwrap_or(now);
1099 let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
1100
1101 if let (Some(s), Some(e)) = (start, end) {
1102 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1103 }
1104
1105 let start_ms = start_dt.timestamp_millis();
1106 let end_ms = end_dt.timestamp_millis();
1107 let ts_init = self.generate_ts_init();
1108 let mut all_trades = Vec::new();
1109 let mut paginator = TradePaginator::new(start_ms, end_ms);
1110
1111 loop {
1112 let params = GetLastTradesByInstrumentAndTimeParams::new(
1113 instrument_id.symbol.to_string(),
1114 paginator.cursor,
1115 end_ms,
1116 Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1117 Some("asc".to_string()),
1118 );
1119
1120 let full_response = self
1121 .inner
1122 .get_last_trades_by_instrument_and_time(params)
1123 .await
1124 .map_err(|e| anyhow::anyhow!(e))?;
1125
1126 let response_data = full_response
1127 .result
1128 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1129
1130 let ids: Vec<String> = response_data
1131 .trades
1132 .iter()
1133 .map(|t| t.trade_id.clone())
1134 .collect();
1135 let timestamps: Vec<i64> = response_data.trades.iter().map(|t| t.timestamp).collect();
1136
1137 let Some(new_indices) = paginator.advance(&ids, ×tamps, response_data.has_more)
1138 else {
1139 break;
1140 };
1141
1142 for i in &new_indices {
1143 let raw_trade = &response_data.trades[*i];
1144
1145 match parse_trade_tick(
1146 raw_trade,
1147 instrument_id,
1148 price_precision,
1149 size_precision,
1150 ts_init,
1151 ) {
1152 Ok(trade) => {
1153 all_trades.push(trade);
1154
1155 if let Some(max) = limit
1156 && all_trades.len() >= max as usize
1157 {
1158 return Ok(all_trades);
1159 }
1160 }
1161 Err(e) => {
1162 log::warn!(
1163 "Failed to parse trade {} for {}: {}",
1164 raw_trade.trade_id,
1165 instrument_id,
1166 e
1167 );
1168 }
1169 }
1170 }
1171
1172 if !response_data.has_more || paginator.is_exhausted() {
1173 break;
1174 }
1175 }
1176
1177 log::info!(
1178 "Fetched {} historical trades for {} from {} to {}",
1179 all_trades.len(),
1180 instrument_id,
1181 start_dt,
1182 end_dt
1183 );
1184
1185 Ok(all_trades)
1186 }
1187
1188 pub async fn request_bars(
1203 &self,
1204 bar_type: BarType,
1205 start: Option<DateTime<Utc>>,
1206 end: Option<DateTime<Utc>>,
1207 limit: Option<u32>,
1208 ) -> anyhow::Result<Vec<Bar>> {
1209 anyhow::ensure!(
1210 bar_type.aggregation_source() == AggregationSource::External,
1211 "Only EXTERNAL aggregation is supported"
1212 );
1213
1214 let now = Utc::now();
1215
1216 let end_dt = end.unwrap_or(now);
1218 let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
1219
1220 if let (Some(s), Some(e)) = (start, end) {
1221 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1222 }
1223
1224 let spec = bar_type.spec();
1226 let step = spec.step.get();
1227 let resolution = match spec.aggregation {
1228 BarAggregation::Minute => format!("{step}"),
1229 BarAggregation::Hour => format!("{}", step * 60),
1230 BarAggregation::Day => "1D".to_string(),
1231 a => anyhow::bail!("Deribit does not support {a:?} aggregation"),
1232 };
1233
1234 let supported_resolutions = [
1236 "1", "3", "5", "10", "15", "30", "60", "120", "180", "360", "720", "1D",
1237 ];
1238
1239 if !supported_resolutions.contains(&resolution.as_str()) {
1240 anyhow::bail!(
1241 "Deribit does not support resolution '{resolution}'. Supported: {supported_resolutions:?}"
1242 );
1243 }
1244
1245 let instrument_name = bar_type.instrument_id().symbol.to_string();
1246 let start_timestamp = start_dt.timestamp_millis();
1247 let end_timestamp = end_dt.timestamp_millis();
1248
1249 let params = GetTradingViewChartDataParams::new(
1250 instrument_name,
1251 start_timestamp,
1252 end_timestamp,
1253 resolution,
1254 );
1255
1256 let full_response = self.inner.get_tradingview_chart_data(params).await?;
1257 let chart_data = full_response
1258 .result
1259 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1260
1261 if chart_data.status == "no_data" {
1262 log::debug!("No bar data returned for {bar_type}");
1263 return Ok(Vec::new());
1264 }
1265
1266 let instrument_id = bar_type.instrument_id();
1268 let (price_precision, size_precision) =
1269 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1270 (instrument.price_precision(), instrument.size_precision())
1271 } else {
1272 log::warn!("Instrument {instrument_id} not in cache, skipping bars request");
1273 anyhow::bail!("Instrument {instrument_id} not in cache");
1274 };
1275
1276 let ts_init = self.generate_ts_init();
1277 let mut bars = parse_bars(
1278 &chart_data,
1279 bar_type,
1280 price_precision,
1281 size_precision,
1282 ts_init,
1283 )?;
1284
1285 if let Some(max) = limit {
1286 let max = max as usize;
1287 if bars.len() > max {
1288 bars.drain(..bars.len() - max);
1289 }
1290 }
1291
1292 log::info!("Parsed {} bars for {}", bars.len(), bar_type);
1293
1294 Ok(bars)
1295 }
1296
1297 pub async fn request_book_snapshot(
1312 &self,
1313 instrument_id: InstrumentId,
1314 depth: Option<u32>,
1315 ) -> anyhow::Result<OrderBook> {
1316 let (price_precision, size_precision) =
1317 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1318 (instrument.price_precision(), instrument.size_precision())
1319 } else {
1320 anyhow::bail!("Instrument {instrument_id} not in cache");
1321 };
1322
1323 let params = GetOrderBookParams::new(instrument_id.symbol.to_string(), depth);
1324 let full_response = self
1325 .inner
1326 .get_order_book(params)
1327 .await
1328 .map_err(|e| anyhow::anyhow!(e))?;
1329
1330 let order_book_data = full_response
1331 .result
1332 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1333
1334 let ts_init = self.generate_ts_init();
1335 let book = parse_order_book(
1336 &order_book_data,
1337 instrument_id,
1338 price_precision,
1339 size_precision,
1340 ts_init,
1341 )?;
1342
1343 log::info!(
1344 "Fetched order book for {} with {} bids and {} asks",
1345 instrument_id,
1346 order_book_data.bids.len(),
1347 order_book_data.asks.len()
1348 );
1349
1350 Ok(book)
1351 }
1352
1353 pub async fn request_account_state(
1364 &self,
1365 account_id: AccountId,
1366 ) -> anyhow::Result<AccountState> {
1367 let params = GetAccountSummariesParams::default();
1368 let full_response = self
1369 .inner
1370 .get_account_summaries(params)
1371 .await
1372 .map_err(|e| anyhow::anyhow!(e))?;
1373 let response_data = full_response
1374 .result
1375 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1376 let ts_init = self.generate_ts_init();
1377 let ts_event = extract_server_timestamp(full_response.us_out)?;
1378
1379 parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
1380 }
1381
    /// Returns the current time of the client's clock in nanoseconds, used
    /// as the `ts_init` of parsed objects.
    fn generate_ts_init(&self) -> UnixNanos {
        self.clock.get_time_ns()
    }
1386
1387 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
1389 self.instruments_cache.rcu(|m| {
1390 for inst in instruments {
1391 m.insert(inst.raw_symbol().inner(), inst.clone());
1392 }
1393 });
1394 self.cache_initialized.store(true, Ordering::Release);
1395 }
1396
    /// Returns a clone of the cached instrument stored under `symbol`, or
    /// `None` if the cache has no entry for it.
    #[must_use]
    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
        self.instruments_cache.get_cloned(symbol)
    }
1402
    /// Returns `true` once `cache_instruments` has been called on this
    /// client (or on the client it was cloned from, before the clone).
    #[must_use]
    pub fn is_cache_initialized(&self) -> bool {
        self.cache_initialized.load(Ordering::Acquire)
    }
1408
    /// Returns whether the underlying raw client reports that it is
    /// connected to the testnet.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.inner.is_testnet()
    }
1414
1415 pub async fn request_order_status_reports(
1428 &self,
1429 account_id: AccountId,
1430 instrument_id: Option<InstrumentId>,
1431 start: Option<UnixNanos>,
1432 end: Option<UnixNanos>,
1433 open_only: bool,
1434 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1435 let ts_init = self.generate_ts_init();
1436 let mut reports = Vec::new();
1437 let mut seen_order_ids = AHashSet::new();
1438
1439 let mut parse_and_add = |order: &DeribitOrderMsg| {
1440 let symbol = Ustr::from(&order.instrument_name);
1441 if let Some(instrument) = self.get_instrument(&symbol) {
1442 match parse_user_order_msg(order, &instrument, account_id, ts_init) {
1443 Ok(report) => {
1444 let ts_last = report.ts_last;
1446 let in_range = match (start, end) {
1447 (Some(s), Some(e)) => ts_last >= s && ts_last <= e,
1448 (Some(s), None) => ts_last >= s,
1449 (None, Some(e)) => ts_last <= e,
1450 (None, None) => true,
1451 };
1452 if in_range && seen_order_ids.insert(order.order_id.clone()) {
1454 reports.push(report);
1455 }
1456 }
1457 Err(e) => {
1458 log::warn!(
1459 "Failed to parse order {} for {}: {}",
1460 order.order_id,
1461 order.instrument_name,
1462 e
1463 );
1464 }
1465 }
1466 } else {
1467 log::debug!(
1468 "Skipping order {} - instrument {} not in cache",
1469 order.order_id,
1470 order.instrument_name
1471 );
1472 }
1473 };
1474
1475 if let Some(instrument_id) = instrument_id {
1476 let instrument_name = instrument_id.symbol.to_string();
1478
1479 let open_params = GetOpenOrdersByInstrumentParams {
1481 instrument_name: instrument_name.clone(),
1482 r#type: None,
1483 };
1484
1485 if let Some(orders) = self
1486 .inner
1487 .get_open_orders_by_instrument(open_params)
1488 .await?
1489 .result
1490 {
1491 for order in &orders {
1492 parse_and_add(order);
1493 }
1494 }
1495
1496 if !open_only {
1497 const PAGE_SIZE: u32 = 100;
1498 let mut offset: u32 = 0;
1499
1500 loop {
1501 let history_params = GetOrderHistoryByInstrumentParams {
1502 instrument_name: instrument_name.clone(),
1503 count: Some(PAGE_SIZE),
1504 offset: Some(offset),
1505 include_old: Some(true),
1506 include_unfilled: Some(true),
1507 };
1508 let orders = self
1509 .inner
1510 .get_order_history_by_instrument(history_params)
1511 .await?
1512 .result
1513 .unwrap_or_default();
1514
1515 let count = orders.len() as u32;
1516 for order in &orders {
1517 parse_and_add(order);
1518 }
1519
1520 if count < PAGE_SIZE {
1521 break;
1522 }
1523 offset += count;
1524 }
1525 }
1526 } else {
1527 let open_params = GetOpenOrdersParams::default();
1529 if let Some(orders) = self.inner.get_open_orders(open_params).await?.result {
1530 for order in &orders {
1531 parse_and_add(order);
1532 }
1533 }
1534
1535 if !open_only {
1536 const PAGE_SIZE: u32 = 100;
1537
1538 for currency in DeribitCurrency::iter().filter(|c| *c != DeribitCurrency::ANY) {
1539 let mut offset: u32 = 0;
1540
1541 loop {
1542 let history_params = GetOrderHistoryByCurrencyParams {
1543 currency,
1544 kind: None,
1545 count: Some(PAGE_SIZE),
1546 offset: Some(offset),
1547 include_old: Some(true),
1548 include_unfilled: Some(true),
1549 };
1550 let orders = self
1551 .inner
1552 .get_order_history_by_currency(history_params)
1553 .await?
1554 .result
1555 .unwrap_or_default();
1556
1557 let count = orders.len() as u32;
1558 for order in &orders {
1559 parse_and_add(order);
1560 }
1561
1562 if count < PAGE_SIZE {
1563 break;
1564 }
1565 offset += count;
1566 }
1567 }
1568 }
1569 }
1570
1571 log::debug!("Generated {} order status reports", reports.len());
1572 Ok(reports)
1573 }
1574
1575 pub async fn request_fill_reports(
1588 &self,
1589 account_id: AccountId,
1590 instrument_id: Option<InstrumentId>,
1591 start: Option<UnixNanos>,
1592 end: Option<UnixNanos>,
1593 ) -> anyhow::Result<Vec<FillReport>> {
1594 let ts_init = self.generate_ts_init();
1595 let now_ms = Utc::now().timestamp_millis();
1596
1597 let start_ms = start.map_or(0, |ns| nanos_to_millis(ns.as_u64()) as i64);
1599 let end_ms = end.map_or(now_ms, |ns| nanos_to_millis(ns.as_u64()) as i64);
1600 let mut reports = Vec::new();
1601
1602 let mut parse_and_add = |trade: &DeribitUserTradeMsg| {
1604 let symbol = Ustr::from(&trade.instrument_name);
1605 if let Some(instrument) = self.get_instrument(&symbol) {
1606 match parse_user_trade_msg(trade, &instrument, account_id, ts_init) {
1607 Ok(report) => reports.push(report),
1608 Err(e) => {
1609 log::warn!(
1610 "Failed to parse trade {} for {}: {}",
1611 trade.trade_id,
1612 trade.instrument_name,
1613 e
1614 );
1615 }
1616 }
1617 } else {
1618 log::debug!(
1619 "Skipping trade {} - instrument {} not in cache",
1620 trade.trade_id,
1621 trade.instrument_name
1622 );
1623 }
1624 };
1625
1626 let mut paginator = TradePaginator::new(start_ms, end_ms);
1627
1628 if let Some(instrument_id) = instrument_id {
1629 loop {
1630 let params = GetUserTradesByInstrumentAndTimeParams {
1631 instrument_name: instrument_id.symbol.to_string(),
1632 start_timestamp: paginator.cursor,
1633 end_timestamp: end_ms,
1634 count: Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1635 sorting: Some("asc".to_string()),
1636 };
1637 let response = self
1638 .inner
1639 .get_user_trades_by_instrument_and_time(params)
1640 .await?;
1641
1642 let Some(data) = response.result else { break };
1643
1644 let ids: Vec<String> = data.trades.iter().map(|t| t.trade_id.clone()).collect();
1645 let timestamps: Vec<i64> = data.trades.iter().map(|t| t.timestamp as i64).collect();
1646
1647 let Some(new_indices) = paginator.advance(&ids, ×tamps, data.has_more) else {
1648 break;
1649 };
1650
1651 for i in &new_indices {
1652 parse_and_add(&data.trades[*i]);
1653 }
1654
1655 if !data.has_more || paginator.is_exhausted() {
1656 break;
1657 }
1658 }
1659 } else {
1660 for currency in DeribitCurrency::iter().filter(|c| *c != DeribitCurrency::ANY) {
1661 paginator.reset(start_ms);
1662
1663 loop {
1664 let params = GetUserTradesByCurrencyAndTimeParams {
1665 currency,
1666 start_timestamp: paginator.cursor,
1667 end_timestamp: end_ms,
1668 kind: None,
1669 count: Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1670 sorting: Some("asc".to_string()),
1671 };
1672 let response = self
1673 .inner
1674 .get_user_trades_by_currency_and_time(params)
1675 .await?;
1676
1677 let Some(data) = response.result else { break };
1678
1679 let ids: Vec<String> = data.trades.iter().map(|t| t.trade_id.clone()).collect();
1680 let timestamps: Vec<i64> =
1681 data.trades.iter().map(|t| t.timestamp as i64).collect();
1682
1683 let Some(new_indices) = paginator.advance(&ids, ×tamps, data.has_more)
1684 else {
1685 break;
1686 };
1687
1688 for i in &new_indices {
1689 parse_and_add(&data.trades[*i]);
1690 }
1691
1692 if !data.has_more || paginator.is_exhausted() {
1693 break;
1694 }
1695 }
1696 }
1697 }
1698
1699 log::debug!("Generated {} fill reports", reports.len());
1700 Ok(reports)
1701 }
1702
1703 pub async fn request_ticker(&self, instrument_name: &str) -> anyhow::Result<DeribitTicker> {
1711 let params = GetTickerParams {
1712 instrument_name: instrument_name.to_string(),
1713 };
1714 let response = self
1715 .inner
1716 .get_ticker(params)
1717 .await
1718 .map_err(|e| anyhow::anyhow!(e))?;
1719 response
1720 .result
1721 .ok_or_else(|| anyhow::anyhow!("No result in ticker response"))
1722 }
1723
1724 pub async fn request_book_summaries(
1733 &self,
1734 currency: &str,
1735 ) -> anyhow::Result<Vec<DeribitBookSummary>> {
1736 let params = GetBookSummaryByCurrencyParams::options(currency);
1737 let full_response = self
1738 .inner
1739 .get_book_summary_by_currency(params)
1740 .await
1741 .map_err(|e| anyhow::anyhow!(e))?;
1742 full_response
1743 .result
1744 .ok_or_else(|| anyhow::anyhow!("No result in book summary response"))
1745 }
1746
1747 pub async fn request_position_status_reports(
1759 &self,
1760 account_id: AccountId,
1761 instrument_id: Option<InstrumentId>,
1762 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1763 let ts_init = self.generate_ts_init();
1764 let mut reports = Vec::new();
1765
1766 let params = GetPositionsParams {
1768 currency: DeribitCurrency::ANY,
1769 kind: None,
1770 };
1771
1772 if let Some(positions) = self.inner.get_positions(params).await?.result {
1773 for position in &positions {
1774 if position.size.is_zero() {
1776 continue;
1777 }
1778
1779 let symbol = position.instrument_name;
1780 if let Some(instrument) = self.get_instrument(&symbol) {
1781 let report =
1782 parse_position_status_report(position, &instrument, account_id, ts_init);
1783 reports.push(report);
1784 } else {
1785 log::debug!(
1786 "Skipping position - instrument {} not in cache",
1787 position.instrument_name
1788 );
1789 }
1790 }
1791 }
1792
1793 if let Some(instrument_id) = instrument_id {
1795 reports.retain(|r| r.instrument_id == instrument_id);
1796 }
1797
1798 log::debug!("Generated {} position status reports", reports.len());
1799 Ok(reports)
1800 }
1801}
1802
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::{
        DERIBIT_ACCOUNT_RATE_KEY, DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ORDER_RATE_KEY,
    };

    /// Checks how each method name is classified as an order and/or account method.
    #[rstest]
    #[case("private/buy", true, false)]
    #[case("private/cancel", true, false)]
    #[case("private/get_account_summaries", false, true)]
    #[case("private/get_positions", false, true)]
    #[case("public/get_instruments", false, false)]
    fn test_method_classification(
        #[case] method: &str,
        #[case] is_order: bool,
        #[case] is_account: bool,
    ) {
        let classified_as_order = DeribitRawHttpClient::is_order_method(method);
        assert_eq!(classified_as_order, is_order);

        let classified_as_account = DeribitRawHttpClient::is_account_method(method);
        assert_eq!(classified_as_account, is_account);
    }

    /// Checks that the expected shared keys and the per-method key are all present.
    #[rstest]
    #[case("private/buy", vec![DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ORDER_RATE_KEY])]
    #[case("private/get_account_summaries", vec![DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ACCOUNT_RATE_KEY])]
    #[case("public/get_instruments", vec![DERIBIT_GLOBAL_RATE_KEY])]
    fn test_rate_limit_keys(#[case] method: &str, #[case] expected_keys: Vec<&str>) {
        let actual_keys = DeribitRawHttpClient::rate_limit_keys(method);

        for expected in &expected_keys {
            let expected = expected.to_string();
            assert!(actual_keys.contains(&expected));
        }

        let per_method_key = format!("deribit:{method}");
        assert!(actual_keys.contains(&per_method_key));
    }

    /// An empty page yields `None`, even when `has_more` is set.
    #[rstest]
    fn test_paginator_empty_page_returns_none() {
        let mut paginator = TradePaginator::new(100, 200);

        let outcome = paginator.advance(&[], &[], true);

        assert!(outcome.is_none());
    }

    /// A single page of unseen trades returns every index.
    #[rstest]
    fn test_paginator_single_page_no_more() {
        let mut paginator = TradePaginator::new(100, 200);
        let trade_ids = vec!["t1".to_string(), "t2".to_string()];
        let timestamps = vec![150, 160];

        let new_indices = paginator.advance(&trade_ids, &timestamps, false);

        assert_eq!(new_indices, Some(vec![0, 1]));
    }

    /// A trade repeated on the next page is not returned a second time.
    #[rstest]
    fn test_paginator_dedup_across_pages() {
        let mut paginator = TradePaginator::new(100, 200);

        // First page: two trades sharing one timestamp
        let first_ids = vec!["t1".to_string(), "t2".to_string()];
        let first_ts = vec![150, 150];
        let first_new = paginator.advance(&first_ids, &first_ts, true);
        assert_eq!(first_new, Some(vec![0, 1]));
        assert_eq!(paginator.cursor, 150);

        // Second page repeats t2 and adds t3 at the same timestamp
        let second_ids = vec!["t2".to_string(), "t3".to_string()];
        let second_ts = vec![150, 150];
        let second_new = paginator.advance(&second_ids, &second_ts, false);

        // Only index 1 (t3) is new
        assert_eq!(second_new, Some(vec![1]));
    }

    /// A page made up entirely of already-seen trades moves the cursor past the timestamp.
    #[rstest]
    fn test_paginator_all_duplicates_advances_past_timestamp() {
        let mut paginator = TradePaginator::new(100, 200);

        // First delivery: both trades are new and the cursor sits on their timestamp
        let trade_ids = vec!["t1".to_string(), "t2".to_string()];
        let timestamps = vec![150, 150];
        paginator.advance(&trade_ids, &timestamps, true);
        assert_eq!(paginator.cursor, 150);

        // Same page again: nothing is new, so the cursor steps to 151
        let repeated = paginator.advance(&trade_ids, &timestamps, true);
        assert_eq!(repeated, Some(vec![]));
        assert_eq!(paginator.cursor, 151);
    }

    /// The paginator is exhausted only once the cursor is strictly past the end bound.
    #[rstest]
    fn test_paginator_is_exhausted_strict_greater_than() {
        let mut paginator = TradePaginator::new(100, 150);
        let trade_ids = vec!["t1".to_string()];
        let timestamps = vec![150];

        paginator.advance(&trade_ids, &timestamps, true);

        // Cursor equals the end bound: not exhausted yet
        assert_eq!(paginator.cursor, 150);
        assert!(!paginator.is_exhausted());

        paginator.advance(&trade_ids, &timestamps, true);

        // Cursor has moved past the end bound: exhausted
        assert_eq!(paginator.cursor, 151);
        assert!(paginator.is_exhausted());
    }

    /// `reset` restores the cursor and forgets the trade IDs seen so far.
    #[rstest]
    fn test_paginator_reset_clears_state() {
        let mut paginator = TradePaginator::new(100, 200);
        let trade_ids = vec!["t1".to_string()];
        let timestamps = vec![150];

        paginator.advance(&trade_ids, &timestamps, true);
        assert_eq!(paginator.seen_ids.len(), 1);

        paginator.reset(100);
        assert_eq!(paginator.cursor, 100);
        assert!(paginator.seen_ids.is_empty());

        // After the reset, the same trade counts as new again
        let after_reset = paginator.advance(&trade_ids, &timestamps, false);
        assert_eq!(after_reset, Some(vec![0]));
    }
}