1use std::{
23 fmt::Debug,
24 sync::{
25 Arc, Mutex,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use futures_util::Stream;
33use nautilus_common::{enums::LogColor, live::get_runtime, log_info};
34use nautilus_core::{
35 AtomicMap, AtomicSet, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt,
36 time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39 data::BarType,
40 enums::OrderSide,
41 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
42 instruments::{Instrument, InstrumentAny},
43 types::{Price, Quantity},
44};
45use nautilus_network::{
46 http::USER_AGENT,
47 mode::ConnectionMode,
48 websocket::{
49 AuthTracker, PingHandler, SubscriptionState, TransportBackend, WebSocketClient,
50 WebSocketConfig, channel_message_handler,
51 },
52};
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use super::{
57 auth::{AuthState, send_auth_request, spawn_token_refresh_task},
58 enums::{DeribitUpdateInterval, DeribitWsChannel},
59 error::{DeribitWsError, DeribitWsResult},
60 handler::{DeribitWsFeedHandler, HandlerCommand},
61 messages::{
62 DeribitCancelAllByInstrumentParams, DeribitCancelParams, DeribitEditParams,
63 DeribitOrderParams, NautilusWsMessage,
64 },
65};
66use crate::common::{
67 consts::{
68 DERIBIT_TESTNET_WS_URL, DERIBIT_WS_HEARTBEAT_SECS, DERIBIT_WS_ORDER_KEY,
69 DERIBIT_WS_ORDER_QUOTA, DERIBIT_WS_SUBSCRIPTION_KEY, DERIBIT_WS_SUBSCRIPTION_QUOTA,
70 DERIBIT_WS_URL,
71 },
72 credential::{Credential, credential_env_vars},
73 enums::DeribitEnvironment,
74 parse::bar_spec_to_resolution,
75};
76
/// Maximum time, in seconds, that `authenticate` waits for the authentication result.
const AUTHENTICATION_TIMEOUT_SECS: u64 = 30;
79
/// WebSocket client for the Deribit API.
///
/// The client is cheaply cloneable: most state lives behind `Arc`s and is shared
/// between clones and with the feed handler task started by `connect`.
#[derive(Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit", from_py_object)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.deribit")
)]
pub struct DeribitWebSocketClient {
    /// WebSocket endpoint URL used when connecting.
    url: String,
    /// Deribit environment; selects the default URL when none is supplied.
    environment: DeribitEnvironment,
    /// Heartbeat interval forwarded to the transport config and to the handler on connect.
    heartbeat_interval: Option<u64>,
    /// API credential, if configured; `None` means unauthenticated mode.
    credential: Option<Credential>,
    /// Most recent authentication state, written when an `Authenticated` message arrives.
    auth_state: Arc<tokio::sync::RwLock<Option<AuthState>>>,
    /// Stop flag shared with the handler: cleared by `connect`, set by `close`.
    signal: Arc<AtomicBool>,
    /// Connection mode atomic; swapped for the transport client's own atomic on connect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Tracks whether the session is authenticated and in-flight authentication requests.
    auth_tracker: AuthTracker,
    /// Command channel to the feed handler; the sender is replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Output message receiver; taken once by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the background task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription bookkeeping shared with the handler.
    subscriptions_state: SubscriptionState,
    /// Instruments cached by raw symbol; sent to the handler on connect.
    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
    /// Instruments with option greeks subscriptions (shared with the handler).
    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with mark price subscriptions (shared with the handler).
    mark_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Instruments with index price subscriptions (shared with the handler).
    index_price_subs: Arc<AtomicSet<InstrumentId>>,
    /// Token cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Account ID passed to the handler on connect, if set.
    account_id: Option<AccountId>,
    /// Bar timestamping flag passed to the handler on connect.
    bars_timestamp_on_close: bool,
    /// Subscription errors reported by the handler; drained by
    /// `wait_for_subscriptions_confirmed`.
    subscribe_errors: Arc<Mutex<Vec<String>>>,
    /// Transport backend used for the WebSocket connection.
    transport_backend: TransportBackend,
    /// Optional proxy URL forwarded to the transport config.
    proxy_url: Option<String>,
}
114
115impl Debug for DeribitWebSocketClient {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct(stringify!(DeribitWebSocketClient))
118 .field("url", &self.url)
119 .field("environment", &self.environment)
120 .field("has_credentials", &self.credential.is_some())
121 .field("is_authenticated", &self.auth_tracker.is_authenticated())
122 .field(
123 "has_auth_state",
124 &self.auth_state.try_read().is_ok_and(|s| s.is_some()),
125 )
126 .field("heartbeat_interval", &self.heartbeat_interval)
127 .finish_non_exhaustive()
128 }
129}
130
131impl DeribitWebSocketClient {
132 pub fn new(
140 url: Option<String>,
141 api_key: Option<String>,
142 api_secret: Option<String>,
143 heartbeat_interval: u64,
144 environment: DeribitEnvironment,
145 transport_backend: TransportBackend,
146 proxy_url: Option<String>,
147 ) -> anyhow::Result<Self> {
148 Self::new_inner(
149 url,
150 api_key,
151 api_secret,
152 heartbeat_interval,
153 environment,
154 true,
155 transport_backend,
156 proxy_url,
157 )
158 }
159
160 #[expect(clippy::too_many_arguments)]
162 fn new_inner(
163 url: Option<String>,
164 api_key: Option<String>,
165 api_secret: Option<String>,
166 heartbeat_interval: u64,
167 environment: DeribitEnvironment,
168 env_fallback: bool,
169 transport_backend: TransportBackend,
170 proxy_url: Option<String>,
171 ) -> anyhow::Result<Self> {
172 let url = url.unwrap_or_else(|| match environment {
173 DeribitEnvironment::Testnet => DERIBIT_TESTNET_WS_URL.to_string(),
174 DeribitEnvironment::Mainnet => DERIBIT_WS_URL.to_string(),
175 });
176
177 let credential =
179 Credential::resolve_with_env_fallback(api_key, api_secret, environment, env_fallback)?;
180
181 if credential.is_some() {
182 log::info!("Credentials loaded ({environment})");
183 } else {
184 log::debug!("No credentials configured - unauthenticated mode");
185 }
186
187 let signal = Arc::new(AtomicBool::new(false));
188 let subscriptions_state = SubscriptionState::new('.');
189
190 Ok(Self {
191 url,
192 environment,
193 heartbeat_interval: Some(heartbeat_interval),
194 credential,
195 auth_state: Arc::new(tokio::sync::RwLock::new(None)),
196 signal,
197 connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
198 ConnectionMode::Closed.as_u8(),
199 ))),
200 auth_tracker: AuthTracker::new(),
201 cmd_tx: {
202 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
203 Arc::new(tokio::sync::RwLock::new(tx))
204 },
205 out_rx: None,
206 task_handle: None,
207 subscriptions_state,
208 instruments_cache: Arc::new(AtomicMap::new()),
209 option_greeks_subs: Arc::new(AtomicSet::new()),
210 mark_price_subs: Arc::new(AtomicSet::new()),
211 index_price_subs: Arc::new(AtomicSet::new()),
212 cancellation_token: CancellationToken::new(),
213 account_id: None,
214 bars_timestamp_on_close: true,
215 subscribe_errors: Arc::new(Mutex::new(Vec::new())),
216 transport_backend,
217 proxy_url,
218 })
219 }
220
221 pub fn new_public(
229 environment: DeribitEnvironment,
230 proxy_url: Option<String>,
231 ) -> anyhow::Result<Self> {
232 Self::new_inner(
233 None,
234 None,
235 None,
236 DERIBIT_WS_HEARTBEAT_SECS,
237 environment,
238 false,
239 TransportBackend::default(),
240 proxy_url,
241 )
242 }
243
244 pub fn new_unauthenticated(
253 url: Option<String>,
254 heartbeat_interval: u64,
255 environment: DeribitEnvironment,
256 ) -> anyhow::Result<Self> {
257 Self::new_inner(
258 url,
259 None,
260 None,
261 heartbeat_interval,
262 environment,
263 false,
264 TransportBackend::default(),
265 None,
266 )
267 }
268
269 pub fn with_credentials(
279 environment: DeribitEnvironment,
280 proxy_url: Option<String>,
281 ) -> anyhow::Result<Self> {
282 let (key_env, secret_env) = credential_env_vars(environment);
283
284 let api_key = get_or_env_var_opt(None, key_env)
285 .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {key_env}"))?;
286 let api_secret = get_or_env_var_opt(None, secret_env)
287 .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {secret_env}"))?;
288
289 Self::new(
290 None,
291 Some(api_key),
292 Some(api_secret),
293 DERIBIT_WS_HEARTBEAT_SECS,
294 environment,
295 TransportBackend::default(),
296 proxy_url,
297 )
298 }
299
300 fn connection_mode(&self) -> ConnectionMode {
302 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
303 ConnectionMode::from_u8(mode_u8)
304 }
305
306 #[must_use]
308 pub fn is_active(&self) -> bool {
309 self.connection_mode() == ConnectionMode::Active
310 }
311
312 #[must_use]
314 pub fn url(&self) -> &str {
315 &self.url
316 }
317
/// Returns the Deribit environment this client was configured with.
#[must_use]
pub fn environment(&self) -> DeribitEnvironment {
    self.environment
}
323
324 #[must_use]
326 pub fn is_closed(&self) -> bool {
327 let mode = self.connection_mode();
328 mode == ConnectionMode::Disconnect || mode == ConnectionMode::Closed
329 }
330
331 pub fn cancel_all_requests(&self) {
333 self.cancellation_token.cancel();
334 }
335
/// Returns a reference to the client's cancellation token.
#[must_use]
pub fn cancellation_token(&self) -> &CancellationToken {
    &self.cancellation_token
}
341
342 pub async fn wait_until_active(&self, timeout_secs: f64) -> DeribitWsResult<()> {
348 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
349
350 tokio::time::timeout(timeout, async {
351 while !self.is_active() {
352 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
353 }
354 })
355 .await
356 .map_err(|_| {
357 DeribitWsError::Timeout(format!(
358 "WebSocket connection timeout after {timeout_secs} seconds"
359 ))
360 })?;
361
362 Ok(())
363 }
364
365 pub async fn wait_for_subscriptions_confirmed(&self, timeout_secs: f64) -> DeribitWsResult<()> {
371 let timeout = Duration::from_secs_f64(timeout_secs);
372
373 tokio::time::timeout(timeout, async {
374 loop {
375 if let Ok(mut errors) = self.subscribe_errors.lock()
377 && !errors.is_empty()
378 {
379 let msg = errors.join("; ");
380 errors.clear();
381 return Err(DeribitWsError::Subscribe(msg));
382 }
383
384 let pending = self.subscriptions_state.pending_subscribe_topics();
385 if pending.is_empty() {
386 return Ok(());
387 }
388 tokio::time::sleep(Duration::from_millis(10)).await;
389 }
390 })
391 .await
392 .map_err(|_| {
393 let pending = self.subscriptions_state.pending_subscribe_topics();
394 DeribitWsError::Timeout(format!(
395 "Subscription confirmation timeout after {timeout_secs}s, \
396 still pending: {pending:?}"
397 ))
398 })?
399 }
400
401 pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
403 self.instruments_cache.rcu(|m| {
404 for inst in instruments {
405 m.insert(inst.raw_symbol().inner(), inst.clone());
406 }
407 });
408 log::debug!("Cached {} instruments", self.instruments_cache.len());
409
410 if self.is_active() {
413 for inst in instruments {
414 let tx = self.cmd_tx.clone();
415 let boxed = Box::new(inst.clone());
416
417 get_runtime().spawn(async move {
418 let _ = tx
419 .read()
420 .await
421 .send(HandlerCommand::UpdateInstrument(boxed));
422 });
423 }
424 }
425 }
426
427 pub fn cache_instrument(&self, instrument: InstrumentAny) {
429 let symbol = instrument.raw_symbol().inner();
430 self.instruments_cache.insert(symbol, instrument);
431
432 if self.is_active() {
434 let tx = self.cmd_tx.clone();
435 let inst = self.instruments_cache.get_cloned(&symbol);
436 if let Some(inst) = inst {
437 get_runtime().spawn(async move {
438 let _ = tx
439 .read()
440 .await
441 .send(HandlerCommand::UpdateInstrument(Box::new(inst)));
442 });
443 }
444 }
445 }
446
/// Replaces the shared set of option greeks subscriptions.
///
/// The set is handed to the feed handler on `connect`, so it should be set
/// before connecting to take effect for that connection.
pub fn set_option_greeks_subs(&mut self, subs: Arc<AtomicSet<InstrumentId>>) {
    self.option_greeks_subs = subs;
}
451
/// Replaces the shared set of mark price subscriptions.
///
/// The set is handed to the feed handler on `connect`, so it should be set
/// before connecting to take effect for that connection.
pub fn set_mark_price_subs(&mut self, subs: Arc<AtomicSet<InstrumentId>>) {
    self.mark_price_subs = subs;
}
456
/// Replaces the shared set of index price subscriptions.
///
/// The set is handed to the feed handler on `connect`, so it should be set
/// before connecting to take effect for that connection.
pub fn set_index_price_subs(&mut self, subs: Arc<AtomicSet<InstrumentId>>) {
    self.index_price_subs = subs;
}
461
/// Adds `instrument_id` to the set of mark price subscriptions.
pub fn add_mark_price_sub(&self, instrument_id: InstrumentId) {
    self.mark_price_subs.insert(instrument_id);
}
466
/// Removes `instrument_id` from the set of mark price subscriptions.
pub fn remove_mark_price_sub(&self, instrument_id: &InstrumentId) {
    self.mark_price_subs.remove(instrument_id);
}
471
/// Adds `instrument_id` to the set of index price subscriptions.
pub fn add_index_price_sub(&self, instrument_id: InstrumentId) {
    self.index_price_subs.insert(instrument_id);
}
476
/// Removes `instrument_id` from the set of index price subscriptions.
pub fn remove_index_price_sub(&self, instrument_id: &InstrumentId) {
    self.index_price_subs.remove(instrument_id);
}
481
/// Adds `instrument_id` to the set of option greeks subscriptions.
pub fn add_option_greeks_sub(&self, instrument_id: InstrumentId) {
    self.option_greeks_subs.insert(instrument_id);
}
486
/// Removes `instrument_id` from the set of option greeks subscriptions.
pub fn remove_option_greeks_sub(&self, instrument_id: &InstrumentId) {
    self.option_greeks_subs.remove(instrument_id);
}
491
/// Connects to the Deribit WebSocket and starts the background handler task.
///
/// Any previously spawned task is aborted, the stop signal and subscription state
/// are reset, and fresh command/output channels are installed. The spawned task
/// drives the feed handler and reacts to three message kinds:
///
/// - `Reconnected`: marks all tracked topics as failed, then either
///   re-authenticates (when a credential is configured) or resubscribes directly.
/// - `Authenticated`: stores the new auth state, starts a token refresh task and,
///   if this was a re-authentication, resubscribes to all tracked topics.
/// - `AuthenticationFailed`: retries re-authentication with a growing delay up to
///   `MAX_REAUTH_ATTEMPTS`; after that, drops auth-requiring topics and
///   resubscribes to the remaining ones.
///
/// # Errors
///
/// Returns an error if the underlying `WebSocketClient::connect` call fails.
pub async fn connect(&mut self) -> anyhow::Result<()> {
    log_info!(
        "Connecting to WebSocket: {}",
        self.url,
        color = LogColor::Blue
    );

    // Stop the task from a previous connection, if one exists
    if let Some(handle) = self.task_handle.take() {
        handle.abort();
    }

    // Clear the stop flag (set by `close`) and forget tracked subscriptions
    self.signal.store(false, Ordering::Relaxed);
    self.subscriptions_state.clear();

    let (message_handler, raw_rx) = channel_message_handler();

    // Ping payloads are ignored by this handler (empty closure body)
    let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
    });

    let config = WebSocketConfig {
        url: self.url.clone(),
        headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
        heartbeat: self.heartbeat_interval,
        heartbeat_msg: None, reconnect_timeout_ms: Some(5_000),
        reconnect_delay_initial_ms: None,
        reconnect_delay_max_ms: None,
        reconnect_backoff_factor: None,
        reconnect_jitter_ms: None,
        reconnect_max_attempts: None,
        idle_timeout_ms: None,
        backend: self.transport_backend,
        proxy_url: self.proxy_url.clone(),
    };

    // Per-key quotas for subscription and order requests
    // NOTE(review): presumably rate limits enforced by the transport client - confirm
    let keyed_quotas = vec![
        (
            DERIBIT_WS_SUBSCRIPTION_KEY.to_string(),
            *DERIBIT_WS_SUBSCRIPTION_QUOTA,
        ),
        (DERIBIT_WS_ORDER_KEY.to_string(), *DERIBIT_WS_ORDER_QUOTA),
    ];

    // NOTE(review): the meaning of the positional `None` and the trailing quota
    // argument is defined by `WebSocketClient::connect` - confirm against its signature
    let ws_client = WebSocketClient::connect(
        config,
        Some(message_handler),
        Some(ping_handler),
        None, keyed_quotas,
        Some(*DERIBIT_WS_SUBSCRIPTION_QUOTA), )
    .await?;

    // Track the transport client's own connection mode atomic from now on
    self.connection_mode
        .store(ws_client.connection_mode_atomic());

    let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
    let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();

    // Replace the shared command sender so existing clones use the new channel
    *self.cmd_tx.write().await = cmd_tx.clone();
    self.out_rx = Some(Arc::new(out_rx));

    // Discard subscription errors left over from a previous connection
    if let Ok(mut errors) = self.subscribe_errors.lock() {
        errors.clear();
    }

    let mut handler = DeribitWsFeedHandler::new(
        self.signal.clone(),
        cmd_rx,
        raw_rx,
        out_tx,
        self.auth_tracker.clone(),
        self.subscriptions_state.clone(),
        self.option_greeks_subs.clone(),
        self.mark_price_subs.clone(),
        self.index_price_subs.clone(),
        self.account_id,
        self.bars_timestamp_on_close,
        self.subscribe_errors.clone(),
    );

    // Hand the transport client to the handler; a send failure is ignored
    let _ = cmd_tx.send(HandlerCommand::SetClient(ws_client));

    // Replay cached instruments into the handler
    let instruments: Vec<InstrumentAny> =
        self.instruments_cache.load().values().cloned().collect();

    if !instruments.is_empty() {
        log::debug!(
            "Sending {} cached instruments to handler",
            instruments.len()
        );
        let _ = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments));
    }

    if let Some(interval) = self.heartbeat_interval {
        let _ = cmd_tx.send(HandlerCommand::SetHeartbeat { interval });
    }

    // Clones moved into the background task
    let subscriptions_state = self.subscriptions_state.clone();
    let credential = self.credential.clone();
    let auth_tracker = self.auth_tracker.clone();
    let auth_state = self.auth_state.clone();

    let task_handle = get_runtime().spawn(async move {
        const MAX_REAUTH_ATTEMPTS: u32 = 3;

        // `pending_reauth` is set while a post-reconnect authentication is in flight;
        // `reauth_attempts` counts the requests sent for it
        let mut pending_reauth = false;
        let mut reauth_attempts: u32 = 0;

        // Tokens used to cancel the token-refresh task and a delayed auth retry
        let mut refresh_cancel = CancellationToken::new();
        let mut retry_cancel = CancellationToken::new();

        loop {
            match handler.next().await {
                Some(msg) => match msg {
                    NautilusWsMessage::Reconnected => {
                        log::info!("Reconnected to WebSocket");

                        // Cancel tasks tied to the previous session and arm fresh tokens
                        refresh_cancel.cancel();
                        refresh_cancel = CancellationToken::new();
                        retry_cancel.cancel();
                        retry_cancel = CancellationToken::new();

                        let channels = subscriptions_state.all_topics();

                        // NOTE(review): presumably returns topics to a pending state until
                        // resubscribed - confirm against `SubscriptionState::mark_failure`
                        for channel in &channels {
                            subscriptions_state.mark_failure(channel);
                        }

                        if let Some(cred) = &credential {
                            log::info!("Re-authenticating after reconnection...");

                            // The result receiver is not awaited here; the outcome arrives
                            // as an `Authenticated` / `AuthenticationFailed` message
                            let _rx = auth_tracker.begin();
                            pending_reauth = true;
                            reauth_attempts = 1;

                            let previous_scope = auth_state
                                .read()
                                .await
                                .as_ref()
                                .map(|s| s.scope.clone());

                            send_auth_request(cred, previous_scope, &cmd_tx);
                        } else {
                            // No credential: resubscribe immediately
                            if !channels.is_empty() {
                                let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
                            }
                        }
                    }
                    NautilusWsMessage::Authenticated(result) => {
                        let timestamp = get_atomic_clock_realtime().get_time_ms();
                        let new_auth_state = AuthState::from_auth_result(&result, timestamp);
                        *auth_state.write().await = Some(new_auth_state);

                        // Replace any running refresh task and cancel a pending retry
                        refresh_cancel.cancel();
                        refresh_cancel = CancellationToken::new();
                        retry_cancel.cancel();
                        retry_cancel = CancellationToken::new();

                        spawn_token_refresh_task(
                            result.expires_in,
                            result.refresh_token.clone(),
                            cmd_tx.clone(),
                            refresh_cancel.clone(),
                        );

                        if pending_reauth {
                            pending_reauth = false;
                            reauth_attempts = 0;
                            log::info!(
                                "Re-authentication successful (scope: {}), resubscribing to channels",
                                result.scope
                            );

                            let channels = subscriptions_state.all_topics();

                            if !channels.is_empty() {
                                let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
                            }
                        } else {
                            log::debug!(
                                "Auth state stored: scope={}, expires_in={}s",
                                result.scope,
                                result.expires_in
                            );
                        }
                    }
                    NautilusWsMessage::AuthenticationFailed(reason) => {
                        if pending_reauth && reauth_attempts < MAX_REAUTH_ATTEMPTS {
                            // Delay doubles per attempt: 2s after the first failure, 4s after the second
                            let delay_secs = 1u64 << reauth_attempts; log::warn!(
                                "Re-authentication attempt {reauth_attempts}/{MAX_REAUTH_ATTEMPTS} \
                                 failed: {reason} - retrying in {delay_secs}s",
                            );
                            reauth_attempts += 1;

                            if let Some(cred) = &credential {
                                let cred = cred.clone();
                                let auth_state = auth_state.clone();
                                let auth_tracker = auth_tracker.clone();
                                let cmd_tx = cmd_tx.clone();
                                let cancel = retry_cancel.clone();

                                // Delayed retry; abandoned if `retry_cancel` fires first
                                get_runtime().spawn(async move {
                                    tokio::select! {
                                        () = tokio::time::sleep(Duration::from_secs(delay_secs)) => {}
                                        () = cancel.cancelled() => return,
                                    }
                                    let _rx = auth_tracker.begin();
                                    let previous_scope = auth_state
                                        .read()
                                        .await
                                        .as_ref()
                                        .map(|s| s.scope.clone());
                                    send_auth_request(&cred, previous_scope, &cmd_tx);
                                });
                            }
                        } else if pending_reauth {
                            // Attempts exhausted: give up on authentication
                            pending_reauth = false;
                            reauth_attempts = 0;
                            log::error!(
                                "Re-authentication failed after {MAX_REAUTH_ATTEMPTS} \
                                 attempts: {reason} \
                                 - resubscribing to public channels only"
                            );

                            let all = subscriptions_state.all_topics();
                            let mut public_channels = Vec::new();

                            for ch in &all {
                                if DeribitWsChannel::requires_auth(ch) {
                                    // Drop auth-requiring topics from the tracked state
                                    subscriptions_state.mark_unsubscribe(ch);
                                    subscriptions_state.confirm_unsubscribe(ch);
                                    subscriptions_state.remove_reference(ch);
                                } else {
                                    public_channels.push(ch.clone());
                                }
                            }

                            if !public_channels.is_empty() {
                                let _ = cmd_tx.send(HandlerCommand::Subscribe {
                                    channels: public_channels,
                                });
                            }
                        } else {
                            // Failure of an authentication not started by a reconnect
                            log::error!("Authentication failed: {reason}");
                        }
                    }
                    // All other messages are ignored by this task
                    _ => {}
                },
                None => {
                    log::debug!("Handler returned None, stopping task");
                    break;
                }
            }
        }
    });

    self.task_handle = Some(Arc::new(task_handle));
    log::info!("Connected to WebSocket");

    Ok(())
}
783
784 pub async fn close(&self) -> DeribitWsResult<()> {
790 log::info!("Closing WebSocket connection");
791 self.signal.store(true, Ordering::Relaxed);
792
793 let _ = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
794
795 if let Some(handle) = &self.task_handle {
797 let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
798 while !handle.is_finished() && tokio::time::Instant::now() < deadline {
799 tokio::time::sleep(Duration::from_millis(50)).await;
800 }
801
802 if !handle.is_finished() {
803 handle.abort();
804 }
805 }
806
807 self.auth_tracker.invalidate();
808
809 Ok(())
810 }
811
812 pub fn stream(&mut self) -> DeribitWsResult<impl Stream<Item = NautilusWsMessage> + 'static> {
818 let rx = self.out_rx.take().ok_or_else(|| {
819 DeribitWsError::ClientError(
820 "Stream receiver already taken or not connected".to_string(),
821 )
822 })?;
823 let mut rx = Arc::try_unwrap(rx).map_err(|_| {
824 DeribitWsError::ClientError(
825 "Cannot take stream ownership - other references exist".to_string(),
826 )
827 })?;
828
829 Ok(async_stream::stream! {
830 while let Some(msg) = rx.recv().await {
831 yield msg;
832 }
833 })
834 }
835
/// Returns `true` if an API credential is configured.
#[must_use]
pub fn has_credentials(&self) -> bool {
    self.credential.is_some()
}
841
/// Returns `true` if the authentication tracker reports an authenticated session.
#[must_use]
pub fn is_authenticated(&self) -> bool {
    self.auth_tracker.is_authenticated()
}
847
848 pub async fn authenticate(&self, session_name: Option<&str>) -> DeribitWsResult<()> {
867 let credential = self.credential.as_ref().ok_or_else(|| {
868 DeribitWsError::Authentication("API credentials not configured".to_string())
869 })?;
870
871 let scope = session_name.map(|name| format!("session:{name}"));
873
874 log::info!("Authenticating WebSocket...");
875
876 let rx = self.auth_tracker.begin();
877
878 let cmd_tx = self.cmd_tx.read().await;
880 send_auth_request(credential, scope, &cmd_tx);
881 drop(cmd_tx);
882
883 match self
885 .auth_tracker
886 .wait_for_result::<DeribitWsError>(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
887 .await
888 {
889 Ok(()) => {
890 log::info!("WebSocket authenticated successfully");
891 Ok(())
892 }
893 Err(e) => {
894 log::error!("WebSocket authentication failed: error={e}");
895 Err(e)
896 }
897 }
898 }
899
900 pub async fn authenticate_session(&self, session_name: &str) -> DeribitWsResult<()> {
909 self.authenticate(Some(session_name)).await
910 }
911
912 pub async fn auth_state(&self) -> Option<AuthState> {
916 self.auth_state.read().await.clone()
917 }
918
919 pub async fn access_token(&self) -> Option<String> {
921 self.auth_state
922 .read()
923 .await
924 .as_ref()
925 .map(|s| s.access_token.clone())
926 }
927
/// Sets the account ID.
///
/// The value is passed to the feed handler when `connect` is called.
pub fn set_account_id(&mut self, account_id: AccountId) {
    self.account_id = Some(account_id);
}
932
/// Sets the `bars_timestamp_on_close` flag.
///
/// The value is passed to the feed handler when `connect` is called.
pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
    self.bars_timestamp_on_close = value;
}
939
940 async fn send_subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
941 let mut channels_to_subscribe = Vec::new();
942
943 for channel in channels {
944 if self.subscriptions_state.add_reference(&channel) {
945 self.subscriptions_state.mark_subscribe(&channel);
946 channels_to_subscribe.push(channel);
947 } else {
948 log::debug!("Already subscribed to {channel}, skipping duplicate subscription");
949 }
950 }
951
952 if channels_to_subscribe.is_empty() {
953 return Ok(());
954 }
955
956 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
957 channels: channels_to_subscribe.clone(),
958 }) {
959 for channel in &channels_to_subscribe {
961 self.subscriptions_state.remove_reference(channel);
962 self.subscriptions_state.mark_unsubscribe(channel);
963 self.subscriptions_state.confirm_unsubscribe(channel);
964 }
965 return Err(DeribitWsError::Send(e.to_string()));
966 }
967
968 log::debug!(
969 "Sent subscribe for {} channels",
970 channels_to_subscribe.len()
971 );
972 Ok(())
973 }
974
975 async fn send_unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
976 let mut channels_to_unsubscribe = Vec::new();
977
978 for channel in channels {
979 if self.subscriptions_state.remove_reference(&channel) {
980 self.subscriptions_state.mark_unsubscribe(&channel);
981 channels_to_unsubscribe.push(channel);
982 } else {
983 log::debug!("Still has references to {channel}, skipping unsubscription");
984 }
985 }
986
987 if channels_to_unsubscribe.is_empty() {
988 return Ok(());
989 }
990
991 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Unsubscribe {
992 channels: channels_to_unsubscribe.clone(),
993 }) {
994 for channel in &channels_to_unsubscribe {
1001 self.subscriptions_state.confirm_unsubscribe(channel);
1002 self.subscriptions_state.add_reference(channel);
1003 self.subscriptions_state.confirm_subscribe(channel);
1004 }
1005 return Err(DeribitWsError::Send(e.to_string()));
1006 }
1007
1008 log::debug!(
1009 "Sent unsubscribe for {} channels",
1010 channels_to_unsubscribe.len()
1011 );
1012 Ok(())
1013 }
1014
1015 pub async fn subscribe_trades(
1026 &self,
1027 instrument_id: InstrumentId,
1028 interval: Option<DeribitUpdateInterval>,
1029 ) -> DeribitWsResult<()> {
1030 let interval = interval.unwrap_or_default();
1031 self.check_auth_requirement(interval)?;
1032 let channel =
1033 DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
1034 self.send_subscribe(vec![channel]).await
1035 }
1036
1037 pub async fn unsubscribe_trades(
1043 &self,
1044 instrument_id: InstrumentId,
1045 interval: Option<DeribitUpdateInterval>,
1046 ) -> DeribitWsResult<()> {
1047 let interval = interval.unwrap_or_default();
1048 let channel =
1049 DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
1050 self.send_unsubscribe(vec![channel]).await
1051 }
1052
1053 pub async fn subscribe_book(
1064 &self,
1065 instrument_id: InstrumentId,
1066 interval: Option<DeribitUpdateInterval>,
1067 ) -> DeribitWsResult<()> {
1068 let interval = interval.unwrap_or_default();
1069 self.check_auth_requirement(interval)?;
1070 let channel =
1071 DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
1072 self.send_subscribe(vec![channel]).await
1073 }
1074
1075 pub async fn unsubscribe_book(
1081 &self,
1082 instrument_id: InstrumentId,
1083 interval: Option<DeribitUpdateInterval>,
1084 ) -> DeribitWsResult<()> {
1085 let interval = interval.unwrap_or_default();
1086 let channel =
1087 DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
1088 self.send_unsubscribe(vec![channel]).await
1089 }
1090
1091 pub async fn subscribe_book_grouped(
1101 &self,
1102 instrument_id: InstrumentId,
1103 group: &str,
1104 depth: u32,
1105 interval: Option<DeribitUpdateInterval>,
1106 ) -> DeribitWsResult<()> {
1107 let interval = match interval {
1109 Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
1110 Some(i) => i,
1111 };
1112
1113 let normalized_depth = if depth < 5 {
1114 1
1115 } else if depth < 15 {
1116 10
1117 } else {
1118 20
1119 };
1120
1121 let channel = format!(
1122 "book.{}.{}.{}.{}",
1123 instrument_id.symbol,
1124 group,
1125 normalized_depth,
1126 interval.as_str()
1127 );
1128 log::debug!("Subscribing to grouped book channel: {channel}");
1129 self.send_subscribe(vec![channel]).await
1130 }
1131
1132 pub async fn unsubscribe_book_grouped(
1140 &self,
1141 instrument_id: InstrumentId,
1142 group: &str,
1143 depth: u32,
1144 interval: Option<DeribitUpdateInterval>,
1145 ) -> DeribitWsResult<()> {
1146 let interval = match interval {
1148 Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
1149 Some(i) => i,
1150 };
1151
1152 let normalized_depth = if depth < 5 {
1153 1
1154 } else if depth < 15 {
1155 10
1156 } else {
1157 20
1158 };
1159
1160 let channel = format!(
1161 "book.{}.{}.{}.{}",
1162 instrument_id.symbol,
1163 group,
1164 normalized_depth,
1165 interval.as_str()
1166 );
1167 self.send_unsubscribe(vec![channel]).await
1168 }
1169
1170 pub async fn subscribe_ticker(
1181 &self,
1182 instrument_id: InstrumentId,
1183 interval: Option<DeribitUpdateInterval>,
1184 ) -> DeribitWsResult<()> {
1185 let interval = interval.unwrap_or_default();
1186 self.check_auth_requirement(interval)?;
1187 let channel =
1188 DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
1189 self.send_subscribe(vec![channel]).await
1190 }
1191
1192 pub async fn unsubscribe_ticker(
1198 &self,
1199 instrument_id: InstrumentId,
1200 interval: Option<DeribitUpdateInterval>,
1201 ) -> DeribitWsResult<()> {
1202 let interval = interval.unwrap_or_default();
1203 let channel =
1204 DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
1205 self.send_unsubscribe(vec![channel]).await
1206 }
1207
1208 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
1216 let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
1217 self.send_subscribe(vec![channel]).await
1218 }
1219
1220 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
1226 let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
1227 self.send_unsubscribe(vec![channel]).await
1228 }
1229
1230 pub async fn subscribe_instrument_status(
1238 &self,
1239 kind: &str,
1240 currency: &str,
1241 ) -> DeribitWsResult<()> {
1242 let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
1243 self.send_subscribe(vec![channel]).await
1244 }
1245
1246 pub async fn unsubscribe_instrument_status(
1252 &self,
1253 kind: &str,
1254 currency: &str,
1255 ) -> DeribitWsResult<()> {
1256 let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
1257 self.send_unsubscribe(vec![channel]).await
1258 }
1259
1260 pub async fn subscribe_perpetual_interests_rates_updates(
1268 &self,
1269 instrument_id: InstrumentId,
1270 interval: Option<DeribitUpdateInterval>,
1271 ) -> DeribitWsResult<()> {
1272 let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1273 let channel = DeribitWsChannel::Perpetual
1274 .format_channel(instrument_id.symbol.as_str(), Some(interval));
1275
1276 self.send_subscribe(vec![channel]).await
1277 }
1278
1279 pub async fn unsubscribe_perpetual_interest_rates_updates(
1285 &self,
1286 instrument_id: InstrumentId,
1287 interval: Option<DeribitUpdateInterval>,
1288 ) -> DeribitWsResult<()> {
1289 let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1290 let channel = DeribitWsChannel::Perpetual
1291 .format_channel(instrument_id.symbol.as_str(), Some(interval));
1292
1293 self.send_unsubscribe(vec![channel]).await
1294 }
1295
1296 pub async fn subscribe_chart(
1308 &self,
1309 instrument_id: InstrumentId,
1310 resolution: &str,
1311 ) -> DeribitWsResult<()> {
1312 let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1314 self.send_subscribe(vec![channel]).await
1315 }
1316
1317 pub async fn unsubscribe_chart(
1323 &self,
1324 instrument_id: InstrumentId,
1325 resolution: &str,
1326 ) -> DeribitWsResult<()> {
1327 let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1328 self.send_unsubscribe(vec![channel]).await
1329 }
1330
1331 pub async fn subscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1340 let resolution = bar_spec_to_resolution(&bar_type);
1341 self.subscribe_chart(bar_type.instrument_id(), &resolution)
1342 .await
1343 }
1344
1345 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1351 let resolution = bar_spec_to_resolution(&bar_type);
1352 self.unsubscribe_chart(bar_type.instrument_id(), &resolution)
1353 .await
1354 }
1355
1356 fn check_auth_requirement(&self, interval: DeribitUpdateInterval) -> DeribitWsResult<()> {
1362 if interval.requires_auth() && !self.is_authenticated() {
1363 return Err(DeribitWsError::Authentication(
1364 "Raw streams require authentication. Call authenticate() first.".to_string(),
1365 ));
1366 }
1367 Ok(())
1368 }
1369
1370 pub async fn subscribe_user_orders(&self) -> DeribitWsResult<()> {
1378 if !self.is_authenticated() {
1379 return Err(DeribitWsError::Authentication(
1380 "User orders subscription requires authentication".to_string(),
1381 ));
1382 }
1383 self.send_subscribe(vec!["user.orders.any.any.raw".to_string()])
1384 .await
1385 }
1386
1387 pub async fn unsubscribe_user_orders(&self) -> DeribitWsResult<()> {
1393 self.send_unsubscribe(vec!["user.orders.any.any.raw".to_string()])
1394 .await
1395 }
1396
1397 pub async fn subscribe_user_trades(&self) -> DeribitWsResult<()> {
1405 if !self.is_authenticated() {
1406 return Err(DeribitWsError::Authentication(
1407 "User trades subscription requires authentication".to_string(),
1408 ));
1409 }
1410 self.send_subscribe(vec!["user.trades.any.any.raw".to_string()])
1411 .await
1412 }
1413
1414 pub async fn unsubscribe_user_trades(&self) -> DeribitWsResult<()> {
1420 self.send_unsubscribe(vec!["user.trades.any.any.raw".to_string()])
1421 .await
1422 }
1423
1424 pub async fn subscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1434 if !self.is_authenticated() {
1435 return Err(DeribitWsError::Authentication(
1436 "User portfolio subscription requires authentication".to_string(),
1437 ));
1438 }
1439 self.send_subscribe(vec!["user.portfolio.any".to_string()])
1440 .await
1441 }
1442
1443 pub async fn unsubscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1449 self.send_unsubscribe(vec!["user.portfolio.any".to_string()])
1450 .await
1451 }
1452
    /// Subscribes to the given raw channel names.
    ///
    /// The channel list is forwarded unchanged to `send_subscribe`; no validation or
    /// authentication check happens in this method.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_subscribe`.
    pub async fn subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
        self.send_subscribe(channels).await
    }
