Skip to main content

nautilus_deribit/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Deribit WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - Connection mode and signal are shared via Arc.
25//!
26//! ## Connection Flow
27//!
28//! 1. Clone the client for async operation.
29//! 2. Connect and populate shared state on the clone.
30//! 3. Spawn stream handler as background task.
31//! 4. Return immediately (non-blocking).
32//!
33//! ## Important Notes
34//!
35//! - Never use `block_on()` - it blocks the runtime.
36//! - Always clone before async blocks for lifetime requirements.
37
38use futures_util::StreamExt;
39use nautilus_common::live::get_runtime;
40use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err};
41use nautilus_model::{
42    data::{BarType, Data, OrderBookDeltas_API},
43    enums::{OrderSide, OrderType, TimeInForce, TriggerType},
44    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
45    python::{
46        data::data_to_pycapsule,
47        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
48    },
49    types::{Price, Quantity},
50};
51use nautilus_network::websocket::TransportBackend;
52use pyo3::{IntoPyObjectExt, prelude::*};
53
54use crate::{
55    common::{
56        enums::{DeribitEnvironment, DeribitTimeInForce, resolve_trigger_type},
57        parse::parse_instrument_kind_currency,
58    },
59    websocket::{
60        client::DeribitWebSocketClient,
61        enums::DeribitUpdateInterval,
62        messages::{DeribitOrderParams, NautilusWsMessage},
63    },
64};
65
66fn call_python_with_data<F>(call_soon: &Py<PyAny>, callback: &Py<PyAny>, data_converter: F)
67where
68    F: FnOnce(Python) -> PyResult<Py<PyAny>>,
69{
70    Python::attach(|py| match data_converter(py) {
71        Ok(py_obj) => call_python_threadsafe(py, call_soon, callback, py_obj),
72        Err(e) => log::error!("Failed to convert data to Python object: {e}"),
73    });
74}
75
76#[pymethods]
77#[pyo3_stub_gen::derive::gen_stub_pymethods]
78impl DeribitWebSocketClient {
79    /// WebSocket client for connecting to Deribit.
80    #[new]
81    #[pyo3(signature = (
82        url=None,
83        api_key=None,
84        api_secret=None,
85        heartbeat_interval=30,
86        environment=DeribitEnvironment::Mainnet,
87        proxy_url=None,
88    ))]
89    fn py_new(
90        url: Option<String>,
91        api_key: Option<String>,
92        api_secret: Option<String>,
93        heartbeat_interval: u64,
94        environment: DeribitEnvironment,
95        proxy_url: Option<String>,
96    ) -> PyResult<Self> {
97        Self::new(
98            url,
99            api_key,
100            api_secret,
101            heartbeat_interval,
102            environment,
103            TransportBackend::default(),
104            proxy_url,
105        )
106        .map_err(to_pyvalue_err)
107    }
108
109    /// Creates a new public (unauthenticated) client.
110    ///
111    /// Does NOT fall back to environment variables for credentials.
112    ///
113    /// # Errors
114    ///
115    /// Returns an error if initialization fails.
116    #[staticmethod]
117    #[pyo3(name = "new_public", signature = (environment, proxy_url = None))]
118    fn py_new_public(environment: DeribitEnvironment, proxy_url: Option<String>) -> PyResult<Self> {
119        Self::new_public(environment, proxy_url).map_err(to_pyvalue_err)
120    }
121
122    /// Creates an authenticated client with credentials.
123    ///
124    /// Uses environment variables to load credentials:
125    /// - Testnet: `DERIBIT_TESTNET_API_KEY` and `DERIBIT_TESTNET_API_SECRET`
126    /// - Mainnet: `DERIBIT_API_KEY` and `DERIBIT_API_SECRET`
127    #[staticmethod]
128    #[pyo3(name = "with_credentials", signature = (environment, account_id = None, proxy_url = None))]
129    fn py_with_credentials(
130        environment: DeribitEnvironment,
131        account_id: Option<AccountId>,
132        proxy_url: Option<String>,
133    ) -> PyResult<Self> {
134        let mut client = Self::with_credentials(environment, proxy_url).map_err(to_pyvalue_err)?;
135
136        if let Some(id) = account_id {
137            client.set_account_id(id);
138        }
139        Ok(client)
140    }
141
142    /// Returns the WebSocket URL.
143    #[getter]
144    #[pyo3(name = "url")]
145    #[must_use]
146    pub fn py_url(&self) -> String {
147        self.url().to_string()
148    }
149
150    #[getter]
151    #[pyo3(name = "is_testnet")]
152    #[must_use]
153    pub fn py_is_testnet(&self) -> bool {
154        self.environment() == DeribitEnvironment::Testnet
155    }
156
157    /// Returns whether the client is actively connected.
158    #[pyo3(name = "is_active")]
159    #[must_use]
160    fn py_is_active(&self) -> bool {
161        self.is_active()
162    }
163
164    /// Returns whether the client is closed.
165    #[pyo3(name = "is_closed")]
166    #[must_use]
167    fn py_is_closed(&self) -> bool {
168        self.is_closed()
169    }
170
171    /// Returns whether the client has credentials configured.
172    #[pyo3(name = "has_credentials")]
173    #[must_use]
174    fn py_has_credentials(&self) -> bool {
175        self.has_credentials()
176    }
177
178    /// Returns whether the client is authenticated.
179    #[pyo3(name = "is_authenticated")]
180    #[must_use]
181    fn py_is_authenticated(&self) -> bool {
182        self.is_authenticated()
183    }
184
185    /// Cancel all pending WebSocket requests.
186    #[pyo3(name = "cancel_all_requests")]
187    pub fn py_cancel_all_requests(&self) {
188        self.cancel_all_requests();
189    }
190
191    /// Caches instruments for use during message parsing.
192    #[pyo3(name = "cache_instruments")]
193    pub fn py_cache_instruments(
194        &self,
195        py: Python<'_>,
196        instruments: Vec<Py<PyAny>>,
197    ) -> PyResult<()> {
198        let instruments: Result<Vec<_>, _> = instruments
199            .into_iter()
200            .map(|inst| pyobject_to_instrument_any(py, inst))
201            .collect();
202        self.cache_instruments(&instruments?);
203        Ok(())
204    }
205
206    /// Caches a single instrument.
207    #[pyo3(name = "cache_instrument")]
208    pub fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
209        let inst = pyobject_to_instrument_any(py, instrument)?;
210        self.cache_instrument(inst);
211        Ok(())
212    }
213
214    /// Sets the account ID for order/fill reports.
215    #[pyo3(name = "set_account_id")]
216    pub fn py_set_account_id(&mut self, account_id: AccountId) {
217        self.set_account_id(account_id);
218    }
219
220    /// Sets whether bar timestamps should use the close time.
221    ///
222    /// When `true` (default), bar `ts_event` is set to the bar's close time.
223    #[pyo3(name = "set_bars_timestamp_on_close")]
224    pub fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
225        self.set_bars_timestamp_on_close(value);
226    }
227
228    /// Connects to the Deribit WebSocket API.
229    #[pyo3(name = "connect")]
230    #[expect(clippy::needless_pass_by_value)]
231    fn py_connect<'py>(
232        &mut self,
233        py: Python<'py>,
234        loop_: Py<PyAny>,
235        instruments: Vec<Py<PyAny>>,
236        callback: Py<PyAny>,
237    ) -> PyResult<Bound<'py, PyAny>> {
238        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
239
240        let mut instruments_any = Vec::new();
241
242        for inst in instruments {
243            let inst_any = pyobject_to_instrument_any(py, inst)?;
244            instruments_any.push(inst_any);
245        }
246
247        self.cache_instruments(&instruments_any);
248
249        let mut client = self.clone();
250
251        pyo3_async_runtimes::tokio::future_into_py(py, async move {
252            client.connect().await.map_err(to_pyruntime_err)?;
253
254            let stream = client.stream().map_err(to_pyruntime_err)?;
255
256            // Keep client alive in the spawned task to prevent handler from dropping
257            get_runtime().spawn(async move {
258                let _client = client;
259                tokio::pin!(stream);
260
261                while let Some(msg) = stream.next().await {
262                    match msg {
263                        NautilusWsMessage::Instrument(msg) => {
264                            call_python_with_data(&call_soon, &callback, |py| {
265                                instrument_any_to_pyobject(py, *msg)
266                            });
267                        }
268                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
269                            for data in msg {
270                                let py_obj = data_to_pycapsule(py, data);
271                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
272                            }
273                        }),
274                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
275                            let py_obj =
276                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
277                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
278                        }),
279                        NautilusWsMessage::Error(err) => {
280                            log::error!("WebSocket error: {err}");
281                        }
282                        NautilusWsMessage::Reconnected => {
283                            log::info!("WebSocket reconnected");
284                        }
285                        NautilusWsMessage::Authenticated(auth_result) => {
286                            log::info!("WebSocket authenticated (scope: {})", auth_result.scope);
287                        }
288                        NautilusWsMessage::InstrumentStatus(status) => {
289                            call_python_with_data(&call_soon, &callback, |py| {
290                                status.into_py_any(py)
291                            });
292                        }
293                        NautilusWsMessage::Raw(msg) => {
294                            log::debug!("Received raw message, skipping: {msg}");
295                        }
296                        NautilusWsMessage::FundingRates(funding_rates) => Python::attach(|py| {
297                            for funding_rate in funding_rates {
298                                match Py::new(py, funding_rate) {
299                                    Ok(py_obj) => call_python_threadsafe(
300                                        py,
301                                        &call_soon,
302                                        &callback,
303                                        py_obj.into_any(),
304                                    ),
305                                    Err(e) => {
306                                        log::error!("Failed to create FundingRateUpdate: {e}");
307                                    }
308                                }
309                            }
310                        }),
311                        NautilusWsMessage::OptionGreeks(greeks) => {
312                            call_python_with_data(&call_soon, &callback, |py| {
313                                Py::new(py, greeks).map(|obj| obj.into_any())
314                            });
315                        }
316                        // Execution events - route to Python callback
317                        NautilusWsMessage::OrderStatusReports(reports) => Python::attach(|py| {
318                            for report in reports {
319                                match Py::new(py, report) {
320                                    Ok(py_obj) => call_python_threadsafe(
321                                        py,
322                                        &call_soon,
323                                        &callback,
324                                        py_obj.into_any(),
325                                    ),
326                                    Err(e) => {
327                                        log::error!("Failed to create OrderStatusReport: {e}");
328                                    }
329                                }
330                            }
331                        }),
332                        NautilusWsMessage::FillReports(reports) => Python::attach(|py| {
333                            for report in reports {
334                                match Py::new(py, report) {
335                                    Ok(py_obj) => call_python_threadsafe(
336                                        py,
337                                        &call_soon,
338                                        &callback,
339                                        py_obj.into_any(),
340                                    ),
341                                    Err(e) => log::error!("Failed to create FillReport: {e}"),
342                                }
343                            }
344                        }),
345                        NautilusWsMessage::OrderRejected(msg) => {
346                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
347                        }
348                        NautilusWsMessage::OrderAccepted(msg) => {
349                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
350                        }
351                        NautilusWsMessage::OrderCanceled(msg) => {
352                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
353                        }
354                        NautilusWsMessage::OrderExpired(msg) => {
355                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
356                        }
357                        NautilusWsMessage::OrderUpdated(msg) => {
358                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
359                        }
360                        NautilusWsMessage::OrderCancelRejected(msg) => {
361                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
362                        }
363                        NautilusWsMessage::OrderModifyRejected(msg) => {
364                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
365                        }
366                        NautilusWsMessage::AccountState(msg) => {
367                            call_python_with_data(&call_soon, &callback, |py| msg.into_py_any(py));
368                        }
369                        NautilusWsMessage::AuthenticationFailed(reason) => {
370                            log::error!("Authentication failed: {reason}");
371                        }
372                    }
373                }
374            });
375
376            Ok(())
377        })
378    }
379
380    /// Waits until the client is active or timeout expires.
381    #[pyo3(name = "wait_until_active")]
382    fn py_wait_until_active<'py>(
383        &self,
384        py: Python<'py>,
385        timeout_secs: f64,
386    ) -> PyResult<Bound<'py, PyAny>> {
387        let client = self.clone();
388
389        pyo3_async_runtimes::tokio::future_into_py(py, async move {
390            client
391                .wait_until_active(timeout_secs)
392                .await
393                .map_err(to_pyruntime_err)?;
394            Ok(())
395        })
396    }
397
398    /// Closes the WebSocket connection.
399    ///
400    /// # Errors
401    ///
402    /// Returns an error if the close operation fails.
403    #[pyo3(name = "close")]
404    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
405        let client = self.clone();
406
407        pyo3_async_runtimes::tokio::future_into_py(py, async move {
408            if let Err(e) = client.close().await {
409                log::error!("Error on close: {e}");
410            }
411            Ok(())
412        })
413    }
414
415    /// Authenticates the WebSocket session with Deribit.
416    ///
417    /// Uses the `client_signature` grant type with HMAC-SHA256 signature.
418    /// This must be called before subscribing to raw data streams.
419    ///
420    /// # Arguments
421    ///
422    /// * `session_name` - Optional session name for session-scoped authentication.
423    ///   When provided, uses `session:<name>` scope which allows skipping `access_token`
424    ///   in subsequent private requests. When `None`, uses default `connection` scope.
425    ///   Recommended to use session scope for order execution compatibility.
426    #[pyo3(name = "authenticate")]
427    #[pyo3(signature = (session_name=None))]
428    fn py_authenticate<'py>(
429        &self,
430        py: Python<'py>,
431        session_name: Option<String>,
432    ) -> PyResult<Bound<'py, PyAny>> {
433        let client = self.clone();
434
435        pyo3_async_runtimes::tokio::future_into_py(py, async move {
436            client
437                .authenticate(session_name.as_deref())
438                .await
439                .map_err(to_pyruntime_err)?;
440            Ok(())
441        })
442    }
443
444    /// Authenticates with session scope using the provided session name.
445    ///
446    /// Use `DERIBIT_DATA_SESSION_NAME` for data clients and
447    /// `DERIBIT_EXECUTION_SESSION_NAME` for execution clients.
448    #[pyo3(name = "authenticate_session")]
449    fn py_authenticate_session<'py>(
450        &self,
451        py: Python<'py>,
452        session_name: String,
453    ) -> PyResult<Bound<'py, PyAny>> {
454        let client = self.clone();
455
456        pyo3_async_runtimes::tokio::future_into_py(py, async move {
457            client
458                .authenticate_session(&session_name)
459                .await
460                .map_err(|e| {
461                    to_pyruntime_err(format!(
462                        "Failed to authenticate Deribit websocket session '{session_name}': {e}"
463                    ))
464                })?;
465            Ok(())
466        })
467    }
468
469    /// Subscribes to trade updates for an instrument.
470    ///
471    /// # Arguments
472    ///
473    /// * `instrument_id` - The instrument to subscribe to
474    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
475    #[pyo3(name = "subscribe_trades")]
476    #[pyo3(signature = (instrument_id, interval=None))]
477    fn py_subscribe_trades<'py>(
478        &self,
479        py: Python<'py>,
480        instrument_id: InstrumentId,
481        interval: Option<DeribitUpdateInterval>,
482    ) -> PyResult<Bound<'py, PyAny>> {
483        let client = self.clone();
484
485        pyo3_async_runtimes::tokio::future_into_py(py, async move {
486            client
487                .subscribe_trades(instrument_id, interval)
488                .await
489                .map_err(to_pyvalue_err)
490        })
491    }
492
493    /// Unsubscribes from trade updates for an instrument.
494    #[pyo3(name = "unsubscribe_trades")]
495    #[pyo3(signature = (instrument_id, interval=None))]
496    fn py_unsubscribe_trades<'py>(
497        &self,
498        py: Python<'py>,
499        instrument_id: InstrumentId,
500        interval: Option<DeribitUpdateInterval>,
501    ) -> PyResult<Bound<'py, PyAny>> {
502        let client = self.clone();
503
504        pyo3_async_runtimes::tokio::future_into_py(py, async move {
505            client
506                .unsubscribe_trades(instrument_id, interval)
507                .await
508                .map_err(to_pyvalue_err)
509        })
510    }
511
512    /// Subscribes to order book updates for an instrument.
513    ///
514    /// # Arguments
515    ///
516    /// * `instrument_id` - The instrument to subscribe to
517    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
518    #[pyo3(name = "subscribe_book")]
519    #[pyo3(signature = (instrument_id, interval=None, depth=None))]
520    fn py_subscribe_book<'py>(
521        &self,
522        py: Python<'py>,
523        instrument_id: InstrumentId,
524        interval: Option<DeribitUpdateInterval>,
525        depth: Option<u32>,
526    ) -> PyResult<Bound<'py, PyAny>> {
527        let client = self.clone();
528
529        pyo3_async_runtimes::tokio::future_into_py(py, async move {
530            if let Some(d) = depth {
531                client
532                    .subscribe_book_grouped(instrument_id, "none", d, interval)
533                    .await
534                    .map_err(to_pyvalue_err)
535            } else {
536                client
537                    .subscribe_book(instrument_id, interval)
538                    .await
539                    .map_err(to_pyvalue_err)
540            }
541        })
542    }
543
544    /// Unsubscribes from order book updates for an instrument.
545    #[pyo3(name = "unsubscribe_book")]
546    #[pyo3(signature = (instrument_id, interval=None, depth=None))]
547    fn py_unsubscribe_book<'py>(
548        &self,
549        py: Python<'py>,
550        instrument_id: InstrumentId,
551        interval: Option<DeribitUpdateInterval>,
552        depth: Option<u32>,
553    ) -> PyResult<Bound<'py, PyAny>> {
554        let client = self.clone();
555
556        pyo3_async_runtimes::tokio::future_into_py(py, async move {
557            if let Some(d) = depth {
558                client
559                    .unsubscribe_book_grouped(instrument_id, "none", d, interval)
560                    .await
561                    .map_err(to_pyvalue_err)
562            } else {
563                client
564                    .unsubscribe_book(instrument_id, interval)
565                    .await
566                    .map_err(to_pyvalue_err)
567            }
568        })
569    }
570
571    /// Subscribes to grouped (depth-limited) order book updates for an instrument.
572    ///
573    /// Uses the Deribit grouped book channel format: `book.{instrument}.{group}.{depth}.{interval}`
574    ///
575    /// Depth is normalized to Deribit supported values: 1, 10, or 20.
576    #[pyo3(name = "subscribe_book_grouped")]
577    #[pyo3(signature = (instrument_id, group, depth, interval=None))]
578    fn py_subscribe_book_grouped<'py>(
579        &self,
580        py: Python<'py>,
581        instrument_id: InstrumentId,
582        group: String,
583        depth: u32,
584        interval: Option<DeribitUpdateInterval>,
585    ) -> PyResult<Bound<'py, PyAny>> {
586        let client = self.clone();
587
588        pyo3_async_runtimes::tokio::future_into_py(py, async move {
589            client
590                .subscribe_book_grouped(instrument_id, &group, depth, interval)
591                .await
592                .map_err(to_pyvalue_err)
593        })
594    }
595
596    /// Unsubscribes from grouped (depth-limited) order book updates for an instrument.
597    ///
598    /// Depth is normalized to Deribit supported values: 1, 10, or 20.
599    #[pyo3(name = "unsubscribe_book_grouped")]
600    #[pyo3(signature = (instrument_id, group, depth, interval=None))]
601    fn py_unsubscribe_book_grouped<'py>(
602        &self,
603        py: Python<'py>,
604        instrument_id: InstrumentId,
605        group: String,
606        depth: u32,
607        interval: Option<DeribitUpdateInterval>,
608    ) -> PyResult<Bound<'py, PyAny>> {
609        let client = self.clone();
610
611        pyo3_async_runtimes::tokio::future_into_py(py, async move {
612            client
613                .unsubscribe_book_grouped(instrument_id, &group, depth, interval)
614                .await
615                .map_err(to_pyvalue_err)
616        })
617    }
618
619    /// Subscribes to ticker updates for an instrument.
620    ///
621    /// # Arguments
622    ///
623    /// * `instrument_id` - The instrument to subscribe to
624    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
625    #[pyo3(name = "subscribe_ticker")]
626    #[pyo3(signature = (instrument_id, interval=None))]
627    fn py_subscribe_ticker<'py>(
628        &self,
629        py: Python<'py>,
630        instrument_id: InstrumentId,
631        interval: Option<DeribitUpdateInterval>,
632    ) -> PyResult<Bound<'py, PyAny>> {
633        let client = self.clone();
634
635        pyo3_async_runtimes::tokio::future_into_py(py, async move {
636            client
637                .subscribe_ticker(instrument_id, interval)
638                .await
639                .map_err(to_pyvalue_err)
640        })
641    }
642
643    /// Unsubscribes from ticker updates for an instrument.
644    #[pyo3(name = "unsubscribe_ticker")]
645    #[pyo3(signature = (instrument_id, interval=None))]
646    fn py_unsubscribe_ticker<'py>(
647        &self,
648        py: Python<'py>,
649        instrument_id: InstrumentId,
650        interval: Option<DeribitUpdateInterval>,
651    ) -> PyResult<Bound<'py, PyAny>> {
652        let client = self.clone();
653
654        pyo3_async_runtimes::tokio::future_into_py(py, async move {
655            client
656                .unsubscribe_ticker(instrument_id, interval)
657                .await
658                .map_err(to_pyvalue_err)
659        })
660    }
661
662    /// Subscribes to mark prices for the given instrument.
663    ///
664    /// Registers the instrument in the `mark_price_subs` set so the handler
665    /// emits `MarkPriceUpdate` from ticker messages, then subscribes to the ticker channel.
666    #[pyo3(name = "subscribe_mark_prices")]
667    #[pyo3(signature = (instrument_id, interval=None))]
668    fn py_subscribe_mark_prices<'py>(
669        &self,
670        py: Python<'py>,
671        instrument_id: InstrumentId,
672        interval: Option<DeribitUpdateInterval>,
673    ) -> PyResult<Bound<'py, PyAny>> {
674        self.add_mark_price_sub(instrument_id);
675        let client = self.clone();
676
677        pyo3_async_runtimes::tokio::future_into_py(py, async move {
678            client
679                .subscribe_ticker(instrument_id, interval)
680                .await
681                .map_err(to_pyvalue_err)
682        })
683    }
684
685    /// Unsubscribes from mark prices for the given instrument.
686    ///
687    /// Removes the instrument from the `mark_price_subs` set and unsubscribes
688    /// from the ticker channel.
689    #[pyo3(name = "unsubscribe_mark_prices")]
690    #[pyo3(signature = (instrument_id, interval=None))]
691    fn py_unsubscribe_mark_prices<'py>(
692        &self,
693        py: Python<'py>,
694        instrument_id: InstrumentId,
695        interval: Option<DeribitUpdateInterval>,
696    ) -> PyResult<Bound<'py, PyAny>> {
697        self.remove_mark_price_sub(&instrument_id);
698        let client = self.clone();
699
700        pyo3_async_runtimes::tokio::future_into_py(py, async move {
701            client
702                .unsubscribe_ticker(instrument_id, interval)
703                .await
704                .map_err(to_pyvalue_err)
705        })
706    }
707
708    /// Subscribes to index prices for the given instrument.
709    ///
710    /// Registers the instrument in the `index_price_subs` set so the handler
711    /// emits `IndexPriceUpdate` from ticker messages, then subscribes to the ticker channel.
712    #[pyo3(name = "subscribe_index_prices")]
713    #[pyo3(signature = (instrument_id, interval=None))]
714    fn py_subscribe_index_prices<'py>(
715        &self,
716        py: Python<'py>,
717        instrument_id: InstrumentId,
718        interval: Option<DeribitUpdateInterval>,
719    ) -> PyResult<Bound<'py, PyAny>> {
720        self.add_index_price_sub(instrument_id);
721        let client = self.clone();
722
723        pyo3_async_runtimes::tokio::future_into_py(py, async move {
724            client
725                .subscribe_ticker(instrument_id, interval)
726                .await
727                .map_err(to_pyvalue_err)
728        })
729    }
730
731    /// Unsubscribes from index prices for the given instrument.
732    ///
733    /// Removes the instrument from the `index_price_subs` set and unsubscribes
734    /// from the ticker channel.
735    #[pyo3(name = "unsubscribe_index_prices")]
736    #[pyo3(signature = (instrument_id, interval=None))]
737    fn py_unsubscribe_index_prices<'py>(
738        &self,
739        py: Python<'py>,
740        instrument_id: InstrumentId,
741        interval: Option<DeribitUpdateInterval>,
742    ) -> PyResult<Bound<'py, PyAny>> {
743        self.remove_index_price_sub(&instrument_id);
744        let client = self.clone();
745
746        pyo3_async_runtimes::tokio::future_into_py(py, async move {
747            client
748                .unsubscribe_ticker(instrument_id, interval)
749                .await
750                .map_err(to_pyvalue_err)
751        })
752    }
753
754    /// Subscribes to option greeks for the given instrument.
755    ///
756    /// Registers the instrument in the `option_greeks_subs` set so the handler
757    /// emits `OptionGreeks` from ticker messages, then subscribes to the ticker channel.
758    #[pyo3(name = "subscribe_option_greeks")]
759    #[pyo3(signature = (instrument_id, interval=None))]
760    fn py_subscribe_option_greeks<'py>(
761        &self,
762        py: Python<'py>,
763        instrument_id: InstrumentId,
764        interval: Option<DeribitUpdateInterval>,
765    ) -> PyResult<Bound<'py, PyAny>> {
766        self.add_option_greeks_sub(instrument_id);
767        let client = self.clone();
768
769        pyo3_async_runtimes::tokio::future_into_py(py, async move {
770            client
771                .subscribe_ticker(instrument_id, interval)
772                .await
773                .map_err(to_pyvalue_err)
774        })
775    }
776
777    /// Unsubscribes from option greeks for the given instrument.
778    ///
779    /// Removes the instrument from the `option_greeks_subs` set and unsubscribes
780    /// from the ticker channel.
781    #[pyo3(name = "unsubscribe_option_greeks")]
782    #[pyo3(signature = (instrument_id, interval=None))]
783    fn py_unsubscribe_option_greeks<'py>(
784        &self,
785        py: Python<'py>,
786        instrument_id: InstrumentId,
787        interval: Option<DeribitUpdateInterval>,
788    ) -> PyResult<Bound<'py, PyAny>> {
789        self.remove_option_greeks_sub(&instrument_id);
790        let client = self.clone();
791
792        pyo3_async_runtimes::tokio::future_into_py(py, async move {
793            client
794                .unsubscribe_ticker(instrument_id, interval)
795                .await
796                .map_err(to_pyvalue_err)
797        })
798    }
799
800    /// Subscribes to quote (best bid/ask) updates for an instrument.
801    ///
802    /// Note: Quote channel does not support interval parameter.
803    #[pyo3(name = "subscribe_quotes")]
804    fn py_subscribe_quotes<'py>(
805        &self,
806        py: Python<'py>,
807        instrument_id: InstrumentId,
808    ) -> PyResult<Bound<'py, PyAny>> {
809        let client = self.clone();
810
811        pyo3_async_runtimes::tokio::future_into_py(py, async move {
812            client
813                .subscribe_quotes(instrument_id)
814                .await
815                .map_err(to_pyvalue_err)
816        })
817    }
818
819    /// Unsubscribes from quote updates for an instrument.
820    #[pyo3(name = "unsubscribe_quotes")]
821    fn py_unsubscribe_quotes<'py>(
822        &self,
823        py: Python<'py>,
824        instrument_id: InstrumentId,
825    ) -> PyResult<Bound<'py, PyAny>> {
826        let client = self.clone();
827
828        pyo3_async_runtimes::tokio::future_into_py(py, async move {
829            client
830                .unsubscribe_quotes(instrument_id)
831                .await
832                .map_err(to_pyvalue_err)
833        })
834    }
835
836    /// Subscribes to user order updates for all instruments.
837    ///
838    /// Requires authentication. Subscribes to `user.orders.any.any.raw` channel.
839    ///
840    /// # Errors
841    ///
842    /// Returns an error if client is not authenticated or subscription fails.
843    #[pyo3(name = "subscribe_user_orders")]
844    fn py_subscribe_user_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
845        let client = self.clone();
846
847        pyo3_async_runtimes::tokio::future_into_py(py, async move {
848            client.subscribe_user_orders().await.map_err(to_pyvalue_err)
849        })
850    }
851
852    /// Unsubscribes from user order updates for all instruments.
853    ///
854    /// # Errors
855    ///
856    /// Returns an error if unsubscription fails.
857    #[pyo3(name = "unsubscribe_user_orders")]
858    fn py_unsubscribe_user_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
859        let client = self.clone();
860
861        pyo3_async_runtimes::tokio::future_into_py(py, async move {
862            client
863                .unsubscribe_user_orders()
864                .await
865                .map_err(to_pyvalue_err)
866        })
867    }
868
869    /// Subscribes to user trade/fill updates for all instruments.
870    ///
871    /// Requires authentication. Subscribes to `user.trades.any.any.raw` channel.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if client is not authenticated or subscription fails.
876    #[pyo3(name = "subscribe_user_trades")]
877    fn py_subscribe_user_trades<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
878        let client = self.clone();
879
880        pyo3_async_runtimes::tokio::future_into_py(py, async move {
881            client.subscribe_user_trades().await.map_err(to_pyvalue_err)
882        })
883    }
884
885    /// Unsubscribes from user trade/fill updates for all instruments.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if unsubscription fails.
890    #[pyo3(name = "unsubscribe_user_trades")]
891    fn py_unsubscribe_user_trades<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
892        let client = self.clone();
893
894        pyo3_async_runtimes::tokio::future_into_py(py, async move {
895            client
896                .unsubscribe_user_trades()
897                .await
898                .map_err(to_pyvalue_err)
899        })
900    }
901
902    /// Subscribes to user portfolio updates for all currencies.
903    ///
904    /// Requires authentication. Subscribes to `user.portfolio.any` channel which
905    /// provides real-time account balance and margin updates for all currencies
906    /// (BTC, ETH, USDC, USDT, etc.).
907    ///
908    /// # Errors
909    ///
910    /// Returns an error if client is not authenticated or subscription fails.
911    #[pyo3(name = "subscribe_user_portfolio")]
912    fn py_subscribe_user_portfolio<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
913        let client = self.clone();
914
915        pyo3_async_runtimes::tokio::future_into_py(py, async move {
916            client
917                .subscribe_user_portfolio()
918                .await
919                .map_err(to_pyvalue_err)
920        })
921    }
922
923    /// Unsubscribes from user portfolio updates for all currencies.
924    ///
925    /// # Errors
926    ///
927    /// Returns an error if unsubscription fails.
928    #[pyo3(name = "unsubscribe_user_portfolio")]
929    fn py_unsubscribe_user_portfolio<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
930        let client = self.clone();
931
932        pyo3_async_runtimes::tokio::future_into_py(py, async move {
933            client
934                .unsubscribe_user_portfolio()
935                .await
936                .map_err(to_pyvalue_err)
937        })
938    }
939
940    /// Subscribes to multiple channels at once.
941    #[pyo3(name = "subscribe")]
942    fn py_subscribe<'py>(
943        &self,
944        py: Python<'py>,
945        channels: Vec<String>,
946    ) -> PyResult<Bound<'py, PyAny>> {
947        let client = self.clone();
948
949        pyo3_async_runtimes::tokio::future_into_py(py, async move {
950            client.subscribe(channels).await.map_err(to_pyvalue_err)
951        })
952    }
953
954    /// Unsubscribes from multiple channels at once.
955    #[pyo3(name = "unsubscribe")]
956    fn py_unsubscribe<'py>(
957        &self,
958        py: Python<'py>,
959        channels: Vec<String>,
960    ) -> PyResult<Bound<'py, PyAny>> {
961        let client = self.clone();
962
963        pyo3_async_runtimes::tokio::future_into_py(py, async move {
964            client.unsubscribe(channels).await.map_err(to_pyvalue_err)
965        })
966    }
967
968    #[pyo3(name = "subscribe_perpetual_interest_rates")]
969    #[pyo3(signature = (instrument_id, interval=None))]
970    fn py_subscribe_perpetual_interest_rates<'py>(
971        &self,
972        py: Python<'py>,
973        instrument_id: InstrumentId,
974        interval: Option<DeribitUpdateInterval>,
975    ) -> PyResult<Bound<'py, PyAny>> {
976        let client = self.clone();
977
978        pyo3_async_runtimes::tokio::future_into_py(py, async move {
979            client
980                .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
981                .await
982                .map_err(to_pyvalue_err)
983        })
984    }
985
986    #[pyo3(name = "unsubscribe_perpetual_interest_rates")]
987    #[pyo3(signature = (instrument_id, interval=None))]
988    fn py_unsubscribe_perpetual_interest_rates<'py>(
989        &self,
990        py: Python<'py>,
991        instrument_id: InstrumentId,
992        interval: Option<DeribitUpdateInterval>,
993    ) -> PyResult<Bound<'py, PyAny>> {
994        let client = self.clone();
995
996        pyo3_async_runtimes::tokio::future_into_py(py, async move {
997            client
998                .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
999                .await
1000                .map_err(to_pyvalue_err)
1001        })
1002    }
1003
1004    /// Subscribes to instrument status changes for lifecycle notifications.
1005    ///
1006    /// Channel format: `instrument.state.{kind}.{currency}`
1007    #[pyo3(name = "subscribe_instrument_status")]
1008    fn py_subscribe_instrument_status<'py>(
1009        &self,
1010        py: Python<'py>,
1011        instrument_id: InstrumentId,
1012    ) -> PyResult<Bound<'py, PyAny>> {
1013        let client = self.clone();
1014        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
1015
1016        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1017            client
1018                .subscribe_instrument_status(&kind, &currency)
1019                .await
1020                .map_err(to_pyvalue_err)
1021        })
1022    }
1023
1024    /// Unsubscribes from instrument status changes.
1025    #[pyo3(name = "unsubscribe_instrument_status")]
1026    fn py_unsubscribe_instrument_status<'py>(
1027        &self,
1028        py: Python<'py>,
1029        instrument_id: InstrumentId,
1030    ) -> PyResult<Bound<'py, PyAny>> {
1031        let client = self.clone();
1032        let (kind, currency) = parse_instrument_kind_currency(&instrument_id);
1033
1034        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1035            client
1036                .unsubscribe_instrument_status(&kind, &currency)
1037                .await
1038                .map_err(to_pyvalue_err)
1039        })
1040    }
1041
1042    /// Subscribes to chart/OHLC bar updates for an instrument.
1043    ///
1044    /// # Arguments
1045    ///
1046    /// * `instrument_id` - The instrument to subscribe to
1047    /// * `resolution` - Bar resolution: "1", "3", "5", "10", "15", "30", "60", "120", "180",
1048    ///   "360", "720", "1D" (minutes or 1D for daily)
1049    #[pyo3(name = "subscribe_chart")]
1050    fn py_subscribe_chart<'py>(
1051        &self,
1052        py: Python<'py>,
1053        instrument_id: InstrumentId,
1054        resolution: String,
1055    ) -> PyResult<Bound<'py, PyAny>> {
1056        let client = self.clone();
1057
1058        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1059            client
1060                .subscribe_chart(instrument_id, &resolution)
1061                .await
1062                .map_err(to_pyvalue_err)
1063        })
1064    }
1065
1066    /// Unsubscribes from chart/OHLC bar updates.
1067    #[pyo3(name = "unsubscribe_chart")]
1068    fn py_unsubscribe_chart<'py>(
1069        &self,
1070        py: Python<'py>,
1071        instrument_id: InstrumentId,
1072        resolution: String,
1073    ) -> PyResult<Bound<'py, PyAny>> {
1074        let client = self.clone();
1075
1076        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1077            client
1078                .unsubscribe_chart(instrument_id, &resolution)
1079                .await
1080                .map_err(to_pyvalue_err)
1081        })
1082    }
1083
1084    /// Subscribes to bar updates for an instrument using a BarType specification.
1085    ///
1086    /// Converts the BarType to the nearest supported Deribit resolution and subscribes
1087    /// to the chart channel.
1088    #[pyo3(name = "subscribe_bars")]
1089    fn py_subscribe_bars<'py>(
1090        &self,
1091        py: Python<'py>,
1092        bar_type: BarType,
1093    ) -> PyResult<Bound<'py, PyAny>> {
1094        let client = self.clone();
1095
1096        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1097            client
1098                .subscribe_bars(bar_type)
1099                .await
1100                .map_err(to_pyvalue_err)
1101        })
1102    }
1103
1104    /// Unsubscribes from bar updates for an instrument using a BarType specification.
1105    #[pyo3(name = "unsubscribe_bars")]
1106    fn py_unsubscribe_bars<'py>(
1107        &self,
1108        py: Python<'py>,
1109        bar_type: BarType,
1110    ) -> PyResult<Bound<'py, PyAny>> {
1111        let client = self.clone();
1112
1113        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1114            client
1115                .unsubscribe_bars(bar_type)
1116                .await
1117                .map_err(to_pyvalue_err)
1118        })
1119    }
1120
1121    /// Submits an order to Deribit via WebSocket.
1122    ///
1123    /// Routes to `private/buy` or `private/sell` JSON-RPC method based on order side.
1124    /// Requires authentication (call `authenticate_session()` first).
1125    #[pyo3(name = "submit_order")]
1126    #[pyo3(signature = (
1127        order_side,
1128        quantity,
1129        order_type,
1130        client_order_id,
1131        trader_id,
1132        strategy_id,
1133        instrument_id,
1134        price=None,
1135        time_in_force=None,
1136        post_only=false,
1137        reduce_only=false,
1138        trigger_price=None,
1139        trigger_type=None,
1140    ))]
1141    #[expect(clippy::too_many_arguments)]
1142    fn py_submit_order<'py>(
1143        &self,
1144        py: Python<'py>,
1145        order_side: OrderSide,
1146        quantity: Quantity,
1147        order_type: OrderType,
1148        client_order_id: ClientOrderId,
1149        trader_id: TraderId,
1150        strategy_id: StrategyId,
1151        instrument_id: InstrumentId,
1152        price: Option<Price>,
1153        time_in_force: Option<TimeInForce>,
1154        post_only: bool,
1155        reduce_only: bool,
1156        trigger_price: Option<Price>,
1157        trigger_type: Option<TriggerType>,
1158    ) -> PyResult<Bound<'py, PyAny>> {
1159        let client = self.clone();
1160        let instrument_name = instrument_id.symbol.to_string();
1161
1162        // Convert Nautilus TimeInForce to Deribit format
1163        let deribit_tif = time_in_force
1164            .map(|tif| {
1165                DeribitTimeInForce::try_from(tif)
1166                    .map(|deribit_tif| deribit_tif.as_str().to_string())
1167            })
1168            .transpose()
1169            .map_err(to_pyvalue_err)?;
1170
1171        let params = DeribitOrderParams {
1172            instrument_name,
1173            amount: quantity.as_decimal(),
1174            order_type: order_type.to_string().to_lowercase(),
1175            label: Some(client_order_id.to_string()),
1176            price: price.map(|p| p.as_decimal()),
1177            time_in_force: deribit_tif,
1178            post_only: if post_only { Some(true) } else { None },
1179            reject_post_only: if post_only { Some(true) } else { None },
1180            reduce_only: if reduce_only { Some(true) } else { None },
1181            trigger_price: trigger_price.map(|p| p.as_decimal()),
1182            trigger: resolve_trigger_type(trigger_type),
1183            max_show: None,
1184            valid_until: None,
1185        };
1186
1187        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1188            client
1189                .submit_order(
1190                    order_side,
1191                    params,
1192                    client_order_id,
1193                    trader_id,
1194                    strategy_id,
1195                    instrument_id,
1196                )
1197                .await
1198                .map_err(to_pyruntime_err)?;
1199            Ok(())
1200        })
1201    }
1202
1203    /// Modifies an existing order on Deribit via WebSocket.
1204    ///
1205    /// The order parameters are sent using the `private/edit` JSON-RPC method.
1206    /// Requires authentication (call `authenticate_session()` first).
1207    #[pyo3(name = "modify_order")]
1208    #[expect(clippy::too_many_arguments)]
1209    fn py_modify_order<'py>(
1210        &self,
1211        py: Python<'py>,
1212        order_id: String,
1213        quantity: Quantity,
1214        price: Price,
1215        client_order_id: ClientOrderId,
1216        trader_id: TraderId,
1217        strategy_id: StrategyId,
1218        instrument_id: InstrumentId,
1219    ) -> PyResult<Bound<'py, PyAny>> {
1220        let client = self.clone();
1221
1222        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1223            client
1224                .modify_order(
1225                    &order_id,
1226                    quantity,
1227                    price,
1228                    client_order_id,
1229                    trader_id,
1230                    strategy_id,
1231                    instrument_id,
1232                )
1233                .await
1234                .map_err(to_pyruntime_err)?;
1235            Ok(())
1236        })
1237    }
1238
1239    /// Cancels an existing order on Deribit via WebSocket.
1240    ///
1241    /// The order is cancelled using the `private/cancel` JSON-RPC method.
1242    /// Requires authentication (call `authenticate_session()` first).
1243    #[pyo3(name = "cancel_order")]
1244    fn py_cancel_order<'py>(
1245        &self,
1246        py: Python<'py>,
1247        order_id: String,
1248        client_order_id: ClientOrderId,
1249        trader_id: TraderId,
1250        strategy_id: StrategyId,
1251        instrument_id: InstrumentId,
1252    ) -> PyResult<Bound<'py, PyAny>> {
1253        let client = self.clone();
1254
1255        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1256            client
1257                .cancel_order(
1258                    &order_id,
1259                    client_order_id,
1260                    trader_id,
1261                    strategy_id,
1262                    instrument_id,
1263                )
1264                .await
1265                .map_err(to_pyruntime_err)?;
1266            Ok(())
1267        })
1268    }
1269
1270    /// Cancels all orders for a specific instrument on Deribit via WebSocket.
1271    ///
1272    /// Uses the `private/cancel_all_by_instrument` JSON-RPC method.
1273    /// Requires authentication (call `authenticate_session()` first).
1274    #[pyo3(name = "cancel_all_orders")]
1275    #[pyo3(signature = (instrument_id, order_type=None))]
1276    fn py_cancel_all_orders<'py>(
1277        &self,
1278        py: Python<'py>,
1279        instrument_id: InstrumentId,
1280        order_type: Option<String>,
1281    ) -> PyResult<Bound<'py, PyAny>> {
1282        let client = self.clone();
1283
1284        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1285            client
1286                .cancel_all_orders(instrument_id, order_type)
1287                .await
1288                .map_err(to_pyruntime_err)?;
1289            Ok(())
1290        })
1291    }
1292
1293    /// Queries the state of an order on Deribit via WebSocket.
1294    ///
1295    /// Uses the `private/get_order_state` JSON-RPC method.
1296    /// Requires authentication (call `authenticate_session()` first).
1297    #[pyo3(name = "query_order")]
1298    fn py_query_order<'py>(
1299        &self,
1300        py: Python<'py>,
1301        order_id: String,
1302        client_order_id: ClientOrderId,
1303        trader_id: TraderId,
1304        strategy_id: StrategyId,
1305        instrument_id: InstrumentId,
1306    ) -> PyResult<Bound<'py, PyAny>> {
1307        let client = self.clone();
1308
1309        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1310            client
1311                .query_order(
1312                    &order_id,
1313                    client_order_id,
1314                    trader_id,
1315                    strategy_id,
1316                    instrument_id,
1317                )
1318                .await
1319                .map_err(to_pyruntime_err)?;
1320            Ok(())
1321        })
1322    }
1323}