Skip to main content

nautilus_deribit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Deribit API.
17//!
18//! The [`DeribitWebSocketClient`] provides connectivity to Deribit's WebSocket API using
19//! JSON-RPC 2.0. It supports subscribing to market data channels including trades, order books,
20//! and tickers.
21
22use std::{
23    fmt::Debug,
24    sync::{
25        Arc, Mutex,
26        atomic::{AtomicBool, AtomicU8, Ordering},
27    },
28    time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use futures_util::Stream;
33use nautilus_common::{enums::LogColor, live::get_runtime, log_info};
34use nautilus_core::{
35    AtomicMap, AtomicSet, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt,
36    time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39    data::BarType,
40    enums::OrderSide,
41    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
42    instruments::{Instrument, InstrumentAny},
43    types::{Price, Quantity},
44};
45use nautilus_network::{
46    http::USER_AGENT,
47    mode::ConnectionMode,
48    websocket::{
49        AuthTracker, PingHandler, SubscriptionState, TransportBackend, WebSocketClient,
50        WebSocketConfig, channel_message_handler,
51    },
52};
53use tokio_util::sync::CancellationToken;
54use ustr::Ustr;
55
56use super::{
57    auth::{AuthState, send_auth_request, spawn_token_refresh_task},
58    enums::{DeribitUpdateInterval, DeribitWsChannel},
59    error::{DeribitWsError, DeribitWsResult},
60    handler::{DeribitWsFeedHandler, HandlerCommand},
61    messages::{
62        DeribitCancelAllByInstrumentParams, DeribitCancelParams, DeribitEditParams,
63        DeribitOrderParams, NautilusWsMessage,
64    },
65};
66use crate::common::{
67    consts::{
68        DERIBIT_TESTNET_WS_URL, DERIBIT_WS_HEARTBEAT_SECS, DERIBIT_WS_ORDER_KEY,
69        DERIBIT_WS_ORDER_QUOTA, DERIBIT_WS_SUBSCRIPTION_KEY, DERIBIT_WS_SUBSCRIPTION_QUOTA,
70        DERIBIT_WS_URL,
71    },
72    credential::{Credential, credential_env_vars},
73    enums::DeribitEnvironment,
74    parse::bar_spec_to_resolution,
75};
76
77/// Authentication timeout in seconds.
78const AUTHENTICATION_TIMEOUT_SECS: u64 = 30;
79
80/// WebSocket client for connecting to Deribit.
81#[derive(Clone)]
82#[cfg_attr(
83    feature = "python",
84    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit", from_py_object)
85)]
86#[cfg_attr(
87    feature = "python",
88    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.deribit")
89)]
90pub struct DeribitWebSocketClient {
91    url: String,
92    environment: DeribitEnvironment,
93    heartbeat_interval: Option<u64>,
94    credential: Option<Credential>,
95    auth_state: Arc<tokio::sync::RwLock<Option<AuthState>>>,
96    signal: Arc<AtomicBool>,
97    connection_mode: Arc<ArcSwap<AtomicU8>>,
98    auth_tracker: AuthTracker,
99    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
100    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
101    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
102    subscriptions_state: SubscriptionState,
103    instruments_cache: Arc<AtomicMap<Ustr, InstrumentAny>>,
104    option_greeks_subs: Arc<AtomicSet<InstrumentId>>,
105    mark_price_subs: Arc<AtomicSet<InstrumentId>>,
106    index_price_subs: Arc<AtomicSet<InstrumentId>>,
107    cancellation_token: CancellationToken,
108    account_id: Option<AccountId>,
109    bars_timestamp_on_close: bool,
110    subscribe_errors: Arc<Mutex<Vec<String>>>,
111    transport_backend: TransportBackend,
112    proxy_url: Option<String>,
113}
114
115impl Debug for DeribitWebSocketClient {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        f.debug_struct(stringify!(DeribitWebSocketClient))
118            .field("url", &self.url)
119            .field("environment", &self.environment)
120            .field("has_credentials", &self.credential.is_some())
121            .field("is_authenticated", &self.auth_tracker.is_authenticated())
122            .field(
123                "has_auth_state",
124                &self.auth_state.try_read().is_ok_and(|s| s.is_some()),
125            )
126            .field("heartbeat_interval", &self.heartbeat_interval)
127            .finish_non_exhaustive()
128    }
129}
130
131impl DeribitWebSocketClient {
132    /// Creates a new [`DeribitWebSocketClient`] instance.
133    ///
134    /// Falls back to environment variables if credentials are not provided.
135    ///
136    /// # Errors
137    ///
138    /// Returns an error if only one of `api_key` or `api_secret` is provided.
139    pub fn new(
140        url: Option<String>,
141        api_key: Option<String>,
142        api_secret: Option<String>,
143        heartbeat_interval: u64,
144        environment: DeribitEnvironment,
145        transport_backend: TransportBackend,
146        proxy_url: Option<String>,
147    ) -> anyhow::Result<Self> {
148        Self::new_inner(
149            url,
150            api_key,
151            api_secret,
152            heartbeat_interval,
153            environment,
154            true,
155            transport_backend,
156            proxy_url,
157        )
158    }
159
160    /// Internal constructor with control over environment variable fallback.
161    #[expect(clippy::too_many_arguments)]
162    fn new_inner(
163        url: Option<String>,
164        api_key: Option<String>,
165        api_secret: Option<String>,
166        heartbeat_interval: u64,
167        environment: DeribitEnvironment,
168        env_fallback: bool,
169        transport_backend: TransportBackend,
170        proxy_url: Option<String>,
171    ) -> anyhow::Result<Self> {
172        let url = url.unwrap_or_else(|| match environment {
173            DeribitEnvironment::Testnet => DERIBIT_TESTNET_WS_URL.to_string(),
174            DeribitEnvironment::Mainnet => DERIBIT_WS_URL.to_string(),
175        });
176
177        // Resolve credential from config or environment variables (if env_fallback is true)
178        let credential =
179            Credential::resolve_with_env_fallback(api_key, api_secret, environment, env_fallback)?;
180
181        if credential.is_some() {
182            log::info!("Credentials loaded ({environment})");
183        } else {
184            log::debug!("No credentials configured - unauthenticated mode");
185        }
186
187        let signal = Arc::new(AtomicBool::new(false));
188        let subscriptions_state = SubscriptionState::new('.');
189
190        Ok(Self {
191            url,
192            environment,
193            heartbeat_interval: Some(heartbeat_interval),
194            credential,
195            auth_state: Arc::new(tokio::sync::RwLock::new(None)),
196            signal,
197            connection_mode: Arc::new(ArcSwap::from_pointee(AtomicU8::new(
198                ConnectionMode::Closed.as_u8(),
199            ))),
200            auth_tracker: AuthTracker::new(),
201            cmd_tx: {
202                let (tx, _) = tokio::sync::mpsc::unbounded_channel();
203                Arc::new(tokio::sync::RwLock::new(tx))
204            },
205            out_rx: None,
206            task_handle: None,
207            subscriptions_state,
208            instruments_cache: Arc::new(AtomicMap::new()),
209            option_greeks_subs: Arc::new(AtomicSet::new()),
210            mark_price_subs: Arc::new(AtomicSet::new()),
211            index_price_subs: Arc::new(AtomicSet::new()),
212            cancellation_token: CancellationToken::new(),
213            account_id: None,
214            bars_timestamp_on_close: true,
215            subscribe_errors: Arc::new(Mutex::new(Vec::new())),
216            transport_backend,
217            proxy_url,
218        })
219    }
220
221    /// Creates a new public (unauthenticated) client.
222    ///
223    /// Does NOT fall back to environment variables for credentials.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if initialization fails.
228    pub fn new_public(
229        environment: DeribitEnvironment,
230        proxy_url: Option<String>,
231    ) -> anyhow::Result<Self> {
232        Self::new_inner(
233            None,
234            None,
235            None,
236            DERIBIT_WS_HEARTBEAT_SECS,
237            environment,
238            false,
239            TransportBackend::default(),
240            proxy_url,
241        )
242    }
243
244    /// Creates an unauthenticated client with a custom URL.
245    ///
246    /// Does NOT fall back to environment variables for credentials.
247    /// Useful for testing against mock servers.
248    ///
249    /// # Errors
250    ///
251    /// Returns an error if initialization fails.
252    pub fn new_unauthenticated(
253        url: Option<String>,
254        heartbeat_interval: u64,
255        environment: DeribitEnvironment,
256    ) -> anyhow::Result<Self> {
257        Self::new_inner(
258            url,
259            None,
260            None,
261            heartbeat_interval,
262            environment,
263            false,
264            TransportBackend::default(),
265            None,
266        )
267    }
268
269    /// Creates an authenticated client with credentials.
270    ///
271    /// Uses environment variables to load credentials:
272    /// - Testnet: `DERIBIT_TESTNET_API_KEY` and `DERIBIT_TESTNET_API_SECRET`
273    /// - Mainnet: `DERIBIT_API_KEY` and `DERIBIT_API_SECRET`
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if credentials are not found in environment variables.
278    pub fn with_credentials(
279        environment: DeribitEnvironment,
280        proxy_url: Option<String>,
281    ) -> anyhow::Result<Self> {
282        let (key_env, secret_env) = credential_env_vars(environment);
283
284        let api_key = get_or_env_var_opt(None, key_env)
285            .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {key_env}"))?;
286        let api_secret = get_or_env_var_opt(None, secret_env)
287            .ok_or_else(|| anyhow::anyhow!("Missing environment variable: {secret_env}"))?;
288
289        Self::new(
290            None,
291            Some(api_key),
292            Some(api_secret),
293            DERIBIT_WS_HEARTBEAT_SECS,
294            environment,
295            TransportBackend::default(),
296            proxy_url,
297        )
298    }
299
300    /// Returns the current connection mode.
301    fn connection_mode(&self) -> ConnectionMode {
302        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
303        ConnectionMode::from_u8(mode_u8)
304    }
305
306    /// Returns whether the client is actively connected.
307    #[must_use]
308    pub fn is_active(&self) -> bool {
309        self.connection_mode() == ConnectionMode::Active
310    }
311
312    /// Returns the WebSocket URL.
313    #[must_use]
314    pub fn url(&self) -> &str {
315        &self.url
316    }
317
318    /// Returns the environment for this client.
319    #[must_use]
320    pub fn environment(&self) -> DeribitEnvironment {
321        self.environment
322    }
323
324    /// Returns whether the client is closed.
325    #[must_use]
326    pub fn is_closed(&self) -> bool {
327        let mode = self.connection_mode();
328        mode == ConnectionMode::Disconnect || mode == ConnectionMode::Closed
329    }
330
331    /// Cancel all pending WebSocket requests.
332    pub fn cancel_all_requests(&self) {
333        self.cancellation_token.cancel();
334    }
335
336    /// Returns the cancellation token for this client.
337    #[must_use]
338    pub fn cancellation_token(&self) -> &CancellationToken {
339        &self.cancellation_token
340    }
341
342    /// Waits until the client is active or timeout expires.
343    ///
344    /// # Errors
345    ///
346    /// Returns an error if the timeout expires before the client becomes active.
347    pub async fn wait_until_active(&self, timeout_secs: f64) -> DeribitWsResult<()> {
348        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
349
350        tokio::time::timeout(timeout, async {
351            while !self.is_active() {
352                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
353            }
354        })
355        .await
356        .map_err(|_| {
357            DeribitWsError::Timeout(format!(
358                "WebSocket connection timeout after {timeout_secs} seconds"
359            ))
360        })?;
361
362        Ok(())
363    }
364
365    /// Waits until all pending subscriptions are confirmed by the server.
366    ///
367    /// # Errors
368    ///
369    /// Returns an error if the timeout expires before all subscriptions are confirmed.
370    pub async fn wait_for_subscriptions_confirmed(&self, timeout_secs: f64) -> DeribitWsResult<()> {
371        let timeout = Duration::from_secs_f64(timeout_secs);
372
373        tokio::time::timeout(timeout, async {
374            loop {
375                // Fail fast on permanent subscribe errors
376                if let Ok(mut errors) = self.subscribe_errors.lock()
377                    && !errors.is_empty()
378                {
379                    let msg = errors.join("; ");
380                    errors.clear();
381                    return Err(DeribitWsError::Subscribe(msg));
382                }
383
384                let pending = self.subscriptions_state.pending_subscribe_topics();
385                if pending.is_empty() {
386                    return Ok(());
387                }
388                tokio::time::sleep(Duration::from_millis(10)).await;
389            }
390        })
391        .await
392        .map_err(|_| {
393            let pending = self.subscriptions_state.pending_subscribe_topics();
394            DeribitWsError::Timeout(format!(
395                "Subscription confirmation timeout after {timeout_secs}s, \
396                still pending: {pending:?}"
397            ))
398        })?
399    }
400
401    /// Caches instruments for use during message parsing.
402    pub fn cache_instruments(&self, instruments: &[InstrumentAny]) {
403        self.instruments_cache.rcu(|m| {
404            for inst in instruments {
405                m.insert(inst.raw_symbol().inner(), inst.clone());
406            }
407        });
408        log::debug!("Cached {} instruments", self.instruments_cache.len());
409
410        // Send per-instrument updates to the live handler rather than
411        // a full snapshot, avoiding out-of-order snapshot races.
412        if self.is_active() {
413            for inst in instruments {
414                let tx = self.cmd_tx.clone();
415                let boxed = Box::new(inst.clone());
416
417                get_runtime().spawn(async move {
418                    let _ = tx
419                        .read()
420                        .await
421                        .send(HandlerCommand::UpdateInstrument(boxed));
422                });
423            }
424        }
425    }
426
427    /// Caches a single instrument.
428    pub fn cache_instrument(&self, instrument: InstrumentAny) {
429        let symbol = instrument.raw_symbol().inner();
430        self.instruments_cache.insert(symbol, instrument);
431
432        // If connected, send update to handler
433        if self.is_active() {
434            let tx = self.cmd_tx.clone();
435            let inst = self.instruments_cache.get_cloned(&symbol);
436            if let Some(inst) = inst {
437                get_runtime().spawn(async move {
438                    let _ = tx
439                        .read()
440                        .await
441                        .send(HandlerCommand::UpdateInstrument(Box::new(inst)));
442                });
443            }
444        }
445    }
446
447    /// Sets the shared option greeks subscription set for handler-side gating.
448    pub fn set_option_greeks_subs(&mut self, subs: Arc<AtomicSet<InstrumentId>>) {
449        self.option_greeks_subs = subs;
450    }
451
452    /// Sets the shared mark price subscription set for handler-side gating.
453    pub fn set_mark_price_subs(&mut self, subs: Arc<AtomicSet<InstrumentId>>) {
454        self.mark_price_subs = subs;
455    }
456
457    /// Sets the shared index price subscription set for handler-side gating.
458    pub fn set_index_price_subs(&mut self, subs: Arc<AtomicSet<InstrumentId>>) {
459        self.index_price_subs = subs;
460    }
461
462    /// Registers an instrument for mark price emission from ticker messages.
463    pub fn add_mark_price_sub(&self, instrument_id: InstrumentId) {
464        self.mark_price_subs.insert(instrument_id);
465    }
466
467    /// Unregisters an instrument from mark price emission.
468    pub fn remove_mark_price_sub(&self, instrument_id: &InstrumentId) {
469        self.mark_price_subs.remove(instrument_id);
470    }
471
472    /// Registers an instrument for index price emission from ticker messages.
473    pub fn add_index_price_sub(&self, instrument_id: InstrumentId) {
474        self.index_price_subs.insert(instrument_id);
475    }
476
477    /// Unregisters an instrument from index price emission.
478    pub fn remove_index_price_sub(&self, instrument_id: &InstrumentId) {
479        self.index_price_subs.remove(instrument_id);
480    }
481
482    /// Registers an instrument for option greeks emission from ticker messages.
483    pub fn add_option_greeks_sub(&self, instrument_id: InstrumentId) {
484        self.option_greeks_subs.insert(instrument_id);
485    }
486
487    /// Unregisters an instrument from option greeks emission.
488    pub fn remove_option_greeks_sub(&self, instrument_id: &InstrumentId) {
489        self.option_greeks_subs.remove(instrument_id);
490    }
491
492    /// Connects to the Deribit WebSocket API.
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if the connection fails.
497    pub async fn connect(&mut self) -> anyhow::Result<()> {
498        log_info!(
499            "Connecting to WebSocket: {}",
500            self.url,
501            color = LogColor::Blue
502        );
503
504        if let Some(handle) = self.task_handle.take() {
505            handle.abort();
506        }
507
508        // Reset stop signal and subscription state so callers can
509        // resubscribe cleanly after a manual disconnect/connect cycle.
510        self.signal.store(false, Ordering::Relaxed);
511        self.subscriptions_state.clear();
512
513        // Create message handler and channel
514        let (message_handler, raw_rx) = channel_message_handler();
515
516        // No-op ping handler: handler responds to pings directly
517        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
518            // Handler responds to pings internally
519        });
520
521        // Configure WebSocket client
522        let config = WebSocketConfig {
523            url: self.url.clone(),
524            headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
525            heartbeat: self.heartbeat_interval,
526            heartbeat_msg: None, // Deribit uses JSON-RPC heartbeat, not text ping
527            reconnect_timeout_ms: Some(5_000),
528            reconnect_delay_initial_ms: None,
529            reconnect_delay_max_ms: None,
530            reconnect_backoff_factor: None,
531            reconnect_jitter_ms: None,
532            reconnect_max_attempts: None,
533            idle_timeout_ms: None,
534            backend: self.transport_backend,
535            proxy_url: self.proxy_url.clone(),
536        };
537
538        // Configure rate limits
539        let keyed_quotas = vec![
540            (
541                DERIBIT_WS_SUBSCRIPTION_KEY.to_string(),
542                *DERIBIT_WS_SUBSCRIPTION_QUOTA,
543            ),
544            (DERIBIT_WS_ORDER_KEY.to_string(), *DERIBIT_WS_ORDER_QUOTA),
545        ];
546
547        // Connect the WebSocket
548        let ws_client = WebSocketClient::connect(
549            config,
550            Some(message_handler),
551            Some(ping_handler),
552            None, // post_reconnection
553            keyed_quotas,
554            Some(*DERIBIT_WS_SUBSCRIPTION_QUOTA), // Default quota for non-order operations
555        )
556        .await?;
557
558        // Store connection mode
559        self.connection_mode
560            .store(ws_client.connection_mode_atomic());
561
562        // Create message channels
563        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
564        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
565
566        // Store command sender and output receiver
567        *self.cmd_tx.write().await = cmd_tx.clone();
568        self.out_rx = Some(Arc::new(out_rx));
569
570        if let Ok(mut errors) = self.subscribe_errors.lock() {
571            errors.clear();
572        }
573
574        // Create handler
575        let mut handler = DeribitWsFeedHandler::new(
576            self.signal.clone(),
577            cmd_rx,
578            raw_rx,
579            out_tx,
580            self.auth_tracker.clone(),
581            self.subscriptions_state.clone(),
582            self.option_greeks_subs.clone(),
583            self.mark_price_subs.clone(),
584            self.index_price_subs.clone(),
585            self.account_id,
586            self.bars_timestamp_on_close,
587            self.subscribe_errors.clone(),
588        );
589
590        // Send client to handler
591        let _ = cmd_tx.send(HandlerCommand::SetClient(ws_client));
592
593        // Replay cached instruments
594        let instruments: Vec<InstrumentAny> =
595            self.instruments_cache.load().values().cloned().collect();
596
597        if !instruments.is_empty() {
598            log::debug!(
599                "Sending {} cached instruments to handler",
600                instruments.len()
601            );
602            let _ = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments));
603        }
604
605        // Enable heartbeat if configured
606        if let Some(interval) = self.heartbeat_interval {
607            let _ = cmd_tx.send(HandlerCommand::SetHeartbeat { interval });
608        }
609
610        // Spawn handler task
611        let subscriptions_state = self.subscriptions_state.clone();
612        let credential = self.credential.clone();
613        let auth_tracker = self.auth_tracker.clone();
614        let auth_state = self.auth_state.clone();
615
616        let task_handle = get_runtime().spawn(async move {
617            const MAX_REAUTH_ATTEMPTS: u32 = 3;
618
619            let mut pending_reauth = false;
620            let mut reauth_attempts: u32 = 0;
621
622            let mut refresh_cancel = CancellationToken::new();
623            let mut retry_cancel = CancellationToken::new();
624
625            loop {
626                match handler.next().await {
627                    Some(msg) => match msg {
628                        NautilusWsMessage::Reconnected => {
629                            log::info!("Reconnected to WebSocket");
630
631                            // Cancel stale refresh and retry tasks from prior connection
632                            refresh_cancel.cancel();
633                            refresh_cancel = CancellationToken::new();
634                            retry_cancel.cancel();
635                            retry_cancel = CancellationToken::new();
636
637                            let channels = subscriptions_state.all_topics();
638
639                            for channel in &channels {
640                                subscriptions_state.mark_failure(channel);
641                            }
642
643                            // Check if we need to re-authenticate
644                            if let Some(cred) = &credential {
645                                log::info!("Re-authenticating after reconnection...");
646
647                                let _rx = auth_tracker.begin();
648                                pending_reauth = true;
649                                reauth_attempts = 1;
650
651                                let previous_scope = auth_state
652                                    .read()
653                                    .await
654                                    .as_ref()
655                                    .map(|s| s.scope.clone());
656
657                                send_auth_request(cred, previous_scope, &cmd_tx);
658                            } else {
659                                // No credentials - resubscribe immediately
660                                if !channels.is_empty() {
661                                    let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
662                                }
663                            }
664                        }
665                        NautilusWsMessage::Authenticated(result) => {
666                            let timestamp = get_atomic_clock_realtime().get_time_ms();
667                            let new_auth_state = AuthState::from_auth_result(&result, timestamp);
668                            *auth_state.write().await = Some(new_auth_state);
669
670                            refresh_cancel.cancel();
671                            refresh_cancel = CancellationToken::new();
672                            retry_cancel.cancel();
673                            retry_cancel = CancellationToken::new();
674
675                            spawn_token_refresh_task(
676                                result.expires_in,
677                                result.refresh_token.clone(),
678                                cmd_tx.clone(),
679                                refresh_cancel.clone(),
680                            );
681
682                            if pending_reauth {
683                                pending_reauth = false;
684                                reauth_attempts = 0;
685                                log::info!(
686                                    "Re-authentication successful (scope: {}), resubscribing to channels",
687                                    result.scope
688                                );
689
690                                let channels = subscriptions_state.all_topics();
691
692                                if !channels.is_empty() {
693                                    let _ = cmd_tx.send(HandlerCommand::Subscribe { channels });
694                                }
695                            } else {
696                                log::debug!(
697                                    "Auth state stored: scope={}, expires_in={}s",
698                                    result.scope,
699                                    result.expires_in
700                                );
701                            }
702                        }
703                        NautilusWsMessage::AuthenticationFailed(reason) => {
704                            if pending_reauth && reauth_attempts < MAX_REAUTH_ATTEMPTS {
705                                let delay_secs = 1u64 << reauth_attempts; // 2s, 4s
706                                log::warn!(
707                                    "Re-authentication attempt {reauth_attempts}/{MAX_REAUTH_ATTEMPTS} \
708                                    failed: {reason} - retrying in {delay_secs}s",
709                                );
710                                reauth_attempts += 1;
711
712                                // Spawn delayed retry so the handler loop keeps
713                                // processing messages during the backoff
714                                if let Some(cred) = &credential {
715                                    let cred = cred.clone();
716                                    let auth_state = auth_state.clone();
717                                    let auth_tracker = auth_tracker.clone();
718                                    let cmd_tx = cmd_tx.clone();
719                                    let cancel = retry_cancel.clone();
720
721                                    get_runtime().spawn(async move {
722                                        tokio::select! {
723                                            () = tokio::time::sleep(Duration::from_secs(delay_secs)) => {}
724                                            () = cancel.cancelled() => return,
725                                        }
726                                        let _rx = auth_tracker.begin();
727                                        let previous_scope = auth_state
728                                            .read()
729                                            .await
730                                            .as_ref()
731                                            .map(|s| s.scope.clone());
732                                        send_auth_request(&cred, previous_scope, &cmd_tx);
733                                    });
734                                }
735                            } else if pending_reauth {
736                                pending_reauth = false;
737                                reauth_attempts = 0;
738                                log::error!(
739                                    "Re-authentication failed after {MAX_REAUTH_ATTEMPTS} \
740                                    attempts: {reason} \
741                                    - resubscribing to public channels only"
742                                );
743
744                                let all = subscriptions_state.all_topics();
745                                let mut public_channels = Vec::new();
746
747                                for ch in &all {
748                                    if DeribitWsChannel::requires_auth(ch) {
749                                        // Release private channels so future subscribe
750                                        // calls aren't skipped as already referenced
751                                        subscriptions_state.mark_unsubscribe(ch);
752                                        subscriptions_state.confirm_unsubscribe(ch);
753                                        subscriptions_state.remove_reference(ch);
754                                    } else {
755                                        public_channels.push(ch.clone());
756                                    }
757                                }
758
759                                if !public_channels.is_empty() {
760                                    let _ = cmd_tx.send(HandlerCommand::Subscribe {
761                                        channels: public_channels,
762                                    });
763                                }
764                            } else {
765                                log::error!("Authentication failed: {reason}");
766                            }
767                        }
768                        _ => {}
769                    },
770                    None => {
771                        log::debug!("Handler returned None, stopping task");
772                        break;
773                    }
774                }
775            }
776        });
777
778        self.task_handle = Some(Arc::new(task_handle));
779        log::info!("Connected to WebSocket");
780
781        Ok(())
782    }
783
784    /// Closes the WebSocket connection.
785    ///
786    /// # Errors
787    ///
788    /// Returns an error if the close operation fails.
789    pub async fn close(&self) -> DeribitWsResult<()> {
790        log::info!("Closing WebSocket connection");
791        self.signal.store(true, Ordering::Relaxed);
792
793        let _ = self.cmd_tx.read().await.send(HandlerCommand::Disconnect);
794
795        // Poll for graceful handler shutdown, abort after 2s deadline
796        if let Some(handle) = &self.task_handle {
797            let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
798            while !handle.is_finished() && tokio::time::Instant::now() < deadline {
799                tokio::time::sleep(Duration::from_millis(50)).await;
800            }
801
802            if !handle.is_finished() {
803                handle.abort();
804            }
805        }
806
807        self.auth_tracker.invalidate();
808
809        Ok(())
810    }
811
812    /// Returns a stream of WebSocket messages.
813    ///
814    /// # Errors
815    ///
816    /// Returns an error if called before `connect()` or if called more than once.
817    pub fn stream(&mut self) -> DeribitWsResult<impl Stream<Item = NautilusWsMessage> + 'static> {
818        let rx = self.out_rx.take().ok_or_else(|| {
819            DeribitWsError::ClientError(
820                "Stream receiver already taken or not connected".to_string(),
821            )
822        })?;
823        let mut rx = Arc::try_unwrap(rx).map_err(|_| {
824            DeribitWsError::ClientError(
825                "Cannot take stream ownership - other references exist".to_string(),
826            )
827        })?;
828
829        Ok(async_stream::stream! {
830            while let Some(msg) = rx.recv().await {
831                yield msg;
832            }
833        })
834    }
835
836    /// Returns whether the client has credentials configured.
837    #[must_use]
838    pub fn has_credentials(&self) -> bool {
839        self.credential.is_some()
840    }
841
842    /// Returns whether the client is authenticated.
843    #[must_use]
844    pub fn is_authenticated(&self) -> bool {
845        self.auth_tracker.is_authenticated()
846    }
847
848    /// Authenticates the WebSocket session with Deribit.
849    ///
850    /// Uses the `client_signature` grant type with HMAC-SHA256 signature.
851    /// This must be called before subscribing to raw data streams.
852    ///
853    /// # Arguments
854    ///
855    /// * `session_name` - Optional session name for session-scoped authentication.
856    ///   When provided, uses `session:<name>` scope which allows skipping `access_token`
857    ///   in subsequent private requests. When `None`, uses default `connection` scope.
858    ///   Recommended to use session scope for order execution compatibility.
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if:
863    /// - No credentials are configured
864    /// - The authentication request fails
865    /// - The authentication times out
866    pub async fn authenticate(&self, session_name: Option<&str>) -> DeribitWsResult<()> {
867        let credential = self.credential.as_ref().ok_or_else(|| {
868            DeribitWsError::Authentication("API credentials not configured".to_string())
869        })?;
870
871        // Determine scope
872        let scope = session_name.map(|name| format!("session:{name}"));
873
874        log::info!("Authenticating WebSocket...");
875
876        let rx = self.auth_tracker.begin();
877
878        // Send authentication request
879        let cmd_tx = self.cmd_tx.read().await;
880        send_auth_request(credential, scope, &cmd_tx);
881        drop(cmd_tx);
882
883        // Wait for authentication result with timeout
884        match self
885            .auth_tracker
886            .wait_for_result::<DeribitWsError>(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
887            .await
888        {
889            Ok(()) => {
890                log::info!("WebSocket authenticated successfully");
891                Ok(())
892            }
893            Err(e) => {
894                log::error!("WebSocket authentication failed: error={e}");
895                Err(e)
896            }
897        }
898    }
899
900    /// Authenticates with session scope using the provided session name.
901    ///
902    /// Use `DERIBIT_DATA_SESSION_NAME` for data clients and
903    /// `DERIBIT_EXECUTION_SESSION_NAME` for execution clients.
904    ///
905    /// # Errors
906    ///
907    /// Returns an error if authentication fails.
908    pub async fn authenticate_session(&self, session_name: &str) -> DeribitWsResult<()> {
909        self.authenticate(Some(session_name)).await
910    }
911
912    /// Returns the current authentication state containing tokens.
913    ///
914    /// Returns `None` if not authenticated or tokens haven't been stored yet.
915    pub async fn auth_state(&self) -> Option<AuthState> {
916        self.auth_state.read().await.clone()
917    }
918
919    /// Returns the current access token if available.
920    pub async fn access_token(&self) -> Option<String> {
921        self.auth_state
922            .read()
923            .await
924            .as_ref()
925            .map(|s| s.access_token.clone())
926    }
927
928    /// Sets the account ID for order/fill reports.
929    pub fn set_account_id(&mut self, account_id: AccountId) {
930        self.account_id = Some(account_id);
931    }
932
933    /// Sets whether bar timestamps should use the close time.
934    ///
935    /// When `true` (default), bar `ts_event` is set to the bar's close time.
936    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
937        self.bars_timestamp_on_close = value;
938    }
939
940    async fn send_subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
941        let mut channels_to_subscribe = Vec::new();
942
943        for channel in channels {
944            if self.subscriptions_state.add_reference(&channel) {
945                self.subscriptions_state.mark_subscribe(&channel);
946                channels_to_subscribe.push(channel);
947            } else {
948                log::debug!("Already subscribed to {channel}, skipping duplicate subscription");
949            }
950        }
951
952        if channels_to_subscribe.is_empty() {
953            return Ok(());
954        }
955
956        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
957            channels: channels_to_subscribe.clone(),
958        }) {
959            // Roll back: remove reference and clear pending_subscribe
960            for channel in &channels_to_subscribe {
961                self.subscriptions_state.remove_reference(channel);
962                self.subscriptions_state.mark_unsubscribe(channel);
963                self.subscriptions_state.confirm_unsubscribe(channel);
964            }
965            return Err(DeribitWsError::Send(e.to_string()));
966        }
967
968        log::debug!(
969            "Sent subscribe for {} channels",
970            channels_to_subscribe.len()
971        );
972        Ok(())
973    }
974
975    async fn send_unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
976        let mut channels_to_unsubscribe = Vec::new();
977
978        for channel in channels {
979            if self.subscriptions_state.remove_reference(&channel) {
980                self.subscriptions_state.mark_unsubscribe(&channel);
981                channels_to_unsubscribe.push(channel);
982            } else {
983                log::debug!("Still has references to {channel}, skipping unsubscription");
984            }
985        }
986
987        if channels_to_unsubscribe.is_empty() {
988            return Ok(());
989        }
990
991        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Unsubscribe {
992            channels: channels_to_unsubscribe.clone(),
993        }) {
994            // Send only fails when the handler task is dead, meaning the
995            // connection is broken. Restore refcount and mark confirmed so
996            // the topic is not wedged in pending_unsubscribe. This may
997            // promote a pending_subscribe topic to confirmed, but that is
998            // harmless: connect() calls clear() on the next connection
999            // attempt, resetting all subscription state.
1000            for channel in &channels_to_unsubscribe {
1001                self.subscriptions_state.confirm_unsubscribe(channel);
1002                self.subscriptions_state.add_reference(channel);
1003                self.subscriptions_state.confirm_subscribe(channel);
1004            }
1005            return Err(DeribitWsError::Send(e.to_string()));
1006        }
1007
1008        log::debug!(
1009            "Sent unsubscribe for {} channels",
1010            channels_to_unsubscribe.len()
1011        );
1012        Ok(())
1013    }
1014
1015    /// Subscribes to trade updates for an instrument.
1016    ///
1017    /// # Arguments
1018    ///
1019    /// * `instrument_id` - The instrument to subscribe to
1020    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
1021    ///
1022    /// # Errors
1023    ///
1024    /// Returns an error if subscription fails or raw is requested without authentication.
1025    pub async fn subscribe_trades(
1026        &self,
1027        instrument_id: InstrumentId,
1028        interval: Option<DeribitUpdateInterval>,
1029    ) -> DeribitWsResult<()> {
1030        let interval = interval.unwrap_or_default();
1031        self.check_auth_requirement(interval)?;
1032        let channel =
1033            DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
1034        self.send_subscribe(vec![channel]).await
1035    }
1036
1037    /// Unsubscribes from trade updates for an instrument.
1038    ///
1039    /// # Errors
1040    ///
1041    /// Returns an error if unsubscription fails.
1042    pub async fn unsubscribe_trades(
1043        &self,
1044        instrument_id: InstrumentId,
1045        interval: Option<DeribitUpdateInterval>,
1046    ) -> DeribitWsResult<()> {
1047        let interval = interval.unwrap_or_default();
1048        let channel =
1049            DeribitWsChannel::Trades.format_channel(instrument_id.symbol.as_str(), Some(interval));
1050        self.send_unsubscribe(vec![channel]).await
1051    }
1052
1053    /// Subscribes to order book updates for an instrument.
1054    ///
1055    /// # Arguments
1056    ///
1057    /// * `instrument_id` - The instrument to subscribe to
1058    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
1059    ///
1060    /// # Errors
1061    ///
1062    /// Returns an error if subscription fails or raw is requested without authentication.
1063    pub async fn subscribe_book(
1064        &self,
1065        instrument_id: InstrumentId,
1066        interval: Option<DeribitUpdateInterval>,
1067    ) -> DeribitWsResult<()> {
1068        let interval = interval.unwrap_or_default();
1069        self.check_auth_requirement(interval)?;
1070        let channel =
1071            DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
1072        self.send_subscribe(vec![channel]).await
1073    }
1074
1075    /// Unsubscribes from order book updates for an instrument.
1076    ///
1077    /// # Errors
1078    ///
1079    /// Returns an error if unsubscription fails.
1080    pub async fn unsubscribe_book(
1081        &self,
1082        instrument_id: InstrumentId,
1083        interval: Option<DeribitUpdateInterval>,
1084    ) -> DeribitWsResult<()> {
1085        let interval = interval.unwrap_or_default();
1086        let channel =
1087            DeribitWsChannel::Book.format_channel(instrument_id.symbol.as_str(), Some(interval));
1088        self.send_unsubscribe(vec![channel]).await
1089    }
1090
1091    /// Subscribes to grouped (depth-limited) order book updates for an instrument.
1092    ///
1093    /// Uses the Deribit grouped book channel format: `book.{instrument}.{group}.{depth}.{interval}`
1094    ///
1095    /// Depth is normalized to Deribit supported values: 1, 10, or 20.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns an error if subscription fails or raw is requested without authentication.
1100    pub async fn subscribe_book_grouped(
1101        &self,
1102        instrument_id: InstrumentId,
1103        group: &str,
1104        depth: u32,
1105        interval: Option<DeribitUpdateInterval>,
1106    ) -> DeribitWsResult<()> {
1107        // Grouped book channel only supports 100ms and agg2, not raw
1108        let interval = match interval {
1109            Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
1110            Some(i) => i,
1111        };
1112
1113        let normalized_depth = if depth < 5 {
1114            1
1115        } else if depth < 15 {
1116            10
1117        } else {
1118            20
1119        };
1120
1121        let channel = format!(
1122            "book.{}.{}.{}.{}",
1123            instrument_id.symbol,
1124            group,
1125            normalized_depth,
1126            interval.as_str()
1127        );
1128        log::debug!("Subscribing to grouped book channel: {channel}");
1129        self.send_subscribe(vec![channel]).await
1130    }
1131
1132    /// Unsubscribes from grouped (depth-limited) order book updates for an instrument.
1133    ///
1134    /// Depth is normalized to Deribit supported values: 1, 10, or 20.
1135    ///
1136    /// # Errors
1137    ///
1138    /// Returns an error if unsubscription fails.
1139    pub async fn unsubscribe_book_grouped(
1140        &self,
1141        instrument_id: InstrumentId,
1142        group: &str,
1143        depth: u32,
1144        interval: Option<DeribitUpdateInterval>,
1145    ) -> DeribitWsResult<()> {
1146        // Grouped book channel only supports 100ms and agg2, not raw
1147        let interval = match interval {
1148            Some(DeribitUpdateInterval::Raw) | None => DeribitUpdateInterval::Ms100,
1149            Some(i) => i,
1150        };
1151
1152        let normalized_depth = if depth < 5 {
1153            1
1154        } else if depth < 15 {
1155            10
1156        } else {
1157            20
1158        };
1159
1160        let channel = format!(
1161            "book.{}.{}.{}.{}",
1162            instrument_id.symbol,
1163            group,
1164            normalized_depth,
1165            interval.as_str()
1166        );
1167        self.send_unsubscribe(vec![channel]).await
1168    }
1169
1170    /// Subscribes to ticker updates for an instrument.
1171    ///
1172    /// # Arguments
1173    ///
1174    /// * `instrument_id` - The instrument to subscribe to
1175    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
1176    ///
1177    /// # Errors
1178    ///
1179    /// Returns an error if subscription fails or raw is requested without authentication.
1180    pub async fn subscribe_ticker(
1181        &self,
1182        instrument_id: InstrumentId,
1183        interval: Option<DeribitUpdateInterval>,
1184    ) -> DeribitWsResult<()> {
1185        let interval = interval.unwrap_or_default();
1186        self.check_auth_requirement(interval)?;
1187        let channel =
1188            DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
1189        self.send_subscribe(vec![channel]).await
1190    }
1191
1192    /// Unsubscribes from ticker updates for an instrument.
1193    ///
1194    /// # Errors
1195    ///
1196    /// Returns an error if unsubscription fails.
1197    pub async fn unsubscribe_ticker(
1198        &self,
1199        instrument_id: InstrumentId,
1200        interval: Option<DeribitUpdateInterval>,
1201    ) -> DeribitWsResult<()> {
1202        let interval = interval.unwrap_or_default();
1203        let channel =
1204            DeribitWsChannel::Ticker.format_channel(instrument_id.symbol.as_str(), Some(interval));
1205        self.send_unsubscribe(vec![channel]).await
1206    }
1207
1208    /// Subscribes to quote (best bid/ask) updates for an instrument.
1209    ///
1210    /// Note: Quote channel does not support interval parameter.
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if subscription fails.
1215    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
1216        let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
1217        self.send_subscribe(vec![channel]).await
1218    }
1219
1220    /// Unsubscribes from quote updates for an instrument.
1221    ///
1222    /// # Errors
1223    ///
1224    /// Returns an error if unsubscription fails.
1225    pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> DeribitWsResult<()> {
1226        let channel = DeribitWsChannel::Quote.format_channel(instrument_id.symbol.as_str(), None);
1227        self.send_unsubscribe(vec![channel]).await
1228    }
1229
1230    /// Subscribes to instrument status changes for lifecycle notifications.
1231    ///
1232    /// Channel format: `instrument.state.{kind}.{currency}`
1233    ///
1234    /// # Errors
1235    ///
1236    /// Returns an error if subscription fails.
1237    pub async fn subscribe_instrument_status(
1238        &self,
1239        kind: &str,
1240        currency: &str,
1241    ) -> DeribitWsResult<()> {
1242        let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
1243        self.send_subscribe(vec![channel]).await
1244    }
1245
1246    /// Unsubscribes from instrument status changes.
1247    ///
1248    /// # Errors
1249    ///
1250    /// Returns an error if unsubscription fails.
1251    pub async fn unsubscribe_instrument_status(
1252        &self,
1253        kind: &str,
1254        currency: &str,
1255    ) -> DeribitWsResult<()> {
1256        let channel = DeribitWsChannel::format_instrument_state_channel(kind, currency);
1257        self.send_unsubscribe(vec![channel]).await
1258    }
1259
1260    /// Subscribes to perpetual interest rates updates.
1261    ///
1262    /// Channel format: `perpetual.{instrument_name}.{interval}`
1263    ///
1264    /// # Errors
1265    ///
1266    /// Returns an error if subscription fails.
1267    pub async fn subscribe_perpetual_interests_rates_updates(
1268        &self,
1269        instrument_id: InstrumentId,
1270        interval: Option<DeribitUpdateInterval>,
1271    ) -> DeribitWsResult<()> {
1272        let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1273        let channel = DeribitWsChannel::Perpetual
1274            .format_channel(instrument_id.symbol.as_str(), Some(interval));
1275
1276        self.send_subscribe(vec![channel]).await
1277    }
1278
1279    /// Unsubscribes from perpetual interest rates updates.
1280    ///
1281    /// # Errors
1282    ///
1283    /// Returns an error if subscription fails.
1284    pub async fn unsubscribe_perpetual_interest_rates_updates(
1285        &self,
1286        instrument_id: InstrumentId,
1287        interval: Option<DeribitUpdateInterval>,
1288    ) -> DeribitWsResult<()> {
1289        let interval = interval.unwrap_or(DeribitUpdateInterval::Ms100);
1290        let channel = DeribitWsChannel::Perpetual
1291            .format_channel(instrument_id.symbol.as_str(), Some(interval));
1292
1293        self.send_unsubscribe(vec![channel]).await
1294    }
1295
1296    /// Subscribes to chart/OHLC bar updates for an instrument.
1297    ///
1298    /// # Arguments
1299    ///
1300    /// * `instrument_id` - The instrument to subscribe to
1301    /// * `resolution` - Bar resolution: "1", "3", "5", "10", "15", "30", "60", "120", "180",
1302    ///   "360", "720", "1D" (minutes or 1D for daily)
1303    ///
1304    /// # Errors
1305    ///
1306    /// Returns an error if subscription fails.
1307    pub async fn subscribe_chart(
1308        &self,
1309        instrument_id: InstrumentId,
1310        resolution: &str,
1311    ) -> DeribitWsResult<()> {
1312        // Chart channel format: chart.trades.{instrument}.{resolution}
1313        let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1314        self.send_subscribe(vec![channel]).await
1315    }
1316
1317    /// Unsubscribes from chart/OHLC bar updates.
1318    ///
1319    /// # Errors
1320    ///
1321    /// Returns an error if unsubscription fails.
1322    pub async fn unsubscribe_chart(
1323        &self,
1324        instrument_id: InstrumentId,
1325        resolution: &str,
1326    ) -> DeribitWsResult<()> {
1327        let channel = format!("chart.trades.{}.{}", instrument_id.symbol, resolution);
1328        self.send_unsubscribe(vec![channel]).await
1329    }
1330
1331    /// Subscribes to bar updates for an instrument using a BarType specification.
1332    ///
1333    /// Converts the BarType to the nearest supported Deribit resolution and subscribes
1334    /// to the chart channel.
1335    ///
1336    /// # Errors
1337    ///
1338    /// Returns an error if the subscription request fails.
1339    pub async fn subscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1340        let resolution = bar_spec_to_resolution(&bar_type);
1341        self.subscribe_chart(bar_type.instrument_id(), &resolution)
1342            .await
1343    }
1344
1345    /// Unsubscribes from bar updates for an instrument using a BarType specification.
1346    ///
1347    /// # Errors
1348    ///
1349    /// Returns an error if the unsubscription request fails.
1350    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> DeribitWsResult<()> {
1351        let resolution = bar_spec_to_resolution(&bar_type);
1352        self.unsubscribe_chart(bar_type.instrument_id(), &resolution)
1353            .await
1354    }
1355
1356    /// Checks if authentication is required for the given interval.
1357    ///
1358    /// # Errors
1359    ///
1360    /// Returns an error if raw interval is requested but client is not authenticated.
1361    fn check_auth_requirement(&self, interval: DeribitUpdateInterval) -> DeribitWsResult<()> {
1362        if interval.requires_auth() && !self.is_authenticated() {
1363            return Err(DeribitWsError::Authentication(
1364                "Raw streams require authentication. Call authenticate() first.".to_string(),
1365            ));
1366        }
1367        Ok(())
1368    }
1369
1370    /// Subscribes to user order updates for all instruments.
1371    ///
1372    /// Requires authentication. Subscribes to `user.orders.any.any.raw` channel.
1373    ///
1374    /// # Errors
1375    ///
1376    /// Returns an error if client is not authenticated or subscription fails.
1377    pub async fn subscribe_user_orders(&self) -> DeribitWsResult<()> {
1378        if !self.is_authenticated() {
1379            return Err(DeribitWsError::Authentication(
1380                "User orders subscription requires authentication".to_string(),
1381            ));
1382        }
1383        self.send_subscribe(vec!["user.orders.any.any.raw".to_string()])
1384            .await
1385    }
1386
1387    /// Unsubscribes from user order updates for all instruments.
1388    ///
1389    /// # Errors
1390    ///
1391    /// Returns an error if unsubscription fails.
1392    pub async fn unsubscribe_user_orders(&self) -> DeribitWsResult<()> {
1393        self.send_unsubscribe(vec!["user.orders.any.any.raw".to_string()])
1394            .await
1395    }
1396
1397    /// Subscribes to user trade/fill updates for all instruments.
1398    ///
1399    /// Requires authentication. Subscribes to `user.trades.any.any.raw` channel.
1400    ///
1401    /// # Errors
1402    ///
1403    /// Returns an error if client is not authenticated or subscription fails.
1404    pub async fn subscribe_user_trades(&self) -> DeribitWsResult<()> {
1405        if !self.is_authenticated() {
1406            return Err(DeribitWsError::Authentication(
1407                "User trades subscription requires authentication".to_string(),
1408            ));
1409        }
1410        self.send_subscribe(vec!["user.trades.any.any.raw".to_string()])
1411            .await
1412    }
1413
1414    /// Unsubscribes from user trade/fill updates for all instruments.
1415    ///
1416    /// # Errors
1417    ///
1418    /// Returns an error if unsubscription fails.
1419    pub async fn unsubscribe_user_trades(&self) -> DeribitWsResult<()> {
1420        self.send_unsubscribe(vec!["user.trades.any.any.raw".to_string()])
1421            .await
1422    }
1423
1424    /// Subscribes to user portfolio updates for all currencies.
1425    ///
1426    /// Requires authentication. Subscribes to `user.portfolio.any` channel which
1427    /// provides real-time account balance and margin updates for all currencies
1428    /// (BTC, ETH, USDC, USDT, etc.).
1429    ///
1430    /// # Errors
1431    ///
1432    /// Returns an error if client is not authenticated or subscription fails.
1433    pub async fn subscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1434        if !self.is_authenticated() {
1435            return Err(DeribitWsError::Authentication(
1436                "User portfolio subscription requires authentication".to_string(),
1437            ));
1438        }
1439        self.send_subscribe(vec!["user.portfolio.any".to_string()])
1440            .await
1441    }
1442
1443    /// Unsubscribes from user portfolio updates for all currencies.
1444    ///
1445    /// # Errors
1446    ///
1447    /// Returns an error if unsubscription fails.
1448    pub async fn unsubscribe_user_portfolio(&self) -> DeribitWsResult<()> {
1449        self.send_unsubscribe(vec!["user.portfolio.any".to_string()])
1450            .await
1451    }
1452
1453    /// Subscribes to multiple channels at once.
1454    ///
1455    /// # Errors
1456    ///
1457    /// Returns an error if subscription fails.
1458    pub async fn subscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1459        self.send_subscribe(channels).await
1460    }
1461
1462    /// Unsubscribes from multiple channels at once.
1463    ///
1464    /// # Errors
1465    ///
1466    /// Returns an error if unsubscription fails.
1467    pub async fn unsubscribe(&self, channels: Vec<String>) -> DeribitWsResult<()> {
1468        self.send_unsubscribe(channels).await
1469    }
1470
1471    /// Submits an order to Deribit via WebSocket.
1472    ///
1473    /// Routes to `private/buy` or `private/sell` JSON-RPC method based on order side.
1474    /// Requires authentication (call `authenticate_session()` first).
1475    ///
1476    /// # Errors
1477    ///
1478    /// Returns an error if:
1479    /// - The client is not authenticated
1480    /// - The command fails to send
1481    pub async fn submit_order(
1482        &self,
1483        order_side: OrderSide,
1484        params: DeribitOrderParams,
1485        client_order_id: ClientOrderId,
1486        trader_id: TraderId,
1487        strategy_id: StrategyId,
1488        instrument_id: InstrumentId,
1489    ) -> DeribitWsResult<()> {
1490        if !self.is_authenticated() {
1491            return Err(DeribitWsError::Authentication(
1492                "Submit order requires authentication. Call authenticate_session() first."
1493                    .to_string(),
1494            ));
1495        }
1496
1497        log::debug!(
1498            "Sending {} order: instrument={}, amount={}, price={:?}, client_order_id={}",
1499            order_side,
1500            params.instrument_name,
1501            params.amount,
1502            params.price,
1503            client_order_id
1504        );
1505
1506        let cmd = match order_side {
1507            OrderSide::Buy => HandlerCommand::Buy {
1508                params,
1509                client_order_id,
1510                trader_id,
1511                strategy_id,
1512                instrument_id,
1513            },
1514            OrderSide::Sell => HandlerCommand::Sell {
1515                params,
1516                client_order_id,
1517                trader_id,
1518                strategy_id,
1519                instrument_id,
1520            },
1521            _ => {
1522                return Err(DeribitWsError::ClientError(format!(
1523                    "Invalid order side: {order_side}"
1524                )));
1525            }
1526        };
1527
1528        self.cmd_tx
1529            .read()
1530            .await
1531            .send(cmd)
1532            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1533
1534        Ok(())
1535    }
1536
1537    /// Modifies an existing order on Deribit via WebSocket.
1538    ///
1539    /// The order parameters are sent using the `private/edit` JSON-RPC method.
1540    /// Requires authentication (call `authenticate_session()` first).
1541    ///
1542    /// # Errors
1543    ///
1544    /// Returns an error if:
1545    /// - The client is not authenticated
1546    /// - The command fails to send
1547    #[expect(clippy::too_many_arguments)]
1548    pub async fn modify_order(
1549        &self,
1550        order_id: &str,
1551        quantity: Quantity,
1552        price: Price,
1553        client_order_id: ClientOrderId,
1554        trader_id: TraderId,
1555        strategy_id: StrategyId,
1556        instrument_id: InstrumentId,
1557    ) -> DeribitWsResult<()> {
1558        if !self.is_authenticated() {
1559            return Err(DeribitWsError::Authentication(
1560                "Modify order requires authentication. Call authenticate_session() first."
1561                    .to_string(),
1562            ));
1563        }
1564
1565        let params = DeribitEditParams {
1566            order_id: order_id.to_string(),
1567            amount: quantity.as_decimal(),
1568            price: Some(price.as_decimal()),
1569            post_only: None,
1570            reject_post_only: None,
1571            reduce_only: None,
1572            trigger_price: None,
1573        };
1574
1575        log::debug!(
1576            "Sending modify order: order_id={order_id}, quantity={quantity}, price={price}, client_order_id={client_order_id}"
1577        );
1578
1579        self.cmd_tx
1580            .read()
1581            .await
1582            .send(HandlerCommand::Edit {
1583                params,
1584                client_order_id,
1585                trader_id,
1586                strategy_id,
1587                instrument_id,
1588            })
1589            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1590
1591        Ok(())
1592    }
1593
1594    /// Cancels an existing order on Deribit via WebSocket.
1595    ///
1596    /// The order is cancelled using the `private/cancel` JSON-RPC method.
1597    /// Requires authentication (call `authenticate_session()` first).
1598    ///
1599    /// # Errors
1600    ///
1601    /// Returns an error if:
1602    /// - The client is not authenticated
1603    /// - The command fails to send
1604    pub async fn cancel_order(
1605        &self,
1606        order_id: &str,
1607        client_order_id: ClientOrderId,
1608        trader_id: TraderId,
1609        strategy_id: StrategyId,
1610        instrument_id: InstrumentId,
1611    ) -> DeribitWsResult<()> {
1612        if !self.is_authenticated() {
1613            return Err(DeribitWsError::Authentication(
1614                "Cancel order requires authentication. Call authenticate_session() first."
1615                    .to_string(),
1616            ));
1617        }
1618
1619        let params = DeribitCancelParams {
1620            order_id: order_id.to_string(),
1621        };
1622
1623        log::debug!("Sending cancel order: order_id={order_id}, client_order_id={client_order_id}");
1624
1625        self.cmd_tx
1626            .read()
1627            .await
1628            .send(HandlerCommand::Cancel {
1629                params,
1630                client_order_id,
1631                trader_id,
1632                strategy_id,
1633                instrument_id,
1634            })
1635            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1636
1637        Ok(())
1638    }
1639
1640    /// Cancels all orders for a specific instrument on Deribit via WebSocket.
1641    ///
1642    /// Uses the `private/cancel_all_by_instrument` JSON-RPC method.
1643    /// Requires authentication (call `authenticate_session()` first).
1644    ///
1645    /// # Errors
1646    ///
1647    /// Returns an error if:
1648    /// - The client is not authenticated
1649    /// - The command fails to send
1650    pub async fn cancel_all_orders(
1651        &self,
1652        instrument_id: InstrumentId,
1653        order_type: Option<String>,
1654    ) -> DeribitWsResult<()> {
1655        if !self.is_authenticated() {
1656            return Err(DeribitWsError::Authentication(
1657                "Cancel all orders requires authentication. Call authenticate_session() first."
1658                    .to_string(),
1659            ));
1660        }
1661
1662        let instrument_name = instrument_id.symbol.to_string();
1663        let params = DeribitCancelAllByInstrumentParams {
1664            instrument_name: instrument_name.clone(),
1665            order_type,
1666        };
1667
1668        log::debug!("Sending cancel_all_orders: instrument={instrument_name}");
1669
1670        self.cmd_tx
1671            .read()
1672            .await
1673            .send(HandlerCommand::CancelAllByInstrument {
1674                params,
1675                instrument_id,
1676            })
1677            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1678
1679        Ok(())
1680    }
1681
1682    /// Queries the state of an order on Deribit via WebSocket.
1683    ///
1684    /// Uses the `private/get_order_state` JSON-RPC method.
1685    /// Requires authentication (call `authenticate_session()` first).
1686    ///
1687    /// # Errors
1688    ///
1689    /// Returns an error if:
1690    /// - The client is not authenticated
1691    /// - The command fails to send
1692    pub async fn query_order(
1693        &self,
1694        order_id: &str,
1695        client_order_id: ClientOrderId,
1696        trader_id: TraderId,
1697        strategy_id: StrategyId,
1698        instrument_id: InstrumentId,
1699    ) -> DeribitWsResult<()> {
1700        if !self.is_authenticated() {
1701            return Err(DeribitWsError::Authentication(
1702                "Query order state requires authentication. Call authenticate_session() first."
1703                    .to_string(),
1704            ));
1705        }
1706
1707        log::debug!("Sending query_order: order_id={order_id}, client_order_id={client_order_id}");
1708
1709        self.cmd_tx
1710            .read()
1711            .await
1712            .send(HandlerCommand::GetOrderState {
1713                order_id: order_id.to_string(),
1714                client_order_id,
1715                trader_id,
1716                strategy_id,
1717                instrument_id,
1718            })
1719            .map_err(|e| DeribitWsError::Send(e.to_string()))?;
1720
1721        Ok(())
1722    }
1723}