1461
    /// Unsubscribes from the given raw channel names.
    ///
    /// The channel list is forwarded unchanged to `send_unsubscribe`; no validation or
    /// authentication check happens in this method.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `send_unsubscribe`.
    pub async fn unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
        self.send_unsubscribe(channels).await
    }
1470
1471 pub async fn submit_order(
1482 &self,
1483 order_side: OrderSide,
1484 params: DeribitOrderParams,
1485 client_order_id: ClientOrderId,
1486 trader_id: TraderId,
1487 strategy_id: StrategyId,
1488 instrument_id: InstrumentId,
1489 ) -> DeribitWsResult<()> {
1490 if !self.is_authenticated() {
1491 return Err(DeribitWsError::Authentication(
1492 "Submit order requires authentication. Call authenticate_session() first."
1493 .to_string(),
1494 ));
1495 }
1496
1497 log::debug!(
1498 "Sending {} order: instrument={}, amount={}, price={:?}, client_order_id={}",
1499 order_side,
1500 params.instrument_name,
1501 params.amount,
1502 params.price,
1503 client_order_id
1504 );
1505
1506 let cmd = match order_side {
1507 OrderSide::Buy => HandlerCommand::Buy {
1508 params,
1509 client_order_id,
1510 trader_id,
1511 strategy_id,
1512 instrument_id,
1513 },
1514 OrderSide::Sell => HandlerCommand::Sell {
1515 params,
1516 client_order_id,
1517 trader_id,
1518 strategy_id,
1519 instrument_id,
1520 },
1521 _ => {
1522 return Err(DeribitWsError::ClientError(format!(
1523 "Invalid order side: {order_side}"
1524 )));
1525 }
1526 };
1527
1528 self.cmd_tx
1529 .read()
1530 .await
1531 .send(cmd)
1532 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1533
1534 Ok(())
1535 }
1536
1537 #[expect(clippy::too_many_arguments)]
1548 pub async fn modify_order(
1549 &self,
1550 order_id: &str,
1551 quantity: Quantity,
1552 price: Price,
1553 client_order_id: ClientOrderId,
1554 trader_id: TraderId,
1555 strategy_id: StrategyId,
1556 instrument_id: InstrumentId,
1557 ) -> DeribitWsResult<()> {
1558 if !self.is_authenticated() {
1559 return Err(DeribitWsError::Authentication(
1560 "Modify order requires authentication. Call authenticate_session() first."
1561 .to_string(),
1562 ));
1563 }
1564
1565 let params = DeribitEditParams {
1566 order_id: order_id.to_string(),
1567 amount: quantity.as_decimal(),
1568 price: Some(price.as_decimal()),
1569 post_only: None,
1570 reject_post_only: None,
1571 reduce_only: None,
1572 trigger_price: None,
1573 };
1574
1575 log::debug!(
1576 "Sending modify order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
1577 );
1578
1579 self.cmd_tx
1580 .read()
1581 .await
1582 .send(HandlerCommand::Edit {
1583 params,
1584 client_order_id,
1585 trader_id,
1586 strategy_id,
1587 instrument_id,
1588 })
1589 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1590
1591 Ok(())
1592 }
1593
1594 pub async fn cancel_order(
1605 &self,
1606 order_id: &str,
1607 client_order_id: ClientOrderId,
1608 trader_id: TraderId,
1609 strategy_id: StrategyId,
1610 instrument_id: InstrumentId,
1611 ) -> DeribitWsResult<()> {
1612 if !self.is_authenticated() {
1613 return Err(DeribitWsError::Authentication(
1614 "Cancel order requires authentication. Call authenticate_session() first."
1615 .to_string(),
1616 ));
1617 }
1618
1619 let params = DeribitCancelParams {
1620 order_id: order_id.to_string(),
1621 };
1622
1623 log::debug!("Sending cancel order: order_id={order_id}, client_order_id={client_order_id}");
1624
1625 self.cmd_tx
1626 .read()
1627 .await
1628 .send(HandlerCommand::Cancel {
1629 params,
1630 client_order_id,
1631 trader_id,
1632 strategy_id,
1633 instrument_id,
1634 })
1635 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1636
1637 Ok(())
1638 }
1639
1640 pub async fn cancel_all_orders(
1651 &self,
1652 instrument_id: InstrumentId,
1653 order_type: Option<String>,
1654 ) -> DeribitWsResult<()> {
1655 if !self.is_authenticated() {
1656 return Err(DeribitWsError::Authentication(
1657 "Cancel all orders requires authentication. Call authenticate_session() first."
1658 .to_string(),
1659 ));
1660 }
1661
1662 let instrument_name = instrument_id.symbol.to_string();
1663 let params = DeribitCancelAllByInstrumentParams {
1664 instrument_name: instrument_name.clone(),
1665 order_type,
1666 };
1667
1668 log::debug!("Sending cancel_all_orders: instrument={instrument_name}");
1669
1670 self.cmd_tx
1671 .read()
1672 .await
1673 .send(HandlerCommand::CancelAllByInstrument {
1674 params,
1675 instrument_id,
1676 })
1677 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1678
1679 Ok(())
1680 }
1681
1682 pub async fn query_order(
1693 &self,
1694 order_id: &str,
1695 client_order_id: ClientOrderId,
1696 trader_id: TraderId,
1697 strategy_id: StrategyId,
1698 instrument_id: InstrumentId,
1699 ) -> DeribitWsResult<()> {
1700 if !self.is_authenticated() {
1701 return Err(DeribitWsError::Authentication(
1702 "Query order state requires authentication. Call authenticate_session() first."
1703 .to_string(),
1704 ));
1705 }
1706
1707 log::debug!("Sending query_order: order_id={order_id}, client_order_id={client_order_id}");
1708
1709 self.cmd_tx
1710 .read()
1711 .await
1712 .send(HandlerCommand::GetOrderState {
1713 order_id: order_id.to_string(),
1714 client_order_id,
1715 trader_id,
1716 strategy_id,
1717 instrument_id,
1718 })
1719 .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1720
1721 Ok(())
1722 }
1723}